about summary refs log tree commit diff
path: root/tvix/store/src/sled_directory_service.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/sled_directory_service.rs')
-rw-r--r--tvix/store/src/sled_directory_service.rs41
1 files changed, 32 insertions, 9 deletions
diff --git a/tvix/store/src/sled_directory_service.rs b/tvix/store/src/sled_directory_service.rs
index a966601fc423..3d56b8ef3e6a 100644
--- a/tvix/store/src/sled_directory_service.rs
+++ b/tvix/store/src/sled_directory_service.rs
@@ -53,6 +53,9 @@ fn send_directories(
         }
     }
 
+    // keep a list of all the Directory messages already sent, so we can omit sending the same.
+    let mut sent_directory_dgsts: HashSet<Vec<u8>> = HashSet::new();
+
     // look up the directory at the top of the queue
     while let Some(ref digest) = deq.pop_front() {
         let digest_b64: String = BASE64.encode(digest);
@@ -87,9 +90,16 @@ fn send_directories(
                     }
 
                     // if recursion was requested, all its children need to be added to the queue.
+                    // If a Directory message with the same digest has already
+                    // been sent previously, we can skip enqueueing it.
+                    // Same applies to when it already is in the queue.
                     if req.recursive {
                         for child_directory_node in &directory.directories {
-                            deq.push_back(child_directory_node.digest.clone());
+                            if !sent_directory_dgsts.contains(&child_directory_node.digest)
+                                && !deq.contains(&child_directory_node.digest)
+                            {
+                                deq.push_back(child_directory_node.digest.clone());
+                            }
                         }
                     }
 
@@ -98,6 +108,8 @@ fn send_directories(
                         debug!("error sending: {}", e);
                         return Err(Status::internal("error sending"));
                     }
+
+                    sent_directory_dgsts.insert(digest.to_vec());
                 }
             },
             // some storage error?
@@ -144,16 +156,27 @@ fn insert_directories(
         seen_directory_dgsts.insert(dgst.clone());
         last_directory_dgst = Some(dgst.clone());
 
-        // insert the directory into the database
-        if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) {
-            match e {
-                // TODO: conflict is a problem, as the whole transaction closure is retried?
-                sled::transaction::UnabortableTransactionError::Conflict => todo!(),
-                sled::transaction::UnabortableTransactionError::Storage(e) => {
-                    warn!("storage error: {}", e);
-                    return Err(Status::internal("storage error"));
+        // check if the directory already exists in the database. We can skip
+        // inserting if it's already there, as that'd be a no-op.
+        match txn.get(dgst.clone()) {
+            Ok(None) => {
+                // insert the directory into the database
+                if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) {
+                    match e {
+                        // TODO: conflict is a problem, as the whole transaction closure is retried?
+                        sled::transaction::UnabortableTransactionError::Conflict => todo!(),
+                        sled::transaction::UnabortableTransactionError::Storage(e) => {
+                            warn!("storage error: {}", e);
+                            return Err(Status::internal("storage error"));
+                        }
+                    }
                 }
             }
+            Ok(Some(_)) => continue,
+            Err(e) => {
+                warn!("storage error: {}", e);
+                return Err(Status::internal("storage error"));
+            }
         }
     }