about summary refs log tree commit diff
path: root/tvix/nar-bridge
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/nar-bridge')
-rw-r--r--tvix/nar-bridge/Cargo.toml3
-rw-r--r--tvix/nar-bridge/src/lib.rs21
-rw-r--r--tvix/nar-bridge/src/nar.rs110
3 files changed, 132 insertions, 2 deletions
diff --git a/tvix/nar-bridge/Cargo.toml b/tvix/nar-bridge/Cargo.toml
index 920b81c45cbe..7dc3a82848b6 100644
--- a/tvix/nar-bridge/Cargo.toml
+++ b/tvix/nar-bridge/Cargo.toml
@@ -8,6 +8,7 @@ axum = { version = "0.7.5", features = ["http2"] }
 bytes = "1.4.0"
 clap = { version = "4.0", features = ["derive", "env"] }
 data-encoding = "2.3.3"
+futures = "0.3.30"
 itertools = "0.12.0"
 prost = "0.12.1"
 nix-compat = { path = "../nix-compat", features = ["async"] }
@@ -23,6 +24,8 @@ tracing = "0.1.37"
 tracing-subscriber = "0.3.16"
 url = "2.4.0"
 serde = { version = "1.0.204", features = ["derive"] }
+lru = "0.12.3"
+parking_lot = "0.12.3"
 
 [build-dependencies]
 prost-build = "0.12.1"
diff --git a/tvix/nar-bridge/src/lib.rs b/tvix/nar-bridge/src/lib.rs
index 5d9ec43c1cde..5f0e8c19d26a 100644
--- a/tvix/nar-bridge/src/lib.rs
+++ b/tvix/nar-bridge/src/lib.rs
@@ -1,18 +1,32 @@
-use axum::routing::head;
+use axum::routing::{head, put};
 use axum::{routing::get, Router};
+use lru::LruCache;
+use parking_lot::RwLock;
+use std::num::NonZeroUsize;
 use std::sync::Arc;
 use tvix_castore::blobservice::BlobService;
 use tvix_castore::directoryservice::DirectoryService;
+use tvix_castore::proto::node::Node;
 use tvix_store::pathinfoservice::PathInfoService;
 
 mod nar;
 mod narinfo;
 
+/// The capacity of the lookup table from NarHash to [Node].
+/// Should be bigger than the number of concurrent NAR upload.
+/// Cannot be [NonZeroUsize] here due to rust-analyzer going bananas.
+/// SAFETY: 1000 != 0
+const ROOT_NODES_CACHE_CAPACITY: usize = 1000;
+
 #[derive(Clone)]
 pub struct AppState {
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
     path_info_service: Arc<dyn PathInfoService>,
+
+    /// Lookup table from NarHash to [Node], necessary to populate the root_node
+    /// field of the PathInfo when processing the narinfo upload.
+    root_nodes: Arc<RwLock<LruCache<[u8; 32], Node>>>,
 }
 
 impl AppState {
@@ -25,6 +39,10 @@ impl AppState {
             blob_service,
             directory_service,
             path_info_service,
+            root_nodes: Arc::new(RwLock::new(LruCache::new({
+                // SAFETY: 1000 != 0
+                unsafe { NonZeroUsize::new_unchecked(ROOT_NODES_CACHE_CAPACITY) }
+            }))),
         }
     }
 }
