diff options
Diffstat (limited to 'tvix/store/src/blobservice')
-rw-r--r-- | tvix/store/src/blobservice/memory.rs | 87 | ||||
-rw-r--r-- | tvix/store/src/blobservice/mod.rs | 37 | ||||
-rw-r--r-- | tvix/store/src/blobservice/sled.rs | 103 |
3 files changed, 150 insertions, 77 deletions
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index 57c86551e4de..9a796ca2c0c8 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -1,51 +1,80 @@ use data_encoding::BASE64; +use std::io::Cursor; use std::{ collections::HashMap, sync::{Arc, RwLock}, }; -use tracing::instrument; +use tracing::{instrument, warn}; -use crate::{proto, Error}; +use super::{BlobService, BlobWriter}; +use crate::Error; -use super::BlobService; +// type B3Digest = [u8; 32]; +// struct B3Digest ([u8; 32]); #[derive(Clone, Default)] pub struct MemoryBlobService { - db: Arc<RwLock<HashMap<Vec<u8>, proto::BlobMeta>>>, + db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>, } impl BlobService for MemoryBlobService { - #[instrument(skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))] - fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> { - if req.include_bao { - todo!("not implemented yet") - } + type BlobReader = Cursor<Vec<u8>>; + type BlobWriter = MemoryBlobWriter; + #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))] + fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> { let db = self.db.read().unwrap(); - // if include_chunks is also false, the user only wants to know if the - // blob is present at all. - if !req.include_chunks { - Ok(if db.contains_key(&req.digest) { - Some(proto::BlobMeta::default()) - } else { - None - }) - } else { - match db.get(&req.digest) { - None => Ok(None), - Some(blob_meta) => Ok(Some(blob_meta.clone())), - } + Ok(db.contains_key(digest)) + } + + fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> { + let db = self.db.read().unwrap(); + + Ok(db.get(digest).map(|x| Cursor::new(x.clone()))) + } + + #[instrument(skip(self))] + fn open_write(&self) -> Result<Self::BlobWriter, Error> { + Ok(MemoryBlobWriter::new(self.db.clone())) + } +} + +pub struct MemoryBlobWriter { + db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>, + + buf: Vec<u8>, +} + +impl MemoryBlobWriter { + fn new(db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>) -> Self { + Self { + buf: Vec::new(), + db, } } +} +impl std::io::Write for MemoryBlobWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + self.buf.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.buf.flush() + } +} - #[instrument(skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))] - fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> { - let mut db = self.db.write().unwrap(); +impl BlobWriter for MemoryBlobWriter { + fn close(self) -> Result<[u8; 32], Error> { + // in this memory implementation, we don't actually bother hashing + // incrementally while writing, but do it at the end. + let mut hasher = blake3::Hasher::new(); + hasher.update(&self.buf); + let digest: [u8; 32] = hasher.finalize().into(); - db.insert(blob_digest.to_vec(), blob_meta); + // open the database for writing. + let mut db = self.db.write()?; + db.insert(digest, self.buf); - Ok(()) - // TODO: make sure all callers make sure the chunks exist. - // TODO: where should we calculate the bao? + Ok(digest) } } diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs index 53e941795e7e..e97ccdf335a0 100644 --- a/tvix/store/src/blobservice/mod.rs +++ b/tvix/store/src/blobservice/mod.rs @@ -1,4 +1,6 @@ -use crate::{proto, Error}; +use std::io; + +use crate::Error; mod memory; mod sled; @@ -7,14 +9,31 @@ pub use self::memory::MemoryBlobService; pub use self::sled::SledBlobService; /// The base trait all BlobService services need to implement. -/// It provides information about how a blob is chunked, -/// and allows creating new blobs by creating a BlobMeta (referring to chunks -/// in a [crate::chunkservice::ChunkService]). +/// It provides functions to check whether a given blob exists, +/// a way to get a [io::Read] to a blob, and a method to initiate writing a new +/// Blob, which returns a [BlobWriter], that can be used pub trait BlobService { - /// Retrieve chunking information for a given blob - fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error>; + type BlobReader: io::Read + Send + std::marker::Unpin; + type BlobWriter: BlobWriter + Send; + + /// Check if the service has the blob, by its content hash. + fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>; + + /// Request a blob from the store, by its content hash. Returns a Option<BlobReader>. + fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error>; + + /// Insert a new blob into the store. Returns a [BlobWriter], which + /// implements [io::Write] and a [BlobWriter::close]. + /// TODO: is there any reason we want this to be a Result<>, and not just T? + fn open_write(&self) -> Result<Self::BlobWriter, Error>; +} - /// Insert chunking information for a given blob. - /// Implementations SHOULD make sure chunks referred do exist. - fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error>; +/// A [io::Write] that you need to close() afterwards, and get back the digest +/// of the written blob. +pub trait BlobWriter: io::Write { + /// Signal there's no more data to be written, and return the digest of the + /// contents written. + /// + /// This consumes self, so it's not possible to close twice. + fn close(self) -> Result<[u8; 32], Error>; } diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 1ae34ee5fb9c..c229f799de98 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -1,13 +1,13 @@ -use std::path::PathBuf; +use std::{ + io::{self, Cursor}, + path::PathBuf, +}; +use super::{BlobService, BlobWriter}; +use crate::Error; use data_encoding::BASE64; -use prost::Message; use tracing::instrument; -use crate::{proto, Error}; - -use super::BlobService; - #[derive(Clone)] pub struct SledBlobService { db: sled::Db, @@ -30,44 +30,69 @@ impl SledBlobService { } impl BlobService for SledBlobService { - #[instrument(name = "SledBlobService::stat", skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))] - fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> { - if req.include_bao { - todo!("not implemented yet") + type BlobReader = Cursor<Vec<u8>>; + type BlobWriter = SledBlobWriter; + + #[instrument(name = "SledBlobService::has", skip(self), fields(blob.digest=BASE64.encode(digest)))] + fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> { + match self.db.contains_key(digest) { + Ok(has) => Ok(has), + Err(e) => Err(Error::StorageError(e.to_string())), } + } - // if include_chunks is also false, the user only wants to know if the - // blob is present at all. - if !req.include_chunks { - match self.db.contains_key(&req.digest) { - Ok(false) => Ok(None), - Ok(true) => Ok(Some(proto::BlobMeta::default())), - Err(e) => Err(Error::StorageError(e.to_string())), - } - } else { - match self.db.get(&req.digest) { - Ok(None) => Ok(None), - Ok(Some(data)) => match proto::BlobMeta::decode(&*data) { - Ok(blob_meta) => Ok(Some(blob_meta)), - Err(e) => Err(Error::StorageError(format!( - "unable to parse blobmeta message for blob {}: {}", - BASE64.encode(&req.digest), - e - ))), - }, - Err(e) => Err(Error::StorageError(e.to_string())), - } + #[instrument(name = "SledBlobService::open_read", skip(self), fields(blob.digest=BASE64.encode(digest)))] + fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> { + match self.db.get(digest) { + Ok(None) => Ok(None), + Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))), + Err(e) => Err(Error::StorageError(e.to_string())), } } - #[instrument(name = "SledBlobService::put", skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))] - fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> { - let result = self.db.insert(blob_digest, blob_meta.encode_to_vec()); - if let Err(e) = result { - return Err(Error::StorageError(e.to_string())); + #[instrument(name = "SledBlobService::open_write", skip(self))] + fn open_write(&self) -> Result<Self::BlobWriter, Error> { + Ok(SledBlobWriter::new(self.db.clone())) + } +} + +pub struct SledBlobWriter { + db: sled::Db, + buf: Vec<u8>, + hasher: blake3::Hasher, +} + +impl SledBlobWriter { + pub fn new(db: sled::Db) -> Self { + Self { + buf: Vec::default(), + db, + hasher: blake3::Hasher::new(), } - Ok(()) - // TODO: make sure all callers make sure the chunks exist. - // TODO: where should we calculate the bao? + } +} + +impl io::Write for SledBlobWriter { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + let bytes_written = self.buf.write(buf)?; + self.hasher.write(&buf[..bytes_written]) + } + + fn flush(&mut self) -> io::Result<()> { + self.buf.flush() + } +} + +impl BlobWriter for SledBlobWriter { + fn close(self) -> Result<[u8; 32], Error> { + let digest = self.hasher.finalize(); + self.db.insert(digest.as_bytes(), self.buf).map_err(|e| { + Error::StorageError(format!("unable to insert blob: {}", e.to_string())) + })?; + + Ok(digest + .to_owned() + .try_into() + .map_err(|_| Error::StorageError("invalid digest length in response".to_string()))?) } } |