about summary refs log tree commit diff
path: root/tvix/store
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store')
-rw-r--r--tvix/store/Cargo.toml32
-rw-r--r--tvix/store/src/bin/tvix-store.rs54
-rw-r--r--tvix/store/src/fs/mod.rs3
-rw-r--r--tvix/store/src/fs/virtiofs.rs237
4 files changed, 325 insertions, 1 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index a2e143de7014..20909221c524 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -36,6 +36,35 @@ optional = true
 # TODO: Switch back to upstream version once https://github.com/cloud-hypervisor/fuse-backend-rs/pull/153 lands.
 git = "https://github.com/cbrewster/fuse-backend-rs.git"
 branch = "optional-allow_other"
+# Ideally this would only be enabled if virtiofs is enabled
+# Ex: virtiofs = [..., "fuse-backend-rs/?vhost-user-fs", ...]
+# However, crate2nix doesn't properly understand this syntax and doesn't
+# enable this feature properly.
+features = ["vhost-user-fs"]
+
+[dependencies.vhost]
+optional = true
+version = "0.6"
+
+[dependencies.vhost-user-backend]
+optional = true
+version = "0.8"
+
+[dependencies.virtio-queue]
+optional = true
+version = "0.7"
+
+[dependencies.vm-memory]
+optional = true
+version = "0.10"
+
+[dependencies.vmm-sys-util]
+optional = true
+version = "0.11"
+
+[dependencies.virtio-bindings]
+optional = true
+version = "0.2.1"
 
 [dependencies.tonic-reflection]
 optional = true
@@ -55,7 +84,8 @@ tempfile = "3.3.0"
 tonic-mock = { git = "https://github.com/brainrake/tonic-mock", branch = "bump-dependencies" }
 
 [features]
-default = ["fuse", "reflection"]
+default = ["fuse", "virtiofs", "reflection"]
 fs = ["dep:libc", "dep:fuse-backend-rs"]
+virtiofs = ["fs", "dep:vhost", "dep:vhost-user-backend", "dep:virtio-queue", "dep:vm-memory", "dep:vmm-sys-util", "dep:virtio-bindings"]
 fuse = ["fs"]
 reflection = ["tonic-reflection"]
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 474a48c9fd1a..813d62cb129a 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -29,6 +29,9 @@ use tvix_store::fs::TvixStoreFs;
 #[cfg(feature = "fuse")]
 use tvix_store::fs::fuse::FuseDaemon;
 
+#[cfg(feature = "virtiofs")]
+use tvix_store::fs::virtiofs::start_virtiofs_daemon;
+
 #[cfg(feature = "reflection")]
 use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET;
 #[cfg(feature = "reflection")]
@@ -111,6 +114,28 @@ enum Commands {
         #[clap(long, short, action)]
         list_root: bool,
     },
+    /// Starts a tvix-store virtiofs daemon at the given socket path.
+    #[cfg(feature = "virtiofs")]
+    #[command(name = "virtiofs")]
+    VirtioFs {
+        #[clap(value_name = "PATH")]
+        socket: PathBuf,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+
+        /// Whether to list elements at the root of the mount point.
+        /// This is useful if your PathInfoService doesn't provide an
+        /// (exhaustive) listing.
+        #[clap(long, short, action)]
+        list_root: bool,
+    },
 }
 
 #[cfg(feature = "fuse")]
@@ -328,6 +353,35 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             })
             .await??;
         }
+        #[cfg(feature = "virtiofs")]
+        Commands::VirtioFs {
+            socket,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+            list_root,
+        } => {
+            let blob_service = blobservice::from_addr(&blob_service_addr)?;
+            let directory_service = directoryservice::from_addr(&directory_service_addr)?;
+            let path_info_service = pathinfoservice::from_addr(
+                &path_info_service_addr,
+                blob_service.clone(),
+                directory_service.clone(),
+            )?;
+
+            tokio::task::spawn_blocking(move || {
+                let fs = TvixStoreFs::new(
+                    blob_service,
+                    directory_service,
+                    path_info_service,
+                    list_root,
+                );
+                info!("starting tvix-store virtiofs daemon on {:?}", &socket);
+
+                start_virtiofs_daemon(fs, socket)
+            })
+            .await??;
+        }
     };
     Ok(())
 }
diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs
index 59b8f0d0854f..91adfa35f0e0 100644
--- a/tvix/store/src/fs/mod.rs
+++ b/tvix/store/src/fs/mod.rs
@@ -5,6 +5,9 @@ mod inodes;
 #[cfg(feature = "fuse")]
 pub mod fuse;
 
+#[cfg(feature = "virtiofs")]
+pub mod virtiofs;
+
 #[cfg(test)]
 mod tests;
 
diff --git a/tvix/store/src/fs/virtiofs.rs b/tvix/store/src/fs/virtiofs.rs
new file mode 100644
index 000000000000..3786a84285cd
--- /dev/null
+++ b/tvix/store/src/fs/virtiofs.rs
@@ -0,0 +1,237 @@
+use std::{
+    convert, error, fmt, io,
+    ops::Deref,
+    path::Path,
+    sync::{Arc, MutexGuard, RwLock},
+};
+
+use fuse_backend_rs::{
+    api::{filesystem::FileSystem, server::Server},
+    transport::{FsCacheReqHandler, Reader, VirtioFsWriter},
+};
+use tracing::error;
+use vhost::vhost_user::{
+    Listener, SlaveFsCacheReq, VhostUserProtocolFeatures, VhostUserVirtioFeatures,
+};
+use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringMutex, VringState, VringT};
+use virtio_bindings::bindings::virtio_ring::{
+    VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC,
+};
+use virtio_queue::QueueT;
+use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
+use vmm_sys_util::epoll::EventSet;
+
+const VIRTIO_F_VERSION_1: u32 = 32;
+const NUM_QUEUES: usize = 2;
+const QUEUE_SIZE: usize = 1024;
+
+#[derive(Debug)]
+enum Error {
+    /// Failed to handle non-input event.
+    HandleEventNotEpollIn,
+    /// Failed to handle unknown event.
+    HandleEventUnknownEvent,
+    /// Invalid descriptor chain.
+    InvlaidDescriptorChain,
+    /// Failed to handle filesystem requests.
+    HandleRequests(fuse_backend_rs::Error),
+    /// Failed to construct new vhost user daemon.
+    NewDaemon,
+    /// Failed to start the vhost user daemon.
+    StartDaemon,
+    /// Failed to wait for the vhost user daemon.
+    WaitDaemon,
+}
+
+impl fmt::Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "vhost_user_fs_error: {self:?}")
+    }
+}
+
+impl error::Error for Error {}
+
+impl convert::From<Error> for io::Error {
+    fn from(e: Error) -> Self {
+        io::Error::new(io::ErrorKind::Other, e)
+    }
+}
+
+struct VhostUserFsBackend<FS>
+where
+    FS: FileSystem + Send + Sync,
+{
+    server: Arc<Server<Arc<FS>>>,
+    event_idx: bool,
+    guest_mem: GuestMemoryAtomic<GuestMemoryMmap>,
+    cache_req: Option<SlaveFsCacheReq>,
+}
+
+impl<FS> VhostUserFsBackend<FS>
+where
+    FS: FileSystem + Send + Sync,
+{
+    fn process_queue(&mut self, vring: &mut MutexGuard<VringState>) -> std::io::Result<bool> {
+        let mut used_descs = false;
+
+        while let Some(desc_chain) = vring
+            .get_queue_mut()
+            .pop_descriptor_chain(self.guest_mem.memory())
+        {
+            let memory = desc_chain.memory();
+            let reader = Reader::from_descriptor_chain(memory, desc_chain.clone())
+                .map_err(|_| Error::InvlaidDescriptorChain)?;
+            let writer = VirtioFsWriter::new(memory, desc_chain.clone())
+                .map_err(|_| Error::InvlaidDescriptorChain)?;
+
+            self.server
+                .handle_message(
+                    reader,
+                    writer.into(),
+                    self.cache_req
+                        .as_mut()
+                        .map(|req| req as &mut dyn FsCacheReqHandler),
+                    None,
+                )
+                .map_err(Error::HandleRequests)?;
+
+            // TODO: Is len 0 correct?
+            if let Err(error) = vring
+                .get_queue_mut()
+                .add_used(memory, desc_chain.head_index(), 0)
+            {
+                error!(?error, "failed to add desc back to ring");
+            }
+
+            // TODO: What happens if we error out before here?
+            used_descs = true;
+        }
+
+        let needs_notification = if self.event_idx {
+            match vring
+                .get_queue_mut()
+                .needs_notification(self.guest_mem.memory().deref())
+            {
+                Ok(needs_notification) => needs_notification,
+                Err(error) => {
+                    error!(?error, "failed to check if queue needs notification");
+                    true
+                }
+            }
+        } else {
+            true
+        };
+
+        if needs_notification {
+            if let Err(error) = vring.signal_used_queue() {
+                error!(?error, "failed to signal used queue");
+            }
+        }
+
+        Ok(used_descs)
+    }
+}
+
+impl<FS> VhostUserBackendMut<VringMutex> for VhostUserFsBackend<FS>
+where
+    FS: FileSystem + Send + Sync,
+{
+    fn num_queues(&self) -> usize {
+        NUM_QUEUES
+    }
+
+    fn max_queue_size(&self) -> usize {
+        QUEUE_SIZE
+    }
+
+    fn features(&self) -> u64 {
+        1 << VIRTIO_F_VERSION_1
+            | 1 << VIRTIO_RING_F_INDIRECT_DESC
+            | 1 << VIRTIO_RING_F_EVENT_IDX
+            | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()
+    }
+
+    fn protocol_features(&self) -> VhostUserProtocolFeatures {
+        VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::SLAVE_REQ
+    }
+
+    fn set_event_idx(&mut self, enabled: bool) {
+        self.event_idx = enabled;
+    }
+
+    fn update_memory(&mut self, _mem: GuestMemoryAtomic<GuestMemoryMmap>) -> std::io::Result<()> {
+        // This is what most the vhost user implementations do...
+        Ok(())
+    }
+
+    fn set_slave_req_fd(&mut self, cache_req: SlaveFsCacheReq) {
+        self.cache_req = Some(cache_req);
+    }
+
+    fn handle_event(
+        &mut self,
+        device_event: u16,
+        evset: vmm_sys_util::epoll::EventSet,
+        vrings: &[VringMutex],
+        _thread_id: usize,
+    ) -> std::io::Result<bool> {
+        if evset != EventSet::IN {
+            return Err(Error::HandleEventNotEpollIn.into());
+        }
+
+        let mut queue = match device_event {
+            // High priority queue
+            0 => vrings[0].get_mut(),
+            // Regurlar priority queue
+            1 => vrings[1].get_mut(),
+            _ => {
+                return Err(Error::HandleEventUnknownEvent.into());
+            }
+        };
+
+        if self.event_idx {
+            loop {
+                queue
+                    .get_queue_mut()
+                    .enable_notification(self.guest_mem.memory().deref())
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+                if !self.process_queue(&mut queue)? {
+                    break;
+                }
+            }
+        } else {
+            self.process_queue(&mut queue)?;
+        }
+
+        Ok(false)
+    }
+}
+
+pub fn start_virtiofs_daemon<FS, P>(fs: FS, socket: P) -> io::Result<()>
+where
+    FS: FileSystem + Send + Sync + 'static,
+    P: AsRef<Path>,
+{
+    let guest_mem = GuestMemoryAtomic::new(GuestMemoryMmap::new());
+
+    let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
+
+    let backend = Arc::new(RwLock::new(VhostUserFsBackend {
+        server,
+        guest_mem: guest_mem.clone(),
+        event_idx: false,
+        cache_req: None,
+    }));
+
+    let listener = Listener::new(socket, true).unwrap();
+
+    let mut fs_daemon =
+        VhostUserDaemon::new(String::from("vhost-user-fs-tvix-store"), backend, guest_mem)
+            .map_err(|_| Error::NewDaemon)?;
+
+    fs_daemon.start(listener).map_err(|_| Error::StartDaemon)?;
+
+    fs_daemon.wait().map_err(|_| Error::WaitDaemon)?;
+
+    Ok(())
+}