Diffstat (limited to 'tvix/store/src')
41 files changed, 5375 insertions, 0 deletions
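At a glance, the heart of this diff is a pair of traits: BlobService hands out readers and writers for content-addressed blobs, and BlobWriter is an io::Write whose close() returns the BLAKE3 digest of what was written. Here is a minimal sketch of the intended call pattern against the in-memory implementation added below — the trait methods and MemoryBlobService are as defined in this diff, while the re-export path and the surrounding main are assumptions:

use std::io::{Read, Write};

use tvix_store::blobservice::{BlobService, BlobWriter, MemoryBlobService};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let blob_service = MemoryBlobService::default();

    // Write a blob; close() consumes the writer and returns the blake3 digest.
    let mut writer = blob_service.open_write()?;
    writer.write_all(b"hello tvix")?;
    let digest = writer.close()?;

    // The blob can now be probed and read back by its digest.
    assert!(blob_service.has(&digest)?);
    if let Some(mut reader) = blob_service.open_read(&digest)? {
        let mut contents = Vec::new();
        reader.read_to_end(&mut contents)?;
        assert_eq!(b"hello tvix".as_slice(), contents.as_slice());
    }
    Ok(())
}

The sled- and gRPC-backed implementations below are drop-in replacements at the trait level; only construction differs.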
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs new file mode 100644 index 000000000000..8c278c433997 --- /dev/null +++ b/tvix/store/src/bin/tvix-store.rs @@ -0,0 +1,206 @@ +use clap::Subcommand; +use data_encoding::BASE64; +use futures::future::try_join_all; +use std::io; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; +use tracing_subscriber::prelude::*; +use tvix_store::blobservice::GRPCBlobService; +use tvix_store::blobservice::SledBlobService; +use tvix_store::directoryservice::GRPCDirectoryService; +use tvix_store::directoryservice::SledDirectoryService; +use tvix_store::nar::GRPCNARCalculationService; +use tvix_store::nar::NonCachingNARCalculationService; +use tvix_store::pathinfoservice::GRPCPathInfoService; +use tvix_store::pathinfoservice::SledPathInfoService; +use tvix_store::proto::blob_service_client::BlobServiceClient; +use tvix_store::proto::blob_service_server::BlobServiceServer; +use tvix_store::proto::directory_service_client::DirectoryServiceClient; +use tvix_store::proto::directory_service_server::DirectoryServiceServer; +use tvix_store::proto::node::Node; +use tvix_store::proto::path_info_service_client::PathInfoServiceClient; +use tvix_store::proto::path_info_service_server::PathInfoServiceServer; +use tvix_store::proto::GRPCBlobServiceWrapper; +use tvix_store::proto::GRPCDirectoryServiceWrapper; +use tvix_store::proto::GRPCPathInfoServiceWrapper; +use tvix_store::TvixStoreIO; + +#[cfg(feature = "reflection")] +use tvix_store::proto::FILE_DESCRIPTOR_SET; + +use clap::Parser; +use tonic::{transport::Server, Result}; +use tracing::{info, Level}; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + /// Whether to log in JSON + #[arg(long)] + json: bool, + + #[arg(long)] + log_level: Option<Level>, + + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Runs the tvix-store daemon. 
+ Daemon { + #[arg(long, short = 'l')] + listen_address: Option<String>, + }, + /// Imports a list of paths into the store (not using the daemon) + Import { + #[clap(value_name = "PATH")] + paths: Vec<PathBuf>, + }, +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + let cli = Cli::parse(); + + // configure log settings + let level = cli.log_level.unwrap_or(Level::INFO); + + let subscriber = tracing_subscriber::registry() + .with(if cli.json { + Some( + tracing_subscriber::fmt::Layer::new() + .with_writer(io::stdout.with_max_level(level)) + .json(), + ) + } else { + None + }) + .with(if !cli.json { + Some( + tracing_subscriber::fmt::Layer::new() + .with_writer(io::stdout.with_max_level(level)) + .pretty(), + ) + } else { + None + }); + + tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber"); + + match cli.command { + Commands::Daemon { listen_address } => { + // initialize stores + let blob_service = SledBlobService::new("blobs.sled".into())?; + let directory_service = SledDirectoryService::new("directories.sled".into())?; + let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?; + + let listen_address = listen_address + .unwrap_or_else(|| "[::]:8000".to_string()) + .parse() + .unwrap(); + + let mut server = Server::builder(); + + let nar_calculation_service = NonCachingNARCalculationService::new( + blob_service.clone(), + directory_service.clone(), + ); + + #[allow(unused_mut)] + let mut router = server + .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( + blob_service, + ))) + .add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from(directory_service), + )) + .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( + path_info_service, + nar_calculation_service, + ))); + + #[cfg(feature = "reflection")] + { + let reflection_svc = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build()?; + router = router.add_service(reflection_svc); + } + + info!("tvix-store listening on {}", listen_address); + + router.serve(listen_address).await?; + } + Commands::Import { paths } => { + let blob_service = GRPCBlobService::from_client( + BlobServiceClient::connect("http://[::1]:8000").await?, + ); + let directory_service = GRPCDirectoryService::from_client( + DirectoryServiceClient::connect("http://[::1]:8000").await?, + ); + let path_info_service_client = + PathInfoServiceClient::connect("http://[::1]:8000").await?; + let path_info_service = + GRPCPathInfoService::from_client(path_info_service_client.clone()); + let nar_calculation_service = + GRPCNARCalculationService::from_client(path_info_service_client); + + let io = Arc::new(TvixStoreIO::new( + blob_service, + directory_service, + path_info_service, + nar_calculation_service, + )); + + let tasks = paths + .iter() + .map(|path| { + let io_move = io.clone(); + let path = path.clone(); + let task: tokio::task::JoinHandle<Result<(), io::Error>> = + tokio::task::spawn_blocking(move || { + let path_info = io_move.import_path_with_pathinfo(&path)?; + print_node(&path_info.node.unwrap().node.unwrap(), &path); + Ok(()) + }); + task + }) + .collect::<Vec<tokio::task::JoinHandle<Result<(), io::Error>>>>(); + + try_join_all(tasks).await?; + } + }; + Ok(()) +} + +fn print_node(node: &Node, path: &Path) { + match node { + Node::Directory(directory_node) => { + info!( + path = ?path, + name = directory_node.name, + digest = 
BASE64.encode(&directory_node.digest), + "import successful", + ) + } + Node::File(file_node) => { + info!( + path = ?path, + name = file_node.name, + digest = BASE64.encode(&file_node.digest), + "import successful" + ) + } + Node::Symlink(symlink_node) => { + info!( + path = ?path, + name = symlink_node.name, + target = symlink_node.target, + "import successful" + ) + } + } +} diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs new file mode 100644 index 000000000000..0b08fbf46ad9 --- /dev/null +++ b/tvix/store/src/blobservice/grpc.rs @@ -0,0 +1,217 @@ +use super::{BlobService, BlobWriter}; +use crate::{proto, B3Digest}; +use futures::sink::{SinkExt, SinkMapErr}; +use std::{collections::VecDeque, io}; +use tokio::task::JoinHandle; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_util::{ + io::{CopyToBytes, SinkWriter, SyncIoBridge}, + sync::{PollSendError, PollSender}, +}; +use tonic::{transport::Channel, Code, Status, Streaming}; +use tracing::instrument; + +/// Connects to a (remote) tvix-store BlobService over gRPC. +#[derive(Clone)] +pub struct GRPCBlobService { + /// A handle into the active tokio runtime. Necessary to spawn tasks. + tokio_handle: tokio::runtime::Handle, + + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, +} + +impl GRPCBlobService { + /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient<Channel>], + /// and a [tokio::runtime::Handle]. + pub fn new( + grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, + tokio_handle: tokio::runtime::Handle, + ) -> Self { + Self { + tokio_handle, + grpc_client, + } + } + /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient<Channel>]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, + ) -> Self { + Self { + tokio_handle: tokio::runtime::Handle::current(), + grpc_client, + } + } +} + +impl BlobService for GRPCBlobService { + type BlobReader = Box<dyn io::Read + Send>; + type BlobWriter = GRPCBlobWriter; + + #[instrument(skip(self, digest), fields(blob.digest=%digest))] + fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> { + // Get a new handle to the gRPC client, and copy the digest. + let mut grpc_client = self.grpc_client.clone(); + let digest = digest.clone(); + + let task: tokio::task::JoinHandle<Result<_, Status>> = + self.tokio_handle.spawn(async move { + Ok(grpc_client + .stat(proto::StatBlobRequest { + digest: digest.to_vec(), + ..Default::default() + }) + .await? + .into_inner()) + }); + + match self.tokio_handle.block_on(task)? { + Ok(_blob_meta) => Ok(true), + Err(e) if e.code() == Code::NotFound => Ok(false), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + // On success, this returns a Ok(Some(io::Read)), which can be used to read + // the contents of the Blob, identified by the digest. + fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, crate::Error> { + // Get a new handle to the gRPC client, and copy the digest. + let mut grpc_client = self.grpc_client.clone(); + let digest = digest.clone(); + + // Construct the task that'll send out the request and return the stream + // the gRPC client should use to send [proto::BlobChunk], or an error if + // the blob doesn't exist. 
+        let task: tokio::task::JoinHandle<Result<Streaming<proto::BlobChunk>, Status>> =
+            self.tokio_handle.spawn(async move {
+                let stream = grpc_client
+                    .read(proto::ReadBlobRequest {
+                        digest: digest.to_vec(),
+                    })
+                    .await?
+                    .into_inner();
+
+                Ok(stream)
+            });
+
+        // This runs the task to completion, which on success will return a stream.
+        // On reading from it, we receive individual [proto::BlobChunk], so we
+        // massage this to a stream of bytes,
+        // then create an [AsyncRead], which we'll turn into a [io::Read],
+        // that's returned from the function.
+        match self.tokio_handle.block_on(task)? {
+            Ok(stream) => {
+                // map the stream of proto::BlobChunk to bytes.
+                let data_stream = stream.map(|x| {
+                    x.map(|x| VecDeque::from(x.data))
+                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
+                });
+
+                // Use StreamReader::new to convert to an AsyncRead.
+                let data_reader = tokio_util::io::StreamReader::new(data_stream);
+
+                // Use SyncIoBridge to turn it into a sync Read.
+                let sync_reader = tokio_util::io::SyncIoBridge::new(data_reader);
+                Ok(Some(Box::new(sync_reader)))
+            }
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(crate::Error::StorageError(e.to_string())),
+        }
+    }
+
+    /// Returns a [Self::BlobWriter], that'll internally wrap each write in a
+    /// [proto::BlobChunk], which is passed to the gRPC server via the put stream.
+    fn open_write(&self) -> Result<Self::BlobWriter, crate::Error> {
+        let mut grpc_client = self.grpc_client.clone();
+
+        // set up an mpsc channel passing around Bytes.
+        let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10);
+
+        // bytes arriving on the RX side are wrapped inside a
+        // [proto::BlobChunk], and a [ReceiverStream] is constructed.
+        let blobchunk_stream =
+            ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.to_vec() });
+
+        // That receiver stream is used as a stream in the gRPC BlobService.put rpc call.
+        let task: tokio::task::JoinHandle<Result<_, Status>> = self
+            .tokio_handle
+            .spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) });
+
+        // The tx part of the channel is converted to a sink of byte chunks.
+
+        // We need to make this a function pointer, not a closure.
+        fn convert_error(_: PollSendError<bytes::Bytes>) -> io::Error {
+            io::Error::from(io::ErrorKind::BrokenPipe)
+        }
+
+        let sink = PollSender::new(tx)
+            .sink_map_err(convert_error as fn(PollSendError<bytes::Bytes>) -> io::Error);
+        // We need to explicitly cast here, otherwise rustc errors with "expected fn pointer, found fn item".
+
+        // … which is turned into an [tokio::io::AsyncWrite].
+        let async_writer = SinkWriter::new(CopyToBytes::new(sink));
+        // … which is then turned into a [io::Write].
+        let writer = SyncIoBridge::new(async_writer);
+
+        Ok(GRPCBlobWriter {
+            tokio_handle: self.tokio_handle.clone(), // TODO: is the clone() ok here?
+            task,
+            inner_writer: writer,
+        })
+    }
+}
+
+type BridgedWriter = SyncIoBridge<
+    SinkWriter<
+        CopyToBytes<
+            SinkMapErr<PollSender<bytes::Bytes>, fn(PollSendError<bytes::Bytes>) -> io::Error>,
+        >,
+    >,
+>;
+
+pub struct GRPCBlobWriter {
+    /// A handle into the active tokio runtime. Necessary to block on the task
+    /// containing the put request.
+    tokio_handle: tokio::runtime::Handle,
+
+    /// The task containing the put request.
+    task: JoinHandle<Result<proto::PutBlobResponse, Status>>,
+
+    /// The inner Writer.
+ inner_writer: BridgedWriter, +} + +impl BlobWriter for GRPCBlobWriter { + fn close(mut self) -> Result<B3Digest, crate::Error> { + // invoke shutdown, so the inner writer closes its internal tx side of + // the channel. + self.inner_writer + .shutdown() + .map_err(|e| crate::Error::StorageError(e.to_string()))?; + + // block on the RPC call to return. + // This ensures all chunks are sent out, and have been received by the + // backend. + match self.tokio_handle.block_on(self.task)? { + Ok(resp) => { + // return the digest from the response. + B3Digest::from_vec(resp.digest).map_err(|_| { + crate::Error::StorageError("invalid root digest length in response".to_string()) + }) + } + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } +} + +impl io::Write for GRPCBlobWriter { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.inner_writer.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner_writer.flush() + } +} diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs new file mode 100644 index 000000000000..1ee59d108743 --- /dev/null +++ b/tvix/store/src/blobservice/memory.rs @@ -0,0 +1,76 @@ +use std::io::Cursor; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; +use tracing::{instrument, warn}; + +use super::{BlobService, BlobWriter}; +use crate::{B3Digest, Error}; + +#[derive(Clone, Default)] +pub struct MemoryBlobService { + db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, +} + +impl BlobService for MemoryBlobService { + type BlobReader = Cursor<Vec<u8>>; + type BlobWriter = MemoryBlobWriter; + + #[instrument(skip(self, digest), fields(blob.digest=%digest))] + fn has(&self, digest: &B3Digest) -> Result<bool, Error> { + let db = self.db.read().unwrap(); + Ok(db.contains_key(digest)) + } + + fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> { + let db = self.db.read().unwrap(); + + Ok(db.get(digest).map(|x| Cursor::new(x.clone()))) + } + + #[instrument(skip(self))] + fn open_write(&self) -> Result<Self::BlobWriter, Error> { + Ok(MemoryBlobWriter::new(self.db.clone())) + } +} + +pub struct MemoryBlobWriter { + db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, + + buf: Vec<u8>, +} + +impl MemoryBlobWriter { + fn new(db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>) -> Self { + Self { + buf: Vec::new(), + db, + } + } +} +impl std::io::Write for MemoryBlobWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + self.buf.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.buf.flush() + } +} + +impl BlobWriter for MemoryBlobWriter { + fn close(self) -> Result<B3Digest, Error> { + // in this memory implementation, we don't actually bother hashing + // incrementally while writing, but do it at the end. + let mut hasher = blake3::Hasher::new(); + hasher.update(&self.buf); + let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + + // open the database for writing. + let mut db = self.db.write()?; + db.insert(digest.clone(), self.buf); + + Ok(digest) + } +} diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs new file mode 100644 index 000000000000..c5a2de124656 --- /dev/null +++ b/tvix/store/src/blobservice/mod.rs @@ -0,0 +1,41 @@ +use std::io; + +use crate::{B3Digest, Error}; + +mod grpc; +mod memory; +mod sled; + +pub use self::grpc::GRPCBlobService; +pub use self::memory::MemoryBlobService; +pub use self::sled::SledBlobService; + +/// The base trait all BlobService services need to implement. 
+/// It provides functions to check whether a given blob exists,
+/// a way to get a [io::Read] to a blob, and a method to initiate writing a new
+/// Blob, which returns a [BlobWriter], that can be used to write the blob contents.
+pub trait BlobService {
+    type BlobReader: io::Read + Send + std::marker::Unpin;
+    type BlobWriter: BlobWriter + Send;
+
+    /// Check if the service has the blob, by its content hash.
+    fn has(&self, digest: &B3Digest) -> Result<bool, Error>;
+
+    /// Request a blob from the store, by its content hash. Returns an Option<BlobReader>.
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error>;
+
+    /// Insert a new blob into the store. Returns a [BlobWriter], which
+    /// implements [io::Write] and a [BlobWriter::close].
+    /// TODO: is there any reason we want this to be a Result<>, and not just T?
+    fn open_write(&self) -> Result<Self::BlobWriter, Error>;
+}
+
+/// An [io::Write] that you need to close() afterwards, and get back the digest
+/// of the written blob.
+pub trait BlobWriter: io::Write {
+    /// Signal there's no more data to be written, and return the digest of the
+    /// contents written.
+    ///
+    /// This consumes self, so it's not possible to close twice.
+    fn close(self) -> Result<B3Digest, Error>;
+}
diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs
new file mode 100644
index 000000000000..2b090335344d
--- /dev/null
+++ b/tvix/store/src/blobservice/sled.rs
@@ -0,0 +1,94 @@
+use super::{BlobService, BlobWriter};
+use crate::{B3Digest, Error};
+use std::{
+    io::{self, Cursor},
+    path::PathBuf,
+};
+use tracing::instrument;
+
+#[derive(Clone)]
+pub struct SledBlobService {
+    db: sled::Db,
+}
+
+impl SledBlobService {
+    pub fn new(p: PathBuf) -> Result<Self, sled::Error> {
+        let config = sled::Config::default().use_compression(true).path(p);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+
+    pub fn new_temporary() -> Result<Self, sled::Error> {
+        let config = sled::Config::default().temporary(true);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+}
+
+impl BlobService for SledBlobService {
+    type BlobReader = Cursor<Vec<u8>>;
+    type BlobWriter = SledBlobWriter;
+
+    #[instrument(skip(self), fields(blob.digest=%digest))]
+    fn has(&self, digest: &B3Digest) -> Result<bool, Error> {
+        match self.db.contains_key(digest.to_vec()) {
+            Ok(has) => Ok(has),
+            Err(e) => Err(Error::StorageError(e.to_string())),
+        }
+    }
+
+    #[instrument(skip(self), fields(blob.digest=%digest))]
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> {
+        match self.db.get(digest.to_vec()) {
+            Ok(None) => Ok(None),
+            Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))),
+            Err(e) => Err(Error::StorageError(e.to_string())),
+        }
+    }
+
+    #[instrument(skip(self))]
+    fn open_write(&self) -> Result<Self::BlobWriter, Error> {
+        Ok(SledBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct SledBlobWriter {
+    db: sled::Db,
+    buf: Vec<u8>,
+    hasher: blake3::Hasher,
+}
+
+impl SledBlobWriter {
+    pub fn new(db: sled::Db) -> Self {
+        Self {
+            buf: Vec::default(),
+            db,
+            hasher: blake3::Hasher::new(),
+        }
+    }
+}
+
+impl io::Write for SledBlobWriter {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        let bytes_written = self.buf.write(buf)?;
+        self.hasher.write(&buf[..bytes_written])
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.buf.flush()
+    }
+}
+
+impl BlobWriter for SledBlobWriter {
+    fn close(self) -> Result<B3Digest, Error> {
+        let digest = self.hasher.finalize();
+        self.db
+            .insert(digest.as_bytes(), self.buf)
+            .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?;
+
+        // We know self.hasher is doing blake3 hashing, so this won't fail.
+        Ok(B3Digest::from_vec(digest.as_bytes().to_vec()).unwrap())
+    }
+}
diff --git a/tvix/store/src/digests.rs b/tvix/store/src/digests.rs
new file mode 100644
index 000000000000..441a059ee0b6
--- /dev/null
+++ b/tvix/store/src/digests.rs
@@ -0,0 +1,49 @@
+use data_encoding::BASE64;
+use thiserror::Error;
+
+// FUTUREWORK: make generic
+
+#[derive(PartialEq, Eq, Hash, Debug)]
+pub struct B3Digest(Vec<u8>);
+
+// TODO: allow converting these errors to crate::Error
+#[derive(Error, Debug)]
+pub enum Error {
+    #[error("invalid digest length: {0}")]
+    InvalidDigestLen(usize),
+}
+
+impl B3Digest {
+    /// Constructs a [B3Digest] from a [Vec<u8>].
+    /// Returns an error if the digest has the wrong length.
+    pub fn from_vec(value: Vec<u8>) -> Result<Self, Error> {
+        if value.len() != 32 {
+            Err(Error::InvalidDigestLen(value.len()))
+        } else {
+            Ok(Self(value))
+        }
+    }
+
+    /// Returns a copy of the inner [Vec<u8>].
+    pub fn to_vec(&self) -> Vec<u8> {
+        self.0.to_vec()
+    }
+}
+
+impl From<&[u8; 32]> for B3Digest {
+    fn from(value: &[u8; 32]) -> Self {
+        Self(value.to_vec())
+    }
+}
+
+impl Clone for B3Digest {
+    fn clone(&self) -> Self {
+        Self(self.0.to_owned())
+    }
+}
+
+impl std::fmt::Display for B3Digest {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "b3:{}", BASE64.encode(self.0.as_slice()))
+    }
+}
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs
new file mode 100644
index 000000000000..1b33572cf7de
--- /dev/null
+++ b/tvix/store/src/directoryservice/grpc.rs
@@ -0,0 +1,532 @@
+use std::collections::HashSet;
+
+use super::{DirectoryPutter, DirectoryService};
+use crate::proto::{self, get_directory_request::ByWhat};
+use crate::{B3Digest, Error};
+use tokio::sync::mpsc::UnboundedSender;
+use tokio_stream::wrappers::UnboundedReceiverStream;
+use tonic::{transport::Channel, Status};
+use tonic::{Code, Streaming};
+use tracing::{instrument, warn};
+
+/// Connects to a (remote) tvix-store DirectoryService over gRPC.
+#[derive(Clone)]
+pub struct GRPCDirectoryService {
+    /// A handle into the active tokio runtime. Necessary to spawn tasks.
+    tokio_handle: tokio::runtime::Handle,
+
+    /// The internal reference to a gRPC client.
+    /// Cloning it is cheap, and it internally handles concurrent requests.
+    grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
+}
+
+impl GRPCDirectoryService {
+    /// Construct a new [GRPCDirectoryService], by passing a handle to the
+    /// tokio runtime, and a gRPC client.
+    pub fn new(
+        tokio_handle: tokio::runtime::Handle,
+        grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
+    ) -> Self {
+        Self {
+            tokio_handle,
+            grpc_client,
+        }
+    }
+
+    /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient<Channel>].
+    /// panics if called outside the context of a tokio runtime.
+    pub fn from_client(
+        grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
+    ) -> Self {
+        Self {
+            tokio_handle: tokio::runtime::Handle::current(),
+            grpc_client,
+        }
+    }
+}
+
+impl DirectoryService for GRPCDirectoryService {
+    type DirectoriesIterator = StreamIterator;
+
+    fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> {
+        // Get a new handle to the gRPC client, and copy the digest.
+ let mut grpc_client = self.grpc_client.clone(); + + let digest_as_vec = digest.to_vec(); + let task = self.tokio_handle.spawn(async move { + let mut s = grpc_client + .get(proto::GetDirectoryRequest { + recursive: false, + by_what: Some(ByWhat::Digest(digest_as_vec)), + }) + .await? + .into_inner(); + + // Retrieve the first message only, then close the stream (we set recursive to false) + s.message().await + }); + + let digest = digest.clone(); + match self.tokio_handle.block_on(task)? { + Ok(Some(directory)) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != digest { + Err(crate::Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))) + } else if let Err(e) = directory.validate() { + // Validate the Directory itself is valid. + warn!("directory failed validation: {}", e.to_string()); + Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + digest, e, + ))) + } else { + Ok(Some(directory)) + } + } + Ok(None) => Ok(None), + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + let mut grpc_client = self.grpc_client.clone(); + + let task = self + .tokio_handle + .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await }); + + match self.tokio_handle.block_on(task)? { + Ok(put_directory_resp) => Ok(B3Digest::from_vec( + put_directory_resp.into_inner().root_digest, + ) + .map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { + let mut grpc_client = self.grpc_client.clone(); + + let root_directory_digest_as_vec = root_directory_digest.to_vec(); + let task: tokio::task::JoinHandle<Result<Streaming<proto::Directory>, Status>> = + self.tokio_handle.spawn(async move { + let s = grpc_client + .get(proto::GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(root_directory_digest_as_vec)), + }) + .await? + .into_inner(); + + Ok(s) + }); + + let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); + + StreamIterator::new( + self.tokio_handle.clone(), + root_directory_digest.clone(), + stream, + ) + } + + type DirectoryPutter = GRPCPutter; + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Self::DirectoryPutter + where + Self: Clone, + { + let mut grpc_client = self.grpc_client.clone(); + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let task: tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>> = + self.tokio_handle.spawn(async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); + + Ok(s) + }); + + GRPCPutter::new(self.tokio_handle.clone(), tx, task) + } +} + +pub struct StreamIterator { + /// A handle into the active tokio runtime. Necessary to run futures to completion. 
+ tokio_handle: tokio::runtime::Handle, + // A stream of [proto::Directory] + stream: Streaming<proto::Directory>, + // The Directory digests we received so far + received_directory_digests: HashSet<B3Digest>, + // The Directory digests we're still expecting to get sent. + expected_directory_digests: HashSet<B3Digest>, +} + +impl StreamIterator { + pub fn new( + tokio_handle: tokio::runtime::Handle, + root_digest: B3Digest, + stream: Streaming<proto::Directory>, + ) -> Self { + Self { + tokio_handle, + stream, + received_directory_digests: HashSet::new(), + expected_directory_digests: HashSet::from([root_digest]), + } + } +} + +impl Iterator for StreamIterator { + type Item = Result<proto::Directory, crate::Error>; + + fn next(&mut self) -> Option<Self::Item> { + match self.tokio_handle.block_on(self.stream.message()) { + Ok(ok) => match ok { + Some(directory) => { + // validate the directory itself. + if let Err(e) = directory.validate() { + return Some(Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + directory.digest(), + e, + )))); + } + // validate we actually expected that directory, and move it from expected to received. + let directory_digest = directory.digest(); + let was_expected = self.expected_directory_digests.remove(&directory_digest); + if !was_expected { + // FUTUREWORK: dumb clients might send the same stuff twice. + // as a fallback, we might want to tolerate receiving + // it if it's in received_directory_digests (as that + // means it once was in expected_directory_digests) + return Some(Err(crate::Error::StorageError(format!( + "received unexpected directory {}", + directory_digest + )))); + } + self.received_directory_digests.insert(directory_digest); + + // register all children in expected_directory_digests. + for child_directory in &directory.directories { + // We ran validate() above, so we know these digests must be correct. + let child_directory_digest = + B3Digest::from_vec(child_directory.digest.clone()).unwrap(); + + self.expected_directory_digests + .insert(child_directory_digest); + } + + Some(Ok(directory)) + } + None => { + // If we were still expecting something, that's an error. + if !self.expected_directory_digests.is_empty() { + Some(Err(crate::Error::StorageError(format!( + "still expected {} directories, but got premature end of stream", + self.expected_directory_digests.len(), + )))) + } else { + None + } + } + }, + Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))), + } + } +} + +/// Allows uploading multiple Directory messages in the same gRPC stream. +pub struct GRPCPutter { + /// A handle into the active tokio runtime. Necessary to spawn tasks. + tokio_handle: tokio::runtime::Handle, + + /// Data about the current request - a handle to the task, and the tx part + /// of the channel. + /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request. + /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed. + #[allow(clippy::type_complexity)] // lol + rq: Option<( + tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>>, + UnboundedSender<proto::Directory>, + )>, +} + +impl GRPCPutter { + pub fn new( + tokio_handle: tokio::runtime::Handle, + directory_sender: UnboundedSender<proto::Directory>, + task: tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>>, + ) -> Self { + Self { + tokio_handle, + rq: Some((task, directory_sender)), + } + } + + #[allow(dead_code)] + // allows checking if the tx part of the channel is closed. 
+    fn is_closed(&self) -> bool {
+        match self.rq {
+            None => true,
+            Some((_, ref directory_sender)) => directory_sender.is_closed(),
+        }
+    }
+}
+
+impl DirectoryPutter for GRPCPutter {
+    fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
+        match self.rq {
+            // If we're not already closed, send the directory to directory_sender.
+            Some((_, ref directory_sender)) => {
+                if directory_sender.send(directory).is_err() {
+                    // If the channel has been prematurely closed, invoke close (so we can peek at the error code)
+                    // That error code is much more helpful, because it
+                    // contains the error message from the server.
+                    self.close()?;
+                }
+                Ok(())
+            }
+            // If self.close() was already called, we can't put again.
+            None => Err(Error::StorageError(
+                "DirectoryPutter already closed".to_string(),
+            )),
+        }
+    }
+
+    /// Closes the stream for sending, and returns the root digest.
+    fn close(&mut self) -> Result<B3Digest, crate::Error> {
+        // get self.rq, and replace it with None.
+        // This ensures we can only close it once.
+        match std::mem::take(&mut self.rq) {
+            None => Err(Error::StorageError("already closed".to_string())),
+            Some((task, directory_sender)) => {
+                // close directory_sender, so blocking on task will finish.
+                drop(directory_sender);
+
+                let root_digest = self
+                    .tokio_handle
+                    .block_on(task)?
+                    .map_err(|e| Error::StorageError(e.to_string()))?
+                    .root_digest;
+
+                B3Digest::from_vec(root_digest).map_err(|_| {
+                    Error::StorageError("invalid root digest length in response".to_string())
+                })
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use core::time;
+    use std::thread;
+
+    use tempfile::TempDir;
+    use tokio::net::{UnixListener, UnixStream};
+    use tokio_stream::wrappers::UnixListenerStream;
+    use tonic::transport::{Endpoint, Server, Uri};
+
+    use crate::{
+        directoryservice::{DirectoryPutter, DirectoryService},
+        proto,
+        proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper},
+        tests::{
+            fixtures::{DIRECTORY_A, DIRECTORY_B},
+            utils::gen_directory_service,
+        },
+    };
+
+    #[test]
+    fn test() -> anyhow::Result<()> {
+        let tmpdir = TempDir::new().unwrap();
+        let socket_path = tmpdir.path().join("socket");
+
+        // Spin up a server, in a thread far away, which spawns its own tokio runtime,
+        // and blocks on the task.
+        let socket_path_clone = socket_path.clone();
+        thread::spawn(move || {
+            // Create the runtime
+            let rt = tokio::runtime::Runtime::new().unwrap();
+            // Get a handle from this runtime
+            let handle = rt.handle();
+
+            let task = handle.spawn(async {
+                let uds = UnixListener::bind(socket_path_clone).unwrap();
+                let uds_stream = UnixListenerStream::new(uds);
+
+                // spin up a new DirectoryService
+                let mut server = Server::builder();
+                let router = server.add_service(DirectoryServiceServer::new(
+                    GRPCDirectoryServiceWrapper::from(gen_directory_service()),
+                ));
+                router.serve_with_incoming(uds_stream).await
+            });
+
+            handle.block_on(task)
+        });
+
+        // set up the local client runtime. This is similar to what the #[tokio::test] macro desugars to.
+ let tester_runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // wait for the socket to be created + { + let mut socket_created = false; + for _try in 1..20 { + if socket_path.exists() { + socket_created = true; + break; + } + std::thread::sleep(time::Duration::from_millis(20)) + } + + assert!( + socket_created, + "expected socket path to eventually get created, but never happened" + ); + } + + let task = tester_runtime.spawn_blocking(move || { + // Create a channel, connecting to the uds at socket_path. + // The URI is unused. + let channel = Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector_lazy(tower::service_fn(move |_: Uri| { + UnixStream::connect(socket_path.clone()) + })); + + let grpc_client = proto::directory_service_client::DirectoryServiceClient::new(channel); + + // create the GrpcDirectoryService, using the tester_runtime. + let directory_service = + super::GRPCDirectoryService::new(tokio::runtime::Handle::current(), grpc_client); + + // try to get DIRECTORY_A should return Ok(None) + assert_eq!( + None, + directory_service + .get(&DIRECTORY_A.digest()) + .expect("must not fail") + ); + + // Now upload it + assert_eq!( + DIRECTORY_A.digest(), + directory_service + .put(DIRECTORY_A.clone()) + .expect("must succeed") + ); + + // And retrieve it, compare for equality. + assert_eq!( + DIRECTORY_A.clone(), + directory_service + .get(&DIRECTORY_A.digest()) + .expect("must succeed") + .expect("must be some") + ); + + // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. + directory_service + .put(DIRECTORY_B.clone()) + .expect_err("must fail"); + + // Putting DIRECTORY_B in a put_multiple will succeed, but the close + // will always fail. + { + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + handle.close().expect_err("must fail"); + } + + // Uploading A and then B should succeed, and closing should return the digest of B. + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_A.clone()).expect("must succeed"); + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + let digest = handle.close().expect("must succeed"); + assert_eq!(DIRECTORY_B.digest(), digest); + + // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. + let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); + assert_eq!( + DIRECTORY_B.clone(), + directories_it + .next() + .expect("must be some") + .expect("must succeed") + ); + assert_eq!( + DIRECTORY_A.clone(), + directories_it + .next() + .expect("must be some") + .expect("must succeed") + ); + + // Uploading B and then A should fail, because B refers to A, which + // hasn't been uploaded yet. + // However, the client can burst, so we might not have received the + // error back from the server. + { + let mut handle = directory_service.put_multiple_start(); + // sending out B will always be fine + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + + // whether we will be able to put A as well depends on whether we + // already received the error about B. 
+                if handle.put(DIRECTORY_A.clone()).is_ok() {
+                    // If we didn't, and this was Ok(_), …
+                    // a subsequent close MUST fail (because it waits for the
+                    // server)
+                    handle.close().expect_err("must fail");
+                }
+            }
+
+            // Now we do the same test as before, send B, then A, but wait
+            // long enough for the server to have closed the stream on us,
+            // and then assert that uploading anything else via the handle will fail.
+            {
+                let mut handle = directory_service.put_multiple_start();
+                handle.put(DIRECTORY_B.clone()).expect("must succeed");
+
+                let mut is_closed = false;
+                for _try in 1..1000 {
+                    if handle.is_closed() {
+                        is_closed = true;
+                        break;
+                    }
+                    std::thread::sleep(time::Duration::from_millis(10))
+                }
+
+                assert!(
+                    is_closed,
+                    "expected channel to eventually close, but never happened"
+                );
+
+                handle.put(DIRECTORY_A.clone()).expect_err("must fail");
+            }
+        });
+
+        tester_runtime.block_on(task)?;
+
+        Ok(())
+    }
+}
diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs
new file mode 100644
index 000000000000..1fd619f7c8cb
--- /dev/null
+++ b/tvix/store/src/directoryservice/memory.rs
@@ -0,0 +1,84 @@
+use crate::{proto, B3Digest, Error};
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+use tracing::{instrument, warn};
+
+use super::utils::SimplePutter;
+use super::{DirectoryService, DirectoryTraverser};
+
+#[derive(Clone, Default)]
+pub struct MemoryDirectoryService {
+    db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>,
+}
+
+impl DirectoryService for MemoryDirectoryService {
+    type DirectoriesIterator = DirectoryTraverser<Self>;
+
+    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
+    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+        let db = self.db.read()?;
+
+        match db.get(digest) {
+            // The directory was not found, return
+            None => Ok(None),
+
+            // The directory was found, try to parse the data as Directory message
+            Some(directory) => {
+                // Validate the retrieved Directory indeed has the
+                // digest we expect it to have, to detect corruptions.
+                let actual_digest = directory.digest();
+                if actual_digest != *digest {
+                    return Err(Error::StorageError(format!(
+                        "requested directory with digest {}, but got {}",
+                        digest, actual_digest
+                    )));
+                }
+
+                // Validate the Directory itself is valid.
+                if let Err(e) = directory.validate() {
+                    warn!("directory failed validation: {}", e.to_string());
+                    return Err(Error::StorageError(format!(
+                        "directory {} failed validation: {}",
+                        actual_digest, e,
+                    )));
+                }
+
+                Ok(Some(directory.clone()))
+            }
+        }
+    }
+
+    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
+    fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+        let digest = directory.digest();
+
+        // validate the directory itself.
+        if let Err(e) = directory.validate() {
+            return Err(Error::InvalidRequest(format!(
+                "directory {} failed validation: {}",
+                digest, e,
+            )));
+        }
+
+        // store it
+        let mut db = self.db.write()?;
+        db.insert(digest.clone(), directory);
+
+        Ok(digest)
+    }
+
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator {
+        DirectoryTraverser::with(self.clone(), root_directory_digest)
+    }
+
+    type DirectoryPutter = SimplePutter<Self>;
+
+    #[instrument(skip_all)]
+    fn put_multiple_start(&self) -> Self::DirectoryPutter
+    where
+        Self: Clone,
+    {
+        SimplePutter::new(self.clone())
+    }
+}
diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs
new file mode 100644
index 000000000000..f387d28948f0
--- /dev/null
+++ b/tvix/store/src/directoryservice/mod.rs
@@ -0,0 +1,54 @@
+use crate::{proto, B3Digest, Error};
+mod grpc;
+mod memory;
+mod sled;
+mod traverse;
+mod utils;
+
+pub use self::grpc::GRPCDirectoryService;
+pub use self::memory::MemoryDirectoryService;
+pub use self::sled::SledDirectoryService;
+pub use self::traverse::traverse_to;
+pub use self::utils::DirectoryTraverser;
+
+/// The base trait all Directory services need to implement.
+/// This is a simple get and put of [crate::proto::Directory], returning their
+/// digest.
+pub trait DirectoryService {
+    type DirectoriesIterator: Iterator<Item = Result<proto::Directory, Error>> + Send;
+    type DirectoryPutter: DirectoryPutter;
+
+    /// Get looks up a single Directory message by its digest.
+    /// In case the directory is not found, Ok(None) is returned.
+    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
+    /// Put uploads a single Directory message, and returns the calculated
+    /// digest, or an error.
+    fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
+
+    /// Looks up a closure of [proto::Directory].
+    /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`,
+    /// and we'd be able to add a default implementation for it here, but
+    /// we can't have that yet.
+    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator;
+
+    /// Allows persisting a closure of [proto::Directory], which is a graph of
+    /// connected Directory messages.
+    fn put_multiple_start(&self) -> Self::DirectoryPutter;
+}
+
+/// Provides a handle to put a closure of connected [proto::Directory] elements.
+///
+/// The consumer can periodically call [put], starting from the leaves. Once
+/// the root is reached, [close] can be called to retrieve the root digest (or
+/// an error).
+pub trait DirectoryPutter {
+    /// Put an individual [proto::Directory] into the store.
+    /// Error semantics and behaviour are up to the specific implementation of
+    /// this trait.
+    /// Due to bursting, the returned error might refer to an object previously
+    /// sent via `put`.
+    fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
+
+    /// Close the stream, and wait for any errors.
+ fn close(&mut self) -> Result<B3Digest, Error>; +} diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs new file mode 100644 index 000000000000..e189e8acf507 --- /dev/null +++ b/tvix/store/src/directoryservice/sled.rs @@ -0,0 +1,107 @@ +use crate::proto::Directory; +use crate::{proto, B3Digest, Error}; +use prost::Message; +use std::path::PathBuf; +use tracing::{instrument, warn}; + +use super::utils::SimplePutter; +use super::{DirectoryService, DirectoryTraverser}; + +#[derive(Clone)] +pub struct SledDirectoryService { + db: sled::Db, +} + +impl SledDirectoryService { + pub fn new(p: PathBuf) -> Result<Self, sled::Error> { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { db }) + } + + pub fn new_temporary() -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { db }) + } +} + +impl DirectoryService for SledDirectoryService { + type DirectoriesIterator = DirectoryTraverser<Self>; + + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + match self.db.get(digest.to_vec()) { + // The directory was not found, return + Ok(None) => Ok(None), + + // The directory was found, try to parse the data as Directory message + Ok(Some(data)) => match Directory::decode(&*data) { + Ok(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != *digest { + return Err(Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))); + } + + // Validate the Directory itself is valid. + if let Err(e) = directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Err(Error::StorageError(format!( + "directory {} failed validation: {}", + actual_digest, e, + ))); + } + + Ok(Some(directory)) + } + Err(e) => { + warn!("unable to parse directory {}: {}", digest, e); + Err(Error::StorageError(e.to_string())) + } + }, + // some storage error? + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + let digest = directory.digest(); + + // validate the directory itself. 
+        if let Err(e) = directory.validate() {
+            return Err(Error::InvalidRequest(format!(
+                "directory {} failed validation: {}",
+                digest, e,
+            )));
+        }
+        // store it
+        let result = self.db.insert(digest.to_vec(), directory.encode_to_vec());
+        if let Err(e) = result {
+            return Err(Error::StorageError(e.to_string()));
+        }
+        Ok(digest)
+    }
+
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator {
+        DirectoryTraverser::with(self.clone(), root_directory_digest)
+    }
+
+    type DirectoryPutter = SimplePutter<Self>;
+
+    #[instrument(skip_all)]
+    fn put_multiple_start(&self) -> Self::DirectoryPutter
+    where
+        Self: Clone,
+    {
+        SimplePutter::new(self.clone())
+    }
+}
diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs
new file mode 100644
index 000000000000..8691baa8b73f
--- /dev/null
+++ b/tvix/store/src/directoryservice/traverse.rs
@@ -0,0 +1,222 @@
+use super::DirectoryService;
+use crate::{proto::NamedNode, B3Digest, Error};
+use tracing::{instrument, warn};
+
+/// This traverses from a (root) node to the given (sub)path, returning the Node
+/// at that path, or None, if there's nothing at that path.
+/// TODO: Do we want to rewrite this in a non-recursing fashion, and use
+/// [DirectoryService.get_recursive] to do fewer lookups?
+/// Or do we consider this to be a non-issue due to store composition and local caching?
+/// TODO: the name of this function (and mod) is a bit bad, because it doesn't
+/// clearly distinguish it from the BFS traversers.
+#[instrument(skip(directory_service))]
+pub fn traverse_to<DS: DirectoryService>(
+    directory_service: &DS,
+    node: crate::proto::node::Node,
+    path: &std::path::Path,
+) -> Result<Option<crate::proto::node::Node>, Error> {
+    // strip a possible `/` prefix from the path.
+    let path = {
+        if path.starts_with("/") {
+            path.strip_prefix("/").unwrap()
+        } else {
+            path
+        }
+    };
+
+    let mut it = path.components();
+
+    match it.next() {
+        None => {
+            // the (remaining) path is empty, return the node we've been called with.
+            Ok(Some(node))
+        }
+        Some(first_component) => {
+            match node {
+                crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => {
+                    // There's still some path left, but the current node is no directory.
+                    // This means the path doesn't exist, as we can't reach it.
+                    Ok(None)
+                }
+                crate::proto::node::Node::Directory(directory_node) => {
+                    let digest = B3Digest::from_vec(directory_node.digest)
+                        .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
+
+                    // fetch the linked node from the directory_service
+                    match directory_service.get(&digest)? {
+                        // If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
+                        None => {
+                            warn!("directory {} does not exist", digest);
+
+                            Err(Error::StorageError(format!(
+                                "directory {} does not exist",
+                                digest
+                            )))
+                        }
+                        Some(directory) => {
+                            // look for first_component in the [Directory].
+                            // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we
+                            // could stop as soon as e.name is larger than the search string.
+                            let child_node = directory.nodes().find(|n| {
+                                n.get_name() == first_component.as_os_str().to_str().unwrap()
+                            });
+
+                            match child_node {
+                                // child node not found means there's no such element inside the directory.
+                                None => Ok(None),
+                                // child node found, recurse with it and the rest of the path.
+                                Some(child_node) => {
+                                    let rest_path: std::path::PathBuf = it.collect();
+                                    traverse_to(directory_service, child_node, &rest_path)
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::path::PathBuf;
+
+    use crate::{
+        directoryservice::DirectoryPutter,
+        directoryservice::DirectoryService,
+        tests::{
+            fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP},
+            utils::gen_directory_service,
+        },
+    };
+
+    use super::traverse_to;
+
+    #[test]
+    fn test_traverse_to() {
+        let mut directory_service = gen_directory_service();
+
+        let mut handle = directory_service.put_multiple_start();
+        handle
+            .put(DIRECTORY_WITH_KEEP.clone())
+            .expect("must succeed");
+        handle
+            .put(DIRECTORY_COMPLICATED.clone())
+            .expect("must succeed");
+
+        // construct the node for DIRECTORY_COMPLICATED
+        let node_directory_complicated =
+            crate::proto::node::Node::Directory(crate::proto::DirectoryNode {
+                name: "doesntmatter".to_string(),
+                digest: DIRECTORY_COMPLICATED.digest().to_vec(),
+                size: DIRECTORY_COMPLICATED.size(),
+            });
+
+        // construct the node for DIRECTORY_WITH_KEEP
+        let node_directory_with_keep = crate::proto::node::Node::Directory(
+            DIRECTORY_COMPLICATED.directories.first().unwrap().clone(),
+        );
+
+        // construct the node for the .keep file
+        let node_file_keep =
+            crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone());
+
+        // traversal to an empty subpath should return the root node.
+        {
+            let resp = traverse_to(
+                &mut directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from(""),
+            )
+            .expect("must succeed");
+
+            assert_eq!(Some(node_directory_complicated.clone()), resp);
+        }
+
+        // traversal to `keep` should return the node for DIRECTORY_WITH_KEEP
+        {
+            let resp = traverse_to(
+                &mut directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("keep"),
+            )
+            .expect("must succeed");
+
+            assert_eq!(Some(node_directory_with_keep.clone()), resp);
+        }
+
+        // traversal to `keep/.keep` should return the node for the .keep file
+        {
+            let resp = traverse_to(
+                &mut directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("keep/.keep"),
+            )
+            .expect("must succeed");
+
+            assert_eq!(Some(node_file_keep.clone()), resp);
+        }
+
+        // traversal to `/keep/.keep` should also return the node for the .keep file
+        {
+            let resp = traverse_to(
+                &mut directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("/keep/.keep"),
+            )
+            .expect("must succeed");
+
+            assert_eq!(Some(node_file_keep.clone()), resp);
+        }
+
+        // traversal to `void` should return None (doesn't exist)
+        {
+            let resp = traverse_to(
+                &mut directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("void"),
+            )
+            .expect("must succeed");
+
+            assert_eq!(None, resp);
+        }
+
+        // traversal to `//v/oid` should return None (doesn't exist)
+        {
+            let resp = traverse_to(
+                &mut directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("//v/oid"),
+            )
+            .expect("must succeed");
+
+            assert_eq!(None, resp);
+        }
+
+        // traversal to `keep/.keep/foo` should return None (the path can't be
+        // reached, as keep/.keep already is a file)
+        {
+            let resp = traverse_to(
+                &mut directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("keep/.keep/foo"),
+            )
+            .expect("must succeed");
+
+            assert_eq!(None, resp);
+        }
+
+        // traversal to a subpath of '/' should return the root node.
+        {
+            let resp = traverse_to(
+                &mut directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("/"),
+            )
+            .expect("must succeed");
+
+            assert_eq!(Some(node_directory_complicated.clone()), resp);
+        }
+    }
+}
diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs
new file mode 100644
index 000000000000..3661808734f3
--- /dev/null
+++ b/tvix/store/src/directoryservice/utils.rs
@@ -0,0 +1,140 @@
+use super::DirectoryPutter;
+use super::DirectoryService;
+use crate::proto;
+use crate::B3Digest;
+use crate::Error;
+use std::collections::{HashSet, VecDeque};
+use tracing::{debug_span, instrument, warn};
+
+/// Traverses a [proto::Directory] from the root to the children.
+///
+/// This is mostly BFS, but directories are only returned once.
+pub struct DirectoryTraverser<DS: DirectoryService> {
+    directory_service: DS,
+    /// The list of all directories that still need to be traversed. The next
+    /// element is picked from the front, new elements are enqueued at the
+    /// back.
+    worklist_directory_digests: VecDeque<B3Digest>,
+    /// The list of directory digests already sent to the consumer.
+    /// We omit sending the same directories multiple times.
+    sent_directory_digests: HashSet<B3Digest>,
+}
+
+impl<DS: DirectoryService> DirectoryTraverser<DS> {
+    pub fn with(directory_service: DS, root_directory_digest: &B3Digest) -> Self {
+        Self {
+            directory_service,
+            worklist_directory_digests: VecDeque::from([root_directory_digest.clone()]),
+            sent_directory_digests: HashSet::new(),
+        }
+    }
+
+    // enqueue all child directory digests to the work queue, as
+    // long as they're not part of the worklist or already sent.
+    // This panics if the digest looks invalid, it's supposed to be checked first.
+    fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
+        for child_directory_node in &directory.directories {
+            // TODO: propagate error
+            let child_digest = B3Digest::from_vec(child_directory_node.digest.clone()).unwrap();
+
+            if self.worklist_directory_digests.contains(&child_digest)
+                || self.sent_directory_digests.contains(&child_digest)
+            {
+                continue;
+            }
+            self.worklist_directory_digests.push_back(child_digest);
+        }
+    }
+}
+
+impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
+    type Item = Result<proto::Directory, Error>;
+
+    #[instrument(skip_all)]
+    fn next(&mut self) -> Option<Self::Item> {
+        // fetch the next directory digest from the top of the work queue.
+        match self.worklist_directory_digests.pop_front() {
+            None => None,
+            Some(current_directory_digest) => {
+                let span = debug_span!("directory.digest", "{}", current_directory_digest);
+                let _enter = span.enter();
+
+                // look up the directory itself.
+                let current_directory = match self.directory_service.get(&current_directory_digest)
+                {
+                    // if we got it
+                    Ok(Some(current_directory)) => {
+                        // validate, we don't want to send invalid directories.
+                        if let Err(e) = current_directory.validate() {
+                            warn!("directory failed validation: {}", e.to_string());
+                            return Some(Err(Error::StorageError(format!(
+                                "invalid directory: {}",
+                                current_directory_digest
+                            ))));
+                        }
+                        current_directory
+                    }
+                    // if it's not there, we have an inconsistent store!
+                    Ok(None) => {
+                        warn!("directory {} does not exist", current_directory_digest);
+                        return Some(Err(Error::StorageError(format!(
+                            "directory {} does not exist",
+                            current_directory_digest
+                        ))));
+                    }
+                    Err(e) => {
+                        warn!("failed to look up directory");
+                        return Some(Err(Error::StorageError(format!(
+                            "unable to look up directory {}: {}",
+                            current_directory_digest, e
+                        ))));
+                    }
+                };
+
+                // All DirectoryServices MUST validate directory nodes, before returning them out, so we
+                // can be sure [enqueue_child_directories] doesn't panic.
+
+                // enqueue child directories
+                self.enqueue_child_directories(&current_directory);
+                Some(Ok(current_directory))
+            }
+        }
+    }
+}
+
+/// This is a simple implementation of a Directory uploader.
+/// TODO: verify connectivity? Factor out these checks into generic helpers?
+pub struct SimplePutter<DS: DirectoryService> {
+    directory_service: DS,
+    last_directory_digest: Option<B3Digest>,
+}
+
+impl<DS: DirectoryService> SimplePutter<DS> {
+    pub fn new(directory_service: DS) -> Self {
+        Self {
+            directory_service,
+            last_directory_digest: None,
+        }
+    }
+}
+
+impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
+    fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+        let digest = self.directory_service.put(directory)?;
+
+        // track the last directory digest
+        self.last_directory_digest = Some(digest);
+
+        Ok(())
+    }
+
+    /// We need to be mutable here, as that's the signature of the trait.
+    fn close(&mut self) -> Result<B3Digest, Error> {
+        match &self.last_directory_digest {
+            Some(last_digest) => Ok(last_digest.clone()),
+            None => Err(Error::InvalidRequest(
+                "no directories sent, can't show root digest".to_string(),
+            )),
+        }
+    }
+}
diff --git a/tvix/store/src/errors.rs b/tvix/store/src/errors.rs
new file mode 100644
index 000000000000..3b23f972b045
--- /dev/null
+++ b/tvix/store/src/errors.rs
@@ -0,0 +1,45 @@
+use std::sync::PoisonError;
+use thiserror::Error;
+use tokio::task::JoinError;
+use tonic::Status;
+
+/// Errors related to communication with the store.
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("invalid request: {0}")]
+    InvalidRequest(String),
+
+    #[error("internal storage error: {0}")]
+    StorageError(String),
+}
+
+impl<T> From<PoisonError<T>> for Error {
+    fn from(value: PoisonError<T>) -> Self {
+        Error::StorageError(value.to_string())
+    }
+}
+
+impl From<JoinError> for Error {
+    fn from(value: JoinError) -> Self {
+        Error::StorageError(value.to_string())
+    }
+}
+
+impl From<Error> for Status {
+    fn from(value: Error) -> Self {
+        match value {
+            Error::InvalidRequest(msg) => Status::invalid_argument(msg),
+            Error::StorageError(msg) => Status::data_loss(format!("storage error: {}", msg)),
+        }
+    }
+}
+
+// TODO: this should probably go somewhere else?
+impl From<Error> for std::io::Error {
+ fn from(value: Error) -> Self {
+ match value {
+ Error::InvalidRequest(msg) => Self::new(std::io::ErrorKind::InvalidInput, msg),
+ Error::StorageError(msg) => Self::new(std::io::ErrorKind::Other, msg),
+ }
+ }
+}
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
new file mode 100644
index 000000000000..206e5eaba975
--- /dev/null
+++ b/tvix/store/src/import.rs
@@ -0,0 +1,220 @@
+use crate::{blobservice::BlobService, directoryservice::DirectoryService};
+use crate::{blobservice::BlobWriter, directoryservice::DirectoryPutter, proto};
+use std::{
+ collections::HashMap,
+ fmt::Debug,
+ fs,
+ fs::File,
+ io,
+ os::unix::prelude::PermissionsExt,
+ path::{Path, PathBuf},
+};
+use tracing::instrument;
+use walkdir::WalkDir;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ #[error("failed to upload directory at {0}: {1}")]
+ UploadDirectoryError(PathBuf, crate::Error),
+
+ #[error("invalid encoding encountered for entry {0:?}")]
+ InvalidEncoding(PathBuf),
+
+ #[error("unable to stat {0}: {1}")]
+ UnableToStat(PathBuf, std::io::Error),
+
+ #[error("unable to open {0}: {1}")]
+ UnableToOpen(PathBuf, std::io::Error),
+
+ #[error("unable to read {0}: {1}")]
+ UnableToRead(PathBuf, std::io::Error),
+}
+
+impl From<super::Error> for Error {
+ fn from(value: super::Error) -> Self {
+ match value {
+ crate::Error::InvalidRequest(_) => panic!("tvix bug"),
+ crate::Error::StorageError(_) => panic!("error"),
+ }
+ }
+}
+
+// This processes a given [walkdir::DirEntry] and returns a
+// proto::node::Node, depending on the type of the entry.
+//
+// If the entry is a file, its contents are uploaded.
+// If the entry is a directory, the Directory is uploaded as well.
+// For this to work, it relies on the caller to provide the directory object
+// with the previously returned (child) nodes.
+//
+// It assumes entries to be returned in "contents first" order, meaning this
+// will only be called with a directory once all of its children have been
+// visited. If the entry is indeed a directory, it'll also upload that
+// directory to the store. For this, the so-far-assembled Directory object for
+// this path needs to be passed in.
+//
+// It assumes the caller adds returned nodes to the directories it assembles.
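The "contents first" ordering relied on here comes from how the walker is configured in ingest_path further down. A standalone sketch of that behaviour, using the same walkdir options (the path is illustrative):

use walkdir::WalkDir;

fn print_contents_first() {
    // For a tree `a/` containing a file `a/b`, this prints `a/b` before `a`:
    // children are always yielded before the directory containing them.
    for entry in WalkDir::new("a").contents_first(true).sort_by_file_name() {
        match entry {
            Ok(entry) => println!("{}", entry.path().display()),
            Err(e) => eprintln!("walk error: {}", e),
        }
    }
}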
+#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
+fn process_entry<BS: BlobService, DP: DirectoryPutter>(
+ blob_service: &BS,
+ directory_putter: &mut DP,
+ entry: &walkdir::DirEntry,
+ maybe_directory: Option<proto::Directory>,
+) -> Result<proto::node::Node, Error> {
+ let file_type = entry.file_type();
+
+ let entry_path: PathBuf = entry.path().to_path_buf();
+
+ if file_type.is_dir() {
+ let directory = maybe_directory
+ .expect("tvix bug: must be called with some directory in the case of directory");
+ let directory_digest = directory.digest();
+ let directory_size = directory.size();
+
+ // upload this directory
+ directory_putter
+ .put(directory)
+ .map_err(|e| Error::UploadDirectoryError(entry.path().to_path_buf(), e))?;
+
+ return Ok(proto::node::Node::Directory(proto::DirectoryNode {
+ name: entry
+ .file_name()
+ .to_str()
+ .map(|s| Ok(s.to_owned()))
+ .unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
+ digest: directory_digest.to_vec(),
+ size: directory_size,
+ }));
+ }
+
+ if file_type.is_symlink() {
+ let target = std::fs::read_link(&entry_path)
+ .map_err(|e| Error::UnableToStat(entry_path.clone(), e))?;
+
+ return Ok(proto::node::Node::Symlink(proto::SymlinkNode {
+ name: entry
+ .file_name()
+ .to_str()
+ .map(|s| Ok(s.to_owned()))
+ .unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
+ target: target
+ .to_str()
+ .map(|s| Ok(s.to_owned()))
+ .unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
+ }));
+ }
+
+ if file_type.is_file() {
+ let metadata = entry
+ .metadata()
+ .map_err(|e| Error::UnableToStat(entry_path.clone(), e.into()))?;
+
+ let mut file = File::open(entry_path.clone())
+ .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;
+
+ let mut writer = blob_service.open_write()?;
+
+ if let Err(e) = io::copy(&mut file, &mut writer) {
+ return Err(Error::UnableToRead(entry_path, e));
+ };
+
+ let digest = writer.close()?;
+
+ return Ok(proto::node::Node::File(proto::FileNode {
+ name: entry
+ .file_name()
+ .to_str()
+ .map(|s| Ok(s.to_owned()))
+ .unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
+ digest: digest.to_vec(),
+ size: metadata.len() as u32,
+ // If it's executable by the user, it'll become executable.
+ // This matches nix's dump() function behaviour.
+ executable: metadata.permissions().mode() & 64 != 0,
+ }));
+ }
+ todo!("handle other types")
+}
+
+/// Ingests the contents at the given path into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
+/// It returns the root node or an error.
+///
+/// It does not interact with a [PathInfoService]; it's up to the caller to
+/// possibly register it somewhere (and potentially rename it based on some
+/// naming scheme).
+#[instrument(skip(blob_service, directory_service), fields(path=?p))]
+pub fn ingest_path<BS: BlobService, DS: DirectoryService, P: AsRef<Path> + Debug>(
+ blob_service: &BS,
+ directory_service: &DS,
+ p: P,
+) -> Result<proto::node::Node, Error> {
+ // Probe if the path points to a symlink. If it does, we process it manually,
+ // due to https://github.com/BurntSushi/walkdir/issues/175.
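+ // (fs::symlink_metadata stats the link itself; fs::metadata would follow
+ // the link and stat its target instead.)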
+ let symlink_metadata = fs::symlink_metadata(p.as_ref()) + .map_err(|e| Error::UnableToStat(p.as_ref().to_path_buf(), e))?; + if symlink_metadata.is_symlink() { + let target = std::fs::read_link(p.as_ref()) + .map_err(|e| Error::UnableToStat(p.as_ref().to_path_buf(), e))?; + return Ok(proto::node::Node::Symlink(proto::SymlinkNode { + name: p + .as_ref() + .file_name() + .unwrap_or_default() + .to_str() + .map(|s| Ok(s.to_owned())) + .unwrap_or(Err(Error::InvalidEncoding(p.as_ref().to_path_buf())))?, + target: target + .to_str() + .map(|s| Ok(s.to_owned())) + .unwrap_or(Err(Error::InvalidEncoding(p.as_ref().to_path_buf())))?, + })); + } + + let mut directories: HashMap<PathBuf, proto::Directory> = HashMap::default(); + + let mut directory_putter = directory_service.put_multiple_start(); + + for entry in WalkDir::new(p) + .follow_links(false) + .contents_first(true) + .sort_by_file_name() + { + let entry = entry.unwrap(); + + // process_entry wants an Option<Directory> in case the entry points to a directory. + // make sure to provide it. + let maybe_directory: Option<proto::Directory> = { + if entry.file_type().is_dir() { + Some( + directories + .entry(entry.path().to_path_buf()) + .or_default() + .clone(), + ) + } else { + None + } + }; + + let node = process_entry(blob_service, &mut directory_putter, &entry, maybe_directory)?; + + if entry.depth() == 0 { + return Ok(node); + } else { + // calculate the parent path, and make sure we register the node there. + // NOTE: entry.depth() > 0 + let parent_path = entry.path().parent().unwrap().to_path_buf(); + + // record node in parent directory, creating a new [proto:Directory] if not there yet. + let parent_directory = directories.entry(parent_path).or_default(); + match node { + proto::node::Node::Directory(e) => parent_directory.directories.push(e), + proto::node::Node::File(e) => parent_directory.files.push(e), + proto::node::Node::Symlink(e) => parent_directory.symlinks.push(e), + } + } + } + // unreachable, we already bailed out before if root doesn't exist. + panic!("tvix bug") +} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs new file mode 100644 index 000000000000..7ae8587f8b26 --- /dev/null +++ b/tvix/store/src/lib.rs @@ -0,0 +1,17 @@ +mod digests; +mod errors; +mod store_io; + +pub mod blobservice; +pub mod directoryservice; +pub mod import; +pub mod nar; +pub mod pathinfoservice; +pub mod proto; + +pub use digests::B3Digest; +pub use errors::Error; +pub use store_io::TvixStoreIO; + +#[cfg(test)] +mod tests; diff --git a/tvix/store/src/nar/grpc_nar_calculation_service.rs b/tvix/store/src/nar/grpc_nar_calculation_service.rs new file mode 100644 index 000000000000..429593743914 --- /dev/null +++ b/tvix/store/src/nar/grpc_nar_calculation_service.rs @@ -0,0 +1,69 @@ +use super::NARCalculationService; +use crate::proto; +use tonic::transport::Channel; +use tonic::Status; + +/// A NAR calculation service which asks a remote tvix-store for NAR calculation +/// (via the gRPC PathInfoService). +#[derive(Clone)] +pub struct GRPCNARCalculationService { + /// A handle into the active tokio runtime. Necessary to spawn tasks. + tokio_handle: tokio::runtime::Handle, + + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, +} + +impl GRPCNARCalculationService { + /// construct a new [GRPCNARCalculationService], by passing a handle to the + /// tokio runtime, and a gRPC client. 
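+ ///
+ /// The handle is what lets the synchronous [NARCalculationService] impl
+ /// below bridge onto async code, following the usual pattern (sketch,
+ /// assuming the runtime runs on other threads than the caller):
+ ///
+ /// ```ignore
+ /// let task = handle.spawn(async { /* gRPC call */ });
+ /// let resp = handle.block_on(task).unwrap();
+ /// ```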
+ pub fn new(
+ tokio_handle: tokio::runtime::Handle,
+ grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+ ) -> Self {
+ Self {
+ tokio_handle,
+ grpc_client,
+ }
+ }
+
+ /// construct a [GRPCNARCalculationService] from a [proto::path_info_service_client::PathInfoServiceClient<Channel>].
+ /// panics if called outside the context of a tokio runtime.
+ pub fn from_client(
+ grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+ ) -> Self {
+ Self {
+ tokio_handle: tokio::runtime::Handle::current(),
+ grpc_client,
+ }
+ }
+}
+
+impl NARCalculationService for GRPCNARCalculationService {
+ fn calculate_nar(
+ &self,
+ root_node: &proto::node::Node,
+ ) -> Result<(u64, [u8; 32]), super::RenderError> {
+ // Get a new handle to the gRPC client, and copy the root node.
+ let mut grpc_client = self.grpc_client.clone();
+ let root_node = root_node.clone();
+
+ let task: tokio::task::JoinHandle<Result<_, Status>> =
+ self.tokio_handle.spawn(async move {
+ Ok(grpc_client
+ .calculate_nar(proto::Node {
+ node: Some(root_node),
+ })
+ .await?
+ .into_inner())
+ });
+
+ match self.tokio_handle.block_on(task).unwrap() {
+ Ok(resp) => Ok((resp.nar_size, resp.nar_sha256.to_vec().try_into().unwrap())),
+ Err(e) => Err(super::RenderError::StoreError(crate::Error::StorageError(
+ e.to_string(),
+ ))),
+ }
+ }
+}
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
new file mode 100644
index 000000000000..a29cc5451bae
--- /dev/null
+++ b/tvix/store/src/nar/mod.rs
@@ -0,0 +1,35 @@
+use crate::{proto, B3Digest};
+use data_encoding::BASE64;
+use thiserror::Error;
+
+mod grpc_nar_calculation_service;
+mod non_caching_calculation_service;
+mod renderer;
+
+pub use grpc_nar_calculation_service::GRPCNARCalculationService;
+pub use non_caching_calculation_service::NonCachingNARCalculationService;
+pub use renderer::NARRenderer;
+
+/// Errors that can occur while rendering NARs.
+#[derive(Debug, Error)]
+pub enum RenderError {
+ #[error("failure talking to a backing store client: {0}")]
+ StoreError(crate::Error),
+
+ #[error("unable to find directory {}, referred from {}", .0, .1)]
+ DirectoryNotFound(B3Digest, String),
+
+ #[error("unable to find blob {}, referred from {}", BASE64.encode(.0), .1)]
+ BlobNotFound([u8; 32], String),
+
+ #[error("unexpected size in metadata for blob {}, referred from {}: expected {}, got {}", BASE64.encode(.0), .1, .2, .3)]
+ UnexpectedBlobMeta([u8; 32], String, u32, u32),
+
+ #[error("failure using the NAR writer: {0}")]
+ NARWriterError(std::io::Error),
+}
+
+/// The base trait for something calculating NARs, and returning their size and sha256.
+pub trait NARCalculationService {
+ fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), RenderError>;
+}
diff --git a/tvix/store/src/nar/non_caching_calculation_service.rs b/tvix/store/src/nar/non_caching_calculation_service.rs
new file mode 100644
index 000000000000..8a080cb4df5e
--- /dev/null
+++ b/tvix/store/src/nar/non_caching_calculation_service.rs
@@ -0,0 +1,37 @@
+use count_write::CountWrite;
+use sha2::{Digest, Sha256};
+
+use crate::blobservice::BlobService;
+use crate::directoryservice::DirectoryService;
+use crate::proto;
+
+use super::renderer::NARRenderer;
+use super::{NARCalculationService, RenderError};
+
+/// A NAR calculation service which simply renders the whole NAR whenever
+/// we ask for the calculation.
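+///
+/// Like any [NARCalculationService], it can be driven generically, e.g.
+/// (hypothetical caller):
+///
+/// ```ignore
+/// let (nar_size, nar_sha256) = service.calculate_nar(&root_node)?;
+/// println!("{} {}", nar_size, BASE64.encode(&nar_sha256));
+/// ```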
+#[derive(Clone)]
+pub struct NonCachingNARCalculationService<BS: BlobService, DS: DirectoryService> {
+ nar_renderer: NARRenderer<BS, DS>,
+}
+
+impl<BS: BlobService, DS: DirectoryService> NonCachingNARCalculationService<BS, DS> {
+ pub fn new(blob_service: BS, directory_service: DS) -> Self {
+ Self {
+ nar_renderer: NARRenderer::new(blob_service, directory_service),
+ }
+ }
+}
+
+impl<BS: BlobService, DS: DirectoryService> NARCalculationService
+ for NonCachingNARCalculationService<BS, DS>
+{
+ fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), RenderError> {
+ let h = Sha256::new();
+ let mut cw = CountWrite::from(h);
+
+ self.nar_renderer.write_nar(&mut cw, root_node)?;
+
+ Ok((cw.count(), cw.into_inner().finalize().into()))
+ }
+}
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
new file mode 100644
index 000000000000..c10f2ddf52fa
--- /dev/null
+++ b/tvix/store/src/nar/renderer.rs
@@ -0,0 +1,136 @@
+use super::RenderError;
+use crate::{
+ blobservice::BlobService,
+ directoryservice::DirectoryService,
+ proto::{self, NamedNode},
+ B3Digest,
+};
+use nix_compat::nar;
+use std::io::{self, BufReader};
+use tracing::warn;
+
+/// A NAR renderer, using a blob_service and directory_service
+/// to render a NAR to a writer.
+#[derive(Clone)]
+pub struct NARRenderer<BS: BlobService, DS: DirectoryService> {
+ blob_service: BS,
+ directory_service: DS,
+}
+
+impl<BS: BlobService, DS: DirectoryService> NARRenderer<BS, DS> {
+ pub fn new(blob_service: BS, directory_service: DS) -> Self {
+ Self {
+ blob_service,
+ directory_service,
+ }
+ }
+
+ /// Consumes a [proto::node::Node] pointing to the root of a (store) path,
+ /// and writes the contents in NAR serialization to the passed
+ /// [std::io::Write].
+ ///
+ /// It uses the different clients in the struct to perform the necessary
+ /// lookups as it traverses the structure.
+ pub fn write_nar<W: std::io::Write>(
+ &self,
+ w: &mut W,
+ proto_root_node: &proto::node::Node,
+ ) -> Result<(), RenderError> {
+ // Initialize NAR writer
+ let nar_root_node = nar::writer::open(w).map_err(RenderError::NARWriterError)?;
+
+ self.walk_node(nar_root_node, proto_root_node)
+ }
+
+ /// Process an intermediate node in the structure.
+ /// This consumes the node.
+ fn walk_node(
+ &self,
+ nar_node: nar::writer::Node,
+ proto_node: &proto::node::Node,
+ ) -> Result<(), RenderError> {
+ match proto_node {
+ proto::node::Node::Symlink(proto_symlink_node) => {
+ nar_node
+ .symlink(&proto_symlink_node.target)
+ .map_err(RenderError::NARWriterError)?;
+ }
+ proto::node::Node::File(proto_file_node) => {
+ let digest = B3Digest::from_vec(proto_file_node.digest.clone()).map_err(|_e| {
+ warn!(
+ file_node = ?proto_file_node,
+ "invalid digest length in file node",
+ );
+
+ RenderError::StoreError(crate::Error::StorageError(
+ "invalid digest len in file node".to_string(),
+ ))
+ })?;
+
+ let mut blob_reader = match self
+ .blob_service
+ .open_read(&digest)
+ .map_err(RenderError::StoreError)?
+ {
+ Some(blob_reader) => Ok(BufReader::new(blob_reader)),
+ None => Err(RenderError::NARWriterError(io::Error::new(
+ io::ErrorKind::NotFound,
+ format!("blob with digest {} not found", &digest),
+ ))),
+ }?;
+
+ nar_node
+ .file(
+ proto_file_node.executable,
+ proto_file_node.size.into(),
+ &mut blob_reader,
+ )
+ .map_err(RenderError::NARWriterError)?;
+ }
+ proto::node::Node::Directory(proto_directory_node) => {
+ let digest =
+ B3Digest::from_vec(proto_directory_node.digest.to_vec()).map_err(|_e| {
+ RenderError::StoreError(crate::Error::StorageError(
+ "invalid digest len in directory node".to_string(),
+ ))
+ })?;
+
+ // look it up with the directory service
+ let resp = self
+ .directory_service
+ .get(&digest)
+ .map_err(RenderError::StoreError)?;
+
+ match resp {
+ // if it's None, that's an error!
+ None => {
+ return Err(RenderError::DirectoryNotFound(
+ digest,
+ proto_directory_node.name.to_owned(),
+ ))
+ }
+ Some(proto_directory) => {
+ // start a directory node
+ let mut nar_node_directory =
+ nar_node.directory().map_err(RenderError::NARWriterError)?;
+
+ // for each node in the directory, create a new entry with its name,
+ // and then invoke walk_node on that entry.
+ for proto_node in proto_directory.nodes() {
+ let child_node = nar_node_directory
+ .entry(proto_node.get_name())
+ .map_err(RenderError::NARWriterError)?;
+ self.walk_node(child_node, &proto_node)?;
+ }
+
+ // close the directory
+ nar_node_directory
+ .close()
+ .map_err(RenderError::NARWriterError)?;
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+}
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
new file mode 100644
index 000000000000..6bb774c668a3
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -0,0 +1,81 @@
+use super::PathInfoService;
+use crate::proto;
+use tonic::{transport::Channel, Code, Status};
+
+/// Connects to a (remote) tvix-store PathInfoService over gRPC.
+#[derive(Clone)]
+pub struct GRPCPathInfoService {
+ /// A handle into the active tokio runtime. Necessary to spawn tasks.
+ tokio_handle: tokio::runtime::Handle,
+
+ /// The internal reference to a gRPC client.
+ /// Cloning it is cheap, and it internally handles concurrent requests.
+ grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+}
+
+impl GRPCPathInfoService {
+ /// Construct a new [GRPCPathInfoService], by passing a handle to the tokio
+ /// runtime, and a gRPC client.
+ pub fn new(
+ tokio_handle: tokio::runtime::Handle,
+ grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+ ) -> Self {
+ Self {
+ tokio_handle,
+ grpc_client,
+ }
+ }
+
+ /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient<Channel>].
+ /// panics if called outside the context of a tokio runtime.
+ pub fn from_client(
+ grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+ ) -> Self {
+ Self {
+ tokio_handle: tokio::runtime::Handle::current(),
+ grpc_client,
+ }
+ }
+}
+
+impl PathInfoService for GRPCPathInfoService {
+ fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, crate::Error> {
+ // Get a new handle to the gRPC client.
+ let mut grpc_client = self.grpc_client.clone();
+
+ let task: tokio::task::JoinHandle<Result<proto::PathInfo, Status>> =
+ self.tokio_handle.spawn(async move {
+ let path_info = grpc_client
+ .get(proto::GetPathInfoRequest {
+ by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
+ digest.to_vec(),
+ )),
+ })
+ .await?
+ .into_inner(); + + Ok(path_info) + }); + + match self.tokio_handle.block_on(task)? { + Ok(path_info) => Ok(Some(path_info)), + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, crate::Error> { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + + let task: tokio::task::JoinHandle<Result<proto::PathInfo, Status>> = + self.tokio_handle.spawn(async move { + let path_info = grpc_client.put(path_info).await?.into_inner(); + Ok(path_info) + }); + + self.tokio_handle + .block_on(task)? + .map_err(|e| crate::Error::StorageError(e.to_string())) + } +} diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs new file mode 100644 index 000000000000..d0ff1976efab --- /dev/null +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -0,0 +1,41 @@ +use super::PathInfoService; +use crate::{proto, Error}; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +#[derive(Default)] +pub struct MemoryPathInfoService { + db: Arc<RwLock<HashMap<[u8; 20], proto::PathInfo>>>, +} + +impl PathInfoService for MemoryPathInfoService { + fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { + let db = self.db.read().unwrap(); + + match db.get(&digest) { + None => Ok(None), + Some(path_info) => Ok(Some(path_info.clone())), + } + } + + fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> { + // Call validate on the received PathInfo message. + match path_info.validate() { + Err(e) => Err(Error::InvalidRequest(format!( + "failed to validate PathInfo: {}", + e + ))), + + // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. + // This overwrites existing PathInfo objects. + Ok(nix_path) => { + let mut db = self.db.write().unwrap(); + db.insert(nix_path.digest, path_info.clone()); + + Ok(path_info) + } + } + } +} diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs new file mode 100644 index 000000000000..6a34e09af478 --- /dev/null +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -0,0 +1,20 @@ +mod grpc; +mod memory; +mod sled; + +use crate::{proto, Error}; + +pub use self::grpc::GRPCPathInfoService; +pub use self::memory::MemoryPathInfoService; +pub use self::sled::SledPathInfoService; + +/// The base trait all PathInfo services need to implement. +/// This is a simple get and put of [proto::Directory], returning their digest. +pub trait PathInfoService { + /// Retrieve a PathInfo message by the output digest. + fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error>; + + /// Store a PathInfo message. Implementations MUST call validate and reject + /// invalid messages. + fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error>; +} diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs new file mode 100644 index 000000000000..8776ebcbc106 --- /dev/null +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -0,0 +1,76 @@ +use super::PathInfoService; +use crate::{proto, Error}; +use prost::Message; +use std::path::PathBuf; +use tracing::warn; + +/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). +/// +/// The PathInfo messages are stored as encoded protos, and keyed by their output hash, +/// as that's currently the only request type available. 
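+///
+/// A round-trip sketch (hypothetical usage; the key is the store path digest
+/// that [proto::PathInfo::validate] returns):
+///
+/// ```ignore
+/// let svc = SledPathInfoService::new_temporary()?;
+/// let stored = svc.put(path_info)?; // validates, keys by output hash
+/// let store_path = stored.validate().expect("validated on put");
+/// assert_eq!(Some(stored), svc.get(store_path.digest)?);
+/// ```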
+#[derive(Clone)] +pub struct SledPathInfoService { + db: sled::Db, +} + +impl SledPathInfoService { + pub fn new(p: PathBuf) -> Result<Self, sled::Error> { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { db }) + } + + pub fn new_temporary() -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { db }) + } +} + +impl PathInfoService for SledPathInfoService { + fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { + match self.db.get(digest) { + Ok(None) => Ok(None), + Ok(Some(data)) => match proto::PathInfo::decode(&*data) { + Ok(path_info) => Ok(Some(path_info)), + Err(e) => { + warn!("failed to decode stored PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to decode stored PathInfo: {}", + e + ))) + } + }, + Err(e) => { + warn!("failed to retrieve PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to retrieve PathInfo: {}", + e + ))) + } + } + } + + fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> { + // Call validate on the received PathInfo message. + match path_info.validate() { + Err(e) => Err(Error::InvalidRequest(format!( + "failed to validate PathInfo: {}", + e + ))), + // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. + // This overwrites existing PathInfo objects. + Ok(nix_path) => match self.db.insert(nix_path.digest, path_info.encode_to_vec()) { + Ok(_) => Ok(path_info), + Err(e) => { + warn!("failed to insert PathInfo: {}", e); + Err(Error::StorageError(format! { + "failed to insert PathInfo: {}", e + })) + } + }, + } + } +} diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs new file mode 100644 index 000000000000..3ec1d68872c7 --- /dev/null +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -0,0 +1,130 @@ +use crate::{ + blobservice::{BlobService, BlobWriter}, + proto::sync_read_into_async_read::SyncReadIntoAsyncRead, + B3Digest, +}; +use std::{collections::VecDeque, io, pin::Pin}; +use tokio::task; +use tokio_stream::StreamExt; +use tokio_util::io::ReaderStream; +use tonic::{async_trait, Request, Response, Status, Streaming}; +use tracing::{instrument, warn}; + +pub struct GRPCBlobServiceWrapper<BS: BlobService> { + blob_service: BS, +} + +impl<BS: BlobService> From<BS> for GRPCBlobServiceWrapper<BS> { + fn from(value: BS) -> Self { + Self { + blob_service: value, + } + } +} + +#[async_trait] +impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server::BlobService + for GRPCBlobServiceWrapper<BS> +{ + // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 + type ReadStream = + Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>; + + #[instrument(skip(self))] + async fn stat( + &self, + request: Request<super::StatBlobRequest>, + ) -> Result<Response<super::BlobMeta>, Status> { + let rq = request.into_inner(); + let req_digest = B3Digest::from_vec(rq.digest) + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + if rq.include_chunks || rq.include_bao { + return Err(Status::internal("not implemented")); + } + + match self.blob_service.has(&req_digest) { + Ok(true) => Ok(Response::new(super::BlobMeta::default())), + Ok(false) => Err(Status::not_found(format!("blob {} not found", &req_digest))), + Err(e) => Err(e.into()), + } + } + + #[instrument(skip(self))] + 
async fn read( + &self, + request: Request<super::ReadBlobRequest>, + ) -> Result<Response<Self::ReadStream>, Status> { + let rq = request.into_inner(); + + let req_digest = B3Digest::from_vec(rq.digest) + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + match self.blob_service.open_read(&req_digest) { + Ok(Some(reader)) => { + let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into(); + + fn stream_mapper( + x: Result<bytes::Bytes, io::Error>, + ) -> Result<super::BlobChunk, Status> { + match x { + Ok(bytes) => Ok(super::BlobChunk { + data: bytes.to_vec(), + }), + Err(e) => Err(Status::from(e)), + } + } + + let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper); + Ok(Response::new(Box::pin(chunks_stream))) + } + Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), + Err(e) => Err(e.into()), + } + } + + #[instrument(skip(self))] + async fn put( + &self, + request: Request<Streaming<super::BlobChunk>>, + ) -> Result<Response<super::PutBlobResponse>, Status> { + let req_inner = request.into_inner(); + + let data_stream = req_inner.map(|x| { + x.map(|x| VecDeque::from(x.data)) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) + }); + + let data_reader = tokio_util::io::StreamReader::new(data_stream); + + // prepare a writer, which we'll use in the blocking task below. + let mut writer = self + .blob_service + .open_write() + .map_err(|e| Status::internal(format!("unable to open for write: {}", e)))?; + + let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> { + // construct a sync reader to the data + let mut reader = tokio_util::io::SyncIoBridge::new(data_reader); + + io::copy(&mut reader, &mut writer).map_err(|e| { + warn!("error copying: {}", e); + Status::internal("error copying") + })?; + + let digest = writer + .close() + .map_err(|e| { + warn!("error closing stream: {}", e); + Status::internal("error closing stream") + })? 
+ .to_vec(); + + Ok(super::PutBlobResponse { digest }) + }) + .await + .map_err(|_| Status::internal("failed to wait for task"))??; + + Ok(Response::new(result)) + } +} diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs new file mode 100644 index 000000000000..6d2df310137f --- /dev/null +++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs @@ -0,0 +1,177 @@ +use crate::proto; +use crate::{directoryservice::DirectoryService, B3Digest}; +use std::collections::HashMap; +use tokio::{sync::mpsc::channel, task}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{async_trait, Request, Response, Status, Streaming}; +use tracing::{debug, instrument, warn}; + +pub struct GRPCDirectoryServiceWrapper<C: DirectoryService> { + directory_service: C, +} + +impl<DS: DirectoryService> From<DS> for GRPCDirectoryServiceWrapper<DS> { + fn from(value: DS) -> Self { + Self { + directory_service: value, + } + } +} + +#[async_trait] +impl<DS: DirectoryService + Send + Sync + Clone + 'static> + proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<DS> +{ + type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>; + + #[instrument(skip(self))] + async fn get( + &self, + request: Request<proto::GetDirectoryRequest>, + ) -> Result<Response<Self::GetStream>, Status> { + let (tx, rx) = channel(5); + + let req_inner = request.into_inner(); + + let directory_service = self.directory_service.clone(); + + let _task = { + // look at the digest in the request and put it in the top of the queue. + match &req_inner.by_what { + None => return Err(Status::invalid_argument("by_what needs to be specified")), + Some(proto::get_directory_request::ByWhat::Digest(digest)) => { + let digest = B3Digest::from_vec(digest.to_vec()) + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + task::spawn(async move { + if !req_inner.recursive { + let e: Result<proto::Directory, Status> = + match directory_service.get(&digest) { + Ok(Some(directory)) => Ok(directory), + Ok(None) => Err(Status::not_found(format!( + "directory {} not found", + digest + ))), + Err(e) => Err(e.into()), + }; + + if tx.send(e).await.is_err() { + debug!("receiver dropped"); + } + } else { + // If recursive was requested, traverse via get_recursive. + let directories_it = directory_service.get_recursive(&digest); + + for e in directories_it { + // map err in res from Error to Status + let res = e.map_err(|e| Status::internal(e.to_string())); + if tx.send(res).await.is_err() { + debug!("receiver dropped"); + break; + } + } + } + }); + } + } + }; + + let receiver_stream = ReceiverStream::new(rx); + Ok(Response::new(receiver_stream)) + } + + #[instrument(skip(self, request))] + async fn put( + &self, + request: Request<Streaming<proto::Directory>>, + ) -> Result<Response<proto::PutDirectoryResponse>, Status> { + let mut req_inner = request.into_inner(); + // TODO: let this use DirectoryPutter to the store it's connected to, + // and move the validation logic into [SimplePutter]. + + // This keeps track of the seen directory keys, and their size. + // This is used to validate the size field of a reference to a previously sent directory. + // We don't need to keep the contents around, they're stored in the DB. + let mut seen_directories_sizes: HashMap<B3Digest, u32> = HashMap::new(); + let mut last_directory_dgst: Option<B3Digest> = None; + + // Consume directories, and insert them into the store. 
+ // Reject directory messages that refer to Directories not sent in the same stream.
+ while let Some(directory) = req_inner.message().await? {
+ // validate the directory itself.
+ if let Err(e) = directory.validate() {
+ return Err(Status::invalid_argument(format!(
+ "directory {} failed validation: {}",
+ directory.digest(),
+ e,
+ )));
+ }
+
+ // for each child directory this directory refers to, we need
+ // to ensure it has been seen already in this stream, and that the size
+ // matches what we recorded.
+ for child_directory in &directory.directories {
+ let child_directory_digest = B3Digest::from_vec(child_directory.digest.to_vec())
+ .map_err(|_e| Status::internal("invalid child directory digest len"))?;
+
+ match seen_directories_sizes.get(&child_directory_digest) {
+ None => {
+ return Err(Status::invalid_argument(format!(
+ "child directory '{}' ({}) in directory '{}' not seen yet",
+ child_directory.name,
+ &child_directory_digest,
+ &directory.digest(),
+ )));
+ }
+ Some(seen_child_directory_size) => {
+ if seen_child_directory_size != &child_directory.size {
+ return Err(Status::invalid_argument(format!(
+ "child directory '{}' ({}) in directory '{}' referred to with wrong size, expected {}, actual {}",
+ child_directory.name,
+ &child_directory_digest,
+ &directory.digest(),
+ seen_child_directory_size,
+ child_directory.size,
+ )));
+ }
+ }
+ }
+ }
+
+ // NOTE: We can't know if a directory we're receiving actually is
+ // part of the closure, because we receive directories from the leaf nodes up to
+ // the root.
+ // The only thing we could do would be a final check once the
+ // last Directory was received, that all Directories received so far are
+ // reachable from that (root) node.
+
+ let dgst = directory.digest();
+ seen_directories_sizes.insert(dgst.clone(), directory.size());
+ last_directory_dgst = Some(dgst.clone());
+
+ // check if the directory already exists in the database. We can skip
+ // inserting if it's already there, as that'd be a no-op.
+ match self.directory_service.get(&dgst) {
+ Err(e) => {
+ warn!("error checking if directory already exists: {}", e);
+ return Err(e.into());
+ }
+ // skip if already exists
+ Ok(Some(_)) => {}
+ // insert if it doesn't already exist
+ Ok(None) => {
+ self.directory_service.put(directory)?;
+ }
+ }
+ }
+
+ // We're done receiving. Peek at last_directory_dgst and either return the
+ // digest, or an error if we received an empty stream.
+ match last_directory_dgst {
+ None => Err(Status::invalid_argument("no directories received")),
+ Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse {
+ root_digest: last_directory_dgst.to_vec(),
+ })),
+ }
+ }
+}
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
new file mode 100644
index 000000000000..e82557b3a06c
--- /dev/null
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -0,0 +1,93 @@
+use crate::nar::RenderError;
+use crate::proto;
+use crate::{nar::NARCalculationService, pathinfoservice::PathInfoService};
+use tonic::{async_trait, Request, Response, Result, Status};
+use tracing::{instrument, warn};
+
+pub struct GRPCPathInfoServiceWrapper<PS: PathInfoService, NS: NARCalculationService> {
+ path_info_service: PS,
+ nar_calculation_service: NS,
+}
+
+impl<PS: PathInfoService, NS: NARCalculationService> GRPCPathInfoServiceWrapper<PS, NS> {
+ pub fn new(path_info_service: PS, nar_calculation_service: NS) -> Self {
+ Self {
+ path_info_service,
+ nar_calculation_service,
+ }
+ }
+}
+
+#[async_trait]
+impl<
+ PS: PathInfoService + Send + Sync + 'static,
+ NS: NARCalculationService + Send + Sync + 'static,
+ > proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS, NS>
+{
+ #[instrument(skip(self))]
+ async fn get(
+ &self,
+ request: Request<proto::GetPathInfoRequest>,
+ ) -> Result<Response<proto::PathInfo>> {
+ match request.into_inner().by_what {
+ None => Err(Status::unimplemented("by_what needs to be specified")),
+ Some(proto::get_path_info_request::ByWhat::ByOutputHash(digest)) => {
+ let digest: [u8; 20] = digest
+ .try_into()
+ .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
+ match self.path_info_service.get(digest) {
+ Ok(None) => Err(Status::not_found("PathInfo not found")),
+ Ok(Some(path_info)) => Ok(Response::new(path_info)),
+ Err(e) => {
+ warn!("failed to retrieve PathInfo: {}", e);
+ Err(e.into())
+ }
+ }
+ }
+ }
+ }
+
+ #[instrument(skip(self))]
+ async fn put(&self, request: Request<proto::PathInfo>) -> Result<Response<proto::PathInfo>> {
+ let path_info = request.into_inner();
+
+ // Store the PathInfo in the underlying PathInfoService. Implementations
+ // MUST validate the data they receive, so we don't validate additionally here.
+ match self.path_info_service.put(path_info) { + Ok(path_info_new) => Ok(Response::new(path_info_new)), + Err(e) => { + warn!("failed to insert PathInfo: {}", e); + Err(e.into()) + } + } + } + + #[instrument(skip(self))] + async fn calculate_nar( + &self, + request: Request<proto::Node>, + ) -> Result<Response<proto::CalculateNarResponse>> { + match request.into_inner().node { + None => Err(Status::invalid_argument("no root node sent")), + Some(root_node) => match self.nar_calculation_service.calculate_nar(&root_node) { + Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse { + nar_size, + nar_sha256: nar_sha256.to_vec(), + })), + Err(e) => Err(e.into()), + }, + } + } +} + +impl From<RenderError> for tonic::Status { + fn from(value: RenderError) -> Self { + match value { + RenderError::BlobNotFound(_, _) => Self::not_found(value.to_string()), + RenderError::DirectoryNotFound(_, _) => Self::not_found(value.to_string()), + RenderError::NARWriterError(_) => Self::internal(value.to_string()), + RenderError::StoreError(_) => Self::internal(value.to_string()), + RenderError::UnexpectedBlobMeta(_, _, _, _) => Self::internal(value.to_string()), + } + } +} diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs new file mode 100644 index 000000000000..4db0b9731edc --- /dev/null +++ b/tvix/store/src/proto/mod.rs @@ -0,0 +1,370 @@ +#![allow(clippy::derive_partial_eq_without_eq)] +// https://github.com/hyperium/tonic/issues/1056 +use std::{collections::HashSet, iter::Peekable}; +use thiserror::Error; + +use prost::Message; + +use nix_compat::store_path::{self, StorePath}; + +mod grpc_blobservice_wrapper; +mod grpc_directoryservice_wrapper; +mod grpc_pathinfoservice_wrapper; + +mod sync_read_into_async_read; + +pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; +pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; +pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper; + +use crate::B3Digest; + +tonic::include_proto!("tvix.store.v1"); + +#[cfg(feature = "reflection")] +/// Compiled file descriptors for implementing [gRPC +/// reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) with e.g. +/// [`tonic_reflection`](https://docs.rs/tonic-reflection). +pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix.store.v1"); + +#[cfg(test)] +mod tests; + +/// Errors that can occur during the validation of Directory messages. +#[derive(Debug, PartialEq, Eq, Error)] +pub enum ValidateDirectoryError { + /// Elements are not in sorted order + #[error("{0} is not sorted")] + WrongSorting(String), + /// Multiple elements with the same name encountered + #[error("{0} is a duplicate name")] + DuplicateName(String), + /// Invalid name encountered + #[error("Invalid name in {0}")] + InvalidName(String), + /// Invalid digest length encountered + #[error("Invalid Digest length: {0}")] + InvalidDigestLen(usize), +} + +/// Errors that can occur during the validation of PathInfo messages. +#[derive(Debug, Error, PartialEq)] +pub enum ValidatePathInfoError { + /// No node present + #[error("No node present")] + NoNodePresent(), + + /// Invalid node name encountered. + #[error("Failed to parse {0} as StorePath: {1}")] + InvalidNodeName(String, store_path::Error), + + /// The digest the (root) node refers to has invalid length. 
+ #[error("Invalid Digest length: {0}")]
+ InvalidDigestLen(usize),
+
+ /// The number of references in the narinfo.reference_names field does not match
+ /// the number of references in the .references field.
+ #[error("Inconsistent Number of References: {1} (references) vs {0} (narinfo)")]
+ InconsistentNumberOfReferences(usize, usize),
+}
+
+/// Checks a Node name for validity as an intermediate node, and returns an
+/// error that's generated from the supplied constructor.
+///
+/// We disallow slashes, null bytes, '.', '..' and the empty string.
+fn validate_node_name<E>(name: &str, err: fn(String) -> E) -> Result<(), E> {
+ if name.is_empty() || name == ".." || name == "." || name.contains('\x00') || name.contains('/')
+ {
+ return Err(err(name.to_string()));
+ }
+ Ok(())
+}
+
+/// Checks a digest for validity.
+/// Digests are 32 bytes long, as we store blake3 digests.
+fn validate_digest<E>(digest: &Vec<u8>, err: fn(usize) -> E) -> Result<(), E> {
+ if digest.len() != 32 {
+ return Err(err(digest.len()));
+ }
+ Ok(())
+}
+
+/// Parses a root node name.
+///
+/// On success, this returns the parsed [StorePath].
+/// On error, it returns an error generated from the supplied constructor.
+fn parse_node_name_root<E>(
+ name: &str,
+ err: fn(String, store_path::Error) -> E,
+) -> Result<StorePath, E> {
+ match StorePath::from_string(name) {
+ Ok(np) => Ok(np),
+ Err(e) => Err(err(name.to_string(), e)),
+ }
+}
+
+impl PathInfo {
+ /// validate performs some checks on the PathInfo struct,
+ /// returning either a [StorePath] of the root node, or a
+ /// [ValidatePathInfoError].
+ pub fn validate(&self) -> Result<StorePath, ValidatePathInfoError> {
+ // If there is a narinfo field populated, ensure the number of references there
+ // matches PathInfo.references count.
+ if let Some(narinfo) = &self.narinfo {
+ if narinfo.reference_names.len() != self.references.len() {
+ return Err(ValidatePathInfoError::InconsistentNumberOfReferences(
+ narinfo.reference_names.len(),
+ self.references.len(),
+ ));
+ }
+ }
+ // FUTUREWORK: parse references in reference_names. ensure they start
+ // with storeDir, and use the same digest as in self.references.
+
+ // Ensure there is a (root) node present, and it properly parses to a [StorePath].
+ let root_nix_path = match &self.node {
+ None => {
+ return Err(ValidatePathInfoError::NoNodePresent());
+ }
+ Some(Node { node }) => match node {
+ None => {
+ return Err(ValidatePathInfoError::NoNodePresent());
+ }
+ Some(node::Node::Directory(directory_node)) => {
+ // ensure the digest has the appropriate size.
+ validate_digest(
+ &directory_node.digest,
+ ValidatePathInfoError::InvalidDigestLen,
+ )?;
+
+ // parse the name
+ parse_node_name_root(
+ &directory_node.name,
+ ValidatePathInfoError::InvalidNodeName,
+ )?
+ }
+ Some(node::Node::File(file_node)) => {
+ // ensure the digest has the appropriate size.
+ validate_digest(&file_node.digest, ValidatePathInfoError::InvalidDigestLen)?;
+
+ // parse the name
+ parse_node_name_root(&file_node.name, ValidatePathInfoError::InvalidNodeName)?
+ }
+ Some(node::Node::Symlink(symlink_node)) => {
+ // parse the name
+ parse_node_name_root(
+ &symlink_node.name,
+ ValidatePathInfoError::InvalidNodeName,
+ )?
+ }
+ },
+ };
+
+ // return the root nix path
+ Ok(root_nix_path)
+ }
+}
+
+/// NamedNode is implemented for [FileNode], [DirectoryNode], [SymlinkNode]
+/// and [node::Node], so we can ask all of them for the name easily.
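+///
+/// For illustration (hypothetical caller, generic over the node variant):
+///
+/// ```ignore
+/// fn print_name(n: &node::Node) {
+///     println!("{}", n.get_name());
+/// }
+/// ```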
+pub trait NamedNode { + fn get_name(&self) -> &str; +} + +impl NamedNode for &FileNode { + fn get_name(&self) -> &str { + self.name.as_str() + } +} + +impl NamedNode for &DirectoryNode { + fn get_name(&self) -> &str { + self.name.as_str() + } +} + +impl NamedNode for &SymlinkNode { + fn get_name(&self) -> &str { + self.name.as_str() + } +} + +impl NamedNode for node::Node { + fn get_name(&self) -> &str { + match self { + node::Node::File(node_file) => &node_file.name, + node::Node::Directory(node_directory) => &node_directory.name, + node::Node::Symlink(node_symlink) => &node_symlink.name, + } + } +} + +/// Accepts a name, and a mutable reference to the previous name. +/// If the passed name is larger than the previous one, the reference is updated. +/// If it's not, an error is returned. +fn update_if_lt_prev<'n>( + prev_name: &mut &'n str, + name: &'n str, +) -> Result<(), ValidateDirectoryError> { + if *name < **prev_name { + return Err(ValidateDirectoryError::WrongSorting(name.to_string())); + } + *prev_name = name; + Ok(()) +} + +/// Inserts the given name into a HashSet if it's not already in there. +/// If it is, an error is returned. +fn insert_once<'n>( + seen_names: &mut HashSet<&'n str>, + name: &'n str, +) -> Result<(), ValidateDirectoryError> { + if seen_names.get(name).is_some() { + return Err(ValidateDirectoryError::DuplicateName(name.to_string())); + } + seen_names.insert(name); + Ok(()) +} + +impl Directory { + /// The size of a directory is the number of all regular and symlink elements, + /// the number of directory elements, and their size fields. + pub fn size(&self) -> u32 { + self.files.len() as u32 + + self.symlinks.len() as u32 + + self + .directories + .iter() + .fold(0, |acc: u32, e| (acc + 1 + e.size)) + } + + /// Calculates the digest of a Directory, which is the blake3 hash of a + /// Directory protobuf message, serialized in protobuf canonical form. 
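+ ///
+ /// For the empty Directory this yields a constant digest (the concrete
+ /// bytes are asserted in the unit tests below), e.g.:
+ ///
+ /// ```ignore
+ /// let digest = Directory::default().digest();
+ /// ```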
+ pub fn digest(&self) -> B3Digest { + let mut hasher = blake3::Hasher::new(); + + let vec = hasher + .update(&self.encode_to_vec()) + .finalize() + .as_bytes() + .to_vec(); + B3Digest::from_vec(vec).unwrap() + } + + /// validate checks the directory for invalid data, such as: + /// - violations of name restrictions + /// - invalid digest lengths + /// - not properly sorted lists + /// - duplicate names in the three lists + pub fn validate(&self) -> Result<(), ValidateDirectoryError> { + let mut seen_names: HashSet<&str> = HashSet::new(); + + let mut last_directory_name: &str = ""; + let mut last_file_name: &str = ""; + let mut last_symlink_name: &str = ""; + + // check directories + for directory_node in &self.directories { + validate_node_name(&directory_node.name, ValidateDirectoryError::InvalidName)?; + validate_digest( + &directory_node.digest, + ValidateDirectoryError::InvalidDigestLen, + )?; + + update_if_lt_prev(&mut last_directory_name, directory_node.name.as_str())?; + insert_once(&mut seen_names, directory_node.name.as_str())?; + } + + // check files + for file_node in &self.files { + validate_node_name(&file_node.name, ValidateDirectoryError::InvalidName)?; + validate_digest(&file_node.digest, ValidateDirectoryError::InvalidDigestLen)?; + + update_if_lt_prev(&mut last_file_name, file_node.name.as_str())?; + insert_once(&mut seen_names, file_node.name.as_str())?; + } + + // check symlinks + for symlink_node in &self.symlinks { + validate_node_name(&symlink_node.name, ValidateDirectoryError::InvalidName)?; + + update_if_lt_prev(&mut last_symlink_name, symlink_node.name.as_str())?; + insert_once(&mut seen_names, symlink_node.name.as_str())?; + } + + Ok(()) + } + + /// Allows iterating over all three nodes ([DirectoryNode], [FileNode], + /// [SymlinkNode]) in an ordered fashion, as long as the individual lists + /// are sorted (which can be checked by the [Directory::validate]). + pub fn nodes(&self) -> DirectoryNodesIterator { + return DirectoryNodesIterator { + i_directories: self.directories.iter().peekable(), + i_files: self.files.iter().peekable(), + i_symlinks: self.symlinks.iter().peekable(), + }; + } +} + +/// Struct to hold the state of an iterator over all nodes of a Directory. +/// +/// Internally, this keeps peekable Iterators over all three lists of a +/// Directory message. +pub struct DirectoryNodesIterator<'a> { + // directory: &Directory, + i_directories: Peekable<std::slice::Iter<'a, DirectoryNode>>, + i_files: Peekable<std::slice::Iter<'a, FileNode>>, + i_symlinks: Peekable<std::slice::Iter<'a, SymlinkNode>>, +} + +/// looks at two elements implementing NamedNode, and returns true if "left +/// is smaller / comes first". +/// +/// Some(_) is preferred over None. +fn left_name_lt_right<A: NamedNode, B: NamedNode>(left: Option<&A>, right: Option<&B>) -> bool { + match left { + // if left is None, right always wins + None => false, + Some(left_inner) => { + // left is Some. + match right { + // left is Some, right is None - left wins. + None => true, + Some(right_inner) => { + // both are Some - compare the name. + return left_inner.get_name() < right_inner.get_name(); + } + } + } + } +} + +impl Iterator for DirectoryNodesIterator<'_> { + type Item = node::Node; + + // next returns the next node in the Directory. + // we peek at all three internal iterators, and pick the one with the + // smallest name, to ensure lexicographical ordering. + // The individual lists are already known to be sorted. 
+ fn next(&mut self) -> Option<Self::Item> { + if left_name_lt_right(self.i_directories.peek(), self.i_files.peek()) { + // i_directories is still in the game, compare with symlinks + if left_name_lt_right(self.i_directories.peek(), self.i_symlinks.peek()) { + self.i_directories + .next() + .cloned() + .map(node::Node::Directory) + } else { + self.i_symlinks.next().cloned().map(node::Node::Symlink) + } + } else { + // i_files is still in the game, compare with symlinks + if left_name_lt_right(self.i_files.peek(), self.i_symlinks.peek()) { + self.i_files.next().cloned().map(node::Node::File) + } else { + self.i_symlinks.next().cloned().map(node::Node::Symlink) + } + } + } +} diff --git a/tvix/store/src/proto/sync_read_into_async_read.rs b/tvix/store/src/proto/sync_read_into_async_read.rs new file mode 100644 index 000000000000..0a0ef019781c --- /dev/null +++ b/tvix/store/src/proto/sync_read_into_async_read.rs @@ -0,0 +1,158 @@ +use bytes::Buf; +use core::task::Poll::Ready; +use futures::ready; +use futures::Future; +use std::io; +use std::io::Read; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use tokio::io::AsyncRead; +use tokio::runtime::Handle; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; + +#[derive(Debug)] +enum State<Buf: bytes::Buf + bytes::BufMut> { + Idle(Option<Buf>), + Busy(JoinHandle<(io::Result<usize>, Buf)>), +} + +use State::{Busy, Idle}; + +/// Use a [`SyncReadIntoAsyncRead`] to asynchronously read from a +/// synchronous API. +#[derive(Debug)] +pub struct SyncReadIntoAsyncRead<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> { + state: Mutex<State<Buf>>, + reader: Arc<Mutex<R>>, + rt: Handle, +} + +impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> SyncReadIntoAsyncRead<R, Buf> { + /// This must be called from within a Tokio runtime context, or else it will panic. + #[track_caller] + pub fn new(rt: Handle, reader: R) -> Self { + Self { + rt, + state: State::Idle(None).into(), + reader: Arc::new(reader.into()), + } + } + + /// This must be called from within a Tokio runtime context, or else it will panic. + pub fn new_with_reader(readable: R) -> Self { + Self::new(Handle::current(), readable) + } +} + +/// Repeats operations that are interrupted. +macro_rules! uninterruptibly { + ($e:expr) => {{ + loop { + match $e { + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + res => break res, + } + } + }}; +} + +impl< + R: Read + Send + 'static + std::marker::Unpin, + Buf: bytes::Buf + bytes::BufMut + Send + Default + std::marker::Unpin + 'static, + > AsyncRead for SyncReadIntoAsyncRead<R, Buf> +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + dst: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + let me = self.get_mut(); + // Do we need this mutex? + let state = me.state.get_mut(); + + loop { + match state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap_or_default(); + + if buf.has_remaining() { + // Here, we will split the `buf` into `[..dst.remaining()... ; rest ]` + // The `rest` is stuffed into the `buf_cell` for further poll_read. + // The other is completely consumed into the unfilled destination. + // `rest` can be empty. 
+ let mut adjusted_src = + buf.copy_to_bytes(std::cmp::min(buf.remaining(), dst.remaining())); + let copied_size = adjusted_src.remaining(); + adjusted_src.copy_to_slice(dst.initialize_unfilled_to(copied_size)); + dst.set_filled(copied_size); + *buf_cell = Some(buf); + return Ready(Ok(())); + } + + let reader = me.reader.clone(); + *state = Busy(me.rt.spawn_blocking(move || { + let result = uninterruptibly!(reader.blocking_lock().read( + // SAFETY: `reader.read` will *ONLY* write initialized bytes + // and never *READ* uninitialized bytes + // inside this buffer. + // + // Furthermore, casting the slice as `*mut [u8]` + // is safe because it has the same layout. + // + // Finally, the pointer obtained is valid and owned + // by `buf` only as we have a valid mutable reference + // to it, it is valid for write. + // + // Here, we copy an nightly API: https://doc.rust-lang.org/stable/src/core/mem/maybe_uninit.rs.html#994-998 + unsafe { + &mut *(buf.chunk_mut().as_uninit_slice_mut() + as *mut [std::mem::MaybeUninit<u8>] + as *mut [u8]) + } + )); + + if let Ok(n) = result { + // SAFETY: given we initialize `n` bytes, we can move `n` bytes + // forward. + unsafe { + buf.advance_mut(n); + } + } + + (result, buf) + })); + } + Busy(ref mut rx) => { + let (result, mut buf) = ready!(Pin::new(rx).poll(cx))?; + + match result { + Ok(n) => { + if n > 0 { + let remaining = std::cmp::min(n, dst.remaining()); + let mut adjusted_src = buf.copy_to_bytes(remaining); + adjusted_src.copy_to_slice(dst.initialize_unfilled_to(remaining)); + dst.advance(remaining); + } + *state = Idle(Some(buf)); + return Ready(Ok(())); + } + Err(e) => { + *state = Idle(None); + return Ready(Err(e)); + } + } + } + } + } + } +} + +impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> From<R> for SyncReadIntoAsyncRead<R, Buf> { + /// This must be called from within a Tokio runtime context, or else it will panic. + fn from(value: R) -> Self { + Self::new_with_reader(value) + } +} diff --git a/tvix/store/src/proto/tests/directory.rs b/tvix/store/src/proto/tests/directory.rs new file mode 100644 index 000000000000..8d6ca7241d7a --- /dev/null +++ b/tvix/store/src/proto/tests/directory.rs @@ -0,0 +1,289 @@ +use crate::{ + proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError}, + B3Digest, +}; +use lazy_static::lazy_static; + +lazy_static! 
{ + static ref DUMMY_DIGEST: [u8; 32] = [ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + ]; +} +#[test] +fn size() { + { + let d = Directory::default(); + assert_eq!(d.size(), 0); + } + { + let d = Directory { + directories: vec![DirectoryNode { + name: String::from("foo"), + digest: DUMMY_DIGEST.to_vec(), + size: 0, + }], + ..Default::default() + }; + assert_eq!(d.size(), 1); + } + { + let d = Directory { + directories: vec![DirectoryNode { + name: String::from("foo"), + digest: DUMMY_DIGEST.to_vec(), + size: 4, + }], + ..Default::default() + }; + assert_eq!(d.size(), 5); + } + { + let d = Directory { + files: vec![FileNode { + name: String::from("foo"), + digest: DUMMY_DIGEST.to_vec(), + size: 42, + executable: false, + }], + ..Default::default() + }; + assert_eq!(d.size(), 1); + } + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: String::from("foo"), + target: String::from("bar"), + }], + ..Default::default() + }; + assert_eq!(d.size(), 1); + } +} + +#[test] +fn digest() { + let d = Directory::default(); + + assert_eq!( + d.digest(), + B3Digest::from_vec(vec![ + 0xaf, 0x13, 0x49, 0xb9, 0xf5, 0xf9, 0xa1, 0xa6, 0xa0, 0x40, 0x4d, 0xea, 0x36, 0xdc, + 0xc9, 0x49, 0x9b, 0xcb, 0x25, 0xc9, 0xad, 0xc1, 0x12, 0xb7, 0xcc, 0x9a, 0x93, 0xca, + 0xe4, 0x1f, 0x32, 0x62 + ]) + .unwrap() + ) +} + +#[test] +fn validate_empty() { + let d = Directory::default(); + assert_eq!(d.validate(), Ok(())); +} + +#[test] +fn validate_invalid_names() { + { + let d = Directory { + directories: vec![DirectoryNode { + name: "".to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 42, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidName(n) => { + assert_eq!(n, "") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + directories: vec![DirectoryNode { + name: ".".to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 42, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidName(n) => { + assert_eq!(n, ".") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + files: vec![FileNode { + name: "..".to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 42, + executable: false, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidName(n) => { + assert_eq!(n, "..") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: "\x00".to_string(), + target: "foo".to_string(), + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidName(n) => { + assert_eq!(n, "\x00") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: "foo/bar".to_string(), + target: "foo".to_string(), + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidName(n) => { + assert_eq!(n, "foo/bar") + } + _ => panic!("unexpected error"), + }; + } +} + +#[test] +fn validate_invalid_digest() { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".to_string(), + digest: vec![0x00, 0x42], // invalid length + size: 42, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + 
ValidateDirectoryError::InvalidDigestLen(n) => { + assert_eq!(n, 2) + } + _ => panic!("unexpected error"), + } +} + +#[test] +fn validate_sorting() { + // "b" comes before "a", bad. + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "b".to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 42, + }, + DirectoryNode { + name: "a".to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 42, + }, + ], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::WrongSorting(s) => { + assert_eq!(s, "a".to_string()); + } + _ => panic!("unexpected error"), + } + } + + // "a" exists twice, bad. + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "a".to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 42, + }, + DirectoryNode { + name: "a".to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 42, + }, + ], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::DuplicateName(s) => { + assert_eq!(s, "a".to_string()); + } + _ => panic!("unexpected error"), + } + } + + // "a" comes before "b", all good. + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "a".to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 42, + }, + DirectoryNode { + name: "b".to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 42, + }, + ], + ..Default::default() + }; + + d.validate().expect("validate shouldn't error"); + } + + // [b, c] and [a] are both properly sorted. + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "b".to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 42, + }, + DirectoryNode { + name: "c".to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 42, + }, + ], + symlinks: vec![SymlinkNode { + name: "a".to_string(), + target: "foo".to_string(), + }], + ..Default::default() + }; + + d.validate().expect("validate shouldn't error"); + } +} diff --git a/tvix/store/src/proto/tests/directory_nodes_iterator.rs b/tvix/store/src/proto/tests/directory_nodes_iterator.rs new file mode 100644 index 000000000000..9a283f72bd45 --- /dev/null +++ b/tvix/store/src/proto/tests/directory_nodes_iterator.rs @@ -0,0 +1,80 @@ +use crate::proto::node::Node; +use crate::proto::Directory; +use crate::proto::DirectoryNode; +use crate::proto::FileNode; +use crate::proto::SymlinkNode; + +#[test] +fn iterator() { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "c".to_string(), + ..DirectoryNode::default() + }, + DirectoryNode { + name: "d".to_string(), + ..DirectoryNode::default() + }, + DirectoryNode { + name: "h".to_string(), + ..DirectoryNode::default() + }, + DirectoryNode { + name: "l".to_string(), + ..DirectoryNode::default() + }, + ], + files: vec![ + FileNode { + name: "b".to_string(), + ..FileNode::default() + }, + FileNode { + name: "e".to_string(), + ..FileNode::default() + }, + FileNode { + name: "g".to_string(), + ..FileNode::default() + }, + FileNode { + name: "j".to_string(), + ..FileNode::default() + }, + ], + symlinks: vec![ + SymlinkNode { + name: "a".to_string(), + ..SymlinkNode::default() + }, + SymlinkNode { + name: "f".to_string(), + ..SymlinkNode::default() + }, + SymlinkNode { + name: "i".to_string(), + ..SymlinkNode::default() + }, + SymlinkNode { + name: "k".to_string(), + ..SymlinkNode::default() + }, + ], + }; + + let mut node_names: Vec<String> = vec![]; + + for node in d.nodes() { + match node { + Node::Directory(n) => node_names.push(n.name), + Node::File(n) => node_names.push(n.name), + Node::Symlink(n) => 
node_names.push(n.name),
+ };
+ }
+
+ assert_eq!(
+ vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"],
+ node_names
+ );
+}
diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs
new file mode 100644
index 000000000000..02e04e7d723f
--- /dev/null
+++ b/tvix/store/src/proto/tests/grpc_blobservice.rs
@@ -0,0 +1,102 @@
+use crate::blobservice::BlobService;
+use crate::proto::blob_service_server::BlobService as GRPCBlobService;
+use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest};
+use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST};
+use crate::tests::utils::gen_blob_service;
+use tokio_stream::StreamExt;
+
+fn gen_grpc_blob_service(
+) -> GRPCBlobServiceWrapper<impl BlobService + Send + Sync + Clone + 'static> {
+ let blob_service = gen_blob_service();
+ GRPCBlobServiceWrapper::from(blob_service)
+}
+
+/// Trying to read a non-existent blob should return a not found error.
+#[tokio::test]
+async fn not_found_read() {
+ let service = gen_grpc_blob_service();
+
+ let resp = service
+ .read(tonic::Request::new(ReadBlobRequest {
+ digest: BLOB_A_DIGEST.to_vec(),
+ }))
+ .await;
+
+ // We can't use unwrap_err here, because the Ok value doesn't implement
+ // Debug.
+ if let Err(e) = resp {
+ assert_eq!(e.code(), tonic::Code::NotFound);
+ } else {
+ panic!("resp is not err")
+ }
+}
+
+/// Trying to stat a non-existent blob should return a not found error.
+#[tokio::test]
+async fn not_found_stat() {
+ let service = gen_grpc_blob_service();
+
+ let resp = service
+ .stat(tonic::Request::new(StatBlobRequest {
+ digest: BLOB_A_DIGEST.to_vec(),
+ ..Default::default()
+ }))
+ .await
+ .expect_err("must fail");
+
+ // The resp should be a status with Code::NotFound
+ assert_eq!(resp.code(), tonic::Code::NotFound);
+}
+
+/// Put a blob in the store, get it back.
+#[tokio::test]
+async fn put_read_stat() {
+ let service = gen_grpc_blob_service();
+
+ // Send blob A.
+ let put_resp = service
+ .put(tonic_mock::streaming_request(vec![BlobChunk {
+ data: BLOB_A.clone(),
+ }]))
+ .await
+ .expect("must succeed")
+ .into_inner();
+
+ assert_eq!(BLOB_A_DIGEST.to_vec(), put_resp.digest);
+
+ // Stat for the digest of A.
+ // We currently don't ask for more granular chunking data, as we don't
+ // expose it yet.
+ let _resp = service
+ .stat(tonic::Request::new(StatBlobRequest {
+ digest: BLOB_A_DIGEST.to_vec(),
+ ..Default::default()
+ }))
+ .await
+ .expect("must succeed")
+ .into_inner();
+
+ // Read the blob. It should return the same data.
+ let resp = service
+ .read(tonic::Request::new(ReadBlobRequest {
+ digest: BLOB_A_DIGEST.to_vec(),
+ }))
+ .await;
+
+ let mut rx = resp.ok().unwrap().into_inner();
+
+ // The stream should contain one element: a BlobChunk with the same contents as BLOB_A.
+ let item = rx
+ .next()
+ .await
+ .expect("must be some")
+ .expect("must succeed");
+
+ assert_eq!(BLOB_A.to_vec(), item.data);
+
+ // … and no more elements
+ assert!(rx.next().await.is_none());
+
+ // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks.
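+ // (BLOB_A is only two bytes, so the service hands it back as a single chunk.)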
+ // Test with some bigger blob too
+}
diff --git a/tvix/store/src/proto/tests/grpc_directoryservice.rs b/tvix/store/src/proto/tests/grpc_directoryservice.rs
new file mode 100644
index 000000000000..069e82f6463e
--- /dev/null
+++ b/tvix/store/src/proto/tests/grpc_directoryservice.rs
@@ -0,0 +1,241 @@
+use crate::directoryservice::DirectoryService;
+use crate::proto::directory_service_server::DirectoryService as GRPCDirectoryService;
+use crate::proto::get_directory_request::ByWhat;
+use crate::proto::{Directory, DirectoryNode, SymlinkNode};
+use crate::proto::{GRPCDirectoryServiceWrapper, GetDirectoryRequest};
+use crate::tests::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
+use crate::tests::utils::gen_directory_service;
+use tokio_stream::StreamExt;
+use tonic::Status;
+
+fn gen_grpc_service(
+) -> GRPCDirectoryServiceWrapper<impl DirectoryService + Send + Sync + Clone + 'static> {
+ let directory_service = gen_directory_service();
+ GRPCDirectoryServiceWrapper::from(directory_service)
+}
+
+/// Send the specified GetDirectoryRequest.
+/// Returns an error in the case of an error response, or an error in one of
+/// the items in the stream, or a Vec<Directory> in the case of a successful
+/// request.
+async fn get_directories<S: GRPCDirectoryService>(
+ svc: &S,
+ get_directory_request: GetDirectoryRequest,
+) -> Result<Vec<Directory>, Status> {
+ let resp = svc.get(tonic::Request::new(get_directory_request)).await;
+
+ // if the response is an error itself, return the error, otherwise unpack
+ let stream = match resp {
+ Ok(resp) => resp,
+ Err(status) => return Err(status),
+ }
+ .into_inner();
+
+ let directory_results: Vec<Result<Directory, Status>> = stream.collect().await;
+
+ // turn Vec<Result<Directory, Status>> into Result<Vec<Directory>, Status>
+ directory_results.into_iter().collect()
+}
+
+/// Trying to get a non-existent Directory should return a not found error.
+#[tokio::test]
+async fn not_found() {
+ let service = gen_grpc_service();
+
+ let resp = service
+ .get(tonic::Request::new(GetDirectoryRequest {
+ by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().to_vec())),
+ ..Default::default()
+ }))
+ .await;
+
+ let mut rx = resp.expect("must succeed").into_inner().into_inner();
+
+ // The stream should contain one element, an error with Code::NotFound.
+ let item = rx
+ .recv()
+ .await
+ .expect("must be some")
+ .expect_err("must be err");
+ assert_eq!(item.code(), tonic::Code::NotFound);
+
+ // … and nothing else
+ assert!(rx.recv().await.is_none());
+}
+
+/// Put a Directory into the store, get it back.
+#[tokio::test]
+async fn put_get() {
+ let service = gen_grpc_service();
+
+ let streaming_request = tonic_mock::streaming_request(vec![DIRECTORY_A.clone()]);
+ let put_resp = service
+ .put(streaming_request)
+ .await
+ .expect("must succeed")
+ .into_inner();
+
+ // the sent root_digest should match the calculated digest
+ assert_eq!(put_resp.root_digest, DIRECTORY_A.digest().to_vec());
+
+ // get it back
+ let items = get_directories(
+ &service,
+ GetDirectoryRequest {
+ by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().to_vec())),
+ ..Default::default()
+ },
+ )
+ .await
+ .expect("must not error");
+
+ assert_eq!(vec![DIRECTORY_A.clone()], items);
+}
+
+/// Put multiple Directories into the store, and get them back.
+#[tokio::test]
+async fn put_get_multiple() {
+ let service = gen_grpc_service();
+
+ // sending "b" (which refers to "a") without sending "a" first should fail.
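+ // (The service should only accept a Directory once all of its children are known to it.)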
+ let put_resp = service + .put(tonic_mock::streaming_request(vec![DIRECTORY_B.clone()])) + .await + .expect_err("must fail"); + + assert_eq!(tonic::Code::InvalidArgument, put_resp.code()); + + // sending "a", then "b" should succeed, and the response should contain the digest of b. + let put_resp = service + .put(tonic_mock::streaming_request(vec![ + DIRECTORY_A.clone(), + DIRECTORY_B.clone(), + ])) + .await + .expect("must succeed"); + + assert_eq!( + DIRECTORY_B.digest().to_vec(), + put_resp.into_inner().root_digest + ); + + // now, request b, first in non-recursive mode. + let items = get_directories( + &service, + GetDirectoryRequest { + recursive: false, + by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().to_vec())), + }, + ) + .await + .expect("must not error"); + + // We expect to only get b. + assert_eq!(vec![DIRECTORY_B.clone()], items); + + // now, request b, but in recursive mode. + let items = get_directories( + &service, + GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().to_vec())), + }, + ) + .await + .expect("must not error"); + + // We expect to get b, and then a, because that's how we traverse down. + assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], items); +} + +/// Put multiple Directories into the store, and omit duplicates. +#[tokio::test] +async fn put_get_dedup() { + let service = gen_grpc_service(); + + // Send "A", then "C", which refers to "A" two times + // Pretend we're a dumb client sending A twice. + let put_resp = service + .put(tonic_mock::streaming_request(vec![ + DIRECTORY_A.clone(), + DIRECTORY_A.clone(), + DIRECTORY_C.clone(), + ])) + .await + .expect("must succeed"); + + assert_eq!( + DIRECTORY_C.digest().to_vec(), + put_resp.into_inner().root_digest + ); + + // Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice. + let items = get_directories( + &service, + GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(DIRECTORY_C.digest().to_vec())), + }, + ) + .await + .expect("must not error"); + + // We expect to get C, and then A (once, as the second A has been deduplicated). + assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items); +} + +/// Trying to upload a Directory failing validation should fail. +#[tokio::test] +async fn put_reject_failed_validation() { + let service = gen_grpc_service(); + + // construct a broken Directory message that fails validation + let broken_directory = Directory { + symlinks: vec![SymlinkNode { + name: "".to_string(), + target: "doesntmatter".to_string(), + }], + ..Default::default() + }; + assert!(broken_directory.validate().is_err()); + + // send it over, it must fail + let put_resp = service + .put(tonic_mock::streaming_request(vec![broken_directory])) + .await + .expect_err("must fail"); + + assert_eq!(put_resp.code(), tonic::Code::InvalidArgument); +} + +/// Trying to upload a Directory with wrong size should fail. +#[tokio::test] +async fn put_reject_wrong_size() { + let service = gen_grpc_service(); + + // Construct a directory referring to DIRECTORY_A, but with wrong size. + let broken_parent_directory = Directory { + directories: vec![DirectoryNode { + name: "foo".to_string(), + digest: DIRECTORY_A.digest().to_vec(), + size: 42, + }], + ..Default::default() + }; + // Make sure we got the size wrong. + assert_ne!( + broken_parent_directory.directories[0].size, + DIRECTORY_A.size() + ); + + // now upload both (first A, then the broken parent). This must fail. 
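+ // (The declared size should be checked against the actual size of DIRECTORY_A.)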
+ let put_resp = service
+ .put(tonic_mock::streaming_request(vec![
+ DIRECTORY_A.clone(),
+ broken_parent_directory,
+ ]))
+ .await
+ .expect_err("must fail");
+
+ assert_eq!(put_resp.code(), tonic::Code::InvalidArgument);
+}
diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
new file mode 100644
index 000000000000..11cab2c264cc
--- /dev/null
+++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
@@ -0,0 +1,67 @@
+use crate::nar::NonCachingNARCalculationService;
+use crate::proto::get_path_info_request::ByWhat::ByOutputHash;
+use crate::proto::node::Node::Symlink;
+use crate::proto::path_info_service_server::PathInfoService as GRPCPathInfoService;
+use crate::proto::GRPCPathInfoServiceWrapper;
+use crate::proto::PathInfo;
+use crate::proto::{GetPathInfoRequest, Node, SymlinkNode};
+use crate::tests::fixtures::DUMMY_OUTPUT_HASH;
+use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service};
+use tonic::Request;
+
+/// Generates a GRPCPathInfoService out of blob, directory and pathinfo services.
+///
+/// We only interact with it via the PathInfo GRPC interface.
+/// It uses the NonCachingNARCalculationService [NARCalculationService] to
+/// calculate NARs.
+fn gen_grpc_service() -> impl GRPCPathInfoService {
+ GRPCPathInfoServiceWrapper::new(
+ gen_pathinfo_service(),
+ NonCachingNARCalculationService::new(gen_blob_service(), gen_directory_service()),
+ )
+}
+
+/// Trying to get a non-existent PathInfo should return a not found error.
+#[tokio::test]
+async fn not_found() {
+ let service = gen_grpc_service();
+
+ let resp = service
+ .get(Request::new(GetPathInfoRequest {
+ by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.to_vec())),
+ }))
+ .await;
+
+ let resp = resp.expect_err("must fail");
+ assert_eq!(resp.code(), tonic::Code::NotFound);
+}
+
+/// Put a PathInfo into the store, get it back.
+#[tokio::test]
+async fn put_get() {
+ let service = gen_grpc_service();
+
+ let path_info = PathInfo {
+ node: Some(Node {
+ node: Some(Symlink(SymlinkNode {
+ name: "00000000000000000000000000000000-foo".to_string(),
+ target: "doesntmatter".to_string(),
+ })),
+ }),
+ ..Default::default()
+ };
+
+ let resp = service.put(Request::new(path_info.clone())).await;
+
+ assert!(resp.is_ok());
+ assert_eq!(resp.expect("must succeed").into_inner(), path_info);
+
+ let resp = service
+ .get(Request::new(GetPathInfoRequest {
+ by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.to_vec())),
+ }))
+ .await;
+
+ assert!(resp.is_ok());
+ assert_eq!(resp.expect("must succeed").into_inner(), path_info);
+}
diff --git a/tvix/store/src/proto/tests/mod.rs b/tvix/store/src/proto/tests/mod.rs
new file mode 100644
index 000000000000..0a96ea3a0d59
--- /dev/null
+++ b/tvix/store/src/proto/tests/mod.rs
@@ -0,0 +1,6 @@
+mod directory;
+mod directory_nodes_iterator;
+mod grpc_blobservice;
+mod grpc_directoryservice;
+mod grpc_pathinfoservice;
+mod pathinfo;
diff --git a/tvix/store/src/proto/tests/pathinfo.rs b/tvix/store/src/proto/tests/pathinfo.rs
new file mode 100644
index 000000000000..54a76fc6c554
--- /dev/null
+++ b/tvix/store/src/proto/tests/pathinfo.rs
@@ -0,0 +1,207 @@
+use crate::proto::{self, Node, PathInfo, ValidatePathInfoError};
+use lazy_static::lazy_static;
+use nix_compat::store_path::{self, StorePath};
+use test_case::test_case;
+
+lazy_static!
{ + static ref DUMMY_DIGEST: Vec<u8> = vec![ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + ]; + static ref DUMMY_DIGEST_2: Vec<u8> = vec![ + 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + ]; +} + +const DUMMY_NAME: &str = "00000000000000000000000000000000-dummy"; + +#[test_case( + None, + Err(ValidatePathInfoError::NoNodePresent()) ; + "No node" +)] +#[test_case( + Some(Node { node: None }), + Err(ValidatePathInfoError::NoNodePresent()); + "No node 2" +)] +fn validate_no_node( + t_node: Option<proto::Node>, + t_result: Result<StorePath, ValidatePathInfoError>, +) { + // construct the PathInfo object + let p = PathInfo { + node: t_node, + ..Default::default() + }; + assert_eq!(t_result, p.validate()); +} + +#[test_case( + proto::DirectoryNode { + name: DUMMY_NAME.to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 0, + }, + Ok(StorePath::from_string(DUMMY_NAME).expect("must succeed")); + "ok" +)] +#[test_case( + proto::DirectoryNode { + name: DUMMY_NAME.to_string(), + digest: vec![], + size: 0, + }, + Err(ValidatePathInfoError::InvalidDigestLen(0)); + "invalid digest length" +)] +#[test_case( + proto::DirectoryNode { + name: "invalid".to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 0, + }, + Err(ValidatePathInfoError::InvalidNodeName( + "invalid".to_string(), + store_path::Error::InvalidName(store_path::NameError::InvalidName("".to_string())) + )); + "invalid node name" +)] +fn validate_directory( + t_directory_node: proto::DirectoryNode, + t_result: Result<StorePath, ValidatePathInfoError>, +) { + // construct the PathInfo object + let p = PathInfo { + node: Some(Node { + node: Some(proto::node::Node::Directory(t_directory_node)), + }), + ..Default::default() + }; + assert_eq!(t_result, p.validate()); +} + +#[test_case( + proto::FileNode { + name: DUMMY_NAME.to_string(), + digest: DUMMY_DIGEST.to_vec(), + size: 0, + executable: false, + }, + Ok(StorePath::from_string(DUMMY_NAME).expect("must succeed")); + "ok" +)] +#[test_case( + proto::FileNode { + name: DUMMY_NAME.to_string(), + digest: vec![], + ..Default::default() + }, + Err(ValidatePathInfoError::InvalidDigestLen(0)); + "invalid digest length" +)] +#[test_case( + proto::FileNode { + name: "invalid".to_string(), + digest: DUMMY_DIGEST.to_vec(), + ..Default::default() + }, + Err(ValidatePathInfoError::InvalidNodeName( + "invalid".to_string(), + store_path::Error::InvalidName(store_path::NameError::InvalidName("".to_string())) + )); + "invalid node name" +)] +fn validate_file(t_file_node: proto::FileNode, t_result: Result<StorePath, ValidatePathInfoError>) { + // construct the PathInfo object + let p = PathInfo { + node: Some(Node { + node: Some(proto::node::Node::File(t_file_node)), + }), + ..Default::default() + }; + assert_eq!(t_result, p.validate()); +} + +#[test_case( + proto::SymlinkNode { + name: DUMMY_NAME.to_string(), + ..Default::default() + }, + Ok(StorePath::from_string(DUMMY_NAME).expect("must succeed")); + "ok" +)] +#[test_case( + proto::SymlinkNode { + name: "invalid".to_string(), + ..Default::default() + }, + Err(ValidatePathInfoError::InvalidNodeName( + "invalid".to_string(), + store_path::Error::InvalidName(store_path::NameError::InvalidName("".to_string())) + )); + "invalid node name" +)] +fn validate_symlink( 
+ t_symlink_node: proto::SymlinkNode,
+ t_result: Result<StorePath, ValidatePathInfoError>,
+) {
+ // construct the PathInfo object
+ let p = PathInfo {
+ node: Some(Node {
+ node: Some(proto::node::Node::Symlink(t_symlink_node)),
+ }),
+ ..Default::default()
+ };
+ assert_eq!(t_result, p.validate());
+}
+
+#[test]
+fn validate_references() {
+ // create a PathInfo without narinfo field.
+ let path_info = PathInfo {
+ node: Some(Node {
+ node: Some(proto::node::Node::Directory(proto::DirectoryNode {
+ name: DUMMY_NAME.to_string(),
+ digest: DUMMY_DIGEST.to_vec(),
+ size: 0,
+ })),
+ }),
+ references: vec![DUMMY_DIGEST_2.to_vec()],
+ narinfo: None,
+ };
+ assert!(path_info.validate().is_ok());
+
+ // create a PathInfo with a narinfo field, but an inconsistent set of references
+ let path_info_with_narinfo_missing_refs = PathInfo {
+ narinfo: Some(proto::NarInfo {
+ nar_size: 0,
+ nar_sha256: DUMMY_DIGEST.to_vec(),
+ signatures: vec![],
+ reference_names: vec![],
+ }),
+ ..path_info.clone()
+ };
+ match path_info_with_narinfo_missing_refs
+ .validate()
+ .expect_err("must fail")
+ {
+ ValidatePathInfoError::InconsistentNumberOfReferences(_, _) => {}
+ _ => panic!("unexpected error"),
+ };
+
+ // create a PathInfo with the correct number of references, should succeed
+ let path_info_with_narinfo = PathInfo {
+ narinfo: Some(proto::NarInfo {
+ nar_size: 0,
+ nar_sha256: DUMMY_DIGEST.to_vec(),
+ signatures: vec![],
+ reference_names: vec![format!("/nix/store/{}", DUMMY_NAME)],
+ }),
+ ..path_info
+ };
+ assert!(path_info_with_narinfo.validate().is_ok());
+}
diff --git a/tvix/store/src/store_io.rs b/tvix/store/src/store_io.rs
new file mode 100644
index 000000000000..fb46204e505f
--- /dev/null
+++ b/tvix/store/src/store_io.rs
@@ -0,0 +1,358 @@
+//! This module provides an implementation of EvalIO.
+//!
+//! It can be used by the tvix evaluator to talk to a tvix store.
+
+use data_encoding::BASE64;
+use nix_compat::{
+ nixhash::{HashAlgo, NixHash, NixHashWithMode},
+ store_path::{build_regular_ca_path, StorePath},
+};
+use smol_str::SmolStr;
+use std::{io, path::Path, path::PathBuf};
+use tracing::{error, instrument, warn};
+use tvix_eval::{EvalIO, FileType, StdIO};
+
+use crate::{
+ blobservice::BlobService,
+ directoryservice::{self, DirectoryService},
+ import,
+ nar::NARCalculationService,
+ pathinfoservice::PathInfoService,
+ proto::NamedNode,
+ B3Digest,
+};
+
+/// Implements [EvalIO], asking the given [PathInfoService], [DirectoryService]
+/// and [BlobService].
+///
+/// In case the given path does not exist in these stores, we ask StdIO.
+/// This is to both cover cases of syntactically valid store paths that exist
+/// on the filesystem (still managed by Nix), as well as to be able to read
+/// files outside store paths.
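+///
+/// A rough construction sketch (hypothetical wiring; the generators from
+/// `crate::tests::utils` are used purely for illustration and may not be
+/// visible outside the crate's own tests):
+///
+/// ```ignore
+/// use tvix_store::nar::NonCachingNARCalculationService;
+/// use tvix_store::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service};
+/// use tvix_store::TvixStoreIO;
+///
+/// let blob_service = gen_blob_service();
+/// let directory_service = gen_directory_service();
+/// let io = TvixStoreIO::new(
+///     blob_service.clone(),
+///     directory_service.clone(),
+///     gen_pathinfo_service(),
+///     NonCachingNARCalculationService::new(blob_service, directory_service),
+/// );
+/// ```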
+pub struct TvixStoreIO<
+ BS: BlobService,
+ DS: DirectoryService,
+ PS: PathInfoService,
+ NCS: NARCalculationService,
+> {
+ blob_service: BS,
+ directory_service: DS,
+ path_info_service: PS,
+ nar_calculation_service: NCS,
+ std_io: StdIO,
+}
+
+impl<BS: BlobService, DS: DirectoryService, PS: PathInfoService, NCS: NARCalculationService>
+ TvixStoreIO<BS, DS, PS, NCS>
+{
+ pub fn new(
+ blob_service: BS,
+ directory_service: DS,
+ path_info_service: PS,
+ nar_calculation_service: NCS,
+ ) -> Self {
+ Self {
+ blob_service,
+ directory_service,
+ path_info_service,
+ nar_calculation_service,
+ std_io: StdIO {},
+ }
+ }
+
+ /// For a given [StorePath] and additional [Path] inside the store path,
+ /// look up the [PathInfo], and if it exists, traverse the directory structure to
+ /// return the [crate::proto::node::Node] specified by `sub_path`.
+ #[instrument(skip(self), ret, err)]
+ fn store_path_to_root_node(
+ &self,
+ store_path: &StorePath,
+ sub_path: &Path,
+ ) -> Result<Option<crate::proto::node::Node>, crate::Error> {
+ let path_info = {
+ match self.path_info_service.get(store_path.digest)? {
+ // If there's no PathInfo found, early exit
+ None => return Ok(None),
+ Some(path_info) => path_info,
+ }
+ };
+
+ let root_node = {
+ match path_info.node {
+ None => {
+ warn!(
+ "returned PathInfo {:?} node is None, this shouldn't happen.",
+ &path_info
+ );
+ return Ok(None);
+ }
+ Some(root_node) => match root_node.node {
+ None => {
+ warn!("node for {:?} is None, this shouldn't happen.", &root_node);
+ return Ok(None);
+ }
+ Some(root_node) => root_node,
+ },
+ }
+ };
+
+ directoryservice::traverse_to(&self.directory_service, root_node, sub_path)
+ }
+
+ /// Imports a given path on the filesystem into the store, and returns the
+ /// [crate::proto::PathInfo] describing the path that was sent to
+ /// [PathInfoService].
+ /// While not part of [EvalIO], it's still useful for clients who
+ /// care about the [PathInfo].
+ #[instrument(skip(self), ret, err)]
+ pub fn import_path_with_pathinfo(
+ &self,
+ path: &std::path::Path,
+ ) -> Result<crate::proto::PathInfo, io::Error> {
+ // Call [import::ingest_path], which will walk over the given path and return a root_node.
+ let root_node = import::ingest_path(&self.blob_service, &self.directory_service, path)
+ .expect("error during import_path");
+
+ // Render the NAR
+ let (nar_size, nar_sha256) = self
+ .nar_calculation_service
+ .calculate_nar(&root_node)
+ .expect("error during nar calculation"); // TODO: handle error
+
+ // For given NAR sha256 digest and name, return the new [StorePath] this would have.
+ let nar_hash_with_mode =
+ NixHashWithMode::Recursive(NixHash::new(HashAlgo::Sha256, nar_sha256.to_vec()));
+
+ let name = path
+ .file_name()
+ .expect("path must not be ..")
+ .to_str()
+ .expect("path must be valid unicode");
+
+ let output_path =
+ build_regular_ca_path(name, &nar_hash_with_mode, Vec::<String>::new(), false).unwrap();
+
+ // assemble a new root_node with a name that is derived from the nar hash.
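+ // (ingest_path named the root node after the last path component; that name
+ // is swapped for the just-computed output path name.)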
+ let renamed_root_node = {
+ let name = output_path.to_string();
+
+ match root_node {
+ crate::proto::node::Node::Directory(n) => {
+ crate::proto::node::Node::Directory(crate::proto::DirectoryNode { name, ..n })
+ }
+ crate::proto::node::Node::File(n) => {
+ crate::proto::node::Node::File(crate::proto::FileNode { name, ..n })
+ }
+ crate::proto::node::Node::Symlink(n) => {
+ crate::proto::node::Node::Symlink(crate::proto::SymlinkNode { name, ..n })
+ }
+ }
+ };
+
+ // assemble the [crate::proto::PathInfo] object.
+ let path_info = crate::proto::PathInfo {
+ node: Some(crate::proto::Node {
+ node: Some(renamed_root_node),
+ }),
+ // There's no reference scanning on path contents ingested like this.
+ references: vec![],
+ narinfo: Some(crate::proto::NarInfo {
+ nar_size,
+ nar_sha256: nar_sha256.to_vec(),
+ signatures: vec![],
+ reference_names: vec![],
+ // TODO: narinfo for talosctl.src contains `CA: fixed:r:sha256:1x13j5hy75221bf6kz7cpgld9vgic6bqx07w5xjs4pxnksj6lxb6`
+ // do we need this anywhere?
+ }),
+ };
+
+ // put into [PathInfoService], and return the PathInfo that we get back
+ // from there (it might contain additional signatures).
+ let path_info = self.path_info_service.put(path_info)?;
+
+ Ok(path_info)
+ }
+}
+
+/// For given NAR sha256 digest and name, return the new [StorePath] this would have.
+#[instrument(skip(nar_sha256_digest), ret, fields(nar_sha256_digest=BASE64.encode(nar_sha256_digest)))]
+fn calculate_nar_based_store_path(nar_sha256_digest: &[u8; 32], name: &str) -> StorePath {
+ let nar_hash_with_mode =
+ NixHashWithMode::Recursive(NixHash::new(HashAlgo::Sha256, nar_sha256_digest.to_vec()));
+
+ build_regular_ca_path(name, &nar_hash_with_mode, Vec::<String>::new(), false).unwrap()
+}
+
+impl<BS: BlobService, DS: DirectoryService, PS: PathInfoService, NCS: NARCalculationService> EvalIO
+ for TvixStoreIO<BS, DS, PS, NCS>
+{
+ #[instrument(skip(self), ret, err)]
+ fn path_exists(&self, path: &Path) -> Result<bool, io::Error> {
+ if let Ok((store_path, sub_path)) =
+ StorePath::from_absolute_path_full(&path.to_string_lossy())
+ {
+ if self
+ .store_path_to_root_node(&store_path, &sub_path)?
+ .is_some()
+ {
+ Ok(true)
+ } else {
+ // As tvix-store doesn't manage /nix/store on the filesystem,
+ // we still need to also ask self.std_io here.
+ self.std_io.path_exists(path)
+ }
+ } else {
+ // The path is not a store path, so do regular StdIO.
+ self.std_io.path_exists(path)
+ }
+ }
+
+ #[instrument(skip(self), ret, err)]
+ fn read_to_string(&self, path: &Path) -> Result<String, io::Error> {
+ if let Ok((store_path, sub_path)) =
+ StorePath::from_absolute_path_full(&path.to_string_lossy())
+ {
+ if let Some(node) = self.store_path_to_root_node(&store_path, &sub_path)?
{
+ // depending on the node type, treat read_to_string differently
+ match node {
+ crate::proto::node::Node::Directory(_) => {
+ // This would normally be an io::ErrorKind::IsADirectory (still unstable)
+ Err(io::Error::new(
+ io::ErrorKind::Unsupported,
+ format!("tried to read directory at {:?} to string", path),
+ ))
+ }
+ crate::proto::node::Node::File(file_node) => {
+ let digest =
+ B3Digest::from_vec(file_node.digest.clone()).map_err(|_e| {
+ error!(
+ file_node = ?file_node,
+ "invalid digest"
+ );
+ io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("invalid digest length in file node: {:?}", file_node),
+ )
+ })?;
+
+ let reader = {
+ let resp = self.blob_service.open_read(&digest)?;
+ match resp {
+ Some(blob_reader) => blob_reader,
+ None => {
+ error!(
+ blob.digest = %digest,
+ "blob not found",
+ );
+ Err(io::Error::new(
+ io::ErrorKind::NotFound,
+ format!("blob {} not found", &digest),
+ ))?
+ }
+ }
+ };
+
+ io::read_to_string(reader)
+ }
+ crate::proto::node::Node::Symlink(_symlink_node) => Err(io::Error::new(
+ io::ErrorKind::Unsupported,
+ "read_to_string for symlinks is unsupported",
+ ))?,
+ }
+ } else {
+ // As tvix-store doesn't manage /nix/store on the filesystem,
+ // we still need to also ask self.std_io here.
+ self.std_io.read_to_string(path)
+ }
+ } else {
+ // The path is not a store path, so do regular StdIO.
+ self.std_io.read_to_string(path)
+ }
+ }
+
+ #[instrument(skip(self), ret, err)]
+ fn read_dir(&self, path: &Path) -> Result<Vec<(SmolStr, FileType)>, io::Error> {
+ if let Ok((store_path, sub_path)) =
+ StorePath::from_absolute_path_full(&path.to_string_lossy())
+ {
+ if let Some(node) = self.store_path_to_root_node(&store_path, &sub_path)? {
+ match node {
+ crate::proto::node::Node::Directory(directory_node) => {
+ // fetch the Directory itself.
+ let digest =
+ B3Digest::from_vec(directory_node.digest.clone()).map_err(|_e| {
+ io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!(
+ "invalid digest length in directory node: {:?}",
+ directory_node
+ ),
+ )
+ })?;
+
+ if let Some(directory) = self.directory_service.get(&digest)? {
+ let mut children: Vec<(SmolStr, FileType)> = Vec::new();
+ for node in directory.nodes() {
+ children.push(match node {
+ crate::proto::node::Node::Directory(e) => {
+ (e.name.into(), FileType::Directory)
+ }
+ crate::proto::node::Node::File(e) => {
+ (e.name.into(), FileType::Regular)
+ }
+ crate::proto::node::Node::Symlink(e) => {
+ (e.name.into(), FileType::Symlink)
+ }
+ })
+ }
+ Ok(children)
+ } else {
+ // If we didn't get the directory node that's linked, that's a store inconsistency!
+ error!(
+ directory.digest = %digest,
+ path = ?path,
+ "directory not found",
+ );
+ Err(io::Error::new(
+ io::ErrorKind::NotFound,
+ format!("directory {digest} does not exist"),
+ ))?
+ }
+ }
+ crate::proto::node::Node::File(_file_node) => {
+ // This would normally be an io::ErrorKind::NotADirectory (still unstable)
+ Err(io::Error::new(
+ io::ErrorKind::Unsupported,
+ format!("tried to readdir {:?}, which is a file", path),
+ ))?
+ }
+ crate::proto::node::Node::Symlink(_symlink_node) => Err(io::Error::new(
+ io::ErrorKind::Unsupported,
+ "read_dir for symlinks is unsupported",
+ ))?,
+ }
+ } else {
+ self.std_io.read_dir(path)
+ }
+ } else {
+ self.std_io.read_dir(path)
+ }
+ }
+
+ #[instrument(skip(self), ret, err)]
+ fn import_path(&self, path: &std::path::Path) -> Result<PathBuf, std::io::Error> {
+ let path_info = self.import_path_with_pathinfo(path)?;
+
+ // from the [PathInfo], extract the store path (as string).
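+ // (The renamed root node carries the store path basename, so joining it onto
+ // STORE_DIR_WITH_SLASH below yields the absolute store path.)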
+ let mut path = PathBuf::from(nix_compat::store_path::STORE_DIR_WITH_SLASH); + path.push(path_info.node.unwrap().node.unwrap().get_name()); + + // and return it + Ok(path) + } + + #[instrument(skip(self), ret)] + fn store_dir(&self) -> Option<String> { + Some("/nix/store".to_string()) + } +} diff --git a/tvix/store/src/tests/fixtures.rs b/tvix/store/src/tests/fixtures.rs new file mode 100644 index 000000000000..934d9e4c5302 --- /dev/null +++ b/tvix/store/src/tests/fixtures.rs @@ -0,0 +1,175 @@ +use crate::{ + proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode}, + B3Digest, +}; +use lazy_static::lazy_static; + +pub const HELLOWORLD_BLOB_CONTENTS: &[u8] = b"Hello World!"; +pub const EMPTY_BLOB_CONTENTS: &[u8] = b""; + +lazy_static! { + pub static ref DUMMY_DIGEST: Vec<u8> = vec![ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + ]; + pub static ref DUMMY_DATA_1: Vec<u8> = vec![0x01, 0x02, 0x03]; + pub static ref DUMMY_DATA_2: Vec<u8> = vec![0x04, 0x05]; + + pub static ref HELLOWORLD_BLOB_DIGEST: B3Digest = + blake3::hash(HELLOWORLD_BLOB_CONTENTS).as_bytes().into(); + pub static ref EMPTY_BLOB_DIGEST: B3Digest = + blake3::hash(EMPTY_BLOB_CONTENTS).as_bytes().into(); + + // 2 bytes + pub static ref BLOB_A: Vec<u8> = vec![0x00, 0x01]; + pub static ref BLOB_A_DIGEST: B3Digest = blake3::hash(&BLOB_A).as_bytes().into(); + + // 1MB + pub static ref BLOB_B: Vec<u8> = (0..255).collect::<Vec<u8>>().repeat(4 * 1024); + pub static ref BLOB_B_DIGEST: B3Digest = blake3::hash(&BLOB_B).as_bytes().into(); + + // Directories + pub static ref DIRECTORY_WITH_KEEP: proto::Directory = proto::Directory { + directories: vec![], + files: vec![FileNode { + name: ".keep".to_string(), + digest: EMPTY_BLOB_DIGEST.to_vec(), + size: 0, + executable: false, + }], + symlinks: vec![], + }; + pub static ref DIRECTORY_COMPLICATED: proto::Directory = proto::Directory { + directories: vec![DirectoryNode { + name: "keep".to_string(), + digest: DIRECTORY_WITH_KEEP.digest().to_vec(), + size: DIRECTORY_WITH_KEEP.size(), + }], + files: vec![FileNode { + name: ".keep".to_string(), + digest: EMPTY_BLOB_DIGEST.to_vec(), + size: 0, + executable: false, + }], + symlinks: vec![SymlinkNode { + name: "aa".to_string(), + target: "/nix/store/somewhereelse".to_string(), + }], + }; + pub static ref DIRECTORY_A: Directory = Directory::default(); + pub static ref DIRECTORY_B: Directory = Directory { + directories: vec![DirectoryNode { + name: "a".to_string(), + digest: DIRECTORY_A.digest().to_vec(), + size: DIRECTORY_A.size(), + }], + ..Default::default() + }; + pub static ref DIRECTORY_C: Directory = Directory { + directories: vec![ + DirectoryNode { + name: "a".to_string(), + digest: DIRECTORY_A.digest().to_vec(), + size: DIRECTORY_A.size(), + }, + DirectoryNode { + name: "a'".to_string(), + digest: DIRECTORY_A.digest().to_vec(), + size: DIRECTORY_A.size(), + } + ], + ..Default::default() + }; + + // output hash + pub static ref DUMMY_OUTPUT_HASH: Vec<u8> = vec![ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00 + ]; + + /// The NAR representation of a symlink pointing to `/nix/store/somewhereelse` + pub static ref NAR_CONTENTS_SYMLINK: Vec<u8> = vec![ + 13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0, + 0, 0, // "nix-archive-1" + 1, 0, 0, 0, 0, 0, 0, 0, 
b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b's', b'y', b'm', b'l', b'i', b'n', b'k', 0, // "symlink" + 6, 0, 0, 0, 0, 0, 0, 0, b't', b'a', b'r', b'g', b'e', b't', 0, 0, // target + 24, 0, 0, 0, 0, 0, 0, 0, b'/', b'n', b'i', b'x', b'/', b's', b't', b'o', b'r', b'e', b'/', b's', b'o', + b'm', b'e', b'w', b'h', b'e', b'r', b'e', b'e', b'l', b's', + b'e', // "/nix/store/somewhereelse" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")" + ]; + + /// The NAR representation of a regular file with the contents "Hello World!" + pub static ref NAR_CONTENTS_HELLOWORLD: Vec<u8> = vec![ + 13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0, + 0, 0, // "nix-archive-1" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular" + 8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents" + 12, 0, 0, 0, 0, 0, 0, 0, b'H', b'e', b'l', b'l', b'o', b' ', b'W', b'o', b'r', b'l', b'd', b'!', 0, 0, + 0, 0, // "Hello World!" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")" + ]; + + /// The NAR representation of a more complicated directory structure. + pub static ref NAR_CONTENTS_COMPLICATED: Vec<u8> = vec![ + 13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0, + 0, 0, // "nix-archive-1" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 9, 0, 0, 0, 0, 0, 0, 0, b'd', b'i', b'r', b'e', b'c', b't', b'o', b'r', b'y', 0, 0, 0, 0, 0, 0, 0, // "directory" + 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name" + 5, 0, 0, 0, 0, 0, 0, 0, b'.', b'k', b'e', b'e', b'p', 0, 0, 0, // ".keep" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular" + 8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents" + 0, 0, 0, 0, 0, 0, 0, 0, // "" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name" + 2, 0, 0, 0, 0, 0, 0, 0, b'a', b'a', 0, 0, 0, 0, 0, 0, // "aa" + 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node" + 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "(" + 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type" + 7, 0, 0, 0, 0, 0, 0, 0, b's', b'y', b'm', b'l', b'i', b'n', b'k', 0, // "symlink" + 6, 0, 0, 0, 0, 0, 0, 0, b't', b'a', b'r', b'g', b'e', b't', 0, 0, // target + 24, 0, 0, 0, 0, 0, 0, 0, b'/', b'n', b'i', b'x', b'/', b's', b't', b'o', b'r', b'e', b'/', b's', b'o', + b'm', b'e', b'w', b'h', b'e', b'r', b'e', b'e', b'l', b's', + b'e', // "/nix/store/somewhereelse" + 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")" + 1, 0, 0, 
0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+ 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry"
+ 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+ 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name"
+ 4, 0, 0, 0, 0, 0, 0, 0, b'k', b'e', b'e', b'p', 0, 0, 0, 0, // "keep"
+ 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node"
+ 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+ 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+ 9, 0, 0, 0, 0, 0, 0, 0, b'd', b'i', b'r', b'e', b'c', b't', b'o', b'r', b'y', 0, 0, 0, 0, 0, 0, 0, // "directory"
+ 5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry"
+ 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+ 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name"
+ 5, 0, 0, 0, 0, 0, 0, 0, b'.', b'k', b'e', b'e', b'p', 0, 0, 0, // ".keep"
+ 4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node"
+ 1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+ 4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+ 7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular"
+ 8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents"
+ 0, 0, 0, 0, 0, 0, 0, 0, // ""
+ 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+ 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+ 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+ 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+ 1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+ ];
+}
diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs
new file mode 100644
index 000000000000..8b66cb024bf0
--- /dev/null
+++ b/tvix/store/src/tests/import.rs
@@ -0,0 +1,113 @@
+use super::utils::{gen_blob_service, gen_directory_service};
+use crate::blobservice::BlobService;
+use crate::directoryservice::DirectoryService;
+use crate::import::ingest_path;
+use crate::proto;
+use crate::tests::fixtures::DIRECTORY_COMPLICATED;
+use crate::tests::fixtures::*;
+use tempfile::TempDir;
+
+#[cfg(target_family = "unix")]
+#[test]
+fn symlink() {
+ let tmpdir = TempDir::new().unwrap();
+
+ std::fs::create_dir_all(&tmpdir).unwrap();
+ std::os::unix::fs::symlink(
+ "/nix/store/somewhereelse",
+ tmpdir.path().join("doesntmatter"),
+ )
+ .unwrap();
+
+ let root_node = ingest_path(
+ &mut gen_blob_service(),
+ &mut gen_directory_service(),
+ tmpdir.path().join("doesntmatter"),
+ )
+ .expect("must succeed");
+
+ assert_eq!(
+ crate::proto::node::Node::Symlink(proto::SymlinkNode {
+ name: "doesntmatter".to_string(),
+ target: "/nix/store/somewhereelse".to_string(),
+ }),
+ root_node,
+ )
+}
+
+#[test]
+fn single_file() {
+ let tmpdir = TempDir::new().unwrap();
+
+ std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap();
+
+ let mut blob_service = gen_blob_service();
+
+ let root_node = ingest_path(
+ &mut blob_service,
+ &mut gen_directory_service(),
+ tmpdir.path().join("root"),
+ )
+ .expect("must succeed");
+
+ assert_eq!(
+ crate::proto::node::Node::File(proto::FileNode {
+ name: "root".to_string(),
+ digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
+ size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
+ executable: false,
+ }),
+ root_node,
+ );
+
+ // ensure the blob has been uploaded
+ assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).unwrap());
+}
+
+#[test]
+fn complicated() {
+ let tmpdir = TempDir::new().unwrap();
+
+ // File `.keep`
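+ // (written with empty contents, so its digest is EMPTY_BLOB_DIGEST)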
+ std::fs::write(tmpdir.path().join(".keep"), vec![]).unwrap();
+ // Symlink `aa`
+ std::os::unix::fs::symlink("/nix/store/somewhereelse", tmpdir.path().join("aa")).unwrap();
+ // Directory `keep`
+ std::fs::create_dir(tmpdir.path().join("keep")).unwrap();
+ // File `keep/.keep`
+ std::fs::write(tmpdir.path().join("keep").join(".keep"), vec![]).unwrap();
+
+ let mut blob_service = gen_blob_service();
+ let mut directory_service = gen_directory_service();
+
+ let root_node = ingest_path(&mut blob_service, &mut directory_service, tmpdir.path())
+ .expect("must succeed");
+
+ // ensure root_node matches expectations
+ assert_eq!(
+ crate::proto::node::Node::Directory(proto::DirectoryNode {
+ name: tmpdir
+ .path()
+ .file_name()
+ .unwrap()
+ .to_string_lossy()
+ .to_string(),
+ digest: DIRECTORY_COMPLICATED.digest().to_vec(),
+ size: DIRECTORY_COMPLICATED.size(),
+ }),
+ root_node,
+ );
+
+ // ensure DIRECTORY_WITH_KEEP and DIRECTORY_COMPLICATED have been uploaded
+ assert!(directory_service
+ .get(&DIRECTORY_WITH_KEEP.digest())
+ .unwrap()
+ .is_some());
+ assert!(directory_service
+ .get(&DIRECTORY_COMPLICATED.digest())
+ .unwrap()
+ .is_some());
+
+ // ensure EMPTY_BLOB_CONTENTS has been uploaded
+ assert!(blob_service.has(&EMPTY_BLOB_DIGEST).unwrap());
+}
diff --git a/tvix/store/src/tests/mod.rs b/tvix/store/src/tests/mod.rs
new file mode 100644
index 000000000000..8ceea01e3190
--- /dev/null
+++ b/tvix/store/src/tests/mod.rs
@@ -0,0 +1,4 @@
+pub mod fixtures;
+mod import;
+mod nar_renderer;
+pub mod utils;
diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs
new file mode 100644
index 000000000000..f13107b1e48d
--- /dev/null
+++ b/tvix/store/src/tests/nar_renderer.rs
@@ -0,0 +1,189 @@
+use crate::blobservice::BlobService;
+use crate::blobservice::BlobWriter;
+use crate::directoryservice::DirectoryService;
+use crate::nar::NARRenderer;
+use crate::proto::DirectoryNode;
+use crate::proto::FileNode;
+use crate::proto::SymlinkNode;
+use crate::tests::fixtures::*;
+use crate::tests::utils::*;
+use std::io;
+
+#[test]
+fn single_symlink() {
+ let renderer = NARRenderer::new(gen_blob_service(), gen_directory_service());
+ // don't put anything in the stores, as we don't actually do any requests.
+
+ let mut buf: Vec<u8> = vec![];
+
+ renderer
+ .write_nar(
+ &mut buf,
+ &crate::proto::node::Node::Symlink(SymlinkNode {
+ name: "doesntmatter".to_string(),
+ target: "/nix/store/somewhereelse".to_string(),
+ }),
+ )
+ .expect("must succeed");
+
+ assert_eq!(buf, NAR_CONTENTS_SYMLINK.to_vec());
+}
+
+/// Make sure the NARRenderer fails if the blob referenced by the proto node
+/// doesn't exist in the store.
+#[test]
+fn single_file_missing_blob() {
+ let renderer = NARRenderer::new(gen_blob_service(), gen_directory_service());
+ let mut buf: Vec<u8> = vec![];
+
+ let e = renderer
+ .write_nar(
+ &mut buf,
+ &crate::proto::node::Node::File(FileNode {
+ name: "doesntmatter".to_string(),
+ digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
+ size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
+ executable: false,
+ }),
+ )
+ .expect_err("must fail");
+
+ match e {
+ crate::nar::RenderError::NARWriterError(e) => {
+ assert_eq!(io::ErrorKind::NotFound, e.kind());
+ }
+ _ => panic!("unexpected error: {:?}", e),
+ }
+}
+
+/// Make sure the NARRenderer fails if the returned blob has a different size
+/// than specified in the proto node.
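+/// (Both a too-large and a too-small size are exercised below.)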
+#[test]
+fn single_file_wrong_blob_size() {
+ let blob_service = gen_blob_service();
+
+ // insert blob into the store
+ let mut writer = blob_service.open_write().unwrap();
+ io::copy(
+ &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()),
+ &mut writer,
+ )
+ .unwrap();
+ assert_eq!(HELLOWORLD_BLOB_DIGEST.clone(), writer.close().unwrap());
+
+ let renderer = NARRenderer::new(blob_service, gen_directory_service());
+
+ // Test with a root FileNode whose size is too big
+ {
+ let mut buf: Vec<u8> = vec![];
+ let e = renderer
+ .write_nar(
+ &mut buf,
+ &crate::proto::node::Node::File(FileNode {
+ name: "doesntmatter".to_string(),
+ digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
+ size: 42, // <- note the wrong size here!
+ executable: false,
+ }),
+ )
+ .expect_err("must fail");
+
+ match e {
+ crate::nar::RenderError::NARWriterError(e) => {
+ assert_eq!(io::ErrorKind::UnexpectedEof, e.kind());
+ }
+ _ => panic!("unexpected error: {:?}", e),
+ }
+ }
+
+ // Test with a root FileNode whose size is too small
+ {
+ let mut buf: Vec<u8> = vec![];
+ let e = renderer
+ .write_nar(
+ &mut buf,
+ &crate::proto::node::Node::File(FileNode {
+ name: "doesntmatter".to_string(),
+ digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
+ size: 2, // <- note the wrong size here!
+ executable: false,
+ }),
+ )
+ .expect_err("must fail");
+
+ match e {
+ crate::nar::RenderError::NARWriterError(e) => {
+ assert_eq!(io::ErrorKind::InvalidInput, e.kind());
+ }
+ _ => panic!("unexpected error: {:?}", e),
+ }
+ }
+}
+
+#[test]
+fn single_file() {
+ let blob_service = gen_blob_service();
+
+ // insert blob into the store
+ let mut writer = blob_service.open_write().unwrap();
+ io::copy(
+ &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()),
+ &mut writer,
+ )
+ .unwrap();
+ assert_eq!(HELLOWORLD_BLOB_DIGEST.clone(), writer.close().unwrap());
+
+ let renderer = NARRenderer::new(blob_service, gen_directory_service());
+ let mut buf: Vec<u8> = vec![];
+
+ renderer
+ .write_nar(
+ &mut buf,
+ &crate::proto::node::Node::File(FileNode {
+ name: "doesntmatter".to_string(),
+ digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
+ size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
+ executable: false,
+ }),
+ )
+ .expect("must succeed");
+
+ assert_eq!(buf, NAR_CONTENTS_HELLOWORLD.to_vec());
+}
+
+#[test]
+fn test_complicated() {
+ let blob_service = gen_blob_service();
+ let directory_service = gen_directory_service();
+
+ // put all data into the stores.
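+ // (The rendered NAR references the empty blob and both fixture Directories,
+ // so all three need to exist in the stores.)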
+ // insert blob into the store
+ let mut writer = blob_service.open_write().unwrap();
+ io::copy(
+ &mut io::Cursor::new(EMPTY_BLOB_CONTENTS.to_vec()),
+ &mut writer,
+ )
+ .unwrap();
+ assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().unwrap());
+
+ directory_service.put(DIRECTORY_WITH_KEEP.clone()).unwrap();
+ directory_service
+ .put(DIRECTORY_COMPLICATED.clone())
+ .unwrap();
+
+ let renderer = NARRenderer::new(blob_service, directory_service);
+ let mut buf: Vec<u8> = vec![];
+
+ renderer
+ .write_nar(
+ &mut buf,
+ &crate::proto::node::Node::Directory(DirectoryNode {
+ name: "doesntmatter".to_string(),
+ digest: DIRECTORY_COMPLICATED.digest().to_vec(),
+ size: DIRECTORY_COMPLICATED.size(),
+ }),
+ )
+ .expect("must succeed");
+
+ assert_eq!(buf, NAR_CONTENTS_COMPLICATED.to_vec());
+}
diff --git a/tvix/store/src/tests/utils.rs b/tvix/store/src/tests/utils.rs
new file mode 100644
index 000000000000..2991feed41db
--- /dev/null
+++ b/tvix/store/src/tests/utils.rs
@@ -0,0 +1,17 @@
+use crate::{
+ blobservice::{BlobService, MemoryBlobService},
+ directoryservice::{DirectoryService, MemoryDirectoryService},
+ pathinfoservice::{MemoryPathInfoService, PathInfoService},
+};
+
+pub fn gen_blob_service() -> impl BlobService + Send + Sync + Clone + 'static {
+ MemoryBlobService::default()
+}
+
+pub fn gen_directory_service() -> impl DirectoryService + Send + Sync + Clone + 'static {
+ MemoryDirectoryService::default()
+}
+
+pub fn gen_pathinfo_service() -> impl PathInfoService {
+ MemoryPathInfoService::default()
+}
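+
+// Usage sketch: the generators above compose with the service traits, e.g. a
+// blob round-trip against the in-memory implementation could look like the
+// following (illustrative only; `b"hello"` is an arbitrary payload):
+//
+//     use crate::blobservice::{BlobService, BlobWriter};
+//     use std::io;
+//
+//     let blob_service = gen_blob_service();
+//     let mut writer = blob_service.open_write().unwrap();
+//     io::copy(&mut io::Cursor::new(b"hello".to_vec()), &mut writer).unwrap();
+//     let digest = writer.close().unwrap();
+//     assert!(blob_service.has(&digest).unwrap());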