diff options
author | Ilan Joselevich <personal@ilanjoselevich.com> | 2024-07-26T20·46+0200 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-08-01T13·20+0000 |
commit | 41dc9ee6a2eddb9d378b12e6fdf28336499bff83 (patch) | |
tree | adeb7be66eeb628d2c765d10c836b6929be844e0 /tvix/castore/src/directoryservice/redb.rs | |
parent | 87d4b00ff54b5726c7e26ed456f0c5285e506a6b (diff) |
feat(tvix/castore): add RedbDirectoryService r/8437
This provides a DirectoryService implementation which uses redb (https://github.com/cberner/redb) as the database. It provides both in-memory and persistent on-filesystem implementations. Change-Id: Id8f7c812e2cf401cccd1c382b19907b17a6887bc Reviewed-on: https://cl.tvl.fyi/c/depot/+/12038 Tested-by: BuildkiteCI Autosubmit: Ilan Joselevich <personal@ilanjoselevich.com> Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/castore/src/directoryservice/redb.rs')
-rw-r--r-- | tvix/castore/src/directoryservice/redb.rs | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs new file mode 100644 index 000000000000..51dc87f92574 --- /dev/null +++ b/tvix/castore/src/directoryservice/redb.rs @@ -0,0 +1,308 @@ +use futures::stream::BoxStream; +use prost::Message; +use redb::{Database, TableDefinition}; +use std::{path::PathBuf, sync::Arc}; +use tonic::async_trait; +use tracing::{instrument, warn}; + +use crate::{ + composition::{CompositionContext, ServiceBuilder}, + digests, proto, B3Digest, Error, +}; + +use super::{ + traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, +}; + +const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> = + TableDefinition::new("directory"); + +#[derive(Clone)] +pub struct RedbDirectoryService { + // We wrap the db in an Arc to be able to move it into spawn_blocking, + // as discussed in https://github.com/cberner/redb/issues/789 + db: Arc<Database>, +} + +impl RedbDirectoryService { + /// Constructs a new instance using the specified filesystem path for + /// storage. + pub async fn new(path: PathBuf) -> Result<Self, Error> { + if path == PathBuf::from("/") { + return Err(Error::StorageError( + "cowardly refusing to open / with redb".to_string(), + )); + } + + let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> { + let db = redb::Database::create(path)?; + create_schema(&db)?; + Ok(db) + }) + .await??; + + Ok(Self { db: Arc::new(db) }) + } + + /// Constructs a new instance using the in-memory backend. + pub fn new_temporary() -> Result<Self, Error> { + let db = + redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?; + + create_schema(&db)?; + + Ok(Self { db: Arc::new(db) }) + } +} + +/// Ensures all tables are present. +/// Opens a write transaction and calls open_table on DIRECTORY_TABLE, which will +/// create it if not present. +fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { + let txn = db.begin_write()?; + txn.open_table(DIRECTORY_TABLE)?; + txn.commit()?; + + Ok(()) +} + +#[async_trait] +impl DirectoryService for RedbDirectoryService { + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + let db = self.db.clone(); + + // Retrieves the protobuf-encoded Directory for the corresponding digest. + let db_get_resp = tokio::task::spawn_blocking({ + let digest_as_array: [u8; digests::B3_LEN] = digest.to_owned().into(); + move || -> Result<_, redb::Error> { + let txn = db.begin_read()?; + let table = txn.open_table(DIRECTORY_TABLE)?; + Ok(table.get(digest_as_array)?) + } + }) + .await? + .map_err(|e| { + warn!(err=%e, "failed to retrieve Directory"); + Error::StorageError("failed to retrieve Directory".to_string()) + })?; + + // The Directory was not found, return None. + let directory_data = match db_get_resp { + None => return Ok(None), + Some(d) => d, + }; + + // We check that the digest of the retrieved Directory matches the expected digest. + let actual_digest = blake3::hash(directory_data.value().as_slice()); + if actual_digest.as_bytes() != digest.as_slice() { + warn!(directory.actual_digest=%actual_digest, "requested Directory got the wrong digest"); + return Err(Error::StorageError( + "requested Directory got the wrong digest".to_string(), + )); + } + + // Attempt to decode the retrieved protobuf-encoded Directory, returning a parsing error if + // the decoding failed. + let directory = match proto::Directory::decode(&*directory_data.value()) { + Ok(dir) => { + // The returned Directory must be valid. + if let Err(e) = dir.validate() { + warn!(err=%e, "Directory failed validation"); + return Err(Error::StorageError( + "Directory failed validation".to_string(), + )); + } + dir + } + Err(e) => { + warn!(err=%e, "failed to parse Directory"); + return Err(Error::StorageError("failed to parse Directory".to_string())); + } + }; + + Ok(Some(directory)) + } + + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + tokio::task::spawn_blocking({ + let db = self.db.clone(); + move || { + let digest = directory.digest(); + + // Validate the directory. + if let Err(e) = directory.validate() { + warn!(err=%e, "Directory failed validation"); + return Err(Error::StorageError( + "Directory failed validation".to_string(), + )); + } + + // Store the directory in the table. + let txn = db.begin_write()?; + { + let mut table = txn.open_table(DIRECTORY_TABLE)?; + let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into(); + table.insert(digest_as_array, directory.encode_to_vec())?; + } + txn.commit()?; + + Ok(digest) + } + }) + .await? + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> BoxStream<'static, Result<proto::Directory, Error>> { + // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single + // redb transaction to avoid constantly closing and opening new transactions for the + // database. + traverse_directory(self.clone(), root_directory_digest) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> { + Box::new(RedbDirectoryPutter { + db: self.db.clone(), + directory_validator: Some(Default::default()), + }) + } +} + +pub struct RedbDirectoryPutter { + db: Arc<Database>, + + /// The directories (inside the directory validator) that we insert later, + /// or None, if they were already inserted. + directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>, +} + +#[async_trait] +impl DirectoryPutter for RedbDirectoryPutter { + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + match self.directory_validator { + None => return Err(Error::StorageError("already closed".to_string())), + Some(ref mut validator) => { + validator + .add(directory) + .map_err(|e| Error::StorageError(e.to_string()))?; + } + } + + Ok(()) + } + + #[instrument(level = "trace", skip_all, ret, err)] + async fn close(&mut self) -> Result<B3Digest, Error> { + match self.directory_validator.take() { + None => Err(Error::StorageError("already closed".to_string())), + Some(validator) => { + // Insert all directories as a batch. + tokio::task::spawn_blocking({ + let txn = self.db.begin_write()?; + move || { + // Retrieve the validated directories. + let directories = validator + .validate() + .map_err(|e| Error::StorageError(e.to_string()))? + .drain_leaves_to_root() + .collect::<Vec<_>>(); + + // Get the root digest, which is at the end (cf. insertion order) + let root_digest = directories + .last() + .ok_or_else(|| Error::StorageError("got no directories".to_string()))? + .digest(); + + { + let mut table = txn.open_table(DIRECTORY_TABLE)?; + + // Looping over all the verified directories, queuing them up for a + // batch insertion. + for directory in directories { + let digest_as_array: [u8; digests::B3_LEN] = + directory.digest().into(); + table.insert(digest_as_array, directory.encode_to_vec())?; + } + } + + txn.commit()?; + + Ok(root_digest) + } + }) + .await? + } + } + } +} + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct RedbDirectoryServiceConfig { + is_temporary: bool, + #[serde(default)] + /// required when is_temporary = false + path: Option<PathBuf>, +} + +impl TryFrom<url::Url> for RedbDirectoryServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // redb doesn't support host, and a path can be provided (otherwise + // it'll live in memory only). + if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string()).into()); + } + + Ok(if url.path().is_empty() { + RedbDirectoryServiceConfig { + is_temporary: true, + path: None, + } + } else { + RedbDirectoryServiceConfig { + is_temporary: false, + path: Some(url.path().into()), + } + }) + } +} + +#[async_trait] +impl ServiceBuilder for RedbDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + match self { + RedbDirectoryServiceConfig { + is_temporary: true, + path: None, + } => Ok(Arc::new(RedbDirectoryService::new_temporary()?)), + RedbDirectoryServiceConfig { + is_temporary: true, + path: Some(_), + } => Err(Error::StorageError( + "Temporary RedbDirectoryService can not have path".into(), + ) + .into()), + RedbDirectoryServiceConfig { + is_temporary: false, + path: None, + } => Err(Error::StorageError("RedbDirectoryService is missing path".into()).into()), + RedbDirectoryServiceConfig { + is_temporary: false, + path: Some(path), + } => Ok(Arc::new(RedbDirectoryService::new(path.into()).await?)), + } + } +} |