about summary refs log tree commit diff
path: root/tvix/nix-compat
diff options
context:
space:
mode:
authorPicnoir <picnoir@alternativebit.fr>2024-03-21T08·52+0100
committerpicnoir picnoir <picnoir@alternativebit.fr>2024-03-27T12·05+0000
commit21481b02b872900b881c2c489e085e44f1b90b0f (patch)
tree7eeb75aa809f73d1bd04f090a6521165333b2ce5 /tvix/nix-compat
parent508d67ad49917c293c34f3c9ca59adcea02ea3ef (diff)
feat(tvix/nix-compat): worker protocol operation parser r/7784
Change-Id: I7776635b17c44534223603d28cf59c7eebd976e0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11229
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/nix-compat')
-rw-r--r--tvix/nix-compat/Cargo.toml2
-rw-r--r--tvix/nix-compat/src/wire/mod.rs3
-rw-r--r--tvix/nix-compat/src/wire/worker_protocol.rs83
3 files changed, 88 insertions, 0 deletions
diff --git a/tvix/nix-compat/Cargo.toml b/tvix/nix-compat/Cargo.toml
index 181eb9428915..6e3df0485db1 100644
--- a/tvix/nix-compat/Cargo.toml
+++ b/tvix/nix-compat/Cargo.toml
@@ -14,9 +14,11 @@ bstr = { version = "1.6.0", features = ["alloc", "unicode", "serde"] }
 data-encoding = "2.3.3"
 ed25519 = "2.2.3"
 ed25519-dalek = "2.1.0"
+enum-primitive-derive = "0.3.0"
 futures-util = { version = "0.3.30", features = ["io"], optional = true }
 glob = "0.3.0"
 nom = "7.1.3"
+num-traits = "0.2.18"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 sha2 = "0.10.6"
diff --git a/tvix/nix-compat/src/wire/mod.rs b/tvix/nix-compat/src/wire/mod.rs
index 9444ebbcfe59..cd7b87aacd85 100644
--- a/tvix/nix-compat/src/wire/mod.rs
+++ b/tvix/nix-compat/src/wire/mod.rs
@@ -6,3 +6,6 @@ pub mod bytes;
 
 #[cfg(feature = "async")]
 pub mod primitive;
+
+#[cfg(feature = "async")]
+pub mod worker_protocol;
diff --git a/tvix/nix-compat/src/wire/worker_protocol.rs b/tvix/nix-compat/src/wire/worker_protocol.rs
new file mode 100644
index 000000000000..82f227c72362
--- /dev/null
+++ b/tvix/nix-compat/src/wire/worker_protocol.rs
@@ -0,0 +1,83 @@
+use std::io::{Error, ErrorKind};
+
+use enum_primitive_derive::Primitive;
+use num_traits::{FromPrimitive, ToPrimitive};
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+
+use crate::wire::primitive;
+
+/// Worker Operation
+///
+/// These operations are encoded as unsigned 64 bits before being sent
+/// to the wire. See the [read_op] and
+/// [write_op] operations to serialize/deserialize the
+/// operation on the wire.
+///
+/// Note: for now, we're using the Nix 2.20 operation description. The
+/// operations marked as obsolete are obsolete for Nix 2.20, not
+/// necessarily for Nix 2.3. We'll revisit this later on.
+#[derive(Debug, PartialEq, Primitive)]
+pub enum Operation {
+    IsValidPath = 1,
+    HasSubstitutes = 3,
+    QueryPathHash = 4,   // obsolete
+    QueryReferences = 5, // obsolete
+    QueryReferrers = 6,
+    AddToStore = 7,
+    AddTextToStore = 8, // obsolete since 1.25, Nix 3.0. Use WorkerProto::Op::AddToStore
+    BuildPaths = 9,
+    EnsurePath = 10,
+    AddTempRoot = 11,
+    AddIndirectRoot = 12,
+    SyncWithGC = 13,
+    FindRoots = 14,
+    ExportPath = 16,   // obsolete
+    QueryDeriver = 18, // obsolete
+    SetOptions = 19,
+    CollectGarbage = 20,
+    QuerySubstitutablePathInfo = 21,
+    QueryDerivationOutputs = 22, // obsolete
+    QueryAllValidPaths = 23,
+    QueryFailedPaths = 24,
+    ClearFailedPaths = 25,
+    QueryPathInfo = 26,
+    ImportPaths = 27,                // obsolete
+    QueryDerivationOutputNames = 28, // obsolete
+    QueryPathFromHashPart = 29,
+    QuerySubstitutablePathInfos = 30,
+    QueryValidPaths = 31,
+    QuerySubstitutablePaths = 32,
+    QueryValidDerivers = 33,
+    OptimiseStore = 34,
+    VerifyStore = 35,
+    BuildDerivation = 36,
+    AddSignatures = 37,
+    NarFromPath = 38,
+    AddToStoreNar = 39,
+    QueryMissing = 40,
+    QueryDerivationOutputMap = 41,
+    RegisterDrvOutput = 42,
+    QueryRealisation = 43,
+    AddMultipleToStore = 44,
+    AddBuildLog = 45,
+    BuildPathsWithResults = 46,
+    AddPermRoot = 47,
+}
+
+/// Read a worker [Operation] from the wire.
+pub async fn read_op<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<Operation> {
+    let op_number = primitive::read_u64(r).await?;
+    Operation::from_u64(op_number).ok_or(Error::new(
+        ErrorKind::Other,
+        format!("Invalid OP number {}", op_number),
+    ))
+}
+
+/// Write a worker [Operation] to the wire.
+pub async fn write_op<W: AsyncWriteExt + Unpin>(w: &mut W, op: &Operation) -> std::io::Result<()> {
+    let op = Operation::to_u64(op).ok_or(Error::new(
+        ErrorKind::Other,
+        format!("Can't convert the OP {:?} to u64", op),
+    ))?;
+    w.write_u64(op).await
+}