From 37a348b4fae16b2b1c5ec12deaa085a049833d7f Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Tue, 19 Sep 2023 11:46:41 -0500 Subject: refactor(tvix/store): Asyncify PathInfoService and DirectoryService We've decided to asyncify all of the services to reduce some of the pains going back and forth between sync<->async. The end goal will be for all the tvix-store internals to be async and then expose a sync interface for things like tvix eval io. Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369 Autosubmit: Connor Brewster Tested-by: BuildkiteCI Reviewed-by: flokli --- tvix/store/src/directoryservice/grpc.rs | 285 +++++++++++++--------------- tvix/store/src/directoryservice/memory.rs | 20 +- tvix/store/src/directoryservice/mod.rs | 24 ++- tvix/store/src/directoryservice/sled.rs | 20 +- tvix/store/src/directoryservice/traverse.rs | 115 ++++++----- tvix/store/src/directoryservice/utils.rs | 161 +++++++--------- 6 files changed, 301 insertions(+), 324 deletions(-) (limited to 'tvix/store/src/directoryservice') diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index 73d88bb688a3..6257a8e81485 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -1,22 +1,24 @@ use std::collections::HashSet; +use std::pin::Pin; use super::{DirectoryPutter, DirectoryService}; use crate::proto::{self, get_directory_request::ByWhat}; use crate::{B3Digest, Error}; +use async_stream::try_stream; +use futures::Stream; use tokio::net::UnixStream; +use tokio::spawn; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; +use tonic::async_trait; +use tonic::Code; use tonic::{transport::Channel, Status}; -use tonic::{Code, Streaming}; use tracing::{instrument, warn}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. 
#[derive(Clone)] pub struct GRPCDirectoryService { - /// A handle into the active tokio runtime. Necessary to spawn tasks. - tokio_handle: tokio::runtime::Handle, - /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::directory_service_client::DirectoryServiceClient, @@ -28,13 +30,11 @@ impl GRPCDirectoryService { pub fn from_client( grpc_client: proto::directory_service_client::DirectoryServiceClient, ) -> Self { - Self { - tokio_handle: tokio::runtime::Handle::current(), - grpc_client, - } + Self { grpc_client } } } +#[async_trait] impl DirectoryService for GRPCDirectoryService { /// Constructs a [GRPCDirectoryService] from the passed [url::Url]: /// - scheme has to match `grpc+*://`. @@ -89,11 +89,15 @@ impl DirectoryService for GRPCDirectoryService { } } } - fn get(&self, digest: &B3Digest) -> Result, crate::Error> { + + async fn get( + &self, + digest: &B3Digest, + ) -> Result, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest_cpy = digest.clone(); - let task = self.tokio_handle.spawn(async move { + let message = async move { let mut s = grpc_client .get(proto::GetDirectoryRequest { recursive: false, @@ -104,10 +108,10 @@ impl DirectoryService for GRPCDirectoryService { // Retrieve the first message only, then close the stream (we set recursive to false) s.message().await - }); + }; let digest = digest.clone(); - match self.tokio_handle.block_on(task)? { + match message.await { Ok(Some(directory)) => { // Validate the retrieved Directory indeed has the // digest we expect it to have, to detect corruptions. 
@@ -134,14 +138,12 @@ impl DirectoryService for GRPCDirectoryService { } } - fn put(&self, directory: crate::proto::Directory) -> Result { + async fn put(&self, directory: crate::proto::Directory) -> Result { let mut grpc_client = self.grpc_client.clone(); - let task = self - .tokio_handle - .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await }); + let resp = grpc_client.put(tokio_stream::iter(vec![directory])).await; - match self.tokio_handle.block_on(task)? { + match resp { Ok(put_directory_resp) => Ok(put_directory_resp .into_inner() .root_digest @@ -157,32 +159,82 @@ impl DirectoryService for GRPCDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Box> + Send> { + ) -> Pin> + Send>> { let mut grpc_client = self.grpc_client.clone(); + let root_directory_digest = root_directory_digest.clone(); - // clone so we can move it - let root_directory_digest_cpy = root_directory_digest.clone(); - - let task: JoinHandle, Status>> = - self.tokio_handle.spawn(async move { - let s = grpc_client - .get(proto::GetDirectoryRequest { - recursive: true, - by_what: Some(ByWhat::Digest(root_directory_digest_cpy.into())), - }) - .await? - .into_inner(); - - Ok(s) - }); + let stream = try_stream! { + let mut stream = grpc_client + .get(proto::GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(root_directory_digest.clone().into())), + }) + .await + .map_err(|e| crate::Error::StorageError(e.to_string()))? + .into_inner(); - let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); + // The Directory digests we received so far + let mut received_directory_digests: HashSet = HashSet::new(); + // The Directory digests we're still expecting to get sent. + let mut expected_directory_digests: HashSet = HashSet::from([root_directory_digest]); + + loop { + match stream.message().await { + Ok(Some(directory)) => { + // validate the directory itself. 
+ if let Err(e) = directory.validate() { + Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + directory.digest(), + e, + )))?; + } + // validate we actually expected that directory, and move it from expected to received. + let directory_digest = directory.digest(); + let was_expected = expected_directory_digests.remove(&directory_digest); + if !was_expected { + // FUTUREWORK: dumb clients might send the same stuff twice. + // as a fallback, we might want to tolerate receiving + // it if it's in received_directory_digests (as that + // means it once was in expected_directory_digests) + Err(crate::Error::StorageError(format!( + "received unexpected directory {}", + directory_digest + )))?; + } + received_directory_digests.insert(directory_digest); + + // register all children in expected_directory_digests. + for child_directory in &directory.directories { + // We ran validate() above, so we know these digests must be correct. + let child_directory_digest = + child_directory.digest.clone().try_into().unwrap(); + + expected_directory_digests + .insert(child_directory_digest); + } + + yield directory; + }, + Ok(None) => { + // If we were still expecting something, that's an error. + if !expected_directory_digests.is_empty() { + Err(crate::Error::StorageError(format!( + "still expected {} directories, but got premature end of stream", + expected_directory_digests.len(), + )))? + } else { + return + } + }, + Err(e) => { + Err(crate::Error::StorageError(e.to_string()))?; + }, + } + } + }; - Box::new(StreamIterator::new( - self.tokio_handle.clone(), - root_directory_digest.clone(), - stream, - )) + Box::pin(stream) } #[instrument(skip_all)] @@ -194,110 +246,21 @@ impl DirectoryService for GRPCDirectoryService { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let task: JoinHandle> = - self.tokio_handle.spawn(async move { - let s = grpc_client - .put(UnboundedReceiverStream::new(rx)) - .await? 
- .into_inner(); - - Ok(s) - }); - - Box::new(GRPCPutter::new(self.tokio_handle.clone(), tx, task)) - } -} - -pub struct StreamIterator { - /// A handle into the active tokio runtime. Necessary to run futures to completion. - tokio_handle: tokio::runtime::Handle, - // A stream of [proto::Directory] - stream: Streaming, - // The Directory digests we received so far - received_directory_digests: HashSet, - // The Directory digests we're still expecting to get sent. - expected_directory_digests: HashSet, -} - -impl StreamIterator { - pub fn new( - tokio_handle: tokio::runtime::Handle, - root_digest: B3Digest, - stream: Streaming, - ) -> Self { - Self { - tokio_handle, - stream, - received_directory_digests: HashSet::new(), - expected_directory_digests: HashSet::from([root_digest]), - } - } -} - -impl Iterator for StreamIterator { - type Item = Result; - - fn next(&mut self) -> Option { - match self.tokio_handle.block_on(self.stream.message()) { - Ok(ok) => match ok { - Some(directory) => { - // validate the directory itself. - if let Err(e) = directory.validate() { - return Some(Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - directory.digest(), - e, - )))); - } - // validate we actually expected that directory, and move it from expected to received. - let directory_digest = directory.digest(); - let was_expected = self.expected_directory_digests.remove(&directory_digest); - if !was_expected { - // FUTUREWORK: dumb clients might send the same stuff twice. - // as a fallback, we might want to tolerate receiving - // it if it's in received_directory_digests (as that - // means it once was in expected_directory_digests) - return Some(Err(crate::Error::StorageError(format!( - "received unexpected directory {}", - directory_digest - )))); - } - self.received_directory_digests.insert(directory_digest); - - // register all children in expected_directory_digests. 
- for child_directory in &directory.directories { - // We ran validate() above, so we know these digests must be correct. - let child_directory_digest = - child_directory.digest.clone().try_into().unwrap(); + let task: JoinHandle> = spawn(async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); - self.expected_directory_digests - .insert(child_directory_digest); - } + Ok(s) + }); - Some(Ok(directory)) - } - None => { - // If we were still expecting something, that's an error. - if !self.expected_directory_digests.is_empty() { - Some(Err(crate::Error::StorageError(format!( - "still expected {} directories, but got premature end of stream", - self.expected_directory_digests.len(), - )))) - } else { - None - } - } - }, - Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))), - } + Box::new(GRPCPutter::new(tx, task)) } } /// Allows uploading multiple Directory messages in the same gRPC stream. pub struct GRPCPutter { - /// A handle into the active tokio runtime. Necessary to spawn tasks. - tokio_handle: tokio::runtime::Handle, - /// Data about the current request - a handle to the task, and the tx part /// of the channel. /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request. @@ -311,19 +274,18 @@ pub struct GRPCPutter { impl GRPCPutter { pub fn new( - tokio_handle: tokio::runtime::Handle, directory_sender: UnboundedSender, task: JoinHandle>, ) -> Self { Self { - tokio_handle, rq: Some((task, directory_sender)), } } } +#[async_trait] impl DirectoryPutter for GRPCPutter { - fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { match self.rq { // If we're not already closed, send the directory to directory_sender. 
Some((_, ref directory_sender)) => { @@ -331,7 +293,7 @@ impl DirectoryPutter for GRPCPutter { // If the channel has been prematurely closed, invoke close (so we can peek at the error code) // That error code is much more helpful, because it // contains the error message from the server. - self.close()?; + self.close().await?; } Ok(()) } @@ -343,7 +305,7 @@ impl DirectoryPutter for GRPCPutter { } /// Closes the stream for sending, and returns the value - fn close(&mut self) -> Result { + async fn close(&mut self) -> Result { // get self.rq, and replace it with None. // This ensures we can only close it once. match std::mem::take(&mut self.rq) { @@ -352,9 +314,8 @@ impl DirectoryPutter for GRPCPutter { // close directory_sender, so blocking on task will finish. drop(directory_sender); - let root_digest = self - .tokio_handle - .block_on(task)? + let root_digest = task + .await? .map_err(|e| Error::StorageError(e.to_string()))? .root_digest; @@ -379,6 +340,7 @@ mod tests { use core::time; use std::thread; + use futures::StreamExt; use tempfile::TempDir; use tokio::net::{UnixListener, UnixStream}; use tokio_stream::wrappers::UnixListenerStream; @@ -446,7 +408,7 @@ mod tests { ); } - let task = tester_runtime.spawn_blocking(move || { + tester_runtime.block_on(async move { // Create a channel, connecting to the uds at socket_path. // The URI is unused. let channel = Endpoint::try_from("http://[::]:50051") @@ -465,6 +427,7 @@ mod tests { None, directory_service .get(&DIRECTORY_A.digest()) + .await .expect("must not fail") ); @@ -473,6 +436,7 @@ mod tests { DIRECTORY_A.digest(), directory_service .put(DIRECTORY_A.clone()) + .await .expect("must succeed") ); @@ -481,6 +445,7 @@ mod tests { DIRECTORY_A.clone(), directory_service .get(&DIRECTORY_A.digest()) + .await .expect("must succeed") .expect("must be some") ); @@ -488,21 +453,22 @@ mod tests { // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. 
directory_service .put(DIRECTORY_B.clone()) + .await .expect_err("must fail"); // Putting DIRECTORY_B in a put_multiple will succeed, but the close // will always fail. { let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).expect("must succeed"); - handle.close().expect_err("must fail"); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + handle.close().await.expect_err("must fail"); } // Uploading A and then B should succeed, and closing should return the digest of B. let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_A.clone()).expect("must succeed"); - handle.put(DIRECTORY_B.clone()).expect("must succeed"); - let digest = handle.close().expect("must succeed"); + handle.put(DIRECTORY_A.clone()).await.expect("must succeed"); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + let digest = handle.close().await.expect("must succeed"); assert_eq!(DIRECTORY_B.digest(), digest); // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. @@ -511,6 +477,7 @@ mod tests { DIRECTORY_B.clone(), directories_it .next() + .await .expect("must be some") .expect("must succeed") ); @@ -518,6 +485,7 @@ mod tests { DIRECTORY_A.clone(), directories_it .next() + .await .expect("must be some") .expect("must succeed") ); @@ -529,15 +497,15 @@ mod tests { { let mut handle = directory_service.put_multiple_start(); // sending out B will always be fine - handle.put(DIRECTORY_B.clone()).expect("must succeed"); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); // whether we will be able to put A as well depends on whether we // already received the error about B. 
- if handle.put(DIRECTORY_A.clone()).is_ok() { + if handle.put(DIRECTORY_A.clone()).await.is_ok() { // If we didn't, and this was Ok(_), … // a subsequent close MUST fail (because it waits for the // server) - handle.close().expect_err("must fail"); + handle.close().await.expect_err("must fail"); } } @@ -547,7 +515,7 @@ mod tests { // and then assert that uploading anything else via the handle will fail. { let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).expect("must succeed"); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); let mut is_closed = false; for _try in 1..1000 { @@ -555,7 +523,7 @@ mod tests { is_closed = true; break; } - std::thread::sleep(time::Duration::from_millis(10)) + tokio::time::sleep(time::Duration::from_millis(10)).await; } assert!( @@ -563,12 +531,13 @@ mod tests { "expected channel to eventually close, but never happened" ); - handle.put(DIRECTORY_A.clone()).expect_err("must fail"); + handle + .put(DIRECTORY_A.clone()) + .await + .expect_err("must fail"); } }); - tester_runtime.block_on(task)?; - Ok(()) } } diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs index 634dbf9922d0..ac67c999d01b 100644 --- a/tvix/store/src/directoryservice/memory.rs +++ b/tvix/store/src/directoryservice/memory.rs @@ -1,16 +1,20 @@ use crate::{proto, B3Digest, Error}; +use futures::Stream; use std::collections::HashMap; +use std::pin::Pin; use std::sync::{Arc, RwLock}; +use tonic::async_trait; use tracing::{instrument, warn}; -use super::utils::SimplePutter; -use super::{DirectoryPutter, DirectoryService, DirectoryTraverser}; +use super::utils::{traverse_directory, SimplePutter}; +use super::{DirectoryPutter, DirectoryService}; #[derive(Clone, Default)] pub struct MemoryDirectoryService { db: Arc>>, } +#[async_trait] impl DirectoryService for MemoryDirectoryService { /// Constructs a [MemoryDirectoryService] from the passed [url::Url]: /// - scheme has to be 
`memory://` @@ -27,8 +31,9 @@ impl DirectoryService for MemoryDirectoryService { Ok(Self::default()) } + #[instrument(skip(self, digest), fields(directory.digest = %digest))] - fn get(&self, digest: &B3Digest) -> Result, Error> { + async fn get(&self, digest: &B3Digest) -> Result, Error> { let db = self.db.read()?; match db.get(digest) { @@ -62,7 +67,7 @@ impl DirectoryService for MemoryDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - fn put(&self, directory: proto::Directory) -> Result { + async fn put(&self, directory: proto::Directory) -> Result { let digest = directory.digest(); // validate the directory itself. @@ -84,11 +89,8 @@ impl DirectoryService for MemoryDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Box> + Send> { - Box::new(DirectoryTraverser::with( - self.clone(), - root_directory_digest, - )) + ) -> Pin> + Send>> { + traverse_directory(self.clone(), root_directory_digest) } #[instrument(skip_all)] diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index fea4191400f3..09210dfed8e8 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -1,4 +1,7 @@ use crate::{proto, B3Digest, Error}; +use futures::Stream; +use std::pin::Pin; +use tonic::async_trait; mod from_addr; mod grpc; @@ -12,32 +15,38 @@ pub use self::grpc::GRPCDirectoryService; pub use self::memory::MemoryDirectoryService; pub use self::sled::SledDirectoryService; pub use self::traverse::traverse_to; -pub use self::utils::DirectoryTraverser; /// The base trait all Directory services need to implement. /// This is a simple get and put of [crate::proto::Directory], returning their /// digest. +#[async_trait] pub trait DirectoryService: Send + Sync { /// Create a new instance by passing in a connection URL. 
+ /// TODO: check if we want to make this async, instead of lazily connecting fn from_url(url: &url::Url) -> Result where Self: Sized; /// Get looks up a single Directory message by its digest. /// In case the directory is not found, Ok(None) is returned. - fn get(&self, digest: &B3Digest) -> Result, Error>; + async fn get(&self, digest: &B3Digest) -> Result, Error>; /// Get uploads a single Directory message, and returns the calculated /// digest, or an error. - fn put(&self, directory: proto::Directory) -> Result; + async fn put(&self, directory: proto::Directory) -> Result; /// Looks up a closure of [proto::Directory]. - /// Ideally this would be a `impl Iterator>`, + /// Ideally this would be a `impl Stream>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. + /// + /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily, + /// and the box allows different underlying stream implementations to be returned since + /// Rust doesn't support this as a generic in traits yet. This is the same thing that + /// [async_trait] generates, but for streams instead of futures. fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Box> + Send>; + ) -> Pin> + Send>>; /// Allows persisting a closure of [proto::Directory], which is a graph of /// connected Directory messages. @@ -49,16 +58,17 @@ pub trait DirectoryService: Send + Sync { /// The consumer can periodically call [DirectoryPutter::put], starting from the /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to /// retrieve the root digest (or an error). +#[async_trait] pub trait DirectoryPutter: Send { /// Put a individual [proto::Directory] into the store. /// Error semantics and behaviour is up to the specific implementation of /// this trait. /// Due to bursting, the returned error might refer to an object previously /// sent via `put`. 
- fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; /// Close the stream, and wait for any errors. - fn close(&mut self) -> Result; + async fn close(&mut self) -> Result; /// Return whether the stream is closed or not. /// Used from some [DirectoryService] implementations only. diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs index e741434eabb5..0dc5496803cb 100644 --- a/tvix/store/src/directoryservice/sled.rs +++ b/tvix/store/src/directoryservice/sled.rs @@ -1,12 +1,15 @@ use crate::directoryservice::DirectoryPutter; use crate::proto::Directory; use crate::{proto, B3Digest, Error}; +use futures::Stream; use prost::Message; use std::path::PathBuf; +use std::pin::Pin; +use tonic::async_trait; use tracing::{instrument, warn}; -use super::utils::SimplePutter; -use super::{DirectoryService, DirectoryTraverser}; +use super::utils::{traverse_directory, SimplePutter}; +use super::DirectoryService; #[derive(Clone)] pub struct SledDirectoryService { @@ -29,6 +32,7 @@ impl SledDirectoryService { } } +#[async_trait] impl DirectoryService for SledDirectoryService { /// Constructs a [SledDirectoryService] from the passed [url::Url]: /// - scheme has to be `sled://` @@ -59,7 +63,7 @@ impl DirectoryService for SledDirectoryService { } #[instrument(skip(self, digest), fields(directory.digest = %digest))] - fn get(&self, digest: &B3Digest) -> Result, Error> { + async fn get(&self, digest: &B3Digest) -> Result, Error> { match self.db.get(digest.to_vec()) { // The directory was not found, return Ok(None) => Ok(None), @@ -99,7 +103,7 @@ impl DirectoryService for SledDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - fn put(&self, directory: proto::Directory) -> Result { + async fn put(&self, directory: proto::Directory) -> Result { let digest = directory.digest(); // validate the directory 
itself. @@ -121,12 +125,8 @@ impl DirectoryService for SledDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Box<(dyn Iterator> + std::marker::Send + 'static)> - { - Box::new(DirectoryTraverser::with( - self.clone(), - root_directory_digest, - )) + ) -> Pin> + Send + 'static)>> { + traverse_directory(self.clone(), root_directory_digest) } #[instrument(skip_all)] diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs index 9029543267c1..0489c4458163 100644 --- a/tvix/store/src/directoryservice/traverse.rs +++ b/tvix/store/src/directoryservice/traverse.rs @@ -1,19 +1,18 @@ use super::DirectoryService; -use crate::{proto::NamedNode, Error}; +use crate::{proto::NamedNode, B3Digest, Error}; use std::{os::unix::ffi::OsStrExt, sync::Arc}; use tracing::{instrument, warn}; /// This traverses from a (root) node to the given (sub)path, returning the Node /// at that path, or none, if there's nothing at that path. -/// TODO: Do we want to rewrite this in a non-recursing fashion, and use -/// [DirectoryService.get_recursive] to do less lookups? +/// TODO: Do we want to use [DirectoryService.get_recursive] to do less lookups? /// Or do we consider this to be a non-issue due to store composition and local caching? /// TODO: the name of this function (and mod) is a bit bad, because it doesn't /// clearly distinguish it from the BFS traversers. #[instrument(skip(directory_service))] -pub fn traverse_to( +pub async fn traverse_to( directory_service: Arc, - node: crate::proto::node::Node, + root_node: crate::proto::node::Node, path: &std::path::Path, ) -> Result, Error> { // strip a possible `/` prefix from the path. @@ -25,52 +24,56 @@ pub fn traverse_to( } }; + let mut cur_node = root_node; let mut it = path.components(); - match it.next() { - None => { - // the (remaining) path is empty, return the node we've been called with. 
- Ok(Some(node)) - } - Some(first_component) => { - match node { - crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => { - // There's still some path left, but the current node is no directory. - // This means the path doesn't exist, as we can't reach it. - Ok(None) - } - crate::proto::node::Node::Directory(directory_node) => { - let digest = directory_node - .digest - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; - - // fetch the linked node from the directory_service - match directory_service.get(&digest)? { - // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! - None => { - warn!("directory {} does not exist", digest); - - Err(Error::StorageError(format!( - "directory {} does not exist", - digest - ))) - } - Some(directory) => { - // look for first_component in the [Directory]. - // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we - // could stop as soon as e.name is larger than the search string. - let child_node = directory - .nodes() - .find(|n| n.get_name() == first_component.as_os_str().as_bytes()); - - match child_node { - // child node not found means there's no such element inside the directory. - None => Ok(None), - // child node found, recurse with it and the rest of the path. - Some(child_node) => { - let rest_path: std::path::PathBuf = it.collect(); - traverse_to(directory_service, child_node, &rest_path) + loop { + match it.next() { + None => { + // the (remaining) path is empty, return the node we're current at. + return Ok(Some(cur_node)); + } + Some(first_component) => { + match cur_node { + crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => { + // There's still some path left, but the current node is no directory. + // This means the path doesn't exist, as we can't reach it. 
+ return Ok(None); + } + crate::proto::node::Node::Directory(directory_node) => { + let digest: B3Digest = directory_node.digest.try_into().map_err(|_e| { + Error::StorageError("invalid digest length".to_string()) + })?; + + // fetch the linked node from the directory_service + match directory_service.get(&digest).await? { + // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! + None => { + warn!("directory {} does not exist", digest); + + return Err(Error::StorageError(format!( + "directory {} does not exist", + digest + ))); + } + Some(directory) => { + // look for first_component in the [Directory]. + // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we + // could stop as soon as e.name is larger than the search string. + let child_node = directory.nodes().find(|n| { + n.get_name() == first_component.as_os_str().as_bytes() + }); + + match child_node { + // child node not found means there's no such element inside the directory. + None => { + return Ok(None); + } + // child node found, return to top-of loop to find the next + // node in the path. 
+ Some(child_node) => { + cur_node = child_node; + } } } } @@ -92,16 +95,18 @@ mod tests { use super::traverse_to; - #[test] - fn test_traverse_to() { + #[tokio::test] + async fn test_traverse_to() { let directory_service = gen_directory_service(); let mut handle = directory_service.put_multiple_start(); handle .put(DIRECTORY_WITH_KEEP.clone()) + .await .expect("must succeed"); handle .put(DIRECTORY_COMPLICATED.clone()) + .await .expect("must succeed"); // construct the node for DIRECTORY_COMPLICATED @@ -128,6 +133,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from(""), ) + .await .expect("must succeed"); assert_eq!(Some(node_directory_complicated.clone()), resp); @@ -140,6 +146,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("keep"), ) + .await .expect("must succeed"); assert_eq!(Some(node_directory_with_keep), resp); @@ -152,6 +159,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("keep/.keep"), ) + .await .expect("must succeed"); assert_eq!(Some(node_file_keep.clone()), resp); @@ -164,6 +172,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("/keep/.keep"), ) + .await .expect("must succeed"); assert_eq!(Some(node_file_keep), resp); @@ -176,6 +185,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("void"), ) + .await .expect("must succeed"); assert_eq!(None, resp); @@ -188,6 +198,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("//v/oid"), ) + .await .expect("must succeed"); assert_eq!(None, resp); @@ -201,6 +212,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("keep/.keep/foo"), ) + .await .expect("must succeed"); assert_eq!(None, resp); @@ -213,6 +225,7 @@ mod tests { node_directory_complicated.clone(), &PathBuf::from("/"), ) + .await .expect("must succeed"); assert_eq!(Some(node_directory_complicated), resp); diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs index 
95f02f1f9ce8..4c5e7cfde37c 100644 --- a/tvix/store/src/directoryservice/utils.rs +++ b/tvix/store/src/directoryservice/utils.rs @@ -3,103 +3,85 @@ use super::DirectoryService; use crate::proto; use crate::B3Digest; use crate::Error; +use async_stream::stream; +use futures::Stream; use std::collections::{HashSet, VecDeque}; -use tracing::{debug_span, instrument, warn}; +use std::pin::Pin; +use tonic::async_trait; +use tracing::warn; /// Traverses a [proto::Directory] from the root to the children. /// /// This is mostly BFS, but directories are only returned once. -pub struct DirectoryTraverser { +pub fn traverse_directory( directory_service: DS, - /// The list of all directories that still need to be traversed. The next - /// element is picked from the front, new elements are enqueued at the - /// back. - worklist_directory_digests: VecDeque, - /// The list of directory digests already sent to the consumer. - /// We omit sending the same directories multiple times. - sent_directory_digests: HashSet, -} - -impl DirectoryTraverser { - pub fn with(directory_service: DS, root_directory_digest: &B3Digest) -> Self { - Self { - directory_service, - worklist_directory_digests: VecDeque::from([root_directory_digest.clone()]), - sent_directory_digests: HashSet::new(), - } - } - - // enqueue all child directory digests to the work queue, as - // long as they're not part of the worklist or already sent. - // This panics if the digest looks invalid, it's supposed to be checked first. 
- fn enqueue_child_directories(&mut self, directory: &proto::Directory) { - for child_directory_node in &directory.directories { - // TODO: propagate error - let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); - - if self.worklist_directory_digests.contains(&child_digest) - || self.sent_directory_digests.contains(&child_digest) - { - continue; - } - self.worklist_directory_digests.push_back(child_digest); - } - } -} - -impl Iterator for DirectoryTraverser { - type Item = Result; - - #[instrument(skip_all)] - fn next(&mut self) -> Option { - // fetch the next directory digest from the top of the work queue. - match self.worklist_directory_digests.pop_front() { - None => None, - Some(current_directory_digest) => { - let span = debug_span!("directory.digest", "{}", current_directory_digest); - let _ = span.enter(); - - // look up the directory itself. - let current_directory = match self.directory_service.get(&current_directory_digest) - { - // if we got it - Ok(Some(current_directory)) => { - // validate, we don't want to send invalid directories. - if let Err(e) = current_directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Some(Err(Error::StorageError(format!( - "invalid directory: {}", - current_directory_digest - )))); - } - current_directory - } - // if it's not there, we have an inconsistent store! - Ok(None) => { - warn!("directory {} does not exist", current_directory_digest); - return Some(Err(Error::StorageError(format!( - "directory {} does not exist", + root_directory_digest: &B3Digest, +) -> Pin> + Send>> { + // The list of all directories that still need to be traversed. The next + // element is picked from the front, new elements are enqueued at the + // back. + let mut worklist_directory_digests: VecDeque = + VecDeque::from([root_directory_digest.clone()]); + // The list of directory digests already sent to the consumer. + // We omit sending the same directories multiple times. 
+ let mut sent_directory_digests: HashSet = HashSet::new(); + + let stream = stream! { + while let Some(current_directory_digest) = worklist_directory_digests.pop_front() { + match directory_service.get(&current_directory_digest).await { + // if it's not there, we have an inconsistent store! + Ok(None) => { + warn!("directory {} does not exist", current_directory_digest); + yield Err(Error::StorageError(format!( + "directory {} does not exist", + current_directory_digest + ))); + } + Err(e) => { + warn!("failed to look up directory"); + yield Err(Error::StorageError(format!( + "unable to look up directory {}: {}", + current_directory_digest, e + ))); + } + + // if we got it + Ok(Some(current_directory)) => { + // validate, we don't want to send invalid directories. + if let Err(e) = current_directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + yield Err(Error::StorageError(format!( + "invalid directory: {}", current_directory_digest - )))); - } - Err(e) => { - warn!("failed to look up directory"); - return Some(Err(Error::StorageError(format!( - "unable to look up directory {}: {}", - current_directory_digest, e - )))); + ))); } - }; - // All DirectoryServices MUST validate directory nodes, before returning them out, so we - // can be sure [enqueue_child_directories] doesn't panic. + // We're about to send this directory, so let's avoid sending it again if a + // descendant has it. + sent_directory_digests.insert(current_directory_digest); + + // enqueue all child directory digests to the work queue, as + // long as they're not part of the worklist or already sent. + // This panics if the digest looks invalid, it's supposed to be checked first. 
+ for child_directory_node in &current_directory.directories { + // TODO: propagate error + let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); + + if worklist_directory_digests.contains(&child_digest) + || sent_directory_digests.contains(&child_digest) + { + continue; + } + worklist_directory_digests.push_back(child_digest); + } - // enqueue child directories - self.enqueue_child_directories(&current_directory); - Some(Ok(current_directory)) - } + yield Ok(current_directory); + } + }; } - } + }; + + Box::pin(stream) } /// This is a simple implementation of a Directory uploader. @@ -120,13 +102,14 @@ impl SimplePutter { } } +#[async_trait] impl DirectoryPutter for SimplePutter { - fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { if self.closed { return Err(Error::StorageError("already closed".to_string())); } - let digest = self.directory_service.put(directory)?; + let digest = self.directory_service.put(directory).await?; // track the last directory digest self.last_directory_digest = Some(digest); @@ -135,7 +118,7 @@ impl DirectoryPutter for SimplePutter { } /// We need to be mutable here, as that's the signature of the trait. - fn close(&mut self) -> Result { + async fn close(&mut self) -> Result { if self.closed { return Err(Error::StorageError("already closed".to_string())); } -- cgit 1.4.1