diff options
Diffstat (limited to 'tvix/nix-compat/src/nix_daemon/handler.rs')
-rw-r--r-- | tvix/nix-compat/src/nix_daemon/handler.rs | 69 |
1 files changed, 58 insertions, 11 deletions
diff --git a/tvix/nix-compat/src/nix_daemon/handler.rs b/tvix/nix-compat/src/nix_daemon/handler.rs index 65c5c2d60d08..4f43612114d8 100644 --- a/tvix/nix-compat/src/nix_daemon/handler.rs +++ b/tvix/nix-compat/src/nix_daemon/handler.rs @@ -1,4 +1,4 @@ -use std::{future::Future, sync::Arc}; +use std::{future::Future, ops::DerefMut, sync::Arc}; use bytes::Bytes; use tokio::{ @@ -8,7 +8,8 @@ use tokio::{ use tracing::{debug, warn}; use super::{ - types::QueryValidPaths, + framing::{NixFramedReader, StderrReadFramedReader}, + types::{AddToStoreNarRequest, QueryValidPaths}, worker_protocol::{server_handshake_client, ClientSettings, Operation, Trust, STDERR_LAST}, NixDaemonIO, }; @@ -120,7 +121,7 @@ where Ok(operation) => match operation { Operation::IsValidPath => { let path: StorePath<String> = self.reader.read_value().await?; - self.handle(io.is_valid_path(&path)).await? + Self::handle(&self.writer, io.is_valid_path(&path)).await? } // Note this operation does not currently delegate to NixDaemonIO, // The general idea is that we will pass relevant ClientSettings @@ -128,23 +129,23 @@ where // For now we just store the settings in the NixDaemon for future use. Operation::SetOptions => { self.client_settings = self.reader.read_value().await?; - self.handle(async { Ok(()) }).await? + Self::handle(&self.writer, async { Ok(()) }).await? } Operation::QueryPathInfo => { let path: StorePath<String> = self.reader.read_value().await?; - self.handle(io.query_path_info(&path)).await? + Self::handle(&self.writer, io.query_path_info(&path)).await? } Operation::QueryPathFromHashPart => { let hash: Bytes = self.reader.read_value().await?; - self.handle(io.query_path_from_hash_part(&hash)).await? + Self::handle(&self.writer, io.query_path_from_hash_part(&hash)).await? } Operation::QueryValidPaths => { let query: QueryValidPaths = self.reader.read_value().await?; - self.handle(io.query_valid_paths(&query)).await? + Self::handle(&self.writer, io.query_valid_paths(&query)).await? } Operation::QueryValidDerivers => { let path: StorePath<String> = self.reader.read_value().await?; - self.handle(io.query_valid_derivers(&path)).await? + Self::handle(&self.writer, io.query_valid_derivers(&path)).await? } // FUTUREWORK: These are just stubs that return an empty list. // It's important not to return an error for the local-overlay:// store @@ -154,7 +155,7 @@ where // invariants. Operation::QueryReferrers | Operation::QueryRealisation => { let _: String = self.reader.read_value().await?; - self.handle(async move { + Self::handle(&self.writer, async move { warn!( ?operation, "This operation is not implemented. Returning empty result..." @@ -163,6 +164,41 @@ where }) .await? } + Operation::AddToStoreNar => { + let request: AddToStoreNarRequest = self.reader.read_value().await?; + let minor_version = self.protocol_version.minor(); + match minor_version { + ..21 => { + // Before protocol version 1.21, the nar is sent unframed, so we just + // pass the reader directly to the operation. + Self::handle( + &self.writer, + self.io.add_to_store_nar(request, &mut self.reader), + ) + .await? + } + 21..23 => { + // Protocol versions 1.21 .. 1.23 use STDERR_READ protocol, see logging.md#stderr_read. + Self::handle(&self.writer, async { + let mut writer = self.writer.lock().await; + let mut reader = StderrReadFramedReader::new( + &mut self.reader, + writer.deref_mut(), + ); + self.io.add_to_store_nar(request, &mut reader).await + }) + .await? + } + 23.. => { + // Starting at protocol version 1.23, the framed protocol is used, see serialization.md#framed + let mut framed = NixFramedReader::new(&mut self.reader); + Self::handle(&self.writer, async { + self.io.add_to_store_nar(request, &mut framed).await + }) + .await? + } + } + } _ => { return Err(std::io::Error::other(format!( "Operation {operation:?} is not implemented" @@ -188,14 +224,14 @@ where /// This is a helper method, awaiting on the passed in future and then /// handling log lines/activities as described above. async fn handle<T>( - &mut self, + writer: &Arc<Mutex<NixWriter<WriteHalf<RW>>>>, future: impl Future<Output = std::io::Result<T>>, ) -> Result<(), std::io::Error> where T: NixSerialize + Send, { let result = future.await; - let mut writer = self.writer.lock().await; + let mut writer = writer.lock().await; match result { Ok(r) => { @@ -244,6 +280,17 @@ mod tests { ) -> Result<Option<UnkeyedValidPathInfo>> { Ok(None) } + + async fn add_to_store_nar<R>( + &self, + _request: crate::nix_daemon::types::AddToStoreNarRequest, + _reader: &mut R, + ) -> Result<()> + where + R: tokio::io::AsyncRead + Send + Unpin, + { + Ok(()) + } } #[tokio::test] |