about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice/mod.rs
blob: 0141a48f75bc04c95808b84b401f8fa67597b224 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
use crate::composition::{Registry, ServiceBuilder};
use crate::proto;
use crate::{B3Digest, Error};
use crate::{ValidateDirectoryError, ValidateNodeError};

use bytes::Bytes;
use futures::stream::BoxStream;
use tonic::async_trait;
mod combinators;
mod directory_graph;
mod from_addr;
mod grpc;
mod memory;
mod object_store;
mod order_validator;
mod redb;
mod simple_putter;
mod sled;
#[cfg(test)]
pub mod tests;
mod traverse;
mod utils;

pub use self::combinators::{Cache, CacheConfig};
pub use self::directory_graph::DirectoryGraph;
pub use self::from_addr::from_addr;
pub use self::grpc::{GRPCDirectoryService, GRPCDirectoryServiceConfig};
pub use self::memory::{MemoryDirectoryService, MemoryDirectoryServiceConfig};
pub use self::object_store::{ObjectStoreDirectoryService, ObjectStoreDirectoryServiceConfig};
pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
pub use self::redb::{RedbDirectoryService, RedbDirectoryServiceConfig};
pub use self::simple_putter::SimplePutter;
pub use self::sled::{SledDirectoryService, SledDirectoryServiceConfig};
pub use self::traverse::descend_to;
pub use self::utils::traverse_directory;

#[cfg(feature = "cloud")]
mod bigtable;

#[cfg(feature = "cloud")]
pub use self::bigtable::{BigtableDirectoryService, BigtableParameters};

/// The base trait all Directory services need to implement.
/// It offers a simple get and put of a [Directory], addressed by its digest,
/// plus recursive lookup and batched upload of whole directory closures.
#[async_trait]
pub trait DirectoryService: Send + Sync {
    /// Looks up a single Directory message by its digest.
    /// The returned Directory message *must* be valid.
    /// In case the directory is not found, Ok(None) is returned.
    ///
    /// It is okay for certain implementations to only allow retrieval of
    /// Directory digests that are at the "root", aka the last element that's
    /// sent to a DirectoryPutter. This makes sense for implementations bundling
    /// closures of directories together in batches.
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error>;
    /// Uploads a single Directory message, and returns the calculated
    /// digest, or an error. An error *must* also be returned if the message is
    /// not valid.
    async fn put(&self, directory: Directory) -> Result<B3Digest, Error>;

    /// Looks up a closure of [Directory].
    /// Ideally this would be a `impl Stream<Item = Result<Directory, Error>>`,
    /// and we'd be able to add a default implementation for it here, but
    /// we can't have that yet.
    ///
    /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily,
    /// and the box allows different underlying stream implementations to be returned since
    /// Rust doesn't support this as a generic in traits yet. This is the same thing that
    /// [async_trait] generates, but for streams instead of futures.
    ///
    /// The individually returned Directory messages *must* be valid.
    /// Directories are sent in an order from the root to the leaves, so that
    /// the receiving side can validate each message to be connected to the root
    /// that has initially been requested.
    ///
    /// In case the directory cannot be found, this should return an empty stream.
    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'static, Result<Directory, Error>>;

    /// Allows persisting a closure of [Directory], which is a graph of
    /// connected Directory messages.
    /// The returned [DirectoryPutter] is the handle used to upload them.
    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>;
}

/// Forwards every [DirectoryService] method to the service obtained through
/// `AsRef`, so anything that can hand out a `&dyn DirectoryService` is itself
/// usable as a [DirectoryService].
#[async_trait]
impl<A> DirectoryService for A
where
    A: AsRef<dyn DirectoryService> + Send + Sync,
{
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
        let inner = self.as_ref();
        inner.get(digest).await
    }

    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
        let inner = self.as_ref();
        inner.put(directory).await
    }

    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'static, Result<Directory, Error>> {
        let inner = self.as_ref();
        inner.get_recursive(root_directory_digest)
    }

    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> {
        let inner = self.as_ref();
        inner.put_multiple_start()
    }
}

