about summary refs log tree commit diff
path: root/tvix/glue/src/tvix_store_io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r--tvix/glue/src/tvix_store_io.rs152
1 files changed, 14 insertions, 138 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index 3c86f426308d..0994c44dfaff 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -4,10 +4,9 @@ use async_recursion::async_recursion;
 use bytes::Bytes;
 use futures::{StreamExt, TryStreamExt};
 use nix_compat::nixhash::NixHash;
-use nix_compat::store_path::{build_ca_path, StorePathRef};
+use nix_compat::store_path::StorePathRef;
 use nix_compat::{nixhash::CAHash, store_path::StorePath};
 use sha2::{Digest, Sha256};
-use std::rc::Rc;
 use std::{
     cell::RefCell,
     collections::BTreeSet,
@@ -15,24 +14,22 @@ use std::{
     path::{Path, PathBuf},
     sync::Arc,
 };
-use tokio::io::AsyncBufRead;
-use tokio_util::io::{InspectReader, SyncIoBridge};
+use tokio_util::io::SyncIoBridge;
 use tracing::{error, instrument, warn, Level};
 use tvix_build::buildservice::BuildService;
-use tvix_castore::import::archive::ingest_archive;
-use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
+use tvix_castore::proto::node::Node;
+use tvix_eval::{EvalIO, FileType, StdIO};
 use tvix_store::utils::AsyncIoBridge;
 
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::{self, DirectoryService},
-    proto::{node::Node, FileNode, NamedNode},
+    proto::NamedNode,
     B3Digest,
 };
 use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
 
-use crate::builtins::FetcherError;
-use crate::decompression::DecompressedReader;
+use crate::fetchers::Fetcher;
 use crate::known_paths::KnownPaths;
 use crate::tvix_build::derivation_to_build_request;
 
@@ -60,7 +57,10 @@ pub struct TvixStoreIO {
     #[allow(dead_code)]
     build_service: Arc<dyn BuildService>,
     pub(crate) tokio_handle: tokio::runtime::Handle,
-    http_client: reqwest::Client,
+
+    pub(crate) fetcher:
+        Fetcher<Arc<dyn BlobService>, Arc<dyn DirectoryService>, Arc<dyn PathInfoService>>,
+
     pub(crate) known_paths: RefCell<KnownPaths>,
 }
 
@@ -73,13 +73,13 @@ impl TvixStoreIO {
         tokio_handle: tokio::runtime::Handle,
     ) -> Self {
         Self {
-            blob_service,
-            directory_service,
-            path_info_service,
+            blob_service: blob_service.clone(),
+            directory_service: directory_service.clone(),
+            path_info_service: path_info_service.clone(),
             std_io: StdIO {},
             build_service,
             tokio_handle,
-            http_client: reqwest::Client::new(),
+            fetcher: Fetcher::new(blob_service, directory_service, path_info_service),
             known_paths: Default::default(),
         }
     }
@@ -358,130 +358,6 @@ impl TvixStoreIO {
             .await?
             .is_some())
     }
-
-    async fn download<'a>(&self, url: &str) -> Result<impl AsyncBufRead + Unpin + 'a, ErrorKind> {
-        let resp = self
-            .http_client
-            .get(url)
-            .send()
-            .await
-            .map_err(FetcherError::from)?;
-        Ok(tokio_util::io::StreamReader::new(
-            resp.bytes_stream().map_err(|e| {
-                let e = e.without_url();
-                warn!(%e, "failed to get response body");
-                io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
-            }),
-        ))
-    }
-
-    pub async fn fetch_url(
-        &self,
-        url: &str,
-        name: &str,
-        hash: Option<&NixHash>,
-    ) -> Result<StorePath, ErrorKind> {
-        let mut sha = Sha256::new();
-        let data = self.download(url).await?;
-        let mut data = InspectReader::new(data, |b| sha.update(b));
-        let mut blob = self.blob_service.open_write().await;
-        let size = tokio::io::copy(&mut data, blob.as_mut()).await?;
-        drop(data);
-        let blob_digest = blob.close().await?;
-        let got = NixHash::Sha256(sha.finalize().into());
-
-        let hash = CAHash::Flat(if let Some(wanted) = hash {
-            if *wanted != got {
-                return Err(FetcherError::HashMismatch {
-                    url: url.to_owned(),
-                    wanted: wanted.clone(),
-                    got,
-                }
-                .into());
-            }
-            wanted.clone()
-        } else {
-            got
-        });
-
-        let path = build_ca_path(name, &hash, Vec::<String>::new(), false)
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
-        let node = Node::File(FileNode {
-            name: path.to_string().into(),
-            digest: blob_digest.into(),
-            size,
-            executable: false,
-        });
-
-        let (nar_size, nar_sha256) = self
-            .path_info_service
-            .calculate_nar(&node)
-            .await
-            .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?;
-
-        let path_info = PathInfo {
-            node: Some(tvix_castore::proto::Node {
-                node: Some(node.clone()),
-            }),
-            references: vec![],
-            narinfo: Some(tvix_store::proto::NarInfo {
-                nar_size,
-                nar_sha256: nar_sha256.to_vec().into(),
-                signatures: vec![],
-                reference_names: vec![],
-                deriver: None, /* ? */
-                ca: Some((&hash).into()),
-            }),
-        };
-
-        self.path_info_service
-            .put(path_info)
-            .await
-            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-
-        Ok(path.to_owned())
-    }
-
-    pub async fn fetch_tarball(
-        &self,
-        url: &str,
-        name: &str,
-        ca: Option<CAHash>,
-    ) -> Result<StorePath, ErrorKind> {
-        let data = self.download(url).await?;
-        let data = DecompressedReader::new(data);
-        let archive = tokio_tar::Archive::new(data);
-        let node = ingest_archive(&self.blob_service, &self.directory_service, archive)
-            .await
-            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-
-        let (path_info, got, output_path) = self
-            .node_to_path_info(
-                name,
-                Path::new(""),
-                ca.clone().expect("TODO: support unspecified CA hash"),
-                node,
-            )
-            .await?;
-
-        if let Some(wanted) = &ca {
-            if *wanted.hash() != got {
-                return Err(FetcherError::HashMismatch {
-                    url: url.to_owned(),
-                    wanted: wanted.hash().into_owned(),
-                    got,
-                }
-                .into());
-            }
-        }
-
-        self.path_info_service
-            .put(path_info)
-            .await
-            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-
-        Ok(output_path)
-    }
 }
 
 impl EvalIO for TvixStoreIO {