about summary refs log tree commit diff
path: root/users/edef
diff options
context:
space:
mode:
author edef <edef@edef.eu> 2024-10-17T13·26+0000
committer edef <edef@edef.eu> 2024-10-19T17·01+0000
commit 313899c291f0295506c275418e570b39b4a5f079 (patch)
tree 0a4c737f7c54082eaf3957072693115e29fabcd1 /users/edef
parent bdc2891053c28a5739c4be55f4816d362a5f08e2 (diff)
refactor(users/edef/weave/swizzle): use polars streaming r/8844
This vastly reduces the memory requirements, so we can run in ~40G RAM.

Change-Id: I4952a780df294bd852a8b4682ba2fd59b9bae675
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12667
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'users/edef')
-rw-r--r-- users/edef/weave/Cargo.lock 12
-rw-r--r-- users/edef/weave/Cargo.nix 48
-rw-r--r-- users/edef/weave/Cargo.toml 3
-rw-r--r-- users/edef/weave/src/bin/swizzle.rs 99
-rw-r--r-- users/edef/weave/src/lib.rs 20
5 files changed, 70 insertions, 112 deletions
diff --git a/users/edef/weave/Cargo.lock b/users/edef/weave/Cargo.lock
index aaa77014fba2..191059ffd729 100644
--- a/users/edef/weave/Cargo.lock
+++ b/users/edef/weave/Cargo.lock
@@ -1654,15 +1654,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
 
 [[package]]
-name = "signal-hook-registry"
-version = "1.4.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
-dependencies = [
- "libc",
-]
-
-[[package]]
 name = "signature"
 version = "2.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1881,9 +1872,7 @@ dependencies = [
  "bytes",
  "libc",
  "mio",
- "parking_lot",
  "pin-project-lite",
- "signal-hook-registry",
  "socket2",
  "tokio-macros",
  "windows-sys",
@@ -2039,7 +2028,6 @@ dependencies = [
  "polars",
  "rayon",
  "safer_owning_ref",
- "tokio",
 ]
 
 [[package]]
diff --git a/users/edef/weave/Cargo.nix b/users/edef/weave/Cargo.nix
index a06bae091c83..c34a03b6876b 100644
--- a/users/edef/weave/Cargo.nix
+++ b/users/edef/weave/Cargo.nix
@@ -3395,7 +3395,7 @@ rec {
           "unique_counts" = [ "polars-ops/unique_counts" "polars-lazy?/unique_counts" ];
           "zip_with" = [ "polars-core/zip_with" ];
         };
-        resolvedDefaultFeatures = [ "csv" "default" "docs" "dtype-date" "dtype-datetime" "dtype-duration" "dtype-slim" "fmt" "parquet" "polars-io" "polars-ops" "polars-time" "temporal" "zip_with" ];
+        resolvedDefaultFeatures = [ "csv" "default" "docs" "dtype-date" "dtype-datetime" "dtype-duration" "dtype-slim" "fmt" "lazy" "parquet" "polars-io" "polars-lazy" "polars-ops" "polars-time" "streaming" "temporal" "zip_with" ];
       };
       "polars-arrow" = rec {
         crateName = "polars-arrow";
@@ -4204,7 +4204,7 @@ rec {
           "true_div" = [ "polars-plan/true_div" ];
           "unique_counts" = [ "polars-plan/unique_counts" ];
         };
-        resolvedDefaultFeatures = [ "abs" "cross_join" "csv" "cum_agg" "dtype-date" "dtype-datetime" "dtype-duration" "dtype-i16" "dtype-i8" "dtype-time" "is_in" "log" "meta" "parquet" "polars-time" "regex" "round_series" "strings" "temporal" "trigonometry" ];
+        resolvedDefaultFeatures = [ "abs" "chunked_ids" "cross_join" "csv" "cum_agg" "dtype-date" "dtype-datetime" "dtype-duration" "dtype-i16" "dtype-i8" "dtype-time" "is_in" "log" "meta" "parquet" "polars-pipe" "polars-time" "regex" "round_series" "streaming" "strings" "temporal" "trigonometry" ];
       };
       "polars-ops" = rec {
         crateName = "polars-ops";
@@ -4356,7 +4356,7 @@ rec {
           "timezones" = [ "chrono-tz" "chrono" ];
           "unicode-reverse" = [ "dep:unicode-reverse" ];
         };
-        resolvedDefaultFeatures = [ "abs" "cross_join" "cum_agg" "dtype-date" "dtype-datetime" "dtype-duration" "is_in" "log" "round_series" "search_sorted" "strings" ];
+        resolvedDefaultFeatures = [ "abs" "chunked_ids" "cross_join" "cum_agg" "dtype-date" "dtype-datetime" "dtype-duration" "is_in" "log" "round_series" "search_sorted" "strings" ];
       };
       "polars-parquet" = rec {
         crateName = "polars-parquet";
@@ -4780,7 +4780,7 @@ rec {
           "top_k" = [ "polars-ops/top_k" ];
           "unique_counts" = [ "polars-ops/unique_counts" ];
         };
-        resolvedDefaultFeatures = [ "abs" "cross_join" "csv" "cum_agg" "dtype-date" "dtype-datetime" "dtype-duration" "dtype-i16" "dtype-i8" "dtype-time" "is_in" "log" "meta" "parquet" "polars-parquet" "polars-time" "regex" "round_series" "strings" "temporal" "trigonometry" ];
+        resolvedDefaultFeatures = [ "abs" "chunked_ids" "cross_join" "csv" "cum_agg" "dtype-date" "dtype-datetime" "dtype-duration" "dtype-i16" "dtype-i8" "dtype-time" "is_in" "log" "meta" "parquet" "polars-parquet" "polars-time" "regex" "round_series" "streaming" "strings" "temporal" "trigonometry" ];
       };
       "polars-row" = rec {
         crateName = "polars-row";
@@ -5709,24 +5709,6 @@ rec {
         };
         resolvedDefaultFeatures = [ "default" "std" ];
       };
