diff options
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/Cargo.lock | 13 | ||||
-rw-r--r-- | tvix/Cargo.nix | 24 | ||||
-rw-r--r-- | tvix/nar-bridge/Cargo.toml | 3 | ||||
-rw-r--r-- | tvix/nar-bridge/src/lib.rs | 21 | ||||
-rw-r--r-- | tvix/nar-bridge/src/nar.rs | 110 |
5 files changed, 158 insertions, 13 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 9e58235fae35..08b06663eecd 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -2246,9 +2246,12 @@ dependencies = [ "bytes", "clap", "data-encoding", + "futures", "hex-literal", "itertools 0.12.0", + "lru", "nix-compat", + "parking_lot 0.12.3", "prost", "prost-build", "rstest", @@ -2436,7 +2439,7 @@ dependencies = [ "hyper 0.14.28", "itertools 0.12.0", "md-5", - "parking_lot 0.12.2", + "parking_lot 0.12.3", "percent-encoding", "quick-xml", "rand", @@ -2599,9 +2602,9 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", "parking_lot_core 0.9.9", @@ -4542,7 +4545,7 @@ dependencies = [ "lazy_static", "libc", "object_store", - "parking_lot 0.12.2", + "parking_lot 0.12.3", "petgraph", "pin-project-lite", "prost", @@ -4727,7 +4730,7 @@ dependencies = [ "lazy_static", "lru", "nix-compat", - "parking_lot 0.12.2", + "parking_lot 0.12.3", "pin-project-lite", "prost", "prost-build", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 221211c1c842..104c3dc825e1 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -6889,15 +6889,27 @@ rec { packageId = "data-encoding"; } { + name = "futures"; + packageId = "futures"; + } + { name = "itertools"; packageId = "itertools 0.12.0"; } { + name = "lru"; + packageId = "lru"; + } + { name = "nix-compat"; packageId = "nix-compat"; features = [ "async" ]; } { + name = "parking_lot"; + packageId = "parking_lot 0.12.3"; + } + { name = "prost"; packageId = "prost"; } @@ -7533,7 +7545,7 @@ rec { } { name = "parking_lot"; - packageId = "parking_lot 0.12.2"; + packageId = "parking_lot 0.12.3"; } { name = "percent-encoding"; @@ -8128,11 +8140,11 @@ rec { }; resolvedDefaultFeatures = [ "default" ]; }; - "parking_lot 0.12.2" = rec { + "parking_lot 0.12.3" = rec { crateName = "parking_lot"; - version = "0.12.2"; + version = "0.12.3"; edition = "2021"; - sha256 = "1ys2dzz6cysjmwyivwxczl1ljpcf5cj4qmhdj07d5bkc9z5g0jky"; + sha256 = "09ws9g6245iiq8z975h8ycf818a66q3c6zv4b5h8skpm7hc1igzi"; authors = [ "Amanieu d'Antras <amanieu@gmail.com>" ]; @@ -14484,7 +14496,7 @@ rec { } { name = "parking_lot"; - packageId = "parking_lot 0.12.2"; + packageId = "parking_lot 0.12.3"; } { name = "petgraph"; @@ -15238,7 +15250,7 @@ rec { } { name = "parking_lot"; - packageId = "parking_lot 0.12.2"; + packageId = "parking_lot 0.12.3"; } { name = "pin-project-lite"; diff --git a/tvix/nar-bridge/Cargo.toml b/tvix/nar-bridge/Cargo.toml index 920b81c45cbe..7dc3a82848b6 100644 --- a/tvix/nar-bridge/Cargo.toml +++ b/tvix/nar-bridge/Cargo.toml @@ -8,6 +8,7 @@ axum = { version = "0.7.5", features = ["http2"] } bytes = "1.4.0" clap = { version = "4.0", features = ["derive", "env"] } data-encoding = "2.3.3" +futures = "0.3.30" itertools = "0.12.0" prost = "0.12.1" nix-compat = { path = "../nix-compat", features = ["async"] } @@ -23,6 +24,8 @@ tracing = "0.1.37" tracing-subscriber = "0.3.16" url = "2.4.0" serde = { version = "1.0.204", features = ["derive"] } +lru = "0.12.3" +parking_lot = "0.12.3" [build-dependencies] prost-build = "0.12.1" diff --git a/tvix/nar-bridge/src/lib.rs b/tvix/nar-bridge/src/lib.rs index 5d9ec43c1cde..5f0e8c19d26a 100644 --- a/tvix/nar-bridge/src/lib.rs +++ b/tvix/nar-bridge/src/lib.rs @@ -1,18 +1,32 @@ -use axum::routing::head; +use axum::routing::{head, put}; use axum::{routing::get, Router}; +use lru::LruCache; +use parking_lot::RwLock; +use std::num::NonZeroUsize; use std::sync::Arc; use tvix_castore::blobservice::BlobService; use tvix_castore::directoryservice::DirectoryService; +use tvix_castore::proto::node::Node; use tvix_store::pathinfoservice::PathInfoService; mod nar; mod narinfo; +/// The capacity of the lookup table from NarHash to [Node]. +/// Should be bigger than the number of concurrent NAR upload. +/// Cannot be [NonZeroUsize] here due to rust-analyzer going bananas. +/// SAFETY: 1000 != 0 +const ROOT_NODES_CACHE_CAPACITY: usize = 1000; + #[derive(Clone)] pub struct AppState { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, path_info_service: Arc<dyn PathInfoService>, + + /// Lookup table from NarHash to [Node], necessary to populate the root_node + /// field of the PathInfo when processing the narinfo upload. + root_nodes: Arc<RwLock<LruCache<[u8; 32], Node>>>, } impl AppState { @@ -25,6 +39,10 @@ impl AppState { blob_service, directory_service, path_info_service, + root_nodes: Arc::new(RwLock::new(LruCache::new({ + // SAFETY: 1000 != 0 + unsafe { NonZeroUsize::new_unchecked(ROOT_NODES_CACHE_CAPACITY) } + }))), } } } @@ -32,6 +50,7 @@ impl AppState { pub fn gen_router(priority: u64) -> Router<AppState> { Router::new() .route("/", get(root)) + .route("/nar/:nar_str", put(nar::put)) .route("/nar/tvix-castore/:root_node_enc", get(nar::get)) .route("/:narinfo_str", get(narinfo::get)) .route("/:narinfo_str", head(narinfo::head)) diff --git a/tvix/nar-bridge/src/nar.rs b/tvix/nar-bridge/src/nar.rs index 63cd86e49969..5bce0c973ef3 100644 --- a/tvix/nar-bridge/src/nar.rs +++ b/tvix/nar-bridge/src/nar.rs @@ -4,9 +4,13 @@ use axum::http::StatusCode; use axum::response::Response; use bytes::Bytes; use data_encoding::BASE64URL_NOPAD; +use futures::TryStreamExt; +use nix_compat::nixbase32; use serde::Deserialize; +use std::io; use tokio_util::io::ReaderStream; -use tracing::{instrument, warn}; +use tracing::{instrument, warn, Span}; +use tvix_store::nar::ingest_nar_and_hash; use crate::AppState; @@ -75,3 +79,107 @@ pub async fn get( .body(Body::from_stream(ReaderStream::new(r))) .unwrap()) } + +#[instrument(skip(blob_service, directory_service, request))] +pub async fn put( + axum::extract::Path(nar_str): axum::extract::Path<String>, + axum::extract::State(AppState { + blob_service, + directory_service, + root_nodes, + .. + }): axum::extract::State<AppState>, + request: axum::extract::Request, +) -> Result<&'static str, StatusCode> { + let nar_hash_expected = parse_nar_str(&nar_str)?; + + let s = request.into_body().into_data_stream(); + + let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| { + warn!(err=%e, "failed to read request body"); + io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) + })); + + // ingest the NAR + let (root_node, nar_hash_actual, nar_size) = + ingest_nar_and_hash(blob_service.clone(), directory_service.clone(), &mut r) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .map_err(|e| { + warn!(err=%e, "failed to ingest nar"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let s = Span::current(); + s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected)); + s.record("nar_size", nar_size); + + if nar_hash_expected != nar_hash_actual { + warn!( + nar_hash.expected = nixbase32::encode(&nar_hash_expected), + nar_hash.actual = nixbase32::encode(&nar_hash_actual), + "nar hash mismatch" + ); + return Err(StatusCode::BAD_REQUEST); + } + + // store mapping of narhash to root node into root_nodes. + // we need it later to populate the root node when accepting the PathInfo. + root_nodes.write().put(nar_hash_actual, root_node); + + Ok("") +} + +// FUTUREWORK: maybe head by narhash. Though not too critical, as we do +// implement HEAD for .narinfo. + +/// Parses a `14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar` +/// string and returns the nixbase32-decoded digest. +/// No compression is supported. +fn parse_nar_str(s: &str) -> Result<[u8; 32], StatusCode> { + if !s.is_char_boundary(52) { + warn!("invalid string, no char boundary at 32"); + return Err(StatusCode::NOT_FOUND); + } + + Ok(match s.split_at(52) { + (hash_str, ".nar") => { + // we know this is 52 bytes + let hash_str_fixed: [u8; 52] = hash_str.as_bytes().try_into().unwrap(); + nixbase32::decode_fixed(hash_str_fixed).map_err(|e| { + warn!(err=%e, "invalid digest"); + StatusCode::NOT_FOUND + })? + } + _ => { + warn!("invalid string"); + return Err(StatusCode::BAD_REQUEST); + } + }) +} + +#[cfg(test)] +mod test { + use super::parse_nar_str; + use hex_literal::hex; + + #[test] + fn success() { + assert_eq!( + hex!("13a8cf7ca57f68a9f1752acee36a72a55187d3a954443c112818926f26109d91"), + parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar").unwrap() + ) + } + + #[test] + fn failure() { + assert!( + parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar.x").is_err() + ); + assert!( + parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar.xz").is_err() + ); + assert!(parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0").is_err()); + assert!(parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0🦊.nar").is_err()) + } +} |