From 26b77b2cf3138e4db236bf243b718f3de23b0529 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Fri, 26 Apr 2024 18:58:07 +0300 Subject: refactor(tvix/glue): move decompression into fetchers/ subdir This is specifically used for the fetcher code (only). Moving it to there for now. Change-Id: I1e1d0541b85340ef4ff3a4c6b3fa99b51853f539 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11532 Reviewed-by: edef Autosubmit: flokli Tested-by: BuildkiteCI --- tvix/glue/src/decompression.rs | 222 ---------------- tvix/glue/src/fetchers.rs | 441 ------------------------------- tvix/glue/src/fetchers/decompression.rs | 222 ++++++++++++++++ tvix/glue/src/fetchers/mod.rs | 444 ++++++++++++++++++++++++++++++++ tvix/glue/src/lib.rs | 1 - 5 files changed, 666 insertions(+), 664 deletions(-) delete mode 100644 tvix/glue/src/decompression.rs delete mode 100644 tvix/glue/src/fetchers.rs create mode 100644 tvix/glue/src/fetchers/decompression.rs create mode 100644 tvix/glue/src/fetchers/mod.rs (limited to 'tvix') diff --git a/tvix/glue/src/decompression.rs b/tvix/glue/src/decompression.rs deleted file mode 100644 index 11dc9d98352a..000000000000 --- a/tvix/glue/src/decompression.rs +++ /dev/null @@ -1,222 +0,0 @@ -#![allow(dead_code)] // TODO - -use std::{ - io, mem, - pin::Pin, - task::{Context, Poll}, -}; - -use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder}; -use futures::ready; -use pin_project::pin_project; -use tokio::io::{AsyncBufRead, AsyncRead, BufReader, ReadBuf}; - -const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b]; -const BZIP2_MAGIC: [u8; 3] = *b"BZh"; -const XZ_MAGIC: [u8; 6] = [0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00]; -const BYTES_NEEDED: usize = 6; - -#[derive(Debug, Clone, Copy)] -enum Algorithm { - Gzip, - Bzip2, - Xz, -} - -impl Algorithm { - fn from_magic(magic: &[u8]) -> Option { - if magic.starts_with(&GZIP_MAGIC) { - Some(Self::Gzip) - } else if magic.starts_with(&BZIP2_MAGIC) { - Some(Self::Bzip2) - } else if magic.starts_with(&XZ_MAGIC) { - Some(Self::Xz) - } else { - None - } - } -} - -#[pin_project] -struct WithPreexistingBuffer { - buffer: Vec, - #[pin] - inner: R, -} - -impl AsyncRead for WithPreexistingBuffer -where - R: AsyncRead, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - let this = self.project(); - if !this.buffer.is_empty() { - // TODO: check if the buffer fits first - buf.put_slice(this.buffer); - this.buffer.clear(); - } - this.inner.poll_read(cx, buf) - } -} - -#[pin_project(project = DecompressedReaderInnerProj)] -enum DecompressedReaderInner { - Unknown { - buffer: Vec, - #[pin] - inner: Option, - }, - Gzip(#[pin] GzipDecoder>>), - Bzip2(#[pin] BzDecoder>>), - Xz(#[pin] XzDecoder>>), -} - -impl DecompressedReaderInner -where - R: AsyncBufRead, -{ - fn switch_to(&mut self, algorithm: Algorithm) { - let (buffer, inner) = match self { - DecompressedReaderInner::Unknown { buffer, inner } => { - (mem::take(buffer), inner.take().unwrap()) - } - DecompressedReaderInner::Gzip(_) - | DecompressedReaderInner::Bzip2(_) - | DecompressedReaderInner::Xz(_) => unreachable!(), - }; - let inner = BufReader::new(WithPreexistingBuffer { buffer, inner }); - - *self = match algorithm { - Algorithm::Gzip => Self::Gzip(GzipDecoder::new(inner)), - Algorithm::Bzip2 => Self::Bzip2(BzDecoder::new(inner)), - Algorithm::Xz => Self::Xz(XzDecoder::new(inner)), - } - } -} - -impl AsyncRead for DecompressedReaderInner -where - R: AsyncBufRead, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - match self.project() { - DecompressedReaderInnerProj::Unknown { .. } => { - unreachable!("Can't call poll_read on Unknown") - } - DecompressedReaderInnerProj::Gzip(inner) => inner.poll_read(cx, buf), - DecompressedReaderInnerProj::Bzip2(inner) => inner.poll_read(cx, buf), - DecompressedReaderInnerProj::Xz(inner) => inner.poll_read(cx, buf), - } - } -} - -#[pin_project] -pub struct DecompressedReader { - #[pin] - inner: DecompressedReaderInner, - switch_to: Option, -} - -impl DecompressedReader { - pub fn new(inner: R) -> Self { - Self { - inner: DecompressedReaderInner::Unknown { - buffer: vec![0; BYTES_NEEDED], - inner: Some(inner), - }, - switch_to: None, - } - } -} - -impl AsyncRead for DecompressedReader -where - R: AsyncBufRead + Unpin, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - let mut this = self.project(); - let (buffer, inner) = match this.inner.as_mut().project() { - DecompressedReaderInnerProj::Gzip(inner) => return inner.poll_read(cx, buf), - DecompressedReaderInnerProj::Bzip2(inner) => return inner.poll_read(cx, buf), - DecompressedReaderInnerProj::Xz(inner) => return inner.poll_read(cx, buf), - DecompressedReaderInnerProj::Unknown { buffer, inner } => (buffer, inner), - }; - - let mut our_buf = ReadBuf::new(buffer); - if let Err(e) = ready!(inner.as_pin_mut().unwrap().poll_read(cx, &mut our_buf)) { - return Poll::Ready(Err(e)); - } - - let data = our_buf.filled(); - if data.len() >= BYTES_NEEDED { - if let Some(algorithm) = Algorithm::from_magic(data) { - this.inner.as_mut().switch_to(algorithm); - } else { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::InvalidData, - "tar data not gz, bzip2, or xz compressed", - ))); - } - this.inner.poll_read(cx, buf) - } else { - cx.waker().wake_by_ref(); - Poll::Pending - } - } -} - -#[cfg(test)] -mod tests { - use std::path::Path; - - use async_compression::tokio::bufread::GzipEncoder; - use futures::TryStreamExt; - use rstest::rstest; - use tokio::io::{AsyncReadExt, BufReader}; - use tokio_tar::Archive; - - use super::*; - - #[tokio::test] - async fn gzip() { - let data = b"abcdefghijk"; - let mut enc = GzipEncoder::new(&data[..]); - let mut gzipped = vec![]; - enc.read_to_end(&mut gzipped).await.unwrap(); - - let mut reader = DecompressedReader::new(BufReader::new(&gzipped[..])); - let mut round_tripped = vec![]; - reader.read_to_end(&mut round_tripped).await.unwrap(); - - assert_eq!(data[..], round_tripped[..]); - } - - #[rstest] - #[case::gzip(include_bytes!("tests/blob.tar.gz"))] - #[case::bzip2(include_bytes!("tests/blob.tar.bz2"))] - #[case::xz(include_bytes!("tests/blob.tar.xz"))] - #[tokio::test] - async fn compressed_tar(#[case] data: &[u8]) { - let reader = DecompressedReader::new(BufReader::new(data)); - let mut archive = Archive::new(reader); - let mut entries: Vec<_> = archive.entries().unwrap().try_collect().await.unwrap(); - - assert_eq!(entries.len(), 1); - assert_eq!(entries[0].path().unwrap().as_ref(), Path::new("empty")); - let mut data = String::new(); - entries[0].read_to_string(&mut data).await.unwrap(); - assert_eq!(data, ""); - } -} diff --git a/tvix/glue/src/fetchers.rs b/tvix/glue/src/fetchers.rs deleted file mode 100644 index 7560c447d8d2..000000000000 --- a/tvix/glue/src/fetchers.rs +++ /dev/null @@ -1,441 +0,0 @@ -use futures::TryStreamExt; -use md5::Md5; -use nix_compat::{ - nixhash::{CAHash, HashAlgo, NixHash}, - store_path::{build_ca_path, BuildStorePathError, StorePathRef}, -}; -use sha1::Sha1; -use sha2::{digest::Output, Digest, Sha256, Sha512}; -use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite}; -use tokio_util::io::InspectReader; -use tracing::warn; -use tvix_castore::{ - blobservice::BlobService, - directoryservice::DirectoryService, - proto::{node::Node, FileNode}, -}; -use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; -use url::Url; - -use crate::{builtins::FetcherError, decompression::DecompressedReader}; - -/// Representing options for doing a fetch. -#[derive(Clone, Eq, PartialEq)] -pub enum Fetch { - /// Fetch a literal file from the given URL, with an optional expected - /// NixHash of it. - /// TODO: check if this is *always* sha256, and if so, make it [u8; 32]. - URL(Url, Option), - - /// Fetch a tarball from the given URL and unpack. - /// The file must be a tape archive (.tar) compressed with gzip, bzip2 or xz. - /// The top-level path component of the files in the tarball is removed, - /// so it is best if the tarball contains a single directory at top level. - /// Optionally, a sha256 digest can be provided to verify the unpacked - /// contents against. - Tarball(Url, Option<[u8; 32]>), - - /// TODO - Git(), -} - -// Drops potentially sensitive username and password from a URL. -fn redact_url(url: &Url) -> Url { - let mut url = url.to_owned(); - if !url.username().is_empty() { - let _ = url.set_username("redacted"); - } - - if url.password().is_some() { - let _ = url.set_password(Some("redacted")); - } - - url -} - -impl std::fmt::Debug for Fetch { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Fetch::URL(url, nixhash) => { - let url = redact_url(url); - if let Some(nixhash) = nixhash { - write!(f, "URL [url: {}, exp_hash: Some({})]", &url, nixhash) - } else { - write!(f, "URL [url: {}, exp_hash: None]", &url) - } - } - Fetch::Tarball(url, exp_digest) => { - let url = redact_url(url); - if let Some(exp_digest) = exp_digest { - write!( - f, - "Tarball [url: {}, exp_hash: Some({})]", - url, - NixHash::Sha256(*exp_digest) - ) - } else { - write!(f, "Tarball [url: {}, exp_hash: None]", url) - } - } - Fetch::Git() => todo!(), - } - } -} - -impl Fetch { - /// If the [Fetch] contains an expected hash upfront, returns the resulting - /// store path. - /// This doesn't do any fetching. - pub fn store_path<'a>( - &self, - name: &'a str, - ) -> Result>, BuildStorePathError> { - let ca_hash = match self { - Fetch::URL(_, Some(nixhash)) => CAHash::Flat(nixhash.clone()), - Fetch::Tarball(_, Some(nar_sha256)) => CAHash::Nar(NixHash::Sha256(*nar_sha256)), - _ => return Ok(None), - }; - - // calculate the store path of this fetch - build_ca_path(name, &ca_hash, Vec::::new(), false).map(Some) - } -} - -/// Knows how to fetch a given [Fetch]. -pub struct Fetcher { - http_client: reqwest::Client, - blob_service: BS, - directory_service: DS, - path_info_service: PS, -} - -impl Fetcher { - pub fn new(blob_service: BS, directory_service: DS, path_info_service: PS) -> Self { - Self { - http_client: reqwest::Client::new(), - blob_service, - directory_service, - path_info_service, - } - } - - /// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it. - /// In case the URI uses the file:// scheme, use tokio::fs to open it. - async fn download(&self, url: Url) -> Result, FetcherError> { - match url.scheme() { - "file" => { - let f = tokio::fs::File::open(url.to_file_path().map_err(|_| { - // "Returns Err if the host is neither empty nor "localhost" - // (except on Windows, where file: URLs may have a non-local host)" - FetcherError::Io(std::io::Error::new( - std::io::ErrorKind::Other, - "invalid host for file:// scheme", - )) - })?) - .await?; - Ok(Box::new(tokio::io::BufReader::new(f))) - } - _ => { - let resp = self.http_client.get(url).send().await?; - Ok(Box::new(tokio_util::io::StreamReader::new( - resp.bytes_stream().map_err(|e| { - let e = e.without_url(); - warn!(%e, "failed to get response body"); - std::io::Error::new(std::io::ErrorKind::BrokenPipe, e) - }), - ))) - } - } - } -} - -/// Copies all data from the passed reader to the passed writer. -/// Afterwards, it also returns the resulting [Digest], as well as the number of -/// bytes copied. -/// The exact hash function used is left generic over all [Digest]. -async fn hash( - mut r: impl AsyncRead + Unpin, - mut w: impl AsyncWrite + Unpin, -) -> std::io::Result<(Output, u64)> { - let mut hasher = D::new(); - let bytes_copied = tokio::io::copy( - &mut InspectReader::new(&mut r, |d| hasher.write_all(d).unwrap()), - &mut w, - ) - .await?; - Ok((hasher.finalize(), bytes_copied)) -} - -impl Fetcher -where - BS: AsRef<(dyn BlobService + 'static)> + Clone + Send + Sync + 'static, - DS: AsRef<(dyn DirectoryService + 'static)>, - PS: PathInfoService, -{ - /// Ingest the data from a specified [Fetch]. - /// On success, return the root node, a content digest and length. - /// Returns an error if there was a failure during fetching, or the contents - /// didn't match the previously communicated hash contained inside the FetchArgs. - pub async fn ingest(&self, fetch: Fetch) -> Result<(Node, CAHash, u64), FetcherError> { - match fetch { - Fetch::URL(url, exp_nixhash) => { - // Construct a AsyncRead reading from the data as its downloaded. - let mut r = self.download(url.clone()).await?; - - // Construct a AsyncWrite to write into the BlobService. - let mut blob_writer = self.blob_service.open_write().await; - - // Copy the contents from the download reader to the blob writer. - // Calculate the digest of the file received, depending on the - // communicated expected hash (or sha256 if none provided). - let (actual_nixhash, blob_size) = match exp_nixhash - .as_ref() - .map(NixHash::algo) - .unwrap_or_else(|| HashAlgo::Sha256) - { - HashAlgo::Sha256 => hash::(&mut r, &mut blob_writer).await.map( - |(digest, bytes_written)| (NixHash::Sha256(digest.into()), bytes_written), - )?, - HashAlgo::Md5 => hash::(&mut r, &mut blob_writer).await.map( - |(digest, bytes_written)| (NixHash::Md5(digest.into()), bytes_written), - )?, - HashAlgo::Sha1 => hash::(&mut r, &mut blob_writer).await.map( - |(digest, bytes_written)| (NixHash::Sha1(digest.into()), bytes_written), - )?, - HashAlgo::Sha512 => hash::(&mut r, &mut blob_writer).await.map( - |(digest, bytes_written)| { - (NixHash::Sha512(Box::new(digest.into())), bytes_written) - }, - )?, - }; - - if let Some(exp_nixhash) = exp_nixhash { - if exp_nixhash != actual_nixhash { - return Err(FetcherError::HashMismatch { - url, - wanted: exp_nixhash, - got: actual_nixhash, - }); - } - } - - // Construct and return the FileNode describing the downloaded contents. - Ok(( - Node::File(FileNode { - name: vec![].into(), - digest: blob_writer.close().await?.into(), - size: blob_size, - executable: false, - }), - CAHash::Flat(actual_nixhash), - blob_size, - )) - } - Fetch::Tarball(url, exp_nar_sha256) => { - // Construct a AsyncRead reading from the data as its downloaded. - let r = self.download(url.clone()).await?; - - // Pop compression. - let r = DecompressedReader::new(r); - // Open the archive. - let archive = tokio_tar::Archive::new(r); - - // Ingest the archive, get the root node - let node = tvix_castore::import::archive::ingest_archive( - self.blob_service.clone(), - &self.directory_service, - archive, - ) - .await?; - - // If an expected NAR sha256 was provided, compare with the one - // calculated from our root node. - // Even if no expected NAR sha256 has been provided, we need - // the actual one later. - let (nar_size, actual_nar_sha256) = self - .path_info_service - .calculate_nar(&node) - .await - .map_err(|e| { - // convert the generic Store error to an IO error. - FetcherError::Io(e.into()) - })?; - - if let Some(exp_nar_sha256) = exp_nar_sha256 { - if exp_nar_sha256 != actual_nar_sha256 { - return Err(FetcherError::HashMismatch { - url, - wanted: NixHash::Sha256(exp_nar_sha256), - got: NixHash::Sha256(actual_nar_sha256), - }); - } - } - - Ok(( - node, - CAHash::Nar(NixHash::Sha256(actual_nar_sha256)), - nar_size, - )) - } - Fetch::Git() => todo!(), - } - } - - /// Ingests the data from a specified [Fetch], persists the returned node - /// in the PathInfoService, and returns the calculated StorePath, as well as - /// the root node pointing to the contents. - /// The root node can be used to descend into the data without doing the - /// lookup to the PathInfoService again. - pub async fn ingest_and_persist<'a>( - &self, - name: &'a str, - fetch: Fetch, - ) -> Result<(StorePathRef<'a>, Node), FetcherError> { - // Fetch file, return the (unnamed) (File)Node of its contents, ca hash and filesize. - let (node, ca_hash, size) = self.ingest(fetch).await?; - - // Calculate the store path to return later, which is done with the ca_hash. - let store_path = build_ca_path(name, &ca_hash, Vec::::new(), false)?; - - // Rename the node name to match the Store Path. - let node = node.rename(store_path.to_string().into()); - - // If the resulting hash is not a CAHash::Nar, we also need to invoke - // `calculate_nar` to calculate this representation, as it's required in - // the [PathInfo]. - let (nar_size, nar_sha256) = match &ca_hash { - CAHash::Flat(_nix_hash) => self - .path_info_service - .calculate_nar(&node) - .await - .map_err(|e| FetcherError::Io(e.into()))?, - CAHash::Nar(NixHash::Sha256(nar_sha256)) => (size, *nar_sha256), - CAHash::Nar(_) => unreachable!("Tvix bug: fetch returned non-sha256 CAHash::Nar"), - CAHash::Text(_) => unreachable!("Tvix bug: fetch returned CAHash::Text"), - }; - - // Construct the PathInfo and persist it. - let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { node: Some(node) }), - references: vec![], - narinfo: Some(tvix_store::proto::NarInfo { - nar_size, - nar_sha256: nar_sha256.to_vec().into(), - signatures: vec![], - reference_names: vec![], - deriver: None, - ca: Some(ca_hash.into()), - }), - }; - - let path_info = self - .path_info_service - .put(path_info) - .await - .map_err(|e| FetcherError::Io(e.into()))?; - - Ok((store_path, path_info.node.unwrap().node.unwrap())) - } -} - -/// Attempts to mimic `nix::libutil::baseNameOf` -pub(crate) fn url_basename(s: &str) -> &str { - if s.is_empty() { - return ""; - } - - let mut last = s.len() - 1; - if s.chars().nth(last).unwrap() == '/' && last > 0 { - last -= 1; - } - - if last == 0 { - return ""; - } - - let pos = match s[..=last].rfind('/') { - Some(pos) => { - if pos == last - 1 { - 0 - } else { - pos - } - } - None => 0, - }; - - &s[(pos + 1)..=last] -} - -#[cfg(test)] -mod tests { - mod fetch { - use nix_compat::nixbase32; - - use crate::fetchers::Fetch; - - use super::super::*; - - #[test] - fn fetchurl_store_path() { - let url = Url::parse("https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch").unwrap(); - let exp_nixhash = NixHash::Sha256( - nixbase32::decode_fixed("0nawkl04sj7psw6ikzay7kydj3dhd0fkwghcsf5rzaw4bmp4kbax") - .unwrap(), - ); - - let fetch = Fetch::URL(url, Some(exp_nixhash)); - assert_eq!( - "06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch", - &fetch - .store_path("notmuch-extract-patch") - .unwrap() - .unwrap() - .to_string(), - ) - } - - #[test] - fn fetch_tarball_store_path() { - let url = Url::parse("https://github.com/NixOS/nixpkgs/archive/91050ea1e57e50388fa87a3302ba12d188ef723a.tar.gz").unwrap(); - let exp_nixbase32 = - nixbase32::decode_fixed("1hf6cgaci1n186kkkjq106ryf8mmlq9vnwgfwh625wa8hfgdn4dm") - .unwrap(); - let fetch = Fetch::Tarball(url, Some(exp_nixbase32)); - - assert_eq!( - "7adgvk5zdfq4pwrhsm3n9lzypb12gw0g-source", - &fetch.store_path("source").unwrap().unwrap().to_string(), - ) - } - } - - mod url_basename { - use super::super::*; - - #[test] - fn empty_path() { - assert_eq!(url_basename(""), ""); - } - - #[test] - fn path_on_root() { - assert_eq!(url_basename("/dir"), "dir"); - } - - #[test] - fn relative_path() { - assert_eq!(url_basename("dir/foo"), "foo"); - } - - #[test] - fn root_with_trailing_slash() { - assert_eq!(url_basename("/"), ""); - } - - #[test] - fn trailing_slash() { - assert_eq!(url_basename("/dir/"), "dir"); - } - } -} diff --git a/tvix/glue/src/fetchers/decompression.rs b/tvix/glue/src/fetchers/decompression.rs new file mode 100644 index 000000000000..f96fa60e34f7 --- /dev/null +++ b/tvix/glue/src/fetchers/decompression.rs @@ -0,0 +1,222 @@ +#![allow(dead_code)] // TODO + +use std::{ + io, mem, + pin::Pin, + task::{Context, Poll}, +}; + +use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder}; +use futures::ready; +use pin_project::pin_project; +use tokio::io::{AsyncBufRead, AsyncRead, BufReader, ReadBuf}; + +const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b]; +const BZIP2_MAGIC: [u8; 3] = *b"BZh"; +const XZ_MAGIC: [u8; 6] = [0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00]; +const BYTES_NEEDED: usize = 6; + +#[derive(Debug, Clone, Copy)] +enum Algorithm { + Gzip, + Bzip2, + Xz, +} + +impl Algorithm { + fn from_magic(magic: &[u8]) -> Option { + if magic.starts_with(&GZIP_MAGIC) { + Some(Self::Gzip) + } else if magic.starts_with(&BZIP2_MAGIC) { + Some(Self::Bzip2) + } else if magic.starts_with(&XZ_MAGIC) { + Some(Self::Xz) + } else { + None + } + } +} + +#[pin_project] +struct WithPreexistingBuffer { + buffer: Vec, + #[pin] + inner: R, +} + +impl AsyncRead for WithPreexistingBuffer +where + R: AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.project(); + if !this.buffer.is_empty() { + // TODO: check if the buffer fits first + buf.put_slice(this.buffer); + this.buffer.clear(); + } + this.inner.poll_read(cx, buf) + } +} + +#[pin_project(project = DecompressedReaderInnerProj)] +enum DecompressedReaderInner { + Unknown { + buffer: Vec, + #[pin] + inner: Option, + }, + Gzip(#[pin] GzipDecoder>>), + Bzip2(#[pin] BzDecoder>>), + Xz(#[pin] XzDecoder>>), +} + +impl DecompressedReaderInner +where + R: AsyncBufRead, +{ + fn switch_to(&mut self, algorithm: Algorithm) { + let (buffer, inner) = match self { + DecompressedReaderInner::Unknown { buffer, inner } => { + (mem::take(buffer), inner.take().unwrap()) + } + DecompressedReaderInner::Gzip(_) + | DecompressedReaderInner::Bzip2(_) + | DecompressedReaderInner::Xz(_) => unreachable!(), + }; + let inner = BufReader::new(WithPreexistingBuffer { buffer, inner }); + + *self = match algorithm { + Algorithm::Gzip => Self::Gzip(GzipDecoder::new(inner)), + Algorithm::Bzip2 => Self::Bzip2(BzDecoder::new(inner)), + Algorithm::Xz => Self::Xz(XzDecoder::new(inner)), + } + } +} + +impl AsyncRead for DecompressedReaderInner +where + R: AsyncBufRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match self.project() { + DecompressedReaderInnerProj::Unknown { .. } => { + unreachable!("Can't call poll_read on Unknown") + } + DecompressedReaderInnerProj::Gzip(inner) => inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Bzip2(inner) => inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Xz(inner) => inner.poll_read(cx, buf), + } + } +} + +#[pin_project] +pub struct DecompressedReader { + #[pin] + inner: DecompressedReaderInner, + switch_to: Option, +} + +impl DecompressedReader { + pub fn new(inner: R) -> Self { + Self { + inner: DecompressedReaderInner::Unknown { + buffer: vec![0; BYTES_NEEDED], + inner: Some(inner), + }, + switch_to: None, + } + } +} + +impl AsyncRead for DecompressedReader +where + R: AsyncBufRead + Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let mut this = self.project(); + let (buffer, inner) = match this.inner.as_mut().project() { + DecompressedReaderInnerProj::Gzip(inner) => return inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Bzip2(inner) => return inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Xz(inner) => return inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Unknown { buffer, inner } => (buffer, inner), + }; + + let mut our_buf = ReadBuf::new(buffer); + if let Err(e) = ready!(inner.as_pin_mut().unwrap().poll_read(cx, &mut our_buf)) { + return Poll::Ready(Err(e)); + } + + let data = our_buf.filled(); + if data.len() >= BYTES_NEEDED { + if let Some(algorithm) = Algorithm::from_magic(data) { + this.inner.as_mut().switch_to(algorithm); + } else { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidData, + "tar data not gz, bzip2, or xz compressed", + ))); + } + this.inner.poll_read(cx, buf) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +#[cfg(test)] +mod tests { + use std::path::Path; + + use async_compression::tokio::bufread::GzipEncoder; + use futures::TryStreamExt; + use rstest::rstest; + use tokio::io::{AsyncReadExt, BufReader}; + use tokio_tar::Archive; + + use super::*; + + #[tokio::test] + async fn gzip() { + let data = b"abcdefghijk"; + let mut enc = GzipEncoder::new(&data[..]); + let mut gzipped = vec![]; + enc.read_to_end(&mut gzipped).await.unwrap(); + + let mut reader = DecompressedReader::new(BufReader::new(&gzipped[..])); + let mut round_tripped = vec![]; + reader.read_to_end(&mut round_tripped).await.unwrap(); + + assert_eq!(data[..], round_tripped[..]); + } + + #[rstest] + #[case::gzip(include_bytes!("../tests/blob.tar.gz"))] + #[case::bzip2(include_bytes!("../tests/blob.tar.bz2"))] + #[case::xz(include_bytes!("../tests/blob.tar.xz"))] + #[tokio::test] + async fn compressed_tar(#[case] data: &[u8]) { + let reader = DecompressedReader::new(BufReader::new(data)); + let mut archive = Archive::new(reader); + let mut entries: Vec<_> = archive.entries().unwrap().try_collect().await.unwrap(); + + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].path().unwrap().as_ref(), Path::new("empty")); + let mut data = String::new(); + entries[0].read_to_string(&mut data).await.unwrap(); + assert_eq!(data, ""); + } +} diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs new file mode 100644 index 000000000000..4b53d0fdac60 --- /dev/null +++ b/tvix/glue/src/fetchers/mod.rs @@ -0,0 +1,444 @@ +use futures::TryStreamExt; +use md5::Md5; +use nix_compat::{ + nixhash::{CAHash, HashAlgo, NixHash}, + store_path::{build_ca_path, BuildStorePathError, StorePathRef}, +}; +use sha1::Sha1; +use sha2::{digest::Output, Digest, Sha256, Sha512}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use tokio_util::io::InspectReader; +use tracing::warn; +use tvix_castore::{ + blobservice::BlobService, + directoryservice::DirectoryService, + proto::{node::Node, FileNode}, +}; +use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; +use url::Url; + +use crate::builtins::FetcherError; + +mod decompression; +use decompression::DecompressedReader; + +/// Representing options for doing a fetch. +#[derive(Clone, Eq, PartialEq)] +pub enum Fetch { + /// Fetch a literal file from the given URL, with an optional expected + /// NixHash of it. + /// TODO: check if this is *always* sha256, and if so, make it [u8; 32]. + URL(Url, Option), + + /// Fetch a tarball from the given URL and unpack. + /// The file must be a tape archive (.tar) compressed with gzip, bzip2 or xz. + /// The top-level path component of the files in the tarball is removed, + /// so it is best if the tarball contains a single directory at top level. + /// Optionally, a sha256 digest can be provided to verify the unpacked + /// contents against. + Tarball(Url, Option<[u8; 32]>), + + /// TODO + Git(), +} + +// Drops potentially sensitive username and password from a URL. +fn redact_url(url: &Url) -> Url { + let mut url = url.to_owned(); + if !url.username().is_empty() { + let _ = url.set_username("redacted"); + } + + if url.password().is_some() { + let _ = url.set_password(Some("redacted")); + } + + url +} + +impl std::fmt::Debug for Fetch { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Fetch::URL(url, nixhash) => { + let url = redact_url(url); + if let Some(nixhash) = nixhash { + write!(f, "URL [url: {}, exp_hash: Some({})]", &url, nixhash) + } else { + write!(f, "URL [url: {}, exp_hash: None]", &url) + } + } + Fetch::Tarball(url, exp_digest) => { + let url = redact_url(url); + if let Some(exp_digest) = exp_digest { + write!( + f, + "Tarball [url: {}, exp_hash: Some({})]", + url, + NixHash::Sha256(*exp_digest) + ) + } else { + write!(f, "Tarball [url: {}, exp_hash: None]", url) + } + } + Fetch::Git() => todo!(), + } + } +} + +impl Fetch { + /// If the [Fetch] contains an expected hash upfront, returns the resulting + /// store path. + /// This doesn't do any fetching. + pub fn store_path<'a>( + &self, + name: &'a str, + ) -> Result>, BuildStorePathError> { + let ca_hash = match self { + Fetch::URL(_, Some(nixhash)) => CAHash::Flat(nixhash.clone()), + Fetch::Tarball(_, Some(nar_sha256)) => CAHash::Nar(NixHash::Sha256(*nar_sha256)), + _ => return Ok(None), + }; + + // calculate the store path of this fetch + build_ca_path(name, &ca_hash, Vec::::new(), false).map(Some) + } +} + +/// Knows how to fetch a given [Fetch]. +pub struct Fetcher { + http_client: reqwest::Client, + blob_service: BS, + directory_service: DS, + path_info_service: PS, +} + +impl Fetcher { + pub fn new(blob_service: BS, directory_service: DS, path_info_service: PS) -> Self { + Self { + http_client: reqwest::Client::new(), + blob_service, + directory_service, + path_info_service, + } + } + + /// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it. + /// In case the URI uses the file:// scheme, use tokio::fs to open it. + async fn download(&self, url: Url) -> Result, FetcherError> { + match url.scheme() { + "file" => { + let f = tokio::fs::File::open(url.to_file_path().map_err(|_| { + // "Returns Err if the host is neither empty nor "localhost" + // (except on Windows, where file: URLs may have a non-local host)" + FetcherError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + "invalid host for file:// scheme", + )) + })?) + .await?; + Ok(Box::new(tokio::io::BufReader::new(f))) + } + _ => { + let resp = self.http_client.get(url).send().await?; + Ok(Box::new(tokio_util::io::StreamReader::new( + resp.bytes_stream().map_err(|e| { + let e = e.without_url(); + warn!(%e, "failed to get response body"); + std::io::Error::new(std::io::ErrorKind::BrokenPipe, e) + }), + ))) + } + } + } +} + +/// Copies all data from the passed reader to the passed writer. +/// Afterwards, it also returns the resulting [Digest], as well as the number of +/// bytes copied. +/// The exact hash function used is left generic over all [Digest]. +async fn hash( + mut r: impl AsyncRead + Unpin, + mut w: impl AsyncWrite + Unpin, +) -> std::io::Result<(Output, u64)> { + let mut hasher = D::new(); + let bytes_copied = tokio::io::copy( + &mut InspectReader::new(&mut r, |d| hasher.write_all(d).unwrap()), + &mut w, + ) + .await?; + Ok((hasher.finalize(), bytes_copied)) +} + +impl Fetcher +where + BS: AsRef<(dyn BlobService + 'static)> + Clone + Send + Sync + 'static, + DS: AsRef<(dyn DirectoryService + 'static)>, + PS: PathInfoService, +{ + /// Ingest the data from a specified [Fetch]. + /// On success, return the root node, a content digest and length. + /// Returns an error if there was a failure during fetching, or the contents + /// didn't match the previously communicated hash contained inside the FetchArgs. + pub async fn ingest(&self, fetch: Fetch) -> Result<(Node, CAHash, u64), FetcherError> { + match fetch { + Fetch::URL(url, exp_nixhash) => { + // Construct a AsyncRead reading from the data as its downloaded. + let mut r = self.download(url.clone()).await?; + + // Construct a AsyncWrite to write into the BlobService. + let mut blob_writer = self.blob_service.open_write().await; + + // Copy the contents from the download reader to the blob writer. + // Calculate the digest of the file received, depending on the + // communicated expected hash (or sha256 if none provided). + let (actual_nixhash, blob_size) = match exp_nixhash + .as_ref() + .map(NixHash::algo) + .unwrap_or_else(|| HashAlgo::Sha256) + { + HashAlgo::Sha256 => hash::(&mut r, &mut blob_writer).await.map( + |(digest, bytes_written)| (NixHash::Sha256(digest.into()), bytes_written), + )?, + HashAlgo::Md5 => hash::(&mut r, &mut blob_writer).await.map( + |(digest, bytes_written)| (NixHash::Md5(digest.into()), bytes_written), + )?, + HashAlgo::Sha1 => hash::(&mut r, &mut blob_writer).await.map( + |(digest, bytes_written)| (NixHash::Sha1(digest.into()), bytes_written), + )?, + HashAlgo::Sha512 => hash::(&mut r, &mut blob_writer).await.map( + |(digest, bytes_written)| { + (NixHash::Sha512(Box::new(digest.into())), bytes_written) + }, + )?, + }; + + if let Some(exp_nixhash) = exp_nixhash { + if exp_nixhash != actual_nixhash { + return Err(FetcherError::HashMismatch { + url, + wanted: exp_nixhash, + got: actual_nixhash, + }); + } + } + + // Construct and return the FileNode describing the downloaded contents. + Ok(( + Node::File(FileNode { + name: vec![].into(), + digest: blob_writer.close().await?.into(), + size: blob_size, + executable: false, + }), + CAHash::Flat(actual_nixhash), + blob_size, + )) + } + Fetch::Tarball(url, exp_nar_sha256) => { + // Construct a AsyncRead reading from the data as its downloaded. + let r = self.download(url.clone()).await?; + + // Pop compression. + let r = DecompressedReader::new(r); + // Open the archive. + let archive = tokio_tar::Archive::new(r); + + // Ingest the archive, get the root node + let node = tvix_castore::import::archive::ingest_archive( + self.blob_service.clone(), + &self.directory_service, + archive, + ) + .await?; + + // If an expected NAR sha256 was provided, compare with the one + // calculated from our root node. + // Even if no expected NAR sha256 has been provided, we need + // the actual one later. + let (nar_size, actual_nar_sha256) = self + .path_info_service + .calculate_nar(&node) + .await + .map_err(|e| { + // convert the generic Store error to an IO error. + FetcherError::Io(e.into()) + })?; + + if let Some(exp_nar_sha256) = exp_nar_sha256 { + if exp_nar_sha256 != actual_nar_sha256 { + return Err(FetcherError::HashMismatch { + url, + wanted: NixHash::Sha256(exp_nar_sha256), + got: NixHash::Sha256(actual_nar_sha256), + }); + } + } + + Ok(( + node, + CAHash::Nar(NixHash::Sha256(actual_nar_sha256)), + nar_size, + )) + } + Fetch::Git() => todo!(), + } + } + + /// Ingests the data from a specified [Fetch], persists the returned node + /// in the PathInfoService, and returns the calculated StorePath, as well as + /// the root node pointing to the contents. + /// The root node can be used to descend into the data without doing the + /// lookup to the PathInfoService again. + pub async fn ingest_and_persist<'a>( + &self, + name: &'a str, + fetch: Fetch, + ) -> Result<(StorePathRef<'a>, Node), FetcherError> { + // Fetch file, return the (unnamed) (File)Node of its contents, ca hash and filesize. + let (node, ca_hash, size) = self.ingest(fetch).await?; + + // Calculate the store path to return later, which is done with the ca_hash. + let store_path = build_ca_path(name, &ca_hash, Vec::::new(), false)?; + + // Rename the node name to match the Store Path. + let node = node.rename(store_path.to_string().into()); + + // If the resulting hash is not a CAHash::Nar, we also need to invoke + // `calculate_nar` to calculate this representation, as it's required in + // the [PathInfo]. + let (nar_size, nar_sha256) = match &ca_hash { + CAHash::Flat(_nix_hash) => self + .path_info_service + .calculate_nar(&node) + .await + .map_err(|e| FetcherError::Io(e.into()))?, + CAHash::Nar(NixHash::Sha256(nar_sha256)) => (size, *nar_sha256), + CAHash::Nar(_) => unreachable!("Tvix bug: fetch returned non-sha256 CAHash::Nar"), + CAHash::Text(_) => unreachable!("Tvix bug: fetch returned CAHash::Text"), + }; + + // Construct the PathInfo and persist it. + let path_info = PathInfo { + node: Some(tvix_castore::proto::Node { node: Some(node) }), + references: vec![], + narinfo: Some(tvix_store::proto::NarInfo { + nar_size, + nar_sha256: nar_sha256.to_vec().into(), + signatures: vec![], + reference_names: vec![], + deriver: None, + ca: Some(ca_hash.into()), + }), + }; + + let path_info = self + .path_info_service + .put(path_info) + .await + .map_err(|e| FetcherError::Io(e.into()))?; + + Ok((store_path, path_info.node.unwrap().node.unwrap())) + } +} + +/// Attempts to mimic `nix::libutil::baseNameOf` +pub(crate) fn url_basename(s: &str) -> &str { + if s.is_empty() { + return ""; + } + + let mut last = s.len() - 1; + if s.chars().nth(last).unwrap() == '/' && last > 0 { + last -= 1; + } + + if last == 0 { + return ""; + } + + let pos = match s[..=last].rfind('/') { + Some(pos) => { + if pos == last - 1 { + 0 + } else { + pos + } + } + None => 0, + }; + + &s[(pos + 1)..=last] +} + +#[cfg(test)] +mod tests { + mod fetch { + use nix_compat::nixbase32; + + use crate::fetchers::Fetch; + + use super::super::*; + + #[test] + fn fetchurl_store_path() { + let url = Url::parse("https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch").unwrap(); + let exp_nixhash = NixHash::Sha256( + nixbase32::decode_fixed("0nawkl04sj7psw6ikzay7kydj3dhd0fkwghcsf5rzaw4bmp4kbax") + .unwrap(), + ); + + let fetch = Fetch::URL(url, Some(exp_nixhash)); + assert_eq!( + "06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch", + &fetch + .store_path("notmuch-extract-patch") + .unwrap() + .unwrap() + .to_string(), + ) + } + + #[test] + fn fetch_tarball_store_path() { + let url = Url::parse("https://github.com/NixOS/nixpkgs/archive/91050ea1e57e50388fa87a3302ba12d188ef723a.tar.gz").unwrap(); + let exp_nixbase32 = + nixbase32::decode_fixed("1hf6cgaci1n186kkkjq106ryf8mmlq9vnwgfwh625wa8hfgdn4dm") + .unwrap(); + let fetch = Fetch::Tarball(url, Some(exp_nixbase32)); + + assert_eq!( + "7adgvk5zdfq4pwrhsm3n9lzypb12gw0g-source", + &fetch.store_path("source").unwrap().unwrap().to_string(), + ) + } + } + + mod url_basename { + use super::super::*; + + #[test] + fn empty_path() { + assert_eq!(url_basename(""), ""); + } + + #[test] + fn path_on_root() { + assert_eq!(url_basename("/dir"), "dir"); + } + + #[test] + fn relative_path() { + assert_eq!(url_basename("dir/foo"), "foo"); + } + + #[test] + fn root_with_trailing_slash() { + assert_eq!(url_basename("/"), ""); + } + + #[test] + fn trailing_slash() { + assert_eq!(url_basename("/dir/"), "dir"); + } + } +} diff --git a/tvix/glue/src/lib.rs b/tvix/glue/src/lib.rs index 8528f09e528c..2e5a3be103a1 100644 --- a/tvix/glue/src/lib.rs +++ b/tvix/glue/src/lib.rs @@ -6,7 +6,6 @@ pub mod tvix_build; pub mod tvix_io; pub mod tvix_store_io; -mod decompression; #[cfg(test)] mod tests; -- cgit 1.4.1