@@ -32,6 +50,7 @@ impl AppState {
 pub fn gen_router(priority: u64) -> Router<AppState> {
     Router::new()
         .route("/", get(root))
+        .route("/nar/:nar_str", put(nar::put))
         .route("/nar/tvix-castore/:root_node_enc", get(nar::get))
         .route("/:narinfo_str", get(narinfo::get))
         .route("/:narinfo_str", head(narinfo::head))
diff --git a/tvix/nar-bridge/src/nar.rs b/tvix/nar-bridge/src/nar.rs
index 63cd86e49969..5bce0c973ef3 100644
--- a/tvix/nar-bridge/src/nar.rs
+++ b/tvix/nar-bridge/src/nar.rs
@@ -4,9 +4,13 @@ use axum::http::StatusCode;
 use axum::response::Response;
 use bytes::Bytes;
 use data_encoding::BASE64URL_NOPAD;
+use futures::TryStreamExt;
+use nix_compat::nixbase32;
 use serde::Deserialize;
+use std::io;
 use tokio_util::io::ReaderStream;
-use tracing::{instrument, warn};
+use tracing::{instrument, warn, Span};
+use tvix_store::nar::ingest_nar_and_hash;
 
 use crate::AppState;
 
@@ -75,3 +79,107 @@ pub async fn get(
         .body(Body::from_stream(ReaderStream::new(r)))
         .unwrap())
 }
+
+#[instrument(skip(blob_service, directory_service, request))]
+pub async fn put(
+    axum::extract::Path(nar_str): axum::extract::Path<String>,
+    axum::extract::State(AppState {
+        blob_service,
+        directory_service,
+        root_nodes,
+        ..
+    }): axum::extract::State<AppState>,
+    request: axum::extract::Request,
+) -> Result<&'static str, StatusCode> {
+    let nar_hash_expected = parse_nar_str(&nar_str)?;
+
+    let s = request.into_body().into_data_stream();
+
+    let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
+        warn!(err=%e, "failed to read request body");
+        io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+    }));
+
+    // ingest the NAR
+    let (root_node, nar_hash_actual, nar_size) =
+        ingest_nar_and_hash(blob_service.clone(), directory_service.clone(), &mut r)
+            .await
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+            .map_err(|e| {
+                warn!(err=%e, "failed to ingest nar");
+                StatusCode::INTERNAL_SERVER_ERROR
+            })?;
+
+    let s = Span::current();
+    s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
+    s.record("nar_size", nar_size);
+
+    if nar_hash_expected != nar_hash_actual {
+        warn!(
+            nar_hash.expected = nixbase32::encode(&nar_hash_expected),
+            nar_hash.actual = nixbase32::encode(&nar_hash_actual),
+            "nar hash mismatch"
+        );
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    // store mapping of narhash to root node into root_nodes.
+    // we need it later to populate the root node when accepting the PathInfo.
+    root_nodes.write().put(nar_hash_actual, root_node);
+
+    Ok("")
+}
+
+// FUTUREWORK: maybe head by narhash. Though not too critical, as we do
+// implement HEAD for .narinfo.
+
+/// Parses a `14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar`
+/// string and returns the nixbase32-decoded digest.
+/// No compression is supported.
+fn parse_nar_str(s: &str) -> Result<[u8; 32], StatusCode> {
+    if !s.is_char_boundary(52) {
+        warn!("invalid string, no char boundary at 32");
+        return Err(StatusCode::NOT_FOUND);
+    }
+
+    Ok(match s.split_at(52) {
+        (hash_str, ".nar") => {
+            // we know this is 52 bytes
+            let hash_str_fixed: [u8; 52] = hash_str.as_bytes().try_into().unwrap();
+            nixbase32::decode_fixed(hash_str_fixed).map_err(|e| {
+                warn!(err=%e, "invalid digest");
+                StatusCode::NOT_FOUND
+            })?
+        }
+        _ => {
+            warn!("invalid string");
+            return Err(StatusCode::BAD_REQUEST);
+        }
+    })
+}
+
+#[cfg(test)]
+mod test {
+    use super::parse_nar_str;
+    use hex_literal::hex;
+
+    #[test]
+    fn success() {
+        assert_eq!(
+            hex!("13a8cf7ca57f68a9f1752acee36a72a55187d3a954443c112818926f26109d91"),
+            parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar").unwrap()
+        )
+    }
+
+    #[test]
+    fn failure() {
+        assert!(
+            parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar.x").is_err()
+        );
+        assert!(
+            parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0k.nar.xz").is_err()
+        );
+        assert!(parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0").is_err());
+        assert!(parse_nar_str("14cx20k6z4hq508kqi2lm79qfld5f9mf7kiafpqsjs3zlmycza0🦊.nar").is_err())
+    }
+}