From 27ff98000b0cdf0ed30eb8837c7d44cd3e79d32f Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Thu, 25 May 2023 17:52:08 +0300 Subject: feat(tvix/store): eliminate generics in BlobStore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit To construct various stores at runtime, we need to eliminate associated types from the BlobService trait, and return Box instead of specific types. This also means we can't consume self in the close() method, so everything we write to is put in an Option<>, and during the first close we take from there. Change-Id: Ia523b6ab2f2a5276f51cb5d17e81a5925bce69b6 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8647 Autosubmit: flokli Tested-by: BuildkiteCI Reviewed-by: tazjin --- tvix/store/src/bin/tvix-store.rs | 7 +- tvix/store/src/blobservice/grpc.rs | 96 ++++++++++++++-------- tvix/store/src/blobservice/memory.rs | 89 ++++++++++++++------ tvix/store/src/blobservice/mod.rs | 15 ++-- tvix/store/src/blobservice/sled.rs | 81 ++++++++++++------ tvix/store/src/import.rs | 13 +-- .../src/nar/non_caching_calculation_service.rs | 13 ++- tvix/store/src/nar/renderer.rs | 9 +- tvix/store/src/proto/grpc_blobservice_wrapper.rs | 16 ++-- tvix/store/src/proto/tests/grpc_blobservice.rs | 4 +- tvix/store/src/store_io.rs | 19 ++--- tvix/store/src/tests/import.rs | 1 - tvix/store/src/tests/nar_renderer.rs | 2 - tvix/store/src/tests/utils.rs | 4 +- 14 files changed, 228 insertions(+), 141 deletions(-) (limited to 'tvix/store') diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index ce27011571..8fe62e5b5b 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -6,6 +6,7 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use tracing_subscriber::prelude::*; +use tvix_store::blobservice::BlobService; use tvix_store::blobservice::GRPCBlobService; use tvix_store::blobservice::SledBlobService; use tvix_store::directoryservice::GRPCDirectoryService; @@ -112,14 +113,14 @@ async fn main() -> Result<(), Box> { let mut server = Server::builder(); let nar_calculation_service = NonCachingNARCalculationService::new( - blob_service.clone(), + Box::new(blob_service.clone()), directory_service.clone(), ); #[allow(unused_mut)] let mut router = server .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( - blob_service, + Box::new(blob_service) as Box, ))) .add_service(DirectoryServiceServer::new( GRPCDirectoryServiceWrapper::from(directory_service), @@ -156,7 +157,7 @@ async fn main() -> Result<(), Box> { GRPCNARCalculationService::from_client(path_info_service_client); let io = Arc::new(TvixStoreIO::new( - blob_service, + Box::new(blob_service), directory_service, path_info_service, nar_calculation_service, diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index 0b08fbf46a..46ec64bce7 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -47,9 +47,6 @@ impl GRPCBlobService { } impl BlobService for GRPCBlobService { - type BlobReader = Box; - type BlobWriter = GRPCBlobWriter; - #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result { // Get a new handle to the gRPC client, and copy the digest. @@ -76,7 +73,10 @@ impl BlobService for GRPCBlobService { // On success, this returns a Ok(Some(io::Read)), which can be used to read // the contents of the Blob, identified by the digest. - fn open_read(&self, digest: &B3Digest) -> Result, crate::Error> { + fn open_read( + &self, + digest: &B3Digest, + ) -> Result>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest = digest.clone(); @@ -123,7 +123,7 @@ impl BlobService for GRPCBlobService { /// Returns a [Self::BlobWriter], that'll internally wrap each write in a // [proto::BlobChunk] and which is passed to the - fn open_write(&self) -> Result { + fn open_write(&self) -> Result, crate::Error> { let mut grpc_client = self.grpc_client.clone(); // set up an mpsc channel passing around Bytes. @@ -155,11 +155,11 @@ impl BlobService for GRPCBlobService { // … which is then turned into a [io::Write]. let writer = SyncIoBridge::new(async_writer); - Ok(GRPCBlobWriter { + Ok(Box::new(GRPCBlobWriter { tokio_handle: self.tokio_handle.clone(), // TODO: is the clone() ok here? - task, - inner_writer: writer, - }) + task_and_writer: Some((task, writer)), + digest: None, + })) } } @@ -176,42 +176,74 @@ pub struct GRPCBlobWriter { /// containing the put request. tokio_handle: tokio::runtime::Handle, - /// The task containing the put request. - task: JoinHandle>, + /// The task containing the put request, and the inner writer, if we're still writing. + task_and_writer: Option<( + JoinHandle>, + BridgedWriter, + )>, - /// The inner Writer. - inner_writer: BridgedWriter, + /// The digest that has been returned, if we successfully closed. + digest: Option, } impl BlobWriter for GRPCBlobWriter { - fn close(mut self) -> Result { - // invoke shutdown, so the inner writer closes its internal tx side of - // the channel. - self.inner_writer - .shutdown() - .map_err(|e| crate::Error::StorageError(e.to_string()))?; - - // block on the RPC call to return. - // This ensures all chunks are sent out, and have been received by the - // backend. - match self.tokio_handle.block_on(self.task)? { - Ok(resp) => { - // return the digest from the response. - B3Digest::from_vec(resp.digest).map_err(|_| { - crate::Error::StorageError("invalid root digest length in response".to_string()) - }) + fn close(&mut self) -> Result { + if self.task_and_writer.is_none() { + // if we're already closed, return the b3 digest, which must exist. + // If it doesn't, we already closed and failed once, and didn't handle the error. + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (task, mut writer) = self.task_and_writer.take().unwrap(); + + // invoke shutdown, so the inner writer closes its internal tx side of + // the channel. + writer + .shutdown() + .map_err(|e| crate::Error::StorageError(e.to_string()))?; + + // block on the RPC call to return. + // This ensures all chunks are sent out, and have been received by the + // backend. + match self.tokio_handle.block_on(task)? { + Ok(resp) => { + // return the digest from the response, and store it in self.digest for subsequent closes. + let digest = B3Digest::from_vec(resp.digest).map_err(|_| { + crate::Error::StorageError( + "invalid root digest length in response".to_string(), + ) + })?; + self.digest = Some(digest.clone()); + Ok(digest) + } + Err(e) => Err(crate::Error::StorageError(e.to_string())), } - Err(e) => Err(crate::Error::StorageError(e.to_string())), } } } impl io::Write for GRPCBlobWriter { fn write(&mut self, buf: &[u8]) -> io::Result { - self.inner_writer.write(buf) + match &mut self.task_and_writer { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((_, ref mut writer)) => writer.write(buf), + } } fn flush(&mut self) -> io::Result<()> { - self.inner_writer.flush() + match &mut self.task_and_writer { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((_, ref mut writer)) => writer.flush(), + } } } diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index 1ee59d1087..166eeabdb6 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -1,4 +1,4 @@ -use std::io::Cursor; +use std::io::{self, Cursor}; use std::{ collections::HashMap, sync::{Arc, RwLock}, @@ -14,63 +14,102 @@ pub struct MemoryBlobService { } impl BlobService for MemoryBlobService { - type BlobReader = Cursor>; - type BlobWriter = MemoryBlobWriter; - #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result { let db = self.db.read().unwrap(); Ok(db.contains_key(digest)) } - fn open_read(&self, digest: &B3Digest) -> Result, Error> { + fn open_read(&self, digest: &B3Digest) -> Result>, Error> { let db = self.db.read().unwrap(); - Ok(db.get(digest).map(|x| Cursor::new(x.clone()))) + match db.get(digest).map(|x| Cursor::new(x.clone())) { + Some(result) => Ok(Some(Box::new(result))), + None => Ok(None), + } } #[instrument(skip(self))] - fn open_write(&self) -> Result { - Ok(MemoryBlobWriter::new(self.db.clone())) + fn open_write(&self) -> Result, Error> { + Ok(Box::new(MemoryBlobWriter::new(self.db.clone()))) } } pub struct MemoryBlobWriter { db: Arc>>>, - buf: Vec, + /// Contains the Vec and hasher, or None if already closed + writers: Option<(Vec, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option, } impl MemoryBlobWriter { fn new(db: Arc>>>) -> Self { Self { - buf: Vec::new(), db, + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, } } } impl std::io::Write for MemoryBlobWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.buf.write(buf) + fn write(&mut self, b: &[u8]) -> std::io::Result { + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&buf[..bytes_written]) + } + } } fn flush(&mut self) -> std::io::Result<()> { - self.buf.flush() + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + } } } impl BlobWriter for MemoryBlobWriter { - fn close(self) -> Result { - // in this memory implementation, we don't actually bother hashing - // incrementally while writing, but do it at the end. - let mut hasher = blake3::Hasher::new(); - hasher.update(&self.buf); - let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); - - // open the database for writing. - let mut db = self.db.write()?; - db.insert(digest.clone(), self.buf); - - Ok(digest) + fn close(&mut self) -> Result { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + // We know self.hasher is doing blake3 hashing, so this won't fail. + let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + + // Only insert if the blob doesn't already exist. + let db = self.db.read()?; + if !db.contains_key(&digest) { + // drop the read lock, so we can open for writing. + drop(db); + + // open the database for writing. + let mut db = self.db.write()?; + + // and put buf in there. This will move buf out. + db.insert(digest.clone(), buf); + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } } } diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs index c5a2de1246..08565285c0 100644 --- a/tvix/store/src/blobservice/mod.rs +++ b/tvix/store/src/blobservice/mod.rs @@ -14,28 +14,25 @@ pub use self::sled::SledBlobService; /// It provides functions to check whether a given blob exists, /// a way to get a [io::Read] to a blob, and a method to initiate writing a new /// Blob, which returns a [BlobWriter], that can be used -pub trait BlobService { - type BlobReader: io::Read + Send + std::marker::Unpin; - type BlobWriter: BlobWriter + Send; - +pub trait BlobService: Send + Sync { /// Check if the service has the blob, by its content hash. fn has(&self, digest: &B3Digest) -> Result; /// Request a blob from the store, by its content hash. Returns a Option. - fn open_read(&self, digest: &B3Digest) -> Result, Error>; + fn open_read(&self, digest: &B3Digest) -> Result>, Error>; /// Insert a new blob into the store. Returns a [BlobWriter], which /// implements [io::Write] and a [BlobWriter::close]. /// TODO: is there any reason we want this to be a Result<>, and not just T? - fn open_write(&self) -> Result; + fn open_write(&self) -> Result, Error>; } /// A [io::Write] that you need to close() afterwards, and get back the digest /// of the written blob. -pub trait BlobWriter: io::Write { +pub trait BlobWriter: io::Write + Send + Sync + 'static { /// Signal there's no more data to be written, and return the digest of the /// contents written. /// - /// This consumes self, so it's not possible to close twice. - fn close(self) -> Result; + /// Closing a already-closed BlobWriter is a no-op. + fn close(&mut self) -> Result; } diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 2b09033534..3b130fcd9a 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -28,9 +28,6 @@ impl SledBlobService { } impl BlobService for SledBlobService { - type BlobReader = Cursor>; - type BlobWriter = SledBlobWriter; - #[instrument(skip(self), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result { match self.db.contains_key(digest.to_vec()) { @@ -40,55 +37,93 @@ impl BlobService for SledBlobService { } #[instrument(skip(self), fields(blob.digest=%digest))] - fn open_read(&self, digest: &B3Digest) -> Result, Error> { + fn open_read(&self, digest: &B3Digest) -> Result>, Error> { match self.db.get(digest.to_vec()) { Ok(None) => Ok(None), - Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))), + Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))), Err(e) => Err(Error::StorageError(e.to_string())), } } #[instrument(skip(self))] - fn open_write(&self) -> Result { - Ok(SledBlobWriter::new(self.db.clone())) + fn open_write(&self) -> Result, Error> { + Ok(Box::new(SledBlobWriter::new(self.db.clone()))) } } pub struct SledBlobWriter { db: sled::Db, - buf: Vec, - hasher: blake3::Hasher, + + /// Contains the Vec and hasher, or None if already closed + writers: Option<(Vec, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option, } impl SledBlobWriter { pub fn new(db: sled::Db) -> Self { Self { - buf: Vec::default(), db, - hasher: blake3::Hasher::new(), + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, } } } impl io::Write for SledBlobWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - let bytes_written = self.buf.write(buf)?; - self.hasher.write(&buf[..bytes_written]) + fn write(&mut self, b: &[u8]) -> io::Result { + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&buf[..bytes_written]) + } + } } fn flush(&mut self) -> io::Result<()> { - self.buf.flush() + match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + } } } impl BlobWriter for SledBlobWriter { - fn close(self) -> Result { - let digest = self.hasher.finalize(); - self.db - .insert(digest.as_bytes(), self.buf) - .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; - - // We know self.hasher is doing blake3 hashing, so this won't fail. - Ok(B3Digest::from_vec(digest.as_bytes().to_vec()).unwrap()) + fn close(&mut self) -> Result { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + // We know self.hasher is doing blake3 hashing, so this won't fail. + let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + + // Only insert if the blob doesn't already exist. + if !self.db.contains_key(&digest.to_vec()).map_err(|e| { + Error::StorageError(format!("Unable to check if we have blob {}: {}", digest, e)) + })? { + // put buf in there. This will move buf out. + self.db + .insert(digest.to_vec(), buf) + .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } } } diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index 206e5eaba9..a0bd1de5e1 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -1,5 +1,6 @@ -use crate::{blobservice::BlobService, directoryservice::DirectoryService}; -use crate::{blobservice::BlobWriter, directoryservice::DirectoryPutter, proto}; +use crate::blobservice::BlobService; +use crate::directoryservice::DirectoryService; +use crate::{directoryservice::DirectoryPutter, proto}; use std::{ collections::HashMap, fmt::Debug, @@ -55,8 +56,8 @@ impl From for Error { // // It assumes the caller adds returned nodes to the directories it assembles. #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] -fn process_entry( - blob_service: &BS, +fn process_entry( + blob_service: &Box, directory_putter: &mut DP, entry: &walkdir::DirEntry, maybe_directory: Option, @@ -144,8 +145,8 @@ fn process_entry( /// possibly register it somewhere (and potentially rename it based on some /// naming scheme. #[instrument(skip(blob_service, directory_service), fields(path=?p))] -pub fn ingest_path + Debug>( - blob_service: &BS, +pub fn ingest_path + Debug>( + blob_service: &Box, directory_service: &DS, p: P, ) -> Result { diff --git a/tvix/store/src/nar/non_caching_calculation_service.rs b/tvix/store/src/nar/non_caching_calculation_service.rs index 8a080cb4df..b743f264b0 100644 --- a/tvix/store/src/nar/non_caching_calculation_service.rs +++ b/tvix/store/src/nar/non_caching_calculation_service.rs @@ -10,22 +10,19 @@ use super::{NARCalculationService, RenderError}; /// A NAR calculation service which simply renders the whole NAR whenever /// we ask for the calculation. -#[derive(Clone)] -pub struct NonCachingNARCalculationService { - nar_renderer: NARRenderer, +pub struct NonCachingNARCalculationService { + nar_renderer: NARRenderer, } -impl NonCachingNARCalculationService { - pub fn new(blob_service: BS, directory_service: DS) -> Self { +impl NonCachingNARCalculationService { + pub fn new(blob_service: Box, directory_service: DS) -> Self { Self { nar_renderer: NARRenderer::new(blob_service, directory_service), } } } -impl NARCalculationService - for NonCachingNARCalculationService -{ +impl NARCalculationService for NonCachingNARCalculationService { fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), RenderError> { let h = Sha256::new(); let mut cw = CountWrite::from(h); diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index c10f2ddf52..a9a6d989e1 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -11,14 +11,13 @@ use tracing::warn; /// A NAR renderer, using a blob_service, chunk_service and directory_service /// to render a NAR to a writer. -#[derive(Clone)] -pub struct NARRenderer { - blob_service: BS, +pub struct NARRenderer { + blob_service: Box, directory_service: DS, } -impl NARRenderer { - pub fn new(blob_service: BS, directory_service: DS) -> Self { +impl NARRenderer { + pub fn new(blob_service: Box, directory_service: DS) -> Self { Self { blob_service, directory_service, diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 3ec1d68872..066790daf3 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,7 +1,5 @@ use crate::{ - blobservice::{BlobService, BlobWriter}, - proto::sync_read_into_async_read::SyncReadIntoAsyncRead, - B3Digest, + blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead, B3Digest, }; use std::{collections::VecDeque, io, pin::Pin}; use tokio::task; @@ -10,12 +8,12 @@ use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; use tracing::{instrument, warn}; -pub struct GRPCBlobServiceWrapper { - blob_service: BS, +pub struct GRPCBlobServiceWrapper { + blob_service: Box, } -impl From for GRPCBlobServiceWrapper { - fn from(value: BS) -> Self { +impl From> for GRPCBlobServiceWrapper { + fn from(value: Box) -> Self { Self { blob_service: value, } @@ -23,9 +21,7 @@ impl From for GRPCBlobServiceWrapper { } #[async_trait] -impl super::blob_service_server::BlobService - for GRPCBlobServiceWrapper -{ +impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 type ReadStream = Pin> + Send + 'static>>; diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs index 02e04e7d72..2f18ea4abb 100644 --- a/tvix/store/src/proto/tests/grpc_blobservice.rs +++ b/tvix/store/src/proto/tests/grpc_blobservice.rs @@ -1,12 +1,10 @@ -use crate::blobservice::BlobService; use crate::proto::blob_service_server::BlobService as GRPCBlobService; use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest}; use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST}; use crate::tests::utils::gen_blob_service; use tokio_stream::StreamExt; -fn gen_grpc_blob_service( -) -> GRPCBlobServiceWrapper { +fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper { let blob_service = gen_blob_service(); GRPCBlobServiceWrapper::from(blob_service) } diff --git a/tvix/store/src/store_io.rs b/tvix/store/src/store_io.rs index fb46204e50..a2967c06ff 100644 --- a/tvix/store/src/store_io.rs +++ b/tvix/store/src/store_io.rs @@ -29,24 +29,19 @@ use crate::{ /// This is to both cover cases of syntactically valid store paths, that exist /// on the filesystem (still managed by Nix), as well as being able to read /// files outside store paths. -pub struct TvixStoreIO< - BS: BlobService, - DS: DirectoryService, - PS: PathInfoService, - NCS: NARCalculationService, -> { - blob_service: BS, +pub struct TvixStoreIO { + blob_service: Box, directory_service: DS, path_info_service: PS, nar_calculation_service: NCS, std_io: StdIO, } -impl - TvixStoreIO +impl + TvixStoreIO { pub fn new( - blob_service: BS, + blob_service: Box, directory_service: DS, path_info_service: PS, nar_calculation_service: NCS, @@ -183,8 +178,8 @@ fn calculate_nar_based_store_path(nar_sha256_digest: &[u8; 32], name: &str) -> S build_regular_ca_path(name, &nar_hash_with_mode, Vec::::new(), false).unwrap() } -impl EvalIO - for TvixStoreIO +impl EvalIO + for TvixStoreIO { #[instrument(skip(self), ret, err)] fn path_exists(&self, path: &Path) -> Result { diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs index 8b66cb024b..725d467bd0 100644 --- a/tvix/store/src/tests/import.rs +++ b/tvix/store/src/tests/import.rs @@ -1,5 +1,4 @@ use super::utils::{gen_blob_service, gen_directory_service}; -use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::import::ingest_path; use crate::proto; diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs index 198df0083c..3d7cfd4a96 100644 --- a/tvix/store/src/tests/nar_renderer.rs +++ b/tvix/store/src/tests/nar_renderer.rs @@ -1,5 +1,3 @@ -use crate::blobservice::BlobService; -use crate::blobservice::BlobWriter; use crate::directoryservice::DirectoryService; use crate::nar::NARRenderer; use crate::proto::DirectoryNode; diff --git a/tvix/store/src/tests/utils.rs b/tvix/store/src/tests/utils.rs index 2991feed41..ec379bddcf 100644 --- a/tvix/store/src/tests/utils.rs +++ b/tvix/store/src/tests/utils.rs @@ -4,8 +4,8 @@ use crate::{ pathinfoservice::{MemoryPathInfoService, PathInfoService}, }; -pub fn gen_blob_service() -> impl BlobService + Send + Sync + Clone + 'static { - MemoryBlobService::default() +pub fn gen_blob_service() -> Box { + Box::new(MemoryBlobService::default()) } pub fn gen_directory_service() -> impl DirectoryService + Send + Sync + Clone + 'static { -- cgit 1.4.1