diff options
author | Florian Klink <flokli@flokli.de> | 2023-02-13T17·58+0100 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-03-10T10·58+0000 |
commit | 7fe7e03df32977da2bea7531a5142b24af971cf8 (patch) | |
tree | d722c164bce492ef6986d5f86d7ee790903438b1 /tvix | |
parent | 6b91efa5cf6d09e65dc06dfc56376b678eafa033 (diff) |
chore(tvix/store): remove old implementations r/5923
These were implementing against the (more complicated) gRPC interface, for which we now have a wrapper. Change-Id: I0a8284493718ab99618a1d21a76df4d173edb899 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8100 Reviewed-by: raitobezarius <tvl@lahfa.xyz> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/store/src/client.rs | 10 | ||||
-rw-r--r-- | tvix/store/src/dummy_blob_service.rs | 43 | ||||
-rw-r--r-- | tvix/store/src/lib.rs | 5 | ||||
-rw-r--r-- | tvix/store/src/sled_directory_service.rs | 300 | ||||
-rw-r--r-- | tvix/store/src/sled_path_info_service.rs | 89 | ||||
-rw-r--r-- | tvix/store/src/tests/directory_service.rs | 268 | ||||
-rw-r--r-- | tvix/store/src/tests/mod.rs | 2 | ||||
-rw-r--r-- | tvix/store/src/tests/path_info_service.rs | 67 |
8 files changed, 0 insertions, 784 deletions
diff --git a/tvix/store/src/client.rs b/tvix/store/src/client.rs deleted file mode 100644 index 3b282eacdd70..000000000000 --- a/tvix/store/src/client.rs +++ /dev/null @@ -1,10 +0,0 @@ -use crate::proto::Directory; - -pub trait StoreClient { - fn open_blob(&self, digest: Vec<u8>) -> std::io::Result<Box<dyn std::io::BufRead>>; - - // TODO: stat_blob, put_blob? - fn get_directory(&self, digest: Vec<u8>) -> std::io::Result<Option<Directory>>; - - // TODO: put_directory -} diff --git a/tvix/store/src/dummy_blob_service.rs b/tvix/store/src/dummy_blob_service.rs deleted file mode 100644 index 690f6bde3869..000000000000 --- a/tvix/store/src/dummy_blob_service.rs +++ /dev/null @@ -1,43 +0,0 @@ -use tokio_stream::wrappers::ReceiverStream; - -use crate::proto::blob_service_server::BlobService; -use crate::proto::BlobChunk; -use crate::proto::BlobMeta; -use crate::proto::PutBlobResponse; -use crate::proto::ReadBlobRequest; -use crate::proto::StatBlobRequest; -use tonic::{Request, Response, Result, Status, Streaming}; -use tracing::{instrument, warn}; - -const NOT_IMPLEMENTED_MSG: &str = "not implemented"; - -pub struct DummyBlobService {} - -#[tonic::async_trait] -impl BlobService for DummyBlobService { - type ReadStream = ReceiverStream<Result<BlobChunk>>; - - #[instrument(skip(self))] - async fn stat(&self, _request: Request<StatBlobRequest>) -> Result<Response<BlobMeta>> { - warn!(NOT_IMPLEMENTED_MSG); - Err(Status::unimplemented(NOT_IMPLEMENTED_MSG)) - } - - #[instrument(skip(self))] - async fn read( - &self, - _request: Request<ReadBlobRequest>, - ) -> Result<Response<Self::ReadStream>, Status> { - warn!(NOT_IMPLEMENTED_MSG); - Err(Status::unimplemented(NOT_IMPLEMENTED_MSG)) - } - - #[instrument(skip(self, _request))] - async fn put( - &self, - _request: Request<Streaming<BlobChunk>>, - ) -> Result<Response<PutBlobResponse>> { - warn!(NOT_IMPLEMENTED_MSG); - Err(Status::unimplemented(NOT_IMPLEMENTED_MSG)) - } -} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index f23b0b9b8833..f294e39ad41a 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,5 +1,3 @@ -pub mod client; - mod blobreader; mod errors; @@ -11,9 +9,6 @@ pub mod pathinfoservice; pub mod proto; pub use blobreader::BlobReader; -pub mod dummy_blob_service; -pub mod sled_directory_service; -pub mod sled_path_info_service; pub use errors::Error; #[cfg(test)] diff --git a/tvix/store/src/sled_directory_service.rs b/tvix/store/src/sled_directory_service.rs deleted file mode 100644 index 4b2c6ed54d62..000000000000 --- a/tvix/store/src/sled_directory_service.rs +++ /dev/null @@ -1,300 +0,0 @@ -use data_encoding::BASE64; -use std::collections::HashMap; -use std::collections::HashSet; -use std::collections::VecDeque; -use std::path::PathBuf; -use tokio::sync::mpsc::channel; -use tokio::sync::mpsc::Sender; - -use prost::Message; -use tokio::task; -use tokio_stream::wrappers::ReceiverStream; - -use crate::proto::directory_service_server::DirectoryService; -use crate::proto::get_directory_request::ByWhat; -use crate::proto::Directory; -use crate::proto::GetDirectoryRequest; -use crate::proto::PutDirectoryResponse; -use tonic::{Request, Response, Result, Status, Streaming}; -use tracing::{debug, instrument, warn}; - -pub struct SledDirectoryService { - db: sled::Db, -} - -impl SledDirectoryService { - pub fn new(p: PathBuf) -> Result<Self, anyhow::Error> { - let config = sled::Config::default().use_compression(true).path(p); - let db = config.open()?; - - Ok(Self { db }) - } -} - -/// Lookup a directory, optionally recurse, and send the result to the passed sender. -/// We pass in a txn, that has been opened on the outside. -/// It's up to the user to wrap this in a TransactionError appropriately. -/// This will open a sled txn to ensure a consistent view. -fn send_directories( - txn: &sled::transaction::TransactionalTree, - tx: &Sender<Result<Directory>>, - req: GetDirectoryRequest, -) -> Result<(), Status> { - // keep the list of directories to traverse - let mut deq: VecDeque<Vec<u8>> = VecDeque::new(); - - // look at the digest in the request and put it in the top of the queue. - match &req.by_what { - None => return Err(Status::invalid_argument("by_what needs to be specified")), - Some(ByWhat::Digest(digest)) => { - if digest.len() != 32 { - return Err(Status::invalid_argument("invalid digest length")); - } - deq.push_back(digest.clone()); - } - } - - // keep a list of all the Directory messages already sent, so we can omit sending the same. - let mut sent_directory_dgsts: HashSet<Vec<u8>> = HashSet::new(); - - // look up the directory at the top of the queue - while let Some(ref digest) = deq.pop_front() { - let digest_b64: String = BASE64.encode(digest); - - match txn.get(digest) { - // The directory was not found, abort - Ok(None) => { - return Err(Status::not_found(format!( - "directory {} not found", - digest_b64 - ))) - } - // The directory was found, try to parse the data as Directory message - Ok(Some(data)) => match Directory::decode(&*data) { - Err(e) => { - warn!("unable to parse directory {}: {}", digest_b64, e); - return Err(Status::internal(format!( - "unable to parse directory {}", - digest_b64 - ))); - } - Ok(directory) => { - // Validate the retrieved Directory indeed has the - // digest we expect it to have, to detect corruptions. - let actual_digest = directory.digest(); - if actual_digest != digest.clone() { - return Err(Status::data_loss(format!( - "requested directory with digest {}, but got {}", - digest_b64, - BASE64.encode(&actual_digest) - ))); - } - - // if recursion was requested, all its children need to be added to the queue. - // If a Directory message with the same digest has already - // been sent previously, we can skip enqueueing it. - // Same applies to when it already is in the queue. - if req.recursive { - for child_directory_node in &directory.directories { - if !sent_directory_dgsts.contains(&child_directory_node.digest) - && !deq.contains(&child_directory_node.digest) - { - deq.push_back(child_directory_node.digest.clone()); - } - } - } - - // send the directory message to the client - if let Err(e) = tx.blocking_send(Ok(directory)) { - debug!("error sending: {}", e); - return Err(Status::internal("error sending")); - } - - sent_directory_dgsts.insert(digest.to_vec()); - } - }, - // some storage error? - Err(e) => { - // TODO: check what this error really means - warn!("storage error: {}", e); - return Err(Status::internal("storage error")); - } - }; - } - - // nothing left, we're done - Ok(()) -} - -/// Consume directories, and insert them into the database. -/// Reject directory messages that refer to Directories not sent in the same stream. -fn insert_directories( - txn: &sled::transaction::TransactionalTree, - directories: &Vec<Directory>, -) -> Result<Vec<u8>, Status> { - // This keeps track of the seen directory keys, and their size. - // This is used to validate the size field of a reference to a previously sent directory. - // We don't need to keep the contents around, they're stored in the DB. - let mut seen_directories_sizes: HashMap<Vec<u8>, u32> = HashMap::new(); - let mut last_directory_dgst: Option<Vec<u8>> = None; - - for directory in directories { - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Status::invalid_argument(format!( - "directory {} failed validation: {}", - BASE64.encode(&directory.digest()), - e, - ))); - } - - // for each child directory this directory refers to, we need - // to ensure it has been seen already in this stream, and that the size - // matches what we recorded. - for child_directory in &directory.directories { - match seen_directories_sizes.get(&child_directory.digest) { - None => { - return Err(Status::invalid_argument(format!( - "child directory '{}' ({}) in directory '{}' not seen yet", - child_directory.name, - BASE64.encode(&child_directory.digest), - BASE64.encode(&directory.digest()), - ))); - } - Some(seen_child_directory_size) => { - if seen_child_directory_size != &child_directory.size { - return Err(Status::invalid_argument(format!( - "child directory '{}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}", - child_directory.name, - BASE64.encode(&child_directory.digest), - BASE64.encode(&directory.digest()), - seen_child_directory_size, - child_directory.size, - ))); - } - } - } - } - - // TODO: do we want to verify this somehow is connected to the current graph? - // theoretically, this currently allows uploading multiple - // disconnected graphs at the same time… - - let dgst = directory.digest(); - seen_directories_sizes.insert(dgst.clone(), directory.size()); - last_directory_dgst = Some(dgst.clone()); - - // check if the directory already exists in the database. We can skip - // inserting if it's already there, as that'd be a no-op. - match txn.get(dgst.clone()) { - Ok(None) => { - // insert the directory into the database - if let Err(e) = txn.insert(dgst, directory.encode_to_vec()) { - match e { - // TODO: conflict is a problem, as the whole transaction closure is retried? - sled::transaction::UnabortableTransactionError::Conflict => todo!(), - sled::transaction::UnabortableTransactionError::Storage(e) => { - warn!("storage error: {}", e); - return Err(Status::internal("storage error")); - } - } - } - } - Ok(Some(_)) => continue, - Err(e) => { - warn!("storage error: {}", e); - return Err(Status::internal("storage error")); - } - } - } - - // no more directories to be send. Return the digest of the - // last (root) node, or an error if we don't receive a single one. - match last_directory_dgst { - Some(last_directory_dgst) => Ok(last_directory_dgst), - None => Err(Status::invalid_argument("no directories received")), - } -} - -#[tonic::async_trait] -impl DirectoryService for SledDirectoryService { - type GetStream = ReceiverStream<Result<Directory>>; - - #[instrument(skip(self))] - async fn get( - &self, - request: Request<GetDirectoryRequest>, - ) -> Result<Response<Self::GetStream>, Status> { - let (tx, rx) = channel(5); - - let req_inner = request.into_inner(); - - // clone self.db (cheap), so we don't refer to self in the thread. - let db = self.db.clone(); - - // kick off a thread - task::spawn_blocking(move || { - // open a DB transaction. - let txn_res = db.transaction(|txn| { - send_directories(txn, &tx, req_inner.clone()) - .map_err(sled::transaction::ConflictableTransactionError::Abort) - }); - - // handle transaction errors - match txn_res { - Ok(()) => Ok(()), - Err(sled::transaction::TransactionError::Abort(status)) => { - // if the transaction was aborted, there was an error. Send it to the client - tx.blocking_send(Err(status)) - } - Err(sled::transaction::TransactionError::Storage(e)) => { - warn!("storage error: {}", e); - tx.blocking_send(Err(Status::internal("storage error"))) - } - } - }); - - // NOTE: this always returns an Ok response, with the first item in the - // stream being a potential error, instead of directly returning the - // first error. - // Peeking on the first element seems to be extraordinarily hard, - // and client code considers these two equivalent anyways. - let receiver_stream = ReceiverStream::new(rx); - Ok(Response::new(receiver_stream)) - } - - #[instrument(skip(self, request))] - async fn put( - &self, - request: Request<Streaming<Directory>>, - ) -> Result<Response<PutDirectoryResponse>> { - let mut req_inner = request.into_inner(); - - // TODO: for now, we collect all Directory messages into a Vec, and - // pass it to the helper function. - // Ideally, we would validate things as we receive it, and have a way - // to reject early - but the semantics of transactions (and its - // retries) don't allow us to discard things that early anyways. - let mut directories: Vec<Directory> = Vec::new(); - - while let Some(directory) = req_inner.message().await? { - directories.push(directory) - } - - let txn_res = self.db.transaction(|txn| { - insert_directories(txn, &directories) - .map_err(sled::transaction::ConflictableTransactionError::Abort) - }); - - match txn_res { - Ok(last_directory_dgst) => Ok(Response::new(PutDirectoryResponse { - root_digest: last_directory_dgst, - })), - Err(sled::transaction::TransactionError::Storage(e)) => { - warn!("storage error: {}", e); - Err(Status::internal("storage error")) - } - Err(sled::transaction::TransactionError::Abort(e)) => Err(e), - } - } -} diff --git a/tvix/store/src/sled_path_info_service.rs b/tvix/store/src/sled_path_info_service.rs deleted file mode 100644 index 6b9212a3c912..000000000000 --- a/tvix/store/src/sled_path_info_service.rs +++ /dev/null @@ -1,89 +0,0 @@ -use prost::Message; -use std::path::PathBuf; - -use crate::proto::get_path_info_request::ByWhat; -use crate::proto::path_info_service_server::PathInfoService; -use crate::proto::CalculateNarResponse; -use crate::proto::GetPathInfoRequest; -use crate::proto::Node; -use crate::proto::PathInfo; -use nix_compat::store_path::DIGEST_SIZE; -use tonic::{Request, Response, Result, Status}; -use tracing::{instrument, warn}; - -const NOT_IMPLEMENTED_MSG: &str = "not implemented"; - -/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). -/// -/// The PathInfo messages are stored as encoded protos, and keyed by their output hash, -/// as that's currently the only request type available. -pub struct SledPathInfoService { - db: sled::Db, -} - -impl SledPathInfoService { - pub fn new(p: PathBuf) -> Result<Self, anyhow::Error> { - let config = sled::Config::default().use_compression(true).path(p); - let db = config.open()?; - - Ok(Self { db }) - } -} - -#[tonic::async_trait] -impl PathInfoService for SledPathInfoService { - #[instrument(skip(self))] - async fn get(&self, request: Request<GetPathInfoRequest>) -> Result<Response<PathInfo>> { - match request.into_inner().by_what { - None => Err(Status::unimplemented("by_what needs to be specified")), - Some(ByWhat::ByOutputHash(digest)) => { - if digest.len() != DIGEST_SIZE { - return Err(Status::invalid_argument("invalid digest length")); - } - - match self.db.get(digest) { - Ok(None) => Err(Status::not_found("PathInfo not found")), - Ok(Some(data)) => match PathInfo::decode(&*data) { - Ok(path_info) => Ok(Response::new(path_info)), - Err(e) => { - warn!("failed to decode stored PathInfo: {}", e); - Err(Status::internal("failed to decode stored PathInfo")) - } - }, - Err(e) => { - warn!("failed to retrieve PathInfo: {}", e); - Err(Status::internal("error during PathInfo lookup")) - } - } - } - } - } - - #[instrument(skip(self))] - async fn put(&self, request: Request<PathInfo>) -> Result<Response<PathInfo>> { - let path_info = request.into_inner(); - - // Call validate on the received PathInfo message. - match path_info.validate() { - Err(e) => Err(Status::invalid_argument(e.to_string())), - // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. - // This overwrites existing PathInfo objects. - Ok(nix_path) => match self.db.insert(nix_path.digest, path_info.encode_to_vec()) { - Ok(_) => Ok(Response::new(path_info)), - Err(e) => { - warn!("failed to insert PathInfo: {}", e); - Err(Status::internal("failed to insert PathInfo")) - } - }, - } - } - - #[instrument(skip(self))] - async fn calculate_nar( - &self, - _request: Request<Node>, - ) -> Result<Response<CalculateNarResponse>> { - warn!(NOT_IMPLEMENTED_MSG); - Err(Status::unimplemented(NOT_IMPLEMENTED_MSG)) - } -} diff --git a/tvix/store/src/tests/directory_service.rs b/tvix/store/src/tests/directory_service.rs deleted file mode 100644 index 8c66ccb53866..000000000000 --- a/tvix/store/src/tests/directory_service.rs +++ /dev/null @@ -1,268 +0,0 @@ -use tempfile::TempDir; -use tokio_stream::StreamExt; -use tonic::Status; - -use crate::proto::directory_service_server::DirectoryService; -use crate::proto::get_directory_request::ByWhat; -use crate::proto::GetDirectoryRequest; -use crate::proto::{Directory, DirectoryNode, SymlinkNode}; -use crate::sled_directory_service::SledDirectoryService; -use lazy_static::lazy_static; - -lazy_static! { - static ref DIRECTORY_A: Directory = Directory::default(); - static ref DIRECTORY_B: Directory = Directory { - directories: vec![DirectoryNode { - name: "a".to_string(), - digest: DIRECTORY_A.digest(), - size: DIRECTORY_A.size(), - }], - ..Default::default() - }; - static ref DIRECTORY_C: Directory = Directory { - directories: vec![ - DirectoryNode { - name: "a".to_string(), - digest: DIRECTORY_A.digest(), - size: DIRECTORY_A.size(), - }, - DirectoryNode { - name: "a'".to_string(), - digest: DIRECTORY_A.digest(), - size: DIRECTORY_A.size(), - } - ], - ..Default::default() - }; -} - -/// Send the specified GetDirectoryRequest. -/// Returns an error in the case of an error response, or an error in one of the items in the stream, -/// or a Vec<Directory> in the case of a successful request. -async fn get_directories<S: DirectoryService>( - svc: &S, - get_directory_request: GetDirectoryRequest, -) -> Result<Vec<Directory>, Status> { - let resp = svc.get(tonic::Request::new(get_directory_request)).await; - - // if the response is an error itself, return the error, otherwise unpack - let stream = match resp { - Ok(resp) => resp, - Err(status) => return Err(status), - } - .into_inner(); - - let directory_results: Vec<Result<Directory, Status>> = stream.collect().await; - - // turn Vec<Result<Directory, Status> into Result<Vec<Directory>,Status> - directory_results.into_iter().collect() -} - -/// Trying to get a non-existent Directory should return a not found error. -#[tokio::test] -async fn not_found() -> anyhow::Result<()> { - let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?; - - let resp = service - .get(tonic::Request::new(GetDirectoryRequest { - by_what: Some(ByWhat::Digest(DIRECTORY_A.digest())), - ..Default::default() - })) - .await; - - let mut rx = resp.expect("must succeed").into_inner().into_inner(); - - // The stream should contain one element, an error with Code::NotFound. - let item = rx - .recv() - .await - .expect("must be some") - .expect_err("must be err"); - assert_eq!(item.code(), tonic::Code::NotFound); - - // … and nothing else - assert!(rx.recv().await.is_none()); - - Ok(()) -} - -/// Put a Directory into the store, get it back. -#[tokio::test] -async fn put_get() -> anyhow::Result<()> { - let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?; - - let streaming_request = tonic_mock::streaming_request(vec![DIRECTORY_A.clone()]); - let put_resp = service - .put(streaming_request) - .await - .expect("must succeed") - .into_inner(); - - // the sent root_digest should match the calculated digest - assert_eq!(put_resp.root_digest, DIRECTORY_A.digest()); - - // get it back - let items = get_directories( - &service, - GetDirectoryRequest { - by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().to_vec())), - ..Default::default() - }, - ) - .await - .expect("must not error"); - - assert_eq!(vec![DIRECTORY_A.clone()], items); - - Ok(()) -} - -/// Put multiple Directories into the store, and get them back -#[tokio::test] -async fn put_get_multiple() -> anyhow::Result<()> { - let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?; - - // sending "b" (which refers to "a") without sending "a" first should fail. - let put_resp = service - .put(tonic_mock::streaming_request(vec![DIRECTORY_B.clone()])) - .await - .expect_err("must fail"); - - assert_eq!(tonic::Code::InvalidArgument, put_resp.code()); - - // sending "a", then "b" should succeed, and the response should contain the digest of b. - let put_resp = service - .put(tonic_mock::streaming_request(vec![ - DIRECTORY_A.clone(), - DIRECTORY_B.clone(), - ])) - .await - .expect("must succeed"); - - assert_eq!(DIRECTORY_B.digest(), put_resp.into_inner().root_digest); - - // now, request b, first in non-recursive mode. - let items = get_directories( - &service, - GetDirectoryRequest { - recursive: false, - by_what: Some(ByWhat::Digest(DIRECTORY_B.digest())), - }, - ) - .await - .expect("must not error"); - - // We expect to only get b. - assert_eq!(vec![DIRECTORY_B.clone()], items); - - // now, request b, but in recursive mode. - let items = get_directories( - &service, - GetDirectoryRequest { - recursive: true, - by_what: Some(ByWhat::Digest(DIRECTORY_B.digest())), - }, - ) - .await - .expect("must not error"); - - // We expect to get b, and then a, because that's how we traverse down. - assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], items); - - Ok(()) -} - -/// Put multiple Directories into the store, and omit duplicates. -#[tokio::test] -async fn put_get_dedup() -> anyhow::Result<()> { - let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?; - - // Send "A", then "C", which refers to "A" two times - // Pretend we're a dumb client sending A twice. - let put_resp = service - .put(tonic_mock::streaming_request(vec![ - DIRECTORY_A.clone(), - DIRECTORY_A.clone(), - DIRECTORY_C.clone(), - ])) - .await - .expect("must succeed"); - - assert_eq!(DIRECTORY_C.digest(), put_resp.into_inner().root_digest); - - // Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice. - let items = get_directories( - &service, - GetDirectoryRequest { - recursive: true, - by_what: Some(ByWhat::Digest(DIRECTORY_C.digest())), - }, - ) - .await - .expect("must not error"); - - // We expect to get C, and then A (once, as the second A has been deduplicated). - assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items); - - Ok(()) -} - -/// Trying to upload a Directory failing validation should fail. -#[tokio::test] -async fn put_reject_failed_validation() -> anyhow::Result<()> { - let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?; - - // construct a broken Directory message that fails validation - let broken_directory = Directory { - symlinks: vec![SymlinkNode { - name: "".to_string(), - target: "doesntmatter".to_string(), - }], - ..Default::default() - }; - assert!(broken_directory.validate().is_err()); - - // send it over, it must fail - let put_resp = service - .put(tonic_mock::streaming_request(vec![broken_directory])) - .await - .expect_err("must fail"); - - assert_eq!(put_resp.code(), tonic::Code::InvalidArgument); - - Ok(()) -} - -/// Trying to upload a Directory with wrong size should fail. -#[tokio::test] -async fn put_reject_wrong_size() -> anyhow::Result<()> { - let service = SledDirectoryService::new(TempDir::new()?.path().to_path_buf())?; - - // Construct a directory referring to DIRECTORY_A, but with wrong size. - let broken_parent_directory = Directory { - directories: vec![DirectoryNode { - name: "foo".to_string(), - digest: DIRECTORY_A.digest(), - size: 42, - }], - ..Default::default() - }; - // Make sure we got the size wrong. - assert_ne!( - broken_parent_directory.directories[0].size, - DIRECTORY_A.size() - ); - - // now upload both (first A, then the broken parent). This must fail. - let put_resp = service - .put(tonic_mock::streaming_request(vec![ - DIRECTORY_A.clone(), - broken_parent_directory, - ])) - .await - .expect_err("must fail"); - - assert_eq!(put_resp.code(), tonic::Code::InvalidArgument); - - Ok(()) -} diff --git a/tvix/store/src/tests/mod.rs b/tvix/store/src/tests/mod.rs index 6947b277d481..735b0789f80d 100644 --- a/tvix/store/src/tests/mod.rs +++ b/tvix/store/src/tests/mod.rs @@ -1,3 +1 @@ -mod directory_service; mod nar_renderer; -mod path_info_service; diff --git a/tvix/store/src/tests/path_info_service.rs b/tvix/store/src/tests/path_info_service.rs deleted file mode 100644 index 42e6db36ca88..000000000000 --- a/tvix/store/src/tests/path_info_service.rs +++ /dev/null @@ -1,67 +0,0 @@ -use tempfile::TempDir; -use tonic::Request; - -use crate::proto::get_path_info_request::ByWhat::ByOutputHash; -use crate::proto::node::Node::Symlink; -use crate::proto::path_info_service_server::PathInfoService; -use crate::proto::PathInfo; -use crate::proto::{GetPathInfoRequest, Node, SymlinkNode}; -use crate::sled_path_info_service::SledPathInfoService; - -use lazy_static::lazy_static; - -lazy_static! { - static ref DUMMY_OUTPUT_HASH: Vec<u8> = vec![ - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00 - ]; -} - -/// Trying to get a non-existent PathInfo should return a not found error. -#[tokio::test] -async fn not_found() -> anyhow::Result<()> { - let service = SledPathInfoService::new(TempDir::new()?.path().to_path_buf())?; - - let resp = service - .get(Request::new(GetPathInfoRequest { - by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.to_vec())), - })) - .await; - - let resp = resp.expect_err("must fail"); - assert_eq!(resp.code(), tonic::Code::NotFound); - - Ok(()) -} - -/// Put a PathInfo into the store, get it back. -#[tokio::test] -async fn put_get() -> anyhow::Result<()> { - let service = SledPathInfoService::new(TempDir::new()?.path().to_path_buf())?; - - let path_info = PathInfo { - node: Some(Node { - node: Some(Symlink(SymlinkNode { - name: "00000000000000000000000000000000-foo".to_string(), - target: "doesntmatter".to_string(), - })), - }), - ..Default::default() - }; - - let resp = service.put(Request::new(path_info.clone())).await; - - assert!(resp.is_ok()); - assert_eq!(resp.expect("must succeed").into_inner(), path_info); - - let resp = service - .get(Request::new(GetPathInfoRequest { - by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.to_vec())), - })) - .await; - - assert!(resp.is_ok()); - assert_eq!(resp.expect("must succeed").into_inner(), path_info); - - Ok(()) -} |