about summary refs log tree commit diff
diff options
context:
space:
mode:
authorRyan Lahfa <tvl@lahfa.xyz>2023-12-17T00·22+0100
committerclbot <clbot@tvl.fyi>2023-12-17T14·34+0000
commit0ae32d45f690eed8f259ac55b4d0d8bbc006baf2 (patch)
tree23d4b725053bf0a6bc5b41d79ce00a063c90cb38
parent923a5737e61da020e6d7672c3aebc00db9f44850 (diff)
feat(tvix/castore): simple filesystem blob service r/7228
The simple filesystem `BlobService` enable a user to write blob store
on an existing filesystem using a prefix-style layout in the provided root directory,
e.g. the two first bytes of the blake3 hashes are used as directories prefixes.

Change-Id: I3451a688a6f39027b9c6517d853b95a87adb3a52
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10071
Autosubmit: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
-rw-r--r--tvix/Cargo.lock20
-rw-r--r--tvix/Cargo.nix87
-rw-r--r--tvix/castore/Cargo.toml1
-rw-r--r--tvix/castore/src/blobservice/from_addr.rs11
-rw-r--r--tvix/castore/src/blobservice/mod.rs3
-rw-r--r--tvix/castore/src/blobservice/simplefs.rs195
6 files changed, 316 insertions, 1 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index b1e6c7d70637..297adf5c846a 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -139,6 +139,16 @@ dependencies = [
 ]
 
 [[package]]
+name = "async-tempfile"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b37d4bb113c47e4f263d4b0221912ff5aa840a51bc9b7b47b024e1cf1926fd9b"
+dependencies = [
+ "tokio",
+ "uuid",
+]
+
+[[package]]
 name = "async-trait"
 version = "0.1.68"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3093,6 +3103,7 @@ name = "tvix-castore"
 version = "0.1.0"
 dependencies = [
  "async-stream",
+ "async-tempfile",
  "blake3",
  "bstr",
  "bytes",
@@ -3335,6 +3346,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
 
 [[package]]
+name = "uuid"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc"
+dependencies = [
+ "getrandom",
+]
+
+[[package]]
 name = "valuable"
 version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index bf280efa37da..2c9120376871 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -470,6 +470,40 @@ rec {
         ];
 
       };
