From 52a61e353bc68690e1a605140985ccae7d80a27b Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Sun, 31 Dec 2023 18:54:52 +0200 Subject: feat(tvix/store/bin): factor out import While at it, make it a bit more generic. Change-Id: Ic4caefda93aca3ffb656a09f8b4d648b41415532 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10511 Autosubmit: flokli Reviewed-by: raitobezarius Tested-by: BuildkiteCI --- tvix/store/src/bin/tvix-store.rs | 127 +++++---------------------------------- tvix/store/src/utils.rs | 118 +++++++++++++++++++++++++++++++++++- 2 files changed, 132 insertions(+), 113 deletions(-) (limited to 'tvix/store/src') diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index ef66d6b7ba..9136c34ff8 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -1,27 +1,22 @@ use clap::Subcommand; -use data_encoding::BASE64; + use futures::future::try_join_all; -use nix_compat::store_path; -use std::path::Path; + use std::path::PathBuf; use std::sync::Arc; -use tokio::task::JoinHandle; use tokio_listener::Listener; use tokio_listener::SystemOptions; use tokio_listener::UserOptions; + use tracing_subscriber::prelude::*; -use tvix_castore::import; + use tvix_castore::proto::blob_service_server::BlobServiceServer; use tvix_castore::proto::directory_service_server::DirectoryServiceServer; -use tvix_castore::proto::node::Node; use tvix_castore::proto::GRPCBlobServiceWrapper; use tvix_castore::proto::GRPCDirectoryServiceWrapper; use tvix_store::pathinfoservice::PathInfoService; -use tvix_store::proto::nar_info; use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCPathInfoServiceWrapper; -use tvix_store::proto::NarInfo; -use tvix_store::proto::PathInfo; #[cfg(any(feature = "fuse", feature = "virtiofs"))] use tvix_store::pathinfoservice::make_fs; @@ -253,86 +248,25 @@ async fn main() -> Result<(), Box> { let tasks = paths .into_iter() .map(|path| { - let task: JoinHandle> = tokio::task::spawn({ + tokio::task::spawn({ let blob_service = blob_service.clone(); let directory_service = directory_service.clone(); let path_info_service = path_info_service.clone(); async move { - // calculate the name - let name = path - .file_name() - .and_then(|file_name| file_name.to_str()) - .ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "path must not be .. and the basename valid unicode", - ) - })?; - - // Ingest the path into blob and directory service. - let root_node = import::ingest_path( - blob_service.clone(), - directory_service.clone(), - &path, + let resp = tvix_store::utils::import_path( + path, + blob_service, + directory_service, + path_info_service, ) - .await - .expect("failed to ingest path"); - - // Ask the PathInfoService for the NAR size and sha256 - let (nar_size, nar_sha256) = - path_info_service.calculate_nar(&root_node).await?; - - // Calculate the output path. This might still fail, as some names are illegal. - let output_path = - store_path::build_nar_based_store_path(&nar_sha256, name).map_err( - |_| { - std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("invalid name: {}", name), - ) - }, - )?; - - // assemble a new root_node with a name that is derived from the nar hash. - let root_node = - root_node.rename(output_path.to_string().into_bytes().into()); - - // assemble the [crate::proto::PathInfo] object. - let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { - node: Some(root_node), - }), - // There's no reference scanning on path contents ingested like this. - references: vec![], - narinfo: Some(NarInfo { - nar_size, - nar_sha256: nar_sha256.to_vec().into(), - signatures: vec![], - reference_names: vec![], - deriver: None, - ca: Some(nar_info::Ca { - r#type: tvix_store::proto::nar_info::ca::Hash::NarSha256 - .into(), - digest: nar_sha256.to_vec().into(), - }), - }), - }; - - // put into [PathInfoService], and return the PathInfo that we get back - // from there (it might contain additional signatures). - let path_info = path_info_service.put(path_info).await?; - - let node = path_info.node.unwrap().node.unwrap(); - - log_node(&node, &path); - - println!("{}", output_path.to_absolute_path()); - - Ok(()) + .await; + if let Ok(output_path) = resp { + // If the import was successful, print the path to stdout. + println!("{}", output_path.to_absolute_path()); + } } - }); - task + }) }) .collect::>(); @@ -411,32 +345,3 @@ async fn main() -> Result<(), Box> { }; Ok(()) } - -fn log_node(node: &Node, path: &Path) { - match node { - Node::Directory(directory_node) => { - info!( - path = ?path, - name = ?directory_node.name, - digest = BASE64.encode(&directory_node.digest), - "import successful", - ) - } - Node::File(file_node) => { - info!( - path = ?path, - name = ?file_node.name, - digest = BASE64.encode(&file_node.digest), - "import successful" - ) - } - Node::Symlink(symlink_node) => { - info!( - path = ?path, - name = ?symlink_node.name, - target = ?symlink_node.target, - "import successful" - ) - } - } -} diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs index 041a9e683d..05d4f79b12 100644 --- a/tvix/store/src/utils.rs +++ b/tvix/store/src/utils.rs @@ -1,11 +1,18 @@ -use std::sync::Arc; +use std::{ops::Deref, path::Path, sync::Arc}; +use data_encoding::BASE64; +use nix_compat::store_path::{self, StorePath}; +use tracing::{debug, instrument}; use tvix_castore::{ blobservice::{self, BlobService}, directoryservice::{self, DirectoryService}, + proto::node::Node, }; -use crate::pathinfoservice::{self, PathInfoService}; +use crate::{ + pathinfoservice::{self, PathInfoService}, + proto::{nar_info, NarInfo, PathInfo}, +}; /// Construct the three store handles from their addrs. pub async fn construct_services( @@ -33,3 +40,110 @@ pub async fn construct_services( Ok((blob_service, directory_service, path_info_service)) } + +/// Imports a given path on the filesystem into the store, and returns the +/// [PathInfo] describing the path, that was sent to +/// [PathInfoService]. +#[instrument(skip_all, fields(path=?path), err)] +pub async fn import_path( + path: P, + blob_service: BS, + directory_service: DS, + path_info_service: PS, +) -> Result +where + P: AsRef + std::fmt::Debug, + BS: Deref + Clone, + DS: Deref + Clone, + PS: Deref, +{ + // calculate the name + // TODO: make a path_to_name helper function? + let name = path + .as_ref() + .file_name() + .and_then(|file_name| file_name.to_str()) + .ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "path must not be .. and the basename valid unicode", + ) + })?; + + // Ingest the path into blob and directory service. + let root_node = tvix_castore::import::ingest_path(blob_service, directory_service, &path) + .await + .expect("failed to ingest path"); + + debug!(root_node =?root_node, "import successful"); + + // Ask the PathInfoService for the NAR size and sha256 + let (nar_size, nar_sha256) = path_info_service.calculate_nar(&root_node).await?; + + // Calculate the output path. This might still fail, as some names are illegal. + let output_path = store_path::build_nar_based_store_path(&nar_sha256, name).map_err(|_| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("invalid name: {}", name), + ) + })?; + + // assemble a new root_node with a name that is derived from the nar hash. + let root_node = root_node.rename(output_path.to_string().into_bytes().into()); + log_node(&root_node, path.as_ref()); + + // assemble the [crate::proto::PathInfo] object. + let path_info = PathInfo { + node: Some(tvix_castore::proto::Node { + node: Some(root_node), + }), + // There's no reference scanning on path contents ingested like this. + references: vec![], + narinfo: Some(NarInfo { + nar_size, + nar_sha256: nar_sha256.to_vec().into(), + signatures: vec![], + reference_names: vec![], + deriver: None, + ca: Some(nar_info::Ca { + r#type: nar_info::ca::Hash::NarSha256.into(), + digest: nar_sha256.to_vec().into(), + }), + }), + }; + + // put into [PathInfoService], and return the PathInfo that we get back + // from there (it might contain additional signatures). + let _path_info = path_info_service.put(path_info).await?; + + Ok(output_path.to_owned()) +} + +fn log_node(node: &Node, path: &Path) { + match node { + Node::Directory(directory_node) => { + debug!( + path = ?path, + name = ?directory_node.name, + digest = BASE64.encode(&directory_node.digest), + "import successful", + ) + } + Node::File(file_node) => { + debug!( + path = ?path, + name = ?file_node.name, + digest = BASE64.encode(&file_node.digest), + "import successful" + ) + } + Node::Symlink(symlink_node) => { + debug!( + path = ?path, + name = ?symlink_node.name, + target = ?symlink_node.target, + "import successful" + ) + } + } +} -- cgit 1.4.1