about summary refs log tree commit diff
path: root/tvix/store/src/pathinfoservice
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/pathinfoservice')
-rw-r--r--tvix/store/src/pathinfoservice/from_addr.rs32
-rw-r--r--tvix/store/src/pathinfoservice/mod.rs2
-rw-r--r--tvix/store/src/pathinfoservice/nix_http.rs213
3 files changed, 240 insertions, 7 deletions
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index 0f1f8d5c96a6..e7faf20144bb 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -1,6 +1,9 @@
 use crate::proto::path_info_service_client::PathInfoServiceClient;
 
-use super::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService, SledPathInfoService};
+use super::{
+    GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService,
+    SledPathInfoService,
+};
 
 use std::sync::Arc;
 use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
@@ -62,6 +65,16 @@ pub async fn from_addr(
             SledPathInfoService::new(url.path().into(), blob_service, directory_service)
                 .map_err(|e| Error::StorageError(e.to_string()))?,
         ));
+    } else if url.scheme() == "nix+http" || url.scheme() == "nix+https" {
+        // Stringify the URL and remove the nix+ prefix.
+        // We can't use `url.set_scheme(rest)`, as it disallows
+        // setting something http(s) that previously wasn't.
+        let url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap();
+        Arc::new(NixHTTPPathInfoService::new(
+            url,
+            blob_service,
+            directory_service,
+        ))
     } else if url.scheme().starts_with("grpc+") {
         // schemes starting with grpc+ go to the GRPCPathInfoService.
         //   That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
@@ -113,6 +126,14 @@ mod tests {
     #[test_case("memory:///", false; "memory invalid root path")]
     /// This sets a memory url path to "/foo", which is invalid.
     #[test_case("memory:///foo", false; "memory invalid root path foo")]
+    /// Correct Scheme for the cache.nixos.org binary cache.
+    #[test_case("nix+https://cache.nixos.org", true; "correct nix+https")]
+    /// Correct Scheme for the cache.nixos.org binary cache (HTTP URL).
+    #[test_case("nix+http://cache.nixos.org", true; "correct nix+http")]
+    /// Correct Scheme for Nix HTTP Binary cache, with a subpath.
+    #[test_case("nix+http://192.0.2.1/foo", true; "correct nix http with subpath")]
+    /// Correct Scheme for Nix HTTP Binary cache, with a subpath and port.
+    #[test_case("nix+http://[::1]:8080/foo", true; "correct nix http with subpath and port")]
     /// Correct scheme to connect to a unix socket.
     #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")]
     /// Correct scheme for unix socket, but setting a host too, which is invalid.
@@ -127,11 +148,8 @@ mod tests {
     #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")]
     #[tokio::test]
     async fn test_from_addr_tokio(uri_str: &str, is_ok: bool) {
-        assert_eq!(
-            from_addr(uri_str, gen_blob_service(), gen_directory_service())
-                .await
-                .is_ok(),
-            is_ok
-        )
+        let resp = from_addr(uri_str, gen_blob_service(), gen_directory_service()).await;
+
+        assert_eq!(resp.is_ok(), is_ok);
     }
 }
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
index 3fde10179b36..5faa0900a0b0 100644
--- a/tvix/store/src/pathinfoservice/mod.rs
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -1,6 +1,7 @@
 mod from_addr;
 mod grpc;
 mod memory;
+mod nix_http;
 mod sled;
 
 use futures::Stream;
@@ -14,6 +15,7 @@ use crate::proto::PathInfo;
 pub use self::from_addr::from_addr;
 pub use self::grpc::GRPCPathInfoService;
 pub use self::memory::MemoryPathInfoService;
+pub use self::nix_http::NixHTTPPathInfoService;
 pub use self::sled::SledPathInfoService;
 
 /// The base trait all PathInfo services need to implement.
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
new file mode 100644
index 000000000000..49cd515ce913
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -0,0 +1,213 @@
+use std::{
+    io::{self, BufRead},
+    pin::Pin,
+    sync::Arc,
+};
+
+use data_encoding::BASE64;
+use futures::{Stream, TryStreamExt};
+use nix_compat::{narinfo::NarInfo, nixbase32};
+use reqwest::StatusCode;
+use tonic::async_trait;
+use tracing::{debug, instrument, warn};
+use tvix_castore::{
+    blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error,
+};
+
+use crate::proto::PathInfo;
+
+use super::PathInfoService;
+
+/// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache
+/// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix
+/// Store Model.
+/// It implements the [PathInfoService] trait in an interesting way:
+/// Every [PathInfoService::get] fetches the .narinfo and referred NAR file,
+/// inserting components into a [BlobService] and [DirectoryService], then
+/// returning a [PathInfo] struct with the root.
+///
+/// Due to this being quite a costly operation, clients are expected to layer
+/// this service with store composition, so they're only ingested once.
+///
+/// The client is expected to be (indirectly) using the same [BlobService] and
+/// [DirectoryService], so able to fetch referred Directories and Blobs.
+/// [PathInfoService::put] and [PathInfoService::nar] are not implemented and
+/// return an error if called.
+/// TODO: what about reading from nix-cache-info?
+pub struct NixHTTPPathInfoService {
+    base_url: url::Url,
+    http_client: reqwest::Client,
+
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+}
+
+impl NixHTTPPathInfoService {
+    pub fn new(
+        base_url: url::Url,
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) -> Self {
+        Self {
+            base_url,
+            http_client: reqwest::Client::new(),
+            blob_service,
+            directory_service,
+        }
+    }
+}
+
+#[async_trait]
+impl PathInfoService for NixHTTPPathInfoService {
+    #[instrument(skip_all, err, fields(path.digest=BASE64.encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let narinfo_url = self
+            .base_url
+            .join(&format!("{}.narinfo", nixbase32::encode(&digest)))
+            .map_err(|e| {
+                warn!(e = %e, "unable to join URL");
+                io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
+            })?;
+
+        debug!(narinfo_url= %narinfo_url, "constructed NARInfo url");
+
+        let resp = self
+            .http_client
+            .get(narinfo_url)
+            .send()
+            .await
+            .map_err(|e| {
+                warn!(e=%e,"unable to send NARInfo request");
+                io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    "unable to send NARInfo request",
+                )
+            })?;
+
+        // In the case of a 404, return a NotFound.
+        // We also return a NotFound in case of a 403 - this is to match the behaviour as Nix,
+        // when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org.
+        if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
+            return Ok(None);
+        }
+
+        let narinfo_str = resp.text().await.map_err(|e| {
+            warn!(e=%e,"unable to decode response as string");
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "unable to decode response as string",
+            )
+        })?;
+
+        // parse the received narinfo
+        let narinfo = NarInfo::parse(&narinfo_str).map_err(|e| {
+            warn!(e=%e,"unable to parse response as NarInfo");
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "unable to parse response as NarInfo",
+            )
+        })?;
+
+        // Convert to a (sparse) PathInfo. We still need to populate the node field,
+        // and for this we need to download the NAR file.
+        // FUTUREWORK: Keep some database around mapping from narsha256 to
+        // (unnamed) rootnode, so we can use that (and the name from the
+        // StorePath) and avoid downloading the same NAR a second time.
+        let pathinfo: PathInfo = (&narinfo).into();
+
+        // create a request for the NAR file itself.
+        let nar_url = self.base_url.join(narinfo.url).map_err(|e| {
+            warn!(e = %e, "unable to join URL");
+            io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
+        })?;
+        debug!(nar_url= %nar_url, "constructed NAR url");
+
+        let resp = self
+            .http_client
+            .get(nar_url.clone())
+            .send()
+            .await
+            .map_err(|e| {
+                warn!(e=%e,"unable to send NAR request");
+                io::Error::new(io::ErrorKind::InvalidInput, "unable to send NAR request")
+            })?;
+
+        // if the request is not successful, return an error.
+        if !resp.status().is_success() {
+            return Err(Error::StorageError(format!(
+                "unable to retrieve NAR at {}, status {}",
+                nar_url,
+                resp.status()
+            )));
+        }
+
+        // get an AsyncRead of the response body.
+        let async_r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
+            let e = e.without_url();
+            warn!(e=%e, "failed to get response body");
+            io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+        }));
+        let sync_r = std::io::BufReader::new(tokio_util::io::SyncIoBridge::new(async_r));
+
+        // handle decompression, by wrapping the reader.
+        let mut sync_r: Box<dyn BufRead + Send> = match narinfo.compression {
+            Some("none") => Box::new(sync_r),
+            Some("xz") => Box::new(std::io::BufReader::new(xz2::read::XzDecoder::new(sync_r))),
+            Some(comp) => {
+                return Err(Error::InvalidRequest(
+                    format!("unsupported compression: {}", comp).to_string(),
+                ))
+            }
+            None => {
+                return Err(Error::InvalidRequest(
+                    "unsupported compression: bzip2".to_string(),
+                ))
+            }
+        };
+
+        let res = tokio::task::spawn_blocking({
+            let blob_service = self.blob_service.clone();
+            let directory_service = self.directory_service.clone();
+            move || crate::nar::read_nar(&mut sync_r, blob_service, directory_service)
+        })
+        .await
+        .unwrap();
+
+        match res {
+            Ok(root_node) => Ok(Some(PathInfo {
+                node: Some(castorepb::Node {
+                    // set the name of the root node to the digest-name of the store path.
+                    node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
+                }),
+                references: pathinfo.references,
+                narinfo: pathinfo.narinfo,
+            })),
+            Err(e) => Err(e.into()),
+        }
+    }
+
+    #[instrument(skip_all, fields(path_info=?_path_info))]
+    async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
+        Err(Error::InvalidRequest(
+            "put not supported for this backend".to_string(),
+        ))
+    }
+
+    #[instrument(skip_all, fields(root_node=?root_node))]
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), Error> {
+        Err(Error::InvalidRequest(
+            "calculate_nar not supported for this backend".to_string(),
+        ))
+    }
+
+    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
+        Box::pin(futures::stream::once(async {
+            Err(Error::InvalidRequest(
+                "list not supported for this backend".to_string(),
+            ))
+        }))
+    }
+}