diff options
-rw-r--r-- | tvix/store/src/sled_directory_service.rs | 41 | ||||
-rw-r--r-- | tvix/store/src/tests/directory_service.rs | 50 |
2 files changed, 82 insertions, 9 deletions
diff --git a/tvix/store/src/sled_directory_service.rs b/tvix/store/src/sled_directory_service.rs index a966601fc423..3d56b8ef3e6a 100644 --- a/tvix/store/src/sled_directory_service.rs +++ b/tvix/store/src/sled_directory_service.rs @@ -53,6 +53,9 @@ fn send_directories( } } + // keep a list of all the Directory messages already sent, so we can omit sending the same. + let mut sent_directory_dgsts: HashSet<Vec<u8>> = HashSet::new(); + // look up the directory at the top of the queue while let Some(ref digest) = deq.pop_front() { let digest_b64: String = BASE64.encode(digest); @@ -87,9 +90,16 @@ fn send_directories( } // if recursion was requested, all its children need to be added to the queue. + // If a Directory message with the same digest has already + // been sent previously, we can skip enqueueing it. + // Same applies to when it already is in the queue. if req.recursive { for child_directory_node in &directory.directories { - deq.push_back(child_directory_node.digest.clone()); + if !sent_directory_dgsts.contains(&child_directory_node.digest) + && !deq.contains(&child_directory_node.digest) + { + deq.push_back(child_directory_node.digest.clone()); + } } } @@ -98,6 +108,8 @@ fn send_directories( debug!("error sending: {}", e); return Err(Status::internal("error sending")); } + + sent_directory_dgsts.insert(digest.to_vec()); } }, // some storage error? @@ -144,16 +156,27 @@ fn insert_directories( seen_directory_dgsts.insert(dgst.clone()); last_directory_dgst = Some(dgst.clone()); - // insert the directory into the database - if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) { - match e { - // TODO: conflict is a problem, as the whole transaction closure is retried? - sled::transaction::UnabortableTransactionError::Conflict => todo!(), - sled::transaction::UnabortableTransactionError::Storage(e) => { - warn!("storage error: {}", e); - return Err(Status::internal("storage error")); + // check if the directory already exists in the database. We can skip + // inserting if it's already there, as that'd be a no-op. + match txn.get(dgst.clone()) { + Ok(None) => { + // insert the directory into the database + if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) { + match e { + // TODO: conflict is a problem, as the whole transaction closure is retried? + sled::transaction::UnabortableTransactionError::Conflict => todo!(), + sled::transaction::UnabortableTransactionError::Storage(e) => { + warn!("storage error: {}", e); + return Err(Status::internal("storage error")); + } + } } } + Ok(Some(_)) => continue, + Err(e) => { + warn!("storage error: {}", e); + return Err(Status::internal("storage error")); + } } } diff --git a/tvix/store/src/tests/directory_service.rs b/tvix/store/src/tests/directory_service.rs index c16feb243066..38cc0897c2ab 100644 --- a/tvix/store/src/tests/directory_service.rs +++ b/tvix/store/src/tests/directory_service.rs @@ -19,6 +19,21 @@ lazy_static! { }], ..Default::default() }; + static ref DIRECTORY_C: Directory = Directory { + directories: vec![ + DirectoryNode { + name: "a".to_string(), + digest: DIRECTORY_A.digest(), + size: DIRECTORY_A.size(), + }, + DirectoryNode { + name: "a'".to_string(), + digest: DIRECTORY_A.digest(), + size: DIRECTORY_A.size(), + } + ], + ..Default::default() + }; } /// Send the specified GetDirectoryRequest. @@ -156,3 +171,38 @@ async fn put_get_multiple() -> anyhow::Result<()> { Ok(()) } + +/// Put multiple Directories into the store, and omit duplicates. +#[tokio::test] +async fn put_get_dedup() -> anyhow::Result<()> { + let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?; + + // Send "A", then "C", which refers to "A" two times + // Pretend we're a dumb client sending A twice. + let put_resp = service + .put(tonic_mock::streaming_request(vec![ + DIRECTORY_A.clone(), + DIRECTORY_A.clone(), + DIRECTORY_C.clone(), + ])) + .await + .expect("must succeed"); + + assert_eq!(DIRECTORY_C.digest(), put_resp.into_inner().root_digest); + + // Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice. + let items = get_directories( + &service, + GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(DIRECTORY_C.digest())), + }, + ) + .await + .expect("must not error"); + + // We expect to get C, and then A (once, as the second A has been deduplicated). + assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items); + + Ok(()) +} |