diff options
Diffstat (limited to 'tvix/store/src')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 5 | ||||
-rw-r--r-- | tvix/store/src/lib.rs | 1 | ||||
-rw-r--r-- | tvix/store/src/listener/mod.rs | 115 |
3 files changed, 120 insertions, 1 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 813d62cb129a..e80014047ca4 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -17,6 +17,7 @@ use tvix_castore::proto::node::Node; use tvix_castore::proto::GRPCBlobServiceWrapper; use tvix_castore::proto::GRPCDirectoryServiceWrapper; use tvix_castore::proto::NamedNode; +use tvix_store::listener::ListenerStream; use tvix_store::pathinfoservice; use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCPathInfoServiceWrapper; @@ -220,7 +221,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { info!("tvix-store listening on {}", listen_address); - router.serve(listen_address).await?; + let listener = ListenerStream::bind(&listen_address).await?; + + router.serve_with_incoming(listener).await?; } Commands::Import { paths, diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index c59121453352..c988e147174b 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,6 +1,7 @@ #[cfg(feature = "fs")] pub mod fs; +pub mod listener; pub mod nar; pub mod pathinfoservice; pub mod proto; diff --git a/tvix/store/src/listener/mod.rs b/tvix/store/src/listener/mod.rs new file mode 100644 index 000000000000..3a44a0511cb8 --- /dev/null +++ b/tvix/store/src/listener/mod.rs @@ -0,0 +1,115 @@ +use std::{ + io, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::Stream; +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_listener::{Listener, ListenerAddress}; +use tonic::transport::server::Connected; + +/// A wrapper around a [Listener] which implements the [Stream] trait. +/// Mainly used to bridge [tokio_listener] with [tonic]. +pub struct ListenerStream { + inner: Listener, +} + +impl ListenerStream { + /// Convert a [Listener] into a [Stream]. + pub fn new(inner: Listener) -> Self { + Self { inner } + } + + /// Binds to the specified address and returns a [Stream] of connections. + pub async fn bind(addr: &ListenerAddress) -> io::Result<Self> { + let listener = Listener::bind(addr, &Default::default(), &Default::default()).await?; + + Ok(Self::new(listener)) + } +} + +impl Stream for ListenerStream { + type Item = io::Result<Connection>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(Connection::new(connection)))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} + +pin_project! { + /// A wrapper around a [tokio_listener::Connection] that implements the [Connected] trait + /// so it is compatible with [tonic]. + pub struct Connection { + #[pin] + inner: tokio_listener::Connection, + } +} + +impl Connection { + fn new(inner: tokio_listener::Connection) -> Self { + Self { inner } + } +} + +impl Deref for Connection { + type Target = tokio_listener::Connection; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for Connection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Connected for Connection { + type ConnectInfo = (); + + fn connect_info(&self) -> Self::ConnectInfo { + () + } +} + +impl AsyncRead for Connection { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + self.project().inner.poll_read(cx, buf) + } +} + +impl AsyncWrite for Connection { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll<std::result::Result<usize, io::Error>> { + self.project().inner.poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<std::result::Result<(), io::Error>> { + self.project().inner.poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<std::result::Result<(), io::Error>> { + self.project().inner.poll_shutdown(cx) + } +} |