diff options
author | Connor Brewster <cbrewster@hey.com> | 2023-09-19T16·46-0500 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2023-09-21T17·58+0000 |
commit | 37a348b4fae16b2b1c5ec12deaa085a049833d7f (patch) | |
tree | 7a1b1a7160036777b010cd81628960c1ca07486e /tvix/store/src/fs/mod.rs | |
parent | 7e737fde34260daa477794d63b0b3344b4a1d81b (diff) |
refactor(tvix/store): Asyncify PathInfoService and DirectoryService r/6623
We've decided to asyncify all of the services to reduce some of the pains going back and for between sync<->async. The end goal will be for all the tvix-store internals to be async and then expose a sync interface for things like tvix eval io. Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369 Autosubmit: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/fs/mod.rs')
-rw-r--r-- | tvix/store/src/fs/mod.rs | 43 |
1 files changed, 34 insertions, 9 deletions
diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs index 48e605406331..02d3bb3221ad 100644 --- a/tvix/store/src/fs/mod.rs +++ b/tvix/store/src/fs/mod.rs @@ -16,6 +16,7 @@ use crate::{ B3Digest, Error, }; use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}; +use futures::StreamExt; use nix_compat::store_path::StorePath; use parking_lot::RwLock; use std::{ @@ -26,7 +27,10 @@ use std::{ sync::{atomic::Ordering, Arc}, time::Duration, }; -use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; +use tokio::{ + io::{AsyncBufReadExt, AsyncSeekExt}, + sync::mpsc, +}; use tracing::{debug, info_span, warn}; use self::{ @@ -159,7 +163,11 @@ impl TvixStoreFs { ))) } else { // If we don't have it, look it up in PathInfoService. - match self.path_info_service.get(store_path.digest)? { + let path_info_service = self.path_info_service.clone(); + let task = self + .tokio_handle + .spawn(async move { path_info_service.get(store_path.digest).await }); + match self.tokio_handle.block_on(task).unwrap()? { // the pathinfo doesn't exist, so the file doesn't exist. None => Ok(None), Some(path_info) => { @@ -204,7 +212,12 @@ impl TvixStoreFs { /// This is both used to initially insert the root node of a store path, /// as well as when looking up an intermediate DirectoryNode. fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> { - match self.directory_service.get(directory_digest) { + let directory_service = self.directory_service.clone(); + let directory_digest_clone = directory_digest.clone(); + let task = self + .tokio_handle + .spawn(async move { directory_service.get(&directory_digest_clone).await }); + match self.tokio_handle.block_on(task).unwrap() { Err(e) => { warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory"); Err(e) @@ -369,12 +382,23 @@ impl FileSystem for TvixStoreFs { if !self.list_root { return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } else { - for (i, path_info) in self - .path_info_service - .list() - .skip(offset as usize) - .enumerate() - { + let path_info_service = self.path_info_service.clone(); + let (tx, mut rx) = mpsc::channel(16); + + // This task will run in the background immediately and will exit + // after the stream ends or if we no longer want any more entries. + self.tokio_handle.spawn(async move { + let mut stream = path_info_service.list().skip(offset as usize).enumerate(); + while let Some(path_info) = stream.next().await { + if tx.send(path_info).await.is_err() { + // If we get a send error, it means the sync code + // doesn't want any more entries. + break; + } + } + }); + + while let Some((i, path_info)) = rx.blocking_recv() { let path_info = match path_info { Err(e) => { warn!("failed to retrieve pathinfo: {}", e); @@ -421,6 +445,7 @@ impl FileSystem for TvixStoreFs { break; } } + return Ok(()); } } |