diff options
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r-- | tvix/glue/src/tvix_store_io.rs | 107 |
1 files changed, 103 insertions, 4 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index c09f0098e43e..30ab97c0ca03 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -4,7 +4,12 @@ use async_recursion::async_recursion; use bytes::Bytes; use futures::Stream; use futures::{StreamExt, TryStreamExt}; +use nix_compat::nixhash::NixHash; +use nix_compat::store_path::{build_ca_path, StorePathRef}; use nix_compat::{nixhash::CAHash, store_path::StorePath}; +use sha2::{Digest, Sha256}; +use std::marker::Unpin; +use std::rc::Rc; use std::{ cell::RefCell, collections::BTreeSet, @@ -15,17 +20,18 @@ use std::{ use tokio::io::AsyncReadExt; use tracing::{error, instrument, warn, Level}; use tvix_build::buildservice::BuildService; -use tvix_eval::{EvalIO, FileType, StdIO}; +use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO}; use walkdir::DirEntry; use tvix_castore::{ blobservice::BlobService, directoryservice::{self, DirectoryService}, - proto::{node::Node, NamedNode}, + proto::{node::Node, FileNode, NamedNode}, B3Digest, }; use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; +use crate::builtins::FetcherError; use crate::known_paths::KnownPaths; use crate::tvix_build::derivation_to_build_request; @@ -51,7 +57,8 @@ pub struct TvixStoreIO { std_io: StdIO, #[allow(dead_code)] build_service: Arc<dyn BuildService>, - tokio_handle: tokio::runtime::Handle, + pub(crate) tokio_handle: tokio::runtime::Handle, + http_client: reqwest::Client, pub(crate) known_paths: RefCell<KnownPaths>, } @@ -70,6 +77,7 @@ impl TvixStoreIO { std_io: StdIO {}, build_service, tokio_handle, + http_client: reqwest::Client::new(), known_paths: Default::default(), } } @@ -278,7 +286,7 @@ impl TvixStoreIO { /// with a [`tokio::runtime::Handle::block_on`] call for synchronicity. pub(crate) fn ingest_entries_sync<S>(&self, entries_stream: S) -> io::Result<Node> where - S: Stream<Item = DirEntry> + std::marker::Unpin, + S: Stream<Item = DirEntry> + Unpin, { self.tokio_handle.block_on(async move { tvix_castore::import::ingest_entries( @@ -346,6 +354,97 @@ impl TvixStoreIO { .await }) } + + pub async fn store_path_exists<'a>(&'a self, store_path: StorePathRef<'a>) -> io::Result<bool> { + Ok(self + .path_info_service + .as_ref() + .get(*store_path.digest()) + .await? + .is_some()) + } + + pub async fn fetch_url( + &self, + url: &str, + name: &str, + hash: Option<&NixHash>, + ) -> Result<StorePath, ErrorKind> { + let resp = self + .http_client + .get(url) + .send() + .await + .map_err(FetcherError::from)?; + let mut sha = Sha256::new(); + let mut data = tokio_util::io::StreamReader::new( + resp.bytes_stream() + .inspect_ok(|data| { + sha.update(data); + }) + .map_err(|e| { + let e = e.without_url(); + warn!(%e, "failed to get response body"); + io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) + }), + ); + + let mut blob = self.blob_service.open_write().await; + let size = tokio::io::copy(&mut data, blob.as_mut()).await?; + let blob_digest = blob.close().await?; + let got = NixHash::Sha256(sha.finalize().into()); + + let hash = CAHash::Flat(if let Some(wanted) = hash { + if *wanted != got { + return Err(FetcherError::HashMismatch { + url: url.to_owned(), + wanted: wanted.clone(), + got, + } + .into()); + } + wanted.clone() + } else { + got + }); + + let path = build_ca_path(name, &hash, Vec::<String>::new(), false) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let node = Node::File(FileNode { + name: path.to_string().into(), + digest: blob_digest.into(), + size, + executable: false, + }); + + let (nar_size, nar_sha256) = self + .path_info_service + .calculate_nar(&node) + .await + .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?; + + let path_info = PathInfo { + node: Some(tvix_castore::proto::Node { + node: Some(node.clone()), + }), + references: vec![], + narinfo: Some(tvix_store::proto::NarInfo { + nar_size, + nar_sha256: nar_sha256.to_vec().into(), + signatures: vec![], + reference_names: vec![], + deriver: None, /* ? */ + ca: Some((&hash).into()), + }), + }; + + self.path_info_service + .put(path_info) + .await + .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; + + Ok(path.to_owned()) + } } impl EvalIO for TvixStoreIO { |