/// Provides a handle to put a closure of connected [Directory] elements.
///
/// The consumer can periodically call [DirectoryPutter::put], starting from the
/// leaves. Once the root is reached, [DirectoryPutter::close] can be called to
/// retrieve the root digest (or an error).
///
/// DirectoryPutters might be created without a single [DirectoryPutter::put],
/// and then dropped without calling [DirectoryPutter::close],
/// for example when ingesting a path that ends up not pointing to a directory,
/// but a single file or symlink.
#[async_trait]
pub trait DirectoryPutter: Send {
    /// Puts an individual [Directory] into the store.
    /// Error semantics and behaviour are up to the specific implementation of
    /// this trait.
    /// Due to bursting, the returned error might refer to an object previously
    /// sent via `put`.
    async fn put(&mut self, directory: Directory) -> Result<(), Error>;

    /// Closes the stream, and waits for any errors.
    /// On success, the returned digest is the root digest (see the trait docs).
    /// If any invalid Directory message has been uploaded, an error *must*
    /// be returned.
    async fn close(&mut self) -> Result<B3Digest, Error>;
}

/// Registers the builtin DirectoryService implementations with the registry.
///
/// Each config type is registered under the short name given as the string
/// argument; the `bigtable` entry is only present with the `cloud` feature.
pub(crate) fn register_directory_services(reg: &mut Registry) {
    // Every registration targets the same boxed builder type.
    type Builder = Box<dyn ServiceBuilder<Output = dyn DirectoryService>>;

    reg.register::<Builder, super::directoryservice::ObjectStoreDirectoryServiceConfig>(
        "objectstore",
    );
    reg.register::<Builder, super::directoryservice::MemoryDirectoryServiceConfig>("memory");
    reg.register::<Builder, super::directoryservice::CacheConfig>("cache");
    reg.register::<Builder, super::directoryservice::GRPCDirectoryServiceConfig>("grpc");
    reg.register::<Builder, super::directoryservice::SledDirectoryServiceConfig>("sled");
    reg.register::<Builder, super::directoryservice::RedbDirectoryServiceConfig>("redb");
    #[cfg(feature = "cloud")]
    {
        reg.register::<Builder, super::directoryservice::BigtableParameters>("bigtable");
    }
}

/// A Directory can contain Directory, File or Symlink nodes.
/// Each of these nodes has a name attribute, which is the basename in that
/// directory, and node type specific attributes.
/// While a Node by itself may have any name, the names of Directory entries:
///  - MUST not be empty
///  - MUST not contain slashes or null bytes
///  - MUST not be '.' or '..'
///  - MUST be unique within the directory, regardless of the node type
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct Directory {
    // Entries of this directory. [Directory::add] inserts each node at the
    // position found by a binary search on its name, which keeps this list
    // sorted by name.
    nodes: Vec<Node>,
}

/// A DirectoryNode is a pointer to a [Directory], by its [Directory::digest].
/// It also gives it a `name` and `size`.
/// Such a node is either an element in the [Directory] it itself is contained in,
/// or a standalone root node.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirectoryNode {
    /// The (base)name of the directory
    name: Bytes,
    /// The blake3 hash of a Directory message, serialized in protobuf canonical form.
    digest: B3Digest,
    /// Number of child elements in the Directory referred to by `digest`.
    /// Calculated by summing up the number of nodes and, for each directory,
    /// its size field. Can be used for inode allocation.
    /// This field is precisely as verifiable as any other Merkle tree edge.
    /// Resolve `digest`, and you can compute it incrementally. Resolve the entire
    /// tree, and you can fully compute it from scratch.
    /// A credulous implementation won't reject an excessive size, but this is
    /// harmless: you'll have some ordinals without nodes. Undersizing is obvious
    /// and easy to reject: you won't have an ordinal for some nodes.
    size: u64,
}

impl DirectoryNode {
    pub fn new(name: Bytes, digest: B3Digest, size: u64) -> Result<Self, ValidateNodeError> {
        Ok(Self { name, digest, size })
    }

    pub fn digest(&self) -> &B3Digest {
        &self.digest
    }
    pub fn size(&self) -> u64 {
        self.size
    }
}

/// A FileNode represents a regular or executable file in a Directory or at the root.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileNode {
    /// The (base)name of the file.
    name: Bytes,

    /// The blake3 digest of the file contents.
    digest: B3Digest,

    /// The file content size.
    size: u64,

    /// Whether the file is executable.
    executable: bool,
}

impl FileNode {
    pub fn new(
        name: Bytes,
        digest: B3Digest,
        size: u64,
        executable: bool,
    ) -> Result<Self, ValidateNodeError> {
        Ok(Self {
            name,
            digest,
            size,
            executable,
        })
    }

