about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/Cargo.lock11
-rw-r--r--tvix/Cargo.nix39
-rw-r--r--tvix/boot/tests/default.nix1
-rw-r--r--tvix/castore/Cargo.toml1
-rw-r--r--tvix/castore/src/errors.rs36
-rw-r--r--tvix/store/Cargo.toml1
-rw-r--r--tvix/store/src/pathinfoservice/from_addr.rs10
-rw-r--r--tvix/store/src/pathinfoservice/mod.rs3
-rw-r--r--tvix/store/src/pathinfoservice/redb.rs214
-rw-r--r--tvix/store/src/pathinfoservice/tests/mod.rs2
10 files changed, 318 insertions, 0 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index 5d5208b7e2d8..cb3ed4c096e8 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -3210,6 +3210,15 @@ dependencies = [
 ]
 
 [[package]]
+name = "redb"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a6dd20d3cdeb9c7d2366a0b16b93b35b75aec15309fbeb7ce477138c9f68c8c0"
+dependencies = [
+ "libc",
+]
+
+[[package]]
 name = "redox_syscall"
 version = "0.2.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4808,6 +4817,7 @@ dependencies = [
  "pin-project-lite",
  "prost 0.13.1",
  "prost-build 0.13.1",
+ "redb",
  "rstest",
  "rstest_reuse",
  "serde",
@@ -4994,6 +5004,7 @@ dependencies = [
  "pin-project-lite",
  "prost 0.13.1",
  "prost-build 0.13.1",
+ "redb",
  "reqwest",
  "reqwest-middleware",
  "rstest",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 64d0adabfdd4..4826317e9fb1 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -10068,6 +10068,37 @@ rec {
           "web_spin_lock" = [ "dep:wasm_sync" ];
         };
       };
