diff options
Diffstat (limited to 'tvix/store/src')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 11 | ||||
-rw-r--r-- | tvix/store/src/lib.rs | 1 | ||||
-rw-r--r-- | tvix/store/src/listener/mod.rs | 131 |
3 files changed, 9 insertions, 134 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 2f7589b07356..db19c532f380 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -7,6 +7,9 @@ use std::io; use std::path::Path; use std::path::PathBuf; use tokio::task::JoinHandle; +use tokio_listener::Listener; +use tokio_listener::SystemOptions; +use tokio_listener::UserOptions; use tracing_subscriber::prelude::*; use tvix_castore::blobservice; use tvix_castore::directoryservice; @@ -17,7 +20,6 @@ use tvix_castore::proto::node::Node; use tvix_castore::proto::GRPCBlobServiceWrapper; use tvix_castore::proto::GRPCDirectoryServiceWrapper; use tvix_castore::proto::NamedNode; -use tvix_store::listener::ListenerStream; use tvix_store::pathinfoservice; use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCPathInfoServiceWrapper; @@ -228,7 +230,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { info!("tvix-store listening on {}", listen_address); - let listener = ListenerStream::bind(&listen_address).await?; + let listener = Listener::bind( + &listen_address, + &SystemOptions::default(), + &UserOptions::default(), + ) + .await?; router.serve_with_incoming(listener).await?; } diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index c988e147174b..c59121453352 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,7 +1,6 @@ #[cfg(feature = "fs")] pub mod fs; -pub mod listener; pub mod nar; pub mod pathinfoservice; pub mod proto; diff --git a/tvix/store/src/listener/mod.rs b/tvix/store/src/listener/mod.rs deleted file mode 100644 index 1340e3a2ed65..000000000000 --- a/tvix/store/src/listener/mod.rs +++ /dev/null @@ -1,131 +0,0 @@ -use std::{ - io, - ops::{Deref, DerefMut}, - pin::Pin, - task::{Context, Poll}, -}; - -use futures::Stream; -use pin_project_lite::pin_project; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_listener::{Listener, ListenerAddress}; -use tonic::transport::server::{Connected, TcpConnectInfo, UdsConnectInfo}; - -/// A wrapper around a [Listener] which implements the [Stream] trait. -/// Mainly used to bridge [tokio_listener] with [tonic]. -pub struct ListenerStream { - inner: Listener, -} - -impl ListenerStream { - /// Convert a [Listener] into a [Stream]. - pub fn new(inner: Listener) -> Self { - Self { inner } - } - - /// Binds to the specified address and returns a [Stream] of connections. - pub async fn bind(addr: &ListenerAddress) -> io::Result<Self> { - let listener = Listener::bind(addr, &Default::default(), &Default::default()).await?; - - Ok(Self::new(listener)) - } -} - -impl Stream for ListenerStream { - type Item = io::Result<Connection>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - match self.inner.poll_accept(cx) { - Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(Connection::new(connection)))), - Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), - Poll::Pending => Poll::Pending, - } - } -} - -pin_project! { - /// A wrapper around a [tokio_listener::Connection] that implements the [Connected] trait - /// so it is compatible with [tonic]. - pub struct Connection { - #[pin] - inner: tokio_listener::Connection, - } -} - -impl Connection { - fn new(inner: tokio_listener::Connection) -> Self { - Self { inner } - } -} - -impl Deref for Connection { - type Target = tokio_listener::Connection; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl DerefMut for Connection { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -#[derive(Clone)] -pub enum ListenerConnectInfo { - TCP(TcpConnectInfo), - Unix(UdsConnectInfo), - Stdio, - Other, -} - -impl Connected for Connection { - type ConnectInfo = ListenerConnectInfo; - - fn connect_info(&self) -> Self::ConnectInfo { - if let Some(tcp_stream) = self.try_borrow_tcp() { - ListenerConnectInfo::TCP(tcp_stream.connect_info()) - } else if let Some(unix_stream) = self.try_borrow_unix() { - ListenerConnectInfo::Unix(unix_stream.connect_info()) - } else if self.try_borrow_stdio().is_some() { - ListenerConnectInfo::Stdio - } else { - ListenerConnectInfo::Other - } - } -} - -impl AsyncRead for Connection { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll<io::Result<()>> { - self.project().inner.poll_read(cx, buf) - } -} - -impl AsyncWrite for Connection { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll<std::result::Result<usize, io::Error>> { - self.project().inner.poll_write(cx, buf) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll<std::result::Result<(), io::Error>> { - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll<std::result::Result<(), io::Error>> { - self.project().inner.poll_shutdown(cx) - } -} |