diff options
author | Ilan Joselevich <personal@ilanjoselevich.com> | 2024-07-20T22·36+0200 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-07-22T18·49+0000 |
commit | 9f10a71ec5be4746cb67cc8dcb275a2436debaba (patch) | |
tree | b0e8224b88946c31d6131fa1428442bdb9f26402 /tvix/store/src/pathinfoservice/redb.rs | |
parent | dbe698042d73edf03ffcc7417f5427d57bcbec2f (diff) |
feat(tvix/store): add redb PathInfoService r/8405
This provides a PathInfoService implementation using redb (https://github.com/cberner/redb) as the underlying storage engine. Both an in-memory variant, as well as a filesystem one is provided, similar how it's done with the sled implementation. Supersedes: https://cl.tvl.fyi/c/depot/+/11692 Change-Id: I744619c51bf2efd0fb63659b12a27cbe0b2fd6fc Signed-off-by: Ilan Joselevich <personal@ilanjoselevich.com> Reviewed-on: https://cl.tvl.fyi/c/depot/+/11995 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/pathinfoservice/redb.rs')
-rw-r--r-- | tvix/store/src/pathinfoservice/redb.rs | 214 |
1 files changed, 214 insertions, 0 deletions
diff --git a/tvix/store/src/pathinfoservice/redb.rs b/tvix/store/src/pathinfoservice/redb.rs new file mode 100644 index 000000000000..180ec3ef7695 --- /dev/null +++ b/tvix/store/src/pathinfoservice/redb.rs @@ -0,0 +1,214 @@ +use super::PathInfoService; +use crate::proto::PathInfo; +use data_encoding::BASE64; +use futures::{stream::BoxStream, StreamExt}; +use prost::Message; +use redb::{Database, ReadableTable, TableDefinition}; +use std::{path::PathBuf, sync::Arc}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::async_trait; +use tracing::{instrument, warn}; +use tvix_castore::{ + composition::{CompositionContext, ServiceBuilder}, + Error, +}; + +const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo"); + +/// PathInfoService implementation using redb under the hood. +/// redb stores all of its data in a single file with a K/V pointing from a path's output hash to +/// its corresponding protobuf-encoded PathInfo. +pub struct RedbPathInfoService { + // We wrap db in an Arc to be able to move it into spawn_blocking, + // as discussed in https://github.com/cberner/redb/issues/789 + db: Arc<Database>, +} + +impl RedbPathInfoService { + /// Constructs a new instance using the specified file system path for + /// storage. + pub async fn new(path: PathBuf) -> Result<Self, Error> { + if path == PathBuf::from("/") { + return Err(Error::StorageError( + "cowardly refusing to open / with redb".to_string(), + )); + } + + let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> { + let db = redb::Database::create(path)?; + create_schema(&db)?; + Ok(db) + }) + .await??; + + Ok(Self { db: Arc::new(db) }) + } + + /// Constructs a new instance using the in-memory backend. + pub fn new_temporary() -> Result<Self, Error> { + let db = + redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?; + + create_schema(&db)?; + + Ok(Self { db: Arc::new(db) }) + } +} + +/// Ensures all tables are present. +fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { + // Opens a write transaction and calls open_table on PATHINFO_TABLE, which will + // create it if not present. + let txn = db.begin_write()?; + txn.open_table(PATHINFO_TABLE)?; + txn.commit()?; + + Ok(()) +} + +#[async_trait] +impl PathInfoService for RedbPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let db = self.db.clone(); + + tokio::task::spawn_blocking({ + move || { + let txn = db.begin_read()?; + let table = txn.open_table(PATHINFO_TABLE)?; + match table.get(digest)? { + Some(pathinfo_bytes) => Ok(Some( + PathInfo::decode(pathinfo_bytes.value().as_slice()).map_err(|e| { + warn!(err=%e, "failed to decode stored PathInfo"); + Error::StorageError(format!("failed to decode stored PathInfo: {}", e)) + })?, + )), + None => Ok(None), + } + } + }) + .await? + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Call validate on the received PathInfo message. + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))? + .to_owned(); + + let path_info_encoded = path_info.encode_to_vec(); + let db = self.db.clone(); + + tokio::task::spawn_blocking({ + move || -> Result<(), Error> { + let txn = db.begin_write()?; + { + let mut table = txn.open_table(PATHINFO_TABLE)?; + table + .insert(store_path.digest(), path_info_encoded) + .map_err(|e| { + warn!(err=%e, "failed to insert PathInfo"); + Error::StorageError(format!("failed to insert PathInfo: {}", e)) + })?; + } + Ok(txn.commit()?) + } + }) + .await??; + + Ok(path_info) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let db = self.db.clone(); + let (tx, rx) = tokio::sync::mpsc::channel(50); + + // Spawn a blocking task which writes all PathInfos to tx. + tokio::task::spawn_blocking({ + move || -> Result<(), Error> { + let read_txn = db.begin_read()?; + let table = read_txn.open_table(PATHINFO_TABLE)?; + + for elem in table.iter()? { + let elem = elem?; + tokio::runtime::Handle::current() + .block_on(tx.send(Ok( + PathInfo::decode(elem.1.value().as_slice()).map_err(|e| { + Error::InvalidRequest(format!("invalid PathInfo: {}", e)) + })?, + ))) + .map_err(|e| Error::StorageError(e.to_string()))?; + } + + Ok(()) + } + }); + + ReceiverStream::from(rx).boxed() + } +} + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct RedbPathInfoServiceConfig { + is_temporary: bool, + #[serde(default)] + /// required when is_temporary = false + path: Option<PathBuf>, +} + +impl TryFrom<url::Url> for RedbPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // redb doesn't support host, and a path can be provided (otherwise it'll live in memory only) + if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string()).into()); + } + + Ok(if url.path().is_empty() { + RedbPathInfoServiceConfig { + is_temporary: true, + path: None, + } + } else { + RedbPathInfoServiceConfig { + is_temporary: false, + path: Some(url.path().into()), + } + }) + } +} + +#[async_trait] +impl ServiceBuilder for RedbPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + match self { + RedbPathInfoServiceConfig { + is_temporary: true, + path: None, + } => Ok(Arc::new(RedbPathInfoService::new_temporary()?)), + RedbPathInfoServiceConfig { + is_temporary: true, + path: Some(_), + } => Err( + Error::StorageError("Temporary RedbPathInfoService can not have path".into()) + .into(), + ), + RedbPathInfoServiceConfig { + is_temporary: false, + path: None, + } => Err(Error::StorageError("RedbPathInfoService is missing path".into()).into()), + RedbPathInfoServiceConfig { + is_temporary: false, + path: Some(path), + } => Ok(Arc::new(RedbPathInfoService::new(path.to_owned()).await?)), + } + } +} |