//! This module provides an implementation of EvalIO talking to tvix-store.
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use nix_compat::store_path::StorePathRef;
use nix_compat::{nixhash::CAHash, store_path::StorePath};
use std::collections::BTreeMap;
use std::{
cell::RefCell,
io,
path::{Path, PathBuf},
sync::Arc,
};
use tokio_util::io::SyncIoBridge;
use tracing::{error, instrument, warn, Level, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use tvix_build::buildservice::BuildService;
use tvix_eval::{EvalIO, FileType, StdIO};
use tvix_store::nar::NarCalculationService;
use tvix_castore::{
blobservice::BlobService,
directoryservice::{self, DirectoryService},
Node,
};
use tvix_store::pathinfoservice::{PathInfo, PathInfoService};
use crate::fetchers::Fetcher;
use crate::known_paths::KnownPaths;
use crate::tvix_build::derivation_to_build_request;
/// Implements [EvalIO], asking given [PathInfoService], [DirectoryService]
/// and [BlobService].
///
/// In case the given path does not exist in these stores, we ask StdIO.
/// This is to both cover cases of syntactically valid store paths, that exist
/// on the filesystem (still managed by Nix), as well as being able to read
/// files outside store paths.
///
/// This structure is also directly used by the derivation builtins
/// and tightly coupled to it.
///
/// In the future, we may revisit that coupling and figure out how to generalize this interface and
/// hide this implementation detail of the glue itself so that glue can be used with more than one
/// implementation of "Tvix Store IO" which does not necessarily bring the concept of blob service,
/// directory service or path info service.
pub struct TvixStoreIO {
// This is public so helper functions can interact with the stores directly.
pub(crate) blob_service: Arc<dyn BlobService>,
pub(crate) directory_service: Arc<dyn DirectoryService>,
pub(crate) path_info_service: Arc<dyn PathInfoService>,
pub(crate) nar_calculation_service: Arc<dyn NarCalculationService>,
std_io: StdIO,
#[allow(dead_code)]
build_service: Arc<dyn BuildService>,
pub(crate) tokio_handle: tokio::runtime::Handle,
#[allow(clippy::type_complexity)]
pub(crate) fetcher: Fetcher<
Arc<dyn BlobService>,
Arc<dyn DirectoryService>,
Arc<dyn PathInfoService>,
Arc<dyn NarCalculationService>,
>,
// Paths known how to produce, by building or fetching.
pub known_paths: RefCell<KnownPaths>,
}
impl TvixStoreIO {
pub fn new(
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
path_info_service: Arc<dyn PathInfoService>,
nar_calculation_service: Arc<dyn NarCalculationService>,
build_service: Arc<dyn BuildService>,
tokio_handle: tokio::runtime::Handle,
) -> Self {
Self {
blob_service: blob_service.clone(),
directory_service: directory_service.clone(),
path_info_service: path_info_service.clone(),
nar_calculation_service: nar_calculation_service.clone(),
std_io: StdIO {},
build_service,
tokio_handle,
fetcher: Fetcher::new(
blob_service,
directory_service,
path_info_service,
nar_calculation_service,
),
known_paths: Default::default(),
}
}
/// for a given [StorePath] and additional [Path] inside the store path,
/// look up the [PathInfo], and if it exists, and then use
/// [directoryservice::descend_to] to return the
/// [Node] specified by `sub_path`.
///
/// In case there is no PathInfo yet, this means we need to build it
/// (which currently is stubbed out still).
#[instrument(skip(self, store_path), fields(store_path=%store_path, indicatif.pb_show=1), ret(level = Level::TRACE), err)]
async fn store_path_to_node(
&self,
store_path: &StorePath<String>,
sub_path: &Path,
) -> io::Result<Option<Node>> {
// Find the root node for the store_path.
// It asks the PathInfoService first, but in case there was a Derivation
// produced that would build it, fall back to triggering the build.
// To populate the input nodes, it might recursively trigger builds of
// its dependencies too.
let root_node = match self
.path_info_service
.as_ref()
.get(*store_path.digest())
.await?
{
// TODO: use stricter typed BuildRequest here.
Some(path_info) => path_info.node,
// If there's no PathInfo found, this normally means we have to
// trigger the build (and insert into PathInfoService, after
// reference scanning).
// However, as Tvix is (currently) not managing /nix/store itself,
// we return Ok(None) to let std_io take over.
// While reading from store paths that are not known to Tvix during
// that evaluation clearly is an impurity, we still need to support
// it for things like <nixpkgs> pointing to a store path.
// In the future, these things will (need to) have PathInfo.
None => {
// The store path doesn't exist yet, so we need to fetch or build it.
// We check for fetches first, as we might have both native
// fetchers and FODs in KnownPaths, and prefer the former.
// This will also find [Fetch] synthesized from
// `builtin:fetchurl` Derivations.
let maybe_fetch = self
.known_paths
.borrow()
.get_fetch_for_output_path(store_path);
match maybe_fetch {
Some((name, fetch)) => {
let (sp, root_node) = self
.fetcher
.ingest_and_persist(&name, fetch)
.await
.map_err(|e| {
std::io::Error::new(std::io::ErrorKind::InvalidData, e)
})?;
debug_assert_eq!(
sp.to_absolute_path(),
store_path.as_ref().to_absolute_path(),
"store path returned from fetcher must match store path we have in fetchers"
);
root_node
}
None => {
// Look up the derivation for this output path.
let (drv_path, drv) = {
let known_paths = self.known_paths.borrow();
match known_paths.get_drv_path_for_output_path(store_path) {
Some(drv_path) => (
drv_path.to_owned(),
known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(),
),
None => {
warn!(store_path=%store_path, "no drv found");
// let StdIO take over
return Ok(None);
}
}
};
let span = Span::current();
span.pb_start();
span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
span.pb_set_message(&format!("⏳Waiting for inputs {}", &store_path));
// derivation_to_build_request needs castore nodes for all inputs.
// Provide them, which means, here is where we recursively build
// all dependencies.
let mut inputs: BTreeMap<Bytes, Node> =
futures::stream::iter(drv.input_derivations.iter())
.map(|(input_drv_path, output_names)| {
// look up the derivation object
let input_drv = {
let known_paths = self.known_paths.borrow();
known_paths
.get_drv_by_drvpath(input_drv_path)
.unwrap_or_else(|| {
panic!("{} not found", input_drv_path)
})
.to_owned()
};
// convert output names to actual paths
let output_paths: Vec<StorePath<String>> = output_names
.iter()
.map(|output_name| {
input_drv
.outputs
.get(output_name)
.expect("missing output_name")
.path
.as_ref()
.expect("missing output path")
.clone()
})
.collect();
// For each output, ask for the castore node.
// We're in a per-derivation context, so if they're
// not built yet they'll all get built together.
// If they don't need to build, we can however still
// substitute all in parallel (if they don't need to
// be built) - so we turn this into a stream of streams.
// It's up to the builder to deduplicate same build requests.
futures::stream::iter(output_paths.into_iter()).map(
|output_path| async move {
let node = self
.store_path_to_node(&output_path, Path::new(""))
.await?;
if let Some(node) = node {
Ok((output_path.to_string().into(), node))
} else {
Err(io::Error::other("no node produced"))
}
},
)
})
.flatten()
.buffer_unordered(
1, /* TODO: increase again once we prevent redundant fetches */
) // TODO: make configurable
.try_collect()
.await?;
// FUTUREWORK: merge these who things together
// add input sources
let input_sources: BTreeMap<_, _> =
futures::stream::iter(drv.input_sources.iter())
.then(|input_source| {
Box::pin({
let input_source = input_source.clone();
async move {
let node = self
.store_path_to_node(&input_source, Path::new(""))
.await?;
if let Some(node) = node {
Ok((input_source.to_string().into(), node))
} else {
Err(io::Error::other("no node produced"))
}
}
})
})
.try_collect()
.await?;
inputs.extend(input_sources);
span.pb_set_message(&format!("🔨Building {}", &store_path));
// TODO: check if input sources are sufficiently dealth with,
// I think yes, they must be imported into the store by other
// operations, so dealt with in the Some(…) match arm
// synthesize the build request.
let build_request = derivation_to_build_request(&drv, inputs)?;
// create a build
let build_result = self
.build_service
.as_ref()
.do_build(build_request)
.await
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
// Maps from the index in refscan_needles to the full store path
// Used to map back to the actual store path from the found needles
// Importantly, this must match the order of the needles generated in derivation_to_build_request
let refscan_needles =
crate::tvix_build::get_refscan_needles(&drv).collect::<Vec<_>>();
// For each output, insert a PathInfo.
for ((output, output_needles), drv_output) in build_result
.outputs
.iter()
.zip(build_result.outputs_needles.iter())
.zip(drv.outputs.iter())
{
let (_, output_node) = output
.clone()
.into_name_bytes_and_node()
.expect("invalid node");
let output_needles: Vec<_> = output_needles
.needles
.iter()
// Map each output needle index back to the refscan_needle
.map(|idx| {
refscan_needles
.get(*idx as usize)
.ok_or(std::io::Error::new(
std::io::ErrorKind::Other,
"invalid build response",
))
})
.collect::<Result<_, std::io::Error>>()?;
// calculate the nar representation
let (nar_size, nar_sha256) = self
.nar_calculation_service
.calculate_nar(&output_node)
.await?;
// assemble the PathInfo to persist
let path_info = PathInfo {
store_path: drv_output
.1
.path
.as_ref()
.ok_or(std::io::Error::new(
std::io::ErrorKind::Other,
"Tvix bug: missing output store path",
))?
.to_owned(),
node: output_node,
references: output_needles
.iter()
.map(|s| (**s).to_owned())
.collect(),
nar_size,
nar_sha256,
signatures: vec![],
deriver: Some(
StorePath::from_name_and_digest_fixed(
drv_path
.name()
.strip_suffix(".drv")
.expect("missing .drv suffix"),
*drv_path.digest(),
)
.expect(
"Tvix bug: StorePath without .drv suffix must be valid",
),
),
ca: drv.fod_digest().map(|fod_digest| {
CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(fod_digest))
}),
};
self.path_info_service
.put(path_info)
.await
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
}
// find the output for the store path requested
let s = store_path.to_string();
build_result
.outputs
.into_iter()
.map(|e| e.into_name_and_node().expect("invalid node"))
.find(|(output_name, _output_node)| {
output_name.as_ref() == s.as_bytes()
})
.expect("build didn't produce the store path")
.1
}
}
}
};
// now with the root_node and sub_path, descend to the node requested.
// We convert sub_path to the castore model here.
let sub_path = tvix_castore::PathBuf::from_host_path(sub_path, true)?;
directoryservice::descend_to(&self.directory_service, root_node, sub_path)
.await
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))
}
async fn node_to_path_info<'a>(
&self,
name: &'a str,
path: &Path,
ca: CAHash,
root_node: Node,
) -> io::Result<PathInfo> {
// Ask the PathInfoService for the NAR size and sha256
// We always need it no matter what is the actual hash mode
// because the [PathInfo] needs to contain nar_{sha256,size}.
let (nar_size, nar_sha256) = self
.nar_calculation_service
.as_ref()
.calculate_nar(&root_node)
.await?;
// Calculate the output path. This might still fail, as some names are illegal.
let output_path =
nix_compat::store_path::build_ca_path(name, &ca, Vec::<&str>::new(), false).map_err(
|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid name: {}", name),
)
},
)?;
tvix_store::import::log_node(name.as_bytes(), &root_node, path);
// construct a PathInfo
Ok(tvix_store::import::derive_nar_ca_path_info(
nar_size,
nar_sha256,
Some(ca),
output_path,
root_node,
))
}
pub(crate) async fn register_in_path_info_service<'a>(
&self,
name: &'a str,
path: &Path,
ca: CAHash,
root_node: Node,
) -> io::Result<PathInfo> {
let path_info = self.node_to_path_info(name, path, ca, root_node).await?;
Ok(self.path_info_service.as_ref().put(path_info).await?)
}
pub async fn store_path_exists<'a>(&'a self, store_path: StorePathRef<'a>) -> io::Result<bool> {
Ok(self
.path_info_service
.as_ref()
.get(*store_path.digest())
.await?
.is_some())
}
}
impl EvalIO for TvixStoreIO {
#[instrument(skip(self), ret(level = Level::TRACE), err)]
fn path_exists(&self, path: &Path) -> io::Result<bool> {
if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) {
if self
.tokio_handle
.block_on(self.store_path_to_node(&store_path, sub_path))?
.is_some()
{
Ok(true)
} else {
// As tvix-store doesn't manage /nix/store on the filesystem,
// we still need to also ask self.std_io here.
self.std_io.path_exists(path)
}
} else {
// The store path is no store path, so do regular StdIO.
self.std_io.path_exists(path)
}
}
#[instrument(skip(self), err)]
fn open(&self, path: &Path) -> io::Result<Box<dyn io::Read>> {
if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) {
if let Some(node) = self
.tokio_handle
.block_on(async { self.store_path_to_node(&store_path, sub_path).await })?
{
// depending on the node type, treat open differently
match node {
Node::Directory { .. } => {
// This would normally be a io::ErrorKind::IsADirectory (still unstable)
Err(io::Error::new(
io::ErrorKind::Unsupported,
format!("tried to open directory at {:?}", path),
))
}
Node::File { digest, .. } => {
self.tokio_handle.block_on(async {
let resp = self.blob_service.as_ref().open_read(&digest).await?;
match resp {
Some(blob_reader) => {
// The VM Response needs a sync [std::io::Reader].
Ok(Box::new(SyncIoBridge::new(blob_reader))
as Box<dyn io::Read>)
}
None => {
error!(
blob.digest = %digest,
"blob not found",
);
Err(io::Error::new(
io::ErrorKind::NotFound,
format!("blob {} not found", &digest),
))
}
}
})
}
Node::Symlink { .. } => Err(io::Error::new(
io::ErrorKind::Unsupported,
"open for symlinks is unsupported",
))?,
}
} else {
// As tvix-store doesn't manage /nix/store on the filesystem,
// we still need to also ask self.std_io here.
self.std_io.open(path)
}
} else {
// The store path is no store path, so do regular StdIO.
self.std_io.open(path)
}
}
#[instrument(skip(self), ret(level = Level::TRACE), err)]
fn file_type(&self, path: &Path) -> io::Result<FileType> {
if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) {
if let Some(node) = self
.tokio_handle
.block_on(async { self.store_path_to_node(&store_path, sub_path).await })?
{
match node {
Node::Directory { .. } => Ok(FileType::Directory),
Node::File { .. } => Ok(FileType::Regular),
Node::Symlink { .. } => Ok(FileType::Symlink),
}
} else {
self.std_io.file_type(path)
}
} else {
self.std_io.file_type(path)
}
}
#[instrument(skip(self), ret(level = Level::TRACE), err)]
fn read_dir(&self, path: &Path) -> io::Result<Vec<(bytes::Bytes, FileType)>> {
if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) {
if let Some(node) = self
.tokio_handle
.block_on(async { self.store_path_to_node(&store_path, sub_path).await })?
{
match node {
Node::Directory { digest, .. } => {
// fetch the Directory itself.
if let Some(directory) = self.tokio_handle.block_on({
let digest = digest.clone();
async move { self.directory_service.as_ref().get(&digest).await }
})? {
let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new();
for (name, node) in directory.into_nodes() {
children.push(match node {
Node::Directory { .. } => (name.into(), FileType::Directory),
Node::File { .. } => (name.clone().into(), FileType::Regular),
Node::Symlink { .. } => (name.into(), FileType::Symlink),
})
}
Ok(children)
} else {
// If we didn't get the directory node that's linked, that's a store inconsistency!
error!(
directory.digest = %digest,
path = ?path,
"directory not found",
);
Err(io::Error::new(
io::ErrorKind::NotFound,
format!("directory {digest} does not exist"),
))?
}
}
Node::File { .. } => {
// This would normally be a io::ErrorKind::NotADirectory (still unstable)
Err(io::Error::new(
io::ErrorKind::Unsupported,
"tried to readdir path {:?}, which is a file",
))?
}
Node::Symlink { .. } => Err(io::Error::new(
io::ErrorKind::Unsupported,
"read_dir for symlinks is unsupported",
))?,
}
} else {
self.std_io.read_dir(path)
}
} else {
self.std_io.read_dir(path)
}
}
#[instrument(skip(self), ret(level = Level::TRACE), err)]
fn import_path(&self, path: &Path) -> io::Result<PathBuf> {
let output_path = self.tokio_handle.block_on(async {
tvix_store::import::import_path_as_nar_ca(
path,
tvix_store::import::path_to_name(path)?,
&self.blob_service,
&self.directory_service,
&self.path_info_service,
&self.nar_calculation_service,
)
.await
})?;
Ok(output_path.to_absolute_path().into())
}
#[instrument(skip(self), ret(level = Level::TRACE))]
fn store_dir(&self) -> Option<String> {
Some("/nix/store".to_string())
}
}
#[cfg(test)]
mod tests {
use std::{path::Path, rc::Rc, sync::Arc};
use bstr::ByteSlice;
use clap::Parser;
use tempfile::TempDir;
use tvix_build::buildservice::DummyBuildService;
use tvix_eval::{EvalIO, EvaluationResult};
use tvix_store::utils::{construct_services, ServiceUrlsMemory};
use super::TvixStoreIO;
use crate::builtins::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins};
/// evaluates a given nix expression and returns the result.
/// Takes care of setting up the evaluator so it knows about the
// `derivation` builtin.
fn eval(str: &str) -> EvaluationResult {
let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
tokio_runtime
.block_on(async {
construct_services(ServiceUrlsMemory::parse_from(std::iter::empty::<&str>()))
.await
})
.unwrap();
let io = Rc::new(TvixStoreIO::new(
blob_service,
directory_service,
path_info_service,
nar_calculation_service.into(),
Arc::<DummyBuildService>::default(),
tokio_runtime.handle().clone(),
));
let mut eval_builder =
tvix_eval::Evaluation::builder(io.clone() as Rc<dyn EvalIO>).enable_import();
eval_builder = add_derivation_builtins(eval_builder, Rc::clone(&io));
eval_builder = add_fetcher_builtins(eval_builder, Rc::clone(&io));
eval_builder = add_import_builtins(eval_builder, io);
let eval = eval_builder.build();
// run the evaluation itself.
eval.evaluate(str, None)
}
/// Helper function that takes a &Path, and invokes a tvix evaluator coercing that path to a string
/// (via "${/this/path}"). The path can be both absolute or not.
/// It returns Option<String>, depending on whether the evaluation succeeded or not.
fn import_path_and_compare<P: AsRef<Path>>(p: P) -> Option<String> {
// Try to import the path using "${/tmp/path/to/test}".
// The format string looks funny, the {} passed to Nix needs to be
// escaped.
let code = format!(r#""${{{}}}""#, p.as_ref().display());
let result = eval(&code);
if !result.errors.is_empty() {
return None;
}
let value = result.value.expect("must be some");
match value {
tvix_eval::Value::String(s) => Some(s.to_str_lossy().into_owned()),
_ => panic!("unexpected value type: {:?}", value),
}
}
/// Import a directory with a zero-sized ".keep" regular file.
/// Ensure it matches the (pre-recorded) store path that Nix would produce.
#[test]
fn import_directory() {
let tmpdir = TempDir::new().unwrap();
// create a directory named "test"
let src_path = tmpdir.path().join("test");
std::fs::create_dir(&src_path).unwrap();
// write a regular file `.keep`.
std::fs::write(src_path.join(".keep"), vec![]).unwrap();
// importing the path with .../test at the end.
assert_eq!(
Some("/nix/store/gq3xcv4xrj4yr64dflyr38acbibv3rm9-test".to_string()),
import_path_and_compare(&src_path)
);
// importing the path with .../test/. at the end.
assert_eq!(
Some("/nix/store/gq3xcv4xrj4yr64dflyr38acbibv3rm9-test".to_string()),
import_path_and_compare(src_path.join("."))
);
}
/// Import a file into the store. Nix uses the "recursive"/NAR-based hashing
/// scheme for these.
#[test]
fn import_file() {
let tmpdir = TempDir::new().unwrap();
// write a regular file `empty`.
std::fs::write(tmpdir.path().join("empty"), vec![]).unwrap();
assert_eq!(
Some("/nix/store/lx5i78a4izwk2qj1nq8rdc07y8zrwy90-empty".to_string()),
import_path_and_compare(tmpdir.path().join("empty"))
);
// write a regular file `hello.txt`.
std::fs::write(tmpdir.path().join("hello.txt"), b"Hello World!").unwrap();
assert_eq!(
Some("/nix/store/925f1jb1ajrypjbyq7rylwryqwizvhp0-hello.txt".to_string()),
import_path_and_compare(tmpdir.path().join("hello.txt"))
);
}
/// Invoke toString on a nonexisting file, and access the .file attribute.
/// This should not cause an error, because it shouldn't trigger an import,
/// and leave the path as-is.
#[test]
fn nonexisting_path_without_import() {
let result = eval("toString ({ line = 42; col = 42; file = /deep/thought; }.file)");
assert!(result.errors.is_empty(), "expect evaluation to succeed");
let value = result.value.expect("must be some");
match value {
tvix_eval::Value::String(s) => {
assert_eq!(*s, "/deep/thought");
}
_ => panic!("unexpected value type: {:?}", value),
}
}
}