about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-09-13T12·20+0200
committerflokli <flokli@flokli.de>2023-09-18T10·33+0000
commitda6cbb4a459d02111c44a67d3d0dd7e654abff23 (patch)
tree5efce82d3d9aea94cf6d3712a3fdbb7d168e4552 /tvix
parent3de96017640b6dc25f1544a1bafd4b370bb1cea0 (diff)
refactor(tvix/store/blobsvc): make BlobStore async r/6606
We previously kept the trait of a BlobService sync.

This however had some annoying consequences:

 - It became more and more complicated to track when we're in a context
   with an async runtime in the context or not, producing bugs like
   https://b.tvl.fyi/issues/304
 - The sync trait shielded away async clients from async worloads,
   requiring manual block_on code inside the gRPC client code, and
   spawn_blocking calls in consumers of the trait, even if they were
   async (like the gRPC server)
 - We had to write our own custom glue code (SyncReadIntoAsyncRead)
   to convert a sync io::Read into a tokio::io::AsyncRead, which already
   existed in tokio internally, but upstream ia hesitant to expose.

This now makes the BlobService trait async (via the async_trait macro,
like we already do in various gRPC parts), and replaces the sync readers
and writers with their async counterparts.

Tests interacting with a BlobService now need to have an async runtime
available, the easiest way for this is to mark the test functions
with the tokio::test macro, allowing us to directly .await in the test
function.

In places where we don't have an async runtime available from context
(like tvix-cli), we can pass one down explicitly.

Now that we don't provide a sync interface anymore, the (sync) FUSE
library now holds a pointer to a tokio runtime handle, and needs to at
least have 2 threads available when talking to a blob service (which is
why some of the tests now use the multi_thread flavor).

The FUSE tests got a bit more verbose, as we couldn't use the
setup_and_mount function accepting a callback anymore. We can hopefully
move some of the test fixture setup to rstest in the future to make this
less repetitive.

Co-Authored-By: Connor Brewster <cbrewster@hey.com>
Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix')
-rw-r--r--tvix/Cargo.lock90
-rw-r--r--tvix/Cargo.nix283
-rw-r--r--tvix/cli/Cargo.toml1
-rw-r--r--tvix/cli/src/main.rs9
-rw-r--r--tvix/cli/src/tvix_store_io.rs175
-rw-r--r--tvix/store/Cargo.toml3
-rw-r--r--tvix/store/src/bin/tvix-store.rs21
-rw-r--r--tvix/store/src/blobservice/dumb_seeker.rs110
-rw-r--r--tvix/store/src/blobservice/grpc.rs248
-rw-r--r--tvix/store/src/blobservice/memory.rs45
-rw-r--r--tvix/store/src/blobservice/mod.rs27
-rw-r--r--tvix/store/src/blobservice/naive_seeker.rs269
-rw-r--r--tvix/store/src/blobservice/sled.rs43
-rw-r--r--tvix/store/src/blobservice/tests.rs296
-rw-r--r--tvix/store/src/directoryservice/mod.rs2
-rw-r--r--tvix/store/src/fuse/mod.rs107
-rw-r--r--tvix/store/src/fuse/tests.rs472
-rw-r--r--tvix/store/src/import.rs18
-rw-r--r--tvix/store/src/nar/renderer.rs15
-rw-r--r--tvix/store/src/proto/grpc_blobservice_wrapper.rs53
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs10
-rw-r--r--tvix/store/src/proto/mod.rs2
-rw-r--r--tvix/store/src/proto/sync_read_into_async_read.rs158
-rw-r--r--tvix/store/src/tests/import.rs19
-rw-r--r--tvix/store/src/tests/nar_renderer.rs212
25 files changed, 1693 insertions, 995 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index d1a31b08a590..f0a73f3a3f51 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -3,6 +3,21 @@
 version = 3
 
 [[package]]
+name = "addr2line"
+version = "0.21.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
+dependencies = [
+ "gimli",
+]
+
+[[package]]
+name = "adler"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
+
+[[package]]
 name = "aho-corasick"
 version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -180,6 +195,21 @@ dependencies = [
 ]
 
 [[package]]
+name = "backtrace"
+version = "0.3.69"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837"
+dependencies = [
+ "addr2line",
+ "cc",
+ "cfg-if",
+ "libc",
+ "miniz_oxide",
+ "object",
+ "rustc-demangle",
+]
+
+[[package]]
 name = "base64"
 version = "0.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -920,6 +950,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "gimli"
