diff options
author | Connor Brewster <cbrewster@hey.com> | 2023-10-14T16·58-0500 |
---|---|---|
committer | Connor Brewster <cbrewster@hey.com> | 2023-10-14T18·56+0000 |
commit | e3d72cc4cb6714f683e97e8405f296977335efa6 (patch) | |
tree | f02f6ea7a1373d1f87e53431f6d9bf8cdd2072d1 /tvix/store | |
parent | 8e811fe62536a45b15e4333a0542d60dbbc74f43 (diff) |
refactor(tvix/store): Upgrade tokio-listener to get tonic support r/6810
Tonic support was added to tokio-listener upstream which removes the need for use to have tonic compatibility wrapper types around it. See: https://github.com/vi/tokio-listener/pull/2 Fixes b/311 Change-Id: I04a2dbb3bc3c8bfe9339583c0b46070c7ec97811 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9721 Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/Cargo.toml | 4 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 11 | ||||
-rw-r--r-- | tvix/store/src/lib.rs | 1 | ||||
-rw-r--r-- | tvix/store/src/listener/mod.rs | 131 |
4 files changed, 11 insertions, 136 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 95c474d6b929..c55eb80f4dc3 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -20,9 +20,10 @@ prost = "0.12.1" sha2 = "0.10.6" sled = { version = "0.34.7", features = ["compression"] } thiserror = "1.0.38" +tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } +tokio-listener = { version = "0.2.2", features = [ "tonic010" ] } tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] } -tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } tonic = { version = "0.10.2", features = ["tls", "tls-roots"] } tower = "0.4.13" tracing = "0.1.37" @@ -30,7 +31,6 @@ tracing-subscriber = { version = "0.3.16", features = ["json"] } tvix-castore = { path = "../castore" } url = "2.4.0" walkdir = "2.4.0" -tokio-listener = { version = "0.2.1" } async-recursion = "1.0.5" [dependencies.fuse-backend-rs] diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 2f7589b07356..db19c532f380 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -7,6 +7,9 @@ use std::io; use std::path::Path; use std::path::PathBuf; use tokio::task::JoinHandle; +use tokio_listener::Listener; +use tokio_listener::SystemOptions; +use tokio_listener::UserOptions; use tracing_subscriber::prelude::*; use tvix_castore::blobservice; use tvix_castore::directoryservice; @@ -17,7 +20,6 @@ use tvix_castore::proto::node::Node; use tvix_castore::proto::GRPCBlobServiceWrapper; use tvix_castore::proto::GRPCDirectoryServiceWrapper; use tvix_castore::proto::NamedNode; -use tvix_store::listener::ListenerStream; use tvix_store::pathinfoservice; use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCPathInfoServiceWrapper; @@ -228,7 +230,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { info!("tvix-store listening on {}", listen_address); - let listener = ListenerStream::bind(&listen_address).await?; + let listener = Listener::bind( + &listen_address, + &SystemOptions::default(), + &UserOptions::default(), + ) + .await?; router.serve_with_incoming(listener).await?; } diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index c988e147174b..c59121453352 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,7 +1,6 @@ #[cfg(feature = "fs")] pub mod fs; -pub mod listener; pub mod nar; pub mod pathinfoservice; pub mod proto; diff --git a/tvix/store/src/listener/mod.rs b/tvix/store/src/listener/mod.rs deleted file mode 100644 index 1340e3a2ed65..000000000000 --- a/tvix/store/src/listener/mod.rs +++ /dev/null @@ -1,131 +0,0 @@ -use std::{ - io, - ops::{Deref, DerefMut}, - pin::Pin, - task::{Context, Poll}, -}; - -use futures::Stream; -use pin_project_lite::pin_project; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_listener::{Listener, ListenerAddress}; -use tonic::transport::server::{Connected, TcpConnectInfo, UdsConnectInfo}; - -/// A wrapper around a [Listener] which implements the [Stream] trait. -/// Mainly used to bridge [tokio_listener] with [tonic]. -pub struct ListenerStream { - inner: Listener, -} - -impl ListenerStream { - /// Convert a [Listener] into a [Stream]. - pub fn new(inner: Listener) -> Self { - Self { inner } - } - - /// Binds to the specified address and returns a [Stream] of connections. - pub async fn bind(addr: &ListenerAddress) -> io::Result<Self> { - let listener = Listener::bind(addr, &Default::default(), &Default::default()).await?; - - Ok(Self::new(listener)) - } -} - -impl Stream for ListenerStream { - type Item = io::Result<Connection>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - match self.inner.poll_accept(cx) { - Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(Connection::new(connection)))), - Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), - Poll::Pending => Poll::Pending, - } - } -} - -pin_project! { - /// A wrapper around a [tokio_listener::Connection] that implements the [Connected] trait - /// so it is compatible with [tonic]. - pub struct Connection { - #[pin] - inner: tokio_listener::Connection, - } -} - -impl Connection { - fn new(inner: tokio_listener::Connection) -> Self { - Self { inner } - } -} - -impl Deref for Connection { - type Target = tokio_listener::Connection; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl DerefMut for Connection { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -#[derive(Clone)] -pub enum ListenerConnectInfo { - TCP(TcpConnectInfo), - Unix(UdsConnectInfo), - Stdio, - Other, -} - -impl Connected for Connection { - type ConnectInfo = ListenerConnectInfo; - - fn connect_info(&self) -> Self::ConnectInfo { - if let Some(tcp_stream) = self.try_borrow_tcp() { - ListenerConnectInfo::TCP(tcp_stream.connect_info()) - } else if let Some(unix_stream) = self.try_borrow_unix() { - ListenerConnectInfo::Unix(unix_stream.connect_info()) - } else if self.try_borrow_stdio().is_some() { - ListenerConnectInfo::Stdio - } else { - ListenerConnectInfo::Other - } - } -} - -impl AsyncRead for Connection { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll<io::Result<()>> { - self.project().inner.poll_read(cx, buf) - } -} - -impl AsyncWrite for Connection { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll<std::result::Result<usize, io::Error>> { - self.project().inner.poll_write(cx, buf) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll<std::result::Result<(), io::Error>> { - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll<std::result::Result<(), io::Error>> { - self.project().inner.poll_shutdown(cx) - } -} |