-
Notifications
You must be signed in to change notification settings - Fork 72
Open
Description
I would find it useful if inotify integrated with asyncio.
Here is a quick example of what would be involved. Although I think an actual implementation should be done quite differently.
import asyncio
import os
import sys
from inotify.adapters import Inotify, _DEFAULT_TERMINAL_EVENTS, TerminalEventException
import inotify.constants as Iconst
class AInotify(Inotify):
def __init__(self, *args, **kws):
super().__init__(*args, **kws)
def fileno(self):
return self._Inotify__inotify_fd
async def aevent_gen(self, terminal_events=_DEFAULT_TERMINAL_EVENTS):
loop = asyncio.get_running_loop()
try:
while True:
ready = loop.create_future()
# replaces previous reader for this FD
loop.add_reader(self.fileno(), ready.set_result, None)
await ready
# _handle_inotify_event() calls read(fd) exactly once,
# so we can get away with not setting FD to non-blocking
for (header, type_names, path, filename) in self._handle_inotify_event(self.fileno()):
e = (header, type_names, path, filename)
for type_name in type_names:
if type_name in terminal_events:
raise TerminalEventException(type_name, e)
yield e
finally:
loop.remove_reader(self.fileno())rsvargas
Metadata
Metadata
Assignees
Labels
No labels