From f8746fcab0d6cbd6d9e7a49ce2217f858151a476 Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Fri, 22 Sep 2023 19:45:01 -0500 Subject: feat(tvix/store): Support listening on UNIX domain sockets This adds support for listening on UNIX domain sockets via the tokio-listener crate. The crate will automatically determine whether to start a TCP or UNIX domain socket server based on the listen address. Unfortunately, it's not compatible with tonic right out of the box so I added some wrapper types to implement the necessary traits to make things work. We should investigate upstreaming a `tonic` option to the tokio-listener crate which implements the relevant `tonic` traits. Example: ``` $ tvix-store daemon -l /run/tvix-store.sock INFO tvix_store: tvix-store listening on /run/tvix-store.sock $ tvix-store mount -l /mnt/tvix --blob-service-addr grpc+unix:///run/tvix-store.sock --directory-service-addr grpc+unix:///run/tvix-store.sock --path-info-service-addr grpc+unix:///run/tvix-store.sock $ ls /mnt/tvix ``` Change-Id: I91c4a4b0c5a177b3b90e6c01a4e5d263130e6bdb Reviewed-on: https://cl.tvl.fyi/c/depot/+/9429 Tested-by: BuildkiteCI Reviewed-by: flokli Reviewed-by: raitobezarius --- tvix/store/Cargo.toml | 1 + tvix/store/src/bin/tvix-store.rs | 5 +- tvix/store/src/lib.rs | 1 + tvix/store/src/listener/mod.rs | 115 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 tvix/store/src/listener/mod.rs (limited to 'tvix/store') diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 20909221c524..05b3f4c863f1 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -30,6 +30,7 @@ tracing-subscriber = { version = "0.3.16", features = ["json"] } tvix-castore = { path = "../castore" } url = "2.4.0" walkdir = "2.4.0" +tokio-listener = { version = "0.2.1" } [dependencies.fuse-backend-rs] optional = true diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 813d62cb129a..e80014047ca4 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -17,6 +17,7 @@ use tvix_castore::proto::node::Node; use tvix_castore::proto::GRPCBlobServiceWrapper; use tvix_castore::proto::GRPCDirectoryServiceWrapper; use tvix_castore::proto::NamedNode; +use tvix_store::listener::ListenerStream; use tvix_store::pathinfoservice; use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCPathInfoServiceWrapper; @@ -220,7 +221,9 @@ async fn main() -> Result<(), Box> { info!("tvix-store listening on {}", listen_address); - router.serve(listen_address).await?; + let listener = ListenerStream::bind(&listen_address).await?; + + router.serve_with_incoming(listener).await?; } Commands::Import { paths, diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index c59121453352..c988e147174b 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,6 +1,7 @@ #[cfg(feature = "fs")] pub mod fs; +pub mod listener; pub mod nar; pub mod pathinfoservice; pub mod proto; diff --git a/tvix/store/src/listener/mod.rs b/tvix/store/src/listener/mod.rs new file mode 100644 index 000000000000..3a44a0511cb8 --- /dev/null +++ b/tvix/store/src/listener/mod.rs @@ -0,0 +1,115 @@ +use std::{ + io, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::Stream; +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_listener::{Listener, ListenerAddress}; +use tonic::transport::server::Connected; + +/// A wrapper around a [Listener] which implements the [Stream] trait. +/// Mainly used to bridge [tokio_listener] with [tonic]. +pub struct ListenerStream { + inner: Listener, +} + +impl ListenerStream { + /// Convert a [Listener] into a [Stream]. + pub fn new(inner: Listener) -> Self { + Self { inner } + } + + /// Binds to the specified address and returns a [Stream] of connections. + pub async fn bind(addr: &ListenerAddress) -> io::Result { + let listener = Listener::bind(addr, &Default::default(), &Default::default()).await?; + + Ok(Self::new(listener)) + } +} + +impl Stream for ListenerStream { + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.inner.poll_accept(cx) { + Poll::Ready(Ok((connection, _))) => Poll::Ready(Some(Ok(Connection::new(connection)))), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} + +pin_project! { + /// A wrapper around a [tokio_listener::Connection] that implements the [Connected] trait + /// so it is compatible with [tonic]. + pub struct Connection { + #[pin] + inner: tokio_listener::Connection, + } +} + +impl Connection { + fn new(inner: tokio_listener::Connection) -> Self { + Self { inner } + } +} + +impl Deref for Connection { + type Target = tokio_listener::Connection; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for Connection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Connected for Connection { + type ConnectInfo = (); + + fn connect_info(&self) -> Self::ConnectInfo { + () + } +} + +impl AsyncRead for Connection { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + self.project().inner.poll_read(cx, buf) + } +} + +impl AsyncWrite for Connection { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().inner.poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.project().inner.poll_shutdown(cx) + } +} -- cgit 1.4.1