about summary refs log tree commit diff
path: root/tvix/store/src/fs/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/fs/mod.rs')
-rw-r--r--tvix/store/src/fs/mod.rs43
1 files changed, 34 insertions, 9 deletions
diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs
index 48e605406331..02d3bb3221ad 100644
--- a/tvix/store/src/fs/mod.rs
+++ b/tvix/store/src/fs/mod.rs
@@ -16,6 +16,7 @@ use crate::{
     B3Digest, Error,
 };
 use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID};
+use futures::StreamExt;
 use nix_compat::store_path::StorePath;
 use parking_lot::RwLock;
 use std::{
@@ -26,7 +27,10 @@ use std::{
     sync::{atomic::Ordering, Arc},
     time::Duration,
 };
-use tokio::io::{AsyncBufReadExt, AsyncSeekExt};
+use tokio::{
+    io::{AsyncBufReadExt, AsyncSeekExt},
+    sync::mpsc,
+};
 use tracing::{debug, info_span, warn};
 
 use self::{
@@ -159,7 +163,11 @@ impl TvixStoreFs {
             )))
         } else {
             // If we don't have it, look it up in PathInfoService.
-            match self.path_info_service.get(store_path.digest)? {
+            let path_info_service = self.path_info_service.clone();
+            let task = self
+                .tokio_handle
+                .spawn(async move { path_info_service.get(store_path.digest).await });
+            match self.tokio_handle.block_on(task).unwrap()? {
                 // the pathinfo doesn't exist, so the file doesn't exist.
                 None => Ok(None),
                 Some(path_info) => {
@@ -204,7 +212,12 @@ impl TvixStoreFs {
     /// This is both used to initially insert the root node of a store path,
     /// as well as when looking up an intermediate DirectoryNode.
     fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> {
-        match self.directory_service.get(directory_digest) {
+        let directory_service = self.directory_service.clone();
+        let directory_digest_clone = directory_digest.clone();
+        let task = self
+            .tokio_handle
+            .spawn(async move { directory_service.get(&directory_digest_clone).await });
+        match self.tokio_handle.block_on(task).unwrap() {
             Err(e) => {
                 warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory");
                 Err(e)
@@ -369,12 +382,23 @@ impl FileSystem for TvixStoreFs {
             if !self.list_root {
                 return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
             } else {
-                for (i, path_info) in self
-                    .path_info_service
-                    .list()
-                    .skip(offset as usize)
-                    .enumerate()
-                {
+                let path_info_service = self.path_info_service.clone();
+                let (tx, mut rx) = mpsc::channel(16);
+
+                // This task will run in the background immediately and will exit
+                // after the stream ends or if we no longer want any more entries.
+                self.tokio_handle.spawn(async move {
+                    let mut stream = path_info_service.list().skip(offset as usize).enumerate();
+                    while let Some(path_info) = stream.next().await {
+                        if tx.send(path_info).await.is_err() {
+                            // If we get a send error, it means the sync code
+                            // doesn't want any more entries.
+                            break;
+                        }
+                    }
+                });
+
+                while let Some((i, path_info)) = rx.blocking_recv() {
                     let path_info = match path_info {
                         Err(e) => {
                             warn!("failed to retrieve pathinfo: {}", e);
@@ -421,6 +445,7 @@ impl FileSystem for TvixStoreFs {
                         break;
                     }
                 }
+
                 return Ok(());
             }
         }