about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
Diffstat (limited to 'tvix')
-rw-r--r--tvix/store/src/proto/grpc_blobservice_wrapper.rs71
1 files changed, 69 insertions, 2 deletions
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index f1ab3a87e477..04097997c2e4 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,7 +1,13 @@
 use crate::{
     blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead, B3Digest,
 };
-use std::{collections::VecDeque, io, pin::Pin, sync::Arc};
+use std::{
+    collections::VecDeque,
+    io,
+    ops::{Deref, DerefMut},
+    pin::Pin,
+    sync::Arc,
+};
 use tokio::task;
 use tokio_stream::StreamExt;
 use tokio_util::io::ReaderStream;
@@ -20,6 +26,64 @@ impl From<Arc<dyn BlobService>> for GRPCBlobServiceWrapper {
     }
 }
 
+// This is necessary because bytes::BytesMut comes up with
+// a default 64 bytes capacity that cannot be changed
+// easily if you assume a bytes::BufMut trait implementation
+// Therefore, we override the Default implementation here
+// TODO(raitobezarius?): upstream me properly
+struct BytesMutWithDefaultCapacity<const N: usize> {
+    inner: bytes::BytesMut,
+}
+
+impl<const N: usize> Deref for BytesMutWithDefaultCapacity<N> {
+    type Target = bytes::BytesMut;
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl<const N: usize> DerefMut for BytesMutWithDefaultCapacity<N> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.inner
+    }
+}
+
+impl<const N: usize> Default for BytesMutWithDefaultCapacity<N> {
+    fn default() -> Self {
+        BytesMutWithDefaultCapacity {
+            inner: bytes::BytesMut::with_capacity(N),
+        }
+    }
+}
+
+impl<const N: usize> bytes::Buf for BytesMutWithDefaultCapacity<N> {
+    fn remaining(&self) -> usize {
+        self.inner.remaining()
+    }
+
+    fn chunk(&self) -> &[u8] {
+        self.inner.chunk()
+    }
+
+    fn advance(&mut self, cnt: usize) {
+        self.inner.advance(cnt);
+    }
+}
+
+unsafe impl<const N: usize> bytes::BufMut for BytesMutWithDefaultCapacity<N> {
+    fn remaining_mut(&self) -> usize {
+        self.inner.remaining_mut()
+    }
+
+    unsafe fn advance_mut(&mut self, cnt: usize) {
+        self.inner.advance_mut(cnt);
+    }
+
+    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
+        self.inner.chunk_mut()
+    }
+}
+
 #[async_trait]
 impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
     // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
@@ -58,7 +122,10 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
 
         match self.blob_service.open_read(&req_digest) {
             Ok(Some(reader)) => {
-                let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into();
+                let async_reader: SyncReadIntoAsyncRead<
+                    _,
+                    BytesMutWithDefaultCapacity<{ 100 * 1024 }>,
+                > = reader.into();
 
                 fn stream_mapper(
                     x: Result<bytes::Bytes, io::Error>,