about summary refs log tree commit diff
path: root/tvix/store
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2023-10-14T16·58-0500
committerConnor Brewster <cbrewster@hey.com>2023-10-14T18·56+0000
commite3d72cc4cb6714f683e97e8405f296977335efa6 (patch)
treef02f6ea7a1373d1f87e53431f6d9bf8cdd2072d1 /tvix/store
parent8e811fe62536a45b15e4333a0542d60dbbc74f43 (diff)
refactor(tvix/store): Upgrade tokio-listener to get tonic support r/6810
Tonic support was added to tokio-listener upstream which removes the
need for use to have tonic compatibility wrapper types around it.

See: https://github.com/vi/tokio-listener/pull/2

Fixes b/311

Change-Id: I04a2dbb3bc3c8bfe9339583c0b46070c7ec97811
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9721
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store')
-rw-r--r--tvix/store/Cargo.toml4
-rw-r--r--tvix/store/src/bin/tvix-store.rs11
-rw-r--r--tvix/store/src/lib.rs1
-rw-r--r--tvix/store/src/listener/mod.rs131
4 files changed, 11 insertions, 136 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 95c474d6b929..c55eb80f4dc3 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -20,9 +20,10 @@ prost = "0.12.1"
 sha2 = "0.10.6"
 sled = { version = "0.34.7", features = ["compression"] }
 thiserror = "1.0.38"
+tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
+tokio-listener = { version = "0.2.2", features = [ "tonic010" ] }
 tokio-stream = { version = "0.1.14", features = ["fs"] }
 tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
-tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
 tonic = { version = "0.10.2", features = ["tls", "tls-roots"] }
 tower = "0.4.13"
 tracing = "0.1.37"
@@ -30,7 +31,6 @@ tracing-subscriber = { version = "0.3.16", features = ["json"] }
 tvix-castore = { path = "../castore" }
 url = "2.4.0"
 walkdir = "2.4.0"
-tokio-listener = { version = "0.2.1" }
 async-recursion = "1.0.5"
 
 [dependencies.fuse-backend-rs]
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 2f7589b07356..db19c532f380 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -7,6 +7,9 @@ use std::io;
 use std::path::Path;
 use std::path::PathBuf;
 use tokio::task::JoinHandle;
+use tokio_listener::Listener;
+use tokio_listener::SystemOptions;
+use tokio_listener::UserOptions;
 use tracing_subscriber::prelude::*;
 use tvix_castore::blobservice;
 use tvix_castore::directoryservice;
@@ -17,7 +20,6 @@ use tvix_castore::proto::node::Node;
 use tvix_castore::proto::GRPCBlobServiceWrapper;
 use tvix_castore::proto::GRPCDirectoryServiceWrapper;
 use tvix_castore::proto::NamedNode;
-use tvix_store::listener::ListenerStream;
 use tvix_store::pathinfoservice;
 use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
 use tvix_store::proto::GRPCPathInfoServiceWrapper;
@@ -228,7 +230,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
             info!("tvix-store listening on {}", listen_address);
 
-            let listener = ListenerStream::bind(&listen_address).await?;
+            let listener = Listener::bind(
+                &listen_address,
+                &SystemOptions::default(),
+                &UserOptions::default(),
+            )
+            .await?;
 
             router.serve_with_incoming(listener).await?;
         }
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index c988e147174b..c59121453352 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -1,7 +1,6 @@
 #[cfg(feature = "fs")]
 pub mod fs;
 
-pub mod listener;
 pub mod nar;
 pub mod pathinfoservice;
 pub mod proto;
diff --git a/tvix/store/src/listener/mod.rs b/tvix/store/src/listener/mod.rs
deleted file mode 100644
index 1340e3a2ed65..000000000000
--- a/tvix/store/src/listener/mod.rs
+++ /dev/null
@@ -1,131 +0,0 @@
-use std::{
-    io,
-    ops::{Deref, DerefMut},
-    pin::Pin,
-    task::{Context, Poll},
-};
-
-use futures::Stream;
-use pin_project_lite::pin_project;
-use tokio::io::{AsyncRead, AsyncWrite};
-use tokio_listener::{Listener, ListenerAddress};
-use tonic::transport::server::{Connected, TcpConnectInfo, UdsConnectInfo};
-
-/// A wrapper around a [Listener] which implements the [Stream] trait.
-/// Mainly used to bridge [tokio_listener] with [tonic].
-pub struct ListenerStream {
-    inner: Listener,
-}
-
-impl ListenerStream {
-    /// Convert a [Listener] into a [Stream].
-    pub fn new(inner: Listener) -> Self {
-        Self { inner }
-    }
-
-    /// Binds to the specified address and returns a [Stream] of connections.
-    pub async fn bind(addr: &ListenerAddress) -> io::Result<Self> {
-        let listener = Listener::bind(addr, &Default::default(), &Default::default()).await?;
-
-        Ok(Self::new(listener))
-    }
-}
-
-impl Stream for ListenerStream {
-    type Item = io::Result<Connection>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        match self.inner.poll_accept(cx) {
-            Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(Connection::new(connection)))),
-            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
-pin_project! {
-    /// A wrapper around a [tokio_listener::Connection] that implements the [Connected] trait
-    /// so it is compatible with [tonic].
-    pub struct Connection {
-        #[pin]
-        inner: tokio_listener::Connection,
-    }
-}
-
-impl Connection {
-    fn new(inner: tokio_listener::Connection) -> Self {
-        Self { inner }
-    }
-}
-
-impl Deref for Connection {
-    type Target = tokio_listener::Connection;
-
-    fn deref(&self) -> &Self::Target {
-        &self.inner
-    }
-}
-
-impl DerefMut for Connection {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.inner
-    }
-}
-
-#[derive(Clone)]
-pub enum ListenerConnectInfo {
-    TCP(TcpConnectInfo),
-    Unix(UdsConnectInfo),
-    Stdio,
-    Other,
-}
-
-impl Connected for Connection {
-    type ConnectInfo = ListenerConnectInfo;
-
-    fn connect_info(&self) -> Self::ConnectInfo {
-        if let Some(tcp_stream) = self.try_borrow_tcp() {
-            ListenerConnectInfo::TCP(tcp_stream.connect_info())
-        } else if let Some(unix_stream) = self.try_borrow_unix() {
-            ListenerConnectInfo::Unix(unix_stream.connect_info())
-        } else if self.try_borrow_stdio().is_some() {
-            ListenerConnectInfo::Stdio
-        } else {
-            ListenerConnectInfo::Other
-        }
-    }
-}
-
-impl AsyncRead for Connection {
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &mut tokio::io::ReadBuf<'_>,
-    ) -> Poll<io::Result<()>> {
-        self.project().inner.poll_read(cx, buf)
-    }
-}
-
-impl AsyncWrite for Connection {
-    fn poll_write(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &[u8],
-    ) -> Poll<std::result::Result<usize, io::Error>> {
-        self.project().inner.poll_write(cx, buf)
-    }
-
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<std::result::Result<(), io::Error>> {
-        self.project().inner.poll_flush(cx)
-    }
-
-    fn poll_shutdown(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<std::result::Result<(), io::Error>> {
-        self.project().inner.poll_shutdown(cx)
-    }
-}