about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-03-19T14·26+0200
committerclbot <clbot@tvl.fyi>2024-03-20T12·17+0000
commit345cebaebb1bf37b0bf251032428f5ed85dd26e3 (patch)
tree50595679e131f884318a225013a72df2d3353471 /tvix/castore/src/blobservice
parent2798803f76687bdeceb10cdaf31ef3013d30acbc (diff)
refactor(tvix/castore/blob): drop simplefs r/7750
This functionality is provided by the object store backend too
(using `objectstore+file://$some_path`).

This backend also supports content-defined chunking and compresses
chunks with zstd.

Change-Id: I5968c713112c400d23897c59db06b6c713c9d8cb
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11205
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r--tvix/castore/src/blobservice/from_addr.rs11
-rw-r--r--tvix/castore/src/blobservice/mod.rs2
-rw-r--r--tvix/castore/src/blobservice/simplefs.rs196
3 files changed, 1 insertions, 208 deletions
diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs
index 1f37ef61edf6..1473ee7e9cdc 100644
--- a/tvix/castore/src/blobservice/from_addr.rs
+++ b/tvix/castore/src/blobservice/from_addr.rs
@@ -3,8 +3,7 @@ use url::Url;
 use crate::{proto::blob_service_client::BlobServiceClient, Error};
 
 use super::{
-    BlobService, GRPCBlobService, MemoryBlobService, ObjectStoreBlobService,
-    SimpleFilesystemBlobService, SledBlobService,
+    BlobService, GRPCBlobService, MemoryBlobService, ObjectStoreBlobService, SledBlobService,
 };
 
 /// Constructs a new instance of a [BlobService] from an URI.
@@ -13,7 +12,6 @@ use super::{
 /// - `memory://` ([MemoryBlobService])
 /// - `sled://` ([SledBlobService])
 /// - `grpc+*://` ([GRPCBlobService])
-/// - `simplefs://` ([SimpleFilesystemBlobService])
 ///
 /// See their `from_url` methods for more details about their syntax.
 pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error> {
@@ -58,13 +56,6 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error>
             let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?);
             Box::new(GRPCBlobService::from_client(client))
         }
