about summary refs log tree commit diff
path: root/tvix/store/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src')
-rw-r--r--tvix/store/src/directoryservice/memory.rs76
-rw-r--r--tvix/store/src/directoryservice/mod.rs21
-rw-r--r--tvix/store/src/directoryservice/sled.rs84
-rw-r--r--tvix/store/src/lib.rs1
4 files changed, 182 insertions, 0 deletions
diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs
new file mode 100644
index 000000000000..34ee45203320
--- /dev/null
+++ b/tvix/store/src/directoryservice/memory.rs
@@ -0,0 +1,76 @@
+use crate::{proto, Error};
+use data_encoding::BASE64;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+use tracing::{instrument, warn};
+
+use super::DirectoryService;
+
+#[derive(Clone)]
+pub struct MemoryDirectoryService {
+    db: Arc<RwLock<HashMap<Vec<u8>, proto::Directory>>>,
+}
+
+impl MemoryDirectoryService {
+    pub fn new() -> Self {
+        let db = Arc::new(RwLock::new(HashMap::default()));
+
+        Self { db }
+    }
+}
+
+impl DirectoryService for MemoryDirectoryService {
+    // TODO: change api to only be by digest
+    #[instrument(skip(self, by_what))]
+    fn get(
+        &self,
+        by_what: &proto::get_directory_request::ByWhat,
+    ) -> Result<Option<proto::Directory>, Error> {
+        match by_what {
+            proto::get_directory_request::ByWhat::Digest(digest) => {
+                let db = self.db.read()?;
+
+                match db.get(digest) {
+                    // The directory was not found, return
+                    None => Ok(None),
+
+                    // The directory was found, try to parse the data as Directory message
+                    Some(directory) => {
+                        // Validate the retrieved Directory indeed has the
+                        // digest we expect it to have, to detect corruptions.
+                        let actual_digest = directory.digest();
+                        if actual_digest.as_slice() != digest {
+                            return Err(Error::StorageError(format!(
+                                "requested directory with digest {}, but got {}",
+                                BASE64.encode(digest),
+                                BASE64.encode(&actual_digest)
+                            )));
+                        }
+
+                        Ok(Some(directory.clone()))
+                    }
+                }
+            }
+        }
+    }
+
+    #[instrument(skip(self, directory), fields(directory.digest = BASE64.encode(&directory.digest())))]
+    fn put(&self, directory: proto::Directory) -> Result<Vec<u8>, Error> {
+        let digest = directory.digest();
+
+        // validate the directory itself.
+        if let Err(e) = directory.validate() {
+            return Err(Error::InvalidRequest(format!(
+                "directory {} failed validation: {}",
+                BASE64.encode(&digest),
+                e,
+            )));
+        }
+
+        // store it
+        let mut db = self.db.write()?;
+        db.insert(digest.clone(), directory);
+
+        Ok(digest)
+    }
+}
diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs
new file mode 100644
index 000000000000..4abf823b23b4
--- /dev/null
+++ b/tvix/store/src/directoryservice/mod.rs
@@ -0,0 +1,21 @@
+use crate::{proto, Error};
+mod memory;
+mod sled;
+
+pub use self::memory::MemoryDirectoryService;
+pub use self::sled::SledDirectoryService;
+
+/// The base trait all Directory services need to implement.
+/// This is a simple get and put of [crate::proto::Directory], returning their
+/// digest.
+pub trait DirectoryService {
+    /// Get looks up a single Directory message by its digest.
+    /// In case the directory is not found, Ok(None) is returned.
+    fn get(
+        &self,
+        by_what: &proto::get_directory_request::ByWhat,
+    ) -> Result<Option<proto::Directory>, Error>;
+    /// Get uploads a single Directory message, and returns the calculated
+    /// digest, or an error.
+    fn put(&self, directory: proto::Directory) -> Result<Vec<u8>, Error>;
+}
diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs
new file mode 100644
index 000000000000..caa78615d30a
--- /dev/null
+++ b/tvix/store/src/directoryservice/sled.rs
@@ -0,0 +1,84 @@
+use crate::proto::Directory;
+use crate::{proto, Error};
+use data_encoding::BASE64;
+use prost::Message;
+use std::path::PathBuf;
+use tracing::{instrument, warn};
+
+use super::DirectoryService;
+
+#[derive(Clone)]
+pub struct SledDirectoryService {
+    db: sled::Db,
+}
+
+impl SledDirectoryService {
+    pub fn new(p: PathBuf) -> Result<Self, sled::Error> {
+        let config = sled::Config::default().use_compression(true).path(p);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+}
+
+impl DirectoryService for SledDirectoryService {
+    // TODO: change api to only be by digest
+    #[instrument(name = "SledDirectoryService::get", skip(self, by_what))]
+    fn get(
+        &self,
+        by_what: &proto::get_directory_request::ByWhat,
+    ) -> Result<Option<proto::Directory>, Error> {
+        match by_what {
+            proto::get_directory_request::ByWhat::Digest(digest) => {
+                match self.db.get(digest) {
+                    // The directory was not found, return
+                    Ok(None) => Ok(None),
+
+                    // The directory was found, try to parse the data as Directory message
+                    Ok(Some(data)) => match Directory::decode(&*data) {
+                        Ok(directory) => {
+                            // Validate the retrieved Directory indeed has the
+                            // digest we expect it to have, to detect corruptions.
+                            let actual_digest = directory.digest();
+                            if actual_digest.as_slice() != digest {
+                                return Err(Error::StorageError(format!(
+                                    "requested directory with digest {}, but got {}",
+                                    BASE64.encode(digest),
+                                    BASE64.encode(&actual_digest)
+                                )));
+                            }
+
+                            Ok(Some(directory))
+                        }
+                        Err(e) => {
+                            warn!("unable to parse directory {}: {}", BASE64.encode(digest), e);
+                            Err(Error::StorageError(e.to_string()))
+                        }
+                    },
+                    // some storage error?
+                    Err(e) => Err(Error::StorageError(e.to_string())),
+                }
+            }
+        }
+    }
+
+    #[instrument(name = "SledDirectoryService::put", skip(self, directory), fields(directory.digest = BASE64.encode(&directory.digest())))]
+    fn put(&self, directory: proto::Directory) -> Result<Vec<u8>, Error> {
+        let digest = directory.digest();
+
+        // validate the directory itself.
+        if let Err(e) = directory.validate() {
+            return Err(Error::InvalidRequest(format!(
+                "directory {} failed validation: {}",
+                BASE64.encode(&digest),
+                e,
+            )));
+        }
+        // store it
+        let result = self.db.insert(&digest, directory.encode_to_vec());
+        if let Err(e) = result {
+            return Err(Error::StorageError(e.to_string()));
+        }
+        Ok(digest)
+    }
+}
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index 12250a9c0d51..0279d13f9be7 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -5,6 +5,7 @@ mod errors;
 
 pub mod blobservice;
 pub mod chunkservice;
+pub mod directoryservice;
 pub mod proto;
 
 pub use blobreader::BlobReader;