1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
use std::sync::Arc;
use std::{
pin::Pin,
task::{self, Poll},
};
use tokio::io::{self, AsyncWrite};
use tvix_castore::{
blobservice::{self, BlobService},
directoryservice::{self, DirectoryService},
};
use url::Url;
use crate::nar::{NarCalculationService, SimpleRenderer};
use crate::pathinfoservice::{self, PathInfoService};
/// Construct the store handles from their addrs.
pub async fn construct_services(
blob_service_addr: impl AsRef<str>,
directory_service_addr: impl AsRef<str>,
path_info_service_addr: impl AsRef<str>,
) -> std::io::Result<(
Arc<dyn BlobService>,
Arc<dyn DirectoryService>,
Box<dyn PathInfoService>,
Box<dyn NarCalculationService>,
)> {
let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref())
.await?
.into();
let directory_service: Arc<dyn DirectoryService> =
directoryservice::from_addr(directory_service_addr.as_ref())
.await?
.into();
let path_info_service = pathinfoservice::from_addr(
path_info_service_addr.as_ref(),
blob_service.clone(),
directory_service.clone(),
)
.await?;
// HACK: The grpc client also implements NarCalculationService, and we
// really want to use it (otherwise we'd need to fetch everything again for hashing).
// Until we revamped store composition and config, detect this special case here.
let nar_calculation_service: Box<dyn NarCalculationService> = {
use crate::pathinfoservice::GRPCPathInfoService;
use crate::proto::path_info_service_client::PathInfoServiceClient;
let url = Url::parse(path_info_service_addr.as_ref())
.map_err(|e| io::Error::other(e.to_string()))?;
if url.scheme().starts_with("grpc+") {
Box::new(GRPCPathInfoService::from_client(
PathInfoServiceClient::with_interceptor(
tvix_castore::tonic::channel_from_url(&url)
.await
.map_err(|e| io::Error::other(e.to_string()))?,
tvix_tracing::propagate::tonic::send_trace,
),
))
} else {
Box::new(SimpleRenderer::new(
blob_service.clone(),
directory_service.clone(),
)) as Box<dyn NarCalculationService>
}
};
Ok((
blob_service,
directory_service,
path_info_service,
nar_calculation_service,
))
}
/// The inverse of [tokio_util::io::SyncIoBridge].
/// Don't use this with anything that actually does blocking I/O.
pub struct AsyncIoBridge<T>(pub T);
impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(self.get_mut().0.write(buf))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(self.get_mut().0.flush())
}
fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
}
|