about summary refs log tree commit diff
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2023-09-19T16·46-0500
committerclbot <clbot@tvl.fyi>2023-09-21T17·58+0000
commit37a348b4fae16b2b1c5ec12deaa085a049833d7f (patch)
tree7a1b1a7160036777b010cd81628960c1ca07486e
parent7e737fde34260daa477794d63b0b3344b4a1d81b (diff)
refactor(tvix/store): Asyncify PathInfoService and DirectoryService r/6623
We've decided to asyncify all of the services to reduce some of the
pains going back and for between sync<->async. The end goal will be for
all the tvix-store internals to be async and then expose a sync
interface for things like tvix eval io.

Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369
Autosubmit: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
-rw-r--r--tvix/Cargo.lock1
-rw-r--r--tvix/Cargo.nix7
-rw-r--r--tvix/cli/src/tvix_store_io.rs70
-rw-r--r--tvix/nix-compat/src/nar/writer/mod.rs2
-rw-r--r--tvix/store/Cargo.toml5
-rw-r--r--tvix/store/src/bin/tvix-store.rs13
-rw-r--r--tvix/store/src/directoryservice/grpc.rs285
-rw-r--r--tvix/store/src/directoryservice/memory.rs20
-rw-r--r--tvix/store/src/directoryservice/mod.rs24
-rw-r--r--tvix/store/src/directoryservice/sled.rs20
-rw-r--r--tvix/store/src/directoryservice/traverse.rs115
-rw-r--r--tvix/store/src/directoryservice/utils.rs161
-rw-r--r--tvix/store/src/fs/mod.rs43
-rw-r--r--tvix/store/src/fs/tests.rs231
-rw-r--r--tvix/store/src/import.rs1
-rw-r--r--tvix/store/src/nar/renderer.rs61
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs176
-rw-r--r--tvix/store/src/pathinfoservice/memory.rs15
-rw-r--r--tvix/store/src/pathinfoservice/mod.rs19
-rw-r--r--tvix/store/src/pathinfoservice/sled.rs18
-rw-r--r--tvix/store/src/proto/grpc_directoryservice_wrapper.rs11
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs17
-rw-r--r--tvix/store/src/tests/import.rs2
-rw-r--r--tvix/store/src/tests/nar_renderer.rs186
24 files changed, 776 insertions, 727 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index 02726751705c..1650833c455a 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -2924,6 +2924,7 @@ name = "tvix-store"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-stream",
  "blake3",
  "bytes",
  "clap 4.2.7",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 002aaeab0989..5c264fa01ae9 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -7496,7 +7496,7 @@ rec {
           "time" = [ "tokio/time" ];
           "tokio-util" = [ "dep:tokio-util" ];
         };
