diff options
Diffstat (limited to 'tvix/store/src/blobservice/grpc.rs')
-rw-r--r-- | tvix/store/src/blobservice/grpc.rs | 96 |
1 files changed, 64 insertions, 32 deletions
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index 0b08fbf46ad9..46ec64bce785 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -47,9 +47,6 @@ impl GRPCBlobService { } impl BlobService for GRPCBlobService { - type BlobReader = Box<dyn io::Read + Send>; - type BlobWriter = GRPCBlobWriter; - #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. @@ -76,7 +73,10 @@ impl BlobService for GRPCBlobService { // On success, this returns a Ok(Some(io::Read)), which can be used to read // the contents of the Blob, identified by the digest. - fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, crate::Error> { + fn open_read( + &self, + digest: &B3Digest, + ) -> Result<Option<Box<dyn io::Read + Send>>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest = digest.clone(); @@ -123,7 +123,7 @@ impl BlobService for GRPCBlobService { /// Returns a [Self::BlobWriter], that'll internally wrap each write in a // [proto::BlobChunk] and which is passed to the - fn open_write(&self) -> Result<Self::BlobWriter, crate::Error> { + fn open_write(&self) -> Result<Box<dyn BlobWriter>, crate::Error> { let mut grpc_client = self.grpc_client.clone(); // set up an mpsc channel passing around Bytes. @@ -155,11 +155,11 @@ impl BlobService for GRPCBlobService { // … which is then turned into a [io::Write]. let writer = SyncIoBridge::new(async_writer); - Ok(GRPCBlobWriter { + Ok(Box::new(GRPCBlobWriter { tokio_handle: self.tokio_handle.clone(), // TODO: is the clone() ok here? - task, - inner_writer: writer, - }) + task_and_writer: Some((task, writer)), + digest: None, + })) } } @@ -176,42 +176,74 @@ pub struct GRPCBlobWriter { /// containing the put request. tokio_handle: tokio::runtime::Handle, - /// The task containing the put request. - task: JoinHandle<Result<proto::PutBlobResponse, Status>>, + /// The task containing the put request, and the inner writer, if we're still writing. + task_and_writer: Option<( + JoinHandle<Result<proto::PutBlobResponse, Status>>, + BridgedWriter, + )>, - /// The inner Writer. - inner_writer: BridgedWriter, + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, } impl BlobWriter for GRPCBlobWriter { - fn close(mut self) -> Result<B3Digest, crate::Error> { - // invoke shutdown, so the inner writer closes its internal tx side of - // the channel. - self.inner_writer - .shutdown() - .map_err(|e| crate::Error::StorageError(e.to_string()))?; - - // block on the RPC call to return. - // This ensures all chunks are sent out, and have been received by the - // backend. - match self.tokio_handle.block_on(self.task)? { - Ok(resp) => { - // return the digest from the response. - B3Digest::from_vec(resp.digest).map_err(|_| { - crate::Error::StorageError("invalid root digest length in response".to_string()) - }) + fn close(&mut self) -> Result<B3Digest, crate::Error> { + if self.task_and_writer.is_none() { + // if we're already closed, return the b3 digest, which must exist. + // If it doesn't, we already closed and failed once, and didn't handle the error. + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (task, mut writer) = self.task_and_writer.take().unwrap(); + + // invoke shutdown, so the inner writer closes its internal tx side of + // the channel. + writer + .shutdown() + .map_err(|e| crate::Error::StorageError(e.to_string()))?; + + // block on the RPC call to return. + // This ensures all chunks are sent out, and have been received by the + // backend. + match self.tokio_handle.block_on(task)? { + Ok(resp) => { + // return the digest from the response, and store it in self.digest for subsequent closes. + let digest = B3Digest::from_vec(resp.digest).map_err(|_| { + crate::Error::StorageError( + "invalid root digest length in response".to_string(), + ) + })?; + self.digest = Some(digest.clone()); + Ok(digest) + } + Err(e) => Err(crate::Error::StorageError(e.to_string())), } - Err(e) => Err(crate::Error::StorageError(e.to_string())), } } } impl io::Write for GRPCBlobWriter { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - self.inner_writer.write(buf) + match &mut self.task_and_writer { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((_, ref mut writer)) => writer.write(buf), + } } fn flush(&mut self) -> io::Result<()> { - self.inner_writer.flush() + match &mut self.task_and_writer { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((_, ref mut writer)) => writer.flush(), + } } } |