// path: root/tvix/castore/src/directoryservice/tests/mod.rs
//! This contains test scenarios that a given [DirectoryService] needs to pass.
//! We use [rstest] and [rstest_reuse] to provide all services we want to test
//! against, and then apply this template to all test functions.

use futures::StreamExt;
use rstest::*;
use rstest_reuse::{self, *};

use super::DirectoryService;
use crate::directoryservice;
use crate::{
    fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D},
    proto::{self, Directory},
};

mod utils;
use self::utils::make_grpc_directory_service_client;

// TODO: add tests doing individual puts of a closure, then doing a get_recursive
// (and figure out semantics if necessary)

/// This produces a template, which will be applied to all individual test functions.
///
/// Every `#[case::…]` attribute below constructs one [DirectoryService]
/// implementation, which is handed to the test function as its
/// `directory_service` argument. Test functions opt in by annotating
/// themselves with `#[apply(directory_services)]`.
///
/// The `bigtable` case is only added when both the `cloud` and `integration`
/// features are enabled.
///
/// The function body is intentionally empty; only the attributes and the
/// signature carry meaning here.
///
/// See <https://github.com/la10736/rstest/issues/130#issuecomment-968864832>
#[template]
#[rstest]
#[case::grpc(make_grpc_directory_service_client().await)]
#[case::memory(directoryservice::from_addr("memory://").await.unwrap())]
#[case::sled(directoryservice::from_addr("sled://").await.unwrap())]
#[case::redb(directoryservice::from_addr("redb://").await.unwrap())]
#[case::objectstore(directoryservice::from_addr("objectstore+memory://").await.unwrap())]
#[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))]
pub fn directory_services(#[case] directory_service: impl DirectoryService) {}

/// Ensures asking for a directory that doesn't exist returns a Ok(None), and a get_recursive
/// returns an empty stream.
#[apply(directory_services)]
#[tokio::test]
async fn test_non_exist(directory_service: impl DirectoryService) {
    // single get
    assert_eq!(Ok(None), directory_service.get(&DIRECTORY_A.digest()).await);

    // recursive get
    assert_eq!(
        Vec::<Result<proto::Directory, crate::Error>>::new(),
        directory_service
            .get_recursive(&DIRECTORY_A.digest())
            .collect::<Vec<Result<proto::Directory, crate::Error>>>()
            .await
    );
}

/// Stores a single directory, then reads it back through both `.get` and
/// `.get_recursive`, expecting the very same directory each time.
#[apply(directory_services)]
#[tokio::test]
async fn put_get(directory_service: impl DirectoryService) {
    let expected_digest = DIRECTORY_A.digest();

    // Upload the directory; the service has to report its digest back.
    let returned_digest = directory_service.put(DIRECTORY_A.clone()).await.unwrap();
    assert_eq!(expected_digest, returned_digest, "returned digest must match");

    // Fetching by digest returns exactly what was uploaded.
    let fetched = directory_service.get(&expected_digest).await.unwrap();
    assert_eq!(Some(DIRECTORY_A.clone()), fetched);

    // The recursive variant yields a stream with that one directory only.
    let closure: Vec<_> = directory_service
        .get_recursive(&expected_digest)
        .collect()
        .await;
    assert_eq!(vec![Ok(DIRECTORY_A.clone())], closure);
}

/// Uploads a directory closure through a put_multiple handle and checks that
/// the root can be fetched again, via `.get` as well as `.get_recursive`.
/// Fetching the leaf through `.get` is deliberately not checked, since stores
/// are allowed to not support that.
#[apply(directory_services)]
#[tokio::test]
async fn put_get_multiple_success(directory_service: impl DirectoryService) {
    let expected_root = DIRECTORY_C.digest();

    // Upload the closure, leaf (A) first, root (C) last.
    let mut putter = directory_service.put_multiple_start();
    for directory in [DIRECTORY_A.clone(), DIRECTORY_C.clone()] {
        putter.put(directory).await.unwrap();
    }
    let returned_root = putter.close().await.unwrap();
    assert_eq!(expected_root, returned_root, "root digest should match");

    // The root node is retrievable on its own.
    let fetched_root = directory_service.get(&expected_root).await.unwrap();
    assert_eq!(Some(DIRECTORY_C.clone()), fetched_root);

    // The closure comes back ordered from the root towards the leaves.
    let closure: Vec<_> = directory_service
        .get_recursive(&expected_root)
        .collect()
        .await;
    assert_eq!(
        vec![Ok(DIRECTORY_C.clone()), Ok(DIRECTORY_A.clone())],
        closure
    );
}

/// Uploads a closure in which the client sends `DIRECTORY_A` twice (i.e. it
/// did not deduplicate its list). Reading the closure back must still return
/// each directory only once.
#[apply(directory_services)]
#[tokio::test]
async fn put_get_multiple_dedup(directory_service: impl DirectoryService) {
    let expected_root = DIRECTORY_C.digest();

    // Upload the closure, with DIRECTORY_A appearing twice.
    let mut putter = directory_service.put_multiple_start();
    for directory in [
        DIRECTORY_A.clone(),
        DIRECTORY_A.clone(),
        DIRECTORY_C.clone(),
    ] {
        putter.put(directory).await.unwrap();
    }
    let returned_root = putter.close().await.unwrap();
    assert_eq!(expected_root, returned_root, "root digest should match");

    // `DIRECTORY_A` must show up exactly once in what we get back.
    let closure: Vec<_> = directory_service
        .get_recursive(&expected_root)
        .collect()
        .await;
    assert_eq!(
        vec![Ok(DIRECTORY_C.clone()), Ok(DIRECTORY_A.clone())],
        closure
    );
}

/// Inserts and retrieves a closure containing a duplicated directory:
/// DIRECTORY_A (an empty directory) is referenced once from the root and once
/// from a subdirectory.
#[apply(directory_services)]
#[tokio::test]
async fn put_get_foo(directory_service: impl DirectoryService) {
    let expected_root = DIRECTORY_D.digest();

    // More than one ordering is valid; accept either of them.
    let acceptable_orderings = [
        vec![
            Ok(DIRECTORY_D.clone()),
            Ok(DIRECTORY_B.clone()),
            Ok(DIRECTORY_A.clone()),
        ],
        vec![
            Ok(DIRECTORY_D.clone()),
            Ok(DIRECTORY_A.clone()),
            Ok(DIRECTORY_B.clone()),
        ],
    ];

    // Upload the closure: A, then B, then the root D.
    let mut putter = directory_service.put_multiple_start();
    for directory in [
        DIRECTORY_A.clone(),
        DIRECTORY_B.clone(),
        DIRECTORY_D.clone(),
    ] {
        putter.put(directory).await.unwrap();
    }
    let returned_root = putter.close().await.unwrap();
    assert_eq!(expected_root, returned_root, "root digest should match");

    // Read the closure back and make sure it matches one of the orderings.
    let retrieved: Vec<_> = directory_service
        .get_recursive(&expected_root)
        .collect()
        .await;
    assert!(
        acceptable_orderings.contains(&retrieved),
        "invalid closure returned: {:?}",
        retrieved
    );
}

/// Uploads A, then C (which refers to A twice), then B (which refers to A).
/// Nothing in this upload refers to B, so it would be left unconnected, and
/// closing the handle has to fail.
#[apply(directory_services)]
#[tokio::test]
async fn upload_reject_unconnected(directory_service: impl DirectoryService) {
    let mut putter = directory_service.put_multiple_start();

    // Each individual put is expected to go through.
    for directory in [
        DIRECTORY_A.clone(),
        DIRECTORY_C.clone(),
        DIRECTORY_B.clone(),
    ] {
        putter.put(directory).await.unwrap();
    }

    // The problem only has to surface once the handle gets closed.
    let close_result = putter.close().await;
    assert!(
        close_result.is_err(),
        "closing handle should fail, as B would be left unconnected"
    );
}

/// A directory referring to another directory that was not uploaded through
/// the same put_multiple handle must be rejected, either by the `.put` itself
/// or by the final `.close`.
#[apply(directory_services)]
#[tokio::test]
async fn upload_reject_dangling_pointer(directory_service: impl DirectoryService) {
    let mut putter = directory_service.put_multiple_start();

    // Store DIRECTORY_A through the plain single-put path. This makes sure the
    // check is scoped to the individual put_multiple session, and doesn't look
    // at the global contents of the DirectoryService.
    directory_service.put(DIRECTORY_A.clone()).await.unwrap();

    // DIRECTORY_B refers to DIRECTORY_A, which this handle was never given.
    // If the put is rejected right away, there is nothing left to check.
    if putter.put(DIRECTORY_B.clone()).await.is_err() {
        return;
    }

    // Otherwise the rejection has to happen when closing the handle.
    let close_result = putter.close().await;
    assert!(close_result.is_err(), "when succeeding put, close must fail")
}

/// A Directory that fails its own validation must be rejected by the service,
/// both on the single-put path and on the put_multiple path.
#[apply(directory_services)]
#[tokio::test]
async fn upload_reject_failing_validation(directory_service: impl DirectoryService) {
    // A symlink with an empty name is what makes this directory invalid.
    let nameless_symlink = proto::SymlinkNode {
        name: "".into(), // wrong!
        target: "doesntmatter".into(),
    };
    let broken_directory = Directory {
        symlinks: vec![nameless_symlink],
        ..Default::default()
    };
    assert!(broken_directory.validate().is_err());

    // Single upload: has to fail directly.
    let single_put_result = directory_service.put(broken_directory.clone()).await;
    assert!(single_put_result.is_err(), "single upload must fail");

    // put_multiple is a bit more permissive: the intermediate .put() might
    // succeed, due to client-side bursting (in the case of gRPC). If it
    // already failed, we're done.
    let mut putter = directory_service.put_multiple_start();
    if putter.put(broken_directory).await.is_err() {
        return;
    }

    // If the put went through, the close MUST fail.
    let close_result = putter.close().await;
    assert!(close_result.is_err(), "when succeeding put, close must fail")
}

/// Uploads a parent Directory whose entry for a previously-uploaded child
/// carries the wrong size. Each directory passes validation in isolation, but
/// the upload as a whole has to be rejected.
#[apply(directory_services)]
#[tokio::test]
async fn upload_reject_wrong_size(directory_service: impl DirectoryService) {
    // Points at DIRECTORY_A, but claims a size that is off by 42.
    let mis_sized_child = proto::DirectoryNode {
        name: "foo".into(),
        digest: DIRECTORY_A.digest().into(),
        size: DIRECTORY_A.size() + 42, // wrong!
    };
    let wrong_parent_directory = Directory {
        directories: vec![mis_sized_child],
        ..Default::default()
    };

    // Looked at in isolation, the parent validates fine.
    assert!(wrong_parent_directory.validate().is_ok());

    // Upload the child, then the parent. The child itself must be accepted.
    let mut putter = directory_service.put_multiple_start();
    putter.put(DIRECTORY_A.clone()).await.unwrap();

    // The rejection may already happen during the second put ...
    if putter.put(wrong_parent_directory).await.is_err() {
        return;
    }

    // ... and if it didn't, it has to happen during the close.
    let close_result = putter.close().await;
    assert!(
        close_result.is_err(),
        "when second put succeeds, close must fail"
    )
}