diff options
Diffstat (limited to 'tvix/store/src/proto')
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 242 | ||||
-rw-r--r-- | tvix/store/src/proto/mod.rs | 2 | ||||
-rw-r--r-- | tvix/store/src/proto/sync_read_into_async_read.rs | 158 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/grpc_blobservice.rs | 174 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/grpc_pathinfoservice.rs | 12 |
5 files changed, 270 insertions, 318 deletions
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index e891caa0268e..d1e98a5b7953 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,54 +1,34 @@ -use std::collections::VecDeque; - use crate::{ - blobservice::BlobService, - chunkservice::{read_all_and_chunk, update_hasher, ChunkService}, - Error, + blobservice::{BlobService, BlobWriter}, + proto::sync_read_into_async_read::SyncReadIntoAsyncRead, }; use data_encoding::BASE64; -use tokio::{sync::mpsc::channel, task}; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use std::{collections::VecDeque, io, pin::Pin}; +use tokio::task; +use tokio_stream::StreamExt; +use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; -use tracing::{debug, instrument, warn}; +use tracing::{instrument, warn}; -pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> { +pub struct GRPCBlobServiceWrapper<BS: BlobService> { blob_service: BS, - chunk_service: CS, } -impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> { - pub fn new(blob_service: BS, chunk_service: CS) -> Self { +impl<BS: BlobService> From<BS> for GRPCBlobServiceWrapper<BS> { + fn from(value: BS) -> Self { Self { - blob_service, - chunk_service, - } - } - - // upload the chunk to the chunk service, and return its digest (or an error) when done. - #[instrument(skip(chunk_service))] - fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> { - let mut hasher = blake3::Hasher::new(); - update_hasher(&mut hasher, &chunk_data); - let digest = hasher.finalize(); - - if chunk_service.has(digest.as_bytes())? { - debug!("already has chunk, skipping"); + blob_service: value, } - let digest_resp = chunk_service.put(chunk_data)?; - - assert_eq!(&digest_resp, digest.as_bytes()); - - Ok(digest.as_bytes().to_vec()) } } #[async_trait] -impl< - BS: BlobService + Send + Sync + Clone + 'static, - CS: ChunkService + Send + Sync + Clone + 'static, - > super::blob_service_server::BlobService for GRPCBlobServiceWrapper<BS, CS> +impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server::BlobService + for GRPCBlobServiceWrapper<BS> { - type ReadStream = ReceiverStream<Result<super::BlobChunk, Status>>; + // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 + type ReadStream = + Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>; #[instrument(skip(self))] async fn stat( @@ -56,12 +36,22 @@ impl< request: Request<super::StatBlobRequest>, ) -> Result<Response<super::BlobMeta>, Status> { let rq = request.into_inner(); - match self.blob_service.stat(&rq) { - Ok(None) => Err(Status::not_found(format!( + let req_digest: [u8; 32] = rq + .digest + .clone() + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + if rq.include_chunks || rq.include_bao { + return Err(Status::internal("not implemented")); + } + + match self.blob_service.has(&req_digest) { + Ok(true) => Ok(Response::new(super::BlobMeta::default())), + Ok(false) => Err(Status::not_found(format!( "blob {} not found", - BASE64.encode(&rq.digest) + BASE64.encode(&req_digest) ))), - Ok(Some(blob_meta)) => Ok(Response::new(blob_meta)), Err(e) => Err(e.into()), } } @@ -71,99 +61,38 @@ impl< &self, request: Request<super::ReadBlobRequest>, ) -> Result<Response<Self::ReadStream>, Status> { - let req = request.into_inner(); - let (tx, rx) = channel(5); + let rq = request.into_inner(); - let req_digest: [u8; 32] = req + let req_digest: [u8; 32] = rq .digest + .clone() .try_into() - .map_err(|_e| Status::invalid_argument("invalid digest length"))?; - - // query the blob service for more detailed blob info - let stat_resp = self.blob_service.stat(&super::StatBlobRequest { - digest: req_digest.to_vec(), - include_chunks: true, - ..Default::default() - })?; - - match stat_resp { - None => { - // If the stat didn't return any blobmeta, the client might - // still have asked for a single chunk to be read. - // Check the chunkstore. - if let Some(data) = self.chunk_service.get(&req_digest)? { - // We already know the hash matches, and contrary to - // iterating over a blobmeta, we can't know the size, - // so send the contents of that chunk over, - // as the first (and only) element of the stream. - task::spawn(async move { - let res = Ok(super::BlobChunk { data }); - // send the result to the client. If the client already left, that's also fine. - if (tx.send(res).await).is_err() { - debug!("receiver dropped"); - } - }); - } else { - return Err(Status::not_found(format!( - "blob {} not found", - BASE64.encode(&req_digest), - ))); - } - } - Some(blobmeta) => { - let chunk_client = self.chunk_service.clone(); - - // TODO: use BlobReader? - // But then we might not be able to send compressed chunks as-is. - // Might require implementing https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html for it - // first, so we can .next().await in here. - - task::spawn(async move { - for chunkmeta in blobmeta.chunks { - // request chunk. - // We don't need to validate the digest again, as - // that's required for all implementations of ChunkService. - // TODO: handle error - let chunkmeta_digest = &chunkmeta.digest.try_into().unwrap(); - let res = match chunk_client.get(chunkmeta_digest) { - Err(e) => Err(e.into()), - // TODO: make this a separate error type - Ok(None) => Err(Error::StorageError(format!( - "consistency error: chunk {} for blob {} not found", - BASE64.encode(chunkmeta_digest), - BASE64.encode(&req_digest), - )) - .into()), - Ok(Some(data)) => { - // We already know the hash matches, but also - // check the size matches what chunkmeta said. - if data.len() as u32 != chunkmeta.size { - Err(Error::StorageError(format!( - "consistency error: chunk {} for blob {} has wrong size, expected {}, got {}", - BASE64.encode(chunkmeta_digest), - BASE64.encode(&req_digest), - chunkmeta.size, - data.len(), - )).into()) - } else { - // send out the current chunk - // TODO: we might want to break this up further if too big? - Ok(super::BlobChunk { data }) - } - } - }; - // send the result to the client - if (tx.send(res).await).is_err() { - debug!("receiver dropped"); - break; - } + .map_err(|_| Status::invalid_argument("invalid digest length"))?; + + match self.blob_service.open_read(&req_digest) { + Ok(Some(reader)) => { + let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into(); + + fn stream_mapper( + x: Result<bytes::Bytes, io::Error>, + ) -> Result<super::BlobChunk, Status> { + match x { + Ok(bytes) => Ok(super::BlobChunk { + data: bytes.to_vec(), + }), + Err(e) => Err(Status::from(e)), } - }); + } + + let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper); + Ok(Response::new(Box::pin(chunks_stream))) } + Ok(None) => Err(Status::not_found(format!( + "blob {} not found", + BASE64.encode(&rq.digest) + ))), + Err(e) => Err(e.into()), } - - let receiver_stream = ReceiverStream::new(rx); - Ok(Response::new(receiver_stream)) } #[instrument(skip(self))] @@ -180,37 +109,34 @@ impl< let data_reader = tokio_util::io::StreamReader::new(data_stream); - // TODO: can we get rid of this clone? - let chunk_service = self.chunk_service.clone(); - - let (blob_digest, blob_meta) = - task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> { - // feed read_all_and_chunk a (sync) reader to the data retrieved from the stream. - read_all_and_chunk( - &chunk_service, - tokio_util::io::SyncIoBridge::new(data_reader), - ) - }) - .await - .map_err(|e| Status::internal(e.to_string()))??; - - // upload blobmeta if not there yet - if self + // prepare a writer, which we'll use in the blocking task below. + let mut writer = self .blob_service - .stat(&super::StatBlobRequest { - digest: blob_digest.to_vec(), - include_chunks: false, - include_bao: false, - })? - .is_none() - { - // upload blobmeta - self.blob_service.put(&blob_digest, blob_meta)?; - } - - // return to client. - Ok(Response::new(super::PutBlobResponse { - digest: blob_digest, - })) + .open_write() + .map_err(|e| Status::internal(format!("unable to open for write: {}", e)))?; + + let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> { + // construct a sync reader to the data + let mut reader = tokio_util::io::SyncIoBridge::new(data_reader); + + io::copy(&mut reader, &mut writer).map_err(|e| { + warn!("error copying: {}", e); + Status::internal("error copying") + })?; + + let digest = writer + .close() + .map_err(|e| { + warn!("error closing stream: {}", e); + Status::internal("error closing stream") + })? + .to_vec(); + + Ok(super::PutBlobResponse { digest }) + }) + .await + .map_err(|_| Status::internal("failed to wait for task"))??; + + Ok(Response::new(result)) } } diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs index 5002d7e77a96..62741a3ad508 100644 --- a/tvix/store/src/proto/mod.rs +++ b/tvix/store/src/proto/mod.rs @@ -11,6 +11,8 @@ mod grpc_blobservice_wrapper; mod grpc_directoryservice_wrapper; mod grpc_pathinfoservice_wrapper; +mod sync_read_into_async_read; + pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper; diff --git a/tvix/store/src/proto/sync_read_into_async_read.rs b/tvix/store/src/proto/sync_read_into_async_read.rs new file mode 100644 index 000000000000..0a0ef019781c --- /dev/null +++ b/tvix/store/src/proto/sync_read_into_async_read.rs @@ -0,0 +1,158 @@ +use bytes::Buf; +use core::task::Poll::Ready; +use futures::ready; +use futures::Future; +use std::io; +use std::io::Read; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use tokio::io::AsyncRead; +use tokio::runtime::Handle; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; + +#[derive(Debug)] +enum State<Buf: bytes::Buf + bytes::BufMut> { + Idle(Option<Buf>), + Busy(JoinHandle<(io::Result<usize>, Buf)>), +} + +use State::{Busy, Idle}; + +/// Use a [`SyncReadIntoAsyncRead`] to asynchronously read from a +/// synchronous API. +#[derive(Debug)] +pub struct SyncReadIntoAsyncRead<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> { + state: Mutex<State<Buf>>, + reader: Arc<Mutex<R>>, + rt: Handle, +} + +impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> SyncReadIntoAsyncRead<R, Buf> { + /// This must be called from within a Tokio runtime context, or else it will panic. + #[track_caller] + pub fn new(rt: Handle, reader: R) -> Self { + Self { + rt, + state: State::Idle(None).into(), + reader: Arc::new(reader.into()), + } + } + + /// This must be called from within a Tokio runtime context, or else it will panic. + pub fn new_with_reader(readable: R) -> Self { + Self::new(Handle::current(), readable) + } +} + +/// Repeats operations that are interrupted. +macro_rules! uninterruptibly { + ($e:expr) => {{ + loop { + match $e { + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + res => break res, + } + } + }}; +} + +impl< + R: Read + Send + 'static + std::marker::Unpin, + Buf: bytes::Buf + bytes::BufMut + Send + Default + std::marker::Unpin + 'static, + > AsyncRead for SyncReadIntoAsyncRead<R, Buf> +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + dst: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + let me = self.get_mut(); + // Do we need this mutex? + let state = me.state.get_mut(); + + loop { + match state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap_or_default(); + + if buf.has_remaining() { + // Here, we will split the `buf` into `[..dst.remaining()... ; rest ]` + // The `rest` is stuffed into the `buf_cell` for further poll_read. + // The other is completely consumed into the unfilled destination. + // `rest` can be empty. + let mut adjusted_src = + buf.copy_to_bytes(std::cmp::min(buf.remaining(), dst.remaining())); + let copied_size = adjusted_src.remaining(); + adjusted_src.copy_to_slice(dst.initialize_unfilled_to(copied_size)); + dst.set_filled(copied_size); + *buf_cell = Some(buf); + return Ready(Ok(())); + } + + let reader = me.reader.clone(); + *state = Busy(me.rt.spawn_blocking(move || { + let result = uninterruptibly!(reader.blocking_lock().read( + // SAFETY: `reader.read` will *ONLY* write initialized bytes + // and never *READ* uninitialized bytes + // inside this buffer. + // + // Furthermore, casting the slice as `*mut [u8]` + // is safe because it has the same layout. + // + // Finally, the pointer obtained is valid and owned + // by `buf` only as we have a valid mutable reference + // to it, it is valid for write. + // + // Here, we copy an nightly API: https://doc.rust-lang.org/stable/src/core/mem/maybe_uninit.rs.html#994-998 + unsafe { + &mut *(buf.chunk_mut().as_uninit_slice_mut() + as *mut [std::mem::MaybeUninit<u8>] + as *mut [u8]) + } + )); + + if let Ok(n) = result { + // SAFETY: given we initialize `n` bytes, we can move `n` bytes + // forward. + unsafe { + buf.advance_mut(n); + } + } + + (result, buf) + })); + } + Busy(ref mut rx) => { + let (result, mut buf) = ready!(Pin::new(rx).poll(cx))?; + + match result { + Ok(n) => { + if n > 0 { + let remaining = std::cmp::min(n, dst.remaining()); + let mut adjusted_src = buf.copy_to_bytes(remaining); + adjusted_src.copy_to_slice(dst.initialize_unfilled_to(remaining)); + dst.advance(remaining); + } + *state = Idle(Some(buf)); + return Ready(Ok(())); + } + Err(e) => { + *state = Idle(None); + return Ready(Err(e)); + } + } + } + } + } + } +} + +impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> From<R> for SyncReadIntoAsyncRead<R, Buf> { + /// This must be called from within a Tokio runtime context, or else it will panic. + fn from(value: R) -> Self { + Self::new_with_reader(value) + } +} diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs index 88eb21e30241..80e2e71867a3 100644 --- a/tvix/store/src/proto/tests/grpc_blobservice.rs +++ b/tvix/store/src/proto/tests/grpc_blobservice.rs @@ -1,18 +1,14 @@ use crate::blobservice::BlobService; -use crate::chunkservice::ChunkService; -use crate::proto::blob_meta::ChunkMeta; use crate::proto::blob_service_server::BlobService as GRPCBlobService; use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest}; -use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST}; -use crate::tests::utils::{gen_blob_service, gen_chunk_service}; +use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST}; +use crate::tests::utils::gen_blob_service; +use tokio_stream::StreamExt; -fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper< - impl BlobService + Send + Sync + Clone + 'static, - impl ChunkService + Send + Sync + Clone + 'static, -> { +fn gen_grpc_blob_service( +) -> GRPCBlobServiceWrapper<impl BlobService + Send + Sync + Clone + 'static> { let blob_service = gen_blob_service(); - let chunk_service = gen_chunk_service(); - GRPCBlobServiceWrapper::new(blob_service, chunk_service) + GRPCBlobServiceWrapper::from(blob_service) } /// Trying to read a non-existent blob should return a not found error. @@ -26,8 +22,13 @@ async fn not_found_read() { })) .await; - let e = resp.expect_err("must_be_err"); - assert_eq!(e.code(), tonic::Code::NotFound); + // We can't use unwrap_err here, because the Ok value doesn't implement + // debug. + if let Err(e) = resp { + assert_eq!(e.code(), tonic::Code::NotFound); + } else { + panic!("resp is not err") + } } /// Trying to stat a non-existent blob should return a not found error. @@ -47,8 +48,7 @@ async fn not_found_stat() { assert_eq!(resp.code(), tonic::Code::NotFound); } -/// Put a blob in the store, get it back. We send something small enough so it -/// won't get split into multiple chunks. +/// Put a blob in the store, get it back. #[tokio::test] async fn put_read_stat() { let service = gen_grpc_blob_service(); @@ -64,39 +64,30 @@ async fn put_read_stat() { assert_eq!(BLOB_A_DIGEST.to_vec(), put_resp.digest); - // Stat for the digest of A. It should return one chunk. + // Stat for the digest of A. + // We currently don't ask for more granular chunking data, as we don't + // expose it yet. let resp = service .stat(tonic::Request::new(StatBlobRequest { digest: BLOB_A_DIGEST.to_vec(), - include_chunks: true, ..Default::default() })) .await .expect("must succeed") .into_inner(); - assert_eq!(1, resp.chunks.len()); - // the `chunks` field should point to the single chunk. - assert_eq!( - vec![ChunkMeta { - digest: BLOB_A_DIGEST.to_vec(), - size: BLOB_A.len() as u32, - }], - resp.chunks, - ); - - // Read the chunk. It should return the same data. + // Read the blob. It should return the same data. let resp = service .read(tonic::Request::new(ReadBlobRequest { digest: BLOB_A_DIGEST.to_vec(), })) .await; - let mut rx = resp.expect("must succeed").into_inner().into_inner(); + let mut rx = resp.ok().unwrap().into_inner(); // the stream should contain one element, a BlobChunk with the same contents as BLOB_A. let item = rx - .recv() + .next() .await .expect("must be some") .expect("must succeed"); @@ -104,127 +95,8 @@ async fn put_read_stat() { assert_eq!(BLOB_A.to_vec(), item.data); // … and no more elements - assert!(rx.recv().await.is_none()); -} - -/// Put a bigger blob in the store, and get it back. -/// Assert the stat request actually returns more than one chunk, and -/// we can read each chunk individually, as well as the whole blob via the -/// `read()` method. -#[tokio::test] -async fn put_read_stat_large() { - let service = gen_grpc_blob_service(); - - // split up BLOB_B into BlobChunks containing 1K bytes each. - let blob_b_blobchunks: Vec<BlobChunk> = BLOB_B - .chunks(1024) - .map(|x| BlobChunk { data: x.to_vec() }) - .collect(); - - assert!(blob_b_blobchunks.len() > 1); + assert!(rx.next().await.is_none()); - // Send blob B - let put_resp = service - .put(tonic_mock::streaming_request(blob_b_blobchunks)) - .await - .expect("must succeed") - .into_inner(); - - assert_eq!(BLOB_B_DIGEST.to_vec(), put_resp.digest); - - // Stat for the digest of B - let resp = service - .stat(tonic::Request::new(StatBlobRequest { - digest: BLOB_B_DIGEST.to_vec(), - include_chunks: true, - ..Default::default() - })) - .await - .expect("must succeed") - .into_inner(); - - // it should return more than one chunk. - assert_ne!(1, resp.chunks.len()); - - // The size added up should equal the size of BLOB_B. - let mut size_in_stat: u32 = 0; - for chunk in &resp.chunks { - size_in_stat += chunk.size - } - assert_eq!(BLOB_B.len() as u32, size_in_stat); - - // Chunks are chunked up the same way we would do locally, when initializing the chunker with the same values. - // TODO: make the chunker config better accessible, so we don't need to synchronize this. - { - let chunker_avg_size = 64 * 1024; - let chunker_min_size = chunker_avg_size / 4; - let chunker_max_size = chunker_avg_size * 4; - - // initialize a chunker with the current buffer - let blob_b = BLOB_B.to_vec(); - let chunker = fastcdc::v2020::FastCDC::new( - &blob_b, - chunker_min_size, - chunker_avg_size, - chunker_max_size, - ); - - let mut num_chunks = 0; - for (i, chunk) in chunker.enumerate() { - assert_eq!( - resp.chunks[i].size, chunk.length as u32, - "expected locally-chunked chunk length to match stat response" - ); - - num_chunks += 1; - } - - assert_eq!( - resp.chunks.len(), - num_chunks, - "expected number of chunks to match" - ); - } - - // Reading the whole blob by its digest via the read() interface should succeed. - { - let resp = service - .read(tonic::Request::new(ReadBlobRequest { - digest: BLOB_B_DIGEST.to_vec(), - })) - .await; - - let mut rx = resp.expect("must succeed").into_inner().into_inner(); - - let mut buf: Vec<u8> = Vec::new(); - while let Some(item) = rx.recv().await { - let mut blob_chunk = item.expect("must not be err"); - buf.append(&mut blob_chunk.data); - } - - assert_eq!(BLOB_B.to_vec(), buf); - } - - // Reading the whole blob by reading individual chunks should also succeed. - { - let mut buf: Vec<u8> = Vec::new(); - for chunk in &resp.chunks { - // request this individual chunk via read - let resp = service - .read(tonic::Request::new(ReadBlobRequest { - digest: chunk.digest.clone(), - })) - .await; - - let mut rx = resp.expect("must succeed").into_inner().into_inner(); - - // append all items from the stream to the buffer - while let Some(item) = rx.recv().await { - let mut blob_chunk = item.expect("must not be err"); - buf.append(&mut blob_chunk.data); - } - } - // finished looping over all chunks, compare - assert_eq!(BLOB_B.to_vec(), buf); - } + // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks. + // Test with some bigger blob too } diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs index f74c3fda0213..11cab2c264cc 100644 --- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs +++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs @@ -6,12 +6,10 @@ use crate::proto::GRPCPathInfoServiceWrapper; use crate::proto::PathInfo; use crate::proto::{GetPathInfoRequest, Node, SymlinkNode}; use crate::tests::fixtures::DUMMY_OUTPUT_HASH; -use crate::tests::utils::{ - gen_blob_service, gen_chunk_service, gen_directory_service, gen_pathinfo_service, -}; +use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service}; use tonic::Request; -/// generates a GRPCPathInfoService out of blob, chunk, directory and pathinfo services. +/// generates a GRPCPathInfoService out of blob, directory and pathinfo services. /// /// We only interact with it via the PathInfo GRPC interface. /// It uses the NonCachingNARCalculationService NARCalculationService to @@ -19,11 +17,7 @@ use tonic::Request; fn gen_grpc_service() -> impl GRPCPathInfoService { GRPCPathInfoServiceWrapper::new( gen_pathinfo_service(), - NonCachingNARCalculationService::new( - gen_blob_service(), - gen_chunk_service(), - gen_directory_service(), - ), + NonCachingNARCalculationService::new(gen_blob_service(), gen_directory_service()), ) } |