about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/store/src/sled_directory_service.rs41
-rw-r--r--tvix/store/src/tests/directory_service.rs50
2 files changed, 82 insertions, 9 deletions
diff --git a/tvix/store/src/sled_directory_service.rs b/tvix/store/src/sled_directory_service.rs
index a966601fc423..3d56b8ef3e6a 100644
--- a/tvix/store/src/sled_directory_service.rs
+++ b/tvix/store/src/sled_directory_service.rs
@@ -53,6 +53,9 @@ fn send_directories(
         }
     }
 
+    // keep a list of all the Directory messages already sent, so we can omit sending the same.
+    let mut sent_directory_dgsts: HashSet<Vec<u8>> = HashSet::new();
+
     // look up the directory at the top of the queue
     while let Some(ref digest) = deq.pop_front() {
         let digest_b64: String = BASE64.encode(digest);
@@ -87,9 +90,16 @@ fn send_directories(
                     }
 
                     // if recursion was requested, all its children need to be added to the queue.
+                    // If a Directory message with the same digest has already
+                    // been sent previously, we can skip enqueueing it.
+                    // Same applies to when it already is in the queue.
                     if req.recursive {
                         for child_directory_node in &directory.directories {
-                            deq.push_back(child_directory_node.digest.clone());
+                            if !sent_directory_dgsts.contains(&child_directory_node.digest)
+                                && !deq.contains(&child_directory_node.digest)
+                            {
+                                deq.push_back(child_directory_node.digest.clone());
+                            }
                         }
                     }
 
@@ -98,6 +108,8 @@ fn send_directories(
                         debug!("error sending: {}", e);
                         return Err(Status::internal("error sending"));
                     }
+
+                    sent_directory_dgsts.insert(digest.to_vec());
                 }
             },
             // some storage error?
@@ -144,16 +156,27 @@ fn insert_directories(
         seen_directory_dgsts.insert(dgst.clone());
         last_directory_dgst = Some(dgst.clone());
 
-        // insert the directory into the database
-        if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) {
-            match e {
-                // TODO: conflict is a problem, as the whole transaction closure is retried?
-                sled::transaction::UnabortableTransactionError::Conflict => todo!(),
-                sled::transaction::UnabortableTransactionError::Storage(e) => {
-                    warn!("storage error: {}", e);
-                    return Err(Status::internal("storage error"));
+        // check if the directory already exists in the database. We can skip
+        // inserting if it's already there, as that'd be a no-op.
+        match txn.get(dgst.clone()) {
+            Ok(None) => {
+                // insert the directory into the database
+                if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) {
+                    match e {
+                        // TODO: conflict is a problem, as the whole transaction closure is retried?
+                        sled::transaction::UnabortableTransactionError::Conflict => todo!(),
+                        sled::transaction::UnabortableTransactionError::Storage(e) => {
+                            warn!("storage error: {}", e);
+                            return Err(Status::internal("storage error"));
+                        }
+                    }
                 }
             }
+            Ok(Some(_)) => continue,
+            Err(e) => {
+                warn!("storage error: {}", e);
+                return Err(Status::internal("storage error"));
+            }
         }
     }
 
diff --git a/tvix/store/src/tests/directory_service.rs b/tvix/store/src/tests/directory_service.rs
index c16feb243066..38cc0897c2ab 100644
--- a/tvix/store/src/tests/directory_service.rs
+++ b/tvix/store/src/tests/directory_service.rs
@@ -19,6 +19,21 @@ lazy_static! {
         }],
         ..Default::default()
     };
+    static ref DIRECTORY_C: Directory = Directory {
+        directories: vec![
+            DirectoryNode {
+                name: "a".to_string(),
+                digest: DIRECTORY_A.digest(),
+                size: DIRECTORY_A.size(),
+            },
+            DirectoryNode {
+                name: "a'".to_string(),
+                digest: DIRECTORY_A.digest(),
+                size: DIRECTORY_A.size(),
+            }
+        ],
+        ..Default::default()
+    };
 }
 
 /// Send the specified GetDirectoryRequest.
@@ -156,3 +171,38 @@ async fn put_get_multiple() -> anyhow::Result<()> {
 
     Ok(())
 }
+
+/// Put multiple Directories into the store, and omit duplicates.
+#[tokio::test]
+async fn put_get_dedup() -> anyhow::Result<()> {
+    let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?;
+
+    // Send "A", then "C", which refers to "A" two times
+    // Pretend we're a dumb client sending A twice.
+    let put_resp = service
+        .put(tonic_mock::streaming_request(vec![
+            DIRECTORY_A.clone(),
+            DIRECTORY_A.clone(),
+            DIRECTORY_C.clone(),
+        ]))
+        .await
+        .expect("must succeed");
+
+    assert_eq!(DIRECTORY_C.digest(), put_resp.into_inner().root_digest);
+
+    // Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice.
+    let items = get_directories(
+        &service,
+        GetDirectoryRequest {
+            recursive: true,
+            by_what: Some(ByWhat::Digest(DIRECTORY_C.digest())),
+        },
+    )
+    .await
+    .expect("must not error");
+
+    // We expect to get C, and then A (once, as the second A has been deduplicated).
+    assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items);
+
+    Ok(())
+}