    pub fn digest(&self) -> &B3Digest {
        &self.digest
    }

    pub fn size(&self) -> u64 {
        self.size
    }

    pub fn executable(&self) -> bool {
        self.executable
    }
}

/// A SymlinkNode represents a symbolic link in a Directory or at the root.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SymlinkNode {
    /// The (base)name of the symlink.
    name: Bytes,
    /// The target of the symlink.
    /// [SymlinkNode::new] rejects empty targets and targets containing null bytes.
    target: Bytes,
}

impl SymlinkNode {
    pub fn new(name: Bytes, target: Bytes) -> Result<Self, ValidateNodeError> {
        if target.is_empty() || target.contains(&b'\0') {
            return Err(ValidateNodeError::InvalidSymlinkTarget(target));
        }
        Ok(Self { name, target })
    }

    pub fn target(&self) -> &bytes::Bytes {
        &self.target
    }
}

/// A Node is either a [DirectoryNode], [FileNode] or [SymlinkNode].
/// While a Node by itself may have any name, only those matching specific requirements
/// can be added as entries to a [Directory] (see the documentation on [Directory] for details).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Node {
    /// A pointer to another [Directory].
    Directory(DirectoryNode),
    /// A regular or executable file.
    File(FileNode),
    /// A symbolic link.
    Symlink(SymlinkNode),
}

/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode]
/// and [Node], so we can ask all of them for the name easily.
pub trait NamedNode {
    /// Returns the (base)name of the node.
    fn get_name(&self) -> &bytes::Bytes;
}

impl NamedNode for &FileNode {
    /// Returns the name of the referenced [FileNode].
    fn get_name(&self) -> &bytes::Bytes {
        <FileNode as NamedNode>::get_name(*self)
    }
}
impl NamedNode for FileNode {
    fn get_name(&self) -> &bytes::Bytes {
        &self.name
    }
}

impl NamedNode for &DirectoryNode {
    /// Returns the name of the referenced [DirectoryNode].
    fn get_name(&self) -> &bytes::Bytes {
        <DirectoryNode as NamedNode>::get_name(*self)
    }
}
impl NamedNode for DirectoryNode {
    fn get_name(&self) -> &bytes::Bytes {
        &self.name
    }
}

impl NamedNode for &SymlinkNode {
    /// Returns the name of the referenced [SymlinkNode].
    fn get_name(&self) -> &bytes::Bytes {
        <SymlinkNode as NamedNode>::get_name(*self)
    }
}
impl NamedNode for SymlinkNode {
    fn get_name(&self) -> &bytes::Bytes {
        &self.name
    }
}

impl NamedNode for &Node {
    /// Returns the name of the referenced [Node], whichever variant it is.
    fn get_name(&self) -> &bytes::Bytes {
        <Node as NamedNode>::get_name(*self)
    }
}
impl NamedNode for Node {
    fn get_name(&self) -> &bytes::Bytes {
        match self {
            Node::File(node_file) => &node_file.name,
            Node::Directory(node_directory) => &node_directory.name,
            Node::Symlink(node_symlink) => &node_symlink.name,
        }
    }
}

impl Node {
    /// Consumes the node and returns it with its name replaced by `name`.
    /// All other attributes are kept; the new name is not validated here.
    pub fn rename(self, name: bytes::Bytes) -> Self {
        match self {
            Node::Directory(mut dir) => {
                dir.name = name;
                Node::Directory(dir)
            }
            Node::File(mut file) => {
                file.name = name;
                Node::File(file)
            }
            Node::Symlink(mut symlink) => {
                symlink.name = name;
                Node::Symlink(symlink)
            }
        }
    }
}

impl PartialOrd for Node {
    /// Always `Some`: defers to the total order defined by [Ord] for [Node].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Node {
    /// Orders nodes by their name only.
    ///
    /// NOTE: two nodes with the same name but different contents compare as
    /// `Equal` here, while the derived `PartialEq` reports them as different.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let (lhs, rhs) = (self.get_name(), other.get_name());
        lhs.cmp(rhs)
    }
}

impl PartialOrd for FileNode {
    /// Always `Some`: defers to the total order defined by [Ord] for [FileNode].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for FileNode {
    /// Orders file nodes by their name only.
    ///
    /// NOTE: digest, size and executable flag are ignored here, while the
    /// derived `PartialEq` takes them into account.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let (lhs, rhs) = (self.get_name(), other.get_name());
        lhs.cmp(rhs)
    }
}

