-rw-r--r--  tvix/build/src/buildservice/from_addr.rs                |  18
-rw-r--r--  tvix/castore/src/directoryservice/grpc.rs               | 163
-rw-r--r--  tvix/castore/src/directoryservice/tests/mod.rs          |   3
-rw-r--r--  tvix/castore/src/directoryservice/traverse.rs           |   8
-rw-r--r--  tvix/castore/src/lib.rs                                 |   1
-rw-r--r--  tvix/castore/src/proto/tests/grpc_blobservice.rs        |  94
-rw-r--r--  tvix/castore/src/proto/tests/grpc_directoryservice.rs   | 246
-rw-r--r--  tvix/castore/src/proto/tests/mod.rs                     |   2
-rw-r--r--  tvix/castore/src/tests/import.rs                        |  19
-rw-r--r--  tvix/castore/src/utils.rs                               |  89
10 files changed, 31 insertions(+), 612 deletions(-)
diff --git a/tvix/build/src/buildservice/from_addr.rs b/tvix/build/src/buildservice/from_addr.rs
index ee2b4e50b4da..f5c4e6a490bb 100644
--- a/tvix/build/src/buildservice/from_addr.rs
+++ b/tvix/build/src/buildservice/from_addr.rs
@@ -51,7 +51,10 @@ mod tests {
 
     use super::from_addr;
     use test_case::test_case;
-    use tvix_castore::utils::{gen_blob_service, gen_directory_service};
+    use tvix_castore::{
+        blobservice::{BlobService, MemoryBlobService},
+        directoryservice::{DirectoryService, MemoryDirectoryService},
+    };
 
     /// This uses an unsupported scheme.
     #[test_case("http://foo.example/test", false; "unsupported scheme")]
@@ -71,14 +74,13 @@ mod tests {
     #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")]
     #[tokio::test]
     async fn test_from_addr(uri_str: &str, is_ok: bool) {
+        let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
+        let directory_service: Arc<dyn DirectoryService> =
+            Arc::from(MemoryDirectoryService::default());
         assert_eq!(
-            from_addr(
-                uri_str,
-                Arc::from(gen_blob_service()),
-                Arc::from(gen_directory_service())
-            )
-            .await
-            .is_ok(),
+            from_addr(uri_str, blob_service, directory_service)
+                .await
+                .is_ok(),
             is_ok
         )
     }
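
The rewritten test above constructs the in-memory services directly instead of going through the deleted tvix_castore::utils helpers. A condensed sketch of that setup for reuse in other test modules (the helper name is illustrative; the types and the Arc coercion are taken from the added lines):

    use std::sync::Arc;

    use tvix_castore::{
        blobservice::{BlobService, MemoryBlobService},
        directoryservice::{DirectoryService, MemoryDirectoryService},
    };

    /// Fresh, empty in-memory stores, coerced to the trait objects from_addr
    /// expects. The deleted gen_blob_service / gen_directory_service helpers
    /// returned the same implementations, just boxed instead of Arc'd.
    fn test_services() -> (Arc<dyn BlobService>, Arc<dyn DirectoryService>) {
        let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
        let directory_service: Arc<dyn DirectoryService> =
            Arc::from(MemoryDirectoryService::default());
        (blob_service, directory_service)
    }
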
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index fe410a38257d..84cf01e1679e 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -220,18 +220,6 @@ pub struct GRPCPutter {
     )>,
 }
 
-impl GRPCPutter {
-    // allows checking if the tx part of the channel is closed.
-    // only used in the test case.
-    #[cfg(test)]
-    fn is_closed(&self) -> bool {
-        match self.rq {
-            None => true,
-            Some((_, ref directory_sender)) => directory_sender.is_closed(),
-        }
-    }
-}
-
 #[async_trait]
 impl DirectoryPutter for GRPCPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
@@ -280,163 +268,18 @@ impl DirectoryPutter for GRPCPutter {
 
 #[cfg(test)]
 mod tests {
-    use core::time;
-    use futures::StreamExt;
-    use std::{any::Any, time::Duration};
+    use std::time::Duration;
     use tempfile::TempDir;
     use tokio::net::UnixListener;
     use tokio_retry::{strategy::ExponentialBackoff, Retry};
     use tokio_stream::wrappers::UnixListenerStream;
 
     use crate::{
-        directoryservice::{
-            grpc::GRPCPutter, DirectoryPutter, DirectoryService, GRPCDirectoryService,
-            MemoryDirectoryService,
-        },
-        fixtures::{self, DIRECTORY_A, DIRECTORY_B},
+        directoryservice::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService},
+        fixtures,
         proto::{directory_service_client::DirectoryServiceClient, GRPCDirectoryServiceWrapper},
-        utils::gen_directorysvc_grpc_client,
     };
 
-    #[tokio::test]
-    async fn test() {
-        // create the GrpcDirectoryService
-        let directory_service =
-            super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await);
-
-        // try to get DIRECTORY_A should return Ok(None)
-        assert_eq!(
-            None,
-            directory_service
-                .get(&DIRECTORY_A.digest())
-                .await
-                .expect("must not fail")
-        );
-
-        // Now upload it
-        assert_eq!(
-            DIRECTORY_A.digest(),
-            directory_service
-                .put(DIRECTORY_A.clone())
-                .await
-                .expect("must succeed")
-        );
-
-        // And retrieve it, compare for equality.
-        assert_eq!(
-            DIRECTORY_A.clone(),
-            directory_service
-                .get(&DIRECTORY_A.digest())
-                .await
-                .expect("must succeed")
-                .expect("must be some")
-        );
-
-        // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
-        directory_service
-            .put(DIRECTORY_B.clone())
-            .await
-            .expect_err("must fail");
-
-        // Putting DIRECTORY_B in a put_multiple will succeed, but the close
-        // will always fail.
-        {
-            let mut handle = directory_service.put_multiple_start();
-            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-            handle.close().await.expect_err("must fail");
-        }
-
-        // Uploading A and then B should succeed, and closing should return the digest of B.
-        let mut handle = directory_service.put_multiple_start();
-        handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
-        handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-        let digest = handle.close().await.expect("must succeed");
-        assert_eq!(DIRECTORY_B.digest(), digest);
-
-        // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
-        let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest());
-        assert_eq!(
-            DIRECTORY_B.clone(),
-            directories_it
-                .next()
-                .await
-                .expect("must be some")
-                .expect("must succeed")
-        );
-        assert_eq!(
-            DIRECTORY_A.clone(),
-            directories_it
-                .next()
-                .await
-                .expect("must be some")
-                .expect("must succeed")
-        );
-
-        // Uploading B and then A should fail, because B refers to A, which
-        // hasn't been uploaded yet.
-        // However, the client can burst, so we might not have received the
-        // error back from the server.
-        {
-            let mut handle = directory_service.put_multiple_start();
-            // sending out B will always be fine
-            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-
-            // whether we will be able to put A as well depends on whether we
-            // already received the error about B.
-            if handle.put(DIRECTORY_A.clone()).await.is_ok() {
-                // If we didn't, and this was Ok(_), …
-                // a subsequent close MUST fail (because it waits for the
-                // server)
-                handle.close().await.expect_err("must fail");
-            }
-        }
-
-        // Now we do the same test as before, send B, then A, but wait
-        // a long long time so we already received the error from the server
-        // (causing the internal stream to be closed).
-        // Uploading anything else subsequently should then fail.
-        {
-            let mut handle = directory_service.put_multiple_start();
-            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-
-            // get a GRPCPutter, so we can peek at [is_closed].
-            let handle_any = &mut handle as &mut dyn Any;
-
-            // `unchecked_downcast_mut` is unstable for now,
-            // https://github.com/rust-lang/rust/issues/90850
-            // We do the same thing here.
-            // The reason why we cannot use the checked downcast lies
-            // in the fact that:
-            // - GRPCPutter has type ID A
-            // - Box<GRPCPutter> has type ID B
-            // - "Box<dyn GRPCPutter>" (invalid type) has type ID C
-            // B seems different from C in this context.
-            // We cannot unpack and perform upcast coercion of the traits as it's an unstable
-            // feature.
-            // We cannot add `as_any` in `DirectoryPutter` as that would defeat the whole purpose
-            // of not leaking `is_closed` through the original trait.
-            let handle = unsafe { &mut *(handle_any as *mut dyn Any as *mut Box<GRPCPutter>) };
-            let mut is_closed = false;
-            for _try in 1..1000 {
-                if handle.is_closed() {
-                    is_closed = true;
-                    break;
-                }
-                tokio::time::sleep(time::Duration::from_millis(10)).await;
-            }
-
-            assert!(
-                is_closed,
-                "expected channel to eventually close, but never happened"
-            );
-
-            handle
-                .put(DIRECTORY_A.clone())
-                .await
-                .expect_err("must fail");
-        }
-    }
-
     /// This ensures connecting via gRPC works as expected.
     #[tokio::test]
     async fn test_valid_unix_path_ping_pong() {
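
The deleted test above pinned down the DirectoryPutter contract for the gRPC client: directories must be uploaded leaves-first (DIRECTORY_B refers to DIRECTORY_A, so sending B alone has to fail, at the latest when the handle is closed), and close() returns the digest of the last directory sent. A sketch of the happy path against the in-memory service, so it needs no gRPC wiring, assuming the in-memory implementation honours the same contract:

    use crate::{
        directoryservice::{DirectoryPutter, DirectoryService, MemoryDirectoryService},
        fixtures::{DIRECTORY_A, DIRECTORY_B},
    };

    #[tokio::test]
    async fn put_multiple_sketch() {
        let directory_service = MemoryDirectoryService::default();

        // Upload the child first, then the directory referring to it.
        let mut handle = directory_service.put_multiple_start();
        handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
        handle.put(DIRECTORY_B.clone()).await.expect("must succeed");

        // close() flushes the upload and returns the root digest.
        let digest = handle.close().await.expect("must succeed");
        assert_eq!(DIRECTORY_B.digest(), digest);
    }
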
diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs
index 5eb2d1919e80..cec49bb2c66a 100644
--- a/tvix/castore/src/directoryservice/tests/mod.rs
+++ b/tvix/castore/src/directoryservice/tests/mod.rs
@@ -180,7 +180,8 @@ async fn upload_reject_failing_validation(directory_service: impl DirectoryServi
     );
 
     // Try to upload via put_multiple. We're a bit more permissive here, the
-    // intermediate .put() might succeed, but then the close MUST fail.
+    // intermediate .put() might succeed, due to client-side bursting (in the
+    // case of gRPC), but then the close MUST fail.
     let mut handle = directory_service.put_multiple_start();
     if handle.put(broken_directory).await.is_ok() {
         assert!(
diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs
index 5c6975351b40..573581edbdd7 100644
--- a/tvix/castore/src/directoryservice/traverse.rs
+++ b/tvix/castore/src/directoryservice/traverse.rs
@@ -87,14 +87,16 @@ where
 mod tests {
     use std::path::PathBuf;
 
-    use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP};
-    use crate::utils::gen_directory_service;
+    use crate::{
+        directoryservice,
+        fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP},
+    };
 
     use super::descend_to;
 
     #[tokio::test]
     async fn test_descend_to() {
-        let directory_service = gen_directory_service();
+        let directory_service = directoryservice::from_addr("memory://").await.unwrap();
 
         let mut handle = directory_service.put_multiple_start();
         handle
diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs
index 1ce092135b83..1a7ac6b4b415 100644
--- a/tvix/castore/src/lib.rs
+++ b/tvix/castore/src/lib.rs
@@ -12,7 +12,6 @@ pub mod fs;
 pub mod import;
 pub mod proto;
 pub mod tonic;
-pub mod utils;
 
 pub use digests::{B3Digest, B3_LEN};
 pub use errors::Error;
diff --git a/tvix/castore/src/proto/tests/grpc_blobservice.rs b/tvix/castore/src/proto/tests/grpc_blobservice.rs
deleted file mode 100644
index fb202b7d8a51..000000000000
--- a/tvix/castore/src/proto/tests/grpc_blobservice.rs
+++ /dev/null
@@ -1,94 +0,0 @@
-use crate::fixtures::{BLOB_A, BLOB_A_DIGEST};
-use crate::proto::{BlobChunk, ReadBlobRequest, StatBlobRequest};
-use crate::utils::gen_blobsvc_grpc_client;
-use tokio_stream::StreamExt;
-
-/// Trying to read a non-existent blob should return a not found error.
-#[tokio::test]
-async fn not_found_read() {
-    let mut grpc_client = gen_blobsvc_grpc_client().await;
-
-    let resp = grpc_client
-        .read(ReadBlobRequest {
-            digest: BLOB_A_DIGEST.clone().into(),
-        })
-        .await;
-
-    // We can't use unwrap_err here, because the Ok value doesn't implement
-    // debug.
-    if let Err(e) = resp {
-        assert_eq!(e.code(), tonic::Code::NotFound);
-    } else {
-        panic!("resp is not err")
-    }
-}
-
-/// Trying to stat a non-existent blob should return a not found error.
-#[tokio::test]
-async fn not_found_stat() {
-    let mut grpc_client = gen_blobsvc_grpc_client().await;
-
-    let resp = grpc_client
-        .stat(StatBlobRequest {
-            digest: BLOB_A_DIGEST.clone().into(),
-            ..Default::default()
-        })
-        .await
-        .expect_err("must fail");
-
-    // The resp should be a status with Code::NotFound
-    assert_eq!(resp.code(), tonic::Code::NotFound);
-}
-
-/// Put a blob in the store, get it back.
-#[tokio::test]
-async fn put_read_stat() {
-    let mut grpc_client = gen_blobsvc_grpc_client().await;
-
-    // Send blob A.
-    let put_resp = grpc_client
-        .put(tokio_stream::once(BlobChunk {
-            data: BLOB_A.clone(),
-        }))
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    assert_eq!(BLOB_A_DIGEST.as_slice(), put_resp.digest);
-
-    // Stat for the digest of A.
-    // We currently don't ask for more granular chunking data, as we don't
-    // expose it yet.
-    let _resp = grpc_client
-        .stat(StatBlobRequest {
-            digest: BLOB_A_DIGEST.clone().into(),
-            ..Default::default()
-        })
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    // Read the blob. It should return the same data.
-    let resp = grpc_client
-        .read(ReadBlobRequest {
-            digest: BLOB_A_DIGEST.clone().into(),
-        })
-        .await;
-
-    let mut rx = resp.ok().unwrap().into_inner();
-
-    // the stream should contain one element, a BlobChunk with the same contents as BLOB_A.
-    let item = rx
-        .next()
-        .await
-        .expect("must be some")
-        .expect("must succeed");
-
-    assert_eq!(BLOB_A.clone(), item.data);
-
-    // … and no more elements
-    assert!(rx.next().await.is_none());
-
-    // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks.
-    // Test with some bigger blob too
-}
diff --git a/tvix/castore/src/proto/tests/grpc_directoryservice.rs b/tvix/castore/src/proto/tests/grpc_directoryservice.rs
deleted file mode 100644
index dcd9a0ef010e..000000000000
--- a/tvix/castore/src/proto/tests/grpc_directoryservice.rs
+++ /dev/null
@@ -1,246 +0,0 @@
-use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
-use crate::proto::directory_service_client::DirectoryServiceClient;
-use crate::proto::get_directory_request::ByWhat;
-use crate::proto::GetDirectoryRequest;
-use crate::proto::{Directory, DirectoryNode, SymlinkNode};
-use crate::utils::gen_directorysvc_grpc_client;
-use tokio_stream::StreamExt;
-use tonic::transport::Channel;
-use tonic::Status;
-
-/// Send the specified GetDirectoryRequest.
-/// Returns an error in the case of an error response, or an error in one of
-/// the items in the stream, or a Vec<Directory> in the case of a successful
-/// request.
-async fn get_directories(
-    grpc_client: &mut DirectoryServiceClient<Channel>,
-    get_directory_request: GetDirectoryRequest,
-) -> Result<Vec<Directory>, Status> {
-    let resp = grpc_client.get(get_directory_request).await;
-
-    // if the response is an error itself, return the error, otherwise unpack
-    let stream = match resp {
-        Ok(resp) => resp,
-        Err(status) => return Err(status),
-    }
-    .into_inner();
-
-    let directory_results: Vec<Result<Directory, Status>> = stream.collect().await;
-
-    // turn Vec<Result<Directory, Status> into Result<Vec<Directory>,Status>
-    directory_results.into_iter().collect()
-}
-
-/// Trying to get a non-existent Directory should return a not found error.
-#[tokio::test]
-async fn not_found() {
-    let mut grpc_client = gen_directorysvc_grpc_client().await;
-
-    let resp = grpc_client
-        .get(GetDirectoryRequest {
-            by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())),
-            ..Default::default()
-        })
-        .await;
-
-    let stream = resp.expect("must succeed").into_inner();
-
-    let items: Vec<_> = stream.collect().await;
-
-    // The stream should contain one element, an error with Code::NotFound.
-    assert_eq!(1, items.len());
-    let item = items[0].clone();
-
-    assert!(item.is_err(), "must be err");
-    assert_eq!(
-        tonic::Code::NotFound,
-        item.unwrap_err().code(),
-        "must be err"
-    );
-}
-
-/// Put a Directory into the store, get it back.
-#[tokio::test]
-async fn put_get() {
-    let mut grpc_client = gen_directorysvc_grpc_client().await;
-
-    // send directory A.
-    let put_resp = {
-        grpc_client
-            .put(tokio_stream::once(DIRECTORY_A.clone()))
-            .await
-            .expect("must succeed")
-            .into_inner()
-    };
-
-    // the sent root_digest should match the calculated digest
-    assert_eq!(put_resp.root_digest, DIRECTORY_A.digest().as_slice());
-
-    // get it back
-    let items = get_directories(
-        &mut grpc_client,
-        GetDirectoryRequest {
-            by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())),
-            ..Default::default()
-        },
-    )
-    .await
-    .expect("must not error");
-
-    assert_eq!(vec![DIRECTORY_A.clone()], items);
-}
-
-/// Put multiple Directories into the store, and get them back
-#[tokio::test]
-async fn put_get_multiple() {
-    let mut grpc_client = gen_directorysvc_grpc_client().await;
-
-    // sending "b" (which refers to "a") without sending "a" first should fail.
-    let put_resp = {
-        grpc_client
-            .put(tokio_stream::once(DIRECTORY_B.clone()))
-            .await
-            .expect_err("must fail")
-    };
-
-    assert_eq!(tonic::Code::InvalidArgument, put_resp.code());
-
-    // sending "a", then "b" should succeed, and the response should contain the digest of b.
-    let put_resp = {
-        grpc_client
-            .put(tokio_stream::iter(vec![
-                DIRECTORY_A.clone(),
-                DIRECTORY_B.clone(),
-            ]))
-            .await
-            .expect("must succeed")
-            .into_inner()
-    };
-
-    assert_eq!(DIRECTORY_B.digest().as_slice(), put_resp.root_digest);
-
-    // now, request b, first in non-recursive mode.
-    let items = get_directories(
-        &mut grpc_client,
-        GetDirectoryRequest {
-            recursive: false,
-            by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())),
-        },
-    )
-    .await
-    .expect("must not error");
-
-    // We expect to only get b.
-    assert_eq!(vec![DIRECTORY_B.clone()], items);
-
-    // now, request b, but in recursive mode.
-    let items = get_directories(
-        &mut grpc_client,
-        GetDirectoryRequest {
-            recursive: true,
-            by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())),
-        },
-    )
-    .await
-    .expect("must not error");
-
-    // We expect to get b, and then a, because that's how we traverse down.
-    assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], items);
-}
-
-/// Put multiple Directories into the store, and omit duplicates.
-#[tokio::test]
-async fn put_get_dedup() {
-    let mut grpc_client = gen_directorysvc_grpc_client().await;
-
-    // Send "A", then "C", which refers to "A" two times
-    // Pretend we're a dumb client sending A twice.
-    let put_resp = {
-        grpc_client
-            .put(tokio_stream::iter(vec![
-                DIRECTORY_A.clone(),
-                DIRECTORY_A.clone(),
-                DIRECTORY_C.clone(),
-            ]))
-            .await
-            .expect("must succeed")
-    };
-
-    assert_eq!(
-        DIRECTORY_C.digest().as_slice(),
-        put_resp.into_inner().root_digest
-    );
-
-    // Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice.
-    let items = get_directories(
-        &mut grpc_client,
-        GetDirectoryRequest {
-            recursive: true,
-            by_what: Some(ByWhat::Digest(DIRECTORY_C.digest().into())),
-        },
-    )
-    .await
-    .expect("must not error");
-
-    // We expect to get C, and then A (once, as the second A has been deduplicated).
-    assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items);
-}
-
-/// Trying to upload a Directory failing validation should fail.
-#[tokio::test]
-async fn put_reject_failed_validation() {
-    let mut grpc_client = gen_directorysvc_grpc_client().await;
-
-    // construct a broken Directory message that fails validation
-    let broken_directory = Directory {
-        symlinks: vec![SymlinkNode {
-            name: "".into(),
-            target: "doesntmatter".into(),
-        }],
-        ..Default::default()
-    };
-    assert!(broken_directory.validate().is_err());
-
-    // send it over, it must fail
-    let put_resp = {
-        grpc_client
-            .put(tokio_stream::once(broken_directory))
-            .await
-            .expect_err("must fail")
-    };
-
-    assert_eq!(put_resp.code(), tonic::Code::InvalidArgument);
-}
-
-/// Trying to upload a Directory with wrong size should fail.
-#[tokio::test]
-async fn put_reject_wrong_size() {
-    let mut grpc_client = gen_directorysvc_grpc_client().await;
-
-    // Construct a directory referring to DIRECTORY_A, but with wrong size.
-    let broken_parent_directory = Directory {
-        directories: vec![DirectoryNode {
-            name: "foo".into(),
-            digest: DIRECTORY_A.digest().into(),
-            size: 42,
-        }],
-        ..Default::default()
-    };
-    // Make sure we got the size wrong.
-    assert_ne!(
-        broken_parent_directory.directories[0].size,
-        DIRECTORY_A.size()
-    );
-
-    // now upload both (first A, then the broken parent). This must fail.
-    let put_resp = {
-        grpc_client
-            .put(tokio_stream::iter(vec![
-                DIRECTORY_A.clone(),
-                broken_parent_directory,
-            ]))
-            .await
-            .expect_err("must fail")
-    };
-    assert_eq!(put_resp.code(), tonic::Code::InvalidArgument);
-}
diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs
index 8b62fadeb5a6..8d903bacb6c5 100644
--- a/tvix/castore/src/proto/tests/mod.rs
+++ b/tvix/castore/src/proto/tests/mod.rs
@@ -1,4 +1,2 @@
 mod directory;
 mod directory_nodes_iterator;
-mod grpc_blobservice;
-mod grpc_directoryservice;
diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs
index 99e993f36da3..b44b71cd784d 100644
--- a/tvix/castore/src/tests/import.rs
+++ b/tvix/castore/src/tests/import.rs
@@ -1,8 +1,9 @@
-use crate::blobservice::BlobService;
+use crate::blobservice::{self, BlobService};
+use crate::directoryservice;
 use crate::fixtures::*;
 use crate::import::ingest_path;
 use crate::proto;
-use crate::utils::{gen_blob_service, gen_directory_service};
+
 use std::sync::Arc;
 use tempfile::TempDir;
 
@@ -12,8 +13,8 @@ use std::os::unix::ffi::OsStrExt;
 #[cfg(target_family = "unix")]
 #[tokio::test]
 async fn symlink() {
-    let blob_service = gen_blob_service();
-    let directory_service = gen_directory_service();
+    let blob_service = blobservice::from_addr("memory://").await.unwrap();
+    let directory_service = directoryservice::from_addr("memory://").await.unwrap();
 
     let tmpdir = TempDir::new().unwrap();
 
@@ -43,8 +44,9 @@ async fn symlink() {
 
 #[tokio::test]
 async fn single_file() {
-    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
-    let directory_service = gen_directory_service();
+    let blob_service =
+        Arc::from(blobservice::from_addr("memory://").await.unwrap()) as Arc<dyn BlobService>;
+    let directory_service = directoryservice::from_addr("memory://").await.unwrap();
 
     let tmpdir = TempDir::new().unwrap();
 
@@ -75,8 +77,9 @@ async fn single_file() {
 #[cfg(target_family = "unix")]
 #[tokio::test]
 async fn complicated() {
-    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
-    let directory_service = gen_directory_service();
+    let blob_service =
+        Arc::from(blobservice::from_addr("memory://").await.unwrap()) as Arc<dyn BlobService>;
+    let directory_service = directoryservice::from_addr("memory://").await.unwrap();
 
     let tmpdir = TempDir::new().unwrap();
 
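
The rewritten import tests follow the same pattern as the earlier hunks, with one extra step: blobservice::from_addr hands back a boxed service (that is what the Arc::from call above suggests), and std's From<Box<T>> for Arc<T> also covers unsized T, so the boxed trait object converts straight into the Arc<dyn BlobService> these tests want. A tiny self-contained illustration of that conversion; the Store/MemoryStore names are made up for the example:

    use std::sync::Arc;

    trait Store {}
    struct MemoryStore;
    impl Store for MemoryStore {}

    fn to_shared(boxed: Box<dyn Store>) -> Arc<dyn Store> {
        // From<Box<T>> for Arc<T> is implemented for T: ?Sized, so no
        // re-wrapping of the concrete store is needed.
        Arc::from(boxed)
    }

    fn main() {
        let boxed: Box<dyn Store> = Box::new(MemoryStore);
        let _shared: Arc<dyn Store> = to_shared(boxed);
    }
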
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs
deleted file mode 100644
index ca68f9f26c5a..000000000000
--- a/tvix/castore/src/utils.rs
+++ /dev/null
@@ -1,89 +0,0 @@
-//! A crate containing constructors to provide instances of a BlobService and
-//! DirectoryService. Only used for testing purposes, but across crates.
-//! Should be removed once we have a better concept of a "Service registry".
-use tonic::transport::{Channel, Endpoint, Server, Uri};
-
-use crate::{
-    blobservice::{BlobService, MemoryBlobService},
-    directoryservice::{DirectoryService, MemoryDirectoryService},
-    proto::{
-        blob_service_client::BlobServiceClient, blob_service_server::BlobServiceServer,
-        directory_service_client::DirectoryServiceClient,
-        directory_service_server::DirectoryServiceServer, GRPCBlobServiceWrapper,
-        GRPCDirectoryServiceWrapper,
-    },
-};
-
-pub fn gen_blob_service() -> Box<dyn BlobService> {
-    Box::<MemoryBlobService>::default()
-}
-
-pub fn gen_directory_service() -> Box<dyn DirectoryService> {
-    Box::<MemoryDirectoryService>::default()
-}
-
-/// This will spawn a gRPC server wrapping a DirectoryService, connect a
-/// gRPC DirectoryService client and return it.
-#[allow(dead_code)]
-pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> {
-    let (left, right) = tokio::io::duplex(64);
-
-    // spin up a server, which will only connect once, to the left side.
-    tokio::spawn(async {
-        // spin up a new DirectoryService
-        let mut server = Server::builder();
-        let router = server.add_service(DirectoryServiceServer::new(
-            GRPCDirectoryServiceWrapper::new(gen_directory_service()),
-        ));
-
-        router
-            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
-            .await
-    });
-
-    // Create a client, connecting to the right side. The URI is unused.
-    let mut maybe_right = Some(right);
-    DirectoryServiceClient::new(
-        Endpoint::try_from("http://[::]:50051")
-            .unwrap()
-            .connect_with_connector(tower::service_fn(move |_: Uri| {
-                let right = maybe_right.take().unwrap();
-                async move { Ok::<_, std::io::Error>(right) }
-            }))
-            .await
-            .unwrap(),
-    )
-}
-
-/// This will spawn a gRPC server wrapping a BlobService, connect a
-/// gRPC BlobService client and return it.
-#[allow(dead_code)]
-pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> {
-    let (left, right) = tokio::io::duplex(64);
-
-    // spin up a server, which will only connect once, to the left side.
-    tokio::spawn(async {
-        // spin up a new BlobService
-        let mut server = Server::builder();
-        let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
-            gen_blob_service(),
-        )));
-
-        router
-            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
-            .await
-    });
-
-    // Create a client, connecting to the right side. The URI is unused.
-    let mut maybe_right = Some(right);
-    BlobServiceClient::new(
-        Endpoint::try_from("http://[::]:50051")
-            .unwrap()
-            .connect_with_connector(tower::service_fn(move |_: Uri| {
-                let right = maybe_right.take().unwrap();
-                async move { Ok::<_, std::io::Error>(right) }
-            }))
-            .await
-            .unwrap(),
-    )
-}
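
For reference, the deleted gen_directorysvc_grpc_client above (and its gen_blobsvc_grpc_client twin) wired an in-process gRPC client and server together over a tokio duplex stream, with no sockets or ports involved. A condensed sketch of that pattern, following the removed code closely and using the same constructors it called:

    use tonic::transport::{Channel, Endpoint, Server, Uri};

    use crate::{
        directoryservice::{DirectoryService, MemoryDirectoryService},
        proto::{
            directory_service_client::DirectoryServiceClient,
            directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper,
        },
    };

    async fn in_process_directory_client() -> DirectoryServiceClient<Channel> {
        let (left, right) = tokio::io::duplex(64);

        // Serve exactly one connection: the left half of the duplex pipe.
        tokio::spawn(async {
            let directory_service: Box<dyn DirectoryService> =
                Box::<MemoryDirectoryService>::default();

            let mut server = Server::builder();
            let router = server.add_service(DirectoryServiceServer::new(
                GRPCDirectoryServiceWrapper::new(directory_service),
            ));

            router
                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
                .await
        });

        // Connect the client to the right half; the URI is never actually dialled.
        let mut maybe_right = Some(right);
        DirectoryServiceClient::new(
            Endpoint::try_from("http://[::]:50051")
                .unwrap()
                .connect_with_connector(tower::service_fn(move |_: Uri| {
                    let right = maybe_right.take().unwrap();
                    async move { Ok::<_, std::io::Error>(right) }
                }))
                .await
                .unwrap(),
        )
    }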