about summary refs log tree commit diff
path: root/tvix/store/src/blobservice/memory.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/blobservice/memory.rs')
-rw-r--r--tvix/store/src/blobservice/memory.rs87
1 files changed, 58 insertions, 29 deletions
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs
index 57c86551e4de..9a796ca2c0c8 100644
--- a/tvix/store/src/blobservice/memory.rs
+++ b/tvix/store/src/blobservice/memory.rs
@@ -1,51 +1,80 @@
 use data_encoding::BASE64;
+use std::io::Cursor;
 use std::{
     collections::HashMap,
     sync::{Arc, RwLock},
 };
-use tracing::instrument;
+use tracing::{instrument, warn};
 
-use crate::{proto, Error};
+use super::{BlobService, BlobWriter};
+use crate::Error;
 
-use super::BlobService;
+// type B3Digest = [u8; 32];
+// struct B3Digest ([u8; 32]);
 
 #[derive(Clone, Default)]
 pub struct MemoryBlobService {
-    db: Arc<RwLock<HashMap<Vec<u8>, proto::BlobMeta>>>,
+    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
 }
 
 impl BlobService for MemoryBlobService {
-    #[instrument(skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))]
-    fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> {
-        if req.include_bao {
-            todo!("not implemented yet")
-        }
+    type BlobReader = Cursor<Vec<u8>>;
+    type BlobWriter = MemoryBlobWriter;
 
+    #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))]
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
         let db = self.db.read().unwrap();
-        // if include_chunks is also false, the user only wants to know if the
-        // blob is present at all.
-        if !req.include_chunks {
-            Ok(if db.contains_key(&req.digest) {
-                Some(proto::BlobMeta::default())
-            } else {
-                None
-            })
-        } else {
-            match db.get(&req.digest) {
-                None => Ok(None),
-                Some(blob_meta) => Ok(Some(blob_meta.clone())),
-            }
+        Ok(db.contains_key(digest))
+    }
+
+    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
+        let db = self.db.read().unwrap();
+
+        Ok(db.get(digest).map(|x| Cursor::new(x.clone())))
+    }
+
+    #[instrument(skip(self))]
+    fn open_write(&self) -> Result<Self::BlobWriter, Error> {
+        Ok(MemoryBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct MemoryBlobWriter {
+    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
+
+    buf: Vec<u8>,
+}
+
+impl MemoryBlobWriter {
+    fn new(db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>) -> Self {
+        Self {
+            buf: Vec::new(),
+            db,
         }
     }
+}
+impl std::io::Write for MemoryBlobWriter {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.buf.write(buf)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.buf.flush()
+    }
+}
 
-    #[instrument(skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))]
-    fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> {
-        let mut db = self.db.write().unwrap();
+impl BlobWriter for MemoryBlobWriter {
+    fn close(self) -> Result<[u8; 32], Error> {
+        // in this memory implementation, we don't actually bother hashing
+        // incrementally while writing, but do it at the end.
+        let mut hasher = blake3::Hasher::new();
+        hasher.update(&self.buf);
+        let digest: [u8; 32] = hasher.finalize().into();
 
-        db.insert(blob_digest.to_vec(), blob_meta);
+        // open the database for writing.
+        let mut db = self.db.write()?;
+        db.insert(digest, self.buf);
 
-        Ok(())
-        // TODO: make sure all callers make sure the chunks exist.
-        // TODO: where should we calculate the bao?
+        Ok(digest)
     }
 }