about summary refs log tree commit diff
path: root/users/edef/weave/src/lib.rs
blob: ff5bf923267642888d4aa2625677425fb89d21bb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use anyhow::Result;
use owning_ref::{ArcRef, OwningRef};
use rayon::prelude::*;
use std::{
    fs::File,
    mem,
    ops::{Deref, Range},
    slice,
    sync::Arc,
};
use tracing_indicatif::span_ext::IndicatifSpanExt as _;

use polars::{
    datatypes::BinaryChunked,
    export::arrow::array::BinaryArray,
    prelude::{ParquetReader, SerReader},
};

/// An shared `[[u8; N]]` backed by a Polars [Buffer].
pub type FixedBytes<const N: usize> =
    ArcRef<'static, polars::export::arrow::buffer::Bytes<u8>, [[u8; N]]>;

pub const INDEX_NULL: u32 = !0;

/// A terrific hash function, turning 20 bytes of cryptographic hash
/// into 8 bytes of cryptographic hash.
pub fn hash64(h: &[u8; 20]) -> u64 {
    let mut buf = [0; 8];
    buf.copy_from_slice(&h[..8]);
    u64::from_ne_bytes(buf)
}

pub fn leak<O, T: ?Sized>(r: OwningRef<Arc<O>, T>) -> &T {
    // SAFETY: Either `ptr` points into the `Arc`, which lives until `r` is dropped,
    // or it points at something else entirely which lives at least as long.
    unsafe {
        let ptr: *const T = r.deref();
        mem::forget(r);
        &*ptr
    }
}

/// Read a dense `store_path_hash` array from `narinfo.parquet`,
/// returning it as an owned [FixedBytes].
#[tracing::instrument(fields(indicatif.pb_show = tracing::field::Empty))]
pub fn load_ph_array() -> Result<FixedBytes<20>> {
    let span = tracing::Span::current();

    span.pb_set_message("load store_path_hash");
    span.pb_start();

    // TODO(edef): this could use a further pushdown, since polars is more hindrance than help here
    // We know this has to fit in memory (we can't mmap it without further encoding constraints),
    // and we want a single `Vec<[u8; 20]>` of the data.
    let ph_array = into_fixed_binary_rechunk::<20>(
        ParquetReader::new(File::open("narinfo.parquet").unwrap())
            .with_columns(Some(vec!["store_path_hash".into()]))
            .set_rechunk(true)
            .finish()?
            .column("store_path_hash")?
            .binary()?,
    );

    u32::try_from(ph_array.len()).expect("dataset exceeds 2^32");

    Ok(ph_array)
}

/// Iterator over `&[[u8; N]]` from a dense [BinaryChunked].
pub fn as_fixed_binary<const N: usize>(
    chunked: &BinaryChunked,
) -> impl DoubleEndedIterator<Item = &[[u8; N]]> {
    chunked.downcast_iter().map(|array| {
        let range = assert_fixed_dense::<N>(array);
        exact_chunks(&array.values()[range]).unwrap()
    })
}

/// Convert a dense [BinaryChunked] into a single chunk as [FixedBytes],
/// without taking a reference to the offsets array and validity bitmap.
fn into_fixed_binary_rechunk<const N: usize>(chunked: &BinaryChunked) -> FixedBytes<N> {
    let chunked = chunked.rechunk();
    let mut iter = chunked.downcast_iter();
    let array = iter.next().unwrap();
    assert!(iter.next().is_none());

    let (buf, off, len) = {
        let range = assert_fixed_dense::<N>(array);
        array.values().clone().sliced(range.start, range.len())
    }
    .into_inner();

    ArcRef::new(buf).map(|bytes| exact_chunks(&bytes[off..off + len]).unwrap())
}

/// Ensures that the supplied Arrow array consists of densely packed bytestrings of length `N`.
/// In other words, ensure that it is free of nulls, and that the offsets have a fixed stride of `N`.
#[must_use = "only the range returned is guaranteed to be conformant"]
fn assert_fixed_dense<const N: usize>(array: &BinaryArray<i64>) -> Range<usize> {
    let null_count = array.validity().map_or(0, |bits| bits.unset_bits());
    if null_count > 0 {
        panic!("null values present");
    }

    let offsets = array.offsets();
    let length_check = offsets
        .as_slice()
        .par_windows(2)
        .all(|w| (w[1] - w[0]) == N as i64);

    if !length_check {
        panic!("lengths are inconsistent");
    }

    (*offsets.first() as usize)..(*offsets.last() as usize)
}

fn exact_chunks<const K: usize>(buf: &[u8]) -> Option<&[[u8; K]]> {
    // SAFETY: We ensure that `buf.len()` is a multiple of K, and there are no alignment requirements.
    unsafe {
        let ptr = buf.as_ptr();
        let len = buf.len();

        if len % K != 0 {
            return None;
        }

        let ptr = ptr as *mut [u8; K];
        let len = len / K;

        Some(slice::from_raw_parts(ptr, len))
    }
}