diff options
Diffstat (limited to 'tvix/castore/src')
30 files changed, 4843 insertions, 0 deletions
diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs new file mode 100644 index 000000000000..2e0a30697d75 --- /dev/null +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; +use url::Url; + +use super::{BlobService, GRPCBlobService, MemoryBlobService, SledBlobService}; + +/// Constructs a new instance of a [BlobService] from an URI. +/// +/// The following schemes are supported by the following services: +/// - `memory://` ([MemoryBlobService]) +/// - `sled://` ([SledBlobService]) +/// - `grpc+*://` ([GRPCBlobService]) +/// +/// See their `from_url` methods for more details about their syntax. +pub fn from_addr(uri: &str) -> Result<Arc<dyn BlobService>, crate::Error> { + let url = Url::parse(uri) + .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; + + Ok(if url.scheme() == "memory" { + Arc::new(MemoryBlobService::from_url(&url)?) + } else if url.scheme() == "sled" { + Arc::new(SledBlobService::from_url(&url)?) + } else if url.scheme().starts_with("grpc+") { + Arc::new(GRPCBlobService::from_url(&url)?) + } else { + Err(crate::Error::StorageError(format!( + "unknown scheme: {}", + url.scheme() + )))? + }) +} diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs new file mode 100644 index 000000000000..115efa5f09c6 --- /dev/null +++ b/tvix/castore/src/blobservice/grpc.rs @@ -0,0 +1,406 @@ +use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; +use crate::{proto, B3Digest}; +use futures::sink::SinkExt; +use futures::TryFutureExt; +use std::{ + collections::VecDeque, + io::{self}, + pin::pin, + task::Poll, +}; +use tokio::io::AsyncWriteExt; +use tokio::{net::UnixStream, task::JoinHandle}; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_util::{ + io::{CopyToBytes, SinkWriter}, + sync::{PollSendError, PollSender}, +}; +use tonic::{async_trait, transport::Channel, Code, Status}; +use tracing::instrument; + +/// Connects to a (remote) tvix-store BlobService over gRPC. +#[derive(Clone)] +pub struct GRPCBlobService { + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, +} + +impl GRPCBlobService { + /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, + ) -> Self { + Self { grpc_client } + } +} + +#[async_trait] +impl BlobService for GRPCBlobService { + /// Constructs a [GRPCBlobService] from the passed [url::Url]: + /// - scheme has to match `grpc+*://`. + /// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + /// - In the case of unix sockets, there must be a path, but may not be a host. + /// - In the case of non-unix sockets, there must be a host, but no path. + fn from_url(url: &url::Url) -> Result<Self, crate::Error> { + // Start checking for the scheme to start with grpc+. + match url.scheme().strip_prefix("grpc+") { + None => Err(crate::Error::StorageError("invalid scheme".to_string())), + Some(rest) => { + if rest == "unix" { + if url.host_str().is_some() { + return Err(crate::Error::StorageError( + "host may not be set".to_string(), + )); + } + let path = url.path().to_string(); + let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter + .unwrap() + .connect_with_connector_lazy(tower::service_fn( + move |_: tonic::transport::Uri| UnixStream::connect(path.clone()), + )); + let grpc_client = proto::blob_service_client::BlobServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } else { + // ensure path is empty, not supported with gRPC. + if !url.path().is_empty() { + return Err(crate::Error::StorageError( + "path may not be set".to_string(), + )); + } + + // clone the uri, and drop the grpc+ from the scheme. + // Recreate a new uri with the `grpc+` prefix dropped from the scheme. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + let url = { + let url_str = url.to_string(); + let s_stripped = url_str.strip_prefix("grpc+").unwrap(); + url::Url::parse(s_stripped).unwrap() + }; + let channel = tonic::transport::Endpoint::try_from(url.to_string()) + .unwrap() + .connect_lazy(); + + let grpc_client = proto::blob_service_client::BlobServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } + } + } + } + + #[instrument(skip(self, digest), fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> { + let mut grpc_client = self.grpc_client.clone(); + let resp = grpc_client + .stat(proto::StatBlobRequest { + digest: digest.clone().into(), + }) + .await; + + match resp { + Ok(_blob_meta) => Ok(true), + Err(e) if e.code() == Code::NotFound => Ok(false), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + // On success, this returns a Ok(Some(io::Read)), which can be used to read + // the contents of the Blob, identified by the digest. + async fn open_read( + &self, + digest: &B3Digest, + ) -> Result<Option<Box<dyn BlobReader>>, crate::Error> { + // Get a new handle to the gRPC client, and copy the digest. + let mut grpc_client = self.grpc_client.clone(); + + // Get a stream of [proto::BlobChunk], or return an error if the blob + // doesn't exist. + let resp = grpc_client + .read(proto::ReadBlobRequest { + digest: digest.clone().into(), + }) + .await; + + // This runs the task to completion, which on success will return a stream. + // On reading from it, we receive individual [proto::BlobChunk], so we + // massage this to a stream of bytes, + // then create an [AsyncRead], which we'll turn into a [io::Read], + // that's returned from the function. + match resp { + Ok(stream) => { + // map the stream of proto::BlobChunk to bytes. + let data_stream = stream.into_inner().map(|x| { + x.map(|x| VecDeque::from(x.data.to_vec())) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) + }); + + // Use StreamReader::new to convert to an AsyncRead. + let data_reader = tokio_util::io::StreamReader::new(data_stream); + + Ok(Some(Box::new(NaiveSeeker::new(data_reader)))) + } + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + /// Returns a BlobWriter, that'll internally wrap each write in a + // [proto::BlobChunk], which is send to the gRPC server. + async fn open_write(&self) -> Box<dyn BlobWriter> { + let mut grpc_client = self.grpc_client.clone(); + + // set up an mpsc channel passing around Bytes. + let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); + + // bytes arriving on the RX side are wrapped inside a + // [proto::BlobChunk], and a [ReceiverStream] is constructed. + let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x }); + + // That receiver stream is used as a stream in the gRPC BlobService.put rpc call. + let task: JoinHandle<Result<_, Status>> = + tokio::spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) }); + + // The tx part of the channel is converted to a sink of byte chunks. + + // We need to make this a function pointer, not a closure. + fn convert_error(_: PollSendError<bytes::Bytes>) -> io::Error { + io::Error::from(io::ErrorKind::BrokenPipe) + } + + let sink = PollSender::new(tx) + .sink_map_err(convert_error as fn(PollSendError<bytes::Bytes>) -> io::Error); + // We need to explicitly cast here, otherwise rustc does error with "expected fn pointer, found fn item" + + // … which is turned into an [tokio::io::AsyncWrite]. + let writer = SinkWriter::new(CopyToBytes::new(sink)); + + Box::new(GRPCBlobWriter { + task_and_writer: Some((task, writer)), + digest: None, + }) + } +} + +pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> { + /// The task containing the put request, and the inner writer, if we're still writing. + task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, +} + +#[async_trait] +impl<W: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static> BlobWriter for GRPCBlobWriter<W> { + async fn close(&mut self) -> Result<B3Digest, crate::Error> { + if self.task_and_writer.is_none() { + // if we're already closed, return the b3 digest, which must exist. + // If it doesn't, we already closed and failed once, and didn't handle the error. + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (task, mut writer) = self.task_and_writer.take().unwrap(); + + // invoke shutdown, so the inner writer closes its internal tx side of + // the channel. + writer + .shutdown() + .map_err(|e| crate::Error::StorageError(e.to_string())) + .await?; + + // block on the RPC call to return. + // This ensures all chunks are sent out, and have been received by the + // backend. + + match task.await? { + Ok(resp) => { + // return the digest from the response, and store it in self.digest for subsequent closes. + let digest: B3Digest = resp.digest.try_into().map_err(|_| { + crate::Error::StorageError( + "invalid root digest length in response".to_string(), + ) + })?; + self.digest = Some(digest.clone()); + Ok(digest) + } + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + } +} + +impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for GRPCBlobWriter<W> { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll<Result<usize, io::Error>> { + match &mut self.task_and_writer { + None => Poll::Ready(Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + ))), + Some((_, ref mut writer)) => { + let pinned_writer = pin!(writer); + pinned_writer.poll_write(cx, buf) + } + } + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + match &mut self.task_and_writer { + None => Poll::Ready(Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + ))), + Some((_, ref mut writer)) => { + let pinned_writer = pin!(writer); + pinned_writer.poll_flush(cx) + } + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + // TODO(raitobezarius): this might not be a graceful shutdown of the + // channel inside the gRPC connection. + Poll::Ready(Ok(())) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio_retry::strategy::ExponentialBackoff; + use tokio_retry::Retry; + use tokio_stream::wrappers::UnixListenerStream; + + use crate::blobservice::MemoryBlobService; + use crate::fixtures; + use crate::proto::GRPCBlobServiceWrapper; + + use super::BlobService; + use super::GRPCBlobService; + + /// This uses the wrong scheme + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme for a unix socket. + /// The fact that /path/to/somewhere doesn't exist yet is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_unix_path() { + let url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_ok()); + } + + /// This uses the correct scheme for a unix socket, + /// but sets a host, which is unsupported. + #[tokio::test] + async fn test_invalid_unix_path_with_domain() { + let url = + url::Url::parse("grpc+unix://host.example/path/to/somewhere").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme for a HTTP server. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_http() { + let url = url::Url::parse("grpc+http://localhost").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_ok()); + } + + /// This uses the correct scheme for a HTTPS server. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_https() { + let url = url::Url::parse("grpc+https://localhost").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_ok()); + } + + /// This uses the correct scheme, but also specifies + /// an additional path, which is not supported for gRPC. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_invalid_http_with_path() { + let url = url::Url::parse("grpc+https://localhost/some-path").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_err()); + } + + /// This ensures connecting via gRPC works as expected. + #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("daemon"); + + let path_clone = socket_path.clone(); + + // Spin up a server + tokio::spawn(async { + let uds = UnixListener::bind(path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new server + let mut server = tonic::transport::Server::builder(); + let router = + server.add_service(crate::proto::blob_service_server::BlobServiceServer::new( + GRPCBlobServiceWrapper::from( + Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService> + ), + )); + router.serve_with_incoming(uds_stream).await + }); + + // wait for the socket to be created + Retry::spawn( + ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // prepare a client + let grpc_client = { + let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display())) + .expect("must parse"); + GRPCBlobService::from_url(&url).expect("must succeed") + }; + + let has = grpc_client + .has(&fixtures::BLOB_A_DIGEST) + .await + .expect("must not be err"); + + assert!(!has); + } +} diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs new file mode 100644 index 000000000000..383127344a17 --- /dev/null +++ b/tvix/castore/src/blobservice/memory.rs @@ -0,0 +1,196 @@ +use std::io::{self, Cursor, Write}; +use std::task::Poll; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; +use tonic::async_trait; +use tracing::instrument; + +use super::{BlobReader, BlobService, BlobWriter}; +use crate::{B3Digest, Error}; + +#[derive(Clone, Default)] +pub struct MemoryBlobService { + db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, +} + +#[async_trait] +impl BlobService for MemoryBlobService { + /// Constructs a [MemoryBlobService] from the passed [url::Url]: + /// - scheme has to be `memory://` + /// - there may not be a host. + /// - there may not be a path. + fn from_url(url: &url::Url) -> Result<Self, Error> { + if url.scheme() != "memory" { + return Err(crate::Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() || !url.path().is_empty() { + return Err(crate::Error::StorageError("invalid url".to_string())); + } + + Ok(Self::default()) + } + + #[instrument(skip(self, digest), fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> Result<bool, Error> { + let db = self.db.read().unwrap(); + Ok(db.contains_key(digest)) + } + + async fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error> { + let db = self.db.read().unwrap(); + + match db.get(digest).map(|x| Cursor::new(x.clone())) { + Some(result) => Ok(Some(Box::new(result))), + None => Ok(None), + } + } + + #[instrument(skip(self))] + async fn open_write(&self) -> Box<dyn BlobWriter> { + Box::new(MemoryBlobWriter::new(self.db.clone())) + } +} + +pub struct MemoryBlobWriter { + db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, + + /// Contains the buffer Vec and hasher, or None if already closed + writers: Option<(Vec<u8>, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, +} + +impl MemoryBlobWriter { + fn new(db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>) -> Self { + Self { + db, + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, + } + } +} +impl tokio::io::AsyncWrite for MemoryBlobWriter { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + b: &[u8], + ) -> std::task::Poll<Result<usize, io::Error>> { + Poll::Ready(match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&b[..bytes_written]) + } + }) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + Poll::Ready(match self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + }) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + // shutdown is "instantaneous", we only write to memory. + Poll::Ready(Ok(())) + } +} + +#[async_trait] +impl BlobWriter for MemoryBlobWriter { + async fn close(&mut self) -> Result<B3Digest, Error> { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + // We know self.hasher is doing blake3 hashing, so this won't fail. + let digest: B3Digest = hasher.finalize().as_bytes().into(); + + // Only insert if the blob doesn't already exist. + let db = self.db.read()?; + if !db.contains_key(&digest) { + // drop the read lock, so we can open for writing. + drop(db); + + // open the database for writing. + let mut db = self.db.write()?; + + // and put buf in there. This will move buf out. + db.insert(digest.clone(), buf); + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } + } +} + +#[cfg(test)] +mod tests { + use super::BlobService; + use super::MemoryBlobService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_err()); + } + + /// This correctly sets the scheme, and doesn't set a path. + #[test] + fn test_valid_scheme() { + let url = url::Url::parse("memory://").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_ok()); + } + + /// This sets the host to `foo` + #[test] + fn test_invalid_host() { + let url = url::Url::parse("memory://foo").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_err()); + } + + /// This has the path "/", which is invalid. + #[test] + fn test_invalid_has_path() { + let url = url::Url::parse("memory:///").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_err()); + } + + /// This has the path "/foo", which is invalid. + #[test] + fn test_invalid_path2() { + let url = url::Url::parse("memory:///foo").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_err()); + } +} diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs new file mode 100644 index 000000000000..5ecf25ac1337 --- /dev/null +++ b/tvix/castore/src/blobservice/mod.rs @@ -0,0 +1,62 @@ +use std::io; +use tonic::async_trait; + +use crate::{B3Digest, Error}; + +mod from_addr; +mod grpc; +mod memory; +mod naive_seeker; +mod sled; + +#[cfg(test)] +mod tests; + +pub use self::from_addr::from_addr; +pub use self::grpc::GRPCBlobService; +pub use self::memory::MemoryBlobService; +pub use self::sled::SledBlobService; + +/// The base trait all BlobService services need to implement. +/// It provides functions to check whether a given blob exists, +/// a way to get a [io::Read] to a blob, and a method to initiate writing a new +/// Blob, which will return something implmenting io::Write, and providing a +/// close funtion, to finalize a blob and get its digest. +#[async_trait] +pub trait BlobService: Send + Sync { + /// Create a new instance by passing in a connection URL. + /// TODO: check if we want to make this async, instead of lazily connecting + fn from_url(url: &url::Url) -> Result<Self, Error> + where + Self: Sized; + + /// Check if the service has the blob, by its content hash. + async fn has(&self, digest: &B3Digest) -> Result<bool, Error>; + + /// Request a blob from the store, by its content hash. + async fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error>; + + /// Insert a new blob into the store. Returns a [BlobWriter], which + /// implements [io::Write] and a [BlobWriter::close]. + async fn open_write(&self) -> Box<dyn BlobWriter>; +} + +/// A [tokio::io::AsyncWrite] that you need to close() afterwards, and get back +/// the digest of the written blob. +#[async_trait] +pub trait BlobWriter: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static { + /// Signal there's no more data to be written, and return the digest of the + /// contents written. + /// + /// Closing a already-closed BlobWriter is a no-op. + async fn close(&mut self) -> Result<B3Digest, Error>; +} + +/// A [tokio::io::AsyncRead] that also allows seeking. +pub trait BlobReader: + tokio::io::AsyncRead + tokio::io::AsyncSeek + tokio::io::AsyncBufRead + Send + Unpin + 'static +{ +} + +/// A [`io::Cursor<Vec<u8>>`] can be used as a BlobReader. +impl BlobReader for io::Cursor<Vec<u8>> {} diff --git a/tvix/castore/src/blobservice/naive_seeker.rs b/tvix/castore/src/blobservice/naive_seeker.rs new file mode 100644 index 000000000000..e65a82c7f45a --- /dev/null +++ b/tvix/castore/src/blobservice/naive_seeker.rs @@ -0,0 +1,269 @@ +use super::BlobReader; +use pin_project_lite::pin_project; +use std::io; +use std::task::Poll; +use tokio::io::AsyncRead; +use tracing::{debug, instrument}; + +pin_project! { + /// This implements [tokio::io::AsyncSeek] for and [tokio::io::AsyncRead] by + /// simply skipping over some bytes, keeping track of the position. + /// It fails whenever you try to seek backwards. + /// + /// ## Pinning concerns: + /// + /// [NaiveSeeker] is itself pinned by callers, and we do not need to concern + /// ourselves regarding that. + /// + /// Though, its fields as per + /// <https://doc.rust-lang.org/std/pin/#pinning-is-not-structural-for-field> + /// can be pinned or unpinned. + /// + /// So we need to go over each field and choose our policy carefully. + /// + /// The obvious cases are the bookkeeping integers we keep in the structure, + /// those are private and not shared to anyone, we never build a + /// `Pin<&mut X>` out of them at any point, therefore, we can safely never + /// mark them as pinned. Of course, it is expected that no developer here + /// attempt to `pin!(self.pos)` to pin them because it makes no sense. If + /// they have to become pinned, they should be marked `#[pin]` and we need + /// to discuss it. + /// + /// So the bookkeeping integers are in the right state with respect to their + /// pinning status. The projection should offer direct access. + /// + /// On the `r` field, i.e. a `BufReader<R>`, given that + /// <https://docs.rs/tokio/latest/tokio/io/struct.BufReader.html#impl-Unpin-for-BufReader%3CR%3E> + /// is available, even a `Pin<&mut BufReader<R>>` can be safely moved. + /// + /// The only care we should have regards the internal reader itself, i.e. + /// the `R` instance, see that Tokio decided to `#[pin]` it too: + /// <https://docs.rs/tokio/latest/src/tokio/io/util/buf_reader.rs.html#29> + /// + /// In general, there's no `Unpin` instance for `R: tokio::io::AsyncRead` + /// (see <https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html>). + /// + /// Therefore, we could keep it unpinned and pin it in every call site + /// whenever we need to call `poll_*` which can be confusing to the non- + /// expert developer and we have a fair share amount of situations where the + /// [BufReader] instance is naked, i.e. in its `&mut BufReader<R>` + /// form, this is annoying because it could lead to expose the naked `R` + /// internal instance somehow and would produce a risk of making it move + /// unexpectedly. + /// + /// We choose the path of the least resistance as we have no reason to have + /// access to the raw `BufReader<R>` instance, we just `#[pin]` it too and + /// enjoy its `poll_*` safe APIs and push the unpinning concerns to the + /// internal implementations themselves, which studied the question longer + /// than us. + pub struct NaiveSeeker<R: tokio::io::AsyncRead> { + #[pin] + r: tokio::io::BufReader<R>, + pos: u64, + bytes_to_skip: u64, + } +} + +impl<R: tokio::io::AsyncRead> NaiveSeeker<R> { + pub fn new(r: R) -> Self { + NaiveSeeker { + r: tokio::io::BufReader::new(r), + pos: 0, + bytes_to_skip: 0, + } + } +} + +impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<std::io::Result<()>> { + // The amount of data read can be determined by the increase + // in the length of the slice returned by `ReadBuf::filled`. + let filled_before = buf.filled().len(); + let this = self.project(); + let pos: &mut u64 = this.pos; + + match this.r.poll_read(cx, buf) { + Poll::Ready(a) => { + let bytes_read = buf.filled().len() - filled_before; + *pos += bytes_read as u64; + + Poll::Ready(a) + } + Poll::Pending => Poll::Pending, + } + } +} + +impl<R: tokio::io::AsyncRead> tokio::io::AsyncBufRead for NaiveSeeker<R> { + fn poll_fill_buf( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<io::Result<&[u8]>> { + self.project().r.poll_fill_buf(cx) + } + + fn consume(self: std::pin::Pin<&mut Self>, amt: usize) { + let this = self.project(); + this.r.consume(amt); + let pos: &mut u64 = this.pos; + *pos += amt as u64; + } +} + +impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> { + #[instrument(skip(self))] + fn start_seek( + self: std::pin::Pin<&mut Self>, + position: std::io::SeekFrom, + ) -> std::io::Result<()> { + let absolute_offset: u64 = match position { + io::SeekFrom::Start(start_offset) => { + if start_offset < self.pos { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + format!("can't seek backwards ({} -> {})", self.pos, start_offset), + )); + } else { + start_offset + } + } + // we don't know the total size, can't support this. + io::SeekFrom::End(_end_offset) => { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "can't seek from end", + )); + } + io::SeekFrom::Current(relative_offset) => { + if relative_offset < 0 { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "can't seek backwards relative to current position", + )); + } else { + self.pos + relative_offset as u64 + } + } + }; + + debug!(absolute_offset=?absolute_offset, "seek"); + + // we already know absolute_offset is larger than self.pos + debug_assert!( + absolute_offset >= self.pos, + "absolute_offset {} is larger than self.pos {}", + absolute_offset, + self.pos + ); + + // calculate bytes to skip + *self.project().bytes_to_skip = absolute_offset - self.pos; + + Ok(()) + } + + #[instrument(skip(self))] + fn poll_complete( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<std::io::Result<u64>> { + if self.bytes_to_skip == 0 { + // return the new position (from the start of the stream) + return Poll::Ready(Ok(self.pos)); + } + + // discard some bytes, until pos is where we want it to be. + // We create a buffer that we'll discard later on. + let mut buf = [0; 1024]; + + // Loop until we've reached the desired seek position. This is done by issuing repeated + // `poll_read` calls. If the data is not available yet, we will yield back to the executor + // and wait to be polled again. + loop { + // calculate the length we want to skip at most, which is either a max + // buffer size, or the number of remaining bytes to read, whatever is + // smaller. + let bytes_to_skip = std::cmp::min(self.bytes_to_skip as usize, buf.len()); + + let mut read_buf = tokio::io::ReadBuf::new(&mut buf[..bytes_to_skip]); + + match self.as_mut().poll_read(cx, &mut read_buf) { + Poll::Ready(_a) => { + let bytes_read = read_buf.filled().len() as u64; + + if bytes_read == 0 { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!( + "tried to skip {} bytes, but only was able to skip {} until reaching EOF", + bytes_to_skip, bytes_read + ), + ))); + } + + // calculate bytes to skip + let bytes_to_skip = self.bytes_to_skip - bytes_read; + + *self.as_mut().project().bytes_to_skip = bytes_to_skip; + + if bytes_to_skip == 0 { + return Poll::Ready(Ok(self.pos)); + } + } + Poll::Pending => return Poll::Pending, + }; + } + } +} + +impl<R: tokio::io::AsyncRead + Send + Unpin + 'static> BlobReader for NaiveSeeker<R> {} + +#[cfg(test)] +mod tests { + use super::NaiveSeeker; + use std::io::{Cursor, SeekFrom}; + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + + /// This seek requires multiple `poll_read` as we use a 1024 bytes internal + /// buffer when doing the seek. + /// This ensures we don't hang indefinitely. + #[tokio::test] + async fn seek() { + let buf = vec![0u8; 4096]; + let reader = Cursor::new(&buf); + let mut seeker = NaiveSeeker::new(reader); + seeker.seek(SeekFrom::Start(4000)).await.unwrap(); + } + + #[tokio::test] + async fn seek_read() { + let mut buf = vec![0u8; 2048]; + buf.extend_from_slice(&[1u8; 2048]); + buf.extend_from_slice(&[2u8; 2048]); + + let reader = Cursor::new(&buf); + let mut seeker = NaiveSeeker::new(reader); + + let mut read_buf = vec![0u8; 1024]; + seeker.read_exact(&mut read_buf).await.expect("must read"); + assert_eq!(read_buf.as_slice(), &[0u8; 1024]); + + seeker + .seek(SeekFrom::Current(1024)) + .await + .expect("must seek"); + seeker.read_exact(&mut read_buf).await.expect("must read"); + assert_eq!(read_buf.as_slice(), &[1u8; 1024]); + + seeker + .seek(SeekFrom::Start(2 * 2048)) + .await + .expect("must seek"); + seeker.read_exact(&mut read_buf).await.expect("must read"); + assert_eq!(read_buf.as_slice(), &[2u8; 1024]); + } +} diff --git a/tvix/castore/src/blobservice/sled.rs b/tvix/castore/src/blobservice/sled.rs new file mode 100644 index 000000000000..209f0b76fc7a --- /dev/null +++ b/tvix/castore/src/blobservice/sled.rs @@ -0,0 +1,249 @@ +use super::{BlobReader, BlobService, BlobWriter}; +use crate::{B3Digest, Error}; +use std::{ + io::{self, Cursor, Write}, + path::PathBuf, + task::Poll, +}; +use tonic::async_trait; +use tracing::instrument; + +#[derive(Clone)] +pub struct SledBlobService { + db: sled::Db, +} + +impl SledBlobService { + pub fn new(p: PathBuf) -> Result<Self, sled::Error> { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { db }) + } + + pub fn new_temporary() -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { db }) + } +} + +#[async_trait] +impl BlobService for SledBlobService { + /// Constructs a [SledBlobService] from the passed [url::Url]: + /// - scheme has to be `sled://` + /// - there may not be a host. + /// - a path to the sled needs to be provided (which may not be `/`). + fn from_url(url: &url::Url) -> Result<Self, Error> { + if url.scheme() != "sled" { + return Err(crate::Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() { + return Err(crate::Error::StorageError(format!( + "invalid host: {}", + url.host().unwrap() + ))); + } + + // TODO: expose compression and other parameters as URL parameters, drop new and new_temporary? + if url.path().is_empty() { + Self::new_temporary().map_err(|e| Error::StorageError(e.to_string())) + } else if url.path() == "/" { + Err(crate::Error::StorageError( + "cowardly refusing to open / with sled".to_string(), + )) + } else { + Self::new(url.path().into()).map_err(|e| Error::StorageError(e.to_string())) + } + } + + #[instrument(skip(self), fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> Result<bool, Error> { + match self.db.contains_key(digest.to_vec()) { + Ok(has) => Ok(has), + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + #[instrument(skip(self), fields(blob.digest=%digest))] + async fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error> { + match self.db.get(digest.to_vec()) { + Ok(None) => Ok(None), + Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))), + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + #[instrument(skip(self))] + async fn open_write(&self) -> Box<dyn BlobWriter> { + Box::new(SledBlobWriter::new(self.db.clone())) + } +} + +pub struct SledBlobWriter { + db: sled::Db, + + /// Contains the buffer Vec and hasher, or None if already closed + writers: Option<(Vec<u8>, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, +} + +impl SledBlobWriter { + pub fn new(db: sled::Db) -> Self { + Self { + db, + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, + } + } +} + +impl tokio::io::AsyncWrite for SledBlobWriter { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + b: &[u8], + ) -> std::task::Poll<Result<usize, io::Error>> { + Poll::Ready(match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&b[..bytes_written]) + } + }) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + Poll::Ready(match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + }) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + // shutdown is "instantaneous", we only write to a Vec<u8> as buffer. + Poll::Ready(Ok(())) + } +} + +#[async_trait] +impl BlobWriter for SledBlobWriter { + async fn close(&mut self) -> Result<B3Digest, Error> { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(crate::Error::StorageError( + "previously closed with error".to_string(), + )), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + let digest: B3Digest = hasher.finalize().as_bytes().into(); + + // Only insert if the blob doesn't already exist. + if !self.db.contains_key(digest.to_vec()).map_err(|e| { + Error::StorageError(format!("Unable to check if we have blob {}: {}", digest, e)) + })? { + // put buf in there. This will move buf out. + self.db + .insert(digest.to_vec(), buf) + .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use super::BlobService; + use super::SledBlobService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(SledBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and doesn't specify a path (temporary sled). + #[test] + fn test_valid_scheme_temporary() { + let url = url::Url::parse("sled://").expect("must parse"); + + assert!(SledBlobService::from_url(&url).is_ok()); + } + + /// This sets the path to a location that doesn't exist, which should fail (as sled doesn't mkdir -p) + #[test] + fn test_nonexistent_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().join("foo").join("bar").to_str().unwrap()); + + assert!(SledBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and specifies / as path (which should fail + // for obvious reasons) + #[test] + fn test_invalid_path_root() { + let url = url::Url::parse("sled:///").expect("must parse"); + + assert!(SledBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and sets a tempdir as location. + #[test] + fn test_valid_scheme_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!(SledBlobService::from_url(&url).is_ok()); + } + + /// This sets a host, rather than a path, which should fail. + #[test] + fn test_invalid_host() { + let url = url::Url::parse("sled://foo.example").expect("must parse"); + + assert!(SledBlobService::from_url(&url).is_err()); + } + + /// This sets a host AND a valid path, which should fail + #[test] + fn test_invalid_host_and_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!(SledBlobService::from_url(&url).is_err()); + } +} diff --git a/tvix/castore/src/blobservice/tests.rs b/tvix/castore/src/blobservice/tests.rs new file mode 100644 index 000000000000..fe390b537eb8 --- /dev/null +++ b/tvix/castore/src/blobservice/tests.rs @@ -0,0 +1,246 @@ +use std::io; +use std::pin::pin; + +use test_case::test_case; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; + +use super::B3Digest; +use super::BlobService; +use super::MemoryBlobService; +use super::SledBlobService; +use crate::fixtures; + +// TODO: avoid having to define all different services we test against for all functions. +// maybe something like rstest can be used? + +fn gen_memory_blob_service() -> impl BlobService { + MemoryBlobService::default() +} +fn gen_sled_blob_service() -> impl BlobService { + SledBlobService::new_temporary().unwrap() +} + +// TODO: add GRPC blob service here. + +/// Using [BlobService::has] on a non-existing blob should return false +#[test_case(gen_memory_blob_service(); "memory")] +#[test_case(gen_sled_blob_service(); "sled")] +fn has_nonexistent_false(blob_service: impl BlobService) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + assert!(!blob_service + .has(&fixtures::BLOB_A_DIGEST) + .await + .expect("must not fail")); + }) +} + +/// Trying to read a non-existing blob should return a None instead of a reader. +#[test_case(gen_memory_blob_service(); "memory")] +#[test_case(gen_sled_blob_service(); "sled")] +fn not_found_read(blob_service: impl BlobService) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + assert!(blob_service + .open_read(&fixtures::BLOB_A_DIGEST) + .await + .expect("must not fail") + .is_none()) + }) +} + +/// Put a blob in the store, check has, get it back. +/// We test both with small and big blobs. +#[test_case(gen_memory_blob_service(), &fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST; "memory-small")] +#[test_case(gen_sled_blob_service(), &fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST; "sled-small")] +#[test_case(gen_memory_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "memory-big")] +#[test_case(gen_sled_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "sled-big")] +fn put_has_get(blob_service: impl BlobService, blob_contents: &[u8], blob_digest: &B3Digest) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut w = blob_service.open_write().await; + + let l = tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut w) + .await + .expect("copy must succeed"); + assert_eq!( + blob_contents.len(), + l as usize, + "written bytes must match blob length" + ); + + let digest = w.close().await.expect("close must succeed"); + + assert_eq!(*blob_digest, digest, "returned digest must be correct"); + + assert!( + blob_service.has(blob_digest).await.expect("must not fail"), + "blob service should now have the blob" + ); + + let mut r = blob_service + .open_read(blob_digest) + .await + .expect("open_read must succeed") + .expect("must be some"); + + let mut buf: Vec<u8> = Vec::new(); + let mut pinned_reader = pin!(r); + let l = tokio::io::copy(&mut pinned_reader, &mut buf) + .await + .expect("copy must succeed"); + // let l = io::copy(&mut r, &mut buf).expect("copy must succeed"); + + assert_eq!( + blob_contents.len(), + l as usize, + "read bytes must match blob length" + ); + + assert_eq!(blob_contents, buf, "read blob contents must match"); + }) +} + +/// Put a blob in the store, and seek inside it a bit. +#[test_case(gen_memory_blob_service(); "memory")] +#[test_case(gen_sled_blob_service(); "sled")] +fn put_seek(blob_service: impl BlobService) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut w = blob_service.open_write().await; + + tokio::io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w) + .await + .expect("copy must succeed"); + w.close().await.expect("close must succeed"); + + // open a blob for reading + let mut r = blob_service + .open_read(&fixtures::BLOB_B_DIGEST) + .await + .expect("open_read must succeed") + .expect("must be some"); + + let mut pos: u64 = 0; + + // read the first 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected first 10 bytes to match" + ); + + pos += buf.len() as u64; + } + // seek by 0 bytes, using SeekFrom::Start. + let p = r + .seek(io::SeekFrom::Start(pos)) + .await + .expect("must not fail"); + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + pos += buf.len() as u64; + } + + // seek by 5 bytes, using SeekFrom::Start. + let p = r + .seek(io::SeekFrom::Start(pos + 5)) + .await + .expect("must not fail"); + pos += 5; + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + pos += buf.len() as u64; + } + + // seek by 12345 bytes, using SeekFrom:: + let p = r + .seek(io::SeekFrom::Current(12345)) + .await + .expect("must not fail"); + pos += 12345; + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + #[allow(unused_assignments)] + { + pos += buf.len() as u64; + } + } + + // seeking to the end is okay… + let p = r + .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64)) + .await + .expect("must not fail"); + pos = fixtures::BLOB_B.len() as u64; + assert_eq!(pos, p); + + { + // but it returns no more data. + let mut buf: Vec<u8> = Vec::new(); + r.read_to_end(&mut buf).await.expect("must not fail"); + assert!(buf.is_empty(), "expected no more data to be read"); + } + + // seeking past the end… + match r + .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1)) + .await + { + // should either be ok, but then return 0 bytes. + // this matches the behaviour or a Cursor<Vec<u8>>. + Ok(_pos) => { + let mut buf: Vec<u8> = Vec::new(); + r.read_to_end(&mut buf).await.expect("must not fail"); + assert!(buf.is_empty(), "expected no more data to be read"); + } + // or not be okay. + Err(_) => {} + } + + // TODO: this is only broken for the gRPC version + // We expect seeking backwards or relative to the end to fail. + // r.seek(io::SeekFrom::Current(-1)) + // .expect_err("SeekFrom::Current(-1) expected to fail"); + + // r.seek(io::SeekFrom::Start(pos - 1)) + // .expect_err("SeekFrom::Start(pos-1) expected to fail"); + + // r.seek(io::SeekFrom::End(0)) + // .expect_err("SeekFrom::End(_) expected to fail"); + }) +} diff --git a/tvix/castore/src/digests.rs b/tvix/castore/src/digests.rs new file mode 100644 index 000000000000..a9e810aac5dd --- /dev/null +++ b/tvix/castore/src/digests.rs @@ -0,0 +1,74 @@ +use bytes::Bytes; +use data_encoding::BASE64; +use thiserror::Error; + +#[derive(PartialEq, Eq, Hash, Debug)] +pub struct B3Digest(Bytes); + +// TODO: allow converting these errors to crate::Error +#[derive(Error, Debug)] +pub enum Error { + #[error("invalid digest length: {0}")] + InvalidDigestLen(usize), +} + +pub const B3_LEN: usize = 32; + +impl B3Digest { + // returns a copy of the inner [Vec<u8>]. + pub fn to_vec(&self) -> Vec<u8> { + self.0.to_vec() + } +} + +impl From<B3Digest> for bytes::Bytes { + fn from(val: B3Digest) -> Self { + val.0 + } +} + +impl TryFrom<Vec<u8>> for B3Digest { + type Error = Error; + + // constructs a [B3Digest] from a [Vec<u8>]. + // Returns an error if the digest has the wrong length. + fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> { + if value.len() != B3_LEN { + Err(Error::InvalidDigestLen(value.len())) + } else { + Ok(Self(value.into())) + } + } +} + +impl TryFrom<bytes::Bytes> for B3Digest { + type Error = Error; + + // constructs a [B3Digest] from a [bytes::Bytes]. + // Returns an error if the digest has the wrong length. + fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> { + if value.len() != B3_LEN { + Err(Error::InvalidDigestLen(value.len())) + } else { + Ok(Self(value)) + } + } +} + +impl From<&[u8; B3_LEN]> for B3Digest { + fn from(value: &[u8; B3_LEN]) -> Self { + Self(value.to_vec().into()) + } +} + +impl Clone for B3Digest { + fn clone(&self) -> Self { + Self(self.0.to_owned()) + } +} + +impl std::fmt::Display for B3Digest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "b3:{}", BASE64.encode(&self.0)) + } +} diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs new file mode 100644 index 000000000000..776cf061096c --- /dev/null +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -0,0 +1,36 @@ +use std::sync::Arc; +use url::Url; + +use super::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService, SledDirectoryService}; + +/// Constructs a new instance of a [DirectoryService] from an URI. +/// +/// The following URIs are supported: +/// - `memory:` +/// Uses a in-memory implementation. +/// - `sled:` +/// Uses a in-memory sled implementation. +/// - `sled:///absolute/path/to/somewhere` +/// Uses sled, using a path on the disk for persistency. Can be only opened +/// from one process at the same time. +/// - `grpc+unix:///absolute/path/to/somewhere` +/// Connects to a local tvix-store gRPC service via Unix socket. +/// - `grpc+http://host:port`, `grpc+https://host:port` +/// Connects to a (remote) tvix-store gRPC service. +pub fn from_addr(uri: &str) -> Result<Arc<dyn DirectoryService>, crate::Error> { + let url = Url::parse(uri) + .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; + + Ok(if url.scheme() == "memory" { + Arc::new(MemoryDirectoryService::from_url(&url)?) + } else if url.scheme() == "sled" { + Arc::new(SledDirectoryService::from_url(&url)?) + } else if url.scheme().starts_with("grpc+") { + Arc::new(GRPCDirectoryService::from_url(&url)?) + } else { + Err(crate::Error::StorageError(format!( + "unknown scheme: {}", + url.scheme() + )))? + }) +} diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs new file mode 100644 index 000000000000..7ef9f84b0a16 --- /dev/null +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -0,0 +1,529 @@ +use std::collections::HashSet; +use std::pin::Pin; + +use super::{DirectoryPutter, DirectoryService}; +use crate::proto::{self, get_directory_request::ByWhat}; +use crate::{B3Digest, Error}; +use async_stream::try_stream; +use futures::Stream; +use tokio::net::UnixStream; +use tokio::spawn; +use tokio::sync::mpsc::UnboundedSender; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tonic::async_trait; +use tonic::Code; +use tonic::{transport::Channel, Status}; +use tracing::{instrument, warn}; + +/// Connects to a (remote) tvix-store DirectoryService over gRPC. +#[derive(Clone)] +pub struct GRPCDirectoryService { + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, +} + +impl GRPCDirectoryService { + /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + ) -> Self { + Self { grpc_client } + } +} + +#[async_trait] +impl DirectoryService for GRPCDirectoryService { + /// Constructs a [GRPCDirectoryService] from the passed [url::Url]: + /// - scheme has to match `grpc+*://`. + /// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + /// - In the case of unix sockets, there must be a path, but may not be a host. + /// - In the case of non-unix sockets, there must be a host, but no path. + fn from_url(url: &url::Url) -> Result<Self, crate::Error> { + // Start checking for the scheme to start with grpc+. + match url.scheme().strip_prefix("grpc+") { + None => Err(crate::Error::StorageError("invalid scheme".to_string())), + Some(rest) => { + if rest == "unix" { + if url.host_str().is_some() { + return Err(crate::Error::StorageError( + "host may not be set".to_string(), + )); + } + let path = url.path().to_string(); + let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter + .unwrap() + .connect_with_connector_lazy(tower::service_fn( + move |_: tonic::transport::Uri| UnixStream::connect(path.clone()), + )); + let grpc_client = + proto::directory_service_client::DirectoryServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } else { + // ensure path is empty, not supported with gRPC. + if !url.path().is_empty() { + return Err(crate::Error::StorageError( + "path may not be set".to_string(), + )); + } + + // clone the uri, and drop the grpc+ from the scheme. + // Recreate a new uri with the `grpc+` prefix dropped from the scheme. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + let url = { + let url_str = url.to_string(); + let s_stripped = url_str.strip_prefix("grpc+").unwrap(); + url::Url::parse(s_stripped).unwrap() + }; + let channel = tonic::transport::Endpoint::try_from(url.to_string()) + .unwrap() + .connect_lazy(); + + let grpc_client = + proto::directory_service_client::DirectoryServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } + } + } + } + + async fn get( + &self, + digest: &B3Digest, + ) -> Result<Option<crate::proto::Directory>, crate::Error> { + // Get a new handle to the gRPC client, and copy the digest. + let mut grpc_client = self.grpc_client.clone(); + let digest_cpy = digest.clone(); + let message = async move { + let mut s = grpc_client + .get(proto::GetDirectoryRequest { + recursive: false, + by_what: Some(ByWhat::Digest(digest_cpy.into())), + }) + .await? + .into_inner(); + + // Retrieve the first message only, then close the stream (we set recursive to false) + s.message().await + }; + + let digest = digest.clone(); + match message.await { + Ok(Some(directory)) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != digest { + Err(crate::Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))) + } else if let Err(e) = directory.validate() { + // Validate the Directory itself is valid. + warn!("directory failed validation: {}", e.to_string()); + Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + digest, e, + ))) + } else { + Ok(Some(directory)) + } + } + Ok(None) => Ok(None), + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + let mut grpc_client = self.grpc_client.clone(); + + let resp = grpc_client.put(tokio_stream::iter(vec![directory])).await; + + match resp { + Ok(put_directory_resp) => Ok(put_directory_resp + .into_inner() + .root_digest + .try_into() + .map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { + let mut grpc_client = self.grpc_client.clone(); + let root_directory_digest = root_directory_digest.clone(); + + let stream = try_stream! { + let mut stream = grpc_client + .get(proto::GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(root_directory_digest.clone().into())), + }) + .await + .map_err(|e| crate::Error::StorageError(e.to_string()))? + .into_inner(); + + // The Directory digests we received so far + let mut received_directory_digests: HashSet<B3Digest> = HashSet::new(); + // The Directory digests we're still expecting to get sent. + let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]); + + loop { + match stream.message().await { + Ok(Some(directory)) => { + // validate the directory itself. + if let Err(e) = directory.validate() { + Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + directory.digest(), + e, + )))?; + } + // validate we actually expected that directory, and move it from expected to received. + let directory_digest = directory.digest(); + let was_expected = expected_directory_digests.remove(&directory_digest); + if !was_expected { + // FUTUREWORK: dumb clients might send the same stuff twice. + // as a fallback, we might want to tolerate receiving + // it if it's in received_directory_digests (as that + // means it once was in expected_directory_digests) + Err(crate::Error::StorageError(format!( + "received unexpected directory {}", + directory_digest + )))?; + } + received_directory_digests.insert(directory_digest); + + // register all children in expected_directory_digests. + for child_directory in &directory.directories { + // We ran validate() above, so we know these digests must be correct. + let child_directory_digest = + child_directory.digest.clone().try_into().unwrap(); + + expected_directory_digests + .insert(child_directory_digest); + } + + yield directory; + }, + Ok(None) => { + // If we were still expecting something, that's an error. + if !expected_directory_digests.is_empty() { + Err(crate::Error::StorageError(format!( + "still expected {} directories, but got premature end of stream", + expected_directory_digests.len(), + )))? + } else { + return + } + }, + Err(e) => { + Err(crate::Error::StorageError(e.to_string()))?; + }, + } + } + }; + + Box::pin(stream) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> + where + Self: Clone, + { + let mut grpc_client = self.grpc_client.clone(); + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); + + Ok(s) + }); + + Box::new(GRPCPutter::new(tx, task)) + } +} + +/// Allows uploading multiple Directory messages in the same gRPC stream. +pub struct GRPCPutter { + /// Data about the current request - a handle to the task, and the tx part + /// of the channel. + /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request. + /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed. + #[allow(clippy::type_complexity)] // lol + rq: Option<( + JoinHandle<Result<proto::PutDirectoryResponse, Status>>, + UnboundedSender<proto::Directory>, + )>, +} + +impl GRPCPutter { + pub fn new( + directory_sender: UnboundedSender<proto::Directory>, + task: JoinHandle<Result<proto::PutDirectoryResponse, Status>>, + ) -> Self { + Self { + rq: Some((task, directory_sender)), + } + } +} + +#[async_trait] +impl DirectoryPutter for GRPCPutter { + async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + match self.rq { + // If we're not already closed, send the directory to directory_sender. + Some((_, ref directory_sender)) => { + if directory_sender.send(directory).is_err() { + // If the channel has been prematurely closed, invoke close (so we can peek at the error code) + // That error code is much more helpful, because it + // contains the error message from the server. + self.close().await?; + } + Ok(()) + } + // If self.close() was already called, we can't put again. + None => Err(Error::StorageError( + "DirectoryPutter already closed".to_string(), + )), + } + } + + /// Closes the stream for sending, and returns the value + async fn close(&mut self) -> Result<B3Digest, crate::Error> { + // get self.rq, and replace it with None. + // This ensures we can only close it once. + match std::mem::take(&mut self.rq) { + None => Err(Error::StorageError("already closed".to_string())), + Some((task, directory_sender)) => { + // close directory_sender, so blocking on task will finish. + drop(directory_sender); + + let root_digest = task + .await? + .map_err(|e| Error::StorageError(e.to_string()))? + .root_digest; + + root_digest.try_into().map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + }) + } + } + } + + // allows checking if the tx part of the channel is closed. + fn is_closed(&self) -> bool { + match self.rq { + None => true, + Some((_, ref directory_sender)) => directory_sender.is_closed(), + } + } +} + +#[cfg(test)] +mod tests { + use core::time; + use futures::StreamExt; + use std::{sync::Arc, time::Duration}; + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio_retry::{strategy::ExponentialBackoff, Retry}; + use tokio_stream::wrappers::UnixListenerStream; + + use crate::{ + directoryservice::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService}, + fixtures::{self, DIRECTORY_A, DIRECTORY_B}, + proto::GRPCDirectoryServiceWrapper, + utils::gen_directorysvc_grpc_client, + }; + + #[tokio::test] + async fn test() { + // create the GrpcDirectoryService + let directory_service = + super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await); + + // try to get DIRECTORY_A should return Ok(None) + assert_eq!( + None, + directory_service + .get(&DIRECTORY_A.digest()) + .await + .expect("must not fail") + ); + + // Now upload it + assert_eq!( + DIRECTORY_A.digest(), + directory_service + .put(DIRECTORY_A.clone()) + .await + .expect("must succeed") + ); + + // And retrieve it, compare for equality. + assert_eq!( + DIRECTORY_A.clone(), + directory_service + .get(&DIRECTORY_A.digest()) + .await + .expect("must succeed") + .expect("must be some") + ); + + // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. + directory_service + .put(DIRECTORY_B.clone()) + .await + .expect_err("must fail"); + + // Putting DIRECTORY_B in a put_multiple will succeed, but the close + // will always fail. + { + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + handle.close().await.expect_err("must fail"); + } + + // Uploading A and then B should succeed, and closing should return the digest of B. + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_A.clone()).await.expect("must succeed"); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + let digest = handle.close().await.expect("must succeed"); + assert_eq!(DIRECTORY_B.digest(), digest); + + // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. + let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); + assert_eq!( + DIRECTORY_B.clone(), + directories_it + .next() + .await + .expect("must be some") + .expect("must succeed") + ); + assert_eq!( + DIRECTORY_A.clone(), + directories_it + .next() + .await + .expect("must be some") + .expect("must succeed") + ); + + // Uploading B and then A should fail, because B refers to A, which + // hasn't been uploaded yet. + // However, the client can burst, so we might not have received the + // error back from the server. + { + let mut handle = directory_service.put_multiple_start(); + // sending out B will always be fine + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + + // whether we will be able to put A as well depends on whether we + // already received the error about B. + if handle.put(DIRECTORY_A.clone()).await.is_ok() { + // If we didn't, and this was Ok(_), … + // a subsequent close MUST fail (because it waits for the + // server) + handle.close().await.expect_err("must fail"); + } + } + + // Now we do the same test as before, send B, then A, but wait + // sufficiently enough for the server to have s + // to close us the stream, + // and then assert that uploading anything else via the handle will fail. + { + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + + let mut is_closed = false; + for _try in 1..1000 { + if handle.is_closed() { + is_closed = true; + break; + } + tokio::time::sleep(time::Duration::from_millis(10)).await; + } + + assert!( + is_closed, + "expected channel to eventually close, but never happened" + ); + + handle + .put(DIRECTORY_A.clone()) + .await + .expect_err("must fail"); + } + } + + /// This ensures connecting via gRPC works as expected. + #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("daemon"); + + let path_clone = socket_path.clone(); + + // Spin up a server + tokio::spawn(async { + let uds = UnixListener::bind(path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new server + let mut server = tonic::transport::Server::builder(); + let router = server.add_service( + crate::proto::directory_service_server::DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from( + Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService> + ), + ), + ); + router.serve_with_incoming(uds_stream).await + }); + + // wait for the socket to be created + Retry::spawn( + ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // prepare a client + let grpc_client = { + let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display())) + .expect("must parse"); + GRPCDirectoryService::from_url(&url).expect("must succeed") + }; + + assert!(grpc_client + .get(&fixtures::DIRECTORY_A.digest()) + .await + .expect("must not fail") + .is_none()) + } +} diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs new file mode 100644 index 000000000000..ac67c999d01b --- /dev/null +++ b/tvix/castore/src/directoryservice/memory.rs @@ -0,0 +1,149 @@ +use crate::{proto, B3Digest, Error}; +use futures::Stream; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::{Arc, RwLock}; +use tonic::async_trait; +use tracing::{instrument, warn}; + +use super::utils::{traverse_directory, SimplePutter}; +use super::{DirectoryPutter, DirectoryService}; + +#[derive(Clone, Default)] +pub struct MemoryDirectoryService { + db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>, +} + +#[async_trait] +impl DirectoryService for MemoryDirectoryService { + /// Constructs a [MemoryDirectoryService] from the passed [url::Url]: + /// - scheme has to be `memory://` + /// - there may not be a host. + /// - there may not be a path. + fn from_url(url: &url::Url) -> Result<Self, Error> { + if url.scheme() != "memory" { + return Err(crate::Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() || !url.path().is_empty() { + return Err(crate::Error::StorageError("invalid url".to_string())); + } + + Ok(Self::default()) + } + + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + let db = self.db.read()?; + + match db.get(digest) { + // The directory was not found, return + None => Ok(None), + + // The directory was found, try to parse the data as Directory message + Some(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != *digest { + return Err(Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))); + } + + // Validate the Directory itself is valid. + if let Err(e) = directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Err(Error::StorageError(format!( + "directory {} failed validation: {}", + actual_digest, e, + ))); + } + + Ok(Some(directory.clone())) + } + } + } + + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + let digest = directory.digest(); + + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + digest, e, + ))); + } + + // store it + let mut db = self.db.write()?; + db.insert(digest.clone(), directory); + + Ok(digest) + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { + traverse_directory(self.clone(), root_directory_digest) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> + where + Self: Clone, + { + Box::new(SimplePutter::new(self.clone())) + } +} + +#[cfg(test)] +mod tests { + use super::DirectoryService; + use super::MemoryDirectoryService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(MemoryDirectoryService::from_url(&url).is_err()); + } + + /// This correctly sets the scheme, and doesn't set a path. + #[test] + fn test_valid_scheme() { + let url = url::Url::parse("memory://").expect("must parse"); + + assert!(MemoryDirectoryService::from_url(&url).is_ok()); + } + + /// This sets the host to `foo` + #[test] + fn test_invalid_host() { + let url = url::Url::parse("memory://foo").expect("must parse"); + + assert!(MemoryDirectoryService::from_url(&url).is_err()); + } + + /// This has the path "/", which is invalid. + #[test] + fn test_invalid_has_path() { + let url = url::Url::parse("memory:///").expect("must parse"); + + assert!(MemoryDirectoryService::from_url(&url).is_err()); + } + + /// This has the path "/foo", which is invalid. + #[test] + fn test_invalid_path2() { + let url = url::Url::parse("memory:///foo").expect("must parse"); + + assert!(MemoryDirectoryService::from_url(&url).is_err()); + } +} diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs new file mode 100644 index 000000000000..3b26f4baf79b --- /dev/null +++ b/tvix/castore/src/directoryservice/mod.rs @@ -0,0 +1,76 @@ +use crate::{proto, B3Digest, Error}; +use futures::Stream; +use std::pin::Pin; +use tonic::async_trait; + +mod from_addr; +mod grpc; +mod memory; +mod sled; +mod traverse; +mod utils; + +pub use self::from_addr::from_addr; +pub use self::grpc::GRPCDirectoryService; +pub use self::memory::MemoryDirectoryService; +pub use self::sled::SledDirectoryService; +pub use self::traverse::descend_to; + +/// The base trait all Directory services need to implement. +/// This is a simple get and put of [crate::proto::Directory], returning their +/// digest. +#[async_trait] +pub trait DirectoryService: Send + Sync { + /// Create a new instance by passing in a connection URL. + /// TODO: check if we want to make this async, instead of lazily connecting + fn from_url(url: &url::Url) -> Result<Self, Error> + where + Self: Sized; + + /// Get looks up a single Directory message by its digest. + /// In case the directory is not found, Ok(None) is returned. + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; + /// Get uploads a single Directory message, and returns the calculated + /// digest, or an error. + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; + + /// Looks up a closure of [proto::Directory]. + /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`, + /// and we'd be able to add a default implementation for it here, but + /// we can't have that yet. + /// + /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily, + /// and the box allows different underlying stream implementations to be returned since + /// Rust doesn't support this as a generic in traits yet. This is the same thing that + /// [async_trait] generates, but for streams instead of futures. + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>>; + + /// Allows persisting a closure of [proto::Directory], which is a graph of + /// connected Directory messages. + fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>; +} + +/// Provides a handle to put a closure of connected [proto::Directory] elements. +/// +/// The consumer can periodically call [DirectoryPutter::put], starting from the +/// leaves. Once the root is reached, [DirectoryPutter::close] can be called to +/// retrieve the root digest (or an error). +#[async_trait] +pub trait DirectoryPutter: Send { + /// Put a individual [proto::Directory] into the store. + /// Error semantics and behaviour is up to the specific implementation of + /// this trait. + /// Due to bursting, the returned error might refer to an object previously + /// sent via `put`. + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; + + /// Close the stream, and wait for any errors. + async fn close(&mut self) -> Result<B3Digest, Error>; + + /// Return whether the stream is closed or not. + /// Used from some [DirectoryService] implementations only. + fn is_closed(&self) -> bool; +} diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs new file mode 100644 index 000000000000..0dc5496803cb --- /dev/null +++ b/tvix/castore/src/directoryservice/sled.rs @@ -0,0 +1,213 @@ +use crate::directoryservice::DirectoryPutter; +use crate::proto::Directory; +use crate::{proto, B3Digest, Error}; +use futures::Stream; +use prost::Message; +use std::path::PathBuf; +use std::pin::Pin; +use tonic::async_trait; +use tracing::{instrument, warn}; + +use super::utils::{traverse_directory, SimplePutter}; +use super::DirectoryService; + +#[derive(Clone)] +pub struct SledDirectoryService { + db: sled::Db, +} + +impl SledDirectoryService { + pub fn new(p: PathBuf) -> Result<Self, sled::Error> { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { db }) + } + + pub fn new_temporary() -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { db }) + } +} + +#[async_trait] +impl DirectoryService for SledDirectoryService { + /// Constructs a [SledDirectoryService] from the passed [url::Url]: + /// - scheme has to be `sled://` + /// - there may not be a host. + /// - a path to the sled needs to be provided (which may not be `/`). + fn from_url(url: &url::Url) -> Result<Self, Error> { + if url.scheme() != "sled" { + return Err(crate::Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() { + return Err(crate::Error::StorageError(format!( + "invalid host: {}", + url.host().unwrap() + ))); + } + + // TODO: expose compression and other parameters as URL parameters, drop new and new_temporary? + if url.path().is_empty() { + Self::new_temporary().map_err(|e| Error::StorageError(e.to_string())) + } else if url.path() == "/" { + Err(crate::Error::StorageError( + "cowardly refusing to open / with sled".to_string(), + )) + } else { + Self::new(url.path().into()).map_err(|e| Error::StorageError(e.to_string())) + } + } + + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + match self.db.get(digest.to_vec()) { + // The directory was not found, return + Ok(None) => Ok(None), + + // The directory was found, try to parse the data as Directory message + Ok(Some(data)) => match Directory::decode(&*data) { + Ok(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != *digest { + return Err(Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))); + } + + // Validate the Directory itself is valid. + if let Err(e) = directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Err(Error::StorageError(format!( + "directory {} failed validation: {}", + actual_digest, e, + ))); + } + + Ok(Some(directory)) + } + Err(e) => { + warn!("unable to parse directory {}: {}", digest, e); + Err(Error::StorageError(e.to_string())) + } + }, + // some storage error? + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + let digest = directory.digest(); + + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + digest, e, + ))); + } + // store it + let result = self.db.insert(digest.to_vec(), directory.encode_to_vec()); + if let Err(e) = result { + return Err(Error::StorageError(e.to_string())); + } + Ok(digest) + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Pin<Box<(dyn Stream<Item = Result<proto::Directory, Error>> + Send + 'static)>> { + traverse_directory(self.clone(), root_directory_digest) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> + where + Self: Clone, + { + Box::new(SimplePutter::new(self.clone())) + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use super::DirectoryService; + use super::SledDirectoryService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(SledDirectoryService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and doesn't specify a path (temporary sled). + #[test] + fn test_valid_scheme_temporary() { + let url = url::Url::parse("sled://").expect("must parse"); + + assert!(SledDirectoryService::from_url(&url).is_ok()); + } + + /// This sets the path to a location that doesn't exist, which should fail (as sled doesn't mkdir -p) + #[test] + fn test_nonexistent_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().join("foo").join("bar").to_str().unwrap()); + + assert!(SledDirectoryService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and specifies / as path (which should fail + // for obvious reasons) + #[test] + fn test_invalid_path_root() { + let url = url::Url::parse("sled:///").expect("must parse"); + + assert!(SledDirectoryService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and sets a tempdir as location. + #[test] + fn test_valid_scheme_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!(SledDirectoryService::from_url(&url).is_ok()); + } + + /// This sets a host, rather than a path, which should fail. + #[test] + fn test_invalid_host() { + let url = url::Url::parse("sled://foo.example").expect("must parse"); + + assert!(SledDirectoryService::from_url(&url).is_err()); + } + + /// This sets a host AND a valid path, which should fail + #[test] + fn test_invalid_host_and_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!(SledDirectoryService::from_url(&url).is_err()); + } +} diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs new file mode 100644 index 000000000000..4f011c963c06 --- /dev/null +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -0,0 +1,228 @@ +use super::DirectoryService; +use crate::{proto::NamedNode, B3Digest, Error}; +use std::{os::unix::ffi::OsStrExt, sync::Arc}; +use tracing::{instrument, warn}; + +/// This descends from a (root) node to the given (sub)path, returning the Node +/// at that path, or none, if there's nothing at that path. +#[instrument(skip(directory_service))] +pub async fn descend_to( + directory_service: Arc<dyn DirectoryService>, + root_node: crate::proto::node::Node, + path: &std::path::Path, +) -> Result<Option<crate::proto::node::Node>, Error> { + // strip a possible `/` prefix from the path. + let path = { + if path.starts_with("/") { + path.strip_prefix("/").unwrap() + } else { + path + } + }; + + let mut cur_node = root_node; + let mut it = path.components(); + + loop { + match it.next() { + None => { + // the (remaining) path is empty, return the node we're current at. + return Ok(Some(cur_node)); + } + Some(first_component) => { + match cur_node { + crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => { + // There's still some path left, but the current node is no directory. + // This means the path doesn't exist, as we can't reach it. + return Ok(None); + } + crate::proto::node::Node::Directory(directory_node) => { + let digest: B3Digest = directory_node.digest.try_into().map_err(|_e| { + Error::StorageError("invalid digest length".to_string()) + })?; + + // fetch the linked node from the directory_service + match directory_service.get(&digest).await? { + // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! + None => { + warn!("directory {} does not exist", digest); + + return Err(Error::StorageError(format!( + "directory {} does not exist", + digest + ))); + } + Some(directory) => { + // look for first_component in the [Directory]. + // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we + // could stop as soon as e.name is larger than the search string. + let child_node = directory.nodes().find(|n| { + n.get_name() == first_component.as_os_str().as_bytes() + }); + + match child_node { + // child node not found means there's no such element inside the directory. + None => { + return Ok(None); + } + // child node found, return to top-of loop to find the next + // node in the path. + Some(child_node) => { + cur_node = child_node; + } + } + } + } + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}; + use crate::utils::gen_directory_service; + + use super::descend_to; + + #[tokio::test] + async fn test_descend_to() { + let directory_service = gen_directory_service(); + + let mut handle = directory_service.put_multiple_start(); + handle + .put(DIRECTORY_WITH_KEEP.clone()) + .await + .expect("must succeed"); + handle + .put(DIRECTORY_COMPLICATED.clone()) + .await + .expect("must succeed"); + + // construct the node for DIRECTORY_COMPLICATED + let node_directory_complicated = + crate::proto::node::Node::Directory(crate::proto::DirectoryNode { + name: "doesntmatter".into(), + digest: DIRECTORY_COMPLICATED.digest().into(), + size: DIRECTORY_COMPLICATED.size(), + }); + + // construct the node for DIRECTORY_COMPLICATED + let node_directory_with_keep = crate::proto::node::Node::Directory( + DIRECTORY_COMPLICATED.directories.first().unwrap().clone(), + ); + + // construct the node for the .keep file + let node_file_keep = + crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone()); + + // traversal to an empty subpath should return the root node. + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from(""), + ) + .await + .expect("must succeed"); + + assert_eq!(Some(node_directory_complicated.clone()), resp); + } + + // traversal to `keep` should return the node for DIRECTORY_WITH_KEEP + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("keep"), + ) + .await + .expect("must succeed"); + + assert_eq!(Some(node_directory_with_keep), resp); + } + + // traversal to `keep/.keep` should return the node for the .keep file + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("keep/.keep"), + ) + .await + .expect("must succeed"); + + assert_eq!(Some(node_file_keep.clone()), resp); + } + + // traversal to `keep/.keep` should return the node for the .keep file + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("/keep/.keep"), + ) + .await + .expect("must succeed"); + + assert_eq!(Some(node_file_keep), resp); + } + + // traversal to `void` should return None (doesn't exist) + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("void"), + ) + .await + .expect("must succeed"); + + assert_eq!(None, resp); + } + + // traversal to `void` should return None (doesn't exist) + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("//v/oid"), + ) + .await + .expect("must succeed"); + + assert_eq!(None, resp); + } + + // traversal to `keep/.keep/404` should return None (the path can't be + // reached, as keep/.keep already is a file) + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("keep/.keep/foo"), + ) + .await + .expect("must succeed"); + + assert_eq!(None, resp); + } + + // traversal to a subpath of '/' should return the root node. + { + let resp = descend_to( + directory_service.clone(), + node_directory_complicated.clone(), + &PathBuf::from("/"), + ) + .await + .expect("must succeed"); + + assert_eq!(Some(node_directory_complicated), resp); + } + } +} diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs new file mode 100644 index 000000000000..4c5e7cfde37c --- /dev/null +++ b/tvix/castore/src/directoryservice/utils.rs @@ -0,0 +1,140 @@ +use super::DirectoryPutter; +use super::DirectoryService; +use crate::proto; +use crate::B3Digest; +use crate::Error; +use async_stream::stream; +use futures::Stream; +use std::collections::{HashSet, VecDeque}; +use std::pin::Pin; +use tonic::async_trait; +use tracing::warn; + +/// Traverses a [proto::Directory] from the root to the children. +/// +/// This is mostly BFS, but directories are only returned once. +pub fn traverse_directory<DS: DirectoryService + 'static>( + directory_service: DS, + root_directory_digest: &B3Digest, +) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { + // The list of all directories that still need to be traversed. The next + // element is picked from the front, new elements are enqueued at the + // back. + let mut worklist_directory_digests: VecDeque<B3Digest> = + VecDeque::from([root_directory_digest.clone()]); + // The list of directory digests already sent to the consumer. + // We omit sending the same directories multiple times. + let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new(); + + let stream = stream! { + while let Some(current_directory_digest) = worklist_directory_digests.pop_front() { + match directory_service.get(¤t_directory_digest).await { + // if it's not there, we have an inconsistent store! + Ok(None) => { + warn!("directory {} does not exist", current_directory_digest); + yield Err(Error::StorageError(format!( + "directory {} does not exist", + current_directory_digest + ))); + } + Err(e) => { + warn!("failed to look up directory"); + yield Err(Error::StorageError(format!( + "unable to look up directory {}: {}", + current_directory_digest, e + ))); + } + + // if we got it + Ok(Some(current_directory)) => { + // validate, we don't want to send invalid directories. + if let Err(e) = current_directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + yield Err(Error::StorageError(format!( + "invalid directory: {}", + current_directory_digest + ))); + } + + // We're about to send this directory, so let's avoid sending it again if a + // descendant has it. + sent_directory_digests.insert(current_directory_digest); + + // enqueue all child directory digests to the work queue, as + // long as they're not part of the worklist or already sent. + // This panics if the digest looks invalid, it's supposed to be checked first. + for child_directory_node in ¤t_directory.directories { + // TODO: propagate error + let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); + + if worklist_directory_digests.contains(&child_digest) + || sent_directory_digests.contains(&child_digest) + { + continue; + } + worklist_directory_digests.push_back(child_digest); + } + + yield Ok(current_directory); + } + }; + } + }; + + Box::pin(stream) +} + +/// This is a simple implementation of a Directory uploader. +/// TODO: verify connectivity? Factor out these checks into generic helpers? +pub struct SimplePutter<DS: DirectoryService> { + directory_service: DS, + last_directory_digest: Option<B3Digest>, + closed: bool, +} + +impl<DS: DirectoryService> SimplePutter<DS> { + pub fn new(directory_service: DS) -> Self { + Self { + directory_service, + closed: false, + last_directory_digest: None, + } + } +} + +#[async_trait] +impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + if self.closed { + return Err(Error::StorageError("already closed".to_string())); + } + + let digest = self.directory_service.put(directory).await?; + + // track the last directory digest + self.last_directory_digest = Some(digest); + + Ok(()) + } + + /// We need to be mutable here, as that's the signature of the trait. + async fn close(&mut self) -> Result<B3Digest, Error> { + if self.closed { + return Err(Error::StorageError("already closed".to_string())); + } + + match &self.last_directory_digest { + Some(last_digest) => { + self.closed = true; + Ok(last_digest.clone()) + } + None => Err(Error::InvalidRequest( + "no directories sent, can't show root digest".to_string(), + )), + } + } + + fn is_closed(&self) -> bool { + self.closed + } +} diff --git a/tvix/castore/src/errors.rs b/tvix/castore/src/errors.rs new file mode 100644 index 000000000000..3b23f972b045 --- /dev/null +++ b/tvix/castore/src/errors.rs @@ -0,0 +1,45 @@ +use std::sync::PoisonError; +use thiserror::Error; +use tokio::task::JoinError; +use tonic::Status; + +/// Errors related to communication with the store. +#[derive(Debug, Error)] +pub enum Error { + #[error("invalid request: {0}")] + InvalidRequest(String), + + #[error("internal storage error: {0}")] + StorageError(String), +} + +impl<T> From<PoisonError<T>> for Error { + fn from(value: PoisonError<T>) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From<JoinError> for Error { + fn from(value: JoinError) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From<Error> for Status { + fn from(value: Error) -> Self { + match value { + Error::InvalidRequest(msg) => Status::invalid_argument(msg), + Error::StorageError(msg) => Status::data_loss(format!("storage error: {}", msg)), + } + } +} + +// TODO: this should probably go somewhere else? +impl From<Error> for std::io::Error { + fn from(value: Error) -> Self { + match value { + Error::InvalidRequest(msg) => Self::new(std::io::ErrorKind::InvalidInput, msg), + Error::StorageError(msg) => Self::new(std::io::ErrorKind::Other, msg), + } + } +} diff --git a/tvix/castore/src/fixtures.rs b/tvix/castore/src/fixtures.rs new file mode 100644 index 000000000000..ed3d1ca6e855 --- /dev/null +++ b/tvix/castore/src/fixtures.rs @@ -0,0 +1,95 @@ +use crate::{ + proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode}, + B3Digest, +}; +use lazy_static::lazy_static; + +pub const HELLOWORLD_BLOB_CONTENTS: &[u8] = b"Hello World!"; +pub const EMPTY_BLOB_CONTENTS: &[u8] = b""; + +lazy_static! { + pub static ref DUMMY_DIGEST: B3Digest = { + let u: &[u8; 32] = &[ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + ]; + u.into() + }; + pub static ref DUMMY_DIGEST_2: B3Digest = { + let u: &[u8; 32] = &[ + 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + ]; + u.into() + }; + pub static ref DUMMY_DATA_1: bytes::Bytes = vec![0x01, 0x02, 0x03].into(); + pub static ref DUMMY_DATA_2: bytes::Bytes = vec![0x04, 0x05].into(); + + pub static ref HELLOWORLD_BLOB_DIGEST: B3Digest = + blake3::hash(HELLOWORLD_BLOB_CONTENTS).as_bytes().into(); + pub static ref EMPTY_BLOB_DIGEST: B3Digest = + blake3::hash(EMPTY_BLOB_CONTENTS).as_bytes().into(); + + // 2 bytes + pub static ref BLOB_A: bytes::Bytes = vec![0x00, 0x01].into(); + pub static ref BLOB_A_DIGEST: B3Digest = blake3::hash(&BLOB_A).as_bytes().into(); + + // 1MB + pub static ref BLOB_B: bytes::Bytes = (0..255).collect::<Vec<u8>>().repeat(4 * 1024).into(); + pub static ref BLOB_B_DIGEST: B3Digest = blake3::hash(&BLOB_B).as_bytes().into(); + + // Directories + pub static ref DIRECTORY_WITH_KEEP: proto::Directory = proto::Directory { + directories: vec![], + files: vec![FileNode { + name: b".keep".to_vec().into(), + digest: EMPTY_BLOB_DIGEST.clone().into(), + size: 0, + executable: false, + }], + symlinks: vec![], + }; + pub static ref DIRECTORY_COMPLICATED: proto::Directory = proto::Directory { + directories: vec![DirectoryNode { + name: b"keep".to_vec().into(), + digest: DIRECTORY_WITH_KEEP.digest().into(), + size: DIRECTORY_WITH_KEEP.size(), + }], + files: vec![FileNode { + name: b".keep".to_vec().into(), + digest: EMPTY_BLOB_DIGEST.clone().into(), + size: 0, + executable: false, + }], + symlinks: vec![SymlinkNode { + name: b"aa".to_vec().into(), + target: b"/nix/store/somewhereelse".to_vec().into(), + }], + }; + pub static ref DIRECTORY_A: Directory = Directory::default(); + pub static ref DIRECTORY_B: Directory = Directory { + directories: vec![DirectoryNode { + name: b"a".to_vec().into(), + digest: DIRECTORY_A.digest().into(), + size: DIRECTORY_A.size(), + }], + ..Default::default() + }; + pub static ref DIRECTORY_C: Directory = Directory { + directories: vec![ + DirectoryNode { + name: b"a".to_vec().into(), + digest: DIRECTORY_A.digest().into(), + size: DIRECTORY_A.size(), + }, + DirectoryNode { + name: b"a'".to_vec().into(), + digest: DIRECTORY_A.digest().into(), + size: DIRECTORY_A.size(), + } + ], + ..Default::default() + }; +} diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs new file mode 100644 index 000000000000..4c0037e34a33 --- /dev/null +++ b/tvix/castore/src/import.rs @@ -0,0 +1,204 @@ +use crate::blobservice::BlobService; +use crate::directoryservice::DirectoryPutter; +use crate::directoryservice::DirectoryService; +use crate::proto::node::Node; +use crate::proto::Directory; +use crate::proto::DirectoryNode; +use crate::proto::FileNode; +use crate::proto::SymlinkNode; +use crate::Error as CastoreError; +use std::os::unix::ffi::OsStrExt; +use std::sync::Arc; +use std::{ + collections::HashMap, + fmt::Debug, + os::unix::prelude::PermissionsExt, + path::{Path, PathBuf}, +}; +use tracing::instrument; +use walkdir::WalkDir; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed to upload directory at {0}: {1}")] + UploadDirectoryError(PathBuf, CastoreError), + + #[error("invalid encoding encountered for entry {0:?}")] + InvalidEncoding(PathBuf), + + #[error("unable to stat {0}: {1}")] + UnableToStat(PathBuf, std::io::Error), + + #[error("unable to open {0}: {1}")] + UnableToOpen(PathBuf, std::io::Error), + + #[error("unable to read {0}: {1}")] + UnableToRead(PathBuf, std::io::Error), +} + +impl From<CastoreError> for Error { + fn from(value: CastoreError) -> Self { + match value { + CastoreError::InvalidRequest(_) => panic!("tvix bug"), + CastoreError::StorageError(_) => panic!("error"), + } + } +} + +// This processes a given [walkdir::DirEntry] and returns a +// proto::node::Node, depending on the type of the entry. +// +// If the entry is a file, its contents are uploaded. +// If the entry is a directory, the Directory is uploaded as well. +// For this to work, it relies on the caller to provide the directory object +// with the previously returned (child) nodes. +// +// It assumes entries to be returned in "contents first" order, means this +// will only be called with a directory if all children of it have been +// visited. If the entry is indeed a directory, it'll also upload that +// directory to the store. For this, the so-far-assembled Directory object for +// this path needs to be passed in. +// +// It assumes the caller adds returned nodes to the directories it assembles. +#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] +async fn process_entry<'a>( + blob_service: Arc<dyn BlobService>, + directory_putter: &'a mut Box<dyn DirectoryPutter>, + entry: &'a walkdir::DirEntry, + maybe_directory: Option<Directory>, +) -> Result<Node, Error> { + let file_type = entry.file_type(); + + if file_type.is_dir() { + let directory = maybe_directory + .expect("tvix bug: must be called with some directory in the case of directory"); + let directory_digest = directory.digest(); + let directory_size = directory.size(); + + // upload this directory + directory_putter + .put(directory) + .await + .map_err(|e| Error::UploadDirectoryError(entry.path().to_path_buf(), e))?; + + return Ok(Node::Directory(DirectoryNode { + name: entry.file_name().as_bytes().to_owned().into(), + digest: directory_digest.into(), + size: directory_size, + })); + } + + if file_type.is_symlink() { + let target: bytes::Bytes = std::fs::read_link(entry.path()) + .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))? + .as_os_str() + .as_bytes() + .to_owned() + .into(); + + return Ok(Node::Symlink(SymlinkNode { + name: entry.file_name().as_bytes().to_owned().into(), + target, + })); + } + + if file_type.is_file() { + let metadata = entry + .metadata() + .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?; + + let mut file = tokio::fs::File::open(entry.path()) + .await + .map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?; + + let mut writer = blob_service.open_write().await; + + if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { + return Err(Error::UnableToRead(entry.path().to_path_buf(), e)); + }; + + let digest = writer.close().await?; + + return Ok(Node::File(FileNode { + name: entry.file_name().as_bytes().to_vec().into(), + digest: digest.into(), + size: metadata.len() as u32, + // If it's executable by the user, it'll become executable. + // This matches nix's dump() function behaviour. + executable: metadata.permissions().mode() & 64 != 0, + })); + } + todo!("handle other types") +} + +/// Ingests the contents at the given path into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. +/// It returns the root node or an error. +/// +/// It does not follow symlinks at the root, they will be ingested as actual +/// symlinks. +/// +/// It's not interacting with a PathInfoService (from tvix-store), or anything +/// else giving it a "non-content-addressed name". +/// It's up to the caller to possibly register it somewhere (and potentially +/// rename it based on some naming scheme) +#[instrument(skip(blob_service, directory_service), fields(path=?p))] +pub async fn ingest_path<P: AsRef<Path> + Debug>( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + p: P, +) -> Result<Node, Error> { + let mut directories: HashMap<PathBuf, Directory> = HashMap::default(); + + let mut directory_putter = directory_service.put_multiple_start(); + + for entry in WalkDir::new(p) + .follow_links(false) + .follow_root_links(false) + .contents_first(true) + .sort_by_file_name() + { + let entry = entry.unwrap(); + + // process_entry wants an Option<Directory> in case the entry points to a directory. + // make sure to provide it. + let maybe_directory: Option<Directory> = { + if entry.file_type().is_dir() { + Some( + directories + .entry(entry.path().to_path_buf()) + .or_default() + .clone(), + ) + } else { + None + } + }; + + let node = process_entry( + blob_service.clone(), + &mut directory_putter, + &entry, + maybe_directory, + ) + .await?; + + if entry.depth() == 0 { + return Ok(node); + } else { + // calculate the parent path, and make sure we register the node there. + // NOTE: entry.depth() > 0 + let parent_path = entry.path().parent().unwrap().to_path_buf(); + + // record node in parent directory, creating a new [proto:Directory] if not there yet. + let parent_directory = directories.entry(parent_path).or_default(); + match node { + Node::Directory(e) => parent_directory.directories.push(e), + Node::File(e) => parent_directory.files.push(e), + Node::Symlink(e) => parent_directory.symlinks.push(e), + } + } + } + // unreachable, we already bailed out before if root doesn't exist. + panic!("tvix bug") +} diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs new file mode 100644 index 000000000000..d51022428e5b --- /dev/null +++ b/tvix/castore/src/lib.rs @@ -0,0 +1,15 @@ +mod digests; +mod errors; + +pub mod blobservice; +pub mod directoryservice; +pub mod fixtures; +pub mod import; +pub mod proto; +pub mod utils; + +pub use digests::{B3Digest, B3_LEN}; +pub use errors::Error; + +#[cfg(test)] +mod tests; diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs new file mode 100644 index 000000000000..93db1deef69a --- /dev/null +++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs @@ -0,0 +1,177 @@ +use crate::blobservice::BlobService; +use core::pin::pin; +use futures::TryFutureExt; +use std::{ + collections::VecDeque, + io, + ops::{Deref, DerefMut}, + pin::Pin, + sync::Arc, +}; +use tokio_stream::StreamExt; +use tokio_util::io::ReaderStream; +use tonic::{async_trait, Request, Response, Status, Streaming}; +use tracing::{instrument, warn}; + +pub struct GRPCBlobServiceWrapper { + blob_service: Arc<dyn BlobService>, +} + +impl From<Arc<dyn BlobService>> for GRPCBlobServiceWrapper { + fn from(value: Arc<dyn BlobService>) -> Self { + Self { + blob_service: value, + } + } +} + +// This is necessary because bytes::BytesMut comes up with +// a default 64 bytes capacity that cannot be changed +// easily if you assume a bytes::BufMut trait implementation +// Therefore, we override the Default implementation here +// TODO(raitobezarius?): upstream me properly +struct BytesMutWithDefaultCapacity<const N: usize> { + inner: bytes::BytesMut, +} + +impl<const N: usize> Deref for BytesMutWithDefaultCapacity<N> { + type Target = bytes::BytesMut; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl<const N: usize> DerefMut for BytesMutWithDefaultCapacity<N> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl<const N: usize> Default for BytesMutWithDefaultCapacity<N> { + fn default() -> Self { + BytesMutWithDefaultCapacity { + inner: bytes::BytesMut::with_capacity(N), + } + } +} + +impl<const N: usize> bytes::Buf for BytesMutWithDefaultCapacity<N> { + fn remaining(&self) -> usize { + self.inner.remaining() + } + + fn chunk(&self) -> &[u8] { + self.inner.chunk() + } + + fn advance(&mut self, cnt: usize) { + self.inner.advance(cnt); + } +} + +unsafe impl<const N: usize> bytes::BufMut for BytesMutWithDefaultCapacity<N> { + fn remaining_mut(&self) -> usize { + self.inner.remaining_mut() + } + + unsafe fn advance_mut(&mut self, cnt: usize) { + self.inner.advance_mut(cnt); + } + + fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { + self.inner.chunk_mut() + } +} + +#[async_trait] +impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { + // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 + type ReadStream = + Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>; + + #[instrument(skip(self))] + async fn stat( + &self, + request: Request<super::StatBlobRequest>, + ) -> Result<Response<super::BlobMeta>, Status> { + let rq = request.into_inner(); + let req_digest = rq + .digest + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + match self.blob_service.has(&req_digest).await { + Ok(true) => Ok(Response::new(super::BlobMeta::default())), + Ok(false) => Err(Status::not_found(format!("blob {} not found", &req_digest))), + Err(e) => Err(e.into()), + } + } + + #[instrument(skip(self))] + async fn read( + &self, + request: Request<super::ReadBlobRequest>, + ) -> Result<Response<Self::ReadStream>, Status> { + let rq = request.into_inner(); + + let req_digest = rq + .digest + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + match self.blob_service.open_read(&req_digest).await { + Ok(Some(reader)) => { + fn stream_mapper( + x: Result<bytes::Bytes, io::Error>, + ) -> Result<super::BlobChunk, Status> { + match x { + Ok(bytes) => Ok(super::BlobChunk { data: bytes }), + Err(e) => Err(Status::from(e)), + } + } + + let chunks_stream = ReaderStream::new(reader).map(stream_mapper); + Ok(Response::new(Box::pin(chunks_stream))) + } + Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), + Err(e) => Err(e.into()), + } + } + + #[instrument(skip(self))] + async fn put( + &self, + request: Request<Streaming<super::BlobChunk>>, + ) -> Result<Response<super::PutBlobResponse>, Status> { + let req_inner = request.into_inner(); + + let data_stream = req_inner.map(|x| { + x.map(|x| VecDeque::from(x.data.to_vec())) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) + }); + + let mut data_reader = tokio_util::io::StreamReader::new(data_stream); + + let mut blob_writer = pin!(self.blob_service.open_write().await); + + tokio::io::copy(&mut data_reader, &mut blob_writer) + .await + .map_err(|e| { + warn!("error copying: {}", e); + Status::internal("error copying") + })?; + + let digest = blob_writer + .close() + .map_err(|e| { + warn!("error closing stream: {}", e); + Status::internal("error closing stream") + }) + .await? + .to_vec(); + + Ok(Response::new(super::PutBlobResponse { + digest: digest.into(), + })) + } +} diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs new file mode 100644 index 000000000000..5e143a7bd7a8 --- /dev/null +++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs @@ -0,0 +1,184 @@ +use crate::proto; +use crate::{directoryservice::DirectoryService, B3Digest}; +use futures::StreamExt; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::{sync::mpsc::channel, task}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{async_trait, Request, Response, Status, Streaming}; +use tracing::{debug, instrument, warn}; + +pub struct GRPCDirectoryServiceWrapper { + directory_service: Arc<dyn DirectoryService>, +} + +impl From<Arc<dyn DirectoryService>> for GRPCDirectoryServiceWrapper { + fn from(value: Arc<dyn DirectoryService>) -> Self { + Self { + directory_service: value, + } + } +} + +#[async_trait] +impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper { + type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>; + + #[instrument(skip(self))] + async fn get( + &self, + request: Request<proto::GetDirectoryRequest>, + ) -> Result<Response<Self::GetStream>, Status> { + let (tx, rx) = channel(5); + + let req_inner = request.into_inner(); + + let directory_service = self.directory_service.clone(); + + let _task = { + // look at the digest in the request and put it in the top of the queue. + match &req_inner.by_what { + None => return Err(Status::invalid_argument("by_what needs to be specified")), + Some(proto::get_directory_request::ByWhat::Digest(ref digest)) => { + let digest: B3Digest = digest + .clone() + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + task::spawn(async move { + if !req_inner.recursive { + let e: Result<proto::Directory, Status> = + match directory_service.get(&digest).await { + Ok(Some(directory)) => Ok(directory), + Ok(None) => Err(Status::not_found(format!( + "directory {} not found", + digest + ))), + Err(e) => Err(e.into()), + }; + + if tx.send(e).await.is_err() { + debug!("receiver dropped"); + } + } else { + // If recursive was requested, traverse via get_recursive. + let mut directories_it = directory_service.get_recursive(&digest); + + while let Some(e) = directories_it.next().await { + // map err in res from Error to Status + let res = e.map_err(|e| Status::internal(e.to_string())); + if tx.send(res).await.is_err() { + debug!("receiver dropped"); + break; + } + } + } + }); + } + } + }; + + let receiver_stream = ReceiverStream::new(rx); + Ok(Response::new(receiver_stream)) + } + + #[instrument(skip(self, request))] + async fn put( + &self, + request: Request<Streaming<proto::Directory>>, + ) -> Result<Response<proto::PutDirectoryResponse>, Status> { + let mut req_inner = request.into_inner(); + // TODO: let this use DirectoryPutter to the store it's connected to, + // and move the validation logic into [SimplePutter]. + + // This keeps track of the seen directory keys, and their size. + // This is used to validate the size field of a reference to a previously sent directory. + // We don't need to keep the contents around, they're stored in the DB. + // https://github.com/rust-lang/rust-clippy/issues/5812 + #[allow(clippy::mutable_key_type)] + let mut seen_directories_sizes: HashMap<B3Digest, u32> = HashMap::new(); + let mut last_directory_dgst: Option<B3Digest> = None; + + // Consume directories, and insert them into the store. + // Reject directory messages that refer to Directories not sent in the same stream. + while let Some(directory) = req_inner.message().await? { + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Status::invalid_argument(format!( + "directory {} failed validation: {}", + directory.digest(), + e, + ))); + } + + // for each child directory this directory refers to, we need + // to ensure it has been seen already in this stream, and that the size + // matches what we recorded. + for child_directory in &directory.directories { + let child_directory_digest: B3Digest = child_directory + .digest + .clone() + .try_into() + .map_err(|_e| Status::internal("invalid child directory digest len"))?; + + match seen_directories_sizes.get(&child_directory_digest) { + None => { + return Err(Status::invalid_argument(format!( + "child directory '{:?}' ({}) in directory '{}' not seen yet", + child_directory.name, + &child_directory_digest, + &directory.digest(), + ))); + } + Some(seen_child_directory_size) => { + if seen_child_directory_size != &child_directory.size { + return Err(Status::invalid_argument(format!( + "child directory '{:?}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}", + child_directory.name, + &child_directory_digest, + &directory.digest(), + seen_child_directory_size, + child_directory.size, + ))); + } + } + } + } + + // NOTE: We can't know if a directory we're receiving actually is + // part of the closure, because we receive directories from the leaf nodes up to + // the root. + // The only thing we could to would be doing a final check when the + // last Directory was received, that all Directories received so far are + // reachable from that (root) node. + + let dgst = directory.digest(); + seen_directories_sizes.insert(dgst.clone(), directory.size()); + last_directory_dgst = Some(dgst.clone()); + + // check if the directory already exists in the database. We can skip + // inserting if it's already there, as that'd be a no-op. + match self.directory_service.get(&dgst).await { + Err(e) => { + warn!("error checking if directory already exists: {}", e); + return Err(e.into()); + } + // skip if already exists + Ok(Some(_)) => {} + // insert if it doesn't already exist + Ok(None) => { + self.directory_service.put(directory).await?; + } + } + } + + // We're done receiving. peek at last_directory_digest and either return the digest, + // or an error, if we received an empty stream. + match last_directory_dgst { + None => Err(Status::invalid_argument("no directories received")), + Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse { + root_digest: last_directory_dgst.into(), + })), + } + } +} diff --git a/tvix/castore/src/proto/mod.rs b/tvix/castore/src/proto/mod.rs new file mode 100644 index 000000000000..0737f3f08443 --- /dev/null +++ b/tvix/castore/src/proto/mod.rs @@ -0,0 +1,279 @@ +#![allow(clippy::derive_partial_eq_without_eq, non_snake_case)] +// https://github.com/hyperium/tonic/issues/1056 +use data_encoding::BASE64; +use std::{collections::HashSet, iter::Peekable}; +use thiserror::Error; + +use prost::Message; + +mod grpc_blobservice_wrapper; +mod grpc_directoryservice_wrapper; + +pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; +pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; + +use crate::B3Digest; + +tonic::include_proto!("tvix.castore.v1"); + +#[cfg(feature = "tonic-reflection")] +/// Compiled file descriptors for implementing [gRPC +/// reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) with e.g. +/// [`tonic_reflection`](https://docs.rs/tonic-reflection). +pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix.castore.v1"); + +#[cfg(test)] +mod tests; + +/// Errors that can occur during the validation of Directory messages. +#[derive(Debug, PartialEq, Eq, Error)] +pub enum ValidateDirectoryError { + /// Elements are not in sorted order + #[error("{} is not sorted", std::str::from_utf8(.0).unwrap_or(&BASE64.encode(.0)))] + WrongSorting(Vec<u8>), + /// Multiple elements with the same name encountered + #[error("{0:?} is a duplicate name")] + DuplicateName(Vec<u8>), + /// Invalid name encountered + #[error("Invalid name in {0:?}")] + InvalidName(Vec<u8>), + /// Invalid digest length encountered + #[error("Invalid Digest length: {0}")] + InvalidDigestLen(usize), +} + +/// Checks a Node name for validity as an intermediate node, and returns an +/// error that's generated from the supplied constructor. +/// +/// We disallow slashes, null bytes, '.', '..' and the empty string. +fn validate_node_name<E>(name: &[u8], err: fn(Vec<u8>) -> E) -> Result<(), E> { + if name.is_empty() + || name == b".." + || name == b"." + || name.contains(&0x00) + || name.contains(&b'/') + { + return Err(err(name.to_vec())); + } + Ok(()) +} + +/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode] +/// and [node::Node], so we can ask all of them for the name easily. +pub trait NamedNode { + fn get_name(&self) -> &[u8]; +} + +impl NamedNode for &FileNode { + fn get_name(&self) -> &[u8] { + &self.name + } +} + +impl NamedNode for &DirectoryNode { + fn get_name(&self) -> &[u8] { + &self.name + } +} + +impl NamedNode for &SymlinkNode { + fn get_name(&self) -> &[u8] { + &self.name + } +} + +impl NamedNode for node::Node { + fn get_name(&self) -> &[u8] { + match self { + node::Node::File(node_file) => &node_file.name, + node::Node::Directory(node_directory) => &node_directory.name, + node::Node::Symlink(node_symlink) => &node_symlink.name, + } + } +} + +impl node::Node { + /// Returns the node with a new name. + pub fn rename(self, name: bytes::Bytes) -> Self { + match self { + node::Node::Directory(n) => node::Node::Directory(DirectoryNode { name, ..n }), + node::Node::File(n) => node::Node::File(FileNode { name, ..n }), + node::Node::Symlink(n) => node::Node::Symlink(SymlinkNode { name, ..n }), + } + } +} + +/// Accepts a name, and a mutable reference to the previous name. +/// If the passed name is larger than the previous one, the reference is updated. +/// If it's not, an error is returned. +fn update_if_lt_prev<'n>( + prev_name: &mut &'n [u8], + name: &'n [u8], +) -> Result<(), ValidateDirectoryError> { + if *name < **prev_name { + return Err(ValidateDirectoryError::WrongSorting(name.to_vec())); + } + *prev_name = name; + Ok(()) +} + +/// Inserts the given name into a HashSet if it's not already in there. +/// If it is, an error is returned. +fn insert_once<'n>( + seen_names: &mut HashSet<&'n [u8]>, + name: &'n [u8], +) -> Result<(), ValidateDirectoryError> { + if seen_names.get(name).is_some() { + return Err(ValidateDirectoryError::DuplicateName(name.to_vec())); + } + seen_names.insert(name); + Ok(()) +} + +impl Directory { + /// The size of a directory is the number of all regular and symlink elements, + /// the number of directory elements, and their size fields. + pub fn size(&self) -> u32 { + self.files.len() as u32 + + self.symlinks.len() as u32 + + self + .directories + .iter() + .fold(0, |acc: u32, e| (acc + 1 + e.size)) + } + + /// Calculates the digest of a Directory, which is the blake3 hash of a + /// Directory protobuf message, serialized in protobuf canonical form. + pub fn digest(&self) -> B3Digest { + let mut hasher = blake3::Hasher::new(); + + hasher + .update(&self.encode_to_vec()) + .finalize() + .as_bytes() + .into() + } + + /// validate checks the directory for invalid data, such as: + /// - violations of name restrictions + /// - invalid digest lengths + /// - not properly sorted lists + /// - duplicate names in the three lists + pub fn validate(&self) -> Result<(), ValidateDirectoryError> { + let mut seen_names: HashSet<&[u8]> = HashSet::new(); + + let mut last_directory_name: &[u8] = b""; + let mut last_file_name: &[u8] = b""; + let mut last_symlink_name: &[u8] = b""; + + // check directories + for directory_node in &self.directories { + validate_node_name(&directory_node.name, ValidateDirectoryError::InvalidName)?; + // ensure the digest has the appropriate size. + if TryInto::<B3Digest>::try_into(directory_node.digest.clone()).is_err() { + return Err(ValidateDirectoryError::InvalidDigestLen( + directory_node.digest.len(), + )); + } + + update_if_lt_prev(&mut last_directory_name, &directory_node.name)?; + insert_once(&mut seen_names, &directory_node.name)?; + } + + // check files + for file_node in &self.files { + validate_node_name(&file_node.name, ValidateDirectoryError::InvalidName)?; + if TryInto::<B3Digest>::try_into(file_node.digest.clone()).is_err() { + return Err(ValidateDirectoryError::InvalidDigestLen( + file_node.digest.len(), + )); + } + + update_if_lt_prev(&mut last_file_name, &file_node.name)?; + insert_once(&mut seen_names, &file_node.name)?; + } + + // check symlinks + for symlink_node in &self.symlinks { + validate_node_name(&symlink_node.name, ValidateDirectoryError::InvalidName)?; + + update_if_lt_prev(&mut last_symlink_name, &symlink_node.name)?; + insert_once(&mut seen_names, &symlink_node.name)?; + } + + Ok(()) + } + + /// Allows iterating over all three nodes ([DirectoryNode], [FileNode], + /// [SymlinkNode]) in an ordered fashion, as long as the individual lists + /// are sorted (which can be checked by the [Directory::validate]). + pub fn nodes(&self) -> DirectoryNodesIterator { + return DirectoryNodesIterator { + i_directories: self.directories.iter().peekable(), + i_files: self.files.iter().peekable(), + i_symlinks: self.symlinks.iter().peekable(), + }; + } +} + +/// Struct to hold the state of an iterator over all nodes of a Directory. +/// +/// Internally, this keeps peekable Iterators over all three lists of a +/// Directory message. +pub struct DirectoryNodesIterator<'a> { + // directory: &Directory, + i_directories: Peekable<std::slice::Iter<'a, DirectoryNode>>, + i_files: Peekable<std::slice::Iter<'a, FileNode>>, + i_symlinks: Peekable<std::slice::Iter<'a, SymlinkNode>>, +} + +/// looks at two elements implementing NamedNode, and returns true if "left +/// is smaller / comes first". +/// +/// Some(_) is preferred over None. +fn left_name_lt_right<A: NamedNode, B: NamedNode>(left: Option<&A>, right: Option<&B>) -> bool { + match left { + // if left is None, right always wins + None => false, + Some(left_inner) => { + // left is Some. + match right { + // left is Some, right is None - left wins. + None => true, + Some(right_inner) => { + // both are Some - compare the name. + return left_inner.get_name() < right_inner.get_name(); + } + } + } + } +} + +impl Iterator for DirectoryNodesIterator<'_> { + type Item = node::Node; + + // next returns the next node in the Directory. + // we peek at all three internal iterators, and pick the one with the + // smallest name, to ensure lexicographical ordering. + // The individual lists are already known to be sorted. + fn next(&mut self) -> Option<Self::Item> { + if left_name_lt_right(self.i_directories.peek(), self.i_files.peek()) { + // i_directories is still in the game, compare with symlinks + if left_name_lt_right(self.i_directories.peek(), self.i_symlinks.peek()) { + self.i_directories + .next() + .cloned() + .map(node::Node::Directory) + } else { + self.i_symlinks.next().cloned().map(node::Node::Symlink) + } + } else { + // i_files is still in the game, compare with symlinks + if left_name_lt_right(self.i_files.peek(), self.i_symlinks.peek()) { + self.i_files.next().cloned().map(node::Node::File) + } else { + self.i_symlinks.next().cloned().map(node::Node::Symlink) + } + } + } +} diff --git a/tvix/castore/src/proto/tests/directory.rs b/tvix/castore/src/proto/tests/directory.rs new file mode 100644 index 000000000000..eed49b2b593c --- /dev/null +++ b/tvix/castore/src/proto/tests/directory.rs @@ -0,0 +1,287 @@ +use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError}; +use lazy_static::lazy_static; + +lazy_static! { + static ref DUMMY_DIGEST: [u8; 32] = [ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + ]; +} +#[test] +fn size() { + { + let d = Directory::default(); + assert_eq!(d.size(), 0); + } + { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 0, + }], + ..Default::default() + }; + assert_eq!(d.size(), 1); + } + { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 4, + }], + ..Default::default() + }; + assert_eq!(d.size(), 5); + } + { + let d = Directory { + files: vec![FileNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + executable: false, + }], + ..Default::default() + }; + assert_eq!(d.size(), 1); + } + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: "foo".into(), + target: "bar".into(), + }], + ..Default::default() + }; + assert_eq!(d.size(), 1); + } +} + +#[test] +fn digest() { + let d = Directory::default(); + + assert_eq!( + d.digest(), + vec![ + 0xaf, 0x13, 0x49, 0xb9, 0xf5, 0xf9, 0xa1, 0xa6, 0xa0, 0x40, 0x4d, 0xea, 0x36, 0xdc, + 0xc9, 0x49, 0x9b, 0xcb, 0x25, 0xc9, 0xad, 0xc1, 0x12, 0xb7, 0xcc, 0x9a, 0x93, 0xca, + 0xe4, 0x1f, 0x32, 0x62 + ] + .try_into() + .unwrap() + ) +} + +#[test] +fn validate_empty() { + let d = Directory::default(); + assert_eq!(d.validate(), Ok(())); +} + +#[test] +fn validate_invalid_names() { + { + let d = Directory { + directories: vec![DirectoryNode { + name: "".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidName(n) => { + assert_eq!(n, b"") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + directories: vec![DirectoryNode { + name: ".".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidName(n) => { + assert_eq!(n, b".") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + files: vec![FileNode { + name: "..".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + executable: false, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidName(n) => { + assert_eq!(n, b"..") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: "\x00".into(), + target: "foo".into(), + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidName(n) => { + assert_eq!(n, b"\x00") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: "foo/bar".into(), + target: "foo".into(), + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidName(n) => { + assert_eq!(n, b"foo/bar") + } + _ => panic!("unexpected error"), + }; + } +} + +#[test] +fn validate_invalid_digest() { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: vec![0x00, 0x42].into(), // invalid length + size: 42, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidDigestLen(n) => { + assert_eq!(n, 2) + } + _ => panic!("unexpected error"), + } +} + +#[test] +fn validate_sorting() { + // "b" comes before "a", bad. + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "b".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + DirectoryNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + ], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::WrongSorting(s) => { + assert_eq!(s, b"a"); + } + _ => panic!("unexpected error"), + } + } + + // "a" exists twice, bad. + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + DirectoryNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + ], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::DuplicateName(s) => { + assert_eq!(s, b"a"); + } + _ => panic!("unexpected error"), + } + } + + // "a" comes before "b", all good. + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + DirectoryNode { + name: "b".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + ], + ..Default::default() + }; + + d.validate().expect("validate shouldn't error"); + } + + // [b, c] and [a] are both properly sorted. + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "b".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + DirectoryNode { + name: "c".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + ], + symlinks: vec![SymlinkNode { + name: "a".into(), + target: "foo".into(), + }], + ..Default::default() + }; + + d.validate().expect("validate shouldn't error"); + } +} diff --git a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs new file mode 100644 index 000000000000..68f147a33210 --- /dev/null +++ b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs @@ -0,0 +1,78 @@ +use crate::proto::Directory; +use crate::proto::DirectoryNode; +use crate::proto::FileNode; +use crate::proto::NamedNode; +use crate::proto::SymlinkNode; + +#[test] +fn iterator() { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "c".into(), + ..DirectoryNode::default() + }, + DirectoryNode { + name: "d".into(), + ..DirectoryNode::default() + }, + DirectoryNode { + name: "h".into(), + ..DirectoryNode::default() + }, + DirectoryNode { + name: "l".into(), + ..DirectoryNode::default() + }, + ], + files: vec![ + FileNode { + name: "b".into(), + ..FileNode::default() + }, + FileNode { + name: "e".into(), + ..FileNode::default() + }, + FileNode { + name: "g".into(), + ..FileNode::default() + }, + FileNode { + name: "j".into(), + ..FileNode::default() + }, + ], + symlinks: vec![ + SymlinkNode { + name: "a".into(), + ..SymlinkNode::default() + }, + SymlinkNode { + name: "f".into(), + ..SymlinkNode::default() + }, + SymlinkNode { + name: "i".into(), + ..SymlinkNode::default() + }, + SymlinkNode { + name: "k".into(), + ..SymlinkNode::default() + }, + ], + }; + + // We keep this strings here and convert to string to make the comparison + // less messy. + let mut node_names: Vec<String> = vec![]; + + for node in d.nodes() { + node_names.push(String::from_utf8(node.get_name().to_vec()).unwrap()); + } + + assert_eq!( + vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"], + node_names + ); +} diff --git a/tvix/castore/src/proto/tests/grpc_blobservice.rs b/tvix/castore/src/proto/tests/grpc_blobservice.rs new file mode 100644 index 000000000000..a7816bd1e961 --- /dev/null +++ b/tvix/castore/src/proto/tests/grpc_blobservice.rs @@ -0,0 +1,94 @@ +use crate::fixtures::{BLOB_A, BLOB_A_DIGEST}; +use crate::proto::{BlobChunk, ReadBlobRequest, StatBlobRequest}; +use crate::utils::gen_blobsvc_grpc_client; +use tokio_stream::StreamExt; + +/// Trying to read a non-existent blob should return a not found error. +#[tokio::test] +async fn not_found_read() { + let mut grpc_client = gen_blobsvc_grpc_client().await; + + let resp = grpc_client + .read(ReadBlobRequest { + digest: BLOB_A_DIGEST.clone().into(), + }) + .await; + + // We can't use unwrap_err here, because the Ok value doesn't implement + // debug. + if let Err(e) = resp { + assert_eq!(e.code(), tonic::Code::NotFound); + } else { + panic!("resp is not err") + } +} + +/// Trying to stat a non-existent blob should return a not found error. +#[tokio::test] +async fn not_found_stat() { + let mut grpc_client = gen_blobsvc_grpc_client().await; + + let resp = grpc_client + .stat(StatBlobRequest { + digest: BLOB_A_DIGEST.clone().into(), + ..Default::default() + }) + .await + .expect_err("must fail"); + + // The resp should be a status with Code::NotFound + assert_eq!(resp.code(), tonic::Code::NotFound); +} + +/// Put a blob in the store, get it back. +#[tokio::test] +async fn put_read_stat() { + let mut grpc_client = gen_blobsvc_grpc_client().await; + + // Send blob A. + let put_resp = grpc_client + .put(tokio_stream::once(BlobChunk { + data: BLOB_A.clone(), + })) + .await + .expect("must succeed") + .into_inner(); + + assert_eq!(BLOB_A_DIGEST.to_vec(), put_resp.digest); + + // Stat for the digest of A. + // We currently don't ask for more granular chunking data, as we don't + // expose it yet. + let _resp = grpc_client + .stat(StatBlobRequest { + digest: BLOB_A_DIGEST.clone().into(), + ..Default::default() + }) + .await + .expect("must succeed") + .into_inner(); + + // Read the blob. It should return the same data. + let resp = grpc_client + .read(ReadBlobRequest { + digest: BLOB_A_DIGEST.clone().into(), + }) + .await; + + let mut rx = resp.ok().unwrap().into_inner(); + + // the stream should contain one element, a BlobChunk with the same contents as BLOB_A. + let item = rx + .next() + .await + .expect("must be some") + .expect("must succeed"); + + assert_eq!(BLOB_A.clone(), item.data); + + // … and no more elements + assert!(rx.next().await.is_none()); + + // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks. + // Test with some bigger blob too +} diff --git a/tvix/castore/src/proto/tests/grpc_directoryservice.rs b/tvix/castore/src/proto/tests/grpc_directoryservice.rs new file mode 100644 index 000000000000..e443b4b1916a --- /dev/null +++ b/tvix/castore/src/proto/tests/grpc_directoryservice.rs @@ -0,0 +1,246 @@ +use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; +use crate::proto::directory_service_client::DirectoryServiceClient; +use crate::proto::get_directory_request::ByWhat; +use crate::proto::GetDirectoryRequest; +use crate::proto::{Directory, DirectoryNode, SymlinkNode}; +use crate::utils::gen_directorysvc_grpc_client; +use tokio_stream::StreamExt; +use tonic::transport::Channel; +use tonic::Status; + +/// Send the specified GetDirectoryRequest. +/// Returns an error in the case of an error response, or an error in one of +// the items in the stream, or a Vec<Directory> in the case of a successful +/// request. +async fn get_directories( + grpc_client: &mut DirectoryServiceClient<Channel>, + get_directory_request: GetDirectoryRequest, +) -> Result<Vec<Directory>, Status> { + let resp = grpc_client.get(get_directory_request).await; + + // if the response is an error itself, return the error, otherwise unpack + let stream = match resp { + Ok(resp) => resp, + Err(status) => return Err(status), + } + .into_inner(); + + let directory_results: Vec<Result<Directory, Status>> = stream.collect().await; + + // turn Vec<Result<Directory, Status> into Result<Vec<Directory>,Status> + directory_results.into_iter().collect() +} + +/// Trying to get a non-existent Directory should return a not found error. +#[tokio::test] +async fn not_found() { + let mut grpc_client = gen_directorysvc_grpc_client().await; + + let resp = grpc_client + .get(GetDirectoryRequest { + by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())), + ..Default::default() + }) + .await; + + let stream = resp.expect("must succeed").into_inner(); + + let items: Vec<_> = stream.collect().await; + + // The stream should contain one element, an error with Code::NotFound. + assert_eq!(1, items.len()); + let item = items[0].clone(); + + assert!(item.is_err(), "must be err"); + assert_eq!( + tonic::Code::NotFound, + item.unwrap_err().code(), + "must be err" + ); +} + +/// Put a Directory into the store, get it back. +#[tokio::test] +async fn put_get() { + let mut grpc_client = gen_directorysvc_grpc_client().await; + + // send directory A. + let put_resp = { + grpc_client + .put(tokio_stream::once(DIRECTORY_A.clone())) + .await + .expect("must succeed") + .into_inner() + }; + + // the sent root_digest should match the calculated digest + assert_eq!(put_resp.root_digest, DIRECTORY_A.digest().to_vec()); + + // get it back + let items = get_directories( + &mut grpc_client, + GetDirectoryRequest { + by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())), + ..Default::default() + }, + ) + .await + .expect("must not error"); + + assert_eq!(vec![DIRECTORY_A.clone()], items); +} + +/// Put multiple Directories into the store, and get them back +#[tokio::test] +async fn put_get_multiple() { + let mut grpc_client = gen_directorysvc_grpc_client().await; + + // sending "b" (which refers to "a") without sending "a" first should fail. + let put_resp = { + grpc_client + .put(tokio_stream::once(DIRECTORY_B.clone())) + .await + .expect_err("must fail") + }; + + assert_eq!(tonic::Code::InvalidArgument, put_resp.code()); + + // sending "a", then "b" should succeed, and the response should contain the digest of b. + let put_resp = { + grpc_client + .put(tokio_stream::iter(vec![ + DIRECTORY_A.clone(), + DIRECTORY_B.clone(), + ])) + .await + .expect("must succeed") + .into_inner() + }; + + assert_eq!(DIRECTORY_B.digest().to_vec(), put_resp.root_digest); + + // now, request b, first in non-recursive mode. + let items = get_directories( + &mut grpc_client, + GetDirectoryRequest { + recursive: false, + by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())), + }, + ) + .await + .expect("must not error"); + + // We expect to only get b. + assert_eq!(vec![DIRECTORY_B.clone()], items); + + // now, request b, but in recursive mode. + let items = get_directories( + &mut grpc_client, + GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())), + }, + ) + .await + .expect("must not error"); + + // We expect to get b, and then a, because that's how we traverse down. + assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], items); +} + +/// Put multiple Directories into the store, and omit duplicates. +#[tokio::test] +async fn put_get_dedup() { + let mut grpc_client = gen_directorysvc_grpc_client().await; + + // Send "A", then "C", which refers to "A" two times + // Pretend we're a dumb client sending A twice. + let put_resp = { + grpc_client + .put(tokio_stream::iter(vec![ + DIRECTORY_A.clone(), + DIRECTORY_A.clone(), + DIRECTORY_C.clone(), + ])) + .await + .expect("must succeed") + }; + + assert_eq!( + DIRECTORY_C.digest().to_vec(), + put_resp.into_inner().root_digest + ); + + // Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice. + let items = get_directories( + &mut grpc_client, + GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(DIRECTORY_C.digest().into())), + }, + ) + .await + .expect("must not error"); + + // We expect to get C, and then A (once, as the second A has been deduplicated). + assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items); +} + +/// Trying to upload a Directory failing validation should fail. +#[tokio::test] +async fn put_reject_failed_validation() { + let mut grpc_client = gen_directorysvc_grpc_client().await; + + // construct a broken Directory message that fails validation + let broken_directory = Directory { + symlinks: vec![SymlinkNode { + name: "".into(), + target: "doesntmatter".into(), + }], + ..Default::default() + }; + assert!(broken_directory.validate().is_err()); + + // send it over, it must fail + let put_resp = { + grpc_client + .put(tokio_stream::once(broken_directory)) + .await + .expect_err("must fail") + }; + + assert_eq!(put_resp.code(), tonic::Code::InvalidArgument); +} + +/// Trying to upload a Directory with wrong size should fail. +#[tokio::test] +async fn put_reject_wrong_size() { + let mut grpc_client = gen_directorysvc_grpc_client().await; + + // Construct a directory referring to DIRECTORY_A, but with wrong size. + let broken_parent_directory = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DIRECTORY_A.digest().into(), + size: 42, + }], + ..Default::default() + }; + // Make sure we got the size wrong. + assert_ne!( + broken_parent_directory.directories[0].size, + DIRECTORY_A.size() + ); + + // now upload both (first A, then the broken parent). This must fail. + let put_resp = { + grpc_client + .put(tokio_stream::iter(vec![ + DIRECTORY_A.clone(), + broken_parent_directory, + ])) + .await + .expect_err("must fail") + }; + assert_eq!(put_resp.code(), tonic::Code::InvalidArgument); +} diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs new file mode 100644 index 000000000000..8b62fadeb5a6 --- /dev/null +++ b/tvix/castore/src/proto/tests/mod.rs @@ -0,0 +1,4 @@ +mod directory; +mod directory_nodes_iterator; +mod grpc_blobservice; +mod grpc_directoryservice; diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs new file mode 100644 index 000000000000..77ed6d21c07a --- /dev/null +++ b/tvix/castore/src/tests/import.rs @@ -0,0 +1,124 @@ +use crate::fixtures::*; +use crate::import::ingest_path; +use crate::proto; +use crate::utils::{gen_blob_service, gen_directory_service}; +use tempfile::TempDir; + +#[cfg(target_family = "unix")] +use std::os::unix::ffi::OsStrExt; + +#[cfg(target_family = "unix")] +#[tokio::test] +async fn symlink() { + let tmpdir = TempDir::new().unwrap(); + + std::fs::create_dir_all(&tmpdir).unwrap(); + std::os::unix::fs::symlink( + "/nix/store/somewhereelse", + tmpdir.path().join("doesntmatter"), + ) + .unwrap(); + + let root_node = ingest_path( + gen_blob_service(), + gen_directory_service(), + tmpdir.path().join("doesntmatter"), + ) + .await + .expect("must succeed"); + + assert_eq!( + proto::node::Node::Symlink(proto::SymlinkNode { + name: "doesntmatter".into(), + target: "/nix/store/somewhereelse".into(), + }), + root_node, + ) +} + +#[tokio::test] +async fn single_file() { + let tmpdir = TempDir::new().unwrap(); + + std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap(); + + let blob_service = gen_blob_service(); + + let root_node = ingest_path( + blob_service.clone(), + gen_directory_service(), + tmpdir.path().join("root"), + ) + .await + .expect("must succeed"); + + assert_eq!( + proto::node::Node::File(proto::FileNode { + name: "root".into(), + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: HELLOWORLD_BLOB_CONTENTS.len() as u32, + executable: false, + }), + root_node, + ); + + // ensure the blob has been uploaded + assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap()); +} + +#[cfg(target_family = "unix")] +#[tokio::test] +async fn complicated() { + let tmpdir = TempDir::new().unwrap(); + + // File ``.keep` + std::fs::write(tmpdir.path().join(".keep"), vec![]).unwrap(); + // Symlink `aa` + std::os::unix::fs::symlink("/nix/store/somewhereelse", tmpdir.path().join("aa")).unwrap(); + // Directory `keep` + std::fs::create_dir(tmpdir.path().join("keep")).unwrap(); + // File ``keep/.keep` + std::fs::write(tmpdir.path().join("keep").join(".keep"), vec![]).unwrap(); + + let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); + + let root_node = ingest_path( + blob_service.clone(), + directory_service.clone(), + tmpdir.path(), + ) + .await + .expect("must succeed"); + + // ensure root_node matched expectations + assert_eq!( + proto::node::Node::Directory(proto::DirectoryNode { + name: tmpdir + .path() + .file_name() + .unwrap() + .as_bytes() + .to_owned() + .into(), + digest: DIRECTORY_COMPLICATED.digest().into(), + size: DIRECTORY_COMPLICATED.size(), + }), + root_node, + ); + + // ensure DIRECTORY_WITH_KEEP and DIRECTORY_COMPLICATED have been uploaded + assert!(directory_service + .get(&DIRECTORY_WITH_KEEP.digest()) + .await + .unwrap() + .is_some()); + assert!(directory_service + .get(&DIRECTORY_COMPLICATED.digest()) + .await + .unwrap() + .is_some()); + + // ensure EMPTY_BLOB_CONTENTS has been uploaded + assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap()); +} diff --git a/tvix/castore/src/tests/mod.rs b/tvix/castore/src/tests/mod.rs new file mode 100644 index 000000000000..d016f3e0aa55 --- /dev/null +++ b/tvix/castore/src/tests/mod.rs @@ -0,0 +1 @@ +mod import; diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs new file mode 100644 index 000000000000..d9a967598841 --- /dev/null +++ b/tvix/castore/src/utils.rs @@ -0,0 +1,107 @@ +//! A crate containing constructors to provide instances of a BlobService and +//! DirectoryService. +//! Only used for testing purposes, but across crates. +//! Should be removed once we have a better concept of a "Service registry". + +use pin_project_lite::pin_project; +use std::sync::Arc; +use tokio::io::DuplexStream; +use tonic::transport::{Channel, Endpoint, Server, Uri}; + +use crate::{ + blobservice::{BlobService, MemoryBlobService}, + directoryservice::{DirectoryService, MemoryDirectoryService}, + proto::{ + blob_service_client::BlobServiceClient, blob_service_server::BlobServiceServer, + directory_service_client::DirectoryServiceClient, + directory_service_server::DirectoryServiceServer, GRPCBlobServiceWrapper, + GRPCDirectoryServiceWrapper, + }, +}; + +pub fn gen_blob_service() -> Arc<dyn BlobService> { + Arc::new(MemoryBlobService::default()) +} + +pub fn gen_directory_service() -> Arc<dyn DirectoryService> { + Arc::new(MemoryDirectoryService::default()) +} + +pin_project! { + /// A wrapper around [DuplexStreamStream], + /// implementing [AsyncRead] and [Connected]. + pub struct DuplexStreamWrapper { + #[pin] + inner: DuplexStream + } +} + +/// This will spawn the a gRPC server with a DirectoryService client, connect a +/// gRPC DirectoryService client and return it. +#[allow(dead_code)] +pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> { + let (left, right) = tokio::io::duplex(64); + + // spin up a server, which will only connect once, to the left side. + tokio::spawn(async { + // spin up a new DirectoryService + let mut server = Server::builder(); + let router = server.add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from(gen_directory_service()), + )); + + router + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) + .await + }); + + // Create a client, connecting to the right side. The URI is unused. + let mut maybe_right = Some(right); + let grpc_client = DirectoryServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(right) } + })) + .await + .unwrap(), + ); + + grpc_client +} + +/// This will spawn the a gRPC server with a BlobService client, connect a +/// gRPC BlobService client and return it. +#[allow(dead_code)] +pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> { + let (left, right) = tokio::io::duplex(64); + + // spin up a server, which will only connect once, to the left side. + tokio::spawn(async { + // spin up a new DirectoryService + let mut server = Server::builder(); + let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( + gen_blob_service(), + ))); + + router + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) + .await + }); + + // Create a client, connecting to the right side. The URI is unused. + let mut maybe_right = Some(right); + let grpc_client = BlobServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(right) } + })) + .await + .unwrap(), + ); + + grpc_client +} |