diff options
Diffstat (limited to 'tvix/store/src/directoryservice')
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 532 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/memory.rs | 84 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/mod.rs | 54 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/sled.rs | 107 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/traverse.rs | 222 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/utils.rs | 140 |
6 files changed, 1139 insertions, 0 deletions
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs new file mode 100644 index 000000000000..1b33572cf7de --- /dev/null +++ b/tvix/store/src/directoryservice/grpc.rs @@ -0,0 +1,532 @@ +use std::collections::HashSet; + +use super::{DirectoryPutter, DirectoryService}; +use crate::proto::{self, get_directory_request::ByWhat}; +use crate::{B3Digest, Error}; +use tokio::sync::mpsc::UnboundedSender; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tonic::{transport::Channel, Status}; +use tonic::{Code, Streaming}; +use tracing::{instrument, warn}; + +/// Connects to a (remote) tvix-store DirectoryService over gRPC. +#[derive(Clone)] +pub struct GRPCDirectoryService { + /// A handle into the active tokio runtime. Necessary to spawn tasks. + tokio_handle: tokio::runtime::Handle, + + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, +} + +impl GRPCDirectoryService { + /// Construct a new [GRPCDirectoryService], by passing a handle to the + /// tokio runtime, and a gRPC client. + pub fn new( + tokio_handle: tokio::runtime::Handle, + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + ) -> Self { + Self { + tokio_handle, + grpc_client, + } + } + + /// construct a [GRPCDirectoryService] from a [proto::blob_service_client::BlobServiceClient<Channel>]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + ) -> Self { + Self { + tokio_handle: tokio::runtime::Handle::current(), + grpc_client, + } + } +} + +impl DirectoryService for GRPCDirectoryService { + type DirectoriesIterator = StreamIterator; + + fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> { + // Get a new handle to the gRPC client, and copy the digest. + let mut grpc_client = self.grpc_client.clone(); + + let digest_as_vec = digest.to_vec(); + let task = self.tokio_handle.spawn(async move { + let mut s = grpc_client + .get(proto::GetDirectoryRequest { + recursive: false, + by_what: Some(ByWhat::Digest(digest_as_vec)), + }) + .await? + .into_inner(); + + // Retrieve the first message only, then close the stream (we set recursive to false) + s.message().await + }); + + let digest = digest.clone(); + match self.tokio_handle.block_on(task)? { + Ok(Some(directory)) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != digest { + Err(crate::Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))) + } else if let Err(e) = directory.validate() { + // Validate the Directory itself is valid. + warn!("directory failed validation: {}", e.to_string()); + Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + digest, e, + ))) + } else { + Ok(Some(directory)) + } + } + Ok(None) => Ok(None), + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + let mut grpc_client = self.grpc_client.clone(); + + let task = self + .tokio_handle + .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await }); + + match self.tokio_handle.block_on(task)? { + Ok(put_directory_resp) => Ok(B3Digest::from_vec( + put_directory_resp.into_inner().root_digest, + ) + .map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { + let mut grpc_client = self.grpc_client.clone(); + + let root_directory_digest_as_vec = root_directory_digest.to_vec(); + let task: tokio::task::JoinHandle<Result<Streaming<proto::Directory>, Status>> = + self.tokio_handle.spawn(async move { + let s = grpc_client + .get(proto::GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(root_directory_digest_as_vec)), + }) + .await? + .into_inner(); + + Ok(s) + }); + + let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); + + StreamIterator::new( + self.tokio_handle.clone(), + root_directory_digest.clone(), + stream, + ) + } + + type DirectoryPutter = GRPCPutter; + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Self::DirectoryPutter + where + Self: Clone, + { + let mut grpc_client = self.grpc_client.clone(); + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let task: tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>> = + self.tokio_handle.spawn(async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); + + Ok(s) + }); + + GRPCPutter::new(self.tokio_handle.clone(), tx, task) + } +} + +pub struct StreamIterator { + /// A handle into the active tokio runtime. Necessary to run futures to completion. + tokio_handle: tokio::runtime::Handle, + // A stream of [proto::Directory] + stream: Streaming<proto::Directory>, + // The Directory digests we received so far + received_directory_digests: HashSet<B3Digest>, + // The Directory digests we're still expecting to get sent. + expected_directory_digests: HashSet<B3Digest>, +} + +impl StreamIterator { + pub fn new( + tokio_handle: tokio::runtime::Handle, + root_digest: B3Digest, + stream: Streaming<proto::Directory>, + ) -> Self { + Self { + tokio_handle, + stream, + received_directory_digests: HashSet::new(), + expected_directory_digests: HashSet::from([root_digest]), + } + } +} + +impl Iterator for StreamIterator { + type Item = Result<proto::Directory, crate::Error>; + + fn next(&mut self) -> Option<Self::Item> { + match self.tokio_handle.block_on(self.stream.message()) { + Ok(ok) => match ok { + Some(directory) => { + // validate the directory itself. + if let Err(e) = directory.validate() { + return Some(Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + directory.digest(), + e, + )))); + } + // validate we actually expected that directory, and move it from expected to received. + let directory_digest = directory.digest(); + let was_expected = self.expected_directory_digests.remove(&directory_digest); + if !was_expected { + // FUTUREWORK: dumb clients might send the same stuff twice. + // as a fallback, we might want to tolerate receiving + // it if it's in received_directory_digests (as that + // means it once was in expected_directory_digests) + return Some(Err(crate::Error::StorageError(format!( + "received unexpected directory {}", + directory_digest + )))); + } + self.received_directory_digests.insert(directory_digest); + + // register all children in expected_directory_digests. + for child_directory in &directory.directories { + // We ran validate() above, so we know these digests must be correct. + let child_directory_digest = + B3Digest::from_vec(child_directory.digest.clone()).unwrap(); + + self.expected_directory_digests + .insert(child_directory_digest); + } + + Some(Ok(directory)) + } + None => { + // If we were still expecting something, that's an error. + if !self.expected_directory_digests.is_empty() { + Some(Err(crate::Error::StorageError(format!( + "still expected {} directories, but got premature end of stream", + self.expected_directory_digests.len(), + )))) + } else { + None + } + } + }, + Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))), + } + } +} + +/// Allows uploading multiple Directory messages in the same gRPC stream. +pub struct GRPCPutter { + /// A handle into the active tokio runtime. Necessary to spawn tasks. + tokio_handle: tokio::runtime::Handle, + + /// Data about the current request - a handle to the task, and the tx part + /// of the channel. + /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request. + /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed. + #[allow(clippy::type_complexity)] // lol + rq: Option<( + tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>>, + UnboundedSender<proto::Directory>, + )>, +} + +impl GRPCPutter { + pub fn new( + tokio_handle: tokio::runtime::Handle, + directory_sender: UnboundedSender<proto::Directory>, + task: tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>>, + ) -> Self { + Self { + tokio_handle, + rq: Some((task, directory_sender)), + } + } + + #[allow(dead_code)] + // allows checking if the tx part of the channel is closed. + fn is_closed(&self) -> bool { + match self.rq { + None => true, + Some((_, ref directory_sender)) => directory_sender.is_closed(), + } + } +} + +impl DirectoryPutter for GRPCPutter { + fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + match self.rq { + // If we're not already closed, send the directory to directory_sender. + Some((_, ref directory_sender)) => { + if directory_sender.send(directory).is_err() { + // If the channel has been prematurely closed, invoke close (so we can peek at the error code) + // That error code is much more helpful, because it + // contains the error message from the server. + self.close()?; + } + Ok(()) + } + // If self.close() was already called, we can't put again. + None => Err(Error::StorageError( + "DirectoryPutter already closed".to_string(), + )), + } + } + + /// Closes the stream for sending, and returns the value + fn close(&mut self) -> Result<B3Digest, crate::Error> { + // get self.rq, and replace it with None. + // This ensures we can only close it once. + match std::mem::take(&mut self.rq) { + None => Err(Error::StorageError("already closed".to_string())), + Some((task, directory_sender)) => { + // close directory_sender, so blocking on task will finish. + drop(directory_sender); + + let root_digest = self + .tokio_handle + .block_on(task)? + .map_err(|e| Error::StorageError(e.to_string()))? + .root_digest; + + B3Digest::from_vec(root_digest).map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + }) + } + } + } +} + +#[cfg(test)] +mod tests { + use core::time; + use std::thread; + + use tempfile::TempDir; + use tokio::net::{UnixListener, UnixStream}; + use tokio_stream::wrappers::UnixListenerStream; + use tonic::transport::{Endpoint, Server, Uri}; + + use crate::{ + directoryservice::{DirectoryPutter, DirectoryService}, + proto, + proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper}, + tests::{ + fixtures::{DIRECTORY_A, DIRECTORY_B}, + utils::gen_directory_service, + }, + }; + + #[test] + fn test() -> anyhow::Result<()> { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("socket"); + + // Spin up a server, in a thread far away, which spawns its own tokio runtime, + // and blocks on the task. + let socket_path_clone = socket_path.clone(); + thread::spawn(move || { + // Create the runtime + let rt = tokio::runtime::Runtime::new().unwrap(); + // Get a handle from this runtime + let handle = rt.handle(); + + let task = handle.spawn(async { + let uds = UnixListener::bind(socket_path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new DirectoryService + let mut server = Server::builder(); + let router = server.add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from(gen_directory_service()), + )); + router.serve_with_incoming(uds_stream).await + }); + + handle.block_on(task) + }); + + // set up the local client runtime. This is similar to what the [tokio:test] macro desugars to. + let tester_runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // wait for the socket to be created + { + let mut socket_created = false; + for _try in 1..20 { + if socket_path.exists() { + socket_created = true; + break; + } + std::thread::sleep(time::Duration::from_millis(20)) + } + + assert!( + socket_created, + "expected socket path to eventually get created, but never happened" + ); + } + + let task = tester_runtime.spawn_blocking(move || { + // Create a channel, connecting to the uds at socket_path. + // The URI is unused. + let channel = Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector_lazy(tower::service_fn(move |_: Uri| { + UnixStream::connect(socket_path.clone()) + })); + + let grpc_client = proto::directory_service_client::DirectoryServiceClient::new(channel); + + // create the GrpcDirectoryService, using the tester_runtime. + let directory_service = + super::GRPCDirectoryService::new(tokio::runtime::Handle::current(), grpc_client); + + // try to get DIRECTORY_A should return Ok(None) + assert_eq!( + None, + directory_service + .get(&DIRECTORY_A.digest()) + .expect("must not fail") + ); + + // Now upload it + assert_eq!( + DIRECTORY_A.digest(), + directory_service + .put(DIRECTORY_A.clone()) + .expect("must succeed") + ); + + // And retrieve it, compare for equality. + assert_eq!( + DIRECTORY_A.clone(), + directory_service + .get(&DIRECTORY_A.digest()) + .expect("must succeed") + .expect("must be some") + ); + + // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. + directory_service + .put(DIRECTORY_B.clone()) + .expect_err("must fail"); + + // Putting DIRECTORY_B in a put_multiple will succeed, but the close + // will always fail. + { + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + handle.close().expect_err("must fail"); + } + + // Uploading A and then B should succeed, and closing should return the digest of B. + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_A.clone()).expect("must succeed"); + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + let digest = handle.close().expect("must succeed"); + assert_eq!(DIRECTORY_B.digest(), digest); + + // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. + let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); + assert_eq!( + DIRECTORY_B.clone(), + directories_it + .next() + .expect("must be some") + .expect("must succeed") + ); + assert_eq!( + DIRECTORY_A.clone(), + directories_it + .next() + .expect("must be some") + .expect("must succeed") + ); + + // Uploading B and then A should fail, because B refers to A, which + // hasn't been uploaded yet. + // However, the client can burst, so we might not have received the + // error back from the server. + { + let mut handle = directory_service.put_multiple_start(); + // sending out B will always be fine + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + + // whether we will be able to put A as well depends on whether we + // already received the error about B. + if handle.put(DIRECTORY_A.clone()).is_ok() { + // If we didn't, and this was Ok(_), … + // a subsequent close MUST fail (because it waits for the + // server) + handle.close().expect_err("must fail"); + } + } + + // Now we do the same test as before, send B, then A, but wait + // sufficiently enough for the server to have s + // to close us the stream, + // and then assert that uploading anything else via the handle will fail. + { + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + + let mut is_closed = false; + for _try in 1..1000 { + if handle.is_closed() { + is_closed = true; + break; + } + std::thread::sleep(time::Duration::from_millis(10)) + } + + assert!( + is_closed, + "expected channel to eventually close, but never happened" + ); + + handle.put(DIRECTORY_A.clone()).expect_err("must fail"); + } + }); + + tester_runtime.block_on(task)?; + + Ok(()) + } +} diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs new file mode 100644 index 000000000000..1fd619f7c8cb --- /dev/null +++ b/tvix/store/src/directoryservice/memory.rs @@ -0,0 +1,84 @@ +use crate::{proto, B3Digest, Error}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use tracing::{instrument, warn}; + +use super::utils::SimplePutter; +use super::{DirectoryService, DirectoryTraverser}; + +#[derive(Clone, Default)] +pub struct MemoryDirectoryService { + db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>, +} + +impl DirectoryService for MemoryDirectoryService { + type DirectoriesIterator = DirectoryTraverser<Self>; + + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + let db = self.db.read()?; + + match db.get(digest) { + // The directory was not found, return + None => Ok(None), + + // The directory was found, try to parse the data as Directory message + Some(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != *digest { + return Err(Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))); + } + + // Validate the Directory itself is valid. + if let Err(e) = directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Err(Error::StorageError(format!( + "directory {} failed validation: {}", + actual_digest, e, + ))); + } + + Ok(Some(directory.clone())) + } + } + } + + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + let digest = directory.digest(); + + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + digest, e, + ))); + } + + // store it + let mut db = self.db.write()?; + db.insert(digest.clone(), directory); + + Ok(digest) + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { + DirectoryTraverser::with(self.clone(), root_directory_digest) + } + + type DirectoryPutter = SimplePutter<Self>; + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Self::DirectoryPutter + where + Self: Clone, + { + SimplePutter::new(self.clone()) + } +} diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs new file mode 100644 index 000000000000..f387d28948f0 --- /dev/null +++ b/tvix/store/src/directoryservice/mod.rs @@ -0,0 +1,54 @@ +use crate::{proto, B3Digest, Error}; +mod grpc; +mod memory; +mod sled; +mod traverse; +mod utils; + +pub use self::grpc::GRPCDirectoryService; +pub use self::memory::MemoryDirectoryService; +pub use self::sled::SledDirectoryService; +pub use self::traverse::traverse_to; +pub use self::utils::DirectoryTraverser; + +/// The base trait all Directory services need to implement. +/// This is a simple get and put of [crate::proto::Directory], returning their +/// digest. +pub trait DirectoryService { + type DirectoriesIterator: Iterator<Item = Result<proto::Directory, Error>> + Send; + type DirectoryPutter: DirectoryPutter; + + /// Get looks up a single Directory message by its digest. + /// In case the directory is not found, Ok(None) is returned. + fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; + /// Get uploads a single Directory message, and returns the calculated + /// digest, or an error. + fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; + + /// Looks up a closure of [proto::Directory]. + /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`, + /// and we'd be able to add a default implementation for it here, but + /// we can't have that yet. + fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator; + + /// Allows persisting a closure of [proto::Directory], which is a graph of + /// connected Directory messages. + fn put_multiple_start(&self) -> Self::DirectoryPutter; +} + +/// Provides a handle to put a closure of connected [proto::Directory] elements. +/// +/// The consumer can periodically call [put], starting from the leaves. Once +/// the root is reached, [close] can be called to retrieve the root digest (or +/// an error). +pub trait DirectoryPutter { + /// Put a individual [proto::Directory] into the store. + /// Error semantics and behaviour is up to the specific implementation of + /// this trait. + /// Due to bursting, the returned error might refer to an object previously + /// sent via `put`. + fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; + + /// Close the stream, and wait for any errors. + fn close(&mut self) -> Result<B3Digest, Error>; +} diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs new file mode 100644 index 000000000000..e189e8acf507 --- /dev/null +++ b/tvix/store/src/directoryservice/sled.rs @@ -0,0 +1,107 @@ +use crate::proto::Directory; +use crate::{proto, B3Digest, Error}; +use prost::Message; +use std::path::PathBuf; +use tracing::{instrument, warn}; + +use super::utils::SimplePutter; +use super::{DirectoryService, DirectoryTraverser}; + +#[derive(Clone)] +pub struct SledDirectoryService { + db: sled::Db, +} + +impl SledDirectoryService { + pub fn new(p: PathBuf) -> Result<Self, sled::Error> { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { db }) + } + + pub fn new_temporary() -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { db }) + } +} + +impl DirectoryService for SledDirectoryService { + type DirectoriesIterator = DirectoryTraverser<Self>; + + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + match self.db.get(digest.to_vec()) { + // The directory was not found, return + Ok(None) => Ok(None), + + // The directory was found, try to parse the data as Directory message + Ok(Some(data)) => match Directory::decode(&*data) { + Ok(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != *digest { + return Err(Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))); + } + + // Validate the Directory itself is valid. + if let Err(e) = directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Err(Error::StorageError(format!( + "directory {} failed validation: {}", + actual_digest, e, + ))); + } + + Ok(Some(directory)) + } + Err(e) => { + warn!("unable to parse directory {}: {}", digest, e); + Err(Error::StorageError(e.to_string())) + } + }, + // some storage error? + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + let digest = directory.digest(); + + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + digest, e, + ))); + } + // store it + let result = self.db.insert(digest.to_vec(), directory.encode_to_vec()); + if let Err(e) = result { + return Err(Error::StorageError(e.to_string())); + } + Ok(digest) + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { + DirectoryTraverser::with(self.clone(), root_directory_digest) + } + + type DirectoryPutter = SimplePutter<Self>; + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Self::DirectoryPutter + where + Self: Clone, + { + SimplePutter::new(self.clone()) + } +} diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs new file mode 100644 index 000000000000..8691baa8b73f --- /dev/null +++ b/tvix/store/src/directoryservice/traverse.rs @@ -0,0 +1,222 @@ +use super::DirectoryService; +use crate::{proto::NamedNode, B3Digest, Error}; +use tracing::{instrument, warn}; + +/// This traverses from a (root) node to the given (sub)path, returning the Node +/// at that path, or none, if there's nothing at that path. +/// TODO: Do we want to rewrite this in a non-recursing fashion, and use +/// [DirectoryService.get_recursive] to do less lookups? +/// Or do we consider this to be a non-issue due to store composition and local caching? +/// TODO: the name of this function (and mod) is a bit bad, because it doesn't +/// clearly distinguish it from the BFS traversers. +#[instrument(skip(directory_service))] +pub fn traverse_to<DS: DirectoryService>( + directory_service: &DS, + node: crate::proto::node::Node, + path: &std::path::Path, +) -> Result<Option<crate::proto::node::Node>, Error> { + // strip a possible `/` prefix from the path. + let path = { + if path.starts_with("/") { + path.strip_prefix("/").unwrap() + } else { + path + } + }; + + let mut it = path.components(); + + match it.next() { + None => { + // the (remaining) path is empty, return the node we've been called with. + Ok(Some(node)) + } + Some(first_component) => { + match node { + crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => { + // There's still some path left, but the current node is no directory. + // This means the path doesn't exist, as we can't reach it. + Ok(None) + } + crate::proto::node::Node::Directory(directory_node) => { + let digest = B3Digest::from_vec(directory_node.digest) + .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; + + // fetch the linked node from the directory_service + match directory_service.get(&digest)? { + // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! + None => { + warn!("directory {} does not exist", digest); + + Err(Error::StorageError(format!( + "directory {} does not exist", + digest + ))) + } + Some(directory) => { + // look for first_component in the [Directory]. + // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we + // could stop as soon as e.name is larger than the search string. + let child_node = directory.nodes().find(|n| { + n.get_name() == first_component.as_os_str().to_str().unwrap() + }); + + match child_node { + // child node not found means there's no such element inside the directory. + None => Ok(None), + // child node found, recurse with it and the rest of the path. + Some(child_node) => { + let rest_path: std::path::PathBuf = it.collect(); + traverse_to(directory_service, child_node, &rest_path) + } + } + } + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use crate::{ + directoryservice::DirectoryPutter, + directoryservice::DirectoryService, + tests::{ + fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, + utils::gen_directory_service, + }, + }; + + use super::traverse_to; + + #[test] + fn test_traverse_to() { + let mut directory_service = gen_directory_service(); + + let mut handle = directory_service.put_multiple_start(); + handle + .put(DIRECTORY_WITH_KEEP.clone()) + .expect("must succeed"); + handle + .put(DIRECTORY_COMPLICATED.clone()) + .expect("must succeed"); + + // construct the node for DIRECTORY_COMPLICATED + let node_directory_complicated = + crate::proto::node::Node::Directory(crate::proto::DirectoryNode { + name: "doesntmatter".to_string(), + digest: DIRECTORY_COMPLICATED.digest().to_vec(), + size: DIRECTORY_COMPLICATED.size(), + }); + + // construct the node for DIRECTORY_COMPLICATED + let node_directory_with_keep = crate::proto::node::Node::Directory( + DIRECTORY_COMPLICATED.directories.first().unwrap().clone(), + ); + + // construct the node for the .keep file + let node_file_keep = + crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone()); + + // traversal to an empty subpath should return the root node. + { + let resp = traverse_to( + &mut directory_service, + node_directory_complicated.clone(), + &PathBuf::from(""), + ) + .expect("must succeed"); + + assert_eq!(Some(node_directory_complicated.clone()), resp); + } + + // traversal to `keep` should return the node for DIRECTORY_WITH_KEEP + { + let resp = traverse_to( + &mut directory_service, + node_directory_complicated.clone(), + &PathBuf::from("keep"), + ) + .expect("must succeed"); + + assert_eq!(Some(node_directory_with_keep.clone()), resp); + } + + // traversal to `keep/.keep` should return the node for the .keep file + { + let resp = traverse_to( + &mut directory_service, + node_directory_complicated.clone(), + &PathBuf::from("keep/.keep"), + ) + .expect("must succeed"); + + assert_eq!(Some(node_file_keep.clone()), resp); + } + + // traversal to `keep/.keep` should return the node for the .keep file + { + let resp = traverse_to( + &mut directory_service, + node_directory_complicated.clone(), + &PathBuf::from("/keep/.keep"), + ) + .expect("must succeed"); + + assert_eq!(Some(node_file_keep.clone()), resp); + } + + // traversal to `void` should return None (doesn't exist) + { + let resp = traverse_to( + &mut directory_service, + node_directory_complicated.clone(), + &PathBuf::from("void"), + ) + .expect("must succeed"); + + assert_eq!(None, resp); + } + + // traversal to `void` should return None (doesn't exist) + { + let resp = traverse_to( + &mut directory_service, + node_directory_complicated.clone(), + &PathBuf::from("//v/oid"), + ) + .expect("must succeed"); + + assert_eq!(None, resp); + } + + // traversal to `keep/.keep/404` should return None (the path can't be + // reached, as keep/.keep already is a file) + { + let resp = traverse_to( + &mut directory_service, + node_directory_complicated.clone(), + &PathBuf::from("keep/.keep/foo"), + ) + .expect("must succeed"); + + assert_eq!(None, resp); + } + + // traversal to a subpath of '/' should return the root node. + { + let resp = traverse_to( + &mut directory_service, + node_directory_complicated.clone(), + &PathBuf::from("/"), + ) + .expect("must succeed"); + + assert_eq!(Some(node_directory_complicated.clone()), resp); + } + } +} diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs new file mode 100644 index 000000000000..3661808734f3 --- /dev/null +++ b/tvix/store/src/directoryservice/utils.rs @@ -0,0 +1,140 @@ +use super::DirectoryPutter; +use super::DirectoryService; +use crate::proto; +use crate::B3Digest; +use crate::Error; +use std::collections::{HashSet, VecDeque}; +use tracing::{debug_span, instrument, warn}; + +/// Traverses a [proto::Directory] from the root to the children. +/// +/// This is mostly BFS, but directories are only returned once. +pub struct DirectoryTraverser<DS: DirectoryService> { + directory_service: DS, + /// The list of all directories that still need to be traversed. The next + /// element is picked from the front, new elements are enqueued at the + /// back. + worklist_directory_digests: VecDeque<B3Digest>, + /// The list of directory digests already sent to the consumer. + /// We omit sending the same directories multiple times. + sent_directory_digests: HashSet<B3Digest>, +} + +impl<DS: DirectoryService> DirectoryTraverser<DS> { + pub fn with(directory_service: DS, root_directory_digest: &B3Digest) -> Self { + Self { + directory_service, + worklist_directory_digests: VecDeque::from([root_directory_digest.clone()]), + sent_directory_digests: HashSet::new(), + } + } + + // enqueue all child directory digests to the work queue, as + // long as they're not part of the worklist or already sent. + // This panics if the digest looks invalid, it's supposed to be checked first. + fn enqueue_child_directories(&mut self, directory: &proto::Directory) { + for child_directory_node in &directory.directories { + // TODO: propagate error + let child_digest = B3Digest::from_vec(child_directory_node.digest.clone()).unwrap(); + + if self.worklist_directory_digests.contains(&child_digest) + || self.sent_directory_digests.contains(&child_digest) + { + continue; + } + self.worklist_directory_digests.push_back(child_digest); + } + } +} + +impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> { + type Item = Result<proto::Directory, Error>; + + #[instrument(skip_all)] + fn next(&mut self) -> Option<Self::Item> { + // fetch the next directory digest from the top of the work queue. + match self.worklist_directory_digests.pop_front() { + None => None, + Some(current_directory_digest) => { + let span = debug_span!("directory.digest", "{}", current_directory_digest); + let _ = span.enter(); + + // look up the directory itself. + let current_directory = match self.directory_service.get(¤t_directory_digest) + { + // if we got it + Ok(Some(current_directory)) => { + // validate, we don't want to send invalid directories. + if let Err(e) = current_directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Some(Err(Error::StorageError(format!( + "invalid directory: {}", + current_directory_digest + )))); + } + current_directory + } + // if it's not there, we have an inconsistent store! + Ok(None) => { + warn!("directory {} does not exist", current_directory_digest); + return Some(Err(Error::StorageError(format!( + "directory {} does not exist", + current_directory_digest + )))); + } + Err(e) => { + warn!("failed to look up directory"); + return Some(Err(Error::StorageError(format!( + "unable to look up directory {}: {}", + current_directory_digest, e + )))); + } + }; + + // All DirectoryServices MUST validate directory nodes, before returning them out, so we + // can be sure [enqueue_child_directories] doesn't panic. + + // enqueue child directories + self.enqueue_child_directories(¤t_directory); + Some(Ok(current_directory)) + } + } + } +} + +/// This is a simple implementation of a Directory uploader. +/// TODO: verify connectivity? Factor out these checks into generic helpers? +pub struct SimplePutter<DS: DirectoryService> { + directory_service: DS, + last_directory_digest: Option<B3Digest>, +} + +impl<DS: DirectoryService> SimplePutter<DS> { + pub fn new(directory_service: DS) -> Self { + Self { + directory_service, + last_directory_digest: None, + } + } +} + +impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { + fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + let digest = self.directory_service.put(directory)?; + + // track the last directory digest + self.last_directory_digest = Some(digest); + + Ok(()) + } + + /// We need to be mutable here, as that's the signature of the trait. + fn close(&mut self) -> Result<B3Digest, Error> { + match &self.last_directory_digest { + Some(last_digest) => Ok(last_digest.clone()), + None => Err(Error::InvalidRequest( + "no directories sent, can't show root digest".to_string(), + )), + } + } +} |