about summary refs log tree commit diff
path: root/tvix/nar-bridge
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/nar-bridge')
-rw-r--r--tvix/nar-bridge/Cargo.toml40
-rw-r--r--tvix/nar-bridge/default.nix11
-rw-r--r--tvix/nar-bridge/src/bin/nar-bridge.rs84
-rw-r--r--tvix/nar-bridge/src/lib.rs50
-rw-r--r--tvix/nar-bridge/src/nar.rs77
-rw-r--r--tvix/nar-bridge/src/narinfo.rs131
6 files changed, 393 insertions, 0 deletions
diff --git a/tvix/nar-bridge/Cargo.toml b/tvix/nar-bridge/Cargo.toml
new file mode 100644
index 000000000000..920b81c45cbe
--- /dev/null
+++ b/tvix/nar-bridge/Cargo.toml
@@ -0,0 +1,40 @@
+[package]
+name = "nar-bridge"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+axum = { version = "0.7.5", features = ["http2"] }
+bytes = "1.4.0"
+clap = { version = "4.0", features = ["derive", "env"] }
+data-encoding = "2.3.3"
+itertools = "0.12.0"
+prost = "0.12.1"
+nix-compat = { path = "../nix-compat", features = ["async"] }
+thiserror = "1.0.56"
+tokio = { version = "1.32.0" }
+tokio-listener = { version = "0.4.2", features = [ "axum07", "clap", "multi-listener", "sd_listen" ] }
+tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
+tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
+tvix-castore = { path = "../castore" }
+tvix-store = { path = "../store" }
+tvix-tracing = { path = "../tracing", features = ["tonic"] }
+tracing = "0.1.37"
+tracing-subscriber = "0.3.16"
+url = "2.4.0"
+serde = { version = "1.0.204", features = ["derive"] }
+
+[build-dependencies]
+prost-build = "0.12.1"
+tonic-build = "0.11.0"
+
+[features]
+default = ["otlp"]
+otlp = ["tvix-tracing/otlp"]
+
+[dev-dependencies]
+hex-literal = "0.4.1"
+rstest = "0.19.0"
+
+[lints]
+workspace = true
diff --git a/tvix/nar-bridge/default.nix b/tvix/nar-bridge/default.nix
new file mode 100644
index 000000000000..3e116a1fc02b
--- /dev/null
+++ b/tvix/nar-bridge/default.nix
@@ -0,0 +1,11 @@
+{ depot, lib, ... }:
+
+(depot.tvix.crates.workspaceMembers.nar-bridge.build.override {
+  runTests = true;
+}).overrideAttrs (old: rec {
+  meta.ci.targets = lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru);
+  passthru = (depot.tvix.utils.mkFeaturePowerset {
+    inherit (old) crateName;
+    features = [ "otlp" ];
+  });
+})
diff --git a/tvix/nar-bridge/src/bin/nar-bridge.rs b/tvix/nar-bridge/src/bin/nar-bridge.rs
new file mode 100644
index 000000000000..e24068ca2300
--- /dev/null
+++ b/tvix/nar-bridge/src/bin/nar-bridge.rs
@@ -0,0 +1,84 @@
+use clap::Parser;
+use nar_bridge::AppState;
+use tracing::info;
+
+/// Expose the Nix HTTP Binary Cache protocol for a tvix-store.
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Cli {
+    #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+    blob_service_addr: String,
+
+    #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+    directory_service_addr: String,
+
+    #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+    path_info_service_addr: String,
+
+    /// The priority to announce at the `nix-cache-info` endpoint.
+    /// A lower number means it's *more preferred.
+    #[arg(long, env, default_value_t = 39)]
+    priority: u64,
+
+    /// The address to listen on.
+    #[clap(flatten)]
+    listen_args: tokio_listener::ListenerAddressLFlag,
+
+    #[cfg(feature = "otlp")]
+    /// Whether to configure OTLP. Set --otlp=false to disable.
+    #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))]
+    otlp: bool,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    let cli = Cli::parse();
+
+    let _tracing_handle = {
+        #[allow(unused_mut)]
+        let mut builder = tvix_tracing::TracingBuilder::default();
+        #[cfg(feature = "otlp")]
+        {
+            if cli.otlp {
+                builder = builder.enable_otlp("tvix.store");
+            }
+        }
+        builder.build()?
+    };
+
+    // initialize stores
+    let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+        tvix_store::utils::construct_services(
+            cli.blob_service_addr,
+            cli.directory_service_addr,
+            cli.path_info_service_addr,
+        )
+        .await?;
+
+    let state = AppState::new(blob_service, directory_service, path_info_service.into());
+
+    let app = nar_bridge::gen_router(cli.priority).with_state(state);
+
+    let listen_address = &cli.listen_args.listen_address.unwrap_or_else(|| {
+        "[::]:8000"
+            .parse()
+            .expect("invalid fallback listen address")
+    });
+
+    let listener = tokio_listener::Listener::bind(
+        listen_address,
+        &Default::default(),
+        &cli.listen_args.listener_options,
+    )
+    .await?;
+
+    info!(listen_address=%listen_address, "starting daemon");
+
+    tokio_listener::axum07::serve(
+        listener,
+        app.into_make_service_with_connect_info::<tokio_listener::SomeSocketAddrClonable>(),
+    )
+    .await?;
+
+    Ok(())
+}
diff --git a/tvix/nar-bridge/src/lib.rs b/tvix/nar-bridge/src/lib.rs
new file mode 100644
index 000000000000..5d9ec43c1cde
--- /dev/null
+++ b/tvix/nar-bridge/src/lib.rs
@@ -0,0 +1,50 @@
+use axum::routing::head;
+use axum::{routing::get, Router};
+use std::sync::Arc;
+use tvix_castore::blobservice::BlobService;
+use tvix_castore::directoryservice::DirectoryService;
+use tvix_store::pathinfoservice::PathInfoService;
+
+mod nar;
+mod narinfo;
+
+#[derive(Clone)]
+pub struct AppState {
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+    path_info_service: Arc<dyn PathInfoService>,
+}
+
+impl AppState {
+    pub fn new(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+        path_info_service: Arc<dyn PathInfoService>,
+    ) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+            path_info_service,
+        }
+    }
+}
+
+pub fn gen_router(priority: u64) -> Router<AppState> {
+    Router::new()
+        .route("/", get(root))
+        .route("/nar/tvix-castore/:root_node_enc", get(nar::get))
+        .route("/:narinfo_str", get(narinfo::get))
+        .route("/:narinfo_str", head(narinfo::head))
+        .route("/nix-cache-info", get(move || nix_cache_info(priority)))
+}
+
+async fn root() -> &'static str {
+    "Hello from nar-bridge"
+}
+
+async fn nix_cache_info(priority: u64) -> String {
+    format!(
+        "StoreDir: /nix/store\nWantMassQuery: 1\nPriority: {}\n",
+        priority
+    )
+}
diff --git a/tvix/nar-bridge/src/nar.rs b/tvix/nar-bridge/src/nar.rs
new file mode 100644
index 000000000000..63cd86e49969
--- /dev/null
+++ b/tvix/nar-bridge/src/nar.rs
@@ -0,0 +1,77 @@
+use axum::body::Body;
+use axum::extract::Query;
+use axum::http::StatusCode;
+use axum::response::Response;
+use bytes::Bytes;
+use data_encoding::BASE64URL_NOPAD;
+use serde::Deserialize;
+use tokio_util::io::ReaderStream;
+use tracing::{instrument, warn};
+
+use crate::AppState;
+
+#[derive(Debug, Deserialize)]
+pub(crate) struct GetNARParams {
+    #[serde(rename = "narsize")]
+    nar_size: u64,
+}
+
+#[instrument(skip(blob_service, directory_service))]
+pub async fn get(
+    axum::extract::Path(root_node_enc): axum::extract::Path<String>,
+    axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
+    axum::extract::State(AppState {
+        blob_service,
+        directory_service,
+        ..
+    }): axum::extract::State<AppState>,
+) -> Result<Response, StatusCode> {
+    use prost::Message;
+    // b64decode the root node passed *by the user*
+    let root_node_proto = BASE64URL_NOPAD
+        .decode(root_node_enc.as_bytes())
+        .map_err(|e| {
+            warn!(err=%e, "unable to decode root node b64");
+            StatusCode::NOT_FOUND
+        })?;
+
+    // check the proto size to be somewhat reasonable before parsing it.
+    if root_node_proto.len() > 4096 {
+        warn!("rejected too large root node");
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    // parse the proto
+    let root_node: tvix_castore::proto::Node = Message::decode(Bytes::from(root_node_enc))
+        .map_err(|e| {
+            warn!(err=%e, "unable to decode root node proto");
+            StatusCode::NOT_FOUND
+        })?;
+
+    // validate it.
+    let root_node = root_node
+        .validate()
+        .map_err(|e| {
+            warn!(err=%e, "root node validation failed");
+            StatusCode::BAD_REQUEST
+        })?
+        .to_owned();
+
+    let (w, r) = tokio::io::duplex(1024 * 8);
+
+    // spawn a task rendering the NAR to the client
+    tokio::spawn(async move {
+        if let Err(e) =
+            tvix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
+        {
+            warn!(err=%e, "failed to write out NAR");
+        }
+    });
+
+    Ok(Response::builder()
+        .status(StatusCode::OK)
+        .header("cache-control", "max-age=31536000, immutable")
+        .header("content-length", nar_size)
+        .body(Body::from_stream(ReaderStream::new(r)))
+        .unwrap())
+}
diff --git a/tvix/nar-bridge/src/narinfo.rs b/tvix/nar-bridge/src/narinfo.rs
new file mode 100644
index 000000000000..7b6c1bdfdccf
--- /dev/null
+++ b/tvix/nar-bridge/src/narinfo.rs
@@ -0,0 +1,131 @@
+use axum::http::StatusCode;
+use nix_compat::nixbase32;
+use tracing::{instrument, warn, Span};
+use tvix_castore::proto::node::Node;
+
+use crate::AppState;
+
+#[instrument(skip(path_info_service))]
+pub async fn head(
+    axum::extract::Path(narinfo_str): axum::extract::Path<String>,
+    axum::extract::State(AppState {
+        path_info_service, ..
+    }): axum::extract::State<AppState>,
+) -> Result<&'static str, StatusCode> {
+    let digest = parse_narinfo_str(&narinfo_str)?;
+    Span::current().record("path_info.digest", &narinfo_str[0..32]);
+
+    if path_info_service
+        .get(digest)
+        .await
+        .map_err(|e| {
+            warn!(err=%e, "failed to get PathInfo");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .is_some()
+    {
+        Ok("")
+    } else {
+        warn!("PathInfo not found");
+        Err(StatusCode::NOT_FOUND)
+    }
+}
+
+#[instrument(skip(path_info_service))]
+pub async fn get(
+    axum::extract::Path(narinfo_str): axum::extract::Path<String>,
+    axum::extract::State(AppState {
+        path_info_service, ..
+    }): axum::extract::State<AppState>,
+) -> Result<String, StatusCode> {
+    let digest = parse_narinfo_str(&narinfo_str)?;
+    Span::current().record("path_info.digest", &narinfo_str[0..32]);
+
+    // fetch the PathInfo
+    let path_info = path_info_service
+        .get(digest)
+        .await
+        .map_err(|e| {
+            warn!(err=%e, "failed to get PathInfo");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    let store_path = path_info.validate().map_err(|e| {
+        warn!(err=%e, "invalid PathInfo");
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?;
+
+    let mut narinfo = path_info.to_narinfo(store_path).ok_or_else(|| {
+        warn!(path_info=?path_info, "PathInfo contained no NAR data");
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?;
+
+    // encode the (unnamed) root node in the NAR url itself.
+    let root_node = path_info
+        .node
+        .as_ref()
+        .and_then(|n| n.node.as_ref())
+        .expect("root node must not be none")
+        .clone()
+        .rename("".into());
+
+    let mut buf = Vec::new();
+    Node::encode(&root_node, &mut buf);
+
+    let url = format!(
+        "nar/tvix-castore/{}?narsize={}",
+        data_encoding::BASE64URL_NOPAD.encode(&buf),
+        narinfo.nar_size,
+    );
+
+    narinfo.url = &url;
+
+    Ok(narinfo.to_string())
+}
+
+/// Parses a `3mzh8lvgbynm9daj7c82k2sfsfhrsfsy.narinfo` string and returns the
+/// nixbase32-decoded digest.
+fn parse_narinfo_str(s: &str) -> Result<[u8; 20], StatusCode> {
+    if !s.is_char_boundary(32) {
+        warn!("invalid string, no char boundary at 32");
+        return Err(StatusCode::NOT_FOUND);
+    }
+
+    Ok(match s.split_at(32) {
+        (hash_str, ".narinfo") => {
+            // we know this is 32 bytes
+            let hash_str_fixed: [u8; 32] = hash_str.as_bytes().try_into().unwrap();
+            nixbase32::decode_fixed(hash_str_fixed).map_err(|e| {
+                warn!(err=%e, "invalid digest");
+                StatusCode::NOT_FOUND
+            })?
+        }
+        _ => {
+            warn!("invalid string");
+            return Err(StatusCode::NOT_FOUND);
+        }
+    })
+}
+
+#[cfg(test)]
+mod test {
+    use super::parse_narinfo_str;
+    use hex_literal::hex;
+
+    #[test]
+    fn success() {
+        assert_eq!(
+            hex!("8a12321522fd91efbd60ebb2481af88580f61600"),
+            parse_narinfo_str("00bgd045z0d4icpbc2yyz4gx48ak44la.narinfo").unwrap()
+        );
+    }
+
+    #[test]
+    fn failure() {
+        assert!(parse_narinfo_str("00bgd045z0d4icpbc2yyz4gx48ak44la").is_err());
+        assert!(parse_narinfo_str("/00bgd045z0d4icpbc2yyz4gx48ak44la").is_err());
+        assert!(parse_narinfo_str("000000").is_err());
+        assert!(parse_narinfo_str("00bgd045z0d4icpbc2yyz4gx48ak44l🦊.narinfo").is_err());
+    }
+}