diff options
Diffstat (limited to 'tvix/castore/src/blobservice/combinator.rs')
-rw-r--r-- | tvix/castore/src/blobservice/combinator.rs | 63 |
1 files changed, 10 insertions, 53 deletions
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs index fc33d16a3473..6a964c8a8440 100644 --- a/tvix/castore/src/blobservice/combinator.rs +++ b/tvix/castore/src/blobservice/combinator.rs @@ -1,14 +1,12 @@ use std::sync::Arc; -use futures::{StreamExt, TryStreamExt}; -use tokio_util::io::{ReaderStream, StreamReader}; use tonic::async_trait; -use tracing::{instrument, warn}; +use tracing::instrument; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{B3Digest, Error}; -use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; +use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; /// Combinator for a BlobService, using a "local" and "remote" blobservice. /// Requests are tried in (and returned from) the local store first, only if @@ -71,19 +69,16 @@ where // otherwise, a chunked reader, which will always try the // local backend first. - // map Vec<ChunkMeta> to Vec<(B3Digest, u64)> - let chunks: Vec<(B3Digest, u64)> = remote_chunks - .into_iter() - .map(|chunk_meta| { + let chunked_reader = ChunkedReader::from_chunks( + remote_chunks.into_iter().map(|chunk| { ( - B3Digest::try_from(chunk_meta.digest) - .expect("invalid chunk digest"), - chunk_meta.size, + chunk.digest.try_into().expect("invalid b3 digest"), + chunk.size, ) - }) - .collect(); - - Ok(Some(make_chunked_reader(self.clone(), chunks))) + }), + Arc::new(self.clone()) as Arc<dyn BlobService>, + ); + Ok(Some(Box::new(chunked_reader))) } } } @@ -131,41 +126,3 @@ impl ServiceBuilder for CombinedBlobServiceConfig { })) } } - -fn make_chunked_reader<BS>( - // This must consume, as we can't retain references to blob_service, - // as it'd add a lifetime to BlobReader in general, which will get - // problematic in TvixStoreFs, which is using async move closures and cloning. - blob_service: BS, - // A list of b3 digests for individual chunks, and their sizes. - chunks: Vec<(B3Digest, u64)>, -) -> Box<dyn BlobReader> -where - BS: BlobService + Clone + 'static, -{ - // TODO: offset, verified streaming - - // construct readers for each chunk - let blob_service = blob_service.clone(); - let readers_stream = tokio_stream::iter(chunks).map(move |(digest, _)| { - let d = digest.to_owned(); - let blob_service = blob_service.clone(); - async move { - blob_service.open_read(&d.to_owned()).await?.ok_or_else(|| { - warn!(chunk.digest = %digest, "chunk not found"); - std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found") - }) - } - }); - - // convert the stream of readers to a stream of streams of byte chunks - let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) }); - - // flatten into one stream of byte chunks - let bytes_stream = bytes_streams.try_flatten(); - - // convert into AsyncRead - let blob_reader = StreamReader::new(bytes_stream); - - Box::new(NaiveSeeker::new(Box::pin(blob_reader))) -} |