From 0ae32d45f690eed8f259ac55b4d0d8bbc006baf2 Mon Sep 17 00:00:00 2001 From: Ryan Lahfa Date: Sun, 17 Dec 2023 01:22:01 +0100 Subject: feat(tvix/castore): simple filesystem blob service The simple filesystem `BlobService` enable a user to write blob store on an existing filesystem using a prefix-style layout in the provided root directory, e.g. the two first bytes of the blake3 hashes are used as directories prefixes. Change-Id: I3451a688a6f39027b9c6517d853b95a87adb3a52 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10071 Autosubmit: raitobezarius Tested-by: BuildkiteCI Reviewed-by: flokli --- tvix/Cargo.lock | 20 +++ tvix/Cargo.nix | 87 +++++++++++++ tvix/castore/Cargo.toml | 1 + tvix/castore/src/blobservice/from_addr.rs | 11 +- tvix/castore/src/blobservice/mod.rs | 3 + tvix/castore/src/blobservice/simplefs.rs | 195 ++++++++++++++++++++++++++++++ 6 files changed, 316 insertions(+), 1 deletion(-) create mode 100644 tvix/castore/src/blobservice/simplefs.rs (limited to 'tvix') diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index b1e6c7d70637..297adf5c846a 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -138,6 +138,16 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "async-tempfile" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b37d4bb113c47e4f263d4b0221912ff5aa840a51bc9b7b47b024e1cf1926fd9b" +dependencies = [ + "tokio", + "uuid", +] + [[package]] name = "async-trait" version = "0.1.68" @@ -3093,6 +3103,7 @@ name = "tvix-castore" version = "0.1.0" dependencies = [ "async-stream", + "async-tempfile", "blake3", "bstr", "bytes", @@ -3334,6 +3345,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" +dependencies = [ + "getrandom", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index bf280efa37da..2c9120376871 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -470,6 +470,40 @@ rec { ]; }; + "async-tempfile" = rec { + crateName = "async-tempfile"; + version = "0.4.0"; + edition = "2021"; + sha256 = "16zx4qcwzq94n13pp6xwa4589apm5y8j20jb7lk4yzn42fqlnzdk"; + authors = [ + "Markus Mayer" + ]; + dependencies = [ + { + name = "tokio"; + packageId = "tokio"; + features = [ "fs" ]; + } + { + name = "uuid"; + packageId = "uuid"; + optional = true; + features = [ "v4" ]; + } + ]; + devDependencies = [ + { + name = "tokio"; + packageId = "tokio"; + features = [ "rt-multi-thread" "macros" ]; + } + ]; + features = { + "default" = [ "uuid" ]; + "uuid" = [ "dep:uuid" ]; + }; + resolvedDefaultFeatures = [ "default" "uuid" ]; + }; "async-trait" = rec { crateName = "async-trait"; version = "0.1.68"; @@ -9553,6 +9587,10 @@ rec { name = "async-stream"; packageId = "async-stream"; } + { + name = "async-tempfile"; + packageId = "async-tempfile"; + } { name = "blake3"; packageId = "blake3"; @@ -10410,6 +10448,55 @@ rec { features = { }; resolvedDefaultFeatures = [ "default" ]; }; + "uuid" = rec { + crateName = "uuid"; + version = "1.5.0"; + edition = "2018"; + sha256 = "1z6dnvba224p8wvv4vx4xpgc2yxqy12sk4qh346sfh8baskmkbc8"; + authors = [ + "Ashley Mannix" + "Christopher Armstrong" + "Dylan DPC" + "Hunar Roop Kahlon" + ]; + dependencies = [ + { + name = "getrandom"; + packageId = "getrandom"; + rename = "getrandom"; + optional = true; + } + ]; + features = { + "arbitrary" = [ "dep:arbitrary" ]; + "atomic" = [ "dep:atomic" ]; + "borsh" = [ "dep:borsh" ]; + "bytemuck" = [ "dep:bytemuck" ]; + "default" = [ "std" ]; + "fast-rng" = [ "rng" "rand" ]; + "getrandom" = [ "dep:getrandom" ]; + "js" = [ "wasm-bindgen" "getrandom" "getrandom/js" ]; + "macro-diagnostics" = [ "uuid-macro-internal" ]; + "md-5" = [ "dep:md-5" ]; + "md5" = [ "md-5" ]; + "rand" = [ "dep:rand" ]; + "rng" = [ "getrandom" ]; + "serde" = [ "dep:serde" ]; + "sha1" = [ "sha1_smol" ]; + "sha1_smol" = [ "dep:sha1_smol" ]; + "slog" = [ "dep:slog" ]; + "uuid-macro-internal" = [ "dep:uuid-macro-internal" ]; + "v1" = [ "atomic" ]; + "v3" = [ "md5" ]; + "v4" = [ "rng" ]; + "v5" = [ "sha1" ]; + "v6" = [ "atomic" ]; + "v7" = [ "atomic" "rng" ]; + "wasm-bindgen" = [ "dep:wasm-bindgen" ]; + "zerocopy" = [ "dep:zerocopy" ]; + }; + resolvedDefaultFeatures = [ "default" "getrandom" "rng" "std" "v4" ]; + }; "valuable" = rec { crateName = "valuable"; version = "0.1.0"; diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index 468ef8ec3528..2a421280b82b 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -23,6 +23,7 @@ tracing = "0.1.37" url = "2.4.0" walkdir = "2.4.0" bstr = "1.6.0" +async-tempfile = "0.4.0" [dependencies.tonic-reflection] optional = true diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs index 97e185464d3c..8f8e59952662 100644 --- a/tvix/castore/src/blobservice/from_addr.rs +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -3,7 +3,9 @@ use url::Url; use crate::{proto::blob_service_client::BlobServiceClient, Error}; -use super::{BlobService, GRPCBlobService, MemoryBlobService, SledBlobService}; +use super::{ + BlobService, GRPCBlobService, MemoryBlobService, SimpleFilesystemBlobService, SledBlobService, +}; /// Constructs a new instance of a [BlobService] from an URI. /// @@ -11,6 +13,7 @@ use super::{BlobService, GRPCBlobService, MemoryBlobService, SledBlobService}; /// - `memory://` ([MemoryBlobService]) /// - `sled://` ([SledBlobService]) /// - `grpc+*://` ([GRPCBlobService]) +/// - `simplefs://` ([SimpleFilesystemBlobService]) /// /// See their `from_url` methods for more details about their syntax. pub async fn from_addr(uri: &str) -> Result, crate::Error> { @@ -54,6 +57,12 @@ pub async fn from_addr(uri: &str) -> Result, crate::Error> // Constructing the channel is handled by tvix_castore::channel::from_url. let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?); Arc::new(GRPCBlobService::from_client(client)) + } else if url.scheme() == "simplefs" { + if url.path().is_empty() { + return Err(Error::StorageError("Invalid filesystem path".to_string())); + } + + Arc::new(SimpleFilesystemBlobService::new(url.path().into()).await?) } else { Err(crate::Error::StorageError(format!( "unknown scheme: {}", diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs index faaf94f03746..d024121eaa39 100644 --- a/tvix/castore/src/blobservice/mod.rs +++ b/tvix/castore/src/blobservice/mod.rs @@ -7,6 +7,7 @@ mod from_addr; mod grpc; mod memory; mod naive_seeker; +mod simplefs; mod sled; #[cfg(test)] @@ -15,6 +16,7 @@ mod tests; pub use self::from_addr::from_addr; pub use self::grpc::GRPCBlobService; pub use self::memory::MemoryBlobService; +pub use self::simplefs::SimpleFilesystemBlobService; pub use self::sled::SledBlobService; /// The base trait all BlobService services need to implement. @@ -51,3 +53,4 @@ pub trait BlobReader: tokio::io::AsyncRead + tokio::io::AsyncSeek + Send + Unpin /// A [`io::Cursor>`] can be used as a BlobReader. impl BlobReader for io::Cursor> {} +impl BlobReader for tokio::fs::File {} diff --git a/tvix/castore/src/blobservice/simplefs.rs b/tvix/castore/src/blobservice/simplefs.rs new file mode 100644 index 000000000000..b21db2808c23 --- /dev/null +++ b/tvix/castore/src/blobservice/simplefs.rs @@ -0,0 +1,195 @@ +use std::{ + io, + path::{Path, PathBuf}, + pin::pin, + task::Poll, +}; + +use bytes::Buf; +use data_encoding::HEXLOWER; +use pin_project_lite::pin_project; +use tokio::io::AsyncWriteExt; +use tonic::async_trait; +use tracing::instrument; + +use crate::B3Digest; + +use super::{BlobReader, BlobService, BlobWriter}; + +/// Connects to a tvix-store BlobService on an existing path backed by a POSIX-compliant +/// filesystem. +/// +/// It takes an existing path, builds a `tmp` directory and a `blobs` directory inside of it. All +/// blobs received are staged in that `tmp` directory, then they are moved **atomically** into +/// `blobs/B3DIGEST[:2]/B3DIGEST[2:]` in a sharding style, e.g. `abcdef` gets turned into `ab/cdef` +/// +/// **Disclaimer** : This very simple implementation is subject to change and does not give any +/// final guarantees on the on-disk format. +#[derive(Clone)] +pub struct SimpleFilesystemBlobService { + /// Where the blobs are located on a filesystem already mounted. + path: PathBuf, +} + +impl SimpleFilesystemBlobService { + pub async fn new(path: PathBuf) -> std::io::Result { + tokio::fs::create_dir_all(&path).await?; + tokio::fs::create_dir_all(path.join("tmp")).await?; + tokio::fs::create_dir_all(path.join("blobs")).await?; + + Ok(Self { path }) + } +} + +fn derive_path(root: &Path, digest: &B3Digest) -> PathBuf { + let prefix = HEXLOWER.encode(&digest.as_slice()[..2]); + let pathname = HEXLOWER.encode(digest.as_slice()); + + root.join("blobs").join(prefix).join(pathname) +} + +#[async_trait] +impl BlobService for SimpleFilesystemBlobService { + #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> io::Result { + Ok(tokio::fs::try_exists(derive_path(&self.path, digest)).await?) + } + + #[instrument(skip_all, err, fields(blob.digest=%digest))] + async fn open_read(&self, digest: &B3Digest) -> io::Result>> { + let dst_path = derive_path(&self.path, digest); + let reader = match tokio::fs::File::open(dst_path).await { + Ok(file) => { + let reader: Box = Box::new(file); + Ok(Some(reader)) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(e), + }; + + Ok(reader?) + } + + #[instrument(skip_all)] + async fn open_write(&self) -> Box { + let file = match async_tempfile::TempFile::new_in(self.path.join("tmp")).await { + Ok(file) => Ok(file), + Err(e) => match e { + async_tempfile::Error::Io(io_error) => Err(io_error), + async_tempfile::Error::InvalidFile => Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "invalid or missing file specified", + )), + async_tempfile::Error::InvalidDirectory => Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "invalid or missing directory specified", + )), + }, + }; + + Box::new(SimpleFilesystemBlobWriter { + root: self.path.clone(), + file, + digester: blake3::Hasher::new(), + }) + } +} + +pin_project! { + struct SimpleFilesystemBlobWriter { + root: PathBuf, + file: std::io::Result, + digester: blake3::Hasher + } +} + +impl tokio::io::AsyncWrite for SimpleFilesystemBlobWriter { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + if let Err(e) = self.file.as_mut() { + return Poll::Ready(Err(std::mem::replace( + e, + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "this file is already closed", + ), + ))); + } + + let writer = self.file.as_mut().unwrap(); + match pin!(writer).poll_write(cx, buf) { + Poll::Ready(Ok(n)) => { + let this = self.project(); + this.digester.update(buf.take(n).into_inner()); + Poll::Ready(Ok(n)) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, + } + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if let Err(e) = self.file.as_mut() { + return Poll::Ready(Err(std::mem::replace( + e, + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "this file is already closed", + ), + ))); + } + + let writer = self.file.as_mut().unwrap(); + pin!(writer).poll_flush(cx) + } + + fn poll_shutdown( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if let Err(e) = self.file.as_mut() { + return Poll::Ready(Err(std::mem::replace( + e, + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "this file is already closed", + ), + ))); + } + + let writer = self.file.as_mut().unwrap(); + pin!(writer).poll_shutdown(cx) + } +} + +#[async_trait] +impl BlobWriter for SimpleFilesystemBlobWriter { + async fn close(&mut self) -> io::Result { + if let Err(e) = self.file.as_mut() { + return Err(std::mem::replace( + e, + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "this file is already closed", + ), + )); + } + + let writer = self.file.as_mut().unwrap(); + writer.sync_all().await?; + writer.flush().await?; + + let digest: B3Digest = self.digester.finalize().as_bytes().into(); + let dst_path = derive_path(&self.root, &digest); + tokio::fs::create_dir_all(dst_path.parent().unwrap()).await?; + tokio::fs::rename(writer.file_path(), dst_path).await?; + + Ok(digest) + } +} -- cgit 1.4.1