diff options
-rw-r--r-- | tvix/store/src/dummy_directory_service.rs | 35 | ||||
-rw-r--r-- | tvix/store/src/lib.rs | 2 | ||||
-rw-r--r-- | tvix/store/src/main.rs | 3 | ||||
-rw-r--r-- | tvix/store/src/sled_directory_service.rs | 249 | ||||
-rw-r--r-- | tvix/store/src/tests/directory_service.rs | 158 | ||||
-rw-r--r-- | tvix/store/src/tests/mod.rs | 1 |
6 files changed, 411 insertions, 37 deletions
diff --git a/tvix/store/src/dummy_directory_service.rs b/tvix/store/src/dummy_directory_service.rs deleted file mode 100644 index 183f146b5814..000000000000 --- a/tvix/store/src/dummy_directory_service.rs +++ /dev/null @@ -1,35 +0,0 @@ -use tokio_stream::wrappers::ReceiverStream; - -use crate::proto::directory_service_server::DirectoryService; -use crate::proto::Directory; -use crate::proto::GetDirectoryRequest; -use crate::proto::PutDirectoryResponse; -use tonic::{Request, Response, Result, Status, Streaming}; -use tracing::{instrument, warn}; - -const NOT_IMPLEMENTED_MSG: &str = "not implemented"; - -pub struct DummyDirectoryService {} - -#[tonic::async_trait] -impl DirectoryService for DummyDirectoryService { - type GetStream = ReceiverStream<Result<Directory>>; - - #[instrument(skip(self))] - async fn get( - &self, - _request: Request<GetDirectoryRequest>, - ) -> Result<Response<Self::GetStream>, Status> { - warn!(NOT_IMPLEMENTED_MSG); - Err(Status::unimplemented(NOT_IMPLEMENTED_MSG)) - } - - #[instrument(skip(self, _request))] - async fn put( - &self, - _request: Request<Streaming<Directory>>, - ) -> Result<Response<PutDirectoryResponse>> { - warn!(NOT_IMPLEMENTED_MSG); - Err(Status::unimplemented(NOT_IMPLEMENTED_MSG)) - } -} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index 8d7d45d028b9..d0e505733a74 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -3,7 +3,7 @@ pub mod proto; pub mod store_path; pub mod dummy_blob_service; -pub mod dummy_directory_service; +pub mod sled_directory_service; pub mod sled_path_info_service; #[cfg(test)] diff --git a/tvix/store/src/main.rs b/tvix/store/src/main.rs index 4fa09f28a285..69b7722f8d7e 100644 --- a/tvix/store/src/main.rs +++ b/tvix/store/src/main.rs @@ -35,7 +35,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut server = Server::builder(); let blob_service = tvix_store::dummy_blob_service::DummyBlobService {}; - let directory_service = tvix_store::dummy_directory_service::DummyDirectoryService {}; + let directory_service = + tvix_store::sled_directory_service::SledDirectoryService::new("directories.sled".into())?; let path_info_service = tvix_store::sled_path_info_service::SledPathInfoService::new("pathinfo.sled".into())?; diff --git a/tvix/store/src/sled_directory_service.rs b/tvix/store/src/sled_directory_service.rs new file mode 100644 index 000000000000..a966601fc423 --- /dev/null +++ b/tvix/store/src/sled_directory_service.rs @@ -0,0 +1,249 @@ +use data_encoding::BASE64; +use std::collections::HashSet; +use std::collections::VecDeque; +use std::path::PathBuf; +use tokio::sync::mpsc::channel; +use tokio::sync::mpsc::Sender; + +use prost::Message; +use tokio::task; +use tokio_stream::wrappers::ReceiverStream; + +use crate::proto::directory_service_server::DirectoryService; +use crate::proto::get_directory_request::ByWhat; +use crate::proto::Directory; +use crate::proto::GetDirectoryRequest; +use crate::proto::PutDirectoryResponse; +use tonic::{Request, Response, Result, Status, Streaming}; +use tracing::{debug, instrument, warn}; + +pub struct SledDirectoryService { + db: sled::Db, +} + +impl SledDirectoryService { + pub fn new(p: PathBuf) -> Result<Self, anyhow::Error> { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { db }) + } +} + +/// Lookup a directory, optionally recurse, and send the result to the passed sender. +/// We pass in a txn, that has been opened on the outside. +/// It's up to the user to wrap this in a TransactionError appropriately. +/// This will open a sled txn to ensure a consistent view. +fn send_directories( + txn: &sled::transaction::TransactionalTree, + tx: &Sender<Result<Directory>>, + req: GetDirectoryRequest, +) -> Result<(), Status> { + // keep the list of directories to traverse + let mut deq: VecDeque<Vec<u8>> = VecDeque::new(); + + // look at the digest in the request and put it in the top of the queue. + match &req.by_what { + None => return Err(Status::invalid_argument("by_what needs to be specified")), + Some(ByWhat::Digest(digest)) => { + if digest.len() != 32 { + return Err(Status::invalid_argument("invalid digest length")); + } + deq.push_back(digest.clone()); + } + } + + // look up the directory at the top of the queue + while let Some(ref digest) = deq.pop_front() { + let digest_b64: String = BASE64.encode(digest); + + match txn.get(digest) { + // The directory was not found, abort + Ok(None) => { + return Err(Status::not_found(format!( + "directory {} not found", + digest_b64 + ))) + } + // The directory was found, try to parse the data as Directory message + Ok(Some(data)) => match Directory::decode(&*data) { + Err(e) => { + warn!("unable to parse directory {}: {}", digest_b64, e); + return Err(Status::internal(format!( + "unable to parse directory {}", + digest_b64 + ))); + } + Ok(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != digest.clone() { + return Err(Status::data_loss(format!( + "requested directory with digest {}, but got {}", + digest_b64, + BASE64.encode(&actual_digest) + ))); + } + + // if recursion was requested, all its children need to be added to the queue. + if req.recursive { + for child_directory_node in &directory.directories { + deq.push_back(child_directory_node.digest.clone()); + } + } + + // send the directory message to the client + if let Err(e) = tx.blocking_send(Ok(directory)) { + debug!("error sending: {}", e); + return Err(Status::internal("error sending")); + } + } + }, + // some storage error? + Err(e) => { + // TODO: check what this error really means + warn!("storage error: {}", e); + return Err(Status::internal("storage error")); + } + }; + } + + // nothing left, we're done + Ok(()) +} + +/// Consume directories, and insert them into the database. +/// Reject directory messages that refer to Directories not sent in the same stream. +fn insert_directories( + txn: &sled::transaction::TransactionalTree, + directories: &Vec<Directory>, +) -> Result<Vec<u8>, Status> { + // This keeps track of the seen directory keys. + // We don't need to keep the contents around, they're stored in the DB. + let mut seen_directory_dgsts: HashSet<Vec<u8>> = HashSet::new(); + let mut last_directory_dgst: Option<Vec<u8>> = None; + + for directory in directories { + // for each child directory this directory refers to, we need + // to ensure it has been seen already in this stream. + for child_directory in &directory.directories { + if !seen_directory_dgsts.contains(&child_directory.digest) { + return Err(Status::invalid_argument(format!( + "referred child directory {} not seen yet", + BASE64.encode(&child_directory.digest) + ))); + } + } + + // TODO: do we want to verify this somehow is connected to the current graph? + // theoretically, this currently allows uploading multiple + // disconnected graphs at the same time… + + let dgst = directory.digest(); + seen_directory_dgsts.insert(dgst.clone()); + last_directory_dgst = Some(dgst.clone()); + + // insert the directory into the database + if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) { + match e { + // TODO: conflict is a problem, as the whole transaction closure is retried? + sled::transaction::UnabortableTransactionError::Conflict => todo!(), + sled::transaction::UnabortableTransactionError::Storage(e) => { + warn!("storage error: {}", e); + return Err(Status::internal("storage error")); + } + } + } + } + + // no more directories to be send. Return the digest of the + // last (root) node, or an error if we don't receive a single one. + match last_directory_dgst { + Some(last_directory_dgst) => Ok(last_directory_dgst), + None => Err(Status::invalid_argument("no directories received")), + } +} + +#[tonic::async_trait] +impl DirectoryService for SledDirectoryService { + type GetStream = ReceiverStream<Result<Directory>>; + + #[instrument(skip(self))] + async fn get( + &self, + request: Request<GetDirectoryRequest>, + ) -> Result<Response<Self::GetStream>, Status> { + let (tx, rx) = channel(5); + + let req_inner = request.into_inner(); + + // clone self.db (cheap), so we don't refer to self in the thread. + let db = self.db.clone(); + + // kick off a thread + task::spawn_blocking(move || { + // open a DB transaction. + let txn_res = db.transaction(|txn| { + send_directories(txn, &tx, req_inner.clone()) + .map_err(sled::transaction::ConflictableTransactionError::Abort) + }); + + // handle transaction errors + match txn_res { + Ok(()) => Ok(()), + Err(sled::transaction::TransactionError::Abort(status)) => { + // if the transaction was aborted, there was an error. Send it to the client + tx.blocking_send(Err(status)) + } + Err(sled::transaction::TransactionError::Storage(e)) => { + warn!("storage error: {}", e); + tx.blocking_send(Err(Status::internal("storage error"))) + } + } + }); + + // NOTE: this always returns an Ok response, with the first item in the + // stream being a potential error, instead of directly returning the + // first error. + // Peeking on the first element seems to be extraordinarily hard, + // and client code considers these two equivalent anyways. + let receiver_stream = ReceiverStream::new(rx); + Ok(Response::new(receiver_stream)) + } + + #[instrument(skip(self, request))] + async fn put( + &self, + request: Request<Streaming<Directory>>, + ) -> Result<Response<PutDirectoryResponse>> { + let mut req_inner = request.into_inner(); + + // TODO: for now, we collect all Directory messages into a Vec, and + // pass it to the helper function. + // Ideally, we would validate things as we receive it, and have a way + // to reject early - but the semantics of transactions (and its + // retries) don't allow us to discard things that early anyways. + let mut directories: Vec<Directory> = Vec::new(); + + while let Some(directory) = req_inner.message().await? { + directories.push(directory) + } + + let txn_res = self.db.transaction(|txn| { + insert_directories(txn, &directories) + .map_err(sled::transaction::ConflictableTransactionError::Abort) + }); + + match txn_res { + Ok(last_directory_dgst) => Ok(Response::new(PutDirectoryResponse { + root_digest: last_directory_dgst, + })), + Err(sled::transaction::TransactionError::Storage(e)) => { + warn!("storage error: {}", e); + Err(Status::internal("storage error")) + } + Err(sled::transaction::TransactionError::Abort(e)) => Err(e), + } + } +} diff --git a/tvix/store/src/tests/directory_service.rs b/tvix/store/src/tests/directory_service.rs new file mode 100644 index 000000000000..c16feb243066 --- /dev/null +++ b/tvix/store/src/tests/directory_service.rs @@ -0,0 +1,158 @@ +use tempfile::TempDir; +use tokio_stream::StreamExt; +use tonic::Status; + +use crate::proto::directory_service_server::DirectoryService; +use crate::proto::get_directory_request::ByWhat; +use crate::proto::GetDirectoryRequest; +use crate::proto::{Directory, DirectoryNode}; +use crate::sled_directory_service::SledDirectoryService; +use lazy_static::lazy_static; + +lazy_static! { + static ref DIRECTORY_A: Directory = Directory::default(); + static ref DIRECTORY_B: Directory = Directory { + directories: vec![DirectoryNode { + name: "a".to_string(), + digest: DIRECTORY_A.digest(), + size: DIRECTORY_A.size(), + }], + ..Default::default() + }; +} + +/// Send the specified GetDirectoryRequest. +/// Returns an error in the case of an error response, or an error in one of the items in the stream, +/// or a Vec<Directory> in the case of a successful request. +async fn get_directories<S: DirectoryService>( + svc: &S, + get_directory_request: GetDirectoryRequest, +) -> Result<Vec<Directory>, Status> { + let resp = svc.get(tonic::Request::new(get_directory_request)).await; + + // if the response is an error itself, return the error, otherwise unpack + let stream = match resp { + Ok(resp) => resp, + Err(status) => return Err(status), + } + .into_inner(); + + let directory_results: Vec<Result<Directory, Status>> = stream.collect().await; + + // turn Vec<Result<Directory, Status> into Result<Vec<Directory>,Status> + directory_results.into_iter().collect() +} + +/// Trying to get a non-existent Directory should return a not found error. +#[tokio::test] +async fn not_found() -> anyhow::Result<()> { + let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?; + + let resp = service + .get(tonic::Request::new(GetDirectoryRequest { + by_what: Some(ByWhat::Digest(DIRECTORY_A.digest())), + ..Default::default() + })) + .await; + + let mut rx = resp.expect("must succeed").into_inner().into_inner(); + + // The stream should contain one element, an error with Code::NotFound. + let item = rx + .recv() + .await + .expect("must be some") + .expect_err("must be err"); + assert_eq!(item.code(), tonic::Code::NotFound); + + // … and nothing else + assert!(rx.recv().await.is_none()); + + Ok(()) +} + +/// Put a Directory into the store, get it back. +#[tokio::test] +async fn put_get() -> anyhow::Result<()> { + let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?; + + let streaming_request = tonic_mock::streaming_request(vec![DIRECTORY_A.clone()]); + let put_resp = service + .put(streaming_request) + .await + .expect("must succeed") + .into_inner(); + + // the sent root_digest should match the calculated digest + assert_eq!(put_resp.root_digest, DIRECTORY_A.digest()); + + // get it back + let items = get_directories( + &service, + GetDirectoryRequest { + by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().to_vec())), + ..Default::default() + }, + ) + .await + .expect("must not error"); + + assert_eq!(vec![DIRECTORY_A.clone()], items); + + Ok(()) +} + +/// Put multiple Directories into the store, and get them back +#[tokio::test] +async fn put_get_multiple() -> anyhow::Result<()> { + let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?; + + // sending "b" (which refers to "a") without sending "a" first should fail. + let put_resp = service + .put(tonic_mock::streaming_request(vec![DIRECTORY_B.clone()])) + .await + .expect_err("must fail"); + + assert_eq!(tonic::Code::InvalidArgument, put_resp.code()); + + // sending "a", then "b" should succeed, and the response should contain the digest of b. + let put_resp = service + .put(tonic_mock::streaming_request(vec![ + DIRECTORY_A.clone(), + DIRECTORY_B.clone(), + ])) + .await + .expect("must succeed"); + + assert_eq!(DIRECTORY_B.digest(), put_resp.into_inner().root_digest); + + // now, request b, first in non-recursive mode. + let items = get_directories( + &service, + GetDirectoryRequest { + recursive: false, + by_what: Some(ByWhat::Digest(DIRECTORY_B.digest())), + }, + ) + .await + .expect("must not error"); + + // We expect to only get b. + assert_eq!(vec![DIRECTORY_B.clone()], items); + + // now, request b, but in recursive mode. + let items = get_directories( + &service, + GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(DIRECTORY_B.digest())), + }, + ) + .await + .expect("must not error"); + + // We expect to get b, and then a, because that's how we traverse down. + assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], items); + + Ok(()) +} diff --git a/tvix/store/src/tests/mod.rs b/tvix/store/src/tests/mod.rs index 3f5d044d658a..8bfe60d8620e 100644 --- a/tvix/store/src/tests/mod.rs +++ b/tvix/store/src/tests/mod.rs @@ -1,6 +1,7 @@ use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError}; use lazy_static::lazy_static; +mod directory_service; mod path_info_service; mod pathinfo; |