about summary refs log tree commit diff
path: root/tvix/store/src/listener/mod.rs
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2023-10-14T16·58-0500
committerConnor Brewster <cbrewster@hey.com>2023-10-14T18·56+0000
commite3d72cc4cb6714f683e97e8405f296977335efa6 (patch)
treef02f6ea7a1373d1f87e53431f6d9bf8cdd2072d1 /tvix/store/src/listener/mod.rs
parent8e811fe62536a45b15e4333a0542d60dbbc74f43 (diff)
refactor(tvix/store): Upgrade tokio-listener to get tonic support r/6810
Tonic support was added to tokio-listener upstream which removes the
need for use to have tonic compatibility wrapper types around it.

See: https://github.com/vi/tokio-listener/pull/2

Fixes b/311

Change-Id: I04a2dbb3bc3c8bfe9339583c0b46070c7ec97811
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9721
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to '')
-rw-r--r--tvix/store/src/listener/mod.rs131
1 files changed, 0 insertions, 131 deletions
diff --git a/tvix/store/src/listener/mod.rs b/tvix/store/src/listener/mod.rs
deleted file mode 100644
index 1340e3a2ed..0000000000
--- a/tvix/store/src/listener/mod.rs
+++ /dev/null
@@ -1,131 +0,0 @@
-use std::{
-    io,
-    ops::{Deref, DerefMut},
-    pin::Pin,
-    task::{Context, Poll},
-};
-
-use futures::Stream;
-use pin_project_lite::pin_project;
-use tokio::io::{AsyncRead, AsyncWrite};
-use tokio_listener::{Listener, ListenerAddress};
-use tonic::transport::server::{Connected, TcpConnectInfo, UdsConnectInfo};
-
-/// A wrapper around a [Listener] which implements the [Stream] trait.
-/// Mainly used to bridge [tokio_listener] with [tonic].
-pub struct ListenerStream {
-    inner: Listener,
-}
-
-impl ListenerStream {
-    /// Convert a [Listener] into a [Stream].
-    pub fn new(inner: Listener) -> Self {
-        Self { inner }
-    }
-
-    /// Binds to the specified address and returns a [Stream] of connections.
-    pub async fn bind(addr: &ListenerAddress) -> io::Result<Self> {
-        let listener = Listener::bind(addr, &Default::default(), &Default::default()).await?;
-
-        Ok(Self::new(listener))
-    }
-}
-
-impl Stream for ListenerStream {
-    type Item = io::Result<Connection>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        match self.inner.poll_accept(cx) {
-            Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(Connection::new(connection)))),
-            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
-pin_project! {
-    /// A wrapper around a [tokio_listener::Connection] that implements the [Connected] trait
-    /// so it is compatible with [tonic].
-    pub struct Connection {
-        #[pin]
-        inner: tokio_listener::Connection,
-    }
-}
-
-impl Connection {
-    fn new(inner: tokio_listener::Connection) -> Self {
-        Self { inner }
-    }
-}
-
-impl Deref for Connection {
-    type Target = tokio_listener::Connection;
-
-    fn deref(&self) -> &Self::Target {
-        &self.inner
-    }
-}
-
-impl DerefMut for Connection {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.inner
-    }
-}
-
-#[derive(Clone)]
-pub enum ListenerConnectInfo {
-    TCP(TcpConnectInfo),
-    Unix(UdsConnectInfo),
-    Stdio,
-    Other,
-}
-
-impl Connected for Connection {
-    type ConnectInfo = ListenerConnectInfo;
-
-    fn connect_info(&self) -> Self::ConnectInfo {
-        if let Some(tcp_stream) = self.try_borrow_tcp() {
-            ListenerConnectInfo::TCP(tcp_stream.connect_info())
-        } else if let Some(unix_stream) = self.try_borrow_unix() {
-            ListenerConnectInfo::Unix(unix_stream.connect_info())
-        } else if self.try_borrow_stdio().is_some() {
-            ListenerConnectInfo::Stdio
-        } else {
-            ListenerConnectInfo::Other
-        }
-    }
-}
-
-impl AsyncRead for Connection {
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &mut tokio::io::ReadBuf<'_>,
-    ) -> Poll<io::Result<()>> {
-        self.project().inner.poll_read(cx, buf)
-    }
-}
-
-impl AsyncWrite for Connection {
-    fn poll_write(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &[u8],
-    ) -> Poll<std::result::Result<usize, io::Error>> {
-        self.project().inner.poll_write(cx, buf)
-    }
-
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<std::result::Result<(), io::Error>> {
-        self.project().inner.poll_flush(cx)
-    }
-
-    fn poll_shutdown(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<std::result::Result<(), io::Error>> {
-        self.project().inner.poll_shutdown(cx)
-    }
-}