about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/store/src/client.rs10
-rw-r--r--tvix/store/src/dummy_blob_service.rs43
-rw-r--r--tvix/store/src/lib.rs5
-rw-r--r--tvix/store/src/sled_directory_service.rs300
-rw-r--r--tvix/store/src/sled_path_info_service.rs89
-rw-r--r--tvix/store/src/tests/directory_service.rs268
-rw-r--r--tvix/store/src/tests/mod.rs2
-rw-r--r--tvix/store/src/tests/path_info_service.rs67
8 files changed, 0 insertions, 784 deletions
diff --git a/tvix/store/src/client.rs b/tvix/store/src/client.rs
deleted file mode 100644
index 3b282eacdd70..000000000000
--- a/tvix/store/src/client.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-use crate::proto::Directory;
-
-pub trait StoreClient {
-    fn open_blob(&self, digest: Vec<u8>) -> std::io::Result<Box<dyn std::io::BufRead>>;
-
-    // TODO: stat_blob, put_blob?
-    fn get_directory(&self, digest: Vec<u8>) -> std::io::Result<Option<Directory>>;
-
-    // TODO: put_directory
-}
diff --git a/tvix/store/src/dummy_blob_service.rs b/tvix/store/src/dummy_blob_service.rs
deleted file mode 100644
index 690f6bde3869..000000000000
--- a/tvix/store/src/dummy_blob_service.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-use tokio_stream::wrappers::ReceiverStream;
-
-use crate::proto::blob_service_server::BlobService;
-use crate::proto::BlobChunk;
-use crate::proto::BlobMeta;
-use crate::proto::PutBlobResponse;
-use crate::proto::ReadBlobRequest;
-use crate::proto::StatBlobRequest;
-use tonic::{Request, Response, Result, Status, Streaming};
-use tracing::{instrument, warn};
-
-const NOT_IMPLEMENTED_MSG: &str = "not implemented";
-
-pub struct DummyBlobService {}
-
-#[tonic::async_trait]
-impl BlobService for DummyBlobService {
-    type ReadStream = ReceiverStream<Result<BlobChunk>>;
-
-    #[instrument(skip(self))]
-    async fn stat(&self, _request: Request<StatBlobRequest>) -> Result<Response<BlobMeta>> {
-        warn!(NOT_IMPLEMENTED_MSG);
-        Err(Status::unimplemented(NOT_IMPLEMENTED_MSG))
-    }
-
-    #[instrument(skip(self))]
-    async fn read(
-        &self,
-        _request: Request<ReadBlobRequest>,
-    ) -> Result<Response<Self::ReadStream>, Status> {
-        warn!(NOT_IMPLEMENTED_MSG);
-        Err(Status::unimplemented(NOT_IMPLEMENTED_MSG))
-    }
-
-    #[instrument(skip(self, _request))]
-    async fn put(
-        &self,
-        _request: Request<Streaming<BlobChunk>>,
-    ) -> Result<Response<PutBlobResponse>> {
-        warn!(NOT_IMPLEMENTED_MSG);
-        Err(Status::unimplemented(NOT_IMPLEMENTED_MSG))
-    }
-}
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index f23b0b9b8833..f294e39ad41a 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -1,5 +1,3 @@
-pub mod client;
-
 mod blobreader;
 mod errors;
 
@@ -11,9 +9,6 @@ pub mod pathinfoservice;
 pub mod proto;
 
 pub use blobreader::BlobReader;
-pub mod dummy_blob_service;
-pub mod sled_directory_service;
-pub mod sled_path_info_service;
 pub use errors::Error;
 
 #[cfg(test)]
