about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice/sled.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/blobservice/sled.rs')
-rw-r--r--tvix/castore/src/blobservice/sled.rs150
1 files changed, 0 insertions, 150 deletions
diff --git a/tvix/castore/src/blobservice/sled.rs b/tvix/castore/src/blobservice/sled.rs
deleted file mode 100644
index 3dd4bff7bc8e..000000000000
--- a/tvix/castore/src/blobservice/sled.rs
+++ /dev/null
@@ -1,150 +0,0 @@
-use super::{BlobReader, BlobService, BlobWriter};
-use crate::{B3Digest, Error};
-use std::{
-    io::{self, Cursor, Write},
-    path::Path,
-    task::Poll,
-};
-use tonic::async_trait;
-use tracing::instrument;
-
-#[derive(Clone)]
-pub struct SledBlobService {
-    db: sled::Db,
-}
-
-impl SledBlobService {
-    pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> {
-        let config = sled::Config::default()
-            .use_compression(false) // is a required parameter
-            .path(p);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-
-    pub fn new_temporary() -> Result<Self, sled::Error> {
-        let config = sled::Config::default().temporary(true);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-}
-
-#[async_trait]
-impl BlobService for SledBlobService {
-    #[instrument(skip(self), fields(blob.digest=%digest))]
-    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
-        match self.db.contains_key(digest.as_slice()) {
-            Ok(has) => Ok(has),
-            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
-        }
-    }
-
-    #[instrument(skip(self), fields(blob.digest=%digest))]
-    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
-        match self.db.get(digest.as_slice()) {
-            Ok(None) => Ok(None),
-            Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))),
-            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
-        }
-    }
-
-    #[instrument(skip(self))]
-    async fn open_write(&self) -> Box<dyn BlobWriter> {
-        Box::new(SledBlobWriter::new(self.db.clone()))
-    }
-}
-
-pub struct SledBlobWriter {
-    db: sled::Db,
-
-    /// Contains the buffer Vec and hasher, or None if already closed
-    writers: Option<(Vec<u8>, blake3::Hasher)>,
-
-    /// The digest that has been returned, if we successfully closed.
-    digest: Option<B3Digest>,
-}
-
-impl SledBlobWriter {
-    pub fn new(db: sled::Db) -> Self {
-        Self {
-            db,
-            writers: Some((Vec::new(), blake3::Hasher::new())),
-            digest: None,
-        }
-    }
-}
-
-impl tokio::io::AsyncWrite for SledBlobWriter {
-    fn poll_write(
-        mut self: std::pin::Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-        b: &[u8],
-    ) -> std::task::Poll<Result<usize, io::Error>> {
-        Poll::Ready(match &mut self.writers {
-            None => Err(io::Error::new(
-                io::ErrorKind::NotConnected,
-                "already closed",
-            )),
-            Some((ref mut buf, ref mut hasher)) => {
-                let bytes_written = buf.write(b)?;
-                hasher.write(&b[..bytes_written])
-            }
-        })
-    }
-
-    fn poll_flush(
-        mut self: std::pin::Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Result<(), io::Error>> {
-        Poll::Ready(match &mut self.writers {
-            None => Err(io::Error::new(
-                io::ErrorKind::NotConnected,
-                "already closed",
-            )),
-            Some(_) => Ok(()),
-        })
-    }
-
-    fn poll_shutdown(
-        self: std::pin::Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Result<(), io::Error>> {
-        // shutdown is "instantaneous", we only write to a Vec<u8> as buffer.
-        Poll::Ready(Ok(()))
-    }
-}
-
-#[async_trait]
-impl BlobWriter for SledBlobWriter {
-    async fn close(&mut self) -> io::Result<B3Digest> {
-        if self.writers.is_none() {
-            match &self.digest {
-                Some(digest) => Ok(digest.clone()),
-                None => Err(io::Error::new(
-                    io::ErrorKind::NotConnected,
-                    "already closed",
-                )),
-            }
-        } else {
-            let (buf, hasher) = self.writers.take().unwrap();
-
-            let digest: B3Digest = hasher.finalize().as_bytes().into();
-
-            // Only insert if the blob doesn't already exist.
-            if !self.db.contains_key(digest.as_slice()).map_err(|e| {
-                Error::StorageError(format!("Unable to check if we have blob {}: {}", digest, e))
-            })? {
-                // put buf in there. This will move buf out.
-                self.db
-                    .insert(digest.as_slice(), buf)
-                    .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?;
-            }
-
-            self.digest = Some(digest.clone());
-
-            Ok(digest)
-        }
-    }
-}