about summary refs log tree commit diff
path: root/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
blob: e73dfdab18663b6b43d7ba912bdd754c765104ab (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
use crate::directoryservice::DirectoryService;
use crate::proto;
use data_encoding::BASE64;
use std::collections::{HashMap, HashSet, VecDeque};
use tokio::{sync::mpsc::channel, task};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::{debug, info_span, instrument, warn};

pub struct GRPCDirectoryServiceWrapper<C: DirectoryService> {
    directory_service: C,
}

impl<DS: DirectoryService> From<DS> for GRPCDirectoryServiceWrapper<DS> {
    fn from(value: DS) -> Self {
        Self {
            directory_service: value,
        }
    }
}

#[async_trait]
impl<DS: DirectoryService + Send + Sync + Clone + 'static>
    proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<DS>
{
    type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>;

    #[instrument(skip(self))]
    async fn get(
        &self,
        request: Request<proto::GetDirectoryRequest>,
    ) -> Result<Response<Self::GetStream>, Status> {
        let (tx, rx) = channel(5);

        let req_inner = request.into_inner();

        let directory_service = self.directory_service.clone();

        // kick off an async thread
        task::spawn(async move {
            // Keep the list of directory digests to traverse.
            // As per rpc_directory.proto, we traverse in BFS order.
            let mut deq: VecDeque<[u8; 32]> = VecDeque::new();

            // look at the digest in the request and put it in the top of the queue.
            match &req_inner.by_what {
                None => return Err(Status::invalid_argument("by_what needs to be specified")),
                Some(proto::get_directory_request::ByWhat::Digest(digest)) => {
                    deq.push_back(
                        digest
                            .as_slice()
                            .try_into()
                            .map_err(|_e| Status::invalid_argument("invalid digest length"))?,
                    );
                }
            }

            // keep a list of all the Directory messages already sent, so we can omit sending the same.
            let mut sent_directory_dgsts: HashSet<[u8; 32]> = HashSet::new();

            // look up the directory at the top of the queue
            while let Some(digest) = deq.pop_front() {
                let digest_b64: String = BASE64.encode(&digest);

                // add digest we're currently processing to a span, but pay attention to
                // https://docs.rs/tracing/0.1.37/tracing/span/struct.Span.html#in-asynchronous-code
                // There may be no await in here (we leave the span before the tx.send(…).await)
                let span = info_span!("digest", "{}", &digest_b64);

                let res: Result<proto::Directory, Status> = {
                    let _enter = span.enter();

                    // invoke client.get, and map to a Result<Directory, Status>
                    match directory_service.get(&digest) {
                        // The directory was not found, abort
                        Ok(None) => {
                            if !sent_directory_dgsts.is_empty() {
                                // If this is not the first lookup, we have a
                                // consistency issue, and we're missing some children, of which we have the
                                // parents. Log this out.
                                // Both the node we started with, and the
                                // current digest are part of the span.
                                warn!("consistency issue: directory not found")
                            }
                            Err(Status::not_found(format!(
                                "directory {} not found",
                                digest_b64
                            )))
                        }
                        Ok(Some(directory)) => {
                            // if recursion was requested, all its children need to be added to the queue.
                            // If a Directory message with the same digest has already
                            // been sent previously, we can skip enqueueing it.
                            // Same applies to when it already is in the queue.
                            if req_inner.recursive {
                                for child_directory_node in &directory.directories {
                                    let child_directory_node_digest: [u8; 32] =
                                        child_directory_node.digest.clone().try_into().map_err(
                                            |_e| {
                                                Status::internal(
                                                    "invalid child directory digest len",
                                                )
                                            },
                                        )?;

                                    if !sent_directory_dgsts.contains(&child_directory_node_digest)
                                        && !deq.contains(&child_directory_node_digest)
                                    {
                                        deq.push_back(child_directory_node_digest);
                                    }
                                }
                            }

                            // add it to sent_directory_dgsts.
                            // Strictly speaking, it wasn't sent yet, but tx.send happens right after,
                            // and the only way we can still fail is by the remote side to hang up,
                            // in which case we stop anyways.
                            sent_directory_dgsts.insert(digest);

                            Ok(directory)
                        }
                        Err(e) => Err(e.into()),
                    }
                };

                // send the result to the client
                if (tx.send(res).await).is_err() {
                    debug!("receiver dropped");
                    break;
                }
            }

            Ok(())
        });

        // NOTE: this always returns an Ok response, with the first item in the
        // stream being a potential error, instead of directly returning the
        // first error.
        // There's no need to check if the directory node exists twice,
        // and client code should consider an Err(), or the first item of the
        // stream being an error to be equivalent.
        let receiver_stream = ReceiverStream::new(rx);
        Ok(Response::new(receiver_stream))
    }

    #[instrument(skip(self, request))]
    async fn put(
        &self,
        request: Request<Streaming<proto::Directory>>,
    ) -> Result<Response<proto::PutDirectoryResponse>, Status> {
        let mut req_inner = request.into_inner();
        // This keeps track of the seen directory keys, and their size.
        // This is used to validate the size field of a reference to a previously sent directory.
        // We don't need to keep the contents around, they're stored in the DB.
        let mut seen_directories_sizes: HashMap<[u8; 32], u32> = HashMap::new();
        let mut last_directory_dgst: Option<[u8; 32]> = None;

        // Consume directories, and insert them into the store.
        // Reject directory messages that refer to Directories not sent in the same stream.
        while let Some(directory) = req_inner.message().await? {
            // validate the directory itself.
            if let Err(e) = directory.validate() {
                return Err(Status::invalid_argument(format!(
                    "directory {} failed validation: {}",
                    BASE64.encode(&directory.digest()),
                    e,
                )));
            }

            // for each child directory this directory refers to, we need
            // to ensure it has been seen already in this stream, and that the size
            // matches what we recorded.
            for child_directory in &directory.directories {
                let child_directory_digest: [u8; 32] = child_directory
                    .digest
                    .clone()
                    .try_into()
                    .map_err(|_e| Status::internal("invalid child directory digest len"))?;

                match seen_directories_sizes.get(&child_directory_digest) {
                    None => {
                        return Err(Status::invalid_argument(format!(
                            "child directory '{}' ({}) in directory '{}' not seen yet",
                            child_directory.name,
                            BASE64.encode(&child_directory_digest),
                            BASE64.encode(&directory.digest()),
                        )));
                    }
                    Some(seen_child_directory_size) => {
                        if seen_child_directory_size != &child_directory.size {
                            return Err(Status::invalid_argument(format!(
                                    "child directory '{}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}",
                                    child_directory.name,
                                    BASE64.encode(&child_directory_digest),
                                    BASE64.encode(&directory.digest()),
                                    seen_child_directory_size,
                                    child_directory.size,
                                )));
                        }
                    }
                }
            }

            // TODO: We don't validate the currently received directory refers
            // to at least one child we already received.
            // This means, we thoeretically allow uploading multiple disconnected graphs,
            // and the digest of the last element in the stream becomes the root node.
            // For example, you can upload a leaf directory A, a leaf directory
            // B, and then as last element a directory C that only refers to A,
            // leaving B disconnected.
            // At some point, we might want to populate a datastructure that
            // does a reachability check.

            let dgst = directory.digest();
            seen_directories_sizes.insert(dgst, directory.size());
            last_directory_dgst = Some(dgst);

            // check if the directory already exists in the database. We can skip
            // inserting if it's already there, as that'd be a no-op.
            match self.directory_service.get(&dgst) {
                Err(e) => {
                    warn!("error checking if directory already exists: {}", e);
                    return Err(e.into());
                }
                // skip if already exists
                Ok(Some(_)) => {}
                // insert if it doesn't already exist
                Ok(None) => {
                    self.directory_service.put(directory)?;
                }
            }
        }

        // We're done receiving. peek at last_directory_digest and either return the digest,
        // or an error, if we received an empty stream.
        match last_directory_dgst {
            None => Err(Status::invalid_argument("no directories received")),
            Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse {
                root_digest: last_directory_dgst.to_vec(),
            })),
        }
    }
}