impl PartialOrd for DirectoryNode {
    /// Always `Some`: defers to the total order defined by [Ord] for [DirectoryNode].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for DirectoryNode {
    /// Orders directory nodes by their name only.
    ///
    /// NOTE: digest and size are ignored here, while the derived `PartialEq`
    /// takes them into account.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let (lhs, rhs) = (self.get_name(), other.get_name());
        lhs.cmp(rhs)
    }
}

impl PartialOrd for SymlinkNode {
    /// Always `Some`: defers to the total order defined by [Ord] for [SymlinkNode].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for SymlinkNode {
    /// Orders symlink nodes by their name only.
    ///
    /// NOTE: the target is ignored here, while the derived `PartialEq` takes
    /// it into account.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let (lhs, rhs) = (self.get_name(), other.get_name());
        lhs.cmp(rhs)
    }
}

/// Adds up all values yielded by `iter`.
///
/// Returns `None` as soon as a partial sum no longer fits into a `u64`;
/// an empty input sums to `Some(0)`.
fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> {
    let mut total: u64 = 0;
    for value in iter {
        total = total.checked_add(value)?;
    }
    Some(total)
}

impl Directory {
    /// Creates a directory without any entries.
    pub fn new() -> Self {
        Self { nodes: Vec::new() }
    }

    /// Returns the size of this directory: one for every entry it holds
    /// (files, symlinks and directories alike), plus the `size` field of each
    /// directory entry.
    pub fn size(&self) -> u64 {
        // `add()` refuses every insertion that would make this sum overflow,
        // so a Directory built through it can't overflow here.
        let entry_count = self.nodes.len() as u64;
        let nested: u64 = self.directories().map(|dir| dir.size).sum();
        entry_count + nested
    }

    /// Calculates the digest of a Directory, which is the blake3 hash of a
    /// Directory protobuf message, serialized in protobuf canonical form.
    pub fn digest(&self) -> B3Digest {
        let message = proto::Directory::from(self);
        message.digest()
    }

    /// Iterates over all entries (directories, files and symlinks),
    /// ordered by their name.
    pub fn nodes(&self) -> impl Iterator<Item = &Node> + Send + Sync + '_ {
        self.nodes.iter()
    }

    /// Iterates over the [FileNode] entries of this directory,
    /// ordered by their name.
    pub fn files(&self) -> impl Iterator<Item = &FileNode> + Send + Sync + '_ {
        self.nodes.iter().filter_map(|node| {
            if let Node::File(file) = node {
                Some(file)
            } else {
                None
            }
        })
    }

    /// Iterates over the subdirectories ([DirectoryNode] entries) of this
    /// directory, ordered by their name.
    pub fn directories(&self) -> impl Iterator<Item = &DirectoryNode> + Send + Sync + '_ {
        self.nodes.iter().filter_map(|node| {
            if let Node::Directory(dir) = node {
                Some(dir)
            } else {
                None
            }
        })
    }

    /// Iterates over the [SymlinkNode] entries of this directory,
    /// ordered by their name.
    pub fn symlinks(&self) -> impl Iterator<Item = &SymlinkNode> + Send + Sync + '_ {
        self.nodes.iter().filter_map(|node| {
            if let Node::Symlink(symlink) = node {
                Some(symlink)
            } else {
                None
            }
        })
    }

    /// Checks whether `name` may be used as the name of a directory entry.
    /// The empty string, '.', '..' and anything containing a slash or a null
    /// byte is rejected with [ValidateNodeError::InvalidName].
    pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> {
        let acceptable = match name {
            b"" | b"." | b".." => false,
            other => !other.iter().any(|&byte| byte == 0x00 || byte == b'/'),
        };

        if acceptable {
            Ok(())
        } else {
            Err(ValidateNodeError::InvalidName(name.to_vec().into()))
        }
    }

    /// Inserts `node` into the [Directory], keeping the entries sorted by name.
    ///
    /// Fails if the name does not satisfy the stricter rules for directory
    /// entries, if the directory size would overflow, or if an entry with the
    /// same name already exists. The directory is left untouched on failure.
    pub fn add(&mut self, node: Node) -> Result<(), ValidateDirectoryError> {
        if let Err(name_error) = Self::validate_node_name(node.get_name()) {
            return Err(ValidateDirectoryError::InvalidNode(
                node.get_name().clone().into(),
                name_error,
            ));
        }

        // The size after insertion is the current size, plus one for the new
        // entry, plus the new entry's own size if it is a directory. Make sure
        // that still fits.
        // FUTUREWORK: add some sort of batch add interface which only does this check once with
        // all the to-be-added entries
        let added_size = match &node {
            Node::Directory(dir) => dir.size,
            Node::File(_) | Node::Symlink(_) => 0,
        };
        if checked_sum([self.size(), 1, added_size]).is_none() {
            return Err(ValidateDirectoryError::SizeOverflow);
        }

        // The entries are only ever modified through this method, so they are
        // already sorted by name and can be binary-searched.
        let lookup = self
            .nodes
            .binary_search_by(|existing| existing.get_name().cmp(node.get_name()));

        match lookup {
            // An entry with this name is already present.
            Ok(_) => Err(ValidateDirectoryError::DuplicateName(
                node.get_name().to_vec(),
            )),
            // Name is free; `insert_at` is where it belongs in sorted order.
            Err(insert_at) => {
                self.nodes.insert(insert_at, node);
                Ok(())
            }
        }
    }
}

