diff options
Diffstat (limited to 'tvix/store')
42 files changed, 7421 insertions, 0 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml new file mode 100644 index 000000000000..43421ef4a302 --- /dev/null +++ b/tvix/store/Cargo.toml @@ -0,0 +1,97 @@ +[package] +name = "tvix-store" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1.0.68" +async-stream = "0.3.5" +blake3 = { version = "1.3.1", features = ["rayon", "std"] } +bytes = "1.4.0" +clap = { version = "4.0", features = ["derive", "env"] } +count-write = "0.1.0" +data-encoding = "2.3.3" +futures = "0.3.28" +lazy_static = "1.4.0" +nix-compat = { path = "../nix-compat" } +parking_lot = "0.12.1" +pin-project-lite = "0.2.13" +prost = "0.12.1" +sha2 = "0.10.6" +sled = { version = "0.34.7", features = ["compression"] } +thiserror = "1.0.38" +tokio-stream = { version = "0.1.14", features = ["fs"] } +tokio-util = { version = "0.7.9", features = ["io", "io-util"] } +tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } +tonic = "0.10.2" +tower = "0.4.13" +tracing = "0.1.37" +tracing-subscriber = { version = "0.3.16", features = ["json"] } +tvix-castore = { path = "../castore" } +url = "2.4.0" +walkdir = "2.4.0" +tokio-listener = { version = "0.2.1" } + +[dependencies.fuse-backend-rs] +optional = true +# TODO: Switch back to upstream version once https://github.com/cloud-hypervisor/fuse-backend-rs/pull/157 lands. 
+git = "https://github.com/griff/fuse-backend-rs" +branch = "macfuse-fix" + +[dependencies.vhost] +optional = true +version = "0.6" + +[dependencies.vhost-user-backend] +optional = true +version = "0.8" + +[dependencies.virtio-queue] +optional = true +version = "0.7" + +[dependencies.vm-memory] +optional = true +version = "0.10" + +[dependencies.vmm-sys-util] +optional = true +version = "0.11" + +[dependencies.virtio-bindings] +optional = true +version = "0.2.1" + +[dependencies.tonic-reflection] +optional = true +version = "0.10.2" + +[dependencies.libc] +optional = true +version = "0.2.144" + +[build-dependencies] +prost-build = "0.12.1" +tonic-build = "0.10.2" + +[dev-dependencies] +test-case = "2.2.2" +tempfile = "3.3.0" +tokio-retry = "0.3.0" + +[features] +default = ["fuse", "tonic-reflection"] +fs = ["dep:libc", "dep:fuse-backend-rs"] +virtiofs = [ + "fs", + "dep:vhost", + "dep:vhost-user-backend", + "dep:virtio-queue", + "dep:vm-memory", + "dep:vmm-sys-util", + "dep:virtio-bindings", + "fuse-backend-rs?/vhost-user-fs", # impl FsCacheReqHandler for SlaveFsCacheReq + "fuse-backend-rs?/virtiofs", +] +fuse = ["fs"] +tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"] diff --git a/tvix/store/README.md b/tvix/store/README.md new file mode 100644 index 000000000000..a9d29671d8bb --- /dev/null +++ b/tvix/store/README.md @@ -0,0 +1,63 @@ +# //tvix/store + +This contains the code hosting the tvix-store. + +For the local store, Nix realizes files on the filesystem in `/nix/store` (and +maintains some metadata in a SQLite database). For "remote stores", it +communicates this metadata in NAR (Nix ARchive) and NARInfo format. + +Compared to the Nix model, `tvix-store` stores data on a much more granular +level than that, which provides more deduplication possibilities, and more +granular copying. + +However, enough information is preserved to still be able to render NAR and +NARInfo when needed. 
+ +## More Information +The store consists of two different gRPC services, `tvix.castore.v1` for +the low-level content-addressed bits, and `tvix.store.v1` for the Nix and +`StorePath`-specific bits. + +Check the `protos/` subfolder both here and in `castore` for the definition of +the exact RPC methods and messages. + +## Interacting with the gRPC service manually +The shell environment in `//tvix` provides `evans`, which is an interactive +REPL-based gRPC client. + +You can use it to connect to a `tvix-store` and call the various RPC methods. + +```shell +$ cargo run -- daemon & +$ evans --host localhost --port 8000 -r repl + ______ + | ____| + | |__ __ __ __ _ _ __ ___ + | __| \ \ / / / _. | | '_ \ / __| + | |____ \ V / | (_| | | | | | \__ \ + |______| \_/ \__,_| |_| |_| |___/ + + more expressive universal gRPC client + + +localhost:8000> package tvix.castore.v1 +tvix.castore.v1@localhost:8000> service BlobService + +tvix.castore.v1.BlobService@localhost:8000> call Put --bytes-from-file +data (TYPE_BYTES) => /run/current-system/system +{ + "digest": "KOM3/IHEx7YfInAnlJpAElYezq0Sxn9fRz7xuClwNfA=" +} + +tvix.castore.v1.BlobService@localhost:8000> call Read --bytes-as-base64 +digest (TYPE_BYTES) => KOM3/IHEx7YfInAnlJpAElYezq0Sxn9fRz7xuClwNfA= +{ + "data": "eDg2XzY0LWxpbnV4" +} + +$ echo eDg2XzY0LWxpbnV4 | base64 -d +x86_64-linux +``` + +Thanks to `tvix-store` providing gRPC Server Reflection (with `tonic-reflection` 
diff --git a/tvix/store/build.rs b/tvix/store/build.rs new file mode 100644 index 000000000000..cfeda59698a0 --- /dev/null +++ b/tvix/store/build.rs @@ -0,0 +1,38 @@ +use std::io::Result; + +fn main() -> Result<()> { + #[allow(unused_mut)] + let mut builder = tonic_build::configure(); + + #[cfg(feature = "tonic-reflection")] + { + let out_dir = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap()); + let descriptor_path = out_dir.join("tvix.store.v1.bin"); + + builder = builder.file_descriptor_set_path(descriptor_path); + }; + + // https://github.com/hyperium/tonic/issues/908 + let mut config = prost_build::Config::new(); + config.bytes(["."]); + config.extern_path(".tvix.castore.v1", "::tvix_castore::proto"); + + builder + .build_server(true) + .build_client(true) + .compile_with_config( + config, + &[ + "tvix/store/protos/pathinfo.proto", + "tvix/store/protos/rpc_pathinfo.proto", + ], + // If we are in running `cargo build` manually, using `../..` works fine, + // but in case we run inside a nix build, we need to instead point PROTO_ROOT + // to a sparseTree containing that structure. + &[match std::env::var_os("PROTO_ROOT") { + Some(proto_root) => proto_root.to_str().unwrap().to_owned(), + None => "../..".to_string(), + }], + )?; + Ok(()) +} diff --git a/tvix/store/default.nix b/tvix/store/default.nix new file mode 100644 index 000000000000..35d2a22bb2ce --- /dev/null +++ b/tvix/store/default.nix @@ -0,0 +1,34 @@ +{ depot, pkgs, ... 
}: + +let + mkImportCheck = p: expectedPath: { + label = ":nix :import ${p} with tvix-store import"; + needsOutput = true; + command = pkgs.writeShellScript "tvix-import-check" '' + export BLOB_SERVICE_ADDR=memory:// + export DIRECTORY_SERVICE_ADDR=memory:// + export PATH_INFO_SERVICE_ADDR=memory:// + TVIX_STORE_OUTPUT=$(result/bin/tvix-store import ${p}) + EXPECTED='${/* the verbatim expected Tvix output: */expectedPath}' + + echo "tvix-store output: ''${TVIX_STORE_OUTPUT}" + if [ "$TVIX_STORE_OUTPUT" != "$EXPECTED" ]; then + echo "Correct would have been ''${EXPECTED}" + exit 1 + fi + + echo "Output was correct." + ''; + }; +in + +(depot.tvix.crates.workspaceMembers.tvix-store.build.override { + runTests = true; + # virtiofs feature currently fails to build on Darwin. + # we however can ship it for non-darwin. + features = if pkgs.stdenv.isDarwin then [ "default" ] else [ "default" "virtiofs" ]; +}).overrideAttrs (_: { + meta.ci.extraSteps = { + import-docs = (mkImportCheck "tvix/store/docs" ./docs); + }; +}) diff --git a/tvix/store/docs/api.md b/tvix/store/docs/api.md new file mode 100644 index 000000000000..6a4b98911c2c --- /dev/null +++ b/tvix/store/docs/api.md @@ -0,0 +1,283 @@ +tvix-store API +============== + +This document outlines the design of the API exposed by tvix-store, as +well as other implementations of this store protocol. + +This document is meant to be read side-by-side with [castore.md](./castore.md), which describes the data model in more detail. + +The store API has four main consumers: + +1. The evaluator (or more correctly, the CLI/coordinator, in the Tvix + case) communicates with the store to: + + * Upload files and directories (e.g. from `builtins.path`, or `src = ./path` + Nix expressions). + * Read files from the store where necessary (e.g. when `nixpkgs` is + located in the store, or for IFD). + +2. The builder communicates with the store to: + + * Upload files and directories after a build, to persist build artifacts in + the store. 
+ +3. Tvix clients (such as users that have Tvix installed, or, depending + on perspective, builder environments) expect the store to + "materialise" on disk to provide a directory layout with store + paths. + +4. Stores may communicate with other stores, to substitute already built store + paths, i.e. a store acts as a binary cache for other stores. + +The store API attempts to reuse parts of its API between these four +consumers by making similarities explicit in the protocol. This leads +to a protocol that is slightly more complex than a simple "file +upload/download"-system, but at significantly greater efficiency, both in terms +of deduplication opportunities as well as granularity. + +## The Store model + +Contents inside a tvix-store can be grouped into three different message types: + + * Blobs + * Directories + * PathInfo (see further down) + +(check `castore.md` for more detailed field descriptions) + +### Blobs +A blob object contains the literal file contents of regular (or executable) +files. + +### Directory +A directory object describes the direct children of a directory. + +It contains: + - name of child (regular or executable) files, and their [blake3][blake3] hash. + - name of child symlinks, and their target (as string) + - name of child directories, and their [blake3][blake3] hash (forming a Merkle DAG) + +### Content-addressed Store Model +For example, let's consider a directory layout like this, with some +imaginary hashes of file contents: + +``` +. +├── file-1.txt hash: 5891b5b522d5df086d0ff0b110fb +└── nested + └── file-2.txt hash: abc6fd595fc079d3114d4b71a4d8 +``` + +A hash for the *directory* `nested` can be created by creating the `Directory` +object: + +```json +{ + "directories": [], + "files": [{ + "name": "file-2.txt", + "digest": "abc6fd595fc079d3114d4b71a4d8", + "size": 123, + }], + "symlinks": [], +} +``` + +And then hashing a serialised form of that data structure. We use the blake3 +hash of the canonical protobuf representation. 
Let's assume the hash was +`ff0029485729bcde993720749232`. + +To create the directory object one layer up, we now refer to our `nested` +directory object in `directories`, and to `file-1.txt` in `files`: + +```json +{ + "directories": [{ + "name": "nested", + "digest": "ff0029485729bcde993720749232", + "size": 1, + }], + "files": [{ + "name": "file-1.txt", + "digest": "5891b5b522d5df086d0ff0b110fb", + "size": 124, + }] +} +``` + +This Merkle DAG of Directory objects, and flat store of blobs can be used to +describe any file/directory/symlink inside a store path. Due to its content- +addressed nature, it'll automatically deduplicate (re-)used (sub)directories, +and allow substitution from any (untrusted) source. + +The only thing that's now missing is the metadata to map/"mount" from the +content-addressed world to a physical path. + +### PathInfo +As most paths in the Nix store currently are input-addressed [^input-addressed], +we need something mapping from an input-addressed "output path hash" to the +contents in the content-addressed world. + +That's what `PathInfo` provides. It embeds the root node (Directory, File or +Symlink) at a given store path. + +The root node's `name` field is populated with the (base)name inside +`/nix/store`, so `xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-pname-1.2.3`. + +The `PathInfo` message also stores references to other store paths, and some +more NARInfo-specific metadata (signatures, narhash, narsize). + + +## API overview + +There are three different services: + +### BlobService +`BlobService` can be used to store and retrieve blobs of data, used to host +regular file contents. + +It is content-addressed, using [blake3][blake3] +as a hashing function. + +As blake3 is a tree hash, there's an opportunity to do +[verified streaming][bao] of parts of the file, +which doesn't need to trust any more information than the root hash itself. +Future extensions of the `BlobService` protocol will enable this. 
+ +### DirectoryService +`DirectoryService` allows lookups (and uploads) of `Directory` messages, and +whole reference graphs of them. + + +### PathInfoService +The PathInfo service provides lookups from a store path hash to a `PathInfo` +message. + +## Example flows + +Below there are some common use cases of tvix-store, and how the different +services are used. + +### Upload files and directories +This is needed for `builtins.path` or `src = ./path` in Nix expressions (A), as +well as for uploading build artifacts to a store (B). + +The path specified needs to be (recursively, BFS-style) traversed. + * All file contents need to be hashed with blake3, and submitted to the + *BlobService* if not already present. + A reference to them needs to be added to the parent Directory object that's + constructed. + * All symlinks need to be added to the parent directory they reside in. + * Whenever a Directory has been fully traversed, it needs to be uploaded to + the *DirectoryService* and a reference to it needs to be added to the parent + Directory object. + +Most of the hashing / directory traversal/uploading can happen in parallel, +as long as Directory objects only refer to Directory objects and Blobs that +have already been uploaded. + +When reaching the root, a `PathInfo` object needs to be constructed. + + * In the case of content-addressed paths (A), the name of the root node is + based on the NAR representation of the contents. + It might make sense to be able to offload the NAR calculation to the store, + which can cache it. + * In the case of build artifacts (B), the output path is input-addressed and + known upfront. + +Contrary to Nix, this has the advantage of not having to upload a lot of things +to the store that didn't change. + +### Reading files from the store from the evaluator +This is the case when `nixpkgs` is located in the store, or IFD in general. 
+ +The store client asks the `PathInfoService` for the `PathInfo` of the output +path in the request, and looks at the root node. + +If something other than the root of the store path is requested, like for +example `maintainers/maintainer-list.nix`, the root_node Directory is inspected +and potentially a chain of `Directory` objects requested from +*DirectoryService*. [^n+1query]. + +When the desired file is reached, the *BlobService* can be used to read the +contents of this file, and return it back to the evaluator. + +FUTUREWORK: define how importing from symlinks should/does work. + +Contrary to Nix, this has the advantage of not having to copy all of the +contents of a store path to the evaluating machine, but really only fetching +the files the evaluator currently cares about. + +### Materializing store paths on disk +This is useful for people running a Tvix-only system, or running builds on a +"Tvix remote builder" in its own mount namespace. + +In a system with Nix installed, we can't simply manually "extract" things to +`/nix/store`, as Nix assumes to own all writes to this location. +In these use cases, we're probably better off exposing a tvix-store as a local +binary cache (that's what nar-bridge does). + +Assuming we are in an environment where we control `/nix/store` exclusively, a +"realize to disk" would either "extract" things from the tvix-store to a +filesystem, or expose a FUSE filesystem. The latter would be particularly +interesting for remote build workloads, as build inputs can be realized on- +demand, which saves copying around a lot of never-accessed files. + +In both cases, the API interactions are similar. + * The *PathInfoService* is asked for the `PathInfo` of the requested store path. 
+ * If everything should be "extracted", the *DirectoryService* is asked for all + `Directory` objects in the closure, the file structure is created, all Blobs + are downloaded and placed in their corresponding location and all symlinks + are created accordingly. + * If this is a FUSE filesystem, we can decide to only request a subset, + similar to the "Reading files from the store from the evaluator" use case, + even though it might make sense to keep all Directory objects around. + (See the caveat in "Trust model" though!) + +### Stores communicating with other stores +The gRPC API exposed by the tvix-store allows composing multiple stores, and +implementing some caching strategies, that store clients don't need to be aware +of. + + * For example, a caching strategy could have a fast local tvix-store, that's + asked first and filled with data from a slower remote tvix-store. + + * Multiple stores could be asked for the same data, and whatever store returns + the right data first wins. + + +## Trust model / Distribution +As already described above, the only non-content-addressed service is the +`PathInfo` service. + +This means, all other messages (such as `Blob` and `Directory` messages) can be +substituted from many different, untrusted sources/mirrors, which will make +plugging in additional substitution strategies like IPFS, local network +neighbors super simple. + +As for `PathInfo`, we don't specify an additional signature mechanism yet, but +carry the NAR-based signatures from Nix along. + +This means, if we don't trust a remote `PathInfo` object, we currently need to +"stream" the NAR representation to validate these signatures. + +However, the slow part is downloading of NAR files, and considering we have +more granularity available, we might only need to download some small blobs, +rather than a whole NAR file. 
+ +A future signature mechanism, that is only signing (parts of) the `PathInfo` +message, which only points to content-addressed data will enable verified +partial access into a store path, opening up opportunities for lazy filesystem +access, which is very useful in remote builder scenarios. + + + +[blake3]: https://github.com/BLAKE3-team/BLAKE3 +[bao]: https://github.com/oconnor663/bao +[^input-addressed]: Nix hashes the A-Term representation of a .drv, after doing + some replacements on referred Input Derivations to calculate + output paths. +[^n+1query]: This would expose an N+1 query problem. However it's not a problem + in practice, as there's usually always a "local" caching store in + the loop, and *DirectoryService* supports a recursive lookup for + all `Directory` children of a `Directory` diff --git a/tvix/store/docs/castore.md b/tvix/store/docs/castore.md new file mode 100644 index 000000000000..f555ba5a861b --- /dev/null +++ b/tvix/store/docs/castore.md @@ -0,0 +1,50 @@ +# //tvix/store/docs/castore.md + +This provides some more notes on the fields used in castore.proto. + +It's meant to supplement `//tvix/store/docs/api.md`. + +## Directory message +`Directory` messages use the blake3 hash of their canonical protobuf +serialization as their identifier. + +A `Directory` message contains three lists, `directories`, `files` and +`symlinks`, holding `DirectoryNode`, `FileNode` and `SymlinkNode` messages +respectively. They describe all the direct child elements that are contained in +a directory. + +All three message types have a `name` field, specifying the (base)name of the +element (which MUST NOT contain slashes or null bytes, and MUST NOT be '.' or '..'). +For reproducibility reasons, the lists MUST be sorted by that name, and names +MUST be unique across all three lists. + +In addition to the `name` field, the various *Node messages have the following +fields: + +## DirectoryNode +A `DirectoryNode` message represents a child directory. 
+ +It has a `digest` field, which points to the identifier of another `Directory` +message, making a `Directory` a Merkle tree (or strictly speaking, a graph, as +two elements pointing to a child directory with the same contents would point +to the same `Directory` message). + +There's also a `size` field, containing the (total) number of all child +elements in the referenced `Directory`, which helps for inode calculation. + +## FileNode +A `FileNode` message represents a child (regular) file. + +Its `digest` field contains the blake3 hash of the file contents. It can be +looked up in the `BlobService`. + +The `size` field contains the size of the blob the `digest` field refers to. + +The `executable` field specifies whether the file should be marked as +executable or not. + +## SymlinkNode +A `SymlinkNode` message represents a child symlink. + +In addition to the `name` field, the only additional field is the `target`, +which is a string containing the target of the symlink. diff --git a/tvix/store/docs/why-not-git-trees.md b/tvix/store/docs/why-not-git-trees.md new file mode 100644 index 000000000000..fd46252cf55c --- /dev/null +++ b/tvix/store/docs/why-not-git-trees.md @@ -0,0 +1,57 @@ +## Why not git tree objects? + +We've been experimenting with (some variations of) the git tree and object +format, and ultimately decided against using it as an internal format, and +instead adapted the one documented in the other documents here. + +While the tvix-store API protocol shares some similarities with the format used +in git for trees and objects, the git one has shown some significant +disadvantages: + +### The binary encoding itself + +#### trees +The git tree object format is a very binary, error-prone and +"made-to-be-read-and-written-from-C" format. + +Tree objects are a combination of null-terminated strings, and fields of known +length. References to other tree objects use the literal sha1 hash of another +tree object in this encoding. 
+Extensions of the format/changes are very hard to do right, because parsers are +not aware they might be parsing something different. + +The tvix-store protocol uses a canonical protobuf serialization, and uses +the [blake3][blake3] hash of that serialization to point to other `Directory` +messages. +It's both compact and with a wide range of libraries for encoders and decoders +in many programming languages. +The choice of protobuf makes it easy to add new fields, and make old clients +aware of some unknown fields being detected [^adding-fields]. + +#### blob +On disk, git blob objects start with a "blob" prefix, then the size of the +payload, and then the data itself. The hash of a blob is the literal sha1sum +over all of this - which makes it something very git specific to request for. + +tvix-store simply uses the [blake3][blake3] hash of the literal contents +when referring to a file/blob, which makes it very easy to ask other data +sources for the same data, as no git-specific payload is included in the hash. +This also plays very well together with things like [iroh][iroh-discussion], +which plans to provide a way to substitute (large)blobs by their blake3 hash +over the IPFS network. + +In addition to that, [blake3][blake3] makes it possible to do +[verified streaming][bao], as already described in other parts of the +documentation. + +The git tree object format uses sha1 both for references to other trees and +hashes of blobs, which isn't really a hash function to fundamentally base +everything on in 2023. +The [migration to sha256][git-sha256] also has been dead for some years now, +and it's unclear how a "blake3" version of this would even look like. 
+ +[bao]: https://github.com/oconnor663/bao +[blake3]: https://github.com/BLAKE3-team/BLAKE3 +[git-sha256]: https://git-scm.com/docs/hash-function-transition/ +[iroh-discussion]: https://github.com/n0-computer/iroh/discussions/707#discussioncomment-5070197 +[^adding-fields]: Obviously, adding new fields will change hashes, but it's something that's easy to detect. \ No newline at end of file diff --git a/tvix/store/protos/LICENSE b/tvix/store/protos/LICENSE new file mode 100644 index 000000000000..2034ada6fd9a --- /dev/null +++ b/tvix/store/protos/LICENSE @@ -0,0 +1,21 @@ +Copyright © The Tvix Authors + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +“Software”), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ diff --git a/tvix/store/protos/go.mod b/tvix/store/protos/go.mod new file mode 100644 index 000000000000..3038b2a6c58c --- /dev/null +++ b/tvix/store/protos/go.mod @@ -0,0 +1,20 @@ +module code.tvl.fyi/tvix/store/protos + +go 1.19 + +require ( + code.tvl.fyi/tvix/castore/protos v0.0.0-20230922125121-72355662d742 + github.com/nix-community/go-nix v0.0.0-20231005143722-b0f8b73c06df + google.golang.org/grpc v1.51.0 + google.golang.org/protobuf v1.28.1 +) + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/klauspost/cpuid/v2 v2.0.9 // indirect + golang.org/x/net v0.7.0 // indirect + golang.org/x/sys v0.5.0 // indirect + golang.org/x/text v0.7.0 // indirect + google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect + lukechampine.com/blake3 v1.1.7 // indirect +) diff --git a/tvix/store/protos/go.sum b/tvix/store/protos/go.sum new file mode 100644 index 000000000000..e1840f8d154f --- /dev/null +++ b/tvix/store/protos/go.sum @@ -0,0 +1,90 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +code.tvl.fyi/tvix/castore/protos v0.0.0-20230922125121-72355662d742 h1:x7LsxggggaN3acnCMNDO5LZLAV+A+rZ+R8TXzr+Lgsk= +code.tvl.fyi/tvix/castore/protos v0.0.0-20230922125121-72355662d742/go.mod h1:Ejhyvc0dJUWQMxtJxddfFuAF5N8IKIO94q5CP4czY8Y= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod 
h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/nix-community/go-nix v0.0.0-20231005143722-b0f8b73c06df 
h1:n4I26uXUST5vmdsDWPo9ikK57il4htQyhnsLWoHYFmY= +github.com/nix-community/go-nix v0.0.0-20231005143722-b0f8b73c06df/go.mod h1:hHM9UK2zOCjvmiLgeaW4LVbOW/vBaRWFJGzfi31/slQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod 
h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U= +google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0= +lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA= diff --git 
a/tvix/store/protos/pathinfo.go b/tvix/store/protos/pathinfo.go new file mode 100644 index 000000000000..6607b5c879fb --- /dev/null +++ b/tvix/store/protos/pathinfo.go @@ -0,0 +1,108 @@
+package storev1
+
+import (
+	"bytes"
+	"encoding/base64"
+	"fmt"
+
+	"github.com/nix-community/go-nix/pkg/storepath"
+)
+
+// Validate performs some checks on the PathInfo struct, returning either the
+// StorePath of the root node, or an error.
+//
+// The checks are, in order:
+//   - every entry of References is storepath.PathHashSize bytes long;
+//   - if Narinfo is set, ReferenceNames has as many entries as References,
+//     each name parses as a store path, and its digest equals the entry of
+//     References at the same position;
+//   - the root node is set, and is a directory, file or symlink whose name
+//     parses as a store path (directory and file digests must be 32 bytes).
+//
+// A root node with none of these three variants populated yields an error
+// instead of a panic, so malformed messages cannot crash the caller.
+func (p *PathInfo) Validate() (*storepath.StorePath, error) {
+	// ensure References has the right number of bytes.
+	for i, reference := range p.GetReferences() {
+		if len(reference) != storepath.PathHashSize {
+			return nil, fmt.Errorf("invalid length of digest at position %d, expected %d, got %d", i, storepath.PathHashSize, len(reference))
+		}
+	}
+
+	// If there's a Narinfo field populated..
+	if narInfo := p.GetNarinfo(); narInfo != nil {
+		// ensure the number of references matches len(References).
+		// The arguments follow the labels in the message: the PathInfo
+		// References count first, the NARInfo ReferenceNames count second.
+		if len(narInfo.GetReferenceNames()) != len(p.GetReferences()) {
+			return nil, fmt.Errorf("inconsistent number of references: %d (references) vs %d (narinfo)", len(p.GetReferences()), len(narInfo.GetReferenceNames()))
+		}
+
+		// for each ReferenceName…
+		for i, referenceName := range narInfo.GetReferenceNames() {
+			// ensure it parses to a store path
+			storePath, err := storepath.FromString(referenceName)
+			if err != nil {
+				return nil, fmt.Errorf("invalid ReferenceName at position %d: %w", i, err)
+			}
+
+			// ensure the digest matches the one at References[i]; the index
+			// is in range because both lengths were compared above.
+			if !bytes.Equal(p.GetReferences()[i], storePath.Digest) {
+				return nil, fmt.Errorf(
+					"digest in ReferenceName at position %d does not match digest in PathInfo, expected %s, got %s",
+					i,
+					base64.StdEncoding.EncodeToString(p.GetReferences()[i]),
+					base64.StdEncoding.EncodeToString(storePath.Digest),
+				)
+			}
+		}
+	}
+
+	// ensure there is a (root) node present
+	rootNode := p.GetNode()
+	if rootNode == nil {
+		return nil, fmt.Errorf("root node must be set")
+	}
+
+	// ensure it properly parses to a store path, and in case it refers to a digest, ensure it has the right length.
+	if node := rootNode.GetDirectory(); node != nil {
+		if len(node.Digest) != 32 {
+			return nil, fmt.Errorf("invalid digest size for %s, expected %d, got %d", node.Name, 32, len(node.Digest))
+		}
+
+		storePath, err := storepath.FromString(string(node.GetName()))
+
+		if err != nil {
+			return nil, fmt.Errorf("unable to parse %s as StorePath: %w", node.Name, err)
+		}
+
+		return storePath, nil
+
+	} else if node := rootNode.GetFile(); node != nil {
+		if len(node.Digest) != 32 {
+			return nil, fmt.Errorf("invalid digest size for %s, expected %d, got %d", node.Name, 32, len(node.Digest))
+		}
+
+		storePath, err := storepath.FromString(string(node.GetName()))
+		if err != nil {
+			return nil, fmt.Errorf("unable to parse %s as StorePath: %w", node.Name, err)
+		}
+
+		return storePath, nil
+
+	} else if node := rootNode.GetSymlink(); node != nil {
+		storePath, err := storepath.FromString(string(node.GetName()))
+
+		if err != nil {
+			return nil, fmt.Errorf("unable to parse %s as StorePath: %w", node.Name, err)
+		}
+
+		return storePath, nil
+
+	} else {
+		// None of the directory, file or symlink getters returned a node,
+		// e.g. for an empty Node message. Report it rather than panicking.
+		return nil, fmt.Errorf("root node must be a directory, file or symlink")
+	}
+}
diff --git a/tvix/store/protos/pathinfo.pb.go b/tvix/store/protos/pathinfo.pb.go new file mode 100644 index 000000000000..1e5479ac8f75 --- /dev/null +++ b/tvix/store/protos/pathinfo.pb.go @@ -0,0 +1,369 @@ +// SPDX-License-Identifier: MIT +// Copyright © 2022 The Tvix Authors + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc (unknown) +// source: tvix/store/protos/pathinfo.proto + +package storev1 + +import ( + protos "code.tvl.fyi/tvix/castore/protos" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// PathInfo shows information about a Nix Store Path. +// That's a single element inside /nix/store. +type PathInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The path can be a directory, file or symlink. + Node *protos.Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"` + // List of references (output path hashes) + // This really is the raw *bytes*, after decoding nixbase32, and not a + // base32-encoded string. + References [][]byte `protobuf:"bytes,2,rep,name=references,proto3" json:"references,omitempty"` + // see below. + Narinfo *NARInfo `protobuf:"bytes,3,opt,name=narinfo,proto3" json:"narinfo,omitempty"` +} + +func (x *PathInfo) Reset() { + *x = PathInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_tvix_store_protos_pathinfo_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PathInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PathInfo) ProtoMessage() {} + +func (x *PathInfo) ProtoReflect() protoreflect.Message { + mi := &file_tvix_store_protos_pathinfo_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PathInfo.ProtoReflect.Descriptor instead. 
+func (*PathInfo) Descriptor() ([]byte, []int) { + return file_tvix_store_protos_pathinfo_proto_rawDescGZIP(), []int{0} +} + +func (x *PathInfo) GetNode() *protos.Node { + if x != nil { + return x.Node + } + return nil +} + +func (x *PathInfo) GetReferences() [][]byte { + if x != nil { + return x.References + } + return nil +} + +func (x *PathInfo) GetNarinfo() *NARInfo { + if x != nil { + return x.Narinfo + } + return nil +} + +// Nix C++ uses NAR (Nix Archive) as a format to transfer store paths, +// and stores metadata and signatures in NARInfo files. +// Store all these attributes in a separate message. +// +// This is useful to render .narinfo files to clients, or to preserve/validate +// these signatures. +// As verifying these signatures requires the whole NAR file to be synthesized, +// moving to another signature scheme is desired. +// Even then, it still makes sense to hold this data, for old clients. +type NARInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // This size of the NAR file, in bytes. + NarSize uint64 `protobuf:"varint,1,opt,name=nar_size,json=narSize,proto3" json:"nar_size,omitempty"` + // The sha256 of the NAR file representation. + NarSha256 []byte `protobuf:"bytes,2,opt,name=nar_sha256,json=narSha256,proto3" json:"nar_sha256,omitempty"` + // The signatures in a .narinfo file. + Signatures []*NARInfo_Signature `protobuf:"bytes,3,rep,name=signatures,proto3" json:"signatures,omitempty"` + // A list of references. To validate .narinfo signatures, a fingerprint + // needs to be constructed. + // This fingerprint doesn't just contain the hashes of the output paths of + // all references (like PathInfo.references), but their whole (base)names, + // so we need to keep them somewhere. 
+ ReferenceNames []string `protobuf:"bytes,4,rep,name=reference_names,json=referenceNames,proto3" json:"reference_names,omitempty"` +} + +func (x *NARInfo) Reset() { + *x = NARInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_tvix_store_protos_pathinfo_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *NARInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NARInfo) ProtoMessage() {} + +func (x *NARInfo) ProtoReflect() protoreflect.Message { + mi := &file_tvix_store_protos_pathinfo_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NARInfo.ProtoReflect.Descriptor instead. +func (*NARInfo) Descriptor() ([]byte, []int) { + return file_tvix_store_protos_pathinfo_proto_rawDescGZIP(), []int{1} +} + +func (x *NARInfo) GetNarSize() uint64 { + if x != nil { + return x.NarSize + } + return 0 +} + +func (x *NARInfo) GetNarSha256() []byte { + if x != nil { + return x.NarSha256 + } + return nil +} + +func (x *NARInfo) GetSignatures() []*NARInfo_Signature { + if x != nil { + return x.Signatures + } + return nil +} + +func (x *NARInfo) GetReferenceNames() []string { + if x != nil { + return x.ReferenceNames + } + return nil +} + +// This represents a (parsed) signature line in a .narinfo file. 
+type NARInfo_Signature struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *NARInfo_Signature) Reset() { + *x = NARInfo_Signature{} + if protoimpl.UnsafeEnabled { + mi := &file_tvix_store_protos_pathinfo_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *NARInfo_Signature) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NARInfo_Signature) ProtoMessage() {} + +func (x *NARInfo_Signature) ProtoReflect() protoreflect.Message { + mi := &file_tvix_store_protos_pathinfo_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NARInfo_Signature.ProtoReflect.Descriptor instead. 
+func (*NARInfo_Signature) Descriptor() ([]byte, []int) { + return file_tvix_store_protos_pathinfo_proto_rawDescGZIP(), []int{1, 0} +} + +func (x *NARInfo_Signature) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *NARInfo_Signature) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +var File_tvix_store_protos_pathinfo_proto protoreflect.FileDescriptor + +var file_tvix_store_protos_pathinfo_proto_rawDesc = []byte{ + 0x0a, 0x20, 0x74, 0x76, 0x69, 0x78, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x73, 0x2f, 0x70, 0x61, 0x74, 0x68, 0x69, 0x6e, 0x66, 0x6f, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x0d, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, + 0x31, 0x1a, 0x21, 0x74, 0x76, 0x69, 0x78, 0x2f, 0x63, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x63, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x87, 0x01, 0x0a, 0x08, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, + 0x6f, 0x12, 0x29, 0x0a, 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x15, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x63, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, + 0x31, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x12, 0x1e, 0x0a, 0x0a, + 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0c, + 0x52, 0x0a, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x73, 0x12, 0x30, 0x0a, 0x07, + 0x6e, 0x61, 0x72, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, + 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x4e, 0x41, + 0x52, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x07, 0x6e, 0x61, 0x72, 0x69, 0x6e, 0x66, 0x6f, 0x22, 0xe3, + 0x01, 0x0a, 0x07, 0x4e, 0x41, 0x52, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x19, 0x0a, 0x08, 0x6e, 0x61, + 0x72, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 
0x01, 0x28, 0x04, 0x52, 0x07, 0x6e, 0x61, + 0x72, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x6e, 0x61, 0x72, 0x5f, 0x73, 0x68, 0x61, + 0x32, 0x35, 0x36, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6e, 0x61, 0x72, 0x53, 0x68, + 0x61, 0x32, 0x35, 0x36, 0x12, 0x40, 0x0a, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, + 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, + 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x4e, 0x41, 0x52, 0x49, 0x6e, 0x66, 0x6f, + 0x2e, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x52, 0x0a, 0x73, 0x69, 0x67, 0x6e, + 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, + 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, + 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x1a, + 0x33, 0x0a, 0x09, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x12, 0x0a, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x42, 0x28, 0x5a, 0x26, 0x63, 0x6f, 0x64, 0x65, 0x2e, 0x74, 0x76, 0x6c, + 0x2e, 0x66, 0x79, 0x69, 0x2f, 0x74, 0x76, 0x69, 0x78, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x3b, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x76, 0x31, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_tvix_store_protos_pathinfo_proto_rawDescOnce sync.Once + file_tvix_store_protos_pathinfo_proto_rawDescData = file_tvix_store_protos_pathinfo_proto_rawDesc +) + +func file_tvix_store_protos_pathinfo_proto_rawDescGZIP() []byte { + file_tvix_store_protos_pathinfo_proto_rawDescOnce.Do(func() { + file_tvix_store_protos_pathinfo_proto_rawDescData = protoimpl.X.CompressGZIP(file_tvix_store_protos_pathinfo_proto_rawDescData) + }) + 
return file_tvix_store_protos_pathinfo_proto_rawDescData +} + +var file_tvix_store_protos_pathinfo_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_tvix_store_protos_pathinfo_proto_goTypes = []interface{}{ + (*PathInfo)(nil), // 0: tvix.store.v1.PathInfo + (*NARInfo)(nil), // 1: tvix.store.v1.NARInfo + (*NARInfo_Signature)(nil), // 2: tvix.store.v1.NARInfo.Signature + (*protos.Node)(nil), // 3: tvix.castore.v1.Node +} +var file_tvix_store_protos_pathinfo_proto_depIdxs = []int32{ + 3, // 0: tvix.store.v1.PathInfo.node:type_name -> tvix.castore.v1.Node + 1, // 1: tvix.store.v1.PathInfo.narinfo:type_name -> tvix.store.v1.NARInfo + 2, // 2: tvix.store.v1.NARInfo.signatures:type_name -> tvix.store.v1.NARInfo.Signature + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_tvix_store_protos_pathinfo_proto_init() } +func file_tvix_store_protos_pathinfo_proto_init() { + if File_tvix_store_protos_pathinfo_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_tvix_store_protos_pathinfo_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PathInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tvix_store_protos_pathinfo_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NARInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tvix_store_protos_pathinfo_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NARInfo_Signature); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + 
return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_tvix_store_protos_pathinfo_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_tvix_store_protos_pathinfo_proto_goTypes, + DependencyIndexes: file_tvix_store_protos_pathinfo_proto_depIdxs, + MessageInfos: file_tvix_store_protos_pathinfo_proto_msgTypes, + }.Build() + File_tvix_store_protos_pathinfo_proto = out.File + file_tvix_store_protos_pathinfo_proto_rawDesc = nil + file_tvix_store_protos_pathinfo_proto_goTypes = nil + file_tvix_store_protos_pathinfo_proto_depIdxs = nil +} diff --git a/tvix/store/protos/pathinfo.proto b/tvix/store/protos/pathinfo.proto new file mode 100644 index 000000000000..aa98c6df9a2d --- /dev/null +++ b/tvix/store/protos/pathinfo.proto @@ -0,0 +1,58 @@ +// SPDX-License-Identifier: MIT +// Copyright © 2022 The Tvix Authors +syntax = "proto3"; + +package tvix.store.v1; + +import "tvix/castore/protos/castore.proto"; + +option go_package = "code.tvl.fyi/tvix/store/protos;storev1"; + +// PathInfo shows information about a Nix Store Path. +// That's a single element inside /nix/store. +message PathInfo { + // The path can be a directory, file or symlink. + tvix.castore.v1.Node node = 1; + + // List of references (output path hashes) + // This really is the raw *bytes*, after decoding nixbase32, and not a + // base32-encoded string. + repeated bytes references = 2; + + // see below. + NARInfo narinfo = 3; +} + +// Nix C++ uses NAR (Nix Archive) as a format to transfer store paths, +// and stores metadata and signatures in NARInfo files. +// Store all these attributes in a separate message. +// +// This is useful to render .narinfo files to clients, or to preserve/validate +// these signatures. +// As verifying these signatures requires the whole NAR file to be synthesized, +// moving to another signature scheme is desired. 
+// Even then, it still makes sense to hold this data, for old clients. +message NARInfo { + // This represents a (parsed) signature line in a .narinfo file. + message Signature { + string name = 1; + bytes data = 2; + }; + + // This size of the NAR file, in bytes. + uint64 nar_size = 1; + + // The sha256 of the NAR file representation. + bytes nar_sha256 = 2; + + // The signatures in a .narinfo file. + repeated Signature signatures = 3; + + // A list of references. To validate .narinfo signatures, a fingerprint + // needs to be constructed. + // This fingerprint doesn't just contain the hashes of the output paths of + // all references (like PathInfo.references), but their whole (base)names, + // so we need to keep them somewhere. + repeated string reference_names = 4; + +} diff --git a/tvix/store/protos/rpc_pathinfo.pb.go b/tvix/store/protos/rpc_pathinfo.pb.go new file mode 100644 index 000000000000..8a3c10a82101 --- /dev/null +++ b/tvix/store/protos/rpc_pathinfo.pb.go @@ -0,0 +1,348 @@ +// SPDX-License-Identifier: MIT +// Copyright © 2022 The Tvix Authors + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc (unknown) +// source: tvix/store/protos/rpc_pathinfo.proto + +package storev1 + +import ( + protos "code.tvl.fyi/tvix/castore/protos" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// The parameters that can be used to lookup a (single) PathInfo object. +// Currently, only a lookup by output hash is supported. 
+type GetPathInfoRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to ByWhat: + // + // *GetPathInfoRequest_ByOutputHash + ByWhat isGetPathInfoRequest_ByWhat `protobuf_oneof:"by_what"` +} + +func (x *GetPathInfoRequest) Reset() { + *x = GetPathInfoRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetPathInfoRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetPathInfoRequest) ProtoMessage() {} + +func (x *GetPathInfoRequest) ProtoReflect() protoreflect.Message { + mi := &file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetPathInfoRequest.ProtoReflect.Descriptor instead. +func (*GetPathInfoRequest) Descriptor() ([]byte, []int) { + return file_tvix_store_protos_rpc_pathinfo_proto_rawDescGZIP(), []int{0} +} + +func (m *GetPathInfoRequest) GetByWhat() isGetPathInfoRequest_ByWhat { + if m != nil { + return m.ByWhat + } + return nil +} + +func (x *GetPathInfoRequest) GetByOutputHash() []byte { + if x, ok := x.GetByWhat().(*GetPathInfoRequest_ByOutputHash); ok { + return x.ByOutputHash + } + return nil +} + +type isGetPathInfoRequest_ByWhat interface { + isGetPathInfoRequest_ByWhat() +} + +type GetPathInfoRequest_ByOutputHash struct { + // The output hash of a nix path (20 bytes). + // This is the nixbase32-decoded portion of a Nix output path, so to substitute + // /nix/store/xm35nga2g20mz5sm5l6n8v3bdm86yj83-cowsay-3.04 + // this field would contain nixbase32dec("xm35nga2g20mz5sm5l6n8v3bdm86yj83"). 
+ ByOutputHash []byte `protobuf:"bytes,1,opt,name=by_output_hash,json=byOutputHash,proto3,oneof"` +} + +func (*GetPathInfoRequest_ByOutputHash) isGetPathInfoRequest_ByWhat() {} + +// The parameters that can be used to lookup (multiple) PathInfo objects. +// Currently no filtering is possible, all objects are returned. +type ListPathInfoRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ListPathInfoRequest) Reset() { + *x = ListPathInfoRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListPathInfoRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListPathInfoRequest) ProtoMessage() {} + +func (x *ListPathInfoRequest) ProtoReflect() protoreflect.Message { + mi := &file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListPathInfoRequest.ProtoReflect.Descriptor instead. +func (*ListPathInfoRequest) Descriptor() ([]byte, []int) { + return file_tvix_store_protos_rpc_pathinfo_proto_rawDescGZIP(), []int{1} +} + +// CalculateNARResponse is the response returned by the CalculateNAR request. +// +// It contains the size of the NAR representation (in bytes), and the sha56 +// digest. +type CalculateNARResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // This size of the NAR file, in bytes. + NarSize uint64 `protobuf:"varint,1,opt,name=nar_size,json=narSize,proto3" json:"nar_size,omitempty"` + // The sha256 of the NAR file representation. 
+ NarSha256 []byte `protobuf:"bytes,2,opt,name=nar_sha256,json=narSha256,proto3" json:"nar_sha256,omitempty"` +} + +func (x *CalculateNARResponse) Reset() { + *x = CalculateNARResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CalculateNARResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CalculateNARResponse) ProtoMessage() {} + +func (x *CalculateNARResponse) ProtoReflect() protoreflect.Message { + mi := &file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CalculateNARResponse.ProtoReflect.Descriptor instead. +func (*CalculateNARResponse) Descriptor() ([]byte, []int) { + return file_tvix_store_protos_rpc_pathinfo_proto_rawDescGZIP(), []int{2} +} + +func (x *CalculateNARResponse) GetNarSize() uint64 { + if x != nil { + return x.NarSize + } + return 0 +} + +func (x *CalculateNARResponse) GetNarSha256() []byte { + if x != nil { + return x.NarSha256 + } + return nil +} + +var File_tvix_store_protos_rpc_pathinfo_proto protoreflect.FileDescriptor + +var file_tvix_store_protos_rpc_pathinfo_proto_rawDesc = []byte{ + 0x0a, 0x24, 0x74, 0x76, 0x69, 0x78, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x69, 0x6e, 0x66, 0x6f, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0d, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, + 0x72, 0x65, 0x2e, 0x76, 0x31, 0x1a, 0x20, 0x74, 0x76, 0x69, 0x78, 0x2f, 0x73, 0x74, 0x6f, 0x72, + 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x70, 0x61, 0x74, 0x68, 0x69, 0x6e, 0x66, + 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x21, 0x74, 
0x76, 0x69, 0x78, 0x2f, 0x63, 0x61, + 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x63, 0x61, 0x73, + 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x47, 0x0a, 0x12, 0x47, 0x65, + 0x74, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x26, 0x0a, 0x0e, 0x62, 0x79, 0x5f, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x5f, 0x68, 0x61, + 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x0c, 0x62, 0x79, 0x4f, 0x75, + 0x74, 0x70, 0x75, 0x74, 0x48, 0x61, 0x73, 0x68, 0x42, 0x09, 0x0a, 0x07, 0x62, 0x79, 0x5f, 0x77, + 0x68, 0x61, 0x74, 0x22, 0x15, 0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x61, 0x74, 0x68, 0x49, + 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x50, 0x0a, 0x14, 0x43, 0x61, + 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x4e, 0x41, 0x52, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x6e, 0x61, 0x72, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x6e, 0x61, 0x72, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x1d, 0x0a, + 0x0a, 0x6e, 0x61, 0x72, 0x5f, 0x73, 0x68, 0x61, 0x32, 0x35, 0x36, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x09, 0x6e, 0x61, 0x72, 0x53, 0x68, 0x61, 0x32, 0x35, 0x36, 0x32, 0xa0, 0x02, 0x0a, + 0x0f, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x12, 0x41, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x21, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, + 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x74, 0x68, 0x49, + 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x74, 0x76, 0x69, + 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x49, + 0x6e, 0x66, 0x6f, 0x12, 0x37, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, 0x17, 0x2e, 0x74, 0x76, 0x69, + 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x49, + 
0x6e, 0x66, 0x6f, 0x1a, 0x17, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, + 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x4a, 0x0a, 0x0c, + 0x43, 0x61, 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x4e, 0x41, 0x52, 0x12, 0x15, 0x2e, 0x74, + 0x76, 0x69, 0x78, 0x2e, 0x63, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x4e, + 0x6f, 0x64, 0x65, 0x1a, 0x23, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, + 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x61, 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x4e, 0x41, 0x52, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, + 0x12, 0x22, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, + 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, + 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x30, 0x01, 0x42, + 0x28, 0x5a, 0x26, 0x63, 0x6f, 0x64, 0x65, 0x2e, 0x74, 0x76, 0x6c, 0x2e, 0x66, 0x79, 0x69, 0x2f, + 0x74, 0x76, 0x69, 0x78, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x73, 0x3b, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, +} + +var ( + file_tvix_store_protos_rpc_pathinfo_proto_rawDescOnce sync.Once + file_tvix_store_protos_rpc_pathinfo_proto_rawDescData = file_tvix_store_protos_rpc_pathinfo_proto_rawDesc +) + +func file_tvix_store_protos_rpc_pathinfo_proto_rawDescGZIP() []byte { + file_tvix_store_protos_rpc_pathinfo_proto_rawDescOnce.Do(func() { + file_tvix_store_protos_rpc_pathinfo_proto_rawDescData = protoimpl.X.CompressGZIP(file_tvix_store_protos_rpc_pathinfo_proto_rawDescData) + }) + return file_tvix_store_protos_rpc_pathinfo_proto_rawDescData +} + +var file_tvix_store_protos_rpc_pathinfo_proto_msgTypes = 
make([]protoimpl.MessageInfo, 3) +var file_tvix_store_protos_rpc_pathinfo_proto_goTypes = []interface{}{ + (*GetPathInfoRequest)(nil), // 0: tvix.store.v1.GetPathInfoRequest + (*ListPathInfoRequest)(nil), // 1: tvix.store.v1.ListPathInfoRequest + (*CalculateNARResponse)(nil), // 2: tvix.store.v1.CalculateNARResponse + (*PathInfo)(nil), // 3: tvix.store.v1.PathInfo + (*protos.Node)(nil), // 4: tvix.castore.v1.Node +} +var file_tvix_store_protos_rpc_pathinfo_proto_depIdxs = []int32{ + 0, // 0: tvix.store.v1.PathInfoService.Get:input_type -> tvix.store.v1.GetPathInfoRequest + 3, // 1: tvix.store.v1.PathInfoService.Put:input_type -> tvix.store.v1.PathInfo + 4, // 2: tvix.store.v1.PathInfoService.CalculateNAR:input_type -> tvix.castore.v1.Node + 1, // 3: tvix.store.v1.PathInfoService.List:input_type -> tvix.store.v1.ListPathInfoRequest + 3, // 4: tvix.store.v1.PathInfoService.Get:output_type -> tvix.store.v1.PathInfo + 3, // 5: tvix.store.v1.PathInfoService.Put:output_type -> tvix.store.v1.PathInfo + 2, // 6: tvix.store.v1.PathInfoService.CalculateNAR:output_type -> tvix.store.v1.CalculateNARResponse + 3, // 7: tvix.store.v1.PathInfoService.List:output_type -> tvix.store.v1.PathInfo + 4, // [4:8] is the sub-list for method output_type + 0, // [0:4] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_tvix_store_protos_rpc_pathinfo_proto_init() } +func file_tvix_store_protos_rpc_pathinfo_proto_init() { + if File_tvix_store_protos_rpc_pathinfo_proto != nil { + return + } + file_tvix_store_protos_pathinfo_proto_init() + if !protoimpl.UnsafeEnabled { + file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetPathInfoRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + 
return nil + } + } + file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListPathInfoRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CalculateNARResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[0].OneofWrappers = []interface{}{ + (*GetPathInfoRequest_ByOutputHash)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_tvix_store_protos_rpc_pathinfo_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_tvix_store_protos_rpc_pathinfo_proto_goTypes, + DependencyIndexes: file_tvix_store_protos_rpc_pathinfo_proto_depIdxs, + MessageInfos: file_tvix_store_protos_rpc_pathinfo_proto_msgTypes, + }.Build() + File_tvix_store_protos_rpc_pathinfo_proto = out.File + file_tvix_store_protos_rpc_pathinfo_proto_rawDesc = nil + file_tvix_store_protos_rpc_pathinfo_proto_goTypes = nil + file_tvix_store_protos_rpc_pathinfo_proto_depIdxs = nil +} diff --git a/tvix/store/protos/rpc_pathinfo.proto b/tvix/store/protos/rpc_pathinfo.proto new file mode 100644 index 000000000000..1930e87de004 --- /dev/null +++ b/tvix/store/protos/rpc_pathinfo.proto @@ -0,0 +1,77 @@ +// SPDX-License-Identifier: MIT +// Copyright © 2022 The Tvix Authors +syntax = "proto3"; + +package tvix.store.v1; + +import "tvix/store/protos/pathinfo.proto"; +import "tvix/castore/protos/castore.proto"; + +option go_package = "code.tvl.fyi/tvix/store/protos;storev1"; + +service PathInfoService { + // Return a PathInfo message matching the 
criteria specified in the + // GetPathInfoRequest message. + rpc Get(GetPathInfoRequest) returns (PathInfo); + + // Upload a PathInfo object to the remote end. It MUST NOT return until the + // PathInfo object has been written on the remote end. + // + // The remote end MAY check if a potential DirectoryNode has already been + // uploaded. + // + // Uploading clients SHOULD obviously not steer other machines to try to + // substitute from the remote end before having finished uploading + // PathInfo, Directories and Blobs. + // The returned PathInfo object MAY contain additional narinfo signatures, + // but is otherwise left untouched. + rpc Put(PathInfo) returns (PathInfo); + + + // Calculate the NAR representation of the contents specified by the + // root_node. The calculation SHOULD be cached server-side for subsequent + // requests. + // + // All references (to blobs or Directory messages) MUST already exist in + // the store. + // + // The method can be used to produce a Nix fixed-output path, which + // contains the (compressed) sha256 of the NAR content representation in + // the root_node name (suffixed with the name). + // + // It can also be used to calculate arbitrary NAR hashes of output paths, + // in case a legacy Nix Binary Cache frontend is provided. + rpc CalculateNAR(tvix.castore.v1.Node) returns (CalculateNARResponse); + + // Return a stream of PathInfo messages matching the criteria specified in + // ListPathInfoRequest. + rpc List(ListPathInfoRequest) returns (stream PathInfo); +} + +// The parameters that can be used to lookup a (single) PathInfo object. +// Currently, only a lookup by output hash is supported. +message GetPathInfoRequest { + oneof by_what { + // The output hash of a nix path (20 bytes). + // This is the nixbase32-decoded portion of a Nix output path, so to substitute + // /nix/store/xm35nga2g20mz5sm5l6n8v3bdm86yj83-cowsay-3.04 + // this field would contain nixbase32dec("xm35nga2g20mz5sm5l6n8v3bdm86yj83"). 
+ bytes by_output_hash = 1; + }; +} + +// The parameters that can be used to lookup (multiple) PathInfo objects. +// Currently no filtering is possible, all objects are returned. +message ListPathInfoRequest { } + +// CalculateNARResponse is the response returned by the CalculateNAR request. +// +// It contains the size of the NAR representation (in bytes), and the sha256 +// digest. +message CalculateNARResponse { + // The size of the NAR file, in bytes. + uint64 nar_size = 1; + + // The sha256 of the NAR file representation. + bytes nar_sha256 = 2; +} diff --git a/tvix/store/protos/rpc_pathinfo_grpc.pb.go b/tvix/store/protos/rpc_pathinfo_grpc.pb.go new file mode 100644 index 000000000000..10d8a7ffa49c --- /dev/null +++ b/tvix/store/protos/rpc_pathinfo_grpc.pb.go @@ -0,0 +1,308 @@ +// SPDX-License-Identifier: MIT +// Copyright © 2022 The Tvix Authors + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc (unknown) +// source: tvix/store/protos/rpc_pathinfo.proto + +package storev1 + +import ( + protos "code.tvl.fyi/tvix/castore/protos" + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + PathInfoService_Get_FullMethodName = "/tvix.store.v1.PathInfoService/Get" + PathInfoService_Put_FullMethodName = "/tvix.store.v1.PathInfoService/Put" + PathInfoService_CalculateNAR_FullMethodName = "/tvix.store.v1.PathInfoService/CalculateNAR" + PathInfoService_List_FullMethodName = "/tvix.store.v1.PathInfoService/List" +) + +// PathInfoServiceClient is the client API for PathInfoService service. 
+// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type PathInfoServiceClient interface { + // Return a PathInfo message matching the criteria specified in the + // GetPathInfoRequest message. + Get(ctx context.Context, in *GetPathInfoRequest, opts ...grpc.CallOption) (*PathInfo, error) + // Upload a PathInfo object to the remote end. It MUST not return until the + // PathInfo object has been written on the the remote end. + // + // The remote end MAY check if a potential DirectoryNode has already been + // uploaded. + // + // Uploading clients SHOULD obviously not steer other machines to try to + // substitute before from the remote end before having finished uploading + // PathInfo, Directories and Blobs. + // The returned PathInfo object MAY contain additional narinfo signatures, + // but is otherwise left untouched. + Put(ctx context.Context, in *PathInfo, opts ...grpc.CallOption) (*PathInfo, error) + // Calculate the NAR representation of the contents specified by the + // root_node. The calculation SHOULD be cached server-side for subsequent + // requests. + // + // All references (to blobs or Directory messages) MUST already exist in + // the store. + // + // The method can be used to produce a Nix fixed-output path, which + // contains the (compressed) sha256 of the NAR content representation in + // the root_node name (suffixed with the name). + // + // It can also be used to calculate arbitrary NAR hashes of output paths, + // in case a legacy Nix Binary Cache frontend is provided. + CalculateNAR(ctx context.Context, in *protos.Node, opts ...grpc.CallOption) (*CalculateNARResponse, error) + // Return a stream of PathInfo messages matching the criteria specified in + // ListPathInfoRequest. 
+ List(ctx context.Context, in *ListPathInfoRequest, opts ...grpc.CallOption) (PathInfoService_ListClient, error) +} + +type pathInfoServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewPathInfoServiceClient(cc grpc.ClientConnInterface) PathInfoServiceClient { + return &pathInfoServiceClient{cc} +} + +func (c *pathInfoServiceClient) Get(ctx context.Context, in *GetPathInfoRequest, opts ...grpc.CallOption) (*PathInfo, error) { + out := new(PathInfo) + err := c.cc.Invoke(ctx, PathInfoService_Get_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *pathInfoServiceClient) Put(ctx context.Context, in *PathInfo, opts ...grpc.CallOption) (*PathInfo, error) { + out := new(PathInfo) + err := c.cc.Invoke(ctx, PathInfoService_Put_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *pathInfoServiceClient) CalculateNAR(ctx context.Context, in *protos.Node, opts ...grpc.CallOption) (*CalculateNARResponse, error) { + out := new(CalculateNARResponse) + err := c.cc.Invoke(ctx, PathInfoService_CalculateNAR_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *pathInfoServiceClient) List(ctx context.Context, in *ListPathInfoRequest, opts ...grpc.CallOption) (PathInfoService_ListClient, error) { + stream, err := c.cc.NewStream(ctx, &PathInfoService_ServiceDesc.Streams[0], PathInfoService_List_FullMethodName, opts...) 
+ if err != nil { + return nil, err + } + x := &pathInfoServiceListClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type PathInfoService_ListClient interface { + Recv() (*PathInfo, error) + grpc.ClientStream +} + +type pathInfoServiceListClient struct { + grpc.ClientStream +} + +func (x *pathInfoServiceListClient) Recv() (*PathInfo, error) { + m := new(PathInfo) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// PathInfoServiceServer is the server API for PathInfoService service. +// All implementations must embed UnimplementedPathInfoServiceServer +// for forward compatibility +type PathInfoServiceServer interface { + // Return a PathInfo message matching the criteria specified in the + // GetPathInfoRequest message. + Get(context.Context, *GetPathInfoRequest) (*PathInfo, error) + // Upload a PathInfo object to the remote end. It MUST not return until the + // PathInfo object has been written on the the remote end. + // + // The remote end MAY check if a potential DirectoryNode has already been + // uploaded. + // + // Uploading clients SHOULD obviously not steer other machines to try to + // substitute before from the remote end before having finished uploading + // PathInfo, Directories and Blobs. + // The returned PathInfo object MAY contain additional narinfo signatures, + // but is otherwise left untouched. + Put(context.Context, *PathInfo) (*PathInfo, error) + // Calculate the NAR representation of the contents specified by the + // root_node. The calculation SHOULD be cached server-side for subsequent + // requests. + // + // All references (to blobs or Directory messages) MUST already exist in + // the store. 
+ // + // The method can be used to produce a Nix fixed-output path, which + // contains the (compressed) sha256 of the NAR content representation in + // the root_node name (suffixed with the name). + // + // It can also be used to calculate arbitrary NAR hashes of output paths, + // in case a legacy Nix Binary Cache frontend is provided. + CalculateNAR(context.Context, *protos.Node) (*CalculateNARResponse, error) + // Return a stream of PathInfo messages matching the criteria specified in + // ListPathInfoRequest. + List(*ListPathInfoRequest, PathInfoService_ListServer) error + mustEmbedUnimplementedPathInfoServiceServer() +} + +// UnimplementedPathInfoServiceServer must be embedded to have forward compatible implementations. +type UnimplementedPathInfoServiceServer struct { +} + +func (UnimplementedPathInfoServiceServer) Get(context.Context, *GetPathInfoRequest) (*PathInfo, error) { + return nil, status.Errorf(codes.Unimplemented, "method Get not implemented") +} +func (UnimplementedPathInfoServiceServer) Put(context.Context, *PathInfo) (*PathInfo, error) { + return nil, status.Errorf(codes.Unimplemented, "method Put not implemented") +} +func (UnimplementedPathInfoServiceServer) CalculateNAR(context.Context, *protos.Node) (*CalculateNARResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CalculateNAR not implemented") +} +func (UnimplementedPathInfoServiceServer) List(*ListPathInfoRequest, PathInfoService_ListServer) error { + return status.Errorf(codes.Unimplemented, "method List not implemented") +} +func (UnimplementedPathInfoServiceServer) mustEmbedUnimplementedPathInfoServiceServer() {} + +// UnsafePathInfoServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to PathInfoServiceServer will +// result in compilation errors. 
+type UnsafePathInfoServiceServer interface { + mustEmbedUnimplementedPathInfoServiceServer() +} + +func RegisterPathInfoServiceServer(s grpc.ServiceRegistrar, srv PathInfoServiceServer) { + s.RegisterService(&PathInfoService_ServiceDesc, srv) +} + +func _PathInfoService_Get_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetPathInfoRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PathInfoServiceServer).Get(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: PathInfoService_Get_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PathInfoServiceServer).Get(ctx, req.(*GetPathInfoRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _PathInfoService_Put_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PathInfo) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PathInfoServiceServer).Put(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: PathInfoService_Put_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PathInfoServiceServer).Put(ctx, req.(*PathInfo)) + } + return interceptor(ctx, in, info, handler) +} + +func _PathInfoService_CalculateNAR_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(protos.Node) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PathInfoServiceServer).CalculateNAR(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: PathInfoService_CalculateNAR_FullMethodName, + } + handler := func(ctx 
context.Context, req interface{}) (interface{}, error) { + return srv.(PathInfoServiceServer).CalculateNAR(ctx, req.(*protos.Node)) + } + return interceptor(ctx, in, info, handler) +} + +func _PathInfoService_List_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ListPathInfoRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(PathInfoServiceServer).List(m, &pathInfoServiceListServer{stream}) +} + +type PathInfoService_ListServer interface { + Send(*PathInfo) error + grpc.ServerStream +} + +type pathInfoServiceListServer struct { + grpc.ServerStream +} + +func (x *pathInfoServiceListServer) Send(m *PathInfo) error { + return x.ServerStream.SendMsg(m) +} + +// PathInfoService_ServiceDesc is the grpc.ServiceDesc for PathInfoService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var PathInfoService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "tvix.store.v1.PathInfoService", + HandlerType: (*PathInfoServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Get", + Handler: _PathInfoService_Get_Handler, + }, + { + MethodName: "Put", + Handler: _PathInfoService_Put_Handler, + }, + { + MethodName: "CalculateNAR", + Handler: _PathInfoService_CalculateNAR_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "List", + Handler: _PathInfoService_List_Handler, + ServerStreams: true, + }, + }, + Metadata: "tvix/store/protos/rpc_pathinfo.proto", +} diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs new file mode 100644 index 000000000000..11f19857dd6f --- /dev/null +++ b/tvix/store/src/bin/tvix-store.rs @@ -0,0 +1,425 @@ +use clap::Subcommand; +use data_encoding::BASE64; +use futures::future::try_join_all; +use nix_compat::store_path; +use nix_compat::store_path::StorePath; +use std::io; +use std::path::Path; +use std::path::PathBuf; +use tokio::task::JoinHandle; +use 
tracing_subscriber::prelude::*; +use tvix_castore::blobservice; +use tvix_castore::directoryservice; +use tvix_castore::import; +use tvix_castore::proto::blob_service_server::BlobServiceServer; +use tvix_castore::proto::directory_service_server::DirectoryServiceServer; +use tvix_castore::proto::node::Node; +use tvix_castore::proto::GRPCBlobServiceWrapper; +use tvix_castore::proto::GRPCDirectoryServiceWrapper; +use tvix_castore::proto::NamedNode; +use tvix_store::listener::ListenerStream; +use tvix_store::pathinfoservice; +use tvix_store::proto::path_info_service_server::PathInfoServiceServer; +use tvix_store::proto::GRPCPathInfoServiceWrapper; +use tvix_store::proto::NarInfo; +use tvix_store::proto::PathInfo; + +#[cfg(feature = "fs")] +use tvix_store::fs::TvixStoreFs; + +#[cfg(feature = "fuse")] +use tvix_store::fs::fuse::FuseDaemon; + +#[cfg(feature = "virtiofs")] +use tvix_store::fs::virtiofs::start_virtiofs_daemon; + +#[cfg(feature = "tonic-reflection")] +use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET; +#[cfg(feature = "tonic-reflection")] +use tvix_store::proto::FILE_DESCRIPTOR_SET; + +use clap::Parser; +use tonic::{transport::Server, Result}; +use tracing::{info, Level}; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + /// Whether to log in JSON + #[arg(long)] + json: bool, + + #[arg(long)] + log_level: Option<Level>, + + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Runs the tvix-store daemon. 
+ Daemon { + #[arg(long, short = 'l')] + listen_address: Option<String>, + + #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")] + blob_service_addr: String, + + #[arg( + long, + env, + default_value = "sled:///var/lib/tvix-store/directories.sled" + )] + directory_service_addr: String, + + #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")] + path_info_service_addr: String, + }, + /// Imports a list of paths into the store, print the store path for each of them. + Import { + #[clap(value_name = "PATH")] + paths: Vec<PathBuf>, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + }, + /// Mounts a tvix-store at the given mountpoint + #[cfg(feature = "fuse")] + Mount { + #[clap(value_name = "PATH")] + dest: PathBuf, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// Number of FUSE threads to spawn. + #[arg(long, env, default_value_t = default_threads())] + threads: usize, + + /// Whether to list elements at the root of the mount point. + /// This is useful if your PathInfoService doesn't provide an + /// (exhaustive) listing. + #[clap(long, short, action)] + list_root: bool, + }, + /// Starts a tvix-store virtiofs daemon at the given socket path. 
+ #[cfg(feature = "virtiofs")] + #[command(name = "virtiofs")] + VirtioFs { + #[clap(value_name = "PATH")] + socket: PathBuf, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// Whether to list elements at the root of the mount point. + /// This is useful if your PathInfoService doesn't provide an + /// (exhaustive) listing. + #[clap(long, short, action)] + list_root: bool, + }, +} + +#[cfg(all(feature = "fuse", not(target_os = "macos")))] +fn default_threads() -> usize { + std::thread::available_parallelism() + .map(|threads| threads.into()) + .unwrap_or(4) +} +// On MacFUSE only a single channel will receive ENODEV when the file system is +// unmounted and so all the other channels will block forever. +// See https://github.com/osxfuse/osxfuse/issues/974 +#[cfg(all(feature = "fuse", target_os = "macos"))] +fn default_threads() -> usize { + 1 +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + let cli = Cli::parse(); + + // configure log settings + let level = cli.log_level.unwrap_or(Level::INFO); + + let subscriber = tracing_subscriber::registry() + .with(if cli.json { + Some( + tracing_subscriber::fmt::Layer::new() + .with_writer(io::stderr.with_max_level(level)) + .json(), + ) + } else { + None + }) + .with(if !cli.json { + Some( + tracing_subscriber::fmt::Layer::new() + .with_writer(io::stderr.with_max_level(level)) + .pretty(), + ) + } else { + None + }); + + tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber"); + + match cli.command { + Commands::Daemon { + listen_address, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + } => { + // initialize stores + let blob_service = blobservice::from_addr(&blob_service_addr)?; + let 
directory_service = directoryservice::from_addr(&directory_service_addr)?; + let path_info_service = pathinfoservice::from_addr( + &path_info_service_addr, + blob_service.clone(), + directory_service.clone(), + )?; + + let listen_address = listen_address + .unwrap_or_else(|| "[::]:8000".to_string()) + .parse() + .unwrap(); + + let mut server = Server::builder(); + + #[allow(unused_mut)] + let mut router = server + .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( + blob_service, + ))) + .add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from(directory_service), + )) + .add_service(PathInfoServiceServer::new( + GRPCPathInfoServiceWrapper::from(path_info_service), + )); + + #[cfg(feature = "tonic-reflection")] + { + let reflection_svc = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build()?; + router = router.add_service(reflection_svc); + } + + info!("tvix-store listening on {}", listen_address); + + let listener = ListenerStream::bind(&listen_address).await?; + + router.serve_with_incoming(listener).await?; + } + Commands::Import { + paths, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + } => { + // FUTUREWORK: allow flat for single files? + let blob_service = blobservice::from_addr(&blob_service_addr)?; + let directory_service = directoryservice::from_addr(&directory_service_addr)?; + let path_info_service = pathinfoservice::from_addr( + &path_info_service_addr, + blob_service.clone(), + directory_service.clone(), + )?; + + let tasks = paths + .into_iter() + .map(|path| { + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + let path_info_service = path_info_service.clone(); + + let task: JoinHandle<io::Result<()>> = tokio::task::spawn(async move { + // Ingest the path into blob and directory service. 
+ let root_node = import::ingest_path( + blob_service.clone(), + directory_service.clone(), + &path, + ) + .await + .expect("failed to ingest path"); + + // Ask the PathInfoService for the NAR size and sha256 + let root_node_copy = root_node.clone(); + let path_info_service_clone = path_info_service.clone(); + let (nar_size, nar_sha256) = path_info_service_clone + .calculate_nar(&root_node_copy) + .await?; + + let name = path + .file_name() + .expect("path must not be ..") + .to_str() + .expect("path must be valid unicode"); + + let output_path = store_path::build_nar_based_store_path(&nar_sha256, name); + + // assemble a new root_node with a name that is derived from the nar hash. + let root_node = + root_node.rename(output_path.to_string().into_bytes().into()); + + // assemble the [crate::proto::PathInfo] object. + let path_info = PathInfo { + node: Some(tvix_castore::proto::Node { + node: Some(root_node), + }), + // There's no reference scanning on path contents ingested like this. + references: vec![], + narinfo: Some(NarInfo { + nar_size, + nar_sha256: nar_sha256.to_vec().into(), + signatures: vec![], + reference_names: vec![], + }), + }; + + // put into [PathInfoService], and return the PathInfo that we get back + // from there (it might contain additional signatures). 
+ let path_info = path_info_service.put(path_info).await?; + + let node = path_info.node.unwrap().node.unwrap(); + + log_node(&node, &path); + + println!( + "{}", + StorePath::from_bytes(node.get_name()) + .unwrap() + .to_absolute_path() + ); + + Ok(()) + }); + task + }) + .collect::<Vec<_>>(); + + try_join_all(tasks).await?; + } + #[cfg(feature = "fuse")] + Commands::Mount { + dest, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + list_root, + threads, + } => { + let blob_service = blobservice::from_addr(&blob_service_addr)?; + let directory_service = directoryservice::from_addr(&directory_service_addr)?; + let path_info_service = pathinfoservice::from_addr( + &path_info_service_addr, + blob_service.clone(), + directory_service.clone(), + )?; + + let mut fuse_daemon = tokio::task::spawn_blocking(move || { + let f = TvixStoreFs::new( + blob_service, + directory_service, + path_info_service, + list_root, + ); + info!("mounting tvix-store on {:?}", &dest); + + FuseDaemon::new(f, &dest, threads) + }) + .await??; + + // grab a handle to unmount the file system, and register a signal + // handler. 
+ tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + info!("interrupt received, unmounting…"); + tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??; + info!("unmount occured, terminating…"); + Ok::<_, io::Error>(()) + }) + .await??; + } + #[cfg(feature = "virtiofs")] + Commands::VirtioFs { + socket, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + list_root, + } => { + let blob_service = blobservice::from_addr(&blob_service_addr)?; + let directory_service = directoryservice::from_addr(&directory_service_addr)?; + let path_info_service = pathinfoservice::from_addr( + &path_info_service_addr, + blob_service.clone(), + directory_service.clone(), + )?; + + tokio::task::spawn_blocking(move || { + let fs = TvixStoreFs::new( + blob_service, + directory_service, + path_info_service, + list_root, + ); + info!("starting tvix-store virtiofs daemon on {:?}", &socket); + + start_virtiofs_daemon(fs, socket) + }) + .await??; + } + }; + Ok(()) +} + +fn log_node(node: &Node, path: &Path) { + match node { + Node::Directory(directory_node) => { + info!( + path = ?path, + name = ?directory_node.name, + digest = BASE64.encode(&directory_node.digest), + "import successful", + ) + } + Node::File(file_node) => { + info!( + path = ?path, + name = ?file_node.name, + digest = BASE64.encode(&file_node.digest), + "import successful" + ) + } + Node::Symlink(symlink_node) => { + info!( + path = ?path, + name = ?symlink_node.name, + target = ?symlink_node.target, + "import successful" + ) + } + } +} diff --git a/tvix/store/src/fs/file_attr.rs b/tvix/store/src/fs/file_attr.rs new file mode 100644 index 000000000000..562cd9f19002 --- /dev/null +++ b/tvix/store/src/fs/file_attr.rs @@ -0,0 +1,52 @@ +use super::inodes::{DirectoryInodeData, InodeData}; +use fuse_backend_rs::abi::fuse_abi::Attr; + +/// The [Attr] describing the root +pub const ROOT_FILE_ATTR: Attr = Attr { + ino: fuse_backend_rs::api::filesystem::ROOT_ID, + size: 0, + blksize: 
1024, + blocks: 0, + mode: libc::S_IFDIR as u32 | 0o555, + atime: 0, + mtime: 0, + ctime: 0, + atimensec: 0, + mtimensec: 0, + ctimensec: 0, + nlink: 0, + uid: 0, + gid: 0, + rdev: 0, + flags: 0, + #[cfg(target_os = "macos")] + crtime: 0, + #[cfg(target_os = "macos")] + crtimensec: 0, + #[cfg(target_os = "macos")] + padding: 0, +}; + +/// For the given [InodeData] and inode number, construct an [Attr] +pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> Attr { + Attr { + ino: inode, + // FUTUREWORK: play with this number, as it affects read sizes for client applications. + blocks: 1024, + size: match inode_data { + InodeData::Regular(_, size, _) => *size as u64, + InodeData::Symlink(target) => target.len() as u64, + InodeData::Directory(DirectoryInodeData::Sparse(_, size)) => *size as u64, + InodeData::Directory(DirectoryInodeData::Populated(_, ref children)) => { + children.len() as u64 + } + }, + mode: match inode_data { + InodeData::Regular(_, _, false) => libc::S_IFREG as u32 | 0o444, // non-executable files + InodeData::Regular(_, _, true) => libc::S_IFREG as u32 | 0o555, // executable files + InodeData::Symlink(_) => libc::S_IFLNK as u32 | 0o444, + InodeData::Directory(_) => libc::S_IFDIR as u32 | 0o555, + }, + ..Default::default() + } +} diff --git a/tvix/store/src/fs/fuse.rs b/tvix/store/src/fs/fuse.rs new file mode 100644 index 000000000000..d2a734882196 --- /dev/null +++ b/tvix/store/src/fs/fuse.rs @@ -0,0 +1,119 @@ +use std::{io, path::Path, sync::Arc, thread}; + +use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; +use tracing::error; + +struct FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>, + channel: fuse_backend_rs::transport::FuseChannel, +} + +#[cfg(target_os = "macos")] +const BADFD: libc::c_int = libc::EBADF; +#[cfg(target_os = "linux")] +const BADFD: libc::c_int = libc::EBADFD; + +impl<FS> FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + fn start(&mut 
self) -> io::Result<()> { + loop { + if let Some((reader, writer)) = self + .channel + .get_request() + .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))? + { + if let Err(e) = self + .server + .handle_message(reader, writer.into(), None, None) + { + match e { + // This indicates the session has been shut down. + fuse_backend_rs::Error::EncodeMessage(e) + if e.raw_os_error() == Some(BADFD) => + { + break; + } + error => { + error!(?error, "failed to handle fuse request"); + continue; + } + } + } + } else { + break; + } + } + Ok(()) + } +} + +pub struct FuseDaemon { + session: FuseSession, + threads: Vec<thread::JoinHandle<()>>, +} + +impl FuseDaemon { + pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error> + where + FS: FileSystem + Sync + Send + 'static, + P: AsRef<Path>, + { + let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); + + let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + #[cfg(target_os = "linux")] + session.set_allow_other(false); + session + .mount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + let mut join_handles = Vec::with_capacity(threads); + for _ in 0..threads { + let mut server = FuseServer { + server: server.clone(), + channel: session + .new_channel() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + }; + let join_handle = thread::Builder::new() + .name("fuse_server".to_string()) + .spawn(move || { + let _ = server.start(); + })?; + join_handles.push(join_handle); + } + + Ok(FuseDaemon { + session, + threads: join_handles, + }) + } + + pub fn unmount(&mut self) -> Result<(), io::Error> { + self.session + .umount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + for thread in self.threads.drain(..) 
{ + thread.join().map_err(|_| { + io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") + })?; + } + + Ok(()) + } +} + +impl Drop for FuseDaemon { + fn drop(&mut self) { + if let Err(error) = self.unmount() { + error!(?error, "failed to unmont fuse filesystem") + } + } +} diff --git a/tvix/store/src/fs/inode_tracker.rs b/tvix/store/src/fs/inode_tracker.rs new file mode 100644 index 000000000000..daf6b4ee79c2 --- /dev/null +++ b/tvix/store/src/fs/inode_tracker.rs @@ -0,0 +1,457 @@ +use std::{collections::HashMap, sync::Arc}; + +use super::inodes::{DirectoryInodeData, InodeData}; +use tvix_castore::proto as castorepb; +use tvix_castore::B3Digest; + +/// InodeTracker keeps track of inodes, stores data being these inodes and deals +/// with inode allocation. +pub struct InodeTracker { + data: HashMap<u64, Arc<InodeData>>, + + // lookup table for blobs by their B3Digest + blob_digest_to_inode: HashMap<B3Digest, u64>, + + // lookup table for symlinks by their target + symlink_target_to_inode: HashMap<bytes::Bytes, u64>, + + // lookup table for directories by their B3Digest. + // Note the corresponding directory may not be present in data yet. + directory_digest_to_inode: HashMap<B3Digest, u64>, + + // the next inode to allocate + next_inode: u64, +} + +impl Default for InodeTracker { + fn default() -> Self { + Self { + data: Default::default(), + + blob_digest_to_inode: Default::default(), + symlink_target_to_inode: Default::default(), + directory_digest_to_inode: Default::default(), + + next_inode: 2, + } + } +} + +impl InodeTracker { + // Retrieves data for a given inode, if it exists. + pub fn get(&self, ino: u64) -> Option<Arc<InodeData>> { + self.data.get(&ino).cloned() + } + + // Stores data and returns the inode for it. + // In case an inode has already been allocated for the same data, that inode + // is returned, otherwise a new one is allocated. 
+ // In case data is a [InodeData::Directory], inodes for all items are looked + // up + pub fn put(&mut self, data: InodeData) -> u64 { + match data { + InodeData::Regular(ref digest, _, _) => { + match self.blob_digest_to_inode.get(digest) { + Some(found_ino) => { + // We already have it, return the inode. + *found_ino + } + None => self.insert_and_increment(data), + } + } + InodeData::Symlink(ref target) => { + match self.symlink_target_to_inode.get(target) { + Some(found_ino) => { + // We already have it, return the inode. + *found_ino + } + None => self.insert_and_increment(data), + } + } + InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => { + // check the lookup table if the B3Digest is known. + match self.directory_digest_to_inode.get(digest) { + Some(found_ino) => { + // We already have it, return the inode. + *found_ino + } + None => { + // insert and return the inode + self.insert_and_increment(data) + } + } + } + // Inserting [DirectoryInodeData::Populated] usually replaces an + // existing [DirectoryInodeData::Sparse] one. + InodeData::Directory(DirectoryInodeData::Populated(ref digest, ref children)) => { + let dir_ino = self.directory_digest_to_inode.get(digest); + if let Some(dir_ino) = dir_ino { + let dir_ino = *dir_ino; + + // We know the data must exist, as we found it in [directory_digest_to_inode]. + let needs_update = match **self.data.get(&dir_ino).unwrap() { + InodeData::Regular(..) | InodeData::Symlink(_) => { + panic!("unexpected type at inode {}", dir_ino); + } + // already populated, nothing to do + InodeData::Directory(DirectoryInodeData::Populated(..)) => false, + // in case the actual data is sparse, replace it with the populated one. + // this allocates inodes for new children in the process. 
+ InodeData::Directory(DirectoryInodeData::Sparse( + ref old_digest, + ref _old_size, + )) => { + // sanity checking to ensure we update the right node + debug_assert_eq!(old_digest, digest); + + true + } + }; + + if needs_update { + // populate inode fields in children + let children = self.allocate_inodes_for_children(children.to_vec()); + + // update sparse data with populated data + self.data.insert( + dir_ino, + Arc::new(InodeData::Directory(DirectoryInodeData::Populated( + digest.clone(), + children, + ))), + ); + } + + dir_ino + } else { + // populate inode fields in children + let children = self.allocate_inodes_for_children(children.to_vec()); + // insert and return InodeData + self.insert_and_increment(InodeData::Directory(DirectoryInodeData::Populated( + digest.clone(), + children, + ))) + } + } + } + } + + // Consume a list of children with zeroed inodes, and allocate (or fetch existing) inodes. + fn allocate_inodes_for_children( + &mut self, + children: Vec<(u64, castorepb::node::Node)>, + ) -> Vec<(u64, castorepb::node::Node)> { + // allocate new inodes for all children + let mut children_new: Vec<(u64, castorepb::node::Node)> = Vec::new(); + + for (child_ino, ref child_node) in children { + debug_assert_eq!(0, child_ino, "expected child inode to be 0"); + let child_ino = match child_node { + castorepb::node::Node::Directory(directory_node) => { + // Try putting the sparse data in. If we already have a + // populated version, it'll not update it. + self.put(directory_node.into()) + } + castorepb::node::Node::File(file_node) => self.put(file_node.into()), + castorepb::node::Node::Symlink(symlink_node) => self.put(symlink_node.into()), + }; + + children_new.push((child_ino, child_node.clone())) + } + children_new + } + + // Inserts the data and returns the inode it was stored at, while + // incrementing next_inode. 
+ fn insert_and_increment(&mut self, data: InodeData) -> u64 { + let ino = self.next_inode; + // insert into lookup tables + match data { + InodeData::Regular(ref digest, _, _) => { + self.blob_digest_to_inode.insert(digest.clone(), ino); + } + InodeData::Symlink(ref target) => { + self.symlink_target_to_inode.insert(target.clone(), ino); + } + InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => { + self.directory_digest_to_inode.insert(digest.clone(), ino); + } + // This is currently not used outside test fixtures. + // Usually a [DirectoryInodeData::Sparse] is inserted and later + // "upgraded" with more data. + // However, as a future optimization, a lookup for a PathInfo could trigger a + // [DirectoryService::get_recursive()] request that "forks into + // background" and prepopulates all Directories in a closure. + InodeData::Directory(DirectoryInodeData::Populated(ref digest, _)) => { + self.directory_digest_to_inode.insert(digest.clone(), ino); + } + } + // Insert data + self.data.insert(ino, Arc::new(data)); + + // increment inode counter and return old inode. + self.next_inode += 1; + ino + } +} + +#[cfg(test)] +mod tests { + use crate::fs::inodes::DirectoryInodeData; + use crate::tests::fixtures; + use tvix_castore::proto as castorepb; + + use super::InodeData; + use super::InodeTracker; + + /// Getting something non-existent should be none + #[test] + fn get_nonexistent() { + let inode_tracker = InodeTracker::default(); + assert!(inode_tracker.get(1).is_none()); + } + + /// Put of a regular file should allocate a uid, which should be the same when inserting again. 
+ #[test] + fn put_regular() { + let mut inode_tracker = InodeTracker::default(); + let f = InodeData::Regular( + fixtures::BLOB_A_DIGEST.clone(), + fixtures::BLOB_A.len() as u32, + false, + ); + + // put it in + let ino = inode_tracker.put(f.clone()); + + // a get should return the right data + let data = inode_tracker.get(ino).expect("must be some"); + match *data { + InodeData::Regular(ref digest, _, _) => { + assert_eq!(&fixtures::BLOB_A_DIGEST.clone(), digest); + } + InodeData::Symlink(_) | InodeData::Directory(..) => panic!("wrong type"), + } + + // another put should return the same ino + assert_eq!(ino, inode_tracker.put(f)); + + // inserting another file should return a different ino + assert_ne!( + ino, + inode_tracker.put(InodeData::Regular( + fixtures::BLOB_B_DIGEST.clone(), + fixtures::BLOB_B.len() as u32, + false, + )) + ); + } + + // Put of a symlink should allocate a uid, which should be the same when inserting again + #[test] + fn put_symlink() { + let mut inode_tracker = InodeTracker::default(); + let f = InodeData::Symlink("target".into()); + + // put it in + let ino = inode_tracker.put(f.clone()); + + // a get should return the right data + let data = inode_tracker.get(ino).expect("must be some"); + match *data { + InodeData::Symlink(ref target) => { + assert_eq!(b"target".to_vec(), *target); + } + InodeData::Regular(..) | InodeData::Directory(..) => panic!("wrong type"), + } + + // another put should return the same ino + assert_eq!(ino, inode_tracker.put(f)); + + // inserting another file should return a different ino + assert_ne!(ino, inode_tracker.put(InodeData::Symlink("target2".into()))); + } + + // TODO: put sparse directory + + /// Put a directory into the inode tracker, which refers to a file not seen yet. + #[test] + fn put_directory_leaf() { + let mut inode_tracker = InodeTracker::default(); + + // this is a directory with a single item, a ".keep" file pointing to a 0 bytes blob. 
+ let dir: InodeData = fixtures::DIRECTORY_WITH_KEEP.clone().into(); + + // put it in + let dir_ino = inode_tracker.put(dir); + + // a get should return the right data + let data = inode_tracker.get(dir_ino).expect("must be some"); + match *data { + InodeData::Directory(super::DirectoryInodeData::Sparse(..)) => { + panic!("wrong type"); + } + InodeData::Directory(super::DirectoryInodeData::Populated( + ref directory_digest, + ref children, + )) => { + // ensure the directory digest matches + assert_eq!(&fixtures::DIRECTORY_WITH_KEEP.digest(), directory_digest); + + // ensure the child is populated, with a different inode than + // the parent, and the data matches expectations. + assert_eq!(1, children.len()); + let (child_ino, child_node) = children.first().unwrap(); + assert_ne!(dir_ino, *child_ino); + assert_eq!( + &castorepb::node::Node::File( + fixtures::DIRECTORY_WITH_KEEP.files.first().unwrap().clone() + ), + child_node + ); + + // ensure looking up that inode directly returns the data + let child_data = inode_tracker.get(*child_ino).expect("must exist"); + match *child_data { + InodeData::Regular(ref digest, size, executable) => { + assert_eq!(&fixtures::EMPTY_BLOB_DIGEST.clone(), digest); + assert_eq!(0, size); + assert!(!executable); + } + InodeData::Symlink(_) | InodeData::Directory(..) => panic!("wrong type"), + } + } + InodeData::Symlink(_) | InodeData::Regular(..) => panic!("wrong type"), + } + } + + /// Put a directory into the inode tracker, referring to files, directories + /// and symlinks not seen yet. + #[test] + fn put_directory_complicated() { + let mut inode_tracker = InodeTracker::default(); + + // this is a directory with a single item, a ".keep" file pointing to a 0 bytes blob. 
+ let dir_complicated: InodeData = fixtures::DIRECTORY_COMPLICATED.clone().into(); + + // put it in + let dir_complicated_ino = inode_tracker.put(dir_complicated); + + // a get should return the right data + let dir_data = inode_tracker + .get(dir_complicated_ino) + .expect("must be some"); + + let child_dir_ino = match *dir_data { + InodeData::Directory(DirectoryInodeData::Sparse(..)) => { + panic!("wrong type"); + } + InodeData::Directory(DirectoryInodeData::Populated( + ref directory_digest, + ref children, + )) => { + // assert the directory digest matches + assert_eq!(&fixtures::DIRECTORY_COMPLICATED.digest(), directory_digest); + + // ensure there's three children, all with different inodes + assert_eq!(3, children.len()); + let mut seen_inodes = Vec::from([dir_complicated_ino]); + + // check the first child (.keep) + { + let (child_ino, child_node) = &children[0]; + assert!(!seen_inodes.contains(child_ino)); + assert_eq!( + &castorepb::node::Node::File( + fixtures::DIRECTORY_COMPLICATED.files[0].clone() + ), + child_node + ); + seen_inodes.push(*child_ino); + } + + // check the second child (aa) + { + let (child_ino, child_node) = &children[1]; + assert!(!seen_inodes.contains(child_ino)); + assert_eq!( + &castorepb::node::Node::Symlink( + fixtures::DIRECTORY_COMPLICATED.symlinks[0].clone() + ), + child_node + ); + seen_inodes.push(*child_ino); + } + + // check the third child (keep) + { + let (child_ino, child_node) = &children[2]; + assert!(!seen_inodes.contains(child_ino)); + assert_eq!( + &castorepb::node::Node::Directory( + fixtures::DIRECTORY_COMPLICATED.directories[0].clone() + ), + child_node + ); + seen_inodes.push(*child_ino); + + // return the child_ino + *child_ino + } + } + InodeData::Regular(..) | InodeData::Symlink(_) => panic!("wrong type"), + }; + + // get of the inode for child_ino + let child_dir_data = inode_tracker.get(child_dir_ino).expect("must be some"); + // it should be a sparse InodeData::Directory with the right digest. 
+ match *child_dir_data { + InodeData::Directory(DirectoryInodeData::Sparse( + ref child_dir_digest, + child_dir_size, + )) => { + assert_eq!(&fixtures::DIRECTORY_WITH_KEEP.digest(), child_dir_digest); + assert_eq!(fixtures::DIRECTORY_WITH_KEEP.size(), child_dir_size); + } + InodeData::Directory(DirectoryInodeData::Populated(..)) + | InodeData::Regular(..) + | InodeData::Symlink(_) => { + panic!("wrong type") + } + } + + // put DIRECTORY_WITH_KEEP, which should return the same ino as [child_dir_ino], + // but update the sparse object to a populated one at the same time. + let child_dir_ino2 = inode_tracker.put(fixtures::DIRECTORY_WITH_KEEP.clone().into()); + assert_eq!(child_dir_ino, child_dir_ino2); + + // get the data + match *inode_tracker.get(child_dir_ino).expect("must be some") { + // it should be a populated InodeData::Directory with the right digest! + InodeData::Directory(DirectoryInodeData::Populated( + ref directory_digest, + ref children, + )) => { + // ensure the directory digest matches + assert_eq!(&fixtures::DIRECTORY_WITH_KEEP.digest(), directory_digest); + + // ensure the child is populated, with a different inode than + // the parent, and the data matches expectations. + assert_eq!(1, children.len()); + let (child_node_inode, child_node) = children.first().unwrap(); + assert_ne!(dir_complicated_ino, *child_node_inode); + assert_eq!( + &castorepb::node::Node::File( + fixtures::DIRECTORY_WITH_KEEP.files.first().unwrap().clone() + ), + child_node + ); + } + InodeData::Directory(DirectoryInodeData::Sparse(..)) + | InodeData::Regular(..) + | InodeData::Symlink(_) => panic!("wrong type"), + } + } +} + +// TODO: add test inserting a populated one first, then ensure an update doesn't degrade it back to sparse. diff --git a/tvix/store/src/fs/inodes.rs b/tvix/store/src/fs/inodes.rs new file mode 100644 index 000000000000..928f51059002 --- /dev/null +++ b/tvix/store/src/fs/inodes.rs @@ -0,0 +1,70 @@ +//! 
This module contains all the data structures used to track information +//! about inodes, which present tvix-store nodes in a filesystem. +use tvix_castore::proto as castorepb; +use tvix_castore::B3Digest; + +#[derive(Clone, Debug)] +pub enum InodeData { + Regular(B3Digest, u32, bool), // digest, size, executable + Symlink(bytes::Bytes), // target + Directory(DirectoryInodeData), // either [DirectoryInodeData:Sparse] or [DirectoryInodeData:Populated] +} + +/// This encodes the two different states of [InodeData::Directory]. +/// Either the data still is sparse (we only saw a [castorepb::DirectoryNode], +/// but didn't fetch the [castorepb::Directory] struct yet, or we processed a +/// lookup and did fetch the data. +#[derive(Clone, Debug)] +pub enum DirectoryInodeData { + Sparse(B3Digest, u32), // digest, size + Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)] +} + +impl From<&castorepb::node::Node> for InodeData { + fn from(value: &castorepb::node::Node) -> Self { + match value { + castorepb::node::Node::Directory(directory_node) => directory_node.into(), + castorepb::node::Node::File(file_node) => file_node.into(), + castorepb::node::Node::Symlink(symlink_node) => symlink_node.into(), + } + } +} + +impl From<&castorepb::SymlinkNode> for InodeData { + fn from(value: &castorepb::SymlinkNode) -> Self { + InodeData::Symlink(value.target.clone()) + } +} + +impl From<&castorepb::FileNode> for InodeData { + fn from(value: &castorepb::FileNode) -> Self { + InodeData::Regular( + value.digest.clone().try_into().unwrap(), + value.size, + value.executable, + ) + } +} + +/// Converts a DirectoryNode to a sparsely populated InodeData::Directory. 
+impl From<&castorepb::DirectoryNode> for InodeData { + fn from(value: &castorepb::DirectoryNode) -> Self { + InodeData::Directory(DirectoryInodeData::Sparse( + value.digest.clone().try_into().unwrap(), + value.size, + )) + } +} + +/// converts a proto::Directory to a InodeData::Directory(DirectoryInodeData::Populated(..)). +/// The inodes for each child are 0, because it's up to the InodeTracker to allocate them. +impl From<castorepb::Directory> for InodeData { + fn from(value: castorepb::Directory) -> Self { + let digest = value.digest(); + + let children: Vec<(u64, castorepb::node::Node)> = + value.nodes().map(|node| (0, node)).collect(); + + InodeData::Directory(DirectoryInodeData::Populated(digest, children)) + } +} diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs new file mode 100644 index 000000000000..1333983460ea --- /dev/null +++ b/tvix/store/src/fs/mod.rs @@ -0,0 +1,690 @@ +mod file_attr; +mod inode_tracker; +mod inodes; + +#[cfg(feature = "fuse")] +pub mod fuse; + +#[cfg(feature = "virtiofs")] +pub mod virtiofs; + +#[cfg(test)] +mod tests; + +use crate::pathinfoservice::PathInfoService; + +use fuse_backend_rs::abi::fuse_abi::stat64; +use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}; +use futures::StreamExt; +use nix_compat::store_path::StorePath; +use parking_lot::RwLock; +use std::{ + collections::HashMap, + io, + str::FromStr, + sync::atomic::AtomicU64, + sync::{atomic::Ordering, Arc}, + time::Duration, +}; +use tokio::{ + io::{AsyncBufReadExt, AsyncSeekExt}, + sync::mpsc, +}; +use tracing::{debug, info_span, warn}; +use tvix_castore::{ + blobservice::{BlobReader, BlobService}, + directoryservice::DirectoryService, + proto::{node::Node, NamedNode}, + B3Digest, Error, +}; + +use self::{ + file_attr::{gen_file_attr, ROOT_FILE_ATTR}, + inode_tracker::InodeTracker, + inodes::{DirectoryInodeData, InodeData}, +}; + +/// This implements a read-only FUSE filesystem for a tvix-store +/// with the passed 
[BlobService], [DirectoryService] and [PathInfoService]. +/// +/// We don't allow listing on the root mountpoint (inode 0). +/// In the future, this might be made configurable once a listing method is +/// added to [self.path_info_service], and then show all store paths in that +/// store. +/// +/// Linux uses inodes in filesystems. When implementing FUSE, most calls are +/// *for* a given inode. +/// +/// This means, we need to have a stable mapping of inode numbers to the +/// corresponding store nodes. +/// +/// We internally delegate all inode allocation and state keeping to the +/// inode tracker, and store the currently "explored" store paths together with +/// root inode of the root. +/// +/// There's some places where inodes are allocated / data inserted into +/// the inode tracker, if not allocated before already: +/// - Processing a `lookup` request, either in the mount root, or somewhere +/// deeper +/// - Processing a `readdir` request +/// +/// Things pointing to the same contents get the same inodes, irrespective of +/// their own location. +/// This means: +/// - Symlinks with the same target will get the same inode. +/// - Regular/executable files with the same contents will get the same inode +/// - Directories with the same contents will get the same inode. +/// +/// Due to the above being valid across the whole store, and considering the +/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed +/// allocation", aka reserve Directory.size inodes for each PathInfo. +pub struct TvixStoreFs { + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + path_info_service: Arc<dyn PathInfoService>, + + /// Whether to (try) listing elements in the root. + list_root: bool, + + /// This maps a given StorePath to the inode we allocated for the root inode. + store_paths: RwLock<HashMap<StorePath, u64>>, + + /// This keeps track of inodes and data alongside them. 
+ inode_tracker: RwLock<InodeTracker>, + + /// This holds all open file handles + file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>, + + next_file_handle: AtomicU64, + + tokio_handle: tokio::runtime::Handle, +} + +impl TvixStoreFs { + pub fn new( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + path_info_service: Arc<dyn PathInfoService>, + list_root: bool, + ) -> Self { + Self { + blob_service, + directory_service, + path_info_service, + + list_root, + + store_paths: RwLock::new(HashMap::default()), + inode_tracker: RwLock::new(Default::default()), + + file_handles: RwLock::new(Default::default()), + next_file_handle: AtomicU64::new(1), + tokio_handle: tokio::runtime::Handle::current(), + } + } + + /// This will turn a lookup request for [std::ffi::OsStr] in the root to + /// a ino and [InodeData]. + /// It will peek in [self.store_paths], and then either look it up from + /// [self.inode_tracker], + /// or otherwise fetch from [self.path_info_service], and then insert into + /// [self.inode_tracker]. + fn name_in_root_to_ino_and_data( + &self, + name: &std::ffi::CStr, + ) -> Result<Option<(u64, Arc<InodeData>)>, Error> { + // parse the name into a [StorePath]. + let store_path = if let Some(name) = name.to_str().ok() { + match StorePath::from_str(name) { + Ok(store_path) => store_path, + Err(e) => { + debug!(e=?e, "unable to parse as store path"); + // This is not an error, but a "ENOENT", as someone can stat + // a file inside the root that's no valid store path + return Ok(None); + } + } + } else { + debug!("{name:?} is not a valid utf-8 string"); + // same here. + return Ok(None); + }; + + let ino = { + // This extra scope makes sure we drop the read lock + // immediately after reading, to prevent deadlocks. 
+ let store_paths = self.store_paths.read(); + store_paths.get(&store_path).cloned() + }; + + if let Some(ino) = ino { + // If we already have that store path, lookup the inode from + // self.store_paths and then get the data from [self.inode_tracker], + // which in the case of a [InodeData::Directory] will be fully + // populated. + Ok(Some(( + ino, + self.inode_tracker.read().get(ino).expect("must exist"), + ))) + } else { + // If we don't have it, look it up in PathInfoService. + let path_info_service = self.path_info_service.clone(); + let task = self + .tokio_handle + .spawn(async move { path_info_service.get(store_path.digest).await }); + match self.tokio_handle.block_on(task).unwrap()? { + // the pathinfo doesn't exist, so the file doesn't exist. + None => Ok(None), + Some(path_info) => { + // The pathinfo does exist, so there must be a root node + let root_node = path_info.node.unwrap().node.unwrap(); + + // The name must match what's passed in the lookup, otherwise we return nothing. + if root_node.get_name() != store_path.to_string().as_bytes() { + return Ok(None); + } + + // Let's check if someone else beat us to updating the inode tracker and + // store_paths map. + let mut store_paths = self.store_paths.write(); + if let Some(ino) = store_paths.get(&store_path).cloned() { + return Ok(Some(( + ino, + self.inode_tracker.read().get(ino).expect("must exist"), + ))); + } + + // insert the (sparse) inode data and register in + // self.store_paths. + // FUTUREWORK: change put to return the data after + // inserting, so we don't need to lookup a second + // time? + let (ino, inode) = { + let mut inode_tracker = self.inode_tracker.write(); + let ino = inode_tracker.put((&root_node).into()); + (ino, inode_tracker.get(ino).unwrap()) + }; + store_paths.insert(store_path, ino); + + Ok(Some((ino, inode))) + } + } + } + } + + /// This will lookup a directory by digest, and will turn it into a + /// [InodeData::Directory(DirectoryInodeData::Populated(..))]. 
+ /// This is both used to initially insert the root node of a store path, + /// as well as when looking up an intermediate DirectoryNode. + fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> { + let directory_service = self.directory_service.clone(); + let directory_digest_clone = directory_digest.clone(); + let task = self + .tokio_handle + .spawn(async move { directory_service.get(&directory_digest_clone).await }); + match self.tokio_handle.block_on(task).unwrap() { + Err(e) => { + warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory"); + Err(e) + } + // If the Directory can't be found, this is a hole, bail out. + Ok(None) => { + tracing::error!(directory.digest=%directory_digest, "directory not found in directory service"); + Err(Error::StorageError(format!( + "directory {} not found", + directory_digest + ))) + } + Ok(Some(directory)) => Ok(directory.into()), + } + } +} + +impl FileSystem for TvixStoreFs { + type Inode = u64; + type Handle = u64; + + fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> { + Ok(FsOptions::empty()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn getattr( + &self, + _ctx: &Context, + inode: Self::Inode, + _handle: Option<Self::Handle>, + ) -> io::Result<(stat64, Duration)> { + if inode == ROOT_ID { + return Ok((ROOT_FILE_ATTR.into(), Duration::MAX)); + } + + match self.inode_tracker.read().get(inode) { + None => return Err(io::Error::from_raw_os_error(libc::ENOENT)), + Some(node) => { + debug!(node = ?node, "found node"); + Ok((gen_file_attr(&node, inode).into(), Duration::MAX)) + } + } + } + + #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))] + fn lookup( + &self, + _ctx: &Context, + parent: Self::Inode, + name: &std::ffi::CStr, + ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> { + debug!("lookup"); + + // This goes from a parent inode to a node. 
+ // - If the parent is [ROOT_ID], we need to check + // [self.store_paths] (fetching from PathInfoService if needed) + // - Otherwise, lookup the parent in [self.inode_tracker] (which must be + // a [InodeData::Directory]), and find the child with that name. + if parent == ROOT_ID { + return match self.name_in_root_to_ino_and_data(name) { + Err(e) => { + warn!("{}", e); + Err(io::Error::from_raw_os_error(libc::ENOENT)) + } + Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)), + Ok(Some((ino, inode_data))) => { + debug!(inode_data=?&inode_data, ino=ino, "Some"); + Ok(fuse_backend_rs::api::filesystem::Entry { + inode: ino, + attr: gen_file_attr(&inode_data, ino).into(), + attr_timeout: Duration::MAX, + entry_timeout: Duration::MAX, + ..Default::default() + }) + } + }; + } + + // This is the "lookup for "a" inside inode 42. + // We already know that inode 42 must be a directory. + // It might not be populated yet, so if it isn't, we do (by + // fetching from [self.directory_service]), and save the result in + // [self.inode_tracker]. + // Now it for sure is populated, so we search for that name in the + // list of children and return the FileAttrs. + + // TODO: Reduce the critical section of this write lock. + let mut inode_tracker = self.inode_tracker.write(); + let parent_data = inode_tracker.get(parent).unwrap(); + let parent_data = match *parent_data { + InodeData::Regular(..) | InodeData::Symlink(_) => { + // if the parent inode was not a directory, this doesn't make sense + return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); + } + InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => { + match self.fetch_directory_inode_data(parent_digest) { + Ok(new_data) => { + // update data in [self.inode_tracker] with populated variant. + // FUTUREWORK: change put to return the data after + // inserting, so we don't need to lookup a second + // time? 
+ let ino = inode_tracker.put(new_data); + inode_tracker.get(ino).unwrap() + } + Err(_e) => { + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + } + } + InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data, + }; + + // now parent_data can only be a [InodeData::Directory(DirectoryInodeData::Populated(..))]. + let (parent_digest, children) = if let InodeData::Directory( + DirectoryInodeData::Populated(ref parent_digest, ref children), + ) = *parent_data + { + (parent_digest, children) + } else { + panic!("unexpected type") + }; + let span = info_span!("lookup", directory.digest = %parent_digest); + let _enter = span.enter(); + + // in the children, find the one with the desired name. + if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) { + // lookup the child [InodeData] in [self.inode_tracker]. + // We know the inodes for children have already been allocated. + let child_inode_data = inode_tracker.get(*child_ino).unwrap(); + + // Reply with the file attributes for the child. + // For child directories, we still have all data we need to reply. + Ok(fuse_backend_rs::api::filesystem::Entry { + inode: *child_ino, + attr: gen_file_attr(&child_inode_data, *child_ino).into(), + attr_timeout: Duration::MAX, + entry_timeout: Duration::MAX, + ..Default::default() + }) + } else { + // Child not found, return ENOENT. + Err(io::Error::from_raw_os_error(libc::ENOENT)) + } + } + + // TODO: readdirplus? 
+ + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))] + fn readdir( + &self, + _ctx: &Context, + inode: Self::Inode, + _handle: Self::Handle, + _size: u32, + offset: u64, + add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>, + ) -> io::Result<()> { + debug!("readdir"); + + if inode == ROOT_ID { + if !self.list_root { + return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo + } else { + let path_info_service = self.path_info_service.clone(); + let (tx, mut rx) = mpsc::channel(16); + + // This task will run in the background immediately and will exit + // after the stream ends or if we no longer want any more entries. + self.tokio_handle.spawn(async move { + let mut stream = path_info_service.list().skip(offset as usize).enumerate(); + while let Some(path_info) = stream.next().await { + if tx.send(path_info).await.is_err() { + // If we get a send error, it means the sync code + // doesn't want any more entries. + break; + } + } + }); + + while let Some((i, path_info)) = rx.blocking_recv() { + let path_info = match path_info { + Err(e) => { + warn!("failed to retrieve pathinfo: {}", e); + return Err(io::Error::from_raw_os_error(libc::EPERM)); + } + Ok(path_info) => path_info, + }; + + // We know the root node exists and the store_path can be parsed because clients MUST validate. + let root_node = path_info.node.unwrap().node.unwrap(); + let store_path = StorePath::from_bytes(root_node.get_name()).unwrap(); + + let ino = { + // This extra scope makes sure we drop the read lock + // immediately after reading, to prevent deadlocks. + let store_paths = self.store_paths.read(); + store_paths.get(&store_path).cloned() + }; + let ino = match ino { + Some(ino) => ino, + None => { + // insert the (sparse) inode data and register in + // self.store_paths. 
+ let ino = self.inode_tracker.write().put((&root_node).into()); + self.store_paths.write().insert(store_path.clone(), ino); + ino + } + }; + + let ty = match root_node { + Node::Directory(_) => libc::S_IFDIR, + Node::File(_) => libc::S_IFREG, + Node::Symlink(_) => libc::S_IFLNK, + }; + + let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino, + offset: offset + i as u64 + 1, + type_: ty as u32, + name: store_path.to_string().as_bytes(), + })?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { + break; + } + } + + return Ok(()); + } + } + + // lookup the inode data. + let mut inode_tracker = self.inode_tracker.write(); + let dir_inode_data = inode_tracker.get(inode).unwrap(); + let dir_inode_data = match *dir_inode_data { + InodeData::Regular(..) | InodeData::Symlink(..) => { + warn!("Not a directory"); + return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); + } + InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => { + match self.fetch_directory_inode_data(directory_digest) { + Ok(new_data) => { + // update data in [self.inode_tracker] with populated variant. + // FUTUREWORK: change put to return the data after + // inserting, so we don't need to lookup a second + // time? + let ino = inode_tracker.put(new_data.clone()); + inode_tracker.get(ino).unwrap() + } + Err(_e) => { + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + } + } + InodeData::Directory(DirectoryInodeData::Populated(..)) => dir_inode_data, + }; + + // now parent_data can only be InodeData::Directory(DirectoryInodeData::Populated(..)) + if let InodeData::Directory(DirectoryInodeData::Populated(ref _digest, ref children)) = + *dir_inode_data + { + for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() { + // the second parameter will become the "offset" parameter on the next call. 
+ let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino: *ino, + offset: offset + i as u64 + 1, + type_: match child_node { + Node::Directory(_) => libc::S_IFDIR as u32, + Node::File(_) => libc::S_IFREG as u32, + Node::Symlink(_) => libc::S_IFLNK as u32, + }, + name: child_node.get_name(), + })?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { + break; + } + } + } else { + panic!("unexpected type") + } + + Ok(()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn open( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + _fuse_flags: u32, + ) -> io::Result<( + Option<Self::Handle>, + fuse_backend_rs::api::filesystem::OpenOptions, + )> { + if inode == ROOT_ID { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); + } + + // lookup the inode + match *self.inode_tracker.read().get(inode).unwrap() { + // read is invalid on non-files. + InodeData::Directory(..) | InodeData::Symlink(_) => { + warn!("is directory"); + return Err(io::Error::from_raw_os_error(libc::EISDIR)); + } + InodeData::Regular(ref blob_digest, _blob_size, _) => { + let span = info_span!("read", blob.digest = %blob_digest); + let _enter = span.enter(); + + let blob_service = self.blob_service.clone(); + let blob_digest = blob_digest.clone(); + + let task = self + .tokio_handle + .spawn(async move { blob_service.open_read(&blob_digest).await }); + + let blob_reader = self.tokio_handle.block_on(task).unwrap(); + + match blob_reader { + Ok(None) => { + warn!("blob not found"); + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + Err(e) => { + warn!(e=?e, "error opening blob"); + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + Ok(Some(blob_reader)) => { + // get a new file handle + // TODO: this will overflow after 2**64 operations, + // which is fine for now. + // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 + // for the discussion on alternatives. 
+ let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst); + + debug!("add file handle {}", fh); + self.file_handles + .write() + .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader))); + + Ok(( + Some(fh), + fuse_backend_rs::api::filesystem::OpenOptions::empty(), + )) + } + } + } + } + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))] + fn release( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + handle: Self::Handle, + _flush: bool, + _flock_release: bool, + _lock_owner: Option<u64>, + ) -> io::Result<()> { + // remove and get ownership on the blob reader + match self.file_handles.write().remove(&handle) { + // drop it, which will close it. + Some(blob_reader) => drop(blob_reader), + None => { + // These might already be dropped if a read error occured. + debug!("file_handle {} not found", handle); + } + } + + Ok(()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))] + fn read( + &self, + _ctx: &Context, + inode: Self::Inode, + handle: Self::Handle, + w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter, + size: u32, + offset: u64, + _lock_owner: Option<u64>, + _flags: u32, + ) -> io::Result<usize> { + debug!("read"); + + // We need to take out the blob reader from self.file_handles, so we can + // interact with it in the separate task. + // On success, we pass it back out of the task, so we can put it back in self.file_handles. + let blob_reader = match self.file_handles.read().get(&handle) { + Some(blob_reader) => blob_reader.clone(), + None => { + warn!("file handle {} unknown", handle); + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + }; + + let task = self.tokio_handle.spawn(async move { + let mut blob_reader = blob_reader.lock().await; + + // seek to the offset specified, which is relative to the start of the file. 
+ let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await; + + match resp { + Ok(pos) => { + debug_assert_eq!(offset as u64, pos); + } + Err(e) => { + warn!("failed to seek to offset {}: {}", offset, e); + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + } + + // As written in the fuse docs, read should send exactly the number + // of bytes requested except on EOF or error. + + let mut buf: Vec<u8> = Vec::with_capacity(size as usize); + + while (buf.len() as u64) < size as u64 { + let int_buf = blob_reader.fill_buf().await?; + // copy things from the internal buffer into buf to fill it till up until size + + // an empty buffer signals we reached EOF. + if int_buf.is_empty() { + break; + } + + // calculate how many bytes we can read from int_buf. + // It's either all of int_buf, or the number of bytes missing in buf to reach size. + let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len()); + + // copy these bytes into our buffer + buf.extend_from_slice(&int_buf[..len_to_copy]); + // and consume them in the buffered reader. + blob_reader.consume(len_to_copy); + } + + Ok(buf) + }); + + let buf = self.tokio_handle.block_on(task).unwrap()?; + + w.write(&buf) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> { + if inode == ROOT_ID { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); + } + + // lookup the inode + match *self.inode_tracker.read().get(inode).unwrap() { + InodeData::Directory(..) | InodeData::Regular(..) 
=> { + Err(io::Error::from_raw_os_error(libc::EINVAL)) + } + InodeData::Symlink(ref target) => Ok(target.to_vec()), + } + } +} diff --git a/tvix/store/src/fs/tests.rs b/tvix/store/src/fs/tests.rs new file mode 100644 index 000000000000..2adea0ceb3a9 --- /dev/null +++ b/tvix/store/src/fs/tests.rs @@ -0,0 +1,1171 @@ +use futures::StreamExt; +use std::io::Cursor; +use std::os::unix::prelude::MetadataExt; +use std::path::Path; +use std::sync::Arc; +use tokio::{fs, io}; +use tokio_stream::wrappers::ReadDirStream; +use tvix_castore::blobservice::BlobService; +use tvix_castore::directoryservice::DirectoryService; + +use tempfile::TempDir; + +use crate::fs::{fuse::FuseDaemon, TvixStoreFs}; +use crate::pathinfoservice::PathInfoService; +use crate::proto::PathInfo; +use crate::tests::fixtures; +use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service}; +use tvix_castore::proto as castorepb; + +const BLOB_A_NAME: &str = "00000000000000000000000000000000-test"; +const BLOB_B_NAME: &str = "55555555555555555555555555555555-test"; +const HELLOWORLD_BLOB_NAME: &str = "66666666666666666666666666666666-test"; +const SYMLINK_NAME: &str = "11111111111111111111111111111111-test"; +const SYMLINK_NAME2: &str = "44444444444444444444444444444444-test"; +const DIRECTORY_WITH_KEEP_NAME: &str = "22222222222222222222222222222222-test"; +const DIRECTORY_COMPLICATED_NAME: &str = "33333333333333333333333333333333-test"; + +fn gen_svcs() -> ( + Arc<dyn BlobService>, + Arc<dyn DirectoryService>, + Arc<dyn PathInfoService>, +) { + let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); + let path_info_service = gen_pathinfo_service(blob_service.clone(), directory_service.clone()); + + (blob_service, directory_service, path_info_service) +} + +fn do_mount<P: AsRef<Path>>( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + path_info_service: Arc<dyn PathInfoService>, + mountpoint: P, + list_root: 
bool, +) -> io::Result<FuseDaemon> { + let fs = TvixStoreFs::new( + blob_service, + directory_service, + path_info_service, + list_root, + ); + FuseDaemon::new(fs, mountpoint.as_ref(), 4) +} + +async fn populate_blob_a( + blob_service: &Arc<dyn BlobService>, + _directory_service: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, +) { + // Upload BLOB_A + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) + .await + .expect("must succeed uploading"); + bw.close().await.expect("must succeed closing"); + + // Create a PathInfo for it + let path_info = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::File(castorepb::FileNode { + name: BLOB_A_NAME.into(), + digest: fixtures::BLOB_A_DIGEST.clone().into(), + size: fixtures::BLOB_A.len() as u32, + executable: false, + })), + }), + ..Default::default() + }; + path_info_service + .put(path_info) + .await + .expect("must succeed"); +} + +async fn populate_blob_b( + blob_service: &Arc<dyn BlobService>, + _directory_service: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, +) { + // Upload BLOB_B + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) + .await + .expect("must succeed uploading"); + bw.close().await.expect("must succeed closing"); + + // Create a PathInfo for it + let path_info = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::File(castorepb::FileNode { + name: BLOB_B_NAME.into(), + digest: fixtures::BLOB_B_DIGEST.clone().into(), + size: fixtures::BLOB_B.len() as u32, + executable: false, + })), + }), + ..Default::default() + }; + path_info_service + .put(path_info) + .await + .expect("must succeed"); +} + +/// adds a blob containing helloworld and marks it as executable +async fn populate_helloworld_blob( + blob_service: &Arc<dyn BlobService>, + _directory_service: 
&Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, +) { + // Upload BLOB_B + let mut bw = blob_service.open_write().await; + tokio::io::copy( + &mut Cursor::new(fixtures::HELLOWORLD_BLOB_CONTENTS.to_vec()), + &mut bw, + ) + .await + .expect("must succeed uploading"); + bw.close().await.expect("must succeed closing"); + + // Create a PathInfo for it + let path_info = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::File(castorepb::FileNode { + name: HELLOWORLD_BLOB_NAME.into(), + digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(), + size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u32, + executable: true, + })), + }), + ..Default::default() + }; + path_info_service + .put(path_info) + .await + .expect("must succeed"); +} + +async fn populate_symlink( + _blob_service: &Arc<dyn BlobService>, + _directory_service: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, +) { + // Create a PathInfo for it + let path_info = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode { + name: SYMLINK_NAME.into(), + target: BLOB_A_NAME.into(), + })), + }), + ..Default::default() + }; + path_info_service + .put(path_info) + .await + .expect("must succeed"); +} + +/// This writes a symlink pointing to /nix/store/somewhereelse, +/// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. 
+async fn populate_symlink2( + _blob_service: &Arc<dyn BlobService>, + _directory_service: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, +) { + // Create a PathInfo for it + let path_info = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode { + name: SYMLINK_NAME2.into(), + target: "/nix/store/somewhereelse".into(), + })), + }), + ..Default::default() + }; + path_info_service + .put(path_info) + .await + .expect("must succeed"); +} + +async fn populate_directory_with_keep( + blob_service: &Arc<dyn BlobService>, + directory_service: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, +) { + // upload empty blob + let mut bw = blob_service.open_write().await; + assert_eq!( + fixtures::EMPTY_BLOB_DIGEST.to_vec(), + bw.close().await.expect("must succeed closing").to_vec(), + ); + + // upload directory + directory_service + .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .await + .expect("must succeed uploading"); + + // upload pathinfo + let path_info = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Directory(castorepb::DirectoryNode { + name: DIRECTORY_WITH_KEEP_NAME.into(), + digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + size: fixtures::DIRECTORY_WITH_KEEP.size(), + })), + }), + ..Default::default() + }; + path_info_service + .put(path_info) + .await + .expect("must succeed"); +} + +/// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory +/// itself. 
+async fn populate_pathinfo_without_directory( + _: &Arc<dyn BlobService>, + _: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, +) { + // upload pathinfo + let path_info = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Directory(castorepb::DirectoryNode { + name: DIRECTORY_WITH_KEEP_NAME.into(), + digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + size: fixtures::DIRECTORY_WITH_KEEP.size(), + })), + }), + ..Default::default() + }; + path_info_service + .put(path_info) + .await + .expect("must succeed"); +} + +/// Insert , but don't provide the blob .keep is pointing to +async fn populate_blob_a_without_blob( + _: &Arc<dyn BlobService>, + _: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, +) { + // Create a PathInfo for blob A + let path_info = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::File(castorepb::FileNode { + name: BLOB_A_NAME.into(), + digest: fixtures::BLOB_A_DIGEST.clone().into(), + size: fixtures::BLOB_A.len() as u32, + executable: false, + })), + }), + ..Default::default() + }; + path_info_service + .put(path_info) + .await + .expect("must succeed"); +} + +async fn populate_directory_complicated( + blob_service: &Arc<dyn BlobService>, + directory_service: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, +) { + // upload empty blob + let mut bw = blob_service.open_write().await; + assert_eq!( + fixtures::EMPTY_BLOB_DIGEST.to_vec(), + bw.close().await.expect("must succeed closing").to_vec(), + ); + + // upload inner directory + directory_service + .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .await + .expect("must succeed uploading"); + + // uplodad parent directory + directory_service + .put(fixtures::DIRECTORY_COMPLICATED.clone()) + .await + .expect("must succeed uploading"); + + // upload pathinfo + let path_info = PathInfo { + node: Some(castorepb::Node { + node: 
Some(castorepb::node::Node::Directory(castorepb::DirectoryNode { + name: DIRECTORY_COMPLICATED_NAME.into(), + digest: fixtures::DIRECTORY_COMPLICATED.digest().into(), + size: fixtures::DIRECTORY_COMPLICATED.size(), + })), + }), + ..Default::default() + }; + path_info_service + .put(path_info) + .await + .expect("must succeed"); +} + +/// Ensure mounting itself doesn't fail +#[tokio::test] +async fn mount() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure listing the root isn't allowed +#[tokio::test] +async fn root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + { + // read_dir succeeds, but getting the first element will fail. 
+ let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed")); + + let err = it + .next() + .await + .expect("must be some") + .expect_err("must be err"); + assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure listing the root is allowed if configured explicitly +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn root_with_listing() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + true, /* allow listing */ + ) + .expect("must succeed"); + + { + // read_dir succeeds, but getting the first element will fail. 
+ let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed")); + + let e = it + .next() + .await + .expect("must be some") + .expect("must succeed"); + + let metadata = e.metadata().await.expect("must succeed"); + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we can stat a file at the root +#[tokio::test] +async fn stat_file_at_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + + // peek at the file metadata + let metadata = fs::metadata(p).await.expect("must succeed"); + + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we can read a file at the root +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_file_at_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + 
.expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + + // read the file contents + let data = fs::read(p).await.expect("must succeed"); + + // ensure size and contents match + assert_eq!(fixtures::BLOB_A.len(), data.len()); + assert_eq!(fixtures::BLOB_A.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we can read a large file at the root +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_large_file_at_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_b(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_B_NAME); + { + // peek at the file metadata + let metadata = fs::metadata(&p).await.expect("must succeed"); + + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + assert_eq!(fixtures::BLOB_B.len() as u64, metadata.len()); + } + + // read the file contents + let data = fs::read(p).await.expect("must succeed"); + + // ensure size and contents match + assert_eq!(fixtures::BLOB_B.len(), data.len()); + assert_eq!(fixtures::BLOB_B.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Read the target of a symlink +#[tokio::test] +async fn symlink_readlink() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_symlink(&blob_service, &directory_service, 
&path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(SYMLINK_NAME); + + let target = fs::read_link(&p).await.expect("must succeed"); + assert_eq!(BLOB_A_NAME, target.to_str().unwrap()); + + // peek at the file metadata, which follows symlinks. + // this must fail, as we didn't populate the target. + let e = fs::metadata(&p).await.expect_err("must fail"); + assert_eq!(std::io::ErrorKind::NotFound, e.kind()); + + // peeking at the file metadata without following symlinks will succeed. + let metadata = fs::symlink_metadata(&p).await.expect("must succeed"); + assert!(metadata.is_symlink()); + + // reading from the symlink (which follows) will fail, because the target doesn't exist. + let e = fs::read(p).await.expect_err("must fail"); + assert_eq!(std::io::ErrorKind::NotFound, e.kind()); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Read and stat a regular file through a symlink pointing to it. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_stat_through_symlink() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + populate_symlink(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p_symlink = tmpdir.path().join(SYMLINK_NAME); + let p_blob = tmpdir.path().join(SYMLINK_NAME); + + // peek at the file metadata, which follows symlinks. 
+ // this must now return the same metadata as when statting at the target directly. + let metadata_symlink = fs::metadata(&p_symlink).await.expect("must succeed"); + let metadata_blob = fs::metadata(&p_blob).await.expect("must succeed"); + assert_eq!(metadata_blob.file_type(), metadata_symlink.file_type()); + assert_eq!(metadata_blob.len(), metadata_symlink.len()); + + // reading from the symlink (which follows) will return the same data as if + // we were reading from the file directly. + assert_eq!( + fs::read(p_blob).await.expect("must succeed"), + fs::read(p_symlink).await.expect("must succeed"), + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Read a directory in the root, and validate some attributes. +#[tokio::test] +async fn read_stat_directory() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + + // peek at the metadata of the directory + let metadata = fs::metadata(p).await.expect("must succeed"); + assert!(metadata.is_dir()); + assert!(metadata.permissions().readonly()); + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// Read a blob inside a directory. This ensures we successfully populate directory data. 
+async fn read_blob_inside_dir() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep"); + + // peek at metadata. + let metadata = fs::metadata(&p).await.expect("must succeed"); + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + + // read from it + let data = fs::read(&p).await.expect("must succeed"); + assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// Read a blob inside a directory inside a directory. This ensures we properly +/// populate directories as we traverse down the structure. +async fn read_blob_deep_inside_dir() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir + .path() + .join(DIRECTORY_COMPLICATED_NAME) + .join("keep") + .join(".keep"); + + // peek at metadata. 
+ let metadata = fs::metadata(&p).await.expect("must succeed"); + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + + // read from it + let data = fs::read(&p).await.expect("must succeed"); + assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure readdir works. +#[tokio::test] +async fn readdir() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME); + + { + // read_dir should succeed. Collect all elements + let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed")) + .map(|e| e.expect("must not be err")) + .collect() + .await; + + assert_eq!(3, elements.len(), "number of elements should be 3"); // rust skips . and .. + + // We explicitly look at specific positions here, because we always emit + // them ordered. + + // ".keep", 0 byte file. + let e = &elements[0]; + assert_eq!(".keep", e.file_name()); + assert!(e.file_type().await.expect("must succeed").is_file()); + assert_eq!(0, e.metadata().await.expect("must succeed").len()); + + // "aa", symlink. 
+ let e = &elements[1]; + assert_eq!("aa", e.file_name()); + assert!(e.file_type().await.expect("must succeed").is_symlink()); + + // "keep", directory + let e = &elements[2]; + assert_eq!("keep", e.file_name()); + assert!(e.file_type().await.expect("must succeed").is_dir()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test] +/// Do a readdir deeper inside a directory, without doing readdir or stat in the parent directory. +async fn readdir_deep() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); + + { + // read_dir should succeed. Collect all elements + let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed")) + .map(|e| e.expect("must not be err")) + .collect() + .await; + + assert_eq!(1, elements.len(), "number of elements should be 1"); // rust skips . and .. + + // ".keep", 0 byte file. + let e = &elements[0]; + assert_eq!(".keep", e.file_name()); + assert!(e.file_type().await.expect("must succeed").is_file()); + assert_eq!(0, e.metadata().await.expect("must succeed").len()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +/// Check attributes match how they show up in /nix/store normally. 
+#[tokio::test] +async fn check_attributes() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + populate_symlink(&blob_service, &directory_service, &path_info_service).await; + populate_helloworld_blob(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p_file = tmpdir.path().join(BLOB_A_NAME); + let p_directory = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + let p_symlink = tmpdir.path().join(SYMLINK_NAME); + let p_executable_file = tmpdir.path().join(HELLOWORLD_BLOB_NAME); + + // peek at metadata. We use symlink_metadata to ensure we don't traverse a symlink by accident. + let metadata_file = fs::symlink_metadata(&p_file).await.expect("must succeed"); + let metadata_executable_file = fs::symlink_metadata(&p_executable_file) + .await + .expect("must succeed"); + let metadata_directory = fs::symlink_metadata(&p_directory) + .await + .expect("must succeed"); + let metadata_symlink = fs::symlink_metadata(&p_symlink) + .await + .expect("must succeed"); + + // modes should match. We & with 0o777 to remove any higher bits. 
+ assert_eq!(0o444, metadata_file.mode() & 0o777); + assert_eq!(0o555, metadata_executable_file.mode() & 0o777); + assert_eq!(0o555, metadata_directory.mode() & 0o777); + assert_eq!(0o444, metadata_symlink.mode() & 0o777); + + // files should have the correct filesize + assert_eq!(fixtures::BLOB_A.len() as u64, metadata_file.len()); + // directories should have their "size" as filesize + assert_eq!( + fixtures::DIRECTORY_WITH_KEEP.size() as u64, + metadata_directory.size() + ); + + for metadata in &[&metadata_file, &metadata_directory, &metadata_symlink] { + // uid and gid should be 0. + assert_eq!(0, metadata.uid()); + assert_eq!(0, metadata.gid()); + + // all times should be set to the unix epoch. + assert_eq!(0, metadata.atime()); + assert_eq!(0, metadata.mtime()); + assert_eq!(0, metadata.ctime()); + // crtime seems MacOS only + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test] +/// Ensure we allocate the same inodes for the same directory contents. +/// $DIRECTORY_COMPLICATED_NAME/keep contains the same data as $DIRECTORY_WITH_KEEP. +async fn compare_inodes_directories() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p_dir_with_keep = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + let p_sibling_dir = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); + + // peek at metadata. 
+ assert_eq!( + fs::metadata(p_dir_with_keep) + .await + .expect("must succeed") + .ino(), + fs::metadata(p_sibling_dir) + .await + .expect("must succeed") + .ino() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we allocate the same inodes for the same directory contents. +/// $DIRECTORY_COMPLICATED_NAME/keep/,keep contains the same data as $DIRECTORY_COMPLICATED_NAME/.keep +#[tokio::test] +async fn compare_inodes_files() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p_keep1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join(".keep"); + let p_keep2 = tmpdir + .path() + .join(DIRECTORY_COMPLICATED_NAME) + .join("keep") + .join(".keep"); + + // peek at metadata. + assert_eq!( + fs::metadata(p_keep1).await.expect("must succeed").ino(), + fs::metadata(p_keep2).await.expect("must succeed").ino() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we allocate the same inode for symlinks pointing to the same targets. +/// $DIRECTORY_COMPLICATED_NAME/aa points to the same target as SYMLINK_NAME2. 
+#[tokio::test] +async fn compare_inodes_symlinks() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + populate_symlink2(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("aa"); + let p2 = tmpdir.path().join(SYMLINK_NAME2); + + // peek at metadata. + assert_eq!( + fs::symlink_metadata(p1).await.expect("must succeed").ino(), + fs::symlink_metadata(p2).await.expect("must succeed").ino() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Check we match paths exactly. 
+#[tokio::test] +async fn read_wrong_paths_in_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + // wrong name + assert!( + fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) + .await + .is_err() + ); + + // invalid hash + assert!( + fs::metadata(tmpdir.path().join("0000000000000000000000000000000-test")) + .await + .is_err() + ); + + // right name, must exist + assert!( + fs::metadata(tmpdir.path().join("00000000000000000000000000000000-test")) + .await + .is_ok() + ); + + // now wrong name with right hash still may not exist + assert!( + fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) + .await + .is_err() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Make sure writes are not allowed +#[tokio::test] +async fn disallow_writes() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + let e = fs::File::create(p).await.expect_err("must fail"); + + assert_eq!(Some(libc::EROFS), e.raw_os_error()); + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test] +/// Ensure we get an IO error if the directory service 
does not have the Directory object. +async fn missing_directory() { + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service) + .await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + + { + // `stat` on the path should succeed, because it doesn't trigger the directory request. + fs::metadata(&p).await.expect("must succeed"); + + // However, calling either `readdir` or `stat` on a child should fail with an IO error. + // It fails when trying to pull the first entry, because we don't implement opendir separately + ReadDirStream::new(fs::read_dir(&p).await.unwrap()) + .next() + .await + .expect("must be some") + .expect_err("must be err"); + + // rust currently sets e.kind() to Uncategorized, which isn't very + // helpful, so we don't look at the error more closely than that.. 
+ fs::metadata(p.join(".keep")).await.expect_err("must fail"); + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// Ensure we get an IO error if the blob service does not have the blob +async fn missing_blob() { + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + + { + // `stat` on the blob should succeed, because it doesn't trigger a request to the blob service. + fs::metadata(&p).await.expect("must succeed"); + + // However, calling read on the blob should fail. + // rust currently sets e.kind() to Uncategorized, which isn't very + // helpful, so we don't look at the error more closely than that.. 
+ fs::read(p).await.expect_err("must fail"); + } + + fuse_daemon.unmount().expect("unmount"); +} diff --git a/tvix/store/src/fs/virtiofs.rs b/tvix/store/src/fs/virtiofs.rs new file mode 100644 index 000000000000..3786a84285cd --- /dev/null +++ b/tvix/store/src/fs/virtiofs.rs @@ -0,0 +1,237 @@ +use std::{ + convert, error, fmt, io, + ops::Deref, + path::Path, + sync::{Arc, MutexGuard, RwLock}, +}; + +use fuse_backend_rs::{ + api::{filesystem::FileSystem, server::Server}, + transport::{FsCacheReqHandler, Reader, VirtioFsWriter}, +}; +use tracing::error; +use vhost::vhost_user::{ + Listener, SlaveFsCacheReq, VhostUserProtocolFeatures, VhostUserVirtioFeatures, +}; +use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringMutex, VringState, VringT}; +use virtio_bindings::bindings::virtio_ring::{ + VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC, +}; +use virtio_queue::QueueT; +use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; +use vmm_sys_util::epoll::EventSet; + +const VIRTIO_F_VERSION_1: u32 = 32; +const NUM_QUEUES: usize = 2; +const QUEUE_SIZE: usize = 1024; + +#[derive(Debug)] +enum Error { + /// Failed to handle non-input event. + HandleEventNotEpollIn, + /// Failed to handle unknown event. + HandleEventUnknownEvent, + /// Invalid descriptor chain. + InvlaidDescriptorChain, + /// Failed to handle filesystem requests. + HandleRequests(fuse_backend_rs::Error), + /// Failed to construct new vhost user daemon. + NewDaemon, + /// Failed to start the vhost user daemon. + StartDaemon, + /// Failed to wait for the vhost user daemon. 
+ WaitDaemon, +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "vhost_user_fs_error: {self:?}") + } +} + +impl error::Error for Error {} + +impl convert::From<Error> for io::Error { + fn from(e: Error) -> Self { + io::Error::new(io::ErrorKind::Other, e) + } +} + +struct VhostUserFsBackend<FS> +where + FS: FileSystem + Send + Sync, +{ + server: Arc<Server<Arc<FS>>>, + event_idx: bool, + guest_mem: GuestMemoryAtomic<GuestMemoryMmap>, + cache_req: Option<SlaveFsCacheReq>, +} + +impl<FS> VhostUserFsBackend<FS> +where + FS: FileSystem + Send + Sync, +{ + fn process_queue(&mut self, vring: &mut MutexGuard<VringState>) -> std::io::Result<bool> { + let mut used_descs = false; + + while let Some(desc_chain) = vring + .get_queue_mut() + .pop_descriptor_chain(self.guest_mem.memory()) + { + let memory = desc_chain.memory(); + let reader = Reader::from_descriptor_chain(memory, desc_chain.clone()) + .map_err(|_| Error::InvlaidDescriptorChain)?; + let writer = VirtioFsWriter::new(memory, desc_chain.clone()) + .map_err(|_| Error::InvlaidDescriptorChain)?; + + self.server + .handle_message( + reader, + writer.into(), + self.cache_req + .as_mut() + .map(|req| req as &mut dyn FsCacheReqHandler), + None, + ) + .map_err(Error::HandleRequests)?; + + // TODO: Is len 0 correct? + if let Err(error) = vring + .get_queue_mut() + .add_used(memory, desc_chain.head_index(), 0) + { + error!(?error, "failed to add desc back to ring"); + } + + // TODO: What happens if we error out before here? 
+ used_descs = true; + } + + let needs_notification = if self.event_idx { + match vring + .get_queue_mut() + .needs_notification(self.guest_mem.memory().deref()) + { + Ok(needs_notification) => needs_notification, + Err(error) => { + error!(?error, "failed to check if queue needs notification"); + true + } + } + } else { + true + }; + + if needs_notification { + if let Err(error) = vring.signal_used_queue() { + error!(?error, "failed to signal used queue"); + } + } + + Ok(used_descs) + } +} + +impl<FS> VhostUserBackendMut<VringMutex> for VhostUserFsBackend<FS> +where + FS: FileSystem + Send + Sync, +{ + fn num_queues(&self) -> usize { + NUM_QUEUES + } + + fn max_queue_size(&self) -> usize { + QUEUE_SIZE + } + + fn features(&self) -> u64 { + 1 << VIRTIO_F_VERSION_1 + | 1 << VIRTIO_RING_F_INDIRECT_DESC + | 1 << VIRTIO_RING_F_EVENT_IDX + | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::SLAVE_REQ + } + + fn set_event_idx(&mut self, enabled: bool) { + self.event_idx = enabled; + } + + fn update_memory(&mut self, _mem: GuestMemoryAtomic<GuestMemoryMmap>) -> std::io::Result<()> { + // This is what most the vhost user implementations do... 
+ Ok(()) + } + + fn set_slave_req_fd(&mut self, cache_req: SlaveFsCacheReq) { + self.cache_req = Some(cache_req); + } + + fn handle_event( + &mut self, + device_event: u16, + evset: vmm_sys_util::epoll::EventSet, + vrings: &[VringMutex], + _thread_id: usize, + ) -> std::io::Result<bool> { + if evset != EventSet::IN { + return Err(Error::HandleEventNotEpollIn.into()); + } + + let mut queue = match device_event { + // High priority queue + 0 => vrings[0].get_mut(), + // Regurlar priority queue + 1 => vrings[1].get_mut(), + _ => { + return Err(Error::HandleEventUnknownEvent.into()); + } + }; + + if self.event_idx { + loop { + queue + .get_queue_mut() + .enable_notification(self.guest_mem.memory().deref()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + if !self.process_queue(&mut queue)? { + break; + } + } + } else { + self.process_queue(&mut queue)?; + } + + Ok(false) + } +} + +pub fn start_virtiofs_daemon<FS, P>(fs: FS, socket: P) -> io::Result<()> +where + FS: FileSystem + Send + Sync + 'static, + P: AsRef<Path>, +{ + let guest_mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); + + let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); + + let backend = Arc::new(RwLock::new(VhostUserFsBackend { + server, + guest_mem: guest_mem.clone(), + event_idx: false, + cache_req: None, + })); + + let listener = Listener::new(socket, true).unwrap(); + + let mut fs_daemon = + VhostUserDaemon::new(String::from("vhost-user-fs-tvix-store"), backend, guest_mem) + .map_err(|_| Error::NewDaemon)?; + + fs_daemon.start(listener).map_err(|_| Error::StartDaemon)?; + + fs_daemon.wait().map_err(|_| Error::WaitDaemon)?; + + Ok(()) +} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs new file mode 100644 index 000000000000..c988e147174b --- /dev/null +++ b/tvix/store/src/lib.rs @@ -0,0 +1,10 @@ +#[cfg(feature = "fs")] +pub mod fs; + +pub mod listener; +pub mod nar; +pub mod pathinfoservice; +pub mod proto; + +#[cfg(test)] +mod 
tests; diff --git a/tvix/store/src/listener/mod.rs b/tvix/store/src/listener/mod.rs new file mode 100644 index 000000000000..ed1220803562 --- /dev/null +++ b/tvix/store/src/listener/mod.rs @@ -0,0 +1,131 @@ +use std::{ + io, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::Stream; +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_listener::{Listener, ListenerAddress}; +use tonic::transport::server::{Connected, TcpConnectInfo, UdsConnectInfo}; + +/// A wrapper around a [Listener] which implements the [Stream] trait. +/// Mainly used to bridge [tokio_listener] with [tonic]. +pub struct ListenerStream { + inner: Listener, +} + +impl ListenerStream { + /// Convert a [Listener] into a [Stream]. + pub fn new(inner: Listener) -> Self { + Self { inner } + } + + /// Binds to the specified address and returns a [Stream] of connections. + pub async fn bind(addr: &ListenerAddress) -> io::Result<Self> { + let listener = Listener::bind(addr, &Default::default(), &Default::default()).await?; + + Ok(Self::new(listener)) + } +} + +impl Stream for ListenerStream { + type Item = io::Result<Connection>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(Connection::new(connection)))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} + +pin_project! { + /// A wrapper around a [tokio_listener::Connection] that implements the [Connected] trait + /// so it is compatible with [tonic]. 
+ pub struct Connection { + #[pin] + inner: tokio_listener::Connection, + } +} + +impl Connection { + fn new(inner: tokio_listener::Connection) -> Self { + Self { inner } + } +} + +impl Deref for Connection { + type Target = tokio_listener::Connection; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for Connection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +#[derive(Clone)] +pub enum ListenerConnectInfo { + TCP(TcpConnectInfo), + Unix(UdsConnectInfo), + Stdio, + Other, +} + +impl Connected for Connection { + type ConnectInfo = ListenerConnectInfo; + + fn connect_info(&self) -> Self::ConnectInfo { + if let Some(tcp_stream) = self.try_borrow_tcp() { + ListenerConnectInfo::TCP(tcp_stream.connect_info()) + } else if let Some(unix_stream) = self.try_borrow_unix() { + ListenerConnectInfo::Unix(unix_stream.connect_info()) + } else if let Some(_) = self.try_borrow_stdio() { + ListenerConnectInfo::Stdio + } else { + ListenerConnectInfo::Other + } + } +} + +impl AsyncRead for Connection { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + self.project().inner.poll_read(cx, buf) + } +} + +impl AsyncWrite for Connection { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll<std::result::Result<usize, io::Error>> { + self.project().inner.poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<std::result::Result<(), io::Error>> { + self.project().inner.poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<std::result::Result<(), io::Error>> { + self.project().inner.poll_shutdown(cx) + } +} diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs new file mode 100644 index 000000000000..fc6805e9e758 --- /dev/null +++ b/tvix/store/src/nar/mod.rs @@ -0,0 
+1,25 @@ +use data_encoding::BASE64; +use tvix_castore::{B3Digest, Error}; + +mod renderer; +pub use renderer::calculate_size_and_sha256; +pub use renderer::write_nar; + +/// Errors that can encounter while rendering NARs. +#[derive(Debug, thiserror::Error)] +pub enum RenderError { + #[error("failure talking to a backing store client: {0}")] + StoreError(Error), + + #[error("unable to find directory {}, referred from {:?}", .0, .1)] + DirectoryNotFound(B3Digest, bytes::Bytes), + + #[error("unable to find blob {}, referred from {:?}", BASE64.encode(.0), .1)] + BlobNotFound([u8; 32], bytes::Bytes), + + #[error("unexpected size in metadata for blob {}, referred from {:?} returned, expected {}, got {}", BASE64.encode(.0), .1, .2, .3)] + UnexpectedBlobMeta([u8; 32], bytes::Bytes, u32, u32), + + #[error("failure using the NAR writer: {0}")] + NARWriterError(std::io::Error), +} diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs new file mode 100644 index 000000000000..55dce911ee1a --- /dev/null +++ b/tvix/store/src/nar/renderer.rs @@ -0,0 +1,166 @@ +use super::RenderError; +use count_write::CountWrite; +use nix_compat::nar; +use sha2::{Digest, Sha256}; +use std::{io, sync::Arc}; +use tokio::{io::BufReader, task::spawn_blocking}; +use tracing::warn; +use tvix_castore::{ + blobservice::BlobService, + directoryservice::DirectoryService, + proto::{self as castorepb, NamedNode}, + Error, +}; + +/// Invoke [write_nar], and return the size and sha256 digest of the produced +/// NAR output. 
+pub async fn calculate_size_and_sha256( + root_node: &castorepb::node::Node, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +) -> Result<(u64, [u8; 32]), RenderError> { + let h = Sha256::new(); + let cw = CountWrite::from(h); + + let cw = write_nar(cw, root_node, blob_service, directory_service).await?; + + Ok((cw.count(), cw.into_inner().finalize().into())) +} + +/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path, +/// and uses the passed blob_service and directory_service to perform the +/// necessary lookups as it traverses the structure. +/// The contents in NAR serialization are writen to the passed [std::io::Write]. +/// +/// The writer is passed back in the return value. This is done because async Rust +/// lacks scoped blocking tasks, so we need to transfer ownership of the writer +/// internally. +/// +/// # Panics +/// This will panic if called outside the context of a Tokio runtime. +pub async fn write_nar<W: std::io::Write + Send + 'static>( + mut w: W, + proto_root_node: &castorepb::node::Node, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +) -> Result<W, RenderError> { + let tokio_handle = tokio::runtime::Handle::current(); + let proto_root_node = proto_root_node.clone(); + + spawn_blocking(move || { + // Initialize NAR writer + let nar_root_node = nar::writer::open(&mut w).map_err(RenderError::NARWriterError)?; + + walk_node( + tokio_handle, + nar_root_node, + &proto_root_node, + blob_service, + directory_service, + )?; + + Ok(w) + }) + .await + .unwrap() +} + +/// Process an intermediate node in the structure. +/// This consumes the node. 
+fn walk_node( + tokio_handle: tokio::runtime::Handle, + nar_node: nar::writer::Node, + proto_node: &castorepb::node::Node, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +) -> Result<(), RenderError> { + match proto_node { + castorepb::node::Node::Symlink(proto_symlink_node) => { + nar_node + .symlink(&proto_symlink_node.target) + .map_err(RenderError::NARWriterError)?; + } + castorepb::node::Node::File(proto_file_node) => { + let digest = proto_file_node.digest.clone().try_into().map_err(|_e| { + warn!( + file_node = ?proto_file_node, + "invalid digest length in file node", + ); + + RenderError::StoreError(Error::StorageError( + "invalid digest len in file node".to_string(), + )) + })?; + + let blob_reader = match tokio_handle + .block_on(async { blob_service.open_read(&digest).await }) + .map_err(RenderError::StoreError)? + { + Some(blob_reader) => Ok(BufReader::new(blob_reader)), + None => Err(RenderError::NARWriterError(io::Error::new( + io::ErrorKind::NotFound, + format!("blob with digest {} not found", &digest), + ))), + }?; + + nar_node + .file( + proto_file_node.executable, + proto_file_node.size.into(), + &mut tokio_util::io::SyncIoBridge::new(blob_reader), + ) + .map_err(RenderError::NARWriterError)?; + } + castorepb::node::Node::Directory(proto_directory_node) => { + let digest = proto_directory_node + .digest + .clone() + .try_into() + .map_err(|_e| { + RenderError::StoreError(Error::StorageError( + "invalid digest len in directory node".to_string(), + )) + })?; + + // look it up with the directory service + match tokio_handle + .block_on(async { directory_service.get(&digest).await }) + .map_err(RenderError::StoreError)? + { + // if it's None, that's an error! 
+ None => { + return Err(RenderError::DirectoryNotFound( + digest, + proto_directory_node.name.clone(), + )) + } + Some(proto_directory) => { + // start a directory node + let mut nar_node_directory = + nar_node.directory().map_err(RenderError::NARWriterError)?; + + // for each node in the directory, create a new entry with its name, + // and then invoke walk_node on that entry. + for proto_node in proto_directory.nodes() { + let child_node = nar_node_directory + .entry(proto_node.get_name()) + .map_err(RenderError::NARWriterError)?; + walk_node( + tokio_handle.clone(), + child_node, + &proto_node, + blob_service.clone(), + directory_service.clone(), + )?; + } + + // close the directory + nar_node_directory + .close() + .map_err(RenderError::NARWriterError)?; + } + } + } + } + Ok(()) +} diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs new file mode 100644 index 000000000000..93cb487f29b9 --- /dev/null +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -0,0 +1,56 @@ +use super::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService, SledPathInfoService}; + +use std::sync::Arc; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; +use url::Url; + +/// Constructs a new instance of a [PathInfoService] from an URI. +/// +/// The following URIs are supported: +/// - `memory:` +/// Uses a in-memory implementation. +/// - `sled:` +/// Uses a in-memory sled implementation. +/// - `sled:///absolute/path/to/somewhere` +/// Uses sled, using a path on the disk for persistency. Can be only opened +/// from one process at the same time. +/// - `grpc+unix:///absolute/path/to/somewhere` +/// Connects to a local tvix-store gRPC service via Unix socket. +/// - `grpc+http://host:port`, `grpc+https://host:port` +/// Connects to a (remote) tvix-store gRPC service. +/// +/// As the [PathInfoService] needs to talk to [BlobService] and [DirectoryService], +/// these also need to be passed in. 
+pub fn from_addr( + uri: &str, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +) -> Result<Arc<dyn PathInfoService>, Error> { + let url = + Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; + + Ok(if url.scheme() == "memory" { + Arc::new(MemoryPathInfoService::from_url( + &url, + blob_service, + directory_service, + )?) + } else if url.scheme() == "sled" { + Arc::new(SledPathInfoService::from_url( + &url, + blob_service, + directory_service, + )?) + } else if url.scheme().starts_with("grpc+") { + Arc::new(GRPCPathInfoService::from_url( + &url, + blob_service, + directory_service, + )?) + } else { + Err(Error::StorageError(format!( + "unknown scheme: {}", + url.scheme() + )))? + }) +} diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs new file mode 100644 index 000000000000..a88828083940 --- /dev/null +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -0,0 +1,329 @@ +use super::PathInfoService; +use crate::proto::{self, ListPathInfoRequest, PathInfo}; +use async_stream::try_stream; +use futures::Stream; +use std::{pin::Pin, sync::Arc}; +use tokio::net::UnixStream; +use tonic::{async_trait, transport::Channel, Code}; +use tvix_castore::{ + blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, +}; + +/// Connects to a (remote) tvix-store PathInfoService over gRPC. +#[derive(Clone)] +pub struct GRPCPathInfoService { + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, +} + +impl GRPCPathInfoService { + /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient]. + /// panics if called outside the context of a tokio runtime. 
+ pub fn from_client( + grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, + ) -> Self { + Self { grpc_client } + } +} + +#[async_trait] +impl PathInfoService for GRPCPathInfoService { + /// Constructs a [GRPCPathInfoService] from the passed [url::Url]: + /// - scheme has to match `grpc+*://`. + /// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + /// - In the case of unix sockets, there must be a path, but may not be a host. + /// - In the case of non-unix sockets, there must be a host, but no path. + /// The blob_service and directory_service arguments are ignored, because the gRPC service already provides answers to these questions. + fn from_url( + url: &url::Url, + _blob_service: Arc<dyn BlobService>, + _directory_service: Arc<dyn DirectoryService>, + ) -> Result<Self, tvix_castore::Error> { + // Start checking for the scheme to start with grpc+. + match url.scheme().strip_prefix("grpc+") { + None => Err(Error::StorageError("invalid scheme".to_string())), + Some(rest) => { + if rest == "unix" { + if url.host_str().is_some() { + return Err(Error::StorageError("host may not be set".to_string())); + } + let path = url.path().to_string(); + let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter + .unwrap() + .connect_with_connector_lazy(tower::service_fn( + move |_: tonic::transport::Uri| UnixStream::connect(path.clone()), + )); + let grpc_client = + proto::path_info_service_client::PathInfoServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } else { + // ensure path is empty, not supported with gRPC. + if !url.path().is_empty() { + return Err(tvix_castore::Error::StorageError( + "path may not be set".to_string(), + )); + } + + // clone the uri, and drop the grpc+ from the scheme. + // Recreate a new uri with the `grpc+` prefix dropped from the scheme. 
+ // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + let url = { + let url_str = url.to_string(); + let s_stripped = url_str.strip_prefix("grpc+").unwrap(); + url::Url::parse(s_stripped).unwrap() + }; + let channel = tonic::transport::Endpoint::try_from(url.to_string()) + .unwrap() + .connect_lazy(); + + let grpc_client = + proto::path_info_service_client::PathInfoServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } + } + } + } + + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + + let path_info = grpc_client + .get(proto::GetPathInfoRequest { + by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash( + digest.to_vec().into(), + )), + }) + .await; + + match path_info { + Ok(path_info) => Ok(Some(path_info.into_inner())), + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + + let path_info = grpc_client + .put(path_info) + .await + .map_err(|e| Error::StorageError(e.to_string()))? + .into_inner(); + + Ok(path_info) + } + + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + let root_node = root_node.clone(); + + let path_info = grpc_client + .calculate_nar(castorepb::Node { + node: Some(root_node), + }) + .await + .map_err(|e| Error::StorageError(e.to_string()))? 
+ .into_inner(); + + let nar_sha256: [u8; 32] = path_info + .nar_sha256 + .to_vec() + .try_into() + .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; + + Ok((path_info.nar_size, nar_sha256)) + } + + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { + let mut grpc_client = self.grpc_client.clone(); + + let stream = try_stream! { + let resp = grpc_client.list(ListPathInfoRequest::default()).await; + + let mut stream = resp.map_err(|e| Error::StorageError(e.to_string()))?.into_inner(); + + loop { + match stream.message().await { + Ok(o) => match o { + Some(pathinfo) => { + // validate the pathinfo + if let Err(e) = pathinfo.validate() { + Err(Error::StorageError(format!( + "pathinfo {:?} failed validation: {}", + pathinfo, e + )))?; + } + yield pathinfo + } + None => { + return; + }, + }, + Err(e) => Err(Error::StorageError(e.to_string()))?, + } + } + }; + + Box::pin(stream) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio_retry::strategy::ExponentialBackoff; + use tokio_retry::Retry; + use tokio_stream::wrappers::UnixListenerStream; + + use crate::pathinfoservice::MemoryPathInfoService; + use crate::proto::GRPCPathInfoServiceWrapper; + use crate::tests::fixtures; + use crate::tests::utils::gen_blob_service; + use crate::tests::utils::gen_directory_service; + + use super::GRPCPathInfoService; + use super::PathInfoService; + + /// This uses the wrong scheme + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!( + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This uses the correct scheme for a unix socket. + /// The fact that /path/to/somewhere doesn't exist yet is no problem, because we connect lazily. 
+ #[tokio::test] + async fn test_valid_unix_path() { + let url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse"); + + assert!( + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_ok() + ); + } + + /// This uses the correct scheme for a unix socket, + /// but sets a host, which is unsupported. + #[tokio::test] + async fn test_invalid_unix_path_with_domain() { + let url = + url::Url::parse("grpc+unix://host.example/path/to/somewhere").expect("must parse"); + + assert!( + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This uses the correct scheme for a HTTP server. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_http() { + let url = url::Url::parse("grpc+http://localhost").expect("must parse"); + + assert!( + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_ok() + ); + } + + /// This uses the correct scheme for a HTTPS server. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_https() { + let url = url::Url::parse("grpc+https://localhost").expect("must parse"); + + assert!( + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_ok() + ); + } + + /// This uses the correct scheme, but also specifies + /// an additional path, which is not supported for gRPC. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_invalid_http_with_path() { + let url = url::Url::parse("grpc+https://localhost/some-path").expect("must parse"); + + assert!( + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This ensures connecting via gRPC works as expected. 
+ #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("daemon"); + + let path_clone = socket_path.clone(); + + // Spin up a server + tokio::spawn(async { + let uds = UnixListener::bind(path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new server + let mut server = tonic::transport::Server::builder(); + let router = server.add_service( + crate::proto::path_info_service_server::PathInfoServiceServer::new( + GRPCPathInfoServiceWrapper::from(Arc::new(MemoryPathInfoService::new( + gen_blob_service(), + gen_directory_service(), + )) + as Arc<dyn PathInfoService>), + ), + ); + router.serve_with_incoming(uds_stream).await + }); + + // wait for the socket to be created + Retry::spawn( + ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // prepare a client + let grpc_client = { + let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display())) + .expect("must parse"); + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .expect("must succeed") + }; + + let path_info = grpc_client + .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap()) + .await + .expect("must not be error"); + + assert!(path_info.is_none()); + } +} diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs new file mode 100644 index 000000000000..dbb4b02dd013 --- /dev/null +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -0,0 +1,172 @@ +use super::PathInfoService; +use crate::{nar::calculate_size_and_sha256, proto::PathInfo}; +use futures::{stream::iter, Stream}; +use std::{ + collections::HashMap, + pin::Pin, + sync::{Arc, RwLock}, +}; +use tonic::async_trait; +use tvix_castore::proto as castorepb; +use tvix_castore::Error; +use 
tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; + +pub struct MemoryPathInfoService { + db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>, + + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +} + +impl MemoryPathInfoService { + pub fn new( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Self { + Self { + db: Default::default(), + blob_service, + directory_service, + } + } +} + +#[async_trait] +impl PathInfoService for MemoryPathInfoService { + /// Constructs a [MemoryPathInfoService] from the passed [url::Url]: + /// - scheme has to be `memory://` + /// - there may not be a host. + /// - there may not be a path. + fn from_url( + url: &url::Url, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Result<Self, Error> { + if url.scheme() != "memory" { + return Err(Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string())); + } + + Ok(Self::new(blob_service, directory_service)) + } + + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let db = self.db.read().unwrap(); + + match db.get(&digest) { + None => Ok(None), + Some(path_info) => Ok(Some(path_info.clone())), + } + } + + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Call validate on the received PathInfo message. + match path_info.validate() { + Err(e) => Err(Error::InvalidRequest(format!( + "failed to validate PathInfo: {}", + e + ))), + + // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. + // This overwrites existing PathInfo objects. 
+ Ok(nix_path) => { + let mut db = self.db.write().unwrap(); + db.insert(nix_path.digest, path_info.clone()); + + Ok(path_info) + } + } + } + + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + calculate_size_and_sha256( + root_node, + self.blob_service.clone(), + self.directory_service.clone(), + ) + .await + .map_err(|e| Error::StorageError(e.to_string())) + } + + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { + let db = self.db.read().unwrap(); + + // Copy all elements into a list. + // This is a bit ugly, because we can't have db escape the lifetime + // of this function, but elements need to be returned owned anyways, and this in- + // memory impl is only for testing purposes anyways. + let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect(); + + Box::pin(iter(items)) + } +} + +#[cfg(test)] +mod tests { + use crate::tests::utils::gen_blob_service; + use crate::tests::utils::gen_directory_service; + + use super::MemoryPathInfoService; + use super::PathInfoService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!( + MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This correctly sets the scheme, and doesn't set a path. + #[test] + fn test_valid_scheme() { + let url = url::Url::parse("memory://").expect("must parse"); + + assert!( + MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_ok() + ); + } + + /// This sets the host to `foo` + #[test] + fn test_invalid_host() { + let url = url::Url::parse("memory://foo").expect("must parse"); + + assert!( + MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This has the path "/", which is invalid. 
+ #[test] + fn test_invalid_has_path() { + let url = url::Url::parse("memory:///").expect("must parse"); + + assert!( + MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This has the path "/foo", which is invalid. + #[test] + fn test_invalid_path2() { + let url = url::Url::parse("memory:///foo").expect("must parse"); + + assert!( + MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } +} diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs new file mode 100644 index 000000000000..af7bbc9f88e4 --- /dev/null +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -0,0 +1,61 @@ +mod from_addr; +mod grpc; +mod memory; +mod sled; + +use std::pin::Pin; +use std::sync::Arc; + +use futures::Stream; +use tonic::async_trait; +use tvix_castore::blobservice::BlobService; +use tvix_castore::directoryservice::DirectoryService; +use tvix_castore::proto as castorepb; +use tvix_castore::Error; + +use crate::proto::PathInfo; + +pub use self::from_addr::from_addr; +pub use self::grpc::GRPCPathInfoService; +pub use self::memory::MemoryPathInfoService; +pub use self::sled::SledPathInfoService; + +/// The base trait all PathInfo services need to implement. +#[async_trait] +pub trait PathInfoService: Send + Sync { + /// Create a new instance by passing in a connection URL, as well + /// as instances of a [BlobService] and [DirectoryService] (as the + /// [PathInfoService] needs to talk to them). + /// TODO: check if we want to make this async, instead of lazily connecting + fn from_url( + url: &url::Url, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Result<Self, Error> + where + Self: Sized; + + /// Retrieve a PathInfo message by the output digest. + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error>; + + /// Store a PathInfo message. 
Implementations MUST call validate and reject + /// invalid messages. + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>; + + /// Return the nar size and nar sha256 digest for a given root node. + /// This can be used to calculate NAR-based output paths, + /// and implementations are encouraged to cache it. + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error>; + + /// Iterate over all PathInfo objects in the store. + /// Implementations can decide to disallow listing. + /// + /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily, + /// and the box allows different underlying stream implementations to be returned since + /// Rust doesn't support this as a generic in traits yet. This is the same thing that + /// [async_trait] generates, but for streams instead of futures. + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>>; +} diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs new file mode 100644 index 000000000000..bac384ea0912 --- /dev/null +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -0,0 +1,269 @@ +use super::PathInfoService; +use crate::nar::calculate_size_and_sha256; +use crate::proto::PathInfo; +use futures::{stream::iter, Stream}; +use prost::Message; +use std::{path::PathBuf, pin::Pin, sync::Arc}; +use tonic::async_trait; +use tracing::warn; +use tvix_castore::proto as castorepb; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; + +/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). +/// +/// The PathInfo messages are stored as encoded protos, and keyed by their output hash, +/// as that's currently the only request type available. 
+pub struct SledPathInfoService { + db: sled::Db, + + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +} + +impl SledPathInfoService { + pub fn new( + p: PathBuf, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Result<Self, sled::Error> { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { + db, + blob_service, + directory_service, + }) + } + + pub fn new_temporary( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { + db, + blob_service, + directory_service, + }) + } +} + +#[async_trait] +impl PathInfoService for SledPathInfoService { + /// Constructs a [SledPathInfoService] from the passed [url::Url]: + /// - scheme has to be `sled://` + /// - there may not be a host. + /// - a path to the sled needs to be provided (which may not be `/`). + fn from_url( + url: &url::Url, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Result<Self, Error> { + if url.scheme() != "sled" { + return Err(Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() { + return Err(Error::StorageError(format!( + "invalid host: {}", + url.host().unwrap() + ))); + } + + // TODO: expose compression and other parameters as URL parameters, drop new and new_temporary? 
+ if url.path().is_empty() { + Self::new_temporary(blob_service, directory_service) + .map_err(|e| Error::StorageError(e.to_string())) + } else if url.path() == "/" { + Err(Error::StorageError( + "cowardly refusing to open / with sled".to_string(), + )) + } else { + Self::new(url.path().into(), blob_service, directory_service) + .map_err(|e| Error::StorageError(e.to_string())) + } + } + + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + match self.db.get(digest) { + Ok(None) => Ok(None), + Ok(Some(data)) => match PathInfo::decode(&*data) { + Ok(path_info) => Ok(Some(path_info)), + Err(e) => { + warn!("failed to decode stored PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to decode stored PathInfo: {}", + e + ))) + } + }, + Err(e) => { + warn!("failed to retrieve PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to retrieve PathInfo: {}", + e + ))) + } + } + } + + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Call validate on the received PathInfo message. + match path_info.validate() { + Err(e) => Err(Error::InvalidRequest(format!( + "failed to validate PathInfo: {}", + e + ))), + // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. + // This overwrites existing PathInfo objects. + Ok(nix_path) => match self.db.insert(nix_path.digest, path_info.encode_to_vec()) { + Ok(_) => Ok(path_info), + Err(e) => { + warn!("failed to insert PathInfo: {}", e); + Err(Error::StorageError(format! 
{ + "failed to insert PathInfo: {}", e + })) + } + }, + } + } + + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + calculate_size_and_sha256( + root_node, + self.blob_service.clone(), + self.directory_service.clone(), + ) + .await + .map_err(|e| Error::StorageError(e.to_string())) + } + + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { + Box::pin(iter(self.db.iter().values().map(|v| match v { + Ok(data) => { + // we retrieved some bytes + match PathInfo::decode(&*data) { + Ok(path_info) => Ok(path_info), + Err(e) => { + warn!("failed to decode stored PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to decode stored PathInfo: {}", + e + ))) + } + } + } + Err(e) => { + warn!("failed to retrieve PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to retrieve PathInfo: {}", + e + ))) + } + }))) + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use crate::tests::utils::gen_blob_service; + use crate::tests::utils::gen_directory_service; + + use super::PathInfoService; + use super::SledPathInfoService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This uses the correct scheme, and doesn't specify a path (temporary sled). 
+ #[test] + fn test_valid_scheme_temporary() { + let url = url::Url::parse("sled://").expect("must parse"); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_ok() + ); + } + + /// This sets the path to a location that doesn't exist, which should fail (as sled doesn't mkdir -p) + #[test] + fn test_nonexistent_path() { + let tmpdir = TempDir::new().unwrap(); + + // NOTE(review): the URL parsed here carries the host `foo.example`, and `from_url` + // rejects any URL that has a host before it looks at the path. This test may + // therefore be passing on the host check rather than on the missing directory; + // confirm, and consider starting from `sled://` instead. + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().join("foo").join("bar").to_str().unwrap()); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This uses the correct scheme, and specifies / as path (which should fail + /// for obvious reasons) + #[test] + fn test_invalid_path_root() { + let url = url::Url::parse("sled:///").expect("must parse"); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This uses the correct scheme, and sets a tempdir as location. + #[test] + fn test_valid_scheme_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_ok() + ); + } + + /// This sets a host, rather than a path, which should fail. 
+ #[test] + fn test_invalid_host() { + let url = url::Url::parse("sled://foo.example").expect("must parse"); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This sets a host AND a valid path, which should fail + #[test] + fn test_invalid_host_and_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } +} diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs new file mode 100644 index 000000000000..7632614291dc --- /dev/null +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -0,0 +1,125 @@ +use crate::nar::RenderError; +use crate::pathinfoservice::PathInfoService; +use crate::proto; +use futures::StreamExt; +use std::sync::Arc; +use tokio::task; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{async_trait, Request, Response, Result, Status}; +use tracing::{debug, instrument, warn}; +use tvix_castore::proto as castorepb; + +pub struct GRPCPathInfoServiceWrapper { + path_info_service: Arc<dyn PathInfoService>, + // FUTUREWORK: allow exposing without allowing listing +} + +impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper { + fn from(value: Arc<dyn PathInfoService>) -> Self { + Self { + path_info_service: value, + } + } +} + +#[async_trait] +impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper { + type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>; + + #[instrument(skip(self))] + async fn get( + &self, + request: Request<proto::GetPathInfoRequest>, + ) -> Result<Response<proto::PathInfo>> { + match request.into_inner().by_what { + None => Err(Status::unimplemented("by_what needs to be specified")), + 
Some(proto::get_path_info_request::ByWhat::ByOutputHash(output_digest)) => { + let digest: [u8; 20] = output_digest + .to_vec() + .try_into() + .map_err(|_e| Status::invalid_argument("invalid output digest length"))?; + match self.path_info_service.get(digest).await { + Ok(None) => Err(Status::not_found("PathInfo not found")), + Ok(Some(path_info)) => Ok(Response::new(path_info)), + Err(e) => { + warn!("failed to retrieve PathInfo: {}", e); + Err(e.into()) + } + } + } + } + } + + #[instrument(skip(self))] + async fn put(&self, request: Request<proto::PathInfo>) -> Result<Response<proto::PathInfo>> { + let path_info = request.into_inner(); + + // Store the PathInfo in the underlying PathInfoService. Implementations MUST + // validate the data they receive, so we don't validate additionally here. + match self.path_info_service.put(path_info).await { + Ok(path_info_new) => Ok(Response::new(path_info_new)), + Err(e) => { + warn!("failed to insert PathInfo: {}", e); + Err(e.into()) + } + } + } + + /// Calculates the NAR size and NAR sha256 digest for the root node in the request. + /// + /// Returns an `invalid_argument` status if the request carries no root node. + /// Errors from the underlying PathInfoService are logged and converted into a + /// [Status], in the same way `get` and `put` handle them. + #[instrument(skip(self))] + async fn calculate_nar( + &self, + request: Request<castorepb::Node>, + ) -> Result<Response<proto::CalculateNarResponse>> { + match request.into_inner().node { + None => Err(Status::invalid_argument("no root node sent")), + Some(root_node) => { + // Report calculation failures to the caller as a gRPC status rather + // than panicking inside the request handler. + match self.path_info_service.calculate_nar(&root_node).await { + Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse { + nar_size, + nar_sha256: nar_sha256.to_vec().into(), + })), + Err(e) => { + warn!("failed to calculate NAR: {}", e); + Err(e.into()) + } + } + } + } + } + + #[instrument(skip(self))] + async fn list( + &self, + _request: Request<proto::ListPathInfoRequest>, + ) -> Result<Response<Self::ListStream>, Status> { + let (tx, rx) = tokio::sync::mpsc::channel(5); + + let path_info_service = self.path_info_service.clone(); + + let _task = task::spawn(async move { + let mut stream = path_info_service.list(); + while let Some(e) = stream.next().await { + let res = 
e.map_err(|e| Status::internal(e.to_string())); + if tx.send(res).await.is_err() { + debug!("receiver dropped"); + break; + } + } + }); + + let receiver_stream = ReceiverStream::new(rx); + Ok(Response::new(receiver_stream)) + } +} + +impl From<RenderError> for tonic::Status { + fn from(value: RenderError) -> Self { + match value { + RenderError::BlobNotFound(_, _) => Self::not_found(value.to_string()), + RenderError::DirectoryNotFound(_, _) => Self::not_found(value.to_string()), + RenderError::NARWriterError(_) => Self::internal(value.to_string()), + RenderError::StoreError(_) => Self::internal(value.to_string()), + RenderError::UnexpectedBlobMeta(_, _, _, _) => Self::internal(value.to_string()), + } + } +} diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs new file mode 100644 index 000000000000..c1d9d0c46eb5 --- /dev/null +++ b/tvix/store/src/proto/mod.rs @@ -0,0 +1,173 @@ +#![allow(clippy::derive_partial_eq_without_eq, non_snake_case)] +use data_encoding::BASE64; +// https://github.com/hyperium/tonic/issues/1056 +use nix_compat::store_path::{self, StorePath}; +use thiserror::Error; +use tvix_castore::{ + proto::{self as castorepb, NamedNode}, + B3Digest, B3_LEN, +}; + +mod grpc_pathinfoservice_wrapper; + +pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper; + +tonic::include_proto!("tvix.store.v1"); + +#[cfg(feature = "tonic-reflection")] +/// Compiled file descriptors for implementing [gRPC +/// reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) with e.g. +/// [`tonic_reflection`](https://docs.rs/tonic-reflection). +pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix.store.v1"); + +#[cfg(test)] +mod tests; + +/// Errors that can occur during the validation of PathInfo messages. 
+#[derive(Debug, Error, PartialEq)] +pub enum ValidatePathInfoError { + /// Invalid length of a reference + #[error("Invalid length of digest at position {}, expected {}, got {}", .0, store_path::DIGEST_SIZE, .1)] + InvalidReferenceDigestLen(usize, usize), + + /// No node present + #[error("No node present")] + NoNodePresent(), + + /// Invalid node name encountered. + #[error("Failed to parse {0:?} as StorePath: {1}")] + InvalidNodeName(Vec<u8>, store_path::Error), + + /// The digest the (root) node refers to has invalid length. + #[error("Invalid Digest length: expected {}, got {}", B3_LEN, .0)] + InvalidNodeDigestLen(usize), + + /// The number of references in the narinfo.reference_names field does not match + /// the number of references in the .references field. + #[error("Inconsistent Number of References: {0} (references) vs {1} (narinfo)")] + InconsistentNumberOfReferences(usize, usize), + + /// A string in narinfo.reference_names does not parse to a StorePath. + #[error("Invalid reference_name at position {0}: {1}")] + InvalidNarinfoReferenceName(usize, String), + + /// The digest in the parsed `.narinfo.reference_names[i]` does not match + /// the one in `.references[i]`.` + #[error("digest in reference_name at position {} does not match digest in PathInfo, expected {}, got {}", .0, BASE64.encode(.1), BASE64.encode(.2))] + InconsistentNarinfoReferenceNameDigest( + usize, + [u8; store_path::DIGEST_SIZE], + [u8; store_path::DIGEST_SIZE], + ), +} + +/// Parses a root node name. +/// +/// On success, this returns the parsed [StorePath]. +/// On error, it returns an error generated from the supplied constructor. 
+fn parse_node_name_root<E>( + name: &[u8], + err: fn(Vec<u8>, store_path::Error) -> E, +) -> Result<StorePath, E> { + match StorePath::from_bytes(name) { + Ok(np) => Ok(np), + Err(e) => Err(err(name.to_vec(), e)), + } +} + +impl PathInfo { + /// validate performs some checks on the PathInfo struct, + /// Returning either a [StorePath] of the root node, or a + /// [ValidatePathInfoError]. + pub fn validate(&self) -> Result<StorePath, ValidatePathInfoError> { + // ensure the references have the right number of bytes. + for (i, reference) in self.references.iter().enumerate() { + if reference.len() != store_path::DIGEST_SIZE { + return Err(ValidatePathInfoError::InvalidReferenceDigestLen( + i, + reference.len(), + )); + } + } + + // If there is a narinfo field populated, ensure the number of references there + // matches PathInfo.references count. + if let Some(narinfo) = &self.narinfo { + if narinfo.reference_names.len() != self.references.len() { + return Err(ValidatePathInfoError::InconsistentNumberOfReferences( + self.references.len(), + narinfo.reference_names.len(), + )); + } + + // parse references in reference_names. + for (i, reference_name_str) in narinfo.reference_names.iter().enumerate() { + // ensure thy parse as (non-absolute) store path + let reference_names_store_path = + StorePath::from_bytes(reference_name_str.as_bytes()).map_err(|_| { + ValidatePathInfoError::InvalidNarinfoReferenceName( + i, + reference_name_str.to_owned(), + ) + })?; + + // ensure their digest matches the one at self.references[i]. + { + // This is safe, because we ensured the proper length earlier already. 
+ let reference_digest = self.references[i].to_vec().try_into().unwrap(); + + if reference_names_store_path.digest != reference_digest { + return Err( + ValidatePathInfoError::InconsistentNarinfoReferenceNameDigest( + i, + reference_digest, + reference_names_store_path.digest, + ), + ); + } + } + } + } + + // Ensure there is a (root) node present, and it properly parses to a [StorePath]. + let root_nix_path = match &self.node { + None => { + return Err(ValidatePathInfoError::NoNodePresent()); + } + Some(castorepb::Node { node }) => match node { + None => { + return Err(ValidatePathInfoError::NoNodePresent()); + } + Some(node) => { + match node { + // for a directory root node, ensure the digest has the appropriate size. + castorepb::node::Node::Directory(directory_node) => { + if TryInto::<B3Digest>::try_into(directory_node.digest.clone()).is_err() + { + return Err(ValidatePathInfoError::InvalidNodeDigestLen( + directory_node.digest.len(), + )); + } + } + // for a file root node, ensure the digest has the appropriate size. + castorepb::node::Node::File(file_node) => { + // ensure the digest has the appropriate size. + if TryInto::<B3Digest>::try_into(file_node.digest.clone()).is_err() { + return Err(ValidatePathInfoError::InvalidNodeDigestLen( + file_node.digest.len(), + )); + } + } + // nothing to do specifically for symlinks + castorepb::node::Node::Symlink(_) => {} + } + // parse the name of the node itself and return + parse_node_name_root(&node.get_name(), ValidatePathInfoError::InvalidNodeName)? 
+ } + }, + }; + + // return the root nix path + Ok(root_nix_path) + } +} diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs new file mode 100644 index 000000000000..c0b953d0f2e9 --- /dev/null +++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs @@ -0,0 +1,73 @@ +use crate::proto::get_path_info_request::ByWhat::ByOutputHash; +use crate::proto::path_info_service_server::PathInfoService as GRPCPathInfoService; +use crate::proto::GRPCPathInfoServiceWrapper; +use crate::proto::GetPathInfoRequest; +use crate::proto::PathInfo; +use crate::tests::fixtures::DUMMY_OUTPUT_HASH; +use crate::tests::utils::gen_blob_service; +use crate::tests::utils::gen_directory_service; +use crate::tests::utils::gen_pathinfo_service; +use std::sync::Arc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::Request; +use tvix_castore::proto as castorepb; + +/// generates a GRPCPathInfoService out of blob, directory and pathinfo services. +/// +/// We only interact with it via the PathInfo GRPC interface. +/// It uses the NonCachingNARCalculationService NARCalculationService to +/// calculate NARs. +fn gen_grpc_service( +) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> { + let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); + Arc::new(GRPCPathInfoServiceWrapper::from(gen_pathinfo_service( + blob_service, + directory_service, + ))) +} + +/// Trying to get a non-existent PathInfo should return a not found error. +#[tokio::test] +async fn not_found() { + let service = gen_grpc_service(); + + let resp = service + .get(Request::new(GetPathInfoRequest { + by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.clone())), + })) + .await; + + let resp = resp.expect_err("must fail"); + assert_eq!(resp.code(), tonic::Code::NotFound); +} + +/// Put a PathInfo into the store, get it back. 
+#[tokio::test] +async fn put_get() { + let service = gen_grpc_service(); + + let path_info = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode { + name: "00000000000000000000000000000000-foo".into(), + target: "doesntmatter".into(), + })), + }), + ..Default::default() + }; + + let resp = service.put(Request::new(path_info.clone())).await; + + assert!(resp.is_ok()); + assert_eq!(resp.expect("must succeed").into_inner(), path_info); + + let resp = service + .get(Request::new(GetPathInfoRequest { + by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.clone())), + })) + .await; + + assert!(resp.is_ok()); + assert_eq!(resp.expect("must succeed").into_inner(), path_info); +} diff --git a/tvix/store/src/proto/tests/mod.rs b/tvix/store/src/proto/tests/mod.rs new file mode 100644 index 000000000000..bff885624380 --- /dev/null +++ b/tvix/store/src/proto/tests/mod.rs @@ -0,0 +1,2 @@ +mod grpc_pathinfoservice; +mod pathinfo; diff --git a/tvix/store/src/proto/tests/pathinfo.rs b/tvix/store/src/proto/tests/pathinfo.rs new file mode 100644 index 000000000000..43a94e0d46ae --- /dev/null +++ b/tvix/store/src/proto/tests/pathinfo.rs @@ -0,0 +1,228 @@ +use crate::proto::{PathInfo, ValidatePathInfoError}; +use crate::tests::fixtures::*; +use bytes::Bytes; +use nix_compat::store_path::{self, StorePath}; +use std::str::FromStr; +use test_case::test_case; +use tvix_castore::proto as castorepb; + +#[test_case( + None, + Err(ValidatePathInfoError::NoNodePresent()) ; + "No node" +)] +#[test_case( + Some(castorepb::Node { node: None }), + Err(ValidatePathInfoError::NoNodePresent()); + "No node 2" +)] +fn validate_no_node( + t_node: Option<castorepb::Node>, + t_result: Result<StorePath, ValidatePathInfoError>, +) { + // construct the PathInfo object + let p = PathInfo { + node: t_node, + ..Default::default() + }; + assert_eq!(t_result, p.validate()); +} + +#[test_case( + castorepb::DirectoryNode { + name: DUMMY_NAME.into(), + digest: 
DUMMY_DIGEST.clone().into(), + size: 0, + }, + Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed")); + "ok" +)] +#[test_case( + castorepb::DirectoryNode { + name: DUMMY_NAME.into(), + digest: Bytes::new(), + size: 0, + }, + Err(ValidatePathInfoError::InvalidNodeDigestLen(0)); + "invalid digest length" +)] +#[test_case( + castorepb::DirectoryNode { + name: "invalid".into(), + digest: DUMMY_DIGEST.clone().into(), + size: 0, + }, + Err(ValidatePathInfoError::InvalidNodeName( + "invalid".into(), + store_path::Error::InvalidLength() + )); + "invalid node name" +)] +fn validate_directory( + t_directory_node: castorepb::DirectoryNode, + t_result: Result<StorePath, ValidatePathInfoError>, +) { + // construct the PathInfo object + let p = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Directory(t_directory_node)), + }), + ..Default::default() + }; + assert_eq!(t_result, p.validate()); +} + +#[test_case( + castorepb::FileNode { + name: DUMMY_NAME.into(), + digest: DUMMY_DIGEST.clone().into(), + size: 0, + executable: false, + }, + Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed")); + "ok" +)] +#[test_case( + castorepb::FileNode { + name: DUMMY_NAME.into(), + digest: Bytes::new(), + ..Default::default() + }, + Err(ValidatePathInfoError::InvalidNodeDigestLen(0)); + "invalid digest length" +)] +#[test_case( + castorepb::FileNode { + name: "invalid".into(), + digest: DUMMY_DIGEST.clone().into(), + ..Default::default() + }, + Err(ValidatePathInfoError::InvalidNodeName( + "invalid".into(), + store_path::Error::InvalidLength() + )); + "invalid node name" +)] +fn validate_file( + t_file_node: castorepb::FileNode, + t_result: Result<StorePath, ValidatePathInfoError>, +) { + // construct the PathInfo object + let p = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::File(t_file_node)), + }), + ..Default::default() + }; + assert_eq!(t_result, p.validate()); +} + +#[test_case( + castorepb::SymlinkNode { + name: 
DUMMY_NAME.into(), + ..Default::default() + }, + Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed")); + "ok" +)] +#[test_case( + castorepb::SymlinkNode { + name: "invalid".into(), + ..Default::default() + }, + Err(ValidatePathInfoError::InvalidNodeName( + "invalid".into(), + store_path::Error::InvalidLength() + )); + "invalid node name" +)] +fn validate_symlink( + t_symlink_node: castorepb::SymlinkNode, + t_result: Result<StorePath, ValidatePathInfoError>, +) { + // construct the PathInfo object + let p = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Symlink(t_symlink_node)), + }), + ..Default::default() + }; + assert_eq!(t_result, p.validate()); +} + +/// Ensure parsing a correct PathInfo without narinfo populated succeeds. +#[test] +fn validate_references_without_narinfo_ok() { + assert!(PATH_INFO_WITHOUT_NARINFO.validate().is_ok()); +} + +/// Ensure parsing a correct PathInfo with narinfo populated succeeds. +#[test] +fn validate_references_with_narinfo_ok() { + assert!(PATH_INFO_WITH_NARINFO.validate().is_ok()); +} + +/// Create a PathInfo with a wrong count of narinfo.reference_names, +/// and ensure validation fails. +#[test] +fn validate_inconsistent_num_refs_fail() { + let mut path_info = PATH_INFO_WITH_NARINFO.clone(); + path_info.narinfo.as_mut().unwrap().reference_names = vec![]; + + match path_info.validate().expect_err("must_fail") { + ValidatePathInfoError::InconsistentNumberOfReferences(1, 0) => {} + e => panic!("unexpected error: {:?}", e), + }; +} + +/// Create a PathInfo with a wrong digest length in references. 
+#[test] +fn validate_invalid_reference_digest_len() { + let mut path_info = PATH_INFO_WITHOUT_NARINFO.clone(); + path_info.references.push(vec![0xff, 0xff].into()); + + match path_info.validate().expect_err("must fail") { + ValidatePathInfoError::InvalidReferenceDigestLen( + 1, // position + 2, // unexpected digest len + ) => {} + e => panic!("unexpected error: {:?}", e), + }; +} + +/// Create a PathInfo with a narinfo.reference_names[0] that is not a valid store path name. +#[test] +fn validate_invalid_narinfo_reference_name() { + let mut path_info = PATH_INFO_WITH_NARINFO.clone(); + + // This is invalid, as the store prefix is not part of reference_names. + path_info.narinfo.as_mut().unwrap().reference_names[0] = + "/nix/store/00000000000000000000000000000000-dummy".to_string(); + + match path_info.validate().expect_err("must fail") { + ValidatePathInfoError::InvalidNarinfoReferenceName(0, reference_name) => { + assert_eq!( + "/nix/store/00000000000000000000000000000000-dummy", + reference_name + ); + } + e => panic!("unexpected error: {:?}", e), + } +} + +/// Create a PathInfo with a narinfo.reference_names[0] that doesn't match references[0]. 
+#[test] +fn validate_inconsistent_narinfo_reference_name_digest() { + let mut path_info = PATH_INFO_WITH_NARINFO.clone(); + + // mutate the first reference, they were all zeroes before + path_info.references[0] = vec![0xff; store_path::DIGEST_SIZE].into(); + + match path_info.validate().expect_err("must fail") { + ValidatePathInfoError::InconsistentNarinfoReferenceNameDigest(0, e_expected, e_actual) => { + assert_eq!(path_info.references[0][..], e_expected); + assert_eq!(DUMMY_OUTPUT_HASH[..], e_actual); + } + e => panic!("unexpected error: {:?}", e), + } +} diff --git a/tvix/store/src/tests/fixtures.rs b/tvix/store/src/tests/fixtures.rs new file mode 100644 index 000000000000..5ff37a508433 --- /dev/null +++ b/tvix/store/src/tests/fixtures.rs @@ -0,0 +1,126 @@ +use lazy_static::lazy_static; +pub use tvix_castore::fixtures::*; +use tvix_castore::proto as castorepb; + +use crate::proto::{NarInfo, PathInfo}; + +pub const DUMMY_NAME: &str = "00000000000000000000000000000000-dummy"; + +lazy_static! 
{ + // output hash + pub static ref DUMMY_OUTPUT_HASH: bytes::Bytes = vec![ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00 + ].into(); + + /// The NAR representation of a symlink pointing to `/nix/store/somewhereelse` + pub static ref NAR_CONTENTS_SYMLINK: Vec<u8> = vec![ + 13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0, + 0, 0, // "nix-archive-1" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b's', b'y', b'm', b'l', b'i', b'n', b'k', 0, // "symlink" + 6, 0, 0, 0, 0, 0, 0, 0, b't', b'a', b'r', b'g', b'e', b't', 0, 0, // target + 24, 0, 0, 0, 0, 0, 0, 0, b'/', b'n', b'i', b'x', b'/', b's', b't', b'o', b'r', b'e', b'/', b's', b'o', + b'm', b'e', b'w', b'h', b'e', b'r', b'e', b'e', b'l', b's', + b'e', // "/nix/store/somewhereelse" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")" + ]; + + /// The NAR representation of a regular file with the contents "Hello World!" + pub static ref NAR_CONTENTS_HELLOWORLD: Vec<u8> = vec![ + 13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0, + 0, 0, // "nix-archive-1" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular" + 8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents" + 12, 0, 0, 0, 0, 0, 0, 0, b'H', b'e', b'l', b'l', b'o', b' ', b'W', b'o', b'r', b'l', b'd', b'!', 0, 0, + 0, 0, // "Hello World!" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")" + ]; + + /// The NAR representation of a more complicated directory structure. 
+ pub static ref NAR_CONTENTS_COMPLICATED: Vec<u8> = vec![ + 13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0, + 0, 0, // "nix-archive-1" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 9, 0, 0, 0, 0, 0, 0, 0, b'd', b'i', b'r', b'e', b'c', b't', b'o', b'r', b'y', 0, 0, 0, 0, 0, 0, 0, // "directory" + 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name" + 5, 0, 0, 0, 0, 0, 0, 0, b'.', b'k', b'e', b'e', b'p', 0, 0, 0, // ".keep" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular" + 8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents" + 0, 0, 0, 0, 0, 0, 0, 0, // "" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name" + 2, 0, 0, 0, 0, 0, 0, 0, b'a', b'a', 0, 0, 0, 0, 0, 0, // "aa" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b's', b'y', b'm', b'l', b'i', b'n', b'k', 0, // "symlink" + 6, 0, 0, 0, 0, 0, 0, 0, b't', b'a', b'r', b'g', b'e', b't', 0, 0, // target + 24, 0, 0, 0, 0, 0, 0, 0, b'/', b'n', b'i', b'x', b'/', b's', b't', b'o', b'r', b'e', b'/', b's', b'o', + 
b'm', b'e', b'w', b'h', b'e', b'r', b'e', b'e', b'l', b's', + b'e', // "/nix/store/somewhereelse" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name" + 4, 0, 0, 0, 0, 0, 0, 0, b'k', b'e', b'e', b'p', 0, 0, 0, 0, // "keep" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 9, 0, 0, 0, 0, 0, 0, 0, b'd', b'i', b'r', b'e', b'c', b't', b'o', b'r', b'y', 0, 0, 0, 0, 0, 0, 0, // "directory" + 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name" + 5, 0, 0, 0, 0, 0, 0, 0, 46, 107, 101, 101, 112, 0, 0, 0, // ".keep" + 4, 0, 0, 0, 0, 0, 0, 0, 110, 111, 100, 101, 0, 0, 0, 0, // "node" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular" + 8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents" + 0, 0, 0, 0, 0, 0, 0, 0, // "" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + ]; + + /// A PathInfo message without .narinfo populated. 
+ pub static ref PATH_INFO_WITHOUT_NARINFO : PathInfo = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Directory(castorepb::DirectoryNode { + name: DUMMY_NAME.into(), + digest: DUMMY_DIGEST.clone().into(), + size: 0, + })), + }), + references: vec![DUMMY_OUTPUT_HASH.clone().into()], + narinfo: None, + }; + + /// A PathInfo message with .narinfo populated. + /// The references in `narinfo.reference_names` align with what's in + /// `references`. + pub static ref PATH_INFO_WITH_NARINFO : PathInfo = PathInfo { + narinfo: Some(NarInfo { + nar_size: 0, + nar_sha256: DUMMY_DIGEST.clone().into(), + signatures: vec![], + reference_names: vec![DUMMY_NAME.to_string()], + }), + ..PATH_INFO_WITHOUT_NARINFO.clone() + }; +} diff --git a/tvix/store/src/tests/mod.rs b/tvix/store/src/tests/mod.rs new file mode 100644 index 000000000000..daea048deddf --- /dev/null +++ b/tvix/store/src/tests/mod.rs @@ -0,0 +1,3 @@ +pub mod fixtures; +mod nar_renderer; +pub mod utils; diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs new file mode 100644 index 000000000000..485d7d115ff5 --- /dev/null +++ b/tvix/store/src/tests/nar_renderer.rs @@ -0,0 +1,233 @@ +use crate::nar::calculate_size_and_sha256; +use crate::nar::write_nar; +use crate::tests::fixtures::*; +use crate::tests::utils::*; +use sha2::{Digest, Sha256}; +use std::io; +use tvix_castore::proto::DirectoryNode; +use tvix_castore::proto::FileNode; +use tvix_castore::proto::{self as castorepb, SymlinkNode}; + +#[tokio::test] +async fn single_symlink() { + let buf: Vec<u8> = vec![]; + + let buf = write_nar( + buf, + &castorepb::node::Node::Symlink(SymlinkNode { + name: "doesntmatter".into(), + target: "/nix/store/somewhereelse".into(), + }), + // don't put anything in the stores, as we don't actually do any requests. 
+ gen_blob_service(), + gen_directory_service(), + ) + .await + .expect("must succeed"); + + assert_eq!(buf, NAR_CONTENTS_SYMLINK.to_vec()); +} + +/// Make sure the NARRenderer fails if a referred blob doesn't exist. +#[tokio::test] +async fn single_file_missing_blob() { + let buf: Vec<u8> = vec![]; + + let e = write_nar( + buf, + &castorepb::node::Node::File(FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: HELLOWORLD_BLOB_CONTENTS.len() as u32, + executable: false, + }), + // the blobservice is empty intentionally, to provoke the error. + gen_blob_service(), + gen_directory_service(), + ) + .await + .expect_err("must fail"); + + match e { + crate::nar::RenderError::NARWriterError(e) => { + assert_eq!(io::ErrorKind::NotFound, e.kind()); + } + _ => panic!("unexpected error: {:?}", e), + } +} + +/// Make sure the NAR Renderer fails if the returned blob meta has another size +/// than specified in the proto node. +#[tokio::test] +async fn single_file_wrong_blob_size() { + let blob_service = gen_blob_service(); + + // insert blob into the store + let mut writer = blob_service.open_write().await; + tokio::io::copy( + &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()), + &mut writer, + ) + .await + .unwrap(); + assert_eq!( + HELLOWORLD_BLOB_DIGEST.clone(), + writer.close().await.unwrap() + ); + + let bs = blob_service.clone(); + // Test with a root FileNode of a too big size + { + let buf: Vec<u8> = vec![]; + + let e = write_nar( + buf, + &castorepb::node::Node::File(FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: 42, // <- note the wrong size here! 
+ executable: false, + }), + bs, + gen_directory_service(), + ) + .await + .expect_err("must fail"); + + match e { + crate::nar::RenderError::NARWriterError(e) => { + assert_eq!(io::ErrorKind::UnexpectedEof, e.kind()); + } + _ => panic!("unexpected error: {:?}", e), + } + } + + let bs = blob_service.clone(); + // Test with a root FileNode of a too small size + { + let buf: Vec<u8> = vec![]; + + let e = write_nar( + buf, + &castorepb::node::Node::File(FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: 2, // <- note the wrong size here! + executable: false, + }), + bs, + gen_directory_service(), + ) + .await + .expect_err("must fail"); + + match e { + crate::nar::RenderError::NARWriterError(e) => { + assert_eq!(io::ErrorKind::InvalidInput, e.kind()); + } + _ => panic!("unexpected error: {:?}", e), + } + } +} + +#[tokio::test] +async fn single_file() { + let blob_service = gen_blob_service(); + + // insert blob into the store + let mut writer = blob_service.open_write().await; + tokio::io::copy( + &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.clone()), + &mut writer, + ) + .await + .unwrap(); + + assert_eq!( + HELLOWORLD_BLOB_DIGEST.clone(), + writer.close().await.unwrap() + ); + + let buf: Vec<u8> = vec![]; + + let buf = write_nar( + buf, + &castorepb::node::Node::File(FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: HELLOWORLD_BLOB_CONTENTS.len() as u32, + executable: false, + }), + blob_service, + gen_directory_service(), + ) + .await + .expect("must succeed"); + + assert_eq!(buf, NAR_CONTENTS_HELLOWORLD.to_vec()); +} + +#[tokio::test] +async fn test_complicated() { + let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); + + // put all data into the stores. 
+ // insert blob into the store + let mut writer = blob_service.open_write().await; + tokio::io::copy( + &mut io::Cursor::new(EMPTY_BLOB_CONTENTS.clone()), + &mut writer, + ) + .await + .unwrap(); + assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().await.unwrap()); + + directory_service + .put(DIRECTORY_WITH_KEEP.clone()) + .await + .unwrap(); + directory_service + .put(DIRECTORY_COMPLICATED.clone()) + .await + .unwrap(); + + let buf: Vec<u8> = vec![]; + + let bs = blob_service.clone(); + let ds = directory_service.clone(); + + let buf = write_nar( + buf, + &castorepb::node::Node::Directory(DirectoryNode { + name: "doesntmatter".into(), + digest: DIRECTORY_COMPLICATED.digest().into(), + size: DIRECTORY_COMPLICATED.size(), + }), + bs, + ds, + ) + .await + .expect("must succeed"); + + assert_eq!(buf, NAR_CONTENTS_COMPLICATED.to_vec()); + + // ensure calculate_size_and_sha256 returns the correct NAR size and sha256 digest. + let bs = blob_service.clone(); + let ds = directory_service.clone(); + let (nar_size, nar_digest) = calculate_size_and_sha256( + &castorepb::node::Node::Directory(DirectoryNode { + name: "doesntmatter".into(), + digest: DIRECTORY_COMPLICATED.digest().into(), + size: DIRECTORY_COMPLICATED.size(), + }), + bs, + ds, + ) + .await + .expect("must succeed"); + + assert_eq!(NAR_CONTENTS_COMPLICATED.len() as u64, nar_size); + let d = Sha256::digest(NAR_CONTENTS_COMPLICATED.clone()); + assert_eq!(d.as_slice(), nar_digest); +} diff --git a/tvix/store/src/tests/utils.rs b/tvix/store/src/tests/utils.rs new file mode 100644 index 000000000000..961be6e7ac07 --- /dev/null +++ b/tvix/store/src/tests/utils.rs @@ -0,0 +1,12 @@ +use crate::pathinfoservice::{MemoryPathInfoService, PathInfoService}; +use std::sync::Arc; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; + +pub use tvix_castore::utils::*; + +pub fn gen_pathinfo_service( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +) -> Arc<dyn 
PathInfoService> { + Arc::new(MemoryPathInfoService::new(blob_service, directory_service)) +} |