//! This contains test scenarios that a given [DirectoryService] needs to pass.
//! We use [rstest] and [rstest_reuse] to provide all services we want to test
//! against, and then apply this template to all test functions.
use futures::StreamExt;
use rstest::*;
use rstest_reuse::{self, *};
use super::DirectoryService;
use crate::directoryservice;
use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D};
use crate::{Directory, DirectoryNode, Node};
mod utils;
use self::utils::make_grpc_directory_service_client;
// TODO: add tests doing individual puts of a closure, then doing a get_recursive
// (and figure out semantics if necessary)
/// This produces a template, which will be applied to all individual test functions.
/// See https://github.com/la10736/rstest/issues/130#issuecomment-968864832
#[template]
#[rstest]
#[case::grpc(make_grpc_directory_service_client().await)]
#[case::memory(directoryservice::from_addr("memory://").await.unwrap())]
#[case::sled(directoryservice::from_addr("sled://").await.unwrap())]
#[case::redb(directoryservice::from_addr("redb://").await.unwrap())]
#[case::objectstore(directoryservice::from_addr("objectstore+memory://").await.unwrap())]
#[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))]
pub fn directory_services(#[case] directory_service: impl DirectoryService) {}
/// Ensures asking for a directory that doesn't exist returns a Ok(None), and a get_recursive
/// returns an empty stream.
#[apply(directory_services)]
#[tokio::test]
async fn test_non_exist(directory_service: impl DirectoryService) {
// single get
assert_eq!(Ok(None), directory_service.get(&DIRECTORY_A.digest()).await);
// recursive get
assert_eq!(
Vec::<Result<Directory, crate::Error>>::new(),
directory_service
.get_recursive(&DIRECTORY_A.digest())
.collect::<Vec<Result<Directory, crate::Error>>>()
.await
);
}
/// Putting a single directory into the store, and then getting it out both via
/// `.get[_recursive]` should work.
#[apply(directory_services)]
#[tokio::test]
async fn put_get(directory_service: impl DirectoryService) {
// Insert a Directory.
let digest = directory_service.put(DIRECTORY_A.clone()).await.unwrap();
assert_eq!(DIRECTORY_A.digest(), digest, "returned digest must match");
// single get
assert_eq!(
Some(DIRECTORY_A.clone()),
directory_service.get(&DIRECTORY_A.digest()).await.unwrap()
);
// recursive get
assert_eq!(
vec![Ok(DIRECTORY_A.clone())],
directory_service
.get_recursive(&DIRECTORY_A.digest())
.collect::<Vec<_>>()
.await
);
}
/// Putting a directory closure should work, and it should be possible to get
/// back the root node both via .get[_recursive]. We don't check `.get` for the
/// leaf node is possible, as it's Ok for stores to not support that.
#[apply(directory_services)]
#[tokio::test]
async fn put_get_multiple_success(directory_service: impl DirectoryService) {
// Insert a Directory closure.
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_A.clone()).await.unwrap();
handle.put(DIRECTORY_C.clone()).await.unwrap();
let root_digest = handle.close().await.unwrap();
assert_eq!(
DIRECTORY_C.digest(),
root_digest,
"root digest should match"
);
// Get the root node.
assert_eq!(
Some(DIRECTORY_C.clone()),
directory_service.get(&DIRECTORY_C.digest()).await.unwrap()
);
// Get the closure. Ensure it's sent from the root to the leaves.
assert_eq!(
vec![Ok(DIRECTORY_C.clone()), Ok(DIRECTORY_A.clone())],
directory_service
.get_recursive(&DIRECTORY_C.digest())
.collect::<Vec<_>>()
.await
)
}
/// Puts a directory closure, but simulates a dumb client not deduplicating
/// its list. Ensure we still only get back a deduplicated list.
#[apply(directory_services)]
#[tokio::test]
async fn put_get_multiple_dedup(directory_service: impl DirectoryService) {
// Insert a Directory closure.
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_A.clone()).await.unwrap();
handle.put(DIRECTORY_A.clone()).await.unwrap();
handle.put(DIRECTORY_C.clone()).await.unwrap();
let root_digest = handle.close().await.unwrap();
assert_eq!(
DIRECTORY_C.digest(),
root_digest,
"root digest should match"
);
// Ensure the returned closure only contains `DIRECTORY_A` once.
assert_eq!(
vec![Ok(DIRECTORY_C.clone()), Ok(DIRECTORY_A.clone())],
directory_service
.get_recursive(&DIRECTORY_C.digest())
.collect::<Vec<_>>()
.await
)
}
/// This tests the insertion and retrieval of a closure which contains a duplicated directory
/// (DIRECTORY_A, which is an empty directory), once in the root, and once in a subdir.
#[apply(directory_services)]
#[tokio::test]
async fn put_get_foo(directory_service: impl DirectoryService) {
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_A.clone()).await.unwrap();
handle.put(DIRECTORY_B.clone()).await.unwrap();
handle.put(DIRECTORY_D.clone()).await.unwrap();
let root_digest = handle.close().await.unwrap();
assert_eq!(
DIRECTORY_D.digest(),
root_digest,
"root digest should match"
);
// Ensure we can get the closure back out of the service, and it is returned in a valid order
// (there are multiple valid possibilities)
let retrieved_closure = directory_service
.get_recursive(&DIRECTORY_D.digest())
.collect::<Vec<_>>()
.await;
let valid_closures = [
vec![
Ok(DIRECTORY_D.clone()),
Ok(DIRECTORY_B.clone()),
Ok(DIRECTORY_A.clone()),
],
vec![
Ok(DIRECTORY_D.clone()),
Ok(DIRECTORY_A.clone()),
Ok(DIRECTORY_B.clone()),
],
];
if !valid_closures.contains(&retrieved_closure) {
panic!("invalid closure returned: {:?}", retrieved_closure);
}
}
/// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close,
/// as B itself would be left unconnected.
#[apply(directory_services)]
#[tokio::test]
async fn upload_reject_unconnected(directory_service: impl DirectoryService) {
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_A.clone()).await.unwrap();
handle.put(DIRECTORY_C.clone()).await.unwrap();
handle.put(DIRECTORY_B.clone()).await.unwrap();
assert!(
handle.close().await.is_err(),
"closing handle should fail, as B would be left unconnected"
);
}
/// Uploading a directory that refers to another directory not yet uploaded
/// should fail.
#[apply(directory_services)]
#[tokio::test]
async fn upload_reject_dangling_pointer(directory_service: impl DirectoryService) {
let mut handle = directory_service.put_multiple_start();
// We insert DIRECTORY_A on its own, to ensure the check runs for the
// individual put_multiple session, not across the global DirectoryService
// contents.
directory_service.put(DIRECTORY_A.clone()).await.unwrap();
// DIRECTORY_B refers to DIRECTORY_A, which is not uploaded with this handle.
if handle.put(DIRECTORY_B.clone()).await.is_ok() {
assert!(
handle.close().await.is_err(),
"when succeeding put, close must fail"
)
}
}
/// Try uploading a Directory that refers to a previously-uploaded directory.
/// Both pass their isolated validation, but the size field in the parent is wrong.
/// This should be rejected.
#[apply(directory_services)]
#[tokio::test]
async fn upload_reject_wrong_size(directory_service: impl DirectoryService) {
let wrong_parent_directory = {
let mut dir = Directory::new();
dir.add(Node::Directory(
DirectoryNode::new(
"foo".into(),
DIRECTORY_A.digest(),
DIRECTORY_A.size() + 42, // wrong!
)
.unwrap(),
))
.unwrap();
dir
};
// Now upload both. Ensure it either fails during the second put, or during
// the close.
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_A.clone()).await.unwrap();
if handle.put(wrong_parent_directory).await.is_ok() {
assert!(
handle.close().await.is_err(),
"when second put succeeds, close must fail"
)
}
}