diff --git a/README.md b/README.md index c08c401..4b74828 100644 --- a/README.md +++ b/README.md @@ -160,6 +160,7 @@ Redis Timers can be configured using environment variables: - `TIMERS_HANDLING_SLEEP`: Base sleep time between timer processing cycles (default: 0.05 seconds) - `TIMERS_HANDLING_JITTER_MIN_VALUE`: Minimum jitter multiplier (default: 0.5) - `TIMERS_HANDLING_JITTER_MAX_VALUE`: Maximum jitter multiplier (default: 2.0) +- `TIMERS_CONCURRENT_PROCESSING_LIMIT`: Maximum number of timers processed concurrently (default: 5) - `TIMERS_SEPARATOR`: Separator used between topic and timer ID (default: "--") ## How It Works diff --git a/redis_timers/settings.py b/redis_timers/settings.py index 34286e1..d4b8956 100644 --- a/redis_timers/settings.py +++ b/redis_timers/settings.py @@ -7,5 +7,6 @@ TIMERS_HANDLING_SLEEP: float = float(os.getenv("TIMERS_HANDLING_SLEEP", "0.05")) TIMERS_HANDLING_JITTER_MIN_VALUE: float = float(os.getenv("TIMERS_HANDLING_JITTER_MIN_VALUE", "0.5")) TIMERS_HANDLING_JITTER_MAX_VALUE: float = float(os.getenv("TIMERS_HANDLING_JITTER_MAX_VALUE", "2.0")) +TIMERS_CONCURRENT_PROCESSING_LIMIT: int = int(os.getenv("TIMERS_CONCURRENT_PROCESSING_LIMIT", "5")) TIMERS_SEPARATOR = "--" diff --git a/redis_timers/timers.py b/redis_timers/timers.py index 05abd21..35ec1e9 100644 --- a/redis_timers/timers.py +++ b/redis_timers/timers.py @@ -82,28 +82,37 @@ async def _handle_one_timer(self, timer_key: str) -> None: await handler.handler(payload, self.context) + async def _handle_one_timer_with_lock(self, timer_key: str) -> None: + lock = consume_lock( + redis_client=self.redis_client, + key=timer_key, + ) + if await lock.locked(): + logger.debug(f"Timer is locked, {timer_key=}") + return + + with contextlib.suppress(LockError): + async with lock: + await self._handle_one_timer(timer_key) + await self._remove_timer_by_key(timer_key) + async def handle_ready_timers(self) -> None: - ready_timers = await self.fetch_ready_timers(datetime.datetime.now(tz=TIMEZONE)) - for timer_key in ready_timers: - lock = consume_lock( - redis_client=self.redis_client, - key=timer_key, - ) - if await lock.locked(): - logger.debug(f"Timer is locked, {timer_key=}") - continue + ready_timers = await self.fetch_ready_timers(timestamp=datetime.datetime.now(tz=TIMEZONE)) + tasks_number = 0 + async with asyncio.TaskGroup() as tg: + for timer_key in ready_timers: + tasks_number += 1 + if tasks_number > settings.TIMERS_CONCURRENT_PROCESSING_LIMIT: + break - with contextlib.suppress(LockError): - async with lock: - await self._handle_one_timer(timer_key) - await self._remove_timer_by_key(timer_key) + tg.create_task(self._handle_one_timer_with_lock(timer_key=timer_key)) async def run_forever(self) -> None: # pragma: no cover while True: try: await self.handle_ready_timers() except aioredis.RedisError: - logger.exception("Timer haven't been consumed because of Redis error") # pragma: no cover + logger.exception("Timer haven't been consumed because of Redis error") await asyncio.sleep( settings.TIMERS_HANDLING_SLEEP * random.uniform(settings.TIMERS_HANDLING_JITTER_MIN_VALUE, settings.TIMERS_HANDLING_JITTER_MAX_VALUE), # noqa: S311 diff --git a/tests/test_timers.py b/tests/test_timers.py index c52cac4..5dbc4dd 100644 --- a/tests/test_timers.py +++ b/tests/test_timers.py @@ -140,6 +140,33 @@ async def test_handle_ready_timers(timers_instance: Timers, handler_results: Han assert not payloads_dict +async def test_handle_ready_timers_with_zero_limit(timers_instance: Timers, handler_results: HandlerResults) -> None: + old_value = settings.TIMERS_CONCURRENT_PROCESSING_LIMIT + settings.TIMERS_CONCURRENT_PROCESSING_LIMIT = 0 + + try: + payload = SomePayloadModel(message="ready_timer", count=42) + await timers_instance.set_timer( + topic="some_topic", + timer_id="ready_timer_1", + payload=payload, + activation_period=datetime.timedelta(seconds=0), # Ready immediately + ) + + # Handle ready timers + await timers_instance.handle_ready_timers() + + # Check that the handler was not called + assert len(handler_results.results) == 0 + + # Check that timer was not removed from Redis + timeline_keys, payloads_dict = await timers_instance.fetch_all_timers() + assert timeline_keys + assert payloads_dict + finally: + settings.TIMERS_CONCURRENT_PROCESSING_LIMIT = old_value + + async def test_handle_ready_timer_no_handler(timers_instance: Timers, caplog: pytest.LogCaptureFixture) -> None: caplog.set_level(logging.INFO) payload = SomePayloadModel(message="ready_timer", count=42)