Diffstat (limited to 'tvix/store/src/blobservice/memory.rs')
-rw-r--r-- | tvix/store/src/blobservice/memory.rs | 87
1 files changed, 58 insertions, 29 deletions
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs
index 57c86551e4de..9a796ca2c0c8 100644
--- a/tvix/store/src/blobservice/memory.rs
+++ b/tvix/store/src/blobservice/memory.rs
@@ -1,51 +1,80 @@
 use data_encoding::BASE64;
+use std::io::Cursor;
 use std::{
     collections::HashMap,
     sync::{Arc, RwLock},
 };
-use tracing::instrument;
+use tracing::{instrument, warn};
 
-use crate::{proto, Error};
+use super::{BlobService, BlobWriter};
+use crate::Error;
 
-use super::BlobService;
+// type B3Digest = [u8; 32];
+// struct B3Digest ([u8; 32]);
 
 #[derive(Clone, Default)]
 pub struct MemoryBlobService {
-    db: Arc<RwLock<HashMap<Vec<u8>, proto::BlobMeta>>>,
+    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
 }
 
 impl BlobService for MemoryBlobService {
-    #[instrument(skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))]
-    fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> {
-        if req.include_bao {
-            todo!("not implemented yet")
-        }
+    type BlobReader = Cursor<Vec<u8>>;
+    type BlobWriter = MemoryBlobWriter;
 
+    #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))]
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
         let db = self.db.read().unwrap();
-        // if include_chunks is also false, the user only wants to know if the
-        // blob is present at all.
-        if !req.include_chunks {
-            Ok(if db.contains_key(&req.digest) {
-                Some(proto::BlobMeta::default())
-            } else {
-                None
-            })
-        } else {
-            match db.get(&req.digest) {
-                None => Ok(None),
-                Some(blob_meta) => Ok(Some(blob_meta.clone())),
-            }
+        Ok(db.contains_key(digest))
+    }
+
+    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
+        let db = self.db.read().unwrap();
+
+        Ok(db.get(digest).map(|x| Cursor::new(x.clone())))
+    }
+
+    #[instrument(skip(self))]
+    fn open_write(&self) -> Result<Self::BlobWriter, Error> {
+        Ok(MemoryBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct MemoryBlobWriter {
+    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
+
+    buf: Vec<u8>,
+}
+
+impl MemoryBlobWriter {
+    fn new(db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>) -> Self {
+        Self {
+            buf: Vec::new(),
+            db,
         }
     }
+}
+impl std::io::Write for MemoryBlobWriter {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.buf.write(buf)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.buf.flush()
+    }
+}
 
-    #[instrument(skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))]
-    fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> {
-        let mut db = self.db.write().unwrap();
+impl BlobWriter for MemoryBlobWriter {
+    fn close(self) -> Result<[u8; 32], Error> {
+        // in this memory implementation, we don't actually bother hashing
+        // incrementally while writing, but do it at the end.
+        let mut hasher = blake3::Hasher::new();
+        hasher.update(&self.buf);
+        let digest: [u8; 32] = hasher.finalize().into();
 
-        db.insert(blob_digest.to_vec(), blob_meta);
+        // open the database for writing.
+        let mut db = self.db.write()?;
+        db.insert(digest, self.buf);
 
-        Ok(())
-        // TODO: make sure all callers make sure the chunks exist.
-        // TODO: where should we calculate the bao?
+        Ok(digest)
     }
 }
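For context, a minimal usage sketch of the new interface (illustrative only, not part of this change): it assumes MemoryBlobService and the BlobService and BlobWriter traits from this module are in scope, and uses expect() purely for brevity.

use std::io::{Read, Write};

fn main() {
    let svc = MemoryBlobService::default();

    // Open a writer, feed it bytes via std::io::Write, then close() it to
    // obtain the BLAKE3 digest under which the contents were stored.
    let mut writer = svc.open_write().expect("open_write failed");
    writer.write_all(b"hello tvix").expect("write failed");
    let digest = writer.close().expect("close failed"); // [u8; 32]

    // The blob is now visible via has() and readable via open_read().
    assert!(svc.has(&digest).expect("has failed"));
    let mut reader = svc
        .open_read(&digest)
        .expect("open_read failed")
        .expect("blob should exist");
    let mut contents = Vec::new();
    reader.read_to_end(&mut contents).expect("read failed");
    assert_eq!(contents, b"hello tvix".to_vec());
}

As the in-diff comment notes, this memory implementation buffers the whole blob in the writer and only hashes it at close(), rather than hashing incrementally while bytes are written.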