about summary refs log tree commit diff
path: root/tvix/store/src/blobservice
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/blobservice')
-rw-r--r--tvix/store/src/blobservice/memory.rs87
-rw-r--r--tvix/store/src/blobservice/mod.rs37
-rw-r--r--tvix/store/src/blobservice/sled.rs103
3 files changed, 150 insertions, 77 deletions
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs
index 57c86551e4..9a796ca2c0 100644
--- a/tvix/store/src/blobservice/memory.rs
+++ b/tvix/store/src/blobservice/memory.rs
@@ -1,51 +1,80 @@
 use data_encoding::BASE64;
+use std::io::Cursor;
 use std::{
     collections::HashMap,
     sync::{Arc, RwLock},
 };
-use tracing::instrument;
+use tracing::{instrument, warn};
 
-use crate::{proto, Error};
+use super::{BlobService, BlobWriter};
+use crate::Error;
 
-use super::BlobService;
+// type B3Digest = [u8; 32];
+// struct B3Digest ([u8; 32]);
 
 #[derive(Clone, Default)]
 pub struct MemoryBlobService {
-    db: Arc<RwLock<HashMap<Vec<u8>, proto::BlobMeta>>>,
+    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
 }
 
 impl BlobService for MemoryBlobService {
-    #[instrument(skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))]
-    fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> {
-        if req.include_bao {
-            todo!("not implemented yet")
-        }
+    type BlobReader = Cursor<Vec<u8>>;
+    type BlobWriter = MemoryBlobWriter;
 
+    #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))]
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
         let db = self.db.read().unwrap();
-        // if include_chunks is also false, the user only wants to know if the
-        // blob is present at all.
-        if !req.include_chunks {
-            Ok(if db.contains_key(&req.digest) {
-                Some(proto::BlobMeta::default())
-            } else {
-                None
-            })
-        } else {
-            match db.get(&req.digest) {
-                None => Ok(None),
-                Some(blob_meta) => Ok(Some(blob_meta.clone())),
-            }
+        Ok(db.contains_key(digest))
+    }
+
+    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
+        let db = self.db.read().unwrap();
+
+        Ok(db.get(digest).map(|x| Cursor::new(x.clone())))
+    }
+
+    #[instrument(skip(self))]
+    fn open_write(&self) -> Result<Self::BlobWriter, Error> {
+        Ok(MemoryBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct MemoryBlobWriter {
+    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
+
+    buf: Vec<u8>,
+}
+
+impl MemoryBlobWriter {
+    fn new(db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>) -> Self {
+        Self {
+            buf: Vec::new(),
+            db,
         }
     }
+}
+impl std::io::Write for MemoryBlobWriter {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.buf.write(buf)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.buf.flush()
+    }
+}
 
-    #[instrument(skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))]
-    fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> {
-        let mut db = self.db.write().unwrap();
+impl BlobWriter for MemoryBlobWriter {
+    fn close(self) -> Result<[u8; 32], Error> {
+        // in this memory implementation, we don't actually bother hashing
+        // incrementally while writing, but do it at the end.
+        let mut hasher = blake3::Hasher::new();
+        hasher.update(&self.buf);
+        let digest: [u8; 32] = hasher.finalize().into();
 
-        db.insert(blob_digest.to_vec(), blob_meta);
+        // open the database for writing.
+        let mut db = self.db.write()?;
+        db.insert(digest, self.buf);
 
-        Ok(())
-        // TODO: make sure all callers make sure the chunks exist.
-        // TODO: where should we calculate the bao?
+        Ok(digest)
     }
 }
diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs
index 53e941795e..e97ccdf335 100644
--- a/tvix/store/src/blobservice/mod.rs
+++ b/tvix/store/src/blobservice/mod.rs
@@ -1,4 +1,6 @@
-use crate::{proto, Error};
+use std::io;
+
+use crate::Error;
 
 mod memory;
 mod sled;
@@ -7,14 +9,31 @@ pub use self::memory::MemoryBlobService;
 pub use self::sled::SledBlobService;
 
 /// The base trait all BlobService services need to implement.
-/// It provides information about how a blob is chunked,
-/// and allows creating new blobs by creating a BlobMeta (referring to chunks
-/// in a [crate::chunkservice::ChunkService]).
+/// It provides functions to check whether a given blob exists,
+/// a way to get an [io::Read] to a blob, and a method to initiate writing a new
+/// blob, which returns a [BlobWriter] that can be used to stream the contents.
 pub trait BlobService {
-    /// Retrieve chunking information for a given blob
-    fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error>;
+    type BlobReader: io::Read + Send + std::marker::Unpin;
+    type BlobWriter: BlobWriter + Send;
+
+    /// Check if the service has the blob, by its content hash.
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
+
+    /// Request a blob from the store, by its content hash. Returns an Option<BlobReader>.
+    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error>;
+
+    /// Insert a new blob into the store. Returns a [BlobWriter], which
+    /// implements [io::Write] and a [BlobWriter::close].
+    /// TODO: is there any reason we want this to be a Result<>, and not just T?
+    fn open_write(&self) -> Result<Self::BlobWriter, Error>;
+}
 
-    /// Insert chunking information for a given blob.
-    /// Implementations SHOULD make sure chunks referred do exist.
-    fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error>;
+/// A [io::Write] that you need to close() afterwards, and get back the digest
+/// of the written blob.
+pub trait BlobWriter: io::Write {
+    /// Signal there's no more data to be written, and return the digest of the
+    /// contents written.
+    ///
+    /// This consumes self, so it's not possible to close twice.
+    fn close(self) -> Result<[u8; 32], Error>;
 }
diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs
index 1ae34ee5fb..c229f799de 100644
--- a/tvix/store/src/blobservice/sled.rs
+++ b/tvix/store/src/blobservice/sled.rs
@@ -1,13 +1,13 @@
-use std::path::PathBuf;
+use std::{
+    io::{self, Cursor},
+    path::PathBuf,
+};
 
+use super::{BlobService, BlobWriter};
+use crate::Error;
 use data_encoding::BASE64;
-use prost::Message;
 use tracing::instrument;
 
-use crate::{proto, Error};
-
-use super::BlobService;
-
 #[derive(Clone)]
 pub struct SledBlobService {
     db: sled::Db,
@@ -30,44 +30,69 @@ impl SledBlobService {
 }
 
 impl BlobService for SledBlobService {
-    #[instrument(name = "SledBlobService::stat", skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))]
-    fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> {
-        if req.include_bao {
-            todo!("not implemented yet")
+    type BlobReader = Cursor<Vec<u8>>;
+    type BlobWriter = SledBlobWriter;
+
+    #[instrument(name = "SledBlobService::has", skip(self), fields(blob.digest=BASE64.encode(digest)))]
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
+        match self.db.contains_key(digest) {
+            Ok(has) => Ok(has),
+            Err(e) => Err(Error::StorageError(e.to_string())),
         }
+    }
 
-        // if include_chunks is also false, the user only wants to know if the
-        // blob is present at all.
-        if !req.include_chunks {
-            match self.db.contains_key(&req.digest) {
-                Ok(false) => Ok(None),
-                Ok(true) => Ok(Some(proto::BlobMeta::default())),
-                Err(e) => Err(Error::StorageError(e.to_string())),
-            }
-        } else {
-            match self.db.get(&req.digest) {
-                Ok(None) => Ok(None),
-                Ok(Some(data)) => match proto::BlobMeta::decode(&*data) {
-                    Ok(blob_meta) => Ok(Some(blob_meta)),
-                    Err(e) => Err(Error::StorageError(format!(
-                        "unable to parse blobmeta message for blob {}: {}",
-                        BASE64.encode(&req.digest),
-                        e
-                    ))),
-                },
-                Err(e) => Err(Error::StorageError(e.to_string())),
-            }
+    #[instrument(name = "SledBlobService::open_read", skip(self), fields(blob.digest=BASE64.encode(digest)))]
+    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
+        match self.db.get(digest) {
+            Ok(None) => Ok(None),
+            Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))),
+            Err(e) => Err(Error::StorageError(e.to_string())),
         }
     }
 
-    #[instrument(name = "SledBlobService::put", skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))]
-    fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> {
-        let result = self.db.insert(blob_digest, blob_meta.encode_to_vec());
-        if let Err(e) = result {
-            return Err(Error::StorageError(e.to_string()));
+    #[instrument(name = "SledBlobService::open_write", skip(self))]
+    fn open_write(&self) -> Result<Self::BlobWriter, Error> {
+        Ok(SledBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct SledBlobWriter {
+    db: sled::Db,
+    buf: Vec<u8>,
+    hasher: blake3::Hasher,
+}
+
+impl SledBlobWriter {
+    pub fn new(db: sled::Db) -> Self {
+        Self {
+            buf: Vec::default(),
+            db,
+            hasher: blake3::Hasher::new(),
         }
-        Ok(())
-        // TODO: make sure all callers make sure the chunks exist.
-        // TODO: where should we calculate the bao?
+    }
+}
+
+impl io::Write for SledBlobWriter {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        let bytes_written = self.buf.write(buf)?;
+        self.hasher.write(&buf[..bytes_written])
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.buf.flush()
+    }
+}
+
+impl BlobWriter for SledBlobWriter {
+    fn close(self) -> Result<[u8; 32], Error> {
+        let digest = self.hasher.finalize();
+        self.db.insert(digest.as_bytes(), self.buf).map_err(|e| {
+            Error::StorageError(format!("unable to insert blob: {}", e.to_string()))
+        })?;
+
+        Ok(digest
+            .to_owned()
+            .try_into()
+            .map_err(|_| Error::StorageError("invalid digest length in response".to_string()))?)
     }
 }