about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-03-28T21·05+0100
committerflokli <flokli@flokli.de>2024-03-28T21·17+0000
commit156a5a0fb6eeb231431a86f8d53f3ef50816c03d (patch)
tree6481d3a2a599f266a421ce70100c2ae32a6805e7
parentbd32024047f4977b95ae9c04c1f7b2ad3b103a91 (diff)
refactor(tvix/glue): drop ingest_entries_sync r/7800
Make this function async, and do the block_on on the (single) callsite.

Change-Id: Ib8b0b54ab5370fe02ef95f38a45d8866868a9d60
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11285
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
-rw-r--r--tvix/glue/src/builtins/import.rs15
-rw-r--r--tvix/glue/src/tvix_store_io.rs23
2 files changed, 20 insertions, 18 deletions
diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs
index 50b99690eefd..88e483031f46 100644
--- a/tvix/glue/src/builtins/import.rs
+++ b/tvix/glue/src/builtins/import.rs
@@ -102,12 +102,15 @@ async fn filtered_ingest(
 
     pin_mut!(entries_stream);
 
-    state
-        .ingest_entries_sync(entries_stream)
-        .map_err(|err| ErrorKind::IO {
-            path: Some(path.to_path_buf()),
-            error: err.into(),
-        })
+    state.tokio_handle.block_on(async {
+        state
+            .ingest_entries(entries_stream)
+            .await
+            .map_err(|err| ErrorKind::IO {
+                path: Some(path.to_path_buf()),
+                error: err.into(),
+            })
+    })
 }
 
 #[builtins(state = "Rc<TvixStoreIO>")]
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index dc1974527b13..cbda365c20da 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -273,21 +273,20 @@ impl TvixStoreIO {
             .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))
     }
 
-    /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`]
-    /// with a [`tokio::runtime::Handle::block_on`] call for synchronicity.
-    pub(crate) fn ingest_entries_sync<S>(&self, entries_stream: S) -> io::Result<Node>
+    /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`],
+    /// passing the blob_service and directory_service that's used.
+    /// The error is mapped to std::io::Error for simplicity.
+    pub(crate) async fn ingest_entries<S>(&self, entries_stream: S) -> io::Result<Node>
     where
         S: Stream<Item = DirEntry> + Unpin,
     {
-        self.tokio_handle.block_on(async move {
-            tvix_castore::import::ingest_entries(
-                &self.blob_service,
-                &self.directory_service,
-                entries_stream,
-            )
-            .await
-            .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
-        })
+        tvix_castore::import::ingest_entries(
+            &self.blob_service,
+            &self.directory_service,
+            entries_stream,
+        )
+        .await
+        .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
     }
 
     pub(crate) async fn node_to_path_info(