diff options
Diffstat (limited to 'tvix/castore/src')
-rw-r--r-- | tvix/castore/src/directoryservice/bigtable.rs | 355 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/from_addr.rs | 48 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/mod.rs | 6 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/tests/mod.rs | 1 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/tests/utils.rs | 1 |
5 files changed, 410 insertions, 1 deletions
diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs new file mode 100644 index 000000000000..372135628050 --- /dev/null +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -0,0 +1,355 @@ +use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2}; +use bytes::Bytes; +use data_encoding::HEXLOWER; +use futures::stream::BoxStream; +use prost::Message; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DurationSeconds}; +use tonic::async_trait; +use tracing::{instrument, trace, warn}; + +use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter}; +use crate::{proto, B3Digest, Error}; + +/// There should not be more than 10 MiB in a single cell. +/// https://cloud.google.com/bigtable/docs/schema-design#cells +const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; + +/// Provides a [DirectoryService] implementation using +/// [Bigtable](https://cloud.google.com/bigtable/docs/) +/// as an underlying K/V store. +/// +/// # Data format +/// We use Bigtable as a plain K/V store. +/// The row key is the digest of the directory, in hexlower. +/// Inside the row, we currently have a single column/cell, again using the +/// hexlower directory digest. +/// Its value is the Directory message, serialized in canonical protobuf. +/// We currently only populate this column. +/// +/// In the future, we might want to introduce "bucketing", essentially storing +/// all directories inserted via `put_multiple_start` in a batched form. +/// This will prevent looking up intermediate Directories, which are not +/// directly at the root, so rely on store composition. +#[derive(Clone)] +pub struct BigtableDirectoryService { + client: bigtable::BigTable, + params: BigtableParameters, + + #[cfg(test)] + #[allow(dead_code)] + /// Holds the temporary directory containing the unix socket, and the + /// spawned emulator process. + emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>, +} + +/// Represents configuration of [BigtableDirectoryService]. +/// This currently conflates both connect parameters and data model/client +/// behaviour parameters. +#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option<DurationSeconds<String>>")] + #[serde(default = "default_timeout")] + timeout: Option<std::time::Duration>, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option<std::time::Duration> { + Some(std::time::Duration::from_secs(4)) +} + +impl BigtableDirectoryService { + #[cfg(not(test))] + pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + let connection = bigtable::BigTableConnection::new( + ¶ms.project_id, + ¶ms.instance_name, + params.is_read_only, + params.channel_size, + params.timeout, + ) + .await?; + + Ok(Self { + client: connection.client(), + params, + }) + } + + #[cfg(test)] + pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + use std::time::Duration; + + use async_process::{Command, Stdio}; + use tempfile::TempDir; + use tokio_retry::{strategy::ExponentialBackoff, Retry}; + + let tmpdir = TempDir::new().unwrap(); + + let socket_path = tmpdir.path().join("cbtemulator.sock"); + + let emulator_process = Command::new("cbtemulator") + .arg("-address") + .arg(socket_path.clone()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .expect("failed to spwan emulator"); + + Retry::spawn( + ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(1)), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // populate the emulator + for cmd in &[ + vec!["createtable", ¶ms.table_name], + vec!["createfamily", ¶ms.table_name, ¶ms.family_name], + ] { + Command::new("cbt") + .args({ + let mut args = vec![ + "-instance", + ¶ms.instance_name, + "-project", + ¶ms.project_id, + ]; + args.extend_from_slice(cmd); + args + }) + .env( + "BIGTABLE_EMULATOR_HOST", + format!("unix://{}", socket_path.to_string_lossy()), + ) + .output() + .await + .expect("failed to run cbt setup command"); + } + + let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator( + &format!("unix://{}", socket_path.to_string_lossy()), + ¶ms.project_id, + ¶ms.instance_name, + params.is_read_only, + params.timeout, + )?; + + Ok(Self { + client: connection.client(), + params, + emulator: (tmpdir, emulator_process).into(), + }) + } +} + +/// Derives the row/column key for a given blake3 digest. +/// We use hexlower encoding, also because it can't be misinterpreted as RE2. +fn derive_directory_key(digest: &B3Digest) -> String { + HEXLOWER.encode(digest.as_slice()) +} + +#[async_trait] +impl DirectoryService for BigtableDirectoryService { + #[instrument(skip(self, digest), err, fields(directory.digest = %digest))] + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + let mut client = self.client.clone(); + let directory_key = derive_directory_key(digest); + + let request = bigtable_v2::ReadRowsRequest { + app_profile_id: self.params.app_profile_id.to_string(), + table_name: client.get_full_table_name(&self.params.table_name), + rows_limit: 1, + rows: Some(bigtable_v2::RowSet { + row_keys: vec![directory_key.clone().into()], + row_ranges: vec![], + }), + // Filter selected family name, and column qualifier matching our digest. + // This is to ensure we don't fail once we start bucketing. + filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::Chain( + bigtable_v2::row_filter::Chain { + filters: vec![ + bigtable_v2::RowFilter { + filter: Some( + bigtable_v2::row_filter::Filter::FamilyNameRegexFilter( + self.params.family_name.to_string(), + ), + ), + }, + bigtable_v2::RowFilter { + filter: Some( + bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter( + directory_key.clone().into(), + ), + ), + }, + ], + }, + )), + }), + ..Default::default() + }; + + let mut response = client + .read_rows(request) + .await + .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?; + + if response.len() != 1 { + if response.len() > 1 { + // This shouldn't happen, we limit number of rows to 1 + return Err(Error::StorageError( + "got more than one row from bigtable".into(), + )); + } + // else, this is simply a "not found". + return Ok(None); + } + + let (row_key, mut row_cells) = response.pop().unwrap(); + if row_key != directory_key.as_bytes() { + // This shouldn't happen, we requested this row key. + return Err(Error::StorageError( + "got wrong row key from bigtable".into(), + )); + } + + let row_cell = row_cells + .pop() + .ok_or_else(|| Error::StorageError("found no cells".into()))?; + + // Ensure there's only one cell (so no more left after the pop()) + // This shouldn't happen, We filter out other cells in our query. + if !row_cells.is_empty() { + return Err(Error::StorageError( + "more than one cell returned from bigtable".into(), + )); + } + + // We also require the qualifier to be correct in the filter above, + // so this shouldn't happen. + if directory_key.as_bytes() != row_cell.qualifier { + return Err(Error::StorageError("unexpected cell qualifier".into())); + } + + // For the data in that cell, ensure the digest matches what's requested, before parsing. + let got_digest = B3Digest::from(blake3::hash(&row_cell.value).as_bytes()); + if got_digest != *digest { + return Err(Error::StorageError(format!( + "invalid digest: {}", + got_digest + ))); + } + + // Try to parse the value into a Directory message. + let directory = proto::Directory::decode(Bytes::from(row_cell.value)) + .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?; + + // validate the Directory. + directory + .validate() + .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?; + + Ok(Some(directory)) + } + + #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))] + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + let directory_digest = directory.digest(); + let mut client = self.client.clone(); + let directory_key = derive_directory_key(&directory_digest); + + // Ensure the directory we're trying to upload passes validation + directory + .validate() + .map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?; + + let data = directory.encode_to_vec(); + if data.len() as u64 > CELL_SIZE_LIMIT { + return Err(Error::StorageError( + "Directory exceeds cell limit on Bigtable".into(), + )); + } + + let resp = client + .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest { + table_name: client.get_full_table_name(&self.params.table_name), + app_profile_id: self.params.app_profile_id.to_string(), + row_key: directory_key.clone().into(), + predicate_filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter( + directory_key.clone().into(), + )), + }), + // If the column was already found, do nothing. + true_mutations: vec![], + // Else, do the insert. + false_mutations: vec![ + // https://cloud.google.com/bigtable/docs/writes + bigtable_v2::Mutation { + mutation: Some(bigtable_v2::mutation::Mutation::SetCell( + bigtable_v2::mutation::SetCell { + family_name: self.params.family_name.to_string(), + column_qualifier: directory_key.clone().into(), + timestamp_micros: -1, // use server time to fill timestamp + value: data, + }, + )), + }, + ], + }) + .await + .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?; + + if resp.predicate_matched { + trace!("already existed") + } + + Ok(directory_digest) + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> BoxStream<Result<proto::Directory, Error>> { + traverse_directory(self.clone(), root_directory_digest) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> + where + Self: Clone, + { + Box::new(SimplePutter::new(self.clone())) + } +} diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs index eb01cf348a7f..31158d3a38cc 100644 --- a/tvix/castore/src/directoryservice/from_addr.rs +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -19,7 +19,8 @@ use super::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService, Sled /// - `grpc+http://host:port`, `grpc+https://host:port` /// Connects to a (remote) tvix-store gRPC service. pub async fn from_addr(uri: &str) -> Result<Box<dyn DirectoryService>, crate::Error> { - let url = Url::parse(uri) + #[allow(unused_mut)] + let mut url = Url::parse(uri) .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; let directory_service: Box<dyn DirectoryService> = match url.scheme() { @@ -62,6 +63,30 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn DirectoryService>, crate::Er let client = DirectoryServiceClient::new(crate::tonic::channel_from_url(&url).await?); Box::new(GRPCDirectoryService::from_client(client)) } + #[cfg(feature = "cloud")] + "bigtable" => { + use super::bigtable::BigtableParameters; + use super::BigtableDirectoryService; + + // parse the instance name from the hostname. + let instance_name = url + .host_str() + .ok_or_else(|| Error::StorageError("instance name missing".into()))? + .to_string(); + + // … but add it to the query string now, so we just need to parse that. + url.query_pairs_mut() + .append_pair("instance_name", &instance_name); + + let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) + .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; + + Box::new( + BigtableDirectoryService::connect(params) + .await + .map_err(|e| Error::StorageError(e.to_string()))?, + ) + } _ => { return Err(crate::Error::StorageError(format!( "unknown scheme: {}", @@ -117,6 +142,27 @@ mod tests { #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)] /// Correct scheme to connect to localhost over http, but with additional path, which is invalid. #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)] + /// A valid example for Bigtable + #[cfg_attr( + feature = "cloud", + case::bigtable_valid_url( + "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1", + true + ) + )] + /// A valid example for Bigtable, specifying a custom channel size and timeout + #[cfg_attr( + feature = "cloud", + case::bigtable_valid_url( + "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1&channel_size=10&timeout=10", + true + ) + )] + /// A invalid Bigtable example (missing fields) + #[cfg_attr( + feature = "cloud", + case::bigtable_invalid_url("bigtable://instance-1", false) + )] #[tokio::test] async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) { if exp_succeed { diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index e9ac73173988..cf6bea39d809 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -22,6 +22,12 @@ pub use self::sled::SledDirectoryService; pub use self::traverse::descend_to; pub use self::utils::traverse_directory; +#[cfg(feature = "cloud")] +mod bigtable; + +#[cfg(feature = "cloud")] +pub use self::bigtable::BigtableDirectoryService; + /// The base trait all Directory services need to implement. /// This is a simple get and put of [crate::proto::Directory], returning their /// digest. diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index 23650069b1db..50c8a5c6d3d5 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -26,6 +26,7 @@ use self::utils::make_grpc_directory_service_client; #[case::grpc(make_grpc_directory_service_client().await)] #[case::memory(directoryservice::from_addr("memory://").await.unwrap())] #[case::sled(directoryservice::from_addr("sled://").await.unwrap())] +#[cfg_attr(feature = "cloud", case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))] pub fn directory_services(#[case] directory_service: impl DirectoryService) {} /// Ensures asking for a directory that doesn't exist returns a Ok(None). diff --git a/tvix/castore/src/directoryservice/tests/utils.rs b/tvix/castore/src/directoryservice/tests/utils.rs index 72a3ff754d19..0f706695eec8 100644 --- a/tvix/castore/src/directoryservice/tests/utils.rs +++ b/tvix/castore/src/directoryservice/tests/utils.rs @@ -5,6 +5,7 @@ use crate::{ directoryservice::MemoryDirectoryService, proto::directory_service_server::DirectoryServiceServer, }; + use tonic::transport::{Endpoint, Server, Uri}; /// Constructs and returns a gRPC DirectoryService. |