1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
use crate::nar::seekable::Reader;
use crate::tests::fixtures::blob_service_with_contents as blob_service;
use crate::tests::fixtures::directory_service_with_contents as directory_service;
use crate::tests::fixtures::*;
use rstest::*;
use rstest_reuse::*;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncSeek, AsyncSeekExt};
use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;
use tvix_castore::Node;
#[apply(castore_fixtures_template)]
#[tokio::test]
async fn read_to_end(
#[future] blob_service: Arc<dyn BlobService>,
#[future] directory_service: Arc<dyn DirectoryService>,
#[case] test_input: &Node,
#[case] test_output: Result<Result<&Vec<u8>, io::ErrorKind>, crate::nar::RenderError>,
) {
let reader_result = Reader::new(
test_input.clone(),
// don't put anything in the stores, as we don't actually do any requests.
blob_service.await,
directory_service.await,
)
.await;
match (reader_result, test_output) {
(Ok(_), Err(_)) => panic!("creating reader should have failed but succeeded"),
(Err(err), Ok(_)) => panic!("creating reader should have succeeded but failed: {}", err),
(Err(reader_err), Err(expected_err)) => {
assert_eq!(format!("{}", reader_err), format!("{}", expected_err));
}
(Ok(mut reader), Ok(expected_read_result)) => {
let mut buf: Vec<u8> = vec![];
let read_result = reader.read_to_end(&mut buf).await;
match (read_result, expected_read_result) {
(Ok(_), Err(_)) => panic!("read_to_end should have failed but succeeded"),
(Err(err), Ok(_)) => {
panic!("read_to_end should have succeeded but failed: {}", err)
}
(Err(read_err), Err(expected_read_err)) => {
assert_eq!(read_err.kind(), expected_read_err);
}
(Ok(_n), Ok(expected_read_result)) => {
assert_eq!(buf, expected_read_result.to_vec());
}
}
}
}
}
#[rstest]
#[tokio::test]
/// Check that the Reader does not allow starting a seek while another seek is running
/// If this is not prevented, it might lead to futures piling up on the heap
async fn seek_twice(
#[future] blob_service: Arc<dyn BlobService>,
#[future] directory_service: Arc<dyn DirectoryService>,
) {
let mut reader = Reader::new(
CASTORE_NODE_COMPLICATED.clone(),
// don't put anything in the stores, as we don't actually do any requests.
blob_service.await,
directory_service.await,
)
.await
.expect("must succeed");
Pin::new(&mut reader)
.start_seek(io::SeekFrom::Start(1))
.expect("must succeed");
let seek_err = Pin::new(&mut reader)
.start_seek(io::SeekFrom::Start(2))
.expect_err("must fail");
assert_eq!(seek_err.kind(), io::ErrorKind::Other);
assert_eq!(seek_err.to_string(), "Already seeking".to_string());
}
#[rstest]
#[tokio::test]
async fn seek(
#[future] blob_service: Arc<dyn BlobService>,
#[future] directory_service: Arc<dyn DirectoryService>,
) {
let mut reader = Reader::new(
CASTORE_NODE_HELLOWORLD.clone(),
// don't put anything in the stores, as we don't actually do any requests.
blob_service.await,
directory_service.await,
)
.await
.expect("must succeed");
let mut buf = [0u8; 10];
for position in [
io::SeekFrom::Start(0x65), // Just before the file contents
io::SeekFrom::Start(0x68), // Seek back the file contents
io::SeekFrom::Start(0x70), // Just before the end of the file contents
] {
let n = reader.seek(position).await.expect("seek") as usize;
reader.read_exact(&mut buf).await.expect("read_exact");
assert_eq!(NAR_CONTENTS_HELLOWORLD[n..n + 10], buf);
}
}
|