-        resolvedDefaultFeatures = [ "default" "net" "time" ];
+        resolvedDefaultFeatures = [ "default" "fs" "net" "time" ];
       };
       "tokio-util" = rec {
         crateName = "tokio-util";
@@ -8704,6 +8704,10 @@ rec {
             packageId = "anyhow";
           }
           {
+            name = "async-stream";
+            packageId = "async-stream";
+          }
+          {
             name = "blake3";
             packageId = "blake3";
             features = [ "rayon" "std" ];
@@ -8796,6 +8800,7 @@ rec {
           {
             name = "tokio-stream";
             packageId = "tokio-stream";
+            features = [ "fs" ];
           }
           {
             name = "tokio-util";
diff --git a/tvix/cli/src/tvix_store_io.rs b/tvix/cli/src/tvix_store_io.rs
index 1a373a705fe4..1ea718f1a188 100644
--- a/tvix/cli/src/tvix_store_io.rs
+++ b/tvix/cli/src/tvix_store_io.rs
@@ -57,12 +57,15 @@ impl TvixStoreIO {
         store_path: &StorePath,
         sub_path: &Path,
     ) -> Result<Option<Node>, io::Error> {
-        let path_info = {
-            match self.path_info_service.get(store_path.digest)? {
-                // If there's no PathInfo found, early exit
-                None => return Ok(None),
-                Some(path_info) => path_info,
-            }
+        let path_info_service = self.path_info_service.clone();
+        let digest = store_path.digest.clone();
+        let task = self
+            .tokio_handle
+            .spawn(async move { path_info_service.get(digest).await });
+        let path_info = match self.tokio_handle.block_on(task).unwrap()? {
+            // If there's no PathInfo found, early exit
+            None => return Ok(None),
+            Some(path_info) => path_info,
         };
 
         let root_node = {
@@ -84,11 +87,13 @@ impl TvixStoreIO {
             }
         };
 
-        Ok(directoryservice::traverse_to(
-            self.directory_service.clone(),
-            root_node,
-            sub_path,
-        )?)
+        let directory_service = self.directory_service.clone();
+        let sub_path = sub_path.to_owned();
+        let task = self.tokio_handle.spawn(async move {
+            directoryservice::traverse_to(directory_service, root_node, &sub_path).await
+        });
+
+        Ok(self.tokio_handle.block_on(task).unwrap()?)
     }
 }
 
@@ -195,17 +200,23 @@ impl EvalIO for TvixStoreIO {
                 match node {
                     Node::Directory(directory_node) => {
                         // fetch the Directory itself.
-                        let digest = directory_node.digest.clone().try_into().map_err(|_e| {
-                            io::Error::new(
-                                io::ErrorKind::InvalidData,
-                                format!(
-                                    "invalid digest length in directory node: {:?}",
-                                    directory_node
-                                ),
-                            )
-                        })?;
-
-                        if let Some(directory) = self.directory_service.get(&digest)? {
+                        let digest: B3Digest =
+                            directory_node.digest.clone().try_into().map_err(|_e| {
+                                io::Error::new(
+                                    io::ErrorKind::InvalidData,
+                                    format!(
+                                        "invalid digest length in directory node: {:?}",
+                                        directory_node
+                                    ),
+                                )
+                            })?;
+
+                        let directory_service = self.directory_service.clone();
+                        let digest_clone = digest.clone();
+                        let task = self
+                            .tokio_handle
+                            .spawn(async move { directory_service.get(&digest_clone).await });
+                        if let Some(directory) = self.tokio_handle.block_on(task).unwrap()? {
                             let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new();
                             for node in directory.nodes() {
                                 children.push(match node {
@@ -299,14 +310,11 @@ async fn import_path_with_pathinfo(
         .await
         .expect("error during import_path");
 
-    // Render the NAR. This is blocking.
-    let calc_task = tokio::task::spawn_blocking(move || {
-        let (nar_size, nar_sha256) =
-            calculate_size_and_sha256(&root_node, blob_service.clone(), directory_service.clone())
-                .expect("error during nar calculation"); // TODO: handle error
-        (nar_size, nar_sha256, root_node)
-    });
-    let (nar_size, nar_sha256, root_node) = calc_task.await.unwrap();
+    // Render the NAR.
+    let (nar_size, nar_sha256) =
+        calculate_size_and_sha256(&root_node, blob_service.clone(), directory_service.clone())
+            .await
+            .expect("error during nar calculation"); // TODO: handle error
 
     // TODO: make a path_to_name helper function?
     let name = path
@@ -339,7 +347,7 @@ async fn import_path_with_pathinfo(
 
     // put into [PathInfoService], and return the [PathInfo] that we get
     // back from there (it might contain additional signatures).
-    let path_info = path_info_service.put(path_info)?;
+    let path_info = path_info_service.put(path_info).await?;
 
     Ok(path_info)
 }
diff --git a/tvix/nix-compat/src/nar/writer/mod.rs b/tvix/nix-compat/src/nar/writer/mod.rs
index aafb26c86dba..f018e4212e80 100644
--- a/tvix/nix-compat/src/nar/writer/mod.rs
+++ b/tvix/nix-compat/src/nar/writer/mod.rs
@@ -37,7 +37,7 @@ use std::io::{
 mod wire;
 
 /// Convenience type alias for types implementing [`Write`].
-pub type Writer<'a> = dyn Write + 'a;
+pub type Writer<'a> = dyn Write + Send + 'a;
 
 /// Create a new NAR, writing the output to the specified writer.
 pub fn open<'a, 'w: 'a>(writer: &'a mut Writer<'w>) -> io::Result<Node<'a, 'w>> {
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index ffa4426aef6a..3097d3c368da 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -5,18 +5,20 @@ edition = "2021"
 
 [dependencies]
 anyhow = "1.0.68"
+async-stream = "0.3.5"
 blake3 = { version = "1.3.1", features = ["rayon", "std"] }
 clap = { version = "4.0", features = ["derive", "env"] }
 count-write = "0.1.0"
 data-encoding = "2.3.3"
 lazy_static = "1.4.0"
 nix-compat = { path = "../nix-compat" }
+parking_lot = "0.12.1"
 prost = "0.11.2"
 rayon = "1.6.1"
 sha2 = "0.10.6"
 sled = { version = "0.34.7", features = ["compression"] }
 thiserror = "1.0.38"
-tokio-stream = "0.1.14"
+tokio-stream = { version = "0.1.14", features = ["fs"] }
 tokio = { version = "1.28.0", features = ["fs", "net", "rt-multi-thread", "signal"] }
 tonic = "0.8.2"
 tracing = "0.1.37"
@@ -31,7 +33,6 @@ serde_json = "1.0"
 url = "2.4.0"
 pin-project-lite = "0.2.13"
 const-zero = "0.1.1"
-parking_lot = "0.12.1"
 
 [dependencies.fuse-backend-rs]
 optional = true
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 1be8b00bd9b8..7761855cccb1 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -229,11 +229,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                         // Ask the PathInfoService for the NAR size and sha256
                         let root_node_copy = root_node.clone();
                         let path_info_service_clone = path_info_service.clone();
-                        let (nar_size, nar_sha256) = tokio::task::spawn_blocking(move || {
-                            path_info_service_clone.calculate_nar(&root_node_copy)
-                        })
-                        .await
-                        .unwrap()?;
+                        let (nar_size, nar_sha256) = path_info_service_clone
+                            .calculate_nar(&root_node_copy)
+                            .await?;
 
                         // TODO: make a path_to_name helper function?
                         let name = path
@@ -265,10 +263,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
                         // put into [PathInfoService], and return the PathInfo that we get back
                         // from there (it might contain additional signatures).
-                        let path_info =
-                            tokio::task::spawn_blocking(move || path_info_service.put(path_info))
-                                .await
-                                .unwrap()?;
+                        let path_info = path_info_service.put(path_info).await?;
 
                         let node = path_info.node.unwrap().node.unwrap();
 
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs
index 73d88bb688a3..6257a8e81485 100644
--- a/tvix/store/src/directoryservice/grpc.rs
+++ b/tvix/store/src/directoryservice/grpc.rs
@@ -1,22 +1,24 @@
 use std::collections::HashSet;
+use std::pin::Pin;
 
 use super::{DirectoryPutter, DirectoryService};
 use crate::proto::{self, get_directory_request::ByWhat};
 use crate::{B3Digest, Error};
+use async_stream::try_stream;
+use futures::Stream;
 use tokio::net::UnixStream;
+use tokio::spawn;
 use tokio::sync::mpsc::UnboundedSender;
 use tokio::task::JoinHandle;
 use tokio_stream::wrappers::UnboundedReceiverStream;
+use tonic::async_trait;
+use tonic::Code;
 use tonic::{transport::Channel, Status};
-use tonic::{Code, Streaming};
 use tracing::{instrument, warn};
 
 /// Connects to a (remote) tvix-store DirectoryService over gRPC.
 #[derive(Clone)]
 pub struct GRPCDirectoryService {
-    /// A handle into the active tokio runtime. Necessary to spawn tasks.
-    tokio_handle: tokio::runtime::Handle,
-
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
     grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
@@ -28,13 +30,11 @@ impl GRPCDirectoryService {
     pub fn from_client(
         grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
     ) -> Self {
-        Self {
-            tokio_handle: tokio::runtime::Handle::current(),
-            grpc_client,
-        }
+        Self { grpc_client }
     }
 }
 
+#[async_trait]
 impl DirectoryService for GRPCDirectoryService {
     /// Constructs a [GRPCDirectoryService] from the passed [url::Url]:
     /// - scheme has to match `grpc+*://`.
@@ -89,11 +89,15 @@ impl DirectoryService for GRPCDirectoryService {
             }
         }
     }
-    fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> {
+
+    async fn get(
+        &self,
+        digest: &B3Digest,
+    ) -> Result<Option<crate::proto::Directory>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
         let digest_cpy = digest.clone();
-        let task = self.tokio_handle.spawn(async move {
+        let message = async move {
             let mut s = grpc_client
                 .get(proto::GetDirectoryRequest {
                     recursive: false,
@@ -104,10 +108,10 @@ impl DirectoryService for GRPCDirectoryService {
 
             // Retrieve the first message only, then close the stream (we set recursive to false)
             s.message().await
-        });
+        };
 
         let digest = digest.clone();
-        match self.tokio_handle.block_on(task)? {
+        match message.await {
             Ok(Some(directory)) => {
                 // Validate the retrieved Directory indeed has the
                 // digest we expect it to have, to detect corruptions.
@@ -134,14 +138,12 @@ impl DirectoryService for GRPCDirectoryService {
         }
     }
 
-    fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
+    async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
         let mut grpc_client = self.grpc_client.clone();
 
-        let task = self
-            .tokio_handle
-            .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await });
+        let resp = grpc_client.put(tokio_stream::iter(vec![directory])).await;
 
-        match self.tokio_handle.block_on(task)? {
+        match resp {
             Ok(put_directory_resp) => Ok(put_directory_resp
                 .into_inner()
                 .root_digest
@@ -157,32 +159,82 @@ impl DirectoryService for GRPCDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> {
+    ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
         let mut grpc_client = self.grpc_client.clone();
+        let root_directory_digest = root_directory_digest.clone();
 
-        // clone so we can move it
-        let root_directory_digest_cpy = root_directory_digest.clone();
-
-        let task: JoinHandle<Result<Streaming<proto::Directory>, Status>> =
-            self.tokio_handle.spawn(async move {
-                let s = grpc_client
-                    .get(proto::GetDirectoryRequest {
-                        recursive: true,
-                        by_what: Some(ByWhat::Digest(root_directory_digest_cpy.into())),
-                    })
-                    .await?
-                    .into_inner();
-
-                Ok(s)
-            });
+        let stream = try_stream! {
+            let mut stream = grpc_client
+                .get(proto::GetDirectoryRequest {
+                    recursive: true,
+                    by_what: Some(ByWhat::Digest(root_directory_digest.clone().into())),
+                })
+                .await
+                .map_err(|e| crate::Error::StorageError(e.to_string()))?
+                .into_inner();
 
-        let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
+            // The Directory digests we received so far
+            let mut received_directory_digests: HashSet<B3Digest> = HashSet::new();
+            // The Directory digests we're still expecting to get sent.
+            let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]);
+
+            loop {
+                match stream.message().await {
+                    Ok(Some(directory)) => {
+                        // validate the directory itself.
+                        if let Err(e) = directory.validate() {
+                            Err(crate::Error::StorageError(format!(
+                                "directory {} failed validation: {}",
+                                directory.digest(),
+                                e,
+                            )))?;
+                        }
+                        // validate we actually expected that directory, and move it from expected to received.
+                        let directory_digest = directory.digest();
+                        let was_expected = expected_directory_digests.remove(&directory_digest);
+                        if !was_expected {
+                            // FUTUREWORK: dumb clients might send the same stuff twice.
+                            // as a fallback, we might want to tolerate receiving
+                            // it if it's in received_directory_digests (as that
+                            // means it once was in expected_directory_digests)
+                            Err(crate::Error::StorageError(format!(
+                                "received unexpected directory {}",
+                                directory_digest
+                            )))?;
+                        }
+                        received_directory_digests.insert(directory_digest);
+
+                        // register all children in expected_directory_digests.
+                        for child_directory in &directory.directories {
+                            // We ran validate() above, so we know these digests must be correct.
+                            let child_directory_digest =
+                                child_directory.digest.clone().try_into().unwrap();
+
+                            expected_directory_digests
+                                .insert(child_directory_digest);
+                        }
+
+                        yield directory;
+                    },
+                    Ok(None) => {
+                        // If we were still expecting something, that's an error.
+                        if !expected_directory_digests.is_empty() {
+                            Err(crate::Error::StorageError(format!(
+                                "still expected {} directories, but got premature end of stream",
+                                expected_directory_digests.len(),
+                            )))?
+                        } else {
+                            return
+                        }
+                    },
+                    Err(e) => {
+                        Err(crate::Error::StorageError(e.to_string()))?;
+                    },
+                }
+            }
+        };
 
-        Box::new(StreamIterator::new(
-            self.tokio_handle.clone(),
-            root_directory_digest.clone(),
-            stream,
-        ))
+        Box::pin(stream)
     }
 
     #[instrument(skip_all)]
@@ -194,110 +246,21 @@ impl DirectoryService for GRPCDirectoryService {
 
         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
 
-        let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> =
-            self.tokio_handle.spawn(async move {
-                let s = grpc_client
-                    .put(UnboundedReceiverStream::new(rx))
-                    .await?
-                    .into_inner();
-
-                Ok(s)
-            });
-
-        Box::new(GRPCPutter::new(self.tokio_handle.clone(), tx, task))
-    }
-}
-
-pub struct StreamIterator {
-    /// A handle into the active tokio runtime. Necessary to run futures to completion.
-    tokio_handle: tokio::runtime::Handle,
-    // A stream of [proto::Directory]
-    stream: Streaming<proto::Directory>,
-    // The Directory digests we received so far
-    received_directory_digests: HashSet<B3Digest>,
-    // The Directory digests we're still expecting to get sent.
-    expected_directory_digests: HashSet<B3Digest>,
-}
-
-impl StreamIterator {
-    pub fn new(
-        tokio_handle: tokio::runtime::Handle,
-        root_digest: B3Digest,
-        stream: Streaming<proto::Directory>,
-    ) -> Self {
-        Self {
-            tokio_handle,
-            stream,
-            received_directory_digests: HashSet::new(),
-            expected_directory_digests: HashSet::from([root_digest]),
-        }
-    }
-}
-
-impl Iterator for StreamIterator {
-    type Item = Result<proto::Directory, crate::Error>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        match self.tokio_handle.block_on(self.stream.message()) {
-            Ok(ok) => match ok {
-                Some(directory) => {
-                    // validate the directory itself.
-                    if let Err(e) = directory.validate() {
-                        return Some(Err(crate::Error::StorageError(format!(
-                            "directory {} failed validation: {}",
-                            directory.digest(),
-                            e,
-                        ))));
-                    }
-                    // validate we actually expected that directory, and move it from expected to received.
-                    let directory_digest = directory.digest();
-                    let was_expected = self.expected_directory_digests.remove(&directory_digest);
-                    if !was_expected {
-                        // FUTUREWORK: dumb clients might send the same stuff twice.
-                        // as a fallback, we might want to tolerate receiving
-                        // it if it's in received_directory_digests (as that
-                        // means it once was in expected_directory_digests)
-                        return Some(Err(crate::Error::StorageError(format!(
-                            "received unexpected directory {}",
-                            directory_digest
-                        ))));
-                    }
-                    self.received_directory_digests.insert(directory_digest);
-
-                    // register all children in expected_directory_digests.
-                    for child_directory in &directory.directories {
-                        // We ran validate() above, so we know these digests must be correct.
-                        let child_directory_digest =
-                            child_directory.digest.clone().try_into().unwrap();
+        let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move {
+            let s = grpc_client
+                .put(UnboundedReceiverStream::new(rx))
+                .await?
+                .into_inner();
 
-                        self.expected_directory_digests
-                            .insert(child_directory_digest);
-                    }
+            Ok(s)
+        });
 
-                    Some(Ok(directory))
-                }
-                None => {
-                    // If we were still expecting something, that's an error.
-                    if !self.expected_directory_digests.is_empty() {
-                        Some(Err(crate::Error::StorageError(format!(
-                            "still expected {} directories, but got premature end of stream",
-                            self.expected_directory_digests.len(),
-                        ))))
-                    } else {
-                        None
-                    }
-                }
-            },
-            Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))),
-        }
+        Box::new(GRPCPutter::new(tx, task))
     }
 }
 
 /// Allows uploading multiple Directory messages in the same gRPC stream.
 pub struct GRPCPutter {
-    /// A handle into the active tokio runtime. Necessary to spawn tasks.
-    tokio_handle: tokio::runtime::Handle,
-
     /// Data about the current request - a handle to the task, and the tx part
     /// of the channel.
     /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request.
@@ -311,19 +274,18 @@ pub struct GRPCPutter {
 
 impl GRPCPutter {
     pub fn new(
-        tokio_handle: tokio::runtime::Handle,
         directory_sender: UnboundedSender<proto::Directory>,
         task: JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
     ) -> Self {
         Self {
-            tokio_handle,
             rq: Some((task, directory_sender)),
         }
     }
 }
 
+#[async_trait]
 impl DirectoryPutter for GRPCPutter {
-    fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
         match self.rq {
             // If we're not already closed, send the directory to directory_sender.
             Some((_, ref directory_sender)) => {
@@ -331,7 +293,7 @@ impl DirectoryPutter for GRPCPutter {
                     // If the channel has been prematurely closed, invoke close (so we can peek at the error code)
                     // That error code is much more helpful, because it
                     // contains the error message from the server.
-                    self.close()?;
+                    self.close().await?;
                 }
                 Ok(())
             }
@@ -343,7 +305,7 @@ impl DirectoryPutter for GRPCPutter {
     }
 
     /// Closes the stream for sending, and returns the value
-    fn close(&mut self) -> Result<B3Digest, crate::Error> {
+    async fn close(&mut self) -> Result<B3Digest, crate::Error> {
         // get self.rq, and replace it with None.
         // This ensures we can only close it once.
         match std::mem::take(&mut self.rq) {
@@ -352,9 +314,8 @@ impl DirectoryPutter for GRPCPutter {
                 // close directory_sender, so blocking on task will finish.
                 drop(directory_sender);
 
-                let root_digest = self
-                    .tokio_handle
-                    .block_on(task)?
+                let root_digest = task
+                    .await?
                     .map_err(|e| Error::StorageError(e.to_string()))?
                     .root_digest;
 
@@ -379,6 +340,7 @@ mod tests {
     use core::time;
     use std::thread;
 
+    use futures::StreamExt;
     use tempfile::TempDir;
     use tokio::net::{UnixListener, UnixStream};
     use tokio_stream::wrappers::UnixListenerStream;
@@ -446,7 +408,7 @@ mod tests {
             );
         }
 
-        let task = tester_runtime.spawn_blocking(move || {
+        tester_runtime.block_on(async move {
             // Create a channel, connecting to the uds at socket_path.
             // The URI is unused.
             let channel = Endpoint::try_from("http://[::]:50051")
@@ -465,6 +427,7 @@ mod tests {
                 None,
                 directory_service
                     .get(&DIRECTORY_A.digest())
+                    .await
                     .expect("must not fail")
             );
 
@@ -473,6 +436,7 @@ mod tests {
                 DIRECTORY_A.digest(),
                 directory_service
                     .put(DIRECTORY_A.clone())
+                    .await
                     .expect("must succeed")
             );
 
@@ -481,6 +445,7 @@ mod tests {
                 DIRECTORY_A.clone(),
                 directory_service
                     .get(&DIRECTORY_A.digest())
+                    .await
                     .expect("must succeed")
                     .expect("must be some")
             );
@@ -488,21 +453,22 @@ mod tests {
             // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
             directory_service
                 .put(DIRECTORY_B.clone())
+                .await
                 .expect_err("must fail");
 
             // Putting DIRECTORY_B in a put_multiple will succeed, but the close
             // will always fail.
             {
                 let mut handle = directory_service.put_multiple_start();
-                handle.put(DIRECTORY_B.clone()).expect("must succeed");
-                handle.close().expect_err("must fail");
+                handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
+                handle.close().await.expect_err("must fail");
             }
 
             // Uploading A and then B should succeed, and closing should return the digest of B.
             let mut handle = directory_service.put_multiple_start();
-            handle.put(DIRECTORY_A.clone()).expect("must succeed");
-            handle.put(DIRECTORY_B.clone()).expect("must succeed");
-            let digest = handle.close().expect("must succeed");
+            handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
+            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
+            let digest = handle.close().await.expect("must succeed");
             assert_eq!(DIRECTORY_B.digest(), digest);
 
             // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
@@ -511,6 +477,7 @@ mod tests {
                 DIRECTORY_B.clone(),
                 directories_it
                     .next()
+                    .await
                     .expect("must be some")
                     .expect("must succeed")
             );
@@ -518,6 +485,7 @@ mod tests {
                 DIRECTORY_A.clone(),
                 directories_it
                     .next()
+                    .await
                     .expect("must be some")
                     .expect("must succeed")
             );
@@ -529,15 +497,15 @@ mod tests {
             {
                 let mut handle = directory_service.put_multiple_start();
                 // sending out B will always be fine
-                handle.put(DIRECTORY_B.clone()).expect("must succeed");
+                handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
 
                 // whether we will be able to put A as well depends on whether we
                 // already received the error about B.
-                if handle.put(DIRECTORY_A.clone()).is_ok() {
+                if handle.put(DIRECTORY_A.clone()).await.is_ok() {
                     // If we didn't, and this was Ok(_), …
                     // a subsequent close MUST fail (because it waits for the
                     // server)
-                    handle.close().expect_err("must fail");
+                    handle.close().await.expect_err("must fail");
                 }
             }
 
@@ -547,7 +515,7 @@ mod tests {
             // and then assert that uploading anything else via the handle will fail.
             {
                 let mut handle = directory_service.put_multiple_start();
-                handle.put(DIRECTORY_B.clone()).expect("must succeed");
+                handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
 
                 let mut is_closed = false;
                 for _try in 1..1000 {
@@ -555,7 +523,7 @@ mod tests {
                         is_closed = true;
                         break;
                     }
-                    std::thread::sleep(time::Duration::from_millis(10))
+                    tokio::time::sleep(time::Duration::from_millis(10)).await;
                 }
 
                 assert!(
@@ -563,12 +531,13 @@ mod tests {
                     "expected channel to eventually close, but never happened"
                 );
 
-                handle.put(DIRECTORY_A.clone()).expect_err("must fail");
+                handle
+                    .put(DIRECTORY_A.clone())
+                    .await
+                    .expect_err("must fail");
             }
         });
 
-        tester_runtime.block_on(task)?;
-
         Ok(())
     }
 }
diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs
index 634dbf9922d0..ac67c999d01b 100644
--- a/tvix/store/src/directoryservice/memory.rs
+++ b/tvix/store/src/directoryservice/memory.rs
@@ -1,16 +1,20 @@
 use crate::{proto, B3Digest, Error};
+use futures::Stream;
 use std::collections::HashMap;
+use std::pin::Pin;
 use std::sync::{Arc, RwLock};
+use tonic::async_trait;
 use tracing::{instrument, warn};
 
-use super::utils::SimplePutter;
-use super::{DirectoryPutter, DirectoryService, DirectoryTraverser};
+use super::utils::{traverse_directory, SimplePutter};
+use super::{DirectoryPutter, DirectoryService};
 
 #[derive(Clone, Default)]
 pub struct MemoryDirectoryService {
     db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>,
 }
 
+#[async_trait]
 impl DirectoryService for MemoryDirectoryService {
     /// Constructs a [MemoryDirectoryService] from the passed [url::Url]:
     /// - scheme has to be `memory://`
@@ -27,8 +31,9 @@ impl DirectoryService for MemoryDirectoryService {
 
         Ok(Self::default())
     }
+
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
         let db = self.db.read()?;
 
         match db.get(digest) {
@@ -62,7 +67,7 @@ impl DirectoryService for MemoryDirectoryService {
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
         let digest = directory.digest();
 
         // validate the directory itself.
@@ -84,11 +89,8 @@ impl DirectoryService for MemoryDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> {
-        Box::new(DirectoryTraverser::with(
-            self.clone(),
-            root_directory_digest,
-        ))
+    ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
+        traverse_directory(self.clone(), root_directory_digest)
     }
 
     #[instrument(skip_all)]
diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs
index fea4191400f3..09210dfed8e8 100644
--- a/tvix/store/src/directoryservice/mod.rs
+++ b/tvix/store/src/directoryservice/mod.rs
@@ -1,4 +1,7 @@
 use crate::{proto, B3Digest, Error};
+use futures::Stream;
+use std::pin::Pin;
+use tonic::async_trait;
 
 mod from_addr;
 mod grpc;
@@ -12,32 +15,38 @@ pub use self::grpc::GRPCDirectoryService;
 pub use self::memory::MemoryDirectoryService;
 pub use self::sled::SledDirectoryService;
 pub use self::traverse::traverse_to;
-pub use self::utils::DirectoryTraverser;
 
 /// The base trait all Directory services need to implement.
 /// This is a simple get and put of [crate::proto::Directory], returning their
 /// digest.
+#[async_trait]
 pub trait DirectoryService: Send + Sync {
     /// Create a new instance by passing in a connection URL.
+    /// TODO: check if we want to make this async, instead of lazily connecting
     fn from_url(url: &url::Url) -> Result<Self, Error>
     where
         Self: Sized;
 
     /// Get looks up a single Directory message by its digest.
     /// In case the directory is not found, Ok(None) is returned.
-    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
+    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
     /// Get uploads a single Directory message, and returns the calculated
     /// digest, or an error.
-    fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
+    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
 
     /// Looks up a closure of [proto::Directory].
-    /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`,
+    /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`,
     /// and we'd be able to add a default implementation for it here, but
     /// we can't have that yet.
+    ///
+    /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily,
+    /// and the box allows different underlying stream implementations to be returned since
+    /// Rust doesn't support this as a generic in traits yet. This is the same thing that
+    /// [async_trait] generates, but for streams instead of futures.
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send>;
+    ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>>;
 
     /// Allows persisting a closure of [proto::Directory], which is a graph of
     /// connected Directory messages.
@@ -49,16 +58,17 @@ pub trait DirectoryService: Send + Sync {
 /// The consumer can periodically call [DirectoryPutter::put], starting from the
 /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to
 /// retrieve the root digest (or an error).
+#[async_trait]
 pub trait DirectoryPutter: Send {
     /// Put a individual [proto::Directory] into the store.
     /// Error semantics and behaviour is up to the specific implementation of
     /// this trait.
     /// Due to bursting, the returned error might refer to an object previously
     /// sent via `put`.
-    fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
 
     /// Close the stream, and wait for any errors.
-    fn close(&mut self) -> Result<B3Digest, Error>;
+    async fn close(&mut self) -> Result<B3Digest, Error>;
 
     /// Return whether the stream is closed or not.
     /// Used from some [DirectoryService] implementations only.
diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs
index e741434eabb5..0dc5496803cb 100644
--- a/tvix/store/src/directoryservice/sled.rs
+++ b/tvix/store/src/directoryservice/sled.rs
@@ -1,12 +1,15 @@
 use crate::directoryservice::DirectoryPutter;
 use crate::proto::Directory;
 use crate::{proto, B3Digest, Error};
+use futures::Stream;
 use prost::Message;
 use std::path::PathBuf;
+use std::pin::Pin;
+use tonic::async_trait;
 use tracing::{instrument, warn};
 
-use super::utils::SimplePutter;
-use super::{DirectoryService, DirectoryTraverser};
+use super::utils::{traverse_directory, SimplePutter};
+use super::DirectoryService;
 
 #[derive(Clone)]
 pub struct SledDirectoryService {
@@ -29,6 +32,7 @@ impl SledDirectoryService {
     }
 }
 
+#[async_trait]
 impl DirectoryService for SledDirectoryService {
     /// Constructs a [SledDirectoryService] from the passed [url::Url]:
     /// - scheme has to be `sled://`
@@ -59,7 +63,7 @@ impl DirectoryService for SledDirectoryService {
     }
 
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
         match self.db.get(digest.to_vec()) {
             // The directory was not found, return
             Ok(None) => Ok(None),
@@ -99,7 +103,7 @@ impl DirectoryService for SledDirectoryService {
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
         let digest = directory.digest();
 
         // validate the directory itself.
@@ -121,12 +125,8 @@ impl DirectoryService for SledDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Box<(dyn Iterator<Item = Result<proto::Directory, Error>> + std::marker::Send + 'static)>
-    {
-        Box::new(DirectoryTraverser::with(
-            self.clone(),
-            root_directory_digest,
-        ))
+    ) -> Pin<Box<(dyn Stream<Item = Result<proto::Directory, Error>> + Send + 'static)>> {
+        traverse_directory(self.clone(), root_directory_digest)
     }
 
     #[instrument(skip_all)]
diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs
index 9029543267c1..0489c4458163 100644
--- a/tvix/store/src/directoryservice/traverse.rs
+++ b/tvix/store/src/directoryservice/traverse.rs
@@ -1,19 +1,18 @@
 use super::DirectoryService;
-use crate::{proto::NamedNode, Error};
+use crate::{proto::NamedNode, B3Digest, Error};
 use std::{os::unix::ffi::OsStrExt, sync::Arc};
 use tracing::{instrument, warn};
 
 /// This traverses from a (root) node to the given (sub)path, returning the Node
 /// at that path, or none, if there's nothing at that path.
-/// TODO: Do we want to rewrite this in a non-recursing fashion, and use
-/// [DirectoryService.get_recursive] to do less lookups?
+/// TODO: Do we want to use [DirectoryService.get_recursive] to do less lookups?
 /// Or do we consider this to be a non-issue due to store composition and local caching?
 /// TODO: the name of this function (and mod) is a bit bad, because it doesn't
 /// clearly distinguish it from the BFS traversers.
 #[instrument(skip(directory_service))]
-pub fn traverse_to(
+pub async fn traverse_to(
     directory_service: Arc<dyn DirectoryService>,
-    node: crate::proto::node::Node,
+    root_node: crate::proto::node::Node,
     path: &std::path::Path,
 ) -> Result<Option<crate::proto::node::Node>, Error> {
     // strip a possible `/` prefix from the path.
@@ -25,52 +24,56 @@ pub fn traverse_to(
         }
     };
 
+    let mut cur_node = root_node;
     let mut it = path.components();
 
-    match it.next() {
-        None => {
-            // the (remaining) path is empty, return the node we've been called with.
-            Ok(Some(node))
-        }
-        Some(first_component) => {
-            match node {
-                crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => {
-                    // There's still some path left, but the current node is no directory.
-                    // This means the path doesn't exist, as we can't reach it.
-                    Ok(None)
-                }
-                crate::proto::node::Node::Directory(directory_node) => {
-                    let digest = directory_node
-                        .digest
-                        .try_into()
-                        .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
-
-                    // fetch the linked node from the directory_service
-                    match directory_service.get(&digest)? {
-                        // If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
-                        None => {
-                            warn!("directory {} does not exist", digest);
-
-                            Err(Error::StorageError(format!(
-                                "directory {} does not exist",
-                                digest
-                            )))
-                        }
-                        Some(directory) => {
-                            // look for first_component in the [Directory].
-                            // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we
-                            // could stop as soon as e.name is larger than the search string.
-                            let child_node = directory
-                                .nodes()
-                                .find(|n| n.get_name() == first_component.as_os_str().as_bytes());
-
-                            match child_node {
-                                // child node not found means there's no such element inside the directory.
-                                None => Ok(None),
-                                // child node found, recurse with it and the rest of the path.
-                                Some(child_node) => {
-                                    let rest_path: std::path::PathBuf = it.collect();
-                                    traverse_to(directory_service, child_node, &rest_path)
+    loop {
+        match it.next() {
+            None => {
+                // the (remaining) path is empty, return the node we're current at.
+                return Ok(Some(cur_node));
+            }
+            Some(first_component) => {
+                match cur_node {
+                    crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => {
+                        // There's still some path left, but the current node is no directory.
+                        // This means the path doesn't exist, as we can't reach it.
+                        return Ok(None);
+                    }
+                    crate::proto::node::Node::Directory(directory_node) => {
+                        let digest: B3Digest = directory_node.digest.try_into().map_err(|_e| {
+                            Error::StorageError("invalid digest length".to_string())
+                        })?;
+
+                        // fetch the linked node from the directory_service
+                        match directory_service.get(&digest).await? {
+                            // If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
+                            None => {
+                                warn!("directory {} does not exist", digest);
+
+                                return Err(Error::StorageError(format!(
+                                    "directory {} does not exist",
+                                    digest
+                                )));
+                            }
+                            Some(directory) => {
+                                // look for first_component in the [Directory].
+                                // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we
+                                // could stop as soon as e.name is larger than the search string.
+                                let child_node = directory.nodes().find(|n| {
+                                    n.get_name() == first_component.as_os_str().as_bytes()
+                                });
+
+                                match child_node {
+                                    // child node not found means there's no such element inside the directory.
+                                    None => {
+                                        return Ok(None);
+                                    }
+                                    // child node found, return to top-of loop to find the next
+                                    // node in the path.
+                                    Some(child_node) => {
+                                        cur_node = child_node;
+                                    }
                                 }
                             }
                         }
@@ -92,16 +95,18 @@ mod tests {
 
     use super::traverse_to;
 
-    #[test]
-    fn test_traverse_to() {
+    #[tokio::test]
+    async fn test_traverse_to() {
         let directory_service = gen_directory_service();
 
         let mut handle = directory_service.put_multiple_start();
         handle
             .put(DIRECTORY_WITH_KEEP.clone())
+            .await
             .expect("must succeed");
         handle
             .put(DIRECTORY_COMPLICATED.clone())
+            .await
             .expect("must succeed");
 
         // construct the node for DIRECTORY_COMPLICATED
@@ -128,6 +133,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from(""),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(Some(node_directory_complicated.clone()), resp);
@@ -140,6 +146,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("keep"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(Some(node_directory_with_keep), resp);
@@ -152,6 +159,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("keep/.keep"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(Some(node_file_keep.clone()), resp);
@@ -164,6 +172,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("/keep/.keep"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(Some(node_file_keep), resp);
@@ -176,6 +185,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("void"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(None, resp);
@@ -188,6 +198,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("//v/oid"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(None, resp);
@@ -201,6 +212,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("keep/.keep/foo"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(None, resp);
@@ -213,6 +225,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("/"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(Some(node_directory_complicated), resp);
diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs
index 95f02f1f9ce8..4c5e7cfde37c 100644
--- a/tvix/store/src/directoryservice/utils.rs
+++ b/tvix/store/src/directoryservice/utils.rs
@@ -3,103 +3,85 @@ use super::DirectoryService;
 use crate::proto;
 use crate::B3Digest;
 use crate::Error;
+use async_stream::stream;
+use futures::Stream;
 use std::collections::{HashSet, VecDeque};
-use tracing::{debug_span, instrument, warn};
+use std::pin::Pin;
+use tonic::async_trait;
+use tracing::warn;
 
 /// Traverses a [proto::Directory] from the root to the children.
 ///
 /// This is mostly BFS, but directories are only returned once.
-pub struct DirectoryTraverser<DS: DirectoryService> {
+pub fn traverse_directory<DS: DirectoryService + 'static>(
     directory_service: DS,
-    /// The list of all directories that still need to be traversed. The next
-    /// element is picked from the front, new elements are enqueued at the
-    /// back.
-    worklist_directory_digests: VecDeque<B3Digest>,
-    /// The list of directory digests already sent to the consumer.
-    /// We omit sending the same directories multiple times.
-    sent_directory_digests: HashSet<B3Digest>,
-}
-
-impl<DS: DirectoryService> DirectoryTraverser<DS> {
-    pub fn with(directory_service: DS, root_directory_digest: &B3Digest) -> Self {
-        Self {
-            directory_service,
-            worklist_directory_digests: VecDeque::from([root_directory_digest.clone()]),
-            sent_directory_digests: HashSet::new(),
-        }
-    }
-
-    // enqueue all child directory digests to the work queue, as
-    // long as they're not part of the worklist or already sent.
-    // This panics if the digest looks invalid, it's supposed to be checked first.
-    fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
-        for child_directory_node in &directory.directories {
-            // TODO: propagate error
-            let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap();
-
-            if self.worklist_directory_digests.contains(&child_digest)
-                || self.sent_directory_digests.contains(&child_digest)
-            {
-                continue;
-            }
-            self.worklist_directory_digests.push_back(child_digest);
-        }
-    }
-}
-
-impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
-    type Item = Result<proto::Directory, Error>;
-
-    #[instrument(skip_all)]
-    fn next(&mut self) -> Option<Self::Item> {
-        // fetch the next directory digest from the top of the work queue.
-        match self.worklist_directory_digests.pop_front() {
-            None => None,
-            Some(current_directory_digest) => {
-                let span = debug_span!("directory.digest", "{}", current_directory_digest);
-                let _ = span.enter();
-
-                // look up the directory itself.
-                let current_directory = match self.directory_service.get(&current_directory_digest)
-                {
-                    // if we got it
-                    Ok(Some(current_directory)) => {
-                        // validate, we don't want to send invalid directories.
-                        if let Err(e) = current_directory.validate() {
-                            warn!("directory failed validation: {}", e.to_string());
-                            return Some(Err(Error::StorageError(format!(
-                                "invalid directory: {}",
-                                current_directory_digest
-                            ))));
-                        }
-                        current_directory
-                    }
-                    // if it's not there, we have an inconsistent store!
-                    Ok(None) => {
-                        warn!("directory {} does not exist", current_directory_digest);
-                        return Some(Err(Error::StorageError(format!(
-                            "directory {} does not exist",
+    root_directory_digest: &B3Digest,
+) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
+    // The list of all directories that still need to be traversed. The next
+    // element is picked from the front, new elements are enqueued at the
+    // back.
+    let mut worklist_directory_digests: VecDeque<B3Digest> =
+        VecDeque::from([root_directory_digest.clone()]);
+    // The list of directory digests already sent to the consumer.
+    // We omit sending the same directories multiple times.
+    let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new();
+
+    let stream = stream! {
+        while let Some(current_directory_digest) = worklist_directory_digests.pop_front() {
+            match directory_service.get(&current_directory_digest).await {
+                // if it's not there, we have an inconsistent store!
+                Ok(None) => {
+                    warn!("directory {} does not exist", current_directory_digest);
+                    yield Err(Error::StorageError(format!(
+                        "directory {} does not exist",
+                        current_directory_digest
+                    )));
+                }
+                Err(e) => {
+                    warn!("failed to look up directory");
+                    yield Err(Error::StorageError(format!(
+                        "unable to look up directory {}: {}",
+                        current_directory_digest, e
+                    )));
+                }
+
+                // if we got it
+                Ok(Some(current_directory)) => {
+                    // validate, we don't want to send invalid directories.
+                    if let Err(e) = current_directory.validate() {
+                        warn!("directory failed validation: {}", e.to_string());
+                        yield Err(Error::StorageError(format!(
+                            "invalid directory: {}",
                             current_directory_digest
-                        ))));
-                    }
-                    Err(e) => {
-                        warn!("failed to look up directory");
-                        return Some(Err(Error::StorageError(format!(
-                            "unable to look up directory {}: {}",
-                            current_directory_digest, e
-                        ))));
+                        )));
                     }
-                };
 
-                // All DirectoryServices MUST validate directory nodes, before returning them out, so we
-                // can be sure [enqueue_child_directories] doesn't panic.
+                    // We're about to send this directory, so let's avoid sending it again if a
+                    // descendant has it.
+                    sent_directory_digests.insert(current_directory_digest);
+
+                    // enqueue all child directory digests to the work queue, as
+                    // long as they're not part of the worklist or already sent.
+                    // This panics if the digest looks invalid, it's supposed to be checked first.
+                    for child_directory_node in &current_directory.directories {
+                        // TODO: propagate error
+                        let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap();
+
+                        if worklist_directory_digests.contains(&child_digest)
+                            || sent_directory_digests.contains(&child_digest)
+                        {
+                            continue;
+                        }
+                        worklist_directory_digests.push_back(child_digest);
+                    }
 
-                // enqueue child directories
-                self.enqueue_child_directories(&current_directory);
-                Some(Ok(current_directory))
-            }
+                    yield Ok(current_directory);
+                }
+            };
         }
-    }
+    };
+
+    Box::pin(stream)
 }
 
 /// This is a simple implementation of a Directory uploader.
@@ -120,13 +102,14 @@ impl<DS: DirectoryService> SimplePutter<DS> {
     }
 }
 
+#[async_trait]
 impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
-    fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
         if self.closed {
             return Err(Error::StorageError("already closed".to_string()));
         }
 
-        let digest = self.directory_service.put(directory)?;
+        let digest = self.directory_service.put(directory).await?;
 
         // track the last directory digest
         self.last_directory_digest = Some(digest);
@@ -135,7 +118,7 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
     }
 
     /// We need to be mutable here, as that's the signature of the trait.
-    fn close(&mut self) -> Result<B3Digest, Error> {
+    async fn close(&mut self) -> Result<B3Digest, Error> {
         if self.closed {
             return Err(Error::StorageError("already closed".to_string()));
         }
diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs
index 48e605406331..02d3bb3221ad 100644
--- a/tvix/store/src/fs/mod.rs
+++ b/tvix/store/src/fs/mod.rs
@@ -16,6 +16,7 @@ use crate::{
     B3Digest, Error,
 };
 use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID};
+use futures::StreamExt;
 use nix_compat::store_path::StorePath;
 use parking_lot::RwLock;
 use std::{
@@ -26,7 +27,10 @@ use std::{
     sync::{atomic::Ordering, Arc},
     time::Duration,
 };
-use tokio::io::{AsyncBufReadExt, AsyncSeekExt};
+use tokio::{
+    io::{AsyncBufReadExt, AsyncSeekExt},
+    sync::mpsc,
+};
 use tracing::{debug, info_span, warn};
 
 use self::{
@@ -159,7 +163,11 @@ impl TvixStoreFs {
             )))
         } else {
             // If we don't have it, look it up in PathInfoService.
-            match self.path_info_service.get(store_path.digest)? {
+            let path_info_service = self.path_info_service.clone();
+            let task = self
+                .tokio_handle
+                .spawn(async move { path_info_service.get(store_path.digest).await });
+            match self.tokio_handle.block_on(task).unwrap()? {
                 // the pathinfo doesn't exist, so the file doesn't exist.
                 None => Ok(None),
                 Some(path_info) => {
@@ -204,7 +212,12 @@ impl TvixStoreFs {
     /// This is both used to initially insert the root node of a store path,
     /// as well as when looking up an intermediate DirectoryNode.
     fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> {
-        match self.directory_service.get(directory_digest) {
+        let directory_service = self.directory_service.clone();
+        let directory_digest_clone = directory_digest.clone();
+        let task = self
+            .tokio_handle
+            .spawn(async move { directory_service.get(&directory_digest_clone).await });
+        match self.tokio_handle.block_on(task).unwrap() {
             Err(e) => {
                 warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory");
                 Err(e)
@@ -369,12 +382,23 @@ impl FileSystem for TvixStoreFs {
             if !self.list_root {
                 return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
             } else {
-                for (i, path_info) in self
-                    .path_info_service
-                    .list()
-                    .skip(offset as usize)
-                    .enumerate()
-                {
+                let path_info_service = self.path_info_service.clone();
+                let (tx, mut rx) = mpsc::channel(16);
+
+                // This task will run in the background immediately and will exit
+                // after the stream ends or if we no longer want any more entries.
+                self.tokio_handle.spawn(async move {
+                    let mut stream = path_info_service.list().skip(offset as usize).enumerate();
+                    while let Some(path_info) = stream.next().await {
+                        if tx.send(path_info).await.is_err() {
+                            // If we get a send error, it means the sync code
+                            // doesn't want any more entries.
+                            break;
+                        }
+                    }
+                });
+
+                while let Some((i, path_info)) = rx.blocking_recv() {
                     let path_info = match path_info {
                         Err(e) => {
                             warn!("failed to retrieve pathinfo: {}", e);
@@ -421,6 +445,7 @@ impl FileSystem for TvixStoreFs {
                         break;
                     }
                 }
+
                 return Ok(());
             }
         }
diff --git a/tvix/store/src/fs/tests.rs b/tvix/store/src/fs/tests.rs
index 30f5ca3f40aa..6837f8aa293a 100644
--- a/tvix/store/src/fs/tests.rs
+++ b/tvix/store/src/fs/tests.rs
@@ -1,8 +1,10 @@
+use futures::StreamExt;
 use std::io::Cursor;
 use std::os::unix::prelude::MetadataExt;
 use std::path::Path;
 use std::sync::Arc;
-use std::{fs, io};
+use tokio::{fs, io};
+use tokio_stream::wrappers::ReadDirStream;
 
 use tempfile::TempDir;
 
@@ -75,7 +77,10 @@ async fn populate_blob_a(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 async fn populate_blob_b(
@@ -102,7 +107,10 @@ async fn populate_blob_b(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 /// adds a blob containing helloworld and marks it as executable
@@ -133,10 +141,13 @@ async fn populate_helloworld_blob(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
-fn populate_symlink(
+async fn populate_symlink(
     _blob_service: &Arc<dyn BlobService>,
     _directory_service: &Arc<dyn DirectoryService>,
     path_info_service: &Arc<dyn PathInfoService>,
@@ -151,12 +162,15 @@ fn populate_symlink(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 /// This writes a symlink pointing to /nix/store/somewhereelse,
 /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED.
-fn populate_symlink2(
+async fn populate_symlink2(
     _blob_service: &Arc<dyn BlobService>,
     _directory_service: &Arc<dyn DirectoryService>,
     path_info_service: &Arc<dyn PathInfoService>,
@@ -171,7 +185,10 @@ fn populate_symlink2(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 async fn populate_directory_with_keep(
@@ -189,6 +206,7 @@ async fn populate_directory_with_keep(
     // upload directory
     directory_service
         .put(fixtures::DIRECTORY_WITH_KEEP.clone())
+        .await
         .expect("must succeed uploading");
 
     // upload pathinfo
@@ -202,12 +220,15 @@ async fn populate_directory_with_keep(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 /// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory
 /// itself.
-fn populate_pathinfo_without_directory(
+async fn populate_pathinfo_without_directory(
     _: &Arc<dyn BlobService>,
     _: &Arc<dyn DirectoryService>,
     path_info_service: &Arc<dyn PathInfoService>,
@@ -223,11 +244,14 @@ fn populate_pathinfo_without_directory(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 /// Insert , but don't provide the blob .keep is pointing to
-fn populate_blob_a_without_blob(
+async fn populate_blob_a_without_blob(
     _: &Arc<dyn BlobService>,
     _: &Arc<dyn DirectoryService>,
     path_info_service: &Arc<dyn PathInfoService>,
@@ -244,7 +268,10 @@ fn populate_blob_a_without_blob(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 async fn populate_directory_complicated(
@@ -262,11 +289,13 @@ async fn populate_directory_complicated(
     // upload inner directory
     directory_service
         .put(fixtures::DIRECTORY_WITH_KEEP.clone())
+        .await
         .expect("must succeed uploading");
 
     // uplodad parent directory
     directory_service
         .put(fixtures::DIRECTORY_COMPLICATED.clone())
+        .await
         .expect("must succeed uploading");
 
     // upload pathinfo
@@ -280,7 +309,10 @@ async fn populate_directory_complicated(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 /// Ensure mounting itself doesn't fail
@@ -329,9 +361,13 @@ async fn root() {
 
     {
         // read_dir succeeds, but getting the first element will fail.
-        let mut it = fs::read_dir(tmpdir).expect("must succeed");
+        let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed"));
 
-        let err = it.next().expect("must be some").expect_err("must be err");
+        let err = it
+            .next()
+            .await
+            .expect("must be some")
+            .expect_err("must be err");
         assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind());
     }
 
@@ -362,11 +398,15 @@ async fn root_with_listing() {
 
     {
         // read_dir succeeds, but getting the first element will fail.
-        let mut it = fs::read_dir(tmpdir).expect("must succeed");
+        let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed"));
 
-        let e = it.next().expect("must be some").expect("must succeed");
+        let e = it
+            .next()
+            .await
+            .expect("must be some")
+            .expect("must succeed");
 
-        let metadata = e.metadata().expect("must succeed");
+        let metadata = e.metadata().await.expect("must succeed");
         assert!(metadata.is_file());
         assert!(metadata.permissions().readonly());
         assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len());
@@ -400,7 +440,7 @@ async fn stat_file_at_root() {
     let p = tmpdir.path().join(BLOB_A_NAME);
 
     // peek at the file metadata
-    let metadata = fs::metadata(p).expect("must succeed");
+    let metadata = fs::metadata(p).await.expect("must succeed");
 
     assert!(metadata.is_file());
     assert!(metadata.permissions().readonly());
@@ -434,7 +474,7 @@ async fn read_file_at_root() {
     let p = tmpdir.path().join(BLOB_A_NAME);
 
     // read the file contents
-    let data = fs::read(p).expect("must succeed");
+    let data = fs::read(p).await.expect("must succeed");
 
     // ensure size and contents match
     assert_eq!(fixtures::BLOB_A.len(), data.len());
@@ -468,7 +508,7 @@ async fn read_large_file_at_root() {
     let p = tmpdir.path().join(BLOB_B_NAME);
     {
         // peek at the file metadata
-        let metadata = fs::metadata(&p).expect("must succeed");
+        let metadata = fs::metadata(&p).await.expect("must succeed");
 
         assert!(metadata.is_file());
         assert!(metadata.permissions().readonly());
@@ -476,7 +516,7 @@ async fn read_large_file_at_root() {
     }
 
     // read the file contents
-    let data = fs::read(p).expect("must succeed");
+    let data = fs::read(p).await.expect("must succeed");
 
     // ensure size and contents match
     assert_eq!(fixtures::BLOB_B.len(), data.len());
@@ -496,7 +536,7 @@ async fn symlink_readlink() {
     let tmpdir = TempDir::new().unwrap();
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
-    populate_symlink(&blob_service, &directory_service, &path_info_service);
+    populate_symlink(&blob_service, &directory_service, &path_info_service).await;
 
     let mut fuse_daemon = do_mount(
         blob_service,
@@ -509,20 +549,20 @@ async fn symlink_readlink() {
 
     let p = tmpdir.path().join(SYMLINK_NAME);
 
-    let target = fs::read_link(&p).expect("must succeed");
+    let target = fs::read_link(&p).await.expect("must succeed");
     assert_eq!(BLOB_A_NAME, target.to_str().unwrap());
 
     // peek at the file metadata, which follows symlinks.
     // this must fail, as we didn't populate the target.
-    let e = fs::metadata(&p).expect_err("must fail");
+    let e = fs::metadata(&p).await.expect_err("must fail");
     assert_eq!(std::io::ErrorKind::NotFound, e.kind());
 
     // peeking at the file metadata without following symlinks will succeed.
-    let metadata = fs::symlink_metadata(&p).expect("must succeed");
+    let metadata = fs::symlink_metadata(&p).await.expect("must succeed");
     assert!(metadata.is_symlink());
 
     // reading from the symlink (which follows) will fail, because the target doesn't exist.
-    let e = fs::read(p).expect_err("must fail");
+    let e = fs::read(p).await.expect_err("must fail");
     assert_eq!(std::io::ErrorKind::NotFound, e.kind());
 
     fuse_daemon.unmount().expect("unmount");
@@ -540,7 +580,7 @@ async fn read_stat_through_symlink() {
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
-    populate_symlink(&blob_service, &directory_service, &path_info_service);
+    populate_symlink(&blob_service, &directory_service, &path_info_service).await;
 
     let mut fuse_daemon = do_mount(
         blob_service,
@@ -556,16 +596,16 @@ async fn read_stat_through_symlink() {
 
     // peek at the file metadata, which follows symlinks.
     // this must now return the same metadata as when statting at the target directly.
-    let metadata_symlink = fs::metadata(&p_symlink).expect("must succeed");
-    let metadata_blob = fs::metadata(&p_blob).expect("must succeed");
+    let metadata_symlink = fs::metadata(&p_symlink).await.expect("must succeed");
+    let metadata_blob = fs::metadata(&p_blob).await.expect("must succeed");
     assert_eq!(metadata_blob.file_type(), metadata_symlink.file_type());
     assert_eq!(metadata_blob.len(), metadata_symlink.len());
 
     // reading from the symlink (which follows) will return the same data as if
     // we were reading from the file directly.
     assert_eq!(
-        std::fs::read(p_blob).expect("must succeed"),
-        std::fs::read(p_symlink).expect("must succeed"),
+        fs::read(p_blob).await.expect("must succeed"),
+        fs::read(p_symlink).await.expect("must succeed"),
     );
 
     fuse_daemon.unmount().expect("unmount");
@@ -596,7 +636,7 @@ async fn read_stat_directory() {
     let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
 
     // peek at the metadata of the directory
-    let metadata = fs::metadata(p).expect("must succeed");
+    let metadata = fs::metadata(p).await.expect("must succeed");
     assert!(metadata.is_dir());
     assert!(metadata.permissions().readonly());
 
@@ -628,12 +668,12 @@ async fn read_blob_inside_dir() {
     let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep");
 
     // peek at metadata.
-    let metadata = fs::metadata(&p).expect("must succeed");
+    let metadata = fs::metadata(&p).await.expect("must succeed");
     assert!(metadata.is_file());
     assert!(metadata.permissions().readonly());
 
     // read from it
-    let data = fs::read(&p).expect("must succeed");
+    let data = fs::read(&p).await.expect("must succeed");
     assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data);
 
     fuse_daemon.unmount().expect("unmount");
@@ -669,12 +709,12 @@ async fn read_blob_deep_inside_dir() {
         .join(".keep");
 
     // peek at metadata.
-    let metadata = fs::metadata(&p).expect("must succeed");
+    let metadata = fs::metadata(&p).await.expect("must succeed");
     assert!(metadata.is_file());
     assert!(metadata.permissions().readonly());
 
     // read from it
-    let data = fs::read(&p).expect("must succeed");
+    let data = fs::read(&p).await.expect("must succeed");
     assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data);
 
     fuse_daemon.unmount().expect("unmount");
@@ -706,10 +746,10 @@ async fn readdir() {
 
     {
         // read_dir should succeed. Collect all elements
-        let elements: Vec<_> = fs::read_dir(p)
-            .expect("must succeed")
+        let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed"))
             .map(|e| e.expect("must not be err"))
-            .collect();
+            .collect()
+            .await;
 
         assert_eq!(3, elements.len(), "number of elements should be 3"); // rust skips . and ..
 
@@ -719,18 +759,18 @@ async fn readdir() {
         // ".keep", 0 byte file.
         let e = &elements[0];
         assert_eq!(".keep", e.file_name());
-        assert!(e.file_type().expect("must succeed").is_file());
-        assert_eq!(0, e.metadata().expect("must succeed").len());
+        assert!(e.file_type().await.expect("must succeed").is_file());
+        assert_eq!(0, e.metadata().await.expect("must succeed").len());
 
         // "aa", symlink.
         let e = &elements[1];
         assert_eq!("aa", e.file_name());
-        assert!(e.file_type().expect("must succeed").is_symlink());
+        assert!(e.file_type().await.expect("must succeed").is_symlink());
 
         // "keep", directory
         let e = &elements[2];
         assert_eq!("keep", e.file_name());
-        assert!(e.file_type().expect("must succeed").is_dir());
+        assert!(e.file_type().await.expect("must succeed").is_dir());
     }
 
     fuse_daemon.unmount().expect("unmount");
@@ -762,18 +802,18 @@ async fn readdir_deep() {
 
     {
         // read_dir should succeed. Collect all elements
-        let elements: Vec<_> = fs::read_dir(p)
-            .expect("must succeed")
+        let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed"))
             .map(|e| e.expect("must not be err"))
-            .collect();
+            .collect()
+            .await;
 
         assert_eq!(1, elements.len(), "number of elements should be 1"); // rust skips . and ..
 
         // ".keep", 0 byte file.
         let e = &elements[0];
         assert_eq!(".keep", e.file_name());
-        assert!(e.file_type().expect("must succeed").is_file());
-        assert_eq!(0, e.metadata().expect("must succeed").len());
+        assert!(e.file_type().await.expect("must succeed").is_file());
+        assert_eq!(0, e.metadata().await.expect("must succeed").len());
     }
 
     fuse_daemon.unmount().expect("unmount");
@@ -792,7 +832,7 @@ async fn check_attributes() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
     populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
-    populate_symlink(&blob_service, &directory_service, &path_info_service);
+    populate_symlink(&blob_service, &directory_service, &path_info_service).await;
     populate_helloworld_blob(&blob_service, &directory_service, &path_info_service).await;
 
     let mut fuse_daemon = do_mount(
@@ -810,10 +850,16 @@ async fn check_attributes() {
     let p_executable_file = tmpdir.path().join(HELLOWORLD_BLOB_NAME);
 
     // peek at metadata. We use symlink_metadata to ensure we don't traverse a symlink by accident.
-    let metadata_file = fs::symlink_metadata(&p_file).expect("must succeed");
-    let metadata_executable_file = fs::symlink_metadata(&p_executable_file).expect("must succeed");
-    let metadata_directory = fs::symlink_metadata(&p_directory).expect("must succeed");
-    let metadata_symlink = fs::symlink_metadata(&p_symlink).expect("must succeed");
+    let metadata_file = fs::symlink_metadata(&p_file).await.expect("must succeed");
+    let metadata_executable_file = fs::symlink_metadata(&p_executable_file)
+        .await
+        .expect("must succeed");
+    let metadata_directory = fs::symlink_metadata(&p_directory)
+        .await
+        .expect("must succeed");
+    let metadata_symlink = fs::symlink_metadata(&p_symlink)
+        .await
+        .expect("must succeed");
 
     // modes should match. We & with 0o777 to remove any higher bits.
     assert_eq!(0o444, metadata_file.mode() & 0o777);
@@ -873,8 +919,14 @@ async fn compare_inodes_directories() {
 
     // peek at metadata.
     assert_eq!(
-        fs::metadata(p_dir_with_keep).expect("must succeed").ino(),
-        fs::metadata(p_sibling_dir).expect("must succeed").ino()
+        fs::metadata(p_dir_with_keep)
+            .await
+            .expect("must succeed")
+            .ino(),
+        fs::metadata(p_sibling_dir)
+            .await
+            .expect("must succeed")
+            .ino()
     );
 
     fuse_daemon.unmount().expect("unmount");
@@ -912,8 +964,8 @@ async fn compare_inodes_files() {
 
     // peek at metadata.
     assert_eq!(
-        fs::metadata(p_keep1).expect("must succeed").ino(),
-        fs::metadata(p_keep2).expect("must succeed").ino()
+        fs::metadata(p_keep1).await.expect("must succeed").ino(),
+        fs::metadata(p_keep2).await.expect("must succeed").ino()
     );
 
     fuse_daemon.unmount().expect("unmount");
@@ -932,7 +984,7 @@ async fn compare_inodes_symlinks() {
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
-    populate_symlink2(&blob_service, &directory_service, &path_info_service);
+    populate_symlink2(&blob_service, &directory_service, &path_info_service).await;
 
     let mut fuse_daemon = do_mount(
         blob_service,
@@ -948,8 +1000,8 @@ async fn compare_inodes_symlinks() {
 
     // peek at metadata.
     assert_eq!(
-        fs::symlink_metadata(p1).expect("must succeed").ino(),
-        fs::symlink_metadata(p2).expect("must succeed").ino()
+        fs::symlink_metadata(p1).await.expect("must succeed").ino(),
+        fs::symlink_metadata(p2).await.expect("must succeed").ino()
     );
 
     fuse_daemon.unmount().expect("unmount");
@@ -978,28 +1030,32 @@ async fn read_wrong_paths_in_root() {
     .expect("must succeed");
 
     // wrong name
-    assert!(!tmpdir
-        .path()
-        .join("00000000000000000000000000000000-tes")
-        .exists());
+    assert!(
+        fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes"))
+            .await
+            .is_err()
+    );
 
     // invalid hash
-    assert!(!tmpdir
-        .path()
-        .join("0000000000000000000000000000000-test")
-        .exists());
+    assert!(
+        fs::metadata(tmpdir.path().join("0000000000000000000000000000000-test"))
+            .await
+            .is_err()
+    );
 
     // right name, must exist
-    assert!(tmpdir
-        .path()
-        .join("00000000000000000000000000000000-test")
-        .exists());
+    assert!(
+        fs::metadata(tmpdir.path().join("00000000000000000000000000000000-test"))
+            .await
+            .is_ok()
+    );
 
     // now wrong name with right hash still may not exist
-    assert!(!tmpdir
-        .path()
-        .join("00000000000000000000000000000000-tes")
-        .exists());
+    assert!(
+        fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes"))
+            .await
+            .is_err()
+    );
 
     fuse_daemon.unmount().expect("unmount");
 }
@@ -1027,7 +1083,7 @@ async fn disallow_writes() {
     .expect("must succeed");
 
     let p = tmpdir.path().join(BLOB_A_NAME);
-    let e = std::fs::File::create(p).expect_err("must fail");
+    let e = fs::File::create(p).await.expect_err("must fail");
 
     assert_eq!(Some(libc::EROFS), e.raw_os_error());
 
@@ -1044,7 +1100,8 @@ async fn missing_directory() {
     let tmpdir = TempDir::new().unwrap();
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
-    populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service);
+    populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service)
+        .await;
 
     let mut fuse_daemon = do_mount(
         blob_service,
@@ -1059,19 +1116,19 @@ async fn missing_directory() {
 
     {
         // `stat` on the path should succeed, because it doesn't trigger the directory request.
-        fs::metadata(&p).expect("must succeed");
+        fs::metadata(&p).await.expect("must succeed");
 
         // However, calling either `readdir` or `stat` on a child should fail with an IO error.
         // It fails when trying to pull the first entry, because we don't implement opendir separately
-        fs::read_dir(&p)
-            .unwrap()
+        ReadDirStream::new(fs::read_dir(&p).await.unwrap())
             .next()
+            .await
             .expect("must be some")
             .expect_err("must be err");
 
         // rust currently sets e.kind() to Uncategorized, which isn't very
         // helpful, so we don't look at the error more closely than that..
-        fs::metadata(p.join(".keep")).expect_err("must fail");
+        fs::metadata(p.join(".keep")).await.expect_err("must fail");
     }
 
     fuse_daemon.unmount().expect("unmount");
@@ -1087,7 +1144,7 @@ async fn missing_blob() {
     let tmpdir = TempDir::new().unwrap();
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
-    populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service);
+    populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service).await;
 
     let mut fuse_daemon = do_mount(
         blob_service,
@@ -1102,12 +1159,12 @@ async fn missing_blob() {
 
     {
         // `stat` on the blob should succeed, because it doesn't trigger a request to the blob service.
-        fs::metadata(&p).expect("must succeed");
+        fs::metadata(&p).await.expect("must succeed");
 
         // However, calling read on the blob should fail.
         // rust currently sets e.kind() to Uncategorized, which isn't very
         // helpful, so we don't look at the error more closely than that..
-        fs::read(p).expect_err("must fail");
+        fs::read(p).await.expect_err("must fail");
     }
 
     fuse_daemon.unmount().expect("unmount");
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index 6764eaddb457..6eebe500d275 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -72,6 +72,7 @@ async fn process_entry(
         // upload this directory
         directory_putter
             .put(directory)
+            .await
             .map_err(|e| Error::UploadDirectoryError(entry.path().to_path_buf(), e))?;
 
         return Ok(proto::node::Node::Directory(proto::DirectoryNode {
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index 4255148fc5e8..f1392472a50e 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -8,20 +8,20 @@ use count_write::CountWrite;
 use nix_compat::nar;
 use sha2::{Digest, Sha256};
 use std::{io, sync::Arc};
-use tokio::io::BufReader;
+use tokio::{io::BufReader, task::spawn_blocking};
 use tracing::warn;
 
 /// Invoke [write_nar], and return the size and sha256 digest of the produced
 /// NAR output.
-pub fn calculate_size_and_sha256(
+pub async fn calculate_size_and_sha256(
     root_node: &proto::node::Node,
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
 ) -> Result<(u64, [u8; 32]), RenderError> {
     let h = Sha256::new();
-    let mut cw = CountWrite::from(h);
+    let cw = CountWrite::from(h);
 
-    write_nar(&mut cw, root_node, blob_service, directory_service)?;
+    let cw = write_nar(cw, root_node, blob_service, directory_service).await?;
 
     Ok((cw.count(), cw.into_inner().finalize().into()))
 }
@@ -30,26 +30,44 @@ pub fn calculate_size_and_sha256(
 /// and uses the passed blob_service and directory_service to
 /// perform the necessary lookups as it traverses the structure.
 /// The contents in NAR serialization are writen to the passed [std::io::Write].
-pub fn write_nar<W: std::io::Write>(
-    w: &mut W,
+///
+/// The writer is passed back in the return value. This is done because async Rust
+/// lacks scoped blocking tasks, so we need to transfer ownership of the writer
+/// internally.
+///
+/// # Panics
+/// This will panic if called outside the context of a Tokio runtime.
+pub async fn write_nar<W: std::io::Write + Send + 'static>(
+    mut w: W,
     proto_root_node: &proto::node::Node,
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
-) -> Result<(), RenderError> {
-    // Initialize NAR writer
-    let nar_root_node = nar::writer::open(w).map_err(RenderError::NARWriterError)?;
+) -> Result<W, RenderError> {
+    let tokio_handle = tokio::runtime::Handle::current();
+    let proto_root_node = proto_root_node.clone();
+
+    spawn_blocking(move || {
+        // Initialize NAR writer
+        let nar_root_node = nar::writer::open(&mut w).map_err(RenderError::NARWriterError)?;
 
-    walk_node(
-        nar_root_node,
-        proto_root_node,
-        blob_service,
-        directory_service,
-    )
+        walk_node(
+            tokio_handle,
+            nar_root_node,
+            &proto_root_node,
+            blob_service,
+            directory_service,
+        )?;
+
+        Ok(w)
+    })
+    .await
+    .unwrap()
 }
 
 /// Process an intermediate node in the structure.
 /// This consumes the node.
 fn walk_node(
+    tokio_handle: tokio::runtime::Handle,
     nar_node: nar::writer::Node,
     proto_node: &proto::node::Node,
     blob_service: Arc<dyn BlobService>,
@@ -73,9 +91,6 @@ fn walk_node(
                 ))
             })?;
 
-            // HACK: blob_service is async, but this function isn't async yet..
-            let tokio_handle = tokio::runtime::Handle::current();
-
             let blob_reader = match tokio_handle
                 .block_on(async { blob_service.open_read(&digest).await })
                 .map_err(RenderError::StoreError)?
@@ -107,11 +122,10 @@ fn walk_node(
                 })?;
 
             // look it up with the directory service
-            let resp = directory_service
-                .get(&digest)
-                .map_err(RenderError::StoreError)?;
-
-            match resp {
+            match tokio_handle
+                .block_on(async { directory_service.get(&digest).await })
+                .map_err(RenderError::StoreError)?
+            {
                 // if it's None, that's an error!
                 None => {
                     return Err(RenderError::DirectoryNotFound(
@@ -131,6 +145,7 @@ fn walk_node(
                             .entry(proto_node.get_name())
                             .map_err(RenderError::NARWriterError)?;
                         walk_node(
+                            tokio_handle.clone(),
                             child_node,
                             &proto_node,
                             blob_service.clone(),
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index 1649655b69ea..c116ddbc8905 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -4,16 +4,15 @@ use crate::{
     directoryservice::DirectoryService,
     proto::{self, ListPathInfoRequest},
 };
-use std::sync::Arc;
-use tokio::{net::UnixStream, task::JoinHandle};
-use tonic::{transport::Channel, Code, Status, Streaming};
+use async_stream::try_stream;
+use futures::Stream;
+use std::{pin::Pin, sync::Arc};
+use tokio::net::UnixStream;
+use tonic::{async_trait, transport::Channel, Code};
 
 /// Connects to a (remote) tvix-store PathInfoService over gRPC.
 #[derive(Clone)]
 pub struct GRPCPathInfoService {
-    /// A handle into the active tokio runtime. Necessary to spawn tasks.
-    tokio_handle: tokio::runtime::Handle,
-
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
     grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
@@ -25,13 +24,11 @@ impl GRPCPathInfoService {
     pub fn from_client(
         grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
     ) -> Self {
-        Self {
-            tokio_handle: tokio::runtime::Handle::current(),
-            grpc_client,
-        }
+        Self { grpc_client }
     }
 }
 
+#[async_trait]
 impl PathInfoService for GRPCPathInfoService {
     /// Constructs a [GRPCPathInfoService] from the passed [url::Url]:
     /// - scheme has to match `grpc+*://`.
@@ -92,47 +89,39 @@ impl PathInfoService for GRPCPathInfoService {
         }
     }
 
-    fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, crate::Error> {
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, crate::Error> {
         // Get a new handle to the gRPC client.
         let mut grpc_client = self.grpc_client.clone();
 
-        let task: JoinHandle<Result<proto::PathInfo, Status>> =
-            self.tokio_handle.spawn(async move {
-                let path_info = grpc_client
-                    .get(proto::GetPathInfoRequest {
-                        by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
-                            digest.to_vec().into(),
-                        )),
-                    })
-                    .await?
-                    .into_inner();
-
-                Ok(path_info)
-            });
+        let path_info = grpc_client
+            .get(proto::GetPathInfoRequest {
+                by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
+                    digest.to_vec().into(),
+                )),
+            })
+            .await;
 
-        match self.tokio_handle.block_on(task)? {
-            Ok(path_info) => Ok(Some(path_info)),
+        match path_info {
+            Ok(path_info) => Ok(Some(path_info.into_inner())),
             Err(e) if e.code() == Code::NotFound => Ok(None),
             Err(e) => Err(crate::Error::StorageError(e.to_string())),
         }
     }
 
-    fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, crate::Error> {
+    async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, crate::Error> {
         // Get a new handle to the gRPC client.
         let mut grpc_client = self.grpc_client.clone();
 
-        let task: JoinHandle<Result<proto::PathInfo, Status>> =
-            self.tokio_handle.spawn(async move {
-                let path_info = grpc_client.put(path_info).await?.into_inner();
-                Ok(path_info)
-            });
+        let path_info = grpc_client
+            .put(path_info)
+            .await
+            .map_err(|e| crate::Error::StorageError(e.to_string()))?
+            .into_inner();
 
-        self.tokio_handle
-            .block_on(task)?
-            .map_err(|e| crate::Error::StorageError(e.to_string()))
+        Ok(path_info)
     }
 
-    fn calculate_nar(
+    async fn calculate_nar(
         &self,
         root_node: &proto::node::Node,
     ) -> Result<(u64, [u8; 32]), crate::Error> {
@@ -140,83 +129,54 @@ impl PathInfoService for GRPCPathInfoService {
         let mut grpc_client = self.grpc_client.clone();
         let root_node = root_node.clone();
 
-        let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move {
-            let path_info = grpc_client
-                .calculate_nar(proto::Node {
-                    node: Some(root_node),
-                })
-                .await?
-                .into_inner();
-            Ok(path_info)
-        });
-
-        let resp = self
-            .tokio_handle
-            .block_on(task)?
-            .map_err(|e| crate::Error::StorageError(e.to_string()))?;
+        let path_info = grpc_client
+            .calculate_nar(proto::Node {
+                node: Some(root_node),
+            })
+            .await
+            .map_err(|e| crate::Error::StorageError(e.to_string()))?
+            .into_inner();
 
-        let nar_sha256: [u8; 32] = resp
+        let nar_sha256: [u8; 32] = path_info
             .nar_sha256
             .to_vec()
             .try_into()
             .map_err(|_e| crate::Error::StorageError("invalid digest length".to_string()))?;
 
-        Ok((resp.nar_size, nar_sha256))
+        Ok((path_info.nar_size, nar_sha256))
     }
 
-    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, crate::Error>> + Send> {
-        // Get a new handle to the gRPC client.
+    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, crate::Error>> + Send>> {
         let mut grpc_client = self.grpc_client.clone();
 
-        let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move {
-            let s = grpc_client
-                .list(ListPathInfoRequest::default())
-                .await?
-                .into_inner();
-
-            Ok(s)
-        });
-
-        let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
-
-        Box::new(StreamIterator::new(self.tokio_handle.clone(), stream))
-    }
-}
-
-pub struct StreamIterator {
-    tokio_handle: tokio::runtime::Handle,
-    stream: Streaming<proto::PathInfo>,
-}
-
-impl StreamIterator {
-    pub fn new(tokio_handle: tokio::runtime::Handle, stream: Streaming<proto::PathInfo>) -> Self {
-        Self {
-            tokio_handle,
-            stream,
-        }
-    }
-}
-
-impl Iterator for StreamIterator {
-    type Item = Result<proto::PathInfo, crate::Error>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        match self.tokio_handle.block_on(self.stream.message()) {
-            Ok(o) => match o {
-                Some(pathinfo) => {
-                    // validate the pathinfo
-                    if let Err(e) = pathinfo.validate() {
-                        return Some(Err(crate::Error::StorageError(format!(
-                            "pathinfo {:?} failed validation: {}",
-                            pathinfo, e
-                        ))));
-                    }
-                    Some(Ok(pathinfo))
+        let stream = try_stream! {
+            let resp = grpc_client.list(ListPathInfoRequest::default()).await;
+
+            let mut stream = resp.map_err(|e| crate::Error::StorageError(e.to_string()))?.into_inner();
+
+            loop {
+                match stream.message().await {
+                    Ok(o) => match o {
+                        Some(pathinfo) => {
+                            // validate the pathinfo
+                            if let Err(e) = pathinfo.validate() {
+                                Err(crate::Error::StorageError(format!(
+                                    "pathinfo {:?} failed validation: {}",
+                                    pathinfo, e
+                                )))?;
+                            }
+                            yield pathinfo
+                        }
+                        None => {
+                            return;
+                        },
+                    },
+                    Err(e) => Err(crate::Error::StorageError(e.to_string()))?,
                 }
-                None => None,
-            },
-            Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))),
-        }
+            }
+        };
+
+        Box::pin(stream)
     }
 }
 
@@ -227,7 +187,6 @@ mod tests {
 
     use tempfile::TempDir;
     use tokio::net::UnixListener;
-    use tokio::task;
     use tokio::time;
     use tokio_stream::wrappers::UnixListenerStream;
 
@@ -377,13 +336,10 @@ mod tests {
             );
         }
 
-        let pi = task::spawn_blocking(move || {
-            client
-                .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap())
-                .expect("must not be error")
-        })
-        .await
-        .expect("must not be err");
+        let pi = client
+            .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap())
+            .await
+            .expect("must not be error");
 
         assert!(pi.is_none());
     }
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
index aba1216c6e96..4cdc411ffb28 100644
--- a/tvix/store/src/pathinfoservice/memory.rs
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -3,10 +3,13 @@ use crate::{
     blobservice::BlobService, directoryservice::DirectoryService, nar::calculate_size_and_sha256,
     proto, Error,
 };
+use futures::{stream::iter, Stream};
 use std::{
     collections::HashMap,
+    pin::Pin,
     sync::{Arc, RwLock},
 };
+use tonic::async_trait;
 
 pub struct MemoryPathInfoService {
     db: Arc<RwLock<HashMap<[u8; 20], proto::PathInfo>>>,
@@ -28,6 +31,7 @@ impl MemoryPathInfoService {
     }
 }
 
+#[async_trait]
 impl PathInfoService for MemoryPathInfoService {
     /// Constructs a [MemoryPathInfoService] from the passed [url::Url]:
     /// - scheme has to be `memory://`
@@ -49,7 +53,7 @@ impl PathInfoService for MemoryPathInfoService {
         Ok(Self::new(blob_service, directory_service))
     }
 
-    fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> {
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> {
         let db = self.db.read().unwrap();
 
         match db.get(&digest) {
@@ -58,7 +62,7 @@ impl PathInfoService for MemoryPathInfoService {
         }
     }
 
-    fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> {
+    async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> {
         // Call validate on the received PathInfo message.
         match path_info.validate() {
             Err(e) => Err(Error::InvalidRequest(format!(
@@ -77,16 +81,17 @@ impl PathInfoService for MemoryPathInfoService {
         }
     }
 
-    fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> {
+    async fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> {
         calculate_size_and_sha256(
             root_node,
             self.blob_service.clone(),
             self.directory_service.clone(),
         )
+        .await
         .map_err(|e| Error::StorageError(e.to_string()))
     }
 
-    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_> {
+    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, Error>> + Send>> {
         let db = self.db.read().unwrap();
 
         // Copy all elements into a list.
@@ -95,7 +100,7 @@ impl PathInfoService for MemoryPathInfoService {
         // memory impl is only for testing purposes anyways.
         let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect();
 
-        Box::new(items.into_iter())
+        Box::pin(iter(items))
     }
 }
 
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
index 51d3e51115eb..a460c35a0289 100644
--- a/tvix/store/src/pathinfoservice/mod.rs
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -3,8 +3,12 @@ mod grpc;
 mod memory;
 mod sled;
 
+use std::pin::Pin;
 use std::sync::Arc;
 
+use futures::Stream;
+use tonic::async_trait;
+
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
 use crate::{proto, Error};
@@ -16,10 +20,12 @@ pub use self::sled::SledPathInfoService;
 
 /// The base trait all PathInfo services need to implement.
 /// This is a simple get and put of [proto::Directory], returning their digest.
+#[async_trait]
 pub trait PathInfoService: Send + Sync {
     /// Create a new instance by passing in a connection URL, as well
     /// as instances of a [PathInfoService] and [DirectoryService] (as the
     /// [PathInfoService] needs to talk to them).
+    /// TODO: check if we want to make this async, instead of lazily connecting
     fn from_url(
         url: &url::Url,
         blob_service: Arc<dyn BlobService>,
@@ -29,18 +35,23 @@ pub trait PathInfoService: Send + Sync {
         Self: Sized;
 
     /// Retrieve a PathInfo message by the output digest.
-    fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error>;
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error>;
 
     /// Store a PathInfo message. Implementations MUST call validate and reject
     /// invalid messages.
-    fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error>;
+    async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error>;
 
     /// Return the nar size and nar sha256 digest for a given root node.
     /// This can be used to calculate NAR-based output paths,
     /// and implementations are encouraged to cache it.
-    fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error>;
+    async fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error>;
 
     /// Iterate over all PathInfo objects in the store.
     /// Implementations can decide to disallow listing.
-    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_>;
+    ///
+    /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily,
+    /// and the box allows different underlying stream implementations to be returned since
+    /// Rust doesn't support this as a generic in traits yet. This is the same thing that
+    /// [async_trait] generates, but for streams instead of futures.
+    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, Error>> + Send>>;
 }
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
index 4f327626d19d..a9d0b029ee6b 100644
--- a/tvix/store/src/pathinfoservice/sled.rs
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -3,8 +3,10 @@ use crate::{
     blobservice::BlobService, directoryservice::DirectoryService, nar::calculate_size_and_sha256,
     proto, Error,
 };
+use futures::{stream::iter, Stream};
 use prost::Message;
-use std::{path::PathBuf, sync::Arc};
+use std::{path::PathBuf, pin::Pin, sync::Arc};
+use tonic::async_trait;
 use tracing::warn;
 
 /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
@@ -49,6 +51,7 @@ impl SledPathInfoService {
     }
 }
 
+#[async_trait]
 impl PathInfoService for SledPathInfoService {
     /// Constructs a [SledPathInfoService] from the passed [url::Url]:
     /// - scheme has to be `sled://`
@@ -84,7 +87,7 @@ impl PathInfoService for SledPathInfoService {
         }
     }
 
-    fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> {
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> {
         match self.db.get(digest) {
             Ok(None) => Ok(None),
             Ok(Some(data)) => match proto::PathInfo::decode(&*data) {
@@ -107,7 +110,7 @@ impl PathInfoService for SledPathInfoService {
         }
     }
 
-    fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> {
+    async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> {
         // Call validate on the received PathInfo message.
         match path_info.validate() {
             Err(e) => Err(Error::InvalidRequest(format!(
@@ -128,17 +131,18 @@ impl PathInfoService for SledPathInfoService {
         }
     }
 
-    fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> {
+    async fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> {
         calculate_size_and_sha256(
             root_node,
             self.blob_service.clone(),
             self.directory_service.clone(),
         )
+        .await
         .map_err(|e| Error::StorageError(e.to_string()))
     }
 
-    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send> {
-        Box::new(self.db.iter().values().map(|v| match v {
+    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, Error>> + Send>> {
+        Box::pin(iter(self.db.iter().values().map(|v| match v {
             Ok(data) => {
                 // we retrieved some bytes
                 match proto::PathInfo::decode(&*data) {
@@ -159,7 +163,7 @@ impl PathInfoService for SledPathInfoService {
                     e
                 )))
             }
-        }))
+        })))
     }
 }
 
diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
index ec53d7d76ce3..5e143a7bd7a8 100644
--- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
@@ -1,5 +1,6 @@
 use crate::proto;
 use crate::{directoryservice::DirectoryService, B3Digest};
+use futures::StreamExt;
 use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::{sync::mpsc::channel, task};
@@ -47,7 +48,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
                     task::spawn(async move {
                         if !req_inner.recursive {
                             let e: Result<proto::Directory, Status> =
-                                match directory_service.get(&digest) {
+                                match directory_service.get(&digest).await {
                                     Ok(Some(directory)) => Ok(directory),
                                     Ok(None) => Err(Status::not_found(format!(
                                         "directory {} not found",
@@ -61,9 +62,9 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
                             }
                         } else {
                             // If recursive was requested, traverse via get_recursive.
-                            let directories_it = directory_service.get_recursive(&digest);
+                            let mut directories_it = directory_service.get_recursive(&digest);
 
-                            for e in directories_it {
+                            while let Some(e) = directories_it.next().await {
                                 // map err in res from Error to Status
                                 let res = e.map_err(|e| Status::internal(e.to_string()));
                                 if tx.send(res).await.is_err() {
@@ -157,7 +158,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
 
             // check if the directory already exists in the database. We can skip
             // inserting if it's already there, as that'd be a no-op.
-            match self.directory_service.get(&dgst) {
+            match self.directory_service.get(&dgst).await {
                 Err(e) => {
                     warn!("error checking if directory already exists: {}", e);
                     return Err(e.into());
@@ -166,7 +167,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
                 Ok(Some(_)) => {}
                 // insert if it doesn't already exist
                 Ok(None) => {
-                    self.directory_service.put(directory)?;
+                    self.directory_service.put(directory).await?;
                 }
             }
         }
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index 33861d9ffa4e..14ceb34c3af7 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -1,6 +1,7 @@
 use crate::nar::RenderError;
 use crate::pathinfoservice::PathInfoService;
 use crate::proto;
+use futures::StreamExt;
 use std::sync::Arc;
 use tokio::task;
 use tokio_stream::wrappers::ReceiverStream;
@@ -36,7 +37,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
                     .to_vec()
                     .try_into()
                     .map_err(|_e| Status::invalid_argument("invalid output digest length"))?;
-                match self.path_info_service.get(digest) {
+                match self.path_info_service.get(digest).await {
                     Ok(None) => Err(Status::not_found("PathInfo not found")),
                     Ok(Some(path_info)) => Ok(Response::new(path_info)),
                     Err(e) => {
@@ -54,7 +55,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
 
         // Store the PathInfo in the client. Clients MUST validate the data
         // they receive, so we don't validate additionally here.
-        match self.path_info_service.put(path_info) {
+        match self.path_info_service.put(path_info).await {
             Ok(path_info_new) => Ok(Response::new(path_info_new)),
             Err(e) => {
                 warn!("failed to insert PathInfo: {}", e);
@@ -72,11 +73,10 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
             None => Err(Status::invalid_argument("no root node sent")),
             Some(root_node) => {
                 let path_info_service = self.path_info_service.clone();
-                let (nar_size, nar_sha256) =
-                    task::spawn_blocking(move || path_info_service.calculate_nar(&root_node))
-                        .await
-                        .unwrap()
-                        .expect("error during nar calculation"); // TODO: handle error
+                let (nar_size, nar_sha256) = path_info_service
+                    .calculate_nar(&root_node)
+                    .await
+                    .expect("error during nar calculation"); // TODO: handle error
 
                 Ok(Response::new(proto::CalculateNarResponse {
                     nar_size,
@@ -96,7 +96,8 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
         let path_info_service = self.path_info_service.clone();
 
         let _task = task::spawn(async move {
-            for e in path_info_service.list() {
+            let mut stream = path_info_service.list();
+            while let Some(e) = stream.next().await {
                 let res = e.map_err(|e| Status::internal(e.to_string()));
                 if tx.send(res).await.is_err() {
                     debug!("receiver dropped");
diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs
index 45b9c3440d89..3f7f7dff9db1 100644
--- a/tvix/store/src/tests/import.rs
+++ b/tvix/store/src/tests/import.rs
@@ -111,10 +111,12 @@ async fn complicated() {
     // ensure DIRECTORY_WITH_KEEP and DIRECTORY_COMPLICATED have been uploaded
     assert!(directory_service
         .get(&DIRECTORY_WITH_KEEP.digest())
+        .await
         .unwrap()
         .is_some());
     assert!(directory_service
         .get(&DIRECTORY_COMPLICATED.digest())
+        .await
         .unwrap()
         .is_some());
 
diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs
index 22dbd7bcba9e..e0163dc7bd93 100644
--- a/tvix/store/src/tests/nar_renderer.rs
+++ b/tvix/store/src/tests/nar_renderer.rs
@@ -8,12 +8,12 @@ use crate::tests::utils::*;
 use sha2::{Digest, Sha256};
 use std::io;
 
-#[test]
-fn single_symlink() {
-    let mut buf: Vec<u8> = vec![];
+#[tokio::test]
+async fn single_symlink() {
+    let buf: Vec<u8> = vec![];
 
-    write_nar(
-        &mut buf,
+    let buf = write_nar(
+        buf,
         &crate::proto::node::Node::Symlink(SymlinkNode {
             name: "doesntmatter".into(),
             target: "/nix/store/somewhereelse".into(),
@@ -22,6 +22,7 @@ fn single_symlink() {
         gen_blob_service(),
         gen_directory_service(),
     )
+    .await
     .expect("must succeed");
 
     assert_eq!(buf, NAR_CONTENTS_SYMLINK.to_vec());
@@ -30,24 +31,21 @@ fn single_symlink() {
 /// Make sure the NARRenderer fails if a referred blob doesn't exist.
 #[tokio::test]
 async fn single_file_missing_blob() {
-    let mut buf: Vec<u8> = vec![];
+    let buf: Vec<u8> = vec![];
 
-    let e = tokio::task::spawn_blocking(move || {
-        write_nar(
-            &mut buf,
-            &crate::proto::node::Node::File(FileNode {
-                name: "doesntmatter".into(),
-                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-                size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
-                executable: false,
-            }),
-            // the blobservice is empty intentionally, to provoke the error.
-            gen_blob_service(),
-            gen_directory_service(),
-        )
-    })
+    let e = write_nar(
+        buf,
+        &crate::proto::node::Node::File(FileNode {
+            name: "doesntmatter".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
+            executable: false,
+        }),
+        // the blobservice is empty intentionally, to provoke the error.
+        gen_blob_service(),
+        gen_directory_service(),
+    )
     .await
-    .unwrap()
     .expect_err("must fail");
 
     match e {
@@ -80,23 +78,20 @@ async fn single_file_wrong_blob_size() {
     let bs = blob_service.clone();
     // Test with a root FileNode of a too big size
     {
-        let mut buf: Vec<u8> = vec![];
-
-        let e = tokio::task::spawn_blocking(move || {
-            write_nar(
-                &mut buf,
-                &crate::proto::node::Node::File(FileNode {
-                    name: "doesntmatter".into(),
-                    digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-                    size: 42, // <- note the wrong size here!
-                    executable: false,
-                }),
-                bs,
-                gen_directory_service(),
-            )
-        })
+        let buf: Vec<u8> = vec![];
+
+        let e = write_nar(
+            buf,
+            &crate::proto::node::Node::File(FileNode {
+                name: "doesntmatter".into(),
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: 42, // <- note the wrong size here!
+                executable: false,
+            }),
+            bs,
+            gen_directory_service(),
+        )
         .await
-        .unwrap()
         .expect_err("must fail");
 
         match e {
@@ -110,24 +105,21 @@ async fn single_file_wrong_blob_size() {
     let bs = blob_service.clone();
     // Test with a root FileNode of a too small size
     {
-        let mut buf: Vec<u8> = vec![];
-
-        let e = tokio::task::spawn_blocking(move || {
-            write_nar(
-                &mut buf,
-                &crate::proto::node::Node::File(FileNode {
-                    name: "doesntmatter".into(),
-                    digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-                    size: 2, // <- note the wrong size here!
-                    executable: false,
-                }),
-                bs,
-                gen_directory_service(),
-            )
-            .expect_err("must fail")
-        })
+        let buf: Vec<u8> = vec![];
+
+        let e = write_nar(
+            buf,
+            &crate::proto::node::Node::File(FileNode {
+                name: "doesntmatter".into(),
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: 2, // <- note the wrong size here!
+                executable: false,
+            }),
+            bs,
+            gen_directory_service(),
+        )
         .await
-        .unwrap();
+        .expect_err("must fail");
 
         match e {
             crate::nar::RenderError::NARWriterError(e) => {
@@ -156,26 +148,21 @@ async fn single_file() {
         writer.close().await.unwrap()
     );
 
-    let mut buf: Vec<u8> = vec![];
+    let buf: Vec<u8> = vec![];
 
-    let buf = tokio::task::spawn_blocking(move || {
-        write_nar(
-            &mut buf,
-            &crate::proto::node::Node::File(FileNode {
-                name: "doesntmatter".into(),
-                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-                size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
-                executable: false,
-            }),
-            blob_service,
-            gen_directory_service(),
-        )
-        .expect("must succeed");
-
-        buf
-    })
+    let buf = write_nar(
+        buf,
+        &crate::proto::node::Node::File(FileNode {
+            name: "doesntmatter".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
+            executable: false,
+        }),
+        blob_service,
+        gen_directory_service(),
+    )
     .await
-    .unwrap();
+    .expect("must succeed");
 
     assert_eq!(buf, NAR_CONTENTS_HELLOWORLD.to_vec());
 }
@@ -196,51 +183,48 @@ async fn test_complicated() {
     .unwrap();
     assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().await.unwrap());
 
-    directory_service.put(DIRECTORY_WITH_KEEP.clone()).unwrap();
+    directory_service
+        .put(DIRECTORY_WITH_KEEP.clone())
+        .await
+        .unwrap();
     directory_service
         .put(DIRECTORY_COMPLICATED.clone())
+        .await
         .unwrap();
 
-    let mut buf: Vec<u8> = vec![];
+    let buf: Vec<u8> = vec![];
 
     let bs = blob_service.clone();
     let ds = directory_service.clone();
 
-    let buf = tokio::task::spawn_blocking(move || {
-        write_nar(
-            &mut buf,
-            &crate::proto::node::Node::Directory(DirectoryNode {
-                name: "doesntmatter".into(),
-                digest: DIRECTORY_COMPLICATED.digest().into(),
-                size: DIRECTORY_COMPLICATED.size(),
-            }),
-            bs,
-            ds,
-        )
-        .expect("must succeed");
-        buf
-    })
+    let buf = write_nar(
+        buf,
+        &crate::proto::node::Node::Directory(DirectoryNode {
+            name: "doesntmatter".into(),
+            digest: DIRECTORY_COMPLICATED.digest().into(),
+            size: DIRECTORY_COMPLICATED.size(),
+        }),
+        bs,
+        ds,
+    )
     .await
-    .unwrap();
+    .expect("must succeed");
 
     assert_eq!(buf, NAR_CONTENTS_COMPLICATED.to_vec());
 
     // ensure calculate_nar does return the correct sha256 digest and sum.
     let bs = blob_service.clone();
     let ds = directory_service.clone();
-    let (nar_size, nar_digest) = tokio::task::spawn_blocking(move || {
-        calculate_size_and_sha256(
-            &crate::proto::node::Node::Directory(DirectoryNode {
-                name: "doesntmatter".into(),
-                digest: DIRECTORY_COMPLICATED.digest().into(),
-                size: DIRECTORY_COMPLICATED.size(),
-            }),
-            bs,
-            ds,
-        )
-    })
+    let (nar_size, nar_digest) = calculate_size_and_sha256(
+        &crate::proto::node::Node::Directory(DirectoryNode {
+            name: "doesntmatter".into(),
+            digest: DIRECTORY_COMPLICATED.digest().into(),
+            size: DIRECTORY_COMPLICATED.size(),
+        }),
+        bs,
+        ds,
+    )
     .await
-    .unwrap()
     .expect("must succeed");
 
     assert_eq!(NAR_CONTENTS_COMPLICATED.len() as u64, nar_size);