15 changes: 15 additions & 0 deletions .github/workflows/push_combined_summary_to_es_cron_job.yml
@@ -38,3 +38,18 @@ jobs:

- name: Execute Python script
run: python push_combined_summary_to_es.py

- name: Configure Git
run: |
git config user.email "${{ secrets.GIT_AUTHOR_EMAIL }}"
git config user.name "${{ secrets.GIT_AUTHOR_NAME }}"

- name: Add and commit changes
run: |
git add .
if git diff --staged --quiet; then
echo "No changes to commit"
else
git commit -m "push_combined_summary_to_es: updated logs"
git push
fi
15 changes: 15 additions & 0 deletions .github/workflows/push_summary_to_elasticsearch_cron_job.yml
@@ -38,3 +38,18 @@ jobs:

- name: Execute Python script
run: python push_summary_to_es.py

- name: Configure Git
run: |
git config user.email "${{ secrets.GIT_AUTHOR_EMAIL }}"
git config user.name "${{ secrets.GIT_AUTHOR_NAME }}"

- name: Add and commit changes
run: |
git add .
if git diff --staged --quiet; then
echo "No changes to commit"
else
git commit -m "push_summary_to_es: updated logs"
git push
fi
567 changes: 288 additions & 279 deletions generate_homepage_xml.py

Large diffs are not rendered by default.

388 changes: 198 additions & 190 deletions generate_weekly_newsletter_json.py

Large diffs are not rendered by default.

103 changes: 61 additions & 42 deletions push_combined_summary_to_es.py
@@ -3,66 +3,85 @@
from loguru import logger
import glob
import os

from collections import defaultdict
from src.config import ES_INDEX
from src.elasticsearch_utils import ElasticSearchClient
from src.xml_utils import XMLReader
from src.utils import remove_timestamps_from_author_names
from src.utils import remove_timestamps_from_author_names, summarizer_log_csv

if __name__ == "__main__":

REMOVE_TIMESTAMPS_IN_AUTHORS = True
inserted_count = defaultdict(set)
updated_count = defaultdict(set)
no_changes_count = defaultdict(set)
unique_urls = set()
error_message = None
try:
REMOVE_TIMESTAMPS_IN_AUTHORS = True

xml_reader = XMLReader()
elastic_search = ElasticSearchClient()

xml_reader = XMLReader()
elastic_search = ElasticSearchClient()
total_combined_files = []
static_dirs = [
'bitcoin-dev',
'lightning-dev',
'delvingbitcoin'
]
pattern = "combined*.xml"

total_combined_files = []
static_dirs = [
'bitcoin-dev',
'lightning-dev',
'delvingbitcoin'
]
pattern = "combined*.xml"
for static_dir in static_dirs:
combined_files = glob.glob(f"static/{static_dir}/**/{pattern}")
total_combined_files.extend(combined_files)
logger.info(f"Total combined files: {(len(total_combined_files))}")

for static_dir in static_dirs:
combined_files = glob.glob(f"static/{static_dir}/**/{pattern}")
total_combined_files.extend(combined_files)
logger.info(f"Total combined files: {(len(total_combined_files))}")
total_combined_files_dict = {os.path.splitext(os.path.basename(i))[0]: i for i in total_combined_files}

# get unique combined file paths
total_combined_files_dict = {os.path.splitext(os.path.basename(i))[0]: i for i in total_combined_files}
logger.info(f"Total unique combined files: {len(total_combined_files_dict)}")

logger.info(f"Total unique combined files: {len(total_combined_files_dict)}")
for file_name, full_path in tqdm.tqdm(total_combined_files_dict.items()):
try:

count_new = 0
count_updated = 0
xml_file_data = xml_reader.read_xml_file(full_path)
url = xml_file_data['domain']
unique_urls.add(url)

for file_name, full_path in tqdm.tqdm(total_combined_files_dict.items()):
try:
# get data from xml file
xml_file_data = xml_reader.read_xml_file(full_path)
if REMOVE_TIMESTAMPS_IN_AUTHORS:

if REMOVE_TIMESTAMPS_IN_AUTHORS:
# remove timestamps from author's names and collect unique names only
xml_file_data['authors'] = remove_timestamps_from_author_names(xml_file_data['authors'])
xml_file_data['authors'] = remove_timestamps_from_author_names(xml_file_data['authors'])

res = elastic_search.es_client.update(
index=ES_INDEX,
id=file_name,
body={
body = {
'doc': xml_file_data,
'doc_as_upsert': True
}
)

logger.success(f"Version-{res['_version']}, Result-{res['result']}, ID-{res['_id']}")
if res['result'] == 'created':
count_new += 1
if res['result'] == 'updated':
count_updated += 1
res = elastic_search.upsert_document(index_name=ES_INDEX,
doc_id=file_name,
doc_body=body)

if res['result'] == 'created' or res['result'] == 'updated':
updated_count[url].add(res['_id'])
elif res['result'] == 'noop':
no_changes_count[url].add(res['_id'])

except Exception as ex:
logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}")
logger.warning(full_path)

logger.success(f"Process complete.")

except Exception as ex:
error_occurred = True
error_message = f"Error:{ex}\n{traceback.format_exc()}"
logger.error(f"Error Message: {error_message}")
logger.error(f"Process failed.")

except Exception as ex:
error_message = f"Error occurred: {ex} \n{traceback.format_exc()}"
logger.error(error_message)
finally:
summarizer_log_csv(file_name='push_combined_summary_to_es',
domain=list(unique_urls),
inserted=sum(len(inserted_count[url]) for url in unique_urls),
updated=sum(len(updated_count[url]) for url in unique_urls),
no_changes=sum(len(no_changes_count[url]) for url in unique_urls),
error=error_message)

logger.info(f"Inserted {count_new} new documents, Updated {count_updated} documents")
logger.success("Process Complete.")
96 changes: 58 additions & 38 deletions push_summary_to_es.py
@@ -5,7 +5,7 @@
from src.config import ES_INDEX
from src.elasticsearch_utils import ElasticSearchClient
from src.xml_utils import XMLReader

from src.utils import summarizer_log_csv

if __name__ == "__main__":

@@ -22,47 +22,67 @@
"https://mailing-list.bitcoindevs.xyz/bitcoindev/"
]

for dev_url in dev_urls:
error_occurred = False
error_message = None
updated_ids = set()
try:
for dev_url in dev_urls:
try:
if APPLY_DATE_RANGE:
current_date_str = None
if not current_date_str:
current_date_str = datetime.now().strftime("%Y-%m-%d")
start_date = datetime.now() - timedelta(days=15)
start_date_str = start_date.strftime("%Y-%m-%d")
logger.info(f"start_date: {start_date_str}")
logger.info(f"current_date_str: {current_date_str}")
else:
start_date_str = None
current_date_str = None

if APPLY_DATE_RANGE:
current_date_str = None
if not current_date_str:
current_date_str = datetime.now().strftime("%Y-%m-%d")
start_date = datetime.now() - timedelta(days=15)
start_date_str = start_date.strftime("%Y-%m-%d")
logger.info(f"start_date: {start_date_str}")
logger.info(f"current_date_str: {current_date_str}")
else:
start_date_str = None
current_date_str = None
docs_list = elastic_search.fetch_data_with_empty_summary(ES_INDEX, dev_url, start_date_str,
current_date_str)

docs_list = elastic_search.fetch_data_with_empty_summary(ES_INDEX, dev_url, start_date_str, current_date_str)
if isinstance(dev_url, list):
dev_name = dev_url[0].split("/")[-2]
else:
dev_name = dev_url.split("/")[-2]

if isinstance(dev_url, list):
dev_name = dev_url[0].split("/")[-2]
else:
dev_name = dev_url.split("/")[-2]
logger.info(f"Total threads received with empty summary for '{dev_name}': {len(docs_list)}")

logger.success(f"Total threads received with empty summary for '{dev_name}': {len(docs_list)}")
for doc in tqdm.tqdm(docs_list):
try:
doc_id = doc['_id']
doc_index = doc['_index']
if not doc['_source'].get('summary'):
xml_summary = xml_reader.get_xml_summary(doc, dev_name)
if xml_summary:
res = elastic_search.es_client.update(
index=doc_index,
id=doc_id,
body={
'doc': {
"summary": xml_summary
}
}
)
updated_ids.add(res['_id'])
except Exception as ex:
logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}")
logger.warning(doc)

logger.success(f"Process complete for dev_url: {dev_url}")

for doc in tqdm.tqdm(docs_list):
res = None
try:
doc_id = doc['_id']
doc_index = doc['_index']
if not doc['_source'].get('summary'):
xml_summary = xml_reader.get_xml_summary(doc, dev_name)
if xml_summary:
elastic_search.es_client.update(
index=doc_index,
id=doc_id,
body={
'doc': {
"summary": xml_summary
}
}
)
except Exception as ex:
logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}")
error_occurred = True
error_message = f"Error: {ex}\n{traceback.format_exc()}"

except Exception as ex:
logger.error(f"Error: {str(ex)}\n{traceback.format_exc()}")

logger.success(f"Process complete.")
finally:
summarizer_log_csv(
file_name='push_summary_to_elasticsearch',
domain=dev_urls,
updated=len(updated_ids),
error=error_message)
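`fetch_data_with_empty_summary` selects the documents that still need a summary; its query body is not shown in this diff. A rough sketch of the kind of Elasticsearch query such a helper might issue — the domain clause mirrors `get_domain_query` in `src/elasticsearch_utils.py`, while the date field name (`created_at`) is an assumption:

```python
# Rough sketch of a query a helper like fetch_data_with_empty_summary might
# issue; the date field name ("created_at") is an assumption, the domain
# clause mirrors get_domain_query in src/elasticsearch_utils.py.
def build_empty_summary_query(dev_url, start_date_str=None, current_date_str=None):
    if isinstance(dev_url, list):
        domain_clause = {"terms": {"domain.keyword": dev_url}}
    else:
        domain_clause = {"term": {"domain.keyword": dev_url}}

    must = [domain_clause]
    if start_date_str and current_date_str:
        must.append({"range": {"created_at": {"gte": start_date_str,
                                              "lte": current_date_str}}})

    return {
        "query": {
            "bool": {
                "must": must,
                "must_not": [{"exists": {"field": "summary"}}],
            }
        }
    }
```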
50 changes: 41 additions & 9 deletions src/elasticsearch_utils.py
@@ -1,5 +1,6 @@
import time
from datetime import datetime

from elasticsearch import Elasticsearch, NotFoundError
from elasticsearch.helpers import scan
from loguru import logger
@@ -23,10 +24,41 @@ def __init__(self,
http_auth=(self._es_username, self._es_password),
)

def get_domain_counts(self, index_name, domain):
"""Function to get the total counts for the given 'domain' field from Elasticsearch index."""
domain_query = self.get_domain_query(domain)
body = {
"query": domain_query
}

try:
resp = self.es_client.count(index=index_name, body=body)
return resp['count']
except Exception as e:
logger.error(f"Error fetching domain counts: {e}")
return None

@property
def es_client(self):
return self._es_client

def upsert_document(self, index_name, doc_id, doc_body):

script = {
"source": "ctx._source.putAll(params)",
"params": doc_body
}

request_body = {
"scripted_upsert": True,
"script": script,
"upsert": doc_body
}

# Perform the upsert operation
response = self._es_client.update(index=index_name, id=doc_id, body=request_body)
return response

def get_domain_query(self, url):
if isinstance(url, list):
domain_query = {"terms": {"domain.keyword": url}}
@@ -90,15 +122,15 @@ def fetch_data_based_on_title(self, es_index, title, url):
"must": [
{
"match_phrase":
{
"title.keyword": title
}
{
"title.keyword": title
}
},
{
"term":
{
"domain.keyword": str(url)
}
{
"domain.keyword": str(url)
}
}
]
}
@@ -236,7 +268,7 @@ def filter_top_recent_posts(self, es_results, top_n):
def filter_top_active_posts(self, es_results, top_n):
unique_results = []

thread_dict = {} # maps post titles to their respective activity levels
thread_dict = {} # maps post titles to their respective activity levels
# create dictionary with title as key and thread count as value
for result in es_results:
title = result['_source']['title']
@@ -383,10 +415,10 @@ def es_fetch_contributors_and_threads(self, es_index, title, domain):
"""
Fetches the count of threads and unique contributors for a given post based on title and domain
"""
# The search query
# The search query
domain_query = self.get_domain_query(domain)
query = {
"size": 0, # no search hits are returned, the focus is solely on the aggregations and counts
"size": 0, # no search hits are returned, the focus is solely on the aggregations and counts
"query": {
"bool": {
"must": [
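The push scripts route their writes through the new `upsert_document` helper: with `scripted_upsert: True`, the `ctx._source.putAll(params)` script runs whether or not the document already exists, merging the supplied fields into `_source`. A minimal usage sketch, assuming a reachable cluster configured for `ElasticSearchClient` and purely illustrative document values:

```python
# Minimal usage sketch; the document id and body are illustrative only.
from src.config import ES_INDEX
from src.elasticsearch_utils import ElasticSearchClient

elastic_search = ElasticSearchClient()

doc_body = {
    "title": "example-combined-thread",
    "summary": "Example combined summary text.",
    "domain": "https://example.org/example-list/",
}

# Creates the document if missing, otherwise merges doc_body into _source.
res = elastic_search.upsert_document(index_name=ES_INDEX,
                                     doc_id="example-combined-thread",
                                     doc_body=doc_body)
print(res["result"])  # the calling scripts branch on 'created', 'updated', or 'noop'

# get_domain_counts, also added in this diff, returns the number of documents
# stored for a given domain (or None if the count request fails).
total_docs = elastic_search.get_domain_counts(ES_INDEX, "https://example.org/example-list/")
print(total_docs)
```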