diff options
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 71 |
1 files changed, 69 insertions, 2 deletions
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index f1ab3a87e477..04097997c2e4 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,7 +1,13 @@ use crate::{ blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead, B3Digest, }; -use std::{collections::VecDeque, io, pin::Pin, sync::Arc}; +use std::{ + collections::VecDeque, + io, + ops::{Deref, DerefMut}, + pin::Pin, + sync::Arc, +}; use tokio::task; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; @@ -20,6 +26,64 @@ impl From<Arc<dyn BlobService>> for GRPCBlobServiceWrapper { } } +// This is necessary because bytes::BytesMut comes up with +// a default 64 bytes capacity that cannot be changed +// easily if you assume a bytes::BufMut trait implementation +// Therefore, we override the Default implementation here +// TODO(raitobezarius?): upstream me properly +struct BytesMutWithDefaultCapacity<const N: usize> { + inner: bytes::BytesMut, +} + +impl<const N: usize> Deref for BytesMutWithDefaultCapacity<N> { + type Target = bytes::BytesMut; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl<const N: usize> DerefMut for BytesMutWithDefaultCapacity<N> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl<const N: usize> Default for BytesMutWithDefaultCapacity<N> { + fn default() -> Self { + BytesMutWithDefaultCapacity { + inner: bytes::BytesMut::with_capacity(N), + } + } +} + +impl<const N: usize> bytes::Buf for BytesMutWithDefaultCapacity<N> { + fn remaining(&self) -> usize { + self.inner.remaining() + } + + fn chunk(&self) -> &[u8] { + self.inner.chunk() + } + + fn advance(&mut self, cnt: usize) { + self.inner.advance(cnt); + } +} + +unsafe impl<const N: usize> bytes::BufMut for BytesMutWithDefaultCapacity<N> { + fn remaining_mut(&self) -> usize { + self.inner.remaining_mut() + } + + unsafe fn advance_mut(&mut self, cnt: usize) { + self.inner.advance_mut(cnt); + } + + fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { + self.inner.chunk_mut() + } +} + #[async_trait] impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 @@ -58,7 +122,10 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { match self.blob_service.open_read(&req_digest) { Ok(Some(reader)) => { - let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into(); + let async_reader: SyncReadIntoAsyncRead< + _, + BytesMutWithDefaultCapacity<{ 100 * 1024 }>, + > = reader.into(); fn stream_mapper( x: Result<bytes::Bytes, io::Error>, |