diff options
Diffstat (limited to 'tvix/castore/src')
-rw-r--r-- | tvix/castore/src/digests.rs | 11 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/from_addr.rs | 17 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/mod.rs | 3 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/redb.rs | 308 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/tests/mod.rs | 1 |
5 files changed, 340 insertions, 0 deletions
diff --git a/tvix/castore/src/digests.rs b/tvix/castore/src/digests.rs index 2311c95c4ddc..ef9a7326b3fb 100644 --- a/tvix/castore/src/digests.rs +++ b/tvix/castore/src/digests.rs @@ -26,6 +26,11 @@ impl From<B3Digest> for bytes::Bytes { } } +impl From<blake3::Hash> for B3Digest { + fn from(value: blake3::Hash) -> Self { + Self(Bytes::copy_from_slice(value.as_bytes())) + } +} impl From<digest::Output<blake3::Hasher>> for B3Digest { fn from(value: digest::Output<blake3::Hasher>) -> Self { let v = Into::<[u8; B3_LEN]>::into(value); @@ -67,6 +72,12 @@ impl From<&[u8; B3_LEN]> for B3Digest { } } +impl From<B3Digest> for [u8; B3_LEN] { + fn from(value: B3Digest) -> Self { + value.0.to_vec().try_into().unwrap() + } +} + impl Clone for B3Digest { fn clone(&self) -> Self { Self(self.0.to_owned()) diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs index bc63f129fe9e..3feb8f3509fe 100644 --- a/tvix/castore/src/directoryservice/from_addr.rs +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -18,6 +18,11 @@ use super::DirectoryService; /// - `sled:///absolute/path/to/somewhere` /// Uses sled, using a path on the disk for persistency. Can be only opened /// from one process at the same time. +/// - `redb:` +/// Uses a in-memory redb implementation. +/// - `redb:///absolute/path/to/somewhere` +/// Uses redb, using a path on the disk for persistency. Can be only opened +/// from one process at the same time. /// - `grpc+unix:///absolute/path/to/somewhere` /// Connects to a local tvix-store gRPC service via Unix socket. /// - `grpc+http://host:port`, `grpc+https://host:port` @@ -52,6 +57,8 @@ mod tests { lazy_static! { static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap(); + static ref TMPDIR_REDB_1: TempDir = TempDir::new().unwrap(); + static ref TMPDIR_REDB_2: TempDir = TempDir::new().unwrap(); } #[rstest] @@ -75,6 +82,16 @@ mod tests { #[case::memory_invalid_root_path("memory:///", false)] /// This sets a memory url path to "/foo", which is invalid. #[case::memory_invalid_root_path_foo("memory:///foo", false)] + /// This configures redb in temporary mode. + #[case::redb_valid_temporary("redb://", true)] + /// This configures redb with /, which should fail. + #[case::redb_invalid_root("redb:///", false)] + /// This configures redb with a host, not path, which should fail. + #[case::redb_invalid_host("redb://foo.example", false)] + /// This configures redb with a valid path, which should succeed. + #[case::redb_valid_path(&format!("redb://{}", &TMPDIR_REDB_1.path().join("foo").to_str().unwrap()), true)] + /// This configures redb with a host, and a valid path path, which should fail. + #[case::redb_invalid_host_with_valid_path(&format!("redb://foo.example{}", &TMPDIR_REDB_2.path().join("bar").to_str().unwrap()), false)] /// Correct scheme to connect to a unix socket. #[case::grpc_valid_unix_socket("grpc+unix:///path/to/somewhere", true)] /// Correct scheme for unix socket, but setting a host too, which is invalid. diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index ffd9ea3636ec..17a78b179349 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -10,6 +10,7 @@ mod grpc; mod memory; mod object_store; mod order_validator; +mod redb; mod simple_putter; mod sled; #[cfg(test)] @@ -24,6 +25,7 @@ pub use self::grpc::{GRPCDirectoryService, GRPCDirectoryServiceConfig}; pub use self::memory::{MemoryDirectoryService, MemoryDirectoryServiceConfig}; pub use self::object_store::{ObjectStoreDirectoryService, ObjectStoreDirectoryServiceConfig}; pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; +pub use self::redb::{RedbDirectoryService, RedbDirectoryServiceConfig}; pub use self::simple_putter::SimplePutter; pub use self::sled::{SledDirectoryService, SledDirectoryServiceConfig}; pub use self::traverse::descend_to; @@ -137,6 +139,7 @@ pub(crate) fn register_directory_services(reg: &mut Registry) { reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::CacheConfig>("cache"); reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::GRPCDirectoryServiceConfig>("grpc"); reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::SledDirectoryServiceConfig>("sled"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::RedbDirectoryServiceConfig>("redb"); #[cfg(feature = "cloud")] { reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable"); diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs new file mode 100644 index 000000000000..51dc87f92574 --- /dev/null +++ b/tvix/castore/src/directoryservice/redb.rs @@ -0,0 +1,308 @@ +use futures::stream::BoxStream; +use prost::Message; +use redb::{Database, TableDefinition}; +use std::{path::PathBuf, sync::Arc}; +use tonic::async_trait; +use tracing::{instrument, warn}; + +use crate::{ + composition::{CompositionContext, ServiceBuilder}, + digests, proto, B3Digest, Error, +}; + +use super::{ + traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, +}; + +const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> = + TableDefinition::new("directory"); + +#[derive(Clone)] +pub struct RedbDirectoryService { + // We wrap the db in an Arc to be able to move it into spawn_blocking, + // as discussed in https://github.com/cberner/redb/issues/789 + db: Arc<Database>, +} + +impl RedbDirectoryService { + /// Constructs a new instance using the specified filesystem path for + /// storage. + pub async fn new(path: PathBuf) -> Result<Self, Error> { + if path == PathBuf::from("/") { + return Err(Error::StorageError( + "cowardly refusing to open / with redb".to_string(), + )); + } + + let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> { + let db = redb::Database::create(path)?; + create_schema(&db)?; + Ok(db) + }) + .await??; + + Ok(Self { db: Arc::new(db) }) + } + + /// Constructs a new instance using the in-memory backend. + pub fn new_temporary() -> Result<Self, Error> { + let db = + redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?; + + create_schema(&db)?; + + Ok(Self { db: Arc::new(db) }) + } +} + +/// Ensures all tables are present. +/// Opens a write transaction and calls open_table on DIRECTORY_TABLE, which will +/// create it if not present. +fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { + let txn = db.begin_write()?; + txn.open_table(DIRECTORY_TABLE)?; + txn.commit()?; + + Ok(()) +} + +#[async_trait] +impl DirectoryService for RedbDirectoryService { + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + let db = self.db.clone(); + + // Retrieves the protobuf-encoded Directory for the corresponding digest. + let db_get_resp = tokio::task::spawn_blocking({ + let digest_as_array: [u8; digests::B3_LEN] = digest.to_owned().into(); + move || -> Result<_, redb::Error> { + let txn = db.begin_read()?; + let table = txn.open_table(DIRECTORY_TABLE)?; + Ok(table.get(digest_as_array)?) + } + }) + .await? + .map_err(|e| { + warn!(err=%e, "failed to retrieve Directory"); + Error::StorageError("failed to retrieve Directory".to_string()) + })?; + + // The Directory was not found, return None. + let directory_data = match db_get_resp { + None => return Ok(None), + Some(d) => d, + }; + + // We check that the digest of the retrieved Directory matches the expected digest. + let actual_digest = blake3::hash(directory_data.value().as_slice()); + if actual_digest.as_bytes() != digest.as_slice() { + warn!(directory.actual_digest=%actual_digest, "requested Directory got the wrong digest"); + return Err(Error::StorageError( + "requested Directory got the wrong digest".to_string(), + )); + } + + // Attempt to decode the retrieved protobuf-encoded Directory, returning a parsing error if + // the decoding failed. + let directory = match proto::Directory::decode(&*directory_data.value()) { + Ok(dir) => { + // The returned Directory must be valid. + if let Err(e) = dir.validate() { + warn!(err=%e, "Directory failed validation"); + return Err(Error::StorageError( + "Directory failed validation".to_string(), + )); + } + dir + } + Err(e) => { + warn!(err=%e, "failed to parse Directory"); + return Err(Error::StorageError("failed to parse Directory".to_string())); + } + }; + + Ok(Some(directory)) + } + + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + tokio::task::spawn_blocking({ + let db = self.db.clone(); + move || { + let digest = directory.digest(); + + // Validate the directory. + if let Err(e) = directory.validate() { + warn!(err=%e, "Directory failed validation"); + return Err(Error::StorageError( + "Directory failed validation".to_string(), + )); + } + + // Store the directory in the table. + let txn = db.begin_write()?; + { + let mut table = txn.open_table(DIRECTORY_TABLE)?; + let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into(); + table.insert(digest_as_array, directory.encode_to_vec())?; + } + txn.commit()?; + + Ok(digest) + } + }) + .await? + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> BoxStream<'static, Result<proto::Directory, Error>> { + // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single + // redb transaction to avoid constantly closing and opening new transactions for the + // database. + traverse_directory(self.clone(), root_directory_digest) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> { + Box::new(RedbDirectoryPutter { + db: self.db.clone(), + directory_validator: Some(Default::default()), + }) + } +} + +pub struct RedbDirectoryPutter { + db: Arc<Database>, + + /// The directories (inside the directory validator) that we insert later, + /// or None, if they were already inserted. + directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>, +} + +#[async_trait] +impl DirectoryPutter for RedbDirectoryPutter { + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + match self.directory_validator { + None => return Err(Error::StorageError("already closed".to_string())), + Some(ref mut validator) => { + validator + .add(directory) + .map_err(|e| Error::StorageError(e.to_string()))?; + } + } + + Ok(()) + } + + #[instrument(level = "trace", skip_all, ret, err)] + async fn close(&mut self) -> Result<B3Digest, Error> { + match self.directory_validator.take() { + None => Err(Error::StorageError("already closed".to_string())), + Some(validator) => { + // Insert all directories as a batch. + tokio::task::spawn_blocking({ + let txn = self.db.begin_write()?; + move || { + // Retrieve the validated directories. + let directories = validator + .validate() + .map_err(|e| Error::StorageError(e.to_string()))? + .drain_leaves_to_root() + .collect::<Vec<_>>(); + + // Get the root digest, which is at the end (cf. insertion order) + let root_digest = directories + .last() + .ok_or_else(|| Error::StorageError("got no directories".to_string()))? + .digest(); + + { + let mut table = txn.open_table(DIRECTORY_TABLE)?; + + // Looping over all the verified directories, queuing them up for a + // batch insertion. + for directory in directories { + let digest_as_array: [u8; digests::B3_LEN] = + directory.digest().into(); + table.insert(digest_as_array, directory.encode_to_vec())?; + } + } + + txn.commit()?; + + Ok(root_digest) + } + }) + .await? + } + } + } +} + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct RedbDirectoryServiceConfig { + is_temporary: bool, + #[serde(default)] + /// required when is_temporary = false + path: Option<PathBuf>, +} + +impl TryFrom<url::Url> for RedbDirectoryServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // redb doesn't support host, and a path can be provided (otherwise + // it'll live in memory only). + if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string()).into()); + } + + Ok(if url.path().is_empty() { + RedbDirectoryServiceConfig { + is_temporary: true, + path: None, + } + } else { + RedbDirectoryServiceConfig { + is_temporary: false, + path: Some(url.path().into()), + } + }) + } +} + +#[async_trait] +impl ServiceBuilder for RedbDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + match self { + RedbDirectoryServiceConfig { + is_temporary: true, + path: None, + } => Ok(Arc::new(RedbDirectoryService::new_temporary()?)), + RedbDirectoryServiceConfig { + is_temporary: true, + path: Some(_), + } => Err(Error::StorageError( + "Temporary RedbDirectoryService can not have path".into(), + ) + .into()), + RedbDirectoryServiceConfig { + is_temporary: false, + path: None, + } => Err(Error::StorageError("RedbDirectoryService is missing path".into()).into()), + RedbDirectoryServiceConfig { + is_temporary: false, + path: Some(path), + } => Ok(Arc::new(RedbDirectoryService::new(path.into()).await?)), + } + } +} diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index cb79bc61dbd9..b698f70ea469 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -26,6 +26,7 @@ use self::utils::make_grpc_directory_service_client; #[case::grpc(make_grpc_directory_service_client().await)] #[case::memory(directoryservice::from_addr("memory://").await.unwrap())] #[case::sled(directoryservice::from_addr("sled://").await.unwrap())] +#[case::redb(directoryservice::from_addr("redb://").await.unwrap())] #[case::objectstore(directoryservice::from_addr("objectstore+memory://").await.unwrap())] #[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))] pub fn directory_services(#[case] directory_service: impl DirectoryService) {} |