about summary refs log tree commit diff
path: root/tvix/store/src/import.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-09-13T12·20+0200
committerflokli <flokli@flokli.de>2023-09-18T10·33+0000
commitda6cbb4a459d02111c44a67d3d0dd7e654abff23 (patch)
tree5efce82d3d9aea94cf6d3712a3fdbb7d168e4552 /tvix/store/src/import.rs
parent3de96017640b6dc25f1544a1bafd4b370bb1cea0 (diff)
refactor(tvix/store/blobsvc): make BlobStore async r/6606
We previously kept the trait of a BlobService sync.

This however had some annoying consequences:

 - It became more and more complicated to track when we're in a context
   with an async runtime in the context or not, producing bugs like
   https://b.tvl.fyi/issues/304
 - The sync trait shielded away async clients from async worloads,
   requiring manual block_on code inside the gRPC client code, and
   spawn_blocking calls in consumers of the trait, even if they were
   async (like the gRPC server)
 - We had to write our own custom glue code (SyncReadIntoAsyncRead)
   to convert a sync io::Read into a tokio::io::AsyncRead, which already
   existed in tokio internally, but upstream ia hesitant to expose.

This now makes the BlobService trait async (via the async_trait macro,
like we already do in various gRPC parts), and replaces the sync readers
and writers with their async counterparts.

Tests interacting with a BlobService now need to have an async runtime
available, the easiest way for this is to mark the test functions
with the tokio::test macro, allowing us to directly .await in the test
function.

In places where we don't have an async runtime available from context
(like tvix-cli), we can pass one down explicitly.

Now that we don't provide a sync interface anymore, the (sync) FUSE
library now holds a pointer to a tokio runtime handle, and needs to at
least have 2 threads available when talking to a blob service (which is
why some of the tests now use the multi_thread flavor).

The FUSE tests got a bit more verbose, as we couldn't use the
setup_and_mount function accepting a callback anymore. We can hopefully
move some of the test fixture setup to rstest in the future to make this
less repetitive.

Co-Authored-By: Connor Brewster <cbrewster@hey.com>
Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/store/src/import.rs')
-rw-r--r--tvix/store/src/import.rs18
1 files changed, 9 insertions, 9 deletions
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index cd3dc01cfe0a..6764eaddb457 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -6,8 +6,6 @@ use std::sync::Arc;
 use std::{
     collections::HashMap,
     fmt::Debug,
-    fs::File,
-    io,
     os::unix::prelude::PermissionsExt,
     path::{Path, PathBuf},
 };
@@ -57,7 +55,7 @@ impl From<super::Error> for Error {
 //
 // It assumes the caller adds returned nodes to the directories it assembles.
 #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
-fn process_entry(
+async fn process_entry(
     blob_service: Arc<dyn BlobService>,
     directory_putter: &mut Box<dyn DirectoryPutter>,
     entry: &walkdir::DirEntry,
@@ -102,16 +100,17 @@ fn process_entry(
             .metadata()
             .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
 
-        let mut file = File::open(entry.path())
+        let mut file = tokio::fs::File::open(entry.path())
+            .await
             .map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?;
 
-        let mut writer = blob_service.open_write();
+        let mut writer = blob_service.open_write().await;
 
-        if let Err(e) = io::copy(&mut file, &mut writer) {
+        if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
             return Err(Error::UnableToRead(entry.path().to_path_buf(), e));
         };
 
-        let digest = writer.close()?;
+        let digest = writer.close().await?;
 
         return Ok(proto::node::Node::File(proto::FileNode {
             name: entry.file_name().as_bytes().to_vec().into(),
@@ -137,7 +136,7 @@ fn process_entry(
 /// caller to possibly register it somewhere (and potentially rename it based on
 /// some naming scheme.
 #[instrument(skip(blob_service, directory_service), fields(path=?p))]
-pub fn ingest_path<P: AsRef<Path> + Debug>(
+pub async fn ingest_path<P: AsRef<Path> + Debug>(
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
     p: P,
@@ -175,7 +174,8 @@ pub fn ingest_path<P: AsRef<Path> + Debug>(
             &mut directory_putter,
             &entry,
             maybe_directory,
-        )?;
+        )
+        .await?;
 
         if entry.depth() == 0 {
             return Ok(node);