diff options
Diffstat (limited to 'tvix/castore')
53 files changed, 2625 insertions, 1903 deletions
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index ded2292db750..aa44e2e8ee4b 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -4,101 +4,70 @@ version = "0.1.0" edition = "2021" [dependencies] -async-compression = { version = "0.4.9", features = ["tokio", "zstd"]} -async-stream = "0.3.5" -async-tempfile = "0.4.0" -blake3 = { version = "1.3.1", features = ["rayon", "std", "traits-preview"] } -bstr = "1.6.0" -bytes = "1.4.0" -data-encoding = "2.6.0" -digest = "0.10.7" -fastcdc = { version = "3.1.0", features = ["tokio"] } -futures = "0.3.30" -lazy_static = "1.4.0" -object_store = { version = "0.10.1", features = ["http"] } -parking_lot = "0.12.1" -pin-project-lite = "0.2.13" -prost = "0.13.1" -sled = { version = "0.34.7" } -thiserror = "1.0.38" -tokio-stream = { version = "0.1.14", features = ["fs", "net"] } -tokio-util = { version = "0.7.9", features = ["io", "io-util", "codec"] } -tokio-tar = "0.3.1" -tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } -tonic = "0.12.0" -tower = "0.4.13" -tracing = "0.1.37" -tracing-indicatif = "0.3.6" +async-compression = { workspace = true, features = ["tokio", "zstd"] } +async-stream = { workspace = true } +async-tempfile = { workspace = true } +blake3 = { workspace = true, features = ["rayon", "std", "traits-preview"] } +bstr = { workspace = true } +bytes = { workspace = true } +data-encoding = { workspace = true } +digest = { workspace = true } +fastcdc = { workspace = true, features = ["tokio"] } +futures = { workspace = true } +object_store = { workspace = true, features = ["http"] } +parking_lot = { workspace = true } +pin-project-lite = { workspace = true } +prost = { workspace = true } +thiserror = { workspace = true } +tokio-stream = { workspace = true, features = ["fs", "net"] } +tokio-util = { workspace = true, features = ["io", "io-util", "codec"] } +tokio-tar = { workspace = true } +tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } +tonic = { workspace = true } +tower = { workspace = true } +tracing = { workspace = true } +tracing-indicatif = { workspace = true } tvix-tracing = { path = "../tracing", features = ["tonic"] } -url = "2.4.0" -walkdir = "2.4.0" -zstd = "0.13.0" -serde = { version = "1.0.197", features = [ "derive" ] } -serde_with = "3.7.0" -serde_qs = "0.12.0" -petgraph = "0.6.4" -erased-serde = "0.4.5" -serde_tagged = "0.3.0" -hyper-util = "0.1.6" -redb = "2.1.1" - -[dependencies.bigtable_rs] -optional = true -version = "0.2.10" - -[dependencies.fuse-backend-rs] -optional = true -version = "0.11.0" - -[dependencies.libc] -optional = true -version = "0.2.144" - -[dependencies.threadpool] -version = "1.8.1" -optional = true - -[dependencies.tonic-reflection] -optional = true -version = "0.12.0" - -[dependencies.vhost] -optional = true -version = "0.6" - -[dependencies.vhost-user-backend] -optional = true -version = "0.8" - -[dependencies.virtio-queue] -optional = true -version = "0.7" - -[dependencies.vm-memory] -optional = true -version = "0.10" - -[dependencies.vmm-sys-util] -optional = true -version = "0.11" - -[dependencies.virtio-bindings] -optional = true -version = "0.2.1" +url = { workspace = true } +walkdir = { workspace = true } +zstd = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_with = { workspace = true } +serde_qs = { workspace = true } +petgraph = { workspace = true } +pin-project = { workspace = true } +erased-serde = { workspace = true } 
+serde_tagged = { workspace = true } +hyper-util = { workspace = true } +redb = { workspace = true, features = ["logging"] } +bigtable_rs = { workspace = true, optional = true } +fuse-backend-rs = { workspace = true, optional = true } +libc = { workspace = true, optional = true } +threadpool = { workspace = true, optional = true } +tonic-reflection = { workspace = true, optional = true } +vhost = { workspace = true, optional = true } +vhost-user-backend = { workspace = true, optional = true } +virtio-queue = { workspace = true, optional = true } +vm-memory = { workspace = true, optional = true } +vmm-sys-util = { workspace = true, optional = true } +virtio-bindings = { workspace = true, optional = true } +wu-manber = { workspace = true } +auto_impl = "1.2.0" [build-dependencies] -prost-build = "0.13.1" -tonic-build = "0.12.0" +prost-build = { workspace = true } +tonic-build = { workspace = true } [dev-dependencies] -async-process = "2.1.0" -rstest = "0.19.0" -tempfile = "3.3.0" -tokio-retry = "0.3.0" -hex-literal = "0.4.1" -rstest_reuse = "0.6.0" -xattr = "1.3.1" -serde_json = "*" +async-process = { workspace = true } +rstest = { workspace = true } +tempfile = { workspace = true } +tokio-retry = { workspace = true } +hex-literal = { workspace = true } +rstest_reuse = { workspace = true } +xattr = { workspace = true } +serde_json = { workspace = true } +tokio-test = { workspace = true } [features] default = ["cloud"] @@ -122,6 +91,11 @@ virtiofs = [ ] fuse = ["fs"] tonic-reflection = ["dep:tonic-reflection"] +# It's already possible for other crates to build a +# fully fledged store composition system based on castore composition. +# However, this feature enables anonymous url syntax which might +# inherently expose arbitrary composition possibilities to the user. +xp-store-composition = [] # Whether to run the integration tests. # Requires the following packages in $PATH: # cbtemulator, google-cloud-bigtable-tool diff --git a/tvix/castore/build.rs b/tvix/castore/build.rs index 98e2ab348528..2250d4ebf0a2 100644 --- a/tvix/castore/build.rs +++ b/tvix/castore/build.rs @@ -18,7 +18,7 @@ fn main() -> Result<()> { .emit_rerun_if_changed(false) .bytes(["."]) .type_attribute(".", "#[derive(Eq, Hash)]") - .compile( + .compile_protos( &[ "tvix/castore/protos/castore.proto", "tvix/castore/protos/rpc_blobstore.proto", @@ -26,7 +26,7 @@ fn main() -> Result<()> { ], // If we are in running `cargo build` manually, using `../..` works fine, // but in case we run inside a nix build, we need to instead point PROTO_ROOT - // to a sparseTree containing that structure. + // to a custom tree containing that structure. 
&[match std::env::var_os("PROTO_ROOT") { Some(proto_root) => proto_root.to_str().unwrap().to_owned(), None => "../..".to_string(), diff --git a/tvix/castore/default.nix b/tvix/castore/default.nix index 9c210884f6e3..5314da9e0333 100644 --- a/tvix/castore/default.nix +++ b/tvix/castore/default.nix @@ -9,7 +9,7 @@ meta.ci.targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru); passthru = (depot.tvix.utils.mkFeaturePowerset { inherit (old) crateName; - features = ([ "cloud" "fuse" "tonic-reflection" ] + features = ([ "cloud" "fuse" "tonic-reflection" "xp-store-composition" ] # virtiofs feature currently fails to build on Darwin ++ lib.optional pkgs.stdenv.isLinux "virtiofs"); override.testPreRun = '' diff --git a/tvix/castore/protos/default.nix b/tvix/castore/protos/default.nix index feef55690fb9..08bb8fcfeef1 100644 --- a/tvix/castore/protos/default.nix +++ b/tvix/castore/protos/default.nix @@ -1,16 +1,10 @@ -{ depot, pkgs, ... }: +{ depot, pkgs, lib, ... }: let - protos = depot.nix.sparseTree { - name = "castore-protos"; - root = depot.path.origSrc; - paths = [ - ./castore.proto - ./rpc_blobstore.proto - ./rpc_directory.proto - ../../../buf.yaml - ../../../buf.gen.yaml - ]; - }; + protos = lib.sourceByRegex depot.path.origSrc [ + "buf.yaml" + "buf.gen.yaml" + "^tvix(/castore(/protos(/.*\.proto)?)?)?$" + ]; in depot.nix.readTree.drvTargets { inherit protos; diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs index 6e8355874bca..0e809b300485 100644 --- a/tvix/castore/src/blobservice/chunked_reader.rs +++ b/tvix/castore/src/blobservice/chunked_reader.rs @@ -253,14 +253,16 @@ where #[cfg(test)] mod test { - use std::{io::SeekFrom, sync::Arc}; + use std::{ + io::SeekFrom, + sync::{Arc, LazyLock}, + }; use crate::{ blobservice::{chunked_reader::ChunkedReader, BlobService, MemoryBlobService}, B3Digest, }; use hex_literal::hex; - use lazy_static::lazy_static; use tokio::io::{AsyncReadExt, AsyncSeekExt}; const CHUNK_1: [u8; 2] = hex!("0001"); @@ -269,21 +271,26 @@ mod test { const CHUNK_4: [u8; 2] = hex!("0708"); const CHUNK_5: [u8; 7] = hex!("090a0b0c0d0e0f"); - lazy_static! 
{ - // `[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]` - pub static ref CHUNK_1_DIGEST: B3Digest = blake3::hash(&CHUNK_1).as_bytes().into(); - pub static ref CHUNK_2_DIGEST: B3Digest = blake3::hash(&CHUNK_2).as_bytes().into(); - pub static ref CHUNK_3_DIGEST: B3Digest = blake3::hash(&CHUNK_3).as_bytes().into(); - pub static ref CHUNK_4_DIGEST: B3Digest = blake3::hash(&CHUNK_4).as_bytes().into(); - pub static ref CHUNK_5_DIGEST: B3Digest = blake3::hash(&CHUNK_5).as_bytes().into(); - pub static ref BLOB_1_LIST: [(B3Digest, u64); 5] = [ + // `[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]` + pub static CHUNK_1_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&CHUNK_1).as_bytes().into()); + pub static CHUNK_2_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&CHUNK_2).as_bytes().into()); + pub static CHUNK_3_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&CHUNK_3).as_bytes().into()); + pub static CHUNK_4_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&CHUNK_4).as_bytes().into()); + pub static CHUNK_5_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&CHUNK_5).as_bytes().into()); + pub static BLOB_1_LIST: LazyLock<[(B3Digest, u64); 5]> = LazyLock::new(|| { + [ (CHUNK_1_DIGEST.clone(), 2), (CHUNK_2_DIGEST.clone(), 4), (CHUNK_3_DIGEST.clone(), 1), (CHUNK_4_DIGEST.clone(), 2), (CHUNK_5_DIGEST.clone(), 7), - ]; - } + ] + }); use super::ChunkedBlob; diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs index 6a964c8a8440..e5f61d6d0c2a 100644 --- a/tvix/castore/src/blobservice/combinator.rs +++ b/tvix/castore/src/blobservice/combinator.rs @@ -16,6 +16,7 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; /// blobservice again, before falling back to the remote one. /// The remote BlobService is never written to. pub struct CombinedBlobService<BL, BR> { + instance_name: String, local: BL, remote: BR, } @@ -27,6 +28,7 @@ where { fn clone(&self) -> Self { Self { + instance_name: self.instance_name.clone(), local: self.local.clone(), remote: self.remote.clone(), } @@ -39,12 +41,12 @@ where BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static, BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static, { - #[instrument(skip(self, digest), fields(blob.digest=%digest))] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> { Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?) } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> { if self.local.as_ref().has(digest).await? { // local store has the blob, so we can assume it also has all chunks. @@ -84,7 +86,7 @@ where } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // direct writes to the local one. 
self.local.as_ref().open_write().await @@ -113,7 +115,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> { let (local, remote) = futures::join!( @@ -121,6 +123,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig { context.resolve(self.remote.clone()) ); Ok(Arc::new(CombinedBlobService { + instance_name: instance_name.to_string(), local: local?, remote: remote?, })) diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs index c5cabaa9d945..803e7fa6a575 100644 --- a/tvix/castore/src/blobservice/from_addr.rs +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -27,7 +27,7 @@ pub async fn from_addr( })? .0; let blob_service = blob_service_config - .build("anonymous", &CompositionContext::blank()) + .build("anonymous", &CompositionContext::blank(®)) .await?; Ok(blob_service) diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 0db3dfea4ad8..3453657d07ab 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -24,6 +24,7 @@ use tracing::{instrument, Instrument as _}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] pub struct GRPCBlobService<T> { + instance_name: String, /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::blob_service_client::BlobServiceClient<T>, @@ -31,8 +32,14 @@ pub struct GRPCBlobService<T> { impl<T> GRPCBlobService<T> { /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. - pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self { - Self { grpc_client } + pub fn from_client( + instance_name: String, + grpc_client: proto::blob_service_client::BlobServiceClient<T>, + ) -> Self { + Self { + instance_name, + grpc_client, + } } } @@ -44,7 +51,7 @@ where <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, T::Future: Send, { - #[instrument(skip(self, digest), fields(blob.digest=%digest))] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { match self .grpc_client @@ -61,7 +68,7 @@ where } } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // First try to get a list of chunks. In case there's only one chunk returned, // buffer its data into a Vec, otherwise use a ChunkedReader. @@ -124,7 +131,7 @@ where /// Returns a BlobWriter, that'll internally wrap each write in a /// [proto::BlobChunk], which is send to the gRPC server. - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // set up an mpsc channel passing around Bytes. 
let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); @@ -154,7 +161,7 @@ where }) } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { let resp = self .grpc_client @@ -205,13 +212,16 @@ impl ServiceBuilder for GRPCBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let client = proto::blob_service_client::BlobServiceClient::new( crate::tonic::channel_from_url(&self.url.parse()?).await?, ); - Ok(Arc::new(GRPCBlobService::from_client(client))) + Ok(Arc::new(GRPCBlobService::from_client( + instance_name.to_string(), + client, + ))) } } @@ -375,7 +385,7 @@ mod tests { .await .expect("must succeed"), ); - GRPCBlobService::from_client(client) + GRPCBlobService::from_client("default".into(), client) }; let has = grpc_client diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs index 3d733f950470..348b8bb56d5b 100644 --- a/tvix/castore/src/blobservice/memory.rs +++ b/tvix/castore/src/blobservice/memory.rs @@ -11,18 +11,19 @@ use crate::{B3Digest, Error}; #[derive(Clone, Default)] pub struct MemoryBlobService { + instance_name: String, db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, } #[async_trait] impl BlobService for MemoryBlobService { - #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + #[instrument(skip_all, ret, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { let db = self.db.read(); Ok(db.contains_key(digest)) } - #[instrument(skip_all, err, fields(blob.digest=%digest))] + #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { let db = self.db.read(); @@ -32,7 +33,7 @@ impl BlobService for MemoryBlobService { } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { Box::new(MemoryBlobWriter::new(self.db.clone())) } @@ -58,10 +59,13 @@ impl ServiceBuilder for MemoryBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { - Ok(Arc::new(MemoryBlobService::default())) + Ok(Arc::new(MemoryBlobService { + instance_name: instance_name.to_string(), + db: Default::default(), + })) } } diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs index 85292722fa7e..efba927b586b 100644 --- a/tvix/castore/src/blobservice/mod.rs +++ b/tvix/castore/src/blobservice/mod.rs @@ -1,5 +1,6 @@ use std::io; +use auto_impl::auto_impl; use tonic::async_trait; use crate::composition::{Registry, ServiceBuilder}; @@ -29,6 +30,7 @@ pub use self::object_store::{ObjectStoreBlobService, ObjectStoreBlobServiceConfi /// which will implement a writer interface, and also provides a close funtion, /// to finalize a blob and get its digest. 
#[async_trait] +#[auto_impl(&, &mut, Arc, Box)] pub trait BlobService: Send + Sync { /// Check if the service has the blob, by its content hash. /// On implementations returning chunks, this must also work for chunks. @@ -60,28 +62,6 @@ pub trait BlobService: Send + Sync { } } -#[async_trait] -impl<A> BlobService for A -where - A: AsRef<dyn BlobService> + Send + Sync, -{ - async fn has(&self, digest: &B3Digest) -> io::Result<bool> { - self.as_ref().has(digest).await - } - - async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { - self.as_ref().open_read(digest).await - } - - async fn open_write(&self) -> Box<dyn BlobWriter> { - self.as_ref().open_write().await - } - - async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { - self.as_ref().chunks(digest).await - } -} - /// A [tokio::io::AsyncWrite] that the user needs to close() afterwards for persist. /// On success, it returns the digest of the written blob. #[async_trait] diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs index 5bb05cf26123..10874af64011 100644 --- a/tvix/castore/src/blobservice/object_store.rs +++ b/tvix/castore/src/blobservice/object_store.rs @@ -64,6 +64,7 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; /// all keys stored so far, but no promises ;-) #[derive(Clone)] pub struct ObjectStoreBlobService { + instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path, @@ -92,7 +93,7 @@ fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path { #[async_trait] impl BlobService for ObjectStoreBlobService { - #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { // TODO: clarify if this should work for chunks or not, and explicitly // document in the proto docs. @@ -112,7 +113,7 @@ impl BlobService for ObjectStoreBlobService { } } - #[instrument(skip_all, err, fields(blob.digest=%digest))] + #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // handle reading the empty blob. if digest.as_slice() == blake3::hash(b"").as_bytes() { @@ -169,7 +170,7 @@ impl BlobService for ObjectStoreBlobService { } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // ObjectStoreBlobWriter implements AsyncWrite, but all the chunking // needs an AsyncRead, so we create a pipe here. 
@@ -192,7 +193,7 @@ impl BlobService for ObjectStoreBlobService { }) } - #[instrument(skip_all, err, fields(blob.digest=%digest))] + #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { match self .object_store @@ -294,7 +295,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let (object_store, path) = object_store::parse_url_opts( @@ -302,6 +303,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig { &self.object_store_options, )?; Ok(Arc::new(ObjectStoreBlobService { + instance_name: instance_name.to_string(), object_store: Arc::new(object_store), base_path: path, avg_chunk_size: self.avg_chunk_size, @@ -582,6 +584,7 @@ mod test { object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap(); let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store); let blobsvc = Arc::new(ObjectStoreBlobService { + instance_name: "test".into(), object_store: object_store.clone(), avg_chunk_size: default_avg_chunk_size(), base_path, diff --git a/tvix/castore/src/blobservice/tests/utils.rs b/tvix/castore/src/blobservice/tests/utils.rs index 7df4f00d3a09..7032d633dad0 100644 --- a/tvix/castore/src/blobservice/tests/utils.rs +++ b/tvix/castore/src/blobservice/tests/utils.rs @@ -29,14 +29,17 @@ pub async fn make_grpc_blob_service_client() -> Box<dyn BlobService> { // Create a client, connecting to the right side. The URI is unused. let mut maybe_right = Some(right); - Box::new(GRPCBlobService::from_client(BlobServiceClient::new( - Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(tower::service_fn(move |_: Uri| { - let right = maybe_right.take().unwrap(); - async move { Ok::<_, std::io::Error>(TokioIo::new(right)) } - })) - .await - .unwrap(), - ))) + Box::new(GRPCBlobService::from_client( + "default".into(), + BlobServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(TokioIo::new(right)) } + })) + .await + .unwrap(), + ), + )) } diff --git a/tvix/castore/src/composition.rs b/tvix/castore/src/composition.rs index c76daafc523d..b251187e1c34 100644 --- a/tvix/castore/src/composition.rs +++ b/tvix/castore/src/composition.rs @@ -70,7 +70,7 @@ //! }); //! //! let blob_services_configs = with_registry(®, || serde_json::from_value(blob_services_configs_json))?; -//! let mut blob_service_composition = Composition::default(); +//! let mut blob_service_composition = Composition::new(®); //! blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs); //! let blob_service: Arc<dyn BlobService> = blob_service_composition.build("default").await?; //! # Ok(()) @@ -88,11 +88,17 @@ //! ``` //! //! Continue with Example 2, with my_registry instead of REG +//! +//! EXPERIMENTAL: If the xp-store-composition feature is enabled, +//! entrypoints can also be URL strings, which are created as +//! anonymous stores. Instantiations of the same URL will +//! result in a new, distinct anonymous store each time, so creating +//! two `memory://` stores with this method will not share the same view. +//! 
This behavior might change in the future. use erased_serde::deserialize; use futures::future::BoxFuture; use futures::FutureExt; -use lazy_static::lazy_static; use serde::de::DeserializeOwned; use serde_tagged::de::{BoxFnSeed, SeedFactory}; use serde_tagged::util::TagString; @@ -101,7 +107,7 @@ use std::cell::Cell; use std::collections::BTreeMap; use std::collections::HashMap; use std::marker::PhantomData; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use tonic::async_trait; /// Resolves tag names to the corresponding Config type. @@ -254,14 +260,13 @@ pub fn with_registry<R>(reg: &'static Registry, f: impl FnOnce() -> R) -> R { result } -lazy_static! { - /// The provided registry of tvix_castore, with all builtin BlobStore/DirectoryStore implementations - pub static ref REG: Registry = { - let mut reg = Default::default(); - add_default_services(&mut reg); - reg - }; -} +/// The provided registry of tvix_castore, with all builtin BlobStore/DirectoryStore implementations +pub static REG: LazyLock<&'static Registry> = LazyLock::new(|| { + let mut reg = Default::default(); + add_default_services(&mut reg); + // explicitly leak to get an &'static, so that we gain `&Registry: Send` from `Registry: Sync` + Box::leak(Box::new(reg)) +}); // ---------- End of generic registry code --------- // @@ -278,12 +283,15 @@ pub struct CompositionContext<'a> { // The TypeId of the trait object is included to distinguish e.g. the // BlobService "default" and the DirectoryService "default". stack: Vec<(TypeId, String)>, + registry: &'static Registry, composition: Option<&'a Composition>, } impl<'a> CompositionContext<'a> { - pub fn blank() -> Self { + /// Get a composition context for one-off store creation. + pub fn blank(registry: &'static Registry) -> Self { Self { + registry, stack: Default::default(), composition: None, } @@ -303,10 +311,104 @@ impl<'a> CompositionContext<'a> { ) .into()); } - match self.composition { - Some(comp) => Ok(comp.build_internal(self.stack.clone(), entrypoint).await?), - None => Err(CompositionError::NotFound(entrypoint).into()), + + Ok(self.build_internal(entrypoint).await?) + } + + #[cfg(feature = "xp-store-composition")] + async fn build_anonymous<T: ?Sized + Send + Sync + 'static>( + &self, + entrypoint: String, + ) -> Result<Arc<T>, Box<dyn std::error::Error + Send + Sync>> { + let url = url::Url::parse(&entrypoint)?; + let config: DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>> = + with_registry(self.registry, || url.try_into())?; + config.0.build("anonymous", self).await + } + + fn build_internal<T: ?Sized + Send + Sync + 'static>( + &self, + entrypoint: String, + ) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> { + #[cfg(feature = "xp-store-composition")] + if entrypoint.contains("://") { + // There is a chance this is a url. we are building an anonymous store + return Box::pin(async move { + self.build_anonymous(entrypoint.clone()) + .await + .map_err(|e| CompositionError::Failed(entrypoint, Arc::from(e))) + }); } + + let mut stores = match self.composition { + Some(comp) => comp.stores.lock().unwrap(), + None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))), + }; + let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) { + Some(v) => v, + None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))), + }; + // for lifetime reasons, we put a placeholder value in the hashmap while we figure out what + // the new value should be. 
the Mutex stays locked the entire time, so nobody will ever see + // this temporary value. + let prev_val = std::mem::replace( + entry, + Box::new(InstantiationState::<T>::Done(Err( + CompositionError::Poisoned(entrypoint.clone()), + ))), + ); + let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() { + InstantiationState::Done(service) => ( + InstantiationState::Done(service.clone()), + futures::future::ready(service).boxed(), + ), + // the construction of the store has not started yet. + InstantiationState::Config(config) => { + let (tx, rx) = tokio::sync::watch::channel(None); + ( + InstantiationState::InProgress(rx), + (async move { + let mut new_context = CompositionContext { + composition: self.composition, + registry: self.registry, + stack: self.stack.clone(), + }; + new_context + .stack + .push((TypeId::of::<T>(), entrypoint.clone())); + let res = + config.build(&entrypoint, &new_context).await.map_err(|e| { + match e.downcast() { + Ok(e) => *e, + Err(e) => CompositionError::Failed(entrypoint, e.into()), + } + }); + tx.send(Some(res.clone())).unwrap(); + res + }) + .boxed(), + ) + } + // there is already a task driving forward the construction of this store, wait for it + // to notify us via the provided channel + InstantiationState::InProgress(mut recv) => { + (InstantiationState::InProgress(recv.clone()), { + (async move { + loop { + if let Some(v) = + recv.borrow_and_update().as_ref().map(|res| res.clone()) + { + break v; + } + recv.changed().await.unwrap(); + } + }) + .boxed() + }) + } + }; + *entry = Box::new(new_val); + ret } } @@ -336,8 +438,8 @@ enum InstantiationState<T: ?Sized> { Done(Result<Arc<T>, CompositionError>), } -#[derive(Default)] pub struct Composition { + registry: &'static Registry, stores: std::sync::Mutex<HashMap<(TypeId, String), Box<dyn Any + Send + Sync>>>, } @@ -381,6 +483,14 @@ impl<T: ?Sized + Send + Sync + 'static> } impl Composition { + /// The given registry will be used for creation of anonymous stores during composition + pub fn new(registry: &'static Registry) -> Self { + Self { + registry, + stores: Default::default(), + } + } + pub fn extend_with_configs<T: ?Sized + Send + Sync + 'static>( &mut self, // Keep the concrete `HashMap` type here since it allows for type @@ -390,87 +500,17 @@ impl Composition { self.extend(configs); } + /// Looks up the entrypoint name in the composition and returns an instantiated service. pub async fn build<T: ?Sized + Send + Sync + 'static>( &self, entrypoint: &str, ) -> Result<Arc<T>, CompositionError> { - self.build_internal(vec![], entrypoint.to_string()).await - } - - fn build_internal<T: ?Sized + Send + Sync + 'static>( - &self, - stack: Vec<(TypeId, String)>, - entrypoint: String, - ) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> { - let mut stores = self.stores.lock().unwrap(); - let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) { - Some(v) => v, - None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))), - }; - // for lifetime reasons, we put a placeholder value in the hashmap while we figure out what - // the new value should be. the Mutex stays locked the entire time, so nobody will ever see - // this temporary value. 
- let prev_val = std::mem::replace( - entry, - Box::new(InstantiationState::<T>::Done(Err( - CompositionError::Poisoned(entrypoint.clone()), - ))), - ); - let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() { - InstantiationState::Done(service) => ( - InstantiationState::Done(service.clone()), - futures::future::ready(service).boxed(), - ), - // the construction of the store has not started yet. - InstantiationState::Config(config) => { - let (tx, rx) = tokio::sync::watch::channel(None); - ( - InstantiationState::InProgress(rx), - (async move { - let mut new_context = CompositionContext { - stack: stack.clone(), - composition: Some(self), - }; - new_context - .stack - .push((TypeId::of::<T>(), entrypoint.clone())); - let res = - config.build(&entrypoint, &new_context).await.map_err(|e| { - match e.downcast() { - Ok(e) => *e, - Err(e) => CompositionError::Failed(entrypoint, e.into()), - } - }); - tx.send(Some(res.clone())).unwrap(); - res - }) - .boxed(), - ) - } - // there is already a task driving forward the construction of this store, wait for it - // to notify us via the provided channel - InstantiationState::InProgress(mut recv) => { - (InstantiationState::InProgress(recv.clone()), { - (async move { - loop { - if let Some(v) = - recv.borrow_and_update().as_ref().map(|res| res.clone()) - { - break v; - } - recv.changed().await.unwrap(); - } - }) - .boxed() - }) - } - }; - *entry = Box::new(new_val); - ret + self.context().build_internal(entrypoint.to_string()).await } pub fn context(&self) -> CompositionContext { CompositionContext { + registry: self.registry, stack: vec![], composition: Some(self), } @@ -496,7 +536,7 @@ mod test { let blob_services_configs = with_registry(®, || serde_json::from_value(blob_services_configs_json)).unwrap(); - let mut blob_service_composition = Composition::default(); + let mut blob_service_composition = Composition::new(®); blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs); let (blob_service1, blob_service2) = tokio::join!( blob_service_composition.build::<dyn BlobService>("default"), @@ -526,7 +566,7 @@ mod test { let blob_services_configs = with_registry(®, || serde_json::from_value(blob_services_configs_json)).unwrap(); - let mut blob_service_composition = Composition::default(); + let mut blob_service_composition = Composition::new(®); blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs); match blob_service_composition .build::<dyn BlobService>("default") diff --git a/tvix/castore/src/digests.rs b/tvix/castore/src/digests.rs index ef9a7326b3fb..4d919ff0d873 100644 --- a/tvix/castore/src/digests.rs +++ b/tvix/castore/src/digests.rs @@ -6,7 +6,7 @@ use thiserror::Error; pub struct B3Digest(Bytes); // TODO: allow converting these errors to crate::Error -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq)] pub enum Error { #[error("invalid digest length: {0}")] InvalidDigestLen(usize), diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs index d10dddaf9f60..7473481c94b5 100644 --- a/tvix/castore/src/directoryservice/bigtable.rs +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -9,7 +9,9 @@ use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, trace, warn}; -use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter}; +use super::{ + utils::traverse_directory, Directory, DirectoryPutter, DirectoryService, SimplePutter, +}; use 
crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; @@ -35,6 +37,7 @@ const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; /// directly at the root, so rely on store composition. #[derive(Clone)] pub struct BigtableDirectoryService { + instance_name: String, client: bigtable::BigTable, params: BigtableParameters, @@ -47,7 +50,10 @@ pub struct BigtableDirectoryService { impl BigtableDirectoryService { #[cfg(not(test))] - pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + pub async fn connect( + instance_name: String, + params: BigtableParameters, + ) -> Result<Self, bigtable::Error> { let connection = bigtable::BigTableConnection::new( ¶ms.project_id, ¶ms.instance_name, @@ -58,13 +64,17 @@ impl BigtableDirectoryService { .await?; Ok(Self { + instance_name, client: connection.client(), params, }) } #[cfg(test)] - pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + pub async fn connect( + instance_name: String, + params: BigtableParameters, + ) -> Result<Self, bigtable::Error> { use std::time::Duration; use async_process::{Command, Stdio}; @@ -133,6 +143,7 @@ impl BigtableDirectoryService { )?; Ok(Self { + instance_name, client: connection.client(), params, emulator: (tmpdir, emulator_process).into(), @@ -148,8 +159,8 @@ fn derive_directory_key(digest: &B3Digest) -> String { #[async_trait] impl DirectoryService for BigtableDirectoryService { - #[instrument(skip(self, digest), err, fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let mut client = self.client.clone(); let directory_key = derive_directory_key(digest); @@ -241,28 +252,20 @@ impl DirectoryService for BigtableDirectoryService { // Try to parse the value into a Directory message. let directory = proto::Directory::decode(Bytes::from(row_cell.value)) - .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?; - - // validate the Directory. - directory - .validate() + .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))? 
+ .try_into() .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?; Ok(Some(directory)) } - #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))] + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let directory_digest = directory.digest(); let mut client = self.client.clone(); let directory_key = derive_directory_key(&directory_digest); - // Ensure the directory we're trying to upload passes validation - directory - .validate() - .map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?; - - let data = directory.encode_to_vec(); + let data = proto::Directory::from(directory).encode_to_vec(); if data.len() as u64 > CELL_SIZE_LIMIT { return Err(Error::StorageError( "Directory exceeds cell limit on Bigtable".into(), @@ -306,15 +309,15 @@ impl DirectoryService for BigtableDirectoryService { Ok(directory_digest) } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, @@ -352,11 +355,11 @@ impl ServiceBuilder for BigtableParameters { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> { Ok(Arc::new( - BigtableDirectoryService::connect(self.clone()).await?, + BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?, )) } } diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs index 0fdc82c16cb0..450c642715ac 100644 --- a/tvix/castore/src/directoryservice/combinators.rs +++ b/tvix/castore/src/directoryservice/combinators.rs @@ -7,10 +7,9 @@ use futures::TryStreamExt; use tonic::async_trait; use tracing::{instrument, trace}; -use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; +use super::{Directory, DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::directoryservice::DirectoryPutter; -use crate::proto; use crate::B3Digest; use crate::Error; @@ -23,13 +22,18 @@ use crate::Error; /// Inserts and listings are not implemented for now. 
#[derive(Clone)] pub struct Cache<DS1, DS2> { + instance_name: String, near: DS1, far: DS2, } impl<DS1, DS2> Cache<DS1, DS2> { - pub fn new(near: DS1, far: DS2) -> Self { - Self { near, far } + pub fn new(instance_name: String, near: DS1, far: DS2) -> Self { + Self { + instance_name, + near, + far, + } } } @@ -39,8 +43,8 @@ where DS1: DirectoryService + Clone + 'static, DS2: DirectoryService + Clone + 'static, { - #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { match self.near.get(digest).await? { Some(directory) => { trace!("serving from cache"); @@ -81,16 +85,16 @@ where } } - #[instrument(skip_all)] - async fn put(&self, _directory: proto::Directory) -> Result<B3Digest, Error> { + #[instrument(skip_all, fields(instance_name = %self.instance_name))] + async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> { Err(Error::StorageError("unimplemented".to_string())) } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let near = self.near.clone(); let far = self.far.clone(); let digest = root_directory_digest.clone(); @@ -153,11 +157,12 @@ pub struct CacheConfig { impl TryFrom<url::Url> for CacheConfig { type Error = Box<dyn std::error::Error + Send + Sync>; - fn try_from(_url: url::Url) -> Result<Self, Self::Error> { - Err(Error::StorageError( - "Instantiating a CombinedDirectoryService from a url is not supported".into(), - ) - .into()) + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // cache doesn't support host or path in the URL. + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string()).into()); + } + Ok(serde_qs::from_str(url.query().unwrap_or_default())?) 
} } @@ -166,14 +171,15 @@ impl ServiceBuilder for CacheConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let (near, far) = futures::join!( - context.resolve(self.near.clone()), - context.resolve(self.far.clone()) + context.resolve::<Self::Output>(self.near.clone()), + context.resolve::<Self::Output>(self.far.clone()) ); Ok(Arc::new(Cache { + instance_name: instance_name.to_string(), near: near?, far: far?, })) diff --git a/tvix/castore/src/directoryservice/directory_graph.rs b/tvix/castore/src/directoryservice/directory_graph.rs index e6b9b163370c..54f3cb3e9353 100644 --- a/tvix/castore/src/directoryservice/directory_graph.rs +++ b/tvix/castore/src/directoryservice/directory_graph.rs @@ -1,7 +1,5 @@ use std::collections::HashMap; -use bstr::ByteSlice; - use petgraph::{ graph::{DiGraph, NodeIndex}, visit::{Bfs, DfsPostOrder, EdgeRef, IntoNodeIdentifiers, Walker}, @@ -10,10 +8,7 @@ use petgraph::{ use tracing::instrument; use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; -use crate::{ - proto::{self, Directory, DirectoryNode}, - B3Digest, -}; +use crate::{path::PathComponent, B3Digest, Directory, Node}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -21,6 +16,11 @@ pub enum Error { ValidationError(String), } +struct EdgeWeight { + name: PathComponent, + size: u64, +} + /// This can be used to validate and/or re-order a Directory closure (DAG of /// connected Directories), and their insertion order. /// @@ -58,7 +58,7 @@ pub struct DirectoryGraph<O> { // // The option in the edge weight tracks the pending validation state of the respective edge, for example if // the child has not been added yet. - graph: DiGraph<Option<Directory>, Option<DirectoryNode>>, + graph: DiGraph<Option<Directory>, Option<EdgeWeight>>, // A lookup table from directory digest to node index. digest_to_node_ix: HashMap<B3Digest, NodeIndex>, @@ -67,18 +67,18 @@ pub struct DirectoryGraph<O> { } pub struct ValidatedDirectoryGraph { - graph: DiGraph<Option<Directory>, Option<DirectoryNode>>, + graph: DiGraph<Option<Directory>, Option<EdgeWeight>>, root: Option<NodeIndex>, } -fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> { +fn check_edge(edge: &EdgeWeight, child: &Directory) -> Result<(), Error> { // Ensure the size specified in the child node matches our records. 
- if dir.size != child.size() { + if edge.size != child.size() { return Err(Error::ValidationError(format!( "'{}' has wrong size, specified {}, recorded {}", - dir.name.as_bstr(), - dir.size, + edge.name, + edge.size, child.size(), ))); } @@ -88,7 +88,7 @@ fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> { impl DirectoryGraph<LeavesToRootValidator> { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + pub fn add(&mut self, directory: Directory) -> Result<(), Error> { if !self.order_validator.add_directory(&directory) { return Err(Error::ValidationError( "unknown directory was referenced".into(), @@ -108,7 +108,7 @@ impl DirectoryGraph<RootToLeavesValidator> { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + pub fn add(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); if !self.order_validator.digest_allowed(&digest) { return Err(Error::ValidationError("unexpected digest".into())); @@ -129,12 +129,7 @@ impl<O: OrderValidator> DirectoryGraph<O> { } /// Adds a directory which has already been confirmed to be in-order to the graph - pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> { - // Do some basic validation - directory - .validate() - .map_err(|e| Error::ValidationError(e.to_string()))?; - + pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); // Teach the graph about the existence of a node with this digest @@ -149,23 +144,32 @@ impl<O: OrderValidator> DirectoryGraph<O> { } // set up edges to all child directories - for subdir in &directory.directories { - let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap(); - - let child_ix = *self - .digest_to_node_ix - .entry(subdir_digest) - .or_insert_with(|| self.graph.add_node(None)); - - let pending_edge_check = match &self.graph[child_ix] { - Some(child) => { - // child is already available, validate the edge now - check_edge(subdir, child)?; - None - } - None => Some(subdir.clone()), // pending validation - }; - self.graph.add_edge(ix, child_ix, pending_edge_check); + for (name, node) in directory.nodes() { + if let Node::Directory { digest, size } = node { + let child_ix = *self + .digest_to_node_ix + .entry(digest.clone()) + .or_insert_with(|| self.graph.add_node(None)); + + let pending_edge_check = match &self.graph[child_ix] { + Some(child) => { + // child is already available, validate the edge now + check_edge( + &EdgeWeight { + name: name.clone(), + size: *size, + }, + child, + )?; + None + } + None => Some(EdgeWeight { + name: name.clone(), + size: *size, + }), // pending validation + }; + self.graph.add_edge(ix, child_ix, pending_edge_check); + } } // validate the edges from parents to this node @@ -183,6 +187,7 @@ impl<O: OrderValidator> DirectoryGraph<O> { .expect("edge not found") .take() .expect("edge is already validated"); + check_edge(&edge_weight, &directory)?; } @@ -269,33 +274,23 @@ impl ValidatedDirectoryGraph { #[cfg(test)] mod tests { - use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, - proto::{self, Directory}, - }; - use 
lazy_static::lazy_static; + use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; + use crate::{Directory, Node}; use rstest::rstest; + use std::sync::LazyLock; - lazy_static! { - pub static ref BROKEN_DIRECTORY : Directory = Directory { - symlinks: vec![proto::SymlinkNode { - name: "".into(), // invalid name! - target: "doesntmatter".into(), - }], - ..Default::default() - }; + use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; - pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), + pub static BROKEN_PARENT_DIRECTORY: LazyLock<Directory> = LazyLock::new(|| { + Directory::try_from_iter([( + "foo".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_A.digest(), size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() - }; - } - - use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; + }, + )]) + .unwrap() + }); #[rstest] /// Uploading an empty directory should succeed. @@ -312,8 +307,6 @@ mod tests { #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] /// Uploading B (referring to A) should fail immediately, because A was never uploaded. #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] - /// Uploading a directory failing validation should fail immediately. - #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)] /// Uploading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] fn test_uploads( @@ -366,8 +359,6 @@ mod tests { #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)] /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root). #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)] - /// Downloading a directory failing validation should fail immediately. - #[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)] /// Downloading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)] fn test_downloads( diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs index 3feb8f3509fe..2f7fc6d7de5a 100644 --- a/tvix/castore/src/directoryservice/from_addr.rs +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -13,10 +13,6 @@ use super::DirectoryService; /// The following URIs are supported: /// - `memory:` /// Uses a in-memory implementation. -/// - `sled:` -/// Uses a in-memory sled implementation. -/// - `sled:///absolute/path/to/somewhere` -/// Uses sled, using a path on the disk for persistency. Can be only opened /// from one process at the same time. /// - `redb:` /// Uses a in-memory redb implementation. @@ -41,7 +37,7 @@ pub async fn from_addr( })? .0; let directory_service = directory_service_config - .build("anonymous", &CompositionContext::blank()) + .build("anonymous", &CompositionContext::blank(®)) .await?; Ok(directory_service) @@ -49,31 +45,18 @@ pub async fn from_addr( #[cfg(test)] mod tests { + use std::sync::LazyLock; + use super::from_addr; - use lazy_static::lazy_static; use rstest::rstest; use tempfile::TempDir; - lazy_static! 
{ - static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); - static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap(); - static ref TMPDIR_REDB_1: TempDir = TempDir::new().unwrap(); - static ref TMPDIR_REDB_2: TempDir = TempDir::new().unwrap(); - } + static TMPDIR_REDB_1: LazyLock<TempDir> = LazyLock::new(|| TempDir::new().unwrap()); + static TMPDIR_REDB_2: LazyLock<TempDir> = LazyLock::new(|| TempDir::new().unwrap()); #[rstest] /// This uses an unsupported scheme. #[case::unsupported_scheme("http://foo.example/test", false)] - /// This configures sled in temporary mode. - #[case::sled_valid_temporary("sled://", true)] - /// This configures sled with /, which should fail. - #[case::sled_invalid_root("sled:///", false)] - /// This configures sled with a host, not path, which should fail. - #[case::sled_invalid_host("sled://foo.example", false)] - /// This configures sled with a valid path path, which should succeed. - #[case::sled_valid_path(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true)] - /// This configures sled with a host, and a valid path path, which should fail. - #[case::sled_invalid_host_with_valid_path(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false)] /// This correctly sets the scheme, and doesn't set a path. #[case::memory_valid("memory://", true)] /// This sets a memory url host to `foo` @@ -104,6 +87,16 @@ mod tests { #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)] /// Correct scheme to connect to localhost over http, but with additional path, which is invalid. #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)] + /// A valid example for store composition using anonymous urls + #[cfg_attr( + feature = "xp-store-composition", + case::anonymous_url_composition("cache://?near=memory://&far=memory://", true) + )] + /// Store composition with anonymous urls should fail if the feature is disabled + #[cfg_attr( + not(feature = "xp-store-composition"), + case::anonymous_url_composition("cache://?near=memory://&far=memory://", false) + )] /// A valid example for Bigtable #[cfg_attr( all(feature = "cloud", feature = "integration"), diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 4dc3931ed410..3fd177a34f28 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -1,9 +1,9 @@ use std::collections::HashSet; -use super::{DirectoryPutter, DirectoryService}; +use super::{Directory, DirectoryPutter, DirectoryService}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::proto::{self, get_directory_request::ByWhat}; -use crate::{B3Digest, Error}; +use crate::{B3Digest, DirectoryError, Error}; use async_stream::try_stream; use futures::stream::BoxStream; use std::sync::Arc; @@ -17,6 +17,7 @@ use tracing::{instrument, warn, Instrument as _}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. #[derive(Clone)] pub struct GRPCDirectoryService<T> { + instance_name: String, /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, @@ -26,9 +27,13 @@ impl<T> GRPCDirectoryService<T> { /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient]. /// panics if called outside the context of a tokio runtime. 
pub fn from_client( + instance_name: String, grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, ) -> Self { - Self { grpc_client } + Self { + instance_name, + grpc_client, + } } } @@ -40,11 +45,8 @@ where <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, T::Future: Send, { - #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] - async fn get( - &self, - digest: &B3Digest, - ) -> Result<Option<crate::proto::Directory>, crate::Error> { + #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest_cpy = digest.clone(); @@ -72,15 +74,10 @@ where "requested directory with digest {}, but got {}", digest, actual_digest ))) - } else if let Err(e) = directory.validate() { - // Validate the Directory itself is valid. - warn!("directory failed validation: {}", e.to_string()); - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - digest, e, - ))) } else { - Ok(Some(directory)) + Ok(Some(directory.try_into().map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?)) } } Ok(None) => Ok(None), @@ -89,12 +86,12 @@ where } } - #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] + async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> { let resp = self .grpc_client .clone() - .put(tokio_stream::once(directory)) + .put(tokio_stream::once(proto::Directory::from(directory))) .await; match resp { @@ -109,11 +106,11 @@ where } } - #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest = root_directory_digest.clone(); @@ -135,14 +132,6 @@ where loop { match stream.message().await { Ok(Some(directory)) => { - // validate the directory itself. - if let Err(e) = directory.validate() { - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - directory.digest(), - e, - )))?; - } // validate we actually expected that directory, and move it from expected to received. 
let directory_digest = directory.digest(); let was_expected = expected_directory_digests.remove(&directory_digest); @@ -168,6 +157,9 @@ where .insert(child_directory_digest); } + let directory = directory.try_into() + .map_err(|e: DirectoryError| Error::StorageError(e.to_string()))?; + yield directory; }, Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => { @@ -253,13 +245,16 @@ impl ServiceBuilder for GRPCDirectoryServiceConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let client = proto::directory_service_client::DirectoryServiceClient::new( crate::tonic::channel_from_url(&self.url.parse()?).await?, ); - Ok(Arc::new(GRPCDirectoryService::from_client(client))) + Ok(Arc::new(GRPCDirectoryService::from_client( + instance_name.to_string(), + client, + ))) } } @@ -279,11 +274,11 @@ pub struct GRPCPutter { #[async_trait] impl DirectoryPutter for GRPCPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> { match self.rq { // If we're not already closed, send the directory to directory_sender. Some((_, ref directory_sender)) => { - if directory_sender.send(directory).is_err() { + if directory_sender.send(directory.into()).is_err() { // If the channel has been prematurely closed, invoke close (so we can peek at the error code) // That error code is much more helpful, because it // contains the error message from the server. @@ -387,7 +382,7 @@ mod tests { .await .expect("must succeed"), ); - GRPCDirectoryService::from_client(client) + GRPCDirectoryService::from_client("test-instance".into(), client) }; assert!(grpc_client diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index ada4606a5a57..a43d7b8d8d31 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -1,4 +1,4 @@ -use crate::{proto, B3Digest, Error}; +use crate::{B3Digest, Error}; use futures::stream::BoxStream; use std::collections::HashMap; use std::sync::Arc; @@ -7,18 +7,20 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryPutter, DirectoryService, SimplePutter}; +use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter}; use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::proto; #[derive(Clone, Default)] pub struct MemoryDirectoryService { + instance_name: String, db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>, } #[async_trait] impl DirectoryService for MemoryDirectoryService { - #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.read().await; match db.get(digest) { @@ -37,48 +39,33 @@ impl DirectoryService for MemoryDirectoryService { ))); } - // Validate the Directory itself is valid. 
- if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } - - Ok(Some(directory.clone())) + Ok(Some(directory.clone().try_into().map_err(|e| { + crate::Error::StorageError(format!("corrupted directory: {}", e)) + })?)) } } } - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))] + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } - // store it let mut db = self.db.write().await; - db.insert(digest.clone(), directory); + db.insert(digest.clone(), directory.into()); Ok(digest) } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, @@ -107,9 +94,12 @@ impl ServiceBuilder for MemoryDirectoryServiceConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { - Ok(Arc::new(MemoryDirectoryService::default())) + Ok(Arc::new(MemoryDirectoryService { + instance_name: instance_name.to_string(), + db: Default::default(), + })) } } diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index 17a78b179349..b3cb0f4fd67b 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -1,6 +1,7 @@ use crate::composition::{Registry, ServiceBuilder}; -use crate::{proto, B3Digest, Error}; +use crate::{B3Digest, Directory, Error}; +use auto_impl::auto_impl; use futures::stream::BoxStream; use tonic::async_trait; mod combinators; @@ -12,14 +13,13 @@ mod object_store; mod order_validator; mod redb; mod simple_putter; -mod sled; #[cfg(test)] pub mod tests; mod traverse; mod utils; pub use self::combinators::{Cache, CacheConfig}; -pub use self::directory_graph::DirectoryGraph; +pub use self::directory_graph::{DirectoryGraph, ValidatedDirectoryGraph}; pub use self::from_addr::from_addr; pub use self::grpc::{GRPCDirectoryService, GRPCDirectoryServiceConfig}; pub use self::memory::{MemoryDirectoryService, MemoryDirectoryServiceConfig}; @@ -27,7 +27,6 @@ pub use self::object_store::{ObjectStoreDirectoryService, ObjectStoreDirectorySe pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; pub use self::redb::{RedbDirectoryService, RedbDirectoryServiceConfig}; pub use self::simple_putter::SimplePutter; -pub use self::sled::{SledDirectoryService, 
SledDirectoryServiceConfig}; pub use self::traverse::descend_to; pub use self::utils::traverse_directory; @@ -38,9 +37,10 @@ mod bigtable; pub use self::bigtable::{BigtableDirectoryService, BigtableParameters}; /// The base trait all Directory services need to implement. -/// This is a simple get and put of [crate::proto::Directory], returning their +/// This is a simple get and put of [Directory], returning their /// digest. #[async_trait] +#[auto_impl(&, &mut, Arc, Box)] pub trait DirectoryService: Send + Sync { /// Looks up a single Directory message by its digest. /// The returned Directory message *must* be valid. @@ -50,14 +50,14 @@ pub trait DirectoryService: Send + Sync { /// Directory digests that are at the "root", aka the last element that's /// sent to a DirectoryPutter. This makes sense for implementations bundling /// closures of directories together in batches. - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error>; /// Uploads a single Directory message, and returns the calculated /// digest, or an error. An error *must* also be returned if the message is /// not valid. - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; + async fn put(&self, directory: Directory) -> Result<B3Digest, Error>; - /// Looks up a closure of [proto::Directory]. - /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`, + /// Looks up a closure of [Directory]. + /// Ideally this would be a `impl Stream<Item = Result<Directory, Error>>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. /// @@ -75,39 +75,14 @@ pub trait DirectoryService: Send + Sync { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>>; + ) -> BoxStream<'static, Result<Directory, Error>>; - /// Allows persisting a closure of [proto::Directory], which is a graph of + /// Allows persisting a closure of [Directory], which is a graph of /// connected Directory messages. fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>; } -#[async_trait] -impl<A> DirectoryService for A -where - A: AsRef<dyn DirectoryService> + Send + Sync, -{ - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { - self.as_ref().get(digest).await - } - - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - self.as_ref().put(directory).await - } - - fn get_recursive( - &self, - root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { - self.as_ref().get_recursive(root_directory_digest) - } - - fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> { - self.as_ref().put_multiple_start() - } -} - -/// Provides a handle to put a closure of connected [proto::Directory] elements. +/// Provides a handle to put a closure of connected [Directory] elements. /// /// The consumer can periodically call [DirectoryPutter::put], starting from the /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to @@ -119,12 +94,12 @@ where /// but a single file or symlink. #[async_trait] pub trait DirectoryPutter: Send { - /// Put a individual [proto::Directory] into the store. + /// Put a individual [Directory] into the store. /// Error semantics and behaviour is up to the specific implementation of /// this trait. 
/// Due to bursting, the returned error might refer to an object previously /// sent via `put`. - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; + async fn put(&mut self, directory: Directory) -> Result<(), Error>; /// Close the stream, and wait for any errors. /// If there's been any invalid Directory message uploaded, and error *must* @@ -138,7 +113,6 @@ pub(crate) fn register_directory_services(reg: &mut Registry) { reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::MemoryDirectoryServiceConfig>("memory"); reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::CacheConfig>("cache"); reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::GRPCDirectoryServiceConfig>("grpc"); - reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::SledDirectoryServiceConfig>("sled"); reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::RedbDirectoryServiceConfig>("redb"); #[cfg(feature = "cloud")] { diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index a9a2cc8ef5c0..77578ec92f02 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -17,10 +17,11 @@ use tracing::{instrument, trace, warn, Level}; use url::Url; use super::{ - DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator, + Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, + RootToLeavesValidator, }; use crate::composition::{CompositionContext, ServiceBuilder}; -use crate::{proto, B3Digest, Error}; +use crate::{proto, B3Digest, Error, Node}; /// Stores directory closures in an object store. /// Notably, this makes use of the option to disallow accessing child directories except when @@ -31,6 +32,7 @@ use crate::{proto, B3Digest, Error}; /// be returned to the client in get_recursive. #[derive(Clone)] pub struct ObjectStoreDirectoryService { + instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path, } @@ -62,6 +64,7 @@ impl ObjectStoreDirectoryService { let (object_store, path) = object_store::parse_url_opts(url, options)?; Ok(Self { + instance_name: "default".into(), object_store: Arc::new(object_store), base_path: path, }) @@ -71,20 +74,32 @@ impl ObjectStoreDirectoryService { pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> { Self::parse_url_opts(url, Vec::<(String, String)>::new()) } + + pub fn new(instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path) -> Self { + Self { + instance_name, + object_store, + base_path, + } + } } #[async_trait] impl DirectoryService for ObjectStoreDirectoryService { /// This is the same steps as for get_recursive anyways, so we just call get_recursive and /// return the first element of the stream and drop the request. 
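The put/close contract spelled out for DirectoryPutter above (leaves first, root last, close returns the root digest) looks like this from the caller side. A hedged sketch using fixture names from elsewhere in this diff; DIRECTORY_WITH_KEEP is the leaf that DIRECTORY_COMPLICATED references:

```rust
use tvix_castore::directoryservice::DirectoryService;
use tvix_castore::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP};
use tvix_castore::{B3Digest, Error};

// Upload a two-directory closure, leaves first, root last.
async fn upload_closure<DS: DirectoryService>(svc: &DS) -> Result<B3Digest, Error> {
    let mut putter = svc.put_multiple_start();

    // The leaf goes in first; DIRECTORY_COMPLICATED references it.
    putter.put(DIRECTORY_WITH_KEEP.clone()).await?;
    // Due to bursting, an error returned here may refer to the earlier put.
    putter.put(DIRECTORY_COMPLICATED.clone()).await?;

    // close() flushes the batch and returns the root digest.
    let root_digest = putter.close().await?;
    debug_assert_eq!(root_digest, DIRECTORY_COMPLICATED.digest());
    Ok(root_digest)
}
```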
- #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.get_recursive(digest).take(1).next().await.transpose() } - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - if !directory.directories.is_empty() { + #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { + // Ensure the directory doesn't contain other directory children + if directory + .nodes() + .any(|(_, e)| matches!(e, Node::Directory { .. })) + { return Err(Error::InvalidRequest( "only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(), )); @@ -95,11 +110,11 @@ impl DirectoryService for ObjectStoreDirectoryService { handle.close().await } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { // Check that we are not passing on bogus from the object store to the client, and that the // trust chain from the root digest to the leaves is intact let mut order_validator = @@ -145,6 +160,10 @@ impl DirectoryService for ObjectStoreDirectoryService { warn!("unable to parse directory {}: {}", digest, e); Error::StorageError(e.to_string()) })?; + let directory = Directory::try_from(directory).map_err(|e| { + warn!("unable to convert directory {}: {}", digest, e); + Error::StorageError(e.to_string()) + })?; // Allow the children to appear next order_validator.add_directory_unchecked(&directory); @@ -210,17 +229,18 @@ impl ServiceBuilder for ObjectStoreDirectoryServiceConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let (object_store, path) = object_store::parse_url_opts( &self.object_store_url.parse()?, &self.object_store_options, )?; - Ok(Arc::new(ObjectStoreDirectoryService { - object_store: Arc::new(object_store), - base_path: path, - })) + Ok(Arc::new(ObjectStoreDirectoryService::new( + instance_name.to_string(), + Arc::new(object_store), + path, + ))) } } @@ -244,7 +264,7 @@ impl ObjectStoreDirectoryPutter { #[async_trait] impl DirectoryPutter for ObjectStoreDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -302,7 +322,7 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter { for directory in directories { directories_sink - .send(directory.encode_to_vec().into()) + 
.send(proto::Directory::from(directory).encode_to_vec().into()) .await?; } diff --git a/tvix/castore/src/directoryservice/order_validator.rs b/tvix/castore/src/directoryservice/order_validator.rs index 6045f5d24198..973af92e1294 100644 --- a/tvix/castore/src/directoryservice/order_validator.rs +++ b/tvix/castore/src/directoryservice/order_validator.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use tracing::warn; -use crate::{proto::Directory, B3Digest}; +use super::Directory; +use crate::{B3Digest, Node}; pub trait OrderValidator { /// Update the order validator's state with the directory @@ -47,10 +48,11 @@ impl RootToLeavesValidator { self.expected_digests.insert(directory.digest()); } - for subdir in &directory.directories { - // Allow the children to appear next - let subdir_digest = subdir.digest.clone().try_into().unwrap(); - self.expected_digests.insert(subdir_digest); + // Allow the children to appear next + for (_, node) in directory.nodes() { + if let Node::Directory { digest, .. } = node { + self.expected_digests.insert(digest.clone()); + } } } } @@ -79,15 +81,20 @@ impl OrderValidator for LeavesToRootValidator { fn add_directory(&mut self, directory: &Directory) -> bool { let digest = directory.digest(); - for subdir in &directory.directories { - let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory() - if !self.allowed_references.contains(&subdir_digest) { - warn!( - directory.digest = %digest, - subdirectory.digest = %subdir_digest, - "unexpected directory reference" - ); - return false; + for (_, node) in directory.nodes() { + if let Node::Directory { + digest: subdir_node_digest, + .. + } = node + { + if !self.allowed_references.contains(subdir_node_digest) { + warn!( + directory.digest = %digest, + subdirectory.digest = %subdir_node_digest, + "unexpected directory reference" + ); + return false; + } } } @@ -101,8 +108,8 @@ impl OrderValidator for LeavesToRootValidator { mod tests { use super::{LeavesToRootValidator, RootToLeavesValidator}; use crate::directoryservice::order_validator::OrderValidator; + use crate::directoryservice::Directory; use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; - use crate::proto::Directory; use rstest::rstest; #[rstest] diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs index 51dc87f92574..ba1fb682d5da 100644 --- a/tvix/castore/src/directoryservice/redb.rs +++ b/tvix/castore/src/directoryservice/redb.rs @@ -5,20 +5,21 @@ use std::{path::PathBuf, sync::Arc}; use tonic::async_trait; use tracing::{instrument, warn}; +use super::{ + traverse_directory, Directory, DirectoryGraph, DirectoryPutter, DirectoryService, + LeavesToRootValidator, +}; use crate::{ composition::{CompositionContext, ServiceBuilder}, digests, proto, B3Digest, Error, }; -use super::{ - traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, -}; - const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> = TableDefinition::new("directory"); #[derive(Clone)] pub struct RedbDirectoryService { + instance_name: String, // We wrap the db in an Arc to be able to move it into spawn_blocking, // as discussed in https://github.com/cberner/redb/issues/789 db: Arc<Database>, @@ -27,7 +28,7 @@ pub struct RedbDirectoryService { impl RedbDirectoryService { /// Constructs a new instance using the specified filesystem path for /// storage. 
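Both validators now iterate `directory.nodes()` and only chase `Node::Directory` children. A small sketch of the leaves-to-root discipline they enforce, assuming `LeavesToRootValidator: Default` (the putters in this diff construct it via `Default::default()`); per the fixtures, DIRECTORY_C references DIRECTORY_A:

```rust
use tvix_castore::directoryservice::{LeavesToRootValidator, OrderValidator};
use tvix_castore::fixtures::{DIRECTORY_A, DIRECTORY_C};

fn main() {
    // Leaf first, then the parent referencing it: both accepted.
    let mut ok_order = LeavesToRootValidator::default();
    assert!(ok_order.add_directory(&DIRECTORY_A));
    assert!(ok_order.add_directory(&DIRECTORY_C));

    // Seeing the parent before its leaf is an "unexpected directory
    // reference": DIRECTORY_A's digest was never allowed.
    let mut wrong_order = LeavesToRootValidator::default();
    assert!(!wrong_order.add_directory(&DIRECTORY_C));
}
```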
- pub async fn new(path: PathBuf) -> Result<Self, Error> { + pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> { if path == PathBuf::from("/") { return Err(Error::StorageError( "cowardly refusing to open / with redb".to_string(), @@ -41,7 +42,10 @@ impl RedbDirectoryService { }) .await??; - Ok(Self { db: Arc::new(db) }) + Ok(Self { + instance_name, + db: Arc::new(db), + }) } /// Constructs a new instance using the in-memory backend. @@ -51,7 +55,10 @@ impl RedbDirectoryService { create_schema(&db)?; - Ok(Self { db: Arc::new(db) }) + Ok(Self { + instance_name: "default".into(), + db: Arc::new(db), + }) } } @@ -68,8 +75,8 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { #[async_trait] impl DirectoryService for RedbDirectoryService { - #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.clone(); // Retrieves the protobuf-encoded Directory for the corresponding digest. @@ -107,13 +114,10 @@ impl DirectoryService for RedbDirectoryService { let directory = match proto::Directory::decode(&*directory_data.value()) { Ok(dir) => { // The returned Directory must be valid. - if let Err(e) = dir.validate() { + dir.try_into().map_err(|e| { warn!(err=%e, "Directory failed validation"); - return Err(Error::StorageError( - "Directory failed validation".to_string(), - )); - } - dir + Error::StorageError("Directory failed validation".to_string()) + })? } Err(e) => { warn!(err=%e, "failed to parse Directory"); @@ -124,27 +128,22 @@ impl DirectoryService for RedbDirectoryService { Ok(Some(directory)) } - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); move || { let digest = directory.digest(); - // Validate the directory. - if let Err(e) = directory.validate() { - warn!(err=%e, "Directory failed validation"); - return Err(Error::StorageError( - "Directory failed validation".to_string(), - )); - } - // Store the directory in the table. let txn = db.begin_write()?; { let mut table = txn.open_table(DIRECTORY_TABLE)?; let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into(); - table.insert(digest_as_array, directory.encode_to_vec())?; + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; } txn.commit()?; @@ -154,11 +153,11 @@ impl DirectoryService for RedbDirectoryService { .await? } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single // redb transaction to avoid constantly closing and opening new transactions for the // database. 
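The redb layout above is a single table mapping the raw 32-byte BLAKE3 digest to the protobuf-encoded Directory. A condensed sketch of that round trip, outside the spawn_blocking wrapper the service uses; B3_LEN = 32 and the boxed error type are assumptions here:

```rust
use prost::Message;
use redb::{Database, TableDefinition};
use tvix_castore::proto;

// Same shape as DIRECTORY_TABLE above; B3_LEN is BLAKE3's 32-byte output.
const DIRECTORY_TABLE: TableDefinition<[u8; 32], Vec<u8>> =
    TableDefinition::new("directory");

fn store_and_load(
    db: &Database,
    digest: [u8; 32],
    directory: proto::Directory,
) -> Result<Option<proto::Directory>, Box<dyn std::error::Error>> {
    // Write: key is the raw digest, value the protobuf-encoded Directory.
    let txn = db.begin_write()?;
    {
        let mut table = txn.open_table(DIRECTORY_TABLE)?;
        table.insert(digest, directory.encode_to_vec())?;
    }
    txn.commit()?;

    // Read it back; the service layer re-validates the proto into the
    // stricter Directory type via try_into, as in the hunk above.
    let txn = db.begin_read()?;
    let table = txn.open_table(DIRECTORY_TABLE)?;
    Ok(match table.get(digest)? {
        Some(guard) => Some(proto::Directory::decode(&*guard.value())?),
        None => None,
    })
}
```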
@@ -185,7 +184,7 @@ pub struct RedbDirectoryPutter { #[async_trait] impl DirectoryPutter for RedbDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -228,7 +227,10 @@ impl DirectoryPutter for RedbDirectoryPutter { for directory in directories { let digest_as_array: [u8; digests::B3_LEN] = directory.digest().into(); - table.insert(digest_as_array, directory.encode_to_vec())?; + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; } } @@ -280,7 +282,7 @@ impl ServiceBuilder for RedbDirectoryServiceConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { match self { @@ -302,7 +304,9 @@ impl ServiceBuilder for RedbDirectoryServiceConfig { RedbDirectoryServiceConfig { is_temporary: false, path: Some(path), - } => Ok(Arc::new(RedbDirectoryService::new(path.into()).await?)), + } => Ok(Arc::new( + RedbDirectoryService::new(instance_name.to_string(), path.into()).await?, + )), } } } diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs index dc54e3d11d18..b4daaee61b22 100644 --- a/tvix/castore/src/directoryservice/simple_putter.rs +++ b/tvix/castore/src/directoryservice/simple_putter.rs @@ -1,7 +1,6 @@ use super::DirectoryPutter; use super::DirectoryService; -use super::{DirectoryGraph, LeavesToRootValidator}; -use crate::proto; +use super::{Directory, DirectoryGraph, LeavesToRootValidator}; use crate::B3Digest; use crate::Error; use tonic::async_trait; @@ -29,7 +28,7 @@ impl<DS: DirectoryService> SimplePutter<DS> { #[async_trait] impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs deleted file mode 100644 index 5766dec1a5c2..000000000000 --- a/tvix/castore/src/directoryservice/sled.rs +++ /dev/null @@ -1,269 +0,0 @@ -use crate::proto::Directory; -use crate::{proto, B3Digest, Error}; -use futures::stream::BoxStream; -use prost::Message; -use std::ops::Deref; -use std::path::Path; -use std::sync::Arc; -use tonic::async_trait; -use tracing::{instrument, warn}; - -use super::utils::traverse_directory; -use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; -use crate::composition::{CompositionContext, ServiceBuilder}; - -#[derive(Clone)] -pub struct SledDirectoryService { - db: sled::Db, -} - -impl SledDirectoryService { - pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> { - if p.as_ref() == Path::new("/") { - return Err(sled::Error::Unsupported( - "cowardly refusing to open / with sled".to_string(), - )); - } - - let config = 
sled::Config::default() - .use_compression(false) // is a required parameter - .path(p); - let db = config.open()?; - - Ok(Self { db }) - } - - pub fn new_temporary() -> Result<Self, sled::Error> { - let config = sled::Config::default().temporary(true); - let db = config.open()?; - - Ok(Self { db }) - } -} - -#[async_trait] -impl DirectoryService for SledDirectoryService { - #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { - let resp = tokio::task::spawn_blocking({ - let db = self.db.clone(); - let digest = digest.clone(); - move || db.get(digest.as_slice()) - }) - .await? - .map_err(|e| { - warn!("failed to retrieve directory: {}", e); - Error::StorageError(format!("failed to retrieve directory: {}", e)) - })?; - - match resp { - // The directory was not found, return - None => Ok(None), - - // The directory was found, try to parse the data as Directory message - Some(data) => match Directory::decode(&*data) { - Ok(directory) => { - // Validate the retrieved Directory indeed has the - // digest we expect it to have, to detect corruptions. - let actual_digest = directory.digest(); - if actual_digest != *digest { - return Err(Error::StorageError(format!( - "requested directory with digest {}, but got {}", - digest, actual_digest - ))); - } - - // Validate the Directory itself is valid. - if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } - - Ok(Some(directory)) - } - Err(e) => { - warn!("unable to parse directory {}: {}", digest, e); - Err(Error::StorageError(e.to_string())) - } - }, - } - } - - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - tokio::task::spawn_blocking({ - let db = self.db.clone(); - move || { - let digest = directory.digest(); - - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } - // store it - db.insert(digest.as_slice(), directory.encode_to_vec()) - .map_err(|e| Error::StorageError(e.to_string()))?; - - Ok(digest) - } - }) - .await? - } - - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive( - &self, - root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { - traverse_directory(self.clone(), root_directory_digest) - } - - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> - where - Self: Clone, - { - Box::new(SledDirectoryPutter { - tree: self.db.deref().clone(), - directory_validator: Some(Default::default()), - }) - } -} - -#[derive(serde::Deserialize)] -#[serde(deny_unknown_fields)] -pub struct SledDirectoryServiceConfig { - is_temporary: bool, - #[serde(default)] - /// required when is_temporary = false - path: Option<String>, -} - -impl TryFrom<url::Url> for SledDirectoryServiceConfig { - type Error = Box<dyn std::error::Error + Send + Sync>; - fn try_from(url: url::Url) -> Result<Self, Self::Error> { - // sled doesn't support host, and a path can be provided (otherwise - // it'll live in memory only). 
- if url.has_host() { - return Err(Error::StorageError("no host allowed".to_string()).into()); - } - - // TODO: expose compression and other parameters as URL parameters? - - Ok(if url.path().is_empty() { - SledDirectoryServiceConfig { - is_temporary: true, - path: None, - } - } else { - SledDirectoryServiceConfig { - is_temporary: false, - path: Some(url.path().to_string()), - } - }) - } -} - -#[async_trait] -impl ServiceBuilder for SledDirectoryServiceConfig { - type Output = dyn DirectoryService; - async fn build<'a>( - &'a self, - _instance_name: &str, - _context: &CompositionContext, - ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { - match self { - SledDirectoryServiceConfig { - is_temporary: true, - path: None, - } => Ok(Arc::new(SledDirectoryService::new_temporary()?)), - SledDirectoryServiceConfig { - is_temporary: true, - path: Some(_), - } => Err(Error::StorageError( - "Temporary SledDirectoryService can not have path".into(), - ) - .into()), - SledDirectoryServiceConfig { - is_temporary: false, - path: None, - } => Err(Error::StorageError("SledDirectoryService is missing path".into()).into()), - SledDirectoryServiceConfig { - is_temporary: false, - path: Some(path), - } => Ok(Arc::new(SledDirectoryService::new(path)?)), - } - } -} - -/// Buffers Directory messages to be uploaded and inserts them in a batch -/// transaction on close. -pub struct SledDirectoryPutter { - tree: sled::Tree, - - /// The directories (inside the directory validator) that we insert later, - /// or None, if they were already inserted. - directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>, -} - -#[async_trait] -impl DirectoryPutter for SledDirectoryPutter { - #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { - match self.directory_validator { - None => return Err(Error::StorageError("already closed".to_string())), - Some(ref mut validator) => { - validator - .add(directory) - .map_err(|e| Error::StorageError(e.to_string()))?; - } - } - - Ok(()) - } - - #[instrument(level = "trace", skip_all, ret, err)] - async fn close(&mut self) -> Result<B3Digest, Error> { - match self.directory_validator.take() { - None => Err(Error::InvalidRequest("already closed".to_string())), - Some(validator) => { - // Insert all directories as a batch. - tokio::task::spawn_blocking({ - let tree = self.tree.clone(); - move || { - // retrieve the validated directories. - let directories = validator - .validate() - .map_err(|e| Error::StorageError(e.to_string()))? - .drain_leaves_to_root() - .collect::<Vec<_>>(); - - // Get the root digest, which is at the end (cf. insertion order) - let root_digest = directories - .last() - .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))? - .digest(); - - let mut batch = sled::Batch::default(); - for directory in directories { - batch.insert(directory.digest().as_slice(), directory.encode_to_vec()); - } - - tree.apply_batch(batch).map_err(|e| { - Error::StorageError(format!("unable to apply batch: {}", e)) - })?; - - Ok(root_digest) - } - }) - .await? 
- } - } - } -} diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index b698f70ea469..d394a5679c32 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -8,10 +8,8 @@ use rstest_reuse::{self, *}; use super::DirectoryService; use crate::directoryservice; -use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D}, - proto::{self, Directory}, -}; +use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D}; +use crate::{Directory, Node}; mod utils; use self::utils::make_grpc_directory_service_client; @@ -25,7 +23,6 @@ use self::utils::make_grpc_directory_service_client; #[rstest] #[case::grpc(make_grpc_directory_service_client().await)] #[case::memory(directoryservice::from_addr("memory://").await.unwrap())] -#[case::sled(directoryservice::from_addr("sled://").await.unwrap())] #[case::redb(directoryservice::from_addr("redb://").await.unwrap())] #[case::objectstore(directoryservice::from_addr("objectstore+memory://").await.unwrap())] #[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))] @@ -41,10 +38,10 @@ async fn test_non_exist(directory_service: impl DirectoryService) { // recursive get assert_eq!( - Vec::<Result<proto::Directory, crate::Error>>::new(), + Vec::<Result<Directory, crate::Error>>::new(), directory_service .get_recursive(&DIRECTORY_A.digest()) - .collect::<Vec<Result<proto::Directory, crate::Error>>>() + .collect::<Vec<Result<Directory, crate::Error>>>() .await ); } @@ -212,58 +209,20 @@ async fn upload_reject_dangling_pointer(directory_service: impl DirectoryService } } -/// Try uploading a Directory failing its internal validation, ensure it gets -/// rejected. -#[apply(directory_services)] -#[tokio::test] -async fn upload_reject_failing_validation(directory_service: impl DirectoryService) { - let broken_directory = Directory { - symlinks: vec![proto::SymlinkNode { - name: "".into(), // wrong! - target: "doesntmatter".into(), - }], - ..Default::default() - }; - assert!(broken_directory.validate().is_err()); - - // Try to upload via single upload. - assert!( - directory_service - .put(broken_directory.clone()) - .await - .is_err(), - "single upload must fail" - ); - - // Try to upload via put_multiple. We're a bit more permissive here, the - // intermediate .put() might succeed, due to client-side bursting (in the - // case of gRPC), but then the close MUST fail. - let mut handle = directory_service.put_multiple_start(); - if handle.put(broken_directory).await.is_ok() { - assert!( - handle.close().await.is_err(), - "when succeeding put, close must fail" - ) - } -} - /// Try uploading a Directory that refers to a previously-uploaded directory. /// Both pass their isolated validation, but the size field in the parent is wrong. /// This should be rejected. #[apply(directory_services)] #[tokio::test] async fn upload_reject_wrong_size(directory_service: impl DirectoryService) { - let wrong_parent_directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), + let wrong_parent_directory = Directory::try_from_iter([( + "foo".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_A.digest(), size: DIRECTORY_A.size() + 42, // wrong! 
- }], - ..Default::default() - }; - - // Make sure isolated validation itself is ok - assert!(wrong_parent_directory.validate().is_ok()); + }, + )]) + .unwrap(); // Now upload both. Ensure it either fails during the second put, or during // the close. diff --git a/tvix/castore/src/directoryservice/tests/utils.rs b/tvix/castore/src/directoryservice/tests/utils.rs index 3d245ea412d5..1c5d68de0108 100644 --- a/tvix/castore/src/directoryservice/tests/utils.rs +++ b/tvix/castore/src/directoryservice/tests/utils.rs @@ -33,6 +33,7 @@ pub async fn make_grpc_directory_service_client() -> Box<dyn DirectoryService> { // Create a client, connecting to the right side. The URI is unused. let mut maybe_right = Some(right); Box::new(GRPCDirectoryService::from_client( + "default".into(), DirectoryServiceClient::new( Endpoint::try_from("http://[::]:50051") .unwrap() diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs index 17a51ae2bbff..0bd67e9bcf1f 100644 --- a/tvix/castore/src/directoryservice/traverse.rs +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -1,8 +1,4 @@ -use super::DirectoryService; -use crate::{ - proto::{node::Node, NamedNode}, - B3Digest, Error, Path, -}; +use crate::{directoryservice::DirectoryService, Error, Node, Path}; use tracing::{instrument, warn}; /// This descends from a (root) node to the given (sub)path, returning the Node @@ -17,19 +13,14 @@ where DS: AsRef<dyn DirectoryService>, { let mut parent_node = root_node; - for component in path.as_ref().components() { + for component in path.as_ref().components_bytes() { match parent_node { - Node::File(_) | Node::Symlink(_) => { + Node::File { .. } | Node::Symlink { .. } => { // There's still some path left, but the parent node is no directory. // This means the path doesn't exist, as we can't reach it. return Ok(None); } - Node::Directory(directory_node) => { - let digest: B3Digest = directory_node - .digest - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; - + Node::Directory { digest, .. } => { // fetch the linked node from the directory_service. let directory = directory_service @@ -44,15 +35,16 @@ where })?; // look for the component in the [Directory]. - // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we - // could stop as soon as e.name is larger than the search string. - if let Some(child_node) = directory.nodes().find(|n| n.get_name() == component) { + if let Some((_child_name, child_node)) = directory + .into_nodes() + .find(|(name, _node)| name.as_ref() == component) + { // child node found, update prev_node to that and continue. - parent_node = child_node; + parent_node = child_node.clone(); } else { // child node not found means there's no such element inside the directory. 
return Ok(None); - } + }; } } } @@ -65,8 +57,8 @@ where mod tests { use crate::{ directoryservice, - fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, - PathBuf, + fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST}, + Node, PathBuf, }; use super::descend_to; @@ -88,21 +80,23 @@ mod tests { handle.close().await.expect("must upload"); // construct the node for DIRECTORY_COMPLICATED - let node_directory_complicated = - crate::proto::node::Node::Directory(crate::proto::DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }); + let node_directory_complicated = Node::Directory { + digest: DIRECTORY_COMPLICATED.digest(), + size: DIRECTORY_COMPLICATED.size(), + }; // construct the node for DIRECTORY_COMPLICATED - let node_directory_with_keep = crate::proto::node::Node::Directory( - DIRECTORY_COMPLICATED.directories.first().unwrap().clone(), - ); + let node_directory_with_keep = Node::Directory { + digest: DIRECTORY_WITH_KEEP.digest(), + size: DIRECTORY_WITH_KEEP.size(), + }; // construct the node for the .keep file - let node_file_keep = - crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone()); + let node_file_keep = Node::File { + digest: EMPTY_BLOB_DIGEST.clone(), + size: 0, + executable: false, + }; // traversal to an empty subpath should return the root node. { diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index 726734f55eec..d073c2c3c8ec 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -1,21 +1,22 @@ +use super::Directory; use super::DirectoryService; -use crate::proto; use crate::B3Digest; use crate::Error; +use crate::Node; use async_stream::try_stream; use futures::stream::BoxStream; use std::collections::{HashSet, VecDeque}; use tracing::instrument; use tracing::warn; -/// Traverses a [proto::Directory] from the root to the children. +/// Traverses a [Directory] from the root to the children. /// /// This is mostly BFS, but directories are only returned once. #[instrument(skip(directory_service))] pub fn traverse_directory<'a, DS: DirectoryService + 'static>( directory_service: DS, root_directory_digest: &B3Digest, -) -> BoxStream<'a, Result<proto::Directory, Error>> { +) -> BoxStream<'a, Result<Directory, Error>> { // The list of all directories that still need to be traversed. The next // element is picked from the front, new elements are enqueued at the // back. @@ -50,16 +51,6 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( Some(dir) => dir, }; - - // validate, we don't want to send invalid directories. - current_directory.validate().map_err(|e| { - warn!("directory failed validation: {}", e.to_string()); - Error::StorageError(format!( - "invalid directory: {}", - current_directory_digest - )) - })?; - // We're about to send this directory, so let's avoid sending it again if a // descendant has it. sent_directory_digests.insert(current_directory_digest); @@ -67,16 +58,15 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( // enqueue all child directory digests to the work queue, as // long as they're not part of the worklist or already sent. // This panics if the digest looks invalid, it's supposed to be checked first. 
- for child_directory_node in &current_directory.directories { - // TODO: propagate error - let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); - - if worklist_directory_digests.contains(&child_digest) - || sent_directory_digests.contains(&child_digest) - { - continue; + for (_, child_directory_node) in current_directory.nodes() { + if let Node::Directory{digest: child_digest, ..} = child_directory_node { + if worklist_directory_digests.contains(child_digest) + || sent_directory_digests.contains(child_digest) + { + continue; + } + worklist_directory_digests.push_back(child_digest.clone()); } - worklist_directory_digests.push_back(child_digest); } yield current_directory; diff --git a/tvix/castore/src/errors.rs b/tvix/castore/src/errors.rs index 5bbcd7b04ef1..1c4605200842 100644 --- a/tvix/castore/src/errors.rs +++ b/tvix/castore/src/errors.rs @@ -1,7 +1,13 @@ +use bstr::ByteSlice; use thiserror::Error; use tokio::task::JoinError; use tonic::Status; +use crate::{ + path::{PathComponent, PathComponentError}, + SymlinkTargetError, +}; + /// Errors related to communication with the store. #[derive(Debug, Error, PartialEq)] pub enum Error { @@ -12,6 +18,52 @@ pub enum Error { StorageError(String), } +/// Errors that occur during construction of [crate::Node] +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum ValidateNodeError { + /// Invalid digest length encountered + #[error("invalid digest length: {0}")] + InvalidDigestLen(usize), + /// Invalid symlink target + #[error("Invalid symlink target: {0}")] + InvalidSymlinkTarget(SymlinkTargetError), +} + +impl From<crate::digests::Error> for ValidateNodeError { + fn from(e: crate::digests::Error) -> Self { + match e { + crate::digests::Error::InvalidDigestLen(n) => ValidateNodeError::InvalidDigestLen(n), + } + } +} + +/// Errors that can occur when populating [crate::Directory] messages, +/// or parsing [crate::proto::Directory] +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum DirectoryError { + /// Multiple elements with the same name encountered + #[error("{:?} is a duplicate name", .0)] + DuplicateName(PathComponent), + /// Node failed validation + #[error("invalid node with name {}: {:?}", .0.as_bstr(), .1.to_string())] + InvalidNode(bytes::Bytes, ValidateNodeError), + #[error("Total size exceeds u64::MAX")] + SizeOverflow, + /// Invalid name encountered + #[error("Invalid name: {0}")] + InvalidName(PathComponentError), + /// This can occur if a protobuf node with a name is passed where we expect + /// it to be anonymous. + #[error("Name is set when it shouldn't")] + NameInAnonymousNode, + /// Elements are not in sorted order. Can only happen on protos + #[error("{:?} is not sorted", .0.as_bstr())] + WrongSorting(bytes::Bytes), + /// This can only happen if there's an unknown node type (on protos) + #[error("No node set")] + NoNodeSet, +} + impl From<JoinError> for Error { fn from(value: JoinError) -> Self { Error::StorageError(value.to_string()) diff --git a/tvix/castore/src/fixtures.rs b/tvix/castore/src/fixtures.rs index 3ebda64a818a..db0ee59daf60 100644 --- a/tvix/castore/src/fixtures.rs +++ b/tvix/castore/src/fixtures.rs @@ -1,103 +1,120 @@ -use crate::{ - proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode}, - B3Digest, -}; -use lazy_static::lazy_static; +use bytes::Bytes; +use std::sync::LazyLock; + +use crate::{B3Digest, Directory, Node}; pub const HELLOWORLD_BLOB_CONTENTS: &[u8] = b"Hello World!"; pub const EMPTY_BLOB_CONTENTS: &[u8] = b""; -lazy_static!
{ - pub static ref DUMMY_DIGEST: B3Digest = { - let u = [0u8; 32]; - (&u).into() - }; - pub static ref DUMMY_DIGEST_2: B3Digest = { - let mut u = [0u8; 32]; - u[0] = 0x10; - (&u).into() - }; - pub static ref DUMMY_DATA_1: bytes::Bytes = vec![0x01, 0x02, 0x03].into(); - pub static ref DUMMY_DATA_2: bytes::Bytes = vec![0x04, 0x05].into(); +pub static DUMMY_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| (&[0u8; 32]).into()); +pub static DUMMY_DIGEST_2: LazyLock<B3Digest> = LazyLock::new(|| { + let mut u = [0u8; 32]; + u[0] = 0x10; + (&u).into() +}); +pub static DUMMY_DATA_1: LazyLock<Bytes> = LazyLock::new(|| vec![0x01, 0x02, 0x03].into()); +pub static DUMMY_DATA_2: LazyLock<Bytes> = LazyLock::new(|| vec![0x04, 0x05].into()); - pub static ref HELLOWORLD_BLOB_DIGEST: B3Digest = - blake3::hash(HELLOWORLD_BLOB_CONTENTS).as_bytes().into(); - pub static ref EMPTY_BLOB_DIGEST: B3Digest = - blake3::hash(EMPTY_BLOB_CONTENTS).as_bytes().into(); +pub static HELLOWORLD_BLOB_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(HELLOWORLD_BLOB_CONTENTS).as_bytes().into()); +pub static EMPTY_BLOB_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(EMPTY_BLOB_CONTENTS).as_bytes().into()); - // 2 bytes - pub static ref BLOB_A: bytes::Bytes = vec![0x00, 0x01].into(); - pub static ref BLOB_A_DIGEST: B3Digest = blake3::hash(&BLOB_A).as_bytes().into(); +// 2 bytes +pub static BLOB_A: LazyLock<Bytes> = LazyLock::new(|| vec![0x00, 0x01].into()); +pub static BLOB_A_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&BLOB_A).as_bytes().into()); - // 1MB - pub static ref BLOB_B: bytes::Bytes = (0..255).collect::<Vec<u8>>().repeat(4 * 1024).into(); - pub static ref BLOB_B_DIGEST: B3Digest = blake3::hash(&BLOB_B).as_bytes().into(); +// 1MB +pub static BLOB_B: LazyLock<Bytes> = + LazyLock::new(|| (0..255).collect::<Vec<u8>>().repeat(4 * 1024).into()); +pub static BLOB_B_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&BLOB_B).as_bytes().into()); - // Directories - pub static ref DIRECTORY_WITH_KEEP: proto::Directory = proto::Directory { - directories: vec![], - files: vec![FileNode { - name: b".keep".to_vec().into(), - digest: EMPTY_BLOB_DIGEST.clone().into(), - size: 0, - executable: false, - }], - symlinks: vec![], - }; - pub static ref DIRECTORY_COMPLICATED: proto::Directory = proto::Directory { - directories: vec![DirectoryNode { - name: b"keep".to_vec().into(), - digest: DIRECTORY_WITH_KEEP.digest().into(), - size: DIRECTORY_WITH_KEEP.size(), - }], - files: vec![FileNode { - name: b".keep".to_vec().into(), - digest: EMPTY_BLOB_DIGEST.clone().into(), +// Directories +pub static DIRECTORY_WITH_KEEP: LazyLock<Directory> = LazyLock::new(|| { + Directory::try_from_iter([( + ".keep".try_into().unwrap(), + Node::File { + digest: EMPTY_BLOB_DIGEST.clone(), size: 0, executable: false, - }], - symlinks: vec![SymlinkNode { - name: b"aa".to_vec().into(), - target: b"/nix/store/somewhereelse".to_vec().into(), - }], - }; - pub static ref DIRECTORY_A: Directory = Directory::default(); - pub static ref DIRECTORY_B: Directory = Directory { - directories: vec![DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), + }, + )]) + .unwrap() +}); +pub static DIRECTORY_COMPLICATED: LazyLock<Directory> = LazyLock::new(|| { + Directory::try_from_iter([ + ( + "keep".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_WITH_KEEP.digest(), + size: DIRECTORY_WITH_KEEP.size(), + }, + ), + ( + ".keep".try_into().unwrap(), + Node::File { + digest: 
EMPTY_BLOB_DIGEST.clone(), + size: 0, + executable: false, + }, + ), + ( + "aa".try_into().unwrap(), + Node::Symlink { + target: "/nix/store/somewhereelse".try_into().unwrap(), + }, + ), + ]) + .unwrap() +}); +pub static DIRECTORY_A: LazyLock<Directory> = LazyLock::new(Directory::new); +pub static DIRECTORY_B: LazyLock<Directory> = LazyLock::new(|| { + Directory::try_from_iter([( + "a".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_A.digest(), size: DIRECTORY_A.size(), - }], - ..Default::default() - }; - pub static ref DIRECTORY_C: Directory = Directory { - directories: vec![ - DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), + }, + )]) + .unwrap() +}); +pub static DIRECTORY_C: LazyLock<Directory> = LazyLock::new(|| { + Directory::try_from_iter([ + ( + "a".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_A.digest(), size: DIRECTORY_A.size(), }, - DirectoryNode { - name: b"a'".to_vec().into(), - digest: DIRECTORY_A.digest().into(), + ), + ( + "a'".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_A.digest(), size: DIRECTORY_A.size(), - } - ], - ..Default::default() - }; - pub static ref DIRECTORY_D: proto::Directory = proto::Directory { - directories: vec![ - DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), + }, + ), + ]) + .unwrap() +}); +pub static DIRECTORY_D: LazyLock<Directory> = LazyLock::new(|| { + Directory::try_from_iter([ + ( + "a".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_A.digest(), size: DIRECTORY_A.size(), }, - DirectoryNode { - name: b"b'".to_vec().into(), - digest: DIRECTORY_B.digest().into(), + ), + ( + "b".try_into().unwrap(), + Node::Directory { + digest: DIRECTORY_B.digest(), size: DIRECTORY_B.size(), - } - ], - ..Default::default() - }; -} + }, + ), + ]) + .unwrap() +}); diff --git a/tvix/castore/src/fs/fuse/tests.rs b/tvix/castore/src/fs/fuse/tests.rs index bcebcf4a7292..0d68af090daf 100644 --- a/tvix/castore/src/fs/fuse/tests.rs +++ b/tvix/castore/src/fs/fuse/tests.rs @@ -1,5 +1,4 @@ use bstr::ByteSlice; -use bytes::Bytes; use std::{ collections::BTreeMap, ffi::{OsStr, OsString}, @@ -12,13 +11,14 @@ use tempfile::TempDir; use tokio_stream::{wrappers::ReadDirStream, StreamExt}; use super::FuseDaemon; -use crate::fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST}; -use crate::proto as castorepb; -use crate::proto::node::Node; use crate::{ blobservice::{BlobService, MemoryBlobService}, directoryservice::{DirectoryService, MemoryDirectoryService}, - fixtures, + fixtures, Node, +}; +use crate::{ + fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST}, + PathComponent, }; const BLOB_A_NAME: &str = "00000000000000000000000000000000-test"; @@ -39,14 +39,14 @@ fn gen_svcs() -> (Arc<dyn BlobService>, Arc<dyn DirectoryService>) { fn do_mount<P: AsRef<Path>, BS, DS>( blob_service: BS, directory_service: DS, - root_nodes: BTreeMap<bytes::Bytes, Node>, + root_nodes: BTreeMap<PathComponent, Node>, mountpoint: P, list_root: bool, show_xattr: bool, ) -> io::Result<FuseDaemon> where - BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static, - DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static, + BS: BlobService + Send + Sync + Clone + 'static, + DS: DirectoryService + Send + Sync + Clone + 'static, { let fs = TvixStoreFs::new( blob_service, @@ -60,7 +60,7 @@ where async fn populate_blob_a( blob_service: &Arc<dyn BlobService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut 
BTreeMap<PathComponent, Node>, ) { let mut bw = blob_service.open_write().await; tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) @@ -69,19 +69,18 @@ async fn populate_blob_a( bw.close().await.expect("must succeed closing"); root_nodes.insert( - BLOB_A_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), + BLOB_A_NAME.try_into().unwrap(), + Node::File { + digest: fixtures::BLOB_A_DIGEST.clone(), size: fixtures::BLOB_A.len() as u64, executable: false, - }), + }, ); } async fn populate_blob_b( blob_service: &Arc<dyn BlobService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { let mut bw = blob_service.open_write().await; tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) @@ -90,20 +89,19 @@ async fn populate_blob_b( bw.close().await.expect("must succeed closing"); root_nodes.insert( - BLOB_B_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_B_NAME.into(), - digest: fixtures::BLOB_B_DIGEST.clone().into(), + BLOB_B_NAME.try_into().unwrap(), + Node::File { + digest: fixtures::BLOB_B_DIGEST.clone(), size: fixtures::BLOB_B.len() as u64, executable: false, - }), + }, ); } /// adds a blob containing helloworld and marks it as executable async fn populate_blob_helloworld( blob_service: &Arc<dyn BlobService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { let mut bw = blob_service.open_write().await; tokio::io::copy( @@ -115,42 +113,39 @@ async fn populate_blob_helloworld( bw.close().await.expect("must succeed closing"); root_nodes.insert( - HELLOWORLD_BLOB_NAME.into(), - Node::File(castorepb::FileNode { - name: HELLOWORLD_BLOB_NAME.into(), - digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(), + HELLOWORLD_BLOB_NAME.try_into().unwrap(), + Node::File { + digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone(), size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, executable: true, - }), + }, ); } -async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { +async fn populate_symlink(root_nodes: &mut BTreeMap<PathComponent, Node>) { root_nodes.insert( - SYMLINK_NAME.into(), - Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME.into(), - target: BLOB_A_NAME.into(), - }), + SYMLINK_NAME.try_into().unwrap(), + Node::Symlink { + target: BLOB_A_NAME.try_into().unwrap(), + }, ); } /// This writes a symlink pointing to /nix/store/somewhereelse, /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. 
-async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) { +async fn populate_symlink2(root_nodes: &mut BTreeMap<PathComponent, Node>) { root_nodes.insert( - SYMLINK_NAME2.into(), - Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME2.into(), - target: "/nix/store/somewhereelse".into(), - }), + SYMLINK_NAME2.try_into().unwrap(), + Node::Symlink { + target: "/nix/store/somewhereelse".try_into().unwrap(), + }, ); } async fn populate_directory_with_keep( blob_service: &Arc<dyn BlobService>, directory_service: &Arc<dyn DirectoryService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { // upload empty blob let mut bw = blob_service.open_write().await; @@ -166,45 +161,42 @@ async fn populate_directory_with_keep( .expect("must succeed uploading"); root_nodes.insert( - DIRECTORY_WITH_KEEP_NAME.into(), - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + DIRECTORY_WITH_KEEP_NAME.try_into().unwrap(), + Node::Directory { + digest: fixtures::DIRECTORY_WITH_KEEP.digest(), size: fixtures::DIRECTORY_WITH_KEEP.size(), - }), + }, ); } /// Create a root node for DIRECTORY_WITH_KEEP, but don't upload the Directory /// itself. -async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) { +async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<PathComponent, Node>) { root_nodes.insert( - DIRECTORY_WITH_KEEP_NAME.into(), - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + DIRECTORY_WITH_KEEP_NAME.try_into().unwrap(), + Node::Directory { + digest: fixtures::DIRECTORY_WITH_KEEP.digest(), size: fixtures::DIRECTORY_WITH_KEEP.size(), - }), + }, ); } /// Insert BLOB_A, but don't provide the blob .keep is pointing to. -async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) { +async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<PathComponent, Node>) { root_nodes.insert( - BLOB_A_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), + BLOB_A_NAME.try_into().unwrap(), + Node::File { + digest: fixtures::BLOB_A_DIGEST.clone(), size: fixtures::BLOB_A.len() as u64, executable: false, - }), + }, ); } async fn populate_directory_complicated( blob_service: &Arc<dyn BlobService>, directory_service: &Arc<dyn DirectoryService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { // upload empty blob let mut bw = blob_service.open_write().await; @@ -226,12 +218,11 @@ async fn populate_directory_complicated( .expect("must succeed uploading"); root_nodes.insert( - DIRECTORY_COMPLICATED_NAME.into(), - Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_COMPLICATED_NAME.into(), - digest: fixtures::DIRECTORY_COMPLICATED.digest().into(), + DIRECTORY_COMPLICATED_NAME.try_into().unwrap(), + Node::Directory { + digest: fixtures::DIRECTORY_COMPLICATED.digest(), size: fixtures::DIRECTORY_COMPLICATED.size(), - }), + }, ); } diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs index bdd459543470..2696fdede378 100644 --- a/tvix/castore/src/fs/inodes.rs +++ b/tvix/castore/src/fs/inodes.rs @@ -2,10 +2,7 @@ //! about inodes, which present tvix-castore nodes in a filesystem. 
use std::time::Duration; -use bytes::Bytes; - -use crate::proto as castorepb; -use crate::B3Digest; +use crate::{path::PathComponent, B3Digest, Node}; #[derive(Clone, Debug)] pub enum InodeData { @@ -20,27 +17,23 @@ pub enum InodeData { /// lookup and did fetch the data. #[derive(Clone, Debug)] pub enum DirectoryInodeData { - Sparse(B3Digest, u64), // digest, size - Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)] + Sparse(B3Digest, u64), // digest, size + Populated(B3Digest, Vec<(u64, PathComponent, Node)>), // [(child_inode, name, node)] } impl InodeData { /// Constructs a new InodeData by consuming a [Node]. - /// It splits off the orginal name, so it can be used later. - pub fn from_node(node: castorepb::node::Node) -> (Self, Bytes) { + pub fn from_node(node: &Node) -> Self { match node { - castorepb::node::Node::Directory(n) => ( - Self::Directory(DirectoryInodeData::Sparse( - n.digest.try_into().unwrap(), - n.size, - )), - n.name, - ), - castorepb::node::Node::File(n) => ( - Self::Regular(n.digest.try_into().unwrap(), n.size, n.executable), - n.name, - ), - castorepb::node::Node::Symlink(n) => (Self::Symlink(n.target), n.name), + Node::Directory { digest, size } => { + Self::Directory(DirectoryInodeData::Sparse(digest.clone(), *size)) + } + Node::File { + digest, + size, + executable, + } => Self::Regular(digest.clone(), *size, *executable), + Node::Symlink { target } => Self::Symlink(target.clone().into()), } } diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index b565ed60ac42..4f50868b8f44 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -15,15 +15,13 @@ use self::{ inode_tracker::InodeTracker, inodes::{DirectoryInodeData, InodeData}, }; -use crate::proto as castorepb; use crate::{ blobservice::{BlobReader, BlobService}, directoryservice::DirectoryService, - proto::{node::Node, NamedNode}, - B3Digest, + path::PathComponent, + B3Digest, Node, }; use bstr::ByteVec; -use bytes::Bytes; use fuse_backend_rs::abi::fuse_abi::{stat64, OpenOptions}; use fuse_backend_rs::api::filesystem::{ Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID, @@ -89,7 +87,7 @@ pub struct TvixStoreFs<BS, DS, RN> { show_xattr: bool, /// This maps a given basename in the root to the inode we allocated for the node. - root_nodes: RwLock<HashMap<Bytes, u64>>, + root_nodes: RwLock<HashMap<PathComponent, u64>>, /// This keeps track of inodes and data alongside them. inode_tracker: RwLock<InodeTracker>, @@ -105,7 +103,7 @@ pub struct TvixStoreFs<BS, DS, RN> { u64, ( Span, - Arc<Mutex<mpsc::Receiver<(usize, Result<Node, crate::Error>)>>>, + Arc<Mutex<mpsc::Receiver<(usize, Result<(PathComponent, Node), crate::Error>)>>>, ), >, >, @@ -123,8 +121,8 @@ pub struct TvixStoreFs<BS, DS, RN> { impl<BS, DS, RN> TvixStoreFs<BS, DS, RN> where - BS: AsRef<dyn BlobService> + Clone + Send, - DS: AsRef<dyn DirectoryService> + Clone + Send + 'static, + BS: BlobService + Clone + Send, + DS: DirectoryService + Clone + Send + 'static, RN: RootNodes + Clone + 'static, { pub fn new( @@ -156,7 +154,7 @@ where /// Retrieves the inode for a given root node basename, if present. /// This obtains a read lock on self.root_nodes. 
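The from_node change above ripples through every caller: it now borrows a nameless Node, and the name stays with the caller instead of being split back out. A hedged sketch of the resulting caller pattern (the function and its signature are hypothetical; InodeData's module path is crate-internal):

use crate::fs::inodes::InodeData; // module path assumed; it is crate-internal
use crate::{Node, PathComponent};

// Hypothetical caller: derive inode payloads for named children, keeping each
// name alongside the inode number instead of inside the node.
fn to_children(
    mut next_ino: u64,
    children: impl Iterator<Item = (PathComponent, Node)>,
) -> Vec<(u64, PathComponent, InodeData)> {
    children
        .map(|(name, node)| {
            let ino = next_ino;
            next_ino += 1;
            // from_node only borrows the node now; no name comes back.
            (ino, name, InodeData::from_node(&node))
        })
        .collect()
}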
- fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> { + fn get_inode_for_root_name(&self, name: &PathComponent) -> Option<u64> { self.root_nodes.read().get(name).cloned() } @@ -167,8 +165,12 @@ where /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup /// in self.directory_service is performed, and self.inode_tracker is updated with the /// [DirectoryInodeData::Populated]. + #[allow(clippy::type_complexity)] #[instrument(skip(self), err)] - fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> { + fn get_directory_children( + &self, + ino: u64, + ) -> io::Result<(B3Digest, Vec<(u64, PathComponent, Node)>)> { let data = self.inode_tracker.read().get(ino).unwrap(); match *data { // if it's populated already, return children. @@ -184,7 +186,7 @@ where .block_on({ let directory_service = self.directory_service.clone(); let parent_digest = parent_digest.to_owned(); - async move { directory_service.as_ref().get(&parent_digest).await } + async move { directory_service.get(&parent_digest).await } })? .ok_or_else(|| { warn!(directory.digest=%parent_digest, "directory not found"); @@ -198,13 +200,13 @@ where let children = { let mut inode_tracker = self.inode_tracker.write(); - let children: Vec<(u64, castorepb::node::Node)> = directory - .nodes() - .map(|child_node| { - let (inode_data, _) = InodeData::from_node(child_node.clone()); + let children: Vec<(u64, PathComponent, Node)> = directory + .into_nodes() + .map(|(child_name, child_node)| { + let inode_data = InodeData::from_node(&child_node); let child_ino = inode_tracker.put(inode_data); - (child_ino, child_node) + (child_ino, child_name, child_node) }) .collect(); @@ -238,12 +240,12 @@ where /// In the case the name can't be found, a libc::ENOENT is returned. fn name_in_root_to_ino_and_data( &self, - name: &std::ffi::CStr, + name: &PathComponent, ) -> io::Result<(u64, Arc<InodeData>)> { // Look up the inode for that root node. // If there's one, [self.inode_tracker] MUST also contain the data, // which we can then return. - if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) { + if let Some(inode) = self.get_inode_for_root_name(name) { return Ok(( inode, self.inode_tracker @@ -257,7 +259,8 @@ where // We don't have it yet, look it up in [self.root_nodes]. match self.tokio_handle.block_on({ let root_nodes_provider = self.root_nodes_provider.clone(); - async move { root_nodes_provider.get_by_basename(name.to_bytes()).await } + let name = name.clone(); + async move { root_nodes_provider.get_by_basename(&name).await } }) { // if there was an error looking up the root node, propagate up an IO error. Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)), @@ -265,15 +268,9 @@ where Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)), // The root node does exist Ok(Some(root_node)) => { - // The name must match what's passed in the lookup, otherwise this is also a ENOENT. - if root_node.get_name() != name.to_bytes() { - debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch"); - return Err(io::Error::from_raw_os_error(libc::ENOENT)); - } - // Let's check if someone else beat us to updating the inode tracker and // root_nodes map. This avoids locking inode_tracker for writing. 
- if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) { + if let Some(ino) = self.root_nodes.read().get(name) { return Ok(( *ino, self.inode_tracker.read().get(*ino).expect("must exist"), @@ -287,9 +284,9 @@ where // insert the (sparse) inode data and register in // self.root_nodes. - let (inode_data, name) = InodeData::from_node(root_node); + let inode_data = InodeData::from_node(&root_node); let ino = inode_tracker.put(inode_data.clone()); - root_nodes.insert(name, ino); + root_nodes.insert(name.to_owned(), ino); Ok((ino, Arc::new(inode_data))) } @@ -303,10 +300,22 @@ const ROOT_NODES_BUFFER_SIZE: usize = 16; const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.tvix.castore.directory.digest"; const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest"; +#[cfg(all(feature = "virtiofs", target_os = "linux"))] +impl<BS, DS, RN> fuse_backend_rs::api::filesystem::Layer for TvixStoreFs<BS, DS, RN> +where + BS: BlobService + Clone + Send + 'static, + DS: DirectoryService + Send + Clone + 'static, + RN: RootNodes + Clone + 'static, +{ + fn root_inode(&self) -> Self::Inode { + ROOT_ID + } +} + impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN> where - BS: AsRef<dyn BlobService> + Clone + Send + 'static, - DS: AsRef<dyn DirectoryService> + Send + Clone + 'static, + BS: BlobService + Clone + Send + 'static, + DS: DirectoryService + Send + Clone + 'static, RN: RootNodes + Clone + 'static, { type Handle = u64; @@ -345,13 +354,17 @@ where ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> { debug!("lookup"); + // convert the CStr to a PathComponent + // If it can't be converted, we definitely don't have anything here. + let name: PathComponent = name.try_into().map_err(|_| std::io::ErrorKind::NotFound)?; + // This goes from a parent inode to a node. // - If the parent is [ROOT_ID], we need to check // [self.root_nodes] (fetching from a [RootNode] provider if needed) // - Otherwise, lookup the parent in [self.inode_tracker] (which must be // a [InodeData::Directory]), and find the child with that name. if parent == ROOT_ID { - let (ino, inode_data) = self.name_in_root_to_ino_and_data(name)?; + let (ino, inode_data) = self.name_in_root_to_ino_and_data(&name)?; debug!(inode_data=?&inode_data, ino=ino, "Some"); return Ok(inode_data.as_fuse_entry(ino)); @@ -364,7 +377,7 @@ where // Search for that name in the list of children and return the FileAttrs. // in the children, find the one with the desired name. - if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) { + if let Some((child_ino, _, _)) = children.iter().find(|(_, n, _)| n == &name) { // lookup the child [InodeData] in [self.inode_tracker]. // We know the inodes for children have already been allocated. let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap(); @@ -400,8 +413,8 @@ where self.tokio_handle.spawn( async move { let mut stream = root_nodes_provider.list().enumerate(); - while let Some(node) = stream.next().await { - if tx.send(node).await.is_err() { + while let Some(e) = stream.next().await { + if tx.send(e).await.is_err() { // If we get a send error, it means the sync code // doesn't want any more entries. 
break; @@ -463,12 +476,12 @@ where .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; while let Some((i, n)) = rx.blocking_recv() { - let root_node = n.map_err(|e| { + let (name, node) = n.map_err(|e| { warn!("failed to retrieve root node: {}", e); io::Error::from_raw_os_error(libc::EIO) })?; - let (inode_data, name) = InodeData::from_node(root_node); + let inode_data = InodeData::from_node(&node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -483,7 +496,7 @@ where ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: name.as_ref(), })?; // If the buffer is full, add_entry will return `Ok(0)`. if written == 0 { @@ -497,15 +510,17 @@ where let (parent_digest, children) = self.get_directory_children(inode)?; Span::current().record("directory.digest", parent_digest.to_string()); - for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(child_node); + for (i, (ino, child_name, child_node)) in + children.into_iter().skip(offset as usize).enumerate() + { + let inode_data = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: child_name.as_ref(), })?; // If the buffer is full, add_entry will return `Ok(0)`. if written == 0 { @@ -550,12 +565,12 @@ where .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; while let Some((i, n)) = rx.blocking_recv() { - let root_node = n.map_err(|e| { + let (name, node) = n.map_err(|e| { warn!("failed to retrieve root node: {}", e); io::Error::from_raw_os_error(libc::EPERM) })?; - let (inode_data, name) = InodeData::from_node(root_node); + let inode_data = InodeData::from_node(&node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -571,7 +586,7 @@ where ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: name.as_ref(), }, inode_data.as_fuse_entry(ino), )?; @@ -587,8 +602,8 @@ where let (parent_digest, children) = self.get_directory_children(inode)?; Span::current().record("directory.digest", parent_digest.to_string()); - for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(child_node); + for (i, (ino, name, child_node)) in children.into_iter().skip(offset as usize).enumerate() { + let inode_data = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. 
let written = add_entry( @@ -596,7 +611,7 @@ where ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: name.as_ref(), }, inode_data.as_fuse_entry(ino), )?; @@ -641,6 +656,7 @@ where ) -> io::Result<( Option<Self::Handle>, fuse_backend_rs::api::filesystem::OpenOptions, + Option<u32>, )> { if inode == ROOT_ID { return Err(io::Error::from_raw_os_error(libc::ENOSYS)); @@ -659,7 +675,7 @@ where match self.tokio_handle.block_on({ let blob_service = self.blob_service.clone(); let blob_digest = blob_digest.clone(); - async move { blob_service.as_ref().open_read(&blob_digest).await } + async move { blob_service.open_read(&blob_digest).await } }) { Ok(None) => { warn!("blob not found"); @@ -684,6 +700,7 @@ where Ok(( Some(fh), fuse_backend_rs::api::filesystem::OpenOptions::empty(), + None, )) } } diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs index 6609e049a1fc..5ed1a4d8d6c0 100644 --- a/tvix/castore/src/fs/root_nodes.rs +++ b/tvix/castore/src/fs/root_nodes.rs @@ -1,7 +1,6 @@ use std::collections::BTreeMap; -use crate::{proto::node::Node, Error}; -use bytes::Bytes; +use crate::{path::PathComponent, Error, Node}; use futures::stream::BoxStream; use tonic::async_trait; @@ -11,11 +10,12 @@ use tonic::async_trait; pub trait RootNodes: Send + Sync { /// Looks up a root CA node based on the basename of the node in the root /// directory of the filesystem. - async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>; + async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error>; - /// Lists all root CA nodes in the filesystem. An error can be returned - /// in case listing is not allowed - fn list(&self) -> BoxStream<Result<Node, Error>>; + /// Lists all root CA nodes in the filesystem, as a tuple of (base)name + /// and Node. + /// An error can be returned in case listing is not allowed. + fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>>; } #[async_trait] @@ -23,15 +23,17 @@ pub trait RootNodes: Send + Sync { /// the key is the node name. 
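With the new trait shape, list() hands back the basename explicitly, and thanks to the blanket impl that follows, any AsRef<BTreeMap<PathComponent, Node>> (for example an Arc<BTreeMap<..>>) already satisfies RootNodes. A hedged sketch, assuming the trait is re-exported at this path:

use std::collections::BTreeMap;
use std::sync::Arc;

use crate::fs::RootNodes; // re-export path assumed
use crate::{Node, PathComponent};

async fn lookup_root(
    provider: Arc<BTreeMap<PathComponent, Node>>,
    name: &PathComponent,
) -> Option<Node> {
    // get_by_basename comes from the blanket RootNodes impl below.
    provider.get_by_basename(name).await.ok().flatten()
}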
impl<T> RootNodes for T where - T: AsRef<BTreeMap<Bytes, Node>> + Send + Sync, + T: AsRef<BTreeMap<PathComponent, Node>> + Send + Sync, { - async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> { + async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error> { Ok(self.as_ref().get(name).cloned()) } - fn list(&self) -> BoxStream<Result<Node, Error>> { + fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>> { Box::pin(tokio_stream::iter( - self.as_ref().iter().map(|(_, v)| Ok(v.clone())), + self.as_ref() + .iter() + .map(|(name, node)| Ok((name.to_owned(), node.to_owned()))), )) } } diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index cd5b1290e031..4cbff687b864 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -13,7 +13,7 @@ use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::import::{ingest_entries, IngestionEntry, IngestionError}; -use crate::proto::node::Node; +use crate::Node; use super::blobs::{self, ConcurrentBlobUploader}; @@ -292,44 +292,43 @@ impl IngestionEntryGraph { #[cfg(test)] mod test { - use crate::import::IngestionEntry; - use crate::B3Digest; + use std::sync::LazyLock; use super::{Error, IngestionEntryGraph}; + use crate::import::IngestionEntry; + use crate::B3Digest; - use lazy_static::lazy_static; use rstest::rstest; - lazy_static! { - pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into(); - pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { - path: "a".parse().unwrap() - }; - pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { - path: "b".parse().unwrap() - }; - pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { - path: "a/b".parse().unwrap() - }; - pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular { - path: "a".parse().unwrap(), - size: 0, - executable: false, - digest: EMPTY_DIGEST.clone(), - }; - pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular { - path: "a/b".parse().unwrap(), - size: 0, - executable: false, - digest: EMPTY_DIGEST.clone(), - }; - pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular { - path: "a/b/c".parse().unwrap(), - size: 0, - executable: false, - digest: EMPTY_DIGEST.clone(), - }; - } + pub static EMPTY_DIGEST: LazyLock<B3Digest> = + LazyLock::new(|| blake3::hash(&[]).as_bytes().into()); + pub static DIR_A: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir { + path: "a".parse().unwrap(), + }); + pub static DIR_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir { + path: "b".parse().unwrap(), + }); + pub static DIR_A_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir { + path: "a/b".parse().unwrap(), + }); + pub static FILE_A: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular { + path: "a".parse().unwrap(), + size: 0, + executable: false, + digest: EMPTY_DIGEST.clone(), + }); + pub static FILE_A_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular { + path: "a/b".parse().unwrap(), + size: 0, + executable: false, + digest: EMPTY_DIGEST.clone(), + }); + pub static FILE_A_B_C: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular { + path: "a/b/c".parse().unwrap(), + size: 0, + executable: false, + digest: EMPTY_DIGEST.clone(), + }); #[rstest] #[case::implicit_directories(&[&*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])] diff --git 
a/tvix/castore/src/import/blobs.rs b/tvix/castore/src/import/blobs.rs index 8135d871d6c0..f71ee1e63768 100644 --- a/tvix/castore/src/import/blobs.rs +++ b/tvix/castore/src/import/blobs.rs @@ -28,6 +28,9 @@ pub enum Error { #[error("unable to read blob contents for {0}: {1}")] BlobRead(PathBuf, std::io::Error), + #[error("unable to check whether blob at {0} already exists: {1}")] + BlobCheck(PathBuf, std::io::Error), + // FUTUREWORK: proper error for blob finalize #[error("unable to finalize blob {0}: {1}")] BlobFinalize(PathBuf, std::io::Error), @@ -118,6 +121,16 @@ where let path = path.to_owned(); let r = Cursor::new(buffer); async move { + // We know the blob digest already, check it exists before sending it. + if blob_service + .has(&expected_digest) + .await + .map_err(|e| Error::BlobCheck(path.clone(), e))? + { + drop(permit); + return Ok(()); + } + let digest = upload_blob(&blob_service, &path, expected_size, r).await?; assert_eq!(digest, expected_digest, "Tvix bug: blob digest mismatch"); diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index dc7821b8101e..78eac6f6128d 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -8,7 +8,9 @@ use std::os::unix::fs::MetadataExt; use std::os::unix::fs::PermissionsExt; use tokio::io::BufReader; use tokio_util::io::InspectReader; +use tracing::info_span; use tracing::instrument; +use tracing::Instrument; use tracing::Span; use tracing_indicatif::span_ext::IndicatifSpanExt; use walkdir::DirEntry; @@ -16,8 +18,8 @@ use walkdir::WalkDir; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; -use crate::proto::node::Node; -use crate::B3Digest; +use crate::refscan::{ReferenceReader, ReferenceScanner}; +use crate::{B3Digest, Node}; use super::ingest_entries; use super::IngestionEntry; @@ -30,20 +32,24 @@ use super::IngestionError; /// /// This function will walk the filesystem using `walkdir` and will consume /// `O(#number of entries)` space. -#[instrument(skip(blob_service, directory_service), fields(path, indicatif.pb_show=1), err)] -pub async fn ingest_path<BS, DS, P>( +#[instrument( + skip(blob_service, directory_service, reference_scanner), + fields(path), + err +)] +pub async fn ingest_path<BS, DS, P, P2>( blob_service: BS, directory_service: DS, path: P, + reference_scanner: Option<&ReferenceScanner<P2>>, ) -> Result<Node, IngestionError<Error>> where P: AsRef<std::path::Path> + std::fmt::Debug, BS: BlobService + Clone, DS: DirectoryService, + P2: AsRef<[u8]> + Send + Sync, { let span = Span::current(); - span.pb_set_message(&format!("Ingesting {:?}", path)); - span.pb_start(); let iter = WalkDir::new(path.as_ref()) .follow_links(false) @@ -51,7 +57,8 @@ where .contents_first(true) .into_iter(); - let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref()); + let entries = + dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref(), reference_scanner); ingest_entries( directory_service, entries.inspect({ @@ -72,14 +79,16 @@ where /// The produced stream is buffered, so uploads can happen concurrently. /// /// The root is the [Path] in the filesystem that is being ingested into the castore. 
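ingest_path gained an optional ReferenceScanner parameter here. Callers that don't scan pass None, and because nothing else constrains P2, the None needs an explicit type annotation. A hedged sketch of such a call (the path and wrapper function are hypothetical; the import path for ingest_path is assumed):

use crate::blobservice::BlobService;
use crate::directoryservice::DirectoryService;
use crate::import::fs::ingest_path; // module path assumed
use crate::refscan::ReferenceScanner;
use crate::Node;

async fn ingest_no_refscan<BS, DS>(blob_service: BS, directory_service: DS) -> Option<Node>
where
    BS: BlobService + Clone,
    DS: DirectoryService,
{
    ingest_path(
        blob_service,
        directory_service,
        "/some/path/to/ingest",
        // No scanner; the explicit annotation pins down the otherwise
        // unconstrained P2 type parameter.
        None::<&ReferenceScanner<&[u8]>>,
    )
    .await
    .ok()
}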
-pub fn dir_entries_to_ingestion_stream<'a, BS, I>( +pub fn dir_entries_to_ingestion_stream<'a, BS, I, P>( blob_service: BS, iter: I, root: &'a std::path::Path, + reference_scanner: Option<&'a ReferenceScanner<P>>, ) -> BoxStream<'a, Result<IngestionEntry, Error>> where BS: BlobService + Clone + 'a, I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a, + P: AsRef<[u8]> + Send + Sync, { let prefix = root.parent().unwrap_or_else(|| std::path::Path::new("")); @@ -90,7 +99,13 @@ where async move { match x { Ok(dir_entry) => { - dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await + dir_entry_to_ingestion_entry( + blob_service, + &dir_entry, + prefix, + reference_scanner, + ) + .await } Err(e) => Err(Error::Stat( prefix.to_path_buf(), @@ -108,13 +123,15 @@ where /// /// The prefix path is stripped from the path of each entry. This is usually the parent path /// of the path being ingested so that the last element of the stream only has one component. -pub async fn dir_entry_to_ingestion_entry<BS>( +pub async fn dir_entry_to_ingestion_entry<BS, P>( blob_service: BS, entry: &DirEntry, prefix: &std::path::Path, + reference_scanner: Option<&ReferenceScanner<P>>, ) -> Result<IngestionEntry, Error> where BS: BlobService, + P: AsRef<[u8]>, { let file_type = entry.file_type(); @@ -135,13 +152,25 @@ where .into_os_string() .into_vec(); + if let Some(reference_scanner) = &reference_scanner { + reference_scanner.scan(&target); + } + Ok(IngestionEntry::Symlink { path, target }) } else if file_type.is_file() { let metadata = entry .metadata() .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?; - let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?; + let digest = upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner) + .instrument({ + let span = info_span!("upload_blob", "indicatif.pb_show" = tracing::field::Empty); + span.pb_set_message(&format!("Uploading blob for {:?}", fs_path)); + span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); + + span + }) + .await?; Ok(IngestionEntry::Regular { path, @@ -157,17 +186,17 @@ where } /// Uploads the file at the provided [Path] the the [BlobService]. 
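dir_entry_to_ingestion_entry above threads the optional scanner down to upload_blob, which (in the next hunk) wraps the file reader in a ReferenceReader so scanning happens while the bytes stream through. A hedged sketch of that wrapping in isolation; ReferenceScanner::new taking the candidate needles is an assumption about the refscan module:

use tokio::io::{AsyncRead, AsyncReadExt};

use crate::refscan::{ReferenceReader, ReferenceScanner};

// Read all of `reader`, scanning the bytes that stream through for the given
// needles (e.g. store path hashes), in the same way upload_blob does.
async fn read_with_scan<R: AsyncRead + Unpin>(
    reader: R,
    needles: Vec<Vec<u8>>,
) -> std::io::Result<Vec<u8>> {
    let scanner = ReferenceScanner::new(needles); // constructor assumed
    let mut reader = ReferenceReader::new(&scanner, reader);
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    Ok(buf)
}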
-#[instrument(skip(blob_service), fields(path, indicatif.pb_show=1), err)] -async fn upload_blob<BS>( +#[instrument(skip(blob_service, reference_scanner), fields(path), err)] +async fn upload_blob<BS, P>( blob_service: BS, path: impl AsRef<std::path::Path>, + reference_scanner: Option<&ReferenceScanner<P>>, ) -> Result<B3Digest, Error> where BS: BlobService, + P: AsRef<[u8]>, { let span = Span::current(); - span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); - span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref())); span.pb_start(); let file = tokio::fs::File::open(path.as_ref()) @@ -185,9 +214,16 @@ where }); let mut writer = blob_service.open_write().await; - tokio::io::copy(&mut BufReader::new(reader), &mut writer) - .await - .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; + if let Some(reference_scanner) = reference_scanner { + let mut reader = ReferenceReader::new(reference_scanner, BufReader::new(reader)); + tokio::io::copy(&mut reader, &mut writer) + .await + .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; + } else { + tokio::io::copy(&mut BufReader::new(reader), &mut writer) + .await + .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; + } let digest = writer .close() diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index a9ac0be6b064..6e10a64939a4 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -4,15 +4,9 @@ //! Specific implementations, such as ingesting from the filesystem, live in //! child modules. -use crate::directoryservice::DirectoryPutter; -use crate::directoryservice::DirectoryService; +use crate::directoryservice::{DirectoryPutter, DirectoryService}; use crate::path::{Path, PathBuf}; -use crate::proto::node::Node; -use crate::proto::Directory; -use crate::proto::DirectoryNode; -use crate::proto::FileNode; -use crate::proto::SymlinkNode; -use crate::B3Digest; +use crate::{B3Digest, Directory, Node, SymlinkTargetError}; use futures::{Stream, StreamExt}; use tracing::Level; @@ -65,14 +59,6 @@ where // we break the loop manually. .expect("Tvix bug: unexpected end of stream")?; - let name = entry - .path() - .file_name() - // If this is the root node, it will have an empty name. - .unwrap_or_default() - .to_owned() - .into(); - let node = match &mut entry { IngestionEntry::Dir { .. } => { // If the entry is a directory, we traversed all its children (and @@ -98,27 +84,31 @@ where IngestionError::UploadDirectoryError(entry.path().to_owned(), e) })?; - Node::Directory(DirectoryNode { - name, - digest: directory_digest.into(), + Node::Directory { + digest: directory_digest, size: directory_size, - }) + } } - IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode { - name, - target: target.to_owned().into(), - }), + IngestionEntry::Symlink { ref target, .. } => Node::Symlink { + target: bytes::Bytes::copy_from_slice(target).try_into().map_err( + |e: SymlinkTargetError| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(format!("invalid symlink target: {}", e)), + ) + }, + )?, + }, IngestionEntry::Regular { size, executable, digest, .. 
- } => Node::File(FileNode { - name, - digest: digest.to_owned().into(), + } => Node::File { + digest: digest.clone(), size: *size, executable: *executable, - }), + }, }; let parent = entry @@ -129,8 +119,24 @@ where if parent == crate::Path::ROOT { break node; } else { + let name = entry + .path() + .file_name() + // If this is the root node, it will have an empty name. + .unwrap_or_else(|| "".try_into().unwrap()) + .to_owned(); + // record node in parent directory, creating a new [Directory] if not there yet. - directories.entry(parent.to_owned()).or_default().add(node); + directories + .entry(parent.to_owned()) + .or_default() + .add(name, node) + .map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?; } }; @@ -155,15 +161,8 @@ where #[cfg(debug_assertions)] { - if let Node::Directory(directory_node) = &root_node { - debug_assert_eq!( - root_directory_digest, - directory_node - .digest - .to_vec() - .try_into() - .expect("invalid digest len") - ) + if let Node::Directory { digest, .. } = &root_node { + debug_assert_eq!(&root_directory_digest, digest); } else { unreachable!("Tvix bug: directory putter initialized but no root directory node"); } @@ -209,9 +208,8 @@ mod test { use rstest::rstest; use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST}; - use crate::proto::node::Node; - use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode}; use crate::{directoryservice::MemoryDirectoryService, fixtures::DUMMY_DIGEST}; + use crate::{Directory, Node}; use super::ingest_entries; use super::IngestionEntry; @@ -223,18 +221,18 @@ mod test { executable: true, digest: DUMMY_DIGEST.clone(), }], - Node::File(FileNode { name: "foo".into(), digest: DUMMY_DIGEST.clone().into(), size: 42, executable: true } - ))] + Node::File{digest: DUMMY_DIGEST.clone(), size: 42, executable: true} + )] #[case::single_symlink(vec![IngestionEntry::Symlink { path: "foo".parse().unwrap(), target: b"blub".into(), }], - Node::Symlink(SymlinkNode { name: "foo".into(), target: "blub".into()}) + Node::Symlink{target: "blub".try_into().unwrap()} )] #[case::single_dir(vec![IngestionEntry::Dir { path: "foo".parse().unwrap(), }], - Node::Directory(DirectoryNode { name: "foo".into(), digest: Directory::default().digest().into(), size: Directory::default().size()}) + Node::Directory{digest: Directory::default().digest(), size: Directory::default().size()} )] #[case::dir_with_keep(vec![ IngestionEntry::Regular { @@ -247,7 +245,7 @@ mod test { path: "foo".parse().unwrap(), }, ], - Node::Directory(DirectoryNode { name: "foo".into(), digest: DIRECTORY_WITH_KEEP.digest().into(), size: DIRECTORY_WITH_KEEP.size() }) + Node::Directory{ digest: DIRECTORY_WITH_KEEP.digest(), size: DIRECTORY_WITH_KEEP.size()} )] /// This is intentionally a bit unsorted, though it still satisfies all /// requirements we have on the order of elements in the stream. 
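The case list here is only "a bit unsorted" within those constraints: ingest_entries still requires every child before its parent, with the root entry last. A hedged sketch of driving it directly with such a stream (the error type on the stream items is chosen arbitrarily; the exact generic bounds of ingest_entries are assumed):

use futures::stream;

use crate::directoryservice::MemoryDirectoryService;
use crate::fixtures::DUMMY_DIGEST;
use crate::import::{ingest_entries, IngestionEntry};

async fn sketch_ingest() -> crate::Node {
    // Children first, root ("foo") last.
    let entries = vec![
        IngestionEntry::Regular {
            path: "foo/bar".parse().unwrap(),
            size: 42,
            executable: false,
            digest: DUMMY_DIGEST.clone(),
        },
        IngestionEntry::Dir {
            path: "foo".parse().unwrap(),
        },
    ];

    ingest_entries(
        MemoryDirectoryService::default(),
        stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
    )
    .await
    .expect("stream is ordered leaves-to-root")
}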
@@ -275,7 +273,7 @@ mod test { path: "blub".parse().unwrap(), }, ], - Node::Directory(DirectoryNode { name: "blub".into(), digest: DIRECTORY_COMPLICATED.digest().into(), size:DIRECTORY_COMPLICATED.size() }) + Node::Directory{ digest: DIRECTORY_COMPLICATED.digest(), size: DIRECTORY_COMPLICATED.size() } )] #[tokio::test] async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) { diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs index 4fca9801c97b..93d06fd4582d 100644 --- a/tvix/castore/src/lib.rs +++ b/tvix/castore/src/lib.rs @@ -6,19 +6,23 @@ pub mod blobservice; pub mod composition; pub mod directoryservice; pub mod fixtures; +pub mod refscan; #[cfg(feature = "fs")] pub mod fs; +mod nodes; +pub use nodes::*; + mod path; -pub use path::{Path, PathBuf}; +pub use path::{Path, PathBuf, PathComponent, PathComponentError}; pub mod import; pub mod proto; pub mod tonic; pub use digests::{B3Digest, B3_LEN}; -pub use errors::Error; +pub use errors::{DirectoryError, Error, ValidateNodeError}; pub use hashing_reader::{B3HashingReader, HashingReader}; #[cfg(test)] diff --git a/tvix/castore/src/nodes/directory.rs b/tvix/castore/src/nodes/directory.rs new file mode 100644 index 000000000000..f80e055dde80 --- /dev/null +++ b/tvix/castore/src/nodes/directory.rs @@ -0,0 +1,287 @@ +use std::collections::btree_map::{self, BTreeMap}; + +use crate::{errors::DirectoryError, path::PathComponent, proto, B3Digest, Node}; + +/// A Directory contains nodes, which can be Directory, File or Symlink nodes. +/// It attaches names to these nodes, which is the basename in that directory. +/// These names: +/// - MUST not contain slashes or null bytes +/// - MUST not be '.' or '..' +/// - MUST be unique across all three lists +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct Directory { + nodes: BTreeMap<PathComponent, Node>, +} + +impl Directory { + /// Constructs a new, empty Directory. + pub fn new() -> Self { + Directory { + nodes: BTreeMap::new(), + } + } + + /// Construct a [Directory] from tuples of name and [Node]. + /// + /// Inserting multiple elements with the same name will yield an error, as + /// well as exceeding the maximum size. + pub fn try_from_iter<T: IntoIterator<Item = (PathComponent, Node)>>( + iter: T, + ) -> Result<Directory, DirectoryError> { + let mut nodes = BTreeMap::new(); + + iter.into_iter().try_fold(0u64, |size, (name, node)| { + check_insert_node(size, &mut nodes, name, node) + })?; + + Ok(Self { nodes }) + } + + /// The size of a directory is the number of all regular and symlink elements, + /// the number of directory elements, and their size fields. + pub fn size(&self) -> u64 { + // It's impossible to create a Directory where the size overflows, because we + // check before every add() that the size won't overflow. + (self.nodes.len() as u64) + + self + .nodes() + .map(|(_name, n)| match n { + Node::Directory { size, .. } => 1 + size, + Node::File { .. } | Node::Symlink { .. } => 1, + }) + .sum::<u64>() + } + + /// Calculates the digest of a Directory, which is the blake3 hash of a + /// Directory protobuf message, serialized in protobuf canonical form. + pub fn digest(&self) -> B3Digest { + proto::Directory::from(self.clone()).digest() + } + + /// Allows iterating over all nodes (directories, files and symlinks) + /// For each, it returns a tuple of its name and node. + /// The elements are sorted by their names. 
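Validation now happens at construction time instead of via a separate proto-level validate() call: duplicate names and size overflows are rejected while the Directory is being built. A small sketch using only the API introduced in this file (digest fixture borrowed from the tests further down):

use crate::fixtures::DUMMY_DIGEST;
use crate::{Directory, Node};

fn sketch_directory() -> Directory {
    let d = Directory::try_from_iter([(
        "keep".try_into().unwrap(),
        Node::File {
            digest: DUMMY_DIGEST.clone(),
            size: 0,
            executable: false,
        },
    )])
    .expect("no duplicate names, no size overflow");

    // Derived values are methods on the already-validated structure.
    let _digest = d.digest(); // blake3 over the canonical proto encoding
    let _size = d.size(); // count of nodes reachable through this directory
    d
}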
+    pub fn nodes(&self) -> impl Iterator<Item = (&PathComponent, &Node)> + Send + Sync + '_ {
+        self.nodes.iter()
+    }
+
+    /// Dissolves a Directory into its individual names and nodes.
+    /// The elements are sorted by their names.
+    pub fn into_nodes(self) -> impl Iterator<Item = (PathComponent, Node)> + Send + Sync {
+        self.nodes.into_iter()
+    }
+
+    /// Adds the specified [Node] to the [Directory] with a given name.
+    ///
+    /// Inserting a node that already exists with the same name in the directory
+    /// will yield an error, as well as exceeding the maximum size.
+    ///
+    /// In case you want to construct a [Directory] from multiple elements, use
+    /// [try_from_iter] instead.
+    pub fn add(&mut self, name: PathComponent, node: Node) -> Result<(), DirectoryError> {
+        check_insert_node(self.size(), &mut self.nodes, name, node)?;
+        Ok(())
+    }
+}
+
+fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> {
+    iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i))
+}
+
+/// Helper function dealing with inserting nodes into the nodes [BTreeMap],
+/// after ensuring the new size doesn't overflow and the key doesn't exist already.
+///
+/// Returns the new total size, or an error.
+fn check_insert_node(
+    current_size: u64,
+    nodes: &mut BTreeMap<PathComponent, Node>,
+    name: PathComponent,
+    node: Node,
+) -> Result<u64, DirectoryError> {
+    // Check that even after adding this new directory entry, the size calculation will not
+    // overflow.
+    let new_size = checked_sum([
+        current_size,
+        1,
+        match node {
+            Node::Directory { size, .. } => size,
+            _ => 0,
+        },
+    ])
+    .ok_or(DirectoryError::SizeOverflow)?;
+
+    match nodes.entry(name) {
+        btree_map::Entry::Vacant(e) => {
+            e.insert(node);
+        }
+        btree_map::Entry::Occupied(occupied) => {
+            return Err(DirectoryError::DuplicateName(occupied.key().to_owned()))
+        }
+    }
+
+    Ok(new_size)
+}
+
+#[cfg(test)]
+mod test {
+    use super::{Directory, Node};
+    use crate::fixtures::DUMMY_DIGEST;
+    use crate::{DirectoryError, PathComponent};
+
+    #[test]
+    fn from_iter_single() {
+        Directory::try_from_iter([(
+            PathComponent::try_from("b").unwrap(),
+            Node::Directory {
+                digest: DUMMY_DIGEST.clone(),
+                size: 1,
+            },
+        )])
+        .unwrap();
+    }
+
+    #[test]
+    fn from_iter_multiple() {
+        let d = Directory::try_from_iter([
+            (
+                "b".try_into().unwrap(),
+                Node::Directory {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: 1,
+                },
+            ),
+            (
+                "a".try_into().unwrap(),
+                Node::Directory {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: 1,
+                },
+            ),
+            (
+                "z".try_into().unwrap(),
+                Node::Directory {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: 1,
+                },
+            ),
+            (
+                "f".try_into().unwrap(),
+                Node::File {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: 1,
+                    executable: true,
+                },
+            ),
+            (
+                "c".try_into().unwrap(),
+                Node::File {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: 1,
+                    executable: true,
+                },
+            ),
+            (
+                "g".try_into().unwrap(),
+                Node::File {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: 1,
+                    executable: true,
+                },
+            ),
+            (
+                "t".try_into().unwrap(),
+                Node::Symlink {
+                    target: "a".try_into().unwrap(),
+                },
+            ),
+            (
+                "o".try_into().unwrap(),
+                Node::Symlink {
+                    target: "a".try_into().unwrap(),
+                },
+            ),
+            (
+                "e".try_into().unwrap(),
+                Node::Symlink {
+                    target: "a".try_into().unwrap(),
+                },
+            ),
+        ])
+        .unwrap();
+
+        // Convert to proto struct and back to ensure we are not generating any invalid structures
+        crate::Directory::try_from(crate::proto::Directory::from(d))
+            .expect("directory should be valid");
+    }
+
+    #[test]
+    fn add_nodes_to_directory() {
+        let mut d = Directory::new();
+
+        d.add(
+            "b".try_into().unwrap(),
+            Node::Directory {
+                digest: DUMMY_DIGEST.clone(),
+                size: 1,
+            },
+        )
+        .unwrap();
+        d.add(
+            "a".try_into().unwrap(),
+            Node::Directory {
+                digest: DUMMY_DIGEST.clone(),
+                size: 1,
+            },
+        )
+        .unwrap();
+
+        // Convert to proto struct and back to ensure we are not generating any invalid structures
+        crate::Directory::try_from(crate::proto::Directory::from(d))
+            .expect("directory should be valid");
+    }
+
+    #[test]
+    fn validate_overflow() {
+        let mut d = Directory::new();
+
+        assert_eq!(
+            d.add(
+                "foo".try_into().unwrap(),
+                Node::Directory {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: u64::MAX
+                }
+            ),
+            Err(DirectoryError::SizeOverflow)
+        );
+    }
+
+    #[test]
+    fn add_duplicate_node_to_directory() {
+        let mut d = Directory::new();
+
+        d.add(
+            "a".try_into().unwrap(),
+            Node::Directory {
+                digest: DUMMY_DIGEST.clone(),
+                size: 1,
+            },
+        )
+        .unwrap();
+        assert_eq!(
+            format!(
+                "{}",
+                d.add(
+                    "a".try_into().unwrap(),
+                    Node::File {
+                        digest: DUMMY_DIGEST.clone(),
+                        size: 1,
+                        executable: true
+                    }
+                )
+                .expect_err("adding duplicate dir entry must fail")
+            ),
+            "\"a\" is a duplicate name"
+        );
+    }
+}
diff --git a/tvix/castore/src/nodes/mod.rs b/tvix/castore/src/nodes/mod.rs
new file mode 100644
index 000000000000..ac7aa1e666df
--- /dev/null
+++ b/tvix/castore/src/nodes/mod.rs
@@ -0,0 +1,48 @@
+//! This holds types describing nodes in the tvix-castore model.
+mod directory;
+mod symlink_target;
+
+use crate::B3Digest;
+pub use directory::Directory;
+pub use symlink_target::{SymlinkTarget, SymlinkTargetError};
+
+/// A Node is either a Directory, File or Symlink node.
+/// Nodes themselves don't have names; they are named either by being inside
+/// a [Directory], or by being a root node with its own name attached to it.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Node {
+    /// A Directory node is a pointer to a [Directory], by its [Directory::digest].
+    /// It also records a `size`.
+    /// Such a node is either an element of the [Directory] that contains it,
+    /// or a standalone root node.
+    Directory {
+        /// The blake3 hash of a Directory message, serialized in protobuf canonical form.
+        digest: B3Digest,
+        /// Number of child elements in the Directory referred to by `digest`.
+        /// Calculated by summing up the numbers of nodes, and for each directory,
+        /// its size field. Can be used for inode allocation.
+        /// This field is precisely as verifiable as any other Merkle tree edge.
+        /// Resolve `digest`, and you can compute it incrementally. Resolve the entire
+        /// tree, and you can fully compute it from scratch.
+        /// A credulous implementation won't reject an excessive size, but this is
+        /// harmless: you'll have some ordinals without nodes. Undersizing is obvious
+        /// and easy to reject: you won't have an ordinal for some nodes.
+        size: u64,
+    },
+    /// A File node represents a regular or executable file in a Directory or at the root.
+    File {
+        /// The blake3 digest of the file contents
+        digest: B3Digest,
+
+        /// The file content size
+        size: u64,
+
+        /// Whether the file is executable
+        executable: bool,
+    },
+    /// A Symlink node represents a symbolic link in a Directory or at the root.
+    Symlink {
+        /// The target of the symlink.
+ target: SymlinkTarget, + }, +} diff --git a/tvix/castore/src/nodes/symlink_target.rs b/tvix/castore/src/nodes/symlink_target.rs new file mode 100644 index 000000000000..e9a1a0bd05c2 --- /dev/null +++ b/tvix/castore/src/nodes/symlink_target.rs @@ -0,0 +1,223 @@ +use bstr::ByteSlice; +use std::fmt::{self, Debug, Display}; + +/// A wrapper type for symlink targets. +/// Internally uses a [bytes::Bytes], but disallows empty targets and those +/// containing null bytes. +#[repr(transparent)] +#[derive(Clone, PartialEq, Eq)] +pub struct SymlinkTarget { + inner: bytes::Bytes, +} + +/// The maximum length a symlink target can have. +/// Linux allows 4095 bytes here. +pub const MAX_TARGET_LEN: usize = 4095; + +impl AsRef<[u8]> for SymlinkTarget { + fn as_ref(&self) -> &[u8] { + self.inner.as_ref() + } +} + +impl From<SymlinkTarget> for bytes::Bytes { + fn from(value: SymlinkTarget) -> Self { + value.inner + } +} + +fn validate_symlink_target<B: AsRef<[u8]>>(symlink_target: B) -> Result<B, SymlinkTargetError> { + let v = symlink_target.as_ref(); + + if v.is_empty() { + return Err(SymlinkTargetError::Empty); + } + if v.len() > MAX_TARGET_LEN { + return Err(SymlinkTargetError::TooLong); + } + if v.contains(&0x00) { + return Err(SymlinkTargetError::Null); + } + + Ok(symlink_target) +} + +impl TryFrom<bytes::Bytes> for SymlinkTarget { + type Error = SymlinkTargetError; + + fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> { + if let Err(e) = validate_symlink_target(&value) { + return Err(SymlinkTargetError::Convert(value, Box::new(e))); + } + + Ok(Self { inner: value }) + } +} + +impl TryFrom<&'static [u8]> for SymlinkTarget { + type Error = SymlinkTargetError; + + fn try_from(value: &'static [u8]) -> Result<Self, Self::Error> { + if let Err(e) = validate_symlink_target(&value) { + return Err(SymlinkTargetError::Convert(value.into(), Box::new(e))); + } + + Ok(Self { + inner: bytes::Bytes::from_static(value), + }) + } +} + +impl TryFrom<&str> for SymlinkTarget { + type Error = SymlinkTargetError; + + fn try_from(value: &str) -> Result<Self, Self::Error> { + if let Err(e) = validate_symlink_target(value) { + return Err(SymlinkTargetError::Convert( + value.to_owned().into(), + Box::new(e), + )); + } + + Ok(Self { + inner: bytes::Bytes::copy_from_slice(value.as_bytes()), + }) + } +} + +impl Debug for SymlinkTarget { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Debug::fmt(self.inner.as_bstr(), f) + } +} + +impl Display for SymlinkTarget { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Display::fmt(self.inner.as_bstr(), f) + } +} + +/// Errors created when constructing / converting to [SymlinkTarget]. 
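A short sketch of the conversions defined above: unlike PathComponent, ".." and absolute targets are fine, while empty targets and embedded null bytes are rejected (the wrapper function is hypothetical):

use crate::SymlinkTarget;

fn sketch_targets() {
    // Relative targets, "..", and absolute paths are all valid.
    let t = SymlinkTarget::try_from("../store/somewhere").expect("valid target");
    assert_eq!(b"../store/somewhere".as_slice(), t.as_ref());

    // Empty targets and embedded null bytes are refused; the error carries
    // the offending bytes alongside the underlying cause.
    assert!(SymlinkTarget::try_from("").is_err());
    assert!(SymlinkTarget::try_from("foo\0bar").is_err());
}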
+#[derive(Debug, PartialEq, Eq, thiserror::Error)]
+#[cfg_attr(test, derive(Clone))]
+pub enum SymlinkTargetError {
+    #[error("cannot be empty")]
+    Empty,
+    #[error("cannot contain null bytes")]
+    Null,
+    #[error("cannot be over {} bytes long", MAX_TARGET_LEN)]
+    TooLong,
+    #[error("unable to convert '{:?}'", .0.as_bstr())]
+    Convert(bytes::Bytes, Box<Self>),
+}
+
+#[cfg(test)]
+mod tests {
+    use bytes::Bytes;
+    use rstest::rstest;
+
+    use super::validate_symlink_target;
+    use super::{SymlinkTarget, SymlinkTargetError};
+
+    #[rstest]
+    #[case::empty(b"", SymlinkTargetError::Empty)]
+    #[case::null(b"foo\0", SymlinkTargetError::Null)]
+    fn errors(#[case] v: &'static [u8], #[case] err: SymlinkTargetError) {
+        {
+            assert_eq!(
+                Err(err.clone()),
+                validate_symlink_target(v),
+                "validate_symlink_target must fail as expected"
+            );
+        }
+
+        let exp_err_v = Bytes::from_static(v);
+
+        // Bytes
+        {
+            let v = Bytes::from_static(v);
+            assert_eq!(
+                Err(SymlinkTargetError::Convert(
+                    exp_err_v.clone(),
+                    Box::new(err.clone())
+                )),
+                SymlinkTarget::try_from(v),
+                "conversion must fail as expected"
+            );
+        }
+        // &[u8]
+        {
+            assert_eq!(
+                Err(SymlinkTargetError::Convert(
+                    exp_err_v.clone(),
+                    Box::new(err.clone())
+                )),
+                SymlinkTarget::try_from(v),
+                "conversion must fail as expected"
+            );
+        }
+        // &str, if this is valid UTF-8
+        {
+            if let Ok(v) = std::str::from_utf8(v) {
+                assert_eq!(
+                    Err(SymlinkTargetError::Convert(
+                        exp_err_v.clone(),
+                        Box::new(err.clone())
+                    )),
+                    SymlinkTarget::try_from(v),
+                    "conversion must fail as expected"
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn error_toolong() {
+        assert_eq!(
+            Err(SymlinkTargetError::TooLong),
+            validate_symlink_target("X".repeat(5000).into_bytes().as_slice())
+        )
+    }
+
+    #[rstest]
+    #[case::boring(b"aa")]
+    #[case::dot(b".")]
+    #[case::dotsandslashes(b"./..")]
+    #[case::dotdot(b"..")]
+    #[case::slashes(b"a/b")]
+    #[case::slashes_and_absolute(b"/a/b")]
+    #[case::invalid_utf8(b"\xc5\xc4\xd6")]
+    fn success(#[case] v: &'static [u8]) {
+        let exp = SymlinkTarget { inner: v.into() };
+
+        // Bytes
+        {
+            let v: Bytes = v.into();
+            assert_eq!(
+                Ok(exp.clone()),
+                SymlinkTarget::try_from(v),
+                "conversion must succeed"
+            )
+        }
+
+        // &[u8]
+        {
+            assert_eq!(
+                Ok(exp.clone()),
+                SymlinkTarget::try_from(v),
+                "conversion must succeed"
+            )
+        }
+
+        // &str, if this is valid UTF-8
+        {
+            if let Ok(v) = std::str::from_utf8(v) {
+                assert_eq!(
+                    Ok(exp.clone()),
+                    SymlinkTarget::try_from(v),
+                    "conversion must succeed"
+                )
+            }
+        }
+    }
+}
diff --git a/tvix/castore/src/path/component.rs b/tvix/castore/src/path/component.rs
new file mode 100644
index 000000000000..78aca03c50fe
--- /dev/null
+++ b/tvix/castore/src/path/component.rs
@@ -0,0 +1,268 @@
+use bstr::ByteSlice;
+use std::fmt::{self, Debug, Display};
+
+/// A wrapper type for validated path components in the castore model.
+/// Internally uses a [bytes::Bytes], but disallows slashes and null bytes,
+/// as well as '.', '..' and the empty string.
+/// It also rejects components that are too long (> 255 bytes).
+#[repr(transparent)]
+#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
+pub struct PathComponent {
+    pub(super) inner: bytes::Bytes,
+}
+
+/// The maximum length an individual path component can have.
+/// Linux allows 255 bytes of actual name, so we pick that.
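The validation rules spelled out in the doc comment above, seen from the caller's side; a sketch using the TryFrom impls defined below (the wrapper function is hypothetical):

use crate::PathComponent;

fn sketch_names() {
    // Ordinary basenames of up to 255 bytes are accepted.
    assert!(PathComponent::try_from("foo.txt").is_ok());

    // Empty names, ".", "..", slashes, null bytes and over-long names are not.
    assert!(PathComponent::try_from("").is_err());
    assert!(PathComponent::try_from("..").is_err());
    assert!(PathComponent::try_from("a/b").is_err());
    assert!(PathComponent::try_from("X".repeat(256).as_str()).is_err());
}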
+pub const MAX_NAME_LEN: usize = 255; + +impl AsRef<[u8]> for PathComponent { + fn as_ref(&self) -> &[u8] { + self.inner.as_ref() + } +} + +impl From<PathComponent> for bytes::Bytes { + fn from(value: PathComponent) -> Self { + value.inner + } +} + +pub(super) fn validate_name<B: AsRef<[u8]>>(name: B) -> Result<(), PathComponentError> { + match name.as_ref() { + b"" => Err(PathComponentError::Empty), + b".." => Err(PathComponentError::Parent), + b"." => Err(PathComponentError::CurDir), + v if v.len() > MAX_NAME_LEN => Err(PathComponentError::TooLong), + v if v.contains(&0x00) => Err(PathComponentError::Null), + v if v.contains(&b'/') => Err(PathComponentError::Slashes), + _ => Ok(()), + } +} + +impl TryFrom<bytes::Bytes> for PathComponent { + type Error = PathComponentError; + + fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> { + if let Err(e) = validate_name(&value) { + return Err(PathComponentError::Convert(value, Box::new(e))); + } + + Ok(Self { inner: value }) + } +} + +impl TryFrom<&'static [u8]> for PathComponent { + type Error = PathComponentError; + + fn try_from(value: &'static [u8]) -> Result<Self, Self::Error> { + if let Err(e) = validate_name(value) { + return Err(PathComponentError::Convert(value.into(), Box::new(e))); + } + + Ok(Self { + inner: bytes::Bytes::from_static(value), + }) + } +} + +impl TryFrom<&str> for PathComponent { + type Error = PathComponentError; + + fn try_from(value: &str) -> Result<Self, Self::Error> { + if let Err(e) = validate_name(value) { + return Err(PathComponentError::Convert( + value.to_owned().into(), + Box::new(e), + )); + } + + Ok(Self { + inner: bytes::Bytes::copy_from_slice(value.as_bytes()), + }) + } +} + +impl TryFrom<&std::ffi::CStr> for PathComponent { + type Error = PathComponentError; + + fn try_from(value: &std::ffi::CStr) -> Result<Self, Self::Error> { + let value = value.to_bytes(); + if let Err(e) = validate_name(value) { + return Err(PathComponentError::Convert( + value.to_owned().into(), + Box::new(e), + )); + } + + Ok(Self { + inner: bytes::Bytes::copy_from_slice(value), + }) + } +} + +impl Debug for PathComponent { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Debug::fmt(self.inner.as_bstr(), f) + } +} + +impl Display for PathComponent { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Display::fmt(self.inner.as_bstr(), f) + } +} + +/// Errors created when parsing / validating [PathComponent]. 
+#[derive(Debug, PartialEq, thiserror::Error)] +#[cfg_attr(test, derive(Clone))] +pub enum PathComponentError { + #[error("cannot be empty")] + Empty, + #[error("cannot contain null bytes")] + Null, + #[error("cannot be '.'")] + CurDir, + #[error("cannot be '..'")] + Parent, + #[error("cannot contain slashes")] + Slashes, + #[error("cannot be over {} bytes long", MAX_NAME_LEN)] + TooLong, + #[error("unable to convert '{:?}'", .0.as_bstr())] + Convert(bytes::Bytes, #[source] Box<Self>), +} + +#[cfg(test)] +mod tests { + use std::ffi::CString; + + use bytes::Bytes; + use rstest::rstest; + + use super::{validate_name, PathComponent, PathComponentError}; + + #[rstest] + #[case::empty(b"", PathComponentError::Empty)] + #[case::null(b"foo\0", PathComponentError::Null)] + #[case::curdir(b".", PathComponentError::CurDir)] + #[case::parent(b"..", PathComponentError::Parent)] + #[case::slashes1(b"a/b", PathComponentError::Slashes)] + #[case::slashes2(b"/", PathComponentError::Slashes)] + fn errors(#[case] v: &'static [u8], #[case] err: PathComponentError) { + { + assert_eq!( + Err(err.clone()), + validate_name(v), + "validate_name must fail as expected" + ); + } + + let exp_err_v = Bytes::from_static(v); + + // Bytes + { + let v = Bytes::from_static(v); + assert_eq!( + Err(PathComponentError::Convert( + exp_err_v.clone(), + Box::new(err.clone()) + )), + PathComponent::try_from(v), + "conversion must fail as expected" + ); + } + // &[u8] + { + assert_eq!( + Err(PathComponentError::Convert( + exp_err_v.clone(), + Box::new(err.clone()) + )), + PathComponent::try_from(v), + "conversion must fail as expected" + ); + } + // &str, if it is valid UTF-8 + { + if let Ok(v) = std::str::from_utf8(v) { + assert_eq!( + Err(PathComponentError::Convert( + exp_err_v.clone(), + Box::new(err.clone()) + )), + PathComponent::try_from(v), + "conversion must fail as expected" + ); + } + } + // &CStr, if it can be constructed (fails if the payload contains null bytes) + { + if let Ok(v) = CString::new(v) { + let v = v.as_ref(); + assert_eq!( + Err(PathComponentError::Convert( + exp_err_v.clone(), + Box::new(err.clone()) + )), + PathComponent::try_from(v), + "conversion must fail as expected" + ); + } + } + } + + #[test] + fn error_toolong() { + assert_eq!( + Err(PathComponentError::TooLong), + validate_name("X".repeat(500).into_bytes().as_slice()) + ) + } + + #[test] + fn success() { + let exp = PathComponent { inner: "aa".into() }; + + // Bytes + { + let v: Bytes = "aa".into(); + assert_eq!( + Ok(exp.clone()), + PathComponent::try_from(v), + "conversion must succeed" + ); + } + + // &[u8] + { + let v: &[u8] = b"aa"; + assert_eq!( + Ok(exp.clone()), + PathComponent::try_from(v), + "conversion must succeed" + ); + } + + // &str + { + let v: &str = "aa"; + assert_eq!( + Ok(exp.clone()), + PathComponent::try_from(v), + "conversion must succeed" + ); + } + + // &CStr + { + let v = CString::new("aa").expect("CString must construct"); + let v = v.as_c_str(); + assert_eq!( + Ok(exp.clone()), + PathComponent::try_from(v), + "conversion must succeed" + ); + } + } +} diff --git a/tvix/castore/src/path.rs b/tvix/castore/src/path/mod.rs index fcc2bd01fbd6..15f31a570da9 100644 --- a/tvix/castore/src/path.rs +++ b/tvix/castore/src/path/mod.rs @@ -1,5 +1,5 @@ //! Contains data structures to deal with Paths in the tvix-castore model. 
-
+use bstr::ByteSlice;
 use std::{
     borrow::Borrow,
     fmt::{self, Debug, Display},
@@ -8,9 +8,8 @@ use std::{
     str::FromStr,
 };
 
-use bstr::ByteSlice;
-
-use crate::proto::validate_node_name;
+mod component;
+pub use component::{PathComponent, PathComponentError};
 
 /// Represents a Path in the castore model.
 /// These are always relative, and platform-independent, which distinguishes
@@ -38,7 +37,9 @@ impl Path {
         if !bytes.is_empty() {
             // Ensure all components are valid castore node names.
             for component in bytes.split_str(b"/") {
-                validate_node_name(component).ok()?;
+                if component::validate_name(component).is_err() {
+                    return None;
+                }
             }
         }
 
@@ -81,10 +82,26 @@ impl Path {
         Ok(v)
     }
 
+    /// Provides an iterator over the components of the path,
+    /// which are individual [PathComponent].
+    /// In case the path is empty, an empty iterator is returned.
+    pub fn components(&self) -> impl Iterator<Item = PathComponent> + '_ {
+        let mut iter = self.inner.split_str(&b"/");
+
+        // We don't want to return an empty element, consume it if it's the only one.
+        if self.inner.is_empty() {
+            let _ = iter.next();
+        }
+
+        iter.map(|b| PathComponent {
+            inner: bytes::Bytes::copy_from_slice(b),
+        })
+    }
+
     /// Produces an iterator over the components of the path, which are
     /// individual byte slices.
     /// In case the path is empty, an empty iterator is returned.
-    pub fn components(&self) -> impl Iterator<Item = &[u8]> {
+    pub fn components_bytes(&self) -> impl Iterator<Item = &[u8]> {
         let mut iter = self.inner.split_str(&b"/");
 
         // We don't want to return an empty element, consume it if it's the only one.
@@ -95,11 +112,16 @@ impl Path {
         iter
     }
 
-    /// Returns the final component of the Path, if there is one.
-    pub fn file_name(&self) -> Option<&[u8]> {
+    /// Returns the final component of the Path, if there is one, as a [PathComponent].
+    pub fn file_name(&self) -> Option<PathComponent> {
         self.components().last()
     }
 
+    /// Returns the final component of the Path, if there is one, in bytes.
+    pub fn file_name_bytes(&self) -> Option<&[u8]> {
+        self.components_bytes().last()
+    }
+
     pub fn as_bytes(&self) -> &[u8] {
         &self.inner
     }
@@ -211,7 +233,9 @@ impl PathBuf {
     /// Adjoins `name` to self.
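Callers can now choose between validated owned components and raw byte slices; a sketch using the two iterators introduced above (the wrapper function is hypothetical):

use crate::PathBuf;

fn sketch_components() {
    let p: PathBuf = "a/b/c".parse().expect("valid castore path");

    // Owned, validated components...
    let names: Vec<String> = p.components().map(|c| c.to_string()).collect();
    assert_eq!(vec!["a", "b", "c"], names);

    // ...or raw byte slices when no allocation is wanted.
    assert_eq!(Some(b"c".as_slice()), p.file_name_bytes());
}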
pub fn try_push(&mut self, name: &[u8]) -> Result<(), std::io::Error> { - validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?; + if component::validate_name(name).is_err() { + return Err(std::io::ErrorKind::InvalidData.into()); + } if !self.inner.is_empty() { self.inner.push(b'/'); @@ -329,7 +353,7 @@ mod test { assert_eq!(s.as_bytes(), p.as_bytes(), "inner bytes mismatch"); assert_eq!( num_components, - p.components().count(), + p.components_bytes().count(), "number of components mismatch" ); } @@ -396,10 +420,10 @@ mod test { #[case("a", vec!["a"])] #[case("a/b", vec!["a", "b"])] #[case("a/b/c", vec!["a","b", "c"])] - pub fn components(#[case] p: PathBuf, #[case] exp_components: Vec<&str>) { + pub fn components_bytes(#[case] p: PathBuf, #[case] exp_components: Vec<&str>) { assert_eq!( exp_components, - p.components() + p.components_bytes() .map(|x| x.to_str().unwrap()) .collect::<Vec<_>>() ); diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs index ce1d2bcd244a..62fdb34a25a0 100644 --- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs @@ -1,7 +1,5 @@ -use crate::directoryservice::DirectoryGraph; -use crate::directoryservice::LeavesToRootValidator; -use crate::proto; -use crate::{directoryservice::DirectoryService, B3Digest}; +use crate::directoryservice::{DirectoryGraph, DirectoryService, LeavesToRootValidator}; +use crate::{proto, B3Digest, DirectoryError}; use futures::stream::BoxStream; use futures::TryStreamExt; use std::ops::Deref; @@ -58,13 +56,16 @@ where Status::not_found(format!("directory {} not found", digest)) })?; - Box::pin(once(Ok(directory))) + Box::pin(once(Ok(directory.into()))) } else { // If recursive was requested, traverse via get_recursive. Box::pin( - self.directory_service.get_recursive(&digest).map_err(|e| { - tonic::Status::new(tonic::Code::Internal, e.to_string()) - }), + self.directory_service + .get_recursive(&digest) + .map_ok(proto::Directory::from) + .map_err(|e| { + tonic::Status::new(tonic::Code::Internal, e.to_string()) + }), ) } })) @@ -83,7 +84,9 @@ where let mut validator = DirectoryGraph::<LeavesToRootValidator>::default(); while let Some(directory) = req_inner.message().await? { validator - .add(directory) + .add(directory.try_into().map_err(|e: DirectoryError| { + tonic::Status::new(tonic::Code::Internal, e.to_string()) + })?) .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?; } diff --git a/tvix/castore/src/proto/mod.rs b/tvix/castore/src/proto/mod.rs index a0cec896f753..89c68a4ad97b 100644 --- a/tvix/castore/src/proto/mod.rs +++ b/tvix/castore/src/proto/mod.rs @@ -1,18 +1,14 @@ -#![allow(non_snake_case)] -// https://github.com/hyperium/tonic/issues/1056 -use bstr::ByteSlice; -use std::{collections::HashSet, iter::Peekable, str}; - use prost::Message; +use std::cmp::Ordering; + mod grpc_blobservice_wrapper; mod grpc_directoryservice_wrapper; +use crate::{path::PathComponent, B3Digest, DirectoryError}; pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; -use crate::{B3Digest, B3_LEN}; - tonic::include_proto!("tvix.castore.v1"); #[cfg(feature = "tonic-reflection")] @@ -24,38 +20,6 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix #[cfg(test)] mod tests; -/// Errors that can occur during the validation of [Directory] messages. 
-#[derive(Debug, PartialEq, Eq, thiserror::Error)] -pub enum ValidateDirectoryError { - /// Elements are not in sorted order - #[error("{:?} is not sorted", .0.as_bstr())] - WrongSorting(Vec<u8>), - /// Multiple elements with the same name encountered - #[error("{:?} is a duplicate name", .0.as_bstr())] - DuplicateName(Vec<u8>), - /// Invalid node - #[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())] - InvalidNode(Vec<u8>, ValidateNodeError), - #[error("Total size exceeds u32::MAX")] - SizeOverflow, -} - -/// Errors that occur during Node validation -#[derive(Debug, PartialEq, Eq, thiserror::Error)] -pub enum ValidateNodeError { - #[error("No node set")] - NoNodeSet, - /// Invalid digest length encountered - #[error("Invalid Digest length: {0}")] - InvalidDigestLen(usize), - /// Invalid name encountered - #[error("Invalid name: {}", .0.as_bstr())] - InvalidName(Vec<u8>), - /// Invalid symlink target - #[error("Invalid symlink target: {}", .0.as_bstr())] - InvalidSymlinkTarget(Vec<u8>), -} - /// Errors that occur during StatBlobResponse validation #[derive(Debug, PartialEq, Eq, thiserror::Error)] pub enum ValidateStatBlobResponseError { @@ -64,186 +28,6 @@ pub enum ValidateStatBlobResponseError { InvalidDigestLen(usize, usize), } -/// Checks a Node name for validity as an intermediate node. -/// We disallow slashes, null bytes, '.', '..' and the empty string. -pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> { - if name.is_empty() - || name == b".." - || name == b"." - || name.contains(&0x00) - || name.contains(&b'/') - { - Err(ValidateNodeError::InvalidName(name.to_owned())) - } else { - Ok(()) - } -} - -/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode] -/// and [node::Node], so we can ask all of them for the name easily. -pub trait NamedNode { - fn get_name(&self) -> &[u8]; -} - -impl NamedNode for &FileNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for &DirectoryNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for &SymlinkNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for node::Node { - fn get_name(&self) -> &[u8] { - match self { - node::Node::File(node_file) => &node_file.name, - node::Node::Directory(node_directory) => &node_directory.name, - node::Node::Symlink(node_symlink) => &node_symlink.name, - } - } -} - -impl Node { - /// Ensures the node has a valid enum kind (is Some), and passes its - // per-enum validation. - // The inner root node is returned for easier consumption. - pub fn validate(&self) -> Result<&node::Node, ValidateNodeError> { - if let Some(node) = self.node.as_ref() { - node.validate()?; - Ok(node) - } else { - Err(ValidateNodeError::NoNodeSet) - } - } -} - -impl node::Node { - /// Returns the node with a new name. - pub fn rename(self, name: bytes::Bytes) -> Self { - match self { - node::Node::Directory(n) => node::Node::Directory(DirectoryNode { name, ..n }), - node::Node::File(n) => node::Node::File(FileNode { name, ..n }), - node::Node::Symlink(n) => node::Node::Symlink(SymlinkNode { name, ..n }), - } - } - - /// Ensures the node has a valid name, and checks the type-specific fields too. - pub fn validate(&self) -> Result<(), ValidateNodeError> { - match self { - // for a directory root node, ensure the digest has the appropriate size. 
- node::Node::Directory(directory_node) => { - if directory_node.digest.len() != B3_LEN { - Err(ValidateNodeError::InvalidDigestLen( - directory_node.digest.len(), - ))?; - } - validate_node_name(&directory_node.name) - } - // for a file root node, ensure the digest has the appropriate size. - node::Node::File(file_node) => { - if file_node.digest.len() != B3_LEN { - Err(ValidateNodeError::InvalidDigestLen(file_node.digest.len()))?; - } - validate_node_name(&file_node.name) - } - // ensure the symlink target is not empty and doesn't contain null bytes. - node::Node::Symlink(symlink_node) => { - if symlink_node.target.is_empty() || symlink_node.target.contains(&b'\0') { - Err(ValidateNodeError::InvalidSymlinkTarget( - symlink_node.target.to_vec(), - ))?; - } - validate_node_name(&symlink_node.name) - } - } - } -} - -impl PartialOrd for node::Node { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for node::Node { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for FileNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for FileNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for SymlinkNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for SymlinkNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for DirectoryNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for DirectoryNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -/// Accepts a name, and a mutable reference to the previous name. -/// If the passed name is larger than the previous one, the reference is updated. -/// If it's not, an error is returned. -fn update_if_lt_prev<'n>( - prev_name: &mut &'n [u8], - name: &'n [u8], -) -> Result<(), ValidateDirectoryError> { - if *name < **prev_name { - return Err(ValidateDirectoryError::WrongSorting(name.to_vec())); - } - *prev_name = name; - Ok(()) -} - -/// Inserts the given name into a HashSet if it's not already in there. -/// If it is, an error is returned. 
-fn insert_once<'n>( - seen_names: &mut HashSet<&'n [u8]>, - name: &'n [u8], -) -> Result<(), ValidateDirectoryError> { - if seen_names.get(name).is_some() { - return Err(ValidateDirectoryError::DuplicateName(name.to_vec())); - } - seen_names.insert(name); - Ok(()) -} - fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> { iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i)) } @@ -280,117 +64,233 @@ impl Directory { .as_bytes() .into() } +} - /// validate checks the directory for invalid data, such as: - /// - violations of name restrictions - /// - invalid digest lengths - /// - not properly sorted lists - /// - duplicate names in the three lists - pub fn validate(&self) -> Result<(), ValidateDirectoryError> { - let mut seen_names: HashSet<&[u8]> = HashSet::new(); - - let mut last_directory_name: &[u8] = b""; - let mut last_file_name: &[u8] = b""; - let mut last_symlink_name: &[u8] = b""; - - // check directories - for directory_node in &self.directories { - node::Node::Directory(directory_node.clone()) - .validate() - .map_err(|e| { - ValidateDirectoryError::InvalidNode(directory_node.name.to_vec(), e) - })?; - - update_if_lt_prev(&mut last_directory_name, &directory_node.name)?; - insert_once(&mut seen_names, &directory_node.name)?; - } +impl TryFrom<Directory> for crate::Directory { + type Error = DirectoryError; + + fn try_from(value: Directory) -> Result<Self, Self::Error> { + // Check directories, files and symlinks are sorted + // We'll notice duplicates across all three fields when constructing the Directory. + // FUTUREWORK: use is_sorted() once stable, and/or implement the producer for + // [crate::Directory::try_from_iter] iterating over all three and doing all checks inline. + value + .directories + .iter() + .try_fold(&b""[..], |prev_name, e| { + match e.name.as_ref().cmp(prev_name) { + Ordering::Less => Err(DirectoryError::WrongSorting(e.name.to_owned())), + Ordering::Equal => Err(DirectoryError::DuplicateName( + e.name + .to_owned() + .try_into() + .map_err(DirectoryError::InvalidName)?, + )), + Ordering::Greater => Ok(e.name.as_ref()), + } + })?; + value.files.iter().try_fold(&b""[..], |prev_name, e| { + match e.name.as_ref().cmp(prev_name) { + Ordering::Less => Err(DirectoryError::WrongSorting(e.name.to_owned())), + Ordering::Equal => Err(DirectoryError::DuplicateName( + e.name + .to_owned() + .try_into() + .map_err(DirectoryError::InvalidName)?, + )), + Ordering::Greater => Ok(e.name.as_ref()), + } + })?; + value.symlinks.iter().try_fold(&b""[..], |prev_name, e| { + match e.name.as_ref().cmp(prev_name) { + Ordering::Less => Err(DirectoryError::WrongSorting(e.name.to_owned())), + Ordering::Equal => Err(DirectoryError::DuplicateName( + e.name + .to_owned() + .try_into() + .map_err(DirectoryError::InvalidName)?, + )), + Ordering::Greater => Ok(e.name.as_ref()), + } + })?; - // check files - for file_node in &self.files { - node::Node::File(file_node.clone()) - .validate() - .map_err(|e| ValidateDirectoryError::InvalidNode(file_node.name.to_vec(), e))?; + // FUTUREWORK: use is_sorted() once stable, and/or implement the producer for + // [crate::Directory::try_from_iter] iterating over all three and doing all checks inline. 
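The three try_fold blocks above all perform the same strictly-increasing check; reduced to a self-contained sketch (check_sorted_unique is a hypothetical helper, not part of this diff):

    use std::cmp::Ordering;

    // Rejects out-of-order and duplicate names by threading the previous
    // name through the fold, exactly like the conversion above.
    fn check_sorted_unique<'a>(names: impl IntoIterator<Item = &'a [u8]>) -> Result<(), String> {
        names
            .into_iter()
            .try_fold(&b""[..], |prev, name| match name.cmp(prev) {
                Ordering::Less => Err(format!("{:?} is not sorted", name)),
                Ordering::Equal => Err(format!("{:?} is a duplicate name", name)),
                Ordering::Greater => Ok(name),
            })
            .map(|_| ())
    }

    fn main() {
        assert!(check_sorted_unique([&b"a"[..], b"b", b"c"]).is_ok());
        assert!(check_sorted_unique([&b"b"[..], b"a"]).is_err()); // unsorted
        assert!(check_sorted_unique([&b"a"[..], b"a"]).is_err()); // duplicate
    }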
+ let mut elems: Vec<(PathComponent, crate::Node)> = + Vec::with_capacity(value.directories.len() + value.files.len() + value.symlinks.len()); - update_if_lt_prev(&mut last_file_name, &file_node.name)?; - insert_once(&mut seen_names, &file_node.name)?; + for e in value.directories { + elems.push( + Node { + node: Some(node::Node::Directory(e)), + } + .try_into_name_and_node()?, + ); } - // check symlinks - for symlink_node in &self.symlinks { - node::Node::Symlink(symlink_node.clone()) - .validate() - .map_err(|e| ValidateDirectoryError::InvalidNode(symlink_node.name.to_vec(), e))?; + for e in value.files { + elems.push( + Node { + node: Some(node::Node::File(e)), + } + .try_into_name_and_node()?, + ) + } - update_if_lt_prev(&mut last_symlink_name, &symlink_node.name)?; - insert_once(&mut seen_names, &symlink_node.name)?; + for e in value.symlinks { + elems.push( + Node { + node: Some(node::Node::Symlink(e)), + } + .try_into_name_and_node()?, + ) } - self.size_checked() - .ok_or(ValidateDirectoryError::SizeOverflow)?; + crate::Directory::try_from_iter(elems) + } +} - Ok(()) +impl From<crate::Directory> for Directory { + fn from(value: crate::Directory) -> Self { + let mut directories = vec![]; + let mut files = vec![]; + let mut symlinks = vec![]; + + for (name, node) in value.into_nodes() { + match node { + crate::Node::File { + digest, + size, + executable, + } => files.push(FileNode { + name: name.into(), + digest: digest.into(), + size, + executable, + }), + crate::Node::Directory { digest, size } => directories.push(DirectoryNode { + name: name.into(), + digest: digest.into(), + size, + }), + crate::Node::Symlink { target } => { + symlinks.push(SymlinkNode { + name: name.into(), + target: target.into(), + }); + } + } + } + + Directory { + directories, + files, + symlinks, + } } +} - /// Allows iterating over all three nodes ([DirectoryNode], [FileNode], - /// [SymlinkNode]) in an ordered fashion, as long as the individual lists - /// are sorted (which can be checked by the [Directory::validate]). - pub fn nodes(&self) -> DirectoryNodesIterator { - return DirectoryNodesIterator { - i_directories: self.directories.iter().peekable(), - i_files: self.files.iter().peekable(), - i_symlinks: self.symlinks.iter().peekable(), - }; +impl Node { + /// Converts a proto [Node] to a [crate::Node], and splits off the name as a [PathComponent]. + pub fn try_into_name_and_node(self) -> Result<(PathComponent, crate::Node), DirectoryError> { + let (name_bytes, node) = self.try_into_unchecked_name_and_checked_node()?; + Ok(( + name_bytes.try_into().map_err(DirectoryError::InvalidName)?, + node, + )) } - /// Adds the specified [node::Node] to the [Directory], preserving sorted entries. - /// This assumes the [Directory] to be sorted prior to adding the node. - /// - /// Inserting an element that already exists with the same name in the directory is not - /// supported. 
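For context, a sketch of calling Directory::try_from_iter directly, under the assumption that it is public and accepts any IntoIterator of (PathComponent, Node) pairs, as its use above suggests:

    use tvix_castore::path::PathComponent;
    use tvix_castore::{Directory, DirectoryError, Node};

    fn build() -> Result<Directory, DirectoryError> {
        let name: PathComponent = "etc".try_into().map_err(DirectoryError::InvalidName)?;

        // Duplicate names (even across node types) are caught in here.
        Directory::try_from_iter([(
            name,
            Node::Symlink {
                target: "/etc".try_into().unwrap(),
            },
        )])
    }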
- pub fn add(&mut self, node: node::Node) { - debug_assert!( - !self.files.iter().any(|x| x.get_name() == node.get_name()), - "name already exists in files" - ); - debug_assert!( - !self - .directories - .iter() - .any(|x| x.get_name() == node.get_name()), - "name already exists in directories" - ); - debug_assert!( - !self - .symlinks - .iter() - .any(|x| x.get_name() == node.get_name()), - "name already exists in symlinks" - ); - - match node { - node::Node::File(node) => { - let pos = self - .files - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.files.insert(pos, node); + /// Converts a proto [Node] to a [crate::Node], and splits off the name as a + /// [bytes::Bytes] without doing any checking of it. + fn try_into_unchecked_name_and_checked_node( + self, + ) -> Result<(bytes::Bytes, crate::Node), DirectoryError> { + match self.node.ok_or_else(|| DirectoryError::NoNodeSet)? { + node::Node::Directory(n) => { + let digest = B3Digest::try_from(n.digest) + .map_err(|e| DirectoryError::InvalidNode(n.name.clone(), e.into()))?; + + let node = crate::Node::Directory { + digest, + size: n.size, + }; + + Ok((n.name, node)) } - node::Node::Directory(node) => { - let pos = self - .directories - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.directories.insert(pos, node); + node::Node::File(n) => { + let digest = B3Digest::try_from(n.digest) + .map_err(|e| DirectoryError::InvalidNode(n.name.clone(), e.into()))?; + + let node = crate::Node::File { + digest, + size: n.size, + executable: n.executable, + }; + + Ok((n.name, node)) } - node::Node::Symlink(node) => { - let pos = self - .symlinks - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.symlinks.insert(pos, node); + + node::Node::Symlink(n) => { + let node = crate::Node::Symlink { + target: n.target.try_into().map_err(|e| { + DirectoryError::InvalidNode( + n.name.clone(), + crate::ValidateNodeError::InvalidSymlinkTarget(e), + ) + })?, + }; + + Ok((n.name, node)) } } } + + /// Converts a proto [Node] to a [crate::Node], discarding its name. + /// + /// The name must be empty; a non-empty name is rejected with + /// [DirectoryError::NameInAnonymousNode]. + pub fn try_into_anonymous_node(self) -> Result<crate::Node, DirectoryError> { + let (name, node) = Self::try_into_unchecked_name_and_checked_node(self)?; + + if !name.is_empty() { + return Err(DirectoryError::NameInAnonymousNode); + } + + Ok(node) + } + + /// Constructs a [Node] from a name and [crate::Node]. + /// The name is a [bytes::Bytes], not a [PathComponent], as we have to use an + /// empty name in some places. + pub fn from_name_and_node(name: bytes::Bytes, n: crate::Node) -> Self { + match n { + crate::Node::Directory { digest, size } => Self { + node: Some(node::Node::Directory(DirectoryNode { + name, + digest: digest.into(), + size, + })), + }, + crate::Node::File { + digest, + size, + executable, + } => Self { + node: Some(node::Node::File(FileNode { + name, + digest: digest.into(), + size, + executable, + })), + }, + crate::Node::Symlink { target } => Self { + node: Some(node::Node::Symlink(SymlinkNode { + name, + target: target.into(), + })), + }, + } + } } impl StatBlobResponse { @@ -409,65 +309,3 @@ impl StatBlobResponse { Ok(()) } } - -/// Struct to hold the state of an iterator over all nodes of a Directory. -/// -/// Internally, this keeps peekable Iterators over all three lists of a -/// Directory message.
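A sketch of how the two conversion directions fit together; the crate paths are assumptions based on this diff:

    use tvix_castore::{proto, DirectoryError, Node};

    fn example(node: Node) -> Result<(), DirectoryError> {
        // Attach a name to obtain the wire representation. The name is plain
        // bytes, since some callers legitimately need an empty name here.
        let wire = proto::Node::from_name_and_node("README.md".into(), node);

        // The reverse direction validates: the name must be a well-formed
        // PathComponent, digests must have the right length, and symlink
        // targets must be non-empty.
        let (name, node) = wire.try_into_name_and_node()?;
        println!("{:?}", (name, node));
        Ok(())
    }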
-pub struct DirectoryNodesIterator<'a> { - // directory: &Directory, - i_directories: Peekable<std::slice::Iter<'a, DirectoryNode>>, - i_files: Peekable<std::slice::Iter<'a, FileNode>>, - i_symlinks: Peekable<std::slice::Iter<'a, SymlinkNode>>, -} - -/// looks at two elements implementing NamedNode, and returns true if "left -/// is smaller / comes first". -/// -/// Some(_) is preferred over None. -fn left_name_lt_right<A: NamedNode, B: NamedNode>(left: Option<&A>, right: Option<&B>) -> bool { - match left { - // if left is None, right always wins - None => false, - Some(left_inner) => { - // left is Some. - match right { - // left is Some, right is None - left wins. - None => true, - Some(right_inner) => { - // both are Some - compare the name. - return left_inner.get_name() < right_inner.get_name(); - } - } - } - } -} - -impl Iterator for DirectoryNodesIterator<'_> { - type Item = node::Node; - - // next returns the next node in the Directory. - // we peek at all three internal iterators, and pick the one with the - // smallest name, to ensure lexicographical ordering. - // The individual lists are already known to be sorted. - fn next(&mut self) -> Option<Self::Item> { - if left_name_lt_right(self.i_directories.peek(), self.i_files.peek()) { - // i_directories is still in the game, compare with symlinks - if left_name_lt_right(self.i_directories.peek(), self.i_symlinks.peek()) { - self.i_directories - .next() - .cloned() - .map(node::Node::Directory) - } else { - self.i_symlinks.next().cloned().map(node::Node::Symlink) - } - } else { - // i_files is still in the game, compare with symlinks - if left_name_lt_right(self.i_files.peek(), self.i_symlinks.peek()) { - self.i_files.next().cloned().map(node::Node::File) - } else { - self.i_symlinks.next().cloned().map(node::Node::Symlink) - } - } - } -} diff --git a/tvix/castore/src/proto/tests/directory.rs b/tvix/castore/src/proto/tests/directory.rs index 81b73a048d52..efbc4e9f2af1 100644 --- a/tvix/castore/src/proto/tests/directory.rs +++ b/tvix/castore/src/proto/tests/directory.rs @@ -1,7 +1,5 @@ -use crate::proto::{ - node, Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError, - ValidateNodeError, -}; +use crate::proto::{Directory, DirectoryError, DirectoryNode, FileNode, SymlinkNode}; +use crate::ValidateNodeError; use hex_literal::hex; @@ -149,7 +147,7 @@ fn digest() { #[test] fn validate_empty() { let d = Directory::default(); - assert_eq!(d.validate(), Ok(())); + assert!(crate::Directory::try_from(d).is_ok()); } #[test] @@ -157,18 +155,15 @@ fn validate_invalid_names() { { let d = Directory { directories: vec![DirectoryNode { - name: "".into(), + name: b"\0"[..].into(), digest: DUMMY_DIGEST.to_vec().into(), size: 42, }], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { - assert_eq!(n, b"") - } - _ => panic!("unexpected error"), - }; + + let e = crate::Directory::try_from(d).expect_err("must fail"); + assert!(matches!(e, DirectoryError::InvalidName(_))); } { @@ -180,12 +175,8 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { - assert_eq!(n, b".") - } - _ => panic!("unexpected error"), - }; + let e = crate::Directory::try_from(d).expect_err("must fail"); + assert!(matches!(e, DirectoryError::InvalidName(_))); } { @@ -198,12 +189,8 @@ fn validate_invalid_names() { }], 
..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { - assert_eq!(n, b"..") - } - _ => panic!("unexpected error"), - }; + let e = crate::Directory::try_from(d).expect_err("must fail"); + assert!(matches!(e, DirectoryError::InvalidName(_))); } { @@ -214,12 +201,8 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { - assert_eq!(n, b"\x00") - } - _ => panic!("unexpected error"), - }; + let e = crate::Directory::try_from(d).expect_err("must fail"); + assert!(matches!(e, DirectoryError::InvalidName(_))); } { @@ -230,12 +213,20 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { - assert_eq!(n, b"foo/bar") - } - _ => panic!("unexpected error"), + let e = crate::Directory::try_from(d).expect_err("must fail"); + assert!(matches!(e, DirectoryError::InvalidName(_))); + } + + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: bytes::Bytes::copy_from_slice("X".repeat(500).into_bytes().as_slice()), + target: "foo".into(), + }], + ..Default::default() }; + let e = crate::Directory::try_from(d).expect_err("must fail"); + assert!(matches!(e, DirectoryError::InvalidName(_))); } } @@ -249,8 +240,8 @@ fn validate_invalid_digest() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => { + match crate::Directory::try_from(d).expect_err("must fail") { + DirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => { assert_eq!(n, 2) } _ => panic!("unexpected error"), @@ -276,15 +267,15 @@ fn validate_sorting() { ], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::WrongSorting(s) => { - assert_eq!(s, b"a"); + match crate::Directory::try_from(d).expect_err("must fail") { + DirectoryError::WrongSorting(s) => { + assert_eq!(s.as_ref(), b"a"); } _ => panic!("unexpected error"), } } - // "a" exists twice, bad. + // "a" exists twice (same types), bad. { let d = Directory { directories: vec![ @@ -301,9 +292,31 @@ fn validate_sorting() { ], ..Default::default() }; - match d.validate().expect_err("must fail") { - ValidateDirectoryError::DuplicateName(s) => { - assert_eq!(s, b"a"); + match crate::Directory::try_from(d).expect_err("must fail") { + DirectoryError::DuplicateName(s) => { + assert_eq!(s.as_ref(), b"a"); + } + _ => panic!("unexpected error"), + } + } + + // "a" exists twice (different types), bad. + { + let d = Directory { + directories: vec![DirectoryNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }], + symlinks: vec![SymlinkNode { + name: "a".into(), + target: "b".into(), + }], + ..Default::default() + }; + match crate::Directory::try_from(d).expect_err("must fail") { + DirectoryError::DuplicateName(s) => { + assert_eq!(s.as_ref(), b"a"); } _ => panic!("unexpected error"), } @@ -327,7 +340,7 @@ fn validate_sorting() { ..Default::default() }; - d.validate().expect("validate shouldn't error"); + crate::Directory::try_from(d).expect("validate shouldn't error"); } // [b, c] and [a] are both properly sorted. 
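As the updated tests show, validation now happens as part of the conversion to the stronger type; a minimal sketch (crate paths assumed):

    use tvix_castore::proto;

    // Sorting, duplicate names (across all three lists), digest lengths,
    // name validity and symlink targets are all checked during try_from.
    fn validate(d: proto::Directory) -> Result<tvix_castore::Directory, tvix_castore::DirectoryError> {
        tvix_castore::Directory::try_from(d)
    }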
@@ -352,101 +365,6 @@ fn validate_sorting() { ..Default::default() }; - d.validate().expect("validate shouldn't error"); + crate::Directory::try_from(d).expect("validate shouldn't error"); } } - -#[test] -fn validate_overflow() { - let d = Directory { - directories: vec![DirectoryNode { - name: "foo".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: u64::MAX, - }], - ..Default::default() - }; - - match d.validate().expect_err("must fail") { - ValidateDirectoryError::SizeOverflow => {} - _ => panic!("unexpected error"), - } -} - -#[test] -fn add_nodes_to_directory() { - let mut d = Directory { - ..Default::default() - }; - - d.add(node::Node::Directory(DirectoryNode { - name: "b".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::Directory(DirectoryNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::Directory(DirectoryNode { - name: "z".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - - d.add(node::Node::File(FileNode { - name: "f".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - d.add(node::Node::File(FileNode { - name: "c".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - d.add(node::Node::File(FileNode { - name: "g".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - - d.add(node::Node::Symlink(SymlinkNode { - name: "t".into(), - target: "a".into(), - })); - d.add(node::Node::Symlink(SymlinkNode { - name: "o".into(), - target: "a".into(), - })); - d.add(node::Node::Symlink(SymlinkNode { - name: "e".into(), - target: "a".into(), - })); - - d.validate().expect("directory should be valid"); -} - -#[test] -#[cfg_attr(not(debug_assertions), ignore)] -#[should_panic = "name already exists in directories"] -fn add_duplicate_node_to_directory_panics() { - let mut d = Directory { - ..Default::default() - }; - - d.add(node::Node::Directory(DirectoryNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::File(FileNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); -} diff --git a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs deleted file mode 100644 index 68f147a33210..000000000000 --- a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::proto::Directory; -use crate::proto::DirectoryNode; -use crate::proto::FileNode; -use crate::proto::NamedNode; -use crate::proto::SymlinkNode; - -#[test] -fn iterator() { - let d = Directory { - directories: vec![ - DirectoryNode { - name: "c".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "d".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "h".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "l".into(), - ..DirectoryNode::default() - }, - ], - files: vec![ - FileNode { - name: "b".into(), - ..FileNode::default() - }, - FileNode { - name: "e".into(), - ..FileNode::default() - }, - FileNode { - name: "g".into(), - ..FileNode::default() - }, - FileNode { - name: "j".into(), - ..FileNode::default() - }, - ], - symlinks: vec![ - SymlinkNode { - name: "a".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "f".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "i".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "k".into(), - 
..SymlinkNode::default() - }, - ], - }; - - // We keep this strings here and convert to string to make the comparison - // less messy. - let mut node_names: Vec<String> = vec![]; - - for node in d.nodes() { - node_names.push(String::from_utf8(node.get_name().to_vec()).unwrap()); - } - - assert_eq!( - vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"], - node_names - ); -} diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs index 8d903bacb6c5..9f6330914bff 100644 --- a/tvix/castore/src/proto/tests/mod.rs +++ b/tvix/castore/src/proto/tests/mod.rs @@ -1,2 +1,47 @@ +use super::{node, Node, SymlinkNode}; +use crate::DirectoryError; + mod directory; -mod directory_nodes_iterator; + +/// Create a node with an empty symlink target, and ensure it fails validation. +#[test] +fn convert_symlink_empty_target_invalid() { + Node { + node: Some(node::Node::Symlink(SymlinkNode { + name: "foo".into(), + target: "".into(), + })), + } + .try_into_name_and_node() + .expect_err("must fail validation"); +} + +/// Create a node with a symlink target including null bytes, and ensure it +/// fails validation. +#[test] +fn convert_symlink_target_null_byte_invalid() { + Node { + node: Some(node::Node::Symlink(SymlinkNode { + name: "foo".into(), + target: "foo\0".into(), + })), + } + .try_into_name_and_node() + .expect_err("must fail validation"); +} + +/// Create a node with a name, and ensure converting it into an anonymous node fails. +#[test] +fn convert_anonymous_with_name_fail() { + assert_eq!( + DirectoryError::NameInAnonymousNode, + Node { + node: Some(node::Node::Symlink(SymlinkNode { + name: "foo".into(), + target: "somewhereelse".into(), + })), + } + .try_into_anonymous_node() + .expect_err("must fail") + ) +} diff --git a/tvix/castore/src/refscan.rs b/tvix/castore/src/refscan.rs new file mode 100644 index 000000000000..352593020472 --- /dev/null +++ b/tvix/castore/src/refscan.rs @@ -0,0 +1,354 @@ +//! Simple scanner for non-overlapping, known references of Nix store paths in a +//! given string. +//! +//! This is used for determining build references (see +//! //tvix/eval/docs/build-references.md for more details). +//! +//! The scanner itself uses the Wu-Manber string-matching algorithm, using +//! our fork of the `wu-manber` crate. +use pin_project::pin_project; +use std::collections::BTreeSet; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::task::{ready, Poll}; +use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; +use wu_manber::TwoByteWM; + +/// A searcher that encapsulates the candidates and the Wu-Manber searcher. +/// This is separate from the scanner because we need to look for the same +/// pattern in multiple outputs and don't want to pay the price of constructing +/// the searcher for each build output. +pub struct ReferencePatternInner<P> { + candidates: Vec<P>, + longest_candidate: usize, + // FUTUREWORK: Support overlapping patterns to be compatible with cpp Nix + searcher: Option<TwoByteWM>, +} + +#[derive(Clone)] +pub struct ReferencePattern<P> { + inner: Arc<ReferencePatternInner<P>>, +} + +impl<P> ReferencePattern<P> { + pub fn candidates(&self) -> &[P] { + &self.inner.candidates + } + + pub fn longest_candidate(&self) -> usize { + self.inner.longest_candidate + } +} + +impl<P: AsRef<[u8]>> ReferencePattern<P> { + /// Construct a new `ReferencePattern` that knows how to scan for the given + /// candidates.
+ pub fn new(candidates: Vec<P>) -> Self { + let searcher = if candidates.is_empty() { + None + } else { + Some(TwoByteWM::new(&candidates)) + }; + let longest_candidate = candidates.iter().fold(0, |v, c| v.max(c.as_ref().len())); + + ReferencePattern { + inner: Arc::new(ReferencePatternInner { + searcher, + candidates, + longest_candidate, + }), + } + } +} + +impl<P> From<Vec<P>> for ReferencePattern<P> +where + P: AsRef<[u8]>, +{ + fn from(candidates: Vec<P>) -> Self { + Self::new(candidates) + } +} + +/// Represents a "primed" reference scanner with an automaton that knows the set +/// of byte patterns to scan for. +pub struct ReferenceScanner<P> { + pattern: ReferencePattern<P>, + matches: Vec<AtomicBool>, +} + +impl<P: AsRef<[u8]>> ReferenceScanner<P> { + /// Construct a new `ReferenceScanner` that knows how to scan for the given + /// candidate byte patterns. + pub fn new<IP: Into<ReferencePattern<P>>>(pattern: IP) -> Self { + let pattern = pattern.into(); + let mut matches = Vec::new(); + for _ in 0..pattern.candidates().len() { + matches.push(AtomicBool::new(false)); + } + ReferenceScanner { pattern, matches } + } + + /// Scan the given buffer for all non-overlapping matches and collect them + /// in the scanner. + pub fn scan<S: AsRef<[u8]>>(&self, haystack: S) { + if haystack.as_ref().len() < self.pattern.longest_candidate() { + return; + } + + if let Some(searcher) = &self.pattern.inner.searcher { + for m in searcher.find(haystack) { + self.matches[m.pat_idx].store(true, Ordering::Release); + } + } + } + + pub fn pattern(&self) -> &ReferencePattern<P> { + &self.pattern + } + + pub fn matches(&self) -> Vec<bool> { + self.matches + .iter() + .map(|m| m.load(Ordering::Acquire)) + .collect() + } + + pub fn candidate_matches(&self) -> impl Iterator<Item = &P> { + let candidates = self.pattern.candidates(); + self.matches.iter().enumerate().filter_map(|(idx, found)| { + if found.load(Ordering::Acquire) { + Some(&candidates[idx]) + } else { + None + } + }) + } +} + +impl<P: Clone + Ord + AsRef<[u8]>> ReferenceScanner<P> { + /// Finalise the reference scanner and return the resulting matches.
+ pub fn finalise(self) -> BTreeSet<P> { + self.candidate_matches().cloned().collect() + } +} + +const DEFAULT_BUF_SIZE: usize = 8 * 1024; + +#[pin_project] +pub struct ReferenceReader<'a, P, R> { + scanner: &'a ReferenceScanner<P>, + buffer: Vec<u8>, + consumed: usize, + #[pin] + reader: R, +} + +impl<'a, P, R> ReferenceReader<'a, P, R> +where + P: AsRef<[u8]>, +{ + pub fn new(scanner: &'a ReferenceScanner<P>, reader: R) -> Self { + Self::with_capacity(DEFAULT_BUF_SIZE, scanner, reader) + } + + pub fn with_capacity(capacity: usize, scanner: &'a ReferenceScanner<P>, reader: R) -> Self { + // If capacity is not at least as long as longest_candidate we can't do a scan + let capacity = capacity.max(scanner.pattern().longest_candidate()); + ReferenceReader { + scanner, + buffer: Vec::with_capacity(capacity), + consumed: 0, + reader, + } + } +} + +impl<'a, P, R> AsyncRead for ReferenceReader<'a, P, R> +where + R: AsyncRead, + P: AsRef<[u8]>, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<std::io::Result<()>> { + let internal_buf = ready!(self.as_mut().poll_fill_buf(cx))?; + let amt = buf.remaining().min(internal_buf.len()); + buf.put_slice(&internal_buf[..amt]); + self.consume(amt); + Poll::Ready(Ok(())) + } +} + +impl<'a, P, R> AsyncBufRead for ReferenceReader<'a, P, R> +where + R: AsyncRead, + P: AsRef<[u8]>, +{ + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<std::io::Result<&[u8]>> { + #[allow(clippy::manual_saturating_arithmetic)] // for clarity + let overlap = self + .scanner + .pattern + .longest_candidate() + .checked_sub(1) + // If this overflows (longest_candidate = 0), that means there are no needles, + // so there is no need to have any overlap + .unwrap_or(0); + let mut this = self.project(); + // Still data in buffer + if *this.consumed < this.buffer.len() { + return Poll::Ready(Ok(&this.buffer[*this.consumed..])); + } + // We need to copy last `overlap` bytes to front to deal with references that overlap reads + if *this.consumed > overlap { + let start = this.buffer.len() - overlap; + this.buffer.copy_within(start.., 0); + this.buffer.truncate(overlap); + *this.consumed = overlap; + } + // Read at least until self.buffer.len() > overlap so we can do one scan + loop { + let filled = { + let mut buf = ReadBuf::uninit(this.buffer.spare_capacity_mut()); + ready!(this.reader.as_mut().poll_read(cx, &mut buf))?; + buf.filled().len() + }; + // SAFETY: We just read `filled` amount of data above + unsafe { + this.buffer.set_len(filled + this.buffer.len()); + } + if filled == 0 || this.buffer.len() > overlap { + break; + } + } + + #[allow(clippy::needless_borrows_for_generic_args)] // misfiring lint (breaks code below) + this.scanner.scan(&this.buffer); + + Poll::Ready(Ok(&this.buffer[*this.consumed..])) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + debug_assert!(self.consumed + amt <= self.buffer.len()); + let this = self.project(); + *this.consumed += amt; + } +} + +#[cfg(test)] +mod tests { + use rstest::rstest; + use tokio::io::AsyncReadExt as _; + use tokio_test::io::Builder; + + use super::*; + + // The actual derivation of `nixpkgs.hello`. 
+ const HELLO_DRV: &str = r#"Derive([("out","/nix/store/33l4p0pn0mybmqzaxfkpppyh7vx1c74p-hello-2.12.1","","")],[("/nix/store/6z1jfnqqgyqr221zgbpm30v91yfj3r45-bash-5.1-p16.drv",["out"]),("/nix/store/ap9g09fxbicj836zm88d56dn3ff4clxl-stdenv-linux.drv",["out"]),("/nix/store/pf80kikyxr63wrw56k00i1kw6ba76qik-hello-2.12.1.tar.gz.drv",["out"])],["/nix/store/9krlzvny65gdc8s7kpb6lkx8cd02c25b-default-builder.sh"],"x86_64-linux","/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16/bin/bash",["-e","/nix/store/9krlzvny65gdc8s7kpb6lkx8cd02c25b-default-builder.sh"],[("buildInputs",""),("builder","/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16/bin/bash"),("cmakeFlags",""),("configureFlags",""),("depsBuildBuild",""),("depsBuildBuildPropagated",""),("depsBuildTarget",""),("depsBuildTargetPropagated",""),("depsHostHost",""),("depsHostHostPropagated",""),("depsTargetTarget",""),("depsTargetTargetPropagated",""),("doCheck","1"),("doInstallCheck",""),("mesonFlags",""),("name","hello-2.12.1"),("nativeBuildInputs",""),("out","/nix/store/33l4p0pn0mybmqzaxfkpppyh7vx1c74p-hello-2.12.1"),("outputs","out"),("patches",""),("pname","hello"),("propagatedBuildInputs",""),("propagatedNativeBuildInputs",""),("src","/nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz"),("stdenv","/nix/store/cp65c8nk29qq5cl1wyy5qyw103cwmax7-stdenv-linux"),("strictDeps",""),("system","x86_64-linux"),("version","2.12.1")])"#; + + #[test] + fn test_no_patterns() { + let scanner: ReferenceScanner<String> = ReferenceScanner::new(vec![]); + + scanner.scan(HELLO_DRV); + + let result = scanner.finalise(); + + assert_eq!(result.len(), 0); + } + + #[test] + fn test_single_match() { + let scanner = ReferenceScanner::new(vec![ + "/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16".to_string(), + ]); + scanner.scan(HELLO_DRV); + + let result = scanner.finalise(); + + assert_eq!(result.len(), 1); + assert!(result.contains("/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16")); + } + + #[test] + fn test_multiple_matches() { + let candidates = vec![ + // these exist in the drv: + "/nix/store/33l4p0pn0mybmqzaxfkpppyh7vx1c74p-hello-2.12.1".to_string(), + "/nix/store/pf80kikyxr63wrw56k00i1kw6ba76qik-hello-2.12.1.tar.gz.drv".to_string(), + "/nix/store/cp65c8nk29qq5cl1wyy5qyw103cwmax7-stdenv-linux".to_string(), + // this doesn't: + "/nix/store/fn7zvafq26f0c8b17brs7s95s10ibfzs-emacs-28.2.drv".to_string(), + ]; + + let scanner = ReferenceScanner::new(candidates.clone()); + scanner.scan(HELLO_DRV); + + let result = scanner.finalise(); + assert_eq!(result.len(), 3); + + for c in candidates[..3].iter() { + assert!(result.contains(c)); + } + } + + #[rstest] + #[case::normal(8096, 8096)] + #[case::small_capacity(8096, 1)] + #[case::small_read(1, 8096)] + #[case::all_small(1, 1)] + #[tokio::test] + async fn test_reference_reader(#[case] chunk_size: usize, #[case] capacity: usize) { + let candidates = vec![ + // these exist in the drv: + "33l4p0pn0mybmqzaxfkpppyh7vx1c74p", + "pf80kikyxr63wrw56k00i1kw6ba76qik", + "cp65c8nk29qq5cl1wyy5qyw103cwmax7", + // this doesn't: + "fn7zvafq26f0c8b17brs7s95s10ibfzs", + ]; + let pattern = ReferencePattern::new(candidates.clone()); + let scanner = ReferenceScanner::new(pattern); + let mut mock = Builder::new(); + for c in HELLO_DRV.as_bytes().chunks(chunk_size) { + mock.read(c); + } + let mock = mock.build(); + let mut reader = ReferenceReader::with_capacity(capacity, &scanner, mock); + let mut s = String::new(); + reader.read_to_string(&mut s).await.unwrap(); + assert_eq!(s, HELLO_DRV); + + let 
result = scanner.finalise(); + assert_eq!(result.len(), 3); + + for c in candidates[..3].iter() { + assert!(result.contains(c)); + } + } + + #[tokio::test] + async fn test_reference_reader_no_patterns() { + let pattern = ReferencePattern::new(Vec::<&str>::new()); + let scanner = ReferenceScanner::new(pattern); + let mut mock = Builder::new(); + mock.read(HELLO_DRV.as_bytes()); + let mock = mock.build(); + let mut reader = ReferenceReader::new(&scanner, mock); + let mut s = String::new(); + reader.read_to_string(&mut s).await.unwrap(); + assert_eq!(s, HELLO_DRV); + + let result = scanner.finalise(); + assert_eq!(result.len(), 0); + } + + // FUTUREWORK: Test with large file +} diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs index 72fb06aea877..51d1b68a4ec9 100644 --- a/tvix/castore/src/tests/import.rs +++ b/tvix/castore/src/tests/import.rs @@ -2,14 +2,11 @@ use crate::blobservice::{self, BlobService}; use crate::directoryservice; use crate::fixtures::*; use crate::import::fs::ingest_path; -use crate::proto; +use crate::Node; use tempfile::TempDir; #[cfg(target_family = "unix")] -use std::os::unix::ffi::OsStrExt; - -#[cfg(target_family = "unix")] #[tokio::test] async fn symlink() { let blob_service = blobservice::from_addr("memory://").await.unwrap(); let directory_service = directoryservice::from_addr("memory://").await.unwrap(); let tmpdir = TempDir::new().unwrap(); std::os::unix::fs::symlink( "/nix/store/somewhereelse", tmpdir.path().join("doesntmatter"), ) .unwrap(); - let root_node = ingest_path( + let root_node = ingest_path::<_, _, _, &[u8]>( blob_service, directory_service, tmpdir.path().join("doesntmatter"), + None, ) .await .expect("must succeed"); assert_eq!( - proto::node::Node::Symlink(proto::SymlinkNode { - name: "doesntmatter".into(), - target: "/nix/store/somewhereelse".into(), - }), + Node::Symlink { + target: "/nix/store/somewhereelse".try_into().unwrap() + }, root_node, ) } @@ -50,21 +47,21 @@ async fn single_file() { std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap(); - let root_node = ingest_path( + let root_node = ingest_path::<_, _, _, &[u8]>( blob_service.clone(), directory_service, tmpdir.path().join("root"), + None, ) .await .expect("must succeed"); assert_eq!( - proto::node::Node::File(proto::FileNode { - name: "root".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + Node::File { + digest: HELLOWORLD_BLOB_DIGEST.clone(), size: HELLOWORLD_BLOB_CONTENTS.len() as u64, executable: false, - }), + }, root_node, ); @@ -89,23 +86,21 @@ async fn complicated() { // File `keep/.keep` std::fs::write(tmpdir.path().join("keep").join(".keep"), vec![]).unwrap(); - let root_node = ingest_path(blob_service.clone(), &directory_service, tmpdir.path()) - .await - .expect("must succeed"); + let root_node = ingest_path::<_, _, _, &[u8]>( + blob_service.clone(), + &directory_service, + tmpdir.path(), + None, + ) + .await + .expect("must succeed"); // ensure root_node matched expectations assert_eq!( - proto::node::Node::Directory(proto::DirectoryNode { - name: tmpdir - .path() - .file_name() - .unwrap() - .as_bytes() - .to_owned() - .into(), - digest: DIRECTORY_COMPLICATED.digest().into(), + Node::Directory { + digest: DIRECTORY_COMPLICATED.digest().clone(), size: DIRECTORY_COMPLICATED.size(), - }), + }, root_node, );
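To close, an end-to-end sketch of the new refscan machinery: the pattern (and its comparatively expensive Wu-Manber searcher) is built once, while each output gets its own scanner and ReferenceReader. The module path and the tokio runtime setup are assumptions:

    use tokio::io::AsyncReadExt as _;
    use tvix_castore::refscan::{ReferencePattern, ReferenceReader, ReferenceScanner};

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let pattern = ReferencePattern::new(vec!["33l4p0pn0mybmqzaxfkpppyh7vx1c74p"]);

        // One scanner per build output; the reader scans transparently while
        // the data is read through it.
        let scanner = ReferenceScanner::new(pattern.clone());
        let data = &b"...33l4p0pn0mybmqzaxfkpppyh7vx1c74p..."[..];
        let mut reader = ReferenceReader::new(&scanner, data);

        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await?;
        drop(reader); // release the borrow before consuming the scanner

        assert_eq!(scanner.finalise().len(), 1);
        Ok(())
    }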