about summary refs log tree commit diff
path: root/tvix/store/src/pathinfoservice
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/pathinfoservice')
-rw-r--r--tvix/store/src/pathinfoservice/from_addr.rs56
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs329
-rw-r--r--tvix/store/src/pathinfoservice/memory.rs172
-rw-r--r--tvix/store/src/pathinfoservice/mod.rs61
-rw-r--r--tvix/store/src/pathinfoservice/sled.rs269
5 files changed, 887 insertions, 0 deletions
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
new file mode 100644
index 000000000000..93cb487f29b9
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -0,0 +1,56 @@
+use super::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService, SledPathInfoService};
+
+use std::sync::Arc;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
+use url::Url;
+
+/// Constructs a new instance of a [PathInfoService] from an URI.
+///
+/// The following URIs are supported:
+/// - `memory:`
+///   Uses a in-memory implementation.
+/// - `sled:`
+///   Uses a in-memory sled implementation.
+/// - `sled:///absolute/path/to/somewhere`
+///   Uses sled, using a path on the disk for persistency. Can be only opened
+///   from one process at the same time.
+/// - `grpc+unix:///absolute/path/to/somewhere`
+///   Connects to a local tvix-store gRPC service via Unix socket.
+/// - `grpc+http://host:port`, `grpc+https://host:port`
+///    Connects to a (remote) tvix-store gRPC service.
+///
+/// As the [PathInfoService] needs to talk to [BlobService] and [DirectoryService],
+/// these also need to be passed in.
+pub fn from_addr(
+    uri: &str,
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) -> Result<Arc<dyn PathInfoService>, Error> {
+    let url =
+        Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
+
+    Ok(if url.scheme() == "memory" {
+        Arc::new(MemoryPathInfoService::from_url(
+            &url,
+            blob_service,
+            directory_service,
+        )?)
+    } else if url.scheme() == "sled" {
+        Arc::new(SledPathInfoService::from_url(
+            &url,
+            blob_service,
+            directory_service,
+        )?)
+    } else if url.scheme().starts_with("grpc+") {
+        Arc::new(GRPCPathInfoService::from_url(
+            &url,
+            blob_service,
+            directory_service,
+        )?)
+    } else {
+        Err(Error::StorageError(format!(
+            "unknown scheme: {}",
+            url.scheme()
+        )))?
+    })
+}
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
new file mode 100644
index 000000000000..a88828083940
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -0,0 +1,329 @@
+use super::PathInfoService;
+use crate::proto::{self, ListPathInfoRequest, PathInfo};
+use async_stream::try_stream;
+use futures::Stream;
+use std::{pin::Pin, sync::Arc};
+use tokio::net::UnixStream;
+use tonic::{async_trait, transport::Channel, Code};
+use tvix_castore::{
+    blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error,
+};
+
+/// Connects to a (remote) tvix-store PathInfoService over gRPC.
+#[derive(Clone)]
+pub struct GRPCPathInfoService {
+    /// The internal reference to a gRPC client.
+    /// Cloning it is cheap, and it internally handles concurrent requests.
+    grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+}
+
+impl GRPCPathInfoService {
+    /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient].
+    /// panics if called outside the context of a tokio runtime.
+    pub fn from_client(
+        grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+    ) -> Self {
+        Self { grpc_client }
+    }
+}
+
+#[async_trait]
+impl PathInfoService for GRPCPathInfoService {
+    /// Constructs a [GRPCPathInfoService] from the passed [url::Url]:
+    /// - scheme has to match `grpc+*://`.
+    ///   That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
+    /// - In the case of unix sockets, there must be a path, but may not be a host.
+    /// - In the case of non-unix sockets, there must be a host, but no path.
+    /// The blob_service and directory_service arguments are ignored, because the gRPC service already provides answers to these questions.
+    fn from_url(
+        url: &url::Url,
+        _blob_service: Arc<dyn BlobService>,
+        _directory_service: Arc<dyn DirectoryService>,
+    ) -> Result<Self, tvix_castore::Error> {
+        // Start checking for the scheme to start with grpc+.
+        match url.scheme().strip_prefix("grpc+") {
+            None => Err(Error::StorageError("invalid scheme".to_string())),
+            Some(rest) => {
+                if rest == "unix" {
+                    if url.host_str().is_some() {
+                        return Err(Error::StorageError("host may not be set".to_string()));
+                    }
+                    let path = url.path().to_string();
+                    let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter
+                        .unwrap()
+                        .connect_with_connector_lazy(tower::service_fn(
+                            move |_: tonic::transport::Uri| UnixStream::connect(path.clone()),
+                        ));
+                    let grpc_client =
+                        proto::path_info_service_client::PathInfoServiceClient::new(channel);
+                    Ok(Self::from_client(grpc_client))
+                } else {
+                    // ensure path is empty, not supported with gRPC.
+                    if !url.path().is_empty() {
+                        return Err(tvix_castore::Error::StorageError(
+                            "path may not be set".to_string(),
+                        ));
+                    }
+
+                    // clone the uri, and drop the grpc+ from the scheme.
+                    // Recreate a new uri with the `grpc+` prefix dropped from the scheme.
+                    // We can't use `url.set_scheme(rest)`, as it disallows
+                    // setting something http(s) that previously wasn't.
+                    let url = {
+                        let url_str = url.to_string();
+                        let s_stripped = url_str.strip_prefix("grpc+").unwrap();
+                        url::Url::parse(s_stripped).unwrap()
+                    };
+                    let channel = tonic::transport::Endpoint::try_from(url.to_string())
+                        .unwrap()
+                        .connect_lazy();
+
+                    let grpc_client =
+                        proto::path_info_service_client::PathInfoServiceClient::new(channel);
+                    Ok(Self::from_client(grpc_client))
+                }
+            }
+        }
+    }
+
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        // Get a new handle to the gRPC client.
+        let mut grpc_client = self.grpc_client.clone();
+
+        let path_info = grpc_client
+            .get(proto::GetPathInfoRequest {
+                by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
+                    digest.to_vec().into(),
+                )),
+            })
+            .await;
+
+        match path_info {
+            Ok(path_info) => Ok(Some(path_info.into_inner())),
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(Error::StorageError(e.to_string())),
+        }
+    }
+
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // Get a new handle to the gRPC client.
+        let mut grpc_client = self.grpc_client.clone();
+
+        let path_info = grpc_client
+            .put(path_info)
+            .await
+            .map_err(|e| Error::StorageError(e.to_string()))?
+            .into_inner();
+
+        Ok(path_info)
+    }
+
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), Error> {
+        // Get a new handle to the gRPC client.
+        let mut grpc_client = self.grpc_client.clone();
+        let root_node = root_node.clone();
+
+        let path_info = grpc_client
+            .calculate_nar(castorepb::Node {
+                node: Some(root_node),
+            })
+            .await
+            .map_err(|e| Error::StorageError(e.to_string()))?
+            .into_inner();
+
+        let nar_sha256: [u8; 32] = path_info
+            .nar_sha256
+            .to_vec()
+            .try_into()
+            .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
+
+        Ok((path_info.nar_size, nar_sha256))
+    }
+
+    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
+        let mut grpc_client = self.grpc_client.clone();
+
+        let stream = try_stream! {
+            let resp = grpc_client.list(ListPathInfoRequest::default()).await;
+
+            let mut stream = resp.map_err(|e| Error::StorageError(e.to_string()))?.into_inner();
+
+            loop {
+                match stream.message().await {
+                    Ok(o) => match o {
+                        Some(pathinfo) => {
+                            // validate the pathinfo
+                            if let Err(e) = pathinfo.validate() {
+                                Err(Error::StorageError(format!(
+                                    "pathinfo {:?} failed validation: {}",
+                                    pathinfo, e
+                                )))?;
+                            }
+                            yield pathinfo
+                        }
+                        None => {
+                            return;
+                        },
+                    },
+                    Err(e) => Err(Error::StorageError(e.to_string()))?,
+                }
+            }
+        };
+
+        Box::pin(stream)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+    use std::time::Duration;
+
+    use tempfile::TempDir;
+    use tokio::net::UnixListener;
+    use tokio_retry::strategy::ExponentialBackoff;
+    use tokio_retry::Retry;
+    use tokio_stream::wrappers::UnixListenerStream;
+
+    use crate::pathinfoservice::MemoryPathInfoService;
+    use crate::proto::GRPCPathInfoServiceWrapper;
+    use crate::tests::fixtures;
+    use crate::tests::utils::gen_blob_service;
+    use crate::tests::utils::gen_directory_service;
+
+    use super::GRPCPathInfoService;
+    use super::PathInfoService;
+
+    /// This uses the wrong scheme
+    #[test]
+    fn test_invalid_scheme() {
+        let url = url::Url::parse("http://foo.example/test").expect("must parse");
+
+        assert!(
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This uses the correct scheme for a unix socket.
+    /// The fact that /path/to/somewhere doesn't exist yet is no problem, because we connect lazily.
+    #[tokio::test]
+    async fn test_valid_unix_path() {
+        let url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse");
+
+        assert!(
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_ok()
+        );
+    }
+
+    /// This uses the correct scheme for a unix socket,
+    /// but sets a host, which is unsupported.
+    #[tokio::test]
+    async fn test_invalid_unix_path_with_domain() {
+        let url =
+            url::Url::parse("grpc+unix://host.example/path/to/somewhere").expect("must parse");
+
+        assert!(
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This uses the correct scheme for a HTTP server.
+    /// The fact that nothing is listening there is no problem, because we connect lazily.
+    #[tokio::test]
+    async fn test_valid_http() {
+        let url = url::Url::parse("grpc+http://localhost").expect("must parse");
+
+        assert!(
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_ok()
+        );
+    }
+
+    /// This uses the correct scheme for a HTTPS server.
+    /// The fact that nothing is listening there is no problem, because we connect lazily.
+    #[tokio::test]
+    async fn test_valid_https() {
+        let url = url::Url::parse("grpc+https://localhost").expect("must parse");
+
+        assert!(
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_ok()
+        );
+    }
+
+    /// This uses the correct scheme, but also specifies
+    /// an additional path, which is not supported for gRPC.
+    /// The fact that nothing is listening there is no problem, because we connect lazily.
+    #[tokio::test]
+    async fn test_invalid_http_with_path() {
+        let url = url::Url::parse("grpc+https://localhost/some-path").expect("must parse");
+
+        assert!(
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This ensures connecting via gRPC works as expected.
+    #[tokio::test]
+    async fn test_valid_unix_path_ping_pong() {
+        let tmpdir = TempDir::new().unwrap();
+        let socket_path = tmpdir.path().join("daemon");
+
+        let path_clone = socket_path.clone();
+
+        // Spin up a server
+        tokio::spawn(async {
+            let uds = UnixListener::bind(path_clone).unwrap();
+            let uds_stream = UnixListenerStream::new(uds);
+
+            // spin up a new server
+            let mut server = tonic::transport::Server::builder();
+            let router = server.add_service(
+                crate::proto::path_info_service_server::PathInfoServiceServer::new(
+                    GRPCPathInfoServiceWrapper::from(Arc::new(MemoryPathInfoService::new(
+                        gen_blob_service(),
+                        gen_directory_service(),
+                    ))
+                        as Arc<dyn PathInfoService>),
+                ),
+            );
+            router.serve_with_incoming(uds_stream).await
+        });
+
+        // wait for the socket to be created
+        Retry::spawn(
+            ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)),
+            || async {
+                if socket_path.exists() {
+                    Ok(())
+                } else {
+                    Err(())
+                }
+            },
+        )
+        .await
+        .expect("failed to wait for socket");
+
+        // prepare a client
+        let grpc_client = {
+            let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display()))
+                .expect("must parse");
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .expect("must succeed")
+        };
+
+        let path_info = grpc_client
+            .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap())
+            .await
+            .expect("must not be error");
+
+        assert!(path_info.is_none());
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
new file mode 100644
index 000000000000..dbb4b02dd013
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -0,0 +1,172 @@
+use super::PathInfoService;
+use crate::{nar::calculate_size_and_sha256, proto::PathInfo};
+use futures::{stream::iter, Stream};
+use std::{
+    collections::HashMap,
+    pin::Pin,
+    sync::{Arc, RwLock},
+};
+use tonic::async_trait;
+use tvix_castore::proto as castorepb;
+use tvix_castore::Error;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
+
+pub struct MemoryPathInfoService {
+    db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>,
+
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+}
+
+impl MemoryPathInfoService {
+    pub fn new(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) -> Self {
+        Self {
+            db: Default::default(),
+            blob_service,
+            directory_service,
+        }
+    }
+}
+
+#[async_trait]
+impl PathInfoService for MemoryPathInfoService {
+    /// Constructs a [MemoryPathInfoService] from the passed [url::Url]:
+    /// - scheme has to be `memory://`
+    /// - there may not be a host.
+    /// - there may not be a path.
+    fn from_url(
+        url: &url::Url,
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) -> Result<Self, Error> {
+        if url.scheme() != "memory" {
+            return Err(Error::StorageError("invalid scheme".to_string()));
+        }
+
+        if url.has_host() || !url.path().is_empty() {
+            return Err(Error::StorageError("invalid url".to_string()));
+        }
+
+        Ok(Self::new(blob_service, directory_service))
+    }
+
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let db = self.db.read().unwrap();
+
+        match db.get(&digest) {
+            None => Ok(None),
+            Some(path_info) => Ok(Some(path_info.clone())),
+        }
+    }
+
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // Call validate on the received PathInfo message.
+        match path_info.validate() {
+            Err(e) => Err(Error::InvalidRequest(format!(
+                "failed to validate PathInfo: {}",
+                e
+            ))),
+
+            // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
+            // This overwrites existing PathInfo objects.
+            Ok(nix_path) => {
+                let mut db = self.db.write().unwrap();
+                db.insert(nix_path.digest, path_info.clone());
+
+                Ok(path_info)
+            }
+        }
+    }
+
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), Error> {
+        calculate_size_and_sha256(
+            root_node,
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+        )
+        .await
+        .map_err(|e| Error::StorageError(e.to_string()))
+    }
+
+    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
+        let db = self.db.read().unwrap();
+
+        // Copy all elements into a list.
+        // This is a bit ugly, because we can't have db escape the lifetime
+        // of this function, but elements need to be returned owned anyways, and this in-
+        // memory impl is only for testing purposes anyways.
+        let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect();
+
+        Box::pin(iter(items))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::tests::utils::gen_blob_service;
+    use crate::tests::utils::gen_directory_service;
+
+    use super::MemoryPathInfoService;
+    use super::PathInfoService;
+
+    /// This uses a wrong scheme.
+    #[test]
+    fn test_invalid_scheme() {
+        let url = url::Url::parse("http://foo.example/test").expect("must parse");
+
+        assert!(
+            MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This correctly sets the scheme, and doesn't set a path.
+    #[test]
+    fn test_valid_scheme() {
+        let url = url::Url::parse("memory://").expect("must parse");
+
+        assert!(
+            MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_ok()
+        );
+    }
+
+    /// This sets the host to `foo`
+    #[test]
+    fn test_invalid_host() {
+        let url = url::Url::parse("memory://foo").expect("must parse");
+
+        assert!(
+            MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This has the path "/", which is invalid.
+    #[test]
+    fn test_invalid_has_path() {
+        let url = url::Url::parse("memory:///").expect("must parse");
+
+        assert!(
+            MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This has the path "/foo", which is invalid.
+    #[test]
+    fn test_invalid_path2() {
+        let url = url::Url::parse("memory:///foo").expect("must parse");
+
+        assert!(
+            MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
new file mode 100644
index 000000000000..af7bbc9f88e4
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -0,0 +1,61 @@
+mod from_addr;
+mod grpc;
+mod memory;
+mod sled;
+
+use std::pin::Pin;
+use std::sync::Arc;
+
+use futures::Stream;
+use tonic::async_trait;
+use tvix_castore::blobservice::BlobService;
+use tvix_castore::directoryservice::DirectoryService;
+use tvix_castore::proto as castorepb;
+use tvix_castore::Error;
+
+use crate::proto::PathInfo;
+
+pub use self::from_addr::from_addr;
+pub use self::grpc::GRPCPathInfoService;
+pub use self::memory::MemoryPathInfoService;
+pub use self::sled::SledPathInfoService;
+
+/// The base trait all PathInfo services need to implement.
+#[async_trait]
+pub trait PathInfoService: Send + Sync {
+    /// Create a new instance by passing in a connection URL, as well
+    /// as instances of a [PathInfoService] and [DirectoryService] (as the
+    /// [PathInfoService] needs to talk to them).
+    /// TODO: check if we want to make this async, instead of lazily connecting
+    fn from_url(
+        url: &url::Url,
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) -> Result<Self, Error>
+    where
+        Self: Sized;
+
+    /// Retrieve a PathInfo message by the output digest.
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error>;
+
+    /// Store a PathInfo message. Implementations MUST call validate and reject
+    /// invalid messages.
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>;
+
+    /// Return the nar size and nar sha256 digest for a given root node.
+    /// This can be used to calculate NAR-based output paths,
+    /// and implementations are encouraged to cache it.
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), Error>;
+
+    /// Iterate over all PathInfo objects in the store.
+    /// Implementations can decide to disallow listing.
+    ///
+    /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily,
+    /// and the box allows different underlying stream implementations to be returned since
+    /// Rust doesn't support this as a generic in traits yet. This is the same thing that
+    /// [async_trait] generates, but for streams instead of futures.
+    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>>;
+}
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
new file mode 100644
index 000000000000..bac384ea0912
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -0,0 +1,269 @@
+use super::PathInfoService;
+use crate::nar::calculate_size_and_sha256;
+use crate::proto::PathInfo;
+use futures::{stream::iter, Stream};
+use prost::Message;
+use std::{path::PathBuf, pin::Pin, sync::Arc};
+use tonic::async_trait;
+use tracing::warn;
+use tvix_castore::proto as castorepb;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
+
+/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
+///
+/// The PathInfo messages are stored as encoded protos, and keyed by their output hash,
+/// as that's currently the only request type available.
+pub struct SledPathInfoService {
+    db: sled::Db,
+
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+}
+
+impl SledPathInfoService {
+    pub fn new(
+        p: PathBuf,
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) -> Result<Self, sled::Error> {
+        let config = sled::Config::default().use_compression(true).path(p);
+        let db = config.open()?;
+
+        Ok(Self {
+            db,
+            blob_service,
+            directory_service,
+        })
+    }
+
+    pub fn new_temporary(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) -> Result<Self, sled::Error> {
+        let config = sled::Config::default().temporary(true);
+        let db = config.open()?;
+
+        Ok(Self {
+            db,
+            blob_service,
+            directory_service,
+        })
+    }
+}
+
+#[async_trait]
+impl PathInfoService for SledPathInfoService {
+    /// Constructs a [SledPathInfoService] from the passed [url::Url]:
+    /// - scheme has to be `sled://`
+    /// - there may not be a host.
+    /// - a path to the sled needs to be provided (which may not be `/`).
+    fn from_url(
+        url: &url::Url,
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) -> Result<Self, Error> {
+        if url.scheme() != "sled" {
+            return Err(Error::StorageError("invalid scheme".to_string()));
+        }
+
+        if url.has_host() {
+            return Err(Error::StorageError(format!(
+                "invalid host: {}",
+                url.host().unwrap()
+            )));
+        }
+
+        // TODO: expose compression and other parameters as URL parameters, drop new and new_temporary?
+        if url.path().is_empty() {
+            Self::new_temporary(blob_service, directory_service)
+                .map_err(|e| Error::StorageError(e.to_string()))
+        } else if url.path() == "/" {
+            Err(Error::StorageError(
+                "cowardly refusing to open / with sled".to_string(),
+            ))
+        } else {
+            Self::new(url.path().into(), blob_service, directory_service)
+                .map_err(|e| Error::StorageError(e.to_string()))
+        }
+    }
+
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        match self.db.get(digest) {
+            Ok(None) => Ok(None),
+            Ok(Some(data)) => match PathInfo::decode(&*data) {
+                Ok(path_info) => Ok(Some(path_info)),
+                Err(e) => {
+                    warn!("failed to decode stored PathInfo: {}", e);
+                    Err(Error::StorageError(format!(
+                        "failed to decode stored PathInfo: {}",
+                        e
+                    )))
+                }
+            },
+            Err(e) => {
+                warn!("failed to retrieve PathInfo: {}", e);
+                Err(Error::StorageError(format!(
+                    "failed to retrieve PathInfo: {}",
+                    e
+                )))
+            }
+        }
+    }
+
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // Call validate on the received PathInfo message.
+        match path_info.validate() {
+            Err(e) => Err(Error::InvalidRequest(format!(
+                "failed to validate PathInfo: {}",
+                e
+            ))),
+            // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
+            // This overwrites existing PathInfo objects.
+            Ok(nix_path) => match self.db.insert(nix_path.digest, path_info.encode_to_vec()) {
+                Ok(_) => Ok(path_info),
+                Err(e) => {
+                    warn!("failed to insert PathInfo: {}", e);
+                    Err(Error::StorageError(format! {
+                        "failed to insert PathInfo: {}", e
+                    }))
+                }
+            },
+        }
+    }
+
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), Error> {
+        calculate_size_and_sha256(
+            root_node,
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+        )
+        .await
+        .map_err(|e| Error::StorageError(e.to_string()))
+    }
+
+    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
+        Box::pin(iter(self.db.iter().values().map(|v| match v {
+            Ok(data) => {
+                // we retrieved some bytes
+                match PathInfo::decode(&*data) {
+                    Ok(path_info) => Ok(path_info),
+                    Err(e) => {
+                        warn!("failed to decode stored PathInfo: {}", e);
+                        Err(Error::StorageError(format!(
+                            "failed to decode stored PathInfo: {}",
+                            e
+                        )))
+                    }
+                }
+            }
+            Err(e) => {
+                warn!("failed to retrieve PathInfo: {}", e);
+                Err(Error::StorageError(format!(
+                    "failed to retrieve PathInfo: {}",
+                    e
+                )))
+            }
+        })))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use tempfile::TempDir;
+
+    use crate::tests::utils::gen_blob_service;
+    use crate::tests::utils::gen_directory_service;
+
+    use super::PathInfoService;
+    use super::SledPathInfoService;
+
+    /// This uses a wrong scheme.
+    #[test]
+    fn test_invalid_scheme() {
+        let url = url::Url::parse("http://foo.example/test").expect("must parse");
+
+        assert!(
+            SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This uses the correct scheme, and doesn't specify a path (temporary sled).
+    #[test]
+    fn test_valid_scheme_temporary() {
+        let url = url::Url::parse("sled://").expect("must parse");
+
+        assert!(
+            SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_ok()
+        );
+    }
+
+    /// This sets the path to a location that doesn't exist, which should fail (as sled doesn't mkdir -p)
+    #[test]
+    fn test_nonexistent_path() {
+        let tmpdir = TempDir::new().unwrap();
+
+        let mut url = url::Url::parse("sled://foo.example").expect("must parse");
+        url.set_path(tmpdir.path().join("foo").join("bar").to_str().unwrap());
+
+        assert!(
+            SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This uses the correct scheme, and specifies / as path (which should fail
+    // for obvious reasons)
+    #[test]
+    fn test_invalid_path_root() {
+        let url = url::Url::parse("sled:///").expect("must parse");
+
+        assert!(
+            SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This uses the correct scheme, and sets a tempdir as location.
+    #[test]
+    fn test_valid_scheme_path() {
+        let tmpdir = TempDir::new().unwrap();
+
+        let mut url = url::Url::parse("sled://").expect("must parse");
+        url.set_path(tmpdir.path().to_str().unwrap());
+
+        assert!(
+            SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_ok()
+        );
+    }
+
+    /// This sets a host, rather than a path, which should fail.
+    #[test]
+    fn test_invalid_host() {
+        let url = url::Url::parse("sled://foo.example").expect("must parse");
+
+        assert!(
+            SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This sets a host AND a valid path, which should fail
+    #[test]
+    fn test_invalid_host_and_path() {
+        let tmpdir = TempDir::new().unwrap();
+
+        let mut url = url::Url::parse("sled://foo.example").expect("must parse");
+        url.set_path(tmpdir.path().to_str().unwrap());
+
+        assert!(
+            SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+}