about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice/redb.rs
diff options
context:
space:
mode:
authorIlan Joselevich <personal@ilanjoselevich.com>2024-07-26T20·46+0200
committerclbot <clbot@tvl.fyi>2024-08-01T13·20+0000
commit41dc9ee6a2eddb9d378b12e6fdf28336499bff83 (patch)
treeadeb7be66eeb628d2c765d10c836b6929be844e0 /tvix/castore/src/directoryservice/redb.rs
parent87d4b00ff54b5726c7e26ed456f0c5285e506a6b (diff)
feat(tvix/castore): add RedbDirectoryService r/8437
This provides a DirectoryService implementation which uses
redb (https://github.com/cberner/redb) as the database. It provides both
in-memory and persistent on-filesystem implementations.

Change-Id: Id8f7c812e2cf401cccd1c382b19907b17a6887bc
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12038
Tested-by: BuildkiteCI
Autosubmit: Ilan Joselevich <personal@ilanjoselevich.com>
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/castore/src/directoryservice/redb.rs')
-rw-r--r--tvix/castore/src/directoryservice/redb.rs308
1 files changed, 308 insertions, 0 deletions
diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs
new file mode 100644
index 000000000000..51dc87f92574
--- /dev/null
+++ b/tvix/castore/src/directoryservice/redb.rs
@@ -0,0 +1,308 @@
+use futures::stream::BoxStream;
+use prost::Message;
+use redb::{Database, TableDefinition};
+use std::{path::PathBuf, sync::Arc};
+use tonic::async_trait;
+use tracing::{instrument, warn};
+
+use crate::{
+    composition::{CompositionContext, ServiceBuilder},
+    digests, proto, B3Digest, Error,
+};
+
+use super::{
+    traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
+};
+
+const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> =
+    TableDefinition::new("directory");
+
+#[derive(Clone)]
+pub struct RedbDirectoryService {
+    // We wrap the db in an Arc to be able to move it into spawn_blocking,
+    // as discussed in https://github.com/cberner/redb/issues/789
+    db: Arc<Database>,
+}
+
+impl RedbDirectoryService {
+    /// Constructs a new instance using the specified filesystem path for
+    /// storage.
+    pub async fn new(path: PathBuf) -> Result<Self, Error> {
+        if path == PathBuf::from("/") {
+            return Err(Error::StorageError(
+                "cowardly refusing to open / with redb".to_string(),
+            ));
+        }
+
+        let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
+            let db = redb::Database::create(path)?;
+            create_schema(&db)?;
+            Ok(db)
+        })
+        .await??;
+
+        Ok(Self { db: Arc::new(db) })
+    }
+
+    /// Constructs a new instance using the in-memory backend.
+    pub fn new_temporary() -> Result<Self, Error> {
+        let db =
+            redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
+
+        create_schema(&db)?;
+
+        Ok(Self { db: Arc::new(db) })
+    }
+}
+
+/// Ensures all tables are present.
+/// Opens a write transaction and calls open_table on DIRECTORY_TABLE, which will
+/// create it if not present.
+fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
+    let txn = db.begin_write()?;
+    txn.open_table(DIRECTORY_TABLE)?;
+    txn.commit()?;
+
+    Ok(())
+}
+
+#[async_trait]
+impl DirectoryService for RedbDirectoryService {
+    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
+    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+        let db = self.db.clone();
+
+        // Retrieves the protobuf-encoded Directory for the corresponding digest.
+        let db_get_resp = tokio::task::spawn_blocking({
+            let digest_as_array: [u8; digests::B3_LEN] = digest.to_owned().into();
+            move || -> Result<_, redb::Error> {
+                let txn = db.begin_read()?;
+                let table = txn.open_table(DIRECTORY_TABLE)?;
+                Ok(table.get(digest_as_array)?)
+            }
+        })
+        .await?
+        .map_err(|e| {
+            warn!(err=%e, "failed to retrieve Directory");
+            Error::StorageError("failed to retrieve Directory".to_string())
+        })?;
+
+        // The Directory was not found, return None.
+        let directory_data = match db_get_resp {
+            None => return Ok(None),
+            Some(d) => d,
+        };
+
+        // We check that the digest of the retrieved Directory matches the expected digest.
+        let actual_digest = blake3::hash(directory_data.value().as_slice());
+        if actual_digest.as_bytes() != digest.as_slice() {
+            warn!(directory.actual_digest=%actual_digest, "requested Directory got the wrong digest");
+            return Err(Error::StorageError(
+                "requested Directory got the wrong digest".to_string(),
+            ));
+        }
+
+        // Attempt to decode the retrieved protobuf-encoded Directory, returning a parsing error if
+        // the decoding failed.
+        let directory = match proto::Directory::decode(&*directory_data.value()) {
+            Ok(dir) => {
+                // The returned Directory must be valid.
+                if let Err(e) = dir.validate() {
+                    warn!(err=%e, "Directory failed validation");
+                    return Err(Error::StorageError(
+                        "Directory failed validation".to_string(),
+                    ));
+                }
+                dir
+            }
+            Err(e) => {
+                warn!(err=%e, "failed to parse Directory");
+                return Err(Error::StorageError("failed to parse Directory".to_string()));
+            }
+        };
+
+        Ok(Some(directory))
+    }
+
+    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
+    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+        tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            move || {
+                let digest = directory.digest();
+
+                // Validate the directory.
+                if let Err(e) = directory.validate() {
+                    warn!(err=%e, "Directory failed validation");
+                    return Err(Error::StorageError(
+                        "Directory failed validation".to_string(),
+                    ));
+                }
+
+                // Store the directory in the table.
+                let txn = db.begin_write()?;
+                {
+                    let mut table = txn.open_table(DIRECTORY_TABLE)?;
+                    let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into();
+                    table.insert(digest_as_array, directory.encode_to_vec())?;
+                }
+                txn.commit()?;
+
+                Ok(digest)
+            }
+        })
+        .await?
+    }
+
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    fn get_recursive(
+        &self,
+        root_directory_digest: &B3Digest,
+    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+        // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single
+        // redb transaction to avoid constantly closing and opening new transactions for the
+        // database.
+        traverse_directory(self.clone(), root_directory_digest)
+    }
+
+    #[instrument(skip_all)]
+    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> {
+        Box::new(RedbDirectoryPutter {
+            db: self.db.clone(),
+            directory_validator: Some(Default::default()),
+        })
+    }
+}
+
+pub struct RedbDirectoryPutter {
+    db: Arc<Database>,
+
+    /// The directories (inside the directory validator) that we insert later,
+    /// or None, if they were already inserted.
+    directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>,
+}
+
+#[async_trait]
+impl DirectoryPutter for RedbDirectoryPutter {
+    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+        match self.directory_validator {
+            None => return Err(Error::StorageError("already closed".to_string())),
+            Some(ref mut validator) => {
+                validator
+                    .add(directory)
+                    .map_err(|e| Error::StorageError(e.to_string()))?;
+            }
+        }
+
+        Ok(())
+    }
+
+    #[instrument(level = "trace", skip_all, ret, err)]
+    async fn close(&mut self) -> Result<B3Digest, Error> {
+        match self.directory_validator.take() {
+            None => Err(Error::StorageError("already closed".to_string())),
+            Some(validator) => {
+                // Insert all directories as a batch.
+                tokio::task::spawn_blocking({
+                    let txn = self.db.begin_write()?;
+                    move || {
+                        // Retrieve the validated directories.
+                        let directories = validator
+                            .validate()
+                            .map_err(|e| Error::StorageError(e.to_string()))?
+                            .drain_leaves_to_root()
+                            .collect::<Vec<_>>();
+
+                        // Get the root digest, which is at the end (cf. insertion order)
+                        let root_digest = directories
+                            .last()
+                            .ok_or_else(|| Error::StorageError("got no directories".to_string()))?
+                            .digest();
+
+                        {
+                            let mut table = txn.open_table(DIRECTORY_TABLE)?;
+
+                            // Looping over all the verified directories, queuing them up for a
+                            // batch insertion.
+                            for directory in directories {
+                                let digest_as_array: [u8; digests::B3_LEN] =
+                                    directory.digest().into();
+                                table.insert(digest_as_array, directory.encode_to_vec())?;
+                            }
+                        }
+
+                        txn.commit()?;
+
+                        Ok(root_digest)
+                    }
+                })
+                .await?
+            }
+        }
+    }
+}
+
+#[derive(serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct RedbDirectoryServiceConfig {
+    is_temporary: bool,
+    #[serde(default)]
+    /// required when is_temporary = false
+    path: Option<PathBuf>,
+}
+
+impl TryFrom<url::Url> for RedbDirectoryServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        // redb doesn't support host, and a path can be provided (otherwise
+        // it'll live in memory only).
+        if url.has_host() {
+            return Err(Error::StorageError("no host allowed".to_string()).into());
+        }
+
+        Ok(if url.path().is_empty() {
+            RedbDirectoryServiceConfig {
+                is_temporary: true,
+                path: None,
+            }
+        } else {
+            RedbDirectoryServiceConfig {
+                is_temporary: false,
+                path: Some(url.path().into()),
+            }
+        })
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for RedbDirectoryServiceConfig {
+    type Output = dyn DirectoryService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        match self {
+            RedbDirectoryServiceConfig {
+                is_temporary: true,
+                path: None,
+            } => Ok(Arc::new(RedbDirectoryService::new_temporary()?)),
+            RedbDirectoryServiceConfig {
+                is_temporary: true,
+                path: Some(_),
+            } => Err(Error::StorageError(
+                "Temporary RedbDirectoryService can not have path".into(),
+            )
+            .into()),
+            RedbDirectoryServiceConfig {
+                is_temporary: false,
+                path: None,
+            } => Err(Error::StorageError("RedbDirectoryService is missing path".into()).into()),
+            RedbDirectoryServiceConfig {
+                is_temporary: false,
+                path: Some(path),
+            } => Ok(Arc::new(RedbDirectoryService::new(path.into()).await?)),
+        }
+    }
+}