Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 2 additions & 19 deletions app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
import alembic.command as alembic_command
import alembic.config as alembic_config
import alembic.migration as alembic_migration
import redis
from calypsso import get_calypsso_app
from fastapi import FastAPI, HTTPException, Request, Response, status
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.routing import APIRoute
from redis import Redis
from sqlalchemy.engine import Connection, Engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
Expand All @@ -45,7 +43,6 @@
from app.types.exceptions import (
ContentHTTPException,
GoogleAPIInvalidCredentialsError,
MultipleWorkersWithoutRedisInitializationError,
)
from app.types.sqlalchemy import Base
from app.utils import initialization
Expand All @@ -54,7 +51,7 @@
from app.utils.state import LifespanState

if TYPE_CHECKING:
import redis
from redis import Redis

from app.types.factory import Factory

Expand Down Expand Up @@ -509,22 +506,12 @@ async def init_lifespan(
get_redis_client,
)()

# Initialization steps should only be run once across all workers
# We use Redis locks to ensure that the initialization steps are only run once
number_of_workers = initialization.get_number_of_workers()
if number_of_workers > 1 and not isinstance(
redis_client,
Redis,
):
raise MultipleWorkersWithoutRedisInitializationError

# We need to run the database initialization only once across all the workers
# Other workers have to wait for the db to be initialized
await initialization.use_lock_for_workers(
init_db,
"init_db",
redis_client,
number_of_workers,
hyperion_error_logger,
unlock_key="db_initialized",
settings=settings,
Expand All @@ -536,7 +523,6 @@ async def init_lifespan(
test_configuration,
"test_configuration",
redis_client,
number_of_workers,
hyperion_error_logger,
settings=settings,
hyperion_error_logger=hyperion_error_logger,
Expand All @@ -555,7 +541,6 @@ async def init_lifespan(
run_factories,
"run_factories",
redis_client,
number_of_workers,
hyperion_error_logger,
db=db,
settings=settings,
Expand All @@ -566,7 +551,6 @@ async def init_lifespan(
init_google_API,
"init_google_API",
redis_client,
number_of_workers,
hyperion_error_logger,
db=db,
settings=settings,
Expand All @@ -581,7 +565,6 @@ async def init_lifespan(
initialize_notification_topics,
"initialize_notification_topics",
redis_client,
number_of_workers,
hyperion_error_logger,
db=db,
hyperion_error_logger=hyperion_error_logger,
Expand Down Expand Up @@ -679,7 +662,7 @@ async def logging_middleware(
port = request.client.port
client_address = f"{ip_address}:{port}"

redis_client: redis.Redis | None = get_redis_client_dependency()
redis_client: Redis | None = get_redis_client_dependency()

# We test the ip address with the redis limiter
process = True
Expand Down
7 changes: 0 additions & 7 deletions app/types/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@
from app.core.payment.types_payment import HelloAssoConfigName


class MultipleWorkersWithoutRedisInitializationError(Exception):
def __init__(self):
super().__init__(
"Initialization steps could not be run with multiple workers as no Redis client were configured",
)


class InvalidAppStateTypeError(Exception):
def __init__(self):
super().__init__(
Expand Down
26 changes: 1 addition & 25 deletions app/utils/initialization.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import asyncio
import logging
import os
from collections.abc import Callable
from typing import ParamSpec, TypeVar

import psutil
import redis
from pydantic import ValidationError
from sqlalchemy import Connection, MetaData, delete, select
Expand Down Expand Up @@ -296,7 +294,6 @@ async def use_lock_for_workers(
job_function: Callable[P, R],
key: str,
redis_client: redis.Redis | None,
number_of_workers: int,
logger: logging.Logger,
unlock_key: str | None = None,
*args: P.args,
Expand All @@ -312,17 +309,9 @@ async def use_lock_for_workers(
We assume that the function execution won't take more than 20 seconds.

If the Redis client is not provided, the function will execute `job_function` directly without acquiring a lock.

If `number_of_workers` is less than or equal to 1, the function will execute `job_function` directly without acquiring a lock.
"""

if (
not isinstance(
redis_client,
redis.Redis,
)
or number_of_workers <= 1
):
if not isinstance(redis_client, redis.Redis):
# If a Redis is not provided, we execute the function directly
await execute_async_or_sync_method(job_function, *args, **kwargs)

Expand All @@ -349,16 +338,3 @@ async def use_lock_for_workers(
while redis_client.get(unlock_key) is None:
logger.debug(f"Waiting for {job_function.__name__} to finish")
await asyncio.sleep(1)


def get_number_of_workers() -> int:
"""
Get the number of active Hyperion workers
"""
# We use the parent process to get the workers
parent_pid = os.getppid() # PID du parent (FastAPI master process)
parent_process = psutil.Process(parent_pid)
workers = [
p for p in parent_process.children() if p.status() != psutil.STATUS_ZOMBIE
]
return len(workers)
3 changes: 1 addition & 2 deletions requirements-common.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ icalendar==5.0.13
jellyfish==1.2.1 # String Matching
Jinja2==3.1.6 # template engine for html files
phonenumbers==8.13.43 # Used for phone number validation
psutil==7.0.0 # psutil is used to determine the number of Hyperion workers
pydantic-settings==2.3.4
pydantic==2.12.5
pyjwt[crypto]==2.8.0 # generate and verify the JWT tokens, imported as `jwt`
Expand All @@ -32,4 +31,4 @@ unidecode==1.3.8
uvicorn[standard]==0.30.6
weasyprint==65.1 # HTML to PDF converter
xlsxwriter==3.2.0
psycopg[binary]==3.2.13 # PostgreSQL adapter for synchronous operations at startup (database initializations & migrations), local installation is recommended for a production site
psycopg[binary]==3.2.13 # PostgreSQL adapter for synchronous operations at startup (database initializations & migrations)
Loading