From e3d72cc4cb6714f683e97e8405f296977335efa6 Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Sat, 14 Oct 2023 11:58:23 -0500 Subject: refactor(tvix/store): Upgrade tokio-listener to get tonic support Tonic support was added to tokio-listener upstream which removes the need for use to have tonic compatibility wrapper types around it. See: https://github.com/vi/tokio-listener/pull/2 Fixes b/311 Change-Id: I04a2dbb3bc3c8bfe9339583c0b46070c7ec97811 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9721 Tested-by: BuildkiteCI Reviewed-by: flokli --- tvix/Cargo.lock | 5 +- tvix/Cargo.nix | 18 +++++- tvix/store/Cargo.toml | 4 +- tvix/store/src/bin/tvix-store.rs | 11 +++- tvix/store/src/lib.rs | 1 - tvix/store/src/listener/mod.rs | 131 --------------------------------------- 6 files changed, 29 insertions(+), 141 deletions(-) delete mode 100644 tvix/store/src/listener/mod.rs diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 3ac45f3024..1f24d91e78 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -2570,9 +2570,9 @@ dependencies = [ [[package]] name = "tokio-listener" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05875e290052a679fec29aef1da57eb4da3dafb59c6fe579fb143ccca3dea7fb" +checksum = "669ed78565b6ce6482aaf8c1f67e0ae1fa1cf1a97c090e96994d502857675d45" dependencies = [ "document-features", "futures-core", @@ -2581,6 +2581,7 @@ dependencies = [ "pin-project", "socket2 0.5.4", "tokio", + "tonic", "tracing", ] diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 11d6e0ac7e..8848a3fe15 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -7481,9 +7481,9 @@ rec { }; "tokio-listener" = rec { crateName = "tokio-listener"; - version = "0.2.1"; + version = "0.2.2"; edition = "2021"; - sha256 = "1yx7vsiwqg0lzdwyavwwnnpkvnmlgsjivvwsqbz7k9jj00lmx1q5"; + sha256 = "0iaxcxbjhl2dk6b0w2bwm7qiryp119zgdhgqma169kmncn2xg7k6"; dependencies = [ { name = "document-features"; @@ -7522,6 +7522,11 @@ rec { packageId = "tokio"; features = [ "net" "io-std" "time" "sync" ]; } + { + name = "tonic"; + packageId = "tonic"; + optional = true; + } { name = "tracing"; packageId = "tracing"; @@ -7538,6 +7543,10 @@ rec { packageId = "tokio"; features = [ "macros" "rt" "io-util" ]; } + { + name = "tonic"; + packageId = "tonic"; + } ]; features = { "clap" = [ "dep:clap" ]; @@ -7549,10 +7558,12 @@ rec { "serde_with" = [ "dep:serde_with" ]; "socket2" = [ "dep:socket2" ]; "socket_options" = [ "socket2" ]; + "tonic" = [ "dep:tonic" ]; + "tonic010" = [ "tonic" ]; "unix_path_tools" = [ "nix" ]; "user_facing_default" = [ "inetd" "unix" "unix_path_tools" "sd_listen" "socket_options" ]; }; - resolvedDefaultFeatures = [ "default" "hyper" "hyper014" "inetd" "nix" "sd_listen" "socket2" "socket_options" "unix" "unix_path_tools" "user_facing_default" ]; + resolvedDefaultFeatures = [ "default" "hyper" "hyper014" "inetd" "nix" "sd_listen" "socket2" "socket_options" "tonic" "tonic010" "unix" "unix_path_tools" "user_facing_default" ]; }; "tokio-macros" = rec { crateName = "tokio-macros"; @@ -9003,6 +9014,7 @@ rec { { name = "tokio-listener"; packageId = "tokio-listener"; + features = [ "tonic010" ]; } { name = "tokio-stream"; diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 95c474d6b9..c55eb80f4d 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -20,9 +20,10 @@ prost = "0.12.1" sha2 = "0.10.6" sled = { version = "0.34.7", features = ["compression"] } thiserror = "1.0.38" +tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } +tokio-listener = { version = "0.2.2", features = [ "tonic010" ] } tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] } -tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } tonic = { version = "0.10.2", features = ["tls", "tls-roots"] } tower = "0.4.13" tracing = "0.1.37" @@ -30,7 +31,6 @@ tracing-subscriber = { version = "0.3.16", features = ["json"] } tvix-castore = { path = "../castore" } url = "2.4.0" walkdir = "2.4.0" -tokio-listener = { version = "0.2.1" } async-recursion = "1.0.5" [dependencies.fuse-backend-rs] diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 2f7589b073..db19c532f3 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -7,6 +7,9 @@ use std::io; use std::path::Path; use std::path::PathBuf; use tokio::task::JoinHandle; +use tokio_listener::Listener; +use tokio_listener::SystemOptions; +use tokio_listener::UserOptions; use tracing_subscriber::prelude::*; use tvix_castore::blobservice; use tvix_castore::directoryservice; @@ -17,7 +20,6 @@ use tvix_castore::proto::node::Node; use tvix_castore::proto::GRPCBlobServiceWrapper; use tvix_castore::proto::GRPCDirectoryServiceWrapper; use tvix_castore::proto::NamedNode; -use tvix_store::listener::ListenerStream; use tvix_store::pathinfoservice; use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCPathInfoServiceWrapper; @@ -228,7 +230,12 @@ async fn main() -> Result<(), Box> { info!("tvix-store listening on {}", listen_address); - let listener = ListenerStream::bind(&listen_address).await?; + let listener = Listener::bind( + &listen_address, + &SystemOptions::default(), + &UserOptions::default(), + ) + .await?; router.serve_with_incoming(listener).await?; } diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index c988e14717..c591214533 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,7 +1,6 @@ #[cfg(feature = "fs")] pub mod fs; -pub mod listener; pub mod nar; pub mod pathinfoservice; pub mod proto; diff --git a/tvix/store/src/listener/mod.rs b/tvix/store/src/listener/mod.rs deleted file mode 100644 index 1340e3a2ed..0000000000 --- a/tvix/store/src/listener/mod.rs +++ /dev/null @@ -1,131 +0,0 @@ -use std::{ - io, - ops::{Deref, DerefMut}, - pin::Pin, - task::{Context, Poll}, -}; - -use futures::Stream; -use pin_project_lite::pin_project; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_listener::{Listener, ListenerAddress}; -use tonic::transport::server::{Connected, TcpConnectInfo, UdsConnectInfo}; - -/// A wrapper around a [Listener] which implements the [Stream] trait. -/// Mainly used to bridge [tokio_listener] with [tonic]. -pub struct ListenerStream { - inner: Listener, -} - -impl ListenerStream { - /// Convert a [Listener] into a [Stream]. - pub fn new(inner: Listener) -> Self { - Self { inner } - } - - /// Binds to the specified address and returns a [Stream] of connections. - pub async fn bind(addr: &ListenerAddress) -> io::Result { - let listener = Listener::bind(addr, &Default::default(), &Default::default()).await?; - - Ok(Self::new(listener)) - } -} - -impl Stream for ListenerStream { - type Item = io::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.inner.poll_accept(cx) { - Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(Connection::new(connection)))), - Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), - Poll::Pending => Poll::Pending, - } - } -} - -pin_project! { - /// A wrapper around a [tokio_listener::Connection] that implements the [Connected] trait - /// so it is compatible with [tonic]. - pub struct Connection { - #[pin] - inner: tokio_listener::Connection, - } -} - -impl Connection { - fn new(inner: tokio_listener::Connection) -> Self { - Self { inner } - } -} - -impl Deref for Connection { - type Target = tokio_listener::Connection; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl DerefMut for Connection { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -#[derive(Clone)] -pub enum ListenerConnectInfo { - TCP(TcpConnectInfo), - Unix(UdsConnectInfo), - Stdio, - Other, -} - -impl Connected for Connection { - type ConnectInfo = ListenerConnectInfo; - - fn connect_info(&self) -> Self::ConnectInfo { - if let Some(tcp_stream) = self.try_borrow_tcp() { - ListenerConnectInfo::TCP(tcp_stream.connect_info()) - } else if let Some(unix_stream) = self.try_borrow_unix() { - ListenerConnectInfo::Unix(unix_stream.connect_info()) - } else if self.try_borrow_stdio().is_some() { - ListenerConnectInfo::Stdio - } else { - ListenerConnectInfo::Other - } - } -} - -impl AsyncRead for Connection { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - self.project().inner.poll_read(cx, buf) - } -} - -impl AsyncWrite for Connection { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - self.project().inner.poll_write(cx, buf) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.project().inner.poll_shutdown(cx) - } -} -- cgit 1.4.1