+      "async-tempfile" = rec {
+        crateName = "async-tempfile";
+        version = "0.4.0";
+        edition = "2021";
+        sha256 = "16zx4qcwzq94n13pp6xwa4589apm5y8j20jb7lk4yzn42fqlnzdk";
+        authors = [
+          "Markus Mayer"
+        ];
+        dependencies = [
+          {
+            name = "tokio";
+            packageId = "tokio";
+            features = [ "fs" ];
+          }
+          {
+            name = "uuid";
+            packageId = "uuid";
+            optional = true;
+            features = [ "v4" ];
+          }
+        ];
+        devDependencies = [
+          {
+            name = "tokio";
+            packageId = "tokio";
+            features = [ "rt-multi-thread" "macros" ];
+          }
+        ];
+        features = {
+          "default" = [ "uuid" ];
+          "uuid" = [ "dep:uuid" ];
+        };
+        resolvedDefaultFeatures = [ "default" "uuid" ];
+      };
       "async-trait" = rec {
         crateName = "async-trait";
         version = "0.1.68";
@@ -9554,6 +9588,10 @@ rec {
             packageId = "async-stream";
           }
           {
+            name = "async-tempfile";
+            packageId = "async-tempfile";
+          }
+          {
             name = "blake3";
             packageId = "blake3";
             features = [ "rayon" "std" ];
@@ -10410,6 +10448,55 @@ rec {
         features = { };
         resolvedDefaultFeatures = [ "default" ];
       };
+      "uuid" = rec {
+        crateName = "uuid";
+        version = "1.5.0";
+        edition = "2018";
+        sha256 = "1z6dnvba224p8wvv4vx4xpgc2yxqy12sk4qh346sfh8baskmkbc8";
+        authors = [
+          "Ashley Mannix<ashleymannix@live.com.au>"
+          "Christopher Armstrong"
+          "Dylan DPC<dylan.dpc@gmail.com>"
+          "Hunar Roop Kahlon<hunar.roop@gmail.com>"
+        ];
+        dependencies = [
+          {
+            name = "getrandom";
+            packageId = "getrandom";
+            rename = "getrandom";
+            optional = true;
+          }
+        ];
+        features = {
+          "arbitrary" = [ "dep:arbitrary" ];
+          "atomic" = [ "dep:atomic" ];
+          "borsh" = [ "dep:borsh" ];
+          "bytemuck" = [ "dep:bytemuck" ];
+          "default" = [ "std" ];
+          "fast-rng" = [ "rng" "rand" ];
+          "getrandom" = [ "dep:getrandom" ];
+          "js" = [ "wasm-bindgen" "getrandom" "getrandom/js" ];
+          "macro-diagnostics" = [ "uuid-macro-internal" ];
+          "md-5" = [ "dep:md-5" ];
+          "md5" = [ "md-5" ];
+          "rand" = [ "dep:rand" ];
+          "rng" = [ "getrandom" ];
+          "serde" = [ "dep:serde" ];
+          "sha1" = [ "sha1_smol" ];
+          "sha1_smol" = [ "dep:sha1_smol" ];
+          "slog" = [ "dep:slog" ];
+          "uuid-macro-internal" = [ "dep:uuid-macro-internal" ];
+          "v1" = [ "atomic" ];
+          "v3" = [ "md5" ];
+          "v4" = [ "rng" ];
+          "v5" = [ "sha1" ];
+          "v6" = [ "atomic" ];
+          "v7" = [ "atomic" "rng" ];
+          "wasm-bindgen" = [ "dep:wasm-bindgen" ];
+          "zerocopy" = [ "dep:zerocopy" ];
+        };
+        resolvedDefaultFeatures = [ "default" "getrandom" "rng" "std" "v4" ];
+      };
       "valuable" = rec {
         crateName = "valuable";
         version = "0.1.0";
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index 468ef8ec3528..2a421280b82b 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -23,6 +23,7 @@ tracing = "0.1.37"
 url = "2.4.0"
 walkdir = "2.4.0"
 bstr = "1.6.0"
+async-tempfile = "0.4.0"
 
 [dependencies.tonic-reflection]
 optional = true
diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs
index 97e185464d3c..8f8e59952662 100644
--- a/tvix/castore/src/blobservice/from_addr.rs
+++ b/tvix/castore/src/blobservice/from_addr.rs
@@ -3,7 +3,9 @@ use url::Url;
 
 use crate::{proto::blob_service_client::BlobServiceClient, Error};
 
-use super::{BlobService, GRPCBlobService, MemoryBlobService, SledBlobService};
+use super::{
+    BlobService, GRPCBlobService, MemoryBlobService, SimpleFilesystemBlobService, SledBlobService,
+};
 
 /// Constructs a new instance of a [BlobService] from an URI.
 ///
@@ -11,6 +13,7 @@ use super::{BlobService, GRPCBlobService, MemoryBlobService, SledBlobService};
 /// - `memory://` ([MemoryBlobService])
 /// - `sled://` ([SledBlobService])
 /// - `grpc+*://` ([GRPCBlobService])
+/// - `simplefs://` ([SimpleFilesystemBlobService])
 ///
 /// See their `from_url` methods for more details about their syntax.
 pub async fn from_addr(uri: &str) -> Result<Arc<dyn BlobService>, crate::Error> {
@@ -54,6 +57,12 @@ pub async fn from_addr(uri: &str) -> Result<Arc<dyn BlobService>, crate::Error>
         // Constructing the channel is handled by tvix_castore::channel::from_url.
         let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?);
         Arc::new(GRPCBlobService::from_client(client))
+    } else if url.scheme() == "simplefs" {
+        if url.path().is_empty() {
+            return Err(Error::StorageError("Invalid filesystem path".to_string()));
+        }
+
+        Arc::new(SimpleFilesystemBlobService::new(url.path().into()).await?)
     } else {
         Err(crate::Error::StorageError(format!(
             "unknown scheme: {}",
diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs
index faaf94f03746..d024121eaa39 100644
--- a/tvix/castore/src/blobservice/mod.rs
+++ b/tvix/castore/src/blobservice/mod.rs
@@ -7,6 +7,7 @@ mod from_addr;
 mod grpc;
 mod memory;
 mod naive_seeker;
+mod simplefs;
 mod sled;
 
 #[cfg(test)]
@@ -15,6 +16,7 @@ mod tests;
 pub use self::from_addr::from_addr;
 pub use self::grpc::GRPCBlobService;
 pub use self::memory::MemoryBlobService;
+pub use self::simplefs::SimpleFilesystemBlobService;
 pub use self::sled::SledBlobService;
 
 /// The base trait all BlobService services need to implement.
@@ -51,3 +53,4 @@ pub trait BlobReader: tokio::io::AsyncRead + tokio::io::AsyncSeek + Send + Unpin
 
 /// A [`io::Cursor<Vec<u8>>`] can be used as a BlobReader.
 impl BlobReader for io::Cursor<Vec<u8>> {}
+impl BlobReader for tokio::fs::File {}
diff --git a/tvix/castore/src/blobservice/simplefs.rs b/tvix/castore/src/blobservice/simplefs.rs
new file mode 100644
index 000000000000..b21db2808c23
--- /dev/null
+++ b/tvix/castore/src/blobservice/simplefs.rs
@@ -0,0 +1,195 @@
+use std::{
+    io,
+    path::{Path, PathBuf},
+    pin::pin,
+    task::Poll,
+};
+
+use bytes::Buf;
+use data_encoding::HEXLOWER;
+use pin_project_lite::pin_project;
+use tokio::io::AsyncWriteExt;
+use tonic::async_trait;
+use tracing::instrument;
+
+use crate::B3Digest;
+
+use super::{BlobReader, BlobService, BlobWriter};
+
+/// Connects to a tvix-store BlobService on an existing path backed by a POSIX-compliant
+/// filesystem.
+///
+/// It takes an existing path, builds a `tmp` directory and a `blobs` directory inside of it. All
+/// blobs received are staged in that `tmp` directory, then they are moved **atomically** into
+/// `blobs/B3DIGEST[:2]/B3DIGEST[2:]` in a sharding style, e.g. `abcdef` gets turned into `ab/cdef`
+///
+/// **Disclaimer** : This very simple implementation is subject to change and does not give any
+/// final guarantees on the on-disk format.
+#[derive(Clone)]
+pub struct SimpleFilesystemBlobService {
+    /// Where the blobs are located on a filesystem already mounted.
+    path: PathBuf,
+}
+
+impl SimpleFilesystemBlobService {
+    pub async fn new(path: PathBuf) -> std::io::Result<Self> {
+        tokio::fs::create_dir_all(&path).await?;
+        tokio::fs::create_dir_all(path.join("tmp")).await?;
+        tokio::fs::create_dir_all(path.join("blobs")).await?;
+
+        Ok(Self { path })
+    }
+}
+
+fn derive_path(root: &Path, digest: &B3Digest) -> PathBuf {
+    let prefix = HEXLOWER.encode(&digest.as_slice()[..2]);
+    let pathname = HEXLOWER.encode(digest.as_slice());
+
+    root.join("blobs").join(prefix).join(pathname)
+}
+
+#[async_trait]
+impl BlobService for SimpleFilesystemBlobService {
+    #[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
+        Ok(tokio::fs::try_exists(derive_path(&self.path, digest)).await?)
+    }
+
+    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
+        let dst_path = derive_path(&self.path, digest);
+        let reader = match tokio::fs::File::open(dst_path).await {
+            Ok(file) => {
+                let reader: Box<dyn BlobReader> = Box::new(file);
+                Ok(Some(reader))
+            }
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
+            Err(e) => Err(e),
+        };
+
+        Ok(reader?)
+    }
+
+    #[instrument(skip_all)]
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        let file = match async_tempfile::TempFile::new_in(self.path.join("tmp")).await {
+            Ok(file) => Ok(file),
+            Err(e) => match e {
+                async_tempfile::Error::Io(io_error) => Err(io_error),
+                async_tempfile::Error::InvalidFile => Err(std::io::Error::new(
+                    std::io::ErrorKind::NotFound,
+                    "invalid or missing file specified",
+                )),
+                async_tempfile::Error::InvalidDirectory => Err(std::io::Error::new(
+                    std::io::ErrorKind::NotFound,
+                    "invalid or missing directory specified",
+                )),
+            },
+        };
+
+        Box::new(SimpleFilesystemBlobWriter {
+            root: self.path.clone(),
+            file,
+            digester: blake3::Hasher::new(),
+        })
+    }
+}
+
+pin_project! {
+    struct SimpleFilesystemBlobWriter {
+        root: PathBuf,
+        file: std::io::Result<async_tempfile::TempFile>,
+        digester: blake3::Hasher
+    }
+}
+
+impl tokio::io::AsyncWrite for SimpleFilesystemBlobWriter {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> std::task::Poll<Result<usize, std::io::Error>> {
+        if let Err(e) = self.file.as_mut() {
+            return Poll::Ready(Err(std::mem::replace(
+                e,
+                std::io::Error::new(
+                    std::io::ErrorKind::NotConnected,
+                    "this file is already closed",
+                ),
+            )));
+        }
+
+        let writer = self.file.as_mut().unwrap();
+        match pin!(writer).poll_write(cx, buf) {
+            Poll::Ready(Ok(n)) => {
+                let this = self.project();
+                this.digester.update(buf.take(n).into_inner());
+                Poll::Ready(Ok(n))
+            }
+            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+
+    fn poll_flush(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), std::io::Error>> {
+        if let Err(e) = self.file.as_mut() {
+            return Poll::Ready(Err(std::mem::replace(
+                e,
+                std::io::Error::new(
+                    std::io::ErrorKind::NotConnected,
+                    "this file is already closed",
+                ),
+            )));
+        }
+
+        let writer = self.file.as_mut().unwrap();
+        pin!(writer).poll_flush(cx)
+    }
+
+    fn poll_shutdown(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), std::io::Error>> {
+        if let Err(e) = self.file.as_mut() {
+            return Poll::Ready(Err(std::mem::replace(
+                e,
+                std::io::Error::new(
+                    std::io::ErrorKind::NotConnected,
+                    "this file is already closed",
+                ),
+            )));
+        }
+
+        let writer = self.file.as_mut().unwrap();
+        pin!(writer).poll_shutdown(cx)
+    }
+}
+
+#[async_trait]
+impl BlobWriter for SimpleFilesystemBlobWriter {
+    async fn close(&mut self) -> io::Result<B3Digest> {
+        if let Err(e) = self.file.as_mut() {
+            return Err(std::mem::replace(
+                e,
+                std::io::Error::new(
+                    std::io::ErrorKind::NotConnected,
+                    "this file is already closed",
+                ),
+            ));
+        }
+
+        let writer = self.file.as_mut().unwrap();
+        writer.sync_all().await?;
+        writer.flush().await?;
+
+        let digest: B3Digest = self.digester.finalize().as_bytes().into();
+        let dst_path = derive_path(&self.root, &digest);
+        tokio::fs::create_dir_all(dst_path.parent().unwrap()).await?;
+        tokio::fs::rename(writer.file_path(), dst_path).await?;
+
+        Ok(digest)
+    }
+}