Diffstat (limited to 'tvix/nar-bridge/src')
-rw-r--r-- | tvix/nar-bridge/src/bin/nar-bridge.rs |  92
-rw-r--r-- | tvix/nar-bridge/src/lib.rs            |  85
-rw-r--r-- | tvix/nar-bridge/src/nar.rs            | 177
-rw-r--r-- | tvix/nar-bridge/src/narinfo.rs        | 153
4 files changed, 507 insertions, 0 deletions
diff --git a/tvix/nar-bridge/src/bin/nar-bridge.rs b/tvix/nar-bridge/src/bin/nar-bridge.rs
new file mode 100644
index 000000000000..48eba0ac44ac
--- /dev/null
+++ b/tvix/nar-bridge/src/bin/nar-bridge.rs
@@ -0,0 +1,92 @@
+use clap::Parser;
+use mimalloc::MiMalloc;
+use nar_bridge::AppState;
+use tower::ServiceBuilder;
+use tower_http::trace::{DefaultMakeSpan, TraceLayer};
+use tracing::info;
+use tvix_store::utils::ServiceUrlsGrpc;
+
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
+/// Expose the Nix HTTP Binary Cache protocol for a tvix-store.
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Cli {
+    #[clap(flatten)]
+    service_addrs: ServiceUrlsGrpc,
+
+    /// The priority to announce at the `nix-cache-info` endpoint.
+    /// A lower number means it's *more* preferred.
+    #[arg(long, env, default_value_t = 39)]
+    priority: u64,
+
+    /// The address to listen on.
+    #[clap(flatten)]
+    listen_args: tokio_listener::ListenerAddressLFlag,
+
+    #[cfg(feature = "otlp")]
+    /// Whether to configure OTLP. Set --otlp=false to disable.
+    #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))]
+    otlp: bool,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    let cli = Cli::parse();
+
+    let _tracing_handle = {
+        #[allow(unused_mut)]
+        let mut builder = tvix_tracing::TracingBuilder::default();
+        #[cfg(feature = "otlp")]
+        {
+            if cli.otlp {
+                builder = builder.enable_otlp("tvix.store");
+            }
+        }
+        builder.build()?
+    };
+
+    // initialize stores
+    let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+        tvix_store::utils::construct_services(cli.service_addrs).await?;
+
+    let state = AppState::new(blob_service, directory_service, path_info_service);
+
+    let app = nar_bridge::gen_router(cli.priority)
+        .layer(
+            ServiceBuilder::new()
+                .layer(
+                    TraceLayer::new_for_http().make_span_with(
+                        DefaultMakeSpan::new()
+                            .level(tracing::Level::INFO)
+                            .include_headers(true),
+                    ),
+                )
+                .map_request(tvix_tracing::propagate::axum::accept_trace),
+        )
+        .with_state(state);
+
+    let listen_address = &cli.listen_args.listen_address.unwrap_or_else(|| {
+        "[::]:9000"
+            .parse()
+            .expect("invalid fallback listen address")
+    });
+
+    let listener = tokio_listener::Listener::bind(
+        listen_address,
+        &Default::default(),
+        &cli.listen_args.listener_options,
+    )
+    .await?;
+
+    info!(listen_address=%listen_address, "starting daemon");
+
+    tokio_listener::axum07::serve(
+        listener,
+        app.into_make_service_with_connect_info::<tokio_listener::SomeSocketAddrClonable>(),
+    )
+    .await?;
+
+    Ok(())
+}
diff --git a/tvix/nar-bridge/src/lib.rs b/tvix/nar-bridge/src/lib.rs
new file mode 100644
index 000000000000..c4a6c8d5f2dc
--- /dev/null
+++ b/tvix/nar-bridge/src/lib.rs
@@ -0,0 +1,85 @@
+use axum::http::StatusCode;
+use axum::response::IntoResponse;
+use axum::routing::{head, put};
+use axum::{routing::get, Router};
+use lru::LruCache;
+use nix_compat::nix_http;
+use parking_lot::RwLock;
+use std::num::NonZeroUsize;
+use std::sync::Arc;
+use tvix_castore::blobservice::BlobService;
+use tvix_castore::directoryservice::DirectoryService;
+use tvix_castore::Node;
+use tvix_store::pathinfoservice::PathInfoService;
+
+mod nar;
+mod narinfo;
+
+/// The capacity of the lookup table from NarHash to [Node].
+/// Should be bigger than the number of concurrent NAR uploads.
+/// Cannot be [NonZeroUsize] here due to rust-analyzer going bananas.
+/// SAFETY: 1000 != 0
+const ROOT_NODES_CACHE_CAPACITY: usize = 1000;
+
+#[derive(Clone)]
+pub struct AppState {
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+    path_info_service: Arc<dyn PathInfoService>,
+
+    /// Lookup table from NarHash to [Node], necessary to populate the root_node
+    /// field of the PathInfo when processing the narinfo upload.
+    root_nodes: Arc<RwLock<LruCache<[u8; 32], Node>>>,
+}
+
+impl AppState {
+    pub fn new(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+        path_info_service: Arc<dyn PathInfoService>,
+    ) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+            path_info_service,
+            root_nodes: Arc::new(RwLock::new(LruCache::new({
+                // SAFETY: 1000 != 0
+                unsafe { NonZeroUsize::new_unchecked(ROOT_NODES_CACHE_CAPACITY) }
+            }))),
+        }
+    }
+}
+
+pub fn gen_router(priority: u64) -> Router<AppState> {
+    Router::new()
+        .route("/", get(root))
+        // FUTUREWORK: respond for NARs that we still have in root_nodes (at least HEAD)
+        // This avoids some unnecessary NAR uploading from multiple concurrent clients, and is cheap.
+        .route("/nar/:nar_str", get(four_o_four))
+        .route("/nar/:nar_str", head(four_o_four))
+        .route("/nar/:nar_str", put(nar::put))
+        .route("/nar/tvix-castore/:root_node_enc", get(nar::get_head))
+        .route("/nar/tvix-castore/:root_node_enc", head(nar::get_head))
+        .route("/:narinfo_str", get(narinfo::get))
+        .route("/:narinfo_str", head(narinfo::head))
+        .route("/:narinfo_str", put(narinfo::put))
+        .route("/nix-cache-info", get(move || nix_cache_info(priority)))
+}
+
+async fn root() -> &'static str {
+    "Hello from nar-bridge"
+}
+
+async fn four_o_four() -> Result<(), StatusCode> {
+    Err(StatusCode::NOT_FOUND)
+}
+
+async fn nix_cache_info(priority: u64) -> impl IntoResponse {
+    (
+        [("Content-Type", nix_http::MIME_TYPE_CACHE_INFO)],
+        format!(
+            "StoreDir: /nix/store\nWantMassQuery: 1\nPriority: {}\n",
+            priority
+        ),
+    )
+}
diff --git a/tvix/nar-bridge/src/nar.rs b/tvix/nar-bridge/src/nar.rs
new file mode 100644
index 000000000000..abc0d854d7c7
--- /dev/null
+++ b/tvix/nar-bridge/src/nar.rs
@@ -0,0 +1,177 @@
+use axum::extract::Query;
+use axum::http::StatusCode;
+use axum::response::Response;
+use axum::{body::Body, response::IntoResponse};
+use axum_extra::{headers::Range, TypedHeader};
+use axum_range::{KnownSize, Ranged};
+use bytes::Bytes;
+use data_encoding::BASE64URL_NOPAD;
+use futures::TryStreamExt;
+use nix_compat::{nix_http, nixbase32};
+use serde::Deserialize;
+use std::io;
+use tokio_util::io::ReaderStream;
+use tracing::{instrument, warn, Span};
+use tvix_store::nar::ingest_nar_and_hash;
+
+use crate::AppState;
+
+#[derive(Debug, Deserialize)]
+pub(crate) struct GetNARParams {
+    #[serde(rename = "narsize")]
+    nar_size: u64,
+}
+
+#[instrument(skip(blob_service, directory_service))]
+pub async fn get_head(
+    method: axum::http::Method,
+    ranges: Option<TypedHeader<Range>>,
+    axum::extract::Path(root_node_enc): axum::extract::Path<String>,
+    axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
+    axum::extract::State(AppState {
+        blob_service,
+        directory_service,
+        ..
+    }): axum::extract::State<AppState>,
+) -> Result<impl axum::response::IntoResponse, StatusCode> {
+    use prost::Message;
+    // b64decode the root node passed *by the user*
+    let root_node_proto = BASE64URL_NOPAD
+        .decode(root_node_enc.as_bytes())
+        .map_err(|e| {
+            warn!(err=%e, "unable to decode root node b64");
+            StatusCode::NOT_FOUND
+        })?;
+
+    // check the proto size to be somewhat reasonable before parsing it.
+    if root_node_proto.len() > 4096 {
+        warn!("rejected too large root node");
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    // parse the proto
+    let root_node: tvix_castore::proto::Node = Message::decode(Bytes::from(root_node_proto))
+        .map_err(|e| {
+            warn!(err=%e, "unable to decode root node proto");
+            StatusCode::NOT_FOUND
+        })?;
+
+    let root_node = root_node.try_into_anonymous_node().map_err(|e| {
+        warn!(err=%e, "root node validation failed");
+        StatusCode::BAD_REQUEST
+    })?;
+
+    Ok((
+        // headers
+        [
+            ("cache-control", "max-age=31536000, immutable"),
+            ("content-type", nix_http::MIME_TYPE_NAR),
+        ],
+        if method == axum::http::Method::HEAD {
+            // If this is a HEAD request, construct a response returning back the
+            // user-provided content-length, but don't actually talk to castore.
+            Response::builder()
+                .header("content-length", nar_size)
+                .body(Body::empty())
+                .unwrap()
+        } else if let Some(TypedHeader(ranges)) = ranges {
+            // If this is a range request, construct a seekable NAR reader.
+            let r =
+                tvix_store::nar::seekable::Reader::new(root_node, blob_service, directory_service)
+                    .await
+                    .map_err(|e| {
+                        warn!(err=%e, "failed to construct seekable nar reader");
+                        StatusCode::INTERNAL_SERVER_ERROR
+                    })?;
+
+            // ensure the user-supplied nar size was correct, no point returning data otherwise.
+            if r.stream_len() != nar_size {
+                warn!(
+                    actual_nar_size = r.stream_len(),
+                    supplied_nar_size = nar_size,
+                    "wrong nar size supplied"
+                );
+                return Err(StatusCode::BAD_REQUEST);
+            }
+            Ranged::new(Some(ranges), KnownSize::sized(r, nar_size)).into_response()
+        } else {
+            // use the non-seekable codepath if there's no range(s) requested,
+            // as it uses less memory.
+            let (w, r) = tokio::io::duplex(1024 * 8);
+
+            // spawn a task rendering the NAR to the client.
+            tokio::spawn(async move {
+                if let Err(e) =
+                    tvix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
+                {
+                    warn!(err=%e, "failed to write out NAR");
+                }
+            });
+
+            Response::builder()
+                .header("content-length", nar_size)
+                .body(Body::from_stream(ReaderStream::new(r)))
+                .unwrap()
+        },
+    ))
+}
+
+#[instrument(skip(blob_service, directory_service, request))]
+pub async fn put(
+    axum::extract::Path(nar_str): axum::extract::Path<String>,
+    axum::extract::State(AppState {
+        blob_service,
+        directory_service,
+        root_nodes,
+        ..
+    }): axum::extract::State<AppState>,
+    request: axum::extract::Request,
+) -> Result<&'static str, StatusCode> {
+    let (nar_hash_expected, compression_suffix) =
+        nix_http::parse_nar_str(&nar_str).ok_or(StatusCode::UNAUTHORIZED)?;
+
+    // No paths with compression suffix are supported.
+    if !compression_suffix.is_empty() {
+        warn!(%compression_suffix, "invalid compression suffix requested");
+        return Err(StatusCode::UNAUTHORIZED);
+    }
+
+    let s = request.into_body().into_data_stream();
+
+    let mut r = tokio_util::io::StreamReader::new(s.map_err(|e| {
+        warn!(err=%e, "failed to read request body");
+        io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+    }));
+
+    // ingest the NAR
+    let (root_node, nar_hash_actual, nar_size) =
+        ingest_nar_and_hash(blob_service.clone(), directory_service.clone(), &mut r)
+            .await
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+            .map_err(|e| {
+                warn!(err=%e, "failed to ingest nar");
+                StatusCode::INTERNAL_SERVER_ERROR
+            })?;
+
+    let s = Span::current();
+    s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
+    s.record("nar_size", nar_size);
+
+    if nar_hash_expected != nar_hash_actual {
+        warn!(
+            nar_hash.expected = nixbase32::encode(&nar_hash_expected),
+            nar_hash.actual = nixbase32::encode(&nar_hash_actual),
+            "nar hash mismatch"
+        );
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    // store mapping of narhash to root node into root_nodes.
+    // we need it later to populate the root node when accepting the PathInfo.
+    root_nodes.write().put(nar_hash_actual, root_node);
+
+    Ok("")
+}
+
+// FUTUREWORK: maybe head by narhash. Though not too critical, as we do
+// implement HEAD for .narinfo.
diff --git a/tvix/nar-bridge/src/narinfo.rs b/tvix/nar-bridge/src/narinfo.rs
new file mode 100644
index 000000000000..76fda1d495c5
--- /dev/null
+++ b/tvix/nar-bridge/src/narinfo.rs
@@ -0,0 +1,153 @@
+use axum::{http::StatusCode, response::IntoResponse};
+use bytes::Bytes;
+use nix_compat::{
+    narinfo::{NarInfo, Signature},
+    nix_http, nixbase32,
+    store_path::StorePath,
+};
+use prost::Message;
+use tracing::{instrument, warn, Span};
+use tvix_castore::proto::{self as castorepb};
+use tvix_store::pathinfoservice::PathInfo;
+
+use crate::AppState;
+
+/// The size limit for NARInfo uploads nar-bridge receives
+const NARINFO_LIMIT: usize = 2 * 1024 * 1024;
+
+#[instrument(skip(path_info_service))]
+pub async fn head(
+    axum::extract::Path(narinfo_str): axum::extract::Path<String>,
+    axum::extract::State(AppState {
+        path_info_service, ..
+    }): axum::extract::State<AppState>,
+) -> Result<impl IntoResponse, StatusCode> {
+    let digest = nix_http::parse_narinfo_str(&narinfo_str).ok_or(StatusCode::NOT_FOUND)?;
+    Span::current().record("path_info.digest", &narinfo_str[0..32]);
+
+    if path_info_service
+        .get(digest)
+        .await
+        .map_err(|e| {
+            warn!(err=%e, "failed to get PathInfo");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .is_some()
+    {
+        Ok(([("content-type", nix_http::MIME_TYPE_NARINFO)], ""))
+    } else {
+        warn!("PathInfo not found");
+        Err(StatusCode::NOT_FOUND)
+    }
+}
+
+#[instrument(skip(path_info_service))]
+pub async fn get(
+    axum::extract::Path(narinfo_str): axum::extract::Path<String>,
+    axum::extract::State(AppState {
+        path_info_service, ..
+    }): axum::extract::State<AppState>,
+) -> Result<impl IntoResponse, StatusCode> {
+    let digest = nix_http::parse_narinfo_str(&narinfo_str).ok_or(StatusCode::NOT_FOUND)?;
+    Span::current().record("path_info.digest", &narinfo_str[0..32]);
+
+    // fetch the PathInfo
+    let path_info = path_info_service
+        .get(digest)
+        .await
+        .map_err(|e| {
+            warn!(err=%e, "failed to get PathInfo");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    let url = format!(
+        "nar/tvix-castore/{}?narsize={}",
+        data_encoding::BASE64URL_NOPAD.encode(
+            &castorepb::Node::from_name_and_node("".into(), path_info.node.clone()).encode_to_vec()
+        ),
+        path_info.nar_size,
+    );
+
+    let mut narinfo = path_info.to_narinfo();
+    narinfo.url = &url;
+
+    Ok((
+        [("content-type", nix_http::MIME_TYPE_NARINFO)],
+        narinfo.to_string(),
+    ))
+}
+
+#[instrument(skip(path_info_service, root_nodes, request))]
+pub async fn put(
+    axum::extract::Path(narinfo_str): axum::extract::Path<String>,
+    axum::extract::State(AppState {
+        path_info_service,
+        root_nodes,
+        ..
+    }): axum::extract::State<AppState>,
+    request: axum::extract::Request,
+) -> Result<&'static str, StatusCode> {
+    let _narinfo_digest = nix_http::parse_narinfo_str(&narinfo_str).ok_or(StatusCode::UNAUTHORIZED)?;
+    Span::current().record("path_info.digest", &narinfo_str[0..32]);
+
+    let narinfo_bytes: Bytes = axum::body::to_bytes(request.into_body(), NARINFO_LIMIT)
+        .await
+        .map_err(|e| {
+            warn!(err=%e, "unable to fetch body");
+            StatusCode::BAD_REQUEST
+        })?;
+
+    // Parse the narinfo from the body.
+    let narinfo_str = std::str::from_utf8(narinfo_bytes.as_ref()).map_err(|e| {
+        warn!(err=%e, "unable to decode body as string");
+        StatusCode::BAD_REQUEST
+    })?;
+
+    let narinfo = NarInfo::parse(narinfo_str).map_err(|e| {
+        warn!(err=%e, "unable to parse narinfo");
+        StatusCode::BAD_REQUEST
+    })?;
+
+    // Record the NarHash from the parsed narinfo in the current span.
+    Span::current().record("path_info.nar_info", nixbase32::encode(&narinfo.nar_hash));
+
+    // Lookup root node with peek, as we don't want to update the LRU list.
+    // We need to be careful to not hold the RwLock across the await point.
+    let maybe_root_node: Option<tvix_castore::Node> =
+        root_nodes.read().peek(&narinfo.nar_hash).cloned();
+
+    match maybe_root_node {
+        Some(root_node) => {
+            // Persist the PathInfo.
+            path_info_service
+                .put(PathInfo {
+                    store_path: narinfo.store_path.to_owned(),
+                    node: root_node,
+                    references: narinfo.references.iter().map(StorePath::to_owned).collect(),
+                    nar_sha256: narinfo.nar_hash,
+                    nar_size: narinfo.nar_size,
+                    signatures: narinfo
+                        .signatures
+                        .into_iter()
+                        .map(|s| {
+                            Signature::<String>::new(s.name().to_string(), s.bytes().to_owned())
+                        })
+                        .collect(),
+                    deriver: narinfo.deriver.as_ref().map(StorePath::to_owned),
+                    ca: narinfo.ca,
+                })
+                .await
+                .map_err(|e| {
+                    warn!(err=%e, "failed to persist the PathInfo");
+                    StatusCode::INTERNAL_SERVER_ERROR
+                })?;
+
+            Ok("")
+        }
+        None => {
+            warn!("received narinfo with unknown NARHash");
+            Err(StatusCode::BAD_REQUEST)
+        }
+    }
+}
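For readers tracing how the two halves fit together: narinfo::get encodes the store path's castore root node into the "nar/tvix-castore/{}?narsize={}" URL, and nar::get_head decodes it again before streaming the NAR. The following is only an illustrative sketch of that round trip; the helper names encode_root_node/decode_root_node are hypothetical and not part of this change, but they reuse the same data_encoding, prost and tvix_castore calls the files above already use.

// Hypothetical helpers (not part of this change) sketching the root-node
// encoding round trip between narinfo::get and nar::get_head.
use data_encoding::BASE64URL_NOPAD;
use prost::Message;
use tvix_castore::proto as castorepb;

/// Encode a castore root node the way narinfo::get does when building the URL.
fn encode_root_node(node: &tvix_castore::Node) -> String {
    let proto = castorepb::Node::from_name_and_node("".into(), node.clone());
    BASE64URL_NOPAD.encode(&proto.encode_to_vec())
}

/// Decode it again the way nar::get_head does before serving the NAR.
fn decode_root_node(enc: &str) -> Option<tvix_castore::Node> {
    let bytes = BASE64URL_NOPAD.decode(enc.as_bytes()).ok()?;
    let proto = castorepb::Node::decode(bytes.as_slice()).ok()?;
    proto.try_into_anonymous_node().ok()
}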