diff options
author | Florian Klink <flokli@flokli.de> | 2023-09-13T12·20+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-09-18T10·33+0000 |
commit | da6cbb4a459d02111c44a67d3d0dd7e654abff23 (patch) | |
tree | 5efce82d3d9aea94cf6d3712a3fdbb7d168e4552 /tvix/cli/src | |
parent | 3de96017640b6dc25f1544a1bafd4b370bb1cea0 (diff) |
refactor(tvix/store/blobsvc): make BlobStore async r/6606
We previously kept the trait of a BlobService sync. This however had some annoying consequences: - It became more and more complicated to track when we're in a context with an async runtime in the context or not, producing bugs like https://b.tvl.fyi/issues/304 - The sync trait shielded away async clients from async worloads, requiring manual block_on code inside the gRPC client code, and spawn_blocking calls in consumers of the trait, even if they were async (like the gRPC server) - We had to write our own custom glue code (SyncReadIntoAsyncRead) to convert a sync io::Read into a tokio::io::AsyncRead, which already existed in tokio internally, but upstream ia hesitant to expose. This now makes the BlobService trait async (via the async_trait macro, like we already do in various gRPC parts), and replaces the sync readers and writers with their async counterparts. Tests interacting with a BlobService now need to have an async runtime available, the easiest way for this is to mark the test functions with the tokio::test macro, allowing us to directly .await in the test function. In places where we don't have an async runtime available from context (like tvix-cli), we can pass one down explicitly. Now that we don't provide a sync interface anymore, the (sync) FUSE library now holds a pointer to a tokio runtime handle, and needs to at least have 2 threads available when talking to a blob service (which is why some of the tests now use the multi_thread flavor). The FUSE tests got a bit more verbose, as we couldn't use the setup_and_mount function accepting a callback anymore. We can hopefully move some of the test fixture setup to rstest in the future to make this less repetitive. Co-Authored-By: Connor Brewster <cbrewster@hey.com> Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329 Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/cli/src')
-rw-r--r-- | tvix/cli/src/main.rs | 9 | ||||
-rw-r--r-- | tvix/cli/src/tvix_store_io.rs | 175 |
2 files changed, 107 insertions, 77 deletions
diff --git a/tvix/cli/src/main.rs b/tvix/cli/src/main.rs index 1980ac1731e7..65970e1d1cd8 100644 --- a/tvix/cli/src/main.rs +++ b/tvix/cli/src/main.rs @@ -80,9 +80,16 @@ fn interpret(code: &str, path: Option<PathBuf>, args: &Args, explain: bool) -> b directory_service.clone(), )); + let tokio_runtime = tokio::runtime::Runtime::new().unwrap(); + eval.io_handle = Box::new(tvix_io::TvixIO::new( known_paths.clone(), - TvixStoreIO::new(blob_service, directory_service, path_info_service), + TvixStoreIO::new( + blob_service, + directory_service, + path_info_service, + tokio_runtime.handle().clone(), + ), )); // bundle fetchurl.nix (used in nixpkgs) by resolving <nix> to diff --git a/tvix/cli/src/tvix_store_io.rs b/tvix/cli/src/tvix_store_io.rs index 2c673385342f..1a373a705fe4 100644 --- a/tvix/cli/src/tvix_store_io.rs +++ b/tvix/cli/src/tvix_store_io.rs @@ -2,6 +2,7 @@ use nix_compat::store_path::{self, StorePath}; use std::{io, path::Path, path::PathBuf, sync::Arc}; +use tokio::io::AsyncReadExt; use tracing::{error, instrument, warn}; use tvix_eval::{EvalIO, FileType, StdIO}; @@ -27,6 +28,7 @@ pub struct TvixStoreIO { directory_service: Arc<dyn DirectoryService>, path_info_service: Arc<dyn PathInfoService>, std_io: StdIO, + tokio_handle: tokio::runtime::Handle, } impl TvixStoreIO { @@ -34,12 +36,14 @@ impl TvixStoreIO { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, path_info_service: Arc<dyn PathInfoService>, + tokio_handle: tokio::runtime::Handle, ) -> Self { Self { blob_service, directory_service, path_info_service, std_io: StdIO {}, + tokio_handle, } } @@ -86,65 +90,6 @@ impl TvixStoreIO { sub_path, )?) } - - /// Imports a given path on the filesystem into the store, and returns the - /// [PathInfo] describing the path, that was sent to - /// [PathInfoService]. - /// While not part of the [EvalIO], it's still useful for clients who - /// care about the [PathInfo]. - #[instrument(skip(self), ret, err)] - pub fn import_path_with_pathinfo(&self, path: &std::path::Path) -> Result<PathInfo, io::Error> { - // Call [import::ingest_path], which will walk over the given path and return a root_node. - let root_node = import::ingest_path( - self.blob_service.clone(), - self.directory_service.clone(), - path, - ) - .expect("error during import_path"); - - // Render the NAR - let (nar_size, nar_sha256) = calculate_size_and_sha256( - &root_node, - self.blob_service.clone(), - self.directory_service.clone(), - ) - .expect("error during nar calculation"); // TODO: handle error - - // TODO: make a path_to_name helper function? - let name = path - .file_name() - .expect("path must not be ..") - .to_str() - .expect("path must be valid unicode"); - - let output_path = store_path::build_nar_based_store_path(&nar_sha256, name); - - // assemble a new root_node with a name that is derived from the nar hash. - let root_node = root_node.rename(output_path.to_string().into_bytes().into()); - - // assemble the [PathInfo] object. - let path_info = PathInfo { - node: Some(tvix_store::proto::Node { - node: Some(root_node), - }), - // There's no reference scanning on path contents ingested like this. - references: vec![], - narinfo: Some(NarInfo { - nar_size, - nar_sha256: nar_sha256.to_vec().into(), - signatures: vec![], - reference_names: vec![], - // TODO: narinfo for talosctl.src contains `CA: fixed:r:sha256:1x13j5hy75221bf6kz7cpgld9vgic6bqx07w5xjs4pxnksj6lxb6` - // do we need this anywhere? - }), - }; - - // put into [PathInfoService], and return the [PathInfo] that we get - // back from there (it might contain additional signatures). - let path_info = self.path_info_service.put(path_info)?; - - Ok(path_info) - } } impl EvalIO for TvixStoreIO { @@ -197,24 +142,33 @@ impl EvalIO for TvixStoreIO { ) })?; - let reader = { - let resp = self.blob_service.open_read(&digest)?; - match resp { - Some(blob_reader) => blob_reader, - None => { - error!( - blob.digest = %digest, - "blob not found", - ); - Err(io::Error::new( - io::ErrorKind::NotFound, - format!("blob {} not found", &digest), - ))? + let blob_service = self.blob_service.clone(); + + let task = self.tokio_handle.spawn(async move { + let mut reader = { + let resp = blob_service.open_read(&digest).await?; + match resp { + Some(blob_reader) => blob_reader, + None => { + error!( + blob.digest = %digest, + "blob not found", + ); + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("blob {} not found", &digest), + ))? + } } - } - }; + }; - io::read_to_string(reader) + let mut buf = String::new(); + + reader.read_to_string(&mut buf).await?; + Ok(buf) + }); + + self.tokio_handle.block_on(task).unwrap() } Node::Symlink(_symlink_node) => Err(io::Error::new( io::ErrorKind::Unsupported, @@ -296,7 +250,16 @@ impl EvalIO for TvixStoreIO { #[instrument(skip(self), ret, err)] fn import_path(&self, path: &std::path::Path) -> Result<PathBuf, std::io::Error> { - let path_info = self.import_path_with_pathinfo(path)?; + let p = path.to_owned(); + let blob_service = self.blob_service.clone(); + let directory_service = self.directory_service.clone(); + let path_info_service = self.path_info_service.clone(); + + let task = self.tokio_handle.spawn(async move { + import_path_with_pathinfo(blob_service, directory_service, path_info_service, &p).await + }); + + let path_info = self.tokio_handle.block_on(task).unwrap()?; // from the [PathInfo], extract the store path (as string). Ok({ @@ -320,3 +283,63 @@ impl EvalIO for TvixStoreIO { Some("/nix/store".to_string()) } } + +/// Imports a given path on the filesystem into the store, and returns the +/// [PathInfo] describing the path, that was sent to +/// [PathInfoService]. +#[instrument(skip(blob_service, directory_service, path_info_service), ret, err)] +async fn import_path_with_pathinfo( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + path_info_service: Arc<dyn PathInfoService>, + path: &std::path::Path, +) -> Result<PathInfo, io::Error> { + // Call [import::ingest_path], which will walk over the given path and return a root_node. + let root_node = import::ingest_path(blob_service.clone(), directory_service.clone(), path) + .await + .expect("error during import_path"); + + // Render the NAR. This is blocking. + let calc_task = tokio::task::spawn_blocking(move || { + let (nar_size, nar_sha256) = + calculate_size_and_sha256(&root_node, blob_service.clone(), directory_service.clone()) + .expect("error during nar calculation"); // TODO: handle error + (nar_size, nar_sha256, root_node) + }); + let (nar_size, nar_sha256, root_node) = calc_task.await.unwrap(); + + // TODO: make a path_to_name helper function? + let name = path + .file_name() + .expect("path must not be ..") + .to_str() + .expect("path must be valid unicode"); + + let output_path = store_path::build_nar_based_store_path(&nar_sha256, name); + + // assemble a new root_node with a name that is derived from the nar hash. + let root_node = root_node.rename(output_path.to_string().into_bytes().into()); + + // assemble the [PathInfo] object. + let path_info = PathInfo { + node: Some(tvix_store::proto::Node { + node: Some(root_node), + }), + // There's no reference scanning on path contents ingested like this. + references: vec![], + narinfo: Some(NarInfo { + nar_size, + nar_sha256: nar_sha256.to_vec().into(), + signatures: vec![], + reference_names: vec![], + // TODO: narinfo for talosctl.src contains `CA: fixed:r:sha256:1x13j5hy75221bf6kz7cpgld9vgic6bqx07w5xjs4pxnksj6lxb6` + // do we need this anywhere? + }), + }; + + // put into [PathInfoService], and return the [PathInfo] that we get + // back from there (it might contain additional signatures). + let path_info = path_info_service.put(path_info)?; + + Ok(path_info) +} |