-      "signal-hook-registry" = rec {
-        crateName = "signal-hook-registry";
-        version = "1.4.2";
-        edition = "2015";
-        sha256 = "1cb5akgq8ajnd5spyn587srvs4n26ryq0p78nswffwhv46sf1sd9";
-        libName = "signal_hook_registry";
-        authors = [
-          "Michal 'vorner' Vaner <vorner@vorner.cz>"
-          "Masaki Hara <ackie.h.gmai@gmail.com>"
-        ];
-        dependencies = [
-          {
-            name = "libc";
-            packageId = "libc";
-          }
-        ];
-
-      };
       "signature" = rec {
         crateName = "signature";
         version = "2.2.0";
@@ -6283,21 +6265,10 @@ rec {
             usesDefaultFeatures = false;
           }
           {
-            name = "parking_lot";
-            packageId = "parking_lot";
-            optional = true;
-          }
-          {
             name = "pin-project-lite";
             packageId = "pin-project-lite";
           }
           {
-            name = "signal-hook-registry";
-            packageId = "signal-hook-registry";
-            optional = true;
-            target = { target, features }: (target."unix" or false);
-          }
-          {
             name = "socket2";
             packageId = "socket2";
             optional = true;
@@ -6353,7 +6324,7 @@ rec {
           "tracing" = [ "dep:tracing" ];
           "windows-sys" = [ "dep:windows-sys" ];
         };
-        resolvedDefaultFeatures = [ "bytes" "default" "fs" "full" "io-std" "io-util" "libc" "macros" "mio" "net" "parking_lot" "process" "rt" "rt-multi-thread" "signal" "signal-hook-registry" "socket2" "sync" "time" "tokio-macros" "windows-sys" ];
+        resolvedDefaultFeatures = [ "bytes" "default" "io-util" "libc" "macros" "mio" "net" "rt" "rt-multi-thread" "socket2" "sync" "time" "tokio-macros" "windows-sys" ];
       };
       "tokio-macros" = rec {
         crateName = "tokio-macros";
@@ -6789,7 +6760,7 @@ rec {
           {
             name = "polars";
             packageId = "polars";
-            features = [ "parquet" ];
+            features = [ "parquet" "lazy" "streaming" ];
           }
           {
             name = "rayon";
@@ -6799,11 +6770,6 @@ rec {
             name = "safer_owning_ref";
             packageId = "safer_owning_ref";
           }
-          {
-            name = "tokio";
-            packageId = "tokio";
-            features = [ "full" ];
-          }
         ];
 
       };
@@ -7799,7 +7765,7 @@ rec {
           "Win32_Web" = [ "Win32" ];
           "Win32_Web_InternetExplorer" = [ "Win32_Web" ];
         };
-        resolvedDefaultFeatures = [ "Wdk" "Wdk_Foundation" "Wdk_Storage" "Wdk_Storage_FileSystem" "Wdk_System" "Wdk_System_IO" "Win32" "Win32_Foundation" "Win32_Networking" "Win32_Networking_WinSock" "Win32_Security" "Win32_Storage" "Win32_Storage_FileSystem" "Win32_System" "Win32_System_Com" "Win32_System_Console" "Win32_System_IO" "Win32_System_Pipes" "Win32_System_SystemServices" "Win32_System_Threading" "Win32_System_WindowsProgramming" "Win32_UI" "Win32_UI_Shell" "default" ];
+        resolvedDefaultFeatures = [ "Wdk" "Wdk_Foundation" "Wdk_Storage" "Wdk_Storage_FileSystem" "Wdk_System" "Wdk_System_IO" "Win32" "Win32_Foundation" "Win32_Networking" "Win32_Networking_WinSock" "Win32_Security" "Win32_Storage" "Win32_Storage_FileSystem" "Win32_System" "Win32_System_Com" "Win32_System_IO" "Win32_System_Pipes" "Win32_System_SystemServices" "Win32_System_Threading" "Win32_System_WindowsProgramming" "Win32_UI" "Win32_UI_Shell" "default" ];
       };
       "windows-targets" = rec {
         crateName = "windows-targets";
diff --git a/users/edef/weave/Cargo.toml b/users/edef/weave/Cargo.toml
index 69cf3cf7effd..72a205f66914 100644
--- a/users/edef/weave/Cargo.toml
+++ b/users/edef/weave/Cargo.toml
@@ -13,8 +13,7 @@ hashbrown = "0.14.3"
 nix-compat = { version = "0.1.0", path = "../../../tvix/nix-compat" }
 safer_owning_ref = "0.5.0"
 rayon = "1.8.1"
-tokio = { version = "1.36.0", features = ["full"] }
 
 [dependencies.polars]
 version = "0.36.2"
-features = ["parquet"]
+features = ["parquet", "lazy", "streaming"]
diff --git a/users/edef/weave/src/bin/swizzle.rs b/users/edef/weave/src/bin/swizzle.rs
index 68c18581268a..bcea3edfada9 100644
--- a/users/edef/weave/src/bin/swizzle.rs
+++ b/users/edef/weave/src/bin/swizzle.rs
@@ -32,21 +32,21 @@
 
 use anyhow::Result;
 use hashbrown::HashTable;
-use polars::prelude::*;
-use rayon::prelude::*;
-use std::fs::File;
-use tokio::runtime::Runtime;
+use polars::{
+    lazy::dsl::{col, SpecialEq},
+    prelude::*,
+};
 
-use weave::{as_fixed_binary, hash64, load_ph_array, DONE, INDEX_NULL};
+use weave::{as_fixed_binary, hash64, leak, load_ph_array, DONE, INDEX_NULL};
 
 fn main() -> Result<()> {
-    let ph_array = load_ph_array()?;
+    let ph_array: &'static [[u8; 20]] = leak(load_ph_array()?);
 
     // TODO(edef): re-parallelise this
     // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works.
     // TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory
     eprint!("… build index\r");
-    let ph_map: HashTable<(u64, u32)> = {
+    let ph_map: &'static HashTable<(u64, u32)> = {
         let mut ph_map = HashTable::with_capacity(ph_array.len());
 
         for (offset, item) in ph_array.iter().enumerate() {
@@ -55,59 +55,48 @@ fn main() -> Result<()> {
             ph_map.insert_unique(hash, (hash, offset), |&(hash, _)| hash);
         }
 
-        ph_map
+        &*Box::leak(Box::new(ph_map))
     };
     eprintln!("{DONE}");
 
-    eprint!("… swizzle references\r");
-    let mut pq = ParquetReader::new(File::open("narinfo.parquet")?)
-        .with_columns(Some(vec!["references".into()]))
-        .batched(1 << 16)?;
-
-    let mut reference_idxs =
-        Series::new_empty("reference_idxs", &DataType::List(DataType::UInt32.into()));
-
-    let mut bounce = vec![];
-    let runtime = Runtime::new()?;
-    while let Some(batches) = runtime.block_on(pq.next_batches(48))? {
-        batches
-            .into_par_iter()
-            .map(|df| -> ListChunked {
-                df.column("references")
-                    .unwrap()
-                    .list()
-                    .unwrap()
-                    .apply_to_inner(&|series: Series| -> PolarsResult<Series> {
-                        let series = series.binary()?;
-                        let mut out: Vec<u32> = Vec::with_capacity(series.len());
-
-                        out.extend(as_fixed_binary::<20>(series).flat_map(|xs| xs).map(|key| {
-                            let hash = hash64(&key);
-                            ph_map
-                                .find(hash, |&(candidate_hash, candidate_index)| {
-                                    candidate_hash == hash
-                                        && &ph_array[candidate_index as usize] == key
-                                })
-                                .map(|&(_, index)| index)
-                                .unwrap_or(INDEX_NULL)
-                        }));
-
-                        Ok(Series::from_vec("reference_idxs", out))
-                    })
-                    .unwrap()
+    let ph_to_idx = |key: &[u8; 20]| -> u32 {
+        let hash = hash64(key);
+        ph_map
+            .find(hash, |&(candidate_hash, candidate_index)| {
+                candidate_hash == hash && &ph_array[candidate_index as usize] == key
             })
-            .collect_into_vec(&mut bounce);
-
-        for batch in bounce.drain(..) {
-            reference_idxs.append(&batch.into_series())?;
-        }
-    }
-    eprintln!("{DONE}");
+            .map(|&(_, index)| index)
+            .unwrap_or(INDEX_NULL)
+    };
 
-    eprint!("… writing output\r");
-    ParquetWriter::new(File::create("narinfo-references.parquet")?).finish(&mut df! {
-        "reference_idxs" => reference_idxs,
-    }?)?;
+    eprint!("… swizzle references\r");
+    LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())?
+        .with_column(
+            col("references")
+                .map(
+                    move |series: Series| -> PolarsResult<Option<Series>> {
+                        Ok(Some(
+                            series
+                                .list()?
+                                .apply_to_inner(&|series: Series| -> PolarsResult<Series> {
+                                    let series = series.binary()?;
+                                    let mut out: Vec<u32> = Vec::with_capacity(series.len());
+                                    out.extend(as_fixed_binary(series).flatten().map(ph_to_idx));
+                                    Ok(Series::from_vec("reference_idxs", out))
+                                })?
+                                .into_series(),
+                        ))
+                    },
+                    SpecialEq::from_type(DataType::List(DataType::UInt32.into())),
+                )
+                .alias("reference_idxs"),
+        )
+        .select([col("reference_idxs")])
+        .with_streaming(true)
+        .sink_parquet(
+            "narinfo-references.parquet".into(),
+            ParquetWriteOptions::default(),
+        )?;
     eprintln!("{DONE}");
 
     Ok(())
diff --git a/users/edef/weave/src/lib.rs b/users/edef/weave/src/lib.rs
index db3d07e7de07..4ccd566ca52d 100644
--- a/users/edef/weave/src/lib.rs
+++ b/users/edef/weave/src/lib.rs
@@ -1,7 +1,13 @@
 use anyhow::Result;
-use owning_ref::ArcRef;
+use owning_ref::{ArcRef, OwningRef};
 use rayon::prelude::*;
-use std::{fs::File, ops::Range, slice};
+use std::{
+    fs::File,
+    mem,
+    ops::{Deref, Range},
+    slice,
+    sync::Arc,
+};
 
 use polars::{
     datatypes::BinaryChunked,
@@ -24,6 +30,16 @@ pub fn hash64(h: &[u8; 20]) -> u64 {
     u64::from_ne_bytes(buf)
 }
 
+pub fn leak<O, T: ?Sized>(r: OwningRef<Arc<O>, T>) -> &T {
+    // SAFETY: Either `ptr` points into the `Arc`, which lives until `r` is dropped,
+    // or it points at something else entirely which lives at least as long.
+    unsafe {
+        let ptr: *const T = r.deref();
+        mem::forget(r);
+        &*ptr
+    }
+}
+
 /// Read a dense `store_path_hash` array from `narinfo.parquet`,
 /// returning it as an owned [FixedBytes].
 pub fn load_ph_array() -> Result<FixedBytes<20>> {