diff options
Diffstat (limited to 'tvix/store/src/fs/mod.rs')
-rw-r--r-- | tvix/store/src/fs/mod.rs | 43 |
1 files changed, 34 insertions, 9 deletions
diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs index 48e605406331..02d3bb3221ad 100644 --- a/tvix/store/src/fs/mod.rs +++ b/tvix/store/src/fs/mod.rs @@ -16,6 +16,7 @@ use crate::{ B3Digest, Error, }; use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}; +use futures::StreamExt; use nix_compat::store_path::StorePath; use parking_lot::RwLock; use std::{ @@ -26,7 +27,10 @@ use std::{ sync::{atomic::Ordering, Arc}, time::Duration, }; -use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; +use tokio::{ + io::{AsyncBufReadExt, AsyncSeekExt}, + sync::mpsc, +}; use tracing::{debug, info_span, warn}; use self::{ @@ -159,7 +163,11 @@ impl TvixStoreFs { ))) } else { // If we don't have it, look it up in PathInfoService. - match self.path_info_service.get(store_path.digest)? { + let path_info_service = self.path_info_service.clone(); + let task = self + .tokio_handle + .spawn(async move { path_info_service.get(store_path.digest).await }); + match self.tokio_handle.block_on(task).unwrap()? { // the pathinfo doesn't exist, so the file doesn't exist. None => Ok(None), Some(path_info) => { @@ -204,7 +212,12 @@ impl TvixStoreFs { /// This is both used to initially insert the root node of a store path, /// as well as when looking up an intermediate DirectoryNode. fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> { - match self.directory_service.get(directory_digest) { + let directory_service = self.directory_service.clone(); + let directory_digest_clone = directory_digest.clone(); + let task = self + .tokio_handle + .spawn(async move { directory_service.get(&directory_digest_clone).await }); + match self.tokio_handle.block_on(task).unwrap() { Err(e) => { warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory"); Err(e) @@ -369,12 +382,23 @@ impl FileSystem for TvixStoreFs { if !self.list_root { return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } else { - for (i, path_info) in self - .path_info_service - .list() - .skip(offset as usize) - .enumerate() - { + let path_info_service = self.path_info_service.clone(); + let (tx, mut rx) = mpsc::channel(16); + + // This task will run in the background immediately and will exit + // after the stream ends or if we no longer want any more entries. + self.tokio_handle.spawn(async move { + let mut stream = path_info_service.list().skip(offset as usize).enumerate(); + while let Some(path_info) = stream.next().await { + if tx.send(path_info).await.is_err() { + // If we get a send error, it means the sync code + // doesn't want any more entries. + break; + } + } + }); + + while let Some((i, path_info)) = rx.blocking_recv() { let path_info = match path_info { Err(e) => { warn!("failed to retrieve pathinfo: {}", e); @@ -421,6 +445,7 @@ impl FileSystem for TvixStoreFs { break; } } + return Ok(()); } } |