Diffstat (limited to 'tvix/castore/src/blobservice/combinator.rs')
-rw-r--r-- | tvix/castore/src/blobservice/combinator.rs | 155
1 file changed, 77 insertions, 78 deletions
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs
index 067eff96f488..1c90fb7bb055 100644
--- a/tvix/castore/src/blobservice/combinator.rs
+++ b/tvix/castore/src/blobservice/combinator.rs
@@ -1,22 +1,24 @@
-use futures::{StreamExt, TryStreamExt};
-use tokio_util::io::{ReaderStream, StreamReader};
+use std::sync::Arc;
+
 use tonic::async_trait;
-use tracing::{instrument, warn};
+use tracing::instrument;
 
-use crate::B3Digest;
+use crate::composition::{CompositionContext, ServiceBuilder};
+use crate::{B3Digest, Error};
 
-use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
+use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
 
-/// Combinator for a BlobService, using a "local" and "remote" blobservice.
-/// Requests are tried in (and returned from) the local store first, only if
-/// things are not present there, the remote BlobService is queried.
-/// In case the local blobservice doesn't have the blob, we ask the remote
-/// blobservice for chunks, and try to read each of these chunks from the local
-/// blobservice again, before falling back to the remote one.
-/// The remote BlobService is never written to.
+/// Combinator for a BlobService, using a "near" and "far" blobservice.
+/// Requests are tried in (and returned from) the near store first, only if
+/// things are not present there, the far BlobService is queried.
+/// In case the near blobservice doesn't have the blob, we ask the far
+/// blobservice for chunks, and try to read each of these chunks from the near
+/// blobservice again, before falling back to the far one.
+/// The far BlobService is never written to.
 pub struct CombinedBlobService<BL, BR> {
-    local: BL,
-    remote: BR,
+    instance_name: String,
+    near: BL,
+    far: BR,
 }
 
 impl<BL, BR> Clone for CombinedBlobService<BL, BR>
@@ -26,8 +28,9 @@ where
 {
     fn clone(&self) -> Self {
         Self {
-            local: self.local.clone(),
-            remote: self.remote.clone(),
+            instance_name: self.instance_name.clone(),
+            near: self.near.clone(),
+            far: self.far.clone(),
         }
     }
 }
@@ -38,95 +41,91 @@ where
     BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
     BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
 {
-    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> {
-        Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?)
+        Ok(self.near.as_ref().has(digest).await? || self.far.as_ref().has(digest).await?)
     }
 
-    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
     async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> {
-        if self.local.as_ref().has(digest).await? {
-            // local store has the blob, so we can assume it also has all chunks.
-            self.local.as_ref().open_read(digest).await
+        if self.near.as_ref().has(digest).await? {
+            // near store has the blob, so we can assume it also has all chunks.
+            self.near.as_ref().open_read(digest).await
         } else {
-            // Local store doesn't have the blob.
+            // near store doesn't have the blob.
             // Ask the remote one for the list of chunks,
             // and create a chunked reader that uses self.open_read() for
            // individual chunks. There's a chance we already have some chunks
-            // locally, meaning we don't need to fetch them all from the remote
+            // in near, meaning we don't need to fetch them all from the far
            // BlobService.
-            match self.remote.as_ref().chunks(digest).await? {
-                // blob doesn't exist on the remote side either, nothing we can do.
+            match self.far.as_ref().chunks(digest).await? {
+                // blob doesn't exist on the far side either, nothing we can do.
                 None => Ok(None),
                 Some(remote_chunks) => {
-                    // if there's no more granular chunks, or the remote
+                    // if there's no more granular chunks, or the far
                     // blobservice doesn't support chunks, read the blob from
-                    // the remote blobservice directly.
+                    // the far blobservice directly.
                     if remote_chunks.is_empty() {
-                        return self.remote.as_ref().open_read(digest).await;
+                        return self.far.as_ref().open_read(digest).await;
                     }
                     // otherwise, a chunked reader, which will always try the
-                    // local backend first.
+                    // near backend first.
 
-                    // map Vec<ChunkMeta> to Vec<(B3Digest, u64)>
-                    let chunks: Vec<(B3Digest, u64)> = remote_chunks
-                        .into_iter()
-                        .map(|chunk_meta| {
+                    let chunked_reader = ChunkedReader::from_chunks(
+                        remote_chunks.into_iter().map(|chunk| {
                             (
-                                B3Digest::try_from(chunk_meta.digest)
-                                    .expect("invalid chunk digest"),
-                                chunk_meta.size,
+                                chunk.digest.try_into().expect("invalid b3 digest"),
+                                chunk.size,
                             )
-                        })
-                        .collect();
-
-                    Ok(Some(make_chunked_reader(self.clone(), chunks)))
+                        }),
+                        Arc::new(self.clone()) as Arc<dyn BlobService>,
+                    );
+                    Ok(Some(Box::new(chunked_reader)))
                 }
             }
        }
    }
 
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     async fn open_write(&self) -> Box<dyn BlobWriter> {
-        // direct writes to the local one.
-        self.local.as_ref().open_write().await
+        // direct writes to the near one.
+        self.near.as_ref().open_write().await
     }
 }
 
-fn make_chunked_reader<BS>(
-    // This must consume, as we can't retain references to blob_service,
-    // as it'd add a lifetime to BlobReader in general, which will get
-    // problematic in TvixStoreFs, which is using async move closures and cloning.
-    blob_service: BS,
-    // A list of b3 digests for individual chunks, and their sizes.
-    chunks: Vec<(B3Digest, u64)>,
-) -> Box<dyn BlobReader>
-where
-    BS: BlobService + Clone + 'static,
-{
-    // TODO: offset, verified streaming
-
-    // construct readers for each chunk
-    let blob_service = blob_service.clone();
-    let readers_stream = tokio_stream::iter(chunks).map(move |(digest, _)| {
-        let d = digest.to_owned();
-        let blob_service = blob_service.clone();
-        async move {
-            blob_service.open_read(&d.to_owned()).await?.ok_or_else(|| {
-                warn!(chunk.digest = %digest, "chunk not found");
-                std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found")
-            })
-        }
-    });
-
-    // convert the stream of readers to a stream of streams of byte chunks
-    let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) });
-
-    // flatten into one stream of byte chunks
-    let bytes_stream = bytes_streams.try_flatten();
+#[derive(serde::Deserialize, Debug, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct CombinedBlobServiceConfig {
+    near: String,
+    far: String,
+}
 
-    // convert into AsyncRead
-    let blob_reader = StreamReader::new(bytes_stream);
+impl TryFrom<url::Url> for CombinedBlobServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
+        Err(Error::StorageError(
+            "Instantiating a CombinedBlobService from a url is not supported".into(),
+        )
+        .into())
+    }
+}
 
-    Box::new(NaiveSeeker::new(Box::pin(blob_reader)))
+#[async_trait]
+impl ServiceBuilder for CombinedBlobServiceConfig {
+    type Output = dyn BlobService;
+    async fn build<'a>(
+        &'a self,
+        instance_name: &str,
+        context: &CompositionContext,
+    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
+        let (local, remote) = futures::join!(
+            context.resolve(self.near.clone()),
+            context.resolve(self.far.clone())
+        );
+        Ok(Arc::new(CombinedBlobService {
+            instance_name: instance_name.to_string(),
+            near: local?,
+            far: remote?,
+        }))
+    }
 }
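To make the read path above easier to follow, here is a minimal synchronous sketch of the near/far strategy the patched open_read() implements. The Digest alias, the Store trait, and its method names are hypothetical stand-ins, not the crate's BlobService API; only the fallback order mirrors the diff.

// Hypothetical stand-in for the crate's BlobService trait.
type Digest = [u8; 32];

trait Store {
    fn has(&self, digest: &Digest) -> bool;
    fn read(&self, digest: &Digest) -> Option<Vec<u8>>;
    /// None: blob unknown. Some(vec![]): blob known, but not chunked.
    fn chunks(&self, digest: &Digest) -> Option<Vec<Digest>>;
}

struct Combined<N: Store, F: Store> {
    near: N,
    far: F,
}

impl<N: Store, F: Store> Combined<N, F> {
    fn open_read(&self, digest: &Digest) -> Option<Vec<u8>> {
        // 1. The near store has the whole blob: serve it directly.
        if self.near.has(digest) {
            return self.near.read(digest);
        }
        // 2. Otherwise ask the far store for the chunk list
        //    (None means the blob is absent everywhere).
        let chunks = self.far.chunks(digest)?;
        // 3. No granular chunks known: read the whole blob from far.
        if chunks.is_empty() {
            return self.far.read(digest);
        }
        // 4. Reassemble chunk by chunk, trying near before far for each one.
        let mut blob = Vec::new();
        for chunk in &chunks {
            blob.extend(self.near.read(chunk).or_else(|| self.far.read(chunk))?);
        }
        Some(blob)
    }
}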
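The new CombinedBlobServiceConfig is plain serde, so its shape can be exercised standalone. A sketch, assuming the serde (derive feature) and serde_json crates; the struct body is copied from the diff, while the instance names in the JSON ("objectstore", "grpc-remote") are made up for illustration. Note that deny_unknown_fields turns typoed keys into hard errors.

// Standalone reproduction of the config struct introduced by this diff.
#[derive(serde::Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct CombinedBlobServiceConfig {
    near: String,
    far: String,
}

fn main() {
    // Well-formed config: "near" and "far" name other blobservice instances.
    let cfg: CombinedBlobServiceConfig =
        serde_json::from_str(r#"{"near": "objectstore", "far": "grpc-remote"}"#).unwrap();
    println!("{cfg:?}");

    // deny_unknown_fields rejects misspelled keys at deserialization time.
    let err = serde_json::from_str::<CombinedBlobServiceConfig>(
        r#"{"near": "objectstore", "fur": "grpc-remote"}"#,
    )
    .unwrap_err();
    println!("{err}");
}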
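One detail of the ServiceBuilder::build() implementation: the two context.resolve() calls are awaited concurrently with futures::join!, and the ? operators are applied to the results only afterwards, so neither lookup is skipped when the other fails. A toy reproduction of that pattern, assuming the futures and tokio crates; resolve() here is a stand-in for CompositionContext::resolve().

// Stand-in for CompositionContext::resolve(); just echoes the name.
async fn resolve(name: &str) -> Result<String, String> {
    Ok(format!("instance:{name}"))
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Both resolutions run concurrently; errors surface only at the `?`s.
    let (near, far) = futures::join!(resolve("near-store"), resolve("far-store"));
    println!("near = {}, far = {}", near?, far?);
    Ok(())
}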