about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-04-22T11·02+0300
committerclbot <clbot@tvl.fyi>2024-04-23T12·30+0000
commit091de12a9a735e71c119e543dab9f2999a36a5a1 (patch)
tree8df65d32934e026cec80256226da031954f715c1 /tvix
parentdc444e55dcb75a634bce94fef9e29d90ea90fe5f (diff)
refactor(tvix/glue): move Fetch[er] into its own types, fetch lazily r/7993
We actually want to delay fetching until we actually need the file. A
simple evaluation asking for `.outPath` or `.drvPath` should work even
in a pure offline environment.

Before this CL, the fetching logic was quite distributed between
tvix_store_io, and builtins/fetchers.rs.

Rather than having various functions and conversions between structs,
describe a Fetch as an enum type, with the fields describing the fetch.

Define a store_path() function on top of `Fetch` which can be used to
ask for the calculated store path (if the digest has been provided
upfront).

Have a `Fetcher` struct, and give it a `fetch_and_persist` function,
taking a `Fetch` as well as a desired name, and have it deal with all
the logic of persisting the PathInfos. It also returns a StorePathRef,
similar to the `.store_path()` method on a `Fetch` struct.

In a followup CL, we can extend KnownPaths to track fetches AND
derivations, and then use `Fetcher` when we need to do IO into that
store path.

