about summary refs log tree commit diff
path: root/tvix/store/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src')
-rw-r--r--tvix/store/src/blobservice/dumb_seeker.rs12
-rw-r--r--tvix/store/src/fuse/mod.rs112
2 files changed, 92 insertions, 32 deletions
diff --git a/tvix/store/src/blobservice/dumb_seeker.rs b/tvix/store/src/blobservice/dumb_seeker.rs
index 5548ea0bd33d..8629394d21bc 100644
--- a/tvix/store/src/blobservice/dumb_seeker.rs
+++ b/tvix/store/src/blobservice/dumb_seeker.rs
@@ -80,7 +80,17 @@ impl<R: io::Read> io::Seek for DumbSeeker<R> {
                 Err(e) => return Err(e),
             }
         }
-        debug_assert_eq!(bytes_to_skip, bytes_skipped);
+
+        // This will fail when seeking past the end of self.r
+        if bytes_to_skip != bytes_skipped {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::UnexpectedEof,
+                format!(
+                    "tried to skip {} bytes, but only was able to skip {} until reaching EOF",
+                    bytes_to_skip, bytes_skipped
+                ),
+            ));
+        }
 
         self.pos = absolute_offset;
 
diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs
index 8aa49c499774..dd2c479b3de5 100644
--- a/tvix/store/src/fuse/mod.rs
+++ b/tvix/store/src/fuse/mod.rs
@@ -6,7 +6,7 @@ mod inodes;
 mod tests;
 
 use crate::{
-    blobservice::BlobService,
+    blobservice::{BlobReader, BlobService},
     directoryservice::DirectoryService,
     fuse::{
         file_attr::gen_file_attr,
@@ -18,7 +18,7 @@ use crate::{
 };
 use fuser::{FileAttr, ReplyAttr, Request};
 use nix_compat::store_path::StorePath;
-use std::io::Read;
+use std::io::{self, Read, Seek};
 use std::str::FromStr;
 use std::sync::Arc;
 use std::{collections::HashMap, time::Duration};
@@ -70,6 +70,11 @@ pub struct FUSE {
 
     /// This keeps track of inodes and data alongside them.
     inode_tracker: InodeTracker,
+
+    /// This holds all open file handles
+    file_handles: HashMap<u64, Box<dyn BlobReader>>,
+
+    next_file_handle: u64,
 }
 
 impl FUSE {
@@ -85,6 +90,9 @@ impl FUSE {
 
             store_paths: HashMap::default(),
             inode_tracker: Default::default(),
+
+            file_handles: Default::default(),
+            next_file_handle: 1,
         }
     }
 
@@ -357,21 +365,10 @@ impl fuser::Filesystem for FUSE {
         }
     }
 
-    /// TODO: implement open + close?
-
-    #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset, rq.size = size))]
-    fn read(
-        &mut self,
-        _req: &Request<'_>,
-        ino: u64,
-        _fh: u64,
-        offset: i64,
-        size: u32,
-        _flags: i32,
-        _lock_owner: Option<u64>,
-        reply: fuser::ReplyData,
-    ) {
-        debug!("read");
+    #[tracing::instrument(skip_all, fields(rq.inode = ino))]
+    fn open(&mut self, _req: &Request<'_>, ino: u64, _flags: i32, reply: fuser::ReplyOpen) {
+        // get a new file handle
+        let fh = self.next_file_handle;
 
         if ino == fuser::FUSE_ROOT_ID {
             reply.error(libc::ENOSYS);
@@ -398,26 +395,79 @@ impl fuser::Filesystem for FUSE {
                         reply.error(libc::EIO);
                     }
                     Ok(Some(blob_reader)) => {
-                        let data: std::io::Result<Vec<u8>> = blob_reader
-                            .bytes()
-                            // TODO: this is obviously terrible. blobreader should implement seek.
-                            .skip(offset.try_into().unwrap())
-                            .take(size.try_into().unwrap())
-                            .collect();
-
-                        match data {
-                            Ok(data) => {
-                                // respond with the requested data
-                                reply.data(&data);
-                            }
-                            Err(e) => reply.error(e.raw_os_error().unwrap()),
-                        }
+                        self.file_handles.insert(fh, blob_reader);
+                        reply.opened(fh, 0);
+
+                        // TODO: this will overflow after 2**64 operations,
+                        // which is fine for now.
+                        // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
+                        // for the discussion on alternatives.
+                        self.next_file_handle += 1;
                     }
                 }
             }
         }
     }
 
+    #[tracing::instrument(skip_all, fields(rq.inode = ino, fh = fh))]
+    fn release(
+        &mut self,
+        _req: &Request<'_>,
+        ino: u64,
+        fh: u64,
+        _flags: i32,
+        _lock_owner: Option<u64>,
+        _flush: bool,
+        reply: fuser::ReplyEmpty,
+    ) {
+        // remove and get ownership on the blob reader
+        let blob_reader = self.file_handles.remove(&fh).unwrap();
+        // drop it, which will close it.
+        drop(blob_reader);
+
+        reply.ok();
+    }
+
+    #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset, rq.size = size))]
+    fn read(
+        &mut self,
+        _req: &Request<'_>,
+        ino: u64,
+        fh: u64,
+        offset: i64,
+        size: u32,
+        _flags: i32,
+        _lock_owner: Option<u64>,
+        reply: fuser::ReplyData,
+    ) {
+        debug!("read");
+
+        let blob_reader = self.file_handles.get_mut(&fh).unwrap();
+
+        // seek to the offset specified, which is relative to the start of the file.
+        let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64));
+        match resp {
+            Ok(pos) => {
+                debug_assert_eq!(offset as u64, pos);
+            }
+            Err(e) => {
+                warn!("failed to seek to offset {}: {}", offset, e);
+                reply.error(libc::EIO);
+                return;
+            }
+        }
+
+        // now with the blobreader seeked to this location, read size of data
+        let data: std::io::Result<Vec<u8>> =
+            blob_reader.bytes().take(size.try_into().unwrap()).collect();
+
+        match data {
+            // respond with the requested data
+            Ok(data) => reply.data(&data),
+            Err(e) => reply.error(e.raw_os_error().unwrap()),
+        }
+    }
+
     #[tracing::instrument(skip_all, fields(rq.inode = ino))]
     fn readlink(&mut self, _req: &Request<'_>, ino: u64, reply: fuser::ReplyData) {
         if ino == fuser::FUSE_ROOT_ID {