diff options
Diffstat (limited to 'tvix/store/src')
21 files changed, 553 insertions, 1251 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 72814fa530e7..29f8a92cb1be 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -8,7 +8,6 @@ use nix_compat::nixhash::NixHashWithMode; use std::path::PathBuf; use tracing_subscriber::prelude::*; use tvix_store::blobservice::SledBlobService; -use tvix_store::chunkservice::SledChunkService; use tvix_store::directoryservice::SledDirectoryService; use tvix_store::import::import_path; use tvix_store::nar::NARCalculationService; @@ -87,7 +86,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { // initialize stores let mut blob_service = SledBlobService::new("blobs.sled".into())?; - let mut chunk_service = SledChunkService::new("chunks.sled".into())?; let mut directory_service = SledDirectoryService::new("directories.sled".into())?; let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?; @@ -102,15 +100,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let nar_calculation_service = NonCachingNARCalculationService::new( blob_service.clone(), - chunk_service.clone(), directory_service.clone(), ); #[allow(unused_mut)] let mut router = server - .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( + .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( blob_service, - chunk_service, ))) .add_service(DirectoryServiceServer::new( GRPCDirectoryServiceWrapper::from(directory_service), @@ -135,17 +131,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { Commands::Import { paths } => { let nar_calculation_service = NonCachingNARCalculationService::new( blob_service.clone(), - chunk_service.clone(), directory_service.clone(), ); for path in paths { - let root_node = import_path( - &mut blob_service, - &mut chunk_service, - &mut directory_service, - &path, - )?; + let root_node = import_path(&mut blob_service, &mut directory_service, &path)?; let nar_hash = NixHashWithMode::Recursive(NixHash::new( HashAlgo::Sha256, diff --git a/tvix/store/src/blobreader.rs b/tvix/store/src/blobreader.rs deleted file mode 100644 index ff0efdf15237..000000000000 --- a/tvix/store/src/blobreader.rs +++ /dev/null @@ -1,376 +0,0 @@ -use std::io::{self, Cursor, Read, Write}; - -use data_encoding::BASE64; - -use crate::{chunkservice::ChunkService, proto}; - -/// BlobReader implements reading of a blob, by querying individual chunks. -/// -/// It doesn't talk to BlobService, but assumes something has already fetched -/// blob_meta already. -pub struct BlobReader<'a, CS: ChunkService> { - // used to look up chunks - chunk_service: &'a CS, - - // internal iterator over chunk hashes and their sizes - chunks_iter: std::vec::IntoIter<proto::blob_meta::ChunkMeta>, - - // If a chunk was partially read (if buf.len() < chunk.size), - // a cursor to its contents are stored here. - current_chunk: Option<Cursor<Vec<u8>>>, -} - -impl<'a, CS: ChunkService> BlobReader<'a, CS> { - pub fn open(chunk_service: &'a CS, blob_meta: proto::BlobMeta) -> Self { - Self { - chunk_service, - chunks_iter: blob_meta.chunks.into_iter(), - current_chunk: None, - } - } - - /// reads (up to n bytes) from the current chunk into buf (if there is - /// a chunk). - /// - /// If it arrives at the end of the chunk, sets it back to None. - /// Returns a [std::io::Result<usize>] of the bytes read from the chunk. - fn read_from_current_chunk<W: std::io::Write>( - &mut self, - m: usize, - buf: &mut W, - ) -> std::io::Result<usize> { - // If there's still something in partial_chunk, read from there - // (up to m: usize bytes) and return the number of bytes read. - if let Some(current_chunk) = &mut self.current_chunk { - let result = io::copy(&mut current_chunk.take(m as u64), buf); - - match result { - Ok(n) => { - // if we were not able to read all off m bytes, - // this means we arrived at the end of the chunk. - if n < m as u64 { - self.current_chunk = None - } - - // n can never be > m, so downcasting this to usize is ok. - Ok(n as usize) - } - Err(e) => Err(e), - } - } else { - Ok(0) - } - } -} - -impl<CS: ChunkService> std::io::Read for BlobReader<'_, CS> { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { - let read_max = buf.len(); - let mut bytes_read = 0_usize; - let mut buf_w = std::io::BufWriter::new(buf); - - // read up to buf.len() bytes into buf, by reading from the current - // chunk and subsequent ones. - loop { - // try to fill buf with bytes from the current chunk - // (if there's still one) - let n = self.read_from_current_chunk(read_max - bytes_read, &mut buf_w)?; - bytes_read += n; - - // We want to make sure we don't accidentially read past more than - // we're allowed to. - assert!(bytes_read <= read_max); - - // buf is entirerly filled, we're done. - if bytes_read == read_max { - buf_w.flush()?; - break Ok(bytes_read); - } - - // Otherwise, bytes_read is < read_max, so we could still write - // more to buf. - // Check if we have more chunks to read from. - match self.chunks_iter.next() { - // No more chunks, we're done. - None => { - buf_w.flush()?; - return Ok(bytes_read); - } - // There's another chunk to visit, fetch its contents - Some(chunk_meta) => { - let chunk_meta_digest: [u8; 32] = - chunk_meta.digest.clone().try_into().map_err(|_e| { - std::io::Error::new( - io::ErrorKind::InvalidData, - format!( - "chunk in chunkmeta has wrong digest size, expected 32, got {}", - chunk_meta.digest.len(), - ), - ) - })?; - match self.chunk_service.get(&chunk_meta_digest) { - // Fetch successful, put it into `self.current_chunk` and restart the loop. - Ok(Some(chunk_data)) => { - // make sure the size matches what chunk_meta says as well. - if chunk_data.len() as u32 != chunk_meta.size { - break Err(std::io::Error::new( - io::ErrorKind::InvalidData, - format!( - "chunk_service returned chunk with wrong size for {}, expected {}, got {}", - BASE64.encode(&chunk_meta.digest), chunk_meta.size, chunk_data.len() - ) - )); - } - self.current_chunk = Some(Cursor::new(chunk_data)); - } - // Chunk requested does not exist - Ok(None) => { - break Err(std::io::Error::new( - io::ErrorKind::NotFound, - format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)), - )) - } - // Error occured while fetching the next chunk, propagate the error from the chunk service - Err(e) => { - break Err(std::io::Error::new(io::ErrorKind::InvalidData, e)); - } - } - } - } - } - } -} - -#[cfg(test)] -mod tests { - use super::BlobReader; - use crate::chunkservice::ChunkService; - use crate::proto; - use crate::tests::fixtures::DUMMY_DATA_1; - use crate::tests::fixtures::DUMMY_DATA_2; - use crate::tests::fixtures::DUMMY_DIGEST; - use crate::tests::utils::gen_chunk_service; - use std::io::Cursor; - use std::io::Read; - use std::io::Write; - - #[test] - /// reading from a blobmeta with zero chunks should produce zero bytes. - fn empty_blobmeta() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - let blobmeta = proto::BlobMeta { - chunks: vec![], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - let mut buf = Cursor::new(Vec::new()); - - let res = std::io::copy(&mut blob_reader, &mut buf); - - assert_eq!(0, res.unwrap()); - - Ok(()) - } - - #[test] - /// trying to read something where the chunk doesn't exist should fail - fn missing_chunk_fail() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - let blobmeta = proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: DUMMY_DIGEST.to_vec(), - size: 42, - }], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - let mut buf = Cursor::new(Vec::new()); - - let res = std::io::copy(&mut blob_reader, &mut buf); - - assert!(res.is_err()); - - Ok(()) - } - - #[test] - /// read something containing the single (empty) chunk - fn empty_chunk() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - // insert a single chunk - let dgst = chunk_service.put(vec![]).expect("must succeed"); - - // assemble a blobmeta - let blobmeta = proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: dgst.to_vec(), - size: 0, - }], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - - let mut buf: Vec<u8> = Vec::new(); - - let res = - std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed"); - - assert_eq!(res, 0, "number of bytes read must match"); - assert!(buf.is_empty(), "buf must be empty"); - - Ok(()) - } - - /// read something which contains a single chunk - #[test] - fn single_chunk() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - // insert a single chunk - let dgst = chunk_service - .put(DUMMY_DATA_1.clone()) - .expect("must succeed"); - - // assemble a blobmeta - let blobmeta = proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: dgst.to_vec(), - size: 3, - }], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - - let mut buf: Vec<u8> = Vec::new(); - - let res = - std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed"); - - assert_eq!(res, 3, "number of bytes read must match"); - assert_eq!(DUMMY_DATA_1[..], buf[..], "data read must match"); - - Ok(()) - } - - /// read something referring to a chunk, but with wrong size - #[test] - fn wrong_size_fail() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - // insert chunks - let dgst_1 = chunk_service - .put(DUMMY_DATA_1.clone()) - .expect("must succeed"); - - // assemble a blobmeta - let blobmeta = proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: dgst_1.to_vec(), - size: 42, - }], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - - let mut buf: Vec<u8> = Vec::new(); - - let res = std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)); - - assert!(res.is_err(), "reading must fail"); - - Ok(()) - } - - /// read something referring to multiple chunks - #[test] - fn multiple_chunks() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - // insert chunks - let dgst_1 = chunk_service - .put(DUMMY_DATA_1.clone()) - .expect("must succeed"); - let dgst_2 = chunk_service - .put(DUMMY_DATA_2.clone()) - .expect("must succeed"); - - // assemble a blobmeta - let blobmeta = proto::BlobMeta { - chunks: vec![ - proto::blob_meta::ChunkMeta { - digest: dgst_1.to_vec(), - size: 3, - }, - proto::blob_meta::ChunkMeta { - digest: dgst_2.to_vec(), - size: 2, - }, - proto::blob_meta::ChunkMeta { - digest: dgst_1.to_vec(), - size: 3, - }, - ], - inline_bao: vec![], - }; - - // assemble ecpected data - let mut expected_data: Vec<u8> = Vec::new(); - expected_data.extend_from_slice(&DUMMY_DATA_1[..]); - expected_data.extend_from_slice(&DUMMY_DATA_2[..]); - expected_data.extend_from_slice(&DUMMY_DATA_1[..]); - - // read via io::copy - { - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta.clone()); - - let mut buf: Vec<u8> = Vec::new(); - - let res = - std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed"); - - assert_eq!(8, res, "number of bytes read must match"); - - assert_eq!(expected_data[..], buf[..], "data read must match"); - } - - // now read the same thing again, but not via io::copy, but individually - { - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - - let mut buf: Vec<u8> = Vec::new(); - let mut cursor = Cursor::new(&mut buf); - - let mut bytes_read = 0; - - loop { - let mut smallbuf = [0xff; 1]; - match blob_reader.read(&mut smallbuf) { - Ok(n) => { - if n == 0 { - break; - } - let w_b = cursor.write(&smallbuf).unwrap(); - assert_eq!(n, w_b); - bytes_read += w_b; - } - Err(_) => { - panic!("error occured during read"); - } - } - } - - assert_eq!(8, bytes_read, "number of bytes read must match"); - assert_eq!(expected_data[..], buf[..], "data read must match"); - } - - Ok(()) - } -} diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index 57c86551e4de..9a796ca2c0c8 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -1,51 +1,80 @@ use data_encoding::BASE64; +use std::io::Cursor; use std::{ collections::HashMap, sync::{Arc, RwLock}, }; -use tracing::instrument; +use tracing::{instrument, warn}; -use crate::{proto, Error}; +use super::{BlobService, BlobWriter}; +use crate::Error; -use super::BlobService; +// type B3Digest = [u8; 32]; +// struct B3Digest ([u8; 32]); #[derive(Clone, Default)] pub struct MemoryBlobService { - db: Arc<RwLock<HashMap<Vec<u8>, proto::BlobMeta>>>, + db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>, } impl BlobService for MemoryBlobService { - #[instrument(skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))] - fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> { - if req.include_bao { - todo!("not implemented yet") - } + type BlobReader = Cursor<Vec<u8>>; + type BlobWriter = MemoryBlobWriter; + #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))] + fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> { let db = self.db.read().unwrap(); - // if include_chunks is also false, the user only wants to know if the - // blob is present at all. - if !req.include_chunks { - Ok(if db.contains_key(&req.digest) { - Some(proto::BlobMeta::default()) - } else { - None - }) - } else { - match db.get(&req.digest) { - None => Ok(None), - Some(blob_meta) => Ok(Some(blob_meta.clone())), - } + Ok(db.contains_key(digest)) + } + + fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> { + let db = self.db.read().unwrap(); + + Ok(db.get(digest).map(|x| Cursor::new(x.clone()))) + } + + #[instrument(skip(self))] + fn open_write(&self) -> Result<Self::BlobWriter, Error> { + Ok(MemoryBlobWriter::new(self.db.clone())) + } +} + +pub struct MemoryBlobWriter { + db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>, + + buf: Vec<u8>, +} + +impl MemoryBlobWriter { + fn new(db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>) -> Self { + Self { + buf: Vec::new(), + db, } } +} +impl std::io::Write for MemoryBlobWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + self.buf.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.buf.flush() + } +} - #[instrument(skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))] - fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> { - let mut db = self.db.write().unwrap(); +impl BlobWriter for MemoryBlobWriter { + fn close(self) -> Result<[u8; 32], Error> { + // in this memory implementation, we don't actually bother hashing + // incrementally while writing, but do it at the end. + let mut hasher = blake3::Hasher::new(); + hasher.update(&self.buf); + let digest: [u8; 32] = hasher.finalize().into(); - db.insert(blob_digest.to_vec(), blob_meta); + // open the database for writing. + let mut db = self.db.write()?; + db.insert(digest, self.buf); - Ok(()) - // TODO: make sure all callers make sure the chunks exist. - // TODO: where should we calculate the bao? + Ok(digest) } } diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs index 53e941795e7e..e97ccdf335a0 100644 --- a/tvix/store/src/blobservice/mod.rs +++ b/tvix/store/src/blobservice/mod.rs @@ -1,4 +1,6 @@ -use crate::{proto, Error}; +use std::io; + +use crate::Error; mod memory; mod sled; @@ -7,14 +9,31 @@ pub use self::memory::MemoryBlobService; pub use self::sled::SledBlobService; /// The base trait all BlobService services need to implement. -/// It provides information about how a blob is chunked, -/// and allows creating new blobs by creating a BlobMeta (referring to chunks -/// in a [crate::chunkservice::ChunkService]). +/// It provides functions to check whether a given blob exists, +/// a way to get a [io::Read] to a blob, and a method to initiate writing a new +/// Blob, which returns a [BlobWriter], that can be used pub trait BlobService { - /// Retrieve chunking information for a given blob - fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error>; + type BlobReader: io::Read + Send + std::marker::Unpin; + type BlobWriter: BlobWriter + Send; + + /// Check if the service has the blob, by its content hash. + fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>; + + /// Request a blob from the store, by its content hash. Returns a Option<BlobReader>. + fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error>; + + /// Insert a new blob into the store. Returns a [BlobWriter], which + /// implements [io::Write] and a [BlobWriter::close]. + /// TODO: is there any reason we want this to be a Result<>, and not just T? + fn open_write(&self) -> Result<Self::BlobWriter, Error>; +} - /// Insert chunking information for a given blob. - /// Implementations SHOULD make sure chunks referred do exist. - fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error>; +/// A [io::Write] that you need to close() afterwards, and get back the digest +/// of the written blob. +pub trait BlobWriter: io::Write { + /// Signal there's no more data to be written, and return the digest of the + /// contents written. + /// + /// This consumes self, so it's not possible to close twice. + fn close(self) -> Result<[u8; 32], Error>; } diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 1ae34ee5fb9c..c229f799de98 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -1,13 +1,13 @@ -use std::path::PathBuf; +use std::{ + io::{self, Cursor}, + path::PathBuf, +}; +use super::{BlobService, BlobWriter}; +use crate::Error; use data_encoding::BASE64; -use prost::Message; use tracing::instrument; -use crate::{proto, Error}; - -use super::BlobService; - #[derive(Clone)] pub struct SledBlobService { db: sled::Db, @@ -30,44 +30,69 @@ impl SledBlobService { } impl BlobService for SledBlobService { - #[instrument(name = "SledBlobService::stat", skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))] - fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> { - if req.include_bao { - todo!("not implemented yet") + type BlobReader = Cursor<Vec<u8>>; + type BlobWriter = SledBlobWriter; + + #[instrument(name = "SledBlobService::has", skip(self), fields(blob.digest=BASE64.encode(digest)))] + fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> { + match self.db.contains_key(digest) { + Ok(has) => Ok(has), + Err(e) => Err(Error::StorageError(e.to_string())), } + } - // if include_chunks is also false, the user only wants to know if the - // blob is present at all. - if !req.include_chunks { - match self.db.contains_key(&req.digest) { - Ok(false) => Ok(None), - Ok(true) => Ok(Some(proto::BlobMeta::default())), - Err(e) => Err(Error::StorageError(e.to_string())), - } - } else { - match self.db.get(&req.digest) { - Ok(None) => Ok(None), - Ok(Some(data)) => match proto::BlobMeta::decode(&*data) { - Ok(blob_meta) => Ok(Some(blob_meta)), - Err(e) => Err(Error::StorageError(format!( - "unable to parse blobmeta message for blob {}: {}", - BASE64.encode(&req.digest), - e - ))), - }, - Err(e) => Err(Error::StorageError(e.to_string())), - } + #[instrument(name = "SledBlobService::open_read", skip(self), fields(blob.digest=BASE64.encode(digest)))] + fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> { + match self.db.get(digest) { + Ok(None) => Ok(None), + Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))), + Err(e) => Err(Error::StorageError(e.to_string())), } } - #[instrument(name = "SledBlobService::put", skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))] - fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> { - let result = self.db.insert(blob_digest, blob_meta.encode_to_vec()); - if let Err(e) = result { - return Err(Error::StorageError(e.to_string())); + #[instrument(name = "SledBlobService::open_write", skip(self))] + fn open_write(&self) -> Result<Self::BlobWriter, Error> { + Ok(SledBlobWriter::new(self.db.clone())) + } +} + +pub struct SledBlobWriter { + db: sled::Db, + buf: Vec<u8>, + hasher: blake3::Hasher, +} + +impl SledBlobWriter { + pub fn new(db: sled::Db) -> Self { + Self { + buf: Vec::default(), + db, + hasher: blake3::Hasher::new(), } - Ok(()) - // TODO: make sure all callers make sure the chunks exist. - // TODO: where should we calculate the bao? + } +} + +impl io::Write for SledBlobWriter { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + let bytes_written = self.buf.write(buf)?; + self.hasher.write(&buf[..bytes_written]) + } + + fn flush(&mut self) -> io::Result<()> { + self.buf.flush() + } +} + +impl BlobWriter for SledBlobWriter { + fn close(self) -> Result<[u8; 32], Error> { + let digest = self.hasher.finalize(); + self.db.insert(digest.as_bytes(), self.buf).map_err(|e| { + Error::StorageError(format!("unable to insert blob: {}", e.to_string())) + })?; + + Ok(digest + .to_owned() + .try_into() + .map_err(|_| Error::StorageError("invalid digest length in response".to_string()))?) } } diff --git a/tvix/store/src/chunkservice/memory.rs b/tvix/store/src/chunkservice/memory.rs deleted file mode 100644 index 9fe4dc17d5ec..000000000000 --- a/tvix/store/src/chunkservice/memory.rs +++ /dev/null @@ -1,53 +0,0 @@ -use data_encoding::BASE64; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; -use tracing::instrument; - -use crate::Error; - -use super::ChunkService; - -#[derive(Clone, Default)] -pub struct MemoryChunkService { - db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>, -} - -impl ChunkService for MemoryChunkService { - #[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))] - fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> { - let db = self.db.read().unwrap(); - Ok(db.get(digest).is_some()) - } - - #[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))] - fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> { - let db = self.db.read().unwrap(); - match db.get(digest) { - None => Ok(None), - Some(data) => { - // calculate the hash to verify this is really what we expect - let actual_digest = blake3::hash(data).as_bytes().to_vec(); - if actual_digest != digest { - return Err(Error::StorageError(format!( - "invalid hash encountered when reading chunk, expected {}, got {}", - BASE64.encode(digest), - BASE64.encode(&actual_digest), - ))); - } - Ok(Some(data.clone())) - } - } - } - - #[instrument(skip(self, data))] - fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> { - let digest = blake3::hash(&data); - - let mut db = self.db.write().unwrap(); - db.insert(*digest.as_bytes(), data); - - Ok(*digest.as_bytes()) - } -} diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs deleted file mode 100644 index faf0a88f151a..000000000000 --- a/tvix/store/src/chunkservice/mod.rs +++ /dev/null @@ -1,28 +0,0 @@ -mod util; - -pub mod memory; -pub mod sled; - -use crate::Error; - -pub use self::memory::MemoryChunkService; -pub use self::sled::SledChunkService; -pub use self::util::read_all_and_chunk; -pub use self::util::update_hasher; -pub use self::util::upload_chunk; - -/// The base trait all ChunkService services need to implement. -/// It allows checking for the existence, download and upload of chunks. -/// It's usually used after consulting a [crate::blobservice::BlobService] for -/// chunking information. -pub trait ChunkService { - /// check if the service has a chunk, given by its digest. - fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>; - - /// retrieve a chunk by its digest. Implementations MUST validate the digest - /// matches. - fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error>; - - /// insert a chunk. returns the digest of the chunk, or an error. - fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error>; -} diff --git a/tvix/store/src/chunkservice/sled.rs b/tvix/store/src/chunkservice/sled.rs deleted file mode 100644 index 8e86e1825b28..000000000000 --- a/tvix/store/src/chunkservice/sled.rs +++ /dev/null @@ -1,70 +0,0 @@ -use std::path::PathBuf; - -use data_encoding::BASE64; -use tracing::instrument; - -use crate::Error; - -use super::ChunkService; - -#[derive(Clone)] -pub struct SledChunkService { - db: sled::Db, -} - -impl SledChunkService { - pub fn new(p: PathBuf) -> Result<Self, sled::Error> { - let config = sled::Config::default().use_compression(true).path(p); - let db = config.open()?; - - Ok(Self { db }) - } - - pub fn new_temporary() -> Result<Self, sled::Error> { - let config = sled::Config::default().temporary(true); - let db = config.open()?; - - Ok(Self { db }) - } -} - -impl ChunkService for SledChunkService { - #[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))] - fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> { - match self.db.get(digest) { - Ok(None) => Ok(false), - Ok(Some(_)) => Ok(true), - Err(e) => Err(Error::StorageError(e.to_string())), - } - } - - #[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))] - fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> { - match self.db.get(digest) { - Ok(None) => Ok(None), - Ok(Some(data)) => { - // calculate the hash to verify this is really what we expect - let actual_digest = blake3::hash(&data).as_bytes().to_vec(); - if actual_digest != digest { - return Err(Error::StorageError(format!( - "invalid hash encountered when reading chunk, expected {}, got {}", - BASE64.encode(digest), - BASE64.encode(&actual_digest), - ))); - } - Ok(Some(Vec::from(&*data))) - } - Err(e) => Err(Error::StorageError(e.to_string())), - } - } - - #[instrument(name = "SledChunkService::put", skip(self, data))] - fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> { - let digest = blake3::hash(&data); - let result = self.db.insert(digest.as_bytes(), data); - if let Err(e) = result { - return Err(Error::StorageError(e.to_string())); - } - Ok(*digest.as_bytes()) - } -} diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs deleted file mode 100644 index 2ad663733b04..000000000000 --- a/tvix/store/src/chunkservice/util.rs +++ /dev/null @@ -1,85 +0,0 @@ -use crate::{proto, Error}; -use std::io::Read; -use tracing::{debug, instrument}; - -use super::ChunkService; - -/// uploads a chunk to a chunk service, and returns its digest (or an error) when done. -#[instrument(skip_all, err)] -pub fn upload_chunk<CS: ChunkService>( - chunk_service: &CS, - chunk_data: Vec<u8>, -) -> Result<[u8; 32], Error> { - let mut hasher = blake3::Hasher::new(); - update_hasher(&mut hasher, &chunk_data); - let digest = hasher.finalize(); - - if chunk_service.has(digest.as_bytes())? { - debug!("already has chunk, skipping"); - } - let digest_resp = chunk_service.put(chunk_data)?; - - assert_eq!(&digest_resp, digest.as_bytes()); - - Ok(*digest.as_bytes()) -} - -/// reads through a reader, writes chunks to a [ChunkService] and returns a -/// [proto::BlobMeta] pointing to all the chunks. -#[instrument(skip_all, err)] -pub fn read_all_and_chunk<CS: ChunkService, R: Read>( - chunk_service: &CS, - r: R, -) -> Result<(Vec<u8>, proto::BlobMeta), Error> { - let mut blob_meta = proto::BlobMeta::default(); - - // hash the file contents, upload chunks if not there yet - let mut blob_hasher = blake3::Hasher::new(); - - // TODO: play with chunking sizes - let chunker_avg_size = 64 * 1024; - let chunker_min_size = chunker_avg_size / 4; - let chunker_max_size = chunker_avg_size * 4; - - let chunker = - fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size); - - for chunking_result in chunker { - let chunk = chunking_result.unwrap(); - // TODO: convert to error::UnableToRead - - let chunk_len = chunk.data.len() as u32; - - // update calculate blob hash - update_hasher(&mut blob_hasher, &chunk.data); - - let chunk_digest = upload_chunk(chunk_service, chunk.data)?; - - blob_meta.chunks.push(proto::blob_meta::ChunkMeta { - digest: chunk_digest.to_vec(), - size: chunk_len, - }); - } - Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta)) -} - -/// updates a given hasher with more data. Uses rayon if the data is -/// sufficiently big. -/// -/// From the docs: -/// -/// To get any performance benefit from multithreading, the input buffer needs -/// to be large. As a rule of thumb on x86_64, update_rayon is slower than -/// update for inputs under 128 KiB. That threshold varies quite a lot across -/// different processors, and it’s important to benchmark your specific use -/// case. -/// -/// We didn't benchmark yet, so these numbers might need tweaking. -#[instrument(skip_all)] -pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) { - if data.len() > 128 * 1024 { - hasher.update_rayon(data); - } else { - hasher.update(data); - } -} diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index e62097ec468d..bf80eb4b71b9 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -1,19 +1,17 @@ -use crate::{chunkservice::read_all_and_chunk, directoryservice::DirectoryPutter, proto}; +use crate::{blobservice::BlobService, directoryservice::DirectoryService}; +use crate::{blobservice::BlobWriter, directoryservice::DirectoryPutter, proto}; use std::{ collections::HashMap, fmt::Debug, fs, fs::File, + io, os::unix::prelude::PermissionsExt, path::{Path, PathBuf}, }; use tracing::instrument; use walkdir::WalkDir; -use crate::{ - blobservice::BlobService, chunkservice::ChunkService, directoryservice::DirectoryService, -}; - #[derive(Debug, thiserror::Error)] pub enum Error { #[error("failed to upload directory at {0}: {1}")] @@ -57,9 +55,8 @@ impl From<super::Error> for Error { // // It assumes the caller adds returned nodes to the directories it assembles. #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] -fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: DirectoryPutter>( +fn process_entry<BS: BlobService, DP: DirectoryPutter>( blob_service: &mut BS, - chunk_service: &mut CS, directory_putter: &mut DP, entry: &walkdir::DirEntry, maybe_directory: Option<proto::Directory>, @@ -112,23 +109,16 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire .metadata() .map_err(|e| Error::UnableToStat(entry_path.clone(), e.into()))?; - let file = File::open(entry_path.clone()) + let mut file = File::open(entry_path.clone()) .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?; - let (blob_digest, blob_meta) = read_all_and_chunk(chunk_service, file)?; - - // upload blobmeta if not there yet - if blob_service - .stat(&proto::StatBlobRequest { - digest: blob_digest.to_vec(), - include_chunks: false, - include_bao: false, - })? - .is_none() - { - // upload blobmeta - blob_service.put(&blob_digest, blob_meta)?; - } + let mut writer = blob_service.open_write()?; + + if let Err(e) = io::copy(&mut file, &mut writer) { + return Err(Error::UnableToRead(entry_path, e)); + }; + + let digest = writer.close()?; return Ok(proto::node::Node::File(proto::FileNode { name: entry @@ -136,7 +126,7 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire .to_str() .map(|s| Ok(s.to_owned())) .unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?, - digest: blob_digest, + digest: digest.to_vec(), size: metadata.len() as u32, // If it's executable by the user, it'll become executable. // This matches nix's dump() function behaviour. @@ -152,15 +142,9 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire /// to the PathInfoService. // // returns the root node, or an error. -#[instrument(skip(blob_service, chunk_service, directory_service), fields(path=?p))] -pub fn import_path< - BS: BlobService, - CS: ChunkService + std::marker::Sync, - DS: DirectoryService, - P: AsRef<Path> + Debug, ->( +#[instrument(skip(blob_service, directory_service), fields(path=?p))] +pub fn import_path<BS: BlobService, DS: DirectoryService, P: AsRef<Path> + Debug>( blob_service: &mut BS, - chunk_service: &mut CS, directory_service: &mut DS, p: P, ) -> Result<proto::node::Node, Error> { @@ -212,13 +196,7 @@ pub fn import_path< } }; - let node = process_entry( - blob_service, - chunk_service, - &mut directory_putter, - &entry, - maybe_directory, - )?; + let node = process_entry(blob_service, &mut directory_putter, &entry, maybe_directory)?; if entry.depth() == 0 { return Ok(node); diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index aac650b1ca8b..3ce826f98ba6 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,15 +1,12 @@ -mod blobreader; mod errors; pub mod blobservice; -pub mod chunkservice; pub mod directoryservice; pub mod import; pub mod nar; pub mod pathinfoservice; pub mod proto; -pub use blobreader::BlobReader; pub use errors::Error; #[cfg(test)] diff --git a/tvix/store/src/nar/non_caching_calculation_service.rs b/tvix/store/src/nar/non_caching_calculation_service.rs index f77f0b30d61f..94dd51bc6a7f 100644 --- a/tvix/store/src/nar/non_caching_calculation_service.rs +++ b/tvix/store/src/nar/non_caching_calculation_service.rs @@ -2,7 +2,6 @@ use count_write::CountWrite; use sha2::{Digest, Sha256}; use crate::blobservice::BlobService; -use crate::chunkservice::ChunkService; use crate::directoryservice::DirectoryService; use crate::proto; @@ -12,26 +11,20 @@ use super::{NARCalculationService, RenderError}; /// A NAR calculation service which simply renders the whole NAR whenever /// we ask for the calculation. #[derive(Clone)] -pub struct NonCachingNARCalculationService< - BS: BlobService, - CS: ChunkService + Clone, - DS: DirectoryService, -> { - nar_renderer: NARRenderer<BS, CS, DS>, +pub struct NonCachingNARCalculationService<BS: BlobService, DS: DirectoryService> { + nar_renderer: NARRenderer<BS, DS>, } -impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> - NonCachingNARCalculationService<BS, CS, DS> -{ - pub fn new(blob_service: BS, chunk_service: CS, directory_service: DS) -> Self { +impl<BS: BlobService, DS: DirectoryService> NonCachingNARCalculationService<BS, DS> { + pub fn new(blob_service: BS, directory_service: DS) -> Self { Self { - nar_renderer: NARRenderer::new(blob_service, chunk_service, directory_service), + nar_renderer: NARRenderer::new(blob_service, directory_service), } } } -impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARCalculationService - for NonCachingNARCalculationService<BS, CS, DS> +impl<BS: BlobService, DS: DirectoryService> NARCalculationService + for NonCachingNARCalculationService<BS, DS> { fn calculate_nar( &self, diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index a061dad9bb35..b080a713ec0a 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -1,10 +1,11 @@ +use std::io::{self, BufReader}; + use crate::{ blobservice::BlobService, - chunkservice::ChunkService, directoryservice::DirectoryService, proto::{self, NamedNode}, - BlobReader, }; +use data_encoding::BASE64; use nix_compat::nar; use super::RenderError; @@ -12,17 +13,15 @@ use super::RenderError; /// A NAR renderer, using a blob_service, chunk_service and directory_service /// to render a NAR to a writer. #[derive(Clone)] -pub struct NARRenderer<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> { +pub struct NARRenderer<BS: BlobService, DS: DirectoryService> { blob_service: BS, - chunk_service: CS, directory_service: DS, } -impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARRenderer<BS, CS, DS> { - pub fn new(blob_service: BS, chunk_service: CS, directory_service: DS) -> Self { +impl<BS: BlobService, DS: DirectoryService> NARRenderer<BS, DS> { + pub fn new(blob_service: BS, directory_service: DS) -> Self { Self { blob_service, - chunk_service, directory_service, } } @@ -65,49 +64,22 @@ impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARRendere )) })?; - // query blob_service for blob_meta - let resp = self - .blob_service - .stat(&proto::StatBlobRequest { - digest: digest.to_vec(), - include_chunks: true, - ..Default::default() - }) - .map_err(RenderError::StoreError)?; + // TODO: handle error + let mut blob_reader = match self.blob_service.open_read(&digest).unwrap() { + Some(blob_reader) => Ok(BufReader::new(blob_reader)), + None => Err(RenderError::NARWriterError(io::Error::new( + io::ErrorKind::NotFound, + format!("blob with digest {} not found", BASE64.encode(&digest)), + ))), + }?; - match resp { - // if it's None, that's an error! - None => { - return Err(RenderError::BlobNotFound( - digest.to_vec(), - proto_file_node.name.to_owned(), - )); - } - Some(blob_meta) => { - // make sure the blob_meta size matches what we expect from proto_file_node - let blob_meta_size = blob_meta.chunks.iter().fold(0, |acc, e| acc + e.size); - if blob_meta_size != proto_file_node.size { - return Err(RenderError::UnexpectedBlobMeta( - digest.to_vec(), - proto_file_node.name.to_owned(), - proto_file_node.size, - blob_meta_size, - )); - } - - let mut blob_reader = std::io::BufReader::new(BlobReader::open( - &self.chunk_service, - blob_meta, - )); - nar_node - .file( - proto_file_node.executable, - proto_file_node.size.into(), - &mut blob_reader, - ) - .map_err(RenderError::NARWriterError)?; - } - } + nar_node + .file( + proto_file_node.executable, + proto_file_node.size.into(), + &mut blob_reader, + ) + .map_err(RenderError::NARWriterError)?; } proto::node::Node::Directory(proto_directory_node) => { let digest: [u8; 32] = diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index e891caa0268e..d1e98a5b7953 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,54 +1,34 @@ -use std::collections::VecDeque; - use crate::{ - blobservice::BlobService, - chunkservice::{read_all_and_chunk, update_hasher, ChunkService}, - Error, + blobservice::{BlobService, BlobWriter}, + proto::sync_read_into_async_read::SyncReadIntoAsyncRead, }; use data_encoding::BASE64; -use tokio::{sync::mpsc::channel, task}; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use std::{collections::VecDeque, io, pin::Pin}; +use tokio::task; +use tokio_stream::StreamExt; +use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; -use tracing::{debug, instrument, warn}; +use tracing::{instrument, warn}; -pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> { +pub struct GRPCBlobServiceWrapper<BS: BlobService> { blob_service: BS, - chunk_service: CS, } -impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> { - pub fn new(blob_service: BS, chunk_service: CS) -> Self { +impl<BS: BlobService> From<BS> for GRPCBlobServiceWrapper<BS> { + fn from(value: BS) -> Self { Self { - blob_service, - chunk_service, - } - } - - // upload the chunk to the chunk service, and return its digest (or an error) when done. - #[instrument(skip(chunk_service))] - fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> { - let mut hasher = blake3::Hasher::new(); - update_hasher(&mut hasher, &chunk_data); - let digest = hasher.finalize(); - - if chunk_service.has(digest.as_bytes())? { - debug!("already has chunk, skipping"); + blob_service: value, } - let digest_resp = chunk_service.put(chunk_data)?; - - assert_eq!(&digest_resp, digest.as_bytes()); - - Ok(digest.as_bytes().to_vec()) } } #[async_trait] -impl< - BS: BlobService + Send + Sync + Clone + 'static, - CS: ChunkService + Send + Sync + Clone + 'static, - > super::blob_service_server::BlobService for GRPCBlobServiceWrapper<BS, CS> +impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server::BlobService + for GRPCBlobServiceWrapper<BS> { - type ReadStream = ReceiverStream<Result<super::BlobChunk, Status>>; + // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 + type ReadStream = + Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>; #[instrument(skip(self))] async fn stat( @@ -56,12 +36,22 @@ impl< request: Request<super::StatBlobRequest>, ) -> Result<Response<super::BlobMeta>, Status> { let rq = request.into_inner(); - match self.blob_service.stat(&rq) { - Ok(None) => Err(Status::not_found(format!( + let req_digest: [u8; 32] = rq + .digest + .clone() + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + if rq.include_chunks || rq.include_bao { + return Err(Status::internal("not implemented")); + } + + match self.blob_service.has(&req_digest) { + Ok(true) => Ok(Response::new(super::BlobMeta::default())), + Ok(false) => Err(Status::not_found(format!( "blob {} not found", - BASE64.encode(&rq.digest) + BASE64.encode(&req_digest) ))), - Ok(Some(blob_meta)) => Ok(Response::new(blob_meta)), Err(e) => Err(e.into()), } } @@ -71,99 +61,38 @@ impl< &self, request: Request<super::ReadBlobRequest>, ) -> Result<Response<Self::ReadStream>, Status> { - let req = request.into_inner(); - let (tx, rx) = channel(5); + let rq = request.into_inner(); - let req_digest: [u8; 32] = req + let req_digest: [u8; 32] = rq .digest + .clone() .try_into() - .map_err(|_e| Status::invalid_argument("invalid digest length"))?; - - // query the blob service for more detailed blob info - let stat_resp = self.blob_service.stat(&super::StatBlobRequest { - digest: req_digest.to_vec(), - include_chunks: true, - ..Default::default() - })?; - - match stat_resp { - None => { - // If the stat didn't return any blobmeta, the client might - // still have asked for a single chunk to be read. - // Check the chunkstore. - if let Some(data) = self.chunk_service.get(&req_digest)? { - // We already know the hash matches, and contrary to - // iterating over a blobmeta, we can't know the size, - // so send the contents of that chunk over, - // as the first (and only) element of the stream. - task::spawn(async move { - let res = Ok(super::BlobChunk { data }); - // send the result to the client. If the client already left, that's also fine. - if (tx.send(res).await).is_err() { - debug!("receiver dropped"); - } - }); - } else { - return Err(Status::not_found(format!( - "blob {} not found", - BASE64.encode(&req_digest), - ))); - } - } - Some(blobmeta) => { - let chunk_client = self.chunk_service.clone(); - - // TODO: use BlobReader? - // But then we might not be able to send compressed chunks as-is. - // Might require implementing https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html for it - // first, so we can .next().await in here. - - task::spawn(async move { - for chunkmeta in blobmeta.chunks { - // request chunk. - // We don't need to validate the digest again, as - // that's required for all implementations of ChunkService. - // TODO: handle error - let chunkmeta_digest = &chunkmeta.digest.try_into().unwrap(); - let res = match chunk_client.get(chunkmeta_digest) { - Err(e) => Err(e.into()), - // TODO: make this a separate error type - Ok(None) => Err(Error::StorageError(format!( - "consistency error: chunk {} for blob {} not found", - BASE64.encode(chunkmeta_digest), - BASE64.encode(&req_digest), - )) - .into()), - Ok(Some(data)) => { - // We already know the hash matches, but also - // check the size matches what chunkmeta said. - if data.len() as u32 != chunkmeta.size { - Err(Error::StorageError(format!( - "consistency error: chunk {} for blob {} has wrong size, expected {}, got {}", - BASE64.encode(chunkmeta_digest), - BASE64.encode(&req_digest), - chunkmeta.size, - data.len(), - )).into()) - } else { - // send out the current chunk - // TODO: we might want to break this up further if too big? - Ok(super::BlobChunk { data }) - } - } - }; - // send the result to the client - if (tx.send(res).await).is_err() { - debug!("receiver dropped"); - break; - } + .map_err(|_| Status::invalid_argument("invalid digest length"))?; + + match self.blob_service.open_read(&req_digest) { + Ok(Some(reader)) => { + let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into(); + + fn stream_mapper( + x: Result<bytes::Bytes, io::Error>, + ) -> Result<super::BlobChunk, Status> { + match x { + Ok(bytes) => Ok(super::BlobChunk { + data: bytes.to_vec(), + }), + Err(e) => Err(Status::from(e)), } - }); + } + + let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper); + Ok(Response::new(Box::pin(chunks_stream))) } + Ok(None) => Err(Status::not_found(format!( + "blob {} not found", + BASE64.encode(&rq.digest) + ))), + Err(e) => Err(e.into()), } - - let receiver_stream = ReceiverStream::new(rx); - Ok(Response::new(receiver_stream)) } #[instrument(skip(self))] @@ -180,37 +109,34 @@ impl< let data_reader = tokio_util::io::StreamReader::new(data_stream); - // TODO: can we get rid of this clone? - let chunk_service = self.chunk_service.clone(); - - let (blob_digest, blob_meta) = - task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> { - // feed read_all_and_chunk a (sync) reader to the data retrieved from the stream. - read_all_and_chunk( - &chunk_service, - tokio_util::io::SyncIoBridge::new(data_reader), - ) - }) - .await - .map_err(|e| Status::internal(e.to_string()))??; - - // upload blobmeta if not there yet - if self + // prepare a writer, which we'll use in the blocking task below. + let mut writer = self .blob_service - .stat(&super::StatBlobRequest { - digest: blob_digest.to_vec(), - include_chunks: false, - include_bao: false, - })? - .is_none() - { - // upload blobmeta - self.blob_service.put(&blob_digest, blob_meta)?; - } - - // return to client. - Ok(Response::new(super::PutBlobResponse { - digest: blob_digest, - })) + .open_write() + .map_err(|e| Status::internal(format!("unable to open for write: {}", e)))?; + + let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> { + // construct a sync reader to the data + let mut reader = tokio_util::io::SyncIoBridge::new(data_reader); + + io::copy(&mut reader, &mut writer).map_err(|e| { + warn!("error copying: {}", e); + Status::internal("error copying") + })?; + + let digest = writer + .close() + .map_err(|e| { + warn!("error closing stream: {}", e); + Status::internal("error closing stream") + })? + .to_vec(); + + Ok(super::PutBlobResponse { digest }) + }) + .await + .map_err(|_| Status::internal("failed to wait for task"))??; + + Ok(Response::new(result)) } } diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs index 5002d7e77a96..62741a3ad508 100644 --- a/tvix/store/src/proto/mod.rs +++ b/tvix/store/src/proto/mod.rs @@ -11,6 +11,8 @@ mod grpc_blobservice_wrapper; mod grpc_directoryservice_wrapper; mod grpc_pathinfoservice_wrapper; +mod sync_read_into_async_read; + pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper; diff --git a/tvix/store/src/proto/sync_read_into_async_read.rs b/tvix/store/src/proto/sync_read_into_async_read.rs new file mode 100644 index 000000000000..0a0ef019781c --- /dev/null +++ b/tvix/store/src/proto/sync_read_into_async_read.rs @@ -0,0 +1,158 @@ +use bytes::Buf; +use core::task::Poll::Ready; +use futures::ready; +use futures::Future; +use std::io; +use std::io::Read; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use tokio::io::AsyncRead; +use tokio::runtime::Handle; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; + +#[derive(Debug)] +enum State<Buf: bytes::Buf + bytes::BufMut> { + Idle(Option<Buf>), + Busy(JoinHandle<(io::Result<usize>, Buf)>), +} + +use State::{Busy, Idle}; + +/// Use a [`SyncReadIntoAsyncRead`] to asynchronously read from a +/// synchronous API. +#[derive(Debug)] +pub struct SyncReadIntoAsyncRead<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> { + state: Mutex<State<Buf>>, + reader: Arc<Mutex<R>>, + rt: Handle, +} + +impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> SyncReadIntoAsyncRead<R, Buf> { + /// This must be called from within a Tokio runtime context, or else it will panic. + #[track_caller] + pub fn new(rt: Handle, reader: R) -> Self { + Self { + rt, + state: State::Idle(None).into(), + reader: Arc::new(reader.into()), + } + } + + /// This must be called from within a Tokio runtime context, or else it will panic. + pub fn new_with_reader(readable: R) -> Self { + Self::new(Handle::current(), readable) + } +} + +/// Repeats operations that are interrupted. +macro_rules! uninterruptibly { + ($e:expr) => {{ + loop { + match $e { + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + res => break res, + } + } + }}; +} + +impl< + R: Read + Send + 'static + std::marker::Unpin, + Buf: bytes::Buf + bytes::BufMut + Send + Default + std::marker::Unpin + 'static, + > AsyncRead for SyncReadIntoAsyncRead<R, Buf> +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + dst: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + let me = self.get_mut(); + // Do we need this mutex? + let state = me.state.get_mut(); + + loop { + match state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap_or_default(); + + if buf.has_remaining() { + // Here, we will split the `buf` into `[..dst.remaining()... ; rest ]` + // The `rest` is stuffed into the `buf_cell` for further poll_read. + // The other is completely consumed into the unfilled destination. + // `rest` can be empty. + let mut adjusted_src = + buf.copy_to_bytes(std::cmp::min(buf.remaining(), dst.remaining())); + let copied_size = adjusted_src.remaining(); + adjusted_src.copy_to_slice(dst.initialize_unfilled_to(copied_size)); + dst.set_filled(copied_size); + *buf_cell = Some(buf); + return Ready(Ok(())); + } + + let reader = me.reader.clone(); + *state = Busy(me.rt.spawn_blocking(move || { + let result = uninterruptibly!(reader.blocking_lock().read( + // SAFETY: `reader.read` will *ONLY* write initialized bytes + // and never *READ* uninitialized bytes + // inside this buffer. + // + // Furthermore, casting the slice as `*mut [u8]` + // is safe because it has the same layout. + // + // Finally, the pointer obtained is valid and owned + // by `buf` only as we have a valid mutable reference + // to it, it is valid for write. + // + // Here, we copy an nightly API: https://doc.rust-lang.org/stable/src/core/mem/maybe_uninit.rs.html#994-998 + unsafe { + &mut *(buf.chunk_mut().as_uninit_slice_mut() + as *mut [std::mem::MaybeUninit<u8>] + as *mut [u8]) + } + )); + + if let Ok(n) = result { + // SAFETY: given we initialize `n` bytes, we can move `n` bytes + // forward. + unsafe { + buf.advance_mut(n); + } + } + + (result, buf) + })); + } + Busy(ref mut rx) => { + let (result, mut buf) = ready!(Pin::new(rx).poll(cx))?; + + match result { + Ok(n) => { + if n > 0 { + let remaining = std::cmp::min(n, dst.remaining()); + let mut adjusted_src = buf.copy_to_bytes(remaining); + adjusted_src.copy_to_slice(dst.initialize_unfilled_to(remaining)); + dst.advance(remaining); + } + *state = Idle(Some(buf)); + return Ready(Ok(())); + } + Err(e) => { + *state = Idle(None); + return Ready(Err(e)); + } + } + } + } + } + } +} + +impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> From<R> for SyncReadIntoAsyncRead<R, Buf> { + /// This must be called from within a Tokio runtime context, or else it will panic. + fn from(value: R) -> Self { + Self::new_with_reader(value) + } +} diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs index 88eb21e30241..80e2e71867a3 100644 --- a/tvix/store/src/proto/tests/grpc_blobservice.rs +++ b/tvix/store/src/proto/tests/grpc_blobservice.rs @@ -1,18 +1,14 @@ use crate::blobservice::BlobService; -use crate::chunkservice::ChunkService; -use crate::proto::blob_meta::ChunkMeta; use crate::proto::blob_service_server::BlobService as GRPCBlobService; use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest}; -use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST}; -use crate::tests::utils::{gen_blob_service, gen_chunk_service}; +use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST}; +use crate::tests::utils::gen_blob_service; +use tokio_stream::StreamExt; -fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper< - impl BlobService + Send + Sync + Clone + 'static, - impl ChunkService + Send + Sync + Clone + 'static, -> { +fn gen_grpc_blob_service( +) -> GRPCBlobServiceWrapper<impl BlobService + Send + Sync + Clone + 'static> { let blob_service = gen_blob_service(); - let chunk_service = gen_chunk_service(); - GRPCBlobServiceWrapper::new(blob_service, chunk_service) + GRPCBlobServiceWrapper::from(blob_service) } /// Trying to read a non-existent blob should return a not found error. @@ -26,8 +22,13 @@ async fn not_found_read() { })) .await; - let e = resp.expect_err("must_be_err"); - assert_eq!(e.code(), tonic::Code::NotFound); + // We can't use unwrap_err here, because the Ok value doesn't implement + // debug. + if let Err(e) = resp { + assert_eq!(e.code(), tonic::Code::NotFound); + } else { + panic!("resp is not err") + } } /// Trying to stat a non-existent blob should return a not found error. @@ -47,8 +48,7 @@ async fn not_found_stat() { assert_eq!(resp.code(), tonic::Code::NotFound); } -/// Put a blob in the store, get it back. We send something small enough so it -/// won't get split into multiple chunks. +/// Put a blob in the store, get it back. #[tokio::test] async fn put_read_stat() { let service = gen_grpc_blob_service(); @@ -64,39 +64,30 @@ async fn put_read_stat() { assert_eq!(BLOB_A_DIGEST.to_vec(), put_resp.digest); - // Stat for the digest of A. It should return one chunk. + // Stat for the digest of A. + // We currently don't ask for more granular chunking data, as we don't + // expose it yet. let resp = service .stat(tonic::Request::new(StatBlobRequest { digest: BLOB_A_DIGEST.to_vec(), - include_chunks: true, ..Default::default() })) .await .expect("must succeed") .into_inner(); - assert_eq!(1, resp.chunks.len()); - // the `chunks` field should point to the single chunk. - assert_eq!( - vec![ChunkMeta { - digest: BLOB_A_DIGEST.to_vec(), - size: BLOB_A.len() as u32, - }], - resp.chunks, - ); - - // Read the chunk. It should return the same data. + // Read the blob. It should return the same data. let resp = service .read(tonic::Request::new(ReadBlobRequest { digest: BLOB_A_DIGEST.to_vec(), })) .await; - let mut rx = resp.expect("must succeed").into_inner().into_inner(); + let mut rx = resp.ok().unwrap().into_inner(); // the stream should contain one element, a BlobChunk with the same contents as BLOB_A. let item = rx - .recv() + .next() .await .expect("must be some") .expect("must succeed"); @@ -104,127 +95,8 @@ async fn put_read_stat() { assert_eq!(BLOB_A.to_vec(), item.data); // … and no more elements - assert!(rx.recv().await.is_none()); -} - -/// Put a bigger blob in the store, and get it back. -/// Assert the stat request actually returns more than one chunk, and -/// we can read each chunk individually, as well as the whole blob via the -/// `read()` method. -#[tokio::test] -async fn put_read_stat_large() { - let service = gen_grpc_blob_service(); - - // split up BLOB_B into BlobChunks containing 1K bytes each. - let blob_b_blobchunks: Vec<BlobChunk> = BLOB_B - .chunks(1024) - .map(|x| BlobChunk { data: x.to_vec() }) - .collect(); - - assert!(blob_b_blobchunks.len() > 1); + assert!(rx.next().await.is_none()); - // Send blob B - let put_resp = service - .put(tonic_mock::streaming_request(blob_b_blobchunks)) - .await - .expect("must succeed") - .into_inner(); - - assert_eq!(BLOB_B_DIGEST.to_vec(), put_resp.digest); - - // Stat for the digest of B - let resp = service - .stat(tonic::Request::new(StatBlobRequest { - digest: BLOB_B_DIGEST.to_vec(), - include_chunks: true, - ..Default::default() - })) - .await - .expect("must succeed") - .into_inner(); - - // it should return more than one chunk. - assert_ne!(1, resp.chunks.len()); - - // The size added up should equal the size of BLOB_B. - let mut size_in_stat: u32 = 0; - for chunk in &resp.chunks { - size_in_stat += chunk.size - } - assert_eq!(BLOB_B.len() as u32, size_in_stat); - - // Chunks are chunked up the same way we would do locally, when initializing the chunker with the same values. - // TODO: make the chunker config better accessible, so we don't need to synchronize this. - { - let chunker_avg_size = 64 * 1024; - let chunker_min_size = chunker_avg_size / 4; - let chunker_max_size = chunker_avg_size * 4; - - // initialize a chunker with the current buffer - let blob_b = BLOB_B.to_vec(); - let chunker = fastcdc::v2020::FastCDC::new( - &blob_b, - chunker_min_size, - chunker_avg_size, - chunker_max_size, - ); - - let mut num_chunks = 0; - for (i, chunk) in chunker.enumerate() { - assert_eq!( - resp.chunks[i].size, chunk.length as u32, - "expected locally-chunked chunk length to match stat response" - ); - - num_chunks += 1; - } - - assert_eq!( - resp.chunks.len(), - num_chunks, - "expected number of chunks to match" - ); - } - - // Reading the whole blob by its digest via the read() interface should succeed. - { - let resp = service - .read(tonic::Request::new(ReadBlobRequest { - digest: BLOB_B_DIGEST.to_vec(), - })) - .await; - - let mut rx = resp.expect("must succeed").into_inner().into_inner(); - - let mut buf: Vec<u8> = Vec::new(); - while let Some(item) = rx.recv().await { - let mut blob_chunk = item.expect("must not be err"); - buf.append(&mut blob_chunk.data); - } - - assert_eq!(BLOB_B.to_vec(), buf); - } - - // Reading the whole blob by reading individual chunks should also succeed. - { - let mut buf: Vec<u8> = Vec::new(); - for chunk in &resp.chunks { - // request this individual chunk via read - let resp = service - .read(tonic::Request::new(ReadBlobRequest { - digest: chunk.digest.clone(), - })) - .await; - - let mut rx = resp.expect("must succeed").into_inner().into_inner(); - - // append all items from the stream to the buffer - while let Some(item) = rx.recv().await { - let mut blob_chunk = item.expect("must not be err"); - buf.append(&mut blob_chunk.data); - } - } - // finished looping over all chunks, compare - assert_eq!(BLOB_B.to_vec(), buf); - } + // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks. + // Test with some bigger blob too } diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs index f74c3fda0213..11cab2c264cc 100644 --- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs +++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs @@ -6,12 +6,10 @@ use crate::proto::GRPCPathInfoServiceWrapper; use crate::proto::PathInfo; use crate::proto::{GetPathInfoRequest, Node, SymlinkNode}; use crate::tests::fixtures::DUMMY_OUTPUT_HASH; -use crate::tests::utils::{ - gen_blob_service, gen_chunk_service, gen_directory_service, gen_pathinfo_service, -}; +use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service}; use tonic::Request; -/// generates a GRPCPathInfoService out of blob, chunk, directory and pathinfo services. +/// generates a GRPCPathInfoService out of blob, directory and pathinfo services. /// /// We only interact with it via the PathInfo GRPC interface. /// It uses the NonCachingNARCalculationService NARCalculationService to @@ -19,11 +17,7 @@ use tonic::Request; fn gen_grpc_service() -> impl GRPCPathInfoService { GRPCPathInfoServiceWrapper::new( gen_pathinfo_service(), - NonCachingNARCalculationService::new( - gen_blob_service(), - gen_chunk_service(), - gen_directory_service(), - ), + NonCachingNARCalculationService::new(gen_blob_service(), gen_directory_service()), ) } diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs index 3a48df9e33c8..ed5154d6fec0 100644 --- a/tvix/store/src/tests/import.rs +++ b/tvix/store/src/tests/import.rs @@ -1,4 +1,4 @@ -use super::utils::{gen_blob_service, gen_chunk_service, gen_directory_service}; +use super::utils::{gen_blob_service, gen_directory_service}; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::import::import_path; @@ -21,7 +21,6 @@ fn symlink() { let root_node = import_path( &mut gen_blob_service(), - &mut gen_chunk_service(), &mut gen_directory_service(), tmpdir.path().join("doesntmatter"), ) @@ -46,7 +45,6 @@ fn single_file() { let root_node = import_path( &mut blob_service, - &mut gen_chunk_service(), &mut gen_directory_service(), tmpdir.path().join("root"), ) @@ -64,13 +62,8 @@ fn single_file() { // ensure the blob has been uploaded assert!(blob_service - .stat(&proto::StatBlobRequest { - digest: HELLOWORLD_BLOB_DIGEST.to_vec(), - include_chunks: false, - ..Default::default() - }) - .unwrap() - .is_some()); + .has(&HELLOWORLD_BLOB_DIGEST.to_vec().try_into().unwrap()) + .unwrap()); } #[test] @@ -89,13 +82,8 @@ fn complicated() { let mut blob_service = gen_blob_service(); let mut directory_service = gen_directory_service(); - let root_node = import_path( - &mut blob_service, - &mut gen_chunk_service(), - &mut directory_service, - tmpdir.path(), - ) - .expect("must succeed"); + let root_node = import_path(&mut blob_service, &mut directory_service, tmpdir.path()) + .expect("must succeed"); // ensure root_node matched expectations assert_eq!( @@ -124,11 +112,6 @@ fn complicated() { // ensure EMPTY_BLOB_CONTENTS has been uploaded assert!(blob_service - .stat(&proto::StatBlobRequest { - digest: EMPTY_BLOB_DIGEST.to_vec(), - include_chunks: false, - include_bao: false - }) - .unwrap() - .is_some()); + .has(&EMPTY_BLOB_DIGEST.to_vec().try_into().unwrap()) + .unwrap()); } diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs index 2d612d9c273f..b5dbf595960f 100644 --- a/tvix/store/src/tests/nar_renderer.rs +++ b/tvix/store/src/tests/nar_renderer.rs @@ -1,21 +1,17 @@ use crate::blobservice::BlobService; -use crate::chunkservice::ChunkService; +use crate::blobservice::BlobWriter; use crate::directoryservice::DirectoryService; use crate::nar::NARRenderer; -use crate::proto; use crate::proto::DirectoryNode; use crate::proto::FileNode; use crate::proto::SymlinkNode; use crate::tests::fixtures::*; use crate::tests::utils::*; +use std::io; #[test] fn single_symlink() { - let renderer = NARRenderer::new( - gen_blob_service(), - gen_chunk_service(), - gen_directory_service(), - ); + let renderer = NARRenderer::new(gen_blob_service(), gen_directory_service()); // don't put anything in the stores, as we don't actually do any requests. let mut buf: Vec<u8> = vec![]; @@ -37,11 +33,7 @@ fn single_symlink() { /// match what's in the store. #[test] fn single_file_missing_blob() { - let renderer = NARRenderer::new( - gen_blob_service(), - gen_chunk_service(), - gen_directory_service(), - ); + let renderer = NARRenderer::new(gen_blob_service(), gen_directory_service()); let mut buf: Vec<u8> = vec![]; let e = renderer @@ -56,10 +48,11 @@ fn single_file_missing_blob() { ) .expect_err("must fail"); - if let crate::nar::RenderError::BlobNotFound(actual_digest, _) = e { - assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), actual_digest); - } else { - panic!("unexpected error") + match e { + crate::nar::RenderError::NARWriterError(e) => { + assert_eq!(io::ErrorKind::NotFound, e.kind()); + } + _ => panic!("unexpected error: {:?}", e), } } @@ -68,84 +61,79 @@ fn single_file_missing_blob() { #[test] fn single_file_wrong_blob_size() { let blob_service = gen_blob_service(); - let chunk_service = gen_chunk_service(); - // insert blob and chunk into the stores - chunk_service - .put(HELLOWORLD_BLOB_CONTENTS.to_vec()) - .unwrap(); - - blob_service - .put( - &HELLOWORLD_BLOB_DIGEST, - proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { + // insert blob into the store + let mut writer = blob_service.open_write().unwrap(); + io::copy( + &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()), + &mut writer, + ) + .unwrap(); + assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), writer.close().unwrap()); + + let renderer = NARRenderer::new(blob_service, gen_directory_service()); + + // Test with a root FileNode of a too big size + { + let mut buf: Vec<u8> = vec![]; + let e = renderer + .write_nar( + &mut buf, + &crate::proto::node::Node::File(FileNode { + name: "doesntmatter".to_string(), digest: HELLOWORLD_BLOB_DIGEST.to_vec(), - size: HELLOWORLD_BLOB_CONTENTS.len() as u32, - }], - ..Default::default() - }, - ) - .unwrap(); - - let renderer = NARRenderer::new(blob_service, chunk_service, gen_directory_service()); - let mut buf: Vec<u8> = vec![]; - - let e = renderer - .write_nar( - &mut buf, - &crate::proto::node::Node::File(FileNode { - name: "doesntmatter".to_string(), - digest: HELLOWORLD_BLOB_DIGEST.to_vec(), - size: 42, // <- note the wrong size here! - executable: false, - }), - ) - .expect_err("must fail"); + size: 42, // <- note the wrong size here! + executable: false, + }), + ) + .expect_err("must fail"); + + match e { + crate::nar::RenderError::NARWriterError(e) => { + assert_eq!(io::ErrorKind::UnexpectedEof, e.kind()); + } + _ => panic!("unexpected error: {:?}", e), + } + } - if let crate::nar::RenderError::UnexpectedBlobMeta(digest, _, expected_size, actual_size) = e { - assert_eq!( - digest, - HELLOWORLD_BLOB_DIGEST.to_vec(), - "expect digest to match" - ); - assert_eq!( - expected_size, 42, - "expected expected size to be what's passed in the request" - ); - assert_eq!( - actual_size, - HELLOWORLD_BLOB_CONTENTS.len() as u32, - "expected actual size to be correct" - ); - } else { - panic!("unexpected error") + // Test with a root FileNode of a too small size + { + let mut buf: Vec<u8> = vec![]; + let e = renderer + .write_nar( + &mut buf, + &crate::proto::node::Node::File(FileNode { + name: "doesntmatter".to_string(), + digest: HELLOWORLD_BLOB_DIGEST.to_vec(), + size: 2, // <- note the wrong size here! + executable: false, + }), + ) + .expect_err("must fail"); + + match e { + crate::nar::RenderError::NARWriterError(e) => { + assert_eq!(io::ErrorKind::InvalidInput, e.kind()); + } + _ => panic!("unexpected error: {:?}", e), + } } } #[test] fn single_file() { let blob_service = gen_blob_service(); - let chunk_service = gen_chunk_service(); - chunk_service - .put(HELLOWORLD_BLOB_CONTENTS.to_vec()) - .unwrap(); + // insert blob into the store + let mut writer = blob_service.open_write().unwrap(); + io::copy( + &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()), + &mut writer, + ) + .unwrap(); + assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), writer.close().unwrap()); - blob_service - .put( - &HELLOWORLD_BLOB_DIGEST, - proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: HELLOWORLD_BLOB_DIGEST.to_vec(), - size: HELLOWORLD_BLOB_CONTENTS.len() as u32, - }], - ..Default::default() - }, - ) - .unwrap(); - - let renderer = NARRenderer::new(blob_service, chunk_service, gen_directory_service()); + let renderer = NARRenderer::new(blob_service, gen_directory_service()); let mut buf: Vec<u8> = vec![]; renderer @@ -166,31 +154,24 @@ fn single_file() { #[test] fn test_complicated() { let blob_service = gen_blob_service(); - let chunk_service = gen_chunk_service(); let directory_service = gen_directory_service(); // put all data into the stores. - let digest = chunk_service.put(EMPTY_BLOB_CONTENTS.to_vec()).unwrap(); - - blob_service - .put( - &digest, - proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: digest.to_vec(), - size: EMPTY_BLOB_CONTENTS.len() as u32, - }], - ..Default::default() - }, - ) - .unwrap(); + // insert blob into the store + let mut writer = blob_service.open_write().unwrap(); + io::copy( + &mut io::Cursor::new(EMPTY_BLOB_CONTENTS.to_vec()), + &mut writer, + ) + .unwrap(); + assert_eq!(EMPTY_BLOB_DIGEST.to_vec(), writer.close().unwrap()); directory_service.put(DIRECTORY_WITH_KEEP.clone()).unwrap(); directory_service .put(DIRECTORY_COMPLICATED.clone()) .unwrap(); - let renderer = NARRenderer::new(blob_service, chunk_service, directory_service); + let renderer = NARRenderer::new(blob_service, directory_service); let mut buf: Vec<u8> = vec![]; renderer diff --git a/tvix/store/src/tests/utils.rs b/tvix/store/src/tests/utils.rs index 916e6516216c..2991feed41db 100644 --- a/tvix/store/src/tests/utils.rs +++ b/tvix/store/src/tests/utils.rs @@ -1,6 +1,5 @@ use crate::{ blobservice::{BlobService, MemoryBlobService}, - chunkservice::{ChunkService, MemoryChunkService}, directoryservice::{DirectoryService, MemoryDirectoryService}, pathinfoservice::{MemoryPathInfoService, PathInfoService}, }; @@ -9,10 +8,6 @@ pub fn gen_blob_service() -> impl BlobService + Send + Sync + Clone + 'static { MemoryBlobService::default() } -pub fn gen_chunk_service() -> impl ChunkService + Clone { - MemoryChunkService::default() -} - pub fn gen_directory_service() -> impl DirectoryService + Send + Sync + Clone + 'static { MemoryDirectoryService::default() } |