about summary refs log tree commit diff
path: root/tvix/store/src/directoryservice
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/directoryservice')
-rw-r--r--tvix/store/src/directoryservice/grpc.rs285
-rw-r--r--tvix/store/src/directoryservice/memory.rs20
-rw-r--r--tvix/store/src/directoryservice/mod.rs24
-rw-r--r--tvix/store/src/directoryservice/sled.rs20
-rw-r--r--tvix/store/src/directoryservice/traverse.rs115
-rw-r--r--tvix/store/src/directoryservice/utils.rs161
6 files changed, 301 insertions, 324 deletions
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs
index 73d88bb688a3..6257a8e81485 100644
--- a/tvix/store/src/directoryservice/grpc.rs
+++ b/tvix/store/src/directoryservice/grpc.rs
@@ -1,22 +1,24 @@
 use std::collections::HashSet;
+use std::pin::Pin;
 
 use super::{DirectoryPutter, DirectoryService};
 use crate::proto::{self, get_directory_request::ByWhat};
 use crate::{B3Digest, Error};
+use async_stream::try_stream;
+use futures::Stream;
 use tokio::net::UnixStream;
+use tokio::spawn;
 use tokio::sync::mpsc::UnboundedSender;
 use tokio::task::JoinHandle;
 use tokio_stream::wrappers::UnboundedReceiverStream;
+use tonic::async_trait;
+use tonic::Code;
 use tonic::{transport::Channel, Status};
-use tonic::{Code, Streaming};
 use tracing::{instrument, warn};
 
 /// Connects to a (remote) tvix-store DirectoryService over gRPC.
 #[derive(Clone)]
 pub struct GRPCDirectoryService {
-    /// A handle into the active tokio runtime. Necessary to spawn tasks.
-    tokio_handle: tokio::runtime::Handle,
-
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
     grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
@@ -28,13 +30,11 @@ impl GRPCDirectoryService {
     pub fn from_client(
         grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
     ) -> Self {
-        Self {
-            tokio_handle: tokio::runtime::Handle::current(),
-            grpc_client,
-        }
+        Self { grpc_client }
     }
 }
 
+#[async_trait]
 impl DirectoryService for GRPCDirectoryService {
     /// Constructs a [GRPCDirectoryService] from the passed [url::Url]:
     /// - scheme has to match `grpc+*://`.
@@ -89,11 +89,15 @@ impl DirectoryService for GRPCDirectoryService {
             }
         }
     }
