about summary refs log tree commit diff
path: root/tvix/store/src/directoryservice/grpc.rs
blob: b036f16aca34b58ea15bc4f110c66d10aff62dfd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
use super::DirectoryService;
use crate::proto::{self, get_directory_request::ByWhat};
use tonic::transport::Channel;
use tonic::Code;

/// Connects to a (remote) tvix-store DirectoryService over gRPC.
///
/// Holds a tokio runtime handle together with a gRPC client. The
/// [DirectoryService] implementation for this type spawns its requests as
/// tasks on that handle and blocks on them until they complete.
#[derive(Clone)]
pub struct GRPCDirectoryService {
    /// A handle into the active tokio runtime. Necessary to spawn tasks.
    tokio_handle: tokio::runtime::Handle,

    /// The internal reference to a gRPC client.
    /// Cloning it is cheap, and it internally handles concurrent requests.
    grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
}

impl GRPCDirectoryService {
    /// Construct a new [GRPCDirectoryService], by passing a handle to the
    /// tokio runtime, and a gRPC client.
    pub fn new(
        tokio_handle: tokio::runtime::Handle,
        grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
    ) -> Self {
        Self {
            tokio_handle,
            grpc_client,
        }
    }
}

impl DirectoryService for GRPCDirectoryService {
    fn get(&self, digest: &[u8; 32]) -> Result<Option<crate::proto::Directory>, crate::Error> {
        // Get a new handle to the gRPC client, and copy the digest.
        let mut grpc_client = self.grpc_client.clone();
        let digest = digest.to_owned();

        // TODO: do requests recursively, populate a backing other
        // [DirectoryService] as cache, and ask it first.
        let task = self.tokio_handle.spawn(async move {
            let mut s = grpc_client
                .get(proto::GetDirectoryRequest {
                    recursive: false,
                    by_what: Some(ByWhat::Digest(digest.to_vec())),
                })
                .await?
                .into_inner();

            // Retrieve the first message only, then close the stream (we set recursive to false)
            s.message().await
        });

        match self.tokio_handle.block_on(task)? {
            Ok(resp) => Ok(resp),
            Err(e) if e.code() == Code::NotFound => Ok(None),
            Err(e) => Err(crate::Error::StorageError(e.to_string())),
        }
    }

    fn put(&self, directory: crate::proto::Directory) -> Result<[u8; 32], crate::Error> {
        let mut grpc_client = self.grpc_client.clone();

        // TODO: this currently doesn't work for directories referring to other
        // directories, as we're required to upload the whole closure all the
        // time.
        let task = self
            .tokio_handle
            .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await });

        match self.tokio_handle.block_on(task)? {
            Ok(put_directory_resp) => Ok(put_directory_resp
                .into_inner()
                .root_digest
                .as_slice()
                .try_into()
                .unwrap()), // TODO: map error
            Err(e) => Err(crate::Error::StorageError(e.to_string())),
        }
    }
}

#[cfg(test)]
mod tests {
    use core::time;
    use std::thread;

    use tempfile::TempDir;
    use tokio::net::{UnixListener, UnixStream};
    use tokio_stream::wrappers::UnixListenerStream;
    use tonic::transport::{Endpoint, Server, Uri};

    use crate::{
        directoryservice::DirectoryService,
        proto,
        proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper},
        tests::{fixtures::DIRECTORY_A, utils::gen_directory_service},
    };

    /// Round-trips DIRECTORY_A through a [super::GRPCDirectoryService] that
    /// talks to a DirectoryService server over a unix domain socket in a
    /// temporary directory.
    #[test]
    fn test() -> anyhow::Result<()> {
        let tmpdir = TempDir::new().unwrap();
        let socket_path = tmpdir.path().join("socket");

        // Spin up a server, in a thread far away, which spawns its own tokio runtime,
        // and blocks on the task.
        let socket_path_clone = socket_path.clone();
        thread::spawn(move || {
            // Create the runtime
            let rt = tokio::runtime::Runtime::new().unwrap();
            // Get a handle from this runtime
            let handle = rt.handle();

            let task = handle.spawn(async {
                let uds = UnixListener::bind(socket_path_clone).unwrap();
                let uds_stream = UnixListenerStream::new(uds);

                // spin up a new DirectoryService
                let mut server = Server::builder();
                let router = server.add_service(DirectoryServiceServer::new(
                    GRPCDirectoryServiceWrapper::from(gen_directory_service()),
                ));
                router.serve_with_incoming(uds_stream).await
            });

            handle.block_on(task)
        });

        // set up the local client runtime. This is similar to what the
        // `#[tokio::test]` macro desugars to.
        let tester_runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        // Wait for the server thread to create the socket. Poll for it
        // instead of sleeping for a fixed amount of time, so the test neither
        // races a slow server startup nor waits longer than necessary.
        {
            let mut socket_created = false;
            for _ in 0..100 {
                if socket_path.exists() {
                    socket_created = true;
                    break;
                }
                thread::sleep(time::Duration::from_millis(20));
            }

            assert!(
                socket_created,
                "expected socket path to eventually get created, but never happened"
            );
        }

        let task = tester_runtime.spawn_blocking(move || {
            // Create a channel, connecting to the uds at socket_path.
            // The URI is unused.
            let channel = Endpoint::try_from("http://[::]:50051")
                .unwrap()
                .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
                    UnixStream::connect(socket_path.clone())
                }));

            let grpc_client = proto::directory_service_client::DirectoryServiceClient::new(channel);

            // create the GrpcDirectoryService, using the tester_runtime.
            let directory_service =
                super::GRPCDirectoryService::new(tokio::runtime::Handle::current(), grpc_client);

            // try to get DIRECTORY_A should return Ok(None)
            assert_eq!(
                None,
                directory_service
                    .get(&DIRECTORY_A.digest())
                    .expect("must not fail")
            );

            // Now upload it
            assert_eq!(
                DIRECTORY_A.digest(),
                directory_service
                    .put(DIRECTORY_A.clone())
                    .expect("must succeed")
            );

            // And retrieve it; it must be equal to what was uploaded.
            assert_eq!(
                DIRECTORY_A.clone(),
                directory_service
                    .get(&DIRECTORY_A.digest())
                    .expect("must succeed")
                    .expect("must be some")
            )
        });
        tester_runtime.block_on(task)?;

        Ok(())
    }
}