use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{ proto::{self, stat_blob_response::ChunkMeta}, B3Digest, }; use futures::sink::SinkExt; use std::{ io::{self, Cursor}, pin::pin, sync::Arc, task::Poll, }; use tokio::io::AsyncWriteExt; use tokio::task::JoinHandle; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tokio_util::{ io::{CopyToBytes, SinkWriter}, sync::PollSender, }; use tonic::{async_trait, Code, Status}; use tracing::{instrument, Instrument as _}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] pub struct GRPCBlobService<T> { instance_name: String, /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::blob_service_client::BlobServiceClient<T>, } impl<T> GRPCBlobService<T> { /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. pub fn from_client( instance_name: String, grpc_client: proto::blob_service_client::BlobServiceClient<T>, ) -> Self { Self { instance_name, grpc_client, } } } #[async_trait] impl<T> BlobService for GRPCBlobService<T> where T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, T::Future: Send, { #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { match self .grpc_client .clone() .stat(proto::StatBlobRequest { digest: digest.clone().into(), ..Default::default() }) .await { Ok(_blob_meta) => Ok(true), Err(e) if e.code() == Code::NotFound => Ok(false), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), } } #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // First try to get a list of chunks. In case there's only one chunk returned, // buffer its data into a Vec, otherwise use a ChunkedReader. // We previously used NaiveSeeker here, but userland likes to seek backwards too often, // and without store composition this will get very noisy. // FUTUREWORK: use CombinedBlobService and store composition. match self.chunks(digest).await { Ok(None) => Ok(None), Ok(Some(chunks)) => { if chunks.is_empty() || chunks.len() == 1 { // No more granular chunking info, treat this as an individual chunk. // Get a stream of [proto::BlobChunk], or return an error if the blob // doesn't exist. return match self .grpc_client .clone() .read(proto::ReadBlobRequest { digest: digest.clone().into(), }) .await { Ok(stream) => { let data_stream = stream.into_inner().map(|e| { e.map(|c| c.data) .map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s)) }); // Use StreamReader::new to convert to an AsyncRead. let mut data_reader = tokio_util::io::StreamReader::new(data_stream); let mut buf = Vec::new(); // TODO: only do this up to a certain limit. tokio::io::copy(&mut data_reader, &mut buf).await?; Ok(Some(Box::new(Cursor::new(buf)))) } Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), }; } // The chunked case. Let ChunkedReader do individual reads. // TODO: we should store the chunking data in some local cache, // so `ChunkedReader` doesn't call `self.chunks` *again* for every chunk. // Think about how store composition will fix this. let chunked_reader = ChunkedReader::from_chunks( chunks.into_iter().map(|chunk| { ( chunk.digest.try_into().expect("invalid b3 digest"), chunk.size, ) }), Arc::new(self.clone()) as Arc<dyn BlobService>, ); Ok(Some(Box::new(chunked_reader))) } Err(e) => Err(e)?, } } /// Returns a BlobWriter, that'll internally wrap each write in a /// [proto::BlobChunk], which is send to the gRPC server. #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // set up an mpsc channel passing around Bytes. let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); // bytes arriving on the RX side are wrapped inside a // [proto::BlobChunk], and a [ReceiverStream] is constructed. let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x }); // spawn the gRPC put request, which will read from blobchunk_stream. let task = tokio::spawn({ let mut grpc_client = self.grpc_client.clone(); async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) } // instrument the task with the current span, this is not done by default .in_current_span() }); // The tx part of the channel is converted to a sink of byte chunks. let sink = PollSender::new(tx) .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)); // … which is turned into an [tokio::io::AsyncWrite]. let writer = SinkWriter::new(CopyToBytes::new(sink)); Box::new(GRPCBlobWriter { task_and_writer: Some((task, writer)), digest: None, }) } #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { let resp = self .grpc_client .clone() .stat(proto::StatBlobRequest { digest: digest.clone().into(), send_chunks: true, ..Default::default() }) .await; match resp { Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), Ok(resp) => { let resp = resp.into_inner(); resp.validate() .map_err(|e| std::io::Error::new(io::ErrorKind::InvalidData, e))?; Ok(Some(resp.chunks)) } } } } #[derive(serde::Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct GRPCBlobServiceConfig { url: String, } impl TryFrom<url::Url> for GRPCBlobServiceConfig { type Error = Box<dyn std::error::Error + Send + Sync>; fn try_from(url: url::Url) -> Result<Self, Self::Error> { // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. // - In the case of unix sockets, there must be a path, but may not be a host. // - In the case of non-unix sockets, there must be a host, but no path. // Constructing the channel is handled by tvix_castore::channel::from_url. Ok(GRPCBlobServiceConfig { url: url.to_string(), }) } } #[async_trait] impl ServiceBuilder for GRPCBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let client = proto::blob_service_client::BlobServiceClient::new( crate::tonic::channel_from_url(&self.url.parse()?).await?, ); Ok(Arc::new(GRPCBlobService::from_client( instance_name.to_string(), client, ))) } } pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> { /// The task containing the put request, and the inner writer, if we're still writing. task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>, /// The digest that has been returned, if we successfully closed. digest: Option<B3Digest>, } #[async_trait] impl<W: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static> BlobWriter for GRPCBlobWriter<W> { async fn close(&mut self) -> io::Result<B3Digest> { if self.task_and_writer.is_none() { // if we're already closed, return the b3 digest, which must exist. // If it doesn't, we already closed and failed once, and didn't handle the error. match &self.digest { Some(digest) => Ok(digest.clone()), None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "already closed")), } } else { let (task, mut writer) = self.task_and_writer.take().unwrap(); // invoke shutdown, so the inner writer closes its internal tx side of // the channel. writer.shutdown().await?; // block on the RPC call to return. // This ensures all chunks are sent out, and have been received by the // backend. match task.await? { Ok(resp) => { // return the digest from the response, and store it in self.digest for subsequent closes. let digest_len = resp.digest.len(); let digest: B3Digest = resp.digest.try_into().map_err(|_| { io::Error::new( io::ErrorKind::Other, format!("invalid root digest length {} in response", digest_len), ) })?; self.digest = Some(digest.clone()); Ok(digest) } Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), } } } } impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for GRPCBlobWriter<W> { fn poll_write( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8], ) -> std::task::Poll<Result<usize, io::Error>> { match &mut self.task_and_writer { None => Poll::Ready(Err(io::Error::new( io::ErrorKind::NotConnected, "already closed", ))), Some((_, ref mut writer)) => { let pinned_writer = pin!(writer); pinned_writer.poll_write(cx, buf) } } } fn poll_flush( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Result<(), io::Error>> { match &mut self.task_and_writer { None => Poll::Ready(Err(io::Error::new( io::ErrorKind::NotConnected, "already closed", ))), Some((_, ref mut writer)) => { let pinned_writer = pin!(writer); pinned_writer.poll_flush(cx) } } } fn poll_shutdown( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Result<(), io::Error>> { // TODO(raitobezarius): this might not be a graceful shutdown of the // channel inside the gRPC connection. Poll::Ready(Ok(())) } } #[cfg(test)] mod tests { use std::time::Duration; use tempfile::TempDir; use tokio::net::UnixListener; use tokio_retry::strategy::ExponentialBackoff; use tokio_retry::Retry; use tokio_stream::wrappers::UnixListenerStream; use crate::blobservice::MemoryBlobService; use crate::fixtures; use crate::proto::blob_service_client::BlobServiceClient; use crate::proto::GRPCBlobServiceWrapper; use super::BlobService; use super::GRPCBlobService; /// This ensures connecting via gRPC works as expected. #[tokio::test] async fn test_valid_unix_path_ping_pong() { let tmpdir = TempDir::new().unwrap(); let socket_path = tmpdir.path().join("daemon"); let path_clone = socket_path.clone(); // Spin up a server tokio::spawn(async { let uds = UnixListener::bind(path_clone).unwrap(); let uds_stream = UnixListenerStream::new(uds); // spin up a new server let mut server = tonic::transport::Server::builder(); let router = server.add_service(crate::proto::blob_service_server::BlobServiceServer::new( GRPCBlobServiceWrapper::new( Box::<MemoryBlobService>::default() as Box<dyn BlobService> ), )); router.serve_with_incoming(uds_stream).await }); // wait for the socket to be created Retry::spawn( ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), || async { if socket_path.exists() { Ok(()) } else { Err(()) } }, ) .await .expect("failed to wait for socket"); // prepare a client let grpc_client = { let url = url::Url::parse(&format!( "grpc+unix://{}?wait-connect=1", socket_path.display() )) .expect("must parse"); let client = BlobServiceClient::new( crate::tonic::channel_from_url(&url) .await .expect("must succeed"), ); GRPCBlobService::from_client("root".into(), client) }; let has = grpc_client .has(&fixtures::BLOB_A_DIGEST) .await .expect("must not be err"); assert!(!has); } }