diff options
Diffstat (limited to 'tvix/castore/src/directoryservice/bigtable.rs')
-rw-r--r-- | tvix/castore/src/directoryservice/bigtable.rs | 355 |
1 files changed, 355 insertions, 0 deletions
diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs new file mode 100644 index 000000000000..372135628050 --- /dev/null +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -0,0 +1,355 @@ +use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2}; +use bytes::Bytes; +use data_encoding::HEXLOWER; +use futures::stream::BoxStream; +use prost::Message; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DurationSeconds}; +use tonic::async_trait; +use tracing::{instrument, trace, warn}; + +use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter}; +use crate::{proto, B3Digest, Error}; + +/// There should not be more than 10 MiB in a single cell. +/// https://cloud.google.com/bigtable/docs/schema-design#cells +const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; + +/// Provides a [DirectoryService] implementation using +/// [Bigtable](https://cloud.google.com/bigtable/docs/) +/// as an underlying K/V store. +/// +/// # Data format +/// We use Bigtable as a plain K/V store. +/// The row key is the digest of the directory, in hexlower. +/// Inside the row, we currently have a single column/cell, again using the +/// hexlower directory digest. +/// Its value is the Directory message, serialized in canonical protobuf. +/// We currently only populate this column. +/// +/// In the future, we might want to introduce "bucketing", essentially storing +/// all directories inserted via `put_multiple_start` in a batched form. +/// This will prevent looking up intermediate Directories, which are not +/// directly at the root, so rely on store composition. +#[derive(Clone)] +pub struct BigtableDirectoryService { + client: bigtable::BigTable, + params: BigtableParameters, + + #[cfg(test)] + #[allow(dead_code)] + /// Holds the temporary directory containing the unix socket, and the + /// spawned emulator process. + emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>, +} + +/// Represents configuration of [BigtableDirectoryService]. +/// This currently conflates both connect parameters and data model/client +/// behaviour parameters. +#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option<DurationSeconds<String>>")] + #[serde(default = "default_timeout")] + timeout: Option<std::time::Duration>, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option<std::time::Duration> { + Some(std::time::Duration::from_secs(4)) +} + +impl BigtableDirectoryService { + #[cfg(not(test))] + pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + let connection = bigtable::BigTableConnection::new( + ¶ms.project_id, + ¶ms.instance_name, + params.is_read_only, + params.channel_size, + params.timeout, + ) + .await?; + + Ok(Self { + client: connection.client(), + params, + }) + } + + #[cfg(test)] + pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + use std::time::Duration; + + use async_process::{Command, Stdio}; + use tempfile::TempDir; + use tokio_retry::{strategy::ExponentialBackoff, Retry}; + + let tmpdir = TempDir::new().unwrap(); + + let socket_path = tmpdir.path().join("cbtemulator.sock"); + + let emulator_process = Command::new("cbtemulator") + .arg("-address") + .arg(socket_path.clone()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .expect("failed to spwan emulator"); + + Retry::spawn( + ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(1)), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // populate the emulator + for cmd in &[ + vec!["createtable", ¶ms.table_name], + vec!["createfamily", ¶ms.table_name, ¶ms.family_name], + ] { + Command::new("cbt") + .args({ + let mut args = vec![ + "-instance", + ¶ms.instance_name, + "-project", + ¶ms.project_id, + ]; + args.extend_from_slice(cmd); + args + }) + .env( + "BIGTABLE_EMULATOR_HOST", + format!("unix://{}", socket_path.to_string_lossy()), + ) + .output() + .await + .expect("failed to run cbt setup command"); + } + + let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator( + &format!("unix://{}", socket_path.to_string_lossy()), + ¶ms.project_id, + ¶ms.instance_name, + params.is_read_only, + params.timeout, + )?; + + Ok(Self { + client: connection.client(), + params, + emulator: (tmpdir, emulator_process).into(), + }) + } +} + +/// Derives the row/column key for a given blake3 digest. +/// We use hexlower encoding, also because it can't be misinterpreted as RE2. +fn derive_directory_key(digest: &B3Digest) -> String { + HEXLOWER.encode(digest.as_slice()) +} + +#[async_trait] +impl DirectoryService for BigtableDirectoryService { + #[instrument(skip(self, digest), err, fields(directory.digest = %digest))] + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + let mut client = self.client.clone(); + let directory_key = derive_directory_key(digest); + + let request = bigtable_v2::ReadRowsRequest { + app_profile_id: self.params.app_profile_id.to_string(), + table_name: client.get_full_table_name(&self.params.table_name), + rows_limit: 1, + rows: Some(bigtable_v2::RowSet { + row_keys: vec![directory_key.clone().into()], + row_ranges: vec![], + }), + // Filter selected family name, and column qualifier matching our digest. + // This is to ensure we don't fail once we start bucketing. + filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::Chain( + bigtable_v2::row_filter::Chain { + filters: vec![ + bigtable_v2::RowFilter { + filter: Some( + bigtable_v2::row_filter::Filter::FamilyNameRegexFilter( + self.params.family_name.to_string(), + ), + ), + }, + bigtable_v2::RowFilter { + filter: Some( + bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter( + directory_key.clone().into(), + ), + ), + }, + ], + }, + )), + }), + ..Default::default() + }; + + let mut response = client + .read_rows(request) + .await + .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?; + + if response.len() != 1 { + if response.len() > 1 { + // This shouldn't happen, we limit number of rows to 1 + return Err(Error::StorageError( + "got more than one row from bigtable".into(), + )); + } + // else, this is simply a "not found". + return Ok(None); + } + + let (row_key, mut row_cells) = response.pop().unwrap(); + if row_key != directory_key.as_bytes() { + // This shouldn't happen, we requested this row key. + return Err(Error::StorageError( + "got wrong row key from bigtable".into(), + )); + } + + let row_cell = row_cells + .pop() + .ok_or_else(|| Error::StorageError("found no cells".into()))?; + + // Ensure there's only one cell (so no more left after the pop()) + // This shouldn't happen, We filter out other cells in our query. + if !row_cells.is_empty() { + return Err(Error::StorageError( + "more than one cell returned from bigtable".into(), + )); + } + + // We also require the qualifier to be correct in the filter above, + // so this shouldn't happen. + if directory_key.as_bytes() != row_cell.qualifier { + return Err(Error::StorageError("unexpected cell qualifier".into())); + } + + // For the data in that cell, ensure the digest matches what's requested, before parsing. + let got_digest = B3Digest::from(blake3::hash(&row_cell.value).as_bytes()); + if got_digest != *digest { + return Err(Error::StorageError(format!( + "invalid digest: {}", + got_digest + ))); + } + + // Try to parse the value into a Directory message. + let directory = proto::Directory::decode(Bytes::from(row_cell.value)) + .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?; + + // validate the Directory. + directory + .validate() + .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?; + + Ok(Some(directory)) + } + + #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))] + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + let directory_digest = directory.digest(); + let mut client = self.client.clone(); + let directory_key = derive_directory_key(&directory_digest); + + // Ensure the directory we're trying to upload passes validation + directory + .validate() + .map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?; + + let data = directory.encode_to_vec(); + if data.len() as u64 > CELL_SIZE_LIMIT { + return Err(Error::StorageError( + "Directory exceeds cell limit on Bigtable".into(), + )); + } + + let resp = client + .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest { + table_name: client.get_full_table_name(&self.params.table_name), + app_profile_id: self.params.app_profile_id.to_string(), + row_key: directory_key.clone().into(), + predicate_filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter( + directory_key.clone().into(), + )), + }), + // If the column was already found, do nothing. + true_mutations: vec![], + // Else, do the insert. + false_mutations: vec![ + // https://cloud.google.com/bigtable/docs/writes + bigtable_v2::Mutation { + mutation: Some(bigtable_v2::mutation::Mutation::SetCell( + bigtable_v2::mutation::SetCell { + family_name: self.params.family_name.to_string(), + column_qualifier: directory_key.clone().into(), + timestamp_micros: -1, // use server time to fill timestamp + value: data, + }, + )), + }, + ], + }) + .await + .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?; + + if resp.predicate_matched { + trace!("already existed") + } + + Ok(directory_digest) + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> BoxStream<Result<proto::Directory, Error>> { + traverse_directory(self.clone(), root_directory_digest) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> + where + Self: Clone, + { + Box::new(SimplePutter::new(self.clone())) + } +} |