diff options
Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r-- | tvix/castore/src/blobservice/from_addr.rs | 11 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/mod.rs | 3 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/simplefs.rs | 195 |
3 files changed, 208 insertions, 1 deletions
diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs index 97e185464d3c..8f8e59952662 100644 --- a/tvix/castore/src/blobservice/from_addr.rs +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -3,7 +3,9 @@ use url::Url; use crate::{proto::blob_service_client::BlobServiceClient, Error}; -use super::{BlobService, GRPCBlobService, MemoryBlobService, SledBlobService}; +use super::{ + BlobService, GRPCBlobService, MemoryBlobService, SimpleFilesystemBlobService, SledBlobService, +}; /// Constructs a new instance of a [BlobService] from an URI. /// @@ -11,6 +13,7 @@ use super::{BlobService, GRPCBlobService, MemoryBlobService, SledBlobService}; /// - `memory://` ([MemoryBlobService]) /// - `sled://` ([SledBlobService]) /// - `grpc+*://` ([GRPCBlobService]) +/// - `simplefs://` ([SimpleFilesystemBlobService]) /// /// See their `from_url` methods for more details about their syntax. pub async fn from_addr(uri: &str) -> Result<Arc<dyn BlobService>, crate::Error> { @@ -54,6 +57,12 @@ pub async fn from_addr(uri: &str) -> Result<Arc<dyn BlobService>, crate::Error> // Constructing the channel is handled by tvix_castore::channel::from_url. let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?); Arc::new(GRPCBlobService::from_client(client)) + } else if url.scheme() == "simplefs" { + if url.path().is_empty() { + return Err(Error::StorageError("Invalid filesystem path".to_string())); + } + + Arc::new(SimpleFilesystemBlobService::new(url.path().into()).await?) } else { Err(crate::Error::StorageError(format!( "unknown scheme: {}", diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs index faaf94f03746..d024121eaa39 100644 --- a/tvix/castore/src/blobservice/mod.rs +++ b/tvix/castore/src/blobservice/mod.rs @@ -7,6 +7,7 @@ mod from_addr; mod grpc; mod memory; mod naive_seeker; +mod simplefs; mod sled; #[cfg(test)] @@ -15,6 +16,7 @@ mod tests; pub use self::from_addr::from_addr; pub use self::grpc::GRPCBlobService; pub use self::memory::MemoryBlobService; +pub use self::simplefs::SimpleFilesystemBlobService; pub use self::sled::SledBlobService; /// The base trait all BlobService services need to implement. @@ -51,3 +53,4 @@ pub trait BlobReader: tokio::io::AsyncRead + tokio::io::AsyncSeek + Send + Unpin /// A [`io::Cursor<Vec<u8>>`] can be used as a BlobReader. impl BlobReader for io::Cursor<Vec<u8>> {} +impl BlobReader for tokio::fs::File {} diff --git a/tvix/castore/src/blobservice/simplefs.rs b/tvix/castore/src/blobservice/simplefs.rs new file mode 100644 index 000000000000..b21db2808c23 --- /dev/null +++ b/tvix/castore/src/blobservice/simplefs.rs @@ -0,0 +1,195 @@ +use std::{ + io, + path::{Path, PathBuf}, + pin::pin, + task::Poll, +}; + +use bytes::Buf; +use data_encoding::HEXLOWER; +use pin_project_lite::pin_project; +use tokio::io::AsyncWriteExt; +use tonic::async_trait; +use tracing::instrument; + +use crate::B3Digest; + +use super::{BlobReader, BlobService, BlobWriter}; + +/// Connects to a tvix-store BlobService on an existing path backed by a POSIX-compliant +/// filesystem. +/// +/// It takes an existing path, builds a `tmp` directory and a `blobs` directory inside of it. All +/// blobs received are staged in that `tmp` directory, then they are moved **atomically** into +/// `blobs/B3DIGEST[:2]/B3DIGEST[2:]` in a sharding style, e.g. `abcdef` gets turned into `ab/cdef` +/// +/// **Disclaimer** : This very simple implementation is subject to change and does not give any +/// final guarantees on the on-disk format. +#[derive(Clone)] +pub struct SimpleFilesystemBlobService { + /// Where the blobs are located on a filesystem already mounted. + path: PathBuf, +} + +impl SimpleFilesystemBlobService { + pub async fn new(path: PathBuf) -> std::io::Result<Self> { + tokio::fs::create_dir_all(&path).await?; + tokio::fs::create_dir_all(path.join("tmp")).await?; + tokio::fs::create_dir_all(path.join("blobs")).await?; + + Ok(Self { path }) + } +} + +fn derive_path(root: &Path, digest: &B3Digest) -> PathBuf { + let prefix = HEXLOWER.encode(&digest.as_slice()[..2]); + let pathname = HEXLOWER.encode(digest.as_slice()); + + root.join("blobs").join(prefix).join(pathname) +} + +#[async_trait] +impl BlobService for SimpleFilesystemBlobService { + #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> io::Result<bool> { + Ok(tokio::fs::try_exists(derive_path(&self.path, digest)).await?) + } + + #[instrument(skip_all, err, fields(blob.digest=%digest))] + async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { + let dst_path = derive_path(&self.path, digest); + let reader = match tokio::fs::File::open(dst_path).await { + Ok(file) => { + let reader: Box<dyn BlobReader> = Box::new(file); + Ok(Some(reader)) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(e), + }; + + Ok(reader?) + } + + #[instrument(skip_all)] + async fn open_write(&self) -> Box<dyn BlobWriter> { + let file = match async_tempfile::TempFile::new_in(self.path.join("tmp")).await { + Ok(file) => Ok(file), + Err(e) => match e { + async_tempfile::Error::Io(io_error) => Err(io_error), + async_tempfile::Error::InvalidFile => Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "invalid or missing file specified", + )), + async_tempfile::Error::InvalidDirectory => Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "invalid or missing directory specified", + )), + }, + }; + + Box::new(SimpleFilesystemBlobWriter { + root: self.path.clone(), + file, + digester: blake3::Hasher::new(), + }) + } +} + +pin_project! { + struct SimpleFilesystemBlobWriter { + root: PathBuf, + file: std::io::Result<async_tempfile::TempFile>, + digester: blake3::Hasher + } +} + +impl tokio::io::AsyncWrite for SimpleFilesystemBlobWriter { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll<Result<usize, std::io::Error>> { + if let Err(e) = self.file.as_mut() { + return Poll::Ready(Err(std::mem::replace( + e, + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "this file is already closed", + ), + ))); + } + + let writer = self.file.as_mut().unwrap(); + match pin!(writer).poll_write(cx, buf) { + Poll::Ready(Ok(n)) => { + let this = self.project(); + this.digester.update(buf.take(n).into_inner()); + Poll::Ready(Ok(n)) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, + } + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), std::io::Error>> { + if let Err(e) = self.file.as_mut() { + return Poll::Ready(Err(std::mem::replace( + e, + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "this file is already closed", + ), + ))); + } + + let writer = self.file.as_mut().unwrap(); + pin!(writer).poll_flush(cx) + } + + fn poll_shutdown( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), std::io::Error>> { + if let Err(e) = self.file.as_mut() { + return Poll::Ready(Err(std::mem::replace( + e, + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "this file is already closed", + ), + ))); + } + + let writer = self.file.as_mut().unwrap(); + pin!(writer).poll_shutdown(cx) + } +} + +#[async_trait] +impl BlobWriter for SimpleFilesystemBlobWriter { + async fn close(&mut self) -> io::Result<B3Digest> { + if let Err(e) = self.file.as_mut() { + return Err(std::mem::replace( + e, + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "this file is already closed", + ), + )); + } + + let writer = self.file.as_mut().unwrap(); + writer.sync_all().await?; + writer.flush().await?; + + let digest: B3Digest = self.digester.finalize().as_bytes().into(); + let dst_path = derive_path(&self.root, &digest); + tokio::fs::create_dir_all(dst_path.parent().unwrap()).await?; + tokio::fs::rename(writer.file_path(), dst_path).await?; + + Ok(digest) + } +} |