From 2fdd43e3d468fc4068801871d67a198bbab8b436 Mon Sep 17 00:00:00 2001
From: Vasu
Date: Fri, 12 Dec 2025 03:23:05 +0000
Subject: [PATCH 1/6] Handle server shutdown gracefully to prevent traceback spam

- Add ServerShutdownError exception for graceful shutdown handling
- Detect server shutdown via health check and set _server_shutting_down flag
- Convert ServerDisconnectedError to ServerShutdownError during shutdown
- Catch ServerShutdownError in runner and log at debug level (no traceback)

Signed-off-by: Vasu
---
 agentlightning/runner/agent.py        | 32 +++++++++++++++++++++++----
 agentlightning/store/client_server.py | 30 +++++++++++++++++++++++--
 2 files changed, 56 insertions(+), 6 deletions(-)

diff --git a/agentlightning/runner/agent.py b/agentlightning/runner/agent.py
index 26e45b032..a72193529 100644
--- a/agentlightning/runner/agent.py
+++ b/agentlightning/runner/agent.py
@@ -33,6 +33,7 @@
 from agentlightning.litagent import LitAgent
 from agentlightning.reward import emit_reward, find_final_reward
 from agentlightning.store.base import LightningStore
+from agentlightning.store.client_server import ServerShutdownError
 from agentlightning.tracer.agentops import AgentOpsTracer
 from agentlightning.tracer.base import Tracer
 from agentlightning.types import (
@@ -289,7 +290,14 @@ async def _post_process_rollout_result(
             # This will NOT emit another span to the tracer
             reward_span = emit_reward(raw_result, propagate=False)
             # We add it to the store manually
-            await store.add_otel_span(rollout.rollout_id, rollout.attempt.attempt_id, reward_span)
+            try:
+                await store.add_otel_span(rollout.rollout_id, rollout.attempt.attempt_id, reward_span)
+            except ServerShutdownError:
+                # Server is shutting down - handle gracefully without traceback
+                logger.debug(
+                    f"{self._log_prefix(rollout.rollout_id)} Server is shutting down. "
+                    "Skipping add_otel_span for reward span."
+                )
             trace_spans.append(reward_span)
 
         if isinstance(raw_result, list):
@@ -304,9 +312,16 @@ async def _post_process_rollout_result(
                 self._tracer, AgentOpsTracer
             ):  # TODO: this should be replaced with general OpenTelemetry tracer in next version
                 for span in raw_result:
-                    await store.add_otel_span(
-                        rollout.rollout_id, rollout.attempt.attempt_id, cast(ReadableSpan, span)
-                    )
+                    try:
+                        await store.add_otel_span(
+                            rollout.rollout_id, rollout.attempt.attempt_id, cast(ReadableSpan, span)
+                        )
+                    except ServerShutdownError:
+                        # Server is shutting down - handle gracefully without traceback
+                        logger.debug(
+                            f"{self._log_prefix(rollout.rollout_id)} Server is shutting down. "
+                            f"Skipping add_otel_span for span: {span.name}"
+                        )
             else:
                 logger.warning(
                     f"{self._log_prefix(rollout.rollout_id)} Tracer is already an OpenTelemetry tracer. "
@@ -528,6 +543,9 @@ async def _step_impl(self, next_rollout: AttemptedRollout, raise_on_exception: b
                 await store.update_attempt(rollout_id, next_rollout.attempt.attempt_id, status="failed")
             else:
                 await store.update_attempt(rollout_id, next_rollout.attempt.attempt_id, status="succeeded")
+        except ServerShutdownError:
+            # Server is shutting down - handle gracefully without traceback
+            logger.debug(f"{self._log_prefix(rollout_id)} Server is shutting down. " "Skipping update_attempt.")
         except Exception:
             logger.exception(
                 f"{self._log_prefix(rollout_id)} Exception during update_attempt. Giving up the update."
@@ -582,6 +600,12 @@ async def iter(self, *, event: Optional[ExecutionEvent] = None) -> None:
                 await store.update_attempt(
                     next_rollout.rollout_id, next_rollout.attempt.attempt_id, worker_id=self.get_worker_id()
                 )
+            except ServerShutdownError:
+                # Server is shutting down - handle gracefully without traceback
+                logger.debug(
+                    f"{self._log_prefix()} Server is shutting down. " "Skipping update_attempt for rollout claim."
+                )
+                continue
             except Exception:
                 # This exception could happen if the rollout is dequeued and the other end died for some reason
                 logger.exception(f"{self._log_prefix()} Exception during update_attempt, giving up the rollout.")
diff --git a/agentlightning/store/client_server.py b/agentlightning/store/client_server.py
index 251972dc2..5c746e596 100644
--- a/agentlightning/store/client_server.py
+++ b/agentlightning/store/client_server.py
@@ -75,6 +75,15 @@
 T_model = TypeVar("T_model", bound=BaseModel)
 
 
+class ServerShutdownError(Exception):
+    """Raised when the server is shutting down and requests cannot be completed.
+
+    This exception is raised instead of ServerDisconnectedError when we detect
+    that the server is permanently unavailable (e.g., during graceful shutdown).
+    Callers should handle this gracefully without dumping full tracebacks.
+    """
+
+
 class RolloutRequest(BaseModel):
     input: TaskInput
     mode: Optional[Literal["train", "val", "test"]] = None
@@ -1238,6 +1247,9 @@ def __init__(
         self._dequeue_was_successful: bool = False
         self._dequeue_first_unsuccessful: bool = True
 
+        # Track server shutdown state to handle errors gracefully
+        self._server_shutting_down: bool = False
+
     @property
     def capabilities(self) -> LightningStoreCapabilities:
         """Return the capabilities of the store."""
@@ -1287,6 +1299,7 @@ def __setstate__(self, state: Dict[str, Any]):
         self._connection_timeout = state["_connection_timeout"]
         self._dequeue_was_successful = False
         self._dequeue_first_unsuccessful = True
+        self._server_shutting_down = False
 
     async def _get_session(self) -> aiohttp.ClientSession:
         # In the proxy process, FastAPI middleware calls
@@ -1324,6 +1337,7 @@ async def _wait_until_healthy(self, session: aiohttp.ClientSession) -> bool:
         """
         Probe the server's /health until it responds 200 or retries are exhausted.
         Returns True if healthy, False otherwise.
+        When this returns False, it indicates the server is shutting down or permanently unavailable.
         """
         if not self._health_retry_delays:
             client_logger.info("No health retry delays configured; skipping health checks.")
@@ -1342,9 +1356,12 @@ async def _wait_until_healthy(self, session: aiohttp.ClientSession) -> bool:
             client_logger.warning(f"Server is not healthy yet. Retrying in {delay} seconds.")
             if delay > 0.0:
                 await asyncio.sleep(delay)
-        client_logger.error(
-            f"Server is not healthy at {self.server_address}/health after {len(self._health_retry_delays)} retry attempts"
+        client_logger.warning(
+            f"Server is not healthy at {self.server_address}/health after {len(self._health_retry_delays)} retry attempts. "
+            "Server appears to be shutting down."
         )
+        # Mark server as shutting down to handle subsequent errors gracefully
+        self._server_shutting_down = True
         return False
 
     async def _request_json(
@@ -1405,6 +1422,15 @@ async def _request_json(
                 last_exc = net_exc
                 client_logger.info(f"Network/session issue will be retried. Retrying the request {method}: {path}")
                 if not await self._wait_until_healthy(session):
+                    # Server is shutting down - handle ServerDisconnectedError gracefully
+                    if isinstance(net_exc, aiohttp.ServerDisconnectedError) and self._server_shutting_down:
+                        client_logger.debug(
+                            f"Server is shutting down. Suppressing ServerDisconnectedError for {method}: {path}"
+                        )
+                        # Raise a specific exception that callers can catch and handle gracefully
+                        raise ServerShutdownError(
+                            f"Server is shutting down. Request {method}: {path} cannot be completed."
+                        ) from net_exc
                     break  # server is not healthy, do not retry
 
         # exhausted retries

From 11566517c64309026e61ef27d2fd79969e41c360 Mon Sep 17 00:00:00 2001
From: Vasu
Date: Fri, 12 Dec 2025 03:48:05 +0000
Subject: [PATCH 2/6] Address Copilot suggestions: thread safety, flag reset, and string formatting

Signed-off-by: Vasu
---
 agentlightning/runner/agent.py        |  4 ++--
 agentlightning/store/client_server.py | 14 ++++++++++----
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/agentlightning/runner/agent.py b/agentlightning/runner/agent.py
index a72193529..6a4c6d73b 100644
--- a/agentlightning/runner/agent.py
+++ b/agentlightning/runner/agent.py
@@ -545,7 +545,7 @@ async def _step_impl(self, next_rollout: AttemptedRollout, raise_on_exception: b
                 await store.update_attempt(rollout_id, next_rollout.attempt.attempt_id, status="succeeded")
         except ServerShutdownError:
             # Server is shutting down - handle gracefully without traceback
-            logger.debug(f"{self._log_prefix(rollout_id)} Server is shutting down. " "Skipping update_attempt.")
+            logger.debug(f"{self._log_prefix(rollout_id)} Server is shutting down. Skipping update_attempt.")
         except Exception:
             logger.exception(
                 f"{self._log_prefix(rollout_id)} Exception during update_attempt. Giving up the update."
@@ -603,7 +603,7 @@ async def iter(self, *, event: Optional[ExecutionEvent] = None) -> None:
             except ServerShutdownError:
                 # Server is shutting down - handle gracefully without traceback
                 logger.debug(
-                    f"{self._log_prefix()} Server is shutting down. " "Skipping update_attempt for rollout claim."
+                    f"{self._log_prefix()} Server is shutting down. Skipping update_attempt for rollout claim."
                 )
                 continue
             except Exception:
diff --git a/agentlightning/store/client_server.py b/agentlightning/store/client_server.py
index 5c746e596..1580c3c1b 100644
--- a/agentlightning/store/client_server.py
+++ b/agentlightning/store/client_server.py
@@ -1349,6 +1349,9 @@ async def _wait_until_healthy(self, session: aiohttp.ClientSession) -> bool:
                 async with session.get(f"{self.server_address}/health") as r:
                     if r.status == 200:
                         client_logger.info(f"Server is healthy at {self.server_address}/health")
+                        # Reset shutdown flag if server recovers
+                        with self._lock:
+                            self._server_shutting_down = False
                         return True
             except Exception:
                 # swallow and retry
@@ -1361,7 +1364,8 @@ async def _wait_until_healthy(self, session: aiohttp.ClientSession) -> bool:
             "Server appears to be shutting down."
         )
         # Mark server as shutting down to handle subsequent errors gracefully
-        self._server_shutting_down = True
+        with self._lock:
+            self._server_shutting_down = True
         return False
 
     async def _request_json(
@@ -1422,10 +1426,12 @@ async def _request_json(
                 last_exc = net_exc
                 client_logger.info(f"Network/session issue will be retried. 
Retrying the request {method}: {path}") if not await self._wait_until_healthy(session): - # Server is shutting down - handle ServerDisconnectedError gracefully - if isinstance(net_exc, aiohttp.ServerDisconnectedError) and self._server_shutting_down: + # Server is shutting down - handle all network errors gracefully + with self._lock: + is_shutting_down = self._server_shutting_down + if is_shutting_down: client_logger.debug( - f"Server is shutting down. Suppressing ServerDisconnectedError for {method}: {path}" + f"Server is shutting down. Suppressing {type(net_exc).__name__} for {method}: {path}" ) # Raise a specific exception that callers can catch and handle gracefully raise ServerShutdownError( From fb928f64ea5be54e4491f74f2cbc85fcec5106ed Mon Sep 17 00:00:00 2001 From: Vasu Date: Fri, 12 Dec 2025 04:34:51 +0000 Subject: [PATCH 3/6] Add CancelledError handling for graceful server shutdown Signed-off-by: Vasu --- agentlightning/runner/agent.py | 26 ++++---- agentlightning/store/client_server.py | 86 +++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 12 deletions(-) diff --git a/agentlightning/runner/agent.py b/agentlightning/runner/agent.py index 6a4c6d73b..644dbb1f7 100644 --- a/agentlightning/runner/agent.py +++ b/agentlightning/runner/agent.py @@ -292,10 +292,10 @@ async def _post_process_rollout_result( # We add it to the store manually try: await store.add_otel_span(rollout.rollout_id, rollout.attempt.attempt_id, reward_span) - except ServerShutdownError: - # Server is shutting down - handle gracefully without traceback + except (ServerShutdownError, asyncio.CancelledError): + # Server is shutting down or request was cancelled - handle gracefully without traceback logger.debug( - f"{self._log_prefix(rollout.rollout_id)} Server is shutting down. " + f"{self._log_prefix(rollout.rollout_id)} Server is shutting down or request cancelled. " "Skipping add_otel_span for reward span." ) trace_spans.append(reward_span) @@ -316,10 +316,10 @@ async def _post_process_rollout_result( await store.add_otel_span( rollout.rollout_id, rollout.attempt.attempt_id, cast(ReadableSpan, span) ) - except ServerShutdownError: - # Server is shutting down - handle gracefully without traceback + except (ServerShutdownError, asyncio.CancelledError): + # Server is shutting down or request was cancelled - handle gracefully without traceback logger.debug( - f"{self._log_prefix(rollout.rollout_id)} Server is shutting down. " + f"{self._log_prefix(rollout.rollout_id)} Server is shutting down or request cancelled. " f"Skipping add_otel_span for span: {span.name}" ) else: @@ -543,9 +543,11 @@ async def _step_impl(self, next_rollout: AttemptedRollout, raise_on_exception: b await store.update_attempt(rollout_id, next_rollout.attempt.attempt_id, status="failed") else: await store.update_attempt(rollout_id, next_rollout.attempt.attempt_id, status="succeeded") - except ServerShutdownError: - # Server is shutting down - handle gracefully without traceback - logger.debug(f"{self._log_prefix(rollout_id)} Server is shutting down. Skipping update_attempt.") + except (ServerShutdownError, asyncio.CancelledError): + # Server is shutting down or request was cancelled - handle gracefully without traceback + logger.debug( + f"{self._log_prefix(rollout_id)} Server is shutting down or request cancelled. Skipping update_attempt." + ) except Exception: logger.exception( f"{self._log_prefix(rollout_id)} Exception during update_attempt. Giving up the update." 
@@ -600,10 +602,10 @@ async def iter(self, *, event: Optional[ExecutionEvent] = None) -> None: await store.update_attempt( next_rollout.rollout_id, next_rollout.attempt.attempt_id, worker_id=self.get_worker_id() ) - except ServerShutdownError: - # Server is shutting down - handle gracefully without traceback + except (ServerShutdownError, asyncio.CancelledError): + # Server is shutting down or request was cancelled - handle gracefully without traceback logger.debug( - f"{self._log_prefix()} Server is shutting down. Skipping update_attempt for rollout claim." + f"{self._log_prefix()} Server is shutting down or request cancelled. Skipping update_attempt for rollout claim." ) continue except Exception: diff --git a/agentlightning/store/client_server.py b/agentlightning/store/client_server.py index 1580c3c1b..fe1d4044c 100644 --- a/agentlightning/store/client_server.py +++ b/agentlightning/store/client_server.py @@ -1438,9 +1438,95 @@ async def _request_json( f"Server is shutting down. Request {method}: {path} cannot be completed." ) from net_exc break # server is not healthy, do not retry + except asyncio.CancelledError as cancel_exc: + # Cancellation can occur during async operations, especially during shutdown + client_logger.debug(f"Request cancelled: {method}: {path}", exc_info=True) + # Check if server is shutting down - if so, convert to ServerShutdownError + # Also check health to see if server is actually down (might not have set flag yet) + with self._lock: + is_shutting_down = self._server_shutting_down + + # If flag is already set, convert immediately + if is_shutting_down: + client_logger.debug( + f"Server is shutting down. Converting CancelledError to ServerShutdownError for {method}: {path}" + ) + raise ServerShutdownError( + f"Server is shutting down. Request {method}: {path} was cancelled." + ) from cancel_exc + + # Flag not set - check health, but handle cancellation of health check itself + # Cancellation during network operations usually indicates server shutdown or connection loss + # Be conservative: only re-raise if we can definitively prove server is healthy + try: + # Use a timeout to prevent health check from hanging + is_healthy = await asyncio.wait_for( + self._wait_until_healthy(session), timeout=1.0 # Quick health check with timeout + ) + if not is_healthy: + # Health check failed - server is down + with self._lock: + self._server_shutting_down = True + client_logger.debug( + f"Server is shutting down. Converting CancelledError to ServerShutdownError for {method}: {path}" + ) + raise ServerShutdownError( + f"Server is shutting down. Request {method}: {path} was cancelled." + ) from cancel_exc + # Server is healthy - this might be a genuine cancellation + # However, during shutdown, health check might succeed briefly before server dies + # Be conservative: if CancelledError occurs during network op, assume shutdown + # unless we're very confident it's a genuine cancellation + # For now, convert to ServerShutdownError to be safe (caller can handle it gracefully) + client_logger.debug( + f"CancelledError during network operation. Converting to ServerShutdownError for safety: {method}: {path}" + ) + raise ServerShutdownError( + f"Request {method}: {path} was cancelled (likely due to server shutdown)." + ) from cancel_exc + except asyncio.TimeoutError: + # Health check timed out - server is likely down + with self._lock: + self._server_shutting_down = True + client_logger.debug( + f"Health check timed out. 
Converting CancelledError to ServerShutdownError for {method}: {path}" + ) + raise ServerShutdownError( + f"Server is shutting down. Request {method}: {path} was cancelled." + ) from cancel_exc + except asyncio.CancelledError: + # Health check itself was cancelled - this strongly suggests server shutdown + # Convert to ServerShutdownError to be safe + with self._lock: + self._server_shutting_down = True + client_logger.debug( + f"Health check was cancelled. Converting CancelledError to ServerShutdownError for {method}: {path}" + ) + raise ServerShutdownError( + f"Server is shutting down. Request {method}: {path} was cancelled." + ) from cancel_exc # exhausted retries assert last_exc is not None + # Before raising, check if it's a network exception and server is shutting down + if isinstance( + last_exc, + ( + aiohttp.ServerDisconnectedError, + aiohttp.ClientConnectorError, + aiohttp.ClientOSError, + asyncio.TimeoutError, + ), + ): + with self._lock: + is_shutting_down = self._server_shutting_down + if is_shutting_down: + client_logger.debug( + f"Server is shutting down. Converting {type(last_exc).__name__} to ServerShutdownError for {method}: {path}" + ) + raise ServerShutdownError( + f"Server is shutting down. Request {method}: {path} cannot be completed." + ) from last_exc raise last_exc async def close(self): From 41948fad04cf0d272da2751dbeb4419946e03b6d Mon Sep 17 00:00:00 2001 From: Vasu Date: Fri, 12 Dec 2025 04:38:47 +0000 Subject: [PATCH 4/6] Signed-off-by: Vasu --- agentlightning/store/client_server.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/agentlightning/store/client_server.py b/agentlightning/store/client_server.py index fe1d4044c..8e3329a15 100644 --- a/agentlightning/store/client_server.py +++ b/agentlightning/store/client_server.py @@ -1473,11 +1473,6 @@ async def _request_json( raise ServerShutdownError( f"Server is shutting down. Request {method}: {path} was cancelled." ) from cancel_exc - # Server is healthy - this might be a genuine cancellation - # However, during shutdown, health check might succeed briefly before server dies - # Be conservative: if CancelledError occurs during network op, assume shutdown - # unless we're very confident it's a genuine cancellation - # For now, convert to ServerShutdownError to be safe (caller can handle it gracefully) client_logger.debug( f"CancelledError during network operation. Converting to ServerShutdownError for safety: {method}: {path}" ) From 397a34c4bccaacacdbed70b61ab51867b99931c9 Mon Sep 17 00:00:00 2001 From: Vasu Date: Fri, 12 Dec 2025 04:41:53 +0000 Subject: [PATCH 5/6] Signed-off-by: Vasu --- agentlightning/store/client_server.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/agentlightning/store/client_server.py b/agentlightning/store/client_server.py index 8e3329a15..1db73ce47 100644 --- a/agentlightning/store/client_server.py +++ b/agentlightning/store/client_server.py @@ -1455,9 +1455,6 @@ async def _request_json( f"Server is shutting down. Request {method}: {path} was cancelled." 
) from cancel_exc - # Flag not set - check health, but handle cancellation of health check itself - # Cancellation during network operations usually indicates server shutdown or connection loss - # Be conservative: only re-raise if we can definitively prove server is healthy try: # Use a timeout to prevent health check from hanging is_healthy = await asyncio.wait_for( From ec5b0e449522f44e359098379e1dd204ed46ae5b Mon Sep 17 00:00:00 2001 From: Vasu Date: Sun, 21 Dec 2025 18:44:23 +0000 Subject: [PATCH 6/6] Handle server shutdown errors gracefully using shared status flag --- agentlightning/execution/client_server.py | 14 +- agentlightning/runner/agent.py | 34 +--- agentlightning/store/client_server.py | 215 ++++++++++------------ 3 files changed, 115 insertions(+), 148 deletions(-) diff --git a/agentlightning/execution/client_server.py b/agentlightning/execution/client_server.py index 7147ced2d..eb766d3a8 100644 --- a/agentlightning/execution/client_server.py +++ b/agentlightning/execution/client_server.py @@ -124,13 +124,21 @@ def __init__( ) self.allowed_exit_codes = tuple(allowed_exit_codes) + # This flag is set to True after server launches and False before server stops + # Clients check this flag when requests fail - if server is not online, silently ignore errors + # Be mindful of performance: all processes need to synchronously read this flag + ctx = multiprocessing.get_context() + self._server_online = ctx.Value("b", False) # 'b' = signed char, False = 0 + async def _execute_algorithm( self, algorithm: AlgorithmBundle, store: LightningStore, stop_evt: ExecutionEvent ) -> None: wrapper_store: LightningStore | None = None if self.managed_store: logger.info("Starting LightningStore server on %s:%s", self.server_host, self.server_port) - wrapper_store = LightningStoreServer(store, host=self.server_host, port=self.server_port) + wrapper_store = LightningStoreServer( + store, host=self.server_host, port=self.server_port, server_online_flag=self._server_online + ) server_started = False else: wrapper_store = store @@ -169,7 +177,9 @@ async def _execute_runner( ) -> None: if self.managed_store: # If managed, we actually do not use the provided store - client_store = LightningStoreClient(f"http://{self.server_host}:{self.server_port}") + client_store = LightningStoreClient( + f"http://{self.server_host}:{self.server_port}", server_online_flag=self._server_online + ) else: client_store = store try: diff --git a/agentlightning/runner/agent.py b/agentlightning/runner/agent.py index 644dbb1f7..26e45b032 100644 --- a/agentlightning/runner/agent.py +++ b/agentlightning/runner/agent.py @@ -33,7 +33,6 @@ from agentlightning.litagent import LitAgent from agentlightning.reward import emit_reward, find_final_reward from agentlightning.store.base import LightningStore -from agentlightning.store.client_server import ServerShutdownError from agentlightning.tracer.agentops import AgentOpsTracer from agentlightning.tracer.base import Tracer from agentlightning.types import ( @@ -290,14 +289,7 @@ async def _post_process_rollout_result( # This will NOT emit another span to the tracer reward_span = emit_reward(raw_result, propagate=False) # We add it to the store manually - try: - await store.add_otel_span(rollout.rollout_id, rollout.attempt.attempt_id, reward_span) - except (ServerShutdownError, asyncio.CancelledError): - # Server is shutting down or request was cancelled - handle gracefully without traceback - logger.debug( - f"{self._log_prefix(rollout.rollout_id)} Server is shutting down or request 
cancelled. " - "Skipping add_otel_span for reward span." - ) + await store.add_otel_span(rollout.rollout_id, rollout.attempt.attempt_id, reward_span) trace_spans.append(reward_span) if isinstance(raw_result, list): @@ -312,16 +304,9 @@ async def _post_process_rollout_result( self._tracer, AgentOpsTracer ): # TODO: this should be replaced with general OpenTelemetry tracer in next version for span in raw_result: - try: - await store.add_otel_span( - rollout.rollout_id, rollout.attempt.attempt_id, cast(ReadableSpan, span) - ) - except (ServerShutdownError, asyncio.CancelledError): - # Server is shutting down or request was cancelled - handle gracefully without traceback - logger.debug( - f"{self._log_prefix(rollout.rollout_id)} Server is shutting down or request cancelled. " - f"Skipping add_otel_span for span: {span.name}" - ) + await store.add_otel_span( + rollout.rollout_id, rollout.attempt.attempt_id, cast(ReadableSpan, span) + ) else: logger.warning( f"{self._log_prefix(rollout.rollout_id)} Tracer is already an OpenTelemetry tracer. " @@ -543,11 +528,6 @@ async def _step_impl(self, next_rollout: AttemptedRollout, raise_on_exception: b await store.update_attempt(rollout_id, next_rollout.attempt.attempt_id, status="failed") else: await store.update_attempt(rollout_id, next_rollout.attempt.attempt_id, status="succeeded") - except (ServerShutdownError, asyncio.CancelledError): - # Server is shutting down or request was cancelled - handle gracefully without traceback - logger.debug( - f"{self._log_prefix(rollout_id)} Server is shutting down or request cancelled. Skipping update_attempt." - ) except Exception: logger.exception( f"{self._log_prefix(rollout_id)} Exception during update_attempt. Giving up the update." @@ -602,12 +582,6 @@ async def iter(self, *, event: Optional[ExecutionEvent] = None) -> None: await store.update_attempt( next_rollout.rollout_id, next_rollout.attempt.attempt_id, worker_id=self.get_worker_id() ) - except (ServerShutdownError, asyncio.CancelledError): - # Server is shutting down or request was cancelled - handle gracefully without traceback - logger.debug( - f"{self._log_prefix()} Server is shutting down or request cancelled. Skipping update_attempt for rollout claim." - ) - continue except Exception: # This exception could happen if the rollout is dequeued and the other end died for some reason logger.exception(f"{self._log_prefix()} Exception during update_attempt, giving up the rollout.") diff --git a/agentlightning/store/client_server.py b/agentlightning/store/client_server.py index 1db73ce47..3d7725e7e 100644 --- a/agentlightning/store/client_server.py +++ b/agentlightning/store/client_server.py @@ -245,6 +245,7 @@ def __init__( launcher_args: PythonServerLauncherArgs | None = None, n_workers: int = 1, prometheus: bool = False, + server_online_flag: Any = None, ): super().__init__() self.store = store @@ -292,12 +293,15 @@ def __init__( # LightningStoreServer holds a plain Python object (self.store) in one process # (the process that runs uvicorn/FastAPI). # When you multiprocessing.Process(...) and call methods on a different LightningStore instance - # (or on a copy inherited via fork), you’re mutating another process’s memory, not the server’s memory. + # (or on a copy inherited via fork), you're mutating another process's memory, not the server's memory. # So we need to track the owner process (whoever creates the server), # and only mutate the store in that process. 
self._owner_pid = os.getpid() self._client: Optional[LightningStoreClient] = None + # Set to True after server launches, False before server stops + self._server_online_flag = server_online_flag + @property def capabilities(self) -> LightningStoreCapabilities: """Return the capabilities of the store.""" @@ -407,6 +411,11 @@ async def start(self): end_time = time.time() server_logger.info(f"Lightning store server started in {end_time - start_time:.2f} seconds") + # Set server online flag to True after server has launched + if self._server_online_flag is not None: + with self._server_online_flag.get_lock(): + self._server_online_flag.value = True + async def run_forever(self): """Runs the FastAPI server indefinitely.""" server_logger.info( @@ -419,6 +428,11 @@ async def stop(self): You need to call this method in the same process as the server was created in. """ + # Set server online flag to False before server stops + if self._server_online_flag is not None: + with self._server_online_flag.get_lock(): + self._server_online_flag.value = False + server_logger.info("Stopping the lightning store server...") await self.server_launcher.stop() server_logger.info("Lightning store server stopped.") @@ -1229,6 +1243,7 @@ def __init__( health_retry_delays: Sequence[float] = (0.1, 0.2, 0.5), request_timeout: float = 30.0, connection_timeout: float = 5.0, + server_online_flag: Any = None, ): self.server_address_root = server_address.rstrip("/") self.server_address = self.server_address_root + API_V1_AGL_PREFIX @@ -1245,10 +1260,9 @@ def __init__( # Store whether the dequeue was successful in history self._dequeue_was_successful: bool = False - self._dequeue_first_unsuccessful: bool = True - # Track server shutdown state to handle errors gracefully - self._server_shutting_down: bool = False + # When requests fail, check this flag - if server is not online, silently ignore errors + self._server_online_flag = server_online_flag @property def capabilities(self) -> LightningStoreCapabilities: @@ -1298,8 +1312,6 @@ def __setstate__(self, state: Dict[str, Any]): self._request_timeout = state["_request_timeout"] self._connection_timeout = state["_connection_timeout"] self._dequeue_was_successful = False - self._dequeue_first_unsuccessful = True - self._server_shutting_down = False async def _get_session(self) -> aiohttp.ClientSession: # In the proxy process, FastAPI middleware calls @@ -1349,9 +1361,6 @@ async def _wait_until_healthy(self, session: aiohttp.ClientSession) -> bool: async with session.get(f"{self.server_address}/health") as r: if r.status == 200: client_logger.info(f"Server is healthy at {self.server_address}/health") - # Reset shutdown flag if server recovers - with self._lock: - self._server_shutting_down = False return True except Exception: # swallow and retry @@ -1363,9 +1372,6 @@ async def _wait_until_healthy(self, session: aiohttp.ClientSession) -> bool: f"Server is not healthy at {self.server_address}/health after {len(self._health_retry_delays)} retry attempts. " "Server appears to be shutting down." ) - # Mark server as shutting down to handle subsequent errors gracefully - with self._lock: - self._server_shutting_down = True return False async def _request_json( @@ -1426,81 +1432,36 @@ async def _request_json( last_exc = net_exc client_logger.info(f"Network/session issue will be retried. 
Retrying the request {method}: {path}") if not await self._wait_until_healthy(session): - # Server is shutting down - handle all network errors gracefully - with self._lock: - is_shutting_down = self._server_shutting_down - if is_shutting_down: - client_logger.debug( - f"Server is shutting down. Suppressing {type(net_exc).__name__} for {method}: {path}" - ) - # Raise a specific exception that callers can catch and handle gracefully - raise ServerShutdownError( - f"Server is shutting down. Request {method}: {path} cannot be completed." - ) from net_exc + # Check shared flag - if server is not online, silently ignore error + if self._server_online_flag is not None: + with self._server_online_flag.get_lock(): + is_online = bool(self._server_online_flag.value) + if not is_online: + client_logger.debug( + f"Server is not online (shared flag). Silently ignoring {type(net_exc).__name__} for {method}: {path}" + ) + # Silently ignore - return None to indicate failure was expected + return None break # server is not healthy, do not retry except asyncio.CancelledError as cancel_exc: # Cancellation can occur during async operations, especially during shutdown client_logger.debug(f"Request cancelled: {method}: {path}", exc_info=True) - # Check if server is shutting down - if so, convert to ServerShutdownError - # Also check health to see if server is actually down (might not have set flag yet) - with self._lock: - is_shutting_down = self._server_shutting_down - - # If flag is already set, convert immediately - if is_shutting_down: - client_logger.debug( - f"Server is shutting down. Converting CancelledError to ServerShutdownError for {method}: {path}" - ) - raise ServerShutdownError( - f"Server is shutting down. Request {method}: {path} was cancelled." - ) from cancel_exc - - try: - # Use a timeout to prevent health check from hanging - is_healthy = await asyncio.wait_for( - self._wait_until_healthy(session), timeout=1.0 # Quick health check with timeout - ) - if not is_healthy: - # Health check failed - server is down - with self._lock: - self._server_shutting_down = True + # Check shared flag - if server is not online, silently ignore error + if self._server_online_flag is not None: + with self._server_online_flag.get_lock(): + is_online = bool(self._server_online_flag.value) + if not is_online: client_logger.debug( - f"Server is shutting down. Converting CancelledError to ServerShutdownError for {method}: {path}" + f"Server is not online (shared flag). Silently ignoring CancelledError for {method}: {path}" ) - raise ServerShutdownError( - f"Server is shutting down. Request {method}: {path} was cancelled." - ) from cancel_exc - client_logger.debug( - f"CancelledError during network operation. Converting to ServerShutdownError for safety: {method}: {path}" - ) - raise ServerShutdownError( - f"Request {method}: {path} was cancelled (likely due to server shutdown)." - ) from cancel_exc - except asyncio.TimeoutError: - # Health check timed out - server is likely down - with self._lock: - self._server_shutting_down = True - client_logger.debug( - f"Health check timed out. Converting CancelledError to ServerShutdownError for {method}: {path}" - ) - raise ServerShutdownError( - f"Server is shutting down. Request {method}: {path} was cancelled." 
- ) from cancel_exc - except asyncio.CancelledError: - # Health check itself was cancelled - this strongly suggests server shutdown - # Convert to ServerShutdownError to be safe - with self._lock: - self._server_shutting_down = True - client_logger.debug( - f"Health check was cancelled. Converting CancelledError to ServerShutdownError for {method}: {path}" - ) - raise ServerShutdownError( - f"Server is shutting down. Request {method}: {path} was cancelled." - ) from cancel_exc + # Silently ignore - return None to indicate failure was expected + return None + # If flag not available or server is online, re-raise cancellation + raise cancel_exc # exhausted retries assert last_exc is not None - # Before raising, check if it's a network exception and server is shutting down + # Before raising, check shared flag - if server is not online, silently ignore error if isinstance( last_exc, ( @@ -1510,15 +1471,15 @@ async def _request_json( asyncio.TimeoutError, ), ): - with self._lock: - is_shutting_down = self._server_shutting_down - if is_shutting_down: - client_logger.debug( - f"Server is shutting down. Converting {type(last_exc).__name__} to ServerShutdownError for {method}: {path}" - ) - raise ServerShutdownError( - f"Server is shutting down. Request {method}: {path} cannot be completed." - ) from last_exc + if self._server_online_flag is not None: + with self._server_online_flag.get_lock(): + is_online = bool(self._server_online_flag.value) + if not is_online: + client_logger.debug( + f"Server is not online (shared flag). Silently ignoring {type(last_exc).__name__} for {method}: {path}" + ) + # Silently ignore - return None to indicate failure was expected + return None raise last_exc async def close(self): @@ -1607,10 +1568,18 @@ async def dequeue_rollout(self, worker_id: Optional[str] = None) -> Optional[Att self._dequeue_was_successful = True return AttemptedRollout.model_validate(data) if data else None except Exception as e: + # Check shared flag - if server is not online, silently ignore error + if self._server_online_flag is not None: + with self._server_online_flag.get_lock(): + is_online = bool(self._server_online_flag.value) + if not is_online: + client_logger.debug( + f"Server is not online (shared flag). Silently ignoring dequeue_rollout failure: {e}" + ) + return None + # Log warning if server was online and dequeue was successful before (transition from online to offline) if self._dequeue_was_successful: - if self._dequeue_first_unsuccessful: - client_logger.warning(f"dequeue_rollout failed with exception: {e}") - self._dequeue_first_unsuccessful = False + client_logger.warning(f"dequeue_rollout failed with exception: {e}") client_logger.debug("dequeue_rollout failed with exception. 
Details:", exc_info=True) # Else ignore the exception because the server is not ready yet return None @@ -1854,16 +1823,23 @@ async def add_otel_span( readable_span: ReadableSpan, sequence_id: int | None = None, ) -> Optional[Span]: - # unchanged logic, now benefits from retries inside add_span/get_next_span_sequence_id - if sequence_id is None: - sequence_id = await self.get_next_span_sequence_id(rollout_id, attempt_id) - span = Span.from_opentelemetry( - readable_span, - rollout_id=rollout_id, - attempt_id=attempt_id, - sequence_id=sequence_id, - ) - return await self.add_span(span) + try: + # unchanged logic, now benefits from retries inside add_span/get_next_span_sequence_id + if sequence_id is None: + sequence_id = await self.get_next_span_sequence_id(rollout_id, attempt_id) + span = Span.from_opentelemetry( + readable_span, + rollout_id=rollout_id, + attempt_id=attempt_id, + sequence_id=sequence_id, + ) + return await self.add_span(span) + except (ServerShutdownError, asyncio.CancelledError): + # Server is shutting down or request was cancelled - handle gracefully without traceback + client_logger.debug( + f"Server is shutting down or request cancelled. Skipping add_otel_span for rollout {rollout_id}, attempt {attempt_id}." + ) + return None async def wait_for_rollouts(self, *, rollout_ids: List[str], timeout: Optional[float] = None) -> List[Rollout]: """Wait for rollouts to complete. @@ -1970,22 +1946,29 @@ async def update_attempt( last_heartbeat_time: float | Unset = UNSET, metadata: Optional[Dict[str, Any]] | Unset = UNSET, ) -> Attempt: - payload: Dict[str, Any] = {} - if not isinstance(status, Unset): - payload["status"] = status - if not isinstance(worker_id, Unset): - payload["worker_id"] = worker_id - if not isinstance(last_heartbeat_time, Unset): - payload["last_heartbeat_time"] = last_heartbeat_time - if not isinstance(metadata, Unset): - payload["metadata"] = metadata - - data = await self._request_json( - "post", - f"/rollouts/{rollout_id}/attempts/{attempt_id}", - json=payload, - ) - return Attempt.model_validate(data) + try: + payload: Dict[str, Any] = {} + if not isinstance(status, Unset): + payload["status"] = status + if not isinstance(worker_id, Unset): + payload["worker_id"] = worker_id + if not isinstance(last_heartbeat_time, Unset): + payload["last_heartbeat_time"] = last_heartbeat_time + if not isinstance(metadata, Unset): + payload["metadata"] = metadata + + data = await self._request_json( + "post", + f"/rollouts/{rollout_id}/attempts/{attempt_id}", + json=payload, + ) + return Attempt.model_validate(data) + except (ServerShutdownError, asyncio.CancelledError): + # Server is shutting down or request was cancelled - handle gracefully without traceback + client_logger.debug( + f"Server is shutting down or request cancelled. Skipping update_attempt for rollout {rollout_id}, attempt {attempt_id}." + ) + raise async def query_workers( self,