diff options
author | Florian Klink <flokli@flokli.de> | 2023-01-20T18·22+0100 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-01-21T10·48+0000 |
commit | d8e0fa8e5e67c7cc21a72cad545789914adcc798 (patch) | |
tree | 8f3b40faa89e13083855bb6708f11f6c520af302 /tvix/store/src/sled_directory_service.rs | |
parent | e719da53be01151a2a232c55b9f74d7fa09224a9 (diff) |
feat(tvix/store/directory): deduplicate Directory messages r/5723
We can omit sending Directory messages to clients that have already been sent in the same stream. We can also omit storing a Directory message if we already have it - they're content-addressed anyways. Change-Id: Iba44565e07157a83a033177a2ffbdddced64ba5c Reviewed-on: https://cl.tvl.fyi/c/depot/+/7881 Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/sled_directory_service.rs')
-rw-r--r-- | tvix/store/src/sled_directory_service.rs | 41 |
1 files changed, 32 insertions, 9 deletions
diff --git a/tvix/store/src/sled_directory_service.rs b/tvix/store/src/sled_directory_service.rs index a966601fc423..3d56b8ef3e6a 100644 --- a/tvix/store/src/sled_directory_service.rs +++ b/tvix/store/src/sled_directory_service.rs @@ -53,6 +53,9 @@ fn send_directories( } } + // keep a list of all the Directory messages already sent, so we can omit sending the same. + let mut sent_directory_dgsts: HashSet<Vec<u8>> = HashSet::new(); + // look up the directory at the top of the queue while let Some(ref digest) = deq.pop_front() { let digest_b64: String = BASE64.encode(digest); @@ -87,9 +90,16 @@ fn send_directories( } // if recursion was requested, all its children need to be added to the queue. + // If a Directory message with the same digest has already + // been sent previously, we can skip enqueueing it. + // Same applies to when it already is in the queue. if req.recursive { for child_directory_node in &directory.directories { - deq.push_back(child_directory_node.digest.clone()); + if !sent_directory_dgsts.contains(&child_directory_node.digest) + && !deq.contains(&child_directory_node.digest) + { + deq.push_back(child_directory_node.digest.clone()); + } } } @@ -98,6 +108,8 @@ fn send_directories( debug!("error sending: {}", e); return Err(Status::internal("error sending")); } + + sent_directory_dgsts.insert(digest.to_vec()); } }, // some storage error? @@ -144,16 +156,27 @@ fn insert_directories( seen_directory_dgsts.insert(dgst.clone()); last_directory_dgst = Some(dgst.clone()); - // insert the directory into the database - if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) { - match e { - // TODO: conflict is a problem, as the whole transaction closure is retried? - sled::transaction::UnabortableTransactionError::Conflict => todo!(), - sled::transaction::UnabortableTransactionError::Storage(e) => { - warn!("storage error: {}", e); - return Err(Status::internal("storage error")); + // check if the directory already exists in the database. We can skip + // inserting if it's already there, as that'd be a no-op. + match txn.get(dgst.clone()) { + Ok(None) => { + // insert the directory into the database + if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) { + match e { + // TODO: conflict is a problem, as the whole transaction closure is retried? + sled::transaction::UnabortableTransactionError::Conflict => todo!(), + sled::transaction::UnabortableTransactionError::Storage(e) => { + warn!("storage error: {}", e); + return Err(Status::internal("storage error")); + } + } } } + Ok(Some(_)) => continue, + Err(e) => { + warn!("storage error: {}", e); + return Err(Status::internal("storage error")); + } } } |