From 37a348b4fae16b2b1c5ec12deaa085a049833d7f Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Tue, 19 Sep 2023 11:46:41 -0500 Subject: refactor(tvix/store): Asyncify PathInfoService and DirectoryService We've decided to asyncify all of the services to reduce some of the pains going back and for between sync<->async. The end goal will be for all the tvix-store internals to be async and then expose a sync interface for things like tvix eval io. Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369 Autosubmit: Connor Brewster Tested-by: BuildkiteCI Reviewed-by: flokli --- tvix/cli/src/tvix_store_io.rs | 70 ++++++++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 31 deletions(-) (limited to 'tvix/cli') diff --git a/tvix/cli/src/tvix_store_io.rs b/tvix/cli/src/tvix_store_io.rs index 1a373a705fe4..1ea718f1a188 100644 --- a/tvix/cli/src/tvix_store_io.rs +++ b/tvix/cli/src/tvix_store_io.rs @@ -57,12 +57,15 @@ impl TvixStoreIO { store_path: &StorePath, sub_path: &Path, ) -> Result, io::Error> { - let path_info = { - match self.path_info_service.get(store_path.digest)? { - // If there's no PathInfo found, early exit - None => return Ok(None), - Some(path_info) => path_info, - } + let path_info_service = self.path_info_service.clone(); + let digest = store_path.digest.clone(); + let task = self + .tokio_handle + .spawn(async move { path_info_service.get(digest).await }); + let path_info = match self.tokio_handle.block_on(task).unwrap()? { + // If there's no PathInfo found, early exit + None => return Ok(None), + Some(path_info) => path_info, }; let root_node = { @@ -84,11 +87,13 @@ impl TvixStoreIO { } }; - Ok(directoryservice::traverse_to( - self.directory_service.clone(), - root_node, - sub_path, - )?) + let directory_service = self.directory_service.clone(); + let sub_path = sub_path.to_owned(); + let task = self.tokio_handle.spawn(async move { + directoryservice::traverse_to(directory_service, root_node, &sub_path).await + }); + + Ok(self.tokio_handle.block_on(task).unwrap()?) } } @@ -195,17 +200,23 @@ impl EvalIO for TvixStoreIO { match node { Node::Directory(directory_node) => { // fetch the Directory itself. - let digest = directory_node.digest.clone().try_into().map_err(|_e| { - io::Error::new( - io::ErrorKind::InvalidData, - format!( - "invalid digest length in directory node: {:?}", - directory_node - ), - ) - })?; - - if let Some(directory) = self.directory_service.get(&digest)? { + let digest: B3Digest = + directory_node.digest.clone().try_into().map_err(|_e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "invalid digest length in directory node: {:?}", + directory_node + ), + ) + })?; + + let directory_service = self.directory_service.clone(); + let digest_clone = digest.clone(); + let task = self + .tokio_handle + .spawn(async move { directory_service.get(&digest_clone).await }); + if let Some(directory) = self.tokio_handle.block_on(task).unwrap()? { let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new(); for node in directory.nodes() { children.push(match node { @@ -299,14 +310,11 @@ async fn import_path_with_pathinfo( .await .expect("error during import_path"); - // Render the NAR. This is blocking. - let calc_task = tokio::task::spawn_blocking(move || { - let (nar_size, nar_sha256) = - calculate_size_and_sha256(&root_node, blob_service.clone(), directory_service.clone()) - .expect("error during nar calculation"); // TODO: handle error - (nar_size, nar_sha256, root_node) - }); - let (nar_size, nar_sha256, root_node) = calc_task.await.unwrap(); + // Render the NAR. + let (nar_size, nar_sha256) = + calculate_size_and_sha256(&root_node, blob_service.clone(), directory_service.clone()) + .await + .expect("error during nar calculation"); // TODO: handle error // TODO: make a path_to_name helper function? let name = path @@ -339,7 +347,7 @@ async fn import_path_with_pathinfo( // put into [PathInfoService], and return the [PathInfo] that we get // back from there (it might contain additional signatures). - let path_info = path_info_service.put(path_info)?; + let path_info = path_info_service.put(path_info).await?; Ok(path_info) } -- cgit 1.4.1