1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
|
use crate::proto::path_info_service_client::PathInfoServiceClient;
use super::{
GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService,
SledPathInfoService,
};
use nix_compat::narinfo;
use std::sync::Arc;
use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
use url::Url;
/// Constructs a new instance of a [PathInfoService] from an URI.
///
/// The following URIs are supported:
/// - `memory:`
/// Uses a in-memory implementation.
/// - `sled:`
/// Uses a in-memory sled implementation.
/// - `sled:///absolute/path/to/somewhere`
/// Uses sled, using a path on the disk for persistency. Can be only opened
/// from one process at the same time.
/// - `nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=`
/// Exposes the Nix binary cache as a PathInfoService, ingesting NARs into the
/// {Blob,Directory}Service. You almost certainly want to use this with some cache.
/// The `trusted-public-keys` URL parameter can be provided, which will then
/// enable signature verification.
/// - `grpc+unix:///absolute/path/to/somewhere`
/// Connects to a local tvix-store gRPC service via Unix socket.
/// - `grpc+http://host:port`, `grpc+https://host:port`
/// Connects to a (remote) tvix-store gRPC service.
///
/// As the [PathInfoService] needs to talk to [BlobService] and [DirectoryService],
/// these also need to be passed in.
pub async fn from_addr(
uri: &str,
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) -> Result<Box<dyn PathInfoService>, Error> {
#[allow(unused_mut)]
let mut url =
Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
let path_info_service: Box<dyn PathInfoService> = match url.scheme() {
"memory" => {
// memory doesn't support host or path in the URL.
if url.has_host() || !url.path().is_empty() {
return Err(Error::StorageError("invalid url".to_string()));
}
Box::new(MemoryPathInfoService::new(blob_service, directory_service))
}
"sled" => {
// sled doesn't support host, and a path can be provided (otherwise
// it'll live in memory only).
if url.has_host() {
return Err(Error::StorageError("no host allowed".to_string()));
}
if url.path() == "/" {
return Err(Error::StorageError(
"cowardly refusing to open / with sled".to_string(),
));
}
// TODO: expose other parameters as URL parameters?
Box::new(if url.path().is_empty() {
SledPathInfoService::new_temporary(blob_service, directory_service)
.map_err(|e| Error::StorageError(e.to_string()))?
} else {
SledPathInfoService::new(url.path(), blob_service, directory_service)
.map_err(|e| Error::StorageError(e.to_string()))?
})
}
"nix+http" | "nix+https" => {
// Stringify the URL and remove the nix+ prefix.
// We can't use `url.set_scheme(rest)`, as it disallows
// setting something http(s) that previously wasn't.
let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap();
let mut nix_http_path_info_service =
NixHTTPPathInfoService::new(new_url, blob_service, directory_service);
let pairs = &url.query_pairs();
for (k, v) in pairs.into_iter() {
if k == "trusted-public-keys" {
let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect();
let mut pubkeys: Vec<narinfo::PubKey> = Vec::with_capacity(pubkey_strs.len());
for pubkey_str in pubkey_strs {
pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| {
Error::StorageError(format!("invalid public key: {e}"))
})?);
}
nix_http_path_info_service.set_public_keys(pubkeys);
}
}
Box::new(nix_http_path_info_service)
}
scheme if scheme.starts_with("grpc+") => {
// schemes starting with grpc+ go to the GRPCPathInfoService.
// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
// - In the case of unix sockets, there must be a path, but may not be a host.
// - In the case of non-unix sockets, there must be a host, but no path.
// Constructing the channel is handled by tvix_castore::channel::from_url.
let client =
PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?);
Box::new(GRPCPathInfoService::from_client(client))
}
#[cfg(feature = "cloud")]
"bigtable" => {
use super::bigtable::BigtableParameters;
use super::BigtablePathInfoService;
// parse the instance name from the hostname.
let instance_name = url
.host_str()
.ok_or_else(|| Error::StorageError("instance name missing".into()))?
.to_string();
// … but add it to the query string now, so we just need to parse that.
url.query_pairs_mut()
.append_pair("instance_name", &instance_name);
let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
.map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
Box::new(
BigtablePathInfoService::connect(params)
.await
.map_err(|e| Error::StorageError(e.to_string()))?,
)
}
_ => Err(Error::StorageError(format!(
"unknown scheme: {}",
url.scheme()
)))?,
};
Ok(path_info_service)
}
#[cfg(test)]
mod tests {
use super::from_addr;
use lazy_static::lazy_static;
use std::sync::Arc;
use tempfile::TempDir;
use test_case::test_case;
use tvix_castore::{
blobservice::{BlobService, MemoryBlobService},
directoryservice::{DirectoryService, MemoryDirectoryService},
};
lazy_static! {
static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap();
}
// the gRPC tests below don't fail, because we connect lazily.
/// This uses a unsupported scheme.
#[test_case("http://foo.example/test", false; "unsupported scheme")]
/// This configures sled in temporary mode.
#[test_case("sled://", true; "sled valid temporary")]
/// This configures sled with /, which should fail.
#[test_case("sled:///", false; "sled invalid root")]
/// This configures sled with a host, not path, which should fail.
#[test_case("sled://foo.example", false; "sled invalid host")]
/// This configures sled with a valid path path, which should succeed.
#[test_case(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true; "sled valid path")]
/// This configures sled with a host, and a valid path path, which should fail.
#[test_case(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false; "sled invalid host with valid path")]
/// This correctly sets the scheme, and doesn't set a path.
#[test_case("memory://", true; "memory valid")]
/// This sets a memory url host to `foo`
#[test_case("memory://foo", false; "memory invalid host")]
/// This sets a memory url path to "/", which is invalid.
#[test_case("memory:///", false; "memory invalid root path")]
/// This sets a memory url path to "/foo", which is invalid.
#[test_case("memory:///foo", false; "memory invalid root path foo")]
/// Correct Scheme for the cache.nixos.org binary cache.
#[test_case("nix+https://cache.nixos.org", true; "correct nix+https")]
/// Correct Scheme for the cache.nixos.org binary cache (HTTP URL).
#[test_case("nix+http://cache.nixos.org", true; "correct nix+http")]
/// Correct Scheme for Nix HTTP Binary cache, with a subpath.
#[test_case("nix+http://192.0.2.1/foo", true; "correct nix http with subpath")]
/// Correct Scheme for Nix HTTP Binary cache, with a subpath and port.
#[test_case("nix+http://[::1]:8080/foo", true; "correct nix http with subpath and port")]
/// Correct Scheme for the cache.nixos.org binary cache, and correct trusted public key set
#[test_case("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=", true; "correct nix+https with trusted-public-key")]
/// Correct Scheme for the cache.nixos.org binary cache, and two correct trusted public keys set
#[test_case("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=%20foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=", true; "correct nix+https with two trusted-public-key")]
/// Correct scheme to connect to a unix socket.
#[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")]
/// Correct scheme for unix socket, but setting a host too, which is invalid.
#[test_case("grpc+unix://host.example/path/to/somewhere", false; "grpc invalid unix socket and host")]
/// Correct scheme to connect to localhost, with port 12345
#[test_case("grpc+http://[::1]:12345", true; "grpc valid IPv6 localhost port 12345")]
/// Correct scheme to connect to localhost over http, without specifying a port.
#[test_case("grpc+http://localhost", true; "grpc valid http host without port")]
/// Correct scheme to connect to localhost over http, without specifying a port.
#[test_case("grpc+https://localhost", true; "grpc valid https host without port")]
/// Correct scheme to connect to localhost over http, but with additional path, which is invalid.
#[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")]
#[tokio::test]
async fn test_from_addr_tokio(uri_str: &str, exp_succeed: bool) {
let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
let directory_service: Arc<dyn DirectoryService> =
Arc::from(MemoryDirectoryService::default());
let resp = from_addr(uri_str, blob_service, directory_service).await;
if exp_succeed {
resp.expect("should succeed");
} else {
assert!(resp.is_err(), "should fail");
}
}
#[cfg(feature = "cloud")]
/// A valid example for Bigtable.
#[test_case("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1", true; "objectstore valid bigtable url")]
/// An invalid examplee for Bigtable, missing fields
#[test_case("bigtable://instance-1", false; "objectstore invalid bigtable url, missing fields")]
#[tokio::test]
async fn test_from_addr_tokio_cloud(uri_str: &str, exp_succeed: bool) {
let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
let directory_service: Arc<dyn DirectoryService> =
Arc::from(MemoryDirectoryService::default());
let resp = from_addr(uri_str, blob_service, directory_service).await;
if exp_succeed {
resp.expect("should succeed");
} else {
assert!(resp.is_err(), "should fail");
}
}
}
|