#[cfg(test)]
mod test {
    use super::{Directory, DirectoryNode, FileNode, Node, SymlinkNode};
    use crate::fixtures::DUMMY_DIGEST;
    use crate::ValidateDirectoryError;

    /// Adds directories, files and symlinks in non-sorted order and checks that
    /// the result still converts to the proto representation and back.
    #[test]
    fn add_nodes_to_directory() {
        let mut d = Directory::new();

        d.add(Node::Directory(
            DirectoryNode::new("b".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
        ))
        .unwrap();
        d.add(Node::Directory(
            DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
        ))
        .unwrap();
        d.add(Node::Directory(
            DirectoryNode::new("z".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
        ))
        .unwrap();

        d.add(Node::File(
            FileNode::new("f".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
        ))
        .unwrap();
        d.add(Node::File(
            FileNode::new("c".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
        ))
        .unwrap();
        d.add(Node::File(
            FileNode::new("g".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
        ))
        .unwrap();

        d.add(Node::Symlink(
            SymlinkNode::new("t".into(), "a".into()).unwrap(),
        ))
        .unwrap();
        d.add(Node::Symlink(
            SymlinkNode::new("o".into(), "a".into()).unwrap(),
        ))
        .unwrap();
        d.add(Node::Symlink(
            SymlinkNode::new("e".into(), "a".into()).unwrap(),
        ))
        .unwrap();

        // Convert to proto struct and back to ensure we are not generating any invalid structures
        crate::directoryservice::Directory::try_from(crate::proto::Directory::from(d))
            .expect("directory should be valid");
    }

    /// Adding a directory entry whose size would overflow the parent's size
    /// must be rejected.
    #[test]
    fn validate_overflow() {
        let mut d = Directory::new();

        assert_eq!(
            d.add(Node::Directory(
                DirectoryNode::new("foo".into(), DUMMY_DIGEST.clone(), u64::MAX).unwrap(),
            )),
            Err(ValidateDirectoryError::SizeOverflow)
        );
    }

    /// Names must be unique across node types: a file may not reuse the name
    /// of an existing directory entry.
    #[test]
    fn add_duplicate_node_to_directory() {
        let mut d = Directory::new();

        d.add(Node::Directory(
            DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
        ))
        .unwrap();
        assert_eq!(
            format!(
                "{}",
                d.add(Node::File(
                    FileNode::new("a".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
                ))
                .expect_err("adding duplicate dir entry must fail")
            ),
            "\"a\" is a duplicate name"
        );
    }

    /// Attempt to add a directory entry with a name which should be rejected.
    // Nothing in here awaits, so this is a plain synchronous test and does not
    // need an async runtime.
    #[test]
    fn directory_reject_invalid_name() {
        let mut dir = Directory::new();
        assert!(
            dir.add(Node::Symlink(
                SymlinkNode::new(
                    "".into(), // wrong! can not be added to directory
                    "doesntmatter".into(),
                )
                .unwrap()
            ))
            .is_err(),
            "invalid symlink entry be rejected"
        );
    }
}