Change-Id: Ib39a96baeb661750a8706b461f8ba4abb342e777
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11500
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix')
-rw-r--r--tvix/Cargo.lock2
-rw-r--r--tvix/Cargo.nix8
-rw-r--r--tvix/glue/Cargo.toml2
-rw-r--r--tvix/glue/src/builtins/errors.rs16
-rw-r--r--tvix/glue/src/builtins/fetchers.rs355
-rw-r--r--tvix/glue/src/fetchers.rs387
-rw-r--r--tvix/glue/src/lib.rs1
-rw-r--r--tvix/glue/src/tvix_store_io.rs152
8 files changed, 531 insertions, 392 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index 364fbed662..b2be20a897 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -4394,6 +4394,7 @@ dependencies = [
  "hex-literal",
  "lazy_static",
  "magic",
+ "md-5",
  "nix 0.27.1",
  "nix-compat",
  "pin-project",
@@ -4402,6 +4403,7 @@ dependencies = [
  "rstest",
  "serde",
  "serde_json",
+ "sha1",
  "sha2",
  "tempfile",
  "thiserror",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 9efdb7dee9..25e5dccfd7 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -14102,6 +14102,10 @@ rec {
             packageId = "magic";
           }
           {
+            name = "md-5";
+            packageId = "md-5";
+          }
+          {
             name = "nix-compat";
             packageId = "nix-compat";
           }
@@ -14124,6 +14128,10 @@ rec {
             packageId = "serde_json";
           }
           {
+            name = "sha1";
+            packageId = "sha1";
+          }
+          {
             name = "sha2";
             packageId = "sha2";
           }
diff --git a/tvix/glue/Cargo.toml b/tvix/glue/Cargo.toml
index 1c08616cfb..f280117674 100644
--- a/tvix/glue/Cargo.toml
+++ b/tvix/glue/Cargo.toml
@@ -25,6 +25,8 @@ thiserror = "1.0.38"
 serde = "1.0.195"
 serde_json = "1.0"
 sha2 = "0.10.8"
+sha1 = "0.10.6"
+md-5 = "0.10.6"
 walkdir = "2.4.0"
 
 [dependencies.async-compression]
diff --git a/tvix/glue/src/builtins/errors.rs b/tvix/glue/src/builtins/errors.rs
index 53351cf902..5aced2bde4 100644
--- a/tvix/glue/src/builtins/errors.rs
+++ b/tvix/glue/src/builtins/errors.rs
@@ -41,17 +41,17 @@ pub enum FetcherError {
     #[error("Invalid hash type '{0}' for fetcher")]
     InvalidHashType(&'static str),
 
-    #[error("Error in store path for fetcher output: {0}")]
-    StorePath(#[from] BuildStorePathError),
-
     #[error(transparent)]
     Http(#[from] reqwest::Error),
-}
 
-impl From<FetcherError> for tvix_eval::ErrorKind {
-    fn from(err: FetcherError) -> Self {
-        tvix_eval::ErrorKind::TvixError(Rc::new(err))
-    }
+    #[error(transparent)]
+    Io(#[from] std::io::Error),
+
+    #[error(transparent)]
+    Import(#[from] tvix_castore::import::Error),
+
+    #[error("Error calculating store path for fetcher output: {0}")]
+    StorePath(#[from] BuildStorePathError),
 }
 
 /// Errors related to `builtins.path` and `builtins.filterSource`,
diff --git a/tvix/glue/src/builtins/fetchers.rs b/tvix/glue/src/builtins/fetchers.rs
index d5735b7d09..ec5dd969bc 100644
--- a/tvix/glue/src/builtins/fetchers.rs
+++ b/tvix/glue/src/builtins/fetchers.rs
@@ -1,197 +1,74 @@
 //! Contains builtins that fetch paths from the Internet
 
-use crate::tvix_store_io::TvixStoreIO;
-use bstr::ByteSlice;
-use nix_compat::nixhash::{self, CAHash};
-use nix_compat::store_path::{build_ca_path, StorePathRef};
+use super::utils::select_string;
+use crate::{
+    fetchers::{url_basename, Fetch},
+    tvix_store_io::TvixStoreIO,
+};
+use nix_compat::nixhash;
+use nix_compat::nixhash::NixHash;
 use std::rc::Rc;
+use tracing::info;
 use tvix_eval::builtin_macros::builtins;
+use tvix_eval::generators::Gen;
 use tvix_eval::generators::GenCo;
-use tvix_eval::{CatchableErrorKind, ErrorKind, NixContextElement, NixString, Value};
-
-use super::utils::select_string;
-use super::{DerivationError, FetcherError};
-
-/// Attempts to mimic `nix::libutil::baseNameOf`
-fn url_basename(s: &str) -> &str {
-    if s.is_empty() {
-        return "";
-    }
-
-    let mut last = s.len() - 1;
-    if s.chars().nth(last).unwrap() == '/' && last > 0 {
-        last -= 1;
-    }
-
-    if last == 0 {
-        return "";
-    }
+use tvix_eval::{CatchableErrorKind, ErrorKind, Value};
 
-    let pos = match s[..=last].rfind('/') {
-        Some(pos) => {
-            if pos == last - 1 {
-                0
-            } else {
-                pos
-            }
-        }
-        None => 0,
-    };
-
-    &s[(pos + 1)..=last]
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum HashMode {
-    Flat,
-    Recursive,
-}
-
-/// Struct representing the arguments passed to fetcher functions
-#[derive(Debug, PartialEq, Eq)]
-struct FetchArgs {
+struct NixFetchArgs {
     url: String,
-    name: String,
-    hash: Option<CAHash>,
+    name: Option<String>,
+    sha256: Option<[u8; 32]>,
 }
 
-impl FetchArgs {
-    pub fn new(
-        url: String,
-        name: Option<String>,
-        sha256: Option<String>,
-        mode: HashMode,
-    ) -> nixhash::NixHashResult<Self> {
-        Ok(Self {
-            name: name.unwrap_or_else(|| url_basename(&url).to_owned()),
+// `fetchurl` and `fetchTarball` accept a single argument, which can either be the URL (as string),
+// or an attrset, where `url`, `sha256` and `name` keys are allowed.
+async fn extract_fetch_args(
+    co: &GenCo,
+    args: Value,
+) -> Result<Result<NixFetchArgs, CatchableErrorKind>, ErrorKind> {
+    if let Ok(url) = args.to_str() {
+        // Get the raw bytes, not the ToString repr.
+        let url = String::from_utf8(url.as_bytes().to_vec()).map_err(|_| ErrorKind::Utf8)?;
+        return Ok(Ok(NixFetchArgs {
             url,
-            hash: sha256
-                .map(|h| {
-                    let hash = nixhash::from_str(&h, Some("sha256"))?;
-                    Ok(match mode {
-                        HashMode::Flat => Some(nixhash::CAHash::Flat(hash)),
-                        HashMode::Recursive => Some(nixhash::CAHash::Nar(hash)),
-                    })
-                })
-                .transpose()?
-                .flatten(),
-        })
+            name: None,
+            sha256: None,
+        }));
     }
 
-    fn store_path(&self) -> Result<Option<StorePathRef>, ErrorKind> {
-        let Some(h) = &self.hash else {
-            return Ok(None);
-        };
-        build_ca_path(&self.name, h, Vec::<String>::new(), false)
-            .map(Some)
-            .map_err(|e| FetcherError::from(e).into())
-    }
-
-    async fn extract(
-        co: &GenCo,
-        args: Value,
-        default_name: Option<&str>,
-        mode: HashMode,
-    ) -> Result<Result<Self, CatchableErrorKind>, ErrorKind> {
-        if let Ok(url) = args.to_str() {
-            return Ok(Ok(FetchArgs::new(
-                url.to_str()?.to_owned(),
-                None,
-                None,
-                mode,
-            )
-            .map_err(DerivationError::InvalidOutputHash)?));
-        }
-
-        let attrs = args.to_attrs().map_err(|_| ErrorKind::TypeError {
-            expected: "attribute set or string",
-            actual: args.type_of(),
-        })?;
-
-        let url = match select_string(co, &attrs, "url").await? {
-            Ok(s) => s.ok_or_else(|| ErrorKind::AttributeNotFound { name: "url".into() })?,
-            Err(cek) => return Ok(Err(cek)),
-        };
-        let name = match select_string(co, &attrs, "name").await? {
-            Ok(s) => s.or_else(|| default_name.map(|s| s.to_owned())),
-            Err(cek) => return Ok(Err(cek)),
-        };
-        let sha256 = match select_string(co, &attrs, "sha256").await? {
-            Ok(s) => s,
-            Err(cek) => return Ok(Err(cek)),
-        };
+    let attrs = args.to_attrs().map_err(|_| ErrorKind::TypeError {
+        expected: "attribute set or contextless string",
+        actual: args.type_of(),
+    })?;
 
-        Ok(Ok(
-            FetchArgs::new(url, name, sha256, mode).map_err(DerivationError::InvalidOutputHash)?
-        ))
-    }
-}
+    let url = match select_string(co, &attrs, "url").await? {
+        Ok(s) => s.ok_or_else(|| ErrorKind::AttributeNotFound { name: "url".into() })?,
+        Err(cek) => return Ok(Err(cek)),
+    };
+    let name = match select_string(co, &attrs, "name").await? {
+        Ok(s) => s,
+        Err(cek) => return Ok(Err(cek)),
+    };
+    let sha256_str = match select_string(co, &attrs, "sha256").await? {
+        Ok(s) => s,
+        Err(cek) => return Ok(Err(cek)),
+    };
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum FetchMode {
-    Url,
-    Tarball,
-}
+    // TODO: disallow other attrset keys, to match Nix' behaviour.
 
-impl From<FetchMode> for HashMode {
-    fn from(value: FetchMode) -> Self {
-        match value {
-            FetchMode::Url => HashMode::Flat,
-            FetchMode::Tarball => HashMode::Recursive,
-        }
-    }
-}
+    // parse the sha256 string into a digest.
+    let sha256 = match sha256_str {
+        Some(sha256_str) => {
+            let nixhash = nixhash::from_str(&sha256_str, Some("sha256"))
+                // TODO: DerivationError::InvalidOutputHash should be moved to ErrorKind::InvalidHash and used here instead
+                .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?;
 
-impl FetchMode {
-    fn default_name(self) -> Option<&'static str> {
-        match self {
-            FetchMode::Url => None,
-            FetchMode::Tarball => Some("source"),
+            Some(nixhash.digest_as_bytes().try_into().expect("is sha256"))
         }
-    }
-}
-
-fn string_from_store_path(store_path: StorePathRef) -> NixString {
-    NixString::new_context_from(
-        NixContextElement::Plain(store_path.to_absolute_path()).into(),
-        store_path.to_absolute_path(),
-    )
-}
-
-async fn fetch(
-    state: Rc<TvixStoreIO>,
-    co: GenCo,
-    args: Value,
-    mode: FetchMode,
-) -> Result<Value, ErrorKind> {
-    let args = match FetchArgs::extract(&co, args, mode.default_name(), mode.into()).await? {
-        Ok(args) => args,
-        Err(cek) => return Ok(cek.into()),
+        None => None,
     };
 
-    if let Some(store_path) = args.store_path()? {
-        if state.store_path_exists(store_path).await? {
-            return Ok(string_from_store_path(store_path).into());
-        }
-    }
-
-    let ca = args.hash;
-    let store_path = Rc::clone(&state).tokio_handle.block_on(async move {
-        match mode {
-            FetchMode::Url => {
-                state
-                    .fetch_url(
-                        &args.url,
-                        &args.name,
-                        ca.as_ref().map(|c| c.hash().into_owned()).as_ref(),
-                    )
-                    .await
-            }
-            FetchMode::Tarball => state.fetch_tarball(&args.url, &args.name, ca).await,
-        }
-    })?;
-
-    Ok(string_from_store_path(store_path.as_ref()).into())
+    Ok(Ok(NixFetchArgs { url, name, sha256 }))
 }
 
 #[allow(unused_variables)] // for the `state` arg, for now
@@ -199,7 +76,36 @@ async fn fetch(
 pub(crate) mod fetcher_builtins {
     use super::*;
 
-    use tvix_eval::generators::Gen;
+    /// Consumes a fetch.
+    /// If there is enough info to calculate the store path without fetching,
+    /// queue the fetch to be fetched lazily, and return the store path.
+    /// If there's not enough info to calculate it, do the fetch now, and then
+    /// return the store path.
+    fn fetch_lazy(state: Rc<TvixStoreIO>, name: String, fetch: Fetch) -> Result<Value, ErrorKind> {
+        match fetch
+            .store_path(&name)
+            .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?
+        {
+            Some(store_path) => {
+                let path = store_path.to_absolute_path().into();
+                // TODO: add fetch to fetcher
+                drop(fetch);
+
+                Ok(Value::Path(Box::new(path)))
+            }
+            None => {
+                // If we don't have enough info, do the fetch now.
+                info!(?fetch, "triggering required fetch");
+
+                let (store_path, _root_node) = state
+                    .tokio_handle
+                    .block_on(async { state.fetcher.ingest_and_persist(&name, fetch).await })
+                    .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?;
+
+                Ok(Value::Path(Box::new(store_path.to_absolute_path().into())))
+            }
+        }
+    }
 
     #[builtin("fetchurl")]
     async fn builtin_fetchurl(
@@ -207,7 +113,21 @@ pub(crate) mod fetcher_builtins {
         co: GenCo,
         args: Value,
     ) -> Result<Value, ErrorKind> {
-        fetch(state, co, args, FetchMode::Url).await
+        let args = match extract_fetch_args(&co, args).await? {
+            Ok(args) => args,
+            Err(cek) => return Ok(Value::from(cek)),
+        };
+
+        // Derive the name from the URL basename if not set explicitly.
+        let name = args
+            .name
+            .unwrap_or_else(|| url_basename(&args.url).to_owned());
+
+        fetch_lazy(
+            state,
+            name,
+            Fetch::URL(args.url, args.sha256.map(NixHash::Sha256)),
+        )
     }
 
     #[builtin("fetchTarball")]
@@ -216,7 +136,18 @@ pub(crate) mod fetcher_builtins {
         co: GenCo,
         args: Value,
     ) -> Result<Value, ErrorKind> {
-        fetch(state, co, args, FetchMode::Tarball).await
+        let args = match extract_fetch_args(&co, args).await? {
+            Ok(args) => args,
+            Err(cek) => return Ok(Value::from(cek)),
+        };
+
+        // Name defaults to "source" if not set explicitly.
+        const DEFAULT_NAME_FETCH_TARBALL: &str = "source";
+        let name = args
+            .name
+            .unwrap_or_else(|| DEFAULT_NAME_FETCH_TARBALL.to_owned());
+
+        fetch_lazy(state, name, Fetch::Tarball(args.url, args.sha256))
     }
 
     #[builtin("fetchGit")]
@@ -228,71 +159,3 @@ pub(crate) mod fetcher_builtins {
         Err(ErrorKind::NotImplemented("fetchGit"))
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use std::str::FromStr;
-
-    use nix_compat::store_path::StorePath;
-
-    use super::*;
-
-    #[test]
-    fn fetchurl_store_path() {
-        let url = "https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch";
-        let sha256 = "0nawkl04sj7psw6ikzay7kydj3dhd0fkwghcsf5rzaw4bmp4kbax";
-        let args = FetchArgs::new(url.into(), None, Some(sha256.into()), HashMode::Flat).unwrap();
-
-        assert_eq!(
-            args.store_path().unwrap().unwrap().to_owned(),
-            StorePath::from_str("06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch").unwrap()
-        )
-    }
-
-    #[test]
-    fn fetch_tarball_store_path() {
-        let url = "https://github.com/NixOS/nixpkgs/archive/91050ea1e57e50388fa87a3302ba12d188ef723a.tar.gz";
-        let sha256 = "1hf6cgaci1n186kkkjq106ryf8mmlq9vnwgfwh625wa8hfgdn4dm";
-        let args = FetchArgs::new(
-            url.into(),
-            Some("source".into()),
-            Some(sha256.into()),
-            HashMode::Recursive,
-        )
-        .unwrap();
-
-        assert_eq!(
-            args.store_path().unwrap().unwrap().to_owned(),
-            StorePath::from_str("7adgvk5zdfq4pwrhsm3n9lzypb12gw0g-source").unwrap()
-        )
-    }
-
-    mod url_basename {
-        use super::*;
-
-        #[test]
-        fn empty_path() {
-            assert_eq!(url_basename(""), "");
-        }
-
-        #[test]
-        fn path_on_root() {
-            assert_eq!(url_basename("/dir"), "dir");
-        }
-
-        #[test]
-        fn relative_path() {
-            assert_eq!(url_basename("dir/foo"), "foo");
-        }
-
-        #[test]
-        fn root_with_trailing_slash() {
-            assert_eq!(url_basename("/"), "");
-        }
-
-        #[test]
-        fn trailing_slash() {
-            assert_eq!(url_basename("/dir/"), "dir");
-        }
-    }
-}
diff --git a/tvix/glue/src/fetchers.rs b/tvix/glue/src/fetchers.rs
new file mode 100644
index 0000000000..977be4a203
--- /dev/null
+++ b/tvix/glue/src/fetchers.rs
@@ -0,0 +1,387 @@
+use futures::TryStreamExt;
+use md5::Md5;
+use nix_compat::{
+    nixhash::{CAHash, HashAlgo, NixHash},
+    store_path::{build_ca_path, BuildStorePathError, StorePathRef},
+};
+use sha1::Sha1;
+use sha2::{digest::Output, Digest, Sha256, Sha512};
+use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite};
+use tokio_util::io::InspectReader;
+use tracing::warn;
+use tvix_castore::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    proto::{node::Node, FileNode},
+};
+use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
+
+use crate::{builtins::FetcherError, decompression::DecompressedReader};
+
+/// Representing options for doing a fetch.
+#[derive(Clone, Debug)]
+pub enum Fetch {
+    /// Fetch a literal file from the given URL, with an optional expected
+    /// NixHash of it.
+    /// TODO: check if this is *always* sha256, and if so, make it [u8; 32].
+    URL(String, Option<NixHash>),
+
+    /// Fetch a tarball from the given URL and unpack.
+    /// The file must be a tape archive (.tar) compressed with gzip, bzip2 or xz.
+    /// The top-level path component of the files in the tarball is removed,
+    /// so it is best if the tarball contains a single directory at top level.
+    /// Optionally, a sha256 digest can be provided to verify the unpacked
+    /// contents against.
+    Tarball(String, Option<[u8; 32]>),
+
+    /// TODO
+    Git(),
+}
+
+impl Fetch {
+    /// If the [Fetch] contains an expected hash upfront, returns the resulting
+    /// store path.
+    /// This doesn't do any fetching.
+    pub fn store_path<'a>(
+        &self,
+        name: &'a str,
+    ) -> Result<Option<StorePathRef<'a>>, BuildStorePathError> {
+        let ca_hash = match self {
+            Fetch::URL(_, Some(nixhash)) => CAHash::Flat(nixhash.clone()),
+            Fetch::Tarball(_, Some(nar_sha256)) => CAHash::Nar(NixHash::Sha256(*nar_sha256)),
+            _ => return Ok(None),
+        };
+
+        // calculate the store path of this fetch
+        build_ca_path(name, &ca_hash, Vec::<String>::new(), false).map(Some)
+    }
+}
+
+/// Knows how to fetch a given [Fetch].
+pub struct Fetcher<BS, DS, PS> {
+    http_client: reqwest::Client,
+    blob_service: BS,
+    directory_service: DS,
+    path_info_service: PS,
+}
+
+impl<BS, DS, PS> Fetcher<BS, DS, PS> {
+    pub fn new(blob_service: BS, directory_service: DS, path_info_service: PS) -> Self {
+        Self {
+            http_client: reqwest::Client::new(),
+            blob_service,
+            directory_service,
+            path_info_service,
+        }
+    }
+
+    /// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it.
+    async fn download<'a>(
+        &self,
+        url: &str,
+    ) -> Result<impl AsyncBufRead + Unpin + 'a, reqwest::Error> {
+        let resp = self.http_client.get(url).send().await?;
+        Ok(tokio_util::io::StreamReader::new(
+            resp.bytes_stream().map_err(|e| {
+                let e = e.without_url();
+                warn!(%e, "failed to get response body");
+                std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)
+            }),
+        ))
+    }
+}
+
+/// Copies all data from the passed reader to the passed writer.
+/// Afterwards, it also returns the resulting [Digest], as well as the number of
+/// bytes copied.
+/// The exact hash function used is left generic over all [Digest].
+async fn hash<D: Digest + std::io::Write>(
+    mut r: impl AsyncRead + Unpin,
+    mut w: impl AsyncWrite + Unpin,
+) -> std::io::Result<(Output<D>, u64)> {
+    let mut hasher = D::new();
+    let bytes_copied = tokio::io::copy(
+        &mut InspectReader::new(&mut r, |d| hasher.write_all(d).unwrap()),
+        &mut w,
+    )
+    .await?;
+    Ok((hasher.finalize(), bytes_copied))
+}
+
+impl<BS, DS, PS> Fetcher<BS, DS, PS>
+where
+    BS: AsRef<(dyn BlobService + 'static)> + Send + Sync,
+    DS: AsRef<(dyn DirectoryService + 'static)>,
+    PS: PathInfoService,
+{
+    /// Ingest the data from a specified [Fetch].
+    /// On success, return the root node, a content digest and length.
+    /// Returns an error if there was a failure during fetching, or the contents
+    /// didn't match the previously communicated hash contained inside the FetchArgs.
+    pub async fn ingest(&self, fetch: Fetch) -> Result<(Node, CAHash, u64), FetcherError> {
+        match fetch {
+            Fetch::URL(url, exp_nixhash) => {
+                // Construct a AsyncRead reading from the data as its downloaded.
+                let mut r = self.download(&url).await?;
+
+                // Construct a AsyncWrite to write into the BlobService.
+                let mut blob_writer = self.blob_service.open_write().await;
+
+                // Copy the contents from the download reader to the blob writer.
+                // Calculate the digest of the file received, depending on the
+                // communicated expected hash (or sha256 if none provided).
+                let (actual_nixhash, blob_size) = match exp_nixhash
+                    .as_ref()
+                    .map(NixHash::algo)
+                    .unwrap_or_else(|| HashAlgo::Sha256)
+                {
+                    HashAlgo::Sha256 => hash::<Sha256>(&mut r, &mut blob_writer).await.map(
+                        |(digest, bytes_written)| (NixHash::Sha256(digest.into()), bytes_written),
+                    )?,
+                    HashAlgo::Md5 => hash::<Md5>(&mut r, &mut blob_writer).await.map(
+                        |(digest, bytes_written)| (NixHash::Md5(digest.into()), bytes_written),
+                    )?,
+                    HashAlgo::Sha1 => hash::<Sha1>(&mut r, &mut blob_writer).await.map(
+                        |(digest, bytes_written)| (NixHash::Sha1(digest.into()), bytes_written),
+                    )?,
+                    HashAlgo::Sha512 => hash::<Sha512>(&mut r, &mut blob_writer).await.map(
+                        |(digest, bytes_written)| {
+                            (NixHash::Sha512(Box::new(digest.into())), bytes_written)
+                        },
+                    )?,
+                };
+
+                if let Some(exp_nixhash) = exp_nixhash {
+                    if exp_nixhash != actual_nixhash {
+                        return Err(FetcherError::HashMismatch {
+                            url,
+                            wanted: exp_nixhash,
+                            got: actual_nixhash,
+                        });
+                    }
+                }
+
+                // Construct and return the FileNode describing the downloaded contents.
+                Ok((
+                    Node::File(FileNode {
+                        name: vec![].into(),
+                        digest: blob_writer.close().await?.into(),
+                        size: blob_size,
+                        executable: false,
+                    }),
+                    CAHash::Flat(actual_nixhash),
+                    blob_size,
+                ))
+            }
+            Fetch::Tarball(url, exp_nar_sha256) => {
+                // Construct a AsyncRead reading from the data as its downloaded.
+                let r = self.download(&url).await?;
+
+                // Pop compression.
+                let r = DecompressedReader::new(r);
+                // Open the archive.
+                let archive = tokio_tar::Archive::new(r);
+
+                // Ingest the archive, get the root node
+                let node = tvix_castore::import::archive::ingest_archive(
+                    &self.blob_service,
+                    &self.directory_service,
+                    archive,
+                )
+                .await?;
+
+                // If an expected NAR sha256 was provided, compare with the one
+                // calculated from our root node.
+                // Even if no expected NAR sha256 has been provided, we need
+                // the actual one later.
+                let (nar_size, actual_nar_sha256) = self
+                    .path_info_service
+                    .calculate_nar(&node)
+                    .await
+                    .map_err(|e| {
+                        // convert the generic Store error to an IO error.
+                        FetcherError::Io(e.into())
+                    })?;
+
+                if let Some(exp_nar_sha256) = exp_nar_sha256 {
+                    if exp_nar_sha256 != actual_nar_sha256 {
+                        return Err(FetcherError::HashMismatch {
+                            url,
+                            wanted: NixHash::Sha256(exp_nar_sha256),
+                            got: NixHash::Sha256(actual_nar_sha256),
+                        });
+                    }
+                }
+
+                Ok((
+                    node,
+                    CAHash::Nar(NixHash::Sha256(actual_nar_sha256)),
+                    nar_size,
+                ))
+            }
+            Fetch::Git() => todo!(),
+        }
+    }
+
+    /// Ingests the data from a specified [Fetch], persists the returned node
+    /// in the PathInfoService, and returns the calculated StorePath, as well as
+    /// the root node pointing to the contents.
+    /// The root node can be used to descend into the data without doing the
+    /// lookup to the PathInfoService again.
+    pub async fn ingest_and_persist<'a>(
+        &self,
+        name: &'a str,
+        fetch: Fetch,
+    ) -> Result<(StorePathRef<'a>, Node), FetcherError> {
+        // Fetch file, return the (unnamed) (File)Node of its contents, ca hash and filesize.
+        let (mut node, ca_hash, size) = self.ingest(fetch).await?;
+
+        // Calculate the store path to return later, which is done with the ca_hash.
+        let store_path = build_ca_path(name, &ca_hash, Vec::<String>::new(), false)?;
+
+        // Rename the node name to match the Store Path.
+        if let Node::File(file_node) = &mut node {
+            file_node.name = store_path.to_string().into();
+        } else {
+            unreachable!("Tvix bug: do_fetch for URL returned non-FileNode");
+        }
+
+        // If the resulting hash is not a CAHash::Nar, we also need to invoke
+        // `calculate_nar` to calculate this representation, as it's required in
+        // the [PathInfo].
+        let (nar_size, nar_sha256) = match &ca_hash {
+            CAHash::Flat(_nix_hash) => self
+                .path_info_service
+                .calculate_nar(&node)
+                .await
+                .map_err(|e| FetcherError::Io(e.into()))?,
+            CAHash::Nar(NixHash::Sha256(nar_sha256)) => (size, *nar_sha256),
+            CAHash::Nar(_) => unreachable!("Tvix bug: fetch returned non-sha256 CAHash::Nar"),
+            CAHash::Text(_) => unreachable!("Tvix bug: fetch returned CAHash::Text"),
+        };
+
+        // Construct the PathInfo and persist it.
+        let path_info = PathInfo {
+            node: Some(tvix_castore::proto::Node { node: Some(node) }),
+            references: vec![],
+            narinfo: Some(tvix_store::proto::NarInfo {
+                nar_size,
+                nar_sha256: nar_sha256.to_vec().into(),
+                signatures: vec![],
+                reference_names: vec![],
+                deriver: None,
+                ca: Some(ca_hash.into()),
+            }),
+        };
+
+        let path_info = self
+            .path_info_service
+            .put(path_info)
+            .await
+            .map_err(|e| FetcherError::Io(e.into()))?;
+
+        Ok((store_path, path_info.node.unwrap().node.unwrap()))
+    }
+}
+
+/// Attempts to mimic `nix::libutil::baseNameOf`
+pub(crate) fn url_basename(s: &str) -> &str {
+    if s.is_empty() {
+        return "";
+    }
+
+    let mut last = s.len() - 1;
+    if s.chars().nth(last).unwrap() == '/' && last > 0 {
+        last -= 1;
+    }
+
+    if last == 0 {
+        return "";
+    }
+
+    let pos = match s[..=last].rfind('/') {
+        Some(pos) => {
+            if pos == last - 1 {
+                0
+            } else {
+                pos
+            }
+        }
+        None => 0,
+    };
+
+    &s[(pos + 1)..=last]
+}
+
+#[cfg(test)]
+mod tests {
+    mod fetch {
+        use nix_compat::nixbase32;
+
+        use crate::fetchers::Fetch;
+
+        use super::super::*;
+
+        #[test]
+        fn fetchurl_store_path() {
+            let url = "https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch";
+            let exp_nixhash = NixHash::Sha256(
+                nixbase32::decode_fixed("0nawkl04sj7psw6ikzay7kydj3dhd0fkwghcsf5rzaw4bmp4kbax")
+                    .unwrap(),
+            );
+
+            let fetch = Fetch::URL(url.into(), Some(exp_nixhash));
+            assert_eq!(
+                "06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch",
+                &fetch
+                    .store_path("notmuch-extract-patch")
+                    .unwrap()
+                    .unwrap()
+                    .to_string(),
+            )
+        }
+
+        #[test]
+        fn fetch_tarball_store_path() {
+            let url = "https://github.com/NixOS/nixpkgs/archive/91050ea1e57e50388fa87a3302ba12d188ef723a.tar.gz";
+            let exp_nixbase32 =
+                nixbase32::decode_fixed("1hf6cgaci1n186kkkjq106ryf8mmlq9vnwgfwh625wa8hfgdn4dm")
+                    .unwrap();
+            let fetch = Fetch::Tarball(url.into(), Some(exp_nixbase32));
+
+            assert_eq!(
+                "7adgvk5zdfq4pwrhsm3n9lzypb12gw0g-source",
+                &fetch.store_path("source").unwrap().unwrap().to_string(),
+            )
+        }
+    }
+
+    mod url_basename {
+        use super::super::*;
+
+        #[test]
+        fn empty_path() {
+            assert_eq!(url_basename(""), "");
+        }
+
+        #[test]
+        fn path_on_root() {
+            assert_eq!(url_basename("/dir"), "dir");
+        }
+
+        #[test]
+        fn relative_path() {
+            assert_eq!(url_basename("dir/foo"), "foo");
+        }
+
+        #[test]
+        fn root_with_trailing_slash() {
+            assert_eq!(url_basename("/"), "");
+        }
+
+        #[test]
+        fn trailing_slash() {
+            assert_eq!(url_basename("/dir/"), "dir");
+        }
+    }
+}
diff --git a/tvix/glue/src/lib.rs b/tvix/glue/src/lib.rs
index f04d5ec3a0..8528f09e52 100644
--- a/tvix/glue/src/lib.rs
+++ b/tvix/glue/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod builtins;
+pub mod fetchers;
 pub mod known_paths;
 pub mod refscan;
 pub mod tvix_build;
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index 3c86f42630..0994c44dfa 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -4,10 +4,9 @@ use async_recursion::async_recursion;
 use bytes::Bytes;
 use futures::{StreamExt, TryStreamExt};
 use nix_compat::nixhash::NixHash;
-use nix_compat::store_path::{build_ca_path, StorePathRef};
+use nix_compat::store_path::StorePathRef;
 use nix_compat::{nixhash::CAHash, store_path::StorePath};
 use sha2::{Digest, Sha256};
-use std::rc::Rc;
 use std::{
     cell::RefCell,
     collections::BTreeSet,
@@ -15,24 +14,22 @@ use std::{
     path::{Path, PathBuf},
     sync::Arc,
 };
-use tokio::io::AsyncBufRead;
-use tokio_util::io::{InspectReader, SyncIoBridge};
+use tokio_util::io::SyncIoBridge;
 use tracing::{error, instrument, warn, Level};
 use tvix_build::buildservice::BuildService;
-use tvix_castore::import::archive::ingest_archive;
-use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
+use tvix_castore::proto::node::Node;
+use tvix_eval::{EvalIO, FileType, StdIO};
 use tvix_store::utils::AsyncIoBridge;
 
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::{self, DirectoryService},
-    proto::{node::Node, FileNode, NamedNode},
+    proto::NamedNode,
     B3Digest,
 };
 use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
 
-use crate::builtins::FetcherError;
-use crate::decompression::DecompressedReader;
+use crate::fetchers::Fetcher;
 use crate::known_paths::KnownPaths;
 use crate::tvix_build::derivation_to_build_request;
 
@@ -60,7 +57,10 @@ pub struct TvixStoreIO {
     #[allow(dead_code)]
     build_service: Arc<dyn BuildService>,
     pub(crate) tokio_handle: tokio::runtime::Handle,
-    http_client: reqwest::Client,
+
+    pub(crate) fetcher:
+        Fetcher<Arc<dyn BlobService>, Arc<dyn DirectoryService>, Arc<dyn PathInfoService>>,
+
     pub(crate) known_paths: RefCell<KnownPaths>,
 }
 
@@ -73,13 +73,13 @@ impl TvixStoreIO {
         tokio_handle: tokio::runtime::Handle,
     ) -> Self {
         Self {
-            blob_service,
-            directory_service,
-            path_info_service,
+            blob_service: blob_service.clone(),
+            directory_service: directory_service.clone(),
+            path_info_service: path_info_service.clone(),
             std_io: StdIO {},
             build_service,
             tokio_handle,
-            http_client: reqwest::Client::new(),
+            fetcher: Fetcher::new(blob_service, directory_service, path_info_service),
             known_paths: Default::default(),
         }
     }
@@ -358,130 +358,6 @@ impl TvixStoreIO {
             .await?
             .is_some())
     }
-
-    async fn download<'a>(&self, url: &str) -> Result<impl AsyncBufRead + Unpin + 'a, ErrorKind> {
-        let resp = self
-            .http_client
-            .get(url)
-            .send()
-            .await
-            .map_err(FetcherError::from)?;
-        Ok(tokio_util::io::StreamReader::new(
-            resp.bytes_stream().map_err(|e| {
-                let e = e.without_url();
-                warn!(%e, "failed to get response body");
-                io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
-            }),
-        ))
-    }
-
-    pub async fn fetch_url(
-        &self,
-        url: &str,
-        name: &str,
-        hash: Option<&NixHash>,
-    ) -> Result<StorePath, ErrorKind> {
-        let mut sha = Sha256::new();
-        let data = self.download(url).await?;
-        let mut data = InspectReader::new(data, |b| sha.update(b));
-        let mut blob = self.blob_service.open_write().await;
-        let size = tokio::io::copy(&mut data, blob.as_mut()).await?;
-        drop(data);
-        let blob_digest = blob.close().await?;
-        let got = NixHash::Sha256(sha.finalize().into());
-
-        let hash = CAHash::Flat(if let Some(wanted) = hash {
-            if *wanted != got {
-                return Err(FetcherError::HashMismatch {
-                    url: url.to_owned(),
-                    wanted: wanted.clone(),
-                    got,
-                }
-                .into());
-            }
-            wanted.clone()
-        } else {
-            got
-        });
-
-        let path = build_ca_path(name, &hash, Vec::<String>::new(), false)
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
-        let node = Node::File(FileNode {
-            name: path.to_string().into(),
-            digest: blob_digest.into(),
-            size,
-            executable: false,
-        });
-
-        let (nar_size, nar_sha256) = self
-            .path_info_service
-            .calculate_nar(&node)
-            .await
-            .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?;
-
-        let path_info = PathInfo {
-            node: Some(tvix_castore::proto::Node {
-                node: Some(node.clone()),
-            }),
-            references: vec![],
-            narinfo: Some(tvix_store::proto::NarInfo {
-                nar_size,
-                nar_sha256: nar_sha256.to_vec().into(),
-                signatures: vec![],
-                reference_names: vec![],
-                deriver: None, /* ? */
-                ca: Some((&hash).into()),
-            }),
-        };
-
-        self.path_info_service
-            .put(path_info)
-            .await
-            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-
-        Ok(path.to_owned())
-    }
-
-    pub async fn fetch_tarball(
-        &self,
-        url: &str,
-        name: &str,
-        ca: Option<CAHash>,
-    ) -> Result<StorePath, ErrorKind> {
-        let data = self.download(url).await?;
-        let data = DecompressedReader::new(data);
-        let archive = tokio_tar::Archive::new(data);
-        let node = ingest_archive(&self.blob_service, &self.directory_service, archive)
-            .await
-            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-
-        let (path_info, got, output_path) = self
-            .node_to_path_info(
-                name,
-                Path::new(""),
-                ca.clone().expect("TODO: support unspecified CA hash"),
-                node,
-            )
-            .await?;
-
-        if let Some(wanted) = &ca {
-            if *wanted.hash() != got {
-                return Err(FetcherError::HashMismatch {
-                    url: url.to_owned(),
-                    wanted: wanted.hash().into_owned(),
-                    got,
-                }
-                .into());
-            }
-        }
-
-        self.path_info_service
-            .put(path_info)
-            .await
-            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-
-        Ok(output_path)
-    }
 }
 
 impl EvalIO for TvixStoreIO {