Diffstat (limited to 'tvix/store/src/blobservice')
 tvix/store/src/blobservice/grpc.rs   | 217 ++++++++++++++++++++++
 tvix/store/src/blobservice/memory.rs |  76 ++++++++
 tvix/store/src/blobservice/mod.rs    |  41 ++++
 tvix/store/src/blobservice/sled.rs   |  94 ++++++++++
 4 files changed, 428 insertions(+), 0 deletions(-)
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs
new file mode 100644
index 000000000000..0b08fbf46ad9
--- /dev/null
+++ b/tvix/store/src/blobservice/grpc.rs
@@ -0,0 +1,217 @@
+use super::{BlobService, BlobWriter};
+use crate::{proto, B3Digest};
+use futures::sink::{SinkExt, SinkMapErr};
+use std::{collections::VecDeque, io::{self, Write}};
+use tokio::task::JoinHandle;
+use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use tokio_util::{
+    io::{CopyToBytes, SinkWriter, SyncIoBridge},
+    sync::{PollSendError, PollSender},
+};
+use tonic::{transport::Channel, Code, Status, Streaming};
+use tracing::instrument;
+
+/// Connects to a (remote) tvix-store BlobService over gRPC.
+#[derive(Clone)]
+pub struct GRPCBlobService {
+    /// A handle into the active tokio runtime. Necessary to spawn tasks.
+    tokio_handle: tokio::runtime::Handle,
+
+    /// The internal reference to a gRPC client.
+    /// Cloning it is cheap, and it internally handles concurrent requests.
+    grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
+}
+
+impl GRPCBlobService {
+    /// Construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient<Channel>]
+    /// and a [tokio::runtime::Handle].
+    pub fn new(
+        grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
+        tokio_handle: tokio::runtime::Handle,
+    ) -> Self {
+        Self {
+            tokio_handle,
+            grpc_client,
+        }
+    }
+    /// Construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient<Channel>].
+    /// Panics if called outside the context of a tokio runtime.
+    pub fn from_client(
+        grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
+    ) -> Self {
+        Self {
+            tokio_handle: tokio::runtime::Handle::current(),
+            grpc_client,
+        }
+    }
+}
+
+impl BlobService for GRPCBlobService {
+    type BlobReader = Box<dyn io::Read + Send>;
+    type BlobWriter = GRPCBlobWriter;
+
+    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> {
+        // Get a new handle to the gRPC client, and copy the digest.
+        let mut grpc_client = self.grpc_client.clone();
+        let digest = digest.clone();
+
+        let task: tokio::task::JoinHandle<Result<_, Status>> =
+            self.tokio_handle.spawn(async move {
+                Ok(grpc_client
+                    .stat(proto::StatBlobRequest {
+                        digest: digest.to_vec(),
+                        ..Default::default()
+                    })
+                    .await?
+                    .into_inner())
+            });
+
+        match self.tokio_handle.block_on(task)? {
+            Ok(_blob_meta) => Ok(true),
+            Err(e) if e.code() == Code::NotFound => Ok(false),
+            Err(e) => Err(crate::Error::StorageError(e.to_string())),
+        }
+    }
+
+    // On success, this returns an Ok(Some(io::Read)), which can be used to read
+    // the contents of the Blob, identified by the digest.
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, crate::Error> {
+        // Get a new handle to the gRPC client, and copy the digest.
+        let mut grpc_client = self.grpc_client.clone();
+        let digest = digest.clone();
+
+        // Construct the task that'll send out the request and return the stream
+        // over which the server sends the [proto::BlobChunk]s, or an error if
+        // the blob doesn't exist.
+        let task: tokio::task::JoinHandle<Result<Streaming<proto::BlobChunk>, Status>> =
+            self.tokio_handle.spawn(async move {
+                let stream = grpc_client
+                    .read(proto::ReadBlobRequest {
+                        digest: digest.to_vec(),
+                    })
+                    .await?
+                    .into_inner();
+
+                Ok(stream)
+            });
+
+        // This runs the task to completion, which on success will return a stream.
+        // Reading from it yields individual [proto::BlobChunk]s, so we massage
+        // this into a stream of bytes, then create an [AsyncRead], which we'll
+        // turn into an [io::Read] that's returned from the function.
+        match self.tokio_handle.block_on(task)? {
+            Ok(stream) => {
+                // map the stream of proto::BlobChunk to bytes.
+                let data_stream = stream.map(|x| {
+                    x.map(|x| VecDeque::from(x.data))
+                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
+                });
+
+                // Use StreamReader::new to convert to an AsyncRead.
+                let data_reader = tokio_util::io::StreamReader::new(data_stream);
+
+                // Use SyncIoBridge to turn it into a sync Read.
+                let sync_reader = tokio_util::io::SyncIoBridge::new(data_reader);
+                Ok(Some(Box::new(sync_reader)))
+            }
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(crate::Error::StorageError(e.to_string())),
+        }
+    }
+
+    /// Returns a [Self::BlobWriter], that'll internally wrap each write in a
+    /// [proto::BlobChunk], which is passed into the gRPC BlobService.put rpc call.
+    fn open_write(&self) -> Result<Self::BlobWriter, crate::Error> {
+        let mut grpc_client = self.grpc_client.clone();
+
+        // set up an mpsc channel passing around Bytes.
+        let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10);
+
+        // bytes arriving on the RX side are wrapped inside a
+        // [proto::BlobChunk], and a [ReceiverStream] is constructed.
+        let blobchunk_stream =
+            ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.to_vec() });
+
+        // That receiver stream is used as a stream in the gRPC BlobService.put rpc call.
+        let task: tokio::task::JoinHandle<Result<_, Status>> = self
+            .tokio_handle
+            .spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) });
+
+        // The tx part of the channel is converted to a sink of byte chunks.
+
+        // We need to make this a function pointer, not a closure.
+        fn convert_error(_: PollSendError<bytes::Bytes>) -> io::Error {
+            io::Error::from(io::ErrorKind::BrokenPipe)
+        }
+
+        let sink = PollSender::new(tx)
+            .sink_map_err(convert_error as fn(PollSendError<bytes::Bytes>) -> io::Error);
+        // We need to explicitly cast here, otherwise rustc errors with "expected fn pointer, found fn item".
+
+        // … which is turned into an [tokio::io::AsyncWrite].
+        let async_writer = SinkWriter::new(CopyToBytes::new(sink));
+        // … which is then turned into an [io::Write].
+        let writer = SyncIoBridge::new(async_writer);
+
+        Ok(GRPCBlobWriter {
+            tokio_handle: self.tokio_handle.clone(), // TODO: is the clone() ok here?
+            task,
+            inner_writer: writer,
+        })
+    }
+}
+
+type BridgedWriter = SyncIoBridge<
+    SinkWriter<
+        CopyToBytes<
+            SinkMapErr<PollSender<bytes::Bytes>, fn(PollSendError<bytes::Bytes>) -> io::Error>,
+        >,
+    >,
+>;
+
+pub struct GRPCBlobWriter {
+    /// A handle into the active tokio runtime. Necessary to block on the task
+    /// containing the put request.
+    tokio_handle: tokio::runtime::Handle,
+
+    /// The task containing the put request.
+    task: JoinHandle<Result<proto::PutBlobResponse, Status>>,
+
+    /// The inner Writer.
+    inner_writer: BridgedWriter,
+}
+
+impl BlobWriter for GRPCBlobWriter {
+    fn close(mut self) -> Result<B3Digest, crate::Error> {
+        // invoke shutdown, so the inner writer closes its internal tx side of
+        // the channel.
+        self.inner_writer
+            .shutdown()
+            .map_err(|e| crate::Error::StorageError(e.to_string()))?;
+
+        // block on the RPC call to return.
+        // This ensures all chunks are sent out, and have been received by the
+        // backend.
+        match self.tokio_handle.block_on(self.task)? {
+            Ok(resp) => {
+                // return the digest from the response.
+                B3Digest::from_vec(resp.digest).map_err(|_| {
+                    crate::Error::StorageError("invalid root digest length in response".to_string())
+                })
+            }
+            Err(e) => Err(crate::Error::StorageError(e.to_string())),
+        }
+    }
+}
+
+impl io::Write for GRPCBlobWriter {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.inner_writer.write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.inner_writer.flush()
+    }
+}
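
For orientation, a rough sketch of how this client could be driven from synchronous code follows. The crate paths (tvix_store::…), the endpoint address and the runtime setup are illustrative assumptions, not part of this change:

    use std::io::{Read, Write};
    use tvix_store::blobservice::{BlobService, BlobWriter, GRPCBlobService};
    use tvix_store::proto::blob_service_client::BlobServiceClient;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let rt = tokio::runtime::Runtime::new()?;
        // Enter the runtime context, so the SyncIoBridge::new() calls inside
        // open_read/open_write can obtain a handle via Handle::current().
        let _guard = rt.enter();

        // Connect to a (hypothetical) tvix-store listening on localhost.
        let channel =
            rt.block_on(tonic::transport::Endpoint::from_static("http://[::1]:8000").connect())?;
        let svc = GRPCBlobService::new(BlobServiceClient::new(channel), rt.handle().clone());

        // Write a blob; close() blocks until the server has received all chunks.
        let mut writer = svc.open_write().expect("open_write failed");
        writer.write_all(b"hello tvix")?;
        let digest = writer.close().expect("close failed");

        // Read it back through the synchronous io::Read wrapper.
        if let Some(mut reader) = svc.open_read(&digest).expect("open_read failed") {
            let mut contents = Vec::new();
            reader.read_to_end(&mut contents)?;
            assert_eq!(b"hello tvix".to_vec(), contents);
        }
        Ok(())
    }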
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs
new file mode 100644
index 000000000000..1ee59d108743
--- /dev/null
+++ b/tvix/store/src/blobservice/memory.rs
@@ -0,0 +1,76 @@
+use std::io::{Cursor, Write};
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+};
+use tracing::{instrument, warn};
+
+use super::{BlobService, BlobWriter};
+use crate::{B3Digest, Error};
+
+#[derive(Clone, Default)]
+pub struct MemoryBlobService {
+    db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
+}
+
+impl BlobService for MemoryBlobService {
+    type BlobReader = Cursor<Vec<u8>>;
+    type BlobWriter = MemoryBlobWriter;
+
+    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    fn has(&self, digest: &B3Digest) -> Result<bool, Error> {
+        let db = self.db.read().unwrap();
+        Ok(db.contains_key(digest))
+    }
+
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> {
+        let db = self.db.read().unwrap();
+
+        Ok(db.get(digest).map(|x| Cursor::new(x.clone())))
+    }
+
+    #[instrument(skip(self))]
+    fn open_write(&self) -> Result<Self::BlobWriter, Error> {
+        Ok(MemoryBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct MemoryBlobWriter {
+    db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
+
+    buf: Vec<u8>,
+}
+
+impl MemoryBlobWriter {
+    fn new(db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>) -> Self {
+        Self {
+            buf: Vec::new(),
+            db,
+        }
+    }
+}
+impl std::io::Write for MemoryBlobWriter {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.buf.write(buf)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.buf.flush()
+    }
+}
+
+impl BlobWriter for MemoryBlobWriter {
+    fn close(self) -> Result<B3Digest, Error> {
+        // in this memory implementation, we don't actually bother hashing
+        // incrementally while writing, but do it at the end.
+        let mut hasher = blake3::Hasher::new();
+        hasher.update(&self.buf);
+        let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap();
+
+        // open the database for writing.
+        let mut db = self.db.write()?;
+        db.insert(digest.clone(), self.buf);
+
+        Ok(digest)
+    }
+}
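
A minimal round-trip for the in-memory implementation might look as follows; the test name and crate paths are assumptions:

    use std::io::{Read, Write};
    use tvix_store::blobservice::{BlobService, BlobWriter, MemoryBlobService};

    #[test]
    fn memory_blob_roundtrip() {
        let svc = MemoryBlobService::default();

        // Write a blob and obtain its blake3 digest on close().
        let mut writer = svc.open_write().unwrap();
        writer.write_all(b"hello memory").unwrap();
        let digest = writer.close().unwrap();

        // has() now reports it, and open_read() hands back a Cursor over a copy.
        assert!(svc.has(&digest).unwrap());
        let mut contents = Vec::new();
        svc.open_read(&digest)
            .unwrap()
            .expect("blob must exist")
            .read_to_end(&mut contents)
            .unwrap();
        assert_eq!(b"hello memory".to_vec(), contents);
    }

Note that open_read clones the whole blob out of the HashMap, which is fine for a test fixture but makes this implementation unsuitable for large blobs.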
diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs
new file mode 100644
index 000000000000..c5a2de124656
--- /dev/null
+++ b/tvix/store/src/blobservice/mod.rs
@@ -0,0 +1,41 @@
+use std::io;
+
+use crate::{B3Digest, Error};
+
+mod grpc;
+mod memory;
+mod sled;
+
+pub use self::grpc::GRPCBlobService;
+pub use self::memory::MemoryBlobService;
+pub use self::sled::SledBlobService;
+
+/// The base trait all BlobService services need to implement.
+/// It provides functions to check whether a given blob exists,
+/// a way to get an [io::Read] to a blob, and a method to initiate writing a new
+/// blob, which returns a [BlobWriter] that can be used to stream its contents.
+pub trait BlobService {
+    type BlobReader: io::Read + Send + std::marker::Unpin;
+    type BlobWriter: BlobWriter + Send;
+
+    /// Check if the service has the blob, by its content hash.
+    fn has(&self, digest: &B3Digest) -> Result<bool, Error>;
+
+    /// Request a blob from the store, by its content hash. Returns an Option<BlobReader>.
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error>;
+
+    /// Insert a new blob into the store. Returns a [BlobWriter], which
+    /// implements [io::Write] and provides [BlobWriter::close] to finalize.
+    /// TODO: is there any reason we want this to be a Result<>, and not just T?
+    fn open_write(&self) -> Result<Self::BlobWriter, Error>;
+}
+
+/// An [io::Write] that you need to close() afterwards to get back the digest
+/// of the written blob.
+pub trait BlobWriter: io::Write {
+    /// Signal there's no more data to be written, and return the digest of the
+    /// contents written.
+    ///
+    /// This consumes self, so it's not possible to close twice.
+    fn close(self) -> Result<B3Digest, Error>;
+}
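
Since all implementations share this trait, callers can stay generic over the concrete store. A hypothetical helper, sketched under the assumption that the crate is importable as tvix_store:

    use std::io::Write;
    use tvix_store::blobservice::{BlobService, BlobWriter};
    use tvix_store::{B3Digest, Error};

    /// Hypothetical helper: upload `data` unless the store already has it.
    fn upload_if_missing<BS: BlobService>(svc: &BS, data: &[u8]) -> Result<B3Digest, Error> {
        // Hash locally first, so has() can skip a redundant upload.
        let digest = B3Digest::from_vec(blake3::hash(data).as_bytes().to_vec()).unwrap();
        if svc.has(&digest)? {
            return Ok(digest);
        }
        let mut writer = svc.open_write()?;
        writer
            .write_all(data)
            .map_err(|e| Error::StorageError(e.to_string()))?;
        writer.close()
    }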
diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs
new file mode 100644
index 000000000000..2b090335344d
--- /dev/null
+++ b/tvix/store/src/blobservice/sled.rs
@@ -0,0 +1,94 @@
+use super::{BlobService, BlobWriter};
+use crate::{B3Digest, Error};
+use std::{
+    io::{self, Cursor, Write},
+    path::PathBuf,
+};
+use tracing::instrument;
+
+#[derive(Clone)]
+pub struct SledBlobService {
+    db: sled::Db,
+}
+
+impl SledBlobService {
+    pub fn new(p: PathBuf) -> Result<Self, sled::Error> {
+        let config = sled::Config::default().use_compression(true).path(p);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+
+    pub fn new_temporary() -> Result<Self, sled::Error> {
+        let config = sled::Config::default().temporary(true);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+}
+
+impl BlobService for SledBlobService {
+    type BlobReader = Cursor<Vec<u8>>;
+    type BlobWriter = SledBlobWriter;
+
+    #[instrument(skip(self), fields(blob.digest=%digest))]
+    fn has(&self, digest: &B3Digest) -> Result<bool, Error> {
+        match self.db.contains_key(digest.to_vec()) {
+            Ok(has) => Ok(has),
+            Err(e) => Err(Error::StorageError(e.to_string())),
+        }
+    }
+
+    #[instrument(skip(self), fields(blob.digest=%digest))]
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> {
+        match self.db.get(digest.to_vec()) {
+            Ok(None) => Ok(None),
+            Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))),
+            Err(e) => Err(Error::StorageError(e.to_string())),
+        }
+    }
+
+    #[instrument(skip(self))]
+    fn open_write(&self) -> Result<Self::BlobWriter, Error> {
+        Ok(SledBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct SledBlobWriter {
+    db: sled::Db,
+    buf: Vec<u8>,
+    hasher: blake3::Hasher,
+}
+
+impl SledBlobWriter {
+    pub fn new(db: sled::Db) -> Self {
+        Self {
+            buf: Vec::default(),
+            db,
+            hasher: blake3::Hasher::new(),
+        }
+    }
+}
+
+impl io::Write for SledBlobWriter {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        let bytes_written = self.buf.write(buf)?;
+        self.hasher.write(&buf[..bytes_written])
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.buf.flush()
+    }
+}
+
+impl BlobWriter for SledBlobWriter {
+    fn close(self) -> Result<B3Digest, Error> {
+        let digest = self.hasher.finalize();
+        self.db
+            .insert(digest.as_bytes(), self.buf)
+            .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?;
+
+        // We know self.hasher is doing blake3 hashing, so this won't fail.
+        Ok(B3Digest::from_vec(digest.as_bytes().to_vec()).unwrap())
+    }
+}
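
Usage mirrors the other implementations; a sketch with a throwaway database (crate paths assumed as above):

    use std::io::{Read, Write};
    use tvix_store::blobservice::{BlobService, BlobWriter, SledBlobService};

    fn sled_demo() {
        // A temporary, self-deleting database; use SledBlobService::new(path)
        // with a PathBuf to persist blobs on disk instead.
        let svc = SledBlobService::new_temporary().unwrap();

        let mut writer = svc.open_write().unwrap();
        writer.write_all(b"hello sled").unwrap();
        let digest = writer.close().unwrap();

        // The blob is now stored under its blake3 digest.
        let mut contents = Vec::new();
        svc.open_read(&digest)
            .unwrap()
            .expect("blob must exist")
            .read_to_end(&mut contents)
            .unwrap();
        assert_eq!(b"hello sled".to_vec(), contents);
    }

Unlike MemoryBlobWriter, SledBlobWriter hashes incrementally on every write, so close() only needs to finalize the hasher and insert the buffered bytes.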