diff options
Diffstat (limited to 'tvix/castore/src')
-rw-r--r-- | tvix/castore/src/blobservice/from_addr.rs | 43 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/mod.rs | 2 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/sled.rs | 150 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/tests/mod.rs | 1 |
4 files changed, 1 insertions, 195 deletions
diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs index 3e3f943e5931..8898bbfb95ce 100644 --- a/tvix/castore/src/blobservice/from_addr.rs +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -2,15 +2,12 @@ use url::Url; use crate::{proto::blob_service_client::BlobServiceClient, Error}; -use super::{ - BlobService, GRPCBlobService, MemoryBlobService, ObjectStoreBlobService, SledBlobService, -}; +use super::{BlobService, GRPCBlobService, MemoryBlobService, ObjectStoreBlobService}; /// Constructs a new instance of a [BlobService] from an URI. /// /// The following schemes are supported by the following services: /// - `memory://` ([MemoryBlobService]) -/// - `sled://` ([SledBlobService]) /// - `grpc+*://` ([GRPCBlobService]) /// - `objectstore+*://` ([ObjectStoreBlobService]) /// @@ -27,27 +24,6 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error> } Box::<MemoryBlobService>::default() } - "sled" => { - // sled doesn't support host, and a path can be provided (otherwise - // it'll live in memory only). - if url.has_host() { - return Err(Error::StorageError("no host allowed".to_string())); - } - - if url.path() == "/" { - return Err(Error::StorageError( - "cowardly refusing to open / with sled".to_string(), - )); - } - - // TODO: expose other parameters as URL parameters? - - Box::new(if url.path().is_empty() { - SledBlobService::new_temporary().map_err(|e| Error::StorageError(e.to_string()))? - } else { - SledBlobService::new(url.path()).map_err(|e| Error::StorageError(e.to_string()))? - }) - } scheme if scheme.starts_with("grpc+") => { // schemes starting with grpc+ go to the GRPCPathInfoService. // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. @@ -83,28 +59,11 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error> #[cfg(test)] mod tests { use super::from_addr; - use lazy_static::lazy_static; use rstest::rstest; - use tempfile::TempDir; - - lazy_static! { - static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); - static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap(); - } #[rstest] /// This uses an unsupported scheme. #[case::unsupported_scheme("http://foo.example/test", false)] - /// This configures sled in temporary mode. - #[case::sled_temporary("sled://", true)] - /// This configures sled with /, which should fail. - #[case::sled_invalid_root("sled:///", false)] - /// This configures sled with a host, not path, which should fail. - #[case::sled_invalid_host("sled://foo.example", false)] - /// This configures sled with a valid path path, which should succeed. - #[case::sled_valid_path(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true)] - /// This configures sled with a host, and a valid path path, which should fail. - #[case::sled_invalid_host_with_valid_path(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false)] /// This correctly sets the scheme, and doesn't set a path. #[case::memory_valid("memory://", true)] /// This sets a memory url host to `foo` diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs index 4ba56a4af731..50acd40bf769 100644 --- a/tvix/castore/src/blobservice/mod.rs +++ b/tvix/castore/src/blobservice/mod.rs @@ -11,7 +11,6 @@ mod grpc; mod memory; mod naive_seeker; mod object_store; -mod sled; #[cfg(test)] pub mod tests; @@ -22,7 +21,6 @@ pub use self::from_addr::from_addr; pub use self::grpc::GRPCBlobService; pub use self::memory::MemoryBlobService; pub use self::object_store::ObjectStoreBlobService; -pub use self::sled::SledBlobService; /// The base trait all BlobService services need to implement. /// It provides functions to check whether a given blob exists, diff --git a/tvix/castore/src/blobservice/sled.rs b/tvix/castore/src/blobservice/sled.rs deleted file mode 100644 index 3dd4bff7bc8e..000000000000 --- a/tvix/castore/src/blobservice/sled.rs +++ /dev/null @@ -1,150 +0,0 @@ -use super::{BlobReader, BlobService, BlobWriter}; -use crate::{B3Digest, Error}; -use std::{ - io::{self, Cursor, Write}, - path::Path, - task::Poll, -}; -use tonic::async_trait; -use tracing::instrument; - -#[derive(Clone)] -pub struct SledBlobService { - db: sled::Db, -} - -impl SledBlobService { - pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> { - let config = sled::Config::default() - .use_compression(false) // is a required parameter - .path(p); - let db = config.open()?; - - Ok(Self { db }) - } - - pub fn new_temporary() -> Result<Self, sled::Error> { - let config = sled::Config::default().temporary(true); - let db = config.open()?; - - Ok(Self { db }) - } -} - -#[async_trait] -impl BlobService for SledBlobService { - #[instrument(skip(self), fields(blob.digest=%digest))] - async fn has(&self, digest: &B3Digest) -> io::Result<bool> { - match self.db.contains_key(digest.as_slice()) { - Ok(has) => Ok(has), - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), - } - } - - #[instrument(skip(self), fields(blob.digest=%digest))] - async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { - match self.db.get(digest.as_slice()) { - Ok(None) => Ok(None), - Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))), - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), - } - } - - #[instrument(skip(self))] - async fn open_write(&self) -> Box<dyn BlobWriter> { - Box::new(SledBlobWriter::new(self.db.clone())) - } -} - -pub struct SledBlobWriter { - db: sled::Db, - - /// Contains the buffer Vec and hasher, or None if already closed - writers: Option<(Vec<u8>, blake3::Hasher)>, - - /// The digest that has been returned, if we successfully closed. - digest: Option<B3Digest>, -} - -impl SledBlobWriter { - pub fn new(db: sled::Db) -> Self { - Self { - db, - writers: Some((Vec::new(), blake3::Hasher::new())), - digest: None, - } - } -} - -impl tokio::io::AsyncWrite for SledBlobWriter { - fn poll_write( - mut self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - b: &[u8], - ) -> std::task::Poll<Result<usize, io::Error>> { - Poll::Ready(match &mut self.writers { - None => Err(io::Error::new( - io::ErrorKind::NotConnected, - "already closed", - )), - Some((ref mut buf, ref mut hasher)) => { - let bytes_written = buf.write(b)?; - hasher.write(&b[..bytes_written]) - } - }) - } - - fn poll_flush( - mut self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Result<(), io::Error>> { - Poll::Ready(match &mut self.writers { - None => Err(io::Error::new( - io::ErrorKind::NotConnected, - "already closed", - )), - Some(_) => Ok(()), - }) - } - - fn poll_shutdown( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Result<(), io::Error>> { - // shutdown is "instantaneous", we only write to a Vec<u8> as buffer. - Poll::Ready(Ok(())) - } -} - -#[async_trait] -impl BlobWriter for SledBlobWriter { - async fn close(&mut self) -> io::Result<B3Digest> { - if self.writers.is_none() { - match &self.digest { - Some(digest) => Ok(digest.clone()), - None => Err(io::Error::new( - io::ErrorKind::NotConnected, - "already closed", - )), - } - } else { - let (buf, hasher) = self.writers.take().unwrap(); - - let digest: B3Digest = hasher.finalize().as_bytes().into(); - - // Only insert if the blob doesn't already exist. - if !self.db.contains_key(digest.as_slice()).map_err(|e| { - Error::StorageError(format!("Unable to check if we have blob {}: {}", digest, e)) - })? { - // put buf in there. This will move buf out. - self.db - .insert(digest.as_slice(), buf) - .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; - } - - self.digest = Some(digest.clone()); - - Ok(digest) - } - } -} diff --git a/tvix/castore/src/blobservice/tests/mod.rs b/tvix/castore/src/blobservice/tests/mod.rs index 30c4e97634a4..0280faebb171 100644 --- a/tvix/castore/src/blobservice/tests/mod.rs +++ b/tvix/castore/src/blobservice/tests/mod.rs @@ -25,7 +25,6 @@ use self::utils::make_grpc_blob_service_client; #[case::grpc(make_grpc_blob_service_client().await)] #[case::memory(blobservice::from_addr("memory://").await.unwrap())] #[case::objectstore_memory(blobservice::from_addr("objectstore+memory://").await.unwrap())] -#[case::sled(blobservice::from_addr("sled://").await.unwrap())] pub fn blob_services(#[case] blob_service: impl BlobService) {} /// Using [BlobService::has] on a non-existing blob should return false. |