diff options
-rw-r--r-- | tvix/Cargo.lock | 1 | ||||
-rw-r--r-- | tvix/Cargo.nix | 7 | ||||
-rw-r--r-- | tvix/castore/Cargo.toml | 3 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/closure_validator.rs | 51 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/from_addr.rs | 17 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/mod.rs | 2 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/object_store.rs | 261 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/tests/mod.rs | 1 |
8 files changed, 332 insertions, 11 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 1883cbbc3937..fc65c8869050 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -4072,6 +4072,7 @@ dependencies = [ name = "tvix-castore" version = "0.1.0" dependencies = [ + "async-compression", "async-process", "async-stream", "async-tempfile", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 2303f4b946ea..088bd6fdfcbf 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -12816,6 +12816,11 @@ rec { else ./castore; dependencies = [ { + name = "async-compression"; + packageId = "async-compression"; + features = [ "tokio" "zstd" ]; + } + { name = "async-stream"; packageId = "async-stream"; } @@ -12931,7 +12936,7 @@ rec { { name = "tokio-util"; packageId = "tokio-util"; - features = [ "io" "io-util" ]; + features = [ "io" "io-util" "codec" ]; } { name = "tonic"; diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index 1b317be73924..4cbc29053b22 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +async-compression = { version = "0.4.9", features = ["tokio", "zstd"]} async-stream = "0.3.5" async-tempfile = "0.4.0" blake3 = { version = "1.3.1", features = ["rayon", "std", "traits-preview"] } @@ -21,7 +22,7 @@ prost = "0.12.1" sled = { version = "0.34.7" } thiserror = "1.0.38" tokio-stream = { version = "0.1.14", features = ["fs", "net"] } -tokio-util = { version = "0.7.9", features = ["io", "io-util"] } +tokio-util = { version = "0.7.9", features = ["io", "io-util", "codec"] } tokio-tar = "0.3.1" tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } tonic = "0.11.0" diff --git a/tvix/castore/src/directoryservice/closure_validator.rs b/tvix/castore/src/directoryservice/closure_validator.rs index 183928a86fad..6281d247021c 100644 --- a/tvix/castore/src/directoryservice/closure_validator.rs +++ b/tvix/castore/src/directoryservice/closure_validator.rs @@ -4,7 +4,7 @@ use bstr::ByteSlice; use petgraph::{ graph::{DiGraph, NodeIndex}, - visit::Bfs, + visit::{Bfs, Walker}, }; use tracing::instrument; @@ -13,6 +13,8 @@ use crate::{ B3Digest, Error, }; +type DirectoryGraph = DiGraph<Directory, ()>; + /// This can be used to validate a Directory closure (DAG of connected /// Directories), and their insertion order. /// @@ -37,7 +39,7 @@ use crate::{ pub struct ClosureValidator { // A directed graph, using Directory as node weight, without edge weights. // Edges point from parents to children. - graph: DiGraph<Directory, ()>, + graph: DirectoryGraph, // A lookup table from directory digest to node index. digest_to_node_ix: HashMap<B3Digest, NodeIndex>, @@ -122,11 +124,48 @@ impl ClosureValidator { /// In case no elements have been inserted, returns an empty list. #[instrument(level = "trace", skip_all, err)] pub(crate) fn finalize(self) -> Result<Vec<Directory>, Error> { + let (graph, _) = match self.finalize_raw()? { + None => return Ok(vec![]), + Some(v) => v, + }; + // Dissolve the graph, returning the nodes as a Vec. + // As the graph was populated in a valid DFS PostOrder, we can return + // nodes in that same order. + let (nodes, _edges) = graph.into_nodes_edges(); + Ok(nodes.into_iter().map(|x| x.weight).collect()) + } + + /// Ensure that all inserted Directories are connected, then return a + /// (deduplicated) and validated list of directories, in from-root-to-leaves + /// order. + /// In case no elements have been inserted, returns an empty list. + #[instrument(level = "trace", skip_all, err)] + pub(crate) fn finalize_root_to_leaves(self) -> Result<Vec<Directory>, Error> { + let (mut graph, root) = match self.finalize_raw()? { + None => return Ok(vec![]), + Some(v) => v, + }; + + // do a BFS traversal of the graph, starting with the root node to get + // (the count of) all nodes reachable from there. + let traversal = Bfs::new(&graph, root); + + Ok(traversal + .iter(&graph) + .collect::<Vec<_>>() + .into_iter() + .filter_map(|i| graph.remove_node(i)) + .collect()) + } + + /// Internal implementation of closure validation + #[instrument(level = "trace", skip_all, err)] + fn finalize_raw(self) -> Result<Option<(DirectoryGraph, NodeIndex)>, Error> { // If no nodes were inserted, an empty list is returned. let last_directory_ix = if let Some(x) = self.last_directory_ix { x } else { - return Ok(vec![]); + return Ok(None); }; // do a BFS traversal of the graph, starting with the root node to get @@ -172,11 +211,7 @@ impl ClosureValidator { } } - // Dissolve the graph, returning the nodes as a Vec. - // As the graph was populated in a valid DFS PostOrder, we can return - // nodes in that same order. - let (nodes, _edges) = self.graph.into_nodes_edges(); - Ok(nodes.into_iter().map(|x| x.weight).collect()) + Ok(Some((self.graph, last_directory_ix))) } } diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs index ae51df6376f9..ee675ca68a9f 100644 --- a/tvix/castore/src/directoryservice/from_addr.rs +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -2,7 +2,10 @@ use url::Url; use crate::{proto::directory_service_client::DirectoryServiceClient, Error}; -use super::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService, SledDirectoryService}; +use super::{ + DirectoryService, GRPCDirectoryService, MemoryDirectoryService, ObjectStoreDirectoryService, + SledDirectoryService, +}; /// Constructs a new instance of a [DirectoryService] from an URI. /// @@ -63,6 +66,18 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn DirectoryService>, crate::Er let client = DirectoryServiceClient::new(crate::tonic::channel_from_url(&url).await?); Box::new(GRPCDirectoryService::from_client(client)) } + scheme if scheme.starts_with("objectstore+") => { + // We need to convert the URL to string, strip the prefix there, and then + // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. + let trimmed_url = { + let s = url.to_string(); + Url::parse(s.strip_prefix("objectstore+").unwrap()).unwrap() + }; + Box::new( + ObjectStoreDirectoryService::parse_url(&trimmed_url) + .map_err(|e| Error::StorageError(e.to_string()))?, + ) + } #[cfg(feature = "cloud")] "bigtable" => { use super::bigtable::BigtableParameters; diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index ca82ff2bc95d..3f180ef162d8 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -6,6 +6,7 @@ mod closure_validator; mod from_addr; mod grpc; mod memory; +mod object_store; mod simple_putter; mod sled; #[cfg(test)] @@ -17,6 +18,7 @@ pub use self::closure_validator::ClosureValidator; pub use self::from_addr::from_addr; pub use self::grpc::GRPCDirectoryService; pub use self::memory::MemoryDirectoryService; +pub use self::object_store::ObjectStoreDirectoryService; pub use self::simple_putter::SimplePutter; pub use self::sled::SledDirectoryService; pub use self::traverse::descend_to; diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs new file mode 100644 index 000000000000..64ce335edb86 --- /dev/null +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -0,0 +1,261 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use data_encoding::HEXLOWER; +use futures::future::Either; +use futures::stream::BoxStream; +use futures::SinkExt; +use futures::StreamExt; +use futures::TryFutureExt; +use futures::TryStreamExt; +use object_store::{path::Path, ObjectStore}; +use prost::Message; +use tokio::io::AsyncWriteExt; +use tokio_util::codec::LengthDelimitedCodec; +use tonic::async_trait; +use tracing::{instrument, trace, warn, Level}; +use url::Url; + +use super::{ClosureValidator, DirectoryPutter, DirectoryService}; +use crate::{proto, B3Digest, Error}; + +/// Stores directory closures in an object store. +/// Notably, this makes use of the option to disallow accessing child directories except when +/// fetching them recursively via the top-level directory, since all batched writes +/// (using `put_multiple_start`) are stored in a single object. +/// Directories are stored in a length-delimited format with a 1MiB limit. The length field is a +/// u32 and the directories are stored in root-to-leaves topological order, the same way they will +/// be returned to the client in get_recursive. +#[derive(Clone)] +pub struct ObjectStoreDirectoryService { + object_store: Arc<dyn ObjectStore>, + base_path: Path, +} + +#[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,blob.digest=%digest),ret(Display))] +fn derive_dirs_path(base_path: &Path, digest: &B3Digest) -> Path { + base_path + .child("dirs") + .child("b3") + .child(HEXLOWER.encode(&digest.as_slice()[..2])) + .child(HEXLOWER.encode(digest.as_slice())) +} + +#[allow(clippy::identity_op)] +const MAX_FRAME_LENGTH: usize = 1 * 1024 * 1024 * 1000; // 1 MiB + // +impl ObjectStoreDirectoryService { + /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by + /// [object_store]. + /// Any path suffix becomes the base path of the object store. + /// additional options, the same as in [object_store::parse_url_opts] can + /// be passed. + pub fn parse_url_opts<I, K, V>(url: &Url, options: I) -> Result<Self, object_store::Error> + where + I: IntoIterator<Item = (K, V)>, + K: AsRef<str>, + V: Into<String>, + { + let (object_store, path) = object_store::parse_url_opts(url, options)?; + + Ok(Self { + object_store: Arc::new(object_store), + base_path: path, + }) + } + + /// Like [Self::parse_url_opts], except without the options. + pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> { + Self::parse_url_opts(url, Vec::<(String, String)>::new()) + } +} + +#[async_trait] +impl DirectoryService for ObjectStoreDirectoryService { + /// This is the same steps as for get_recursive anyways, so we just call get_recursive and + /// return the first element of the stream and drop the request. + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + self.get_recursive(digest).take(1).next().await.transpose() + } + + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + if !directory.directories.is_empty() { + return Err(Error::InvalidRequest( + "only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(), + )); + } + + let mut handle = self.put_multiple_start(); + handle.put(directory).await?; + handle.close().await + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> BoxStream<'static, Result<proto::Directory, Error>> { + // The Directory digests we're expecting to receive. + let mut expected_directory_digests: HashSet<B3Digest> = + HashSet::from([root_directory_digest.clone()]); + + let dir_path = derive_dirs_path(&self.base_path, root_directory_digest); + let object_store = self.object_store.clone(); + + Box::pin( + (async move { + let stream = match object_store.get(&dir_path).await { + Ok(v) => v.into_stream(), + Err(object_store::Error::NotFound { .. }) => { + return Ok(Either::Left(futures::stream::empty())) + } + Err(e) => return Err(std::io::Error::from(e).into()), + }; + + // get a reader of the response body. + let r = tokio_util::io::StreamReader::new(stream); + let decompressed_stream = async_compression::tokio::bufread::ZstdDecoder::new(r); + + // the subdirectories are stored in a length delimited format + let delimited_stream = LengthDelimitedCodec::builder() + .max_frame_length(MAX_FRAME_LENGTH) + .length_field_type::<u32>() + .new_read(decompressed_stream); + + let dirs_stream = delimited_stream.map_err(Error::from).and_then(move |buf| { + futures::future::ready((|| { + let mut hasher = blake3::Hasher::new(); + let digest: B3Digest = hasher.update(&buf).finalize().as_bytes().into(); + + // Ensure to only decode the directory objects whose digests we trust + let was_expected = expected_directory_digests.remove(&digest); + if !was_expected { + return Err(crate::Error::StorageError(format!( + "received unexpected directory {}", + digest + ))); + } + + let directory = proto::Directory::decode(&*buf).map_err(|e| { + warn!("unable to parse directory {}: {}", digest, e); + Error::StorageError(e.to_string()) + })?; + + for directory in &directory.directories { + // Allow the children to appear next + expected_directory_digests.insert( + B3Digest::try_from(directory.digest.clone()) + .map_err(|e| Error::StorageError(e.to_string()))?, + ); + } + + Ok(directory) + })()) + }); + + Ok(Either::Right(dirs_stream)) + }) + .try_flatten_stream(), + ) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> + where + Self: Clone, + { + Box::new(ObjectStoreDirectoryPutter::new( + self.object_store.clone(), + self.base_path.clone(), + )) + } +} + +struct ObjectStoreDirectoryPutter { + object_store: Arc<dyn ObjectStore>, + base_path: Path, + + directory_validator: Option<ClosureValidator>, +} + +impl ObjectStoreDirectoryPutter { + fn new(object_store: Arc<dyn ObjectStore>, base_path: Path) -> Self { + Self { + object_store, + base_path, + directory_validator: Some(Default::default()), + } + } +} + +#[async_trait] +impl DirectoryPutter for ObjectStoreDirectoryPutter { + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + match self.directory_validator { + None => return Err(Error::StorageError("already closed".to_string())), + Some(ref mut validator) => { + validator.add(directory)?; + } + } + + Ok(()) + } + + #[instrument(level = "trace", skip_all, ret, err)] + async fn close(&mut self) -> Result<B3Digest, Error> { + let validator = match self.directory_validator.take() { + None => return Err(Error::InvalidRequest("already closed".to_string())), + Some(validator) => validator, + }; + + // retrieve the validated directories. + // It is important that they are in topological order (root first), + // since that's how we want to retrieve them from the object store in the end. + let directories = validator.finalize_root_to_leaves()?; + + // Get the root digest + let root_digest = directories + .first() + .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))? + .digest(); + + let dir_path = derive_dirs_path(&self.base_path, &root_digest); + + match self.object_store.head(&dir_path).await { + // directory tree already exists, nothing to do + Ok(_) => { + trace!("directory tree already exists"); + } + + // directory tree does not yet exist, compress and upload. + Err(object_store::Error::NotFound { .. }) => { + trace!("uploading directory tree"); + + let object_store_writer = + object_store::buffered::BufWriter::new(self.object_store.clone(), dir_path); + let compressed_writer = + async_compression::tokio::write::ZstdEncoder::new(object_store_writer); + let mut directories_sink = LengthDelimitedCodec::builder() + .max_frame_length(MAX_FRAME_LENGTH) + .length_field_type::<u32>() + .new_write(compressed_writer); + + for directory in directories { + directories_sink + .send(directory.encode_to_vec().into()) + .await?; + } + + let mut compressed_writer = directories_sink.into_inner(); + compressed_writer.shutdown().await?; + } + // other error + Err(err) => Err(std::io::Error::from(err))?, + } + + Ok(root_digest) + } +} diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index 1b40d9feb0ba..cc3c5b788a2c 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -26,6 +26,7 @@ use self::utils::make_grpc_directory_service_client; #[case::grpc(make_grpc_directory_service_client().await)] #[case::memory(directoryservice::from_addr("memory://").await.unwrap())] #[case::sled(directoryservice::from_addr("sled://").await.unwrap())] +#[case::objectstore(directoryservice::from_addr("objectstore+memory://").await.unwrap())] #[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))] pub fn directory_services(#[case] directory_service: impl DirectoryService) {} |