about summary refs log tree commit diff
path: root/tvix/store/src/pathinfoservice/nix_http.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/pathinfoservice/nix_http.rs')
-rw-r--r--tvix/store/src/pathinfoservice/nix_http.rs213
1 files changed, 213 insertions, 0 deletions
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
new file mode 100644
index 000000000000..49cd515ce913
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -0,0 +1,213 @@
+use std::{
+    io::{self, BufRead},
+    pin::Pin,
+    sync::Arc,
+};
+
+use data_encoding::BASE64;
+use futures::{Stream, TryStreamExt};
+use nix_compat::{narinfo::NarInfo, nixbase32};
+use reqwest::StatusCode;
+use tonic::async_trait;
+use tracing::{debug, instrument, warn};
+use tvix_castore::{
+    blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error,
+};
+
+use crate::proto::PathInfo;
+
+use super::PathInfoService;
+
+/// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache
+/// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix
+/// Store Model.
+/// It implements the [PathInfoService] trait in an interesting way:
+/// Every [PathInfoService::get] fetches the .narinfo and referred NAR file,
+/// inserting components into a [BlobService] and [DirectoryService], then
+/// returning a [PathInfo] struct with the root.
+///
+/// Due to this being quite a costly operation, clients are expected to layer
+/// this service with store composition, so they're only ingested once.
+///
+/// The client is expected to be (indirectly) using the same [BlobService] and
+/// [DirectoryService], so able to fetch referred Directories and Blobs.
+/// [PathInfoService::put] and [PathInfoService::nar] are not implemented and
+/// return an error if called.
+/// TODO: what about reading from nix-cache-info?
+pub struct NixHTTPPathInfoService {
+    base_url: url::Url,
+    http_client: reqwest::Client,
+
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+}
+
+impl NixHTTPPathInfoService {
+    pub fn new(
+        base_url: url::Url,
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) -> Self {
+        Self {
+            base_url,
+            http_client: reqwest::Client::new(),
+            blob_service,
+            directory_service,
+        }
+    }
+}
+
+#[async_trait]
+impl PathInfoService for NixHTTPPathInfoService {
+    #[instrument(skip_all, err, fields(path.digest=BASE64.encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let narinfo_url = self
+            .base_url
+            .join(&format!("{}.narinfo", nixbase32::encode(&digest)))
+            .map_err(|e| {
+                warn!(e = %e, "unable to join URL");
+                io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
+            })?;
+
+        debug!(narinfo_url= %narinfo_url, "constructed NARInfo url");
+
+        let resp = self
+            .http_client
+            .get(narinfo_url)
+            .send()
+            .await
+            .map_err(|e| {
+                warn!(e=%e,"unable to send NARInfo request");
+                io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    "unable to send NARInfo request",
+                )
+            })?;
+
+        // In the case of a 404, return a NotFound.
+        // We also return a NotFound in case of a 403 - this is to match the behaviour as Nix,
+        // when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org.
+        if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
+            return Ok(None);
+        }
+
+        let narinfo_str = resp.text().await.map_err(|e| {
+            warn!(e=%e,"unable to decode response as string");
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "unable to decode response as string",
+            )
+        })?;
+
+        // parse the received narinfo
+        let narinfo = NarInfo::parse(&narinfo_str).map_err(|e| {
+            warn!(e=%e,"unable to parse response as NarInfo");
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "unable to parse response as NarInfo",
+            )
+        })?;
+
+        // Convert to a (sparse) PathInfo. We still need to populate the node field,
+        // and for this we need to download the NAR file.
+        // FUTUREWORK: Keep some database around mapping from narsha256 to
+        // (unnamed) rootnode, so we can use that (and the name from the
+        // StorePath) and avoid downloading the same NAR a second time.
+        let pathinfo: PathInfo = (&narinfo).into();
+
+        // create a request for the NAR file itself.
+        let nar_url = self.base_url.join(narinfo.url).map_err(|e| {
+            warn!(e = %e, "unable to join URL");
+            io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
+        })?;
+        debug!(nar_url= %nar_url, "constructed NAR url");
+
+        let resp = self
+            .http_client
+            .get(nar_url.clone())
+            .send()
+            .await
+            .map_err(|e| {
+                warn!(e=%e,"unable to send NAR request");
+                io::Error::new(io::ErrorKind::InvalidInput, "unable to send NAR request")
+            })?;
+
+        // if the request is not successful, return an error.
+        if !resp.status().is_success() {
+            return Err(Error::StorageError(format!(
+                "unable to retrieve NAR at {}, status {}",
+                nar_url,
+                resp.status()
+            )));
+        }
+
+        // get an AsyncRead of the response body.
+        let async_r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
+            let e = e.without_url();
+            warn!(e=%e, "failed to get response body");
+            io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+        }));
+        let sync_r = std::io::BufReader::new(tokio_util::io::SyncIoBridge::new(async_r));
+
+        // handle decompression, by wrapping the reader.
+        let mut sync_r: Box<dyn BufRead + Send> = match narinfo.compression {
+            Some("none") => Box::new(sync_r),
+            Some("xz") => Box::new(std::io::BufReader::new(xz2::read::XzDecoder::new(sync_r))),
+            Some(comp) => {
+                return Err(Error::InvalidRequest(
+                    format!("unsupported compression: {}", comp).to_string(),
+                ))
+            }
+            None => {
+                return Err(Error::InvalidRequest(
+                    "unsupported compression: bzip2".to_string(),
+                ))
+            }
+        };
+
+        let res = tokio::task::spawn_blocking({
+            let blob_service = self.blob_service.clone();
+            let directory_service = self.directory_service.clone();
+            move || crate::nar::read_nar(&mut sync_r, blob_service, directory_service)
+        })
+        .await
+        .unwrap();
+
+        match res {
+            Ok(root_node) => Ok(Some(PathInfo {
+                node: Some(castorepb::Node {
+                    // set the name of the root node to the digest-name of the store path.
+                    node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
+                }),
+                references: pathinfo.references,
+                narinfo: pathinfo.narinfo,
+            })),
+            Err(e) => Err(e.into()),
+        }
+    }
+
+    #[instrument(skip_all, fields(path_info=?_path_info))]
+    async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
+        Err(Error::InvalidRequest(
+            "put not supported for this backend".to_string(),
+        ))
+    }
+
+    #[instrument(skip_all, fields(root_node=?root_node))]
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), Error> {
+        Err(Error::InvalidRequest(
+            "calculate_nar not supported for this backend".to_string(),
+        ))
+    }
+
+    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
+        Box::pin(futures::stream::once(async {
+            Err(Error::InvalidRequest(
+                "list not supported for this backend".to_string(),
+            ))
+        }))
+    }
+}