about summary refs log tree commit diff
path: root/tvix/store/src/blobservice
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-05-11T12·49+0300
committerflokli <flokli@flokli.de>2023-05-11T14·27+0000
commit616fa4476f93e1782e68dc713e9e8cb77a426c7d (patch)
treef76a43e95c75d848d079706fbccfd442210ebebc /tvix/store/src/blobservice
parentb22b685f0b2524c088deacbf4e80e7b7c73b5afc (diff)
refactor(tvix/store): remove ChunkService r/6133
Whether chunking is involved or not, is an implementation detail of each
Blobstore. Consumers of a whole blob shouldn't need to worry about that.
It currently is not visible in the gRPC interface either. It
shouldn't bleed into everything.

Let the BlobService trait provide `open_read` and `open_write` methods,
which return handles providing io::Read or io::Write, and leave the
details up to the implementation.

This means, our custom BlobReader module can go away, and all the
chunking bits in there, too.

In the future, we might still want to add more chunking-aware syncing,
but as a syncing strategy some stores can expose, not as a fundamental
protocol component.

This currently needs "SyncReadIntoAsyncRead", taken and vendored in from
https://github.com/tokio-rs/tokio/pull/5669.
It provides a AsyncRead for a sync Read, which is necessary to connect
our (sync) BlobReader interface to a GRPC server implementation.

As an alternative, we could also make the BlobReader itself async, and
let consumers of the trait (EvalIO) deal with the async-ness, but this
is less of a change for now.

In terms of vendoring, I initially tried to move our tokio crate to
these commits, but ended up in version incompatibilities, so let's
vendor it in for now.

Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix/store/src/blobservice')
-rw-r--r--tvix/store/src/blobservice/memory.rs87
-rw-r--r--tvix/store/src/blobservice/mod.rs37
-rw-r--r--tvix/store/src/blobservice/sled.rs103
3 files changed, 150 insertions, 77 deletions
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs
index 57c86551e4..9a796ca2c0 100644
--- a/tvix/store/src/blobservice/memory.rs
+++ b/tvix/store/src/blobservice/memory.rs
@@ -1,51 +1,80 @@
 use data_encoding::BASE64;
+use std::io::Cursor;
 use std::{
     collections::HashMap,
     sync::{Arc, RwLock},
 };
-use tracing::instrument;
+use tracing::{instrument, warn};
 
-use crate::{proto, Error};
+use super::{BlobService, BlobWriter};
+use crate::Error;
 
