diff options
Diffstat (limited to 'tvix/store/src/blobservice/sled.rs')
-rw-r--r-- | tvix/store/src/blobservice/sled.rs | 81 |
1 files changed, 58 insertions, 23 deletions
diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 2b090335344d..3b130fcd9a05 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -28,9 +28,6 @@ impl SledBlobService { } impl BlobService for SledBlobService { - type BlobReader = Cursor<Vec<u8>>; - type BlobWriter = SledBlobWriter; - #[instrument(skip(self), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result<bool, Error> { match self.db.contains_key(digest.to_vec()) { @@ -40,55 +37,93 @@ impl BlobService for SledBlobService { } #[instrument(skip(self), fields(blob.digest=%digest))] - fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> { + fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error> { match self.db.get(digest.to_vec()) { Ok(None) => Ok(None), - Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))), + Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))), Err(e) => Err(Error::StorageError(e.to_string())), } } #[instrument(skip(self))] - fn open_write(&self) -> Result<Self::BlobWriter, Error> { - Ok(SledBlobWriter::new(self.db.clone())) + fn open_write(&self) -> Result<Box<dyn BlobWriter>, Error> { + Ok(Box::new(SledBlobWriter::new(self.db.clone()))) } } pub struct SledBlobWriter { db: sled::Db, - buf: Vec<u8>, - hasher: blake3::Hasher, + + /// Contains the Vec and hasher, or None if already closed + writers: Option<(Vec<u8>, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, } impl SledBlobWriter { pub fn new(db: sled::Db) -> Self { Self { - buf: Vec::default(), db, - hasher: blake3::Hasher::new(), + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, } } } impl io::Write for SledBlobWriter { - fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - let bytes_written = self.buf.write(buf)?; - self.hasher.write(&buf[..bytes_written]) + fn write(&mut self, b: &[u8]) -> io::Result<usize> { + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&buf[..bytes_written]) + } + } } fn flush(&mut self) -> io::Result<()> { - self.buf.flush() + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + } } } impl BlobWriter for SledBlobWriter { - fn close(self) -> Result<B3Digest, Error> { - let digest = self.hasher.finalize(); - self.db - .insert(digest.as_bytes(), self.buf) - .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; - - // We know self.hasher is doing blake3 hashing, so this won't fail. - Ok(B3Digest::from_vec(digest.as_bytes().to_vec()).unwrap()) + fn close(&mut self) -> Result<B3Digest, Error> { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + // We know self.hasher is doing blake3 hashing, so this won't fail. + let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + + // Only insert if the blob doesn't already exist. + if !self.db.contains_key(&digest.to_vec()).map_err(|e| { + Error::StorageError(format!("Unable to check if we have blob {}: {}", digest, e)) + })? { + // put buf in there. This will move buf out. + self.db + .insert(digest.to_vec(), buf) + .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } } } |