diff options
Diffstat (limited to 'tvix/castore/src/directoryservice')
-rw-r--r-- | tvix/castore/src/directoryservice/from_addr.rs | 36 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 541 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/memory.rs | 149 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/mod.rs | 76 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/sled.rs | 213 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/traverse.rs | 228 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/utils.rs | 140 |
7 files changed, 1383 insertions, 0 deletions
diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs new file mode 100644 index 000000000000..776cf061096c --- /dev/null +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -0,0 +1,36 @@ +use std::sync::Arc; +use url::Url; + +use super::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService, SledDirectoryService}; + +/// Constructs a new instance of a [DirectoryService] from an URI. +/// +/// The following URIs are supported: +/// - `memory:` +/// Uses a in-memory implementation. +/// - `sled:` +/// Uses a in-memory sled implementation. +/// - `sled:///absolute/path/to/somewhere` +/// Uses sled, using a path on the disk for persistency. Can be only opened +/// from one process at the same time. +/// - `grpc+unix:///absolute/path/to/somewhere` +/// Connects to a local tvix-store gRPC service via Unix socket. +/// - `grpc+http://host:port`, `grpc+https://host:port` +/// Connects to a (remote) tvix-store gRPC service. +pub fn from_addr(uri: &str) -> Result<Arc<dyn DirectoryService>, crate::Error> { + let url = Url::parse(uri) + .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; + + Ok(if url.scheme() == "memory" { + Arc::new(MemoryDirectoryService::from_url(&url)?) + } else if url.scheme() == "sled" { + Arc::new(SledDirectoryService::from_url(&url)?) + } else if url.scheme().starts_with("grpc+") { + Arc::new(GRPCDirectoryService::from_url(&url)?) + } else { + Err(crate::Error::StorageError(format!( + "unknown scheme: {}", + url.scheme() + )))? + }) +} diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs new file mode 100644 index 000000000000..0f0305341265 --- /dev/null +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -0,0 +1,541 @@ +use std::collections::HashSet; +use std::pin::Pin; + +use super::{DirectoryPutter, DirectoryService}; +use crate::proto::{self, get_directory_request::ByWhat}; +use crate::{B3Digest, Error}; +use async_stream::try_stream; +use futures::Stream; +use tokio::net::UnixStream; +use tokio::spawn; +use tokio::sync::mpsc::UnboundedSender; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tonic::async_trait; +use tonic::Code; +use tonic::{transport::Channel, Status}; +use tracing::{instrument, warn}; + +/// Connects to a (remote) tvix-store DirectoryService over gRPC. +#[derive(Clone)] +pub struct GRPCDirectoryService { + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, +} + +impl GRPCDirectoryService { + /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + ) -> Self { + Self { grpc_client } + } +} + +#[async_trait] +impl DirectoryService for GRPCDirectoryService { + /// Constructs a [GRPCDirectoryService] from the passed [url::Url]: + /// - scheme has to match `grpc+*://`. + /// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + /// - In the case of unix sockets, there must be a path, but may not be a host. + /// - In the case of non-unix sockets, there must be a host, but no path. + fn from_url(url: &url::Url) -> Result<Self, crate::Error> { + // Start checking for the scheme to start with grpc+. + match url.scheme().strip_prefix("grpc+") { + None => Err(crate::Error::StorageError("invalid scheme".to_string())), + Some(rest) => { + if rest == "unix" { + if url.host_str().is_some() { + return Err(crate::Error::StorageError( + "host may not be set".to_string(), + )); + } + let path = url.path().to_string(); + let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter + .unwrap() + .connect_with_connector_lazy(tower::service_fn( + move |_: tonic::transport::Uri| UnixStream::connect(path.clone()), + )); + let grpc_client = + proto::directory_service_client::DirectoryServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } else { + // ensure path is empty, not supported with gRPC. + if !url.path().is_empty() { + return Err(crate::Error::StorageError( + "path may not be set".to_string(), + )); + } + + // clone the uri, and drop the grpc+ from the scheme. + // Recreate a new uri with the `grpc+` prefix dropped from the scheme. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + let url = { + let url_str = url.to_string(); + let s_stripped = url_str.strip_prefix("grpc+").unwrap(); + url::Url::parse(s_stripped).unwrap() + }; + let channel = tonic::transport::Endpoint::try_from(url.to_string()) + .unwrap() + .connect_lazy(); + + let grpc_client = + proto::directory_service_client::DirectoryServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } + } + } + } + + async fn get( + &self, + digest: &B3Digest, + ) -> Result<Option<crate::proto::Directory>, crate::Error> { + // Get a new handle to the gRPC client, and copy the digest. + let mut grpc_client = self.grpc_client.clone(); + let digest_cpy = digest.clone(); + let message = async move { + let mut s = grpc_client + .get(proto::GetDirectoryRequest { + recursive: false, + by_what: Some(ByWhat::Digest(digest_cpy.into())), + }) + .await? + .into_inner(); + + // Retrieve the first message only, then close the stream (we set recursive to false) + s.message().await + }; + + let digest = digest.clone(); + match message.await { + Ok(Some(directory)) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != digest { + Err(crate::Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))) + } else if let Err(e) = directory.validate() { + // Validate the Directory itself is valid. + warn!("directory failed validation: {}", e.to_string()); + Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + digest, e, + ))) + } else { + Ok(Some(directory)) + } + } + Ok(None) => Ok(None), + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + let mut grpc_client = self.grpc_client.clone(); + + let resp = grpc_client.put(tokio_stream::iter(vec![directory])).await; + + match resp { + Ok(put_directory_resp) => Ok(put_directory_resp + .into_inner() + .root_digest + .try_into() + .map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { + let mut grpc_client = self.grpc_client.clone(); + let root_directory_digest = root_directory_digest.clone(); + + let stream = try_stream! { + let mut stream = grpc_client + .get(proto::GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(root_directory_digest.clone().into())), + }) + .await + .map_err(|e| crate::Error::StorageError(e.to_string()))? + .into_inner(); + + // The Directory digests we received so far + let mut received_directory_digests: HashSet<B3Digest> = HashSet::new(); + // The Directory digests we're still expecting to get sent. + let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]); + + loop { + match stream.message().await { + Ok(Some(directory)) => { + // validate the directory itself. + if let Err(e) = directory.validate() { + Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + directory.digest(), + e, + )))?; + } + // validate we actually expected that directory, and move it from expected to received. + let directory_digest = directory.digest(); + let was_expected = expected_directory_digests.remove(&directory_digest); + if !was_expected { + // FUTUREWORK: dumb clients might send the same stuff twice. + // as a fallback, we might want to tolerate receiving + // it if it's in received_directory_digests (as that + // means it once was in expected_directory_digests) + Err(crate::Error::StorageError(format!( + "received unexpected directory {}", + directory_digest + )))?; + } + received_directory_digests.insert(directory_digest); + + // register all children in expected_directory_digests. + for child_directory in &directory.directories { + // We ran validate() above, so we know these digests must be correct. + let child_directory_digest = + child_directory.digest.clone().try_into().unwrap(); + + expected_directory_digests + .insert(child_directory_digest); + } + + yield directory; + }, + Ok(None) => { + // If we were still expecting something, that's an error. + if !expected_directory_digests.is_empty() { + Err(crate::Error::StorageError(format!( + "still expected {} directories, but got premature end of stream", + expected_directory_digests.len(), + )))? + } else { + return + } + }, + Err(e) => { + Err(crate::Error::StorageError(e.to_string()))?; + }, + } + } + }; + + Box::pin(stream) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> + where + Self: Clone, + { + let mut grpc_client = self.grpc_client.clone(); + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); + + Ok(s) + }); + + Box::new(GRPCPutter::new(tx, task)) + } +} + +/// Allows uploading multiple Directory messages in the same gRPC stream. +pub struct GRPCPutter { + /// Data about the current request - a handle to the task, and the tx part + /// of the channel. + /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request. + /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed. + #[allow(clippy::type_complexity)] // lol + rq: Option<( + JoinHandle<Result<proto::PutDirectoryResponse, Status>>, + UnboundedSender<proto::Directory>, + )>, +} + +impl GRPCPutter { + pub fn new( + directory_sender: UnboundedSender<proto::Directory>, + task: JoinHandle<Result<proto::PutDirectoryResponse, Status>>, + ) -> Self { + Self { + rq: Some((task, directory_sender)), + } + } +} + +#[async_trait] +impl DirectoryPutter for GRPCPutter { + async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + match self.rq { + // If we're not already closed, send the directory to directory_sender. + Some((_, ref directory_sender)) => { + if directory_sender.send(directory).is_err() { + // If the channel has been prematurely closed, invoke close (so we can peek at the error code) + // That error code is much more helpful, because it + // contains the error message from the server. + self.close().await?; + } + Ok(()) + } + // If self.close() was already called, we can't put again. + None => Err(Error::StorageError( + "DirectoryPutter already closed".to_string(), + )), + } + } + + /// Closes the stream for sending, and returns the value + async fn close(&mut self) -> Result<B3Digest, crate::Error> { + // get self.rq, and replace it with None. + // This ensures we can only close it once. + match std::mem::take(&mut self.rq) { + None => Err(Error::StorageError("already closed".to_string())), + Some((task, directory_sender)) => { + // close directory_sender, so blocking on task will finish. + drop(directory_sender); + + let root_digest = task + .await? + .map_err(|e| Error::StorageError(e.to_string()))? + .root_digest; + + root_digest.try_into().map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + }) + } + } + } + + // allows checking if the tx part of the channel is closed. + fn is_closed(&self) -> bool { + match self.rq { + None => true, + Some((_, ref directory_sender)) => directory_sender.is_closed(), + } + } +} + +#[cfg(test)] +mod tests { + use core::time; + use std::thread; + + use futures::StreamExt; + use tempfile::TempDir; + use tokio::net::{UnixListener, UnixStream}; + use tokio_stream::wrappers::UnixListenerStream; + use tonic::transport::{Endpoint, Server, Uri}; + + use crate::{ + directoryservice::DirectoryService, + fixtures::{DIRECTORY_A, DIRECTORY_B}, + proto, + proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper}, + utils::gen_directory_service, + }; + + #[test] + fn test() -> anyhow::Result<()> { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("socket"); + + // Spin up a server, in a thread far away, which spawns its own tokio runtime, + // and blocks on the task. + let socket_path_clone = socket_path.clone(); + thread::spawn(move || { + // Create the runtime + let rt = tokio::runtime::Runtime::new().unwrap(); + // Get a handle from this runtime + let handle = rt.handle(); + + let task = handle.spawn(async { + let uds = UnixListener::bind(socket_path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new DirectoryService + let mut server = Server::builder(); + let router = server.add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from(gen_directory_service()), + )); + router.serve_with_incoming(uds_stream).await + }); + + handle.block_on(task) + }); + + // set up the local client runtime. This is similar to what the [tokio:test] macro desugars to. + let tester_runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // wait for the socket to be created + { + let mut socket_created = false; + for _try in 1..20 { + if socket_path.exists() { + socket_created = true; + break; + } + std::thread::sleep(time::Duration::from_millis(20)) + } + + assert!( + socket_created, + "expected socket path to eventually get created, but never happened" + ); + } + + tester_runtime.block_on(async move { + // Create a channel, connecting to the uds at socket_path. + // The URI is unused. + let channel = Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector_lazy(tower::service_fn(move |_: Uri| { + UnixStream::connect(socket_path.clone()) + })); + + let grpc_client = proto::directory_service_client::DirectoryServiceClient::new(channel); + + // create the GrpcDirectoryService, using the tester_runtime. + let directory_service = super::GRPCDirectoryService::from_client(grpc_client); + + // try to get DIRECTORY_A should return Ok(None) + assert_eq!( + None, + directory_service + .get(&DIRECTORY_A.digest()) + .await + .expect("must not fail") + ); + + // Now upload it + assert_eq!( + DIRECTORY_A.digest(), + directory_service + .put(DIRECTORY_A.clone()) + .await + .expect("must succeed") + ); + + // And retrieve it, compare for equality. + assert_eq!( + DIRECTORY_A.clone(), + directory_service + .get(&DIRECTORY_A.digest()) + .await + .expect("must succeed") + .expect("must be some") + ); + + // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. + directory_service + .put(DIRECTORY_B.clone()) + .await + .expect_err("must fail"); + + // Putting DIRECTORY_B in a put_multiple will succeed, but the close + // will always fail. + { + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + handle.close().await.expect_err("must fail"); + } + + // Uploading A and then B should succeed, and closing should return the digest of B. + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_A.clone()).await.expect("must succeed"); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + let digest = handle.close().await.expect("must succeed"); + assert_eq!(DIRECTORY_B.digest(), digest); + + // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. + let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); + assert_eq!( + DIRECTORY_B.clone(), + directories_it + .next() + .await + .expect("must be some") + .expect("must succeed") + ); + assert_eq!( + DIRECTORY_A.clone(), + directories_it + .next() + .await + .expect("must be some") + .expect("must succeed") + ); + + // Uploading B and then A should fail, because B refers to A, which + // hasn't been uploaded yet. + // However, the client can burst, so we might not have received the + // error back from the server. + { + let mut handle = directory_service.put_multiple_start(); + // sending out B will always be fine + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + + // whether we will be able to put A as well depends on whether we + // already received the error about B. + if handle.put(DIRECTORY_A.clone()).await.is_ok() { + // If we didn't, and this was Ok(_), … + // a subsequent close MUST fail (because it waits for the + // server) + handle.close().await.expect_err("must fail"); + } + } + + // Now we do the same test as before, send B, then A, but wait + // sufficiently enough for the server to have s + // to close us the stream, + // and then assert that uploading anything else via the handle will fail. + { + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + + let mut is_closed = false; + for _try in 1..1000 { + if handle.is_closed() { + is_closed = true; + break; + } + tokio::time::sleep(time::Duration::from_millis(10)).await; + } + + assert!( + is_closed, + "expected channel to eventually close, but never happened" + ); + + handle + .put(DIRECTORY_A.clone()) + .await + .expect_err("must fail"); + } + }); + + Ok(()) + } +} diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs new file mode 100644 index 000000000000..ac67c999d01b --- /dev/null +++ b/tvix/castore/src/directoryservice/memory.rs @@ -0,0 +1,149 @@ +use crate::{proto, B3Digest, Error}; +use futures::Stream; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::{Arc, RwLock}; +use tonic::async_trait; +use tracing::{instrument, warn}; + +use super::utils::{traverse_directory, SimplePutter}; +use super::{DirectoryPutter, DirectoryService}; + +#[derive(Clone, Default)] +pub struct MemoryDirectoryService { + db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>, +} + +#[async_trait] +impl DirectoryService for MemoryDirectoryService { + /// Constructs a [MemoryDirectoryService] from the passed [url::Url]: + /// - scheme has to be `memory://` + /// - there may not be a host. + /// - there may not be a path. + fn from_url(url: &url::Url) -> Result<Self, Error> { + if url.scheme() != "memory" { + return Err(crate::Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() || !url.path().is_empty() { + return Err(crate::Error::StorageError("invalid url".to_string())); + } + + Ok(Self::default()) + } + + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + let db = self.db.read()?; + + match db.get(digest) { + // The directory was not found, return + None => Ok(None), + + // The directory was found, try to parse the data as Directory message + Some(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != *digest { + return Err(Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))); + } + + // Validate the Directory itself is valid. + if let Err(e) = directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Err(Error::StorageError(format!( + "directory {} failed validation: {}", + actual_digest, e, + ))); + } + + Ok(Some(directory.clone())) + } + } + } + + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + let digest = directory.digest(); + + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + digest, e, + ))); + } + + // store it + let mut db = self.db.write()?; + db.insert(digest.clone(), directory); + + Ok(digest) + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { + traverse_directory(self.clone(), root_directory_digest) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> + where + Self: Clone, + { + Box::new(SimplePutter::new(self.clone())) + } +} + +#[cfg(test)] +mod tests { + use super::DirectoryService; + use super::MemoryDirectoryService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(MemoryDirectoryService::from_url(&url).is_err()); + } + + /// This correctly sets the scheme, and doesn't set a path. + #[test] + fn test_valid_scheme() { + let url = url::Url::parse("memory://").expect("must parse"); + + assert!(MemoryDirectoryService::from_url(&url).is_ok()); + } + + /// This sets the host to `foo` + #[test] + fn test_invalid_host() { + let url = url::Url::parse("memory://foo").expect("must parse"); + + assert!(MemoryDirectoryService::from_url(&url).is_err()); + } + + /// This has the path "/", which is invalid. + #[test] + fn test_invalid_has_path() { + let url = url::Url::parse("memory:///").expect("must parse"); + + assert!(MemoryDirectoryService::from_url(&url).is_err()); + } + + /// This has the path "/foo", which is invalid. + #[test] + fn test_invalid_path2() { + let url = url::Url::parse("memory:///foo").expect("must parse"); + + assert!(MemoryDirectoryService::from_url(&url).is_err()); + } +} diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs new file mode 100644 index 000000000000..3b26f4baf79b --- /dev/null +++ b/tvix/castore/src/directoryservice/mod.rs @@ -0,0 +1,76 @@ +use crate::{proto, B3Digest, Error}; +use futures::Stream; +use std::pin::Pin; +use tonic::async_trait; + +mod from_addr; +mod grpc; +mod memory; +mod sled; +mod traverse; +mod utils; + +pub use self::from_addr::from_addr; +pub use self::grpc::GRPCDirectoryService; +pub use self::memory::MemoryDirectoryService; +pub use self::sled::SledDirectoryService; +pub use self::traverse::descend_to; + +/// The base trait all Directory services need to implement. +/// This is a simple get and put of [crate::proto::Directory], returning their +/// digest. +#[async_trait] +pub trait DirectoryService: Send + Sync { + /// Create a new instance by passing in a connection URL. + /// TODO: check if we want to make this async, instead of lazily connecting + fn from_url(url: &url::Url) -> Result<Self, Error> + where + Self: Sized; + + /// Get looks up a single Directory message by its digest. + /// In case the directory is not found, Ok(None) is returned. + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; + /// Get uploads a single Directory message, and returns the calculated + /// digest, or an error. + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; + + /// Looks up a closure of [proto::Directory]. + /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`, + /// and we'd be able to add a default implementation for it here, but + /// we can't have that yet. + /// + /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily, + /// and the box allows different underlying stream implementations to be returned since + /// Rust doesn't support this as a generic in traits yet. This is the same thing that + /// [async_trait] generates, but for streams instead of futures. + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>>; + + /// Allows persisting a closure of [proto::Directory], which is a graph of + /// connected Directory messages. + fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>; +} + +/// Provides a handle to put a closure of connected [proto::Directory] elements. +/// +/// The consumer can periodically call [DirectoryPutter::put], starting from the +/// leaves. Once the root is reached, [DirectoryPutter::close] can be called to +/// retrieve the root digest (or an error). +#[async_trait] +pub trait DirectoryPutter: Send { + /// Put a individual [proto::Directory] into the store. + /// Error semantics and behaviour is up to the specific implementation of + /// this trait. + /// Due to bursting, the returned error might refer to an object previously + /// sent via `put`. + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; + + /// Close the stream, and wait for any errors. + async fn close(&mut self) -> Result<B3Digest, Error>; + + /// Return whether the stream is closed or not. + /// Used from some [DirectoryService] implementations only. + fn is_closed(&self) -> bool; +} diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs new file mode 100644 index 000000000000..0dc5496803cb --- /dev/null +++ b/tvix/castore/src/directoryservice/sled.rs @@ -0,0 +1,213 @@ +use crate::directoryservice::DirectoryPutter; +use crate::proto::Directory; +use crate::{proto, B3Digest, Error}; +use futures::Stream; +use prost::Message; +use std::path::PathBuf; +use std::pin::Pin; +use tonic::async_trait; +use tracing::{instrument, warn}; + +use super::utils::{traverse_directory, SimplePutter}; +use super::DirectoryService; + +#[derive(Clone)] +pub struct SledDirectoryService { + db: sled::Db, +} + +impl SledDirectoryService { + pub fn new(p: PathBuf) -> Result<Self, sled::Error> { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { db }) + } + + pub fn new_temporary() -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { db }) + } +} + +#[async_trait] +impl DirectoryService for SledDirectoryService { + /// Constructs a [SledDirectoryService] from the passed [url::Url]: + /// - scheme has to be `sled://` + /// - there may not be a host. + /// - a path to the sled needs to be provided (which may not be `/`). + fn from_url(url: &url::Url) -> Result<Self, Error> { + if url.scheme() != "sled" { + return Err(crate::Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() { + return Err(crate::Error::StorageError(format!( + "invalid host: {}", + url.host().unwrap() + ))); + } + + // TODO: expose compression and other parameters as URL parameters, drop new and new_temporary? + if url.path().is_empty() { + Self::new_temporary().map_err(|e| Error::StorageError(e.to_string())) + } else if url.path() == "/" { + Err(crate::Error::StorageError( + "cowardly refusing to open / with sled".to_string(), + )) + } else { + Self::new(url.path().into()).map_err(|e| Error::StorageError(e.to_string())) + } + } + + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + match self.db.get(digest.to_vec()) { + // The directory was not found, return + Ok(None) => Ok(None), + + // The directory was found, try to parse the data as Directory message + Ok(Some(data)) => match Directory::decode(&*data) { + Ok(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != *digest { + return Err(Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))); + } + + // Validate the Directory itself is valid. + if let Err(e) = directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Err(Error::StorageError(format!( + "directory {} failed validation: {}", + actual_digest, e, + ))); + } + + Ok(Some(directory)) + } + Err(e) => { + warn!("unable to parse directory {}: {}", digest, e); + Err(Error::StorageError(e.to_string())) + } + }, + // some storage error? + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + let digest = directory.digest(); + + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + digest, e, + ))); + } + // store it + let result = self.db.insert(digest.to_vec(), directory.encode_to_vec()); + if let Err(e) = result { + return Err(Error::StorageError(e.to_string())); + } + Ok(digest) + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Pin<Box<(dyn Stream<Item = Result<proto::Directory, Error>> + Send + 'static)>> { + traverse_directory(self.clone(), root_directory_digest) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> + where + Self: Clone, + { + Box::new(SimplePutter::new(self.clone())) + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use super::DirectoryService; + use super::SledDirectoryService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(SledDirectoryService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and doesn't specify a path (temporary sled). + #[test] + fn test_valid_scheme_temporary() { + let url = url::Url::parse("sled://").expect("must parse"); + + assert!(SledDirectoryService::from_url(&url).is_ok()); + } + + /// This sets the path to a location that doesn't exist, which should fail (as sled doesn't mkdir -p) + #[test] + fn test_nonexistent_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().join("foo").join("bar").to_str().unwrap()); + + assert!(SledDirectoryService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and specifies / as path (which should fail + // for obvious reasons) + #[test] + fn test_invalid_path_root() { + let url = url::Url::parse("sled:///").expect("must parse"); + + assert!(SledDirectoryService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and sets a tempdir as location. + #[test] + fn test_valid_scheme_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!(SledDirectoryService::from_url(&url).is_ok()); + } + + /// This sets a host, rather than a path, which should fail. + #[test] + fn test_invalid_host() { + let url = url::Url::parse("sled://foo.example").expect("must parse"); + + assert!(SledDirectoryService::from_url(&url).is_err()); + } + + /// This sets a host AND a valid path, which should fail + #[test] + fn test_invalid_host_and_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!(SledDirectoryService::from_url(&url).is_err()); + } +} diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs new file mode 100644 index 000000000000..4f011c963c06 --- /dev/null +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -0,0 +1,228 @@ +use super::DirectoryService; +use crate::{proto::NamedNode, B3Digest, Error}; +use std::{os::unix::ffi::OsStrExt, sync::Arc}; +use tracing::{instrument, warn}; + +/// This descends from a (root) node to the given (sub)path, returning the Node +/// at that path, or none, if there's nothing at that path. +#[instrument(skip(directory_service))] +pub async fn descend_to( + directory_service: Arc<dyn DirectoryService>, + root_node: crate::proto::node::Node, + path: &std::path::Path, +) -> Result<Option<crate::proto::node::Node>, Error> { + // strip a possible `/` prefix from the path. + let path = { + if path.starts_with("/") { + path.strip_prefix("/").unwrap() + } else { + path + } + }; + + let mut cur_node = root_node; + let mut it = path.components(); + + loop { + match it.next() { + None => { + // the (remaining) path is empty, return the node we're current at. + return Ok(Some(cur_node)); + } + Some(first_component) => { + match cur_node { + crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => { + // There's still some path left, but the current node is no directory. + // This means the path doesn't exist, as we can't reach it. + return Ok(None); + } + crate::proto::node::Node::Directory(directory_node) => { + let digest: B3Digest = directory_node.digest.try_into().map_err(|_e| { + Error::StorageError("invalid digest length".to_string()) + })?; + + // fetch the linked node from the directory_service + match directory_service.get(&digest).await? { + // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! + None => { + warn!("directory {} does not exist", digest); + + return Err(Error::StorageError(format!( + "directory {} does not exist", + digest + ))); + } + Some(directory) => { + // look for first_component in the [Directory]. + // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we + // could stop as soon as e.name is larger than the search string. + let child_node = directory.nodes().find(|n| { + n.get_name() == first_component.as_os_str().as_bytes() + }); + + match child_node { + // child node not found means there's no such element inside the directory. + None => { + return Ok(None); + } + // child node found, return to top-of loop to find the next + // node in the path. + Some(child_node) => { + cur_node = child_node; + } + } + } + } + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}; + use crate::utils::gen_directory_service; + + use super::descend_to; + + #[tokio::test] + async fn test_descend_to() { + let directory_service = gen_directory_service(); + + let mut handle = directory_service.put_multiple_start(); + handle + .put(DIRECTORY_WITH_KEEP.clone()) + .await + .expect("must succeed"); + handle + .put(DIRECTORY_COMPLICATED.clone()) + .await + .expect("must succeed"); + + // construct the node for DIRECTORY_COMPLICATED + let node_directory_complicated = + crate::proto::node::Node::Directory(crate::proto::DirectoryNode { + name: "doesntmatter".into(), + digest: DIRECTORY_COMPLICATED.digest().into(), + size: DIRECTORY_COMPLICATED.size(), + }); + + // construct the node for DIRECTORY_COMPLICATED + let node_directory_with_keep = crate::proto::node::Node::Directory( + DIRECTORY_COMPLICATED.directories.first().unwrap().clone(), + ); + + // construct the node for the .keep file + let node_file_keep = + crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone()); + + // traversal to an empty subpath should return the root node. + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from(""), + ) + .await + .expect("must succeed"); + + assert_eq!(Some(node_directory_complicated.clone()), resp); + } + + // traversal to `keep` should return the node for DIRECTORY_WITH_KEEP + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("keep"), + ) + .await + .expect("must succeed"); + + assert_eq!(Some(node_directory_with_keep), resp); + } + + // traversal to `keep/.keep` should return the node for the .keep file + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("keep/.keep"), + ) + .await + .expect("must succeed"); + + assert_eq!(Some(node_file_keep.clone()), resp); + } + + // traversal to `keep/.keep` should return the node for the .keep file + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("/keep/.keep"), + ) + .await + .expect("must succeed"); + + assert_eq!(Some(node_file_keep), resp); + } + + // traversal to `void` should return None (doesn't exist) + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("void"), + ) + .await + .expect("must succeed"); + + assert_eq!(None, resp); + } + + // traversal to `void` should return None (doesn't exist) + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("//v/oid"), + ) + .await + .expect("must succeed"); + + assert_eq!(None, resp); + } + + // traversal to `keep/.keep/404` should return None (the path can't be + // reached, as keep/.keep already is a file) + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("keep/.keep/foo"), + ) + .await + .expect("must succeed"); + + assert_eq!(None, resp); + } + + // traversal to a subpath of '/' should return the root node. + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("/"), + ) + .await + .expect("must succeed"); + + assert_eq!(Some(node_directory_complicated), resp); + } + } +} diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs new file mode 100644 index 000000000000..4c5e7cfde37c --- /dev/null +++ b/tvix/castore/src/directoryservice/utils.rs @@ -0,0 +1,140 @@ +use super::DirectoryPutter; +use super::DirectoryService; +use crate::proto; +use crate::B3Digest; +use crate::Error; +use async_stream::stream; +use futures::Stream; +use std::collections::{HashSet, VecDeque}; +use std::pin::Pin; +use tonic::async_trait; +use tracing::warn; + +/// Traverses a [proto::Directory] from the root to the children. +/// +/// This is mostly BFS, but directories are only returned once. +pub fn traverse_directory<DS: DirectoryService + 'static>( + directory_service: DS, + root_directory_digest: &B3Digest, +) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { + // The list of all directories that still need to be traversed. The next + // element is picked from the front, new elements are enqueued at the + // back. + let mut worklist_directory_digests: VecDeque<B3Digest> = + VecDeque::from([root_directory_digest.clone()]); + // The list of directory digests already sent to the consumer. + // We omit sending the same directories multiple times. + let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new(); + + let stream = stream! { + while let Some(current_directory_digest) = worklist_directory_digests.pop_front() { + match directory_service.get(¤t_directory_digest).await { + // if it's not there, we have an inconsistent store! + Ok(None) => { + warn!("directory {} does not exist", current_directory_digest); + yield Err(Error::StorageError(format!( + "directory {} does not exist", + current_directory_digest + ))); + } + Err(e) => { + warn!("failed to look up directory"); + yield Err(Error::StorageError(format!( + "unable to look up directory {}: {}", + current_directory_digest, e + ))); + } + + // if we got it + Ok(Some(current_directory)) => { + // validate, we don't want to send invalid directories. + if let Err(e) = current_directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + yield Err(Error::StorageError(format!( + "invalid directory: {}", + current_directory_digest + ))); + } + + // We're about to send this directory, so let's avoid sending it again if a + // descendant has it. + sent_directory_digests.insert(current_directory_digest); + + // enqueue all child directory digests to the work queue, as + // long as they're not part of the worklist or already sent. + // This panics if the digest looks invalid, it's supposed to be checked first. + for child_directory_node in ¤t_directory.directories { + // TODO: propagate error + let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); + + if worklist_directory_digests.contains(&child_digest) + || sent_directory_digests.contains(&child_digest) + { + continue; + } + worklist_directory_digests.push_back(child_digest); + } + + yield Ok(current_directory); + } + }; + } + }; + + Box::pin(stream) +} + +/// This is a simple implementation of a Directory uploader. +/// TODO: verify connectivity? Factor out these checks into generic helpers? +pub struct SimplePutter<DS: DirectoryService> { + directory_service: DS, + last_directory_digest: Option<B3Digest>, + closed: bool, +} + +impl<DS: DirectoryService> SimplePutter<DS> { + pub fn new(directory_service: DS) -> Self { + Self { + directory_service, + closed: false, + last_directory_digest: None, + } + } +} + +#[async_trait] +impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + if self.closed { + return Err(Error::StorageError("already closed".to_string())); + } + + let digest = self.directory_service.put(directory).await?; + + // track the last directory digest + self.last_directory_digest = Some(digest); + + Ok(()) + } + + /// We need to be mutable here, as that's the signature of the trait. + async fn close(&mut self) -> Result<B3Digest, Error> { + if self.closed { + return Err(Error::StorageError("already closed".to_string())); + } + + match &self.last_directory_digest { + Some(last_digest) => { + self.closed = true; + Ok(last_digest.clone()) + } + None => Err(Error::InvalidRequest( + "no directories sent, can't show root digest".to_string(), + )), + } + } + + fn is_closed(&self) -> bool { + self.closed + } +} |