-    fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> {
+
+    async fn get(
+        &self,
+        digest: &B3Digest,
+    ) -> Result<Option<crate::proto::Directory>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
         let digest_cpy = digest.clone();
-        let task = self.tokio_handle.spawn(async move {
+        let message = async move {
             let mut s = grpc_client
                 .get(proto::GetDirectoryRequest {
                     recursive: false,
@@ -104,10 +108,10 @@ impl DirectoryService for GRPCDirectoryService {
 
             // Retrieve the first message only, then close the stream (we set recursive to false)
             s.message().await
-        });
+        };
 
         let digest = digest.clone();
-        match self.tokio_handle.block_on(task)? {
+        match message.await {
             Ok(Some(directory)) => {
                 // Validate the retrieved Directory indeed has the
                 // digest we expect it to have, to detect corruptions.
@@ -134,14 +138,12 @@ impl DirectoryService for GRPCDirectoryService {
         }
     }
 
-    fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
+    async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
         let mut grpc_client = self.grpc_client.clone();
 
-        let task = self
-            .tokio_handle
-            .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await });
+        let resp = grpc_client.put(tokio_stream::iter(vec![directory])).await;
 
-        match self.tokio_handle.block_on(task)? {
+        match resp {
             Ok(put_directory_resp) => Ok(put_directory_resp
                 .into_inner()
                 .root_digest
@@ -157,32 +159,82 @@ impl DirectoryService for GRPCDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> {
+    ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
         let mut grpc_client = self.grpc_client.clone();
+        let root_directory_digest = root_directory_digest.clone();
 
-        // clone so we can move it
-        let root_directory_digest_cpy = root_directory_digest.clone();
-
-        let task: JoinHandle<Result<Streaming<proto::Directory>, Status>> =
-            self.tokio_handle.spawn(async move {
-                let s = grpc_client
-                    .get(proto::GetDirectoryRequest {
-                        recursive: true,
-                        by_what: Some(ByWhat::Digest(root_directory_digest_cpy.into())),
-                    })
-                    .await?
-                    .into_inner();
-
-                Ok(s)
-            });
+        let stream = try_stream! {
+            let mut stream = grpc_client
+                .get(proto::GetDirectoryRequest {
+                    recursive: true,
+                    by_what: Some(ByWhat::Digest(root_directory_digest.clone().into())),
+                })
+                .await
+                .map_err(|e| crate::Error::StorageError(e.to_string()))?
+                .into_inner();
 
-        let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
+            // The Directory digests we received so far
+            let mut received_directory_digests: HashSet<B3Digest> = HashSet::new();
+            // The Directory digests we're still expecting to get sent.
+            let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]);
+
+            loop {
+                match stream.message().await {
+                    Ok(Some(directory)) => {
+                        // validate the directory itself.
+                        if let Err(e) = directory.validate() {
+                            Err(crate::Error::StorageError(format!(
+                                "directory {} failed validation: {}",
+                                directory.digest(),
+                                e,
+                            )))?;
+                        }
+                        // validate we actually expected that directory, and move it from expected to received.
+                        let directory_digest = directory.digest();
+                        let was_expected = expected_directory_digests.remove(&directory_digest);
+                        if !was_expected {
+                            // FUTUREWORK: dumb clients might send the same stuff twice.
+                            // as a fallback, we might want to tolerate receiving
+                            // it if it's in received_directory_digests (as that
+                            // means it once was in expected_directory_digests)
+                            Err(crate::Error::StorageError(format!(
+                                "received unexpected directory {}",
+                                directory_digest
+                            )))?;
+                        }
+                        received_directory_digests.insert(directory_digest);
+
+                        // register all children in expected_directory_digests.
+                        for child_directory in &directory.directories {
+                            // We ran validate() above, so we know these digests must be correct.
+                            let child_directory_digest =
+                                child_directory.digest.clone().try_into().unwrap();
+
+                            expected_directory_digests
+                                .insert(child_directory_digest);
+                        }
+
+                        yield directory;
+                    },
+                    Ok(None) => {
+                        // If we were still expecting something, that's an error.
+                        if !expected_directory_digests.is_empty() {
+                            Err(crate::Error::StorageError(format!(
+                                "still expected {} directories, but got premature end of stream",
+                                expected_directory_digests.len(),
+                            )))?
+                        } else {
+                            return
+                        }
+                    },
+                    Err(e) => {
+                        Err(crate::Error::StorageError(e.to_string()))?;
+                    },
+                }
+            }
+        };
 
-        Box::new(StreamIterator::new(
-            self.tokio_handle.clone(),
-            root_directory_digest.clone(),
-            stream,
-        ))
+        Box::pin(stream)
     }
 
     #[instrument(skip_all)]
@@ -194,110 +246,21 @@ impl DirectoryService for GRPCDirectoryService {
 
         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
 
-        let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> =
-            self.tokio_handle.spawn(async move {
-                let s = grpc_client
-                    .put(UnboundedReceiverStream::new(rx))
-                    .await?
-                    .into_inner();
-
-                Ok(s)
-            });
-
-        Box::new(GRPCPutter::new(self.tokio_handle.clone(), tx, task))
-    }
-}
-
-pub struct StreamIterator {
-    /// A handle into the active tokio runtime. Necessary to run futures to completion.
-    tokio_handle: tokio::runtime::Handle,
-    // A stream of [proto::Directory]
-    stream: Streaming<proto::Directory>,
-    // The Directory digests we received so far
-    received_directory_digests: HashSet<B3Digest>,
-    // The Directory digests we're still expecting to get sent.
-    expected_directory_digests: HashSet<B3Digest>,
-}
-
-impl StreamIterator {
-    pub fn new(
-        tokio_handle: tokio::runtime::Handle,
-        root_digest: B3Digest,
-        stream: Streaming<proto::Directory>,
-    ) -> Self {
-        Self {
-            tokio_handle,
-            stream,
-            received_directory_digests: HashSet::new(),
-            expected_directory_digests: HashSet::from([root_digest]),
-        }
-    }
-}
-
-impl Iterator for StreamIterator {
-    type Item = Result<proto::Directory, crate::Error>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        match self.tokio_handle.block_on(self.stream.message()) {
-            Ok(ok) => match ok {
-                Some(directory) => {
-                    // validate the directory itself.
-                    if let Err(e) = directory.validate() {
-                        return Some(Err(crate::Error::StorageError(format!(
-                            "directory {} failed validation: {}",
-                            directory.digest(),
-                            e,
-                        ))));
-                    }
-                    // validate we actually expected that directory, and move it from expected to received.
-                    let directory_digest = directory.digest();
-                    let was_expected = self.expected_directory_digests.remove(&directory_digest);
-                    if !was_expected {
-                        // FUTUREWORK: dumb clients might send the same stuff twice.
-                        // as a fallback, we might want to tolerate receiving
-                        // it if it's in received_directory_digests (as that
-                        // means it once was in expected_directory_digests)
-                        return Some(Err(crate::Error::StorageError(format!(
-                            "received unexpected directory {}",
-                            directory_digest
-                        ))));
-                    }
-                    self.received_directory_digests.insert(directory_digest);
-
-                    // register all children in expected_directory_digests.
-                    for child_directory in &directory.directories {
-                        // We ran validate() above, so we know these digests must be correct.
-                        let child_directory_digest =
-                            child_directory.digest.clone().try_into().unwrap();
+        let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move {
+            let s = grpc_client
+                .put(UnboundedReceiverStream::new(rx))
+                .await?
+                .into_inner();
 
-                        self.expected_directory_digests
-                            .insert(child_directory_digest);
-                    }
+            Ok(s)
+        });
 
-                    Some(Ok(directory))
-                }
-                None => {
-                    // If we were still expecting something, that's an error.
-                    if !self.expected_directory_digests.is_empty() {
-                        Some(Err(crate::Error::StorageError(format!(
-                            "still expected {} directories, but got premature end of stream",
-                            self.expected_directory_digests.len(),
-                        ))))
-                    } else {
-                        None
-                    }
-                }
-            },
-            Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))),
-        }
+        Box::new(GRPCPutter::new(tx, task))
     }
 }
 
 /// Allows uploading multiple Directory messages in the same gRPC stream.
 pub struct GRPCPutter {
-    /// A handle into the active tokio runtime. Necessary to spawn tasks.
-    tokio_handle: tokio::runtime::Handle,
-
     /// Data about the current request - a handle to the task, and the tx part
     /// of the channel.
     /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request.
@@ -311,19 +274,18 @@ pub struct GRPCPutter {
 
 impl GRPCPutter {
     pub fn new(
-        tokio_handle: tokio::runtime::Handle,
         directory_sender: UnboundedSender<proto::Directory>,
         task: JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
     ) -> Self {
         Self {
-            tokio_handle,
             rq: Some((task, directory_sender)),
         }
     }
 }
 
+#[async_trait]
 impl DirectoryPutter for GRPCPutter {
-    fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
         match self.rq {
             // If we're not already closed, send the directory to directory_sender.
             Some((_, ref directory_sender)) => {
@@ -331,7 +293,7 @@ impl DirectoryPutter for GRPCPutter {
                     // If the channel has been prematurely closed, invoke close (so we can peek at the error code)
                     // That error code is much more helpful, because it
                     // contains the error message from the server.
-                    self.close()?;
+                    self.close().await?;
                 }
                 Ok(())
             }
@@ -343,7 +305,7 @@ impl DirectoryPutter for GRPCPutter {
     }
 
     /// Closes the stream for sending, and returns the value
-    fn close(&mut self) -> Result<B3Digest, crate::Error> {
+    async fn close(&mut self) -> Result<B3Digest, crate::Error> {
         // get self.rq, and replace it with None.
         // This ensures we can only close it once.
         match std::mem::take(&mut self.rq) {
@@ -352,9 +314,8 @@ impl DirectoryPutter for GRPCPutter {
                 // close directory_sender, so blocking on task will finish.
                 drop(directory_sender);
 
-                let root_digest = self
-                    .tokio_handle
-                    .block_on(task)?
+                let root_digest = task
+                    .await?
                     .map_err(|e| Error::StorageError(e.to_string()))?
                     .root_digest;
 
@@ -379,6 +340,7 @@ mod tests {
     use core::time;
     use std::thread;
 
+    use futures::StreamExt;
     use tempfile::TempDir;
     use tokio::net::{UnixListener, UnixStream};
     use tokio_stream::wrappers::UnixListenerStream;
@@ -446,7 +408,7 @@ mod tests {
             );
         }
 
-        let task = tester_runtime.spawn_blocking(move || {
+        tester_runtime.block_on(async move {
             // Create a channel, connecting to the uds at socket_path.
             // The URI is unused.
             let channel = Endpoint::try_from("http://[::]:50051")
@@ -465,6 +427,7 @@ mod tests {
                 None,
                 directory_service
                     .get(&DIRECTORY_A.digest())
+                    .await
                     .expect("must not fail")
             );
 
@@ -473,6 +436,7 @@ mod tests {
                 DIRECTORY_A.digest(),
                 directory_service
                     .put(DIRECTORY_A.clone())
+                    .await
                     .expect("must succeed")
             );
 
@@ -481,6 +445,7 @@ mod tests {
                 DIRECTORY_A.clone(),
                 directory_service
                     .get(&DIRECTORY_A.digest())
+                    .await
                     .expect("must succeed")
                     .expect("must be some")
             );
@@ -488,21 +453,22 @@ mod tests {
             // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
             directory_service
                 .put(DIRECTORY_B.clone())
+                .await
                 .expect_err("must fail");
 
             // Putting DIRECTORY_B in a put_multiple will succeed, but the close
             // will always fail.
             {
                 let mut handle = directory_service.put_multiple_start();
-                handle.put(DIRECTORY_B.clone()).expect("must succeed");
-                handle.close().expect_err("must fail");
+                handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
+                handle.close().await.expect_err("must fail");
             }
 
             // Uploading A and then B should succeed, and closing should return the digest of B.
             let mut handle = directory_service.put_multiple_start();
-            handle.put(DIRECTORY_A.clone()).expect("must succeed");
-            handle.put(DIRECTORY_B.clone()).expect("must succeed");
-            let digest = handle.close().expect("must succeed");
+            handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
+            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
+            let digest = handle.close().await.expect("must succeed");
             assert_eq!(DIRECTORY_B.digest(), digest);
 
             // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
@@ -511,6 +477,7 @@ mod tests {
                 DIRECTORY_B.clone(),
                 directories_it
                     .next()
+                    .await
                     .expect("must be some")
                     .expect("must succeed")
             );
@@ -518,6 +485,7 @@ mod tests {
                 DIRECTORY_A.clone(),
                 directories_it
                     .next()
+                    .await
                     .expect("must be some")
                     .expect("must succeed")
             );
@@ -529,15 +497,15 @@ mod tests {
             {
                 let mut handle = directory_service.put_multiple_start();
                 // sending out B will always be fine
-                handle.put(DIRECTORY_B.clone()).expect("must succeed");
+                handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
 
                 // whether we will be able to put A as well depends on whether we
                 // already received the error about B.
-                if handle.put(DIRECTORY_A.clone()).is_ok() {
+                if handle.put(DIRECTORY_A.clone()).await.is_ok() {
                     // If we didn't, and this was Ok(_), …
                     // a subsequent close MUST fail (because it waits for the
                     // server)
-                    handle.close().expect_err("must fail");
+                    handle.close().await.expect_err("must fail");
                 }
             }
 
@@ -547,7 +515,7 @@ mod tests {
             // and then assert that uploading anything else via the handle will fail.
             {
                 let mut handle = directory_service.put_multiple_start();
-                handle.put(DIRECTORY_B.clone()).expect("must succeed");
+                handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
 
                 let mut is_closed = false;
                 for _try in 1..1000 {
@@ -555,7 +523,7 @@ mod tests {
                         is_closed = true;
                         break;
                     }
-                    std::thread::sleep(time::Duration::from_millis(10))
+                    tokio::time::sleep(time::Duration::from_millis(10)).await;
                 }
 
                 assert!(
@@ -563,12 +531,13 @@ mod tests {
                     "expected channel to eventually close, but never happened"
                 );
 
-                handle.put(DIRECTORY_A.clone()).expect_err("must fail");
+                handle
+                    .put(DIRECTORY_A.clone())
+                    .await
+                    .expect_err("must fail");
             }
         });
 
-        tester_runtime.block_on(task)?;
-
         Ok(())
     }
 }
diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs
index 634dbf9922d0..ac67c999d01b 100644
--- a/tvix/store/src/directoryservice/memory.rs
+++ b/tvix/store/src/directoryservice/memory.rs
@@ -1,16 +1,20 @@
 use crate::{proto, B3Digest, Error};
+use futures::Stream;
 use std::collections::HashMap;
+use std::pin::Pin;
 use std::sync::{Arc, RwLock};
+use tonic::async_trait;
 use tracing::{instrument, warn};
 
-use super::utils::SimplePutter;
-use super::{DirectoryPutter, DirectoryService, DirectoryTraverser};
+use super::utils::{traverse_directory, SimplePutter};
+use super::{DirectoryPutter, DirectoryService};
 
 #[derive(Clone, Default)]
 pub struct MemoryDirectoryService {
     db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>,
 }
 
+#[async_trait]
 impl DirectoryService for MemoryDirectoryService {
     /// Constructs a [MemoryDirectoryService] from the passed [url::Url]:
     /// - scheme has to be `memory://`
@@ -27,8 +31,9 @@ impl DirectoryService for MemoryDirectoryService {
 
         Ok(Self::default())
     }
+
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
         let db = self.db.read()?;
 
         match db.get(digest) {
@@ -62,7 +67,7 @@ impl DirectoryService for MemoryDirectoryService {
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
         let digest = directory.digest();
 
         // validate the directory itself.
@@ -84,11 +89,8 @@ impl DirectoryService for MemoryDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> {
-        Box::new(DirectoryTraverser::with(
-            self.clone(),
-            root_directory_digest,
-        ))
+    ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
+        traverse_directory(self.clone(), root_directory_digest)
     }
 
     #[instrument(skip_all)]
diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs
index fea4191400f3..09210dfed8e8 100644
--- a/tvix/store/src/directoryservice/mod.rs
+++ b/tvix/store/src/directoryservice/mod.rs
@@ -1,4 +1,7 @@
 use crate::{proto, B3Digest, Error};
+use futures::Stream;
+use std::pin::Pin;
+use tonic::async_trait;
 
 mod from_addr;
 mod grpc;
@@ -12,32 +15,38 @@ pub use self::grpc::GRPCDirectoryService;
 pub use self::memory::MemoryDirectoryService;
 pub use self::sled::SledDirectoryService;
 pub use self::traverse::traverse_to;
-pub use self::utils::DirectoryTraverser;
 
 /// The base trait all Directory services need to implement.
 /// This is a simple get and put of [crate::proto::Directory], returning their
 /// digest.
+#[async_trait]
 pub trait DirectoryService: Send + Sync {
     /// Create a new instance by passing in a connection URL.
+    /// TODO: check if we want to make this async, instead of lazily connecting
     fn from_url(url: &url::Url) -> Result<Self, Error>
     where
         Self: Sized;
 
     /// Get looks up a single Directory message by its digest.
     /// In case the directory is not found, Ok(None) is returned.
-    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
+    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
     /// Get uploads a single Directory message, and returns the calculated
     /// digest, or an error.
-    fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
+    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
 
     /// Looks up a closure of [proto::Directory].
-    /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`,
+    /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`,
     /// and we'd be able to add a default implementation for it here, but
     /// we can't have that yet.
+    ///
+    /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily,
+    /// and the box allows different underlying stream implementations to be returned since
+    /// Rust doesn't support this as a generic in traits yet. This is the same thing that
+    /// [async_trait] generates, but for streams instead of futures.
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send>;
+    ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>>;
 
     /// Allows persisting a closure of [proto::Directory], which is a graph of
     /// connected Directory messages.
@@ -49,16 +58,17 @@ pub trait DirectoryService: Send + Sync {
 /// The consumer can periodically call [DirectoryPutter::put], starting from the
 /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to
 /// retrieve the root digest (or an error).
+#[async_trait]
 pub trait DirectoryPutter: Send {
     /// Put a individual [proto::Directory] into the store.
     /// Error semantics and behaviour is up to the specific implementation of
     /// this trait.
     /// Due to bursting, the returned error might refer to an object previously
     /// sent via `put`.
-    fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
 
     /// Close the stream, and wait for any errors.
-    fn close(&mut self) -> Result<B3Digest, Error>;
+    async fn close(&mut self) -> Result<B3Digest, Error>;
 
     /// Return whether the stream is closed or not.
     /// Used from some [DirectoryService] implementations only.
diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs
index e741434eabb5..0dc5496803cb 100644
--- a/tvix/store/src/directoryservice/sled.rs
+++ b/tvix/store/src/directoryservice/sled.rs
@@ -1,12 +1,15 @@
 use crate::directoryservice::DirectoryPutter;
 use crate::proto::Directory;
 use crate::{proto, B3Digest, Error};
+use futures::Stream;
 use prost::Message;
 use std::path::PathBuf;
+use std::pin::Pin;
+use tonic::async_trait;
 use tracing::{instrument, warn};
 
-use super::utils::SimplePutter;
-use super::{DirectoryService, DirectoryTraverser};
+use super::utils::{traverse_directory, SimplePutter};
+use super::DirectoryService;
 
 #[derive(Clone)]
 pub struct SledDirectoryService {
@@ -29,6 +32,7 @@ impl SledDirectoryService {
     }
 }
 
+#[async_trait]
 impl DirectoryService for SledDirectoryService {
     /// Constructs a [SledDirectoryService] from the passed [url::Url]:
     /// - scheme has to be `sled://`
@@ -59,7 +63,7 @@ impl DirectoryService for SledDirectoryService {
     }
 
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
         match self.db.get(digest.to_vec()) {
             // The directory was not found, return
             Ok(None) => Ok(None),
@@ -99,7 +103,7 @@ impl DirectoryService for SledDirectoryService {
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
         let digest = directory.digest();
 
         // validate the directory itself.
@@ -121,12 +125,8 @@ impl DirectoryService for SledDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Box<(dyn Iterator<Item = Result<proto::Directory, Error>> + std::marker::Send + 'static)>
-    {
-        Box::new(DirectoryTraverser::with(
-            self.clone(),
-            root_directory_digest,
-        ))
+    ) -> Pin<Box<(dyn Stream<Item = Result<proto::Directory, Error>> + Send + 'static)>> {
+        traverse_directory(self.clone(), root_directory_digest)
     }
 
     #[instrument(skip_all)]
diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs
index 9029543267c1..0489c4458163 100644
--- a/tvix/store/src/directoryservice/traverse.rs
+++ b/tvix/store/src/directoryservice/traverse.rs
@@ -1,19 +1,18 @@
 use super::DirectoryService;
-use crate::{proto::NamedNode, Error};
+use crate::{proto::NamedNode, B3Digest, Error};
 use std::{os::unix::ffi::OsStrExt, sync::Arc};
 use tracing::{instrument, warn};
 
 /// This traverses from a (root) node to the given (sub)path, returning the Node
 /// at that path, or none, if there's nothing at that path.
-/// TODO: Do we want to rewrite this in a non-recursing fashion, and use
-/// [DirectoryService.get_recursive] to do less lookups?
+/// TODO: Do we want to use [DirectoryService.get_recursive] to do less lookups?
 /// Or do we consider this to be a non-issue due to store composition and local caching?
 /// TODO: the name of this function (and mod) is a bit bad, because it doesn't
 /// clearly distinguish it from the BFS traversers.
 #[instrument(skip(directory_service))]
-pub fn traverse_to(
+pub async fn traverse_to(
     directory_service: Arc<dyn DirectoryService>,
-    node: crate::proto::node::Node,
+    root_node: crate::proto::node::Node,
     path: &std::path::Path,
 ) -> Result<Option<crate::proto::node::Node>, Error> {
     // strip a possible `/` prefix from the path.
@@ -25,52 +24,56 @@ pub fn traverse_to(
         }
     };
 
+    let mut cur_node = root_node;
     let mut it = path.components();
 
-    match it.next() {
-        None => {
-            // the (remaining) path is empty, return the node we've been called with.
-            Ok(Some(node))
-        }
-        Some(first_component) => {
-            match node {
-                crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => {
-                    // There's still some path left, but the current node is no directory.
-                    // This means the path doesn't exist, as we can't reach it.
-                    Ok(None)
-                }
-                crate::proto::node::Node::Directory(directory_node) => {
-                    let digest = directory_node
-                        .digest
-                        .try_into()
-                        .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
-
-                    // fetch the linked node from the directory_service
-                    match directory_service.get(&digest)? {
-                        // If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
-                        None => {
-                            warn!("directory {} does not exist", digest);
-
-                            Err(Error::StorageError(format!(
-                                "directory {} does not exist",
-                                digest
-                            )))
-                        }
-                        Some(directory) => {
-                            // look for first_component in the [Directory].
-                            // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we
-                            // could stop as soon as e.name is larger than the search string.
-                            let child_node = directory
-                                .nodes()
-                                .find(|n| n.get_name() == first_component.as_os_str().as_bytes());
-
-                            match child_node {
-                                // child node not found means there's no such element inside the directory.
-                                None => Ok(None),
-                                // child node found, recurse with it and the rest of the path.
-                                Some(child_node) => {
-                                    let rest_path: std::path::PathBuf = it.collect();
-                                    traverse_to(directory_service, child_node, &rest_path)
+    loop {
+        match it.next() {
+            None => {
+                // the (remaining) path is empty, return the node we're current at.
+                return Ok(Some(cur_node));
+            }
+            Some(first_component) => {
+                match cur_node {
+                    crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => {
+                        // There's still some path left, but the current node is no directory.
+                        // This means the path doesn't exist, as we can't reach it.
+                        return Ok(None);
+                    }
+                    crate::proto::node::Node::Directory(directory_node) => {
+                        let digest: B3Digest = directory_node.digest.try_into().map_err(|_e| {
+                            Error::StorageError("invalid digest length".to_string())
+                        })?;
+
+                        // fetch the linked node from the directory_service
+                        match directory_service.get(&digest).await? {
+                            // If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
+                            None => {
+                                warn!("directory {} does not exist", digest);
+
+                                return Err(Error::StorageError(format!(
+                                    "directory {} does not exist",
+                                    digest
+                                )));
+                            }
+                            Some(directory) => {
+                                // look for first_component in the [Directory].
+                                // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we
+                                // could stop as soon as e.name is larger than the search string.
+                                let child_node = directory.nodes().find(|n| {
+                                    n.get_name() == first_component.as_os_str().as_bytes()
+                                });
+
+                                match child_node {
+                                    // child node not found means there's no such element inside the directory.
+                                    None => {
+                                        return Ok(None);
+                                    }
+                                    // child node found, return to top-of loop to find the next
+                                    // node in the path.
+                                    Some(child_node) => {
+                                        cur_node = child_node;
+                                    }
                                 }
                             }
                         }
@@ -92,16 +95,18 @@ mod tests {
 
     use super::traverse_to;
 
-    #[test]
-    fn test_traverse_to() {
+    #[tokio::test]
+    async fn test_traverse_to() {
         let directory_service = gen_directory_service();
 
         let mut handle = directory_service.put_multiple_start();
         handle
             .put(DIRECTORY_WITH_KEEP.clone())
+            .await
             .expect("must succeed");
         handle
             .put(DIRECTORY_COMPLICATED.clone())
+            .await
             .expect("must succeed");
 
         // construct the node for DIRECTORY_COMPLICATED
@@ -128,6 +133,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from(""),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(Some(node_directory_complicated.clone()), resp);
@@ -140,6 +146,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("keep"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(Some(node_directory_with_keep), resp);
@@ -152,6 +159,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("keep/.keep"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(Some(node_file_keep.clone()), resp);
@@ -164,6 +172,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("/keep/.keep"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(Some(node_file_keep), resp);
@@ -176,6 +185,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("void"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(None, resp);
@@ -188,6 +198,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("//v/oid"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(None, resp);
@@ -201,6 +212,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("keep/.keep/foo"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(None, resp);
@@ -213,6 +225,7 @@ mod tests {
                 node_directory_complicated.clone(),
                 &PathBuf::from("/"),
             )
+            .await
             .expect("must succeed");
 
             assert_eq!(Some(node_directory_complicated), resp);
diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs
index 95f02f1f9ce8..4c5e7cfde37c 100644
--- a/tvix/store/src/directoryservice/utils.rs
+++ b/tvix/store/src/directoryservice/utils.rs
@@ -3,103 +3,85 @@ use super::DirectoryService;
 use crate::proto;
 use crate::B3Digest;
 use crate::Error;
+use async_stream::stream;
+use futures::Stream;
 use std::collections::{HashSet, VecDeque};
-use tracing::{debug_span, instrument, warn};
+use std::pin::Pin;
+use tonic::async_trait;
+use tracing::warn;
 
 /// Traverses a [proto::Directory] from the root to the children.
 ///
 /// This is mostly BFS, but directories are only returned once.
-pub struct DirectoryTraverser<DS: DirectoryService> {
+pub fn traverse_directory<DS: DirectoryService + 'static>(
     directory_service: DS,
-    /// The list of all directories that still need to be traversed. The next
-    /// element is picked from the front, new elements are enqueued at the
-    /// back.
-    worklist_directory_digests: VecDeque<B3Digest>,
-    /// The list of directory digests already sent to the consumer.
-    /// We omit sending the same directories multiple times.
-    sent_directory_digests: HashSet<B3Digest>,
-}
-
-impl<DS: DirectoryService> DirectoryTraverser<DS> {
-    pub fn with(directory_service: DS, root_directory_digest: &B3Digest) -> Self {
-        Self {
-            directory_service,
-            worklist_directory_digests: VecDeque::from([root_directory_digest.clone()]),
-            sent_directory_digests: HashSet::new(),
-        }
-    }
-
-    // enqueue all child directory digests to the work queue, as
-    // long as they're not part of the worklist or already sent.
-    // This panics if the digest looks invalid, it's supposed to be checked first.
-    fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
-        for child_directory_node in &directory.directories {
-            // TODO: propagate error
-            let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap();
-
-            if self.worklist_directory_digests.contains(&child_digest)
-                || self.sent_directory_digests.contains(&child_digest)
-            {
-                continue;
-            }
-            self.worklist_directory_digests.push_back(child_digest);
-        }
-    }
-}
-
-impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
-    type Item = Result<proto::Directory, Error>;
-
-    #[instrument(skip_all)]
-    fn next(&mut self) -> Option<Self::Item> {
-        // fetch the next directory digest from the top of the work queue.
-        match self.worklist_directory_digests.pop_front() {
-            None => None,
-            Some(current_directory_digest) => {
-                let span = debug_span!("directory.digest", "{}", current_directory_digest);
-                let _ = span.enter();
-
-                // look up the directory itself.
-                let current_directory = match self.directory_service.get(&current_directory_digest)
-                {
-                    // if we got it
-                    Ok(Some(current_directory)) => {
-                        // validate, we don't want to send invalid directories.
-                        if let Err(e) = current_directory.validate() {
-                            warn!("directory failed validation: {}", e.to_string());
-                            return Some(Err(Error::StorageError(format!(
-                                "invalid directory: {}",
-                                current_directory_digest
-                            ))));
-                        }
-                        current_directory
-                    }
-                    // if it's not there, we have an inconsistent store!
-                    Ok(None) => {
-                        warn!("directory {} does not exist", current_directory_digest);
-                        return Some(Err(Error::StorageError(format!(
-                            "directory {} does not exist",
+    root_directory_digest: &B3Digest,
+) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
+    // The list of all directories that still need to be traversed. The next
+    // element is picked from the front, new elements are enqueued at the
+    // back.
+    let mut worklist_directory_digests: VecDeque<B3Digest> =
+        VecDeque::from([root_directory_digest.clone()]);
+    // The list of directory digests already sent to the consumer.
+    // We omit sending the same directories multiple times.
+    let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new();
+
+    let stream = stream! {
+        while let Some(current_directory_digest) = worklist_directory_digests.pop_front() {
+            match directory_service.get(&current_directory_digest).await {
+                // if it's not there, we have an inconsistent store!
+                Ok(None) => {
+                    warn!("directory {} does not exist", current_directory_digest);
+                    yield Err(Error::StorageError(format!(
+                        "directory {} does not exist",
+                        current_directory_digest
+                    )));
+                }
+                Err(e) => {
+                    warn!("failed to look up directory");
+                    yield Err(Error::StorageError(format!(
+                        "unable to look up directory {}: {}",
+                        current_directory_digest, e
+                    )));
+                }
+
+                // if we got it
+                Ok(Some(current_directory)) => {
+                    // validate, we don't want to send invalid directories.
+                    if let Err(e) = current_directory.validate() {
+                        warn!("directory failed validation: {}", e.to_string());
+                        yield Err(Error::StorageError(format!(
+                            "invalid directory: {}",
                             current_directory_digest
-                        ))));
-                    }
-                    Err(e) => {
-                        warn!("failed to look up directory");
-                        return Some(Err(Error::StorageError(format!(
-                            "unable to look up directory {}: {}",
-                            current_directory_digest, e
-                        ))));
+                        )));
                     }
-                };
 
-                // All DirectoryServices MUST validate directory nodes, before returning them out, so we
-                // can be sure [enqueue_child_directories] doesn't panic.
+                    // We're about to send this directory, so let's avoid sending it again if a
+                    // descendant has it.
+                    sent_directory_digests.insert(current_directory_digest);
+
+                    // enqueue all child directory digests to the work queue, as
+                    // long as they're not part of the worklist or already sent.
+                    // This panics if the digest looks invalid, it's supposed to be checked first.
+                    for child_directory_node in &current_directory.directories {
+                        // TODO: propagate error
+                        let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap();
+
+                        if worklist_directory_digests.contains(&child_digest)
+                            || sent_directory_digests.contains(&child_digest)
+                        {
+                            continue;
+                        }
+                        worklist_directory_digests.push_back(child_digest);
+                    }
 
-                // enqueue child directories
-                self.enqueue_child_directories(&current_directory);
-                Some(Ok(current_directory))
-            }
+                    yield Ok(current_directory);
+                }
+            };
         }
-    }
+    };
+
+    Box::pin(stream)
 }
 
 /// This is a simple implementation of a Directory uploader.
@@ -120,13 +102,14 @@ impl<DS: DirectoryService> SimplePutter<DS> {
     }
 }
 
+#[async_trait]
 impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
-    fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
         if self.closed {
             return Err(Error::StorageError("already closed".to_string()));
         }
 
-        let digest = self.directory_service.put(directory)?;
+        let digest = self.directory_service.put(directory).await?;
 
         // track the last directory digest
         self.last_directory_digest = Some(digest);
@@ -135,7 +118,7 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
     }
 
     /// We need to be mutable here, as that's the signature of the trait.
-    fn close(&mut self) -> Result<B3Digest, Error> {
+    async fn close(&mut self) -> Result<B3Digest, Error> {
         if self.closed {
             return Err(Error::StorageError("already closed".to_string()));
         }