diff options
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r-- | tvix/glue/src/tvix_store_io.rs | 152 |
1 files changed, 14 insertions, 138 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index 3c86f426308d..0994c44dfaff 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -4,10 +4,9 @@ use async_recursion::async_recursion; use bytes::Bytes; use futures::{StreamExt, TryStreamExt}; use nix_compat::nixhash::NixHash; -use nix_compat::store_path::{build_ca_path, StorePathRef}; +use nix_compat::store_path::StorePathRef; use nix_compat::{nixhash::CAHash, store_path::StorePath}; use sha2::{Digest, Sha256}; -use std::rc::Rc; use std::{ cell::RefCell, collections::BTreeSet, @@ -15,24 +14,22 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; -use tokio::io::AsyncBufRead; -use tokio_util::io::{InspectReader, SyncIoBridge}; +use tokio_util::io::SyncIoBridge; use tracing::{error, instrument, warn, Level}; use tvix_build::buildservice::BuildService; -use tvix_castore::import::archive::ingest_archive; -use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO}; +use tvix_castore::proto::node::Node; +use tvix_eval::{EvalIO, FileType, StdIO}; use tvix_store::utils::AsyncIoBridge; use tvix_castore::{ blobservice::BlobService, directoryservice::{self, DirectoryService}, - proto::{node::Node, FileNode, NamedNode}, + proto::NamedNode, B3Digest, }; use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; -use crate::builtins::FetcherError; -use crate::decompression::DecompressedReader; +use crate::fetchers::Fetcher; use crate::known_paths::KnownPaths; use crate::tvix_build::derivation_to_build_request; @@ -60,7 +57,10 @@ pub struct TvixStoreIO { #[allow(dead_code)] build_service: Arc<dyn BuildService>, pub(crate) tokio_handle: tokio::runtime::Handle, - http_client: reqwest::Client, + + pub(crate) fetcher: + Fetcher<Arc<dyn BlobService>, Arc<dyn DirectoryService>, Arc<dyn PathInfoService>>, + pub(crate) known_paths: RefCell<KnownPaths>, } @@ -73,13 +73,13 @@ impl TvixStoreIO { tokio_handle: tokio::runtime::Handle, ) -> Self { Self { - blob_service, - directory_service, - path_info_service, + blob_service: blob_service.clone(), + directory_service: directory_service.clone(), + path_info_service: path_info_service.clone(), std_io: StdIO {}, build_service, tokio_handle, - http_client: reqwest::Client::new(), + fetcher: Fetcher::new(blob_service, directory_service, path_info_service), known_paths: Default::default(), } } @@ -358,130 +358,6 @@ impl TvixStoreIO { .await? .is_some()) } - - async fn download<'a>(&self, url: &str) -> Result<impl AsyncBufRead + Unpin + 'a, ErrorKind> { - let resp = self - .http_client - .get(url) - .send() - .await - .map_err(FetcherError::from)?; - Ok(tokio_util::io::StreamReader::new( - resp.bytes_stream().map_err(|e| { - let e = e.without_url(); - warn!(%e, "failed to get response body"); - io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) - }), - )) - } - - pub async fn fetch_url( - &self, - url: &str, - name: &str, - hash: Option<&NixHash>, - ) -> Result<StorePath, ErrorKind> { - let mut sha = Sha256::new(); - let data = self.download(url).await?; - let mut data = InspectReader::new(data, |b| sha.update(b)); - let mut blob = self.blob_service.open_write().await; - let size = tokio::io::copy(&mut data, blob.as_mut()).await?; - drop(data); - let blob_digest = blob.close().await?; - let got = NixHash::Sha256(sha.finalize().into()); - - let hash = CAHash::Flat(if let Some(wanted) = hash { - if *wanted != got { - return Err(FetcherError::HashMismatch { - url: url.to_owned(), - wanted: wanted.clone(), - got, - } - .into()); - } - wanted.clone() - } else { - got - }); - - let path = build_ca_path(name, &hash, Vec::<String>::new(), false) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let node = Node::File(FileNode { - name: path.to_string().into(), - digest: blob_digest.into(), - size, - executable: false, - }); - - let (nar_size, nar_sha256) = self - .path_info_service - .calculate_nar(&node) - .await - .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?; - - let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { - node: Some(node.clone()), - }), - references: vec![], - narinfo: Some(tvix_store::proto::NarInfo { - nar_size, - nar_sha256: nar_sha256.to_vec().into(), - signatures: vec![], - reference_names: vec![], - deriver: None, /* ? */ - ca: Some((&hash).into()), - }), - }; - - self.path_info_service - .put(path_info) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - - Ok(path.to_owned()) - } - - pub async fn fetch_tarball( - &self, - url: &str, - name: &str, - ca: Option<CAHash>, - ) -> Result<StorePath, ErrorKind> { - let data = self.download(url).await?; - let data = DecompressedReader::new(data); - let archive = tokio_tar::Archive::new(data); - let node = ingest_archive(&self.blob_service, &self.directory_service, archive) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - - let (path_info, got, output_path) = self - .node_to_path_info( - name, - Path::new(""), - ca.clone().expect("TODO: support unspecified CA hash"), - node, - ) - .await?; - - if let Some(wanted) = &ca { - if *wanted.hash() != got { - return Err(FetcherError::HashMismatch { - url: url.to_owned(), - wanted: wanted.hash().into_owned(), - got, - } - .into()); - } - } - - self.path_info_service - .put(path_info) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - - Ok(output_path) - } } impl EvalIO for TvixStoreIO { |