diff --git a/tvix/store/src/sled_directory_service.rs b/tvix/store/src/sled_directory_service.rs
deleted file mode 100644
index 4b2c6ed54d62..000000000000
--- a/tvix/store/src/sled_directory_service.rs
+++ /dev/null
@@ -1,300 +0,0 @@
-use data_encoding::BASE64;
-use std::collections::HashMap;
-use std::collections::HashSet;
-use std::collections::VecDeque;
-use std::path::PathBuf;
-use tokio::sync::mpsc::channel;
-use tokio::sync::mpsc::Sender;
-
-use prost::Message;
-use tokio::task;
-use tokio_stream::wrappers::ReceiverStream;
-
-use crate::proto::directory_service_server::DirectoryService;
-use crate::proto::get_directory_request::ByWhat;
-use crate::proto::Directory;
-use crate::proto::GetDirectoryRequest;
-use crate::proto::PutDirectoryResponse;
-use tonic::{Request, Response, Result, Status, Streaming};
-use tracing::{debug, instrument, warn};
-
-pub struct SledDirectoryService {
-    db: sled::Db,
-}
-
-impl SledDirectoryService {
-    pub fn new(p: PathBuf) -> Result<Self, anyhow::Error> {
-        let config = sled::Config::default().use_compression(true).path(p);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-}
-
-/// Lookup a directory, optionally recurse, and send the result to the passed sender.
-/// We pass in a txn, that has been opened on the outside.
-/// It's up to the user to wrap this in a TransactionError appropriately.
-/// This will open a sled txn to ensure a consistent view.
-fn send_directories(
-    txn: &sled::transaction::TransactionalTree,
-    tx: &Sender<Result<Directory>>,
-    req: GetDirectoryRequest,
-) -> Result<(), Status> {
-    // keep the list of directories to traverse
-    let mut deq: VecDeque<Vec<u8>> = VecDeque::new();
-
-    // look at the digest in the request and put it in the top of the queue.
-    match &req.by_what {
-        None => return Err(Status::invalid_argument("by_what needs to be specified")),
-        Some(ByWhat::Digest(digest)) => {
-            if digest.len() != 32 {
-                return Err(Status::invalid_argument("invalid digest length"));
-            }
-            deq.push_back(digest.clone());
-        }
-    }
-
-    // keep a list of all the Directory messages already sent, so we can omit sending the same.
-    let mut sent_directory_dgsts: HashSet<Vec<u8>> = HashSet::new();
-
-    // look up the directory at the top of the queue
-    while let Some(ref digest) = deq.pop_front() {
-        let digest_b64: String = BASE64.encode(digest);
-
-        match txn.get(digest) {
-            // The directory was not found, abort
-            Ok(None) => {
-                return Err(Status::not_found(format!(
-                    "directory {} not found",
-                    digest_b64
-                )))
-            }
-            // The directory was found, try to parse the data as Directory message
-            Ok(Some(data)) => match Directory::decode(&*data) {
-                Err(e) => {
-                    warn!("unable to parse directory {}: {}", digest_b64, e);
-                    return Err(Status::internal(format!(
-                        "unable to parse directory {}",
-                        digest_b64
-                    )));
-                }
-                Ok(directory) => {
-                    // Validate the retrieved Directory indeed has the
-                    // digest we expect it to have, to detect corruptions.
-                    let actual_digest = directory.digest();
-                    if actual_digest != digest.clone() {
-                        return Err(Status::data_loss(format!(
-                            "requested directory with digest {}, but got {}",
-                            digest_b64,
-                            BASE64.encode(&actual_digest)
-                        )));
-                    }
-
-                    // if recursion was requested, all its children need to be added to the queue.
-                    // If a Directory message with the same digest has already
-                    // been sent previously, we can skip enqueueing it.
-                    // Same applies to when it already is in the queue.
-                    if req.recursive {
-                        for child_directory_node in &directory.directories {
-                            if !sent_directory_dgsts.contains(&child_directory_node.digest)
-                                && !deq.contains(&child_directory_node.digest)
-                            {
-                                deq.push_back(child_directory_node.digest.clone());
-                            }
-                        }
-                    }
-
-                    // send the directory message to the client
-                    if let Err(e) = tx.blocking_send(Ok(directory)) {
-                        debug!("error sending: {}", e);
-                        return Err(Status::internal("error sending"));
-                    }
-
-                    sent_directory_dgsts.insert(digest.to_vec());
-                }
-            },
-            // some storage error?
-            Err(e) => {
-                // TODO: check what this error really means
-                warn!("storage error: {}", e);
-                return Err(Status::internal("storage error"));
-            }
-        };
-    }
-
-    // nothing left, we're done
-    Ok(())
-}
-
-/// Consume directories, and insert them into the database.
-/// Reject directory messages that refer to Directories not sent in the same stream.
-fn insert_directories(
-    txn: &sled::transaction::TransactionalTree,
-    directories: &Vec<Directory>,
-) -> Result<Vec<u8>, Status> {
-    // This keeps track of the seen directory keys, and their size.
-    // This is used to validate the size field of a reference to a previously sent directory.
-    // We don't need to keep the contents around, they're stored in the DB.
-    let mut seen_directories_sizes: HashMap<Vec<u8>, u32> = HashMap::new();
-    let mut last_directory_dgst: Option<Vec<u8>> = None;
-
-    for directory in directories {
-        // validate the directory itself.
-        if let Err(e) = directory.validate() {
-            return Err(Status::invalid_argument(format!(
-                "directory {} failed validation: {}",
-                BASE64.encode(&directory.digest()),
-                e,
-            )));
-        }
-
-        // for each child directory this directory refers to, we need
-        // to ensure it has been seen already in this stream, and that the size
-        // matches what we recorded.
-        for child_directory in &directory.directories {
-            match seen_directories_sizes.get(&child_directory.digest) {
-                None => {
-                    return Err(Status::invalid_argument(format!(
-                        "child directory '{}' ({}) in directory '{}' not seen yet",
-                        child_directory.name,
-                        BASE64.encode(&child_directory.digest),
-                        BASE64.encode(&directory.digest()),
-                    )));
-                }
-                Some(seen_child_directory_size) => {
-                    if seen_child_directory_size != &child_directory.size {
-                        return Err(Status::invalid_argument(format!(
-                            "child directory '{}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}",
-                            child_directory.name,
-                            BASE64.encode(&child_directory.digest),
-                            BASE64.encode(&directory.digest()),
-                            seen_child_directory_size,
-                            child_directory.size,
-                        )));
-                    }
-                }
-            }
-        }
-
-        // TODO: do we want to verify this somehow is connected to the current graph?
-        // theoretically, this currently allows uploading multiple
-        // disconnected graphs at the same time…
-
-        let dgst = directory.digest();
-        seen_directories_sizes.insert(dgst.clone(), directory.size());
-        last_directory_dgst = Some(dgst.clone());
-
-        // check if the directory already exists in the database. We can skip
-        // inserting if it's already there, as that'd be a no-op.
-        match txn.get(dgst.clone()) {
-            Ok(None) => {
-                // insert the directory into the database
-                if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) {
-                    match e {
-                        // TODO: conflict is a problem, as the whole transaction closure is retried?
-                        sled::transaction::UnabortableTransactionError::Conflict => todo!(),
-                        sled::transaction::UnabortableTransactionError::Storage(e) => {
-                            warn!("storage error: {}", e);
-                            return Err(Status::internal("storage error"));
-                        }
-                    }
-                }
-            }
-            Ok(Some(_)) => continue,
-            Err(e) => {
-                warn!("storage error: {}", e);
-                return Err(Status::internal("storage error"));
-            }
-        }
-    }
-
-    // no more directories to be send. Return the digest of the
-    // last (root) node, or an error if we don't receive a single one.
-    match last_directory_dgst {
-        Some(last_directory_dgst) => Ok(last_directory_dgst),
-        None => Err(Status::invalid_argument("no directories received")),
-    }
-}
-
-#[tonic::async_trait]
-impl DirectoryService for SledDirectoryService {
-    type GetStream = ReceiverStream<Result<Directory>>;
-
-    #[instrument(skip(self))]
-    async fn get(
-        &self,
-        request: Request<GetDirectoryRequest>,
-    ) -> Result<Response<Self::GetStream>, Status> {
-        let (tx, rx) = channel(5);
-
-        let req_inner = request.into_inner();
-
-        // clone self.db (cheap), so we don't refer to self in the thread.
-        let db = self.db.clone();
-
-        // kick off a thread
-        task::spawn_blocking(move || {
-            // open a DB transaction.
-            let txn_res = db.transaction(|txn| {
-                send_directories(txn, &tx, req_inner.clone())
-                    .map_err(sled::transaction::ConflictableTransactionError::Abort)
-            });
-
-            // handle transaction errors
-            match txn_res {
-                Ok(()) => Ok(()),
-                Err(sled::transaction::TransactionError::Abort(status)) => {
-                    // if the transaction was aborted, there was an error. Send it to the client
-                    tx.blocking_send(Err(status))
-                }
-                Err(sled::transaction::TransactionError::Storage(e)) => {
-                    warn!("storage error: {}", e);
-                    tx.blocking_send(Err(Status::internal("storage error")))
-                }
-            }
-        });
-
-        // NOTE: this always returns an Ok response, with the first item in the
-        // stream being a potential error, instead of directly returning the
-        // first error.
-        // Peeking on the first element seems to be extraordinarily hard,
-        // and client code considers these two equivalent anyways.
-        let receiver_stream = ReceiverStream::new(rx);
-        Ok(Response::new(receiver_stream))
-    }
-
-    #[instrument(skip(self, request))]
-    async fn put(
-        &self,
-        request: Request<Streaming<Directory>>,
-    ) -> Result<Response<PutDirectoryResponse>> {
-        let mut req_inner = request.into_inner();
-
-        // TODO: for now, we collect all Directory messages into a Vec, and
-        // pass it to the helper function.
-        // Ideally, we would validate things as we receive it, and have a way
-        // to reject early - but the semantics of transactions (and its
-        // retries) don't allow us to discard things that early anyways.
-        let mut directories: Vec<Directory> = Vec::new();
-
-        while let Some(directory) = req_inner.message().await? {
-            directories.push(directory)
-        }
-
-        let txn_res = self.db.transaction(|txn| {
-            insert_directories(txn, &directories)
-                .map_err(sled::transaction::ConflictableTransactionError::Abort)
-        });
-
-        match txn_res {
-            Ok(last_directory_dgst) => Ok(Response::new(PutDirectoryResponse {
-                root_digest: last_directory_dgst,
-            })),
-            Err(sled::transaction::TransactionError::Storage(e)) => {
-                warn!("storage error: {}", e);
-                Err(Status::internal("storage error"))
-            }
-            Err(sled::transaction::TransactionError::Abort(e)) => Err(e),
-        }
-    }
-}
diff --git a/tvix/store/src/sled_path_info_service.rs b/tvix/store/src/sled_path_info_service.rs
deleted file mode 100644
index 6b9212a3c912..000000000000
--- a/tvix/store/src/sled_path_info_service.rs
+++ /dev/null
@@ -1,89 +0,0 @@
-use prost::Message;
-use std::path::PathBuf;
-
-use crate::proto::get_path_info_request::ByWhat;
-use crate::proto::path_info_service_server::PathInfoService;
-use crate::proto::CalculateNarResponse;
-use crate::proto::GetPathInfoRequest;
-use crate::proto::Node;
-use crate::proto::PathInfo;
-use nix_compat::store_path::DIGEST_SIZE;
-use tonic::{Request, Response, Result, Status};
-use tracing::{instrument, warn};
-
-const NOT_IMPLEMENTED_MSG: &str = "not implemented";
-
-/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
-///
-/// The PathInfo messages are stored as encoded protos, and keyed by their output hash,
-/// as that's currently the only request type available.
-pub struct SledPathInfoService {
-    db: sled::Db,
-}
-
-impl SledPathInfoService {
-    pub fn new(p: PathBuf) -> Result<Self, anyhow::Error> {
-        let config = sled::Config::default().use_compression(true).path(p);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-}
-
-#[tonic::async_trait]
-impl PathInfoService for SledPathInfoService {
-    #[instrument(skip(self))]
-    async fn get(&self, request: Request<GetPathInfoRequest>) -> Result<Response<PathInfo>> {
-        match request.into_inner().by_what {
-            None => Err(Status::unimplemented("by_what needs to be specified")),
-            Some(ByWhat::ByOutputHash(digest)) => {
-                if digest.len() != DIGEST_SIZE {
-                    return Err(Status::invalid_argument("invalid digest length"));
-                }
-
-                match self.db.get(digest) {
-                    Ok(None) => Err(Status::not_found("PathInfo not found")),
-                    Ok(Some(data)) => match PathInfo::decode(&*data) {
-                        Ok(path_info) => Ok(Response::new(path_info)),
-                        Err(e) => {
-                            warn!("failed to decode stored PathInfo: {}", e);
-                            Err(Status::internal("failed to decode stored PathInfo"))
-                        }
-                    },
-                    Err(e) => {
-                        warn!("failed to retrieve PathInfo: {}", e);
-                        Err(Status::internal("error during PathInfo lookup"))
-                    }
-                }
-            }
-        }
-    }
-
-    #[instrument(skip(self))]
-    async fn put(&self, request: Request<PathInfo>) -> Result<Response<PathInfo>> {
-        let path_info = request.into_inner();
-
-        // Call validate on the received PathInfo message.
-        match path_info.validate() {
-            Err(e) => Err(Status::invalid_argument(e.to_string())),
-            // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
-            // This overwrites existing PathInfo objects.
-            Ok(nix_path) => match self.db.insert(nix_path.digest, path_info.encode_to_vec()) {
-                Ok(_) => Ok(Response::new(path_info)),
-                Err(e) => {
-                    warn!("failed to insert PathInfo: {}", e);
-                    Err(Status::internal("failed to insert PathInfo"))
-                }
-            },
-        }
-    }
-
-    #[instrument(skip(self))]
-    async fn calculate_nar(
-        &self,
-        _request: Request<Node>,
-    ) -> Result<Response<CalculateNarResponse>> {
-        warn!(NOT_IMPLEMENTED_MSG);
-        Err(Status::unimplemented(NOT_IMPLEMENTED_MSG))
-    }
-}
diff --git a/tvix/store/src/tests/directory_service.rs b/tvix/store/src/tests/directory_service.rs
deleted file mode 100644
index 8c66ccb53866..000000000000
--- a/tvix/store/src/tests/directory_service.rs
+++ /dev/null
@@ -1,268 +0,0 @@
-use tempfile::TempDir;
-use tokio_stream::StreamExt;
-use tonic::Status;
-
-use crate::proto::directory_service_server::DirectoryService;
-use crate::proto::get_directory_request::ByWhat;
-use crate::proto::GetDirectoryRequest;
-use crate::proto::{Directory, DirectoryNode, SymlinkNode};
-use crate::sled_directory_service::SledDirectoryService;
-use lazy_static::lazy_static;
-
-lazy_static! {
-    static ref DIRECTORY_A: Directory = Directory::default();
-    static ref DIRECTORY_B: Directory = Directory {
-        directories: vec![DirectoryNode {
-            name: "a".to_string(),
-            digest: DIRECTORY_A.digest(),
-            size: DIRECTORY_A.size(),
-        }],
-        ..Default::default()
-    };
-    static ref DIRECTORY_C: Directory = Directory {
-        directories: vec![
-            DirectoryNode {
-                name: "a".to_string(),
-                digest: DIRECTORY_A.digest(),
-                size: DIRECTORY_A.size(),
-            },
-            DirectoryNode {
-                name: "a'".to_string(),
-                digest: DIRECTORY_A.digest(),
-                size: DIRECTORY_A.size(),
-            }
-        ],
-        ..Default::default()
-    };
-}
-
-/// Send the specified GetDirectoryRequest.
-/// Returns an error in the case of an error response, or an error in one of the items in the stream,
-/// or a Vec<Directory> in the case of a successful request.
-async fn get_directories<S: DirectoryService>(
-    svc: &S,
-    get_directory_request: GetDirectoryRequest,
-) -> Result<Vec<Directory>, Status> {
-    let resp = svc.get(tonic::Request::new(get_directory_request)).await;
-
-    // if the response is an error itself, return the error, otherwise unpack
-    let stream = match resp {
-        Ok(resp) => resp,
-        Err(status) => return Err(status),
-    }
-    .into_inner();
-
-    let directory_results: Vec<Result<Directory, Status>> = stream.collect().await;
-
-    // turn Vec<Result<Directory, Status> into Result<Vec<Directory>,Status>
-    directory_results.into_iter().collect()
-}
-
-/// Trying to get a non-existent Directory should return a not found error.
-#[tokio::test]
-async fn not_found() -> anyhow::Result<()> {
-    let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?;
-
-    let resp = service
-        .get(tonic::Request::new(GetDirectoryRequest {
-            by_what: Some(ByWhat::Digest(DIRECTORY_A.digest())),
-            ..Default::default()
-        }))
-        .await;
-
-    let mut rx = resp.expect("must succeed").into_inner().into_inner();
-
-    // The stream should contain one element, an error with Code::NotFound.
-    let item = rx
-        .recv()
-        .await
-        .expect("must be some")
-        .expect_err("must be err");
-    assert_eq!(item.code(), tonic::Code::NotFound);
-
-    // … and nothing else
-    assert!(rx.recv().await.is_none());
-
-    Ok(())
-}
-
-/// Put a Directory into the store, get it back.
-#[tokio::test]
-async fn put_get() -> anyhow::Result<()> {
-    let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?;
-
-    let streaming_request = tonic_mock::streaming_request(vec![DIRECTORY_A.clone()]);
-    let put_resp = service
-        .put(streaming_request)
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    // the sent root_digest should match the calculated digest
-    assert_eq!(put_resp.root_digest, DIRECTORY_A.digest());
-
-    // get it back
-    let items = get_directories(
-        &service,
-        GetDirectoryRequest {
-            by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().to_vec())),
-            ..Default::default()
-        },
-    )
-    .await
-    .expect("must not error");
-
-    assert_eq!(vec![DIRECTORY_A.clone()], items);
-
-    Ok(())
-}
-
-/// Put multiple Directories into the store, and get them back
-#[tokio::test]
-async fn put_get_multiple() -> anyhow::Result<()> {
-    let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?;
-
-    // sending "b" (which refers to "a") without sending "a" first should fail.
-    let put_resp = service
-        .put(tonic_mock::streaming_request(vec![DIRECTORY_B.clone()]))
-        .await
-        .expect_err("must fail");
-
-    assert_eq!(tonic::Code::InvalidArgument, put_resp.code());
-
-    // sending "a", then "b" should succeed, and the response should contain the digest of b.
-    let put_resp = service
-        .put(tonic_mock::streaming_request(vec![
-            DIRECTORY_A.clone(),
-            DIRECTORY_B.clone(),
-        ]))
-        .await
-        .expect("must succeed");
-
-    assert_eq!(DIRECTORY_B.digest(), put_resp.into_inner().root_digest);
-
-    // now, request b, first in non-recursive mode.
-    let items = get_directories(
-        &service,
-        GetDirectoryRequest {
-            recursive: false,
-            by_what: Some(ByWhat::Digest(DIRECTORY_B.digest())),
-        },
-    )
-    .await
-    .expect("must not error");
-
-    // We expect to only get b.
-    assert_eq!(vec![DIRECTORY_B.clone()], items);
-
-    // now, request b, but in recursive mode.
-    let items = get_directories(
-        &service,
-        GetDirectoryRequest {
-            recursive: true,
-            by_what: Some(ByWhat::Digest(DIRECTORY_B.digest())),
-        },
-    )
-    .await
-    .expect("must not error");
-
-    // We expect to get b, and then a, because that's how we traverse down.
-    assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], items);
-
-    Ok(())
-}
-
-/// Put multiple Directories into the store, and omit duplicates.
-#[tokio::test]
-async fn put_get_dedup() -> anyhow::Result<()> {
-    let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?;
-
-    // Send "A", then "C", which refers to "A" two times
-    // Pretend we're a dumb client sending A twice.
-    let put_resp = service
-        .put(tonic_mock::streaming_request(vec![
-            DIRECTORY_A.clone(),
-            DIRECTORY_A.clone(),
-            DIRECTORY_C.clone(),
-        ]))
-        .await
-        .expect("must succeed");
-
-    assert_eq!(DIRECTORY_C.digest(), put_resp.into_inner().root_digest);
-
-    // Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice.
-    let items = get_directories(
-        &service,
-        GetDirectoryRequest {
-            recursive: true,
-            by_what: Some(ByWhat::Digest(DIRECTORY_C.digest())),
-        },
-    )
-    .await
-    .expect("must not error");
-
-    // We expect to get C, and then A (once, as the second A has been deduplicated).
-    assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items);
-
-    Ok(())
-}
-
-/// Trying to upload a Directory failing validation should fail.
-#[tokio::test]
-async fn put_reject_failed_validation() -> anyhow::Result<()> {
-    let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?;
-
-    // construct a broken Directory message that fails validation
-    let broken_directory = Directory {
-        symlinks: vec![SymlinkNode {
-            name: "".to_string(),
-            target: "doesntmatter".to_string(),
-        }],
-        ..Default::default()
-    };
-    assert!(broken_directory.validate().is_err());
-
-    // send it over, it must fail
-    let put_resp = service
-        .put(tonic_mock::streaming_request(vec![broken_directory]))
-        .await
-        .expect_err("must fail");
-
-    assert_eq!(put_resp.code(), tonic::Code::InvalidArgument);
-
-    Ok(())
-}
-
-/// Trying to upload a Directory with wrong size should fail.
-#[tokio::test]
-async fn put_reject_wrong_size() -> anyhow::Result<()> {
-    let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?;
-
-    // Construct a directory referring to DIRECTORY_A, but with wrong size.
-    let broken_parent_directory = Directory {
-        directories: vec![DirectoryNode {
-            name: "foo".to_string(),
-            digest: DIRECTORY_A.digest(),
-            size: 42,
-        }],
-        ..Default::default()
-    };
-    // Make sure we got the size wrong.
-    assert_ne!(
-        broken_parent_directory.directories[0].size,
-        DIRECTORY_A.size()
-    );
-
-    // now upload both (first A, then the broken parent). This must fail.
-    let put_resp = service
-        .put(tonic_mock::streaming_request(vec![
-            DIRECTORY_A.clone(),
-            broken_parent_directory,
-        ]))
-        .await
-        .expect_err("must fail");
-
-    assert_eq!(put_resp.code(), tonic::Code::InvalidArgument);
-
-    Ok(())
-}
diff --git a/tvix/store/src/tests/mod.rs b/tvix/store/src/tests/mod.rs
index 6947b277d481..735b0789f80d 100644
--- a/tvix/store/src/tests/mod.rs
+++ b/tvix/store/src/tests/mod.rs
@@ -1,3 +1 @@
-mod directory_service;
 mod nar_renderer;