-use super::BlobService;
+// type B3Digest = [u8; 32];
+// struct B3Digest ([u8; 32]);
 
 #[derive(Clone, Default)]
 pub struct MemoryBlobService {
-    db: Arc<RwLock<HashMap<Vec<u8>, proto::BlobMeta>>>,
+    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
 }
 
 impl BlobService for MemoryBlobService {
-    #[instrument(skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))]
-    fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> {
-        if req.include_bao {
-            todo!("not implemented yet")
-        }
+    type BlobReader = Cursor<Vec<u8>>;
+    type BlobWriter = MemoryBlobWriter;
 
+    #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))]
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
         let db = self.db.read().unwrap();
-        // if include_chunks is also false, the user only wants to know if the
-        // blob is present at all.
-        if !req.include_chunks {
-            Ok(if db.contains_key(&req.digest) {
-                Some(proto::BlobMeta::default())
-            } else {
-                None
-            })
-        } else {
-            match db.get(&req.digest) {
-                None => Ok(None),
-                Some(blob_meta) => Ok(Some(blob_meta.clone())),
-            }
+        Ok(db.contains_key(digest))
+    }
+
+    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
+        let db = self.db.read().unwrap();
+
+        Ok(db.get(digest).map(|x| Cursor::new(x.clone())))
+    }
+
+    #[instrument(skip(self))]
+    fn open_write(&self) -> Result<Self::BlobWriter, Error> {
+        Ok(MemoryBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct MemoryBlobWriter {
+    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
+
+    buf: Vec<u8>,
+}
+
+impl MemoryBlobWriter {
+    fn new(db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>) -> Self {
+        Self {
+            buf: Vec::new(),
+            db,
         }
     }
+}
+impl std::io::Write for MemoryBlobWriter {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.buf.write(buf)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.buf.flush()
+    }
+}
 
-    #[instrument(skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))]
-    fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> {
-        let mut db = self.db.write().unwrap();
+impl BlobWriter for MemoryBlobWriter {
+    fn close(self) -> Result<[u8; 32], Error> {
+        // in this memory implementation, we don't actually bother hashing
+        // incrementally while writing, but do it at the end.
+        let mut hasher = blake3::Hasher::new();
+        hasher.update(&self.buf);
+        let digest: [u8; 32] = hasher.finalize().into();
 
-        db.insert(blob_digest.to_vec(), blob_meta);
+        // open the database for writing.
+        let mut db = self.db.write()?;
+        db.insert(digest, self.buf);
 
-        Ok(())
-        // TODO: make sure all callers make sure the chunks exist.
-        // TODO: where should we calculate the bao?
+        Ok(digest)
     }
 }
diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs
index 53e941795e..e97ccdf335 100644
--- a/tvix/store/src/blobservice/mod.rs
+++ b/tvix/store/src/blobservice/mod.rs
@@ -1,4 +1,6 @@
-use crate::{proto, Error};
+use std::io;
+
+use crate::Error;
 
 mod memory;
 mod sled;
@@ -7,14 +9,31 @@ pub use self::memory::MemoryBlobService;
 pub use self::sled::SledBlobService;
 
 /// The base trait all BlobService services need to implement.
-/// It provides information about how a blob is chunked,
-/// and allows creating new blobs by creating a BlobMeta (referring to chunks
-/// in a [crate::chunkservice::ChunkService]).
+/// It provides functions to check whether a given blob exists,
+/// a way to get a [io::Read] to a blob, and a method to initiate writing a new
+/// Blob, which returns a [BlobWriter], that can be used
 pub trait BlobService {
-    /// Retrieve chunking information for a given blob
-    fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error>;
+    type BlobReader: io::Read + Send + std::marker::Unpin;
+    type BlobWriter: BlobWriter + Send;
+
+    /// Check if the service has the blob, by its content hash.
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
+
+    /// Request a blob from the store, by its content hash. Returns a Option<BlobReader>.
+    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error>;
+
+    /// Insert a new blob into the store. Returns a [BlobWriter], which
+    /// implements [io::Write] and a [BlobWriter::close].
+    /// TODO: is there any reason we want this to be a Result<>, and not just T?
+    fn open_write(&self) -> Result<Self::BlobWriter, Error>;
+}
 
-    /// Insert chunking information for a given blob.
-    /// Implementations SHOULD make sure chunks referred do exist.
-    fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error>;
+/// A [io::Write] that you need to close() afterwards, and get back the digest
+/// of the written blob.
+pub trait BlobWriter: io::Write {
+    /// Signal there's no more data to be written, and return the digest of the
+    /// contents written.
+    ///
+    /// This consumes self, so it's not possible to close twice.
+    fn close(self) -> Result<[u8; 32], Error>;
 }
diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs
index 1ae34ee5fb..c229f799de 100644
--- a/tvix/store/src/blobservice/sled.rs
+++ b/tvix/store/src/blobservice/sled.rs
@@ -1,13 +1,13 @@
-use std::path::PathBuf;
+use std::{
+    io::{self, Cursor},
+    path::PathBuf,
+};
 
+use super::{BlobService, BlobWriter};
+use crate::Error;
 use data_encoding::BASE64;
-use prost::Message;
 use tracing::instrument;
 
-use crate::{proto, Error};
-
-use super::BlobService;
-
 #[derive(Clone)]
 pub struct SledBlobService {
     db: sled::Db,
@@ -30,44 +30,69 @@ impl SledBlobService {
 }
 
 impl BlobService for SledBlobService {
-    #[instrument(name = "SledBlobService::stat", skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))]
-    fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> {
-        if req.include_bao {
-            todo!("not implemented yet")
+    type BlobReader = Cursor<Vec<u8>>;
+    type BlobWriter = SledBlobWriter;
+
+    #[instrument(name = "SledBlobService::has", skip(self), fields(blob.digest=BASE64.encode(digest)))]
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
+        match self.db.contains_key(digest) {
+            Ok(has) => Ok(has),
+            Err(e) => Err(Error::StorageError(e.to_string())),
         }
+    }
 
-        // if include_chunks is also false, the user only wants to know if the
-        // blob is present at all.
-        if !req.include_chunks {
-            match self.db.contains_key(&req.digest) {
-                Ok(false) => Ok(None),
-                Ok(true) => Ok(Some(proto::BlobMeta::default())),
-                Err(e) => Err(Error::StorageError(e.to_string())),
-            }
-        } else {
-            match self.db.get(&req.digest) {
-                Ok(None) => Ok(None),
-                Ok(Some(data)) => match proto::BlobMeta::decode(&*data) {
-                    Ok(blob_meta) => Ok(Some(blob_meta)),
-                    Err(e) => Err(Error::StorageError(format!(
-                        "unable to parse blobmeta message for blob {}: {}",
-                        BASE64.encode(&req.digest),
-                        e
-                    ))),
-                },
-                Err(e) => Err(Error::StorageError(e.to_string())),
-            }
+    #[instrument(name = "SledBlobService::open_read", skip(self), fields(blob.digest=BASE64.encode(digest)))]
+    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
+        match self.db.get(digest) {
+            Ok(None) => Ok(None),
+            Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))),
+            Err(e) => Err(Error::StorageError(e.to_string())),
         }
     }
 
-    #[instrument(name = "SledBlobService::put", skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))]
-    fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> {
-        let result = self.db.insert(blob_digest, blob_meta.encode_to_vec());
-        if let Err(e) = result {
-            return Err(Error::StorageError(e.to_string()));
+    #[instrument(name = "SledBlobService::open_write", skip(self))]
+    fn open_write(&self) -> Result<Self::BlobWriter, Error> {
+        Ok(SledBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct SledBlobWriter {
+    db: sled::Db,
+    buf: Vec<u8>,
+    hasher: blake3::Hasher,
+}
+
+impl SledBlobWriter {
+    pub fn new(db: sled::Db) -> Self {
+        Self {
+            buf: Vec::default(),
+            db,
+            hasher: blake3::Hasher::new(),
         }
-        Ok(())
-        // TODO: make sure all callers make sure the chunks exist.
-        // TODO: where should we calculate the bao?
+    }
+}
+
+impl io::Write for SledBlobWriter {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        let bytes_written = self.buf.write(buf)?;
+        self.hasher.write(&buf[..bytes_written])
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.buf.flush()
+    }
+}
+
+impl BlobWriter for SledBlobWriter {
+    fn close(self) -> Result<[u8; 32], Error> {
+        let digest = self.hasher.finalize();
+        self.db.insert(digest.as_bytes(), self.buf).map_err(|e| {
+            Error::StorageError(format!("unable to insert blob: {}", e.to_string()))
+        })?;
+
+        Ok(digest
+            .to_owned()
+            .try_into()
+            .map_err(|_| Error::StorageError("invalid digest length in response".to_string()))?)
     }
 }