Diffstat (limited to 'tvix/store')
35 files changed, 5302 insertions, 0 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml new file mode 100644 index 000000000000..28dad300fe20 --- /dev/null +++ b/tvix/store/Cargo.toml @@ -0,0 +1,86 @@ +[package] +name = "tvix-store" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1.0.68" +async-compression = { version = "0.4.9", features = ["tokio", "bzip2", "gzip", "xz", "zstd"]} +async-stream = "0.3.5" +blake3 = { version = "1.3.1", features = ["rayon", "std"] } +bstr = "1.6.0" +bytes = "1.4.0" +clap = { version = "4.0", features = ["derive", "env"] } +count-write = "0.1.0" +data-encoding = "2.3.3" +futures = "0.3.30" +lazy_static = "1.4.0" +nix-compat = { path = "../nix-compat", features = ["async"] } +pin-project-lite = "0.2.13" +prost = "0.12.1" +opentelemetry = { version = "0.22.0", optional = true} +opentelemetry-otlp = { version = "0.15.0", optional = true } +opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"], optional = true} +serde = { version = "1.0.197", features = [ "derive" ] } +serde_json = "1.0" +serde_with = "3.7.0" +serde_qs = "0.12.0" +sha2 = "0.10.6" +sled = { version = "0.34.7" } +thiserror = "1.0.38" +tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } +tokio-listener = { version = "0.4.1", features = [ "tonic011" ] } +tokio-stream = { version = "0.1.14", features = ["fs"] } +tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] } +tonic = { version = "0.11.0", features = ["tls", "tls-roots"] } +tower = "0.4.13" +tracing = "0.1.37" +tracing-opentelemetry = "0.23.0" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +tvix-castore = { path = "../castore" } +url = "2.4.0" +walkdir = "2.4.0" +reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots", "stream"], default-features = false } +lru = "0.12.3" +parking_lot = "0.12.2" +indicatif = "0.17.8" +tracing-indicatif = "0.3.6" + +[dependencies.tonic-reflection] +optional = true +version = "0.11.0" + +[dependencies.bigtable_rs] +optional = true +# https://github.com/liufuyang/bigtable_rs/pull/72 +git = "https://github.com/flokli/bigtable_rs" +rev = "0af404741dfc40eb9fa99cf4d4140a09c5c20df7" + +[build-dependencies] +prost-build = "0.12.1" +tonic-build = "0.11.0" + +[dev-dependencies] +async-process = "2.1.0" +rstest = "0.19.0" +rstest_reuse = "0.6.0" +tempfile = "3.3.0" +tokio-retry = "0.3.0" + +[features] +default = ["cloud", "fuse", "otlp", "tonic-reflection"] +cloud = [ + "dep:bigtable_rs", + "tvix-castore/cloud" +] +fuse = ["tvix-castore/fuse"] +otlp = ["dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk"] +tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"] +virtiofs = ["tvix-castore/virtiofs"] +# Whether to run the integration tests. +# Requires the following packages in $PATH: +# cbtemulator, google-cloud-bigtable-tool +integration = [] + +[lints] +workspace = true diff --git a/tvix/store/README.md b/tvix/store/README.md new file mode 100644 index 000000000000..a9d29671d8bb --- /dev/null +++ b/tvix/store/README.md @@ -0,0 +1,63 @@ +# //tvix/store + +This contains the code hosting the tvix-store. + +For the local store, Nix realizes files on the filesystem in `/nix/store` (and +maintains some metadata in a SQLite database). For "remote stores", it +communicates this metadata in NAR (Nix ARchive) and NARInfo format. 
+
+Compared to the Nix model, `tvix-store` stores data at a much more granular
+level, which provides more deduplication possibilities and more granular
+copying.
+
+However, enough information is preserved to still be able to render NAR and
+NARInfo when needed.
+
+## More Information
+The store consists of two different gRPC services, `tvix.castore.v1` for
+the low-level content-addressed bits, and `tvix.store.v1` for the Nix and
+`StorePath`-specific bits.
+
+Check the `protos/` subfolder both here and in `castore` for the definition of
+the exact RPC methods and messages.
+
+## Interacting with the gRPC service manually
+The shell environment in `//tvix` provides `evans`, which is an interactive
+REPL-based gRPC client.
+
+You can use it to connect to a `tvix-store` and call the various RPC methods.
+
+```shell
+$ cargo run -- daemon &
+$ evans --host localhost --port 8000 -r repl
+  ______
+ |  ____|
+ | |__    __   __   __ _   _ __    ___
+ |  __|   \ \ / /  / _. | | '_ \  / __|
+ | |____   \ V /  | (_| | | | | | \__ \
+ |______|   \_/    \__,_| |_| |_| |___/
+
+ more expressive universal gRPC client
+
+
+localhost:8000> package tvix.castore.v1
+tvix.castore.v1@localhost:8000> service BlobService
+
+tvix.castore.v1.BlobService@localhost:8000> call Put --bytes-from-file
+data (TYPE_BYTES) => /run/current-system/system
+{
+  "digest": "KOM3/IHEx7YfInAnlJpAElYezq0Sxn9fRz7xuClwNfA="
+}
+
+tvix.castore.v1.BlobService@localhost:8000> call Read --bytes-as-base64
+digest (TYPE_BYTES) => KOM3/IHEx7YfInAnlJpAElYezq0Sxn9fRz7xuClwNfA=
+{
+  "data": "eDg2XzY0LWxpbnV4"
+}
+
+$ echo eDg2XzY0LWxpbnV4 | base64 -d
+x86_64-linux
+```
+
+Thanks to `tvix-store` providing gRPC Server Reflection (with the
+`tonic-reflection` feature), you don't need to point `evans` to the `.proto`
+files.
diff --git a/tvix/store/build.rs b/tvix/store/build.rs
new file mode 100644
index 000000000000..809fa29578b5
--- /dev/null
+++ b/tvix/store/build.rs
@@ -0,0 +1,38 @@
+use std::io::Result;
+
+fn main() -> Result<()> {
+    #[allow(unused_mut)]
+    let mut builder = tonic_build::configure();
+
+    #[cfg(feature = "tonic-reflection")]
+    {
+        let out_dir = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap());
+        let descriptor_path = out_dir.join("tvix.store.v1.bin");
+
+        builder = builder.file_descriptor_set_path(descriptor_path);
+    };
+
+    // https://github.com/hyperium/tonic/issues/908
+    let mut config = prost_build::Config::new();
+    config.bytes(["."]);
+    config.extern_path(".tvix.castore.v1", "::tvix_castore::proto");
+
+    builder
+        .build_server(true)
+        .build_client(true)
+        .emit_rerun_if_changed(false)
+        .compile_with_config(
+            config,
+            &[
+                "tvix/store/protos/pathinfo.proto",
+                "tvix/store/protos/rpc_pathinfo.proto",
+            ],
+            // If we are running `cargo build` manually, using `../..` works fine,
+            // but in case we run inside a nix build, we need to instead point PROTO_ROOT
+            // to a sparseTree containing that structure.
+            &[match std::env::var_os("PROTO_ROOT") {
+                Some(proto_root) => proto_root.to_str().unwrap().to_owned(),
+                None => "../..".to_string(),
+            }],
+        )
+}
diff --git a/tvix/store/default.nix b/tvix/store/default.nix
new file mode 100644
index 000000000000..78b499114cae
--- /dev/null
+++ b/tvix/store/default.nix
@@ -0,0 +1,56 @@
+{ depot, pkgs, lib, ... }:
+
+let
+  mkImportCheck = p: expectedPath: {
+    label = ":nix :import ${p} with tvix-store import";
+    needsOutput = true;
+    command = pkgs.writeShellScript "tvix-import-check" ''
+      export BLOB_SERVICE_ADDR=memory://
+      export DIRECTORY_SERVICE_ADDR=memory://
+      export PATH_INFO_SERVICE_ADDR=memory://
+      TVIX_STORE_OUTPUT=$(result/bin/tvix-store import ${p})
+      EXPECTED='${/* the verbatim expected Tvix output: */expectedPath}'
+
+      echo "tvix-store output: ''${TVIX_STORE_OUTPUT}"
+      if [ "$TVIX_STORE_OUTPUT" != "$EXPECTED" ]; then
+        echo "Correct would have been ''${EXPECTED}"
+        exit 1
+      fi
+
+      echo "Output was correct."
+    '';
+  };
+in
+
+(depot.tvix.crates.workspaceMembers.tvix-store.build.override (old: {
+  runTests = true;
+  testPreRun = ''
+    export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt
+  '';
+  features = old.features
+    # virtiofs feature currently fails to build on Darwin
+    ++ lib.optional pkgs.stdenv.isLinux "virtiofs";
+})).overrideAttrs (old: rec {
+  meta.ci = {
+    targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru);
+    extraSteps.import-docs = (mkImportCheck "tvix/store/docs" ./docs);
+  };
+  passthru = (depot.tvix.utils.mkFeaturePowerset {
+    inherit (old) crateName;
+    features = ([ "cloud" "fuse" "otlp" "tonic-reflection" ]
+      # virtiofs feature currently fails to build on Darwin
+      ++ lib.optional pkgs.stdenv.isLinux "virtiofs");
+    override.testPreRun = ''
+      export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt
+    '';
+  }) // {
+    integration-tests = depot.tvix.crates.workspaceMembers.${old.crateName}.build.override (old: {
+      runTests = true;
+      testPreRun = ''
+        export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt;
+        export PATH="$PATH:${pkgs.lib.makeBinPath [ pkgs.cbtemulator pkgs.google-cloud-bigtable-tool ]}"
+      '';
+      features = old.features ++ [ "integration" ];
+    });
+  };
+})
diff --git a/tvix/store/docs/api.md b/tvix/store/docs/api.md
new file mode 100644
index 000000000000..c5a5c477aa17
--- /dev/null
+++ b/tvix/store/docs/api.md
@@ -0,0 +1,288 @@
+tvix-[ca]store API
+==============
+
+This document outlines the design of the API exposed by tvix-castore and
+tvix-store, as well as other implementations of this store protocol.
+
+This document is meant to be read side-by-side with
+[castore.md](../../castore/docs/data-model.md) which describes the data model
+in more detail.
+
+The store API has four main consumers:
+
+1. The evaluator (or more correctly, the CLI/coordinator, in the Tvix
+   case) communicates with the store to:
+
+   * Upload files and directories (e.g. from `builtins.path`, or `src = ./path`
+     Nix expressions).
+   * Read files from the store where necessary (e.g. when `nixpkgs` is
+     located in the store, or for IFD).
+
+2. The builder communicates with the store to:
+
+   * Upload files and directories after a build, to persist build artifacts in
+     the store.
+
+3. Tvix clients (such as users that have Tvix installed, or, depending
+   on perspective, builder environments) expect the store to
+   "materialise" on disk to provide a directory layout with store
+   paths.
+
+4. Stores may communicate with other stores, to substitute already built store
+   paths, i.e. a store acts as a binary cache for other stores.
+
+The store API attempts to reuse parts of its API between these four
+consumers by making similarities explicit in the protocol.
+This leads to a protocol that is slightly more complex than a simple "file
+upload/download" system, but with significantly greater efficiency, both in
+terms of deduplication opportunities as well as granularity.
+
+## The Store model
+
+Contents inside a tvix-store can be grouped into three different message types:
+
+ * Blobs
+ * Directories
+ * PathInfo (see further down)
+
+(check `castore.md` for more detailed field descriptions)
+
+### Blobs
+A blob object contains the literal file contents of regular (or executable)
+files.
+
+### Directory
+A directory object describes the direct children of a directory.
+
+It contains:
+ - name of child (regular or executable) files, and their [blake3][blake3] hash.
+ - name of child symlinks, and their target (as string)
+ - name of child directories, and their [blake3][blake3] hash (forming a Merkle DAG)
+
+### Content-addressed Store Model
+For example, let's consider a directory layout like this, with some
+imaginary hashes of file contents:
+
+```
+.
+├── file-1.txt    hash: 5891b5b522d5df086d0ff0b110fb
+└── nested
+    └── file-2.txt    hash: abc6fd595fc079d3114d4b71a4d8
+```
+
+A hash for the *directory* `nested` can be created by creating the `Directory`
+object:
+
+```json
+{
+  "directories": [],
+  "files": [{
+    "name": "file-2.txt",
+    "digest": "abc6fd595fc079d3114d4b71a4d8",
+    "size": 123
+  }],
+  "symlinks": []
+}
+```
+
+And then hashing a serialised form of that data structure. We use the blake3
+hash of the canonical protobuf representation. Let's assume the hash was
+`ff0029485729bcde993720749232`.
+
+To create the directory object one layer up, we now refer to our `nested`
+directory object in `directories`, and to `file-1.txt` in `files`:
+
+```json
+{
+  "directories": [{
+    "name": "nested",
+    "digest": "ff0029485729bcde993720749232",
+    "size": 1
+  }],
+  "files": [{
+    "name": "file-1.txt",
+    "digest": "5891b5b522d5df086d0ff0b110fb",
+    "size": 124
+  }]
+}
+```
+
+This Merkle DAG of Directory objects and flat store of blobs can be used to
+describe any file/directory/symlink inside a store path. Due to its
+content-addressed nature, it'll automatically deduplicate (re-)used
+(sub)directories, and allow substitution from any (untrusted) source.
+
+The only thing now missing is the metadata to map/"mount" from the
+content-addressed world to a physical path.
+
+### PathInfo
+As most paths in the Nix store currently are input-addressed [^input-addressed],
+and the `tvix-castore` data model is also not intrinsically using NAR hashes,
+we need something mapping from an input-addressed "output path hash" (or a
+Nix-specific content-addressed path) to the contents in the `tvix-castore`
+world.
+
+That's what `PathInfo` provides. It embeds the root node (Directory, File or
+Symlink) at a given store path.
+
+The root node's `name` field is populated with the (base)name inside
+`/nix/store`, so `xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-pname-1.2.3`.
+
+The `PathInfo` message also stores references to other store paths, and some
+more NARInfo-specific metadata (signatures, narhash, narsize).
+
+
+## API overview
+
+There are three different services:
+
+### BlobService
+`BlobService` can be used to store and retrieve blobs of data, used to host
+regular file contents.
+
+It is content-addressed, using [blake3][blake3]
+as a hashing function.
+
+As blake3 is a tree hash, there's an opportunity to do
+[verified streaming][bao] of parts of the file,
+which doesn't need to trust any more information than the root hash itself.
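+
+As a small illustration (not part of the protocol definition), computing the
+digest a `BlobService` keys a blob under takes nothing more than the `blake3`
+crate this repository already depends on. The snippet below is a sketch; the
+base64 encoding is chosen purely to match the `evans` example in the README:
+
+```rust
+// Sketch: blobs are addressed by the BLAKE3 digest of their raw contents.
+// blake3's tree mode is what later enables verified streaming of blob ranges.
+fn blob_digest(data: &[u8]) -> [u8; 32] {
+    *blake3::hash(data).as_bytes()
+}
+
+fn main() {
+    // Identical contents always map to the identical digest,
+    // no matter where they come from.
+    let digest = blob_digest(b"x86_64-linux");
+    println!("{}", data_encoding::BASE64.encode(&digest));
+}
+```
+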
+Future extensions of the `BlobService` protocol will enable this verified
+streaming.
+
+### DirectoryService
+`DirectoryService` allows lookups (and uploads) of `Directory` messages, and
+whole reference graphs of them.
+
+
+### PathInfoService
+The PathInfo service provides lookups from a store path hash to a `PathInfo`
+message.
+
+## Example flows
+
+Below are some common use cases of tvix-store, and how the different
+services are used.
+
+### Upload files and directories
+This is needed for `builtins.path` or `src = ./path` in Nix expressions (A), as
+well as for uploading build artifacts to a store (B).
+
+The path specified needs to be (recursively, BFS-style) traversed.
+ * All file contents need to be hashed with blake3, and submitted to the
+   *BlobService* if not already present.
+   A reference to them needs to be added to the parent Directory object that's
+   constructed.
+ * All symlinks need to be added to the parent directory they reside in.
+ * Whenever a Directory has been fully traversed, it needs to be uploaded to
+   the *DirectoryService* and a reference to it needs to be added to the parent
+   Directory object.
+
+Most of the hashing / directory traversal / uploading can happen in parallel,
+as long as Directory objects only refer to Directory objects and Blobs that
+have already been uploaded.
+
+When reaching the root, a `PathInfo` object needs to be constructed.
+
+ * In the case of content-addressed paths (A), the name of the root node is
+   based on the NAR representation of the contents.
+   It might make sense to be able to offload the NAR calculation to the store,
+   which can cache it.
+ * In the case of build artifacts (B), the output path is input-addressed and
+   known upfront.
+
+Contrary to Nix, this has the advantage of not having to upload a lot of things
+to the store that didn't change.
+
+### Reading files from the store from the evaluator
+This is the case when `nixpkgs` is located in the store, or IFD in general.
+
+The store client asks the `PathInfoService` for the `PathInfo` of the output
+path in the request, and looks at the root node.
+
+If something other than the root of the store path is requested, like for
+example `maintainers/maintainer-list.nix`, the root_node Directory is inspected
+and potentially a chain of `Directory` objects requested from
+*DirectoryService*[^n+1query].
+
+When the desired file is reached, the *BlobService* can be used to read the
+contents of this file, and return them to the evaluator.
+
+FUTUREWORK: define how importing from symlinks should/does work.
+
+Contrary to Nix, this has the advantage of not having to copy all of the
+contents of a store path to the evaluating machine, but really only fetching
+the files the evaluator currently cares about.
+
+### Materializing store paths on disk
+This is useful for people running a Tvix-only system, or running builds on a
+"Tvix remote builder" in its own mount namespace.
+
+In a system with Nix installed, we can't simply manually "extract" things to
+`/nix/store`, as Nix assumes it owns all writes to this location.
+In these use cases, we're probably better off exposing a tvix-store as a local
+binary cache (that's what `//tvix/nar-bridge-go` does).
+
+Assuming we are in an environment where we control `/nix/store` exclusively, a
+"realize to disk" would either "extract" things from the `tvix-store` to a
+filesystem, or expose a `FUSE`/`virtio-fs` filesystem.
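+
+To make these interactions concrete, here is a rough, hypothetical sketch of
+the "extract everything" variant, written against the Rust service traits used
+elsewhere in this diff (`get`, `get_recursive`). The actual filesystem writes
+and blob fetches are elided, and the service handles are assumed to already be
+constructed:
+
+```rust
+use futures::TryStreamExt;
+use tvix_castore::directoryservice::DirectoryService;
+use tvix_store::pathinfoservice::PathInfoService;
+
+// Fetch the PathInfo for a store path hash, then the whole Directory
+// closure below its root node in a single recursive lookup.
+async fn fetch_closure(
+    path_info_service: &dyn PathInfoService,
+    directory_service: &dyn DirectoryService,
+    output_hash: [u8; 20],
+) -> anyhow::Result<()> {
+    // Ask the PathInfoService for the PathInfo of the requested store path.
+    let path_info = path_info_service
+        .get(output_hash)
+        .await?
+        .ok_or_else(|| anyhow::anyhow!("store path not found"))?;
+
+    // If the root node is a directory, ask the DirectoryService for all
+    // Directory objects in its closure.
+    if let Some(tvix_castore::proto::node::Node::Directory(dir)) =
+        path_info.node.and_then(|n| n.node)
+    {
+        let digest = dir.digest.try_into()?;
+        let directories: Vec<_> = directory_service
+            .get_recursive(&digest)
+            .try_collect()
+            .await?;
+        // ... create the file structure, fetch blob contents via the
+        // BlobService, and create symlinks, as described in the list below.
+        let _ = directories;
+    }
+    Ok(())
+}
+```
+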
+
+The FUSE/virtio-fs filesystem approach is already implemented, and is
+particularly interesting for (remote) build workloads, as build inputs can be
+realized on-demand, which saves copying around a lot of never-accessed files.
+
+In both cases, the API interactions are similar.
+ * The *PathInfoService* is asked for the `PathInfo` of the requested store path.
+ * If everything should be "extracted", the *DirectoryService* is asked for all
+   `Directory` objects in the closure, the file structure is created, all Blobs
+   are downloaded and placed in their corresponding location and all symlinks
+   are created accordingly.
+ * If this is a FUSE filesystem, we can decide to only request a subset,
+   similar to the "Reading files from the store from the evaluator" use case,
+   even though it might make sense to keep all Directory objects around.
+   (See the caveat in "Trust model" though!)
+
+### Stores communicating with other stores
+The gRPC API exposed by the tvix-store allows composing multiple stores, and
+implementing caching strategies that store clients don't need to be aware of.
+
+ * For example, a caching strategy could have a fast local tvix-store, that's
+   asked first and filled with data from a slower remote tvix-store.
+
+ * Multiple stores could be asked for the same data, and whatever store returns
+   the right data first wins.
+
+
+## Trust model / Distribution
+As already described above, the only non-content-addressed service is the
+`PathInfo` service.
+
+This means all other messages (such as `Blob` and `Directory` messages) can be
+substituted from many different, untrusted sources/mirrors, which makes
+plugging in additional substitution strategies, like IPFS or local network
+neighbors, very simple. That's also why it lives in the `tvix-castore` crate.
+
+As for `PathInfo`, we don't specify an additional signature mechanism yet, but
+carry the NAR-based signatures from Nix along.
+
+This means that if we don't trust a remote `PathInfo` object, we currently need
+to "stream" the NAR representation to validate these signatures.
+
+However, the slow part is downloading NAR files, and considering we have
+more granularity available, we might only need to download some small blobs,
+rather than a whole NAR file.
+
+A future signature mechanism that only signs (parts of) the `PathInfo`
+message, which in turn points only to content-addressed data, will enable
+verified partial access into a store path, opening up opportunities for lazy
+filesystem access etc.
+
+
+
+[blake3]: https://github.com/BLAKE3-team/BLAKE3
+[bao]: https://github.com/oconnor663/bao
+[^input-addressed]: Nix hashes the A-Term representation of a .drv, after doing
+                    some replacements on referred Input Derivations to calculate
+                    output paths.
+[^n+1query]: This would expose an N+1 query problem.
However it's not a problem + in practice, as there's usually always a "local" caching store in + the loop, and *DirectoryService* supports a recursive lookup for + all `Directory` children of a `Directory` diff --git a/tvix/store/protos/LICENSE b/tvix/store/protos/LICENSE new file mode 100644 index 000000000000..2034ada6fd9a --- /dev/null +++ b/tvix/store/protos/LICENSE @@ -0,0 +1,21 @@ +Copyright © The Tvix Authors + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +“Software”), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + diff --git a/tvix/store/protos/default.nix b/tvix/store/protos/default.nix new file mode 100644 index 000000000000..56345d9338b2 --- /dev/null +++ b/tvix/store/protos/default.nix @@ -0,0 +1,55 @@ +{ depot, pkgs, ... }: +let + protos = depot.nix.sparseTree { + name = "store-protos"; + root = depot.path.origSrc; + paths = [ + # We need to include castore.proto (only), as it's referred. + ../../castore/protos/castore.proto + ./pathinfo.proto + ./rpc_pathinfo.proto + ../../../buf.yaml + ../../../buf.gen.yaml + ]; + }; +in +depot.nix.readTree.drvTargets { + inherit protos; + + # Lints and ensures formatting of the proto files. + check = pkgs.stdenv.mkDerivation { + name = "proto-check"; + src = protos; + + nativeBuildInputs = [ + pkgs.buf + ]; + + buildPhase = '' + export HOME=$TMPDIR + buf lint + buf format -d --exit-code + touch $out + ''; + }; + + # Produces the golang bindings. + go-bindings = pkgs.stdenv.mkDerivation { + name = "go-bindings"; + src = protos; + + nativeBuildInputs = [ + pkgs.buf + pkgs.protoc-gen-go + pkgs.protoc-gen-go-grpc + ]; + + buildPhase = '' + export HOME=$TMPDIR + buf generate + + mkdir -p $out + cp tvix/store/protos/*.pb.go $out/ + ''; + }; +} diff --git a/tvix/store/protos/pathinfo.proto b/tvix/store/protos/pathinfo.proto new file mode 100644 index 000000000000..b03e7e938e33 --- /dev/null +++ b/tvix/store/protos/pathinfo.proto @@ -0,0 +1,128 @@ +// SPDX-License-Identifier: MIT +// Copyright © 2022 The Tvix Authors +syntax = "proto3"; + +package tvix.store.v1; + +import "tvix/castore/protos/castore.proto"; + +option go_package = "code.tvl.fyi/tvix/store-go;storev1"; + +// PathInfo shows information about a Nix Store Path. +// That's a single element inside /nix/store. +message PathInfo { + // The path can be a directory, file or symlink. + tvix.castore.v1.Node node = 1; + + // List of references (output path hashes) + // This really is the raw *bytes*, after decoding nixbase32, and not a + // base32-encoded string. + repeated bytes references = 2; + + // see below. 
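+  // (Nix-specific metadata in NARInfo format: NAR hash and size, signatures,
+  // references by name, and optional CA information.)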
+  NARInfo narinfo = 3;
+}
+
+// Represents a path in the Nix store (a direct child of STORE_DIR).
+// It is commonly formatted by nixbase32-encoding the digest, and
+// concatenating the name, separated by a `-`.
+message StorePath {
+  // The string after digest and `-`.
+  string name = 1;
+
+  // The digest (20 bytes).
+  bytes digest = 2;
+}
+
+// Nix C++ uses NAR (Nix Archive) as a format to transfer store paths,
+// and stores metadata and signatures in NARInfo files.
+// Store all these attributes in a separate message.
+//
+// This is useful to render .narinfo files to clients, or to preserve/validate
+// these signatures.
+// As verifying these signatures requires the whole NAR file to be synthesized,
+// moving to another signature scheme is desired.
+// Even then, it still makes sense to hold this data, for old clients.
+message NARInfo {
+  // This represents a (parsed) signature line in a .narinfo file.
+  message Signature {
+    string name = 1;
+    bytes data = 2;
+  }
+
+  // The size of the NAR file, in bytes.
+  uint64 nar_size = 1;
+
+  // The sha256 of the NAR file representation.
+  bytes nar_sha256 = 2;
+
+  // The signatures in a .narinfo file.
+  repeated Signature signatures = 3;
+
+  // A list of references. To validate .narinfo signatures, a fingerprint needs
+  // to be constructed.
+  // This fingerprint doesn't just contain the hashes of the output paths of all
+  // references (like PathInfo.references), but their whole (base)names, so we
+  // need to keep them somewhere.
+  repeated string reference_names = 4;
+
+  // The StorePath of the .drv file producing this output.
+  // The .drv suffix is omitted in its `name` field.
+  StorePath deriver = 5;
+
+  // The CA field in the .narinfo.
+  // Its textual representations seen in the wild are one of the following:
+  //  - `fixed:r:sha256:1gcky5hlf5vqfzpyhihydmm54grhc94mcs8w7xr8613qsqb1v2j6`
+  //    fixed-output derivations using "recursive" `outputHashMode`.
+  //  - `fixed:sha256:19xqkh72crbcba7flwxyi3n293vav6d7qkzkh2v4zfyi4iia8vj8`
+  //    fixed-output derivations using "flat" `outputHashMode`.
+  //  - `text:sha256:19xqkh72crbcba7flwxyi3n293vav6d7qkzkh2v4zfyi4iia8vj8`
+  //    Text hashing, used for uploaded .drv files and outputs produced by
+  //    builtins.toFile.
+  //
+  // Semantically, they can be split into the following components:
+  //  - "content address prefix". Currently, "fixed" and "text" are supported.
+  //  - "hash mode". Currently, "flat" and "recursive" are supported.
+  //  - "hash type". The underlying hash function used.
+  //    Currently, sha1, md5, sha256, sha512.
+  //  - "digest". The digest itself.
+  //
+  // There are some restrictions on the possible combinations.
+  // For example, `text` and `fixed:recursive` always imply sha256.
+  //
+  // We use an enum to encode the possible combinations, and optimize for the
+  // common case, `fixed:recursive`, identified as `NAR_SHA256`.
+  CA ca = 6;
+
+  message CA {
+    enum Hash {
+      // Produced when uploading fixed-output store paths using NAR-based
+      // hashing (`outputHashMode = "recursive"`).
+      NAR_SHA256 = 0;
+      NAR_SHA1 = 1;
+      NAR_SHA512 = 2;
+      NAR_MD5 = 3;
+
+      // Produced when uploading .drv files or outputs produced by
+      // builtins.toFile.
+      // Produces equivalent digests as FLAT_SHA256, but is a separate
+      // hashing type in Nix, affecting output path calculation.
+      TEXT_SHA256 = 4;
+
+      // Produced when using fixed-output derivations with
+      // `outputHashMode = "flat"`.
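+      // ("flat" hashing digests the raw file contents directly, without the
+      // NAR wrapping used by the "recursive" mode above.)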
+      FLAT_SHA1 = 5;
+      FLAT_MD5 = 6;
+      FLAT_SHA256 = 7;
+      FLAT_SHA512 = 8;
+
+      // TODO: what happens in Rust if we introduce a new enum kind here?
+    }
+
+    // The hashing type used.
+    Hash type = 1;
+
+    // The digest, in raw bytes.
+    bytes digest = 2;
+  }
+}
diff --git a/tvix/store/protos/rpc_pathinfo.proto b/tvix/store/protos/rpc_pathinfo.proto
new file mode 100644
index 000000000000..c1c91658ada2
--- /dev/null
+++ b/tvix/store/protos/rpc_pathinfo.proto
@@ -0,0 +1,76 @@
+// SPDX-License-Identifier: MIT
+// Copyright © 2022 The Tvix Authors
+syntax = "proto3";
+
+package tvix.store.v1;
+
+import "tvix/castore/protos/castore.proto";
+import "tvix/store/protos/pathinfo.proto";
+
+option go_package = "code.tvl.fyi/tvix/store-go;storev1";
+
+service PathInfoService {
+  // Return a PathInfo message matching the criteria specified in the
+  // GetPathInfoRequest message.
+  rpc Get(GetPathInfoRequest) returns (PathInfo);
+
+  // Upload a PathInfo object to the remote end. It MUST NOT return until the
+  // PathInfo object has been written on the remote end.
+  //
+  // The remote end MAY check if a potential DirectoryNode has already been
+  // uploaded.
+  //
+  // Uploading clients SHOULD NOT direct other machines to try to substitute
+  // from the remote end before they have finished uploading the PathInfo,
+  // Directories and Blobs.
+  // The returned PathInfo object MAY contain additional narinfo signatures, but
+  // is otherwise left untouched.
+  rpc Put(PathInfo) returns (PathInfo);
+
+  // Calculate the NAR representation of the contents specified by the
+  // root_node. The calculation SHOULD be cached server-side for subsequent
+  // requests.
+  //
+  // All references (to blobs or Directory messages) MUST already exist in the
+  // store.
+  //
+  // The method can be used to produce a Nix fixed-output path, which contains
+  // the (compressed) sha256 of the NAR content representation in the root_node
+  // name (suffixed with the name).
+  //
+  // It can also be used to calculate arbitrary NAR hashes of output paths, in
+  // case a legacy Nix Binary Cache frontend is provided.
+  rpc CalculateNAR(tvix.castore.v1.Node) returns (CalculateNARResponse);
+
+  // Return a stream of PathInfo messages matching the criteria specified in
+  // ListPathInfoRequest.
+  rpc List(ListPathInfoRequest) returns (stream PathInfo);
+}
+
+// The parameters that can be used to lookup a (single) PathInfo object.
+// Currently, only a lookup by output hash is supported.
+message GetPathInfoRequest {
+  oneof by_what {
+    // The output hash of a nix path (20 bytes).
+    // This is the nixbase32-decoded portion of a Nix output path, so to substitute
+    // /nix/store/xm35nga2g20mz5sm5l6n8v3bdm86yj83-cowsay-3.04
+    // this field would contain nixbase32dec("xm35nga2g20mz5sm5l6n8v3bdm86yj83").
+    bytes by_output_hash = 1;
+  }
+}
+
+// The parameters that can be used to lookup (multiple) PathInfo objects.
+// Currently no filtering is possible, all objects are returned.
+message ListPathInfoRequest {}
+
+// CalculateNARResponse is the response returned by the CalculateNAR request.
+//
+// It contains the size of the NAR representation (in bytes), and the sha256
+// digest.
+message CalculateNARResponse {
+  // The size of the NAR file, in bytes.
+  uint64 nar_size = 1;
+
+  // The sha256 of the NAR file representation.
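+  // (This is the raw 32-byte digest, not hex- or nixbase32-encoded.)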
+ bytes nar_sha256 = 2; +} diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs new file mode 100644 index 000000000000..2a2d6fe6f7d0 --- /dev/null +++ b/tvix/store/src/bin/tvix-store.rs @@ -0,0 +1,617 @@ +use clap::Parser; +use clap::Subcommand; + +use futures::future::try_join_all; +use futures::StreamExt; +use futures::TryStreamExt; +use indicatif::ProgressStyle; +use nix_compat::path_info::ExportedPathInfo; +use serde::Deserialize; +use serde::Serialize; +use std::path::PathBuf; +use std::sync::Arc; +use tokio_listener::Listener; +use tokio_listener::SystemOptions; +use tokio_listener::UserOptions; +use tonic::transport::Server; +use tracing::info; +use tracing::info_span; +use tracing::instrument; +use tracing::Level; +use tracing::Span; +use tracing_indicatif::filter::IndicatifFilter; +use tracing_indicatif::{span_ext::IndicatifSpanExt, IndicatifLayer}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; +use tvix_castore::import::fs::ingest_path; +use tvix_store::nar::NarCalculationService; +use tvix_store::proto::NarInfo; +use tvix_store::proto::PathInfo; + +use tvix_castore::proto::blob_service_server::BlobServiceServer; +use tvix_castore::proto::directory_service_server::DirectoryServiceServer; +use tvix_castore::proto::GRPCBlobServiceWrapper; +use tvix_castore::proto::GRPCDirectoryServiceWrapper; +use tvix_store::pathinfoservice::PathInfoService; +use tvix_store::proto::path_info_service_server::PathInfoServiceServer; +use tvix_store::proto::GRPCPathInfoServiceWrapper; + +use lazy_static::lazy_static; + +// FUTUREWORK: move this to tracing crate +lazy_static! { + pub static ref PB_PROGRESS_STYLE: ProgressStyle = ProgressStyle::with_template( + "{span_child_prefix}{bar:30} {wide_msg} [{elapsed_precise}] {pos:>7}/{len:7}" + ) + .expect("invalid progress template"); + pub static ref PB_SPINNER_STYLE: ProgressStyle = ProgressStyle::with_template( + "{span_child_prefix}{spinner} {wide_msg} [{elapsed_precise}] {pos:>7}/{len:7}" + ) + .expect("invalid progress template"); +} + +#[cfg(any(feature = "fuse", feature = "virtiofs"))] +use tvix_store::pathinfoservice::make_fs; + +#[cfg(feature = "fuse")] +use tvix_castore::fs::fuse::FuseDaemon; + +#[cfg(feature = "otlp")] +use opentelemetry::KeyValue; +#[cfg(feature = "otlp")] +use opentelemetry_sdk::{ + resource::{ResourceDetector, SdkProvidedResourceDetector}, + trace::BatchConfig, + Resource, +}; + +#[cfg(feature = "virtiofs")] +use tvix_castore::fs::virtiofs::start_virtiofs_daemon; + +#[cfg(feature = "tonic-reflection")] +use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET; +#[cfg(feature = "tonic-reflection")] +use tvix_store::proto::FILE_DESCRIPTOR_SET; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + /// Whether to configure OTLP. Set --otlp=false to disable. + #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))] + otlp: bool, + + /// A global log level to use when printing logs. + /// It's also possible to set `RUST_LOG` according to + /// `tracing_subscriber::filter::EnvFilter`, which will always have + /// priority. + #[arg(long)] + log_level: Option<Level>, + + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Runs the tvix-store daemon. 
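+    /// This exposes the tvix.castore.v1 and tvix.store.v1 gRPC services
+    /// on the configured listen address (defaulting to [::]:8000).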
+ Daemon { + #[arg(long, short = 'l')] + listen_address: Option<String>, + + #[arg( + long, + env, + default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store" + )] + blob_service_addr: String, + + #[arg( + long, + env, + default_value = "sled:///var/lib/tvix-store/directories.sled" + )] + directory_service_addr: String, + + #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")] + path_info_service_addr: String, + }, + /// Imports a list of paths into the store, print the store path for each of them. + Import { + #[clap(value_name = "PATH")] + paths: Vec<PathBuf>, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + }, + + /// Copies a list of store paths on the system into tvix-store. + Copy { + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// A path pointing to a JSON file produced by the Nix + /// `__structuredAttrs` containing reference graph information provided + /// by the `exportReferencesGraph` feature. + /// + /// This can be used to invoke tvix-store inside a Nix derivation + /// copying to a Tvix store (or outside, if the JSON file is copied + /// out). + /// + /// Currently limited to the `closure` key inside that JSON file. + #[arg(value_name = "NIX_ATTRS_JSON_FILE", env = "NIX_ATTRS_JSON_FILE")] + reference_graph_path: PathBuf, + }, + /// Mounts a tvix-store at the given mountpoint + #[cfg(feature = "fuse")] + Mount { + #[clap(value_name = "PATH")] + dest: PathBuf, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// Number of FUSE threads to spawn. + #[arg(long, env, default_value_t = default_threads())] + threads: usize, + + #[arg(long, env, default_value_t = false)] + /// Whether to configure the mountpoint with allow_other. + /// Requires /etc/fuse.conf to contain the `user_allow_other` + /// option, configured via `programs.fuse.userAllowOther` on NixOS. + allow_other: bool, + + /// Whether to list elements at the root of the mount point. + /// This is useful if your PathInfoService doesn't provide an + /// (exhaustive) listing. + #[clap(long, short, action)] + list_root: bool, + + #[arg(long, default_value_t = true)] + /// Whether to expose blob and directory digests as extended attributes. + show_xattr: bool, + }, + /// Starts a tvix-store virtiofs daemon at the given socket path. + #[cfg(feature = "virtiofs")] + #[command(name = "virtiofs")] + VirtioFs { + #[clap(value_name = "PATH")] + socket: PathBuf, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// Whether to list elements at the root of the mount point. 
+ /// This is useful if your PathInfoService doesn't provide an + /// (exhaustive) listing. + #[clap(long, short, action)] + list_root: bool, + + #[arg(long, default_value_t = true)] + /// Whether to expose blob and directory digests as extended attributes. + show_xattr: bool, + }, +} + +#[cfg(all(feature = "fuse", not(target_os = "macos")))] +fn default_threads() -> usize { + std::thread::available_parallelism() + .map(|threads| threads.into()) + .unwrap_or(4) +} +// On MacFUSE only a single channel will receive ENODEV when the file system is +// unmounted and so all the other channels will block forever. +// See https://github.com/osxfuse/osxfuse/issues/974 +#[cfg(all(feature = "fuse", target_os = "macos"))] +fn default_threads() -> usize { + 1 +} + +#[tokio::main] +#[instrument(fields(indicatif.pb_show=1))] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + let cli = Cli::parse(); + + // configure log settings + let level = cli.log_level.unwrap_or(Level::INFO); + + let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone()); + + // Set up the tracing subscriber. + let subscriber = tracing_subscriber::registry() + .with( + tracing_subscriber::fmt::Layer::new() + .with_writer(indicatif_layer.get_stderr_writer()) + .compact() + .with_filter( + EnvFilter::builder() + .with_default_directive(level.into()) + .from_env() + .expect("invalid RUST_LOG"), + ), + ) + .with(indicatif_layer.with_filter( + // only show progress for spans with indicatif.pb_show field being set + IndicatifFilter::new(false), + )); + + // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI) + // then init the registry. + // If the feature is feature-flagged out, just init without adding the layer. + // It's necessary to do this separately, as every with() call chains the + // layer into the type of the registry. + #[cfg(feature = "otlp")] + { + let subscriber = if cli.otlp { + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(opentelemetry_otlp::new_exporter().tonic()) + .with_batch_config(BatchConfig::default()) + .with_trace_config(opentelemetry_sdk::trace::config().with_resource({ + // use SdkProvidedResourceDetector.detect to detect resources, + // but replace the default service name with our default. + // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 + let resources = + SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); + // SdkProvidedResourceDetector currently always sets + // `service.name`, but we don't like its default. 
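+                    // If it is still the generic fallback, swap it for
+                    // "tvix.store" below.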
+ if resources.get("service.name".into()).unwrap() == "unknown_service".into() { + resources.merge(&Resource::new([KeyValue::new( + "service.name", + "tvix.store", + )])) + } else { + resources + } + })) + .install_batch(opentelemetry_sdk::runtime::Tokio)?; + + // Create a tracing layer with the configured tracer + let layer = tracing_opentelemetry::layer().with_tracer(tracer); + + subscriber.with(Some(layer)) + } else { + subscriber.with(None) + }; + + subscriber.try_init()?; + } + + // Init the registry (when otlp is not enabled) + #[cfg(not(feature = "otlp"))] + { + subscriber.try_init()?; + } + + match cli.command { + Commands::Daemon { + listen_address, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + } => { + // initialize stores + let (blob_service, directory_service, path_info_service, nar_calculation_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + let listen_address = listen_address + .unwrap_or_else(|| "[::]:8000".to_string()) + .parse() + .unwrap(); + + let mut server = Server::builder(); + + #[allow(unused_mut)] + let mut router = server + .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( + blob_service, + ))) + .add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::new(directory_service), + )) + .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( + Arc::from(path_info_service), + nar_calculation_service, + ))); + + #[cfg(feature = "tonic-reflection")] + { + let reflection_svc = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build()?; + router = router.add_service(reflection_svc); + } + + info!(listen_address=%listen_address, "starting daemon"); + + let listener = Listener::bind( + &listen_address, + &SystemOptions::default(), + &UserOptions::default(), + ) + .await?; + + router.serve_with_incoming(listener).await?; + } + Commands::Import { + paths, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + } => { + // FUTUREWORK: allow flat for single files? + let (blob_service, directory_service, path_info_service, nar_calculation_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + // Arc PathInfoService and NarCalculationService, as we clone it . + let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + let nar_calculation_service: Arc<dyn NarCalculationService> = + nar_calculation_service.into(); + + let root_span = { + let s = Span::current(); + s.pb_set_style(&PB_PROGRESS_STYLE); + s.pb_set_message("Importing paths"); + s.pb_set_length(paths.len() as u64); + s.pb_start(); + s + }; + + let tasks = paths + .into_iter() + .map(|path| { + let paths_span = root_span.clone(); + tokio::task::spawn({ + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + let path_info_service = path_info_service.clone(); + let nar_calculation_service = nar_calculation_service.clone(); + + async move { + if let Ok(name) = tvix_store::import::path_to_name(&path) { + let resp = tvix_store::import::import_path_as_nar_ca( + &path, + name, + blob_service, + directory_service, + path_info_service, + nar_calculation_service, + ) + .await; + if let Ok(output_path) = resp { + // If the import was successful, print the path to stdout. 
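+                            // (Failures are silently skipped here; the
+                            // progress bar below is advanced either way.)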
+ println!("{}", output_path.to_absolute_path()); + } + } + paths_span.pb_inc(1); + } + }) + }) + .collect::<Vec<_>>(); + + try_join_all(tasks).await?; + } + Commands::Copy { + blob_service_addr, + directory_service_addr, + path_info_service_addr, + reference_graph_path, + } => { + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + // Parse the file at reference_graph_path. + let reference_graph_json = tokio::fs::read(&reference_graph_path).await?; + + #[derive(Deserialize, Serialize)] + struct ReferenceGraph<'a> { + #[serde(borrow)] + closure: Vec<ExportedPathInfo<'a>>, + } + + let reference_graph: ReferenceGraph<'_> = + serde_json::from_slice(reference_graph_json.as_slice())?; + + // Arc the PathInfoService, as we clone it . + let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + + let lookups_span = info_span!( + "lookup pathinfos", + "indicatif.pb_show" = tracing::field::Empty + ); + lookups_span.pb_set_length(reference_graph.closure.len() as u64); + lookups_span.pb_set_style(&PB_PROGRESS_STYLE); + lookups_span.pb_start(); + + // From our reference graph, lookup all pathinfos that might exist. + let elems: Vec<_> = futures::stream::iter(reference_graph.closure) + .map(|elem| { + let path_info_service = path_info_service.clone(); + async move { + let resp = path_info_service + .get(*elem.path.digest()) + .await + .map(|resp| (elem, resp)); + + Span::current().pb_inc(1); + resp + } + }) + .buffer_unordered(50) + // Filter out all that are already uploaded. + // TODO: check if there's a better combinator for this + .try_filter_map(|(elem, path_info)| { + std::future::ready(if path_info.is_none() { + Ok(Some(elem)) + } else { + Ok(None) + }) + }) + .try_collect() + .await?; + + // Run ingest_path on all of them. + let uploads: Vec<_> = futures::stream::iter(elems) + .map(|elem| { + // Map to a future returning the root node, alongside with the closure info. + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + async move { + // Ingest the given path. + + ingest_path( + blob_service, + directory_service, + PathBuf::from(elem.path.to_absolute_path()), + ) + .await + .map(|root_node| (elem, root_node)) + } + }) + .buffer_unordered(10) + .try_collect() + .await?; + + // Insert them into the PathInfoService. + // FUTUREWORK: do this properly respecting the reference graph. + for (elem, root_node) in uploads { + // Create and upload a PathInfo pointing to the root_node, + // annotated with information we have from the reference graph. 
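+            // Signatures and deriver information aren't available in the
+            // exported reference graph, so those fields are left empty.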
+ let path_info = PathInfo { + node: Some(tvix_castore::proto::Node { + node: Some(root_node), + }), + references: Vec::from_iter( + elem.references.iter().map(|e| e.digest().to_vec().into()), + ), + narinfo: Some(NarInfo { + nar_size: elem.nar_size, + nar_sha256: elem.nar_sha256.to_vec().into(), + signatures: vec![], + reference_names: Vec::from_iter( + elem.references.iter().map(|e| e.to_string()), + ), + deriver: None, + ca: None, + }), + }; + + path_info_service.put(path_info).await?; + } + } + #[cfg(feature = "fuse")] + Commands::Mount { + dest, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + list_root, + threads, + allow_other, + show_xattr, + } => { + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + let mut fuse_daemon = tokio::task::spawn_blocking(move || { + let fs = make_fs( + blob_service, + directory_service, + Arc::from(path_info_service), + list_root, + show_xattr, + ); + info!(mount_path=?dest, "mounting"); + + FuseDaemon::new(fs, &dest, threads, allow_other) + }) + .await??; + + // grab a handle to unmount the file system, and register a signal + // handler. + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + info!("interrupt received, unmounting…"); + tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??; + info!("unmount occured, terminating…"); + Ok::<_, std::io::Error>(()) + }) + .await??; + } + #[cfg(feature = "virtiofs")] + Commands::VirtioFs { + socket, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + list_root, + show_xattr, + } => { + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + tokio::task::spawn_blocking(move || { + let fs = make_fs( + blob_service, + directory_service, + Arc::from(path_info_service), + list_root, + show_xattr, + ); + info!(socket_path=?socket, "starting virtiofs-daemon"); + + start_virtiofs_daemon(fs, socket) + }) + .await??; + } + }; + Ok(()) +} diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs new file mode 100644 index 000000000000..888380bca9a0 --- /dev/null +++ b/tvix/store/src/import.rs @@ -0,0 +1,183 @@ +use std::path::Path; +use tracing::{debug, instrument}; +use tvix_castore::{ + blobservice::BlobService, directoryservice::DirectoryService, import::fs::ingest_path, + proto::node::Node, B3Digest, +}; + +use nix_compat::{ + nixhash::{CAHash, NixHash}, + store_path::{self, StorePath}, +}; + +use crate::{ + nar::NarCalculationService, + pathinfoservice::PathInfoService, + proto::{nar_info, NarInfo, PathInfo}, +}; + +impl From<CAHash> for nar_info::Ca { + fn from(value: CAHash) -> Self { + let hash_type: nar_info::ca::Hash = (&value).into(); + let digest: bytes::Bytes = value.hash().to_string().into(); + nar_info::Ca { + r#type: hash_type.into(), + digest, + } + } +} + +pub fn log_node(node: &Node, path: &Path) { + match node { + Node::Directory(directory_node) => { + debug!( + path = ?path, + name = ?directory_node.name, + digest = %B3Digest::try_from(directory_node.digest.clone()).unwrap(), + "import successful", + ) + } + Node::File(file_node) => { + debug!( + path = ?path, + name = ?file_node.name, + digest = %B3Digest::try_from(file_node.digest.clone()).unwrap(), + "import successful" + ) + } + 
Node::Symlink(symlink_node) => {
+            debug!(
+                path = ?path,
+                name = ?symlink_node.name,
+                target = ?symlink_node.target,
+                "import successful"
+            )
+        }
+    }
+}
+
+/// Transform a path into its base name, returning an [`std::io::Error`] if it is `..` or if the
+/// basename is not valid unicode.
+#[inline]
+pub fn path_to_name(path: &Path) -> std::io::Result<&str> {
+    path.file_name()
+        .and_then(|file_name| file_name.to_str())
+        .ok_or_else(|| {
+            std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                "path must not be .. and the basename valid unicode",
+            )
+        })
+}
+
+/// Takes the NAR size, SHA-256 of the NAR representation, the root node and optionally
+/// a CA hash information.
+///
+/// Returns the path information object for a NAR-style object.
+///
+/// This [`PathInfo`] can be further filled for signatures, deriver or verified for the expected
+/// hashes.
+#[inline]
+pub fn derive_nar_ca_path_info(
+    nar_size: u64,
+    nar_sha256: [u8; 32],
+    ca: Option<CAHash>,
+    root_node: Node,
+) -> PathInfo {
+    // assemble the [crate::proto::PathInfo] object.
+    PathInfo {
+        node: Some(tvix_castore::proto::Node {
+            node: Some(root_node),
+        }),
+        // There's no reference scanning on path contents ingested like this.
+        references: vec![],
+        narinfo: Some(NarInfo {
+            nar_size,
+            nar_sha256: nar_sha256.to_vec().into(),
+            signatures: vec![],
+            reference_names: vec![],
+            deriver: None,
+            ca: ca.map(|ca_hash| ca_hash.into()),
+        }),
+    }
+}
+
+/// Ingest the given path `path` and register the resulting output path in the
+/// [`PathInfoService`] as a recursive fixed output NAR.
+#[instrument(skip_all, fields(store_name=name, path=?path), err)]
+pub async fn import_path_as_nar_ca<BS, DS, PS, NS, P>(
+    path: P,
+    name: &str,
+    blob_service: BS,
+    directory_service: DS,
+    path_info_service: PS,
+    nar_calculation_service: NS,
+) -> Result<StorePath, std::io::Error>
+where
+    P: AsRef<Path> + std::fmt::Debug,
+    BS: BlobService + Clone,
+    DS: DirectoryService,
+    PS: AsRef<dyn PathInfoService>,
+    NS: NarCalculationService,
+{
+    let root_node = ingest_path(blob_service, directory_service, path.as_ref())
+        .await
+        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
+
+    // Ask for the NAR size and sha256
+    let (nar_size, nar_sha256) = nar_calculation_service.calculate_nar(&root_node).await?;
+
+    // Calculate the output path. This might still fail, as some names are illegal.
+    // FUTUREWORK: express the `name` at the type level to be valid and move the conversion
+    // to the caller level.
+    let output_path = store_path::build_nar_based_store_path(&nar_sha256, name).map_err(|_| {
+        std::io::Error::new(
+            std::io::ErrorKind::InvalidData,
+            format!("invalid name: {}", name),
+        )
+    })?;
+
+    // assemble a new root_node with a name that is derived from the nar hash.
+    let root_node = root_node.rename(output_path.to_string().into_bytes().into());
+    log_node(&root_node, path.as_ref());
+
+    let path_info = derive_nar_ca_path_info(
+        nar_size,
+        nar_sha256,
+        Some(CAHash::Nar(NixHash::Sha256(nar_sha256))),
+        root_node,
+    );
+
+    // This new [`PathInfo`] that we get back from there might contain additional signatures or
+    // information set by the service itself. In this function, we silently swallow it because
+    // callers don't really need it.
+ let _path_info = path_info_service.as_ref().put(path_info).await?; + + Ok(output_path.to_owned()) +} + +#[cfg(test)] +mod tests { + use std::{ffi::OsStr, path::PathBuf}; + + use crate::import::path_to_name; + use rstest::rstest; + + #[rstest] + #[case::simple_path("a/b/c", "c")] + #[case::simple_path_containing_dotdot("a/b/../c", "c")] + #[case::path_containing_multiple_dotdot("a/b/../c/d/../e", "e")] + + fn test_path_to_name(#[case] path: &str, #[case] expected_name: &str) { + let path: PathBuf = path.into(); + assert_eq!(path_to_name(&path).expect("must succeed"), expected_name); + } + + #[rstest] + #[case::path_ending_in_dotdot(b"a/b/..")] + #[case::non_unicode_path(b"\xf8\xa1\xa1\xa1\xa1")] + fn test_invalid_path_to_name(#[case] invalid_path: &[u8]) { + let path: PathBuf = unsafe { OsStr::from_encoded_bytes_unchecked(invalid_path) }.into(); + path_to_name(&path).expect_err("must fail"); + } +} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs new file mode 100644 index 000000000000..8c32aaf885e8 --- /dev/null +++ b/tvix/store/src/lib.rs @@ -0,0 +1,14 @@ +pub mod import; +pub mod nar; +pub mod pathinfoservice; +pub mod proto; +pub mod utils; + +#[cfg(test)] +mod tests; + +// That's what the rstest_reuse README asks us do, and fails about being unable +// to find rstest_reuse in crate root. +#[cfg(test)] +#[allow(clippy::single_component_path_imports)] +use rstest_reuse; diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs new file mode 100644 index 000000000000..36122d419d00 --- /dev/null +++ b/tvix/store/src/nar/import.rs @@ -0,0 +1,237 @@ +use nix_compat::nar::reader::r#async as nar_reader; +use tokio::{io::AsyncBufRead, sync::mpsc, try_join}; +use tvix_castore::{ + blobservice::BlobService, + directoryservice::DirectoryService, + import::{ + blobs::{self, ConcurrentBlobUploader}, + ingest_entries, IngestionEntry, IngestionError, + }, + proto::{node::Node, NamedNode}, + PathBuf, +}; + +/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. +/// It returns the castore root node or an error. +pub async fn ingest_nar<R, BS, DS>( + blob_service: BS, + directory_service: DS, + r: &mut R, +) -> Result<Node, IngestionError<Error>> +where + R: AsyncBufRead + Unpin + Send, + BS: BlobService + Clone + 'static, + DS: DirectoryService, +{ + // open the NAR for reading. + // The NAR reader emits nodes in DFS preorder. + let root_node = nar_reader::open(r).await.map_err(Error::IO)?; + + let (tx, rx) = mpsc::channel(1); + let rx = tokio_stream::wrappers::ReceiverStream::new(rx); + + let produce = async move { + let mut blob_uploader = ConcurrentBlobUploader::new(blob_service); + + let res = produce_nar_inner( + &mut blob_uploader, + root_node, + "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT. 
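+            // The fake "root" name is stripped again after ingestion, via the
+            // rename("".into()) call at the end of ingest_nar.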
+ tx.clone(), + ) + .await; + + if let Err(err) = blob_uploader.join().await { + tx.send(Err(err.into())) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; + } + + tx.send(res) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; + + Ok(()) + }; + + let consume = ingest_entries(directory_service, rx); + + let (_, node) = try_join!(produce, consume)?; + + // remove the fake "root" name again + debug_assert_eq!(&node.get_name(), b"root"); + Ok(node.rename("".into())) +} + +async fn produce_nar_inner<BS>( + blob_uploader: &mut ConcurrentBlobUploader<BS>, + node: nar_reader::Node<'_, '_>, + path: PathBuf, + tx: mpsc::Sender<Result<IngestionEntry, Error>>, +) -> Result<IngestionEntry, Error> +where + BS: BlobService + Clone + 'static, +{ + Ok(match node { + nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target }, + nar_reader::Node::File { + executable, + mut reader, + } => { + let size = reader.len(); + let digest = blob_uploader.upload(&path, size, &mut reader).await?; + + IngestionEntry::Regular { + path, + size, + executable, + digest, + } + } + nar_reader::Node::Directory(mut dir_reader) => { + while let Some(entry) = dir_reader.next().await? { + let mut path = path.clone(); + + // valid NAR names are valid castore names + path.try_push(entry.name) + .expect("Tvix bug: failed to join name"); + + let entry = Box::pin(produce_nar_inner( + blob_uploader, + entry.node, + path, + tx.clone(), + )) + .await?; + + tx.send(Ok(entry)).await.map_err(|e| { + Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)) + })?; + } + + IngestionEntry::Dir { path } + } + }) +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + IO(#[from] std::io::Error), + + #[error(transparent)] + BlobUpload(#[from] blobs::Error), +} + +#[cfg(test)] +mod test { + use crate::nar::ingest_nar; + use std::io::Cursor; + use std::sync::Arc; + + use rstest::*; + use tokio_stream::StreamExt; + use tvix_castore::blobservice::BlobService; + use tvix_castore::directoryservice::DirectoryService; + use tvix_castore::fixtures::{ + DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS, + HELLOWORLD_BLOB_DIGEST, + }; + use tvix_castore::proto as castorepb; + + use crate::tests::fixtures::{ + blob_service, directory_service, NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD, + NAR_CONTENTS_SYMLINK, + }; + + #[rstest] + #[tokio::test] + async fn single_symlink( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) { + let root_node = ingest_nar( + blob_service, + directory_service, + &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), + ) + .await + .expect("must parse"); + + assert_eq!( + castorepb::node::Node::Symlink(castorepb::SymlinkNode { + name: "".into(), // name must be empty + target: "/nix/store/somewhereelse".into(), + }), + root_node + ); + } + + #[rstest] + #[tokio::test] + async fn single_file( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) { + let root_node = ingest_nar( + blob_service.clone(), + directory_service, + &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), + ) + .await + .expect("must parse"); + + assert_eq!( + castorepb::node::Node::File(castorepb::FileNode { + name: "".into(), // name must be empty + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: HELLOWORLD_BLOB_CONTENTS.len() as u64, + executable: false, + }), + root_node + ); + + // blobservice must 
contain the blob
+        assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn complicated(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service.clone(),
+            &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
+        )
+        .await
+        .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::Directory(castorepb::DirectoryNode {
+                name: "".into(), // name must be empty
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            }),
+            root_node,
+        );
+
+        // blobservice must contain the blob
+        assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
+
+        // directoryservice must contain the directories, at least with get_recursive.
+        let resp: Result<Vec<castorepb::Directory>, _> = directory_service
+            .get_recursive(&DIRECTORY_COMPLICATED.digest())
+            .collect()
+            .await;
+
+        let directories = resp.unwrap();
+
+        assert_eq!(2, directories.len());
+        assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
+        assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
+    }
+}
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
new file mode 100644
index 000000000000..164748a655e8
--- /dev/null
+++ b/tvix/store/src/nar/mod.rs
@@ -0,0 +1,52 @@
+use tonic::async_trait;
+use tvix_castore::B3Digest;
+
+mod import;
+mod renderer;
+pub use import::ingest_nar;
+pub use renderer::calculate_size_and_sha256;
+pub use renderer::write_nar;
+pub use renderer::SimpleRenderer;
+use tvix_castore::proto as castorepb;
+
+#[async_trait]
+pub trait NarCalculationService: Send + Sync {
+    /// Return the nar size and nar sha256 digest for a given root node.
+    /// This can be used to calculate NAR-based output paths.
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error>;
+}
+
+#[async_trait]
+impl<A> NarCalculationService for A
+where
+    A: AsRef<dyn NarCalculationService> + Send + Sync,
+{
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
+        self.as_ref().calculate_nar(root_node).await
+    }
+}
+
+/// Errors that can be encountered while rendering NARs.
+#[derive(Debug, thiserror::Error)]
+pub enum RenderError {
+    #[error("failure talking to a backing store client: {0}")]
+    StoreError(#[source] std::io::Error),
+
+    #[error("unable to find directory {}, referred from {:?}", .0, .1)]
+    DirectoryNotFound(B3Digest, bytes::Bytes),
+
+    #[error("unable to find blob {}, referred from {:?}", .0, .1)]
+    BlobNotFound(B3Digest, bytes::Bytes),
+
+    #[error("unexpected size in metadata for blob {}, referred from {:?}: expected {}, got {}", .0, .1, .2, .3)]
+    UnexpectedBlobMeta(B3Digest, bytes::Bytes, u32, u32),
+
+    #[error("failure using the NAR writer: {0}")]
+    NARWriterError(std::io::Error),
+}
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
new file mode 100644
index 000000000000..efd67671db70
--- /dev/null
+++ b/tvix/store/src/nar/renderer.rs
@@ -0,0 +1,222 @@
+use crate::utils::AsyncIoBridge;
+
+use super::{NarCalculationService, RenderError};
+use count_write::CountWrite;
+use nix_compat::nar::writer::r#async as nar_writer;
+use sha2::{Digest, Sha256};
+use tokio::io::{self, AsyncWrite, BufReader};
+use tonic::async_trait;
+use tvix_castore::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    proto::{self as castorepb, NamedNode},
+};
+
+pub struct SimpleRenderer<BS, DS> {
+    blob_service: BS,
+    directory_service: DS,
+}
+
+impl<BS, DS> SimpleRenderer<BS, DS> {
+    pub fn new(blob_service: BS, directory_service: DS) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+        }
+    }
+}
+
+#[async_trait]
+impl<BS, DS> NarCalculationService for SimpleRenderer<BS, DS>
+where
+    BS: BlobService + Clone,
+    DS: DirectoryService + Clone,
+{
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
+        calculate_size_and_sha256(
+            root_node,
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+        )
+        .await
+        .map_err(|e| tvix_castore::Error::StorageError(format!("failed rendering nar: {}", e)))
+    }
+}
+
+/// Invoke [write_nar], and return the size and sha256 digest of the produced
+/// NAR output.
+pub async fn calculate_size_and_sha256<BS, DS>(
+    root_node: &castorepb::node::Node,
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(u64, [u8; 32]), RenderError>
+where
+    BS: BlobService + Send,
+    DS: DirectoryService + Send,
+{
+    let mut h = Sha256::new();
+    let mut cw = CountWrite::from(&mut h);
+
+    write_nar(
+        // The hasher doesn't speak async. It doesn't
+        // actually do any I/O, so it's fine to wrap.
+        AsyncIoBridge(&mut cw),
+        root_node,
+        blob_service,
+        directory_service,
+    )
+    .await?;
+
+    Ok((cw.count(), h.finalize().into()))
+}
+
+/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
+/// and uses the passed blob_service and directory_service to perform the
+/// necessary lookups as it traverses the structure.
+/// The contents in NAR serialization are written to the passed [AsyncWrite].
+pub async fn write_nar<W, BS, DS>(
+    mut w: W,
+    proto_root_node: &castorepb::node::Node,
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(), RenderError>
+where
+    W: AsyncWrite + Unpin + Send,
+    BS: BlobService + Send,
+    DS: DirectoryService + Send,
+{
+    // Initialize NAR writer
+    let nar_root_node = nar_writer::open(&mut w)
+        .await
+        .map_err(RenderError::NARWriterError)?;
+
+    walk_node(
+        nar_root_node,
+        proto_root_node,
+        blob_service,
+        directory_service,
+    )
+    .await?;
+
+    Ok(())
+}
+
+/// Process an intermediate node in the structure.
+/// This consumes the node.
+async fn walk_node<BS, DS>( + nar_node: nar_writer::Node<'_, '_>, + proto_node: &castorepb::node::Node, + blob_service: BS, + directory_service: DS, +) -> Result<(BS, DS), RenderError> +where + BS: BlobService + Send, + DS: DirectoryService + Send, +{ + match proto_node { + castorepb::node::Node::Symlink(proto_symlink_node) => { + nar_node + .symlink(&proto_symlink_node.target) + .await + .map_err(RenderError::NARWriterError)?; + } + castorepb::node::Node::File(proto_file_node) => { + let digest_len = proto_file_node.digest.len(); + let digest = proto_file_node.digest.clone().try_into().map_err(|_| { + RenderError::StoreError(io::Error::new( + io::ErrorKind::Other, + format!("invalid digest len {} in file node", digest_len), + )) + })?; + + let mut blob_reader = match blob_service + .open_read(&digest) + .await + .map_err(RenderError::StoreError)? + { + Some(blob_reader) => Ok(BufReader::new(blob_reader)), + None => Err(RenderError::NARWriterError(io::Error::new( + io::ErrorKind::NotFound, + format!("blob with digest {} not found", &digest), + ))), + }?; + + nar_node + .file( + proto_file_node.executable, + proto_file_node.size, + &mut blob_reader, + ) + .await + .map_err(RenderError::NARWriterError)?; + } + castorepb::node::Node::Directory(proto_directory_node) => { + let digest_len = proto_directory_node.digest.len(); + let digest = proto_directory_node + .digest + .clone() + .try_into() + .map_err(|_| { + RenderError::StoreError(io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid digest len {} in directory node", digest_len), + )) + })?; + + // look it up with the directory service + match directory_service + .get(&digest) + .await + .map_err(|e| RenderError::StoreError(e.into()))? + { + // if it's None, that's an error! + None => Err(RenderError::DirectoryNotFound( + digest, + proto_directory_node.name.clone(), + ))?, + Some(proto_directory) => { + // start a directory node + let mut nar_node_directory = nar_node + .directory() + .await + .map_err(RenderError::NARWriterError)?; + + // We put blob_service, directory_service back here whenever we come up from + // the recursion. + let mut blob_service = blob_service; + let mut directory_service = directory_service; + + // for each node in the directory, create a new entry with its name, + // and then recurse on that entry. 
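As the comment above notes, `blob_service` and `directory_service` are moved into each recursive call and handed back in the `Ok` result, which lets the recursion compile without a `Clone` bound on either service. A minimal standalone sketch of that ownership-threading pattern (all names here are illustrative, not tvix APIs):

```rust
// Hypothetical types, for illustration only.
struct Node {
    name: String,
    children: Vec<Node>,
}

struct Connection; // stands in for a non-Clone service handle

impl Connection {
    fn touch(&mut self, _name: &str) {}
}

// The handle is moved into each recursive call and returned again,
// so the recursion compiles without a `Connection: Clone` bound.
fn visit(node: &Node, mut conn: Connection) -> Connection {
    for child in &node.children {
        conn = visit(child, conn); // moved in, handed back
    }
    conn.touch(&node.name);
    conn
}

fn main() {
    let tree = Node {
        name: "root".into(),
        children: vec![Node { name: "child".into(), children: vec![] }],
    };
    let _conn = visit(&tree, Connection);
}
```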
+                    for proto_node in proto_directory.nodes() {
+                        let child_node = nar_node_directory
+                            .entry(proto_node.get_name())
+                            .await
+                            .map_err(RenderError::NARWriterError)?;
+
+                        (blob_service, directory_service) = Box::pin(walk_node(
+                            child_node,
+                            &proto_node,
+                            blob_service,
+                            directory_service,
+                        ))
+                        .await?;
+                    }
+
+                    // close the directory
+                    nar_node_directory
+                        .close()
+                        .await
+                        .map_err(RenderError::NARWriterError)?;
+
+                    return Ok((blob_service, directory_service));
+                }
+            }
+        }
+    }
+
+    Ok((blob_service, directory_service))
+}
diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs
new file mode 100644
index 000000000000..707a686c0a54
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/bigtable.rs
@@ -0,0 +1,412 @@
+use super::PathInfoService;
+use crate::proto;
+use crate::proto::PathInfo;
+use async_stream::try_stream;
+use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
+use bytes::Bytes;
+use data_encoding::HEXLOWER;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use prost::Message;
+use serde::{Deserialize, Serialize};
+use serde_with::{serde_as, DurationSeconds};
+use tonic::async_trait;
+use tracing::{instrument, trace};
+use tvix_castore::Error;
+
+/// There should not be more than 10 MiB in a single cell.
+/// https://cloud.google.com/bigtable/docs/schema-design#cells
+const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
+
+/// Provides a [PathInfoService] implementation using
+/// [Bigtable](https://cloud.google.com/bigtable/docs/)
+/// as an underlying K/V store.
+///
+/// # Data format
+/// We use Bigtable as a plain K/V store.
+/// The row key is the digest of the store path, in hexlower.
+/// Inside the row, we currently have a single column/cell, again using the
+/// hexlower store path digest.
+/// Its value is the PathInfo message, serialized in canonical protobuf.
+/// We currently only populate this column.
+///
+/// Listing ranges over all rows; calculate_nar returns an
+/// "unimplemented" error.
+#[derive(Clone)]
+pub struct BigtablePathInfoService {
+    client: bigtable::BigTable,
+    params: BigtableParameters,
+
+    #[cfg(test)]
+    #[allow(dead_code)]
+    /// Holds the temporary directory containing the unix socket, and the
+    /// spawned emulator process.
+    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
+}
+
+/// Represents configuration of [BigtablePathInfoService].
+/// This currently conflates both connect parameters and data model/client
+/// behaviour parameters.
+#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option<DurationSeconds<String>>")] + #[serde(default = "default_timeout")] + timeout: Option<std::time::Duration>, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +impl BigtableParameters { + #[cfg(test)] + pub fn default_for_tests() -> Self { + Self { + project_id: "project-1".into(), + instance_name: "instance-1".into(), + is_read_only: false, + channel_size: default_channel_size(), + timeout: default_timeout(), + table_name: "table-1".into(), + family_name: "cf1".into(), + app_profile_id: default_app_profile_id(), + } + } +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option<std::time::Duration> { + Some(std::time::Duration::from_secs(4)) +} + +impl BigtablePathInfoService { + #[cfg(not(test))] + pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + let connection = bigtable::BigTableConnection::new( + ¶ms.project_id, + ¶ms.instance_name, + params.is_read_only, + params.channel_size, + params.timeout, + ) + .await?; + + Ok(Self { + client: connection.client(), + params, + }) + } + + #[cfg(test)] + pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + use std::time::Duration; + + use async_process::{Command, Stdio}; + use tempfile::TempDir; + use tokio_retry::{strategy::ExponentialBackoff, Retry}; + + let tmpdir = TempDir::new().unwrap(); + + let socket_path = tmpdir.path().join("cbtemulator.sock"); + + let emulator_process = Command::new("cbtemulator") + .arg("-address") + .arg(socket_path.clone()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .expect("failed to spawn emulator"); + + Retry::spawn( + ExponentialBackoff::from_millis(20) + .max_delay(Duration::from_secs(1)) + .take(3), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // populate the emulator + for cmd in &[ + vec!["createtable", ¶ms.table_name], + vec!["createfamily", ¶ms.table_name, ¶ms.family_name], + ] { + Command::new("cbt") + .args({ + let mut args = vec![ + "-instance", + ¶ms.instance_name, + "-project", + ¶ms.project_id, + ]; + args.extend_from_slice(cmd); + args + }) + .env( + "BIGTABLE_EMULATOR_HOST", + format!("unix://{}", socket_path.to_string_lossy()), + ) + .output() + .await + .expect("failed to run cbt setup command"); + } + + let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator( + &format!("unix://{}", socket_path.to_string_lossy()), + ¶ms.project_id, + ¶ms.instance_name, + false, + None, + )?; + + Ok(Self { + client: connection.client(), + params, + emulator: (tmpdir, emulator_process).into(), + }) + } +} + +/// Derives the row/column key for a given output path. +/// We use hexlower encoding, also because it can't be misinterpreted as RE2. 
+fn derive_pathinfo_key(digest: &[u8; 20]) -> String { + HEXLOWER.encode(digest) +} + +#[async_trait] +impl PathInfoService for BigtablePathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let mut client = self.client.clone(); + let path_info_key = derive_pathinfo_key(&digest); + + let request = bigtable_v2::ReadRowsRequest { + app_profile_id: self.params.app_profile_id.to_string(), + table_name: client.get_full_table_name(&self.params.table_name), + rows_limit: 1, + rows: Some(bigtable_v2::RowSet { + row_keys: vec![path_info_key.clone().into()], + row_ranges: vec![], + }), + // Filter selected family name, and column qualifier matching the digest. + // The latter is to ensure we don't fail once we start adding more metadata. + filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::Chain( + bigtable_v2::row_filter::Chain { + filters: vec![ + bigtable_v2::RowFilter { + filter: Some( + bigtable_v2::row_filter::Filter::FamilyNameRegexFilter( + self.params.family_name.to_string(), + ), + ), + }, + bigtable_v2::RowFilter { + filter: Some( + bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter( + path_info_key.clone().into(), + ), + ), + }, + ], + }, + )), + }), + ..Default::default() + }; + + let mut response = client + .read_rows(request) + .await + .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?; + + if response.len() != 1 { + if response.len() > 1 { + // This shouldn't happen, we limit number of rows to 1 + return Err(Error::StorageError( + "got more than one row from bigtable".into(), + )); + } + // else, this is simply a "not found". + return Ok(None); + } + + let (row_key, mut cells) = response.pop().unwrap(); + if row_key != path_info_key.as_bytes() { + // This shouldn't happen, we requested this row key. + return Err(Error::StorageError( + "got wrong row key from bigtable".into(), + )); + } + + let cell = cells + .pop() + .ok_or_else(|| Error::StorageError("found no cells".into()))?; + + // Ensure there's only one cell (so no more left after the pop()) + // This shouldn't happen, We filter out other cells in our query. + if !cells.is_empty() { + return Err(Error::StorageError( + "more than one cell returned from bigtable".into(), + )); + } + + // We also require the qualifier to be correct in the filter above, + // so this shouldn't happen. 
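Row keys and column qualifiers are both produced by derive_pathinfo_key above, so the qualifier check just below can compare raw bytes: both sides are the same deterministic encoding of the digest. A quick sketch of what such a key looks like (using `data_encoding`, which is already a dependency; the digest value is made up):

```rust
use data_encoding::HEXLOWER;

fn main() {
    // A made-up 20-byte store path digest.
    let digest = [0x42u8; 20];

    // Hexlower encoding yields 2 characters per byte, so keys are
    // always 40 characters long and contain no RE2 metacharacters.
    let key = HEXLOWER.encode(&digest);
    assert_eq!(key, "4242424242424242424242424242424242424242");
    assert_eq!(key.len(), 40);
}
```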
+ if path_info_key.as_bytes() != cell.qualifier { + return Err(Error::StorageError("unexpected cell qualifier".into())); + } + + // Try to parse the value into a PathInfo message + let path_info = proto::PathInfo::decode(Bytes::from(cell.value)) + .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?; + + let store_path = path_info + .validate() + .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?; + + if store_path.digest() != &digest { + return Err(Error::StorageError("PathInfo has unexpected digest".into())); + } + + Ok(Some(path_info)) + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("pathinfo failed validation: {}", e)))?; + + let mut client = self.client.clone(); + let path_info_key = derive_pathinfo_key(store_path.digest()); + + let data = path_info.encode_to_vec(); + if data.len() as u64 > CELL_SIZE_LIMIT { + return Err(Error::StorageError( + "PathInfo exceeds cell limit on Bigtable".into(), + )); + } + + let resp = client + .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest { + table_name: client.get_full_table_name(&self.params.table_name), + app_profile_id: self.params.app_profile_id.to_string(), + row_key: path_info_key.clone().into(), + predicate_filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter( + path_info_key.clone().into(), + )), + }), + // If the column was already found, do nothing. + true_mutations: vec![], + // Else, do the insert. + false_mutations: vec![ + // https://cloud.google.com/bigtable/docs/writes + bigtable_v2::Mutation { + mutation: Some(bigtable_v2::mutation::Mutation::SetCell( + bigtable_v2::mutation::SetCell { + family_name: self.params.family_name.to_string(), + column_qualifier: path_info_key.clone().into(), + timestamp_micros: -1, // use server time to fill timestamp + value: data, + }, + )), + }, + ], + }) + .await + .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?; + + if resp.predicate_matched { + trace!("already existed") + } + + Ok(path_info) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let mut client = self.client.clone(); + + let request = bigtable_v2::ReadRowsRequest { + app_profile_id: self.params.app_profile_id.to_string(), + table_name: client.get_full_table_name(&self.params.table_name), + filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter( + self.params.family_name.to_string(), + )), + }), + ..Default::default() + }; + + let stream = try_stream! { + // TODO: add pagination, we don't want to hold all of this in memory. + let response = client + .read_rows(request) + .await + .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?; + + for (row_key, mut cells) in response { + let cell = cells + .pop() + .ok_or_else(|| Error::StorageError("found no cells".into()))?; + + // The cell must have the same qualifier as the row key + if row_key != cell.qualifier { + Err(Error::StorageError("unexpected cell qualifier".into()))?; + } + + // Ensure there's only one cell (so no more left after the pop()) + // This shouldn't happen, We filter out other cells in our query. + if !cells.is_empty() { + + Err(Error::StorageError( + "more than one cell returned from bigtable".into(), + ))? 
+ } + + // Try to parse the value into a PathInfo message. + let path_info = proto::PathInfo::decode(Bytes::from(cell.value)) + .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?; + + // Validate the containing PathInfo, ensure its StorePath digest + // matches row key. + let store_path = path_info + .validate() + .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?; + + if store_path.digest().as_slice() != row_key.as_slice() { + Err(Error::StorageError("PathInfo has unexpected digest".into()))? + } + + + yield path_info + } + }; + + Box::pin(stream) + } +} diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs new file mode 100644 index 000000000000..664144ef494b --- /dev/null +++ b/tvix/store/src/pathinfoservice/combinators.rs @@ -0,0 +1,111 @@ +use crate::proto::PathInfo; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use tonic::async_trait; +use tracing::{debug, instrument}; +use tvix_castore::Error; + +use super::PathInfoService; + +/// Asks near first, if not found, asks far. +/// If found in there, returns it, and *inserts* it into +/// near. +/// There is no negative cache. +/// Inserts and listings are not implemented for now. +pub struct Cache<PS1, PS2> { + near: PS1, + far: PS2, +} + +impl<PS1, PS2> Cache<PS1, PS2> { + pub fn new(near: PS1, far: PS2) -> Self { + Self { near, far } + } +} + +#[async_trait] +impl<PS1, PS2> PathInfoService for Cache<PS1, PS2> +where + PS1: PathInfoService, + PS2: PathInfoService, +{ + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + match self.near.get(digest).await? { + Some(path_info) => { + debug!("serving from cache"); + Ok(Some(path_info)) + } + None => { + debug!("not found in near, asking remote…"); + match self.far.get(digest).await? { + None => Ok(None), + Some(path_info) => { + debug!("found in remote, adding to cache"); + self.near.put(path_info.clone()).await?; + Ok(Some(path_info)) + } + } + } + } + } + + async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> { + Err(Error::StorageError("unimplemented".to_string())) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + Box::pin(tokio_stream::once(Err(Error::StorageError( + "unimplemented".to_string(), + )))) + } +} + +#[cfg(test)] +mod test { + use std::num::NonZeroUsize; + + use crate::{ + pathinfoservice::{LruPathInfoService, MemoryPathInfoService, PathInfoService}, + tests::fixtures::PATH_INFO_WITH_NARINFO, + }; + + const PATH_INFO_DIGEST: [u8; 20] = [0; 20]; + + /// Helper function setting up an instance of a "far" and "near" + /// PathInfoService. + async fn create_pathinfoservice() -> super::Cache<LruPathInfoService, MemoryPathInfoService> { + // Create an instance of a "far" PathInfoService. + let far = MemoryPathInfoService::default(); + + // … and an instance of a "near" PathInfoService. + let near = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap()); + + // create a Pathinfoservice combining the two and return it. + super::Cache::new(near, far) + } + + /// Getting from the far backend is gonna insert it into the near one. + #[tokio::test] + async fn test_populate_cache() { + let svc = create_pathinfoservice().await; + + // query the PathInfo, things should not be there. + assert!(svc.get(PATH_INFO_DIGEST).await.unwrap().is_none()); + + // insert it into the far one. 
+        svc.far.put(PATH_INFO_WITH_NARINFO.clone()).await.unwrap();
+
+        // now try getting it again, it should succeed.
+        assert_eq!(
+            Some(PATH_INFO_WITH_NARINFO.clone()),
+            svc.get(PATH_INFO_DIGEST).await.unwrap()
+        );
+
+        // peek near, it should now be there.
+        assert_eq!(
+            Some(PATH_INFO_WITH_NARINFO.clone()),
+            svc.near.get(PATH_INFO_DIGEST).await.unwrap()
+        );
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
new file mode 100644
index 000000000000..455909e7f235
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -0,0 +1,236 @@
+use crate::proto::path_info_service_client::PathInfoServiceClient;
+
+use super::{
+    GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService,
+    SledPathInfoService,
+};
+
+use nix_compat::narinfo;
+use std::sync::Arc;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
+use url::Url;
+
+/// Constructs a new instance of a [PathInfoService] from a URI.
+///
+/// The following URIs are supported:
+/// - `memory:`
+/// Uses an in-memory implementation.
+/// - `sled:`
+/// Uses an in-memory sled implementation.
+/// - `sled:///absolute/path/to/somewhere`
+/// Uses sled, using a path on the disk for persistence. Can only be opened
+/// by one process at a time.
+/// - `nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=`
+/// Exposes the Nix binary cache as a PathInfoService, ingesting NARs into the
+/// {Blob,Directory}Service. You almost certainly want to use this with some cache.
+/// The `trusted-public-keys` URL parameter can be provided, which will then
+/// enable signature verification.
+/// - `grpc+unix:///absolute/path/to/somewhere`
+/// Connects to a local tvix-store gRPC service via Unix socket.
+/// - `grpc+http://host:port`, `grpc+https://host:port`
+/// Connects to a (remote) tvix-store gRPC service.
+///
+/// As the [PathInfoService] needs to talk to [BlobService] and [DirectoryService],
+/// these also need to be passed in.
+pub async fn from_addr(
+    uri: &str,
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) -> Result<Box<dyn PathInfoService>, Error> {
+    #[allow(unused_mut)]
+    let mut url =
+        Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
+
+    let path_info_service: Box<dyn PathInfoService> = match url.scheme() {
+        "memory" => {
+            // memory doesn't support host or path in the URL.
+            if url.has_host() || !url.path().is_empty() {
+                return Err(Error::StorageError("invalid url".to_string()));
+            }
+            Box::<MemoryPathInfoService>::default()
+        }
+        "sled" => {
+            // sled doesn't support host, and a path can be provided (otherwise
+            // it'll live in memory only).
+            if url.has_host() {
+                return Err(Error::StorageError("no host allowed".to_string()));
+            }
+
+            if url.path() == "/" {
+                return Err(Error::StorageError(
+                    "cowardly refusing to open / with sled".to_string(),
+                ));
+            }
+
+            // TODO: expose other parameters as URL parameters?
+
+            Box::new(if url.path().is_empty() {
+                SledPathInfoService::new_temporary()
+                    .map_err(|e| Error::StorageError(e.to_string()))?
+            } else {
+                SledPathInfoService::new(url.path())
+                    .map_err(|e| Error::StorageError(e.to_string()))?
+            })
+        }
+        "nix+http" | "nix+https" => {
+            // Stringify the URL and remove the nix+ prefix.
+            // We can't use `url.set_scheme(rest)`, as it disallows
+            // setting something http(s) that previously wasn't.
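As a standalone illustration of the re-parsing step just below (a sketch using only the `url` crate, which is already a dependency):

```rust
use url::Url;

fn main() {
    // `nix+https` is not a "special" scheme for the url crate, and
    // Url::set_scheme refuses to switch a non-special scheme over to
    // http(s), so the URL is stringified and re-parsed instead.
    let url = Url::parse("nix+https://cache.nixos.org").unwrap();
    let inner = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap();
    assert_eq!(inner.scheme(), "https");
}
```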
+            let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap();
+
+            let mut nix_http_path_info_service =
+                NixHTTPPathInfoService::new(new_url, blob_service, directory_service);
+
+            let pairs = &url.query_pairs();
+            for (k, v) in pairs.into_iter() {
+                if k == "trusted-public-keys" {
+                    let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect();
+
+                    let mut pubkeys: Vec<narinfo::PubKey> = Vec::with_capacity(pubkey_strs.len());
+                    for pubkey_str in pubkey_strs {
+                        pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| {
+                            Error::StorageError(format!("invalid public key: {e}"))
+                        })?);
+                    }
+
+                    nix_http_path_info_service.set_public_keys(pubkeys);
+                }
+            }
+
+            Box::new(nix_http_path_info_service)
+        }
+        scheme if scheme.starts_with("grpc+") => {
+            // schemes starting with grpc+ go to the GRPCPathInfoService.
+            //   That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
+            // - In the case of unix sockets, there must be a path, but there may not be a host.
+            // - In the case of non-unix sockets, there must be a host, but no path.
+            // Constructing the channel is handled by tvix_castore::channel::from_url.
+            let client =
+                PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?);
+            Box::new(GRPCPathInfoService::from_client(client))
+        }
+        #[cfg(feature = "cloud")]
+        "bigtable" => {
+            use super::bigtable::BigtableParameters;
+            use super::BigtablePathInfoService;
+
+            // parse the instance name from the hostname.
+            let instance_name = url
+                .host_str()
+                .ok_or_else(|| Error::StorageError("instance name missing".into()))?
+                .to_string();
+
+            // … but add it to the query string now, so we just need to parse that.
+            url.query_pairs_mut()
+                .append_pair("instance_name", &instance_name);
+
+            let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
+                .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
+
+            Box::new(
+                BigtablePathInfoService::connect(params)
+                    .await
+                    .map_err(|e| Error::StorageError(e.to_string()))?,
+            )
+        }
+        _ => Err(Error::StorageError(format!(
+            "unknown scheme: {}",
+            url.scheme()
+        )))?,
+    };
+
+    Ok(path_info_service)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::from_addr;
+    use lazy_static::lazy_static;
+    use rstest::rstest;
+    use std::sync::Arc;
+    use tempfile::TempDir;
+    use tvix_castore::{
+        blobservice::{BlobService, MemoryBlobService},
+        directoryservice::{DirectoryService, MemoryDirectoryService},
+    };
+
+    lazy_static! {
+        static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
+        static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap();
+    }
+
+    // the gRPC tests below don't fail, because we connect lazily.
+
+    #[rstest]
+    /// This uses an unsupported scheme.
+    #[case::unsupported_scheme("http://foo.example/test", false)]
+    /// This configures sled in temporary mode.
+    #[case::sled_temporary("sled://", true)]
+    /// This configures sled with /, which should fail.
+    #[case::sled_invalid_root("sled:///", false)]
+    /// This configures sled with a host, not path, which should fail.
+    #[case::sled_invalid_host("sled://foo.example", false)]
+    /// This configures sled with a valid path, which should succeed.
+    #[case::sled_valid_path(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true)]
+    /// This configures sled with a host and a valid path, which should fail.
+ #[case::sled_invalid_host_with_valid_path(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false)] + /// This correctly sets the scheme, and doesn't set a path. + #[case::memory_valid("memory://", true)] + /// This sets a memory url host to `foo` + #[case::memory_invalid_host("memory://foo", false)] + /// This sets a memory url path to "/", which is invalid. + #[case::memory_invalid_root_path("memory:///", false)] + /// This sets a memory url path to "/foo", which is invalid. + #[case::memory_invalid_root_path_foo("memory:///foo", false)] + /// Correct Scheme for the cache.nixos.org binary cache. + #[case::correct_nix_https("nix+https://cache.nixos.org", true)] + /// Correct Scheme for the cache.nixos.org binary cache (HTTP URL). + #[case::correct_nix_http("nix+http://cache.nixos.org", true)] + /// Correct Scheme for Nix HTTP Binary cache, with a subpath. + #[case::correct_nix_http_with_subpath("nix+http://192.0.2.1/foo", true)] + /// Correct Scheme for Nix HTTP Binary cache, with a subpath and port. + #[case::correct_nix_http_with_subpath_and_port("nix+http://[::1]:8080/foo", true)] + /// Correct Scheme for the cache.nixos.org binary cache, and correct trusted public key set + #[case::correct_nix_https_with_trusted_public_key("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=", true)] + /// Correct Scheme for the cache.nixos.org binary cache, and two correct trusted public keys set + #[case::correct_nix_https_with_two_trusted_public_keys("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=%20foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=", true)] + /// Correct scheme to connect to a unix socket. + #[case::grpc_valid_unix_socket("grpc+unix:///path/to/somewhere", true)] + /// Correct scheme for unix socket, but setting a host too, which is invalid. + #[case::grpc_invalid_unix_socket_and_host("grpc+unix://host.example/path/to/somewhere", false)] + /// Correct scheme to connect to localhost, with port 12345 + #[case::grpc_valid_ipv6_localhost_port_12345("grpc+http://[::1]:12345", true)] + /// Correct scheme to connect to localhost over http, without specifying a port. + #[case::grpc_valid_http_host_without_port("grpc+http://localhost", true)] + /// Correct scheme to connect to localhost over http, without specifying a port. + #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)] + /// Correct scheme to connect to localhost over http, but with additional path, which is invalid. + #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)] + /// A valid example for Bigtable. 
+    #[cfg_attr(
+        all(feature = "cloud", feature = "integration"),
+        case::bigtable_valid(
+            "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1",
+            true
+        )
+    )]
+    /// An invalid example for Bigtable, missing fields
+    #[cfg_attr(
+        all(feature = "cloud", feature = "integration"),
+        case::bigtable_invalid_missing_fields("bigtable://instance-1", false)
+    )]
+    #[tokio::test]
+    async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) {
+        let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
+        let directory_service: Arc<dyn DirectoryService> =
+            Arc::from(MemoryDirectoryService::default());
+
+        let resp = from_addr(uri_str, blob_service, directory_service).await;
+
+        if exp_succeed {
+            resp.expect("should succeed");
+        } else {
+            assert!(resp.is_err(), "should fail");
+        }
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/fs/mod.rs b/tvix/store/src/pathinfoservice/fs/mod.rs
new file mode 100644
index 000000000000..aa64b1c01f16
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/fs/mod.rs
@@ -0,0 +1,81 @@
+use futures::stream::BoxStream;
+use futures::StreamExt;
+use tonic::async_trait;
+use tvix_castore::fs::{RootNodes, TvixStoreFs};
+use tvix_castore::proto as castorepb;
+use tvix_castore::Error;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
+
+use super::PathInfoService;
+
+/// Helper to construct a [TvixStoreFs] from a [BlobService], [DirectoryService]
+/// and [PathInfoService].
+/// This saves users from having to interact with the wrapper struct directly, as
+/// it leaks into the type signature of TvixStoreFs.
+pub fn make_fs<BS, DS, PS>(
+    blob_service: BS,
+    directory_service: DS,
+    path_info_service: PS,
+    list_root: bool,
+    show_xattr: bool,
+) -> TvixStoreFs<BS, DS, RootNodesWrapper<PS>>
+where
+    BS: AsRef<dyn BlobService> + Send + Clone + 'static,
+    DS: AsRef<dyn DirectoryService> + Send + Clone + 'static,
+    PS: AsRef<dyn PathInfoService> + Send + Sync + Clone + 'static,
+{
+    TvixStoreFs::new(
+        blob_service,
+        directory_service,
+        RootNodesWrapper(path_info_service),
+        list_root,
+        show_xattr,
+    )
+}
+
+/// Wrapper to satisfy Rust's orphan rules for trait implementations, as
+/// RootNodes is coming from the [tvix-castore] crate.
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct RootNodesWrapper<T>(pub(crate) T);
+
+/// Implements root node lookup for any [PathInfoService]. This represents a flat
+/// directory structure like /nix/store where each entry in the root filesystem
+/// directory corresponds to a CA node.
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+#[async_trait]
+impl<T> RootNodes for RootNodesWrapper<T>
+where
+    T: AsRef<dyn PathInfoService> + Send + Sync,
+{
+    async fn get_by_basename(&self, name: &[u8]) -> Result<Option<castorepb::node::Node>, Error> {
+        let Ok(store_path) = nix_compat::store_path::StorePath::from_bytes(name) else {
+            return Ok(None);
+        };
+
+        Ok(self
+            .0
+            .as_ref()
+            .get(*store_path.digest())
+            .await?
+ .map(|path_info| { + path_info + .node + .expect("missing root node") + .node + .expect("empty node") + })) + } + + fn list(&self) -> BoxStream<Result<castorepb::node::Node, Error>> { + Box::pin(self.0.as_ref().list().map(|result| { + result.map(|path_info| { + path_info + .node + .expect("missing root node") + .node + .expect("empty node") + }) + })) + } +} diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs new file mode 100644 index 000000000000..f6a356cf180a --- /dev/null +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -0,0 +1,154 @@ +use super::PathInfoService; +use crate::{ + nar::NarCalculationService, + proto::{self, ListPathInfoRequest, PathInfo}, +}; +use async_stream::try_stream; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use tonic::{async_trait, transport::Channel, Code}; +use tracing::instrument; +use tvix_castore::{proto as castorepb, Error}; + +/// Connects to a (remote) tvix-store PathInfoService over gRPC. +#[derive(Clone)] +pub struct GRPCPathInfoService { + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, +} + +impl GRPCPathInfoService { + /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, + ) -> Self { + Self { grpc_client } + } +} + +#[async_trait] +impl PathInfoService for GRPCPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let path_info = self + .grpc_client + .clone() + .get(proto::GetPathInfoRequest { + by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash( + digest.to_vec().into(), + )), + }) + .await; + + match path_info { + Ok(path_info) => { + let path_info = path_info.into_inner(); + + path_info + .validate() + .map_err(|e| Error::StorageError(format!("invalid pathinfo: {}", e)))?; + + Ok(Some(path_info)) + } + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + let path_info = self + .grpc_client + .clone() + .put(path_info) + .await + .map_err(|e| Error::StorageError(e.to_string()))? + .into_inner(); + + Ok(path_info) + } + + #[instrument(level = "trace", skip_all)] + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let mut grpc_client = self.grpc_client.clone(); + + let stream = try_stream! 
{ + let resp = grpc_client.list(ListPathInfoRequest::default()).await; + + let mut stream = resp.map_err(|e| Error::StorageError(e.to_string()))?.into_inner(); + + loop { + match stream.message().await { + Ok(o) => match o { + Some(pathinfo) => { + // validate the pathinfo + if let Err(e) = pathinfo.validate() { + Err(Error::StorageError(format!( + "pathinfo {:?} failed validation: {}", + pathinfo, e + )))?; + } + yield pathinfo + } + None => { + return; + }, + }, + Err(e) => Err(Error::StorageError(e.to_string()))?, + } + } + }; + + Box::pin(stream) + } +} + +#[async_trait] +impl NarCalculationService for GRPCPathInfoService { + #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))] + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + let path_info = self + .grpc_client + .clone() + .calculate_nar(castorepb::Node { + node: Some(root_node.clone()), + }) + .await + .map_err(|e| Error::StorageError(e.to_string()))? + .into_inner(); + + let nar_sha256: [u8; 32] = path_info + .nar_sha256 + .to_vec() + .try_into() + .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; + + Ok((path_info.nar_size, nar_sha256)) + } +} + +#[cfg(test)] +mod tests { + use crate::pathinfoservice::tests::make_grpc_path_info_service_client; + use crate::pathinfoservice::PathInfoService; + use crate::tests::fixtures; + + /// This ensures connecting via gRPC works as expected. + #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let (_blob_service, _directory_service, path_info_service) = + make_grpc_path_info_service_client().await; + + let path_info = path_info_service + .get(fixtures::DUMMY_PATH_DIGEST) + .await + .expect("must not be error"); + + assert!(path_info.is_none()); + } +} diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs new file mode 100644 index 000000000000..da674f497ad6 --- /dev/null +++ b/tvix/store/src/pathinfoservice/lru.rs @@ -0,0 +1,128 @@ +use async_stream::try_stream; +use futures::stream::BoxStream; +use lru::LruCache; +use nix_compat::nixbase32; +use std::num::NonZeroUsize; +use std::sync::Arc; +use tokio::sync::RwLock; +use tonic::async_trait; +use tracing::instrument; + +use crate::proto::PathInfo; +use tvix_castore::Error; + +use super::PathInfoService; + +pub struct LruPathInfoService { + lru: Arc<RwLock<LruCache<[u8; 20], PathInfo>>>, +} + +impl LruPathInfoService { + pub fn with_capacity(capacity: NonZeroUsize) -> Self { + Self { + lru: Arc::new(RwLock::new(LruCache::new(capacity))), + } + } +} + +#[async_trait] +impl PathInfoService for LruPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + Ok(self.lru.write().await.get(&digest).cloned()) + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // call validate + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("invalid PathInfo: {}", e)))?; + + self.lru + .write() + .await + .put(*store_path.digest(), path_info.clone()); + + Ok(path_info) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let lru = self.lru.clone(); + Box::pin(try_stream! 
{ + let lru = lru.read().await; + let it = lru.iter(); + + for (_k,v) in it { + yield v.clone() + } + }) + } +} + +#[cfg(test)] +mod test { + use std::num::NonZeroUsize; + + use crate::{ + pathinfoservice::{LruPathInfoService, PathInfoService}, + proto::PathInfo, + tests::fixtures::PATH_INFO_WITH_NARINFO, + }; + use lazy_static::lazy_static; + use tvix_castore::proto as castorepb; + + lazy_static! { + static ref PATHINFO_1: PathInfo = PATH_INFO_WITH_NARINFO.clone(); + static ref PATHINFO_1_DIGEST: [u8; 20] = [0; 20]; + static ref PATHINFO_2: PathInfo = { + let mut p = PATHINFO_1.clone(); + let root_node = p.node.as_mut().unwrap(); + if let castorepb::Node { node: Some(node) } = root_node { + let n = node.to_owned(); + *node = n.rename("11111111111111111111111111111111-dummy2".into()); + } else { + unreachable!() + } + p + }; + static ref PATHINFO_2_DIGEST: [u8; 20] = *(PATHINFO_2.validate().unwrap()).digest(); + } + + #[tokio::test] + async fn evict() { + let svc = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap()); + + // pathinfo_1 should not be there + assert!(svc + .get(*PATHINFO_1_DIGEST) + .await + .expect("no error") + .is_none()); + + // insert it + svc.put(PATHINFO_1.clone()).await.expect("no error"); + + // now it should be there. + assert_eq!( + Some(PATHINFO_1.clone()), + svc.get(*PATHINFO_1_DIGEST).await.expect("no error") + ); + + // insert pathinfo_2. This will evict pathinfo 1 + svc.put(PATHINFO_2.clone()).await.expect("no error"); + + // now pathinfo 2 should be there. + assert_eq!( + Some(PATHINFO_2.clone()), + svc.get(*PATHINFO_2_DIGEST).await.expect("no error") + ); + + // … but pathinfo 1 not anymore. + assert!(svc + .get(*PATHINFO_1_DIGEST) + .await + .expect("no error") + .is_none()); + } +} diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs new file mode 100644 index 000000000000..3de3221df27e --- /dev/null +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -0,0 +1,61 @@ +use super::PathInfoService; +use crate::proto::PathInfo; +use async_stream::try_stream; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; +use tonic::async_trait; +use tracing::instrument; +use tvix_castore::Error; + +#[derive(Default)] +pub struct MemoryPathInfoService { + db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>, +} + +#[async_trait] +impl PathInfoService for MemoryPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let db = self.db.read().await; + + match db.get(&digest) { + None => Ok(None), + Some(path_info) => Ok(Some(path_info.clone())), + } + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Call validate on the received PathInfo message. + match path_info.validate() { + Err(e) => Err(Error::InvalidRequest(format!( + "failed to validate PathInfo: {}", + e + ))), + + // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. + // This overwrites existing PathInfo objects. + Ok(nix_path) => { + let mut db = self.db.write().await; + db.insert(*nix_path.digest(), path_info.clone()); + + Ok(path_info) + } + } + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let db = self.db.clone(); + + Box::pin(try_stream! 
{ + let db = db.read().await; + let it = db.iter(); + + for (_k, v) in it { + yield v.clone() + } + }) + } +} diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs new file mode 100644 index 000000000000..574bcc0b8b88 --- /dev/null +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -0,0 +1,73 @@ +mod combinators; +mod from_addr; +mod grpc; +mod lru; +mod memory; +mod nix_http; +mod sled; + +#[cfg(any(feature = "fuse", feature = "virtiofs"))] +mod fs; + +#[cfg(test)] +mod tests; + +use futures::stream::BoxStream; +use tonic::async_trait; +use tvix_castore::Error; + +use crate::proto::PathInfo; + +pub use self::combinators::Cache as CachePathInfoService; +pub use self::from_addr::from_addr; +pub use self::grpc::GRPCPathInfoService; +pub use self::lru::LruPathInfoService; +pub use self::memory::MemoryPathInfoService; +pub use self::nix_http::NixHTTPPathInfoService; +pub use self::sled::SledPathInfoService; + +#[cfg(feature = "cloud")] +mod bigtable; +#[cfg(feature = "cloud")] +pub use self::bigtable::BigtablePathInfoService; + +#[cfg(any(feature = "fuse", feature = "virtiofs"))] +pub use self::fs::make_fs; + +/// The base trait all PathInfo services need to implement. +#[async_trait] +pub trait PathInfoService: Send + Sync { + /// Retrieve a PathInfo message by the output digest. + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error>; + + /// Store a PathInfo message. Implementations MUST call validate and reject + /// invalid messages. + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>; + + /// Iterate over all PathInfo objects in the store. + /// Implementations can decide to disallow listing. + /// + /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily, + /// and the box allows different underlying stream implementations to be returned since + /// Rust doesn't support this as a generic in traits yet. This is the same thing that + /// [async_trait] generates, but for streams instead of futures. + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>; +} + +#[async_trait] +impl<A> PathInfoService for A +where + A: AsRef<dyn PathInfoService> + Send + Sync, +{ + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + self.as_ref().get(digest).await + } + + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + self.as_ref().put(path_info).await + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + self.as_ref().list() + } +} diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs new file mode 100644 index 000000000000..cccd4805c6ca --- /dev/null +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -0,0 +1,266 @@ +use futures::{stream::BoxStream, TryStreamExt}; +use nix_compat::{ + narinfo::{self, NarInfo}, + nixbase32, + nixhash::NixHash, +}; +use reqwest::StatusCode; +use sha2::Digest; +use std::io::{self, Write}; +use tokio::io::{AsyncRead, BufReader}; +use tokio_util::io::InspectReader; +use tonic::async_trait; +use tracing::{debug, instrument, warn}; +use tvix_castore::{ + blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, +}; + +use crate::proto::PathInfo; + +use super::PathInfoService; + +/// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache +/// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix +/// Store Model. 
+/// It implements the [PathInfoService] trait in an interesting way:
+/// Every [PathInfoService::get] fetches the .narinfo and referred NAR file,
+/// inserting components into a [BlobService] and [DirectoryService], then
+/// returning a [PathInfo] struct with the root.
+///
+/// Due to this being quite a costly operation, clients are expected to layer
+/// this service with store composition, so each path is only ingested once.
+///
+/// The client is expected to be (indirectly) using the same [BlobService] and
+/// [DirectoryService], and is thus able to fetch referred Directories and Blobs.
+/// [PathInfoService::put] is not implemented and returns an error if called.
+/// TODO: what about reading from nix-cache-info?
+pub struct NixHTTPPathInfoService<BS, DS> {
+    base_url: url::Url,
+    http_client: reqwest::Client,
+
+    blob_service: BS,
+    directory_service: DS,
+
+    /// An optional list of [narinfo::PubKey].
+    /// If set, the .narinfo files received need to have a correct signature from at least one of these.
+    public_keys: Option<Vec<narinfo::PubKey>>,
+}
+
+impl<BS, DS> NixHTTPPathInfoService<BS, DS> {
+    pub fn new(base_url: url::Url, blob_service: BS, directory_service: DS) -> Self {
+        Self {
+            base_url,
+            http_client: reqwest::Client::new(),
+            blob_service,
+            directory_service,
+
+            public_keys: None,
+        }
+    }
+
+    /// Configures [Self] to validate NARInfo fingerprints with the public keys passed.
+    pub fn set_public_keys(&mut self, public_keys: Vec<narinfo::PubKey>) {
+        self.public_keys = Some(public_keys);
+    }
+}
+
+#[async_trait]
+impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS>
+where
+    BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
+    DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
+{
+    #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let narinfo_url = self
+            .base_url
+            .join(&format!("{}.narinfo", nixbase32::encode(&digest)))
+            .map_err(|e| {
+                warn!(e = %e, "unable to join URL");
+                io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
+            })?;
+
+        debug!(narinfo_url= %narinfo_url, "constructed NARInfo url");
+
+        let resp = self
+            .http_client
+            .get(narinfo_url)
+            .send()
+            .await
+            .map_err(|e| {
+                warn!(e=%e,"unable to send NARInfo request");
+                io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    "unable to send NARInfo request",
+                )
+            })?;
+
+        // In the case of a 404, return a NotFound.
+        // We also return a NotFound in case of a 403 - this is to match the behaviour of Nix
+        // when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org.
+        if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
+            return Ok(None);
+        }
+
+        let narinfo_str = resp.text().await.map_err(|e| {
+            warn!(e=%e,"unable to decode response as string");
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "unable to decode response as string",
+            )
+        })?;
+
+        // parse the received narinfo
+        let narinfo = NarInfo::parse(&narinfo_str).map_err(|e| {
+            warn!(e=%e,"unable to parse response as NarInfo");
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "unable to parse response as NarInfo",
+            )
+        })?;
+
+        // if [self.public_keys] is set, ensure there's at least one valid signature.
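For context, a sketch of how a trusted key ends up in `self.public_keys` so the signature check just below becomes active. The key string is cache.nixos.org's well-known public key, as also shown in the `from_addr` documentation; the generic service handles are assumptions standing in for whatever the caller uses:

```rust
use nix_compat::narinfo;
use url::Url;

// `BS`/`DS` stand in for any BlobService/DirectoryService handles, as above.
fn verifying_service<BS, DS>(
    blob_service: BS,
    directory_service: DS,
) -> NixHTTPPathInfoService<BS, DS> {
    let base_url = Url::parse("https://cache.nixos.org").unwrap();

    // Parse a trusted public key and hand it to the service.
    let pubkey = narinfo::PubKey::parse(
        "cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=",
    )
    .expect("must parse");

    let mut svc = NixHTTPPathInfoService::new(base_url, blob_service, directory_service);
    svc.set_public_keys(vec![pubkey]);
    svc
}
```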
+ if let Some(public_keys) = &self.public_keys { + let fingerprint = narinfo.fingerprint(); + + if !public_keys.iter().any(|pubkey| { + narinfo + .signatures + .iter() + .any(|sig| pubkey.verify(&fingerprint, sig)) + }) { + warn!("no valid signature found"); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "no valid signature found", + ))?; + } + } + + // Convert to a (sparse) PathInfo. We still need to populate the node field, + // and for this we need to download the NAR file. + // FUTUREWORK: Keep some database around mapping from narsha256 to + // (unnamed) rootnode, so we can use that (and the name from the + // StorePath) and avoid downloading the same NAR a second time. + let pathinfo: PathInfo = (&narinfo).into(); + + // create a request for the NAR file itself. + let nar_url = self.base_url.join(narinfo.url).map_err(|e| { + warn!(e = %e, "unable to join URL"); + io::Error::new(io::ErrorKind::InvalidInput, "unable to join url") + })?; + debug!(nar_url= %nar_url, "constructed NAR url"); + + let resp = self + .http_client + .get(nar_url.clone()) + .send() + .await + .map_err(|e| { + warn!(e=%e,"unable to send NAR request"); + io::Error::new(io::ErrorKind::InvalidInput, "unable to send NAR request") + })?; + + // if the request is not successful, return an error. + if !resp.status().is_success() { + return Err(Error::StorageError(format!( + "unable to retrieve NAR at {}, status {}", + nar_url, + resp.status() + ))); + } + + // get a reader of the response body. + let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| { + let e = e.without_url(); + warn!(e=%e, "failed to get response body"); + io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) + })); + + // handle decompression, depending on the compression field. + let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression { + Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>, + Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some(comp_str) => { + return Err(Error::StorageError(format!( + "unsupported compression: {comp_str}" + ))); + } + }; + let mut nar_hash = sha2::Sha256::new(); + let mut nar_size = 0; + + // Assemble NarHash and NarSize as we read bytes. + let r = InspectReader::new(r, |b| { + nar_size += b.len() as u64; + nar_hash.write_all(b).unwrap(); + }); + + // HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors. + let mut r = BufReader::new(r); + + let root_node = crate::nar::ingest_nar( + self.blob_service.clone(), + self.directory_service.clone(), + &mut r, + ) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + // ensure the ingested narhash and narsize do actually match. 
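The `InspectReader` set up further above is what makes the single-pass verification below possible. A standalone sketch of that hash-while-reading pattern, with an in-memory byte slice standing in for the NAR stream:

```rust
use data_encoding::HEXLOWER;
use sha2::{Digest, Sha256};
use std::io::Write;
use tokio::io::AsyncReadExt;
use tokio_util::io::InspectReader;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut hasher = Sha256::new();
    let mut size: u64 = 0;

    // Every chunk read through the InspectReader is also fed to the
    // hasher and the size counter, so no second pass over the data is
    // needed to verify it.
    let inner: &[u8] = b"pretend this is a NAR stream";
    let mut reader = InspectReader::new(inner, |chunk| {
        size += chunk.len() as u64;
        hasher.write_all(chunk).unwrap();
    });

    let mut sink = Vec::new();
    reader.read_to_end(&mut sink).await?;
    drop(reader); // releases the borrows held by the inspect closure

    let digest: [u8; 32] = hasher.finalize().into();
    println!("read {} bytes, sha256 {}", size, HEXLOWER.encode(&digest));
    Ok(())
}
```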
+ if narinfo.nar_size != nar_size { + warn!( + narinfo.nar_size = narinfo.nar_size, + http.nar_size = nar_size, + "NarSize mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarSize mismatch".to_string(), + ))?; + } + let nar_hash: [u8; 32] = nar_hash.finalize().into(); + if narinfo.nar_hash != nar_hash { + warn!( + narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash), + http.nar_hash = %NixHash::Sha256(nar_hash), + "NarHash mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarHash mismatch".to_string(), + ))?; + } + + Ok(Some(PathInfo { + node: Some(castorepb::Node { + // set the name of the root node to the digest-name of the store path. + node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())), + }), + references: pathinfo.references, + narinfo: pathinfo.narinfo, + })) + } + + #[instrument(skip_all, fields(path_info=?_path_info))] + async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> { + Err(Error::InvalidRequest( + "put not supported for this backend".to_string(), + )) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + Box::pin(futures::stream::once(async { + Err(Error::InvalidRequest( + "list not supported for this backend".to_string(), + )) + })) + } +} diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs new file mode 100644 index 000000000000..eb3cf2ff1b88 --- /dev/null +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -0,0 +1,117 @@ +use super::PathInfoService; +use crate::proto::PathInfo; +use async_stream::try_stream; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use prost::Message; +use std::path::Path; +use tonic::async_trait; +use tracing::instrument; +use tracing::warn; +use tvix_castore::Error; + +/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). +/// +/// The PathInfo messages are stored as encoded protos, and keyed by their output hash, +/// as that's currently the only request type available. +pub struct SledPathInfoService { + db: sled::Db, +} + +impl SledPathInfoService { + pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> { + let config = sled::Config::default() + .use_compression(false) // is a required parameter + .path(p); + let db = config.open()?; + + Ok(Self { db }) + } + + pub fn new_temporary() -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { db }) + } +} + +#[async_trait] +impl PathInfoService for SledPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let resp = tokio::task::spawn_blocking({ + let db = self.db.clone(); + move || db.get(digest.as_slice()) + }) + .await? + .map_err(|e| { + warn!("failed to retrieve PathInfo: {}", e); + Error::StorageError(format!("failed to retrieve PathInfo: {}", e)) + })?; + match resp { + None => Ok(None), + Some(data) => { + let path_info = PathInfo::decode(&*data).map_err(|e| { + warn!("failed to decode stored PathInfo: {}", e); + Error::StorageError(format!("failed to decode stored PathInfo: {}", e)) + })?; + Ok(Some(path_info)) + } + } + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Call validate on the received PathInfo message. 
+ let store_path = path_info
+ .validate()
+ .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))?;
+
+ // Since the PathInfo is valid, we were able to parse a StorePath.
+ // Store it in the database, keyed by its digest.
+ // This overwrites existing PathInfo objects.
+ tokio::task::spawn_blocking({
+ let db = self.db.clone();
+ let k = *store_path.digest();
+ let data = path_info.encode_to_vec();
+ move || db.insert(k, data)
+ })
+ .await?
+ .map_err(|e| {
+ warn!("failed to insert PathInfo: {}", e);
+ Error::StorageError(format!("failed to insert PathInfo: {}", e))
+ })?;
+
+ Ok(path_info)
+ }
+
+ fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+ let db = self.db.clone();
+ let mut it = db.iter().values();
+
+ Box::pin(try_stream! {
+ // Don't block the executor while waiting for .next(), so wrap that
+ // in a spawn_blocking call.
+ // We need to pass the iterator around to be able to reuse it.
+ while let (Some(elem), new_it) = tokio::task::spawn_blocking(move || {
+ (it.next(), it)
+ }).await? {
+ it = new_it;
+ let data = elem.map_err(|e| {
+ warn!("failed to retrieve PathInfo: {}", e);
+ Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+ })?;
+
+ let path_info = PathInfo::decode(&*data).map_err(|e| {
+ warn!("failed to decode stored PathInfo: {}", e);
+ Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+ })?;
+
+ yield path_info
+ }
+ })
+ }
+}
diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs
new file mode 100644
index 000000000000..061655e4bafc
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/tests/mod.rs
@@ -0,0 +1,72 @@
+//! This contains test scenarios that a given [PathInfoService] needs to pass.
+//! We use [rstest] and [rstest_reuse] to provide all services we want to test
+//! against, and then apply this template to all test functions.
+
+use rstest::*;
+use rstest_reuse::{self, *};
+
+use super::PathInfoService;
+use crate::pathinfoservice::MemoryPathInfoService;
+use crate::pathinfoservice::SledPathInfoService;
+use crate::proto::PathInfo;
+use crate::tests::fixtures::DUMMY_PATH_DIGEST;
+use tvix_castore::proto as castorepb;
+
+mod utils;
+pub use self::utils::make_grpc_path_info_service_client;
+
+#[cfg(all(feature = "cloud", feature = "integration"))]
+use self::utils::make_bigtable_path_info_service;
+
+#[template]
+#[rstest]
+#[case::memory(MemoryPathInfoService::default())]
+#[case::grpc({
+ let (_, _, svc) = make_grpc_path_info_service_client().await;
+ svc
+})]
+#[case::sled(SledPathInfoService::new_temporary().unwrap())]
+#[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(make_bigtable_path_info_service().await))]
+pub fn path_info_services(#[case] svc: impl PathInfoService) {}
+
+// FUTUREWORK: add more tests rejecting invalid PathInfo messages.
+// A subset of them should also ensure references to other PathInfos, or
+// elements in {Blob,Directory}Service do exist.
+
+/// Trying to get a non-existent PathInfo should return Ok(None).
+#[apply(path_info_services)]
+#[tokio::test]
+async fn not_found(svc: impl PathInfoService) {
+ assert!(svc
+ .get(DUMMY_PATH_DIGEST)
+ .await
+ .expect("must succeed")
+ .is_none());
+}
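For readers unfamiliar with `rstest_reuse`, the template/apply pattern above expands each annotated test once per `#[case]`. In isolation (hypothetical example, unrelated to the store):

```rust
use rstest::rstest;
use rstest_reuse::{self, *};

#[template]
#[rstest]
#[case(2)]
#[case(40)]
fn some_numbers(#[case] n: u32) {}

// #[apply] stamps the template's cases onto this test: it runs once with
// n = 2 and once with n = 40.
#[apply(some_numbers)]
fn is_even(#[case] n: u32) {
    assert_eq!(n % 2, 0);
}
```

+/// Put a PathInfo into the store, get it back.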
+#[apply(path_info_services)] +#[tokio::test] +async fn put_get(svc: impl PathInfoService) { + let path_info = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode { + name: "00000000000000000000000000000000-foo".into(), + target: "doesntmatter".into(), + })), + }), + ..Default::default() + }; + + // insert + let resp = svc.put(path_info.clone()).await.expect("must succeed"); + + // expect the returned PathInfo to be equal (for now) + // in the future, some stores might add additional fields/signatures. + assert_eq!(path_info, resp); + + // get it back + let resp = svc.get(DUMMY_PATH_DIGEST).await.expect("must succeed"); + + assert_eq!(Some(path_info), resp); +} diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs new file mode 100644 index 000000000000..ee170468d1d2 --- /dev/null +++ b/tvix/store/src/pathinfoservice/tests/utils.rs @@ -0,0 +1,75 @@ +use std::sync::Arc; + +use tonic::transport::{Endpoint, Server, Uri}; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; + +use crate::{ + nar::{NarCalculationService, SimpleRenderer}, + pathinfoservice::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService}, + proto::{ + path_info_service_client::PathInfoServiceClient, + path_info_service_server::PathInfoServiceServer, GRPCPathInfoServiceWrapper, + }, + tests::fixtures::{blob_service, directory_service}, +}; + +/// Constructs and returns a gRPC PathInfoService. +/// We also return memory-based {Blob,Directory}Service, +/// as the consumer of this function accepts a 3-tuple. +pub async fn make_grpc_path_info_service_client( +) -> (impl BlobService, impl DirectoryService, GRPCPathInfoService) { + let (left, right) = tokio::io::duplex(64); + + let blob_service = blob_service(); + let directory_service = directory_service(); + + // spin up a server, which will only connect once, to the left side. + tokio::spawn({ + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + async move { + let path_info_service: Arc<dyn PathInfoService> = + Arc::from(MemoryPathInfoService::default()); + let nar_calculation_service = + Box::new(SimpleRenderer::new(blob_service, directory_service)) + as Box<dyn NarCalculationService>; + + // spin up a new PathInfoService + let mut server = Server::builder(); + let router = server.add_service(PathInfoServiceServer::new( + GRPCPathInfoServiceWrapper::new(path_info_service, nar_calculation_service), + )); + + router + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) + .await + } + }); + + // Create a client, connecting to the right side. The URI is unused. 
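tonic insists on a URI even when the transport is handed to it manually; the endpoint below is a placeholder, and the bytes actually flow over the in-memory duplex created above. As a standalone sketch of how `tokio::io::duplex` behaves (not tvix code):

```rust
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};

async fn duplex_demo() -> std::io::Result<()> {
    // Both halves implement AsyncRead + AsyncWrite; writes on one side
    // become reads on the other, buffered up to 64 bytes in memory.
    let (mut client, mut server) = duplex(64);
    client.write_all(b"ping").await?;
    let mut buf = [0u8; 4];
    server.read_exact(&mut buf).await?;
    assert_eq!(&buf, b"ping");
    Ok(())
}
```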
+ let mut maybe_right = Some(right); + + let path_info_service = GRPCPathInfoService::from_client(PathInfoServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(right) } + })) + .await + .unwrap(), + )); + + (blob_service, directory_service, path_info_service) +} + +#[cfg(all(feature = "cloud", feature = "integration"))] +pub(crate) async fn make_bigtable_path_info_service( +) -> crate::pathinfoservice::BigtablePathInfoService { + use crate::pathinfoservice::bigtable::BigtableParameters; + use crate::pathinfoservice::BigtablePathInfoService; + + BigtablePathInfoService::connect(BigtableParameters::default_for_tests()) + .await + .unwrap() +} diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs new file mode 100644 index 000000000000..68f557567629 --- /dev/null +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -0,0 +1,124 @@ +use crate::nar::{NarCalculationService, RenderError}; +use crate::pathinfoservice::PathInfoService; +use crate::proto; +use futures::{stream::BoxStream, TryStreamExt}; +use std::ops::Deref; +use tonic::{async_trait, Request, Response, Result, Status}; +use tracing::{instrument, warn}; +use tvix_castore::proto as castorepb; + +pub struct GRPCPathInfoServiceWrapper<PS, NS> { + path_info_service: PS, + // FUTUREWORK: allow exposing without allowing listing + nar_calculation_service: NS, +} + +impl<PS, NS> GRPCPathInfoServiceWrapper<PS, NS> { + pub fn new(path_info_service: PS, nar_calculation_service: NS) -> Self { + Self { + path_info_service, + nar_calculation_service, + } + } +} + +#[async_trait] +impl<PS, NS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS, NS> +where + PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static, + NS: NarCalculationService + Send + Sync + 'static, +{ + type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>; + + #[instrument(skip_all)] + async fn get( + &self, + request: Request<proto::GetPathInfoRequest>, + ) -> Result<Response<proto::PathInfo>> { + match request.into_inner().by_what { + None => Err(Status::unimplemented("by_what needs to be specified")), + Some(proto::get_path_info_request::ByWhat::ByOutputHash(output_digest)) => { + let digest: [u8; 20] = output_digest + .to_vec() + .try_into() + .map_err(|_e| Status::invalid_argument("invalid output digest length"))?; + match self.path_info_service.get(digest).await { + Ok(None) => Err(Status::not_found("PathInfo not found")), + Ok(Some(path_info)) => Ok(Response::new(path_info)), + Err(e) => { + warn!(err = %e, "failed to get PathInfo"); + Err(e.into()) + } + } + } + } + } + + #[instrument(skip_all)] + async fn put(&self, request: Request<proto::PathInfo>) -> Result<Response<proto::PathInfo>> { + let path_info = request.into_inner(); + + // Store the PathInfo in the client. Clients MUST validate the data + // they receive, so we don't validate additionally here. 
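The handler below simply forwards to the configured backend. From the other side of the wire, a remote caller reaches it through the tonic-generated stub; a hypothetical client-side sketch, assuming an established channel:

```rust
use crate::proto::path_info_service_client::PathInfoServiceClient;

// Upload a PathInfo over gRPC and return the (possibly augmented) response.
async fn put_remote(
    channel: tonic::transport::Channel,
    path_info: crate::proto::PathInfo,
) -> Result<crate::proto::PathInfo, tonic::Status> {
    let mut client = PathInfoServiceClient::new(channel);
    Ok(client.put(path_info).await?.into_inner())
}
```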
+ match self.path_info_service.put(path_info).await { + Ok(path_info_new) => Ok(Response::new(path_info_new)), + Err(e) => { + warn!(err = %e, "failed to put PathInfo"); + Err(e.into()) + } + } + } + + #[instrument(skip_all)] + async fn calculate_nar( + &self, + request: Request<castorepb::Node>, + ) -> Result<Response<proto::CalculateNarResponse>> { + match request.into_inner().node { + None => Err(Status::invalid_argument("no root node sent")), + Some(root_node) => { + if let Err(e) = root_node.validate() { + warn!(err = %e, "invalid root node"); + Err(Status::invalid_argument("invalid root node"))? + } + + match self.nar_calculation_service.calculate_nar(&root_node).await { + Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse { + nar_size, + nar_sha256: nar_sha256.to_vec().into(), + })), + Err(e) => { + warn!(err = %e, "error during NAR calculation"); + Err(e.into()) + } + } + } + } + } + + #[instrument(skip_all, err)] + async fn list( + &self, + _request: Request<proto::ListPathInfoRequest>, + ) -> Result<Response<Self::ListStream>, Status> { + let stream = Box::pin( + self.path_info_service + .list() + .map_err(|e| Status::internal(e.to_string())), + ); + + Ok(Response::new(Box::pin(stream))) + } +} + +impl From<RenderError> for tonic::Status { + fn from(value: RenderError) -> Self { + match value { + RenderError::BlobNotFound(_, _) => Self::not_found(value.to_string()), + RenderError::DirectoryNotFound(_, _) => Self::not_found(value.to_string()), + RenderError::NARWriterError(_) => Self::internal(value.to_string()), + RenderError::StoreError(_) => Self::internal(value.to_string()), + RenderError::UnexpectedBlobMeta(_, _, _, _) => Self::internal(value.to_string()), + } + } +} diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs new file mode 100644 index 000000000000..a09839c8bdf9 --- /dev/null +++ b/tvix/store/src/proto/mod.rs @@ -0,0 +1,374 @@ +#![allow(clippy::derive_partial_eq_without_eq, non_snake_case)] +use bstr::ByteSlice; +use bytes::Bytes; +use data_encoding::BASE64; +// https://github.com/hyperium/tonic/issues/1056 +use nix_compat::{ + narinfo::Flags, + nixhash::{CAHash, NixHash}, + store_path::{self, StorePathRef}, +}; +use thiserror::Error; +use tvix_castore::proto::{self as castorepb, NamedNode, ValidateNodeError}; + +mod grpc_pathinfoservice_wrapper; + +pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper; + +tonic::include_proto!("tvix.store.v1"); + +#[cfg(feature = "tonic-reflection")] +/// Compiled file descriptors for implementing [gRPC +/// reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) with e.g. +/// [`tonic_reflection`](https://docs.rs/tonic-reflection). +pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix.store.v1"); + +#[cfg(test)] +mod tests; + +/// Errors that can occur during the validation of PathInfo messages. +#[derive(Debug, Error, PartialEq)] +pub enum ValidatePathInfoError { + /// Invalid length of a reference + #[error("Invalid length of digest at position {}, expected {}, got {}", .0, store_path::DIGEST_SIZE, .1)] + InvalidReferenceDigestLen(usize, usize), + + /// No node present + #[error("No node present")] + NoNodePresent, + + /// Node fails validation + #[error("Invalid root node: {:?}", .0.to_string())] + InvalidRootNode(ValidateNodeError), + + /// Invalid node name encountered. 
Root nodes in PathInfos have stricter name requirements.
+ #[error("Failed to parse {} as StorePath: {1}", .0.to_str_lossy())]
+ InvalidNodeName(Vec<u8>, store_path::Error),
+
+ /// The digest in narinfo.nar_sha256 has an invalid len.
+ #[error("Invalid narinfo.nar_sha256 length: expected {}, got {}", 32, .0)]
+ InvalidNarSha256DigestLen(usize),
+
+ /// The number of references in the narinfo.reference_names field does not match
+ /// the number of references in the .references field.
+ #[error("Inconsistent Number of References: {0} (references) vs {1} (narinfo)")]
+ InconsistentNumberOfReferences(usize, usize),
+
+ /// A string in narinfo.reference_names does not parse to a [store_path::StorePath].
+ #[error("Invalid reference_name at position {0}: {1}")]
+ InvalidNarinfoReferenceName(usize, String),
+
+ /// The digest in the parsed `.narinfo.reference_names[i]` does not match
+ /// the one in `.references[i]`.
+ #[error("digest in reference_name at position {} does not match digest in PathInfo, expected {}, got {}", .0, BASE64.encode(.1), BASE64.encode(.2))]
+ InconsistentNarinfoReferenceNameDigest(
+ usize,
+ [u8; store_path::DIGEST_SIZE],
+ [u8; store_path::DIGEST_SIZE],
+ ),
+
+ /// The deriver field is invalid.
+ #[error("deriver field is invalid: {0}")]
+ InvalidDeriverField(store_path::Error),
+}
+
+/// Parses a root node name.
+///
+/// On success, this returns the parsed [store_path::StorePathRef].
+/// On error, it returns an error generated from the supplied constructor.
+fn parse_node_name_root<E>(
+ name: &[u8],
+ err: fn(Vec<u8>, store_path::Error) -> E,
+) -> Result<store_path::StorePathRef<'_>, E> {
+ store_path::StorePathRef::from_bytes(name).map_err(|e| err(name.to_vec(), e))
+}
+
+impl PathInfo {
+ /// validate performs some checks on the PathInfo struct,
+ /// returning either a [store_path::StorePathRef] of the root node, or a
+ /// [ValidatePathInfoError].
+ pub fn validate(&self) -> Result<store_path::StorePathRef<'_>, ValidatePathInfoError> {
+ // ensure the references have the right number of bytes.
+ for (i, reference) in self.references.iter().enumerate() {
+ if reference.len() != store_path::DIGEST_SIZE {
+ return Err(ValidatePathInfoError::InvalidReferenceDigestLen(
+ i,
+ reference.len(),
+ ));
+ }
+ }
+
+ // If there is a narinfo field populated…
+ if let Some(narinfo) = &self.narinfo {
+ // ensure the nar_sha256 digest has the correct length.
+ if narinfo.nar_sha256.len() != 32 {
+ return Err(ValidatePathInfoError::InvalidNarSha256DigestLen(
+ narinfo.nar_sha256.len(),
+ ));
+ }
+
+ // ensure the number of references there matches PathInfo.references count.
+ if narinfo.reference_names.len() != self.references.len() {
+ return Err(ValidatePathInfoError::InconsistentNumberOfReferences(
+ self.references.len(),
+ narinfo.reference_names.len(),
+ ));
+ }
+
+ // parse references in reference_names.
+ for (i, reference_name_str) in narinfo.reference_names.iter().enumerate() {
+ // ensure they parse as (non-absolute) store paths
+ let reference_names_store_path = store_path::StorePath::from_bytes(
+ reference_name_str.as_bytes(),
+ )
+ .map_err(|_| {
+ ValidatePathInfoError::InvalidNarinfoReferenceName(
+ i,
+ reference_name_str.to_owned(),
+ )
+ })?;
+
+ // ensure their digest matches the one at self.references[i].
+ {
+ // This is safe, because we ensured the proper length earlier already.
+ let reference_digest = self.references[i].to_vec().try_into().unwrap(); + + if reference_names_store_path.digest() != &reference_digest { + return Err( + ValidatePathInfoError::InconsistentNarinfoReferenceNameDigest( + i, + reference_digest, + *reference_names_store_path.digest(), + ), + ); + } + } + + // If the Deriver field is populated, ensure it parses to a + // [store_path::StorePath]. + // We can't check for it to *not* end with .drv, as the .drv files produced by + // recursive Nix end with multiple .drv suffixes, and only one is popped when + // converting to this field. + if let Some(deriver) = &narinfo.deriver { + store_path::StorePathRef::from_name_and_digest(&deriver.name, &deriver.digest) + .map_err(ValidatePathInfoError::InvalidDeriverField)?; + } + } + } + + // Ensure there is a (root) node present, and it properly parses to a [store_path::StorePath]. + let root_nix_path = match &self.node { + None | Some(castorepb::Node { node: None }) => { + Err(ValidatePathInfoError::NoNodePresent)? + } + Some(castorepb::Node { node: Some(node) }) => { + node.validate() + .map_err(ValidatePathInfoError::InvalidRootNode)?; + // parse the name of the node itself and return + parse_node_name_root(node.get_name(), ValidatePathInfoError::InvalidNodeName)? + } + }; + + // return the root nix path + Ok(root_nix_path) + } + + /// With self and its store path name, this reconstructs a + /// [nix_compat::narinfo::NarInfo<'_>]. + /// It can be used to validate Signatures, or get back a (sparse) NarInfo + /// struct to prepare writing it out. + /// + /// It assumes self to be validated first, and will only return None if the + /// `narinfo` field is unpopulated. + /// + /// It does very little allocation (a Vec each for `signatures` and + /// `references`), the rest points to data owned elsewhere. + /// + /// Keep in mind this is not able to reconstruct all data present in the + /// NarInfo<'_>, as some of it is not stored at all: + /// - the `system`, `file_hash` and `file_size` fields are set to `None`. + /// - the URL is set to an empty string. + /// - Compression is set to "none" + /// + /// If you want to render it out to a string and be able to parse it back + /// in, at least URL *must* be set again. + pub fn to_narinfo<'a>( + &'a self, + store_path: store_path::StorePathRef<'a>, + ) -> Option<nix_compat::narinfo::NarInfo<'_>> { + let narinfo = &self.narinfo.as_ref()?; + + Some(nix_compat::narinfo::NarInfo { + flags: Flags::empty(), + store_path, + nar_hash: narinfo + .nar_sha256 + .as_ref() + .try_into() + .expect("invalid narhash"), + nar_size: narinfo.nar_size, + references: narinfo + .reference_names + .iter() + .map(|ref_name| { + // This shouldn't pass validation + StorePathRef::from_bytes(ref_name.as_bytes()).expect("invalid reference") + }) + .collect(), + signatures: narinfo + .signatures + .iter() + .map(|sig| { + nix_compat::narinfo::Signature::new( + &sig.name, + // This shouldn't pass validation + sig.data[..].try_into().expect("invalid signature len"), + ) + }) + .collect(), + ca: narinfo + .ca + .as_ref() + .map(|ca| ca.try_into().expect("invalid ca")), + system: None, + deriver: narinfo.deriver.as_ref().map(|deriver| { + StorePathRef::from_name_and_digest(&deriver.name, &deriver.digest) + .expect("invalid deriver") + }), + url: "", + compression: Some("none"), + file_hash: None, + file_size: None, + }) + } +} + +/// Errors that can occur when converting from a [nar_info::Ca] to a (stricter) +/// [nix_compat::nixhash::CAHash]. 
+#[derive(Debug, Error, PartialEq)] +pub enum ConvertCAError { + /// Invalid length of a reference + #[error("Invalid digest length '{0}' for type {1}")] + InvalidReferenceDigestLen(usize, &'static str), + /// Unknown Hash type + #[error("Unknown hash type: {0}")] + UnknownHashType(i32), +} + +impl TryFrom<&nar_info::Ca> for nix_compat::nixhash::CAHash { + type Error = ConvertCAError; + + fn try_from(value: &nar_info::Ca) -> Result<Self, Self::Error> { + Ok(match value.r#type { + typ if typ == nar_info::ca::Hash::FlatMd5 as i32 => { + Self::Flat(NixHash::Md5(value.digest[..].try_into().map_err(|_| { + ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatMd5") + })?)) + } + typ if typ == nar_info::ca::Hash::FlatSha1 as i32 => { + Self::Flat(NixHash::Sha1(value.digest[..].try_into().map_err( + |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha1"), + )?)) + } + typ if typ == nar_info::ca::Hash::FlatSha256 as i32 => { + Self::Flat(NixHash::Sha256(value.digest[..].try_into().map_err( + |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha256"), + )?)) + } + typ if typ == nar_info::ca::Hash::FlatSha512 as i32 => Self::Flat(NixHash::Sha512( + Box::new(value.digest[..].try_into().map_err(|_| { + ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha512") + })?), + )), + typ if typ == nar_info::ca::Hash::NarMd5 as i32 => { + Self::Nar(NixHash::Md5(value.digest[..].try_into().map_err(|_| { + ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarMd5") + })?)) + } + typ if typ == nar_info::ca::Hash::NarSha1 as i32 => { + Self::Nar(NixHash::Sha1(value.digest[..].try_into().map_err( + |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha1"), + )?)) + } + typ if typ == nar_info::ca::Hash::NarSha256 as i32 => { + Self::Nar(NixHash::Sha256(value.digest[..].try_into().map_err( + |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha256"), + )?)) + } + typ if typ == nar_info::ca::Hash::NarSha512 as i32 => Self::Nar(NixHash::Sha512( + Box::new(value.digest[..].try_into().map_err(|_| { + ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha512") + })?), + )), + typ => return Err(ConvertCAError::UnknownHashType(typ)), + }) + } +} + +impl From<&nix_compat::nixhash::CAHash> for nar_info::ca::Hash { + fn from(value: &nix_compat::nixhash::CAHash) -> Self { + match value { + CAHash::Flat(NixHash::Md5(_)) => nar_info::ca::Hash::FlatMd5, + CAHash::Flat(NixHash::Sha1(_)) => nar_info::ca::Hash::FlatSha1, + CAHash::Flat(NixHash::Sha256(_)) => nar_info::ca::Hash::FlatSha256, + CAHash::Flat(NixHash::Sha512(_)) => nar_info::ca::Hash::FlatSha512, + CAHash::Nar(NixHash::Md5(_)) => nar_info::ca::Hash::NarMd5, + CAHash::Nar(NixHash::Sha1(_)) => nar_info::ca::Hash::NarSha1, + CAHash::Nar(NixHash::Sha256(_)) => nar_info::ca::Hash::NarSha256, + CAHash::Nar(NixHash::Sha512(_)) => nar_info::ca::Hash::NarSha512, + CAHash::Text(_) => nar_info::ca::Hash::TextSha256, + } + } +} + +impl From<&nix_compat::nixhash::CAHash> for nar_info::Ca { + fn from(value: &nix_compat::nixhash::CAHash) -> Self { + nar_info::Ca { + r#type: Into::<nar_info::ca::Hash>::into(value) as i32, + digest: value.hash().digest_as_bytes().to_vec().into(), + } + } +} + +impl From<&nix_compat::narinfo::NarInfo<'_>> for NarInfo { + /// Converts from a NarInfo (returned from the NARInfo parser) to the proto- + /// level NarInfo struct. 
+ fn from(value: &nix_compat::narinfo::NarInfo<'_>) -> Self { + let signatures = value + .signatures + .iter() + .map(|sig| nar_info::Signature { + name: sig.name().to_string(), + data: Bytes::copy_from_slice(sig.bytes()), + }) + .collect(); + + NarInfo { + nar_size: value.nar_size, + nar_sha256: Bytes::copy_from_slice(&value.nar_hash), + signatures, + reference_names: value.references.iter().map(|r| r.to_string()).collect(), + deriver: value.deriver.as_ref().map(|sp| StorePath { + name: sp.name().to_owned(), + digest: Bytes::copy_from_slice(sp.digest()), + }), + ca: value.ca.as_ref().map(|ca| ca.into()), + } + } +} + +impl From<&nix_compat::narinfo::NarInfo<'_>> for PathInfo { + /// Converts from a NarInfo (returned from the NARInfo parser) to a PathInfo + /// struct with the node set to None. + fn from(value: &nix_compat::narinfo::NarInfo<'_>) -> Self { + Self { + node: None, + references: value + .references + .iter() + .map(|x| Bytes::copy_from_slice(x.digest())) + .collect(), + narinfo: Some(value.into()), + } + } +} diff --git a/tvix/store/src/proto/tests/mod.rs b/tvix/store/src/proto/tests/mod.rs new file mode 100644 index 000000000000..c9c670202740 --- /dev/null +++ b/tvix/store/src/proto/tests/mod.rs @@ -0,0 +1 @@ +mod pathinfo; diff --git a/tvix/store/src/proto/tests/pathinfo.rs b/tvix/store/src/proto/tests/pathinfo.rs new file mode 100644 index 000000000000..4d0834878d7c --- /dev/null +++ b/tvix/store/src/proto/tests/pathinfo.rs @@ -0,0 +1,431 @@ +use crate::proto::{nar_info::Signature, NarInfo, PathInfo, ValidatePathInfoError}; +use crate::tests::fixtures::*; +use bytes::Bytes; +use data_encoding::BASE64; +use nix_compat::nixbase32; +use nix_compat::store_path::{self, StorePathRef}; +use rstest::rstest; +use tvix_castore::proto as castorepb; + +#[rstest] +#[case::no_node(None, Err(ValidatePathInfoError::NoNodePresent))] +#[case::no_node_2(Some(castorepb::Node { node: None}), Err(ValidatePathInfoError::NoNodePresent))] + +fn validate_pathinfo( + #[case] node: Option<castorepb::Node>, + #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>, +) { + // construct the PathInfo object + let p = PathInfo { + node, + ..Default::default() + }; + + assert_eq!(exp_result, p.validate()); + + let err = p.validate().expect_err("validation should fail"); + assert!(matches!(err, ValidatePathInfoError::NoNodePresent)); +} + +#[rstest] +#[case::ok(castorepb::DirectoryNode { + name: DUMMY_PATH.into(), + digest: DUMMY_DIGEST.clone().into(), + size: 0, +}, Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()))] +#[case::invalid_digest_length(castorepb::DirectoryNode { + name: DUMMY_PATH.into(), + digest: Bytes::new(), + size: 0, +}, Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))))] +#[case::invalid_node_name_no_storepath(castorepb::DirectoryNode { + name: "invalid".into(), + digest: DUMMY_DIGEST.clone().into(), + size: 0, +}, Err(ValidatePathInfoError::InvalidNodeName( + "invalid".into(), + store_path::Error::InvalidLength +)))] +fn validate_directory( + #[case] directory_node: castorepb::DirectoryNode, + #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>, +) { + // construct the PathInfo object + let p = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Directory(directory_node)), + }), + ..Default::default() + }; + assert_eq!(exp_result, p.validate()); +} + +#[rstest] +#[case::ok( + castorepb::FileNode { + name: DUMMY_PATH.into(), + digest: DUMMY_DIGEST.clone().into(), + size: 0, + executable: 
false, + }, + Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()) +)] +#[case::invalid_digest_len( + castorepb::FileNode { + name: DUMMY_PATH.into(), + digest: Bytes::new(), + ..Default::default() + }, + Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))) +)] +#[case::invalid_node_name( + castorepb::FileNode { + name: "invalid".into(), + digest: DUMMY_DIGEST.clone().into(), + ..Default::default() + }, + Err(ValidatePathInfoError::InvalidNodeName( + "invalid".into(), + store_path::Error::InvalidLength + )) +)] +fn validate_file( + #[case] file_node: castorepb::FileNode, + #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>, +) { + // construct the PathInfo object + let p = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::File(file_node)), + }), + ..Default::default() + }; + assert_eq!(exp_result, p.validate()); +} + +#[rstest] +#[case::ok( + castorepb::SymlinkNode { + name: DUMMY_PATH.into(), + target: "foo".into(), + }, + Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()) +)] +#[case::invalid_node_name( + castorepb::SymlinkNode { + name: "invalid".into(), + target: "foo".into(), + }, + Err(ValidatePathInfoError::InvalidNodeName( + "invalid".into(), + store_path::Error::InvalidLength + )) +)] +fn validate_symlink( + #[case] symlink_node: castorepb::SymlinkNode, + #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>, +) { + // construct the PathInfo object + let p = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Symlink(symlink_node)), + }), + ..Default::default() + }; + assert_eq!(exp_result, p.validate()); +} + +/// Ensure parsing a correct PathInfo without narinfo populated succeeds. +#[test] +fn validate_references_without_narinfo_ok() { + assert!(PATH_INFO_WITHOUT_NARINFO.validate().is_ok()); +} + +/// Ensure parsing a correct PathInfo with narinfo populated succeeds. +#[test] +fn validate_references_with_narinfo_ok() { + assert!(PATH_INFO_WITH_NARINFO.validate().is_ok()); +} + +/// Create a PathInfo with a wrong digest length in narinfo.nar_sha256, and +/// ensure validation fails. +#[test] +fn validate_wrong_nar_sha256() { + let mut path_info = PATH_INFO_WITH_NARINFO.clone(); + path_info.narinfo.as_mut().unwrap().nar_sha256 = vec![0xbe, 0xef].into(); + + match path_info.validate().expect_err("must_fail") { + ValidatePathInfoError::InvalidNarSha256DigestLen(2) => {} + e => panic!("unexpected error: {:?}", e), + }; +} + +/// Create a PathInfo with a wrong count of narinfo.reference_names, +/// and ensure validation fails. +#[test] +fn validate_inconsistent_num_refs_fail() { + let mut path_info = PATH_INFO_WITH_NARINFO.clone(); + path_info.narinfo.as_mut().unwrap().reference_names = vec![]; + + match path_info.validate().expect_err("must_fail") { + ValidatePathInfoError::InconsistentNumberOfReferences(1, 0) => {} + e => panic!("unexpected error: {:?}", e), + }; +} + +/// Create a PathInfo with a wrong digest length in references. +#[test] +fn validate_invalid_reference_digest_len() { + let mut path_info = PATH_INFO_WITHOUT_NARINFO.clone(); + path_info.references.push(vec![0xff, 0xff].into()); + + match path_info.validate().expect_err("must fail") { + ValidatePathInfoError::InvalidReferenceDigestLen( + 1, // position + 2, // unexpected digest len + ) => {} + e => panic!("unexpected error: {:?}", e), + }; +} + +/// Create a PathInfo with a narinfo.reference_name[1] that is no valid store path. 
+#[test] +fn validate_invalid_narinfo_reference_name() { + let mut path_info = PATH_INFO_WITH_NARINFO.clone(); + + // This is invalid, as the store prefix is not part of reference_names. + path_info.narinfo.as_mut().unwrap().reference_names[0] = + "/nix/store/00000000000000000000000000000000-dummy".to_string(); + + match path_info.validate().expect_err("must fail") { + ValidatePathInfoError::InvalidNarinfoReferenceName(0, reference_name) => { + assert_eq!( + "/nix/store/00000000000000000000000000000000-dummy", + reference_name + ); + } + e => panic!("unexpected error: {:?}", e), + } +} + +/// Create a PathInfo with a narinfo.reference_name[0] that doesn't match references[0]. +#[test] +fn validate_inconsistent_narinfo_reference_name_digest() { + let mut path_info = PATH_INFO_WITH_NARINFO.clone(); + + // mutate the first reference, they were all zeroes before + path_info.references[0] = vec![0xff; store_path::DIGEST_SIZE].into(); + + match path_info.validate().expect_err("must fail") { + ValidatePathInfoError::InconsistentNarinfoReferenceNameDigest(0, e_expected, e_actual) => { + assert_eq!(path_info.references[0][..], e_expected[..]); + assert_eq!(DUMMY_PATH_DIGEST, e_actual); + } + e => panic!("unexpected error: {:?}", e), + } +} + +/// Create a node with an empty symlink target, and ensure it fails validation. +#[test] +fn validate_symlink_empty_target_invalid() { + let node = castorepb::node::Node::Symlink(castorepb::SymlinkNode { + name: "foo".into(), + target: "".into(), + }); + + node.validate().expect_err("must fail validation"); +} + +/// Create a node with a symlink target including null bytes, and ensure it +/// fails validation. +#[test] +fn validate_symlink_target_null_byte_invalid() { + let node = castorepb::node::Node::Symlink(castorepb::SymlinkNode { + name: "foo".into(), + target: "foo\0".into(), + }); + + node.validate().expect_err("must fail validation"); +} + +/// Create a PathInfo with a correct deriver field and ensure it succeeds. +#[test] +fn validate_valid_deriver() { + let mut path_info = PATH_INFO_WITH_NARINFO.clone(); + + // add a valid deriver + let narinfo = path_info.narinfo.as_mut().unwrap(); + narinfo.deriver = Some(crate::proto::StorePath { + name: "foo".to_string(), + digest: Bytes::from(DUMMY_PATH_DIGEST.as_slice()), + }); + + path_info.validate().expect("must validate"); +} + +/// Create a PathInfo with a broken deriver field and ensure it fails. 
+#[test] +fn validate_invalid_deriver() { + let mut path_info = PATH_INFO_WITH_NARINFO.clone(); + + // add a broken deriver (invalid digest) + let narinfo = path_info.narinfo.as_mut().unwrap(); + narinfo.deriver = Some(crate::proto::StorePath { + name: "foo".to_string(), + digest: vec![].into(), + }); + + match path_info.validate().expect_err("must fail validation") { + ValidatePathInfoError::InvalidDeriverField(_) => {} + e => panic!("unexpected error: {:?}", e), + } +} + +#[test] +fn from_nixcompat_narinfo() { + let narinfo_parsed = nix_compat::narinfo::NarInfo::parse( + r#"StorePath: /nix/store/s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1 +URL: nar/1nhgq6wcggx0plpy4991h3ginj6hipsdslv4fd4zml1n707j26yq.nar.xz +Compression: xz +FileHash: sha256:1nhgq6wcggx0plpy4991h3ginj6hipsdslv4fd4zml1n707j26yq +FileSize: 50088 +NarHash: sha256:0yzhigwjl6bws649vcs2asa4lbs8hg93hyix187gc7s7a74w5h80 +NarSize: 226488 +References: 3n58xw4373jp0ljirf06d8077j15pc4j-glibc-2.37-8 s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1 +Deriver: ib3sh3pcz10wsmavxvkdbayhqivbghlq-hello-2.12.1.drv +Sig: cache.nixos.org-1:8ijECciSFzWHwwGVOIVYdp2fOIOJAfmzGHPQVwpktfTQJF6kMPPDre7UtFw3o+VqenC5P8RikKOAAfN7CvPEAg=="#).expect("must parse"); + + assert_eq!( + PathInfo { + node: None, + references: vec![ + Bytes::copy_from_slice(&nixbase32::decode_fixed::<20>("3n58xw4373jp0ljirf06d8077j15pc4j").unwrap()), + Bytes::copy_from_slice(&nixbase32::decode_fixed::<20>("s66mzxpvicwk07gjbjfw9izjfa797vsw").unwrap()), + ], + narinfo: Some( + NarInfo { + nar_size: 226488, + nar_sha256: Bytes::copy_from_slice( + &nixbase32::decode_fixed::<32>("0yzhigwjl6bws649vcs2asa4lbs8hg93hyix187gc7s7a74w5h80".as_bytes()) + .unwrap() + ), + signatures: vec![Signature { + name: "cache.nixos.org-1".to_string(), + data: BASE64.decode("8ijECciSFzWHwwGVOIVYdp2fOIOJAfmzGHPQVwpktfTQJF6kMPPDre7UtFw3o+VqenC5P8RikKOAAfN7CvPEAg==".as_bytes()).unwrap().into(), + }], + reference_names: vec![ + "3n58xw4373jp0ljirf06d8077j15pc4j-glibc-2.37-8".to_string(), + "s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1".to_string() + ], + deriver: Some(crate::proto::StorePath { + digest: Bytes::copy_from_slice(&nixbase32::decode_fixed::<20>("ib3sh3pcz10wsmavxvkdbayhqivbghlq").unwrap()), + name: "hello-2.12.1".to_string(), + }), + ca: None, + } + ) + }, + (&narinfo_parsed).into(), + ); +} + +#[test] +fn from_nixcompat_narinfo_fod() { + let narinfo_parsed = nix_compat::narinfo::NarInfo::parse( + r#"StorePath: /nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz +URL: nar/1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r.nar.xz +Compression: xz +FileHash: sha256:1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r +FileSize: 1033524 +NarHash: sha256:1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh +NarSize: 1033416 +References: +Deriver: dyivpmlaq2km6c11i0s6bi6mbsx0ylqf-hello-2.12.1.tar.gz.drv +Sig: cache.nixos.org-1:ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg== +CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"# + ).expect("must parse"); + + assert_eq!( + PathInfo { + node: None, + references: vec![], + narinfo: Some( + NarInfo { + nar_size: 1033416, + nar_sha256: Bytes::copy_from_slice( + &nixbase32::decode_fixed::<32>( + "1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh" + ) + .unwrap() + ), + signatures: vec![Signature { + name: "cache.nixos.org-1".to_string(), + data: BASE64 + .decode("ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg==".as_bytes()) + .unwrap() + 
.into(), + }], + reference_names: vec![], + deriver: Some(crate::proto::StorePath { + digest: Bytes::copy_from_slice( + &nixbase32::decode_fixed::<20>("dyivpmlaq2km6c11i0s6bi6mbsx0ylqf").unwrap() + ), + name: "hello-2.12.1.tar.gz".to_string(), + }), + ca: Some(crate::proto::nar_info::Ca { + r#type: crate::proto::nar_info::ca::Hash::FlatSha256.into(), + digest: Bytes::copy_from_slice( + &nixbase32::decode_fixed::<32>( + "086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd" + ) + .unwrap() + ) + }), + } + ), + }, + (&narinfo_parsed).into() + ); +} + +/// Exercise .as_narinfo() on a PathInfo and ensure important fields are preserved.. +#[test] +fn as_narinfo() { + let narinfo_parsed = nix_compat::narinfo::NarInfo::parse( + r#"StorePath: /nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz +URL: nar/1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r.nar.xz +Compression: xz +FileHash: sha256:1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r +FileSize: 1033524 +NarHash: sha256:1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh +NarSize: 1033416 +References: +Deriver: dyivpmlaq2km6c11i0s6bi6mbsx0ylqf-hello-2.12.1.tar.gz.drv +Sig: cache.nixos.org-1:ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg== +CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"# + ).expect("must parse"); + + let path_info: PathInfo = (&narinfo_parsed).into(); + + let mut narinfo_returned = path_info + .to_narinfo( + StorePathRef::from_bytes(b"pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz") + .expect("invalid storepath"), + ) + .expect("must be some"); + narinfo_returned.url = "some.nar"; + + assert_eq!( + r#"StorePath: /nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz +URL: some.nar +Compression: none +NarHash: sha256:1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh +NarSize: 1033416 +References: +Deriver: dyivpmlaq2km6c11i0s6bi6mbsx0ylqf-hello-2.12.1.tar.gz.drv +Sig: cache.nixos.org-1:ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg== +CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd +"#, + narinfo_returned.to_string(), + ); +} diff --git a/tvix/store/src/tests/fixtures.rs b/tvix/store/src/tests/fixtures.rs new file mode 100644 index 000000000000..1c8359a2c0c7 --- /dev/null +++ b/tvix/store/src/tests/fixtures.rs @@ -0,0 +1,142 @@ +use lazy_static::lazy_static; +use rstest::*; +use std::sync::Arc; +pub use tvix_castore::fixtures::*; +use tvix_castore::{ + blobservice::{BlobService, MemoryBlobService}, + directoryservice::{DirectoryService, MemoryDirectoryService}, + proto as castorepb, +}; + +use crate::proto::{ + nar_info::{ca, Ca}, + NarInfo, PathInfo, +}; + +pub const DUMMY_PATH: &str = "00000000000000000000000000000000-dummy"; +pub const DUMMY_PATH_DIGEST: [u8; 20] = [0; 20]; + +lazy_static! 
{ + /// The NAR representation of a symlink pointing to `/nix/store/somewhereelse` + pub static ref NAR_CONTENTS_SYMLINK: Vec<u8> = vec![ + 13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0, + 0, 0, // "nix-archive-1" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b's', b'y', b'm', b'l', b'i', b'n', b'k', 0, // "symlink" + 6, 0, 0, 0, 0, 0, 0, 0, b't', b'a', b'r', b'g', b'e', b't', 0, 0, // target + 24, 0, 0, 0, 0, 0, 0, 0, b'/', b'n', b'i', b'x', b'/', b's', b't', b'o', b'r', b'e', b'/', b's', b'o', + b'm', b'e', b'w', b'h', b'e', b'r', b'e', b'e', b'l', b's', + b'e', // "/nix/store/somewhereelse" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")" + ]; + + /// The NAR representation of a regular file with the contents "Hello World!" + pub static ref NAR_CONTENTS_HELLOWORLD: Vec<u8> = vec![ + 13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0, + 0, 0, // "nix-archive-1" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular" + 8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents" + 12, 0, 0, 0, 0, 0, 0, 0, b'H', b'e', b'l', b'l', b'o', b' ', b'W', b'o', b'r', b'l', b'd', b'!', 0, 0, + 0, 0, // "Hello World!" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")" + ]; + + /// The NAR representation of a more complicated directory structure. + pub static ref NAR_CONTENTS_COMPLICATED: Vec<u8> = vec![ + 13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0, + 0, 0, // "nix-archive-1" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 9, 0, 0, 0, 0, 0, 0, 0, b'd', b'i', b'r', b'e', b'c', b't', b'o', b'r', b'y', 0, 0, 0, 0, 0, 0, 0, // "directory" + 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name" + 5, 0, 0, 0, 0, 0, 0, 0, b'.', b'k', b'e', b'e', b'p', 0, 0, 0, // ".keep" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular" + 8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents" + 0, 0, 0, 0, 0, 0, 0, 0, // "" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name" + 2, 0, 0, 0, 0, 0, 0, 0, b'a', b'a', 0, 0, 0, 0, 0, 0, // "aa" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b's', b'y', b'm', b'l', b'i', b'n', b'k', 0, // "symlink" + 6, 0, 0, 0, 0, 0, 0, 0, b't', b'a', b'r', b'g', b'e', 
b't', 0, 0, // target + 24, 0, 0, 0, 0, 0, 0, 0, b'/', b'n', b'i', b'x', b'/', b's', b't', b'o', b'r', b'e', b'/', b's', b'o', + b'm', b'e', b'w', b'h', b'e', b'r', b'e', b'e', b'l', b's', + b'e', // "/nix/store/somewhereelse" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name" + 4, 0, 0, 0, 0, 0, 0, 0, b'k', b'e', b'e', b'p', 0, 0, 0, 0, // "keep" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 9, 0, 0, 0, 0, 0, 0, 0, b'd', b'i', b'r', b'e', b'c', b't', b'o', b'r', b'y', 0, 0, 0, 0, 0, 0, 0, // "directory" + 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name" + 5, 0, 0, 0, 0, 0, 0, 0, 46, 107, 101, 101, 112, 0, 0, 0, // ".keep" + 4, 0, 0, 0, 0, 0, 0, 0, 110, 111, 100, 101, 0, 0, 0, 0, // "node" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular" + 8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents" + 0, 0, 0, 0, 0, 0, 0, 0, // "" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + ]; + + /// A PathInfo message without .narinfo populated. + pub static ref PATH_INFO_WITHOUT_NARINFO : PathInfo = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Directory(castorepb::DirectoryNode { + name: DUMMY_PATH.into(), + digest: DUMMY_DIGEST.clone().into(), + size: 0, + })), + }), + references: vec![DUMMY_PATH_DIGEST.as_slice().into()], + narinfo: None, + }; + + /// A PathInfo message with .narinfo populated. + /// The references in `narinfo.reference_names` aligns with what's in + /// `references`. 
+ pub static ref PATH_INFO_WITH_NARINFO : PathInfo = PathInfo { + narinfo: Some(NarInfo { + nar_size: 0, + nar_sha256: DUMMY_DIGEST.clone().into(), + signatures: vec![], + reference_names: vec![DUMMY_PATH.to_string()], + deriver: None, + ca: Some(Ca { r#type: ca::Hash::NarSha256.into(), digest: DUMMY_DIGEST.clone().into() }) + }), + ..PATH_INFO_WITHOUT_NARINFO.clone() + }; +} + +#[fixture] +pub(crate) fn blob_service() -> Arc<dyn BlobService> { + Arc::from(MemoryBlobService::default()) +} + +#[fixture] +pub(crate) fn directory_service() -> Arc<dyn DirectoryService> { + Arc::from(MemoryDirectoryService::default()) +} diff --git a/tvix/store/src/tests/mod.rs b/tvix/store/src/tests/mod.rs new file mode 100644 index 000000000000..1e7fc3f6b451 --- /dev/null +++ b/tvix/store/src/tests/mod.rs @@ -0,0 +1,2 @@ +pub mod fixtures; +mod nar_renderer; diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs new file mode 100644 index 000000000000..8bfb5a72bb2f --- /dev/null +++ b/tvix/store/src/tests/nar_renderer.rs @@ -0,0 +1,228 @@ +use crate::nar::calculate_size_and_sha256; +use crate::nar::write_nar; +use crate::tests::fixtures::blob_service; +use crate::tests::fixtures::directory_service; +use crate::tests::fixtures::*; +use rstest::*; +use sha2::{Digest, Sha256}; +use std::io; +use std::sync::Arc; +use tokio::io::sink; +use tvix_castore::blobservice::BlobService; +use tvix_castore::directoryservice::DirectoryService; +use tvix_castore::proto as castorepb; + +#[rstest] +#[tokio::test] +async fn single_symlink( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +) { + let mut buf: Vec<u8> = vec![]; + + write_nar( + &mut buf, + &castorepb::node::Node::Symlink(castorepb::SymlinkNode { + name: "doesntmatter".into(), + target: "/nix/store/somewhereelse".into(), + }), + // don't put anything in the stores, as we don't actually do any requests. + blob_service, + directory_service, + ) + .await + .expect("must succeed"); + + assert_eq!(buf, NAR_CONTENTS_SYMLINK.to_vec()); +} + +/// Make sure the NARRenderer fails if a referred blob doesn't exist. +#[rstest] +#[tokio::test] +async fn single_file_missing_blob( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +) { + let e = write_nar( + sink(), + &castorepb::node::Node::File(castorepb::FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: HELLOWORLD_BLOB_CONTENTS.len() as u64, + executable: false, + }), + // the blobservice is empty intentionally, to provoke the error. + blob_service, + directory_service, + ) + .await + .expect_err("must fail"); + + match e { + crate::nar::RenderError::NARWriterError(e) => { + assert_eq!(io::ErrorKind::NotFound, e.kind()); + } + _ => panic!("unexpected error: {:?}", e), + } +} + +/// Make sure the NAR Renderer fails if the returned blob meta has another size +/// than specified in the proto node. 
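Aside on the byte-level fixtures above: NAR uses a uniform string framing, an 8-byte little-endian length, the payload, then zero padding to the next 8-byte boundary. A hypothetical helper makes this explicit:

```rust
/// Frame a payload the way NAR frames every string token.
fn nar_string(payload: &[u8]) -> Vec<u8> {
    let mut out = (payload.len() as u64).to_le_bytes().to_vec();
    out.extend_from_slice(payload);
    // Pad with zeroes to an 8-byte boundary.
    while out.len() % 8 != 0 {
        out.push(0);
    }
    out
}
```

`nar_string(b"nix-archive-1")` reproduces the first 24 bytes of each fixture: 13 as a little-endian u64, the 13 ASCII bytes, and three zero pads.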
+#[rstest] +#[tokio::test] +async fn single_file_wrong_blob_size( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +) { + // insert blob into the store + let mut writer = blob_service.open_write().await; + tokio::io::copy( + &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()), + &mut writer, + ) + .await + .unwrap(); + assert_eq!( + HELLOWORLD_BLOB_DIGEST.clone(), + writer.close().await.unwrap() + ); + + // Test with a root FileNode of a too big size + let e = write_nar( + sink(), + &castorepb::node::Node::File(castorepb::FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: 42, // <- note the wrong size here! + executable: false, + }), + blob_service.clone(), + directory_service.clone(), + ) + .await + .expect_err("must fail"); + + match e { + crate::nar::RenderError::NARWriterError(e) => { + assert_eq!(io::ErrorKind::UnexpectedEof, e.kind()); + } + _ => panic!("unexpected error: {:?}", e), + } + + // Test with a root FileNode of a too small size + let e = write_nar( + sink(), + &castorepb::node::Node::File(castorepb::FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: 2, // <- note the wrong size here! + executable: false, + }), + blob_service, + directory_service, + ) + .await + .expect_err("must fail"); + + match e { + crate::nar::RenderError::NARWriterError(e) => { + assert_eq!(io::ErrorKind::InvalidInput, e.kind()); + } + _ => panic!("unexpected error: {:?}", e), + } +} + +#[rstest] +#[tokio::test] +async fn single_file( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +) { + // insert blob into the store + let mut writer = blob_service.open_write().await; + tokio::io::copy(&mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS), &mut writer) + .await + .unwrap(); + + assert_eq!( + HELLOWORLD_BLOB_DIGEST.clone(), + writer.close().await.unwrap() + ); + + let mut buf: Vec<u8> = vec![]; + + write_nar( + &mut buf, + &castorepb::node::Node::File(castorepb::FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: HELLOWORLD_BLOB_CONTENTS.len() as u64, + executable: false, + }), + blob_service, + directory_service, + ) + .await + .expect("must succeed"); + + assert_eq!(buf, NAR_CONTENTS_HELLOWORLD.to_vec()); +} + +#[rstest] +#[tokio::test] +async fn test_complicated( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +) { + // put all data into the stores. + // insert blob into the store + let mut writer = blob_service.open_write().await; + tokio::io::copy(&mut io::Cursor::new(EMPTY_BLOB_CONTENTS), &mut writer) + .await + .unwrap(); + assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().await.unwrap()); + + // insert directories + directory_service + .put(DIRECTORY_WITH_KEEP.clone()) + .await + .unwrap(); + directory_service + .put(DIRECTORY_COMPLICATED.clone()) + .await + .unwrap(); + + let mut buf: Vec<u8> = vec![]; + + write_nar( + &mut buf, + &castorepb::node::Node::Directory(castorepb::DirectoryNode { + name: "doesntmatter".into(), + digest: DIRECTORY_COMPLICATED.digest().into(), + size: DIRECTORY_COMPLICATED.size(), + }), + blob_service.clone(), + directory_service.clone(), + ) + .await + .expect("must succeed"); + + assert_eq!(buf, NAR_CONTENTS_COMPLICATED.to_vec()); + + // ensure calculate_nar does return the correct sha256 digest and sum. 
+ let (nar_size, nar_digest) = calculate_size_and_sha256( + &castorepb::node::Node::Directory(castorepb::DirectoryNode { + name: "doesntmatter".into(), + digest: DIRECTORY_COMPLICATED.digest().into(), + size: DIRECTORY_COMPLICATED.size(), + }), + blob_service, + directory_service, + ) + .await + .expect("must succeed"); + + assert_eq!(NAR_CONTENTS_COMPLICATED.len() as u64, nar_size); + let d = Sha256::digest(NAR_CONTENTS_COMPLICATED.clone()); + assert_eq!(d.as_slice(), nar_digest); +} diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs new file mode 100644 index 000000000000..e6e42f6ec4cf --- /dev/null +++ b/tvix/store/src/utils.rs @@ -0,0 +1,78 @@ +use std::sync::Arc; +use std::{ + pin::Pin, + task::{self, Poll}, +}; +use tokio::io::{self, AsyncWrite}; + +use tvix_castore::{ + blobservice::{self, BlobService}, + directoryservice::{self, DirectoryService}, +}; + +use crate::nar::{NarCalculationService, SimpleRenderer}; +use crate::pathinfoservice::{self, PathInfoService}; + +/// Construct the store handles from their addrs. +pub async fn construct_services( + blob_service_addr: impl AsRef<str>, + directory_service_addr: impl AsRef<str>, + path_info_service_addr: impl AsRef<str>, +) -> std::io::Result<( + Arc<dyn BlobService>, + Arc<dyn DirectoryService>, + Box<dyn PathInfoService>, + Box<dyn NarCalculationService>, +)> { + let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref()) + .await? + .into(); + let directory_service: Arc<dyn DirectoryService> = + directoryservice::from_addr(directory_service_addr.as_ref()) + .await? + .into(); + let path_info_service = pathinfoservice::from_addr( + path_info_service_addr.as_ref(), + blob_service.clone(), + directory_service.clone(), + ) + .await?; + + // TODO: grpc client also implements NarCalculationService + let nar_calculation_service = Box::new(SimpleRenderer::new( + blob_service.clone(), + directory_service.clone(), + )) as Box<dyn NarCalculationService>; + + Ok(( + blob_service, + directory_service, + path_info_service, + nar_calculation_service, + )) +} + +/// The inverse of [tokio_util::io::SyncIoBridge]. +/// Don't use this with anything that actually does blocking I/O. +pub struct AsyncIoBridge<T>(pub T); + +impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + Poll::Ready(self.get_mut().0.write(buf)) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(self.get_mut().0.flush()) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut task::Context<'_>, + ) -> Poll<Result<(), io::Error>> { + Poll::Ready(Ok(())) + } +} |
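A usage sketch for `AsyncIoBridge` (hypothetical, not part of this change, assuming the type above is in scope): wrapping an in-memory `std::io::Write` lets async code drive it directly, which is only sound because writing to a `Vec<u8>` never actually blocks:

```rust
use tokio::io::AsyncWriteExt;

async fn bridge_demo() -> std::io::Result<()> {
    let mut w = AsyncIoBridge(Vec::new());
    w.write_all(b"hello").await?;
    w.flush().await?;
    assert_eq!(w.0, b"hello");
    Ok(())
}
```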