Diffstat (limited to 'tvix/castore')
55 files changed, 4120 insertions, 2311 deletions
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index ea4c598fe884..3c32e5f05229 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -4,98 +4,70 @@ version = "0.1.0" edition = "2021" [dependencies] -async-compression = { version = "0.4.9", features = ["tokio", "zstd"]} -async-stream = "0.3.5" -async-tempfile = "0.4.0" -blake3 = { version = "1.3.1", features = ["rayon", "std", "traits-preview"] } -bstr = "1.6.0" -bytes = "1.4.0" -data-encoding = "2.3.3" -digest = "0.10.7" -fastcdc = { version = "3.1.0", features = ["tokio"] } -futures = "0.3.30" -lazy_static = "1.4.0" -object_store = { version = "0.9.1", features = ["http"] } -parking_lot = "0.12.1" -pin-project-lite = "0.2.13" -prost = "0.12.1" -sled = { version = "0.34.7" } -thiserror = "1.0.38" -tokio-stream = { version = "0.1.14", features = ["fs", "net"] } -tokio-util = { version = "0.7.9", features = ["io", "io-util", "codec"] } -tokio-tar = "0.3.1" -tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } -tonic = "0.11.0" -tower = "0.4.13" -tracing = "0.1.37" -tracing-indicatif = "0.3.6" -tvix-tracing = { path = "../tracing" } -url = "2.4.0" -walkdir = "2.4.0" -zstd = "0.13.0" -serde = { version = "1.0.197", features = [ "derive" ] } -serde_with = "3.7.0" -serde_qs = "0.12.0" -petgraph = "0.6.4" - -[dependencies.bigtable_rs] -optional = true -# https://github.com/liufuyang/bigtable_rs/pull/72 -git = "https://github.com/flokli/bigtable_rs" -rev = "0af404741dfc40eb9fa99cf4d4140a09c5c20df7" - -[dependencies.fuse-backend-rs] -optional = true -version = "0.11.0" - -[dependencies.libc] -optional = true -version = "0.2.144" - -[dependencies.threadpool] -version = "1.8.1" -optional = true - -[dependencies.tonic-reflection] -optional = true -version = "0.11.0" - -[dependencies.vhost] -optional = true -version = "0.6" - -[dependencies.vhost-user-backend] -optional = true -version = "0.8" - -[dependencies.virtio-queue] -optional = true -version = "0.7" - -[dependencies.vm-memory] -optional = true -version = "0.10" - -[dependencies.vmm-sys-util] -optional = true -version = "0.11" - -[dependencies.virtio-bindings] -optional = true -version = "0.2.1" +async-compression = { workspace = true, features = ["tokio", "zstd"] } +async-stream = { workspace = true } +async-tempfile = { workspace = true } +blake3 = { workspace = true, features = ["rayon", "std", "traits-preview"] } +bstr = { workspace = true } +bytes = { workspace = true } +data-encoding = { workspace = true } +digest = { workspace = true } +fastcdc = { workspace = true, features = ["tokio"] } +futures = { workspace = true } +object_store = { workspace = true, features = ["http"] } +parking_lot = { workspace = true } +pin-project-lite = { workspace = true } +prost = { workspace = true } +thiserror = { workspace = true } +tokio-stream = { workspace = true, features = ["fs", "net"] } +tokio-util = { workspace = true, features = ["io", "io-util", "codec"] } +tokio-tar = { workspace = true } +tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } +tonic = { workspace = true } +tower = { workspace = true } +tracing = { workspace = true } +tracing-indicatif = { workspace = true } +tvix-tracing = { path = "../tracing", features = ["tonic"] } +url = { workspace = true } +walkdir = { workspace = true } +zstd = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_with = { workspace = true } +serde_qs = { workspace = true } +petgraph = { workspace = 
true } +pin-project = { workspace = true } +erased-serde = { workspace = true } +serde_tagged = { workspace = true } +hyper-util = { workspace = true } +redb = { workspace = true, features = ["logging"] } +bigtable_rs = { workspace = true, optional = true } +fuse-backend-rs = { workspace = true, optional = true } +libc = { workspace = true, optional = true } +threadpool = { workspace = true, optional = true } +tonic-reflection = { workspace = true, optional = true } +vhost = { workspace = true, optional = true } +vhost-user-backend = { workspace = true, optional = true } +virtio-queue = { workspace = true, optional = true } +vm-memory = { workspace = true, optional = true } +vmm-sys-util = { workspace = true, optional = true } +virtio-bindings = { workspace = true, optional = true } +wu-manber = { workspace = true } +auto_impl = "1.2.0" [build-dependencies] -prost-build = "0.12.1" -tonic-build = "0.11.0" +prost-build = { workspace = true } +tonic-build = { workspace = true } [dev-dependencies] -async-process = "2.1.0" -rstest = "0.19.0" -tempfile = "3.3.0" -tokio-retry = "0.3.0" -hex-literal = "0.4.1" -rstest_reuse = "0.6.0" -xattr = "1.3.1" +async-process = { workspace = true } +rstest = { workspace = true } +tempfile = { workspace = true } +tokio-retry = { workspace = true } +hex-literal = { workspace = true } +rstest_reuse = { workspace = true } +xattr = { workspace = true } +serde_json = { workspace = true } +tokio-test = { workspace = true } [features] default = ["cloud"] @@ -119,6 +91,9 @@ virtiofs = [ ] fuse = ["fs"] tonic-reflection = ["dep:tonic-reflection"] +# This feature enables anonymous url syntax which might inherently expose +# arbitrary composition possibilities to the user. +xp-composition-url-refs = [] # Whether to run the integration tests. # Requires the following packages in $PATH: # cbtemulator, google-cloud-bigtable-tool diff --git a/tvix/castore/build.rs b/tvix/castore/build.rs index 089c093e71b4..2250d4ebf0a2 100644 --- a/tvix/castore/build.rs +++ b/tvix/castore/build.rs @@ -12,17 +12,13 @@ fn main() -> Result<()> { builder = builder.file_descriptor_set_path(descriptor_path); }; - // https://github.com/hyperium/tonic/issues/908 - let mut config = prost_build::Config::new(); - config.bytes(["."]); - config.type_attribute(".", "#[derive(Eq, Hash)]"); - builder .build_server(true) .build_client(true) .emit_rerun_if_changed(false) - .compile_with_config( - config, + .bytes(["."]) + .type_attribute(".", "#[derive(Eq, Hash)]") + .compile_protos( &[ "tvix/castore/protos/castore.proto", "tvix/castore/protos/rpc_blobstore.proto", @@ -30,7 +26,7 @@ fn main() -> Result<()> { ], // If we are in running `cargo build` manually, using `../..` works fine, // but in case we run inside a nix build, we need to instead point PROTO_ROOT - // to a sparseTree containing that structure. + // to a custom tree containing that structure. 
&[match std::env::var_os("PROTO_ROOT") { Some(proto_root) => proto_root.to_str().unwrap().to_owned(), None => "../..".to_string(), diff --git a/tvix/castore/default.nix b/tvix/castore/default.nix index 03a12b6c2016..47c9a99980b8 100644 --- a/tvix/castore/default.nix +++ b/tvix/castore/default.nix @@ -3,23 +3,23 @@ (depot.tvix.crates.workspaceMembers.tvix-castore.build.override { runTests = true; testPreRun = '' - export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt; + export SSL_CERT_FILE=/dev/null ''; }).overrideAttrs (old: rec { meta.ci.targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru); passthru = (depot.tvix.utils.mkFeaturePowerset { inherit (old) crateName; - features = ([ "cloud" "fuse" "tonic-reflection" ] + features = ([ "cloud" "fuse" "tonic-reflection" "xp-composition-url-refs" ] # virtiofs feature currently fails to build on Darwin ++ lib.optional pkgs.stdenv.isLinux "virtiofs"); override.testPreRun = '' - export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt + export SSL_CERT_FILE=/dev/null ''; }) // { integration-tests = depot.tvix.crates.workspaceMembers.${old.crateName}.build.override (old: { runTests = true; testPreRun = '' - export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt; + export SSL_CERT_FILE=/dev/null export PATH="$PATH:${pkgs.lib.makeBinPath [ pkgs.cbtemulator pkgs.google-cloud-bigtable-tool ]}" ''; features = old.features ++ [ "integration" ]; diff --git a/tvix/castore/protos/default.nix b/tvix/castore/protos/default.nix index feef55690fb9..08bb8fcfeef1 100644 --- a/tvix/castore/protos/default.nix +++ b/tvix/castore/protos/default.nix @@ -1,16 +1,10 @@ -{ depot, pkgs, ... }: +{ depot, pkgs, lib, ... }: let - protos = depot.nix.sparseTree { - name = "castore-protos"; - root = depot.path.origSrc; - paths = [ - ./castore.proto - ./rpc_blobstore.proto - ./rpc_directory.proto - ../../../buf.yaml - ../../../buf.gen.yaml - ]; - }; + protos = lib.sourceByRegex depot.path.origSrc [ + "buf.yaml" + "buf.gen.yaml" + "^tvix(/castore(/protos(/.*\.proto)?)?)?$" + ]; in depot.nix.readTree.drvTargets { inherit protos; diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs index 6e8355874bca..0e809b300485 100644 --- a/tvix/castore/src/blobservice/chunked_reader.rs +++ b/tvix/castore/src/blobservice/chunked_reader.rs @@ -253,14 +253,16 @@ where #[cfg(test)] mod test { - use std::{io::SeekFrom, sync::Arc}; + use std::{ + io::SeekFrom, + sync::{Arc, LazyLock}, + }; use crate::{ blobservice::{chunked_reader::ChunkedReader, BlobService, MemoryBlobService}, B3Digest, }; use hex_literal::hex; - use lazy_static::lazy_static; use tokio::io::{AsyncReadExt, AsyncSeekExt}; const CHUNK_1: [u8; 2] = hex!("0001"); @@ -269,21 +271,26 @@ mod test { const CHUNK_4: [u8; 2] = hex!("0708"); const CHUNK_5: [u8; 7] = hex!("090a0b0c0d0e0f"); - lazy_static! 
{ - // `[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]` - pub static ref CHUNK_1_DIGEST: B3Digest = blake3::hash(&CHUNK_1).as_bytes().into(); - pub static ref CHUNK_2_DIGEST: B3Digest = blake3::hash(&CHUNK_2).as_bytes().into(); - pub static ref CHUNK_3_DIGEST: B3Digest = blake3::hash(&CHUNK_3).as_bytes().into(); - pub static ref CHUNK_4_DIGEST: B3Digest = blake3::hash(&CHUNK_4).as_bytes().into(); - pub static ref CHUNK_5_DIGEST: B3Digest = blake3::hash(&CHUNK_5).as_bytes().into(); - pub static ref BLOB_1_LIST: [(B3Digest, u64); 5] = [ + // `[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]` + pub static CHUNK_1_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&CHUNK_1).as_bytes().into()); + pub static CHUNK_2_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&CHUNK_2).as_bytes().into()); + pub static CHUNK_3_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&CHUNK_3).as_bytes().into()); + pub static CHUNK_4_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&CHUNK_4).as_bytes().into()); + pub static CHUNK_5_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&CHUNK_5).as_bytes().into()); + pub static BLOB_1_LIST: LazyLock<[(B3Digest, u64); 5]> = LazyLock::new(|| { + [ (CHUNK_1_DIGEST.clone(), 2), (CHUNK_2_DIGEST.clone(), 4), (CHUNK_3_DIGEST.clone(), 1), (CHUNK_4_DIGEST.clone(), 2), (CHUNK_5_DIGEST.clone(), 7), - ]; - } + ] + }); use super::ChunkedBlob; diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs index 067eff96f488..1c90fb7bb055 100644 --- a/tvix/castore/src/blobservice/combinator.rs +++ b/tvix/castore/src/blobservice/combinator.rs @@ -1,22 +1,24 @@ -use futures::{StreamExt, TryStreamExt}; -use tokio_util::io::{ReaderStream, StreamReader}; +use std::sync::Arc; + use tonic::async_trait; -use tracing::{instrument, warn}; +use tracing::instrument; -use crate::B3Digest; +use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::{B3Digest, Error}; -use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; +use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; -/// Combinator for a BlobService, using a "local" and "remote" blobservice. -/// Requests are tried in (and returned from) the local store first, only if -/// things are not present there, the remote BlobService is queried. -/// In case the local blobservice doesn't have the blob, we ask the remote -/// blobservice for chunks, and try to read each of these chunks from the local -/// blobservice again, before falling back to the remote one. -/// The remote BlobService is never written to. +/// Combinator for a BlobService, using a "near" and "far" blobservice. +/// Requests are tried in (and returned from) the near store first, only if +/// things are not present there, the far BlobService is queried. +/// In case the near blobservice doesn't have the blob, we ask the remote +/// blobservice for chunks, and try to read each of these chunks from the near +/// blobservice again, before falling back to the far one. +/// The far BlobService is never written to. 
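To make the near/far semantics concrete, here is a small end-to-end sketch using the composition machinery introduced later in this diff. Instance names and the payload are invented; it assumes tokio and serde_json on the caller side, that the Composition memoizes instances per name (which is what the instantiation-state bookkeeping further down is for), and the same BlobWriter write/close usage the crate's own tests rely on:

```
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tvix_castore::blobservice::BlobService;
use tvix_castore::composition::{with_registry, Composition, REG};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let configs = with_registry(&REG, || {
        serde_json::from_value(serde_json::json!({
            "near-store": { "type": "memory" },
            "far-store": { "type": "memory" },
            "root": { "type": "combined", "near": "near-store", "far": "far-store" }
        }))
    })?;
    let mut composition = Composition::new(&REG);
    composition.extend_with_configs::<dyn BlobService>(configs);

    let root: Arc<dyn BlobService> = composition.build("root").await?;
    let near: Arc<dyn BlobService> = composition.build("near-store").await?;
    let far: Arc<dyn BlobService> = composition.build("far-store").await?;

    // Writing through the combinator only ever touches the near store.
    let mut writer = root.open_write().await;
    writer.write_all(b"some blob").await?;
    let digest = writer.close().await?;

    assert!(near.has(&digest).await?);
    assert!(!far.has(&digest).await?);
    assert!(root.has(&digest).await?);
    Ok(())
}
```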
pub struct CombinedBlobService<BL, BR> { - local: BL, - remote: BR, + instance_name: String, + near: BL, + far: BR, } impl<BL, BR> Clone for CombinedBlobService<BL, BR> @@ -26,8 +28,9 @@ where { fn clone(&self) -> Self { Self { - local: self.local.clone(), - remote: self.remote.clone(), + instance_name: self.instance_name.clone(), + near: self.near.clone(), + far: self.far.clone(), } } } @@ -38,95 +41,91 @@ where BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static, BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static, { - #[instrument(skip(self, digest), fields(blob.digest=%digest))] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> { - Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?) + Ok(self.near.as_ref().has(digest).await? || self.far.as_ref().has(digest).await?) } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> { - if self.local.as_ref().has(digest).await? { - // local store has the blob, so we can assume it also has all chunks. - self.local.as_ref().open_read(digest).await + if self.near.as_ref().has(digest).await? { + // near store has the blob, so we can assume it also has all chunks. + self.near.as_ref().open_read(digest).await } else { - // Local store doesn't have the blob. + // near store doesn't have the blob. // Ask the remote one for the list of chunks, // and create a chunked reader that uses self.open_read() for // individual chunks. There's a chance we already have some chunks - // locally, meaning we don't need to fetch them all from the remote + // in near, meaning we don't need to fetch them all from the far // BlobService. - match self.remote.as_ref().chunks(digest).await? { - // blob doesn't exist on the remote side either, nothing we can do. + match self.far.as_ref().chunks(digest).await? { + // blob doesn't exist on the near side either, nothing we can do. None => Ok(None), Some(remote_chunks) => { - // if there's no more granular chunks, or the remote + // if there's no more granular chunks, or the far // blobservice doesn't support chunks, read the blob from - // the remote blobservice directly. + // the far blobservice directly. if remote_chunks.is_empty() { - return self.remote.as_ref().open_read(digest).await; + return self.far.as_ref().open_read(digest).await; } // otherwise, a chunked reader, which will always try the - // local backend first. + // near backend first. - // map Vec<ChunkMeta> to Vec<(B3Digest, u64)> - let chunks: Vec<(B3Digest, u64)> = remote_chunks - .into_iter() - .map(|chunk_meta| { + let chunked_reader = ChunkedReader::from_chunks( + remote_chunks.into_iter().map(|chunk| { ( - B3Digest::try_from(chunk_meta.digest) - .expect("invalid chunk digest"), - chunk_meta.size, + chunk.digest.try_into().expect("invalid b3 digest"), + chunk.size, ) - }) - .collect(); - - Ok(Some(make_chunked_reader(self.clone(), chunks))) + }), + Arc::new(self.clone()) as Arc<dyn BlobService>, + ); + Ok(Some(Box::new(chunked_reader))) } } } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { - // direct writes to the local one. 
- self.local.as_ref().open_write().await + // direct writes to the near one. + self.near.as_ref().open_write().await } } -fn make_chunked_reader<BS>( - // This must consume, as we can't retain references to blob_service, - // as it'd add a lifetime to BlobReader in general, which will get - // problematic in TvixStoreFs, which is using async move closures and cloning. - blob_service: BS, - // A list of b3 digests for individual chunks, and their sizes. - chunks: Vec<(B3Digest, u64)>, -) -> Box<dyn BlobReader> -where - BS: BlobService + Clone + 'static, -{ - // TODO: offset, verified streaming - - // construct readers for each chunk - let blob_service = blob_service.clone(); - let readers_stream = tokio_stream::iter(chunks).map(move |(digest, _)| { - let d = digest.to_owned(); - let blob_service = blob_service.clone(); - async move { - blob_service.open_read(&d.to_owned()).await?.ok_or_else(|| { - warn!(chunk.digest = %digest, "chunk not found"); - std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found") - }) - } - }); - - // convert the stream of readers to a stream of streams of byte chunks - let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) }); - - // flatten into one stream of byte chunks - let bytes_stream = bytes_streams.try_flatten(); +#[derive(serde::Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct CombinedBlobServiceConfig { + near: String, + far: String, +} - // convert into AsyncRead - let blob_reader = StreamReader::new(bytes_stream); +impl TryFrom<url::Url> for CombinedBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(_url: url::Url) -> Result<Self, Self::Error> { + Err(Error::StorageError( + "Instantiating a CombinedBlobService from a url is not supported".into(), + ) + .into()) + } +} - Box::new(NaiveSeeker::new(Box::pin(blob_reader))) +#[async_trait] +impl ServiceBuilder for CombinedBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + instance_name: &str, + context: &CompositionContext, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> { + let (local, remote) = futures::join!( + context.resolve(self.near.clone()), + context.resolve(self.far.clone()) + ); + Ok(Arc::new(CombinedBlobService { + instance_name: instance_name.to_string(), + near: local?, + far: remote?, + })) + } } diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs index 8898bbfb95ce..803e7fa6a575 100644 --- a/tvix/castore/src/blobservice/from_addr.rs +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -1,8 +1,12 @@ +use std::sync::Arc; + use url::Url; -use crate::{proto::blob_service_client::BlobServiceClient, Error}; +use crate::composition::{ + with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG, +}; -use super::{BlobService, GRPCBlobService, MemoryBlobService, ObjectStoreBlobService}; +use super::BlobService; /// Constructs a new instance of a [BlobService] from an URI. /// @@ -12,46 +16,19 @@ use super::{BlobService, GRPCBlobService, MemoryBlobService, ObjectStoreBlobServ /// - `objectstore+*://` ([ObjectStoreBlobService]) /// /// See their `from_url` methods for more details about their syntax. 
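As a usage sketch of the reworked entry point below (assuming a tokio runtime and the blake3 crate on the caller side; the digest is only there to have something to query):

```
use tvix_castore::{blobservice::from_addr, B3Digest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // "memory://" is looked up in the registry, deserialized into
    // MemoryBlobServiceConfig and built as an "anonymous" instance.
    // A URL with a host or path (e.g. "memory://foo") would be rejected.
    let blob_service = from_addr("memory://").await?;

    // Query a digest the fresh store cannot contain yet.
    let digest: B3Digest = blake3::hash(b"hello tvix").as_bytes().into();
    assert!(!blob_service.has(&digest).await?);
    Ok(())
}
```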
-pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error> { +pub async fn from_addr( + uri: &str, +) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> { let url = Url::parse(uri) .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; - let blob_service: Box<dyn BlobService> = match url.scheme() { - "memory" => { - // memory doesn't support host or path in the URL. - if url.has_host() || !url.path().is_empty() { - return Err(Error::StorageError("invalid url".to_string())); - } - Box::<MemoryBlobService>::default() - } - scheme if scheme.starts_with("grpc+") => { - // schemes starting with grpc+ go to the GRPCPathInfoService. - // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. - // - In the case of unix sockets, there must be a path, but may not be a host. - // - In the case of non-unix sockets, there must be a host, but no path. - // Constructing the channel is handled by tvix_castore::channel::from_url. - let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?); - Box::new(GRPCBlobService::from_client(client)) - } - scheme if scheme.starts_with("objectstore+") => { - // We need to convert the URL to string, strip the prefix there, and then - // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. - let trimmed_url = { - let s = url.to_string(); - Url::parse(s.strip_prefix("objectstore+").unwrap()).unwrap() - }; - Box::new( - ObjectStoreBlobService::parse_url(&trimmed_url) - .map_err(|e| Error::StorageError(e.to_string()))?, - ) - } - scheme => { - return Err(crate::Error::StorageError(format!( - "unknown scheme: {}", - scheme - ))) - } - }; + let blob_service_config = with_registry(®, || { + <DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>>>::try_from(url) + })? + .0; + let blob_service = blob_service_config + .build("anonymous", &CompositionContext::blank(®)) + .await?; Ok(blob_service) } diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 5663cd3838ec..9f3efbe6763d 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -1,4 +1,5 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{ proto::{self, stat_blob_response::ChunkMeta}, B3Digest, @@ -17,47 +18,57 @@ use tokio_util::{ io::{CopyToBytes, SinkWriter}, sync::PollSender, }; -use tonic::{async_trait, transport::Channel, Code, Status}; -use tracing::instrument; +use tonic::{async_trait, Code, Status}; +use tracing::{instrument, Instrument as _}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] -pub struct GRPCBlobService { +pub struct GRPCBlobService<T> { + instance_name: String, /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, + grpc_client: proto::blob_service_client::BlobServiceClient<T>, } -impl GRPCBlobService { +impl<T> GRPCBlobService<T> { /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. - /// panics if called outside the context of a tokio runtime. 
pub fn from_client( - grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, + instance_name: String, + grpc_client: proto::blob_service_client::BlobServiceClient<T>, ) -> Self { - Self { grpc_client } + Self { + instance_name, + grpc_client, + } } } #[async_trait] -impl BlobService for GRPCBlobService { - #[instrument(skip(self, digest), fields(blob.digest=%digest))] +impl<T> BlobService for GRPCBlobService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { - let mut grpc_client = self.grpc_client.clone(); - let resp = grpc_client + match self + .grpc_client + .clone() .stat(proto::StatBlobRequest { digest: digest.clone().into(), ..Default::default() }) - .await; - - match resp { + .await + { Ok(_blob_meta) => Ok(true), Err(e) if e.code() == Code::NotFound => Ok(false), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), } } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // First try to get a list of chunks. In case there's only one chunk returned, // buffer its data into a Vec, otherwise use a ChunkedReader. @@ -120,7 +131,7 @@ impl BlobService for GRPCBlobService { /// Returns a BlobWriter, that'll internally wrap each write in a /// [proto::BlobChunk], which is send to the gRPC server. - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // set up an mpsc channel passing around Bytes. let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); @@ -133,6 +144,8 @@ impl BlobService for GRPCBlobService { let task = tokio::spawn({ let mut grpc_client = self.grpc_client.clone(); async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) } + // instrument the task with the current span, this is not done by default + .in_current_span() }); // The tx part of the channel is converted to a sink of byte chunks. @@ -148,7 +161,7 @@ impl BlobService for GRPCBlobService { }) } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { let resp = self .grpc_client @@ -175,6 +188,43 @@ impl BlobService for GRPCBlobService { } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCBlobServiceConfig { + url: String, +} + +impl TryFrom<url::Url> for GRPCBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. 
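For callers that do not go through the registry, the client can still be wired up by hand. A sketch with a placeholder endpoint, assuming the channel helper used in the builder below (crate::tonic::channel_from_url) stays publicly reachable as tvix_castore::tonic::channel_from_url:

```
use tvix_castore::blobservice::{BlobService, GRPCBlobService};
use tvix_castore::proto::blob_service_client::BlobServiceClient;

async fn connect() -> Result<impl BlobService, Box<dyn std::error::Error + Send + Sync>> {
    // Placeholder address; a unix socket would use grpc+unix:///path/to/socket.
    let url = url::Url::parse("grpc+http://[::1]:8000")?;
    let channel = tvix_castore::tonic::channel_from_url(&url).await?;
    // The first argument is just the instance label surfaced in tracing spans.
    Ok(GRPCBlobService::from_client(
        "default".into(),
        BlobServiceClient::new(channel),
    ))
}
```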
+ Ok(GRPCBlobServiceConfig { + url: url.to_string(), + }) + } +} + +#[async_trait] +impl ServiceBuilder for GRPCBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let client = proto::blob_service_client::BlobServiceClient::new( + crate::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCBlobService::from_client( + instance_name.to_string(), + client, + ))) + } +} + pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> { /// The task containing the put request, and the inner writer, if we're still writing. task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>, @@ -335,8 +385,7 @@ mod tests { .await .expect("must succeed"), ); - - GRPCBlobService::from_client(client) + GRPCBlobService::from_client("root".into(), client) }; let has = grpc_client diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs index 873d06b461de..348b8bb56d5b 100644 --- a/tvix/castore/src/blobservice/memory.rs +++ b/tvix/castore/src/blobservice/memory.rs @@ -6,22 +6,24 @@ use tonic::async_trait; use tracing::instrument; use super::{BlobReader, BlobService, BlobWriter}; -use crate::B3Digest; +use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::{B3Digest, Error}; #[derive(Clone, Default)] pub struct MemoryBlobService { + instance_name: String, db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, } #[async_trait] impl BlobService for MemoryBlobService { - #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + #[instrument(skip_all, ret, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { let db = self.db.read(); Ok(db.contains_key(digest)) } - #[instrument(skip_all, err, fields(blob.digest=%digest))] + #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { let db = self.db.read(); @@ -31,12 +33,42 @@ impl BlobService for MemoryBlobService { } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { Box::new(MemoryBlobWriter::new(self.db.clone())) } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct MemoryBlobServiceConfig {} + +impl TryFrom<url::Url> for MemoryBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // memory doesn't support host or path in the URL. 
+ if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string()).into()); + } + Ok(MemoryBlobServiceConfig {}) + } +} + +#[async_trait] +impl ServiceBuilder for MemoryBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + Ok(Arc::new(MemoryBlobService { + instance_name: instance_name.to_string(), + db: Default::default(), + })) + } +} + pub struct MemoryBlobWriter { db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs index 50acd40bf769..efba927b586b 100644 --- a/tvix/castore/src/blobservice/mod.rs +++ b/tvix/castore/src/blobservice/mod.rs @@ -1,6 +1,9 @@ use std::io; + +use auto_impl::auto_impl; use tonic::async_trait; +use crate::composition::{Registry, ServiceBuilder}; use crate::proto::stat_blob_response::ChunkMeta; use crate::B3Digest; @@ -9,18 +12,17 @@ mod combinator; mod from_addr; mod grpc; mod memory; -mod naive_seeker; mod object_store; #[cfg(test)] pub mod tests; pub use self::chunked_reader::ChunkedReader; -pub use self::combinator::CombinedBlobService; +pub use self::combinator::{CombinedBlobService, CombinedBlobServiceConfig}; pub use self::from_addr::from_addr; -pub use self::grpc::GRPCBlobService; -pub use self::memory::MemoryBlobService; -pub use self::object_store::ObjectStoreBlobService; +pub use self::grpc::{GRPCBlobService, GRPCBlobServiceConfig}; +pub use self::memory::{MemoryBlobService, MemoryBlobServiceConfig}; +pub use self::object_store::{ObjectStoreBlobService, ObjectStoreBlobServiceConfig}; /// The base trait all BlobService services need to implement. /// It provides functions to check whether a given blob exists, @@ -28,6 +30,7 @@ pub use self::object_store::ObjectStoreBlobService; /// which will implement a writer interface, and also provides a close funtion, /// to finalize a blob and get its digest. #[async_trait] +#[auto_impl(&, &mut, Arc, Box)] pub trait BlobService: Send + Sync { /// Check if the service has the blob, by its content hash. /// On implementations returning chunks, this must also work for chunks. @@ -59,28 +62,6 @@ pub trait BlobService: Send + Sync { } } -#[async_trait] -impl<A> BlobService for A -where - A: AsRef<dyn BlobService> + Send + Sync, -{ - async fn has(&self, digest: &B3Digest) -> io::Result<bool> { - self.as_ref().has(digest).await - } - - async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { - self.as_ref().open_read(digest).await - } - - async fn open_write(&self) -> Box<dyn BlobWriter> { - self.as_ref().open_write().await - } - - async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { - self.as_ref().chunks(digest).await - } -} - /// A [tokio::io::AsyncWrite] that the user needs to close() afterwards for persist. /// On success, it returns the digest of the written blob. 
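A write/read round trip against the in-memory implementation, mirroring how the crate's own tests drive these traits (a sketch; it assumes BlobWriter keeps the close(&mut self) -> io::Result<B3Digest> shape described in the doc comment above, which is not visible in this hunk):

```
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tvix_castore::blobservice::{BlobService, MemoryBlobService};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let svc = MemoryBlobService::default();

    // Write a blob; close() finalizes it and yields its BLAKE3 digest.
    let mut writer = svc.open_write().await;
    writer.write_all(b"hello").await?;
    let digest = writer.close().await?;

    // Read it back through the BlobReader handed out by open_read().
    let mut reader = svc.open_read(&digest).await?.expect("blob was just written");
    let mut contents = Vec::new();
    reader.read_to_end(&mut contents).await?;
    assert_eq!(contents, b"hello");
    Ok(())
}
```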
#[async_trait] @@ -101,3 +82,11 @@ impl BlobReader for io::Cursor<&'static [u8; 0]> {} impl BlobReader for io::Cursor<Vec<u8>> {} impl BlobReader for io::Cursor<bytes::Bytes> {} impl BlobReader for tokio::fs::File {} + +/// Registers the builtin BlobService implementations with the registry +pub(crate) fn register_blob_services(reg: &mut Registry) { + reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::ObjectStoreBlobServiceConfig>("objectstore"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::MemoryBlobServiceConfig>("memory"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::CombinedBlobServiceConfig>("combined"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::GRPCBlobServiceConfig>("grpc"); +} diff --git a/tvix/castore/src/blobservice/naive_seeker.rs b/tvix/castore/src/blobservice/naive_seeker.rs deleted file mode 100644 index f5a530715093..000000000000 --- a/tvix/castore/src/blobservice/naive_seeker.rs +++ /dev/null @@ -1,265 +0,0 @@ -use super::BlobReader; -use futures::ready; -use pin_project_lite::pin_project; -use std::io; -use std::task::Poll; -use tokio::io::AsyncRead; -use tracing::{debug, instrument, trace, warn}; - -pin_project! { - /// This implements [tokio::io::AsyncSeek] for and [tokio::io::AsyncRead] by - /// simply skipping over some bytes, keeping track of the position. - /// It fails whenever you try to seek backwards. - /// - /// ## Pinning concerns: - /// - /// [NaiveSeeker] is itself pinned by callers, and we do not need to concern - /// ourselves regarding that. - /// - /// Though, its fields as per - /// <https://doc.rust-lang.org/std/pin/#pinning-is-not-structural-for-field> - /// can be pinned or unpinned. - /// - /// So we need to go over each field and choose our policy carefully. - /// - /// The obvious cases are the bookkeeping integers we keep in the structure, - /// those are private and not shared to anyone, we never build a - /// `Pin<&mut X>` out of them at any point, therefore, we can safely never - /// mark them as pinned. Of course, it is expected that no developer here - /// attempt to `pin!(self.pos)` to pin them because it makes no sense. If - /// they have to become pinned, they should be marked `#[pin]` and we need - /// to discuss it. - /// - /// So the bookkeeping integers are in the right state with respect to their - /// pinning status. The projection should offer direct access. - /// - /// On the `r` field, i.e. a `BufReader<R>`, given that - /// <https://docs.rs/tokio/latest/tokio/io/struct.BufReader.html#impl-Unpin-for-BufReader%3CR%3E> - /// is available, even a `Pin<&mut BufReader<R>>` can be safely moved. - /// - /// The only care we should have regards the internal reader itself, i.e. - /// the `R` instance, see that Tokio decided to `#[pin]` it too: - /// <https://docs.rs/tokio/latest/src/tokio/io/util/buf_reader.rs.html#29> - /// - /// In general, there's no `Unpin` instance for `R: tokio::io::AsyncRead` - /// (see <https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html>). - /// - /// Therefore, we could keep it unpinned and pin it in every call site - /// whenever we need to call `poll_*` which can be confusing to the non- - /// expert developer and we have a fair share amount of situations where the - /// [BufReader] instance is naked, i.e. 
in its `&mut BufReader<R>` - /// form, this is annoying because it could lead to expose the naked `R` - /// internal instance somehow and would produce a risk of making it move - /// unexpectedly. - /// - /// We choose the path of the least resistance as we have no reason to have - /// access to the raw `BufReader<R>` instance, we just `#[pin]` it too and - /// enjoy its `poll_*` safe APIs and push the unpinning concerns to the - /// internal implementations themselves, which studied the question longer - /// than us. - pub struct NaiveSeeker<R: tokio::io::AsyncRead> { - #[pin] - r: tokio::io::BufReader<R>, - pos: u64, - bytes_to_skip: u64, - } -} - -/// The buffer size used to discard data. -const DISCARD_BUF_SIZE: usize = 4096; - -impl<R: tokio::io::AsyncRead> NaiveSeeker<R> { - pub fn new(r: R) -> Self { - NaiveSeeker { - r: tokio::io::BufReader::new(r), - pos: 0, - bytes_to_skip: 0, - } - } -} - -impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> { - #[instrument(level = "trace", skip_all)] - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll<std::io::Result<()>> { - // The amount of data read can be determined by the increase - // in the length of the slice returned by `ReadBuf::filled`. - let filled_before = buf.filled().len(); - - let this = self.project(); - ready!(this.r.poll_read(cx, buf))?; - - let bytes_read = buf.filled().len() - filled_before; - *this.pos += bytes_read as u64; - - trace!(bytes_read = bytes_read, new_pos = this.pos, "poll_read"); - - Ok(()).into() - } -} - -impl<R: tokio::io::AsyncRead> tokio::io::AsyncBufRead for NaiveSeeker<R> { - fn poll_fill_buf( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll<io::Result<&[u8]>> { - self.project().r.poll_fill_buf(cx) - } - - #[instrument(level = "trace", skip(self))] - fn consume(self: std::pin::Pin<&mut Self>, amt: usize) { - let this = self.project(); - this.r.consume(amt); - *this.pos += amt as u64; - - trace!(new_pos = this.pos, "consume"); - } -} - -impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> { - #[instrument(level="trace", skip(self), fields(inner_pos=%self.pos), err(Debug))] - fn start_seek( - self: std::pin::Pin<&mut Self>, - position: std::io::SeekFrom, - ) -> std::io::Result<()> { - let absolute_offset: u64 = match position { - io::SeekFrom::Start(start_offset) => { - if start_offset < self.pos { - return Err(io::Error::new( - io::ErrorKind::Unsupported, - format!("can't seek backwards ({} -> {})", self.pos, start_offset), - )); - } else { - start_offset - } - } - // we don't know the total size, can't support this. 
- io::SeekFrom::End(_end_offset) => { - return Err(io::Error::new( - io::ErrorKind::Unsupported, - "can't seek from end", - )); - } - io::SeekFrom::Current(relative_offset) => { - if relative_offset < 0 { - return Err(io::Error::new( - io::ErrorKind::Unsupported, - "can't seek backwards relative to current position", - )); - } else { - self.pos + relative_offset as u64 - } - } - }; - - // we already know absolute_offset is >= self.pos - debug_assert!( - absolute_offset >= self.pos, - "absolute_offset {} must be >= self.pos {}", - absolute_offset, - self.pos - ); - - // calculate bytes to skip - let this = self.project(); - *this.bytes_to_skip = absolute_offset - *this.pos; - - debug!(bytes_to_skip = *this.bytes_to_skip, "seek"); - - Ok(()) - } - - #[instrument(skip_all)] - fn poll_complete( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll<std::io::Result<u64>> { - if self.bytes_to_skip == 0 { - // return the new position (from the start of the stream) - return Poll::Ready(Ok(self.pos)); - } - - // discard some bytes, until pos is where we want it to be. - // We create a buffer that we'll discard later on. - let mut discard_buf = [0; DISCARD_BUF_SIZE]; - - // Loop until we've reached the desired seek position. This is done by issuing repeated - // `poll_read` calls. - // If the data is not available yet, we will yield back to the executor - // and wait to be polled again. - loop { - if self.bytes_to_skip == 0 { - return Poll::Ready(Ok(self.pos)); - } - - // calculate the length we want to skip at most, which is either a max - // buffer size, or the number of remaining bytes to read, whatever is - // smaller. - let bytes_to_skip_now = std::cmp::min(self.bytes_to_skip as usize, discard_buf.len()); - let mut discard_buf = tokio::io::ReadBuf::new(&mut discard_buf[..bytes_to_skip_now]); - - ready!(self.as_mut().poll_read(cx, &mut discard_buf))?; - let bytes_skipped = discard_buf.filled().len(); - - if bytes_skipped == 0 { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "got EOF while trying to skip bytes", - ))); - } - // decrement bytes to skip. The poll_read call already updated self.pos. - *self.as_mut().project().bytes_to_skip -= bytes_skipped as u64; - } - } -} - -impl<R: tokio::io::AsyncRead + Send + Unpin + 'static> BlobReader for NaiveSeeker<R> {} - -#[cfg(test)] -mod tests { - use super::{NaiveSeeker, DISCARD_BUF_SIZE}; - use std::io::{Cursor, SeekFrom}; - use tokio::io::{AsyncReadExt, AsyncSeekExt}; - - /// This seek requires multiple `poll_read` as we use a multiples of - /// DISCARD_BUF_SIZE when doing the seek. - /// This ensures we don't hang indefinitely. 
- #[tokio::test] - async fn seek() { - let buf = vec![0u8; DISCARD_BUF_SIZE * 4]; - let reader = Cursor::new(&buf); - let mut seeker = NaiveSeeker::new(reader); - seeker.seek(SeekFrom::Start(4000)).await.unwrap(); - } - - #[tokio::test] - async fn seek_read() { - let mut buf = vec![0u8; DISCARD_BUF_SIZE * 2]; - buf.extend_from_slice(&[1u8; DISCARD_BUF_SIZE * 2]); - buf.extend_from_slice(&[2u8; DISCARD_BUF_SIZE * 2]); - - let reader = Cursor::new(&buf); - let mut seeker = NaiveSeeker::new(reader); - - let mut read_buf = vec![0u8; DISCARD_BUF_SIZE]; - seeker.read_exact(&mut read_buf).await.expect("must read"); - assert_eq!(read_buf.as_slice(), &[0u8; DISCARD_BUF_SIZE]); - - seeker - .seek(SeekFrom::Current(DISCARD_BUF_SIZE as i64)) - .await - .expect("must seek"); - seeker.read_exact(&mut read_buf).await.expect("must read"); - assert_eq!(read_buf.as_slice(), &[1u8; DISCARD_BUF_SIZE]); - - seeker - .seek(SeekFrom::Start(2 * 2 * DISCARD_BUF_SIZE as u64)) - .await - .expect("must seek"); - seeker.read_exact(&mut read_buf).await.expect("must read"); - assert_eq!(read_buf.as_slice(), &[2u8; DISCARD_BUF_SIZE]); - } -} diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs index d2d0a288a557..10874af64011 100644 --- a/tvix/castore/src/blobservice/object_store.rs +++ b/tvix/castore/src/blobservice/object_store.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, io::{self, Cursor}, pin::pin, sync::Arc, @@ -18,22 +19,13 @@ use tracing::{debug, instrument, trace, Level}; use url::Url; use crate::{ + composition::{CompositionContext, ServiceBuilder}, proto::{stat_blob_response::ChunkMeta, StatBlobResponse}, - B3Digest, B3HashingReader, + B3Digest, B3HashingReader, Error, }; use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; -#[derive(Clone)] -pub struct ObjectStoreBlobService { - object_store: Arc<dyn ObjectStore>, - base_path: Path, - - /// Average chunk size for FastCDC, in bytes. - /// min value is half, max value double of that number. - avg_chunk_size: u32, -} - /// Uses any object storage supported by the [object_store] crate to provide a /// tvix-castore [BlobService]. /// @@ -70,31 +62,15 @@ pub struct ObjectStoreBlobService { /// It also allows signalling any compression of chunks in the content-type. /// Migration *should* be possible by simply adding the right content-types to /// all keys stored so far, but no promises ;-) -impl ObjectStoreBlobService { - /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by - /// [object_store]. - /// Any path suffix becomes the base path of the object store. - /// additional options, the same as in [object_store::parse_url_opts] can - /// be passed. - pub fn parse_url_opts<I, K, V>(url: &Url, options: I) -> Result<Self, object_store::Error> - where - I: IntoIterator<Item = (K, V)>, - K: AsRef<str>, - V: Into<String>, - { - let (object_store, path) = object_store::parse_url_opts(url, options)?; - - Ok(Self { - object_store: Arc::new(object_store), - base_path: path, - avg_chunk_size: 256 * 1024, - }) - } +#[derive(Clone)] +pub struct ObjectStoreBlobService { + instance_name: String, + object_store: Arc<dyn ObjectStore>, + base_path: Path, - /// Like [Self::parse_url_opts], except without the options. - pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> { - Self::parse_url_opts(url, Vec::<(String, String)>::new()) - } + /// Average chunk size for FastCDC, in bytes. + /// min value is half, max value double of that number. 
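As a worked illustration of that half/double rule with the 256 KiB default configured further down (just the arithmetic, not code from this diff):

```
let avg_chunk_size: u32 = 256 * 1024; // 262144 bytes, the default below
let min_chunk_size = avg_chunk_size / 2; // 131072 bytes (128 KiB)
let max_chunk_size = avg_chunk_size * 2; // 524288 bytes (512 KiB)
```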
+ avg_chunk_size: u32, } #[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,blob.digest=%digest),ret(Display))] @@ -117,7 +93,7 @@ fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path { #[async_trait] impl BlobService for ObjectStoreBlobService { - #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { // TODO: clarify if this should work for chunks or not, and explicitly // document in the proto docs. @@ -137,7 +113,7 @@ impl BlobService for ObjectStoreBlobService { } } - #[instrument(skip_all, err, fields(blob.digest=%digest))] + #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // handle reading the empty blob. if digest.as_slice() == blake3::hash(b"").as_bytes() { @@ -194,7 +170,7 @@ impl BlobService for ObjectStoreBlobService { } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // ObjectStoreBlobWriter implements AsyncWrite, but all the chunking // needs an AsyncRead, so we create a pipe here. @@ -217,7 +193,7 @@ impl BlobService for ObjectStoreBlobService { }) } - #[instrument(skip_all, err, fields(blob.digest=%digest))] + #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { match self .object_store @@ -269,6 +245,72 @@ impl BlobService for ObjectStoreBlobService { } } +fn default_avg_chunk_size() -> u32 { + 256 * 1024 +} + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ObjectStoreBlobServiceConfig { + object_store_url: String, + #[serde(default = "default_avg_chunk_size")] + avg_chunk_size: u32, + object_store_options: HashMap<String, String>, +} + +impl TryFrom<url::Url> for ObjectStoreBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by + /// [object_store]. + /// Any path suffix becomes the base path of the object store. + /// additional options, the same as in [object_store::parse_url_opts] can + /// be passed. + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // We need to convert the URL to string, strip the prefix there, and then + // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. + let trimmed_url = { + let s = url.to_string(); + let mut url = Url::parse( + s.strip_prefix("objectstore+") + .ok_or(Error::StorageError("Missing objectstore uri".into()))?, + )?; + // trim the query pairs, they might contain credentials or local settings we don't want to send as-is. 
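Concretely, a URL like the hypothetical one below is split into a bare object-store URL plus an options map that build() later hands to object_store::parse_url_opts (bucket name and credential invented for illustration):

```
use tvix_castore::blobservice::ObjectStoreBlobServiceConfig;

fn parse_example() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let url = url::Url::parse(
        "objectstore+s3://some-bucket/castore?aws_access_key_id=AKIDEXAMPLE",
    )?;
    let _config = ObjectStoreBlobServiceConfig::try_from(url)?;
    // Internally: object_store_url = "s3://some-bucket/castore",
    //             object_store_options = { "aws_access_key_id": "AKIDEXAMPLE" }.
    Ok(())
}
```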
+ url.set_query(None); + url + }; + Ok(ObjectStoreBlobServiceConfig { + object_store_url: trimmed_url.into(), + object_store_options: url + .query_pairs() + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + avg_chunk_size: 256 * 1024, + }) + } +} + +#[async_trait] +impl ServiceBuilder for ObjectStoreBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (object_store, path) = object_store::parse_url_opts( + &self.object_store_url.parse()?, + &self.object_store_options, + )?; + Ok(Arc::new(ObjectStoreBlobService { + instance_name: instance_name.to_string(), + object_store: Arc::new(object_store), + base_path: path, + avg_chunk_size: self.avg_chunk_size, + })) + } +} + /// Reads blob contents from a AsyncRead, chunks and uploads them. /// On success, returns a [StatBlobResponse] pointing to the individual chunks. #[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)] @@ -309,6 +351,15 @@ async fn chunk_and_upload<R: AsyncRead + Unpin>( .collect::<io::Result<Vec<ChunkMeta>>>() .await?; + let chunks = if chunks.len() < 2 { + // The chunker returned only one chunk, which is the entire blob. + // According to the protocol, we must return an empty list of chunks + // when the blob is not split up further. + vec![] + } else { + chunks + }; + let stat_blob_response = StatBlobResponse { chunks, bao: "".into(), // still todo @@ -512,24 +563,36 @@ where #[cfg(test)] mod test { - use super::chunk_and_upload; + use super::{chunk_and_upload, default_avg_chunk_size}; use crate::{ blobservice::{BlobService, ObjectStoreBlobService}, - fixtures::{BLOB_A, BLOB_A_DIGEST}, + fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST}, }; use std::{io::Cursor, sync::Arc}; use url::Url; /// Tests chunk_and_upload directly, bypassing the BlobWriter at open_write(). 
+ #[rstest::rstest] + #[case::a(&BLOB_A, &BLOB_A_DIGEST)] + #[case::b(&BLOB_B, &BLOB_B_DIGEST)] #[tokio::test] - async fn test_chunk_and_upload() { - let blobsvc = Arc::new( - ObjectStoreBlobService::parse_url(&Url::parse("memory:///").unwrap()).unwrap(), - ); - - let blob_digest = chunk_and_upload( - &mut Cursor::new(BLOB_A.to_vec()), - blobsvc.object_store.clone(), + async fn test_chunk_and_upload( + #[case] blob: &bytes::Bytes, + #[case] blob_digest: &crate::B3Digest, + ) { + let (object_store, base_path) = + object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap(); + let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store); + let blobsvc = Arc::new(ObjectStoreBlobService { + instance_name: "test".into(), + object_store: object_store.clone(), + avg_chunk_size: default_avg_chunk_size(), + base_path, + }); + + let inserted_blob_digest = chunk_and_upload( + &mut Cursor::new(blob.to_vec()), + object_store, object_store::path::Path::from("/"), 1024 / 2, 1024, @@ -538,9 +601,20 @@ mod test { .await .expect("chunk_and_upload succeeds"); - assert_eq!(BLOB_A_DIGEST.clone(), blob_digest); + assert_eq!(blob_digest.clone(), inserted_blob_digest); // Now we should have the blob - assert!(blobsvc.has(&BLOB_A_DIGEST).await.unwrap()); + assert!(blobsvc.has(blob_digest).await.unwrap()); + + // Check if it was chunked correctly + let chunks = blobsvc.chunks(blob_digest).await.unwrap().unwrap(); + if blob.len() < 1024 / 2 { + // The blob is smaller than the min chunk size, it should have been inserted as a whole + assert!(chunks.is_empty()); + } else if blob.len() > 1024 * 2 { + // The blob is larger than the max chunk size, make sure it was split up into at least + // two chunks + assert!(chunks.len() >= 2); + } } } diff --git a/tvix/castore/src/blobservice/tests/utils.rs b/tvix/castore/src/blobservice/tests/utils.rs index 706c4b5e4319..c7e41d965504 100644 --- a/tvix/castore/src/blobservice/tests/utils.rs +++ b/tvix/castore/src/blobservice/tests/utils.rs @@ -2,6 +2,7 @@ use crate::blobservice::{BlobService, MemoryBlobService}; use crate::proto::blob_service_client::BlobServiceClient; use crate::proto::GRPCBlobServiceWrapper; use crate::{blobservice::GRPCBlobService, proto::blob_service_server::BlobServiceServer}; +use hyper_util::rt::TokioIo; use tonic::transport::{Endpoint, Server, Uri}; /// Constructs and returns a gRPC BlobService. @@ -28,14 +29,17 @@ pub async fn make_grpc_blob_service_client() -> Box<dyn BlobService> { // Create a client, connecting to the right side. The URI is unused. let mut maybe_right = Some(right); - Box::new(GRPCBlobService::from_client(BlobServiceClient::new( - Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(tower::service_fn(move |_: Uri| { - let right = maybe_right.take().unwrap(); - async move { Ok::<_, std::io::Error>(right) } - })) - .await - .unwrap(), - ))) + Box::new(GRPCBlobService::from_client( + "root".into(), + BlobServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(TokioIo::new(right)) } + })) + .await + .unwrap(), + ), + )) } diff --git a/tvix/castore/src/composition.rs b/tvix/castore/src/composition.rs new file mode 100644 index 000000000000..1cbf97e1893a --- /dev/null +++ b/tvix/castore/src/composition.rs @@ -0,0 +1,582 @@ +//! The composition module allows composing different kinds of services based on a set of service +//! 
configurations _at runtime_. +//! +//! Store configs are deserialized with serde. The registry provides a stateful mapping from the +//! `type` tag of an internally tagged enum on the serde side to a Config struct which is +//! deserialized and then returned as a `Box<dyn ServiceBuilder<Output = dyn BlobService>>` +//! (the same for DirectoryService instead of BlobService etc). +//! +//! ### Example 1.: Implementing a new BlobService +//! +//! You need a Config struct which implements `DeserializeOwned` and +//! `ServiceBuilder<Output = dyn BlobService>`. +//! Provide the user with a function to call with +//! their registry. You register your new type as: +//! +//! ``` +//! use std::sync::Arc; +//! +//! use tvix_castore::composition::*; +//! use tvix_castore::blobservice::BlobService; +//! +//! #[derive(serde::Deserialize)] +//! struct MyBlobServiceConfig { +//! } +//! +//! #[tonic::async_trait] +//! impl ServiceBuilder for MyBlobServiceConfig { +//! type Output = dyn BlobService; +//! async fn build(&self, _: &str, _: &CompositionContext) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> { +//! todo!() +//! } +//! } +//! +//! impl TryFrom<url::Url> for MyBlobServiceConfig { +//! type Error = Box<dyn std::error::Error + Send + Sync>; +//! fn try_from(url: url::Url) -> Result<Self, Self::Error> { +//! todo!() +//! } +//! } +//! +//! pub fn add_my_service(reg: &mut Registry) { +//! reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, MyBlobServiceConfig>("myblobservicetype"); +//! } +//! ``` +//! +//! Now, when a user deserializes a store config with the type tag "myblobservicetype" into a +//! `Box<dyn ServiceBuilder<Output = Arc<dyn BlobService>>>`, it will be done via `MyBlobServiceConfig`. +//! +//! ### Example 2.: Composing stores to get one store +//! +//! ``` +//! use std::sync::Arc; +//! use tvix_castore::composition::*; +//! use tvix_castore::blobservice::BlobService; +//! +//! # fn main() -> Result<(), Box<dyn std::error::Error>> { +//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async move { +//! let blob_services_configs_json = serde_json::json!({ +//! "blobstore1": { +//! "type": "memory" +//! }, +//! "blobstore2": { +//! "type": "memory" +//! }, +//! "root": { +//! "type": "combined", +//! "near": "blobstore1", +//! "far": "blobstore2" +//! } +//! }); +//! +//! let blob_services_configs = with_registry(®, || serde_json::from_value(blob_services_configs_json))?; +//! let mut blob_service_composition = Composition::new(®); +//! blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs); +//! let blob_service: Arc<dyn BlobService> = blob_service_composition.build("root").await?; +//! # Ok(()) +//! # }) +//! # } +//! ``` +//! +//! ### Example 3.: Creating another registry extending the default registry with third-party types +//! +//! ``` +//! # pub fn add_my_service(reg: &mut tvix_castore::composition::Registry) {} +//! let mut my_registry = tvix_castore::composition::Registry::default(); +//! tvix_castore::composition::add_default_services(&mut my_registry); +//! add_my_service(&mut my_registry); +//! ``` +//! +//! Continue with Example 2, with my_registry instead of REG +//! +//! EXPERIMENTAL: If the xp-composition-url-refs feature is enabled, +//! entrypoints can also be URL strings, which are created as +//! anonymous stores. Instantiations of the same URL will +//! result in a new, distinct anonymous store each time, so creating +//! 
two `memory://` stores with this method will not share the same view. +//! This behavior might change in the future. + +use erased_serde::deserialize; +use futures::future::BoxFuture; +use futures::FutureExt; +use serde::de::DeserializeOwned; +use serde_tagged::de::{BoxFnSeed, SeedFactory}; +use serde_tagged::util::TagString; +use std::any::{Any, TypeId}; +use std::cell::Cell; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::sync::{Arc, LazyLock}; +use tonic::async_trait; + +/// Resolves tag names to the corresponding Config type. +// Registry implementation details: +// This is really ugly. Really we would want to store this as a generic static field: +// +// ``` +// struct Registry<T>(BTreeMap<(&'static str), RegistryEntry<T>); +// static REG<T>: Registry<T>; +// ``` +// +// so that one version of the static is generated for each Type that the registry is accessed for. +// However, this is not possible, because generics are only a thing in functions, and even there +// they will not interact with static items: +// https://doc.rust-lang.org/reference/items/static-items.html#statics--generics +// +// So instead, we make this lookup at runtime by putting the TypeId into the key. +// But now we can no longer store the `BoxFnSeed<T>` because we are lacking the generic parameter +// T, so instead store it as `Box<dyn Any>` and downcast to `&BoxFnSeed<T>` when performing the +// lookup. +// I said it was ugly... +#[derive(Default)] +pub struct Registry(BTreeMap<(TypeId, &'static str), Box<dyn Any + Sync>>); +pub type FromUrlSeed<T> = + Box<dyn Fn(url::Url) -> Result<T, Box<dyn std::error::Error + Send + Sync>> + Sync>; +pub struct RegistryEntry<T> { + serde_deserialize_seed: BoxFnSeed<DeserializeWithRegistry<T>>, + from_url_seed: FromUrlSeed<DeserializeWithRegistry<T>>, +} + +struct RegistryWithFakeType<'r, T>(&'r Registry, PhantomData<T>); + +impl<'r, 'de: 'r, T: 'static> SeedFactory<'de, TagString<'de>> for RegistryWithFakeType<'r, T> { + type Value = DeserializeWithRegistry<T>; + type Seed = &'r BoxFnSeed<Self::Value>; + + // Required method + fn seed<E>(self, tag: TagString<'de>) -> Result<Self::Seed, E> + where + E: serde::de::Error, + { + // using find() and not get() because of https://github.com/rust-lang/rust/issues/80389 + let seed: &Box<dyn Any + Sync> = self + .0 + .0 + .iter() + .find(|(k, _)| *k == &(TypeId::of::<T>(), tag.as_ref())) + .ok_or_else(|| serde::de::Error::custom(format!("Unknown type: {}", tag)))? + .1; + + let entry: &RegistryEntry<T> = <dyn Any>::downcast_ref(&**seed).unwrap(); + + Ok(&entry.serde_deserialize_seed) + } +} + +/// Wrapper type which implements Deserialize using the registry +/// +/// Wrap your type in this in order to deserialize it using a registry, e.g. +/// `RegistryWithFakeType<Box<dyn MyTrait>>`, then the types registered for `Box<dyn MyTrait>` +/// will be used. +pub struct DeserializeWithRegistry<T>(pub T); + +impl Registry { + /// Registers a mapping from type tag to a concrete type into the registry. + /// + /// The type parameters are very important: + /// After calling `register::<Box<dyn FooTrait>, FooStruct>("footype")`, when a user + /// deserializes into an input with the type tag "myblobservicetype" into a + /// `Box<dyn FooTrait>`, it will first call the Deserialize imple of `FooStruct` and + /// then convert it into a `Box<dyn FooTrait>` using From::from. 
+ pub fn register< + T: 'static, + C: DeserializeOwned + + TryFrom<url::Url, Error = Box<dyn std::error::Error + Send + Sync>> + + Into<T>, + >( + &mut self, + type_name: &'static str, + ) { + self.0.insert( + (TypeId::of::<T>(), type_name), + Box::new(RegistryEntry { + serde_deserialize_seed: BoxFnSeed::new(|x| { + deserialize::<C>(x) + .map(Into::into) + .map(DeserializeWithRegistry) + }), + from_url_seed: Box::new(|url| { + C::try_from(url) + .map(Into::into) + .map(DeserializeWithRegistry) + }), + }), + ); + } +} + +impl<'de, T: 'static> serde::Deserialize<'de> for DeserializeWithRegistry<T> { + fn deserialize<D>(de: D) -> std::result::Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + serde_tagged::de::internal::deserialize( + de, + "type", + RegistryWithFakeType(ACTIVE_REG.get().unwrap(), PhantomData::<T>), + ) + } +} + +#[derive(Debug, thiserror::Error)] +enum TryFromUrlError { + #[error("Unknown type: {0}")] + UnknownTag(String), +} + +impl<T: 'static> TryFrom<url::Url> for DeserializeWithRegistry<T> { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + let tag = url.scheme().split('+').next().unwrap(); + // same as in the SeedFactory impl: using find() and not get() because of https://github.com/rust-lang/rust/issues/80389 + let seed = ACTIVE_REG + .get() + .unwrap() + .0 + .iter() + .find(|(k, _)| *k == &(TypeId::of::<T>(), tag)) + .ok_or_else(|| Box::new(TryFromUrlError::UnknownTag(tag.into())))? + .1; + let entry: &RegistryEntry<T> = <dyn Any>::downcast_ref(&**seed).unwrap(); + (entry.from_url_seed)(url) + } +} + +thread_local! { + /// The active Registry is global state, because there is no convenient and universal way to pass state + /// into the functions usually used for deserialization, e.g. `serde_json::from_str`, `toml::from_str`, + /// `serde_qs::from_str`. + static ACTIVE_REG: Cell<Option<&'static Registry>> = panic!("reg was accessed before initialization"); +} + +/// Run the provided closure with a registry context. +/// Any serde deserialize calls within the closure will use the registry to resolve tag names to +/// the corresponding Config type. +pub fn with_registry<R>(reg: &'static Registry, f: impl FnOnce() -> R) -> R { + ACTIVE_REG.set(Some(reg)); + let result = f(); + ACTIVE_REG.set(None); + result +} + +/// The provided registry of tvix_castore, with all builtin BlobStore/DirectoryStore implementations +pub static REG: LazyLock<&'static Registry> = LazyLock::new(|| { + let mut reg = Default::default(); + add_default_services(&mut reg); + // explicitly leak to get an &'static, so that we gain `&Registry: Send` from `Registry: Sync` + Box::leak(Box::new(reg)) +}); + +// ---------- End of generic registry code --------- // + +/// Register the builtin services of tvix_castore (blob services and directory +/// services) with the given registry. +/// This can be used outside to create your own registry with the builtin types +/// _and_ extra third party types. +pub fn add_default_services(reg: &mut Registry) { + crate::blobservice::register_blob_services(reg); + crate::directoryservice::register_directory_services(reg); +} + +pub struct CompositionContext<'a> { + // The stack used to detect recursive instantiations and prevent deadlocks + // The TypeId of the trait object is included to distinguish e.g. the + // BlobService "root" and the DirectoryService "root". 
+ stack: Vec<(TypeId, String)>, + registry: &'static Registry, + composition: Option<&'a Composition>, +} + +impl<'a> CompositionContext<'a> { + /// Get a composition context for one-off store creation. + pub fn blank(registry: &'static Registry) -> Self { + Self { + registry, + stack: Default::default(), + composition: None, + } + } + + pub async fn resolve<T: ?Sized + Send + Sync + 'static>( + &self, + entrypoint: String, + ) -> Result<Arc<T>, Box<dyn std::error::Error + Send + Sync + 'static>> { + // disallow recursion + if self + .stack + .contains(&(TypeId::of::<T>(), entrypoint.clone())) + { + return Err(CompositionError::Recursion( + self.stack.iter().map(|(_, n)| n.clone()).collect(), + ) + .into()); + } + + Ok(self.build_internal(entrypoint).await?) + } + + #[cfg(feature = "xp-composition-url-refs")] + async fn build_anonymous<T: ?Sized + Send + Sync + 'static>( + &self, + entrypoint: String, + ) -> Result<Arc<T>, Box<dyn std::error::Error + Send + Sync>> { + let url = url::Url::parse(&entrypoint)?; + let config: DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>> = + with_registry(self.registry, || url.try_into())?; + config.0.build("anonymous", self).await + } + + fn build_internal<T: ?Sized + Send + Sync + 'static>( + &self, + entrypoint: String, + ) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> { + #[cfg(feature = "xp-composition-url-refs")] + if entrypoint.contains("://") { + // There is a chance this is a url. we are building an anonymous store + return Box::pin(async move { + self.build_anonymous(entrypoint.clone()) + .await + .map_err(|e| CompositionError::Failed(entrypoint, Arc::from(e))) + }); + } + + let mut stores = match self.composition { + Some(comp) => comp.stores.lock().unwrap(), + None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))), + }; + let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) { + Some(v) => v, + None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))), + }; + // for lifetime reasons, we put a placeholder value in the hashmap while we figure out what + // the new value should be. the Mutex stays locked the entire time, so nobody will ever see + // this temporary value. + let prev_val = std::mem::replace( + entry, + Box::new(InstantiationState::<T>::Done(Err( + CompositionError::Poisoned(entrypoint.clone()), + ))), + ); + let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() { + InstantiationState::Done(service) => ( + InstantiationState::Done(service.clone()), + futures::future::ready(service).boxed(), + ), + // the construction of the store has not started yet. 
+ InstantiationState::Config(config) => { + let (tx, rx) = tokio::sync::watch::channel(None); + ( + InstantiationState::InProgress(rx), + (async move { + let mut new_context = CompositionContext { + composition: self.composition, + registry: self.registry, + stack: self.stack.clone(), + }; + new_context + .stack + .push((TypeId::of::<T>(), entrypoint.clone())); + let res = + config.build(&entrypoint, &new_context).await.map_err(|e| { + match e.downcast() { + Ok(e) => *e, + Err(e) => CompositionError::Failed(entrypoint, e.into()), + } + }); + tx.send(Some(res.clone())).unwrap(); + res + }) + .boxed(), + ) + } + // there is already a task driving forward the construction of this store, wait for it + // to notify us via the provided channel + InstantiationState::InProgress(mut recv) => { + (InstantiationState::InProgress(recv.clone()), { + (async move { + loop { + if let Some(v) = + recv.borrow_and_update().as_ref().map(|res| res.clone()) + { + break v; + } + recv.changed().await.unwrap(); + } + }) + .boxed() + }) + } + }; + *entry = Box::new(new_val); + ret + } +} + +#[async_trait] +/// This is the trait usually implemented on a per-store-type Config struct and +/// used to instantiate it. +pub trait ServiceBuilder: Send + Sync { + type Output: ?Sized; + async fn build( + &self, + instance_name: &str, + context: &CompositionContext, + ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>>; +} + +impl<T: ?Sized, S: ServiceBuilder<Output = T> + 'static> From<S> + for Box<dyn ServiceBuilder<Output = T>> +{ + fn from(t: S) -> Self { + Box::new(t) + } +} + +enum InstantiationState<T: ?Sized> { + Config(Box<dyn ServiceBuilder<Output = T>>), + InProgress(tokio::sync::watch::Receiver<Option<Result<Arc<T>, CompositionError>>>), + Done(Result<Arc<T>, CompositionError>), +} + +pub struct Composition { + registry: &'static Registry, + stores: std::sync::Mutex<HashMap<(TypeId, String), Box<dyn Any + Send + Sync>>>, +} + +#[derive(thiserror::Error, Clone, Debug)] +pub enum CompositionError { + #[error("store not found: {0}")] + NotFound(String), + #[error("recursion not allowed {0:?}")] + Recursion(Vec<String>), + #[error("store construction panicked {0}")] + Poisoned(String), + #[error("instantiation of service {0} failed: {1}")] + Failed(String, Arc<dyn std::error::Error + Send + Sync>), +} + +impl<T: ?Sized + Send + Sync + 'static> + Extend<( + String, + DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>, + )> for Composition +{ + fn extend<I>(&mut self, configs: I) + where + I: IntoIterator< + Item = ( + String, + DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>, + ), + >, + { + self.stores + .lock() + .unwrap() + .extend(configs.into_iter().map(|(k, v)| { + ( + (TypeId::of::<T>(), k), + Box::new(InstantiationState::Config(v.0)) as Box<dyn Any + Send + Sync>, + ) + })) + } +} + +impl Composition { + /// The given registry will be used for creation of anonymous stores during composition + pub fn new(registry: &'static Registry) -> Self { + Self { + registry, + stores: Default::default(), + } + } + + pub fn extend_with_configs<T: ?Sized + Send + Sync + 'static>( + &mut self, + // Keep the concrete `HashMap` type here since it allows for type + // inference of what type is previously being deserialized. + configs: HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>>, + ) { + self.extend(configs); + } + + /// Looks up the entrypoint name in the composition and returns an instantiated service. 
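The `Config` → `InProgress` → `Done` transition above deduplicates concurrent instantiations of the same entrypoint: the first caller builds the service and broadcasts the result on a `tokio::sync::watch` channel, while later callers wait on a clone of the receiver. A stripped-down sketch of just that mechanism, with hypothetical `State`/`get_or_build` names and a `u64` config plus `String` output standing in for the real config and service types, could look like:

```rust
// Stripped-down sketch; assumes tokio = { version = "1", features = ["full"] }.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;

// Simplified counterpart of the Config / InProgress / Done states used above.
enum State {
    Config(u64),                                      // not built yet
    InProgress(watch::Receiver<Option<Arc<String>>>), // someone is building it
    Done(Arc<String>),                                // finished result
}

enum Action {
    Build(u64, watch::Sender<Option<Arc<String>>>),
    Wait(watch::Receiver<Option<Arc<String>>>),
}

async fn get_or_build(states: Arc<Mutex<HashMap<String, State>>>, key: &str) -> Arc<String> {
    // Decide what to do while holding the lock, but do the actual work outside of it.
    let action = {
        let mut map = states.lock().unwrap();
        match map.remove(key).expect("unknown key") {
            State::Done(v) => {
                map.insert(key.into(), State::Done(v.clone()));
                return v;
            }
            State::InProgress(rx) => {
                map.insert(key.into(), State::InProgress(rx.clone()));
                Action::Wait(rx)
            }
            State::Config(cfg) => {
                let (tx, rx) = watch::channel(None);
                // For brevity this sketch never writes Done back; the real code above does.
                map.insert(key.into(), State::InProgress(rx));
                Action::Build(cfg, tx)
            }
        }
    };

    match action {
        // We won the race: build and broadcast the result to every waiter.
        Action::Build(cfg, tx) => {
            let value = Arc::new(format!("built from config {cfg}"));
            let _ = tx.send(Some(value.clone()));
            value
        }
        // Someone else is building: wait until the watch channel carries a value.
        Action::Wait(mut rx) => loop {
            if let Some(v) = rx.borrow_and_update().clone() {
                break v;
            }
            rx.changed().await.expect("builder vanished");
        },
    }
}

#[tokio::main]
async fn main() {
    let states = Arc::new(Mutex::new(HashMap::from([(
        "root".to_string(),
        State::Config(42),
    )])));
    let (a, b) = tokio::join!(
        get_or_build(states.clone(), "root"),
        get_or_build(states.clone(), "root")
    );
    // Both callers end up with the same Arc, like the `concurrent` test further below.
    assert!(Arc::ptr_eq(&a, &b));
}
```

As in `build_internal`, the map entry is swapped while the mutex is held and the slow construction happens outside the lock, so concurrent lookups of other entrypoints are never blocked on a build.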
+ pub async fn build<T: ?Sized + Send + Sync + 'static>( + &self, + entrypoint: &str, + ) -> Result<Arc<T>, CompositionError> { + self.context().build_internal(entrypoint.to_string()).await + } + + pub fn context(&self) -> CompositionContext { + CompositionContext { + registry: self.registry, + stack: vec![], + composition: Some(self), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::blobservice::BlobService; + use std::sync::Arc; + + /// Test that we return a reference to the same instance of MemoryBlobService (via ptr_eq) + /// when instantiating the same entrypoint twice. By instantiating concurrently, we also + /// test the channels notifying the second consumer when the store has been instantiated. + #[tokio::test] + async fn concurrent() { + let blob_services_configs_json = serde_json::json!({ + "root": { + "type": "memory", + } + }); + + let blob_services_configs = + with_registry(®, || serde_json::from_value(blob_services_configs_json)).unwrap(); + let mut blob_service_composition = Composition::new(®); + blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs); + let (blob_service1, blob_service2) = tokio::join!( + blob_service_composition.build::<dyn BlobService>("root"), + blob_service_composition.build::<dyn BlobService>("root") + ); + assert!(Arc::ptr_eq( + &blob_service1.unwrap(), + &blob_service2.unwrap() + )); + } + + /// Test that we throw the correct error when an instantiation would recurse (deadlock) + #[tokio::test] + async fn reject_recursion() { + let blob_services_configs_json = serde_json::json!({ + "root": { + "type": "combined", + "near": "other", + "far": "other" + }, + "other": { + "type": "combined", + "near": "root", + "far": "root" + } + }); + + let blob_services_configs = + with_registry(®, || serde_json::from_value(blob_services_configs_json)).unwrap(); + let mut blob_service_composition = Composition::new(®); + blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs); + match blob_service_composition + .build::<dyn BlobService>("root") + .await + { + Err(CompositionError::Recursion(stack)) => { + assert_eq!(stack, vec!["root".to_string(), "other".to_string()]) + } + other => panic!("should have returned an error, returned: {:?}", other.err()), + } + } +} diff --git a/tvix/castore/src/digests.rs b/tvix/castore/src/digests.rs index 2311c95c4ddc..6c91045828be 100644 --- a/tvix/castore/src/digests.rs +++ b/tvix/castore/src/digests.rs @@ -2,18 +2,18 @@ use bytes::Bytes; use data_encoding::BASE64; use thiserror::Error; +pub const B3_LEN: usize = blake3::OUT_LEN; + #[derive(PartialEq, Eq, Hash)] -pub struct B3Digest(Bytes); +pub struct B3Digest([u8; B3_LEN]); // TODO: allow converting these errors to crate::Error -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq)] pub enum Error { #[error("invalid digest length: {0}")] InvalidDigestLen(usize), } -pub const B3_LEN: usize = 32; - impl B3Digest { pub fn as_slice(&self) -> &[u8] { &self.0[..] 
@@ -22,48 +22,60 @@ impl B3Digest { impl From<B3Digest> for bytes::Bytes { fn from(val: B3Digest) -> Self { - val.0 + Bytes::copy_from_slice(&val.0) } } +impl From<blake3::Hash> for B3Digest { + fn from(value: blake3::Hash) -> Self { + Self(*value.as_bytes()) + } +} impl From<digest::Output<blake3::Hasher>> for B3Digest { fn from(value: digest::Output<blake3::Hasher>) -> Self { - let v = Into::<[u8; B3_LEN]>::into(value); - Self(Bytes::copy_from_slice(&v)) + Self(value.into()) } } -impl TryFrom<Vec<u8>> for B3Digest { +impl TryFrom<&[u8]> for B3Digest { type Error = Error; - // constructs a [B3Digest] from a [Vec<u8>]. + // constructs a [B3Digest] from a &[u8]. // Returns an error if the digest has the wrong length. - fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> { - if value.len() != B3_LEN { - Err(Error::InvalidDigestLen(value.len())) - } else { - Ok(Self(value.into())) - } + fn try_from(value: &[u8]) -> Result<Self, Self::Error> { + Ok(Self( + value + .try_into() + .map_err(|_e| Error::InvalidDigestLen(value.len()))?, + )) } } impl TryFrom<bytes::Bytes> for B3Digest { type Error = Error; - // constructs a [B3Digest] from a [bytes::Bytes]. - // Returns an error if the digest has the wrong length. fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> { - if value.len() != B3_LEN { - Err(Error::InvalidDigestLen(value.len())) - } else { - Ok(Self(value)) - } + value[..].try_into() + } +} + +impl TryFrom<Vec<u8>> for B3Digest { + type Error = Error; + + fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> { + value[..].try_into() } } impl From<&[u8; B3_LEN]> for B3Digest { fn from(value: &[u8; B3_LEN]) -> Self { - Self(value.to_vec().into()) + Self(*value) + } +} + +impl From<B3Digest> for [u8; B3_LEN] { + fn from(value: B3Digest) -> Self { + value.0 } } diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs index 1194c6ddc999..7473481c94b5 100644 --- a/tvix/castore/src/directoryservice/bigtable.rs +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -5,10 +5,14 @@ use futures::stream::BoxStream; use prost::Message; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DurationSeconds}; +use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, trace, warn}; -use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter}; +use super::{ + utils::traverse_directory, Directory, DirectoryPutter, DirectoryService, SimplePutter, +}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; /// There should not be more than 10 MiB in a single cell. @@ -33,6 +37,7 @@ const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; /// directly at the root, so rely on store composition. #[derive(Clone)] pub struct BigtableDirectoryService { + instance_name: String, client: bigtable::BigTable, params: BigtableParameters, @@ -43,44 +48,12 @@ pub struct BigtableDirectoryService { emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>, } -/// Represents configuration of [BigtableDirectoryService]. -/// This currently conflates both connect parameters and data model/client -/// behaviour parameters. 
-#[serde_as] -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] -pub struct BigtableParameters { - project_id: String, - instance_name: String, - #[serde(default)] - is_read_only: bool, - #[serde(default = "default_channel_size")] - channel_size: usize, - - #[serde_as(as = "Option<DurationSeconds<String>>")] - #[serde(default = "default_timeout")] - timeout: Option<std::time::Duration>, - table_name: String, - family_name: String, - - #[serde(default = "default_app_profile_id")] - app_profile_id: String, -} - -fn default_app_profile_id() -> String { - "default".to_owned() -} - -fn default_channel_size() -> usize { - 4 -} - -fn default_timeout() -> Option<std::time::Duration> { - Some(std::time::Duration::from_secs(4)) -} - impl BigtableDirectoryService { #[cfg(not(test))] - pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + pub async fn connect( + instance_name: String, + params: BigtableParameters, + ) -> Result<Self, bigtable::Error> { let connection = bigtable::BigTableConnection::new( ¶ms.project_id, ¶ms.instance_name, @@ -91,13 +64,17 @@ impl BigtableDirectoryService { .await?; Ok(Self { + instance_name, client: connection.client(), params, }) } #[cfg(test)] - pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + pub async fn connect( + instance_name: String, + params: BigtableParameters, + ) -> Result<Self, bigtable::Error> { use std::time::Duration; use async_process::{Command, Stdio}; @@ -166,6 +143,7 @@ impl BigtableDirectoryService { )?; Ok(Self { + instance_name, client: connection.client(), params, emulator: (tmpdir, emulator_process).into(), @@ -181,8 +159,8 @@ fn derive_directory_key(digest: &B3Digest) -> String { #[async_trait] impl DirectoryService for BigtableDirectoryService { - #[instrument(skip(self, digest), err, fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let mut client = self.client.clone(); let directory_key = derive_directory_key(digest); @@ -274,28 +252,20 @@ impl DirectoryService for BigtableDirectoryService { // Try to parse the value into a Directory message. let directory = proto::Directory::decode(Bytes::from(row_cell.value)) - .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?; - - // validate the Directory. - directory - .validate() + .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))? 
+ .try_into() .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?; Ok(Some(directory)) } - #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))] + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let directory_digest = directory.digest(); let mut client = self.client.clone(); let directory_key = derive_directory_key(&directory_digest); - // Ensure the directory we're trying to upload passes validation - directory - .validate() - .map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?; - - let data = directory.encode_to_vec(); + let data = proto::Directory::from(directory).encode_to_vec(); if data.len() as u64 > CELL_SIZE_LIMIT { return Err(Error::StorageError( "Directory exceeds cell limit on Bigtable".into(), @@ -339,15 +309,15 @@ impl DirectoryService for BigtableDirectoryService { Ok(directory_digest) } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, @@ -355,3 +325,73 @@ impl DirectoryService for BigtableDirectoryService { Box::new(SimplePutter::new(self.clone())) } } + +/// Represents configuration of [BigtableDirectoryService]. +/// This currently conflates both connect parameters and data model/client +/// behaviour parameters. +#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option<DurationSeconds<String>>")] + #[serde(default = "default_timeout")] + timeout: Option<std::time::Duration>, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +#[async_trait] +impl ServiceBuilder for BigtableParameters { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> { + Ok(Arc::new( + BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?, + )) + } +} + +impl TryFrom<url::Url> for BigtableParameters { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(mut url: url::Url) -> Result<Self, Self::Error> { + // parse the instance name from the hostname. + let instance_name = url + .host_str() + .ok_or_else(|| Error::StorageError("instance name missing".into()))? + .to_string(); + + // โฆ but add it to the query string now, so we just need to parse that. 
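This host-into-query-string trick lets a single `serde_qs` pass handle every parameter, as the code that follows below shows. A standalone sketch of the same approach, using a hypothetical `DemoParams` struct rather than the real `BigtableParameters`:

```rust
// Standalone sketch; assumes url = "2", serde (with derive) and serde_qs as dependencies.
use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq)]
struct DemoParams {
    instance_name: String,
    table_name: String,
    #[serde(default)]
    is_read_only: bool,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical URL in the same shape as the bigtable:// URLs handled here.
    let mut url = url::Url::parse("bigtable://my-instance?table_name=directories")?;

    // Move the host into the query string, then let serde_qs parse everything in one go.
    let instance_name = url
        .host_str()
        .ok_or("instance name missing")?
        .to_string();
    url.query_pairs_mut()
        .append_pair("instance_name", &instance_name);

    let params: DemoParams = serde_qs::from_str(url.query().unwrap_or_default())?;
    assert_eq!(
        params,
        DemoParams {
            instance_name: "my-instance".into(),
            table_name: "directories".into(),
            is_read_only: false,
        }
    );
    Ok(())
}
```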
+ url.query_pairs_mut() + .append_pair("instance_name", &instance_name); + + let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) + .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; + + Ok(params) + } +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option<std::time::Duration> { + Some(std::time::Duration::from_secs(4)) +} diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs index d3f351d6b689..450c642715ac 100644 --- a/tvix/castore/src/directoryservice/combinators.rs +++ b/tvix/castore/src/directoryservice/combinators.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use futures::stream::BoxStream; use futures::StreamExt; use futures::TryFutureExt; @@ -5,9 +7,9 @@ use futures::TryStreamExt; use tonic::async_trait; use tracing::{instrument, trace}; -use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; +use super::{Directory, DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::directoryservice::DirectoryPutter; -use crate::proto; use crate::B3Digest; use crate::Error; @@ -20,13 +22,18 @@ use crate::Error; /// Inserts and listings are not implemented for now. #[derive(Clone)] pub struct Cache<DS1, DS2> { + instance_name: String, near: DS1, far: DS2, } impl<DS1, DS2> Cache<DS1, DS2> { - pub fn new(near: DS1, far: DS2) -> Self { - Self { near, far } + pub fn new(instance_name: String, near: DS1, far: DS2) -> Self { + Self { + instance_name, + near, + far, + } } } @@ -36,8 +43,8 @@ where DS1: DirectoryService + Clone + 'static, DS2: DirectoryService + Clone + 'static, { - #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { match self.near.get(digest).await? { Some(directory) => { trace!("serving from cache"); @@ -78,16 +85,16 @@ where } } - #[instrument(skip_all)] - async fn put(&self, _directory: proto::Directory) -> Result<B3Digest, Error> { + #[instrument(skip_all, fields(instance_name = %self.instance_name))] + async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> { Err(Error::StorageError("unimplemented".to_string())) } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let near = self.near.clone(); let far = self.far.clone(); let digest = root_directory_digest.clone(); @@ -140,3 +147,41 @@ where Box::new(SimplePutter::new((*self).clone())) } } + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct CacheConfig { + near: String, + far: String, +} + +impl TryFrom<url::Url> for CacheConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // cache doesn't support host or path in the URL. 
+ if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string()).into()); + } + Ok(serde_qs::from_str(url.query().unwrap_or_default())?) + } +} + +#[async_trait] +impl ServiceBuilder for CacheConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + instance_name: &str, + context: &CompositionContext, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (near, far) = futures::join!( + context.resolve::<Self::Output>(self.near.clone()), + context.resolve::<Self::Output>(self.far.clone()) + ); + Ok(Arc::new(Cache { + instance_name: instance_name.to_string(), + near: near?, + far: far?, + })) + } +} diff --git a/tvix/castore/src/directoryservice/directory_graph.rs b/tvix/castore/src/directoryservice/directory_graph.rs index e6b9b163370c..54f3cb3e9353 100644 --- a/tvix/castore/src/directoryservice/directory_graph.rs +++ b/tvix/castore/src/directoryservice/directory_graph.rs @@ -1,7 +1,5 @@ use std::collections::HashMap; -use bstr::ByteSlice; - use petgraph::{ graph::{DiGraph, NodeIndex}, visit::{Bfs, DfsPostOrder, EdgeRef, IntoNodeIdentifiers, Walker}, @@ -10,10 +8,7 @@ use petgraph::{ use tracing::instrument; use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; -use crate::{ - proto::{self, Directory, DirectoryNode}, - B3Digest, -}; +use crate::{path::PathComponent, B3Digest, Directory, Node}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -21,6 +16,11 @@ pub enum Error { ValidationError(String), } +struct EdgeWeight { + name: PathComponent, + size: u64, +} + /// This can be used to validate and/or re-order a Directory closure (DAG of /// connected Directories), and their insertion order. /// @@ -58,7 +58,7 @@ pub struct DirectoryGraph<O> { // // The option in the edge weight tracks the pending validation state of the respective edge, for example if // the child has not been added yet. - graph: DiGraph<Option<Directory>, Option<DirectoryNode>>, + graph: DiGraph<Option<Directory>, Option<EdgeWeight>>, // A lookup table from directory digest to node index. digest_to_node_ix: HashMap<B3Digest, NodeIndex>, @@ -67,18 +67,18 @@ pub struct DirectoryGraph<O> { } pub struct ValidatedDirectoryGraph { - graph: DiGraph<Option<Directory>, Option<DirectoryNode>>, + graph: DiGraph<Option<Directory>, Option<EdgeWeight>>, root: Option<NodeIndex>, } -fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> { +fn check_edge(edge: &EdgeWeight, child: &Directory) -> Result<(), Error> { // Ensure the size specified in the child node matches our records. 
- if dir.size != child.size() { + if edge.size != child.size() { return Err(Error::ValidationError(format!( "'{}' has wrong size, specified {}, recorded {}", - dir.name.as_bstr(), - dir.size, + edge.name, + edge.size, child.size(), ))); } @@ -88,7 +88,7 @@ fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> { impl DirectoryGraph<LeavesToRootValidator> { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + pub fn add(&mut self, directory: Directory) -> Result<(), Error> { if !self.order_validator.add_directory(&directory) { return Err(Error::ValidationError( "unknown directory was referenced".into(), @@ -108,7 +108,7 @@ impl DirectoryGraph<RootToLeavesValidator> { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + pub fn add(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); if !self.order_validator.digest_allowed(&digest) { return Err(Error::ValidationError("unexpected digest".into())); @@ -129,12 +129,7 @@ impl<O: OrderValidator> DirectoryGraph<O> { } /// Adds a directory which has already been confirmed to be in-order to the graph - pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> { - // Do some basic validation - directory - .validate() - .map_err(|e| Error::ValidationError(e.to_string()))?; - + pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); // Teach the graph about the existence of a node with this digest @@ -149,23 +144,32 @@ impl<O: OrderValidator> DirectoryGraph<O> { } // set up edges to all child directories - for subdir in &directory.directories { - let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap(); - - let child_ix = *self - .digest_to_node_ix - .entry(subdir_digest) - .or_insert_with(|| self.graph.add_node(None)); - - let pending_edge_check = match &self.graph[child_ix] { - Some(child) => { - // child is already available, validate the edge now - check_edge(subdir, child)?; - None - } - None => Some(subdir.clone()), // pending validation - }; - self.graph.add_edge(ix, child_ix, pending_edge_check); + for (name, node) in directory.nodes() { + if let Node::Directory { digest, size } = node { + let child_ix = *self + .digest_to_node_ix + .entry(digest.clone()) + .or_insert_with(|| self.graph.add_node(None)); + + let pending_edge_check = match &self.graph[child_ix] { + Some(child) => { + // child is already available, validate the edge now + check_edge( + &EdgeWeight { + name: name.clone(), + size: *size, + }, + child, + )?; + None + } + None => Some(EdgeWeight { + name: name.clone(), + size: *size, + }), // pending validation + }; + self.graph.add_edge(ix, child_ix, pending_edge_check); + } } // validate the edges from parents to this node @@ -183,6 +187,7 @@ impl<O: OrderValidator> DirectoryGraph<O> { .expect("edge not found") .take() .expect("edge is already validated"); + check_edge(&edge_weight, &directory)?; } @@ -269,33 +274,23 @@ impl ValidatedDirectoryGraph { #[cfg(test)] mod tests { - use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, - proto::{self, Directory}, - }; - use 
lazy_static::lazy_static; + use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; + use crate::{Directory, Node}; use rstest::rstest; + use std::sync::LazyLock; - lazy_static! { - pub static ref BROKEN_DIRECTORY : Directory = Directory { - symlinks: vec![proto::SymlinkNode { - name: "".into(), // invalid name! - target: "doesntmatter".into(), - }], - ..Default::default() - }; + use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; - pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), + pub static BROKEN_PARENT_DIRECTORY: LazyLock<Directory> = LazyLock::new(|| { + Directory::try_from_iter([( + "foo".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_A.digest(), size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() - }; - } - - use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; + }, + )]) + .unwrap() + }); #[rstest] /// Uploading an empty directory should succeed. @@ -312,8 +307,6 @@ mod tests { #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] /// Uploading B (referring to A) should fail immediately, because A was never uploaded. #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] - /// Uploading a directory failing validation should fail immediately. - #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)] /// Uploading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] fn test_uploads( @@ -366,8 +359,6 @@ mod tests { #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)] /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root). #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)] - /// Downloading a directory failing validation should fail immediately. - #[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)] /// Downloading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)] fn test_downloads( diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs index ee675ca68a9f..d50b5a35dd03 100644 --- a/tvix/castore/src/directoryservice/from_addr.rs +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -1,142 +1,62 @@ -use url::Url; +use std::sync::Arc; -use crate::{proto::directory_service_client::DirectoryServiceClient, Error}; +use url::Url; -use super::{ - DirectoryService, GRPCDirectoryService, MemoryDirectoryService, ObjectStoreDirectoryService, - SledDirectoryService, +use crate::composition::{ + with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG, }; +use super::DirectoryService; + /// Constructs a new instance of a [DirectoryService] from an URI. /// /// The following URIs are supported: /// - `memory:` /// Uses a in-memory implementation. -/// - `sled:` -/// Uses a in-memory sled implementation. -/// - `sled:///absolute/path/to/somewhere` -/// Uses sled, using a path on the disk for persistency. Can be only opened +/// from one process at the same time. +/// - `redb:` +/// Uses a in-memory redb implementation. 
+/// - `redb:///absolute/path/to/somewhere` +/// Uses redb, using a path on the disk for persistency. Can be only opened /// from one process at the same time. /// - `grpc+unix:///absolute/path/to/somewhere` /// Connects to a local tvix-store gRPC service via Unix socket. /// - `grpc+http://host:port`, `grpc+https://host:port` /// Connects to a (remote) tvix-store gRPC service. -pub async fn from_addr(uri: &str) -> Result<Box<dyn DirectoryService>, crate::Error> { +pub async fn from_addr( + uri: &str, +) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> { #[allow(unused_mut)] let mut url = Url::parse(uri) .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; - let directory_service: Box<dyn DirectoryService> = match url.scheme() { - "memory" => { - // memory doesn't support host or path in the URL. - if url.has_host() || !url.path().is_empty() { - return Err(Error::StorageError("invalid url".to_string())); - } - Box::<MemoryDirectoryService>::default() - } - "sled" => { - // sled doesn't support host, and a path can be provided (otherwise - // it'll live in memory only). - if url.has_host() { - return Err(Error::StorageError("no host allowed".to_string())); - } - - if url.path() == "/" { - return Err(Error::StorageError( - "cowardly refusing to open / with sled".to_string(), - )); - } - - // TODO: expose compression and other parameters as URL parameters? - - Box::new(if url.path().is_empty() { - SledDirectoryService::new_temporary() - .map_err(|e| Error::StorageError(e.to_string()))? - } else { - SledDirectoryService::new(url.path()) - .map_err(|e| Error::StorageError(e.to_string()))? - }) - } - scheme if scheme.starts_with("grpc+") => { - // schemes starting with grpc+ go to the GRPCPathInfoService. - // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. - // - In the case of unix sockets, there must be a path, but may not be a host. - // - In the case of non-unix sockets, there must be a host, but no path. - // Constructing the channel is handled by tvix_castore::channel::from_url. - let client = DirectoryServiceClient::new(crate::tonic::channel_from_url(&url).await?); - Box::new(GRPCDirectoryService::from_client(client)) - } - scheme if scheme.starts_with("objectstore+") => { - // We need to convert the URL to string, strip the prefix there, and then - // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. - let trimmed_url = { - let s = url.to_string(); - Url::parse(s.strip_prefix("objectstore+").unwrap()).unwrap() - }; - Box::new( - ObjectStoreDirectoryService::parse_url(&trimmed_url) - .map_err(|e| Error::StorageError(e.to_string()))?, - ) - } - #[cfg(feature = "cloud")] - "bigtable" => { - use super::bigtable::BigtableParameters; - use super::BigtableDirectoryService; - - // parse the instance name from the hostname. - let instance_name = url - .host_str() - .ok_or_else(|| Error::StorageError("instance name missing".into()))? - .to_string(); - - // โฆ but add it to the query string now, so we just need to parse that. - url.query_pairs_mut() - .append_pair("instance_name", &instance_name); - - let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) - .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; + let directory_service_config = with_registry(®, || { + <DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>>>::try_from( + url, + ) + })? 
+ .0; + let directory_service = directory_service_config + .build("anonymous", &CompositionContext::blank(®)) + .await?; - Box::new( - BigtableDirectoryService::connect(params) - .await - .map_err(|e| Error::StorageError(e.to_string()))?, - ) - } - _ => { - return Err(crate::Error::StorageError(format!( - "unknown scheme: {}", - url.scheme() - ))) - } - }; Ok(directory_service) } #[cfg(test)] mod tests { + use std::sync::LazyLock; + use super::from_addr; - use lazy_static::lazy_static; use rstest::rstest; use tempfile::TempDir; - lazy_static! { - static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); - static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap(); - } + static TMPDIR_REDB_1: LazyLock<TempDir> = LazyLock::new(|| TempDir::new().unwrap()); + static TMPDIR_REDB_2: LazyLock<TempDir> = LazyLock::new(|| TempDir::new().unwrap()); #[rstest] /// This uses an unsupported scheme. #[case::unsupported_scheme("http://foo.example/test", false)] - /// This configures sled in temporary mode. - #[case::sled_valid_temporary("sled://", true)] - /// This configures sled with /, which should fail. - #[case::sled_invalid_root("sled:///", false)] - /// This configures sled with a host, not path, which should fail. - #[case::sled_invalid_host("sled://foo.example", false)] - /// This configures sled with a valid path path, which should succeed. - #[case::sled_valid_path(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true)] - /// This configures sled with a host, and a valid path path, which should fail. - #[case::sled_invalid_host_with_valid_path(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false)] /// This correctly sets the scheme, and doesn't set a path. #[case::memory_valid("memory://", true)] /// This sets a memory url host to `foo` @@ -145,6 +65,16 @@ mod tests { #[case::memory_invalid_root_path("memory:///", false)] /// This sets a memory url path to "/foo", which is invalid. #[case::memory_invalid_root_path_foo("memory:///foo", false)] + /// This configures redb in temporary mode. + #[case::redb_valid_temporary("redb://", true)] + /// This configures redb with /, which should fail. + #[case::redb_invalid_root("redb:///", false)] + /// This configures redb with a host, not path, which should fail. + #[case::redb_invalid_host("redb://foo.example", false)] + /// This configures redb with a valid path, which should succeed. + #[case::redb_valid_path(&format!("redb://{}", &TMPDIR_REDB_1.path().join("foo").to_str().unwrap()), true)] + /// This configures redb with a host, and a valid path path, which should fail. + #[case::redb_invalid_host_with_valid_path(&format!("redb://foo.example{}", &TMPDIR_REDB_2.path().join("bar").to_str().unwrap()), false)] /// Correct scheme to connect to a unix socket. #[case::grpc_valid_unix_socket("grpc+unix:///path/to/somewhere", true)] /// Correct scheme for unix socket, but setting a host too, which is invalid. @@ -157,6 +87,16 @@ mod tests { #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)] /// Correct scheme to connect to localhost over http, but with additional path, which is invalid. 
#[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)] + /// A valid example for store composition using anonymous urls + #[cfg_attr( + feature = "xp-composition-url-refs", + case::anonymous_url_composition("cache://?near=memory://&far=memory://", true) + )] + /// Store composition with anonymous urls should fail if the feature is disabled + #[cfg_attr( + not(feature = "xp-composition-url-refs"), + case::anonymous_url_composition("cache://?near=memory://&far=memory://", false) + )] /// A valid example for Bigtable #[cfg_attr( all(feature = "cloud", feature = "integration"), diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index fe935629bfcb..3fd177a34f28 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -1,44 +1,52 @@ use std::collections::HashSet; -use super::{DirectoryPutter, DirectoryService}; +use super::{Directory, DirectoryPutter, DirectoryService}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::proto::{self, get_directory_request::ByWhat}; -use crate::{B3Digest, Error}; +use crate::{B3Digest, DirectoryError, Error}; use async_stream::try_stream; use futures::stream::BoxStream; +use std::sync::Arc; use tokio::spawn; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic::async_trait; -use tonic::Code; -use tonic::{transport::Channel, Status}; -use tracing::{instrument, warn}; +use tonic::{async_trait, Code, Status}; +use tracing::{instrument, warn, Instrument as _}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. #[derive(Clone)] -pub struct GRPCDirectoryService { +pub struct GRPCDirectoryService<T> { + instance_name: String, /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, } -impl GRPCDirectoryService { +impl<T> GRPCDirectoryService<T> { /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient]. /// panics if called outside the context of a tokio runtime. pub fn from_client( - grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + instance_name: String, + grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, ) -> Self { - Self { grpc_client } + Self { + instance_name, + grpc_client, + } } } #[async_trait] -impl DirectoryService for GRPCDirectoryService { - #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] - async fn get( - &self, - digest: &B3Digest, - ) -> Result<Option<crate::proto::Directory>, crate::Error> { +impl<T> DirectoryService for GRPCDirectoryService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ + #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. 
let mut grpc_client = self.grpc_client.clone(); let digest_cpy = digest.clone(); @@ -66,15 +74,10 @@ impl DirectoryService for GRPCDirectoryService { "requested directory with digest {}, but got {}", digest, actual_digest ))) - } else if let Err(e) = directory.validate() { - // Validate the Directory itself is valid. - warn!("directory failed validation: {}", e.to_string()); - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - digest, e, - ))) } else { - Ok(Some(directory)) + Ok(Some(directory.try_into().map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?)) } } Ok(None) => Ok(None), @@ -83,12 +86,12 @@ impl DirectoryService for GRPCDirectoryService { } } - #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] + async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> { let resp = self .grpc_client .clone() - .put(tokio_stream::once(directory)) + .put(tokio_stream::once(proto::Directory::from(directory))) .await; match resp { @@ -103,11 +106,11 @@ impl DirectoryService for GRPCDirectoryService { } } - #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest = root_directory_digest.clone(); @@ -124,19 +127,11 @@ impl DirectoryService for GRPCDirectoryService { // The Directory digests we received so far let mut received_directory_digests: HashSet<B3Digest> = HashSet::new(); // The Directory digests we're still expecting to get sent. - let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]); + let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest.clone()]); loop { match stream.message().await { Ok(Some(directory)) => { - // validate the directory itself. - if let Err(e) = directory.validate() { - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - directory.digest(), - e, - )))?; - } // validate we actually expected that directory, and move it from expected to received. let directory_digest = directory.digest(); let was_expected = expected_directory_digests.remove(&directory_digest); @@ -162,14 +157,28 @@ impl DirectoryService for GRPCDirectoryService { .insert(child_directory_digest); } + let directory = directory.try_into() + .map_err(|e: DirectoryError| Error::StorageError(e.to_string()))?; + yield directory; }, + Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => { + // The root directory of the requested closure was not found, return an + // empty stream + return + } Ok(None) => { - // If we were still expecting something, that's an error. 
- if !expected_directory_digests.is_empty() { + // The stream has ended + let diff_len = expected_directory_digests + // Account for directories which have been referenced more than once, + // but only received once since they were deduplicated + .difference(&received_directory_digests) + .count(); + // If this is not empty, then the closure is incomplete + if diff_len != 0 { Err(crate::Error::StorageError(format!( "still expected {} directories, but got premature end of stream", - expected_directory_digests.len(), + diff_len )))? } else { return @@ -194,14 +203,17 @@ impl DirectoryService for GRPCDirectoryService { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move { - let s = grpc_client - .put(UnboundedReceiverStream::new(rx)) - .await? - .into_inner(); + let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn( + async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); - Ok(s) - }); + Ok(s) + } // instrument the task with the current span, this is not done by default + .in_current_span(), + ); Box::new(GRPCPutter { rq: Some((task, tx)), @@ -209,6 +221,43 @@ impl DirectoryService for GRPCDirectoryService { } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCDirectoryServiceConfig { + url: String, +} + +impl TryFrom<url::Url> for GRPCDirectoryServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // This is normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + Ok(GRPCDirectoryServiceConfig { + url: url.to_string(), + }) + } +} + +#[async_trait] +impl ServiceBuilder for GRPCDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let client = proto::directory_service_client::DirectoryServiceClient::new( + crate::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCDirectoryService::from_client( + instance_name.to_string(), + client, + ))) + } +} + /// Allows uploading multiple Directory messages in the same gRPC stream. pub struct GRPCPutter { /// Data about the current request - a handle to the task, and the tx part @@ -225,11 +274,11 @@ pub struct GRPCPutter { #[async_trait] impl DirectoryPutter for GRPCPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> { match self.rq { // If we're not already closed, send the directory to directory_sender. Some((_, ref directory_sender)) => { - if directory_sender.send(directory).is_err() { + if directory_sender.send(directory.into()).is_err() { // If the channel has been prematurely closed, invoke close (so we can peek at the error code) // That error code is much more helpful, because it // contains the error message from the server. 
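With `GRPCDirectoryServiceConfig` registered under the `grpc` tag (see `register_directory_services` further below), a composition config can mix gRPC-backed and in-memory directory services. A hypothetical wiring, assuming `serde_json` and `tokio` are available and using a placeholder endpoint URL that is never actually connected to:

```rust
use std::sync::Arc;

use tvix_castore::composition::{with_registry, Composition, REG};
use tvix_castore::directoryservice::DirectoryService;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let configs_json = serde_json::json!({
        "local":  { "type": "memory" },
        "remote": { "type": "grpc", "url": "grpc+http://[::1]:8000" },
        "root":   { "type": "cache", "near": "local", "far": "remote" }
    });

    // Deserialize the map of configs using the default registry, then hand it to a Composition.
    let configs = with_registry(&REG, || serde_json::from_value(configs_json))?;
    let mut composition = Composition::new(&REG);
    composition.extend_with_configs::<dyn DirectoryService>(configs);

    // Only what is reachable from the entrypoint gets instantiated; building "local"
    // leaves the "grpc" and "cache" entries as plain configs.
    let _local: Arc<dyn DirectoryService> = composition.build("local").await?;
    Ok(())
}
```

Building `"root"` instead would also resolve `near`/`far` and construct the gRPC client from the configured URL.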
@@ -333,7 +382,7 @@ mod tests { .await .expect("must succeed"), ); - GRPCDirectoryService::from_client(client) + GRPCDirectoryService::from_client("test-instance".into(), client) }; assert!(grpc_client diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index 3b2795c3968c..a43d7b8d8d31 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -1,4 +1,4 @@ -use crate::{proto, B3Digest, Error}; +use crate::{B3Digest, Error}; use futures::stream::BoxStream; use std::collections::HashMap; use std::sync::Arc; @@ -7,17 +7,20 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryPutter, DirectoryService, SimplePutter}; +use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter}; +use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::proto; #[derive(Clone, Default)] pub struct MemoryDirectoryService { + instance_name: String, db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>, } #[async_trait] impl DirectoryService for MemoryDirectoryService { - #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.read().await; match db.get(digest) { @@ -36,48 +39,33 @@ impl DirectoryService for MemoryDirectoryService { ))); } - // Validate the Directory itself is valid. - if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } - - Ok(Some(directory.clone())) + Ok(Some(directory.clone().try_into().map_err(|e| { + crate::Error::StorageError(format!("corrupted directory: {}", e)) + })?)) } } } - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))] + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); - // validate the directory itself. 
- if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } - // store it let mut db = self.db.write().await; - db.insert(digest.clone(), directory); + db.insert(digest.clone(), directory.into()); Ok(digest) } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, @@ -85,3 +73,33 @@ impl DirectoryService for MemoryDirectoryService { Box::new(SimplePutter::new(self.clone())) } } + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct MemoryDirectoryServiceConfig {} + +impl TryFrom<url::Url> for MemoryDirectoryServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // memory doesn't support host or path in the URL. + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string()).into()); + } + Ok(MemoryDirectoryServiceConfig {}) + } +} + +#[async_trait] +impl ServiceBuilder for MemoryDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + Ok(Arc::new(MemoryDirectoryService { + instance_name: instance_name.to_string(), + db: Default::default(), + })) + } +} diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index eff4a685fa6d..b3cb0f4fd67b 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -1,7 +1,9 @@ -use crate::{proto, B3Digest, Error}; +use crate::composition::{Registry, ServiceBuilder}; +use crate::{B3Digest, Directory, Error}; + +use auto_impl::auto_impl; use futures::stream::BoxStream; use tonic::async_trait; - mod combinators; mod directory_graph; mod from_addr; @@ -9,22 +11,22 @@ mod grpc; mod memory; mod object_store; mod order_validator; +mod redb; mod simple_putter; -mod sled; #[cfg(test)] pub mod tests; mod traverse; mod utils; -pub use self::combinators::Cache; -pub use self::directory_graph::DirectoryGraph; +pub use self::combinators::{Cache, CacheConfig}; +pub use self::directory_graph::{DirectoryGraph, ValidatedDirectoryGraph}; pub use self::from_addr::from_addr; -pub use self::grpc::GRPCDirectoryService; -pub use self::memory::MemoryDirectoryService; -pub use self::object_store::ObjectStoreDirectoryService; +pub use self::grpc::{GRPCDirectoryService, GRPCDirectoryServiceConfig}; +pub use self::memory::{MemoryDirectoryService, MemoryDirectoryServiceConfig}; +pub use self::object_store::{ObjectStoreDirectoryService, ObjectStoreDirectoryServiceConfig}; pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; +pub use self::redb::{RedbDirectoryService, RedbDirectoryServiceConfig}; pub use self::simple_putter::SimplePutter; -pub use self::sled::SledDirectoryService; pub use 
self::traverse::descend_to; pub use self::utils::traverse_directory; @@ -32,12 +34,13 @@ pub use self::utils::traverse_directory; mod bigtable; #[cfg(feature = "cloud")] -pub use self::bigtable::BigtableDirectoryService; +pub use self::bigtable::{BigtableDirectoryService, BigtableParameters}; /// The base trait all Directory services need to implement. -/// This is a simple get and put of [crate::proto::Directory], returning their +/// This is a simple get and put of [Directory], returning their /// digest. #[async_trait] +#[auto_impl(&, &mut, Arc, Box)] pub trait DirectoryService: Send + Sync { /// Looks up a single Directory message by its digest. /// The returned Directory message *must* be valid. @@ -47,14 +50,14 @@ pub trait DirectoryService: Send + Sync { /// Directory digests that are at the "root", aka the last element that's /// sent to a DirectoryPutter. This makes sense for implementations bundling /// closures of directories together in batches. - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error>; /// Uploads a single Directory message, and returns the calculated /// digest, or an error. An error *must* also be returned if the message is /// not valid. - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; + async fn put(&self, directory: Directory) -> Result<B3Digest, Error>; - /// Looks up a closure of [proto::Directory]. - /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`, + /// Looks up a closure of [Directory]. + /// Ideally this would be a `impl Stream<Item = Result<Directory, Error>>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. /// @@ -67,42 +70,19 @@ pub trait DirectoryService: Send + Sync { /// Directories are sent in an order from the root to the leaves, so that /// the receiving side can validate each message to be a connected to the root /// that has initially been requested. + /// + /// In case the directory can not be found, this should return an empty stream. fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>>; + ) -> BoxStream<'static, Result<Directory, Error>>; - /// Allows persisting a closure of [proto::Directory], which is a graph of + /// Allows persisting a closure of [Directory], which is a graph of /// connected Directory messages. fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>; } -#[async_trait] -impl<A> DirectoryService for A -where - A: AsRef<dyn DirectoryService> + Send + Sync, -{ - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { - self.as_ref().get(digest).await - } - - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - self.as_ref().put(directory).await - } - - fn get_recursive( - &self, - root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { - self.as_ref().get_recursive(root_directory_digest) - } - - fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> { - self.as_ref().put_multiple_start() - } -} - -/// Provides a handle to put a closure of connected [proto::Directory] elements. +/// Provides a handle to put a closure of connected [Directory] elements. /// /// The consumer can periodically call [DirectoryPutter::put], starting from the /// leaves. 
Once the root is reached, [DirectoryPutter::close] can be called to @@ -114,15 +94,28 @@ where /// but a single file or symlink. #[async_trait] pub trait DirectoryPutter: Send { - /// Put a individual [proto::Directory] into the store. + /// Put a individual [Directory] into the store. /// Error semantics and behaviour is up to the specific implementation of /// this trait. /// Due to bursting, the returned error might refer to an object previously /// sent via `put`. - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; + async fn put(&mut self, directory: Directory) -> Result<(), Error>; /// Close the stream, and wait for any errors. /// If there's been any invalid Directory message uploaded, and error *must* /// be returned. async fn close(&mut self) -> Result<B3Digest, Error>; } + +/// Registers the builtin DirectoryService implementations with the registry +pub(crate) fn register_directory_services(reg: &mut Registry) { + reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::ObjectStoreDirectoryServiceConfig>("objectstore"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::MemoryDirectoryServiceConfig>("memory"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::CacheConfig>("cache"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::GRPCDirectoryServiceConfig>("grpc"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::RedbDirectoryServiceConfig>("redb"); + #[cfg(feature = "cloud")] + { + reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable"); + } +} diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index feaaaa39cd50..1916e59eacaa 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use data_encoding::HEXLOWER; @@ -16,9 +17,11 @@ use tracing::{instrument, trace, warn, Level}; use url::Url; use super::{ - DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator, + Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, + RootToLeavesValidator, }; -use crate::{proto, B3Digest, Error}; +use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::{proto, B3Digest, Error, Node}; /// Stores directory closures in an object store. /// Notably, this makes use of the option to disallow accessing child directories except when @@ -29,6 +32,7 @@ use crate::{proto, B3Digest, Error}; /// be returned to the client in get_recursive. #[derive(Clone)] pub struct ObjectStoreDirectoryService { + instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path, } @@ -46,7 +50,7 @@ fn derive_dirs_path(base_path: &Path, digest: &B3Digest) -> Path { const MAX_FRAME_LENGTH: usize = 1 * 1024 * 1024 * 1000; // 1 MiB // impl ObjectStoreDirectoryService { - /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by + /// Constructs a new [ObjectStoreDirectoryService] from a [Url] supported by /// [object_store]. /// Any path suffix becomes the base path of the object store. 
/// additional options, the same as in [object_store::parse_url_opts] can @@ -60,6 +64,7 @@ impl ObjectStoreDirectoryService { let (object_store, path) = object_store::parse_url_opts(url, options)?; Ok(Self { + instance_name: "root".into(), object_store: Arc::new(object_store), base_path: path, }) @@ -69,20 +74,32 @@ impl ObjectStoreDirectoryService { pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> { Self::parse_url_opts(url, Vec::<(String, String)>::new()) } + + pub fn new(instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path) -> Self { + Self { + instance_name, + object_store, + base_path, + } + } } #[async_trait] impl DirectoryService for ObjectStoreDirectoryService { /// This is the same steps as for get_recursive anyways, so we just call get_recursive and /// return the first element of the stream and drop the request. - #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.get_recursive(digest).take(1).next().await.transpose() } - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - if !directory.directories.is_empty() { + #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { + // Ensure the directory doesn't contain other directory children + if directory + .nodes() + .any(|(_, e)| matches!(e, Node::Directory { .. 
})) + { return Err(Error::InvalidRequest( "only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(), )); @@ -93,11 +110,11 @@ impl DirectoryService for ObjectStoreDirectoryService { handle.close().await } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { // Check that we are not passing on bogus from the object store to the client, and that the // trust chain from the root digest to the leaves is intact let mut order_validator = @@ -143,6 +160,10 @@ impl DirectoryService for ObjectStoreDirectoryService { warn!("unable to parse directory {}: {}", digest, e); Error::StorageError(e.to_string()) })?; + let directory = Directory::try_from(directory).map_err(|e| { + warn!("unable to convert directory {}: {}", digest, e); + Error::StorageError(e.to_string()) + })?; // Allow the children to appear next order_validator.add_directory_unchecked(&directory); @@ -169,6 +190,60 @@ impl DirectoryService for ObjectStoreDirectoryService { } } +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ObjectStoreDirectoryServiceConfig { + object_store_url: String, + #[serde(default)] + object_store_options: HashMap<String, String>, +} + +impl TryFrom<url::Url> for ObjectStoreDirectoryServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // We need to convert the URL to string, strip the prefix there, and then + // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. + let trimmed_url = { + let s = url.to_string(); + let mut url = Url::parse( + s.strip_prefix("objectstore+") + .ok_or(Error::StorageError("Missing objectstore uri".into()))?, + )?; + // trim the query pairs, they might contain credentials or local settings we don't want to send as-is. 
+ url.set_query(None); + url + }; + Ok(ObjectStoreDirectoryServiceConfig { + object_store_url: trimmed_url.into(), + object_store_options: url + .query_pairs() + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + }) + } +} + +#[async_trait] +impl ServiceBuilder for ObjectStoreDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (object_store, path) = object_store::parse_url_opts( + &self.object_store_url.parse()?, + &self.object_store_options, + )?; + Ok(Arc::new(ObjectStoreDirectoryService::new( + instance_name.to_string(), + Arc::new(object_store), + path, + ))) + } +} + struct ObjectStoreDirectoryPutter { object_store: Arc<dyn ObjectStore>, base_path: Path, @@ -189,7 +264,7 @@ impl ObjectStoreDirectoryPutter { #[async_trait] impl DirectoryPutter for ObjectStoreDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -247,7 +322,7 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter { for directory in directories { directories_sink - .send(directory.encode_to_vec().into()) + .send(proto::Directory::from(directory).encode_to_vec().into()) .await?; } diff --git a/tvix/castore/src/directoryservice/order_validator.rs b/tvix/castore/src/directoryservice/order_validator.rs index 6045f5d24198..973af92e1294 100644 --- a/tvix/castore/src/directoryservice/order_validator.rs +++ b/tvix/castore/src/directoryservice/order_validator.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use tracing::warn; -use crate::{proto::Directory, B3Digest}; +use super::Directory; +use crate::{B3Digest, Node}; pub trait OrderValidator { /// Update the order validator's state with the directory @@ -47,10 +48,11 @@ impl RootToLeavesValidator { self.expected_digests.insert(directory.digest()); } - for subdir in &directory.directories { - // Allow the children to appear next - let subdir_digest = subdir.digest.clone().try_into().unwrap(); - self.expected_digests.insert(subdir_digest); + // Allow the children to appear next + for (_, node) in directory.nodes() { + if let Node::Directory { digest, .. } = node { + self.expected_digests.insert(digest.clone()); + } } } } @@ -79,15 +81,20 @@ impl OrderValidator for LeavesToRootValidator { fn add_directory(&mut self, directory: &Directory) -> bool { let digest = directory.digest(); - for subdir in &directory.directories { - let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory() - if !self.allowed_references.contains(&subdir_digest) { - warn!( - directory.digest = %digest, - subdirectory.digest = %subdir_digest, - "unexpected directory reference" - ); - return false; + for (_, node) in directory.nodes() { + if let Node::Directory { + digest: subdir_node_digest, + .. 
+ } = node + { + if !self.allowed_references.contains(subdir_node_digest) { + warn!( + directory.digest = %digest, + subdirectory.digest = %subdir_node_digest, + "unexpected directory reference" + ); + return false; + } } } @@ -101,8 +108,8 @@ impl OrderValidator for LeavesToRootValidator { mod tests { use super::{LeavesToRootValidator, RootToLeavesValidator}; use crate::directoryservice::order_validator::OrderValidator; + use crate::directoryservice::Directory; use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; - use crate::proto::Directory; use rstest::rstest; #[rstest] diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs new file mode 100644 index 000000000000..246d64173f10 --- /dev/null +++ b/tvix/castore/src/directoryservice/redb.rs @@ -0,0 +1,312 @@ +use futures::stream::BoxStream; +use prost::Message; +use redb::{Database, TableDefinition}; +use std::{path::PathBuf, sync::Arc}; +use tonic::async_trait; +use tracing::{instrument, warn}; + +use super::{ + traverse_directory, Directory, DirectoryGraph, DirectoryPutter, DirectoryService, + LeavesToRootValidator, +}; +use crate::{ + composition::{CompositionContext, ServiceBuilder}, + digests, proto, B3Digest, Error, +}; + +const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> = + TableDefinition::new("directory"); + +#[derive(Clone)] +pub struct RedbDirectoryService { + instance_name: String, + // We wrap the db in an Arc to be able to move it into spawn_blocking, + // as discussed in https://github.com/cberner/redb/issues/789 + db: Arc<Database>, +} + +impl RedbDirectoryService { + /// Constructs a new instance using the specified filesystem path for + /// storage. + pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> { + if path == PathBuf::from("/") { + return Err(Error::StorageError( + "cowardly refusing to open / with redb".to_string(), + )); + } + + let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> { + let db = redb::Database::create(path)?; + create_schema(&db)?; + Ok(db) + }) + .await??; + + Ok(Self { + instance_name, + db: Arc::new(db), + }) + } + + /// Constructs a new instance using the in-memory backend. + pub fn new_temporary() -> Result<Self, Error> { + let db = + redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?; + + create_schema(&db)?; + + Ok(Self { + instance_name: "root".into(), + db: Arc::new(db), + }) + } +} + +/// Ensures all tables are present. +/// Opens a write transaction and calls open_table on DIRECTORY_TABLE, which will +/// create it if not present. +fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { + let txn = db.begin_write()?; + txn.open_table(DIRECTORY_TABLE)?; + txn.commit()?; + + Ok(()) +} + +#[async_trait] +impl DirectoryService for RedbDirectoryService { + #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { + let db = self.db.clone(); + + // Retrieves the protobuf-encoded Directory for the corresponding digest. + let db_get_resp = tokio::task::spawn_blocking({ + let digest_as_array: [u8; digests::B3_LEN] = digest.to_owned().into(); + move || -> Result<_, redb::Error> { + let txn = db.begin_read()?; + let table = txn.open_table(DIRECTORY_TABLE)?; + Ok(table.get(digest_as_array)?) + } + }) + .await? 
+ .map_err(|e| { + warn!(err=%e, "failed to retrieve Directory"); + Error::StorageError("failed to retrieve Directory".to_string()) + })?; + + // The Directory was not found, return None. + let directory_data = match db_get_resp { + None => return Ok(None), + Some(d) => d, + }; + + // We check that the digest of the retrieved Directory matches the expected digest. + let actual_digest = blake3::hash(directory_data.value().as_slice()); + if actual_digest.as_bytes() != digest.as_slice() { + warn!(directory.actual_digest=%actual_digest, "requested Directory got the wrong digest"); + return Err(Error::StorageError( + "requested Directory got the wrong digest".to_string(), + )); + } + + // Attempt to decode the retrieved protobuf-encoded Directory, returning a parsing error if + // the decoding failed. + let directory = match proto::Directory::decode(&*directory_data.value()) { + Ok(dir) => { + // The returned Directory must be valid. + dir.try_into().map_err(|e| { + warn!(err=%e, "Directory failed validation"); + Error::StorageError("Directory failed validation".to_string()) + })? + } + Err(e) => { + warn!(err=%e, "failed to parse Directory"); + return Err(Error::StorageError("failed to parse Directory".to_string())); + } + }; + + Ok(Some(directory)) + } + + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { + tokio::task::spawn_blocking({ + let db = self.db.clone(); + move || { + let digest = directory.digest(); + + // Store the directory in the table. + let txn = db.begin_write()?; + { + let mut table = txn.open_table(DIRECTORY_TABLE)?; + let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into(); + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; + } + txn.commit()?; + + Ok(digest) + } + }) + .await? + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> BoxStream<'static, Result<Directory, Error>> { + // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single + // redb transaction to avoid constantly closing and opening new transactions for the + // database. + traverse_directory(self.clone(), root_directory_digest) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> { + Box::new(RedbDirectoryPutter { + db: self.db.clone(), + directory_validator: Some(Default::default()), + }) + } +} + +pub struct RedbDirectoryPutter { + db: Arc<Database>, + + /// The directories (inside the directory validator) that we insert later, + /// or None, if they were already inserted. 
+ directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>, +} + +#[async_trait] +impl DirectoryPutter for RedbDirectoryPutter { + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] + async fn put(&mut self, directory: Directory) -> Result<(), Error> { + match self.directory_validator { + None => return Err(Error::StorageError("already closed".to_string())), + Some(ref mut validator) => { + validator + .add(directory) + .map_err(|e| Error::StorageError(e.to_string()))?; + } + } + + Ok(()) + } + + #[instrument(level = "trace", skip_all, ret, err)] + async fn close(&mut self) -> Result<B3Digest, Error> { + match self.directory_validator.take() { + None => Err(Error::StorageError("already closed".to_string())), + Some(validator) => { + // Insert all directories as a batch. + tokio::task::spawn_blocking({ + let txn = self.db.begin_write()?; + move || { + // Retrieve the validated directories. + let directories = validator + .validate() + .map_err(|e| Error::StorageError(e.to_string()))? + .drain_leaves_to_root() + .collect::<Vec<_>>(); + + // Get the root digest, which is at the end (cf. insertion order) + let root_digest = directories + .last() + .ok_or_else(|| Error::StorageError("got no directories".to_string()))? + .digest(); + + { + let mut table = txn.open_table(DIRECTORY_TABLE)?; + + // Looping over all the verified directories, queuing them up for a + // batch insertion. + for directory in directories { + let digest_as_array: [u8; digests::B3_LEN] = + directory.digest().into(); + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; + } + } + + txn.commit()?; + + Ok(root_digest) + } + }) + .await? + } + } + } +} + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct RedbDirectoryServiceConfig { + is_temporary: bool, + #[serde(default)] + /// required when is_temporary = false + path: Option<PathBuf>, +} + +impl TryFrom<url::Url> for RedbDirectoryServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // redb doesn't support host, and a path can be provided (otherwise + // it'll live in memory only). 
+ if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string()).into()); + } + + Ok(if url.path().is_empty() { + RedbDirectoryServiceConfig { + is_temporary: true, + path: None, + } + } else { + RedbDirectoryServiceConfig { + is_temporary: false, + path: Some(url.path().into()), + } + }) + } +} + +#[async_trait] +impl ServiceBuilder for RedbDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + match self { + RedbDirectoryServiceConfig { + is_temporary: true, + path: None, + } => Ok(Arc::new(RedbDirectoryService::new_temporary()?)), + RedbDirectoryServiceConfig { + is_temporary: true, + path: Some(_), + } => Err(Error::StorageError( + "Temporary RedbDirectoryService can not have path".into(), + ) + .into()), + RedbDirectoryServiceConfig { + is_temporary: false, + path: None, + } => Err(Error::StorageError("RedbDirectoryService is missing path".into()).into()), + RedbDirectoryServiceConfig { + is_temporary: false, + path: Some(path), + } => Ok(Arc::new( + RedbDirectoryService::new(instance_name.to_string(), path.into()).await?, + )), + } + } +} diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs index dc54e3d11d18..b4daaee61b22 100644 --- a/tvix/castore/src/directoryservice/simple_putter.rs +++ b/tvix/castore/src/directoryservice/simple_putter.rs @@ -1,7 +1,6 @@ use super::DirectoryPutter; use super::DirectoryService; -use super::{DirectoryGraph, LeavesToRootValidator}; -use crate::proto; +use super::{Directory, DirectoryGraph, LeavesToRootValidator}; use crate::B3Digest; use crate::Error; use tonic::async_trait; @@ -29,7 +28,7 @@ impl<DS: DirectoryService> SimplePutter<DS> { #[async_trait] impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs deleted file mode 100644 index bd98ed6b1e01..000000000000 --- a/tvix/castore/src/directoryservice/sled.rs +++ /dev/null @@ -1,195 +0,0 @@ -use crate::proto::Directory; -use crate::{proto, B3Digest, Error}; -use futures::stream::BoxStream; -use prost::Message; -use std::ops::Deref; -use std::path::Path; -use tonic::async_trait; -use tracing::{instrument, warn}; - -use super::utils::traverse_directory; -use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; - -#[derive(Clone)] -pub struct SledDirectoryService { - db: sled::Db, -} - -impl SledDirectoryService { - pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> { - let config = sled::Config::default() - .use_compression(false) // is a required parameter - .path(p); - let db = config.open()?; - - Ok(Self { db }) - } - - pub fn new_temporary() -> Result<Self, sled::Error> { - let config = sled::Config::default().temporary(true); - let db = config.open()?; - - Ok(Self { db }) - } -} - -#[async_trait] -impl DirectoryService for SledDirectoryService { - #[instrument(skip(self, digest), 
fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { - let resp = tokio::task::spawn_blocking({ - let db = self.db.clone(); - let digest = digest.clone(); - move || db.get(digest.as_slice()) - }) - .await? - .map_err(|e| { - warn!("failed to retrieve directory: {}", e); - Error::StorageError(format!("failed to retrieve directory: {}", e)) - })?; - - match resp { - // The directory was not found, return - None => Ok(None), - - // The directory was found, try to parse the data as Directory message - Some(data) => match Directory::decode(&*data) { - Ok(directory) => { - // Validate the retrieved Directory indeed has the - // digest we expect it to have, to detect corruptions. - let actual_digest = directory.digest(); - if actual_digest != *digest { - return Err(Error::StorageError(format!( - "requested directory with digest {}, but got {}", - digest, actual_digest - ))); - } - - // Validate the Directory itself is valid. - if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } - - Ok(Some(directory)) - } - Err(e) => { - warn!("unable to parse directory {}: {}", digest, e); - Err(Error::StorageError(e.to_string())) - } - }, - } - } - - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - tokio::task::spawn_blocking({ - let db = self.db.clone(); - move || { - let digest = directory.digest(); - - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } - // store it - db.insert(digest.as_slice(), directory.encode_to_vec()) - .map_err(|e| Error::StorageError(e.to_string()))?; - - Ok(digest) - } - }) - .await? - } - - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive( - &self, - root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { - traverse_directory(self.clone(), root_directory_digest) - } - - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> - where - Self: Clone, - { - Box::new(SledDirectoryPutter { - tree: self.db.deref().clone(), - directory_validator: Some(Default::default()), - }) - } -} - -/// Buffers Directory messages to be uploaded and inserts them in a batch -/// transaction on close. -pub struct SledDirectoryPutter { - tree: sled::Tree, - - /// The directories (inside the directory validator) that we insert later, - /// or None, if they were already inserted. 
- directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>, -} - -#[async_trait] -impl DirectoryPutter for SledDirectoryPutter { - #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { - match self.directory_validator { - None => return Err(Error::StorageError("already closed".to_string())), - Some(ref mut validator) => { - validator - .add(directory) - .map_err(|e| Error::StorageError(e.to_string()))?; - } - } - - Ok(()) - } - - #[instrument(level = "trace", skip_all, ret, err)] - async fn close(&mut self) -> Result<B3Digest, Error> { - match self.directory_validator.take() { - None => Err(Error::InvalidRequest("already closed".to_string())), - Some(validator) => { - // Insert all directories as a batch. - tokio::task::spawn_blocking({ - let tree = self.tree.clone(); - move || { - // retrieve the validated directories. - let directories = validator - .validate() - .map_err(|e| Error::StorageError(e.to_string()))? - .drain_leaves_to_root() - .collect::<Vec<_>>(); - - // Get the root digest, which is at the end (cf. insertion order) - let root_digest = directories - .last() - .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))? - .digest(); - - let mut batch = sled::Batch::default(); - for directory in directories { - batch.insert(directory.digest().as_slice(), directory.encode_to_vec()); - } - - tree.apply_batch(batch).map_err(|e| { - Error::StorageError(format!("unable to apply batch: {}", e)) - })?; - - Ok(root_digest) - } - }) - .await? - } - } - } -} diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index cc3c5b788a2c..d394a5679c32 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -8,10 +8,8 @@ use rstest_reuse::{self, *}; use super::DirectoryService; use crate::directoryservice; -use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, - proto::{self, Directory}, -}; +use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D}; +use crate::{Directory, Node}; mod utils; use self::utils::make_grpc_directory_service_client; @@ -25,17 +23,27 @@ use self::utils::make_grpc_directory_service_client; #[rstest] #[case::grpc(make_grpc_directory_service_client().await)] #[case::memory(directoryservice::from_addr("memory://").await.unwrap())] -#[case::sled(directoryservice::from_addr("sled://").await.unwrap())] +#[case::redb(directoryservice::from_addr("redb://").await.unwrap())] #[case::objectstore(directoryservice::from_addr("objectstore+memory://").await.unwrap())] #[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))] pub fn directory_services(#[case] directory_service: impl DirectoryService) {} -/// Ensures asking for a directory that doesn't exist returns a Ok(None). +/// Ensures asking for a directory that doesn't exist returns a Ok(None), and a get_recursive +/// returns an empty stream. 
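For illustration, a minimal caller-side sketch of this contract, written as if inside the crate (the helper name is hypothetical and not part of the API):

    use futures::StreamExt;

    use crate::directoryservice::DirectoryService;
    use crate::{B3Digest, Directory, Error};

    /// Hypothetical helper showing both lookups against a possibly-absent digest.
    async fn probe(svc: &dyn DirectoryService, digest: &B3Digest) -> Result<(), Error> {
        // Single lookup: a missing Directory is Ok(None), not an error.
        let root: Option<Directory> = svc.get(digest).await?;

        // Closure lookup: a missing root yields an empty stream, also without an error.
        let closure: Vec<Result<Directory, Error>> = svc.get_recursive(digest).collect().await;

        debug_assert_eq!(root.is_none(), closure.is_empty());
        Ok(())
    }
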
#[apply(directory_services)] #[tokio::test] async fn test_non_exist(directory_service: impl DirectoryService) { - let resp = directory_service.get(&DIRECTORY_A.digest()).await; - assert!(resp.unwrap().is_none()) + // single get + assert_eq!(Ok(None), directory_service.get(&DIRECTORY_A.digest()).await); + + // recursive get + assert_eq!( + Vec::<Result<Directory, crate::Error>>::new(), + directory_service + .get_recursive(&DIRECTORY_A.digest()) + .collect::<Vec<Result<Directory, crate::Error>>>() + .await + ); } /// Putting a single directory into the store, and then getting it out both via @@ -123,6 +131,46 @@ async fn put_get_multiple_dedup(directory_service: impl DirectoryService) { ) } +/// This tests the insertion and retrieval of a closure which contains a duplicated directory +/// (DIRECTORY_A, which is an empty directory), once in the root, and once in a subdir. +#[apply(directory_services)] +#[tokio::test] +async fn put_get_foo(directory_service: impl DirectoryService) { + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_A.clone()).await.unwrap(); + handle.put(DIRECTORY_B.clone()).await.unwrap(); + handle.put(DIRECTORY_D.clone()).await.unwrap(); + let root_digest = handle.close().await.unwrap(); + assert_eq!( + DIRECTORY_D.digest(), + root_digest, + "root digest should match" + ); + + // Ensure we can get the closure back out of the service, and it is returned in a valid order + // (there are multiple valid possibilities) + let retrieved_closure = directory_service + .get_recursive(&DIRECTORY_D.digest()) + .collect::<Vec<_>>() + .await; + + let valid_closures = [ + vec![ + Ok(DIRECTORY_D.clone()), + Ok(DIRECTORY_B.clone()), + Ok(DIRECTORY_A.clone()), + ], + vec![ + Ok(DIRECTORY_D.clone()), + Ok(DIRECTORY_A.clone()), + Ok(DIRECTORY_B.clone()), + ], + ]; + if !valid_closures.contains(&retrieved_closure) { + panic!("invalid closure returned: {:?}", retrieved_closure); + } +} + /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close, /// as B itself would be left unconnected. #[apply(directory_services)] @@ -161,58 +209,20 @@ async fn upload_reject_dangling_pointer(directory_service: impl DirectoryService } } -/// Try uploading a Directory failing its internal validation, ensure it gets -/// rejected. -#[apply(directory_services)] -#[tokio::test] -async fn upload_reject_failing_validation(directory_service: impl DirectoryService) { - let broken_directory = Directory { - symlinks: vec![proto::SymlinkNode { - name: "".into(), // wrong! - target: "doesntmatter".into(), - }], - ..Default::default() - }; - assert!(broken_directory.validate().is_err()); - - // Try to upload via single upload. - assert!( - directory_service - .put(broken_directory.clone()) - .await - .is_err(), - "single upload must fail" - ); - - // Try to upload via put_multiple. We're a bit more permissive here, the - // intermediate .put() might succeed, due to client-side bursting (in the - // case of gRPC), but then the close MUST fail. - let mut handle = directory_service.put_multiple_start(); - if handle.put(broken_directory).await.is_ok() { - assert!( - handle.close().await.is_err(), - "when succeeding put, close must fail" - ) - } -} - /// Try uploading a Directory that refers to a previously-uploaded directory. /// Both pass their isolated validation, but the size field in the parent is wrong. /// This should be rejected. 
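For contrast, a sketch of the correctly-sized counterpart, built with the fixtures and Node API shown above; a parent whose size field matches what DIRECTORY_A reports is expected to be accepted:

    use crate::fixtures::DIRECTORY_A;
    use crate::{Directory, Node};

    /// Counterpart to the wrong-size parent constructed in the test that follows:
    /// here the size field matches what DIRECTORY_A itself reports.
    fn correctly_sized_parent() -> Directory {
        Directory::try_from_iter([(
            "foo".try_into().unwrap(),
            Node::Directory {
                digest: DIRECTORY_A.digest(),
                size: DIRECTORY_A.size(),
            },
        )])
        .unwrap()
    }
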
#[apply(directory_services)] #[tokio::test] async fn upload_reject_wrong_size(directory_service: impl DirectoryService) { - let wrong_parent_directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), + let wrong_parent_directory = Directory::try_from_iter([( + "foo".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_A.digest(), size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() - }; - - // Make sure isolated validation itself is ok - assert!(wrong_parent_directory.validate().is_ok()); + }, + )]) + .unwrap(); // Now upload both. Ensure it either fails during the second put, or during // the close. diff --git a/tvix/castore/src/directoryservice/tests/utils.rs b/tvix/castore/src/directoryservice/tests/utils.rs index 0f706695eec8..f2aefa32b3cc 100644 --- a/tvix/castore/src/directoryservice/tests/utils.rs +++ b/tvix/castore/src/directoryservice/tests/utils.rs @@ -6,6 +6,7 @@ use crate::{ proto::directory_service_server::DirectoryServiceServer, }; +use hyper_util::rt::TokioIo; use tonic::transport::{Endpoint, Server, Uri}; /// Constructs and returns a gRPC DirectoryService. @@ -32,12 +33,13 @@ pub async fn make_grpc_directory_service_client() -> Box<dyn DirectoryService> { // Create a client, connecting to the right side. The URI is unused. let mut maybe_right = Some(right); Box::new(GRPCDirectoryService::from_client( + "root".into(), DirectoryServiceClient::new( Endpoint::try_from("http://[::]:50051") .unwrap() .connect_with_connector(tower::service_fn(move |_: Uri| { let right = maybe_right.take().unwrap(); - async move { Ok::<_, std::io::Error>(right) } + async move { Ok::<_, std::io::Error>(TokioIo::new(right)) } })) .await .unwrap(), diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs index 17a51ae2bbff..0bd67e9bcf1f 100644 --- a/tvix/castore/src/directoryservice/traverse.rs +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -1,8 +1,4 @@ -use super::DirectoryService; -use crate::{ - proto::{node::Node, NamedNode}, - B3Digest, Error, Path, -}; +use crate::{directoryservice::DirectoryService, Error, Node, Path}; use tracing::{instrument, warn}; /// This descends from a (root) node to the given (sub)path, returning the Node @@ -17,19 +13,14 @@ where DS: AsRef<dyn DirectoryService>, { let mut parent_node = root_node; - for component in path.as_ref().components() { + for component in path.as_ref().components_bytes() { match parent_node { - Node::File(_) | Node::Symlink(_) => { + Node::File { .. } | Node::Symlink { .. } => { // There's still some path left, but the parent node is no directory. // This means the path doesn't exist, as we can't reach it. return Ok(None); } - Node::Directory(directory_node) => { - let digest: B3Digest = directory_node - .digest - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; - + Node::Directory { digest, .. } => { // fetch the linked node from the directory_service. let directory = directory_service @@ -44,15 +35,16 @@ where })?; // look for the component in the [Directory]. - // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we - // could stop as soon as e.name is larger than the search string. 
- if let Some(child_node) = directory.nodes().find(|n| n.get_name() == component) { + if let Some((_child_name, child_node)) = directory + .into_nodes() + .find(|(name, _node)| name.as_ref() == component) + { // child node found, update prev_node to that and continue. - parent_node = child_node; + parent_node = child_node.clone(); } else { // child node not found means there's no such element inside the directory. return Ok(None); - } + }; } } } @@ -65,8 +57,8 @@ where mod tests { use crate::{ directoryservice, - fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, - PathBuf, + fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST}, + Node, PathBuf, }; use super::descend_to; @@ -88,21 +80,23 @@ mod tests { handle.close().await.expect("must upload"); // construct the node for DIRECTORY_COMPLICATED - let node_directory_complicated = - crate::proto::node::Node::Directory(crate::proto::DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }); + let node_directory_complicated = Node::Directory { + digest: DIRECTORY_COMPLICATED.digest(), + size: DIRECTORY_COMPLICATED.size(), + }; // construct the node for DIRECTORY_COMPLICATED - let node_directory_with_keep = crate::proto::node::Node::Directory( - DIRECTORY_COMPLICATED.directories.first().unwrap().clone(), - ); + let node_directory_with_keep = Node::Directory { + digest: DIRECTORY_WITH_KEEP.digest(), + size: DIRECTORY_WITH_KEEP.size(), + }; // construct the node for the .keep file - let node_file_keep = - crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone()); + let node_file_keep = Node::File { + digest: EMPTY_BLOB_DIGEST.clone(), + size: 0, + executable: false, + }; // traversal to an empty subpath should return the root node. { diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index a0ba395ecda8..d073c2c3c8ec 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -1,21 +1,22 @@ +use super::Directory; use super::DirectoryService; -use crate::proto; use crate::B3Digest; use crate::Error; +use crate::Node; use async_stream::try_stream; use futures::stream::BoxStream; use std::collections::{HashSet, VecDeque}; use tracing::instrument; use tracing::warn; -/// Traverses a [proto::Directory] from the root to the children. +/// Traverses a [Directory] from the root to the children. /// /// This is mostly BFS, but directories are only returned once. #[instrument(skip(directory_service))] pub fn traverse_directory<'a, DS: DirectoryService + 'static>( directory_service: DS, root_directory_digest: &B3Digest, -) -> BoxStream<'a, Result<proto::Directory, Error>> { +) -> BoxStream<'a, Result<Directory, Error>> { // The list of all directories that still need to be traversed. The next // element is picked from the front, new elements are enqueued at the // back. @@ -25,32 +26,30 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( // We omit sending the same directories multiple times. let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new(); + let root_directory_digest = root_directory_digest.clone(); + Box::pin(try_stream! 
{ while let Some(current_directory_digest) = worklist_directory_digests.pop_front() { - let current_directory = directory_service.get(¤t_directory_digest).await.map_err(|e| { + let current_directory = match directory_service.get(¤t_directory_digest).await.map_err(|e| { warn!("failed to look up directory"); Error::StorageError(format!( "unable to look up directory {}: {}", current_directory_digest, e )) - })?.ok_or_else(|| { - // if it's not there, we have an inconsistent store! - warn!("directory {} does not exist", current_directory_digest); - Error::StorageError(format!( - "directory {} does not exist", - current_directory_digest - )) - - })?; - - // validate, we don't want to send invalid directories. - current_directory.validate().map_err(|e| { - warn!("directory failed validation: {}", e.to_string()); - Error::StorageError(format!( - "invalid directory: {}", - current_directory_digest - )) - })?; + })? { + // the root node of the requested closure was not found, return an empty list + None if current_directory_digest == root_directory_digest => break, + // if a child directory of the closure is not there, we have an inconsistent store! + None => { + warn!("directory {} does not exist", current_directory_digest); + Err(Error::StorageError(format!( + "directory {} does not exist", + current_directory_digest + )))?; + break; + } + Some(dir) => dir, + }; // We're about to send this directory, so let's avoid sending it again if a // descendant has it. @@ -59,16 +58,15 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( // enqueue all child directory digests to the work queue, as // long as they're not part of the worklist or already sent. // This panics if the digest looks invalid, it's supposed to be checked first. - for child_directory_node in ¤t_directory.directories { - // TODO: propagate error - let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); - - if worklist_directory_digests.contains(&child_digest) - || sent_directory_digests.contains(&child_digest) - { - continue; + for (_, child_directory_node) in current_directory.nodes() { + if let Node::Directory{digest: child_digest, ..} = child_directory_node { + if worklist_directory_digests.contains(child_digest) + || sent_directory_digests.contains(child_digest) + { + continue; + } + worklist_directory_digests.push_back(child_digest.clone()); } - worklist_directory_digests.push_back(child_digest); } yield current_directory; diff --git a/tvix/castore/src/errors.rs b/tvix/castore/src/errors.rs index 8343d0774aec..1c4605200842 100644 --- a/tvix/castore/src/errors.rs +++ b/tvix/castore/src/errors.rs @@ -1,7 +1,13 @@ +use bstr::ByteSlice; use thiserror::Error; use tokio::task::JoinError; use tonic::Status; +use crate::{ + path::{PathComponent, PathComponentError}, + SymlinkTargetError, +}; + /// Errors related to communication with the store. 
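The redb-backed code above leans on `?` inside its blocking closures; the From conversions added below make that work by mapping each redb error type into Error::StorageError. A condensed sketch of the pattern (the table and helper are illustrative only):

    use redb::{Database, TableDefinition};

    use crate::Error;

    const EXAMPLE_TABLE: TableDefinition<[u8; 32], Vec<u8>> = TableDefinition::new("example");

    /// Illustrative write helper: every fallible redb call converts into
    /// Error::StorageError through the From impls, so `?` is enough.
    fn store_blob(db: &Database, key: [u8; 32], value: Vec<u8>) -> Result<(), Error> {
        let txn = db.begin_write()?; // redb::TransactionError -> Error
        {
            let mut table = txn.open_table(EXAMPLE_TABLE)?; // redb::TableError -> Error
            table.insert(key, value)?; // redb::StorageError -> Error
        }
        txn.commit()?; // redb::CommitError -> Error
        Ok(())
    }
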
#[derive(Debug, Error, PartialEq)] pub enum Error { @@ -12,6 +18,52 @@ pub enum Error { StorageError(String), } +/// Errors that occur during construction of [crate::Node] +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum ValidateNodeError { + /// Invalid digest length encountered + #[error("invalid digest length: {0}")] + InvalidDigestLen(usize), + /// Invalid symlink target + #[error("Invalid symlink target: {0}")] + InvalidSymlinkTarget(SymlinkTargetError), +} + +impl From<crate::digests::Error> for ValidateNodeError { + fn from(e: crate::digests::Error) -> Self { + match e { + crate::digests::Error::InvalidDigestLen(n) => ValidateNodeError::InvalidDigestLen(n), + } + } +} + +/// Errors that can occur when populating [crate::Directory] messages, +/// or parsing [crate::proto::Directory] +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum DirectoryError { + /// Multiple elements with the same name encountered + #[error("{:?} is a duplicate name", .0)] + DuplicateName(PathComponent), + /// Node failed validation + #[error("invalid node with name {}: {:?}", .0.as_bstr(), .1.to_string())] + InvalidNode(bytes::Bytes, ValidateNodeError), + #[error("Total size exceeds u64::MAX")] + SizeOverflow, + /// Invalid name encountered + #[error("Invalid name: {0}")] + InvalidName(PathComponentError), + /// This can occur if a protobuf node with a name is passed where we expect + /// it to be anonymous. + #[error("Name is set when it shouldn't")] + NameInAnonymousNode, + /// Elements are not in sorted order. Can only happen on protos + #[error("{:?} is not sorted", .0.as_bstr())] + WrongSorting(bytes::Bytes), + /// This can only happen if there's an unknown node type (on protos) + #[error("No node set")] + NoNodeSet, +} + impl From<JoinError> for Error { fn from(value: JoinError) -> Self { Error::StorageError(value.to_string()) @@ -33,6 +85,42 @@ impl From<crate::tonic::Error> for Error { } } +impl From<redb::Error> for Error { + fn from(value: redb::Error) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From<redb::DatabaseError> for Error { + fn from(value: redb::DatabaseError) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From<redb::TableError> for Error { + fn from(value: redb::TableError) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From<redb::TransactionError> for Error { + fn from(value: redb::TransactionError) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From<redb::StorageError> for Error { + fn from(value: redb::StorageError) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From<redb::CommitError> for Error { + fn from(value: redb::CommitError) -> Self { + Error::StorageError(value.to_string()) + } +} + impl From<std::io::Error> for Error { fn from(value: std::io::Error) -> Self { if value.kind() == std::io::ErrorKind::InvalidInput { diff --git a/tvix/castore/src/fixtures.rs b/tvix/castore/src/fixtures.rs index a206d9b7ddc6..db0ee59daf60 100644 --- a/tvix/castore/src/fixtures.rs +++ b/tvix/castore/src/fixtures.rs @@ -1,88 +1,120 @@ -use crate::{ - proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode}, - B3Digest, -}; -use lazy_static::lazy_static; +use bytes::Bytes; +use std::sync::LazyLock; + +use crate::{B3Digest, Directory, Node}; pub const HELLOWORLD_BLOB_CONTENTS: &[u8] = b"Hello World!"; pub const EMPTY_BLOB_CONTENTS: &[u8] = b""; -lazy_static! 
{ - pub static ref DUMMY_DIGEST: B3Digest = { - let u = [0u8; 32]; - (&u).into() - }; - pub static ref DUMMY_DIGEST_2: B3Digest = { - let mut u = [0u8; 32]; - u[0] = 0x10; - (&u).into() - }; - pub static ref DUMMY_DATA_1: bytes::Bytes = vec![0x01, 0x02, 0x03].into(); - pub static ref DUMMY_DATA_2: bytes::Bytes = vec![0x04, 0x05].into(); +pub static DUMMY_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| (&[0u8; 32]).into()); +pub static DUMMY_DIGEST_2: LazyLock<B3Digest> = LazyLock::new(|| { + let mut u = [0u8; 32]; + u[0] = 0x10; + (&u).into() +}); +pub static DUMMY_DATA_1: LazyLock<Bytes> = LazyLock::new(|| vec![0x01, 0x02, 0x03].into()); +pub static DUMMY_DATA_2: LazyLock<Bytes> = LazyLock::new(|| vec![0x04, 0x05].into()); - pub static ref HELLOWORLD_BLOB_DIGEST: B3Digest = - blake3::hash(HELLOWORLD_BLOB_CONTENTS).as_bytes().into(); - pub static ref EMPTY_BLOB_DIGEST: B3Digest = - blake3::hash(EMPTY_BLOB_CONTENTS).as_bytes().into(); +pub static HELLOWORLD_BLOB_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(HELLOWORLD_BLOB_CONTENTS).as_bytes().into()); +pub static EMPTY_BLOB_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(EMPTY_BLOB_CONTENTS).as_bytes().into()); - // 2 bytes - pub static ref BLOB_A: bytes::Bytes = vec![0x00, 0x01].into(); - pub static ref BLOB_A_DIGEST: B3Digest = blake3::hash(&BLOB_A).as_bytes().into(); +// 2 bytes +pub static BLOB_A: LazyLock<Bytes> = LazyLock::new(|| vec![0x00, 0x01].into()); +pub static BLOB_A_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&BLOB_A).as_bytes().into()); - // 1MB - pub static ref BLOB_B: bytes::Bytes = (0..255).collect::<Vec<u8>>().repeat(4 * 1024).into(); - pub static ref BLOB_B_DIGEST: B3Digest = blake3::hash(&BLOB_B).as_bytes().into(); +// 1MB +pub static BLOB_B: LazyLock<Bytes> = + LazyLock::new(|| (0..255).collect::<Vec<u8>>().repeat(4 * 1024).into()); +pub static BLOB_B_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&BLOB_B).as_bytes().into()); - // Directories - pub static ref DIRECTORY_WITH_KEEP: proto::Directory = proto::Directory { - directories: vec![], - files: vec![FileNode { - name: b".keep".to_vec().into(), - digest: EMPTY_BLOB_DIGEST.clone().into(), - size: 0, - executable: false, - }], - symlinks: vec![], - }; - pub static ref DIRECTORY_COMPLICATED: proto::Directory = proto::Directory { - directories: vec![DirectoryNode { - name: b"keep".to_vec().into(), - digest: DIRECTORY_WITH_KEEP.digest().into(), - size: DIRECTORY_WITH_KEEP.size(), - }], - files: vec![FileNode { - name: b".keep".to_vec().into(), - digest: EMPTY_BLOB_DIGEST.clone().into(), +// Directories +pub static DIRECTORY_WITH_KEEP: LazyLock<Directory> = LazyLock::new(|| { + Directory::try_from_iter([( + ".keep".try_into().unwrap(), + Node::File { + digest: EMPTY_BLOB_DIGEST.clone(), size: 0, executable: false, - }], - symlinks: vec![SymlinkNode { - name: b"aa".to_vec().into(), - target: b"/nix/store/somewhereelse".to_vec().into(), - }], - }; - pub static ref DIRECTORY_A: Directory = Directory::default(); - pub static ref DIRECTORY_B: Directory = Directory { - directories: vec![DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), + }, + )]) + .unwrap() +}); +pub static DIRECTORY_COMPLICATED: LazyLock<Directory> = LazyLock::new(|| { + Directory::try_from_iter([ + ( + "keep".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_WITH_KEEP.digest(), + size: DIRECTORY_WITH_KEEP.size(), + }, + ), + ( + ".keep".try_into().unwrap(), + Node::File { + digest: 
EMPTY_BLOB_DIGEST.clone(), + size: 0, + executable: false, + }, + ), + ( + "aa".try_into().unwrap(), + Node::Symlink { + target: "/nix/store/somewhereelse".try_into().unwrap(), + }, + ), + ]) + .unwrap() +}); +pub static DIRECTORY_A: LazyLock<Directory> = LazyLock::new(Directory::new); +pub static DIRECTORY_B: LazyLock<Directory> = LazyLock::new(|| { + Directory::try_from_iter([( + "a".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_A.digest(), size: DIRECTORY_A.size(), - }], - ..Default::default() - }; - pub static ref DIRECTORY_C: Directory = Directory { - directories: vec![ - DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), + }, + )]) + .unwrap() +}); +pub static DIRECTORY_C: LazyLock<Directory> = LazyLock::new(|| { + Directory::try_from_iter([ + ( + "a".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_A.digest(), + size: DIRECTORY_A.size(), + }, + ), + ( + "a'".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_A.digest(), size: DIRECTORY_A.size(), }, - DirectoryNode { - name: b"a'".to_vec().into(), - digest: DIRECTORY_A.digest().into(), + ), + ]) + .unwrap() +}); +pub static DIRECTORY_D: LazyLock<Directory> = LazyLock::new(|| { + Directory::try_from_iter([ + ( + "a".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_A.digest(), size: DIRECTORY_A.size(), - } - ], - ..Default::default() - }; -} + }, + ), + ( + "b".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_B.digest(), + size: DIRECTORY_B.size(), + }, + ), + ]) + .unwrap() +}); diff --git a/tvix/castore/src/fs/fuse/tests.rs b/tvix/castore/src/fs/fuse/tests.rs index bcebcf4a7292..0d68af090daf 100644 --- a/tvix/castore/src/fs/fuse/tests.rs +++ b/tvix/castore/src/fs/fuse/tests.rs @@ -1,5 +1,4 @@ use bstr::ByteSlice; -use bytes::Bytes; use std::{ collections::BTreeMap, ffi::{OsStr, OsString}, @@ -12,13 +11,14 @@ use tempfile::TempDir; use tokio_stream::{wrappers::ReadDirStream, StreamExt}; use super::FuseDaemon; -use crate::fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST}; -use crate::proto as castorepb; -use crate::proto::node::Node; use crate::{ blobservice::{BlobService, MemoryBlobService}, directoryservice::{DirectoryService, MemoryDirectoryService}, - fixtures, + fixtures, Node, +}; +use crate::{ + fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST}, + PathComponent, }; const BLOB_A_NAME: &str = "00000000000000000000000000000000-test"; @@ -39,14 +39,14 @@ fn gen_svcs() -> (Arc<dyn BlobService>, Arc<dyn DirectoryService>) { fn do_mount<P: AsRef<Path>, BS, DS>( blob_service: BS, directory_service: DS, - root_nodes: BTreeMap<bytes::Bytes, Node>, + root_nodes: BTreeMap<PathComponent, Node>, mountpoint: P, list_root: bool, show_xattr: bool, ) -> io::Result<FuseDaemon> where - BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static, - DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static, + BS: BlobService + Send + Sync + Clone + 'static, + DS: DirectoryService + Send + Sync + Clone + 'static, { let fs = TvixStoreFs::new( blob_service, @@ -60,7 +60,7 @@ where async fn populate_blob_a( blob_service: &Arc<dyn BlobService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { let mut bw = blob_service.open_write().await; tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) @@ -69,19 +69,18 @@ async fn populate_blob_a( bw.close().await.expect("must succeed closing"); root_nodes.insert( - BLOB_A_NAME.into(), - 
Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), + BLOB_A_NAME.try_into().unwrap(), + Node::File { + digest: fixtures::BLOB_A_DIGEST.clone(), size: fixtures::BLOB_A.len() as u64, executable: false, - }), + }, ); } async fn populate_blob_b( blob_service: &Arc<dyn BlobService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { let mut bw = blob_service.open_write().await; tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) @@ -90,20 +89,19 @@ async fn populate_blob_b( bw.close().await.expect("must succeed closing"); root_nodes.insert( - BLOB_B_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_B_NAME.into(), - digest: fixtures::BLOB_B_DIGEST.clone().into(), + BLOB_B_NAME.try_into().unwrap(), + Node::File { + digest: fixtures::BLOB_B_DIGEST.clone(), size: fixtures::BLOB_B.len() as u64, executable: false, - }), + }, ); } /// adds a blob containing helloworld and marks it as executable async fn populate_blob_helloworld( blob_service: &Arc<dyn BlobService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { let mut bw = blob_service.open_write().await; tokio::io::copy( @@ -115,42 +113,39 @@ async fn populate_blob_helloworld( bw.close().await.expect("must succeed closing"); root_nodes.insert( - HELLOWORLD_BLOB_NAME.into(), - Node::File(castorepb::FileNode { - name: HELLOWORLD_BLOB_NAME.into(), - digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(), + HELLOWORLD_BLOB_NAME.try_into().unwrap(), + Node::File { + digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone(), size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, executable: true, - }), + }, ); } -async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { +async fn populate_symlink(root_nodes: &mut BTreeMap<PathComponent, Node>) { root_nodes.insert( - SYMLINK_NAME.into(), - Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME.into(), - target: BLOB_A_NAME.into(), - }), + SYMLINK_NAME.try_into().unwrap(), + Node::Symlink { + target: BLOB_A_NAME.try_into().unwrap(), + }, ); } /// This writes a symlink pointing to /nix/store/somewhereelse, /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. 
-async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) { +async fn populate_symlink2(root_nodes: &mut BTreeMap<PathComponent, Node>) { root_nodes.insert( - SYMLINK_NAME2.into(), - Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME2.into(), - target: "/nix/store/somewhereelse".into(), - }), + SYMLINK_NAME2.try_into().unwrap(), + Node::Symlink { + target: "/nix/store/somewhereelse".try_into().unwrap(), + }, ); } async fn populate_directory_with_keep( blob_service: &Arc<dyn BlobService>, directory_service: &Arc<dyn DirectoryService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { // upload empty blob let mut bw = blob_service.open_write().await; @@ -166,45 +161,42 @@ async fn populate_directory_with_keep( .expect("must succeed uploading"); root_nodes.insert( - DIRECTORY_WITH_KEEP_NAME.into(), - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + DIRECTORY_WITH_KEEP_NAME.try_into().unwrap(), + Node::Directory { + digest: fixtures::DIRECTORY_WITH_KEEP.digest(), size: fixtures::DIRECTORY_WITH_KEEP.size(), - }), + }, ); } /// Create a root node for DIRECTORY_WITH_KEEP, but don't upload the Directory /// itself. -async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) { +async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<PathComponent, Node>) { root_nodes.insert( - DIRECTORY_WITH_KEEP_NAME.into(), - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + DIRECTORY_WITH_KEEP_NAME.try_into().unwrap(), + Node::Directory { + digest: fixtures::DIRECTORY_WITH_KEEP.digest(), size: fixtures::DIRECTORY_WITH_KEEP.size(), - }), + }, ); } /// Insert BLOB_A, but don't provide the blob .keep is pointing to. -async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) { +async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<PathComponent, Node>) { root_nodes.insert( - BLOB_A_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), + BLOB_A_NAME.try_into().unwrap(), + Node::File { + digest: fixtures::BLOB_A_DIGEST.clone(), size: fixtures::BLOB_A.len() as u64, executable: false, - }), + }, ); } async fn populate_directory_complicated( blob_service: &Arc<dyn BlobService>, directory_service: &Arc<dyn DirectoryService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { // upload empty blob let mut bw = blob_service.open_write().await; @@ -226,12 +218,11 @@ async fn populate_directory_complicated( .expect("must succeed uploading"); root_nodes.insert( - DIRECTORY_COMPLICATED_NAME.into(), - Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_COMPLICATED_NAME.into(), - digest: fixtures::DIRECTORY_COMPLICATED.digest().into(), + DIRECTORY_COMPLICATED_NAME.try_into().unwrap(), + Node::Directory { + digest: fixtures::DIRECTORY_COMPLICATED.digest(), size: fixtures::DIRECTORY_COMPLICATED.size(), - }), + }, ); } diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs index bdd459543470..2696fdede378 100644 --- a/tvix/castore/src/fs/inodes.rs +++ b/tvix/castore/src/fs/inodes.rs @@ -2,10 +2,7 @@ //! about inodes, which present tvix-castore nodes in a filesystem. 
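The FUSE test helpers above now key their root-node maps by PathComponent instead of raw Bytes, so invalid basenames are rejected when the map is built rather than at lookup time. For orientation, a minimal sketch of building such a map outside the test suite; it assumes the crate is consumed as tvix_castore with the re-exports of Node, PathComponent and B3Digest added in lib.rs further below, and the name mirrors BLOB_A_NAME from the tests:

use std::collections::BTreeMap;

use tvix_castore::{B3Digest, Node, PathComponent};

// Build a root-node map like the ones handed to `do_mount` above.
// The digest and size values are placeholders for illustration only.
fn example_root_nodes(blob_digest: B3Digest, blob_len: u64) -> BTreeMap<PathComponent, Node> {
    let mut root_nodes = BTreeMap::new();

    // `try_into` fails for names containing slashes or null bytes, and for "." / "..".
    let name: PathComponent = "00000000000000000000000000000000-test"
        .try_into()
        .expect("valid name");

    root_nodes.insert(
        name,
        Node::File {
            digest: blob_digest,
            size: blob_len,
            executable: false,
        },
    );

    root_nodes
}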
use std::time::Duration; -use bytes::Bytes; - -use crate::proto as castorepb; -use crate::B3Digest; +use crate::{path::PathComponent, B3Digest, Node}; #[derive(Clone, Debug)] pub enum InodeData { @@ -20,27 +17,23 @@ pub enum InodeData { /// lookup and did fetch the data. #[derive(Clone, Debug)] pub enum DirectoryInodeData { - Sparse(B3Digest, u64), // digest, size - Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)] + Sparse(B3Digest, u64), // digest, size + Populated(B3Digest, Vec<(u64, PathComponent, Node)>), // [(child_inode, name, node)] } impl InodeData { /// Constructs a new InodeData by consuming a [Node]. - /// It splits off the orginal name, so it can be used later. - pub fn from_node(node: castorepb::node::Node) -> (Self, Bytes) { + pub fn from_node(node: &Node) -> Self { match node { - castorepb::node::Node::Directory(n) => ( - Self::Directory(DirectoryInodeData::Sparse( - n.digest.try_into().unwrap(), - n.size, - )), - n.name, - ), - castorepb::node::Node::File(n) => ( - Self::Regular(n.digest.try_into().unwrap(), n.size, n.executable), - n.name, - ), - castorepb::node::Node::Symlink(n) => (Self::Symlink(n.target), n.name), + Node::Directory { digest, size } => { + Self::Directory(DirectoryInodeData::Sparse(digest.clone(), *size)) + } + Node::File { + digest, + size, + executable, + } => Self::Regular(digest.clone(), *size, *executable), + Node::Symlink { target } => Self::Symlink(target.clone().into()), } } diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index 176199f64aaf..4f50868b8f44 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -15,15 +15,13 @@ use self::{ inode_tracker::InodeTracker, inodes::{DirectoryInodeData, InodeData}, }; -use crate::proto as castorepb; use crate::{ blobservice::{BlobReader, BlobService}, directoryservice::DirectoryService, - proto::{node::Node, NamedNode}, - B3Digest, + path::PathComponent, + B3Digest, Node, }; use bstr::ByteVec; -use bytes::Bytes; use fuse_backend_rs::abi::fuse_abi::{stat64, OpenOptions}; use fuse_backend_rs::api::filesystem::{ Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID, @@ -43,7 +41,7 @@ use tokio::{ io::{AsyncReadExt, AsyncSeekExt}, sync::mpsc, }; -use tracing::{debug, error, instrument, warn, Span}; +use tracing::{debug, error, instrument, warn, Instrument as _, Span}; /// This implements a read-only FUSE filesystem for a tvix-store /// with the passed [BlobService], [DirectoryService] and [RootNodes]. @@ -89,7 +87,7 @@ pub struct TvixStoreFs<BS, DS, RN> { show_xattr: bool, /// This maps a given basename in the root to the inode we allocated for the node. - root_nodes: RwLock<HashMap<Bytes, u64>>, + root_nodes: RwLock<HashMap<PathComponent, u64>>, /// This keeps track of inodes and data alongside them. inode_tracker: RwLock<InodeTracker>, @@ -105,7 +103,7 @@ pub struct TvixStoreFs<BS, DS, RN> { u64, ( Span, - Arc<Mutex<mpsc::Receiver<(usize, Result<Node, crate::Error>)>>>, + Arc<Mutex<mpsc::Receiver<(usize, Result<(PathComponent, Node), crate::Error>)>>>, ), >, >, @@ -123,8 +121,8 @@ pub struct TvixStoreFs<BS, DS, RN> { impl<BS, DS, RN> TvixStoreFs<BS, DS, RN> where - BS: AsRef<dyn BlobService> + Clone + Send, - DS: AsRef<dyn DirectoryService> + Clone + Send + 'static, + BS: BlobService + Clone + Send, + DS: DirectoryService + Clone + Send + 'static, RN: RootNodes + Clone + 'static, { pub fn new( @@ -156,7 +154,7 @@ where /// Retrieves the inode for a given root node basename, if present. 
/// This obtains a read lock on self.root_nodes. - fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> { + fn get_inode_for_root_name(&self, name: &PathComponent) -> Option<u64> { self.root_nodes.read().get(name).cloned() } @@ -167,8 +165,12 @@ where /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup /// in self.directory_service is performed, and self.inode_tracker is updated with the /// [DirectoryInodeData::Populated]. + #[allow(clippy::type_complexity)] #[instrument(skip(self), err)] - fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> { + fn get_directory_children( + &self, + ino: u64, + ) -> io::Result<(B3Digest, Vec<(u64, PathComponent, Node)>)> { let data = self.inode_tracker.read().get(ino).unwrap(); match *data { // if it's populated already, return children. @@ -184,7 +186,7 @@ where .block_on({ let directory_service = self.directory_service.clone(); let parent_digest = parent_digest.to_owned(); - async move { directory_service.as_ref().get(&parent_digest).await } + async move { directory_service.get(&parent_digest).await } })? .ok_or_else(|| { warn!(directory.digest=%parent_digest, "directory not found"); @@ -198,13 +200,13 @@ where let children = { let mut inode_tracker = self.inode_tracker.write(); - let children: Vec<(u64, castorepb::node::Node)> = directory - .nodes() - .map(|child_node| { - let (inode_data, _) = InodeData::from_node(child_node.clone()); + let children: Vec<(u64, PathComponent, Node)> = directory + .into_nodes() + .map(|(child_name, child_node)| { + let inode_data = InodeData::from_node(&child_node); let child_ino = inode_tracker.put(inode_data); - (child_ino, child_node) + (child_ino, child_name, child_node) }) .collect(); @@ -238,12 +240,12 @@ where /// In the case the name can't be found, a libc::ENOENT is returned. fn name_in_root_to_ino_and_data( &self, - name: &std::ffi::CStr, + name: &PathComponent, ) -> io::Result<(u64, Arc<InodeData>)> { // Look up the inode for that root node. // If there's one, [self.inode_tracker] MUST also contain the data, // which we can then return. - if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) { + if let Some(inode) = self.get_inode_for_root_name(name) { return Ok(( inode, self.inode_tracker @@ -257,7 +259,8 @@ where // We don't have it yet, look it up in [self.root_nodes]. match self.tokio_handle.block_on({ let root_nodes_provider = self.root_nodes_provider.clone(); - async move { root_nodes_provider.get_by_basename(name.to_bytes()).await } + let name = name.clone(); + async move { root_nodes_provider.get_by_basename(&name).await } }) { // if there was an error looking up the root node, propagate up an IO error. Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)), @@ -265,15 +268,9 @@ where Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)), // The root node does exist Ok(Some(root_node)) => { - // The name must match what's passed in the lookup, otherwise this is also a ENOENT. - if root_node.get_name() != name.to_bytes() { - debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch"); - return Err(io::Error::from_raw_os_error(libc::ENOENT)); - } - // Let's check if someone else beat us to updating the inode tracker and // root_nodes map. This avoids locking inode_tracker for writing. 
- if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) { + if let Some(ino) = self.root_nodes.read().get(name) { return Ok(( *ino, self.inode_tracker.read().get(*ino).expect("must exist"), @@ -287,9 +284,9 @@ where // insert the (sparse) inode data and register in // self.root_nodes. - let (inode_data, name) = InodeData::from_node(root_node); + let inode_data = InodeData::from_node(&root_node); let ino = inode_tracker.put(inode_data.clone()); - root_nodes.insert(name, ino); + root_nodes.insert(name.to_owned(), ino); Ok((ino, Arc::new(inode_data))) } @@ -303,10 +300,22 @@ const ROOT_NODES_BUFFER_SIZE: usize = 16; const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.tvix.castore.directory.digest"; const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest"; +#[cfg(all(feature = "virtiofs", target_os = "linux"))] +impl<BS, DS, RN> fuse_backend_rs::api::filesystem::Layer for TvixStoreFs<BS, DS, RN> +where + BS: BlobService + Clone + Send + 'static, + DS: DirectoryService + Send + Clone + 'static, + RN: RootNodes + Clone + 'static, +{ + fn root_inode(&self) -> Self::Inode { + ROOT_ID + } +} + impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN> where - BS: AsRef<dyn BlobService> + Clone + Send + 'static, - DS: AsRef<dyn DirectoryService> + Send + Clone + 'static, + BS: BlobService + Clone + Send + 'static, + DS: DirectoryService + Send + Clone + 'static, RN: RootNodes + Clone + 'static, { type Handle = u64; @@ -345,13 +354,17 @@ where ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> { debug!("lookup"); + // convert the CStr to a PathComponent + // If it can't be converted, we definitely don't have anything here. + let name: PathComponent = name.try_into().map_err(|_| std::io::ErrorKind::NotFound)?; + // This goes from a parent inode to a node. // - If the parent is [ROOT_ID], we need to check // [self.root_nodes] (fetching from a [RootNode] provider if needed) // - Otherwise, lookup the parent in [self.inode_tracker] (which must be // a [InodeData::Directory]), and find the child with that name. if parent == ROOT_ID { - let (ino, inode_data) = self.name_in_root_to_ino_and_data(name)?; + let (ino, inode_data) = self.name_in_root_to_ino_and_data(&name)?; debug!(inode_data=?&inode_data, ino=ino, "Some"); return Ok(inode_data.as_fuse_entry(ino)); @@ -364,7 +377,7 @@ where // Search for that name in the list of children and return the FileAttrs. // in the children, find the one with the desired name. - if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) { + if let Some((child_ino, _, _)) = children.iter().find(|(_, n, _)| n == &name) { // lookup the child [InodeData] in [self.inode_tracker]. // We know the inodes for children have already been allocated. let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap(); @@ -397,16 +410,20 @@ where // This task will run in the background immediately and will exit // after the stream ends or if we no longer want any more entries. - self.tokio_handle.spawn(async move { - let mut stream = root_nodes_provider.list().enumerate(); - while let Some(node) = stream.next().await { - if tx.send(node).await.is_err() { - // If we get a send error, it means the sync code - // doesn't want any more entries. - break; + self.tokio_handle.spawn( + async move { + let mut stream = root_nodes_provider.list().enumerate(); + while let Some(e) = stream.next().await { + if tx.send(e).await.is_err() { + // If we get a send error, it means the sync code + // doesn't want any more entries. 
+ break; + } } } - }); + // instrument the task with the current span, this is not done by default + .in_current_span(), + ); // Put the rx part into [self.dir_handles]. // TODO: this will overflow after 2**64 operations, @@ -459,12 +476,12 @@ where .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; while let Some((i, n)) = rx.blocking_recv() { - let root_node = n.map_err(|e| { + let (name, node) = n.map_err(|e| { warn!("failed to retrieve root node: {}", e); io::Error::from_raw_os_error(libc::EIO) })?; - let (inode_data, name) = InodeData::from_node(root_node); + let inode_data = InodeData::from_node(&node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -479,7 +496,7 @@ where ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: name.as_ref(), })?; // If the buffer is full, add_entry will return `Ok(0)`. if written == 0 { @@ -493,15 +510,17 @@ where let (parent_digest, children) = self.get_directory_children(inode)?; Span::current().record("directory.digest", parent_digest.to_string()); - for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(child_node); + for (i, (ino, child_name, child_node)) in + children.into_iter().skip(offset as usize).enumerate() + { + let inode_data = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: child_name.as_ref(), })?; // If the buffer is full, add_entry will return `Ok(0)`. if written == 0 { @@ -546,12 +565,12 @@ where .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; while let Some((i, n)) = rx.blocking_recv() { - let root_node = n.map_err(|e| { + let (name, node) = n.map_err(|e| { warn!("failed to retrieve root node: {}", e); io::Error::from_raw_os_error(libc::EPERM) })?; - let (inode_data, name) = InodeData::from_node(root_node); + let inode_data = InodeData::from_node(&node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -567,7 +586,7 @@ where ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: name.as_ref(), }, inode_data.as_fuse_entry(ino), )?; @@ -583,8 +602,8 @@ where let (parent_digest, children) = self.get_directory_children(inode)?; Span::current().record("directory.digest", parent_digest.to_string()); - for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(child_node); + for (i, (ino, name, child_node)) in children.into_iter().skip(offset as usize).enumerate() { + let inode_data = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. 
let written = add_entry( @@ -592,7 +611,7 @@ where ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: name.as_ref(), }, inode_data.as_fuse_entry(ino), )?; @@ -637,6 +656,7 @@ where ) -> io::Result<( Option<Self::Handle>, fuse_backend_rs::api::filesystem::OpenOptions, + Option<u32>, )> { if inode == ROOT_ID { return Err(io::Error::from_raw_os_error(libc::ENOSYS)); @@ -655,7 +675,7 @@ where match self.tokio_handle.block_on({ let blob_service = self.blob_service.clone(); let blob_digest = blob_digest.clone(); - async move { blob_service.as_ref().open_read(&blob_digest).await } + async move { blob_service.open_read(&blob_digest).await } }) { Ok(None) => { warn!("blob not found"); @@ -680,6 +700,7 @@ where Ok(( Some(fh), fuse_backend_rs::api::filesystem::OpenOptions::empty(), + None, )) } } diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs index 6609e049a1fc..5ed1a4d8d6c0 100644 --- a/tvix/castore/src/fs/root_nodes.rs +++ b/tvix/castore/src/fs/root_nodes.rs @@ -1,7 +1,6 @@ use std::collections::BTreeMap; -use crate::{proto::node::Node, Error}; -use bytes::Bytes; +use crate::{path::PathComponent, Error, Node}; use futures::stream::BoxStream; use tonic::async_trait; @@ -11,11 +10,12 @@ use tonic::async_trait; pub trait RootNodes: Send + Sync { /// Looks up a root CA node based on the basename of the node in the root /// directory of the filesystem. - async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>; + async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error>; - /// Lists all root CA nodes in the filesystem. An error can be returned - /// in case listing is not allowed - fn list(&self) -> BoxStream<Result<Node, Error>>; + /// Lists all root CA nodes in the filesystem, as a tuple of (base)name + /// and Node. + /// An error can be returned in case listing is not allowed. + fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>>; } #[async_trait] @@ -23,15 +23,17 @@ pub trait RootNodes: Send + Sync { /// the key is the node name. 
impl<T> RootNodes for T where - T: AsRef<BTreeMap<Bytes, Node>> + Send + Sync, + T: AsRef<BTreeMap<PathComponent, Node>> + Send + Sync, { - async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> { + async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error> { Ok(self.as_ref().get(name).cloned()) } - fn list(&self) -> BoxStream<Result<Node, Error>> { + fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>> { Box::pin(tokio_stream::iter( - self.as_ref().iter().map(|(_, v)| Ok(v.clone())), + self.as_ref() + .iter() + .map(|(name, node)| Ok((name.to_owned(), node.to_owned()))), )) } } diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index cd5b1290e031..4cbff687b864 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -13,7 +13,7 @@ use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::import::{ingest_entries, IngestionEntry, IngestionError}; -use crate::proto::node::Node; +use crate::Node; use super::blobs::{self, ConcurrentBlobUploader}; @@ -292,44 +292,43 @@ impl IngestionEntryGraph { #[cfg(test)] mod test { - use crate::import::IngestionEntry; - use crate::B3Digest; + use std::sync::LazyLock; use super::{Error, IngestionEntryGraph}; + use crate::import::IngestionEntry; + use crate::B3Digest; - use lazy_static::lazy_static; use rstest::rstest; - lazy_static! { - pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into(); - pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { - path: "a".parse().unwrap() - }; - pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { - path: "b".parse().unwrap() - }; - pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { - path: "a/b".parse().unwrap() - }; - pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular { - path: "a".parse().unwrap(), - size: 0, - executable: false, - digest: EMPTY_DIGEST.clone(), - }; - pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular { - path: "a/b".parse().unwrap(), - size: 0, - executable: false, - digest: EMPTY_DIGEST.clone(), - }; - pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular { - path: "a/b/c".parse().unwrap(), - size: 0, - executable: false, - digest: EMPTY_DIGEST.clone(), - }; - } + pub static EMPTY_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&[]).as_bytes().into()); + pub static DIR_A: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir { + path: "a".parse().unwrap(), + }); + pub static DIR_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir { + path: "b".parse().unwrap(), + }); + pub static DIR_A_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir { + path: "a/b".parse().unwrap(), + }); + pub static FILE_A: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular { + path: "a".parse().unwrap(), + size: 0, + executable: false, + digest: EMPTY_DIGEST.clone(), + }); + pub static FILE_A_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular { + path: "a/b".parse().unwrap(), + size: 0, + executable: false, + digest: EMPTY_DIGEST.clone(), + }); + pub static FILE_A_B_C: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular { + path: "a/b/c".parse().unwrap(), + size: 0, + executable: false, + digest: EMPTY_DIGEST.clone(), + }); #[rstest] #[case::implicit_directories(&[&*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])] diff --git 
a/tvix/castore/src/import/blobs.rs b/tvix/castore/src/import/blobs.rs index 8135d871d6c0..f71ee1e63768 100644 --- a/tvix/castore/src/import/blobs.rs +++ b/tvix/castore/src/import/blobs.rs @@ -28,6 +28,9 @@ pub enum Error { #[error("unable to read blob contents for {0}: {1}")] BlobRead(PathBuf, std::io::Error), + #[error("unable to check whether blob at {0} already exists: {1}")] + BlobCheck(PathBuf, std::io::Error), + // FUTUREWORK: proper error for blob finalize #[error("unable to finalize blob {0}: {1}")] BlobFinalize(PathBuf, std::io::Error), @@ -118,6 +121,16 @@ where let path = path.to_owned(); let r = Cursor::new(buffer); async move { + // We know the blob digest already, check it exists before sending it. + if blob_service + .has(&expected_digest) + .await + .map_err(|e| Error::BlobCheck(path.clone(), e))? + { + drop(permit); + return Ok(()); + } + let digest = upload_blob(&blob_service, &path, expected_size, r).await?; assert_eq!(digest, expected_digest, "Tvix bug: blob digest mismatch"); diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index dc7821b8101e..78eac6f6128d 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -8,7 +8,9 @@ use std::os::unix::fs::MetadataExt; use std::os::unix::fs::PermissionsExt; use tokio::io::BufReader; use tokio_util::io::InspectReader; +use tracing::info_span; use tracing::instrument; +use tracing::Instrument; use tracing::Span; use tracing_indicatif::span_ext::IndicatifSpanExt; use walkdir::DirEntry; @@ -16,8 +18,8 @@ use walkdir::WalkDir; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; -use crate::proto::node::Node; -use crate::B3Digest; +use crate::refscan::{ReferenceReader, ReferenceScanner}; +use crate::{B3Digest, Node}; use super::ingest_entries; use super::IngestionEntry; @@ -30,20 +32,24 @@ use super::IngestionError; /// /// This function will walk the filesystem using `walkdir` and will consume /// `O(#number of entries)` space. -#[instrument(skip(blob_service, directory_service), fields(path, indicatif.pb_show=1), err)] -pub async fn ingest_path<BS, DS, P>( +#[instrument( + skip(blob_service, directory_service, reference_scanner), + fields(path), + err +)] +pub async fn ingest_path<BS, DS, P, P2>( blob_service: BS, directory_service: DS, path: P, + reference_scanner: Option<&ReferenceScanner<P2>>, ) -> Result<Node, IngestionError<Error>> where P: AsRef<std::path::Path> + std::fmt::Debug, BS: BlobService + Clone, DS: DirectoryService, + P2: AsRef<[u8]> + Send + Sync, { let span = Span::current(); - span.pb_set_message(&format!("Ingesting {:?}", path)); - span.pb_start(); let iter = WalkDir::new(path.as_ref()) .follow_links(false) @@ -51,7 +57,8 @@ where .contents_first(true) .into_iter(); - let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref()); + let entries = + dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref(), reference_scanner); ingest_entries( directory_service, entries.inspect({ @@ -72,14 +79,16 @@ where /// The produced stream is buffered, so uploads can happen concurrently. /// /// The root is the [Path] in the filesystem that is being ingested into the castore. 
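Since ingest_path now takes an optional ReferenceScanner, callers that do not want reference scanning pass None, but still have to name the scanner's pattern type so the P2 parameter can be inferred. The following is a rough sketch only: it assumes the function is reachable as import::fs::ingest_path, that MemoryBlobService / MemoryDirectoryService implement Default as the tests suggest, and that the blanket impls implied by the relaxed test bounds above make Arc<dyn BlobService> / Arc<dyn DirectoryService> usable directly:

use std::sync::Arc;

use tvix_castore::blobservice::{BlobService, MemoryBlobService};
use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryService};
use tvix_castore::import::fs::ingest_path;
use tvix_castore::refscan::ReferenceScanner;

async fn ingest_without_scanning(path: &std::path::Path) {
    let blob_service: Arc<dyn BlobService> = Arc::new(MemoryBlobService::default());
    let directory_service: Arc<dyn DirectoryService> = Arc::new(MemoryDirectoryService::default());

    // No scanner: annotate `None` so the otherwise-unused `P2: AsRef<[u8]>` type is fixed.
    let root_node = ingest_path(
        blob_service,
        directory_service,
        path,
        None::<&ReferenceScanner<Vec<u8>>>,
    )
    .await
    .expect("ingestion failed");

    // `root_node` is a castore Node describing the ingested root.
    eprintln!("ingested: {:?}", root_node);
}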
-pub fn dir_entries_to_ingestion_stream<'a, BS, I>( +pub fn dir_entries_to_ingestion_stream<'a, BS, I, P>( blob_service: BS, iter: I, root: &'a std::path::Path, + reference_scanner: Option<&'a ReferenceScanner<P>>, ) -> BoxStream<'a, Result<IngestionEntry, Error>> where BS: BlobService + Clone + 'a, I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a, + P: AsRef<[u8]> + Send + Sync, { let prefix = root.parent().unwrap_or_else(|| std::path::Path::new("")); @@ -90,7 +99,13 @@ where async move { match x { Ok(dir_entry) => { - dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await + dir_entry_to_ingestion_entry( + blob_service, + &dir_entry, + prefix, + reference_scanner, + ) + .await } Err(e) => Err(Error::Stat( prefix.to_path_buf(), @@ -108,13 +123,15 @@ where /// /// The prefix path is stripped from the path of each entry. This is usually the parent path /// of the path being ingested so that the last element of the stream only has one component. -pub async fn dir_entry_to_ingestion_entry<BS>( +pub async fn dir_entry_to_ingestion_entry<BS, P>( blob_service: BS, entry: &DirEntry, prefix: &std::path::Path, + reference_scanner: Option<&ReferenceScanner<P>>, ) -> Result<IngestionEntry, Error> where BS: BlobService, + P: AsRef<[u8]>, { let file_type = entry.file_type(); @@ -135,13 +152,25 @@ where .into_os_string() .into_vec(); + if let Some(reference_scanner) = &reference_scanner { + reference_scanner.scan(&target); + } + Ok(IngestionEntry::Symlink { path, target }) } else if file_type.is_file() { let metadata = entry .metadata() .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?; - let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?; + let digest = upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner) + .instrument({ + let span = info_span!("upload_blob", "indicatif.pb_show" = tracing::field::Empty); + span.pb_set_message(&format!("Uploading blob for {:?}", fs_path)); + span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); + + span + }) + .await?; Ok(IngestionEntry::Regular { path, @@ -157,17 +186,17 @@ where } /// Uploads the file at the provided [Path] the the [BlobService]. 
-#[instrument(skip(blob_service), fields(path, indicatif.pb_show=1), err)] -async fn upload_blob<BS>( +#[instrument(skip(blob_service, reference_scanner), fields(path), err)] +async fn upload_blob<BS, P>( blob_service: BS, path: impl AsRef<std::path::Path>, + reference_scanner: Option<&ReferenceScanner<P>>, ) -> Result<B3Digest, Error> where BS: BlobService, + P: AsRef<[u8]>, { let span = Span::current(); - span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); - span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref())); span.pb_start(); let file = tokio::fs::File::open(path.as_ref()) @@ -185,9 +214,16 @@ where }); let mut writer = blob_service.open_write().await; - tokio::io::copy(&mut BufReader::new(reader), &mut writer) - .await - .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; + if let Some(reference_scanner) = reference_scanner { + let mut reader = ReferenceReader::new(reference_scanner, BufReader::new(reader)); + tokio::io::copy(&mut reader, &mut writer) + .await + .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; + } else { + tokio::io::copy(&mut BufReader::new(reader), &mut writer) + .await + .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; + } let digest = writer .close() diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index a9ac0be6b064..6e10a64939a4 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -4,15 +4,9 @@ //! Specific implementations, such as ingesting from the filesystem, live in //! child modules. -use crate::directoryservice::DirectoryPutter; -use crate::directoryservice::DirectoryService; +use crate::directoryservice::{DirectoryPutter, DirectoryService}; use crate::path::{Path, PathBuf}; -use crate::proto::node::Node; -use crate::proto::Directory; -use crate::proto::DirectoryNode; -use crate::proto::FileNode; -use crate::proto::SymlinkNode; -use crate::B3Digest; +use crate::{B3Digest, Directory, Node, SymlinkTargetError}; use futures::{Stream, StreamExt}; use tracing::Level; @@ -65,14 +59,6 @@ where // we break the loop manually. .expect("Tvix bug: unexpected end of stream")?; - let name = entry - .path() - .file_name() - // If this is the root node, it will have an empty name. - .unwrap_or_default() - .to_owned() - .into(); - let node = match &mut entry { IngestionEntry::Dir { .. } => { // If the entry is a directory, we traversed all its children (and @@ -98,27 +84,31 @@ where IngestionError::UploadDirectoryError(entry.path().to_owned(), e) })?; - Node::Directory(DirectoryNode { - name, - digest: directory_digest.into(), + Node::Directory { + digest: directory_digest, size: directory_size, - }) + } } - IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode { - name, - target: target.to_owned().into(), - }), + IngestionEntry::Symlink { ref target, .. } => Node::Symlink { + target: bytes::Bytes::copy_from_slice(target).try_into().map_err( + |e: SymlinkTargetError| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(format!("invalid symlink target: {}", e)), + ) + }, + )?, + }, IngestionEntry::Regular { size, executable, digest, .. 
- } => Node::File(FileNode { - name, - digest: digest.to_owned().into(), + } => Node::File { + digest: digest.clone(), size: *size, executable: *executable, - }), + }, }; let parent = entry @@ -129,8 +119,24 @@ where if parent == crate::Path::ROOT { break node; } else { + let name = entry + .path() + .file_name() + // If this is the root node, it will have an empty name. + .unwrap_or_else(|| "".try_into().unwrap()) + .to_owned(); + // record node in parent directory, creating a new [Directory] if not there yet. - directories.entry(parent.to_owned()).or_default().add(node); + directories + .entry(parent.to_owned()) + .or_default() + .add(name, node) + .map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?; } }; @@ -155,15 +161,8 @@ where #[cfg(debug_assertions)] { - if let Node::Directory(directory_node) = &root_node { - debug_assert_eq!( - root_directory_digest, - directory_node - .digest - .to_vec() - .try_into() - .expect("invalid digest len") - ) + if let Node::Directory { digest, .. } = &root_node { + debug_assert_eq!(&root_directory_digest, digest); } else { unreachable!("Tvix bug: directory putter initialized but no root directory node"); } @@ -209,9 +208,8 @@ mod test { use rstest::rstest; use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST}; - use crate::proto::node::Node; - use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode}; use crate::{directoryservice::MemoryDirectoryService, fixtures::DUMMY_DIGEST}; + use crate::{Directory, Node}; use super::ingest_entries; use super::IngestionEntry; @@ -223,18 +221,18 @@ mod test { executable: true, digest: DUMMY_DIGEST.clone(), }], - Node::File(FileNode { name: "foo".into(), digest: DUMMY_DIGEST.clone().into(), size: 42, executable: true } - ))] + Node::File{digest: DUMMY_DIGEST.clone(), size: 42, executable: true} + )] #[case::single_symlink(vec![IngestionEntry::Symlink { path: "foo".parse().unwrap(), target: b"blub".into(), }], - Node::Symlink(SymlinkNode { name: "foo".into(), target: "blub".into()}) + Node::Symlink{target: "blub".try_into().unwrap()} )] #[case::single_dir(vec![IngestionEntry::Dir { path: "foo".parse().unwrap(), }], - Node::Directory(DirectoryNode { name: "foo".into(), digest: Directory::default().digest().into(), size: Directory::default().size()}) + Node::Directory{digest: Directory::default().digest(), size: Directory::default().size()} )] #[case::dir_with_keep(vec![ IngestionEntry::Regular { @@ -247,7 +245,7 @@ mod test { path: "foo".parse().unwrap(), }, ], - Node::Directory(DirectoryNode { name: "foo".into(), digest: DIRECTORY_WITH_KEEP.digest().into(), size: DIRECTORY_WITH_KEEP.size() }) + Node::Directory{ digest: DIRECTORY_WITH_KEEP.digest(), size: DIRECTORY_WITH_KEEP.size()} )] /// This is intentionally a bit unsorted, though it still satisfies all /// requirements we have on the order of elements in the stream. 
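The expected values in the test cases above are plain Node variants. Because names no longer live on the node itself, consumers typically match on the three variants and take the name from wherever the node was found (its parent Directory or a root-node map). A small, self-contained sketch of such a match:

use tvix_castore::Node;

fn describe(node: &Node) -> String {
    match node {
        Node::Directory { digest, size } => {
            format!("directory {} (size field {})", digest, size)
        }
        Node::File {
            digest,
            size,
            executable,
        } => format!("file {} ({} bytes, executable: {})", digest, size, executable),
        Node::Symlink { target } => format!("symlink -> {}", target),
    }
}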
@@ -275,7 +273,7 @@ mod test { path: "blub".parse().unwrap(), }, ], - Node::Directory(DirectoryNode { name: "blub".into(), digest: DIRECTORY_COMPLICATED.digest().into(), size:DIRECTORY_COMPLICATED.size() }) + Node::Directory{ digest: DIRECTORY_COMPLICATED.digest(), size: DIRECTORY_COMPLICATED.size() } )] #[tokio::test] async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) { diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs index bdc533a8c5e6..93d06fd4582d 100644 --- a/tvix/castore/src/lib.rs +++ b/tvix/castore/src/lib.rs @@ -3,21 +3,26 @@ mod errors; mod hashing_reader; pub mod blobservice; +pub mod composition; pub mod directoryservice; pub mod fixtures; +pub mod refscan; #[cfg(feature = "fs")] pub mod fs; +mod nodes; +pub use nodes::*; + mod path; -pub use path::{Path, PathBuf}; +pub use path::{Path, PathBuf, PathComponent, PathComponentError}; pub mod import; pub mod proto; pub mod tonic; pub use digests::{B3Digest, B3_LEN}; -pub use errors::Error; +pub use errors::{DirectoryError, Error, ValidateNodeError}; pub use hashing_reader::{B3HashingReader, HashingReader}; #[cfg(test)] diff --git a/tvix/castore/src/nodes/directory.rs b/tvix/castore/src/nodes/directory.rs new file mode 100644 index 000000000000..f80e055dde80 --- /dev/null +++ b/tvix/castore/src/nodes/directory.rs @@ -0,0 +1,287 @@ +use std::collections::btree_map::{self, BTreeMap}; + +use crate::{errors::DirectoryError, path::PathComponent, proto, B3Digest, Node}; + +/// A Directory contains nodes, which can be Directory, File or Symlink nodes. +/// It attaches names to these nodes, which is the basename in that directory. +/// These names: +/// - MUST not contain slashes or null bytes +/// - MUST not be '.' or '..' +/// - MUST be unique across all three lists +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct Directory { + nodes: BTreeMap<PathComponent, Node>, +} + +impl Directory { + /// Constructs a new, empty Directory. + pub fn new() -> Self { + Directory { + nodes: BTreeMap::new(), + } + } + + /// Construct a [Directory] from tuples of name and [Node]. + /// + /// Inserting multiple elements with the same name will yield an error, as + /// well as exceeding the maximum size. + pub fn try_from_iter<T: IntoIterator<Item = (PathComponent, Node)>>( + iter: T, + ) -> Result<Directory, DirectoryError> { + let mut nodes = BTreeMap::new(); + + iter.into_iter().try_fold(0u64, |size, (name, node)| { + check_insert_node(size, &mut nodes, name, node) + })?; + + Ok(Self { nodes }) + } + + /// The size of a directory is the number of all regular and symlink elements, + /// the number of directory elements, and their size fields. + pub fn size(&self) -> u64 { + // It's impossible to create a Directory where the size overflows, because we + // check before every add() that the size won't overflow. + (self.nodes.len() as u64) + + self + .nodes() + .map(|(_name, n)| match n { + Node::Directory { size, .. } => 1 + size, + Node::File { .. } | Node::Symlink { .. } => 1, + }) + .sum::<u64>() + } + + /// Calculates the digest of a Directory, which is the blake3 hash of a + /// Directory protobuf message, serialized in protobuf canonical form. + pub fn digest(&self) -> B3Digest { + proto::Directory::from(self.clone()).digest() + } + + /// Allows iterating over all nodes (directories, files and symlinks) + /// For each, it returns a tuple of its name and node. + /// The elements are sorted by their names. 
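A quick sketch of the constructor documented above: try_from_iter takes (PathComponent, Node) pairs and returns a DirectoryError on duplicate names or size overflow, so the result has to be handled (or unwrapped, as the fixtures further up do). The entry names and the digest parameter here are purely illustrative:

use tvix_castore::{B3Digest, Directory, DirectoryError, Node};

fn two_entry_directory(digest: B3Digest) -> Result<Directory, DirectoryError> {
    Directory::try_from_iter([
        (
            "keep".try_into().unwrap(),
            Node::File {
                digest,
                size: 0,
                executable: false,
            },
        ),
        (
            "link".try_into().unwrap(),
            Node::Symlink {
                target: "/nix/store/somewhere".try_into().unwrap(),
            },
        ),
    ])
}

Repeating a name would instead surface DirectoryError::DuplicateName, and the finished Directory can then be hashed via .digest() or measured via .size().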
+ pub fn nodes(&self) -> impl Iterator<Item = (&PathComponent, &Node)> + Send + Sync + '_ { + self.nodes.iter() + } + + /// Dissolves a Directory into its individual names and nodes. + /// The elements are sorted by their names. + pub fn into_nodes(self) -> impl Iterator<Item = (PathComponent, Node)> + Send + Sync { + self.nodes.into_iter() + } + + /// Adds the specified [Node] to the [Directory] with a given name. + /// + /// Inserting a node that already exists with the same name in the directory + /// will yield an error, as well as exceeding the maximum size. + /// + /// In case you want to construct a [Directory] from multiple elements, use + /// [from_iter] instead. + pub fn add(&mut self, name: PathComponent, node: Node) -> Result<(), DirectoryError> { + check_insert_node(self.size(), &mut self.nodes, name, node)?; + Ok(()) + } +} + +fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> { + iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i)) +} + +/// Helper function dealing with inserting nodes into the nodes [BTreeMap], +/// after ensuring the new size doesn't overlow and the key doesn't exist already. +/// +/// Returns the new total size, or an error. +fn check_insert_node( + current_size: u64, + nodes: &mut BTreeMap<PathComponent, Node>, + name: PathComponent, + node: Node, +) -> Result<u64, DirectoryError> { + // Check that the even after adding this new directory entry, the size calculation will not + // overflow + let new_size = checked_sum([ + current_size, + 1, + match node { + Node::Directory { size, .. } => size, + _ => 0, + }, + ]) + .ok_or(DirectoryError::SizeOverflow)?; + + match nodes.entry(name) { + btree_map::Entry::Vacant(e) => { + e.insert(node); + } + btree_map::Entry::Occupied(occupied) => { + return Err(DirectoryError::DuplicateName(occupied.key().to_owned())) + } + } + + Ok(new_size) +} + +#[cfg(test)] +mod test { + use super::{Directory, Node}; + use crate::fixtures::DUMMY_DIGEST; + use crate::{DirectoryError, PathComponent}; + + #[test] + fn from_iter_single() { + Directory::try_from_iter([( + PathComponent::try_from("b").unwrap(), + Node::Directory { + digest: DUMMY_DIGEST.clone(), + size: 1, + }, + )]) + .unwrap(); + } + + #[test] + fn from_iter_multiple() { + let d = Directory::try_from_iter([ + ( + "b".try_into().unwrap(), + Node::Directory { + digest: DUMMY_DIGEST.clone(), + size: 1, + }, + ), + ( + "a".try_into().unwrap(), + Node::Directory { + digest: DUMMY_DIGEST.clone(), + size: 1, + }, + ), + ( + "z".try_into().unwrap(), + Node::Directory { + digest: DUMMY_DIGEST.clone(), + size: 1, + }, + ), + ( + "f".try_into().unwrap(), + Node::File { + digest: DUMMY_DIGEST.clone(), + size: 1, + executable: true, + }, + ), + ( + "c".try_into().unwrap(), + Node::File { + digest: DUMMY_DIGEST.clone(), + size: 1, + executable: true, + }, + ), + ( + "g".try_into().unwrap(), + Node::File { + digest: DUMMY_DIGEST.clone(), + size: 1, + executable: true, + }, + ), + ( + "t".try_into().unwrap(), + Node::Symlink { + target: "a".try_into().unwrap(), + }, + ), + ( + "o".try_into().unwrap(), + Node::Symlink { + target: "a".try_into().unwrap(), + }, + ), + ( + "e".try_into().unwrap(), + Node::Symlink { + target: "a".try_into().unwrap(), + }, + ), + ]) + .unwrap(); + + // Convert to proto struct and back to ensure we are not generating any invalid structures + crate::Directory::try_from(crate::proto::Directory::from(d)) + .expect("directory should be valid"); + } + + #[test] + fn add_nodes_to_directory() { + let mut d = Directory::new(); + + d.add( + 
"b".try_into().unwrap(), + Node::Directory { + digest: DUMMY_DIGEST.clone(), + size: 1, + }, + ) + .unwrap(); + d.add( + "a".try_into().unwrap(), + Node::Directory { + digest: DUMMY_DIGEST.clone(), + size: 1, + }, + ) + .unwrap(); + + // Convert to proto struct and back to ensure we are not generating any invalid structures + crate::Directory::try_from(crate::proto::Directory::from(d)) + .expect("directory should be valid"); + } + + #[test] + fn validate_overflow() { + let mut d = Directory::new(); + + assert_eq!( + d.add( + "foo".try_into().unwrap(), + Node::Directory { + digest: DUMMY_DIGEST.clone(), + size: u64::MAX + } + ), + Err(DirectoryError::SizeOverflow) + ); + } + + #[test] + fn add_duplicate_node_to_directory() { + let mut d = Directory::new(); + + d.add( + "a".try_into().unwrap(), + Node::Directory { + digest: DUMMY_DIGEST.clone(), + size: 1, + }, + ) + .unwrap(); + assert_eq!( + format!( + "{}", + d.add( + "a".try_into().unwrap(), + Node::File { + digest: DUMMY_DIGEST.clone(), + size: 1, + executable: true + } + ) + .expect_err("adding duplicate dir entry must fail") + ), + "\"a\" is a duplicate name" + ); + } +} diff --git a/tvix/castore/src/nodes/mod.rs b/tvix/castore/src/nodes/mod.rs new file mode 100644 index 000000000000..ac7aa1e666df --- /dev/null +++ b/tvix/castore/src/nodes/mod.rs @@ -0,0 +1,48 @@ +//! This holds types describing nodes in the tvix-castore model. +mod directory; +mod symlink_target; + +use crate::B3Digest; +pub use directory::Directory; +pub use symlink_target::{SymlinkTarget, SymlinkTargetError}; + +/// A Node is either a [DirectoryNode], [FileNode] or [SymlinkNode]. +/// Nodes themselves don't have names, what gives them names is either them +/// being inside a [Directory], or a root node with its own name attached to it. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Node { + /// A DirectoryNode is a pointer to a [Directory], by its [Directory::digest]. + /// It also records a`size`. + /// Such a node is either an element in the [Directory] it itself is contained in, + /// or a standalone root node. + Directory { + /// The blake3 hash of a Directory message, serialized in protobuf canonical form. + digest: B3Digest, + /// Number of child elements in the Directory referred to by `digest`. + /// Calculated by summing up the numbers of nodes, and for each directory, + /// its size field. Can be used for inode allocation. + /// This field is precisely as verifiable as any other Merkle tree edge. + /// Resolve `digest`, and you can compute it incrementally. Resolve the entire + /// tree, and you can fully compute it from scratch. + /// A credulous implementation won't reject an excessive size, but this is + /// harmless: you'll have some ordinals without nodes. Undersizing is obvious + /// and easy to reject: you won't have an ordinal for some nodes. + size: u64, + }, + /// A FileNode represents a regular or executable file in a Directory or at the root. + File { + /// The blake3 digest of the file contents + digest: B3Digest, + + /// The file content size + size: u64, + + /// Whether the file is executable + executable: bool, + }, + /// A SymlinkNode represents a symbolic link in a Directory or at the root. + Symlink { + /// The target of the symlink. 
+ target: SymlinkTarget, + }, +} diff --git a/tvix/castore/src/nodes/symlink_target.rs b/tvix/castore/src/nodes/symlink_target.rs new file mode 100644 index 000000000000..e9a1a0bd05c2 --- /dev/null +++ b/tvix/castore/src/nodes/symlink_target.rs @@ -0,0 +1,223 @@ +use bstr::ByteSlice; +use std::fmt::{self, Debug, Display}; + +/// A wrapper type for symlink targets. +/// Internally uses a [bytes::Bytes], but disallows empty targets and those +/// containing null bytes. +#[repr(transparent)] +#[derive(Clone, PartialEq, Eq)] +pub struct SymlinkTarget { + inner: bytes::Bytes, +} + +/// The maximum length a symlink target can have. +/// Linux allows 4095 bytes here. +pub const MAX_TARGET_LEN: usize = 4095; + +impl AsRef<[u8]> for SymlinkTarget { + fn as_ref(&self) -> &[u8] { + self.inner.as_ref() + } +} + +impl From<SymlinkTarget> for bytes::Bytes { + fn from(value: SymlinkTarget) -> Self { + value.inner + } +} + +fn validate_symlink_target<B: AsRef<[u8]>>(symlink_target: B) -> Result<B, SymlinkTargetError> { + let v = symlink_target.as_ref(); + + if v.is_empty() { + return Err(SymlinkTargetError::Empty); + } + if v.len() > MAX_TARGET_LEN { + return Err(SymlinkTargetError::TooLong); + } + if v.contains(&0x00) { + return Err(SymlinkTargetError::Null); + } + + Ok(symlink_target) +} + +impl TryFrom<bytes::Bytes> for SymlinkTarget { + type Error = SymlinkTargetError; + + fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> { + if let Err(e) = validate_symlink_target(&value) { + return Err(SymlinkTargetError::Convert(value, Box::new(e))); + } + + Ok(Self { inner: value }) + } +} + +impl TryFrom<&'static [u8]> for SymlinkTarget { + type Error = SymlinkTargetError; + + fn try_from(value: &'static [u8]) -> Result<Self, Self::Error> { + if let Err(e) = validate_symlink_target(&value) { + return Err(SymlinkTargetError::Convert(value.into(), Box::new(e))); + } + + Ok(Self { + inner: bytes::Bytes::from_static(value), + }) + } +} + +impl TryFrom<&str> for SymlinkTarget { + type Error = SymlinkTargetError; + + fn try_from(value: &str) -> Result<Self, Self::Error> { + if let Err(e) = validate_symlink_target(value) { + return Err(SymlinkTargetError::Convert( + value.to_owned().into(), + Box::new(e), + )); + } + + Ok(Self { + inner: bytes::Bytes::copy_from_slice(value.as_bytes()), + }) + } +} + +impl Debug for SymlinkTarget { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Debug::fmt(self.inner.as_bstr(), f) + } +} + +impl Display for SymlinkTarget { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Display::fmt(self.inner.as_bstr(), f) + } +} + +/// Errors created when constructing / converting to [SymlinkTarget]. 
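Like path components, symlink targets are validated wrappers: empty targets, targets containing null bytes, and targets longer than MAX_TARGET_LEN (4095 bytes) are rejected at conversion time. A minimal sketch of constructing a symlink node from untrusted input:

use tvix_castore::{Node, SymlinkTarget};

fn symlink_node(target: &str) -> Option<Node> {
    // `try_from` fails for "", for targets with null bytes, and for overlong targets.
    let target = SymlinkTarget::try_from(target).ok()?;
    Some(Node::Symlink { target })
}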
+#[derive(Debug, PartialEq, Eq, thiserror::Error)] +#[cfg_attr(test, derive(Clone))] +pub enum SymlinkTargetError { + #[error("cannot be empty")] + Empty, + #[error("cannot contain null bytes")] + Null, + #[error("cannot be over {} bytes long", MAX_TARGET_LEN)] + TooLong, + #[error("unable to convert '{:?}", .0.as_bstr())] + Convert(bytes::Bytes, Box<Self>), +} + +#[cfg(test)] +mod tests { + use bytes::Bytes; + use rstest::rstest; + + use super::validate_symlink_target; + use super::{SymlinkTarget, SymlinkTargetError}; + + #[rstest] + #[case::empty(b"", SymlinkTargetError::Empty)] + #[case::null(b"foo\0", SymlinkTargetError::Null)] + fn errors(#[case] v: &'static [u8], #[case] err: SymlinkTargetError) { + { + assert_eq!( + Err(err.clone()), + validate_symlink_target(v), + "validate_symlink_target must fail as expected" + ); + } + + let exp_err_v = Bytes::from_static(v); + + // Bytes + { + let v = Bytes::from_static(v); + assert_eq!( + Err(SymlinkTargetError::Convert( + exp_err_v.clone(), + Box::new(err.clone()) + )), + SymlinkTarget::try_from(v), + "conversion must fail as expected" + ); + } + // &[u8] + { + assert_eq!( + Err(SymlinkTargetError::Convert( + exp_err_v.clone(), + Box::new(err.clone()) + )), + SymlinkTarget::try_from(v), + "conversion must fail as expected" + ); + } + // &str, if this is valid UTF-8 + { + if let Ok(v) = std::str::from_utf8(v) { + assert_eq!( + Err(SymlinkTargetError::Convert( + exp_err_v.clone(), + Box::new(err.clone()) + )), + SymlinkTarget::try_from(v), + "conversion must fail as expected" + ); + } + } + } + + #[test] + fn error_toolong() { + assert_eq!( + Err(SymlinkTargetError::TooLong), + validate_symlink_target("X".repeat(5000).into_bytes().as_slice()) + ) + } + + #[rstest] + #[case::boring(b"aa")] + #[case::dot(b".")] + #[case::dotsandslashes(b"./..")] + #[case::dotdot(b"..")] + #[case::slashes(b"a/b")] + #[case::slashes_and_absolute(b"/a/b")] + #[case::invalid_utf8(b"\xc5\xc4\xd6")] + fn success(#[case] v: &'static [u8]) { + let exp = SymlinkTarget { inner: v.into() }; + + // Bytes + { + let v: Bytes = v.into(); + assert_eq!( + Ok(exp.clone()), + SymlinkTarget::try_from(v), + "conversion must succeed" + ) + } + + // &[u8] + { + assert_eq!( + Ok(exp.clone()), + SymlinkTarget::try_from(v), + "conversion must succeed" + ) + } + + // &str, if this is valid UTF-8 + { + if let Ok(v) = std::str::from_utf8(v) { + assert_eq!( + Ok(exp.clone()), + SymlinkTarget::try_from(v), + "conversion must succeed" + ) + } + } + } +} diff --git a/tvix/castore/src/path/component.rs b/tvix/castore/src/path/component.rs new file mode 100644 index 000000000000..78aca03c50fe --- /dev/null +++ b/tvix/castore/src/path/component.rs @@ -0,0 +1,268 @@ +use bstr::ByteSlice; +use std::fmt::{self, Debug, Display}; + +/// A wrapper type for validated path components in the castore model. +/// Internally uses a [bytes::Bytes], but disallows +/// slashes, and null bytes to be present, as well as +/// '.', '..' and the empty string. +/// It also rejects components that are too long (> 255 bytes). +#[repr(transparent)] +#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct PathComponent { + pub(super) inner: bytes::Bytes, +} + +/// The maximum length an individual path component can have. +/// Linux allows 255 bytes of actual name, so we pick that. 
+pub const MAX_NAME_LEN: usize = 255; + +impl AsRef<[u8]> for PathComponent { + fn as_ref(&self) -> &[u8] { + self.inner.as_ref() + } +} + +impl From<PathComponent> for bytes::Bytes { + fn from(value: PathComponent) -> Self { + value.inner + } +} + +pub(super) fn validate_name<B: AsRef<[u8]>>(name: B) -> Result<(), PathComponentError> { + match name.as_ref() { + b"" => Err(PathComponentError::Empty), + b".." => Err(PathComponentError::Parent), + b"." => Err(PathComponentError::CurDir), + v if v.len() > MAX_NAME_LEN => Err(PathComponentError::TooLong), + v if v.contains(&0x00) => Err(PathComponentError::Null), + v if v.contains(&b'/') => Err(PathComponentError::Slashes), + _ => Ok(()), + } +} + +impl TryFrom<bytes::Bytes> for PathComponent { + type Error = PathComponentError; + + fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> { + if let Err(e) = validate_name(&value) { + return Err(PathComponentError::Convert(value, Box::new(e))); + } + + Ok(Self { inner: value }) + } +} + +impl TryFrom<&'static [u8]> for PathComponent { + type Error = PathComponentError; + + fn try_from(value: &'static [u8]) -> Result<Self, Self::Error> { + if let Err(e) = validate_name(value) { + return Err(PathComponentError::Convert(value.into(), Box::new(e))); + } + + Ok(Self { + inner: bytes::Bytes::from_static(value), + }) + } +} + +impl TryFrom<&str> for PathComponent { + type Error = PathComponentError; + + fn try_from(value: &str) -> Result<Self, Self::Error> { + if let Err(e) = validate_name(value) { + return Err(PathComponentError::Convert( + value.to_owned().into(), + Box::new(e), + )); + } + + Ok(Self { + inner: bytes::Bytes::copy_from_slice(value.as_bytes()), + }) + } +} + +impl TryFrom<&std::ffi::CStr> for PathComponent { + type Error = PathComponentError; + + fn try_from(value: &std::ffi::CStr) -> Result<Self, Self::Error> { + let value = value.to_bytes(); + if let Err(e) = validate_name(value) { + return Err(PathComponentError::Convert( + value.to_owned().into(), + Box::new(e), + )); + } + + Ok(Self { + inner: bytes::Bytes::copy_from_slice(value), + }) + } +} + +impl Debug for PathComponent { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Debug::fmt(self.inner.as_bstr(), f) + } +} + +impl Display for PathComponent { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Display::fmt(self.inner.as_bstr(), f) + } +} + +/// Errors created when parsing / validating [PathComponent]. 
+#[derive(Debug, PartialEq, thiserror::Error)] +#[cfg_attr(test, derive(Clone))] +pub enum PathComponentError { + #[error("cannot be empty")] + Empty, + #[error("cannot contain null bytes")] + Null, + #[error("cannot be '.'")] + CurDir, + #[error("cannot be '..'")] + Parent, + #[error("cannot contain slashes")] + Slashes, + #[error("cannot be over {} bytes long", MAX_NAME_LEN)] + TooLong, + #[error("unable to convert '{:?}'", .0.as_bstr())] + Convert(bytes::Bytes, #[source] Box<Self>), +} + +#[cfg(test)] +mod tests { + use std::ffi::CString; + + use bytes::Bytes; + use rstest::rstest; + + use super::{validate_name, PathComponent, PathComponentError}; + + #[rstest] + #[case::empty(b"", PathComponentError::Empty)] + #[case::null(b"foo\0", PathComponentError::Null)] + #[case::curdir(b".", PathComponentError::CurDir)] + #[case::parent(b"..", PathComponentError::Parent)] + #[case::slashes1(b"a/b", PathComponentError::Slashes)] + #[case::slashes2(b"/", PathComponentError::Slashes)] + fn errors(#[case] v: &'static [u8], #[case] err: PathComponentError) { + { + assert_eq!( + Err(err.clone()), + validate_name(v), + "validate_name must fail as expected" + ); + } + + let exp_err_v = Bytes::from_static(v); + + // Bytes + { + let v = Bytes::from_static(v); + assert_eq!( + Err(PathComponentError::Convert( + exp_err_v.clone(), + Box::new(err.clone()) + )), + PathComponent::try_from(v), + "conversion must fail as expected" + ); + } + // &[u8] + { + assert_eq!( + Err(PathComponentError::Convert( + exp_err_v.clone(), + Box::new(err.clone()) + )), + PathComponent::try_from(v), + "conversion must fail as expected" + ); + } + // &str, if it is valid UTF-8 + { + if let Ok(v) = std::str::from_utf8(v) { + assert_eq!( + Err(PathComponentError::Convert( + exp_err_v.clone(), + Box::new(err.clone()) + )), + PathComponent::try_from(v), + "conversion must fail as expected" + ); + } + } + // &CStr, if it can be constructed (fails if the payload contains null bytes) + { + if let Ok(v) = CString::new(v) { + let v = v.as_ref(); + assert_eq!( + Err(PathComponentError::Convert( + exp_err_v.clone(), + Box::new(err.clone()) + )), + PathComponent::try_from(v), + "conversion must fail as expected" + ); + } + } + } + + #[test] + fn error_toolong() { + assert_eq!( + Err(PathComponentError::TooLong), + validate_name("X".repeat(500).into_bytes().as_slice()) + ) + } + + #[test] + fn success() { + let exp = PathComponent { inner: "aa".into() }; + + // Bytes + { + let v: Bytes = "aa".into(); + assert_eq!( + Ok(exp.clone()), + PathComponent::try_from(v), + "conversion must succeed" + ); + } + + // &[u8] + { + let v: &[u8] = b"aa"; + assert_eq!( + Ok(exp.clone()), + PathComponent::try_from(v), + "conversion must succeed" + ); + } + + // &str + { + let v: &str = "aa"; + assert_eq!( + Ok(exp.clone()), + PathComponent::try_from(v), + "conversion must succeed" + ); + } + + // &CStr + { + let v = CString::new("aa").expect("CString must construct"); + let v = v.as_c_str(); + assert_eq!( + Ok(exp.clone()), + PathComponent::try_from(v), + "conversion must succeed" + ); + } + } +} diff --git a/tvix/castore/src/path.rs b/tvix/castore/src/path/mod.rs index fcc2bd01fbd6..15f31a570da9 100644 --- a/tvix/castore/src/path.rs +++ b/tvix/castore/src/path/mod.rs @@ -1,5 +1,5 @@ //! Contains data structures to deal with Paths in the tvix-castore model. 
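A short sketch of the conversions defined above, for code that receives names as strings or raw bytes and needs validated PathComponent values; the error type spells out which rule was violated, wrapped in the Convert variant when coming through TryFrom:

use tvix_castore::{PathComponent, PathComponentError};

fn checked_name(raw: &str) -> Result<PathComponent, PathComponentError> {
    PathComponent::try_from(raw)
}

fn demo() {
    assert!(checked_name("foo.drv").is_ok());
    assert!(matches!(
        checked_name(".."),
        Err(PathComponentError::Convert(_, _))
    ));
}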
- +use bstr::ByteSlice; use std::{ borrow::Borrow, fmt::{self, Debug, Display}, @@ -8,9 +8,8 @@ use std::{ str::FromStr, }; -use bstr::ByteSlice; - -use crate::proto::validate_node_name; +mod component; +pub use component::{PathComponent, PathComponentError}; /// Represents a Path in the castore model. /// These are always relative, and platform-independent, which distinguishes @@ -38,7 +37,9 @@ impl Path { if !bytes.is_empty() { // Ensure all components are valid castore node names. for component in bytes.split_str(b"/") { - validate_node_name(component).ok()?; + if component::validate_name(component).is_err() { + return None; + } } } @@ -81,10 +82,26 @@ impl Path { Ok(v) } + /// Provides an iterator over the components of the path, + /// which are invividual [PathComponent]. + /// In case the path is empty, an empty iterator is returned. + pub fn components(&self) -> impl Iterator<Item = PathComponent> + '_ { + let mut iter = self.inner.split_str(&b"/"); + + // We don't want to return an empty element, consume it if it's the only one. + if self.inner.is_empty() { + let _ = iter.next(); + } + + iter.map(|b| PathComponent { + inner: bytes::Bytes::copy_from_slice(b), + }) + } + /// Produces an iterator over the components of the path, which are /// individual byte slices. /// In case the path is empty, an empty iterator is returned. - pub fn components(&self) -> impl Iterator<Item = &[u8]> { + pub fn components_bytes(&self) -> impl Iterator<Item = &[u8]> { let mut iter = self.inner.split_str(&b"/"); // We don't want to return an empty element, consume it if it's the only one. @@ -95,11 +112,16 @@ impl Path { iter } - /// Returns the final component of the Path, if there is one. - pub fn file_name(&self) -> Option<&[u8]> { + /// Returns the final component of the Path, if there is one, in bytes. + pub fn file_name(&self) -> Option<PathComponent> { self.components().last() } + /// Returns the final component of the Path, if there is one, in bytes. + pub fn file_name_bytes(&self) -> Option<&[u8]> { + self.components_bytes().last() + } + pub fn as_bytes(&self) -> &[u8] { &self.inner } @@ -211,7 +233,9 @@ impl PathBuf { /// Adjoins `name` to self. 
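The split between typed and raw accessors above can be exercised as follows; parsing from a string relies on PathBuf's FromStr impl, just as the existing tests do:

use tvix_castore::PathBuf;

fn list_components() {
    let p: PathBuf = "a/b/c".parse().expect("must parse");

    // Typed components, validated as castore names.
    for component in p.components() {
        println!("{}", component);
    }

    // The final component is also available as raw bytes.
    assert_eq!(Some(b"c".as_slice()), p.file_name_bytes());
}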
pub fn try_push(&mut self, name: &[u8]) -> Result<(), std::io::Error> { - validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?; + if component::validate_name(name).is_err() { + return Err(std::io::ErrorKind::InvalidData.into()); + } if !self.inner.is_empty() { self.inner.push(b'/'); @@ -329,7 +353,7 @@ mod test { assert_eq!(s.as_bytes(), p.as_bytes(), "inner bytes mismatch"); assert_eq!( num_components, - p.components().count(), + p.components_bytes().count(), "number of components mismatch" ); } @@ -396,10 +420,10 @@ mod test { #[case("a", vec!["a"])] #[case("a/b", vec!["a", "b"])] #[case("a/b/c", vec!["a","b", "c"])] - pub fn components(#[case] p: PathBuf, #[case] exp_components: Vec<&str>) { + pub fn components_bytes(#[case] p: PathBuf, #[case] exp_components: Vec<&str>) { assert_eq!( exp_components, - p.components() + p.components_bytes() .map(|x| x.to_str().unwrap()) .collect::<Vec<_>>() ); diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs index ce1d2bcd244a..62fdb34a25a0 100644 --- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs @@ -1,7 +1,5 @@ -use crate::directoryservice::DirectoryGraph; -use crate::directoryservice::LeavesToRootValidator; -use crate::proto; -use crate::{directoryservice::DirectoryService, B3Digest}; +use crate::directoryservice::{DirectoryGraph, DirectoryService, LeavesToRootValidator}; +use crate::{proto, B3Digest, DirectoryError}; use futures::stream::BoxStream; use futures::TryStreamExt; use std::ops::Deref; @@ -58,13 +56,16 @@ where Status::not_found(format!("directory {} not found", digest)) })?; - Box::pin(once(Ok(directory))) + Box::pin(once(Ok(directory.into()))) } else { // If recursive was requested, traverse via get_recursive. Box::pin( - self.directory_service.get_recursive(&digest).map_err(|e| { - tonic::Status::new(tonic::Code::Internal, e.to_string()) - }), + self.directory_service + .get_recursive(&digest) + .map_ok(proto::Directory::from) + .map_err(|e| { + tonic::Status::new(tonic::Code::Internal, e.to_string()) + }), ) } })) @@ -83,7 +84,9 @@ where let mut validator = DirectoryGraph::<LeavesToRootValidator>::default(); while let Some(directory) = req_inner.message().await? { validator - .add(directory) + .add(directory.try_into().map_err(|e: DirectoryError| { + tonic::Status::new(tonic::Code::Internal, e.to_string()) + })?) .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?; } diff --git a/tvix/castore/src/proto/mod.rs b/tvix/castore/src/proto/mod.rs index 5374e3ae5a80..89c68a4ad97b 100644 --- a/tvix/castore/src/proto/mod.rs +++ b/tvix/castore/src/proto/mod.rs @@ -1,18 +1,14 @@ -#![allow(non_snake_case)] -// https://github.com/hyperium/tonic/issues/1056 -use bstr::ByteSlice; -use std::{collections::HashSet, iter::Peekable, str}; - use prost::Message; +use std::cmp::Ordering; + mod grpc_blobservice_wrapper; mod grpc_directoryservice_wrapper; +use crate::{path::PathComponent, B3Digest, DirectoryError}; pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; -use crate::{B3Digest, B3_LEN}; - tonic::include_proto!("tvix.castore.v1"); #[cfg(feature = "tonic-reflection")] @@ -24,38 +20,6 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix #[cfg(test)] mod tests; -/// Errors that can occur during the validation of [Directory] messages. 
-#[derive(Debug, PartialEq, Eq, thiserror::Error)] -pub enum ValidateDirectoryError { - /// Elements are not in sorted order - #[error("{:?} is not sorted", .0.as_bstr())] - WrongSorting(Vec<u8>), - /// Multiple elements with the same name encountered - #[error("{:?} is a duplicate name", .0.as_bstr())] - DuplicateName(Vec<u8>), - /// Invalid node - #[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())] - InvalidNode(Vec<u8>, ValidateNodeError), - #[error("Total size exceeds u32::MAX")] - SizeOverflow, -} - -/// Errors that occur during Node validation -#[derive(Debug, PartialEq, Eq, thiserror::Error)] -pub enum ValidateNodeError { - #[error("No node set")] - NoNodeSet, - /// Invalid digest length encountered - #[error("Invalid Digest length: {0}")] - InvalidDigestLen(usize), - /// Invalid name encountered - #[error("Invalid name: {}", .0.as_bstr())] - InvalidName(Vec<u8>), - /// Invalid symlink target - #[error("Invalid symlink target: {}", .0.as_bstr())] - InvalidSymlinkTarget(Vec<u8>), -} - /// Errors that occur during StatBlobResponse validation #[derive(Debug, PartialEq, Eq, thiserror::Error)] pub enum ValidateStatBlobResponseError { @@ -64,184 +28,6 @@ pub enum ValidateStatBlobResponseError { InvalidDigestLen(usize, usize), } -/// Checks a Node name for validity as an intermediate node. -/// We disallow slashes, null bytes, '.', '..' and the empty string. -pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> { - if name.is_empty() - || name == b".." - || name == b"." - || name.contains(&0x00) - || name.contains(&b'/') - { - Err(ValidateNodeError::InvalidName(name.to_owned())) - } else { - Ok(()) - } -} - -/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode] -/// and [node::Node], so we can ask all of them for the name easily. -pub trait NamedNode { - fn get_name(&self) -> &[u8]; -} - -impl NamedNode for &FileNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for &DirectoryNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for &SymlinkNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for node::Node { - fn get_name(&self) -> &[u8] { - match self { - node::Node::File(node_file) => &node_file.name, - node::Node::Directory(node_directory) => &node_directory.name, - node::Node::Symlink(node_symlink) => &node_symlink.name, - } - } -} - -impl Node { - /// Ensures the node has a valid enum kind (is Some), and passes its - // per-enum validation. - pub fn validate(&self) -> Result<(), ValidateNodeError> { - if let Some(node) = self.node.as_ref() { - node.validate() - } else { - Err(ValidateNodeError::NoNodeSet) - } - } -} - -impl node::Node { - /// Returns the node with a new name. - pub fn rename(self, name: bytes::Bytes) -> Self { - match self { - node::Node::Directory(n) => node::Node::Directory(DirectoryNode { name, ..n }), - node::Node::File(n) => node::Node::File(FileNode { name, ..n }), - node::Node::Symlink(n) => node::Node::Symlink(SymlinkNode { name, ..n }), - } - } - - /// Ensures the node has a valid name, and checks the type-specific fields too. - pub fn validate(&self) -> Result<(), ValidateNodeError> { - match self { - // for a directory root node, ensure the digest has the appropriate size. 
- node::Node::Directory(directory_node) => { - if directory_node.digest.len() != B3_LEN { - Err(ValidateNodeError::InvalidDigestLen( - directory_node.digest.len(), - ))?; - } - validate_node_name(&directory_node.name) - } - // for a file root node, ensure the digest has the appropriate size. - node::Node::File(file_node) => { - if file_node.digest.len() != B3_LEN { - Err(ValidateNodeError::InvalidDigestLen(file_node.digest.len()))?; - } - validate_node_name(&file_node.name) - } - // ensure the symlink target is not empty and doesn't contain null bytes. - node::Node::Symlink(symlink_node) => { - if symlink_node.target.is_empty() || symlink_node.target.contains(&b'\0') { - Err(ValidateNodeError::InvalidSymlinkTarget( - symlink_node.target.to_vec(), - ))?; - } - validate_node_name(&symlink_node.name) - } - } - } -} - -impl PartialOrd for node::Node { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for node::Node { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for FileNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for FileNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for SymlinkNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for SymlinkNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for DirectoryNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for DirectoryNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -/// Accepts a name, and a mutable reference to the previous name. -/// If the passed name is larger than the previous one, the reference is updated. -/// If it's not, an error is returned. -fn update_if_lt_prev<'n>( - prev_name: &mut &'n [u8], - name: &'n [u8], -) -> Result<(), ValidateDirectoryError> { - if *name < **prev_name { - return Err(ValidateDirectoryError::WrongSorting(name.to_vec())); - } - *prev_name = name; - Ok(()) -} - -/// Inserts the given name into a HashSet if it's not already in there. -/// If it is, an error is returned. 
-fn insert_once<'n>( - seen_names: &mut HashSet<&'n [u8]>, - name: &'n [u8], -) -> Result<(), ValidateDirectoryError> { - if seen_names.get(name).is_some() { - return Err(ValidateDirectoryError::DuplicateName(name.to_vec())); - } - seen_names.insert(name); - Ok(()) -} - fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> { iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i)) } @@ -278,117 +64,233 @@ impl Directory { .as_bytes() .into() } +} - /// validate checks the directory for invalid data, such as: - /// - violations of name restrictions - /// - invalid digest lengths - /// - not properly sorted lists - /// - duplicate names in the three lists - pub fn validate(&self) -> Result<(), ValidateDirectoryError> { - let mut seen_names: HashSet<&[u8]> = HashSet::new(); - - let mut last_directory_name: &[u8] = b""; - let mut last_file_name: &[u8] = b""; - let mut last_symlink_name: &[u8] = b""; - - // check directories - for directory_node in &self.directories { - node::Node::Directory(directory_node.clone()) - .validate() - .map_err(|e| { - ValidateDirectoryError::InvalidNode(directory_node.name.to_vec(), e) - })?; - - update_if_lt_prev(&mut last_directory_name, &directory_node.name)?; - insert_once(&mut seen_names, &directory_node.name)?; - } +impl TryFrom<Directory> for crate::Directory { + type Error = DirectoryError; + + fn try_from(value: Directory) -> Result<Self, Self::Error> { + // Check directories, files and symlinks are sorted + // We'll notice duplicates across all three fields when constructing the Directory. + // FUTUREWORK: use is_sorted() once stable, and/or implement the producer for + // [crate::Directory::try_from_iter] iterating over all three and doing all checks inline. + value + .directories + .iter() + .try_fold(&b""[..], |prev_name, e| { + match e.name.as_ref().cmp(prev_name) { + Ordering::Less => Err(DirectoryError::WrongSorting(e.name.to_owned())), + Ordering::Equal => Err(DirectoryError::DuplicateName( + e.name + .to_owned() + .try_into() + .map_err(DirectoryError::InvalidName)?, + )), + Ordering::Greater => Ok(e.name.as_ref()), + } + })?; + value.files.iter().try_fold(&b""[..], |prev_name, e| { + match e.name.as_ref().cmp(prev_name) { + Ordering::Less => Err(DirectoryError::WrongSorting(e.name.to_owned())), + Ordering::Equal => Err(DirectoryError::DuplicateName( + e.name + .to_owned() + .try_into() + .map_err(DirectoryError::InvalidName)?, + )), + Ordering::Greater => Ok(e.name.as_ref()), + } + })?; + value.symlinks.iter().try_fold(&b""[..], |prev_name, e| { + match e.name.as_ref().cmp(prev_name) { + Ordering::Less => Err(DirectoryError::WrongSorting(e.name.to_owned())), + Ordering::Equal => Err(DirectoryError::DuplicateName( + e.name + .to_owned() + .try_into() + .map_err(DirectoryError::InvalidName)?, + )), + Ordering::Greater => Ok(e.name.as_ref()), + } + })?; - // check files - for file_node in &self.files { - node::Node::File(file_node.clone()) - .validate() - .map_err(|e| ValidateDirectoryError::InvalidNode(file_node.name.to_vec(), e))?; + // FUTUREWORK: use is_sorted() once stable, and/or implement the producer for + // [crate::Directory::try_from_iter] iterating over all three and doing all checks inline. 
+ let mut elems: Vec<(PathComponent, crate::Node)> = + Vec::with_capacity(value.directories.len() + value.files.len() + value.symlinks.len()); - update_if_lt_prev(&mut last_file_name, &file_node.name)?; - insert_once(&mut seen_names, &file_node.name)?; + for e in value.directories { + elems.push( + Node { + node: Some(node::Node::Directory(e)), + } + .try_into_name_and_node()?, + ); } - // check symlinks - for symlink_node in &self.symlinks { - node::Node::Symlink(symlink_node.clone()) - .validate() - .map_err(|e| ValidateDirectoryError::InvalidNode(symlink_node.name.to_vec(), e))?; + for e in value.files { + elems.push( + Node { + node: Some(node::Node::File(e)), + } + .try_into_name_and_node()?, + ) + } - update_if_lt_prev(&mut last_symlink_name, &symlink_node.name)?; - insert_once(&mut seen_names, &symlink_node.name)?; + for e in value.symlinks { + elems.push( + Node { + node: Some(node::Node::Symlink(e)), + } + .try_into_name_and_node()?, + ) } - self.size_checked() - .ok_or(ValidateDirectoryError::SizeOverflow)?; + crate::Directory::try_from_iter(elems) + } +} - Ok(()) +impl From<crate::Directory> for Directory { + fn from(value: crate::Directory) -> Self { + let mut directories = vec![]; + let mut files = vec![]; + let mut symlinks = vec![]; + + for (name, node) in value.into_nodes() { + match node { + crate::Node::File { + digest, + size, + executable, + } => files.push(FileNode { + name: name.into(), + digest: digest.into(), + size, + executable, + }), + crate::Node::Directory { digest, size } => directories.push(DirectoryNode { + name: name.into(), + digest: digest.into(), + size, + }), + crate::Node::Symlink { target } => { + symlinks.push(SymlinkNode { + name: name.into(), + target: target.into(), + }); + } + } + } + + Directory { + directories, + files, + symlinks, + } } +} - /// Allows iterating over all three nodes ([DirectoryNode], [FileNode], - /// [SymlinkNode]) in an ordered fashion, as long as the individual lists - /// are sorted (which can be checked by the [Directory::validate]). - pub fn nodes(&self) -> DirectoryNodesIterator { - return DirectoryNodesIterator { - i_directories: self.directories.iter().peekable(), - i_files: self.files.iter().peekable(), - i_symlinks: self.symlinks.iter().peekable(), - }; +impl Node { + /// Converts a proto [Node] to a [crate::Node], and splits off the name as a [PathComponent]. + pub fn try_into_name_and_node(self) -> Result<(PathComponent, crate::Node), DirectoryError> { + let (name_bytes, node) = self.try_into_unchecked_name_and_checked_node()?; + Ok(( + name_bytes.try_into().map_err(DirectoryError::InvalidName)?, + node, + )) } - /// Adds the specified [node::Node] to the [Directory], preserving sorted entries. - /// This assumes the [Directory] to be sorted prior to adding the node. - /// - /// Inserting an element that already exists with the same name in the directory is not - /// supported. 
- pub fn add(&mut self, node: node::Node) { - debug_assert!( - !self.files.iter().any(|x| x.get_name() == node.get_name()), - "name already exists in files" - ); - debug_assert!( - !self - .directories - .iter() - .any(|x| x.get_name() == node.get_name()), - "name already exists in directories" - ); - debug_assert!( - !self - .symlinks - .iter() - .any(|x| x.get_name() == node.get_name()), - "name already exists in symlinks" - ); - - match node { - node::Node::File(node) => { - let pos = self - .files - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.files.insert(pos, node); + /// Converts a proto [Node] to a [crate::Node], and splits off the name as a + /// [bytes::Bytes] without doing any checking of it. + fn try_into_unchecked_name_and_checked_node( + self, + ) -> Result<(bytes::Bytes, crate::Node), DirectoryError> { + match self.node.ok_or_else(|| DirectoryError::NoNodeSet)? { + node::Node::Directory(n) => { + let digest = B3Digest::try_from(n.digest) + .map_err(|e| DirectoryError::InvalidNode(n.name.clone(), e.into()))?; + + let node = crate::Node::Directory { + digest, + size: n.size, + }; + + Ok((n.name, node)) } - node::Node::Directory(node) => { - let pos = self - .directories - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.directories.insert(pos, node); + node::Node::File(n) => { + let digest = B3Digest::try_from(n.digest) + .map_err(|e| DirectoryError::InvalidNode(n.name.clone(), e.into()))?; + + let node = crate::Node::File { + digest, + size: n.size, + executable: n.executable, + }; + + Ok((n.name, node)) } - node::Node::Symlink(node) => { - let pos = self - .symlinks - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.symlinks.insert(pos, node); + + node::Node::Symlink(n) => { + let node = crate::Node::Symlink { + target: n.target.try_into().map_err(|e| { + DirectoryError::InvalidNode( + n.name.clone(), + crate::ValidateNodeError::InvalidSymlinkTarget(e), + ) + })?, + }; + + Ok((n.name, node)) } } } + + /// Converts a proto [Node] to a [crate::Node], and splits off the name and returns it as a + /// [bytes::Bytes]. + /// + /// The name must be empty. + pub fn try_into_anonymous_node(self) -> Result<crate::Node, DirectoryError> { + let (name, node) = Self::try_into_unchecked_name_and_checked_node(self)?; + + if !name.is_empty() { + return Err(DirectoryError::NameInAnonymousNode); + } + + Ok(node) + } + + /// Constructs a [Node] from a name and [crate::Node]. + /// The name is a [bytes::Bytes], not a [PathComponent], as we have use an + /// empty name in some places. + pub fn from_name_and_node(name: bytes::Bytes, n: crate::Node) -> Self { + match n { + crate::Node::Directory { digest, size } => Self { + node: Some(node::Node::Directory(DirectoryNode { + name, + digest: digest.into(), + size, + })), + }, + crate::Node::File { + digest, + size, + executable, + } => Self { + node: Some(node::Node::File(FileNode { + name, + digest: digest.into(), + size, + executable, + })), + }, + crate::Node::Symlink { target } => Self { + node: Some(node::Node::Symlink(SymlinkNode { + name, + target: target.into(), + })), + }, + } + } } impl StatBlobResponse { @@ -407,65 +309,3 @@ impl StatBlobResponse { Ok(()) } } - -/// Struct to hold the state of an iterator over all nodes of a Directory. -/// -/// Internally, this keeps peekable Iterators over all three lists of a -/// Directory message. 
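A short sketch of the round-trip these helpers enable, written as if inside the crate; the name and symlink target are illustrative:

use crate::proto;

fn name_and_node_roundtrip_sketch() {
    // Wrap a castore node plus its name into the wire type...
    let p = proto::Node::from_name_and_node(
        "etc".into(),
        crate::Node::Symlink {
            target: "/nix/store/somewhereelse".try_into().expect("valid target"),
        },
    );

    // ...and split it back into a validated (PathComponent, crate::Node) pair.
    let (name, node) = p.try_into_name_and_node().expect("round-trip must succeed");
    assert_eq!(name.as_ref(), b"etc");
    assert!(matches!(node, crate::Node::Symlink { .. }));
}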
-pub struct DirectoryNodesIterator<'a> { - // directory: &Directory, - i_directories: Peekable<std::slice::Iter<'a, DirectoryNode>>, - i_files: Peekable<std::slice::Iter<'a, FileNode>>, - i_symlinks: Peekable<std::slice::Iter<'a, SymlinkNode>>, -} - -/// looks at two elements implementing NamedNode, and returns true if "left -/// is smaller / comes first". -/// -/// Some(_) is preferred over None. -fn left_name_lt_right<A: NamedNode, B: NamedNode>(left: Option<&A>, right: Option<&B>) -> bool { - match left { - // if left is None, right always wins - None => false, - Some(left_inner) => { - // left is Some. - match right { - // left is Some, right is None - left wins. - None => true, - Some(right_inner) => { - // both are Some - compare the name. - return left_inner.get_name() < right_inner.get_name(); - } - } - } - } -} - -impl Iterator for DirectoryNodesIterator<'_> { - type Item = node::Node; - - // next returns the next node in the Directory. - // we peek at all three internal iterators, and pick the one with the - // smallest name, to ensure lexicographical ordering. - // The individual lists are already known to be sorted. - fn next(&mut self) -> Option<Self::Item> { - if left_name_lt_right(self.i_directories.peek(), self.i_files.peek()) { - // i_directories is still in the game, compare with symlinks - if left_name_lt_right(self.i_directories.peek(), self.i_symlinks.peek()) { - self.i_directories - .next() - .cloned() - .map(node::Node::Directory) - } else { - self.i_symlinks.next().cloned().map(node::Node::Symlink) - } - } else { - // i_files is still in the game, compare with symlinks - if left_name_lt_right(self.i_files.peek(), self.i_symlinks.peek()) { - self.i_files.next().cloned().map(node::Node::File) - } else { - self.i_symlinks.next().cloned().map(node::Node::Symlink) - } - } - } -} diff --git a/tvix/castore/src/proto/tests/directory.rs b/tvix/castore/src/proto/tests/directory.rs index 81b73a048d52..efbc4e9f2af1 100644 --- a/tvix/castore/src/proto/tests/directory.rs +++ b/tvix/castore/src/proto/tests/directory.rs @@ -1,7 +1,5 @@ -use crate::proto::{ - node, Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError, - ValidateNodeError, -}; +use crate::proto::{Directory, DirectoryError, DirectoryNode, FileNode, SymlinkNode}; +use crate::ValidateNodeError; use hex_literal::hex; @@ -149,7 +147,7 @@ fn digest() { #[test] fn validate_empty() { let d = Directory::default(); - assert_eq!(d.validate(), Ok(())); + assert!(crate::Directory::try_from(d).is_ok()); } #[test] @@ -157,18 +155,15 @@ fn validate_invalid_names() { { let d = Directory { directories: vec![DirectoryNode { - name: "".into(), + name: b"\0"[..].into(), digest: DUMMY_DIGEST.to_vec().into(), size: 42, }], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { - assert_eq!(n, b"") - } - _ => panic!("unexpected error"), - }; + + let e = crate::Directory::try_from(d).expect_err("must fail"); + assert!(matches!(e, DirectoryError::InvalidName(_))); } { @@ -180,12 +175,8 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { - assert_eq!(n, b".") - } - _ => panic!("unexpected error"), - }; + let e = crate::Directory::try_from(d).expect_err("must fail"); + assert!(matches!(e, DirectoryError::InvalidName(_))); } { @@ -198,12 +189,8 @@ fn validate_invalid_names() { }], 
..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { - assert_eq!(n, b"..") - } - _ => panic!("unexpected error"), - }; + let e = crate::Directory::try_from(d).expect_err("must fail"); + assert!(matches!(e, DirectoryError::InvalidName(_))); } { @@ -214,12 +201,8 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { - assert_eq!(n, b"\x00") - } - _ => panic!("unexpected error"), - }; + let e = crate::Directory::try_from(d).expect_err("must fail"); + assert!(matches!(e, DirectoryError::InvalidName(_))); } { @@ -230,12 +213,20 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { - assert_eq!(n, b"foo/bar") - } - _ => panic!("unexpected error"), + let e = crate::Directory::try_from(d).expect_err("must fail"); + assert!(matches!(e, DirectoryError::InvalidName(_))); + } + + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: bytes::Bytes::copy_from_slice("X".repeat(500).into_bytes().as_slice()), + target: "foo".into(), + }], + ..Default::default() }; + let e = crate::Directory::try_from(d).expect_err("must fail"); + assert!(matches!(e, DirectoryError::InvalidName(_))); } } @@ -249,8 +240,8 @@ fn validate_invalid_digest() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => { + match crate::Directory::try_from(d).expect_err("must fail") { + DirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => { assert_eq!(n, 2) } _ => panic!("unexpected error"), @@ -276,15 +267,15 @@ fn validate_sorting() { ], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::WrongSorting(s) => { - assert_eq!(s, b"a"); + match crate::Directory::try_from(d).expect_err("must fail") { + DirectoryError::WrongSorting(s) => { + assert_eq!(s.as_ref(), b"a"); } _ => panic!("unexpected error"), } } - // "a" exists twice, bad. + // "a" exists twice (same types), bad. { let d = Directory { directories: vec![ @@ -301,9 +292,31 @@ fn validate_sorting() { ], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::DuplicateName(s) => { - assert_eq!(s, b"a"); + match crate::Directory::try_from(d).expect_err("must fail") { + DirectoryError::DuplicateName(s) => { + assert_eq!(s.as_ref(), b"a"); + } + _ => panic!("unexpected error"), + } + } + + // "a" exists twice (different types), bad. + { + let d = Directory { + directories: vec![DirectoryNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }], + symlinks: vec![SymlinkNode { + name: "a".into(), + target: "b".into(), + }], + ..Default::default() + }; + match crate::Directory::try_from(d).expect_err("must fail") { + DirectoryError::DuplicateName(s) => { + assert_eq!(s.as_ref(), b"a"); } _ => panic!("unexpected error"), } @@ -327,7 +340,7 @@ fn validate_sorting() { ..Default::default() }; - d.validate().expect("validate shouldn't error"); + crate::Directory::try_from(d).expect("validate shouldn't error"); } // [b, c] and [a] are both properly sorted. 
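The conversion exercised above can also be sketched end to end; the 32-byte zero digest and the names are placeholders:

use crate::proto::{Directory, DirectoryNode, FileNode};
use crate::DirectoryError;

fn directory_conversion_sketch() {
    // Sorted lists with valid names and 32-byte digests convert into the
    // stronger crate::Directory type.
    let ok = Directory {
        directories: vec![DirectoryNode {
            name: "src".into(),
            digest: vec![0u8; 32].into(),
            size: 1,
        }],
        ..Default::default()
    };
    assert!(crate::Directory::try_from(ok).is_ok());

    // The same name appearing in two different lists is rejected while the
    // strong type is rebuilt.
    let dup = Directory {
        directories: vec![DirectoryNode {
            name: "a".into(),
            digest: vec![0u8; 32].into(),
            size: 1,
        }],
        files: vec![FileNode {
            name: "a".into(),
            digest: vec![0u8; 32].into(),
            size: 1,
            executable: false,
        }],
        ..Default::default()
    };
    assert!(matches!(
        crate::Directory::try_from(dup).expect_err("must fail"),
        DirectoryError::DuplicateName(_)
    ));
}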
@@ -352,101 +365,6 @@ fn validate_sorting() { ..Default::default() }; - d.validate().expect("validate shouldn't error"); + crate::Directory::try_from(d).expect("validate shouldn't error"); } } - -#[test] -fn validate_overflow() { - let d = Directory { - directories: vec![DirectoryNode { - name: "foo".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: u64::MAX, - }], - ..Default::default() - }; - - match d.validate().expect_err("must fail") { - ValidateDirectoryError::SizeOverflow => {} - _ => panic!("unexpected error"), - } -} - -#[test] -fn add_nodes_to_directory() { - let mut d = Directory { - ..Default::default() - }; - - d.add(node::Node::Directory(DirectoryNode { - name: "b".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::Directory(DirectoryNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::Directory(DirectoryNode { - name: "z".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - - d.add(node::Node::File(FileNode { - name: "f".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - d.add(node::Node::File(FileNode { - name: "c".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - d.add(node::Node::File(FileNode { - name: "g".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - - d.add(node::Node::Symlink(SymlinkNode { - name: "t".into(), - target: "a".into(), - })); - d.add(node::Node::Symlink(SymlinkNode { - name: "o".into(), - target: "a".into(), - })); - d.add(node::Node::Symlink(SymlinkNode { - name: "e".into(), - target: "a".into(), - })); - - d.validate().expect("directory should be valid"); -} - -#[test] -#[cfg_attr(not(debug_assertions), ignore)] -#[should_panic = "name already exists in directories"] -fn add_duplicate_node_to_directory_panics() { - let mut d = Directory { - ..Default::default() - }; - - d.add(node::Node::Directory(DirectoryNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::File(FileNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); -} diff --git a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs deleted file mode 100644 index 68f147a33210..000000000000 --- a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::proto::Directory; -use crate::proto::DirectoryNode; -use crate::proto::FileNode; -use crate::proto::NamedNode; -use crate::proto::SymlinkNode; - -#[test] -fn iterator() { - let d = Directory { - directories: vec![ - DirectoryNode { - name: "c".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "d".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "h".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "l".into(), - ..DirectoryNode::default() - }, - ], - files: vec![ - FileNode { - name: "b".into(), - ..FileNode::default() - }, - FileNode { - name: "e".into(), - ..FileNode::default() - }, - FileNode { - name: "g".into(), - ..FileNode::default() - }, - FileNode { - name: "j".into(), - ..FileNode::default() - }, - ], - symlinks: vec![ - SymlinkNode { - name: "a".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "f".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "i".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "k".into(), - 
..SymlinkNode::default() - }, - ], - }; - - // We keep this strings here and convert to string to make the comparison - // less messy. - let mut node_names: Vec<String> = vec![]; - - for node in d.nodes() { - node_names.push(String::from_utf8(node.get_name().to_vec()).unwrap()); - } - - assert_eq!( - vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"], - node_names - ); -} diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs index 8d903bacb6c5..9f6330914bff 100644 --- a/tvix/castore/src/proto/tests/mod.rs +++ b/tvix/castore/src/proto/tests/mod.rs @@ -1,2 +1,47 @@ +use super::{node, Node, SymlinkNode}; +use crate::DirectoryError; + mod directory; -mod directory_nodes_iterator; + +/// Create a node with an empty symlink target, and ensure it fails validation. +#[test] +fn convert_symlink_empty_target_invalid() { + Node { + node: Some(node::Node::Symlink(SymlinkNode { + name: "foo".into(), + target: "".into(), + })), + } + .try_into_name_and_node() + .expect_err("must fail validation"); +} + +/// Create a node with a symlink target including null bytes, and ensure it +/// fails validation. +#[test] +fn convert_symlink_target_null_byte_invalid() { + Node { + node: Some(node::Node::Symlink(SymlinkNode { + name: "foo".into(), + target: "foo\0".into(), + })), + } + .try_into_name_and_node() + .expect_err("must fail validation"); +} + +/// Create a node with a name, and ensure converting it into an anonymous node fails. +#[test] +fn convert_anonymous_with_name_fail() { + assert_eq!( + DirectoryError::NameInAnonymousNode, + Node { + node: Some(node::Node::Symlink(SymlinkNode { + name: "foo".into(), + target: "somewhereelse".into(), + })), + } + .try_into_anonymous_node() + .expect_err("must fail") + ) +} diff --git a/tvix/castore/src/refscan.rs b/tvix/castore/src/refscan.rs new file mode 100644 index 000000000000..352593020472 --- /dev/null +++ b/tvix/castore/src/refscan.rs @@ -0,0 +1,354 @@ +//! Simple scanner for non-overlapping, known references of Nix store paths in a +//! given string. +//! +//! This is used for determining build references (see +//! //tvix/eval/docs/build-references.md for more details). +//! +//! The scanner itself uses the Wu-Manber string-matching algorithm, via +//! our fork of the `wu-manber` crate. +use pin_project::pin_project; +use std::collections::BTreeSet; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::task::{ready, Poll}; +use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; +use wu_manber::TwoByteWM; + +/// A searcher that encapsulates the candidates and the Wu-Manber searcher. +/// This is separate from the scanner because we need to look for the same +/// pattern in multiple outputs and don't want to pay the price of constructing +/// the searcher for each build output. +pub struct ReferencePatternInner<P> { + candidates: Vec<P>, + longest_candidate: usize, + // FUTUREWORK: Support overlapping patterns to be compatible with cpp Nix + searcher: Option<TwoByteWM>, +} + +#[derive(Clone)] +pub struct ReferencePattern<P> { + inner: Arc<ReferencePatternInner<P>>, +} + +impl<P> ReferencePattern<P> { + pub fn candidates(&self) -> &[P] { + &self.inner.candidates + } + + pub fn longest_candidate(&self) -> usize { + self.inner.longest_candidate + } +} + +impl<P: AsRef<[u8]>> ReferencePattern<P> { + /// Construct a new `ReferencePattern` that knows how to scan for the given + /// candidates.
+ pub fn new(candidates: Vec<P>) -> Self { + let searcher = if candidates.is_empty() { + None + } else { + Some(TwoByteWM::new(&candidates)) + }; + let longest_candidate = candidates.iter().fold(0, |v, c| v.max(c.as_ref().len())); + + ReferencePattern { + inner: Arc::new(ReferencePatternInner { + searcher, + candidates, + longest_candidate, + }), + } + } +} + +impl<P> From<Vec<P>> for ReferencePattern<P> +where + P: AsRef<[u8]>, +{ + fn from(candidates: Vec<P>) -> Self { + Self::new(candidates) + } +} + +/// Represents a "primed" reference scanner with an automaton that knows the set +/// of bytes patterns to scan for. +pub struct ReferenceScanner<P> { + pattern: ReferencePattern<P>, + matches: Vec<AtomicBool>, +} + +impl<P: AsRef<[u8]>> ReferenceScanner<P> { + /// Construct a new `ReferenceScanner` that knows how to scan for the given + /// candidate bytes patterns. + pub fn new<IP: Into<ReferencePattern<P>>>(pattern: IP) -> Self { + let pattern = pattern.into(); + let mut matches = Vec::new(); + for _ in 0..pattern.candidates().len() { + matches.push(AtomicBool::new(false)); + } + ReferenceScanner { pattern, matches } + } + + /// Scan the given buffer for all non-overlapping matches and collect them + /// in the scanner. + pub fn scan<S: AsRef<[u8]>>(&self, haystack: S) { + if haystack.as_ref().len() < self.pattern.longest_candidate() { + return; + } + + if let Some(searcher) = &self.pattern.inner.searcher { + for m in searcher.find(haystack) { + self.matches[m.pat_idx].store(true, Ordering::Release); + } + } + } + + pub fn pattern(&self) -> &ReferencePattern<P> { + &self.pattern + } + + pub fn matches(&self) -> Vec<bool> { + self.matches + .iter() + .map(|m| m.load(Ordering::Acquire)) + .collect() + } + + pub fn candidate_matches(&self) -> impl Iterator<Item = &P> { + let candidates = self.pattern.candidates(); + self.matches.iter().enumerate().filter_map(|(idx, found)| { + if found.load(Ordering::Acquire) { + Some(&candidates[idx]) + } else { + None + } + }) + } +} + +impl<P: Clone + Ord + AsRef<[u8]>> ReferenceScanner<P> { + /// Finalise the reference scanner and return the resulting matches. 
+ pub fn finalise(self) -> BTreeSet<P> { + self.candidate_matches().cloned().collect() + } +} + +const DEFAULT_BUF_SIZE: usize = 8 * 1024; + +#[pin_project] +pub struct ReferenceReader<'a, P, R> { + scanner: &'a ReferenceScanner<P>, + buffer: Vec<u8>, + consumed: usize, + #[pin] + reader: R, +} + +impl<'a, P, R> ReferenceReader<'a, P, R> +where + P: AsRef<[u8]>, +{ + pub fn new(scanner: &'a ReferenceScanner<P>, reader: R) -> Self { + Self::with_capacity(DEFAULT_BUF_SIZE, scanner, reader) + } + + pub fn with_capacity(capacity: usize, scanner: &'a ReferenceScanner<P>, reader: R) -> Self { + // If capacity is not at least as long as longest_candidate we can't do a scan + let capacity = capacity.max(scanner.pattern().longest_candidate()); + ReferenceReader { + scanner, + buffer: Vec::with_capacity(capacity), + consumed: 0, + reader, + } + } +} + +impl<'a, P, R> AsyncRead for ReferenceReader<'a, P, R> +where + R: AsyncRead, + P: AsRef<[u8]>, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<std::io::Result<()>> { + let internal_buf = ready!(self.as_mut().poll_fill_buf(cx))?; + let amt = buf.remaining().min(internal_buf.len()); + buf.put_slice(&internal_buf[..amt]); + self.consume(amt); + Poll::Ready(Ok(())) + } +} + +impl<'a, P, R> AsyncBufRead for ReferenceReader<'a, P, R> +where + R: AsyncRead, + P: AsRef<[u8]>, +{ + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<std::io::Result<&[u8]>> { + #[allow(clippy::manual_saturating_arithmetic)] // for clarity + let overlap = self + .scanner + .pattern + .longest_candidate() + .checked_sub(1) + // If this overflows (longest_candidate = 0), that means there are no needles, + // so there is no need to have any overlap + .unwrap_or(0); + let mut this = self.project(); + // Still data in buffer + if *this.consumed < this.buffer.len() { + return Poll::Ready(Ok(&this.buffer[*this.consumed..])); + } + // We need to copy last `overlap` bytes to front to deal with references that overlap reads + if *this.consumed > overlap { + let start = this.buffer.len() - overlap; + this.buffer.copy_within(start.., 0); + this.buffer.truncate(overlap); + *this.consumed = overlap; + } + // Read at least until self.buffer.len() > overlap so we can do one scan + loop { + let filled = { + let mut buf = ReadBuf::uninit(this.buffer.spare_capacity_mut()); + ready!(this.reader.as_mut().poll_read(cx, &mut buf))?; + buf.filled().len() + }; + // SAFETY: We just read `filled` amount of data above + unsafe { + this.buffer.set_len(filled + this.buffer.len()); + } + if filled == 0 || this.buffer.len() > overlap { + break; + } + } + + #[allow(clippy::needless_borrows_for_generic_args)] // misfiring lint (breaks code below) + this.scanner.scan(&this.buffer); + + Poll::Ready(Ok(&this.buffer[*this.consumed..])) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + debug_assert!(self.consumed + amt <= self.buffer.len()); + let this = self.project(); + *this.consumed += amt; + } +} + +#[cfg(test)] +mod tests { + use rstest::rstest; + use tokio::io::AsyncReadExt as _; + use tokio_test::io::Builder; + + use super::*; + + // The actual derivation of `nixpkgs.hello`. 
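A minimal sketch of the streaming usage the tests below exercise against a real derivation; here the candidate and input are made up, and the refscan items defined above are assumed to be in scope:

use tokio::io::AsyncReadExt as _;

async fn reference_reader_sketch() -> std::io::Result<()> {
    let scanner = ReferenceScanner::new(vec!["33l4p0pn0mybmqzaxfkpppyh7vx1c74p".to_string()]);

    // Wrap any AsyncRead: everything flowing through the reader is scanned,
    // and longest_candidate - 1 bytes of overlap are kept between refills so
    // matches spanning two reads are still detected.
    let input: &[u8] = b"prefix 33l4p0pn0mybmqzaxfkpppyh7vx1c74p suffix";
    let mut reader = ReferenceReader::new(&scanner, input);

    let mut sink = Vec::new();
    reader.read_to_end(&mut sink).await?;

    assert_eq!(vec![true], scanner.matches());
    Ok(())
}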
+ const HELLO_DRV: &str = r#"Derive([("out","/nix/store/33l4p0pn0mybmqzaxfkpppyh7vx1c74p-hello-2.12.1","","")],[("/nix/store/6z1jfnqqgyqr221zgbpm30v91yfj3r45-bash-5.1-p16.drv",["out"]),("/nix/store/ap9g09fxbicj836zm88d56dn3ff4clxl-stdenv-linux.drv",["out"]),("/nix/store/pf80kikyxr63wrw56k00i1kw6ba76qik-hello-2.12.1.tar.gz.drv",["out"])],["/nix/store/9krlzvny65gdc8s7kpb6lkx8cd02c25b-default-builder.sh"],"x86_64-linux","/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16/bin/bash",["-e","/nix/store/9krlzvny65gdc8s7kpb6lkx8cd02c25b-default-builder.sh"],[("buildInputs",""),("builder","/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16/bin/bash"),("cmakeFlags",""),("configureFlags",""),("depsBuildBuild",""),("depsBuildBuildPropagated",""),("depsBuildTarget",""),("depsBuildTargetPropagated",""),("depsHostHost",""),("depsHostHostPropagated",""),("depsTargetTarget",""),("depsTargetTargetPropagated",""),("doCheck","1"),("doInstallCheck",""),("mesonFlags",""),("name","hello-2.12.1"),("nativeBuildInputs",""),("out","/nix/store/33l4p0pn0mybmqzaxfkpppyh7vx1c74p-hello-2.12.1"),("outputs","out"),("patches",""),("pname","hello"),("propagatedBuildInputs",""),("propagatedNativeBuildInputs",""),("src","/nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz"),("stdenv","/nix/store/cp65c8nk29qq5cl1wyy5qyw103cwmax7-stdenv-linux"),("strictDeps",""),("system","x86_64-linux"),("version","2.12.1")])"#; + + #[test] + fn test_no_patterns() { + let scanner: ReferenceScanner<String> = ReferenceScanner::new(vec![]); + + scanner.scan(HELLO_DRV); + + let result = scanner.finalise(); + + assert_eq!(result.len(), 0); + } + + #[test] + fn test_single_match() { + let scanner = ReferenceScanner::new(vec![ + "/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16".to_string(), + ]); + scanner.scan(HELLO_DRV); + + let result = scanner.finalise(); + + assert_eq!(result.len(), 1); + assert!(result.contains("/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16")); + } + + #[test] + fn test_multiple_matches() { + let candidates = vec![ + // these exist in the drv: + "/nix/store/33l4p0pn0mybmqzaxfkpppyh7vx1c74p-hello-2.12.1".to_string(), + "/nix/store/pf80kikyxr63wrw56k00i1kw6ba76qik-hello-2.12.1.tar.gz.drv".to_string(), + "/nix/store/cp65c8nk29qq5cl1wyy5qyw103cwmax7-stdenv-linux".to_string(), + // this doesn't: + "/nix/store/fn7zvafq26f0c8b17brs7s95s10ibfzs-emacs-28.2.drv".to_string(), + ]; + + let scanner = ReferenceScanner::new(candidates.clone()); + scanner.scan(HELLO_DRV); + + let result = scanner.finalise(); + assert_eq!(result.len(), 3); + + for c in candidates[..3].iter() { + assert!(result.contains(c)); + } + } + + #[rstest] + #[case::normal(8096, 8096)] + #[case::small_capacity(8096, 1)] + #[case::small_read(1, 8096)] + #[case::all_small(1, 1)] + #[tokio::test] + async fn test_reference_reader(#[case] chunk_size: usize, #[case] capacity: usize) { + let candidates = vec![ + // these exist in the drv: + "33l4p0pn0mybmqzaxfkpppyh7vx1c74p", + "pf80kikyxr63wrw56k00i1kw6ba76qik", + "cp65c8nk29qq5cl1wyy5qyw103cwmax7", + // this doesn't: + "fn7zvafq26f0c8b17brs7s95s10ibfzs", + ]; + let pattern = ReferencePattern::new(candidates.clone()); + let scanner = ReferenceScanner::new(pattern); + let mut mock = Builder::new(); + for c in HELLO_DRV.as_bytes().chunks(chunk_size) { + mock.read(c); + } + let mock = mock.build(); + let mut reader = ReferenceReader::with_capacity(capacity, &scanner, mock); + let mut s = String::new(); + reader.read_to_string(&mut s).await.unwrap(); + assert_eq!(s, HELLO_DRV); + + let 
result = scanner.finalise(); + assert_eq!(result.len(), 3); + + for c in candidates[..3].iter() { + assert!(result.contains(c)); + } + } + + #[tokio::test] + async fn test_reference_reader_no_patterns() { + let pattern = ReferencePattern::new(Vec::<&str>::new()); + let scanner = ReferenceScanner::new(pattern); + let mut mock = Builder::new(); + mock.read(HELLO_DRV.as_bytes()); + let mock = mock.build(); + let mut reader = ReferenceReader::new(&scanner, mock); + let mut s = String::new(); + reader.read_to_string(&mut s).await.unwrap(); + assert_eq!(s, HELLO_DRV); + + let result = scanner.finalise(); + assert_eq!(result.len(), 0); + } + + // FUTUREWORK: Test with large file +} diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs index 8b3bd5ce0ffc..51d1b68a4ec9 100644 --- a/tvix/castore/src/tests/import.rs +++ b/tvix/castore/src/tests/import.rs @@ -2,15 +2,11 @@ use crate::blobservice::{self, BlobService}; use crate::directoryservice; use crate::fixtures::*; use crate::import::fs::ingest_path; -use crate::proto; +use crate::Node; -use std::sync::Arc; use tempfile::TempDir; #[cfg(target_family = "unix")] -use std::os::unix::ffi::OsStrExt; - -#[cfg(target_family = "unix")] #[tokio::test] async fn symlink() { let blob_service = blobservice::from_addr("memory://").await.unwrap(); @@ -25,48 +21,47 @@ async fn symlink() { ) .unwrap(); - let root_node = ingest_path( - Arc::from(blob_service), + let root_node = ingest_path::<_, _, _, &[u8]>( + blob_service, directory_service, tmpdir.path().join("doesntmatter"), + None, ) .await .expect("must succeed"); assert_eq!( - proto::node::Node::Symlink(proto::SymlinkNode { - name: "doesntmatter".into(), - target: "/nix/store/somewhereelse".into(), - }), + Node::Symlink { + target: "/nix/store/somewhereelse".try_into().unwrap() + }, root_node, ) } #[tokio::test] async fn single_file() { - let blob_service = - Arc::from(blobservice::from_addr("memory://").await.unwrap()) as Arc<dyn BlobService>; + let blob_service = blobservice::from_addr("memory://").await.unwrap(); let directory_service = directoryservice::from_addr("memory://").await.unwrap(); let tmpdir = TempDir::new().unwrap(); std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap(); - let root_node = ingest_path( + let root_node = ingest_path::<_, _, _, &[u8]>( blob_service.clone(), directory_service, tmpdir.path().join("root"), + None, ) .await .expect("must succeed"); assert_eq!( - proto::node::Node::File(proto::FileNode { - name: "root".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + Node::File { + digest: HELLOWORLD_BLOB_DIGEST.clone(), size: HELLOWORLD_BLOB_CONTENTS.len() as u64, executable: false, - }), + }, root_node, ); @@ -77,8 +72,7 @@ async fn single_file() { #[cfg(target_family = "unix")] #[tokio::test] async fn complicated() { - let blob_service = - Arc::from(blobservice::from_addr("memory://").await.unwrap()) as Arc<dyn BlobService>; + let blob_service = blobservice::from_addr("memory://").await.unwrap(); let directory_service = directoryservice::from_addr("memory://").await.unwrap(); let tmpdir = TempDir::new().unwrap(); @@ -92,23 +86,21 @@ async fn complicated() { // File ``keep/.keep` std::fs::write(tmpdir.path().join("keep").join(".keep"), vec![]).unwrap(); - let root_node = ingest_path(blob_service.clone(), &directory_service, tmpdir.path()) - .await - .expect("must succeed"); + let root_node = ingest_path::<_, _, _, &[u8]>( + blob_service.clone(), + &directory_service, + tmpdir.path(), + None, + ) + .await + .expect("must 
succeed"); // ensure root_node matched expectations assert_eq!( - proto::node::Node::Directory(proto::DirectoryNode { - name: tmpdir - .path() - .file_name() - .unwrap() - .as_bytes() - .to_owned() - .into(), - digest: DIRECTORY_COMPLICATED.digest().into(), + Node::Directory { + digest: DIRECTORY_COMPLICATED.digest().clone(), size: DIRECTORY_COMPLICATED.size(), - }), + }, root_node, ); diff --git a/tvix/castore/src/tonic.rs b/tvix/castore/src/tonic.rs index 4b65d6b028ef..e63e1ad7aab8 100644 --- a/tvix/castore/src/tonic.rs +++ b/tvix/castore/src/tonic.rs @@ -1,3 +1,4 @@ +use hyper_util::rt::TokioIo; use tokio::net::UnixStream; use tonic::transport::{Channel, Endpoint}; @@ -25,7 +26,10 @@ pub async fn channel_from_url(url: &url::Url) -> Result<Channel, self::Error> { let connector = tower::service_fn({ let url = url.clone(); - move |_: tonic::transport::Uri| UnixStream::connect(url.path().to_string().clone()) + move |_: tonic::transport::Uri| { + let unix = UnixStream::connect(url.path().to_string().clone()); + async move { Ok::<_, std::io::Error>(TokioIo::new(unix.await?)) } + } }); // the URL doesn't matter |
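The wrapping above follows the hyper 1.x pattern: tonic's connector is expected to hand back a connection speaking hyper's IO traits, so the tokio UnixStream is adapted with TokioIo. A standalone sketch of the same idea, assuming a tonic release built on hyper 1.x; the socket path and dummy URI are illustrative:

use hyper_util::rt::TokioIo;
use tokio::net::UnixStream;
use tonic::transport::{Channel, Endpoint, Uri};
use tower::service_fn;

async fn unix_channel_sketch() -> Result<Channel, tonic::transport::Error> {
    // The endpoint URI is required but never dialed; the connector below
    // decides where the bytes actually go.
    Endpoint::from_static("http://[::]:50051")
        .connect_with_connector(service_fn(|_: Uri| async {
            let stream = UnixStream::connect("/run/example/castore.sock").await?;
            Ok::<_, std::io::Error>(TokioIo::new(stream))
        }))
        .await
}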