Diffstat (limited to 'tvix/castore/src')
41 files changed, 7873 insertions, 0 deletions
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs new file mode 100644 index 000000000000..d75cad62a629 --- /dev/null +++ b/tvix/castore/src/blobservice/combinator.rs @@ -0,0 +1,136 @@ +use data_encoding::BASE64; +use futures::{StreamExt, TryStreamExt}; +use tokio_util::io::{ReaderStream, StreamReader}; +use tonic::async_trait; +use tracing::{instrument, warn}; + +use crate::B3Digest; + +use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; + +/// Combinator for a BlobService, using a "local" and "remote" blobservice. +/// Requests are tried in (and returned from) the local store first, only if +/// things are not present there, the remote BlobService is queried. +/// In case the local blobservice doesn't have the blob, we ask the remote +/// blobservice for chunks, and try to read each of these chunks from the local +/// blobservice again, before falling back to the remote one. +/// The remote BlobService is never written to. +pub struct CombinedBlobService<BL, BR> { + local: BL, + remote: BR, +} + +impl<BL, BR> Clone for CombinedBlobService<BL, BR> +where + BL: Clone, + BR: Clone, +{ + fn clone(&self) -> Self { + Self { + local: self.local.clone(), + remote: self.remote.clone(), + } + } +} + +#[async_trait] +impl<BL, BR> BlobService for CombinedBlobService<BL, BR> +where + BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static, + BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static, +{ + #[instrument(skip(self, digest), fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> { + Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?) + } + + #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> { + if self.local.as_ref().has(digest).await? { + // local store has the blob, so we can assume it also has all chunks. + self.local.as_ref().open_read(digest).await + } else { + // Local store doesn't have the blob. + // Ask the remote one for the list of chunks, + // and create a chunked reader that uses self.open_read() for + // individual chunks. There's a chance we already have some chunks + // locally, meaning we don't need to fetch them all from the remote + // BlobService. + match self.remote.as_ref().chunks(digest).await? { + // blob doesn't exist on the remote side either, nothing we can do. + None => Ok(None), + Some(remote_chunks) => { + // if there's no more granular chunks, or the remote + // blobservice doesn't support chunks, read the blob from + // the remote blobservice directly. + if remote_chunks.is_empty() { + return self.remote.as_ref().open_read(digest).await; + } + // otherwise, a chunked reader, which will always try the + // local backend first. + + // map Vec<ChunkMeta> to Vec<(B3Digest, u64)> + let chunks: Vec<(B3Digest, u64)> = remote_chunks + .into_iter() + .map(|chunk_meta| { + ( + B3Digest::try_from(chunk_meta.digest) + .expect("invalid chunk digest"), + chunk_meta.size, + ) + }) + .collect(); + + Ok(Some(make_chunked_reader(self.clone(), chunks))) + } + } + } + } + + #[instrument(skip_all)] + async fn open_write(&self) -> Box<dyn BlobWriter> { + // direct writes to the local one. 
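The read path above hinges on the tri-state contract of chunks(), defined in mod.rs further down. A minimal sketch of how a caller can branch on it — assuming the crate is consumed as tvix_castore; describe_blob is an illustrative helper, not part of this file:

use tvix_castore::blobservice::BlobService;
use tvix_castore::B3Digest;

// Mirrors the decision tree in open_read above.
async fn describe_blob(svc: &dyn BlobService, digest: &B3Digest) -> std::io::Result<()> {
    match svc.chunks(digest).await? {
        // Blob is not present at all.
        None => println!("not found"),
        // Blob exists, but there are no more granular chunks:
        // read it in one piece via open_read.
        Some(chunks) if chunks.is_empty() => println!("unchunked blob"),
        // Blob exists as a list of chunks; each chunk digest can be fed
        // back into open_read, trying the local store first.
        Some(chunks) => println!("{} chunks", chunks.len()),
    }
    Ok(())
}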
+ self.local.as_ref().open_write().await + } +} + +fn make_chunked_reader<BS>( + // This must consume, as we can't retain references to blob_service, + // as it'd add a lifetime to BlobReader in general, which will get + // problematic in TvixStoreFs, which is using async move closures and cloning. + blob_service: BS, + // A list of b3 digests for individual chunks, and their sizes. + chunks: Vec<(B3Digest, u64)>, +) -> Box<dyn BlobReader> +where + BS: BlobService + Clone + 'static, +{ + // TODO: offset, verified streaming + + // construct readers for each chunk + let blob_service = blob_service.clone(); + let readers_stream = tokio_stream::iter(chunks).map(move |(digest, _)| { + let d = digest.to_owned(); + let blob_service = blob_service.clone(); + async move { + blob_service.open_read(&d.to_owned()).await?.ok_or_else(|| { + warn!( + chunk.digest = BASE64.encode(digest.as_slice()), + "chunk not found" + ); + std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found") + }) + } + }); + + // convert the stream of readers to a stream of streams of byte chunks + let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) }); + + // flatten into one stream of byte chunks + let bytes_stream = bytes_streams.try_flatten(); + + // convert into AsyncRead + let blob_reader = StreamReader::new(bytes_stream); + + Box::new(NaiveSeeker::new(Box::pin(blob_reader))) +} diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs new file mode 100644 index 000000000000..db221f05ab3b --- /dev/null +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -0,0 +1,121 @@ +use url::Url; + +use crate::{proto::blob_service_client::BlobServiceClient, Error}; + +use super::{ + BlobService, GRPCBlobService, MemoryBlobService, SimpleFilesystemBlobService, SledBlobService, +}; + +/// Constructs a new instance of a [BlobService] from an URI. +/// +/// The following schemes are supported by the following services: +/// - `memory://` ([MemoryBlobService]) +/// - `sled://` ([SledBlobService]) +/// - `grpc+*://` ([GRPCBlobService]) +/// - `simplefs://` ([SimpleFilesystemBlobService]) +/// +/// See their `from_url` methods for more details about their syntax. +pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error> { + let url = Url::parse(uri) + .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; + + Ok(if url.scheme() == "memory" { + // memory doesn't support host or path in the URL. + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string())); + } + Box::<MemoryBlobService>::default() + } else if url.scheme() == "sled" { + // sled doesn't support host, and a path can be provided (otherwise + // it'll live in memory only). + if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string())); + } + + if url.path() == "/" { + return Err(Error::StorageError( + "cowardly refusing to open / with sled".to_string(), + )); + } + + // TODO: expose other parameters as URL parameters? + + if url.path().is_empty() { + return Ok(Box::new( + SledBlobService::new_temporary().map_err(|e| Error::StorageError(e.to_string()))?, + )); + } + return Ok(Box::new( + SledBlobService::new(url.path()).map_err(|e| Error::StorageError(e.to_string()))?, + )); + } else if url.scheme().starts_with("grpc+") { + // schemes starting with grpc+ go to the GRPCPathInfoService. + // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. 
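A usage sketch for from_addr, one call per supported scheme; all paths and the address are illustrative:

use tvix_castore::blobservice::from_addr;
use tvix_castore::Error;

async fn pick_backend() -> Result<(), Error> {
    let _in_memory = from_addr("memory://").await?;
    let _on_disk = from_addr("sled:///var/lib/tvix-store/blobs.sled").await?;
    let _remote = from_addr("grpc+http://[::1]:8000").await?;
    let _simple_fs = from_addr("simplefs:///var/lib/tvix-store/blobs").await?;
    Ok(())
}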
+ // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?); + Box::new(GRPCBlobService::from_client(client)) + } else if url.scheme() == "simplefs" { + if url.path().is_empty() { + return Err(Error::StorageError("Invalid filesystem path".to_string())); + } + + Box::new(SimpleFilesystemBlobService::new(url.path().into()).await?) + } else { + Err(crate::Error::StorageError(format!( + "unknown scheme: {}", + url.scheme() + )))? + }) +} + +#[cfg(test)] +mod tests { + use super::from_addr; + use lazy_static::lazy_static; + use tempfile::TempDir; + use test_case::test_case; + + lazy_static! { + static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); + static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap(); + } + + /// This uses an unsupported scheme. + #[test_case("http://foo.example/test", false; "unsupported scheme")] + /// This configures sled in temporary mode. + #[test_case("sled://", true; "sled valid temporary")] + /// This configures sled with /, which should fail. + #[test_case("sled:///", false; "sled invalid root")] + /// This configures sled with a host, not path, which should fail. + #[test_case("sled://foo.example", false; "sled invalid host")] + /// This configures sled with a valid path path, which should succeed. + #[test_case(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true; "sled valid path")] + /// This configures sled with a host, and a valid path path, which should fail. + #[test_case(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false; "sled invalid host with valid path")] + /// This correctly sets the scheme, and doesn't set a path. + #[test_case("memory://", true; "memory valid")] + /// This sets a memory url host to `foo` + #[test_case("memory://foo", false; "memory invalid host")] + /// This sets a memory url path to "/", which is invalid. + #[test_case("memory:///", false; "memory invalid root path")] + /// This sets a memory url path to "/foo", which is invalid. + #[test_case("memory:///foo", false; "memory invalid root path foo")] + /// Correct scheme to connect to a unix socket. + #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")] + /// Correct scheme for unix socket, but setting a host too, which is invalid. + #[test_case("grpc+unix://host.example/path/to/somewhere", false; "grpc invalid unix socket and host")] + /// Correct scheme to connect to localhost, with port 12345 + #[test_case("grpc+http://[::1]:12345", true; "grpc valid IPv6 localhost port 12345")] + /// Correct scheme to connect to localhost over http, without specifying a port. + #[test_case("grpc+http://localhost", true; "grpc valid http host without port")] + /// Correct scheme to connect to localhost over http, without specifying a port. + #[test_case("grpc+https://localhost", true; "grpc valid https host without port")] + /// Correct scheme to connect to localhost over http, but with additional path, which is invalid. 
+ #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")] + #[tokio::test] + async fn test_from_addr_tokio(uri_str: &str, is_ok: bool) { + assert_eq!(from_addr(uri_str).await.is_ok(), is_ok) + } +} diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs new file mode 100644 index 000000000000..d98a9b517724 --- /dev/null +++ b/tvix/castore/src/blobservice/grpc.rs @@ -0,0 +1,316 @@ +use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; +use crate::{ + proto::{self, stat_blob_response::ChunkMeta}, + B3Digest, +}; +use futures::sink::SinkExt; +use std::{io, pin::pin, task::Poll}; +use tokio::io::AsyncWriteExt; +use tokio::task::JoinHandle; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_util::{ + io::{CopyToBytes, SinkWriter}, + sync::PollSender, +}; +use tonic::{async_trait, transport::Channel, Code, Status}; +use tracing::{instrument, warn}; + +/// Connects to a (remote) tvix-store BlobService over gRPC. +#[derive(Clone)] +pub struct GRPCBlobService { + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, +} + +impl GRPCBlobService { + /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, + ) -> Self { + Self { grpc_client } + } +} + +#[async_trait] +impl BlobService for GRPCBlobService { + #[instrument(skip(self, digest), fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> io::Result<bool> { + let mut grpc_client = self.grpc_client.clone(); + let resp = grpc_client + .stat(proto::StatBlobRequest { + digest: digest.clone().into(), + ..Default::default() + }) + .await; + + match resp { + Ok(_blob_meta) => Ok(true), + Err(e) if e.code() == Code::NotFound => Ok(false), + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + } + } + + #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { + // Get a stream of [proto::BlobChunk], or return an error if the blob + // doesn't exist. + match self + .grpc_client + .clone() + .read(proto::ReadBlobRequest { + digest: digest.clone().into(), + }) + .await + { + Ok(stream) => { + // on success, this is a stream of tonic::Result<proto::BlobChunk>, + // so access .data and map errors into std::io::Error. + let data_stream = stream.into_inner().map(|e| { + e.map(|c| c.data) + .map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s)) + }); + + // Use StreamReader::new to convert to an AsyncRead. + let data_reader = tokio_util::io::StreamReader::new(data_stream); + + Ok(Some(Box::new(NaiveSeeker::new(data_reader)))) + } + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + } + } + + /// Returns a BlobWriter, that'll internally wrap each write in a + /// [proto::BlobChunk], which is send to the gRPC server. + #[instrument(skip_all)] + async fn open_write(&self) -> Box<dyn BlobWriter> { + // set up an mpsc channel passing around Bytes. + let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); + + // bytes arriving on the RX side are wrapped inside a + // [proto::BlobChunk], and a [ReceiverStream] is constructed. 
+ let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x }); + + // spawn the gRPC put request, which will read from blobchunk_stream. + let task = tokio::spawn({ + let mut grpc_client = self.grpc_client.clone(); + async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) } + }); + + // The tx part of the channel is converted to a sink of byte chunks. + let sink = PollSender::new(tx) + .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)); + + // … which is turned into an [tokio::io::AsyncWrite]. + let writer = SinkWriter::new(CopyToBytes::new(sink)); + + Box::new(GRPCBlobWriter { + task_and_writer: Some((task, writer)), + digest: None, + }) + } + + #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { + let resp = self + .grpc_client + .clone() + .stat(proto::StatBlobRequest { + digest: digest.clone().into(), + send_chunks: true, + ..Default::default() + }) + .await; + + match resp { + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + Ok(resp) => { + let resp = resp.into_inner(); + + resp.validate() + .map_err(|e| std::io::Error::new(io::ErrorKind::InvalidData, e))?; + + if resp.chunks.is_empty() { + warn!("chunk list is empty"); + } + Ok(Some(resp.chunks)) + } + } + } +} + +pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> { + /// The task containing the put request, and the inner writer, if we're still writing. + task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, +} + +#[async_trait] +impl<W: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static> BlobWriter for GRPCBlobWriter<W> { + async fn close(&mut self) -> io::Result<B3Digest> { + if self.task_and_writer.is_none() { + // if we're already closed, return the b3 digest, which must exist. + // If it doesn't, we already closed and failed once, and didn't handle the error. + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "already closed")), + } + } else { + let (task, mut writer) = self.task_and_writer.take().unwrap(); + + // invoke shutdown, so the inner writer closes its internal tx side of + // the channel. + writer.shutdown().await?; + + // block on the RPC call to return. + // This ensures all chunks are sent out, and have been received by the + // backend. + + match task.await? { + Ok(resp) => { + // return the digest from the response, and store it in self.digest for subsequent closes. 
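Seen from the caller, this gives every BlobWriter the same contract: shutdown and digest retrieval happen in close(), and a second close() just returns the cached digest. A minimal sketch against the BlobService trait:

use tokio::io::AsyncWriteExt;
use tvix_castore::blobservice::BlobService;

async fn write_blob(svc: &dyn BlobService, data: &[u8]) -> std::io::Result<()> {
    let mut w = svc.open_write().await;
    w.write_all(data).await?;

    // close() shuts the writer down, waits for the backend to acknowledge,
    // and returns the blake3 digest of everything written.
    let digest = w.close().await?;

    // Closing again is a no-op returning the cached digest.
    assert_eq!(w.close().await?, digest);
    Ok(())
}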
+ let digest_len = resp.digest.len(); + let digest: B3Digest = resp.digest.try_into().map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + format!("invalid root digest length {} in response", digest_len), + ) + })?; + self.digest = Some(digest.clone()); + Ok(digest) + } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), + } + } + } +} + +impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for GRPCBlobWriter<W> { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll<Result<usize, io::Error>> { + match &mut self.task_and_writer { + None => Poll::Ready(Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + ))), + Some((_, ref mut writer)) => { + let pinned_writer = pin!(writer); + pinned_writer.poll_write(cx, buf) + } + } + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + match &mut self.task_and_writer { + None => Poll::Ready(Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + ))), + Some((_, ref mut writer)) => { + let pinned_writer = pin!(writer); + pinned_writer.poll_flush(cx) + } + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + // TODO(raitobezarius): this might not be a graceful shutdown of the + // channel inside the gRPC connection. + Poll::Ready(Ok(())) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio_retry::strategy::ExponentialBackoff; + use tokio_retry::Retry; + use tokio_stream::wrappers::UnixListenerStream; + + use crate::blobservice::MemoryBlobService; + use crate::fixtures; + use crate::proto::blob_service_client::BlobServiceClient; + use crate::proto::GRPCBlobServiceWrapper; + + use super::BlobService; + use super::GRPCBlobService; + + /// This ensures connecting via gRPC works as expected. 
+ #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("daemon"); + + let path_clone = socket_path.clone(); + + // Spin up a server + tokio::spawn(async { + let uds = UnixListener::bind(path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new server + let mut server = tonic::transport::Server::builder(); + let router = + server.add_service(crate::proto::blob_service_server::BlobServiceServer::new( + GRPCBlobServiceWrapper::new( + Box::<MemoryBlobService>::default() as Box<dyn BlobService> + ), + )); + router.serve_with_incoming(uds_stream).await + }); + + // wait for the socket to be created + Retry::spawn( + ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // prepare a client + let grpc_client = { + let url = url::Url::parse(&format!( + "grpc+unix://{}?wait-connect=1", + socket_path.display() + )) + .expect("must parse"); + let client = BlobServiceClient::new( + crate::tonic::channel_from_url(&url) + .await + .expect("must succeed"), + ); + + GRPCBlobService::from_client(client) + }; + + let has = grpc_client + .has(&fixtures::BLOB_A_DIGEST) + .await + .expect("must not be err"); + + assert!(!has); + } +} diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs new file mode 100644 index 000000000000..25eec334de60 --- /dev/null +++ b/tvix/castore/src/blobservice/memory.rs @@ -0,0 +1,137 @@ +use std::io::{self, Cursor, Write}; +use std::task::Poll; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; +use tonic::async_trait; +use tracing::instrument; + +use super::{BlobReader, BlobService, BlobWriter}; +use crate::B3Digest; + +#[derive(Clone, Default)] +pub struct MemoryBlobService { + db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, +} + +#[async_trait] +impl BlobService for MemoryBlobService { + #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> io::Result<bool> { + let db = self.db.read().unwrap(); + Ok(db.contains_key(digest)) + } + + #[instrument(skip_all, err, fields(blob.digest=%digest))] + async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { + let db = self.db.read().unwrap(); + + match db.get(digest).map(|x| Cursor::new(x.clone())) { + Some(result) => Ok(Some(Box::new(result))), + None => Ok(None), + } + } + + #[instrument(skip_all)] + async fn open_write(&self) -> Box<dyn BlobWriter> { + Box::new(MemoryBlobWriter::new(self.db.clone())) + } +} + +pub struct MemoryBlobWriter { + db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, + + /// Contains the buffer Vec and hasher, or None if already closed + writers: Option<(Vec<u8>, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. 
+    digest: Option<B3Digest>,
+}
+
+impl MemoryBlobWriter {
+    fn new(db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>) -> Self {
+        Self {
+            db,
+            writers: Some((Vec::new(), blake3::Hasher::new())),
+            digest: None,
+        }
+    }
+}
+impl tokio::io::AsyncWrite for MemoryBlobWriter {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+        b: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
+        Poll::Ready(match &mut self.writers {
+            None => Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "already closed",
+            )),
+            Some((ref mut buf, ref mut hasher)) => {
+                let bytes_written = buf.write(b)?;
+                hasher.write(&b[..bytes_written])
+            }
+        })
+    }
+
+    fn poll_flush(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        Poll::Ready(match self.writers {
+            None => Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "already closed",
+            )),
+            Some(_) => Ok(()),
+        })
+    }
+
+    fn poll_shutdown(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        // shutdown is "instantaneous", we only write to memory.
+        Poll::Ready(Ok(()))
+    }
+}
+
+#[async_trait]
+impl BlobWriter for MemoryBlobWriter {
+    async fn close(&mut self) -> io::Result<B3Digest> {
+        if self.writers.is_none() {
+            match &self.digest {
+                Some(digest) => Ok(digest.clone()),
+                None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "already closed")),
+            }
+        } else {
+            let (buf, hasher) = self.writers.take().unwrap();
+
+            // The hasher is a blake3 hasher, so the digest below always has
+            // the right length and this conversion won't fail.
+            let digest: B3Digest = hasher.finalize().as_bytes().into();
+
+            // Only insert if the blob doesn't already exist.
+            let db = self.db.read().map_err(|e| {
+                io::Error::new(io::ErrorKind::BrokenPipe, format!("RwLock poisoned: {}", e))
+            })?;
+            if !db.contains_key(&digest) {
+                // drop the read lock, so we can open for writing.
+                drop(db);
+
+                // open the database for writing.
+                let mut db = self.db.write().map_err(|e| {
+                    io::Error::new(io::ErrorKind::BrokenPipe, format!("RwLock poisoned: {}", e))
+                })?;
+
+                // and put buf in there. This will move buf out.
+                db.insert(digest.clone(), buf);
+            }
+
+            self.digest = Some(digest.clone());
+
+            Ok(digest)
+        }
+    }
+}
diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs
new file mode 100644
index 000000000000..fa6b87926ba1
--- /dev/null
+++ b/tvix/castore/src/blobservice/mod.rs
@@ -0,0 +1,79 @@
+use std::io;
+use tonic::async_trait;
+
+use crate::proto::stat_blob_response::ChunkMeta;
+use crate::B3Digest;
+
+mod combinator;
+mod from_addr;
+mod grpc;
+mod memory;
+mod naive_seeker;
+mod simplefs;
+mod sled;
+
+#[cfg(test)]
+mod tests;
+
+pub use self::combinator::CombinedBlobService;
+pub use self::from_addr::from_addr;
+pub use self::grpc::GRPCBlobService;
+pub use self::memory::MemoryBlobService;
+pub use self::simplefs::SimpleFilesystemBlobService;
+pub use self::sled::SledBlobService;
+
+/// The base trait all BlobService services need to implement.
+/// It provides functions to check whether a given blob exists,
+/// a way to read (and seek) a blob, and a method to create a BlobWriter handle,
+/// which implements a writer interface and also provides a close function,
+/// to finalize a blob and get its digest.
+#[async_trait]
+pub trait BlobService: Send + Sync {
+    /// Check if the service has the blob, by its content hash.
+    /// On implementations returning chunks, this must also work for chunks.
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool>;
+
+    /// Request a blob from the store, by its content hash.
+    /// On implementations returning chunks, this must also work for chunks.
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>>;
+
+    /// Insert a new blob into the store. Returns a [BlobWriter], which
+    /// implements [tokio::io::AsyncWrite] and a [BlobWriter::close] to finalize
+    /// the blob and get its digest.
+    async fn open_write(&self) -> Box<dyn BlobWriter>;
+
+    /// Return a list of chunks for a given blob.
+    /// There's a distinction between returning Ok(None) and Ok(Some(vec![])).
+    /// The former is returned if the blob is not present at all, while the
+    /// latter is returned if there are no more granular chunks (or the
+    /// backend does not support chunking).
+    /// A default implementation is provided which checks for existence and
+    /// then signals that no more granular chunks are available.
+    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
+        if !self.has(digest).await? {
+            return Ok(None);
+        } else {
+            // default implementation, signalling the backend does not have more
+            // granular chunks available.
+            return Ok(Some(vec![]));
+        }
+    }
+}
+
+/// A [tokio::io::AsyncWrite] that the user needs to close() afterwards to
+/// persist the blob. On success, it returns the digest of the written blob.
+#[async_trait]
+pub trait BlobWriter: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static {
+    /// Signal there's no more data to be written, and return the digest of the
+    /// contents written.
+    ///
+    /// Closing an already-closed BlobWriter is a no-op.
+    async fn close(&mut self) -> io::Result<B3Digest>;
+}
+
+/// BlobReader is a [tokio::io::AsyncRead] that also allows seeking.
+pub trait BlobReader: tokio::io::AsyncRead + tokio::io::AsyncSeek + Send + Unpin + 'static {}
+
+/// A [`io::Cursor<Vec<u8>>`] can be used as a BlobReader.
+impl BlobReader for io::Cursor<Vec<u8>> {}
+impl BlobReader for tokio::fs::File {}
diff --git a/tvix/castore/src/blobservice/naive_seeker.rs b/tvix/castore/src/blobservice/naive_seeker.rs
new file mode 100644
index 000000000000..e65a82c7f45a
--- /dev/null
+++ b/tvix/castore/src/blobservice/naive_seeker.rs
@@ -0,0 +1,269 @@
+use super::BlobReader;
+use pin_project_lite::pin_project;
+use std::io;
+use std::task::Poll;
+use tokio::io::AsyncRead;
+use tracing::{debug, instrument};
+
+pin_project! {
+    /// This implements [tokio::io::AsyncSeek] for an [tokio::io::AsyncRead] by
+    /// simply skipping over some bytes, keeping track of the position.
+    /// It fails whenever you try to seek backwards.
+    ///
+    /// ## Pinning concerns:
+    ///
+    /// [NaiveSeeker] is itself pinned by callers, and we do not need to concern
+    /// ourselves regarding that.
+    ///
+    /// Its fields, though, as per
+    /// <https://doc.rust-lang.org/std/pin/#pinning-is-not-structural-for-field>
+    /// can be pinned or unpinned.
+    ///
+    /// So we need to go over each field and choose our policy carefully.
+    ///
+    /// The obvious cases are the bookkeeping integers we keep in the structure:
+    /// they are private and not shared with anyone, and we never build a
+    /// `Pin<&mut X>` out of them at any point, so we can safely leave them
+    /// unpinned. Of course, it is expected that no developer here attempts to
+    /// `pin!(self.pos)` to pin them, because that makes no sense. If they ever
+    /// have to become pinned, they should be marked `#[pin]` and we need
+    /// to discuss it.
+    ///
+    /// So the bookkeeping integers are in the right state with respect to their
+    /// pinning status. The projection should offer direct access.
+    ///
+    /// On the `r` field, i.e. a `BufReader<R>`, given that
+    /// <https://docs.rs/tokio/latest/tokio/io/struct.BufReader.html#impl-Unpin-for-BufReader%3CR%3E>
+    /// is available, even a `Pin<&mut BufReader<R>>` can be safely moved.
+    ///
+    /// The only thing we need to be careful about is the internal reader
+    /// itself, i.e. the `R` instance; note that Tokio decided to `#[pin]` it too:
+    /// <https://docs.rs/tokio/latest/src/tokio/io/util/buf_reader.rs.html#29>
+    ///
+    /// In general, there's no `Unpin` instance for `R: tokio::io::AsyncRead`
+    /// (see <https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html>).
+    ///
+    /// Therefore, we could keep it unpinned and pin it at every call site
+    /// where we need to call `poll_*`, but that can be confusing to the
+    /// non-expert developer, and we have a fair number of situations where the
+    /// [BufReader] instance is naked, i.e. in its `&mut BufReader<R>` form.
+    /// This is annoying, because it could expose the naked `R` internal
+    /// instance somehow and produce a risk of making it move unexpectedly.
+    ///
+    /// We choose the path of least resistance: as we have no reason to access
+    /// the raw `BufReader<R>` instance, we just `#[pin]` it too, enjoy its safe
+    /// `poll_*` APIs, and push the unpinning concerns down to the internal
+    /// implementations themselves, which have studied the question longer
+    /// than us.
+    pub struct NaiveSeeker<R: tokio::io::AsyncRead> {
+        #[pin]
+        r: tokio::io::BufReader<R>,
+        pos: u64,
+        bytes_to_skip: u64,
+    }
+}
+
+impl<R: tokio::io::AsyncRead> NaiveSeeker<R> {
+    pub fn new(r: R) -> Self {
+        NaiveSeeker {
+            r: tokio::io::BufReader::new(r),
+            pos: 0,
+            bytes_to_skip: 0,
+        }
+    }
+}
+
+impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> {
+    fn poll_read(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> Poll<std::io::Result<()>> {
+        // The amount of data read can be determined by the increase
+        // in the length of the slice returned by `ReadBuf::filled`.
+        let filled_before = buf.filled().len();
+        let this = self.project();
+        let pos: &mut u64 = this.pos;
+
+        match this.r.poll_read(cx, buf) {
+            Poll::Ready(a) => {
+                let bytes_read = buf.filled().len() - filled_before;
+                *pos += bytes_read as u64;
+
+                Poll::Ready(a)
+            }
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl<R: tokio::io::AsyncRead> tokio::io::AsyncBufRead for NaiveSeeker<R> {
+    fn poll_fill_buf(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<io::Result<&[u8]>> {
+        self.project().r.poll_fill_buf(cx)
+    }
+
+    fn consume(self: std::pin::Pin<&mut Self>, amt: usize) {
+        let this = self.project();
+        this.r.consume(amt);
+        let pos: &mut u64 = this.pos;
+        *pos += amt as u64;
+    }
+}
+
+impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> {
+    #[instrument(skip(self))]
+    fn start_seek(
+        self: std::pin::Pin<&mut Self>,
+        position: std::io::SeekFrom,
+    ) -> std::io::Result<()> {
+        let absolute_offset: u64 = match position {
+            io::SeekFrom::Start(start_offset) => {
+                if start_offset < self.pos {
+                    return Err(io::Error::new(
+                        io::ErrorKind::Unsupported,
+                        format!("can't seek backwards ({} -> {})", self.pos, start_offset),
+                    ));
+                } else {
+                    start_offset
+                }
+            }
+            // we don't know the total size, can't support this.
+ io::SeekFrom::End(_end_offset) => { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "can't seek from end", + )); + } + io::SeekFrom::Current(relative_offset) => { + if relative_offset < 0 { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "can't seek backwards relative to current position", + )); + } else { + self.pos + relative_offset as u64 + } + } + }; + + debug!(absolute_offset=?absolute_offset, "seek"); + + // we already know absolute_offset is larger than self.pos + debug_assert!( + absolute_offset >= self.pos, + "absolute_offset {} is larger than self.pos {}", + absolute_offset, + self.pos + ); + + // calculate bytes to skip + *self.project().bytes_to_skip = absolute_offset - self.pos; + + Ok(()) + } + + #[instrument(skip(self))] + fn poll_complete( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<std::io::Result<u64>> { + if self.bytes_to_skip == 0 { + // return the new position (from the start of the stream) + return Poll::Ready(Ok(self.pos)); + } + + // discard some bytes, until pos is where we want it to be. + // We create a buffer that we'll discard later on. + let mut buf = [0; 1024]; + + // Loop until we've reached the desired seek position. This is done by issuing repeated + // `poll_read` calls. If the data is not available yet, we will yield back to the executor + // and wait to be polled again. + loop { + // calculate the length we want to skip at most, which is either a max + // buffer size, or the number of remaining bytes to read, whatever is + // smaller. + let bytes_to_skip = std::cmp::min(self.bytes_to_skip as usize, buf.len()); + + let mut read_buf = tokio::io::ReadBuf::new(&mut buf[..bytes_to_skip]); + + match self.as_mut().poll_read(cx, &mut read_buf) { + Poll::Ready(_a) => { + let bytes_read = read_buf.filled().len() as u64; + + if bytes_read == 0 { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!( + "tried to skip {} bytes, but only was able to skip {} until reaching EOF", + bytes_to_skip, bytes_read + ), + ))); + } + + // calculate bytes to skip + let bytes_to_skip = self.bytes_to_skip - bytes_read; + + *self.as_mut().project().bytes_to_skip = bytes_to_skip; + + if bytes_to_skip == 0 { + return Poll::Ready(Ok(self.pos)); + } + } + Poll::Pending => return Poll::Pending, + }; + } + } +} + +impl<R: tokio::io::AsyncRead + Send + Unpin + 'static> BlobReader for NaiveSeeker<R> {} + +#[cfg(test)] +mod tests { + use super::NaiveSeeker; + use std::io::{Cursor, SeekFrom}; + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + + /// This seek requires multiple `poll_read` as we use a 1024 bytes internal + /// buffer when doing the seek. + /// This ensures we don't hang indefinitely. 
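The tests below pin down that contract; for reference, the forward-only behaviour in isolation. NaiveSeeker is crate-internal, so a sketch like this only works from within the module, as the tests do:

use std::io::SeekFrom;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

#[tokio::test]
async fn forward_only() {
    let data: Vec<u8> = (0u8..=255).collect();
    let mut seeker = super::NaiveSeeker::new(std::io::Cursor::new(data));

    // Forward seeks succeed by discarding bytes…
    seeker.seek(SeekFrom::Start(16)).await.expect("forward seek");
    let mut byte = [0u8; 1];
    seeker.read_exact(&mut byte).await.unwrap();
    assert_eq!(byte[0], 16);

    // …while backwards seeks and SeekFrom::End are rejected in start_seek.
    assert!(seeker.seek(SeekFrom::Start(0)).await.is_err());
    assert!(seeker.seek(SeekFrom::End(0)).await.is_err());
}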
+ #[tokio::test] + async fn seek() { + let buf = vec![0u8; 4096]; + let reader = Cursor::new(&buf); + let mut seeker = NaiveSeeker::new(reader); + seeker.seek(SeekFrom::Start(4000)).await.unwrap(); + } + + #[tokio::test] + async fn seek_read() { + let mut buf = vec![0u8; 2048]; + buf.extend_from_slice(&[1u8; 2048]); + buf.extend_from_slice(&[2u8; 2048]); + + let reader = Cursor::new(&buf); + let mut seeker = NaiveSeeker::new(reader); + + let mut read_buf = vec![0u8; 1024]; + seeker.read_exact(&mut read_buf).await.expect("must read"); + assert_eq!(read_buf.as_slice(), &[0u8; 1024]); + + seeker + .seek(SeekFrom::Current(1024)) + .await + .expect("must seek"); + seeker.read_exact(&mut read_buf).await.expect("must read"); + assert_eq!(read_buf.as_slice(), &[1u8; 1024]); + + seeker + .seek(SeekFrom::Start(2 * 2048)) + .await + .expect("must seek"); + seeker.read_exact(&mut read_buf).await.expect("must read"); + assert_eq!(read_buf.as_slice(), &[2u8; 1024]); + } +} diff --git a/tvix/castore/src/blobservice/simplefs.rs b/tvix/castore/src/blobservice/simplefs.rs new file mode 100644 index 000000000000..b21db2808c23 --- /dev/null +++ b/tvix/castore/src/blobservice/simplefs.rs @@ -0,0 +1,195 @@ +use std::{ + io, + path::{Path, PathBuf}, + pin::pin, + task::Poll, +}; + +use bytes::Buf; +use data_encoding::HEXLOWER; +use pin_project_lite::pin_project; +use tokio::io::AsyncWriteExt; +use tonic::async_trait; +use tracing::instrument; + +use crate::B3Digest; + +use super::{BlobReader, BlobService, BlobWriter}; + +/// Connects to a tvix-store BlobService on an existing path backed by a POSIX-compliant +/// filesystem. +/// +/// It takes an existing path, builds a `tmp` directory and a `blobs` directory inside of it. All +/// blobs received are staged in that `tmp` directory, then they are moved **atomically** into +/// `blobs/B3DIGEST[:2]/B3DIGEST[2:]` in a sharding style, e.g. `abcdef` gets turned into `ab/cdef` +/// +/// **Disclaimer** : This very simple implementation is subject to change and does not give any +/// final guarantees on the on-disk format. +#[derive(Clone)] +pub struct SimpleFilesystemBlobService { + /// Where the blobs are located on a filesystem already mounted. + path: PathBuf, +} + +impl SimpleFilesystemBlobService { + pub async fn new(path: PathBuf) -> std::io::Result<Self> { + tokio::fs::create_dir_all(&path).await?; + tokio::fs::create_dir_all(path.join("tmp")).await?; + tokio::fs::create_dir_all(path.join("blobs")).await?; + + Ok(Self { path }) + } +} + +fn derive_path(root: &Path, digest: &B3Digest) -> PathBuf { + let prefix = HEXLOWER.encode(&digest.as_slice()[..2]); + let pathname = HEXLOWER.encode(digest.as_slice()); + + root.join("blobs").join(prefix).join(pathname) +} + +#[async_trait] +impl BlobService for SimpleFilesystemBlobService { + #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> io::Result<bool> { + Ok(tokio::fs::try_exists(derive_path(&self.path, digest)).await?) + } + + #[instrument(skip_all, err, fields(blob.digest=%digest))] + async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { + let dst_path = derive_path(&self.path, digest); + let reader = match tokio::fs::File::open(dst_path).await { + Ok(file) => { + let reader: Box<dyn BlobReader> = Box::new(file); + Ok(Some(reader)) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(e), + }; + + Ok(reader?) 
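Note that the shard prefix produced by derive_path above is the hex encoding of the first two bytes of the digest, i.e. four hex characters; the module doc's `ab/cdef` illustration slices the hex string, while the code slices the byte slice. A small sketch of the same scheme:

use data_encoding::HEXLOWER;

// For a digest beginning 0xab 0xcd, this yields ("abcd", "abcd…"),
// i.e. the blob lands at blobs/abcd/abcd….
fn shard(digest: &[u8; 32]) -> (String, String) {
    (HEXLOWER.encode(&digest[..2]), HEXLOWER.encode(digest))
}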
+ } + + #[instrument(skip_all)] + async fn open_write(&self) -> Box<dyn BlobWriter> { + let file = match async_tempfile::TempFile::new_in(self.path.join("tmp")).await { + Ok(file) => Ok(file), + Err(e) => match e { + async_tempfile::Error::Io(io_error) => Err(io_error), + async_tempfile::Error::InvalidFile => Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "invalid or missing file specified", + )), + async_tempfile::Error::InvalidDirectory => Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "invalid or missing directory specified", + )), + }, + }; + + Box::new(SimpleFilesystemBlobWriter { + root: self.path.clone(), + file, + digester: blake3::Hasher::new(), + }) + } +} + +pin_project! { + struct SimpleFilesystemBlobWriter { + root: PathBuf, + file: std::io::Result<async_tempfile::TempFile>, + digester: blake3::Hasher + } +} + +impl tokio::io::AsyncWrite for SimpleFilesystemBlobWriter { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll<Result<usize, std::io::Error>> { + if let Err(e) = self.file.as_mut() { + return Poll::Ready(Err(std::mem::replace( + e, + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "this file is already closed", + ), + ))); + } + + let writer = self.file.as_mut().unwrap(); + match pin!(writer).poll_write(cx, buf) { + Poll::Ready(Ok(n)) => { + let this = self.project(); + this.digester.update(buf.take(n).into_inner()); + Poll::Ready(Ok(n)) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, + } + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), std::io::Error>> { + if let Err(e) = self.file.as_mut() { + return Poll::Ready(Err(std::mem::replace( + e, + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "this file is already closed", + ), + ))); + } + + let writer = self.file.as_mut().unwrap(); + pin!(writer).poll_flush(cx) + } + + fn poll_shutdown( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), std::io::Error>> { + if let Err(e) = self.file.as_mut() { + return Poll::Ready(Err(std::mem::replace( + e, + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "this file is already closed", + ), + ))); + } + + let writer = self.file.as_mut().unwrap(); + pin!(writer).poll_shutdown(cx) + } +} + +#[async_trait] +impl BlobWriter for SimpleFilesystemBlobWriter { + async fn close(&mut self) -> io::Result<B3Digest> { + if let Err(e) = self.file.as_mut() { + return Err(std::mem::replace( + e, + std::io::Error::new( + std::io::ErrorKind::NotConnected, + "this file is already closed", + ), + )); + } + + let writer = self.file.as_mut().unwrap(); + writer.sync_all().await?; + writer.flush().await?; + + let digest: B3Digest = self.digester.finalize().as_bytes().into(); + let dst_path = derive_path(&self.root, &digest); + tokio::fs::create_dir_all(dst_path.parent().unwrap()).await?; + tokio::fs::rename(writer.file_path(), dst_path).await?; + + Ok(digest) + } +} diff --git a/tvix/castore/src/blobservice/sled.rs b/tvix/castore/src/blobservice/sled.rs new file mode 100644 index 000000000000..3dd4bff7bc8e --- /dev/null +++ b/tvix/castore/src/blobservice/sled.rs @@ -0,0 +1,150 @@ +use super::{BlobReader, BlobService, BlobWriter}; +use crate::{B3Digest, Error}; +use std::{ + io::{self, Cursor, Write}, + path::Path, + task::Poll, +}; +use tonic::async_trait; +use tracing::instrument; + +#[derive(Clone)] +pub 
struct SledBlobService { + db: sled::Db, +} + +impl SledBlobService { + pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> { + let config = sled::Config::default() + .use_compression(false) // is a required parameter + .path(p); + let db = config.open()?; + + Ok(Self { db }) + } + + pub fn new_temporary() -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { db }) + } +} + +#[async_trait] +impl BlobService for SledBlobService { + #[instrument(skip(self), fields(blob.digest=%digest))] + async fn has(&self, digest: &B3Digest) -> io::Result<bool> { + match self.db.contains_key(digest.as_slice()) { + Ok(has) => Ok(has), + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), + } + } + + #[instrument(skip(self), fields(blob.digest=%digest))] + async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { + match self.db.get(digest.as_slice()) { + Ok(None) => Ok(None), + Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))), + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())), + } + } + + #[instrument(skip(self))] + async fn open_write(&self) -> Box<dyn BlobWriter> { + Box::new(SledBlobWriter::new(self.db.clone())) + } +} + +pub struct SledBlobWriter { + db: sled::Db, + + /// Contains the buffer Vec and hasher, or None if already closed + writers: Option<(Vec<u8>, blake3::Hasher)>, + + /// The digest that has been returned, if we successfully closed. + digest: Option<B3Digest>, +} + +impl SledBlobWriter { + pub fn new(db: sled::Db) -> Self { + Self { + db, + writers: Some((Vec::new(), blake3::Hasher::new())), + digest: None, + } + } +} + +impl tokio::io::AsyncWrite for SledBlobWriter { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + b: &[u8], + ) -> std::task::Poll<Result<usize, io::Error>> { + Poll::Ready(match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some((ref mut buf, ref mut hasher)) => { + let bytes_written = buf.write(b)?; + hasher.write(&b[..bytes_written]) + } + }) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + Poll::Ready(match &mut self.writers { + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + Some(_) => Ok(()), + }) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), io::Error>> { + // shutdown is "instantaneous", we only write to a Vec<u8> as buffer. + Poll::Ready(Ok(())) + } +} + +#[async_trait] +impl BlobWriter for SledBlobWriter { + async fn close(&mut self) -> io::Result<B3Digest> { + if self.writers.is_none() { + match &self.digest { + Some(digest) => Ok(digest.clone()), + None => Err(io::Error::new( + io::ErrorKind::NotConnected, + "already closed", + )), + } + } else { + let (buf, hasher) = self.writers.take().unwrap(); + + let digest: B3Digest = hasher.finalize().as_bytes().into(); + + // Only insert if the blob doesn't already exist. + if !self.db.contains_key(digest.as_slice()).map_err(|e| { + Error::StorageError(format!("Unable to check if we have blob {}: {}", digest, e)) + })? { + // put buf in there. This will move buf out. 
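// Note: as in MemoryBlobWriter::close above, the contains_key check makes
// writes idempotent — two writers racing on identical content compute the
// same digest, so a second write of the same content is skipped, and a
// benign race at worst re-inserts the same value.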
+ self.db + .insert(digest.as_slice(), buf) + .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?; + } + + self.digest = Some(digest.clone()); + + Ok(digest) + } + } +} diff --git a/tvix/castore/src/blobservice/tests.rs b/tvix/castore/src/blobservice/tests.rs new file mode 100644 index 000000000000..7480ca808225 --- /dev/null +++ b/tvix/castore/src/blobservice/tests.rs @@ -0,0 +1,243 @@ +use std::io; +use std::pin::pin; + +use test_case::test_case; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; + +use super::B3Digest; +use super::BlobService; +use super::MemoryBlobService; +use super::SledBlobService; +use crate::fixtures; + +// TODO: avoid having to define all different services we test against for all functions. +// maybe something like rstest can be used? + +fn gen_memory_blob_service() -> impl BlobService { + MemoryBlobService::default() +} +fn gen_sled_blob_service() -> impl BlobService { + SledBlobService::new_temporary().unwrap() +} + +// TODO: add GRPC blob service here. + +/// Using [BlobService::has] on a non-existing blob should return false +#[test_case(gen_memory_blob_service(); "memory")] +#[test_case(gen_sled_blob_service(); "sled")] +fn has_nonexistent_false(blob_service: impl BlobService) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + assert!(!blob_service + .has(&fixtures::BLOB_A_DIGEST) + .await + .expect("must not fail")); + }) +} + +/// Trying to read a non-existing blob should return a None instead of a reader. +#[test_case(gen_memory_blob_service(); "memory")] +#[test_case(gen_sled_blob_service(); "sled")] +fn not_found_read(blob_service: impl BlobService) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + assert!(blob_service + .open_read(&fixtures::BLOB_A_DIGEST) + .await + .expect("must not fail") + .is_none()) + }) +} + +/// Put a blob in the store, check has, get it back. +/// We test both with small and big blobs. 
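The parameterized cases below exercise exactly this shape; as a standalone sketch against MemoryBlobService (contents illustrative):

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tvix_castore::blobservice::{BlobService, MemoryBlobService};

async fn roundtrip() -> std::io::Result<()> {
    let svc = MemoryBlobService::default();

    let mut w = svc.open_write().await;
    w.write_all(b"some blob contents").await?;
    let digest = w.close().await?;

    assert!(svc.has(&digest).await?);

    let mut r = svc.open_read(&digest).await?.expect("blob exists");
    let mut buf = Vec::new();
    r.read_to_end(&mut buf).await?;
    assert_eq!(buf, b"some blob contents".to_vec());
    Ok(())
}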
+#[test_case(gen_memory_blob_service(), &fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST; "memory-small")] +#[test_case(gen_sled_blob_service(), &fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST; "sled-small")] +#[test_case(gen_memory_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "memory-big")] +#[test_case(gen_sled_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "sled-big")] +fn put_has_get(blob_service: impl BlobService, blob_contents: &[u8], blob_digest: &B3Digest) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut w = blob_service.open_write().await; + + let l = tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut w) + .await + .expect("copy must succeed"); + assert_eq!( + blob_contents.len(), + l as usize, + "written bytes must match blob length" + ); + + let digest = w.close().await.expect("close must succeed"); + + assert_eq!(*blob_digest, digest, "returned digest must be correct"); + + assert!( + blob_service.has(blob_digest).await.expect("must not fail"), + "blob service should now have the blob" + ); + + let mut r = blob_service + .open_read(blob_digest) + .await + .expect("open_read must succeed") + .expect("must be some"); + + let mut buf: Vec<u8> = Vec::new(); + let mut pinned_reader = pin!(r); + let l = tokio::io::copy(&mut pinned_reader, &mut buf) + .await + .expect("copy must succeed"); + // let l = io::copy(&mut r, &mut buf).expect("copy must succeed"); + + assert_eq!( + blob_contents.len(), + l as usize, + "read bytes must match blob length" + ); + + assert_eq!(blob_contents, buf, "read blob contents must match"); + }) +} + +/// Put a blob in the store, and seek inside it a bit. +#[test_case(gen_memory_blob_service(); "memory")] +#[test_case(gen_sled_blob_service(); "sled")] +fn put_seek(blob_service: impl BlobService) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut w = blob_service.open_write().await; + + tokio::io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w) + .await + .expect("copy must succeed"); + w.close().await.expect("close must succeed"); + + // open a blob for reading + let mut r = blob_service + .open_read(&fixtures::BLOB_B_DIGEST) + .await + .expect("open_read must succeed") + .expect("must be some"); + + let mut pos: u64 = 0; + + // read the first 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected first 10 bytes to match" + ); + + pos += buf.len() as u64; + } + // seek by 0 bytes, using SeekFrom::Start. + let p = r + .seek(io::SeekFrom::Start(pos)) + .await + .expect("must not fail"); + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + pos += buf.len() as u64; + } + + // seek by 5 bytes, using SeekFrom::Start. + let p = r + .seek(io::SeekFrom::Start(pos + 5)) + .await + .expect("must not fail"); + pos += 5; + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. 
+ { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + pos += buf.len() as u64; + } + + // seek by 12345 bytes, using SeekFrom:: + let p = r + .seek(io::SeekFrom::Current(12345)) + .await + .expect("must not fail"); + pos += 12345; + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + #[allow(unused_assignments)] + { + pos += buf.len() as u64; + } + } + + // seeking to the end is okay… + let p = r + .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64)) + .await + .expect("must not fail"); + pos = fixtures::BLOB_B.len() as u64; + assert_eq!(pos, p); + + { + // but it returns no more data. + let mut buf: Vec<u8> = Vec::new(); + r.read_to_end(&mut buf).await.expect("must not fail"); + assert!(buf.is_empty(), "expected no more data to be read"); + } + + // seeking past the end… + // should either be ok, but then return 0 bytes. + // this matches the behaviour or a Cursor<Vec<u8>>. + if let Ok(_pos) = r + .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1)) + .await + { + let mut buf: Vec<u8> = Vec::new(); + r.read_to_end(&mut buf).await.expect("must not fail"); + assert!(buf.is_empty(), "expected no more data to be read"); + } + // or not be okay. + + // TODO: this is only broken for the gRPC version + // We expect seeking backwards or relative to the end to fail. + // r.seek(io::SeekFrom::Current(-1)) + // .expect_err("SeekFrom::Current(-1) expected to fail"); + + // r.seek(io::SeekFrom::Start(pos - 1)) + // .expect_err("SeekFrom::Start(pos-1) expected to fail"); + + // r.seek(io::SeekFrom::End(0)) + // .expect_err("SeekFrom::End(_) expected to fail"); + }) +} diff --git a/tvix/castore/src/digests.rs b/tvix/castore/src/digests.rs new file mode 100644 index 000000000000..137ed2669a8f --- /dev/null +++ b/tvix/castore/src/digests.rs @@ -0,0 +1,73 @@ +use bytes::Bytes; +use data_encoding::BASE64; +use thiserror::Error; + +#[derive(PartialEq, Eq, Hash, Debug)] +pub struct B3Digest(Bytes); + +// TODO: allow converting these errors to crate::Error +#[derive(Error, Debug)] +pub enum Error { + #[error("invalid digest length: {0}")] + InvalidDigestLen(usize), +} + +pub const B3_LEN: usize = 32; + +impl B3Digest { + pub fn as_slice(&self) -> &[u8] { + &self.0[..] + } +} + +impl From<B3Digest> for bytes::Bytes { + fn from(val: B3Digest) -> Self { + val.0 + } +} + +impl TryFrom<Vec<u8>> for B3Digest { + type Error = Error; + + // constructs a [B3Digest] from a [Vec<u8>]. + // Returns an error if the digest has the wrong length. + fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> { + if value.len() != B3_LEN { + Err(Error::InvalidDigestLen(value.len())) + } else { + Ok(Self(value.into())) + } + } +} + +impl TryFrom<bytes::Bytes> for B3Digest { + type Error = Error; + + // constructs a [B3Digest] from a [bytes::Bytes]. + // Returns an error if the digest has the wrong length. 
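Taken together, the conversions in this file let a digest round-trip through Bytes; a sketch (blake3 is already a dependency of this crate):

use tvix_castore::B3Digest;

fn digest_roundtrip() {
    // From<&[u8; 32]>: infallible, used right after hashing something.
    let digest: B3Digest = blake3::hash(b"hello").as_bytes().into();

    // Into<Bytes>, then TryFrom<Bytes>: fallible on the way back in, since
    // arbitrary Bytes may have the wrong length.
    let bytes: bytes::Bytes = digest.clone().into();
    let back = B3Digest::try_from(bytes).expect("32 bytes");

    assert_eq!(digest, back);
    println!("{}", digest); // "b3:<base64 of the 32 raw bytes>"
}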
+ fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> { + if value.len() != B3_LEN { + Err(Error::InvalidDigestLen(value.len())) + } else { + Ok(Self(value)) + } + } +} + +impl From<&[u8; B3_LEN]> for B3Digest { + fn from(value: &[u8; B3_LEN]) -> Self { + Self(value.to_vec().into()) + } +} + +impl Clone for B3Digest { + fn clone(&self) -> Self { + Self(self.0.to_owned()) + } +} + +impl std::fmt::Display for B3Digest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "b3:{}", BASE64.encode(&self.0)) + } +} diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs new file mode 100644 index 000000000000..0a5a0464c14a --- /dev/null +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -0,0 +1,120 @@ +use url::Url; + +use crate::{proto::directory_service_client::DirectoryServiceClient, Error}; + +use super::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService, SledDirectoryService}; + +/// Constructs a new instance of a [DirectoryService] from an URI. +/// +/// The following URIs are supported: +/// - `memory:` +/// Uses a in-memory implementation. +/// - `sled:` +/// Uses a in-memory sled implementation. +/// - `sled:///absolute/path/to/somewhere` +/// Uses sled, using a path on the disk for persistency. Can be only opened +/// from one process at the same time. +/// - `grpc+unix:///absolute/path/to/somewhere` +/// Connects to a local tvix-store gRPC service via Unix socket. +/// - `grpc+http://host:port`, `grpc+https://host:port` +/// Connects to a (remote) tvix-store gRPC service. +pub async fn from_addr(uri: &str) -> Result<Box<dyn DirectoryService>, crate::Error> { + let url = Url::parse(uri) + .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; + + Ok(if url.scheme() == "memory" { + // memory doesn't support host or path in the URL. + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string())); + } + Box::<MemoryDirectoryService>::default() + } else if url.scheme() == "sled" { + // sled doesn't support host, and a path can be provided (otherwise + // it'll live in memory only). + if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string())); + } + + if url.path() == "/" { + return Err(Error::StorageError( + "cowardly refusing to open / with sled".to_string(), + )); + } + + // TODO: expose compression and other parameters as URL parameters? + + if url.path().is_empty() { + return Ok(Box::new( + SledDirectoryService::new_temporary() + .map_err(|e| Error::StorageError(e.to_string()))?, + )); + } + return Ok(Box::new( + SledDirectoryService::new(url.path()) + .map_err(|e| Error::StorageError(e.to_string()))?, + )); + } else if url.scheme().starts_with("grpc+") { + // schemes starting with grpc+ go to the GRPCPathInfoService. + // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + let client = DirectoryServiceClient::new(crate::tonic::channel_from_url(&url).await?); + Box::new(GRPCDirectoryService::from_client(client)) + } else { + Err(crate::Error::StorageError(format!( + "unknown scheme: {}", + url.scheme() + )))? 
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::from_addr;
+    use lazy_static::lazy_static;
+    use tempfile::TempDir;
+    use test_case::test_case;
+
+    lazy_static! {
+        static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
+        static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap();
+    }
+
+    /// This uses an unsupported scheme.
+    #[test_case("http://foo.example/test", false; "unsupported scheme")]
+    /// This configures sled in temporary mode.
+    #[test_case("sled://", true; "sled valid temporary")]
+    /// This configures sled with /, which should fail.
+    #[test_case("sled:///", false; "sled invalid root")]
+    /// This configures sled with a host, not path, which should fail.
+    #[test_case("sled://foo.example", false; "sled invalid host")]
+    /// This configures sled with a valid path, which should succeed.
+    #[test_case(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true; "sled valid path")]
+    /// This configures sled with a host, and a valid path, which should fail.
+    #[test_case(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false; "sled invalid host with valid path")]
+    /// This correctly sets the scheme, and doesn't set a path.
+    #[test_case("memory://", true; "memory valid")]
+    /// This sets a memory url host to `foo`, which is invalid.
+    #[test_case("memory://foo", false; "memory invalid host")]
+    /// This sets a memory url path to "/", which is invalid.
+    #[test_case("memory:///", false; "memory invalid root path")]
+    /// This sets a memory url path to "/foo", which is invalid.
+    #[test_case("memory:///foo", false; "memory invalid root path foo")]
+    /// Correct scheme to connect to a unix socket.
+    #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")]
+    /// Correct scheme for unix socket, but setting a host too, which is invalid.
+    #[test_case("grpc+unix://host.example/path/to/somewhere", false; "grpc invalid unix socket and host")]
+    /// Correct scheme to connect to localhost, with port 12345.
+    #[test_case("grpc+http://[::1]:12345", true; "grpc valid IPv6 localhost port 12345")]
+    /// Correct scheme to connect to localhost over http, without specifying a port.
+    #[test_case("grpc+http://localhost", true; "grpc valid http host without port")]
+    /// Correct scheme to connect to localhost over https, without specifying a port.
+    #[test_case("grpc+https://localhost", true; "grpc valid https host without port")]
+    /// Correct scheme to connect to localhost over http, but with an additional path, which is invalid.
+    #[test_case("grpc+http://localhost/some-path", false; "grpc invalid host and path")]
+    #[tokio::test]
+    async fn test_from_addr_tokio(uri_str: &str, is_ok: bool) {
+        assert_eq!(from_addr(uri_str).await.is_ok(), is_ok)
+    }
+}
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
new file mode 100644
index 000000000000..ad06cb17b668
--- /dev/null
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -0,0 +1,505 @@
+use std::collections::HashSet;
+
+use super::{DirectoryPutter, DirectoryService};
+use crate::proto::{self, get_directory_request::ByWhat};
+use crate::{B3Digest, Error};
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use tokio::spawn;
+use tokio::sync::mpsc::UnboundedSender;
+use tokio::task::JoinHandle;
+use tokio_stream::wrappers::UnboundedReceiverStream;
+use tonic::async_trait;
+use tonic::Code;
+use tonic::{transport::Channel, Status};
+use tracing::{instrument, warn};
+
+/// Connects to a (remote) tvix-store DirectoryService over gRPC.
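+///
+/// A minimal connection sketch (the address is made up; `channel_from_url` is
+/// the same helper the tests further down use):
+///
+/// ```ignore
+/// let url = url::Url::parse("grpc+http://[::1]:8000").unwrap();
+/// let client = proto::directory_service_client::DirectoryServiceClient::new(
+///     crate::tonic::channel_from_url(&url).await.unwrap(),
+/// );
+/// let directory_service = GRPCDirectoryService::from_client(client);
+/// ```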
+#[derive(Clone)] +pub struct GRPCDirectoryService { + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, +} + +impl GRPCDirectoryService { + /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + ) -> Self { + Self { grpc_client } + } +} + +#[async_trait] +impl DirectoryService for GRPCDirectoryService { + async fn get( + &self, + digest: &B3Digest, + ) -> Result<Option<crate::proto::Directory>, crate::Error> { + // Get a new handle to the gRPC client, and copy the digest. + let mut grpc_client = self.grpc_client.clone(); + let digest_cpy = digest.clone(); + let message = async move { + let mut s = grpc_client + .get(proto::GetDirectoryRequest { + recursive: false, + by_what: Some(ByWhat::Digest(digest_cpy.into())), + }) + .await? + .into_inner(); + + // Retrieve the first message only, then close the stream (we set recursive to false) + s.message().await + }; + + let digest = digest.clone(); + match message.await { + Ok(Some(directory)) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != digest { + Err(crate::Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))) + } else if let Err(e) = directory.validate() { + // Validate the Directory itself is valid. + warn!("directory failed validation: {}", e.to_string()); + Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + digest, e, + ))) + } else { + Ok(Some(directory)) + } + } + Ok(None) => Ok(None), + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + let resp = self + .grpc_client + .clone() + .put(tokio_stream::once(directory)) + .await; + + match resp { + Ok(put_directory_resp) => Ok(put_directory_resp + .into_inner() + .root_digest + .try_into() + .map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> BoxStream<Result<proto::Directory, Error>> { + let mut grpc_client = self.grpc_client.clone(); + let root_directory_digest = root_directory_digest.clone(); + + let stream = try_stream! { + let mut stream = grpc_client + .get(proto::GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(root_directory_digest.clone().into())), + }) + .await + .map_err(|e| crate::Error::StorageError(e.to_string()))? + .into_inner(); + + // The Directory digests we received so far + let mut received_directory_digests: HashSet<B3Digest> = HashSet::new(); + // The Directory digests we're still expecting to get sent. + let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]); + + loop { + match stream.message().await { + Ok(Some(directory)) => { + // validate the directory itself. 
+ if let Err(e) = directory.validate() { + Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + directory.digest(), + e, + )))?; + } + // validate we actually expected that directory, and move it from expected to received. + let directory_digest = directory.digest(); + let was_expected = expected_directory_digests.remove(&directory_digest); + if !was_expected { + // FUTUREWORK: dumb clients might send the same stuff twice. + // as a fallback, we might want to tolerate receiving + // it if it's in received_directory_digests (as that + // means it once was in expected_directory_digests) + Err(crate::Error::StorageError(format!( + "received unexpected directory {}", + directory_digest + )))?; + } + received_directory_digests.insert(directory_digest); + + // register all children in expected_directory_digests. + for child_directory in &directory.directories { + // We ran validate() above, so we know these digests must be correct. + let child_directory_digest = + child_directory.digest.clone().try_into().unwrap(); + + expected_directory_digests + .insert(child_directory_digest); + } + + yield directory; + }, + Ok(None) => { + // If we were still expecting something, that's an error. + if !expected_directory_digests.is_empty() { + Err(crate::Error::StorageError(format!( + "still expected {} directories, but got premature end of stream", + expected_directory_digests.len(), + )))? + } else { + return + } + }, + Err(e) => { + Err(crate::Error::StorageError(e.to_string()))?; + }, + } + } + }; + + Box::pin(stream) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> + where + Self: Clone, + { + let mut grpc_client = self.grpc_client.clone(); + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); + + Ok(s) + }); + + Box::new(GRPCPutter::new(tx, task)) + } +} + +/// Allows uploading multiple Directory messages in the same gRPC stream. +pub struct GRPCPutter { + /// Data about the current request - a handle to the task, and the tx part + /// of the channel. + /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request. + /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed. + #[allow(clippy::type_complexity)] // lol + rq: Option<( + JoinHandle<Result<proto::PutDirectoryResponse, Status>>, + UnboundedSender<proto::Directory>, + )>, +} + +impl GRPCPutter { + pub fn new( + directory_sender: UnboundedSender<proto::Directory>, + task: JoinHandle<Result<proto::PutDirectoryResponse, Status>>, + ) -> Self { + Self { + rq: Some((task, directory_sender)), + } + } + + // allows checking if the tx part of the channel is closed. + // only used in the test case. + #[cfg(test)] + fn is_closed(&self) -> bool { + match self.rq { + None => true, + Some((_, ref directory_sender)) => directory_sender.is_closed(), + } + } +} + +#[async_trait] +impl DirectoryPutter for GRPCPutter { + async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + match self.rq { + // If we're not already closed, send the directory to directory_sender. 
+ Some((_, ref directory_sender)) => { + if directory_sender.send(directory).is_err() { + // If the channel has been prematurely closed, invoke close (so we can peek at the error code) + // That error code is much more helpful, because it + // contains the error message from the server. + self.close().await?; + } + Ok(()) + } + // If self.close() was already called, we can't put again. + None => Err(Error::StorageError( + "DirectoryPutter already closed".to_string(), + )), + } + } + + /// Closes the stream for sending, and returns the value + async fn close(&mut self) -> Result<B3Digest, crate::Error> { + // get self.rq, and replace it with None. + // This ensures we can only close it once. + match std::mem::take(&mut self.rq) { + None => Err(Error::StorageError("already closed".to_string())), + Some((task, directory_sender)) => { + // close directory_sender, so blocking on task will finish. + drop(directory_sender); + + let root_digest = task + .await? + .map_err(|e| Error::StorageError(e.to_string()))? + .root_digest; + + root_digest.try_into().map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + }) + } + } + } +} + +#[cfg(test)] +mod tests { + use core::time; + use futures::StreamExt; + use std::{any::Any, time::Duration}; + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio_retry::{strategy::ExponentialBackoff, Retry}; + use tokio_stream::wrappers::UnixListenerStream; + + use crate::{ + directoryservice::{ + grpc::GRPCPutter, DirectoryPutter, DirectoryService, GRPCDirectoryService, + MemoryDirectoryService, + }, + fixtures::{self, DIRECTORY_A, DIRECTORY_B}, + proto::{directory_service_client::DirectoryServiceClient, GRPCDirectoryServiceWrapper}, + utils::gen_directorysvc_grpc_client, + }; + + #[tokio::test] + async fn test() { + // create the GrpcDirectoryService + let directory_service = + super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await); + + // try to get DIRECTORY_A should return Ok(None) + assert_eq!( + None, + directory_service + .get(&DIRECTORY_A.digest()) + .await + .expect("must not fail") + ); + + // Now upload it + assert_eq!( + DIRECTORY_A.digest(), + directory_service + .put(DIRECTORY_A.clone()) + .await + .expect("must succeed") + ); + + // And retrieve it, compare for equality. + assert_eq!( + DIRECTORY_A.clone(), + directory_service + .get(&DIRECTORY_A.digest()) + .await + .expect("must succeed") + .expect("must be some") + ); + + // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. + directory_service + .put(DIRECTORY_B.clone()) + .await + .expect_err("must fail"); + + // Putting DIRECTORY_B in a put_multiple will succeed, but the close + // will always fail. + { + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + handle.close().await.expect_err("must fail"); + } + + // Uploading A and then B should succeed, and closing should return the digest of B. + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_A.clone()).await.expect("must succeed"); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + let digest = handle.close().await.expect("must succeed"); + assert_eq!(DIRECTORY_B.digest(), digest); + + // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. 
+ let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); + assert_eq!( + DIRECTORY_B.clone(), + directories_it + .next() + .await + .expect("must be some") + .expect("must succeed") + ); + assert_eq!( + DIRECTORY_A.clone(), + directories_it + .next() + .await + .expect("must be some") + .expect("must succeed") + ); + + // Uploading B and then A should fail, because B refers to A, which + // hasn't been uploaded yet. + // However, the client can burst, so we might not have received the + // error back from the server. + { + let mut handle = directory_service.put_multiple_start(); + // sending out B will always be fine + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + + // whether we will be able to put A as well depends on whether we + // already received the error about B. + if handle.put(DIRECTORY_A.clone()).await.is_ok() { + // If we didn't, and this was Ok(_), … + // a subsequent close MUST fail (because it waits for the + // server) + handle.close().await.expect_err("must fail"); + } + } + + // Now we do the same test as before, send B, then A, but wait + // a long long time so we already received the error from the server + // (causing the internal stream to be closed). + // Uploading anything else subsequently should then fail. + { + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + + // get a GRPCPutter, so we can peek at [is_closed]. + let handle_any = &mut handle as &mut dyn Any; + + // `unchecked_downcast_mut` is unstable for now, + // https://github.com/rust-lang/rust/issues/90850 + // We do the same thing here. + // The reason for why we cannot use the checked downcast lies + // in the fact that: + // - GRPCPutter has type ID A + // - Box<GRPCPutter> has type ID B + // - "Box<dyn GRPCPutter>" (invalid type) has type ID C + // B seems different from C in this context. + // We cannot unpack and perform upcast coercion of the traits as it's an unstable + // feature. + // We cannot add `as_any` in `DirectoryPutter` as that would defeat the whole purpose + // of not making leak `is_closed` in the original trait. + let handle = unsafe { &mut *(handle_any as *mut dyn Any as *mut Box<GRPCPutter>) }; + let mut is_closed = false; + for _try in 1..1000 { + if handle.is_closed() { + is_closed = true; + break; + } + tokio::time::sleep(time::Duration::from_millis(10)).await; + } + + assert!( + is_closed, + "expected channel to eventually close, but never happened" + ); + + handle + .put(DIRECTORY_A.clone()) + .await + .expect_err("must fail"); + } + } + + /// This ensures connecting via gRPC works as expected. 
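+    /// It spins up a DirectoryService on a Unix socket, waits for the socket
+    /// file to appear, and then runs a simple get() against it.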
+ #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("daemon"); + + let path_clone = socket_path.clone(); + + // Spin up a server + tokio::spawn(async { + let uds = UnixListener::bind(path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new server + let mut server = tonic::transport::Server::builder(); + let router = server.add_service( + crate::proto::directory_service_server::DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::new( + Box::<MemoryDirectoryService>::default() as Box<dyn DirectoryService> + ), + ), + ); + router.serve_with_incoming(uds_stream).await + }); + + // wait for the socket to be created + Retry::spawn( + ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // prepare a client + let grpc_client = { + let url = url::Url::parse(&format!( + "grpc+unix://{}?wait-connect=1", + socket_path.display() + )) + .expect("must parse"); + let client = DirectoryServiceClient::new( + crate::tonic::channel_from_url(&url) + .await + .expect("must succeed"), + ); + GRPCDirectoryService::from_client(client) + }; + + assert!(grpc_client + .get(&fixtures::DIRECTORY_A.digest()) + .await + .expect("must not fail") + .is_none()) + } +} diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs new file mode 100644 index 000000000000..528ffe2f2c03 --- /dev/null +++ b/tvix/castore/src/directoryservice/memory.rs @@ -0,0 +1,86 @@ +use crate::{proto, B3Digest, Error}; +use futures::stream::BoxStream; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use tonic::async_trait; +use tracing::{instrument, warn}; + +use super::utils::{traverse_directory, SimplePutter}; +use super::{DirectoryPutter, DirectoryService}; + +#[derive(Clone, Default)] +pub struct MemoryDirectoryService { + db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>, +} + +#[async_trait] +impl DirectoryService for MemoryDirectoryService { + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + let db = self.db.read()?; + + match db.get(digest) { + // The directory was not found, return + None => Ok(None), + + // The directory was found, try to parse the data as Directory message + Some(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != *digest { + return Err(Error::StorageError(format!( + "requested directory with digest {}, but got {}", + digest, actual_digest + ))); + } + + // Validate the Directory itself is valid. + if let Err(e) = directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Err(Error::StorageError(format!( + "directory {} failed validation: {}", + actual_digest, e, + ))); + } + + Ok(Some(directory.clone())) + } + } + } + + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + let digest = directory.digest(); + + // validate the directory itself. 
+ if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + digest, e, + ))); + } + + // store it + let mut db = self.db.write()?; + db.insert(digest.clone(), directory); + + Ok(digest) + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> BoxStream<Result<proto::Directory, Error>> { + traverse_directory(self.clone(), root_directory_digest) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> + where + Self: Clone, + { + Box::new(SimplePutter::new(self.clone())) + } +} diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs new file mode 100644 index 000000000000..db3d5767eadd --- /dev/null +++ b/tvix/castore/src/directoryservice/mod.rs @@ -0,0 +1,76 @@ +use crate::{proto, B3Digest, Error}; +use futures::stream::BoxStream; +use tonic::async_trait; + +mod from_addr; +mod grpc; +mod memory; +mod sled; +mod traverse; +mod utils; + +pub use self::from_addr::from_addr; +pub use self::grpc::GRPCDirectoryService; +pub use self::memory::MemoryDirectoryService; +pub use self::sled::SledDirectoryService; +pub use self::traverse::descend_to; + +/// The base trait all Directory services need to implement. +/// This is a simple get and put of [crate::proto::Directory], returning their +/// digest. +#[async_trait] +pub trait DirectoryService: Send + Sync { + /// Looks up a single Directory message by its digest. + /// The returned Directory message *must* be valid. + /// In case the directory is not found, Ok(None) is returned. + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; + /// Uploads a single Directory message, and returns the calculated + /// digest, or an error. An error *must* also be returned if the message is + /// not valid. + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; + + /// Looks up a closure of [proto::Directory]. + /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`, + /// and we'd be able to add a default implementation for it here, but + /// we can't have that yet. + /// + /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily, + /// and the box allows different underlying stream implementations to be returned since + /// Rust doesn't support this as a generic in traits yet. This is the same thing that + /// [async_trait] generates, but for streams instead of futures. + /// + /// The individual Directory messages *must* be valid. + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> BoxStream<Result<proto::Directory, Error>>; + + /// Allows persisting a closure of [proto::Directory], which is a graph of + /// connected Directory messages. + fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>; +} + +/// Provides a handle to put a closure of connected [proto::Directory] elements. +/// +/// The consumer can periodically call [DirectoryPutter::put], starting from the +/// leaves. Once the root is reached, [DirectoryPutter::close] can be called to +/// retrieve the root digest (or an error). +/// +/// DirectoryPutters might be created without a single [DirectoryPutter::put], +/// and then dropped without calling [DirectoryPutter::close], +/// for example when ingesting a path that ends up not pointing to a directory, +/// but a single file or symlink. 
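+///
+/// A usage sketch (names are illustrative; `leaf` and `root` stand for two
+/// connected [proto::Directory] messages, with `root` referring to `leaf`):
+///
+/// ```ignore
+/// let mut putter = directory_service.put_multiple_start();
+/// putter.put(leaf).await?; // leaves go in first…
+/// putter.put(root).await?; // …the root is sent last.
+/// let root_digest = putter.close().await?;
+/// ```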
+#[async_trait]
+pub trait DirectoryPutter: Send {
+    /// Put an individual [proto::Directory] into the store.
+    /// Error semantics and behaviour are up to the specific implementation of
+    /// this trait.
+    /// Due to bursting, the returned error might refer to an object previously
+    /// sent via `put`.
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
+
+    /// Close the stream, and wait for any errors.
+    /// If there's been any invalid Directory message uploaded, an error *must*
+    /// be returned.
+    async fn close(&mut self) -> Result<B3Digest, Error>;
+}
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs
new file mode 100644
index 000000000000..9acd3854184b
--- /dev/null
+++ b/tvix/castore/src/directoryservice/sled.rs
@@ -0,0 +1,112 @@
+use crate::directoryservice::DirectoryPutter;
+use crate::proto::Directory;
+use crate::{proto, B3Digest, Error};
+use futures::stream::BoxStream;
+use prost::Message;
+use std::path::Path;
+use tonic::async_trait;
+use tracing::{instrument, warn};
+
+use super::utils::{traverse_directory, SimplePutter};
+use super::DirectoryService;
+
+#[derive(Clone)]
+pub struct SledDirectoryService {
+    db: sled::Db,
+}
+
+impl SledDirectoryService {
+    pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> {
+        let config = sled::Config::default()
+            .use_compression(false) // is a required parameter
+            .path(p);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+
+    pub fn new_temporary() -> Result<Self, sled::Error> {
+        let config = sled::Config::default().temporary(true);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+}
+
+#[async_trait]
+impl DirectoryService for SledDirectoryService {
+    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
+    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+        match self.db.get(digest.as_slice()) {
+            // The directory was not found, return
+            Ok(None) => Ok(None),
+
+            // The directory was found, try to parse the data as Directory message
+            Ok(Some(data)) => match Directory::decode(&*data) {
+                Ok(directory) => {
+                    // Validate the retrieved Directory indeed has the
+                    // digest we expect it to have, to detect corruptions.
+                    let actual_digest = directory.digest();
+                    if actual_digest != *digest {
+                        return Err(Error::StorageError(format!(
+                            "requested directory with digest {}, but got {}",
+                            digest, actual_digest
+                        )));
+                    }
+
+                    // Validate the Directory itself is valid.
+                    if let Err(e) = directory.validate() {
+                        warn!("directory failed validation: {}", e.to_string());
+                        return Err(Error::StorageError(format!(
+                            "directory {} failed validation: {}",
+                            actual_digest, e,
+                        )));
+                    }
+
+                    Ok(Some(directory))
+                }
+                Err(e) => {
+                    warn!("unable to parse directory {}: {}", digest, e);
+                    Err(Error::StorageError(e.to_string()))
+                }
+            },
+            // some storage error?
+            Err(e) => Err(Error::StorageError(e.to_string())),
+        }
+    }
+
+    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
+    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+        let digest = directory.digest();
+
+        // validate the directory itself.
+ if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + digest, e, + ))); + } + // store it + let result = self.db.insert(digest.as_slice(), directory.encode_to_vec()); + if let Err(e) = result { + return Err(Error::StorageError(e.to_string())); + } + Ok(digest) + } + + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> BoxStream<Result<proto::Directory, Error>> { + traverse_directory(self.clone(), root_directory_digest) + } + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> + where + Self: Clone, + { + Box::new(SimplePutter::new(self.clone())) + } +} diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs new file mode 100644 index 000000000000..528cef757432 --- /dev/null +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -0,0 +1,231 @@ +use super::DirectoryService; +use crate::{proto::NamedNode, B3Digest, Error}; +use std::os::unix::ffi::OsStrExt; +use tracing::{instrument, warn}; + +/// This descends from a (root) node to the given (sub)path, returning the Node +/// at that path, or none, if there's nothing at that path. +#[instrument(skip(directory_service))] +pub async fn descend_to<DS>( + directory_service: DS, + root_node: crate::proto::node::Node, + path: &std::path::Path, +) -> Result<Option<crate::proto::node::Node>, Error> +where + DS: AsRef<dyn DirectoryService>, +{ + // strip a possible `/` prefix from the path. + let path = { + if path.starts_with("/") { + path.strip_prefix("/").unwrap() + } else { + path + } + }; + + let mut cur_node = root_node; + let mut it = path.components(); + + loop { + match it.next() { + None => { + // the (remaining) path is empty, return the node we're current at. + return Ok(Some(cur_node)); + } + Some(first_component) => { + match cur_node { + crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => { + // There's still some path left, but the current node is no directory. + // This means the path doesn't exist, as we can't reach it. + return Ok(None); + } + crate::proto::node::Node::Directory(directory_node) => { + let digest: B3Digest = directory_node.digest.try_into().map_err(|_e| { + Error::StorageError("invalid digest length".to_string()) + })?; + + // fetch the linked node from the directory_service + match directory_service.as_ref().get(&digest).await? { + // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! + None => { + warn!("directory {} does not exist", digest); + + return Err(Error::StorageError(format!( + "directory {} does not exist", + digest + ))); + } + Some(directory) => { + // look for first_component in the [Directory]. + // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we + // could stop as soon as e.name is larger than the search string. + let child_node = directory.nodes().find(|n| { + n.get_name() == first_component.as_os_str().as_bytes() + }); + + match child_node { + // child node not found means there's no such element inside the directory. + None => { + return Ok(None); + } + // child node found, return to top-of loop to find the next + // node in the path. 
+                                Some(child_node) => {
+                                    cur_node = child_node;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::path::PathBuf;
+
+    use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP};
+    use crate::utils::gen_directory_service;
+
+    use super::descend_to;
+
+    #[tokio::test]
+    async fn test_descend_to() {
+        let directory_service = gen_directory_service();
+
+        let mut handle = directory_service.put_multiple_start();
+        handle
+            .put(DIRECTORY_WITH_KEEP.clone())
+            .await
+            .expect("must succeed");
+        handle
+            .put(DIRECTORY_COMPLICATED.clone())
+            .await
+            .expect("must succeed");
+
+        // construct the node for DIRECTORY_COMPLICATED
+        let node_directory_complicated =
+            crate::proto::node::Node::Directory(crate::proto::DirectoryNode {
+                name: "doesntmatter".into(),
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            });
+
+        // construct the node for DIRECTORY_WITH_KEEP
+        let node_directory_with_keep = crate::proto::node::Node::Directory(
+            DIRECTORY_COMPLICATED.directories.first().unwrap().clone(),
+        );
+
+        // construct the node for the .keep file
+        let node_file_keep =
+            crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone());
+
+        // traversal to an empty subpath should return the root node.
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from(""),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(Some(node_directory_complicated.clone()), resp);
+        }
+
+        // traversal to `keep` should return the node for DIRECTORY_WITH_KEEP
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("keep"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(Some(node_directory_with_keep), resp);
+        }
+
+        // traversal to `keep/.keep` should return the node for the .keep file
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("keep/.keep"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(Some(node_file_keep.clone()), resp);
+        }
+
+        // traversal to `/keep/.keep` (with a leading slash) should also return
+        // the node for the .keep file
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("/keep/.keep"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(Some(node_file_keep), resp);
+        }
+
+        // traversal to `void` should return None (doesn't exist)
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("void"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(None, resp);
+        }
+
+        // traversal to `//v/oid` should return None (doesn't exist)
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("//v/oid"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(None, resp);
+        }
+
+        // traversal to `keep/.keep/foo` should return None (the path can't be
+        // reached, as keep/.keep already is a file)
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("keep/.keep/foo"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(None, resp);
+        }
+
+        // traversal to a subpath of '/' should return the root node.
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("/"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(Some(node_directory_complicated), resp);
+        }
+    }
+}
diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs
new file mode 100644
index 000000000000..341705a8db9f
--- /dev/null
+++ b/tvix/castore/src/directoryservice/utils.rs
@@ -0,0 +1,134 @@
+use super::DirectoryPutter;
+use super::DirectoryService;
+use crate::proto;
+use crate::B3Digest;
+use crate::Error;
+use async_stream::stream;
+use futures::stream::BoxStream;
+use std::collections::{HashSet, VecDeque};
+use tonic::async_trait;
+use tracing::warn;
+
+/// Traverses a [proto::Directory] from the root to the children.
+///
+/// This is mostly BFS, but directories are only returned once.
+pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
+    directory_service: DS,
+    root_directory_digest: &B3Digest,
+) -> BoxStream<'a, Result<proto::Directory, Error>> {
+    // The list of all directories that still need to be traversed. The next
+    // element is picked from the front, new elements are enqueued at the
+    // back.
+    let mut worklist_directory_digests: VecDeque<B3Digest> =
+        VecDeque::from([root_directory_digest.clone()]);
+    // The list of directory digests already sent to the consumer.
+    // We omit sending the same directories multiple times.
+    let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new();
+
+    let stream = stream! {
+        while let Some(current_directory_digest) = worklist_directory_digests.pop_front() {
+            match directory_service.get(&current_directory_digest).await {
+                // if it's not there, we have an inconsistent store!
+                Ok(None) => {
+                    warn!("directory {} does not exist", current_directory_digest);
+                    yield Err(Error::StorageError(format!(
+                        "directory {} does not exist",
+                        current_directory_digest
+                    )));
+                }
+                Err(e) => {
+                    warn!("failed to look up directory");
+                    yield Err(Error::StorageError(format!(
+                        "unable to look up directory {}: {}",
+                        current_directory_digest, e
+                    )));
+                }
+
+                // if we got it
+                Ok(Some(current_directory)) => {
+                    // validate, we don't want to send invalid directories.
+                    if let Err(e) = current_directory.validate() {
+                        warn!("directory failed validation: {}", e.to_string());
+                        yield Err(Error::StorageError(format!(
+                            "invalid directory: {}",
+                            current_directory_digest
+                        )));
+                    }
+
+                    // We're about to send this directory, so let's avoid sending it again if a
+                    // descendant has it.
+                    sent_directory_digests.insert(current_directory_digest);
+
+                    // enqueue all child directory digests to the work queue, as
+                    // long as they're not part of the worklist or already sent.
+                    // This panics if the digest looks invalid, it's supposed to be checked first.
+                    for child_directory_node in &current_directory.directories {
+                        // TODO: propagate error
+                        let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap();
+
+                        if worklist_directory_digests.contains(&child_digest)
+                            || sent_directory_digests.contains(&child_digest)
+                        {
+                            continue;
+                        }
+                        worklist_directory_digests.push_back(child_digest);
+                    }
+
+                    yield Ok(current_directory);
+                }
+            };
+        }
+    };
+
+    Box::pin(stream)
+}
+
+/// This is a simple implementation of a Directory uploader.
+/// TODO: verify connectivity? Factor out these checks into generic helpers?
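+///
+/// Behaviour sketch: every [DirectoryPutter::put] forwards straight to the
+/// wrapped [DirectoryService::put], and [DirectoryPutter::close] returns the
+/// digest of the last directory that was sent.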
+pub struct SimplePutter<DS: DirectoryService> { + directory_service: DS, + last_directory_digest: Option<B3Digest>, + closed: bool, +} + +impl<DS: DirectoryService> SimplePutter<DS> { + pub fn new(directory_service: DS) -> Self { + Self { + directory_service, + closed: false, + last_directory_digest: None, + } + } +} + +#[async_trait] +impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + if self.closed { + return Err(Error::StorageError("already closed".to_string())); + } + + let digest = self.directory_service.put(directory).await?; + + // track the last directory digest + self.last_directory_digest = Some(digest); + + Ok(()) + } + + async fn close(&mut self) -> Result<B3Digest, Error> { + if self.closed { + return Err(Error::StorageError("already closed".to_string())); + } + + match &self.last_directory_digest { + Some(last_digest) => { + self.closed = true; + Ok(last_digest.clone()) + } + None => Err(Error::InvalidRequest( + "no directories sent, can't show root digest".to_string(), + )), + } + } +} diff --git a/tvix/castore/src/errors.rs b/tvix/castore/src/errors.rs new file mode 100644 index 000000000000..1b3ae4d1c862 --- /dev/null +++ b/tvix/castore/src/errors.rs @@ -0,0 +1,61 @@ +use std::sync::PoisonError; +use thiserror::Error; +use tokio::task::JoinError; +use tonic::Status; + +/// Errors related to communication with the store. +#[derive(Debug, Error)] +pub enum Error { + #[error("invalid request: {0}")] + InvalidRequest(String), + + #[error("internal storage error: {0}")] + StorageError(String), +} + +impl<T> From<PoisonError<T>> for Error { + fn from(value: PoisonError<T>) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From<JoinError> for Error { + fn from(value: JoinError) -> Self { + Error::StorageError(value.to_string()) + } +} + +impl From<Error> for Status { + fn from(value: Error) -> Self { + match value { + Error::InvalidRequest(msg) => Status::invalid_argument(msg), + Error::StorageError(msg) => Status::data_loss(format!("storage error: {}", msg)), + } + } +} + +impl From<crate::tonic::Error> for Error { + fn from(value: crate::tonic::Error) -> Self { + Self::StorageError(value.to_string()) + } +} + +impl From<std::io::Error> for Error { + fn from(value: std::io::Error) -> Self { + if value.kind() == std::io::ErrorKind::InvalidInput { + Error::InvalidRequest(value.to_string()) + } else { + Error::StorageError(value.to_string()) + } + } +} + +// TODO: this should probably go somewhere else? +impl From<Error> for std::io::Error { + fn from(value: Error) -> Self { + match value { + Error::InvalidRequest(msg) => Self::new(std::io::ErrorKind::InvalidInput, msg), + Error::StorageError(msg) => Self::new(std::io::ErrorKind::Other, msg), + } + } +} diff --git a/tvix/castore/src/fixtures.rs b/tvix/castore/src/fixtures.rs new file mode 100644 index 000000000000..a206d9b7ddc6 --- /dev/null +++ b/tvix/castore/src/fixtures.rs @@ -0,0 +1,88 @@ +use crate::{ + proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode}, + B3Digest, +}; +use lazy_static::lazy_static; + +pub const HELLOWORLD_BLOB_CONTENTS: &[u8] = b"Hello World!"; +pub const EMPTY_BLOB_CONTENTS: &[u8] = b""; + +lazy_static! 
{
+    pub static ref DUMMY_DIGEST: B3Digest = {
+        let u = [0u8; 32];
+        (&u).into()
+    };
+    pub static ref DUMMY_DIGEST_2: B3Digest = {
+        let mut u = [0u8; 32];
+        u[0] = 0x10;
+        (&u).into()
+    };
+    pub static ref DUMMY_DATA_1: bytes::Bytes = vec![0x01, 0x02, 0x03].into();
+    pub static ref DUMMY_DATA_2: bytes::Bytes = vec![0x04, 0x05].into();
+
+    pub static ref HELLOWORLD_BLOB_DIGEST: B3Digest =
+        blake3::hash(HELLOWORLD_BLOB_CONTENTS).as_bytes().into();
+    pub static ref EMPTY_BLOB_DIGEST: B3Digest =
+        blake3::hash(EMPTY_BLOB_CONTENTS).as_bytes().into();
+
+    // 2 bytes
+    pub static ref BLOB_A: bytes::Bytes = vec![0x00, 0x01].into();
+    pub static ref BLOB_A_DIGEST: B3Digest = blake3::hash(&BLOB_A).as_bytes().into();
+
+    // 1MB
+    pub static ref BLOB_B: bytes::Bytes = (0..255).collect::<Vec<u8>>().repeat(4 * 1024).into();
+    pub static ref BLOB_B_DIGEST: B3Digest = blake3::hash(&BLOB_B).as_bytes().into();
+
+    // Directories
+    pub static ref DIRECTORY_WITH_KEEP: proto::Directory = proto::Directory {
+        directories: vec![],
+        files: vec![FileNode {
+            name: b".keep".to_vec().into(),
+            digest: EMPTY_BLOB_DIGEST.clone().into(),
+            size: 0,
+            executable: false,
+        }],
+        symlinks: vec![],
+    };
+    pub static ref DIRECTORY_COMPLICATED: proto::Directory = proto::Directory {
+        directories: vec![DirectoryNode {
+            name: b"keep".to_vec().into(),
+            digest: DIRECTORY_WITH_KEEP.digest().into(),
+            size: DIRECTORY_WITH_KEEP.size(),
+        }],
+        files: vec![FileNode {
+            name: b".keep".to_vec().into(),
+            digest: EMPTY_BLOB_DIGEST.clone().into(),
+            size: 0,
+            executable: false,
+        }],
+        symlinks: vec![SymlinkNode {
+            name: b"aa".to_vec().into(),
+            target: b"/nix/store/somewhereelse".to_vec().into(),
+        }],
+    };
+    pub static ref DIRECTORY_A: Directory = Directory::default();
+    pub static ref DIRECTORY_B: Directory = Directory {
+        directories: vec![DirectoryNode {
+            name: b"a".to_vec().into(),
+            digest: DIRECTORY_A.digest().into(),
+            size: DIRECTORY_A.size(),
+        }],
+        ..Default::default()
+    };
+    pub static ref DIRECTORY_C: Directory = Directory {
+        directories: vec![
+            DirectoryNode {
+                name: b"a".to_vec().into(),
+                digest: DIRECTORY_A.digest().into(),
+                size: DIRECTORY_A.size(),
+            },
+            DirectoryNode {
+                name: b"a'".to_vec().into(),
+                digest: DIRECTORY_A.digest().into(),
+                size: DIRECTORY_A.size(),
+            }
+        ],
+        ..Default::default()
+    };
+}
diff --git a/tvix/castore/src/fs/file_attr.rs b/tvix/castore/src/fs/file_attr.rs
new file mode 100644
index 000000000000..ad41f036a253
--- /dev/null
+++ b/tvix/castore/src/fs/file_attr.rs
@@ -0,0 +1,53 @@
+#![allow(clippy::unnecessary_cast)] // libc::S_IFDIR is u32 on Linux and u16 on MacOS
+use super::inodes::{DirectoryInodeData, InodeData};
+use fuse_backend_rs::abi::fuse_abi::Attr;
+
+/// The [Attr] describing the root
+pub const ROOT_FILE_ATTR: Attr = Attr {
+    ino: fuse_backend_rs::api::filesystem::ROOT_ID,
+    size: 0,
+    blksize: 1024,
+    blocks: 0,
+    mode: libc::S_IFDIR as u32 | 0o555,
+    atime: 0,
+    mtime: 0,
+    ctime: 0,
+    atimensec: 0,
+    mtimensec: 0,
+    ctimensec: 0,
+    nlink: 0,
+    uid: 0,
+    gid: 0,
+    rdev: 0,
+    flags: 0,
+    #[cfg(target_os = "macos")]
+    crtime: 0,
+    #[cfg(target_os = "macos")]
+    crtimensec: 0,
+    #[cfg(target_os = "macos")]
+    padding: 0,
+};
+
+/// for a given &Node and inode, construct an [Attr]
+pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> Attr {
+    Attr {
+        ino: inode,
+        // FUTUREWORK: play with these numbers, as it affects read sizes for client applications.
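+        // Everything not set explicitly below stays at Default::default().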
+        blocks: 1024,
+        size: match inode_data {
+            InodeData::Regular(_, size, _) => *size as u64,
+            InodeData::Symlink(target) => target.len() as u64,
+            InodeData::Directory(DirectoryInodeData::Sparse(_, size)) => *size as u64,
+            InodeData::Directory(DirectoryInodeData::Populated(_, ref children)) => {
+                children.len() as u64
+            }
+        },
+        mode: match inode_data {
+            InodeData::Regular(_, _, false) => libc::S_IFREG as u32 | 0o444, // non-executable files
+            InodeData::Regular(_, _, true) => libc::S_IFREG as u32 | 0o555,  // executable files
+            InodeData::Symlink(_) => libc::S_IFLNK as u32 | 0o444,
+            InodeData::Directory(_) => libc::S_IFDIR as u32 | 0o555,
+        },
+        ..Default::default()
+    }
+}
diff --git a/tvix/castore/src/fs/fuse.rs b/tvix/castore/src/fs/fuse.rs
new file mode 100644
index 000000000000..1dce43915905
--- /dev/null
+++ b/tvix/castore/src/fs/fuse.rs
@@ -0,0 +1,115 @@
+use std::{io, path::Path, sync::Arc, thread};
+
+use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
+use tracing::{error, instrument};
+
+struct FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
+    channel: fuse_backend_rs::transport::FuseChannel,
+}
+
+#[cfg(target_os = "macos")]
+const BADFD: libc::c_int = libc::EBADF;
+#[cfg(target_os = "linux")]
+const BADFD: libc::c_int = libc::EBADFD;
+
+impl<FS> FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    fn start(&mut self) -> io::Result<()> {
+        while let Some((reader, writer)) = self
+            .channel
+            .get_request()
+            .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
+        {
+            if let Err(e) = self
+                .server
+                .handle_message(reader, writer.into(), None, None)
+            {
+                match e {
+                    // This indicates the session has been shut down.
+                    fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => {
+                        break;
+                    }
+                    error => {
+                        error!(?error, "failed to handle fuse request");
+                        continue;
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+pub struct FuseDaemon {
+    session: FuseSession,
+    threads: Vec<thread::JoinHandle<()>>,
+}
+
+impl FuseDaemon {
+    #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)]
+    pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error>
+    where
+        FS: FileSystem + Sync + Send + 'static,
+        P: AsRef<Path> + std::fmt::Debug,
+    {
+        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
+
+        let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        #[cfg(target_os = "linux")]
+        session.set_allow_other(false);
+        session
+            .mount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+        let mut join_handles = Vec::with_capacity(threads);
+        for _ in 0..threads {
+            let mut server = FuseServer {
+                server: server.clone(),
+                channel: session
+                    .new_channel()
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
+            };
+            let join_handle = thread::Builder::new()
+                .name("fuse_server".to_string())
+                .spawn(move || {
+                    let _ = server.start();
+                })?;
+            join_handles.push(join_handle);
+        }
+
+        Ok(FuseDaemon {
+            session,
+            threads: join_handles,
+        })
+    }
+
+    #[instrument(skip_all, err)]
+    pub fn unmount(&mut self) -> Result<(), io::Error> {
+        self.session
+            .umount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        for thread in self.threads.drain(..)
{
+            thread.join().map_err(|_| {
+                io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
+            })?;
+        }
+
+        Ok(())
+    }
+}
+
+impl Drop for FuseDaemon {
+    fn drop(&mut self) {
+        if let Err(error) = self.unmount() {
+            error!(?error, "failed to unmount fuse filesystem")
+        }
+    }
+}
diff --git a/tvix/castore/src/fs/inode_tracker.rs b/tvix/castore/src/fs/inode_tracker.rs
new file mode 100644
index 000000000000..4a8283b6b144
--- /dev/null
+++ b/tvix/castore/src/fs/inode_tracker.rs
@@ -0,0 +1,207 @@
+use std::{collections::HashMap, sync::Arc};
+
+use super::inodes::{DirectoryInodeData, InodeData};
+use crate::B3Digest;
+
+/// InodeTracker keeps track of inodes, stores the data behind these inodes
+/// and deals with inode allocation.
+pub struct InodeTracker {
+    data: HashMap<u64, Arc<InodeData>>,
+
+    // lookup table for blobs by their B3Digest
+    blob_digest_to_inode: HashMap<B3Digest, u64>,
+
+    // lookup table for symlinks by their target
+    symlink_target_to_inode: HashMap<bytes::Bytes, u64>,
+
+    // lookup table for directories by their B3Digest.
+    // Note the corresponding directory may not be present in data yet.
+    directory_digest_to_inode: HashMap<B3Digest, u64>,
+
+    // the next inode to allocate
+    next_inode: u64,
+}
+
+impl Default for InodeTracker {
+    fn default() -> Self {
+        Self {
+            data: Default::default(),
+
+            blob_digest_to_inode: Default::default(),
+            symlink_target_to_inode: Default::default(),
+            directory_digest_to_inode: Default::default(),
+
+            next_inode: 2,
+        }
+    }
+}
+
+impl InodeTracker {
+    // Retrieves data for a given inode, if it exists.
+    pub fn get(&self, ino: u64) -> Option<Arc<InodeData>> {
+        self.data.get(&ino).cloned()
+    }
+
+    // Replaces data for a given inode.
+    // Panics if the inode doesn't already exist.
+    pub fn replace(&mut self, ino: u64, data: Arc<InodeData>) {
+        if self.data.insert(ino, data).is_none() {
+            panic!("replace called on unknown inode");
+        }
+    }
+
+    // Stores data and returns the inode for it.
+    // In case an inode has already been allocated for the same data, that inode
+    // is returned, otherwise a new one is allocated.
+    // In case data is a [InodeData::Directory], inodes for all items are looked
+    // up.
+    pub fn put(&mut self, data: InodeData) -> u64 {
+        match data {
+            InodeData::Regular(ref digest, _, _) => {
+                match self.blob_digest_to_inode.get(digest) {
+                    Some(found_ino) => {
+                        // We already have it, return the inode.
+                        *found_ino
+                    }
+                    None => self.insert_and_increment(data),
+                }
+            }
+            InodeData::Symlink(ref target) => {
+                match self.symlink_target_to_inode.get(target) {
+                    Some(found_ino) => {
+                        // We already have it, return the inode.
+                        *found_ino
+                    }
+                    None => self.insert_and_increment(data),
+                }
+            }
+            InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => {
+                // check the lookup table if the B3Digest is known.
+                match self.directory_digest_to_inode.get(digest) {
+                    Some(found_ino) => {
+                        // We already have it, return the inode.
+                        *found_ino
+                    }
+                    None => {
+                        // insert and return the inode
+                        self.insert_and_increment(data)
+                    }
+                }
+            }
+            // Inserting [DirectoryInodeData::Populated] doesn't normally happen,
+            // only via [replace].
+            InodeData::Directory(DirectoryInodeData::Populated(..)) => {
+                unreachable!("should never be called with DirectoryInodeData::Populated")
+            }
+        }
+    }
+
+    // Inserts the data and returns the inode it was stored at, while
+    // incrementing next_inode.
+    fn insert_and_increment(&mut self, data: InodeData) -> u64 {
+        let ino = self.next_inode;
+        // insert into lookup tables
+        match data {
+            InodeData::Regular(ref digest, _, _) => {
+                self.blob_digest_to_inode.insert(digest.clone(), ino);
+            }
+            InodeData::Symlink(ref target) => {
+                self.symlink_target_to_inode.insert(target.clone(), ino);
+            }
+            InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => {
+                self.directory_digest_to_inode.insert(digest.clone(), ino);
+            }
+            // This is currently not used outside test fixtures.
+            // Usually a [DirectoryInodeData::Sparse] is inserted and later
+            // "upgraded" with more data.
+            // However, as a future optimization, a lookup for a PathInfo could trigger a
+            // [DirectoryService::get_recursive()] request that "forks into
+            // background" and prepopulates all Directories in a closure.
+            InodeData::Directory(DirectoryInodeData::Populated(ref digest, _)) => {
+                self.directory_digest_to_inode.insert(digest.clone(), ino);
+            }
+        }
+        // Insert data
+        self.data.insert(ino, Arc::new(data));
+
+        // increment the inode counter and return the old inode.
+        self.next_inode += 1;
+        ino
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::fixtures;
+
+    use super::InodeData;
+    use super::InodeTracker;
+
+    /// Getting something non-existent should be none
+    #[test]
+    fn get_nonexistent() {
+        let inode_tracker = InodeTracker::default();
+        assert!(inode_tracker.get(1).is_none());
+    }
+
+    /// Put of a regular file should allocate an inode, which should be the same when inserting again.
+    #[test]
+    fn put_regular() {
+        let mut inode_tracker = InodeTracker::default();
+        let f = InodeData::Regular(
+            fixtures::BLOB_A_DIGEST.clone(),
+            fixtures::BLOB_A.len() as u64,
+            false,
+        );
+
+        // put it in
+        let ino = inode_tracker.put(f.clone());
+
+        // a get should return the right data
+        let data = inode_tracker.get(ino).expect("must be some");
+        match *data {
+            InodeData::Regular(ref digest, _, _) => {
+                assert_eq!(&fixtures::BLOB_A_DIGEST.clone(), digest);
+            }
+            InodeData::Symlink(_) | InodeData::Directory(..) => panic!("wrong type"),
+        }
+
+        // another put should return the same ino
+        assert_eq!(ino, inode_tracker.put(f));
+
+        // inserting another file should return a different ino
+        assert_ne!(
+            ino,
+            inode_tracker.put(InodeData::Regular(
+                fixtures::BLOB_B_DIGEST.clone(),
+                fixtures::BLOB_B.len() as u64,
+                false,
+            ))
+        );
+    }
+
+    // Put of a symlink should allocate an inode, which should be the same when inserting again
+    #[test]
+    fn put_symlink() {
+        let mut inode_tracker = InodeTracker::default();
+        let f = InodeData::Symlink("target".into());
+
+        // put it in
+        let ino = inode_tracker.put(f.clone());
+
+        // a get should return the right data
+        let data = inode_tracker.get(ino).expect("must be some");
+        match *data {
+            InodeData::Symlink(ref target) => {
+                assert_eq!(b"target".to_vec(), *target);
+            }
+            InodeData::Regular(..) | InodeData::Directory(..) => panic!("wrong type"),
+        }
+
+        // another put should return the same ino
+        assert_eq!(ino, inode_tracker.put(f));
+
+        // inserting another symlink should return a different ino
+        assert_ne!(ino, inode_tracker.put(InodeData::Symlink("target2".into())));
+    }
+}
diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs
new file mode 100644
index 000000000000..9131b703bae0
--- /dev/null
+++ b/tvix/castore/src/fs/inodes.rs
@@ -0,0 +1,57 @@
+//! This module contains all the data structures used to track information
+//! about inodes, which represent tvix-castore nodes in a filesystem.
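+//!
+//! As a rough sketch of the mapping (implemented by the [From] impls below):
+//! a `FileNode` becomes `InodeData::Regular(digest, size, executable)`, a
+//! `SymlinkNode` becomes `InodeData::Symlink(target)`, and a `DirectoryNode`
+//! initially becomes a sparse
+//! `InodeData::Directory(DirectoryInodeData::Sparse(digest, size))`.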
+use crate::proto as castorepb; +use crate::B3Digest; + +#[derive(Clone, Debug)] +pub enum InodeData { + Regular(B3Digest, u64, bool), // digest, size, executable + Symlink(bytes::Bytes), // target + Directory(DirectoryInodeData), // either [DirectoryInodeData:Sparse] or [DirectoryInodeData:Populated] +} + +/// This encodes the two different states of [InodeData::Directory]. +/// Either the data still is sparse (we only saw a [castorepb::DirectoryNode], +/// but didn't fetch the [castorepb::Directory] struct yet, or we processed a +/// lookup and did fetch the data. +#[derive(Clone, Debug)] +pub enum DirectoryInodeData { + Sparse(B3Digest, u64), // digest, size + Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)] +} + +impl From<&castorepb::node::Node> for InodeData { + fn from(value: &castorepb::node::Node) -> Self { + match value { + castorepb::node::Node::Directory(directory_node) => directory_node.into(), + castorepb::node::Node::File(file_node) => file_node.into(), + castorepb::node::Node::Symlink(symlink_node) => symlink_node.into(), + } + } +} + +impl From<&castorepb::SymlinkNode> for InodeData { + fn from(value: &castorepb::SymlinkNode) -> Self { + InodeData::Symlink(value.target.clone()) + } +} + +impl From<&castorepb::FileNode> for InodeData { + fn from(value: &castorepb::FileNode) -> Self { + InodeData::Regular( + value.digest.clone().try_into().unwrap(), + value.size, + value.executable, + ) + } +} + +/// Converts a DirectoryNode to a sparsely populated InodeData::Directory. +impl From<&castorepb::DirectoryNode> for InodeData { + fn from(value: &castorepb::DirectoryNode) -> Self { + InodeData::Directory(DirectoryInodeData::Sparse( + value.digest.clone().try_into().unwrap(), + value.size, + )) + } +} diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs new file mode 100644 index 000000000000..28528f38af1d --- /dev/null +++ b/tvix/castore/src/fs/mod.rs @@ -0,0 +1,628 @@ +mod file_attr; +mod inode_tracker; +mod inodes; +mod root_nodes; + +#[cfg(feature = "fuse")] +pub mod fuse; + +#[cfg(feature = "virtiofs")] +pub mod virtiofs; + +#[cfg(test)] +mod tests; + +use crate::proto as castorepb; +use crate::{ + blobservice::{BlobReader, BlobService}, + directoryservice::DirectoryService, + proto::{node::Node, NamedNode}, + B3Digest, +}; +use fuse_backend_rs::abi::fuse_abi::stat64; +use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}; +use futures::StreamExt; +use parking_lot::RwLock; +use std::{ + collections::HashMap, + io, + sync::atomic::AtomicU64, + sync::{atomic::Ordering, Arc}, + time::Duration, +}; +use tokio::{ + io::{AsyncReadExt, AsyncSeekExt}, + sync::mpsc, +}; +use tracing::{debug, info_span, instrument, warn}; + +pub use self::root_nodes::RootNodes; +use self::{ + file_attr::{gen_file_attr, ROOT_FILE_ATTR}, + inode_tracker::InodeTracker, + inodes::{DirectoryInodeData, InodeData}, +}; + +/// This implements a read-only FUSE filesystem for a tvix-store +/// with the passed [BlobService], [DirectoryService] and [RootNodes]. +/// +/// Linux uses inodes in filesystems. When implementing FUSE, most calls are +/// *for* a given inode. +/// +/// This means, we need to have a stable mapping of inode numbers to the +/// corresponding store nodes. +/// +/// We internally delegate all inode allocation and state keeping to the +/// inode tracker. +/// We store a mapping from currently "explored" names in the root to their +/// inode. 
+///
+/// There are some places where inodes are allocated / data inserted into
+/// the inode tracker, if not allocated before already:
+/// - Processing a `lookup` request, either in the mount root, or somewhere
+///   deeper.
+/// - Processing a `readdir` request.
+///
+/// Things pointing to the same contents get the same inodes, irrespective of
+/// their own location.
+/// This means:
+/// - Symlinks with the same target will get the same inode.
+/// - Regular/executable files with the same contents will get the same inode.
+/// - Directories with the same contents will get the same inode.
+///
+/// Due to the above being valid across the whole store, and considering the
+/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed
+/// allocation", i.e. reserve Directory.size inodes for each directory node we
+/// explore.
+/// Tests for this live in the tvix-store crate.
+pub struct TvixStoreFs<BS, DS, RN> {
+    blob_service: BS,
+    directory_service: DS,
+    root_nodes_provider: RN,
+
+    /// Whether to (try to) list elements in the root.
+    list_root: bool,
+
+    /// This maps a given basename in the root to the inode we allocated for the node.
+    root_nodes: RwLock<HashMap<Vec<u8>, u64>>,
+
+    /// This keeps track of inodes and data alongside them.
+    inode_tracker: RwLock<InodeTracker>,
+
+    /// This holds all open file handles
+    #[allow(clippy::type_complexity)]
+    file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>,
+
+    next_file_handle: AtomicU64,
+
+    tokio_handle: tokio::runtime::Handle,
+}
+
+impl<BS, DS, RN> TvixStoreFs<BS, DS, RN>
+where
+    BS: AsRef<dyn BlobService> + Clone + Send,
+    DS: AsRef<dyn DirectoryService> + Clone + Send + 'static,
+    RN: RootNodes + Clone + 'static,
+{
+    pub fn new(
+        blob_service: BS,
+        directory_service: DS,
+        root_nodes_provider: RN,
+        list_root: bool,
+    ) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+            root_nodes_provider,
+
+            list_root,
+
+            root_nodes: RwLock::new(HashMap::default()),
+            inode_tracker: RwLock::new(Default::default()),
+
+            file_handles: RwLock::new(Default::default()),
+            next_file_handle: AtomicU64::new(1),
+            tokio_handle: tokio::runtime::Handle::current(),
+        }
+    }
+
+    /// Retrieves the inode for a given root node basename, if present.
+    /// This obtains a read lock on self.root_nodes.
+    fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> {
+        self.root_nodes.read().get(name).cloned()
+    }
+
+    /// For a given inode, look up the directory behind it (from
+    /// self.inode_tracker), and return its children.
+    /// The inode_tracker MUST know about this inode already, and it MUST point
+    /// to an [InodeData::Directory].
+    /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup
+    /// in self.directory_service is performed, and self.inode_tracker is updated with the
+    /// [DirectoryInodeData::Populated].
+    #[instrument(skip(self), err)]
+    fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> {
+        let data = self.inode_tracker.read().get(ino).unwrap();
+        match *data {
+            // if it's populated already, return children.
+            InodeData::Directory(DirectoryInodeData::Populated(
+                ref parent_digest,
+                ref children,
+            )) => Ok((parent_digest.clone(), children.clone())),
+            // if it's sparse, fetch data using directory_service, populate child nodes
+            // and update it in [self.inode_tracker].
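+            // Note: FUSE callbacks are synchronous, while DirectoryService is
+            // async; the block_on(spawn(..)) pattern below bridges the two by
+            // running the fetch on the tokio runtime and blocking this thread
+            // until it resolves.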
+            InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
+                let directory = self
+                    .tokio_handle
+                    .block_on(self.tokio_handle.spawn({
+                        let directory_service = self.directory_service.clone();
+                        let parent_digest = parent_digest.to_owned();
+                        async move { directory_service.as_ref().get(&parent_digest).await }
+                    }))
+                    .unwrap()?
+                    .ok_or_else(|| {
+                        warn!(directory.digest=%parent_digest, "directory not found");
+                        // If the Directory can't be found, this is a hole, bail out.
+                        io::Error::from_raw_os_error(libc::EIO)
+                    })?;
+
+                // Turn the retrieved directory into an InodeData::Directory(DirectoryInodeData::Populated(..)),
+                // allocating inodes for the children on the way.
+                let children = {
+                    let mut inode_tracker = self.inode_tracker.write();
+
+                    let children: Vec<(u64, castorepb::node::Node)> = directory
+                        .nodes()
+                        .map(|child_node| {
+                            let child_ino = inode_tracker.put((&child_node).into());
+                            (child_ino, child_node)
+                        })
+                        .collect();
+
+                    // replace.
+                    inode_tracker.replace(
+                        ino,
+                        Arc::new(InodeData::Directory(DirectoryInodeData::Populated(
+                            parent_digest.clone(),
+                            children.clone(),
+                        ))),
+                    );
+
+                    children
+                };
+
+                Ok((parent_digest.clone(), children))
+            }
+            // if the parent inode was not a directory, this doesn't make sense
+            InodeData::Regular(..) | InodeData::Symlink(_) => {
+                Err(io::Error::from_raw_os_error(libc::ENOTDIR))
+            }
+        }
+    }
+
+    /// This will turn a lookup request for a name in the root into an ino and
+    /// [InodeData].
+    /// It will peek in [self.root_nodes], and then either look it up from
+    /// [self.inode_tracker],
+    /// or otherwise fetch from [self.root_nodes_provider], and then insert into
+    /// [self.inode_tracker].
+    /// In the case the name can't be found, a libc::ENOENT is returned.
+    fn name_in_root_to_ino_and_data(
+        &self,
+        name: &std::ffi::CStr,
+    ) -> io::Result<(u64, Arc<InodeData>)> {
+        // Look up the inode for that root node.
+        // If there's one, [self.inode_tracker] MUST also contain the data,
+        // which we can then return.
+        if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) {
+            return Ok((
+                inode,
+                self.inode_tracker
+                    .read()
+                    .get(inode)
+                    .expect("must exist")
+                    .to_owned(),
+            ));
+        }
+
+        // We don't have it yet, look it up via [self.root_nodes_provider].
+        match self.tokio_handle.block_on({
+            let root_nodes_provider = self.root_nodes_provider.clone();
+            async move { root_nodes_provider.get_by_basename(name.to_bytes()).await }
+        }) {
+            // if there was an error looking up the root node, propagate up an IO error.
+            Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)),
+            // the root node doesn't exist, so the file doesn't exist.
+            Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
+            // The root node does exist
+            Ok(Some(ref root_node)) => {
+                // The name must match what's passed in the lookup, otherwise this is also an ENOENT.
+                if root_node.get_name() != name.to_bytes() {
+                    debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch");
+                    return Err(io::Error::from_raw_os_error(libc::ENOENT));
+                }
+
+                // Let's check if someone else beat us to updating the inode tracker and
+                // root_nodes map. This avoids locking inode_tracker for writing.
+                if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) {
+                    return Ok((
+                        *ino,
+                        self.inode_tracker.read().get(*ino).expect("must exist"),
+                    ));
+                }
+
+                // Only in case it doesn't, lock [self.root_nodes] and
+                // [self.inode_tracker] for writing.
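+                // This is the classic double-checked pattern: the cheap
+                // read-locked probe above handles the common case, and the
+                // write locks below are only taken for names we see for the
+                // first time.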
+ let mut root_nodes = self.root_nodes.write(); + let mut inode_tracker = self.inode_tracker.write(); + + // insert the (sparse) inode data and register in + // self.root_nodes. + let inode_data: InodeData = root_node.into(); + let ino = inode_tracker.put(inode_data.clone()); + root_nodes.insert(name.to_bytes().into(), ino); + + Ok((ino, Arc::new(inode_data))) + } + } + } +} + +impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN> +where + BS: AsRef<dyn BlobService> + Clone + Send + 'static, + DS: AsRef<dyn DirectoryService> + Send + Clone + 'static, + RN: RootNodes + Clone + 'static, +{ + type Handle = u64; + type Inode = u64; + + fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> { + Ok(FsOptions::empty()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn getattr( + &self, + _ctx: &Context, + inode: Self::Inode, + _handle: Option<Self::Handle>, + ) -> io::Result<(stat64, Duration)> { + if inode == ROOT_ID { + return Ok((ROOT_FILE_ATTR.into(), Duration::MAX)); + } + + match self.inode_tracker.read().get(inode) { + None => Err(io::Error::from_raw_os_error(libc::ENOENT)), + Some(node) => { + debug!(node = ?node, "found node"); + Ok((gen_file_attr(&node, inode).into(), Duration::MAX)) + } + } + } + + #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))] + fn lookup( + &self, + _ctx: &Context, + parent: Self::Inode, + name: &std::ffi::CStr, + ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> { + debug!("lookup"); + + // This goes from a parent inode to a node. + // - If the parent is [ROOT_ID], we need to check + // [self.root_nodes] (fetching from a [RootNode] provider if needed) + // - Otherwise, lookup the parent in [self.inode_tracker] (which must be + // a [InodeData::Directory]), and find the child with that name. + if parent == ROOT_ID { + let (ino, inode_data) = self.name_in_root_to_ino_and_data(name)?; + + debug!(inode_data=?&inode_data, ino=ino, "Some"); + return Ok(fuse_backend_rs::api::filesystem::Entry { + inode: ino, + attr: gen_file_attr(&inode_data, ino).into(), + attr_timeout: Duration::MAX, + entry_timeout: Duration::MAX, + ..Default::default() + }); + } + // This is the "lookup for "a" inside inode 42. + // We already know that inode 42 must be a directory. + let (parent_digest, children) = self.get_directory_children(parent)?; + + let span = info_span!("lookup", directory.digest = %parent_digest); + let _enter = span.enter(); + + // Search for that name in the list of children and return the FileAttrs. + + // in the children, find the one with the desired name. + if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) { + // lookup the child [InodeData] in [self.inode_tracker]. + // We know the inodes for children have already been allocated. + let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap(); + + // Reply with the file attributes for the child. + // For child directories, we still have all data we need to reply. + Ok(fuse_backend_rs::api::filesystem::Entry { + inode: *child_ino, + attr: gen_file_attr(&child_inode_data, *child_ino).into(), + attr_timeout: Duration::MAX, + entry_timeout: Duration::MAX, + ..Default::default() + }) + } else { + // Child not found, return ENOENT. + Err(io::Error::from_raw_os_error(libc::ENOENT)) + } + } + + // TODO: readdirplus? 
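+    // A readdirplus implementation would combine readdir and lookup: each
+    // directory entry is emitted together with its attributes, saving the
+    // kernel one lookup round-trip per entry. Sketch only (assuming the
+    // fuse_backend_rs callback shape; not implemented here):
+    //
+    //     let written = add_entry(
+    //         DirEntry { ino, offset, type_, name },
+    //         Entry {
+    //             inode: ino,
+    //             attr: gen_file_attr(&child_inode_data, ino).into(),
+    //             ..Default::default()
+    //         },
+    //     )?;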
+ + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))] + fn readdir( + &self, + _ctx: &Context, + inode: Self::Inode, + _handle: Self::Handle, + _size: u32, + offset: u64, + add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>, + ) -> io::Result<()> { + debug!("readdir"); + + if inode == ROOT_ID { + if !self.list_root { + return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo + } else { + let root_nodes_provider = self.root_nodes_provider.clone(); + let (tx, mut rx) = mpsc::channel(16); + + // This task will run in the background immediately and will exit + // after the stream ends or if we no longer want any more entries. + self.tokio_handle.spawn(async move { + let mut stream = root_nodes_provider.list().skip(offset as usize).enumerate(); + while let Some(node) = stream.next().await { + if tx.send(node).await.is_err() { + // If we get a send error, it means the sync code + // doesn't want any more entries. + break; + } + } + }); + + while let Some((i, ref root_node)) = rx.blocking_recv() { + let root_node = match root_node { + Err(e) => { + warn!("failed to retrieve pathinfo: {}", e); + return Err(io::Error::from_raw_os_error(libc::EPERM)); + } + Ok(root_node) => root_node, + }; + + let name = root_node.get_name(); + // obtain the inode, or allocate a new one. + let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| { + // insert the (sparse) inode data and register in + // self.root_nodes. + let ino = self.inode_tracker.write().put(root_node.into()); + self.root_nodes.write().insert(name.into(), ino); + ino + }); + + let ty = match root_node { + Node::Directory(_) => libc::S_IFDIR, + Node::File(_) => libc::S_IFREG, + Node::Symlink(_) => libc::S_IFLNK, + }; + + let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino, + offset: offset + i as u64 + 1, + type_: ty, + name, + })?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { + break; + } + } + + return Ok(()); + } + } + + // lookup the children, or return an error if it's not a directory. + let (parent_digest, children) = self.get_directory_children(inode)?; + + let span = info_span!("lookup", directory.digest = %parent_digest); + let _enter = span.enter(); + + for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() { + // the second parameter will become the "offset" parameter on the next call. + let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino: *ino, + offset: offset + i as u64 + 1, + type_: match child_node { + #[allow(clippy::unnecessary_cast)] + // libc::S_IFDIR is u32 on Linux and u16 on MacOS + Node::Directory(_) => libc::S_IFDIR as u32, + #[allow(clippy::unnecessary_cast)] + // libc::S_IFDIR is u32 on Linux and u16 on MacOS + Node::File(_) => libc::S_IFREG as u32, + #[allow(clippy::unnecessary_cast)] + // libc::S_IFDIR is u32 on Linux and u16 on MacOS + Node::Symlink(_) => libc::S_IFLNK as u32, + }, + name: child_node.get_name(), + })?; + // If the buffer is full, add_entry will return `Ok(0)`. 
+ if written == 0 { + break; + } + } + + Ok(()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn open( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + _fuse_flags: u32, + ) -> io::Result<( + Option<Self::Handle>, + fuse_backend_rs::api::filesystem::OpenOptions, + )> { + if inode == ROOT_ID { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); + } + + // lookup the inode + match *self.inode_tracker.read().get(inode).unwrap() { + // read is invalid on non-files. + InodeData::Directory(..) | InodeData::Symlink(_) => { + warn!("is directory"); + Err(io::Error::from_raw_os_error(libc::EISDIR)) + } + InodeData::Regular(ref blob_digest, _blob_size, _) => { + let span = info_span!("read", blob.digest = %blob_digest); + let _enter = span.enter(); + + let task = self.tokio_handle.spawn({ + let blob_service = self.blob_service.clone(); + let blob_digest = blob_digest.clone(); + async move { blob_service.as_ref().open_read(&blob_digest).await } + }); + + let blob_reader = self.tokio_handle.block_on(task).unwrap(); + + match blob_reader { + Ok(None) => { + warn!("blob not found"); + Err(io::Error::from_raw_os_error(libc::EIO)) + } + Err(e) => { + warn!(e=?e, "error opening blob"); + Err(io::Error::from_raw_os_error(libc::EIO)) + } + Ok(Some(blob_reader)) => { + // get a new file handle + // TODO: this will overflow after 2**64 operations, + // which is fine for now. + // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 + // for the discussion on alternatives. + let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst); + + debug!("add file handle {}", fh); + self.file_handles + .write() + .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader))); + + Ok(( + Some(fh), + fuse_backend_rs::api::filesystem::OpenOptions::empty(), + )) + } + } + } + } + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))] + fn release( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + handle: Self::Handle, + _flush: bool, + _flock_release: bool, + _lock_owner: Option<u64>, + ) -> io::Result<()> { + // remove and get ownership on the blob reader + match self.file_handles.write().remove(&handle) { + // drop it, which will close it. + Some(blob_reader) => drop(blob_reader), + None => { + // These might already be dropped if a read error occured. + debug!("file_handle {} not found", handle); + } + } + + Ok(()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))] + fn read( + &self, + _ctx: &Context, + inode: Self::Inode, + handle: Self::Handle, + w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter, + size: u32, + offset: u64, + _lock_owner: Option<u64>, + _flags: u32, + ) -> io::Result<usize> { + debug!("read"); + + // We need to take out the blob reader from self.file_handles, so we can + // interact with it in the separate task. + // On success, we pass it back out of the task, so we can put it back in self.file_handles. + let blob_reader = match self.file_handles.read().get(&handle) { + Some(blob_reader) => blob_reader.clone(), + None => { + warn!("file handle {} unknown", handle); + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + }; + + let task = self.tokio_handle.spawn(async move { + let mut blob_reader = blob_reader.lock().await; + + // seek to the offset specified, which is relative to the start of the file. 
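+            // Reads may come in for arbitrary offsets (FUSE does not
+            // guarantee sequential access), so every request positions the
+            // reader explicitly rather than assuming it continues where the
+            // previous read left off.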
+            let resp = blob_reader.seek(io::SeekFrom::Start(offset)).await;
+
+            match resp {
+                Ok(pos) => {
+                    debug_assert_eq!(offset, pos);
+                }
+                Err(e) => {
+                    warn!("failed to seek to offset {}: {}", offset, e);
+                    return Err(io::Error::from_raw_os_error(libc::EIO));
+                }
+            }
+
+            // As written in the fuse docs, read should send exactly the number
+            // of bytes requested, except on EOF or error.
+
+            let mut buf: Vec<u8> = Vec::with_capacity(size as usize);
+
+            // copy things from the internal buffer into buf to fill it up until size
+            tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?;
+
+            Ok(buf)
+        });
+
+        let buf = self.tokio_handle.block_on(task).unwrap()?;
+
+        w.write(&buf)
+    }
+
+    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
+    fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> {
+        if inode == ROOT_ID {
+            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
+        }
+
+        // lookup the inode
+        match *self.inode_tracker.read().get(inode).unwrap() {
+            InodeData::Directory(..) | InodeData::Regular(..) => {
+                Err(io::Error::from_raw_os_error(libc::EINVAL))
+            }
+            InodeData::Symlink(ref target) => Ok(target.to_vec()),
+        }
+    }
+}
diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs
new file mode 100644
index 000000000000..6609e049a1fc
--- /dev/null
+++ b/tvix/castore/src/fs/root_nodes.rs
@@ -0,0 +1,37 @@
+use std::collections::BTreeMap;
+
+use crate::{proto::node::Node, Error};
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use tonic::async_trait;
+
+/// Provides an interface for looking up root nodes in tvix-castore by a
+/// given lookup key (usually the basename), and optionally allows listing.
+#[async_trait]
+pub trait RootNodes: Send + Sync {
+    /// Looks up a root CA node based on the basename of the node in the root
+    /// directory of the filesystem.
+    async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>;
+
+    /// Lists all root CA nodes in the filesystem. An error can be returned
+    /// in case listing is not allowed.
+    fn list(&self) -> BoxStream<Result<Node, Error>>;
+}
+
+#[async_trait]
+/// Implements RootNodes for something deref'ing to a BTreeMap of Nodes, where
+/// the key is the node name.
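+///
+/// This makes e.g. an `Arc<BTreeMap<Bytes, Node>>` usable as a [RootNodes]
+/// provider, as the tests in this crate do. Sketch (`some_node` is a
+/// placeholder for any [Node]):
+///
+/// ```ignore
+/// let mut m: BTreeMap<Bytes, Node> = BTreeMap::new();
+/// m.insert(Bytes::from("foo"), some_node);
+/// let provider = Arc::new(m);
+/// // provider.get_by_basename(b"foo").await now yields Some(some_node).
+/// ```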
+impl<T> RootNodes for T +where + T: AsRef<BTreeMap<Bytes, Node>> + Send + Sync, +{ + async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> { + Ok(self.as_ref().get(name).cloned()) + } + + fn list(&self) -> BoxStream<Result<Node, Error>> { + Box::pin(tokio_stream::iter( + self.as_ref().iter().map(|(_, v)| Ok(v.clone())), + )) + } +} diff --git a/tvix/castore/src/fs/tests.rs b/tvix/castore/src/fs/tests.rs new file mode 100644 index 000000000000..2f27c3c1c8e4 --- /dev/null +++ b/tvix/castore/src/fs/tests.rs @@ -0,0 +1,1141 @@ +use std::{ + collections::BTreeMap, + io::{self, Cursor}, + os::unix::fs::MetadataExt, + path::Path, + sync::Arc, +}; + +use bytes::Bytes; +use tempfile::TempDir; +use tokio_stream::{wrappers::ReadDirStream, StreamExt}; + +use super::{fuse::FuseDaemon, TvixStoreFs}; +use crate::proto::node::Node; +use crate::proto::{self as castorepb}; +use crate::{ + blobservice::{BlobService, MemoryBlobService}, + directoryservice::{DirectoryService, MemoryDirectoryService}, + fixtures, +}; + +const BLOB_A_NAME: &str = "00000000000000000000000000000000-test"; +const BLOB_B_NAME: &str = "55555555555555555555555555555555-test"; +const HELLOWORLD_BLOB_NAME: &str = "66666666666666666666666666666666-test"; +const SYMLINK_NAME: &str = "11111111111111111111111111111111-test"; +const SYMLINK_NAME2: &str = "44444444444444444444444444444444-test"; +const DIRECTORY_WITH_KEEP_NAME: &str = "22222222222222222222222222222222-test"; +const DIRECTORY_COMPLICATED_NAME: &str = "33333333333333333333333333333333-test"; + +fn gen_svcs() -> (Arc<dyn BlobService>, Arc<dyn DirectoryService>) { + ( + Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>, + Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService>, + ) +} + +fn do_mount<P: AsRef<Path>, BS, DS>( + blob_service: BS, + directory_service: DS, + root_nodes: BTreeMap<bytes::Bytes, Node>, + mountpoint: P, + list_root: bool, +) -> io::Result<FuseDaemon> +where + BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static, + DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static, +{ + let fs = TvixStoreFs::new( + blob_service, + directory_service, + Arc::new(root_nodes), + list_root, + ); + FuseDaemon::new(Arc::new(fs), mountpoint.as_ref(), 4) +} + +async fn populate_blob_a( + blob_service: &Arc<dyn BlobService>, + root_nodes: &mut BTreeMap<Bytes, Node>, +) { + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) + .await + .expect("must succeed uploading"); + bw.close().await.expect("must succeed closing"); + + root_nodes.insert( + BLOB_A_NAME.into(), + Node::File(castorepb::FileNode { + name: BLOB_A_NAME.into(), + digest: fixtures::BLOB_A_DIGEST.clone().into(), + size: fixtures::BLOB_A.len() as u64, + executable: false, + }), + ); +} + +async fn populate_blob_b( + blob_service: &Arc<dyn BlobService>, + root_nodes: &mut BTreeMap<Bytes, Node>, +) { + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) + .await + .expect("must succeed uploading"); + bw.close().await.expect("must succeed closing"); + + root_nodes.insert( + BLOB_B_NAME.into(), + Node::File(castorepb::FileNode { + name: BLOB_B_NAME.into(), + digest: fixtures::BLOB_B_DIGEST.clone().into(), + size: fixtures::BLOB_B.len() as u64, + executable: false, + }), + ); +} + +/// adds a blob containing helloworld and marks it as executable +async fn populate_blob_helloworld( + blob_service: &Arc<dyn BlobService>, + 
root_nodes: &mut BTreeMap<Bytes, Node>, +) { + let mut bw = blob_service.open_write().await; + tokio::io::copy( + &mut Cursor::new(fixtures::HELLOWORLD_BLOB_CONTENTS.to_vec()), + &mut bw, + ) + .await + .expect("must succeed uploading"); + bw.close().await.expect("must succeed closing"); + + root_nodes.insert( + HELLOWORLD_BLOB_NAME.into(), + Node::File(castorepb::FileNode { + name: HELLOWORLD_BLOB_NAME.into(), + digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(), + size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, + executable: true, + }), + ); +} + +async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { + root_nodes.insert( + SYMLINK_NAME.into(), + Node::Symlink(castorepb::SymlinkNode { + name: SYMLINK_NAME.into(), + target: BLOB_A_NAME.into(), + }), + ); +} + +/// This writes a symlink pointing to /nix/store/somewhereelse, +/// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. +async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) { + root_nodes.insert( + SYMLINK_NAME2.into(), + Node::Symlink(castorepb::SymlinkNode { + name: SYMLINK_NAME2.into(), + target: "/nix/store/somewhereelse".into(), + }), + ); +} + +async fn populate_directory_with_keep( + blob_service: &Arc<dyn BlobService>, + directory_service: &Arc<dyn DirectoryService>, + root_nodes: &mut BTreeMap<Bytes, Node>, +) { + // upload empty blob + let mut bw = blob_service.open_write().await; + assert_eq!( + fixtures::EMPTY_BLOB_DIGEST.as_slice(), + bw.close().await.expect("must succeed closing").as_slice(), + ); + + // upload directory + directory_service + .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .await + .expect("must succeed uploading"); + + root_nodes.insert( + DIRECTORY_WITH_KEEP_NAME.into(), + castorepb::node::Node::Directory(castorepb::DirectoryNode { + name: DIRECTORY_WITH_KEEP_NAME.into(), + digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + size: fixtures::DIRECTORY_WITH_KEEP.size(), + }), + ); +} + +/// Create a root node for DIRECTORY_WITH_KEEP, but don't upload the Directory +/// itself. +async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) { + root_nodes.insert( + DIRECTORY_WITH_KEEP_NAME.into(), + castorepb::node::Node::Directory(castorepb::DirectoryNode { + name: DIRECTORY_WITH_KEEP_NAME.into(), + digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + size: fixtures::DIRECTORY_WITH_KEEP.size(), + }), + ); +} + +/// Insert BLOB_A, but don't provide the blob .keep is pointing to. 
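+/// (i.e. the [castorepb::FileNode] references BLOB_A_DIGEST, but the blob
+/// contents are never uploaded to the BlobService, so reads through it must
+/// fail; used by the missing_blob test below.)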
+async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) {
+    root_nodes.insert(
+        BLOB_A_NAME.into(),
+        Node::File(castorepb::FileNode {
+            name: BLOB_A_NAME.into(),
+            digest: fixtures::BLOB_A_DIGEST.clone().into(),
+            size: fixtures::BLOB_A.len() as u64,
+            executable: false,
+        }),
+    );
+}
+
+async fn populate_directory_complicated(
+    blob_service: &Arc<dyn BlobService>,
+    directory_service: &Arc<dyn DirectoryService>,
+    root_nodes: &mut BTreeMap<Bytes, Node>,
+) {
+    // upload empty blob
+    let mut bw = blob_service.open_write().await;
+    assert_eq!(
+        fixtures::EMPTY_BLOB_DIGEST.as_slice(),
+        bw.close().await.expect("must succeed closing").as_slice(),
+    );
+
+    // upload inner directory
+    directory_service
+        .put(fixtures::DIRECTORY_WITH_KEEP.clone())
+        .await
+        .expect("must succeed uploading");
+
+    // upload parent directory
+    directory_service
+        .put(fixtures::DIRECTORY_COMPLICATED.clone())
+        .await
+        .expect("must succeed uploading");
+
+    root_nodes.insert(
+        DIRECTORY_COMPLICATED_NAME.into(),
+        Node::Directory(castorepb::DirectoryNode {
+            name: DIRECTORY_COMPLICATED_NAME.into(),
+            digest: fixtures::DIRECTORY_COMPLICATED.digest().into(),
+            size: fixtures::DIRECTORY_COMPLICATED.size(),
+        }),
+    );
+}
+
+/// Ensure mounting itself doesn't fail
+#[tokio::test]
+async fn mount() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        BTreeMap::default(),
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Ensure listing the root isn't allowed
+#[tokio::test]
+async fn root() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        BTreeMap::default(),
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    {
+        // read_dir succeeds, but getting the first element will fail.
+        let mut it = ReadDirStream::new(tokio::fs::read_dir(tmpdir).await.expect("must succeed"));
+
+        let err = it
+            .next()
+            .await
+            .expect("must be some")
+            .expect_err("must be err");
+        assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind());
+    }
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Ensure listing the root is allowed if configured explicitly
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn root_with_listing() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_blob_a(&blob_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        true, /* allow listing */
+    )
+    .expect("must succeed");
+
+    {
+        // read_dir succeeds, and getting the first element should succeed as well.
+ let mut it = ReadDirStream::new(tokio::fs::read_dir(tmpdir).await.expect("must succeed")); + + let e = it + .next() + .await + .expect("must be some") + .expect("must succeed"); + + let metadata = e.metadata().await.expect("must succeed"); + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we can stat a file at the root +#[tokio::test] +async fn stat_file_at_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_a(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + + // peek at the file metadata + let metadata = tokio::fs::metadata(p).await.expect("must succeed"); + + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we can read a file at the root +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_file_at_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_a(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + + // read the file contents + let data = tokio::fs::read(p).await.expect("must succeed"); + + // ensure size and contents match + assert_eq!(fixtures::BLOB_A.len(), data.len()); + assert_eq!(fixtures::BLOB_A.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we can read a large file at the root +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_large_file_at_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_b(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_B_NAME); + { + // peek at the file metadata + let metadata = tokio::fs::metadata(&p).await.expect("must succeed"); + + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + assert_eq!(fixtures::BLOB_B.len() as u64, metadata.len()); + } + + // read the file contents + let data = tokio::fs::read(p).await.expect("must succeed"); + + // ensure size and contents match + assert_eq!(fixtures::BLOB_B.len(), data.len()); + assert_eq!(fixtures::BLOB_B.to_vec(), data); + + 
fuse_daemon.unmount().expect("unmount"); +} + +/// Read the target of a symlink +#[tokio::test] +async fn symlink_readlink() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_symlink(&mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(SYMLINK_NAME); + + let target = tokio::fs::read_link(&p).await.expect("must succeed"); + assert_eq!(BLOB_A_NAME, target.to_str().unwrap()); + + // peek at the file metadata, which follows symlinks. + // this must fail, as we didn't populate the target. + let e = tokio::fs::metadata(&p).await.expect_err("must fail"); + assert_eq!(std::io::ErrorKind::NotFound, e.kind()); + + // peeking at the file metadata without following symlinks will succeed. + let metadata = tokio::fs::symlink_metadata(&p).await.expect("must succeed"); + assert!(metadata.is_symlink()); + + // reading from the symlink (which follows) will fail, because the target doesn't exist. + let e = tokio::fs::read(p).await.expect_err("must fail"); + assert_eq!(std::io::ErrorKind::NotFound, e.kind()); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Read and stat a regular file through a symlink pointing to it. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_stat_through_symlink() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_a(&blob_service, &mut root_nodes).await; + populate_symlink(&mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p_symlink = tmpdir.path().join(SYMLINK_NAME); + let p_blob = tmpdir.path().join(SYMLINK_NAME); + + // peek at the file metadata, which follows symlinks. + // this must now return the same metadata as when statting at the target directly. + let metadata_symlink = tokio::fs::metadata(&p_symlink).await.expect("must succeed"); + let metadata_blob = tokio::fs::metadata(&p_blob).await.expect("must succeed"); + assert_eq!(metadata_blob.file_type(), metadata_symlink.file_type()); + assert_eq!(metadata_blob.len(), metadata_symlink.len()); + + // reading from the symlink (which follows) will return the same data as if + // we were reading from the file directly. + assert_eq!( + tokio::fs::read(p_blob).await.expect("must succeed"), + tokio::fs::read(p_symlink).await.expect("must succeed"), + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Read a directory in the root, and validate some attributes. 
+#[tokio::test] +async fn read_stat_directory() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + + // peek at the metadata of the directory + let metadata = tokio::fs::metadata(p).await.expect("must succeed"); + assert!(metadata.is_dir()); + assert!(metadata.permissions().readonly()); + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// Read a blob inside a directory. This ensures we successfully populate directory data. +async fn read_blob_inside_dir() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep"); + + // peek at metadata. + let metadata = tokio::fs::metadata(&p).await.expect("must succeed"); + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + + // read from it + let data = tokio::fs::read(&p).await.expect("must succeed"); + assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// Read a blob inside a directory inside a directory. This ensures we properly +/// populate directories as we traverse down the structure. +async fn read_blob_deep_inside_dir() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir + .path() + .join(DIRECTORY_COMPLICATED_NAME) + .join("keep") + .join(".keep"); + + // peek at metadata. + let metadata = tokio::fs::metadata(&p).await.expect("must succeed"); + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + + // read from it + let data = tokio::fs::read(&p).await.expect("must succeed"); + assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure readdir works. 
+#[tokio::test] +async fn readdir() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME); + + { + // read_dir should succeed. Collect all elements + let elements: Vec<_> = + ReadDirStream::new(tokio::fs::read_dir(p).await.expect("must succeed")) + .map(|e| e.expect("must not be err")) + .collect() + .await; + + assert_eq!(3, elements.len(), "number of elements should be 3"); // rust skips . and .. + + // We explicitly look at specific positions here, because we always emit + // them ordered. + + // ".keep", 0 byte file. + let e = &elements[0]; + assert_eq!(".keep", e.file_name()); + assert!(e.file_type().await.expect("must succeed").is_file()); + assert_eq!(0, e.metadata().await.expect("must succeed").len()); + + // "aa", symlink. + let e = &elements[1]; + assert_eq!("aa", e.file_name()); + assert!(e.file_type().await.expect("must succeed").is_symlink()); + + // "keep", directory + let e = &elements[2]; + assert_eq!("keep", e.file_name()); + assert!(e.file_type().await.expect("must succeed").is_dir()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test] +/// Do a readdir deeper inside a directory, without doing readdir or stat in the parent directory. +async fn readdir_deep() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); + + { + // read_dir should succeed. Collect all elements + let elements: Vec<_> = + ReadDirStream::new(tokio::fs::read_dir(p).await.expect("must succeed")) + .map(|e| e.expect("must not be err")) + .collect() + .await; + + assert_eq!(1, elements.len(), "number of elements should be 1"); // rust skips . and .. + + // ".keep", 0 byte file. + let e = &elements[0]; + assert_eq!(".keep", e.file_name()); + assert!(e.file_type().await.expect("must succeed").is_file()); + assert_eq!(0, e.metadata().await.expect("must succeed").len()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +/// Check attributes match how they show up in /nix/store normally. 
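+/// (Modes are 0o444 for regular files and symlinks, 0o555 for executable
+/// files and directories; uid/gid are 0, and all timestamps sit at the unix
+/// epoch.)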
+#[tokio::test] +async fn check_attributes() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_a(&blob_service, &mut root_nodes).await; + populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; + populate_symlink(&mut root_nodes).await; + populate_blob_helloworld(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p_file = tmpdir.path().join(BLOB_A_NAME); + let p_directory = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + let p_symlink = tmpdir.path().join(SYMLINK_NAME); + let p_executable_file = tmpdir.path().join(HELLOWORLD_BLOB_NAME); + + // peek at metadata. We use symlink_metadata to ensure we don't traverse a symlink by accident. + let metadata_file = tokio::fs::symlink_metadata(&p_file) + .await + .expect("must succeed"); + let metadata_executable_file = tokio::fs::symlink_metadata(&p_executable_file) + .await + .expect("must succeed"); + let metadata_directory = tokio::fs::symlink_metadata(&p_directory) + .await + .expect("must succeed"); + let metadata_symlink = tokio::fs::symlink_metadata(&p_symlink) + .await + .expect("must succeed"); + + // modes should match. We & with 0o777 to remove any higher bits. + assert_eq!(0o444, metadata_file.mode() & 0o777); + assert_eq!(0o555, metadata_executable_file.mode() & 0o777); + assert_eq!(0o555, metadata_directory.mode() & 0o777); + assert_eq!(0o444, metadata_symlink.mode() & 0o777); + + // files should have the correct filesize + assert_eq!(fixtures::BLOB_A.len() as u64, metadata_file.len()); + // directories should have their "size" as filesize + assert_eq!( + { fixtures::DIRECTORY_WITH_KEEP.size() }, + metadata_directory.size() + ); + + for metadata in &[&metadata_file, &metadata_directory, &metadata_symlink] { + // uid and gid should be 0. + assert_eq!(0, metadata.uid()); + assert_eq!(0, metadata.gid()); + + // all times should be set to the unix epoch. + assert_eq!(0, metadata.atime()); + assert_eq!(0, metadata.mtime()); + assert_eq!(0, metadata.ctime()); + // crtime seems MacOS only + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test] +/// Ensure we allocate the same inodes for the same directory contents. +/// $DIRECTORY_COMPLICATED_NAME/keep contains the same data as $DIRECTORY_WITH_KEEP. +async fn compare_inodes_directories() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; + populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p_dir_with_keep = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + let p_sibling_dir = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); + + // peek at metadata. 
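+    // Identical directory contents must be deduplicated to a single inode,
+    // regardless of the path they are reached through.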
+ assert_eq!( + tokio::fs::metadata(p_dir_with_keep) + .await + .expect("must succeed") + .ino(), + tokio::fs::metadata(p_sibling_dir) + .await + .expect("must succeed") + .ino() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we allocate the same inodes for the same directory contents. +/// $DIRECTORY_COMPLICATED_NAME/keep/,keep contains the same data as $DIRECTORY_COMPLICATED_NAME/.keep +#[tokio::test] +async fn compare_inodes_files() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p_keep1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join(".keep"); + let p_keep2 = tmpdir + .path() + .join(DIRECTORY_COMPLICATED_NAME) + .join("keep") + .join(".keep"); + + // peek at metadata. + assert_eq!( + tokio::fs::metadata(p_keep1) + .await + .expect("must succeed") + .ino(), + tokio::fs::metadata(p_keep2) + .await + .expect("must succeed") + .ino() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we allocate the same inode for symlinks pointing to the same targets. +/// $DIRECTORY_COMPLICATED_NAME/aa points to the same target as SYMLINK_NAME2. +#[tokio::test] +async fn compare_inodes_symlinks() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; + populate_symlink2(&mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("aa"); + let p2 = tmpdir.path().join(SYMLINK_NAME2); + + // peek at metadata. + assert_eq!( + tokio::fs::symlink_metadata(p1) + .await + .expect("must succeed") + .ino(), + tokio::fs::symlink_metadata(p2) + .await + .expect("must succeed") + .ino() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Check we match paths exactly. 
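+/// (A root lookup must not succeed just because parts of the name, like the
+/// digest prefix, match an existing entry.)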
+#[tokio::test] +async fn read_wrong_paths_in_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_a(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + // wrong name + assert!( + tokio::fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) + .await + .is_err() + ); + + // invalid hash + assert!( + tokio::fs::metadata(tmpdir.path().join("0000000000000000000000000000000-test")) + .await + .is_err() + ); + + // right name, must exist + assert!( + tokio::fs::metadata(tmpdir.path().join("00000000000000000000000000000000-test")) + .await + .is_ok() + ); + + // now wrong name with right hash still may not exist + assert!( + tokio::fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) + .await + .is_err() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Make sure writes are not allowed +#[tokio::test] +async fn disallow_writes() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let root_nodes = BTreeMap::default(); + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + let e = tokio::fs::File::create(p).await.expect_err("must fail"); + + assert_eq!(Some(libc::EROFS), e.raw_os_error()); + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test] +/// Ensure we get an IO error if the directory service does not have the Directory object. +async fn missing_directory() { + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directorynode_without_directory(&mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + + { + // `stat` on the path should succeed, because it doesn't trigger the directory request. + tokio::fs::metadata(&p).await.expect("must succeed"); + + // However, calling either `readdir` or `stat` on a child should fail with an IO error. + // It fails when trying to pull the first entry, because we don't implement opendir separately + ReadDirStream::new(tokio::fs::read_dir(&p).await.unwrap()) + .next() + .await + .expect("must be some") + .expect_err("must be err"); + + // rust currently sets e.kind() to Uncategorized, which isn't very + // helpful, so we don't look at the error more closely than that.. 
+ tokio::fs::metadata(p.join(".keep")) + .await + .expect_err("must fail"); + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// Ensure we get an IO error if the blob service does not have the blob +async fn missing_blob() { + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_filenode_without_blob(&mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + + { + // `stat` on the blob should succeed, because it doesn't trigger a request to the blob service. + tokio::fs::metadata(&p).await.expect("must succeed"); + + // However, calling read on the blob should fail. + // rust currently sets e.kind() to Uncategorized, which isn't very + // helpful, so we don't look at the error more closely than that.. + tokio::fs::read(p).await.expect_err("must fail"); + } + + fuse_daemon.unmount().expect("unmount"); +} diff --git a/tvix/castore/src/fs/virtiofs.rs b/tvix/castore/src/fs/virtiofs.rs new file mode 100644 index 000000000000..846270d28568 --- /dev/null +++ b/tvix/castore/src/fs/virtiofs.rs @@ -0,0 +1,237 @@ +use std::{ + convert, error, fmt, io, + ops::Deref, + path::Path, + sync::{Arc, MutexGuard, RwLock}, +}; + +use fuse_backend_rs::{ + api::{filesystem::FileSystem, server::Server}, + transport::{FsCacheReqHandler, Reader, VirtioFsWriter}, +}; +use tracing::error; +use vhost::vhost_user::{ + Listener, SlaveFsCacheReq, VhostUserProtocolFeatures, VhostUserVirtioFeatures, +}; +use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringMutex, VringState, VringT}; +use virtio_bindings::bindings::virtio_ring::{ + VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC, +}; +use virtio_queue::QueueT; +use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; +use vmm_sys_util::epoll::EventSet; + +const VIRTIO_F_VERSION_1: u32 = 32; +const NUM_QUEUES: usize = 2; +const QUEUE_SIZE: usize = 1024; + +#[derive(Debug)] +enum Error { + /// Failed to handle non-input event. + HandleEventNotEpollIn, + /// Failed to handle unknown event. + HandleEventUnknownEvent, + /// Invalid descriptor chain. + InvalidDescriptorChain, + /// Failed to handle filesystem requests. + HandleRequests(fuse_backend_rs::Error), + /// Failed to construct new vhost user daemon. + NewDaemon, + /// Failed to start the vhost user daemon. + StartDaemon, + /// Failed to wait for the vhost user daemon. 
+ WaitDaemon, +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "vhost_user_fs_error: {self:?}") + } +} + +impl error::Error for Error {} + +impl convert::From<Error> for io::Error { + fn from(e: Error) -> Self { + io::Error::new(io::ErrorKind::Other, e) + } +} + +struct VhostUserFsBackend<FS> +where + FS: FileSystem + Send + Sync, +{ + server: Arc<Server<Arc<FS>>>, + event_idx: bool, + guest_mem: GuestMemoryAtomic<GuestMemoryMmap>, + cache_req: Option<SlaveFsCacheReq>, +} + +impl<FS> VhostUserFsBackend<FS> +where + FS: FileSystem + Send + Sync, +{ + fn process_queue(&mut self, vring: &mut MutexGuard<VringState>) -> std::io::Result<bool> { + let mut used_descs = false; + + while let Some(desc_chain) = vring + .get_queue_mut() + .pop_descriptor_chain(self.guest_mem.memory()) + { + let memory = desc_chain.memory(); + let reader = Reader::from_descriptor_chain(memory, desc_chain.clone()) + .map_err(|_| Error::InvalidDescriptorChain)?; + let writer = VirtioFsWriter::new(memory, desc_chain.clone()) + .map_err(|_| Error::InvalidDescriptorChain)?; + + self.server + .handle_message( + reader, + writer.into(), + self.cache_req + .as_mut() + .map(|req| req as &mut dyn FsCacheReqHandler), + None, + ) + .map_err(Error::HandleRequests)?; + + // TODO: Is len 0 correct? + if let Err(error) = vring + .get_queue_mut() + .add_used(memory, desc_chain.head_index(), 0) + { + error!(?error, "failed to add desc back to ring"); + } + + // TODO: What happens if we error out before here? + used_descs = true; + } + + let needs_notification = if self.event_idx { + match vring + .get_queue_mut() + .needs_notification(self.guest_mem.memory().deref()) + { + Ok(needs_notification) => needs_notification, + Err(error) => { + error!(?error, "failed to check if queue needs notification"); + true + } + } + } else { + true + }; + + if needs_notification { + if let Err(error) = vring.signal_used_queue() { + error!(?error, "failed to signal used queue"); + } + } + + Ok(used_descs) + } +} + +impl<FS> VhostUserBackendMut<VringMutex> for VhostUserFsBackend<FS> +where + FS: FileSystem + Send + Sync, +{ + fn num_queues(&self) -> usize { + NUM_QUEUES + } + + fn max_queue_size(&self) -> usize { + QUEUE_SIZE + } + + fn features(&self) -> u64 { + 1 << VIRTIO_F_VERSION_1 + | 1 << VIRTIO_RING_F_INDIRECT_DESC + | 1 << VIRTIO_RING_F_EVENT_IDX + | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::SLAVE_REQ + } + + fn set_event_idx(&mut self, enabled: bool) { + self.event_idx = enabled; + } + + fn update_memory(&mut self, _mem: GuestMemoryAtomic<GuestMemoryMmap>) -> std::io::Result<()> { + // This is what most the vhost user implementations do... 
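+        // (presumably fine: the GuestMemoryAtomic handle captured at
+        // construction time already tracks guest memory updates; this is an
+        // assumption, mirroring other vhost-user backends.)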
+ Ok(()) + } + + fn set_slave_req_fd(&mut self, cache_req: SlaveFsCacheReq) { + self.cache_req = Some(cache_req); + } + + fn handle_event( + &mut self, + device_event: u16, + evset: vmm_sys_util::epoll::EventSet, + vrings: &[VringMutex], + _thread_id: usize, + ) -> std::io::Result<bool> { + if evset != EventSet::IN { + return Err(Error::HandleEventNotEpollIn.into()); + } + + let mut queue = match device_event { + // High priority queue + 0 => vrings[0].get_mut(), + // Regurlar priority queue + 1 => vrings[1].get_mut(), + _ => { + return Err(Error::HandleEventUnknownEvent.into()); + } + }; + + if self.event_idx { + loop { + queue + .get_queue_mut() + .enable_notification(self.guest_mem.memory().deref()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + if !self.process_queue(&mut queue)? { + break; + } + } + } else { + self.process_queue(&mut queue)?; + } + + Ok(false) + } +} + +pub fn start_virtiofs_daemon<FS, P>(fs: FS, socket: P) -> io::Result<()> +where + FS: FileSystem + Send + Sync + 'static, + P: AsRef<Path>, +{ + let guest_mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); + + let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); + + let backend = Arc::new(RwLock::new(VhostUserFsBackend { + server, + guest_mem: guest_mem.clone(), + event_idx: false, + cache_req: None, + })); + + let listener = Listener::new(socket, true).unwrap(); + + let mut fs_daemon = + VhostUserDaemon::new(String::from("vhost-user-fs-tvix-store"), backend, guest_mem) + .map_err(|_| Error::NewDaemon)?; + + fs_daemon.start(listener).map_err(|_| Error::StartDaemon)?; + + fs_daemon.wait().map_err(|_| Error::WaitDaemon)?; + + Ok(()) +} diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs new file mode 100644 index 000000000000..f7677e63142d --- /dev/null +++ b/tvix/castore/src/import.rs @@ -0,0 +1,384 @@ +use crate::blobservice::BlobService; +use crate::directoryservice::DirectoryPutter; +use crate::directoryservice::DirectoryService; +use crate::proto::node::Node; +use crate::proto::Directory; +use crate::proto::DirectoryNode; +use crate::proto::FileNode; +use crate::proto::SymlinkNode; +use crate::Error as CastoreError; +use async_stream::stream; +use futures::pin_mut; +use futures::Stream; +use std::fs::FileType; + +#[cfg(target_family = "unix")] +use std::os::unix::ffi::OsStrExt; + +use std::{ + collections::HashMap, + fmt::Debug, + os::unix::prelude::PermissionsExt, + path::{Path, PathBuf}, +}; +use tokio_stream::StreamExt; +use tracing::instrument; +use walkdir::DirEntry; +use walkdir::WalkDir; + +#[cfg(debug_assertions)] +use std::collections::HashSet; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed to upload directory at {0}: {1}")] + UploadDirectoryError(PathBuf, CastoreError), + + #[error("invalid encoding encountered for entry {0:?}")] + InvalidEncoding(PathBuf), + + #[error("unable to stat {0}: {1}")] + UnableToStat(PathBuf, std::io::Error), + + #[error("unable to open {0}: {1}")] + UnableToOpen(PathBuf, std::io::Error), + + #[error("unable to read {0}: {1}")] + UnableToRead(PathBuf, std::io::Error), + + #[error("unsupported file {0} type: {1:?}")] + UnsupportedFileType(PathBuf, FileType), +} + +impl From<CastoreError> for Error { + fn from(value: CastoreError) -> Self { + match value { + CastoreError::InvalidRequest(_) => panic!("tvix bug"), + CastoreError::StorageError(_) => panic!("error"), + } + } +} + +impl From<Error> for std::io::Error { + fn from(value: Error) -> Self { + 
+        std::io::Error::new(std::io::ErrorKind::Other, value)
+    }
+}
+
+/// This processes a given [walkdir::DirEntry] and returns a
+/// proto::node::Node, depending on the type of the entry.
+///
+/// If the entry is a file, its contents are uploaded.
+/// If the entry is a directory, the so-far-assembled Directory object for it
+/// (containing all previously returned child nodes) must be passed in, and
+/// that directory is uploaded to the store as well.
+///
+/// It must only be called once all children of the entry have been processed,
+/// and it relies on the caller to add the returned nodes to the directories
+/// it assembles.
+#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
+async fn process_entry<'a, BS>(
+    blob_service: BS,
+    directory_putter: &'a mut Box<dyn DirectoryPutter>,
+    entry: &'a walkdir::DirEntry,
+    maybe_directory: Option<Directory>,
+) -> Result<Node, Error>
+where
+    BS: AsRef<dyn BlobService> + Clone,
+{
+    let file_type = entry.file_type();
+
+    if file_type.is_dir() {
+        let directory = maybe_directory
+            .expect("tvix bug: must be called with some directory in the case of directory");
+        let directory_digest = directory.digest();
+        let directory_size = directory.size();
+
+        // upload this directory
+        directory_putter
+            .put(directory)
+            .await
+            .map_err(|e| Error::UploadDirectoryError(entry.path().to_path_buf(), e))?;
+
+        return Ok(Node::Directory(DirectoryNode {
+            name: entry.file_name().as_bytes().to_owned().into(),
+            digest: directory_digest.into(),
+            size: directory_size,
+        }));
+    }
+
+    if file_type.is_symlink() {
+        let target: bytes::Bytes = std::fs::read_link(entry.path())
+            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?
+            .as_os_str()
+            .as_bytes()
+            .to_owned()
+            .into();
+
+        return Ok(Node::Symlink(SymlinkNode {
+            name: entry.file_name().as_bytes().to_owned().into(),
+            target,
+        }));
+    }
+
+    if file_type.is_file() {
+        let metadata = entry
+            .metadata()
+            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
+
+        let mut file = tokio::fs::File::open(entry.path())
+            .await
+            .map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?;
+
+        let mut writer = blob_service.as_ref().open_write().await;
+
+        if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
+            return Err(Error::UnableToRead(entry.path().to_path_buf(), e));
+        };
+
+        let digest = writer
+            .close()
+            .await
+            .map_err(|e| Error::UnableToRead(entry.path().to_path_buf(), e))?;
+
+        return Ok(Node::File(FileNode {
+            name: entry.file_name().as_bytes().to_vec().into(),
+            digest: digest.into(),
+            size: metadata.len(),
+            // If it's executable by the user, it'll become executable.
+            // This matches nix's dump() function behaviour.
+            executable: metadata.permissions().mode() & 64 != 0,
+        }));
+    }
+
+    // Nix says things like: error: file '/home/raito/dev/code.tvl.fyi/tvix/glue/src/tests/import_fixtures/a_devnode' has an unsupported type
+    Err(Error::UnsupportedFileType(
+        entry.path().to_path_buf(),
+        file_type,
+    ))
+}
+
+/// Walks the filesystem at a given path and returns a level-keyed list of directory entries.
+///
+/// This is how [`ingest_path`] assembles the set of entries to pass on to [`ingest_entries`].
+/// This low-level function can be used if additional filtering or processing is required on the
+/// entries.
+/// +/// Level here is in the context of graph theory, e.g. 2-level nodes +/// are nodes that are at depth 2. +/// +/// This function will walk the filesystem using `walkdir` and will consume +/// `O(#number of entries)` space. +#[instrument(fields(path), err)] +pub fn walk_path_for_ingestion<P>(path: P) -> Result<Vec<Vec<DirEntry>>, Error> +where + P: AsRef<Path> + std::fmt::Debug, +{ + let mut entries_per_depths: Vec<Vec<DirEntry>> = vec![Vec::new()]; + for entry in WalkDir::new(path.as_ref()) + .follow_links(false) + .follow_root_links(false) + .contents_first(false) + .sort_by_file_name() + .into_iter() + { + // Entry could be a NotFound, if the root path specified does not exist. + let entry = entry.map_err(|e| { + Error::UnableToOpen( + PathBuf::from(path.as_ref()), + e.into_io_error().expect("walkdir err must be some"), + ) + })?; + + if entry.depth() >= entries_per_depths.len() { + debug_assert!( + entry.depth() == entries_per_depths.len(), + "Received unexpected entry with depth {} during descent, previously at {}", + entry.depth(), + entries_per_depths.len() + ); + + entries_per_depths.push(vec![entry]); + } else { + entries_per_depths[entry.depth()].push(entry); + } + } + + Ok(entries_per_depths) +} + +/// Convert a leveled-key vector of filesystem entries into a stream of +/// [DirEntry] in a way that honors the Merkle invariant, i.e. from bottom to top. +pub fn leveled_entries_to_stream( + entries_per_depths: Vec<Vec<DirEntry>>, +) -> impl Stream<Item = DirEntry> { + stream! { + for level in entries_per_depths.into_iter().rev() { + for entry in level.into_iter() { + yield entry; + } + } + } +} + +/// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and +/// [DirectoryService]. It returns the root node or an error. +/// +/// It does not follow symlinks at the root, they will be ingested as actual symlinks. +#[instrument(skip(blob_service, directory_service), fields(path), err)] +pub async fn ingest_path<'a, BS, DS, P>( + blob_service: BS, + directory_service: DS, + path: P, +) -> Result<Node, Error> +where + P: AsRef<Path> + std::fmt::Debug, + BS: AsRef<dyn BlobService> + Clone, + DS: AsRef<dyn DirectoryService>, +{ + let entries = walk_path_for_ingestion(path)?; + let entries_stream = leveled_entries_to_stream(entries); + pin_mut!(entries_stream); + + ingest_entries(blob_service, directory_service, entries_stream).await +} + +/// The Merkle invariant checker is an internal structure to perform bookkeeping of all directory +/// entries we are ingesting and verifying we are ingesting them in the right order. +/// +/// That is, whenever we process an entry `L`, we would like to verify if we didn't process earlier +/// an entry `P` such that `P` is an **ancestor** of `L`. +/// +/// If such a thing happened, it means that we have processed something like: +/// +///```no_trust +/// A +/// / \ +/// B C +/// / \ \ +/// G F P <--------- processed before this one +/// / \ | +/// D E | +/// \ | +/// L <-----------------------------+ +/// ``` +/// +/// This is exactly what must never happen. +/// +/// Note: this checker is local, it can only see what happens on our side, not on the remote side, +/// i.e. the different remote services. +#[derive(Default)] +#[cfg(debug_assertions)] +struct MerkleInvariantChecker { + seen: HashSet<PathBuf>, +} + +#[cfg(debug_assertions)] +impl MerkleInvariantChecker { + /// See a directory entry and remember it. 
+    fn see(&mut self, node: &DirEntry) {
+        self.seen.insert(node.path().to_owned());
+    }
+
+    /// Returns a potential ancestor already seen for that directory entry.
+    fn find_ancestor(&self, node: &DirEntry) -> Option<PathBuf> {
+        for anc in node.path().ancestors() {
+            if self.seen.contains(anc) {
+                return Some(anc.to_owned());
+            }
+        }
+
+        None
+    }
+}
+
+/// Ingests elements from the given stream of [`DirEntry`] into the passed [`BlobService`] and
+/// [`DirectoryService`].
+/// It does not follow symlinks at the root; they will be ingested as actual symlinks.
+#[instrument(skip_all, ret, err)]
+pub async fn ingest_entries<'a, BS, DS, S>(
+    blob_service: BS,
+    directory_service: DS,
+    mut entries_async_iterator: S,
+) -> Result<Node, Error>
+where
+    BS: AsRef<dyn BlobService> + Clone,
+    DS: AsRef<dyn DirectoryService>,
+    S: Stream<Item = DirEntry> + std::marker::Unpin,
+{
+    let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
+
+    let mut directory_putter = directory_service.as_ref().put_multiple_start();
+
+    #[cfg(debug_assertions)]
+    let mut invariant_checker: MerkleInvariantChecker = Default::default();
+
+    // We need to process a directory's children before processing
+    // the directory itself in order to have all the data needed
+    // to compute the hash.
+    while let Some(entry) = entries_async_iterator.next().await {
+        #[cfg(debug_assertions)]
+        {
+            // If we find an ancestor before we see this entry, this means that the caller
+            // broke the contract, refer to the documentation of the invariant checker to
+            // understand the reasoning here.
+            if let Some(ancestor) = invariant_checker.find_ancestor(&entry) {
+                panic!(
+                    "Tvix bug: merkle invariant checker discovered that {} was processed before {}!",
+                    ancestor.display(),
+                    entry.path().display()
+                );
+            }
+
+            invariant_checker.see(&entry);
+        }
+
+        // FUTUREWORK: handle directory putting here, rather than piping it through process_entry.
+        let node = process_entry(
+            blob_service.clone(),
+            &mut directory_putter,
+            &entry,
+            // process_entry wants an Option<Directory> in case the entry points to a directory.
+            // make sure to provide it.
+            // If the directory has contents, we already have it in
+            // `directories` because we iterate over depth in reverse order (deepest to
+            // shallowest).
+            if entry.file_type().is_dir() {
+                Some(
+                    directories
+                        .remove(entry.path())
+                        // In that case, it contained no children
+                        .unwrap_or_default(),
+                )
+            } else {
+                None
+            },
+        )
+        .await?;
+
+        if entry.depth() == 0 {
+            // Make sure all the directories are flushed.
+            // FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal
+            // to `directories.get(entry.path())`.
+            if entry.file_type().is_dir() {
+                directory_putter.close().await?;
+            }
+            return Ok(node);
+        } else {
+            // calculate the parent path, and make sure we register the node there.
+            // NOTE: entry.depth() > 0
+            let parent_path = entry.path().parent().unwrap().to_path_buf();
+
+            // record node in parent directory, creating a new [proto::Directory] if not there yet.
+            let parent_directory = directories.entry(parent_path).or_default();
+            match node {
+                Node::Directory(e) => parent_directory.directories.push(e),
+                Node::File(e) => parent_directory.files.push(e),
+                Node::Symlink(e) => parent_directory.symlinks.push(e),
+            }
+        }
+    }
+    // unreachable, we already bailed out before if root doesn't exist.
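+    // (the stream yields entries deepest-first, so the root entry (depth 0)
+    // comes last, and we return from inside the loop when we reach it)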
+ unreachable!("Tvix bug: no root node emitted during ingestion") +} diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs new file mode 100644 index 000000000000..8da0edef786b --- /dev/null +++ b/tvix/castore/src/lib.rs @@ -0,0 +1,20 @@ +mod digests; +mod errors; + +pub mod blobservice; +pub mod directoryservice; +pub mod fixtures; + +#[cfg(feature = "fs")] +pub mod fs; + +pub mod import; +pub mod proto; +pub mod tonic; +pub mod utils; + +pub use digests::{B3Digest, B3_LEN}; +pub use errors::Error; + +#[cfg(test)] +mod tests; diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs new file mode 100644 index 000000000000..9f3f944da26f --- /dev/null +++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs @@ -0,0 +1,168 @@ +use crate::blobservice::BlobService; +use core::pin::pin; +use futures::{stream::BoxStream, TryFutureExt}; +use std::{ + collections::VecDeque, + ops::{Deref, DerefMut}, +}; +use tokio_stream::StreamExt; +use tokio_util::io::ReaderStream; +use tonic::{async_trait, Request, Response, Status, Streaming}; +use tracing::{instrument, warn}; + +pub struct GRPCBlobServiceWrapper<T> { + blob_service: T, +} + +impl<T> GRPCBlobServiceWrapper<T> { + pub fn new(blob_service: T) -> Self { + Self { blob_service } + } +} + +// This is necessary because bytes::BytesMut comes up with +// a default 64 bytes capacity that cannot be changed +// easily if you assume a bytes::BufMut trait implementation +// Therefore, we override the Default implementation here +// TODO(raitobezarius?): upstream me properly +struct BytesMutWithDefaultCapacity<const N: usize> { + inner: bytes::BytesMut, +} + +impl<const N: usize> Deref for BytesMutWithDefaultCapacity<N> { + type Target = bytes::BytesMut; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl<const N: usize> DerefMut for BytesMutWithDefaultCapacity<N> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl<const N: usize> Default for BytesMutWithDefaultCapacity<N> { + fn default() -> Self { + BytesMutWithDefaultCapacity { + inner: bytes::BytesMut::with_capacity(N), + } + } +} + +impl<const N: usize> bytes::Buf for BytesMutWithDefaultCapacity<N> { + fn remaining(&self) -> usize { + self.inner.remaining() + } + + fn chunk(&self) -> &[u8] { + self.inner.chunk() + } + + fn advance(&mut self, cnt: usize) { + self.inner.advance(cnt); + } +} + +unsafe impl<const N: usize> bytes::BufMut for BytesMutWithDefaultCapacity<N> { + fn remaining_mut(&self) -> usize { + self.inner.remaining_mut() + } + + unsafe fn advance_mut(&mut self, cnt: usize) { + self.inner.advance_mut(cnt); + } + + fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { + self.inner.chunk_mut() + } +} + +#[async_trait] +impl<T> super::blob_service_server::BlobService for GRPCBlobServiceWrapper<T> +where + T: Deref<Target = dyn BlobService> + Send + Sync + 'static, +{ + // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 + type ReadStream = BoxStream<'static, Result<super::BlobChunk, Status>>; + + #[instrument(skip(self))] + async fn stat( + &self, + request: Request<super::StatBlobRequest>, + ) -> Result<Response<super::StatBlobResponse>, Status> { + let rq = request.into_inner(); + let req_digest = rq + .digest + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + match self.blob_service.chunks(&req_digest).await { + Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), + Ok(Some(chunk_metas)) => 
Ok(Response::new(super::StatBlobResponse { + chunks: chunk_metas, + ..Default::default() + })), + Err(e) => Err(e.into()), + } + } + + #[instrument(skip(self))] + async fn read( + &self, + request: Request<super::ReadBlobRequest>, + ) -> Result<Response<Self::ReadStream>, Status> { + let rq = request.into_inner(); + + let req_digest = rq + .digest + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + match self.blob_service.open_read(&req_digest).await { + Ok(Some(r)) => { + let chunks_stream = + ReaderStream::new(r).map(|chunk| Ok(super::BlobChunk { data: chunk? })); + Ok(Response::new(Box::pin(chunks_stream))) + } + Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), + Err(e) => Err(e.into()), + } + } + + #[instrument(skip(self))] + async fn put( + &self, + request: Request<Streaming<super::BlobChunk>>, + ) -> Result<Response<super::PutBlobResponse>, Status> { + let req_inner = request.into_inner(); + + let data_stream = req_inner.map(|x| { + x.map(|x| VecDeque::from(x.data.to_vec())) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) + }); + + let mut data_reader = tokio_util::io::StreamReader::new(data_stream); + + let mut blob_writer = pin!(self.blob_service.open_write().await); + + tokio::io::copy(&mut data_reader, &mut blob_writer) + .await + .map_err(|e| { + warn!("error copying: {}", e); + Status::internal("error copying") + })?; + + let digest = blob_writer + .close() + .map_err(|e| { + warn!("error closing stream: {}", e); + Status::internal("error closing stream") + }) + .await?; + + Ok(Response::new(super::PutBlobResponse { + digest: digest.into(), + })) + } +} diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs new file mode 100644 index 000000000000..b83048045861 --- /dev/null +++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs @@ -0,0 +1,178 @@ +use crate::proto; +use crate::{directoryservice::DirectoryService, B3Digest}; +use futures::StreamExt; +use std::collections::HashMap; +use std::ops::Deref; +use tokio::sync::mpsc::channel; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{async_trait, Request, Response, Status, Streaming}; +use tracing::{debug, instrument, warn}; + +pub struct GRPCDirectoryServiceWrapper<T> { + directory_service: T, +} + +impl<T> GRPCDirectoryServiceWrapper<T> { + pub fn new(directory_service: T) -> Self { + Self { directory_service } + } +} + +#[async_trait] +impl<T> proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<T> +where + T: Deref<Target = dyn DirectoryService> + Send + Sync + 'static, +{ + type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>; + + #[instrument(skip(self))] + async fn get( + &self, + request: Request<proto::GetDirectoryRequest>, + ) -> Result<Response<Self::GetStream>, Status> { + let (tx, rx) = channel(5); + + let req_inner = request.into_inner(); + + // look at the digest in the request and put it in the top of the queue. 
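+        // (in the non-recursive case we send at most one Directory; in the
+        // recursive case we stream the whole closure, root-first)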
+ match &req_inner.by_what { + None => return Err(Status::invalid_argument("by_what needs to be specified")), + Some(proto::get_directory_request::ByWhat::Digest(ref digest)) => { + let digest: B3Digest = digest + .clone() + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + if !req_inner.recursive { + let e: Result<proto::Directory, Status> = + match self.directory_service.get(&digest).await { + Ok(Some(directory)) => Ok(directory), + Ok(None) => { + Err(Status::not_found(format!("directory {} not found", digest))) + } + Err(e) => Err(e.into()), + }; + + if tx.send(e).await.is_err() { + debug!("receiver dropped"); + } + } else { + // If recursive was requested, traverse via get_recursive. + let mut directories_it = self.directory_service.get_recursive(&digest); + + while let Some(e) = directories_it.next().await { + // map err in res from Error to Status + let res = e.map_err(|e| Status::internal(e.to_string())); + if tx.send(res).await.is_err() { + debug!("receiver dropped"); + break; + } + } + } + } + } + + let receiver_stream = ReceiverStream::new(rx); + Ok(Response::new(receiver_stream)) + } + + #[instrument(skip(self, request))] + async fn put( + &self, + request: Request<Streaming<proto::Directory>>, + ) -> Result<Response<proto::PutDirectoryResponse>, Status> { + let mut req_inner = request.into_inner(); + // TODO: let this use DirectoryPutter to the store it's connected to, + // and move the validation logic into [SimplePutter]. + + // This keeps track of the seen directory keys, and their size. + // This is used to validate the size field of a reference to a previously sent directory. + // We don't need to keep the contents around, they're stored in the DB. + // https://github.com/rust-lang/rust-clippy/issues/5812 + #[allow(clippy::mutable_key_type)] + let mut seen_directories_sizes: HashMap<B3Digest, u64> = HashMap::new(); + let mut last_directory_dgst: Option<B3Digest> = None; + + // Consume directories, and insert them into the store. + // Reject directory messages that refer to Directories not sent in the same stream. + while let Some(directory) = req_inner.message().await? { + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Status::invalid_argument(format!( + "directory {} failed validation: {}", + directory.digest(), + e, + ))); + } + + // for each child directory this directory refers to, we need + // to ensure it has been seen already in this stream, and that the size + // matches what we recorded. 
+            for child_directory in &directory.directories {
+                let child_directory_digest: B3Digest = child_directory
+                    .digest
+                    .clone()
+                    .try_into()
+                    .map_err(|_e| Status::internal("invalid child directory digest len"))?;
+
+                match seen_directories_sizes.get(&child_directory_digest) {
+                    None => {
+                        return Err(Status::invalid_argument(format!(
+                            "child directory '{:?}' ({}) in directory '{}' not seen yet",
+                            child_directory.name,
+                            &child_directory_digest,
+                            &directory.digest(),
+                        )));
+                    }
+                    Some(seen_child_directory_size) => {
+                        if seen_child_directory_size != &child_directory.size {
+                            return Err(Status::invalid_argument(format!(
+                                "child directory '{:?}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}",
+                                child_directory.name,
+                                &child_directory_digest,
+                                &directory.digest(),
+                                seen_child_directory_size,
+                                child_directory.size,
+                            )));
+                        }
+                    }
+                }
+            }
+
+            // NOTE: We can't know if a directory we're receiving actually is
+            // part of the closure, because we receive directories from the leaf nodes up to
+            // the root.
+            // The only thing we could do would be a final check, once the last
+            // Directory is received, that all Directories received so far are
+            // reachable from that (root) node.
+
+            let dgst = directory.digest();
+            seen_directories_sizes.insert(dgst.clone(), directory.size());
+            last_directory_dgst = Some(dgst.clone());
+
+            // check if the directory already exists in the database. We can skip
+            // inserting if it's already there, as that'd be a no-op.
+            match self.directory_service.get(&dgst).await {
+                Err(e) => {
+                    warn!("error checking if directory already exists: {}", e);
+                    return Err(e.into());
+                }
+                // skip if already exists
+                Ok(Some(_)) => {}
+                // insert if it doesn't already exist
+                Ok(None) => {
+                    self.directory_service.put(directory).await?;
+                }
+            }
+        }
+
+        // We're done receiving. Peek at last_directory_dgst and either return
+        // the digest, or an error, if we received an empty stream.
+        match last_directory_dgst {
+            None => Err(Status::invalid_argument("no directories received")),
+            Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse {
+                root_digest: last_directory_dgst.into(),
+            })),
+        }
+    }
+}
diff --git a/tvix/castore/src/proto/mod.rs b/tvix/castore/src/proto/mod.rs
new file mode 100644
index 000000000000..59f5c1fdf3f6
--- /dev/null
+++ b/tvix/castore/src/proto/mod.rs
@@ -0,0 +1,387 @@
+#![allow(clippy::derive_partial_eq_without_eq, non_snake_case)]
+// https://github.com/hyperium/tonic/issues/1056
+use bstr::ByteSlice;
+use std::{collections::HashSet, iter::Peekable, str};
+
+use prost::Message;
+
+mod grpc_blobservice_wrapper;
+mod grpc_directoryservice_wrapper;
+
+pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper;
+pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper;
+
+use crate::{B3Digest, B3_LEN};
+
+tonic::include_proto!("tvix.castore.v1");
+
+#[cfg(feature = "tonic-reflection")]
+/// Compiled file descriptors for implementing [gRPC
+/// reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) with e.g.
+/// [`tonic_reflection`](https://docs.rs/tonic-reflection).
+pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix.castore.v1");
+
+#[cfg(test)]
+mod tests;
+
+/// Errors that can occur during the validation of [Directory] messages.
+#[derive(Debug, PartialEq, Eq, thiserror::Error)]
+pub enum ValidateDirectoryError {
+    /// Elements are not in sorted order
+    #[error("{:?} is not sorted", .0.as_bstr())]
+    WrongSorting(Vec<u8>),
+    /// Multiple elements with the same name encountered
+    #[error("{:?} is a duplicate name", .0.as_bstr())]
+    DuplicateName(Vec<u8>),
+    /// Invalid node
+    #[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())]
+    InvalidNode(Vec<u8>, ValidateNodeError),
+    #[error("Total size exceeds u64::MAX")]
+    SizeOverflow,
+}
+
+/// Errors that occur during Node validation
+#[derive(Debug, PartialEq, Eq, thiserror::Error)]
+pub enum ValidateNodeError {
+    #[error("No node set")]
+    NoNodeSet,
+    /// Invalid digest length encountered
+    #[error("Invalid Digest length: {0}")]
+    InvalidDigestLen(usize),
+    /// Invalid name encountered
+    #[error("Invalid name: {}", .0.as_bstr())]
+    InvalidName(Vec<u8>),
+    /// Invalid symlink target
+    #[error("Invalid symlink target: {}", .0.as_bstr())]
+    InvalidSymlinkTarget(Vec<u8>),
+}
+
+/// Errors that occur during StatBlobResponse validation
+#[derive(Debug, PartialEq, Eq, thiserror::Error)]
+pub enum ValidateStatBlobResponseError {
+    /// Invalid digest length encountered
+    #[error("Invalid digest length {0} for chunk #{1}")]
+    InvalidDigestLen(usize, usize),
+}
+
+/// Checks a Node name for validity as an intermediate node.
+/// We disallow slashes, null bytes, '.', '..' and the empty string.
+fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> {
+    if name.is_empty()
+        || name == b".."
+        || name == b"."
+        || name.contains(&0x00)
+        || name.contains(&b'/')
+    {
+        Err(ValidateNodeError::InvalidName(name.to_owned()))
+    } else {
+        Ok(())
+    }
+}
+
+/// NamedNode is implemented for [FileNode], [DirectoryNode], [SymlinkNode]
+/// and [node::Node], so we can ask all of them for the name easily.
+pub trait NamedNode {
+    fn get_name(&self) -> &[u8];
+}
+
+impl NamedNode for &FileNode {
+    fn get_name(&self) -> &[u8] {
+        &self.name
+    }
+}
+
+impl NamedNode for &DirectoryNode {
+    fn get_name(&self) -> &[u8] {
+        &self.name
+    }
+}
+
+impl NamedNode for &SymlinkNode {
+    fn get_name(&self) -> &[u8] {
+        &self.name
+    }
+}
+
+impl NamedNode for node::Node {
+    fn get_name(&self) -> &[u8] {
+        match self {
+            node::Node::File(node_file) => &node_file.name,
+            node::Node::Directory(node_directory) => &node_directory.name,
+            node::Node::Symlink(node_symlink) => &node_symlink.name,
+        }
+    }
+}
+
+impl Node {
+    /// Ensures the node has a valid enum kind (is Some), and passes its
+    /// per-enum validation.
+    pub fn validate(&self) -> Result<(), ValidateNodeError> {
+        if let Some(node) = self.node.as_ref() {
+            node.validate()
+        } else {
+            Err(ValidateNodeError::NoNodeSet)
+        }
+    }
+}
+
+impl node::Node {
+    /// Returns the node with a new name.
+    pub fn rename(self, name: bytes::Bytes) -> Self {
+        match self {
+            node::Node::Directory(n) => node::Node::Directory(DirectoryNode { name, ..n }),
+            node::Node::File(n) => node::Node::File(FileNode { name, ..n }),
+            node::Node::Symlink(n) => node::Node::Symlink(SymlinkNode { name, ..n }),
+        }
+    }
+
+    /// Ensures the node has a valid name, and checks the type-specific fields too.
+    pub fn validate(&self) -> Result<(), ValidateNodeError> {
+        match self {
+            // for a directory root node, ensure the digest has the appropriate size.
+            node::Node::Directory(directory_node) => {
+                if directory_node.digest.len() != B3_LEN {
+                    Err(ValidateNodeError::InvalidDigestLen(
+                        directory_node.digest.len(),
+                    ))?;
+                }
+                validate_node_name(&directory_node.name)
+            }
+            // for a file root node, ensure the digest has the appropriate size.
+            node::Node::File(file_node) => {
+                if file_node.digest.len() != B3_LEN {
+                    Err(ValidateNodeError::InvalidDigestLen(file_node.digest.len()))?;
+                }
+                validate_node_name(&file_node.name)
+            }
+            // ensure the symlink target is not empty and doesn't contain null bytes.
+            node::Node::Symlink(symlink_node) => {
+                if symlink_node.target.is_empty() || symlink_node.target.contains(&b'\0') {
+                    Err(ValidateNodeError::InvalidSymlinkTarget(
+                        symlink_node.target.to_vec(),
+                    ))?;
+                }
+                validate_node_name(&symlink_node.name)
+            }
+        }
+    }
+}
+
+impl Eq for node::Node {}
+
+impl PartialOrd for node::Node {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for node::Node {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.get_name().cmp(other.get_name())
+    }
+}
+
+/// Accepts a name, and a mutable reference to the previous name.
+/// If the passed name is not smaller than the previous one, the reference is
+/// updated; otherwise, an error is returned.
+fn update_if_lt_prev<'n>(
+    prev_name: &mut &'n [u8],
+    name: &'n [u8],
+) -> Result<(), ValidateDirectoryError> {
+    if *name < **prev_name {
+        return Err(ValidateDirectoryError::WrongSorting(name.to_vec()));
+    }
+    *prev_name = name;
+    Ok(())
+}
+
+/// Inserts the given name into a HashSet if it's not already in there.
+/// If it is, an error is returned.
+fn insert_once<'n>(
+    seen_names: &mut HashSet<&'n [u8]>,
+    name: &'n [u8],
+) -> Result<(), ValidateDirectoryError> {
+    if seen_names.get(name).is_some() {
+        return Err(ValidateDirectoryError::DuplicateName(name.to_vec()));
+    }
+    seen_names.insert(name);
+    Ok(())
+}
+
+fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> {
+    iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i))
+}
+
+impl Directory {
+    /// The size of a directory is the sum of the number of regular file,
+    /// symlink and directory elements, plus the size fields of all directory
+    /// elements.
+    pub fn size(&self) -> u64 {
+        if cfg!(debug_assertions) {
+            self.size_checked()
+                .expect("Directory::size exceeds u64::MAX")
+        } else {
+            self.size_checked().unwrap_or(u64::MAX)
+        }
+    }
+
+    fn size_checked(&self) -> Option<u64> {
+        checked_sum([
+            self.files.len().try_into().ok()?,
+            self.symlinks.len().try_into().ok()?,
+            self.directories.len().try_into().ok()?,
+            checked_sum(self.directories.iter().map(|e| e.size))?,
+        ])
+    }
+
+    /// Calculates the digest of a Directory, which is the blake3 hash of a
+    /// Directory protobuf message, serialized in protobuf canonical form.
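+    ///
+    /// For example, an empty Directory message encodes to the empty byte
+    /// string, so its digest is simply the blake3 hash of empty input (this
+    /// is exercised by the `digest` test in `proto/tests/directory.rs`).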
+    pub fn digest(&self) -> B3Digest {
+        let mut hasher = blake3::Hasher::new();
+
+        hasher
+            .update(&self.encode_to_vec())
+            .finalize()
+            .as_bytes()
+            .into()
+    }
+
+    /// validate checks the directory for invalid data, such as:
+    /// - violations of name restrictions
+    /// - invalid digest lengths
+    /// - not properly sorted lists
+    /// - duplicate names in the three lists
+    pub fn validate(&self) -> Result<(), ValidateDirectoryError> {
+        let mut seen_names: HashSet<&[u8]> = HashSet::new();
+
+        let mut last_directory_name: &[u8] = b"";
+        let mut last_file_name: &[u8] = b"";
+        let mut last_symlink_name: &[u8] = b"";
+
+        // check directories
+        for directory_node in &self.directories {
+            node::Node::Directory(directory_node.clone())
+                .validate()
+                .map_err(|e| {
+                    ValidateDirectoryError::InvalidNode(directory_node.name.to_vec(), e)
+                })?;
+
+            update_if_lt_prev(&mut last_directory_name, &directory_node.name)?;
+            insert_once(&mut seen_names, &directory_node.name)?;
+        }
+
+        // check files
+        for file_node in &self.files {
+            node::Node::File(file_node.clone())
+                .validate()
+                .map_err(|e| ValidateDirectoryError::InvalidNode(file_node.name.to_vec(), e))?;
+
+            update_if_lt_prev(&mut last_file_name, &file_node.name)?;
+            insert_once(&mut seen_names, &file_node.name)?;
+        }
+
+        // check symlinks
+        for symlink_node in &self.symlinks {
+            node::Node::Symlink(symlink_node.clone())
+                .validate()
+                .map_err(|e| ValidateDirectoryError::InvalidNode(symlink_node.name.to_vec(), e))?;
+
+            update_if_lt_prev(&mut last_symlink_name, &symlink_node.name)?;
+            insert_once(&mut seen_names, &symlink_node.name)?;
+        }
+
+        self.size_checked()
+            .ok_or(ValidateDirectoryError::SizeOverflow)?;
+
+        Ok(())
+    }
+
+    /// Allows iterating over all three nodes ([DirectoryNode], [FileNode],
+    /// [SymlinkNode]) in an ordered fashion, as long as the individual lists
+    /// are sorted (which can be checked by [Directory::validate]).
+    pub fn nodes(&self) -> DirectoryNodesIterator {
+        return DirectoryNodesIterator {
+            i_directories: self.directories.iter().peekable(),
+            i_files: self.files.iter().peekable(),
+            i_symlinks: self.symlinks.iter().peekable(),
+        };
+    }
+}
+
+impl StatBlobResponse {
+    /// Validates a StatBlobResponse. All chunks must have valid blake3 digests.
+    /// It is allowed to send an empty list, if no more granular chunking is
+    /// available.
+    pub fn validate(&self) -> Result<(), ValidateStatBlobResponseError> {
+        for (i, chunk) in self.chunks.iter().enumerate() {
+            if chunk.digest.len() != blake3::KEY_LEN {
+                return Err(ValidateStatBlobResponseError::InvalidDigestLen(
+                    chunk.digest.len(),
+                    i,
+                ));
+            }
+        }
+        Ok(())
+    }
+}
+
+/// Struct to hold the state of an iterator over all nodes of a Directory.
+///
+/// Internally, this keeps peekable Iterators over all three lists of a
+/// Directory message.
+pub struct DirectoryNodesIterator<'a> {
+    // directory: &Directory,
+    i_directories: Peekable<std::slice::Iter<'a, DirectoryNode>>,
+    i_files: Peekable<std::slice::Iter<'a, FileNode>>,
+    i_symlinks: Peekable<std::slice::Iter<'a, SymlinkNode>>,
+}
+
+/// Looks at two elements implementing NamedNode, and returns true if "left
+/// is smaller / comes first".
+///
+/// Some(_) is preferred over None.
+fn left_name_lt_right<A: NamedNode, B: NamedNode>(left: Option<&A>, right: Option<&B>) -> bool {
+    match left {
+        // if left is None, right always wins
+        None => false,
+        Some(left_inner) => {
+            // left is Some.
+            match right {
+                // left is Some, right is None - left wins.
+ None => true, + Some(right_inner) => { + // both are Some - compare the name. + return left_inner.get_name() < right_inner.get_name(); + } + } + } + } +} + +impl Iterator for DirectoryNodesIterator<'_> { + type Item = node::Node; + + // next returns the next node in the Directory. + // we peek at all three internal iterators, and pick the one with the + // smallest name, to ensure lexicographical ordering. + // The individual lists are already known to be sorted. + fn next(&mut self) -> Option<Self::Item> { + if left_name_lt_right(self.i_directories.peek(), self.i_files.peek()) { + // i_directories is still in the game, compare with symlinks + if left_name_lt_right(self.i_directories.peek(), self.i_symlinks.peek()) { + self.i_directories + .next() + .cloned() + .map(node::Node::Directory) + } else { + self.i_symlinks.next().cloned().map(node::Node::Symlink) + } + } else { + // i_files is still in the game, compare with symlinks + if left_name_lt_right(self.i_files.peek(), self.i_symlinks.peek()) { + self.i_files.next().cloned().map(node::Node::File) + } else { + self.i_symlinks.next().cloned().map(node::Node::Symlink) + } + } + } +} diff --git a/tvix/castore/src/proto/tests/directory.rs b/tvix/castore/src/proto/tests/directory.rs new file mode 100644 index 000000000000..5fda394775b5 --- /dev/null +++ b/tvix/castore/src/proto/tests/directory.rs @@ -0,0 +1,373 @@ +use crate::proto::{ + Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError, ValidateNodeError, +}; + +use hex_literal::hex; + +const DUMMY_DIGEST: [u8; 32] = [0; 32]; + +#[test] +fn size() { + { + let d = Directory::default(); + assert_eq!(d.size(), 0); + } + { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 0, + }], + ..Default::default() + }; + assert_eq!(d.size(), 1); + } + { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 4, + }], + ..Default::default() + }; + assert_eq!(d.size(), 5); + } + { + let d = Directory { + files: vec![FileNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + executable: false, + }], + ..Default::default() + }; + assert_eq!(d.size(), 1); + } + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: "foo".into(), + target: "bar".into(), + }], + ..Default::default() + }; + assert_eq!(d.size(), 1); + } +} + +#[test] +#[cfg_attr(not(debug_assertions), ignore)] +#[should_panic = "Directory::size exceeds u64::MAX"] +fn size_unchecked_panic() { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX, + }], + ..Default::default() + }; + + d.size(); +} + +#[test] +#[cfg_attr(debug_assertions, ignore)] +fn size_unchecked_saturate() { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX, + }], + ..Default::default() + }; + + assert_eq!(d.size(), u64::MAX); +} + +#[test] +fn size_checked() { + // We don't test the overflow cases that rely purely on immediate + // child count, since that would take an absurd amount of memory. 
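+    // Instead, these cases trigger the overflow via the size fields of the
+    // directory nodes below.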
+ { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX - 1, + }], + ..Default::default() + }; + assert_eq!(d.size_checked(), Some(u64::MAX)); + } + { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX, + }], + ..Default::default() + }; + assert_eq!(d.size_checked(), None); + } + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX / 2, + }, + DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX / 2, + }, + ], + ..Default::default() + }; + assert_eq!(d.size_checked(), None); + } +} + +#[test] +fn digest() { + let d = Directory::default(); + + assert_eq!( + d.digest(), + (&hex!("af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262")).into() + ) +} + +#[test] +fn validate_empty() { + let d = Directory::default(); + assert_eq!(d.validate(), Ok(())); +} + +#[test] +fn validate_invalid_names() { + { + let d = Directory { + directories: vec![DirectoryNode { + name: "".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { + assert_eq!(n, b"") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + directories: vec![DirectoryNode { + name: ".".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { + assert_eq!(n, b".") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + files: vec![FileNode { + name: "..".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + executable: false, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { + assert_eq!(n, b"..") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: "\x00".into(), + target: "foo".into(), + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { + assert_eq!(n, b"\x00") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: "foo/bar".into(), + target: "foo".into(), + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { + assert_eq!(n, b"foo/bar") + } + _ => panic!("unexpected error"), + }; + } +} + +#[test] +fn validate_invalid_digest() { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: vec![0x00, 0x42].into(), // invalid length + size: 42, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => { + assert_eq!(n, 2) + } + _ => panic!("unexpected error"), + } +} + +#[test] +fn validate_sorting() { + // "b" comes before "a", bad. 
+    {
+        let d = Directory {
+            directories: vec![
+                DirectoryNode {
+                    name: "b".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+                DirectoryNode {
+                    name: "a".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+            ],
+            ..Default::default()
+        };
+        match d.validate().expect_err("must fail") {
+            ValidateDirectoryError::WrongSorting(s) => {
+                assert_eq!(s, b"a");
+            }
+            _ => panic!("unexpected error"),
+        }
+    }
+
+    // "a" exists twice, bad.
+    {
+        let d = Directory {
+            directories: vec![
+                DirectoryNode {
+                    name: "a".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+                DirectoryNode {
+                    name: "a".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+            ],
+            ..Default::default()
+        };
+        match d.validate().expect_err("must fail") {
+            ValidateDirectoryError::DuplicateName(s) => {
+                assert_eq!(s, b"a");
+            }
+            _ => panic!("unexpected error"),
+        }
+    }
+
+    // "a" comes before "b", all good.
+    {
+        let d = Directory {
+            directories: vec![
+                DirectoryNode {
+                    name: "a".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+                DirectoryNode {
+                    name: "b".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+            ],
+            ..Default::default()
+        };
+
+        d.validate().expect("validate shouldn't error");
+    }
+
+    // [b, c] and [a] are both properly sorted.
+    {
+        let d = Directory {
+            directories: vec![
+                DirectoryNode {
+                    name: "b".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+                DirectoryNode {
+                    name: "c".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+            ],
+            symlinks: vec![SymlinkNode {
+                name: "a".into(),
+                target: "foo".into(),
+            }],
+            ..Default::default()
+        };
+
+        d.validate().expect("validate shouldn't error");
+    }
+}
+
+#[test]
+fn validate_overflow() {
+    let d = Directory {
+        directories: vec![DirectoryNode {
+            name: "foo".into(),
+            digest: DUMMY_DIGEST.to_vec().into(),
+            size: u64::MAX,
+        }],
+        ..Default::default()
+    };
+
+    match d.validate().expect_err("must fail") {
+        ValidateDirectoryError::SizeOverflow => {}
+        _ => panic!("unexpected error"),
+    }
+}
diff --git a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs
new file mode 100644
index 000000000000..68f147a33210
--- /dev/null
+++ b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs
@@ -0,0 +1,78 @@
+use crate::proto::Directory;
+use crate::proto::DirectoryNode;
+use crate::proto::FileNode;
+use crate::proto::NamedNode;
+use crate::proto::SymlinkNode;
+
+#[test]
+fn iterator() {
+    let d = Directory {
+        directories: vec![
+            DirectoryNode {
+                name: "c".into(),
+                ..DirectoryNode::default()
+            },
+            DirectoryNode {
+                name: "d".into(),
+                ..DirectoryNode::default()
+            },
+            DirectoryNode {
+                name: "h".into(),
+                ..DirectoryNode::default()
+            },
+            DirectoryNode {
+                name: "l".into(),
+                ..DirectoryNode::default()
+            },
+        ],
+        files: vec![
+            FileNode {
+                name: "b".into(),
+                ..FileNode::default()
+            },
+            FileNode {
+                name: "e".into(),
+                ..FileNode::default()
+            },
+            FileNode {
+                name: "g".into(),
+                ..FileNode::default()
+            },
+            FileNode {
+                name: "j".into(),
+                ..FileNode::default()
+            },
+        ],
+        symlinks: vec![
+            SymlinkNode {
+                name: "a".into(),
+                ..SymlinkNode::default()
+            },
+            SymlinkNode {
+                name: "f".into(),
+                ..SymlinkNode::default()
+            },
+            SymlinkNode {
+                name: "i".into(),
+                ..SymlinkNode::default()
+            },
+            SymlinkNode {
+                name: "k".into(),
+                ..SymlinkNode::default()
+            },
+        ],
+    };
+
+    // We keep these names as strings here and convert them to String to make
+    // the comparison less messy.
+ let mut node_names: Vec<String> = vec![]; + + for node in d.nodes() { + node_names.push(String::from_utf8(node.get_name().to_vec()).unwrap()); + } + + assert_eq!( + vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"], + node_names + ); +} diff --git a/tvix/castore/src/proto/tests/grpc_blobservice.rs b/tvix/castore/src/proto/tests/grpc_blobservice.rs new file mode 100644 index 000000000000..fb202b7d8a51 --- /dev/null +++ b/tvix/castore/src/proto/tests/grpc_blobservice.rs @@ -0,0 +1,94 @@ +use crate::fixtures::{BLOB_A, BLOB_A_DIGEST}; +use crate::proto::{BlobChunk, ReadBlobRequest, StatBlobRequest}; +use crate::utils::gen_blobsvc_grpc_client; +use tokio_stream::StreamExt; + +/// Trying to read a non-existent blob should return a not found error. +#[tokio::test] +async fn not_found_read() { + let mut grpc_client = gen_blobsvc_grpc_client().await; + + let resp = grpc_client + .read(ReadBlobRequest { + digest: BLOB_A_DIGEST.clone().into(), + }) + .await; + + // We can't use unwrap_err here, because the Ok value doesn't implement + // debug. + if let Err(e) = resp { + assert_eq!(e.code(), tonic::Code::NotFound); + } else { + panic!("resp is not err") + } +} + +/// Trying to stat a non-existent blob should return a not found error. +#[tokio::test] +async fn not_found_stat() { + let mut grpc_client = gen_blobsvc_grpc_client().await; + + let resp = grpc_client + .stat(StatBlobRequest { + digest: BLOB_A_DIGEST.clone().into(), + ..Default::default() + }) + .await + .expect_err("must fail"); + + // The resp should be a status with Code::NotFound + assert_eq!(resp.code(), tonic::Code::NotFound); +} + +/// Put a blob in the store, get it back. +#[tokio::test] +async fn put_read_stat() { + let mut grpc_client = gen_blobsvc_grpc_client().await; + + // Send blob A. + let put_resp = grpc_client + .put(tokio_stream::once(BlobChunk { + data: BLOB_A.clone(), + })) + .await + .expect("must succeed") + .into_inner(); + + assert_eq!(BLOB_A_DIGEST.as_slice(), put_resp.digest); + + // Stat for the digest of A. + // We currently don't ask for more granular chunking data, as we don't + // expose it yet. + let _resp = grpc_client + .stat(StatBlobRequest { + digest: BLOB_A_DIGEST.clone().into(), + ..Default::default() + }) + .await + .expect("must succeed") + .into_inner(); + + // Read the blob. It should return the same data. + let resp = grpc_client + .read(ReadBlobRequest { + digest: BLOB_A_DIGEST.clone().into(), + }) + .await; + + let mut rx = resp.ok().unwrap().into_inner(); + + // the stream should contain one element, a BlobChunk with the same contents as BLOB_A. + let item = rx + .next() + .await + .expect("must be some") + .expect("must succeed"); + + assert_eq!(BLOB_A.clone(), item.data); + + // … and no more elements + assert!(rx.next().await.is_none()); + + // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks. 
+    // Test with some bigger blob too
+}
diff --git a/tvix/castore/src/proto/tests/grpc_directoryservice.rs b/tvix/castore/src/proto/tests/grpc_directoryservice.rs
new file mode 100644
index 000000000000..1b522472be5b
--- /dev/null
+++ b/tvix/castore/src/proto/tests/grpc_directoryservice.rs
@@ -0,0 +1,246 @@
+use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
+use crate::proto::directory_service_client::DirectoryServiceClient;
+use crate::proto::get_directory_request::ByWhat;
+use crate::proto::GetDirectoryRequest;
+use crate::proto::{Directory, DirectoryNode, SymlinkNode};
+use crate::utils::gen_directorysvc_grpc_client;
+use tokio_stream::StreamExt;
+use tonic::transport::Channel;
+use tonic::Status;
+
+/// Send the specified GetDirectoryRequest.
+/// Returns an error in the case of an error response, or an error in one of
+/// the items in the stream, or a Vec<Directory> in the case of a successful
+/// request.
+async fn get_directories(
+    grpc_client: &mut DirectoryServiceClient<Channel>,
+    get_directory_request: GetDirectoryRequest,
+) -> Result<Vec<Directory>, Status> {
+    let resp = grpc_client.get(get_directory_request).await;
+
+    // if the response is an error itself, return the error, otherwise unpack
+    let stream = match resp {
+        Ok(resp) => resp,
+        Err(status) => return Err(status),
+    }
+    .into_inner();
+
+    let directory_results: Vec<Result<Directory, Status>> = stream.collect().await;
+
+    // turn Vec<Result<Directory, Status>> into Result<Vec<Directory>, Status>
+    directory_results.into_iter().collect()
+}
+
+/// Trying to get a non-existent Directory should return a not found error.
+#[tokio::test]
+async fn not_found() {
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
+
+    let resp = grpc_client
+        .get(GetDirectoryRequest {
+            by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())),
+            ..Default::default()
+        })
+        .await;
+
+    let stream = resp.expect("must succeed").into_inner();
+
+    let items: Vec<_> = stream.collect().await;
+
+    // The stream should contain one element, an error with Code::NotFound.
+    assert_eq!(1, items.len());
+    let item = items[0].clone();
+
+    assert!(item.is_err(), "must be err");
+    assert_eq!(
+        tonic::Code::NotFound,
+        item.unwrap_err().code(),
+        "must be err"
+    );
+}
+
+/// Put a Directory into the store, get it back.
+#[tokio::test]
+async fn put_get() {
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
+
+    // send directory A.
+    let put_resp = {
+        grpc_client
+            .put(tokio_stream::once(DIRECTORY_A.clone()))
+            .await
+            .expect("must succeed")
+            .into_inner()
+    };
+
+    // the sent root_digest should match the calculated digest
+    assert_eq!(put_resp.root_digest, DIRECTORY_A.digest().as_slice());
+
+    // get it back
+    let items = get_directories(
+        &mut grpc_client,
+        GetDirectoryRequest {
+            by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())),
+            ..Default::default()
+        },
+    )
+    .await
+    .expect("must not error");
+
+    assert_eq!(vec![DIRECTORY_A.clone()], items);
+}
+
+/// Put multiple Directories into the store, and get them back.
+#[tokio::test]
+async fn put_get_multiple() {
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
+
+    // sending "b" (which refers to "a") without sending "a" first should fail.
+    let put_resp = {
+        grpc_client
+            .put(tokio_stream::once(DIRECTORY_B.clone()))
+            .await
+            .expect_err("must fail")
+    };
+
+    assert_eq!(tonic::Code::InvalidArgument, put_resp.code());
+
+    // sending "a", then "b" should succeed, and the response should contain the digest of b.
+ let put_resp = { + grpc_client + .put(tokio_stream::iter(vec![ + DIRECTORY_A.clone(), + DIRECTORY_B.clone(), + ])) + .await + .expect("must succeed") + .into_inner() + }; + + assert_eq!(DIRECTORY_B.digest().as_slice(), put_resp.root_digest); + + // now, request b, first in non-recursive mode. + let items = get_directories( + &mut grpc_client, + GetDirectoryRequest { + recursive: false, + by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())), + }, + ) + .await + .expect("must not error"); + + // We expect to only get b. + assert_eq!(vec![DIRECTORY_B.clone()], items); + + // now, request b, but in recursive mode. + let items = get_directories( + &mut grpc_client, + GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())), + }, + ) + .await + .expect("must not error"); + + // We expect to get b, and then a, because that's how we traverse down. + assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], items); +} + +/// Put multiple Directories into the store, and omit duplicates. +#[tokio::test] +async fn put_get_dedup() { + let mut grpc_client = gen_directorysvc_grpc_client().await; + + // Send "A", then "C", which refers to "A" two times + // Pretend we're a dumb client sending A twice. + let put_resp = { + grpc_client + .put(tokio_stream::iter(vec![ + DIRECTORY_A.clone(), + DIRECTORY_A.clone(), + DIRECTORY_C.clone(), + ])) + .await + .expect("must succeed") + }; + + assert_eq!( + DIRECTORY_C.digest().as_slice(), + put_resp.into_inner().root_digest + ); + + // Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice. + let items = get_directories( + &mut grpc_client, + GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(DIRECTORY_C.digest().into())), + }, + ) + .await + .expect("must not error"); + + // We expect to get C, and then A (once, as the second A has been deduplicated). + assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items); +} + +/// Trying to upload a Directory failing validation should fail. +#[tokio::test] +async fn put_reject_failed_validation() { + let mut grpc_client = gen_directorysvc_grpc_client().await; + + // construct a broken Directory message that fails validation + let broken_directory = Directory { + symlinks: vec![SymlinkNode { + name: "".into(), + target: "doesntmatter".into(), + }], + ..Default::default() + }; + assert!(broken_directory.validate().is_err()); + + // send it over, it must fail + let put_resp = { + grpc_client + .put(tokio_stream::once(broken_directory)) + .await + .expect_err("must fail") + }; + + assert_eq!(put_resp.code(), tonic::Code::InvalidArgument); +} + +/// Trying to upload a Directory with wrong size should fail. +#[tokio::test] +async fn put_reject_wrong_size() { + let mut grpc_client = gen_directorysvc_grpc_client().await; + + // Construct a directory referring to DIRECTORY_A, but with wrong size. + let broken_parent_directory = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DIRECTORY_A.digest().into(), + size: 42, + }], + ..Default::default() + }; + // Make sure we got the size wrong. + assert_ne!( + broken_parent_directory.directories[0].size, + DIRECTORY_A.size() + ); + + // now upload both (first A, then the broken parent). This must fail. 
+    let put_resp = {
+        grpc_client
+            .put(tokio_stream::iter(vec![
+                DIRECTORY_A.clone(),
+                broken_parent_directory,
+            ]))
+            .await
+            .expect_err("must fail")
+    };
+    assert_eq!(put_resp.code(), tonic::Code::InvalidArgument);
+}
diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs
new file mode 100644
index 000000000000..8b62fadeb5a6
--- /dev/null
+++ b/tvix/castore/src/proto/tests/mod.rs
@@ -0,0 +1,4 @@
+mod directory;
+mod directory_nodes_iterator;
+mod grpc_blobservice;
+mod grpc_directoryservice;
diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs
new file mode 100644
index 000000000000..99e993f36da3
--- /dev/null
+++ b/tvix/castore/src/tests/import.rs
@@ -0,0 +1,126 @@
+use crate::blobservice::BlobService;
+use crate::fixtures::*;
+use crate::import::ingest_path;
+use crate::proto;
+use crate::utils::{gen_blob_service, gen_directory_service};
+use std::sync::Arc;
+use tempfile::TempDir;
+
+#[cfg(target_family = "unix")]
+use std::os::unix::ffi::OsStrExt;
+
+#[cfg(target_family = "unix")]
+#[tokio::test]
+async fn symlink() {
+    let blob_service = gen_blob_service();
+    let directory_service = gen_directory_service();
+
+    let tmpdir = TempDir::new().unwrap();
+
+    std::fs::create_dir_all(&tmpdir).unwrap();
+    std::os::unix::fs::symlink(
+        "/nix/store/somewhereelse",
+        tmpdir.path().join("doesntmatter"),
+    )
+    .unwrap();
+
+    let root_node = ingest_path(
+        Arc::from(blob_service),
+        directory_service,
+        tmpdir.path().join("doesntmatter"),
+    )
+    .await
+    .expect("must succeed");
+
+    assert_eq!(
+        proto::node::Node::Symlink(proto::SymlinkNode {
+            name: "doesntmatter".into(),
+            target: "/nix/store/somewhereelse".into(),
+        }),
+        root_node,
+    )
+}
+
+#[tokio::test]
+async fn single_file() {
+    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
+    let directory_service = gen_directory_service();
+
+    let tmpdir = TempDir::new().unwrap();
+
+    std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap();
+
+    let root_node = ingest_path(
+        blob_service.clone(),
+        directory_service,
+        tmpdir.path().join("root"),
+    )
+    .await
+    .expect("must succeed");
+
+    assert_eq!(
+        proto::node::Node::File(proto::FileNode {
+            name: "root".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+            executable: false,
+        }),
+        root_node,
+    );
+
+    // ensure the blob has been uploaded
+    assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
+}
+
+#[cfg(target_family = "unix")]
+#[tokio::test]
+async fn complicated() {
+    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
+    let directory_service = gen_directory_service();
+
+    let tmpdir = TempDir::new().unwrap();
+
+    // File `.keep`
+    std::fs::write(tmpdir.path().join(".keep"), vec![]).unwrap();
+    // Symlink `aa`
+    std::os::unix::fs::symlink("/nix/store/somewhereelse", tmpdir.path().join("aa")).unwrap();
+    // Directory `keep`
+    std::fs::create_dir(tmpdir.path().join("keep")).unwrap();
+    // File `keep/.keep`
+    std::fs::write(tmpdir.path().join("keep").join(".keep"), vec![]).unwrap();
+
+    let root_node = ingest_path(blob_service.clone(), &directory_service, tmpdir.path())
+        .await
+        .expect("must succeed");
+
+    // ensure root_node matched expectations
+    assert_eq!(
+        proto::node::Node::Directory(proto::DirectoryNode {
+            name: tmpdir
+                .path()
+                .file_name()
+                .unwrap()
+                .as_bytes()
+                .to_owned()
+                .into(),
+            digest: DIRECTORY_COMPLICATED.digest().into(),
+            size: DIRECTORY_COMPLICATED.size(),
+        }),
+        root_node,
+    );
+
+    // ensure DIRECTORY_WITH_KEEP and DIRECTORY_COMPLICATED have been uploaded
+    assert!(directory_service
+        .get(&DIRECTORY_WITH_KEEP.digest())
+        .await
+        .unwrap()
+        .is_some());
+    assert!(directory_service
+        .get(&DIRECTORY_COMPLICATED.digest())
+        .await
+        .unwrap()
+        .is_some());
+
+    // ensure EMPTY_BLOB_CONTENTS has been uploaded
+    assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
+}
diff --git a/tvix/castore/src/tests/mod.rs b/tvix/castore/src/tests/mod.rs
new file mode 100644
index 000000000000..d016f3e0aa55
--- /dev/null
+++ b/tvix/castore/src/tests/mod.rs
@@ -0,0 +1 @@
+mod import;
diff --git a/tvix/castore/src/tonic.rs b/tvix/castore/src/tonic.rs
new file mode 100644
index 000000000000..8ea417548ce9
--- /dev/null
+++ b/tvix/castore/src/tonic.rs
@@ -0,0 +1,118 @@
+use tokio::net::UnixStream;
+use tonic::transport::{Channel, Endpoint};
+
+fn url_wants_wait_connect(url: &url::Url) -> bool {
+    url.query_pairs()
+        .filter(|(k, v)| k == "wait-connect" && v == "1")
+        .count()
+        > 0
+}
+
+/// Turn a [url::Url] into a [Channel] if it can be parsed successfully.
+/// It supports the following schemes (and URLs):
+/// - `grpc+http://[::1]:8000`, connecting over unencrypted HTTP/2 (h2c)
+/// - `grpc+https://[::1]:8000`, connecting over encrypted HTTP/2
+/// - `grpc+unix:/path/to/socket`, connecting to a unix domain socket
+///
+/// All URLs support adding `wait-connect=1` as a URL parameter, in which case
+/// the connection is established eagerly, erroring out early if the endpoint
+/// is unreachable. By default, the connection is established lazily on first
+/// use.
+pub async fn channel_from_url(url: &url::Url) -> Result<Channel, self::Error> {
+    match url.scheme() {
+        "grpc+unix" => {
+            if url.host_str().is_some() {
+                return Err(Error::HostSetForUnixSocket());
+            }
+
+            let connector = tower::service_fn({
+                let url = url.clone();
+                move |_: tonic::transport::Uri| UnixStream::connect(url.path().to_string().clone())
+            });
+
+            // the URL doesn't matter
+            let endpoint = Endpoint::from_static("http://[::]:50051");
+            if url_wants_wait_connect(url) {
+                Ok(endpoint.connect_with_connector(connector).await?)
+            } else {
+                Ok(endpoint.connect_with_connector_lazy(connector))
+            }
+        }
+        _ => {
+            // ensure path is empty, not supported with gRPC.
+            if !url.path().is_empty() {
+                return Err(Error::PathMayNotBeSet());
+            }
+
+            // Stringify the URL and remove the grpc+ prefix.
+            // We can't use `url.set_scheme(rest)`, as it disallows
+            // setting something http(s) that previously wasn't.
+            let unprefixed_url_str = match url.to_string().strip_prefix("grpc+") {
+                None => return Err(Error::MissingGRPCPrefix()),
+                Some(url_str) => url_str.to_owned(),
+            };
+
+            // Use the regular tonic transport::Endpoint logic, but unprefixed_url_str,
+            // as tonic doesn't know about grpc+http[s].
+            let endpoint = Endpoint::try_from(unprefixed_url_str)?;
+            if url_wants_wait_connect(url) {
+                Ok(endpoint.connect().await?)
diff --git a/tvix/castore/src/tonic.rs b/tvix/castore/src/tonic.rs
new file mode 100644
index 000000000000..8ea417548ce9
--- /dev/null
+++ b/tvix/castore/src/tonic.rs
@@ -0,0 +1,118 @@
+use tokio::net::UnixStream;
+use tonic::transport::{Channel, Endpoint};
+
+fn url_wants_wait_connect(url: &url::Url) -> bool {
+    url.query_pairs()
+        .any(|(k, v)| k == "wait-connect" && v == "1")
+}
+
+/// Turn a [url::Url] into a [Channel] if it can be parsed successfully.
+/// It supports the following schemes (and URLs):
+/// - `grpc+http://[::1]:8000`, connecting over unencrypted HTTP/2 (h2c)
+/// - `grpc+https://[::1]:8000`, connecting over encrypted HTTP/2
+/// - `grpc+unix:/path/to/socket`, connecting to a unix domain socket
+///
+/// All URLs support adding `wait-connect=1` as a URL parameter, in which case
+/// the connection is established eagerly: the call only returns once the
+/// connection succeeded, and fails otherwise. By default, connections are
+/// established lazily on first use.
+pub async fn channel_from_url(url: &url::Url) -> Result<Channel, self::Error> {
+    match url.scheme() {
+        "grpc+unix" => {
+            if url.host_str().is_some() {
+                return Err(Error::HostSetForUnixSocket());
+            }
+
+            let connector = tower::service_fn({
+                let url = url.clone();
+                move |_: tonic::transport::Uri| UnixStream::connect(url.path().to_string())
+            });
+
+            // the URL doesn't matter, the connector ignores it
+            let endpoint = Endpoint::from_static("http://[::]:50051");
+            if url_wants_wait_connect(url) {
+                Ok(endpoint.connect_with_connector(connector).await?)
+            } else {
+                Ok(endpoint.connect_with_connector_lazy(connector))
+            }
+        }
+        _ => {
+            // ensure path is empty, not supported with gRPC.
+            if !url.path().is_empty() {
+                return Err(Error::PathMayNotBeSet());
+            }
+
+            // Stringify the URL and remove the grpc+ prefix.
+            // We can't use `url.set_scheme(rest)`, as it disallows
+            // setting something http(s) that previously wasn't.
+            let unprefixed_url_str = match url.to_string().strip_prefix("grpc+") {
+                None => return Err(Error::MissingGRPCPrefix()),
+                Some(url_str) => url_str.to_owned(),
+            };
+
+            // Use the regular tonic transport::Endpoint logic with
+            // unprefixed_url_str, as tonic doesn't know about grpc+http[s].
+            let endpoint = Endpoint::try_from(unprefixed_url_str)?;
+            if url_wants_wait_connect(url) {
+                Ok(endpoint.connect().await?)
+            } else {
+                Ok(endpoint.connect_lazy())
+            }
+        }
+    }
+}
+
+/// Errors occurring when trying to connect to a backend
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("grpc+ prefix is missing from URL")]
+    MissingGRPCPrefix(),
+
+    #[error("host may not be set for unix domain sockets")]
+    HostSetForUnixSocket(),

+    #[error("path may not be set")]
+    PathMayNotBeSet(),
+
+    #[error("transport error: {0}")]
+    TransportError(tonic::transport::Error),
+}
+
+impl From<tonic::transport::Error> for Error {
+    fn from(value: tonic::transport::Error) -> Self {
+        Self::TransportError(value)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::channel_from_url;
+    use test_case::test_case;
+    use url::Url;
+
+    /// Correct scheme to connect to a unix socket.
+    #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")]
+    /// Connecting with wait-connect set to 0 succeeds, as lazy connecting is the default.
+    #[test_case("grpc+unix:///path/to/somewhere?wait-connect=0", true; "grpc valid unix wait-connect=0")]
+    /// Connecting with wait-connect set to 1 fails, as the path doesn't exist.
+    #[test_case("grpc+unix:///path/to/somewhere?wait-connect=1", false; "grpc valid unix wait-connect=1")]
+    /// Correct scheme for unix socket, but setting a host too, which is invalid.
+    #[test_case("grpc+unix://host.example/path/to/somewhere", false; "grpc invalid unix socket and host")]
+    /// Correct scheme to connect to localhost, with port 12345.
+    #[test_case("grpc+http://[::1]:12345", true; "grpc valid IPv6 localhost port 12345")]
+    /// Correct scheme to connect to localhost over http, without specifying a port.
+    #[test_case("grpc+http://localhost", true; "grpc valid http host without port")]
+    /// Correct scheme to connect to localhost over https, without specifying a port.
+    #[test_case("grpc+https://localhost", true; "grpc valid https host without port")]
+    /// Correct scheme to connect to localhost over http, but with an additional path, which is invalid.
+    #[test_case("grpc+http://localhost/some-path", false; "grpc valid host with invalid path")]
+    /// Connecting with wait-connect set to 0 succeeds, as lazy connecting is the default.
+    #[test_case("grpc+http://localhost?wait-connect=0", true; "grpc valid host wait-connect=0")]
+    /// Connecting with wait-connect set to 1 fails, as the host doesn't exist.
+    #[test_case("grpc+http://nonexist.invalid?wait-connect=1", false; "grpc valid host wait-connect=1")]
+    #[tokio::test]
+    async fn test_from_addr_tokio(uri_str: &str, is_ok: bool) {
+        let url = Url::parse(uri_str).expect("must parse");
+        assert_eq!(channel_from_url(&url).await.is_ok(), is_ok)
+    }
+}
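channel_from_url only hands back a Channel; wrapping it in one of the generated clients is up to the caller. A minimal sketch of that flow, assuming the crate is consumed as tvix_castore with this module exported as tonic, and with a placeholder socket path:

    use tvix_castore::proto::blob_service_client::BlobServiceClient;
    use tvix_castore::tonic::channel_from_url;
    use url::Url;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // wait-connect=1: fail here if nothing is listening on the socket,
        // instead of on the first request. The path is a placeholder.
        let url = Url::parse("grpc+unix:///run/example/blobs.sock?wait-connect=1")?;
        let channel = channel_from_url(&url).await?;

        // The Channel is service-agnostic; any generated client can wrap it.
        let _blob_client = BlobServiceClient::new(channel);
        Ok(())
    }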
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs
new file mode 100644
index 000000000000..ca68f9f26c5a
--- /dev/null
+++ b/tvix/castore/src/utils.rs
@@ -0,0 +1,89 @@
+//! A module containing constructors to provide instances of a BlobService and
+//! DirectoryService. Only used for testing purposes, but across crates.
+//! Should be removed once we have a better concept of a "Service registry".
+use tonic::transport::{Channel, Endpoint, Server, Uri};
+
+use crate::{
+    blobservice::{BlobService, MemoryBlobService},
+    directoryservice::{DirectoryService, MemoryDirectoryService},
+    proto::{
+        blob_service_client::BlobServiceClient, blob_service_server::BlobServiceServer,
+        directory_service_client::DirectoryServiceClient,
+        directory_service_server::DirectoryServiceServer, GRPCBlobServiceWrapper,
+        GRPCDirectoryServiceWrapper,
+    },
+};
+
+pub fn gen_blob_service() -> Box<dyn BlobService> {
+    Box::<MemoryBlobService>::default()
+}
+
+pub fn gen_directory_service() -> Box<dyn DirectoryService> {
+    Box::<MemoryDirectoryService>::default()
+}
+
+/// This will spawn a gRPC server wrapping a DirectoryService, connect a
+/// gRPC DirectoryService client to it, and return the client.
+#[allow(dead_code)]
+pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> {
+    let (left, right) = tokio::io::duplex(64);
+
+    // spin up a server which accepts a single connection, on the left side of the duplex.
+    tokio::spawn(async {
+        // spin up a new DirectoryService
+        let mut server = Server::builder();
+        let router = server.add_service(DirectoryServiceServer::new(
+            GRPCDirectoryServiceWrapper::new(gen_directory_service()),
+        ));
+
+        router
+            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+            .await
+    });
+
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+    DirectoryServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    )
+}
+
+/// This will spawn a gRPC server wrapping a BlobService, connect a
+/// gRPC BlobService client to it, and return the client.
+#[allow(dead_code)]
+pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> {
+    let (left, right) = tokio::io::duplex(64);
+
+    // spin up a server which accepts a single connection, on the left side of the duplex.
+    tokio::spawn(async {
+        // spin up a new BlobService
+        let mut server = Server::builder();
+        let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
+            gen_blob_service(),
+        )));
+
+        router
+            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+            .await
+    });
+
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+    BlobServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    )
+}
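As a usage reference, a test in another module can obtain a fully wired client from these helpers and talk to the in-memory server directly, with no sockets involved. A sketch mirroring the grpc_directoryservice tests earlier in this diff, assuming DIRECTORY_A is a valid, self-contained Directory fixture from crate::fixtures:

    use crate::fixtures::DIRECTORY_A;
    use crate::utils::gen_directorysvc_grpc_client;

    #[tokio::test]
    async fn put_single_directory() {
        // client and server are connected over a tokio::io::duplex pair
        let mut grpc_client = gen_directorysvc_grpc_client().await;

        // upload a single, self-contained Directory; the server should accept it.
        grpc_client
            .put(tokio_stream::iter(vec![DIRECTORY_A.clone()]))
            .await
            .expect("must succeed");
    }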