diff options
author | Florian Klink <flokli@flokli.de> | 2023-05-11T12·49+0300 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-05-11T14·27+0000 |
commit | 616fa4476f93e1782e68dc713e9e8cb77a426c7d (patch) | |
tree | f76a43e95c75d848d079706fbccfd442210ebebc /tvix/store/src | |
parent | b22b685f0b2524c088deacbf4e80e7b7c73b5afc (diff) |
refactor(tvix/store): remove ChunkService r/6133
Whether chunking is involved or not, is an implementation detail of each Blobstore. Consumers of a whole blob shouldn't need to worry about that. It currently is not visible in the gRPC interface either. It shouldn't bleed into everything. Let the BlobService trait provide `open_read` and `open_write` methods, which return handles providing io::Read or io::Write, and leave the details up to the implementation. This means, our custom BlobReader module can go away, and all the chunking bits in there, too. In the future, we might still want to add more chunking-aware syncing, but as a syncing strategy some stores can expose, not as a fundamental protocol component. This currently needs "SyncReadIntoAsyncRead", taken and vendored in from https://github.com/tokio-rs/tokio/pull/5669. It provides a AsyncRead for a sync Read, which is necessary to connect our (sync) BlobReader interface to a GRPC server implementation. As an alternative, we could also make the BlobReader itself async, and let consumers of the trait (EvalIO) deal with the async-ness, but this is less of a change for now. In terms of vendoring, I initially tried to move our tokio crate to these commits, but ended up in version incompatibilities, so let's vendor it in for now. Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551 Tested-by: BuildkiteCI Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix/store/src')
21 files changed, 553 insertions, 1251 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 72814fa530e7..29f8a92cb1be 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -8,7 +8,6 @@ use nix_compat::nixhash::NixHashWithMode; use std::path::PathBuf; use tracing_subscriber::prelude::*; use tvix_store::blobservice::SledBlobService; -use tvix_store::chunkservice::SledChunkService; use tvix_store::directoryservice::SledDirectoryService; use tvix_store::import::import_path; use tvix_store::nar::NARCalculationService; @@ -87,7 +86,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { // initialize stores let mut blob_service = SledBlobService::new("blobs.sled".into())?; - let mut chunk_service = SledChunkService::new("chunks.sled".into())?; let mut directory_service = SledDirectoryService::new("directories.sled".into())?; let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?; @@ -102,15 +100,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let nar_calculation_service = NonCachingNARCalculationService::new( blob_service.clone(), - chunk_service.clone(), directory_service.clone(), ); #[allow(unused_mut)] let mut router = server - .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( + .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( blob_service, - chunk_service, ))) .add_service(DirectoryServiceServer::new( GRPCDirectoryServiceWrapper::from(directory_service), @@ -135,17 +131,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { Commands::Import { paths } => { let nar_calculation_service = NonCachingNARCalculationService::new( blob_service.clone(), - chunk_service.clone(), directory_service.clone(), ); for path in paths { - let root_node = import_path( - &mut blob_service, - &mut chunk_service, - &mut directory_service, - &path, - )?; + let root_node = import_path(&mut blob_service, &mut directory_service, &path)?; let nar_hash = NixHashWithMode::Recursive(NixHash::new( HashAlgo::Sha256, diff --git a/tvix/store/src/blobreader.rs b/tvix/store/src/blobreader.rs deleted file mode 100644 index ff0efdf15237..000000000000 --- a/tvix/store/src/blobreader.rs +++ /dev/null @@ -1,376 +0,0 @@ -use std::io::{self, Cursor, Read, Write}; - -use data_encoding::BASE64; - -use crate::{chunkservice::ChunkService, proto}; - -/// BlobReader implements reading of a blob, by querying individual chunks. -/// -/// It doesn't talk to BlobService, but assumes something has already fetched -/// blob_meta already. -pub struct BlobReader<'a, CS: ChunkService> { - // used to look up chunks - chunk_service: &'a CS, - - // internal iterator over chunk hashes and their sizes - chunks_iter: std::vec::IntoIter<proto::blob_meta::ChunkMeta>, - - // If a chunk was partially read (if buf.len() < chunk.size), - // a cursor to its contents are stored here. - current_chunk: Option<Cursor<Vec<u8>>>, -} - -impl<'a, CS: ChunkService> BlobReader<'a, CS> { - pub fn open(chunk_service: &'a CS, blob_meta: proto::BlobMeta) -> Self { - Self { - chunk_service, - chunks_iter: blob_meta.chunks.into_iter(), - current_chunk: None, - } - } - - /// reads (up to n bytes) from the current chunk into buf (if there is - /// a chunk). - /// - /// If it arrives at the end of the chunk, sets it back to None. - /// Returns a [std::io::Result<usize>] of the bytes read from the chunk. - fn read_from_current_chunk<W: std::io::Write>( - &mut self, - m: usize, - buf: &mut W, - ) -> std::io::Result<usize> { - // If there's still something in partial_chunk, read from there - // (up to m: usize bytes) and return the number of bytes read. - if let Some(current_chunk) = &mut self.current_chunk { - let result = io::copy(&mut current_chunk.take(m as u64), buf); - - match result { - Ok(n) => { - // if we were not able to read all off m bytes, - // this means we arrived at the end of the chunk. - if n < m as u64 { - self.current_chunk = None - } - - // n can never be > m, so downcasting this to usize is ok. - Ok(n as usize) - } - Err(e) => Err(e), - } - } else { - Ok(0) - } - } -} - -impl<CS: ChunkService> std::io::Read for BlobReader<'_, CS> { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { - let read_max = buf.len(); - let mut bytes_read = 0_usize; - let mut buf_w = std::io::BufWriter::new(buf); - - // read up to buf.len() bytes into buf, by reading from the current - // chunk and subsequent ones. - loop { - // try to fill buf with bytes from the current chunk - // (if there's still one) - let n = self.read_from_current_chunk(read_max - bytes_read, &mut buf_w)?; - bytes_read += n; - - // We want to make sure we don't accidentially read past more than - // we're allowed to. - assert!(bytes_read <= read_max); - - // buf is entirerly filled, we're done. - if bytes_read == read_max { - buf_w.flush()?; - break Ok(bytes_read); - } - - // Otherwise, bytes_read is < read_max, so we could still write - // more to buf. - // Check if we have more chunks to read from. - match self.chunks_iter.next() { - // No more chunks, we're done. - None => { - buf_w.flush()?; - return Ok(bytes_read); - } - // There's another chunk to visit, fetch its contents - Some(chunk_meta) => { - let chunk_meta_digest: [u8; 32] = - chunk_meta.digest.clone().try_into().map_err(|_e| { - std::io::Error::new( - io::ErrorKind::InvalidData, - format!( - "chunk in chunkmeta has wrong digest size, expected 32, got {}", - chunk_meta.digest.len(), - ), - ) - })?; - match self.chunk_service.get(&chunk_meta_digest) { - // Fetch successful, put it into `self.current_chunk` and restart the loop. - Ok(Some(chunk_data)) => { - // make sure the size matches what chunk_meta says as well. - if chunk_data.len() as u32 != chunk_meta.size { - break Err(std::io::Error::new( - io::ErrorKind::InvalidData, - format!( - "chunk_service returned chunk with wrong size for {}, expected {}, got {}", - BASE64.encode(&chunk_meta.digest), chunk_meta.size, chunk_data.len() - ) - )); - } - self.current_chunk = Some(Cursor::new(chunk_data)); - } - // Chunk requested does not exist - Ok(None) => { - break Err(std::io::Error::new( - io::ErrorKind::NotFound, - format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)), - )) - } - // Error occured while fetching the next chunk, propagate the error from the chunk service - Err(e) => { - break Err(std::io::Error::new(io::ErrorKind::InvalidData, e)); - } - } - } - } - } - } -} - -#[cfg(test)] -mod tests { - use super::BlobReader; - use crate::chunkservice::ChunkService; - use crate::proto; - use crate::tests::fixtures::DUMMY_DATA_1; - use crate::tests::fixtures::DUMMY_DATA_2; - use crate::tests::fixtures::DUMMY_DIGEST; - use crate::tests::utils::gen_chunk_service; - use std::io::Cursor; - use std::io::Read; - use std::io::Write; - - #[test] - /// reading from a blobmeta with zero chunks should produce zero bytes. - fn empty_blobmeta() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - let blobmeta = proto::BlobMeta { - chunks: vec![], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - let mut buf = Cursor::new(Vec::new()); - - let res = std::io::copy(&mut blob_reader, &mut buf); - - assert_eq!(0, res.unwrap()); - - Ok(()) - } - - #[test] - /// trying to read something where the chunk doesn't exist should fail - fn missing_chunk_fail() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - let blobmeta = proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: DUMMY_DIGEST.to_vec(), - size: 42, - }], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - let mut buf = Cursor::new(Vec::new()); - - let res = std::io::copy(&mut blob_reader, &mut buf); - - assert!(res.is_err()); - - Ok(()) - } - - #[test] - /// read something containing the single (empty) chunk - fn empty_chunk() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - // insert a single chunk - let dgst = chunk_service.put(vec![]).expect("must succeed"); - - // assemble a blobmeta - let blobmeta = proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: dgst.to_vec(), - size: 0, - }], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - - let mut buf: Vec<u8> = Vec::new(); - - let res = - std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed"); - - assert_eq!(res, 0, "number of bytes read must match"); - assert!(buf.is_empty(), "buf must be empty"); - - Ok(()) - } - - /// read something which contains a single chunk - #[test] - fn single_chunk() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - // insert a single chunk - let dgst = chunk_service - .put(DUMMY_DATA_1.clone()) - .expect("must succeed"); - - // assemble a blobmeta - let blobmeta = proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: dgst.to_vec(), - size: 3, - }], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - - let mut buf: Vec<u8> = Vec::new(); - - let res = - std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed"); - - assert_eq!(res, 3, "number of bytes read must match"); - assert_eq!(DUMMY_DATA_1[..], buf[..], "data read must match"); - - Ok(()) - } - - /// read something referring to a chunk, but with wrong size - #[test] - fn wrong_size_fail() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - // insert chunks - let dgst_1 = chunk_service - .put(DUMMY_DATA_1.clone()) - .expect("must succeed"); - - // assemble a blobmeta - let blobmeta = proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: dgst_1.to_vec(), - size: 42, - }], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - - let mut buf: Vec<u8> = Vec::new(); - - let res = std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)); - - assert!(res.is_err(), "reading must fail"); - - Ok(()) - } - - /// read something referring to multiple chunks - #[test] - fn multiple_chunks() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - // insert chunks - let dgst_1 = chunk_service - .put(DUMMY_DATA_1.clone()) - .expect("must succeed"); - let dgst_2 = chunk_service - .put(DUMMY_DATA_2.clone()) - .expect("must succeed"); - - // assemble a blobmeta - let blobmeta = proto::BlobMeta { - chunks: vec![ - proto::blob_meta::ChunkMeta { - digest: dgst_1.to_vec(), - size: 3, - }, - proto::blob_meta::ChunkMeta { - digest: dgst_2.to_vec(), - size: 2, - }, - proto::blob_meta::ChunkMeta { - digest: dgst_1.to_vec(), - size: 3, - }, - ], - inline_bao: vec![], - }; - - // assemble ecpected data - let mut expected_data: Vec<u8> = Vec::new(); - expected_data.extend_from_slice(&DUMMY_DATA_1[..]); - expected_data.extend_from_slice(&DUMMY_DATA_2[..]); - expected_data.extend_from_slice(&DUMMY_DATA_1[..]); - - // read via io::copy - { - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta.clone()); - - let mut buf: Vec<u8> = Vec::new(); - - let res = - std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed"); - - assert_eq!(8, res, "number of bytes read must match"); - - assert_eq!(expected_data[..], buf[..], "data read must match"); - } - - // now read the same thing again, but not via io::copy, but individually - { - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - - let mut buf: Vec<u8> = Vec::new(); - let mut cursor = Cursor::new(&mut buf); - - let mut bytes_read = 0; - - loop { - let mut smallbuf = [0xff; 1]; - match blob_reader.read(&mut smallbuf) { - Ok(n) => { - if n == 0 { - break; - } - let w_b = cursor.write(&smallbuf).unwrap(); - assert_eq!(n, w_b); - bytes_read += w_b; - } - Err(_) => { - panic!("error occured during read"); - } - } - } - - assert_eq!(8, bytes_read, "number of bytes read must match"); - assert_eq!(expected_data[..], buf[..], "data read must match"); - } - - Ok(()) - } -} diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index 57c86551e4de..9a796ca2c0c8 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -1,51 +1,80 @@ use data_encoding::BASE64; +use std::io::Cursor; use std::{ collections::HashMap, sync::{Arc, RwLock}, }; -use tracing::instrument; +use tracing::{instrument, warn}; -use crate::{proto, Error}; +use super::{BlobService, BlobWriter}; +use crate::Error; -use super::BlobService; +// type B3Digest = [u8; 32]; +// struct B3Digest ([u8; 32]); #[derive(Clone, Default)] pub struct MemoryBlobService { - db: Arc<RwLock<HashMap<Vec<u8>, proto::BlobMeta>>>, + db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>, } impl BlobService for MemoryBlobService { - #[instrument(skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))] - fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> { - if req.include_bao { - todo!("not implemented yet") - } + type BlobReader = Cursor<Vec<u8>>; + type BlobWriter = MemoryBlobWriter; + #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))] + fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> { let db = self.db.read().unwrap(); - // if include_chunks is also false, the user only wants to know if the - // blob is present at all. - if !req.include_chunks { - Ok(if db.contains_key(&req.digest) { - Some(proto::BlobMeta::default()) - } else { - None - }) - } else { - match db.get(&req.digest) { - None => Ok(None), - Some(blob_meta) => Ok(Some(blob_meta.clone())), - } + Ok(db.contains_key(digest)) + } + + fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> { + let db = self.db.read().unwrap(); + + Ok(db.get(digest).map(|x| Cursor::new(x.clone()))) + } + + #[instrument(skip(self))] + fn open_write(&self) -> Result<Self::BlobWriter, Error> { + Ok(MemoryBlobWriter::new(self.db.clone())) + } +} + +pub struct MemoryBlobWriter { + db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>, + + buf: Vec<u8>, +} + +impl MemoryBlobWriter { + fn new(db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>) -> Self { + Self { + buf: Vec::new(), + db, } } +} +impl std::io::Write for MemoryBlobWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + self.buf.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.buf.flush() + } +} - #[instrument(skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))] - fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> { - let mut db = self.db.write().unwrap(); +impl BlobWriter for MemoryBlobWriter { + fn close(self) -> Result<[u8; 32], Error> { + // in this memory implementation, we don't actually bother hashing + // incrementally while writing, but do it at the end. + let mut hasher = blake3::Hasher::new(); + hasher.update(&self.buf); + let digest: [u8; 32] = hasher.finalize().into(); - db.insert(blob_digest.to_vec(), blob_meta); + // open the database for writing. + let mut db = self.db.write()?; + db.insert(digest, self.buf); - Ok(()) - // TODO: make sure all callers make sure the chunks exist. - // TODO: where should we calculate the bao? + Ok(digest) } } diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs index 53e941795e7e..e97ccdf335a0 100644 --- a/tvix/store/src/blobservice/mod.rs +++ b/tvix/store/src/blobservice/mod.rs @@ -1,4 +1,6 @@ -use crate::{proto, Error}; +use std::io; + +use crate::Error; mod memory; mod sled; @@ -7,14 +9,31 @@ pub use self::memory::MemoryBlobService; pub use self::sled::SledBlobService; /// The base trait all BlobService services need to implement. -/// It provides information about how a blob is chunked, -/// and allows creating new blobs by creating a BlobMeta (referring to chunks -/// in a [crate::chunkservice::ChunkService]). +/// It provides functions to check whether a given blob exists, +/// a way to get a [io::Read] to a blob, and a method to initiate writing a new +/// Blob, which returns a [BlobWriter], that can be used pub trait BlobService { - /// Retrieve chunking information for a given blob - fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error>; + type BlobReader: io::Read + Send + std::marker::Unpin; + type BlobWriter: BlobWriter + Send; + + /// Check if the service has the blob, by its content hash. + fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>; + + /// Request a blob from the store, by its content hash. Returns a Option<BlobReader>. + fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error>; + + /// Insert a new blob into the store. Returns a [BlobWriter], which + /// implements [io::Write] and a [BlobWriter::close]. + /// TODO: is there any reason we want this to be a Result<>, and not just T? + fn open_write(&self) -> Result<Self::BlobWriter, Error>; +} - /// Insert chunking information for a given blob. - /// Implementations SHOULD make sure chunks referred do exist. - fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error>; +/// A [io::Write] that you need to close() afterwards, and get back the digest +/// of the written blob. +pub trait BlobWriter: io::Write { + /// Signal there's no more data to be written, and return the digest of the + /// contents written. + /// + /// This consumes self, so it's not possible to close twice. + fn close(self) -> Result<[u8; 32], Error>; } diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 1ae34ee5fb9c..c229f799de98 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -1,13 +1,13 @@ -use std::path::PathBuf; +use std::{ + io::{self, Cursor}, + path::PathBuf, +}; +use super::{BlobService, BlobWriter}; +use crate::Error; use data_encoding::BASE64; -use prost::Message; use tracing::instrument; -use crate::{proto, Error}; - -use super::BlobService; - #[derive(Clone)] pub struct SledBlobService { db: sled::Db, @@ -30,44 +30,69 @@ impl SledBlobService { } impl BlobService for SledBlobService { - #[instrument(name = "SledBlobService::stat", skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))] - fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> { - if req.include_bao { - todo!("not implemented yet") + type BlobReader = Cursor<Vec<u8>>; + type BlobWriter = SledBlobWriter; + + #[instrument(name = "SledBlobService::has", skip(self), fields(blob.digest=BASE64.encode(digest)))] + fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> { + match self.db.contains_key(digest) { + Ok(has) => Ok(has), + Err(e) => Err(Error::StorageError(e.to_string())), } + } - // if include_chunks is also false, the user only wants to know if the - // blob is present at all. - if !req.include_chunks { - match self.db.contains_key(&req.digest) { - Ok(false) => Ok(None), - Ok(true) => Ok(Some(proto::BlobMeta::default())), - Err(e) => Err(Error::StorageError(e.to_string())), - } - } else { - match self.db.get(&req.digest) { - Ok(None) => Ok(None), - Ok(Some(data)) => match proto::BlobMeta::decode(&*data) { - Ok(blob_meta) => Ok(Some(blob_meta)), - Err(e) => Err(Error::StorageError(format!( - "unable to parse blobmeta message for blob {}: {}", - BASE64.encode(&req.digest), - e - ))), - }, - Err(e) => Err(Error::StorageError(e.to_string())), - } + #[instrument(name = "SledBlobService::open_read", skip(self), fields(blob.digest=BASE64.encode(digest)))] + fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> { + match self.db.get(digest) { + Ok(None) => Ok(None), + Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))), + Err(e) => Err(Error::StorageError(e.to_string())), } } - #[instrument(name = "SledBlobService::put", skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))] - fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> { - let result = self.db.insert(blob_digest, blob_meta.encode_to_vec()); - if let Err(e) = result { - return Err(Error::StorageError(e.to_string())); + #[instrument(name = "SledBlobService::open_write", skip(self))] + fn open_write(&self) -> Result<Self::BlobWriter, Error> { + Ok(SledBlobWriter::new(self.db.clone())) + } +} + +pub struct SledBlobWriter { + db: sled::Db, + buf: Vec<u8>, + hasher: blake3::Hasher, +} + +impl SledBlobWriter { + pub fn new(db: sled::Db) -> Self { + Self { + buf: Vec::default(), + db, + hasher: blake3::Hasher::new(), } - Ok(()) - // TODO: make sure all callers make sure the chunks exist. - // TODO: where should we calculate the bao? + } +} + +impl io::Write for SledBlobWriter { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + let bytes_written = self.buf.write(buf)?; + self.hasher.write(&buf[..bytes_written]) + } + + fn flush(&mut self) -> io::Result<()> { + self.buf.flush() + } +} + +impl BlobWriter for SledBlobWriter { + fn close(self) -> Result<[u8; 32], Error> { + let digest = self.hasher.finalize(); + self.db.insert(digest.as_bytes(), self.buf).map_err(|e| { + Error::StorageError(format!("unable to insert blob: {}", e.to_string())) + })?; + + Ok(digest + .to_owned() + .try_into() + .map_err(|_| Error::StorageError("invalid digest length in response".to_string()))?) } } diff --git a/tvix/store/src/chunkservice/memory.rs b/tvix/store/src/chunkservice/memory.rs deleted file mode 100644 index 9fe4dc17d5ec..000000000000 --- a/tvix/store/src/chunkservice/memory.rs +++ /dev/null @@ -1,53 +0,0 @@ -use data_encoding::BASE64; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; -use tracing::instrument; - -use crate::Error; - -use super::ChunkService; - -#[derive(Clone, Default)] -pub struct MemoryChunkService { - db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>, -} - -impl ChunkService for MemoryChunkService { - #[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))] - fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> { - let db = self.db.read().unwrap(); - Ok(db.get(digest).is_some()) - } - - #[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))] - fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> { - let db = self.db.read().unwrap(); - match db.get(digest) { - None => Ok(None), - Some(data) => { - // calculate the hash to verify this is really what we expect - let actual_digest = blake3::hash(data).as_bytes().to_vec(); - if actual_digest != digest { - return Err(Error::StorageError(format!( - "invalid hash encountered when reading chunk, expected {}, got {}", - BASE64.encode(digest), - BASE64.encode(&actual_digest), - ))); - } - Ok(Some(data.clone())) - } - } - } - - #[instrument(skip(self, data))] - fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> { - let digest = blake3::hash(&data); - - let mut db = self.db.write().unwrap(); - db.insert(*digest.as_bytes(), data); - - Ok(*digest.as_bytes()) - } -} diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs deleted file mode 100644 index faf0a88f151a..000000000000 --- a/tvix/store/src/chunkservice/mod.rs +++ /dev/null @@ -1,28 +0,0 @@ -mod util; - -pub mod memory; -pub mod sled; - -use crate::Error; - -pub use self::memory::MemoryChunkService; -pub use self::sled::SledChunkService; -pub use self::util::read_all_and_chunk; -pub use self::util::update_hasher; -pub use self::util::upload_chunk; - -/// The base trait all ChunkService services need to implement. -/// It allows checking for the existence, download and upload of chunks. -/// It's usually used after consulting a [crate::blobservice::BlobService] for -/// chunking information. -pub trait ChunkService { - /// check if the service has a chunk, given by its digest. - fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>; - - /// retrieve a chunk by its digest. Implementations MUST validate the digest - /// matches. - fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error>; - - /// insert a chunk. returns the digest of the chunk, or an error. - fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error>; -} diff --git a/tvix/store/src/chunkservice/sled.rs b/tvix/store/src/chunkservice/sled.rs deleted file mode 100644 index 8e86e1825b28..000000000000 --- a/tvix/store/src/chunkservice/sled.rs +++ /dev/null @@ -1,70 +0,0 @@ -use std::path::PathBuf; - -use data_encoding::BASE64; -use tracing::instrument; - -use crate::Error; - -use super::ChunkService; - -#[derive(Clone)] -pub struct SledChunkService { - db: sled::Db, -} - -impl SledChunkService { - pub fn new(p: PathBuf) -> Result<Self, sled::Error> { - let config = sled::Config::default().use_compression(true).path(p); - let db = config.open()?; - - Ok(Self { db }) - } - - pub fn new_temporary() -> Result<Self, sled::Error> { - let config = sled::Config::default().temporary(true); - let db = config.open()?; - - Ok(Self { db }) - } -} - -impl ChunkService for SledChunkService { - #[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))] - fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> { - match self.db.get(digest) { - Ok(None) => Ok(false), - Ok(Some(_)) => Ok(true), - Err(e) => Err(Error::StorageError(e.to_string())), - } - } - - #[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))] - fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> { - match self.db.get(digest) { - Ok(None) => Ok(None), - Ok(Some(data)) => { - // calculate the hash to verify this is really what we expect - let actual_digest = blake3::hash(&data).as_bytes().to_vec(); - if actual_digest != digest { - return Err(Error::StorageError(format!( - "invalid hash encountered when reading chunk, expected {}, got {}", - BASE64.encode(digest), - BASE64.encode(&actual_digest), - ))); - } - Ok(Some(Vec::from(&*data))) - } - Err(e) => Err(Error::StorageError(e.to_string())), - } - } - - #[instrument(name = "SledChunkService::put", skip(self, data))] - fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> { - let digest = blake3::hash(&data); - let result = self.db.insert(digest.as_bytes(), data); - if let Err(e) = result { - return Err(Error::StorageError(e.to_string())); - } - Ok(*digest.as_bytes()) - } -} diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs deleted file mode 100644 index 2ad663733b04..000000000000 --- a/tvix/store/src/chunkservice/util.rs +++ /dev/null @@ -1,85 +0,0 @@ -use crate::{proto, Error}; -use std::io::Read; -use tracing::{debug, instrument}; - -use super::ChunkService; - -/// uploads a chunk to a chunk service, and returns its digest (or an error) when done. -#[instrument(skip_all, err)] -pub fn upload_chunk<CS: ChunkService>( - chunk_service: &CS, - chunk_data: Vec<u8>, -) -> Result<[u8; 32], Error> { - let mut hasher = blake3::Hasher::new(); - update_hasher(&mut hasher, &chunk_data); - let digest = hasher.finalize(); - - if chunk_service.has(digest.as_bytes())? { - debug!("already has chunk, skipping"); - } - let digest_resp = chunk_service.put(chunk_data)?; - - assert_eq!(&digest_resp, digest.as_bytes()); - - Ok(*digest.as_bytes()) -} - -/// reads through a reader, writes chunks to a [ChunkService] and returns a -/// [proto::BlobMeta] pointing to all the chunks. -#[instrument(skip_all, err)] -pub fn read_all_and_chunk<CS: ChunkService, R: Read>( - chunk_service: &CS, - r: R, -) -> Result<(Vec<u8>, proto::BlobMeta), Error> { - let mut blob_meta = proto::BlobMeta::default(); - - // hash the file contents, upload chunks if not there yet - let mut blob_hasher = blake3::Hasher::new(); - - // TODO: play with chunking sizes - let chunker_avg_size = 64 * 1024; - let chunker_min_size = chunker_avg_size / 4; - let chunker_max_size = chunker_avg_size * 4; - - let chunker = - fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size); - - for chunking_result in chunker { - let chunk = chunking_result.unwrap(); - // TODO: convert to error::UnableToRead - - let chunk_len = chunk.data.len() as u32; - - // update calculate blob hash - update_hasher(&mut blob_hasher, &chunk.data); - - let chunk_digest = upload_chunk(chunk_service, chunk.data)?; - - blob_meta.chunks.push(proto::blob_meta::ChunkMeta { - digest: chunk_digest.to_vec(), - size: chunk_len, - }); - } - Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta)) -} - -/// updates a given hasher with more data. Uses rayon if the data is -/// sufficiently big. -/// -/// From the docs: -/// -/// To get any performance benefit from multithreading, the input buffer needs -/// to be large. As a rule of thumb on x86_64, update_rayon is slower than -/// update for inputs under 128 KiB. That threshold varies quite a lot across -/// different processors, and it’s important to benchmark your specific use -/// case. -/// -/// We didn't benchmark yet, so these numbers might need tweaking. -#[instrument(skip_all)] -pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) { - if data.len() > 128 * 1024 { - hasher.update_rayon(data); - } else { - hasher.update(data); - } -} diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index e62097ec468d..bf80eb4b71b9 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -1,19 +1,17 @@ -use crate::{chunkservice::read_all_and_chunk, directoryservice::DirectoryPutter, proto}; +use crate::{blobservice::BlobService, directoryservice::DirectoryService}; +use crate::{blobservice::BlobWriter, directoryservice::DirectoryPutter, proto}; use std::{ collections::HashMap, fmt::Debug, fs, fs::File, + io, os::unix::prelude::PermissionsExt, path::{Path, PathBuf}, }; use tracing::instrument; use walkdir::WalkDir; -use crate::{ - blobservice::BlobService, chunkservice::ChunkService, directoryservice::DirectoryService, -}; - #[derive(Debug, thiserror::Error)] pub enum Error { #[error("failed to upload directory at {0}: {1}")] @@ -57,9 +55,8 @@ impl From<super::Error> for Error { // // It assumes the caller adds returned nodes to the directories it assembles. #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] -fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: DirectoryPutter>( +fn process_entry<BS: BlobService, DP: DirectoryPutter>( blob_service: &mut BS, - chunk_service: &mut CS, directory_putter: &mut DP, entry: &walkdir::DirEntry, maybe_directory: Option<proto::Directory>, @@ -112,23 +109,16 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire .metadata() .map_err(|e| Error::UnableToStat(entry_path.clone(), e.into()))?; - let file = File::open(entry_path.clone()) + let mut file = File::open(entry_path.clone()) .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?; - let (blob_digest, blob_meta) = read_all_and_chunk(chunk_service, file)?; - - // upload blobmeta if not there yet - if blob_service - .stat(&proto::StatBlobRequest { - digest: blob_digest.to_vec(), - include_chunks: false, - include_bao: false, - })? - .is_none() - { - // upload blobmeta - blob_service.put(&blob_digest, blob_meta)?; - } + let mut writer = blob_service.open_write()?; + + if let Err(e) = io::copy(&mut file, &mut writer) { + return Err(Error::UnableToRead(entry_path, e)); + }; + + let digest = writer.close()?; return Ok(proto::node::Node::File(proto::FileNode { name: entry @@ -136,7 +126,7 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire .to_str() .map(|s| Ok(s.to_owned())) .unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?, - digest: blob_digest, + digest: digest.to_vec(), size: metadata.len() as u32, // If it's executable by the user, it'll become executable. // This matches nix's dump() function behaviour. @@ -152,15 +142,9 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire /// to the PathInfoService. // // returns the root node, or an error. -#[instrument(skip(blob_service, chunk_service, directory_service), fields(path=?p))] -pub fn import_path< - BS: BlobService, - CS: ChunkService + std::marker::Sync, - DS: DirectoryService, - P: AsRef<Path> + Debug, ->( +#[instrument(skip(blob_service, directory_service), fields(path=?p))] +pub fn import_path<BS: BlobService, DS: DirectoryService, P: AsRef<Path> + Debug>( blob_service: &mut BS, - chunk_service: &mut CS, directory_service: &mut DS, p: P, ) -> Result<proto::node::Node, Error> { @@ -212,13 +196,7 @@ pub fn import_path< } }; - let node = process_entry( - blob_service, - chunk_service, - &mut directory_putter, - &entry, - maybe_directory, - )?; + let node = process_entry(blob_service, &mut directory_putter, &entry, maybe_directory)?; if entry.depth() == 0 { return Ok(node); diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index aac650b1ca8b..3ce826f98ba6 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,15 +1,12 @@ -mod blobreader; mod errors; pub mod blobservice; -pub mod chunkservice; pub mod directoryservice; pub mod import; pub mod nar; pub mod pathinfoservice; pub mod proto; -pub use blobreader::BlobReader; pub use errors::Error; #[cfg(test)] diff --git a/tvix/store/src/nar/non_caching_calculation_service.rs b/tvix/store/src/nar/non_caching_calculation_service.rs index f77f0b30d61f..94dd51bc6a7f 100644 --- a/tvix/store/src/nar/non_caching_calculation_service.rs +++ b/tvix/store/src/nar/non_caching_calculation_service.rs @@ -2,7 +2,6 @@ use count_write::CountWrite; use sha2::{Digest, Sha256}; use crate::blobservice::BlobService; -use crate::chunkservice::ChunkService; use crate::directoryservice::DirectoryService; use crate::proto; @@ -12,26 +11,20 @@ use super::{NARCalculationService, RenderError}; /// A NAR calculation service which simply renders the whole NAR whenever /// we ask for the calculation. #[derive(Clone)] -pub struct NonCachingNARCalculationService< - BS: BlobService, - CS: ChunkService + Clone, - DS: DirectoryService, -> { - nar_renderer: NARRenderer<BS, CS, DS>, +pub struct NonCachingNARCalculationService<BS: BlobService, DS: DirectoryService> { + nar_renderer: NARRenderer<BS, DS>, } -impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> - NonCachingNARCalculationService<BS, CS, DS> -{ - pub fn new(blob_service: BS, chunk_service: CS, directory_service: DS) -> Self { +impl<BS: BlobService, DS: DirectoryService> NonCachingNARCalculationService<BS, DS> { + pub fn new(blob_service: BS, directory_service: DS) -> Self { Self { - nar_renderer: NARRenderer::new(blob_service, chunk_service, directory_service), + nar_renderer: NARRenderer::new(blob_service, directory_service), } } } -impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARCalculationService - for NonCachingNARCalculationService<BS, CS, DS> +impl<BS: BlobService, DS: DirectoryService> NARCalculationService + for NonCachingNARCalculationService<BS, DS> { fn calculate_nar( &self, diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index a061dad9bb35..b080a713ec0a 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -1,10 +1,11 @@ +use std::io::{self, BufReader}; + use crate::{ blobservice::BlobService, - chunkservice::ChunkService, directoryservice::DirectoryService, proto::{self, NamedNode}, - BlobReader, }; +use data_encoding::BASE64; use nix_compat::nar; use super::RenderError; @@ -12,17 +13,15 @@ use super::RenderError; /// A NAR renderer, using a blob_service, chunk_service and directory_service /// to render a NAR to a writer. #[derive(Clone)] -pub struct NARRenderer<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> { +pub struct NARRenderer<BS: BlobService, DS: DirectoryService> { blob_service: BS, - chunk_service: CS, directory_service: DS, } -impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARRenderer<BS, CS, DS> { - pub fn new(blob_service: BS, chunk_service: CS, directory_service: DS) -> Self { +impl<BS: BlobService, DS: DirectoryService> NARRenderer<BS, DS> { + pub fn new(blob_service: BS, directory_service: DS) -> Self { Self { blob_service, - chunk_service, directory_service, } } @@ -65,49 +64,22 @@ impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARRendere )) })?; - // query blob_service for blob_meta - let resp = self - .blob_service - .stat(&proto::StatBlobRequest { - digest: digest.to_vec(), - include_chunks: true, - ..Default::default() - }) - .map_err(RenderError::StoreError)?; + // TODO: handle error + let mut blob_reader = match self.blob_service.open_read(&digest).unwrap() { + Some(blob_reader) => Ok(BufReader::new(blob_reader)), + None => Err(RenderError::NARWriterError(io::Error::new( + io::ErrorKind::NotFound, + format!("blob with digest {} not found", BASE64.encode(&digest)), + ))), + }?; - match resp { - // if it's None, that's an error! - None => { - return Err(RenderError::BlobNotFound( - digest.to_vec(), - proto_file_node.name.to_owned(), - )); - } - Some(blob_meta) => { - // make sure the blob_meta size matches what we expect from proto_file_node - let blob_meta_size = blob_meta.chunks.iter().fold(0, |acc, e| acc + e.size); - if blob_meta_size != proto_file_node.size { - return Err(RenderError::UnexpectedBlobMeta( - digest.to_vec(), - proto_file_node.name.to_owned(), - proto_file_node.size, - blob_meta_size, - )); - } - - let mut blob_reader = std::io::BufReader::new(BlobReader::open( - &self.chunk_service, - blob_meta, - )); - nar_node - .file( - proto_file_node.executable, - proto_file_node.size.into(), - &mut blob_reader, - ) - .map_err(RenderError::NARWriterError)?; - } - } + nar_node + .file( + proto_file_node.executable, + proto_file_node.size.into(), + &mut blob_reader, + ) + .map_err(RenderError::NARWriterError)?; } proto::node::Node::Directory(proto_directory_node) => { let digest: [u8; 32] = diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index e891caa0268e..d1e98a5b7953 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,54 +1,34 @@ -use std::collections::VecDeque; - use crate::{ - blobservice::BlobService, - chunkservice::{read_all_and_chunk, update_hasher, ChunkService}, - Error, + blobservice::{BlobService, BlobWriter}, + proto::sync_read_into_async_read::SyncReadIntoAsyncRead, }; use data_encoding::BASE64; -use tokio::{sync::mpsc::channel, task}; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use std::{collections::VecDeque, io, pin::Pin}; +use tokio::task; +use tokio_stream::StreamExt; +use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; -use tracing::{debug, instrument, warn}; +use tracing::{instrument, warn}; -pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> { +pub struct GRPCBlobServiceWrapper<BS: BlobService> { blob_service: BS, - chunk_service: CS, } -impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> { - pub fn new(blob_service: BS, chunk_service: CS) -> Self { +impl<BS: BlobService> From<BS> for GRPCBlobServiceWrapper<BS> { + fn from(value: BS) -> Self { Self { - blob_service, - chunk_service, - } - } - - // upload the chunk to the chunk service, and return its digest (or an error) when done. - #[instrument(skip(chunk_service))] - fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> { - let mut hasher = blake3::Hasher::new(); - update_hasher(&mut hasher, &chunk_data); - let digest = hasher.finalize(); - - if chunk_service.has(digest.as_bytes())? { - debug!("already has chunk, skipping"); + blob_service: value, } - let digest_resp = chunk_service.put(chunk_data)?; - - assert_eq!(&digest_resp, digest.as_bytes()); - - Ok(digest.as_bytes().to_vec()) } } #[async_trait] -impl< - BS: BlobService + Send + Sync + Clone + 'static, - CS: ChunkService + Send + Sync + Clone + 'static, - > super::blob_service_server::BlobService for GRPCBlobServiceWrapper<BS, CS> +impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server::BlobService + for GRPCBlobServiceWrapper<BS> { - type ReadStream = ReceiverStream<Result<super::BlobChunk, Status>>; + // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 + type ReadStream = + Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>; #[instrument(skip(self))] async fn stat( @@ -56,12 +36,22 @@ impl< request: Request<super::StatBlobRequest>, ) -> Result<Response<super::BlobMeta>, Status> { let rq = request.into_inner(); - match self.blob_service.stat(&rq) { - Ok(None) => Err(Status::not_found(format!( + let req_digest: [u8; 32] = rq + .digest + .clone() + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + if rq.include_chunks || rq.include_bao { + return Err(Status::internal("not implemented")); + } + + match self.blob_service.has(&req_digest) { + Ok(true) => Ok(Response::new(super::BlobMeta::default())), + Ok(false) => Err(Status::not_found(format!( "blob {} not found", - BASE64.encode(&rq.digest) + BASE64.encode(&req_digest) ))), - Ok(Some(blob_meta)) => Ok(Response::new(blob_meta)), Err(e) => Err(e.into()), } } @@ -71,99 +61,38 @@ impl< &self, request: Request<super::ReadBlobRequest>, ) -> Result<Response<Self::ReadStream>, Status> { - let req = request.into_inner(); - let (tx, rx) = channel(5); + let rq = request.into_inner(); - let req_digest: [u8; 32] = req + let req_digest: [u8; 32] = rq .digest + .clone() .try_into() - .map_err(|_e| Status::invalid_argument("invalid digest length"))?; - - // query the blob service for more detailed blob info - let stat_resp = self.blob_service.stat(&super::StatBlobRequest { - digest: req_digest.to_vec(), - include_chunks: true, - ..Default::default() - })?; - - match stat_resp { - None => { - // If the stat didn't return any blobmeta, the client might - // still have asked for a single chunk to be read. - // Check the chunkstore. - if let Some(data) = self.chunk_service.get(&req_digest)? { - // We already know the hash matches, and contrary to - // iterating over a blobmeta, we can't know the size, - // so send the contents of that chunk over, - // as the first (and only) element of the stream. - task::spawn(async move { - let res = Ok(super::BlobChunk { data }); - // send the result to the client. If the client already left, that's also fine. - if (tx.send(res).await).is_err() { - debug!("receiver dropped"); - } - }); - } else { - return Err(Status::not_found(format!( - "blob {} not found", - BASE64.encode(&req_digest), - ))); - } - } - Some(blobmeta) => { - let chunk_client = self.chunk_service.clone(); - - // TODO: use BlobReader? - // But then we might not be able to send compressed chunks as-is. - // Might require implementing https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html for it - // first, so we can .next().await in here. - - task::spawn(async move { - for chunkmeta in blobmeta.chunks { - // request chunk. - // We don't need to validate the digest again, as - // that's required for all implementations of ChunkService. - // TODO: handle error - let chunkmeta_digest = &chunkmeta.digest.try_into().unwrap(); - let res = match chunk_client.get(chunkmeta_digest) { - Err(e) => Err(e.into()), - // TODO: make this a separate error type - Ok(None) => Err(Error::StorageError(format!( - "consistency error: chunk {} for blob {} not found", - BASE64.encode(chunkmeta_digest), - BASE64.encode(&req_digest), - )) - .into()), - Ok(Some(data)) => { - // We already know the hash matches, but also - // check the size matches what chunkmeta said. - if data.len() as u32 != chunkmeta.size { - Err(Error::StorageError(format!( - "consistency error: chunk {} for blob {} has wrong size, expected {}, got {}", - BASE64.encode(chunkmeta_digest), - BASE64.encode(&req_digest), - chunkmeta.size, - data.len(), - )).into()) - } else { - // send out the current chunk - // TODO: we might want to break this up further if too big? - Ok(super::BlobChunk { data }) - } - } - }; - // send the result to the client - if (tx.send(res).await).is_err() { - debug!("receiver dropped"); - break; - } + .map_err(|_| Status::invalid_argument("invalid digest length"))?; + + match self.blob_service.open_read(&req_digest) { + Ok(Some(reader)) => { + let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into(); + + fn stream_mapper( + x: Result<bytes::Bytes, io::Error>, + ) -> Result<super::BlobChunk, Status> { + match x { + Ok(bytes) => Ok(super::BlobChunk { + data: bytes.to_vec(), + }), + Err(e) => Err(Status::from(e)), } - }); + } + + let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper); + Ok(Response::new(Box::pin(chunks_stream))) } + Ok(None) => Err(Status::not_found(format!( + "blob {} not found", + BASE64.encode(&rq.digest) + ))), + Err(e) => Err(e.into()), } - - let receiver_stream = ReceiverStream::new(rx); - Ok(Response::new(receiver_stream)) } #[instrument(skip(self))] @@ -180,37 +109,34 @@ impl< let data_reader = tokio_util::io::StreamReader::new(data_stream); - // TODO: can we get rid of this clone? - let chunk_service = self.chunk_service.clone(); - - let (blob_digest, blob_meta) = - task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> { - // feed read_all_and_chunk a (sync) reader to the data retrieved from the stream. - read_all_and_chunk( - &chunk_service, - tokio_util::io::SyncIoBridge::new(data_reader), - ) - }) - .await - .map_err(|e| Status::internal(e.to_string()))??; - - // upload blobmeta if not there yet - if self + // prepare a writer, which we'll use in the blocking task below. + let mut writer = self .blob_service - .stat(&super::StatBlobRequest { - digest: blob_digest.to_vec(), - include_chunks: false, - include_bao: false, - })? - .is_none() - { - // upload blobmeta - self.blob_service.put(&blob_digest, blob_meta)?; - } - - // return to client. - Ok(Response::new(super::PutBlobResponse { - digest: blob_digest, - })) + .open_write() + .map_err(|e| Status::internal(format!("unable to open for write: {}", e)))?; + + let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> { + // construct a sync reader to the data + let mut reader = tokio_util::io::SyncIoBridge::new(data_reader); + + io::copy(&mut reader, &mut writer).map_err(|e| { + warn!("error copying: {}", e); + Status::internal("error copying") + })?; + + let digest = writer + .close() + .map_err(|e| { + warn!("error closing stream: {}", e); + Status::internal("error closing stream") + })? + .to_vec(); + + Ok(super::PutBlobResponse { digest }) + }) + .await + .map_err(|_| Status::internal("failed to wait for task"))??; + + Ok(Response::new(result)) } } diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs index 5002d7e77a96..62741a3ad508 100644 --- a/tvix/store/src/proto/mod.rs +++ b/tvix/store/src/proto/mod.rs @@ -11,6 +11,8 @@ mod grpc_blobservice_wrapper; mod grpc_directoryservice_wrapper; mod grpc_pathinfoservice_wrapper; +mod sync_read_into_async_read; + pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper; diff --git a/tvix/store/src/proto/sync_read_into_async_read.rs b/tvix/store/src/proto/sync_read_into_async_read.rs new file mode 100644 index 000000000000..0a0ef019781c --- /dev/null +++ b/tvix/store/src/proto/sync_read_into_async_read.rs @@ -0,0 +1,158 @@ +use bytes::Buf; +use core::task::Poll::Ready; +use futures::ready; +use futures::Future; +use std::io; +use std::io::Read; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use tokio::io::AsyncRead; +use tokio::runtime::Handle; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; + +#[derive(Debug)] +enum State<Buf: bytes::Buf + bytes::BufMut> { + Idle(Option<Buf>), + Busy(JoinHandle<(io::Result<usize>, Buf)>), +} + +use State::{Busy, Idle}; + +/// Use a [`SyncReadIntoAsyncRead`] to asynchronously read from a +/// synchronous API. +#[derive(Debug)] +pub struct SyncReadIntoAsyncRead<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> { + state: Mutex<State<Buf>>, + reader: Arc<Mutex<R>>, + rt: Handle, +} + +impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> SyncReadIntoAsyncRead<R, Buf> { + /// This must be called from within a Tokio runtime context, or else it will panic. + #[track_caller] + pub fn new(rt: Handle, reader: R) -> Self { + Self { + rt, + state: State::Idle(None).into(), + reader: Arc::new(reader.into()), + } + } + + /// This must be called from within a Tokio runtime context, or else it will panic. + pub fn new_with_reader(readable: R) -> Self { + Self::new(Handle::current(), readable) + } +} + +/// Repeats operations that are interrupted. +macro_rules! uninterruptibly { + ($e:expr) => {{ + loop { + match $e { + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + res => break res, + } + } + }}; +} + +impl< + R: Read + Send + 'static + std::marker::Unpin, + Buf: bytes::Buf + bytes::BufMut + Send + Default + std::marker::Unpin + 'static, + > AsyncRead for SyncReadIntoAsyncRead<R, Buf> +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + dst: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + let me = self.get_mut(); + // Do we need this mutex? + let state = me.state.get_mut(); + + loop { + match state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap_or_default(); + + if buf.has_remaining() { + // Here, we will split the `buf` into `[..dst.remaining()... ; rest ]` + // The `rest` is stuffed into the `buf_cell` for further poll_read. + // The other is completely consumed into the unfilled destination. + // `rest` can be empty. + let mut adjusted_src = + buf.copy_to_bytes(std::cmp::min(buf.remaining(), dst.remaining())); + let copied_size = adjusted_src.remaining(); + adjusted_src.copy_to_slice(dst.initialize_unfilled_to(copied_size)); + dst.set_filled(copied_size); + *buf_cell = Some(buf); + return Ready(Ok(())); + } + + let reader = me.reader.clone(); + *state = Busy(me.rt.spawn_blocking(move || { + let result = uninterruptibly!(reader.blocking_lock().read( + // SAFETY: `reader.read` will *ONLY* write initialized bytes + // and never *READ* uninitialized bytes + // inside this buffer. + // + // Furthermore, casting the slice as `*mut [u8]` + // is safe because it has the same layout. + // + // Finally, the pointer obtained is valid and owned + // by `buf` only as we have a valid mutable reference + // to it, it is valid for write. + // + // Here, we copy an nightly API: https://doc.rust-lang.org/stable/src/core/mem/maybe_uninit.rs.html#994-998 + unsafe { + &mut *(buf.chunk_mut().as_uninit_slice_mut() + as *mut [std::mem::MaybeUninit<u8>] + as *mut [u8]) + } + )); + + if let Ok(n) = result { + // SAFETY: given we initialize `n` bytes, we can move `n` bytes + // forward. + unsafe { + buf.advance_mut(n); + } + } + + (result, buf) + })); + } + Busy(ref mut rx) => { + let (result, mut buf) = ready!(Pin::new(rx).poll(cx))?; + + match result { + Ok(n) => { + if n > 0 { + let remaining = std::cmp::min(n, dst.remaining()); + let mut adjusted_src = buf.copy_to_bytes(remaining); + adjusted_src.copy_to_slice(dst.initialize_unfilled_to(remaining)); + dst.advance(remaining); + } + *state = Idle(Some(buf)); + return Ready(Ok(())); + } + Err(e) => { + *state = Idle(None); + return Ready(Err(e)); + } + } + } + } + } + } +} + +impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> From<R> for SyncReadIntoAsyncRead<R, Buf> { + /// This must be called from within a Tokio runtime context, or else it will panic. + fn from(value: R) -> Self { + Self::new_with_reader(value) + } +} diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs index 88eb21e30241..80e2e71867a3 100644 --- a/tvix/store/src/proto/tests/grpc_blobservice.rs +++ b/tvix/store/src/proto/tests/grpc_blobservice.rs @@ -1,18 +1,14 @@ use crate::blobservice::BlobService; -use crate::chunkservice::ChunkService; -use crate::proto::blob_meta::ChunkMeta; use crate::proto::blob_service_server::BlobService as GRPCBlobService; use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest}; -use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST}; -use crate::tests::utils::{gen_blob_service, gen_chunk_service}; +use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST}; +use crate::tests::utils::gen_blob_service; +use tokio_stream::StreamExt; -fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper< - impl BlobService + Send + Sync + Clone + 'static, - impl ChunkService + Send + Sync + Clone + 'static, -> { +fn gen_grpc_blob_service( +) -> GRPCBlobServiceWrapper<impl BlobService + Send + Sync + Clone + 'static> { let blob_service = gen_blob_service(); - let chunk_service = gen_chunk_service(); - GRPCBlobServiceWrapper::new(blob_service, chunk_service) + GRPCBlobServiceWrapper::from(blob_service) } /// Trying to read a non-existent blob should return a not found error. @@ -26,8 +22,13 @@ async fn not_found_read() { })) .await; - let e = resp.expect_err("must_be_err"); - assert_eq!(e.code(), tonic::Code::NotFound); + // We can't use unwrap_err here, because the Ok value doesn't implement + // debug. + if let Err(e) = resp { + assert_eq!(e.code(), tonic::Code::NotFound); + } else { + panic!("resp is not err") + } } /// Trying to stat a non-existent blob should return a not found error. @@ -47,8 +48,7 @@ async fn not_found_stat() { assert_eq!(resp.code(), tonic::Code::NotFound); } -/// Put a blob in the store, get it back. We send something small enough so it -/// won't get split into multiple chunks. +/// Put a blob in the store, get it back. #[tokio::test] async fn put_read_stat() { let service = gen_grpc_blob_service(); @@ -64,39 +64,30 @@ async fn put_read_stat() { assert_eq!(BLOB_A_DIGEST.to_vec(), put_resp.digest); - // Stat for the digest of A. It should return one chunk. + // Stat for the digest of A. + // We currently don't ask for more granular chunking data, as we don't + // expose it yet. let resp = service .stat(tonic::Request::new(StatBlobRequest { digest: BLOB_A_DIGEST.to_vec(), - include_chunks: true, ..Default::default() })) .await .expect("must succeed") .into_inner(); - assert_eq!(1, resp.chunks.len()); - // the `chunks` field should point to the single chunk. - assert_eq!( - vec![ChunkMeta { - digest: BLOB_A_DIGEST.to_vec(), - size: BLOB_A.len() as u32, - }], - resp.chunks, - ); - - // Read the chunk. It should return the same data. + // Read the blob. It should return the same data. let resp = service .read(tonic::Request::new(ReadBlobRequest { digest: BLOB_A_DIGEST.to_vec(), })) .await; - let mut rx = resp.expect("must succeed").into_inner().into_inner(); + let mut rx = resp.ok().unwrap().into_inner(); // the stream should contain one element, a BlobChunk with the same contents as BLOB_A. let item = rx - .recv() + .next() .await .expect("must be some") .expect("must succeed"); @@ -104,127 +95,8 @@ async fn put_read_stat() { assert_eq!(BLOB_A.to_vec(), item.data); // … and no more elements - assert!(rx.recv().await.is_none()); -} - -/// Put a bigger blob in the store, and get it back. -/// Assert the stat request actually returns more than one chunk, and -/// we can read each chunk individually, as well as the whole blob via the -/// `read()` method. -#[tokio::test] -async fn put_read_stat_large() { - let service = gen_grpc_blob_service(); - - // split up BLOB_B into BlobChunks containing 1K bytes each. - let blob_b_blobchunks: Vec<BlobChunk> = BLOB_B - .chunks(1024) - .map(|x| BlobChunk { data: x.to_vec() }) - .collect(); - - assert!(blob_b_blobchunks.len() > 1); + assert!(rx.next().await.is_none()); - // Send blob B - let put_resp = service - .put(tonic_mock::streaming_request(blob_b_blobchunks)) - .await - .expect("must succeed") - .into_inner(); - - assert_eq!(BLOB_B_DIGEST.to_vec(), put_resp.digest); - - // Stat for the digest of B - let resp = service - .stat(tonic::Request::new(StatBlobRequest { - digest: BLOB_B_DIGEST.to_vec(), - include_chunks: true, - ..Default::default() - })) - .await - .expect("must succeed") - .into_inner(); - - // it should return more than one chunk. - assert_ne!(1, resp.chunks.len()); - - // The size added up should equal the size of BLOB_B. - let mut size_in_stat: u32 = 0; - for chunk in &resp.chunks { - size_in_stat += chunk.size - } - assert_eq!(BLOB_B.len() as u32, size_in_stat); - - // Chunks are chunked up the same way we would do locally, when initializing the chunker with the same values. - // TODO: make the chunker config better accessible, so we don't need to synchronize this. - { - let chunker_avg_size = 64 * 1024; - let chunker_min_size = chunker_avg_size / 4; - let chunker_max_size = chunker_avg_size * 4; - - // initialize a chunker with the current buffer - let blob_b = BLOB_B.to_vec(); - let chunker = fastcdc::v2020::FastCDC::new( - &blob_b, - chunker_min_size, - chunker_avg_size, - chunker_max_size, - ); - - let mut num_chunks = 0; - for (i, chunk) in chunker.enumerate() { - assert_eq!( - resp.chunks[i].size, chunk.length as u32, - "expected locally-chunked chunk length to match stat response" - ); - - num_chunks += 1; - } - - assert_eq!( - resp.chunks.len(), - num_chunks, - "expected number of chunks to match" - ); - } - - // Reading the whole blob by its digest via the read() interface should succeed. - { - let resp = service - .read(tonic::Request::new(ReadBlobRequest { - digest: BLOB_B_DIGEST.to_vec(), - })) - .await; - - let mut rx = resp.expect("must succeed").into_inner().into_inner(); - - let mut buf: Vec<u8> = Vec::new(); - while let Some(item) = rx.recv().await { - let mut blob_chunk = item.expect("must not be err"); - buf.append(&mut blob_chunk.data); - } - - assert_eq!(BLOB_B.to_vec(), buf); - } - - // Reading the whole blob by reading individual chunks should also succeed. - { - let mut buf: Vec<u8> = Vec::new(); - for chunk in &resp.chunks { - // request this individual chunk via read - let resp = service - .read(tonic::Request::new(ReadBlobRequest { - digest: chunk.digest.clone(), - })) - .await; - - let mut rx = resp.expect("must succeed").into_inner().into_inner(); - - // append all items from the stream to the buffer - while let Some(item) = rx.recv().await { - let mut blob_chunk = item.expect("must not be err"); - buf.append(&mut blob_chunk.data); - } - } - // finished looping over all chunks, compare - assert_eq!(BLOB_B.to_vec(), buf); - } + // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks. + // Test with some bigger blob too } diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs index f74c3fda0213..11cab2c264cc 100644 --- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs +++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs @@ -6,12 +6,10 @@ use crate::proto::GRPCPathInfoServiceWrapper; use crate::proto::PathInfo; use crate::proto::{GetPathInfoRequest, Node, SymlinkNode}; use crate::tests::fixtures::DUMMY_OUTPUT_HASH; -use crate::tests::utils::{ - gen_blob_service, gen_chunk_service, gen_directory_service, gen_pathinfo_service, -}; +use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service}; use tonic::Request; -/// generates a GRPCPathInfoService out of blob, chunk, directory and pathinfo services. +/// generates a GRPCPathInfoService out of blob, directory and pathinfo services. /// /// We only interact with it via the PathInfo GRPC interface. /// It uses the NonCachingNARCalculationService NARCalculationService to @@ -19,11 +17,7 @@ use tonic::Request; fn gen_grpc_service() -> impl GRPCPathInfoService { GRPCPathInfoServiceWrapper::new( gen_pathinfo_service(), - NonCachingNARCalculationService::new( - gen_blob_service(), - gen_chunk_service(), - gen_directory_service(), - ), + NonCachingNARCalculationService::new(gen_blob_service(), gen_directory_service()), ) } diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs index 3a48df9e33c8..ed5154d6fec0 100644 --- a/tvix/store/src/tests/import.rs +++ b/tvix/store/src/tests/import.rs @@ -1,4 +1,4 @@ -use super::utils::{gen_blob_service, gen_chunk_service, gen_directory_service}; +use super::utils::{gen_blob_service, gen_directory_service}; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::import::import_path; @@ -21,7 +21,6 @@ fn symlink() { let root_node = import_path( &mut gen_blob_service(), - &mut gen_chunk_service(), &mut gen_directory_service(), tmpdir.path().join("doesntmatter"), ) @@ -46,7 +45,6 @@ fn single_file() { let root_node = import_path( &mut blob_service, - &mut gen_chunk_service(), &mut gen_directory_service(), tmpdir.path().join("root"), ) @@ -64,13 +62,8 @@ fn single_file() { // ensure the blob has been uploaded assert!(blob_service - .stat(&proto::StatBlobRequest { - digest: HELLOWORLD_BLOB_DIGEST.to_vec(), - include_chunks: false, - ..Default::default() - }) - .unwrap() - .is_some()); + .has(&HELLOWORLD_BLOB_DIGEST.to_vec().try_into().unwrap()) + .unwrap()); } #[test] @@ -89,13 +82,8 @@ fn complicated() { let mut blob_service = gen_blob_service(); let mut directory_service = gen_directory_service(); - let root_node = import_path( - &mut blob_service, - &mut gen_chunk_service(), - &mut directory_service, - tmpdir.path(), - ) - .expect("must succeed"); + let root_node = import_path(&mut blob_service, &mut directory_service, tmpdir.path()) + .expect("must succeed"); // ensure root_node matched expectations assert_eq!( @@ -124,11 +112,6 @@ fn complicated() { // ensure EMPTY_BLOB_CONTENTS has been uploaded assert!(blob_service - .stat(&proto::StatBlobRequest { - digest: EMPTY_BLOB_DIGEST.to_vec(), - include_chunks: false, - include_bao: false - }) - .unwrap() - .is_some()); + .has(&EMPTY_BLOB_DIGEST.to_vec().try_into().unwrap()) + .unwrap()); } diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs index 2d612d9c273f..b5dbf595960f 100644 --- a/tvix/store/src/tests/nar_renderer.rs +++ b/tvix/store/src/tests/nar_renderer.rs @@ -1,21 +1,17 @@ use crate::blobservice::BlobService; -use crate::chunkservice::ChunkService; +use crate::blobservice::BlobWriter; use crate::directoryservice::DirectoryService; use crate::nar::NARRenderer; -use crate::proto; use crate::proto::DirectoryNode; use crate::proto::FileNode; use crate::proto::SymlinkNode; use crate::tests::fixtures::*; use crate::tests::utils::*; +use std::io; #[test] fn single_symlink() { - let renderer = NARRenderer::new( - gen_blob_service(), - gen_chunk_service(), - gen_directory_service(), - ); + let renderer = NARRenderer::new(gen_blob_service(), gen_directory_service()); // don't put anything in the stores, as we don't actually do any requests. let mut buf: Vec<u8> = vec![]; @@ -37,11 +33,7 @@ fn single_symlink() { /// match what's in the store. #[test] fn single_file_missing_blob() { - let renderer = NARRenderer::new( - gen_blob_service(), - gen_chunk_service(), - gen_directory_service(), - ); + let renderer = NARRenderer::new(gen_blob_service(), gen_directory_service()); let mut buf: Vec<u8> = vec![]; let e = renderer @@ -56,10 +48,11 @@ fn single_file_missing_blob() { ) .expect_err("must fail"); - if let crate::nar::RenderError::BlobNotFound(actual_digest, _) = e { - assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), actual_digest); - } else { - panic!("unexpected error") + match e { + crate::nar::RenderError::NARWriterError(e) => { + assert_eq!(io::ErrorKind::NotFound, e.kind()); + } + _ => panic!("unexpected error: {:?}", e), } } @@ -68,84 +61,79 @@ fn single_file_missing_blob() { #[test] fn single_file_wrong_blob_size() { let blob_service = gen_blob_service(); - let chunk_service = gen_chunk_service(); - // insert blob and chunk into the stores - chunk_service - .put(HELLOWORLD_BLOB_CONTENTS.to_vec()) - .unwrap(); - - blob_service - .put( - &HELLOWORLD_BLOB_DIGEST, - proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { + // insert blob into the store + let mut writer = blob_service.open_write().unwrap(); + io::copy( + &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()), + &mut writer, + ) + .unwrap(); + assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), writer.close().unwrap()); + + let renderer = NARRenderer::new(blob_service, gen_directory_service()); + + // Test with a root FileNode of a too big size + { + let mut buf: Vec<u8> = vec![]; + let e = renderer + .write_nar( + &mut buf, + &crate::proto::node::Node::File(FileNode { + name: "doesntmatter".to_string(), digest: HELLOWORLD_BLOB_DIGEST.to_vec(), - size: HELLOWORLD_BLOB_CONTENTS.len() as u32, - }], - ..Default::default() - }, - ) - .unwrap(); - - let renderer = NARRenderer::new(blob_service, chunk_service, gen_directory_service()); - let mut buf: Vec<u8> = vec![]; - - let e = renderer - .write_nar( - &mut buf, - &crate::proto::node::Node::File(FileNode { - name: "doesntmatter".to_string(), - digest: HELLOWORLD_BLOB_DIGEST.to_vec(), - size: 42, // <- note the wrong size here! - executable: false, - }), - ) - .expect_err("must fail"); + size: 42, // <- note the wrong size here! + executable: false, + }), + ) + .expect_err("must fail"); + + match e { + crate::nar::RenderError::NARWriterError(e) => { + assert_eq!(io::ErrorKind::UnexpectedEof, e.kind()); + } + _ => panic!("unexpected error: {:?}", e), + } + } - if let crate::nar::RenderError::UnexpectedBlobMeta(digest, _, expected_size, actual_size) = e { - assert_eq!( - digest, - HELLOWORLD_BLOB_DIGEST.to_vec(), - "expect digest to match" - ); - assert_eq!( - expected_size, 42, - "expected expected size to be what's passed in the request" - ); - assert_eq!( - actual_size, - HELLOWORLD_BLOB_CONTENTS.len() as u32, - "expected actual size to be correct" - ); - } else { - panic!("unexpected error") + // Test with a root FileNode of a too small size + { + let mut buf: Vec<u8> = vec![]; + let e = renderer + .write_nar( + &mut buf, + &crate::proto::node::Node::File(FileNode { + name: "doesntmatter".to_string(), + digest: HELLOWORLD_BLOB_DIGEST.to_vec(), + size: 2, // <- note the wrong size here! + executable: false, + }), + ) + .expect_err("must fail"); + + match e { + crate::nar::RenderError::NARWriterError(e) => { + assert_eq!(io::ErrorKind::InvalidInput, e.kind()); + } + _ => panic!("unexpected error: {:?}", e), + } } } #[test] fn single_file() { let blob_service = gen_blob_service(); - let chunk_service = gen_chunk_service(); - chunk_service - .put(HELLOWORLD_BLOB_CONTENTS.to_vec()) - .unwrap(); + // insert blob into the store + let mut writer = blob_service.open_write().unwrap(); + io::copy( + &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()), + &mut writer, + ) + .unwrap(); + assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), writer.close().unwrap()); - blob_service - .put( - &HELLOWORLD_BLOB_DIGEST, - proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: HELLOWORLD_BLOB_DIGEST.to_vec(), - size: HELLOWORLD_BLOB_CONTENTS.len() as u32, - }], - ..Default::default() - }, - ) - .unwrap(); - - let renderer = NARRenderer::new(blob_service, chunk_service, gen_directory_service()); + let renderer = NARRenderer::new(blob_service, gen_directory_service()); let mut buf: Vec<u8> = vec![]; renderer @@ -166,31 +154,24 @@ fn single_file() { #[test] fn test_complicated() { let blob_service = gen_blob_service(); - let chunk_service = gen_chunk_service(); let directory_service = gen_directory_service(); // put all data into the stores. - let digest = chunk_service.put(EMPTY_BLOB_CONTENTS.to_vec()).unwrap(); - - blob_service - .put( - &digest, - proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: digest.to_vec(), - size: EMPTY_BLOB_CONTENTS.len() as u32, - }], - ..Default::default() - }, - ) - .unwrap(); + // insert blob into the store + let mut writer = blob_service.open_write().unwrap(); + io::copy( + &mut io::Cursor::new(EMPTY_BLOB_CONTENTS.to_vec()), + &mut writer, + ) + .unwrap(); + assert_eq!(EMPTY_BLOB_DIGEST.to_vec(), writer.close().unwrap()); directory_service.put(DIRECTORY_WITH_KEEP.clone()).unwrap(); directory_service .put(DIRECTORY_COMPLICATED.clone()) .unwrap(); - let renderer = NARRenderer::new(blob_service, chunk_service, directory_service); + let renderer = NARRenderer::new(blob_service, directory_service); let mut buf: Vec<u8> = vec![]; renderer diff --git a/tvix/store/src/tests/utils.rs b/tvix/store/src/tests/utils.rs index 916e6516216c..2991feed41db 100644 --- a/tvix/store/src/tests/utils.rs +++ b/tvix/store/src/tests/utils.rs @@ -1,6 +1,5 @@ use crate::{ blobservice::{BlobService, MemoryBlobService}, - chunkservice::{ChunkService, MemoryChunkService}, directoryservice::{DirectoryService, MemoryDirectoryService}, pathinfoservice::{MemoryPathInfoService, PathInfoService}, }; @@ -9,10 +8,6 @@ pub fn gen_blob_service() -> impl BlobService + Send + Sync + Clone + 'static { MemoryBlobService::default() } -pub fn gen_chunk_service() -> impl ChunkService + Clone { - MemoryChunkService::default() -} - pub fn gen_directory_service() -> impl DirectoryService + Send + Sync + Clone + 'static { MemoryDirectoryService::default() } |