From e3d72cc4cb6714f683e97e8405f296977335efa6 Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Sat, 14 Oct 2023 11:58:23 -0500 Subject: refactor(tvix/store): Upgrade tokio-listener to get tonic support Tonic support was added to tokio-listener upstream which removes the need for use to have tonic compatibility wrapper types around it. See: https://github.com/vi/tokio-listener/pull/2 Fixes b/311 Change-Id: I04a2dbb3bc3c8bfe9339583c0b46070c7ec97811 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9721 Tested-by: BuildkiteCI Reviewed-by: flokli --- tvix/store/src/bin/tvix-store.rs | 11 +++- tvix/store/src/lib.rs | 1 - tvix/store/src/listener/mod.rs | 131 --------------------------------------- 3 files changed, 9 insertions(+), 134 deletions(-) delete mode 100644 tvix/store/src/listener/mod.rs (limited to 'tvix/store/src') diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 2f7589b073..db19c532f3 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -7,6 +7,9 @@ use std::io; use std::path::Path; use std::path::PathBuf; use tokio::task::JoinHandle; +use tokio_listener::Listener; +use tokio_listener::SystemOptions; +use tokio_listener::UserOptions; use tracing_subscriber::prelude::*; use tvix_castore::blobservice; use tvix_castore::directoryservice; @@ -17,7 +20,6 @@ use tvix_castore::proto::node::Node; use tvix_castore::proto::GRPCBlobServiceWrapper; use tvix_castore::proto::GRPCDirectoryServiceWrapper; use tvix_castore::proto::NamedNode; -use tvix_store::listener::ListenerStream; use tvix_store::pathinfoservice; use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCPathInfoServiceWrapper; @@ -228,7 +230,12 @@ async fn main() -> Result<(), Box> { info!("tvix-store listening on {}", listen_address); - let listener = ListenerStream::bind(&listen_address).await?; + let listener = Listener::bind( + &listen_address, + &SystemOptions::default(), + &UserOptions::default(), + ) + .await?; router.serve_with_incoming(listener).await?; } diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index c988e14717..c591214533 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,7 +1,6 @@ #[cfg(feature = "fs")] pub mod fs; -pub mod listener; pub mod nar; pub mod pathinfoservice; pub mod proto; diff --git a/tvix/store/src/listener/mod.rs b/tvix/store/src/listener/mod.rs deleted file mode 100644 index 1340e3a2ed..0000000000 --- a/tvix/store/src/listener/mod.rs +++ /dev/null @@ -1,131 +0,0 @@ -use std::{ - io, - ops::{Deref, DerefMut}, - pin::Pin, - task::{Context, Poll}, -}; - -use futures::Stream; -use pin_project_lite::pin_project; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_listener::{Listener, ListenerAddress}; -use tonic::transport::server::{Connected, TcpConnectInfo, UdsConnectInfo}; - -/// A wrapper around a [Listener] which implements the [Stream] trait. -/// Mainly used to bridge [tokio_listener] with [tonic]. -pub struct ListenerStream { - inner: Listener, -} - -impl ListenerStream { - /// Convert a [Listener] into a [Stream]. - pub fn new(inner: Listener) -> Self { - Self { inner } - } - - /// Binds to the specified address and returns a [Stream] of connections. - pub async fn bind(addr: &ListenerAddress) -> io::Result { - let listener = Listener::bind(addr, &Default::default(), &Default::default()).await?; - - Ok(Self::new(listener)) - } -} - -impl Stream for ListenerStream { - type Item = io::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.inner.poll_accept(cx) { - Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(Connection::new(connection)))), - Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), - Poll::Pending => Poll::Pending, - } - } -} - -pin_project! { - /// A wrapper around a [tokio_listener::Connection] that implements the [Connected] trait - /// so it is compatible with [tonic]. - pub struct Connection { - #[pin] - inner: tokio_listener::Connection, - } -} - -impl Connection { - fn new(inner: tokio_listener::Connection) -> Self { - Self { inner } - } -} - -impl Deref for Connection { - type Target = tokio_listener::Connection; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl DerefMut for Connection { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -#[derive(Clone)] -pub enum ListenerConnectInfo { - TCP(TcpConnectInfo), - Unix(UdsConnectInfo), - Stdio, - Other, -} - -impl Connected for Connection { - type ConnectInfo = ListenerConnectInfo; - - fn connect_info(&self) -> Self::ConnectInfo { - if let Some(tcp_stream) = self.try_borrow_tcp() { - ListenerConnectInfo::TCP(tcp_stream.connect_info()) - } else if let Some(unix_stream) = self.try_borrow_unix() { - ListenerConnectInfo::Unix(unix_stream.connect_info()) - } else if self.try_borrow_stdio().is_some() { - ListenerConnectInfo::Stdio - } else { - ListenerConnectInfo::Other - } - } -} - -impl AsyncRead for Connection { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - self.project().inner.poll_read(cx, buf) - } -} - -impl AsyncWrite for Connection { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - self.project().inner.poll_write(cx, buf) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.project().inner.poll_shutdown(cx) - } -} -- cgit 1.4.1