diff options
Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r-- | tvix/castore/src/blobservice/chunked_reader.rs | 496 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/combinator.rs | 128 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/from_addr.rs | 88 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 388 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/memory.rs | 155 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/mod.rs | 112 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/object_store.rs | 617 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/tests/mod.rs | 253 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/tests/utils.rs | 42 |
9 files changed, 2279 insertions, 0 deletions
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs new file mode 100644 index 000000000000..6e8355874bca --- /dev/null +++ b/tvix/castore/src/blobservice/chunked_reader.rs @@ -0,0 +1,496 @@ +use futures::{ready, TryStreamExt}; +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncSeekExt}; +use tokio_stream::StreamExt; +use tokio_util::io::{ReaderStream, StreamReader}; +use tracing::{instrument, trace, warn}; + +use crate::B3Digest; +use std::{cmp::Ordering, pin::Pin}; + +use super::{BlobReader, BlobService}; + +pin_project! { + /// ChunkedReader provides a chunk-aware [BlobReader], so allows reading and + /// seeking into a blob. + /// It internally holds a [ChunkedBlob], which is storing chunk information + /// able to emit a reader seeked to a specific position whenever we need to seek. + pub struct ChunkedReader<BS> { + chunked_blob: ChunkedBlob<BS>, + + #[pin] + r: Box<dyn AsyncRead + Unpin + Send>, + + pos: u64, + } +} + +impl<BS> ChunkedReader<BS> +where + BS: AsRef<dyn BlobService> + Clone + 'static + Send, +{ + /// Construct a new [ChunkedReader], by retrieving a list of chunks (their + /// blake3 digests and chunk sizes) + pub fn from_chunks(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self { + let chunked_blob = ChunkedBlob::from_iter(chunks_it, blob_service); + let r = chunked_blob.reader_skipped_offset(0); + + Self { + chunked_blob, + r, + pos: 0, + } + } +} + +/// ChunkedReader implements BlobReader. +impl<BS> BlobReader for ChunkedReader<BS> where BS: Send + Clone + 'static + AsRef<dyn BlobService> {} + +impl<BS> tokio::io::AsyncRead for ChunkedReader<BS> +where + BS: AsRef<dyn BlobService> + Clone + 'static, +{ + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + // The amount of data read can be determined by the increase + // in the length of the slice returned by `ReadBuf::filled`. + let filled_before = buf.filled().len(); + + let this = self.project(); + + ready!(this.r.poll_read(cx, buf))?; + let bytes_read = buf.filled().len() - filled_before; + *this.pos += bytes_read as u64; + + Ok(()).into() + } +} + +impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS> +where + BS: AsRef<dyn BlobService> + Clone + Send + 'static, +{ + #[instrument(skip(self), err(Debug))] + fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> { + let total_len = self.chunked_blob.blob_length(); + let mut this = self.project(); + + let absolute_offset: u64 = match position { + std::io::SeekFrom::Start(from_start) => from_start, + std::io::SeekFrom::End(from_end) => { + // note from_end is i64, not u64, so this is usually negative. + total_len.checked_add_signed(from_end).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "over/underflow while seeking", + ) + })? + } + std::io::SeekFrom::Current(from_current) => { + // note from_end is i64, not u64, so this can be positive or negative. + (*this.pos) + .checked_add_signed(from_current) + .ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "over/underflow while seeking", + ) + })? + } + }; + + // check if the position actually did change. + if absolute_offset != *this.pos { + // ensure the new position still is inside the file. + if absolute_offset > total_len { + Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "seeked beyond EOF", + ))? + } + + // Update the position and the internal reader. + *this.pos = absolute_offset; + + // FUTUREWORK: if we can seek forward, avoid re-assembling. + // At least if it's still in the same chunk? + *this.r = this.chunked_blob.reader_skipped_offset(absolute_offset); + } + + Ok(()) + } + + fn poll_complete( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<std::io::Result<u64>> { + std::task::Poll::Ready(Ok(self.pos)) + } +} + +/// Holds a list of blake3 digest for individual chunks (and their sizes). +/// Is able to construct a Reader that seeked to a certain offset, which +/// is useful to construct a BlobReader (that implements AsyncSeek). +/// - the current chunk index, and a Custor<Vec<u8>> holding the data of that chunk. +struct ChunkedBlob<BS> { + blob_service: BS, + chunks: Vec<(u64, u64, B3Digest)>, +} + +impl<BS> ChunkedBlob<BS> +where + BS: AsRef<dyn BlobService> + Clone + 'static + Send, +{ + /// Constructs [Self] from a list of blake3 digests of chunks and their + /// sizes, and a reference to a blob service. + /// Initializing it with an empty list is disallowed. + fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self { + let mut chunks = Vec::new(); + let mut offset: u64 = 0; + + for (chunk_digest, chunk_size) in chunks_it { + chunks.push((offset, chunk_size, chunk_digest)); + offset += chunk_size; + } + + assert!( + !chunks.is_empty(), + "Chunks must be provided, don't use this for blobs without chunks" + ); + + Self { + blob_service, + chunks, + } + } + + /// Returns the length of the blob. + fn blob_length(&self) -> u64 { + self.chunks + .last() + .map(|(chunk_offset, chunk_size, _)| chunk_offset + chunk_size) + .unwrap_or(0) + } + + /// For a given position pos, return the chunk containing the data. + /// In case this would range outside the blob, None is returned. + #[instrument(level = "trace", skip(self), ret)] + fn get_chunk_idx_for_position(&self, pos: u64) -> Option<usize> { + // FUTUREWORK: benchmark when to use linear search, binary_search and BTreeSet + self.chunks + .binary_search_by(|(chunk_start_pos, chunk_size, _)| { + if chunk_start_pos + chunk_size <= pos { + Ordering::Less + } else if *chunk_start_pos > pos { + Ordering::Greater + } else { + Ordering::Equal + } + }) + .ok() + } + + /// Returns a stream of bytes of the data in that blob. + /// It internally assembles a stream reading from each chunk (skipping over + /// chunks containing irrelevant data). + /// From the first relevant chunk, the irrelevant bytes are skipped too. + /// The returned boxed thing does not implement AsyncSeek on its own, but + /// ChunkedReader does. + #[instrument(level = "trace", skip(self))] + fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> { + if offset == self.blob_length() { + return Box::new(std::io::Cursor::new(vec![])); + } + // construct a stream of all chunks starting with the given offset + let start_chunk_idx = self + .get_chunk_idx_for_position(offset) + .expect("outside of blob"); + // It's ok to panic here, we can only reach this by seeking, and seeking should already reject out-of-file seeking. + + let skip_first_chunk_bytes = (offset - self.chunks[start_chunk_idx].0) as usize; + + let blob_service = self.blob_service.clone(); + let chunks: Vec<_> = self.chunks[start_chunk_idx..].to_vec(); + let readers_stream = tokio_stream::iter(chunks.into_iter().enumerate()).map( + move |(nth_chunk, (_chunk_start_offset, chunk_size, chunk_digest))| { + let chunk_digest = chunk_digest.to_owned(); + let blob_service = blob_service.clone(); + async move { + trace!(chunk_size=%chunk_size, chunk_digest=%chunk_digest, "open_read on chunk in stream"); + let mut blob_reader = blob_service + .as_ref() + .open_read(&chunk_digest.to_owned()) + .await? + .ok_or_else(|| { + warn!(chunk.digest = %chunk_digest, "chunk not found"); + std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found") + })?; + + // iff this is the first chunk in the stream, skip by skip_first_chunk_bytes + if nth_chunk == 0 && skip_first_chunk_bytes > 0 { + blob_reader + .seek(std::io::SeekFrom::Start(skip_first_chunk_bytes as u64)) + .await?; + } + Ok::<_, std::io::Error>(blob_reader) + } + }, + ); + + // convert the stream of readers to a stream of streams of byte chunks + let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) }); + + // flatten into one stream of byte chunks + let bytes_stream = bytes_streams.try_flatten(); + + // convert into AsyncRead + Box::new(StreamReader::new(Box::pin(bytes_stream))) + } +} + +#[cfg(test)] +mod test { + use std::{io::SeekFrom, sync::Arc}; + + use crate::{ + blobservice::{chunked_reader::ChunkedReader, BlobService, MemoryBlobService}, + B3Digest, + }; + use hex_literal::hex; + use lazy_static::lazy_static; + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + + const CHUNK_1: [u8; 2] = hex!("0001"); + const CHUNK_2: [u8; 4] = hex!("02030405"); + const CHUNK_3: [u8; 1] = hex!("06"); + const CHUNK_4: [u8; 2] = hex!("0708"); + const CHUNK_5: [u8; 7] = hex!("090a0b0c0d0e0f"); + + lazy_static! { + // `[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]` + pub static ref CHUNK_1_DIGEST: B3Digest = blake3::hash(&CHUNK_1).as_bytes().into(); + pub static ref CHUNK_2_DIGEST: B3Digest = blake3::hash(&CHUNK_2).as_bytes().into(); + pub static ref CHUNK_3_DIGEST: B3Digest = blake3::hash(&CHUNK_3).as_bytes().into(); + pub static ref CHUNK_4_DIGEST: B3Digest = blake3::hash(&CHUNK_4).as_bytes().into(); + pub static ref CHUNK_5_DIGEST: B3Digest = blake3::hash(&CHUNK_5).as_bytes().into(); + pub static ref BLOB_1_LIST: [(B3Digest, u64); 5] = [ + (CHUNK_1_DIGEST.clone(), 2), + (CHUNK_2_DIGEST.clone(), 4), + (CHUNK_3_DIGEST.clone(), 1), + (CHUNK_4_DIGEST.clone(), 2), + (CHUNK_5_DIGEST.clone(), 7), + ]; + } + + use super::ChunkedBlob; + + /// ensure the start offsets are properly calculated. + #[test] + fn from_iter() { + let cb = ChunkedBlob::from_iter( + BLOB_1_LIST.clone().into_iter(), + Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>, + ); + + assert_eq!( + cb.chunks, + Vec::from_iter([ + (0, 2, CHUNK_1_DIGEST.clone()), + (2, 4, CHUNK_2_DIGEST.clone()), + (6, 1, CHUNK_3_DIGEST.clone()), + (7, 2, CHUNK_4_DIGEST.clone()), + (9, 7, CHUNK_5_DIGEST.clone()), + ]) + ); + } + + /// ensure ChunkedBlob can't be used with an empty list of chunks + #[test] + #[should_panic] + fn from_iter_empty() { + ChunkedBlob::from_iter( + [].into_iter(), + Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>, + ); + } + + /// ensure the right chunk is selected + #[test] + fn chunk_idx_for_position() { + let cb = ChunkedBlob::from_iter( + BLOB_1_LIST.clone().into_iter(), + Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>, + ); + + assert_eq!(Some(0), cb.get_chunk_idx_for_position(0), "start of blob"); + + assert_eq!( + Some(0), + cb.get_chunk_idx_for_position(1), + "middle of first chunk" + ); + assert_eq!( + Some(1), + cb.get_chunk_idx_for_position(2), + "beginning of second chunk" + ); + + assert_eq!( + Some(4), + cb.get_chunk_idx_for_position(15), + "right before the end of the blob" + ); + assert_eq!( + None, + cb.get_chunk_idx_for_position(16), + "right outside the blob" + ); + assert_eq!( + None, + cb.get_chunk_idx_for_position(100), + "way outside the blob" + ); + } + + /// returns a blobservice with all chunks in BLOB_1 present. + async fn gen_blobservice_blob1() -> Arc<dyn BlobService> { + let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>; + + // seed blob service with all chunks + for blob_contents in [ + CHUNK_1.to_vec(), + CHUNK_2.to_vec(), + CHUNK_3.to_vec(), + CHUNK_4.to_vec(), + CHUNK_5.to_vec(), + ] { + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut std::io::Cursor::new(blob_contents), &mut bw) + .await + .expect("writing blob"); + bw.close().await.expect("close blobwriter"); + } + + blob_service + } + + #[tokio::test] + async fn test_read() { + let blob_service = gen_blobservice_blob1().await; + let mut chunked_reader = + ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service); + + // read all data + let mut buf = Vec::new(); + tokio::io::copy(&mut chunked_reader, &mut buf) + .await + .expect("copy"); + + assert_eq!( + hex!("000102030405060708090a0b0c0d0e0f").to_vec(), + buf, + "read data must match" + ); + } + + #[tokio::test] + async fn test_seek() { + let blob_service = gen_blobservice_blob1().await; + let mut chunked_reader = + ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service); + + // seek to the end + // expect to read 0 bytes + { + chunked_reader + .seek(SeekFrom::End(0)) + .await + .expect("seek to end"); + + let mut buf = Vec::new(); + chunked_reader + .read_to_end(&mut buf) + .await + .expect("read to end"); + + assert_eq!(hex!("").to_vec(), buf); + } + + // seek one bytes before the end + { + chunked_reader.seek(SeekFrom::End(-1)).await.expect("seek"); + + let mut buf = Vec::new(); + chunked_reader + .read_to_end(&mut buf) + .await + .expect("read to end"); + + assert_eq!(hex!("0f").to_vec(), buf); + } + + // seek back three bytes, but using relative positioning + // read two bytes + { + chunked_reader + .seek(SeekFrom::Current(-3)) + .await + .expect("seek"); + + let mut buf = [0b0; 2]; + chunked_reader + .read_exact(&mut buf) + .await + .expect("read exact"); + + assert_eq!(hex!("0d0e"), buf); + } + } + + // seeds a blob service with only the first two chunks, reads a bit in the + // front (which succeeds), but then tries to seek past and read more (which + // should fail). + #[tokio::test] + async fn test_read_missing_chunks() { + let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>; + + for blob_contents in [CHUNK_1.to_vec(), CHUNK_2.to_vec()] { + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut std::io::Cursor::new(blob_contents), &mut bw) + .await + .expect("writing blob"); + + bw.close().await.expect("close blobwriter"); + } + + let mut chunked_reader = + ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service); + + // read a bit from the front (5 bytes out of 6 available) + let mut buf = [0b0; 5]; + chunked_reader + .read_exact(&mut buf) + .await + .expect("read exact"); + + assert_eq!(hex!("0001020304"), buf); + + // seek 2 bytes forward, into an area where we don't have chunks + chunked_reader + .seek(SeekFrom::Current(2)) + .await + .expect("seek"); + + let mut buf = Vec::new(); + chunked_reader + .read_to_end(&mut buf) + .await + .expect_err("must fail"); + + // FUTUREWORK: check semantics on errorkinds. Should this be InvalidData + // or NotFound? + } +} diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs new file mode 100644 index 000000000000..6a964c8a8440 --- /dev/null +++ b/tvix/castore/src/blobservice/combinator.rs @@ -0,0 +1,128 @@ +use std::sync::Arc; + +use tonic::async_trait; +use tracing::instrument; + +use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::{B3Digest, Error}; + +use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; + +/// Combinator for a BlobService, using a "local" and "remote" blobservice. +/// Requests are tried in (and returned from) the local store first, only if +/// things are not present there, the remote BlobService is queried. +/// In case the local blobservice doesn't have the blob, we ask the remote +/// blobservice for chunks, and try to read each of these chunks from the local +/// blobservice again, before falling back to the remote one. +/// The remote BlobService is never written to. +pub struct CombinedBlobService<BL, BR> { + local: BL, + remote: BR, +} + +impl<BL, BR> Clone for CombinedBlobService<BL, BR> +where + BL: Clone, + BR: Clone, +{ + fn clone(&self) -> Self { + Self { + local: self.local.clone(), + remote: self.remote.clone(), + } + } +} + +#[async_trait] +impl<BL, BR> BlobService for CombinedBlobService<BL, BR> +where + BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static, + BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static, +{ + #[instrument(skip(self, digest), fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> { + Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?) + } + + #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> { + if self.local.as_ref().has(digest).await? { + // local store has the blob, so we can assume it also has all chunks. + self.local.as_ref().open_read(digest).await + } else { + // Local store doesn't have the blob. + // Ask the remote one for the list of chunks, + // and create a chunked reader that uses self.open_read() for + // individual chunks. There's a chance we already have some chunks + // locally, meaning we don't need to fetch them all from the remote + // BlobService. + match self.remote.as_ref().chunks(digest).await? { + // blob doesn't exist on the remote side either, nothing we can do. + None => Ok(None), + Some(remote_chunks) => { + // if there's no more granular chunks, or the remote + // blobservice doesn't support chunks, read the blob from + // the remote blobservice directly. + if remote_chunks.is_empty() { + return self.remote.as_ref().open_read(digest).await; + } + // otherwise, a chunked reader, which will always try the + // local backend first. + + let chunked_reader = ChunkedReader::from_chunks( + remote_chunks.into_iter().map(|chunk| { + ( + chunk.digest.try_into().expect("invalid b3 digest"), + chunk.size, + ) + }), + Arc::new(self.clone()) as Arc<dyn BlobService>, + ); + Ok(Some(Box::new(chunked_reader))) + } + } + } + } + + #[instrument(skip_all)] + async fn open_write(&self) -> Box<dyn BlobWriter> { + // direct writes to the local one. + self.local.as_ref().open_write().await + } +} + +#[derive(serde::Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct CombinedBlobServiceConfig { + local: String, + remote: String, +} + +impl TryFrom<url::Url> for CombinedBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(_url: url::Url) -> Result<Self, Self::Error> { + Err(Error::StorageError( + "Instantiating a CombinedBlobService from a url is not supported".into(), + ) + .into()) + } +} + +#[async_trait] +impl ServiceBuilder for CombinedBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + context: &CompositionContext, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> { + let (local, remote) = futures::join!( + context.resolve(self.local.clone()), + context.resolve(self.remote.clone()) + ); + Ok(Arc::new(CombinedBlobService { + local: local?, + remote: remote?, + })) + } +} diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs new file mode 100644 index 000000000000..c5cabaa9d945 --- /dev/null +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -0,0 +1,88 @@ +use std::sync::Arc; + +use url::Url; + +use crate::composition::{ + with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG, +}; + +use super::BlobService; + +/// Constructs a new instance of a [BlobService] from an URI. +/// +/// The following schemes are supported by the following services: +/// - `memory://` ([MemoryBlobService]) +/// - `grpc+*://` ([GRPCBlobService]) +/// - `objectstore+*://` ([ObjectStoreBlobService]) +/// +/// See their `from_url` methods for more details about their syntax. +pub async fn from_addr( + uri: &str, +) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> { + let url = Url::parse(uri) + .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; + + let blob_service_config = with_registry(®, || { + <DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>>>::try_from(url) + })? + .0; + let blob_service = blob_service_config + .build("anonymous", &CompositionContext::blank()) + .await?; + + Ok(blob_service) +} + +#[cfg(test)] +mod tests { + use super::from_addr; + use rstest::rstest; + + #[rstest] + /// This uses an unsupported scheme. + #[case::unsupported_scheme("http://foo.example/test", false)] + /// This correctly sets the scheme, and doesn't set a path. + #[case::memory_valid("memory://", true)] + /// This sets a memory url host to `foo` + #[case::memory_invalid_host("memory://foo", false)] + /// This sets a memory url path to "/", which is invalid. + #[case::memory_invalid_root_path("memory:///", false)] + /// This sets a memory url path to "/foo", which is invalid. + #[case::memory_invalid_root_path_foo("memory:///foo", false)] + /// Correct scheme to connect to a unix socket. + #[case::grpc_valid_unix_socket("grpc+unix:///path/to/somewhere", true)] + /// Correct scheme for unix socket, but setting a host too, which is invalid. + #[case::grpc_invalid_unix_socket_and_host("grpc+unix://host.example/path/to/somewhere", false)] + /// Correct scheme to connect to localhost, with port 12345 + #[case::grpc_valid_ipv6_localhost_port_12345("grpc+http://[::1]:12345", true)] + /// Correct scheme to connect to localhost over http, without specifying a port. + #[case::grpc_valid_http_host_without_port("grpc+http://localhost", true)] + /// Correct scheme to connect to localhost over http, without specifying a port. + #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)] + /// Correct scheme to connect to localhost over http, but with additional path, which is invalid. + #[case::grpc_invalid_has_path("grpc+http://localhost/some-path", false)] + /// An example for object store (InMemory) + #[case::objectstore_valid_memory("objectstore+memory:///", true)] + /// An example for object store (LocalFileSystem) + #[case::objectstore_valid_file("objectstore+file:///foo/bar", true)] + // An example for object store (HTTP / WebDAV) + #[case::objectstore_valid_http_url("objectstore+https://localhost:8080/some-path", true)] + /// An example for object store (S3) + #[cfg_attr( + feature = "cloud", + case::objectstore_valid_s3_url("objectstore+s3://bucket/path", true) + )] + /// An example for object store (GCS) + #[cfg_attr( + feature = "cloud", + case::objectstore_valid_gcs_url("objectstore+gs://bucket/path", true) + )] + #[tokio::test] + async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) { + if exp_succeed { + from_addr(uri_str).await.expect("should succeed"); + } else { + assert!(from_addr(uri_str).await.is_err(), "should fail"); + } + } +} diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs new file mode 100644 index 000000000000..0db3dfea4ad8 --- /dev/null +++ b/tvix/castore/src/blobservice/grpc.rs @@ -0,0 +1,388 @@ +use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; +use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::{ + proto::{self, stat_blob_response::ChunkMeta}, + B3Digest, +}; +use futures::sink::SinkExt; +use std::{ + io::{self, Cursor}, + pin::pin, + sync::Arc, + task::Poll, +}; +use tokio::io::AsyncWriteExt; +use tokio::task::JoinHandle; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_util::{ + io::{CopyToBytes, SinkWriter}, + sync::PollSender, +}; +use tonic::{async_trait, Code, Status}; +use tracing::{instrument, Instrument as _}; + +/// Connects to a (remote) tvix-store BlobService over gRPC. +#[derive(Clone)] +pub struct GRPCBlobService<T> { + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::blob_service_client::BlobServiceClient<T>, +} + +impl<T> GRPCBlobService<T> { + /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. + pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self { + Self { grpc_client } + } +} + +#[async_trait] +impl<T> BlobService for GRPCBlobService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ + #[instrument(skip(self, digest), fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> io::Result<bool> { + match self + .grpc_client + .clone() + .stat(proto::StatBlobRequest { + digest: digest.clone().into(), + ..Default::default() + }) + .await + { + Ok(_blob_meta) => Ok(true), + Err(e) if e.code() == Code::NotFound => Ok(false), + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + } + } + + #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { + // First try to get a list of chunks. In case there's only one chunk returned, + // buffer its data into a Vec, otherwise use a ChunkedReader. + // We previously used NaiveSeeker here, but userland likes to seek backwards too often, + // and without store composition this will get very noisy. + // FUTUREWORK: use CombinedBlobService and store composition. + match self.chunks(digest).await { + Ok(None) => Ok(None), + Ok(Some(chunks)) => { + if chunks.is_empty() || chunks.len() == 1 { + // No more granular chunking info, treat this as an individual chunk. + // Get a stream of [proto::BlobChunk], or return an error if the blob + // doesn't exist. + return match self + .grpc_client + .clone() + .read(proto::ReadBlobRequest { + digest: digest.clone().into(), + }) + .await + { + Ok(stream) => { + let data_stream = stream.into_inner().map(|e| { + e.map(|c| c.data) + .map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s)) + }); + + // Use StreamReader::new to convert to an AsyncRead. + let mut data_reader = tokio_util::io::StreamReader::new(data_stream); + + let mut buf = Vec::new(); + // TODO: only do this up to a certain limit. + tokio::io::copy(&mut data_reader, &mut buf).await?; + + Ok(Some(Box::new(Cursor::new(buf)))) + } + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + }; + } + + // The chunked case. Let ChunkedReader do individual reads. + // TODO: we should store the chunking data in some local cache, + // so `ChunkedReader` doesn't call `self.chunks` *again* for every chunk. + // Think about how store composition will fix this. + let chunked_reader = ChunkedReader::from_chunks( + chunks.into_iter().map(|chunk| { + ( + chunk.digest.try_into().expect("invalid b3 digest"), + chunk.size, + ) + }), + Arc::new(self.clone()) as Arc<dyn BlobService>, + ); + Ok(Some(Box::new(chunked_reader))) + } + Err(e) => Err(e)?, + } + } + + /// Returns a BlobWriter, that'll internally wrap each write in a + /// [proto::BlobChunk], which is send to the gRPC server. + #[instrument(skip_all)] + async fn open_write(&self) -> Box<dyn BlobWriter> { + // set up an mpsc channel passing around Bytes. + let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); + + // bytes arriving on the RX side are wrapped inside a + // [proto::BlobChunk], and a [ReceiverStream] is constructed. + let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x }); + + // spawn the gRPC put request, which will read from blobchunk_stream. + let task = tokio::spawn({ + let mut grpc_client = self.grpc_client.clone(); + async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) } + // instrument the task with the current span, this is not done by default + .in_current_span() + }); + + // The tx part of the channel is converted to a sink of byte chunks. + let sink = PollSender::new(tx) + .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)); + + // … which is turned into an [tokio::io::AsyncWrite]. + let writer = SinkWriter::new(CopyToBytes::new(sink)); + + Box::new(GRPCBlobWriter { + task_and_writer: Some((task, writer)), + digest: None, + }) + } + + #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { + let resp = self + .grpc_client + .clone() + .stat(proto::StatBlobRequest { + digest: digest.clone().into(), + send_chunks: true, + ..Default::default() + }) + .await; + + match resp { + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + Ok(resp) => { + let resp = resp.into_inner(); + + resp.validate() + .map_err(|e| std::io::Error::new(io::ErrorKind::InvalidData, e))?; + + Ok(Some(resp.chunks)) + } + } + } +} + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCBlobServiceConfig { + url: String, +} + +impl TryFrom<url::Url> for GRPCBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + Ok(GRPCBlobServiceConfig { + url: url.to_string(), + }) + } +} + +#[async_trait] +impl ServiceBuilder for GRPCBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let client = proto::blob_service_client::BlobServiceClient::new( + crate::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCBlobService::from_client(client))) + } +} + +pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> { + /// The task containing the put request, and the inner writer, if we're still writing. + task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, +} + +#[async_trait] +impl<W: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static> BlobWriter for GRPCBlobWriter<W> { + async fn close(&mut self) -> io::Result<B3Digest> { + if self.task_and_writer.is_none() { + // if we're already closed, return the b3 digest, which must exist. + // If it doesn't, we already closed and failed once, and didn't handle the error. + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "already closed")), + } + } else { + let (task, mut writer) = self.task_and_writer.take().unwrap(); + + // invoke shutdown, so the inner writer closes its internal tx side of + // the channel. + writer.shutdown().await?; + + // block on the RPC call to return. + // This ensures all chunks are sent out, and have been received by the + // backend. + + match task.await? { + Ok(resp) => { + // return the digest from the response, and store it in self.digest for subsequent closes. + let digest_len = resp.digest.len(); + let digest: B3Digest = resp.digest.try_into().map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + format!("invalid root digest length {} in response", digest_len), + ) + })?; + self.digest = Some(digest.clone()); + Ok(digest) + } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), + } + } + } +} + +impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for GRPCBlobWriter<W> { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll<Result<usize, io::Error>> { + match &mut self.task_and_writer { + None => Poll::Ready(Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + ))), + Some((_, ref mut writer)) => { + let pinned_writer = pin!(writer); + pinned_writer.poll_write(cx, buf) + } + } + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + match &mut self.task_and_writer { + None => Poll::Ready(Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + ))), + Some((_, ref mut writer)) => { + let pinned_writer = pin!(writer); + pinned_writer.poll_flush(cx) + } + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + // TODO(raitobezarius): this might not be a graceful shutdown of the + // channel inside the gRPC connection. + Poll::Ready(Ok(())) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio_retry::strategy::ExponentialBackoff; + use tokio_retry::Retry; + use tokio_stream::wrappers::UnixListenerStream; + + use crate::blobservice::MemoryBlobService; + use crate::fixtures; + use crate::proto::blob_service_client::BlobServiceClient; + use crate::proto::GRPCBlobServiceWrapper; + + use super::BlobService; + use super::GRPCBlobService; + + /// This ensures connecting via gRPC works as expected. + #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("daemon"); + + let path_clone = socket_path.clone(); + + // Spin up a server + tokio::spawn(async { + let uds = UnixListener::bind(path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new server + let mut server = tonic::transport::Server::builder(); + let router = + server.add_service(crate::proto::blob_service_server::BlobServiceServer::new( + GRPCBlobServiceWrapper::new( + Box::<MemoryBlobService>::default() as Box<dyn BlobService> + ), + )); + router.serve_with_incoming(uds_stream).await + }); + + // wait for the socket to be created + Retry::spawn( + ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // prepare a client + let grpc_client = { + let url = url::Url::parse(&format!( + "grpc+unix://{}?wait-connect=1", + socket_path.display() + )) + .expect("must parse"); + let client = BlobServiceClient::new( + crate::tonic::channel_from_url(&url) + .await + .expect("must succeed"), + ); + GRPCBlobService::from_client(client) + }; + + let has = grpc_client + .has(&fixtures::BLOB_A_DIGEST) + .await + .expect("must not be err"); + + assert!(!has); + } +} diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs new file mode 100644 index 000000000000..3d733f950470 --- /dev/null +++ b/tvix/castore/src/blobservice/memory.rs @@ -0,0 +1,155 @@ +use parking_lot::RwLock; +use std::io::{self, Cursor, Write}; +use std::task::Poll; +use std::{collections::HashMap, sync::Arc}; +use tonic::async_trait; +use tracing::instrument; + +use super::{BlobReader, BlobService, BlobWriter}; +use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::{B3Digest, Error}; + +#[derive(Clone, Default)] +pub struct MemoryBlobService { + db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, +} + +#[async_trait] +impl BlobService for MemoryBlobService { + #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> io::Result<bool> { + let db = self.db.read(); + Ok(db.contains_key(digest)) + } + + #[instrument(skip_all, err, fields(blob.digest=%digest))] + async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { + let db = self.db.read(); + + match db.get(digest).map(|x| Cursor::new(x.clone())) { + Some(result) => Ok(Some(Box::new(result))), + None => Ok(None), + } + } + + #[instrument(skip_all)] + async fn open_write(&self) -> Box<dyn BlobWriter> { + Box::new(MemoryBlobWriter::new(self.db.clone())) + } +} + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct MemoryBlobServiceConfig {} + +impl TryFrom<url::Url> for MemoryBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // memory doesn't support host or path in the URL. + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string()).into()); + } + Ok(MemoryBlobServiceConfig {}) + } +} + +#[async_trait] +impl ServiceBuilder for MemoryBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + Ok(Arc::new(MemoryBlobService::default())) + } +} + +pub struct MemoryBlobWriter { + db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, + + /// Contains the buffer Vec and hasher, or None if already closed + writers: Option<(Vec<u8>, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, +} + +impl MemoryBlobWriter { + fn new(db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>) -> Self { + Self { + db, + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, + } + } +} +impl tokio::io::AsyncWrite for MemoryBlobWriter { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + b: &[u8], + ) -> std::task::Poll<Result<usize, io::Error>> { + Poll::Ready(match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&b[..bytes_written]) + } + }) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + Poll::Ready(match self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + }) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + // shutdown is "instantaneous", we only write to memory. + Poll::Ready(Ok(())) + } +} + +#[async_trait] +impl BlobWriter for MemoryBlobWriter { + async fn close(&mut self) -> io::Result<B3Digest> { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "already closed")), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + let digest: B3Digest = hasher.finalize().as_bytes().into(); + + // Only insert if the blob doesn't already exist. + let mut db = self.db.upgradable_read(); + if !db.contains_key(&digest) { + // open the database for writing. + db.with_upgraded(|db| { + // and put buf in there. This will move buf out. + db.insert(digest.clone(), buf); + }); + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } + } +} diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs new file mode 100644 index 000000000000..85292722fa7e --- /dev/null +++ b/tvix/castore/src/blobservice/mod.rs @@ -0,0 +1,112 @@ +use std::io; + +use tonic::async_trait; + +use crate::composition::{Registry, ServiceBuilder}; +use crate::proto::stat_blob_response::ChunkMeta; +use crate::B3Digest; + +mod chunked_reader; +mod combinator; +mod from_addr; +mod grpc; +mod memory; +mod object_store; + +#[cfg(test)] +pub mod tests; + +pub use self::chunked_reader::ChunkedReader; +pub use self::combinator::{CombinedBlobService, CombinedBlobServiceConfig}; +pub use self::from_addr::from_addr; +pub use self::grpc::{GRPCBlobService, GRPCBlobServiceConfig}; +pub use self::memory::{MemoryBlobService, MemoryBlobServiceConfig}; +pub use self::object_store::{ObjectStoreBlobService, ObjectStoreBlobServiceConfig}; + +/// The base trait all BlobService services need to implement. +/// It provides functions to check whether a given blob exists, +/// a way to read (and seek) a blob, and a method to create a blobwriter handle, +/// which will implement a writer interface, and also provides a close funtion, +/// to finalize a blob and get its digest. +#[async_trait] +pub trait BlobService: Send + Sync { + /// Check if the service has the blob, by its content hash. + /// On implementations returning chunks, this must also work for chunks. + async fn has(&self, digest: &B3Digest) -> io::Result<bool>; + + /// Request a blob from the store, by its content hash. + /// On implementations returning chunks, this must also work for chunks. + async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>>; + + /// Insert a new blob into the store. Returns a [BlobWriter], which + /// implements [tokio::io::AsyncWrite] and a [BlobWriter::close] to finalize + /// the blob and get its digest. + async fn open_write(&self) -> Box<dyn BlobWriter>; + + /// Return a list of chunks for a given blob. + /// There's a distinction between returning Ok(None) and Ok(Some(vec![])). + /// The former return value is sent in case the blob is not present at all, + /// while the second one is sent in case there's no more granular chunks (or + /// the backend does not support chunking). + /// A default implementation checking for existence and then returning it + /// does not have more granular chunks available is provided. + async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { + if !self.has(digest).await? { + return Ok(None); + } + // default implementation, signalling the backend does not have more + // granular chunks available. + Ok(Some(vec![])) + } +} + +#[async_trait] +impl<A> BlobService for A +where + A: AsRef<dyn BlobService> + Send + Sync, +{ + async fn has(&self, digest: &B3Digest) -> io::Result<bool> { + self.as_ref().has(digest).await + } + + async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { + self.as_ref().open_read(digest).await + } + + async fn open_write(&self) -> Box<dyn BlobWriter> { + self.as_ref().open_write().await + } + + async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { + self.as_ref().chunks(digest).await + } +} + +/// A [tokio::io::AsyncWrite] that the user needs to close() afterwards for persist. +/// On success, it returns the digest of the written blob. +#[async_trait] +pub trait BlobWriter: tokio::io::AsyncWrite + Send + Unpin { + /// Signal there's no more data to be written, and return the digest of the + /// contents written. + /// + /// Closing a already-closed BlobWriter is a no-op. + async fn close(&mut self) -> io::Result<B3Digest>; +} + +/// BlobReader is a [tokio::io::AsyncRead] that also allows seeking. +pub trait BlobReader: tokio::io::AsyncRead + tokio::io::AsyncSeek + Send + Unpin + 'static {} + +/// A [`io::Cursor<Vec<u8>>`] can be used as a BlobReader. +impl BlobReader for io::Cursor<&'static [u8]> {} +impl BlobReader for io::Cursor<&'static [u8; 0]> {} +impl BlobReader for io::Cursor<Vec<u8>> {} +impl BlobReader for io::Cursor<bytes::Bytes> {} +impl BlobReader for tokio::fs::File {} + +/// Registers the builtin BlobService implementations with the registry +pub(crate) fn register_blob_services(reg: &mut Registry) { + reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::ObjectStoreBlobServiceConfig>("objectstore"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::MemoryBlobServiceConfig>("memory"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::CombinedBlobServiceConfig>("combined"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::GRPCBlobServiceConfig>("grpc"); +} diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs new file mode 100644 index 000000000000..5bb05cf26123 --- /dev/null +++ b/tvix/castore/src/blobservice/object_store.rs @@ -0,0 +1,617 @@ +use std::{ + collections::HashMap, + io::{self, Cursor}, + pin::pin, + sync::Arc, + task::Poll, +}; + +use data_encoding::HEXLOWER; +use fastcdc::v2020::AsyncStreamCDC; +use futures::Future; +use object_store::{path::Path, ObjectStore}; +use pin_project_lite::pin_project; +use prost::Message; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio_stream::StreamExt; +use tonic::async_trait; +use tracing::{debug, instrument, trace, Level}; +use url::Url; + +use crate::{ + composition::{CompositionContext, ServiceBuilder}, + proto::{stat_blob_response::ChunkMeta, StatBlobResponse}, + B3Digest, B3HashingReader, Error, +}; + +use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; + +/// Uses any object storage supported by the [object_store] crate to provide a +/// tvix-castore [BlobService]. +/// +/// # Data format +/// Data is organized in "blobs" and "chunks". +/// Blobs don't hold the actual data, but instead contain a list of more +/// granular chunks that assemble to the contents requested. +/// This allows clients to seek, and not download chunks they already have +/// locally, as it's referred to from other files. +/// Check `rpc_blobstore` and more general BlobStore docs on that. +/// +/// ## Blobs +/// Stored at `${base_path}/blobs/b3/$digest_key`. They contains the serialized +/// StatBlobResponse for the blob with the digest. +/// +/// ## Chunks +/// Chunks are stored at `${base_path}/chunks/b3/$digest_key`. They contain +/// the literal contents of the chunk, but are zstd-compressed. +/// +/// ## Digest key sharding +/// The blake3 digest encoded in lower hex, and sharded after the second +/// character. +/// The blob for "Hello World" is stored at +/// `${base_path}/blobs/b3/41/41f8394111eb713a22165c46c90ab8f0fd9399c92028fd6d288944b23ff5bf76`. +/// +/// This reduces the number of files in the same directory, which would be a +/// problem at least when using [object_store::local::LocalFileSystem]. +/// +/// # Future changes +/// There's no guarantees about this being a final format yet. +/// Once object_store gets support for additional metadata / content-types, +/// we can eliminate some requests (small blobs only consisting of a single +/// chunk can be stored as-is, without the blob index file). +/// It also allows signalling any compression of chunks in the content-type. +/// Migration *should* be possible by simply adding the right content-types to +/// all keys stored so far, but no promises ;-) +#[derive(Clone)] +pub struct ObjectStoreBlobService { + object_store: Arc<dyn ObjectStore>, + base_path: Path, + + /// Average chunk size for FastCDC, in bytes. + /// min value is half, max value double of that number. + avg_chunk_size: u32, +} + +#[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,blob.digest=%digest),ret(Display))] +fn derive_blob_path(base_path: &Path, digest: &B3Digest) -> Path { + base_path + .child("blobs") + .child("b3") + .child(HEXLOWER.encode(&digest.as_slice()[..2])) + .child(HEXLOWER.encode(digest.as_slice())) +} + +#[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,chunk.digest=%digest),ret(Display))] +fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path { + base_path + .child("chunks") + .child("b3") + .child(HEXLOWER.encode(&digest.as_slice()[..2])) + .child(HEXLOWER.encode(digest.as_slice())) +} + +#[async_trait] +impl BlobService for ObjectStoreBlobService { + #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> io::Result<bool> { + // TODO: clarify if this should work for chunks or not, and explicitly + // document in the proto docs. + let p = derive_blob_path(&self.base_path, digest); + + match self.object_store.head(&p).await { + Ok(_) => Ok(true), + Err(object_store::Error::NotFound { .. }) => { + let p = derive_chunk_path(&self.base_path, digest); + match self.object_store.head(&p).await { + Ok(_) => Ok(true), + Err(object_store::Error::NotFound { .. }) => Ok(false), + Err(e) => Err(e)?, + } + } + Err(e) => Err(e)?, + } + } + + #[instrument(skip_all, err, fields(blob.digest=%digest))] + async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { + // handle reading the empty blob. + if digest.as_slice() == blake3::hash(b"").as_bytes() { + return Ok(Some(Box::new(Cursor::new(b"")) as Box<dyn BlobReader>)); + } + match self + .object_store + .get(&derive_chunk_path(&self.base_path, digest)) + .await + { + Ok(res) => { + // handle reading blobs that are small enough to fit inside a single chunk: + // fetch the entire chunk into memory, decompress, ensure the b3 digest matches, + // and return a io::Cursor over that data. + // FUTUREWORK: use zstd::bulk to prevent decompression bombs + + let chunk_raw_bytes = res.bytes().await?; + let chunk_contents = zstd::stream::decode_all(Cursor::new(chunk_raw_bytes))?; + + if *digest != blake3::hash(&chunk_contents).as_bytes().into() { + Err(io::Error::other("chunk contents invalid"))?; + } + + Ok(Some(Box::new(Cursor::new(chunk_contents)))) + } + Err(object_store::Error::NotFound { .. }) => { + // NOTE: For public-facing things, we would want to stop here. + // Clients should fetch granularly, so they can make use of + // chunks they have locally. + // However, if this is used directly, without any caches, do the + // assembly here. + // This is subject to change, once we have store composition. + // TODO: make this configurable, and/or clarify behaviour for + // the gRPC server surface (explicitly document behaviour in the + // proto docs) + if let Some(chunks) = self.chunks(digest).await? { + let chunked_reader = ChunkedReader::from_chunks( + chunks.into_iter().map(|chunk| { + ( + chunk.digest.try_into().expect("invalid b3 digest"), + chunk.size, + ) + }), + Arc::new(self.clone()) as Arc<dyn BlobService>, + ); + + Ok(Some(Box::new(chunked_reader))) + } else { + // This is neither a chunk nor a blob, return None. + Ok(None) + } + } + Err(e) => Err(e.into()), + } + } + + #[instrument(skip_all)] + async fn open_write(&self) -> Box<dyn BlobWriter> { + // ObjectStoreBlobWriter implements AsyncWrite, but all the chunking + // needs an AsyncRead, so we create a pipe here. + // In its `AsyncWrite` implementation, `ObjectStoreBlobWriter` delegates + // writes to w. It periodically polls the future that's reading from the + // other side. + let (w, r) = tokio::io::duplex(self.avg_chunk_size as usize * 10); + + Box::new(ObjectStoreBlobWriter { + writer: Some(w), + fut: Some(Box::pin(chunk_and_upload( + r, + self.object_store.clone(), + self.base_path.clone(), + self.avg_chunk_size / 2, + self.avg_chunk_size, + self.avg_chunk_size * 2, + ))), + fut_output: None, + }) + } + + #[instrument(skip_all, err, fields(blob.digest=%digest))] + async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { + match self + .object_store + .get(&derive_blob_path(&self.base_path, digest)) + .await + { + Ok(get_result) => { + // fetch the data at the blob path + let blob_data = get_result.bytes().await?; + // parse into StatBlobResponse + let stat_blob_response: StatBlobResponse = StatBlobResponse::decode(blob_data)?; + + debug!( + chunk.count = stat_blob_response.chunks.len(), + blob.size = stat_blob_response + .chunks + .iter() + .map(|x| x.size) + .sum::<u64>(), + "found more granular chunks" + ); + + Ok(Some(stat_blob_response.chunks)) + } + Err(object_store::Error::NotFound { .. }) => { + // If there's only a chunk, we must return the empty vec here, rather than None. + match self + .object_store + .head(&derive_chunk_path(&self.base_path, digest)) + .await + { + Ok(_) => { + // present, but no more chunks available + debug!("found a single chunk"); + Ok(Some(vec![])) + } + Err(object_store::Error::NotFound { .. }) => { + // Neither blob nor single chunk found + debug!("not found"); + Ok(None) + } + // error checking for chunk + Err(e) => Err(e.into()), + } + } + // error checking for blob + Err(err) => Err(err.into()), + } + } +} + +fn default_avg_chunk_size() -> u32 { + 256 * 1024 +} + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ObjectStoreBlobServiceConfig { + object_store_url: String, + #[serde(default = "default_avg_chunk_size")] + avg_chunk_size: u32, + object_store_options: HashMap<String, String>, +} + +impl TryFrom<url::Url> for ObjectStoreBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by + /// [object_store]. + /// Any path suffix becomes the base path of the object store. + /// additional options, the same as in [object_store::parse_url_opts] can + /// be passed. + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // We need to convert the URL to string, strip the prefix there, and then + // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. + let trimmed_url = { + let s = url.to_string(); + let mut url = Url::parse( + s.strip_prefix("objectstore+") + .ok_or(Error::StorageError("Missing objectstore uri".into()))?, + )?; + // trim the query pairs, they might contain credentials or local settings we don't want to send as-is. + url.set_query(None); + url + }; + Ok(ObjectStoreBlobServiceConfig { + object_store_url: trimmed_url.into(), + object_store_options: url + .query_pairs() + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + avg_chunk_size: 256 * 1024, + }) + } +} + +#[async_trait] +impl ServiceBuilder for ObjectStoreBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (object_store, path) = object_store::parse_url_opts( + &self.object_store_url.parse()?, + &self.object_store_options, + )?; + Ok(Arc::new(ObjectStoreBlobService { + object_store: Arc::new(object_store), + base_path: path, + avg_chunk_size: self.avg_chunk_size, + })) + } +} + +/// Reads blob contents from a AsyncRead, chunks and uploads them. +/// On success, returns a [StatBlobResponse] pointing to the individual chunks. +#[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)] +async fn chunk_and_upload<R: AsyncRead + Unpin>( + r: R, + object_store: Arc<dyn ObjectStore>, + base_path: Path, + min_chunk_size: u32, + avg_chunk_size: u32, + max_chunk_size: u32, +) -> io::Result<B3Digest> { + // wrap reader with something calculating the blake3 hash of all data read. + let mut b3_r = B3HashingReader::from(r); + // set up a fastcdc chunker + let mut chunker = + AsyncStreamCDC::new(&mut b3_r, min_chunk_size, avg_chunk_size, max_chunk_size); + + /// This really should just belong into the closure at + /// `chunker.as_stream().then(|_| { … })``, but if we try to, rustc spits + /// higher-ranked lifetime errors at us. + async fn fastcdc_chunk_uploader( + resp: Result<fastcdc::v2020::ChunkData, fastcdc::v2020::Error>, + base_path: Path, + object_store: Arc<dyn ObjectStore>, + ) -> std::io::Result<ChunkMeta> { + let chunk_data = resp?; + let chunk_digest: B3Digest = blake3::hash(&chunk_data.data).as_bytes().into(); + let chunk_path = derive_chunk_path(&base_path, &chunk_digest); + + upload_chunk(object_store, chunk_digest, chunk_path, chunk_data.data).await + } + + // Use the fastcdc chunker to produce a stream of chunks, and upload these + // that don't exist to the backend. + let chunks = chunker + .as_stream() + .then(|resp| fastcdc_chunk_uploader(resp, base_path.clone(), object_store.clone())) + .collect::<io::Result<Vec<ChunkMeta>>>() + .await?; + + let chunks = if chunks.len() < 2 { + // The chunker returned only one chunk, which is the entire blob. + // According to the protocol, we must return an empty list of chunks + // when the blob is not split up further. + vec![] + } else { + chunks + }; + + let stat_blob_response = StatBlobResponse { + chunks, + bao: "".into(), // still todo + }; + + // check for Blob, if it doesn't exist, persist. + let blob_digest: B3Digest = b3_r.digest().into(); + let blob_path = derive_blob_path(&base_path, &blob_digest); + + match object_store.head(&blob_path).await { + // blob already exists, nothing to do + Ok(_) => { + trace!( + blob.digest = %blob_digest, + blob.path = %blob_path, + "blob already exists on backend" + ); + } + // chunk does not yet exist, upload first + Err(object_store::Error::NotFound { .. }) => { + debug!( + blob.digest = %blob_digest, + blob.path = %blob_path, + "uploading blob" + ); + object_store + .put(&blob_path, stat_blob_response.encode_to_vec().into()) + .await?; + } + Err(err) => { + // other error + Err(err)? + } + } + + Ok(blob_digest) +} + +/// upload chunk if it doesn't exist yet. +#[instrument(skip_all, fields(chunk.digest = %chunk_digest, chunk.size = chunk_data.len(), chunk.path = %chunk_path), err)] +async fn upload_chunk( + object_store: Arc<dyn ObjectStore>, + chunk_digest: B3Digest, + chunk_path: Path, + chunk_data: Vec<u8>, +) -> std::io::Result<ChunkMeta> { + let chunk_size = chunk_data.len(); + match object_store.head(&chunk_path).await { + // chunk already exists, nothing to do + Ok(_) => { + debug!("chunk already exists"); + } + + // chunk does not yet exist, compress and upload. + Err(object_store::Error::NotFound { .. }) => { + let chunk_data_compressed = + zstd::encode_all(Cursor::new(chunk_data), zstd::DEFAULT_COMPRESSION_LEVEL)?; + + debug!(chunk.compressed_size=%chunk_data_compressed.len(), "uploading chunk"); + + object_store + .as_ref() + .put(&chunk_path, chunk_data_compressed.into()) + .await?; + } + // other error + Err(err) => Err(err)?, + } + + Ok(ChunkMeta { + digest: chunk_digest.into(), + size: chunk_size as u64, + }) +} + +pin_project! { + /// Takes care of blob uploads. + /// All writes are relayed to self.writer, and we continuously poll the + /// future (which will internally read from the other side of the pipe and + /// upload chunks). + /// Our BlobWriter::close() needs to drop self.writer, so the other side + /// will read EOF and can finalize the blob. + /// The future should then resolve and return the blob digest. + pub struct ObjectStoreBlobWriter<W, Fut> + where + W: AsyncWrite, + Fut: Future, + { + #[pin] + writer: Option<W>, + + #[pin] + fut: Option<Fut>, + + fut_output: Option<io::Result<B3Digest>> + } +} + +impl<W, Fut> tokio::io::AsyncWrite for ObjectStoreBlobWriter<W, Fut> +where + W: AsyncWrite + Send + Unpin, + Fut: Future, +{ + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll<Result<usize, io::Error>> { + let this = self.project(); + // poll the future. + let fut = this.fut.as_pin_mut().expect("not future"); + let fut_p = fut.poll(cx); + // if it's ready, the only way this could have happened is that the + // upload failed, because we're only closing `self.writer` after all + // writes happened. + if fut_p.is_ready() { + return Poll::Ready(Err(io::Error::other("upload failed"))); + } + + // write to the underlying writer + this.writer + .as_pin_mut() + .expect("writer must be some") + .poll_write(cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + let this = self.project(); + // poll the future. + let fut = this.fut.as_pin_mut().expect("not future"); + let fut_p = fut.poll(cx); + // if it's ready, the only way this could have happened is that the + // upload failed, because we're only closing `self.writer` after all + // writes happened. + if fut_p.is_ready() { + return Poll::Ready(Err(io::Error::other("upload failed"))); + } + + // Call poll_flush on the writer + this.writer + .as_pin_mut() + .expect("writer must be some") + .poll_flush(cx) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + // There's nothing to do on shutdown. We might have written some chunks + // that are nowhere else referenced, but cleaning them up here would be racy. + std::task::Poll::Ready(Ok(())) + } +} + +#[async_trait] +impl<W, Fut> BlobWriter for ObjectStoreBlobWriter<W, Fut> +where + W: AsyncWrite + Send + Unpin, + Fut: Future<Output = io::Result<B3Digest>> + Send + Unpin, +{ + async fn close(&mut self) -> io::Result<B3Digest> { + match self.writer.take() { + Some(mut writer) => { + // shut down the writer, so the other side will read EOF. + writer.shutdown().await?; + + // take out the future. + let fut = self.fut.take().expect("fut must be some"); + // await it. + let resp = pin!(fut).await; + + match resp.as_ref() { + // In the case of an Ok value, we store it in self.fut_output, + // so future calls to close can return that. + Ok(b3_digest) => { + self.fut_output = Some(Ok(b3_digest.clone())); + } + Err(e) => { + // for the error type, we need to cheat a bit, as + // they're not clone-able. + // Simply store a sloppy clone, with the same ErrorKind and message there. + self.fut_output = Some(Err(std::io::Error::new(e.kind(), e.to_string()))) + } + } + resp + } + None => { + // called a second time, return self.fut_output. + match self.fut_output.as_ref().unwrap() { + Ok(ref b3_digest) => Ok(b3_digest.clone()), + Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())), + } + } + } + } +} + +#[cfg(test)] +mod test { + use super::{chunk_and_upload, default_avg_chunk_size}; + use crate::{ + blobservice::{BlobService, ObjectStoreBlobService}, + fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST}, + }; + use std::{io::Cursor, sync::Arc}; + use url::Url; + + /// Tests chunk_and_upload directly, bypassing the BlobWriter at open_write(). + #[rstest::rstest] + #[case::a(&BLOB_A, &BLOB_A_DIGEST)] + #[case::b(&BLOB_B, &BLOB_B_DIGEST)] + #[tokio::test] + async fn test_chunk_and_upload( + #[case] blob: &bytes::Bytes, + #[case] blob_digest: &crate::B3Digest, + ) { + let (object_store, base_path) = + object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap(); + let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store); + let blobsvc = Arc::new(ObjectStoreBlobService { + object_store: object_store.clone(), + avg_chunk_size: default_avg_chunk_size(), + base_path, + }); + + let inserted_blob_digest = chunk_and_upload( + &mut Cursor::new(blob.to_vec()), + object_store, + object_store::path::Path::from("/"), + 1024 / 2, + 1024, + 1024 * 2, + ) + .await + .expect("chunk_and_upload succeeds"); + + assert_eq!(blob_digest.clone(), inserted_blob_digest); + + // Now we should have the blob + assert!(blobsvc.has(blob_digest).await.unwrap()); + + // Check if it was chunked correctly + let chunks = blobsvc.chunks(blob_digest).await.unwrap().unwrap(); + if blob.len() < 1024 / 2 { + // The blob is smaller than the min chunk size, it should have been inserted as a whole + assert!(chunks.is_empty()); + } else if blob.len() > 1024 * 2 { + // The blob is larger than the max chunk size, make sure it was split up into at least + // two chunks + assert!(chunks.len() >= 2); + } + } +} diff --git a/tvix/castore/src/blobservice/tests/mod.rs b/tvix/castore/src/blobservice/tests/mod.rs new file mode 100644 index 000000000000..0280faebb171 --- /dev/null +++ b/tvix/castore/src/blobservice/tests/mod.rs @@ -0,0 +1,253 @@ +//! This contains test scenarios that a given [BlobService] needs to pass. +//! We use [rstest] and [rstest_reuse] to provide all services we want to test +//! against, and then apply this template to all test functions. + +use rstest::*; +use rstest_reuse::{self, *}; +use std::io; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; + +use super::BlobService; +use crate::blobservice; +use crate::fixtures::BLOB_A; +use crate::fixtures::BLOB_A_DIGEST; +use crate::fixtures::BLOB_B; +use crate::fixtures::BLOB_B_DIGEST; + +mod utils; +use self::utils::make_grpc_blob_service_client; + +/// This produces a template, which will be applied to all individual test functions. +/// See https://github.com/la10736/rstest/issues/130#issuecomment-968864832 +#[template] +#[rstest] +#[case::grpc(make_grpc_blob_service_client().await)] +#[case::memory(blobservice::from_addr("memory://").await.unwrap())] +#[case::objectstore_memory(blobservice::from_addr("objectstore+memory://").await.unwrap())] +pub fn blob_services(#[case] blob_service: impl BlobService) {} + +/// Using [BlobService::has] on a non-existing blob should return false. +#[apply(blob_services)] +#[tokio::test] +async fn has_nonexistent_false(blob_service: impl BlobService) { + assert!(!blob_service + .has(&BLOB_A_DIGEST) + .await + .expect("must not fail")); +} + +/// Using [BlobService::chunks] on a non-existing blob should return Ok(None) +#[apply(blob_services)] +#[tokio::test] +async fn chunks_nonexistent_false(blob_service: impl BlobService) { + assert!(blob_service + .chunks(&BLOB_A_DIGEST) + .await + .expect("must be ok") + .is_none()); +} + +// TODO: do tests with `chunks` + +/// Trying to read a non-existing blob should return a None instead of a reader. +#[apply(blob_services)] +#[tokio::test] +async fn not_found_read(blob_service: impl BlobService) { + assert!(blob_service + .open_read(&BLOB_A_DIGEST) + .await + .expect("must not fail") + .is_none()) +} + +/// Put a blob in the store, check has, get it back. +#[apply(blob_services)] +// #[case::small(&fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST)] +// #[case::big(&fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST)] +#[tokio::test] +async fn put_has_get(blob_service: impl BlobService) { + // TODO: figure out how to instantiate this with BLOB_A and BLOB_B, as two separate cases + for (blob_contents, blob_digest) in &[ + (&*BLOB_A, BLOB_A_DIGEST.clone()), + (&*BLOB_B, BLOB_B_DIGEST.clone()), + ] { + let mut w = blob_service.open_write().await; + + let l = tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut w) + .await + .expect("copy must succeed"); + assert_eq!( + blob_contents.len(), + l as usize, + "written bytes must match blob length" + ); + + let digest = w.close().await.expect("close must succeed"); + + assert_eq!(*blob_digest, digest, "returned digest must be correct"); + + assert!( + blob_service.has(blob_digest).await.expect("must not fail"), + "blob service should now have the blob" + ); + + let mut r = blob_service + .open_read(blob_digest) + .await + .expect("open_read must succeed") + .expect("must be some"); + + let mut buf: Vec<u8> = Vec::new(); + let mut pinned_reader = std::pin::pin!(r); + let l = tokio::io::copy(&mut pinned_reader, &mut buf) + .await + .expect("copy must succeed"); + + assert_eq!( + blob_contents.len(), + l as usize, + "read bytes must match blob length" + ); + + assert_eq!(&blob_contents[..], &buf, "read blob contents must match"); + } +} + +/// Put a blob in the store, and seek inside it a bit. +#[apply(blob_services)] +#[tokio::test] +async fn put_seek(blob_service: impl BlobService) { + let mut w = blob_service.open_write().await; + + tokio::io::copy(&mut io::Cursor::new(&BLOB_B.to_vec()), &mut w) + .await + .expect("copy must succeed"); + w.close().await.expect("close must succeed"); + + // open a blob for reading + let mut r = blob_service + .open_read(&BLOB_B_DIGEST) + .await + .expect("open_read must succeed") + .expect("must be some"); + + let mut pos: u64 = 0; + + // read the first 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected first 10 bytes to match" + ); + + pos += buf.len() as u64; + } + // seek by 0 bytes, using SeekFrom::Start. + let p = r + .seek(io::SeekFrom::Start(pos)) + .await + .expect("must not fail"); + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + pos += buf.len() as u64; + } + + // seek by 5 bytes, using SeekFrom::Start. + let p = r + .seek(io::SeekFrom::Start(pos + 5)) + .await + .expect("must not fail"); + pos += 5; + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + pos += buf.len() as u64; + } + + // seek by 12345 bytes, using SeekFrom:: + let p = r + .seek(io::SeekFrom::Current(12345)) + .await + .expect("must not fail"); + pos += 12345; + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + #[allow(unused_assignments)] + { + pos += buf.len() as u64; + } + } + + // seeking to the end is okay… + let p = r + .seek(io::SeekFrom::Start(BLOB_B.len() as u64)) + .await + .expect("must not fail"); + pos = BLOB_B.len() as u64; + assert_eq!(pos, p); + + { + // but it returns no more data. + let mut buf: Vec<u8> = Vec::new(); + r.read_to_end(&mut buf).await.expect("must not fail"); + assert!(buf.is_empty(), "expected no more data to be read"); + } + + // seeking past the end… + // should either be ok, but then return 0 bytes. + // this matches the behaviour or a Cursor<Vec<u8>>. + if let Ok(_pos) = r.seek(io::SeekFrom::Start(BLOB_B.len() as u64 + 1)).await { + let mut buf: Vec<u8> = Vec::new(); + r.read_to_end(&mut buf).await.expect("must not fail"); + assert!(buf.is_empty(), "expected no more data to be read"); + } + // or not be okay. + + // TODO: this is only broken for the gRPC version + // We expect seeking backwards or relative to the end to fail. + // r.seek(io::SeekFrom::Current(-1)) + // .expect_err("SeekFrom::Current(-1) expected to fail"); + + // r.seek(io::SeekFrom::Start(pos - 1)) + // .expect_err("SeekFrom::Start(pos-1) expected to fail"); + + // r.seek(io::SeekFrom::End(0)) + // .expect_err("SeekFrom::End(_) expected to fail"); +} diff --git a/tvix/castore/src/blobservice/tests/utils.rs b/tvix/castore/src/blobservice/tests/utils.rs new file mode 100644 index 000000000000..7df4f00d3a09 --- /dev/null +++ b/tvix/castore/src/blobservice/tests/utils.rs @@ -0,0 +1,42 @@ +use crate::blobservice::{BlobService, MemoryBlobService}; +use crate::proto::blob_service_client::BlobServiceClient; +use crate::proto::GRPCBlobServiceWrapper; +use crate::{blobservice::GRPCBlobService, proto::blob_service_server::BlobServiceServer}; +use hyper_util::rt::TokioIo; +use tonic::transport::{Endpoint, Server, Uri}; + +/// Constructs and returns a gRPC BlobService. +/// The server part is a [MemoryBlobService], exposed via the +/// [GRPCBlobServiceWrapper], and connected through a DuplexStream +pub async fn make_grpc_blob_service_client() -> Box<dyn BlobService> { + let (left, right) = tokio::io::duplex(64); + + // spin up a server, which will only connect once, to the left side. + tokio::spawn(async { + let blob_service = Box::<MemoryBlobService>::default() as Box<dyn BlobService>; + + // spin up a new DirectoryService + let mut server = Server::builder(); + let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( + blob_service, + ))); + + router + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) + .await + }); + + // Create a client, connecting to the right side. The URI is unused. + let mut maybe_right = Some(right); + + Box::new(GRPCBlobService::from_client(BlobServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(TokioIo::new(right)) } + })) + .await + .unwrap(), + ))) +} |