diff options
author | Florian Klink <flokli@flokli.de> | 2024-03-01T16·00+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2024-03-11T22·42+0000 |
commit | 1c2db676a08847c9b73256d976f2b1eccf17310b (patch) | |
tree | 1548920f0c2c777d32b8a93994875ed4fe294922 /tvix | |
parent | d327bf775d376462dbe8cc2fe601b782b3ff02d3 (diff) |
feat(tvix/castore/blobsvc): add object storage implementation r/7684
This uses the `object_store` crate to expose a tvix-castore BlobService backed by object storage. It's using FastCDC to chunk blobs into smaller chunks when writing to it. These are exposed at the .chunks() method. Change-Id: I2858c403d4d6490cdca73ebef03c26290b2b3c8e Reviewed-on: https://cl.tvl.fyi/c/depot/+/11076 Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI Reviewed-by: Brian Olsen <me@griff.name>
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/Cargo.lock | 171 | ||||
-rw-r--r-- | tvix/Cargo.nix | 545 | ||||
-rw-r--r-- | tvix/castore/Cargo.toml | 3 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/mod.rs | 3 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/object_store.rs | 557 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/simplefs.rs | 1 |
6 files changed, 1271 insertions, 9 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 750bb8364ef2..9f59ebe54f73 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -27,6 +27,21 @@ dependencies = [ ] [[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] name = "anes" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -391,6 +406,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] +name = "chrono" +version = "0.4.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "serde", + "windows-targets 0.52.0", +] + +[[package]] name = "ciborium" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -731,6 +759,12 @@ dependencies = [ ] [[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + +[[package]] name = "document-features" version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -811,6 +845,17 @@ dependencies = [ ] [[package]] +name = "fastcdc" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a71061d097bfa9a5a4d2efdec57990d9a88745020b365191d37e48541a1628f2" +dependencies = [ + "async-stream", + "tokio", + "tokio-stream", +] + +[[package]] name = "fastrand" version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1158,6 +1203,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] name = "hyper" version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1208,6 +1259,29 @@ dependencies = [ ] [[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] name = "idna" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1711,6 +1785,37 @@ dependencies = [ ] [[package]] +name = "object_store" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8718f8b65fdf67a45108d1548347d4af7d71fb81ce727bbf9e3b2535e079db3" +dependencies = [ + "async-trait", + "base64", + "bytes", + "chrono", + "futures", + "humantime", + "hyper", + "itertools 0.12.0", + "md-5", + "parking_lot 0.12.1", + "percent-encoding", + "quick-xml", + "rand", + "reqwest", + "ring", + "rustls-pemfile 2.1.0", + "serde", + "serde_json", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + +[[package]] name = "once_cell" version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2130,6 +2235,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] +name = "quick-xml" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +dependencies = [ + "memchr", + "serde", +] + +[[package]] name = "quote" version = "0.6.13" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2323,7 +2438,7 @@ dependencies = [ "pin-project-lite", "rustls", "rustls-native-certs", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -2458,7 +2573,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "schannel", "security-framework", ] @@ -2473,6 +2588,22 @@ dependencies = [ ] [[package]] +name = "rustls-pemfile" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c333bb734fcdedcea57de1602543590f545f127dc8b533324318fd492c5c70b" +dependencies = [ + "base64", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ede67b28608b4c60685c7d54122d4400d90f62b40caee7700e700380a390fa8" + +[[package]] name = "rustls-webpki" version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2734,6 +2865,28 @@ dependencies = [ ] [[package]] +name = "snafu" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" +dependencies = [ + "heck", + "proc-macro2 1.0.76", + "quote 1.0.35", + "syn 1.0.109", +] + +[[package]] name = "socket2" version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3215,7 +3368,7 @@ dependencies = [ "prost 0.12.3", "rustls", "rustls-native-certs", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "tokio", "tokio-rustls", "tokio-stream", @@ -3411,11 +3564,13 @@ dependencies = [ "bytes", "data-encoding", "digest", + "fastcdc", "fuse-backend-rs", "futures", "hex-literal", "lazy_static", "libc", + "object_store", "parking_lot 0.12.1", "pin-project-lite", "prost 0.12.3", @@ -3441,6 +3596,7 @@ dependencies = [ "vm-memory", "vmm-sys-util", "walkdir", + "zstd", ] [[package]] @@ -3962,6 +4118,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + +[[package]] name = "windows-sys" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 06a4b8a99c73..6fc42a71b59f 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -218,6 +218,32 @@ rec { }; resolvedDefaultFeatures = [ "default" "perf-literal" "std" ]; }; + "android-tzdata" = rec { + crateName = "android-tzdata"; + version = "0.1.1"; + edition = "2018"; + sha256 = "1w7ynjxrfs97xg3qlcdns4kgfpwcdv824g611fq32cag4cdr96g9"; + authors = [ + "RumovZ" + ]; + + }; + "android_system_properties" = rec { + crateName = "android_system_properties"; + version = "0.1.5"; + edition = "2018"; + sha256 = "04b3wrz12837j7mdczqd95b732gw5q7q66cv4yn4646lvccp57l1"; + authors = [ + "Nicolas Silva <nical@fastmail.com>" + ]; + dependencies = [ + { + name = "libc"; + packageId = "libc"; + } + ]; + + }; "anes" = rec { crateName = "anes"; version = "0.1.6"; @@ -1242,6 +1268,68 @@ rec { "rustc-dep-of-std" = [ "core" "compiler_builtins" ]; }; }; + "chrono" = rec { + crateName = "chrono"; + version = "0.4.34"; + edition = "2021"; + sha256 = "12zk0ja924f55va2fs0qj34xaygq46fy92blmc7qkmcj9dj1bh2v"; + dependencies = [ + { + name = "android-tzdata"; + packageId = "android-tzdata"; + optional = true; + target = { target, features }: ("android" == target."os" or null); + } + { + name = "iana-time-zone"; + packageId = "iana-time-zone"; + optional = true; + target = { target, features }: (target."unix" or false); + features = [ "fallback" ]; + } + { + name = "num-traits"; + packageId = "num-traits"; + usesDefaultFeatures = false; + } + { + name = "serde"; + packageId = "serde"; + optional = true; + usesDefaultFeatures = false; + } + { + name = "windows-targets"; + packageId = "windows-targets 0.52.0"; + optional = true; + target = { target, features }: (target."windows" or false); + } + ]; + features = { + "android-tzdata" = [ "dep:android-tzdata" ]; + "arbitrary" = [ "dep:arbitrary" ]; + "clock" = [ "winapi" "iana-time-zone" "android-tzdata" "now" ]; + "default" = [ "clock" "std" "oldtime" "wasmbind" ]; + "iana-time-zone" = [ "dep:iana-time-zone" ]; + "js-sys" = [ "dep:js-sys" ]; + "now" = [ "std" ]; + "pure-rust-locales" = [ "dep:pure-rust-locales" ]; + "rkyv" = [ "dep:rkyv" "rkyv/size_32" ]; + "rkyv-16" = [ "dep:rkyv" "rkyv?/size_16" ]; + "rkyv-32" = [ "dep:rkyv" "rkyv?/size_32" ]; + "rkyv-64" = [ "dep:rkyv" "rkyv?/size_64" ]; + "rkyv-validation" = [ "rkyv?/validation" ]; + "rustc-serialize" = [ "dep:rustc-serialize" ]; + "serde" = [ "dep:serde" ]; + "std" = [ "alloc" ]; + "unstable-locales" = [ "pure-rust-locales" ]; + "wasm-bindgen" = [ "dep:wasm-bindgen" ]; + "wasmbind" = [ "wasm-bindgen" "js-sys" ]; + "winapi" = [ "windows-targets" ]; + "windows-targets" = [ "dep:windows-targets" ]; + }; + resolvedDefaultFeatures = [ "alloc" "android-tzdata" "clock" "iana-time-zone" "now" "serde" "std" "winapi" "windows-targets" ]; + }; "ciborium" = rec { crateName = "ciborium"; version = "0.2.1"; @@ -2161,6 +2249,17 @@ rec { ]; }; + "doc-comment" = rec { + crateName = "doc-comment"; + version = "0.3.3"; + edition = "2015"; + sha256 = "043sprsf3wl926zmck1bm7gw0jq50mb76lkpk49vasfr6ax1p97y"; + libName = "doc_comment"; + authors = [ + "Guillaume Gomez <guillaume1.gomez@gmail.com>" + ]; + features = { }; + }; "document-features" = rec { crateName = "document-features"; version = "0.2.8"; @@ -2405,6 +2504,47 @@ rec { ]; features = { }; }; + "fastcdc" = rec { + crateName = "fastcdc"; + version = "3.1.0"; + edition = "2018"; + sha256 = "1wi82qd58j3ysf8m2dhb092qga6rj1wwbppgsajabadzjz862457"; + authors = [ + "Nathan Fiedler <nathanfiedler@fastmail.fm>" + ]; + dependencies = [ + { + name = "async-stream"; + packageId = "async-stream"; + optional = true; + } + { + name = "tokio"; + packageId = "tokio"; + optional = true; + features = [ "io-util" ]; + } + { + name = "tokio-stream"; + packageId = "tokio-stream"; + optional = true; + } + ]; + devDependencies = [ + { + name = "tokio"; + packageId = "tokio"; + features = [ "fs" "io-util" "rt" "rt-multi-thread" "macros" ]; + } + ]; + features = { + "async-stream" = [ "dep:async-stream" ]; + "futures" = [ "dep:futures" ]; + "tokio" = [ "dep:tokio" "tokio-stream" "async-stream" ]; + "tokio-stream" = [ "dep:tokio-stream" ]; + }; + resolvedDefaultFeatures = [ "async-stream" "default" "tokio" "tokio-stream" ]; + }; "fastrand" = rec { crateName = "fastrand"; version = "2.0.1"; @@ -3411,6 +3551,16 @@ rec { ]; }; + "humantime" = rec { + crateName = "humantime"; + version = "2.1.0"; + edition = "2018"; + sha256 = "1r55pfkkf5v0ji1x6izrjwdq9v6sc7bv99xj6srywcar37xmnfls"; + authors = [ + "Paul Colomiets <paul@colomiets.name>" + ]; + + }; "hyper" = rec { crateName = "hyper"; version = "0.14.28"; @@ -3628,6 +3778,67 @@ rec { ]; }; + "iana-time-zone" = rec { + crateName = "iana-time-zone"; + version = "0.1.60"; + edition = "2018"; + sha256 = "0hdid5xz3jznm04lysjm3vi93h3c523w0hcc3xba47jl3ddbpzz7"; + authors = [ + "Andrew Straw <strawman@astraw.com>" + "René Kijewski <rene.kijewski@fu-berlin.de>" + "Ryan Lopopolo <rjl@hyperbo.la>" + ]; + dependencies = [ + { + name = "android_system_properties"; + packageId = "android_system_properties"; + target = { target, features }: ("android" == target."os" or null); + } + { + name = "core-foundation-sys"; + packageId = "core-foundation-sys"; + target = { target, features }: (("macos" == target."os" or null) || ("ios" == target."os" or null)); + } + { + name = "iana-time-zone-haiku"; + packageId = "iana-time-zone-haiku"; + target = { target, features }: ("haiku" == target."os" or null); + } + { + name = "js-sys"; + packageId = "js-sys"; + target = { target, features }: ("wasm32" == target."arch" or null); + } + { + name = "wasm-bindgen"; + packageId = "wasm-bindgen"; + target = { target, features }: ("wasm32" == target."arch" or null); + } + { + name = "windows-core"; + packageId = "windows-core"; + target = { target, features }: ("windows" == target."os" or null); + } + ]; + features = { }; + resolvedDefaultFeatures = [ "fallback" ]; + }; + "iana-time-zone-haiku" = rec { + crateName = "iana-time-zone-haiku"; + version = "0.1.2"; + edition = "2018"; + sha256 = "17r6jmj31chn7xs9698r122mapq85mfnv98bb4pg6spm0si2f67k"; + authors = [ + "René Kijewski <crates.io@k6i.de>" + ]; + buildDependencies = [ + { + name = "cc"; + packageId = "cc"; + } + ]; + + }; "idna" = rec { crateName = "idna"; version = "0.5.0"; @@ -5140,6 +5351,165 @@ rec { }; resolvedDefaultFeatures = [ "archive" "coff" "elf" "macho" "pe" "read_core" "unaligned" ]; }; + "object_store" = rec { + crateName = "object_store"; + version = "0.9.1"; + edition = "2021"; + sha256 = "1cwx0xg57cp3z6xjgrqwp0gxgxsagls4h5cd212pmxpxcn5qywdq"; + dependencies = [ + { + name = "async-trait"; + packageId = "async-trait"; + } + { + name = "base64"; + packageId = "base64"; + optional = true; + usesDefaultFeatures = false; + features = [ "std" ]; + } + { + name = "bytes"; + packageId = "bytes"; + } + { + name = "chrono"; + packageId = "chrono"; + usesDefaultFeatures = false; + features = [ "clock" ]; + } + { + name = "futures"; + packageId = "futures"; + } + { + name = "humantime"; + packageId = "humantime"; + } + { + name = "hyper"; + packageId = "hyper"; + optional = true; + usesDefaultFeatures = false; + } + { + name = "itertools"; + packageId = "itertools 0.12.0"; + } + { + name = "md-5"; + packageId = "md-5"; + optional = true; + usesDefaultFeatures = false; + } + { + name = "parking_lot"; + packageId = "parking_lot 0.12.1"; + } + { + name = "percent-encoding"; + packageId = "percent-encoding"; + } + { + name = "quick-xml"; + packageId = "quick-xml"; + optional = true; + features = [ "serialize" "overlapped-lists" ]; + } + { + name = "rand"; + packageId = "rand"; + optional = true; + usesDefaultFeatures = false; + features = [ "std" "std_rng" ]; + } + { + name = "reqwest"; + packageId = "reqwest"; + optional = true; + usesDefaultFeatures = false; + features = [ "rustls-tls-native-roots" ]; + } + { + name = "ring"; + packageId = "ring"; + optional = true; + usesDefaultFeatures = false; + features = [ "std" ]; + } + { + name = "rustls-pemfile"; + packageId = "rustls-pemfile 2.1.0"; + optional = true; + usesDefaultFeatures = false; + features = [ "std" ]; + } + { + name = "serde"; + packageId = "serde"; + optional = true; + usesDefaultFeatures = false; + features = [ "derive" ]; + } + { + name = "serde_json"; + packageId = "serde_json"; + optional = true; + usesDefaultFeatures = false; + } + { + name = "snafu"; + packageId = "snafu"; + } + { + name = "tokio"; + packageId = "tokio"; + features = [ "sync" "macros" "rt" "time" "io-util" ]; + } + { + name = "tracing"; + packageId = "tracing"; + } + { + name = "url"; + packageId = "url"; + } + { + name = "walkdir"; + packageId = "walkdir"; + } + ]; + devDependencies = [ + { + name = "hyper"; + packageId = "hyper"; + features = [ "server" ]; + } + { + name = "rand"; + packageId = "rand"; + } + ]; + features = { + "aws" = [ "cloud" "md-5" ]; + "azure" = [ "cloud" ]; + "base64" = [ "dep:base64" ]; + "cloud" = [ "serde" "serde_json" "quick-xml" "hyper" "reqwest" "reqwest/json" "reqwest/stream" "chrono/serde" "base64" "rand" "ring" ]; + "gcp" = [ "cloud" "rustls-pemfile" ]; + "http" = [ "cloud" ]; + "hyper" = [ "dep:hyper" ]; + "md-5" = [ "dep:md-5" ]; + "quick-xml" = [ "dep:quick-xml" ]; + "rand" = [ "dep:rand" ]; + "reqwest" = [ "dep:reqwest" ]; + "ring" = [ "dep:ring" ]; + "rustls-pemfile" = [ "dep:rustls-pemfile" ]; + "serde" = [ "dep:serde" ]; + "serde_json" = [ "dep:serde_json" ]; + "tls-webpki-roots" = [ "reqwest?/rustls-tls-webpki-roots" ]; + }; + resolvedDefaultFeatures = [ "aws" "azure" "base64" "cloud" "gcp" "http" "hyper" "md-5" "quick-xml" "rand" "reqwest" "ring" "rustls-pemfile" "serde" "serde_json" ]; + }; "once_cell" = rec { crateName = "once_cell"; version = "1.19.0"; @@ -6419,6 +6789,35 @@ rec { ]; }; + "quick-xml" = rec { + crateName = "quick-xml"; + version = "0.31.0"; + edition = "2021"; + sha256 = "0cravqanylzh5cq2v6hzlfqgxcid5nrp2snnb3pf4m0and2a610h"; + dependencies = [ + { + name = "memchr"; + packageId = "memchr"; + } + { + name = "serde"; + packageId = "serde"; + optional = true; + } + ]; + features = { + "arbitrary" = [ "dep:arbitrary" ]; + "async-tokio" = [ "tokio" ]; + "document-features" = [ "dep:document-features" ]; + "encoding" = [ "encoding_rs" ]; + "encoding_rs" = [ "dep:encoding_rs" ]; + "serde" = [ "dep:serde" ]; + "serde-types" = [ "serde/derive" ]; + "serialize" = [ "serde" ]; + "tokio" = [ "dep:tokio" ]; + }; + resolvedDefaultFeatures = [ "default" "overlapped-lists" "serde" "serialize" ]; + }; "quote 0.6.13" = rec { crateName = "quote"; version = "0.6.13"; @@ -7013,7 +7412,7 @@ rec { } { name = "rustls-pemfile"; - packageId = "rustls-pemfile"; + packageId = "rustls-pemfile 1.0.4"; optional = true; target = { target, features }: (!("wasm32" == target."arch" or null)); } @@ -7170,7 +7569,7 @@ rec { "wasm-streams" = [ "dep:wasm-streams" ]; "webpki-roots" = [ "dep:webpki-roots" ]; }; - resolvedDefaultFeatures = [ "__rustls" "__tls" "hyper-rustls" "rustls" "rustls-native-certs" "rustls-pemfile" "rustls-tls-native-roots" "stream" "tokio-rustls" "tokio-util" "wasm-streams" ]; + resolvedDefaultFeatures = [ "__rustls" "__tls" "hyper-rustls" "json" "rustls" "rustls-native-certs" "rustls-pemfile" "rustls-tls-native-roots" "serde_json" "stream" "tokio-rustls" "tokio-util" "wasm-streams" ]; }; "ring" = rec { crateName = "ring"; @@ -7230,7 +7629,7 @@ rec { "std" = [ "alloc" ]; "wasm32_unknown_unknown_js" = [ "getrandom/js" ]; }; - resolvedDefaultFeatures = [ "alloc" "default" "dev_urandom_fallback" ]; + resolvedDefaultFeatures = [ "alloc" "default" "dev_urandom_fallback" "std" ]; }; "rnix" = rec { crateName = "rnix"; @@ -7584,7 +7983,7 @@ rec { } { name = "rustls-pemfile"; - packageId = "rustls-pemfile"; + packageId = "rustls-pemfile 1.0.4"; } { name = "schannel"; @@ -7599,7 +7998,7 @@ rec { ]; }; - "rustls-pemfile" = rec { + "rustls-pemfile 1.0.4" = rec { crateName = "rustls-pemfile"; version = "1.0.4"; edition = "2018"; @@ -7612,6 +8011,41 @@ rec { ]; }; + "rustls-pemfile 2.1.0" = rec { + crateName = "rustls-pemfile"; + version = "2.1.0"; + edition = "2018"; + sha256 = "02y7qn9d93ri4hrm72yw4zqlbxch6ma045nyazmdrppw6jvkncrw"; + dependencies = [ + { + name = "base64"; + packageId = "base64"; + usesDefaultFeatures = false; + features = [ "alloc" ]; + } + { + name = "rustls-pki-types"; + packageId = "rustls-pki-types"; + rename = "pki-types"; + } + ]; + features = { + "default" = [ "std" ]; + "std" = [ "base64/std" ]; + }; + resolvedDefaultFeatures = [ "std" ]; + }; + "rustls-pki-types" = rec { + crateName = "rustls-pki-types"; + version = "1.3.1"; + edition = "2021"; + sha256 = "1a0g7453h07701vyxjj05gv903a0shi43mf7hl3cdd08hsr6gpjy"; + features = { + "default" = [ "alloc" ]; + "std" = [ "alloc" ]; + }; + resolvedDefaultFeatures = [ "alloc" "default" ]; + }; "rustls-webpki" = rec { crateName = "rustls-webpki"; version = "0.101.7"; @@ -8372,6 +8806,74 @@ rec { }; resolvedDefaultFeatures = [ "default" "std" ]; }; + "snafu" = rec { + crateName = "snafu"; + version = "0.7.5"; + edition = "2018"; + sha256 = "1mj2j2gfbf8mm1hr02zrbrqrh2zp01f61xgkx0lpln2w0ankgpp4"; + authors = [ + "Jake Goulding <jake.goulding@gmail.com>" + ]; + dependencies = [ + { + name = "doc-comment"; + packageId = "doc-comment"; + usesDefaultFeatures = false; + } + { + name = "snafu-derive"; + packageId = "snafu-derive"; + } + ]; + features = { + "backtrace" = [ "dep:backtrace" ]; + "backtraces" = [ "std" "backtrace" ]; + "backtraces-impl-backtrace-crate" = [ "backtraces" ]; + "default" = [ "std" "rust_1_46" ]; + "futures" = [ "futures-core-crate" "pin-project" ]; + "futures-core-crate" = [ "dep:futures-core-crate" ]; + "futures-crate" = [ "dep:futures-crate" ]; + "internal-dev-dependencies" = [ "futures-crate" ]; + "pin-project" = [ "dep:pin-project" ]; + "rust_1_39" = [ "snafu-derive/rust_1_39" ]; + "rust_1_46" = [ "rust_1_39" "snafu-derive/rust_1_46" ]; + "rust_1_61" = [ "rust_1_46" "snafu-derive/rust_1_61" ]; + "unstable-backtraces-impl-std" = [ "backtraces-impl-std" "snafu-derive/unstable-backtraces-impl-std" ]; + "unstable-provider-api" = [ "snafu-derive/unstable-provider-api" ]; + }; + resolvedDefaultFeatures = [ "default" "rust_1_39" "rust_1_46" "std" ]; + }; + "snafu-derive" = rec { + crateName = "snafu-derive"; + version = "0.7.5"; + edition = "2018"; + sha256 = "1gzy9rzggs090zf7hfvgp4lm1glrmg9qzh796686jnq7bxk7j04r"; + procMacro = true; + authors = [ + "Jake Goulding <jake.goulding@gmail.com>" + ]; + dependencies = [ + { + name = "heck"; + packageId = "heck"; + } + { + name = "proc-macro2"; + packageId = "proc-macro2 1.0.76"; + } + { + name = "quote"; + packageId = "quote 1.0.35"; + } + { + name = "syn"; + packageId = "syn 1.0.109"; + features = [ "full" ]; + } + ]; + features = { }; + resolvedDefaultFeatures = [ "rust_1_39" "rust_1_46" ]; + }; "socket2" = rec { crateName = "socket2"; version = "0.5.5"; @@ -9751,7 +10253,7 @@ rec { } { name = "rustls-pemfile"; - packageId = "rustls-pemfile"; + packageId = "rustls-pemfile 1.0.4"; optional = true; } { @@ -10673,6 +11175,11 @@ rec { packageId = "digest"; } { + name = "fastcdc"; + packageId = "fastcdc"; + features = [ "tokio" ]; + } + { name = "fuse-backend-rs"; packageId = "fuse-backend-rs"; optional = true; @@ -10691,6 +11198,11 @@ rec { optional = true; } { + name = "object_store"; + packageId = "object_store"; + features = [ "aws" "azure" "gcp" "http" ]; + } + { name = "parking_lot"; packageId = "parking_lot 0.12.1"; } @@ -10780,6 +11292,10 @@ rec { name = "walkdir"; packageId = "walkdir"; } + { + name = "zstd"; + packageId = "zstd"; + } ]; buildDependencies = [ { @@ -12836,6 +13352,23 @@ rec { ]; }; + "windows-core" = rec { + crateName = "windows-core"; + version = "0.52.0"; + edition = "2021"; + sha256 = "1nc3qv7sy24x0nlnb32f7alzpd6f72l4p24vl65vydbyil669ark"; + authors = [ + "Microsoft" + ]; + dependencies = [ + { + name = "windows-targets"; + packageId = "windows-targets 0.52.0"; + } + ]; + features = { }; + resolvedDefaultFeatures = [ "default" ]; + }; "windows-sys 0.48.0" = rec { crateName = "windows-sys"; version = "0.48.0"; diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index a4b74cba1118..36b2d7ad50ce 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -11,8 +11,10 @@ bstr = "1.6.0" bytes = "1.4.0" data-encoding = "2.3.3" digest = "0.10.7" +fastcdc = { version = "3.1.0", features = ["tokio"] } futures = "0.3.30" lazy_static = "1.4.0" +object_store = { version = "0.9.1", features = ["aws", "azure", "gcp", "http"] } parking_lot = "0.12.1" pin-project-lite = "0.2.13" prost = "0.12.1" @@ -26,6 +28,7 @@ tower = "0.4.13" tracing = "0.1.37" url = "2.4.0" walkdir = "2.4.0" +zstd = "0.13.0" [dependencies.fuse-backend-rs] optional = true diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs index 15d5ab459d07..478998a32c5c 100644 --- a/tvix/castore/src/blobservice/mod.rs +++ b/tvix/castore/src/blobservice/mod.rs @@ -10,6 +10,7 @@ mod from_addr; mod grpc; mod memory; mod naive_seeker; +mod object_store; mod simplefs; mod sled; @@ -21,6 +22,7 @@ pub use self::combinator::CombinedBlobService; pub use self::from_addr::from_addr; pub use self::grpc::GRPCBlobService; pub use self::memory::MemoryBlobService; +pub use self::object_store::ObjectStoreBlobService; pub use self::simplefs::SimpleFilesystemBlobService; pub use self::sled::SledBlobService; @@ -80,4 +82,5 @@ pub trait BlobReader: tokio::io::AsyncRead + tokio::io::AsyncSeek + Send + Unpin impl BlobReader for io::Cursor<&'static [u8]> {} impl BlobReader for io::Cursor<&'static [u8; 0]> {} impl BlobReader for io::Cursor<Vec<u8>> {} +impl BlobReader for io::Cursor<bytes::Bytes> {} impl BlobReader for tokio::fs::File {} diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs new file mode 100644 index 000000000000..3adcd7850e65 --- /dev/null +++ b/tvix/castore/src/blobservice/object_store.rs @@ -0,0 +1,557 @@ +use std::{ + io::{self, Cursor}, + pin::pin, + sync::Arc, + task::Poll, +}; + +use data_encoding::HEXLOWER; +use fastcdc::v2020::AsyncStreamCDC; +use futures::Future; +use object_store::{path::Path, ObjectStore}; +use pin_project_lite::pin_project; +use prost::Message; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio_stream::StreamExt; +use tonic::async_trait; +use tracing::{debug, info, instrument, trace}; +use url::Url; + +use crate::{ + proto::{stat_blob_response::ChunkMeta, StatBlobResponse}, + B3Digest, B3HashingReader, +}; + +use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; + +#[derive(Clone)] +pub struct ObjectStoreBlobService { + object_store: Arc<dyn ObjectStore>, + base_path: Path, + + /// Average chunk size for FastCDC, in bytes. + /// min value is half, max value double of that number. + avg_chunk_size: u32, +} + +/// Uses any object storage supported by the [object_store] crate to provide a +/// tvix-castore [BlobService]. +/// +/// # Data format +/// Data is organized in "blobs" and "chunks". +/// Blobs don't hold the actual data, but instead contain a list of more +/// granular chunks that assemble to the contents requested. +/// This allows clients to seek, and not download chunks they already have +/// locally, as it's referred to from other files. +/// Check `rpc_blobstore` and more general BlobStore docs on that. +/// +/// ## Blobs +/// Stored at `${base_path}/blobs/b3/$digest_key`. They contains the serialized +/// StatBlobResponse for the blob with the digest. +/// +/// ## Chunks +/// Chunks are stored at `${base_path}/chunks/b3/$digest_key`. They contain +/// the literal contents of the chunk, but are zstd-compressed. +/// +/// ## Digest key sharding +/// The blake3 digest encoded in lower hex, and sharded after the second +/// character. +/// The blob for "Hello World" is stored at +/// `${base_path}/blobs/b3/41/41f8394111eb713a22165c46c90ab8f0fd9399c92028fd6d288944b23ff5bf76`. +/// +/// This reduces the number of files in the same directory, which would be a +/// problem at least when using [object_store::local::LocalFileSystem]. +/// +/// # Future changes +/// There's no guarantees about this being a final format yet. +/// Once object_store gets support for additional metadata / content-types, +/// we can eliminate some requests (small blobs only consisting of a single +/// chunk can be stored as-is, without the blob index file). +/// It also allows signalling any compression of chunks in the content-type. +/// Migration *should* be possible by simply adding the right content-types to +/// all keys stored so far, but no promises ;-) +impl ObjectStoreBlobService { + /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by + /// [object_store]. + /// Any path suffix becomes the base path of the object store. + /// additional options, the same as in [object_store::parse_url_opts] can + /// be passed. + pub fn parse_url_opts<I, K, V>(url: &Url, options: I) -> Result<Self, object_store::Error> + where + I: IntoIterator<Item = (K, V)>, + K: AsRef<str>, + V: Into<String>, + { + let (object_store, path) = object_store::parse_url_opts(url, options)?; + + Ok(Self { + object_store: Arc::new(object_store), + base_path: path, + avg_chunk_size: 256 * 1024, + }) + } + + /// Like [Self::parse_url_opts], except without the options. + pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> { + Self::parse_url_opts(url, Vec::<(String, String)>::new()) + } +} + +fn derive_blob_path(base_path: &Path, digest: &B3Digest) -> Path { + base_path + .child("blobs") + .child("b3") + .child(HEXLOWER.encode(&digest.as_slice()[..2])) + .child(HEXLOWER.encode(digest.as_slice())) +} + +fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path { + base_path + .child("chunks") + .child("b3") + .child(HEXLOWER.encode(&digest.as_slice()[..2])) + .child(HEXLOWER.encode(digest.as_slice())) +} + +#[async_trait] +impl BlobService for ObjectStoreBlobService { + #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> io::Result<bool> { + // TODO: clarify if this should work for chunks or not, and explicitly + // document in the proto docs. + let p = derive_blob_path(&self.base_path, digest); + + match self.object_store.head(&p).await { + Ok(_) => Ok(true), + Err(object_store::Error::NotFound { .. }) => Ok(false), + Err(e) => Err(e)?, + } + } + + #[instrument(skip_all, err, fields(blob.digest=%digest))] + async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { + // handle reading the empty blob. + if digest.as_slice() == blake3::hash(b"").as_bytes() { + return Ok(Some(Box::new(Cursor::new(b"")) as Box<dyn BlobReader>)); + } + match self + .object_store + .get(&derive_chunk_path(&self.base_path, digest)) + .await + { + Ok(res) => { + // fetch the entire chunk into memory, decompress, ensure the b3 digest matches, + // and return a io::Cursor over that data. + // FUTUREWORK: use zstd::bulk to prevent decompression bombs + + let chunk_raw_bytes = res.bytes().await?; + let chunk_contents = zstd::stream::decode_all(Cursor::new(chunk_raw_bytes))?; + + if *digest != blake3::hash(&chunk_contents).as_bytes().into() { + Err(io::Error::other("chunk contents invalid"))?; + } + + Ok(Some(Box::new(Cursor::new(chunk_contents)))) + } + Err(object_store::Error::NotFound { .. }) => { + // NOTE: For public-facing things, we would want to stop here. + // Clients should fetch granularly, so they can make use of + // chunks they have locally. + // However, if this is used directly, without any caches, do the + // assembly here. + // This is subject to change, once we have store composition. + // TODO: make this configurable, and/or clarify behaviour for + // the gRPC server surface (explicitly document behaviour in the + // proto docs) + if let Some(chunks) = self.chunks(digest).await? { + let chunked_reader = ChunkedReader::from_chunks( + chunks.into_iter().map(|chunk| { + ( + chunk.digest.try_into().expect("invalid b3 digest"), + chunk.size, + ) + }), + Arc::new(self.clone()) as Arc<dyn BlobService>, + ); + + Ok(Some(Box::new(chunked_reader))) + } else { + // This is neither a chunk nor a blob, return None. + Ok(None) + } + } + Err(e) => Err(e.into()), + } + } + + #[instrument(skip_all)] + async fn open_write(&self) -> Box<dyn BlobWriter> { + // ObjectStoreBlobWriter implements AsyncWrite, but all the chunking + // needs an AsyncRead, so we create a pipe here. + // In its `AsyncWrite` implementation, `ObjectStoreBlobWriter` delegates + // writes to w. It periodically polls the future that's reading from the + // other side. + let (w, r) = tokio::io::duplex(self.avg_chunk_size as usize * 10); + + Box::new(ObjectStoreBlobWriter { + writer: Some(w), + fut: Some(Box::pin(chunk_and_upload( + r, + self.object_store.clone(), + self.base_path.clone(), + self.avg_chunk_size / 2, + self.avg_chunk_size, + self.avg_chunk_size * 2, + ))), + fut_output: None, + }) + } + + #[instrument(skip_all, err, fields(blob.digest=%digest))] + async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { + let p = derive_blob_path(&self.base_path, digest); + + match self.object_store.get(&p).await { + Ok(get_result) => { + // fetch the data at the blob path + let blob_data = get_result.bytes().await?; + // parse into StatBlobResponse + let stat_blob_response: StatBlobResponse = StatBlobResponse::decode(blob_data)?; + + Ok(Some(stat_blob_response.chunks)) + } + Err(object_store::Error::NotFound { .. }) => Ok(None), + Err(err) => Err(err.into()), + } + } +} + +/// Reads blob contents from a AsyncRead, chunks and uploads them. +/// On success, returns a [StatBlobResponse] pointing to the individual chunks. +#[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)] +async fn chunk_and_upload<R: AsyncRead + Unpin>( + r: R, + object_store: Arc<dyn ObjectStore>, + base_path: Path, + min_chunk_size: u32, + avg_chunk_size: u32, + max_chunk_size: u32, +) -> io::Result<B3Digest> { + // wrap reader with something calculating the blake3 hash of all data read. + let mut b3_r = B3HashingReader::from(r); + // set up a fastcdc chunker + let mut chunker = + AsyncStreamCDC::new(&mut b3_r, min_chunk_size, avg_chunk_size, max_chunk_size); + + /// This really should just belong into the closure at + /// `chunker.as_stream().then(|_| { … })``, but if we try to, rustc spits + /// higher-ranked lifetime errors at us. + async fn fastcdc_chunk_uploader( + resp: Result<fastcdc::v2020::ChunkData, fastcdc::v2020::Error>, + base_path: Path, + object_store: Arc<dyn ObjectStore>, + ) -> std::io::Result<ChunkMeta> { + let chunk_data = resp?; + let chunk_digest: B3Digest = blake3::hash(&chunk_data.data).as_bytes().into(); + let chunk_path = derive_chunk_path(&base_path, &chunk_digest); + + upload_chunk(object_store, chunk_digest, chunk_path, chunk_data.data).await + } + + // Use the fastcdc chunker to produce a stream of chunks, and upload these + // that don't exist to the backend. + let chunks = chunker + .as_stream() + .then(|resp| fastcdc_chunk_uploader(resp, base_path.clone(), object_store.clone())) + .collect::<io::Result<Vec<ChunkMeta>>>() + .await?; + + let stat_blob_response = StatBlobResponse { + chunks, + bao: "".into(), // still todo + }; + + // check for Blob, if it doesn't exist, persist. + let blob_digest: B3Digest = b3_r.digest().into(); + let blob_path = derive_blob_path(&base_path, &blob_digest); + + match object_store.head(&blob_path).await { + // blob already exists, nothing to do + Ok(_) => { + trace!( + blob.digest = %blob_digest, + blob.path = %blob_path, + "blob already exists on backend" + ); + } + // chunk does not yet exist, upload first + Err(object_store::Error::NotFound { .. }) => { + debug!( + blob.digest = %blob_digest, + blob.path = %blob_path, + "uploading blob" + ); + object_store + .put(&blob_path, stat_blob_response.encode_to_vec().into()) + .await?; + } + Err(err) => { + // other error + Err(err)? + } + } + + Ok(blob_digest) +} + +/// upload chunk if it doesn't exist yet. +#[instrument(skip_all, fields(chunk.digest = %chunk_digest, chunk.size = chunk_data.len(), chunk.path = %chunk_path), err)] +async fn upload_chunk( + object_store: Arc<dyn ObjectStore>, + chunk_digest: B3Digest, + chunk_path: Path, + chunk_data: Vec<u8>, +) -> std::io::Result<ChunkMeta> { + let chunk_size = chunk_data.len(); + match object_store.head(&chunk_path).await { + // chunk already exists, nothing to do + Ok(_) => { + info!("chunk already exists"); + } + + // chunk does not yet exist, compress and upload. + Err(object_store::Error::NotFound { .. }) => { + let chunk_data_compressed = + zstd::encode_all(Cursor::new(chunk_data), zstd::DEFAULT_COMPRESSION_LEVEL)?; + + info!(chunk.compressed_size=%chunk_data_compressed.len(), "uploading chunk"); + + object_store + .as_ref() + .put(&chunk_path, chunk_data_compressed.into()) + .await?; + } + // other error + Err(err) => Err(err)?, + } + + Ok(ChunkMeta { + digest: chunk_digest.into(), + size: chunk_size as u64, + }) +} + +pin_project! { + /// Takes care of blob uploads. + /// All writes are relayed to self.writer, and we continuously poll the + /// future (which will internally read from the other side of the pipe and + /// upload chunks). + /// Our BlobWriter::close() needs to drop self.writer, so the other side + /// will read EOF and can finalize the blob. + /// The future should then resolve and return the blob digest. + pub struct ObjectStoreBlobWriter<W, Fut> + where + W: AsyncWrite, + Fut: Future, + { + #[pin] + writer: Option<W>, + + #[pin] + fut: Option<Fut>, + + fut_output: Option<io::Result<B3Digest>> + } +} + +impl<W, Fut> tokio::io::AsyncWrite for ObjectStoreBlobWriter<W, Fut> +where + W: AsyncWrite + Send + Unpin, + Fut: Future, +{ + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll<Result<usize, io::Error>> { + let this = self.project(); + // poll the future. + let fut = this.fut.as_pin_mut().expect("not future"); + let fut_p = fut.poll(cx); + // if it's ready, the only way this could have happened is that the + // upload failed, because we're only closing `self.writer` after all + // writes happened. + if fut_p.is_ready() { + return Poll::Ready(Err(io::Error::other("upload failed"))); + } + + // write to the underlying writer + this.writer + .as_pin_mut() + .expect("writer must be some") + .poll_write(cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + let this = self.project(); + // poll the future. + let fut = this.fut.as_pin_mut().expect("not future"); + let fut_p = fut.poll(cx); + // if it's ready, the only way this could have happened is that the + // upload failed, because we're only closing `self.writer` after all + // writes happened. + if fut_p.is_ready() { + return Poll::Ready(Err(io::Error::other("upload failed"))); + } + + // Call poll_flush on the writer + this.writer + .as_pin_mut() + .expect("writer must be some") + .poll_flush(cx) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + // There's nothing to do on shutdown. We might have written some chunks + // that are nowhere else referenced, but cleaning them up here would be racy. + std::task::Poll::Ready(Ok(())) + } +} + +#[async_trait] +impl<W, Fut> BlobWriter for ObjectStoreBlobWriter<W, Fut> +where + W: AsyncWrite + Send + Unpin, + Fut: Future<Output = io::Result<B3Digest>> + Send + Unpin, +{ + async fn close(&mut self) -> io::Result<B3Digest> { + match self.writer.take() { + Some(mut writer) => { + // shut down the writer, so the other side will read EOF. + writer.shutdown().await?; + + // take out the future. + let fut = self.fut.take().expect("fut must be some"); + // await it. + let resp = pin!(fut).await; + + match resp.as_ref() { + // In the case of an Ok value, we store it in self.fut_output, + // so future calls to close can return that. + Ok(b3_digest) => { + self.fut_output = Some(Ok(b3_digest.clone())); + } + Err(e) => { + // for the error type, we need to cheat a bit, as + // they're not clone-able. + // Simply store a sloppy clone, with the same ErrorKind and message there. + self.fut_output = Some(Err(std::io::Error::new(e.kind(), e.to_string()))) + } + } + resp + } + None => { + // called a second time, return self.fut_output. + match self.fut_output.as_ref().unwrap() { + Ok(ref b3_digest) => Ok(b3_digest.clone()), + Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())), + } + } + } + } +} + +#[cfg(test)] +mod test { + use std::{io::Cursor, sync::Arc}; + + use test_case::test_case; + use url::Url; + + use super::chunk_and_upload; + use crate::{ + blobservice::{BlobService, ObjectStoreBlobService}, + fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST, EMPTY_BLOB_DIGEST}, + B3Digest, + }; + + #[test_case(&BLOB_A, &BLOB_A_DIGEST; "blob a")] + #[test_case(&BLOB_B, &BLOB_B_DIGEST; "blob b")] + #[test_case(&[], &EMPTY_BLOB_DIGEST; "empty blob")] + #[tokio::test] + async fn upload_blob(fix_blob_contents: &[u8], fix_blob_digest: &B3Digest) { + let blobsvc = + ObjectStoreBlobService::parse_url(&Url::parse("memory:///").unwrap()).unwrap(); + + // Initially, the blob should not be present + assert!(!blobsvc.has(fix_blob_digest).await.unwrap()); + + // Using the open_read should return a Ok(None). + // As for the empty blob, we're ok with it actually returning data too. + { + let resp = blobsvc + .open_read(fix_blob_digest) + .await + .expect("open should not fail"); + if fix_blob_digest != &*EMPTY_BLOB_DIGEST { + assert!(resp.is_none()); + } + } + + // upload blob + let mut bw = blobsvc.open_write().await; + tokio::io::copy(&mut Cursor::new(fix_blob_contents), &mut bw) + .await + .expect("copy succeeds"); + + let blob_digest = bw.close().await.expect("close succeeds"); + assert_eq!(fix_blob_digest, &blob_digest); + + // blob should be present now + assert!(blobsvc.has(fix_blob_digest).await.unwrap()); + + // reading it should return the same data. + let mut br = blobsvc + .open_read(fix_blob_digest) + .await + .expect("open succeeds") + .expect("is some"); + + let mut buf = Vec::new(); + tokio::io::copy(&mut br, &mut buf) + .await + .expect("copy must succeed"); + + assert_eq!(fix_blob_contents, buf, "read data should match"); + } + + /// Tests chunk_and_upload directly, bypassing the BlobWriter at open_write(). + #[tokio::test] + async fn test_chunk_and_upload() { + let blobsvc = Arc::new( + ObjectStoreBlobService::parse_url(&Url::parse("memory:///").unwrap()).unwrap(), + ); + + let blob_digest = chunk_and_upload( + &mut Cursor::new(BLOB_A.to_vec()), + blobsvc.object_store.clone(), + object_store::path::Path::from("/"), + 1024 / 2, + 1024, + 1024 * 2, + ) + .await + .expect("chunk_and_upload succeeds"); + + assert_eq!(BLOB_A_DIGEST.clone(), blob_digest); + + // Now we should have the blob + assert!(blobsvc.has(&BLOB_A_DIGEST).await.unwrap()); + } +} diff --git a/tvix/castore/src/blobservice/simplefs.rs b/tvix/castore/src/blobservice/simplefs.rs index b21db2808c23..2dcb24f34221 100644 --- a/tvix/castore/src/blobservice/simplefs.rs +++ b/tvix/castore/src/blobservice/simplefs.rs @@ -25,6 +25,7 @@ use super::{BlobReader, BlobService, BlobWriter}; /// /// **Disclaimer** : This very simple implementation is subject to change and does not give any /// final guarantees on the on-disk format. +/// TODO: migrate to object_store? #[derive(Clone)] pub struct SimpleFilesystemBlobService { /// Where the blobs are located on a filesystem already mounted. |