diff options
Diffstat (limited to 'tvix/store/src/listener/mod.rs')
-rw-r--r-- | tvix/store/src/listener/mod.rs | 115 |
1 files changed, 115 insertions, 0 deletions
diff --git a/tvix/store/src/listener/mod.rs b/tvix/store/src/listener/mod.rs new file mode 100644 index 000000000000..3a44a0511cb8 --- /dev/null +++ b/tvix/store/src/listener/mod.rs @@ -0,0 +1,115 @@ +use std::{ + io, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::Stream; +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_listener::{Listener, ListenerAddress}; +use tonic::transport::server::Connected; + +/// A wrapper around a [Listener] which implements the [Stream] trait. +/// Mainly used to bridge [tokio_listener] with [tonic]. +pub struct ListenerStream { + inner: Listener, +} + +impl ListenerStream { + /// Convert a [Listener] into a [Stream]. + pub fn new(inner: Listener) -> Self { + Self { inner } + } + + /// Binds to the specified address and returns a [Stream] of connections. + pub async fn bind(addr: &ListenerAddress) -> io::Result<Self> { + let listener = Listener::bind(addr, &Default::default(), &Default::default()).await?; + + Ok(Self::new(listener)) + } +} + +impl Stream for ListenerStream { + type Item = io::Result<Connection>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(Connection::new(connection)))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} + +pin_project! { + /// A wrapper around a [tokio_listener::Connection] that implements the [Connected] trait + /// so it is compatible with [tonic]. + pub struct Connection { + #[pin] + inner: tokio_listener::Connection, + } +} + +impl Connection { + fn new(inner: tokio_listener::Connection) -> Self { + Self { inner } + } +} + +impl Deref for Connection { + type Target = tokio_listener::Connection; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for Connection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Connected for Connection { + type ConnectInfo = (); + + fn connect_info(&self) -> Self::ConnectInfo { + () + } +} + +impl AsyncRead for Connection { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + self.project().inner.poll_read(cx, buf) + } +} + +impl AsyncWrite for Connection { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll<std::result::Result<usize, io::Error>> { + self.project().inner.poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<std::result::Result<(), io::Error>> { + self.project().inner.poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<std::result::Result<(), io::Error>> { + self.project().inner.poll_shutdown(cx) + } +} |