From 254c6eda45f27e100f661448e3f3cd4c7655f965 Mon Sep 17 00:00:00 2001 From: jerry <1772030600@qq.com> Date: Wed, 11 Feb 2026 00:45:32 +0800 Subject: [PATCH 1/7] feat: DailyPaper SSE streaming, post-judge filter, email push, persisted config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Convert /daily endpoint to SSE streaming when LLM/Judge enabled (search → build → LLM → judge → filter → save → notify → result) - Add post-judge paper filtering: remove "skip"/"skim", keep "worth_reading"+"must_read" - Add email push with UI-provided recipient (notify_email_to field) - Persist all config toggles in zustand store (defaults all enabled) - Add StreamProgressCard with real-time phase/progress/log display - SSE-aware Next.js proxy for daily route (detect content-type, stream or JSON) - Raise judge_max_items_per_query cap from 20 to 200 - Fix @dataclass inheritance in errors.py (subclass defaults) - Fix ensure_execution_result treating raw dicts as result dicts - Add 3 end-to-end tests for SSE pipeline (filter, full pipeline, sync fallback) Closes #24 --- src/paperbot/api/routes/paperscool.py | 584 +++++++++++++++- .../services/daily_push_service.py | 41 +- src/paperbot/core/abstractions/executable.py | 37 +- src/paperbot/core/errors/errors.py | 11 +- tests/unit/test_paperscool_route.py | 468 ++++++++++++- .../api/research/paperscool/daily/route.ts | 47 +- .../research/TopicWorkflowDashboard.tsx | 627 ++++++++++++++++-- web/src/lib/stores/workflow-store.ts | 65 +- 8 files changed, 1732 insertions(+), 148 deletions(-) diff --git a/src/paperbot/api/routes/paperscool.py b/src/paperbot/api/routes/paperscool.py index 5d5ee44..e9d11dd 100644 --- a/src/paperbot/api/routes/paperscool.py +++ b/src/paperbot/api/routes/paperscool.py @@ -1,7 +1,12 @@ from __future__ import annotations import copy +import os +import re from typing import Any, Dict, List, Optional +from urllib.parse import urlparse + +import requests from fastapi import APIRouter, HTTPException from fastapi.responses import StreamingResponse @@ -16,12 +21,15 @@ apply_judge_scores_to_report, build_daily_paper_report, enrich_daily_paper_report, + ingest_daily_report_to_registry, normalize_llm_features, normalize_output_formats, + persist_judge_scores_to_registry, render_daily_paper_markdown, select_judge_candidates, ) from paperbot.application.workflows.paperscool_topic_search import PapersCoolTopicSearchWorkflow +from paperbot.utils.text_processing import extract_github_url router = APIRouter() @@ -52,7 +60,7 @@ class DailyPaperRequest(BaseModel): show_per_branch: int = Field(25, ge=1, le=200) min_score: float = Field(0.0, ge=0.0, description="Drop papers scoring below this threshold") title: str = "DailyPaper Digest" - top_n: int = Field(10, ge=1, le=50) + top_n: int = Field(10, ge=1, le=200) formats: List[str] = Field(default_factory=lambda: ["both"]) save: bool = False output_dir: str = "./reports/dailypaper" @@ -60,10 +68,11 @@ class DailyPaperRequest(BaseModel): llm_features: List[str] = Field(default_factory=lambda: ["summary"]) enable_judge: bool = False judge_runs: int = Field(1, ge=1, le=5) - judge_max_items_per_query: int = Field(5, ge=1, le=20) + judge_max_items_per_query: int = Field(5, ge=1, le=200) judge_token_budget: int = Field(0, ge=0, le=2_000_000) notify: bool = False notify_channels: List[str] = Field(default_factory=list) + notify_email_to: List[str] = Field(default_factory=list) class DailyPaperResponse(BaseModel): @@ -80,11 +89,25 @@ class PapersCoolAnalyzeRequest(BaseModel): 
run_trends: bool = False run_insight: bool = False judge_runs: int = Field(1, ge=1, le=5) - judge_max_items_per_query: int = Field(5, ge=1, le=20) + judge_max_items_per_query: int = Field(5, ge=1, le=200) judge_token_budget: int = Field(0, ge=0, le=2_000_000) trend_max_items_per_query: int = Field(3, ge=1, le=20) +class PapersCoolReposRequest(BaseModel): + report: Optional[Dict[str, Any]] = None + papers: List[Dict[str, Any]] = Field(default_factory=list) + max_items: int = Field(100, ge=1, le=1000) + include_github_api: bool = True + + +class PapersCoolReposResponse(BaseModel): + total_candidates: int + matched_repos: int + github_api_used: bool + repos: List[Dict[str, Any]] + + @router.post("/research/paperscool/search", response_model=PapersCoolSearchResponse) def topic_search(req: PapersCoolSearchRequest): cleaned_queries = [q.strip() for q in req.queries if (q or "").strip()] @@ -106,37 +129,373 @@ def topic_search(req: PapersCoolSearchRequest): return PapersCoolSearchResponse(**result) -@router.post("/research/paperscool/daily", response_model=DailyPaperResponse) -def generate_daily_report(req: DailyPaperRequest): +async def _dailypaper_stream(req: DailyPaperRequest): + """SSE generator for the full DailyPaper pipeline.""" + cleaned_queries = [q.strip() for q in req.queries if (q or "").strip()] + + # Phase 1 — Search + yield StreamEvent(type="progress", data={"phase": "search", "message": "Searching papers..."}) + workflow = PapersCoolTopicSearchWorkflow() + effective_top_k = max(int(req.top_k_per_query), int(req.top_n), 1) + search_result = workflow.run( + queries=cleaned_queries, + sources=req.sources, + branches=req.branches, + top_k_per_query=effective_top_k, + show_per_branch=req.show_per_branch, + min_score=req.min_score, + ) + summary = search_result.get("summary") or {} + yield StreamEvent( + type="search_done", + data={ + "items_count": len(search_result.get("items") or []), + "queries_count": len(search_result.get("queries") or []), + "unique_items": int(summary.get("unique_items") or 0), + }, + ) + + # Phase 2 — Build Report + yield StreamEvent(type="progress", data={"phase": "build", "message": "Building report..."}) + report = build_daily_paper_report(search_result=search_result, title=req.title, top_n=req.top_n) + yield StreamEvent( + type="report_built", + data={ + "queries_count": len(report.get("queries") or []), + "global_top_count": len(report.get("global_top") or []), + "report": report, + }, + ) + + # Phase 3 — LLM Enrichment + if req.enable_llm_analysis: + features = normalize_llm_features(req.llm_features) + if features: + llm_service = get_llm_service() + llm_block: Dict[str, Any] = { + "enabled": True, + "features": features, + "query_trends": [], + "daily_insight": "", + } + + summary_done = 0 + summary_total = 0 + if "summary" in features or "relevance" in features: + for query in report.get("queries") or []: + summary_total += len((query.get("top_items") or [])[:3]) + + yield StreamEvent( + type="progress", + data={"phase": "llm", "message": "Starting LLM enrichment...", "total": summary_total}, + ) + + for query in report.get("queries") or []: + query_name = query.get("normalized_query") or query.get("raw_query") or "" + top_items = (query.get("top_items") or [])[:3] + + if "summary" in features: + for item in top_items: + item["ai_summary"] = llm_service.summarize_paper( + title=item.get("title") or "", + abstract=item.get("snippet") or item.get("abstract") or "", + ) + summary_done += 1 + yield StreamEvent( + type="llm_summary", + data={ + "title": 
item.get("title") or "Untitled", + "query": query_name, + "ai_summary": item["ai_summary"], + "done": summary_done, + "total": summary_total, + }, + ) + + if "relevance" in features: + for item in top_items: + item["relevance"] = llm_service.assess_relevance(paper=item, query=query_name) + if "summary" not in features: + summary_done += 1 + + if "trends" in features and top_items: + trend_text = llm_service.analyze_trends(topic=query_name, papers=top_items) + llm_block["query_trends"].append({"query": query_name, "analysis": trend_text}) + yield StreamEvent( + type="trend", + data={ + "query": query_name, + "analysis": trend_text, + "done": len(llm_block["query_trends"]), + "total": len(report.get("queries") or []), + }, + ) + + if "insight" in features: + yield StreamEvent(type="progress", data={"phase": "insight", "message": "Generating daily insight..."}) + llm_block["daily_insight"] = llm_service.generate_daily_insight(report) + yield StreamEvent(type="insight", data={"analysis": llm_block["daily_insight"]}) + + report["llm_analysis"] = llm_block + yield StreamEvent( + type="llm_done", + data={ + "summaries_count": summary_done, + "trends_count": len(llm_block["query_trends"]), + }, + ) + + # Phase 4 — Judge + if req.enable_judge: + llm_service_j = get_llm_service() + judge = PaperJudge(llm_service=llm_service_j) + selection = select_judge_candidates( + report, + max_items_per_query=req.judge_max_items_per_query, + n_runs=req.judge_runs, + token_budget=req.judge_token_budget, + ) + selected = list(selection.get("selected") or []) + recommendation_count: Dict[str, int] = { + "must_read": 0, + "worth_reading": 0, + "skim": 0, + "skip": 0, + } + + yield StreamEvent( + type="progress", + data={ + "phase": "judge", + "message": "Starting judge scoring", + "total": len(selected), + "budget": selection.get("budget") or {}, + }, + ) + + for idx, row in enumerate(selected, start=1): + query_index = int(row.get("query_index") or 0) + item_index = int(row.get("item_index") or 0) + + queries = list(report.get("queries") or []) + if query_index >= len(queries): + continue + + query = queries[query_index] + query_name = query.get("normalized_query") or query.get("raw_query") or "" + top_items = list(query.get("top_items") or []) + if item_index >= len(top_items): + continue + + item = top_items[item_index] + if req.judge_runs > 1: + judgment = judge.judge_with_calibration( + paper=item, + query=query_name, + n_runs=max(1, int(req.judge_runs)), + ) + else: + judgment = judge.judge_single(paper=item, query=query_name) + + j_payload = judgment.to_dict() + item["judge"] = j_payload + rec = j_payload.get("recommendation") + if rec in recommendation_count: + recommendation_count[rec] += 1 + + yield StreamEvent( + type="judge", + data={ + "query": query_name, + "title": item.get("title") or "Untitled", + "judge": j_payload, + "done": idx, + "total": len(selected), + }, + ) + + for query in report.get("queries") or []: + top_items = list(query.get("top_items") or []) + if not top_items: + continue + capped_count = min(len(top_items), max(1, int(req.judge_max_items_per_query))) + capped = top_items[:capped_count] + capped.sort( + key=lambda it: float((it.get("judge") or {}).get("overall") or -1), reverse=True + ) + query["top_items"] = capped + top_items[capped_count:] + + report["judge"] = { + "enabled": True, + "max_items_per_query": int(req.judge_max_items_per_query), + "n_runs": int(max(1, int(req.judge_runs))), + "recommendation_count": recommendation_count, + "budget": selection.get("budget") or {}, + 
} + yield StreamEvent(type="judge_done", data=report["judge"]) + + # Phase 4b — Filter: remove papers below "worth_reading" + KEEP_RECOMMENDATIONS = {"must_read", "worth_reading"} + yield StreamEvent( + type="progress", + data={"phase": "filter", "message": "Filtering papers by judge recommendation..."}, + ) + filter_log: List[Dict[str, Any]] = [] + total_before = 0 + total_after = 0 + for query in report.get("queries") or []: + query_name = query.get("normalized_query") or query.get("raw_query") or "" + items_before = list(query.get("top_items") or []) + total_before += len(items_before) + kept: List[Dict[str, Any]] = [] + removed: List[Dict[str, Any]] = [] + for item in items_before: + j = item.get("judge") + if isinstance(j, dict): + rec = j.get("recommendation", "") + if rec in KEEP_RECOMMENDATIONS: + kept.append(item) + else: + removed.append(item) + filter_log.append({ + "query": query_name, + "title": item.get("title") or "Untitled", + "recommendation": rec, + "overall": j.get("overall"), + "action": "removed", + }) + else: + # No judge score — keep by default (unjudged papers) + kept.append(item) + total_after += len(kept) + query["top_items"] = kept + + # Also filter global_top + global_before = list(report.get("global_top") or []) + global_kept = [] + for item in global_before: + j = item.get("judge") + if isinstance(j, dict): + rec = j.get("recommendation", "") + if rec in KEEP_RECOMMENDATIONS: + global_kept.append(item) + else: + global_kept.append(item) + report["global_top"] = global_kept + + report["filter"] = { + "enabled": True, + "keep_recommendations": list(KEEP_RECOMMENDATIONS), + "total_before": total_before, + "total_after": total_after, + "removed_count": total_before - total_after, + "log": filter_log, + } + yield StreamEvent( + type="filter_done", + data={ + "total_before": total_before, + "total_after": total_after, + "removed_count": total_before - total_after, + "log": filter_log, + }, + ) + + # Phase 5 — Persist + Notify + yield StreamEvent(type="progress", data={"phase": "save", "message": "Saving to registry..."}) + try: + ingest_summary = ingest_daily_report_to_registry(report) + report["registry_ingest"] = ingest_summary + except Exception as exc: + report["registry_ingest"] = {"error": str(exc)} + + if req.enable_judge: + try: + report["judge_registry_ingest"] = persist_judge_scores_to_registry(report) + except Exception as exc: + report["judge_registry_ingest"] = {"error": str(exc)} + + markdown = render_daily_paper_markdown(report) + + markdown_path = None + json_path = None + notify_result: Optional[Dict[str, Any]] = None + if req.save: + reporter = DailyPaperReporter(output_dir=req.output_dir) + artifacts = reporter.write( + report=report, + markdown=markdown, + formats=normalize_output_formats(req.formats), + slug=req.title, + ) + markdown_path = artifacts.markdown_path + json_path = artifacts.json_path + + if req.notify: + yield StreamEvent(type="progress", data={"phase": "notify", "message": "Sending notifications..."}) + notify_service = DailyPushService.from_env() + notify_result = notify_service.push_dailypaper( + report=report, + markdown=markdown, + markdown_path=markdown_path, + json_path=json_path, + channels_override=req.notify_channels or None, + email_to_override=req.notify_email_to or None, + ) + + yield StreamEvent( + type="result", + data={ + "report": report, + "markdown": markdown, + "markdown_path": markdown_path, + "json_path": json_path, + "notify_result": notify_result, + }, + ) + + +@router.post("/research/paperscool/daily") 
+async def generate_daily_report(req: DailyPaperRequest): cleaned_queries = [q.strip() for q in req.queries if (q or "").strip()] if not cleaned_queries: raise HTTPException(status_code=400, detail="queries is required") + # Fast sync path when no LLM/Judge — avoids SSE overhead + if not req.enable_llm_analysis and not req.enable_judge: + return _sync_daily_report(req, cleaned_queries) + + # SSE streaming path for long-running operations + return StreamingResponse( + wrap_generator(_dailypaper_stream(req)), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}, + ) + + +def _sync_daily_report(req: DailyPaperRequest, cleaned_queries: List[str]): + """Original synchronous path for fast requests (no LLM/Judge).""" workflow = PapersCoolTopicSearchWorkflow() + effective_top_k = max(int(req.top_k_per_query), int(req.top_n), 1) try: search_result = workflow.run( queries=cleaned_queries, sources=req.sources, branches=req.branches, - top_k_per_query=req.top_k_per_query, + top_k_per_query=effective_top_k, show_per_branch=req.show_per_branch, min_score=req.min_score, ) except Exception as exc: raise HTTPException(status_code=502, detail=f"daily search failed: {exc}") from exc report = build_daily_paper_report(search_result=search_result, title=req.title, top_n=req.top_n) - if req.enable_llm_analysis: - report = enrich_daily_paper_report( - report, - llm_features=normalize_llm_features(req.llm_features), - ) - if req.enable_judge: - report = apply_judge_scores_to_report( - report, - max_items_per_query=req.judge_max_items_per_query, - n_runs=req.judge_runs, - judge_token_budget=req.judge_token_budget, - ) + + try: + ingest_summary = ingest_daily_report_to_registry(report) + report["registry_ingest"] = ingest_summary + except Exception as exc: + report["registry_ingest"] = {"error": str(exc)} + markdown = render_daily_paper_markdown(report) markdown_path = None @@ -161,6 +520,7 @@ def generate_daily_report(req: DailyPaperRequest): markdown_path=markdown_path, json_path=json_path, channels_override=req.notify_channels or None, + email_to_override=req.notify_email_to or None, ) return DailyPaperResponse( @@ -172,6 +532,190 @@ def generate_daily_report(req: DailyPaperRequest): ) +_GITHUB_REPO_RE = re.compile(r"https?://github\.com/([\w.-]+)/([\w.-]+)", re.IGNORECASE) + + +def _normalize_github_repo_url(raw_url: str | None) -> Optional[str]: + if not raw_url: + return None + candidate = (raw_url or "").strip() + if not candidate: + return None + if "github.com" not in candidate.lower(): + return None + if not candidate.startswith("http"): + candidate = f"https://{candidate}" + + parsed = urlparse(candidate) + if "github.com" not in (parsed.netloc or "").lower(): + return None + + match = _GITHUB_REPO_RE.search(f"{parsed.scheme}://{parsed.netloc}{parsed.path}") + if not match: + return None + + owner, repo = match.group(1), match.group(2) + repo = repo.removesuffix(".git") + return f"https://github.com/{owner}/{repo}" + + +def _extract_repo_url_from_paper(paper: Dict[str, Any]) -> Optional[str]: + candidates: List[str] = [] + for key in ("github_url", "external_url", "url", "pdf_url"): + value = paper.get(key) + if isinstance(value, str) and value: + candidates.append(value) + + for alt in paper.get("alternative_urls") or []: + if isinstance(alt, str) and alt: + candidates.append(alt) + + for candidate in candidates: + normalized = _normalize_github_repo_url(candidate) + if normalized: + return normalized + + text_blob_parts = [ + str(paper.get("title") or 
""), + str(paper.get("snippet") or paper.get("abstract") or ""), + " ".join(str(k) for k in (paper.get("keywords") or [])), + ] + extracted = extract_github_url("\n".join(text_blob_parts)) + return _normalize_github_repo_url(extracted) + + +def _flatten_report_papers(report: Dict[str, Any]) -> List[Dict[str, Any]]: + rows: List[Dict[str, Any]] = [] + for query in report.get("queries") or []: + query_name = query.get("normalized_query") or query.get("raw_query") or "" + for item in query.get("top_items") or []: + row = dict(item) + row.setdefault("_query", query_name) + rows.append(row) + + for item in report.get("global_top") or []: + row = dict(item) + if "_query" not in row: + matched = row.get("matched_queries") or [] + row["_query"] = matched[0] if matched else "" + rows.append(row) + + deduped: List[Dict[str, Any]] = [] + seen: set[str] = set() + for item in rows: + key = f"{item.get('url') or ''}|{item.get('title') or ''}" + if key in seen: + continue + seen.add(key) + deduped.append(item) + return deduped + + +def _fetch_github_repo_metadata(repo_url: str, token: Optional[str]) -> Dict[str, Any]: + normalized = _normalize_github_repo_url(repo_url) + if not normalized: + return {"ok": False, "error": "invalid_repo_url"} + + match = _GITHUB_REPO_RE.search(normalized) + if not match: + return {"ok": False, "error": "invalid_repo_url"} + + owner, repo = match.group(1), match.group(2) + api_url = f"https://api.github.com/repos/{owner}/{repo}" + + headers = {"Accept": "application/vnd.github+json", "User-Agent": "PaperBot/1.0"} + if token: + headers["Authorization"] = f"Bearer {token}" + + try: + resp = requests.get(api_url, headers=headers, timeout=15) + if resp.status_code != 200: + return { + "ok": False, + "status": resp.status_code, + "error": "github_api_error", + "repo_url": normalized, + } + + payload = resp.json() + return { + "ok": True, + "status": resp.status_code, + "repo_url": normalized, + "full_name": payload.get("full_name") or f"{owner}/{repo}", + "description": payload.get("description") or "", + "stars": int(payload.get("stargazers_count") or 0), + "forks": int(payload.get("forks_count") or 0), + "open_issues": int(payload.get("open_issues_count") or 0), + "watchers": int(payload.get("subscribers_count") or payload.get("watchers_count") or 0), + "language": payload.get("language") or "", + "license": (payload.get("license") or {}).get("spdx_id") or "", + "updated_at": payload.get("updated_at"), + "pushed_at": payload.get("pushed_at"), + "archived": bool(payload.get("archived")), + "topics": payload.get("topics") or [], + "html_url": payload.get("html_url") or normalized, + } + except Exception as exc: + return { + "ok": False, + "error": f"github_api_exception: {exc}", + "repo_url": normalized, + } + + +@router.post("/research/paperscool/repos", response_model=PapersCoolReposResponse) +def enrich_papers_with_repo_data(req: PapersCoolReposRequest): + papers: List[Dict[str, Any]] = [] + if isinstance(req.report, dict): + papers.extend(_flatten_report_papers(req.report)) + papers.extend(list(req.papers or [])) + + if not papers: + raise HTTPException(status_code=400, detail="report or papers is required") + + deduped: List[Dict[str, Any]] = [] + seen: set[str] = set() + for item in papers: + key = f"{item.get('url') or ''}|{item.get('title') or ''}" + if key in seen: + continue + seen.add(key) + deduped.append(item) + + selected = deduped[: max(1, int(req.max_items))] + token = os.getenv("GITHUB_TOKEN") or os.getenv("GH_TOKEN") + + repos: List[Dict[str, Any]] = [] + for 
item in selected: + repo_url = _extract_repo_url_from_paper(item) + if not repo_url: + continue + + row: Dict[str, Any] = { + "title": item.get("title") or "Untitled", + "query": item.get("_query") or ", ".join(item.get("matched_queries") or []), + "paper_url": item.get("url") or item.get("external_url") or "", + "repo_url": repo_url, + } + if req.include_github_api: + row["github"] = _fetch_github_repo_metadata(repo_url=repo_url, token=token) + repos.append(row) + + if req.include_github_api: + repos.sort( + key=lambda row: int(((row.get("github") or {}).get("stars") or -1)), + reverse=True, + ) + + return PapersCoolReposResponse( + total_candidates=len(selected), + matched_repos=len(repos), + github_api_used=bool(req.include_github_api), + repos=repos, + ) + + async def _paperscool_analyze_stream(req: PapersCoolAnalyzeRequest): report = copy.deepcopy(req.report) llm_service = get_llm_service() @@ -323,6 +867,10 @@ async def _paperscool_analyze_stream(req: PapersCoolAnalyzeRequest): "recommendation_count": recommendation_count, "budget": selection.get("budget") or {}, } + try: + report["judge_registry_ingest"] = persist_judge_scores_to_registry(report) + except Exception as exc: + report["judge_registry_ingest"] = {"error": str(exc)} yield StreamEvent(type="judge_done", data=report["judge"]) markdown = render_daily_paper_markdown(report) diff --git a/src/paperbot/application/services/daily_push_service.py b/src/paperbot/application/services/daily_push_service.py index 6f2fd98..9d97e72 100644 --- a/src/paperbot/application/services/daily_push_service.py +++ b/src/paperbot/application/services/daily_push_service.py @@ -93,6 +93,7 @@ def push_dailypaper( markdown_path: Optional[str] = None, json_path: Optional[str] = None, channels_override: Optional[List[str]] = None, + email_to_override: Optional[List[str]] = None, ) -> Dict[str, Any]: channels = channels_override or self.config.channels channels = [c.strip().lower() for c in channels if c and c.strip()] @@ -102,6 +103,13 @@ def push_dailypaper( if not channels: return {"sent": False, "reason": "no channels configured", "channels": channels} + # Allow UI-provided email recipients to override env config + original_email_to = self.config.email_to + if email_to_override: + cleaned = [e.strip() for e in email_to_override if (e or "").strip()] + if cleaned: + self.config.email_to = cleaned + subject = self._build_subject(report) text = self._build_text( report, markdown=markdown, markdown_path=markdown_path, json_path=json_path @@ -109,21 +117,24 @@ def push_dailypaper( results: Dict[str, Any] = {"sent": False, "channels": channels, "results": {}} any_success = False - for channel in channels: - try: - if channel == "email": - self._send_email(subject=subject, body=text) - elif channel == "slack": - self._send_slack(subject=subject, body=text) - elif channel in {"dingtalk", "dingding"}: - self._send_dingtalk(subject=subject, body=text) - else: - raise ValueError(f"unsupported channel: {channel}") - results["results"][channel] = {"ok": True} - any_success = True - except Exception as exc: # pragma: no cover - runtime specific - logger.warning("Daily push failed channel=%s err=%s", channel, exc) - results["results"][channel] = {"ok": False, "error": str(exc)} + try: + for channel in channels: + try: + if channel == "email": + self._send_email(subject=subject, body=text) + elif channel == "slack": + self._send_slack(subject=subject, body=text) + elif channel in {"dingtalk", "dingding"}: + self._send_dingtalk(subject=subject, body=text) + else: + 
raise ValueError(f"unsupported channel: {channel}") + results["results"][channel] = {"ok": True} + any_success = True + except Exception as exc: # pragma: no cover - runtime specific + logger.warning("Daily push failed channel=%s err=%s", channel, exc) + results["results"][channel] = {"ok": False, "error": str(exc)} + finally: + self.config.email_to = original_email_to results["sent"] = any_success return results diff --git a/src/paperbot/core/abstractions/executable.py b/src/paperbot/core/abstractions/executable.py index 7ada4dd..d4beab9 100644 --- a/src/paperbot/core/abstractions/executable.py +++ b/src/paperbot/core/abstractions/executable.py @@ -110,22 +110,27 @@ def ensure_execution_result(raw: Union[ExecutionResult[TOutput], Dict[str, Any], return raw if isinstance(raw, dict): - # 兼容旧的 status 字段 - success = raw.get("success") - if success is None: - status = raw.get("status") - success = status == "success" if status is not None else True - error = raw.get("error") - data = raw.get("data") - metadata = raw.get("metadata", {}) - duration_ms = raw.get("duration_ms") - return ExecutionResult( - success=bool(success), - error=error, - data=data, - metadata=metadata if isinstance(metadata, dict) else {}, - duration_ms=duration_ms, - ) + # Only treat as a result dict if it has recognized result keys + result_keys = {"success", "status", "data", "error"} + if result_keys & raw.keys(): + # 兼容旧的 status 字段 + success = raw.get("success") + if success is None: + status = raw.get("status") + success = status == "success" if status is not None else True + error = raw.get("error") + data = raw.get("data") + metadata = raw.get("metadata", {}) + duration_ms = raw.get("duration_ms") + return ExecutionResult( + success=bool(success), + error=error, + data=data, + metadata=metadata if isinstance(metadata, dict) else {}, + duration_ms=duration_ms, + ) + # No recognized keys — treat the entire dict as data + return ExecutionResult.ok(raw) # type: ignore[arg-type] return ExecutionResult.ok(raw) # type: ignore[arg-type] diff --git a/src/paperbot/core/errors/errors.py b/src/paperbot/core/errors/errors.py index a9a78c0..b88a964 100644 --- a/src/paperbot/core/errors/errors.py +++ b/src/paperbot/core/errors/errors.py @@ -26,17 +26,20 @@ def __str__(self) -> str: return f"[{self.code}] {self.message}" +@dataclass class LLMError(PaperBotError): - code = "LLM_ERROR" + code: str = "LLM_ERROR" +@dataclass class APIError(PaperBotError): - code = "API_ERROR" + code: str = "API_ERROR" +@dataclass class ValidationError(PaperBotError): - code = "VALIDATION_ERROR" - severity = ErrorSeverity.WARNING + severity: ErrorSeverity = ErrorSeverity.WARNING + code: str = "VALIDATION_ERROR" T = TypeVar("T") diff --git a/tests/unit/test_paperscool_route.py b/tests/unit/test_paperscool_route.py index 79cd72a..c4f9fb8 100644 --- a/tests/unit/test_paperscool_route.py +++ b/tests/unit/test_paperscool_route.py @@ -4,8 +4,24 @@ from paperbot.api.routes import paperscool as paperscool_route +def _parse_sse_events(text: str): + """Parse SSE text into a list of event dicts.""" + import json + events = [] + for line in text.split("\n"): + if line.startswith("data: "): + payload = line[6:].strip() + if payload == "[DONE]": + continue + try: + events.append(json.loads(payload)) + except Exception: + pass + return events + + class _FakeWorkflow: - def run(self, *, queries, sources, branches, top_k_per_query, show_per_branch): + def run(self, *, queries, sources, branches, top_k_per_query, show_per_branch, min_score=0.0): return { "source": 
"papers.cool", "fetched_at": "2026-02-09T00:00:00+00:00", @@ -113,15 +129,20 @@ def test_paperscool_daily_route_success(monkeypatch, tmp_path): def test_paperscool_daily_route_with_llm_enrichment(monkeypatch): monkeypatch.setattr(paperscool_route, "PapersCoolTopicSearchWorkflow", _FakeWorkflow) - called = {"value": False} + class _FakeLLMService: + def summarize_paper(self, *, title, abstract): + return f"summary of {title}" - def _fake_enrich(report, *, llm_features, llm_service=None, max_items_per_query=3): - called["value"] = True - report = dict(report) - report["llm_analysis"] = {"enabled": True, "features": llm_features} - return report + def assess_relevance(self, *, paper, query): + return {"score": 4, "reason": "relevant"} - monkeypatch.setattr(paperscool_route, "enrich_daily_paper_report", _fake_enrich) + def analyze_trends(self, *, topic, papers): + return f"trend:{topic}:{len(papers)}" + + def generate_daily_insight(self, report): + return "daily insight" + + monkeypatch.setattr(paperscool_route, "get_llm_service", lambda: _FakeLLMService()) with TestClient(api_main.app) as client: resp = client.post( @@ -134,36 +155,44 @@ def _fake_enrich(report, *, llm_features, llm_service=None, max_items_per_query= ) assert resp.status_code == 200 - payload = resp.json() - assert called["value"] is True - assert payload["report"]["llm_analysis"]["enabled"] is True + # SSE stream response + events = _parse_sse_events(resp.text) + types = [e.get("type") for e in events] + assert "llm_done" in types + result_event = next(e for e in events if e.get("type") == "result") + assert result_event["data"]["report"]["llm_analysis"]["enabled"] is True def test_paperscool_daily_route_with_judge(monkeypatch): monkeypatch.setattr(paperscool_route, "PapersCoolTopicSearchWorkflow", _FakeWorkflow) - called = {"value": False} - - def _fake_judge( - report, - *, - llm_service=None, - max_items_per_query=5, - n_runs=1, - judge_token_budget=0, - ): - called["value"] = True - report = dict(report) - report["judge"] = { - "enabled": True, - "max_items_per_query": max_items_per_query, - "n_runs": n_runs, - "recommendation_count": {"must_read": 1, "worth_reading": 0, "skim": 0, "skip": 0}, - "budget": {"token_budget": judge_token_budget, "judged_items": 1}, - } - return report + class _FakeJudgment: + def to_dict(self): + return { + "relevance": {"score": 5, "rationale": ""}, + "novelty": {"score": 4, "rationale": ""}, + "rigor": {"score": 4, "rationale": ""}, + "impact": {"score": 4, "rationale": ""}, + "clarity": {"score": 4, "rationale": ""}, + "overall": 4.2, + "one_line_summary": "good", + "recommendation": "must_read", + "judge_model": "fake", + "judge_cost_tier": 1, + } + + class _FakeJudge: + def __init__(self, llm_service=None): + pass - monkeypatch.setattr(paperscool_route, "apply_judge_scores_to_report", _fake_judge) + def judge_single(self, *, paper, query): + return _FakeJudgment() + + def judge_with_calibration(self, *, paper, query, n_runs=1): + return _FakeJudgment() + + monkeypatch.setattr(paperscool_route, "get_llm_service", lambda: object()) + monkeypatch.setattr(paperscool_route, "PaperJudge", _FakeJudge) with TestClient(api_main.app) as client: resp = client.post( @@ -177,9 +206,12 @@ def _fake_judge( ) assert resp.status_code == 200 - payload = resp.json() - assert called["value"] is True - assert payload["report"]["judge"]["enabled"] is True + events = _parse_sse_events(resp.text) + types = [e.get("type") for e in events] + assert "judge" in types + assert "judge_done" in types + result_event = 
next(e for e in events if e.get("type") == "result") + assert result_event["data"]["report"]["judge"]["enabled"] is True def test_paperscool_analyze_route_stream(monkeypatch): @@ -256,3 +288,369 @@ def judge_with_calibration(self, *, paper, query, n_runs=1): assert '"type": "trend"' in text assert '"type": "judge"' in text assert "[DONE]" in text + + +def test_paperscool_repos_route_extracts_and_enriches(monkeypatch): + class _FakeResp: + status_code = 200 + + def json(self): + return { + "full_name": "owner/repo", + "stargazers_count": 42, + "forks_count": 7, + "open_issues_count": 1, + "watchers_count": 5, + "language": "Python", + "license": {"spdx_id": "MIT"}, + "updated_at": "2026-02-01T00:00:00Z", + "pushed_at": "2026-02-02T00:00:00Z", + "archived": False, + "topics": ["llm"], + "html_url": "https://github.com/owner/repo", + } + + monkeypatch.setattr(paperscool_route.requests, "get", lambda *args, **kwargs: _FakeResp()) + + with TestClient(api_main.app) as client: + resp = client.post( + "/api/research/paperscool/repos", + json={ + "papers": [ + { + "title": "Repo Paper", + "url": "https://papers.cool/arxiv/1234", + "external_url": "https://github.com/owner/repo", + } + ], + "include_github_api": True, + }, + ) + + assert resp.status_code == 200 + payload = resp.json() + assert payload["matched_repos"] == 1 + assert payload["repos"][0]["repo_url"] == "https://github.com/owner/repo" + assert payload["repos"][0]["github"]["stars"] == 42 + + +def test_paperscool_daily_route_persists_judge_scores(monkeypatch): + monkeypatch.setattr(paperscool_route, "PapersCoolTopicSearchWorkflow", _FakeWorkflow) + + class _FakeJudgment: + def to_dict(self): + return { + "relevance": {"score": 5, "rationale": ""}, + "novelty": {"score": 4, "rationale": ""}, + "rigor": {"score": 4, "rationale": ""}, + "impact": {"score": 4, "rationale": ""}, + "clarity": {"score": 4, "rationale": ""}, + "overall": 4.2, + "one_line_summary": "good", + "recommendation": "must_read", + "judge_model": "fake", + "judge_cost_tier": 1, + } + + class _FakeJudge: + def __init__(self, llm_service=None): + pass + + def judge_single(self, *, paper, query): + return _FakeJudgment() + + def judge_with_calibration(self, *, paper, query, n_runs=1): + return _FakeJudgment() + + monkeypatch.setattr(paperscool_route, "get_llm_service", lambda: object()) + monkeypatch.setattr(paperscool_route, "PaperJudge", _FakeJudge) + + with TestClient(api_main.app) as client: + resp = client.post( + "/api/research/paperscool/daily", + json={ + "queries": ["ICL压缩"], + "enable_judge": True, + }, + ) + + assert resp.status_code == 200 + events = _parse_sse_events(resp.text) + result_event = next(e for e in events if e.get("type") == "result") + report = result_event["data"]["report"] + # Judge registry ingest should have been attempted + assert "judge_registry_ingest" in report + + +class _FakeWorkflowMultiPaper: + """Workflow returning multiple papers for filter testing.""" + + def run(self, *, queries, sources, branches, top_k_per_query, show_per_branch, min_score=0.0): + return { + "source": "papers.cool", + "fetched_at": "2026-02-10T00:00:00+00:00", + "sources": sources, + "queries": [ + { + "raw_query": queries[0], + "normalized_query": "icl compression", + "tokens": ["icl", "compression"], + "total_hits": 3, + "items": [ + { + "paper_id": "p1", + "title": "GoodPaper", + "url": "https://papers.cool/venue/p1", + "score": 10.0, + "snippet": "excellent work", + "keywords": ["icl"], + "branches": branches, + "matched_queries": ["icl compression"], + }, + { 
+ "paper_id": "p2", + "title": "MediocreWork", + "url": "https://papers.cool/venue/p2", + "score": 5.0, + "snippet": "average", + "keywords": ["icl"], + "branches": branches, + "matched_queries": ["icl compression"], + }, + { + "paper_id": "p3", + "title": "WeakPaper", + "url": "https://papers.cool/venue/p3", + "score": 2.0, + "snippet": "not great", + "keywords": ["icl"], + "branches": branches, + "matched_queries": ["icl compression"], + }, + ], + } + ], + "items": [], + "summary": { + "unique_items": 3, + "total_query_hits": 3, + }, + } + + +def test_dailypaper_sse_filter_removes_low_papers(monkeypatch): + """End-to-end: judge scores papers, filter removes 'skip' and 'skim'.""" + monkeypatch.setattr(paperscool_route, "PapersCoolTopicSearchWorkflow", _FakeWorkflowMultiPaper) + + # Judge returns different recommendations per paper title + class _VaryingJudgment: + def __init__(self, title): + self._title = title + + def to_dict(self): + rec_map = { + "GoodPaper": ("must_read", 4.5), + "MediocreWork": ("skim", 2.9), + "WeakPaper": ("skip", 1.8), + } + rec, overall = rec_map.get(self._title, ("skip", 1.0)) + return { + "relevance": {"score": 4, "rationale": ""}, + "novelty": {"score": 3, "rationale": ""}, + "rigor": {"score": 3, "rationale": ""}, + "impact": {"score": 3, "rationale": ""}, + "clarity": {"score": 3, "rationale": ""}, + "overall": overall, + "one_line_summary": f"summary of {self._title}", + "recommendation": rec, + "judge_model": "fake", + "judge_cost_tier": 1, + } + + class _FakeJudge: + def __init__(self, llm_service=None): + pass + + def judge_single(self, *, paper, query): + return _VaryingJudgment(paper.get("title", "")) + + def judge_with_calibration(self, *, paper, query, n_runs=1): + return _VaryingJudgment(paper.get("title", "")) + + monkeypatch.setattr(paperscool_route, "get_llm_service", lambda: object()) + monkeypatch.setattr(paperscool_route, "PaperJudge", _FakeJudge) + + with TestClient(api_main.app) as client: + resp = client.post( + "/api/research/paperscool/daily", + json={ + "queries": ["ICL压缩"], + "enable_judge": True, + "judge_max_items_per_query": 10, + }, + ) + + assert resp.status_code == 200 + events = _parse_sse_events(resp.text) + types = [e.get("type") for e in events] + + # All expected phases present + assert "judge" in types + assert "judge_done" in types + assert "filter_done" in types + assert "result" in types + + # Check filter_done event + filter_event = next(e for e in events if e.get("type") == "filter_done") + assert filter_event["data"]["total_before"] == 3 + assert filter_event["data"]["total_after"] == 1 # only GoodPaper kept + assert filter_event["data"]["removed_count"] == 2 + + # Check filter log has details for removed papers + filter_log = filter_event["data"]["log"] + removed_titles = {entry["title"] for entry in filter_log} + assert "MediocreWork" in removed_titles + assert "WeakPaper" in removed_titles + assert "GoodPaper" not in removed_titles + + # Check final result only has the kept paper + result_event = next(e for e in events if e.get("type") == "result") + final_report = result_event["data"]["report"] + final_items = final_report["queries"][0]["top_items"] + assert len(final_items) == 1 + assert final_items[0]["title"] == "GoodPaper" + + # Judge log events should have all 3 papers (complete log) + judge_events = [e for e in events if e.get("type") == "judge"] + assert len(judge_events) == 3 + judge_titles = {e["data"]["title"] for e in judge_events} + assert judge_titles == {"GoodPaper", "MediocreWork", "WeakPaper"} + + 
+def test_dailypaper_sse_full_pipeline_llm_judge_filter(monkeypatch): + """End-to-end: LLM enrichment + Judge + Filter in one SSE stream.""" + monkeypatch.setattr(paperscool_route, "PapersCoolTopicSearchWorkflow", _FakeWorkflowMultiPaper) + + class _FakeLLMService: + def summarize_paper(self, *, title, abstract): + return f"summary of {title}" + + def assess_relevance(self, *, paper, query): + return {"score": 4, "reason": "relevant"} + + def analyze_trends(self, *, topic, papers): + return f"trend:{topic}:{len(papers)}" + + def generate_daily_insight(self, report): + return "daily insight" + + def complete(self, **kwargs): + return "{}" + + def describe_task_provider(self, task_type): + return {"model_name": "fake", "cost_tier": 1} + + class _VaryingJudgment: + def __init__(self, title): + self._title = title + + def to_dict(self): + rec_map = { + "GoodPaper": ("must_read", 4.5), + "MediocreWork": ("worth_reading", 3.7), + "WeakPaper": ("skip", 1.8), + } + rec, overall = rec_map.get(self._title, ("skip", 1.0)) + return { + "relevance": {"score": 4, "rationale": ""}, + "novelty": {"score": 3, "rationale": ""}, + "rigor": {"score": 3, "rationale": ""}, + "impact": {"score": 3, "rationale": ""}, + "clarity": {"score": 3, "rationale": ""}, + "overall": overall, + "one_line_summary": f"summary of {self._title}", + "recommendation": rec, + "judge_model": "fake", + "judge_cost_tier": 1, + } + + class _FakeJudge: + def __init__(self, llm_service=None): + pass + + def judge_single(self, *, paper, query): + return _VaryingJudgment(paper.get("title", "")) + + def judge_with_calibration(self, *, paper, query, n_runs=1): + return _VaryingJudgment(paper.get("title", "")) + + monkeypatch.setattr(paperscool_route, "get_llm_service", lambda: _FakeLLMService()) + monkeypatch.setattr(paperscool_route, "PaperJudge", _FakeJudge) + + with TestClient(api_main.app) as client: + resp = client.post( + "/api/research/paperscool/daily", + json={ + "queries": ["ICL压缩"], + "enable_llm_analysis": True, + "llm_features": ["summary", "trends"], + "enable_judge": True, + "judge_max_items_per_query": 10, + }, + ) + + assert resp.status_code == 200 + events = _parse_sse_events(resp.text) + types = [e.get("type") for e in events] + + # Full pipeline phases + assert "search_done" in types + assert "report_built" in types + assert "llm_summary" in types + assert "trend" in types + assert "llm_done" in types + assert "judge" in types + assert "judge_done" in types + assert "filter_done" in types + assert "result" in types + + # Filter keeps must_read + worth_reading, removes skip + filter_event = next(e for e in events if e.get("type") == "filter_done") + assert filter_event["data"]["total_after"] == 2 # GoodPaper + MediocreWork + assert filter_event["data"]["removed_count"] == 1 # WeakPaper + + # Final report has 2 papers + result_event = next(e for e in events if e.get("type") == "result") + final_items = result_event["data"]["report"]["queries"][0]["top_items"] + assert len(final_items) == 2 + final_titles = {item["title"] for item in final_items} + assert final_titles == {"GoodPaper", "MediocreWork"} + + # LLM analysis present + assert result_event["data"]["report"]["llm_analysis"]["enabled"] is True + + # Filter metadata in report + assert result_event["data"]["report"]["filter"]["enabled"] is True + + +def test_dailypaper_sync_path_no_llm_no_judge(monkeypatch): + """When no LLM/Judge, endpoint returns sync JSON (not SSE).""" + monkeypatch.setattr(paperscool_route, "PapersCoolTopicSearchWorkflow", _FakeWorkflow) + + with 
TestClient(api_main.app) as client: + resp = client.post( + "/api/research/paperscool/daily", + json={ + "queries": ["ICL压缩"], + "sources": ["papers_cool"], + "branches": ["arxiv", "venue"], + }, + ) + + assert resp.status_code == 200 + # Should be JSON, not SSE + payload = resp.json() + assert "report" in payload + assert payload["report"]["stats"]["unique_items"] == 1 + # No filter block in sync path + assert "filter" not in payload["report"] diff --git a/web/src/app/api/research/paperscool/daily/route.ts b/web/src/app/api/research/paperscool/daily/route.ts index a59a5f4..f38c39b 100644 --- a/web/src/app/api/research/paperscool/daily/route.ts +++ b/web/src/app/api/research/paperscool/daily/route.ts @@ -1,7 +1,50 @@ export const runtime = "nodejs" -import { apiBaseUrl, proxyJson } from "../../_base" +import { apiBaseUrl } from "../../_base" export async function POST(req: Request) { - return proxyJson(req, `${apiBaseUrl()}/api/research/paperscool/daily`, "POST") + const body = await req.text() + const contentType = req.headers.get("content-type") || "application/json" + + let upstream: Response + try { + upstream = await fetch(`${apiBaseUrl()}/api/research/paperscool/daily`, { + method: "POST", + headers: { + "Content-Type": contentType, + Accept: "text/event-stream, application/json", + }, + body, + }) + } catch (error) { + const detail = error instanceof Error ? error.message : String(error) + return Response.json( + { detail: "Upstream API unreachable", error: detail }, + { status: 502 }, + ) + } + + const upstreamContentType = upstream.headers.get("content-type") || "" + + // SSE stream path — pipe through without buffering + if (upstreamContentType.includes("text/event-stream")) { + return new Response(upstream.body, { + status: upstream.status, + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }, + }) + } + + // JSON fallback (fast path when no LLM/Judge) + const text = await upstream.text() + return new Response(text, { + status: upstream.status, + headers: { + "Content-Type": upstreamContentType || "application/json", + "Cache-Control": "no-cache", + }, + }) } diff --git a/web/src/components/research/TopicWorkflowDashboard.tsx b/web/src/components/research/TopicWorkflowDashboard.tsx index 7ed5297..043ca2f 100644 --- a/web/src/components/research/TopicWorkflowDashboard.tsx +++ b/web/src/components/research/TopicWorkflowDashboard.tsx @@ -1,6 +1,6 @@ "use client" -import { useMemo, useState } from "react" +import { useCallback, useMemo, useRef, useState } from "react" import Markdown from "react-markdown" import remarkGfm from "remark-gfm" import { @@ -9,6 +9,7 @@ import { ChevronRightIcon, FilterIcon, Loader2Icon, + MailIcon, PlusIcon, PlayIcon, SettingsIcon, @@ -79,6 +80,20 @@ type SearchItem = { judge?: JudgeResult } +type RepoRow = { + title: string + query?: string + paper_url?: string + repo_url: string + github?: { + ok?: boolean + stars?: number + language?: string + updated_at?: string + error?: string + } +} + type StepStatus = "pending" | "running" | "done" | "error" | "skipped" /* ── Helpers ──────────────────────────────────────────── */ @@ -205,6 +220,96 @@ function buildDagStatuses(args: { return statuses } +/* ── Stream Progress ─────────────────────────────────── */ + +type StreamPhase = "idle" | "search" | "build" | "llm" | "judge" | "filter" | "save" | "notify" | "done" | "error" + +const PHASE_LABELS: Record = { + idle: "Idle", + search: "Searching papers", + build: "Building report", + llm: "LLM 
enrichment", + judge: "Judge scoring", + filter: "Filtering papers", + save: "Saving", + notify: "Sending notifications", + done: "Done", + error: "Error", +} + +const PHASE_ORDER: StreamPhase[] = ["search", "build", "llm", "judge", "filter", "save", "notify", "done"] + +function StreamProgressCard({ + streamPhase, + streamLog, + streamProgress, + startTime, +}: { + streamPhase: StreamPhase + streamLog: string[] + streamProgress: { done: number; total: number } + startTime: number | null +}) { + const elapsed = startTime ? Math.round((Date.now() - startTime) / 1000) : 0 + const currentIdx = PHASE_ORDER.indexOf(streamPhase) + const pct = streamProgress.total > 0 + ? Math.round((streamProgress.done / streamProgress.total) * 100) + : currentIdx >= 0 + ? Math.round(((currentIdx + 0.5) / PHASE_ORDER.length) * 100) + : 0 + + return ( + + +
+
+ + {PHASE_LABELS[streamPhase] || streamPhase} +
+
+ {streamProgress.total > 0 && ( + {streamProgress.done}/{streamProgress.total} + )} + {elapsed > 0 && {elapsed}s} +
+
+ +
+ {PHASE_ORDER.slice(0, -1).map((p) => { + const idx = PHASE_ORDER.indexOf(p) + const status = idx < currentIdx ? "done" : idx === currentIdx ? "active" : "pending" + return ( +
+
+ + {PHASE_LABELS[p]} + +
+ ) + })} +
+ {streamLog.length > 0 && ( + +
+ {streamLog.slice(-20).map((line, idx) => ( +
{line}
+ ))} +
+
+ )} + + + ) +} + /* ── Paper Card ───────────────────────────────────────── */ function PaperCard({ item, query, onOpenDetail }: { item: SearchItem; query?: string; onOpenDetail: (item: SearchItem) => void }) { @@ -341,6 +446,7 @@ function ConfigSheetBody(props: { useVenue: boolean; setUseVenue: (v: boolean) => void usePapersCool: boolean; setUsePapersCool: (v: boolean) => void useArxivApi: boolean; setUseArxivApi: (v: boolean) => void + useHFDaily: boolean; setUseHFDaily: (v: boolean) => void enableLLM: boolean; setEnableLLM: (v: boolean) => void useSummary: boolean; setUseSummary: (v: boolean) => void useTrends: boolean; setUseTrends: (v: boolean) => void @@ -350,16 +456,19 @@ function ConfigSheetBody(props: { judgeRuns: number; setJudgeRuns: (v: number) => void judgeMaxItems: number; setJudgeMaxItems: (v: number) => void judgeTokenBudget: number; setJudgeTokenBudget: (v: number) => void + notifyEmail: string; setNotifyEmail: (v: string) => void + notifyEnabled: boolean; setNotifyEnabled: (v: boolean) => void }) { const { queryItems, setQueryItems, topK, setTopK, topN, setTopN, showPerBranch, setShowPerBranch, saveDaily, setSaveDaily, outputDir, setOutputDir, useArxiv, setUseArxiv, useVenue, setUseVenue, - usePapersCool, setUsePapersCool, useArxivApi, setUseArxivApi, enableLLM, setEnableLLM, + usePapersCool, setUsePapersCool, useArxivApi, setUseArxivApi, useHFDaily, setUseHFDaily, enableLLM, setEnableLLM, useSummary, setUseSummary, useTrends, setUseTrends, useInsight, setUseInsight, useRelevance, setUseRelevance, enableJudge, setEnableJudge, judgeRuns, setJudgeRuns, judgeMaxItems, setJudgeMaxItems, judgeTokenBudget, setJudgeTokenBudget, + notifyEmail, setNotifyEmail, notifyEnabled, setNotifyEnabled, } = props const updateQuery = (idx: number, value: string) => { @@ -404,6 +513,7 @@ function ConfigSheetBody(props: {
+
@@ -450,11 +560,37 @@ function ConfigSheetBody(props: { {enableJudge && (
setJudgeRuns(Number(e.target.value || 1))} className="h-8 text-sm" />
-
setJudgeMaxItems(Number(e.target.value || 5))} className="h-8 text-sm" />
+
setJudgeMaxItems(Number(e.target.value || 20))} className="h-8 text-sm" />
setJudgeTokenBudget(Number(e.target.value || 0))} className="h-8 text-sm" />
)} + + + +
+ + {notifyEnabled && ( +
+
+ + setNotifyEmail(e.target.value)} + placeholder="you@example.com" + className="h-8 text-sm" + /> +
+

+ Requires PAPERBOT_NOTIFY_SMTP_* env vars on the backend. The email address here overrides PAPERBOT_NOTIFY_EMAIL_TO. +

+
+ )} +
) } @@ -462,43 +598,74 @@ function ConfigSheetBody(props: { /* ── Main Dashboard ───────────────────────────────────── */ export default function TopicWorkflowDashboard() { - /* Config state (local) */ + /* Config state (local — queries only) */ const [queryItems, setQueryItems] = useState([...DEFAULT_QUERIES]) - const [topK, setTopK] = useState(5) - const [topN, setTopN] = useState(10) - const [showPerBranch, setShowPerBranch] = useState(25) - const [saveDaily, setSaveDaily] = useState(false) - const [outputDir, setOutputDir] = useState("./reports/dailypaper") - const [useArxiv, setUseArxiv] = useState(true) - const [useVenue, setUseVenue] = useState(true) - const [usePapersCool, setUsePapersCool] = useState(true) - const [useArxivApi, setUseArxivApi] = useState(false) - const persistedDailyResult = useWorkflowStore.getState().dailyResult - const [enableLLM, setEnableLLM] = useState( - () => Boolean(persistedDailyResult?.report?.llm_analysis?.enabled), - ) - const [useSummary, setUseSummary] = useState(true) - const [useTrends, setUseTrends] = useState(true) - const [useInsight, setUseInsight] = useState(true) - const [useRelevance, setUseRelevance] = useState(false) - const [enableJudge, setEnableJudge] = useState( - () => Boolean(persistedDailyResult?.report?.judge?.enabled), - ) - const [judgeRuns, setJudgeRuns] = useState(1) - const [judgeMaxItems, setJudgeMaxItems] = useState(5) - const [judgeTokenBudget, setJudgeTokenBudget] = useState(0) /* Persisted state (zustand) */ const store = useWorkflowStore() - const { searchResult, dailyResult, phase, analyzeLog } = store + const { searchResult, dailyResult, phase, analyzeLog, notifyEmail, notifyEnabled, config } = store + const uc = store.updateConfig + + /* Derived config accessors — read from persisted store */ + const topK = config.topK + const setTopK = (v: number) => uc({ topK: v }) + const topN = config.topN + const setTopN = (v: number) => uc({ topN: v }) + const showPerBranch = config.showPerBranch + const setShowPerBranch = (v: number) => uc({ showPerBranch: v }) + const saveDaily = config.saveDaily + const setSaveDaily = (v: boolean) => uc({ saveDaily: v }) + const outputDir = config.outputDir + const setOutputDir = (v: string) => uc({ outputDir: v }) + const useArxiv = config.useArxiv + const setUseArxiv = (v: boolean) => uc({ useArxiv: v }) + const useVenue = config.useVenue + const setUseVenue = (v: boolean) => uc({ useVenue: v }) + const usePapersCool = config.usePapersCool + const setUsePapersCool = (v: boolean) => uc({ usePapersCool: v }) + const useArxivApi = config.useArxivApi + const setUseArxivApi = (v: boolean) => uc({ useArxivApi: v }) + const useHFDaily = config.useHFDaily + const setUseHFDaily = (v: boolean) => uc({ useHFDaily: v }) + const enableLLM = config.enableLLM + const setEnableLLM = (v: boolean) => uc({ enableLLM: v }) + const useSummary = config.useSummary + const setUseSummary = (v: boolean) => uc({ useSummary: v }) + const useTrends = config.useTrends + const setUseTrends = (v: boolean) => uc({ useTrends: v }) + const useInsight = config.useInsight + const setUseInsight = (v: boolean) => uc({ useInsight: v }) + const useRelevance = config.useRelevance + const setUseRelevance = (v: boolean) => uc({ useRelevance: v }) + const enableJudge = config.enableJudge + const setEnableJudge = (v: boolean) => uc({ enableJudge: v }) + const judgeRuns = config.judgeRuns + const setJudgeRuns = (v: number) => uc({ judgeRuns: v }) + const judgeMaxItems = config.judgeMaxItems + const setJudgeMaxItems = (v: number) => 
uc({ judgeMaxItems: v }) + const judgeTokenBudget = config.judgeTokenBudget + const setJudgeTokenBudget = (v: number) => uc({ judgeTokenBudget: v }) /* Transient loading state (not persisted) */ const [loadingSearch, setLoadingSearch] = useState(false) const [loadingDaily, setLoadingDaily] = useState(false) const [loadingAnalyze, setLoadingAnalyze] = useState(false) const [analyzeProgress, setAnalyzeProgress] = useState({ done: 0, total: 0 }) + const [loadingRepos, setLoadingRepos] = useState(false) + const [repoRows, setRepoRows] = useState([]) + const [repoError, setRepoError] = useState(null) const [error, setError] = useState(null) + /* Stream progress state */ + const [streamPhase, setStreamPhase] = useState("idle") + const [streamLog, setStreamLog] = useState([]) + const [streamProgress, setStreamProgress] = useState({ done: 0, total: 0 }) + const streamStartRef = useRef(null) + + const addStreamLog = useCallback((line: string) => { + setStreamLog((prev) => [...prev.slice(-50), line]) + }, []) + /* UI state */ const [dagOpen, setDagOpen] = useState(false) const [selectedPaper, setSelectedPaper] = useState(null) @@ -506,7 +673,14 @@ export default function TopicWorkflowDashboard() { const queries = useMemo(() => queryItems.map((q) => q.trim()).filter(Boolean), [queryItems]) const branches = useMemo(() => [useArxiv ? "arxiv" : "", useVenue ? "venue" : ""].filter(Boolean), [useArxiv, useVenue]) - const sources = useMemo(() => [usePapersCool ? "papers_cool" : "", useArxivApi ? "arxiv_api" : ""].filter(Boolean), [usePapersCool, useArxivApi]) + const sources = useMemo( + () => [ + usePapersCool ? "papers_cool" : "", + useArxivApi ? "arxiv_api" : "", + useHFDaily ? "hf_daily" : "", + ].filter(Boolean), + [usePapersCool, useArxivApi, useHFDaily], + ) const llmFeatures = useMemo( () => [useSummary ? "summary" : "", useTrends ? "trends" : "", useInsight ? "insight" : "", useRelevance ? "relevance" : ""].filter(Boolean), [useInsight, useRelevance, useSummary, useTrends], @@ -542,6 +716,8 @@ export default function TopicWorkflowDashboard() { [phase, error, enableLLM, enableJudge, hasSearchData, hasReportData, hasLLMData, hasJudgeData, schedulerDone], ) + const paperDataSource = dailyResult?.report?.queries ? "dailypaper" : searchResult?.items ? 
"search" : null + const allPapers = useMemo(() => { const items: Array = [] if (dailyResult?.report?.queries) { @@ -603,6 +779,7 @@ export default function TopicWorkflowDashboard() { /* Actions */ async function runTopicSearch() { setLoadingSearch(true); setError(null); store.setPhase("searching") + store.setDailyResult(null); store.clearAnalyzeLog() try { const res = await fetch("/api/research/paperscool/search", { method: "POST", headers: { "Content-Type": "application/json" }, @@ -615,23 +792,259 @@ export default function TopicWorkflowDashboard() { } catch (err) { setError(String(err)); store.setPhase("error") } finally { setLoadingSearch(false) } } - async function runDailyPaper() { - setLoadingDaily(true); setError(null); store.setPhase("reporting") + async function runDailyPaperStream() { + setLoadingDaily(true); setError(null); setRepoRows([]); setRepoError(null) + store.setPhase("reporting"); store.clearAnalyzeLog() + setStreamPhase("search"); setStreamLog([]); setStreamProgress({ done: 0, total: 0 }) + streamStartRef.current = Date.now() + + const requestBody = { + queries, sources, branches, top_k_per_query: topK, show_per_branch: showPerBranch, top_n: topN, + title: "DailyPaper Digest", formats: ["both"], save: saveDaily, output_dir: outputDir, + enable_llm_analysis: enableLLM, llm_features: llmFeatures, + enable_judge: enableJudge, judge_runs: judgeRuns, + judge_max_items_per_query: judgeMaxItems, judge_token_budget: judgeTokenBudget, + notify: notifyEnabled, + notify_channels: notifyEnabled ? ["email"] : [], + notify_email_to: notifyEnabled && notifyEmail.trim() ? [notifyEmail.trim()] : [], + } + + let streamFailed = false try { const res = await fetch("/api/research/paperscool/daily", { - method: "POST", headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - queries, sources, branches, top_k_per_query: topK, show_per_branch: showPerBranch, top_n: topN, - title: "DailyPaper Digest", formats: ["both"], save: saveDaily, output_dir: outputDir, - enable_llm_analysis: enableLLM, llm_features: llmFeatures, - enable_judge: enableJudge, judge_runs: judgeRuns, - judge_max_items_per_query: judgeMaxItems, judge_token_budget: judgeTokenBudget, - }), + method: "POST", + headers: { "Content-Type": "application/json", Accept: "text/event-stream, application/json" }, + body: JSON.stringify(requestBody), }) if (!res.ok) throw new Error(await res.text()) - store.setDailyResult(await res.json()) - store.setPhase("reported") - } catch (err) { setError(String(err)); store.setPhase("error") } finally { setLoadingDaily(false) } + + const contentType = res.headers.get("content-type") || "" + + // JSON fallback (fast path — no LLM/Judge) + if (!contentType.includes("text/event-stream")) { + const data = await res.json() + store.setDailyResult(data) + store.setPhase("reported") + setStreamPhase("done") + return + } + + // SSE streaming path + if (!res.body) throw new Error("No response body for SSE stream") + + for await (const event of readSSE(res.body)) { + if (event.type === "progress") { + const d = (event.data || {}) as { phase?: string; message?: string; total?: number } + const p = (d.phase || "search") as StreamPhase + setStreamPhase(p) + addStreamLog(`[${p}] ${d.message || "running"}`) + if (d.total && d.total > 0) { + setStreamProgress({ done: 0, total: d.total }) + } + continue + } + + if (event.type === "search_done") { + const d = (event.data || {}) as { items_count?: number; unique_items?: number } + addStreamLog(`search done: ${d.unique_items || 0} unique papers`) + 
setStreamPhase("build") + continue + } + + if (event.type === "report_built") { + const d = (event.data || {}) as { report?: DailyResult["report"]; queries_count?: number; global_top_count?: number } + addStreamLog(`report built: ${d.queries_count || 0} queries, ${d.global_top_count || 0} global top`) + if (d.report) { + store.setDailyResult({ report: d.report, markdown: "" }) + } + continue + } + + if (event.type === "llm_summary") { + const d = (event.data || {}) as { title?: string; query?: string; ai_summary?: string; done?: number; total?: number } + setStreamProgress({ done: d.done || 0, total: d.total || 0 }) + addStreamLog(`summary ${d.done || 0}/${d.total || 0}: ${d.title || "paper"}`) + if (d.query && d.title && d.ai_summary) { + store.updateDailyResult((prev) => { + const nextQueries = (prev.report.queries || []).map((query) => { + const queryName = query.normalized_query || query.raw_query || "" + if (queryName !== d.query) return query + const nextItems = (query.top_items || []).map((item) => { + if (item.title === d.title) return { ...item, ai_summary: d.ai_summary } + return item + }) + return { ...query, top_items: nextItems } + }) + return { ...prev, report: { ...prev.report, queries: nextQueries } } + }) + } + continue + } + + if (event.type === "trend") { + const d = (event.data || {}) as { query?: string; analysis?: string; done?: number; total?: number } + addStreamLog(`trend ${d.done || 0}/${d.total || 0}: ${d.query || "query"}`) + if (d.query && typeof d.analysis === "string") { + store.updateDailyResult((prev) => { + const llmAnalysis = prev.report.llm_analysis || { enabled: true, features: [], daily_insight: "", query_trends: [] } + const features = new Set(llmAnalysis.features || []) + features.add("trends") + const trendList = [...(llmAnalysis.query_trends || [])] + const existingIndex = trendList.findIndex((item) => item.query === d.query) + if (existingIndex >= 0) { + trendList[existingIndex] = { query: d.query!, analysis: d.analysis! } + } else { + trendList.push({ query: d.query!, analysis: d.analysis! }) + } + return { + ...prev, + report: { + ...prev.report, + llm_analysis: { ...llmAnalysis, enabled: true, features: Array.from(features), query_trends: trendList }, + }, + } + }) + } + continue + } + + if (event.type === "insight") { + const d = (event.data || {}) as { analysis?: string } + addStreamLog("insight generated") + if (typeof d.analysis === "string") { + store.updateDailyResult((prev) => { + const llmAnalysis = prev.report.llm_analysis || { enabled: true, features: [], daily_insight: "", query_trends: [] } + const features = new Set(llmAnalysis.features || []) + features.add("insight") + return { + ...prev, + report: { + ...prev.report, + llm_analysis: { ...llmAnalysis, enabled: true, features: Array.from(features), daily_insight: d.analysis! }, + }, + } + }) + } + continue + } + + if (event.type === "llm_done") { + const d = (event.data || {}) as { summaries_count?: number; trends_count?: number } + addStreamLog(`LLM done: ${d.summaries_count || 0} summaries, ${d.trends_count || 0} trends`) + setStreamPhase("judge") + continue + } + + if (event.type === "judge") { + const d = (event.data || {}) as { query?: string; title?: string; judge?: SearchItem["judge"]; done?: number; total?: number } + setStreamProgress({ done: d.done || 0, total: d.total || 0 }) + setStreamPhase("judge") + const rec = d.judge?.recommendation || "?" + const overall = d.judge?.overall != null ? Number(d.judge.overall).toFixed(2) : "?" 
+ addStreamLog(`judge ${d.done || 0}/${d.total || 0}: [${rec} ${overall}] ${d.title || "paper"} (${d.query || ""})`) + if (d.query && d.title && d.judge) { + store.updateDailyResult((prev) => { + const sourceQueries = prev.report.queries || [] + let matched = false + const nextQueries = sourceQueries.map((query) => { + const queryName = query.normalized_query || query.raw_query || "" + if (queryName !== d.query) return query + const nextItems = (query.top_items || []).map((item) => { + if (item.title === d.title) { matched = true; return { ...item, judge: d.judge } } + return item + }) + return { ...query, top_items: nextItems } + }) + if (!matched) { + const fallbackQueries = nextQueries.map((query) => { + if (matched) return query + const nextItems = (query.top_items || []).map((item) => { + if (!matched && item.title === d.title) { matched = true; return { ...item, judge: d.judge } } + return item + }) + return { ...query, top_items: nextItems } + }) + return { ...prev, report: { ...prev.report, queries: fallbackQueries } } + } + return { ...prev, report: { ...prev.report, queries: nextQueries } } + }) + } + continue + } + + if (event.type === "judge_done") { + const d = (event.data || {}) as DailyResult["report"]["judge"] + store.updateDailyResult((prev) => ({ + ...prev, + report: { ...prev.report, judge: d || prev.report.judge }, + })) + addStreamLog("judge scoring complete") + continue + } + + if (event.type === "filter_done") { + const d = (event.data || {}) as { + total_before?: number + total_after?: number + removed_count?: number + log?: Array<{ query?: string; title?: string; recommendation?: string; overall?: number; action?: string }> + } + setStreamPhase("filter") + addStreamLog(`filter: ${d.total_before || 0} papers -> ${d.total_after || 0} kept, ${d.removed_count || 0} removed`) + if (d.log) { + for (const entry of d.log) { + addStreamLog(` removed [${entry.recommendation || "?"}] ${entry.title || "?"} (${entry.query || ""})`) + } + } + // Update the store with filtered report — the next "result" event will have the final state + // but we can also re-fetch queries from the filter event if needed + continue + } + + if (event.type === "result") { + const d = (event.data || {}) as { + report?: DailyResult["report"] + markdown?: string + markdown_path?: string | null + json_path?: string | null + notify_result?: Record | null + } + if (d.report) { + store.setDailyResult({ + report: d.report, + markdown: typeof d.markdown === "string" ? 
d.markdown : "", + markdown_path: d.markdown_path, + json_path: d.json_path, + }) + } + setStreamPhase("done") + addStreamLog("stream complete") + continue + } + + if (event.type === "error") { + const d = (event.data || {}) as { message?: string; detail?: string } + const msg = event.message || d.message || d.detail || "Unknown stream error" + addStreamLog(`[error] ${msg}`) + setError(`DailyPaper failed: ${msg}`) + streamFailed = true + setStreamPhase("error") + store.setPhase("error") + break + } + } + if (!streamFailed) { + store.setPhase("reported") + } + } catch (err) { + streamFailed = true + setError(String(err)) + setStreamPhase("error") + store.setPhase("error") + } finally { + setLoadingDaily(false) + streamStartRef.current = null + } } async function runAnalyzeStream() { @@ -642,6 +1055,8 @@ export default function TopicWorkflowDashboard() { if (!runJudge && !runTrends && !runInsight) { setError("Enable Judge, LLM trends, or LLM insight before analyzing."); return } setLoadingAnalyze(true); setError(null); store.clearAnalyzeLog(); setAnalyzeProgress({ done: 0, total: 0 }); store.setPhase("reporting") + setStreamPhase("idle"); setStreamLog([]); setStreamProgress({ done: 0, total: 0 }) + streamStartRef.current = Date.now() store.addAnalyzeLog( `[start] run_judge=${runJudge} run_trends=${runTrends} run_insight=${runInsight} llm_enabled=${enableLLM} judge_enabled=${enableJudge}`, ) @@ -649,6 +1064,7 @@ export default function TopicWorkflowDashboard() { store.addAnalyzeLog("[hint] Analyze stream currently supports trends and daily insight.") } + let streamFailed = false try { const res = await fetch("/api/research/paperscool/analyze", { method: "POST", headers: { "Content-Type": "application/json" }, @@ -841,30 +1257,62 @@ export default function TopicWorkflowDashboard() { const msg = event.message || d.message || d.detail || "Unknown analyze stream error" store.addAnalyzeLog(`[error] ${msg}`) setError(`Analyze failed: ${msg}`) + streamFailed = true store.setPhase("error") break } } - if (store.phase !== "error") { + if (!streamFailed) { store.setPhase("reported") } - } catch (err) { setError(String(err)); store.setPhase("error") } finally { setLoadingAnalyze(false) } + } catch (err) { + streamFailed = true + setError(String(err)) + store.setPhase("error") + } finally { + setLoadingAnalyze(false) + streamStartRef.current = null + } + } + + async function runRepoEnrichment() { + if (!dailyResult?.report) { + setRepoError("Generate DailyPaper first.") + return + } + + setLoadingRepos(true) + setRepoError(null) + try { + const res = await fetch("/api/research/paperscool/repos", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + report: dailyResult.report, + max_items: 500, + include_github_api: true, + }), + }) + if (!res.ok) throw new Error(await res.text()) + const payload = await res.json() as { repos?: RepoRow[] } + setRepoRows(payload.repos || []) + } catch (err) { + setRepoError(String(err)) + } finally { + setLoadingRepos(false) + } } const isLoading = loadingSearch || loadingDaily || loadingAnalyze const canSearch = queries.length > 0 && branches.length > 0 && sources.length > 0 const loadingLabel = loadingSearch ? "Searching sources..." - : loadingDaily - ? "Generating DailyPaper report and enrichment..." - : "Running judge/trend/insight enrichment..." + : "Running judge/trend/insight enrichment..." const loadingHint = loadingAnalyze && analyzeProgress.total > 0 ? 
`${analyzeProgress.done}/${analyzeProgress.total} judged` - : loadingDaily - ? "Fetching, ranking, and composing report" - : loadingSearch - ? "Multi-query retrieval in progress" - : "Waiting for LLM events" + : loadingSearch + ? "Multi-query retrieval in progress" + : "Waiting for LLM events" return (
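For readers following the stream handlers above, the /daily SSE protocol consists of a fixed set of event types (progress, search_done, report_built, llm_summary, trend, insight, llm_done, judge, judge_done, filter_done, result, error). A hedged TypeScript sketch of that protocol as a discriminated union; the event names come from the handlers themselves, while the payload field types are assumptions inferred from how each handler reads `event.data`:

```typescript
// Sketch only: event names are taken from the runDailyPaperStream handlers;
// payload shapes are inferred from how each handler reads event.data and are
// not the authoritative server-side schema.
type DailyStreamEvent =
  | { type: "progress"; data: { phase?: string; message?: string; total?: number } }
  | { type: "search_done"; data: { items_count?: number; unique_items?: number } }
  | { type: "report_built"; data: { report?: unknown; queries_count?: number; global_top_count?: number } }
  | { type: "llm_summary"; data: { query?: string; title?: string; ai_summary?: string; done?: number; total?: number } }
  | { type: "trend"; data: { query?: string; analysis?: string; done?: number; total?: number } }
  | { type: "insight"; data: { analysis?: string } }
  | { type: "llm_done"; data: { summaries_count?: number; trends_count?: number } }
  | { type: "judge"; data: { query?: string; title?: string; judge?: unknown; done?: number; total?: number } }
  | { type: "judge_done"; data: unknown }
  | { type: "filter_done"; data: { total_before?: number; total_after?: number; removed_count?: number } }
  | { type: "result"; data: { report?: unknown; markdown?: string; markdown_path?: string | null; json_path?: string | null } }
  | { type: "error"; data: { message?: string; detail?: string }; message?: string }
```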
@@ -883,7 +1331,7 @@ export default function TopicWorkflowDashboard() { -
@@ -921,7 +1371,18 @@ export default function TopicWorkflowDashboard() { {error &&
{error}
} - {isLoading && ( + {/* Stream progress card for DailyPaper SSE */} + {loadingDaily && streamPhase !== "idle" && ( + + )} + + {/* Generic loading card for search / analyze */} + {isLoading && !loadingDaily && !loadingAnalyze && (
@@ -994,7 +1455,14 @@ export default function TopicWorkflowDashboard() { {/* Papers */}
-

{allPapers.length} papers

+
+

{allPapers.length} papers

+ {paperDataSource && ( + + {paperDataSource === "dailypaper" ? "DailyPaper" : "Search"} + + )} +
{ setEmail(e.target.value); setStatus("idle") }} + placeholder="subscriber@example.com" + className="h-8 text-sm" + onKeyDown={(e) => e.key === "Enter" && handleSubscribe()} + /> + +
+ {message && ( +

{message}

+ )} + {subCount && ( +

{subCount.active} active subscriber{subCount.active !== 1 ? "s" : ""}

+ )}
) } @@ -604,6 +684,7 @@ export default function TopicWorkflowDashboard() { /* Persisted state (zustand) */ const store = useWorkflowStore() const { searchResult, dailyResult, phase, analyzeLog, notifyEmail, notifyEnabled, config } = store + const resendEnabled = store.resendEnabled const uc = store.updateConfig /* Derived config accessors — read from persisted store */ @@ -804,8 +885,8 @@ export default function TopicWorkflowDashboard() { enable_llm_analysis: enableLLM, llm_features: llmFeatures, enable_judge: enableJudge, judge_runs: judgeRuns, judge_max_items_per_query: judgeMaxItems, judge_token_budget: judgeTokenBudget, - notify: notifyEnabled, - notify_channels: notifyEnabled ? ["email"] : [], + notify: notifyEnabled || resendEnabled, + notify_channels: [...(notifyEnabled ? ["email"] : []), ...(resendEnabled ? ["resend"] : [])], notify_email_to: notifyEnabled && notifyEmail.trim() ? [notifyEmail.trim()] : [], } @@ -1362,6 +1443,7 @@ export default function TopicWorkflowDashboard() { judgeMaxItems, setJudgeMaxItems, judgeTokenBudget, setJudgeTokenBudget, notifyEmail, setNotifyEmail: store.setNotifyEmail, notifyEnabled, setNotifyEnabled: store.setNotifyEnabled, + resendEnabled, setResendEnabled: store.setResendEnabled, }} />
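As a quick reference for how the two notification toggles translate into the /daily request, here is a minimal TypeScript sketch of the mapping assembled above. The helper name is illustrative and not part of the patch; the "resend" channel is assumed to resolve recipients from stored newsletter subscribers on the backend, so only the SMTP "email" channel carries an explicit address from the UI:

```typescript
// Illustrative helper, not in the patch: mirrors how the dashboard builds the
// notify fields of the /daily request body from the two UI toggles.
function buildNotifyFields(notifyEnabled: boolean, resendEnabled: boolean, notifyEmail: string) {
  return {
    notify: notifyEnabled || resendEnabled,
    notify_channels: [...(notifyEnabled ? ["email"] : []), ...(resendEnabled ? ["resend"] : [])],
    // Only the SMTP channel takes a UI-provided recipient; resend recipients
    // are assumed to come from the subscriber store on the backend.
    notify_email_to: notifyEnabled && notifyEmail.trim() ? [notifyEmail.trim()] : [],
  }
}

// Example: resend only -> { notify: true, notify_channels: ["resend"], notify_email_to: [] }
```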
diff --git a/web/src/lib/stores/workflow-store.ts b/web/src/lib/stores/workflow-store.ts index b305f0b..6e59be2 100644 --- a/web/src/lib/stores/workflow-store.ts +++ b/web/src/lib/stores/workflow-store.ts @@ -141,6 +141,7 @@ interface WorkflowState { lastUpdated: string | null notifyEmail: string notifyEnabled: boolean + resendEnabled: boolean config: WorkflowConfig /* Actions */ @@ -152,6 +153,7 @@ interface WorkflowState { clearAnalyzeLog: () => void setNotifyEmail: (email: string) => void setNotifyEnabled: (enabled: boolean) => void + setResendEnabled: (enabled: boolean) => void updateConfig: (patch: Partial) => void clearAll: () => void } @@ -166,6 +168,7 @@ export const useWorkflowStore = create()( lastUpdated: null, notifyEmail: "", notifyEnabled: false, + resendEnabled: false, config: { ...DEFAULT_CONFIG }, setSearchResult: (result) => @@ -192,6 +195,8 @@ export const useWorkflowStore = create()( setNotifyEnabled: (enabled) => set({ notifyEnabled: enabled }), + setResendEnabled: (enabled) => set({ resendEnabled: enabled }), + updateConfig: (patch) => set((s) => ({ config: { ...s.config, ...patch } })), @@ -214,6 +219,7 @@ export const useWorkflowStore = create()( lastUpdated: state.lastUpdated, notifyEmail: state.notifyEmail, notifyEnabled: state.notifyEnabled, + resendEnabled: state.resendEnabled, config: state.config, }), }, From 0013d20b5a27b310881c1efeb1a8010b5c8d70cf Mon Sep 17 00:00:00 2001 From: jerry <1772030600@qq.com> Date: Wed, 11 Feb 2026 13:19:36 +0800 Subject: [PATCH 5/7] fix: address PR review security and correctness issues Security fixes: - Sanitize output_dir to prevent path traversal (restrict to ./reports/) - Validate email addresses to prevent header injection (reject \r\n) - Escape HTML in unsubscribe error page to prevent XSS - Mask email PII in Resend error logs - Guard against empty unsub tokens producing broken links Concurrency fix: - Eliminate shared self.config.email_to mutation in push_dailypaper; pass effective recipients as local variable to _send_email instead React fix: - Replace useState(() => fetchCount()) with useEffect (side effect during render is a React anti-pattern, runs twice in Strict Mode) Frontend alignment: - Add missing "insight" phase to StreamPhase type and PHASE_ORDER - Add max={200} to judgeMaxItems input to match backend limit Performance: - Move queries list creation outside judge scoring loop Compatibility: - Replace str.removesuffix() with endswith check for Python 3.8+ Infrastructure: - Lazy-init SubscriberStore via lru_cache instead of module-level - Scope create_all to newsletter_subscribers table only --- src/paperbot/api/routes/newsletter.py | 15 +++-- src/paperbot/api/routes/paperscool.py | 44 +++++++++++--- .../services/daily_push_service.py | 58 ++++++++++--------- .../application/services/resend_service.py | 7 ++- .../infrastructure/stores/subscriber_store.py | 4 +- .../newsletter/unsubscribe/[token]/route.ts | 3 +- .../research/TopicWorkflowDashboard.tsx | 11 ++-- 7 files changed, 93 insertions(+), 49 deletions(-) diff --git a/src/paperbot/api/routes/newsletter.py b/src/paperbot/api/routes/newsletter.py index 4783b74..03e5e42 100644 --- a/src/paperbot/api/routes/newsletter.py +++ b/src/paperbot/api/routes/newsletter.py @@ -1,6 +1,7 @@ from __future__ import annotations import re +from functools import lru_cache from typing import Any, Dict from fastapi import APIRouter, HTTPException @@ -11,11 +12,15 @@ router = APIRouter() -_subscriber_store = SubscriberStore() - _EMAIL_RE = 
re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$") +@lru_cache(maxsize=1) +def _get_subscriber_store() -> SubscriberStore: + """Lazy-init subscriber store on first use, not at import time.""" + return SubscriberStore() + + class SubscribeRequest(BaseModel): email: str = Field(..., min_length=3, max_length=256) @@ -32,7 +37,7 @@ def subscribe(req: SubscribeRequest): if not _EMAIL_RE.match(email): raise HTTPException(status_code=400, detail="Invalid email format") - result = _subscriber_store.add_subscriber(email) + result = _get_subscriber_store().add_subscriber(email) return SubscribeResponse( ok=True, email=result["email"], @@ -45,7 +50,7 @@ def unsubscribe(token: str): if not token or len(token) > 64: raise HTTPException(status_code=400, detail="Invalid token") - ok = _subscriber_store.remove_subscriber(token) + ok = _get_subscriber_store().remove_subscriber(token) if not ok: raise HTTPException(status_code=404, detail="Token not found") @@ -66,5 +71,5 @@ class SubscriberCountResponse(BaseModel): @router.get("/newsletter/subscribers", response_model=SubscriberCountResponse) def list_subscribers(): - counts = _subscriber_store.get_subscriber_count() + counts = _get_subscriber_store().get_subscriber_count() return SubscriberCountResponse(**counts) diff --git a/src/paperbot/api/routes/paperscool.py b/src/paperbot/api/routes/paperscool.py index e9d11dd..0c3f21f 100644 --- a/src/paperbot/api/routes/paperscool.py +++ b/src/paperbot/api/routes/paperscool.py @@ -33,6 +33,33 @@ router = APIRouter() +_ALLOWED_REPORT_BASE = os.path.abspath("./reports") + + +def _sanitize_output_dir(raw: str) -> str: + """Prevent path traversal — resolve and ensure output stays under ./reports/.""" + resolved = os.path.abspath(raw) + if not resolved.startswith(_ALLOWED_REPORT_BASE): + return os.path.join(_ALLOWED_REPORT_BASE, "dailypaper") + return resolved + + +_EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$") + + +def _validate_email_list(emails: List[str]) -> List[str]: + """Validate and sanitize email list — reject header injection attempts.""" + cleaned: List[str] = [] + for e in emails: + addr = (e or "").strip() + if not addr: + continue + if "\n" in addr or "\r" in addr: + continue + if _EMAIL_RE.match(addr): + cleaned.append(addr) + return cleaned + class PapersCoolSearchRequest(BaseModel): queries: List[str] = Field(default_factory=list) @@ -63,7 +90,7 @@ class DailyPaperRequest(BaseModel): top_n: int = Field(10, ge=1, le=200) formats: List[str] = Field(default_factory=lambda: ["both"]) save: bool = False - output_dir: str = "./reports/dailypaper" + output_dir: str = Field("./reports/dailypaper", description="Relative path under project root for saving reports") enable_llm_analysis: bool = False llm_features: List[str] = Field(default_factory=lambda: ["summary"]) enable_judge: bool = False @@ -273,11 +300,11 @@ async def _dailypaper_stream(req: DailyPaperRequest): }, ) + queries = list(report.get("queries") or []) for idx, row in enumerate(selected, start=1): query_index = int(row.get("query_index") or 0) item_index = int(row.get("item_index") or 0) - queries = list(report.get("queries") or []) if query_index >= len(queries): continue @@ -421,7 +448,7 @@ async def _dailypaper_stream(req: DailyPaperRequest): json_path = None notify_result: Optional[Dict[str, Any]] = None if req.save: - reporter = DailyPaperReporter(output_dir=req.output_dir) + reporter = DailyPaperReporter(output_dir=_sanitize_output_dir(req.output_dir)) artifacts = reporter.write( report=report, 
markdown=markdown, @@ -440,7 +467,7 @@ async def _dailypaper_stream(req: DailyPaperRequest): markdown_path=markdown_path, json_path=json_path, channels_override=req.notify_channels or None, - email_to_override=req.notify_email_to or None, + email_to_override=_validate_email_list(req.notify_email_to) or None, ) yield StreamEvent( @@ -502,7 +529,7 @@ def _sync_daily_report(req: DailyPaperRequest, cleaned_queries: List[str]): json_path = None notify_result: Optional[Dict[str, Any]] = None if req.save: - reporter = DailyPaperReporter(output_dir=req.output_dir) + reporter = DailyPaperReporter(output_dir=_sanitize_output_dir(req.output_dir)) artifacts = reporter.write( report=report, markdown=markdown, @@ -520,7 +547,7 @@ def _sync_daily_report(req: DailyPaperRequest, cleaned_queries: List[str]): markdown_path=markdown_path, json_path=json_path, channels_override=req.notify_channels or None, - email_to_override=req.notify_email_to or None, + email_to_override=_validate_email_list(req.notify_email_to) or None, ) return DailyPaperResponse( @@ -555,7 +582,8 @@ def _normalize_github_repo_url(raw_url: str | None) -> Optional[str]: return None owner, repo = match.group(1), match.group(2) - repo = repo.removesuffix(".git") + if repo.endswith(".git"): + repo = repo[:-4] return f"https://github.com/{owner}/{repo}" @@ -808,11 +836,11 @@ async def _paperscool_analyze_stream(req: PapersCoolAnalyzeRequest): }, ) + queries = list(report.get("queries") or []) for idx, row in enumerate(selected, start=1): query_index = int(row.get("query_index") or 0) item_index = int(row.get("item_index") or 0) - queries = list(report.get("queries") or []) if query_index >= len(queries): continue diff --git a/src/paperbot/application/services/daily_push_service.py b/src/paperbot/application/services/daily_push_service.py index fb2067c..3458cf5 100644 --- a/src/paperbot/application/services/daily_push_service.py +++ b/src/paperbot/application/services/daily_push_service.py @@ -104,12 +104,12 @@ def push_dailypaper( if not channels: return {"sent": False, "reason": "no channels configured", "channels": channels} - # Allow UI-provided email recipients to override env config - original_email_to = self.config.email_to + # Determine effective email recipients (local var, no shared state mutation) + effective_email_to = self.config.email_to if email_to_override: cleaned = [e.strip() for e in email_to_override if (e or "").strip()] if cleaned: - self.config.email_to = cleaned + effective_email_to = cleaned subject = self._build_subject(report) text = self._build_text( @@ -119,26 +119,26 @@ def push_dailypaper( results: Dict[str, Any] = {"sent": False, "channels": channels, "results": {}} any_success = False - try: - for channel in channels: - try: - if channel == "email": - self._send_email(subject=subject, body=text, html_body=html_body) - elif channel == "slack": - self._send_slack(subject=subject, body=text) - elif channel in {"dingtalk", "dingding"}: - self._send_dingtalk(subject=subject, body=text) - elif channel == "resend": - self._send_resend(report=report, markdown=markdown or text) - else: - raise ValueError(f"unsupported channel: {channel}") - results["results"][channel] = {"ok": True} - any_success = True - except Exception as exc: # pragma: no cover - runtime specific - logger.warning("Daily push failed channel=%s err=%s", channel, exc) - results["results"][channel] = {"ok": False, "error": str(exc)} - finally: - self.config.email_to = original_email_to + for channel in channels: + try: + if channel == "email": + 
self._send_email( + subject=subject, body=text, html_body=html_body, + recipients=effective_email_to, + ) + elif channel == "slack": + self._send_slack(subject=subject, body=text) + elif channel in {"dingtalk", "dingding"}: + self._send_dingtalk(subject=subject, body=text) + elif channel == "resend": + self._send_resend(report=report, markdown=markdown or text) + else: + raise ValueError(f"unsupported channel: {channel}") + results["results"][channel] = {"ok": True} + any_success = True + except Exception as exc: # pragma: no cover - runtime specific + logger.warning("Daily push failed channel=%s err=%s", channel, exc) + results["results"][channel] = {"ok": False, "error": str(exc)} results["sent"] = any_success return results @@ -177,10 +177,14 @@ def _build_html(self, report: Dict[str, Any]) -> str: return build_digest_html(report) - def _send_email(self, *, subject: str, body: str, html_body: str = "") -> None: + def _send_email( + self, *, subject: str, body: str, html_body: str = "", + recipients: Optional[List[str]] = None, + ) -> None: if not self.config.smtp_host: raise ValueError("PAPERBOT_NOTIFY_SMTP_HOST is required for email notifications") - if not self.config.email_to: + email_to = recipients or self.config.email_to + if not email_to: raise ValueError("PAPERBOT_NOTIFY_EMAIL_TO is required for email notifications") from_addr = self.config.email_from or self.config.smtp_username @@ -190,7 +194,7 @@ def _send_email(self, *, subject: str, body: str, html_body: str = "") -> None: msg = MIMEMultipart("alternative") msg["Subject"] = subject msg["From"] = formataddr(("PaperBot", from_addr)) - msg["To"] = ", ".join(self.config.email_to) + msg["To"] = ", ".join(email_to) msg.attach(MIMEText(body, _subtype="plain", _charset="utf-8")) if html_body: @@ -216,7 +220,7 @@ def _send_email(self, *, subject: str, body: str, html_body: str = "") -> None: server.ehlo() if self.config.smtp_username: server.login(self.config.smtp_username, self.config.smtp_password) - server.sendmail(from_addr, self.config.email_to, msg.as_string()) + server.sendmail(from_addr, email_to, msg.as_string()) def _send_slack(self, *, subject: str, body: str) -> None: url = self.config.slack_webhook_url diff --git a/src/paperbot/application/services/resend_service.py b/src/paperbot/application/services/resend_service.py index 9893d37..912120f 100644 --- a/src/paperbot/application/services/resend_service.py +++ b/src/paperbot/application/services/resend_service.py @@ -71,6 +71,10 @@ def send_digest( for email_addr in to: token = unsub_tokens.get(email_addr, "") + if not token: + logger.warning("Resend: no unsub token for subscriber, skipping") + results[email_addr] = {"ok": False, "error": "missing_unsub_token"} + continue unsub_link = f"{self.unsub_base_url}/api/newsletter/unsubscribe/{token}" html_body = self._render_html(report, markdown, unsub_link) text = self._render_text(report, markdown, unsub_link) @@ -80,7 +84,8 @@ def send_digest( ) results[email_addr] = {"ok": True, "id": r.get("id")} except Exception as e: - logger.warning("Resend failed for %s: %s", email_addr, e) + masked = email_addr[:2] + "***" + email_addr[email_addr.index("@"):] if "@" in email_addr else "***" + logger.warning("Resend failed for %s: %s", masked, e) results[email_addr] = {"ok": False, "error": str(e)} return results diff --git a/src/paperbot/infrastructure/stores/subscriber_store.py b/src/paperbot/infrastructure/stores/subscriber_store.py index c77816a..ec399b7 100644 --- a/src/paperbot/infrastructure/stores/subscriber_store.py +++ 
b/src/paperbot/infrastructure/stores/subscriber_store.py @@ -6,7 +6,7 @@ from sqlalchemy import select -from paperbot.infrastructure.stores.models import Base, NewsletterSubscriberModel +from paperbot.infrastructure.stores.models import NewsletterSubscriberModel from paperbot.infrastructure.stores.sqlalchemy_db import SessionProvider, get_db_url @@ -21,7 +21,7 @@ def __init__(self, db_url: Optional[str] = None, *, auto_create_schema: bool = T self.db_url = db_url or get_db_url() self._provider = SessionProvider(self.db_url) if auto_create_schema: - Base.metadata.create_all(self._provider.engine) + NewsletterSubscriberModel.__table__.create(self._provider.engine, checkfirst=True) def add_subscriber(self, email: str) -> Dict[str, Any]: email = email.strip().lower() diff --git a/web/src/app/api/newsletter/unsubscribe/[token]/route.ts index d64d32d..c644c2b 100644 --- a/web/src/app/api/newsletter/unsubscribe/[token]/route.ts +++ b/web/src/app/api/newsletter/unsubscribe/[token]/route.ts @@ -20,8 +20,9 @@ export async function GET( }) } catch (error) { const detail = error instanceof Error ? error.message : String(error) + const escaped = detail.replace(/&/g, "&amp;").replace(/</g, "&lt;").replace(/>/g, "&gt;").replace(/"/g, "&quot;") return new Response( - `<html><body><h1>Error</h1><p>${detail}</p></body></html>`, + `<html><body><h1>Error</h1><p>${escaped}</p></body></html>
`, { status: 502, headers: { "Content-Type": "text/html" } }, ) } diff --git a/web/src/components/research/TopicWorkflowDashboard.tsx b/web/src/components/research/TopicWorkflowDashboard.tsx index 61ca2f0..b4e9aa1 100644 --- a/web/src/components/research/TopicWorkflowDashboard.tsx +++ b/web/src/components/research/TopicWorkflowDashboard.tsx @@ -1,6 +1,6 @@ "use client" -import { useCallback, useMemo, useRef, useState } from "react" +import { useCallback, useEffect, useMemo, useRef, useState } from "react" import Markdown from "react-markdown" import remarkGfm from "remark-gfm" import { @@ -222,13 +222,14 @@ function buildDagStatuses(args: { /* ── Stream Progress ─────────────────────────────────── */ -type StreamPhase = "idle" | "search" | "build" | "llm" | "judge" | "filter" | "save" | "notify" | "done" | "error" +type StreamPhase = "idle" | "search" | "build" | "llm" | "insight" | "judge" | "filter" | "save" | "notify" | "done" | "error" const PHASE_LABELS: Record = { idle: "Idle", search: "Searching papers", build: "Building report", llm: "LLM enrichment", + insight: "Generating insights", judge: "Judge scoring", filter: "Filtering papers", save: "Saving", @@ -237,7 +238,7 @@ const PHASE_LABELS: Record = { error: "Error", } -const PHASE_ORDER: StreamPhase[] = ["search", "build", "llm", "judge", "filter", "save", "notify", "done"] +const PHASE_ORDER: StreamPhase[] = ["search", "build", "llm", "insight", "judge", "filter", "save", "notify", "done"] function StreamProgressCard({ streamPhase, @@ -562,7 +563,7 @@ function ConfigSheetBody(props: { {enableJudge && (
setJudgeRuns(Number(e.target.value || 1))} className="h-8 text-sm" />
-
setJudgeMaxItems(Number(e.target.value || 20))} className="h-8 text-sm" />
+
setJudgeMaxItems(Number(e.target.value || 20))} className="h-8 text-sm" />
setJudgeTokenBudget(Number(e.target.value || 0))} className="h-8 text-sm" />
)} @@ -627,7 +628,7 @@ function NewsletterSubscribeWidget() { } catch { /* ignore */ } }, []) - useState(() => { fetchCount() }) + useEffect(() => { fetchCount() }, [fetchCount]) async function handleSubscribe() { if (!email.trim()) return From f41f6eb0f0d68d7d2b36e41ec596f69dbe02608d Mon Sep 17 00:00:00 2001 From: jerry <1772030600@qq.com> Date: Wed, 11 Feb 2026 13:27:02 +0800 Subject: [PATCH 6/7] chore: add TODO comments for deferred PR review items MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mark low-priority / large-scope issues from PR #25 review as TODOs: - GitHub API sequential calls → async rewrite needed - GDPR compliance for subscriber email storage - Scholar mock fallback hardcoded to Dawn Song - Markdown code block language specifiers (fix README inline) - fetchScholarDetails test coverage - Judge update logic refactoring - scalar_one_or_none MultipleResultsFound edge case - Upsert overwriting empty authors/keywords - judge_cost_tier int() ValueError guard (fixed inline) --- README.md | 2 +- src/paperbot/api/routes/paperscool.py | 3 +++ src/paperbot/infrastructure/stores/models.py | 7 ++++++- .../infrastructure/stores/paper_store.py | 20 ++++++++++++++----- .../infrastructure/stores/research_store.py | 2 ++ .../research/TopicWorkflowDashboard.tsx | 3 +++ web/src/lib/api.ts | 4 ++++ 7 files changed, 34 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 968287f..9562189 100644 --- a/README.md +++ b/README.md @@ -86,7 +86,7 @@ Input Queries ──→ ├─── arXiv API (relevance sort) 当启用 LLM 分析或 Judge 评分时,`/daily` 端点返回 SSE 流式响应,前端实时显示每个阶段的进度: -``` +```text Search → Build Report → LLM Enrichment → Judge Scoring → Filter → Save → Notify → Result │ │ │ │ │ │ │ │ │ └─ 移除 skip/skim 论文 diff --git a/src/paperbot/api/routes/paperscool.py b/src/paperbot/api/routes/paperscool.py index 0c3f21f..aa61198 100644 --- a/src/paperbot/api/routes/paperscool.py +++ b/src/paperbot/api/routes/paperscool.py @@ -714,6 +714,9 @@ def enrich_papers_with_repo_data(req: PapersCoolReposRequest): selected = deduped[: max(1, int(req.max_items))] token = os.getenv("GITHUB_TOKEN") or os.getenv("GH_TOKEN") + # TODO: GitHub API calls are sequential — switch to concurrent.futures or + # async httpx with bounded concurrency to avoid multi-minute requests and + # rate-limit exhaustion (60 req/hr unauthenticated, 5000 authenticated). repos: List[Dict[str, Any]] = [] for item in selected: repo_url = _extract_repo_url_from_paper(item) diff --git a/src/paperbot/infrastructure/stores/models.py b/src/paperbot/infrastructure/stores/models.py index 92a9e69..726f29b 100644 --- a/src/paperbot/infrastructure/stores/models.py +++ b/src/paperbot/infrastructure/stores/models.py @@ -635,7 +635,12 @@ class ResearchTrackEmbeddingModel(Base): class NewsletterSubscriberModel(Base): - """Email newsletter subscriber for DailyPaper digest delivery.""" + """Email newsletter subscriber for DailyPaper digest delivery. + + TODO(GDPR): email stored as plaintext — consider encryption-at-rest or + hashing. Add a hard-delete method for GDPR/CCPA right-to-erasure (current + unsubscribe only sets status='unsubscribed', no row purge). 
+ """ __tablename__ = "newsletter_subscribers" diff --git a/src/paperbot/infrastructure/stores/paper_store.py b/src/paperbot/infrastructure/stores/paper_store.py index 5e26327..e4f8c87 100644 --- a/src/paperbot/infrastructure/stores/paper_store.py +++ b/src/paperbot/infrastructure/stores/paper_store.py @@ -118,6 +118,9 @@ def upsert_paper( } with self._provider.session() as session: + # TODO: title+url fallback query uses scalar_one_or_none() which + # raises MultipleResultsFound if duplicates exist. Switch to + # .first() or add .limit(1) for safety. row = None if arxiv_id: row = session.execute( @@ -167,6 +170,10 @@ def upsert_paper( row.source = str(source or row.source or "papers_cool") row.venue = venue or row.venue or "" row.published_at = _as_utc(published_at) or _as_utc(row.published_at) + # TODO: unconditional set_authors/set_keywords/set_metadata may wipe + # existing data when new paper dict has empty values. Consider + # preserving existing values when incoming data is empty: + # row.set_authors(authors or row.get_authors()) row.set_authors(authors) row.set_keywords(keywords) row.set_metadata(metadata) @@ -268,11 +275,14 @@ def upsert_judge_scores_from_report(self, report: Dict[str, Any]) -> Dict[str, i row.recommendation = str(judge.get("recommendation") or "") row.one_line_summary = str(judge.get("one_line_summary") or "") row.judge_model = str(judge.get("judge_model") or "") - row.judge_cost_tier = ( - int(judge.get("judge_cost_tier")) - if judge.get("judge_cost_tier") is not None - else None - ) + try: + row.judge_cost_tier = ( + int(judge.get("judge_cost_tier")) + if judge.get("judge_cost_tier") is not None + else None + ) + except (ValueError, TypeError): + row.judge_cost_tier = None row.scored_at = scored_at row.metadata_json = "{}" diff --git a/src/paperbot/infrastructure/stores/research_store.py b/src/paperbot/infrastructure/stores/research_store.py index 06c5ddb..425724a 100644 --- a/src/paperbot/infrastructure/stores/research_store.py +++ b/src/paperbot/infrastructure/stores/research_store.py @@ -1092,6 +1092,8 @@ def _resolve_paper_ref_id( if row is not None: return int(row.id) + # TODO: scalar_one_or_none() can raise MultipleResultsFound if + # multiple papers share the same URL or title. Switch to .first(). if url_candidates: row = session.execute( select(PaperModel).where( diff --git a/web/src/components/research/TopicWorkflowDashboard.tsx b/web/src/components/research/TopicWorkflowDashboard.tsx index b4e9aa1..167bcaa 100644 --- a/web/src/components/research/TopicWorkflowDashboard.tsx +++ b/web/src/components/research/TopicWorkflowDashboard.tsx @@ -1024,6 +1024,9 @@ export default function TopicWorkflowDashboard() { const rec = d.judge?.recommendation || "?" const overall = d.judge?.overall != null ? Number(d.judge.overall).toFixed(2) : "?" addStreamLog(`judge ${d.done || 0}/${d.total || 0}: [${rec} ${overall}] ${d.title || "paper"} (${d.query || ""})`) + // TODO: refactor judge update — current nested map + matched flag is hard + // to follow. Use findIndex to locate target query+item, then apply a + // single immutable update. See PR #25 review for suggested approach. 
if (d.query && d.title && d.judge) { store.updateDailyResult((prev) => { const sourceQueries = prev.report.queries || [] diff --git a/web/src/lib/api.ts b/web/src/lib/api.ts index f8b6894..28af73b 100644 --- a/web/src/lib/api.ts +++ b/web/src/lib/api.ts @@ -207,6 +207,8 @@ export async function fetchPaperDetails(id: string): Promise { } } +// TODO: add unit tests for fetchScholarDetails — cover successful network+trends, +// partial responses, and both-null fallback path. export async function fetchScholarDetails(id: string): Promise { const scholarName = slugToName(id) @@ -237,6 +239,8 @@ export async function fetchScholarDetails(id: string): Promise { }), ]) + // TODO: mock fallback is hardcoded to Dawn Song — replace with generic + // placeholder or remove entirely once real scholar data is always available. // Fallback to mock data if the scholar is not configured in subscriptions yet. if (!network && !trends) { const papers = await fetchPapers() From f692fe5cd7991a21806620dc233777c74478a58d Mon Sep 17 00:00:00 2001 From: jerry <1772030600@qq.com> Date: Wed, 11 Feb 2026 14:45:44 +0800 Subject: [PATCH 7/7] fix(web): elapsed timer, stable log keys, and AbortController for SSE streams MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add useElapsed hook with setInterval to tick elapsed seconds every 1s (previously computed once per render, appeared frozen between SSE events) - Use stable keys for stream log entries (offset-based instead of sliding window index) to prevent React DOM remount/reorder on updates - Add AbortController to both runDailyPaperStream and runAnalyzeStream: abort previous stream on re-invocation, pass signal to fetch, clear ref in finally block — prevents concurrent/orphaned SSE consumers from corrupting state --- .../research/TopicWorkflowDashboard.tsx | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/web/src/components/research/TopicWorkflowDashboard.tsx b/web/src/components/research/TopicWorkflowDashboard.tsx index 167bcaa..b030bcb 100644 --- a/web/src/components/research/TopicWorkflowDashboard.tsx +++ b/web/src/components/research/TopicWorkflowDashboard.tsx @@ -240,6 +240,17 @@ const PHASE_LABELS: Record = { const PHASE_ORDER: StreamPhase[] = ["search", "build", "llm", "insight", "judge", "filter", "save", "notify", "done"] +function useElapsed(startTime: number | null) { + const [elapsed, setElapsed] = useState(0) + useEffect(() => { + if (!startTime) { setElapsed(0); return } + setElapsed(Math.round((Date.now() - startTime) / 1000)) + const id = setInterval(() => setElapsed(Math.round((Date.now() - startTime) / 1000)), 1000) + return () => clearInterval(id) + }, [startTime]) + return elapsed +} + function StreamProgressCard({ streamPhase, streamLog, @@ -251,7 +262,7 @@ function StreamProgressCard({ streamProgress: { done: number; total: number } startTime: number | null }) { - const elapsed = startTime ? Math.round((Date.now() - startTime) / 1000) : 0 + const elapsed = useElapsed(startTime) const currentIdx = PHASE_ORDER.indexOf(streamPhase) const pct = streamProgress.total > 0 ? Math.round((streamProgress.done / streamProgress.total) * 100) @@ -301,7 +312,7 @@ function StreamProgressCard({
{streamLog.slice(-20).map((line, idx) => ( -
{line}
+
{line}
))}
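The attributes carrying the key change were lost in the rendering of this hunk, so for clarity: per the commit message, each visible log row is keyed by its absolute offset in the full log rather than by its index within the sliding 20-line window. A sketch of one way to do this, with assumed markup (the actual element and class names may differ from the patch):

```tsx
// Sketch: key rows by their offset into streamLog, not by the index of the
// 20-line slice, so React keeps row identity stable as the window slides.
function StreamLogRows({ streamLog }: { streamLog: string[] }) {
  const visible = streamLog.slice(-20)
  const offset = streamLog.length - visible.length
  return (
    <div className="space-y-0.5">
      {visible.map((line, idx) => (
        <div key={offset + idx} className="font-mono text-xs">
          {line}
        </div>
      ))}
    </div>
  )
}
```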
@@ -743,6 +754,7 @@ export default function TopicWorkflowDashboard() { const [streamLog, setStreamLog] = useState([]) const [streamProgress, setStreamProgress] = useState({ done: 0, total: 0 }) const streamStartRef = useRef(null) + const streamAbortRef = useRef(null) const addStreamLog = useCallback((line: string) => { setStreamLog((prev) => [...prev.slice(-50), line]) @@ -875,6 +887,9 @@ export default function TopicWorkflowDashboard() { } async function runDailyPaperStream() { + streamAbortRef.current?.abort() + const controller = new AbortController() + streamAbortRef.current = controller setLoadingDaily(true); setError(null); setRepoRows([]); setRepoError(null) store.setPhase("reporting"); store.clearAnalyzeLog() setStreamPhase("search"); setStreamLog([]); setStreamProgress({ done: 0, total: 0 }) @@ -897,6 +912,7 @@ export default function TopicWorkflowDashboard() { method: "POST", headers: { "Content-Type": "application/json", Accept: "text/event-stream, application/json" }, body: JSON.stringify(requestBody), + signal: controller.signal, }) if (!res.ok) throw new Error(await res.text()) @@ -1129,6 +1145,7 @@ export default function TopicWorkflowDashboard() { } finally { setLoadingDaily(false) streamStartRef.current = null + streamAbortRef.current = null } } @@ -1139,6 +1156,9 @@ export default function TopicWorkflowDashboard() { const runInsight = Boolean(enableLLM && useInsight) if (!runJudge && !runTrends && !runInsight) { setError("Enable Judge, LLM trends, or LLM insight before analyzing."); return } + streamAbortRef.current?.abort() + const controller = new AbortController() + streamAbortRef.current = controller setLoadingAnalyze(true); setError(null); store.clearAnalyzeLog(); setAnalyzeProgress({ done: 0, total: 0 }); store.setPhase("reporting") setStreamPhase("idle"); setStreamLog([]); setStreamProgress({ done: 0, total: 0 }) streamStartRef.current = Date.now() @@ -1158,6 +1178,7 @@ export default function TopicWorkflowDashboard() { judge_runs: judgeRuns, judge_max_items_per_query: judgeMaxItems, judge_token_budget: judgeTokenBudget, trend_max_items_per_query: 3, }), + signal: controller.signal, }) if (!res.ok || !res.body) throw new Error(await res.text()) @@ -1357,6 +1378,7 @@ export default function TopicWorkflowDashboard() { } finally { setLoadingAnalyze(false) streamStartRef.current = null + streamAbortRef.current = null } }
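One helper these hunks rely on but do not define is readSSE, the async generator consumed by the for await loops in runDailyPaperStream and runAnalyzeStream. A minimal sketch of such a parser, assuming each SSE frame's data: payload is a JSON object with the { type, data, message } shape the handlers read; the repository's actual implementation may differ:

```typescript
// Sketch only: assumes plain "data: <json>" SSE frames separated by blank
// lines, yielding objects shaped like the events consumed above. Not the
// repository's actual readSSE implementation.
async function* readSSE(
  body: ReadableStream<Uint8Array>,
): AsyncGenerator<{ type: string; data?: unknown; message?: string }> {
  const reader = body.getReader()
  const decoder = new TextDecoder()
  let buffer = ""
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })
    let sep = buffer.indexOf("\n\n")
    while (sep >= 0) {
      const frame = buffer.slice(0, sep)
      buffer = buffer.slice(sep + 2)
      const dataLines = frame
        .split("\n")
        .filter((l) => l.startsWith("data:"))
        .map((l) => l.slice(5).trim())
      if (dataLines.length > 0) {
        try {
          yield JSON.parse(dataLines.join("\n"))
        } catch {
          // ignore malformed frames
        }
      }
      sep = buffer.indexOf("\n\n")
    }
  }
}
```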