Diffstat (limited to 'tvix/castore')
26 files changed, 1199 insertions, 928 deletions
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index 4cbc29053b..d5d5c73f0a 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -10,7 +10,7 @@ async-tempfile = "0.4.0" blake3 = { version = "1.3.1", features = ["rayon", "std", "traits-preview"] } bstr = "1.6.0" bytes = "1.4.0" -data-encoding = "2.3.3" +data-encoding = "2.6.0" digest = "0.10.7" fastcdc = { version = "3.1.0", features = ["tokio"] } futures = "0.3.30" @@ -28,6 +28,8 @@ tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi tonic = "0.11.0" tower = "0.4.13" tracing = "0.1.37" +tracing-indicatif = "0.3.6" +tvix-tracing = { path = "../tracing", features = ["tonic"] } url = "2.4.0" walkdir = "2.4.0" zstd = "0.13.0" @@ -38,9 +40,7 @@ petgraph = "0.6.4" [dependencies.bigtable_rs] optional = true -# https://github.com/liufuyang/bigtable_rs/pull/72 -git = "https://github.com/flokli/bigtable_rs" -rev = "0af404741dfc40eb9fa99cf4d4140a09c5c20df7" +version = "0.2.10" [dependencies.fuse-backend-rs] optional = true @@ -50,6 +50,10 @@ version = "0.11.0" optional = true version = "0.2.144" +[dependencies.threadpool] +version = "1.8.1" +optional = true + [dependencies.tonic-reflection] optional = true version = "0.11.0" @@ -92,14 +96,14 @@ rstest_reuse = "0.6.0" xattr = "1.3.1" [features] -default = [] +default = ["cloud"] cloud = [ "dep:bigtable_rs", "object_store/aws", "object_store/azure", "object_store/gcp", ] -fs = ["dep:libc", "dep:fuse-backend-rs"] +fs = ["dep:fuse-backend-rs", "dep:threadpool", "dep:libc"] virtiofs = [ "fs", "dep:vhost", diff --git a/tvix/castore/default.nix b/tvix/castore/default.nix index 641d883760..03a12b6c20 100644 --- a/tvix/castore/default.nix +++ b/tvix/castore/default.nix @@ -1,23 +1,28 @@ -{ depot, pkgs, ... }: +{ depot, pkgs, lib, ... }: (depot.tvix.crates.workspaceMembers.tvix-castore.build.override { runTests = true; testPreRun = '' export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt; ''; - - # enable some optional features. - features = [ "default" "cloud" ]; -}).overrideAttrs (_: { - meta.ci.targets = [ "integration-tests" ]; - passthru.integration-tests = depot.tvix.crates.workspaceMembers.tvix-castore.build.override { - runTests = true; - testPreRun = '' - export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt; - export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}" +}).overrideAttrs (old: rec { + meta.ci.targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru); + passthru = (depot.tvix.utils.mkFeaturePowerset { + inherit (old) crateName; + features = ([ "cloud" "fuse" "tonic-reflection" ] + # virtiofs feature currently fails to build on Darwin + ++ lib.optional pkgs.stdenv.isLinux "virtiofs"); + override.testPreRun = '' + export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt ''; - - # enable some optional features. 
- features = [ "default" "cloud" "integration" ]; + }) // { + integration-tests = depot.tvix.crates.workspaceMembers.${old.crateName}.build.override (old: { + runTests = true; + testPreRun = '' + export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt; + export PATH="$PATH:${pkgs.lib.makeBinPath [ pkgs.cbtemulator pkgs.google-cloud-bigtable-tool ]}" + ''; + features = old.features ++ [ "integration" ]; + }); }; }) diff --git a/tvix/castore/docs/blobstore-chunking.md b/tvix/castore/docs/blobstore-chunking.md deleted file mode 100644 index 49bbe69275..0000000000 --- a/tvix/castore/docs/blobstore-chunking.md +++ /dev/null @@ -1,147 +0,0 @@ -# BlobStore: Chunking & Verified Streaming - -`tvix-castore`'s BlobStore is a content-addressed storage system, using [blake3] -as hash function. - -Returned data is fetched by using the digest as lookup key, and can be verified -to be correct by feeding the received data through the hash function and -ensuring it matches the digest initially used for the lookup. - -This means, data can be downloaded by any untrusted third-party as well, as the -received data is validated to match the digest it was originally requested with. - -However, for larger blobs of data, having to download the entire blob at once is -wasteful, if we only care about a part of the blob. Think about mounting a -seekable data structure, like loop-mounting an .iso file, or doing partial reads -in a large Parquet file, a column-oriented data format. - -> We want to have the possibility to *seek* into a larger file. - -This however shouldn't compromise on data integrity properties - we should not -need to trust a peer we're downloading from to be "honest" about the partial -data we're reading. We should be able to verify smaller reads. - -Especially when substituting from an untrusted third-party, we want to be able -to detect quickly if that third-party is sending us wrong data, and terminate -the connection early. - -## Chunking -In content-addressed systems, this problem has historically been solved by -breaking larger blobs into smaller chunks, which can be fetched individually, -and making a hash of *this listing* the blob digest/identifier. - - - BitTorrent for example breaks files up into smaller chunks, and maintains - a list of sha1 digests for each of these chunks. Magnet links contain a - digest over this listing as an identifier. (See [bittorrent-v2][here for - more details]). - With the identifier, a client can fetch the entire list, and then recursively - "unpack the graph" of nodes, until it ends up with a list of individual small - chunks, which can be fetched individually. - - Similarly, IPFS with its IPLD model builds up a Merkle DAG, and uses the - digest of the root node as an identitier. - -These approaches solve the problem of being able to fetch smaller chunks in a -trusted fashion. They can also do some deduplication, in case there's the same -leaf nodes same leaf nodes in multiple places. - -However, they also have a big disadvantage. The chunking parameters, and the -"topology" of the graph structure itself "bleed" into the root hash of the -entire data structure itself. - -Depending on the chunking parameters used, there's different representations for -the same data, causing less data sharing/reuse in the overall system, in terms of how -many chunks need to be downloaded vs. are already available locally, as well as -how compact data is stored on-disk. 
- -This can be workarounded by agreeing on only a single way of chunking, but it's -not pretty and misses a lot of deduplication potential. - -### Chunking in Tvix' Blobstore -tvix-castore's BlobStore uses a hybrid approach to eliminate some of the -disadvantages, while still being content-addressed internally, with the -highlighted benefits. - -It uses [blake3] as hash function, and the blake3 digest of **the raw data -itself** as an identifier (rather than some application-specific Merkle DAG that -also embeds some chunking information). - -BLAKE3 is a tree hash where all left nodes fully populated, contrary to -conventional serial hash functions. To be able to validate the hash of a node, -one only needs the hash of the (2) children [^1], if any. - -This means one only needs to the root digest to validate a constructions, and these -constructions can be sent [separately][bao-spec]. - -This relieves us from the need of having to encode more granular chunking into -our data model / identifier upfront, but can make this a mostly a transport/ -storage concern. - -For the some more description on the (remote) protocol, check -`./blobstore-protocol.md`. - -#### Logical vs. physical chunking - -Due to the properties of the BLAKE3 hash function, we have logical blocks of -1KiB, but this doesn't necessarily imply we need to restrict ourselves to these -chunk sizes w.r.t. what "physical chunks" are sent over the wire between peers, -or are stored on-disk. - -The only thing we need to be able to read and verify an arbitrary byte range is -having the covering range of aligned 1K blocks, and a construction from the root -digest to the 1K block. - -Note the intermediate hash tree can be further trimmed, [omitting][bao-tree] -lower parts of the tree while still providing verified streaming - at the cost -of having to fetch larger covering ranges of aligned blocks. - -Let's pick an example. We identify each KiB by a number here for illustrational -purposes. - -Assuming we omit the last two layers of the hash tree, we end up with logical -4KiB leaf chunks (`bao_shift` of `2`). - -For a blob of 14 KiB total size, we could fetch logical blocks `[0..=3]`, -`[4..=7]`, `[8..=11]` and `[12..=13]` in an authenticated fashion: - -`[ 0 1 2 3 ] [ 4 5 6 7 ] [ 8 9 10 11 ] [ 12 13 ]` - -Assuming the server now informs us about the following physical chunking: - -``` -[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]` -``` - -If our application now wants to arbitrarily read from 0 until 4 (inclusive): - -``` -[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ] - |-------------| - -``` - -…we need to fetch physical chunks `[ 0 1 ]`, `[ 2 3 4 5 ]` and `[ 6 ] [ 7 8 ]`. - - -`[ 0 1 ]` and `[ 2 3 4 5 ]` are obvious, they contain the data we're -interested in. - -We however also need to fetch the physical chunks `[ 6 ]` and `[ 7 8 ]`, so we -can assemble `[ 4 5 6 7 ]` to verify both logical chunks: - -``` -[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ] -^ ^ ^ ^ -|----4KiB----|------4KiB-----| -``` - -Each physical chunk fetched can be validated to have the blake3 digest that was -communicated upfront, and can be stored in a client-side cache/storage, so -subsequent / other requests for the same data will be fast(er). 
- ---- - -[^1]: and the surrounding context, aka position inside the whole blob, which is available while verifying the tree -[bittorrent-v2]: https://blog.libtorrent.org/2020/09/bittorrent-v2/ -[blake3]: https://github.com/BLAKE3-team/BLAKE3 -[bao-spec]: https://github.com/oconnor663/bao/blob/master/docs/spec.md -[bao-tree]: https://github.com/n0-computer/bao-tree diff --git a/tvix/castore/docs/blobstore-protocol.md b/tvix/castore/docs/blobstore-protocol.md deleted file mode 100644 index 048cafc3d8..0000000000 --- a/tvix/castore/docs/blobstore-protocol.md +++ /dev/null @@ -1,104 +0,0 @@ -# BlobStore: Protocol / Composition - -This documents describes the protocol that BlobStore uses to substitute blobs -other ("remote") BlobStores. - -How to come up with the blake3 digest of the blob to fetch is left to another -layer in the stack. - -To put this into the context of Tvix as a Nix alternative, a blob represents an -individual file inside a StorePath. -In the Tvix Data Model, this is accomplished by having a `FileNode` (either the -`root_node` in a `PathInfo` message, or a individual file inside a `Directory` -message) encode a BLAKE3 digest. - -However, the whole infrastructure can be applied for other usecases requiring -exchange/storage or access into data of which the blake3 digest is known. - -## Protocol and Interfaces -As an RPC protocol, BlobStore currently uses gRPC. - -On the Rust side of things, every blob service implements the -[`BlobService`](../src/blobservice/mod.rs) async trait, which isn't -gRPC-specific. - -This `BlobService` trait provides functionality to check for existence of Blobs, -read from blobs, and write new blobs. -It also provides a method to ask for more granular chunks if they are available. - -In addition to some in-memory, on-disk and (soon) object-storage-based -implementations, we also have a `BlobService` implementation that talks to a -gRPC server, as well as a gRPC server wrapper component, which provides a gRPC -service for anything implementing the `BlobService` trait. - -This makes it very easy to talk to a remote `BlobService`, which does not even -need to be written in the same language, as long it speaks the same gRPC -protocol. - -It also puts very little requirements on someone implementing a new -`BlobService`, and how its internal storage or chunking algorithm looks like. - -The gRPC protocol is documented in `../protos/rpc_blobstore.proto`. -Contrary to the `BlobService` trait, it does not have any options for seeking/ -ranging, as it's more desirable to provide this through chunking (see also -`./blobstore-chunking.md`). - -## Composition -Different `BlobStore` are supposed to be "composed"/"layered" to express -caching, multiple local and remote sources. - -The fronting interface can be the same, it'd just be multiple "tiers" that can -respond to requests, depending on where the data resides. [^1] - -This makes it very simple for consumers, as they don't need to be aware of the -entire substitutor config. - -The flexibility of this doesn't need to be exposed to the user in the default -case; in most cases we should be fine with some form of on-disk storage and a -bunch of substituters with different priorities. - -### gRPC Clients -Clients are encouraged to always read blobs in a chunked fashion (asking for a -list of chunks for a blob via `BlobService.Stat()`, then fetching chunks via -`BlobService.Read()` as needed), instead of directly reading the entire blob via -`BlobService.Read()`. 
- -In a composition setting, this provides opportunity for caching, and avoids -downloading some chunks if they're already present locally (for example, because -they were already downloaded by reading from a similar blob earlier). - -It also removes the need for seeking to be a part of the gRPC protocol -alltogether, as chunks are supposed to be "reasonably small" [^2]. - -There's some further optimization potential, a `BlobService.Stat()` request -could tell the server it's happy with very small blobs just being inlined in -an additional additional field in the response, which would allow clients to -populate their local chunk store in a single roundtrip. - -## Verified Streaming -As already described in `./docs/blobstore-chunking.md`, the physical chunk -information sent in a `BlobService.Stat()` response is still sufficient to fetch -in an authenticated fashion. - -The exact protocol and formats are still a bit in flux, but here's some notes: - - - `BlobService.Stat()` request gets a `send_bao` field (bool), signalling a - [BAO][bao-spec] should be sent. Could also be `bao_shift` integer, signalling - how detailed (down to the leaf chunks) it should go. - The exact format (and request fields) still need to be defined, edef has some - ideas around omitting some intermediate hash nodes over the wire and - recomputing them, reducing size by another ~50% over [bao-tree]. - - `BlobService.Stat()` response gets some bao-related fields (`bao_shift` - field, signalling the actual format/shift level the server replies with, the - actual bao, and maybe some format specifier). - It would be nice to also be compatible with the baos used by [iroh], so we - can provide an implementation using it too. - ---- - -[^1]: We might want to have some backchannel, so it becomes possible to provide - feedback to the user that something is downloaded. -[^2]: Something between 512K-4M, TBD. -[bao-spec]: https://github.com/oconnor663/bao/blob/master/docs/spec.md -[bao-tree]: https://github.com/n0-computer/bao-tree -[iroh]: https://github.com/n0-computer/iroh diff --git a/tvix/castore/docs/data-model.md b/tvix/castore/docs/data-model.md deleted file mode 100644 index 2df6761aae..0000000000 --- a/tvix/castore/docs/data-model.md +++ /dev/null @@ -1,50 +0,0 @@ -# Data model - -This provides some more notes on the fields used in castore.proto. - -See `//tvix/store/docs/api.md` for the full context. - -## Directory message -`Directory` messages use the blake3 hash of their canonical protobuf -serialization as its identifier. - -A `Directory` message contains three lists, `directories`, `files` and -`symlinks`, holding `DirectoryNode`, `FileNode` and `SymlinkNode` messages -respectively. They describe all the direct child elements that are contained in -a directory. - -All three message types have a `name` field, specifying the (base)name of the -element (which MUST not contain slashes or null bytes, and MUST not be '.' or '..'). -For reproducibility reasons, the lists MUST be sorted by that name and also -MUST be unique across all three lists. - -In addition to the `name` field, the various *Node messages have the following -fields: - -## DirectoryNode -A `DirectoryNode` message represents a child directory. - -It has a `digest` field, which points to the identifier of another `Directory` -message, making a `Directory` a merkle tree (or strictly speaking, a graph, as -two elements pointing to a child directory with the same contents would point -to the same `Directory` message. 
- -There's also a `size` field, containing the (total) number of all child -elements in the referenced `Directory`, which helps for inode calculation. - -## FileNode -A `FileNode` message represents a child (regular) file. - -Its `digest` field contains the blake3 hash of the file contents. It can be -looked up in the `BlobService`. - -The `size` field contains the size of the blob the `digest` field refers to. - -The `executable` field specifies whether the file should be marked as -executable or not. - -## SymlinkNode -A `SymlinkNode` message represents a child symlink. - -In addition to the `name` field, the only additional field is the `target`, -which is a string containing the target of the symlink. diff --git a/tvix/castore/docs/why-not-git-trees.md b/tvix/castore/docs/why-not-git-trees.md deleted file mode 100644 index fd46252cf5..0000000000 --- a/tvix/castore/docs/why-not-git-trees.md +++ /dev/null @@ -1,57 +0,0 @@ -## Why not git tree objects? - -We've been experimenting with (some variations of) the git tree and object -format, and ultimately decided against using it as an internal format, and -instead adapted the one documented in the other documents here. - -While the tvix-store API protocol shares some similarities with the format used -in git for trees and objects, the git one has shown some significant -disadvantages: - -### The binary encoding itself - -#### trees -The git tree object format is a very binary, error-prone and -"made-to-be-read-and-written-from-C" format. - -Tree objects are a combination of null-terminated strings, and fields of known -length. References to other tree objects use the literal sha1 hash of another -tree object in this encoding. -Extensions of the format/changes are very hard to do right, because parsers are -not aware they might be parsing something different. - -The tvix-store protocol uses a canonical protobuf serialization, and uses -the [blake3][blake3] hash of that serialization to point to other `Directory` -messages. -It's both compact and with a wide range of libraries for encoders and decoders -in many programming languages. -The choice of protobuf makes it easy to add new fields, and make old clients -aware of some unknown fields being detected [^adding-fields]. - -#### blob -On disk, git blob objects start with a "blob" prefix, then the size of the -payload, and then the data itself. The hash of a blob is the literal sha1sum -over all of this - which makes it something very git specific to request for. - -tvix-store simply uses the [blake3][blake3] hash of the literal contents -when referring to a file/blob, which makes it very easy to ask other data -sources for the same data, as no git-specific payload is included in the hash. -This also plays very well together with things like [iroh][iroh-discussion], -which plans to provide a way to substitute (large)blobs by their blake3 hash -over the IPFS network. - -In addition to that, [blake3][blake3] makes it possible to do -[verified streaming][bao], as already described in other parts of the -documentation. - -The git tree object format uses sha1 both for references to other trees and -hashes of blobs, which isn't really a hash function to fundamentally base -everything on in 2023. -The [migration to sha256][git-sha256] also has been dead for some years now, -and it's unclear how a "blake3" version of this would even look like. 
- -[bao]: https://github.com/oconnor663/bao -[blake3]: https://github.com/BLAKE3-team/BLAKE3 -[git-sha256]: https://git-scm.com/docs/hash-function-transition/ -[iroh-discussion]: https://github.com/n0-computer/iroh/discussions/707#discussioncomment-5070197 -[^adding-fields]: Obviously, adding new fields will change hashes, but it's something that's easy to detect. \ No newline at end of file diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs index 8898bbfb95..b7e266c4ea 100644 --- a/tvix/castore/src/blobservice/from_addr.rs +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -30,18 +30,25 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error> // - In the case of unix sockets, there must be a path, but may not be a host. // - In the case of non-unix sockets, there must be a host, but no path. // Constructing the channel is handled by tvix_castore::channel::from_url. - let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?); - Box::new(GRPCBlobService::from_client(client)) + Box::new(GRPCBlobService::from_client( + BlobServiceClient::with_interceptor( + crate::tonic::channel_from_url(&url).await?, + tvix_tracing::propagate::tonic::send_trace, + ), + )) } scheme if scheme.starts_with("objectstore+") => { // We need to convert the URL to string, strip the prefix there, and then // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. let trimmed_url = { let s = url.to_string(); - Url::parse(s.strip_prefix("objectstore+").unwrap()).unwrap() + let mut url = Url::parse(s.strip_prefix("objectstore+").unwrap()).unwrap(); + // trim the query pairs, they might contain credentials or local settings we don't want to send as-is. + url.set_query(None); + url }; Box::new( - ObjectStoreBlobService::parse_url(&trimmed_url) + ObjectStoreBlobService::parse_url_opts(&trimmed_url, url.query_pairs()) .map_err(|e| Error::StorageError(e.to_string()))?, ) } diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 5663cd3838..85250da99d 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -17,40 +17,43 @@ use tokio_util::{ io::{CopyToBytes, SinkWriter}, sync::PollSender, }; -use tonic::{async_trait, transport::Channel, Code, Status}; -use tracing::instrument; +use tonic::{async_trait, Code, Status}; +use tracing::{instrument, Instrument as _}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] -pub struct GRPCBlobService { +pub struct GRPCBlobService<T> { /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, + grpc_client: proto::blob_service_client::BlobServiceClient<T>, } -impl GRPCBlobService { +impl<T> GRPCBlobService<T> { /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. - /// panics if called outside the context of a tokio runtime. 
- pub fn from_client( - grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, - ) -> Self { + pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self { Self { grpc_client } } } #[async_trait] -impl BlobService for GRPCBlobService { +impl<T> BlobService for GRPCBlobService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ #[instrument(skip(self, digest), fields(blob.digest=%digest))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { - let mut grpc_client = self.grpc_client.clone(); - let resp = grpc_client + match self + .grpc_client + .clone() .stat(proto::StatBlobRequest { digest: digest.clone().into(), ..Default::default() }) - .await; - - match resp { + .await + { Ok(_blob_meta) => Ok(true), Err(e) if e.code() == Code::NotFound => Ok(false), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), @@ -133,6 +136,8 @@ impl BlobService for GRPCBlobService { let task = tokio::spawn({ let mut grpc_client = self.grpc_client.clone(); async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) } + // instrument the task with the current span, this is not done by default + .in_current_span() }); // The tx part of the channel is converted to a sink of byte chunks. @@ -335,7 +340,6 @@ mod tests { .await .expect("must succeed"), ); - GRPCBlobService::from_client(client) }; diff --git a/tvix/castore/src/directoryservice/closure_validator.rs b/tvix/castore/src/directoryservice/closure_validator.rs deleted file mode 100644 index b9746a5a05..0000000000 --- a/tvix/castore/src/directoryservice/closure_validator.rs +++ /dev/null @@ -1,309 +0,0 @@ -use std::collections::{HashMap, HashSet}; - -use bstr::ByteSlice; - -use petgraph::{ - graph::{DiGraph, NodeIndex}, - visit::{Bfs, Walker}, -}; -use tracing::instrument; - -use crate::{ - proto::{self, Directory}, - B3Digest, Error, -}; - -type DirectoryGraph = DiGraph<Directory, ()>; - -/// This can be used to validate a Directory closure (DAG of connected -/// Directories), and their insertion order. -/// -/// Directories need to be inserted (via `add`), in an order from the leaves to -/// the root (DFS Post-Order). -/// During insertion, We validate as much as we can at that time: -/// -/// - individual validation of Directory messages -/// - validation of insertion order (no upload of not-yet-known Directories) -/// - validation of size fields of referred Directories -/// -/// Internally it keeps all received Directories in a directed graph, -/// with node weights being the Directories and edges pointing to child -/// directories. -/// -/// Once all Directories have been inserted, a finalize function can be -/// called to get a (deduplicated and) validated list of directories, in -/// insertion order. -/// During finalize, a check for graph connectivity is performed too, to ensure -/// there's no disconnected components, and only one root. -#[derive(Default)] -pub struct ClosureValidator { - // A directed graph, using Directory as node weight, without edge weights. - // Edges point from parents to children. - graph: DirectoryGraph, - - // A lookup table from directory digest to node index. - digest_to_node_ix: HashMap<B3Digest, NodeIndex>, - - /// Keeps track of the last-inserted directory graph node index. 
- /// On a correct insert, this will be the root node, from which the DFS post - /// order traversal will start from. - last_directory_ix: Option<NodeIndex>, -} - -impl ClosureValidator { - /// Insert a new Directory into the closure. - /// Perform individual Directory validation, validation of insertion order - /// and size fields. - #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { - let digest = directory.digest(); - - // If we already saw this node previously, it's already validated and in the graph. - if self.digest_to_node_ix.contains_key(&digest) { - return Ok(()); - } - - // Do some general validation - directory - .validate() - .map_err(|e| Error::InvalidRequest(e.to_string()))?; - - // Ensure the directory only refers to directories which we already accepted. - // We lookup their node indices and add them to a HashSet. - let mut child_ixs = HashSet::new(); - for dir in &directory.directories { - let child_digest = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated - - // Ensure the digest has already been seen - let child_ix = *self.digest_to_node_ix.get(&child_digest).ok_or_else(|| { - Error::InvalidRequest(format!( - "'{}' refers to unseen child dir: {}", - dir.name.as_bstr(), - &child_digest - )) - })?; - - // Ensure the size specified in the child node matches the directory size itself. - let recorded_child_size = self - .graph - .node_weight(child_ix) - .expect("node not found") - .size(); - - // Ensure the size specified in the child node matches our records. - if dir.size != recorded_child_size { - return Err(Error::InvalidRequest(format!( - "'{}' has wrong size, specified {}, recorded {}", - dir.name.as_bstr(), - dir.size, - recorded_child_size - ))); - } - - child_ixs.insert(child_ix); - } - - // Insert node into the graph, and add edges to all children. - let node_ix = self.graph.add_node(directory); - for child_ix in child_ixs { - self.graph.add_edge(node_ix, child_ix, ()); - } - - // Record the mapping from digest to node_ix in our lookup table. - self.digest_to_node_ix.insert(digest, node_ix); - - // Update last_directory_ix. - self.last_directory_ix = Some(node_ix); - - Ok(()) - } - - /// Ensure that all inserted Directories are connected, then return a - /// (deduplicated) and validated list of directories, in from-leaves-to-root - /// order. - /// In case no elements have been inserted, returns an empty list. - #[instrument(level = "trace", skip_all, err)] - pub(crate) fn finalize(self) -> Result<Vec<Directory>, Error> { - let (graph, _) = match self.finalize_raw()? { - None => return Ok(vec![]), - Some(v) => v, - }; - // Dissolve the graph, returning the nodes as a Vec. - // As the graph was populated in a valid DFS PostOrder, we can return - // nodes in that same order. - let (nodes, _edges) = graph.into_nodes_edges(); - Ok(nodes.into_iter().map(|x| x.weight).collect()) - } - - /// Ensure that all inserted Directories are connected, then return a - /// (deduplicated) and validated list of directories, in from-root-to-leaves - /// order. - /// In case no elements have been inserted, returns an empty list. - #[instrument(level = "trace", skip_all, err)] - pub(crate) fn finalize_root_to_leaves(self) -> Result<Vec<Directory>, Error> { - let (graph, root) = match self.finalize_raw()? 
{ - None => return Ok(vec![]), - Some(v) => v, - }; - - // do a BFS traversal of the graph, starting with the root node to get - // all nodes reachable from there. - let traversal = Bfs::new(&graph, root); - - let order = traversal.iter(&graph).collect::<Vec<_>>(); - - let (nodes, _edges) = graph.into_nodes_edges(); - - // Convert to option, so that we can take individual nodes out without messing up the - // indices - let mut nodes = nodes.into_iter().map(Some).collect::<Vec<_>>(); - - Ok(order - .iter() - .map(|i| nodes[i.index()].take().unwrap().weight) - .collect()) - } - - /// Internal implementation of closure validation - #[instrument(level = "trace", skip_all, err)] - fn finalize_raw(self) -> Result<Option<(DirectoryGraph, NodeIndex)>, Error> { - // If no nodes were inserted, an empty list is returned. - let last_directory_ix = if let Some(x) = self.last_directory_ix { - x - } else { - return Ok(None); - }; - - // do a BFS traversal of the graph, starting with the root node to get - // (the count of) all nodes reachable from there. - let mut traversal = Bfs::new(&self.graph, last_directory_ix); - - let mut visited_directory_count = 0; - #[cfg(debug_assertions)] - let mut visited_directory_ixs = HashSet::new(); - #[cfg_attr(not(debug_assertions), allow(unused))] - while let Some(directory_ix) = traversal.next(&self.graph) { - #[cfg(debug_assertions)] - visited_directory_ixs.insert(directory_ix); - - visited_directory_count += 1; - } - - // If the number of nodes collected equals the total number of nodes in - // the graph, we know all nodes are connected. - if visited_directory_count != self.graph.node_count() { - // more or less exhaustive error reporting. - #[cfg(debug_assertions)] - { - let all_directory_ixs: HashSet<_> = self.graph.node_indices().collect(); - - let unvisited_directories: HashSet<_> = all_directory_ixs - .difference(&visited_directory_ixs) - .map(|ix| self.graph.node_weight(*ix).expect("node not found")) - .collect(); - - return Err(Error::InvalidRequest(format!( - "found {} disconnected directories: {:?}", - self.graph.node_count() - visited_directory_ixs.len(), - unvisited_directories - ))); - } - #[cfg(not(debug_assertions))] - { - return Err(Error::InvalidRequest(format!( - "found {} disconnected directories", - self.graph.node_count() - visited_directory_count - ))); - } - } - - Ok(Some((self.graph, last_directory_ix))) - } -} - -#[cfg(test)] -mod tests { - use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, - proto::{self, Directory}, - }; - use lazy_static::lazy_static; - use rstest::rstest; - - lazy_static! { - pub static ref BROKEN_DIRECTORY : Directory = Directory { - symlinks: vec![proto::SymlinkNode { - name: "".into(), // invalid name! - target: "doesntmatter".into(), - }], - ..Default::default() - }; - - pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() - }; - } - - use super::ClosureValidator; - - #[rstest] - /// Uploading an empty directory should succeed. - #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] - /// Uploading A, then B (referring to A) should succeed. - #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))] - /// Uploading A, then A, then C (referring to A twice) should succeed. - /// We pretend to be a dumb client not deduping directories. 
- #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] - /// Uploading A, then C (referring to A twice) should succeed. - #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] - /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close, - /// as B itself would be left unconnected. - #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] - /// Uploading B (referring to A) should fail immediately, because A was never uploaded. - #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] - /// Uploading a directory failing validation should fail immediately. - #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)] - /// Uploading a directory which refers to another Directory with a wrong size should fail. - #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] - fn test_uploads( - #[case] directories_to_upload: &[&Directory], - #[case] exp_fail_upload_last: bool, - #[case] exp_finalize: Option<Vec<&Directory>>, // Some(_) if finalize successful, None if not. - ) { - let mut dcv = ClosureValidator::default(); - let len_directories_to_upload = directories_to_upload.len(); - - for (i, d) in directories_to_upload.iter().enumerate() { - let resp = dcv.add((*d).clone()); - if i == len_directories_to_upload - 1 && exp_fail_upload_last { - assert!(resp.is_err(), "expect last put to fail"); - - // We don't really care anymore what finalize() would return, as - // the add() failed. - return; - } else { - assert!(resp.is_ok(), "expect put to succeed"); - } - } - - // everything was uploaded successfully. Test finalize(). - let resp = dcv.finalize(); - - match exp_finalize { - Some(directories) => { - assert_eq!( - Vec::from_iter(directories.iter().map(|e| (*e).to_owned())), - resp.expect("drain should succeed") - ); - } - None => { - resp.expect_err("drain should fail"); - } - } - } -} diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs new file mode 100644 index 0000000000..d3f351d6b6 --- /dev/null +++ b/tvix/castore/src/directoryservice/combinators.rs @@ -0,0 +1,142 @@ +use futures::stream::BoxStream; +use futures::StreamExt; +use futures::TryFutureExt; +use futures::TryStreamExt; +use tonic::async_trait; +use tracing::{instrument, trace}; + +use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; +use crate::directoryservice::DirectoryPutter; +use crate::proto; +use crate::B3Digest; +use crate::Error; + +/// Asks near first, if not found, asks far. +/// If found in there, returns it, and *inserts* it into +/// near. +/// Specifically, it always obtains the entire directory closure from far and inserts it into near, +/// which is useful when far does not support accessing intermediate directories (but near does). +/// There is no negative cache. +/// Inserts and listings are not implemented for now. 
+#[derive(Clone)] +pub struct Cache<DS1, DS2> { + near: DS1, + far: DS2, +} + +impl<DS1, DS2> Cache<DS1, DS2> { + pub fn new(near: DS1, far: DS2) -> Self { + Self { near, far } + } +} + +#[async_trait] +impl<DS1, DS2> DirectoryService for Cache<DS1, DS2> +where + DS1: DirectoryService + Clone + 'static, + DS2: DirectoryService + Clone + 'static, +{ + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + match self.near.get(digest).await? { + Some(directory) => { + trace!("serving from cache"); + Ok(Some(directory)) + } + None => { + trace!("not found in near, asking remote…"); + + let mut copy = DirectoryGraph::with_order( + RootToLeavesValidator::new_with_root_digest(digest.clone()), + ); + + let mut stream = self.far.get_recursive(digest); + let root = stream.try_next().await?; + + if let Some(root) = root.clone() { + copy.add(root) + .map_err(|e| Error::StorageError(e.to_string()))?; + } + + while let Some(dir) = stream.try_next().await? { + copy.add(dir) + .map_err(|e| Error::StorageError(e.to_string()))?; + } + + let copy = copy + .validate() + .map_err(|e| Error::StorageError(e.to_string()))?; + + let mut put = self.near.put_multiple_start(); + for dir in copy.drain_leaves_to_root() { + put.put(dir).await?; + } + put.close().await?; + + Ok(root) + } + } + } + + #[instrument(skip_all)] + async fn put(&self, _directory: proto::Directory) -> Result<B3Digest, Error> { + Err(Error::StorageError("unimplemented".to_string())) + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> BoxStream<'static, Result<proto::Directory, Error>> { + let near = self.near.clone(); + let far = self.far.clone(); + let digest = root_directory_digest.clone(); + Box::pin( + (async move { + let mut stream = near.get_recursive(&digest); + match stream.try_next().await? { + Some(first) => { + trace!("serving from cache"); + Ok(futures::stream::once(async { Ok(first) }) + .chain(stream) + .left_stream()) + } + None => { + trace!("not found in near, asking remote…"); + + let mut copy_for_near = DirectoryGraph::with_order( + RootToLeavesValidator::new_with_root_digest(digest.clone()), + ); + let mut copy_for_client = vec![]; + + let mut stream = far.get_recursive(&digest); + while let Some(dir) = stream.try_next().await? 
{ + copy_for_near + .add(dir.clone()) + .map_err(|e| Error::StorageError(e.to_string()))?; + copy_for_client.push(dir); + } + + let copy_for_near = copy_for_near + .validate() + .map_err(|e| Error::StorageError(e.to_string()))?; + let mut put = near.put_multiple_start(); + for dir in copy_for_near.drain_leaves_to_root() { + put.put(dir).await?; + } + put.close().await?; + + Ok(futures::stream::iter(copy_for_client.into_iter().map(Ok)) + .right_stream()) + } + } + }) + .try_flatten_stream(), + ) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> { + Box::new(SimplePutter::new((*self).clone())) + } +} diff --git a/tvix/castore/src/directoryservice/directory_graph.rs b/tvix/castore/src/directoryservice/directory_graph.rs new file mode 100644 index 0000000000..e6b9b16337 --- /dev/null +++ b/tvix/castore/src/directoryservice/directory_graph.rs @@ -0,0 +1,413 @@ +use std::collections::HashMap; + +use bstr::ByteSlice; + +use petgraph::{ + graph::{DiGraph, NodeIndex}, + visit::{Bfs, DfsPostOrder, EdgeRef, IntoNodeIdentifiers, Walker}, + Direction, Incoming, +}; +use tracing::instrument; + +use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; +use crate::{ + proto::{self, Directory, DirectoryNode}, + B3Digest, +}; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("{0}")] + ValidationError(String), +} + +/// This can be used to validate and/or re-order a Directory closure (DAG of +/// connected Directories), and their insertion order. +/// +/// The DirectoryGraph is parametrized on the insertion order, and can be +/// constructed using the Default trait, or using `with_order` if the +/// OrderValidator needs to be customized. +/// +/// If the user is receiving directories from canonical protobuf encoding in +/// root-to-leaves order, and parsing them, she can call `digest_allowed` +/// _before_ parsing the protobuf record and then add it with `add_unchecked`. +/// All other users insert the directories via `add`, in their specified order. +/// During insertion, we validate as much as we can at that time: +/// +/// - individual validation of Directory messages +/// - validation of insertion order +/// - validation of size fields of referred Directories +/// +/// Internally it keeps all received Directories in a directed graph, +/// with node weights being the Directories and edges pointing to child/parent +/// directories. +/// +/// Once all Directories have been inserted, a validate function can be +/// called to perform a check for graph connectivity and ensure there's no +/// disconnected components or missing nodes. +/// Finally, the `drain_leaves_to_root` or `drain_root_to_leaves` can be +/// _chained_ on validate to get an iterator over the (deduplicated and) +/// validated list of directories in either order. +#[derive(Default)] +pub struct DirectoryGraph<O> { + // A directed graph, using Directory as node weight. + // Edges point from parents to children. + // + // Nodes with None weigths might exist when a digest has been referred to but the directory + // with this digest has not yet been sent. + // + // The option in the edge weight tracks the pending validation state of the respective edge, for example if + // the child has not been added yet. + graph: DiGraph<Option<Directory>, Option<DirectoryNode>>, + + // A lookup table from directory digest to node index. 
+ digest_to_node_ix: HashMap<B3Digest, NodeIndex>, + + order_validator: O, +} + +pub struct ValidatedDirectoryGraph { + graph: DiGraph<Option<Directory>, Option<DirectoryNode>>, + + root: Option<NodeIndex>, +} + +fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> { + // Ensure the size specified in the child node matches our records. + if dir.size != child.size() { + return Err(Error::ValidationError(format!( + "'{}' has wrong size, specified {}, recorded {}", + dir.name.as_bstr(), + dir.size, + child.size(), + ))); + } + Ok(()) +} + +impl DirectoryGraph<LeavesToRootValidator> { + /// Insert a new Directory into the closure + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] + pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + if !self.order_validator.add_directory(&directory) { + return Err(Error::ValidationError( + "unknown directory was referenced".into(), + )); + } + self.add_order_unchecked(directory) + } +} + +impl DirectoryGraph<RootToLeavesValidator> { + /// If the user is parsing directories from canonical protobuf encoding, she can + /// call `digest_allowed` _before_ parsing the protobuf record and then add it + /// with `add_unchecked`. + pub fn digest_allowed(&self, digest: B3Digest) -> bool { + self.order_validator.digest_allowed(&digest) + } + + /// Insert a new Directory into the closure + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] + pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + let digest = directory.digest(); + if !self.order_validator.digest_allowed(&digest) { + return Err(Error::ValidationError("unexpected digest".into())); + } + self.order_validator.add_directory_unchecked(&directory); + self.add_order_unchecked(directory) + } +} + +impl<O: OrderValidator> DirectoryGraph<O> { + /// Customize the ordering, i.e. for pre-setting the root of the RootToLeavesValidator + pub fn with_order(order_validator: O) -> Self { + Self { + graph: Default::default(), + digest_to_node_ix: Default::default(), + order_validator, + } + } + + /// Adds a directory which has already been confirmed to be in-order to the graph + pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> { + // Do some basic validation + directory + .validate() + .map_err(|e| Error::ValidationError(e.to_string()))?; + + let digest = directory.digest(); + + // Teach the graph about the existence of a node with this digest + let ix = *self + .digest_to_node_ix + .entry(digest) + .or_insert_with(|| self.graph.add_node(None)); + + if self.graph[ix].is_some() { + // The node is already in the graph, there is nothing to do here. 
+ return Ok(()); + } + + // set up edges to all child directories + for subdir in &directory.directories { + let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap(); + + let child_ix = *self + .digest_to_node_ix + .entry(subdir_digest) + .or_insert_with(|| self.graph.add_node(None)); + + let pending_edge_check = match &self.graph[child_ix] { + Some(child) => { + // child is already available, validate the edge now + check_edge(subdir, child)?; + None + } + None => Some(subdir.clone()), // pending validation + }; + self.graph.add_edge(ix, child_ix, pending_edge_check); + } + + // validate the edges from parents to this node + // this collects edge ids in a Vec because there is no edges_directed_mut :'c + for edge_id in self + .graph + .edges_directed(ix, Direction::Incoming) + .map(|edge_ref| edge_ref.id()) + .collect::<Vec<_>>() + .into_iter() + { + let edge_weight = self + .graph + .edge_weight_mut(edge_id) + .expect("edge not found") + .take() + .expect("edge is already validated"); + check_edge(&edge_weight, &directory)?; + } + + // finally, store the directory information in the node weight + self.graph[ix] = Some(directory); + + Ok(()) + } + + #[instrument(level = "trace", skip_all, err)] + pub fn validate(self) -> Result<ValidatedDirectoryGraph, Error> { + // find all initial nodes (nodes without incoming edges) + let mut roots = self + .graph + .node_identifiers() + .filter(|&a| self.graph.neighbors_directed(a, Incoming).next().is_none()); + + let root = roots.next(); + if roots.next().is_some() { + return Err(Error::ValidationError( + "graph has disconnected roots".into(), + )); + } + + // test that the graph is complete + if self.graph.raw_nodes().iter().any(|n| n.weight.is_none()) { + return Err(Error::ValidationError("graph is incomplete".into())); + } + + Ok(ValidatedDirectoryGraph { + graph: self.graph, + root, + }) + } +} + +impl ValidatedDirectoryGraph { + /// Return the list of directories in from-root-to-leaves order. + /// In case no elements have been inserted, returns an empty list. + /// + /// panics if the specified root is not in the graph + #[instrument(level = "trace", skip_all)] + pub fn drain_root_to_leaves(self) -> impl Iterator<Item = Directory> { + let order = match self.root { + Some(root) => { + // do a BFS traversal of the graph, starting with the root node + Bfs::new(&self.graph, root) + .iter(&self.graph) + .collect::<Vec<_>>() + } + None => vec![], // No nodes have been inserted, do not traverse + }; + + let (mut nodes, _edges) = self.graph.into_nodes_edges(); + + order + .into_iter() + .filter_map(move |i| nodes[i.index()].weight.take()) + } + + /// Return the list of directories in from-leaves-to-root order. + /// In case no elements have been inserted, returns an empty list. 
+ /// + /// panics when the specified root is not in the graph + #[instrument(level = "trace", skip_all)] + pub fn drain_leaves_to_root(self) -> impl Iterator<Item = Directory> { + let order = match self.root { + Some(root) => { + // do a DFS Post-Order traversal of the graph, starting with the root node + DfsPostOrder::new(&self.graph, root) + .iter(&self.graph) + .collect::<Vec<_>>() + } + None => vec![], // No nodes have been inserted, do not traverse + }; + + let (mut nodes, _edges) = self.graph.into_nodes_edges(); + + order + .into_iter() + .filter_map(move |i| nodes[i.index()].weight.take()) + } +} + +#[cfg(test)] +mod tests { + use crate::{ + fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, + proto::{self, Directory}, + }; + use lazy_static::lazy_static; + use rstest::rstest; + + lazy_static! { + pub static ref BROKEN_DIRECTORY : Directory = Directory { + symlinks: vec![proto::SymlinkNode { + name: "".into(), // invalid name! + target: "doesntmatter".into(), + }], + ..Default::default() + }; + + pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { + directories: vec![proto::DirectoryNode { + name: "foo".into(), + digest: DIRECTORY_A.digest().into(), + size: DIRECTORY_A.size() + 42, // wrong! + }], + ..Default::default() + }; + } + + use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; + + #[rstest] + /// Uploading an empty directory should succeed. + #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] + /// Uploading A, then B (referring to A) should succeed. + #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))] + /// Uploading A, then A, then C (referring to A twice) should succeed. + /// We pretend to be a dumb client not deduping directories. + #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] + /// Uploading A, then C (referring to A twice) should succeed. + #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] + /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close, + /// as B itself would be left unconnected. + #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] + /// Uploading B (referring to A) should fail immediately, because A was never uploaded. + #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] + /// Uploading a directory failing validation should fail immediately. + #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)] + /// Uploading a directory which refers to another Directory with a wrong size should fail. + #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] + fn test_uploads( + #[case] directories_to_upload: &[&Directory], + #[case] exp_fail_upload_last: bool, + #[case] exp_finalize: Option<Vec<&Directory>>, // Some(_) if finalize successful, None if not. + ) { + let mut dcv = DirectoryGraph::<LeavesToRootValidator>::default(); + let len_directories_to_upload = directories_to_upload.len(); + + for (i, d) in directories_to_upload.iter().enumerate() { + let resp = dcv.add((*d).clone()); + if i == len_directories_to_upload - 1 && exp_fail_upload_last { + assert!(resp.is_err(), "expect last put to fail"); + + // We don't really care anymore what finalize() would return, as + // the add() failed. 
+ return; + } else { + assert!(resp.is_ok(), "expect put to succeed"); + } + } + + // everything was uploaded successfully. Test finalize(). + let resp = dcv + .validate() + .map(|validated| validated.drain_leaves_to_root().collect::<Vec<_>>()); + + match exp_finalize { + Some(directories) => { + assert_eq!( + Vec::from_iter(directories.iter().map(|e| (*e).to_owned())), + resp.expect("drain should succeed") + ); + } + None => { + resp.expect_err("drain should fail"); + } + } + } + + #[rstest] + /// Downloading an empty directory should succeed. + #[case::empty_directory(&*DIRECTORY_A, &[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] + /// Downlading B, then A (referenced by B) should succeed. + #[case::simple_closure(&*DIRECTORY_B, &[&*DIRECTORY_B, &*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))] + /// Downloading C (referring to A twice), then A should succeed. + #[case::same_child_dedup(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] + /// Downloading C, then B (both referring to A but not referring to each other) should fail immediately as B has no connection to C (the root) + #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)] + /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root). + #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)] + /// Downloading a directory failing validation should fail immediately. + #[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)] + /// Downloading a directory which refers to another Directory with a wrong size should fail. + #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)] + fn test_downloads( + #[case] root: &Directory, + #[case] directories_to_upload: &[&Directory], + #[case] exp_fail_upload_last: bool, + #[case] exp_finalize: Option<Vec<&Directory>>, // Some(_) if finalize successful, None if not. + ) { + let mut dcv = + DirectoryGraph::with_order(RootToLeavesValidator::new_with_root_digest(root.digest())); + let len_directories_to_upload = directories_to_upload.len(); + + for (i, d) in directories_to_upload.iter().enumerate() { + let resp = dcv.add((*d).clone()); + if i == len_directories_to_upload - 1 && exp_fail_upload_last { + assert!(resp.is_err(), "expect last put to fail"); + + // We don't really care anymore what finalize() would return, as + // the add() failed. + return; + } else { + assert!(resp.is_ok(), "expect put to succeed"); + } + } + + // everything was uploaded successfully. Test finalize(). + let resp = dcv + .validate() + .map(|validated| validated.drain_leaves_to_root().collect::<Vec<_>>()); + + match exp_finalize { + Some(directories) => { + assert_eq!( + Vec::from_iter(directories.iter().map(|e| (*e).to_owned())), + resp.expect("drain should succeed") + ); + } + None => { + resp.expect_err("drain should fail"); + } + } + } +} diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs index ee675ca68a..999170dcd1 100644 --- a/tvix/castore/src/directoryservice/from_addr.rs +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -63,18 +63,25 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn DirectoryService>, crate::Er // - In the case of unix sockets, there must be a path, but may not be a host. // - In the case of non-unix sockets, there must be a host, but no path. 
// Constructing the channel is handled by tvix_castore::channel::from_url. - let client = DirectoryServiceClient::new(crate::tonic::channel_from_url(&url).await?); - Box::new(GRPCDirectoryService::from_client(client)) + Box::new(GRPCDirectoryService::from_client( + DirectoryServiceClient::with_interceptor( + crate::tonic::channel_from_url(&url).await?, + tvix_tracing::propagate::tonic::send_trace, + ), + )) } scheme if scheme.starts_with("objectstore+") => { // We need to convert the URL to string, strip the prefix there, and then // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. let trimmed_url = { let s = url.to_string(); - Url::parse(s.strip_prefix("objectstore+").unwrap()).unwrap() + let mut url = Url::parse(s.strip_prefix("objectstore+").unwrap()).unwrap(); + // trim the query pairs, they might contain credentials or local settings we don't want to send as-is. + url.set_query(None); + url }; Box::new( - ObjectStoreDirectoryService::parse_url(&trimmed_url) + ObjectStoreDirectoryService::parse_url_opts(&trimmed_url, url.query_pairs()) .map_err(|e| Error::StorageError(e.to_string()))?, ) } diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index fe935629bf..ca9b0de07b 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -9,31 +9,35 @@ use tokio::spawn; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic::async_trait; -use tonic::Code; -use tonic::{transport::Channel, Status}; -use tracing::{instrument, warn}; +use tonic::{async_trait, Code, Status}; +use tracing::{instrument, warn, Instrument as _}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. #[derive(Clone)] -pub struct GRPCDirectoryService { +pub struct GRPCDirectoryService<T> { /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, } -impl GRPCDirectoryService { +impl<T> GRPCDirectoryService<T> { /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient]. /// panics if called outside the context of a tokio runtime. pub fn from_client( - grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, ) -> Self { Self { grpc_client } } } #[async_trait] -impl DirectoryService for GRPCDirectoryService { +impl<T> DirectoryService for GRPCDirectoryService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] async fn get( &self, @@ -194,14 +198,17 @@ impl DirectoryService for GRPCDirectoryService { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move { - let s = grpc_client - .put(UnboundedReceiverStream::new(rx)) - .await? 
- .into_inner(); + let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn( + async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); - Ok(s) - }); + Ok(s) + } // instrument the task with the current span, this is not done by default + .in_current_span(), + ); Box::new(GRPCPutter { rq: Some((task, tx)), diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index 3f180ef162..eff4a685fa 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -2,11 +2,13 @@ use crate::{proto, B3Digest, Error}; use futures::stream::BoxStream; use tonic::async_trait; -mod closure_validator; +mod combinators; +mod directory_graph; mod from_addr; mod grpc; mod memory; mod object_store; +mod order_validator; mod simple_putter; mod sled; #[cfg(test)] @@ -14,11 +16,13 @@ pub mod tests; mod traverse; mod utils; -pub use self::closure_validator::ClosureValidator; +pub use self::combinators::Cache; +pub use self::directory_graph::DirectoryGraph; pub use self::from_addr::from_addr; pub use self::grpc::GRPCDirectoryService; pub use self::memory::MemoryDirectoryService; pub use self::object_store::ObjectStoreDirectoryService; +pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; pub use self::simple_putter::SimplePutter; pub use self::sled::SledDirectoryService; pub use self::traverse::descend_to; diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index 64ce335edb..feaaaa39cd 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -1,4 +1,3 @@ -use std::collections::HashSet; use std::sync::Arc; use data_encoding::HEXLOWER; @@ -16,7 +15,9 @@ use tonic::async_trait; use tracing::{instrument, trace, warn, Level}; use url::Url; -use super::{ClosureValidator, DirectoryPutter, DirectoryService}; +use super::{ + DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator, +}; use crate::{proto, B3Digest, Error}; /// Stores directory closures in an object store. @@ -97,9 +98,10 @@ impl DirectoryService for ObjectStoreDirectoryService { &self, root_directory_digest: &B3Digest, ) -> BoxStream<'static, Result<proto::Directory, Error>> { - // The Directory digests we're expecting to receive. 
- let mut expected_directory_digests: HashSet<B3Digest> = - HashSet::from([root_directory_digest.clone()]); + // Check that we are not passing on bogus from the object store to the client, and that the + // trust chain from the root digest to the leaves is intact + let mut order_validator = + RootToLeavesValidator::new_with_root_digest(root_directory_digest.clone()); let dir_path = derive_dirs_path(&self.base_path, root_directory_digest); let object_store = self.object_store.clone(); @@ -130,8 +132,7 @@ impl DirectoryService for ObjectStoreDirectoryService { let digest: B3Digest = hasher.update(&buf).finalize().as_bytes().into(); // Ensure to only decode the directory objects whose digests we trust - let was_expected = expected_directory_digests.remove(&digest); - if !was_expected { + if !order_validator.digest_allowed(&digest) { return Err(crate::Error::StorageError(format!( "received unexpected directory {}", digest @@ -143,13 +144,8 @@ impl DirectoryService for ObjectStoreDirectoryService { Error::StorageError(e.to_string()) })?; - for directory in &directory.directories { - // Allow the children to appear next - expected_directory_digests.insert( - B3Digest::try_from(directory.digest.clone()) - .map_err(|e| Error::StorageError(e.to_string()))?, - ); - } + // Allow the children to appear next + order_validator.add_directory_unchecked(&directory); Ok(directory) })()) @@ -177,7 +173,7 @@ struct ObjectStoreDirectoryPutter { object_store: Arc<dyn ObjectStore>, base_path: Path, - directory_validator: Option<ClosureValidator>, + directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>, } impl ObjectStoreDirectoryPutter { @@ -197,7 +193,9 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { - validator.add(directory)?; + validator + .add(directory) + .map_err(|e| Error::StorageError(e.to_string()))?; } } @@ -214,7 +212,11 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter { // retrieve the validated directories. // It is important that they are in topological order (root first), // since that's how we want to retrieve them from the object store in the end. - let directories = validator.finalize_root_to_leaves()?; + let directories = validator + .validate() + .map_err(|e| Error::StorageError(e.to_string()))? + .drain_root_to_leaves() + .collect::<Vec<_>>(); // Get the root digest let root_digest = directories diff --git a/tvix/castore/src/directoryservice/order_validator.rs b/tvix/castore/src/directoryservice/order_validator.rs new file mode 100644 index 0000000000..6045f5d241 --- /dev/null +++ b/tvix/castore/src/directoryservice/order_validator.rs @@ -0,0 +1,181 @@ +use std::collections::HashSet; +use tracing::warn; + +use crate::{proto::Directory, B3Digest}; + +pub trait OrderValidator { + /// Update the order validator's state with the directory + /// Returns whether the directory was accepted + fn add_directory(&mut self, directory: &Directory) -> bool; +} + +#[derive(Default)] +/// Validates that newly introduced directories are already referenced from +/// the root via existing directories. +/// Commonly used when _receiving_ a directory closure _from_ a store. +pub struct RootToLeavesValidator { + /// Only used to remember the root node, not for validation + expected_digests: HashSet<B3Digest>, +} + +impl RootToLeavesValidator { + /// Use to validate the root digest of the closure upon receiving the first + /// directory. 
+ pub fn new_with_root_digest(root_digest: B3Digest) -> Self { + let mut this = Self::default(); + this.expected_digests.insert(root_digest); + this + } + + /// Checks if a directory is in-order based on its digest. + /// + /// Particularly useful when receiving directories in canonical protobuf + /// encoding, so that directories not connected to the root can be rejected + /// without parsing. + /// + /// After parsing, the directory must be passed to `add_directory_unchecked` + /// to add its children to the list of expected digests. + pub fn digest_allowed(&self, digest: &B3Digest) -> bool { + self.expected_digests.is_empty() // we don't know the root node; allow any + || self.expected_digests.contains(digest) + } + + /// Update the order validator's state with the directory + pub fn add_directory_unchecked(&mut self, directory: &Directory) { + // No initial root was specified and this is the first directory + if self.expected_digests.is_empty() { + self.expected_digests.insert(directory.digest()); + } + + for subdir in &directory.directories { + // Allow the children to appear next + let subdir_digest = subdir.digest.clone().try_into().unwrap(); + self.expected_digests.insert(subdir_digest); + } + } +} + +impl OrderValidator for RootToLeavesValidator { + fn add_directory(&mut self, directory: &Directory) -> bool { + if !self.digest_allowed(&directory.digest()) { + return false; + } + self.add_directory_unchecked(directory); + true + } +} + +#[derive(Default)] +/// Validates that newly uploaded directories only reference directories which +/// have already been introduced. +/// Commonly used when _uploading_ a directory closure _to_ a store. +pub struct LeavesToRootValidator { + /// This is empty in the beginning, and gets filled as leaves and intermediates are + /// inserted + allowed_references: HashSet<B3Digest>, +} + +impl OrderValidator for LeavesToRootValidator { + fn add_directory(&mut self, directory: &Directory) -> bool { + let digest = directory.digest(); + + for subdir in &directory.directories { + let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory() + if !self.allowed_references.contains(&subdir_digest) { + warn!( + directory.digest = %digest, + subdirectory.digest = %subdir_digest, + "unexpected directory reference" + ); + return false; + } + } + + self.allowed_references.insert(digest.clone()); + + true + } +} + +#[cfg(test)] +mod tests { + use super::{LeavesToRootValidator, RootToLeavesValidator}; + use crate::directoryservice::order_validator::OrderValidator; + use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; + use crate::proto::Directory; + use rstest::rstest; + + #[rstest] + /// Uploading an empty directory should succeed. + #[case::empty_directory(&[&*DIRECTORY_A], false)] + /// Uploading A, then B (referring to A) should succeed. + #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false)] + /// Uploading A, then A, then C (referring to A twice) should succeed. + /// We pretend to be a dumb client not deduping directories. + #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false)] + /// Uploading A, then C (referring to A twice) should succeed. + #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false)] + /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close, + /// as B itself would be left unconnected. 
+ #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false)] + /// Uploading B (referring to A) should fail immediately, because A was never uploaded. + #[case::dangling_pointer(&[&*DIRECTORY_B], true)] + fn leaves_to_root( + #[case] directories_to_upload: &[&Directory], + #[case] exp_fail_upload_last: bool, + ) { + let mut validator = LeavesToRootValidator::default(); + let len_directories_to_upload = directories_to_upload.len(); + + for (i, d) in directories_to_upload.iter().enumerate() { + let resp = validator.add_directory(d); + if i == len_directories_to_upload - 1 && exp_fail_upload_last { + assert!(!resp, "expect last put to fail"); + + // We don't really care anymore what finalize() would return, as + // the add() failed. + return; + } else { + assert!(resp, "expect put to succeed"); + } + } + } + + #[rstest] + /// Downloading an empty directory should succeed. + #[case::empty_directory(&*DIRECTORY_A, &[&*DIRECTORY_A], false)] + /// Downlading B, then A (referenced by B) should succeed. + #[case::simple_closure(&*DIRECTORY_B, &[&*DIRECTORY_B, &*DIRECTORY_A], false)] + /// Downloading C (referring to A twice), then A should succeed. + #[case::same_child_dedup(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_A], false)] + /// Downloading C, then B (both referring to A but not referring to each other) should fail immediately as B has no connection to C (the root) + #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true)] + /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root). + #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true)] + fn root_to_leaves( + #[case] root: &Directory, + #[case] directories_to_upload: &[&Directory], + #[case] exp_fail_upload_last: bool, + ) { + let mut validator = RootToLeavesValidator::new_with_root_digest(root.digest()); + let len_directories_to_upload = directories_to_upload.len(); + + for (i, d) in directories_to_upload.iter().enumerate() { + let resp1 = validator.digest_allowed(&d.digest()); + let resp = validator.add_directory(d); + assert_eq!( + resp1, resp, + "digest_allowed should return the same value as add_directory" + ); + if i == len_directories_to_upload - 1 && exp_fail_upload_last { + assert!(!resp, "expect last put to fail"); + + // We don't really care anymore what finalize() would return, as + // the add() failed. 
+ return; + } else { + assert!(resp, "expect put to succeed"); + } + } + } +} diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs index 25617ebcac..dc54e3d11d 100644 --- a/tvix/castore/src/directoryservice/simple_putter.rs +++ b/tvix/castore/src/directoryservice/simple_putter.rs @@ -1,6 +1,6 @@ -use super::ClosureValidator; use super::DirectoryPutter; use super::DirectoryService; +use super::{DirectoryGraph, LeavesToRootValidator}; use crate::proto; use crate::B3Digest; use crate::Error; @@ -14,7 +14,7 @@ use tracing::warn; pub struct SimplePutter<DS: DirectoryService> { directory_service: DS, - directory_validator: Option<ClosureValidator>, + directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>, } impl<DS: DirectoryService> SimplePutter<DS> { @@ -33,7 +33,9 @@ impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { - validator.add(directory)?; + validator + .add(directory) + .map_err(|e| Error::StorageError(e.to_string()))?; } } @@ -46,7 +48,11 @@ impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { None => Err(Error::InvalidRequest("already closed".to_string())), Some(validator) => { // retrieve the validated directories. - let directories = validator.finalize()?; + let directories = validator + .validate() + .map_err(|e| Error::StorageError(e.to_string()))? + .drain_leaves_to_root() + .collect::<Vec<_>>(); // Get the root digest, which is at the end (cf. insertion order) let root_digest = directories diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index 9490a49c00..bd98ed6b1e 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -8,7 +8,7 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{ClosureValidator, DirectoryPutter, DirectoryService}; +use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; #[derive(Clone)] pub struct SledDirectoryService { @@ -135,7 +135,7 @@ pub struct SledDirectoryPutter { /// The directories (inside the directory validator) that we insert later, /// or None, if they were already inserted. - directory_validator: Option<ClosureValidator>, + directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>, } #[async_trait] @@ -145,7 +145,9 @@ impl DirectoryPutter for SledDirectoryPutter { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { - validator.add(directory)?; + validator + .add(directory) + .map_err(|e| Error::StorageError(e.to_string()))?; } } @@ -162,7 +164,11 @@ impl DirectoryPutter for SledDirectoryPutter { let tree = self.tree.clone(); move || { // retrieve the validated directories. - let directories = validator.finalize()?; + let directories = validator + .validate() + .map_err(|e| Error::StorageError(e.to_string()))? + .drain_leaves_to_root() + .collect::<Vec<_>>(); // Get the root digest, which is at the end (cf. 
insertion order) let root_digest = directories diff --git a/tvix/castore/src/fs/fuse.rs b/tvix/castore/src/fs/fuse/mod.rs index cd50618ff5..64ef29ed2a 100644 --- a/tvix/castore/src/fs/fuse.rs +++ b/tvix/castore/src/fs/fuse/mod.rs @@ -1,8 +1,13 @@ -use std::{io, path::Path, sync::Arc, thread}; +use std::{io, path::Path, sync::Arc}; use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; +use parking_lot::Mutex; +use threadpool::ThreadPool; use tracing::{error, instrument}; +#[cfg(test)] +mod tests; + struct FuseServer<FS> where FS: FileSystem + Sync + Send, @@ -46,9 +51,12 @@ where } } +/// Starts a [Filesystem] with the specified number of threads, and provides +/// functions to unmount, and wait for it to have completed. +#[derive(Clone)] pub struct FuseDaemon { - session: FuseSession, - threads: Vec<thread::JoinHandle<()>>, + session: Arc<Mutex<FuseSession>>, + threads: Arc<ThreadPool>, } impl FuseDaemon { @@ -56,7 +64,7 @@ impl FuseDaemon { pub fn new<FS, P>( fs: FS, mountpoint: P, - threads: usize, + num_threads: usize, allow_other: bool, ) -> Result<Self, io::Error> where @@ -73,40 +81,49 @@ impl FuseDaemon { session .mount() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - let mut join_handles = Vec::with_capacity(threads); - for _ in 0..threads { + + // construct a thread pool + let threads = threadpool::Builder::new() + .num_threads(num_threads) + .thread_name("fuse_server".to_string()) + .build(); + + for _ in 0..num_threads { + // for each thread requested, create and start a FuseServer accepting requests. let mut server = FuseServer { server: server.clone(), channel: session .new_channel() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, }; - let join_handle = thread::Builder::new() - .name("fuse_server".to_string()) - .spawn(move || { - let _ = server.start(); - })?; - join_handles.push(join_handle); + + threads.execute(move || { + let _ = server.start(); + }); } Ok(FuseDaemon { - session, - threads: join_handles, + session: Arc::new(Mutex::new(session)), + threads: Arc::new(threads), }) } + /// Waits for all threads to finish. + #[instrument(skip_all)] + pub fn wait(&self) { + self.threads.join() + } + + /// Send the unmount command, and waits for all threads to finish. #[instrument(skip_all, err)] - pub fn unmount(&mut self) -> Result<(), io::Error> { + pub fn unmount(&self) -> Result<(), io::Error> { + // Send the unmount command. self.session + .lock() .umount() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - for thread in self.threads.drain(..) 
{ - thread.join().map_err(|_| { - io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") - })?; - } - + self.wait(); Ok(()) } } diff --git a/tvix/castore/src/fs/tests.rs b/tvix/castore/src/fs/fuse/tests.rs index d6eeb8a411..bcebcf4a72 100644 --- a/tvix/castore/src/fs/tests.rs +++ b/tvix/castore/src/fs/fuse/tests.rs @@ -11,7 +11,8 @@ use std::{ use tempfile::TempDir; use tokio_stream::{wrappers::ReadDirStream, StreamExt}; -use super::{fuse::FuseDaemon, TvixStoreFs}; +use super::FuseDaemon; +use crate::fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST}; use crate::proto as castorepb; use crate::proto::node::Node; use crate::{ @@ -247,7 +248,7 @@ async fn mount() { let (blob_service, directory_service) = gen_svcs(); - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, BTreeMap::default(), @@ -270,7 +271,7 @@ async fn root() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service) = gen_svcs(); - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, BTreeMap::default(), @@ -304,7 +305,7 @@ async fn root_with_listing() { populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -348,7 +349,7 @@ async fn stat_file_at_root() { populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -385,7 +386,7 @@ async fn read_file_at_root() { populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -422,7 +423,7 @@ async fn read_large_file_at_root() { populate_blob_b(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -467,7 +468,7 @@ async fn symlink_readlink() { populate_symlink(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -514,7 +515,7 @@ async fn read_stat_through_symlink() { populate_blob_a(&blob_service, &mut root_nodes).await; populate_symlink(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -559,7 +560,7 @@ async fn read_stat_directory() { populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -596,7 +597,7 @@ async fn xattr() { populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -614,12 +615,12 @@ async fn xattr() { // There should be 1 key, XATTR_NAME_DIRECTORY_DIGEST. assert_eq!(1, xattr_names.len(), "there should be 1 xattr name"); assert_eq!( - super::XATTR_NAME_DIRECTORY_DIGEST, + XATTR_NAME_DIRECTORY_DIGEST, xattr_names.first().unwrap().as_encoded_bytes() ); // The key should equal to the string-formatted b3 digest. 
- let val = xattr::get(&p, OsStr::from_bytes(super::XATTR_NAME_DIRECTORY_DIGEST)) + let val = xattr::get(&p, OsStr::from_bytes(XATTR_NAME_DIRECTORY_DIGEST)) .expect("must succeed") .expect("must be some"); assert_eq!( @@ -643,12 +644,12 @@ async fn xattr() { // There should be 1 key, XATTR_NAME_BLOB_DIGEST. assert_eq!(1, xattr_names.len(), "there should be 1 xattr name"); assert_eq!( - super::XATTR_NAME_BLOB_DIGEST, + XATTR_NAME_BLOB_DIGEST, xattr_names.first().unwrap().as_encoded_bytes() ); // The key should equal to the string-formatted b3 digest. - let val = xattr::get(&p, OsStr::from_bytes(super::XATTR_NAME_BLOB_DIGEST)) + let val = xattr::get(&p, OsStr::from_bytes(XATTR_NAME_BLOB_DIGEST)) .expect("must succeed") .expect("must be some"); assert_eq!( @@ -679,7 +680,7 @@ async fn read_blob_inside_dir() { populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -719,7 +720,7 @@ async fn read_blob_deep_inside_dir() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -762,7 +763,7 @@ async fn readdir() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -822,7 +823,7 @@ async fn readdir_deep() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -872,7 +873,7 @@ async fn check_attributes() { populate_symlink(&mut root_nodes).await; populate_blob_helloworld(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -947,7 +948,7 @@ async fn compare_inodes_directories() { populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -991,7 +992,7 @@ async fn compare_inodes_files() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1040,7 +1041,7 @@ async fn compare_inodes_symlinks() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; populate_symlink2(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1083,7 +1084,7 @@ async fn read_wrong_paths_in_root() { populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1138,7 +1139,7 @@ async fn disallow_writes() { let (blob_service, directory_service) = gen_svcs(); let root_nodes = BTreeMap::default(); - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1170,7 +1171,7 @@ async fn missing_directory() { populate_directorynode_without_directory(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let 
fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1218,7 +1219,7 @@ async fn missing_blob() { populate_filenode_without_blob(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index 826523131f..b565ed60ac 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -9,9 +9,6 @@ pub mod fuse; #[cfg(feature = "virtiofs")] pub mod virtiofs; -#[cfg(test)] -mod tests; - pub use self::root_nodes::RootNodes; use self::{ file_attr::ROOT_FILE_ATTR, @@ -46,7 +43,7 @@ use tokio::{ io::{AsyncReadExt, AsyncSeekExt}, sync::mpsc, }; -use tracing::{debug, error, instrument, warn, Span}; +use tracing::{debug, error, instrument, warn, Instrument as _, Span}; /// This implements a read-only FUSE filesystem for a tvix-store /// with the passed [BlobService], [DirectoryService] and [RootNodes]. @@ -400,16 +397,20 @@ where // This task will run in the background immediately and will exit // after the stream ends or if we no longer want any more entries. - self.tokio_handle.spawn(async move { - let mut stream = root_nodes_provider.list().enumerate(); - while let Some(node) = stream.next().await { - if tx.send(node).await.is_err() { - // If we get a send error, it means the sync code - // doesn't want any more entries. - break; + self.tokio_handle.spawn( + async move { + let mut stream = root_nodes_provider.list().enumerate(); + while let Some(node) = stream.next().await { + if tx.send(node).await.is_err() { + // If we get a send error, it means the sync code + // doesn't want any more entries. + break; + } } } - }); + // instrument the task with the current span, this is not done by default + .in_current_span(), + ); // Put the rx part into [self.dir_handles]. // TODO: this will overflow after 2**64 operations, diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index 0ebb4a2361..cd5b1290e0 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -1,38 +1,23 @@ //! Imports from an archive (tarballs) use std::collections::HashMap; -use std::io::{Cursor, Write}; -use std::sync::Arc; use petgraph::graph::{DiGraph, NodeIndex}; use petgraph::visit::{DfsPostOrder, EdgeRef}; use petgraph::Direction; use tokio::io::AsyncRead; -use tokio::sync::Semaphore; -use tokio::task::JoinSet; use tokio_stream::StreamExt; use tokio_tar::Archive; -use tokio_util::io::InspectReader; use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::import::{ingest_entries, IngestionEntry, IngestionError}; use crate::proto::node::Node; -use crate::B3Digest; -type TarPathBuf = std::path::PathBuf; - -/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the -/// background. -/// -/// This is a u32 since we acquire a weighted semaphore using the size of the blob. -/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of -/// the blob can be represented using a u32 and will not cause an overflow. -const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024; +use super::blobs::{self, ConcurrentBlobUploader}; -/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads. 
-const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024; +type TarPathBuf = std::path::PathBuf; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -57,13 +42,6 @@ pub enum Error { #[error("unable to read link name field for {0}: {1}")] LinkName(TarPathBuf, std::io::Error), - #[error("unable to read blob contents for {0}: {1}")] - BlobRead(TarPathBuf, std::io::Error), - - // FUTUREWORK: proper error for blob finalize - #[error("unable to finalize blob {0}: {1}")] - BlobFinalize(TarPathBuf, std::io::Error), - #[error("unsupported tar entry {0} type: {1:?}")] EntryType(TarPathBuf, tokio_tar::EntryType), @@ -72,6 +50,9 @@ pub enum Error { #[error("unexpected number of top level directory entries")] UnexpectedNumberOfTopLevelEntries, + + #[error(transparent)] + BlobUploadError(#[from] blobs::Error), } /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and @@ -94,8 +75,7 @@ where // In the first phase, collect up all the regular files and symlinks. let mut nodes = IngestionEntryGraph::new(); - let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE)); - let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new(); + let mut blob_uploader = ConcurrentBlobUploader::new(blob_service); let mut entries_iter = archive.entries().map_err(Error::Entries)?; while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? { @@ -110,77 +90,14 @@ where tokio_tar::EntryType::Regular | tokio_tar::EntryType::GNUSparse | tokio_tar::EntryType::Continuous => { - let header_size = header + let size = header .size() .map_err(|e| Error::Size(tar_path.clone(), e))?; - // If the blob is small enough, read it off the wire, compute the digest, - // and upload it to the [BlobService] in the background. - let (size, digest) = if header_size <= CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 { - let mut buffer = Vec::with_capacity(header_size as usize); - let mut hasher = blake3::Hasher::new(); - let mut reader = InspectReader::new(&mut entry, |bytes| { - hasher.write_all(bytes).unwrap(); - }); - - // Ensure that we don't buffer into memory until we've acquired a permit. - // This prevents consuming too much memory when performing concurrent - // blob uploads. - let permit = semaphore - .clone() - // This cast is safe because ensure the header_size is less than - // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32. - .acquire_many_owned(header_size as u32) - .await - .unwrap(); - let size = tokio::io::copy(&mut reader, &mut buffer) - .await - .map_err(|e| Error::Size(tar_path.clone(), e))?; - - let digest: B3Digest = hasher.finalize().as_bytes().into(); - - { - let blob_service = blob_service.clone(); - let digest = digest.clone(); - async_blob_uploads.spawn({ - let tar_path = tar_path.clone(); - async move { - let mut writer = blob_service.open_write().await; - - tokio::io::copy(&mut Cursor::new(buffer), &mut writer) - .await - .map_err(|e| Error::BlobRead(tar_path.clone(), e))?; - - let blob_digest = writer - .close() - .await - .map_err(|e| Error::BlobFinalize(tar_path, e))?; - - assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch"); - - // Make sure we hold the permit until we finish writing the blob - // to the [BlobService]. 
- drop(permit); - Ok(()) - } - }); - } - - (size, digest) - } else { - let mut writer = blob_service.open_write().await; - - let size = tokio::io::copy(&mut entry, &mut writer) - .await - .map_err(|e| Error::BlobRead(tar_path.clone(), e))?; - - let digest = writer - .close() - .await - .map_err(|e| Error::BlobFinalize(tar_path.clone(), e))?; - - (size, digest) - }; + let digest = blob_uploader + .upload(&path, size, &mut entry) + .await + .map_err(Error::BlobUploadError)?; let executable = entry .header() @@ -219,9 +136,7 @@ where nodes.add(entry)?; } - while let Some(result) = async_blob_uploads.join_next().await { - result.expect("task panicked")?; - } + blob_uploader.join().await.map_err(Error::BlobUploadError)?; let root_node = ingest_entries( directory_service, diff --git a/tvix/castore/src/import/blobs.rs b/tvix/castore/src/import/blobs.rs new file mode 100644 index 0000000000..8135d871d6 --- /dev/null +++ b/tvix/castore/src/import/blobs.rs @@ -0,0 +1,177 @@ +use std::{ + io::{Cursor, Write}, + sync::Arc, +}; + +use tokio::{ + io::AsyncRead, + sync::Semaphore, + task::{JoinError, JoinSet}, +}; +use tokio_util::io::InspectReader; + +use crate::{blobservice::BlobService, B3Digest, Path, PathBuf}; + +/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the +/// background. +/// +/// This is a u32 since we acquire a weighted semaphore using the size of the blob. +/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of +/// the blob can be represented using a u32 and will not cause an overflow. +const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024; + +/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads. +const MAX_BUFFER_SIZE: usize = 128 * 1024 * 1024; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("unable to read blob contents for {0}: {1}")] + BlobRead(PathBuf, std::io::Error), + + // FUTUREWORK: proper error for blob finalize + #[error("unable to finalize blob {0}: {1}")] + BlobFinalize(PathBuf, std::io::Error), + + #[error("unexpected size for {path} wanted: {wanted} got: {got}")] + UnexpectedSize { + path: PathBuf, + wanted: u64, + got: u64, + }, + + #[error("blob upload join error: {0}")] + JoinError(#[from] JoinError), +} + +/// The concurrent blob uploader provides a mechanism for concurrently uploading small blobs. +/// This is useful when ingesting from sources like tarballs and archives which each blob entry +/// must be read sequentially. Ingesting many small blobs sequentially becomes slow due to +/// round trip time with the blob service. The concurrent blob uploader will buffer small +/// blobs in memory and upload them to the blob service in the background. +/// +/// Once all blobs have been uploaded, make sure to call [ConcurrentBlobUploader::join] to wait +/// for all background jobs to complete and check for any errors. +pub struct ConcurrentBlobUploader<BS> { + blob_service: BS, + upload_tasks: JoinSet<Result<(), Error>>, + upload_semaphore: Arc<Semaphore>, +} + +impl<BS> ConcurrentBlobUploader<BS> +where + BS: BlobService + Clone + 'static, +{ + /// Creates a new concurrent blob uploader which uploads blobs to the provided + /// blob service. + pub fn new(blob_service: BS) -> Self { + Self { + blob_service, + upload_tasks: JoinSet::new(), + upload_semaphore: Arc::new(Semaphore::new(MAX_BUFFER_SIZE)), + } + } + + /// Uploads a blob to the blob service. If the blob is small enough it will be read to a buffer + /// and uploaded in the background. 
+ /// This will read the entirety of the provided reader unless an error occurs, even if blobs + /// are uploaded in the background.. + pub async fn upload<R>( + &mut self, + path: &Path, + expected_size: u64, + mut r: R, + ) -> Result<B3Digest, Error> + where + R: AsyncRead + Unpin, + { + if expected_size < CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 { + let mut buffer = Vec::with_capacity(expected_size as usize); + let mut hasher = blake3::Hasher::new(); + let mut reader = InspectReader::new(&mut r, |bytes| { + hasher.write_all(bytes).unwrap(); + }); + + let permit = self + .upload_semaphore + .clone() + // This cast is safe because ensure the header_size is less than + // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32. + .acquire_many_owned(expected_size as u32) + .await + .unwrap(); + let size = tokio::io::copy(&mut reader, &mut buffer) + .await + .map_err(|e| Error::BlobRead(path.into(), e))?; + let digest: B3Digest = hasher.finalize().as_bytes().into(); + + if size != expected_size { + return Err(Error::UnexpectedSize { + path: path.into(), + wanted: expected_size, + got: size, + }); + } + + self.upload_tasks.spawn({ + let blob_service = self.blob_service.clone(); + let expected_digest = digest.clone(); + let path = path.to_owned(); + let r = Cursor::new(buffer); + async move { + let digest = upload_blob(&blob_service, &path, expected_size, r).await?; + + assert_eq!(digest, expected_digest, "Tvix bug: blob digest mismatch"); + + // Make sure we hold the permit until we finish writing the blob + // to the [BlobService]. + drop(permit); + Ok(()) + } + }); + + return Ok(digest); + } + + upload_blob(&self.blob_service, path, expected_size, r).await + } + + /// Waits for all background upload jobs to complete, returning any upload errors. + pub async fn join(mut self) -> Result<(), Error> { + while let Some(result) = self.upload_tasks.join_next().await { + result??; + } + Ok(()) + } +} + +async fn upload_blob<BS, R>( + blob_service: &BS, + path: &Path, + expected_size: u64, + mut r: R, +) -> Result<B3Digest, Error> +where + BS: BlobService, + R: AsyncRead + Unpin, +{ + let mut writer = blob_service.open_write().await; + + let size = tokio::io::copy(&mut r, &mut writer) + .await + .map_err(|e| Error::BlobRead(path.into(), e))?; + + let digest = writer + .close() + .await + .map_err(|e| Error::BlobFinalize(path.into(), e))?; + + if size != expected_size { + return Err(Error::UnexpectedSize { + path: path.into(), + wanted: expected_size, + got: size, + }); + } + + Ok(digest) +} diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index 9d3ecfe6ab..dc7821b810 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -6,7 +6,11 @@ use std::fs::FileType; use std::os::unix::ffi::OsStringExt; use std::os::unix::fs::MetadataExt; use std::os::unix::fs::PermissionsExt; +use tokio::io::BufReader; +use tokio_util::io::InspectReader; use tracing::instrument; +use tracing::Span; +use tracing_indicatif::span_ext::IndicatifSpanExt; use walkdir::DirEntry; use walkdir::WalkDir; @@ -26,7 +30,7 @@ use super::IngestionError; /// /// This function will walk the filesystem using `walkdir` and will consume /// `O(#number of entries)` space. 
-#[instrument(skip(blob_service, directory_service), fields(path), err)] +#[instrument(skip(blob_service, directory_service), fields(path, indicatif.pb_show=1), err)] pub async fn ingest_path<BS, DS, P>( blob_service: BS, directory_service: DS, @@ -37,6 +41,10 @@ where BS: BlobService + Clone, DS: DirectoryService, { + let span = Span::current(); + span.pb_set_message(&format!("Ingesting {:?}", path)); + span.pb_start(); + let iter = WalkDir::new(path.as_ref()) .follow_links(false) .follow_root_links(false) @@ -44,7 +52,18 @@ where .into_iter(); let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref()); - ingest_entries(directory_service, entries).await + ingest_entries( + directory_service, + entries.inspect({ + let span = span.clone(); + move |e| { + if e.is_ok() { + span.pb_inc(1) + } + } + }), + ) + .await } /// Converts an iterator of [walkdir::DirEntry]s into a stream of ingestion entries. @@ -138,7 +157,7 @@ where } /// Uploads the file at the provided [Path] the the [BlobService]. -#[instrument(skip(blob_service), fields(path), err)] +#[instrument(skip(blob_service), fields(path, indicatif.pb_show=1), err)] async fn upload_blob<BS>( blob_service: BS, path: impl AsRef<std::path::Path>, @@ -146,16 +165,29 @@ async fn upload_blob<BS>( where BS: BlobService, { - let mut file = match tokio::fs::File::open(path.as_ref()).await { - Ok(file) => file, - Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)), - }; + let span = Span::current(); + span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); + span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref())); + span.pb_start(); - let mut writer = blob_service.open_write().await; + let file = tokio::fs::File::open(path.as_ref()) + .await + .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; - if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { - return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)); - }; + let metadata = file + .metadata() + .await + .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?; + + span.pb_set_length(metadata.len()); + let reader = InspectReader::new(file, |d| { + span.pb_inc(d.len() as u64); + }); + + let mut writer = blob_service.open_write().await; + tokio::io::copy(&mut BufReader::new(reader), &mut writer) + .await + .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; let digest = writer .close() diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index e8b27e469c..a9ac0be6b0 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -14,7 +14,6 @@ use crate::proto::FileNode; use crate::proto::SymlinkNode; use crate::B3Digest; use futures::{Stream, StreamExt}; - use tracing::Level; use std::collections::HashMap; @@ -24,6 +23,7 @@ mod error; pub use error::IngestionError; pub mod archive; +pub mod blobs; pub mod fs; /// Ingests [IngestionEntry] from the given stream into a the passed [DirectoryService]. 
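Editor's note: the `import::blobs` module introduced above pulls the small-blob buffering out of the tarball importer into a reusable `ConcurrentBlobUploader`. Below is a minimal, hedged sketch of how a caller might drive it; the `upload_all` helper and the `(path, size, reader)` triples are hypothetical, and only `ConcurrentBlobUploader::new`, `upload` and `join` (plus the `BS: BlobService + Clone + 'static` bound) are taken from the diff itself.

```rust
// Sketch only: assumes tvix_castore re-exports Path/B3Digest at the crate root and
// exposes import::blobs publicly, as the hunks above suggest.
use tokio::io::AsyncRead;
use tvix_castore::blobservice::BlobService;
use tvix_castore::import::blobs::{ConcurrentBlobUploader, Error};
use tvix_castore::{B3Digest, Path}; // castore's own Path type, not std::path::Path

/// Uploads one blob per (path, size, reader) triple. Small blobs are buffered and
/// pushed to the BlobService in background tasks; join() waits for all of them.
async fn upload_all<BS, R>(
    blob_service: BS,
    entries: Vec<(&Path, u64, R)>,
) -> Result<Vec<B3Digest>, Error>
where
    BS: BlobService + Clone + 'static,
    R: AsyncRead + Unpin,
{
    let mut uploader = ConcurrentBlobUploader::new(blob_service);
    let mut digests = Vec::new();

    for (path, size, mut reader) in entries {
        // Blobs below the 1 MiB threshold are uploaded in the background,
        // larger ones are streamed to the blob service directly.
        digests.push(uploader.upload(path, size, &mut reader).await?);
    }

    // Surface any error from the background upload tasks.
    uploader.join().await?;
    Ok(digests)
}
```

Note the design choice visible in the diff: buffered uploads are gated by a 128 MiB weighted semaphore acquired before reading, so a slow blob service back-pressures the reader instead of letting the buffer grow without bound.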
diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs index 5c1428690c..ce1d2bcd24 100644 --- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs @@ -1,4 +1,5 @@ -use crate::directoryservice::ClosureValidator; +use crate::directoryservice::DirectoryGraph; +use crate::directoryservice::LeavesToRootValidator; use crate::proto; use crate::{directoryservice::DirectoryService, B3Digest}; use futures::stream::BoxStream; @@ -78,14 +79,20 @@ where ) -> Result<Response<proto::PutDirectoryResponse>, Status> { let mut req_inner = request.into_inner(); - // We put all Directory messages we receive into ClosureValidator first. - let mut validator = ClosureValidator::default(); + // We put all Directory messages we receive into DirectoryGraph. + let mut validator = DirectoryGraph::<LeavesToRootValidator>::default(); while let Some(directory) = req_inner.message().await? { - validator.add(directory)?; + validator + .add(directory) + .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?; } // drain, which validates connectivity too. - let directories = validator.finalize()?; + let directories = validator + .validate() + .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))? + .drain_leaves_to_root() + .collect::<Vec<_>>(); let mut directory_putter = self.directory_service.put_multiple_start(); for directory in directories { |
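Editor's note: the gRPC wrapper above illustrates the two-step pattern that replaces the old `ClosureValidator`: feed `Directory` messages into a `DirectoryGraph` parameterized with an order validator, then `validate()` and drain in whichever direction the backend needs. A minimal sketch follows, assuming `tvix_castore::directoryservice` re-exports these types as in the `mod.rs` hunk; the `order_for_upload` helper and the `String` error type are hypothetical.

```rust
// Sketch only: mirrors the leaves-to-root flow used by the gRPC wrapper and the
// SimplePutter/Sled putters in this diff.
use tvix_castore::directoryservice::{DirectoryGraph, LeavesToRootValidator};
use tvix_castore::proto::Directory;

fn order_for_upload(directories: Vec<Directory>) -> Result<Vec<Directory>, String> {
    let mut graph = DirectoryGraph::<LeavesToRootValidator>::default();

    for directory in directories {
        // Rejects a directory that references a child we have not seen yet.
        graph.add(directory).map_err(|e| e.to_string())?;
    }

    // validate() checks connectivity to a single root; drain_leaves_to_root()
    // then yields the directories in an order safe to upload.
    Ok(graph
        .validate()
        .map_err(|e| e.to_string())?
        .drain_leaves_to_root()
        .collect())
}
```

For the download direction, the same graph is built with `DirectoryGraph::with_order(RootToLeavesValidator::new_with_root_digest(root))` and drained via `drain_root_to_leaves()`, as the object-store directory service does earlier in this diff.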