Skip to content

Commit a5b0060

Browse files
authored
reuse sessions to eliminate fd leak (#391)
* reuse session * remove import * cleanup
1 parent 5c14bdf commit a5b0060

File tree

2 files changed

+44
-29
lines changed

2 files changed

+44
-29
lines changed

eval_protocol/adapters/fireworks_tracing.py

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ def __init__(
264264
self.project_id = project_id
265265
self.base_url = base_url.rstrip("/")
266266
self.timeout = timeout
267+
self._session = requests.Session()
267268

268269
def search_logs(self, tags: List[str], limit: int = 100, hours_back: int = 24) -> List[Dict[str, Any]]:
269270
"""Fetch logs from Fireworks tracing gateway /logs endpoint.
@@ -287,14 +288,14 @@ def search_logs(self, tags: List[str], limit: int = 100, hours_back: int = 24) -
287288
last_error: Optional[str] = None
288289
for url in urls_to_try:
289290
try:
290-
response = requests.get(url, params=params, timeout=self.timeout, headers=headers)
291-
if response.status_code == 404:
292-
# Try next variant
293-
last_error = f"404 for {url}"
294-
continue
295-
response.raise_for_status()
296-
data = response.json() or {}
297-
break
291+
with self._session.get(url, params=params, timeout=self.timeout, headers=headers) as response:
292+
if response.status_code == 404:
293+
# Try next variant; exiting the `with` block closes the response and releases the connection
294+
last_error = f"404 for {url}"
295+
continue
296+
response.raise_for_status()
297+
data = response.json() or {}
298+
break
298299
except requests.exceptions.RequestException as e:
299300
last_error = str(e)
300301
continue
@@ -412,22 +413,20 @@ def get_evaluation_rows(
412413

413414
result = None
414415
try:
415-
response = requests.get(url, params=params, timeout=self.timeout, headers=headers)
416-
response.raise_for_status()
417-
result = response.json()
418-
except requests.exceptions.HTTPError as e:
419-
error_msg = str(e)
420-
421-
# Try to extract detail message from response
422-
if e.response is not None:
423-
try:
424-
error_detail = e.response.json().get("detail", {})
425-
error_msg = error_detail or e.response.text
426-
except Exception: # In case e.response.json() fails
427-
error_msg = f"Proxy error: {e.response.text}"
428-
429-
logger.error("Failed to fetch traces from proxy (HTTP %s): %s", e.response.status_code, error_msg)
430-
return eval_rows
416+
with self._session.get(url, params=params, timeout=self.timeout, headers=headers) as response:
417+
if response.status_code >= 400:
418+
error_msg: str = response.text
419+
try:
420+
payload = response.json()
421+
if isinstance(payload, dict) and "detail" in payload:
422+
detail = payload.get("detail")
423+
if detail:
424+
error_msg = str(detail)
425+
except Exception:
426+
pass
427+
logger.error("Failed to fetch traces from proxy (HTTP %s): %s", response.status_code, error_msg)
428+
return eval_rows
429+
result = response.json()
431430
except requests.exceptions.RequestException as e:
432431
# Non-HTTP errors (network issues, timeouts, etc.)
433432
logger.error("Failed to fetch traces from proxy: %s", str(e))
@@ -451,3 +450,10 @@ def get_evaluation_rows(
451450

452451
logger.info("Successfully converted %d traces to evaluation rows", len(eval_rows))
453452
return eval_rows
453+
454+
def close(self) -> None:
    """Close the underlying HTTP session held in ``self._session``.

    This is a best-effort operation: any exception raised while closing
    is logged at debug level and suppressed, so the method never raises.
    """
    try:
        self._session.close()
    except Exception:
        # Closing must never raise, but keep a trace of the failure
        # instead of discarding it silently.
        logger.debug("Failed to close HTTP session", exc_info=True)

eval_protocol/pytest/remote_rollout_processor.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def __init__(
5454
self._timeout_seconds = timeout_seconds
5555
self._output_data_loader = output_data_loader or default_fireworks_output_data_loader
5656
self._tracing_adapter = FireworksTracingAdapter(base_url=self._model_base_url)
57+
self._session = requests.Session()
5758

5859
def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]:
5960
tasks: List[asyncio.Task[EvaluationRow]] = []
@@ -94,8 +95,8 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow:
9495
def _post_init() -> None:
9596
url = f"{remote_base_url}/init"
9697
try:
97-
r = requests.post(url, json=init_payload.model_dump(), timeout=300)
98-
r.raise_for_status()
98+
with self._session.post(url, json=init_payload.model_dump(), timeout=300) as r:
99+
r.raise_for_status()
99100
except requests.exceptions.Timeout:
100101
raise TimeoutError(
101102
f"The /init endpoint tried {url} with {init_payload.model_dump()} but timed out after 300 seconds."
@@ -108,9 +109,9 @@ def _post_init() -> None:
108109

109110
def _get_status() -> Dict[str, Any]:
110111
url = f"{remote_base_url}/status"
111-
r = requests.get(url, params={"rollout_id": row.execution_metadata.rollout_id}, timeout=15)
112-
r.raise_for_status()
113-
return r.json()
112+
with self._session.get(url, params={"rollout_id": row.execution_metadata.rollout_id}, timeout=15) as r:
113+
r.raise_for_status()
114+
return r.json()
114115

115116
continue_polling_status = True
116117
while time.time() < deadline:
@@ -204,4 +205,12 @@ async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow:
204205
return tasks
205206

206207
def cleanup(self) -> None:
    """Release the HTTP resources owned by this processor.

    Closes ``self._tracing_adapter`` first and then ``self._session``.
    Each close is attempted independently: a failure in one is logged at
    debug level and does not prevent the other from being closed. The
    method never raises.
    """
    import logging

    log = logging.getLogger(__name__)
    for attr in ("_tracing_adapter", "_session"):
        try:
            getattr(self, attr).close()
        except Exception:
            # Best-effort cleanup: record the failure rather than
            # swallowing it silently, then continue with the next resource.
            log.debug("Failed to close %s during cleanup", attr, exc_info=True)
    return None

0 commit comments

Comments
 (0)