diff options
author | Florian Klink <flokli@flokli.de> | 2023-05-25T14·52+0300 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2023-06-12T10·15+0000 |
commit | 27ff98000b0cdf0ed30eb8837c7d44cd3e79d32f (patch) | |
tree | 09fcb40135001d35717ce176d8b473f5e634bdcf /tvix | |
parent | 5139cc45c2ce1736509f3f0ebf68a71c10ace939 (diff) |
feat(tvix/store): eliminate generics in BlobStore r/6269
To construct various stores at runtime, we need to eliminate associated types from the BlobService trait, and return Box<dyn …> instead of specific types. This also means we can't consume self in the close() method, so everything we write to is put in an Option<>, and during the first close we take from there. Change-Id: Ia523b6ab2f2a5276f51cb5d17e81a5925bce69b6 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8647 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/cli/src/main.rs | 4 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 7 | ||||
-rw-r--r-- | tvix/store/src/blobservice/grpc.rs | 96 | ||||
-rw-r--r-- | tvix/store/src/blobservice/memory.rs | 89 | ||||
-rw-r--r-- | tvix/store/src/blobservice/mod.rs | 15 | ||||
-rw-r--r-- | tvix/store/src/blobservice/sled.rs | 81 | ||||
-rw-r--r-- | tvix/store/src/import.rs | 13 | ||||
-rw-r--r-- | tvix/store/src/nar/non_caching_calculation_service.rs | 13 | ||||
-rw-r--r-- | tvix/store/src/nar/renderer.rs | 9 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 16 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/grpc_blobservice.rs | 4 | ||||
-rw-r--r-- | tvix/store/src/store_io.rs | 19 | ||||
-rw-r--r-- | tvix/store/src/tests/import.rs | 1 | ||||
-rw-r--r-- | tvix/store/src/tests/nar_renderer.rs | 2 | ||||
-rw-r--r-- | tvix/store/src/tests/utils.rs | 4 |
15 files changed, 230 insertions, 143 deletions
diff --git a/tvix/cli/src/main.rs b/tvix/cli/src/main.rs index 28b75dd907aa..459177717c5a 100644 --- a/tvix/cli/src/main.rs +++ b/tvix/cli/src/main.rs @@ -75,14 +75,14 @@ fn interpret(code: &str, path: Option<PathBuf>, args: &Args, explain: bool) -> b let directory_service = MemoryDirectoryService::default(); let path_info_service = MemoryPathInfoService::default(); let nar_calculation_service = tvix_store::nar::NonCachingNARCalculationService::new( - blob_service.clone(), + Box::new(blob_service.clone()), directory_service.clone(), ); eval.io_handle = Box::new(tvix_io::TvixIO::new( known_paths.clone(), tvix_store::TvixStoreIO::new( - blob_service, + Box::new(blob_service), directory_service, path_info_service, nar_calculation_service, diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index ce2701157110..8fe62e5b5b2e 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -6,6 +6,7 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use tracing_subscriber::prelude::*; +use tvix_store::blobservice::BlobService; use tvix_store::blobservice::GRPCBlobService; use tvix_store::blobservice::SledBlobService; use tvix_store::directoryservice::GRPCDirectoryService; @@ -112,14 +113,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut server = Server::builder(); let nar_calculation_service = NonCachingNARCalculationService::new( - blob_service.clone(), + Box::new(blob_service.clone()), directory_service.clone(), ); #[allow(unused_mut)] let mut router = server .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( - blob_service, + Box::new(blob_service) as Box<dyn BlobService>, ))) .add_service(DirectoryServiceServer::new( GRPCDirectoryServiceWrapper::from(directory_service), @@ -156,7 +157,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { GRPCNARCalculationService::from_client(path_info_service_client); let io = Arc::new(TvixStoreIO::new( - blob_service, + Box::new(blob_service), directory_service, path_info_service, nar_calculation_service, diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index 0b08fbf46ad9..46ec64bce785 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -47,9 +47,6 @@ impl GRPCBlobService { } impl BlobService for GRPCBlobService { - type BlobReader = Box<dyn io::Read + Send>; - type BlobWriter = GRPCBlobWriter; - #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. @@ -76,7 +73,10 @@ impl BlobService for GRPCBlobService { // On success, this returns a Ok(Some(io::Read)), which can be used to read // the contents of the Blob, identified by the digest. - fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, crate::Error> { + fn open_read( + &self, + digest: &B3Digest, + ) -> Result<Option<Box<dyn io::Read + Send>>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest = digest.clone(); @@ -123,7 +123,7 @@ impl BlobService for GRPCBlobService { /// Returns a [Self::BlobWriter], that'll internally wrap each write in a // [proto::BlobChunk] and which is passed to the - fn open_write(&self) -> Result<Self::BlobWriter, crate::Error> { + fn open_write(&self) -> Result<Box<dyn BlobWriter>, crate::Error> { let mut grpc_client = self.grpc_client.clone(); // set up an mpsc channel passing around Bytes. @@ -155,11 +155,11 @@ impl BlobService for GRPCBlobService { // … which is then turned into a [io::Write]. let writer = SyncIoBridge::new(async_writer); - Ok(GRPCBlobWriter { + Ok(Box::new(GRPCBlobWriter { tokio_handle: self.tokio_handle.clone(), // TODO: is the clone() ok here? - task, - inner_writer: writer, - }) + task_and_writer: Some((task, writer)), + digest: None, + })) } } @@ -176,42 +176,74 @@ pub struct GRPCBlobWriter { /// containing the put request. tokio_handle: tokio::runtime::Handle, - /// The task containing the put request. - task: JoinHandle<Result<proto::PutBlobResponse, Status>>, + /// The task containing the put request, and the inner writer, if we're still writing. + task_and_writer: Option<( + JoinHandle<Result<proto::PutBlobResponse, Status>>, + BridgedWriter, + )>, - /// The inner Writer. - inner_writer: BridgedWriter, + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, } impl BlobWriter for GRPCBlobWriter { - fn close(mut self) -> Result<B3Digest, crate::Error> { - // invoke shutdown, so the inner writer closes its internal tx side of - // the channel. - self.inner_writer - .shutdown() - .map_err(|e| crate::Error::StorageError(e.to_string()))?; - - // block on the RPC call to return. - // This ensures all chunks are sent out, and have been received by the - // backend. - match self.tokio_handle.block_on(self.task)? { - Ok(resp) => { - // return the digest from the response. - B3Digest::from_vec(resp.digest).map_err(|_| { - crate::Error::StorageError("invalid root digest length in response".to_string()) - }) + fn close(&mut self) -> Result<B3Digest, crate::Error> { + if self.task_and_writer.is_none() { + // if we're already closed, return the b3 digest, which must exist. + // If it doesn't, we already closed and failed once, and didn't handle the error. + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (task, mut writer) = self.task_and_writer.take().unwrap(); + + // invoke shutdown, so the inner writer closes its internal tx side of + // the channel. + writer + .shutdown() + .map_err(|e| crate::Error::StorageError(e.to_string()))?; + + // block on the RPC call to return. + // This ensures all chunks are sent out, and have been received by the + // backend. + match self.tokio_handle.block_on(task)? { + Ok(resp) => { + // return the digest from the response, and store it in self.digest for subsequent closes. + let digest = B3Digest::from_vec(resp.digest).map_err(|_| { + crate::Error::StorageError( + "invalid root digest length in response".to_string(), + ) + })?; + self.digest = Some(digest.clone()); + Ok(digest) + } + Err(e) => Err(crate::Error::StorageError(e.to_string())), } - Err(e) => Err(crate::Error::StorageError(e.to_string())), } } } impl io::Write for GRPCBlobWriter { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - self.inner_writer.write(buf) + match &mut self.task_and_writer { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((_, ref mut writer)) => writer.write(buf), + } } fn flush(&mut self) -> io::Result<()> { - self.inner_writer.flush() + match &mut self.task_and_writer { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((_, ref mut writer)) => writer.flush(), + } } } diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index 1ee59d108743..166eeabdb6a2 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -1,4 +1,4 @@ -use std::io::Cursor; +use std::io::{self, Cursor}; use std::{ collections::HashMap, sync::{Arc, RwLock}, @@ -14,63 +14,102 @@ pub struct MemoryBlobService { } impl BlobService for MemoryBlobService { - type BlobReader = Cursor<Vec<u8>>; - type BlobWriter = MemoryBlobWriter; - #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result<bool, Error> { let db = self.db.read().unwrap(); Ok(db.contains_key(digest)) } - fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> { + fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error> { let db = self.db.read().unwrap(); - Ok(db.get(digest).map(|x| Cursor::new(x.clone()))) + match db.get(digest).map(|x| Cursor::new(x.clone())) { + Some(result) => Ok(Some(Box::new(result))), + None => Ok(None), + } } #[instrument(skip(self))] - fn open_write(&self) -> Result<Self::BlobWriter, Error> { - Ok(MemoryBlobWriter::new(self.db.clone())) + fn open_write(&self) -> Result<Box<dyn BlobWriter>, Error> { + Ok(Box::new(MemoryBlobWriter::new(self.db.clone()))) } } pub struct MemoryBlobWriter { db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, - buf: Vec<u8>, + /// Contains the Vec and hasher, or None if already closed + writers: Option<(Vec<u8>, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, } impl MemoryBlobWriter { fn new(db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>) -> Self { Self { - buf: Vec::new(), db, + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, } } } impl std::io::Write for MemoryBlobWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { - self.buf.write(buf) + fn write(&mut self, b: &[u8]) -> std::io::Result<usize> { + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&buf[..bytes_written]) + } + } } fn flush(&mut self) -> std::io::Result<()> { - self.buf.flush() + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + } } } impl BlobWriter for MemoryBlobWriter { - fn close(self) -> Result<B3Digest, Error> { - // in this memory implementation, we don't actually bother hashing - // incrementally while writing, but do it at the end. - let mut hasher = blake3::Hasher::new(); - hasher.update(&self.buf); - let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); - - // open the database for writing. - let mut db = self.db.write()?; - db.insert(digest.clone(), self.buf); - - Ok(digest) + fn close(&mut self) -> Result<B3Digest, Error> { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + // We know self.hasher is doing blake3 hashing, so this won't fail. + let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + + // Only insert if the blob doesn't already exist. + let db = self.db.read()?; + if !db.contains_key(&digest) { + // drop the read lock, so we can open for writing. + drop(db); + + // open the database for writing. + let mut db = self.db.write()?; + + // and put buf in there. This will move buf out. + db.insert(digest.clone(), buf); + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } } } diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs index c5a2de124656..08565285c0d1 100644 --- a/tvix/store/src/blobservice/mod.rs +++ b/tvix/store/src/blobservice/mod.rs @@ -14,28 +14,25 @@ pub use self::sled::SledBlobService; /// It provides functions to check whether a given blob exists, /// a way to get a [io::Read] to a blob, and a method to initiate writing a new /// Blob, which returns a [BlobWriter], that can be used -pub trait BlobService { - type BlobReader: io::Read + Send + std::marker::Unpin; - type BlobWriter: BlobWriter + Send; - +pub trait BlobService: Send + Sync { /// Check if the service has the blob, by its content hash. fn has(&self, digest: &B3Digest) -> Result<bool, Error>; /// Request a blob from the store, by its content hash. Returns a Option<BlobReader>. - fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error>; + fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error>; /// Insert a new blob into the store. Returns a [BlobWriter], which /// implements [io::Write] and a [BlobWriter::close]. /// TODO: is there any reason we want this to be a Result<>, and not just T? - fn open_write(&self) -> Result<Self::BlobWriter, Error>; + fn open_write(&self) -> Result<Box<dyn BlobWriter>, Error>; } /// A [io::Write] that you need to close() afterwards, and get back the digest /// of the written blob. -pub trait BlobWriter: io::Write { +pub trait BlobWriter: io::Write + Send + Sync + 'static { /// Signal there's no more data to be written, and return the digest of the /// contents written. /// - /// This consumes self, so it's not possible to close twice. - fn close(self) -> Result<B3Digest, Error>; + /// Closing a already-closed BlobWriter is a no-op. + fn close(&mut self) -> Result<B3Digest, Error>; } diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 2b090335344d..3b130fcd9a05 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -28,9 +28,6 @@ impl SledBlobService { } impl BlobService for SledBlobService { - type BlobReader = Cursor<Vec<u8>>; - type BlobWriter = SledBlobWriter; - #[instrument(skip(self), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result<bool, Error> { match self.db.contains_key(digest.to_vec()) { @@ -40,55 +37,93 @@ impl BlobService for SledBlobService { } #[instrument(skip(self), fields(blob.digest=%digest))] - fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> { + fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error> { match self.db.get(digest.to_vec()) { Ok(None) => Ok(None), - Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))), + Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))), Err(e) => Err(Error::StorageError(e.to_string())), } } #[instrument(skip(self))] - fn open_write(&self) -> Result<Self::BlobWriter, Error> { - Ok(SledBlobWriter::new(self.db.clone())) + fn open_write(&self) -> Result<Box<dyn BlobWriter>, Error> { + Ok(Box::new(SledBlobWriter::new(self.db.clone()))) } } pub struct SledBlobWriter { db: sled::Db, - buf: Vec<u8>, - hasher: blake3::Hasher, + + /// Contains the Vec and hasher, or None if already closed + writers: Option<(Vec<u8>, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, } impl SledBlobWriter { pub fn new(db: sled::Db) -> Self { Self { - buf: Vec::default(), db, - hasher: blake3::Hasher::new(), + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, } } } impl io::Write for SledBlobWriter { - fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - let bytes_written = self.buf.write(buf)?; - self.hasher.write(&buf[..bytes_written]) + fn write(&mut self, b: &[u8]) -> io::Result<usize> { + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&buf[..bytes_written]) + } + } } fn flush(&mut self) -> io::Result<()> { - self.buf.flush() + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + } } } impl BlobWriter for SledBlobWriter { - fn close(self) -> Result<B3Digest, Error> { - let digest = self.hasher.finalize(); - self.db - .insert(digest.as_bytes(), self.buf) - .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; - - // We know self.hasher is doing blake3 hashing, so this won't fail. - Ok(B3Digest::from_vec(digest.as_bytes().to_vec()).unwrap()) + fn close(&mut self) -> Result<B3Digest, Error> { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + // We know self.hasher is doing blake3 hashing, so this won't fail. + let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + + // Only insert if the blob doesn't already exist. + if !self.db.contains_key(&digest.to_vec()).map_err(|e| { + Error::StorageError(format!("Unable to check if we have blob {}: {}", digest, e)) + })? { + // put buf in there. This will move buf out. + self.db + .insert(digest.to_vec(), buf) + .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } } } diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index 206e5eaba975..a0bd1de5e149 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -1,5 +1,6 @@ -use crate::{blobservice::BlobService, directoryservice::DirectoryService}; -use crate::{blobservice::BlobWriter, directoryservice::DirectoryPutter, proto}; +use crate::blobservice::BlobService; +use crate::directoryservice::DirectoryService; +use crate::{directoryservice::DirectoryPutter, proto}; use std::{ collections::HashMap, fmt::Debug, @@ -55,8 +56,8 @@ impl From<super::Error> for Error { // // It assumes the caller adds returned nodes to the directories it assembles. #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] -fn process_entry<BS: BlobService, DP: DirectoryPutter>( - blob_service: &BS, +fn process_entry<DP: DirectoryPutter>( + blob_service: &Box<dyn BlobService>, directory_putter: &mut DP, entry: &walkdir::DirEntry, maybe_directory: Option<proto::Directory>, @@ -144,8 +145,8 @@ fn process_entry<BS: BlobService, DP: DirectoryPutter>( /// possibly register it somewhere (and potentially rename it based on some /// naming scheme. #[instrument(skip(blob_service, directory_service), fields(path=?p))] -pub fn ingest_path<BS: BlobService, DS: DirectoryService, P: AsRef<Path> + Debug>( - blob_service: &BS, +pub fn ingest_path<DS: DirectoryService, P: AsRef<Path> + Debug>( + blob_service: &Box<dyn BlobService>, directory_service: &DS, p: P, ) -> Result<proto::node::Node, Error> { diff --git a/tvix/store/src/nar/non_caching_calculation_service.rs b/tvix/store/src/nar/non_caching_calculation_service.rs index 8a080cb4df5e..b743f264b0ff 100644 --- a/tvix/store/src/nar/non_caching_calculation_service.rs +++ b/tvix/store/src/nar/non_caching_calculation_service.rs @@ -10,22 +10,19 @@ use super::{NARCalculationService, RenderError}; /// A NAR calculation service which simply renders the whole NAR whenever /// we ask for the calculation. -#[derive(Clone)] -pub struct NonCachingNARCalculationService<BS: BlobService, DS: DirectoryService> { - nar_renderer: NARRenderer<BS, DS>, +pub struct NonCachingNARCalculationService<DS: DirectoryService> { + nar_renderer: NARRenderer<DS>, } -impl<BS: BlobService, DS: DirectoryService> NonCachingNARCalculationService<BS, DS> { - pub fn new(blob_service: BS, directory_service: DS) -> Self { +impl<DS: DirectoryService> NonCachingNARCalculationService<DS> { + pub fn new(blob_service: Box<dyn BlobService>, directory_service: DS) -> Self { Self { nar_renderer: NARRenderer::new(blob_service, directory_service), } } } -impl<BS: BlobService, DS: DirectoryService> NARCalculationService - for NonCachingNARCalculationService<BS, DS> -{ +impl<DS: DirectoryService> NARCalculationService for NonCachingNARCalculationService<DS> { fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), RenderError> { let h = Sha256::new(); let mut cw = CountWrite::from(h); diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index c10f2ddf52fa..a9a6d989e1b8 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -11,14 +11,13 @@ use tracing::warn; /// A NAR renderer, using a blob_service, chunk_service and directory_service /// to render a NAR to a writer. -#[derive(Clone)] -pub struct NARRenderer<BS: BlobService, DS: DirectoryService> { - blob_service: BS, +pub struct NARRenderer<DS: DirectoryService> { + blob_service: Box<dyn BlobService>, directory_service: DS, } -impl<BS: BlobService, DS: DirectoryService> NARRenderer<BS, DS> { - pub fn new(blob_service: BS, directory_service: DS) -> Self { +impl<DS: DirectoryService> NARRenderer<DS> { + pub fn new(blob_service: Box<dyn BlobService>, directory_service: DS) -> Self { Self { blob_service, directory_service, diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 3ec1d68872c7..066790daf370 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,7 +1,5 @@ use crate::{ - blobservice::{BlobService, BlobWriter}, - proto::sync_read_into_async_read::SyncReadIntoAsyncRead, - B3Digest, + blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead, B3Digest, }; use std::{collections::VecDeque, io, pin::Pin}; use tokio::task; @@ -10,12 +8,12 @@ use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; use tracing::{instrument, warn}; -pub struct GRPCBlobServiceWrapper<BS: BlobService> { - blob_service: BS, +pub struct GRPCBlobServiceWrapper { + blob_service: Box<dyn BlobService>, } -impl<BS: BlobService> From<BS> for GRPCBlobServiceWrapper<BS> { - fn from(value: BS) -> Self { +impl From<Box<dyn BlobService + 'static>> for GRPCBlobServiceWrapper { + fn from(value: Box<dyn BlobService>) -> Self { Self { blob_service: value, } @@ -23,9 +21,7 @@ impl<BS: BlobService> From<BS> for GRPCBlobServiceWrapper<BS> { } #[async_trait] -impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server::BlobService - for GRPCBlobServiceWrapper<BS> -{ +impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 type ReadStream = Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>; diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs index 02e04e7d723f..2f18ea4abb99 100644 --- a/tvix/store/src/proto/tests/grpc_blobservice.rs +++ b/tvix/store/src/proto/tests/grpc_blobservice.rs @@ -1,12 +1,10 @@ -use crate::blobservice::BlobService; use crate::proto::blob_service_server::BlobService as GRPCBlobService; use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest}; use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST}; use crate::tests::utils::gen_blob_service; use tokio_stream::StreamExt; -fn gen_grpc_blob_service( -) -> GRPCBlobServiceWrapper<impl BlobService + Send + Sync + Clone + 'static> { +fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper { let blob_service = gen_blob_service(); GRPCBlobServiceWrapper::from(blob_service) } diff --git a/tvix/store/src/store_io.rs b/tvix/store/src/store_io.rs index fb46204e505f..a2967c06ff98 100644 --- a/tvix/store/src/store_io.rs +++ b/tvix/store/src/store_io.rs @@ -29,24 +29,19 @@ use crate::{ /// This is to both cover cases of syntactically valid store paths, that exist /// on the filesystem (still managed by Nix), as well as being able to read /// files outside store paths. -pub struct TvixStoreIO< - BS: BlobService, - DS: DirectoryService, - PS: PathInfoService, - NCS: NARCalculationService, -> { - blob_service: BS, +pub struct TvixStoreIO<DS: DirectoryService, PS: PathInfoService, NCS: NARCalculationService> { + blob_service: Box<dyn BlobService>, directory_service: DS, path_info_service: PS, nar_calculation_service: NCS, std_io: StdIO, } -impl<BS: BlobService, DS: DirectoryService, PS: PathInfoService, NCS: NARCalculationService> - TvixStoreIO<BS, DS, PS, NCS> +impl<DS: DirectoryService, PS: PathInfoService, NCS: NARCalculationService> + TvixStoreIO<DS, PS, NCS> { pub fn new( - blob_service: BS, + blob_service: Box<dyn BlobService>, directory_service: DS, path_info_service: PS, nar_calculation_service: NCS, @@ -183,8 +178,8 @@ fn calculate_nar_based_store_path(nar_sha256_digest: &[u8; 32], name: &str) -> S build_regular_ca_path(name, &nar_hash_with_mode, Vec::<String>::new(), false).unwrap() } -impl<BS: BlobService, DS: DirectoryService, PS: PathInfoService, NCS: NARCalculationService> EvalIO - for TvixStoreIO<BS, DS, PS, NCS> +impl<DS: DirectoryService, PS: PathInfoService, NCS: NARCalculationService> EvalIO + for TvixStoreIO<DS, PS, NCS> { #[instrument(skip(self), ret, err)] fn path_exists(&self, path: &Path) -> Result<bool, io::Error> { diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs index 8b66cb024bf0..725d467bd0cf 100644 --- a/tvix/store/src/tests/import.rs +++ b/tvix/store/src/tests/import.rs @@ -1,5 +1,4 @@ use super::utils::{gen_blob_service, gen_directory_service}; -use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::import::ingest_path; use crate::proto; diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs index 198df0083cc9..3d7cfd4a96a7 100644 --- a/tvix/store/src/tests/nar_renderer.rs +++ b/tvix/store/src/tests/nar_renderer.rs @@ -1,5 +1,3 @@ -use crate::blobservice::BlobService; -use crate::blobservice::BlobWriter; use crate::directoryservice::DirectoryService; use crate::nar::NARRenderer; use crate::proto::DirectoryNode; diff --git a/tvix/store/src/tests/utils.rs b/tvix/store/src/tests/utils.rs index 2991feed41db..ec379bddcf77 100644 --- a/tvix/store/src/tests/utils.rs +++ b/tvix/store/src/tests/utils.rs @@ -4,8 +4,8 @@ use crate::{ pathinfoservice::{MemoryPathInfoService, PathInfoService}, }; -pub fn gen_blob_service() -> impl BlobService + Send + Sync + Clone + 'static { - MemoryBlobService::default() +pub fn gen_blob_service() -> Box<dyn BlobService> { + Box::new(MemoryBlobService::default()) } pub fn gen_directory_service() -> impl DirectoryService + Send + Sync + Clone + 'static { |