From d8e0fa8e5e67c7cc21a72cad545789914adcc798 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Fri, 20 Jan 2023 19:22:41 +0100 Subject: feat(tvix/store/directory): deduplicate Directory messages We can omit sending Directory messages to clients that have already been sent in the same stream. We can also omit storing a Directory message if we already have it - they're content-addressed anyways. Change-Id: Iba44565e07157a83a033177a2ffbdddced64ba5c Reviewed-on: https://cl.tvl.fyi/c/depot/+/7881 Reviewed-by: tazjin Tested-by: BuildkiteCI --- tvix/store/src/sled_directory_service.rs | 41 +++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 9 deletions(-) (limited to 'tvix/store/src/sled_directory_service.rs') diff --git a/tvix/store/src/sled_directory_service.rs b/tvix/store/src/sled_directory_service.rs index a966601fc423..3d56b8ef3e6a 100644 --- a/tvix/store/src/sled_directory_service.rs +++ b/tvix/store/src/sled_directory_service.rs @@ -53,6 +53,9 @@ fn send_directories( } } + // keep a list of all the Directory messages already sent, so we can omit sending the same. + let mut sent_directory_dgsts: HashSet> = HashSet::new(); + // look up the directory at the top of the queue while let Some(ref digest) = deq.pop_front() { let digest_b64: String = BASE64.encode(digest); @@ -87,9 +90,16 @@ fn send_directories( } // if recursion was requested, all its children need to be added to the queue. + // If a Directory message with the same digest has already + // been sent previously, we can skip enqueueing it. + // Same applies to when it already is in the queue. if req.recursive { for child_directory_node in &directory.directories { - deq.push_back(child_directory_node.digest.clone()); + if !sent_directory_dgsts.contains(&child_directory_node.digest) + && !deq.contains(&child_directory_node.digest) + { + deq.push_back(child_directory_node.digest.clone()); + } } } @@ -98,6 +108,8 @@ fn send_directories( debug!("error sending: {}", e); return Err(Status::internal("error sending")); } + + sent_directory_dgsts.insert(digest.to_vec()); } }, // some storage error? @@ -144,16 +156,27 @@ fn insert_directories( seen_directory_dgsts.insert(dgst.clone()); last_directory_dgst = Some(dgst.clone()); - // insert the directory into the database - if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) { - match e { - // TODO: conflict is a problem, as the whole transaction closure is retried? - sled::transaction::UnabortableTransactionError::Conflict => todo!(), - sled::transaction::UnabortableTransactionError::Storage(e) => { - warn!("storage error: {}", e); - return Err(Status::internal("storage error")); + // check if the directory already exists in the database. We can skip + // inserting if it's already there, as that'd be a no-op. + match txn.get(dgst.clone()) { + Ok(None) => { + // insert the directory into the database + if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) { + match e { + // TODO: conflict is a problem, as the whole transaction closure is retried? + sled::transaction::UnabortableTransactionError::Conflict => todo!(), + sled::transaction::UnabortableTransactionError::Storage(e) => { + warn!("storage error: {}", e); + return Err(Status::internal("storage error")); + } + } } } + Ok(Some(_)) => continue, + Err(e) => { + warn!("storage error: {}", e); + return Err(Status::internal("storage error")); + } } } -- cgit 1.4.1