about summary refs log tree commit diff
path: root/tvix/nar-bridge
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/nar-bridge')
-rw-r--r--tvix/nar-bridge/Cargo.toml48
-rw-r--r--tvix/nar-bridge/default.nix11
-rw-r--r--tvix/nar-bridge/src/bin/nar-bridge.rs103
-rw-r--r--tvix/nar-bridge/src/lib.rs75
-rw-r--r--tvix/nar-bridge/src/nar.rs203
-rw-r--r--tvix/nar-bridge/src/narinfo.rs153
6 files changed, 593 insertions, 0 deletions
diff --git a/tvix/nar-bridge/Cargo.toml b/tvix/nar-bridge/Cargo.toml
new file mode 100644
index 000000000000..ac23b597311f
--- /dev/null
+++ b/tvix/nar-bridge/Cargo.toml
@@ -0,0 +1,48 @@
+[package]
+name = "nar-bridge"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+axum = { workspace = true, features = ["http2"] }
+axum-extra = { workspace = true }
+axum-range = { workspace = true }
+tower = { workspace = true }
+tower-http = { workspace = true, features = ["trace"] }
+bytes = { workspace = true }
+clap = { workspace = true, features = ["derive", "env"] }
+data-encoding = { workspace = true }
+futures = { workspace = true }
+itertools = { workspace = true }
+prost = { workspace = true }
+nix-compat = { path = "../nix-compat", features = ["async"] }
+thiserror = { workspace = true }
+tokio = { workspace = true }
+tokio-listener = { workspace = true, features = ["axum07", "clap", "multi-listener", "sd_listen"] }
+tokio-util = { workspace = true, features = ["io", "io-util", "compat"] }
+tonic = { workspace = true, features = ["tls", "tls-roots"] }
+tvix-castore = { path = "../castore" }
+tvix-store = { path = "../store" }
+tvix-tracing = { path = "../tracing", features = ["tonic", "axum"] }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
+url = { workspace = true }
+serde = { workspace = true, features = ["derive"] }
+lru = { workspace = true }
+parking_lot = { workspace = true }
+mimalloc = { workspace = true }
+
+[build-dependencies]
+prost-build = { workspace = true }
+tonic-build = { workspace = true }
+
+[features]
+default = ["otlp"]
+otlp = ["tvix-tracing/otlp"]
+
+[dev-dependencies]
+hex-literal = { workspace = true }
+rstest = { workspace = true }
+
+[lints]
+workspace = true
diff --git a/tvix/nar-bridge/default.nix b/tvix/nar-bridge/default.nix
new file mode 100644
index 000000000000..2f1384e8211f
--- /dev/null
+++ b/tvix/nar-bridge/default.nix
@@ -0,0 +1,11 @@
+{ depot, lib, ... }:
+
+(depot.tvix.crates.workspaceMembers.nar-bridge.build.override {
+  runTests = true;
+}).overrideAttrs (old: rec {
+  meta.ci.targets = lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru);
+  passthru = old.passthru // (depot.tvix.utils.mkFeaturePowerset {
+    inherit (old) crateName;
+    features = [ "otlp" ];
+  });
+})
diff --git a/tvix/nar-bridge/src/bin/nar-bridge.rs b/tvix/nar-bridge/src/bin/nar-bridge.rs
new file mode 100644
index 000000000000..d9fce9dff322
--- /dev/null
+++ b/tvix/nar-bridge/src/bin/nar-bridge.rs
@@ -0,0 +1,103 @@
+use clap::Parser;
+use mimalloc::MiMalloc;
+use nar_bridge::AppState;
+use std::num::NonZeroUsize;
+use tower::ServiceBuilder;
+use tower_http::trace::{DefaultMakeSpan, TraceLayer};
+use tracing::info;
+use tvix_store::utils::ServiceUrlsGrpc;
+
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
+/// Expose the Nix HTTP Binary Cache protocol for a tvix-store.
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Cli {
+    #[clap(flatten)]
+    service_addrs: ServiceUrlsGrpc,
+
+    /// The priority to announce at the `nix-cache-info` endpoint.
+    /// A lower number means it's *more* preferred.
+    #[arg(long, env, default_value_t = 39)]
+    priority: u64,
+
+    /// The address to listen on.
+    #[clap(flatten)]
+    listen_args: tokio_listener::ListenerAddressLFlag,
+
+    /// The capacity of the lookup table from NarHash to [Node].
+    /// Should be bigger than the number of concurrent NAR uploads.
+    #[arg(long, env, default_value_t = NonZeroUsize::new(1000).unwrap())]
+    root_nodes_cache_capacity: NonZeroUsize,
+
+    #[cfg(feature = "otlp")]
+    /// Whether to configure OTLP. Set --otlp=false to disable.
+    #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))]
+    otlp: bool,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    let cli = Cli::parse();
+
+    let _tracing_handle = {
+        #[allow(unused_mut)]
+        let mut builder = tvix_tracing::TracingBuilder::default();
+        #[cfg(feature = "otlp")]
+        {
+            if cli.otlp {
+                builder = builder.enable_otlp("tvix.store");
+            }
+        }
+        builder.build()?
+    };
+
+    // initialize stores
+    let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+        tvix_store::utils::construct_services(cli.service_addrs).await?;
+
+    let state = AppState::new(
+        blob_service,
+        directory_service,
+        path_info_service,
+        cli.root_nodes_cache_capacity,
+    );
+
+    let app = nar_bridge::gen_router(cli.priority)
+        .layer(
+            ServiceBuilder::new()
+                .layer(
+                    TraceLayer::new_for_http().make_span_with(
+                        DefaultMakeSpan::new()
+                            .level(tracing::Level::INFO)
+                            .include_headers(true),
+                    ),
+                )
+                .map_request(tvix_tracing::propagate::axum::accept_trace),
+        )
+        .with_state(state);
+
+    let listen_address = &cli.listen_args.listen_address.unwrap_or_else(|| {
+        "[::]:9000"
+            .parse()
+            .expect("invalid fallback listen address")
+    });
+
+    let listener = tokio_listener::Listener::bind(
+        listen_address,
+        &Default::default(),
+        &cli.listen_args.listener_options,
+    )
+    .await?;
+
+    info!(listen_address=%listen_address, "starting daemon");
+
+    tokio_listener::axum07::serve(
+        listener,
+        app.into_make_service_with_connect_info::<tokio_listener::SomeSocketAddrClonable>(),
+    )
+    .await?;
+
+    Ok(())
+}
diff --git a/tvix/nar-bridge/src/lib.rs b/tvix/nar-bridge/src/lib.rs
new file mode 100644
index 000000000000..db926e8cede4
--- /dev/null
+++ b/tvix/nar-bridge/src/lib.rs
@@ -0,0 +1,75 @@
+use axum::http::StatusCode;
+use axum::response::IntoResponse;
+use axum::routing::{head, put};
+use axum::{routing::get, Router};
+use lru::LruCache;
+use nix_compat::nix_http;
+use parking_lot::RwLock;
+use std::num::NonZeroUsize;
+use std::sync::Arc;
+use tvix_castore::blobservice::BlobService;
+use tvix_castore::directoryservice::DirectoryService;
+use tvix_castore::Node;
+use tvix_store::pathinfoservice::PathInfoService;
+
+mod nar;
+mod narinfo;
+
+#[derive(Clone)]
+pub struct AppState {
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+    path_info_service: Arc<dyn PathInfoService>,
+
+    /// Lookup table from NarHash to [Node], necessary to populate the root_node
+    /// field of the PathInfo when processing the narinfo upload.
+    root_nodes: Arc<RwLock<LruCache<[u8; 32], Node>>>,
+}
+
+impl AppState {
+    pub fn new(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+        path_info_service: Arc<dyn PathInfoService>,
+        root_nodes_cache_capacity: NonZeroUsize,
+    ) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+            path_info_service,
+            root_nodes: Arc::new(RwLock::new(LruCache::new(root_nodes_cache_capacity))),
+        }
+    }
+}
+
+pub fn gen_router(priority: u64) -> Router<AppState> {
+    Router::new()
+        .route("/", get(root))
+        .route("/nar/:nar_str", get(four_o_four))
+        .route("/nar/:nar_str", head(nar::head_root_nodes))
+        .route("/nar/:nar_str", put(nar::put))
+        .route("/nar/tvix-castore/:root_node_enc", get(nar::get_head))
+        .route("/nar/tvix-castore/:root_node_enc", head(nar::get_head))
+        .route("/:narinfo_str", get(narinfo::get))
+        .route("/:narinfo_str", head(narinfo::head))
+        .route("/:narinfo_str", put(narinfo::put))
+        .route("/nix-cache-info", get(move || nix_cache_info(priority)))
+}
+
+async fn root() -> &'static str {
+    "Hello from nar-bridge"
+}
+
+async fn four_o_four() -> Result<(), StatusCode> {
+    Err(StatusCode::NOT_FOUND)
+}
+
+async fn nix_cache_info(priority: u64) -> impl IntoResponse {
+    (
+        [("Content-Type", nix_http::MIME_TYPE_CACHE_INFO)],
+        format!(
+            "StoreDir: /nix/store\nWantMassQuery: 1\nPriority: {}\n",
+            priority
+        ),
+    )
+}
diff --git a/tvix/nar-bridge/src/nar.rs b/tvix/nar-bridge/src/nar.rs
new file mode 100644
index 000000000000..292be2b1c5ec
--- /dev/null
+++ b/tvix/nar-bridge/src/nar.rs
@@ -0,0 +1,203 @@
+use axum::extract::Query;
+use axum::http::{Response, StatusCode};
+use axum::{body::Body, response::IntoResponse};
+use axum_extra::{headers::Range, TypedHeader};
+use axum_range::{KnownSize, Ranged};
+use bytes::Bytes;
+use data_encoding::BASE64URL_NOPAD;
+use futures::TryStreamExt;
+use nix_compat::{nix_http, nixbase32};
+use serde::Deserialize;
+use std::io;
+use tokio_util::io::ReaderStream;
+use tracing::{instrument, warn, Span};
+use tvix_store::nar::ingest_nar_and_hash;
+
+use crate::AppState;
+
+#[derive(Debug, Deserialize)]
+pub(crate) struct GetNARParams {
+    #[serde(rename = "narsize")]
+    nar_size: u64,
+}
+
+#[instrument(skip(blob_service, directory_service))]
+pub async fn get_head(
+    method: axum::http::Method,
+    ranges: Option<TypedHeader<Range>>,
+    axum::extract::Path(root_node_enc): axum::extract::Path<String>,
+    axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
+    axum::extract::State(AppState {
+        blob_service,
+        directory_service,
+        ..
+    }): axum::extract::State<AppState>,
+) -> Result<impl axum::response::IntoResponse, StatusCode> {
+    use prost::Message;
+    // b64decode the root node passed *by the user*
+    let root_node_proto = BASE64URL_NOPAD
+        .decode(root_node_enc.as_bytes())
+        .map_err(|e| {
+            warn!(err=%e, "unable to decode root node b64");
+            StatusCode::NOT_FOUND
+        })?;
+
+    // check the proto size to be somewhat reasonable before parsing it.
+    if root_node_proto.len() > 4096 {
+        warn!("rejected too large root node");
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    // parse the proto
+    let root_node: tvix_castore::proto::Node = Message::decode(Bytes::from(root_node_proto))
+        .map_err(|e| {
+            warn!(err=%e, "unable to decode root node proto");
+            StatusCode::NOT_FOUND
+        })?;
+
+    let root_node = root_node.try_into_anonymous_node().map_err(|e| {
+        warn!(err=%e, "root node validation failed");
+        StatusCode::BAD_REQUEST
+    })?;
+
+    Ok((
+        // headers
+        [
+            ("cache-control", "max-age=31536000, immutable"),
+            ("content-type", nix_http::MIME_TYPE_NAR),
+        ],
+        if method == axum::http::Method::HEAD {
+            // If this is a HEAD request, construct a response returning back the
+            // user-provided content-length, but don't actually talk to castore.
+            Response::builder()
+                .header("content-length", nar_size)
+                .body(Body::empty())
+                .unwrap()
+        } else if let Some(TypedHeader(ranges)) = ranges {
+            // If this is a range request, construct a seekable NAR reader.
+            let r =
+                tvix_store::nar::seekable::Reader::new(root_node, blob_service, directory_service)
+                    .await
+                    .map_err(|e| {
+                        warn!(err=%e, "failed to construct seekable nar reader");
+                        StatusCode::INTERNAL_SERVER_ERROR
+                    })?;
+
+            // ensure the user-supplied nar size was correct, no point returning data otherwise.
+            if r.stream_len() != nar_size {
+                warn!(
+                    actual_nar_size = r.stream_len(),
+                    supplied_nar_size = nar_size,
+                    "wrong nar size supplied"
+                );
+                return Err(StatusCode::BAD_REQUEST);
+            }
+            Ranged::new(Some(ranges), KnownSize::sized(r, nar_size)).into_response()
+        } else {
+            // No range(s) requested: use the non-seekable codepath, as it uses less memory.
+            // NOTE(review): content-length below is the unverified user-supplied nar_size.
+            let (w, r) = tokio::io::duplex(1024 * 8);
+
+            // spawn a task rendering the NAR to the client.
+            tokio::spawn(async move {
+                if let Err(e) =
+                    tvix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
+                {
+                    warn!(err=%e, "failed to write out NAR");
+                }
+            });
+
+            Response::builder()
+                .header("content-length", nar_size)
+                .body(Body::from_stream(ReaderStream::new(r)))
+                .unwrap()
+        },
+    ))
+}
+
+/// Handler to respond to HEAD requests for recently uploaded NAR files.
+/// Nix probes at {narhash}.nar[.compression_suffix] to determine whether a NAR
+/// has already been uploaded, by responding to (some of) these requests we
+/// avoid it unnecessarily uploading.
+/// We don't keep a full K/V from NAR hash to root node around, only the
+/// in-memory cache used to connect to the castore node when processing a PUT
+/// for the NARInfo.
+#[instrument(skip_all, fields(nar_str))]
+pub async fn head_root_nodes(
+    axum::extract::Path(nar_str): axum::extract::Path<String>,
+    axum::extract::State(AppState { root_nodes, .. }): axum::extract::State<AppState>,
+) -> Result<impl axum::response::IntoResponse, StatusCode> {
+    let (nar_hash, compression_suffix) =
+        nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
+
+    // No paths with compression suffix are supported.
+    if !compression_suffix.is_empty() {
+        warn!(%compression_suffix, "invalid compression suffix requested");
+        return Err(StatusCode::UNAUTHORIZED);
+    }
+
+    // Check root_nodes via `get`, which also moves the entry to most recently
+    // used, as it might be referred to in a subsequent NARInfo upload.
+    if root_nodes.write().get(&nar_hash).is_some() {
+        Ok("")
+    } else {
+        Err(StatusCode::NOT_FOUND)
+    }
+}
+
+#[instrument(skip(blob_service, directory_service, request))]
+pub async fn put(
+    axum::extract::Path(nar_str): axum::extract::Path<String>,
+    axum::extract::State(AppState {
+        blob_service,
+        directory_service,
+        root_nodes,
+        ..
+    }): axum::extract::State<AppState>,
+    request: axum::extract::Request,
+) -> Result<&'static str, StatusCode> {
+    let (nar_hash_expected, compression_suffix) =
+        nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
+
+    // No paths with compression suffix are supported.
+    if !compression_suffix.is_empty() {
+        warn!(%compression_suffix, "invalid compression suffix requested");
+        return Err(StatusCode::UNAUTHORIZED);
+    }
+
+    let s = request.into_body().into_data_stream();
+
+    let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
+        warn!(err=%e, "failed to read request body");
+        io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+    }));
+
+    // ingest the NAR
+    let (root_node, nar_hash_actual, nar_size) =
+        ingest_nar_and_hash(blob_service.clone(), directory_service.clone(), &mut r)
+            .await
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+            .map_err(|e| {
+                warn!(err=%e, "failed to ingest nar");
+                StatusCode::INTERNAL_SERVER_ERROR
+            })?;
+
+    let s = Span::current();
+    s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
+    s.record("nar_size", nar_size);
+
+    if nar_hash_expected != nar_hash_actual {
+        warn!(
+            nar_hash.expected = nixbase32::encode(&nar_hash_expected),
+            nar_hash.actual = nixbase32::encode(&nar_hash_actual),
+            "nar hash mismatch"
+        );
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    // store mapping of narhash to root node into root_nodes.
+    // we need it later to populate the root node when accepting the PathInfo.
+    root_nodes.write().put(nar_hash_actual, root_node);
+
+    Ok("")
+}
diff --git a/tvix/nar-bridge/src/narinfo.rs b/tvix/nar-bridge/src/narinfo.rs
new file mode 100644
index 000000000000..76fda1d495c5
--- /dev/null
+++ b/tvix/nar-bridge/src/narinfo.rs
@@ -0,0 +1,153 @@
+use axum::{http::StatusCode, response::IntoResponse};
+use bytes::Bytes;
+use nix_compat::{
+    narinfo::{NarInfo, Signature},
+    nix_http, nixbase32,
+    store_path::StorePath,
+};
+use prost::Message;
+use tracing::{instrument, warn, Span};
+use tvix_castore::proto::{self as castorepb};
+use tvix_store::pathinfoservice::PathInfo;
+
+use crate::AppState;
+
+/// The size limit for NARInfo uploads nar-bridge receives
+const NARINFO_LIMIT: usize = 2 * 1024 * 1024;
+
+#[instrument(skip(path_info_service))]
+pub async fn head(
+    axum::extract::Path(narinfo_str): axum::extract::Path<String>,
+    axum::extract::State(AppState {
+        path_info_service, ..
+    }): axum::extract::State<AppState>,
+) -> Result<impl IntoResponse, StatusCode> {
+    let digest = nix_http::parse_narinfo_str(&narinfo_str).ok_or(StatusCode::NOT_FOUND)?;
+    Span::current().record("path_info.digest", &narinfo_str[0..32]);
+
+    if path_info_service
+        .get(digest)
+        .await
+        .map_err(|e| {
+            warn!(err=%e, "failed to get PathInfo");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .is_some()
+    {
+        Ok(([("content-type", nix_http::MIME_TYPE_NARINFO)], ""))
+    } else {
+        warn!("PathInfo not found");
+        Err(StatusCode::NOT_FOUND)
+    }
+}
+
+#[instrument(skip(path_info_service))]
+pub async fn get(
+    axum::extract::Path(narinfo_str): axum::extract::Path<String>,
+    axum::extract::State(AppState {
+        path_info_service, ..
+    }): axum::extract::State<AppState>,
+) -> Result<impl IntoResponse, StatusCode> {
+    let digest = nix_http::parse_narinfo_str(&narinfo_str).ok_or(StatusCode::NOT_FOUND)?;
+    Span::current().record("path_info.digest", &narinfo_str[0..32]);
+
+    // fetch the PathInfo
+    let path_info = path_info_service
+        .get(digest)
+        .await
+        .map_err(|e| {
+            warn!(err=%e, "failed to get PathInfo");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    let url = format!(
+        "nar/tvix-castore/{}?narsize={}",
+        data_encoding::BASE64URL_NOPAD.encode(
+            &castorepb::Node::from_name_and_node("".into(), path_info.node.clone()).encode_to_vec()
+        ),
+        path_info.nar_size,
+    );
+
+    let mut narinfo = path_info.to_narinfo();
+    narinfo.url = &url;
+
+    Ok((
+        [("content-type", nix_http::MIME_TYPE_NARINFO)],
+        narinfo.to_string(),
+    ))
+}
+
+#[instrument(skip(path_info_service, root_nodes, request))]
+pub async fn put(
+    axum::extract::Path(narinfo_str): axum::extract::Path<String>,
+    axum::extract::State(AppState {
+        path_info_service,
+        root_nodes,
+        ..
+    }): axum::extract::State<AppState>,
+    request: axum::extract::Request,
+) -> Result<&'static str, StatusCode> {
+    // Reject unparseable names up front (as `get`/`head` do) before slicing the string below.
+    let _digest = nix_http::parse_narinfo_str(&narinfo_str).ok_or(StatusCode::UNAUTHORIZED)?;
+    Span::current().record("path_info.digest", &narinfo_str[0..32]);
+
+    let narinfo_bytes: Bytes = axum::body::to_bytes(request.into_body(), NARINFO_LIMIT)
+        .await
+        .map_err(|e| {
+            warn!(err=%e, "unable to fetch body");
+            StatusCode::BAD_REQUEST
+        })?;
+
+    // The body has to be valid UTF-8 before it can be parsed as a narinfo.
+    let narinfo_str = std::str::from_utf8(narinfo_bytes.as_ref()).map_err(|e| {
+        warn!(err=%e, "unable decode body as string");
+        StatusCode::BAD_REQUEST
+    })?;
+
+    let narinfo = NarInfo::parse(narinfo_str).map_err(|e| {
+        warn!(err=%e, "unable to parse narinfo");
+        StatusCode::BAD_REQUEST
+    })?;
+
+    // Record the NARHash announced by the parsed NARInfo on the current span.
+    Span::current().record("path_info.nar_info", nixbase32::encode(&narinfo.nar_hash));
+
+    // Lookup root node with peek, as we don't want to update the LRU list.
+    // We need to be careful to not hold the RwLock across the await point.
+    let maybe_root_node = root_nodes.read().peek(&narinfo.nar_hash).cloned();
+
+    match maybe_root_node {
+        Some(root_node) => {
+            // Persist the PathInfo.
+            path_info_service
+                .put(PathInfo {
+                    store_path: narinfo.store_path.to_owned(),
+                    node: root_node,
+                    references: narinfo.references.iter().map(StorePath::to_owned).collect(),
+                    nar_sha256: narinfo.nar_hash,
+                    nar_size: narinfo.nar_size,
+                    signatures: narinfo
+                        .signatures
+                        .into_iter()
+                        .map(|s| {
+                            Signature::<String>::new(s.name().to_string(), s.bytes().to_owned())
+                        })
+                        .collect(),
+                    deriver: narinfo.deriver.as_ref().map(StorePath::to_owned),
+                    ca: narinfo.ca,
+                })
+                .await
+                .map_err(|e| {
+                    warn!(err=%e, "failed to persist the PathInfo");
+                    StatusCode::INTERNAL_SERVER_ERROR
+                })?;
+
+            Ok("")
+        }
+        None => {
+            warn!("received narinfo with unknown NARHash");
+            Err(StatusCode::BAD_REQUEST)
+        }
+    }
+}