diff options
Diffstat (limited to 'tvix/store/src/blobservice')
-rw-r--r-- | tvix/store/src/blobservice/grpc.rs | 217 | ||||
-rw-r--r-- | tvix/store/src/blobservice/memory.rs | 76 | ||||
-rw-r--r-- | tvix/store/src/blobservice/mod.rs | 41 | ||||
-rw-r--r-- | tvix/store/src/blobservice/sled.rs | 94 |
4 files changed, 428 insertions, 0 deletions
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs new file mode 100644 index 000000000000..0b08fbf46ad9 --- /dev/null +++ b/tvix/store/src/blobservice/grpc.rs @@ -0,0 +1,217 @@ +use super::{BlobService, BlobWriter}; +use crate::{proto, B3Digest}; +use futures::sink::{SinkExt, SinkMapErr}; +use std::{collections::VecDeque, io}; +use tokio::task::JoinHandle; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_util::{ + io::{CopyToBytes, SinkWriter, SyncIoBridge}, + sync::{PollSendError, PollSender}, +}; +use tonic::{transport::Channel, Code, Status, Streaming}; +use tracing::instrument; + +/// Connects to a (remote) tvix-store BlobService over gRPC. +#[derive(Clone)] +pub struct GRPCBlobService { + /// A handle into the active tokio runtime. Necessary to spawn tasks. + tokio_handle: tokio::runtime::Handle, + + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, +} + +impl GRPCBlobService { + /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient<Channel>], + /// and a [tokio::runtime::Handle]. + pub fn new( + grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, + tokio_handle: tokio::runtime::Handle, + ) -> Self { + Self { + tokio_handle, + grpc_client, + } + } + /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient<Channel>]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, + ) -> Self { + Self { + tokio_handle: tokio::runtime::Handle::current(), + grpc_client, + } + } +} + +impl BlobService for GRPCBlobService { + type BlobReader = Box<dyn io::Read + Send>; + type BlobWriter = GRPCBlobWriter; + + #[instrument(skip(self, digest), fields(blob.digest=%digest))] + fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> { + // Get a new handle to the gRPC client, and copy the digest. + let mut grpc_client = self.grpc_client.clone(); + let digest = digest.clone(); + + let task: tokio::task::JoinHandle<Result<_, Status>> = + self.tokio_handle.spawn(async move { + Ok(grpc_client + .stat(proto::StatBlobRequest { + digest: digest.to_vec(), + ..Default::default() + }) + .await? + .into_inner()) + }); + + match self.tokio_handle.block_on(task)? { + Ok(_blob_meta) => Ok(true), + Err(e) if e.code() == Code::NotFound => Ok(false), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + // On success, this returns a Ok(Some(io::Read)), which can be used to read + // the contents of the Blob, identified by the digest. + fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, crate::Error> { + // Get a new handle to the gRPC client, and copy the digest. + let mut grpc_client = self.grpc_client.clone(); + let digest = digest.clone(); + + // Construct the task that'll send out the request and return the stream + // the gRPC client should use to send [proto::BlobChunk], or an error if + // the blob doesn't exist. + let task: tokio::task::JoinHandle<Result<Streaming<proto::BlobChunk>, Status>> = + self.tokio_handle.spawn(async move { + let stream = grpc_client + .read(proto::ReadBlobRequest { + digest: digest.to_vec(), + }) + .await? + .into_inner(); + + Ok(stream) + }); + + // This runs the task to completion, which on success will return a stream. + // On reading from it, we receive individual [proto::BlobChunk], so we + // massage this to a stream of bytes, + // then create an [AsyncRead], which we'll turn into a [io::Read], + // that's returned from the function. + match self.tokio_handle.block_on(task)? { + Ok(stream) => { + // map the stream of proto::BlobChunk to bytes. + let data_stream = stream.map(|x| { + x.map(|x| VecDeque::from(x.data)) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) + }); + + // Use StreamReader::new to convert to an AsyncRead. + let data_reader = tokio_util::io::StreamReader::new(data_stream); + + // Use SyncIoBridge to turn it into a sync Read. + let sync_reader = tokio_util::io::SyncIoBridge::new(data_reader); + Ok(Some(Box::new(sync_reader))) + } + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + /// Returns a [Self::BlobWriter], that'll internally wrap each write in a + // [proto::BlobChunk] and which is passed to the + fn open_write(&self) -> Result<Self::BlobWriter, crate::Error> { + let mut grpc_client = self.grpc_client.clone(); + + // set up an mpsc channel passing around Bytes. + let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); + + // bytes arriving on the RX side are wrapped inside a + // [proto::BlobChunk], and a [ReceiverStream] is constructed. + let blobchunk_stream = + ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.to_vec() }); + + // That receiver stream is used as a stream in the gRPC BlobService.put rpc call. + let task: tokio::task::JoinHandle<Result<_, Status>> = self + .tokio_handle + .spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) }); + + // The tx part of the channel is converted to a sink of byte chunks. + + // We need to make this a function pointer, not a closure. + fn convert_error(_: PollSendError<bytes::Bytes>) -> io::Error { + io::Error::from(io::ErrorKind::BrokenPipe) + } + + let sink = PollSender::new(tx) + .sink_map_err(convert_error as fn(PollSendError<bytes::Bytes>) -> io::Error); + // We need to explicitly cast here, otherwise rustc does error with "expected fn pointer, found fn item" + + // … which is turned into an [tokio::io::AsyncWrite]. + let async_writer = SinkWriter::new(CopyToBytes::new(sink)); + // … which is then turned into a [io::Write]. + let writer = SyncIoBridge::new(async_writer); + + Ok(GRPCBlobWriter { + tokio_handle: self.tokio_handle.clone(), // TODO: is the clone() ok here? + task, + inner_writer: writer, + }) + } +} + +type BridgedWriter = SyncIoBridge< + SinkWriter< + CopyToBytes< + SinkMapErr<PollSender<bytes::Bytes>, fn(PollSendError<bytes::Bytes>) -> io::Error>, + >, + >, +>; + +pub struct GRPCBlobWriter { + /// A handle into the active tokio runtime. Necessary to block on the task + /// containing the put request. + tokio_handle: tokio::runtime::Handle, + + /// The task containing the put request. + task: JoinHandle<Result<proto::PutBlobResponse, Status>>, + + /// The inner Writer. + inner_writer: BridgedWriter, +} + +impl BlobWriter for GRPCBlobWriter { + fn close(mut self) -> Result<B3Digest, crate::Error> { + // invoke shutdown, so the inner writer closes its internal tx side of + // the channel. + self.inner_writer + .shutdown() + .map_err(|e| crate::Error::StorageError(e.to_string()))?; + + // block on the RPC call to return. + // This ensures all chunks are sent out, and have been received by the + // backend. + match self.tokio_handle.block_on(self.task)? { + Ok(resp) => { + // return the digest from the response. + B3Digest::from_vec(resp.digest).map_err(|_| { + crate::Error::StorageError("invalid root digest length in response".to_string()) + }) + } + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } +} + +impl io::Write for GRPCBlobWriter { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.inner_writer.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner_writer.flush() + } +} diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs new file mode 100644 index 000000000000..1ee59d108743 --- /dev/null +++ b/tvix/store/src/blobservice/memory.rs @@ -0,0 +1,76 @@ +use std::io::Cursor; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; +use tracing::{instrument, warn}; + +use super::{BlobService, BlobWriter}; +use crate::{B3Digest, Error}; + +#[derive(Clone, Default)] +pub struct MemoryBlobService { + db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, +} + +impl BlobService for MemoryBlobService { + type BlobReader = Cursor<Vec<u8>>; + type BlobWriter = MemoryBlobWriter; + + #[instrument(skip(self, digest), fields(blob.digest=%digest))] + fn has(&self, digest: &B3Digest) -> Result<bool, Error> { + let db = self.db.read().unwrap(); + Ok(db.contains_key(digest)) + } + + fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> { + let db = self.db.read().unwrap(); + + Ok(db.get(digest).map(|x| Cursor::new(x.clone()))) + } + + #[instrument(skip(self))] + fn open_write(&self) -> Result<Self::BlobWriter, Error> { + Ok(MemoryBlobWriter::new(self.db.clone())) + } +} + +pub struct MemoryBlobWriter { + db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, + + buf: Vec<u8>, +} + +impl MemoryBlobWriter { + fn new(db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>) -> Self { + Self { + buf: Vec::new(), + db, + } + } +} +impl std::io::Write for MemoryBlobWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + self.buf.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.buf.flush() + } +} + +impl BlobWriter for MemoryBlobWriter { + fn close(self) -> Result<B3Digest, Error> { + // in this memory implementation, we don't actually bother hashing + // incrementally while writing, but do it at the end. + let mut hasher = blake3::Hasher::new(); + hasher.update(&self.buf); + let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + + // open the database for writing. + let mut db = self.db.write()?; + db.insert(digest.clone(), self.buf); + + Ok(digest) + } +} diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs new file mode 100644 index 000000000000..c5a2de124656 --- /dev/null +++ b/tvix/store/src/blobservice/mod.rs @@ -0,0 +1,41 @@ +use std::io; + +use crate::{B3Digest, Error}; + +mod grpc; +mod memory; +mod sled; + +pub use self::grpc::GRPCBlobService; +pub use self::memory::MemoryBlobService; +pub use self::sled::SledBlobService; + +/// The base trait all BlobService services need to implement. +/// It provides functions to check whether a given blob exists, +/// a way to get a [io::Read] to a blob, and a method to initiate writing a new +/// Blob, which returns a [BlobWriter], that can be used +pub trait BlobService { + type BlobReader: io::Read + Send + std::marker::Unpin; + type BlobWriter: BlobWriter + Send; + + /// Check if the service has the blob, by its content hash. + fn has(&self, digest: &B3Digest) -> Result<bool, Error>; + + /// Request a blob from the store, by its content hash. Returns a Option<BlobReader>. + fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error>; + + /// Insert a new blob into the store. Returns a [BlobWriter], which + /// implements [io::Write] and a [BlobWriter::close]. + /// TODO: is there any reason we want this to be a Result<>, and not just T? + fn open_write(&self) -> Result<Self::BlobWriter, Error>; +} + +/// A [io::Write] that you need to close() afterwards, and get back the digest +/// of the written blob. +pub trait BlobWriter: io::Write { + /// Signal there's no more data to be written, and return the digest of the + /// contents written. + /// + /// This consumes self, so it's not possible to close twice. + fn close(self) -> Result<B3Digest, Error>; +} diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs new file mode 100644 index 000000000000..2b090335344d --- /dev/null +++ b/tvix/store/src/blobservice/sled.rs @@ -0,0 +1,94 @@ +use super::{BlobService, BlobWriter}; +use crate::{B3Digest, Error}; +use std::{ + io::{self, Cursor}, + path::PathBuf, +}; +use tracing::instrument; + +#[derive(Clone)] +pub struct SledBlobService { + db: sled::Db, +} + +impl SledBlobService { + pub fn new(p: PathBuf) -> Result<Self, sled::Error> { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { db }) + } + + pub fn new_temporary() -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { db }) + } +} + +impl BlobService for SledBlobService { + type BlobReader = Cursor<Vec<u8>>; + type BlobWriter = SledBlobWriter; + + #[instrument(skip(self), fields(blob.digest=%digest))] + fn has(&self, digest: &B3Digest) -> Result<bool, Error> { + match self.db.contains_key(digest.to_vec()) { + Ok(has) => Ok(has), + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + #[instrument(skip(self), fields(blob.digest=%digest))] + fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> { + match self.db.get(digest.to_vec()) { + Ok(None) => Ok(None), + Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))), + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + #[instrument(skip(self))] + fn open_write(&self) -> Result<Self::BlobWriter, Error> { + Ok(SledBlobWriter::new(self.db.clone())) + } +} + +pub struct SledBlobWriter { + db: sled::Db, + buf: Vec<u8>, + hasher: blake3::Hasher, +} + +impl SledBlobWriter { + pub fn new(db: sled::Db) -> Self { + Self { + buf: Vec::default(), + db, + hasher: blake3::Hasher::new(), + } + } +} + +impl io::Write for SledBlobWriter { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + let bytes_written = self.buf.write(buf)?; + self.hasher.write(&buf[..bytes_written]) + } + + fn flush(&mut self) -> io::Result<()> { + self.buf.flush() + } +} + +impl BlobWriter for SledBlobWriter { + fn close(self) -> Result<B3Digest, Error> { + let digest = self.hasher.finalize(); + self.db + .insert(digest.as_bytes(), self.buf) + .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; + + // We know self.hasher is doing blake3 hashing, so this won't fail. + Ok(B3Digest::from_vec(digest.as_bytes().to_vec()).unwrap()) + } +} |