diff options
Diffstat (limited to 'tvix/store/src/pathinfoservice')
-rw-r--r-- | tvix/store/src/pathinfoservice/from_addr.rs | 32 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/mod.rs | 2 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/nix_http.rs | 213 |
3 files changed, 240 insertions, 7 deletions
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 0f1f8d5c96a6..e7faf20144bb 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -1,6 +1,9 @@ use crate::proto::path_info_service_client::PathInfoServiceClient; -use super::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService, SledPathInfoService}; +use super::{ + GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService, + SledPathInfoService, +}; use std::sync::Arc; use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; @@ -62,6 +65,16 @@ pub async fn from_addr( SledPathInfoService::new(url.path().into(), blob_service, directory_service) .map_err(|e| Error::StorageError(e.to_string()))?, )); + } else if url.scheme() == "nix+http" || url.scheme() == "nix+https" { + // Stringify the URL and remove the nix+ prefix. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + let url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap(); + Arc::new(NixHTTPPathInfoService::new( + url, + blob_service, + directory_service, + )) } else if url.scheme().starts_with("grpc+") { // schemes starting with grpc+ go to the GRPCPathInfoService. // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. @@ -113,6 +126,14 @@ mod tests { #[test_case("memory:///", false; "memory invalid root path")] /// This sets a memory url path to "/foo", which is invalid. #[test_case("memory:///foo", false; "memory invalid root path foo")] + /// Correct Scheme for the cache.nixos.org binary cache. + #[test_case("nix+https://cache.nixos.org", true; "correct nix+https")] + /// Correct Scheme for the cache.nixos.org binary cache (HTTP URL). + #[test_case("nix+http://cache.nixos.org", true; "correct nix+http")] + /// Correct Scheme for Nix HTTP Binary cache, with a subpath. + #[test_case("nix+http://192.0.2.1/foo", true; "correct nix http with subpath")] + /// Correct Scheme for Nix HTTP Binary cache, with a subpath and port. + #[test_case("nix+http://[::1]:8080/foo", true; "correct nix http with subpath and port")] /// Correct scheme to connect to a unix socket. #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")] /// Correct scheme for unix socket, but setting a host too, which is invalid. @@ -127,11 +148,8 @@ mod tests { #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")] #[tokio::test] async fn test_from_addr_tokio(uri_str: &str, is_ok: bool) { - assert_eq!( - from_addr(uri_str, gen_blob_service(), gen_directory_service()) - .await - .is_ok(), - is_ok - ) + let resp = from_addr(uri_str, gen_blob_service(), gen_directory_service()).await; + + assert_eq!(resp.is_ok(), is_ok); } } diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index 3fde10179b36..5faa0900a0b0 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -1,6 +1,7 @@ mod from_addr; mod grpc; mod memory; +mod nix_http; mod sled; use futures::Stream; @@ -14,6 +15,7 @@ use crate::proto::PathInfo; pub use self::from_addr::from_addr; pub use self::grpc::GRPCPathInfoService; pub use self::memory::MemoryPathInfoService; +pub use self::nix_http::NixHTTPPathInfoService; pub use self::sled::SledPathInfoService; /// The base trait all PathInfo services need to implement. diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs new file mode 100644 index 000000000000..49cd515ce913 --- /dev/null +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -0,0 +1,213 @@ +use std::{ + io::{self, BufRead}, + pin::Pin, + sync::Arc, +}; + +use data_encoding::BASE64; +use futures::{Stream, TryStreamExt}; +use nix_compat::{narinfo::NarInfo, nixbase32}; +use reqwest::StatusCode; +use tonic::async_trait; +use tracing::{debug, instrument, warn}; +use tvix_castore::{ + blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, +}; + +use crate::proto::PathInfo; + +use super::PathInfoService; + +/// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache +/// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix +/// Store Model. +/// It implements the [PathInfoService] trait in an interesting way: +/// Every [PathInfoService::get] fetches the .narinfo and referred NAR file, +/// inserting components into a [BlobService] and [DirectoryService], then +/// returning a [PathInfo] struct with the root. +/// +/// Due to this being quite a costly operation, clients are expected to layer +/// this service with store composition, so they're only ingested once. +/// +/// The client is expected to be (indirectly) using the same [BlobService] and +/// [DirectoryService], so able to fetch referred Directories and Blobs. +/// [PathInfoService::put] and [PathInfoService::nar] are not implemented and +/// return an error if called. +/// TODO: what about reading from nix-cache-info? +pub struct NixHTTPPathInfoService { + base_url: url::Url, + http_client: reqwest::Client, + + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +} + +impl NixHTTPPathInfoService { + pub fn new( + base_url: url::Url, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Self { + Self { + base_url, + http_client: reqwest::Client::new(), + blob_service, + directory_service, + } + } +} + +#[async_trait] +impl PathInfoService for NixHTTPPathInfoService { + #[instrument(skip_all, err, fields(path.digest=BASE64.encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let narinfo_url = self + .base_url + .join(&format!("{}.narinfo", nixbase32::encode(&digest))) + .map_err(|e| { + warn!(e = %e, "unable to join URL"); + io::Error::new(io::ErrorKind::InvalidInput, "unable to join url") + })?; + + debug!(narinfo_url= %narinfo_url, "constructed NARInfo url"); + + let resp = self + .http_client + .get(narinfo_url) + .send() + .await + .map_err(|e| { + warn!(e=%e,"unable to send NARInfo request"); + io::Error::new( + io::ErrorKind::InvalidInput, + "unable to send NARInfo request", + ) + })?; + + // In the case of a 404, return a NotFound. + // We also return a NotFound in case of a 403 - this is to match the behaviour as Nix, + // when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org. + if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN { + return Ok(None); + } + + let narinfo_str = resp.text().await.map_err(|e| { + warn!(e=%e,"unable to decode response as string"); + io::Error::new( + io::ErrorKind::InvalidData, + "unable to decode response as string", + ) + })?; + + // parse the received narinfo + let narinfo = NarInfo::parse(&narinfo_str).map_err(|e| { + warn!(e=%e,"unable to parse response as NarInfo"); + io::Error::new( + io::ErrorKind::InvalidData, + "unable to parse response as NarInfo", + ) + })?; + + // Convert to a (sparse) PathInfo. We still need to populate the node field, + // and for this we need to download the NAR file. + // FUTUREWORK: Keep some database around mapping from narsha256 to + // (unnamed) rootnode, so we can use that (and the name from the + // StorePath) and avoid downloading the same NAR a second time. + let pathinfo: PathInfo = (&narinfo).into(); + + // create a request for the NAR file itself. + let nar_url = self.base_url.join(narinfo.url).map_err(|e| { + warn!(e = %e, "unable to join URL"); + io::Error::new(io::ErrorKind::InvalidInput, "unable to join url") + })?; + debug!(nar_url= %nar_url, "constructed NAR url"); + + let resp = self + .http_client + .get(nar_url.clone()) + .send() + .await + .map_err(|e| { + warn!(e=%e,"unable to send NAR request"); + io::Error::new(io::ErrorKind::InvalidInput, "unable to send NAR request") + })?; + + // if the request is not successful, return an error. + if !resp.status().is_success() { + return Err(Error::StorageError(format!( + "unable to retrieve NAR at {}, status {}", + nar_url, + resp.status() + ))); + } + + // get an AsyncRead of the response body. + let async_r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| { + let e = e.without_url(); + warn!(e=%e, "failed to get response body"); + io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) + })); + let sync_r = std::io::BufReader::new(tokio_util::io::SyncIoBridge::new(async_r)); + + // handle decompression, by wrapping the reader. + let mut sync_r: Box<dyn BufRead + Send> = match narinfo.compression { + Some("none") => Box::new(sync_r), + Some("xz") => Box::new(std::io::BufReader::new(xz2::read::XzDecoder::new(sync_r))), + Some(comp) => { + return Err(Error::InvalidRequest( + format!("unsupported compression: {}", comp).to_string(), + )) + } + None => { + return Err(Error::InvalidRequest( + "unsupported compression: bzip2".to_string(), + )) + } + }; + + let res = tokio::task::spawn_blocking({ + let blob_service = self.blob_service.clone(); + let directory_service = self.directory_service.clone(); + move || crate::nar::read_nar(&mut sync_r, blob_service, directory_service) + }) + .await + .unwrap(); + + match res { + Ok(root_node) => Ok(Some(PathInfo { + node: Some(castorepb::Node { + // set the name of the root node to the digest-name of the store path. + node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())), + }), + references: pathinfo.references, + narinfo: pathinfo.narinfo, + })), + Err(e) => Err(e.into()), + } + } + + #[instrument(skip_all, fields(path_info=?_path_info))] + async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> { + Err(Error::InvalidRequest( + "put not supported for this backend".to_string(), + )) + } + + #[instrument(skip_all, fields(root_node=?root_node))] + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + Err(Error::InvalidRequest( + "calculate_nar not supported for this backend".to_string(), + )) + } + + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { + Box::pin(futures::stream::once(async { + Err(Error::InvalidRequest( + "list not supported for this backend".to_string(), + )) + })) + } +} |