diff options
Diffstat (limited to 'tvix/nar-bridge')
-rw-r--r-- | tvix/nar-bridge/Cargo.toml | 40 | ||||
-rw-r--r-- | tvix/nar-bridge/default.nix | 11 | ||||
-rw-r--r-- | tvix/nar-bridge/src/bin/nar-bridge.rs | 84 | ||||
-rw-r--r-- | tvix/nar-bridge/src/lib.rs | 50 | ||||
-rw-r--r-- | tvix/nar-bridge/src/nar.rs | 77 | ||||
-rw-r--r-- | tvix/nar-bridge/src/narinfo.rs | 131 |
6 files changed, 393 insertions, 0 deletions
diff --git a/tvix/nar-bridge/Cargo.toml b/tvix/nar-bridge/Cargo.toml new file mode 100644 index 000000000000..920b81c45cbe --- /dev/null +++ b/tvix/nar-bridge/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "nar-bridge" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = { version = "0.7.5", features = ["http2"] } +bytes = "1.4.0" +clap = { version = "4.0", features = ["derive", "env"] } +data-encoding = "2.3.3" +itertools = "0.12.0" +prost = "0.12.1" +nix-compat = { path = "../nix-compat", features = ["async"] } +thiserror = "1.0.56" +tokio = { version = "1.32.0" } +tokio-listener = { version = "0.4.2", features = [ "axum07", "clap", "multi-listener", "sd_listen" ] } +tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] } +tonic = { version = "0.11.0", features = ["tls", "tls-roots"] } +tvix-castore = { path = "../castore" } +tvix-store = { path = "../store" } +tvix-tracing = { path = "../tracing", features = ["tonic"] } +tracing = "0.1.37" +tracing-subscriber = "0.3.16" +url = "2.4.0" +serde = { version = "1.0.204", features = ["derive"] } + +[build-dependencies] +prost-build = "0.12.1" +tonic-build = "0.11.0" + +[features] +default = ["otlp"] +otlp = ["tvix-tracing/otlp"] + +[dev-dependencies] +hex-literal = "0.4.1" +rstest = "0.19.0" + +[lints] +workspace = true diff --git a/tvix/nar-bridge/default.nix b/tvix/nar-bridge/default.nix new file mode 100644 index 000000000000..3e116a1fc02b --- /dev/null +++ b/tvix/nar-bridge/default.nix @@ -0,0 +1,11 @@ +{ depot, lib, ... }: + +(depot.tvix.crates.workspaceMembers.nar-bridge.build.override { + runTests = true; +}).overrideAttrs (old: rec { + meta.ci.targets = lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru); + passthru = (depot.tvix.utils.mkFeaturePowerset { + inherit (old) crateName; + features = [ "otlp" ]; + }); +}) diff --git a/tvix/nar-bridge/src/bin/nar-bridge.rs b/tvix/nar-bridge/src/bin/nar-bridge.rs new file mode 100644 index 000000000000..e24068ca2300 --- /dev/null +++ b/tvix/nar-bridge/src/bin/nar-bridge.rs @@ -0,0 +1,84 @@ +use clap::Parser; +use nar_bridge::AppState; +use tracing::info; + +/// Expose the Nix HTTP Binary Cache protocol for a tvix-store. +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// The priority to announce at the `nix-cache-info` endpoint. + /// A lower number means it's *more preferred. + #[arg(long, env, default_value_t = 39)] + priority: u64, + + /// The address to listen on. + #[clap(flatten)] + listen_args: tokio_listener::ListenerAddressLFlag, + + #[cfg(feature = "otlp")] + /// Whether to configure OTLP. Set --otlp=false to disable. + #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))] + otlp: bool, +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { + let cli = Cli::parse(); + + let _tracing_handle = { + #[allow(unused_mut)] + let mut builder = tvix_tracing::TracingBuilder::default(); + #[cfg(feature = "otlp")] + { + if cli.otlp { + builder = builder.enable_otlp("tvix.store"); + } + } + builder.build()? + }; + + // initialize stores + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = + tvix_store::utils::construct_services( + cli.blob_service_addr, + cli.directory_service_addr, + cli.path_info_service_addr, + ) + .await?; + + let state = AppState::new(blob_service, directory_service, path_info_service.into()); + + let app = nar_bridge::gen_router(cli.priority).with_state(state); + + let listen_address = &cli.listen_args.listen_address.unwrap_or_else(|| { + "[::]:8000" + .parse() + .expect("invalid fallback listen address") + }); + + let listener = tokio_listener::Listener::bind( + listen_address, + &Default::default(), + &cli.listen_args.listener_options, + ) + .await?; + + info!(listen_address=%listen_address, "starting daemon"); + + tokio_listener::axum07::serve( + listener, + app.into_make_service_with_connect_info::<tokio_listener::SomeSocketAddrClonable>(), + ) + .await?; + + Ok(()) +} diff --git a/tvix/nar-bridge/src/lib.rs b/tvix/nar-bridge/src/lib.rs new file mode 100644 index 000000000000..5d9ec43c1cde --- /dev/null +++ b/tvix/nar-bridge/src/lib.rs @@ -0,0 +1,50 @@ +use axum::routing::head; +use axum::{routing::get, Router}; +use std::sync::Arc; +use tvix_castore::blobservice::BlobService; +use tvix_castore::directoryservice::DirectoryService; +use tvix_store::pathinfoservice::PathInfoService; + +mod nar; +mod narinfo; + +#[derive(Clone)] +pub struct AppState { + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + path_info_service: Arc<dyn PathInfoService>, +} + +impl AppState { + pub fn new( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + path_info_service: Arc<dyn PathInfoService>, + ) -> Self { + Self { + blob_service, + directory_service, + path_info_service, + } + } +} + +pub fn gen_router(priority: u64) -> Router<AppState> { + Router::new() + .route("/", get(root)) + .route("/nar/tvix-castore/:root_node_enc", get(nar::get)) + .route("/:narinfo_str", get(narinfo::get)) + .route("/:narinfo_str", head(narinfo::head)) + .route("/nix-cache-info", get(move || nix_cache_info(priority))) +} + +async fn root() -> &'static str { + "Hello from nar-bridge" +} + +async fn nix_cache_info(priority: u64) -> String { + format!( + "StoreDir: /nix/store\nWantMassQuery: 1\nPriority: {}\n", + priority + ) +} diff --git a/tvix/nar-bridge/src/nar.rs b/tvix/nar-bridge/src/nar.rs new file mode 100644 index 000000000000..63cd86e49969 --- /dev/null +++ b/tvix/nar-bridge/src/nar.rs @@ -0,0 +1,77 @@ +use axum::body::Body; +use axum::extract::Query; +use axum::http::StatusCode; +use axum::response::Response; +use bytes::Bytes; +use data_encoding::BASE64URL_NOPAD; +use serde::Deserialize; +use tokio_util::io::ReaderStream; +use tracing::{instrument, warn}; + +use crate::AppState; + +#[derive(Debug, Deserialize)] +pub(crate) struct GetNARParams { + #[serde(rename = "narsize")] + nar_size: u64, +} + +#[instrument(skip(blob_service, directory_service))] +pub async fn get( + axum::extract::Path(root_node_enc): axum::extract::Path<String>, + axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>, + axum::extract::State(AppState { + blob_service, + directory_service, + .. + }): axum::extract::State<AppState>, +) -> Result<Response, StatusCode> { + use prost::Message; + // b64decode the root node passed *by the user* + let root_node_proto = BASE64URL_NOPAD + .decode(root_node_enc.as_bytes()) + .map_err(|e| { + warn!(err=%e, "unable to decode root node b64"); + StatusCode::NOT_FOUND + })?; + + // check the proto size to be somewhat reasonable before parsing it. + if root_node_proto.len() > 4096 { + warn!("rejected too large root node"); + return Err(StatusCode::BAD_REQUEST); + } + + // parse the proto + let root_node: tvix_castore::proto::Node = Message::decode(Bytes::from(root_node_enc)) + .map_err(|e| { + warn!(err=%e, "unable to decode root node proto"); + StatusCode::NOT_FOUND + })?; + + // validate it. + let root_node = root_node + .validate() + .map_err(|e| { + warn!(err=%e, "root node validation failed"); + StatusCode::BAD_REQUEST + })? + .to_owned(); + + let (w, r) = tokio::io::duplex(1024 * 8); + + // spawn a task rendering the NAR to the client + tokio::spawn(async move { + if let Err(e) = + tvix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await + { + warn!(err=%e, "failed to write out NAR"); + } + }); + + Ok(Response::builder() + .status(StatusCode::OK) + .header("cache-control", "max-age=31536000, immutable") + .header("content-length", nar_size) + .body(Body::from_stream(ReaderStream::new(r))) + .unwrap()) +} diff --git a/tvix/nar-bridge/src/narinfo.rs b/tvix/nar-bridge/src/narinfo.rs new file mode 100644 index 000000000000..7b6c1bdfdccf --- /dev/null +++ b/tvix/nar-bridge/src/narinfo.rs @@ -0,0 +1,131 @@ +use axum::http::StatusCode; +use nix_compat::nixbase32; +use tracing::{instrument, warn, Span}; +use tvix_castore::proto::node::Node; + +use crate::AppState; + +#[instrument(skip(path_info_service))] +pub async fn head( + axum::extract::Path(narinfo_str): axum::extract::Path<String>, + axum::extract::State(AppState { + path_info_service, .. + }): axum::extract::State<AppState>, +) -> Result<&'static str, StatusCode> { + let digest = parse_narinfo_str(&narinfo_str)?; + Span::current().record("path_info.digest", &narinfo_str[0..32]); + + if path_info_service + .get(digest) + .await + .map_err(|e| { + warn!(err=%e, "failed to get PathInfo"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .is_some() + { + Ok("") + } else { + warn!("PathInfo not found"); + Err(StatusCode::NOT_FOUND) + } +} + +#[instrument(skip(path_info_service))] +pub async fn get( + axum::extract::Path(narinfo_str): axum::extract::Path<String>, + axum::extract::State(AppState { + path_info_service, .. + }): axum::extract::State<AppState>, +) -> Result<String, StatusCode> { + let digest = parse_narinfo_str(&narinfo_str)?; + Span::current().record("path_info.digest", &narinfo_str[0..32]); + + // fetch the PathInfo + let path_info = path_info_service + .get(digest) + .await + .map_err(|e| { + warn!(err=%e, "failed to get PathInfo"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .ok_or(StatusCode::NOT_FOUND)?; + + let store_path = path_info.validate().map_err(|e| { + warn!(err=%e, "invalid PathInfo"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let mut narinfo = path_info.to_narinfo(store_path).ok_or_else(|| { + warn!(path_info=?path_info, "PathInfo contained no NAR data"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + // encode the (unnamed) root node in the NAR url itself. + let root_node = path_info + .node + .as_ref() + .and_then(|n| n.node.as_ref()) + .expect("root node must not be none") + .clone() + .rename("".into()); + + let mut buf = Vec::new(); + Node::encode(&root_node, &mut buf); + + let url = format!( + "nar/tvix-castore/{}?narsize={}", + data_encoding::BASE64URL_NOPAD.encode(&buf), + narinfo.nar_size, + ); + + narinfo.url = &url; + + Ok(narinfo.to_string()) +} + +/// Parses a `3mzh8lvgbynm9daj7c82k2sfsfhrsfsy.narinfo` string and returns the +/// nixbase32-decoded digest. +fn parse_narinfo_str(s: &str) -> Result<[u8; 20], StatusCode> { + if !s.is_char_boundary(32) { + warn!("invalid string, no char boundary at 32"); + return Err(StatusCode::NOT_FOUND); + } + + Ok(match s.split_at(32) { + (hash_str, ".narinfo") => { + // we know this is 32 bytes + let hash_str_fixed: [u8; 32] = hash_str.as_bytes().try_into().unwrap(); + nixbase32::decode_fixed(hash_str_fixed).map_err(|e| { + warn!(err=%e, "invalid digest"); + StatusCode::NOT_FOUND + })? + } + _ => { + warn!("invalid string"); + return Err(StatusCode::NOT_FOUND); + } + }) +} + +#[cfg(test)] +mod test { + use super::parse_narinfo_str; + use hex_literal::hex; + + #[test] + fn success() { + assert_eq!( + hex!("8a12321522fd91efbd60ebb2481af88580f61600"), + parse_narinfo_str("00bgd045z0d4icpbc2yyz4gx48ak44la.narinfo").unwrap() + ); + } + + #[test] + fn failure() { + assert!(parse_narinfo_str("00bgd045z0d4icpbc2yyz4gx48ak44la").is_err()); + assert!(parse_narinfo_str("/00bgd045z0d4icpbc2yyz4gx48ak44la").is_err()); + assert!(parse_narinfo_str("000000").is_err()); + assert!(parse_narinfo_str("00bgd045z0d4icpbc2yyz4gx48ak44l🦊.narinfo").is_err()); + } +} |