about summary refs log tree commit diff
path: root/tvix/store/src/pathinfoservice/sled.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/pathinfoservice/sled.rs')
-rw-r--r--tvix/store/src/pathinfoservice/sled.rs125
1 files changed, 125 insertions, 0 deletions
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
new file mode 100644
index 0000000000..0255c031e2
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -0,0 +1,125 @@
+use super::PathInfoService;
+use crate::nar::calculate_size_and_sha256;
+use crate::proto::PathInfo;
+use data_encoding::BASE64;
+use futures::stream::iter;
+use futures::stream::BoxStream;
+use prost::Message;
+use std::path::Path;
+use tonic::async_trait;
+use tracing::instrument;
+use tracing::warn;
+use tvix_castore::proto as castorepb;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
+
+/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
+///
+/// The PathInfo messages are stored as encoded protos, and keyed by their output hash,
+/// as that's currently the only request type available.
+pub struct SledPathInfoService<BS, DS> {
+    db: sled::Db,
+
+    blob_service: BS,
+    directory_service: DS,
+}
+
+impl<BS, DS> SledPathInfoService<BS, DS> {
+    pub fn new<P: AsRef<Path>>(
+        p: P,
+        blob_service: BS,
+        directory_service: DS,
+    ) -> Result<Self, sled::Error> {
+        let config = sled::Config::default()
+            .use_compression(false) // is a required parameter
+            .path(p);
+        let db = config.open()?;
+
+        Ok(Self {
+            db,
+            blob_service,
+            directory_service,
+        })
+    }
+
+    pub fn new_temporary(blob_service: BS, directory_service: DS) -> Result<Self, sled::Error> {
+        let config = sled::Config::default().temporary(true);
+        let db = config.open()?;
+
+        Ok(Self {
+            db,
+            blob_service,
+            directory_service,
+        })
+    }
+}
+
+#[async_trait]
+impl<BS, DS> PathInfoService for SledPathInfoService<BS, DS>
+where
+    BS: AsRef<dyn BlobService> + Send + Sync,
+    DS: AsRef<dyn DirectoryService> + Send + Sync,
+{
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        match self.db.get(digest).map_err(|e| {
+            warn!("failed to retrieve PathInfo: {}", e);
+            Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+        })? {
+            None => Ok(None),
+            Some(data) => {
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
+                    warn!("failed to decode stored PathInfo: {}", e);
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+                Ok(Some(path_info))
+            }
+        }
+    }
+
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // Call validate on the received PathInfo message.
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))?;
+
+        // In case the PathInfo is valid, we were able to parse a StorePath.
+        // Store it in the database, keyed by its digest.
+        // This overwrites existing PathInfo objects.
+        self.db
+            .insert(store_path.digest(), path_info.encode_to_vec())
+            .map_err(|e| {
+                warn!("failed to insert PathInfo: {}", e);
+                Error::StorageError(format! {
+                    "failed to insert PathInfo: {}", e
+                })
+            })?;
+
+        Ok(path_info)
+    }
+
+    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), Error> {
+        calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
+            .await
+            .map_err(|e| Error::StorageError(e.to_string()))
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        Box::pin(iter(self.db.iter().values().map(|v| {
+            let data = v.map_err(|e| {
+                warn!("failed to retrieve PathInfo: {}", e);
+                Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+            })?;
+
+            let path_info = PathInfo::decode(&*data).map_err(|e| {
+                warn!("failed to decode stored PathInfo: {}", e);
+                Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+            })?;
+            Ok(path_info)
+        })))
+    }
+}