-mod path_info_service;
diff --git a/tvix/store/src/tests/path_info_service.rs b/tvix/store/src/tests/path_info_service.rs
deleted file mode 100644
index 42e6db36ca88..000000000000
--- a/tvix/store/src/tests/path_info_service.rs
+++ /dev/null
@@ -1,67 +0,0 @@
-use tempfile::TempDir;
-use tonic::Request;
-
-use crate::proto::get_path_info_request::ByWhat::ByOutputHash;
-use crate::proto::node::Node::Symlink;
-use crate::proto::path_info_service_server::PathInfoService;
-use crate::proto::PathInfo;
-use crate::proto::{GetPathInfoRequest, Node, SymlinkNode};
-use crate::sled_path_info_service::SledPathInfoService;
-
-use lazy_static::lazy_static;
-
-lazy_static! {
-    static ref DUMMY_OUTPUT_HASH: Vec<u8> = vec![
-        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-        0x00, 0x00, 0x00, 0x00, 0x00
-    ];
-}
-
-/// Trying to get a non-existent PathInfo should return a not found error.
-#[tokio::test]
-async fn not_found() -> anyhow::Result<()> {
-    let service = SledPathInfoService::new(TempDir::new()?.path().to_path_buf())?;
-
-    let resp = service
-        .get(Request::new(GetPathInfoRequest {
-            by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.to_vec())),
-        }))
-        .await;
-
-    let resp = resp.expect_err("must fail");
-    assert_eq!(resp.code(), tonic::Code::NotFound);
-
-    Ok(())
-}
-
-/// Put a PathInfo into the store, get it back.
-#[tokio::test]
-async fn put_get() -> anyhow::Result<()> {
-    let service = SledPathInfoService::new(TempDir::new()?.path().to_path_buf())?;
-
-    let path_info = PathInfo {
-        node: Some(Node {
-            node: Some(Symlink(SymlinkNode {
-                name: "00000000000000000000000000000000-foo".to_string(),
-                target: "doesntmatter".to_string(),
-            })),
-        }),
-        ..Default::default()
-    };
-
-    let resp = service.put(Request::new(path_info.clone())).await;
-
-    assert!(resp.is_ok());
-    assert_eq!(resp.expect("must succeed").into_inner(), path_info);
-
-    let resp = service
-        .get(Request::new(GetPathInfoRequest {
-            by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.to_vec())),
-        }))
-        .await;
-
-    assert!(resp.is_ok());
-    assert_eq!(resp.expect("must succeed").into_inner(), path_info);
-
-    Ok(())
-}