diff options
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/Cargo.toml | 5 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 13 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 285 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/memory.rs | 20 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/mod.rs | 24 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/sled.rs | 20 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/traverse.rs | 115 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/utils.rs | 161 | ||||
-rw-r--r-- | tvix/store/src/fs/mod.rs | 43 | ||||
-rw-r--r-- | tvix/store/src/fs/tests.rs | 231 | ||||
-rw-r--r-- | tvix/store/src/import.rs | 1 | ||||
-rw-r--r-- | tvix/store/src/nar/renderer.rs | 61 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 176 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 15 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/mod.rs | 19 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/sled.rs | 18 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_directoryservice_wrapper.rs | 11 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs | 17 | ||||
-rw-r--r-- | tvix/store/src/tests/import.rs | 2 | ||||
-rw-r--r-- | tvix/store/src/tests/nar_renderer.rs | 186 |
20 files changed, 729 insertions, 694 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index ffa4426aef6a..3097d3c368da 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -5,18 +5,20 @@ edition = "2021" [dependencies] anyhow = "1.0.68" +async-stream = "0.3.5" blake3 = { version = "1.3.1", features = ["rayon", "std"] } clap = { version = "4.0", features = ["derive", "env"] } count-write = "0.1.0" data-encoding = "2.3.3" lazy_static = "1.4.0" nix-compat = { path = "../nix-compat" } +parking_lot = "0.12.1" prost = "0.11.2" rayon = "1.6.1" sha2 = "0.10.6" sled = { version = "0.34.7", features = ["compression"] } thiserror = "1.0.38" -tokio-stream = "0.1.14" +tokio-stream = { version = "0.1.14", features = ["fs"] } tokio = { version = "1.28.0", features = ["fs", "net", "rt-multi-thread", "signal"] } tonic = "0.8.2" tracing = "0.1.37" @@ -31,7 +33,6 @@ serde_json = "1.0" url = "2.4.0" pin-project-lite = "0.2.13" const-zero = "0.1.1" -parking_lot = "0.12.1" [dependencies.fuse-backend-rs] optional = true diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 1be8b00bd9b8..7761855cccb1 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -229,11 +229,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { // Ask the PathInfoService for the NAR size and sha256 let root_node_copy = root_node.clone(); let path_info_service_clone = path_info_service.clone(); - let (nar_size, nar_sha256) = tokio::task::spawn_blocking(move || { - path_info_service_clone.calculate_nar(&root_node_copy) - }) - .await - .unwrap()?; + let (nar_size, nar_sha256) = path_info_service_clone + .calculate_nar(&root_node_copy) + .await?; // TODO: make a path_to_name helper function? let name = path @@ -265,10 +263,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { // put into [PathInfoService], and return the PathInfo that we get back // from there (it might contain additional signatures). - let path_info = - tokio::task::spawn_blocking(move || path_info_service.put(path_info)) - .await - .unwrap()?; + let path_info = path_info_service.put(path_info).await?; let node = path_info.node.unwrap().node.unwrap(); diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index 73d88bb688a3..6257a8e81485 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -1,22 +1,24 @@ use std::collections::HashSet; +use std::pin::Pin; use super::{DirectoryPutter, DirectoryService}; use crate::proto::{self, get_directory_request::ByWhat}; use crate::{B3Digest, Error}; +use async_stream::try_stream; +use futures::Stream; use tokio::net::UnixStream; +use tokio::spawn; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; +use tonic::async_trait; +use tonic::Code; use tonic::{transport::Channel, Status}; -use tonic::{Code, Streaming}; use tracing::{instrument, warn}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. #[derive(Clone)] pub struct GRPCDirectoryService { - /// A handle into the active tokio runtime. Necessary to spawn tasks. - tokio_handle: tokio::runtime::Handle, - /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, @@ -28,13 +30,11 @@ impl GRPCDirectoryService { pub fn from_client( grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, ) -> Self { - Self { - tokio_handle: tokio::runtime::Handle::current(), - grpc_client, - } + Self { grpc_client } } } +#[async_trait] impl DirectoryService for GRPCDirectoryService { /// Constructs a [GRPCDirectoryService] from the passed [url::Url]: /// - scheme has to match `grpc+*://`. @@ -89,11 +89,15 @@ impl DirectoryService for GRPCDirectoryService { } } } - fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> { + + async fn get( + &self, + digest: &B3Digest, + ) -> Result<Option<crate::proto::Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest_cpy = digest.clone(); - let task = self.tokio_handle.spawn(async move { + let message = async move { let mut s = grpc_client .get(proto::GetDirectoryRequest { recursive: false, @@ -104,10 +108,10 @@ impl DirectoryService for GRPCDirectoryService { // Retrieve the first message only, then close the stream (we set recursive to false) s.message().await - }); + }; let digest = digest.clone(); - match self.tokio_handle.block_on(task)? { + match message.await { Ok(Some(directory)) => { // Validate the retrieved Directory indeed has the // digest we expect it to have, to detect corruptions. @@ -134,14 +138,12 @@ impl DirectoryService for GRPCDirectoryService { } } - fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { let mut grpc_client = self.grpc_client.clone(); - let task = self - .tokio_handle - .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await }); + let resp = grpc_client.put(tokio_stream::iter(vec![directory])).await; - match self.tokio_handle.block_on(task)? { + match resp { Ok(put_directory_resp) => Ok(put_directory_resp .into_inner() .root_digest @@ -157,32 +159,82 @@ impl DirectoryService for GRPCDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> { + ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { let mut grpc_client = self.grpc_client.clone(); + let root_directory_digest = root_directory_digest.clone(); - // clone so we can move it - let root_directory_digest_cpy = root_directory_digest.clone(); - - let task: JoinHandle<Result<Streaming<proto::Directory>, Status>> = - self.tokio_handle.spawn(async move { - let s = grpc_client - .get(proto::GetDirectoryRequest { - recursive: true, - by_what: Some(ByWhat::Digest(root_directory_digest_cpy.into())), - }) - .await? - .into_inner(); - - Ok(s) - }); + let stream = try_stream! { + let mut stream = grpc_client + .get(proto::GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(root_directory_digest.clone().into())), + }) + .await + .map_err(|e| crate::Error::StorageError(e.to_string()))? + .into_inner(); - let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); + // The Directory digests we received so far + let mut received_directory_digests: HashSet<B3Digest> = HashSet::new(); + // The Directory digests we're still expecting to get sent. + let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]); + + loop { + match stream.message().await { + Ok(Some(directory)) => { + // validate the directory itself. + if let Err(e) = directory.validate() { + Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + directory.digest(), + e, + )))?; + } + // validate we actually expected that directory, and move it from expected to received. + let directory_digest = directory.digest(); + let was_expected = expected_directory_digests.remove(&directory_digest); + if !was_expected { + // FUTUREWORK: dumb clients might send the same stuff twice. + // as a fallback, we might want to tolerate receiving + // it if it's in received_directory_digests (as that + // means it once was in expected_directory_digests) + Err(crate::Error::StorageError(format!( + "received unexpected directory {}", + directory_digest + )))?; + } + received_directory_digests.insert(directory_digest); + + // register all children in expected_directory_digests. + for child_directory in &directory.directories { + // We ran validate() above, so we know these digests must be correct. + let child_directory_digest = + child_directory.digest.clone().try_into().unwrap(); + + expected_directory_digests + .insert(child_directory_digest); + } + + yield directory; + }, + Ok(None) => { + // If we were still expecting something, that's an error. + if !expected_directory_digests.is_empty() { + Err(crate::Error::StorageError(format!( + "still expected {} directories, but got premature end of stream", + expected_directory_digests.len(), + )))? + } else { + return + } + }, + Err(e) => { + Err(crate::Error::StorageError(e.to_string()))?; + }, + } + } + }; - Box::new(StreamIterator::new( - self.tokio_handle.clone(), - root_directory_digest.clone(), - stream, - )) + Box::pin(stream) } #[instrument(skip_all)] @@ -194,110 +246,21 @@ impl DirectoryService for GRPCDirectoryService { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = - self.tokio_handle.spawn(async move { - let s = grpc_client - .put(UnboundedReceiverStream::new(rx)) - .await? - .into_inner(); - - Ok(s) - }); - - Box::new(GRPCPutter::new(self.tokio_handle.clone(), tx, task)) - } -} - -pub struct StreamIterator { - /// A handle into the active tokio runtime. Necessary to run futures to completion. - tokio_handle: tokio::runtime::Handle, - // A stream of [proto::Directory] - stream: Streaming<proto::Directory>, - // The Directory digests we received so far - received_directory_digests: HashSet<B3Digest>, - // The Directory digests we're still expecting to get sent. - expected_directory_digests: HashSet<B3Digest>, -} - -impl StreamIterator { - pub fn new( - tokio_handle: tokio::runtime::Handle, - root_digest: B3Digest, - stream: Streaming<proto::Directory>, - ) -> Self { - Self { - tokio_handle, - stream, - received_directory_digests: HashSet::new(), - expected_directory_digests: HashSet::from([root_digest]), - } - } -} - -impl Iterator for StreamIterator { - type Item = Result<proto::Directory, crate::Error>; - - fn next(&mut self) -> Option<Self::Item> { - match self.tokio_handle.block_on(self.stream.message()) { - Ok(ok) => match ok { - Some(directory) => { - // validate the directory itself. - if let Err(e) = directory.validate() { - return Some(Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - directory.digest(), - e, - )))); - } - // validate we actually expected that directory, and move it from expected to received. - let directory_digest = directory.digest(); - let was_expected = self.expected_directory_digests.remove(&directory_digest); - if !was_expected { - // FUTUREWORK: dumb clients might send the same stuff twice. - // as a fallback, we might want to tolerate receiving - // it if it's in received_directory_digests (as that - // means it once was in expected_directory_digests) - return Some(Err(crate::Error::StorageError(format!( - "received unexpected directory {}", - directory_digest - )))); - } - self.received_directory_digests.insert(directory_digest); - - // register all children in expected_directory_digests. - for child_directory in &directory.directories { - // We ran validate() above, so we know these digests must be correct. - let child_directory_digest = - child_directory.digest.clone().try_into().unwrap(); + let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); - self.expected_directory_digests - .insert(child_directory_digest); - } + Ok(s) + }); - Some(Ok(directory)) - } - None => { - // If we were still expecting something, that's an error. - if !self.expected_directory_digests.is_empty() { - Some(Err(crate::Error::StorageError(format!( - "still expected {} directories, but got premature end of stream", - self.expected_directory_digests.len(), - )))) - } else { - None - } - } - }, - Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))), - } + Box::new(GRPCPutter::new(tx, task)) } } /// Allows uploading multiple Directory messages in the same gRPC stream. pub struct GRPCPutter { - /// A handle into the active tokio runtime. Necessary to spawn tasks. - tokio_handle: tokio::runtime::Handle, - /// Data about the current request - a handle to the task, and the tx part /// of the channel. /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request. @@ -311,19 +274,18 @@ pub struct GRPCPutter { impl GRPCPutter { pub fn new( - tokio_handle: tokio::runtime::Handle, directory_sender: UnboundedSender<proto::Directory>, task: JoinHandle<Result<proto::PutDirectoryResponse, Status>>, ) -> Self { Self { - tokio_handle, rq: Some((task, directory_sender)), } } } +#[async_trait] impl DirectoryPutter for GRPCPutter { - fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { match self.rq { // If we're not already closed, send the directory to directory_sender. Some((_, ref directory_sender)) => { @@ -331,7 +293,7 @@ impl DirectoryPutter for GRPCPutter { // If the channel has been prematurely closed, invoke close (so we can peek at the error code) // That error code is much more helpful, because it // contains the error message from the server. - self.close()?; + self.close().await?; } Ok(()) } @@ -343,7 +305,7 @@ impl DirectoryPutter for GRPCPutter { } /// Closes the stream for sending, and returns the value - fn close(&mut self) -> Result<B3Digest, crate::Error> { + async fn close(&mut self) -> Result<B3Digest, crate::Error> { // get self.rq, and replace it with None. // This ensures we can only close it once. match std::mem::take(&mut self.rq) { @@ -352,9 +314,8 @@ impl DirectoryPutter for GRPCPutter { // close directory_sender, so blocking on task will finish. drop(directory_sender); - let root_digest = self - .tokio_handle - .block_on(task)? + let root_digest = task + .await? .map_err(|e| Error::StorageError(e.to_string()))? .root_digest; @@ -379,6 +340,7 @@ mod tests { use core::time; use std::thread; + use futures::StreamExt; use tempfile::TempDir; use tokio::net::{UnixListener, UnixStream}; use tokio_stream::wrappers::UnixListenerStream; @@ -446,7 +408,7 @@ mod tests { ); } - let task = tester_runtime.spawn_blocking(move || { + tester_runtime.block_on(async move { // Create a channel, connecting to the uds at socket_path. // The URI is unused. let channel = Endpoint::try_from("http://[::]:50051") @@ -465,6 +427,7 @@ mod tests { None, directory_service .get(&DIRECTORY_A.digest()) + .await .expect("must not fail") ); @@ -473,6 +436,7 @@ mod tests { DIRECTORY_A.digest(), directory_service .put(DIRECTORY_A.clone()) + .await .expect("must succeed") ); @@ -481,6 +445,7 @@ mod tests { DIRECTORY_A.clone(), directory_service .get(&DIRECTORY_A.digest()) + .await .expect("must succeed") .expect("must be some") ); @@ -488,21 +453,22 @@ mod tests { // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. directory_service .put(DIRECTORY_B.clone()) + .await .expect_err("must fail"); // Putting DIRECTORY_B in a put_multiple will succeed, but the close // will always fail. { let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).expect("must succeed"); - handle.close().expect_err("must fail"); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + handle.close().await.expect_err("must fail"); } // Uploading A and then B should succeed, and closing should return the digest of B. let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_A.clone()).expect("must succeed"); - handle.put(DIRECTORY_B.clone()).expect("must succeed"); - let digest = handle.close().expect("must succeed"); + handle.put(DIRECTORY_A.clone()).await.expect("must succeed"); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + let digest = handle.close().await.expect("must succeed"); assert_eq!(DIRECTORY_B.digest(), digest); // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. @@ -511,6 +477,7 @@ mod tests { DIRECTORY_B.clone(), directories_it .next() + .await .expect("must be some") .expect("must succeed") ); @@ -518,6 +485,7 @@ mod tests { DIRECTORY_A.clone(), directories_it .next() + .await .expect("must be some") .expect("must succeed") ); @@ -529,15 +497,15 @@ mod tests { { let mut handle = directory_service.put_multiple_start(); // sending out B will always be fine - handle.put(DIRECTORY_B.clone()).expect("must succeed"); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); // whether we will be able to put A as well depends on whether we // already received the error about B. - if handle.put(DIRECTORY_A.clone()).is_ok() { + if handle.put(DIRECTORY_A.clone()).await.is_ok() { // If we didn't, and this was Ok(_), … // a subsequent close MUST fail (because it waits for the // server) - handle.close().expect_err("must fail"); + handle.close().await.expect_err("must fail"); } } @@ -547,7 +515,7 @@ mod tests { // and then assert that uploading anything else via the handle will fail. { let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).expect("must succeed"); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); let mut is_closed = false; for _try in 1..1000 { @@ -555,7 +523,7 @@ mod tests { is_closed = true; break; } - std::thread::sleep(time::Duration::from_millis(10)) + tokio::time::sleep(time::Duration::from_millis(10)).await; } assert!( @@ -563,12 +531,13 @@ mod tests { "expected channel to eventually close, but never happened" ); - handle.put(DIRECTORY_A.clone()).expect_err("must fail"); + handle + .put(DIRECTORY_A.clone()) + .await + .expect_err("must fail"); } }); - tester_runtime.block_on(task)?; - Ok(()) } } diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs index 634dbf9922d0..ac67c999d01b 100644 --- a/tvix/store/src/directoryservice/memory.rs +++ b/tvix/store/src/directoryservice/memory.rs @@ -1,16 +1,20 @@ use crate::{proto, B3Digest, Error}; +use futures::Stream; use std::collections::HashMap; +use std::pin::Pin; use std::sync::{Arc, RwLock}; +use tonic::async_trait; use tracing::{instrument, warn}; -use super::utils::SimplePutter; -use super::{DirectoryPutter, DirectoryService, DirectoryTraverser}; +use super::utils::{traverse_directory, SimplePutter}; +use super::{DirectoryPutter, DirectoryService}; #[derive(Clone, Default)] pub struct MemoryDirectoryService { db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>, } +#[async_trait] impl DirectoryService for MemoryDirectoryService { /// Constructs a [MemoryDirectoryService] from the passed [url::Url]: /// - scheme has to be `memory://` @@ -27,8 +31,9 @@ impl DirectoryService for MemoryDirectoryService { Ok(Self::default()) } + #[instrument(skip(self, digest), fields(directory.digest = %digest))] - fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { let db = self.db.read()?; match db.get(digest) { @@ -62,7 +67,7 @@ impl DirectoryService for MemoryDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); // validate the directory itself. @@ -84,11 +89,8 @@ impl DirectoryService for MemoryDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> { - Box::new(DirectoryTraverser::with( - self.clone(), - root_directory_digest, - )) + ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { + traverse_directory(self.clone(), root_directory_digest) } #[instrument(skip_all)] diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index fea4191400f3..09210dfed8e8 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -1,4 +1,7 @@ use crate::{proto, B3Digest, Error}; +use futures::Stream; +use std::pin::Pin; +use tonic::async_trait; mod from_addr; mod grpc; @@ -12,32 +15,38 @@ pub use self::grpc::GRPCDirectoryService; pub use self::memory::MemoryDirectoryService; pub use self::sled::SledDirectoryService; pub use self::traverse::traverse_to; -pub use self::utils::DirectoryTraverser; /// The base trait all Directory services need to implement. /// This is a simple get and put of [crate::proto::Directory], returning their /// digest. +#[async_trait] pub trait DirectoryService: Send + Sync { /// Create a new instance by passing in a connection URL. + /// TODO: check if we want to make this async, instead of lazily connecting fn from_url(url: &url::Url) -> Result<Self, Error> where Self: Sized; /// Get looks up a single Directory message by its digest. /// In case the directory is not found, Ok(None) is returned. - fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; /// Get uploads a single Directory message, and returns the calculated /// digest, or an error. - fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; /// Looks up a closure of [proto::Directory]. - /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`, + /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. + /// + /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily, + /// and the box allows different underlying stream implementations to be returned since + /// Rust doesn't support this as a generic in traits yet. This is the same thing that + /// [async_trait] generates, but for streams instead of futures. fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send>; + ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>>; /// Allows persisting a closure of [proto::Directory], which is a graph of /// connected Directory messages. @@ -49,16 +58,17 @@ pub trait DirectoryService: Send + Sync { /// The consumer can periodically call [DirectoryPutter::put], starting from the /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to /// retrieve the root digest (or an error). +#[async_trait] pub trait DirectoryPutter: Send { /// Put a individual [proto::Directory] into the store. /// Error semantics and behaviour is up to the specific implementation of /// this trait. /// Due to bursting, the returned error might refer to an object previously /// sent via `put`. - fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; /// Close the stream, and wait for any errors. - fn close(&mut self) -> Result<B3Digest, Error>; + async fn close(&mut self) -> Result<B3Digest, Error>; /// Return whether the stream is closed or not. /// Used from some [DirectoryService] implementations only. diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs index e741434eabb5..0dc5496803cb 100644 --- a/tvix/store/src/directoryservice/sled.rs +++ b/tvix/store/src/directoryservice/sled.rs @@ -1,12 +1,15 @@ use crate::directoryservice::DirectoryPutter; use crate::proto::Directory; use crate::{proto, B3Digest, Error}; +use futures::Stream; use prost::Message; use std::path::PathBuf; +use std::pin::Pin; +use tonic::async_trait; use tracing::{instrument, warn}; -use super::utils::SimplePutter; -use super::{DirectoryService, DirectoryTraverser}; +use super::utils::{traverse_directory, SimplePutter}; +use super::DirectoryService; #[derive(Clone)] pub struct SledDirectoryService { @@ -29,6 +32,7 @@ impl SledDirectoryService { } } +#[async_trait] impl DirectoryService for SledDirectoryService { /// Constructs a [SledDirectoryService] from the passed [url::Url]: /// - scheme has to be `sled://` @@ -59,7 +63,7 @@ impl DirectoryService for SledDirectoryService { } #[instrument(skip(self, digest), fields(directory.digest = %digest))] - fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { match self.db.get(digest.to_vec()) { // The directory was not found, return Ok(None) => Ok(None), @@ -99,7 +103,7 @@ impl DirectoryService for SledDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); // validate the directory itself. @@ -121,12 +125,8 @@ impl DirectoryService for SledDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Box<(dyn Iterator<Item = Result<proto::Directory, Error>> + std::marker::Send + 'static)> - { - Box::new(DirectoryTraverser::with( - self.clone(), - root_directory_digest, - )) + ) -> Pin<Box<(dyn Stream<Item = Result<proto::Directory, Error>> + Send + 'static)>> { + traverse_directory(self.clone(), root_directory_digest) } #[instrument(skip_all)] diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs index 9029543267c1..0489c4458163 100644 --- a/tvix/store/src/directoryservice/traverse.rs +++ b/tvix/store/src/directoryservice/traverse.rs @@ -1,19 +1,18 @@ use super::DirectoryService; -use crate::{proto::NamedNode, Error}; +use crate::{proto::NamedNode, B3Digest, Error}; use std::{os::unix::ffi::OsStrExt, sync::Arc}; use tracing::{instrument, warn}; /// This traverses from a (root) node to the given (sub)path, returning the Node /// at that path, or none, if there's nothing at that path. -/// TODO: Do we want to rewrite this in a non-recursing fashion, and use -/// [DirectoryService.get_recursive] to do less lookups? +/// TODO: Do we want to use [DirectoryService.get_recursive] to do less lookups? /// Or do we consider this to be a non-issue due to store composition and local caching? /// TODO: the name of this function (and mod) is a bit bad, because it doesn't /// clearly distinguish it from the BFS traversers. #[instrument(skip(directory_service))] -pub fn traverse_to( +pub async fn traverse_to( directory_service: Arc<dyn DirectoryService>, - node: crate::proto::node::Node, + root_node: crate::proto::node::Node, path: &std::path::Path, ) -> Result<Option<crate::proto::node::Node>, Error> { // strip a possible `/` prefix from the path. @@ -25,52 +24,56 @@ pub fn traverse_to( } }; + let mut cur_node = root_node; let mut it = path.components(); - match it.next() { - None => { - // the (remaining) path is empty, return the node we've been called with. - Ok(Some(node)) - } - Some(first_component) => { - match node { - crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => { - // There's still some path left, but the current node is no directory. - // This means the path doesn't exist, as we can't reach it. - Ok(None) - } - crate::proto::node::Node::Directory(directory_node) => { - let digest = directory_node - .digest - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; - - // fetch the linked node from the directory_service - match directory_service.get(&digest)? { - // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! - None => { - warn!("directory {} does not exist", digest); - - Err(Error::StorageError(format!( - "directory {} does not exist", - digest - ))) - } - Some(directory) => { - // look for first_component in the [Directory]. - // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we - // could stop as soon as e.name is larger than the search string. - let child_node = directory - .nodes() - .find(|n| n.get_name() == first_component.as_os_str().as_bytes()); - - match child_node { - // child node not found means there's no such element inside the directory. - None => Ok(None), - // child node found, recurse with it and the rest of the path. - Some(child_node) => { - let rest_path: std::path::PathBuf = it.collect(); - traverse_to(directory_service, child_node, &rest_path) + loop { + match it.next() { + None => { + // the (remaining) path is empty, return the node we're current at. + return Ok(Some(cur_node)); + } + Some(first_component) => { + match cur_node { + crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => { + // There's still some path left, but the current node is no directory. + // This means the path doesn't exist, as we can't reach it. + return Ok(None); + } + crate::proto::node::Node::Directory(directory_node) => { + let digest: B3Digest = directory_node.digest.try_into().map_err(|_e| { + Error::StorageError("invalid digest length".to_string()) + })?; + + // fetch the linked node from the directory_service + match directory_service.get(&digest).await? { + // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! + None => { + warn!("directory {} does not exist", digest); + + return Err(Error::StorageError(format!( + "directory {} does not exist", + digest + ))); + } + Some(directory) => { + // look for first_component in the [Directory]. + // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we + // could stop as soon as e.name is larger than the search string. + let child_node = directory.nodes().find(|n| { + n.get_name() == first_component.as_os_str().as_bytes() + }); + + match child_node { + // child node not found means there's no such element inside the directory. + None => { + return Ok(None); + } + // child node found, return to top-of loop to find the next + // node in the path. + Some(child_node) => { + cur_node = child_node; + } } } } @@ -92,16 +95,18 @@ mod tests { use super::traverse_to; - #[test] - fn test_traverse_to() { + #[tokio::test] + async fn test_traverse_to() { let directory_service = gen_directory_service(); let mut handle = directory_service.put_multiple_start(); handle .put(DIRECTORY_WITH_KEEP.clone()) + .await .expect("must succeed"); handle .put(DIRECTORY_COMPLICATED.clone()) + .await .expect("must succeed"); // construct the node for DIRECTORY_COMPLICATED @@ -128,6 +133,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from(""), ) + .await .expect("must succeed"); assert_eq!(Some(node_directory_complicated.clone()), resp); @@ -140,6 +146,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("keep"), ) + .await .expect("must succeed"); assert_eq!(Some(node_directory_with_keep), resp); @@ -152,6 +159,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("keep/.keep"), ) + .await .expect("must succeed"); assert_eq!(Some(node_file_keep.clone()), resp); @@ -164,6 +172,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("/keep/.keep"), ) + .await .expect("must succeed"); assert_eq!(Some(node_file_keep), resp); @@ -176,6 +185,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("void"), ) + .await .expect("must succeed"); assert_eq!(None, resp); @@ -188,6 +198,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("//v/oid"), ) + .await .expect("must succeed"); assert_eq!(None, resp); @@ -201,6 +212,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("keep/.keep/foo"), ) + .await .expect("must succeed"); assert_eq!(None, resp); @@ -213,6 +225,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("/"), ) + .await .expect("must succeed"); assert_eq!(Some(node_directory_complicated), resp); diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs index 95f02f1f9ce8..4c5e7cfde37c 100644 --- a/tvix/store/src/directoryservice/utils.rs +++ b/tvix/store/src/directoryservice/utils.rs @@ -3,103 +3,85 @@ use super::DirectoryService; use crate::proto; use crate::B3Digest; use crate::Error; +use async_stream::stream; +use futures::Stream; use std::collections::{HashSet, VecDeque}; -use tracing::{debug_span, instrument, warn}; +use std::pin::Pin; +use tonic::async_trait; +use tracing::warn; /// Traverses a [proto::Directory] from the root to the children. /// /// This is mostly BFS, but directories are only returned once. -pub struct DirectoryTraverser<DS: DirectoryService> { +pub fn traverse_directory<DS: DirectoryService + 'static>( directory_service: DS, - /// The list of all directories that still need to be traversed. The next - /// element is picked from the front, new elements are enqueued at the - /// back. - worklist_directory_digests: VecDeque<B3Digest>, - /// The list of directory digests already sent to the consumer. - /// We omit sending the same directories multiple times. - sent_directory_digests: HashSet<B3Digest>, -} - -impl<DS: DirectoryService> DirectoryTraverser<DS> { - pub fn with(directory_service: DS, root_directory_digest: &B3Digest) -> Self { - Self { - directory_service, - worklist_directory_digests: VecDeque::from([root_directory_digest.clone()]), - sent_directory_digests: HashSet::new(), - } - } - - // enqueue all child directory digests to the work queue, as - // long as they're not part of the worklist or already sent. - // This panics if the digest looks invalid, it's supposed to be checked first. - fn enqueue_child_directories(&mut self, directory: &proto::Directory) { - for child_directory_node in &directory.directories { - // TODO: propagate error - let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); - - if self.worklist_directory_digests.contains(&child_digest) - || self.sent_directory_digests.contains(&child_digest) - { - continue; - } - self.worklist_directory_digests.push_back(child_digest); - } - } -} - -impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> { - type Item = Result<proto::Directory, Error>; - - #[instrument(skip_all)] - fn next(&mut self) -> Option<Self::Item> { - // fetch the next directory digest from the top of the work queue. - match self.worklist_directory_digests.pop_front() { - None => None, - Some(current_directory_digest) => { - let span = debug_span!("directory.digest", "{}", current_directory_digest); - let _ = span.enter(); - - // look up the directory itself. - let current_directory = match self.directory_service.get(¤t_directory_digest) - { - // if we got it - Ok(Some(current_directory)) => { - // validate, we don't want to send invalid directories. - if let Err(e) = current_directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Some(Err(Error::StorageError(format!( - "invalid directory: {}", - current_directory_digest - )))); - } - current_directory - } - // if it's not there, we have an inconsistent store! - Ok(None) => { - warn!("directory {} does not exist", current_directory_digest); - return Some(Err(Error::StorageError(format!( - "directory {} does not exist", + root_directory_digest: &B3Digest, +) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { + // The list of all directories that still need to be traversed. The next + // element is picked from the front, new elements are enqueued at the + // back. + let mut worklist_directory_digests: VecDeque<B3Digest> = + VecDeque::from([root_directory_digest.clone()]); + // The list of directory digests already sent to the consumer. + // We omit sending the same directories multiple times. + let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new(); + + let stream = stream! { + while let Some(current_directory_digest) = worklist_directory_digests.pop_front() { + match directory_service.get(¤t_directory_digest).await { + // if it's not there, we have an inconsistent store! + Ok(None) => { + warn!("directory {} does not exist", current_directory_digest); + yield Err(Error::StorageError(format!( + "directory {} does not exist", + current_directory_digest + ))); + } + Err(e) => { + warn!("failed to look up directory"); + yield Err(Error::StorageError(format!( + "unable to look up directory {}: {}", + current_directory_digest, e + ))); + } + + // if we got it + Ok(Some(current_directory)) => { + // validate, we don't want to send invalid directories. + if let Err(e) = current_directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + yield Err(Error::StorageError(format!( + "invalid directory: {}", current_directory_digest - )))); - } - Err(e) => { - warn!("failed to look up directory"); - return Some(Err(Error::StorageError(format!( - "unable to look up directory {}: {}", - current_directory_digest, e - )))); + ))); } - }; - // All DirectoryServices MUST validate directory nodes, before returning them out, so we - // can be sure [enqueue_child_directories] doesn't panic. + // We're about to send this directory, so let's avoid sending it again if a + // descendant has it. + sent_directory_digests.insert(current_directory_digest); + + // enqueue all child directory digests to the work queue, as + // long as they're not part of the worklist or already sent. + // This panics if the digest looks invalid, it's supposed to be checked first. + for child_directory_node in ¤t_directory.directories { + // TODO: propagate error + let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); + + if worklist_directory_digests.contains(&child_digest) + || sent_directory_digests.contains(&child_digest) + { + continue; + } + worklist_directory_digests.push_back(child_digest); + } - // enqueue child directories - self.enqueue_child_directories(¤t_directory); - Some(Ok(current_directory)) - } + yield Ok(current_directory); + } + }; } - } + }; + + Box::pin(stream) } /// This is a simple implementation of a Directory uploader. @@ -120,13 +102,14 @@ impl<DS: DirectoryService> SimplePutter<DS> { } } +#[async_trait] impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { - fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { if self.closed { return Err(Error::StorageError("already closed".to_string())); } - let digest = self.directory_service.put(directory)?; + let digest = self.directory_service.put(directory).await?; // track the last directory digest self.last_directory_digest = Some(digest); @@ -135,7 +118,7 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { } /// We need to be mutable here, as that's the signature of the trait. - fn close(&mut self) -> Result<B3Digest, Error> { + async fn close(&mut self) -> Result<B3Digest, Error> { if self.closed { return Err(Error::StorageError("already closed".to_string())); } diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs index 48e605406331..02d3bb3221ad 100644 --- a/tvix/store/src/fs/mod.rs +++ b/tvix/store/src/fs/mod.rs @@ -16,6 +16,7 @@ use crate::{ B3Digest, Error, }; use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}; +use futures::StreamExt; use nix_compat::store_path::StorePath; use parking_lot::RwLock; use std::{ @@ -26,7 +27,10 @@ use std::{ sync::{atomic::Ordering, Arc}, time::Duration, }; -use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; +use tokio::{ + io::{AsyncBufReadExt, AsyncSeekExt}, + sync::mpsc, +}; use tracing::{debug, info_span, warn}; use self::{ @@ -159,7 +163,11 @@ impl TvixStoreFs { ))) } else { // If we don't have it, look it up in PathInfoService. - match self.path_info_service.get(store_path.digest)? { + let path_info_service = self.path_info_service.clone(); + let task = self + .tokio_handle + .spawn(async move { path_info_service.get(store_path.digest).await }); + match self.tokio_handle.block_on(task).unwrap()? { // the pathinfo doesn't exist, so the file doesn't exist. None => Ok(None), Some(path_info) => { @@ -204,7 +212,12 @@ impl TvixStoreFs { /// This is both used to initially insert the root node of a store path, /// as well as when looking up an intermediate DirectoryNode. fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> { - match self.directory_service.get(directory_digest) { + let directory_service = self.directory_service.clone(); + let directory_digest_clone = directory_digest.clone(); + let task = self + .tokio_handle + .spawn(async move { directory_service.get(&directory_digest_clone).await }); + match self.tokio_handle.block_on(task).unwrap() { Err(e) => { warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory"); Err(e) @@ -369,12 +382,23 @@ impl FileSystem for TvixStoreFs { if !self.list_root { return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } else { - for (i, path_info) in self - .path_info_service - .list() - .skip(offset as usize) - .enumerate() - { + let path_info_service = self.path_info_service.clone(); + let (tx, mut rx) = mpsc::channel(16); + + // This task will run in the background immediately and will exit + // after the stream ends or if we no longer want any more entries. + self.tokio_handle.spawn(async move { + let mut stream = path_info_service.list().skip(offset as usize).enumerate(); + while let Some(path_info) = stream.next().await { + if tx.send(path_info).await.is_err() { + // If we get a send error, it means the sync code + // doesn't want any more entries. + break; + } + } + }); + + while let Some((i, path_info)) = rx.blocking_recv() { let path_info = match path_info { Err(e) => { warn!("failed to retrieve pathinfo: {}", e); @@ -421,6 +445,7 @@ impl FileSystem for TvixStoreFs { break; } } + return Ok(()); } } diff --git a/tvix/store/src/fs/tests.rs b/tvix/store/src/fs/tests.rs index 30f5ca3f40aa..6837f8aa293a 100644 --- a/tvix/store/src/fs/tests.rs +++ b/tvix/store/src/fs/tests.rs @@ -1,8 +1,10 @@ +use futures::StreamExt; use std::io::Cursor; use std::os::unix::prelude::MetadataExt; use std::path::Path; use std::sync::Arc; -use std::{fs, io}; +use tokio::{fs, io}; +use tokio_stream::wrappers::ReadDirStream; use tempfile::TempDir; @@ -75,7 +77,10 @@ async fn populate_blob_a( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } async fn populate_blob_b( @@ -102,7 +107,10 @@ async fn populate_blob_b( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// adds a blob containing helloworld and marks it as executable @@ -133,10 +141,13 @@ async fn populate_helloworld_blob( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } -fn populate_symlink( +async fn populate_symlink( _blob_service: &Arc<dyn BlobService>, _directory_service: &Arc<dyn DirectoryService>, path_info_service: &Arc<dyn PathInfoService>, @@ -151,12 +162,15 @@ fn populate_symlink( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// This writes a symlink pointing to /nix/store/somewhereelse, /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. -fn populate_symlink2( +async fn populate_symlink2( _blob_service: &Arc<dyn BlobService>, _directory_service: &Arc<dyn DirectoryService>, path_info_service: &Arc<dyn PathInfoService>, @@ -171,7 +185,10 @@ fn populate_symlink2( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } async fn populate_directory_with_keep( @@ -189,6 +206,7 @@ async fn populate_directory_with_keep( // upload directory directory_service .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .await .expect("must succeed uploading"); // upload pathinfo @@ -202,12 +220,15 @@ async fn populate_directory_with_keep( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory /// itself. -fn populate_pathinfo_without_directory( +async fn populate_pathinfo_without_directory( _: &Arc<dyn BlobService>, _: &Arc<dyn DirectoryService>, path_info_service: &Arc<dyn PathInfoService>, @@ -223,11 +244,14 @@ fn populate_pathinfo_without_directory( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// Insert , but don't provide the blob .keep is pointing to -fn populate_blob_a_without_blob( +async fn populate_blob_a_without_blob( _: &Arc<dyn BlobService>, _: &Arc<dyn DirectoryService>, path_info_service: &Arc<dyn PathInfoService>, @@ -244,7 +268,10 @@ fn populate_blob_a_without_blob( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } async fn populate_directory_complicated( @@ -262,11 +289,13 @@ async fn populate_directory_complicated( // upload inner directory directory_service .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .await .expect("must succeed uploading"); // uplodad parent directory directory_service .put(fixtures::DIRECTORY_COMPLICATED.clone()) + .await .expect("must succeed uploading"); // upload pathinfo @@ -280,7 +309,10 @@ async fn populate_directory_complicated( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// Ensure mounting itself doesn't fail @@ -329,9 +361,13 @@ async fn root() { { // read_dir succeeds, but getting the first element will fail. - let mut it = fs::read_dir(tmpdir).expect("must succeed"); + let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed")); - let err = it.next().expect("must be some").expect_err("must be err"); + let err = it + .next() + .await + .expect("must be some") + .expect_err("must be err"); assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind()); } @@ -362,11 +398,15 @@ async fn root_with_listing() { { // read_dir succeeds, but getting the first element will fail. - let mut it = fs::read_dir(tmpdir).expect("must succeed"); + let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed")); - let e = it.next().expect("must be some").expect("must succeed"); + let e = it + .next() + .await + .expect("must be some") + .expect("must succeed"); - let metadata = e.metadata().expect("must succeed"); + let metadata = e.metadata().await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); @@ -400,7 +440,7 @@ async fn stat_file_at_root() { let p = tmpdir.path().join(BLOB_A_NAME); // peek at the file metadata - let metadata = fs::metadata(p).expect("must succeed"); + let metadata = fs::metadata(p).await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); @@ -434,7 +474,7 @@ async fn read_file_at_root() { let p = tmpdir.path().join(BLOB_A_NAME); // read the file contents - let data = fs::read(p).expect("must succeed"); + let data = fs::read(p).await.expect("must succeed"); // ensure size and contents match assert_eq!(fixtures::BLOB_A.len(), data.len()); @@ -468,7 +508,7 @@ async fn read_large_file_at_root() { let p = tmpdir.path().join(BLOB_B_NAME); { // peek at the file metadata - let metadata = fs::metadata(&p).expect("must succeed"); + let metadata = fs::metadata(&p).await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); @@ -476,7 +516,7 @@ async fn read_large_file_at_root() { } // read the file contents - let data = fs::read(p).expect("must succeed"); + let data = fs::read(p).await.expect("must succeed"); // ensure size and contents match assert_eq!(fixtures::BLOB_B.len(), data.len()); @@ -496,7 +536,7 @@ async fn symlink_readlink() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_symlink(&blob_service, &directory_service, &path_info_service); + populate_symlink(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( blob_service, @@ -509,20 +549,20 @@ async fn symlink_readlink() { let p = tmpdir.path().join(SYMLINK_NAME); - let target = fs::read_link(&p).expect("must succeed"); + let target = fs::read_link(&p).await.expect("must succeed"); assert_eq!(BLOB_A_NAME, target.to_str().unwrap()); // peek at the file metadata, which follows symlinks. // this must fail, as we didn't populate the target. - let e = fs::metadata(&p).expect_err("must fail"); + let e = fs::metadata(&p).await.expect_err("must fail"); assert_eq!(std::io::ErrorKind::NotFound, e.kind()); // peeking at the file metadata without following symlinks will succeed. - let metadata = fs::symlink_metadata(&p).expect("must succeed"); + let metadata = fs::symlink_metadata(&p).await.expect("must succeed"); assert!(metadata.is_symlink()); // reading from the symlink (which follows) will fail, because the target doesn't exist. - let e = fs::read(p).expect_err("must fail"); + let e = fs::read(p).await.expect_err("must fail"); assert_eq!(std::io::ErrorKind::NotFound, e.kind()); fuse_daemon.unmount().expect("unmount"); @@ -540,7 +580,7 @@ async fn read_stat_through_symlink() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - populate_symlink(&blob_service, &directory_service, &path_info_service); + populate_symlink(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( blob_service, @@ -556,16 +596,16 @@ async fn read_stat_through_symlink() { // peek at the file metadata, which follows symlinks. // this must now return the same metadata as when statting at the target directly. - let metadata_symlink = fs::metadata(&p_symlink).expect("must succeed"); - let metadata_blob = fs::metadata(&p_blob).expect("must succeed"); + let metadata_symlink = fs::metadata(&p_symlink).await.expect("must succeed"); + let metadata_blob = fs::metadata(&p_blob).await.expect("must succeed"); assert_eq!(metadata_blob.file_type(), metadata_symlink.file_type()); assert_eq!(metadata_blob.len(), metadata_symlink.len()); // reading from the symlink (which follows) will return the same data as if // we were reading from the file directly. assert_eq!( - std::fs::read(p_blob).expect("must succeed"), - std::fs::read(p_symlink).expect("must succeed"), + fs::read(p_blob).await.expect("must succeed"), + fs::read(p_symlink).await.expect("must succeed"), ); fuse_daemon.unmount().expect("unmount"); @@ -596,7 +636,7 @@ async fn read_stat_directory() { let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); // peek at the metadata of the directory - let metadata = fs::metadata(p).expect("must succeed"); + let metadata = fs::metadata(p).await.expect("must succeed"); assert!(metadata.is_dir()); assert!(metadata.permissions().readonly()); @@ -628,12 +668,12 @@ async fn read_blob_inside_dir() { let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep"); // peek at metadata. - let metadata = fs::metadata(&p).expect("must succeed"); + let metadata = fs::metadata(&p).await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); // read from it - let data = fs::read(&p).expect("must succeed"); + let data = fs::read(&p).await.expect("must succeed"); assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); fuse_daemon.unmount().expect("unmount"); @@ -669,12 +709,12 @@ async fn read_blob_deep_inside_dir() { .join(".keep"); // peek at metadata. - let metadata = fs::metadata(&p).expect("must succeed"); + let metadata = fs::metadata(&p).await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); // read from it - let data = fs::read(&p).expect("must succeed"); + let data = fs::read(&p).await.expect("must succeed"); assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); fuse_daemon.unmount().expect("unmount"); @@ -706,10 +746,10 @@ async fn readdir() { { // read_dir should succeed. Collect all elements - let elements: Vec<_> = fs::read_dir(p) - .expect("must succeed") + let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed")) .map(|e| e.expect("must not be err")) - .collect(); + .collect() + .await; assert_eq!(3, elements.len(), "number of elements should be 3"); // rust skips . and .. @@ -719,18 +759,18 @@ async fn readdir() { // ".keep", 0 byte file. let e = &elements[0]; assert_eq!(".keep", e.file_name()); - assert!(e.file_type().expect("must succeed").is_file()); - assert_eq!(0, e.metadata().expect("must succeed").len()); + assert!(e.file_type().await.expect("must succeed").is_file()); + assert_eq!(0, e.metadata().await.expect("must succeed").len()); // "aa", symlink. let e = &elements[1]; assert_eq!("aa", e.file_name()); - assert!(e.file_type().expect("must succeed").is_symlink()); + assert!(e.file_type().await.expect("must succeed").is_symlink()); // "keep", directory let e = &elements[2]; assert_eq!("keep", e.file_name()); - assert!(e.file_type().expect("must succeed").is_dir()); + assert!(e.file_type().await.expect("must succeed").is_dir()); } fuse_daemon.unmount().expect("unmount"); @@ -762,18 +802,18 @@ async fn readdir_deep() { { // read_dir should succeed. Collect all elements - let elements: Vec<_> = fs::read_dir(p) - .expect("must succeed") + let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed")) .map(|e| e.expect("must not be err")) - .collect(); + .collect() + .await; assert_eq!(1, elements.len(), "number of elements should be 1"); // rust skips . and .. // ".keep", 0 byte file. let e = &elements[0]; assert_eq!(".keep", e.file_name()); - assert!(e.file_type().expect("must succeed").is_file()); - assert_eq!(0, e.metadata().expect("must succeed").len()); + assert!(e.file_type().await.expect("must succeed").is_file()); + assert_eq!(0, e.metadata().await.expect("must succeed").len()); } fuse_daemon.unmount().expect("unmount"); @@ -792,7 +832,7 @@ async fn check_attributes() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_blob_a(&blob_service, &directory_service, &path_info_service).await; populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - populate_symlink(&blob_service, &directory_service, &path_info_service); + populate_symlink(&blob_service, &directory_service, &path_info_service).await; populate_helloworld_blob(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( @@ -810,10 +850,16 @@ async fn check_attributes() { let p_executable_file = tmpdir.path().join(HELLOWORLD_BLOB_NAME); // peek at metadata. We use symlink_metadata to ensure we don't traverse a symlink by accident. - let metadata_file = fs::symlink_metadata(&p_file).expect("must succeed"); - let metadata_executable_file = fs::symlink_metadata(&p_executable_file).expect("must succeed"); - let metadata_directory = fs::symlink_metadata(&p_directory).expect("must succeed"); - let metadata_symlink = fs::symlink_metadata(&p_symlink).expect("must succeed"); + let metadata_file = fs::symlink_metadata(&p_file).await.expect("must succeed"); + let metadata_executable_file = fs::symlink_metadata(&p_executable_file) + .await + .expect("must succeed"); + let metadata_directory = fs::symlink_metadata(&p_directory) + .await + .expect("must succeed"); + let metadata_symlink = fs::symlink_metadata(&p_symlink) + .await + .expect("must succeed"); // modes should match. We & with 0o777 to remove any higher bits. assert_eq!(0o444, metadata_file.mode() & 0o777); @@ -873,8 +919,14 @@ async fn compare_inodes_directories() { // peek at metadata. assert_eq!( - fs::metadata(p_dir_with_keep).expect("must succeed").ino(), - fs::metadata(p_sibling_dir).expect("must succeed").ino() + fs::metadata(p_dir_with_keep) + .await + .expect("must succeed") + .ino(), + fs::metadata(p_sibling_dir) + .await + .expect("must succeed") + .ino() ); fuse_daemon.unmount().expect("unmount"); @@ -912,8 +964,8 @@ async fn compare_inodes_files() { // peek at metadata. assert_eq!( - fs::metadata(p_keep1).expect("must succeed").ino(), - fs::metadata(p_keep2).expect("must succeed").ino() + fs::metadata(p_keep1).await.expect("must succeed").ino(), + fs::metadata(p_keep2).await.expect("must succeed").ino() ); fuse_daemon.unmount().expect("unmount"); @@ -932,7 +984,7 @@ async fn compare_inodes_symlinks() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - populate_symlink2(&blob_service, &directory_service, &path_info_service); + populate_symlink2(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( blob_service, @@ -948,8 +1000,8 @@ async fn compare_inodes_symlinks() { // peek at metadata. assert_eq!( - fs::symlink_metadata(p1).expect("must succeed").ino(), - fs::symlink_metadata(p2).expect("must succeed").ino() + fs::symlink_metadata(p1).await.expect("must succeed").ino(), + fs::symlink_metadata(p2).await.expect("must succeed").ino() ); fuse_daemon.unmount().expect("unmount"); @@ -978,28 +1030,32 @@ async fn read_wrong_paths_in_root() { .expect("must succeed"); // wrong name - assert!(!tmpdir - .path() - .join("00000000000000000000000000000000-tes") - .exists()); + assert!( + fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) + .await + .is_err() + ); // invalid hash - assert!(!tmpdir - .path() - .join("0000000000000000000000000000000-test") - .exists()); + assert!( + fs::metadata(tmpdir.path().join("0000000000000000000000000000000-test")) + .await + .is_err() + ); // right name, must exist - assert!(tmpdir - .path() - .join("00000000000000000000000000000000-test") - .exists()); + assert!( + fs::metadata(tmpdir.path().join("00000000000000000000000000000000-test")) + .await + .is_ok() + ); // now wrong name with right hash still may not exist - assert!(!tmpdir - .path() - .join("00000000000000000000000000000000-tes") - .exists()); + assert!( + fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) + .await + .is_err() + ); fuse_daemon.unmount().expect("unmount"); } @@ -1027,7 +1083,7 @@ async fn disallow_writes() { .expect("must succeed"); let p = tmpdir.path().join(BLOB_A_NAME); - let e = std::fs::File::create(p).expect_err("must fail"); + let e = fs::File::create(p).await.expect_err("must fail"); assert_eq!(Some(libc::EROFS), e.raw_os_error()); @@ -1044,7 +1100,8 @@ async fn missing_directory() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service); + populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service) + .await; let mut fuse_daemon = do_mount( blob_service, @@ -1059,19 +1116,19 @@ async fn missing_directory() { { // `stat` on the path should succeed, because it doesn't trigger the directory request. - fs::metadata(&p).expect("must succeed"); + fs::metadata(&p).await.expect("must succeed"); // However, calling either `readdir` or `stat` on a child should fail with an IO error. // It fails when trying to pull the first entry, because we don't implement opendir separately - fs::read_dir(&p) - .unwrap() + ReadDirStream::new(fs::read_dir(&p).await.unwrap()) .next() + .await .expect("must be some") .expect_err("must be err"); // rust currently sets e.kind() to Uncategorized, which isn't very // helpful, so we don't look at the error more closely than that.. - fs::metadata(p.join(".keep")).expect_err("must fail"); + fs::metadata(p.join(".keep")).await.expect_err("must fail"); } fuse_daemon.unmount().expect("unmount"); @@ -1087,7 +1144,7 @@ async fn missing_blob() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service); + populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( blob_service, @@ -1102,12 +1159,12 @@ async fn missing_blob() { { // `stat` on the blob should succeed, because it doesn't trigger a request to the blob service. - fs::metadata(&p).expect("must succeed"); + fs::metadata(&p).await.expect("must succeed"); // However, calling read on the blob should fail. // rust currently sets e.kind() to Uncategorized, which isn't very // helpful, so we don't look at the error more closely than that.. - fs::read(p).expect_err("must fail"); + fs::read(p).await.expect_err("must fail"); } fuse_daemon.unmount().expect("unmount"); diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index 6764eaddb457..6eebe500d275 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -72,6 +72,7 @@ async fn process_entry( // upload this directory directory_putter .put(directory) + .await .map_err(|e| Error::UploadDirectoryError(entry.path().to_path_buf(), e))?; return Ok(proto::node::Node::Directory(proto::DirectoryNode { diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index 4255148fc5e8..f1392472a50e 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -8,20 +8,20 @@ use count_write::CountWrite; use nix_compat::nar; use sha2::{Digest, Sha256}; use std::{io, sync::Arc}; -use tokio::io::BufReader; +use tokio::{io::BufReader, task::spawn_blocking}; use tracing::warn; /// Invoke [write_nar], and return the size and sha256 digest of the produced /// NAR output. -pub fn calculate_size_and_sha256( +pub async fn calculate_size_and_sha256( root_node: &proto::node::Node, blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) -> Result<(u64, [u8; 32]), RenderError> { let h = Sha256::new(); - let mut cw = CountWrite::from(h); + let cw = CountWrite::from(h); - write_nar(&mut cw, root_node, blob_service, directory_service)?; + let cw = write_nar(cw, root_node, blob_service, directory_service).await?; Ok((cw.count(), cw.into_inner().finalize().into())) } @@ -30,26 +30,44 @@ pub fn calculate_size_and_sha256( /// and uses the passed blob_service and directory_service to /// perform the necessary lookups as it traverses the structure. /// The contents in NAR serialization are writen to the passed [std::io::Write]. -pub fn write_nar<W: std::io::Write>( - w: &mut W, +/// +/// The writer is passed back in the return value. This is done because async Rust +/// lacks scoped blocking tasks, so we need to transfer ownership of the writer +/// internally. +/// +/// # Panics +/// This will panic if called outside the context of a Tokio runtime. +pub async fn write_nar<W: std::io::Write + Send + 'static>( + mut w: W, proto_root_node: &proto::node::Node, blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, -) -> Result<(), RenderError> { - // Initialize NAR writer - let nar_root_node = nar::writer::open(w).map_err(RenderError::NARWriterError)?; +) -> Result<W, RenderError> { + let tokio_handle = tokio::runtime::Handle::current(); + let proto_root_node = proto_root_node.clone(); + + spawn_blocking(move || { + // Initialize NAR writer + let nar_root_node = nar::writer::open(&mut w).map_err(RenderError::NARWriterError)?; - walk_node( - nar_root_node, - proto_root_node, - blob_service, - directory_service, - ) + walk_node( + tokio_handle, + nar_root_node, + &proto_root_node, + blob_service, + directory_service, + )?; + + Ok(w) + }) + .await + .unwrap() } /// Process an intermediate node in the structure. /// This consumes the node. fn walk_node( + tokio_handle: tokio::runtime::Handle, nar_node: nar::writer::Node, proto_node: &proto::node::Node, blob_service: Arc<dyn BlobService>, @@ -73,9 +91,6 @@ fn walk_node( )) })?; - // HACK: blob_service is async, but this function isn't async yet.. - let tokio_handle = tokio::runtime::Handle::current(); - let blob_reader = match tokio_handle .block_on(async { blob_service.open_read(&digest).await }) .map_err(RenderError::StoreError)? @@ -107,11 +122,10 @@ fn walk_node( })?; // look it up with the directory service - let resp = directory_service - .get(&digest) - .map_err(RenderError::StoreError)?; - - match resp { + match tokio_handle + .block_on(async { directory_service.get(&digest).await }) + .map_err(RenderError::StoreError)? + { // if it's None, that's an error! None => { return Err(RenderError::DirectoryNotFound( @@ -131,6 +145,7 @@ fn walk_node( .entry(proto_node.get_name()) .map_err(RenderError::NARWriterError)?; walk_node( + tokio_handle.clone(), child_node, &proto_node, blob_service.clone(), diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 1649655b69ea..c116ddbc8905 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -4,16 +4,15 @@ use crate::{ directoryservice::DirectoryService, proto::{self, ListPathInfoRequest}, }; -use std::sync::Arc; -use tokio::{net::UnixStream, task::JoinHandle}; -use tonic::{transport::Channel, Code, Status, Streaming}; +use async_stream::try_stream; +use futures::Stream; +use std::{pin::Pin, sync::Arc}; +use tokio::net::UnixStream; +use tonic::{async_trait, transport::Channel, Code}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. #[derive(Clone)] pub struct GRPCPathInfoService { - /// A handle into the active tokio runtime. Necessary to spawn tasks. - tokio_handle: tokio::runtime::Handle, - /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, @@ -25,13 +24,11 @@ impl GRPCPathInfoService { pub fn from_client( grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, ) -> Self { - Self { - tokio_handle: tokio::runtime::Handle::current(), - grpc_client, - } + Self { grpc_client } } } +#[async_trait] impl PathInfoService for GRPCPathInfoService { /// Constructs a [GRPCPathInfoService] from the passed [url::Url]: /// - scheme has to match `grpc+*://`. @@ -92,47 +89,39 @@ impl PathInfoService for GRPCPathInfoService { } } - fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, crate::Error> { + async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, crate::Error> { // Get a new handle to the gRPC client. let mut grpc_client = self.grpc_client.clone(); - let task: JoinHandle<Result<proto::PathInfo, Status>> = - self.tokio_handle.spawn(async move { - let path_info = grpc_client - .get(proto::GetPathInfoRequest { - by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash( - digest.to_vec().into(), - )), - }) - .await? - .into_inner(); - - Ok(path_info) - }); + let path_info = grpc_client + .get(proto::GetPathInfoRequest { + by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash( + digest.to_vec().into(), + )), + }) + .await; - match self.tokio_handle.block_on(task)? { - Ok(path_info) => Ok(Some(path_info)), + match path_info { + Ok(path_info) => Ok(Some(path_info.into_inner())), Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(crate::Error::StorageError(e.to_string())), } } - fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, crate::Error> { + async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, crate::Error> { // Get a new handle to the gRPC client. let mut grpc_client = self.grpc_client.clone(); - let task: JoinHandle<Result<proto::PathInfo, Status>> = - self.tokio_handle.spawn(async move { - let path_info = grpc_client.put(path_info).await?.into_inner(); - Ok(path_info) - }); + let path_info = grpc_client + .put(path_info) + .await + .map_err(|e| crate::Error::StorageError(e.to_string()))? + .into_inner(); - self.tokio_handle - .block_on(task)? - .map_err(|e| crate::Error::StorageError(e.to_string())) + Ok(path_info) } - fn calculate_nar( + async fn calculate_nar( &self, root_node: &proto::node::Node, ) -> Result<(u64, [u8; 32]), crate::Error> { @@ -140,83 +129,54 @@ impl PathInfoService for GRPCPathInfoService { let mut grpc_client = self.grpc_client.clone(); let root_node = root_node.clone(); - let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move { - let path_info = grpc_client - .calculate_nar(proto::Node { - node: Some(root_node), - }) - .await? - .into_inner(); - Ok(path_info) - }); - - let resp = self - .tokio_handle - .block_on(task)? - .map_err(|e| crate::Error::StorageError(e.to_string()))?; + let path_info = grpc_client + .calculate_nar(proto::Node { + node: Some(root_node), + }) + .await + .map_err(|e| crate::Error::StorageError(e.to_string()))? + .into_inner(); - let nar_sha256: [u8; 32] = resp + let nar_sha256: [u8; 32] = path_info .nar_sha256 .to_vec() .try_into() .map_err(|_e| crate::Error::StorageError("invalid digest length".to_string()))?; - Ok((resp.nar_size, nar_sha256)) + Ok((path_info.nar_size, nar_sha256)) } - fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, crate::Error>> + Send> { - // Get a new handle to the gRPC client. + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, crate::Error>> + Send>> { let mut grpc_client = self.grpc_client.clone(); - let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move { - let s = grpc_client - .list(ListPathInfoRequest::default()) - .await? - .into_inner(); - - Ok(s) - }); - - let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); - - Box::new(StreamIterator::new(self.tokio_handle.clone(), stream)) - } -} - -pub struct StreamIterator { - tokio_handle: tokio::runtime::Handle, - stream: Streaming<proto::PathInfo>, -} - -impl StreamIterator { - pub fn new(tokio_handle: tokio::runtime::Handle, stream: Streaming<proto::PathInfo>) -> Self { - Self { - tokio_handle, - stream, - } - } -} - -impl Iterator for StreamIterator { - type Item = Result<proto::PathInfo, crate::Error>; - - fn next(&mut self) -> Option<Self::Item> { - match self.tokio_handle.block_on(self.stream.message()) { - Ok(o) => match o { - Some(pathinfo) => { - // validate the pathinfo - if let Err(e) = pathinfo.validate() { - return Some(Err(crate::Error::StorageError(format!( - "pathinfo {:?} failed validation: {}", - pathinfo, e - )))); - } - Some(Ok(pathinfo)) + let stream = try_stream! { + let resp = grpc_client.list(ListPathInfoRequest::default()).await; + + let mut stream = resp.map_err(|e| crate::Error::StorageError(e.to_string()))?.into_inner(); + + loop { + match stream.message().await { + Ok(o) => match o { + Some(pathinfo) => { + // validate the pathinfo + if let Err(e) = pathinfo.validate() { + Err(crate::Error::StorageError(format!( + "pathinfo {:?} failed validation: {}", + pathinfo, e + )))?; + } + yield pathinfo + } + None => { + return; + }, + }, + Err(e) => Err(crate::Error::StorageError(e.to_string()))?, } - None => None, - }, - Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))), - } + } + }; + + Box::pin(stream) } } @@ -227,7 +187,6 @@ mod tests { use tempfile::TempDir; use tokio::net::UnixListener; - use tokio::task; use tokio::time; use tokio_stream::wrappers::UnixListenerStream; @@ -377,13 +336,10 @@ mod tests { ); } - let pi = task::spawn_blocking(move || { - client - .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap()) - .expect("must not be error") - }) - .await - .expect("must not be err"); + let pi = client + .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap()) + .await + .expect("must not be error"); assert!(pi.is_none()); } diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index aba1216c6e96..4cdc411ffb28 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -3,10 +3,13 @@ use crate::{ blobservice::BlobService, directoryservice::DirectoryService, nar::calculate_size_and_sha256, proto, Error, }; +use futures::{stream::iter, Stream}; use std::{ collections::HashMap, + pin::Pin, sync::{Arc, RwLock}, }; +use tonic::async_trait; pub struct MemoryPathInfoService { db: Arc<RwLock<HashMap<[u8; 20], proto::PathInfo>>>, @@ -28,6 +31,7 @@ impl MemoryPathInfoService { } } +#[async_trait] impl PathInfoService for MemoryPathInfoService { /// Constructs a [MemoryPathInfoService] from the passed [url::Url]: /// - scheme has to be `memory://` @@ -49,7 +53,7 @@ impl PathInfoService for MemoryPathInfoService { Ok(Self::new(blob_service, directory_service)) } - fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { + async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { let db = self.db.read().unwrap(); match db.get(&digest) { @@ -58,7 +62,7 @@ impl PathInfoService for MemoryPathInfoService { } } - fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> { + async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> { // Call validate on the received PathInfo message. match path_info.validate() { Err(e) => Err(Error::InvalidRequest(format!( @@ -77,16 +81,17 @@ impl PathInfoService for MemoryPathInfoService { } } - fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { + async fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { calculate_size_and_sha256( root_node, self.blob_service.clone(), self.directory_service.clone(), ) + .await .map_err(|e| Error::StorageError(e.to_string())) } - fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_> { + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, Error>> + Send>> { let db = self.db.read().unwrap(); // Copy all elements into a list. @@ -95,7 +100,7 @@ impl PathInfoService for MemoryPathInfoService { // memory impl is only for testing purposes anyways. let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect(); - Box::new(items.into_iter()) + Box::pin(iter(items)) } } diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index 51d3e51115eb..a460c35a0289 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -3,8 +3,12 @@ mod grpc; mod memory; mod sled; +use std::pin::Pin; use std::sync::Arc; +use futures::Stream; +use tonic::async_trait; + use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::{proto, Error}; @@ -16,10 +20,12 @@ pub use self::sled::SledPathInfoService; /// The base trait all PathInfo services need to implement. /// This is a simple get and put of [proto::Directory], returning their digest. +#[async_trait] pub trait PathInfoService: Send + Sync { /// Create a new instance by passing in a connection URL, as well /// as instances of a [PathInfoService] and [DirectoryService] (as the /// [PathInfoService] needs to talk to them). + /// TODO: check if we want to make this async, instead of lazily connecting fn from_url( url: &url::Url, blob_service: Arc<dyn BlobService>, @@ -29,18 +35,23 @@ pub trait PathInfoService: Send + Sync { Self: Sized; /// Retrieve a PathInfo message by the output digest. - fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error>; + async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error>; /// Store a PathInfo message. Implementations MUST call validate and reject /// invalid messages. - fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error>; + async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error>; /// Return the nar size and nar sha256 digest for a given root node. /// This can be used to calculate NAR-based output paths, /// and implementations are encouraged to cache it. - fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error>; + async fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error>; /// Iterate over all PathInfo objects in the store. /// Implementations can decide to disallow listing. - fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_>; + /// + /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily, + /// and the box allows different underlying stream implementations to be returned since + /// Rust doesn't support this as a generic in traits yet. This is the same thing that + /// [async_trait] generates, but for streams instead of futures. + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, Error>> + Send>>; } diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index 4f327626d19d..a9d0b029ee6b 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -3,8 +3,10 @@ use crate::{ blobservice::BlobService, directoryservice::DirectoryService, nar::calculate_size_and_sha256, proto, Error, }; +use futures::{stream::iter, Stream}; use prost::Message; -use std::{path::PathBuf, sync::Arc}; +use std::{path::PathBuf, pin::Pin, sync::Arc}; +use tonic::async_trait; use tracing::warn; /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). @@ -49,6 +51,7 @@ impl SledPathInfoService { } } +#[async_trait] impl PathInfoService for SledPathInfoService { /// Constructs a [SledPathInfoService] from the passed [url::Url]: /// - scheme has to be `sled://` @@ -84,7 +87,7 @@ impl PathInfoService for SledPathInfoService { } } - fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { + async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { match self.db.get(digest) { Ok(None) => Ok(None), Ok(Some(data)) => match proto::PathInfo::decode(&*data) { @@ -107,7 +110,7 @@ impl PathInfoService for SledPathInfoService { } } - fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> { + async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> { // Call validate on the received PathInfo message. match path_info.validate() { Err(e) => Err(Error::InvalidRequest(format!( @@ -128,17 +131,18 @@ impl PathInfoService for SledPathInfoService { } } - fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { + async fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { calculate_size_and_sha256( root_node, self.blob_service.clone(), self.directory_service.clone(), ) + .await .map_err(|e| Error::StorageError(e.to_string())) } - fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send> { - Box::new(self.db.iter().values().map(|v| match v { + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, Error>> + Send>> { + Box::pin(iter(self.db.iter().values().map(|v| match v { Ok(data) => { // we retrieved some bytes match proto::PathInfo::decode(&*data) { @@ -159,7 +163,7 @@ impl PathInfoService for SledPathInfoService { e ))) } - })) + }))) } } diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs index ec53d7d76ce3..5e143a7bd7a8 100644 --- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs @@ -1,5 +1,6 @@ use crate::proto; use crate::{directoryservice::DirectoryService, B3Digest}; +use futures::StreamExt; use std::collections::HashMap; use std::sync::Arc; use tokio::{sync::mpsc::channel, task}; @@ -47,7 +48,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW task::spawn(async move { if !req_inner.recursive { let e: Result<proto::Directory, Status> = - match directory_service.get(&digest) { + match directory_service.get(&digest).await { Ok(Some(directory)) => Ok(directory), Ok(None) => Err(Status::not_found(format!( "directory {} not found", @@ -61,9 +62,9 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW } } else { // If recursive was requested, traverse via get_recursive. - let directories_it = directory_service.get_recursive(&digest); + let mut directories_it = directory_service.get_recursive(&digest); - for e in directories_it { + while let Some(e) = directories_it.next().await { // map err in res from Error to Status let res = e.map_err(|e| Status::internal(e.to_string())); if tx.send(res).await.is_err() { @@ -157,7 +158,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW // check if the directory already exists in the database. We can skip // inserting if it's already there, as that'd be a no-op. - match self.directory_service.get(&dgst) { + match self.directory_service.get(&dgst).await { Err(e) => { warn!("error checking if directory already exists: {}", e); return Err(e.into()); @@ -166,7 +167,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW Ok(Some(_)) => {} // insert if it doesn't already exist Ok(None) => { - self.directory_service.put(directory)?; + self.directory_service.put(directory).await?; } } } diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index 33861d9ffa4e..14ceb34c3af7 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -1,6 +1,7 @@ use crate::nar::RenderError; use crate::pathinfoservice::PathInfoService; use crate::proto; +use futures::StreamExt; use std::sync::Arc; use tokio::task; use tokio_stream::wrappers::ReceiverStream; @@ -36,7 +37,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra .to_vec() .try_into() .map_err(|_e| Status::invalid_argument("invalid output digest length"))?; - match self.path_info_service.get(digest) { + match self.path_info_service.get(digest).await { Ok(None) => Err(Status::not_found("PathInfo not found")), Ok(Some(path_info)) => Ok(Response::new(path_info)), Err(e) => { @@ -54,7 +55,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra // Store the PathInfo in the client. Clients MUST validate the data // they receive, so we don't validate additionally here. - match self.path_info_service.put(path_info) { + match self.path_info_service.put(path_info).await { Ok(path_info_new) => Ok(Response::new(path_info_new)), Err(e) => { warn!("failed to insert PathInfo: {}", e); @@ -72,11 +73,10 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra None => Err(Status::invalid_argument("no root node sent")), Some(root_node) => { let path_info_service = self.path_info_service.clone(); - let (nar_size, nar_sha256) = - task::spawn_blocking(move || path_info_service.calculate_nar(&root_node)) - .await - .unwrap() - .expect("error during nar calculation"); // TODO: handle error + let (nar_size, nar_sha256) = path_info_service + .calculate_nar(&root_node) + .await + .expect("error during nar calculation"); // TODO: handle error Ok(Response::new(proto::CalculateNarResponse { nar_size, @@ -96,7 +96,8 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra let path_info_service = self.path_info_service.clone(); let _task = task::spawn(async move { - for e in path_info_service.list() { + let mut stream = path_info_service.list(); + while let Some(e) = stream.next().await { let res = e.map_err(|e| Status::internal(e.to_string())); if tx.send(res).await.is_err() { debug!("receiver dropped"); diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs index 45b9c3440d89..3f7f7dff9db1 100644 --- a/tvix/store/src/tests/import.rs +++ b/tvix/store/src/tests/import.rs @@ -111,10 +111,12 @@ async fn complicated() { // ensure DIRECTORY_WITH_KEEP and DIRECTORY_COMPLICATED have been uploaded assert!(directory_service .get(&DIRECTORY_WITH_KEEP.digest()) + .await .unwrap() .is_some()); assert!(directory_service .get(&DIRECTORY_COMPLICATED.digest()) + .await .unwrap() .is_some()); diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs index 22dbd7bcba9e..e0163dc7bd93 100644 --- a/tvix/store/src/tests/nar_renderer.rs +++ b/tvix/store/src/tests/nar_renderer.rs @@ -8,12 +8,12 @@ use crate::tests::utils::*; use sha2::{Digest, Sha256}; use std::io; -#[test] -fn single_symlink() { - let mut buf: Vec<u8> = vec![]; +#[tokio::test] +async fn single_symlink() { + let buf: Vec<u8> = vec![]; - write_nar( - &mut buf, + let buf = write_nar( + buf, &crate::proto::node::Node::Symlink(SymlinkNode { name: "doesntmatter".into(), target: "/nix/store/somewhereelse".into(), @@ -22,6 +22,7 @@ fn single_symlink() { gen_blob_service(), gen_directory_service(), ) + .await .expect("must succeed"); assert_eq!(buf, NAR_CONTENTS_SYMLINK.to_vec()); @@ -30,24 +31,21 @@ fn single_symlink() { /// Make sure the NARRenderer fails if a referred blob doesn't exist. #[tokio::test] async fn single_file_missing_blob() { - let mut buf: Vec<u8> = vec![]; + let buf: Vec<u8> = vec![]; - let e = tokio::task::spawn_blocking(move || { - write_nar( - &mut buf, - &crate::proto::node::Node::File(FileNode { - name: "doesntmatter".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: HELLOWORLD_BLOB_CONTENTS.len() as u32, - executable: false, - }), - // the blobservice is empty intentionally, to provoke the error. - gen_blob_service(), - gen_directory_service(), - ) - }) + let e = write_nar( + buf, + &crate::proto::node::Node::File(FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: HELLOWORLD_BLOB_CONTENTS.len() as u32, + executable: false, + }), + // the blobservice is empty intentionally, to provoke the error. + gen_blob_service(), + gen_directory_service(), + ) .await - .unwrap() .expect_err("must fail"); match e { @@ -80,23 +78,20 @@ async fn single_file_wrong_blob_size() { let bs = blob_service.clone(); // Test with a root FileNode of a too big size { - let mut buf: Vec<u8> = vec![]; - - let e = tokio::task::spawn_blocking(move || { - write_nar( - &mut buf, - &crate::proto::node::Node::File(FileNode { - name: "doesntmatter".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: 42, // <- note the wrong size here! - executable: false, - }), - bs, - gen_directory_service(), - ) - }) + let buf: Vec<u8> = vec![]; + + let e = write_nar( + buf, + &crate::proto::node::Node::File(FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: 42, // <- note the wrong size here! + executable: false, + }), + bs, + gen_directory_service(), + ) .await - .unwrap() .expect_err("must fail"); match e { @@ -110,24 +105,21 @@ async fn single_file_wrong_blob_size() { let bs = blob_service.clone(); // Test with a root FileNode of a too small size { - let mut buf: Vec<u8> = vec![]; - - let e = tokio::task::spawn_blocking(move || { - write_nar( - &mut buf, - &crate::proto::node::Node::File(FileNode { - name: "doesntmatter".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: 2, // <- note the wrong size here! - executable: false, - }), - bs, - gen_directory_service(), - ) - .expect_err("must fail") - }) + let buf: Vec<u8> = vec![]; + + let e = write_nar( + buf, + &crate::proto::node::Node::File(FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: 2, // <- note the wrong size here! + executable: false, + }), + bs, + gen_directory_service(), + ) .await - .unwrap(); + .expect_err("must fail"); match e { crate::nar::RenderError::NARWriterError(e) => { @@ -156,26 +148,21 @@ async fn single_file() { writer.close().await.unwrap() ); - let mut buf: Vec<u8> = vec![]; + let buf: Vec<u8> = vec![]; - let buf = tokio::task::spawn_blocking(move || { - write_nar( - &mut buf, - &crate::proto::node::Node::File(FileNode { - name: "doesntmatter".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: HELLOWORLD_BLOB_CONTENTS.len() as u32, - executable: false, - }), - blob_service, - gen_directory_service(), - ) - .expect("must succeed"); - - buf - }) + let buf = write_nar( + buf, + &crate::proto::node::Node::File(FileNode { + name: "doesntmatter".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: HELLOWORLD_BLOB_CONTENTS.len() as u32, + executable: false, + }), + blob_service, + gen_directory_service(), + ) .await - .unwrap(); + .expect("must succeed"); assert_eq!(buf, NAR_CONTENTS_HELLOWORLD.to_vec()); } @@ -196,51 +183,48 @@ async fn test_complicated() { .unwrap(); assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().await.unwrap()); - directory_service.put(DIRECTORY_WITH_KEEP.clone()).unwrap(); + directory_service + .put(DIRECTORY_WITH_KEEP.clone()) + .await + .unwrap(); directory_service .put(DIRECTORY_COMPLICATED.clone()) + .await .unwrap(); - let mut buf: Vec<u8> = vec![]; + let buf: Vec<u8> = vec![]; let bs = blob_service.clone(); let ds = directory_service.clone(); - let buf = tokio::task::spawn_blocking(move || { - write_nar( - &mut buf, - &crate::proto::node::Node::Directory(DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }), - bs, - ds, - ) - .expect("must succeed"); - buf - }) + let buf = write_nar( + buf, + &crate::proto::node::Node::Directory(DirectoryNode { + name: "doesntmatter".into(), + digest: DIRECTORY_COMPLICATED.digest().into(), + size: DIRECTORY_COMPLICATED.size(), + }), + bs, + ds, + ) .await - .unwrap(); + .expect("must succeed"); assert_eq!(buf, NAR_CONTENTS_COMPLICATED.to_vec()); // ensure calculate_nar does return the correct sha256 digest and sum. let bs = blob_service.clone(); let ds = directory_service.clone(); - let (nar_size, nar_digest) = tokio::task::spawn_blocking(move || { - calculate_size_and_sha256( - &crate::proto::node::Node::Directory(DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }), - bs, - ds, - ) - }) + let (nar_size, nar_digest) = calculate_size_and_sha256( + &crate::proto::node::Node::Directory(DirectoryNode { + name: "doesntmatter".into(), + digest: DIRECTORY_COMPLICATED.digest().into(), + size: DIRECTORY_COMPLICATED.size(), + }), + bs, + ds, + ) .await - .unwrap() .expect("must succeed"); assert_eq!(NAR_CONTENTS_COMPLICATED.len() as u64, nar_size); |