diff options
Diffstat (limited to 'tvix/store/src/directoryservice')
-rw-r--r-- | tvix/store/src/directoryservice/from_addr.rs | 36 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 543 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/memory.rs | 149 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/mod.rs | 76 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/sled.rs | 213 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/traverse.rs | 230 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/utils.rs | 140 |
7 files changed, 0 insertions, 1387 deletions
diff --git a/tvix/store/src/directoryservice/from_addr.rs b/tvix/store/src/directoryservice/from_addr.rs deleted file mode 100644 index 776cf061096c..000000000000 --- a/tvix/store/src/directoryservice/from_addr.rs +++ /dev/null @@ -1,36 +0,0 @@ -use std::sync::Arc; -use url::Url; - -use super::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService, SledDirectoryService}; - -/// Constructs a new instance of a [DirectoryService] from an URI. -/// -/// The following URIs are supported: -/// - `memory:` -/// Uses a in-memory implementation. -/// - `sled:` -/// Uses a in-memory sled implementation. -/// - `sled:///absolute/path/to/somewhere` -/// Uses sled, using a path on the disk for persistency. Can be only opened -/// from one process at the same time. -/// - `grpc+unix:///absolute/path/to/somewhere` -/// Connects to a local tvix-store gRPC service via Unix socket. -/// - `grpc+http://host:port`, `grpc+https://host:port` -/// Connects to a (remote) tvix-store gRPC service. -pub fn from_addr(uri: &str) -> Result<Arc<dyn DirectoryService>, crate::Error> { - let url = Url::parse(uri) - .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; - - Ok(if url.scheme() == "memory" { - Arc::new(MemoryDirectoryService::from_url(&url)?) - } else if url.scheme() == "sled" { - Arc::new(SledDirectoryService::from_url(&url)?) - } else if url.scheme().starts_with("grpc+") { - Arc::new(GRPCDirectoryService::from_url(&url)?) - } else { - Err(crate::Error::StorageError(format!( - "unknown scheme: {}", - url.scheme() - )))? - }) -} diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs deleted file mode 100644 index 6257a8e81485..000000000000 --- a/tvix/store/src/directoryservice/grpc.rs +++ /dev/null @@ -1,543 +0,0 @@ -use std::collections::HashSet; -use std::pin::Pin; - -use super::{DirectoryPutter, DirectoryService}; -use crate::proto::{self, get_directory_request::ByWhat}; -use crate::{B3Digest, Error}; -use async_stream::try_stream; -use futures::Stream; -use tokio::net::UnixStream; -use tokio::spawn; -use tokio::sync::mpsc::UnboundedSender; -use tokio::task::JoinHandle; -use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic::async_trait; -use tonic::Code; -use tonic::{transport::Channel, Status}; -use tracing::{instrument, warn}; - -/// Connects to a (remote) tvix-store DirectoryService over gRPC. -#[derive(Clone)] -pub struct GRPCDirectoryService { - /// The internal reference to a gRPC client. - /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, -} - -impl GRPCDirectoryService { - /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient]. - /// panics if called outside the context of a tokio runtime. - pub fn from_client( - grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, - ) -> Self { - Self { grpc_client } - } -} - -#[async_trait] -impl DirectoryService for GRPCDirectoryService { - /// Constructs a [GRPCDirectoryService] from the passed [url::Url]: - /// - scheme has to match `grpc+*://`. - /// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. - /// - In the case of unix sockets, there must be a path, but may not be a host. - /// - In the case of non-unix sockets, there must be a host, but no path. - fn from_url(url: &url::Url) -> Result<Self, crate::Error> { - // Start checking for the scheme to start with grpc+. - match url.scheme().strip_prefix("grpc+") { - None => Err(crate::Error::StorageError("invalid scheme".to_string())), - Some(rest) => { - if rest == "unix" { - if url.host_str().is_some() { - return Err(crate::Error::StorageError( - "host may not be set".to_string(), - )); - } - let path = url.path().to_string(); - let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter - .unwrap() - .connect_with_connector_lazy(tower::service_fn( - move |_: tonic::transport::Uri| UnixStream::connect(path.clone()), - )); - let grpc_client = - proto::directory_service_client::DirectoryServiceClient::new(channel); - Ok(Self::from_client(grpc_client)) - } else { - // ensure path is empty, not supported with gRPC. - if !url.path().is_empty() { - return Err(crate::Error::StorageError( - "path may not be set".to_string(), - )); - } - - // clone the uri, and drop the grpc+ from the scheme. - // Recreate a new uri with the `grpc+` prefix dropped from the scheme. - // We can't use `url.set_scheme(rest)`, as it disallows - // setting something http(s) that previously wasn't. - let url = { - let url_str = url.to_string(); - let s_stripped = url_str.strip_prefix("grpc+").unwrap(); - url::Url::parse(s_stripped).unwrap() - }; - let channel = tonic::transport::Endpoint::try_from(url.to_string()) - .unwrap() - .connect_lazy(); - - let grpc_client = - proto::directory_service_client::DirectoryServiceClient::new(channel); - Ok(Self::from_client(grpc_client)) - } - } - } - } - - async fn get( - &self, - digest: &B3Digest, - ) -> Result<Option<crate::proto::Directory>, crate::Error> { - // Get a new handle to the gRPC client, and copy the digest. - let mut grpc_client = self.grpc_client.clone(); - let digest_cpy = digest.clone(); - let message = async move { - let mut s = grpc_client - .get(proto::GetDirectoryRequest { - recursive: false, - by_what: Some(ByWhat::Digest(digest_cpy.into())), - }) - .await? - .into_inner(); - - // Retrieve the first message only, then close the stream (we set recursive to false) - s.message().await - }; - - let digest = digest.clone(); - match message.await { - Ok(Some(directory)) => { - // Validate the retrieved Directory indeed has the - // digest we expect it to have, to detect corruptions. - let actual_digest = directory.digest(); - if actual_digest != digest { - Err(crate::Error::StorageError(format!( - "requested directory with digest {}, but got {}", - digest, actual_digest - ))) - } else if let Err(e) = directory.validate() { - // Validate the Directory itself is valid. - warn!("directory failed validation: {}", e.to_string()); - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - digest, e, - ))) - } else { - Ok(Some(directory)) - } - } - Ok(None) => Ok(None), - Err(e) if e.code() == Code::NotFound => Ok(None), - Err(e) => Err(crate::Error::StorageError(e.to_string())), - } - } - - async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { - let mut grpc_client = self.grpc_client.clone(); - - let resp = grpc_client.put(tokio_stream::iter(vec![directory])).await; - - match resp { - Ok(put_directory_resp) => Ok(put_directory_resp - .into_inner() - .root_digest - .try_into() - .map_err(|_| { - Error::StorageError("invalid root digest length in response".to_string()) - })?), - Err(e) => Err(crate::Error::StorageError(e.to_string())), - } - } - - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive( - &self, - root_directory_digest: &B3Digest, - ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { - let mut grpc_client = self.grpc_client.clone(); - let root_directory_digest = root_directory_digest.clone(); - - let stream = try_stream! { - let mut stream = grpc_client - .get(proto::GetDirectoryRequest { - recursive: true, - by_what: Some(ByWhat::Digest(root_directory_digest.clone().into())), - }) - .await - .map_err(|e| crate::Error::StorageError(e.to_string()))? - .into_inner(); - - // The Directory digests we received so far - let mut received_directory_digests: HashSet<B3Digest> = HashSet::new(); - // The Directory digests we're still expecting to get sent. - let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]); - - loop { - match stream.message().await { - Ok(Some(directory)) => { - // validate the directory itself. - if let Err(e) = directory.validate() { - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - directory.digest(), - e, - )))?; - } - // validate we actually expected that directory, and move it from expected to received. - let directory_digest = directory.digest(); - let was_expected = expected_directory_digests.remove(&directory_digest); - if !was_expected { - // FUTUREWORK: dumb clients might send the same stuff twice. - // as a fallback, we might want to tolerate receiving - // it if it's in received_directory_digests (as that - // means it once was in expected_directory_digests) - Err(crate::Error::StorageError(format!( - "received unexpected directory {}", - directory_digest - )))?; - } - received_directory_digests.insert(directory_digest); - - // register all children in expected_directory_digests. - for child_directory in &directory.directories { - // We ran validate() above, so we know these digests must be correct. - let child_directory_digest = - child_directory.digest.clone().try_into().unwrap(); - - expected_directory_digests - .insert(child_directory_digest); - } - - yield directory; - }, - Ok(None) => { - // If we were still expecting something, that's an error. - if !expected_directory_digests.is_empty() { - Err(crate::Error::StorageError(format!( - "still expected {} directories, but got premature end of stream", - expected_directory_digests.len(), - )))? - } else { - return - } - }, - Err(e) => { - Err(crate::Error::StorageError(e.to_string()))?; - }, - } - } - }; - - Box::pin(stream) - } - - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> - where - Self: Clone, - { - let mut grpc_client = self.grpc_client.clone(); - - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - - let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move { - let s = grpc_client - .put(UnboundedReceiverStream::new(rx)) - .await? - .into_inner(); - - Ok(s) - }); - - Box::new(GRPCPutter::new(tx, task)) - } -} - -/// Allows uploading multiple Directory messages in the same gRPC stream. -pub struct GRPCPutter { - /// Data about the current request - a handle to the task, and the tx part - /// of the channel. - /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request. - /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed. - #[allow(clippy::type_complexity)] // lol - rq: Option<( - JoinHandle<Result<proto::PutDirectoryResponse, Status>>, - UnboundedSender<proto::Directory>, - )>, -} - -impl GRPCPutter { - pub fn new( - directory_sender: UnboundedSender<proto::Directory>, - task: JoinHandle<Result<proto::PutDirectoryResponse, Status>>, - ) -> Self { - Self { - rq: Some((task, directory_sender)), - } - } -} - -#[async_trait] -impl DirectoryPutter for GRPCPutter { - async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { - match self.rq { - // If we're not already closed, send the directory to directory_sender. - Some((_, ref directory_sender)) => { - if directory_sender.send(directory).is_err() { - // If the channel has been prematurely closed, invoke close (so we can peek at the error code) - // That error code is much more helpful, because it - // contains the error message from the server. - self.close().await?; - } - Ok(()) - } - // If self.close() was already called, we can't put again. - None => Err(Error::StorageError( - "DirectoryPutter already closed".to_string(), - )), - } - } - - /// Closes the stream for sending, and returns the value - async fn close(&mut self) -> Result<B3Digest, crate::Error> { - // get self.rq, and replace it with None. - // This ensures we can only close it once. - match std::mem::take(&mut self.rq) { - None => Err(Error::StorageError("already closed".to_string())), - Some((task, directory_sender)) => { - // close directory_sender, so blocking on task will finish. - drop(directory_sender); - - let root_digest = task - .await? - .map_err(|e| Error::StorageError(e.to_string()))? - .root_digest; - - root_digest.try_into().map_err(|_| { - Error::StorageError("invalid root digest length in response".to_string()) - }) - } - } - } - - // allows checking if the tx part of the channel is closed. - fn is_closed(&self) -> bool { - match self.rq { - None => true, - Some((_, ref directory_sender)) => directory_sender.is_closed(), - } - } -} - -#[cfg(test)] -mod tests { - use core::time; - use std::thread; - - use futures::StreamExt; - use tempfile::TempDir; - use tokio::net::{UnixListener, UnixStream}; - use tokio_stream::wrappers::UnixListenerStream; - use tonic::transport::{Endpoint, Server, Uri}; - - use crate::{ - directoryservice::DirectoryService, - proto, - proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper}, - tests::{ - fixtures::{DIRECTORY_A, DIRECTORY_B}, - utils::gen_directory_service, - }, - }; - - #[test] - fn test() -> anyhow::Result<()> { - let tmpdir = TempDir::new().unwrap(); - let socket_path = tmpdir.path().join("socket"); - - // Spin up a server, in a thread far away, which spawns its own tokio runtime, - // and blocks on the task. - let socket_path_clone = socket_path.clone(); - thread::spawn(move || { - // Create the runtime - let rt = tokio::runtime::Runtime::new().unwrap(); - // Get a handle from this runtime - let handle = rt.handle(); - - let task = handle.spawn(async { - let uds = UnixListener::bind(socket_path_clone).unwrap(); - let uds_stream = UnixListenerStream::new(uds); - - // spin up a new DirectoryService - let mut server = Server::builder(); - let router = server.add_service(DirectoryServiceServer::new( - GRPCDirectoryServiceWrapper::from(gen_directory_service()), - )); - router.serve_with_incoming(uds_stream).await - }); - - handle.block_on(task) - }); - - // set up the local client runtime. This is similar to what the [tokio:test] macro desugars to. - let tester_runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - // wait for the socket to be created - { - let mut socket_created = false; - for _try in 1..20 { - if socket_path.exists() { - socket_created = true; - break; - } - std::thread::sleep(time::Duration::from_millis(20)) - } - - assert!( - socket_created, - "expected socket path to eventually get created, but never happened" - ); - } - - tester_runtime.block_on(async move { - // Create a channel, connecting to the uds at socket_path. - // The URI is unused. - let channel = Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector_lazy(tower::service_fn(move |_: Uri| { - UnixStream::connect(socket_path.clone()) - })); - - let grpc_client = proto::directory_service_client::DirectoryServiceClient::new(channel); - - // create the GrpcDirectoryService, using the tester_runtime. - let directory_service = super::GRPCDirectoryService::from_client(grpc_client); - - // try to get DIRECTORY_A should return Ok(None) - assert_eq!( - None, - directory_service - .get(&DIRECTORY_A.digest()) - .await - .expect("must not fail") - ); - - // Now upload it - assert_eq!( - DIRECTORY_A.digest(), - directory_service - .put(DIRECTORY_A.clone()) - .await - .expect("must succeed") - ); - - // And retrieve it, compare for equality. - assert_eq!( - DIRECTORY_A.clone(), - directory_service - .get(&DIRECTORY_A.digest()) - .await - .expect("must succeed") - .expect("must be some") - ); - - // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. - directory_service - .put(DIRECTORY_B.clone()) - .await - .expect_err("must fail"); - - // Putting DIRECTORY_B in a put_multiple will succeed, but the close - // will always fail. - { - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - handle.close().await.expect_err("must fail"); - } - - // Uploading A and then B should succeed, and closing should return the digest of B. - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_A.clone()).await.expect("must succeed"); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - let digest = handle.close().await.expect("must succeed"); - assert_eq!(DIRECTORY_B.digest(), digest); - - // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. - let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); - assert_eq!( - DIRECTORY_B.clone(), - directories_it - .next() - .await - .expect("must be some") - .expect("must succeed") - ); - assert_eq!( - DIRECTORY_A.clone(), - directories_it - .next() - .await - .expect("must be some") - .expect("must succeed") - ); - - // Uploading B and then A should fail, because B refers to A, which - // hasn't been uploaded yet. - // However, the client can burst, so we might not have received the - // error back from the server. - { - let mut handle = directory_service.put_multiple_start(); - // sending out B will always be fine - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - - // whether we will be able to put A as well depends on whether we - // already received the error about B. - if handle.put(DIRECTORY_A.clone()).await.is_ok() { - // If we didn't, and this was Ok(_), … - // a subsequent close MUST fail (because it waits for the - // server) - handle.close().await.expect_err("must fail"); - } - } - - // Now we do the same test as before, send B, then A, but wait - // sufficiently enough for the server to have s - // to close us the stream, - // and then assert that uploading anything else via the handle will fail. - { - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - - let mut is_closed = false; - for _try in 1..1000 { - if handle.is_closed() { - is_closed = true; - break; - } - tokio::time::sleep(time::Duration::from_millis(10)).await; - } - - assert!( - is_closed, - "expected channel to eventually close, but never happened" - ); - - handle - .put(DIRECTORY_A.clone()) - .await - .expect_err("must fail"); - } - }); - - Ok(()) - } -} diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs deleted file mode 100644 index ac67c999d01b..000000000000 --- a/tvix/store/src/directoryservice/memory.rs +++ /dev/null @@ -1,149 +0,0 @@ -use crate::{proto, B3Digest, Error}; -use futures::Stream; -use std::collections::HashMap; -use std::pin::Pin; -use std::sync::{Arc, RwLock}; -use tonic::async_trait; -use tracing::{instrument, warn}; - -use super::utils::{traverse_directory, SimplePutter}; -use super::{DirectoryPutter, DirectoryService}; - -#[derive(Clone, Default)] -pub struct MemoryDirectoryService { - db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>, -} - -#[async_trait] -impl DirectoryService for MemoryDirectoryService { - /// Constructs a [MemoryDirectoryService] from the passed [url::Url]: - /// - scheme has to be `memory://` - /// - there may not be a host. - /// - there may not be a path. - fn from_url(url: &url::Url) -> Result<Self, Error> { - if url.scheme() != "memory" { - return Err(crate::Error::StorageError("invalid scheme".to_string())); - } - - if url.has_host() || !url.path().is_empty() { - return Err(crate::Error::StorageError("invalid url".to_string())); - } - - Ok(Self::default()) - } - - #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { - let db = self.db.read()?; - - match db.get(digest) { - // The directory was not found, return - None => Ok(None), - - // The directory was found, try to parse the data as Directory message - Some(directory) => { - // Validate the retrieved Directory indeed has the - // digest we expect it to have, to detect corruptions. - let actual_digest = directory.digest(); - if actual_digest != *digest { - return Err(Error::StorageError(format!( - "requested directory with digest {}, but got {}", - digest, actual_digest - ))); - } - - // Validate the Directory itself is valid. - if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } - - Ok(Some(directory.clone())) - } - } - } - - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - let digest = directory.digest(); - - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } - - // store it - let mut db = self.db.write()?; - db.insert(digest.clone(), directory); - - Ok(digest) - } - - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive( - &self, - root_directory_digest: &B3Digest, - ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { - traverse_directory(self.clone(), root_directory_digest) - } - - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> - where - Self: Clone, - { - Box::new(SimplePutter::new(self.clone())) - } -} - -#[cfg(test)] -mod tests { - use super::DirectoryService; - use super::MemoryDirectoryService; - - /// This uses a wrong scheme. - #[test] - fn test_invalid_scheme() { - let url = url::Url::parse("http://foo.example/test").expect("must parse"); - - assert!(MemoryDirectoryService::from_url(&url).is_err()); - } - - /// This correctly sets the scheme, and doesn't set a path. - #[test] - fn test_valid_scheme() { - let url = url::Url::parse("memory://").expect("must parse"); - - assert!(MemoryDirectoryService::from_url(&url).is_ok()); - } - - /// This sets the host to `foo` - #[test] - fn test_invalid_host() { - let url = url::Url::parse("memory://foo").expect("must parse"); - - assert!(MemoryDirectoryService::from_url(&url).is_err()); - } - - /// This has the path "/", which is invalid. - #[test] - fn test_invalid_has_path() { - let url = url::Url::parse("memory:///").expect("must parse"); - - assert!(MemoryDirectoryService::from_url(&url).is_err()); - } - - /// This has the path "/foo", which is invalid. - #[test] - fn test_invalid_path2() { - let url = url::Url::parse("memory:///foo").expect("must parse"); - - assert!(MemoryDirectoryService::from_url(&url).is_err()); - } -} diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs deleted file mode 100644 index 3b26f4baf79b..000000000000 --- a/tvix/store/src/directoryservice/mod.rs +++ /dev/null @@ -1,76 +0,0 @@ -use crate::{proto, B3Digest, Error}; -use futures::Stream; -use std::pin::Pin; -use tonic::async_trait; - -mod from_addr; -mod grpc; -mod memory; -mod sled; -mod traverse; -mod utils; - -pub use self::from_addr::from_addr; -pub use self::grpc::GRPCDirectoryService; -pub use self::memory::MemoryDirectoryService; -pub use self::sled::SledDirectoryService; -pub use self::traverse::descend_to; - -/// The base trait all Directory services need to implement. -/// This is a simple get and put of [crate::proto::Directory], returning their -/// digest. -#[async_trait] -pub trait DirectoryService: Send + Sync { - /// Create a new instance by passing in a connection URL. - /// TODO: check if we want to make this async, instead of lazily connecting - fn from_url(url: &url::Url) -> Result<Self, Error> - where - Self: Sized; - - /// Get looks up a single Directory message by its digest. - /// In case the directory is not found, Ok(None) is returned. - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; - /// Get uploads a single Directory message, and returns the calculated - /// digest, or an error. - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; - - /// Looks up a closure of [proto::Directory]. - /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`, - /// and we'd be able to add a default implementation for it here, but - /// we can't have that yet. - /// - /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily, - /// and the box allows different underlying stream implementations to be returned since - /// Rust doesn't support this as a generic in traits yet. This is the same thing that - /// [async_trait] generates, but for streams instead of futures. - fn get_recursive( - &self, - root_directory_digest: &B3Digest, - ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>>; - - /// Allows persisting a closure of [proto::Directory], which is a graph of - /// connected Directory messages. - fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>; -} - -/// Provides a handle to put a closure of connected [proto::Directory] elements. -/// -/// The consumer can periodically call [DirectoryPutter::put], starting from the -/// leaves. Once the root is reached, [DirectoryPutter::close] can be called to -/// retrieve the root digest (or an error). -#[async_trait] -pub trait DirectoryPutter: Send { - /// Put a individual [proto::Directory] into the store. - /// Error semantics and behaviour is up to the specific implementation of - /// this trait. - /// Due to bursting, the returned error might refer to an object previously - /// sent via `put`. - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; - - /// Close the stream, and wait for any errors. - async fn close(&mut self) -> Result<B3Digest, Error>; - - /// Return whether the stream is closed or not. - /// Used from some [DirectoryService] implementations only. - fn is_closed(&self) -> bool; -} diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs deleted file mode 100644 index 0dc5496803cb..000000000000 --- a/tvix/store/src/directoryservice/sled.rs +++ /dev/null @@ -1,213 +0,0 @@ -use crate::directoryservice::DirectoryPutter; -use crate::proto::Directory; -use crate::{proto, B3Digest, Error}; -use futures::Stream; -use prost::Message; -use std::path::PathBuf; -use std::pin::Pin; -use tonic::async_trait; -use tracing::{instrument, warn}; - -use super::utils::{traverse_directory, SimplePutter}; -use super::DirectoryService; - -#[derive(Clone)] -pub struct SledDirectoryService { - db: sled::Db, -} - -impl SledDirectoryService { - pub fn new(p: PathBuf) -> Result<Self, sled::Error> { - let config = sled::Config::default().use_compression(true).path(p); - let db = config.open()?; - - Ok(Self { db }) - } - - pub fn new_temporary() -> Result<Self, sled::Error> { - let config = sled::Config::default().temporary(true); - let db = config.open()?; - - Ok(Self { db }) - } -} - -#[async_trait] -impl DirectoryService for SledDirectoryService { - /// Constructs a [SledDirectoryService] from the passed [url::Url]: - /// - scheme has to be `sled://` - /// - there may not be a host. - /// - a path to the sled needs to be provided (which may not be `/`). - fn from_url(url: &url::Url) -> Result<Self, Error> { - if url.scheme() != "sled" { - return Err(crate::Error::StorageError("invalid scheme".to_string())); - } - - if url.has_host() { - return Err(crate::Error::StorageError(format!( - "invalid host: {}", - url.host().unwrap() - ))); - } - - // TODO: expose compression and other parameters as URL parameters, drop new and new_temporary? - if url.path().is_empty() { - Self::new_temporary().map_err(|e| Error::StorageError(e.to_string())) - } else if url.path() == "/" { - Err(crate::Error::StorageError( - "cowardly refusing to open / with sled".to_string(), - )) - } else { - Self::new(url.path().into()).map_err(|e| Error::StorageError(e.to_string())) - } - } - - #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { - match self.db.get(digest.to_vec()) { - // The directory was not found, return - Ok(None) => Ok(None), - - // The directory was found, try to parse the data as Directory message - Ok(Some(data)) => match Directory::decode(&*data) { - Ok(directory) => { - // Validate the retrieved Directory indeed has the - // digest we expect it to have, to detect corruptions. - let actual_digest = directory.digest(); - if actual_digest != *digest { - return Err(Error::StorageError(format!( - "requested directory with digest {}, but got {}", - digest, actual_digest - ))); - } - - // Validate the Directory itself is valid. - if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } - - Ok(Some(directory)) - } - Err(e) => { - warn!("unable to parse directory {}: {}", digest, e); - Err(Error::StorageError(e.to_string())) - } - }, - // some storage error? - Err(e) => Err(Error::StorageError(e.to_string())), - } - } - - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - let digest = directory.digest(); - - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } - // store it - let result = self.db.insert(digest.to_vec(), directory.encode_to_vec()); - if let Err(e) = result { - return Err(Error::StorageError(e.to_string())); - } - Ok(digest) - } - - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive( - &self, - root_directory_digest: &B3Digest, - ) -> Pin<Box<(dyn Stream<Item = Result<proto::Directory, Error>> + Send + 'static)>> { - traverse_directory(self.clone(), root_directory_digest) - } - - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> - where - Self: Clone, - { - Box::new(SimplePutter::new(self.clone())) - } -} - -#[cfg(test)] -mod tests { - use tempfile::TempDir; - - use super::DirectoryService; - use super::SledDirectoryService; - - /// This uses a wrong scheme. - #[test] - fn test_invalid_scheme() { - let url = url::Url::parse("http://foo.example/test").expect("must parse"); - - assert!(SledDirectoryService::from_url(&url).is_err()); - } - - /// This uses the correct scheme, and doesn't specify a path (temporary sled). - #[test] - fn test_valid_scheme_temporary() { - let url = url::Url::parse("sled://").expect("must parse"); - - assert!(SledDirectoryService::from_url(&url).is_ok()); - } - - /// This sets the path to a location that doesn't exist, which should fail (as sled doesn't mkdir -p) - #[test] - fn test_nonexistent_path() { - let tmpdir = TempDir::new().unwrap(); - - let mut url = url::Url::parse("sled://foo.example").expect("must parse"); - url.set_path(tmpdir.path().join("foo").join("bar").to_str().unwrap()); - - assert!(SledDirectoryService::from_url(&url).is_err()); - } - - /// This uses the correct scheme, and specifies / as path (which should fail - // for obvious reasons) - #[test] - fn test_invalid_path_root() { - let url = url::Url::parse("sled:///").expect("must parse"); - - assert!(SledDirectoryService::from_url(&url).is_err()); - } - - /// This uses the correct scheme, and sets a tempdir as location. - #[test] - fn test_valid_scheme_path() { - let tmpdir = TempDir::new().unwrap(); - - let mut url = url::Url::parse("sled://").expect("must parse"); - url.set_path(tmpdir.path().to_str().unwrap()); - - assert!(SledDirectoryService::from_url(&url).is_ok()); - } - - /// This sets a host, rather than a path, which should fail. - #[test] - fn test_invalid_host() { - let url = url::Url::parse("sled://foo.example").expect("must parse"); - - assert!(SledDirectoryService::from_url(&url).is_err()); - } - - /// This sets a host AND a valid path, which should fail - #[test] - fn test_invalid_host_and_path() { - let tmpdir = TempDir::new().unwrap(); - - let mut url = url::Url::parse("sled://foo.example").expect("must parse"); - url.set_path(tmpdir.path().to_str().unwrap()); - - assert!(SledDirectoryService::from_url(&url).is_err()); - } -} diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs deleted file mode 100644 index 5043439e9de5..000000000000 --- a/tvix/store/src/directoryservice/traverse.rs +++ /dev/null @@ -1,230 +0,0 @@ -use super::DirectoryService; -use crate::{proto::NamedNode, B3Digest, Error}; -use std::{os::unix::ffi::OsStrExt, sync::Arc}; -use tracing::{instrument, warn}; - -/// This descends from a (root) node to the given (sub)path, returning the Node -/// at that path, or none, if there's nothing at that path. -#[instrument(skip(directory_service))] -pub async fn descend_to( - directory_service: Arc<dyn DirectoryService>, - root_node: crate::proto::node::Node, - path: &std::path::Path, -) -> Result<Option<crate::proto::node::Node>, Error> { - // strip a possible `/` prefix from the path. - let path = { - if path.starts_with("/") { - path.strip_prefix("/").unwrap() - } else { - path - } - }; - - let mut cur_node = root_node; - let mut it = path.components(); - - loop { - match it.next() { - None => { - // the (remaining) path is empty, return the node we're current at. - return Ok(Some(cur_node)); - } - Some(first_component) => { - match cur_node { - crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => { - // There's still some path left, but the current node is no directory. - // This means the path doesn't exist, as we can't reach it. - return Ok(None); - } - crate::proto::node::Node::Directory(directory_node) => { - let digest: B3Digest = directory_node.digest.try_into().map_err(|_e| { - Error::StorageError("invalid digest length".to_string()) - })?; - - // fetch the linked node from the directory_service - match directory_service.get(&digest).await? { - // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! - None => { - warn!("directory {} does not exist", digest); - - return Err(Error::StorageError(format!( - "directory {} does not exist", - digest - ))); - } - Some(directory) => { - // look for first_component in the [Directory]. - // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we - // could stop as soon as e.name is larger than the search string. - let child_node = directory.nodes().find(|n| { - n.get_name() == first_component.as_os_str().as_bytes() - }); - - match child_node { - // child node not found means there's no such element inside the directory. - None => { - return Ok(None); - } - // child node found, return to top-of loop to find the next - // node in the path. - Some(child_node) => { - cur_node = child_node; - } - } - } - } - } - } - } - } - } -} - -#[cfg(test)] -mod tests { - use std::path::PathBuf; - - use crate::tests::{ - fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, - utils::gen_directory_service, - }; - - use super::descend_to; - - #[tokio::test] - async fn test_descend_to() { - let directory_service = gen_directory_service(); - - let mut handle = directory_service.put_multiple_start(); - handle - .put(DIRECTORY_WITH_KEEP.clone()) - .await - .expect("must succeed"); - handle - .put(DIRECTORY_COMPLICATED.clone()) - .await - .expect("must succeed"); - - // construct the node for DIRECTORY_COMPLICATED - let node_directory_complicated = - crate::proto::node::Node::Directory(crate::proto::DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }); - - // construct the node for DIRECTORY_COMPLICATED - let node_directory_with_keep = crate::proto::node::Node::Directory( - DIRECTORY_COMPLICATED.directories.first().unwrap().clone(), - ); - - // construct the node for the .keep file - let node_file_keep = - crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone()); - - // traversal to an empty subpath should return the root node. - { - let resp = descend_to( - directory_service.clone(), - node_directory_complicated.clone(), - &PathBuf::from(""), - ) - .await - .expect("must succeed"); - - assert_eq!(Some(node_directory_complicated.clone()), resp); - } - - // traversal to `keep` should return the node for DIRECTORY_WITH_KEEP - { - let resp = descend_to( - directory_service.clone(), - node_directory_complicated.clone(), - &PathBuf::from("keep"), - ) - .await - .expect("must succeed"); - - assert_eq!(Some(node_directory_with_keep), resp); - } - - // traversal to `keep/.keep` should return the node for the .keep file - { - let resp = descend_to( - directory_service.clone(), - node_directory_complicated.clone(), - &PathBuf::from("keep/.keep"), - ) - .await - .expect("must succeed"); - - assert_eq!(Some(node_file_keep.clone()), resp); - } - - // traversal to `keep/.keep` should return the node for the .keep file - { - let resp = descend_to( - directory_service.clone(), - node_directory_complicated.clone(), - &PathBuf::from("/keep/.keep"), - ) - .await - .expect("must succeed"); - - assert_eq!(Some(node_file_keep), resp); - } - - // traversal to `void` should return None (doesn't exist) - { - let resp = descend_to( - directory_service.clone(), - node_directory_complicated.clone(), - &PathBuf::from("void"), - ) - .await - .expect("must succeed"); - - assert_eq!(None, resp); - } - - // traversal to `void` should return None (doesn't exist) - { - let resp = descend_to( - directory_service.clone(), - node_directory_complicated.clone(), - &PathBuf::from("//v/oid"), - ) - .await - .expect("must succeed"); - - assert_eq!(None, resp); - } - - // traversal to `keep/.keep/404` should return None (the path can't be - // reached, as keep/.keep already is a file) - { - let resp = descend_to( - directory_service.clone(), - node_directory_complicated.clone(), - &PathBuf::from("keep/.keep/foo"), - ) - .await - .expect("must succeed"); - - assert_eq!(None, resp); - } - - // traversal to a subpath of '/' should return the root node. - { - let resp = descend_to( - directory_service.clone(), - node_directory_complicated.clone(), - &PathBuf::from("/"), - ) - .await - .expect("must succeed"); - - assert_eq!(Some(node_directory_complicated), resp); - } - } -} diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs deleted file mode 100644 index 4c5e7cfde37c..000000000000 --- a/tvix/store/src/directoryservice/utils.rs +++ /dev/null @@ -1,140 +0,0 @@ -use super::DirectoryPutter; -use super::DirectoryService; -use crate::proto; -use crate::B3Digest; -use crate::Error; -use async_stream::stream; -use futures::Stream; -use std::collections::{HashSet, VecDeque}; -use std::pin::Pin; -use tonic::async_trait; -use tracing::warn; - -/// Traverses a [proto::Directory] from the root to the children. -/// -/// This is mostly BFS, but directories are only returned once. -pub fn traverse_directory<DS: DirectoryService + 'static>( - directory_service: DS, - root_directory_digest: &B3Digest, -) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { - // The list of all directories that still need to be traversed. The next - // element is picked from the front, new elements are enqueued at the - // back. - let mut worklist_directory_digests: VecDeque<B3Digest> = - VecDeque::from([root_directory_digest.clone()]); - // The list of directory digests already sent to the consumer. - // We omit sending the same directories multiple times. - let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new(); - - let stream = stream! { - while let Some(current_directory_digest) = worklist_directory_digests.pop_front() { - match directory_service.get(¤t_directory_digest).await { - // if it's not there, we have an inconsistent store! - Ok(None) => { - warn!("directory {} does not exist", current_directory_digest); - yield Err(Error::StorageError(format!( - "directory {} does not exist", - current_directory_digest - ))); - } - Err(e) => { - warn!("failed to look up directory"); - yield Err(Error::StorageError(format!( - "unable to look up directory {}: {}", - current_directory_digest, e - ))); - } - - // if we got it - Ok(Some(current_directory)) => { - // validate, we don't want to send invalid directories. - if let Err(e) = current_directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - yield Err(Error::StorageError(format!( - "invalid directory: {}", - current_directory_digest - ))); - } - - // We're about to send this directory, so let's avoid sending it again if a - // descendant has it. - sent_directory_digests.insert(current_directory_digest); - - // enqueue all child directory digests to the work queue, as - // long as they're not part of the worklist or already sent. - // This panics if the digest looks invalid, it's supposed to be checked first. - for child_directory_node in ¤t_directory.directories { - // TODO: propagate error - let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); - - if worklist_directory_digests.contains(&child_digest) - || sent_directory_digests.contains(&child_digest) - { - continue; - } - worklist_directory_digests.push_back(child_digest); - } - - yield Ok(current_directory); - } - }; - } - }; - - Box::pin(stream) -} - -/// This is a simple implementation of a Directory uploader. -/// TODO: verify connectivity? Factor out these checks into generic helpers? -pub struct SimplePutter<DS: DirectoryService> { - directory_service: DS, - last_directory_digest: Option<B3Digest>, - closed: bool, -} - -impl<DS: DirectoryService> SimplePutter<DS> { - pub fn new(directory_service: DS) -> Self { - Self { - directory_service, - closed: false, - last_directory_digest: None, - } - } -} - -#[async_trait] -impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { - if self.closed { - return Err(Error::StorageError("already closed".to_string())); - } - - let digest = self.directory_service.put(directory).await?; - - // track the last directory digest - self.last_directory_digest = Some(digest); - - Ok(()) - } - - /// We need to be mutable here, as that's the signature of the trait. - async fn close(&mut self) -> Result<B3Digest, Error> { - if self.closed { - return Err(Error::StorageError("already closed".to_string())); - } - - match &self.last_directory_digest { - Some(last_digest) => { - self.closed = true; - Ok(last_digest.clone()) - } - None => Err(Error::InvalidRequest( - "no directories sent, can't show root digest".to_string(), - )), - } - } - - fn is_closed(&self) -> bool { - self.closed - } -} |