about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-05-11T15·01+0300
committerflokli <flokli@flokli.de>2023-05-24T09·25+0000
commit5774117d5ea4115a33923640a39a553eb57df59a (patch)
tree2e77cdb73837f688a7bc268366d6846ef5a503f9
parent4b00f1d7ac855e90e15633a2e2ba3a90c3921fac (diff)
feat(tvix/store): implement TvixStoreIO r/6191
This providesEvalIO, asking given PathInfoService, DirectoryService
and BlobService.

Change-Id: I32f210f5a7aa8173ad9a7d53e8a5ac03619f527a
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8561
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
-rw-r--r--tvix/Cargo.lock3
-rw-r--r--tvix/Cargo.nix12
-rw-r--r--tvix/store/Cargo.toml3
-rw-r--r--tvix/store/src/bin/tvix-store.rs54
-rw-r--r--tvix/store/src/lib.rs2
-rw-r--r--tvix/store/src/store_io.rs363
6 files changed, 398 insertions, 39 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index 9c2e464a6d..ad5bbeed53 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -2708,8 +2708,10 @@ dependencies = [
  "prost",
  "prost-build",
  "rayon",
+ "serde_json",
  "sha2 0.10.6",
  "sled",
+ "smol_str",
  "tempfile",
  "test-case",
  "thiserror",
@@ -2723,6 +2725,7 @@ dependencies = [
  "tower",
  "tracing",
  "tracing-subscriber",
+ "tvix-eval",
  "walkdir",
 ]
 
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index cf571a36b0..f7cc72d6f8 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -8033,6 +8033,10 @@ rec {
             packageId = "rayon";
           }
           {
+            name = "serde_json";
+            packageId = "serde_json";
+          }
+          {
             name = "sha2";
             packageId = "sha2 0.10.6";
           }
@@ -8042,6 +8046,10 @@ rec {
             features = [ "compression" ];
           }
           {
+            name = "smol_str";
+            packageId = "smol_str";
+          }
+          {
             name = "thiserror";
             packageId = "thiserror";
           }
@@ -8082,6 +8090,10 @@ rec {
             features = [ "json" ];
           }
           {
+            name = "tvix-eval";
+            packageId = "tvix-eval";
+          }
+          {
             name = "walkdir";
             packageId = "walkdir";
           }
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index be282dddb0..a88bdefd7a 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -15,6 +15,7 @@ prost = "0.11.2"
 rayon = "1.6.1"
 sha2 = "0.10.6"
 sled = { version = "0.34.7", features = ["compression"] }
+tvix-eval = { path = "../eval" }
 thiserror = "1.0.38"
 tokio-stream = "0.1.14"
 tokio = { version = "1.28.0", features = ["rt-multi-thread", "net"] }
@@ -26,6 +27,8 @@ tokio-util = { version = "0.7.8", features = ["io", "io-util"] }
 tower = "0.4.13"
 futures = "0.3.28"
 bytes = "1.4.0"
+smol_str = "0.2.0"
+serde_json = "1.0"
 
 [dependencies.tonic-reflection]
 optional = true
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index a6bda8c187..dbd9e2986e 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -1,16 +1,9 @@
 use clap::Subcommand;
 use data_encoding::BASE64;
-use nix_compat::derivation::Derivation;
-use nix_compat::derivation::Output;
-use nix_compat::nixhash::HashAlgo;
-use nix_compat::nixhash::NixHash;
-use nix_compat::nixhash::NixHashWithMode;
 use std::path::PathBuf;
 use tracing_subscriber::prelude::*;
 use tvix_store::blobservice::SledBlobService;
 use tvix_store::directoryservice::SledDirectoryService;
-use tvix_store::import::ingest_path;
-use tvix_store::nar::NARCalculationService;
 use tvix_store::nar::NonCachingNARCalculationService;
 use tvix_store::pathinfoservice::SledPathInfoService;
 use tvix_store::proto::blob_service_server::BlobServiceServer;
@@ -19,6 +12,7 @@ use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
 use tvix_store::proto::GRPCBlobServiceWrapper;
 use tvix_store::proto::GRPCDirectoryServiceWrapper;
 use tvix_store::proto::GRPCPathInfoServiceWrapper;
+use tvix_store::TvixStoreIO;
 
 #[cfg(feature = "reflection")]
 use tvix_store::proto::FILE_DESCRIPTOR_SET;
@@ -85,8 +79,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber");
 
     // initialize stores
-    let mut blob_service = SledBlobService::new("blobs.sled".into())?;
-    let mut directory_service = SledDirectoryService::new("directories.sled".into())?;
+    let blob_service = SledBlobService::new("blobs.sled".into())?;
+    let directory_service = SledDirectoryService::new("directories.sled".into())?;
     let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?;
 
     match cli.command {
@@ -134,37 +128,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 directory_service.clone(),
             );
 
+            let mut io = TvixStoreIO::new(
+                blob_service,
+                directory_service,
+                path_info_service,
+                nar_calculation_service,
+            );
+
             for path in paths {
-                let root_node = ingest_path(&mut blob_service, &mut directory_service, &path)?;
-
-                let nar_hash = NixHashWithMode::Recursive(NixHash::new(
-                    HashAlgo::Sha256,
-                    nar_calculation_service
-                        .calculate_nar(&root_node)?
-                        .1
-                        .to_vec(),
-                ));
-
-                let mut drv = Derivation::default();
-                drv.outputs.insert(
-                    "out".to_string(),
-                    Output {
-                        path: "".to_string(),
-                        hash_with_mode: Some(nar_hash),
-                    },
-                );
-                drv.calculate_output_paths(
-                    path.file_name()
-                        .expect("path must not be ..")
-                        .to_str()
-                        .expect("path must be valid unicode"),
-                    // Note the derivation_or_fod_hash argument is *unused* for FODs, so it doesn't matter what we pass here.
-                    &NixHash::new(HashAlgo::Sha256, vec![]),
-                )?;
-
-                println!("{}", drv.outputs.get("out").unwrap().path);
-
-                match root_node {
+                let path_info = io
+                    .import_path_with_pathinfo(&path)
+                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
+
+                match path_info.node.unwrap().node.unwrap() {
                     tvix_store::proto::node::Node::Directory(directory_node) => {
                         info!(
                             path = ?path,
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index da0b6400a1..7ae8587f8b 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -1,5 +1,6 @@
 mod digests;
 mod errors;
+mod store_io;
 
 pub mod blobservice;
 pub mod directoryservice;
@@ -10,6 +11,7 @@ pub mod proto;
 
 pub use digests::B3Digest;
 pub use errors::Error;
+pub use store_io::TvixStoreIO;
 
 #[cfg(test)]
 mod tests;
diff --git a/tvix/store/src/store_io.rs b/tvix/store/src/store_io.rs
new file mode 100644
index 0000000000..e4c45f9e59
--- /dev/null
+++ b/tvix/store/src/store_io.rs
@@ -0,0 +1,363 @@
+//! This module provides an implementation of EvalIO.
+//!
+//! It can be used by the tvix evalutator to talk to a tvix store.
+
+use data_encoding::BASE64;
+use nix_compat::{
+    nixhash::{HashAlgo, NixHash, NixHashWithMode},
+    store_path::{build_regular_ca_path, StorePath},
+};
+use smol_str::SmolStr;
+use std::{io, path::Path, path::PathBuf};
+use tracing::{error, instrument, warn};
+use tvix_eval::{EvalIO, FileType, StdIO};
+
+use crate::{
+    blobservice::BlobService,
+    directoryservice::{self, DirectoryService},
+    import,
+    nar::NARCalculationService,
+    pathinfoservice::PathInfoService,
+    proto::NamedNode,
+    B3Digest,
+};
+
+/// Implements [EvalIO], asking given [PathInfoService], [DirectoryService]
+/// and [BlobService].
+///
+/// In case the given path does not exist in these stores, we ask StdIO.
+/// This is to both cover cases of syntactically valid store paths, that exist
+/// on the filesystem (still managed by Nix), as well as being able to read
+/// files outside store paths.
+pub struct TvixStoreIO<
+    BS: BlobService,
+    DS: DirectoryService,
+    PS: PathInfoService,
+    NCS: NARCalculationService,
+> {
+    blob_service: BS,
+    directory_service: DS,
+    path_info_service: PS,
+    nar_calculation_service: NCS,
+    std_io: StdIO,
+}
+
+impl<BS: BlobService, DS: DirectoryService, PS: PathInfoService, NCS: NARCalculationService>
+    TvixStoreIO<BS, DS, PS, NCS>
+{
+    pub fn new(
+        blob_service: BS,
+        directory_service: DS,
+        path_info_service: PS,
+        nar_calculation_service: NCS,
+    ) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+            path_info_service,
+            nar_calculation_service,
+            std_io: StdIO {},
+        }
+    }
+
+    /// for a given [StorePath] and additional [Path] inside the store path,
+    /// look up the [PathInfo], and if it exists, traverse the directory structure to
+    /// return the [crate::proto::node::Node] specified by `sub_path`.
+    #[instrument(skip(self), ret, err)]
+    fn store_path_to_root_node(
+        &mut self,
+        store_path: &StorePath,
+        sub_path: &Path,
+    ) -> Result<Option<crate::proto::node::Node>, crate::Error> {
+        let path_info = {
+            match self.path_info_service.get(store_path.digest)? {
+                // If there's no PathInfo found, early exit
+                None => return Ok(None),
+                Some(path_info) => path_info,
+            }
+        };
+
+        let root_node = {
+            match path_info.node {
+                None => {
+                    warn!(
+                        "returned PathInfo {:?} node is None, this shouldn't happen.",
+                        &path_info
+                    );
+                    return Ok(None);
+                }
+                Some(root_node) => match root_node.node {
+                    None => {
+                        warn!("node for {:?} is None, this shouldn't happen.", &root_node);
+                        return Ok(None);
+                    }
+                    Some(root_node) => root_node,
+                },
+            }
+        };
+
+        directoryservice::traverse_to(&mut self.directory_service, root_node, sub_path)
+    }
+
+    /// Imports a given path on the filesystem into the store, and returns the
+    /// [crate::proto::PathInfo] describing the path, that was sent to
+    /// [PathInfoService].
+    /// While not part of the [EvalIO], it's still useful for clients who
+    /// care about the [PathInfo].
+    #[instrument(skip(self), ret, err)]
+    pub fn import_path_with_pathinfo(
+        &mut self,
+        path: &std::path::Path,
+    ) -> Result<crate::proto::PathInfo, io::Error> {
+        // Call [import::ingest_path], which will walk over the given path and return a root_node.
+        let root_node =
+            import::ingest_path(&mut self.blob_service, &mut self.directory_service, path)
+                .expect("error during import_path");
+
+        // Render the NAR
+        let (nar_size, nar_sha256) = self
+            .nar_calculation_service
+            .calculate_nar(&root_node)
+            .expect("error during nar calculation"); // TODO: handle error
+
+        // For given NAR sha256 digest and name, return the new [StorePath] this would have.
+        let nar_hash_with_mode =
+            NixHashWithMode::Recursive(NixHash::new(HashAlgo::Sha256, nar_sha256.to_vec()));
+
+        let name = path
+            .file_name()
+            .expect("path must not be ..")
+            .to_str()
+            .expect("path must be valid unicode");
+
+        let output_path =
+            build_regular_ca_path(name, &nar_hash_with_mode, Vec::<String>::new(), false).unwrap();
+
+        // assemble a new root_node with a name that is derived from the nar hash.
+        let renamed_root_node = {
+            let name = output_path.to_string();
+
+            match root_node {
+                crate::proto::node::Node::Directory(n) => {
+                    crate::proto::node::Node::Directory(crate::proto::DirectoryNode { name, ..n })
+                }
+                crate::proto::node::Node::File(n) => {
+                    crate::proto::node::Node::File(crate::proto::FileNode { name, ..n })
+                }
+                crate::proto::node::Node::Symlink(n) => {
+                    crate::proto::node::Node::Symlink(crate::proto::SymlinkNode { name, ..n })
+                }
+            }
+        };
+
+        // assemble the [crate::proto::PathInfo] object.
+        let path_info = crate::proto::PathInfo {
+            node: Some(crate::proto::Node {
+                node: Some(renamed_root_node),
+            }),
+            // There's no reference scanning on path contents ingested like this.
+            references: vec![],
+            narinfo: Some(crate::proto::NarInfo {
+                nar_size,
+                nar_sha256: nar_sha256.to_vec(),
+                signatures: vec![],
+                reference_names: vec![],
+                // TODO: narinfo for talosctl.src contains `CA: fixed:r:sha256:1x13j5hy75221bf6kz7cpgld9vgic6bqx07w5xjs4pxnksj6lxb6`
+                // do we need this anywhere?
+            }),
+        };
+
+        // put into [PathInfoService], and return the PathInfo that we get back
+        // from there (it might contain additional signatures).
+        let path_info = self.path_info_service.put(path_info)?;
+
+        Ok(path_info)
+    }
+}
+
+/// For given NAR sha256 digest and name, return the new [StorePath] this would have.
+#[instrument(skip(nar_sha256_digest), ret, fields(nar_sha256_digest=BASE64.encode(nar_sha256_digest)))]
+fn calculate_nar_based_store_path(nar_sha256_digest: &[u8; 32], name: &str) -> StorePath {
+    let nar_hash_with_mode =
+        NixHashWithMode::Recursive(NixHash::new(HashAlgo::Sha256, nar_sha256_digest.to_vec()));
+
+    build_regular_ca_path(name, &nar_hash_with_mode, Vec::<String>::new(), false).unwrap()
+}
+
+impl<
+        BS: BlobService + Clone,
+        DS: DirectoryService + Clone,
+        PS: PathInfoService,
+        NCS: NARCalculationService,
+    > EvalIO for TvixStoreIO<BS, DS, PS, NCS>
+{
+    #[instrument(skip(self), ret, err)]
+    fn path_exists(&mut self, path: &Path) -> Result<bool, io::Error> {
+        if let Ok((store_path, sub_path)) =
+            StorePath::from_absolute_path_full(&path.to_string_lossy())
+        {
+            if self
+                .store_path_to_root_node(&store_path, &sub_path)?
+                .is_some()
+            {
+                Ok(true)
+            } else {
+                // As tvix-store doesn't manage /nix/store on the filesystem,
+                // we still need to also ask self.std_io here.
+                self.std_io.path_exists(path)
+            }
+        } else {
+            // The store path is no store path, so do regular StdIO.
+            self.std_io.path_exists(path)
+        }
+    }
+
+    #[instrument(skip(self), ret, err)]
+    fn read_to_string(&mut self, path: &Path) -> Result<String, io::Error> {
+        if let Ok((store_path, sub_path)) =
+            StorePath::from_absolute_path_full(&path.to_string_lossy())
+        {
+            if let Some(node) = self.store_path_to_root_node(&store_path, &sub_path)? {
+                // depending on the node type, treat read_to_string differently
+                match node {
+                    crate::proto::node::Node::Directory(_) => {
+                        // This would normally be a io::ErrorKind::IsADirectory (still unstable)
+                        Err(io::Error::new(
+                            io::ErrorKind::Unsupported,
+                            "tried to read directory at {path} to string",
+                        ))
+                    }
+                    crate::proto::node::Node::File(file_node) => {
+                        let digest =
+                            B3Digest::from_vec(file_node.digest.clone()).map_err(|_e| {
+                                error!(
+                                    file_node = ?file_node,
+                                    "invalid digest"
+                                );
+                                io::Error::new(
+                                    io::ErrorKind::InvalidData,
+                                    format!("invalid digest length in file node: {:?}", file_node),
+                                )
+                            })?;
+
+                        let reader = {
+                            let resp = self.blob_service.open_read(&digest)?;
+                            match resp {
+                                Some(blob_reader) => blob_reader,
+                                None => {
+                                    error!(
+                                        blob.digest = %digest,
+                                        "blob not found",
+                                    );
+                                    Err(io::Error::new(
+                                        io::ErrorKind::NotFound,
+                                        format!("blob {} not found", &digest),
+                                    ))?
+                                }
+                            }
+                        };
+
+                        io::read_to_string(reader)
+                    }
+                    crate::proto::node::Node::Symlink(_symlink_node) => Err(io::Error::new(
+                        io::ErrorKind::Unsupported,
+                        "read_to_string for symlinks is unsupported",
+                    ))?,
+                }
+            } else {
+                // As tvix-store doesn't manage /nix/store on the filesystem,
+                // we still need to also ask self.std_io here.
+                self.std_io.read_to_string(path)
+            }
+        } else {
+            // The store path is no store path, so do regular StdIO.
+            self.std_io.read_to_string(path)
+        }
+    }
+
+    #[instrument(skip(self), ret, err)]
+    fn read_dir(&mut self, path: &Path) -> Result<Vec<(SmolStr, FileType)>, io::Error> {
+        if let Ok((store_path, sub_path)) =
+            StorePath::from_absolute_path_full(&path.to_string_lossy())
+        {
+            if let Some(node) = self.store_path_to_root_node(&store_path, &sub_path)? {
+                match node {
+                    crate::proto::node::Node::Directory(directory_node) => {
+                        // fetch the Directory itself.
+                        let digest =
+                            B3Digest::from_vec(directory_node.digest.clone()).map_err(|_e| {
+                                io::Error::new(
+                                    io::ErrorKind::InvalidData,
+                                    format!(
+                                        "invalid digest length in directory node: {:?}",
+                                        directory_node
+                                    ),
+                                )
+                            })?;
+
+                        if let Some(directory) = self.directory_service.get(&digest)? {
+                            let mut children: Vec<(SmolStr, FileType)> = Vec::new();
+                            for node in directory.nodes() {
+                                children.push(match node {
+                                    crate::proto::node::Node::Directory(e) => {
+                                        (e.name.into(), FileType::Directory)
+                                    }
+                                    crate::proto::node::Node::File(e) => {
+                                        (e.name.into(), FileType::Regular)
+                                    }
+                                    crate::proto::node::Node::Symlink(e) => {
+                                        (e.name.into(), FileType::Symlink)
+                                    }
+                                })
+                            }
+                            Ok(children)
+                        } else {
+                            // If we didn't get the directory node that's linked, that's a store inconsistency!
+                            error!(
+                                directory.digest = %digest,
+                                path = ?path,
+                                "directory not found",
+                            );
+                            Err(io::Error::new(
+                                io::ErrorKind::NotFound,
+                                format!("directory {digest} does not exist"),
+                            ))?
+                        }
+                    }
+                    crate::proto::node::Node::File(_file_node) => {
+                        // This would normally be a io::ErrorKind::NotADirectory (still unstable)
+                        Err(io::Error::new(
+                            io::ErrorKind::Unsupported,
+                            "tried to readdir path {:?}, which is a file",
+                        ))?
+                    }
+                    crate::proto::node::Node::Symlink(_symlink_node) => Err(io::Error::new(
+                        io::ErrorKind::Unsupported,
+                        "read_dir for symlinks is unsupported",
+                    ))?,
+                }
+            } else {
+                self.std_io.read_dir(path)
+            }
+        } else {
+            self.std_io.read_dir(path)
+        }
+    }
+
+    #[instrument(skip(self), ret, err)]
+    fn import_path(&mut self, path: &std::path::Path) -> Result<PathBuf, std::io::Error> {
+        let path_info = self.import_path_with_pathinfo(path)?;
+
+        // from the [PathInfo], extract the store path (as string).
+        let mut path = PathBuf::from(nix_compat::store_path::STORE_DIR_WITH_SLASH);
+        path.push(path_info.node.unwrap().node.unwrap().get_name());
+
+        // and return it
+        Ok(path)
+    }
+
+    #[instrument(skip(self), ret)]
+    fn store_dir(&self) -> Option<String> {
+        Some("/nix/store".to_string())
+    }
+}