From 5d0ee4006cf9e50371a27678ab6587fa151d637f Mon Sep 17 00:00:00 2001 From: Bailey Morgan Date: Thu, 28 Apr 2022 13:46:28 -0400 Subject: [PATCH] Addition of an AsyncPipeline than consumes a normal pipeline, and generator as a data source, and provides and async generator for the pipeline output --- src/AsyncPipeline.py | 35 +++++++++++++++++++++++++++++++++++ src/__init__.py | 1 + 2 files changed, 36 insertions(+) create mode 100644 src/AsyncPipeline.py diff --git a/src/AsyncPipeline.py b/src/AsyncPipeline.py new file mode 100644 index 0000000..2ef45cd --- /dev/null +++ b/src/AsyncPipeline.py @@ -0,0 +1,35 @@ +import asyncio +from concurrent.futures import ThreadPoolExecutor + +class AsyncPipeline(object): + """ Async wrapper for a mpipe pipeline that allows for concurrent streams in + and out via generator functions """ + + def __init__(self, pipeline, data_source): + self._pipeline = pipeline + self._data_source = data_source + self._executor = ThreadPoolExecutor(2) + + async def _pipe_filler(self): + """ Async task to pass data from _data_source generator function into + pipeline """ + for item in self._data_source(): + await self._loop.run_in_executor( + self._executor, + lambda: self._pipeline.put(item) + ) + + self._pipeline.put(None) + + async def start(self): + """ Async generator method which starts the input stream into the + pipeline and yields result values from the pipeline """ + self._loop = asyncio.get_event_loop() + self._loop.create_task(self._pipe_filler()) + while True: + result = await self._loop.run_in_executor( + self._executor, + self._pipeline.get + ) + if result is None: break + yield result diff --git a/src/__init__.py b/src/__init__.py index 59b6aed..9d0f88f 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -8,5 +8,6 @@ from .OrderedStage import OrderedStage from .UnorderedStage import UnorderedStage from .Pipeline import Pipeline +from .AsyncPipeline import AsyncPipeline from .FilterWorker import FilterWorker from .FilterStage import FilterStage