diff --git a/justfiles/testing.just b/justfiles/testing.just
index 785f948ec..07b0a6d25 100644
--- a/justfiles/testing.just
+++ b/justfiles/testing.just
@@ -1,5 +1,13 @@
 # just commands for CI & local development
 
+[group('testing')]
+bench-rustdoc-page host="http://127.0.0.1:8888":
+    rm -rf ignored/cratesfyi-prefix/archive_cache/*
+    ab \
+        -n 10000 \
+        -c 500 \
+        {{ host }}/rayon/1.11.0/rayon/
+
 [group('testing')]
 [group('sqlx')]
 sqlx-prepare *args: _ensure_db_and_s3_are_running
diff --git a/src/config.rs b/src/config.rs
index da4705955..2f27a9765 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -87,6 +87,21 @@ pub struct Config {
     // for the remote archives?
     pub(crate) local_archive_cache_path: PathBuf,
 
+    // expected number of entries in the local archive cache.
+    // Makes server restarts faster by preallocating some data structures.
+    // General numbers (as of 2025-12):
+    // * we have ~1.5 million releases with archive storage (and 400k without)
+    // * each release has on average 2 archive files (rustdoc, source),
+    //   so, overall, 3 million archive index files in S3.
+    //
+    // While, due to crawlers, we will download _all_ of them over time, the old
+    // metric "releases accessed in the last 10 minutes" was around 50k, if I
+    // recall correctly.
+    // We're using a local DashMap to store some locks for these indexes,
+    // and we already know in advance that we need these 50k entries.
+    // So we can preallocate the DashMap with this number to avoid resizes.
+    pub(crate) local_archive_cache_expected_count: usize,
+
     // Where to collect metrics for the metrics initiative.
     // When empty, we won't collect metrics.
     pub(crate) compiler_metrics_collection_path: Option<PathBuf>,
@@ -214,6 +229,10 @@ impl Config {
                 "DOCSRS_ARCHIVE_INDEX_CACHE_PATH",
                 prefix.join("archive_cache"),
             )?)?)
+            .local_archive_cache_expected_count(env(
+                "DOCSRS_ARCHIVE_INDEX_EXPECTED_COUNT",
+                100_000usize,
+            )?)
             .compiler_metrics_collection_path(maybe_env("DOCSRS_COMPILER_METRICS_PATH")?)
             .temp_dir(temp_dir)
             .rustwide_workspace(env(
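To make the sizing comment above concrete, here is a minimal sketch of the preallocated per-path lock table that `AsyncStorage` keeps (the `LockMap` type and `lock_for` helper are illustrative names, not part of this PR):

```rust
use std::{path::PathBuf, sync::Arc};

use dashmap::DashMap;
use tokio::sync::Mutex;

/// Illustrative stand-in for the `locks` field on `AsyncStorage`:
/// one async Mutex per locally cached archive index path.
struct LockMap {
    locks: DashMap<PathBuf, Arc<Mutex<()>>>,
}

impl LockMap {
    /// Sizing the map up front (~50k "hot" releases, 100k as the default)
    /// means it never has to rehash while warming up after a restart.
    fn new(expected_count: usize) -> Self {
        Self {
            locks: DashMap::with_capacity(expected_count),
        }
    }

    /// Hand out a cheap clone of the per-path lock. `downgrade()`
    /// releases the shard write-guard before cloning, mirroring
    /// `local_index_cache_lock` below.
    fn lock_for(&self, path: impl Into<PathBuf>) -> Arc<Mutex<()>> {
        self.locks
            .entry(path.into())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .downgrade()
            .clone()
    }
}
```

`with_capacity` only pre-sizes the map; the locks themselves are still created lazily, one per index path, on first access.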
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index b9c9f2537..264e07b1b 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -40,13 +40,15 @@ use std::{
     path::{Path, PathBuf},
     str::FromStr,
     sync::Arc,
+    time::Duration,
 };
 use tokio::{
     io::{AsyncBufRead, AsyncBufReadExt},
     runtime,
-    sync::RwLock,
+    sync::Mutex,
+    time::sleep,
 };
-use tracing::{error, info_span, instrument, trace, warn};
+use tracing::{debug, error, info_span, instrument, trace, warn};
 use walkdir::WalkDir;
 
 const ARCHIVE_INDEX_FILE_EXTENSION: &str = "index";
@@ -258,8 +260,8 @@ enum StorageBackend {
 
 pub struct AsyncStorage {
     backend: StorageBackend,
     config: Arc<Config>,
-    /// Locks to synchronize access to the locally cached archive index files.
-    locks: DashMap<PathBuf, Arc<RwLock<()>>>,
+    /// Locks to synchronize write-access to the locally cached archive index files.
+    locks: DashMap<PathBuf, Arc<Mutex<()>>>,
 }
@@ -279,8 +281,8 @@ impl AsyncStorage {
                     StorageBackend::S3(Box::new(S3Backend::new(&config, otel_metrics).await?))
                 }
             },
+            locks: DashMap::with_capacity(config.local_archive_cache_expected_count),
             config,
-            locks: DashMap::new(),
         })
     }
@@ -454,12 +456,12 @@
         Ok(raw_stream.decompress().await?)
     }
 
-    fn local_index_cache_lock(&self, local_index_path: impl AsRef<Path>) -> Arc<RwLock<()>> {
+    fn local_index_cache_lock(&self, local_index_path: impl AsRef<Path>) -> Arc<Mutex<()>> {
         let local_index_path = local_index_path.as_ref().to_path_buf();
         self.locks
             .entry(local_index_path)
-            .or_insert_with(|| Arc::new(RwLock::new(())))
+            .or_insert_with(|| Arc::new(Mutex::new(())))
             .downgrade()
             .clone()
     }
@@ -478,7 +480,7 @@
 
         let rwlock = self.local_index_cache_lock(&local_index_path);
 
-        let _write_guard = rwlock.write().await;
+        let _write_guard = rwlock.lock().await;
 
         if tokio::fs::try_exists(&local_index_path).await? {
             tokio::fs::remove_file(&local_index_path).await?;
@@ -487,10 +489,52 @@
         Ok(())
     }
 
+    #[instrument(skip(self))]
+    async fn download_archive_index(
+        &self,
+        local_index_path: &Path,
+        remote_index_path: &str,
+    ) -> Result<()> {
+        let parent = local_index_path
+            .parent()
+            .ok_or_else(|| anyhow::anyhow!("index path without parent"))?
+            .to_path_buf();
+        tokio::fs::create_dir_all(&parent).await?;
+
+        // Create a unique temp file in the cache folder.
+        let (temp_file, mut temp_path) = spawn_blocking({
+            let folder = self.config.local_archive_cache_path.clone();
+            move || -> Result<_> { tempfile::NamedTempFile::new_in(&folder).map_err(Into::into) }
+        })
+        .await?
+        .into_parts();
+
+        // Download into temp file.
+        let mut temp_file = tokio::fs::File::from_std(temp_file);
+        let mut stream = self.get_stream(remote_index_path).await?.content;
+        tokio::io::copy(&mut stream, &mut temp_file).await?;
+        temp_file.sync_all().await?;
+
+        temp_path.disable_cleanup(true);
+
+        // Publish atomically.
+        // Will replace any existing file.
+        tokio::fs::rename(&temp_path, local_index_path).await?;
+
+        // fsync parent dir to make rename durable
+        spawn_blocking(move || {
+            let dir = std::fs::File::open(parent)?;
+            dir.sync_all().map_err(Into::into)
+        })
+        .await?;
+
+        Ok(())
+    }
+
     /// Find the file info needed to fetch a certain path inside a remote archive.
     /// Will try to use a local cache of the index file, and otherwise download it
     /// from storage.
-    #[instrument]
+    #[instrument(skip(self))]
     async fn find_in_archive_index(
         &self,
         archive_path: &str,
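`download_archive_index` above follows the classic crash-safe publish sequence: temp file on the same filesystem, fsync, atomic rename, fsync of the parent directory. The same pattern in a condensed, synchronous form (the `publish_atomically` helper is hypothetical, for illustration only):

```rust
use std::{fs, io::Write, path::Path};

/// Hypothetical synchronous sketch (not part of this PR) of the
/// crash-safe publish pattern used by `download_archive_index`.
fn publish_atomically(dest: &Path, contents: &[u8]) -> std::io::Result<()> {
    let parent = dest.parent().expect("destination needs a parent dir");
    fs::create_dir_all(parent)?;

    // The temp file must live on the same filesystem as `dest`
    // (here: the same directory), otherwise rename() is not atomic.
    let mut tmp = tempfile::NamedTempFile::new_in(parent)?;
    tmp.write_all(contents)?;
    tmp.as_file().sync_all()?; // data is on disk before we publish

    // Atomic replace: readers see either the old or the new file,
    // never a partially written one.
    tmp.persist(dest).map_err(|e| e.error)?;

    // The rename is a directory mutation; fsync the directory so the
    // publish itself survives a crash.
    fs::File::open(parent)?.sync_all()
}
```

Because `rename(2)` atomically swaps the directory entry, concurrent readers see either the old or the new index, never a partially downloaded one; the final directory fsync makes the swap itself durable.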
@@ -504,57 +548,63 @@
             latest_build_id.map(|id| id.0).unwrap_or(0)
         ));
 
-        let rwlock = self.local_index_cache_lock(&local_index_path);
+        // fast path: try to use whatever is there, no locking
+        match archive_index::find_in_file(&local_index_path, path_in_archive).await {
+            Ok(res) => return Ok(res),
+            Err(err) => {
+                debug!(?err, "archive index lookup failed, will try repair.");
+            }
+        }
 
-        // directly acquire the read-lock, so the syscall (`path.exists()`) below is already
-        // protected.
-        let mut _read_guard = rwlock.read().await;
-
-        if !tokio::fs::try_exists(&local_index_path).await? {
-            // upgrade the lock to a write-lock for downloading & storing the index.
-            drop(_read_guard);
-            let _write_guard = rwlock.write().await;
-
-            // check existence again in case of Race Condition (TOCTOU)
-            if !tokio::fs::try_exists(&local_index_path).await? {
-                // remote/folder/and/x.zip.index
-                let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}");
-
-                let parent = local_index_path
-                    .parent()
-                    .ok_or_else(|| anyhow!("index path without parent"))?
-                    .to_path_buf();
-                tokio::fs::create_dir_all(&parent).await?;
-
-                let mut temp_path = spawn_blocking({
-                    // this creates the tempfile and directly drops it again,
-                    // just to return a valid temp-path.
-                    // This could be optimized.
-                    let folder = self.config.local_archive_cache_path.clone();
-                    move || Ok(tempfile::NamedTempFile::new_in(&folder)?.into_temp_path())
-                })
-                .await?;
+        let lock = self.local_index_cache_lock(&local_index_path);
 
-                let mut file = tokio::fs::File::create(&temp_path).await?;
-                let mut stream = self.get_stream(&remote_index_path).await?.content;
-                tokio::io::copy(&mut stream, &mut file).await?;
-                file.sync_all().await?;
+        // At this point we know the index is missing or broken.
+        // Try to become the "downloader" without queueing as a writer.
+        if let Ok(write_guard) = lock.try_lock() {
+            // Double-check: maybe someone fixed it between our first failure and now.
+            if let Ok(res) = archive_index::find_in_file(&local_index_path, path_in_archive).await {
+                return Ok(res);
+            }
 
-                temp_path.disable_cleanup(true);
-                tokio::fs::rename(&temp_path, &local_index_path).await?;
+            let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}");
 
-                // fsync parent dir to make rename durable (blocking)
-                spawn_blocking(move || {
-                    let dir = std::fs::File::open(parent)?;
-                    dir.sync_all().map_err(Into::into)
-                })
+            // We are the repairer: download fresh index into place.
+            self.download_archive_index(&local_index_path, &remote_index_path)
                 .await?;
-            }
-
-            _read_guard = _write_guard.downgrade();
+
+            // Write lock is dropped here (end of scope), so others can proceed.
+            drop(write_guard);
+
+            // Final attempt: if this still fails, bubble the error.
+            return archive_index::find_in_file(local_index_path, path_in_archive).await;
+        }
+
+        // Someone else is already downloading/repairing. Don't queue on lock(); just wait
+        // a bit and poll the fast path until it becomes readable or we give up.
+        const STEP_MS: u64 = 10;
+        const ATTEMPTS: u64 = 50; // = 500ms total wait
+        const TOTAL_WAIT_MS: u64 = STEP_MS * ATTEMPTS;
+
+        let mut last_err = None;
+
+        for _ in 0..ATTEMPTS {
+            sleep(Duration::from_millis(STEP_MS)).await;
+
+            match archive_index::find_in_file(local_index_path.clone(), path_in_archive).await {
+                Ok(res) => return Ok(res),
+                Err(err) => {
+                    // keep waiting; repair may still be in progress
+                    last_err = Some(err);
+                }
+            }
         }
 
-        archive_index::find_in_file(local_index_path, path_in_archive).await
+        // Still not usable after waiting: return the last error we saw.
+        Err(last_err
+            .unwrap_or_else(|| anyhow!("archive index unavailable after repair wait"))
+            .context(format!(
+                "no archive index after waiting for {TOTAL_WAIT_MS}ms"
+            )))
     }
 
     #[instrument]
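Stripped of the storage plumbing, the new control flow in `find_in_archive_index` reduces to: lock-free fast path, `try_lock()` to elect exactly one repairer, bounded polling for everyone else. A self-contained model of that flow (assumed names: `try_read` stands in for `archive_index::find_in_file`, `repair` for `download_archive_index`):

```rust
use std::{future::Future, time::Duration};

use anyhow::{anyhow, Result};
use tokio::{sync::Mutex, time::sleep};

/// Condensed model of the new lookup flow: lock-free fast path,
/// `try_lock()` to elect exactly one repairer, and bounded polling
/// for everyone else instead of queueing on the mutex.
async fn lookup_with_repair<T, Fut>(
    lock: &Mutex<()>,
    try_read: impl Fn() -> Result<T>, // stand-in for find_in_file()
    repair: impl FnOnce() -> Fut,     // stand-in for download_archive_index()
) -> Result<T>
where
    Fut: Future<Output = Result<()>>,
{
    // Fast path: most requests hit a healthy cached index and never lock.
    if let Ok(res) = try_read() {
        return Ok(res);
    }

    // Elect a single repairer; losers fall through to polling below.
    if let Ok(_guard) = lock.try_lock() {
        // Double-check: another task may have repaired it already.
        if let Ok(res) = try_read() {
            return Ok(res);
        }
        repair().await?;
        return try_read();
    }

    // Someone else holds the lock: poll the fast path with a bounded
    // budget (10ms * 50 = 500ms) instead of piling up on the mutex.
    let mut last_err = None;
    for _ in 0..50 {
        sleep(Duration::from_millis(10)).await;
        match try_read() {
            Ok(res) => return Ok(res),
            Err(err) => last_err = Some(err),
        }
    }
    Err(last_err
        .unwrap_or_else(|| anyhow!("index unavailable"))
        .context("no archive index after waiting 500ms"))
}
```

The practical effect: after a cache purge, a thundering herd on a popular crate costs one download plus at most 500ms of cheap polling, instead of every request queueing on a write lock.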