+      "redb" = rec {
+        crateName = "redb";
+        version = "2.1.1";
+        edition = "2021";
+        sha256 = "1h68d2gqq4vpwiyfpyq9ag0swxavnf9npcd0cqipv77brp9j1pd6";
+        type = [ "cdylib" "rlib" ];
+        authors = [
+          "Christopher Berner <christopherberner@gmail.com>"
+        ];
+        dependencies = [
+          {
+            name = "libc";
+            packageId = "libc";
+            target = { target, features }: (target."unix" or false);
+          }
+        ];
+        devDependencies = [
+          {
+            name = "libc";
+            packageId = "libc";
+            target = { target, features }: (!("wasi" == target."os" or null));
+          }
+        ];
+        features = {
+          "log" = [ "dep:log" ];
+          "logging" = [ "log" ];
+          "pyo3" = [ "dep:pyo3" ];
+          "pyo3-build-config" = [ "dep:pyo3-build-config" ];
+          "python" = [ "pyo3" "pyo3-build-config" ];
+        };
+      };
       "redox_syscall 0.2.16" = rec {
         crateName = "redox_syscall";
         version = "0.2.16";
@@ -15648,6 +15679,10 @@ rec {
             packageId = "prost 0.13.1";
           }
           {
+            name = "redb";
+            packageId = "redb";
+          }
+          {
             name = "serde";
             packageId = "serde";
             features = [ "derive" ];
@@ -16407,6 +16442,10 @@ rec {
             packageId = "prost 0.13.1";
           }
           {
+            name = "redb";
+            packageId = "redb";
+          }
+          {
             name = "reqwest";
             packageId = "reqwest";
             usesDefaultFeatures = false;
diff --git a/tvix/boot/tests/default.nix b/tvix/boot/tests/default.nix
index a6c3047d08f5..101b71819d19 100644
--- a/tvix/boot/tests/default.nix
+++ b/tvix/boot/tests/default.nix
@@ -171,6 +171,7 @@ depot.nix.readTree.drvTargets
 
   closure-nixos = (mkBootTest {
     blobServiceAddr = "objectstore+file:///build/blobs";
+    pathInfoServiceAddr = "redb:///build/pathinfo.redb";
     path = testSystem;
     isClosure = true;
     vmCmdline = "init=${testSystem}/init panic=-1"; # reboot immediately on panic
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index aaa788ca707c..ded2292db750 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -40,6 +40,7 @@ petgraph = "0.6.4"
 erased-serde = "0.4.5"
 serde_tagged = "0.3.0"
 hyper-util = "0.1.6"
+redb = "2.1.1"
 
 [dependencies.bigtable_rs]
 optional = true
diff --git a/tvix/castore/src/errors.rs b/tvix/castore/src/errors.rs
index 8343d0774aec..5bbcd7b04ef1 100644
--- a/tvix/castore/src/errors.rs
+++ b/tvix/castore/src/errors.rs
@@ -33,6 +33,42 @@ impl From<crate::tonic::Error> for Error {
     }
 }
 
+impl From<redb::Error> for Error {
+    fn from(value: redb::Error) -> Self {
+        Error::StorageError(value.to_string())
+    }
+}
+
+impl From<redb::DatabaseError> for Error {
+    fn from(value: redb::DatabaseError) -> Self {
+        Error::StorageError(value.to_string())
+    }
+}
+
+impl From<redb::TableError> for Error {
+    fn from(value: redb::TableError) -> Self {
+        Error::StorageError(value.to_string())
+    }
+}
+
+impl From<redb::TransactionError> for Error {
+    fn from(value: redb::TransactionError) -> Self {
+        Error::StorageError(value.to_string())
+    }
+}
+
+impl From<redb::StorageError> for Error {
+    fn from(value: redb::StorageError) -> Self {
+        Error::StorageError(value.to_string())
+    }
+}
+
+impl From<redb::CommitError> for Error {
+    fn from(value: redb::CommitError) -> Self {
+        Error::StorageError(value.to_string())
+    }
+}
+
 impl From<std::io::Error> for Error {
     fn from(value: std::io::Error) -> Self {
         if value.kind() == std::io::ErrorKind::InvalidInput {
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 964fc9be2e1a..de7d0a5042d0 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -45,6 +45,7 @@ tracing-indicatif = "0.3.6"
 hyper-util = "0.1.6"
 toml = { version = "0.8.15", optional = true }
 tonic-health = { version = "0.12.1", default-features = false }
+redb = "2.1.1"
 
 [dependencies.tonic-reflection]
 optional = true
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index 5635c226c2de..262e66c1bd7f 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -63,6 +63,8 @@ mod tests {
     lazy_static! {
         static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
         static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap();
+        static ref TMPDIR_REDB_1: TempDir = TempDir::new().unwrap();
+        static ref TMPDIR_REDB_2: TempDir = TempDir::new().unwrap();
     }
 
     // the gRPC tests below don't fail, because we connect lazily.
@@ -88,6 +90,14 @@ mod tests {
     #[case::memory_invalid_root_path("memory:///", false)]
     /// This sets a memory url path to "/foo", which is invalid.
     #[case::memory_invalid_root_path_foo("memory:///foo", false)]
+    /// redb with a host, and a valid path path, which should fail.
+    #[case::redb_invalid_host_with_valid_path(&format!("redb://foo.example{}", &TMPDIR_REDB_1.path().to_str().unwrap()), false)]
+    /// redb with / as path, which should fail.
+    #[case::redb_invalid_root("redb:///", false)]
+    /// redb with / as path, which should succeed.
+    #[case::redb_valid_path(&format!("redb://{}", &TMPDIR_REDB_2.path().join("foo").to_str().unwrap()), true)]
+    /// redb using the in-memory backend, which should succeed.
+    #[case::redb_valid_in_memory("redb://", true)]
     /// Correct Scheme for the cache.nixos.org binary cache.
     #[case::correct_nix_https("nix+https://cache.nixos.org", true)]
     /// Correct Scheme for the cache.nixos.org binary cache (HTTP URL).
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
index 70f752f22916..d118a8af1e73 100644
--- a/tvix/store/src/pathinfoservice/mod.rs
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -4,6 +4,7 @@ mod grpc;
 mod lru;
 mod memory;
 mod nix_http;
+mod redb;
 mod sled;
 
 #[cfg(any(feature = "fuse", feature = "virtiofs"))]
@@ -28,6 +29,7 @@ pub use self::grpc::{GRPCPathInfoService, GRPCPathInfoServiceConfig};
 pub use self::lru::{LruPathInfoService, LruPathInfoServiceConfig};
 pub use self::memory::{MemoryPathInfoService, MemoryPathInfoServiceConfig};
 pub use self::nix_http::{NixHTTPPathInfoService, NixHTTPPathInfoServiceConfig};
+pub use self::redb::{RedbPathInfoService, RedbPathInfoServiceConfig};
 pub use self::sled::{SledPathInfoService, SledPathInfoServiceConfig};
 
 #[cfg(feature = "cloud")]
@@ -88,6 +90,7 @@ pub(crate) fn register_pathinfo_services(reg: &mut Registry) {
     reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, MemoryPathInfoServiceConfig>("memory");
     reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, NixHTTPPathInfoServiceConfig>("nix");
     reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, SledPathInfoServiceConfig>("sled");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, RedbPathInfoServiceConfig>("redb");
     #[cfg(feature = "cloud")]
     {
         reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, BigtableParameters>(
diff --git a/tvix/store/src/pathinfoservice/redb.rs b/tvix/store/src/pathinfoservice/redb.rs
new file mode 100644
index 000000000000..180ec3ef7695
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/redb.rs
@@ -0,0 +1,214 @@
+use super::PathInfoService;
+use crate::proto::PathInfo;
+use data_encoding::BASE64;
+use futures::{stream::BoxStream, StreamExt};
+use prost::Message;
+use redb::{Database, ReadableTable, TableDefinition};
+use std::{path::PathBuf, sync::Arc};
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::async_trait;
+use tracing::{instrument, warn};
+use tvix_castore::{
+    composition::{CompositionContext, ServiceBuilder},
+    Error,
+};
+
+const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo");
+
+/// PathInfoService implementation using redb under the hood.
+/// redb stores all of its data in a single file with a K/V pointing from a path's output hash to
+/// its corresponding protobuf-encoded PathInfo.
+pub struct RedbPathInfoService {
+    // We wrap db in an Arc to be able to move it into spawn_blocking,
+    // as discussed in https://github.com/cberner/redb/issues/789
+    db: Arc<Database>,
+}
+
+impl RedbPathInfoService {
+    /// Constructs a new instance using the specified file system path for
+    /// storage.
+    pub async fn new(path: PathBuf) -> Result<Self, Error> {
+        if path == PathBuf::from("/") {
+            return Err(Error::StorageError(
+                "cowardly refusing to open / with redb".to_string(),
+            ));
+        }
+
+        let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
+            let db = redb::Database::create(path)?;
+            create_schema(&db)?;
+            Ok(db)
+        })
+        .await??;
+
+        Ok(Self { db: Arc::new(db) })
+    }
+
+    /// Constructs a new instance using the in-memory backend.
+    pub fn new_temporary() -> Result<Self, Error> {
+        let db =
+            redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
+
+        create_schema(&db)?;
+
+        Ok(Self { db: Arc::new(db) })
+    }
+}
+
+/// Ensures all tables are present.
+fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
+    // Opens a write transaction and calls open_table on PATHINFO_TABLE, which will
+    // create it if not present.
+    let txn = db.begin_write()?;
+    txn.open_table(PATHINFO_TABLE)?;
+    txn.commit()?;
+
+    Ok(())
+}
+
+#[async_trait]
+impl PathInfoService for RedbPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let db = self.db.clone();
+
+        tokio::task::spawn_blocking({
+            move || {
+                let txn = db.begin_read()?;
+                let table = txn.open_table(PATHINFO_TABLE)?;
+                match table.get(digest)? {
+                    Some(pathinfo_bytes) => Ok(Some(
+                        PathInfo::decode(pathinfo_bytes.value().as_slice()).map_err(|e| {
+                            warn!(err=%e, "failed to decode stored PathInfo");
+                            Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                        })?,
+                    )),
+                    None => Ok(None),
+                }
+            }
+        })
+        .await?
+    }
+
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // Call validate on the received PathInfo message.
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))?
+            .to_owned();
+
+        let path_info_encoded = path_info.encode_to_vec();
+        let db = self.db.clone();
+
+        tokio::task::spawn_blocking({
+            move || -> Result<(), Error> {
+                let txn = db.begin_write()?;
+                {
+                    let mut table = txn.open_table(PATHINFO_TABLE)?;
+                    table
+                        .insert(store_path.digest(), path_info_encoded)
+                        .map_err(|e| {
+                            warn!(err=%e, "failed to insert PathInfo");
+                            Error::StorageError(format!("failed to insert PathInfo: {}", e))
+                        })?;
+                }
+                Ok(txn.commit()?)
+            }
+        })
+        .await??;
+
+        Ok(path_info)
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let db = self.db.clone();
+        let (tx, rx) = tokio::sync::mpsc::channel(50);
+
+        // Spawn a blocking task which writes all PathInfos to tx.
+        tokio::task::spawn_blocking({
+            move || -> Result<(), Error> {
+                let read_txn = db.begin_read()?;
+                let table = read_txn.open_table(PATHINFO_TABLE)?;
+
+                for elem in table.iter()? {
+                    let elem = elem?;
+                    tokio::runtime::Handle::current()
+                        .block_on(tx.send(Ok(
+                            PathInfo::decode(elem.1.value().as_slice()).map_err(|e| {
+                                Error::InvalidRequest(format!("invalid PathInfo: {}", e))
+                            })?,
+                        )))
+                        .map_err(|e| Error::StorageError(e.to_string()))?;
+                }
+
+                Ok(())
+            }
+        });
+
+        ReceiverStream::from(rx).boxed()
+    }
+}
+
+#[derive(serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct RedbPathInfoServiceConfig {
+    is_temporary: bool,
+    #[serde(default)]
+    /// required when is_temporary = false
+    path: Option<PathBuf>,
+}
+
+impl TryFrom<url::Url> for RedbPathInfoServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        // redb doesn't support host, and a path can be provided (otherwise it'll live in memory only)
+        if url.has_host() {
+            return Err(Error::StorageError("no host allowed".to_string()).into());
+        }
+
+        Ok(if url.path().is_empty() {
+            RedbPathInfoServiceConfig {
+                is_temporary: true,
+                path: None,
+            }
+        } else {
+            RedbPathInfoServiceConfig {
+                is_temporary: false,
+                path: Some(url.path().into()),
+            }
+        })
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for RedbPathInfoServiceConfig {
+    type Output = dyn PathInfoService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        match self {
+            RedbPathInfoServiceConfig {
+                is_temporary: true,
+                path: None,
+            } => Ok(Arc::new(RedbPathInfoService::new_temporary()?)),
+            RedbPathInfoServiceConfig {
+                is_temporary: true,
+                path: Some(_),
+            } => Err(
+                Error::StorageError("Temporary RedbPathInfoService can not have path".into())
+                    .into(),
+            ),
+            RedbPathInfoServiceConfig {
+                is_temporary: false,
+                path: None,
+            } => Err(Error::StorageError("RedbPathInfoService is missing path".into()).into()),
+            RedbPathInfoServiceConfig {
+                is_temporary: false,
+                path: Some(path),
+            } => Ok(Arc::new(RedbPathInfoService::new(path.to_owned()).await?)),
+        }
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs
index 82684080156d..777588e9beda 100644
--- a/tvix/store/src/pathinfoservice/tests/mod.rs
+++ b/tvix/store/src/pathinfoservice/tests/mod.rs
@@ -7,6 +7,7 @@ use rstest::*;
 use rstest_reuse::{self, *};
 
 use super::PathInfoService;
+use crate::pathinfoservice::redb::RedbPathInfoService;
 use crate::pathinfoservice::MemoryPathInfoService;
 use crate::pathinfoservice::SledPathInfoService;
 use crate::proto::PathInfo;
@@ -27,6 +28,7 @@ use self::utils::make_bigtable_path_info_service;
     svc
 })]
 #[case::sled(SledPathInfoService::new_temporary().unwrap())]
+#[case::redb(RedbPathInfoService::new_temporary().unwrap())]
 #[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_bigtable_path_info_service().await))]
 pub fn path_info_services(#[case] svc: impl PathInfoService) {}