author      Florian Klink <flokli@flokli.de>   2023-05-11 12:49 +0300
committer   flokli <flokli@flokli.de>          2023-05-11 14:27 +0000
commit      616fa4476f93e1782e68dc713e9e8cb77a426c7d (patch)
tree        f76a43e95c75d848d079706fbccfd442210ebebc /tvix/store/src/chunkservice
parent      b22b685f0b2524c088deacbf4e80e7b7c73b5afc (diff)
refactor(tvix/store): remove ChunkService r/6133
Whether chunking is involved or not is an implementation detail of each
Blobstore. Consumers of a whole blob shouldn't need to worry about that.
It currently isn't visible in the gRPC interface either, and it
shouldn't bleed into everything.

Let the BlobService trait provide `open_read` and `open_write` methods,
which return handles providing io::Read or io::Write, and leave the
details up to the implementation.
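
Roughly, that means a trait shape like the sketch below (illustrative
only; the exact signatures, reader/writer types and how a written
blob's digest is returned are up to the real BlobService definition):

    use std::io;

    use crate::Error;

    pub trait BlobService {
        /// Open a blob for reading, addressed by its digest.
        /// Returns None if the blob is not known to this store.
        fn open_read(
            &self,
            digest: &[u8; 32],
        ) -> Result<Option<Box<dyn io::Read + Send>>, Error>;

        /// Open a handle the blob contents can be written into. Whether
        /// the implementation chunks the data internally is its own
        /// business.
        fn open_write(&self) -> Result<Box<dyn io::Write + Send>, Error>;
    }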

This means our custom BlobReader module can go away, along with all the
chunking bits in there.

In the future, we might still want to add more chunking-aware syncing,
but as a syncing strategy that individual stores can expose, not as a
fundamental component of the protocol.

This currently needs "SyncReadIntoAsyncRead", taken and vendored in from
https://github.com/tokio-rs/tokio/pull/5669.
It provides an AsyncRead for a sync Read, which is necessary to connect
our (sync) BlobReader interface to a gRPC server implementation.
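
Concretely, the sync reader coming out of `open_read` gets wrapped in
that adapter to obtain an AsyncRead, which can then be turned into a
stream of chunks for the tonic handler, along the lines of this sketch
(hypothetical helper function; ReaderStream is the real tokio-util
type):

    use bytes::Bytes;
    use futures::Stream;
    use tokio::io::AsyncRead;
    use tokio_util::io::ReaderStream;

    /// Turn an AsyncRead (e.g. a sync blob reader wrapped in the
    /// vendored SyncReadIntoAsyncRead adapter) into a stream of Bytes
    /// chunks, which the gRPC server method then maps into its
    /// response messages.
    fn blob_to_chunk_stream<R: AsyncRead>(
        async_reader: R,
    ) -> impl Stream<Item = std::io::Result<Bytes>> {
        ReaderStream::new(async_reader)
    }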

As an alternative, we could also make the BlobReader itself async, and
let consumers of the trait (EvalIO) deal with the async-ness, but this
is less of a change for now.

In terms of vendoring, I initially tried to move our tokio crate to
these commits, but ran into version incompatibilities, so let's vendor
it in for now.

Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix/store/src/chunkservice')
-rw-r--r--  tvix/store/src/chunkservice/memory.rs   53
-rw-r--r--  tvix/store/src/chunkservice/mod.rs       28
-rw-r--r--  tvix/store/src/chunkservice/sled.rs      70
-rw-r--r--  tvix/store/src/chunkservice/util.rs      85
4 files changed, 0 insertions, 236 deletions
diff --git a/tvix/store/src/chunkservice/memory.rs b/tvix/store/src/chunkservice/memory.rs
deleted file mode 100644
index 9fe4dc17d5ec..000000000000
--- a/tvix/store/src/chunkservice/memory.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-use data_encoding::BASE64;
-use std::{
-    collections::HashMap,
-    sync::{Arc, RwLock},
-};
-use tracing::instrument;
-
-use crate::Error;
-
-use super::ChunkService;
-
-#[derive(Clone, Default)]
-pub struct MemoryChunkService {
-    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
-}
-
-impl ChunkService for MemoryChunkService {
-    #[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
-        let db = self.db.read().unwrap();
-        Ok(db.get(digest).is_some())
-    }
-
-    #[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))]
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
-        let db = self.db.read().unwrap();
-        match db.get(digest) {
-            None => Ok(None),
-            Some(data) => {
-                // calculate the hash to verify this is really what we expect
-                let actual_digest = blake3::hash(data).as_bytes().to_vec();
-                if actual_digest != digest {
-                    return Err(Error::StorageError(format!(
-                        "invalid hash encountered when reading chunk, expected {}, got {}",
-                        BASE64.encode(digest),
-                        BASE64.encode(&actual_digest),
-                    )));
-                }
-                Ok(Some(data.clone()))
-            }
-        }
-    }
-
-    #[instrument(skip(self, data))]
-    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
-        let digest = blake3::hash(&data);
-
-        let mut db = self.db.write().unwrap();
-        db.insert(*digest.as_bytes(), data);
-
-        Ok(*digest.as_bytes())
-    }
-}
diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs
deleted file mode 100644
index faf0a88f151a..000000000000
--- a/tvix/store/src/chunkservice/mod.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-mod util;
-
-pub mod memory;
-pub mod sled;
-
-use crate::Error;
-
-pub use self::memory::MemoryChunkService;
-pub use self::sled::SledChunkService;
-pub use self::util::read_all_and_chunk;
-pub use self::util::update_hasher;
-pub use self::util::upload_chunk;
-
-/// The base trait all ChunkService services need to implement.
-/// It allows checking for the existence, download and upload of chunks.
-/// It's usually used after consulting a [crate::blobservice::BlobService] for
-/// chunking information.
-pub trait ChunkService {
-    /// check if the service has a chunk, given by its digest.
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
-
-    /// retrieve a chunk by its digest. Implementations MUST validate the digest
-    /// matches.
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error>;
-
-    /// insert a chunk. returns the digest of the chunk, or an error.
-    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error>;
-}
diff --git a/tvix/store/src/chunkservice/sled.rs b/tvix/store/src/chunkservice/sled.rs
deleted file mode 100644
index 8e86e1825b28..000000000000
--- a/tvix/store/src/chunkservice/sled.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-use std::path::PathBuf;
-
-use data_encoding::BASE64;
-use tracing::instrument;
-
-use crate::Error;
-
-use super::ChunkService;
-
-#[derive(Clone)]
-pub struct SledChunkService {
-    db: sled::Db,
-}
-
-impl SledChunkService {
-    pub fn new(p: PathBuf) -> Result<Self, sled::Error> {
-        let config = sled::Config::default().use_compression(true).path(p);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-
-    pub fn new_temporary() -> Result<Self, sled::Error> {
-        let config = sled::Config::default().temporary(true);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-}
-
-impl ChunkService for SledChunkService {
-    #[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
-        match self.db.get(digest) {
-            Ok(None) => Ok(false),
-            Ok(Some(_)) => Ok(true),
-            Err(e) => Err(Error::StorageError(e.to_string())),
-        }
-    }
-
-    #[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))]
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
-        match self.db.get(digest) {
-            Ok(None) => Ok(None),
-            Ok(Some(data)) => {
-                // calculate the hash to verify this is really what we expect
-                let actual_digest = blake3::hash(&data).as_bytes().to_vec();
-                if actual_digest != digest {
-                    return Err(Error::StorageError(format!(
-                        "invalid hash encountered when reading chunk, expected {}, got {}",
-                        BASE64.encode(digest),
-                        BASE64.encode(&actual_digest),
-                    )));
-                }
-                Ok(Some(Vec::from(&*data)))
-            }
-            Err(e) => Err(Error::StorageError(e.to_string())),
-        }
-    }
-
-    #[instrument(name = "SledChunkService::put", skip(self, data))]
-    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
-        let digest = blake3::hash(&data);
-        let result = self.db.insert(digest.as_bytes(), data);
-        if let Err(e) = result {
-            return Err(Error::StorageError(e.to_string()));
-        }
-        Ok(*digest.as_bytes())
-    }
-}
diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs
deleted file mode 100644
index 2ad663733b04..000000000000
--- a/tvix/store/src/chunkservice/util.rs
+++ /dev/null
@@ -1,85 +0,0 @@
-use crate::{proto, Error};
-use std::io::Read;
-use tracing::{debug, instrument};
-
-use super::ChunkService;
-
-/// uploads a chunk to a chunk service, and returns its digest (or an error) when done.
-#[instrument(skip_all, err)]
-pub fn upload_chunk<CS: ChunkService>(
-    chunk_service: &CS,
-    chunk_data: Vec<u8>,
-) -> Result<[u8; 32], Error> {
-    let mut hasher = blake3::Hasher::new();
-    update_hasher(&mut hasher, &chunk_data);
-    let digest = hasher.finalize();
-
-    if chunk_service.has(digest.as_bytes())? {
-        debug!("already has chunk, skipping");
-    }
-    let digest_resp = chunk_service.put(chunk_data)?;
-
-    assert_eq!(&digest_resp, digest.as_bytes());
-
-    Ok(*digest.as_bytes())
-}
-
-/// reads through a reader, writes chunks to a [ChunkService] and returns a
-/// [proto::BlobMeta] pointing to all the chunks.
-#[instrument(skip_all, err)]
-pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
-    chunk_service: &CS,
-    r: R,
-) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
-    let mut blob_meta = proto::BlobMeta::default();
-
-    // hash the file contents, upload chunks if not there yet
-    let mut blob_hasher = blake3::Hasher::new();
-
-    // TODO: play with chunking sizes
-    let chunker_avg_size = 64 * 1024;
-    let chunker_min_size = chunker_avg_size / 4;
-    let chunker_max_size = chunker_avg_size * 4;
-
-    let chunker =
-        fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size);
-
-    for chunking_result in chunker {
-        let chunk = chunking_result.unwrap();
-        // TODO: convert to error::UnableToRead
-
-        let chunk_len = chunk.data.len() as u32;
-
-        // update calculate blob hash
-        update_hasher(&mut blob_hasher, &chunk.data);
-
-        let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
-
-        blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
-            digest: chunk_digest.to_vec(),
-            size: chunk_len,
-        });
-    }
-    Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta))
-}
-
-/// updates a given hasher with more data. Uses rayon if the data is
-/// sufficiently big.
-///
-/// From the docs:
-///
-/// To get any performance benefit from multithreading, the input buffer needs
-/// to be large. As a rule of thumb on x86_64, update_rayon is slower than
-/// update for inputs under 128 KiB. That threshold varies quite a lot across
-/// different processors, and it’s important to benchmark your specific use
-/// case.
-///
-/// We didn't benchmark yet, so these numbers might need tweaking.
-#[instrument(skip_all)]
-pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) {
-    if data.len() > 128 * 1024 {
-        hasher.update_rayon(data);
-    } else {
-        hasher.update(data);
-    }
-}