Diffstat (limited to 'tvix/store')
32 files changed, 4397 insertions, 0 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml new file mode 100644 index 000000000000..9065465af622 --- /dev/null +++ b/tvix/store/Cargo.toml @@ -0,0 +1,59 @@
+[package]
+name = "tvix-store"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+anyhow = "1.0.68"
+async-stream = "0.3.5"
+blake3 = { version = "1.3.1", features = ["rayon", "std"] }
+bytes = "1.4.0"
+clap = { version = "4.0", features = ["derive", "env"] }
+count-write = "0.1.0"
+data-encoding = "2.3.3"
+futures = "0.3.30"
+lazy_static = "1.4.0"
+nix-compat = { path = "../nix-compat", features = ["async"] }
+pin-project-lite = "0.2.13"
+prost = "0.12.1"
+opentelemetry = { version = "0.21.0", optional = true}
+opentelemetry-otlp = { version = "0.14.0", optional = true }
+opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"], optional = true}
+sha2 = "0.10.6"
+sled = { version = "0.34.7" }
+thiserror = "1.0.38"
+tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
+tokio-listener = { version = "0.2.2", features = [ "tonic010" ] }
+tokio-stream = { version = "0.1.14", features = ["fs"] }
+tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
+tonic = { version = "0.10.2", features = ["tls", "tls-roots"] }
+tower = "0.4.13"
+tracing = "0.1.37"
+tracing-opentelemetry = "0.22.0"
+tracing-subscriber = { version = "0.3.16", features = ["json"] }
+tvix-castore = { path = "../castore" }
+url = "2.4.0"
+walkdir = "2.4.0"
+async-recursion = "1.0.5"
+reqwest = { version = "0.11.22", features = ["rustls-tls", "stream"], default-features = false }
+xz2 = "0.1.7"
+
+[dependencies.tonic-reflection]
+optional = true
+version = "0.10.2"
+
+[build-dependencies]
+prost-build = "0.12.1"
+tonic-build = "0.10.2"
+
+[dev-dependencies]
+test-case = "3.3.1"
+tempfile = "3.3.0"
+tokio-retry = "0.3.0"
+
+[features]
+default = ["fuse", "otlp", "tonic-reflection"]
+fuse = ["tvix-castore/fuse"]
+otlp = ["dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk"]
+tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"]
+virtiofs = ["tvix-castore/virtiofs"]
diff --git a/tvix/store/README.md b/tvix/store/README.md new file mode 100644 index 000000000000..a9d29671d8bb --- /dev/null +++ b/tvix/store/README.md @@ -0,0 +1,63 @@
+# //tvix/store
+
+This contains the code hosting the tvix-store.
+
+For the local store, Nix realizes files on the filesystem in `/nix/store` (and
+maintains some metadata in a SQLite database). For "remote stores", it
+communicates this metadata in NAR (Nix ARchive) and NARInfo format.
+
+Compared to the Nix model, `tvix-store` stores data at a much more granular
+level, which provides more deduplication possibilities and more granular
+copying.
+
+However, enough information is preserved to still be able to render NAR and
+NARInfo when needed.
+
+## More Information
+The store consists of two different gRPC services, `tvix.castore.v1` for the
+low-level content-addressed bits, and `tvix.store.v1` for the Nix and
+`StorePath`-specific bits.
+
+Check the `protos/` subfolder both here and in `castore` for the definition of
+the exact RPC methods and messages.
+
+## Interacting with the gRPC service manually
+The shell environment in `//tvix` provides `evans`, which is an interactive
+REPL-based gRPC client.
+
+You can use it to connect to a `tvix-store` and call the various RPC methods.
+
+```shell
+$ cargo run -- daemon &
+$ evans --host localhost --port 8000 -r repl
+  ______
+ |  ____|
+ | |__    __   __   __ _   _ __    ___
+ |  __|   \ \ / /  / _` | | '_ \  / __|
+ | |____   \ V /  | (_| | | | | | \__ \
+ |______|   \_/    \__,_| |_| |_| |___/
+
+ more expressive universal gRPC client
+
+
+localhost:8000> package tvix.castore.v1
+tvix.castore.v1@localhost:8000> service BlobService
+
+tvix.castore.v1.BlobService@localhost:8000> call Put --bytes-from-file
+data (TYPE_BYTES) => /run/current-system/system
+{
+  "digest": "KOM3/IHEx7YfInAnlJpAElYezq0Sxn9fRz7xuClwNfA="
+}
+
+tvix.castore.v1.BlobService@localhost:8000> call Read --bytes-as-base64
+digest (TYPE_BYTES) => KOM3/IHEx7YfInAnlJpAElYezq0Sxn9fRz7xuClwNfA=
+{
+  "data": "eDg2XzY0LWxpbnV4"
+}
+
+$ echo eDg2XzY0LWxpbnV4 | base64 -d
+x86_64-linux
+```
+
+Thanks to `tvix-store` providing gRPC Server Reflection (with the
+`tonic-reflection` feature), you don't need to point `evans` to the `.proto`
+files.
diff --git a/tvix/store/build.rs b/tvix/store/build.rs new file mode 100644 index 000000000000..cfeda59698a0 --- /dev/null +++ b/tvix/store/build.rs @@ -0,0 +1,38 @@
+use std::io::Result;
+
+fn main() -> Result<()> {
+    #[allow(unused_mut)]
+    let mut builder = tonic_build::configure();
+
+    #[cfg(feature = "tonic-reflection")]
+    {
+        let out_dir = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap());
+        let descriptor_path = out_dir.join("tvix.store.v1.bin");
+
+        builder = builder.file_descriptor_set_path(descriptor_path);
+    };
+
+    // https://github.com/hyperium/tonic/issues/908
+    let mut config = prost_build::Config::new();
+    config.bytes(["."]);
+    config.extern_path(".tvix.castore.v1", "::tvix_castore::proto");
+
+    builder
+        .build_server(true)
+        .build_client(true)
+        .compile_with_config(
+            config,
+            &[
+                "tvix/store/protos/pathinfo.proto",
+                "tvix/store/protos/rpc_pathinfo.proto",
+            ],
+            // If we are running `cargo build` manually, using `../..` works fine,
+            // but in case we run inside a nix build, we need to instead point PROTO_ROOT
+            // to a sparseTree containing that structure.
+            &[match std::env::var_os("PROTO_ROOT") {
+                Some(proto_root) => proto_root.to_str().unwrap().to_owned(),
+                None => "../..".to_string(),
+            }],
+        )?;
+    Ok(())
+}
diff --git a/tvix/store/default.nix b/tvix/store/default.nix new file mode 100644 index 000000000000..35d2a22bb2ce --- /dev/null +++ b/tvix/store/default.nix @@ -0,0 +1,34 @@
+{ depot, pkgs, ... }:
+
+let
+  mkImportCheck = p: expectedPath: {
+    label = ":nix :import ${p} with tvix-store import";
+    needsOutput = true;
+    command = pkgs.writeShellScript "tvix-import-check" ''
+      export BLOB_SERVICE_ADDR=memory://
+      export DIRECTORY_SERVICE_ADDR=memory://
+      export PATH_INFO_SERVICE_ADDR=memory://
+      TVIX_STORE_OUTPUT=$(result/bin/tvix-store import ${p})
+      EXPECTED='${/* the verbatim expected Tvix output: */expectedPath}'
+
+      echo "tvix-store output: ''${TVIX_STORE_OUTPUT}"
+      if [ "$TVIX_STORE_OUTPUT" != "$EXPECTED" ]; then
+        echo "Correct would have been ''${EXPECTED}"
+        exit 1
+      fi
+
+      echo "Output was correct."
+    '';
+  };
+in
+
+(depot.tvix.crates.workspaceMembers.tvix-store.build.override {
+  runTests = true;
+  # The virtiofs feature currently fails to build on Darwin;
+  # we can, however, ship it for non-Darwin.
+  features = if pkgs.stdenv.isDarwin then [ "default" ] else [ "default" "virtiofs" ];
+}).overrideAttrs (_: {
+  meta.ci.extraSteps = {
+    import-docs = (mkImportCheck "tvix/store/docs" ./docs);
+  };
+})
diff --git a/tvix/store/docs/api.md b/tvix/store/docs/api.md new file mode 100644 index 000000000000..c1dacc89a598 --- /dev/null +++ b/tvix/store/docs/api.md @@ -0,0 +1,288 @@
+tvix-[ca]store API
+==============
+
+This document outlines the design of the API exposed by tvix-castore and tvix-
+store, as well as other implementations of this store protocol.
+
+This document is meant to be read side-by-side with
+[castore.md](../../castore/docs/castore.md) which describes the data model
+in more detail.
+
+The store API has four main consumers:
+
+1. The evaluator (or more correctly, the CLI/coordinator, in the Tvix
+   case) communicates with the store to:
+
+   * Upload files and directories (e.g. from `builtins.path`, or `src = ./path`
+     Nix expressions).
+   * Read files from the store where necessary (e.g. when `nixpkgs` is
+     located in the store, or for IFD).
+
+2. The builder communicates with the store to:
+
+   * Upload files and directories after a build, to persist build artifacts in
+     the store.
+
+3. Tvix clients (such as users that have Tvix installed, or, depending
+   on perspective, builder environments) expect the store to
+   "materialise" on disk to provide a directory layout with store
+   paths.
+
+4. Stores may communicate with other stores, to substitute already built store
+   paths, i.e. a store acts as a binary cache for other stores.
+
+The store API attempts to reuse parts of its API between these four
+consumers by making similarities explicit in the protocol. This leads
+to a protocol that is slightly more complex than a simple "file
+upload/download"-system, but at significantly greater efficiency, both in terms
+of deduplication opportunities as well as granularity.
+
+## The Store model
+
+Contents inside a tvix-store can be grouped into three different message types:
+
+ * Blobs
+ * Directories
+ * PathInfo (see further down)
+
+(check `castore.md` for more detailed field descriptions)
+
+### Blobs
+A blob object contains the literal file contents of regular (or executable)
+files.
+
+### Directory
+A directory object describes the direct children of a directory.
+
+It contains:
+ - name of child (regular or executable) files, and their [blake3][blake3] hash.
+ - name of child symlinks, and their target (as string)
+ - name of child directories, and their [blake3][blake3] hash (forming a Merkle DAG)
+
+### Content-addressed Store Model
+For example, let's consider a directory layout like this, with some
+imaginary hashes of file contents:
+
+```
+.
+├── file-1.txt        hash: 5891b5b522d5df086d0ff0b110fb
+└── nested
+    └── file-2.txt    hash: abc6fd595fc079d3114d4b71a4d8
+```
+
+A hash for the *directory* `nested` can be created by creating the `Directory`
+object:
+
+```json
+{
+  "directories": [],
+  "files": [{
+    "name": "file-2.txt",
+    "digest": "abc6fd595fc079d3114d4b71a4d8",
+    "size": 123
+  }],
+  "symlinks": []
+}
+```
+
+And then hashing a serialised form of that data structure. We use the blake3
+hash of the canonical protobuf representation. Let's assume the hash was
+`ff0029485729bcde993720749232`.
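+
+As a minimal sketch of that digest calculation (assuming the prost-generated
+Rust types for `castore.proto`; the `Directory::digest()` helper in
+`tvix-castore`, used by `nar/import.rs` further down, does this for us):
+
+```rust
+use prost::Message; // provides encode_to_vec()
+
+// blake3 over the canonical protobuf encoding of a Directory message.
+fn directory_digest(directory: &tvix_castore::proto::Directory) -> [u8; 32] {
+    *blake3::hash(&directory.encode_to_vec()).as_bytes()
+}
+```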
+
+To create the directory object one layer up, we now refer to our `nested`
+directory object in `directories`, and to `file-1.txt` in `files`:
+
+```json
+{
+  "directories": [{
+    "name": "nested",
+    "digest": "ff0029485729bcde993720749232",
+    "size": 1
+  }],
+  "files": [{
+    "name": "file-1.txt",
+    "digest": "5891b5b522d5df086d0ff0b110fb",
+    "size": 124
+  }]
+}
+```
+
+This Merkle DAG of Directory objects and flat store of blobs can be used to
+describe any file/directory/symlink inside a store path. Due to its content-
+addressed nature, it'll automatically deduplicate (re-)used (sub)directories,
+and allow substitution from any (untrusted) source.
+
+The only thing now missing is the metadata to map/"mount" from the
+content-addressed world to a physical path.
+
+### PathInfo
+As most paths in the Nix store currently are input-addressed [^input-addressed],
+and the `tvix-castore` data model is also not intrinsically using NAR hashes,
+we need something mapping from an input-addressed "output path hash" (or a Nix-
+specific content-addressed path) to the contents in the `tvix-castore` world.
+
+That's what `PathInfo` provides. It embeds the root node (Directory, File or
+Symlink) at a given store path.
+
+The root node's `name` field is populated with the (base)name inside
+`/nix/store`, so `xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-pname-1.2.3`.
+
+The `PathInfo` message also stores references to other store paths, and some
+more NARInfo-specific metadata (signatures, narhash, narsize).
+
+
+## API overview
+
+There are three different services:
+
+### BlobService
+`BlobService` can be used to store and retrieve blobs of data, used to host
+regular file contents.
+
+It is content-addressed, using [blake3][blake3]
+as a hashing function.
+
+As blake3 is a tree hash, there's an opportunity to do
+[verified streaming][bao] of parts of the file,
+which doesn't need to trust any more information than the root hash itself.
+Future extensions of the `BlobService` protocol will enable this.
+
+### DirectoryService
+`DirectoryService` allows lookups (and uploads) of `Directory` messages, and
+whole reference graphs of them.
+
+
+### PathInfoService
+The PathInfo service provides lookups from a store path hash to a `PathInfo`
+message.
+
+## Example flows
+
+Below are some common use cases of tvix-store, and how the different
+services are used.
+
+### Upload files and directories
+This is needed for `builtins.path` or `src = ./path` in Nix expressions (A), as
+well as for uploading build artifacts to a store (B).
+
+The path specified needs to be (recursively, BFS-style) traversed.
+ * All file contents need to be hashed with blake3, and submitted to the
+   *BlobService* if not already present.
+   A reference to them needs to be added to the parent Directory object that's
+   constructed.
+ * All symlinks need to be added to the parent directory they reside in.
+ * Whenever a Directory has been fully traversed, it needs to be uploaded to
+   the *DirectoryService* and a reference to it needs to be added to the parent
+   Directory object.
+
+Most of the hashing / directory traversal / uploading can happen in parallel,
+as long as Directory objects only refer to Directory objects and Blobs that
+have already been uploaded.
+
+When reaching the root, a `PathInfo` object needs to be constructed.
+
+ * In the case of content-addressed paths (A), the name of the root node is
+   based on the NAR representation of the contents.
+   It might make sense to be able to offload the NAR calculation to the store,
+   which can cache it.
+ * In the case of build artifacts (B), the output path is input-addressed and
+   known upfront.
+
+Contrary to Nix, this has the advantage of not having to upload a lot of things
+to the store that didn't change.
+
+### Reading files from the store from the evaluator
+This is the case when `nixpkgs` is located in the store, or IFD in general.
+
+The store client asks the `PathInfoService` for the `PathInfo` of the output
+path in the request, and looks at the root node.
+
+If something other than the root of the store path is requested, like for
+example `maintainers/maintainer-list.nix`, the root_node Directory is inspected
+and potentially a chain of `Directory` objects requested from
+*DirectoryService* [^n+1query].
+
+When the desired file is reached, the *BlobService* can be used to read the
+contents of this file, and return it to the evaluator.
+
+FUTUREWORK: define how importing from symlinks should/does work.
+
+Contrary to Nix, this has the advantage of not having to copy all of the
+contents of a store path to the evaluating machine, but really only fetching
+the files the evaluator currently cares about.
+
+### Materializing store paths on disk
+This is useful for people running a Tvix-only system, or running builds on a
+"Tvix remote builder" in its own mount namespace.
+
+In a system with Nix installed, we can't simply manually "extract" things to
+`/nix/store`, as Nix assumes it owns all writes to this location.
+In these use cases, we're probably better off exposing a tvix-store as a local
+binary cache (that's what `//tvix/nar-bridge` does).
+
+Assuming we are in an environment where we control `/nix/store` exclusively, a
+"realize to disk" would either "extract" things from the `tvix-store` to a
+filesystem, or expose a `FUSE`/`virtio-fs` filesystem.
+
+The latter is already implemented, and particularly interesting for (remote)
+build workloads, as build inputs can be realized on-demand, which saves copying
+around a lot of never-accessed files.
+
+In both cases, the API interactions are similar.
+ * The *PathInfoService* is asked for the `PathInfo` of the requested store path.
+ * If everything should be "extracted", the *DirectoryService* is asked for all
+   `Directory` objects in the closure, the file structure is created, all Blobs
+   are downloaded and placed in their corresponding location and all symlinks
+   are created accordingly.
+ * If this is a FUSE filesystem, we can decide to only request a subset,
+   similar to the "Reading files from the store from the evaluator" use case,
+   even though it might make sense to keep all Directory objects around.
+   (See the caveat in "Trust model" though!)
+
+### Stores communicating with other stores
+The gRPC API exposed by the tvix-store allows composing multiple stores, and
+implementing some caching strategies that store clients don't need to be aware
+of.
+
+ * For example, a caching strategy could have a fast local tvix-store that's
+   asked first and filled with data from a slower remote tvix-store.
+
+ * Multiple stores could be asked for the same data, and whatever store returns
+   the right data first wins.
+
+
+## Trust model / Distribution
+As already described above, the only non-content-addressed service is the
+`PathInfo` service.
+
+This means all other messages (such as `Blob` and `Directory` messages) can be
+substituted from many different, untrusted sources/mirrors, which makes
+plugging in additional substitution strategies, like IPFS or local network
+neighbors, very simple. That's also why it lives in the `tvix-castore` crate.
+
+As for `PathInfo`, we don't specify an additional signature mechanism yet, but
+carry the NAR-based signatures from Nix along.
+
+This means that if we don't trust a remote `PathInfo` object, we currently need
+to "stream" the NAR representation to validate these signatures.
+
+However, the slow part is downloading NAR files, and considering we have
+more granularity available, we might only need to download some small blobs,
+rather than a whole NAR file.
+
+A future signature mechanism that signs only (parts of) the `PathInfo`
+message, which itself points only to content-addressed data, will enable
+verified partial access into a store path, opening up opportunities for lazy
+filesystem access etc.
+
+
+
+[blake3]: https://github.com/BLAKE3-team/BLAKE3
+[bao]: https://github.com/oconnor663/bao
+[^input-addressed]: Nix hashes the A-Term representation of a .drv, after doing
+                    some replacements on referred Input Derivations to calculate
+                    output paths.
+[^n+1query]: This would expose an N+1 query problem. However it's not a problem
+             in practice, as there's usually a "local" caching store in
+             the loop, and *DirectoryService* supports a recursive lookup for
+             all `Directory` children of a `Directory`.
diff --git a/tvix/store/protos/LICENSE b/tvix/store/protos/LICENSE new file mode 100644 index 000000000000..2034ada6fd9a --- /dev/null +++ b/tvix/store/protos/LICENSE @@ -0,0 +1,21 @@
+Copyright © The Tvix Authors
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+“Software”), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
diff --git a/tvix/store/protos/default.nix b/tvix/store/protos/default.nix new file mode 100644 index 000000000000..56345d9338b2 --- /dev/null +++ b/tvix/store/protos/default.nix @@ -0,0 +1,55 @@
+{ depot, pkgs, ... }:
+let
+  protos = depot.nix.sparseTree {
+    name = "store-protos";
+    root = depot.path.origSrc;
+    paths = [
+      # We need to include castore.proto (only), as it's referenced.
+      ../../castore/protos/castore.proto
+      ./pathinfo.proto
+      ./rpc_pathinfo.proto
+      ../../../buf.yaml
+      ../../../buf.gen.yaml
+    ];
+  };
+in
+depot.nix.readTree.drvTargets {
+  inherit protos;
+
+  # Lints and ensures formatting of the proto files.
+  check = pkgs.stdenv.mkDerivation {
+    name = "proto-check";
+    src = protos;
+
+    nativeBuildInputs = [
+      pkgs.buf
+    ];
+
+    buildPhase = ''
+      export HOME=$TMPDIR
+      buf lint
+      buf format -d --exit-code
+      touch $out
+    '';
+  };
+
+  # Produces the golang bindings.
+  go-bindings = pkgs.stdenv.mkDerivation {
+    name = "go-bindings";
+    src = protos;
+
+    nativeBuildInputs = [
+      pkgs.buf
+      pkgs.protoc-gen-go
+      pkgs.protoc-gen-go-grpc
+    ];
+
+    buildPhase = ''
+      export HOME=$TMPDIR
+      buf generate
+
+      mkdir -p $out
+      cp tvix/store/protos/*.pb.go $out/
+    '';
+  };
+}
diff --git a/tvix/store/protos/pathinfo.proto b/tvix/store/protos/pathinfo.proto new file mode 100644 index 000000000000..b03e7e938e33 --- /dev/null +++ b/tvix/store/protos/pathinfo.proto @@ -0,0 +1,128 @@
+// SPDX-License-Identifier: MIT
+// Copyright © 2022 The Tvix Authors
+syntax = "proto3";
+
+package tvix.store.v1;
+
+import "tvix/castore/protos/castore.proto";
+
+option go_package = "code.tvl.fyi/tvix/store-go;storev1";
+
+// PathInfo shows information about a Nix Store Path.
+// That's a single element inside /nix/store.
+message PathInfo {
+  // The path can be a directory, file or symlink.
+  tvix.castore.v1.Node node = 1;
+
+  // List of references (output path hashes)
+  // This really is the raw *bytes*, after decoding nixbase32, and not a
+  // base32-encoded string.
+  repeated bytes references = 2;
+
+  // see below.
+  NARInfo narinfo = 3;
+}
+
+// Represents a path in the Nix store (a direct child of STORE_DIR).
+// It is commonly formatted by nixbase32-encoding the digest, and
+// concatenating the name, separated by a `-`.
+message StorePath {
+  // The string after digest and `-`.
+  string name = 1;
+
+  // The digest (20 bytes).
+  bytes digest = 2;
+}
+
+// Nix C++ uses NAR (Nix Archive) as a format to transfer store paths,
+// and stores metadata and signatures in NARInfo files.
+// Store all these attributes in a separate message.
+//
+// This is useful to render .narinfo files to clients, or to preserve/validate
+// these signatures.
+// As verifying these signatures requires the whole NAR file to be synthesized,
+// moving to another signature scheme is desired.
+// Even then, it still makes sense to hold this data, for old clients.
+message NARInfo {
+  // This represents a (parsed) signature line in a .narinfo file.
+  message Signature {
+    string name = 1;
+    bytes data = 2;
+  }
+
+  // The size of the NAR file, in bytes.
+  uint64 nar_size = 1;
+
+  // The sha256 of the NAR file representation.
+  bytes nar_sha256 = 2;
+
+  // The signatures in a .narinfo file.
+  repeated Signature signatures = 3;
+
+  // A list of references. To validate .narinfo signatures, a fingerprint needs
+  // to be constructed.
+  // This fingerprint doesn't just contain the hashes of the output paths of all
+  // references (like PathInfo.references), but their whole (base)names, so we
+  // need to keep them somewhere.
+  repeated string reference_names = 4;
+
+  // The StorePath of the .drv file producing this output.
+  // The .drv suffix is omitted in its `name` field.
+  StorePath deriver = 5;
+
+  // The CA field in the .narinfo.
+  // Its textual representations seen in the wild are one of the following:
+  // - `fixed:r:sha256:1gcky5hlf5vqfzpyhihydmm54grhc94mcs8w7xr8613qsqb1v2j6`
+  //   fixed-output derivations using "recursive" `outputHashMode`.
+  // - `fixed:sha256:19xqkh72crbcba7flwxyi3n293vav6d7qkzkh2v4zfyi4iia8vj8`
+  //   fixed-output derivations using "flat" `outputHashMode`.
+  // - `text:sha256:19xqkh72crbcba7flwxyi3n293vav6d7qkzkh2v4zfyi4iia8vj8`
+  //   Text hashing, used for uploaded .drv files and outputs produced by
+  //   builtins.toFile.
+  //
+  // Semantically, they can be split into the following components:
+  // - "content address prefix". Currently, "fixed" and "text" are supported.
+  // - "hash mode". Currently, "flat" and "recursive" are supported.
+  // - "hash type". The underlying hash function used.
+  //   Currently, sha1, md5, sha256, sha512.
+  // - "digest". The digest itself.
+  //
+  // There are some restrictions on the possible combinations.
+  // For example, `text` and `fixed:recursive` always imply sha256.
+  //
+  // We use an enum to encode the possible combinations, and optimize for the
+  // common case, `fixed:recursive`, identified as `NAR_SHA256`.
+  CA ca = 6;
+
+  message CA {
+    enum Hash {
+      // produced when uploading fixed-output store paths using NAR-based
+      // hashing (`outputHashMode = "recursive"`).
+      NAR_SHA256 = 0;
+      NAR_SHA1 = 1;
+      NAR_SHA512 = 2;
+      NAR_MD5 = 3;
+
+      // Produced when uploading .drv files or outputs produced by
+      // builtins.toFile.
+      // Produces equivalent digests as FLAT_SHA256, but is a separate
+      // hashing type in Nix, affecting output path calculation.
+      TEXT_SHA256 = 4;
+
+      // Produced when using fixed-output derivations with
+      // `outputHashMode = "flat"`.
+      FLAT_SHA1 = 5;
+      FLAT_MD5 = 6;
+      FLAT_SHA256 = 7;
+      FLAT_SHA512 = 8;
+
+      // TODO: what happens in Rust if we introduce a new enum kind here?
+    }
+
+    // The hashing type used.
+    Hash type = 1;
+
+    // The digest, in raw bytes.
+    bytes digest = 2;
+  }
+}
diff --git a/tvix/store/protos/rpc_pathinfo.proto b/tvix/store/protos/rpc_pathinfo.proto new file mode 100644 index 000000000000..c1c91658ada2 --- /dev/null +++ b/tvix/store/protos/rpc_pathinfo.proto @@ -0,0 +1,76 @@
+// SPDX-License-Identifier: MIT
+// Copyright © 2022 The Tvix Authors
+syntax = "proto3";
+
+package tvix.store.v1;
+
+import "tvix/castore/protos/castore.proto";
+import "tvix/store/protos/pathinfo.proto";
+
+option go_package = "code.tvl.fyi/tvix/store-go;storev1";
+
+service PathInfoService {
+  // Return a PathInfo message matching the criteria specified in the
+  // GetPathInfoRequest message.
+  rpc Get(GetPathInfoRequest) returns (PathInfo);
+
+  // Upload a PathInfo object to the remote end. It MUST NOT return until the
+  // PathInfo object has been written on the remote end.
+  //
+  // The remote end MAY check if a potential DirectoryNode has already been
+  // uploaded.
+  //
+  // Uploading clients SHOULD obviously not steer other machines to try to
+  // substitute from the remote end before having finished uploading
+  // PathInfo, Directories and Blobs.
+  // The returned PathInfo object MAY contain additional narinfo signatures, but
+  // is otherwise left untouched.
+  rpc Put(PathInfo) returns (PathInfo);
+
+  // Calculate the NAR representation of the contents specified by the
+  // root_node. The calculation SHOULD be cached server-side for subsequent
+  // requests.
+  //
+  // All references (to blobs or Directory messages) MUST already exist in the
+  // store.
+  //
+  // The method can be used to produce a Nix fixed-output path, which contains
+  // the (compressed) sha256 of the NAR content representation in the root_node
+  // name (suffixed with the name).
+  //
+  // It can also be used to calculate arbitrary NAR hashes of output paths, in
+  // case a legacy Nix Binary Cache frontend is provided.
+  rpc CalculateNAR(tvix.castore.v1.Node) returns (CalculateNARResponse);
+
+  // Return a stream of PathInfo messages matching the criteria specified in
+  // ListPathInfoRequest.
+  rpc List(ListPathInfoRequest) returns (stream PathInfo);
+}
+
+// The parameters that can be used to look up a (single) PathInfo object.
+// Currently, only a lookup by output hash is supported.
+message GetPathInfoRequest {
+  oneof by_what {
+    // The output hash of a nix path (20 bytes).
+    // This is the nixbase32-decoded portion of a Nix output path, so to substitute
+    // /nix/store/xm35nga2g20mz5sm5l6n8v3bdm86yj83-cowsay-3.04
+    // this field would contain nixbase32dec("xm35nga2g20mz5sm5l6n8v3bdm86yj83").
+    bytes by_output_hash = 1;
+  }
+}
+
+// The parameters that can be used to look up (multiple) PathInfo objects.
+// Currently no filtering is possible; all objects are returned.
+message ListPathInfoRequest {}
+
+// CalculateNARResponse is the response returned by the CalculateNAR request.
+//
+// It contains the size of the NAR representation (in bytes), and the sha256
+// digest.
+message CalculateNARResponse {
+  // The size of the NAR file, in bytes.
+  uint64 nar_size = 1;
+
+  // The sha256 of the NAR file representation.
+  bytes nar_sha256 = 2;
+}
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs new file mode 100644 index 000000000000..ecee8d78f3b7 --- /dev/null +++ b/tvix/store/src/bin/tvix-store.rs @@ -0,0 +1,401 @@
+use clap::Parser;
+use clap::Subcommand;
+
+use futures::future::try_join_all;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio_listener::Listener;
+use tokio_listener::SystemOptions;
+use tokio_listener::UserOptions;
+use tonic::transport::Server;
+use tracing::info;
+use tracing::Level;
+use tracing_subscriber::fmt::writer::MakeWriterExt;
+use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+
+use tvix_castore::proto::blob_service_server::BlobServiceServer;
+use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
+use tvix_castore::proto::GRPCBlobServiceWrapper;
+use tvix_castore::proto::GRPCDirectoryServiceWrapper;
+use tvix_store::pathinfoservice::PathInfoService;
+use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
+use tvix_store::proto::GRPCPathInfoServiceWrapper;
+
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+use tvix_store::pathinfoservice::make_fs;
+
+#[cfg(feature = "fuse")]
+use tvix_castore::fs::fuse::FuseDaemon;
+
+#[cfg(feature = "otlp")]
+use opentelemetry::KeyValue;
+#[cfg(feature = "otlp")]
+use opentelemetry_sdk::{
+    resource::{ResourceDetector, SdkProvidedResourceDetector},
+    trace::BatchConfig,
+    Resource,
+};
+
+#[cfg(feature = "virtiofs")]
+use tvix_castore::fs::virtiofs::start_virtiofs_daemon;
+
+#[cfg(feature = "tonic-reflection")]
+use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET;
+#[cfg(feature = "tonic-reflection")]
+use tvix_store::proto::FILE_DESCRIPTOR_SET;
+
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Cli {
+    /// Whether to log in JSON
+    #[arg(long)]
+    json: bool,
+
+    #[arg(long)]
+    log_level: Option<Level>,
+
+    #[command(subcommand)]
+    command: Commands,
+}
+
+#[derive(Subcommand)]
+enum Commands {
+    /// Runs the tvix-store daemon.
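+    ///
+    /// Example invocation (a sketch, using the default listen address
+    /// configured below): `tvix-store daemon --listen-address '[::]:8000'`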
+ Daemon { + #[arg(long, short = 'l')] + listen_address: Option<String>, + + #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")] + blob_service_addr: String, + + #[arg( + long, + env, + default_value = "sled:///var/lib/tvix-store/directories.sled" + )] + directory_service_addr: String, + + #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")] + path_info_service_addr: String, + }, + /// Imports a list of paths into the store, print the store path for each of them. + Import { + #[clap(value_name = "PATH")] + paths: Vec<PathBuf>, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + }, + /// Mounts a tvix-store at the given mountpoint + #[cfg(feature = "fuse")] + Mount { + #[clap(value_name = "PATH")] + dest: PathBuf, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// Number of FUSE threads to spawn. + #[arg(long, env, default_value_t = default_threads())] + threads: usize, + + /// Whether to list elements at the root of the mount point. + /// This is useful if your PathInfoService doesn't provide an + /// (exhaustive) listing. + #[clap(long, short, action)] + list_root: bool, + }, + /// Starts a tvix-store virtiofs daemon at the given socket path. + #[cfg(feature = "virtiofs")] + #[command(name = "virtiofs")] + VirtioFs { + #[clap(value_name = "PATH")] + socket: PathBuf, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// Whether to list elements at the root of the mount point. + /// This is useful if your PathInfoService doesn't provide an + /// (exhaustive) listing. + #[clap(long, short, action)] + list_root: bool, + }, +} + +#[cfg(all(feature = "fuse", not(target_os = "macos")))] +fn default_threads() -> usize { + std::thread::available_parallelism() + .map(|threads| threads.into()) + .unwrap_or(4) +} +// On MacFUSE only a single channel will receive ENODEV when the file system is +// unmounted and so all the other channels will block forever. +// See https://github.com/osxfuse/osxfuse/issues/974 +#[cfg(all(feature = "fuse", target_os = "macos"))] +fn default_threads() -> usize { + 1 +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + let cli = Cli::parse(); + + // configure log settings + let level = cli.log_level.unwrap_or(Level::INFO); + + let subscriber = tracing_subscriber::registry() + .with( + cli.json.then_some( + tracing_subscriber::fmt::Layer::new() + .with_writer(std::io::stderr.with_max_level(level)) + .json(), + ), + ) + .with( + (!cli.json).then_some( + tracing_subscriber::fmt::Layer::new() + .with_writer(std::io::stderr.with_max_level(level)) + .pretty(), + ), + ); + + // Add the otlp layer (when otlp is enabled), then init the registry. + // Or if the feature is disabled, just init without adding the layer. 
+ // It's necessary to do this separately, as every with() call chains the + // layer into the type of the registry. + #[cfg(feature = "otlp")] + { + subscriber + .with({ + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(opentelemetry_otlp::new_exporter().tonic()) + .with_batch_config(BatchConfig::default()) + .with_trace_config(opentelemetry_sdk::trace::config().with_resource({ + // use SdkProvidedResourceDetector.detect to detect resources, + // but replace the default service name with our default. + // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 + let resources = + SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); + + // SdkProvidedResourceDetector currently always sets + // `service.name`, but we don't like its default. + if resources.get("service.name".into()).unwrap() == "unknown_service".into() + { + resources.merge(&Resource::new([KeyValue::new( + "service.name", + "tvix.store", + )])) + } else { + resources + } + })) + .install_batch(opentelemetry_sdk::runtime::Tokio)?; + + // Create a tracing layer with the configured tracer + tracing_opentelemetry::layer().with_tracer(tracer) + }) + .try_init()?; + } + + // Init the registry (when otlp is not enabled) + #[cfg(not(feature = "otlp"))] + { + subscriber.try_init()?; + } + + match cli.command { + Commands::Daemon { + listen_address, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + } => { + // initialize stores + let (blob_service, directory_service, path_info_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + let listen_address = listen_address + .unwrap_or_else(|| "[::]:8000".to_string()) + .parse() + .unwrap(); + + let mut server = Server::builder(); + + #[allow(unused_mut)] + let mut router = server + .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( + blob_service, + ))) + .add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::new(directory_service), + )) + .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( + Arc::from(path_info_service), + ))); + + #[cfg(feature = "tonic-reflection")] + { + let reflection_svc = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build()?; + router = router.add_service(reflection_svc); + } + + info!(listen_address=%listen_address, "starting daemon"); + + let listener = Listener::bind( + &listen_address, + &SystemOptions::default(), + &UserOptions::default(), + ) + .await?; + + router.serve_with_incoming(listener).await?; + } + Commands::Import { + paths, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + } => { + // FUTUREWORK: allow flat for single files? + let (blob_service, directory_service, path_info_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + // Arc the PathInfoService, as we clone it . 
+ let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + + let tasks = paths + .into_iter() + .map(|path| { + tokio::task::spawn({ + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + let path_info_service = path_info_service.clone(); + + async move { + if let Ok(name) = tvix_store::import::path_to_name(&path) { + let resp = tvix_store::import::import_path_as_nar_ca( + &path, + name, + blob_service, + directory_service, + path_info_service, + ) + .await; + if let Ok(output_path) = resp { + // If the import was successful, print the path to stdout. + println!("{}", output_path.to_absolute_path()); + } + } + } + }) + }) + .collect::<Vec<_>>(); + + try_join_all(tasks).await?; + } + #[cfg(feature = "fuse")] + Commands::Mount { + dest, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + list_root, + threads, + } => { + let (blob_service, directory_service, path_info_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + let mut fuse_daemon = tokio::task::spawn_blocking(move || { + let fs = make_fs( + blob_service, + directory_service, + Arc::from(path_info_service), + list_root, + ); + info!(mount_path=?dest, "mounting"); + + FuseDaemon::new(fs, &dest, threads) + }) + .await??; + + // grab a handle to unmount the file system, and register a signal + // handler. + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + info!("interrupt received, unmounting…"); + tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??; + info!("unmount occured, terminating…"); + Ok::<_, std::io::Error>(()) + }) + .await??; + } + #[cfg(feature = "virtiofs")] + Commands::VirtioFs { + socket, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + list_root, + } => { + let (blob_service, directory_service, path_info_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + tokio::task::spawn_blocking(move || { + let fs = make_fs( + blob_service, + directory_service, + Arc::from(path_info_service), + list_root, + ); + info!(socket_path=?socket, "starting virtiofs-daemon"); + + start_virtiofs_daemon(fs, socket) + }) + .await??; + } + }; + Ok(()) +} diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs new file mode 100644 index 000000000000..588a0108931e --- /dev/null +++ b/tvix/store/src/import.rs @@ -0,0 +1,156 @@ +use std::path::Path; + +use data_encoding::BASE64; +use tracing::{debug, instrument}; +use tvix_castore::{ + blobservice::BlobService, directoryservice::DirectoryService, proto::node::Node, +}; + +use nix_compat::store_path::{self, StorePath}; + +use crate::{ + pathinfoservice::PathInfoService, + proto::{nar_info, NarInfo, PathInfo}, +}; + +pub fn log_node(node: &Node, path: &Path) { + match node { + Node::Directory(directory_node) => { + debug!( + path = ?path, + name = ?directory_node.name, + digest = BASE64.encode(&directory_node.digest), + "import successful", + ) + } + Node::File(file_node) => { + debug!( + path = ?path, + name = ?file_node.name, + digest = BASE64.encode(&file_node.digest), + "import successful" + ) + } + Node::Symlink(symlink_node) => { + debug!( + path = ?path, + name = ?symlink_node.name, + target = ?symlink_node.target, + "import successful" + ) + } + } +} + +/// Transform a path into its base name and returns an [`std::io::Error`] if it is `..` or if the +/// 
basename is not valid unicode.
+#[inline]
+pub fn path_to_name(path: &Path) -> std::io::Result<&str> {
+    path.file_name()
+        .and_then(|file_name| file_name.to_str())
+        .ok_or_else(|| {
+            std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                "path must not be .. and the basename valid unicode",
+            )
+        })
+}
+
+/// Takes the NAR size, SHA-256 of the NAR representation and the root node.
+/// Returns the path information object for a content-addressed NAR-style (recursive) object.
+///
+/// This [`PathInfo`] can be further filled for signatures, deriver or verified for the expected
+/// hashes.
+#[inline]
+pub fn derive_nar_ca_path_info(nar_size: u64, nar_sha256: [u8; 32], root_node: Node) -> PathInfo {
+    // assemble the [crate::proto::PathInfo] object.
+    PathInfo {
+        node: Some(tvix_castore::proto::Node {
+            node: Some(root_node),
+        }),
+        // There's no reference scanning on path contents ingested like this.
+        references: vec![],
+        narinfo: Some(NarInfo {
+            nar_size,
+            nar_sha256: nar_sha256.to_vec().into(),
+            signatures: vec![],
+            reference_names: vec![],
+            deriver: None,
+            ca: Some(nar_info::Ca {
+                r#type: nar_info::ca::Hash::NarSha256.into(),
+                digest: nar_sha256.to_vec().into(),
+            }),
+        }),
+    }
+}
+
+/// Ingest the given path [`path`] and register the resulting output path in the
+/// [`PathInfoService`] as a recursive fixed output NAR.
+#[instrument(skip_all, fields(store_name=name, path=?path), err)]
+pub async fn import_path_as_nar_ca<BS, DS, PS, P>(
+    path: P,
+    name: &str,
+    blob_service: BS,
+    directory_service: DS,
+    path_info_service: PS,
+) -> Result<StorePath, std::io::Error>
+where
+    P: AsRef<Path> + std::fmt::Debug,
+    BS: AsRef<dyn BlobService> + Clone,
+    DS: AsRef<dyn DirectoryService>,
+    PS: AsRef<dyn PathInfoService>,
+{
+    let root_node =
+        tvix_castore::import::ingest_path(blob_service, directory_service, &path).await?;
+
+    // Ask the PathInfoService for the NAR size and sha256
+    let (nar_size, nar_sha256) = path_info_service.as_ref().calculate_nar(&root_node).await?;
+
+    // Calculate the output path. This might still fail, as some names are illegal.
+    // FUTUREWORK: express the `name` at the type level to be valid and move the conversion
+    // to the caller.
+    let output_path = store_path::build_nar_based_store_path(&nar_sha256, name).map_err(|_| {
+        std::io::Error::new(
+            std::io::ErrorKind::InvalidData,
+            format!("invalid name: {}", name),
+        )
+    })?;
+
+    // assemble a new root_node with a name that is derived from the nar hash.
+    let root_node = root_node.rename(output_path.to_string().into_bytes().into());
+    log_node(&root_node, path.as_ref());
+
+    let path_info = derive_nar_ca_path_info(nar_size, nar_sha256, root_node);
+
+    // This new [`PathInfo`] that we get back from there might contain additional signatures or
+    // information set by the service itself. In this function, we silently swallow it because
+    // callers don't really need it.
+ let _path_info = path_info_service.as_ref().put(path_info).await?; + + Ok(output_path.to_owned()) +} + +#[cfg(test)] +mod tests { + use std::{ffi::OsStr, path::PathBuf}; + + use crate::import::path_to_name; + use test_case::test_case; + + #[test_case("a/b/c", "c"; "simple path")] + #[test_case("a/b/../c", "c"; "simple path containing ..")] + #[test_case("a/b/../c/d/../e", "e"; "path containing multiple ..")] + + fn test_path_to_name(path: &str, expected_name: &str) { + let path: PathBuf = path.into(); + assert_eq!(path_to_name(&path).expect("must succeed"), expected_name); + } + + #[test_case(b"a/b/.."; "path ending in ..")] + #[test_case(b"\xf8\xa1\xa1\xa1\xa1"; "non unicode path")] + + fn test_invalid_path_to_name(invalid_path: &[u8]) { + let path: PathBuf = unsafe { OsStr::from_encoded_bytes_unchecked(invalid_path) }.into(); + path_to_name(&path).expect_err("must fail"); + } +} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs new file mode 100644 index 000000000000..2fa86ff6a468 --- /dev/null +++ b/tvix/store/src/lib.rs @@ -0,0 +1,8 @@ +pub mod import; +pub mod nar; +pub mod pathinfoservice; +pub mod proto; +pub mod utils; + +#[cfg(test)] +mod tests; diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs new file mode 100644 index 000000000000..20b922a61f2e --- /dev/null +++ b/tvix/store/src/nar/import.rs @@ -0,0 +1,351 @@ +use bytes::Bytes; +use nix_compat::nar; +use std::io::{self, BufRead}; +use tokio_util::io::SyncIoBridge; +use tracing::warn; +use tvix_castore::{ + blobservice::BlobService, + directoryservice::{DirectoryPutter, DirectoryService}, + proto::{self as castorepb}, + B3Digest, +}; + +/// Accepts a reader providing a NAR. +/// Will traverse it, uploading blobs to the given [BlobService], and +/// directories to the given [DirectoryService]. +/// On success, the root node is returned. +/// This function is not async (because the NAR reader is not) +/// and calls [tokio::task::block_in_place] when interacting with backing +/// services, so make sure to only call this with spawn_blocking. +pub fn read_nar<R, BS, DS>( + r: &mut R, + blob_service: BS, + directory_service: DS, +) -> io::Result<castorepb::node::Node> +where + R: BufRead + Send, + BS: AsRef<dyn BlobService>, + DS: AsRef<dyn DirectoryService>, +{ + let handle = tokio::runtime::Handle::current(); + + let directory_putter = directory_service.as_ref().put_multiple_start(); + + let node = nix_compat::nar::reader::open(r)?; + let (root_node, mut directory_putter, _) = process_node( + handle.clone(), + "".into(), // this is the root node, it has an empty name + node, + &blob_service, + directory_putter, + )?; + + // In case the root node points to a directory, we need to close + // [directory_putter], and ensure the digest we got back from there matches + // what the root node is pointing to. + if let castorepb::node::Node::Directory(ref directory_node) = root_node { + // Close directory_putter to make sure all directories have been inserted. 
+        let directory_putter_digest =
+            handle.block_on(handle.spawn(async move { directory_putter.close().await }))??;
+        let root_directory_node_digest: B3Digest =
+            directory_node.digest.clone().try_into().unwrap();
+
+        if directory_putter_digest != root_directory_node_digest {
+            warn!(
+                root_directory_node_digest = %root_directory_node_digest,
+                directory_putter_digest = %directory_putter_digest,
+                "directory digest mismatch",
+            );
+            return Err(io::Error::new(
+                io::ErrorKind::Other,
+                "directory digest mismatch",
+            ));
+        }
+    }
+    // In case it's not a Directory, [directory_putter] doesn't need to be
+    // closed (as we didn't end up uploading anything).
+    // It can just be dropped, as documented in its trait.
+
+    Ok(root_node)
+}
+
+/// This is called on a [nar::reader::Node] and returns a [castorepb::node::Node].
+/// It does so by handling all three kinds, and recursing for directories.
+///
+/// [DirectoryPutter] is passed around, so a single instance of it can be used,
+/// which is sufficient, as this reads through the whole NAR linearly.
+fn process_node<BS>(
+    handle: tokio::runtime::Handle,
+    name: bytes::Bytes,
+    node: nar::reader::Node,
+    blob_service: BS,
+    directory_putter: Box<dyn DirectoryPutter>,
+) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>, BS)>
+where
+    BS: AsRef<dyn BlobService>,
+{
+    Ok(match node {
+        nar::reader::Node::Symlink { target } => (
+            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+                name,
+                target: target.into(),
+            }),
+            directory_putter,
+            blob_service,
+        ),
+        nar::reader::Node::File { executable, reader } => (
+            castorepb::node::Node::File(process_file_reader(
+                handle,
+                name,
+                reader,
+                executable,
+                &blob_service,
+            )?),
+            directory_putter,
+            blob_service,
+        ),
+        nar::reader::Node::Directory(dir_reader) => {
+            let (directory_node, directory_putter, blob_service_back) =
+                process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?;
+
+            (
+                castorepb::node::Node::Directory(directory_node),
+                directory_putter,
+                blob_service_back,
+            )
+        }
+    })
+}
+
+/// Given a name and [nar::reader::FileReader], this ingests the file into the
+/// passed [BlobService] and returns a [castorepb::FileNode].
+fn process_file_reader<BS>(
+    handle: tokio::runtime::Handle,
+    name: Bytes,
+    mut file_reader: nar::reader::FileReader,
+    executable: bool,
+    blob_service: BS,
+) -> io::Result<castorepb::FileNode>
+where
+    BS: AsRef<dyn BlobService>,
+{
+    // store the length. If we read any other length, reading will fail.
+    let expected_len = file_reader.len();
+
+    // prepare writing a new blob.
+    let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await });
+
+    // write the blob.
+    let mut blob_writer = {
+        let mut dst = SyncIoBridge::new(blob_writer);
+
+        file_reader.copy(&mut dst)?;
+        dst.shutdown()?;
+
+        // return back the blob_writer
+        dst.into_inner()
+    };
+
+    // close the blob_writer, retrieve the digest.
+    let blob_digest = handle.block_on(async { blob_writer.close().await })?;
+
+    Ok(castorepb::FileNode {
+        name,
+        digest: blob_digest.into(),
+        size: expected_len,
+        executable,
+    })
+}
+
+/// Given a name and [nar::reader::DirReader], this returns a [castorepb::DirectoryNode].
+/// It uses [process_node] to iterate over all children.
+///
+/// [DirectoryPutter] is passed around, so a single instance of it can be used,
+/// which is sufficient, as this reads through the whole NAR linearly.
+fn process_dir_reader<BS>( + handle: tokio::runtime::Handle, + name: Bytes, + mut dir_reader: nar::reader::DirReader, + blob_service: BS, + directory_putter: Box<dyn DirectoryPutter>, +) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>, BS)> +where + BS: AsRef<dyn BlobService>, +{ + let mut directory = castorepb::Directory::default(); + + let mut directory_putter = directory_putter; + let mut blob_service = blob_service; + while let Some(entry) = dir_reader.next()? { + let (node, directory_putter_back, blob_service_back) = process_node( + handle.clone(), + entry.name.into(), + entry.node, + blob_service, + directory_putter, + )?; + + blob_service = blob_service_back; + directory_putter = directory_putter_back; + + match node { + castorepb::node::Node::Directory(node) => directory.directories.push(node), + castorepb::node::Node::File(node) => directory.files.push(node), + castorepb::node::Node::Symlink(node) => directory.symlinks.push(node), + } + } + + // calculate digest and size. + let directory_digest = directory.digest(); + let directory_size = directory.size(); + + // upload the directory. This is a bit more verbose, as we want to get back + // directory_putter for later reuse. + let directory_putter = handle.block_on(handle.spawn(async move { + directory_putter.put(directory).await?; + Ok::<_, io::Error>(directory_putter) + }))??; + + Ok(( + castorepb::DirectoryNode { + name, + digest: directory_digest.into(), + size: directory_size, + }, + directory_putter, + blob_service, + )) +} + +#[cfg(test)] +mod test { + use crate::nar::read_nar; + use std::io::Cursor; + use std::sync::Arc; + + use tokio_stream::StreamExt; + use tvix_castore::blobservice::BlobService; + use tvix_castore::directoryservice::DirectoryService; + use tvix_castore::fixtures::{ + DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS, + HELLOWORLD_BLOB_DIGEST, + }; + use tvix_castore::proto as castorepb; + use tvix_castore::utils::{gen_blob_service, gen_directory_service}; + + use crate::tests::fixtures::{ + NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD, NAR_CONTENTS_SYMLINK, + }; + + #[tokio::test] + async fn single_symlink() { + let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); + + let handle = tokio::runtime::Handle::current(); + + let root_node = handle + .spawn_blocking(|| { + read_nar( + &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), + blob_service, + directory_service, + ) + }) + .await + .unwrap() + .expect("must parse"); + + assert_eq!( + castorepb::node::Node::Symlink(castorepb::SymlinkNode { + name: "".into(), // name must be empty + target: "/nix/store/somewhereelse".into(), + }), + root_node + ); + } + + #[tokio::test] + async fn single_file() { + let blob_service: Arc<dyn BlobService> = gen_blob_service().into(); + let directory_service = gen_directory_service(); + + let handle = tokio::runtime::Handle::current(); + + let root_node = handle + .spawn_blocking({ + let blob_service = blob_service.clone(); + move || { + read_nar( + &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), + blob_service, + directory_service, + ) + } + }) + .await + .unwrap() + .expect("must parse"); + + assert_eq!( + castorepb::node::Node::File(castorepb::FileNode { + name: "".into(), // name must be empty + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: HELLOWORLD_BLOB_CONTENTS.len() as u64, + executable: false, + }), + root_node + ); + + // blobservice must contain the blob + 
assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn complicated() {
+        let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
+        let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into();
+
+        let handle = tokio::runtime::Handle::current();
+
+        let root_node = handle
+            .spawn_blocking({
+                let blob_service = blob_service.clone();
+                let directory_service = directory_service.clone();
+                move || {
+                    read_nar(
+                        &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
+                        blob_service,
+                        directory_service,
+                    )
+                }
+            })
+            .await
+            .unwrap()
+            .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::Directory(castorepb::DirectoryNode {
+                name: "".into(), // name must be empty
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            }),
+            root_node,
+        );
+
+        // blobservice must contain the blob
+        assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
+
+        // directoryservice must contain the directories, at least with get_recursive.
+        let resp: Result<Vec<castorepb::Directory>, _> = directory_service
+            .get_recursive(&DIRECTORY_COMPLICATED.digest())
+            .collect()
+            .await;
+
+        let directories = resp.unwrap();
+
+        assert_eq!(2, directories.len());
+        assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
+        assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
+    }
+}
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs new file mode 100644 index 000000000000..c1a7fc2a933f --- /dev/null +++ b/tvix/store/src/nar/mod.rs @@ -0,0 +1,27 @@
+use data_encoding::BASE64;
+use tvix_castore::B3Digest;
+
+mod import;
+mod renderer;
+pub use import::read_nar;
+pub use renderer::calculate_size_and_sha256;
+pub use renderer::write_nar;
+
+/// Errors that can be encountered while rendering NARs.
+#[derive(Debug, thiserror::Error)]
+pub enum RenderError {
+    #[error("failure talking to a backing store client: {0}")]
+    StoreError(#[source] std::io::Error),
+
+    #[error("unable to find directory {}, referred from {:?}", .0, .1)]
+    DirectoryNotFound(B3Digest, bytes::Bytes),
+
+    #[error("unable to find blob {}, referred from {:?}", BASE64.encode(.0), .1)]
+    BlobNotFound([u8; 32], bytes::Bytes),
+
+    #[error("unexpected size in metadata for blob {}, referred from {:?} returned, expected {}, got {}", BASE64.encode(.0), .1, .2, .3)]
+    UnexpectedBlobMeta([u8; 32], bytes::Bytes, u32, u32),
+
+    #[error("failure using the NAR writer: {0}")]
+    NARWriterError(std::io::Error),
+}
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs new file mode 100644 index 000000000000..313397dcf31b --- /dev/null +++ b/tvix/store/src/nar/renderer.rs @@ -0,0 +1,216 @@
+use super::RenderError;
+use async_recursion::async_recursion;
+use count_write::CountWrite;
+use nix_compat::nar::writer::r#async as nar_writer;
+use sha2::{Digest, Sha256};
+use std::{
+    pin::Pin,
+    task::{self, Poll},
+};
+use tokio::io::{self, AsyncWrite, BufReader};
+use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
+use tvix_castore::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    proto::{self as castorepb, NamedNode},
+};
+
+/// Invoke [write_nar], and return the size and sha256 digest of the produced
+/// NAR output.
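+///
+/// Intended usage is along these lines (a sketch; the service handles are
+/// whatever `AsRef<dyn BlobService>` / `AsRef<dyn DirectoryService>` values
+/// the caller already holds):
+///
+/// ```ignore
+/// let (nar_size, nar_sha256) =
+///     calculate_size_and_sha256(&root_node, blob_service, directory_service).await?;
+/// ```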
+pub async fn calculate_size_and_sha256<BS, DS>(
+ root_node: &castorepb::node::Node,
+ blob_service: BS,
+ directory_service: DS,
+) -> Result<(u64, [u8; 32]), RenderError>
+where
+ BS: AsRef<dyn BlobService> + Send,
+ DS: AsRef<dyn DirectoryService> + Send,
+{
+ let mut h = Sha256::new();
+ let mut cw = CountWrite::from(&mut h);
+
+ write_nar(
+ // The hasher doesn't speak async. It doesn't
+ // actually do any I/O, so it's fine to wrap.
+ AsyncIoBridge(&mut cw),
+ root_node,
+ blob_service,
+ directory_service,
+ )
+ .await?;
+
+ Ok((cw.count(), h.finalize().into()))
+}
+
+/// The inverse of [tokio_util::io::SyncIoBridge].
+/// Don't use this with anything that actually does blocking I/O.
+struct AsyncIoBridge<T>(T);
+
+impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _cx: &mut task::Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(self.get_mut().0.write(buf))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(self.get_mut().0.flush())
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ _cx: &mut task::Context<'_>,
+ ) -> Poll<Result<(), io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
+/// and uses the passed blob_service and directory_service to perform the
+/// necessary lookups as it traverses the structure.
+/// The contents in NAR serialization are written to the passed [AsyncWrite].
+pub async fn write_nar<W, BS, DS>(
+ w: W,
+ proto_root_node: &castorepb::node::Node,
+ blob_service: BS,
+ directory_service: DS,
+) -> Result<(), RenderError>
+where
+ W: AsyncWrite + Unpin + Send,
+ BS: AsRef<dyn BlobService> + Send,
+ DS: AsRef<dyn DirectoryService> + Send,
+{
+ // Initialize NAR writer
+ let mut w = w.compat_write();
+ let nar_root_node = nar_writer::open(&mut w)
+ .await
+ .map_err(RenderError::NARWriterError)?;
+
+ walk_node(
+ nar_root_node,
+ proto_root_node,
+ blob_service,
+ directory_service,
+ )
+ .await?;
+
+ Ok(())
+}
+
+/// Process an intermediate node in the structure.
+/// This consumes the node.
+#[async_recursion]
+async fn walk_node<BS, DS>(
+ nar_node: nar_writer::Node<'async_recursion, '_>,
+ proto_node: &castorepb::node::Node,
+ blob_service: BS,
+ directory_service: DS,
+) -> Result<(BS, DS), RenderError>
+where
+ BS: AsRef<dyn BlobService> + Send,
+ DS: AsRef<dyn DirectoryService> + Send,
+{
+ match proto_node {
+ castorepb::node::Node::Symlink(proto_symlink_node) => {
+ nar_node
+ .symlink(&proto_symlink_node.target)
+ .await
+ .map_err(RenderError::NARWriterError)?;
+ }
+ castorepb::node::Node::File(proto_file_node) => {
+ let digest_len = proto_file_node.digest.len();
+ let digest = proto_file_node.digest.clone().try_into().map_err(|_| {
+ RenderError::StoreError(io::Error::new(
+ io::ErrorKind::Other,
+ format!("invalid digest len {} in file node", digest_len),
+ ))
+ })?;
+
+ let blob_reader = match blob_service
+ .as_ref()
+ .open_read(&digest)
+ .await
+ .map_err(RenderError::StoreError)?
+ {
+ Some(blob_reader) => Ok(BufReader::new(blob_reader)),
+ None => Err(RenderError::NARWriterError(io::Error::new(
+ io::ErrorKind::NotFound,
+ format!("blob with digest {} not found", &digest),
+ ))),
+ }?;
+
+ nar_node
+ .file(
+ proto_file_node.executable,
+ proto_file_node.size,
+ &mut blob_reader.compat(),
+ )
+ .await
+ .map_err(RenderError::NARWriterError)?;
+ }
+ castorepb::node::Node::Directory(proto_directory_node) => {
+ let digest_len = proto_directory_node.digest.len();
+ let digest = proto_directory_node
+ .digest
+ .clone()
+ .try_into()
+ .map_err(|_| {
+ RenderError::StoreError(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("invalid digest len {} in directory node", digest_len),
+ ))
+ })?;
+
+ // look it up with the directory service
+ match directory_service
+ .as_ref()
+ .get(&digest)
+ .await
+ .map_err(|e| RenderError::StoreError(e.into()))?
+ {
+ // if it's None, that's an error!
+ None => Err(RenderError::DirectoryNotFound(
+ digest,
+ proto_directory_node.name.clone(),
+ ))?,
+ Some(proto_directory) => {
+ // start a directory node
+ let mut nar_node_directory = nar_node
+ .directory()
+ .await
+ .map_err(RenderError::NARWriterError)?;
+
+ // We put blob_service, directory_service back here whenever we come up from
+ // the recursion.
+ let mut blob_service = blob_service;
+ let mut directory_service = directory_service;
+
+ // for each node in the directory, create a new entry with its name,
+ // and then recurse on that entry.
+ for proto_node in proto_directory.nodes() {
+ let child_node = nar_node_directory
+ .entry(proto_node.get_name())
+ .await
+ .map_err(RenderError::NARWriterError)?;
+
+ (blob_service, directory_service) =
+ walk_node(child_node, &proto_node, blob_service, directory_service)
+ .await?;
+ }
+
+ // close the directory
+ nar_node_directory
+ .close()
+ .await
+ .map_err(RenderError::NARWriterError)?;
+
+ return Ok((blob_service, directory_service));
+ }
+ }
+ }
+ }
+
+ Ok((blob_service, directory_service))
+}
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
new file mode 100644
index 000000000000..5113b19c1535
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -0,0 +1,187 @@
+use crate::proto::path_info_service_client::PathInfoServiceClient;
+
+use super::{
+ GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService,
+ SledPathInfoService,
+};
+
+use nix_compat::narinfo;
+use std::sync::Arc;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
+use url::Url;
+
+/// Constructs a new instance of a [PathInfoService] from a URI.
+///
+/// The following URIs are supported:
+/// - `memory:`
+/// Uses an in-memory implementation.
+/// - `sled:`
+/// Uses an in-memory sled implementation.
+/// - `sled:///absolute/path/to/somewhere`
+/// Uses sled, with a path on disk for persistence. Can only be opened by
+/// one process at a time.
+/// - `nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=`
+/// Exposes the Nix binary cache as a PathInfoService, ingesting NARs into the
+/// {Blob,Directory}Service. You almost certainly want to use this with some cache.
+/// The `trusted-public-keys` URL parameter can be provided, which will then
+/// enable signature verification.
+/// - `grpc+unix:///absolute/path/to/somewhere`
+/// Connects to a local tvix-store gRPC service via Unix socket.
+/// - `grpc+http://host:port`, `grpc+https://host:port` +/// Connects to a (remote) tvix-store gRPC service. +/// +/// As the [PathInfoService] needs to talk to [BlobService] and [DirectoryService], +/// these also need to be passed in. +pub async fn from_addr( + uri: &str, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +) -> Result<Box<dyn PathInfoService>, Error> { + let url = + Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; + + Ok(if url.scheme() == "memory" { + // memory doesn't support host or path in the URL. + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string())); + } + Box::new(MemoryPathInfoService::new(blob_service, directory_service)) + } else if url.scheme() == "sled" { + // sled doesn't support host, and a path can be provided (otherwise + // it'll live in memory only). + if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string())); + } + + if url.path() == "/" { + return Err(Error::StorageError( + "cowardly refusing to open / with sled".to_string(), + )); + } + + // TODO: expose other parameters as URL parameters? + + if url.path().is_empty() { + return Ok(Box::new( + SledPathInfoService::new_temporary(blob_service, directory_service) + .map_err(|e| Error::StorageError(e.to_string()))?, + )); + } + return Ok(Box::new( + SledPathInfoService::new(url.path(), blob_service, directory_service) + .map_err(|e| Error::StorageError(e.to_string()))?, + )); + } else if url.scheme() == "nix+http" || url.scheme() == "nix+https" { + // Stringify the URL and remove the nix+ prefix. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap(); + + let mut nix_http_path_info_service = + NixHTTPPathInfoService::new(new_url, blob_service, directory_service); + + let pairs = &url.query_pairs(); + for (k, v) in pairs.into_iter() { + if k == "trusted-public-keys" { + let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect(); + + let mut pubkeys: Vec<narinfo::PubKey> = Vec::with_capacity(pubkey_strs.len()); + for pubkey_str in pubkey_strs { + pubkeys + .push(narinfo::PubKey::parse(pubkey_str).map_err(|e| { + Error::StorageError(format!("invalid public key: {e}")) + })?); + } + + nix_http_path_info_service.set_public_keys(pubkeys); + } + } + + Box::new(nix_http_path_info_service) + } else if url.scheme().starts_with("grpc+") { + // schemes starting with grpc+ go to the GRPCPathInfoService. + // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + let client = PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?); + Box::new(GRPCPathInfoService::from_client(client)) + } else { + Err(Error::StorageError(format!( + "unknown scheme: {}", + url.scheme() + )))? + }) +} + +#[cfg(test)] +mod tests { + use super::from_addr; + use lazy_static::lazy_static; + use tempfile::TempDir; + use test_case::test_case; + use tvix_castore::utils::{gen_blob_service, gen_directory_service}; + + lazy_static! 
{
+ static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
+ static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap();
+ }
+
+ // the gRPC tests below don't fail, because we connect lazily.
+
+ /// This uses an unsupported scheme.
+ #[test_case("http://foo.example/test", false; "unsupported scheme")]
+ /// This configures sled in temporary mode.
+ #[test_case("sled://", true; "sled valid temporary")]
+ /// This configures sled with /, which should fail.
+ #[test_case("sled:///", false; "sled invalid root")]
+ /// This configures sled with a host, not path, which should fail.
+ #[test_case("sled://foo.example", false; "sled invalid host")]
+ /// This configures sled with a valid path, which should succeed.
+ #[test_case(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true; "sled valid path")]
+ /// This configures sled with a host, and a valid path, which should fail.
+ #[test_case(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false; "sled invalid host with valid path")]
+ /// This correctly sets the scheme, and doesn't set a path.
+ #[test_case("memory://", true; "memory valid")]
+ /// This sets a memory url host to `foo`, which is invalid.
+ #[test_case("memory://foo", false; "memory invalid host")]
+ /// This sets a memory url path to "/", which is invalid.
+ #[test_case("memory:///", false; "memory invalid root path")]
+ /// This sets a memory url path to "/foo", which is invalid.
+ #[test_case("memory:///foo", false; "memory invalid root path foo")]
+ /// Correct scheme for the cache.nixos.org binary cache.
+ #[test_case("nix+https://cache.nixos.org", true; "correct nix+https")]
+ /// Correct scheme for the cache.nixos.org binary cache (HTTP URL).
+ #[test_case("nix+http://cache.nixos.org", true; "correct nix+http")]
+ /// Correct scheme for a Nix HTTP binary cache, with a subpath.
+ #[test_case("nix+http://192.0.2.1/foo", true; "correct nix http with subpath")]
+ /// Correct scheme for a Nix HTTP binary cache, with a subpath and port.
+ #[test_case("nix+http://[::1]:8080/foo", true; "correct nix http with subpath and port")]
+ /// Correct scheme for the cache.nixos.org binary cache, and a correct trusted public key set.
+ #[test_case("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=", true; "correct nix+https with trusted-public-key")]
+ /// Correct scheme for the cache.nixos.org binary cache, and two correct trusted public keys set.
+ #[test_case("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=%20foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=", true; "correct nix+https with two trusted-public-key")]
+ /// Correct scheme to connect to a unix socket.
+ #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")]
+ /// Correct scheme for a unix socket, but setting a host too, which is invalid.
+ #[test_case("grpc+unix://host.example/path/to/somewhere", false; "grpc invalid unix socket and host")]
+ /// Correct scheme to connect to localhost, with port 12345.
+ #[test_case("grpc+http://[::1]:12345", true; "grpc valid IPv6 localhost port 12345")]
+ /// Correct scheme to connect to localhost over http, without specifying a port.
+ #[test_case("grpc+http://localhost", true; "grpc valid http host without port")]
+ /// Correct scheme to connect to localhost over https, without specifying a port.
+ #[test_case("grpc+https://localhost", true; "grpc valid https host without port")] + /// Correct scheme to connect to localhost over http, but with additional path, which is invalid. + #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")] + #[tokio::test] + async fn test_from_addr_tokio(uri_str: &str, is_ok: bool) { + let resp = from_addr( + uri_str, + gen_blob_service().into(), + gen_directory_service().into(), + ) + .await; + + assert_eq!(resp.is_ok(), is_ok); + } +} diff --git a/tvix/store/src/pathinfoservice/fs/mod.rs b/tvix/store/src/pathinfoservice/fs/mod.rs new file mode 100644 index 000000000000..45d59fd0bcb8 --- /dev/null +++ b/tvix/store/src/pathinfoservice/fs/mod.rs @@ -0,0 +1,79 @@ +use futures::stream::BoxStream; +use futures::StreamExt; +use tonic::async_trait; +use tvix_castore::fs::{RootNodes, TvixStoreFs}; +use tvix_castore::proto as castorepb; +use tvix_castore::Error; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; + +use super::PathInfoService; + +/// Helper to construct a [TvixStoreFs] from a [BlobService], [DirectoryService] +/// and [PathInfoService]. +/// This avoids users to have to interact with the wrapper struct directly, as +/// it leaks into the type signature of TvixStoreFS. +pub fn make_fs<BS, DS, PS>( + blob_service: BS, + directory_service: DS, + path_info_service: PS, + list_root: bool, +) -> TvixStoreFs<BS, DS, RootNodesWrapper<PS>> +where + BS: AsRef<dyn BlobService> + Send + Clone + 'static, + DS: AsRef<dyn DirectoryService> + Send + Clone + 'static, + PS: AsRef<dyn PathInfoService> + Send + Sync + Clone + 'static, +{ + TvixStoreFs::new( + blob_service, + directory_service, + RootNodesWrapper(path_info_service), + list_root, + ) +} + +/// Wrapper to satisfy Rust's orphan rules for trait implementations, as +/// RootNodes is coming from the [tvix-castore] crate. +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct RootNodesWrapper<T>(pub(crate) T); + +/// Implements root node lookup for any [PathInfoService]. This represents a flat +/// directory structure like /nix/store where each entry in the root filesystem +/// directory corresponds to a CA node. +#[cfg(any(feature = "fuse", feature = "virtiofs"))] +#[async_trait] +impl<T> RootNodes for RootNodesWrapper<T> +where + T: AsRef<dyn PathInfoService> + Send + Sync, +{ + async fn get_by_basename(&self, name: &[u8]) -> Result<Option<castorepb::node::Node>, Error> { + let Ok(store_path) = nix_compat::store_path::StorePath::from_bytes(name) else { + return Ok(None); + }; + + Ok(self + .0 + .as_ref() + .get(*store_path.digest()) + .await? + .map(|path_info| { + path_info + .node + .expect("missing root node") + .node + .expect("empty node") + })) + } + + fn list(&self) -> BoxStream<Result<castorepb::node::Node, Error>> { + Box::pin(self.0.as_ref().list().map(|result| { + result.map(|path_info| { + path_info + .node + .expect("missing root node") + .node + .expect("empty node") + }) + })) + } +} diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs new file mode 100644 index 000000000000..4e15a5bb0b5b --- /dev/null +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -0,0 +1,204 @@ +use super::PathInfoService; +use crate::proto::{self, ListPathInfoRequest, PathInfo}; +use async_stream::try_stream; +use futures::stream::BoxStream; +use tonic::{async_trait, transport::Channel, Code}; +use tvix_castore::{proto as castorepb, Error}; + +/// Connects to a (remote) tvix-store PathInfoService over gRPC. 
+#[derive(Clone)] +pub struct GRPCPathInfoService { + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, +} + +impl GRPCPathInfoService { + /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, + ) -> Self { + Self { grpc_client } + } +} + +#[async_trait] +impl PathInfoService for GRPCPathInfoService { + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let path_info = self + .grpc_client + .clone() + .get(proto::GetPathInfoRequest { + by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash( + digest.to_vec().into(), + )), + }) + .await; + + match path_info { + Ok(path_info) => { + let path_info = path_info.into_inner(); + + path_info + .validate() + .map_err(|e| Error::StorageError(format!("invalid pathinfo: {}", e)))?; + + Ok(Some(path_info)) + } + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + let path_info = self + .grpc_client + .clone() + .put(path_info) + .await + .map_err(|e| Error::StorageError(e.to_string()))? + .into_inner(); + + Ok(path_info) + } + + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + let path_info = self + .grpc_client + .clone() + .calculate_nar(castorepb::Node { + node: Some(root_node.clone()), + }) + .await + .map_err(|e| Error::StorageError(e.to_string()))? + .into_inner(); + + let nar_sha256: [u8; 32] = path_info + .nar_sha256 + .to_vec() + .try_into() + .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; + + Ok((path_info.nar_size, nar_sha256)) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let mut grpc_client = self.grpc_client.clone(); + + let stream = try_stream! { + let resp = grpc_client.list(ListPathInfoRequest::default()).await; + + let mut stream = resp.map_err(|e| Error::StorageError(e.to_string()))?.into_inner(); + + loop { + match stream.message().await { + Ok(o) => match o { + Some(pathinfo) => { + // validate the pathinfo + if let Err(e) = pathinfo.validate() { + Err(Error::StorageError(format!( + "pathinfo {:?} failed validation: {}", + pathinfo, e + )))?; + } + yield pathinfo + } + None => { + return; + }, + }, + Err(e) => Err(Error::StorageError(e.to_string()))?, + } + } + }; + + Box::pin(stream) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio_retry::strategy::ExponentialBackoff; + use tokio_retry::Retry; + use tokio_stream::wrappers::UnixListenerStream; + + use crate::pathinfoservice::MemoryPathInfoService; + use crate::proto::path_info_service_client::PathInfoServiceClient; + use crate::proto::GRPCPathInfoServiceWrapper; + use crate::tests::fixtures; + use crate::tests::utils::gen_blob_service; + use crate::tests::utils::gen_directory_service; + + use super::GRPCPathInfoService; + use super::PathInfoService; + + /// This ensures connecting via gRPC works as expected. 
+ #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("daemon"); + + let path_clone = socket_path.clone(); + + // Spin up a server + tokio::spawn(async { + let uds = UnixListener::bind(path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new server + let mut server = tonic::transport::Server::builder(); + let router = server.add_service( + crate::proto::path_info_service_server::PathInfoServiceServer::new( + GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new( + gen_blob_service(), + gen_directory_service(), + )) + as Box<dyn PathInfoService>), + ), + ); + router.serve_with_incoming(uds_stream).await + }); + + // wait for the socket to be created + Retry::spawn( + ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // prepare a client + let grpc_client = { + let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display())) + .expect("must parse"); + let client = PathInfoServiceClient::new( + tvix_castore::tonic::channel_from_url(&url) + .await + .expect("must succeed"), + ); + + GRPCPathInfoService::from_client(client) + }; + + let path_info = grpc_client + .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap()) + .await + .expect("must not be error"); + + assert!(path_info.is_none()); + } +} diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs new file mode 100644 index 000000000000..f8435dbbf809 --- /dev/null +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -0,0 +1,84 @@ +use super::PathInfoService; +use crate::{nar::calculate_size_and_sha256, proto::PathInfo}; +use futures::stream::{iter, BoxStream}; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; +use tonic::async_trait; +use tvix_castore::proto as castorepb; +use tvix_castore::Error; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; + +pub struct MemoryPathInfoService<BS, DS> { + db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>, + + blob_service: BS, + directory_service: DS, +} + +impl<BS, DS> MemoryPathInfoService<BS, DS> { + pub fn new(blob_service: BS, directory_service: DS) -> Self { + Self { + db: Default::default(), + blob_service, + directory_service, + } + } +} + +#[async_trait] +impl<BS, DS> PathInfoService for MemoryPathInfoService<BS, DS> +where + BS: AsRef<dyn BlobService> + Send + Sync, + DS: AsRef<dyn DirectoryService> + Send + Sync, +{ + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let db = self.db.read().unwrap(); + + match db.get(&digest) { + None => Ok(None), + Some(path_info) => Ok(Some(path_info.clone())), + } + } + + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Call validate on the received PathInfo message. + match path_info.validate() { + Err(e) => Err(Error::InvalidRequest(format!( + "failed to validate PathInfo: {}", + e + ))), + + // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. + // This overwrites existing PathInfo objects. 
+ Ok(nix_path) => {
+ let mut db = self.db.write().unwrap();
+ db.insert(*nix_path.digest(), path_info.clone());
+
+ Ok(path_info)
+ }
+ }
+ }
+
+ async fn calculate_nar(
+ &self,
+ root_node: &castorepb::node::Node,
+ ) -> Result<(u64, [u8; 32]), Error> {
+ calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
+ .await
+ .map_err(|e| Error::StorageError(e.to_string()))
+ }
+
+ fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+ let db = self.db.read().unwrap();
+
+ // Copy all elements into a list.
+ // This is a bit ugly, because we can't have `db` escape the lifetime of
+ // this function, but elements need to be returned owned anyway, and this
+ // in-memory impl is only intended for testing purposes.
+ let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect();
+
+ Box::pin(iter(items))
+ }
+}
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
new file mode 100644
index 000000000000..55ada92b7e88
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -0,0 +1,52 @@
+mod from_addr;
+mod grpc;
+mod memory;
+mod nix_http;
+mod sled;
+
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+mod fs;
+
+use futures::stream::BoxStream;
+use tonic::async_trait;
+use tvix_castore::proto as castorepb;
+use tvix_castore::Error;
+
+use crate::proto::PathInfo;
+
+pub use self::from_addr::from_addr;
+pub use self::grpc::GRPCPathInfoService;
+pub use self::memory::MemoryPathInfoService;
+pub use self::nix_http::NixHTTPPathInfoService;
+pub use self::sled::SledPathInfoService;
+
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+pub use self::fs::make_fs;
+
+/// The base trait all PathInfo services need to implement.
+#[async_trait]
+pub trait PathInfoService: Send + Sync {
+ /// Retrieve a PathInfo message by the output digest.
+ async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error>;
+
+ /// Store a PathInfo message. Implementations MUST call validate and reject
+ /// invalid messages.
+ async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>;
+
+ /// Return the nar size and nar sha256 digest for a given root node.
+ /// This can be used to calculate NAR-based output paths,
+ /// and implementations are encouraged to cache it.
+ async fn calculate_nar(
+ &self,
+ root_node: &castorepb::node::Node,
+ ) -> Result<(u64, [u8; 32]), Error>;
+
+ /// Iterate over all PathInfo objects in the store.
+ /// Implementations can decide to disallow listing.
+ ///
+ /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily,
+ /// and the box allows different underlying stream implementations to be returned since
+ /// Rust doesn't support this as a generic in traits yet. This is the same thing that
+ /// [async_trait] generates, but for streams instead of futures.
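+ ///
+ /// A consumption sketch (assuming `svc` is something implementing this
+ /// trait; uses [futures::StreamExt], like the rest of this crate):
+ ///
+ /// ```ignore
+ /// use futures::StreamExt;
+ ///
+ /// let mut stream = svc.list();
+ /// while let Some(path_info) = stream.next().await {
+ ///     let path_info = path_info?;
+ ///     // ... do something with each PathInfo ...
+ /// }
+ /// ```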
+ fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>; +} diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs new file mode 100644 index 000000000000..7b4130fcae27 --- /dev/null +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -0,0 +1,312 @@ +use std::io::{self, BufRead, Read, Write}; + +use data_encoding::BASE64; +use futures::{stream::BoxStream, TryStreamExt}; +use nix_compat::{ + narinfo::{self, NarInfo}, + nixbase32, +}; +use reqwest::StatusCode; +use sha2::{digest::FixedOutput, Digest, Sha256}; +use tonic::async_trait; +use tracing::{debug, instrument, warn}; +use tvix_castore::{ + blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, +}; + +use crate::proto::PathInfo; + +use super::PathInfoService; + +/// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache +/// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix +/// Store Model. +/// It implements the [PathInfoService] trait in an interesting way: +/// Every [PathInfoService::get] fetches the .narinfo and referred NAR file, +/// inserting components into a [BlobService] and [DirectoryService], then +/// returning a [PathInfo] struct with the root. +/// +/// Due to this being quite a costly operation, clients are expected to layer +/// this service with store composition, so they're only ingested once. +/// +/// The client is expected to be (indirectly) using the same [BlobService] and +/// [DirectoryService], so able to fetch referred Directories and Blobs. +/// [PathInfoService::put] and [PathInfoService::calculate_nar] are not +/// implemented and return an error if called. +/// TODO: what about reading from nix-cache-info? +pub struct NixHTTPPathInfoService<BS, DS> { + base_url: url::Url, + http_client: reqwest::Client, + + blob_service: BS, + directory_service: DS, + + /// An optional list of [narinfo::PubKey]. + /// If set, the .narinfo files received need to have correct signature by at least one of these. + public_keys: Option<Vec<narinfo::PubKey>>, +} + +impl<BS, DS> NixHTTPPathInfoService<BS, DS> { + pub fn new(base_url: url::Url, blob_service: BS, directory_service: DS) -> Self { + Self { + base_url, + http_client: reqwest::Client::new(), + blob_service, + directory_service, + + public_keys: None, + } + } + + /// Configures [Self] to validate NARInfo fingerprints with the public keys passed. + pub fn set_public_keys(&mut self, public_keys: Vec<narinfo::PubKey>) { + self.public_keys = Some(public_keys); + } +} + +#[async_trait] +impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS> +where + BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static, + DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static, +{ + #[instrument(skip_all, err, fields(path.digest=BASE64.encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let narinfo_url = self + .base_url + .join(&format!("{}.narinfo", nixbase32::encode(&digest))) + .map_err(|e| { + warn!(e = %e, "unable to join URL"); + io::Error::new(io::ErrorKind::InvalidInput, "unable to join url") + })?; + + debug!(narinfo_url= %narinfo_url, "constructed NARInfo url"); + + let resp = self + .http_client + .get(narinfo_url) + .send() + .await + .map_err(|e| { + warn!(e=%e,"unable to send NARInfo request"); + io::Error::new( + io::ErrorKind::InvalidInput, + "unable to send NARInfo request", + ) + })?; + + // In the case of a 404, return a NotFound. 
+ // We also return a NotFound in case of a 403 - this matches the behaviour of Nix
+ // when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org.
+ if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
+ return Ok(None);
+ }
+
+ let narinfo_str = resp.text().await.map_err(|e| {
+ warn!(e=%e,"unable to decode response as string");
+ io::Error::new(
+ io::ErrorKind::InvalidData,
+ "unable to decode response as string",
+ )
+ })?;
+
+ // parse the received narinfo
+ let narinfo = NarInfo::parse(&narinfo_str).map_err(|e| {
+ warn!(e=%e,"unable to parse response as NarInfo");
+ io::Error::new(
+ io::ErrorKind::InvalidData,
+ "unable to parse response as NarInfo",
+ )
+ })?;
+
+ // if [self.public_keys] is set, ensure there's at least one valid signature.
+ if let Some(public_keys) = &self.public_keys {
+ let fingerprint = narinfo.fingerprint();
+
+ if !public_keys.iter().any(|pubkey| {
+ narinfo
+ .signatures
+ .iter()
+ .any(|sig| pubkey.verify(&fingerprint, sig))
+ }) {
+ warn!("no valid signature found");
+ Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "no valid signature found",
+ ))?;
+ }
+ }
+
+ // Convert to a (sparse) PathInfo. We still need to populate the node field,
+ // and for this we need to download the NAR file.
+ // FUTUREWORK: Keep some database around mapping from narsha256 to
+ // (unnamed) rootnode, so we can use that (and the name from the
+ // StorePath) and avoid downloading the same NAR a second time.
+ let pathinfo: PathInfo = (&narinfo).into();
+
+ // create a request for the NAR file itself.
+ let nar_url = self.base_url.join(narinfo.url).map_err(|e| {
+ warn!(e = %e, "unable to join URL");
+ io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
+ })?;
+ debug!(nar_url= %nar_url, "constructed NAR url");
+
+ let resp = self
+ .http_client
+ .get(nar_url.clone())
+ .send()
+ .await
+ .map_err(|e| {
+ warn!(e=%e,"unable to send NAR request");
+ io::Error::new(io::ErrorKind::InvalidInput, "unable to send NAR request")
+ })?;
+
+ // if the request is not successful, return an error.
+ if !resp.status().is_success() {
+ return Err(Error::StorageError(format!(
+ "unable to retrieve NAR at {}, status {}",
+ nar_url,
+ resp.status()
+ )));
+ }
+
+ // get an AsyncRead of the response body.
+ let async_r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
+ let e = e.without_url();
+ warn!(e=%e, "failed to get response body");
+ io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+ }));
+ let sync_r = tokio_util::io::SyncIoBridge::new(async_r);
+
+ // handle decompression, by wrapping the reader.
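+ // Note: when the narinfo carries no Compression field, Nix defaults to
+ // bzip2, which is not supported here yet - hence the error in the `None`
+ // arm below. Only "none" and "xz" are currently handled.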
+ let sync_r: Box<dyn BufRead + Send> = match narinfo.compression { + Some("none") => Box::new(sync_r), + Some("xz") => Box::new(io::BufReader::new(xz2::read::XzDecoder::new(sync_r))), + Some(comp) => { + return Err(Error::InvalidRequest( + format!("unsupported compression: {}", comp).to_string(), + )) + } + None => { + return Err(Error::InvalidRequest( + "unsupported compression: bzip2".to_string(), + )) + } + }; + + let res = tokio::task::spawn_blocking({ + let blob_service = self.blob_service.clone(); + let directory_service = self.directory_service.clone(); + move || -> io::Result<_> { + // Wrap the reader once more, so we can calculate NarSize and NarHash + let mut sync_r = io::BufReader::new(NarReader::from(sync_r)); + let root_node = crate::nar::read_nar(&mut sync_r, blob_service, directory_service)?; + + let (_, nar_hash, nar_size) = sync_r.into_inner().into_inner(); + + Ok((root_node, nar_hash, nar_size)) + } + }) + .await + .unwrap(); + + match res { + Ok((root_node, nar_hash, nar_size)) => { + // ensure the ingested narhash and narsize do actually match. + if narinfo.nar_size != nar_size { + warn!( + narinfo.nar_size = narinfo.nar_size, + http.nar_size = nar_size, + "NARSize mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarSize mismatch".to_string(), + ))?; + } else if narinfo.nar_hash != nar_hash { + warn!( + narinfo.nar_hash = BASE64.encode(&narinfo.nar_hash), + http.nar_hash = BASE64.encode(&nar_hash), + "NarHash mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarHash mismatch".to_string(), + ))?; + } + + Ok(Some(PathInfo { + node: Some(castorepb::Node { + // set the name of the root node to the digest-name of the store path. + node: Some( + root_node.rename(narinfo.store_path.to_string().to_owned().into()), + ), + }), + references: pathinfo.references, + narinfo: pathinfo.narinfo, + })) + } + Err(e) => Err(e.into()), + } + } + + #[instrument(skip_all, fields(path_info=?_path_info))] + async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> { + Err(Error::InvalidRequest( + "put not supported for this backend".to_string(), + )) + } + + #[instrument(skip_all, fields(root_node=?root_node))] + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + Err(Error::InvalidRequest( + "calculate_nar not supported for this backend".to_string(), + )) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + Box::pin(futures::stream::once(async { + Err(Error::InvalidRequest( + "list not supported for this backend".to_string(), + )) + })) + } +} + +/// Small helper reader implementing [std::io::Read]. +/// It can be used to wrap another reader, counts the number of bytes read +/// and the sha256 digest of the contents. +struct NarReader<R: Read> { + r: R, + + sha256: sha2::Sha256, + bytes_read: u64, +} + +impl<R: Read> NarReader<R> { + pub fn from(inner: R) -> Self { + Self { + r: inner, + sha256: Sha256::new(), + bytes_read: 0, + } + } + + /// Returns the (remaining) inner reader, the sha256 digest and the number of bytes read. 
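+ ///
+ /// Sketch of the intended use, as in [PathInfoService::get] above
+ /// (`some_reader` is a placeholder):
+ ///
+ /// ```ignore
+ /// let mut r = io::BufReader::new(NarReader::from(some_reader));
+ /// // ... consume the NAR from r ...
+ /// let (_inner, nar_sha256, nar_size) = r.into_inner().into_inner();
+ /// ```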
+ pub fn into_inner(self) -> (R, [u8; 32], u64) { + (self.r, self.sha256.finalize_fixed().into(), self.bytes_read) + } +} + +impl<R: Read> Read for NarReader<R> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.r.read(buf).map(|n| { + self.bytes_read += n as u64; + self.sha256.write_all(&buf[..n]).unwrap(); + n + }) + } +} diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs new file mode 100644 index 000000000000..7b6d7fd7abcd --- /dev/null +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -0,0 +1,140 @@ +use super::PathInfoService; +use crate::nar::calculate_size_and_sha256; +use crate::proto::PathInfo; +use futures::stream::iter; +use futures::stream::BoxStream; +use prost::Message; +use std::path::Path; +use tonic::async_trait; +use tracing::warn; +use tvix_castore::proto as castorepb; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; + +/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). +/// +/// The PathInfo messages are stored as encoded protos, and keyed by their output hash, +/// as that's currently the only request type available. +pub struct SledPathInfoService<BS, DS> { + db: sled::Db, + + blob_service: BS, + directory_service: DS, +} + +impl<BS, DS> SledPathInfoService<BS, DS> { + pub fn new<P: AsRef<Path>>( + p: P, + blob_service: BS, + directory_service: DS, + ) -> Result<Self, sled::Error> { + let config = sled::Config::default() + .use_compression(false) // is a required parameter + .path(p); + let db = config.open()?; + + Ok(Self { + db, + blob_service, + directory_service, + }) + } + + pub fn new_temporary(blob_service: BS, directory_service: DS) -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { + db, + blob_service, + directory_service, + }) + } +} + +#[async_trait] +impl<BS, DS> PathInfoService for SledPathInfoService<BS, DS> +where + BS: AsRef<dyn BlobService> + Send + Sync, + DS: AsRef<dyn DirectoryService> + Send + Sync, +{ + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + match self.db.get(digest) { + Ok(None) => Ok(None), + Ok(Some(data)) => match PathInfo::decode(&*data) { + Ok(path_info) => Ok(Some(path_info)), + Err(e) => { + warn!("failed to decode stored PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to decode stored PathInfo: {}", + e + ))) + } + }, + Err(e) => { + warn!("failed to retrieve PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to retrieve PathInfo: {}", + e + ))) + } + } + } + + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Call validate on the received PathInfo message. + match path_info.validate() { + Err(e) => Err(Error::InvalidRequest(format!( + "failed to validate PathInfo: {}", + e + ))), + // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. + // This overwrites existing PathInfo objects. + Ok(nix_path) => match self + .db + .insert(*nix_path.digest(), path_info.encode_to_vec()) + { + Ok(_) => Ok(path_info), + Err(e) => { + warn!("failed to insert PathInfo: {}", e); + Err(Error::StorageError(format! 
{ + "failed to insert PathInfo: {}", e + })) + } + }, + } + } + + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service) + .await + .map_err(|e| Error::StorageError(e.to_string())) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + Box::pin(iter(self.db.iter().values().map(|v| match v { + Ok(data) => { + // we retrieved some bytes + match PathInfo::decode(&*data) { + Ok(path_info) => Ok(path_info), + Err(e) => { + warn!("failed to decode stored PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to decode stored PathInfo: {}", + e + ))) + } + } + } + Err(e) => { + warn!("failed to retrieve PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to retrieve PathInfo: {}", + e + ))) + } + }))) + } +} diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs new file mode 100644 index 000000000000..a5c5776cd4ef --- /dev/null +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -0,0 +1,116 @@ +use crate::nar::RenderError; +use crate::pathinfoservice::PathInfoService; +use crate::proto; +use futures::{stream::BoxStream, TryStreamExt}; +use std::ops::Deref; +use tonic::{async_trait, Request, Response, Result, Status}; +use tracing::{instrument, warn}; +use tvix_castore::proto as castorepb; + +pub struct GRPCPathInfoServiceWrapper<PS> { + inner: PS, + // FUTUREWORK: allow exposing without allowing listing +} + +impl<PS> GRPCPathInfoServiceWrapper<PS> { + pub fn new(path_info_service: PS) -> Self { + Self { + inner: path_info_service, + } + } +} + +#[async_trait] +impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS> +where + PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static, +{ + type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>; + + #[instrument(skip(self))] + async fn get( + &self, + request: Request<proto::GetPathInfoRequest>, + ) -> Result<Response<proto::PathInfo>> { + match request.into_inner().by_what { + None => Err(Status::unimplemented("by_what needs to be specified")), + Some(proto::get_path_info_request::ByWhat::ByOutputHash(output_digest)) => { + let digest: [u8; 20] = output_digest + .to_vec() + .try_into() + .map_err(|_e| Status::invalid_argument("invalid output digest length"))?; + match self.inner.get(digest).await { + Ok(None) => Err(Status::not_found("PathInfo not found")), + Ok(Some(path_info)) => Ok(Response::new(path_info)), + Err(e) => { + warn!("failed to retrieve PathInfo: {}", e); + Err(e.into()) + } + } + } + } + } + + #[instrument(skip(self))] + async fn put(&self, request: Request<proto::PathInfo>) -> Result<Response<proto::PathInfo>> { + let path_info = request.into_inner(); + + // Store the PathInfo in the client. Clients MUST validate the data + // they receive, so we don't validate additionally here. 
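+ // (Validation happens inside the wrapped PathInfoService::put
+ // implementation, whose trait contract requires rejecting invalid
+ // messages.)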
+ match self.inner.put(path_info).await { + Ok(path_info_new) => Ok(Response::new(path_info_new)), + Err(e) => { + warn!("failed to insert PathInfo: {}", e); + Err(e.into()) + } + } + } + + #[instrument(skip(self))] + async fn calculate_nar( + &self, + request: Request<castorepb::Node>, + ) -> Result<Response<proto::CalculateNarResponse>> { + match request.into_inner().node { + None => Err(Status::invalid_argument("no root node sent")), + Some(root_node) => { + let (nar_size, nar_sha256) = self + .inner + .calculate_nar(&root_node) + .await + .expect("error during nar calculation"); // TODO: handle error + + Ok(Response::new(proto::CalculateNarResponse { + nar_size, + nar_sha256: nar_sha256.to_vec().into(), + })) + } + } + } + + #[instrument(skip(self))] + async fn list( + &self, + _request: Request<proto::ListPathInfoRequest>, + ) -> Result<Response<Self::ListStream>, Status> { + let stream = Box::pin( + self.inner + .list() + .map_err(|e| Status::internal(e.to_string())), + ); + + Ok(Response::new(Box::pin(stream))) + } +} + +impl From<RenderError> for tonic::Status { + fn from(value: RenderError) -> Self { + match value { + RenderError::BlobNotFound(_, _) => Self::not_found(value.to_string()), + RenderError::DirectoryNotFound(_, _) => Self::not_found(value.to_string()), + RenderError::NARWriterError(_) => Self::internal(value.to_string()), + RenderError::StoreError(_) => Self::internal(value.to_string()), + RenderError::UnexpectedBlobMeta(_, _, _, _) => Self::internal(value.to_string()), + } + } +} diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs new file mode 100644 index 000000000000..665da8c6c58f --- /dev/null +++ b/tvix/store/src/proto/mod.rs @@ -0,0 +1,368 @@ +#![allow(clippy::derive_partial_eq_without_eq, non_snake_case)] +use bytes::Bytes; +use data_encoding::BASE64; +// https://github.com/hyperium/tonic/issues/1056 +use nix_compat::{ + narinfo::Flags, + nixhash::{CAHash, NixHash}, + store_path::{self, StorePathRef}, +}; +use thiserror::Error; +use tvix_castore::proto::{self as castorepb, NamedNode, ValidateNodeError}; + +mod grpc_pathinfoservice_wrapper; + +pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper; + +tonic::include_proto!("tvix.store.v1"); + +#[cfg(feature = "tonic-reflection")] +/// Compiled file descriptors for implementing [gRPC +/// reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) with e.g. +/// [`tonic_reflection`](https://docs.rs/tonic-reflection). +pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix.store.v1"); + +#[cfg(test)] +mod tests; + +/// Errors that can occur during the validation of PathInfo messages. +#[derive(Debug, Error, PartialEq)] +pub enum ValidatePathInfoError { + /// Invalid length of a reference + #[error("Invalid length of digest at position {}, expected {}, got {}", .0, store_path::DIGEST_SIZE, .1)] + InvalidReferenceDigestLen(usize, usize), + + /// No node present + #[error("No node present")] + NoNodePresent, + + /// Node fails validation + #[error("Invalid root node: {:?}", .0.to_string())] + InvalidRootNode(ValidateNodeError), + + /// Invalid node name encountered. Root nodes in PathInfos have more strict name requirements + #[error("Failed to parse {0:?} as StorePath: {1}")] + InvalidNodeName(Vec<u8>, store_path::Error), + + /// The digest in narinfo.nar_sha256 has an invalid len. 
+ #[error("Invalid narinfo.nar_sha256 length: expected {}, got {}", 32, .0)] + InvalidNarSha256DigestLen(usize), + + /// The number of references in the narinfo.reference_names field does not match + /// the number of references in the .references field. + #[error("Inconsistent Number of References: {0} (references) vs {1} (narinfo)")] + InconsistentNumberOfReferences(usize, usize), + + /// A string in narinfo.reference_names does not parse to a [store_path::StorePath]. + #[error("Invalid reference_name at position {0}: {1}")] + InvalidNarinfoReferenceName(usize, String), + + /// The digest in the parsed `.narinfo.reference_names[i]` does not match + /// the one in `.references[i]`.` + #[error("digest in reference_name at position {} does not match digest in PathInfo, expected {}, got {}", .0, BASE64.encode(.1), BASE64.encode(.2))] + InconsistentNarinfoReferenceNameDigest( + usize, + [u8; store_path::DIGEST_SIZE], + [u8; store_path::DIGEST_SIZE], + ), + + /// The deriver field is invalid. + #[error("deriver field is invalid: {0}")] + InvalidDeriverField(store_path::Error), +} + +/// Parses a root node name. +/// +/// On success, this returns the parsed [store_path::StorePath]. +/// On error, it returns an error generated from the supplied constructor. +fn parse_node_name_root<E>( + name: &[u8], + err: fn(Vec<u8>, store_path::Error) -> E, +) -> Result<store_path::StorePath, E> { + match store_path::StorePath::from_bytes(name) { + Ok(np) => Ok(np), + Err(e) => Err(err(name.to_vec(), e)), + } +} + +impl PathInfo { + /// validate performs some checks on the PathInfo struct, + /// Returning either a [store_path::StorePath] of the root node, or a + /// [ValidatePathInfoError]. + pub fn validate(&self) -> Result<store_path::StorePath, ValidatePathInfoError> { + // ensure the references have the right number of bytes. + for (i, reference) in self.references.iter().enumerate() { + if reference.len() != store_path::DIGEST_SIZE { + return Err(ValidatePathInfoError::InvalidReferenceDigestLen( + i, + reference.len(), + )); + } + } + + // If there is a narinfo field populated… + if let Some(narinfo) = &self.narinfo { + // ensure the nar_sha256 digest has the correct length. + if narinfo.nar_sha256.len() != 32 { + return Err(ValidatePathInfoError::InvalidNarSha256DigestLen( + narinfo.nar_sha256.len(), + )); + } + + // ensure the number of references there matches PathInfo.references count. + if narinfo.reference_names.len() != self.references.len() { + return Err(ValidatePathInfoError::InconsistentNumberOfReferences( + self.references.len(), + narinfo.reference_names.len(), + )); + } + + // parse references in reference_names. + for (i, reference_name_str) in narinfo.reference_names.iter().enumerate() { + // ensure thy parse as (non-absolute) store path + let reference_names_store_path = store_path::StorePath::from_bytes( + reference_name_str.as_bytes(), + ) + .map_err(|_| { + ValidatePathInfoError::InvalidNarinfoReferenceName( + i, + reference_name_str.to_owned(), + ) + })?; + + // ensure their digest matches the one at self.references[i]. + { + // This is safe, because we ensured the proper length earlier already. + let reference_digest = self.references[i].to_vec().try_into().unwrap(); + + if reference_names_store_path.digest() != &reference_digest { + return Err( + ValidatePathInfoError::InconsistentNarinfoReferenceNameDigest( + i, + reference_digest, + *reference_names_store_path.digest(), + ), + ); + } + } + + // If the Deriver field is populated, ensure it parses to a + // [store_path::StorePath]. 
+ // We can't check for it to *not* end with .drv, as the .drv files produced by + // recursive Nix end with multiple .drv suffixes, and only one is popped when + // converting to this field. + if let Some(deriver) = &narinfo.deriver { + store_path::StorePathRef::from_name_and_digest(&deriver.name, &deriver.digest) + .map_err(ValidatePathInfoError::InvalidDeriverField)? + .to_owned(); + } + } + } + + // Ensure there is a (root) node present, and it properly parses to a [store_path::StorePath]. + let root_nix_path = match &self.node { + None | Some(castorepb::Node { node: None }) => { + Err(ValidatePathInfoError::NoNodePresent)? + } + Some(castorepb::Node { node: Some(node) }) => { + node.validate() + .map_err(ValidatePathInfoError::InvalidRootNode)?; + // parse the name of the node itself and return + parse_node_name_root(node.get_name(), ValidatePathInfoError::InvalidNodeName)? + } + }; + + // return the root nix path + Ok(root_nix_path) + } + + /// With self and a given StorePathRef, this reconstructs a + /// [nix_compat::narinfo::NarInfo<'_>]. + /// It can be used to validate Signatures, or get back a (sparse) NarInfo + /// struct to prepare writing it out. + /// + /// This doesn't allocate any new data. + /// + /// Keep in mind this is not able to reconstruct all data present in the + /// NarInfo<'_>, as some of it is not stored at all: + /// - the `system`, `file_hash` and `file_size` fields are set to `None`. + /// - the URL is set to an empty string. + /// - Compression is set to "none" + /// + /// If you want to render it out to a string and be able to parse it back + /// in, at least URL *must* be set again. + pub fn as_narinfo<'a>( + &'a self, + store_path: store_path::StorePathRef<'a>, + ) -> Option<nix_compat::narinfo::NarInfo<'_>> { + let narinfo = &self.narinfo.as_ref()?; + + Some(nix_compat::narinfo::NarInfo { + flags: Flags::empty(), + store_path, + nar_hash: narinfo.nar_sha256.to_vec().try_into().unwrap(), + nar_size: narinfo.nar_size, + references: narinfo + .reference_names + .iter() + .map(|ref_name| { + // This shouldn't pass validation + StorePathRef::from_bytes(ref_name.as_bytes()).expect("invalid reference") + }) + .collect(), + signatures: narinfo + .signatures + .iter() + .map(|sig| { + nix_compat::narinfo::Signature::new( + &sig.name, + // This shouldn't pass validation + sig.data[..].try_into().expect("invalid signature len"), + ) + }) + .collect(), + ca: narinfo + .ca + .as_ref() + .map(|ca| ca.try_into().expect("invalid ca")), + system: None, + deriver: narinfo.deriver.as_ref().map(|deriver| { + StorePathRef::from_name_and_digest(&deriver.name, &deriver.digest) + .expect("invalid deriver") + }), + url: "", + compression: Some("none"), + file_hash: None, + file_size: None, + }) + } +} + +/// Errors that can occur when converting from a [nar_info::Ca] to a (stricter) +/// [nix_compat::nixhash::CAHash]. 
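+///
+/// A conversion sketch using the [TryFrom] impl below (the digest bytes are
+/// placeholders; the SHA-256 variants require exactly 32 bytes):
+///
+/// ```ignore
+/// let ca = nar_info::Ca {
+///     r#type: nar_info::ca::Hash::NarSha256 as i32,
+///     digest: vec![0u8; 32].into(),
+/// };
+/// let ca_hash = nix_compat::nixhash::CAHash::try_from(&ca)?;
+/// ```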
+#[derive(Debug, Error, PartialEq)] +pub enum ConvertCAError { + /// Invalid length of a reference + #[error("Invalid digest length '{0}' for type {1}")] + InvalidReferenceDigestLen(usize, &'static str), + /// Unknown Hash type + #[error("Unknown hash type: {0}")] + UnknownHashType(i32), +} + +impl TryFrom<&nar_info::Ca> for nix_compat::nixhash::CAHash { + type Error = ConvertCAError; + + fn try_from(value: &nar_info::Ca) -> Result<Self, Self::Error> { + Ok(match value.r#type { + typ if typ == nar_info::ca::Hash::FlatMd5 as i32 => { + Self::Flat(NixHash::Md5(value.digest[..].try_into().map_err(|_| { + ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatMd5") + })?)) + } + typ if typ == nar_info::ca::Hash::FlatSha1 as i32 => { + Self::Flat(NixHash::Sha1(value.digest[..].try_into().map_err( + |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha1"), + )?)) + } + typ if typ == nar_info::ca::Hash::FlatSha256 as i32 => { + Self::Flat(NixHash::Sha256(value.digest[..].try_into().map_err( + |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha256"), + )?)) + } + typ if typ == nar_info::ca::Hash::FlatSha512 as i32 => Self::Flat(NixHash::Sha512( + Box::new(value.digest[..].try_into().map_err(|_| { + ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha512") + })?), + )), + typ if typ == nar_info::ca::Hash::NarMd5 as i32 => { + Self::Nar(NixHash::Md5(value.digest[..].try_into().map_err(|_| { + ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarMd5") + })?)) + } + typ if typ == nar_info::ca::Hash::NarSha1 as i32 => { + Self::Nar(NixHash::Sha1(value.digest[..].try_into().map_err( + |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha1"), + )?)) + } + typ if typ == nar_info::ca::Hash::NarSha256 as i32 => { + Self::Nar(NixHash::Sha256(value.digest[..].try_into().map_err( + |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha256"), + )?)) + } + typ if typ == nar_info::ca::Hash::NarSha512 as i32 => Self::Nar(NixHash::Sha512( + Box::new(value.digest[..].try_into().map_err(|_| { + ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha512") + })?), + )), + typ => return Err(ConvertCAError::UnknownHashType(typ)), + }) + } +} + +impl From<&nix_compat::nixhash::CAHash> for nar_info::Ca { + fn from(value: &nix_compat::nixhash::CAHash) -> Self { + nar_info::Ca { + r#type: match value { + CAHash::Flat(NixHash::Md5(_)) => nar_info::ca::Hash::FlatMd5.into(), + CAHash::Flat(NixHash::Sha1(_)) => nar_info::ca::Hash::FlatSha1.into(), + CAHash::Flat(NixHash::Sha256(_)) => nar_info::ca::Hash::FlatSha256.into(), + CAHash::Flat(NixHash::Sha512(_)) => nar_info::ca::Hash::FlatSha512.into(), + CAHash::Nar(NixHash::Md5(_)) => nar_info::ca::Hash::NarMd5.into(), + CAHash::Nar(NixHash::Sha1(_)) => nar_info::ca::Hash::NarSha1.into(), + CAHash::Nar(NixHash::Sha256(_)) => nar_info::ca::Hash::NarSha256.into(), + CAHash::Nar(NixHash::Sha512(_)) => nar_info::ca::Hash::NarSha512.into(), + CAHash::Text(_) => nar_info::ca::Hash::TextSha256.into(), + }, + digest: match value { + CAHash::Flat(ref nixhash) | CAHash::Nar(ref nixhash) => { + nixhash.digest_as_bytes().to_vec().into() + } + CAHash::Text(digest) => digest.to_vec().into(), + }, + } + } +} + +impl From<&nix_compat::narinfo::NarInfo<'_>> for NarInfo { + /// Converts from a NarInfo (returned from the NARInfo parser) to the proto- + /// level NarInfo struct. 
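+ ///
+ /// Sketch, mirroring how the nix_http PathInfoService uses the parser
+ /// (`narinfo_str` is a placeholder):
+ ///
+ /// ```ignore
+ /// let parsed = nix_compat::narinfo::NarInfo::parse(&narinfo_str)?;
+ /// let proto_narinfo: NarInfo = (&parsed).into();
+ /// ```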
+ fn from(value: &nix_compat::narinfo::NarInfo<'_>) -> Self {
+ let signatures = value
+ .signatures
+ .iter()
+ .map(|sig| nar_info::Signature {
+ name: sig.name().to_string(),
+ data: Bytes::copy_from_slice(sig.bytes()),
+ })
+ .collect();
+
+ NarInfo {
+ nar_size: value.nar_size,
+ nar_sha256: Bytes::copy_from_slice(&value.nar_hash),
+ signatures,
+ reference_names: value.references.iter().map(|r| r.to_string()).collect(),
+ deriver: value.deriver.as_ref().map(|sp| StorePath {
+ name: sp.name().to_owned(),
+ digest: Bytes::copy_from_slice(sp.digest()),
+ }),
+ ca: value.ca.as_ref().map(|ca| ca.into()),
+ }
+ }
+}
+
+impl From<&nix_compat::narinfo::NarInfo<'_>> for PathInfo {
+ /// Converts from a NarInfo (returned from the NARInfo parser) to a PathInfo
+ /// struct with the node set to None.
+ fn from(value: &nix_compat::narinfo::NarInfo<'_>) -> Self {
+ Self {
+ node: None,
+ references: value
+ .references
+ .iter()
+ .map(|x| Bytes::copy_from_slice(x.digest()))
+ .collect(),
+ narinfo: Some(value.into()),
+ }
+ }
+}
diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
new file mode 100644
index 000000000000..8016b9901d96
--- /dev/null
+++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
@@ -0,0 +1,74 @@
+use crate::proto::get_path_info_request::ByWhat::ByOutputHash;
+use crate::proto::path_info_service_server::PathInfoService as GRPCPathInfoService;
+use crate::proto::GRPCPathInfoServiceWrapper;
+use crate::proto::GetPathInfoRequest;
+use crate::proto::PathInfo;
+use crate::tests::fixtures::DUMMY_OUTPUT_HASH;
+use crate::tests::utils::gen_blob_service;
+use crate::tests::utils::gen_directory_service;
+use crate::tests::utils::gen_pathinfo_service;
+use futures::stream::BoxStream;
+use std::sync::Arc;
+use tonic::Request;
+use tvix_castore::proto as castorepb;
+
+/// generates a GRPCPathInfoService out of blob, directory and pathinfo services.
+///
+/// We only interact with it via the PathInfo gRPC interface.
+/// NAR calculation is delegated to the wrapped PathInfoService.
+fn gen_grpc_service(
+) -> Arc<dyn GRPCPathInfoService<ListStream = BoxStream<'static, Result<PathInfo, tonic::Status>>>>
+{
+ let blob_service = gen_blob_service();
+ let directory_service = gen_directory_service();
+ Arc::new(GRPCPathInfoServiceWrapper::new(gen_pathinfo_service(
+ blob_service,
+ directory_service,
+ )))
+}
+
+/// Trying to get a non-existent PathInfo should return a not found error.
+#[tokio::test]
+async fn not_found() {
+ let service = gen_grpc_service();
+
+ let resp = service
+ .get(Request::new(GetPathInfoRequest {
+ by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.clone())),
+ }))
+ .await;
+
+ let resp = resp.expect_err("must fail");
+ assert_eq!(resp.code(), tonic::Code::NotFound);
+}
+
+/// Put a PathInfo into the store, get it back.
+#[tokio::test] +async fn put_get() { + let service = gen_grpc_service(); + + let path_info = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode { + name: "00000000000000000000000000000000-foo".into(), + target: "doesntmatter".into(), + })), + }), + ..Default::default() + }; + + let resp = service.put(Request::new(path_info.clone())).await; + + assert!(resp.is_ok()); + assert_eq!(resp.expect("must succeed").into_inner(), path_info); + + let resp = service + .get(Request::new(GetPathInfoRequest { + by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.clone())), + })) + .await; + + assert!(resp.is_ok()); + assert_eq!(resp.expect("must succeed").into_inner(), path_info); +} diff --git a/tvix/store/src/proto/tests/mod.rs b/tvix/store/src/proto/tests/mod.rs new file mode 100644 index 000000000000..bff885624380 --- /dev/null +++ b/tvix/store/src/proto/tests/mod.rs @@ -0,0 +1,2 @@ +mod grpc_pathinfoservice; +mod pathinfo; diff --git a/tvix/store/src/proto/tests/pathinfo.rs b/tvix/store/src/proto/tests/pathinfo.rs new file mode 100644 index 000000000000..7c1b69c2a376 --- /dev/null +++ b/tvix/store/src/proto/tests/pathinfo.rs @@ -0,0 +1,448 @@ +use crate::proto::{nar_info::Signature, NarInfo, PathInfo, ValidatePathInfoError}; +use crate::tests::fixtures::*; +use bytes::Bytes; +use data_encoding::BASE64; +use nix_compat::nixbase32; +use nix_compat::store_path::{self, StorePath, StorePathRef}; +use std::str::FromStr; +use test_case::test_case; +use tvix_castore::proto as castorepb; + +#[test_case( + None, + Err(ValidatePathInfoError::NoNodePresent) ; + "No node" +)] +#[test_case( + Some(castorepb::Node { node: None }), + Err(ValidatePathInfoError::NoNodePresent); + "No node 2" +)] +fn validate_no_node( + t_node: Option<castorepb::Node>, + t_result: Result<StorePath, ValidatePathInfoError>, +) { + // construct the PathInfo object + let p = PathInfo { + node: t_node, + ..Default::default() + }; + assert_eq!(t_result, p.validate()); +} + +#[test_case( + castorepb::DirectoryNode { + name: DUMMY_NAME.into(), + digest: DUMMY_DIGEST.clone().into(), + size: 0, + }, + Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed")); + "ok" +)] +#[test_case( + castorepb::DirectoryNode { + name: DUMMY_NAME.into(), + digest: Bytes::new(), + size: 0, + }, + Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))); + "invalid digest length" +)] +#[test_case( + castorepb::DirectoryNode { + name: "invalid".into(), + digest: DUMMY_DIGEST.clone().into(), + size: 0, + }, + Err(ValidatePathInfoError::InvalidNodeName( + "invalid".into(), + store_path::Error::InvalidLength + )); + "invalid node name" +)] +fn validate_directory( + t_directory_node: castorepb::DirectoryNode, + t_result: Result<StorePath, ValidatePathInfoError>, +) { + // construct the PathInfo object + let p = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Directory(t_directory_node)), + }), + ..Default::default() + }; + assert_eq!(t_result, p.validate()); +} + +#[test_case( + castorepb::FileNode { + name: DUMMY_NAME.into(), + digest: DUMMY_DIGEST.clone().into(), + size: 0, + executable: false, + }, + Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed")); + "ok" +)] +#[test_case( + castorepb::FileNode { + name: DUMMY_NAME.into(), + digest: Bytes::new(), + ..Default::default() + }, + Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))); + "invalid digest length" +)] +#[test_case( + castorepb::FileNode 
{
+        name: "invalid".into(),
+        digest: DUMMY_DIGEST.clone().into(),
+        ..Default::default()
+    },
+    Err(ValidatePathInfoError::InvalidNodeName(
+        "invalid".into(),
+        store_path::Error::InvalidLength
+    ));
+    "invalid node name"
+)]
+fn validate_file(
+    t_file_node: castorepb::FileNode,
+    t_result: Result<StorePath, ValidatePathInfoError>,
+) {
+    // construct the PathInfo object
+    let p = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::File(t_file_node)),
+        }),
+        ..Default::default()
+    };
+    assert_eq!(t_result, p.validate());
+}
+
+#[test_case(
+    castorepb::SymlinkNode {
+        name: DUMMY_NAME.into(),
+        target: "foo".into(),
+    },
+    Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed"));
+    "ok"
+)]
+#[test_case(
+    castorepb::SymlinkNode {
+        name: "invalid".into(),
+        target: "foo".into(),
+    },
+    Err(ValidatePathInfoError::InvalidNodeName(
+        "invalid".into(),
+        store_path::Error::InvalidLength
+    ));
+    "invalid node name"
+)]
+fn validate_symlink(
+    t_symlink_node: castorepb::SymlinkNode,
+    t_result: Result<StorePath, ValidatePathInfoError>,
+) {
+    // construct the PathInfo object
+    let p = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::Symlink(t_symlink_node)),
+        }),
+        ..Default::default()
+    };
+    assert_eq!(t_result, p.validate());
+}
+
+/// Ensure validating a correct PathInfo without narinfo populated succeeds.
+#[test]
+fn validate_references_without_narinfo_ok() {
+    assert!(PATH_INFO_WITHOUT_NARINFO.validate().is_ok());
+}
+
+/// Ensure validating a correct PathInfo with narinfo populated succeeds.
+#[test]
+fn validate_references_with_narinfo_ok() {
+    assert!(PATH_INFO_WITH_NARINFO.validate().is_ok());
+}
+
+/// Create a PathInfo with a wrong digest length in narinfo.nar_sha256, and
+/// ensure validation fails.
+#[test]
+fn validate_wrong_nar_sha256() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+    path_info.narinfo.as_mut().unwrap().nar_sha256 = vec![0xbe, 0xef].into();
+
+    match path_info.validate().expect_err("must fail") {
+        ValidatePathInfoError::InvalidNarSha256DigestLen(2) => {}
+        e => panic!("unexpected error: {:?}", e),
+    };
+}
+
+/// Create a PathInfo with a wrong count of narinfo.reference_names,
+/// and ensure validation fails.
+#[test]
+fn validate_inconsistent_num_refs_fail() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+    path_info.narinfo.as_mut().unwrap().reference_names = vec![];
+
+    match path_info.validate().expect_err("must fail") {
+        ValidatePathInfoError::InconsistentNumberOfReferences(1, 0) => {}
+        e => panic!("unexpected error: {:?}", e),
+    };
+}
+
+/// Create a PathInfo with a wrong digest length in references.
+#[test]
+fn validate_invalid_reference_digest_len() {
+    let mut path_info = PATH_INFO_WITHOUT_NARINFO.clone();
+    path_info.references.push(vec![0xff, 0xff].into());
+
+    match path_info.validate().expect_err("must fail") {
+        ValidatePathInfoError::InvalidReferenceDigestLen(
+            1, // position
+            2, // unexpected digest len
+        ) => {}
+        e => panic!("unexpected error: {:?}", e),
+    };
+}
+
+/// Create a PathInfo with a narinfo.reference_names[0] that is not a valid store path.
+#[test]
+fn validate_invalid_narinfo_reference_name() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+
+    // This is invalid, as the store prefix is not part of reference_names.
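+    // (Compare the References: lines in the NARInfo fixtures further down,
+    // which are bare basenames such as "s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1".)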
+    path_info.narinfo.as_mut().unwrap().reference_names[0] =
+        "/nix/store/00000000000000000000000000000000-dummy".to_string();
+
+    match path_info.validate().expect_err("must fail") {
+        ValidatePathInfoError::InvalidNarinfoReferenceName(0, reference_name) => {
+            assert_eq!(
+                "/nix/store/00000000000000000000000000000000-dummy",
+                reference_name
+            );
+        }
+        e => panic!("unexpected error: {:?}", e),
+    }
+}
+
+/// Create a PathInfo with a narinfo.reference_names[0] that doesn't match references[0].
+#[test]
+fn validate_inconsistent_narinfo_reference_name_digest() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+
+    // mutate the first reference; they were all zeroes before
+    path_info.references[0] = vec![0xff; store_path::DIGEST_SIZE].into();
+
+    match path_info.validate().expect_err("must fail") {
+        ValidatePathInfoError::InconsistentNarinfoReferenceNameDigest(0, e_expected, e_actual) => {
+            assert_eq!(path_info.references[0][..], e_expected);
+            assert_eq!(DUMMY_OUTPUT_HASH[..], e_actual);
+        }
+        e => panic!("unexpected error: {:?}", e),
+    }
+}
+
+/// Create a node with an empty symlink target, and ensure it fails validation.
+#[test]
+fn validate_symlink_empty_target_invalid() {
+    let node = castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+        name: "foo".into(),
+        target: "".into(),
+    });
+
+    node.validate().expect_err("must fail validation");
+}
+
+/// Create a node with a symlink target including null bytes, and ensure it
+/// fails validation.
+#[test]
+fn validate_symlink_target_null_byte_invalid() {
+    let node = castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+        name: "foo".into(),
+        target: "foo\0".into(),
+    });
+
+    node.validate().expect_err("must fail validation");
+}
+
+/// Create a PathInfo with a correct deriver field and ensure it succeeds.
+#[test]
+fn validate_valid_deriver() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+
+    // add a valid deriver
+    let narinfo = path_info.narinfo.as_mut().unwrap();
+    narinfo.deriver = Some(crate::proto::StorePath {
+        name: "foo".to_string(),
+        digest: DUMMY_OUTPUT_HASH.clone(),
+    });
+
+    path_info.validate().expect("must validate");
+}
+
+/// Create a PathInfo with a broken deriver field and ensure it fails.
+#[test] +fn validate_invalid_deriver() { + let mut path_info = PATH_INFO_WITH_NARINFO.clone(); + + // add a broken deriver (invalid digest) + let narinfo = path_info.narinfo.as_mut().unwrap(); + narinfo.deriver = Some(crate::proto::StorePath { + name: "foo".to_string(), + digest: vec![].into(), + }); + + match path_info.validate().expect_err("must fail validation") { + ValidatePathInfoError::InvalidDeriverField(_) => {} + e => panic!("unexpected error: {:?}", e), + } +} + +#[test] +fn from_nixcompat_narinfo() { + let narinfo_parsed = nix_compat::narinfo::NarInfo::parse( + r#"StorePath: /nix/store/s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1 +URL: nar/1nhgq6wcggx0plpy4991h3ginj6hipsdslv4fd4zml1n707j26yq.nar.xz +Compression: xz +FileHash: sha256:1nhgq6wcggx0plpy4991h3ginj6hipsdslv4fd4zml1n707j26yq +FileSize: 50088 +NarHash: sha256:0yzhigwjl6bws649vcs2asa4lbs8hg93hyix187gc7s7a74w5h80 +NarSize: 226488 +References: 3n58xw4373jp0ljirf06d8077j15pc4j-glibc-2.37-8 s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1 +Deriver: ib3sh3pcz10wsmavxvkdbayhqivbghlq-hello-2.12.1.drv +Sig: cache.nixos.org-1:8ijECciSFzWHwwGVOIVYdp2fOIOJAfmzGHPQVwpktfTQJF6kMPPDre7UtFw3o+VqenC5P8RikKOAAfN7CvPEAg=="#).expect("must parse"); + + assert_eq!( + PathInfo { + node: None, + references: vec![ + Bytes::copy_from_slice(&nixbase32::decode_fixed::<20>("3n58xw4373jp0ljirf06d8077j15pc4j").unwrap()), + Bytes::copy_from_slice(&nixbase32::decode_fixed::<20>("s66mzxpvicwk07gjbjfw9izjfa797vsw").unwrap()), + ], + narinfo: Some( + NarInfo { + nar_size: 226488, + nar_sha256: Bytes::copy_from_slice( + &nixbase32::decode_fixed::<32>("0yzhigwjl6bws649vcs2asa4lbs8hg93hyix187gc7s7a74w5h80".as_bytes()) + .unwrap() + ), + signatures: vec![Signature { + name: "cache.nixos.org-1".to_string(), + data: BASE64.decode("8ijECciSFzWHwwGVOIVYdp2fOIOJAfmzGHPQVwpktfTQJF6kMPPDre7UtFw3o+VqenC5P8RikKOAAfN7CvPEAg==".as_bytes()).unwrap().into(), + }], + reference_names: vec![ + "3n58xw4373jp0ljirf06d8077j15pc4j-glibc-2.37-8".to_string(), + "s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1".to_string() + ], + deriver: Some(crate::proto::StorePath { + digest: Bytes::copy_from_slice(&nixbase32::decode_fixed::<20>("ib3sh3pcz10wsmavxvkdbayhqivbghlq").unwrap()), + name: "hello-2.12.1".to_string(), + }), + ca: None, + } + ) + }, + (&narinfo_parsed).into(), + ); +} + +#[test] +fn from_nixcompat_narinfo_fod() { + let narinfo_parsed = nix_compat::narinfo::NarInfo::parse( + r#"StorePath: /nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz +URL: nar/1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r.nar.xz +Compression: xz +FileHash: sha256:1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r +FileSize: 1033524 +NarHash: sha256:1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh +NarSize: 1033416 +References: +Deriver: dyivpmlaq2km6c11i0s6bi6mbsx0ylqf-hello-2.12.1.tar.gz.drv +Sig: cache.nixos.org-1:ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg== +CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"# + ).expect("must parse"); + + assert_eq!( + PathInfo { + node: None, + references: vec![], + narinfo: Some( + NarInfo { + nar_size: 1033416, + nar_sha256: Bytes::copy_from_slice( + &nixbase32::decode_fixed::<32>( + "1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh" + ) + .unwrap() + ), + signatures: vec![Signature { + name: "cache.nixos.org-1".to_string(), + data: BASE64 + .decode("ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg==".as_bytes()) + .unwrap() + 
.into(),
+                    }],
+                    reference_names: vec![],
+                    deriver: Some(crate::proto::StorePath {
+                        digest: Bytes::copy_from_slice(
+                            &nixbase32::decode_fixed::<20>("dyivpmlaq2km6c11i0s6bi6mbsx0ylqf").unwrap()
+                        ),
+                        name: "hello-2.12.1.tar.gz".to_string(),
+                    }),
+                    ca: Some(crate::proto::nar_info::Ca {
+                        r#type: crate::proto::nar_info::ca::Hash::FlatSha256.into(),
+                        digest: Bytes::copy_from_slice(
+                            &nixbase32::decode_fixed::<32>(
+                                "086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"
+                            )
+                            .unwrap()
+                        )
+                    }),
+                }
+            ),
+        },
+        (&narinfo_parsed).into()
+    );
+}
+
+/// Exercise .as_narinfo() on a PathInfo and ensure important fields are preserved.
+#[test]
+fn as_narinfo() {
+    let narinfo_parsed = nix_compat::narinfo::NarInfo::parse(
+        r#"StorePath: /nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz
+URL: nar/1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r.nar.xz
+Compression: xz
+FileHash: sha256:1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r
+FileSize: 1033524
+NarHash: sha256:1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh
+NarSize: 1033416
+References:
+Deriver: dyivpmlaq2km6c11i0s6bi6mbsx0ylqf-hello-2.12.1.tar.gz.drv
+Sig: cache.nixos.org-1:ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg==
+CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"#
+    ).expect("must parse");
+
+    let path_info: PathInfo = (&narinfo_parsed).into();
+
+    let mut narinfo_returned = path_info
+        .as_narinfo(
+            StorePathRef::from_bytes(b"pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz")
+                .expect("invalid storepath"),
+        )
+        .expect("must be some");
+    narinfo_returned.url = "some.nar";
+
+    assert_eq!(
+        r#"StorePath: /nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz
+URL: some.nar
+Compression: none
+NarHash: sha256:1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh
+NarSize: 1033416
+References:
+Deriver: dyivpmlaq2km6c11i0s6bi6mbsx0ylqf-hello-2.12.1.tar.gz.drv
+Sig: cache.nixos.org-1:ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg==
+CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd
+"#,
+        narinfo_returned.to_string(),
+    );
+}
diff --git a/tvix/store/src/tests/fixtures.rs b/tvix/store/src/tests/fixtures.rs
new file mode 100644
index 000000000000..7c72d71c6d1e
--- /dev/null
+++ b/tvix/store/src/tests/fixtures.rs
@@ -0,0 +1,125 @@
+use lazy_static::lazy_static;
+pub use tvix_castore::fixtures::*;
+use tvix_castore::proto as castorepb;
+
+use crate::proto::{nar_info::ca, nar_info::Ca, NarInfo, PathInfo};
+
+pub const DUMMY_NAME: &str = "00000000000000000000000000000000-dummy";
+
+lazy_static! 
{ + // output hash + pub static ref DUMMY_OUTPUT_HASH: bytes::Bytes = vec![0; 20].into(); + + /// The NAR representation of a symlink pointing to `/nix/store/somewhereelse` + pub static ref NAR_CONTENTS_SYMLINK: Vec<u8> = vec![ + 13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0, + 0, 0, // "nix-archive-1" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b's', b'y', b'm', b'l', b'i', b'n', b'k', 0, // "symlink" + 6, 0, 0, 0, 0, 0, 0, 0, b't', b'a', b'r', b'g', b'e', b't', 0, 0, // target + 24, 0, 0, 0, 0, 0, 0, 0, b'/', b'n', b'i', b'x', b'/', b's', b't', b'o', b'r', b'e', b'/', b's', b'o', + b'm', b'e', b'w', b'h', b'e', b'r', b'e', b'e', b'l', b's', + b'e', // "/nix/store/somewhereelse" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")" + ]; + + /// The NAR representation of a regular file with the contents "Hello World!" + pub static ref NAR_CONTENTS_HELLOWORLD: Vec<u8> = vec![ + 13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0, + 0, 0, // "nix-archive-1" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular" + 8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents" + 12, 0, 0, 0, 0, 0, 0, 0, b'H', b'e', b'l', b'l', b'o', b' ', b'W', b'o', b'r', b'l', b'd', b'!', 0, 0, + 0, 0, // "Hello World!" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")" + ]; + + /// The NAR representation of a more complicated directory structure. + pub static ref NAR_CONTENTS_COMPLICATED: Vec<u8> = vec![ + 13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0, + 0, 0, // "nix-archive-1" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 9, 0, 0, 0, 0, 0, 0, 0, b'd', b'i', b'r', b'e', b'c', b't', b'o', b'r', b'y', 0, 0, 0, 0, 0, 0, 0, // "directory" + 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name" + 5, 0, 0, 0, 0, 0, 0, 0, b'.', b'k', b'e', b'e', b'p', 0, 0, 0, // ".keep" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular" + 8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents" + 0, 0, 0, 0, 0, 0, 0, 0, // "" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name" + 2, 0, 0, 0, 0, 0, 0, 0, b'a', b'a', 0, 0, 0, 0, 0, 0, // "aa" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b's', b'y', b'm', b'l', 
b'i', b'n', b'k', 0, // "symlink"
+        6, 0, 0, 0, 0, 0, 0, 0, b't', b'a', b'r', b'g', b'e', b't', 0, 0, // target
+        24, 0, 0, 0, 0, 0, 0, 0, b'/', b'n', b'i', b'x', b'/', b's', b't', b'o', b'r', b'e', b'/', b's', b'o',
+        b'm', b'e', b'w', b'h', b'e', b'r', b'e', b'e', b'l', b's',
+        b'e', // "/nix/store/somewhereelse"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name"
+        4, 0, 0, 0, 0, 0, 0, 0, b'k', b'e', b'e', b'p', 0, 0, 0, 0, // "keep"
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        9, 0, 0, 0, 0, 0, 0, 0, b'd', b'i', b'r', b'e', b'c', b't', b'o', b'r', b'y', 0, 0, 0, 0, 0, 0, 0, // "directory"
+        5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name"
+        5, 0, 0, 0, 0, 0, 0, 0, b'.', b'k', b'e', b'e', b'p', 0, 0, 0, // ".keep"
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular"
+        8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents"
+        0, 0, 0, 0, 0, 0, 0, 0, // ""
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+    ];
+
+    /// A PathInfo message without .narinfo populated.
+    pub static ref PATH_INFO_WITHOUT_NARINFO: PathInfo = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::Directory(castorepb::DirectoryNode {
+                name: DUMMY_NAME.into(),
+                digest: DUMMY_DIGEST.clone().into(),
+                size: 0,
+            })),
+        }),
+        references: vec![DUMMY_OUTPUT_HASH.clone()],
+        narinfo: None,
+    };
+
+    /// A PathInfo message with .narinfo populated.
+    /// The references in `narinfo.reference_names` align with what's in
+    /// `references`.
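+    /// (The digest part of DUMMY_NAME is the nixbase32 encoding of
+    /// DUMMY_OUTPUT_HASH: 20 zero bytes encode to 32 zero characters.)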
+    pub static ref PATH_INFO_WITH_NARINFO: PathInfo = PathInfo {
+        narinfo: Some(NarInfo {
+            nar_size: 0,
+            nar_sha256: DUMMY_DIGEST.clone().into(),
+            signatures: vec![],
+            reference_names: vec![DUMMY_NAME.to_string()],
+            deriver: None,
+            ca: Some(Ca { r#type: ca::Hash::NarSha256.into(), digest: DUMMY_DIGEST.clone().into() })
+        }),
+        ..PATH_INFO_WITHOUT_NARINFO.clone()
+    };
+}
diff --git a/tvix/store/src/tests/mod.rs b/tvix/store/src/tests/mod.rs
new file mode 100644
index 000000000000..daea048deddf
--- /dev/null
+++ b/tvix/store/src/tests/mod.rs
@@ -0,0 +1,3 @@
+pub mod fixtures;
+mod nar_renderer;
+pub mod utils;
diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs
new file mode 100644
index 000000000000..65059fe9c025
--- /dev/null
+++ b/tvix/store/src/tests/nar_renderer.rs
@@ -0,0 +1,231 @@
+use crate::nar::calculate_size_and_sha256;
+use crate::nar::write_nar;
+use crate::tests::fixtures::*;
+use crate::tests::utils::*;
+use sha2::{Digest, Sha256};
+use std::io;
+use std::sync::Arc;
+use tokio::io::sink;
+use tvix_castore::blobservice::BlobService;
+use tvix_castore::directoryservice::DirectoryService;
+use tvix_castore::proto as castorepb;
+
+#[tokio::test]
+async fn single_symlink() {
+    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
+    let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into();
+    let mut buf: Vec<u8> = vec![];
+
+    write_nar(
+        &mut buf,
+        &castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+            name: "doesntmatter".into(),
+            target: "/nix/store/somewhereelse".into(),
+        }),
+        // don't put anything in the stores, as we don't actually do any requests.
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect("must succeed");
+
+    assert_eq!(buf, NAR_CONTENTS_SYMLINK.to_vec());
+}
+
+/// Make sure the NAR renderer fails if a referred blob doesn't exist.
+#[tokio::test]
+async fn single_file_missing_blob() {
+    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
+    let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into();
+
+    let e = write_nar(
+        sink(),
+        &castorepb::node::Node::File(castorepb::FileNode {
+            name: "doesntmatter".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+            executable: false,
+        }),
+        // the blobservice is empty intentionally, to provoke the error.
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect_err("must fail");
+
+    match e {
+        crate::nar::RenderError::NARWriterError(e) => {
+            assert_eq!(io::ErrorKind::NotFound, e.kind());
+        }
+        _ => panic!("unexpected error: {:?}", e),
+    }
+}
+
+/// Make sure the NAR renderer fails if the returned blob has a size different
+/// from the one specified in the proto node.
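+///
+/// Both directions are exercised below: a declared size that is too big
+/// surfaces as an UnexpectedEof error, one that is too small as InvalidInput.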
+#[tokio::test]
+async fn single_file_wrong_blob_size() {
+    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
+
+    // insert blob into the store
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(
+        &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()),
+        &mut writer,
+    )
+    .await
+    .unwrap();
+    assert_eq!(
+        HELLOWORLD_BLOB_DIGEST.clone(),
+        writer.close().await.unwrap()
+    );
+
+    let bs = blob_service.clone();
+    // Test with a root FileNode whose declared size is too big
+    {
+        let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into();
+        let e = write_nar(
+            sink(),
+            &castorepb::node::Node::File(castorepb::FileNode {
+                name: "doesntmatter".into(),
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: 42, // <- note the wrong size here!
+                executable: false,
+            }),
+            bs,
+            directory_service,
+        )
+        .await
+        .expect_err("must fail");
+
+        match e {
+            crate::nar::RenderError::NARWriterError(e) => {
+                assert_eq!(io::ErrorKind::UnexpectedEof, e.kind());
+            }
+            _ => panic!("unexpected error: {:?}", e),
+        }
+    }
+
+    let bs = blob_service.clone();
+    // Test with a root FileNode whose declared size is too small
+    {
+        let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into();
+        let e = write_nar(
+            sink(),
+            &castorepb::node::Node::File(castorepb::FileNode {
+                name: "doesntmatter".into(),
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: 2, // <- note the wrong size here!
+                executable: false,
+            }),
+            bs,
+            directory_service,
+        )
+        .await
+        .expect_err("must fail");
+
+        match e {
+            crate::nar::RenderError::NARWriterError(e) => {
+                assert_eq!(io::ErrorKind::InvalidInput, e.kind());
+            }
+            _ => panic!("unexpected error: {:?}", e),
+        }
+    }
+}
+
+#[tokio::test]
+async fn single_file() {
+    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
+    let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into();
+
+    // insert blob into the store
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(&mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS), &mut writer)
+        .await
+        .unwrap();
+
+    assert_eq!(
+        HELLOWORLD_BLOB_DIGEST.clone(),
+        writer.close().await.unwrap()
+    );
+
+    let mut buf: Vec<u8> = vec![];
+
+    write_nar(
+        &mut buf,
+        &castorepb::node::Node::File(castorepb::FileNode {
+            name: "doesntmatter".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+            executable: false,
+        }),
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect("must succeed");
+
+    assert_eq!(buf, NAR_CONTENTS_HELLOWORLD.to_vec());
+}
+
+#[tokio::test]
+async fn test_complicated() {
+    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
+    let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into();
+
+    // put all data into the stores.
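+    // (DIRECTORY_COMPLICATED references DIRECTORY_WITH_KEEP, so both need to
+    // be uploaded for the renderer to be able to walk the whole tree.)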
+    // insert blob into the store
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(&mut io::Cursor::new(EMPTY_BLOB_CONTENTS), &mut writer)
+        .await
+        .unwrap();
+    assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().await.unwrap());
+
+    directory_service
+        .put(DIRECTORY_WITH_KEEP.clone())
+        .await
+        .unwrap();
+    directory_service
+        .put(DIRECTORY_COMPLICATED.clone())
+        .await
+        .unwrap();
+
+    let mut buf: Vec<u8> = vec![];
+
+    let bs = blob_service.clone();
+    let ds = directory_service.clone();
+
+    write_nar(
+        &mut buf,
+        &castorepb::node::Node::Directory(castorepb::DirectoryNode {
+            name: "doesntmatter".into(),
+            digest: DIRECTORY_COMPLICATED.digest().into(),
+            size: DIRECTORY_COMPLICATED.size(),
+        }),
+        bs,
+        ds,
+    )
+    .await
+    .expect("must succeed");
+
+    assert_eq!(buf, NAR_CONTENTS_COMPLICATED.to_vec());
+
+    // ensure calculate_size_and_sha256 returns the correct size and sha256 digest.
+    let bs = blob_service.clone();
+    let ds = directory_service.clone();
+    let (nar_size, nar_digest) = calculate_size_and_sha256(
+        &castorepb::node::Node::Directory(castorepb::DirectoryNode {
+            name: "doesntmatter".into(),
+            digest: DIRECTORY_COMPLICATED.digest().into(),
+            size: DIRECTORY_COMPLICATED.size(),
+        }),
+        bs,
+        ds,
+    )
+    .await
+    .expect("must succeed");
+
+    assert_eq!(NAR_CONTENTS_COMPLICATED.len() as u64, nar_size);
+    let d = Sha256::digest(NAR_CONTENTS_COMPLICATED.clone());
+    assert_eq!(d.as_slice(), nar_digest);
+}
diff --git a/tvix/store/src/tests/utils.rs b/tvix/store/src/tests/utils.rs
new file mode 100644
index 000000000000..040b7ee7f51f
--- /dev/null
+++ b/tvix/store/src/tests/utils.rs
@@ -0,0 +1,16 @@
+use crate::pathinfoservice::{MemoryPathInfoService, PathInfoService};
+use std::sync::Arc;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
+
+pub use tvix_castore::utils::*;
+
+pub fn gen_pathinfo_service<BS, DS>(
+    blob_service: BS,
+    directory_service: DS,
+) -> Arc<dyn PathInfoService>
+where
+    BS: AsRef<dyn BlobService> + Send + Sync + 'static,
+    DS: AsRef<dyn DirectoryService> + Send + Sync + 'static,
+{
+    Arc::new(MemoryPathInfoService::new(blob_service, directory_service))
+}
diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs
new file mode 100644
index 000000000000..041a9e683d59
--- /dev/null
+++ b/tvix/store/src/utils.rs
@@ -0,0 +1,35 @@
+use std::sync::Arc;
+
+use tvix_castore::{
+    blobservice::{self, BlobService},
+    directoryservice::{self, DirectoryService},
+};
+
+use crate::pathinfoservice::{self, PathInfoService};
+
+/// Construct the three store handles from their addresses.
+pub async fn construct_services(
+    blob_service_addr: impl AsRef<str>,
+    directory_service_addr: impl AsRef<str>,
+    path_info_service_addr: impl AsRef<str>,
+) -> std::io::Result<(
+    Arc<dyn BlobService>,
+    Arc<dyn DirectoryService>,
+    Box<dyn PathInfoService>,
+)> {
+    let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref())
+        .await?
+        .into();
+    let directory_service: Arc<dyn DirectoryService> =
+        directoryservice::from_addr(directory_service_addr.as_ref())
+            .await?
+            .into();
+    let path_info_service = pathinfoservice::from_addr(
+        path_info_service_addr.as_ref(),
+        blob_service.clone(),
+        directory_service.clone(),
+    )
+    .await?;
+
+    Ok((blob_service, directory_service, path_info_service))
+}
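For illustration, here is a minimal sketch (not part of the diff) of how a
consumer might call `construct_services`; the `memory://` addresses mirror the
import check above, and the `tvix_store::utils` module path is an assumption:

```rust
// Hypothetical consumer of construct_services; the addresses and the
// module path are assumptions, not part of this change.
use tvix_store::utils::construct_services;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Wire up all three services against in-memory backends.
    let (blob_service, directory_service, path_info_service) =
        construct_services("memory://", "memory://", "memory://").await?;

    // The handles can now be handed to e.g. the gRPC server wrappers.
    let _ = (blob_service, directory_service, path_info_service);
    Ok(())
}
```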