-        "simplefs" => {
-            if url.path().is_empty() {
-                return Err(Error::StorageError("Invalid filesystem path".to_string()));
-            }
-
-            Box::new(SimpleFilesystemBlobService::new(url.path().into()).await?)
-        }
         scheme if scheme.starts_with("objectstore+") => {
             // We need to convert the URL to string, strip the prefix there, and then
             // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do.
diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs
index 7324fcf5168a..5eb88a479427 100644
--- a/tvix/castore/src/blobservice/mod.rs
+++ b/tvix/castore/src/blobservice/mod.rs
@@ -11,7 +11,6 @@ mod grpc;
 mod memory;
 mod naive_seeker;
 mod object_store;
-mod simplefs;
 mod sled;
 
 #[cfg(test)]
@@ -23,7 +22,6 @@ pub use self::from_addr::from_addr;
 pub use self::grpc::GRPCBlobService;
 pub use self::memory::MemoryBlobService;
 pub use self::object_store::ObjectStoreBlobService;
-pub use self::simplefs::SimpleFilesystemBlobService;
 pub use self::sled::SledBlobService;
 
 /// The base trait all BlobService services need to implement.
diff --git a/tvix/castore/src/blobservice/simplefs.rs b/tvix/castore/src/blobservice/simplefs.rs
deleted file mode 100644
index 2dcb24f34221..000000000000
--- a/tvix/castore/src/blobservice/simplefs.rs
+++ /dev/null
@@ -1,196 +0,0 @@
-use std::{
-    io,
-    path::{Path, PathBuf},
-    pin::pin,
-    task::Poll,
-};
-
-use bytes::Buf;
-use data_encoding::HEXLOWER;
-use pin_project_lite::pin_project;
-use tokio::io::AsyncWriteExt;
-use tonic::async_trait;
-use tracing::instrument;
-
-use crate::B3Digest;
-
-use super::{BlobReader, BlobService, BlobWriter};
-
-/// Connects to a tvix-store BlobService on an existing path backed by a POSIX-compliant
-/// filesystem.
-///
-/// It takes an existing path, builds a `tmp` directory and a `blobs` directory inside of it. All
-/// blobs received are staged in that `tmp` directory, then they are moved **atomically** into
-/// `blobs/B3DIGEST[:2]/B3DIGEST[2:]` in a sharding style, e.g. `abcdef` gets turned into `ab/cdef`
-///
-/// **Disclaimer** : This very simple implementation is subject to change and does not give any
-/// final guarantees on the on-disk format.
-/// TODO: migrate to object_store?
-#[derive(Clone)]
-pub struct SimpleFilesystemBlobService {
-    /// Where the blobs are located on a filesystem already mounted.
-    path: PathBuf,
-}
-
-impl SimpleFilesystemBlobService {
-    pub async fn new(path: PathBuf) -> std::io::Result<Self> {
-        tokio::fs::create_dir_all(&path).await?;
-        tokio::fs::create_dir_all(path.join("tmp")).await?;
-        tokio::fs::create_dir_all(path.join("blobs")).await?;
-
-        Ok(Self { path })
-    }
-}
-
-fn derive_path(root: &Path, digest: &B3Digest) -> PathBuf {
-    let prefix = HEXLOWER.encode(&digest.as_slice()[..2]);
-    let pathname = HEXLOWER.encode(digest.as_slice());
-
-    root.join("blobs").join(prefix).join(pathname)
-}
-
-#[async_trait]
-impl BlobService for SimpleFilesystemBlobService {
-    #[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
-    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
-        Ok(tokio::fs::try_exists(derive_path(&self.path, digest)).await?)
-    }
-
-    #[instrument(skip_all, err, fields(blob.digest=%digest))]
-    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
-        let dst_path = derive_path(&self.path, digest);
-        let reader = match tokio::fs::File::open(dst_path).await {
-            Ok(file) => {
-                let reader: Box<dyn BlobReader> = Box::new(file);
-                Ok(Some(reader))
-            }
-            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
-            Err(e) => Err(e),
-        };
-
-        Ok(reader?)
-    }
-
-    #[instrument(skip_all)]
-    async fn open_write(&self) -> Box<dyn BlobWriter> {
-        let file = match async_tempfile::TempFile::new_in(self.path.join("tmp")).await {
-            Ok(file) => Ok(file),
-            Err(e) => match e {
-                async_tempfile::Error::Io(io_error) => Err(io_error),
-                async_tempfile::Error::InvalidFile => Err(std::io::Error::new(
-                    std::io::ErrorKind::NotFound,
-                    "invalid or missing file specified",
-                )),
-                async_tempfile::Error::InvalidDirectory => Err(std::io::Error::new(
-                    std::io::ErrorKind::NotFound,
-                    "invalid or missing directory specified",
-                )),
-            },
-        };
-
-        Box::new(SimpleFilesystemBlobWriter {
-            root: self.path.clone(),
-            file,
-            digester: blake3::Hasher::new(),
-        })
-    }
-}
-
-pin_project! {
-    struct SimpleFilesystemBlobWriter {
-        root: PathBuf,
-        file: std::io::Result<async_tempfile::TempFile>,
-        digester: blake3::Hasher
-    }
-}
-
-impl tokio::io::AsyncWrite for SimpleFilesystemBlobWriter {
-    fn poll_write(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &[u8],
-    ) -> std::task::Poll<Result<usize, std::io::Error>> {
-        if let Err(e) = self.file.as_mut() {
-            return Poll::Ready(Err(std::mem::replace(
-                e,
-                std::io::Error::new(
-                    std::io::ErrorKind::NotConnected,
-                    "this file is already closed",
-                ),
-            )));
-        }
-
-        let writer = self.file.as_mut().unwrap();
-        match pin!(writer).poll_write(cx, buf) {
-            Poll::Ready(Ok(n)) => {
-                let this = self.project();
-                this.digester.update(buf.take(n).into_inner());
-                Poll::Ready(Ok(n))
-            }
-            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
-            Poll::Pending => Poll::Pending,
-        }
-    }
-
-    fn poll_flush(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Result<(), std::io::Error>> {
-        if let Err(e) = self.file.as_mut() {
-            return Poll::Ready(Err(std::mem::replace(
-                e,
-                std::io::Error::new(
-                    std::io::ErrorKind::NotConnected,
-                    "this file is already closed",
-                ),
-            )));
-        }
-
-        let writer = self.file.as_mut().unwrap();
-        pin!(writer).poll_flush(cx)
-    }
-
-    fn poll_shutdown(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Result<(), std::io::Error>> {
-        if let Err(e) = self.file.as_mut() {
-            return Poll::Ready(Err(std::mem::replace(
-                e,
-                std::io::Error::new(
-                    std::io::ErrorKind::NotConnected,
-                    "this file is already closed",
-                ),
-            )));
-        }
-
-        let writer = self.file.as_mut().unwrap();
-        pin!(writer).poll_shutdown(cx)
-    }
-}
-
-#[async_trait]
-impl BlobWriter for SimpleFilesystemBlobWriter {
-    async fn close(&mut self) -> io::Result<B3Digest> {
-        if let Err(e) = self.file.as_mut() {
-            return Err(std::mem::replace(
-                e,
-                std::io::Error::new(
-                    std::io::ErrorKind::NotConnected,
-                    "this file is already closed",
-                ),
-            ));
-        }
-
-        let writer = self.file.as_mut().unwrap();
-        writer.sync_all().await?;
-        writer.flush().await?;
-
-        let digest: B3Digest = self.digester.finalize().as_bytes().into();
-        let dst_path = derive_path(&self.root, &digest);
-        tokio::fs::create_dir_all(dst_path.parent().unwrap()).await?;
-        tokio::fs::rename(writer.file_path(), dst_path).await?;
-
-        Ok(digest)
-    }
-}