about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/store/src/chunkservice/memory.rs61
-rw-r--r--tvix/store/src/chunkservice/mod.rs23
-rw-r--r--tvix/store/src/chunkservice/sled.rs63
-rw-r--r--tvix/store/src/lib.rs1
4 files changed, 148 insertions, 0 deletions
diff --git a/tvix/store/src/chunkservice/memory.rs b/tvix/store/src/chunkservice/memory.rs
new file mode 100644
index 000000000000..1ae8b9130565
--- /dev/null
+++ b/tvix/store/src/chunkservice/memory.rs
@@ -0,0 +1,61 @@
+use data_encoding::BASE64;
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+};
+use tracing::instrument;
+
+use crate::Error;
+
+use super::ChunkService;
+
+#[derive(Clone)]
+pub struct MemoryChunkService {
+    db: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>,
+}
+
+impl MemoryChunkService {
+    pub fn new() -> Self {
+        let db = Arc::new(RwLock::new(HashMap::default()));
+
+        Self { db }
+    }
+}
+
+impl ChunkService for MemoryChunkService {
+    #[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
+    fn has(&self, digest: &[u8]) -> Result<bool, Error> {
+        let db = self.db.read().unwrap();
+        Ok(db.get(digest).is_some())
+    }
+
+    #[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))]
+    fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error> {
+        let db = self.db.read().unwrap();
+        match db.get(digest) {
+            None => Ok(None),
+            Some(data) => {
+                // calculate the hash to verify this is really what we expect
+                let actual_digest = blake3::hash(&data).as_bytes().to_vec();
+                if actual_digest != digest {
+                    return Err(Error::StorageError(format!(
+                        "invalid hash encountered when reading chunk, expected {}, got {}",
+                        BASE64.encode(digest),
+                        BASE64.encode(&actual_digest),
+                    )));
+                }
+                Ok(Some(data.clone()))
+            }
+        }
+    }
+
+    #[instrument(skip(self, data))]
+    fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error> {
+        let digest = blake3::hash(&data).as_bytes().to_vec();
+
+        let mut db = self.db.write().unwrap();
+        db.insert(digest.clone(), data);
+
+        Ok(digest)
+    }
+}
diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs
new file mode 100644
index 000000000000..83c91fd96ea3
--- /dev/null
+++ b/tvix/store/src/chunkservice/mod.rs
@@ -0,0 +1,23 @@
+pub mod memory;
+pub mod sled;
+
+use crate::Error;
+
+pub use self::memory::MemoryChunkService;
+pub use self::sled::SledChunkService;
+
+/// The base trait all ChunkService services need to implement.
+/// It allows checking for the existence, download and upload of chunks.
+/// It's usually used after consulting a [crate::blobservice::BlobService] for
+/// chunking information.
+pub trait ChunkService {
+    /// check if the service has a chunk, given by its digest.
+    fn has(&self, digest: &[u8]) -> Result<bool, Error>;
+
+    /// retrieve a chunk by its digest. Implementations MUST validate the digest
+    /// matches.
+    fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error>;
+
+    /// insert a chunk. returns the digest of the chunk, or an error.
+    fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error>;
+}
diff --git a/tvix/store/src/chunkservice/sled.rs b/tvix/store/src/chunkservice/sled.rs
new file mode 100644
index 000000000000..06194ddc5943
--- /dev/null
+++ b/tvix/store/src/chunkservice/sled.rs
@@ -0,0 +1,63 @@
+use std::path::PathBuf;
+
+use data_encoding::BASE64;
+use tracing::instrument;
+
+use crate::Error;
+
+use super::ChunkService;
+
+#[derive(Clone)]
+pub struct SledChunkService {
+    db: sled::Db,
+}
+
+impl SledChunkService {
+    pub fn new(p: PathBuf) -> Result<Self, sled::Error> {
+        let config = sled::Config::default().use_compression(true).path(p);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+}
+
+impl ChunkService for SledChunkService {
+    #[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
+    fn has(&self, digest: &[u8]) -> Result<bool, Error> {
+        match self.db.get(digest) {
+            Ok(None) => Ok(false),
+            Ok(Some(_)) => Ok(true),
+            Err(e) => Err(Error::StorageError(e.to_string())),
+        }
+    }
+
+    #[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))]
+    fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error> {
+        match self.db.get(digest) {
+            Ok(None) => Ok(None),
+            Ok(Some(data)) => {
+                // calculate the hash to verify this is really what we expect
+                let actual_digest = blake3::hash(&data).as_bytes().to_vec();
+                if actual_digest != digest {
+                    return Err(Error::StorageError(format!(
+                        "invalid hash encountered when reading chunk, expected {}, got {}",
+                        BASE64.encode(digest),
+                        BASE64.encode(&actual_digest),
+                    )));
+                }
+                Ok(Some(Vec::from(&*data)))
+            }
+            Err(e) => Err(Error::StorageError(e.to_string())),
+        }
+    }
+
+    #[instrument(name = "SledChunkService::put", skip(self, data))]
+    fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error> {
+        let digest = blake3::hash(&data).as_bytes().to_vec();
+        let result = self.db.insert(&digest, data);
+        if let Err(e) = result {
+            return Err(Error::StorageError(e.to_string()));
+        }
+        Ok(digest)
+    }
+}
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index 2c8e8610daf6..e4d770c312a9 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -2,6 +2,7 @@ pub mod client;
 
 mod errors;
 
+pub mod chunkservice;
 pub mod proto;
 
 pub mod dummy_blob_service;