about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-07-20T08·03+0200
committerflokli <flokli@flokli.de>2024-07-20T17·23+0000
commit5d906054da2cfa68f1de201641b54c41e37524b4 (patch)
tree66d34ffda880b6b13dc93ceb416c8ab240e7875f
parent861cc1f341d6774397f6505027f7d8bcc15291f6 (diff)
feat(tvix/nar-bridge): support uploading NAR files r/8377
This ingests NAR files into the {Blob,Directory}Service, which are
already part of the AppState.

As we then need to correlate the root node to the uploaded PathInfo, we
need to keep a (short-lived) lookup table from NARHash to root node
around. We insert it into a `LruCache` after the NAR is uploaded, and
use `peek()` to do the lookup, which doesn't update the LRU list.

Change-Id: I48a4c6246bacf76559c5a4ccad2a0bc25c1b7900
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11986
Tested-by: BuildkiteCI
Reviewed-by: Brian Olsen <me@griff.name>
-rw-r--r--tvix/Cargo.lock13
-rw-r--r--tvix/Cargo.nix24
-rw-r--r--tvix/nar-bridge/Cargo.toml3
-rw-r--r--tvix/nar-bridge/src/lib.rs21
-rw-r--r--tvix/nar-bridge/src/nar.rs110
5 files changed, 158 insertions, 13 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index 9e58235fae35..08b06663eecd 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -2246,9 +2246,12 @@ dependencies = [
  "bytes",
  "clap",
  "data-encoding",
+ "futures",
  "hex-literal",
  "itertools 0.12.0",
+ "lru",
  "nix-compat",
+ "parking_lot 0.12.3",
  "prost",
  "prost-build",
  "rstest",
@@ -2436,7 +2439,7 @@ dependencies = [
  "hyper 0.14.28",
  "itertools 0.12.0",
  "md-5",
- "parking_lot 0.12.2",
+ "parking_lot 0.12.3",
  "percent-encoding",
  "quick-xml",
  "rand",
@@ -2599,9 +2602,9 @@ dependencies = [
 
 [[package]]
 name = "parking_lot"
-version = "0.12.2"
+version = "0.12.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb"
+checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
 dependencies = [
  "lock_api",
  "parking_lot_core 0.9.9",
@@ -4542,7 +4545,7 @@ dependencies = [
  "lazy_static",
  "libc",
  "object_store",
- "parking_lot 0.12.2",
+ "parking_lot 0.12.3",
  "petgraph",
  "pin-project-lite",
  "prost",
@@ -4727,7 +4730,7 @@ dependencies = [
  "lazy_static",
  "lru",
  "nix-compat",
- "parking_lot 0.12.2",
+ "parking_lot 0.12.3",
  "pin-project-lite",
  "prost",
  "prost-build",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 221211c1c842..104c3dc825e1 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -6889,15 +6889,27 @@ rec {
             packageId = "data-encoding";
           }
           {
+            name = "futures";
+            packageId = "futures";
+          }
+          {
             name = "itertools";
             packageId = "itertools 0.12.0";
           }
           {
+            name = "lru";
+            packageId = "lru";
+          }
+          {
             name = "nix-compat";
             packageId = "nix-compat";
             features = [ "async" ];
           }
           {
+            name = "parking_lot";
+            packageId = "parking_lot 0.12.3";
+          }
+          {
             name = "prost";
             packageId = "prost";
           }
@@ -7533,7 +7545,7 @@ rec {
           }
           {
             name = "parking_lot";
-            packageId = "parking_lot 0.12.2";
+            packageId = "parking_lot 0.12.3";
           }
           {
             name = "percent-encoding";
@@ -8128,11 +8140,11 @@ rec {
         };
         resolvedDefaultFeatures = [ "default" ];
       };
-      "parking_lot 0.12.2" = rec {
+      "parking_lot 0.12.3" = rec {
         crateName = "parking_lot";
-        version = "0.12.2";
+        version = "0.12.3";
         edition = "2021";
-        sha256 = "1ys2dzz6cysjmwyivwxczl1ljpcf5cj4qmhdj07d5bkc9z5g0jky";
+        sha256 = "09ws9g6245iiq8z975h8ycf818a66q3c6zv4b5h8skpm7hc1igzi";
         authors = [
           "Amanieu d'Antras <amanieu@gmail.com>"
         ];
@@ -14484,7 +14496,7 @@ rec {
           }
           {
             name = "parking_lot";
-            packageId = "parking_lot 0.12.2";
+            packageId = "parking_lot 0.12.3";
           }
           {
             name = "petgraph";
@@ -15238,7 +15250,7 @@ rec {
           }
           {
             name = "parking_lot";
-            packageId = "parking_lot 0.12.2";
+            packageId = "parking_lot 0.12.3";
           }
           {
             name = "pin-project-lite";
diff --git a/tvix/nar-bridge/Cargo.toml b/tvix/nar-bridge/Cargo.toml
index 920b81c45cbe..7dc3a82848b6 100644
--- a/tvix/nar-bridge/Cargo.toml
+++ b/tvix/nar-bridge/Cargo.toml
@@ -8,6 +8,7 @@ axum = { version = "0.7.5", features = ["http2"] }
 bytes = "1.4.0"
 clap = { version = "4.0", features = ["derive", "env"] }
 data-encoding = "2.3.3"
+futures = "0.3.30"
 itertools = "0.12.0"
 prost = "0.12.1"
 nix-compat = { path = "../nix-compat", features = ["async"] }
@@ -23,6 +24,8 @@ tracing = "0.1.37"
 tracing-subscriber = "0.3.16"
 url = "2.4.0"
 serde = { version = "1.0.204", features = ["derive"] }
+lru = "0.12.3"
+parking_lot = "0.12.3"
 
 [build-dependencies]
 prost-build = "0.12.1"
diff --git a/tvix/nar-bridge/src/lib.rs b/tvix/nar-bridge/src/lib.rs
index 5d9ec43c1cde..5f0e8c19d26a 100644
--- a/tvix/nar-bridge/src/lib.rs
+++ b/tvix/nar-bridge/src/lib.rs
@@ -1,18 +1,32 @@
-use axum::routing::head;
+use axum::routing::{head, put};
 use axum::{routing::get, Router};
+use lru::LruCache;
+use parking_lot::RwLock;
+use std::num::NonZeroUsize;
 use std::sync::Arc;
 use tvix_castore::blobservice::BlobService;
 use tvix_castore::directoryservice::DirectoryService;
+use tvix_castore::proto::node::Node;
 use tvix_store::pathinfoservice::PathInfoService;
 
 mod nar;
 mod narinfo;
 
+/// The capacity of the lookup table from NarHash to [Node].
+/// Should be bigger than the number of concurrent NAR upload.
+/// Cannot be [NonZeroUsize] here due to rust-analyzer going bananas.
+/// SAFETY: 1000 != 0
+const ROOT_NODES_CACHE_CAPACITY: usize = 1000;
+
 #[derive(Clone)]
 pub struct AppState {
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
     path_info_service: Arc<dyn PathInfoService>,
+
+    /// Lookup table from NarHash to [Node], necessary to populate the root_node
+    /// field of the PathInfo when processing the narinfo upload.
+    root_nodes: Arc<RwLock<LruCache<[u8; 32], Node>>>,
 }
 
 impl AppState {
@@ -25,6 +39,10 @@ impl AppState {
             blob_service,
             directory_service,
             path_info_service,
+            root_nodes: Arc::new(RwLock::new(LruCache::new({
+                // SAFETY: 1000 != 0
+                unsafe { NonZeroUsize::new_unchecked(ROOT_NODES_CACHE_CAPACITY) }
+            }))),
         }
     }
 }
@@ -32,6 +50,7 @@ impl AppState {
 pub fn gen_router(priority: u64) -> Router<AppState> {
     Router::new()
         .route("/", get(root))
+        .route("/nar/:nar_str", put(nar::put))
         .route("/nar/tvix-castore/:root_node_enc", get(nar::get))
         .route("/:narinfo_str", get(narinfo::get))
         .route("/:narinfo_str", head(narinfo::head))
diff --git a/tvix/nar-bridge/src/nar.rs b/tvix/nar-bridge/src/nar.rs
index 63cd86e49969..5bce0c973ef3 100644
--- a/tvix/nar-bridge/src/nar.rs
+++ b/tvix/nar-bridge/src/nar.rs
@@ -4,9 +4,13 @@ use axum::http::StatusCode;
 use axum::response::Response;
 use bytes::Bytes;
 use data_encoding::BASE64URL_NOPAD;
+use futures::TryStreamExt;
+use nix_compat::nixbase32;
 use serde::Deserialize;
+use std::io;
 use tokio_util::io::ReaderStream;
-use tracing::{instrument, warn};
+use tracing::{instrument, warn, Span};
+use tvix_store::nar::ingest_nar_and_hash;
 
 use crate::AppState;
 
@@ -75,3 +79,107 @@ pub async fn get(
         .body(Body::from_stream(ReaderStream::new(r)))
         .unwrap())
 }
+
+#[instrument(skip(blob_service, directory_service, request))]
+pub async fn put(
+    axum::extract::Path(nar_str): axum::extract::Path<String>,
+    axum::extract::State(AppState {
+        blob_service,
+        directory_service,
+        root_nodes,
+        ..
+    }): axum::extract::State<AppState>,
+    request: axum::extract::Request,
+) -> Result<&'static str, StatusCode> {
+    let nar_hash_expected = parse_nar_str(&nar_str)?;
+
+    let s = request.into_body().into_data_stream();
+
+    let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
+        warn!(err=%e, "failed to read request body");
+        io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+    }));
+
+    // ingest the NAR
+    let (root_node, nar_hash_actual, nar_size) =
+        ingest_nar_and_hash(blob_service.clone(), directory_service.clone(), &mut r)
+            .await
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+            .map_err(|e| {
+                warn!(err=%e, "failed to ingest nar");
+                StatusCode::INTERNAL_SERVER_ERROR
+            })?;
+
+    let s = Span::current();
+    s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
+    s.record("nar_size", nar_size);
+
+    if nar_hash_expected != nar_hash_actual {
+        warn!(
+            nar_hash.expected = nixbase32::encode(&nar_hash_expected),
+            nar_hash.actual = nixbase32::encode(&nar_hash_actual),
+            "nar hash mismatch"
+        );
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    // store mapping of narhash to root node into root_nodes.
+    // we need it later to populate the root node when accepting the PathInfo.
+    root_nodes.write().put(nar_hash_actual, root_node);
+
+    Ok("")
+}
+
+// FUTUREWORK: maybe head by narhash. Though not too critical, as we do
+// implement HEAD for .narinfo.
+
+/// Parses a `14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar`
+/// string and returns the nixbase32-decoded digest.
+/// No compression is supported.
+fn parse_nar_str(s: &str) -> Result<[u8; 32], StatusCode> {
+    if !s.is_char_boundary(52) {
+        warn!("invalid string, no char boundary at 32");
+        return Err(StatusCode::NOT_FOUND);
+    }
+
+    Ok(match s.split_at(52) {
+        (hash_str, ".nar") => {
+            // we know this is 52 bytes
+            let hash_str_fixed: [u8; 52] = hash_str.as_bytes().try_into().unwrap();
+            nixbase32::decode_fixed(hash_str_fixed).map_err(|e| {
+                warn!(err=%e, "invalid digest");
+                StatusCode::NOT_FOUND
+            })?
+        }
+        _ => {
+            warn!("invalid string");
+            return Err(StatusCode::BAD_REQUEST);
+        }
+    })
+}
+
+#[cfg(test)]
+mod test {
+    use super::parse_nar_str;
+    use hex_literal::hex;
+
+    #[test]
+    fn success() {
+        assert_eq!(
+            hex!("13a8cf7ca57f68a9f1752acee36a72a55187d3a954443c112818926f26109d91"),
+            parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar").unwrap()
+        )
+    }
+
+    #[test]
+    fn failure() {
+        assert!(
+            parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar.x").is_err()
+        );
+        assert!(
+            parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar.xz").is_err()
+        );
+        assert!(parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0").is_err());
+        assert!(parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0🦊.nar").is_err())
+    }
+}