1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
use std::{io, path::Path, sync::Arc, thread};
use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
use tracing::{error, instrument};
#[cfg(test)]
mod tests;
/// One worker of the FUSE daemon: a shared request handler paired with the
/// channel this worker reads requests from.
struct FuseServer<FS: FileSystem + Sync + Send> {
    /// Request handler wrapping the filesystem; the `Arc` lets every worker
    /// share the same instance.
    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
    /// Channel this worker pulls requests from in [`FuseServer::start`].
    channel: fuse_backend_rs::transport::FuseChannel,
}
/// Errno that `FuseServer::start` compares against the OS error of an
/// `EncodeMessage` failure to detect that the session has been shut down.
/// On macOS this is `EBADF`.
#[cfg(target_os = "macos")]
const BADFD: libc::c_int = libc::EBADF;
/// Errno that `FuseServer::start` compares against the OS error of an
/// `EncodeMessage` failure to detect that the session has been shut down.
/// On Linux this is `EBADFD`.
#[cfg(target_os = "linux")]
const BADFD: libc::c_int = libc::EBADFD;
impl<FS> FuseServer<FS>
where
FS: FileSystem + Sync + Send,
{
fn start(&mut self) -> io::Result<()> {
while let Some((reader, writer)) = self
.channel
.get_request()
.map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
{
if let Err(e) = self
.server
.handle_message(reader, writer.into(), None, None)
{
match e {
// This indicates the session has been shut down.
fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => {
break;
}
error => {
error!(?error, "failed to handle fuse request");
continue;
}
}
}
}
Ok(())
}
}
/// Handle to a mounted FUSE filesystem and the worker threads serving it.
///
/// Created by `FuseDaemon::new`; `FuseDaemon::unmount` unmounts the session
/// and joins the workers, and is also invoked when the value is dropped.
pub struct FuseDaemon {
// Session created and mounted in `new`, unmounted in `unmount`.
session: FuseSession,
// Join handles of the worker threads spawned in `new`; drained in `unmount`.
threads: Vec<thread::JoinHandle<()>>,
}
impl FuseDaemon {
#[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)]
pub fn new<FS, P>(
fs: FS,
mountpoint: P,
threads: usize,
allow_other: bool,
) -> Result<Self, io::Error>
where
FS: FileSystem + Sync + Send + 'static,
P: AsRef<Path> + std::fmt::Debug,
{
let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
#[cfg(target_os = "linux")]
session.set_allow_other(allow_other);
session
.mount()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let mut join_handles = Vec::with_capacity(threads);
for _ in 0..threads {
let mut server = FuseServer {
server: server.clone(),
channel: session
.new_channel()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
};
let join_handle = thread::Builder::new()
.name("fuse_server".to_string())
.spawn(move || {
let _ = server.start();
})?;
join_handles.push(join_handle);
}
Ok(FuseDaemon {
session,
threads: join_handles,
})
}
#[instrument(skip_all, err)]
pub fn unmount(&mut self) -> Result<(), io::Error> {
self.session
.umount()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
for thread in self.threads.drain(..) {
thread.join().map_err(|_| {
io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
})?;
}
Ok(())
}
}
impl Drop for FuseDaemon {
fn drop(&mut self) {
if let Err(error) = self.unmount() {
error!(?error, "failed to unmont fuse filesystem")
}
}
}
|