From 69d2f9b97edf7d8d1bace074c1704eeb51f172df Mon Sep 17 00:00:00 2001 From: Chris Carlon Date: Tue, 13 May 2025 02:43:06 +0000 Subject: [PATCH 1/2] Add support for inode and dentry invalidation. Package fusekernel already had definitions of the NotifyInvalInodeOut and NotifyInvalEntryOut structures. We add a Notifier, which allows filesystem implementations to use those to invalidate kernel cache entries. By using these invalidation mechanisms, user code can still mostly use the page cache, but with some feedback from the filesystem daemon when contents have changed dynamically. --- connection.go | 29 +-- notifier.go | 132 +++++++++++++ samples/mount_notify_inval/mount.go | 41 ++++ samples/notify_inval/notify_inval.go | 218 ++++++++++++++++++++++ samples/notify_inval/notify_inval_test.go | 85 +++++++++ 5 files changed, 494 insertions(+), 11 deletions(-) create mode 100644 notifier.go create mode 100644 samples/mount_notify_inval/mount.go create mode 100644 samples/notify_inval/notify_inval.go create mode 100644 samples/notify_inval/notify_inval_test.go diff --git a/connection.go b/connection.go index 8ec027fd..728099ef 100644 --- a/connection.go +++ b/connection.go @@ -378,6 +378,23 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) { } } +// Write a buffer.OutMessage to the kernel, with writev if vectored IO is useful +// and write if not. +func (c *Connection) writeOutMessage(outMsg *buffer.OutMessage) error { + var err error + if outMsg.Sglist != nil { + if fusekernel.IsPlatformFuseT { + // writev is not atomic on macos, restrict to fuse-t platform + writeLock.Lock() + defer writeLock.Unlock() + } + _, err = writev(int(c.dev.Fd()), outMsg.Sglist) + } else { + err = c.writeMessage(outMsg.OutHeaderBytes()) + } + return err +} + // Write the supplied message to the kernel. func (c *Connection) writeMessage(msg []byte) error { // Avoid the retry loop in os.File.Write. @@ -535,17 +552,7 @@ func (c *Connection) Reply(ctx context.Context, opErr error) error { noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr) if !noResponse { - var err error - if outMsg.Sglist != nil { - if fusekernel.IsPlatformFuseT { - // writev is not atomic on macos, restrict to fuse-t platform - writeLock.Lock() - defer writeLock.Unlock() - } - _, err = writev(int(c.dev.Fd()), outMsg.Sglist) - } else { - err = c.writeMessage(outMsg.OutHeaderBytes()) - } + err := c.writeOutMessage(outMsg) if err != nil { writeErrMsg := fmt.Sprintf("writeMessage: %v %v", err, outMsg.OutHeaderBytes()) if c.errorLogger != nil { diff --git a/notifier.go b/notifier.go new file mode 100644 index 00000000..e9ea38bc --- /dev/null +++ b/notifier.go @@ -0,0 +1,132 @@ +package fuse + +import ( + "unsafe" + + "github.com/jacobsa/fuse/fuseops" + "github.com/jacobsa/fuse/internal/fusekernel" +) + +// Notifier coordinates low-level notifications from the fuse daemon to the +// kernel. A Notifier may be used by the ServeOps implementation of a Server. In +// order to deliver notifications, wrap the server with NewServerWithNotifier. +type Notifier struct { + inodeInvalidations chan invalidateInodeCommand + dentryInvalidations chan invalidateEntryCommand +} + +func NewNotifier() *Notifier { + return &Notifier{ + inodeInvalidations: make(chan invalidateInodeCommand), + dentryInvalidations: make(chan invalidateEntryCommand), + } +} + +type invalidateInodeCommand struct { + inode fuseops.InodeID + offset int64 + length int64 + done chan<- error +} + +type invalidateEntryCommand struct { + parent fuseops.InodeID + name string + // If fusekernel.NotifyInvalEntryOut is updated to use its padding as flags, + // we can support the expire flag in this command as well. + done chan<- error +} + +// InvalidateInode notifies the kernel to invalidate an inode cache entry. See +// the libfuse documentation at +// https://libfuse.github.io/doxygen/fuse__lowlevel_8h.html#a9cb974af9745294ff446d11cba2422f1 +// for more details. +// +// InvalidateInode blocks until the kernel write completes, and returns the +// error from the kernel, if any. ENOSYS indicates that the kernel does not +// support inode invalidations. +func (n *Notifier) InvalidateInode(inode fuseops.InodeID, offset, length int64) error { + done := make(chan error) + n.inodeInvalidations <- invalidateInodeCommand{inode, offset, length, done} + return <-done +} + +// InvalidateEntry notifies to the kernel to invalidate a dentry cache entry. +// See the libfuse documentation at +// https://libfuse.github.io/doxygen/fuse__lowlevel_8h.html#ab14032b74b0a57a2b3155dd6ba8d6095 +// for more details. +// +// InvalidateEntry blocks until the kernel write completes, and returns the +// error from the kernel, if any. ENOSYS indicates that the kernel does not +// support dentry invalidations. +func (n *Notifier) InvalidateEntry(parent fuseops.InodeID, name string) error { + done := make(chan error) + n.dentryInvalidations <- invalidateEntryCommand{parent, name, done} + return <-done +} + +func serviceInodeInvalidation(c *Connection, inode fuseops.InodeID, offset, length int64) error { + outMsg := c.getOutMessage() + defer c.putOutMessage(outMsg) + + cmd := fusekernel.NotifyInvalInodeOut{ + Ino: uint64(inode), + Off: offset, + Len: length, + } + outMsg.Append(unsafe.Slice((*byte)(unsafe.Pointer(&cmd)), int(unsafe.Sizeof(cmd)))) + + outMsg.OutHeader().Error = fusekernel.NotifyCodeInvalInode + outMsg.OutHeader().Len = uint32(outMsg.Len()) + + return c.writeOutMessage(outMsg) +} + +func serviceEntryInval(c *Connection, parent fuseops.InodeID, name string) error { + outMsg := c.getOutMessage() + defer c.putOutMessage(outMsg) + + cmd := fusekernel.NotifyInvalEntryOut{ + Parent: uint64(parent), + Namelen: uint32(len(name)), + } + outMsg.Append(unsafe.Slice((*byte)(unsafe.Pointer(&cmd)), int(unsafe.Sizeof(cmd)))) + + // The name must be represented as a C string with a null-terminator. + outMsg.AppendString(name) + outMsg.Append([]byte{0}) + + outMsg.OutHeader().Error = fusekernel.NotifyCodeInvalEntry + outMsg.OutHeader().Len = uint32(outMsg.Len()) + return c.writeOutMessage(outMsg) +} + +func (n *Notifier) notify(c *Connection, terminate <-chan struct{}) { + for { + select { + case i := <-n.inodeInvalidations: + i.done <- serviceInodeInvalidation(c, i.inode, i.offset, i.length) + case e := <-n.dentryInvalidations: + e.done <- serviceEntryInval(c, e.parent, e.name) + case <-terminate: + return + } + } +} + +type notifierServer struct { + n *Notifier + s Server +} + +func (s *notifierServer) ServeOps(c *Connection) { + terminate := make(chan struct{}) + + go s.n.notify(c, terminate) + s.s.ServeOps(c) + close(terminate) +} + +func NewServerWithNotifier(n *Notifier, s Server) Server { + return ¬ifierServer{n, s} +} diff --git a/samples/mount_notify_inval/mount.go b/samples/mount_notify_inval/mount.go new file mode 100644 index 00000000..2a162ac2 --- /dev/null +++ b/samples/mount_notify_inval/mount.go @@ -0,0 +1,41 @@ +package main + +import ( + "context" + "flag" + "log" + "time" + + "github.com/jacobsa/fuse" + "github.com/jacobsa/fuse/samples/notify_inval" +) + +var mountPoint = flag.String("mountpoint", "", "directory to mount the filesystem") + +type ticker struct { + *time.Ticker +} + +func (t *ticker) Ticks() <-chan time.Time { + return t.Ticker.C +} + +func (t *ticker) Tocks() chan<- time.Time { return nil } + +func main() { + flag.Parse() + + if *mountPoint == "" { + log.Fatalf("--mountpoint is required") + } + + t := &ticker{time.NewTicker(time.Second)} + server := notify_inval.NewNotifyInvalFS(t) + mfs, err := fuse.Mount(*mountPoint, server, &fuse.MountConfig{}) + if err != nil { + panic(err) + } + if err := mfs.Join(context.Background()); err != nil { + panic(err) + } +} diff --git a/samples/notify_inval/notify_inval.go b/samples/notify_inval/notify_inval.go new file mode 100644 index 00000000..124471bb --- /dev/null +++ b/samples/notify_inval/notify_inval.go @@ -0,0 +1,218 @@ +package notify_inval + +import ( + "context" + "fmt" + "os" + "sync" + "syscall" + "time" + + "github.com/jacobsa/fuse" + "github.com/jacobsa/fuse/fuseops" + "github.com/jacobsa/fuse/fuseutil" +) + +var timeLen = len(time.Time{}.Format(time.RFC3339)) + +// NotifyTimer may emit times on Ticks() to trigger filesystem changes. The +// fuse.Server emits the same times in the same order on Tocks(), if not nil, to +// indicate that invalidation is complete. +type NotifyTimer interface { + Ticks() <-chan time.Time + Tocks() chan<- time.Time +} + +// Create a file system with two files: +// One is empty, and its name is the current time. +// The other is named 'current_time' and always contains the current time. +// +// This filesystem is an analog to the libfuse examples here: +// https://github.com/libfuse/libfuse/blob/e75d2c54a347906478724be24bfa1df2638094cb/example/notify_inval_inode.c +// https://github.com/libfuse/libfuse/blob/e75d2c54a347906478724be24bfa1df2638094cb/example/notify_inval_entry.c +// +// Unlike package dynamicfs, this implementation does _not_ depend on direct IO. +// The invalidations allow file operations to eventually observe the changes. +func NewNotifyInvalFS(t NotifyTimer) fuse.Server { + n := fuse.NewNotifier() + fs := ¬ifyInvalInodeFS{ + notifier: n, + teardown: make(chan struct{}), + } + + ticks := t.Ticks() + tocks := t.Tocks() + go func() { + for { + select { + case t := <-ticks: + fs.mu.Lock() + oldtime := fs.currentTime + fs.currentTime = t + fs.mu.Unlock() + fs.invalidateInodes(oldtime) + if tocks != nil { + tocks <- t + } + case <-fs.teardown: + return + } + } + }() + + return fuse.NewServerWithNotifier(n, fuseutil.NewFileSystemServer(fs)) +} + +type notifyInvalInodeFS struct { + fuseutil.NotImplementedFileSystem + + notifier *fuse.Notifier + teardown chan struct{} + + mu sync.Mutex + // GUARDED_BY(mu) + currentTime time.Time +} + +const ( + currentTimeFilename = "current_time" + + rootInode fuseops.InodeID = fuseops.RootInodeID + currentTimeInode = rootInode + iota + changingFnameInode +) + +func (fs *notifyInvalInodeFS) invalidateInodes(oldTime time.Time) { + // Invalidate inode cache and dcache for both dynamic files. + if err := fs.notifier.InvalidateInode(currentTimeInode, 0, 0); err != nil { + fmt.Printf("error invalidating current_time inode %v: %v\n", currentTimeInode, err) + } + if err := fs.notifier.InvalidateEntry(rootInode, currentTimeFilename); err != nil { + fmt.Printf("error invalidating current_time entry %v for parent %v: %v\n", currentTimeFilename, rootInode, err) + } + + if err := fs.notifier.InvalidateInode(changingFnameInode, 0, 0); err != nil { + fmt.Printf("error invalidating dynamic filename inode %v: %v\n", changingFnameInode, err) + } + if err := fs.notifier.InvalidateEntry(rootInode, oldTime.Format(time.RFC3339)); err != nil { + fmt.Printf("error invalidating dynamic filename entry for parent %v: %v\n", rootInode, err) + } +} + +func (fs *notifyInvalInodeFS) fillStat(ino fuseops.InodeID, attrs *fuseops.InodeAttributes) error { + switch ino { + case rootInode: + attrs.Nlink = 1 + attrs.Mode = 0555 | os.ModeDir + case currentTimeInode: + attrs.Nlink = 1 + attrs.Mode = 0444 + attrs.Size = uint64(timeLen + 1) // with newline + case changingFnameInode: + attrs.Nlink = 1 + attrs.Mode = 0444 + default: + return fuse.ENOENT + } + return nil +} + +func (fs *notifyInvalInodeFS) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error { + if op.Parent != rootInode { + return fuse.ENOENT + } + + fs.mu.Lock() + t := fs.currentTime + fs.mu.Unlock() + + switch op.Name { + case currentTimeFilename: + op.Entry.Child = currentTimeInode + fs.fillStat(currentTimeInode, &op.Entry.Attributes) + case t.Format(time.RFC3339): + op.Entry.Child = changingFnameInode + fs.fillStat(changingFnameInode, &op.Entry.Attributes) + default: + return fuse.ENOENT + } + + distantFuture := time.Now().Add(time.Hour * 300) + op.Entry.AttributesExpiration = distantFuture + op.Entry.EntryExpiration = distantFuture + return nil +} + +func (fs *notifyInvalInodeFS) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) error { + return fs.fillStat(op.Inode, &op.Attributes) +} + +func (fs *notifyInvalInodeFS) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error { + if op.Inode != rootInode { + return fuse.ENOTDIR + } + + fs.mu.Lock() + t := fs.currentTime + fs.mu.Unlock() + + if op.Offset <= 0 { + op.BytesRead += fuseutil.WriteDirent(op.Dst[op.BytesRead:], fuseutil.Dirent{ + Offset: fuseops.DirOffset(1), + Inode: currentTimeInode, + Name: currentTimeFilename, + }) + } + if op.Offset <= 1 { + op.BytesRead += fuseutil.WriteDirent(op.Dst[op.BytesRead:], fuseutil.Dirent{ + Offset: fuseops.DirOffset(2), + Inode: changingFnameInode, + Name: t.Format(time.RFC3339), + }) + } + return nil +} + +func (fs *notifyInvalInodeFS) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error { + if op.Inode == rootInode { + return syscall.EISDIR + } + if op.Inode == changingFnameInode { + // No access to the changing filename contents + return syscall.EACCES + } + if op.Inode != currentTimeInode { + // This should not happen + return fuse.EIO + } + if !op.OpenFlags.IsReadOnly() { + return syscall.EACCES + } + + // Make cache persistent even if the file is closed. This makes it easier to + // see the effects of invalidation. + op.KeepPageCache = true + + return nil +} + +func (fs *notifyInvalInodeFS) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) error { + if op.Inode != currentTimeInode { + return fuse.EIO + } + + fs.mu.Lock() + t := fs.currentTime + fs.mu.Unlock() + + contents := t.Format(time.RFC3339) + "\n" + + if op.Offset < int64(len(contents)) { + op.BytesRead = copy(op.Dst, contents[op.Offset:]) + } + return nil +} + +func (fs *notifyInvalInodeFS) Destroy() { + close(fs.teardown) +} diff --git a/samples/notify_inval/notify_inval_test.go b/samples/notify_inval/notify_inval_test.go new file mode 100644 index 00000000..951c4f3f --- /dev/null +++ b/samples/notify_inval/notify_inval_test.go @@ -0,0 +1,85 @@ +package notify_inval_test + +import ( + "io/ioutil" + "os" + "path" + "testing" + "time" + + "github.com/jacobsa/fuse/fusetesting" + "github.com/jacobsa/fuse/samples" + "github.com/jacobsa/fuse/samples/notify_inval" + + . "github.com/jacobsa/ogletest" +) + +func TestNotifyInvalFS(t *testing.T) { RunTests(t) } + +func (t *NotifyInvalFSTest) setTime(tv time.Time) { + t.ticker.tickchan <- tv + t.expectedTime = <-t.ticker.tockchan +} + +func init() { + RegisterTestSuite(&NotifyInvalFSTest{}) +} + +type manualTicker struct { + tickchan chan time.Time + tockchan chan time.Time +} + +func (t *manualTicker) Ticks() <-chan time.Time { return t.tickchan } +func (t *manualTicker) Tocks() chan<- time.Time { return t.tockchan } + +type NotifyInvalFSTest struct { + samples.SampleTest + + ticker *manualTicker + expectedTime time.Time +} + +func (t *NotifyInvalFSTest) SetUp(ti *TestInfo) { + t.ticker = &manualTicker{ + tickchan: make(chan time.Time), + tockchan: make(chan time.Time), + } + t.Server = notify_inval.NewNotifyInvalFS(t.ticker) + t.SampleTest.SetUp(ti) +} + +func (t *NotifyInvalFSTest) ReadDir_Root() { + entries, err := fusetesting.ReadDirPicky(t.Dir) + AssertEq(nil, err) + AssertEq(2, len(entries)) + + var fi os.FileInfo + fi = entries[0] + ExpectEq(t.expectedTime.Format(time.RFC3339), fi.Name()) + ExpectEq(0, fi.Size()) + ExpectEq(0444, fi.Mode()) + ExpectFalse(fi.IsDir()) + + fi = entries[1] + ExpectEq("current_time", fi.Name()) + ExpectEq(len(time.Time{}.Format(time.RFC3339))+1, fi.Size()) + ExpectEq(0444, fi.Mode()) + ExpectFalse(fi.IsDir()) +} + +func (t *NotifyInvalFSTest) ObserveTimeUpdate() { + oldTime := t.expectedTime.Format(time.RFC3339) + + _, err := os.Stat(path.Join(t.Dir, oldTime)) + AssertEq(nil, err) + slice, err := ioutil.ReadFile(path.Join(t.Dir, "current_time")) + ExpectEq(oldTime+"\n", string(slice)) + + t.setTime(t.expectedTime.Add(time.Minute)) + + _, err = os.Stat(path.Join(t.Dir, oldTime)) + AssertNe(nil, err) + slice, err = ioutil.ReadFile(path.Join(t.Dir, "current_time")) + ExpectNe(oldTime+"\n", string(slice)) +} From 5b9b657f6f47549fbb9074803b8db00ff2e18d43 Mon Sep 17 00:00:00 2001 From: Chris Carlon Date: Thu, 15 May 2025 03:25:34 +0000 Subject: [PATCH 2/2] Simplify rootInode in the notify_inval sample --- samples/notify_inval/notify_inval.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/samples/notify_inval/notify_inval.go b/samples/notify_inval/notify_inval.go index 124471bb..32f52a53 100644 --- a/samples/notify_inval/notify_inval.go +++ b/samples/notify_inval/notify_inval.go @@ -77,8 +77,7 @@ type notifyInvalInodeFS struct { const ( currentTimeFilename = "current_time" - rootInode fuseops.InodeID = fuseops.RootInodeID - currentTimeInode = rootInode + iota + currentTimeInode = fuseops.RootInodeID + iota changingFnameInode ) @@ -87,21 +86,21 @@ func (fs *notifyInvalInodeFS) invalidateInodes(oldTime time.Time) { if err := fs.notifier.InvalidateInode(currentTimeInode, 0, 0); err != nil { fmt.Printf("error invalidating current_time inode %v: %v\n", currentTimeInode, err) } - if err := fs.notifier.InvalidateEntry(rootInode, currentTimeFilename); err != nil { - fmt.Printf("error invalidating current_time entry %v for parent %v: %v\n", currentTimeFilename, rootInode, err) + if err := fs.notifier.InvalidateEntry(fuseops.RootInodeID, currentTimeFilename); err != nil { + fmt.Printf("error invalidating current_time entry %v for parent %v: %v\n", currentTimeFilename, fuseops.RootInodeID, err) } if err := fs.notifier.InvalidateInode(changingFnameInode, 0, 0); err != nil { fmt.Printf("error invalidating dynamic filename inode %v: %v\n", changingFnameInode, err) } - if err := fs.notifier.InvalidateEntry(rootInode, oldTime.Format(time.RFC3339)); err != nil { - fmt.Printf("error invalidating dynamic filename entry for parent %v: %v\n", rootInode, err) + if err := fs.notifier.InvalidateEntry(fuseops.RootInodeID, oldTime.Format(time.RFC3339)); err != nil { + fmt.Printf("error invalidating dynamic filename entry for parent %v: %v\n", fuseops.RootInodeID, err) } } func (fs *notifyInvalInodeFS) fillStat(ino fuseops.InodeID, attrs *fuseops.InodeAttributes) error { switch ino { - case rootInode: + case fuseops.RootInodeID: attrs.Nlink = 1 attrs.Mode = 0555 | os.ModeDir case currentTimeInode: @@ -118,7 +117,7 @@ func (fs *notifyInvalInodeFS) fillStat(ino fuseops.InodeID, attrs *fuseops.Inode } func (fs *notifyInvalInodeFS) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error { - if op.Parent != rootInode { + if op.Parent != fuseops.RootInodeID { return fuse.ENOENT } @@ -148,7 +147,7 @@ func (fs *notifyInvalInodeFS) GetInodeAttributes(ctx context.Context, op *fuseop } func (fs *notifyInvalInodeFS) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error { - if op.Inode != rootInode { + if op.Inode != fuseops.RootInodeID { return fuse.ENOTDIR } @@ -174,7 +173,7 @@ func (fs *notifyInvalInodeFS) ReadDir(ctx context.Context, op *fuseops.ReadDirOp } func (fs *notifyInvalInodeFS) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error { - if op.Inode == rootInode { + if op.Inode == fuseops.RootInodeID { return syscall.EISDIR } if op.Inode == changingFnameInode {