Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ libc = "0.2"
mesa-dev = "1.15.0"
num-traits = "0.2"
http = "1"
parking_lot = "0.12"
reqwest = { version = "0.12", default-features = false }
reqwest-middleware = "0.4"
serde_path_to_error = "0.1"
Expand Down
286 changes: 147 additions & 139 deletions src/fs/fuser.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::ffi::OsStr;
use std::future::Future;
use std::sync::Arc;

use crate::fs::r#trait::{CommonFileAttr, DirEntryType, FileAttr, Fs, LockOwner, OpenFlags};
use tracing::{debug, error, instrument};
use tracing::Instrument as _;
use tracing::{debug, error};

impl From<FileAttr> for fuser::FileAttr {
fn from(val: FileAttr) -> Self {
Expand Down Expand Up @@ -106,99 +109,101 @@ impl From<i32> for OpenFlags {
}
}

pub struct FuserAdapter<F: Fs>
const SHAMEFUL_TTL: std::time::Duration = std::time::Duration::from_secs(1);

pub struct FuserAdapter<F: Fs + Send + Sync + 'static>
where
F::LookupError: Into<i32>,
F::GetAttrError: Into<i32>,
F::OpenError: Into<i32>,
F::ReadError: Into<i32>,
F::ReaddirError: Into<i32>,
F::ReleaseError: Into<i32>,
F::LookupError: Into<i32> + Send + 'static,
F::GetAttrError: Into<i32> + Send + 'static,
F::OpenError: Into<i32> + Send + 'static,
F::ReadError: Into<i32> + Send + 'static,
F::ReaddirError: Into<i32> + Send + 'static,
F::ReleaseError: Into<i32> + Send + 'static,
{
fs: F,
fs: Arc<F>,
runtime: tokio::runtime::Handle,
}

impl<F: Fs> FuserAdapter<F>
impl<F: Fs + Send + Sync + 'static> FuserAdapter<F>
where
F::LookupError: Into<i32>,
F::GetAttrError: Into<i32>,
F::OpenError: Into<i32>,
F::ReadError: Into<i32>,
F::ReaddirError: Into<i32>,
F::ReleaseError: Into<i32>,
F::LookupError: Into<i32> + Send + 'static,
F::GetAttrError: Into<i32> + Send + 'static,
F::OpenError: Into<i32> + Send + 'static,
F::ReadError: Into<i32> + Send + 'static,
F::ReaddirError: Into<i32> + Send + 'static,
F::ReleaseError: Into<i32> + Send + 'static,
{
// TODO(markovejnovic): This low TTL is really not ideal. It slows us down a lot, since the
// kernel has to ask us for every single lookup all the time.
//
// I think a better implementation is to implement
//
// notify_inval_inode(ino, offset, len)
// notify_inval_entry(parent_ino, name)
//
// These two functions can be used to invalidate specific entries in the kernel cache when we
// know they have changed. This would allow us to set a much higher TTL here.
const SHAMEFUL_TTL: std::time::Duration = std::time::Duration::from_secs(1);

pub fn new(fs: F, runtime: tokio::runtime::Handle) -> Self {
Self { fs, runtime }
Self {
fs: Arc::new(fs),
runtime,
}
}

fn spawn<Fut>(&self, span: tracing::Span, f: impl FnOnce(Arc<F>) -> Fut + Send + 'static)
where
Fut: Future<Output = ()> + Send + 'static,
{
let fs = Arc::clone(&self.fs);
self.runtime.spawn(f(fs).instrument(span));
}
}

impl<F: Fs> fuser::Filesystem for FuserAdapter<F>
impl<F: Fs + Send + Sync + 'static> fuser::Filesystem for FuserAdapter<F>
where
F::LookupError: Into<i32>,
F::GetAttrError: Into<i32>,
F::OpenError: Into<i32>,
F::ReadError: Into<i32>,
F::ReaddirError: Into<i32>,
F::ReleaseError: Into<i32>,
F::LookupError: Into<i32> + Send + 'static,
F::GetAttrError: Into<i32> + Send + 'static,
F::OpenError: Into<i32> + Send + 'static,
F::ReadError: Into<i32> + Send + 'static,
F::ReaddirError: Into<i32> + Send + 'static,
F::ReleaseError: Into<i32> + Send + 'static,
{
#[instrument(name = "FuserAdapter::lookup", skip(self, _req, reply))]
fn lookup(
&mut self,
_req: &fuser::Request<'_>,
parent: u64,
name: &OsStr,
reply: fuser::ReplyEntry,
) {
match self.runtime.block_on(self.fs.lookup(parent, name)) {
Ok(attr) => {
// TODO(markovejnovic): Passing generation = 0 here is a recipe for disaster.
// Someone with A LOT of files will likely see inode reuse which will lead to a
// disaster.
let f_attr: fuser::FileAttr = attr.into();
debug!(?f_attr, "replying...");
reply.entry(&Self::SHAMEFUL_TTL, &f_attr, 0);
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
let name = name.to_owned();
let span = tracing::debug_span!("FuserAdapter::lookup", parent, ?name);
self.spawn(span, move |fs| async move {
match fs.lookup(parent, &name).await {
Ok(attr) => {
let f_attr: fuser::FileAttr = attr.into();
debug!(?f_attr, "replying...");
reply.entry(&SHAMEFUL_TTL, &f_attr, 0);
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
}
}
}
});
}

#[instrument(name = "FuserAdapter::getattr", skip(self, _req, fh, reply))]
fn getattr(
&mut self,
_req: &fuser::Request<'_>,
ino: u64,
fh: Option<u64>,
reply: fuser::ReplyAttr,
) {
match self.runtime.block_on(self.fs.getattr(ino, fh)) {
Ok(attr) => {
debug!(?attr, "replying...");
reply.attr(&Self::SHAMEFUL_TTL, &attr.into());
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
let span = tracing::debug_span!("FuserAdapter::getattr", ino);
self.spawn(span, move |fs| async move {
match fs.getattr(ino, fh).await {
Ok(attr) => {
debug!(?attr, "replying...");
reply.attr(&SHAMEFUL_TTL, &attr.into());
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
}
}
}
});
}

#[instrument(name = "FuserAdapter::readdir", skip(self, _req, _fh, offset, reply))]
fn readdir(
&mut self,
_req: &fuser::Request<'_>,
Expand All @@ -207,60 +212,62 @@ where
offset: i64,
mut reply: fuser::ReplyDirectory,
) {
let entries = match self.runtime.block_on(self.fs.readdir(ino)) {
Ok(entries) => entries,
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
return;
}
};

#[expect(
clippy::cast_possible_truncation,
reason = "fuser offset is i64 but always non-negative"
)]
for (i, entry) in entries
.iter()
.enumerate()
.skip(offset.cast_unsigned() as usize)
{
let kind: fuser::FileType = entry.kind.into();
let Ok(idx): Result<i64, _> = (i + 1).try_into() else {
error!("Directory entry index {} too large for fuser", i + 1);
reply.error(libc::EIO);
return;
let span = tracing::debug_span!("FuserAdapter::readdir", ino);
self.spawn(span, move |fs| async move {
let entries = match fs.readdir(ino).await {
Ok(entries) => entries,
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
return;
}
};

debug!(?entry, "adding entry to reply...");
if reply.add(entry.ino, idx, kind, &entry.name) {
debug!("buffer full for now, stopping readdir");
break;
#[expect(
clippy::cast_possible_truncation,
reason = "fuser offset is i64 but always non-negative"
)]
for (i, entry) in entries
.iter()
.enumerate()
.skip(offset.cast_unsigned() as usize)
{
let kind: fuser::FileType = entry.kind.into();
let Ok(idx): Result<i64, _> = (i + 1).try_into() else {
error!("Directory entry index {} too large for fuser", i + 1);
reply.error(libc::EIO);
return;
};

debug!(?entry, "adding entry to reply...");
if reply.add(entry.ino, idx, kind, &entry.name) {
debug!("buffer full for now, stopping readdir");
break;
}
}
}

debug!("finalizing reply...");
reply.ok();
debug!("finalizing reply...");
reply.ok();
});
}

#[instrument(name = "FuserAdapter::open", skip(self, _req, flags, reply))]
fn open(&mut self, _req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) {
match self.runtime.block_on(self.fs.open(ino, flags.into())) {
Ok(open_file) => {
debug!(handle = open_file.handle, "replying...");
reply.opened(open_file.handle, 0);
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
let flags: OpenFlags = flags.into();
let span = tracing::debug_span!("FuserAdapter::open", ino);
self.spawn(span, move |fs| async move {
match fs.open(ino, flags).await {
Ok(open_file) => {
debug!(handle = open_file.handle, "replying...");
reply.opened(open_file.handle, 0);
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
}
}
}
});
}

#[instrument(
name = "FuserAdapter::read",
skip(self, _req, fh, offset, size, flags, lock_owner, reply)
)]
fn read(
&mut self,
_req: &fuser::Request<'_>,
Expand All @@ -274,26 +281,24 @@ where
) {
let flags: OpenFlags = flags.into();
let lock_owner = lock_owner.map(LockOwner);
match self.runtime.block_on(self.fs.read(
ino,
fh,
offset.cast_unsigned(),
size,
flags,
lock_owner,
)) {
Ok(data) => {
debug!(read_bytes = data.len(), "replying...");
reply.data(&data);
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
let span = tracing::debug_span!("FuserAdapter::read", ino);
self.spawn(span, move |fs| async move {
match fs
.read(ino, fh, offset.cast_unsigned(), size, flags, lock_owner)
.await
{
Ok(data) => {
debug!(read_bytes = data.len(), "replying...");
reply.data(&data);
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
}
}
}
});
}

#[instrument(name = "FuserAdapter::release", skip(self, _req, _lock_owner, reply))]
fn release(
&mut self,
_req: &fuser::Request<'_>,
Expand All @@ -304,30 +309,33 @@ where
flush: bool,
reply: fuser::ReplyEmpty,
) {
match self
.runtime
.block_on(self.fs.release(ino, fh, flags.into(), flush))
{
Ok(()) => {
debug!("replying ok");
reply.ok();
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
let flags: OpenFlags = flags.into();
let span = tracing::debug_span!("FuserAdapter::release", ino, fh);
self.spawn(span, move |fs| async move {
match fs.release(ino, fh, flags, flush).await {
Ok(()) => {
debug!("replying ok");
reply.ok();
}
Err(e) => {
debug!(error = %e, "replying error");
reply.error(e.into());
}
}
}
});
}

#[instrument(name = "FuserAdapter::forget", skip(self, _req, nlookup))]
fn forget(&mut self, _req: &fuser::Request<'_>, ino: u64, nlookup: u64) {
self.runtime.block_on(self.fs.forget(ino, nlookup));
let span = tracing::debug_span!("FuserAdapter::forget", ino, nlookup);
self.spawn(span, move |fs| async move {
fs.forget(ino, nlookup).await;
});
}

#[instrument(name = "FuserAdapter::statfs", skip(self, _req, _ino, reply))]
fn statfs(&mut self, _req: &fuser::Request<'_>, _ino: u64, reply: fuser::ReplyStatfs) {
self.runtime.block_on(async {
match self.fs.statfs().await {
let span = tracing::debug_span!("FuserAdapter::statfs");
self.spawn(span, move |fs| async move {
match fs.statfs().await {
Ok(statvfs) => {
debug!(?statvfs, "replying...");
reply.statfs(
Expand Down
Loading