about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice/simplefs.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/blobservice/simplefs.rs')
-rw-r--r--tvix/castore/src/blobservice/simplefs.rs195
1 files changed, 195 insertions, 0 deletions
diff --git a/tvix/castore/src/blobservice/simplefs.rs b/tvix/castore/src/blobservice/simplefs.rs
new file mode 100644
index 000000000000..b21db2808c23
--- /dev/null
+++ b/tvix/castore/src/blobservice/simplefs.rs
@@ -0,0 +1,195 @@
+use std::{
+    io,
+    path::{Path, PathBuf},
+    pin::pin,
+    task::Poll,
+};
+
+use bytes::Buf;
+use data_encoding::HEXLOWER;
+use pin_project_lite::pin_project;
+use tokio::io::AsyncWriteExt;
+use tonic::async_trait;
+use tracing::instrument;
+
+use crate::B3Digest;
+
+use super::{BlobReader, BlobService, BlobWriter};
+
+/// Connects to a tvix-store BlobService on an existing path backed by a POSIX-compliant
+/// filesystem.
+///
+/// It takes an existing path, builds a `tmp` directory and a `blobs` directory inside of it. All
+/// blobs received are staged in that `tmp` directory, then they are moved **atomically** into
+/// `blobs/B3DIGEST[:2]/B3DIGEST[2:]` in a sharding style, e.g. `abcdef` gets turned into `ab/cdef`
+///
+/// **Disclaimer** : This very simple implementation is subject to change and does not give any
+/// final guarantees on the on-disk format.
+#[derive(Clone)]
+pub struct SimpleFilesystemBlobService {
+    /// Where the blobs are located on a filesystem already mounted.
+    path: PathBuf,
+}
+
+impl SimpleFilesystemBlobService {
+    pub async fn new(path: PathBuf) -> std::io::Result<Self> {
+        tokio::fs::create_dir_all(&path).await?;
+        tokio::fs::create_dir_all(path.join("tmp")).await?;
+        tokio::fs::create_dir_all(path.join("blobs")).await?;
+
+        Ok(Self { path })
+    }
+}
+
+fn derive_path(root: &Path, digest: &B3Digest) -> PathBuf {
+    let prefix = HEXLOWER.encode(&digest.as_slice()[..2]);
+    let pathname = HEXLOWER.encode(digest.as_slice());
+
+    root.join("blobs").join(prefix).join(pathname)
+}
+
+#[async_trait]
+impl BlobService for SimpleFilesystemBlobService {
+    #[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
+        Ok(tokio::fs::try_exists(derive_path(&self.path, digest)).await?)
+    }
+
+    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
+        let dst_path = derive_path(&self.path, digest);
+        let reader = match tokio::fs::File::open(dst_path).await {
+            Ok(file) => {
+                let reader: Box<dyn BlobReader> = Box::new(file);
+                Ok(Some(reader))
+            }
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
+            Err(e) => Err(e),
+        };
+
+        Ok(reader?)
+    }
+
+    #[instrument(skip_all)]
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        let file = match async_tempfile::TempFile::new_in(self.path.join("tmp")).await {
+            Ok(file) => Ok(file),
+            Err(e) => match e {
+                async_tempfile::Error::Io(io_error) => Err(io_error),
+                async_tempfile::Error::InvalidFile => Err(std::io::Error::new(
+                    std::io::ErrorKind::NotFound,
+                    "invalid or missing file specified",
+                )),
+                async_tempfile::Error::InvalidDirectory => Err(std::io::Error::new(
+                    std::io::ErrorKind::NotFound,
+                    "invalid or missing directory specified",
+                )),
+            },
+        };
+
+        Box::new(SimpleFilesystemBlobWriter {
+            root: self.path.clone(),
+            file,
+            digester: blake3::Hasher::new(),
+        })
+    }
+}
+
+pin_project! {
+    struct SimpleFilesystemBlobWriter {
+        root: PathBuf,
+        file: std::io::Result<async_tempfile::TempFile>,
+        digester: blake3::Hasher
+    }
+}
+
+impl tokio::io::AsyncWrite for SimpleFilesystemBlobWriter {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> std::task::Poll<Result<usize, std::io::Error>> {
+        if let Err(e) = self.file.as_mut() {
+            return Poll::Ready(Err(std::mem::replace(
+                e,
+                std::io::Error::new(
+                    std::io::ErrorKind::NotConnected,
+                    "this file is already closed",
+                ),
+            )));
+        }
+
+        let writer = self.file.as_mut().unwrap();
+        match pin!(writer).poll_write(cx, buf) {
+            Poll::Ready(Ok(n)) => {
+                let this = self.project();
+                this.digester.update(buf.take(n).into_inner());
+                Poll::Ready(Ok(n))
+            }
+            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+
+    fn poll_flush(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), std::io::Error>> {
+        if let Err(e) = self.file.as_mut() {
+            return Poll::Ready(Err(std::mem::replace(
+                e,
+                std::io::Error::new(
+                    std::io::ErrorKind::NotConnected,
+                    "this file is already closed",
+                ),
+            )));
+        }
+
+        let writer = self.file.as_mut().unwrap();
+        pin!(writer).poll_flush(cx)
+    }
+
+    fn poll_shutdown(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), std::io::Error>> {
+        if let Err(e) = self.file.as_mut() {
+            return Poll::Ready(Err(std::mem::replace(
+                e,
+                std::io::Error::new(
+                    std::io::ErrorKind::NotConnected,
+                    "this file is already closed",
+                ),
+            )));
+        }
+
+        let writer = self.file.as_mut().unwrap();
+        pin!(writer).poll_shutdown(cx)
+    }
+}
+
+#[async_trait]
+impl BlobWriter for SimpleFilesystemBlobWriter {
+    async fn close(&mut self) -> io::Result<B3Digest> {
+        if let Err(e) = self.file.as_mut() {
+            return Err(std::mem::replace(
+                e,
+                std::io::Error::new(
+                    std::io::ErrorKind::NotConnected,
+                    "this file is already closed",
+                ),
+            ));
+        }
+
+        let writer = self.file.as_mut().unwrap();
+        writer.sync_all().await?;
+        writer.flush().await?;
+
+        let digest: B3Digest = self.digester.finalize().as_bytes().into();
+        let dst_path = derive_path(&self.root, &digest);
+        tokio::fs::create_dir_all(dst_path.parent().unwrap()).await?;
+        tokio::fs::rename(writer.file_path(), dst_path).await?;
+
+        Ok(digest)
+    }
+}