diff options
Diffstat (limited to 'tvix/store/src/blobservice')
-rw-r--r-- | tvix/store/src/blobservice/grpc.rs | 96 | ||||
-rw-r--r-- | tvix/store/src/blobservice/memory.rs | 89 | ||||
-rw-r--r-- | tvix/store/src/blobservice/mod.rs | 15 | ||||
-rw-r--r-- | tvix/store/src/blobservice/sled.rs | 81 |
4 files changed, 192 insertions, 89 deletions
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index 0b08fbf46ad9..46ec64bce785 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -47,9 +47,6 @@ impl GRPCBlobService { } impl BlobService for GRPCBlobService { - type BlobReader = Box<dyn io::Read + Send>; - type BlobWriter = GRPCBlobWriter; - #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. @@ -76,7 +73,10 @@ impl BlobService for GRPCBlobService { // On success, this returns a Ok(Some(io::Read)), which can be used to read // the contents of the Blob, identified by the digest. - fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, crate::Error> { + fn open_read( + &self, + digest: &B3Digest, + ) -> Result<Option<Box<dyn io::Read + Send>>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest = digest.clone(); @@ -123,7 +123,7 @@ impl BlobService for GRPCBlobService { /// Returns a [Self::BlobWriter], that'll internally wrap each write in a // [proto::BlobChunk] and which is passed to the - fn open_write(&self) -> Result<Self::BlobWriter, crate::Error> { + fn open_write(&self) -> Result<Box<dyn BlobWriter>, crate::Error> { let mut grpc_client = self.grpc_client.clone(); // set up an mpsc channel passing around Bytes. @@ -155,11 +155,11 @@ impl BlobService for GRPCBlobService { // … which is then turned into a [io::Write]. let writer = SyncIoBridge::new(async_writer); - Ok(GRPCBlobWriter { + Ok(Box::new(GRPCBlobWriter { tokio_handle: self.tokio_handle.clone(), // TODO: is the clone() ok here? - task, - inner_writer: writer, - }) + task_and_writer: Some((task, writer)), + digest: None, + })) } } @@ -176,42 +176,74 @@ pub struct GRPCBlobWriter { /// containing the put request. tokio_handle: tokio::runtime::Handle, - /// The task containing the put request. - task: JoinHandle<Result<proto::PutBlobResponse, Status>>, + /// The task containing the put request, and the inner writer, if we're still writing. + task_and_writer: Option<( + JoinHandle<Result<proto::PutBlobResponse, Status>>, + BridgedWriter, + )>, - /// The inner Writer. - inner_writer: BridgedWriter, + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, } impl BlobWriter for GRPCBlobWriter { - fn close(mut self) -> Result<B3Digest, crate::Error> { - // invoke shutdown, so the inner writer closes its internal tx side of - // the channel. - self.inner_writer - .shutdown() - .map_err(|e| crate::Error::StorageError(e.to_string()))?; - - // block on the RPC call to return. - // This ensures all chunks are sent out, and have been received by the - // backend. - match self.tokio_handle.block_on(self.task)? { - Ok(resp) => { - // return the digest from the response. - B3Digest::from_vec(resp.digest).map_err(|_| { - crate::Error::StorageError("invalid root digest length in response".to_string()) - }) + fn close(&mut self) -> Result<B3Digest, crate::Error> { + if self.task_and_writer.is_none() { + // if we're already closed, return the b3 digest, which must exist. + // If it doesn't, we already closed and failed once, and didn't handle the error. + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (task, mut writer) = self.task_and_writer.take().unwrap(); + + // invoke shutdown, so the inner writer closes its internal tx side of + // the channel. + writer + .shutdown() + .map_err(|e| crate::Error::StorageError(e.to_string()))?; + + // block on the RPC call to return. + // This ensures all chunks are sent out, and have been received by the + // backend. + match self.tokio_handle.block_on(task)? { + Ok(resp) => { + // return the digest from the response, and store it in self.digest for subsequent closes. + let digest = B3Digest::from_vec(resp.digest).map_err(|_| { + crate::Error::StorageError( + "invalid root digest length in response".to_string(), + ) + })?; + self.digest = Some(digest.clone()); + Ok(digest) + } + Err(e) => Err(crate::Error::StorageError(e.to_string())), } - Err(e) => Err(crate::Error::StorageError(e.to_string())), } } } impl io::Write for GRPCBlobWriter { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - self.inner_writer.write(buf) + match &mut self.task_and_writer { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((_, ref mut writer)) => writer.write(buf), + } } fn flush(&mut self) -> io::Result<()> { - self.inner_writer.flush() + match &mut self.task_and_writer { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((_, ref mut writer)) => writer.flush(), + } } } diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index 1ee59d108743..166eeabdb6a2 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -1,4 +1,4 @@ -use std::io::Cursor; +use std::io::{self, Cursor}; use std::{ collections::HashMap, sync::{Arc, RwLock}, @@ -14,63 +14,102 @@ pub struct MemoryBlobService { } impl BlobService for MemoryBlobService { - type BlobReader = Cursor<Vec<u8>>; - type BlobWriter = MemoryBlobWriter; - #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result<bool, Error> { let db = self.db.read().unwrap(); Ok(db.contains_key(digest)) } - fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> { + fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error> { let db = self.db.read().unwrap(); - Ok(db.get(digest).map(|x| Cursor::new(x.clone()))) + match db.get(digest).map(|x| Cursor::new(x.clone())) { + Some(result) => Ok(Some(Box::new(result))), + None => Ok(None), + } } #[instrument(skip(self))] - fn open_write(&self) -> Result<Self::BlobWriter, Error> { - Ok(MemoryBlobWriter::new(self.db.clone())) + fn open_write(&self) -> Result<Box<dyn BlobWriter>, Error> { + Ok(Box::new(MemoryBlobWriter::new(self.db.clone()))) } } pub struct MemoryBlobWriter { db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, - buf: Vec<u8>, + /// Contains the Vec and hasher, or None if already closed + writers: Option<(Vec<u8>, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, } impl MemoryBlobWriter { fn new(db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>) -> Self { Self { - buf: Vec::new(), db, + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, } } } impl std::io::Write for MemoryBlobWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { - self.buf.write(buf) + fn write(&mut self, b: &[u8]) -> std::io::Result<usize> { + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&buf[..bytes_written]) + } + } } fn flush(&mut self) -> std::io::Result<()> { - self.buf.flush() + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + } } } impl BlobWriter for MemoryBlobWriter { - fn close(self) -> Result<B3Digest, Error> { - // in this memory implementation, we don't actually bother hashing - // incrementally while writing, but do it at the end. - let mut hasher = blake3::Hasher::new(); - hasher.update(&self.buf); - let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); - - // open the database for writing. - let mut db = self.db.write()?; - db.insert(digest.clone(), self.buf); - - Ok(digest) + fn close(&mut self) -> Result<B3Digest, Error> { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + // We know self.hasher is doing blake3 hashing, so this won't fail. + let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + + // Only insert if the blob doesn't already exist. + let db = self.db.read()?; + if !db.contains_key(&digest) { + // drop the read lock, so we can open for writing. + drop(db); + + // open the database for writing. + let mut db = self.db.write()?; + + // and put buf in there. This will move buf out. + db.insert(digest.clone(), buf); + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } } } diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs index c5a2de124656..08565285c0d1 100644 --- a/tvix/store/src/blobservice/mod.rs +++ b/tvix/store/src/blobservice/mod.rs @@ -14,28 +14,25 @@ pub use self::sled::SledBlobService; /// It provides functions to check whether a given blob exists, /// a way to get a [io::Read] to a blob, and a method to initiate writing a new /// Blob, which returns a [BlobWriter], that can be used -pub trait BlobService { - type BlobReader: io::Read + Send + std::marker::Unpin; - type BlobWriter: BlobWriter + Send; - +pub trait BlobService: Send + Sync { /// Check if the service has the blob, by its content hash. fn has(&self, digest: &B3Digest) -> Result<bool, Error>; /// Request a blob from the store, by its content hash. Returns a Option<BlobReader>. - fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error>; + fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error>; /// Insert a new blob into the store. Returns a [BlobWriter], which /// implements [io::Write] and a [BlobWriter::close]. /// TODO: is there any reason we want this to be a Result<>, and not just T? - fn open_write(&self) -> Result<Self::BlobWriter, Error>; + fn open_write(&self) -> Result<Box<dyn BlobWriter>, Error>; } /// A [io::Write] that you need to close() afterwards, and get back the digest /// of the written blob. -pub trait BlobWriter: io::Write { +pub trait BlobWriter: io::Write + Send + Sync + 'static { /// Signal there's no more data to be written, and return the digest of the /// contents written. /// - /// This consumes self, so it's not possible to close twice. - fn close(self) -> Result<B3Digest, Error>; + /// Closing a already-closed BlobWriter is a no-op. + fn close(&mut self) -> Result<B3Digest, Error>; } diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 2b090335344d..3b130fcd9a05 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -28,9 +28,6 @@ impl SledBlobService { } impl BlobService for SledBlobService { - type BlobReader = Cursor<Vec<u8>>; - type BlobWriter = SledBlobWriter; - #[instrument(skip(self), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result<bool, Error> { match self.db.contains_key(digest.to_vec()) { @@ -40,55 +37,93 @@ impl BlobService for SledBlobService { } #[instrument(skip(self), fields(blob.digest=%digest))] - fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> { + fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error> { match self.db.get(digest.to_vec()) { Ok(None) => Ok(None), - Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))), + Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))), Err(e) => Err(Error::StorageError(e.to_string())), } } #[instrument(skip(self))] - fn open_write(&self) -> Result<Self::BlobWriter, Error> { - Ok(SledBlobWriter::new(self.db.clone())) + fn open_write(&self) -> Result<Box<dyn BlobWriter>, Error> { + Ok(Box::new(SledBlobWriter::new(self.db.clone()))) } } pub struct SledBlobWriter { db: sled::Db, - buf: Vec<u8>, - hasher: blake3::Hasher, + + /// Contains the Vec and hasher, or None if already closed + writers: Option<(Vec<u8>, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, } impl SledBlobWriter { pub fn new(db: sled::Db) -> Self { Self { - buf: Vec::default(), db, - hasher: blake3::Hasher::new(), + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, } } } impl io::Write for SledBlobWriter { - fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - let bytes_written = self.buf.write(buf)?; - self.hasher.write(&buf[..bytes_written]) + fn write(&mut self, b: &[u8]) -> io::Result<usize> { + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&buf[..bytes_written]) + } + } } fn flush(&mut self) -> io::Result<()> { - self.buf.flush() + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + } } } impl BlobWriter for SledBlobWriter { - fn close(self) -> Result<B3Digest, Error> { - let digest = self.hasher.finalize(); - self.db - .insert(digest.as_bytes(), self.buf) - .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; - - // We know self.hasher is doing blake3 hashing, so this won't fail. - Ok(B3Digest::from_vec(digest.as_bytes().to_vec()).unwrap()) + fn close(&mut self) -> Result<B3Digest, Error> { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + // We know self.hasher is doing blake3 hashing, so this won't fail. + let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + + // Only insert if the blob doesn't already exist. + if !self.db.contains_key(&digest.to_vec()).map_err(|e| { + Error::StorageError(format!("Unable to check if we have blob {}: {}", digest, e)) + })? { + // put buf in there. This will move buf out. + self.db + .insert(digest.to_vec(), buf) + .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } } } |