diff --git a/.github/workflows/push_combined_summary_to_es_cron_job.yml b/.github/workflows/push_combined_summary_to_es_cron_job.yml index 67865514a8..ee303ea8f1 100644 --- a/.github/workflows/push_combined_summary_to_es_cron_job.yml +++ b/.github/workflows/push_combined_summary_to_es_cron_job.yml @@ -38,3 +38,18 @@ jobs: - name: Execute Python script run: python push_combined_summary_to_es.py + + - name: Configure Git + run: | + git config user.email "${{ secrets.GIT_AUTHOR_EMAIL }}" + git config user.name "${{ secrets.GIT_AUTHOR_NAME }}" + + - name: Add and commit changes + run: | + git add . + if git diff --staged --quiet; then + echo "No changes to commit" + else + git commit -m "push_combined_summary_to_es: updated logs" + git push + fi diff --git a/.github/workflows/push_summary_to_elasticsearch_cron_job.yml b/.github/workflows/push_summary_to_elasticsearch_cron_job.yml index 285118d93c..c62fb2810f 100644 --- a/.github/workflows/push_summary_to_elasticsearch_cron_job.yml +++ b/.github/workflows/push_summary_to_elasticsearch_cron_job.yml @@ -38,3 +38,18 @@ jobs: - name: Execute Python script run: python push_summary_to_es.py + + - name: Configure Git + run: | + git config user.email "${{ secrets.GIT_AUTHOR_EMAIL }}" + git config user.name "${{ secrets.GIT_AUTHOR_NAME }}" + + - name: Add and commit changes + run: | + git add . + if git diff --staged --quiet; then + echo "No changes to commit" + else + git commit -m "push_summary_to_es: updated logs" + git push + fi diff --git a/generate_homepage_xml.py b/generate_homepage_xml.py index ffea736bc4..7d649c932a 100644 --- a/generate_homepage_xml.py +++ b/generate_homepage_xml.py @@ -1,19 +1,20 @@ +import json +import os import random +import sys import time import traceback +import warnings from datetime import datetime, timedelta + from loguru import logger -import os -import sys -import warnings -import json from tqdm import tqdm from src.config import ES_INDEX from src.elasticsearch_utils import ElasticSearchClient from src.json_utils import GenerateJSON +from src.utils import month_dict, summarizer_log_csv from src.xml_utils import GenerateXML -from src.utils import month_dict warnings.filterwarnings("ignore") @@ -41,121 +42,96 @@ def page_data_handling(data_list: list, get_unique_per_dev=False): if __name__ == "__main__": + error_message = None + try: + gen = GenerateJSON() + xml_gen = GenerateXML() + elastic_search = ElasticSearchClient() + dev_urls = [ + ["https://lists.linuxfoundation.org/pipermail/bitcoin-dev/", + "https://gnusha.org/pi/bitcoindev/", + "https://mailing-list.bitcoindevs.xyz/bitcoindev/"], + "https://lists.linuxfoundation.org/pipermail/lightning-dev/", + "https://delvingbitcoin.org/" + ] + + current_date = datetime.now() + current_date_str = current_date.strftime("%Y-%m-%d") + + start_date = current_date - timedelta(days=7) + start_date_str = start_date.strftime("%Y-%m-%d") + logger.info(f"start_date: {start_date_str}") + logger.info(f"current_date_str: {current_date_str}") + + month_name = month_dict[int(current_date.month)] + str_month_year = f"{month_name}_{int(current_date.year)}" + + json_file_path = fr"static/homepage.json" + + recent_data_list = [] + active_data_list = [] + today_in_history_data_list = [] + history_data_collected_from_yesterday = False + + random_years_ago = None + + for dev_url in dev_urls: + logger.info(f"Working on URL: {dev_url}") + fetch_today_in_history = True + + data_list = elastic_search.extract_data_from_es( + ES_INDEX, dev_url, start_date_str, current_date_str, 
exclude_combined_summary_docs=True + ) - gen = GenerateJSON() - xml_gen = GenerateXML() - elastic_search = ElasticSearchClient() - dev_urls = [ - ["https://lists.linuxfoundation.org/pipermail/bitcoin-dev/", - "https://gnusha.org/pi/bitcoindev/", - "https://mailing-list.bitcoindevs.xyz/bitcoindev/"], - "https://lists.linuxfoundation.org/pipermail/lightning-dev/", - "https://delvingbitcoin.org/" - ] - - current_date = datetime.now() - current_date_str = current_date.strftime("%Y-%m-%d") - - start_date = current_date - timedelta(days=7) - start_date_str = start_date.strftime("%Y-%m-%d") - logger.info(f"start_date: {start_date_str}") - logger.info(f"current_date_str: {current_date_str}") - - month_name = month_dict[int(current_date.month)] - str_month_year = f"{month_name}_{int(current_date.year)}" - - json_file_path = fr"static/homepage.json" - - recent_data_list = [] - active_data_list = [] - today_in_history_data_list = [] - history_data_collected_from_yesterday = False - - random_years_ago = None + if isinstance(dev_url, list): + dev_name = dev_url[0].split("/")[-2] + else: + dev_name = dev_url.split("/")[-2] - for dev_url in dev_urls: - logger.info(f"Working on URL: {dev_url}") - fetch_today_in_history = True + logger.success(f"TOTAL THREADS RECEIVED FOR - '{dev_name}': {len(data_list)}") - data_list = elastic_search.extract_data_from_es( - ES_INDEX, dev_url, start_date_str, current_date_str, exclude_combined_summary_docs=True - ) + seen_titles = set() - if isinstance(dev_url, list): - dev_name = dev_url[0].split("/")[-2] - else: - dev_name = dev_url.split("/")[-2] + # TOP ACTIVE POSTS + active_posts_data = elastic_search.filter_top_active_posts( + es_results=data_list, top_n=10 + ) - logger.success(f"TOTAL THREADS RECEIVED FOR - '{dev_name}': {len(data_list)}") + active_posts_data_counter = 0 + for data in active_posts_data: + if active_posts_data_counter >= 3: + break - seen_titles = set() + title = data['_source']['title'] + if title in seen_titles: + continue + seen_titles.add(title) - # TOP ACTIVE POSTS - active_posts_data = elastic_search.filter_top_active_posts( - es_results=data_list, top_n=10 - ) + # get the first post's info of this title + original_post = elastic_search.get_earliest_posts_by_title( + es_index=ES_INDEX, url=dev_url, title=title + ) - active_posts_data_counter = 0 - for data in active_posts_data: - if active_posts_data_counter >= 3: - break + counts, contributors = elastic_search.es_fetch_contributors_and_threads( + es_index=ES_INDEX, title=title, domain=dev_url + ) - title = data['_source']['title'] - if title in seen_titles: - continue - seen_titles.add(title) + # if you want to show the first post of each selected title, + # then do the below operations on - 'original_post', else on 'data' + for author in original_post['_source']['authors']: + contributors.remove(author) + original_post['_source']['n_threads'] = counts + original_post['_source']['contributors'] = contributors + original_post['_source']['dev_name'] = dev_name + active_data_list.append(original_post) + active_posts_data_counter += 1 - # get the first post's info of this title - original_post = elastic_search.get_earliest_posts_by_title( - es_index=ES_INDEX, url=dev_url, title=title - ) + logger.success(f"Number of active posts collected: {len(active_data_list)}, for URL: {dev_url}") - counts, contributors = elastic_search.es_fetch_contributors_and_threads( - es_index=ES_INDEX, title=title, domain=dev_url - ) + # TOP RECENT POSTS + recent_data_post_counter = 0 + recent_posts_data = 
elastic_search.filter_top_recent_posts(es_results=data_list, top_n=20) - # if you want to show the first post of each selected title, - # then do the below operations on - 'original_post', else on 'data' - for author in original_post['_source']['authors']: - contributors.remove(author) - original_post['_source']['n_threads'] = counts - original_post['_source']['contributors'] = contributors - original_post['_source']['dev_name'] = dev_name - active_data_list.append(original_post) - active_posts_data_counter += 1 - - logger.success(f"Number of active posts collected: {len(active_data_list)}, for URL: {dev_url}") - - # TOP RECENT POSTS - recent_data_post_counter = 0 - recent_posts_data = elastic_search.filter_top_recent_posts(es_results=data_list, top_n=20) - - for data in recent_posts_data: - # if preprocess body text not longer than token_threshold, skip that post - if not gen.is_body_text_long(data=data, sent_threshold=2): - logger.info(f"skipping: {data['_source']['title']} - {data['_source']['url']}") - continue - - title = data['_source']['title'] - if title in seen_titles: - continue - seen_titles.add(title) - if recent_data_post_counter >= 3: - break - - counts, contributors = elastic_search.es_fetch_contributors_and_threads( - es_index=ES_INDEX, title=title, domain=dev_url - ) - # exclude the post authors - for author in data['_source']['authors']: - contributors.remove(author) - data['_source']['n_threads'] = counts - data['_source']['contributors'] = contributors - data['_source']['dev_name'] = dev_name - recent_data_list.append(data) - recent_data_post_counter += 1 - - if not recent_data_list: for data in recent_posts_data: # if preprocess body text not longer than token_threshold, skip that post if not gen.is_body_text_long(data=data, sent_threshold=2): @@ -163,8 +139,12 @@ def page_data_handling(data_list: list, get_unique_per_dev=False): continue title = data['_source']['title'] + if title in seen_titles: + continue + seen_titles.add(title) if recent_data_post_counter >= 3: break + counts, contributors = elastic_search.es_fetch_contributors_and_threads( es_index=ES_INDEX, title=title, domain=dev_url ) @@ -177,198 +157,227 @@ def page_data_handling(data_list: list, get_unique_per_dev=False): recent_data_list.append(data) recent_data_post_counter += 1 - logger.success(f"Number of recent posts collected: {len(recent_data_list)}, for URL: {dev_url}") - - # TODAY IN HISTORY POSTS - logger.info(f"fetching 'Today in history' posts... 
") - - if not random_years_ago: - at_least_years_ago = 3 - at_max_years_ago = current_date.year - 2015 - random_years_ago = random.randint(at_least_years_ago, at_max_years_ago) - logger.info(f"random years ago between {at_least_years_ago} to {at_max_years_ago}: {random_years_ago}") - - if dev_url == "https://delvingbitcoin.org/": - random_years_ago = random.randint(1, current_date.year - 2022) - logger.info( - f"for delving-bitcoin - random years ago between {1} to {current_date.year - 2022}: {random_years_ago}") + if not recent_data_list: + for data in recent_posts_data: + # if preprocess body text not longer than token_threshold, skip that post + if not gen.is_body_text_long(data=data, sent_threshold=2): + logger.info(f"skipping: {data['_source']['title']} - {data['_source']['url']}") + continue - default_days_to_look_back = 6 - loop_counter = 1 - - while fetch_today_in_history: - days_to_look_back = default_days_to_look_back * loop_counter - selected_random_date = current_date - timedelta(days=365 * random_years_ago) + title = data['_source']['title'] + if recent_data_post_counter >= 3: + break + counts, contributors = elastic_search.es_fetch_contributors_and_threads( + es_index=ES_INDEX, title=title, domain=dev_url + ) + # exclude the post authors + for author in data['_source']['authors']: + contributors.remove(author) + data['_source']['n_threads'] = counts + data['_source']['contributors'] = contributors + data['_source']['dev_name'] = dev_name + recent_data_list.append(data) + recent_data_post_counter += 1 + + logger.success(f"Number of recent posts collected: {len(recent_data_list)}, for URL: {dev_url}") + + # TODAY IN HISTORY POSTS + logger.info(f"fetching 'Today in history' posts... ") + + if not random_years_ago: + at_least_years_ago = 3 + at_max_years_ago = current_date.year - 2015 + random_years_ago = random.randint(at_least_years_ago, at_max_years_ago) + logger.info(f"random years ago between {at_least_years_ago} to {at_max_years_ago}: {random_years_ago}") + + if dev_url == "https://delvingbitcoin.org/": + random_years_ago = random.randint(1, current_date.year - 2022) + logger.info( + f"for delving-bitcoin - random years ago between {1} to {current_date.year - 2022}: {random_years_ago}") - start_of_time = selected_random_date - timedelta(days=selected_random_date.weekday()) - end_of_time = start_of_time + timedelta(days=days_to_look_back) + default_days_to_look_back = 6 + loop_counter = 1 - start_of_time_str = start_of_time.strftime("%Y-%m-%dT%H:%M:%S") - end_of_time_str = end_of_time.strftime("%Y-%m-%dT%H:%M:%S") + while fetch_today_in_history: + days_to_look_back = default_days_to_look_back * loop_counter + selected_random_date = current_date - timedelta(days=365 * random_years_ago) - logger.info( - f"collecting the data from {days_to_look_back} days range ... 
|| Start of week: {start_of_time} | " - f"End of week: {end_of_time}") + start_of_time = selected_random_date - timedelta(days=selected_random_date.weekday()) + end_of_time = start_of_time + timedelta(days=days_to_look_back) - selected_threads = elastic_search.fetch_data_in_date_range( - es_index=ES_INDEX, - start_date=start_of_time_str, - end_date=end_of_time_str, - domain=dev_url - ) + start_of_time_str = start_of_time.strftime("%Y-%m-%dT%H:%M:%S") + end_of_time_str = end_of_time.strftime("%Y-%m-%dT%H:%M:%S") - if len(selected_threads) > 0: - for doc in selected_threads: - doc_title = doc['_source']['title'] - doc_created_at = doc['_source']['created_at'] - - if doc['_source']['type'] == 'original_post': - - counts, contributors = elastic_search.es_fetch_contributors_and_threads( - es_index=ES_INDEX, title=doc_title, domain=dev_url - ) - - if counts < 5: - logger.info(f"No. of replies are less than 5, skipping it... ") - continue - - if contributors: - # exclude the post authors - for author in doc['_source']['authors']: - contributors.remove(author) - doc['_source']['n_threads'] = counts - doc['_source']['contributors'] = contributors - doc['_source']['dev_name'] = dev_name - today_in_history_data_list.append(doc) - logger.info(f"collected doc created on: {doc_created_at} || TITLE: {doc_title}") - fetch_today_in_history = False - break - loop_counter += 1 - - # add history data from yesterday's homepage.json - if not today_in_history_data_list: - logger.info("Collecting yesterday's history threads!") - current_directory = os.getcwd() - full_path = os.path.join(current_directory, json_file_path) - if os.path.exists(full_path): - with open(full_path, 'r') as j: - try: - data = json.load(j) - except Exception as e: - logger.info(f"Error reading json file:{full_path} :: {e}") - data = {} - today_in_history_data_list.extend(data.get('today_in_history_posts', [])) - history_data_collected_from_yesterday = True - - logger.success(f"No. of 'Today in history' posts collected: {len(today_in_history_data_list)}") - - current_directory = os.getcwd() - full_path = os.path.join(current_directory, json_file_path) - if os.path.exists(full_path): - with open(full_path, 'r') as j: - try: - yesterday_data = json.load(j) - except Exception as e: - logger.info(f"Error reading json file:{full_path} :: {e}") - yesterday_data = {} - - xml_ids_title = gen.get_existing_json_title(file_path=json_file_path) - recent_post_ids = [data['_source']['title'] for data in recent_data_list] - active_post_ids = [data['_source']['title'] for data in active_data_list] - all_post_titles = set(recent_post_ids + active_post_ids) - - if all_post_titles != set(xml_ids_title): - logger.info("changes found in recent posts ... ") - - delay = 5 - count = 0 - - while True: - try: logger.info( - f"active posts: {len(active_data_list)}, " - f"recent posts: {len(recent_data_list)}, " - f"today in history posts: {len(today_in_history_data_list)}" + f"collecting the data from {days_to_look_back} days range ... || Start of week: {start_of_time} | " + f"End of week: {end_of_time}") + + selected_threads = elastic_search.fetch_data_in_date_range( + es_index=ES_INDEX, + start_date=start_of_time_str, + end_date=end_of_time_str, + domain=dev_url ) - logger.info("Creating homepage.json file ... 
") - recent_post_summ = "" - if len(active_data_list) > 0 or len(recent_data_list) > 0: + if len(selected_threads) > 0: + for doc in selected_threads: + doc_title = doc['_source']['title'] + doc_created_at = doc['_source']['created_at'] + + if doc['_source']['type'] == 'original_post': + + counts, contributors = elastic_search.es_fetch_contributors_and_threads( + es_index=ES_INDEX, title=doc_title, domain=dev_url + ) + + if counts < 5: + logger.info(f"No. of replies are less than 5, skipping it... ") + continue + + if contributors: + # exclude the post authors + for author in doc['_source']['authors']: + contributors.remove(author) + doc['_source']['n_threads'] = counts + doc['_source']['contributors'] = contributors + doc['_source']['dev_name'] = dev_name + today_in_history_data_list.append(doc) + logger.info(f"collected doc created on: {doc_created_at} || TITLE: {doc_title}") + fetch_today_in_history = False + break + loop_counter += 1 + + # add history data from yesterday's homepage.json + if not today_in_history_data_list: + logger.info("Collecting yesterday's history threads!") + current_directory = os.getcwd() + full_path = os.path.join(current_directory, json_file_path) + if os.path.exists(full_path): + with open(full_path, 'r') as j: + try: + data = json.load(j) + except Exception as e: + logger.info(f"Error reading json file:{full_path} :: {e}") + data = {} + today_in_history_data_list.extend(data.get('today_in_history_posts', [])) + history_data_collected_from_yesterday = True + + logger.success(f"No. of 'Today in history' posts collected: {len(today_in_history_data_list)}") + + current_directory = os.getcwd() + full_path = os.path.join(current_directory, json_file_path) + if os.path.exists(full_path): + with open(full_path, 'r') as j: + try: + yesterday_data = json.load(j) + except Exception as e: + logger.info(f"Error reading json file:{full_path} :: {e}") + yesterday_data = {} + + xml_ids_title = gen.get_existing_json_title(file_path=json_file_path) + recent_post_ids = [data['_source']['title'] for data in recent_data_list] + active_post_ids = [data['_source']['title'] for data in active_data_list] + all_post_titles = set(recent_post_ids + active_post_ids) + + if all_post_titles != set(xml_ids_title): + logger.info("changes found in recent posts ... ") + + delay = 5 + count = 0 + + while True: + try: + logger.info( + f"active posts: {len(active_data_list)}, " + f"recent posts: {len(recent_data_list)}, " + f"today in history posts: {len(today_in_history_data_list)}" + ) + logger.info("Creating homepage.json file ... ") + + recent_post_summ = "" + if len(active_data_list) > 0 or len(recent_data_list) > 0: + + # header summary + if len(recent_data_list) > 0: + recent_post_summ = gen.generate_recent_posts_summary(recent_data_list) + else: + recent_post_summ = gen.generate_recent_posts_summary(active_data_list) + logger.success(recent_post_summ) + + # recent data + recent_page_data = page_data_handling(recent_data_list) + + # active data + active_page_data = page_data_handling(active_data_list) - # header summary - if len(recent_data_list) > 0: - recent_post_summ = gen.generate_recent_posts_summary(recent_data_list) else: - recent_post_summ = gen.generate_recent_posts_summary(active_data_list) - logger.success(recent_post_summ) - - # recent data - recent_page_data = page_data_handling(recent_data_list) + logger.error(f"'Active' and 'Recent' data list empty! 
Please check the data again.") + recent_page_data, active_page_data = [], [] - # active data - active_page_data = page_data_handling(active_data_list) + # today in history + if history_data_collected_from_yesterday: + logger.info("No change in 'Today in History' data posts, gathering data from yesterday's post.") + today_in_history_data = yesterday_data.get('today_in_history_posts', []) + else: + if len(today_in_history_data_list) > 0: + today_in_history_data = page_data_handling(today_in_history_data_list, get_unique_per_dev=True) + else: + logger.error(f"'Today in history' data list empty! Please check the data again.") + today_in_history_data = [] + + json_string = { + "header_summary": recent_post_summ, + "recent_posts": recent_page_data, + "active_posts": active_page_data, + "today_in_history_posts": today_in_history_data + } + gen.write_json_file(json_string, json_file_path) + + archive_json_file_path = fr"static/homepage/{str_month_year}/{current_date_str}-homepage.json" + gen.store_file_in_archive(json_file_path, archive_json_file_path) + break - else: - logger.error(f"'Active' and 'Recent' data list empty! Please check the data again.") - recent_page_data, active_page_data = [], [] + except Exception as ex: + logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}") + time.sleep(delay) + count += 1 + if count > 1: + sys.exit(f"{ex}") + else: + logger.info("No change in 'Recent' or 'Active' posts.") + rewrite_json_file = False - # today in history - if history_data_collected_from_yesterday: - logger.info("No change in 'Today in History' data posts, gathering data from yesterday's post.") - today_in_history_data = yesterday_data.get('today_in_history_posts', []) + # update today in history and save file if no change in Recent or Active posts + if history_data_collected_from_yesterday: + logger.info("No change in 'Today in History' data posts, gathering data from yesterday's post.") + today_in_history_data = yesterday_data.get('today_in_history_posts', []) + else: + rewrite_json_file = True + if len(today_in_history_data_list) > 0: + today_in_history_data = page_data_handling(today_in_history_data_list, get_unique_per_dev=True) else: - if len(today_in_history_data_list) > 0: - today_in_history_data = page_data_handling(today_in_history_data_list, get_unique_per_dev=True) - else: - logger.error(f"'Today in history' data list empty! Please check the data again.") - today_in_history_data = [] + logger.error(f"'Today in history' data list empty! 
Please check the data again.") + today_in_history_data = [] + if rewrite_json_file: + logger.info(f'Rewriting the homepage.json file') json_string = { - "header_summary": recent_post_summ, - "recent_posts": recent_page_data, - "active_posts": active_page_data, + "header_summary": yesterday_data.get('header_summary', []), + "recent_posts": yesterday_data.get('recent_posts', []), + "active_posts": yesterday_data.get('recent_posts', []), "today_in_history_posts": today_in_history_data } gen.write_json_file(json_string, json_file_path) + else: + logger.info("No need to rewrite homepage.json file") + if os.path.exists(full_path): archive_json_file_path = fr"static/homepage/{str_month_year}/{current_date_str}-homepage.json" gen.store_file_in_archive(json_file_path, archive_json_file_path) - break - - except Exception as ex: - logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}") - time.sleep(delay) - count += 1 - if count > 1: - sys.exit(f"{ex}") - else: - logger.info("No change in 'Recent' or 'Active' posts.") - rewrite_json_file = False - - # update today in history and save file if no change in Recent or Active posts - if history_data_collected_from_yesterday: - logger.info("No change in 'Today in History' data posts, gathering data from yesterday's post.") - today_in_history_data = yesterday_data.get('today_in_history_posts', []) - else: - rewrite_json_file = True - if len(today_in_history_data_list) > 0: - today_in_history_data = page_data_handling(today_in_history_data_list, get_unique_per_dev=True) - else: - logger.error(f"'Today in history' data list empty! Please check the data again.") - today_in_history_data = [] - - if rewrite_json_file: - logger.info(f'Rewriting the homepage.json file') - json_string = { - "header_summary": yesterday_data.get('header_summary', []), - "recent_posts": yesterday_data.get('recent_posts', []), - "active_posts": yesterday_data.get('recent_posts', []), - "today_in_history_posts": today_in_history_data - } - gen.write_json_file(json_string, json_file_path) - else: - logger.info("No need to rewrite homepage.json file") + except Exception as ex: + error_message = f"Error: {ex}\n{traceback.format_exc()}" + logger.error("Process Failed :(") - if os.path.exists(full_path): - archive_json_file_path = fr"static/homepage/{str_month_year}/{current_date_str}-homepage.json" - gen.store_file_in_archive(json_file_path, archive_json_file_path) + finally: + summarizer_log_csv(file_name="generate_homepage_xml", + error=error_message) diff --git a/generate_weekly_newsletter_json.py b/generate_weekly_newsletter_json.py index 33db667025..dd8a5ecab2 100644 --- a/generate_weekly_newsletter_json.py +++ b/generate_weekly_newsletter_json.py @@ -10,209 +10,217 @@ from src.config import ES_INDEX from src.elasticsearch_utils import ElasticSearchClient from src.json_utils import GenerateJSON +from src.utils import month_dict, summarizer_log_csv from src.xml_utils import GenerateXML -from src.utils import month_dict if __name__ == "__main__": - - gen = GenerateJSON() - xml_gen = GenerateXML() - elastic_search = ElasticSearchClient() - dev_urls = [ - ["https://lists.linuxfoundation.org/pipermail/bitcoin-dev/", - "https://gnusha.org/pi/bitcoindev/", - "https://mailing-list.bitcoindevs.xyz/bitcoindev/"], - "https://lists.linuxfoundation.org/pipermail/lightning-dev/", - "https://delvingbitcoin.org/" - ] - - current_date = datetime.now() - current_date_str = current_date.strftime("%Y-%m-%d") - - start_date = current_date - timedelta(days=7) - start_date_str = 
start_date.strftime("%Y-%m-%d") - - end_date = current_date - timedelta(days=1) - end_date_str = end_date.strftime("%Y-%m-%d") - - logger.info(f"Newsletter publish date: {current_date_str}") - logger.info(f"Gathering data for newsletter from {start_date_str} to {end_date_str}") - - month_name = month_dict[int(current_date.month)] - str_month_year = f"{month_name}_{int(current_date.year)}" - - active_data_list = [] - new_threads_list = [] - - for dev_url in dev_urls: - - data_list = elastic_search.extract_data_from_es( - ES_INDEX, dev_url, start_date_str, end_date_str, exclude_combined_summary_docs=True - ) - - if isinstance(dev_url, list): - dev_name = dev_url[0].split("/")[-2] + error_message = None + try: + gen = GenerateJSON() + xml_gen = GenerateXML() + elastic_search = ElasticSearchClient() + dev_urls = [ + ["https://lists.linuxfoundation.org/pipermail/bitcoin-dev/", + "https://gnusha.org/pi/bitcoindev/", + "https://mailing-list.bitcoindevs.xyz/bitcoindev/"], + "https://lists.linuxfoundation.org/pipermail/lightning-dev/", + "https://delvingbitcoin.org/" + ] + + current_date = datetime.now() + current_date_str = current_date.strftime("%Y-%m-%d") + + start_date = current_date - timedelta(days=7) + start_date_str = start_date.strftime("%Y-%m-%d") + + end_date = current_date - timedelta(days=1) + end_date_str = end_date.strftime("%Y-%m-%d") + + logger.info(f"Newsletter publish date: {current_date_str}") + logger.info(f"Gathering data for newsletter from {start_date_str} to {end_date_str}") + + month_name = month_dict[int(current_date.month)] + str_month_year = f"{month_name}_{int(current_date.year)}" + + active_data_list = [] + new_threads_list = [] + + for dev_url in dev_urls: + + data_list = elastic_search.extract_data_from_es( + ES_INDEX, dev_url, start_date_str, end_date_str, exclude_combined_summary_docs=True + ) + + if isinstance(dev_url, list): + dev_name = dev_url[0].split("/")[-2] + else: + dev_name = dev_url.split("/")[-2] + + logger.success(f"TOTAL THREADS RECEIVED FOR '{dev_name}': {len(data_list)}") + + # NEW THREADS POSTS + seen_titles = set() + for i in data_list: + this_title = i['_source']['title'] + if this_title in seen_titles: + continue + seen_titles.add(this_title) + + # check if the first post for this title is in the past week + original_post = elastic_search.get_earliest_posts_by_title(es_index=ES_INDEX, url=dev_url, title=this_title) + + if original_post['_source'] and i['_source']['created_at'] == original_post['_source']['created_at']: + logger.success(f"new thread created on: {original_post['_source']['created_at']} || TITLE: {this_title}") + + counts, contributors = elastic_search.es_fetch_contributors_and_threads( + es_index=ES_INDEX, title=this_title, domain=dev_url + ) + + for author in i['_source']['authors']: + contributors.remove(author) + i['_source']['n_threads'] = counts + i['_source']['contributors'] = contributors + i['_source']['dev_name'] = dev_name + new_threads_list.append(i) + logger.info(f"number of new threads started this week: {len(new_threads_list)}") + + # TOP ACTIVE POSTS + active_posts_data = elastic_search.filter_top_active_posts(es_results=data_list, top_n=15) + logger.info(f"number of filtered top active post: {len(active_posts_data)}") + + new_threads_titles_list = [i['_source']['title'] for i in new_threads_list] + + seen_titles = set() + # active_posts_data_counter = 0 + for data in active_posts_data: + # if active_posts_data_counter >= 3: + # break + + title = data['_source']['title'] + if (title in seen_titles) or (title in 
new_threads_titles_list): + continue + data['_source']['dev_name'] = dev_name + seen_titles.add(title) + active_data_list.append(data) + # active_posts_data_counter += 1 + logger.info(f"number of active posts collected: {len(active_data_list)}") + + # gather titles of docs from json file + json_file_path = fr"static/newsletters/newsletter.json" + + current_directory = os.getcwd() + json_full_path = os.path.join(current_directory, json_file_path) + json_xml_ids = set() + if os.path.exists(json_full_path): + with open(json_full_path, 'r') as j: + try: + json_data = json.load(j) + except Exception as e: + logger.info(f"Error reading json file:{json_full_path} :: {e}") + json_data = {} + + json_xml_ids = set( + [item['title'] for item in json_data.get('new_threads_this_week', [])] + + [item['title'] for item in json_data.get('active_posts_this_week', [])] + ) else: - dev_name = dev_url.split("/")[-2] - - logger.success(f"TOTAL THREADS RECEIVED FOR '{dev_name}': {len(data_list)}") - - # NEW THREADS POSTS - seen_titles = set() - for i in data_list: - this_title = i['_source']['title'] - if this_title in seen_titles: - continue - seen_titles.add(this_title) - - # check if the first post for this title is in the past week - original_post = elastic_search.get_earliest_posts_by_title(es_index=ES_INDEX, url=dev_url, title=this_title) - - if original_post['_source'] and i['_source']['created_at'] == original_post['_source']['created_at']: - logger.success(f"new thread created on: {original_post['_source']['created_at']} || TITLE: {this_title}") - - counts, contributors = elastic_search.es_fetch_contributors_and_threads( - es_index=ES_INDEX, title=this_title, domain=dev_url - ) - - for author in i['_source']['authors']: - contributors.remove(author) - i['_source']['n_threads'] = counts - i['_source']['contributors'] = contributors - i['_source']['dev_name'] = dev_name - new_threads_list.append(i) - logger.info(f"number of new threads started this week: {len(new_threads_list)}") - - # TOP ACTIVE POSTS - active_posts_data = elastic_search.filter_top_active_posts(es_results=data_list, top_n=15) - logger.info(f"number of filtered top active post: {len(active_posts_data)}") - - new_threads_titles_list = [i['_source']['title'] for i in new_threads_list] - - seen_titles = set() - # active_posts_data_counter = 0 - for data in active_posts_data: - # if active_posts_data_counter >= 3: - # break - - title = data['_source']['title'] - if (title in seen_titles) or (title in new_threads_titles_list): - continue - data['_source']['dev_name'] = dev_name - seen_titles.add(title) - active_data_list.append(data) - # active_posts_data_counter += 1 - logger.info(f"number of active posts collected: {len(active_data_list)}") - - # gather titles of docs from json file - json_file_path = fr"static/newsletters/newsletter.json" - - current_directory = os.getcwd() - json_full_path = os.path.join(current_directory, json_file_path) - json_xml_ids = set() - if os.path.exists(json_full_path): - with open(json_full_path, 'r') as j: - try: - json_data = json.load(j) - except Exception as e: - logger.info(f"Error reading json file:{json_full_path} :: {e}") - json_data = {} - - json_xml_ids = set( - [item['title'] for item in json_data.get('new_threads_this_week', [])] + - [item['title'] for item in json_data.get('active_posts_this_week', [])] - ) - else: - logger.warning(f"No existing newsletter.json file found: {json_full_path}") + logger.warning(f"No existing newsletter.json file found: {json_full_path}") - # gather ids of docs from 
active posts and new thread posts - filtered_docs_ids = set( - [data['_source']['title'] for data in active_data_list] + - [data['_source']['title'] for data in new_threads_list] - ) - - # check if there are any updates in the xml file - if filtered_docs_ids != json_xml_ids: - logger.info("changes found in recent posts ... ") - - delay = 5 - count = 0 - - while True: - try: - logger.success(f"Total no. of active posts collected: {len(active_data_list)}") - logger.success(f"Total no. of new threads started this week: {len(new_threads_list)}") - - logger.info("creating newsletter.json file ... ") - if len(active_data_list) > 0 or len(new_threads_list) > 0: - - new_threads_page_data = [] - active_page_data = [] - new_threads_summary = "" - - if new_threads_list: - new_threads_summary += gen.generate_recent_posts_summary(new_threads_list, verbose=True) - logger.success(new_threads_summary) + # gather ids of docs from active posts and new thread posts + filtered_docs_ids = set( + [data['_source']['title'] for data in active_data_list] + + [data['_source']['title'] for data in new_threads_list] + ) - for data in tqdm(new_threads_list): + # check if there are any updates in the xml file + if filtered_docs_ids != json_xml_ids: + logger.info("changes found in recent posts ... ") + + delay = 5 + count = 0 + + while True: + try: + logger.success(f"Total no. of active posts collected: {len(active_data_list)}") + logger.success(f"Total no. of new threads started this week: {len(new_threads_list)}") + + logger.info("creating newsletter.json file ... ") + if len(active_data_list) > 0 or len(new_threads_list) > 0: + + new_threads_page_data = [] + active_page_data = [] + new_threads_summary = "" + + if new_threads_list: + new_threads_summary += gen.generate_recent_posts_summary(new_threads_list, verbose=True) + logger.success(new_threads_summary) + + for data in tqdm(new_threads_list): + try: + # check and generate any missing file + xml_gen.start(dict_data=[data], url=data['_source']['domain']) + + entry_data = gen.create_single_entry( + data, + base_url_for_xml="https://tldr.bitcoinsearch.xyz/summary", + look_for_combined_summary=True, + remove_xml_extension=True + ) + new_threads_page_data.append(entry_data) + except Exception as ex: + logger.error( + f"Error occurred for doc id: {data['_source']['id']}\n{ex} \n{traceback.format_exc()}") + else: + logger.warning(f"No new threads started this week, generating summary of active posts this " + f"week ...") + # if no new threads started this week, generate summary from active post this week + new_threads_summary += gen.generate_recent_posts_summary(active_data_list) + logger.success(new_threads_summary) + + for data in tqdm(active_data_list): try: # check and generate any missing file xml_gen.start(dict_data=[data], url=data['_source']['domain']) entry_data = gen.create_single_entry( - data, - base_url_for_xml="https://tldr.bitcoinsearch.xyz/summary", - look_for_combined_summary=True, - remove_xml_extension=True + data, base_url_for_xml="https://tldr.bitcoinsearch.xyz/summary", + look_for_combined_summary=True, remove_xml_extension=True ) - new_threads_page_data.append(entry_data) + active_page_data.append(entry_data) except Exception as ex: logger.error( f"Error occurred for doc id: {data['_source']['id']}\n{ex} \n{traceback.format_exc()}") + + json_string = { + "summary_of_threads_started_this_week": new_threads_summary, + "new_threads_this_week": new_threads_page_data, + "active_posts_this_week": active_page_data + } + gen.write_json_file(json_string, 
json_file_path) + + archive_json_file_path = fr"static/newsletters/{str_month_year}/{current_date_str}-newsletter.json" + gen.store_file_in_archive(json_file_path, archive_json_file_path) + else: - logger.warning(f"No new threads started this week, generating summary of active posts this " - f"week ...") - # if no new threads started this week, generate summary from active post this week - new_threads_summary += gen.generate_recent_posts_summary(active_data_list) - logger.success(new_threads_summary) - - for data in tqdm(active_data_list): - try: - # check and generate any missing file - xml_gen.start(dict_data=[data], url=data['_source']['domain']) - - entry_data = gen.create_single_entry( - data, base_url_for_xml="https://tldr.bitcoinsearch.xyz/summary", - look_for_combined_summary=True, remove_xml_extension=True - ) - active_page_data.append(entry_data) - except Exception as ex: - logger.error( - f"Error occurred for doc id: {data['_source']['id']}\n{ex} \n{traceback.format_exc()}") - - json_string = { - "summary_of_threads_started_this_week": new_threads_summary, - "new_threads_this_week": new_threads_page_data, - "active_posts_this_week": active_page_data - } - gen.write_json_file(json_string, json_file_path) - - archive_json_file_path = fr"static/newsletters/{str_month_year}/{current_date_str}-newsletter.json" - gen.store_file_in_archive(json_file_path, archive_json_file_path) - - else: - logger.error(f"Data list empty! Please check the data again.") - - break - except Exception as ex: - logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}") - time.sleep(delay) - count += 1 - if count > 1: - sys.exit(f"{ex}") - else: - logger.success("No change in the posts, no need to update newsletter.json file") - # save the previous one with updated name in archive - if os.path.exists(json_full_path): - archive_json_file_path = fr"static/newsletters/{str_month_year}/{current_date_str}-newsletter.json" - gen.store_file_in_archive(json_file_path, archive_json_file_path) + logger.error(f"Data list empty! 
Please check the data again.") + + break + except Exception as ex: + logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}") + time.sleep(delay) + count += 1 + if count > 1: + sys.exit(f"{ex}") + else: + logger.success("No change in the posts, no need to update newsletter.json file") + # save the previous one with updated name in archive + if os.path.exists(json_full_path): + archive_json_file_path = fr"static/newsletters/{str_month_year}/{current_date_str}-newsletter.json" + gen.store_file_in_archive(json_file_path, archive_json_file_path) + except Exception as ex: + error_message = f"Error: {ex}\n{traceback.format_exc()}" + logger.error("Process Failed :(") + + finally: + summarizer_log_csv(file_name="generate_weekly_newsletter_json", + error=error_message) diff --git a/push_combined_summary_to_es.py b/push_combined_summary_to_es.py index e23ff34769..d7c5ef0578 100644 --- a/push_combined_summary_to_es.py +++ b/push_combined_summary_to_es.py @@ -3,66 +3,85 @@ from loguru import logger import glob import os - +from collections import defaultdict from src.config import ES_INDEX from src.elasticsearch_utils import ElasticSearchClient from src.xml_utils import XMLReader -from src.utils import remove_timestamps_from_author_names +from src.utils import remove_timestamps_from_author_names, summarizer_log_csv if __name__ == "__main__": - REMOVE_TIMESTAMPS_IN_AUTHORS = True + inserted_count = defaultdict(set) + updated_count = defaultdict(set) + no_changes_count = defaultdict(set) + unique_urls = set() + error_message = None + try: + REMOVE_TIMESTAMPS_IN_AUTHORS = True + + xml_reader = XMLReader() + elastic_search = ElasticSearchClient() - xml_reader = XMLReader() - elastic_search = ElasticSearchClient() + total_combined_files = [] + static_dirs = [ + 'bitcoin-dev', + 'lightning-dev', + 'delvingbitcoin' + ] + pattern = "combined*.xml" - total_combined_files = [] - static_dirs = [ - 'bitcoin-dev', - 'lightning-dev', - 'delvingbitcoin' - ] - pattern = "combined*.xml" + for static_dir in static_dirs: + combined_files = glob.glob(f"static/{static_dir}/**/{pattern}") + total_combined_files.extend(combined_files) + logger.info(f"Total combined files: {(len(total_combined_files))}") - for static_dir in static_dirs: - combined_files = glob.glob(f"static/{static_dir}/**/{pattern}") - total_combined_files.extend(combined_files) - logger.info(f"Total combined files: {(len(total_combined_files))}") + total_combined_files_dict = {os.path.splitext(os.path.basename(i))[0]: i for i in total_combined_files} - # get unique combined file paths - total_combined_files_dict = {os.path.splitext(os.path.basename(i))[0]: i for i in total_combined_files} + logger.info(f"Total unique combined files: {len(total_combined_files_dict)}") - logger.info(f"Total unique combined files: {len(total_combined_files_dict)}") + for file_name, full_path in tqdm.tqdm(total_combined_files_dict.items()): + try: - count_new = 0 - count_updated = 0 + xml_file_data = xml_reader.read_xml_file(full_path) + url = xml_file_data['domain'] + unique_urls.add(url) - for file_name, full_path in tqdm.tqdm(total_combined_files_dict.items()): - try: - # get data from xml file - xml_file_data = xml_reader.read_xml_file(full_path) + if REMOVE_TIMESTAMPS_IN_AUTHORS: - if REMOVE_TIMESTAMPS_IN_AUTHORS: - # remove timestamps from author's names and collect unique names only - xml_file_data['authors'] = remove_timestamps_from_author_names(xml_file_data['authors']) + xml_file_data['authors'] = remove_timestamps_from_author_names(xml_file_data['authors']) 
- res = elastic_search.es_client.update( - index=ES_INDEX, - id=file_name, - body={ + body = { 'doc': xml_file_data, 'doc_as_upsert': True } - ) - logger.success(f"Version-{res['_version']}, Result-{res['result']}, ID-{res['_id']}") - if res['result'] == 'created': - count_new += 1 - if res['result'] == 'updated': - count_updated += 1 + res = elastic_search.upsert_document(index_name=ES_INDEX, + doc_id=file_name, + doc_body=body) + + if res['result'] == 'created' or res['result'] == 'updated': + updated_count[url].add(res['_id']) + elif res['result'] == 'noop': + no_changes_count[url].add(res['_id']) + + except Exception as ex: + logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}") + logger.warning(full_path) + + logger.success(f"Process complete.") + + except Exception as ex: + error_occurred = True + error_message = f"Error:{ex}\n{traceback.format_exc()}" + logger.error(f"Error Message: {error_message}") + logger.error(f"Process failed.") - except Exception as ex: - error_message = f"Error occurred: {ex} \n{traceback.format_exc()}" - logger.error(error_message) + finally: + summarizer_log_csv(file_name='push_combined_summary_to_es', + domain=list(unique_urls), + inserted=sum(len(inserted_count[url]) for url in unique_urls), + updated=sum(len(updated_count[url]) for url in unique_urls), + no_changes=sum(len(no_changes_count[url]) for url in unique_urls), + error=error_message) - logger.info(f"Inserted {count_new} new documents, Updated {count_updated} documents") + logger.success("Process Complete.") diff --git a/push_summary_to_es.py b/push_summary_to_es.py index 5c57af7544..8074f47f4c 100644 --- a/push_summary_to_es.py +++ b/push_summary_to_es.py @@ -5,7 +5,7 @@ from src.config import ES_INDEX from src.elasticsearch_utils import ElasticSearchClient from src.xml_utils import XMLReader - +from src.utils import summarizer_log_csv if __name__ == "__main__": @@ -22,47 +22,67 @@ "https://mailing-list.bitcoindevs.xyz/bitcoindev/" ] - for dev_url in dev_urls: + error_occurred = False + error_message = None + updated_ids = set() + try: + for dev_url in dev_urls: + try: + if APPLY_DATE_RANGE: + current_date_str = None + if not current_date_str: + current_date_str = datetime.now().strftime("%Y-%m-%d") + start_date = datetime.now() - timedelta(days=15) + start_date_str = start_date.strftime("%Y-%m-%d") + logger.info(f"start_date: {start_date_str}") + logger.info(f"current_date_str: {current_date_str}") + else: + start_date_str = None + current_date_str = None - if APPLY_DATE_RANGE: - current_date_str = None - if not current_date_str: - current_date_str = datetime.now().strftime("%Y-%m-%d") - start_date = datetime.now() - timedelta(days=15) - start_date_str = start_date.strftime("%Y-%m-%d") - logger.info(f"start_date: {start_date_str}") - logger.info(f"current_date_str: {current_date_str}") - else: - start_date_str = None - current_date_str = None + docs_list = elastic_search.fetch_data_with_empty_summary(ES_INDEX, dev_url, start_date_str, + current_date_str) - docs_list = elastic_search.fetch_data_with_empty_summary(ES_INDEX, dev_url, start_date_str, current_date_str) + if isinstance(dev_url, list): + dev_name = dev_url[0].split("/")[-2] + else: + dev_name = dev_url.split("/")[-2] - if isinstance(dev_url, list): - dev_name = dev_url[0].split("/")[-2] - else: - dev_name = dev_url.split("/")[-2] + logger.info(f"Total threads received with empty summary for '{dev_name}': {len(docs_list)}") - logger.success(f"Total threads received with empty summary for '{dev_name}': {len(docs_list)}") + 
for doc in tqdm.tqdm(docs_list): + try: + doc_id = doc['_id'] + doc_index = doc['_index'] + if not doc['_source'].get('summary'): + xml_summary = xml_reader.get_xml_summary(doc, dev_name) + if xml_summary: + res = elastic_search.es_client.update( + index=doc_index, + id=doc_id, + body={ + 'doc': { + "summary": xml_summary + } + } + ) + updated_ids.add(res['_id']) + except Exception as ex: + logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}") + logger.warning(doc) + + logger.success(f"Process complete for dev_url: {dev_url}") - for doc in tqdm.tqdm(docs_list): - res = None - try: - doc_id = doc['_id'] - doc_index = doc['_index'] - if not doc['_source'].get('summary'): - xml_summary = xml_reader.get_xml_summary(doc, dev_name) - if xml_summary: - elastic_search.es_client.update( - index=doc_index, - id=doc_id, - body={ - 'doc': { - "summary": xml_summary - } - } - ) except Exception as ex: - logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}") + error_occurred = True + error_message = f"Error: {ex}\n{traceback.format_exc()}" + + except Exception as ex: + logger.error(f"Error: {str(ex)}\n{traceback.format_exc()}") - logger.success(f"Process complete.") + finally: + summarizer_log_csv( + file_name='push_summary_to_elasticsearch', + domain=dev_urls, + updated=len(updated_ids), + error=error_message) diff --git a/src/elasticsearch_utils.py b/src/elasticsearch_utils.py index c45eb0bd5e..a2f29be802 100644 --- a/src/elasticsearch_utils.py +++ b/src/elasticsearch_utils.py @@ -1,5 +1,6 @@ import time from datetime import datetime + from elasticsearch import Elasticsearch, NotFoundError from elasticsearch.helpers import scan from loguru import logger @@ -23,10 +24,41 @@ def __init__(self, http_auth=(self._es_username, self._es_password), ) + def get_domain_counts(self, index_name, domain): + """Function to get the total counts for the given 'domain' field from Elasticsearch index.""" + domain_query = self.get_domain_query(domain) + body = { + "query": domain_query + } + + try: + resp = self.es_client.count(index=index_name, body=body) + return resp['count'] + except Exception as e: + logger.error(f"Error fetching domain counts: {e}") + return None + @property def es_client(self): return self._es_client + def upsert_document(self, index_name, doc_id, doc_body): + + script = { + "source": "ctx._source.putAll(params)", + "params": doc_body + } + + request_body = { + "scripted_upsert": True, + "script": script, + "upsert": doc_body + } + + # Perform the upsert operation + response = self._es_client.update(index=index_name, id=doc_id, body=request_body) + return response + def get_domain_query(self, url): if isinstance(url, list): domain_query = {"terms": {"domain.keyword": url}} @@ -90,15 +122,15 @@ def fetch_data_based_on_title(self, es_index, title, url): "must": [ { "match_phrase": - { - "title.keyword": title - } + { + "title.keyword": title + } }, { "term": - { - "domain.keyword": str(url) - } + { + "domain.keyword": str(url) + } } ] } @@ -236,7 +268,7 @@ def filter_top_recent_posts(self, es_results, top_n): def filter_top_active_posts(self, es_results, top_n): unique_results = [] - thread_dict = {} # maps post titles to their respective activity levels + thread_dict = {} # maps post titles to their respective activity levels # create dictionary with title as key and thread count as value for result in es_results: title = result['_source']['title'] @@ -383,10 +415,10 @@ def es_fetch_contributors_and_threads(self, es_index, title, domain): """ Fetches the count of threads and 
unique contributors for a given post based on title and domain """ - # The search query + # The search query domain_query = self.get_domain_query(domain) query = { - "size": 0, # no search hits are returned, the focus is solely on the aggregations and counts + "size": 0, # no search hits are returned, the focus is solely on the aggregations and counts "query": { "bool": { "must": [ diff --git a/src/utils.py b/src/utils.py index 6f49fa8a1b..2ccc675e67 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,18 +1,22 @@ -import requests -from bs4 import BeautifulSoup -import pandas as pd +import csv +import datetime import os -from tqdm import tqdm import re from ast import literal_eval + +import pandas as pd +import pytz +import requests +from bs4 import BeautifulSoup from dateutil.parser import parse from dateutil.relativedelta import relativedelta from loguru import logger -import pytz -import datetime +from tqdm import tqdm + +from src.config import TOKENIZER, CHATGPT +from src.elasticsearch_utils import ElasticSearchClient from src.gpt_utils import generate_chatgpt_summary, generate_summary, generate_title, generate_chatgpt_title, \ consolidate_chatgpt_summary, consolidate_summary -from src.config import TOKENIZER, CHATGPT CURRENT_TIME = datetime.datetime.now(datetime.timezone.utc) CURRENT_TIMESTAMP = str(CURRENT_TIME.timestamp()).replace(".", "_") @@ -350,7 +354,8 @@ def save_html_file(df_week_generated, save_file_name): file_handle.write(html) # write title and summary - html = f"

{title} {summary} References:"
+            html = (f"{title} {summary}"
+                    f"
References:") file_handle.write(html) for i in range(len(urls)): @@ -367,3 +372,32 @@ def save_html_file(df_week_generated, save_file_name): file_handle.close() return f"output/{save_file_name}.html" + + +def summarizer_log_csv(file_name, domain=None, inserted=0, updated=0, no_changes=0, folder_path="./summarizer_logs/", + error=None): + last_updated = datetime.datetime.now().isoformat(timespec='milliseconds').replace('+00:00', 'Z') + + if domain: + csv_headers = ['last_updated', 'source', 'total_docs', 'inserted_docs', 'updated_docs', 'no_changed_docs', + 'error'] + total_docs = ElasticSearchClient().get_domain_counts(index_name=os.getenv('INDEX'), domain=domain) + csv_data = [last_updated, domain, total_docs, inserted, updated, no_changes, error] + else: + csv_headers = ['last_updated', 'error'] + csv_data = [last_updated, error] + + os.makedirs(folder_path, exist_ok=True) + save_file_path = f"{folder_path}/{file_name}.csv" + with open(save_file_path, mode='a', newline='') as csv_file: + writer = csv.writer(csv_file) + if csv_file.tell() == 0: + writer.writerow(csv_headers) + + writer.writerow(csv_data) + logger.success(f"Logs updated successfully at: {save_file_path}") + + logger.info(f"Inserted Docs: {inserted}") + logger.info(f"Updated Docs: {updated}") + logger.info(f"No changed Docs: {no_changes}") + logger.info(f"Error Message: {error}") diff --git a/xmls_generator_production.py b/xmls_generator_production.py index c5ea4d9ddf..79d48cd1ac 100644 --- a/xmls_generator_production.py +++ b/xmls_generator_production.py @@ -1,18 +1,21 @@ +import sys import time +import traceback +import warnings from datetime import datetime, timedelta -import sys + from loguru import logger -import warnings from openai.error import APIError, PermissionError, AuthenticationError, InvalidAPIType, ServiceUnavailableError + from src.config import ES_INDEX from src.elasticsearch_utils import ElasticSearchClient +from src.utils import summarizer_log_csv from src.xml_utils import GenerateXML warnings.filterwarnings("ignore") if __name__ == "__main__": - gen = GenerateXML() - elastic_search = ElasticSearchClient() + error_message = None dev_urls = [ "https://lists.linuxfoundation.org/pipermail/bitcoin-dev/", "https://lists.linuxfoundation.org/pipermail/lightning-dev/", @@ -20,36 +23,47 @@ "https://gnusha.org/pi/bitcoindev/", "https://mailing-list.bitcoindevs.xyz/bitcoindev/" ] + try: + gen = GenerateXML() + elastic_search = ElasticSearchClient() + + end_date = datetime.now() + start_date = end_date - timedelta(days=30) + + # yyyy-mm-dd + end_date_str = end_date.strftime("%Y-%m-%d") + start_date_str = start_date.strftime("%Y-%m-%d") + logger.info(f"start_data: {start_date_str}") + logger.info(f"end_date_str: {end_date_str}") + + for dev_url in dev_urls: + data_list = elastic_search.extract_data_from_es( + ES_INDEX, dev_url, start_date_str, end_date_str, exclude_combined_summary_docs=True + ) + dev_name = dev_url.split("/")[-2] + logger.success(f"TOTAL THREADS RECEIVED FOR - '{dev_url}': {len(data_list)}") + + delay = 5 + count_main = 0 + + while True: + try: + gen.start(data_list, dev_url) + break + except (APIError, PermissionError, AuthenticationError, InvalidAPIType, ServiceUnavailableError) as ex: + logger.error(str(ex)) + logger.error(ex) + time.sleep(delay) + count_main += 1 + if count_main > 5: + sys.exit(ex) + except Exception as ex: + error_message = f"Error: {ex}\n{traceback.format_exc()}" + logger.error("Process Failed :(") - end_date = datetime.now() - start_date = end_date - timedelta(days=30) - - # 
yyyy-mm-dd - end_date_str = end_date.strftime("%Y-%m-%d") - start_date_str = start_date.strftime("%Y-%m-%d") - logger.info(f"start_data: {start_date_str}") - logger.info(f"end_date_str: {end_date_str}") - - for dev_url in dev_urls: - data_list = elastic_search.extract_data_from_es( - ES_INDEX, dev_url, start_date_str, end_date_str, exclude_combined_summary_docs=True - ) - dev_name = dev_url.split("/")[-2] - logger.success(f"TOTAL THREADS RECEIVED FOR - '{dev_url}': {len(data_list)}") - - delay = 5 - count_main = 0 - - while True: - try: - gen.start(data_list, dev_url) - break - except (APIError, PermissionError, AuthenticationError, InvalidAPIType, ServiceUnavailableError) as ex: - logger.error(str(ex)) - logger.error(ex) - time.sleep(delay) - count_main += 1 - if count_main > 5: - sys.exit(ex) + finally: + summarizer_log_csv(file_name="xmls_generator_production", + domain=dev_urls, + error=error_message) logger.info("Process Complete.")
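
For reference, the `summarizer_log_csv` helper introduced in `src/utils.py` appends one row per run to `./summarizer_logs/<file_name>.csv`, and the new workflow steps commit those CSVs back to the repository. Below is a minimal sketch of how such a log could be inspected, assuming a log file from a previous run exists at the path shown; the path is a hypothetical example and the column names simply mirror the helper's defaults, so treat this as an illustration rather than part of the changeset.

```python
# Illustrative only: inspect a per-script run log written by summarizer_log_csv().
# Assumption: a prior run created this CSV under the default folder_path.
import pandas as pd

LOG_PATH = "summarizer_logs/push_combined_summary_to_es.csv"  # hypothetical example path

df = pd.read_csv(LOG_PATH)
# Domain-based runs use these columns:
# last_updated, source, total_docs, inserted_docs, updated_docs, no_changed_docs, error
latest = df.sort_values("last_updated").iloc[-1]
print(latest.to_dict())
```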