Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ Redis Timers can be configured using environment variables:
- `TIMERS_HANDLING_SLEEP`: Base sleep time between timer processing cycles (default: 0.05 seconds)
- `TIMERS_HANDLING_JITTER_MIN_VALUE`: Minimum jitter multiplier (default: 0.5)
- `TIMERS_HANDLING_JITTER_MAX_VALUE`: Maximum jitter multiplier (default: 2.0)
- `TIMERS_CONCURRENT_PROCESSING_LIMIT`: Maximum number of timers processed concurrently (default: 5)
- `TIMERS_SEPARATOR`: Separator used between topic and timer ID (default: "--")

## How It Works
Expand Down
1 change: 1 addition & 0 deletions redis_timers/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
TIMERS_HANDLING_SLEEP: float = float(os.getenv("TIMERS_HANDLING_SLEEP", "0.05"))
TIMERS_HANDLING_JITTER_MIN_VALUE: float = float(os.getenv("TIMERS_HANDLING_JITTER_MIN_VALUE", "0.5"))
TIMERS_HANDLING_JITTER_MAX_VALUE: float = float(os.getenv("TIMERS_HANDLING_JITTER_MAX_VALUE", "2.0"))
TIMERS_CONCURRENT_PROCESSING_LIMIT: int = int(os.getenv("TIMERS_CONCURRENT_PROCESSING_LIMIT", "5"))

TIMERS_SEPARATOR = "--"
37 changes: 23 additions & 14 deletions redis_timers/timers.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,28 +82,37 @@ async def _handle_one_timer(self, timer_key: str) -> None:

await handler.handler(payload, self.context)

async def _handle_one_timer_with_lock(self, timer_key: str) -> None:
lock = consume_lock(
redis_client=self.redis_client,
key=timer_key,
)
if await lock.locked():
logger.debug(f"Timer is locked, {timer_key=}")
return

with contextlib.suppress(LockError):
async with lock:
await self._handle_one_timer(timer_key)
await self._remove_timer_by_key(timer_key)

async def handle_ready_timers(self) -> None:
ready_timers = await self.fetch_ready_timers(datetime.datetime.now(tz=TIMEZONE))
for timer_key in ready_timers:
lock = consume_lock(
redis_client=self.redis_client,
key=timer_key,
)
if await lock.locked():
logger.debug(f"Timer is locked, {timer_key=}")
continue
ready_timers = await self.fetch_ready_timers(timestamp=datetime.datetime.now(tz=TIMEZONE))
tasks_number = 0
async with asyncio.TaskGroup() as tg:
for timer_key in ready_timers:
tasks_number += 1
if tasks_number > settings.TIMERS_CONCURRENT_PROCESSING_LIMIT:
break

with contextlib.suppress(LockError):
async with lock:
await self._handle_one_timer(timer_key)
await self._remove_timer_by_key(timer_key)
tg.create_task(self._handle_one_timer_with_lock(timer_key=timer_key))

async def run_forever(self) -> None: # pragma: no cover
while True:
try:
await self.handle_ready_timers()
except aioredis.RedisError:
logger.exception("Timer haven't been consumed because of Redis error") # pragma: no cover
logger.exception("Timer haven't been consumed because of Redis error")
await asyncio.sleep(
settings.TIMERS_HANDLING_SLEEP
* random.uniform(settings.TIMERS_HANDLING_JITTER_MIN_VALUE, settings.TIMERS_HANDLING_JITTER_MAX_VALUE), # noqa: S311
Expand Down
27 changes: 27 additions & 0 deletions tests/test_timers.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,33 @@ async def test_handle_ready_timers(timers_instance: Timers, handler_results: Han
assert not payloads_dict


async def test_handle_ready_timers_with_zero_limit(timers_instance: Timers, handler_results: HandlerResults) -> None:
old_value = settings.TIMERS_CONCURRENT_PROCESSING_LIMIT
settings.TIMERS_CONCURRENT_PROCESSING_LIMIT = 0

try:
payload = SomePayloadModel(message="ready_timer", count=42)
await timers_instance.set_timer(
topic="some_topic",
timer_id="ready_timer_1",
payload=payload,
activation_period=datetime.timedelta(seconds=0), # Ready immediately
)

# Handle ready timers
await timers_instance.handle_ready_timers()

# Check that the handler was not called
assert len(handler_results.results) == 0

# Check that timer was not removed from Redis
timeline_keys, payloads_dict = await timers_instance.fetch_all_timers()
assert timeline_keys
assert payloads_dict
finally:
settings.TIMERS_CONCURRENT_PROCESSING_LIMIT = old_value


async def test_handle_ready_timer_no_handler(timers_instance: Timers, caplog: pytest.LogCaptureFixture) -> None:
caplog.set_level(logging.INFO)
payload = SomePayloadModel(message="ready_timer", count=42)
Expand Down
Loading