From 27ff98000b0cdf0ed30eb8837c7d44cd3e79d32f Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Thu, 25 May 2023 17:52:08 +0300 Subject: feat(tvix/store): eliminate generics in BlobStore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit To construct various stores at runtime, we need to eliminate associated types from the BlobService trait, and return Box instead of specific types. This also means we can't consume self in the close() method, so everything we write to is put in an Option<>, and during the first close we take from there. Change-Id: Ia523b6ab2f2a5276f51cb5d17e81a5925bce69b6 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8647 Autosubmit: flokli Tested-by: BuildkiteCI Reviewed-by: tazjin --- tvix/cli/src/main.rs | 4 +- tvix/store/src/bin/tvix-store.rs | 7 +- tvix/store/src/blobservice/grpc.rs | 96 ++++++++++++++-------- tvix/store/src/blobservice/memory.rs | 89 ++++++++++++++------ tvix/store/src/blobservice/mod.rs | 15 ++-- tvix/store/src/blobservice/sled.rs | 81 ++++++++++++------ tvix/store/src/import.rs | 13 +-- .../src/nar/non_caching_calculation_service.rs | 13 ++- tvix/store/src/nar/renderer.rs | 9 +- tvix/store/src/proto/grpc_blobservice_wrapper.rs | 16 ++-- tvix/store/src/proto/tests/grpc_blobservice.rs | 4 +- tvix/store/src/store_io.rs | 19 ++--- tvix/store/src/tests/import.rs | 1 - tvix/store/src/tests/nar_renderer.rs | 2 - tvix/store/src/tests/utils.rs | 4 +- 15 files changed, 230 insertions(+), 143 deletions(-) diff --git a/tvix/cli/src/main.rs b/tvix/cli/src/main.rs index 28b75dd907aa..459177717c5a 100644 --- a/tvix/cli/src/main.rs +++ b/tvix/cli/src/main.rs @@ -75,14 +75,14 @@ fn interpret(code: &str, path: Option, args: &Args, explain: bool) -> b let directory_service = MemoryDirectoryService::default(); let path_info_service = MemoryPathInfoService::default(); let nar_calculation_service = tvix_store::nar::NonCachingNARCalculationService::new( - blob_service.clone(), + Box::new(blob_service.clone()), directory_service.clone(), ); eval.io_handle = Box::new(tvix_io::TvixIO::new( known_paths.clone(), tvix_store::TvixStoreIO::new( - blob_service, + Box::new(blob_service), directory_service, path_info_service, nar_calculation_service, diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index ce2701157110..8fe62e5b5b2e 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -6,6 +6,7 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use tracing_subscriber::prelude::*; +use tvix_store::blobservice::BlobService; use tvix_store::blobservice::GRPCBlobService; use tvix_store::blobservice::SledBlobService; use tvix_store::directoryservice::GRPCDirectoryService; @@ -112,14 +113,14 @@ async fn main() -> Result<(), Box> { let mut server = Server::builder(); let nar_calculation_service = NonCachingNARCalculationService::new( - blob_service.clone(), + Box::new(blob_service.clone()), directory_service.clone(), ); #[allow(unused_mut)] let mut router = server .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( - blob_service, + Box::new(blob_service) as Box, ))) .add_service(DirectoryServiceServer::new( GRPCDirectoryServiceWrapper::from(directory_service), @@ -156,7 +157,7 @@ async fn main() -> Result<(), Box> { GRPCNARCalculationService::from_client(path_info_service_client); let io = Arc::new(TvixStoreIO::new( - blob_service, + Box::new(blob_service), directory_service, path_info_service, nar_calculation_service, diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index 0b08fbf46ad9..46ec64bce785 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -47,9 +47,6 @@ impl GRPCBlobService { } impl BlobService for GRPCBlobService { - type BlobReader = Box; - type BlobWriter = GRPCBlobWriter; - #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result { // Get a new handle to the gRPC client, and copy the digest. @@ -76,7 +73,10 @@ impl BlobService for GRPCBlobService { // On success, this returns a Ok(Some(io::Read)), which can be used to read // the contents of the Blob, identified by the digest. - fn open_read(&self, digest: &B3Digest) -> Result, crate::Error> { + fn open_read( + &self, + digest: &B3Digest, + ) -> Result>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest = digest.clone(); @@ -123,7 +123,7 @@ impl BlobService for GRPCBlobService { /// Returns a [Self::BlobWriter], that'll internally wrap each write in a // [proto::BlobChunk] and which is passed to the - fn open_write(&self) -> Result { + fn open_write(&self) -> Result, crate::Error> { let mut grpc_client = self.grpc_client.clone(); // set up an mpsc channel passing around Bytes. @@ -155,11 +155,11 @@ impl BlobService for GRPCBlobService { // … which is then turned into a [io::Write]. let writer = SyncIoBridge::new(async_writer); - Ok(GRPCBlobWriter { + Ok(Box::new(GRPCBlobWriter { tokio_handle: self.tokio_handle.clone(), // TODO: is the clone() ok here? - task, - inner_writer: writer, - }) + task_and_writer: Some((task, writer)), + digest: None, + })) } } @@ -176,42 +176,74 @@ pub struct GRPCBlobWriter { /// containing the put request. tokio_handle: tokio::runtime::Handle, - /// The task containing the put request. - task: JoinHandle>, + /// The task containing the put request, and the inner writer, if we're still writing. + task_and_writer: Option<( + JoinHandle>, + BridgedWriter, + )>, - /// The inner Writer. - inner_writer: BridgedWriter, + /// The digest that has been returned, if we successfully closed. + digest: Option, } impl BlobWriter for GRPCBlobWriter { - fn close(mut self) -> Result { - // invoke shutdown, so the inner writer closes its internal tx side of - // the channel. - self.inner_writer - .shutdown() - .map_err(|e| crate::Error::StorageError(e.to_string()))?; - - // block on the RPC call to return. - // This ensures all chunks are sent out, and have been received by the - // backend. - match self.tokio_handle.block_on(self.task)? { - Ok(resp) => { - // return the digest from the response. - B3Digest::from_vec(resp.digest).map_err(|_| { - crate::Error::StorageError("invalid root digest length in response".to_string()) - }) + fn close(&mut self) -> Result { + if self.task_and_writer.is_none() { + // if we're already closed, return the b3 digest, which must exist. + // If it doesn't, we already closed and failed once, and didn't handle the error. + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (task, mut writer) = self.task_and_writer.take().unwrap(); + + // invoke shutdown, so the inner writer closes its internal tx side of + // the channel. + writer + .shutdown() + .map_err(|e| crate::Error::StorageError(e.to_string()))?; + + // block on the RPC call to return. + // This ensures all chunks are sent out, and have been received by the + // backend. + match self.tokio_handle.block_on(task)? { + Ok(resp) => { + // return the digest from the response, and store it in self.digest for subsequent closes. + let digest = B3Digest::from_vec(resp.digest).map_err(|_| { + crate::Error::StorageError( + "invalid root digest length in response".to_string(), + ) + })?; + self.digest = Some(digest.clone()); + Ok(digest) + } + Err(e) => Err(crate::Error::StorageError(e.to_string())), } - Err(e) => Err(crate::Error::StorageError(e.to_string())), } } } impl io::Write for GRPCBlobWriter { fn write(&mut self, buf: &[u8]) -> io::Result { - self.inner_writer.write(buf) + match &mut self.task_and_writer { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((_, ref mut writer)) => writer.write(buf), + } } fn flush(&mut self) -> io::Result<()> { - self.inner_writer.flush() + match &mut self.task_and_writer { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((_, ref mut writer)) => writer.flush(), + } } } diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index 1ee59d108743..166eeabdb6a2 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -1,4 +1,4 @@ -use std::io::Cursor; +use std::io::{self, Cursor}; use std::{ collections::HashMap, sync::{Arc, RwLock}, @@ -14,63 +14,102 @@ pub struct MemoryBlobService { } impl BlobService for MemoryBlobService { - type BlobReader = Cursor>; - type BlobWriter = MemoryBlobWriter; - #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result { let db = self.db.read().unwrap(); Ok(db.contains_key(digest)) } - fn open_read(&self, digest: &B3Digest) -> Result, Error> { + fn open_read(&self, digest: &B3Digest) -> Result>, Error> { let db = self.db.read().unwrap(); - Ok(db.get(digest).map(|x| Cursor::new(x.clone()))) + match db.get(digest).map(|x| Cursor::new(x.clone())) { + Some(result) => Ok(Some(Box::new(result))), + None => Ok(None), + } } #[instrument(skip(self))] - fn open_write(&self) -> Result { - Ok(MemoryBlobWriter::new(self.db.clone())) + fn open_write(&self) -> Result, Error> { + Ok(Box::new(MemoryBlobWriter::new(self.db.clone()))) } } pub struct MemoryBlobWriter { db: Arc>>>, - buf: Vec, + /// Contains the Vec and hasher, or None if already closed + writers: Option<(Vec, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option, } impl MemoryBlobWriter { fn new(db: Arc>>>) -> Self { Self { - buf: Vec::new(), db, + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, } } } impl std::io::Write for MemoryBlobWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.buf.write(buf) + fn write(&mut self, b: &[u8]) -> std::io::Result { + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&buf[..bytes_written]) + } + } } fn flush(&mut self) -> std::io::Result<()> { - self.buf.flush() + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + } } } impl BlobWriter for MemoryBlobWriter { - fn close(self) -> Result { - // in this memory implementation, we don't actually bother hashing - // incrementally while writing, but do it at the end. - let mut hasher = blake3::Hasher::new(); - hasher.update(&self.buf); - let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); - - // open the database for writing. - let mut db = self.db.write()?; - db.insert(digest.clone(), self.buf); - - Ok(digest) + fn close(&mut self) -> Result { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + // We know self.hasher is doing blake3 hashing, so this won't fail. + let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + + // Only insert if the blob doesn't already exist. + let db = self.db.read()?; + if !db.contains_key(&digest) { + // drop the read lock, so we can open for writing. + drop(db); + + // open the database for writing. + let mut db = self.db.write()?; + + // and put buf in there. This will move buf out. + db.insert(digest.clone(), buf); + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } } } diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs index c5a2de124656..08565285c0d1 100644 --- a/tvix/store/src/blobservice/mod.rs +++ b/tvix/store/src/blobservice/mod.rs @@ -14,28 +14,25 @@ pub use self::sled::SledBlobService; /// It provides functions to check whether a given blob exists, /// a way to get a [io::Read] to a blob, and a method to initiate writing a new /// Blob, which returns a [BlobWriter], that can be used -pub trait BlobService { - type BlobReader: io::Read + Send + std::marker::Unpin; - type BlobWriter: BlobWriter + Send; - +pub trait BlobService: Send + Sync { /// Check if the service has the blob, by its content hash. fn has(&self, digest: &B3Digest) -> Result; /// Request a blob from the store, by its content hash. Returns a Option. - fn open_read(&self, digest: &B3Digest) -> Result, Error>; + fn open_read(&self, digest: &B3Digest) -> Result>, Error>; /// Insert a new blob into the store. Returns a [BlobWriter], which /// implements [io::Write] and a [BlobWriter::close]. /// TODO: is there any reason we want this to be a Result<>, and not just T? - fn open_write(&self) -> Result; + fn open_write(&self) -> Result, Error>; } /// A [io::Write] that you need to close() afterwards, and get back the digest /// of the written blob. -pub trait BlobWriter: io::Write { +pub trait BlobWriter: io::Write + Send + Sync + 'static { /// Signal there's no more data to be written, and return the digest of the /// contents written. /// - /// This consumes self, so it's not possible to close twice. - fn close(self) -> Result; + /// Closing a already-closed BlobWriter is a no-op. + fn close(&mut self) -> Result; } diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 2b090335344d..3b130fcd9a05 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -28,9 +28,6 @@ impl SledBlobService { } impl BlobService for SledBlobService { - type BlobReader = Cursor>; - type BlobWriter = SledBlobWriter; - #[instrument(skip(self), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result { match self.db.contains_key(digest.to_vec()) { @@ -40,55 +37,93 @@ impl BlobService for SledBlobService { } #[instrument(skip(self), fields(blob.digest=%digest))] - fn open_read(&self, digest: &B3Digest) -> Result, Error> { + fn open_read(&self, digest: &B3Digest) -> Result>, Error> { match self.db.get(digest.to_vec()) { Ok(None) => Ok(None), - Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))), + Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))), Err(e) => Err(Error::StorageError(e.to_string())), } } #[instrument(skip(self))] - fn open_write(&self) -> Result { - Ok(SledBlobWriter::new(self.db.clone())) + fn open_write(&self) -> Result, Error> { + Ok(Box::new(SledBlobWriter::new(self.db.clone()))) } } pub struct SledBlobWriter { db: sled::Db, - buf: Vec, - hasher: blake3::Hasher, + + /// Contains the Vec and hasher, or None if already closed + writers: Option<(Vec, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option, } impl SledBlobWriter { pub fn new(db: sled::Db) -> Self { Self { - buf: Vec::default(), db, - hasher: blake3::Hasher::new(), + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, } } } impl io::Write for SledBlobWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - let bytes_written = self.buf.write(buf)?; - self.hasher.write(&buf[..bytes_written]) + fn write(&mut self, b: &[u8]) -> io::Result { + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&buf[..bytes_written]) + } + } } fn flush(&mut self) -> io::Result<()> { - self.buf.flush() + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + } } } impl BlobWriter for SledBlobWriter { - fn close(self) -> Result { - let digest = self.hasher.finalize(); - self.db - .insert(digest.as_bytes(), self.buf) - .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; - - // We know self.hasher is doing blake3 hashing, so this won't fail. - Ok(B3Digest::from_vec(digest.as_bytes().to_vec()).unwrap()) + fn close(&mut self) -> Result { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + // We know self.hasher is doing blake3 hashing, so this won't fail. + let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + + // Only insert if the blob doesn't already exist. + if !self.db.contains_key(&digest.to_vec()).map_err(|e| { + Error::StorageError(format!("Unable to check if we have blob {}: {}", digest, e)) + })? { + // put buf in there. This will move buf out. + self.db + .insert(digest.to_vec(), buf) + .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } } } diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index 206e5eaba975..a0bd1de5e149 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -1,5 +1,6 @@ -use crate::{blobservice::BlobService, directoryservice::DirectoryService}; -use crate::{blobservice::BlobWriter, directoryservice::DirectoryPutter, proto}; +use crate::blobservice::BlobService; +use crate::directoryservice::DirectoryService; +use crate::{directoryservice::DirectoryPutter, proto}; use std::{ collections::HashMap, fmt::Debug, @@ -55,8 +56,8 @@ impl From for Error { // // It assumes the caller adds returned nodes to the directories it assembles. #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] -fn process_entry( - blob_service: &BS, +fn process_entry( + blob_service: &Box, directory_putter: &mut DP, entry: &walkdir::DirEntry, maybe_directory: Option, @@ -144,8 +145,8 @@ fn process_entry( /// possibly register it somewhere (and potentially rename it based on some /// naming scheme. #[instrument(skip(blob_service, directory_service), fields(path=?p))] -pub fn ingest_path + Debug>( - blob_service: &BS, +pub fn ingest_path + Debug>( + blob_service: &Box, directory_service: &DS, p: P, ) -> Result { diff --git a/tvix/store/src/nar/non_caching_calculation_service.rs b/tvix/store/src/nar/non_caching_calculation_service.rs index 8a080cb4df5e..b743f264b0ff 100644 --- a/tvix/store/src/nar/non_caching_calculation_service.rs +++ b/tvix/store/src/nar/non_caching_calculation_service.rs @@ -10,22 +10,19 @@ use super::{NARCalculationService, RenderError}; /// A NAR calculation service which simply renders the whole NAR whenever /// we ask for the calculation. -#[derive(Clone)] -pub struct NonCachingNARCalculationService { - nar_renderer: NARRenderer, +pub struct NonCachingNARCalculationService { + nar_renderer: NARRenderer, } -impl NonCachingNARCalculationService { - pub fn new(blob_service: BS, directory_service: DS) -> Self { +impl NonCachingNARCalculationService { + pub fn new(blob_service: Box, directory_service: DS) -> Self { Self { nar_renderer: NARRenderer::new(blob_service, directory_service), } } } -impl NARCalculationService - for NonCachingNARCalculationService -{ +impl NARCalculationService for NonCachingNARCalculationService { fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), RenderError> { let h = Sha256::new(); let mut cw = CountWrite::from(h); diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index c10f2ddf52fa..a9a6d989e1b8 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -11,14 +11,13 @@ use tracing::warn; /// A NAR renderer, using a blob_service, chunk_service and directory_service /// to render a NAR to a writer. -#[derive(Clone)] -pub struct NARRenderer { - blob_service: BS, +pub struct NARRenderer { + blob_service: Box, directory_service: DS, } -impl NARRenderer { - pub fn new(blob_service: BS, directory_service: DS) -> Self { +impl NARRenderer { + pub fn new(blob_service: Box, directory_service: DS) -> Self { Self { blob_service, directory_service, diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 3ec1d68872c7..066790daf370 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,7 +1,5 @@ use crate::{ - blobservice::{BlobService, BlobWriter}, - proto::sync_read_into_async_read::SyncReadIntoAsyncRead, - B3Digest, + blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead, B3Digest, }; use std::{collections::VecDeque, io, pin::Pin}; use tokio::task; @@ -10,12 +8,12 @@ use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; use tracing::{instrument, warn}; -pub struct GRPCBlobServiceWrapper { - blob_service: BS, +pub struct GRPCBlobServiceWrapper { + blob_service: Box, } -impl From for GRPCBlobServiceWrapper { - fn from(value: BS) -> Self { +impl From> for GRPCBlobServiceWrapper { + fn from(value: Box) -> Self { Self { blob_service: value, } @@ -23,9 +21,7 @@ impl From for GRPCBlobServiceWrapper { } #[async_trait] -impl super::blob_service_server::BlobService - for GRPCBlobServiceWrapper -{ +impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 type ReadStream = Pin> + Send + 'static>>; diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs index 02e04e7d723f..2f18ea4abb99 100644 --- a/tvix/store/src/proto/tests/grpc_blobservice.rs +++ b/tvix/store/src/proto/tests/grpc_blobservice.rs @@ -1,12 +1,10 @@ -use crate::blobservice::BlobService; use crate::proto::blob_service_server::BlobService as GRPCBlobService; use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest}; use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST}; use crate::tests::utils::gen_blob_service; use tokio_stream::StreamExt; -fn gen_grpc_blob_service( -) -> GRPCBlobServiceWrapper { +fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper { let blob_service = gen_blob_service(); GRPCBlobServiceWrapper::from(blob_service) } diff --git a/tvix/store/src/store_io.rs b/tvix/store/src/store_io.rs index fb46204e505f..a2967c06ff98 100644 --- a/tvix/store/src/store_io.rs +++ b/tvix/store/src/store_io.rs @@ -29,24 +29,19 @@ use crate::{ /// This is to both cover cases of syntactically valid store paths, that exist /// on the filesystem (still managed by Nix), as well as being able to read /// files outside store paths. -pub struct TvixStoreIO< - BS: BlobService, - DS: DirectoryService, - PS: PathInfoService, - NCS: NARCalculationService, -> { - blob_service: BS, +pub struct TvixStoreIO { + blob_service: Box, directory_service: DS, path_info_service: PS, nar_calculation_service: NCS, std_io: StdIO, } -impl - TvixStoreIO +impl + TvixStoreIO { pub fn new( - blob_service: BS, + blob_service: Box, directory_service: DS, path_info_service: PS, nar_calculation_service: NCS, @@ -183,8 +178,8 @@ fn calculate_nar_based_store_path(nar_sha256_digest: &[u8; 32], name: &str) -> S build_regular_ca_path(name, &nar_hash_with_mode, Vec::::new(), false).unwrap() } -impl EvalIO - for TvixStoreIO +impl EvalIO + for TvixStoreIO { #[instrument(skip(self), ret, err)] fn path_exists(&self, path: &Path) -> Result { diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs index 8b66cb024bf0..725d467bd0cf 100644 --- a/tvix/store/src/tests/import.rs +++ b/tvix/store/src/tests/import.rs @@ -1,5 +1,4 @@ use super::utils::{gen_blob_service, gen_directory_service}; -use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::import::ingest_path; use crate::proto; diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs index 198df0083cc9..3d7cfd4a96a7 100644 --- a/tvix/store/src/tests/nar_renderer.rs +++ b/tvix/store/src/tests/nar_renderer.rs @@ -1,5 +1,3 @@ -use crate::blobservice::BlobService; -use crate::blobservice::BlobWriter; use crate::directoryservice::DirectoryService; use crate::nar::NARRenderer; use crate::proto::DirectoryNode; diff --git a/tvix/store/src/tests/utils.rs b/tvix/store/src/tests/utils.rs index 2991feed41db..ec379bddcf77 100644 --- a/tvix/store/src/tests/utils.rs +++ b/tvix/store/src/tests/utils.rs @@ -4,8 +4,8 @@ use crate::{ pathinfoservice::{MemoryPathInfoService, PathInfoService}, }; -pub fn gen_blob_service() -> impl BlobService + Send + Sync + Clone + 'static { - MemoryBlobService::default() +pub fn gen_blob_service() -> Box { + Box::new(MemoryBlobService::default()) } pub fn gen_directory_service() -> impl DirectoryService + Send + Sync + Clone + 'static { -- cgit 1.4.1