Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions data_scraper/core/ci_logs_scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
class CILogsRecord(TypedDict):
"""CILogs data point."""
url: str
topic: str
test_name: str
text: str
components: list[str]
kind: str
Expand All @@ -37,7 +37,7 @@ def get_records(self, documents: list[dict]) -> list[CILogsRecord]:
for document in documents:
ci_logs_records.append({
"url": document["url"],
"topic": document["test_name"],
"test_name": document["test_name"],
"text": document["traceback"],
"components": [],
"kind": "zuul_jobs",
Expand Down
22 changes: 15 additions & 7 deletions data_scraper/core/scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ def record_postprocessing(self, record: dict) -> None:
after it has been created but before storing in vectorDB."""
raise NotImplementedError

def store_records(self, records: list, recreate: bool = True) -> None:
def store_records(self,
records: list,
record_fields_for_key: tuple[str, ...],
recreate: bool = True) -> None:
"""Process text and store embeddings in database."""
vector_size = self.get_embedding_dimension()

Expand All @@ -107,12 +110,17 @@ def store_records(self, records: list, recreate: bool = True) -> None:
raise IOError

for record in tqdm(records, desc="Processing embeddings"):
if record['url']:
record_id = str(uuid.uuid5(uuid.NAMESPACE_URL, record["url"]))
else:
LOG.error("Missing required URL field")
missing_fields = [
field for field in record_fields_for_key
if field not in record or not record[field]
]
if missing_fields:
LOG.error("Missing required fields for key generation: %s", missing_fields)
continue

combined_key = "_".join([record[field] for field in record_fields_for_key])
record_id = str(uuid.uuid5(uuid.NAMESPACE_URL, combined_key))

chunks: list[str] = self.get_chunks(record)

embeddings: list[list[float]] = []
Expand Down Expand Up @@ -148,7 +156,7 @@ def get_records(self, documents: List[Dict]) -> list[dict]:
"""Convert raw data into list of dictionaries."""
raise NotImplementedError

def run(self):
def run(self, record_fields_for_key: tuple[str,...] = ("url",)):
"""Main execution method."""
documents = self.get_documents()
if not documents:
Expand All @@ -159,7 +167,7 @@ def run(self):
records = self.cleanup_records(records)

# Process and store embeddings
self.store_records(records, self.config["recreate_collection"])
self.store_records(records, record_fields_for_key, self.config["recreate_collection"])

# Print final stats
stats = self.db_manager.get_collection_stats(self.config["db_collection_name"])
Expand Down
2 changes: 1 addition & 1 deletion data_scraper/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def ci_logs_scraper() -> None:

# when json is ready, proceed with tracebacks and store them to QdrantDB
scraper = CILogsScraper(config_args)
scraper.run()
scraper.run(("url", "test_name"))


def solutions_scraper() -> None:
Expand Down
4 changes: 2 additions & 2 deletions data_scraper/processors/ci_logs_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@

# Test path constants
TEST_OPERATOR_PATH = "logs/controller-0/ci-framework-data/tests/test_operator"
TEMPEST_TEST_PATTERN = "tempest-tests"
TOBIKO_TEST_PATTERN = "tobiko-tests"
TEMPEST_TEST_PATTERN = "tempest-"
TOBIKO_TEST_PATTERN = "tobiko-"

async def fetch_with_gssapi(url, params=None, timeout=30.0):
"""
Expand Down
Loading