diff --git a/src/AsyncPipeline.py b/src/AsyncPipeline.py new file mode 100644 index 0000000..2ef45cd --- /dev/null +++ b/src/AsyncPipeline.py @@ -0,0 +1,35 @@ +import asyncio +from concurrent.futures import ThreadPoolExecutor + +class AsyncPipeline(object): + """ Async wrapper for a mpipe pipeline that allows for concurrent streams in + and out via generator functions """ + + def __init__(self, pipeline, data_source): + self._pipeline = pipeline + self._data_source = data_source + self._executor = ThreadPoolExecutor(2) + + async def _pipe_filler(self): + """ Async task to pass data from _data_source generator function into + pipeline """ + for item in self._data_source(): + await self._loop.run_in_executor( + self._executor, + lambda: self._pipeline.put(item) + ) + + self._pipeline.put(None) + + async def start(self): + """ Async generator method which starts the input stream into the + pipeline and yields result values from the pipeline """ + self._loop = asyncio.get_event_loop() + self._loop.create_task(self._pipe_filler()) + while True: + result = await self._loop.run_in_executor( + self._executor, + self._pipeline.get + ) + if result is None: break + yield result diff --git a/src/__init__.py b/src/__init__.py index 59b6aed..9d0f88f 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -8,5 +8,6 @@ from .OrderedStage import OrderedStage from .UnorderedStage import UnorderedStage from .Pipeline import Pipeline +from .AsyncPipeline import AsyncPipeline from .FilterWorker import FilterWorker from .FilterStage import FilterStage