about summary refs log tree commit diff
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2023-10-14T16·58-0500
committerConnor Brewster <cbrewster@hey.com>2023-10-14T18·56+0000
commite3d72cc4cb6714f683e97e8405f296977335efa6 (patch)
treef02f6ea7a1373d1f87e53431f6d9bf8cdd2072d1
parent8e811fe62536a45b15e4333a0542d60dbbc74f43 (diff)
refactor(tvix/store): Upgrade tokio-listener to get tonic support r/6810
Tonic support was added to tokio-listener upstream which removes the
need for use to have tonic compatibility wrapper types around it.

See: https://github.com/vi/tokio-listener/pull/2

Fixes b/311

Change-Id: I04a2dbb3bc3c8bfe9339583c0b46070c7ec97811
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9721
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
-rw-r--r--tvix/Cargo.lock5
-rw-r--r--tvix/Cargo.nix18
-rw-r--r--tvix/store/Cargo.toml4
-rw-r--r--tvix/store/src/bin/tvix-store.rs11
-rw-r--r--tvix/store/src/lib.rs1
-rw-r--r--tvix/store/src/listener/mod.rs131
6 files changed, 29 insertions, 141 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index 3ac45f302495..1f24d91e78d4 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -2570,9 +2570,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-listener"
-version = "0.2.1"
+version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "05875e290052a679fec29aef1da57eb4da3dafb59c6fe579fb143ccca3dea7fb"
+checksum = "669ed78565b6ce6482aaf8c1f67e0ae1fa1cf1a97c090e96994d502857675d45"
 dependencies = [
  "document-features",
  "futures-core",
@@ -2581,6 +2581,7 @@ dependencies = [
  "pin-project",
  "socket2 0.5.4",
  "tokio",
+ "tonic",
  "tracing",
 ]
 
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 11d6e0ac7e9c..8848a3fe159a 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -7481,9 +7481,9 @@ rec {
       };
       "tokio-listener" = rec {
         crateName = "tokio-listener";
-        version = "0.2.1";
+        version = "0.2.2";
         edition = "2021";
-        sha256 = "1yx7vsiwqg0lzdwyavwwnnpkvnmlgsjivvwsqbz7k9jj00lmx1q5";
+        sha256 = "0iaxcxbjhl2dk6b0w2bwm7qiryp119zgdhgqma169kmncn2xg7k6";
         dependencies = [
           {
             name = "document-features";
@@ -7523,6 +7523,11 @@ rec {
             features = [ "net" "io-std" "time" "sync" ];
           }
           {
+            name = "tonic";
+            packageId = "tonic";
+            optional = true;
+          }
+          {
             name = "tracing";
             packageId = "tracing";
           }
@@ -7538,6 +7543,10 @@ rec {
             packageId = "tokio";
             features = [ "macros" "rt" "io-util" ];
           }
+          {
+            name = "tonic";
+            packageId = "tonic";
+          }
         ];
         features = {
           "clap" = [ "dep:clap" ];
@@ -7549,10 +7558,12 @@ rec {
           "serde_with" = [ "dep:serde_with" ];
           "socket2" = [ "dep:socket2" ];
           "socket_options" = [ "socket2" ];
+          "tonic" = [ "dep:tonic" ];
+          "tonic010" = [ "tonic" ];
           "unix_path_tools" = [ "nix" ];
           "user_facing_default" = [ "inetd" "unix" "unix_path_tools" "sd_listen" "socket_options" ];
         };
-        resolvedDefaultFeatures = [ "default" "hyper" "hyper014" "inetd" "nix" "sd_listen" "socket2" "socket_options" "unix" "unix_path_tools" "user_facing_default" ];
+        resolvedDefaultFeatures = [ "default" "hyper" "hyper014" "inetd" "nix" "sd_listen" "socket2" "socket_options" "tonic" "tonic010" "unix" "unix_path_tools" "user_facing_default" ];
       };
       "tokio-macros" = rec {
         crateName = "tokio-macros";
@@ -9003,6 +9014,7 @@ rec {
           {
             name = "tokio-listener";
             packageId = "tokio-listener";
+            features = [ "tonic010" ];
           }
           {
             name = "tokio-stream";
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 95c474d6b929..c55eb80f4dc3 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -20,9 +20,10 @@ prost = "0.12.1"
 sha2 = "0.10.6"
 sled = { version = "0.34.7", features = ["compression"] }
 thiserror = "1.0.38"
+tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
+tokio-listener = { version = "0.2.2", features = [ "tonic010" ] }
 tokio-stream = { version = "0.1.14", features = ["fs"] }
 tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
-tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
 tonic = { version = "0.10.2", features = ["tls", "tls-roots"] }
 tower = "0.4.13"
 tracing = "0.1.37"
@@ -30,7 +31,6 @@ tracing-subscriber = { version = "0.3.16", features = ["json"] }
 tvix-castore = { path = "../castore" }
 url = "2.4.0"
 walkdir = "2.4.0"
-tokio-listener = { version = "0.2.1" }
 async-recursion = "1.0.5"
 
 [dependencies.fuse-backend-rs]
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 2f7589b07356..db19c532f380 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -7,6 +7,9 @@ use std::io;
 use std::path::Path;
 use std::path::PathBuf;
 use tokio::task::JoinHandle;
+use tokio_listener::Listener;
+use tokio_listener::SystemOptions;
+use tokio_listener::UserOptions;
 use tracing_subscriber::prelude::*;
 use tvix_castore::blobservice;
 use tvix_castore::directoryservice;
@@ -17,7 +20,6 @@ use tvix_castore::proto::node::Node;
 use tvix_castore::proto::GRPCBlobServiceWrapper;
 use tvix_castore::proto::GRPCDirectoryServiceWrapper;
 use tvix_castore::proto::NamedNode;
-use tvix_store::listener::ListenerStream;
 use tvix_store::pathinfoservice;
 use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
 use tvix_store::proto::GRPCPathInfoServiceWrapper;
@@ -228,7 +230,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
             info!("tvix-store listening on {}", listen_address);
 
-            let listener = ListenerStream::bind(&listen_address).await?;
+            let listener = Listener::bind(
+                &listen_address,
+                &SystemOptions::default(),
+                &UserOptions::default(),
+            )
+            .await?;
 
             router.serve_with_incoming(listener).await?;
         }
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index c988e147174b..c59121453352 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -1,7 +1,6 @@
 #[cfg(feature = "fs")]
 pub mod fs;
 
-pub mod listener;
 pub mod nar;
 pub mod pathinfoservice;
 pub mod proto;
diff --git a/tvix/store/src/listener/mod.rs b/tvix/store/src/listener/mod.rs
deleted file mode 100644
index 1340e3a2ed65..000000000000
--- a/tvix/store/src/listener/mod.rs
+++ /dev/null
@@ -1,131 +0,0 @@
-use std::{
-    io,
-    ops::{Deref, DerefMut},
-    pin::Pin,
-    task::{Context, Poll},
-};
-
-use futures::Stream;
-use pin_project_lite::pin_project;
-use tokio::io::{AsyncRead, AsyncWrite};
-use tokio_listener::{Listener, ListenerAddress};
-use tonic::transport::server::{Connected, TcpConnectInfo, UdsConnectInfo};
-
-/// A wrapper around a [Listener] which implements the [Stream] trait.
-/// Mainly used to bridge [tokio_listener] with [tonic].
-pub struct ListenerStream {
-    inner: Listener,
-}
-
-impl ListenerStream {
-    /// Convert a [Listener] into a [Stream].
-    pub fn new(inner: Listener) -> Self {
-        Self { inner }
-    }
-
-    /// Binds to the specified address and returns a [Stream] of connections.
-    pub async fn bind(addr: &ListenerAddress) -> io::Result<Self> {
-        let listener = Listener::bind(addr, &Default::default(), &Default::default()).await?;
-
-        Ok(Self::new(listener))
-    }
-}
-
-impl Stream for ListenerStream {
-    type Item = io::Result<Connection>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        match self.inner.poll_accept(cx) {
-            Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(Connection::new(connection)))),
-            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
-pin_project! {
-    /// A wrapper around a [tokio_listener::Connection] that implements the [Connected] trait
-    /// so it is compatible with [tonic].
-    pub struct Connection {
-        #[pin]
-        inner: tokio_listener::Connection,
-    }
-}
-
-impl Connection {
-    fn new(inner: tokio_listener::Connection) -> Self {
-        Self { inner }
-    }
-}
-
-impl Deref for Connection {
-    type Target = tokio_listener::Connection;
-
-    fn deref(&self) -> &Self::Target {
-        &self.inner
-    }
-}
-
-impl DerefMut for Connection {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.inner
-    }
-}
-
-#[derive(Clone)]
-pub enum ListenerConnectInfo {
-    TCP(TcpConnectInfo),
-    Unix(UdsConnectInfo),
-    Stdio,
-    Other,
-}
-
-impl Connected for Connection {
-    type ConnectInfo = ListenerConnectInfo;
-
-    fn connect_info(&self) -> Self::ConnectInfo {
-        if let Some(tcp_stream) = self.try_borrow_tcp() {
-            ListenerConnectInfo::TCP(tcp_stream.connect_info())
-        } else if let Some(unix_stream) = self.try_borrow_unix() {
-            ListenerConnectInfo::Unix(unix_stream.connect_info())
-        } else if self.try_borrow_stdio().is_some() {
-            ListenerConnectInfo::Stdio
-        } else {
-            ListenerConnectInfo::Other
-        }
-    }
-}
-
-impl AsyncRead for Connection {
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &mut tokio::io::ReadBuf<'_>,
-    ) -> Poll<io::Result<()>> {
-        self.project().inner.poll_read(cx, buf)
-    }
-}
-
-impl AsyncWrite for Connection {
-    fn poll_write(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &[u8],
-    ) -> Poll<std::result::Result<usize, io::Error>> {
-        self.project().inner.poll_write(cx, buf)
-    }
-
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<std::result::Result<(), io::Error>> {
-        self.project().inner.poll_flush(cx)
-    }
-
-    fn poll_shutdown(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<std::result::Result<(), io::Error>> {
-        self.project().inner.poll_shutdown(cx)
-    }
-}