about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r--tvix/castore/src/blobservice/combinator.rs135
-rw-r--r--tvix/castore/src/blobservice/mod.rs2
2 files changed, 137 insertions, 0 deletions
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs
new file mode 100644
index 000000000000..caa04fb35d96
--- /dev/null
+++ b/tvix/castore/src/blobservice/combinator.rs
@@ -0,0 +1,135 @@
+use data_encoding::BASE64;
+use futures::{StreamExt, TryStreamExt};
+use tokio_util::io::{ReaderStream, StreamReader};
+use tonic::async_trait;
+use tracing::{instrument, warn};
+
+use crate::B3Digest;
+
+use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
+
+/// Combinator for a BlobService, using a "local" and "remote" blobservice.
+/// Requests are tried in (and returned from) the local store first, only if
+/// things are not present there, the remote BlobService is queried.
+/// In case the local blobservice doesn't have the blob, we ask the remote
+/// blobservice for chunks, and try to read each of these chunks from the local
+/// blobservice again, before falling back to the remote one.
+/// The remote BlobService is never written to.
+pub struct CombinedBlobService<BL, BR> {
+    local: BL,
+    remote: BR,
+}
+
+impl<BL, BR> Clone for CombinedBlobService<BL, BR>
+where
+    BL: Clone,
+    BR: Clone,
+{
+    fn clone(&self) -> Self {
+        Self {
+            local: self.local.clone(),
+            remote: self.remote.clone(),
+        }
+    }
+}
+
+#[async_trait]
+impl<BL, BR> BlobService for CombinedBlobService<BL, BR>
+where
+    BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
+    BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
+{
+    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> {
+        Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?)
+    }
+
+    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> {
+        if self.local.as_ref().has(digest).await? {
+            // local store has the blob, so we can assume it also has all chunks.
+            self.local.as_ref().open_read(digest).await
+        } else {
+            // Local store doesn't have the blob.
+            // Ask the remote one for the list of chunks,
+            // and create a chunked reader that uses self.open_read() for
+            // individual chunks. There's a chance we already have some chunks
+            // locally, meaning we don't need to fetch them all from the remote
+            // BlobService.
+            match self.remote.as_ref().chunks(digest).await? {
+                // blob doesn't exist on the remote side either, nothing we can do.
+                None => Ok(None),
+                Some(remote_chunks) => {
+                    // if there's no more granular chunks, or the remote
+                    // blobservice doesn't support chunks, read the blob from
+                    // the remote blobservice directly.
+                    if remote_chunks.is_empty() {
+                        return self.remote.as_ref().open_read(digest).await;
+                    }
+                    // otherwise, a chunked reader, which will always try the
+                    // local backend first.
+
+                    // map Vec<ChunkMeta> to Vec<(B3Digest, u64)>
+                    let chunks: Vec<(B3Digest, u64)> = remote_chunks
+                        .into_iter()
+                        .map(|chunk_meta| {
+                            (
+                                B3Digest::try_from(chunk_meta.digest)
+                                    .expect("invalid chunk digest"),
+                                chunk_meta.size,
+                            )
+                        })
+                        .collect();
+
+                    Ok(Some(make_chunked_reader(self.clone(), chunks)))
+                }
+            }
+        }
+    }
+
+    #[instrument(skip_all)]
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        // direct writes to the local one.
+        self.local.as_ref().open_write().await
+    }
+}
+
+fn make_chunked_reader<BS>(
+    // This must consume, as we can't retain references to blob_service,
+    // as it'd add a lifetime to BlobReader in general, which will get
+    // problematic in TvixStoreFs, which is using async move closures and cloning.
+    blob_service: BS,
+    chunks: Vec<(B3Digest, u64)>,
+) -> Box<dyn BlobReader>
+where
+    BS: BlobService + Clone + Send + Sync + 'static,
+{
+    // TODO: offset, verified streaming
+
+    // construct readers for each chunk
+    let blob_service = blob_service.clone();
+    let readers_stream = tokio_stream::iter(chunks).map(move |(digest, _)| {
+        let d = digest.to_owned();
+        let blob_service = blob_service.clone();
+        async move {
+            blob_service.open_read(&d.to_owned()).await?.ok_or_else(|| {
+                warn!(
+                    chunk.digest = BASE64.encode(digest.as_slice()),
+                    "chunk not found"
+                );
+                std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found")
+            })
+        }
+    });
+
+    // convert the stream of readers to a stream of streams of byte chunks
+    let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) });
+
+    // flatten into one stream of byte chunks
+    let bytes_stream = bytes_streams.try_flatten();
+
+    // convert into AsyncRead
+    let blob_reader = StreamReader::new(bytes_stream);
+
+    Box::new(NaiveSeeker::new(Box::pin(blob_reader)))
+}
diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs
index 197b51b1d9e1..348f87fb962e 100644
--- a/tvix/castore/src/blobservice/mod.rs
+++ b/tvix/castore/src/blobservice/mod.rs
@@ -4,6 +4,7 @@ use tonic::async_trait;
 use crate::proto::stat_blob_response::ChunkMeta;
 use crate::B3Digest;
 
+mod combinator;
 mod from_addr;
 mod grpc;
 mod memory;
@@ -14,6 +15,7 @@ mod sled;
 #[cfg(test)]
 mod tests;
 
+pub use self::combinator::CombinedBlobService;
 pub use self::from_addr::from_addr;
 pub use self::grpc::GRPCBlobService;
 pub use self::memory::MemoryBlobService;