+version = "0.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
+
+[[package]]
 name = "glob"
 version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1043,7 +1079,7 @@ dependencies = [
  "httpdate",
  "itoa",
  "pin-project-lite",
- "socket2",
+ "socket2 0.4.9",
  "tokio",
  "tower-service",
  "tracing",
@@ -1243,9 +1279,9 @@ dependencies = [
 
 [[package]]
 name = "libc"
-version = "0.2.144"
+version = "0.2.148"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
+checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
 
 [[package]]
 name = "libm"
@@ -1306,6 +1342,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
 
 [[package]]
+name = "miniz_oxide"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
+dependencies = [
+ "adler",
+]
+
+[[package]]
 name = "mio"
 version = "0.8.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1408,6 +1453,15 @@ dependencies = [
 ]
 
 [[package]]
+name = "object"
+version = "0.32.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
 name = "once_cell"
 version = "1.17.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1525,9 +1579,9 @@ dependencies = [
 
 [[package]]
 name = "pin-project-lite"
-version = "0.2.9"
+version = "0.2.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
+checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
 
 [[package]]
 name = "pin-utils"
@@ -1951,6 +2005,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "rustc-demangle"
+version = "0.1.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
+
+[[package]]
 name = "rustc-hash"
 version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2177,6 +2237,16 @@ dependencies = [
 ]
 
 [[package]]
+name = "socket2"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e"
+dependencies = [
+ "libc",
+ "windows-sys 0.48.0",
+]
+
+[[package]]
 name = "ssri"
 version = "7.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2432,18 +2502,18 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.28.0"
+version = "1.32.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c3c786bf8134e5a3a166db9b29ab8f48134739014a3eca7bc6bfa95d673b136f"
+checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
 dependencies = [
- "autocfg",
+ "backtrace",
  "bytes",
  "libc",
  "mio",
  "num_cpus",
  "pin-project-lite",
  "signal-hook-registry",
- "socket2",
+ "socket2 0.5.4",
  "tokio-macros",
  "windows-sys 0.48.0",
 ]
@@ -2734,6 +2804,7 @@ dependencies = [
  "smol_str",
  "ssri",
  "thiserror",
+ "tokio",
  "tracing",
  "tvix-eval",
  "tvix-store",
@@ -2805,6 +2876,7 @@ dependencies = [
  "lazy_static",
  "libc",
  "nix-compat",
+ "pin-project-lite",
  "prost",
  "prost-build",
  "rayon",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 4089008aaad6..e967b345e26f 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -129,6 +129,50 @@ rec {
     #   inject test dependencies into the build
 
     crates = {
+      "addr2line" = rec {
+        crateName = "addr2line";
+        version = "0.21.0";
+        edition = "2018";
+        sha256 = "1jx0k3iwyqr8klqbzk6kjvr496yd94aspis10vwsj5wy7gib4c4a";
+        dependencies = [
+          {
+            name = "gimli";
+            packageId = "gimli";
+            usesDefaultFeatures = false;
+            features = [ "read" ];
+          }
+        ];
+        features = {
+          "alloc" = [ "dep:alloc" ];
+          "compiler_builtins" = [ "dep:compiler_builtins" ];
+          "core" = [ "dep:core" ];
+          "cpp_demangle" = [ "dep:cpp_demangle" ];
+          "default" = [ "rustc-demangle" "cpp_demangle" "std-object" "fallible-iterator" "smallvec" "memmap2" ];
+          "fallible-iterator" = [ "dep:fallible-iterator" ];
+          "memmap2" = [ "dep:memmap2" ];
+          "object" = [ "dep:object" ];
+          "rustc-demangle" = [ "dep:rustc-demangle" ];
+          "rustc-dep-of-std" = [ "core" "alloc" "compiler_builtins" "gimli/rustc-dep-of-std" ];
+          "smallvec" = [ "dep:smallvec" ];
+          "std" = [ "gimli/std" ];
+          "std-object" = [ "std" "object" "object/std" "object/compression" "gimli/endian-reader" ];
+        };
+      };
+      "adler" = rec {
+        crateName = "adler";
+        version = "1.0.2";
+        edition = "2015";
+        sha256 = "1zim79cvzd5yrkzl3nyfx0avijwgk9fqv3yrscdy1cc79ih02qpj";
+        authors = [
+          "Jonas Schievink <jonasschievink@gmail.com>"
+        ];
+        features = {
+          "compiler_builtins" = [ "dep:compiler_builtins" ];
+          "core" = [ "dep:core" ];
+          "default" = [ "std" ];
+          "rustc-dep-of-std" = [ "core" "compiler_builtins" ];
+        };
+      };
       "aho-corasick" = rec {
         crateName = "aho-corasick";
         version = "1.0.1";
@@ -618,6 +662,67 @@ rec {
           "tracing" = [ "dep:tracing" ];
         };
       };
+      "backtrace" = rec {
+        crateName = "backtrace";
+        version = "0.3.69";
+        edition = "2018";
+        sha256 = "0dsq23dhw4pfndkx2nsa1ml2g31idm7ss7ljxp8d57avygivg290";
+        authors = [
+          "The Rust Project Developers"
+        ];
+        dependencies = [
+          {
+            name = "addr2line";
+            packageId = "addr2line";
+            usesDefaultFeatures = false;
+            target = { target, features }: (!((target."windows" or false) && ("msvc" == target."env") && (!("uwp" == target."vendor"))));
+          }
+          {
+            name = "cfg-if";
+            packageId = "cfg-if";
+          }
+          {
+            name = "libc";
+            packageId = "libc";
+            usesDefaultFeatures = false;
+            target = { target, features }: (!((target."windows" or false) && ("msvc" == target."env") && (!("uwp" == target."vendor"))));
+          }
+          {
+            name = "miniz_oxide";
+            packageId = "miniz_oxide";
+            usesDefaultFeatures = false;
+            target = { target, features }: (!((target."windows" or false) && ("msvc" == target."env") && (!("uwp" == target."vendor"))));
+          }
+          {
+            name = "object";
+            packageId = "object";
+            usesDefaultFeatures = false;
+            target = { target, features }: (!((target."windows" or false) && ("msvc" == target."env") && (!("uwp" == target."vendor"))));
+            features = [ "read_core" "elf" "macho" "pe" "unaligned" "archive" ];
+          }
+          {
+            name = "rustc-demangle";
+            packageId = "rustc-demangle";
+          }
+        ];
+        buildDependencies = [
+          {
+            name = "cc";
+            packageId = "cc";
+          }
+        ];
+        features = {
+          "cpp_demangle" = [ "dep:cpp_demangle" ];
+          "default" = [ "std" ];
+          "rustc-serialize" = [ "dep:rustc-serialize" ];
+          "serde" = [ "dep:serde" ];
+          "serialize-rustc" = [ "rustc-serialize" ];
+          "serialize-serde" = [ "serde" ];
+          "verify-winapi" = [ "winapi/dbghelp" "winapi/handleapi" "winapi/libloaderapi" "winapi/memoryapi" "winapi/minwindef" "winapi/processthreadsapi" "winapi/synchapi" "winapi/tlhelp32" "winapi/winbase" "winapi/winnt" ];
+          "winapi" = [ "dep:winapi" ];
+        };
+        resolvedDefaultFeatures = [ "default" "std" ];
+      };
       "base64 0.10.1" = rec {
         crateName = "base64";
         version = "0.10.1";
@@ -2586,6 +2691,23 @@ rec {
         };
         resolvedDefaultFeatures = [ "std" ];
       };
+      "gimli" = rec {
+        crateName = "gimli";
+        version = "0.28.0";
+        edition = "2018";
+        sha256 = "1h7hcl3chfvd2gfrrxjymnwj7anqxjslvz20kcargkvsya2dgf3g";
+        features = {
+          "default" = [ "read-all" "write" ];
+          "endian-reader" = [ "read" "dep:stable_deref_trait" ];
+          "fallible-iterator" = [ "dep:fallible-iterator" ];
+          "read" = [ "read-core" ];
+          "read-all" = [ "read" "std" "fallible-iterator" "endian-reader" ];
+          "rustc-dep-of-std" = [ "dep:core" "dep:alloc" "dep:compiler_builtins" ];
+          "std" = [ "fallible-iterator?/std" "stable_deref_trait?/std" ];
+          "write" = [ "dep:indexmap" ];
+        };
+        resolvedDefaultFeatures = [ "read" "read-core" ];
+      };
       "glob" = rec {
         crateName = "glob";
         version = "0.3.1";
@@ -2925,7 +3047,7 @@ rec {
           }
           {
             name = "socket2";
-            packageId = "socket2";
+            packageId = "socket2 0.4.9";
             optional = true;
             features = [ "all" ];
           }
@@ -3570,9 +3692,9 @@ rec {
       };
       "libc" = rec {
         crateName = "libc";
-        version = "0.2.144";
+        version = "0.2.148";
         edition = "2015";
-        sha256 = "1qfzrwhncsradwvdzd8vsj4mc31fh0rb5rvny3884rwa48fcq01b";
+        sha256 = "16rn9l8s5sj9n2jb2pw13ghqwa5nvjggkh9q3lp6vs1jfghp3p4w";
         authors = [
           "The Rust Project Developers"
         ];
@@ -3722,6 +3844,32 @@ rec {
         ];
 
       };
+      "miniz_oxide" = rec {
+        crateName = "miniz_oxide";
+        version = "0.7.1";
+        edition = "2018";
+        sha256 = "1ivl3rbbdm53bzscrd01g60l46lz5krl270487d8lhjvwl5hx0g7";
+        authors = [
+          "Frommi <daniil.liferenko@gmail.com>"
+          "oyvindln <oyvindln@users.noreply.github.com>"
+        ];
+        dependencies = [
+          {
+            name = "adler";
+            packageId = "adler";
+            usesDefaultFeatures = false;
+          }
+        ];
+        features = {
+          "alloc" = [ "dep:alloc" ];
+          "compiler_builtins" = [ "dep:compiler_builtins" ];
+          "core" = [ "dep:core" ];
+          "default" = [ "with-alloc" ];
+          "rustc-dep-of-std" = [ "core" "alloc" "compiler_builtins" "adler/rustc-dep-of-std" ];
+          "simd" = [ "simd-adler32" ];
+          "simd-adler32" = [ "dep:simd-adler32" ];
+        };
+      };
       "mio" = rec {
         crateName = "mio";
         version = "0.8.6";
@@ -3762,7 +3910,7 @@ rec {
         features = {
           "os-ext" = [ "os-poll" "windows-sys/Win32_System_Pipes" "windows-sys/Win32_Security" ];
         };
-        resolvedDefaultFeatures = [ "default" "net" "os-ext" "os-poll" ];
+        resolvedDefaultFeatures = [ "net" "os-ext" "os-poll" ];
       };
       "multimap" = rec {
         crateName = "multimap";
@@ -4035,6 +4183,38 @@ rec {
         ];
 
       };
+      "object" = rec {
+        crateName = "object";
+        version = "0.32.1";
+        edition = "2018";
+        sha256 = "1c02x4kvqpnl3wn7gz9idm4jrbirbycyqjgiw6lm1g9k77fzkxcw";
+        dependencies = [
+          {
+            name = "memchr";
+            packageId = "memchr";
+            usesDefaultFeatures = false;
+          }
+        ];
+        features = {
+          "all" = [ "read" "write" "std" "compression" "wasm" ];
+          "alloc" = [ "dep:alloc" ];
+          "compiler_builtins" = [ "dep:compiler_builtins" ];
+          "compression" = [ "dep:flate2" "dep:ruzstd" "std" ];
+          "core" = [ "dep:core" ];
+          "default" = [ "read" "compression" ];
+          "doc" = [ "read_core" "write_std" "std" "compression" "archive" "coff" "elf" "macho" "pe" "wasm" "xcoff" ];
+          "pe" = [ "coff" ];
+          "read" = [ "read_core" "archive" "coff" "elf" "macho" "pe" "xcoff" "unaligned" ];
+          "rustc-dep-of-std" = [ "core" "compiler_builtins" "alloc" "memchr/rustc-dep-of-std" ];
+          "std" = [ "memchr/std" ];
+          "unstable-all" = [ "all" "unstable" ];
+          "wasm" = [ "dep:wasmparser" ];
+          "write" = [ "write_std" "coff" "elf" "macho" "pe" "xcoff" ];
+          "write_core" = [ "dep:crc32fast" "dep:indexmap" "dep:hashbrown" ];
+          "write_std" = [ "write_core" "std" "indexmap?/std" "crc32fast?/std" ];
+        };
+        resolvedDefaultFeatures = [ "archive" "coff" "elf" "macho" "pe" "read_core" "unaligned" ];
+      };
       "once_cell" = rec {
         crateName = "once_cell";
         version = "1.17.1";
@@ -4316,9 +4496,9 @@ rec {
       };
       "pin-project-lite" = rec {
         crateName = "pin-project-lite";
-        version = "0.2.9";
+        version = "0.2.13";
         edition = "2018";
-        sha256 = "05n1z851l356hpgqadw4ar64mjanaxq1qlwqsf2k05ziq8xax9z0";
+        sha256 = "0n0bwr5qxlf0mhn2xkl36sy55118s9qmvx2yl5f3ixkb007lbywa";
 
       };
       "pin-utils" = rec {
@@ -5518,6 +5698,20 @@ rec {
           "serde1" = [ "serde" "text-size/serde" ];
         };
       };
+      "rustc-demangle" = rec {
+        crateName = "rustc-demangle";
+        version = "0.1.23";
+        edition = "2015";
+        sha256 = "0xnbk2bmyzshacjm2g1kd4zzv2y2az14bw3sjccq5qkpmsfvn9nn";
+        authors = [
+          "Alex Crichton <alex@alexcrichton.com>"
+        ];
+        features = {
+          "compiler_builtins" = [ "dep:compiler_builtins" ];
+          "core" = [ "dep:core" ];
+          "rustc-dep-of-std" = [ "core" "compiler_builtins" ];
+        };
+      };
       "rustc-hash" = rec {
         crateName = "rustc-hash";
         version = "1.1.0";
@@ -6242,7 +6436,7 @@ rec {
         };
         resolvedDefaultFeatures = [ "default" "std" ];
       };
-      "socket2" = rec {
+      "socket2 0.4.9" = rec {
         crateName = "socket2";
         version = "0.4.9";
         edition = "2018";
@@ -6267,6 +6461,31 @@ rec {
         features = { };
         resolvedDefaultFeatures = [ "all" ];
       };
+      "socket2 0.5.4" = rec {
+        crateName = "socket2";
+        version = "0.5.4";
+        edition = "2021";
+        sha256 = "17lqx8w2b3nysrkdbdz8y7fkikz5v77c052q57lxwajmxchfhca0";
+        authors = [
+          "Alex Crichton <alex@alexcrichton.com>"
+          "Thomas de Zeeuw <thomasdezeeuw@gmail.com>"
+        ];
+        dependencies = [
+          {
+            name = "libc";
+            packageId = "libc";
+            target = { target, features }: (target."unix" or false);
+          }
+          {
+            name = "windows-sys";
+            packageId = "windows-sys 0.48.0";
+            target = { target, features }: (target."windows" or false);
+            features = [ "Win32_Foundation" "Win32_Networking_WinSock" "Win32_System_IO" "Win32_System_Threading" "Win32_System_WindowsProgramming" ];
+          }
+        ];
+        features = { };
+        resolvedDefaultFeatures = [ "all" ];
+      };
       "ssri" = rec {
         crateName = "ssri";
         version = "7.0.0";
@@ -6909,14 +7128,19 @@ rec {
       };
       "tokio" = rec {
         crateName = "tokio";
-        version = "1.28.0";
+        version = "1.32.0";
         edition = "2021";
-        sha256 = "0vqk7dkmvadzqrxwlgja04wlf4s8iymjk6yvcshs7r9lh6zqdiy3";
+        sha256 = "1yck1349q23l22bgxcbqd3wkaffw2vmkf7z26m3wgmkcxmvn1v8p";
         authors = [
           "Tokio Contributors <team@tokio.rs>"
         ];
         dependencies = [
           {
+            name = "backtrace";
+            packageId = "backtrace";
+            target = { target, features }: (target."tokio_taskdump" or false);
+          }
+          {
             name = "bytes";
             packageId = "bytes";
             optional = true;
@@ -6931,6 +7155,7 @@ rec {
             name = "mio";
             packageId = "mio";
             optional = true;
+            usesDefaultFeatures = false;
           }
           {
             name = "num_cpus";
@@ -6949,9 +7174,9 @@ rec {
           }
           {
             name = "socket2";
-            packageId = "socket2";
+            packageId = "socket2 0.5.4";
             optional = true;
-            target = { target, features }: (!(("wasm32" == target."arch") || ("wasm64" == target."arch")));
+            target = { target, features }: (!(builtins.elem "wasm" target."family"));
             features = [ "all" ];
           }
           {
@@ -6962,22 +7187,10 @@ rec {
           {
             name = "windows-sys";
             packageId = "windows-sys 0.48.0";
-            target = { target, features }: (target."docsrs" or false);
-            features = [ "Win32_Foundation" "Win32_Security_Authorization" ];
-          }
-          {
-            name = "windows-sys";
-            packageId = "windows-sys 0.48.0";
             optional = true;
             target = { target, features }: (target."windows" or false);
           }
         ];
-        buildDependencies = [
-          {
-            name = "autocfg";
-            packageId = "autocfg";
-          }
-        ];
         devDependencies = [
           {
             name = "libc";
@@ -6986,8 +7199,14 @@ rec {
           }
           {
             name = "socket2";
-            packageId = "socket2";
-            target = { target, features }: (!(("wasm32" == target."arch") || ("wasm64" == target."arch")));
+            packageId = "socket2 0.5.4";
+            target = { target, features }: (!(builtins.elem "wasm" target."family"));
+          }
+          {
+            name = "windows-sys";
+            packageId = "windows-sys 0.48.0";
+            target = { target, features }: (target."windows" or false);
+            features = [ "Win32_Foundation" "Win32_Security_Authorization" ];
           }
         ];
         features = {
@@ -7010,7 +7229,7 @@ rec {
           "tracing" = [ "dep:tracing" ];
           "windows-sys" = [ "dep:windows-sys" ];
         };
-        resolvedDefaultFeatures = [ "bytes" "default" "io-std" "io-util" "libc" "macros" "mio" "net" "num_cpus" "rt" "rt-multi-thread" "signal" "signal-hook-registry" "socket2" "sync" "time" "tokio-macros" "windows-sys" ];
+        resolvedDefaultFeatures = [ "bytes" "default" "fs" "io-std" "io-util" "libc" "macros" "mio" "net" "num_cpus" "rt" "rt-multi-thread" "signal" "signal-hook-registry" "socket2" "sync" "time" "tokio-macros" "windows-sys" ];
       };
       "tokio-io-timeout" = rec {
         crateName = "tokio-io-timeout";
@@ -8074,6 +8293,10 @@ rec {
             packageId = "thiserror";
           }
           {
+            name = "tokio";
+            packageId = "tokio";
+          }
+          {
             name = "tracing";
             packageId = "tracing";
           }
@@ -8355,6 +8578,10 @@ rec {
             packageId = "nix-compat";
           }
           {
+            name = "pin-project-lite";
+            packageId = "pin-project-lite";
+          }
+          {
             name = "prost";
             packageId = "prost";
           }
@@ -8386,7 +8613,7 @@ rec {
           {
             name = "tokio";
             packageId = "tokio";
-            features = [ "net" "rt-multi-thread" "signal" ];
+            features = [ "fs" "net" "rt-multi-thread" "signal" ];
           }
           {
             name = "tokio-stream";
@@ -10007,7 +10234,7 @@ rec {
           "Win32_Web" = [ "Win32" ];
           "Win32_Web_InternetExplorer" = [ "Win32_Web" ];
         };
-        resolvedDefaultFeatures = [ "Win32" "Win32_Foundation" "Win32_NetworkManagement" "Win32_NetworkManagement_IpHelper" "Win32_Networking" "Win32_Networking_WinSock" "Win32_Security" "Win32_Security_Authorization" "Win32_Storage" "Win32_Storage_FileSystem" "Win32_System" "Win32_System_Console" "Win32_System_Diagnostics" "Win32_System_Diagnostics_Debug" "Win32_System_IO" "Win32_System_Pipes" "Win32_System_SystemServices" "Win32_System_Threading" "default" ];
+        resolvedDefaultFeatures = [ "Win32" "Win32_Foundation" "Win32_NetworkManagement" "Win32_NetworkManagement_IpHelper" "Win32_Networking" "Win32_Networking_WinSock" "Win32_Security" "Win32_Storage" "Win32_Storage_FileSystem" "Win32_System" "Win32_System_Console" "Win32_System_Diagnostics" "Win32_System_Diagnostics_Debug" "Win32_System_IO" "Win32_System_Pipes" "Win32_System_SystemServices" "Win32_System_Threading" "Win32_System_WindowsProgramming" "default" ];
       };
       "windows-targets 0.42.2" = rec {
         crateName = "windows-targets";
diff --git a/tvix/cli/Cargo.toml b/tvix/cli/Cargo.toml
index caebb169aaeb..a73d9be2393c 100644
--- a/tvix/cli/Cargo.toml
+++ b/tvix/cli/Cargo.toml
@@ -20,6 +20,7 @@ smol_str = "0.2.0"
 ssri = "7.0.0"
 thiserror = "1.0.38"
 tracing = "0.1.37"
+tokio = "1.28.0"
 
 [dependencies.wu-manber]
 git = "https://github.com/tvlfyi/wu-manber.git"
diff --git a/tvix/cli/src/main.rs b/tvix/cli/src/main.rs
index 1980ac1731e7..65970e1d1cd8 100644
--- a/tvix/cli/src/main.rs
+++ b/tvix/cli/src/main.rs
@@ -80,9 +80,16 @@ fn interpret(code: &str, path: Option<PathBuf>, args: &Args, explain: bool) -> b
         directory_service.clone(),
     ));
 
+    let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
+
     eval.io_handle = Box::new(tvix_io::TvixIO::new(
         known_paths.clone(),
-        TvixStoreIO::new(blob_service, directory_service, path_info_service),
+        TvixStoreIO::new(
+            blob_service,
+            directory_service,
+            path_info_service,
+            tokio_runtime.handle().clone(),
+        ),
     ));
 
     // bundle fetchurl.nix (used in nixpkgs) by resolving <nix> to
diff --git a/tvix/cli/src/tvix_store_io.rs b/tvix/cli/src/tvix_store_io.rs
index 2c673385342f..1a373a705fe4 100644
--- a/tvix/cli/src/tvix_store_io.rs
+++ b/tvix/cli/src/tvix_store_io.rs
@@ -2,6 +2,7 @@
 
 use nix_compat::store_path::{self, StorePath};
 use std::{io, path::Path, path::PathBuf, sync::Arc};
+use tokio::io::AsyncReadExt;
 use tracing::{error, instrument, warn};
 use tvix_eval::{EvalIO, FileType, StdIO};
 
@@ -27,6 +28,7 @@ pub struct TvixStoreIO {
     directory_service: Arc<dyn DirectoryService>,
     path_info_service: Arc<dyn PathInfoService>,
     std_io: StdIO,
+    tokio_handle: tokio::runtime::Handle,
 }
 
 impl TvixStoreIO {
@@ -34,12 +36,14 @@ impl TvixStoreIO {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
         path_info_service: Arc<dyn PathInfoService>,
+        tokio_handle: tokio::runtime::Handle,
     ) -> Self {
         Self {
             blob_service,
             directory_service,
             path_info_service,
             std_io: StdIO {},
+            tokio_handle,
         }
     }
 
@@ -86,65 +90,6 @@ impl TvixStoreIO {
             sub_path,
         )?)
     }
-
-    /// Imports a given path on the filesystem into the store, and returns the
-    /// [PathInfo] describing the path, that was sent to
-    /// [PathInfoService].
-    /// While not part of the [EvalIO], it's still useful for clients who
-    /// care about the [PathInfo].
-    #[instrument(skip(self), ret, err)]
-    pub fn import_path_with_pathinfo(&self, path: &std::path::Path) -> Result<PathInfo, io::Error> {
-        // Call [import::ingest_path], which will walk over the given path and return a root_node.
-        let root_node = import::ingest_path(
-            self.blob_service.clone(),
-            self.directory_service.clone(),
-            path,
-        )
-        .expect("error during import_path");
-
-        // Render the NAR
-        let (nar_size, nar_sha256) = calculate_size_and_sha256(
-            &root_node,
-            self.blob_service.clone(),
-            self.directory_service.clone(),
-        )
-        .expect("error during nar calculation"); // TODO: handle error
-
-        // TODO: make a path_to_name helper function?
-        let name = path
-            .file_name()
-            .expect("path must not be ..")
-            .to_str()
-            .expect("path must be valid unicode");
-
-        let output_path = store_path::build_nar_based_store_path(&nar_sha256, name);
-
-        // assemble a new root_node with a name that is derived from the nar hash.
-        let root_node = root_node.rename(output_path.to_string().into_bytes().into());
-
-        // assemble the [PathInfo] object.
-        let path_info = PathInfo {
-            node: Some(tvix_store::proto::Node {
-                node: Some(root_node),
-            }),
-            // There's no reference scanning on path contents ingested like this.
-            references: vec![],
-            narinfo: Some(NarInfo {
-                nar_size,
-                nar_sha256: nar_sha256.to_vec().into(),
-                signatures: vec![],
-                reference_names: vec![],
-                // TODO: narinfo for talosctl.src contains `CA: fixed:r:sha256:1x13j5hy75221bf6kz7cpgld9vgic6bqx07w5xjs4pxnksj6lxb6`
-                // do we need this anywhere?
-            }),
-        };
-
-        // put into [PathInfoService], and return the [PathInfo] that we get
-        // back from there (it might contain additional signatures).
-        let path_info = self.path_info_service.put(path_info)?;
-
-        Ok(path_info)
-    }
 }
 
 impl EvalIO for TvixStoreIO {
@@ -197,24 +142,33 @@ impl EvalIO for TvixStoreIO {
                                 )
                             })?;
 
-                        let reader = {
-                            let resp = self.blob_service.open_read(&digest)?;
-                            match resp {
-                                Some(blob_reader) => blob_reader,
-                                None => {
-                                    error!(
-                                        blob.digest = %digest,
-                                        "blob not found",
-                                    );
-                                    Err(io::Error::new(
-                                        io::ErrorKind::NotFound,
-                                        format!("blob {} not found", &digest),
-                                    ))?
+                        let blob_service = self.blob_service.clone();
+
+                        let task = self.tokio_handle.spawn(async move {
+                            let mut reader = {
+                                let resp = blob_service.open_read(&digest).await?;
+                                match resp {
+                                    Some(blob_reader) => blob_reader,
+                                    None => {
+                                        error!(
+                                            blob.digest = %digest,
+                                            "blob not found",
+                                        );
+                                        Err(io::Error::new(
+                                            io::ErrorKind::NotFound,
+                                            format!("blob {} not found", &digest),
+                                        ))?
+                                    }
                                 }
-                            }
-                        };
+                            };
 
-                        io::read_to_string(reader)
+                            let mut buf = String::new();
+
+                            reader.read_to_string(&mut buf).await?;
+                            Ok(buf)
+                        });
+
+                        self.tokio_handle.block_on(task).unwrap()
                     }
                     Node::Symlink(_symlink_node) => Err(io::Error::new(
                         io::ErrorKind::Unsupported,
@@ -296,7 +250,16 @@ impl EvalIO for TvixStoreIO {
 
     #[instrument(skip(self), ret, err)]
     fn import_path(&self, path: &std::path::Path) -> Result<PathBuf, std::io::Error> {
-        let path_info = self.import_path_with_pathinfo(path)?;
+        let p = path.to_owned();
+        let blob_service = self.blob_service.clone();
+        let directory_service = self.directory_service.clone();
+        let path_info_service = self.path_info_service.clone();
+
+        let task = self.tokio_handle.spawn(async move {
+            import_path_with_pathinfo(blob_service, directory_service, path_info_service, &p).await
+        });
+
+        let path_info = self.tokio_handle.block_on(task).unwrap()?;
 
         // from the [PathInfo], extract the store path (as string).
         Ok({
@@ -320,3 +283,63 @@ impl EvalIO for TvixStoreIO {
         Some("/nix/store".to_string())
     }
 }
+
+/// Imports a given path on the filesystem into the store, and returns the
+/// [PathInfo] describing the path, that was sent to
+/// [PathInfoService].
+#[instrument(skip(blob_service, directory_service, path_info_service), ret, err)]
+async fn import_path_with_pathinfo(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+    path_info_service: Arc<dyn PathInfoService>,
+    path: &std::path::Path,
+) -> Result<PathInfo, io::Error> {
+    // Call [import::ingest_path], which will walk over the given path and return a root_node.
+    let root_node = import::ingest_path(blob_service.clone(), directory_service.clone(), path)
+        .await
+        .expect("error during import_path");
+
+    // Render the NAR. This is blocking.
+    let calc_task = tokio::task::spawn_blocking(move || {
+        let (nar_size, nar_sha256) =
+            calculate_size_and_sha256(&root_node, blob_service.clone(), directory_service.clone())
+                .expect("error during nar calculation"); // TODO: handle error
+        (nar_size, nar_sha256, root_node)
+    });
+    let (nar_size, nar_sha256, root_node) = calc_task.await.unwrap();
+
+    // TODO: make a path_to_name helper function?
+    let name = path
+        .file_name()
+        .expect("path must not be ..")
+        .to_str()
+        .expect("path must be valid unicode");
+
+    let output_path = store_path::build_nar_based_store_path(&nar_sha256, name);
+
+    // assemble a new root_node with a name that is derived from the nar hash.
+    let root_node = root_node.rename(output_path.to_string().into_bytes().into());
+
+    // assemble the [PathInfo] object.
+    let path_info = PathInfo {
+        node: Some(tvix_store::proto::Node {
+            node: Some(root_node),
+        }),
+        // There's no reference scanning on path contents ingested like this.
+        references: vec![],
+        narinfo: Some(NarInfo {
+            nar_size,
+            nar_sha256: nar_sha256.to_vec().into(),
+            signatures: vec![],
+            reference_names: vec![],
+            // TODO: narinfo for talosctl.src contains `CA: fixed:r:sha256:1x13j5hy75221bf6kz7cpgld9vgic6bqx07w5xjs4pxnksj6lxb6`
+            // do we need this anywhere?
+        }),
+    };
+
+    // put into [PathInfoService], and return the [PathInfo] that we get
+    // back from there (it might contain additional signatures).
+    let path_info = path_info_service.put(path_info)?;
+
+    Ok(path_info)
+}
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index dece06be8fb4..3fabf4f84bd5 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -17,7 +17,7 @@ sha2 = "0.10.6"
 sled = { version = "0.34.7", features = ["compression"] }
 thiserror = "1.0.38"
 tokio-stream = "0.1.14"
-tokio = { version = "1.28.0", features = ["net", "rt-multi-thread", "signal"] }
+tokio = { version = "1.28.0", features = ["fs", "net", "rt-multi-thread", "signal"] }
 tonic = "0.8.2"
 tracing = "0.1.37"
 tracing-subscriber = { version = "0.3.16", features = ["json"] }
@@ -29,6 +29,7 @@ bytes = "1.4.0"
 smol_str = "0.2.0"
 serde_json = "1.0"
 url = "2.4.0"
+pin-project-lite = "0.2.13"
 
 [dependencies.fuser]
 optional = true
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index f69c2f7fab15..0da264808efb 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -6,6 +6,7 @@ use nix_compat::store_path::StorePath;
 use std::io;
 use std::path::Path;
 use std::path::PathBuf;
+use tokio::task::JoinHandle;
 use tracing_subscriber::prelude::*;
 use tvix_store::blobservice;
 use tvix_store::directoryservice;
@@ -199,17 +200,24 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                     let directory_service = directory_service.clone();
                     let path_info_service = path_info_service.clone();
 
-                    let task = tokio::task::spawn_blocking(move || -> io::Result<()> {
+                    let task: JoinHandle<io::Result<()>> = tokio::task::spawn(async move {
                         // Ingest the path into blob and directory service.
                         let root_node = import::ingest_path(
                             blob_service.clone(),
                             directory_service.clone(),
                             &path,
                         )
+                        .await
                         .expect("failed to ingest path");
 
                         // Ask the PathInfoService for the NAR size and sha256
-                        let (nar_size, nar_sha256) = path_info_service.calculate_nar(&root_node)?;
+                        let root_node_copy = root_node.clone();
+                        let path_info_service_clone = path_info_service.clone();
+                        let (nar_size, nar_sha256) = tokio::task::spawn_blocking(move || {
+                            path_info_service_clone.calculate_nar(&root_node_copy)
+                        })
+                        .await
+                        .unwrap()?;
 
                         // TODO: make a path_to_name helper function?
                         let name = path
@@ -241,7 +249,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
                         // put into [PathInfoService], and return the PathInfo that we get back
                         // from there (it might contain additional signatures).
-                        let path_info = path_info_service.put(path_info)?;
+                        let path_info =
+                            tokio::task::spawn_blocking(move || path_info_service.put(path_info))
+                                .await
+                                .unwrap()?;
 
                         let node = path_info.node.unwrap().node.unwrap();
 
@@ -304,9 +315,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             // task.
             tokio::task::spawn_blocking(move || -> io::Result<()> {
                 info!("mounting tvix-store on {:?}", fuse_session.mountpoint());
-                let res = fuse_session.run()?;
+                fuse_session.run()?;
                 info!("unmount occured, terminating…");
-                Ok(res)
+                Ok(())
             })
             .await??;
         }
diff --git a/tvix/store/src/blobservice/dumb_seeker.rs b/tvix/store/src/blobservice/dumb_seeker.rs
deleted file mode 100644
index 6df4eb57f5f1..000000000000
--- a/tvix/store/src/blobservice/dumb_seeker.rs
+++ /dev/null
@@ -1,110 +0,0 @@
-use std::io;
-
-use tracing::{debug, instrument};
-
-use super::BlobReader;
-
-/// This implements [io::Seek] for and [io::Read] by simply skipping over some
-/// bytes, keeping track of the position.
-/// It fails whenever you try to seek backwards.
-pub struct DumbSeeker<R: io::Read> {
-    r: R,
-    pos: u64,
-}
-
-impl<R: io::Read> DumbSeeker<R> {
-    pub fn new(r: R) -> Self {
-        DumbSeeker { r, pos: 0 }
-    }
-}
-
-impl<R: io::Read> io::Read for DumbSeeker<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let bytes_read = self.r.read(buf)?;
-
-        self.pos += bytes_read as u64;
-
-        Ok(bytes_read)
-    }
-}
-
-impl<R: io::Read> io::Seek for DumbSeeker<R> {
-    #[instrument(skip(self))]
-    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
-        let absolute_offset: u64 = match pos {
-            io::SeekFrom::Start(start_offset) => {
-                if start_offset < self.pos {
-                    return Err(io::Error::new(
-                        io::ErrorKind::Unsupported,
-                        format!("can't seek backwards ({} -> {})", self.pos, start_offset),
-                    ));
-                } else {
-                    start_offset
-                }
-            }
-            // we don't know the total size, can't support this.
-            io::SeekFrom::End(_end_offset) => {
-                return Err(io::Error::new(
-                    io::ErrorKind::Unsupported,
-                    "can't seek from end",
-                ));
-            }
-            io::SeekFrom::Current(relative_offset) => {
-                if relative_offset < 0 {
-                    return Err(io::Error::new(
-                        io::ErrorKind::Unsupported,
-                        "can't seek backwards relative to current position",
-                    ));
-                } else {
-                    self.pos + relative_offset as u64
-                }
-            }
-        };
-
-        debug!(absolute_offset=?absolute_offset, "seek");
-
-        // we already know absolute_offset is larger than self.pos
-        debug_assert!(
-            absolute_offset >= self.pos,
-            "absolute_offset {} is larger than self.pos {}",
-            absolute_offset,
-            self.pos
-        );
-
-        // calculate bytes to skip
-        let bytes_to_skip: u64 = absolute_offset - self.pos;
-
-        // discard these bytes. We can't use take() as it requires ownership of
-        // self.r, but we only have &mut self.
-        let mut buf = [0; 1024];
-        let mut bytes_skipped: u64 = 0;
-        while bytes_skipped < bytes_to_skip {
-            let len = std::cmp::min(bytes_to_skip - bytes_skipped, buf.len() as u64);
-            match self.r.read(&mut buf[..len as usize]) {
-                Ok(0) => break,
-                Ok(n) => bytes_skipped += n as u64,
-                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
-                Err(e) => return Err(e),
-            }
-        }
-
-        // This will fail when seeking past the end of self.r
-        if bytes_to_skip != bytes_skipped {
-            return Err(std::io::Error::new(
-                std::io::ErrorKind::UnexpectedEof,
-                format!(
-                    "tried to skip {} bytes, but only was able to skip {} until reaching EOF",
-                    bytes_to_skip, bytes_skipped
-                ),
-            ));
-        }
-
-        self.pos = absolute_offset;
-
-        // return the new position from the start of the stream
-        Ok(absolute_offset)
-    }
-}
-
-/// A Cursor<Vec<u8>> can be used as a BlobReader.
-impl<R: io::Read + Send + 'static> BlobReader for DumbSeeker<R> {}
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs
index c6d28860f8aa..cea796033ea9 100644
--- a/tvix/store/src/blobservice/grpc.rs
+++ b/tvix/store/src/blobservice/grpc.rs
@@ -1,22 +1,26 @@
-use super::{dumb_seeker::DumbSeeker, BlobReader, BlobService, BlobWriter};
+use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
 use crate::{proto, B3Digest};
-use futures::sink::{SinkExt, SinkMapErr};
-use std::{collections::VecDeque, io};
+use futures::sink::SinkExt;
+use futures::TryFutureExt;
+use std::{
+    collections::VecDeque,
+    io::{self},
+    pin::pin,
+    task::Poll,
+};
+use tokio::io::AsyncWriteExt;
 use tokio::{net::UnixStream, task::JoinHandle};
 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
 use tokio_util::{
-    io::{CopyToBytes, SinkWriter, SyncIoBridge},
+    io::{CopyToBytes, SinkWriter},
     sync::{PollSendError, PollSender},
 };
-use tonic::{transport::Channel, Code, Status, Streaming};
+use tonic::{async_trait, transport::Channel, Code, Status};
 use tracing::instrument;
 
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
 pub struct GRPCBlobService {
-    /// A handle into the active tokio runtime. Necessary to spawn tasks.
-    tokio_handle: tokio::runtime::Handle,
-
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
     grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
@@ -28,13 +32,11 @@ impl GRPCBlobService {
     pub fn from_client(
         grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
     ) -> Self {
-        Self {
-            tokio_handle: tokio::runtime::Handle::current(),
-            grpc_client,
-        }
+        Self { grpc_client }
     }
 }
 
+#[async_trait]
 impl BlobService for GRPCBlobService {
     /// Constructs a [GRPCBlobService] from the passed [url::Url]:
     /// - scheme has to match `grpc+*://`.
@@ -89,22 +91,16 @@ impl BlobService for GRPCBlobService {
     }
 
     #[instrument(skip(self, digest), fields(blob.digest=%digest))]
-    fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> {
-        // Get a new handle to the gRPC client, and copy the digest.
+    async fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> {
         let mut grpc_client = self.grpc_client.clone();
-        let digest = digest.clone();
-
-        let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move {
-            Ok(grpc_client
-                .stat(proto::StatBlobRequest {
-                    digest: digest.into(),
-                    ..Default::default()
-                })
-                .await?
-                .into_inner())
-        });
-
-        match self.tokio_handle.block_on(task)? {
+        let resp = grpc_client
+            .stat(proto::StatBlobRequest {
+                digest: digest.clone().into(),
+                ..Default::default()
+            })
+            .await;
+
+        match resp {
             Ok(_blob_meta) => Ok(true),
             Err(e) if e.code() == Code::NotFound => Ok(false),
             Err(e) => Err(crate::Error::StorageError(e.to_string())),
@@ -113,35 +109,30 @@ impl BlobService for GRPCBlobService {
 
     // On success, this returns a Ok(Some(io::Read)), which can be used to read
     // the contents of the Blob, identified by the digest.
-    fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, crate::Error> {
+    async fn open_read(
+        &self,
+        digest: &B3Digest,
+    ) -> Result<Option<Box<dyn BlobReader>>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
-        let digest = digest.clone();
-
-        // Construct the task that'll send out the request and return the stream
-        // the gRPC client should use to send [proto::BlobChunk], or an error if
-        // the blob doesn't exist.
-        let task: JoinHandle<Result<Streaming<proto::BlobChunk>, Status>> =
-            self.tokio_handle.spawn(async move {
-                let stream = grpc_client
-                    .read(proto::ReadBlobRequest {
-                        digest: digest.into(),
-                    })
-                    .await?
-                    .into_inner();
-
-                Ok(stream)
-            });
+
+        // Get a stream of [proto::BlobChunk], or return an error if the blob
+        // doesn't exist.
+        let resp = grpc_client
+            .read(proto::ReadBlobRequest {
+                digest: digest.clone().into(),
+            })
+            .await;
 
         // This runs the task to completion, which on success will return a stream.
         // On reading from it, we receive individual [proto::BlobChunk], so we
         // massage this to a stream of bytes,
         // then create an [AsyncRead], which we'll turn into a [io::Read],
         // that's returned from the function.
-        match self.tokio_handle.block_on(task)? {
+        match resp {
             Ok(stream) => {
                 // map the stream of proto::BlobChunk to bytes.
-                let data_stream = stream.map(|x| {
+                let data_stream = stream.into_inner().map(|x| {
                     x.map(|x| VecDeque::from(x.data.to_vec()))
                         .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
                 });
@@ -149,9 +140,7 @@ impl BlobService for GRPCBlobService {
                 // Use StreamReader::new to convert to an AsyncRead.
                 let data_reader = tokio_util::io::StreamReader::new(data_stream);
 
-                // Use SyncIoBridge to turn it into a sync Read.
-                let sync_reader = tokio_util::io::SyncIoBridge::new(data_reader);
-                Ok(Some(Box::new(DumbSeeker::new(sync_reader))))
+                Ok(Some(Box::new(NaiveSeeker::new(data_reader))))
             }
             Err(e) if e.code() == Code::NotFound => Ok(None),
             Err(e) => Err(crate::Error::StorageError(e.to_string())),
@@ -160,7 +149,7 @@ impl BlobService for GRPCBlobService {
 
     /// Returns a BlobWriter, that'll internally wrap each write in a
     // [proto::BlobChunk], which is send to the gRPC server.
-    fn open_write(&self) -> Box<dyn BlobWriter> {
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
         let mut grpc_client = self.grpc_client.clone();
 
         // set up an mpsc channel passing around Bytes.
@@ -171,9 +160,8 @@ impl BlobService for GRPCBlobService {
         let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x });
 
         // That receiver stream is used as a stream in the gRPC BlobService.put rpc call.
-        let task: JoinHandle<Result<_, Status>> = self
-            .tokio_handle
-            .spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) });
+        let task: JoinHandle<Result<_, Status>> =
+            tokio::spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) });
 
         // The tx part of the channel is converted to a sink of byte chunks.
 
@@ -187,43 +175,26 @@ impl BlobService for GRPCBlobService {
         // We need to explicitly cast here, otherwise rustc does error with "expected fn pointer, found fn item"
 
         // … which is turned into an [tokio::io::AsyncWrite].
-        let async_writer = SinkWriter::new(CopyToBytes::new(sink));
-        // … which is then turned into a [io::Write].
-        let writer = SyncIoBridge::new(async_writer);
+        let writer = SinkWriter::new(CopyToBytes::new(sink));
 
         Box::new(GRPCBlobWriter {
-            tokio_handle: self.tokio_handle.clone(),
             task_and_writer: Some((task, writer)),
             digest: None,
         })
     }
 }
 
-type BridgedWriter = SyncIoBridge<
-    SinkWriter<
-        CopyToBytes<
-            SinkMapErr<PollSender<bytes::Bytes>, fn(PollSendError<bytes::Bytes>) -> io::Error>,
-        >,
-    >,
->;
-
-pub struct GRPCBlobWriter {
-    /// A handle into the active tokio runtime. Necessary to block on the task
-    /// containing the put request.
-    tokio_handle: tokio::runtime::Handle,
-
+pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> {
     /// The task containing the put request, and the inner writer, if we're still writing.
-    task_and_writer: Option<(
-        JoinHandle<Result<proto::PutBlobResponse, Status>>,
-        BridgedWriter,
-    )>,
+    task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>,
 
     /// The digest that has been returned, if we successfully closed.
     digest: Option<B3Digest>,
 }
 
-impl BlobWriter for GRPCBlobWriter {
-    fn close(&mut self) -> Result<B3Digest, crate::Error> {
+#[async_trait]
+impl<W: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static> BlobWriter for GRPCBlobWriter<W> {
+    async fn close(&mut self) -> Result<B3Digest, crate::Error> {
         if self.task_and_writer.is_none() {
             // if we're already closed, return the b3 digest, which must exist.
             // If it doesn't, we already closed and failed once, and didn't handle the error.
@@ -240,12 +211,14 @@ impl BlobWriter for GRPCBlobWriter {
             // the channel.
             writer
                 .shutdown()
-                .map_err(|e| crate::Error::StorageError(e.to_string()))?;
+                .map_err(|e| crate::Error::StorageError(e.to_string()))
+                .await?;
 
             // block on the RPC call to return.
             // This ensures all chunks are sent out, and have been received by the
             // backend.
-            match self.tokio_handle.block_on(task)? {
+
+            match task.await? {
                 Ok(resp) => {
                     // return the digest from the response, and store it in self.digest for subsequent closes.
                     let digest: B3Digest = resp.digest.try_into().map_err(|_| {
@@ -262,26 +235,48 @@ impl BlobWriter for GRPCBlobWriter {
     }
 }
 
-impl io::Write for GRPCBlobWriter {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for GRPCBlobWriter<W> {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
         match &mut self.task_and_writer {
-            None => Err(io::Error::new(
+            None => Poll::Ready(Err(io::Error::new(
                 io::ErrorKind::NotConnected,
                 "already closed",
-            )),
-            Some((_, ref mut writer)) => writer.write(buf),
+            ))),
+            Some((_, ref mut writer)) => {
+                let pinned_writer = pin!(writer);
+                pinned_writer.poll_write(cx, buf)
+            }
         }
     }
 
-    fn flush(&mut self) -> io::Result<()> {
+    fn poll_flush(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
         match &mut self.task_and_writer {
-            None => Err(io::Error::new(
+            None => Poll::Ready(Err(io::Error::new(
                 io::ErrorKind::NotConnected,
                 "already closed",
-            )),
-            Some((_, ref mut writer)) => writer.flush(),
+            ))),
+            Some((_, ref mut writer)) => {
+                let pinned_writer = pin!(writer);
+                pinned_writer.poll_flush(cx)
+            }
         }
     }
+
+    fn poll_shutdown(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        // TODO(raitobezarius): this might not be a graceful shutdown of the
+        // channel inside the gRPC connection.
+        Poll::Ready(Ok(()))
+    }
 }
 
 #[cfg(test)]
@@ -291,7 +286,6 @@ mod tests {
 
     use tempfile::TempDir;
     use tokio::net::UnixListener;
-    use tokio::task;
     use tokio::time;
     use tokio_stream::wrappers::UnixListenerStream;
 
@@ -358,32 +352,23 @@ mod tests {
     }
 
     /// This uses the correct scheme for a unix socket, and provides a server on the other side.
-    #[tokio::test]
-    async fn test_valid_unix_path_ping_pong() {
+    /// This is not a tokio::test, because spawn two separate tokio runtimes and
+    // want to have explicit control.
+    #[test]
+    fn test_valid_unix_path_ping_pong() {
         let tmpdir = TempDir::new().unwrap();
         let path = tmpdir.path().join("daemon");
 
-        // let mut join_set = JoinSet::new();
-
-        // prepare a client
-        let client = {
-            let mut url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse");
-            url.set_path(path.to_str().unwrap());
-            GRPCBlobService::from_url(&url).expect("must succeed")
-        };
-
-        let path_copy = path.clone();
+        let path_clone = path.clone();
 
         // Spin up a server, in a thread far away, which spawns its own tokio runtime,
         // and blocks on the task.
         thread::spawn(move || {
             // Create the runtime
             let rt = tokio::runtime::Runtime::new().unwrap();
-            // Get a handle from this runtime
-            let handle = rt.handle();
 
-            let task = handle.spawn(async {
-                let uds = UnixListener::bind(path_copy).unwrap();
+            let task = rt.spawn(async {
+                let uds = UnixListener::bind(path_clone).unwrap();
                 let uds_stream = UnixListenerStream::new(uds);
 
                 // spin up a new server
@@ -397,33 +382,46 @@ mod tests {
                 router.serve_with_incoming(uds_stream).await
             });
 
-            handle.block_on(task)
+            rt.block_on(task).unwrap().unwrap();
         });
 
-        // wait for the socket to be created
-        {
-            let mut socket_created = false;
-            for _try in 1..20 {
-                if path.exists() {
-                    socket_created = true;
-                    break;
+        // Now create another tokio runtime which we'll use in the main test code.
+        let rt = tokio::runtime::Runtime::new().unwrap();
+
+        let task = rt.spawn(async move {
+            // wait for the socket to be created
+            {
+                let mut socket_created = false;
+                // TODO: exponential backoff urgently
+                for _try in 1..20 {
+                    if path.exists() {
+                        socket_created = true;
+                        break;
+                    }
+                    tokio::time::sleep(time::Duration::from_millis(20)).await;
                 }
-                tokio::time::sleep(time::Duration::from_millis(20)).await;
+
+                assert!(
+                    socket_created,
+                    "expected socket path to eventually get created, but never happened"
+                );
             }
 
-            assert!(
-                socket_created,
-                "expected socket path to eventually get created, but never happened"
-            );
-        }
+            // prepare a client
+            let client = {
+                let mut url =
+                    url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse");
+                url.set_path(path.to_str().unwrap());
+                GRPCBlobService::from_url(&url).expect("must succeed")
+            };
 
-        let has = task::spawn_blocking(move || {
-            client
+            let has = client
                 .has(&fixtures::BLOB_A_DIGEST)
-                .expect("must not be err")
-        })
-        .await
-        .expect("must not be err");
-        assert!(!has);
+                .await
+                .expect("must not be err");
+
+            assert!(!has);
+        });
+        rt.block_on(task).unwrap()
     }
 }
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs
index 893f27364b80..383127344a17 100644
--- a/tvix/store/src/blobservice/memory.rs
+++ b/tvix/store/src/blobservice/memory.rs
@@ -1,9 +1,11 @@
-use std::io::{self, Cursor};
+use std::io::{self, Cursor, Write};
+use std::task::Poll;
 use std::{
     collections::HashMap,
     sync::{Arc, RwLock},
 };
-use tracing::{instrument, warn};
+use tonic::async_trait;
+use tracing::instrument;
 
 use super::{BlobReader, BlobService, BlobWriter};
 use crate::{B3Digest, Error};
@@ -13,6 +15,7 @@ pub struct MemoryBlobService {
     db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
 }
 
+#[async_trait]
 impl BlobService for MemoryBlobService {
     /// Constructs a [MemoryBlobService] from the passed [url::Url]:
     /// - scheme has to be `memory://`
@@ -31,12 +34,12 @@ impl BlobService for MemoryBlobService {
     }
 
     #[instrument(skip(self, digest), fields(blob.digest=%digest))]
-    fn has(&self, digest: &B3Digest) -> Result<bool, Error> {
+    async fn has(&self, digest: &B3Digest) -> Result<bool, Error> {
         let db = self.db.read().unwrap();
         Ok(db.contains_key(digest))
     }
 
-    fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error> {
+    async fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error> {
         let db = self.db.read().unwrap();
 
         match db.get(digest).map(|x| Cursor::new(x.clone())) {
@@ -46,7 +49,7 @@ impl BlobService for MemoryBlobService {
     }
 
     #[instrument(skip(self))]
-    fn open_write(&self) -> Box<dyn BlobWriter> {
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
         Box::new(MemoryBlobWriter::new(self.db.clone()))
     }
 }
@@ -70,9 +73,13 @@ impl MemoryBlobWriter {
         }
     }
 }
-impl std::io::Write for MemoryBlobWriter {
-    fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
-        match &mut self.writers {
+impl tokio::io::AsyncWrite for MemoryBlobWriter {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+        b: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
+        Poll::Ready(match &mut self.writers {
             None => Err(io::Error::new(
                 io::ErrorKind::NotConnected,
                 "already closed",
@@ -81,22 +88,34 @@ impl std::io::Write for MemoryBlobWriter {
                 let bytes_written = buf.write(b)?;
                 hasher.write(&b[..bytes_written])
             }
-        }
+        })
     }
 
-    fn flush(&mut self) -> std::io::Result<()> {
-        match &mut self.writers {
+    fn poll_flush(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        Poll::Ready(match self.writers {
             None => Err(io::Error::new(
                 io::ErrorKind::NotConnected,
                 "already closed",
             )),
             Some(_) => Ok(()),
-        }
+        })
+    }
+
+    fn poll_shutdown(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        // shutdown is "instantaneous", we only write to memory.
+        Poll::Ready(Ok(()))
     }
 }
 
+#[async_trait]
 impl BlobWriter for MemoryBlobWriter {
-    fn close(&mut self) -> Result<B3Digest, Error> {
+    async fn close(&mut self) -> Result<B3Digest, Error> {
         if self.writers.is_none() {
             match &self.digest {
                 Some(digest) => Ok(digest.clone()),
diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs
index 33cfb113edd3..5ecf25ac1337 100644
--- a/tvix/store/src/blobservice/mod.rs
+++ b/tvix/store/src/blobservice/mod.rs
@@ -1,11 +1,12 @@
 use std::io;
+use tonic::async_trait;
 
 use crate::{B3Digest, Error};
 
-mod dumb_seeker;
 mod from_addr;
 mod grpc;
 mod memory;
+mod naive_seeker;
 mod sled;
 
 #[cfg(test)]
@@ -21,35 +22,41 @@ pub use self::sled::SledBlobService;
 /// a way to get a [io::Read] to a blob, and a method to initiate writing a new
 /// Blob, which will return something implmenting io::Write, and providing a
 /// close funtion, to finalize a blob and get its digest.
+#[async_trait]
 pub trait BlobService: Send + Sync {
     /// Create a new instance by passing in a connection URL.
+    /// TODO: check if we want to make this async, instead of lazily connecting
     fn from_url(url: &url::Url) -> Result<Self, Error>
     where
         Self: Sized;
 
     /// Check if the service has the blob, by its content hash.
-    fn has(&self, digest: &B3Digest) -> Result<bool, Error>;
+    async fn has(&self, digest: &B3Digest) -> Result<bool, Error>;
 
     /// Request a blob from the store, by its content hash.
-    fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error>;
+    async fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error>;
 
     /// Insert a new blob into the store. Returns a [BlobWriter], which
     /// implements [io::Write] and a [BlobWriter::close].
-    fn open_write(&self) -> Box<dyn BlobWriter>;
+    async fn open_write(&self) -> Box<dyn BlobWriter>;
 }
 
-/// A [io::Write] that you need to close() afterwards, and get back the digest
-/// of the written blob.
-pub trait BlobWriter: io::Write + Send + Sync + 'static {
+/// A [tokio::io::AsyncWrite] that you need to close() afterwards, and get back
+/// the digest of the written blob.
+#[async_trait]
+pub trait BlobWriter: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static {
     /// Signal there's no more data to be written, and return the digest of the
     /// contents written.
     ///
     /// Closing a already-closed BlobWriter is a no-op.
-    fn close(&mut self) -> Result<B3Digest, Error>;
+    async fn close(&mut self) -> Result<B3Digest, Error>;
 }
 
-/// A [io::Read] that also allows seeking.
-pub trait BlobReader: io::Read + io::Seek + Send + 'static {}
+/// A [tokio::io::AsyncRead] that also allows seeking.
+pub trait BlobReader:
+    tokio::io::AsyncRead + tokio::io::AsyncSeek + tokio::io::AsyncBufRead + Send + Unpin + 'static
+{
+}
 
 /// A [`io::Cursor<Vec<u8>>`] can be used as a BlobReader.
 impl BlobReader for io::Cursor<Vec<u8>> {}
diff --git a/tvix/store/src/blobservice/naive_seeker.rs b/tvix/store/src/blobservice/naive_seeker.rs
new file mode 100644
index 000000000000..e65a82c7f45a
--- /dev/null
+++ b/tvix/store/src/blobservice/naive_seeker.rs
@@ -0,0 +1,269 @@
+use super::BlobReader;
+use pin_project_lite::pin_project;
+use std::io;
+use std::task::Poll;
+use tokio::io::AsyncRead;
+use tracing::{debug, instrument};
+
+pin_project! {
+    /// This implements [tokio::io::AsyncSeek] for and [tokio::io::AsyncRead] by
+    /// simply skipping over some bytes, keeping track of the position.
+    /// It fails whenever you try to seek backwards.
+    ///
+    /// ## Pinning concerns:
+    ///
+    /// [NaiveSeeker] is itself pinned by callers, and we do not need to concern
+    /// ourselves regarding that.
+    ///
+    /// Though, its fields as per
+    /// <https://doc.rust-lang.org/std/pin/#pinning-is-not-structural-for-field>
+    /// can be pinned or unpinned.
+    ///
+    /// So we need to go over each field and choose our policy carefully.
+    ///
+    /// The obvious cases are the bookkeeping integers we keep in the structure,
+    /// those are private and not shared to anyone, we never build a
+    /// `Pin<&mut X>` out of them at any point, therefore, we can safely never
+    /// mark them as pinned. Of course, it is expected that no developer here
+    /// attempt to `pin!(self.pos)` to pin them because it makes no sense. If
+    /// they have to become pinned, they should be marked `#[pin]` and we need
+    /// to discuss it.
+    ///
+    /// So the bookkeeping integers are in the right state with respect to their
+    /// pinning status. The projection should offer direct access.
+    ///
+    /// On the `r` field, i.e. a `BufReader<R>`, given that
+    /// <https://docs.rs/tokio/latest/tokio/io/struct.BufReader.html#impl-Unpin-for-BufReader%3CR%3E>
+    /// is available, even a `Pin<&mut BufReader<R>>` can be safely moved.
+    ///
+    /// The only care we should have regards the internal reader itself, i.e.
+    /// the `R` instance, see that Tokio decided to `#[pin]` it too:
+    /// <https://docs.rs/tokio/latest/src/tokio/io/util/buf_reader.rs.html#29>
+    ///
+    /// In general, there's no `Unpin` instance for `R: tokio::io::AsyncRead`
+    /// (see <https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html>).
+    ///
+    /// Therefore, we could keep it unpinned and pin it in every call site
+    /// whenever we need to call `poll_*` which can be confusing to the non-
+    /// expert developer and we have a fair share amount of situations where the
+    /// [BufReader] instance is naked, i.e. in its `&mut BufReader<R>`
+    /// form, this is annoying because it could lead to expose the naked `R`
+    /// internal instance somehow and would produce a risk of making it move
+    /// unexpectedly.
+    ///
+    /// We choose the path of the least resistance as we have no reason to have
+    /// access to the raw `BufReader<R>` instance, we just `#[pin]` it too and
+    /// enjoy its `poll_*` safe APIs and push the unpinning concerns to the
+    /// internal implementations themselves, which studied the question longer
+    /// than us.
+    pub struct NaiveSeeker<R: tokio::io::AsyncRead> {
+        #[pin]
+        r: tokio::io::BufReader<R>,
+        pos: u64,
+        bytes_to_skip: u64,
+    }
+}
+
+impl<R: tokio::io::AsyncRead> NaiveSeeker<R> {
+    pub fn new(r: R) -> Self {
+        NaiveSeeker {
+            r: tokio::io::BufReader::new(r),
+            pos: 0,
+            bytes_to_skip: 0,
+        }
+    }
+}
+
+impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> {
+    fn poll_read(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> Poll<std::io::Result<()>> {
+        // The amount of data read can be determined by the increase
+        // in the length of the slice returned by `ReadBuf::filled`.
+        let filled_before = buf.filled().len();
+        let this = self.project();
+        let pos: &mut u64 = this.pos;
+
+        match this.r.poll_read(cx, buf) {
+            Poll::Ready(a) => {
+                let bytes_read = buf.filled().len() - filled_before;
+                *pos += bytes_read as u64;
+
+                Poll::Ready(a)
+            }
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl<R: tokio::io::AsyncRead> tokio::io::AsyncBufRead for NaiveSeeker<R> {
+    fn poll_fill_buf(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<io::Result<&[u8]>> {
+        self.project().r.poll_fill_buf(cx)
+    }
+
+    fn consume(self: std::pin::Pin<&mut Self>, amt: usize) {
+        let this = self.project();
+        this.r.consume(amt);
+        let pos: &mut u64 = this.pos;
+        *pos += amt as u64;
+    }
+}
+
+impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> {
+    #[instrument(skip(self))]
+    fn start_seek(
+        self: std::pin::Pin<&mut Self>,
+        position: std::io::SeekFrom,
+    ) -> std::io::Result<()> {
+        let absolute_offset: u64 = match position {
+            io::SeekFrom::Start(start_offset) => {
+                if start_offset < self.pos {
+                    return Err(io::Error::new(
+                        io::ErrorKind::Unsupported,
+                        format!("can't seek backwards ({} -> {})", self.pos, start_offset),
+                    ));
+                } else {
+                    start_offset
+                }
+            }
+            // we don't know the total size, can't support this.
+            io::SeekFrom::End(_end_offset) => {
+                return Err(io::Error::new(
+                    io::ErrorKind::Unsupported,
+                    "can't seek from end",
+                ));
+            }
+            io::SeekFrom::Current(relative_offset) => {
+                if relative_offset < 0 {
+                    return Err(io::Error::new(
+                        io::ErrorKind::Unsupported,
+                        "can't seek backwards relative to current position",
+                    ));
+                } else {
+                    self.pos + relative_offset as u64
+                }
+            }
+        };
+
+        debug!(absolute_offset=?absolute_offset, "seek");
+
+        // we already know absolute_offset is larger than self.pos
+        debug_assert!(
+            absolute_offset >= self.pos,
+            "absolute_offset {} is larger than self.pos {}",
+            absolute_offset,
+            self.pos
+        );
+
+        // calculate bytes to skip
+        *self.project().bytes_to_skip = absolute_offset - self.pos;
+
+        Ok(())
+    }
+
+    #[instrument(skip(self))]
+    fn poll_complete(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<std::io::Result<u64>> {
+        if self.bytes_to_skip == 0 {
+            // return the new position (from the start of the stream)
+            return Poll::Ready(Ok(self.pos));
+        }
+
+        // discard some bytes, until pos is where we want it to be.
+        // We create a buffer that we'll discard later on.
+        let mut buf = [0; 1024];
+
+        // Loop until we've reached the desired seek position. This is done by issuing repeated
+        // `poll_read` calls. If the data is not available yet, we will yield back to the executor
+        // and wait to be polled again.
+        loop {
+            // calculate the length we want to skip at most, which is either a max
+            // buffer size, or the number of remaining bytes to read, whatever is
+            // smaller.
+            let bytes_to_skip = std::cmp::min(self.bytes_to_skip as usize, buf.len());
+
+            let mut read_buf = tokio::io::ReadBuf::new(&mut buf[..bytes_to_skip]);
+
+            match self.as_mut().poll_read(cx, &mut read_buf) {
+                Poll::Ready(_a) => {
+                    let bytes_read = read_buf.filled().len() as u64;
+
+                    if bytes_read == 0 {
+                        return Poll::Ready(Err(io::Error::new(
+                            io::ErrorKind::UnexpectedEof,
+                            format!(
+                                "tried to skip {} bytes, but only was able to skip {} until reaching EOF",
+                                bytes_to_skip, bytes_read
+                            ),
+                        )));
+                    }
+
+                    // calculate bytes to skip
+                    let bytes_to_skip = self.bytes_to_skip - bytes_read;
+
+                    *self.as_mut().project().bytes_to_skip = bytes_to_skip;
+
+                    if bytes_to_skip == 0 {
+                        return Poll::Ready(Ok(self.pos));
+                    }
+                }
+                Poll::Pending => return Poll::Pending,
+            };
+        }
+    }
+}
+
+impl<R: tokio::io::AsyncRead + Send + Unpin + 'static> BlobReader for NaiveSeeker<R> {}
+
+#[cfg(test)]
+mod tests {
+    use super::NaiveSeeker;
+    use std::io::{Cursor, SeekFrom};
+    use tokio::io::{AsyncReadExt, AsyncSeekExt};
+
+    /// This seek requires multiple `poll_read` as we use a 1024 bytes internal
+    /// buffer when doing the seek.
+    /// This ensures we don't hang indefinitely.
+    #[tokio::test]
+    async fn seek() {
+        let buf = vec![0u8; 4096];
+        let reader = Cursor::new(&buf);
+        let mut seeker = NaiveSeeker::new(reader);
+        seeker.seek(SeekFrom::Start(4000)).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn seek_read() {
+        let mut buf = vec![0u8; 2048];
+        buf.extend_from_slice(&[1u8; 2048]);
+        buf.extend_from_slice(&[2u8; 2048]);
+
+        let reader = Cursor::new(&buf);
+        let mut seeker = NaiveSeeker::new(reader);
+
+        let mut read_buf = vec![0u8; 1024];
+        seeker.read_exact(&mut read_buf).await.expect("must read");
+        assert_eq!(read_buf.as_slice(), &[0u8; 1024]);
+
+        seeker
+            .seek(SeekFrom::Current(1024))
+            .await
+            .expect("must seek");
+        seeker.read_exact(&mut read_buf).await.expect("must read");
+        assert_eq!(read_buf.as_slice(), &[1u8; 1024]);
+
+        seeker
+            .seek(SeekFrom::Start(2 * 2048))
+            .await
+            .expect("must seek");
+        seeker.read_exact(&mut read_buf).await.expect("must read");
+        assert_eq!(read_buf.as_slice(), &[2u8; 1024]);
+    }
+}
diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs
index 00291ba88717..209f0b76fc7a 100644
--- a/tvix/store/src/blobservice/sled.rs
+++ b/tvix/store/src/blobservice/sled.rs
@@ -1,9 +1,11 @@
 use super::{BlobReader, BlobService, BlobWriter};
 use crate::{B3Digest, Error};
 use std::{
-    io::{self, Cursor},
+    io::{self, Cursor, Write},
     path::PathBuf,
+    task::Poll,
 };
+use tonic::async_trait;
 use tracing::instrument;
 
 #[derive(Clone)]
@@ -27,6 +29,7 @@ impl SledBlobService {
     }
 }
 
+#[async_trait]
 impl BlobService for SledBlobService {
     /// Constructs a [SledBlobService] from the passed [url::Url]:
     /// - scheme has to be `sled://`
@@ -57,7 +60,7 @@ impl BlobService for SledBlobService {
     }
 
     #[instrument(skip(self), fields(blob.digest=%digest))]
-    fn has(&self, digest: &B3Digest) -> Result<bool, Error> {
+    async fn has(&self, digest: &B3Digest) -> Result<bool, Error> {
         match self.db.contains_key(digest.to_vec()) {
             Ok(has) => Ok(has),
             Err(e) => Err(Error::StorageError(e.to_string())),
@@ -65,7 +68,7 @@ impl BlobService for SledBlobService {
     }
 
     #[instrument(skip(self), fields(blob.digest=%digest))]
-    fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error> {
+    async fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error> {
         match self.db.get(digest.to_vec()) {
             Ok(None) => Ok(None),
             Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))),
@@ -74,7 +77,7 @@ impl BlobService for SledBlobService {
     }
 
     #[instrument(skip(self))]
-    fn open_write(&self) -> Box<dyn BlobWriter> {
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
         Box::new(SledBlobWriter::new(self.db.clone()))
     }
 }
@@ -99,9 +102,13 @@ impl SledBlobWriter {
     }
 }
 
-impl io::Write for SledBlobWriter {
-    fn write(&mut self, b: &[u8]) -> io::Result<usize> {
-        match &mut self.writers {
+impl tokio::io::AsyncWrite for SledBlobWriter {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+        b: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
+        Poll::Ready(match &mut self.writers {
             None => Err(io::Error::new(
                 io::ErrorKind::NotConnected,
                 "already closed",
@@ -110,22 +117,34 @@ impl io::Write for SledBlobWriter {
                 let bytes_written = buf.write(b)?;
                 hasher.write(&b[..bytes_written])
             }
-        }
+        })
     }
 
-    fn flush(&mut self) -> io::Result<()> {
-        match &mut self.writers {
+    fn poll_flush(
+        mut self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        Poll::Ready(match &mut self.writers {
             None => Err(io::Error::new(
                 io::ErrorKind::NotConnected,
                 "already closed",
             )),
             Some(_) => Ok(()),
-        }
+        })
+    }
+
+    fn poll_shutdown(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        // shutdown is "instantaneous", we only write to a Vec<u8> as buffer.
+        Poll::Ready(Ok(()))
     }
 }
 
+#[async_trait]
 impl BlobWriter for SledBlobWriter {
-    fn close(&mut self) -> Result<B3Digest, Error> {
+    async fn close(&mut self) -> Result<B3Digest, Error> {
         if self.writers.is_none() {
             match &self.digest {
                 Some(digest) => Ok(digest.clone()),
diff --git a/tvix/store/src/blobservice/tests.rs b/tvix/store/src/blobservice/tests.rs
index ec7a618fab58..501270780cf4 100644
--- a/tvix/store/src/blobservice/tests.rs
+++ b/tvix/store/src/blobservice/tests.rs
@@ -1,6 +1,9 @@
 use std::io;
+use std::pin::pin;
 
 use test_case::test_case;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncSeekExt;
 
 use super::B3Digest;
 use super::BlobService;
@@ -24,19 +27,25 @@ fn gen_sled_blob_service() -> impl BlobService {
 #[test_case(gen_memory_blob_service(); "memory")]
 #[test_case(gen_sled_blob_service(); "sled")]
 fn has_nonexistent_false(blob_service: impl BlobService) {
-    assert!(!blob_service
-        .has(&fixtures::BLOB_A_DIGEST)
-        .expect("must not fail"));
+    tokio::runtime::Runtime::new().unwrap().block_on(async {
+        assert!(!blob_service
+            .has(&fixtures::BLOB_A_DIGEST)
+            .await
+            .expect("must not fail"));
+    })
 }
 
 /// Trying to read a non-existing blob should return a None instead of a reader.
 #[test_case(gen_memory_blob_service(); "memory")]
 #[test_case(gen_sled_blob_service(); "sled")]
 fn not_found_read(blob_service: impl BlobService) {
-    assert!(blob_service
-        .open_read(&fixtures::BLOB_A_DIGEST)
-        .expect("must not fail")
-        .is_none())
+    tokio::runtime::Runtime::new().unwrap().block_on(async {
+        assert!(blob_service
+            .open_read(&fixtures::BLOB_A_DIGEST)
+            .await
+            .expect("must not fail")
+            .is_none())
+    })
 }
 
 /// Put a blob in the store, check has, get it back.
@@ -46,165 +55,192 @@ fn not_found_read(blob_service: impl BlobService) {
 #[test_case(gen_memory_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "memory-big")]
 #[test_case(gen_sled_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "sled-big")]
 fn put_has_get(blob_service: impl BlobService, blob_contents: &[u8], blob_digest: &B3Digest) {
-    let mut w = blob_service.open_write();
+    tokio::runtime::Runtime::new().unwrap().block_on(async {
+        let mut w = blob_service.open_write().await;
 
-    let l = io::copy(&mut io::Cursor::new(blob_contents), &mut w).expect("copy must succeed");
-    assert_eq!(
-        blob_contents.len(),
-        l as usize,
-        "written bytes must match blob length"
-    );
+        let l = tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut w)
+            .await
+            .expect("copy must succeed");
+        assert_eq!(
+            blob_contents.len(),
+            l as usize,
+            "written bytes must match blob length"
+        );
 
-    let digest = w.close().expect("close must succeed");
+        let digest = w.close().await.expect("close must succeed");
 
-    assert_eq!(*blob_digest, digest, "returned digest must be correct");
+        assert_eq!(*blob_digest, digest, "returned digest must be correct");
 
-    assert!(
-        blob_service.has(blob_digest).expect("must not fail"),
-        "blob service should now have the blob"
-    );
+        assert!(
+            blob_service.has(blob_digest).await.expect("must not fail"),
+            "blob service should now have the blob"
+        );
 
-    let mut r = blob_service
-        .open_read(blob_digest)
-        .expect("open_read must succeed")
-        .expect("must be some");
+        let mut r = blob_service
+            .open_read(blob_digest)
+            .await
+            .expect("open_read must succeed")
+            .expect("must be some");
 
-    let mut buf: Vec<u8> = Vec::new();
-    let l = io::copy(&mut r, &mut buf).expect("copy must succeed");
+        let mut buf: Vec<u8> = Vec::new();
+        let mut pinned_reader = pin!(r);
+        let l = tokio::io::copy(&mut pinned_reader, &mut buf)
+            .await
+            .expect("copy must succeed");
+        // let l = io::copy(&mut r, &mut buf).expect("copy must succeed");
 
-    assert_eq!(
-        blob_contents.len(),
-        l as usize,
-        "read bytes must match blob length"
-    );
+        assert_eq!(
+            blob_contents.len(),
+            l as usize,
+            "read bytes must match blob length"
+        );
 
-    assert_eq!(blob_contents, buf, "read blob contents must match");
+        assert_eq!(blob_contents, buf, "read blob contents must match");
+    })
 }
 
 /// Put a blob in the store, and seek inside it a bit.
 #[test_case(gen_memory_blob_service(); "memory")]
 #[test_case(gen_sled_blob_service(); "sled")]
 fn put_seek(blob_service: impl BlobService) {
-    let mut w = blob_service.open_write();
-
-    io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w).expect("copy must succeed");
-    w.close().expect("close must succeed");
-
-    // open a blob for reading
-    let mut r = blob_service
-        .open_read(&fixtures::BLOB_B_DIGEST)
-        .expect("open_read must succeed")
-        .expect("must be some");
+    tokio::runtime::Runtime::new().unwrap().block_on(async {
+        let mut w = blob_service.open_write().await;
 
-    let mut pos: u64 = 0;
+        tokio::io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w)
+            .await
+            .expect("copy must succeed");
+        w.close().await.expect("close must succeed");
 
-    // read the first 10 bytes, they must match the data in the fixture.
-    {
-        let mut buf = [0; 10];
-        r.read_exact(&mut buf).expect("must succeed");
+        // open a blob for reading
+        let mut r = blob_service
+            .open_read(&fixtures::BLOB_B_DIGEST)
+            .await
+            .expect("open_read must succeed")
+            .expect("must be some");
 
-        assert_eq!(
-            &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
-            buf,
-            "expected first 10 bytes to match"
-        );
+        let mut pos: u64 = 0;
 
-        pos += buf.len() as u64;
-    }
-    // seek by 0 bytes, using SeekFrom::Start.
-    let p = r.seek(io::SeekFrom::Start(pos)).expect("must not fail");
-    assert_eq!(pos, p);
-
-    // read the next 10 bytes, they must match the data in the fixture.
-    {
-        let mut buf = [0; 10];
-        r.read_exact(&mut buf).expect("must succeed");
+        // read the first 10 bytes, they must match the data in the fixture.
+        {
+            let mut buf = [0; 10];
+            r.read_exact(&mut buf).await.expect("must succeed");
 
-        assert_eq!(
-            &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
-            buf,
-            "expected data to match"
-        );
+            assert_eq!(
+                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+                buf,
+                "expected first 10 bytes to match"
+            );
 
-        pos += buf.len() as u64;
-    }
+            pos += buf.len() as u64;
+        }
+        // seek by 0 bytes, using SeekFrom::Start.
+        let p = r
+            .seek(io::SeekFrom::Start(pos))
+            .await
+            .expect("must not fail");
+        assert_eq!(pos, p);
+
+        // read the next 10 bytes, they must match the data in the fixture.
+        {
+            let mut buf = [0; 10];
+            r.read_exact(&mut buf).await.expect("must succeed");
 
-    // seek by 5 bytes, using SeekFrom::Start.
-    let p = r.seek(io::SeekFrom::Start(pos + 5)).expect("must not fail");
-    pos += 5;
-    assert_eq!(pos, p);
+            assert_eq!(
+                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+                buf,
+                "expected data to match"
+            );
 
-    // read the next 10 bytes, they must match the data in the fixture.
-    {
-        let mut buf = [0; 10];
-        r.read_exact(&mut buf).expect("must succeed");
+            pos += buf.len() as u64;
+        }
 
-        assert_eq!(
-            &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
-            buf,
-            "expected data to match"
-        );
+        // seek by 5 bytes, using SeekFrom::Start.
+        let p = r
+            .seek(io::SeekFrom::Start(pos + 5))
+            .await
+            .expect("must not fail");
+        pos += 5;
+        assert_eq!(pos, p);
 
-        pos += buf.len() as u64;
-    }
+        // read the next 10 bytes, they must match the data in the fixture.
+        {
+            let mut buf = [0; 10];
+            r.read_exact(&mut buf).await.expect("must succeed");
 
-    // seek by 12345 bytes, using SeekFrom::
-    let p = r.seek(io::SeekFrom::Current(12345)).expect("must not fail");
-    pos += 12345;
-    assert_eq!(pos, p);
+            assert_eq!(
+                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+                buf,
+                "expected data to match"
+            );
 
-    // read the next 10 bytes, they must match the data in the fixture.
-    {
-        let mut buf = [0; 10];
-        r.read_exact(&mut buf).expect("must succeed");
+            pos += buf.len() as u64;
+        }
 
-        assert_eq!(
-            &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
-            buf,
-            "expected data to match"
-        );
+        // seek by 12345 bytes, using SeekFrom::
+        let p = r
+            .seek(io::SeekFrom::Current(12345))
+            .await
+            .expect("must not fail");
+        pos += 12345;
+        assert_eq!(pos, p);
 
-        #[allow(unused_assignments)]
+        // read the next 10 bytes, they must match the data in the fixture.
         {
-            pos += buf.len() as u64;
+            let mut buf = [0; 10];
+            r.read_exact(&mut buf).await.expect("must succeed");
+
+            assert_eq!(
+                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+                buf,
+                "expected data to match"
+            );
+
+            #[allow(unused_assignments)]
+            {
+                pos += buf.len() as u64;
+            }
         }
-    }
 
-    // seeking to the end is okay…
-    let p = r
-        .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64))
-        .expect("must not fail");
-    pos = fixtures::BLOB_B.len() as u64;
-    assert_eq!(pos, p);
+        // seeking to the end is okay…
+        let p = r
+            .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64))
+            .await
+            .expect("must not fail");
+        pos = fixtures::BLOB_B.len() as u64;
+        assert_eq!(pos, p);
 
-    {
-        // but it returns no more data.
-        let mut buf: Vec<u8> = Vec::new();
-        r.read_to_end(&mut buf).expect("must not fail");
-        assert!(buf.is_empty(), "expected no more data to be read");
-    }
-
-    // seeking past the end…
-    match r.seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1)) {
-        // should either be ok, but then return 0 bytes.
-        // this matches the behaviour or a Cursor<Vec<u8>>.
-        Ok(_pos) => {
+        {
+            // but it returns no more data.
             let mut buf: Vec<u8> = Vec::new();
-            r.read_to_end(&mut buf).expect("must not fail");
+            r.read_to_end(&mut buf).await.expect("must not fail");
             assert!(buf.is_empty(), "expected no more data to be read");
         }
-        // or not be okay.
-        Err(_) => {}
-    }
 
-    // TODO: this is only broken for the gRPC version
-    // We expect seeking backwards or relative to the end to fail.
-    // r.seek(io::SeekFrom::Current(-1))
-    //     .expect_err("SeekFrom::Current(-1) expected to fail");
+        // seeking past the end…
+        match r
+            .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1))
+            .await
+        {
+            // should either be ok, but then return 0 bytes.
+            // this matches the behaviour or a Cursor<Vec<u8>>.
+            Ok(_pos) => {
+                let mut buf: Vec<u8> = Vec::new();
+                r.read_to_end(&mut buf).await.expect("must not fail");
+                assert!(buf.is_empty(), "expected no more data to be read");
+            }
+            // or not be okay.
+            Err(_) => {}
+        }
+
+        // TODO: this is only broken for the gRPC version
+        // We expect seeking backwards or relative to the end to fail.
+        // r.seek(io::SeekFrom::Current(-1))
+        //     .expect_err("SeekFrom::Current(-1) expected to fail");
 
-    // r.seek(io::SeekFrom::Start(pos - 1))
-    //     .expect_err("SeekFrom::Start(pos-1) expected to fail");
+        // r.seek(io::SeekFrom::Start(pos - 1))
+        //     .expect_err("SeekFrom::Start(pos-1) expected to fail");
 
-    // r.seek(io::SeekFrom::End(0))
-    //     .expect_err("SeekFrom::End(_) expected to fail");
+        // r.seek(io::SeekFrom::End(0))
+        //     .expect_err("SeekFrom::End(_) expected to fail");
+    })
 }
diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs
index 36e5b10d307d..fea4191400f3 100644
--- a/tvix/store/src/directoryservice/mod.rs
+++ b/tvix/store/src/directoryservice/mod.rs
@@ -49,7 +49,7 @@ pub trait DirectoryService: Send + Sync {
 /// The consumer can periodically call [DirectoryPutter::put], starting from the
 /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to
 /// retrieve the root digest (or an error).
-pub trait DirectoryPutter {
+pub trait DirectoryPutter: Send {
     /// Put a individual [proto::Directory] into the store.
     /// Error semantics and behaviour is up to the specific implementation of
     /// this trait.
diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs
index 0015abb9d557..978fd50e2634 100644
--- a/tvix/store/src/fuse/mod.rs
+++ b/tvix/store/src/fuse/mod.rs
@@ -18,11 +18,12 @@ use crate::{
 };
 use fuser::{FileAttr, ReplyAttr, Request};
 use nix_compat::store_path::StorePath;
-use std::io::{self, Read, Seek};
+use std::io;
 use std::os::unix::ffi::OsStrExt;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::{collections::HashMap, time::Duration};
+use tokio::io::{AsyncBufReadExt, AsyncSeekExt};
 use tracing::{debug, info_span, warn};
 
 use self::inode_tracker::InodeTracker;
@@ -79,6 +80,8 @@ pub struct FUSE {
     file_handles: HashMap<u64, Box<dyn BlobReader>>,
 
     next_file_handle: u64,
+
+    tokio_handle: tokio::runtime::Handle,
 }
 
 impl FUSE {
@@ -100,6 +103,7 @@ impl FUSE {
 
             file_handles: Default::default(),
             next_file_handle: 1,
+            tokio_handle: tokio::runtime::Handle::current(),
         }
     }
 
@@ -430,6 +434,7 @@ impl fuser::Filesystem for FUSE {
             reply.error(libc::ENOSYS);
             return;
         }
+
         // lookup the inode
         match *self.inode_tracker.get(ino).unwrap() {
             // read is invalid on non-files.
@@ -441,7 +446,16 @@ impl fuser::Filesystem for FUSE {
                 let span = info_span!("read", blob.digest = %blob_digest);
                 let _enter = span.enter();
 
-                match self.blob_service.open_read(blob_digest) {
+                let blob_service = self.blob_service.clone();
+                let blob_digest = blob_digest.clone();
+
+                let task = self
+                    .tokio_handle
+                    .spawn(async move { blob_service.open_read(&blob_digest).await });
+
+                let blob_reader = self.tokio_handle.block_on(task).unwrap();
+
+                match blob_reader {
                     Ok(None) => {
                         warn!("blob not found");
                         reply.error(libc::EIO);
@@ -451,6 +465,7 @@ impl fuser::Filesystem for FUSE {
                         reply.error(libc::EIO);
                     }
                     Ok(Some(blob_reader)) => {
+                        debug!("add file handle {}", fh);
                         self.file_handles.insert(fh, blob_reader);
                         reply.opened(fh, 0);
 
@@ -477,9 +492,14 @@ impl fuser::Filesystem for FUSE {
         reply: fuser::ReplyEmpty,
     ) {
         // remove and get ownership on the blob reader
-        let blob_reader = self.file_handles.remove(&fh).unwrap();
-        // drop it, which will close it.
-        drop(blob_reader);
+        match self.file_handles.remove(&fh) {
+            // drop it, which will close it.
+            Some(blob_reader) => drop(blob_reader),
+            None => {
+                // These might already be dropped if a read error occured.
+                debug!("file_handle {} not found", fh);
+            }
+        }
 
         reply.ok();
     }
@@ -498,29 +518,70 @@ impl fuser::Filesystem for FUSE {
     ) {
         debug!("read");
 
-        let blob_reader = self.file_handles.get_mut(&fh).unwrap();
-
-        // seek to the offset specified, which is relative to the start of the file.
-        let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64));
-        match resp {
-            Ok(pos) => {
-                debug_assert_eq!(offset as u64, pos);
-            }
-            Err(e) => {
-                warn!("failed to seek to offset {}: {}", offset, e);
+        // We need to take out the blob reader from self.file_handles, so we can
+        // interact with it in the separate task.
+        // On success, we pass it back out of the task, so we can put it back in self.file_handles.
+        let mut blob_reader = match self.file_handles.remove(&fh) {
+            Some(blob_reader) => blob_reader,
+            None => {
+                warn!("file handle {} unknown", fh);
                 reply.error(libc::EIO);
                 return;
             }
-        }
+        };
 
-        // now with the blobreader seeked to this location, read size of data
-        let data: std::io::Result<Vec<u8>> =
-            blob_reader.bytes().take(size.try_into().unwrap()).collect();
+        let task = self.tokio_handle.spawn(async move {
+            // seek to the offset specified, which is relative to the start of the file.
+            let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await;
 
-        match data {
-            // respond with the requested data
-            Ok(data) => reply.data(&data),
-            Err(e) => reply.error(e.raw_os_error().unwrap()),
+            match resp {
+                Ok(pos) => {
+                    debug_assert_eq!(offset as u64, pos);
+                }
+                Err(e) => {
+                    warn!("failed to seek to offset {}: {}", offset, e);
+                    return Err(libc::EIO);
+                }
+            }
+
+            // As written in the fuser docs, read should send exactly the number
+            // of bytes requested except on EOF or error.
+
+            let mut buf: Vec<u8> = Vec::with_capacity(size as usize);
+
+            while (buf.len() as u64) < size as u64 {
+                match blob_reader.fill_buf().await {
+                    Ok(int_buf) => {
+                        // copy things from the internal buffer into buf to fill it till up until size
+
+                        // an empty buffer signals we reached EOF.
+                        if int_buf.is_empty() {
+                            break;
+                        }
+
+                        // calculate how many bytes we can read from int_buf.
+                        // It's either all of int_buf, or the number of bytes missing in buf to reach size.
+                        let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len());
+
+                        // copy these bytes into our buffer
+                        buf.extend_from_slice(&int_buf[..len_to_copy]);
+                        // and consume them in the buffered reader.
+                        blob_reader.consume(len_to_copy);
+                    }
+                    Err(e) => return Err(e.raw_os_error().unwrap()),
+                }
+            }
+            Ok((buf, blob_reader))
+        });
+
+        let resp = self.tokio_handle.block_on(task).unwrap();
+
+        match resp {
+            Err(e) => reply.error(e),
+            Ok((buf, blob_reader)) => {
+                reply.data(&buf);
+                self.file_handles.insert(fh, blob_reader);
+            }
         }
     }
 
diff --git a/tvix/store/src/fuse/tests.rs b/tvix/store/src/fuse/tests.rs
index 2c99f75471a5..29856433b38c 100644
--- a/tvix/store/src/fuse/tests.rs
+++ b/tvix/store/src/fuse/tests.rs
@@ -1,8 +1,8 @@
-use std::fs;
 use std::io::Cursor;
 use std::os::unix::prelude::MetadataExt;
 use std::path::Path;
 use std::sync::Arc;
+use std::{fs, io};
 
 use tempfile::TempDir;
 
@@ -21,34 +21,25 @@ const SYMLINK_NAME2: &str = "44444444444444444444444444444444-test";
 const DIRECTORY_WITH_KEEP_NAME: &str = "22222222222222222222222222222222-test";
 const DIRECTORY_COMPLICATED_NAME: &str = "33333333333333333333333333333333-test";
 
-fn setup_and_mount<P: AsRef<Path>, F>(
-    mountpoint: P,
-    setup_fn: F,
-) -> Result<fuser::BackgroundSession, std::io::Error>
-where
-    F: Fn(Arc<dyn BlobService>, Arc<dyn DirectoryService>, Arc<dyn PathInfoService>),
-{
-    setup_and_mount_with_listing(mountpoint, setup_fn, false)
-}
-
-fn setup_and_mount_with_listing<P: AsRef<Path>, F>(
-    mountpoint: P,
-    setup_fn: F,
-    list_root: bool,
-) -> Result<fuser::BackgroundSession, std::io::Error>
-where
-    F: Fn(Arc<dyn BlobService>, Arc<dyn DirectoryService>, Arc<dyn PathInfoService>),
-{
+fn gen_svcs() -> (
+    Arc<dyn BlobService>,
+    Arc<dyn DirectoryService>,
+    Arc<dyn PathInfoService>,
+) {
     let blob_service = gen_blob_service();
     let directory_service = gen_directory_service();
     let path_info_service = gen_pathinfo_service(blob_service.clone(), directory_service.clone());
 
-    setup_fn(
-        blob_service.clone(),
-        directory_service.clone(),
-        path_info_service.clone(),
-    );
+    (blob_service, directory_service, path_info_service)
+}
 
+fn do_mount<P: AsRef<Path>>(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+    path_info_service: Arc<dyn PathInfoService>,
+    mountpoint: P,
+    list_root: bool,
+) -> io::Result<fuser::BackgroundSession> {
     let fs = FUSE::new(
         blob_service,
         directory_service,
@@ -58,16 +49,17 @@ where
     fuser::spawn_mount2(fs, mountpoint, &[])
 }
 
-fn populate_blob_a(
-    blob_service: Arc<dyn BlobService>,
-    _directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+async fn populate_blob_a(
+    blob_service: &Arc<dyn BlobService>,
+    _directory_service: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // Upload BLOB_A
-    let mut bw = blob_service.open_write();
-    std::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw)
+    let mut bw = blob_service.open_write().await;
+    tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw)
+        .await
         .expect("must succeed uploading");
-    bw.close().expect("must succeed closing");
+    bw.close().await.expect("must succeed closing");
 
     // Create a PathInfo for it
     let path_info = PathInfo {
@@ -84,16 +76,17 @@ fn populate_blob_a(
     path_info_service.put(path_info).expect("must succeed");
 }
 
-fn populate_blob_b(
-    blob_service: Arc<dyn BlobService>,
-    _directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+async fn populate_blob_b(
+    blob_service: &Arc<dyn BlobService>,
+    _directory_service: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // Upload BLOB_B
-    let mut bw = blob_service.open_write();
-    std::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw)
+    let mut bw = blob_service.open_write().await;
+    tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw)
+        .await
         .expect("must succeed uploading");
-    bw.close().expect("must succeed closing");
+    bw.close().await.expect("must succeed closing");
 
     // Create a PathInfo for it
     let path_info = PathInfo {
@@ -111,9 +104,9 @@ fn populate_blob_b(
 }
 
 fn populate_symlink(
-    _blob_service: Arc<dyn BlobService>,
-    _directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+    _blob_service: &Arc<dyn BlobService>,
+    _directory_service: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // Create a PathInfo for it
     let path_info = PathInfo {
@@ -131,9 +124,9 @@ fn populate_symlink(
 /// This writes a symlink pointing to /nix/store/somewhereelse,
 /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED.
 fn populate_symlink2(
-    _blob_service: Arc<dyn BlobService>,
-    _directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+    _blob_service: &Arc<dyn BlobService>,
+    _directory_service: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // Create a PathInfo for it
     let path_info = PathInfo {
@@ -148,16 +141,16 @@ fn populate_symlink2(
     path_info_service.put(path_info).expect("must succeed");
 }
 
-fn populate_directory_with_keep(
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+async fn populate_directory_with_keep(
+    blob_service: &Arc<dyn BlobService>,
+    directory_service: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // upload empty blob
-    let mut bw = blob_service.open_write();
+    let mut bw = blob_service.open_write().await;
     assert_eq!(
         fixtures::EMPTY_BLOB_DIGEST.to_vec(),
-        bw.close().expect("must succeed closing").to_vec(),
+        bw.close().await.expect("must succeed closing").to_vec(),
     );
 
     // upload directory
@@ -182,9 +175,9 @@ fn populate_directory_with_keep(
 /// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory
 /// itself.
 fn populate_pathinfo_without_directory(
-    _: Arc<dyn BlobService>,
-    _: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+    _: &Arc<dyn BlobService>,
+    _: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // upload pathinfo
     let path_info = PathInfo {
@@ -202,9 +195,9 @@ fn populate_pathinfo_without_directory(
 
 /// Insert , but don't provide the blob .keep is pointing to
 fn populate_blob_a_without_blob(
-    _: Arc<dyn BlobService>,
-    _: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+    _: &Arc<dyn BlobService>,
+    _: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // Create a PathInfo for blob A
     let path_info = PathInfo {
@@ -221,16 +214,16 @@ fn populate_blob_a_without_blob(
     path_info_service.put(path_info).expect("must succeed");
 }
 
-fn populate_directory_complicated(
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+async fn populate_directory_complicated(
+    blob_service: &Arc<dyn BlobService>,
+    directory_service: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // upload empty blob
-    let mut bw = blob_service.open_write();
+    let mut bw = blob_service.open_write().await;
     assert_eq!(
         fixtures::EMPTY_BLOB_DIGEST.to_vec(),
-        bw.close().expect("must succeed closing").to_vec(),
+        bw.close().await.expect("must succeed closing").to_vec(),
     );
 
     // upload inner directory
@@ -258,8 +251,8 @@ fn populate_directory_complicated(
 }
 
 /// Ensure mounting itself doesn't fail
-#[test]
-fn mount() {
+#[tokio::test]
+async fn mount() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -268,14 +261,22 @@ fn mount() {
 
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |_, _, _| {}).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     fuser_session.join()
 }
 
 /// Ensure listing the root isn't allowed
-#[test]
-fn root() {
+#[tokio::test]
+async fn root() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -283,7 +284,15 @@ fn root() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |_, _, _| {}).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     {
         // read_dir succeeds, but getting the first element will fail.
@@ -297,8 +306,8 @@ fn root() {
 }
 
 /// Ensure listing the root is allowed if configured explicitly
-#[test]
-fn root_with_listing() {
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn root_with_listing() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -306,8 +315,17 @@ fn root_with_listing() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount_with_listing(tmpdir.path(), populate_blob_a, true).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        true, /* allow listing */
+    )
+    .expect("must succeed");
 
     {
         // read_dir succeeds, but getting the first element will fail.
@@ -325,8 +343,8 @@ fn root_with_listing() {
 }
 
 /// Ensure we can stat a file at the root
-#[test]
-fn stat_file_at_root() {
+#[tokio::test]
+async fn stat_file_at_root() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -334,7 +352,17 @@ fn stat_file_at_root() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_a).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(BLOB_A_NAME);
 
@@ -349,8 +377,8 @@ fn stat_file_at_root() {
 }
 
 /// Ensure we can read a file at the root
-#[test]
-fn read_file_at_root() {
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn read_file_at_root() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -358,7 +386,17 @@ fn read_file_at_root() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_a).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(BLOB_A_NAME);
 
@@ -373,8 +411,8 @@ fn read_file_at_root() {
 }
 
 /// Ensure we can read a large file at the root
-#[test]
-fn read_large_file_at_root() {
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn read_large_file_at_root() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -382,7 +420,17 @@ fn read_large_file_at_root() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_b).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_b(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(BLOB_B_NAME);
     {
@@ -405,8 +453,8 @@ fn read_large_file_at_root() {
 }
 
 /// Read the target of a symlink
-#[test]
-fn symlink_readlink() {
+#[tokio::test]
+async fn symlink_readlink() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -414,7 +462,18 @@ fn symlink_readlink() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), populate_symlink).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_symlink(&blob_service, &directory_service, &path_info_service);
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
     let p = tmpdir.path().join(SYMLINK_NAME);
 
     let target = fs::read_link(&p).expect("must succeed");
@@ -437,8 +496,8 @@ fn symlink_readlink() {
 }
 
 /// Read and stat a regular file through a symlink pointing to it.
-#[test]
-fn read_stat_through_symlink() {
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn read_stat_through_symlink() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -446,10 +505,17 @@ fn read_stat_through_symlink() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| {
-        populate_blob_a(bs.clone(), ds.clone(), ps.clone());
-        populate_symlink(bs, ds, ps);
-    })
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
+    populate_symlink(&blob_service, &directory_service, &path_info_service);
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
     .expect("must succeed");
 
     let p_symlink = tmpdir.path().join(SYMLINK_NAME);
@@ -473,8 +539,8 @@ fn read_stat_through_symlink() {
 }
 
 /// Read a directory in the root, and validate some attributes.
-#[test]
-fn read_stat_directory() {
+#[tokio::test]
+async fn read_stat_directory() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -482,8 +548,17 @@ fn read_stat_directory() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_directory_with_keep).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
 
@@ -495,9 +570,9 @@ fn read_stat_directory() {
     fuser_session.join()
 }
 
-#[test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 /// Read a blob inside a directory. This ensures we successfully populate directory data.
-fn read_blob_inside_dir() {
+async fn read_blob_inside_dir() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -505,8 +580,17 @@ fn read_blob_inside_dir() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_directory_with_keep).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep");
 
@@ -522,10 +606,10 @@ fn read_blob_inside_dir() {
     fuser_session.join()
 }
 
-#[test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 /// Read a blob inside a directory inside a directory. This ensures we properly
 /// populate directories as we traverse down the structure.
-fn read_blob_deep_inside_dir() {
+async fn read_blob_deep_inside_dir() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -533,8 +617,17 @@ fn read_blob_deep_inside_dir() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir
         .path()
@@ -555,8 +648,8 @@ fn read_blob_deep_inside_dir() {
 }
 
 /// Ensure readdir works.
-#[test]
-fn readdir() {
+#[tokio::test]
+async fn readdir() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -564,8 +657,17 @@ fn readdir() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME);
 
@@ -601,9 +703,9 @@ fn readdir() {
     fuser_session.join()
 }
 
-#[test]
+#[tokio::test]
 /// Do a readdir deeper inside a directory, without doing readdir or stat in the parent directory.
-fn readdir_deep() {
+async fn readdir_deep() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -611,8 +713,17 @@ fn readdir_deep() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep");
 
@@ -636,8 +747,8 @@ fn readdir_deep() {
 }
 
 /// Check attributes match how they show up in /nix/store normally.
-#[test]
-fn check_attributes() {
+#[tokio::test]
+async fn check_attributes() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -645,11 +756,18 @@ fn check_attributes() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| {
-        populate_blob_a(bs.clone(), ds.clone(), ps.clone());
-        populate_directory_with_keep(bs.clone(), ds.clone(), ps.clone());
-        populate_symlink(bs, ds, ps);
-    })
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
+    populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
+    populate_symlink(&blob_service, &directory_service, &path_info_service);
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
     .expect("must succeed");
 
     let p_file = tmpdir.path().join(BLOB_A_NAME);
@@ -689,10 +807,10 @@ fn check_attributes() {
     fuser_session.join()
 }
 
-#[test]
+#[tokio::test]
 /// Ensure we allocate the same inodes for the same directory contents.
 /// $DIRECTORY_COMPLICATED_NAME/keep contains the same data as $DIRECTORY_WITH_KEEP.
-fn compare_inodes_directories() {
+async fn compare_inodes_directories() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -700,10 +818,17 @@ fn compare_inodes_directories() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| {
-        populate_directory_with_keep(bs.clone(), ds.clone(), ps.clone());
-        populate_directory_complicated(bs, ds, ps);
-    })
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
+    populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
     .expect("must succeed");
 
     let p_dir_with_keep = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
@@ -720,8 +845,8 @@ fn compare_inodes_directories() {
 
 /// Ensure we allocate the same inodes for the same directory contents.
 /// $DIRECTORY_COMPLICATED_NAME/keep/,keep contains the same data as $DIRECTORY_COMPLICATED_NAME/.keep
-#[test]
-fn compare_inodes_files() {
+#[tokio::test]
+async fn compare_inodes_files() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -729,8 +854,17 @@ fn compare_inodes_files() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p_keep1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join(".keep");
     let p_keep2 = tmpdir
@@ -750,8 +884,8 @@ fn compare_inodes_files() {
 
 /// Ensure we allocate the same inode for symlinks pointing to the same targets.
 /// $DIRECTORY_COMPLICATED_NAME/aa points to the same target as SYMLINK_NAME2.
-#[test]
-fn compare_inodes_symlinks() {
+#[tokio::test]
+async fn compare_inodes_symlinks() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -759,10 +893,17 @@ fn compare_inodes_symlinks() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| {
-        populate_directory_complicated(bs.clone(), ds.clone(), ps.clone());
-        populate_symlink2(bs, ds, ps);
-    })
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
+    populate_symlink2(&blob_service, &directory_service, &path_info_service);
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
     .expect("must succeed");
 
     let p1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("aa");
@@ -778,8 +919,8 @@ fn compare_inodes_symlinks() {
 }
 
 /// Check we match paths exactly.
-#[test]
-fn read_wrong_paths_in_root() {
+#[tokio::test]
+async fn read_wrong_paths_in_root() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -787,7 +928,17 @@ fn read_wrong_paths_in_root() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_a).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     // wrong name
     assert!(!tmpdir
@@ -817,8 +968,8 @@ fn read_wrong_paths_in_root() {
 }
 
 /// Make sure writes are not allowed
-#[test]
-fn disallow_writes() {
+#[tokio::test]
+async fn disallow_writes() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -827,7 +978,16 @@ fn disallow_writes() {
 
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |_, _, _| {}).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(BLOB_A_NAME);
     let e = std::fs::File::create(p).expect_err("must fail");
@@ -837,17 +997,26 @@ fn disallow_writes() {
     fuser_session.join()
 }
 
-#[test]
+#[tokio::test]
 /// Ensure we get an IO error if the directory service does not have the Directory object.
-fn missing_directory() {
+async fn missing_directory() {
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
         return;
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_pathinfo_without_directory).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service);
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
 
@@ -871,17 +1040,26 @@ fn missing_directory() {
     fuser_session.join()
 }
 
-#[test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 /// Ensure we get an IO error if the blob service does not have the blob
-fn missing_blob() {
+async fn missing_blob() {
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
         return;
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_blob_a_without_blob).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service);
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(BLOB_A_NAME);
 
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index cd3dc01cfe0a..6764eaddb457 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -6,8 +6,6 @@ use std::sync::Arc;
 use std::{
     collections::HashMap,
     fmt::Debug,
-    fs::File,
-    io,
     os::unix::prelude::PermissionsExt,
     path::{Path, PathBuf},
 };
@@ -57,7 +55,7 @@ impl From<super::Error> for Error {
 //
 // It assumes the caller adds returned nodes to the directories it assembles.
 #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
-fn process_entry(
+async fn process_entry(
     blob_service: Arc<dyn BlobService>,
     directory_putter: &mut Box<dyn DirectoryPutter>,
     entry: &walkdir::DirEntry,
@@ -102,16 +100,17 @@ fn process_entry(
             .metadata()
             .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
 
-        let mut file = File::open(entry.path())
+        let mut file = tokio::fs::File::open(entry.path())
+            .await
             .map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?;
 
-        let mut writer = blob_service.open_write();
+        let mut writer = blob_service.open_write().await;
 
-        if let Err(e) = io::copy(&mut file, &mut writer) {
+        if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
             return Err(Error::UnableToRead(entry.path().to_path_buf(), e));
         };
 
-        let digest = writer.close()?;
+        let digest = writer.close().await?;
 
         return Ok(proto::node::Node::File(proto::FileNode {
             name: entry.file_name().as_bytes().to_vec().into(),
@@ -137,7 +136,7 @@ fn process_entry(
 /// caller to possibly register it somewhere (and potentially rename it based on
 /// some naming scheme.
 #[instrument(skip(blob_service, directory_service), fields(path=?p))]
-pub fn ingest_path<P: AsRef<Path> + Debug>(
+pub async fn ingest_path<P: AsRef<Path> + Debug>(
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
     p: P,
@@ -175,7 +174,8 @@ pub fn ingest_path<P: AsRef<Path> + Debug>(
             &mut directory_putter,
             &entry,
             maybe_directory,
-        )?;
+        )
+        .await?;
 
         if entry.depth() == 0 {
             return Ok(node);
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index cc5af488abf8..4255148fc5e8 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -7,10 +7,8 @@ use crate::{
 use count_write::CountWrite;
 use nix_compat::nar;
 use sha2::{Digest, Sha256};
-use std::{
-    io::{self, BufReader},
-    sync::Arc,
-};
+use std::{io, sync::Arc};
+use tokio::io::BufReader;
 use tracing::warn;
 
 /// Invoke [write_nar], and return the size and sha256 digest of the produced
@@ -75,8 +73,11 @@ fn walk_node(
                 ))
             })?;
 
-            let mut blob_reader = match blob_service
-                .open_read(&digest)
+            // HACK: blob_service is async, but this function isn't async yet..
+            let tokio_handle = tokio::runtime::Handle::current();
+
+            let blob_reader = match tokio_handle
+                .block_on(async { blob_service.open_read(&digest).await })
                 .map_err(RenderError::StoreError)?
             {
                 Some(blob_reader) => Ok(BufReader::new(blob_reader)),
@@ -90,7 +91,7 @@ fn walk_node(
                 .file(
                     proto_file_node.executable,
                     proto_file_node.size.into(),
-                    &mut blob_reader,
+                    &mut tokio_util::io::SyncIoBridge::new(blob_reader),
                 )
                 .map_err(RenderError::NARWriterError)?;
         }
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index 2d8c396539d8..8bd3083c17ad 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,4 +1,6 @@
-use crate::{blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead};
+use crate::blobservice::BlobService;
+use core::pin::pin;
+use futures::TryFutureExt;
 use std::{
     collections::VecDeque,
     io,
@@ -6,7 +8,6 @@ use std::{
     pin::Pin,
     sync::Arc,
 };
-use tokio::task;
 use tokio_stream::StreamExt;
 use tokio_util::io::ReaderStream;
 use tonic::{async_trait, Request, Response, Status, Streaming};
@@ -103,7 +104,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
             return Err(Status::internal("not implemented"));
         }
 
-        match self.blob_service.has(&req_digest) {
+        match self.blob_service.has(&req_digest).await {
             Ok(true) => Ok(Response::new(super::BlobMeta::default())),
             Ok(false) => Err(Status::not_found(format!("blob {} not found", &req_digest))),
             Err(e) => Err(e.into()),
@@ -122,13 +123,8 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
             .try_into()
             .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
 
-        match self.blob_service.open_read(&req_digest) {
+        match self.blob_service.open_read(&req_digest).await {
             Ok(Some(reader)) => {
-                let async_reader: SyncReadIntoAsyncRead<
-                    _,
-                    BytesMutWithDefaultCapacity<{ 100 * 1024 }>,
-                > = reader.into();
-
                 fn stream_mapper(
                     x: Result<bytes::Bytes, io::Error>,
                 ) -> Result<super::BlobChunk, Status> {
@@ -138,7 +134,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
                     }
                 }
 
-                let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper);
+                let chunks_stream = ReaderStream::new(reader).map(stream_mapper);
                 Ok(Response::new(Box::pin(chunks_stream)))
             }
             Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))),
@@ -158,35 +154,28 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
                 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
         });
 
-        let data_reader = tokio_util::io::StreamReader::new(data_stream);
-
-        // prepare a writer, which we'll use in the blocking task below.
-        let mut writer = self.blob_service.open_write();
+        let mut data_reader = tokio_util::io::StreamReader::new(data_stream);
 
-        let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> {
-            // construct a sync reader to the data
-            let mut reader = tokio_util::io::SyncIoBridge::new(data_reader);
+        let mut blob_writer = pin!(self.blob_service.open_write().await);
 
-            io::copy(&mut reader, &mut writer).map_err(|e| {
+        tokio::io::copy(&mut data_reader, &mut blob_writer)
+            .await
+            .map_err(|e| {
                 warn!("error copying: {}", e);
                 Status::internal("error copying")
             })?;
 
-            let digest = writer
-                .close()
-                .map_err(|e| {
-                    warn!("error closing stream: {}", e);
-                    Status::internal("error closing stream")
-                })?
-                .to_vec();
-
-            Ok(super::PutBlobResponse {
-                digest: digest.into(),
+        let digest = blob_writer
+            .close()
+            .map_err(|e| {
+                warn!("error closing stream: {}", e);
+                Status::internal("error closing stream")
             })
-        })
-        .await
-        .map_err(|_| Status::internal("failed to wait for task"))??;
+            .await?
+            .to_vec();
 
-        Ok(Response::new(result))
+        Ok(Response::new(super::PutBlobResponse {
+            digest: digest.into(),
+        }))
     }
 }
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index 16a2fd51d0df..33861d9ffa4e 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -71,10 +71,12 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
         match request.into_inner().node {
             None => Err(Status::invalid_argument("no root node sent")),
             Some(root_node) => {
-                let (nar_size, nar_sha256) = self
-                    .path_info_service
-                    .calculate_nar(&root_node)
-                    .expect("error during nar calculation"); // TODO: handle error
+                let path_info_service = self.path_info_service.clone();
+                let (nar_size, nar_sha256) =
+                    task::spawn_blocking(move || path_info_service.calculate_nar(&root_node))
+                        .await
+                        .unwrap()
+                        .expect("error during nar calculation"); // TODO: handle error
 
                 Ok(Response::new(proto::CalculateNarResponse {
                     nar_size,
diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs
index 044769ce579d..97a2694ac3de 100644
--- a/tvix/store/src/proto/mod.rs
+++ b/tvix/store/src/proto/mod.rs
@@ -12,8 +12,6 @@ mod grpc_blobservice_wrapper;
 mod grpc_directoryservice_wrapper;
 mod grpc_pathinfoservice_wrapper;
 
-mod sync_read_into_async_read;
-
 pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper;
 pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper;
 pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper;
diff --git a/tvix/store/src/proto/sync_read_into_async_read.rs b/tvix/store/src/proto/sync_read_into_async_read.rs
deleted file mode 100644
index 0a0ef019781c..000000000000
--- a/tvix/store/src/proto/sync_read_into_async_read.rs
+++ /dev/null
@@ -1,158 +0,0 @@
-use bytes::Buf;
-use core::task::Poll::Ready;
-use futures::ready;
-use futures::Future;
-use std::io;
-use std::io::Read;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::Context;
-use std::task::Poll;
-use tokio::io::AsyncRead;
-use tokio::runtime::Handle;
-use tokio::sync::Mutex;
-use tokio::task::JoinHandle;
-
-#[derive(Debug)]
-enum State<Buf: bytes::Buf + bytes::BufMut> {
-    Idle(Option<Buf>),
-    Busy(JoinHandle<(io::Result<usize>, Buf)>),
-}
-
-use State::{Busy, Idle};
-
-/// Use a [`SyncReadIntoAsyncRead`] to asynchronously read from a
-/// synchronous API.
-#[derive(Debug)]
-pub struct SyncReadIntoAsyncRead<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> {
-    state: Mutex<State<Buf>>,
-    reader: Arc<Mutex<R>>,
-    rt: Handle,
-}
-
-impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> SyncReadIntoAsyncRead<R, Buf> {
-    /// This must be called from within a Tokio runtime context, or else it will panic.
-    #[track_caller]
-    pub fn new(rt: Handle, reader: R) -> Self {
-        Self {
-            rt,
-            state: State::Idle(None).into(),
-            reader: Arc::new(reader.into()),
-        }
-    }
-
-    /// This must be called from within a Tokio runtime context, or else it will panic.
-    pub fn new_with_reader(readable: R) -> Self {
-        Self::new(Handle::current(), readable)
-    }
-}
-
-/// Repeats operations that are interrupted.
-macro_rules! uninterruptibly {
-    ($e:expr) => {{
-        loop {
-            match $e {
-                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
-                res => break res,
-            }
-        }
-    }};
-}
-
-impl<
-        R: Read + Send + 'static + std::marker::Unpin,
-        Buf: bytes::Buf + bytes::BufMut + Send + Default + std::marker::Unpin + 'static,
-    > AsyncRead for SyncReadIntoAsyncRead<R, Buf>
-{
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        dst: &mut tokio::io::ReadBuf<'_>,
-    ) -> Poll<io::Result<()>> {
-        let me = self.get_mut();
-        // Do we need this mutex?
-        let state = me.state.get_mut();
-
-        loop {
-            match state {
-                Idle(ref mut buf_cell) => {
-                    let mut buf = buf_cell.take().unwrap_or_default();
-
-                    if buf.has_remaining() {
-                        // Here, we will split the `buf` into `[..dst.remaining()... ; rest ]`
-                        // The `rest` is stuffed into the `buf_cell` for further poll_read.
-                        // The other is completely consumed into the unfilled destination.
-                        // `rest` can be empty.
-                        let mut adjusted_src =
-                            buf.copy_to_bytes(std::cmp::min(buf.remaining(), dst.remaining()));
-                        let copied_size = adjusted_src.remaining();
-                        adjusted_src.copy_to_slice(dst.initialize_unfilled_to(copied_size));
-                        dst.set_filled(copied_size);
-                        *buf_cell = Some(buf);
-                        return Ready(Ok(()));
-                    }
-
-                    let reader = me.reader.clone();
-                    *state = Busy(me.rt.spawn_blocking(move || {
-                        let result = uninterruptibly!(reader.blocking_lock().read(
-                            // SAFETY: `reader.read` will *ONLY* write initialized bytes
-                            // and never *READ* uninitialized bytes
-                            // inside this buffer.
-                            //
-                            // Furthermore, casting the slice as `*mut [u8]`
-                            // is safe because it has the same layout.
-                            //
-                            // Finally, the pointer obtained is valid and owned
-                            // by `buf` only as we have a valid mutable reference
-                            // to it, it is valid for write.
-                            //
-                            // Here, we copy an nightly API: https://doc.rust-lang.org/stable/src/core/mem/maybe_uninit.rs.html#994-998
-                            unsafe {
-                                &mut *(buf.chunk_mut().as_uninit_slice_mut()
-                                    as *mut [std::mem::MaybeUninit<u8>]
-                                    as *mut [u8])
-                            }
-                        ));
-
-                        if let Ok(n) = result {
-                            // SAFETY: given we initialize `n` bytes, we can move `n` bytes
-                            // forward.
-                            unsafe {
-                                buf.advance_mut(n);
-                            }
-                        }
-
-                        (result, buf)
-                    }));
-                }
-                Busy(ref mut rx) => {
-                    let (result, mut buf) = ready!(Pin::new(rx).poll(cx))?;
-
-                    match result {
-                        Ok(n) => {
-                            if n > 0 {
-                                let remaining = std::cmp::min(n, dst.remaining());
-                                let mut adjusted_src = buf.copy_to_bytes(remaining);
-                                adjusted_src.copy_to_slice(dst.initialize_unfilled_to(remaining));
-                                dst.advance(remaining);
-                            }
-                            *state = Idle(Some(buf));
-                            return Ready(Ok(()));
-                        }
-                        Err(e) => {
-                            *state = Idle(None);
-                            return Ready(Err(e));
-                        }
-                    }
-                }
-            }
-        }
-    }
-}
-
-impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> From<R> for SyncReadIntoAsyncRead<R, Buf> {
-    /// This must be called from within a Tokio runtime context, or else it will panic.
-    fn from(value: R) -> Self {
-        Self::new_with_reader(value)
-    }
-}
diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs
index ccaa4f4e4244..45b9c3440d89 100644
--- a/tvix/store/src/tests/import.rs
+++ b/tvix/store/src/tests/import.rs
@@ -9,8 +9,8 @@ use tempfile::TempDir;
 use std::os::unix::ffi::OsStrExt;
 
 #[cfg(target_family = "unix")]
-#[test]
-fn symlink() {
+#[tokio::test]
+async fn symlink() {
     let tmpdir = TempDir::new().unwrap();
 
     std::fs::create_dir_all(&tmpdir).unwrap();
@@ -25,6 +25,7 @@ fn symlink() {
         gen_directory_service(),
         tmpdir.path().join("doesntmatter"),
     )
+    .await
     .expect("must succeed");
 
     assert_eq!(
@@ -36,8 +37,8 @@ fn symlink() {
     )
 }
 
-#[test]
-fn single_file() {
+#[tokio::test]
+async fn single_file() {
     let tmpdir = TempDir::new().unwrap();
 
     std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap();
@@ -49,6 +50,7 @@ fn single_file() {
         gen_directory_service(),
         tmpdir.path().join("root"),
     )
+    .await
     .expect("must succeed");
 
     assert_eq!(
@@ -62,12 +64,12 @@ fn single_file() {
     );
 
     // ensure the blob has been uploaded
-    assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).unwrap());
+    assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
 }
 
 #[cfg(target_family = "unix")]
-#[test]
-fn complicated() {
+#[tokio::test]
+async fn complicated() {
     let tmpdir = TempDir::new().unwrap();
 
     // File ``.keep`
@@ -87,6 +89,7 @@ fn complicated() {
         directory_service.clone(),
         tmpdir.path(),
     )
+    .await
     .expect("must succeed");
 
     // ensure root_node matched expectations
@@ -116,5 +119,5 @@ fn complicated() {
         .is_some());
 
     // ensure EMPTY_BLOB_CONTENTS has been uploaded
-    assert!(blob_service.has(&EMPTY_BLOB_DIGEST).unwrap());
+    assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
 }
diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs
index 102bf5e7ce24..22dbd7bcba9e 100644
--- a/tvix/store/src/tests/nar_renderer.rs
+++ b/tvix/store/src/tests/nar_renderer.rs
@@ -28,22 +28,26 @@ fn single_symlink() {
 }
 
 /// Make sure the NARRenderer fails if a referred blob doesn't exist.
-#[test]
-fn single_file_missing_blob() {
+#[tokio::test]
+async fn single_file_missing_blob() {
     let mut buf: Vec<u8> = vec![];
 
-    let e = write_nar(
-        &mut buf,
-        &crate::proto::node::Node::File(FileNode {
-            name: "doesntmatter".into(),
-            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-            size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
-            executable: false,
-        }),
-        // the blobservice is empty intentionally, to provoke the error.
-        gen_blob_service(),
-        gen_directory_service(),
-    )
+    let e = tokio::task::spawn_blocking(move || {
+        write_nar(
+            &mut buf,
+            &crate::proto::node::Node::File(FileNode {
+                name: "doesntmatter".into(),
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
+                executable: false,
+            }),
+            // the blobservice is empty intentionally, to provoke the error.
+            gen_blob_service(),
+            gen_directory_service(),
+        )
+    })
+    .await
+    .unwrap()
     .expect_err("must fail");
 
     match e {
@@ -56,34 +60,43 @@ fn single_file_missing_blob() {
 
 /// Make sure the NAR Renderer fails if the returned blob meta has another size
 /// than specified in the proto node.
-#[test]
-fn single_file_wrong_blob_size() {
+#[tokio::test]
+async fn single_file_wrong_blob_size() {
     let blob_service = gen_blob_service();
 
     // insert blob into the store
-    let mut writer = blob_service.open_write();
-    io::copy(
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(
         &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()),
         &mut writer,
     )
+    .await
     .unwrap();
-    assert_eq!(HELLOWORLD_BLOB_DIGEST.clone(), writer.close().unwrap());
+    assert_eq!(
+        HELLOWORLD_BLOB_DIGEST.clone(),
+        writer.close().await.unwrap()
+    );
 
+    let bs = blob_service.clone();
     // Test with a root FileNode of a too big size
     {
         let mut buf: Vec<u8> = vec![];
 
-        let e = write_nar(
-            &mut buf,
-            &crate::proto::node::Node::File(FileNode {
-                name: "doesntmatter".into(),
-                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-                size: 42, // <- note the wrong size here!
-                executable: false,
-            }),
-            blob_service.clone(),
-            gen_directory_service(),
-        )
+        let e = tokio::task::spawn_blocking(move || {
+            write_nar(
+                &mut buf,
+                &crate::proto::node::Node::File(FileNode {
+                    name: "doesntmatter".into(),
+                    digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                    size: 42, // <- note the wrong size here!
+                    executable: false,
+                }),
+                bs,
+                gen_directory_service(),
+            )
+        })
+        .await
+        .unwrap()
         .expect_err("must fail");
 
         match e {
@@ -94,22 +107,27 @@ fn single_file_wrong_blob_size() {
         }
     }
 
+    let bs = blob_service.clone();
     // Test with a root FileNode of a too small size
     {
         let mut buf: Vec<u8> = vec![];
 
-        let e = write_nar(
-            &mut buf,
-            &crate::proto::node::Node::File(FileNode {
-                name: "doesntmatter".into(),
-                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-                size: 2, // <- note the wrong size here!
-                executable: false,
-            }),
-            blob_service,
-            gen_directory_service(),
-        )
-        .expect_err("must fail");
+        let e = tokio::task::spawn_blocking(move || {
+            write_nar(
+                &mut buf,
+                &crate::proto::node::Node::File(FileNode {
+                    name: "doesntmatter".into(),
+                    digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                    size: 2, // <- note the wrong size here!
+                    executable: false,
+                }),
+                bs,
+                gen_directory_service(),
+            )
+            .expect_err("must fail")
+        })
+        .await
+        .unwrap();
 
         match e {
             crate::nar::RenderError::NARWriterError(e) => {
@@ -120,51 +138,63 @@ fn single_file_wrong_blob_size() {
     }
 }
 
-#[test]
-fn single_file() {
+#[tokio::test]
+async fn single_file() {
     let blob_service = gen_blob_service();
 
     // insert blob into the store
-    let mut writer = blob_service.open_write();
-    io::copy(
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(
         &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.clone()),
         &mut writer,
     )
+    .await
     .unwrap();
-    assert_eq!(HELLOWORLD_BLOB_DIGEST.clone(), writer.close().unwrap());
+
+    assert_eq!(
+        HELLOWORLD_BLOB_DIGEST.clone(),
+        writer.close().await.unwrap()
+    );
 
     let mut buf: Vec<u8> = vec![];
 
-    write_nar(
-        &mut buf,
-        &crate::proto::node::Node::File(FileNode {
-            name: "doesntmatter".into(),
-            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-            size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
-            executable: false,
-        }),
-        blob_service,
-        gen_directory_service(),
-    )
-    .expect("must succeed");
+    let buf = tokio::task::spawn_blocking(move || {
+        write_nar(
+            &mut buf,
+            &crate::proto::node::Node::File(FileNode {
+                name: "doesntmatter".into(),
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
+                executable: false,
+            }),
+            blob_service,
+            gen_directory_service(),
+        )
+        .expect("must succeed");
+
+        buf
+    })
+    .await
+    .unwrap();
 
     assert_eq!(buf, NAR_CONTENTS_HELLOWORLD.to_vec());
 }
 
-#[test]
-fn test_complicated() {
+#[tokio::test]
+async fn test_complicated() {
     let blob_service = gen_blob_service();
     let directory_service = gen_directory_service();
 
     // put all data into the stores.
     // insert blob into the store
-    let mut writer = blob_service.open_write();
-    io::copy(
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(
         &mut io::Cursor::new(EMPTY_BLOB_CONTENTS.clone()),
         &mut writer,
     )
+    .await
     .unwrap();
-    assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().unwrap());
+    assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().await.unwrap());
 
     directory_service.put(DIRECTORY_WITH_KEEP.clone()).unwrap();
     directory_service
@@ -173,30 +203,44 @@ fn test_complicated() {
 
     let mut buf: Vec<u8> = vec![];
 
-    write_nar(
-        &mut buf,
-        &crate::proto::node::Node::Directory(DirectoryNode {
-            name: "doesntmatter".into(),
-            digest: DIRECTORY_COMPLICATED.digest().into(),
-            size: DIRECTORY_COMPLICATED.size(),
-        }),
-        blob_service.clone(),
-        directory_service.clone(),
-    )
-    .expect("must succeed");
+    let bs = blob_service.clone();
+    let ds = directory_service.clone();
+
+    let buf = tokio::task::spawn_blocking(move || {
+        write_nar(
+            &mut buf,
+            &crate::proto::node::Node::Directory(DirectoryNode {
+                name: "doesntmatter".into(),
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            }),
+            bs,
+            ds,
+        )
+        .expect("must succeed");
+        buf
+    })
+    .await
+    .unwrap();
 
     assert_eq!(buf, NAR_CONTENTS_COMPLICATED.to_vec());
 
     // ensure calculate_nar does return the correct sha256 digest and sum.
-    let (nar_size, nar_digest) = calculate_size_and_sha256(
-        &crate::proto::node::Node::Directory(DirectoryNode {
-            name: "doesntmatter".into(),
-            digest: DIRECTORY_COMPLICATED.digest().into(),
-            size: DIRECTORY_COMPLICATED.size(),
-        }),
-        blob_service,
-        directory_service,
-    )
+    let bs = blob_service.clone();
+    let ds = directory_service.clone();
+    let (nar_size, nar_digest) = tokio::task::spawn_blocking(move || {
+        calculate_size_and_sha256(
+            &crate::proto::node::Node::Directory(DirectoryNode {
+                name: "doesntmatter".into(),
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            }),
+            bs,
+            ds,
+        )
+    })
+    .await
+    .unwrap()
     .expect("must succeed");
 
     assert_eq!(NAR_CONTENTS_COMPLICATED.len() as u64, nar_size);