diff --git a/data_scraper/common/constants.py b/data_scraper/common/constants.py
index 6d417a8..977d274 100644
--- a/data_scraper/common/constants.py
+++ b/data_scraper/common/constants.py
@@ -4,6 +4,9 @@
 OSP_DOCS_COLLECTION_NAME = "rca-osp-docs-knowledge-base"
 ERRATA_COLLECTION_NAME = "rca-errata"
 CI_LOGS_COLLECTION_NAME = "rca-ci"
+SOLUTIONS_COLLECTION_NAME = "rca-solutions"
+SOLUTIONS_PRODUCT_NAME = "OpenStack"
+SOLUTIONS_MAX_RESULTS = 9999
 DEFAULT_EMBEDDING_MODEL = "BAAI/bge-m3"
 DEFAULT_JIRA_URL = "https://issues.redhat.com"
 DEFAULT_JIRA_PROJECTS = {
@@ -25,3 +28,4 @@
 DEFAULT_DATE_CUTOFF = "2000-01-01T00:00:00Z"
 DEFAULT_NUM_SCRAPER_PROCESSES=10
 DEFAULT_ERRATA_PUBLIC_URL="https://access.redhat.com/errata"
+DEFAULT_SOLUTIONS_PUBLIC_URL="https://access.redhat.com"
diff --git a/data_scraper/core/solutions_scraper.py b/data_scraper/core/solutions_scraper.py
new file mode 100644
index 0000000..fed0b21
--- /dev/null
+++ b/data_scraper/core/solutions_scraper.py
@@ -0,0 +1,93 @@
+"""Code for scraping Solutions data"""
+import logging
+
+from typing import TypedDict
+from tqdm import tqdm
+
+
+import pandas as pd
+
+from data_scraper.core.scraper import Scraper
+from data_scraper.processors.solutions_provider import SolutionsProvider
+
+
+LOG = logging.getLogger(__name__)
+LOG.setLevel(logging.INFO)
+
+
+class SolutionsRecord(TypedDict):
+    """Represents a record extracted from Solutions"""
+    kb_id: str
+    kind: str
+    topic: str
+    url: str
+    issue: str
+    diagnosticsteps: str
+    text: str
+    components: list[str]
+
+
+class SolutionsScraper(Scraper):
+    """Main class for Solutions scraping and processing."""
+
+    def __init__(self, config: dict):
+        super().__init__(config=config)
+        self.config = config
+        self.kb_provider = SolutionsProvider(
+            self.config["solutions_url"],
+            self.config["solutions_token"]
+        )
+
+    def get_documents(self) -> list[dict]:
+        documents = self.kb_provider.get_solutions(
+            self.config["product_name"],
+            self.config["max_results"])
+        return documents
+
+    def get_records(self, documents: list[dict]) -> list[SolutionsRecord]:
+        """Convert Solutions API responses to SolutionsRecord objects."""
+        solutions_records: list[SolutionsRecord] = []
+        for raw_result in tqdm(documents, desc="Processing solutions"):
+            solutions_records.append(
+                {
+                    "kb_id": raw_result.get('id', ''),
+                    "url": raw_result.get('view_uri', ''),
+                    "topic": raw_result.get('publishedTitle', ''),
+                    "issue": ''.join(raw_result.get('issue', '')),
+                    "diagnosticsteps": ''.join(raw_result.get('solution_diagnosticsteps', 'N/A')),
+                    "text": ''.join(raw_result.get('solution_resolution', 'N/A')),
+                    "components": raw_result.get('component', []),
+                    "kind": "solution",
+                }
+            )
+
+        return solutions_records
+
+    def get_chunks(self, record: dict) -> list[str]:
+        chunks = []
+
+        for kb_field in ["topic", "issue"]:
+            chunks += self.text_processor.split_text(record[kb_field])
+
+        return chunks
+
+    def record_postprocessing(self, record):
+        # Postprocessing is not required for Solutions records
+        pass
+
+    def cleanup_records(
+        self, records: list, backup_path: str = "solutions_all_data.csv"
+    ) -> list:
+        df = pd.DataFrame(records)
+
+        LOG.info("Records stats BEFORE cleanup: %d", df.shape[0])
+
+        df = df.dropna()
+        df = df.drop_duplicates(subset=["text"])
+
+        LOG.info("Records stats AFTER cleanup: %d", df.shape[0])
+
+        LOG.info("Saving backup to: %s", backup_path)
+        df.to_csv(backup_path)
+
+        return [SolutionsRecord(**row) for row in df.to_dict(orient="records")]
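Note on the mapping in get_records above (not part of the patch): the ''.join(...) calls normalize fields that the search API returns as lists of strings, and the 'N/A' string defaults survive the join unchanged. A minimal sketch of the transformation on one hypothetical document, with all field values invented for illustration:

    # hypothetical raw search-API document
    raw = {
        "id": "12345",
        "view_uri": "https://access.redhat.com/solutions/12345",
        "publishedTitle": "Instance stuck in BUILD state",
        "issue": ["Nova instance never becomes ACTIVE"],
        "solution_resolution": ["Restart the nova-compute service."],
        "component": ["openstack-nova"],
    }
    # the same field-by-field mapping get_records applies
    record = {
        "kb_id": raw.get("id", ""),
        "url": raw.get("view_uri", ""),
        "topic": raw.get("publishedTitle", ""),
        "issue": "".join(raw.get("issue", "")),
        "diagnosticsteps": "".join(raw.get("solution_diagnosticsteps", "N/A")),
        "text": "".join(raw.get("solution_resolution", "N/A")),
        "components": raw.get("component", []),
        "kind": "solution",
    }
    assert record["issue"] == "Nova instance never becomes ACTIVE"
    assert record["diagnosticsteps"] == "N/A"  # missing list field falls back to the default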
diff --git a/data_scraper/main.py b/data_scraper/main.py
index 43940d2..37962f5 100644
--- a/data_scraper/main.py
+++ b/data_scraper/main.py
@@ -8,6 +8,7 @@
 from data_scraper.core.errata_scraper import ErrataScraper
 from data_scraper.core.ci_logs_scraper import CILogsScraper
 from data_scraper.processors.ci_logs_provider import TestOperatorReportsProvider
+from data_scraper.core.solutions_scraper import SolutionsScraper
 logging.basicConfig(
     format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
 )
@@ -181,8 +182,6 @@ def errata_scraper() -> None:
     scraper.run()
 
 
-
-
 def ci_logs_scraper() -> None:
     """Entry point for command line execution."""
     parser = ArgumentParser("ci_logs_scraper")
@@ -250,3 +249,50 @@ def ci_logs_scraper() -> None:
     # when json is ready, proceed with tracebacks and store them to QdrantDB
     scraper = CILogsScraper(config_args)
     scraper.run()
+
+
+def solutions_scraper() -> None:
+    """Entry point for command line execution."""
+    parser = ArgumentParser("solutions_scraper")
+
+    # Required arguments
+    parser.add_argument("--database_client_url", type=str, required=True)
+    parser.add_argument("--llm_server_url", type=str, required=True)
+    parser.add_argument("--llm_api_key", type=str, required=True)
+    parser.add_argument("--database_api_key", type=str, required=True)
+    parser.add_argument("--solutions_token", type=str, required=True)
+
+    # Optional arguments
+    parser.add_argument("--solutions_url", type=str,
+                        default=constants.DEFAULT_SOLUTIONS_PUBLIC_URL)
+    parser.add_argument("--max_results", type=int,
+                        default=constants.SOLUTIONS_MAX_RESULTS)
+    parser.add_argument("--chunk_size", type=int,
+                        default=constants.DEFAULT_CHUNK_SIZE)
+    parser.add_argument("--embedding_model", type=str,
+                        default=constants.DEFAULT_EMBEDDING_MODEL)
+    parser.add_argument("--db_collection_name", type=str,
+                        default=constants.SOLUTIONS_COLLECTION_NAME)
+    parser.add_argument("--product_name", type=str,
+                        default=constants.SOLUTIONS_PRODUCT_NAME)
+    parser.add_argument("--recreate_collection", type=bool, default=True,
+                        help="Recreate database collection from scratch.")
+    args = parser.parse_args()
+
+    config_args = {
+        "database_client_url": args.database_client_url,
+        "llm_server_url": args.llm_server_url,
+        "llm_api_key": args.llm_api_key,
+        "database_api_key": args.database_api_key,
+        "chunk_size": args.chunk_size,
+        "embedding_model": args.embedding_model,
+        "db_collection_name": args.db_collection_name,
+        "solutions_url": args.solutions_url,
+        "solutions_token": args.solutions_token,
+        "product_name": args.product_name,
+        "max_results": args.max_results,
+        "recreate_collection": args.recreate_collection,
+    }
+
+    scraper = SolutionsScraper(config_args)
+    scraper.run()
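For reference, a sketch of driving the new entry point programmatically instead of through the console script (not part of the patch). Every quoted value below is a placeholder, and the constants import path is assumed from the file layout above:

    from data_scraper.common import constants
    from data_scraper.core.solutions_scraper import SolutionsScraper

    config_args = {
        "database_client_url": "http://localhost:6333",  # placeholder
        "llm_server_url": "http://localhost:8000",       # placeholder
        "llm_api_key": "changeme",                       # placeholder
        "database_api_key": "changeme",                  # placeholder
        "chunk_size": constants.DEFAULT_CHUNK_SIZE,
        "embedding_model": constants.DEFAULT_EMBEDDING_MODEL,
        "db_collection_name": constants.SOLUTIONS_COLLECTION_NAME,
        "solutions_url": constants.DEFAULT_SOLUTIONS_PUBLIC_URL,
        "solutions_token": "changeme",                   # placeholder bearer token
        "product_name": constants.SOLUTIONS_PRODUCT_NAME,
        "max_results": constants.SOLUTIONS_MAX_RESULTS,
        "recreate_collection": True,
    }

    # same call chain the CLI entry point performs after parsing arguments
    SolutionsScraper(config_args).run()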
diff --git a/data_scraper/processors/solutions_provider.py b/data_scraper/processors/solutions_provider.py
new file mode 100644
index 0000000..bb643a9
--- /dev/null
+++ b/data_scraper/processors/solutions_provider.py
@@ -0,0 +1,72 @@
+"""Client to fetch Solutions."""
+import logging
+import requests
+
+LOG = logging.getLogger(__name__)
+LOG.setLevel(logging.INFO)
+
+
+# pylint: disable=too-few-public-methods
+class SolutionsProvider:
+    """Provider for Solutions"""
+
+    def __init__(self, query_url: str, query_token: str):
+        self.query_url = query_url
+        self.headers = {
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {query_token}",
+        }
+
+    def get_solutions(self, product_name: str,
+                      max_results: int,
+                      start_at: int = 0) -> list[dict]:
+        """Get solutions from the Knowledge Base.
+
+        Queries the Knowledge Base search API and returns the list of
+        solution documents that matched the query.
+
+        Args:
+            product_name: Search for Solutions for the specific product name
+                (e.g., product_name="*OpenStack*")
+            max_results: Maximum number of solutions that should be retrieved
+            start_at: Index of the first record to retrieve (pagination offset).
+        """
+
+        url = f"{self.query_url}/hydra/rest/search/v2/kcs"
+
+        query = (f"fq=(documentKind:Solution AND product: *{product_name}* AND "
+                 "solution_resolution:*)&sort=lastModifiedDate desc")
+
+        payload = {
+            "clientName": "cli",
+            "expression": query,
+            "q": "*",
+            "rows": max_results,
+            "start": start_at
+        }
+
+        LOG.info("Processing Solutions request [product: %s, max_results: %d, "
+                 "start_at: %d]", product_name, max_results, start_at)
+
+        try:
+            response = requests.post(
+                url,
+                json=payload,
+                headers=self.headers,
+                verify=False,
+                timeout=(3.05, 180),
+            )
+        except requests.exceptions.Timeout:
+            LOG.error("Request to Knowledge base %s timed out.", query)
+            return []
+        except requests.exceptions.RequestException as e:
+            LOG.error("Error fetching KB data: %s", e)
+            return []
+        parsed_response = response.json()['response']
+        LOG.info("Found %d Solution records matching the query and retrieved %d "
+                 "of them. [query: %s, max_results: %d, start_at: %d]",
+                 parsed_response["numFound"],
+                 len(parsed_response["docs"]),
+                 query, max_results, start_at)
+
+        return parsed_response["docs"]
diff --git a/pyproject.toml b/pyproject.toml
index f4f5a14..e7271d0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,6 +41,7 @@
 ci_logs_scraper = "data_scraper.main:ci_logs_scraper"
 feedback_exporter = "feedback_exporter.export_feedback:main"
 evaluation = "evaluation.evaluation:main"
 osp_doc_scraper = "data_scraper.main:osp_doc_scraper"
+solutions_scraper = "data_scraper.main:solutions_scraper"
 [tool.setuptools.packages.find]
 include = ["data_scraper*", "feedback_exporter*"]
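Usage sketch for the provider (not part of the patch): the payload's rows/start naming suggests a Solr-style row offset, so paging can be driven by incrementing start_at, though that offset semantics is an assumption here. The URL and token are placeholders and the page size is arbitrary:

    from data_scraper.processors.solutions_provider import SolutionsProvider

    # placeholders: real deployments pass the KB URL and a valid bearer token
    provider = SolutionsProvider("https://access.redhat.com", "changeme")
    page_size = 100
    offset = 0
    while True:
        docs = provider.get_solutions("OpenStack", page_size, start_at=offset)
        if not docs:
            break  # no more results (or the request failed and returned [])
        for doc in docs:
            print(doc.get("publishedTitle", ""), "->", doc.get("view_uri", ""))
        offset += len(docs)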