diff --git a/.github/workflows/homepage_json_gen_cron_job.yml b/.github/workflows/homepage_json_gen_cron_job.yml index b6493212bf..545c85313b 100644 --- a/.github/workflows/homepage_json_gen_cron_job.yml +++ b/.github/workflows/homepage_json_gen_cron_job.yml @@ -37,7 +37,7 @@ jobs: pip install -r requirements.txt - name: Execute Python script - run: python generate_homepage_xml.py + run: python generate_homepage_json.py - name: Configure Git run: | diff --git a/README.md b/README.md index 6e5ad1a997..1b33151298 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Utilizing data collected by the [scraper](https://github.com/bitcoinsearch/scrap - Queries Elasticsearch for documents lacking summaries, extracts summaries from corresponding XML files, and then updates these documents with their summaries in the Elasticsearch index. 3. Daily [Push Combined Summary From XML Files to ES INDEX](.github/workflows/push_combined_summary_to_es_cron_job.yml) ([source](push_combined_summary_to_es.py)) - Processes each combined thread summary XML file, transforming it into a document format, checks for its existence in Elasticsearch, and updates or inserts the document as needed. -4. Daily [Python Homepage Update Script](.github/workflows/homepage_json_gen_cron_job.yml) ([source](generate_homepage_xml.py)) +4. Daily [Python Homepage Update Script](.github/workflows/homepage_json_gen_cron_job.yml) ([source](generate_homepage_json.py)) - Queries the last 7 days of data from Elasticsearch for each source to compile lists of active threads, recent threads, and historical threads for 'Today in History'. It generates a summary of recent threads if available; otherwise, for active threads. The resulting [`homepage.json`](static/homepage.json) is then committed to GitHub to be used by [Bitcoin TLDR](https://github.com/bitcoinsearch/tldr). 5. Weekly [Python Newsletter Generation Script](.github/workflows/weekly_newsletter_gen_cron_job.yml) ([source](generate_weekly_newsletter_json.py)) - Generates a newsletter by compiling lists of new and active threads from the past week's data for each source. It generates a summary of new threads if available; otherwise, for active threads. The resulting [`newsletter.json`](static/newsletters/newsletter.json) is then committed to GitHub to be used by [Bitcoin TLDR](https://github.com/bitcoinsearch/tldr). 
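For reference, the daily homepage job renamed above writes static/homepage.json with four top-level keys (header_summary, recent_posts, active_posts, today_in_history_posts), and each post entry carries fields such as title and link per the create_single_entry docstring later in this patch. Below is a minimal, illustrative sketch of how a consumer such as Bitcoin TLDR might read that file.

import json

# Illustrative sketch: load the homepage payload written by generate_homepage_json.py.
# Top-level key names mirror the json_string dict assembled in that script;
# per-entry fields follow the create_single_entry docstring in src/json_utils.py.
with open("static/homepage.json") as f:
    homepage = json.load(f)

print(homepage["header_summary"])  # summary generated from recent (or active) posts
for post in homepage["recent_posts"] + homepage["active_posts"]:
    print(post["title"], post["link"])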
diff --git a/app.py b/app.py deleted file mode 100644 index 9012bd4819..0000000000 --- a/app.py +++ /dev/null @@ -1,312 +0,0 @@ -from datetime import datetime -import calendar -import os -import openai -from dotenv import load_dotenv -from flask import Flask, request, Response, render_template, url_for, abort, send_file -import feedparser -import xml.etree.ElementTree as ET -from flask_frozen import Freezer -import re -from flask import Flask -from markupsafe import Markup -import shutil -import nltk - -from src.logger import setup_logger - -logger = setup_logger() - -load_dotenv() -openai.api_key = os.environ.get("OPENAI_API_KEY") - -app = Flask(__name__, static_url_path='', static_folder='css') - - -# app.config['SECRET_KEY'] = os.environ.get("FLASK_SECRET") -def linkify(text): - url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+(?">\g<0>', text)) - - -def remove_unfinished_sentences(text): - try: - sentences = nltk.sent_tokenize(text) - if not sentences[-1].endswith(('.', '!', '?')): - sentences = sentences[:-1] - text = ' '.join(sentences) - except Exception as ex: - print(f"Error: {ex}") - url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+(?">\g<0>', text)) - - -# app.jinja_env.filters['linkify'] = linkify -app.jinja_env.filters['remove_unfinished'] = remove_unfinished_sentences -app.config['FREEZER_DEFAULT_URL_GENERATOR'] = 'flask_frozen.url_generators.default_url_generator_with_html' -freezer = Freezer(app) - - -@freezer.register_generator -def url_generator(): - build_path = os.path.join(app.root_path, "build") - yield from generate_url_list(build_path) - - -def save_static_html(endpoint, dev_name, year_month, type_by, build_path): - with app.app_context(): - folder = f'static/{dev_name}/{year_month}' - posts, min_date, max_date = parse_xml_files(folder) - - if type_by == "thread": - posts = sorted(posts, key=lambda p: p['author']) - elif type_by == "subject": - posts = sorted(posts, key=lambda p: p['title']) - elif type_by == "date": - posts = sorted(posts, key=lambda p: p['date']) - elif type_by == "author": - posts = sorted(posts, key=lambda p: p['author']) - else: - raise ValueError(f"Invalid type_by: {type_by}") - - html = render_template('thread.html', posts=posts, year_month=year_month, min_date=min_date, max_date=max_date, - type_by=type_by) - - html_folder_path = os.path.join(build_path, endpoint) - os.makedirs(html_folder_path, exist_ok=True) - html_file_path = os.path.join(html_folder_path, f"{year_month}.html") - - with open(html_file_path, "w", encoding="utf-8") as f: - f.write(html) - - -def save_static_xml(dev_name, year_month, filename, build_path): - original_file_path = os.path.join(app.root_path, f"static/{dev_name}/{year_month}/{filename}") - xml_folder_path = os.path.join(build_path, dev_name, year_month) - os.makedirs(xml_folder_path, exist_ok=True) - xml_file_path = os.path.join(xml_folder_path, filename) - - shutil.copyfile(original_file_path, xml_file_path) - - -def generate_url_list(build_path=None): - url_list = [] - data = get_year_month_data() - - for row in data: - year_month = row["month"].replace(" ", "_") - folder = f'static/{row["dev_name"]}/{year_month}' - - # Check if the folder exists - if os.path.isdir(os.path.join(app.root_path, folder)): - for type_by in ["thread", "subject", "author", "date"]: - save_static_html(type_by, row["dev_name"], year_month, type_by, build_path) - - url_list.append(url_for("thread", 
dev_name=row["dev_name"], year_month=year_month)) - url_list.append(url_for("subject", dev_name=row["dev_name"], year_month=year_month)) - url_list.append(url_for("author", dev_name=row["dev_name"], year_month=year_month)) - url_list.append(url_for("date", dev_name=row["dev_name"], year_month=year_month)) - - posts, _, _ = parse_xml_files(folder) - for post in posts: - save_static_xml(row["dev_name"], year_month, post["filename"], build_path) - url_list.append( - url_for("display_feed", dev_name=row["dev_name"], year_month=year_month, filename=post["filename"])) - - return url_list - - -def parse_xml_files(folder): - files = os.listdir(os.path.join(app.root_path, folder)) - posts = [] - namespace = {'atom': 'http://www.w3.org/2005/Atom'} - - for file in files: - if file.endswith('.xml'): - tree = ET.parse(os.path.join(folder, file)) - root = tree.getroot() - - title = root.find('atom:title', namespace).text - author = root.find('atom:author/atom:name', namespace).text - date = root.find('atom:entry/atom:published', namespace).text - # summary = root.find('atom:entry/atom:summary', namespace).text - posts.append({'title': title, 'author': author, 'date': date, 'filename': file}) - - min_date = min(posts, key=lambda x: x['date'])['date'] - max_date = max(posts, key=lambda x: x['date'])['date'] - min_date_dt = datetime.fromisoformat(min_date) - max_date_dt = datetime.fromisoformat(max_date) - - return posts, min_date_dt, max_date_dt - - -def get_year_month_data(): - month_order = { - 'Jan': 1, - 'Feb': 2, - 'March': 3, - 'April': 4, - 'May': 5, - 'June': 6, - 'July': 7, - 'Aug': 8, - 'Sept': 9, - 'Oct': 10, - 'Nov': 11, - 'Dec': 12 - } - - folders = os.listdir(os.path.join(app.root_path, 'static')) - data = [] - for dev_folder in folders: - if os.path.isdir(os.path.join(app.root_path, f'static/{dev_folder}')): - for f in os.listdir(os.path.join(app.root_path, f'static/{dev_folder}')): - month = f.split("_")[0] - year = f.split("_")[-1] - data.append({"month": f"{month} {year}", "dev_name": str(dev_folder)}) - data_sorted = sorted(data, key=lambda x: (int(x['month'].split()[1]), - (month_order[x['month'].split()[0]])), reverse=True) - - return data_sorted - - -@app.route("/") -def archive(): - data = get_year_month_data() - return render_template('index.html', data=data) - - -def sort_grouping(posts): - ''' - diff between filename and title - - for combined file - # filename = combined_BIP-20-Rejected-process-for-BIP-21N.xml - # title = Combined summary - BIP-20-Rejected-process-for-BIP-21N - - for simple file - # filename = 001234_BIP-20-Rejected-process-for-BIP-21N.xml - # title = BIP-20-Rejected-process-for-BIP-21N - - ''' - - combined = {} - threads = [] - for post in posts: - if "combined_" in post['filename']: - ckey = '_'.join(post['filename'].split("_")[1:]) - combined[ckey] = post - else: - threads.append(post) - - sorted_threads = sorted(threads, key=lambda x: x['title']) - for i, thread in enumerate(sorted_threads): - ckey = '_'.join(thread['filename'].split("_")[1:]) - if ckey in combined: - sorted_threads.insert(i, combined[ckey]) - combined.pop(ckey) - - return sorted_threads - - -def sort_and_grouping(posts): - combined = {} - for post in posts: - if "combined_" in post['filename']: - ckey = '_'.join(post['filename'].split("_")[1:]) - combined[ckey] = post - sorted_threads = sorted(posts, key=lambda x: x['title']) - new_threads = [] - for thread in sorted_threads: - if "combined_" not in thread['filename']: - ckey = '_'.join(thread['filename'].split("_")[1:]) - if ckey not in 
combined: - new_threads.append(thread) - else: - new_threads.append(thread) - - return new_threads - - -@app.route('/thread//.html') -def thread(dev_name, year_month): - try: - folder = f'static/{dev_name}/{year_month}' - posts, min_date, max_date = parse_xml_files(folder) - posts = sort_and_grouping(posts) - return render_template('thread.html', posts=posts, dev_name=dev_name, year_month=year_month, min_date=min_date, - max_date=max_date, type_by="thread") - except Exception as e: - logger.exception(e) - abort(500) - - -@app.route('/author//.html') -def author(dev_name, year_month): - folder = f'static/{dev_name}/{year_month}' - posts, min_date, max_date = parse_xml_files(folder) - posts = sorted(posts, key=lambda p: p['author']) - return render_template('thread.html', posts=posts, dev_name=dev_name, year_month=year_month, min_date=min_date, - max_date=max_date, type_by="author") - - -@app.route('/subject//.html') -def subject(dev_name, year_month): - folder = f'static/{dev_name}/{year_month}' - posts, min_date, max_date = parse_xml_files(folder) - posts = sorted(posts, key=lambda p: p['title']) - return render_template('thread.html', posts=posts, dev_name=dev_name, year_month=year_month, min_date=min_date, - max_date=max_date, type_by="subject") - - -@app.route('/date//.html') -def date(dev_name, year_month): - folder = f'static/{dev_name}/{year_month}' - posts, min_date, max_date = parse_xml_files(folder) - posts = sorted(posts, key=lambda p: p['date']) - return render_template('thread.html', posts=posts, dev_name=dev_name, year_month=year_month, min_date=min_date, - max_date=max_date, type_by="date") - - -@app.route('///.html') -def display_feed(dev_name, year_month, filename): - filename = filename + ".xml" - file_url = f"./static/{dev_name}/{year_month}/{filename}" - xml_feed = feedparser.parse(file_url) - combined_filename = "combined_" + "_".join(filename.split("_")[1:]) - if combined_filename in os.listdir(os.path.join("./static", str(dev_name), str(year_month))): - return render_template('feed.html', feed=xml_feed, dev_name=dev_name, year_month=year_month, - filename=combined_filename) - else: - return render_template('feed.html', feed=xml_feed, dev_name=dev_name, year_month=year_month, - filename=filename) - - -@app.route('///') -def display_xml(dev_name, year_month, filename): - original_file_path = f"./static/{dev_name}/{year_month}/{filename}" - combined_filename = "combined_" + "_".join(filename.split("_")[1:]) - combined_file_path = f"./static/{dev_name}/{year_month}/{combined_filename}" - - if os.path.exists(original_file_path): - file_path = original_file_path - elif os.path.exists(combined_file_path): - file_path = combined_file_path - else: - return f"Error: {filename} not found in {year_month}", 404 - - return send_file(file_path, mimetype='text/xml') - - -if __name__ == '__main__': - import sys - nltk.download('punkt') - - if len(sys.argv) > 1 and sys.argv[1] == "build": - freezer.freeze() - else: - os.popen('python scheduler.py > scheduler_logs.txt 2>&1 &') - try: - app.run(debug=True) - except Exception as e: - logger.exception(e) diff --git a/generate_homepage_xml.py b/generate_homepage_json.py similarity index 74% rename from generate_homepage_xml.py rename to generate_homepage_json.py index ec79e966d7..bdd504c643 100644 --- a/generate_homepage_xml.py +++ b/generate_homepage_json.py @@ -1,372 +1,400 @@ -import random -import time -import traceback -from datetime import datetime, timedelta -from loguru import logger -import os -import sys -import warnings -import 
json -from tqdm import tqdm - -from src.config import ES_INDEX -from src.elasticsearch_utils import ElasticSearchClient -from src.json_utils import GenerateJSON -from src.xml_utils import GenerateXML -from src.utils import month_dict - -warnings.filterwarnings("ignore") - - -def page_data_handling(data_list: list, get_unique_per_dev=False): - page_data = [] - collected_dev_data = [] - for data in tqdm(data_list): - try: - # check and generate any missing file - xml_gen.start(dict_data=[data], url=data['_source']['domain']) - entry_data = gen.create_single_entry(data, look_for_combined_summary=True) - - if get_unique_per_dev: - if entry_data['dev_name'] not in collected_dev_data: - collected_dev_data.append(entry_data['dev_name']) - logger.info(f"collected data for: {collected_dev_data}") - page_data.append(entry_data) - else: - page_data.append(entry_data) - except Exception as ex: - logger.error( - f"Error occurred for doc id: {data['_source']['id']}\n{ex} \n{traceback.format_exc()}") - return page_data - - -if __name__ == "__main__": - - gen = GenerateJSON() - xml_gen = GenerateXML() - elastic_search = ElasticSearchClient() - dev_urls = [ - ["https://lists.linuxfoundation.org/pipermail/bitcoin-dev/", - "https://gnusha.org/pi/bitcoindev/"], - "https://lists.linuxfoundation.org/pipermail/lightning-dev/", - "https://delvingbitcoin.org/" - ] - - current_date = datetime.now() - current_date_str = current_date.strftime("%Y-%m-%d") - - start_date = current_date - timedelta(days=7) - start_date_str = start_date.strftime("%Y-%m-%d") - logger.info(f"start_date: {start_date_str}") - logger.info(f"current_date_str: {current_date_str}") - - month_name = month_dict[int(current_date.month)] - str_month_year = f"{month_name}_{int(current_date.year)}" - - json_file_path = fr"static/homepage.json" - - recent_data_list = [] - active_data_list = [] - today_in_history_data_list = [] - history_data_collected_from_yesterday = False - - random_years_ago = None - - for dev_url in dev_urls: - logger.info(f"Working on URL: {dev_url}") - fetch_today_in_history = True - - data_list = elastic_search.extract_data_from_es( - ES_INDEX, dev_url, start_date_str, current_date_str, exclude_combined_summary_docs=True - ) - - if isinstance(dev_url, list): - dev_name = dev_url[0].split("/")[-2] - else: - dev_name = dev_url.split("/")[-2] - - logger.success(f"TOTAL THREADS RECEIVED FOR - '{dev_name}': {len(data_list)}") - - seen_titles = set() - - # TOP ACTIVE POSTS - active_posts_data = elastic_search.filter_top_active_posts( - es_results=data_list, top_n=10 - ) - - active_posts_data_counter = 0 - for data in active_posts_data: - if active_posts_data_counter >= 3: - break - - title = data['_source']['title'] - if title in seen_titles: - continue - seen_titles.add(title) - - # get the first post's info of this title - original_post = elastic_search.get_earliest_posts_by_title( - es_index=ES_INDEX, url=dev_url, title=title - ) - - counts, contributors = elastic_search.es_fetch_contributors_and_threads( - es_index=ES_INDEX, title=title, domain=dev_url - ) - - # if you want to show the first post of each selected title, - # then do the below operations on - 'original_post', else on 'data' - for author in original_post['_source']['authors']: - contributors.remove(author) - original_post['_source']['n_threads'] = counts - original_post['_source']['contributors'] = contributors - original_post['_source']['dev_name'] = dev_name - active_data_list.append(original_post) - active_posts_data_counter += 1 - - logger.success(f"Number of 
active posts collected: {len(active_data_list)}, for URL: {dev_url}") - - # TOP RECENT POSTS - recent_data_post_counter = 0 - recent_posts_data = elastic_search.filter_top_recent_posts(es_results=data_list, top_n=20) - - for data in recent_posts_data: - # if preprocess body text not longer than token_threshold, skip that post - if not gen.is_body_text_long(data=data, sent_threshold=2): - logger.info(f"skipping: {data['_source']['title']} - {data['_source']['url']}") - continue - - title = data['_source']['title'] - if title in seen_titles: - continue - seen_titles.add(title) - if recent_data_post_counter >= 3: - break - - counts, contributors = elastic_search.es_fetch_contributors_and_threads( - es_index=ES_INDEX, title=title, domain=dev_url - ) - # exclude the post authors - for author in data['_source']['authors']: - contributors.remove(author) - data['_source']['n_threads'] = counts - data['_source']['contributors'] = contributors - data['_source']['dev_name'] = dev_name - recent_data_list.append(data) - recent_data_post_counter += 1 - - if not recent_data_list: - for data in recent_posts_data: - # if preprocess body text not longer than token_threshold, skip that post - if not gen.is_body_text_long(data=data, sent_threshold=2): - logger.info(f"skipping: {data['_source']['title']} - {data['_source']['url']}") - continue - - title = data['_source']['title'] - if recent_data_post_counter >= 3: - break - counts, contributors = elastic_search.es_fetch_contributors_and_threads( - es_index=ES_INDEX, title=title, domain=dev_url - ) - # exclude the post authors - for author in data['_source']['authors']: - contributors.remove(author) - data['_source']['n_threads'] = counts - data['_source']['contributors'] = contributors - data['_source']['dev_name'] = dev_name - recent_data_list.append(data) - recent_data_post_counter += 1 - - logger.success(f"Number of recent posts collected: {len(recent_data_list)}, for URL: {dev_url}") - - # TODAY IN HISTORY POSTS - logger.info(f"fetching 'Today in history' posts... ") - - if not random_years_ago: - at_least_years_ago = 3 - at_max_years_ago = current_date.year - 2015 - random_years_ago = random.randint(at_least_years_ago, at_max_years_ago) - logger.info(f"random years ago between {at_least_years_ago} to {at_max_years_ago}: {random_years_ago}") - - if dev_url == "https://delvingbitcoin.org/": - random_years_ago = random.randint(1, current_date.year - 2022) - logger.info( - f"for delving-bitcoin - random years ago between {1} to {current_date.year - 2022}: {random_years_ago}") - - default_days_to_look_back = 6 - loop_counter = 1 - - while fetch_today_in_history: - days_to_look_back = default_days_to_look_back * loop_counter - selected_random_date = current_date - timedelta(days=365 * random_years_ago) - - start_of_time = selected_random_date - timedelta(days=selected_random_date.weekday()) - end_of_time = start_of_time + timedelta(days=days_to_look_back) - - start_of_time_str = start_of_time.strftime("%Y-%m-%dT%H:%M:%S") - end_of_time_str = end_of_time.strftime("%Y-%m-%dT%H:%M:%S") - - logger.info(f"collecting the data from {days_to_look_back} days range ... 
|| Start of week: {start_of_time} | " - f"End of week: {end_of_time}") - - selected_threads = elastic_search.fetch_data_in_date_range( - es_index=ES_INDEX, - start_date=start_of_time_str, - end_date=end_of_time_str, - domain=dev_url - ) - - if len(selected_threads) > 0: - for doc in selected_threads: - doc_title = doc['_source']['title'] - doc_created_at = doc['_source']['created_at'] - - if doc['_source']['type'] == 'original_post': - - counts, contributors = elastic_search.es_fetch_contributors_and_threads( - es_index=ES_INDEX, title=doc_title, domain=dev_url - ) - - if counts < 5: - logger.info(f"No. of replies are less than 5, skipping it... ") - continue - - if contributors: - # exclude the post authors - for author in doc['_source']['authors']: - contributors.remove(author) - doc['_source']['n_threads'] = counts - doc['_source']['contributors'] = contributors - doc['_source']['dev_name'] = dev_name - today_in_history_data_list.append(doc) - logger.info(f"collected doc created on: {doc_created_at} || TITLE: {doc_title}") - fetch_today_in_history = False - break - loop_counter += 1 - - # add history data from yesterday's homepage.json - if not today_in_history_data_list: - logger.info("Collecting yesterday's history threads!") - current_directory = os.getcwd() - full_path = os.path.join(current_directory, json_file_path) - if os.path.exists(full_path): - with open(full_path, 'r') as j: - try: - data = json.load(j) - except Exception as e: - logger.info(f"Error reading json file:{full_path} :: {e}") - data = {} - today_in_history_data_list.extend(data.get('today_in_history_posts', [])) - history_data_collected_from_yesterday = True - - logger.success(f"No. of 'Today in history' posts collected: {len(today_in_history_data_list)}") - - current_directory = os.getcwd() - full_path = os.path.join(current_directory, json_file_path) - if os.path.exists(full_path): - with open(full_path, 'r') as j: - try: - yesterday_data = json.load(j) - except Exception as e: - logger.info(f"Error reading json file:{full_path} :: {e}") - yesterday_data = {} - - xml_ids_title = gen.get_existing_json_title(file_path=json_file_path) - recent_post_ids = [data['_source']['title'] for data in recent_data_list] - active_post_ids = [data['_source']['title'] for data in active_data_list] - all_post_titles = set(recent_post_ids + active_post_ids) - - if all_post_titles != set(xml_ids_title): - logger.info("changes found in recent posts ... ") - - delay = 5 - count = 0 - - while True: - try: - logger.info( - f"active posts: {len(active_data_list)}, " - f"recent posts: {len(recent_data_list)}, " - f"today in history posts: {len(today_in_history_data_list)}" - ) - logger.info("Creating homepage.json file ... ") - - recent_post_summ = "" - if len(active_data_list) > 0 or len(recent_data_list) > 0: - - # header summary - if len(recent_data_list) > 0: - recent_post_summ = gen.generate_recent_posts_summary(recent_data_list) - else: - recent_post_summ = gen.generate_recent_posts_summary(active_data_list) - logger.success(recent_post_summ) - - # recent data - recent_page_data = page_data_handling(recent_data_list) - - # active data - active_page_data = page_data_handling(active_data_list) - - else: - logger.error(f"'Active' and 'Recent' data list empty! 
Please check the data again.") - recent_page_data, active_page_data = [], [] - - # today in history - if history_data_collected_from_yesterday: - logger.info("No change in 'Today in History' data posts, gathering data from yesterday's post.") - today_in_history_data = yesterday_data.get('today_in_history_posts', []) - else: - if len(today_in_history_data_list) > 0: - today_in_history_data = page_data_handling(today_in_history_data_list, get_unique_per_dev=True) - else: - logger.error(f"'Today in history' data list empty! Please check the data again.") - today_in_history_data = [] - - json_string = { - "header_summary": recent_post_summ, - "recent_posts": recent_page_data, - "active_posts": active_page_data, - "today_in_history_posts": today_in_history_data - } - gen.write_json_file(json_string, json_file_path) - - archive_json_file_path = fr"static/homepage/{str_month_year}/{current_date_str}-homepage.json" - gen.store_file_in_archive(json_file_path, archive_json_file_path) - break - - except Exception as ex: - logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}") - time.sleep(delay) - count += 1 - if count > 1: - sys.exit(f"{ex}") - else: - logger.info("No change in 'Recent' or 'Active' posts.") - rewrite_json_file = False - - # update today in history and save file if no change in Recent or Active posts - if history_data_collected_from_yesterday: - logger.info("No change in 'Today in History' data posts, gathering data from yesterday's post.") - today_in_history_data = yesterday_data.get('today_in_history_posts', []) - else: - rewrite_json_file = True - if len(today_in_history_data_list) > 0: - today_in_history_data = page_data_handling(today_in_history_data_list, get_unique_per_dev=True) - else: - logger.error(f"'Today in history' data list empty! Please check the data again.") - today_in_history_data = [] - - if rewrite_json_file: - logger.info(f'Rewriting the homepage.json file') - json_string = { - "header_summary": yesterday_data.get('header_summary', []), - "recent_posts": yesterday_data.get('recent_posts', []), - "active_posts": yesterday_data.get('recent_posts', []), - "today_in_history_posts": today_in_history_data - } - gen.write_json_file(json_string, json_file_path) - else: - logger.info("No need to rewrite homepage.json file") - - if os.path.exists(full_path): - archive_json_file_path = fr"static/homepage/{str_month_year}/{current_date_str}-homepage.json" - gen.store_file_in_archive(json_file_path, archive_json_file_path) +import json +import os +import random +import sys +import time +import traceback +import warnings +from datetime import datetime, timedelta + +from loguru import logger +from tqdm import tqdm + +from src.config import ES_INDEX +from src.elasticsearch_utils import ElasticSearchClient +from src.json_utils import GenerateJSON +from src.utils import month_dict +from src.xml_utils import GenerateXML + +warnings.filterwarnings("ignore") + + +def page_data_handling(data_list: list, get_unique_per_dev=False): + """ + Handle the page data by generating XML files and creating single entries. + + Args: + data_list (list): A list of dictionaries containing page data. + get_unique_per_dev (bool, optional): Flag indicating whether to get only unique documents per development domain. Defaults to False. + + Returns: + list: A list of dictionaries containing processed page data. 
+ """ + page_data = [] + collected_dev_data = [] + for data in tqdm(data_list): + try: + # Generate all XML files for each given title, if not present + xml_gen.start(dict_data=[data], url=data['_source']['domain']) + entry_data = json_gen.create_single_entry(data, look_for_combined_summary=True) + if get_unique_per_dev: # Ensure that there is only one document per domain + if entry_data['dev_name'] not in collected_dev_data: + collected_dev_data.append(entry_data['dev_name']) + logger.info(f"collected data for: {collected_dev_data}") + page_data.append(entry_data) + else: + page_data.append(entry_data) + except Exception as ex: + logger.error( + f"Error occurred for doc id: {data['_source']['id']}\n{ex} \n{traceback.format_exc()}") + return page_data + + +if __name__ == "__main__": + + json_gen = GenerateJSON() + xml_gen = GenerateXML() + elastic_search = ElasticSearchClient() + + dev_urls = [ + ["https://lists.linuxfoundation.org/pipermail/bitcoin-dev/", + "https://gnusha.org/pi/bitcoindev/"], + "https://lists.linuxfoundation.org/pipermail/lightning-dev/", + "https://delvingbitcoin.org/" + ] + + # Set the date range for data extraction + current_date = datetime.now() + start_date = current_date - timedelta(days=7) + + start_date_str = start_date.strftime("%Y-%m-%d") + current_date_str = current_date.strftime("%Y-%m-%d") + + logger.info(f"start_date: {start_date_str}") + logger.info(f"current_date_str: {current_date_str}") + + # Convert month from number to name for filename construction + month_name = month_dict[int(current_date.month)] + str_month_year = f"{month_name}_{int(current_date.year)}" + + recent_data_list = [] + active_data_list = [] + today_in_history_data_list = [] + history_data_collected_from_yesterday = False + random_years_ago = None + + # path to the stored homepage.json file + json_file_path = fr"static/homepage.json" + + # Process each URL in the dev_urls list + for dev_url in dev_urls: + logger.info(f"Working on URL: {dev_url}") + fetch_today_in_history = True + + # Fetch docs from an elasticsearch index + data_list = elastic_search.extract_data_from_es( + ES_INDEX, dev_url, start_date_str, current_date_str, exclude_combined_summary_docs=True + ) + + dev_name = dev_url[0].split("/")[-2] if isinstance(dev_url, list) else dev_url.split("/")[-2] + logger.success(f"Retrieved {len(data_list)} threads for {dev_name}") + + seen_titles = set() + + # TOP ACTIVE POSTS + active_posts_data = elastic_search.filter_top_active_posts( + es_results=data_list, top_n=10 + ) + + # Collect N active posts per domain + active_posts_data_counter = 0 + for data in active_posts_data: + if active_posts_data_counter >= 3: + break + + title = data['_source']['title'] + if title in seen_titles: + continue + seen_titles.add(title) + + # Fetch the first post for given title and domain + original_post = elastic_search.get_earliest_posts_by_title( + es_index=ES_INDEX, url=dev_url, title=title + ) + + # Gather post counts for given title and its total contributors + counts, contributors = elastic_search.es_fetch_contributors_and_threads( + es_index=ES_INDEX, title=title, domain=dev_url + ) + + # As we want to show the original/first post of the filtered active post, + # we are parsing information from 'original_post', + # otherwise we would parse the information from 'data' if we want to show the filtered post itself + + # Separate out an original author from contributor's list + for author in original_post['_source']['authors']: + contributors.remove(author) + original_post['_source']['n_threads'] = 
counts + original_post['_source']['contributors'] = contributors + original_post['_source']['dev_name'] = dev_name + active_data_list.append(original_post) + active_posts_data_counter += 1 + + logger.success(f"Number of active posts collected: {len(active_data_list)}, for URL: {dev_url}") + + # TOP RECENT POSTS + recent_data_post_counter = 0 + recent_posts_data = elastic_search.filter_top_recent_posts(es_results=data_list, top_n=20) + + for data in recent_posts_data: + # If preprocessed body text shorter than token_threshold, skip the doc + if not json_gen.is_body_text_long(data=data, sent_threshold=2): + logger.info(f"skipping: {data['_source']['title']} - {data['_source']['url']}") + continue + + title = data['_source']['title'] + if title in seen_titles: + continue + seen_titles.add(title) + + # Collect N recent posts per domain + if recent_data_post_counter >= 3: + break + + # Gather post counts for given title and its total contributors + counts, contributors = elastic_search.es_fetch_contributors_and_threads( + es_index=ES_INDEX, title=title, domain=dev_url + ) + + # Separate out an original author from contributor's list + for author in data['_source']['authors']: + contributors.remove(author) + data['_source']['n_threads'] = counts + data['_source']['contributors'] = contributors + data['_source']['dev_name'] = dev_name + recent_data_list.append(data) + recent_data_post_counter += 1 + + if not recent_data_list: + for data in recent_posts_data: + # If the preprocessed body text shorter than token_threshold, skip that post + if not json_gen.is_body_text_long(data=data, sent_threshold=2): + logger.info(f"skipping: {data['_source']['title']} - {data['_source']['url']}") + continue + + title = data['_source']['title'] + # Collect N recent posts per domain + if recent_data_post_counter >= 3: + break + counts, contributors = elastic_search.es_fetch_contributors_and_threads( + es_index=ES_INDEX, title=title, domain=dev_url + ) + + # Separate out an original author from contributor's list + for author in data['_source']['authors']: + contributors.remove(author) + data['_source']['n_threads'] = counts + data['_source']['contributors'] = contributors + data['_source']['dev_name'] = dev_name + recent_data_list.append(data) + recent_data_post_counter += 1 + + logger.success(f"Number of recent posts collected: {len(recent_data_list)}, for URL: {dev_url}") + + # TODAY IN HISTORY POSTS + logger.info(f"fetching 'Today in history' posts... 
") + + # Randomly choose a number N within given range and look back N for the data N years ago + # for bitcoin-dev and lighting-dev we have data from 2015, and for delving-bitcoin we have it from 2022 + if not random_years_ago: + at_least_years_ago = 3 + at_max_years_ago = current_date.year - 2015 + random_years_ago = random.randint(at_least_years_ago, at_max_years_ago) + logger.info(f"Random years ago between {at_least_years_ago} to {at_max_years_ago}: {random_years_ago}") + + if dev_url == "https://delvingbitcoin.org/": + random_years_ago = random.randint(1, current_date.year - 2022) + logger.info( + f"for delving-bitcoin - random years ago between {1} to {current_date.year - 2022}: {random_years_ago}") + + default_days_to_look_back = 6 + loop_counter = 1 + + while fetch_today_in_history: + days_to_look_back = default_days_to_look_back * loop_counter + selected_random_date = current_date - timedelta(days=365 * random_years_ago) + + start_of_time = selected_random_date - timedelta(days=selected_random_date.weekday()) + end_of_time = start_of_time + timedelta(days=days_to_look_back) + + start_of_time_str = start_of_time.strftime("%Y-%m-%dT%H:%M:%S") + end_of_time_str = end_of_time.strftime("%Y-%m-%dT%H:%M:%S") + + logger.info( + f"collecting the data from {days_to_look_back} days range ... || Start of week: {start_of_time} | " + f"End of week: {end_of_time}") + + selected_threads = elastic_search.fetch_data_in_date_range( + es_index=ES_INDEX, + start_date=start_of_time_str, + end_date=end_of_time_str, + domain=dev_url + ) + + if len(selected_threads) > 0: + for doc in selected_threads: + doc_title = doc['_source']['title'] + doc_created_at = doc['_source']['created_at'] + + if doc['_source']['type'] == 'original_post': + + counts, contributors = elastic_search.es_fetch_contributors_and_threads( + es_index=ES_INDEX, title=doc_title, domain=dev_url + ) + + if counts < 5: + logger.info(f"No. of replies are less than 5, skipping it... ") + continue + + if contributors: + # Separate out an original author from contributor's list + for author in doc['_source']['authors']: + contributors.remove(author) + doc['_source']['n_threads'] = counts + doc['_source']['contributors'] = contributors + doc['_source']['dev_name'] = dev_name + today_in_history_data_list.append(doc) + logger.info(f"collected doc created on: {doc_created_at} || TITLE: {doc_title}") + fetch_today_in_history = False + break + loop_counter += 1 + + # If not data found for given time period, collect the history data from stored homepage.json file + if not today_in_history_data_list: + logger.info("Collecting yesterday's history threads!") + current_directory = os.getcwd() + full_path = os.path.join(current_directory, json_file_path) + if os.path.exists(full_path): + with open(full_path, 'r') as j: + try: + data = json.load(j) + except Exception as e: + logger.info(f"Error reading json file:{full_path} :: {e}") + data = {} + today_in_history_data_list.extend(data.get('today_in_history_posts', [])) + history_data_collected_from_yesterday = True + + logger.success(f"No. 
of 'Today in history' posts collected: {len(today_in_history_data_list)}") + + # Determine if there's any update in the data as compared to stored JSON file + current_directory = os.getcwd() + full_path = os.path.join(current_directory, json_file_path) + if os.path.exists(full_path): + with open(full_path, 'r') as j: + try: + yesterday_data = json.load(j) + except Exception as e: + logger.info(f"Error reading json file:{full_path} :: {e}") + yesterday_data = {} + + stored_json_titles = json_gen.get_existing_json_title(file_path=json_file_path) + collected_post_titles = set([data['_source']['title'] for data in recent_data_list] + + [data['_source']['title'] for data in active_data_list]) + + if collected_post_titles != set(stored_json_titles): + logger.info("Changes found as compared to previously stored JSON file... ") + + delay = 5 + count = 0 + + while True: + try: + logger.info( + f"Active posts: {len(active_data_list)}, " + f"Recent posts: {len(recent_data_list)}, " + f"Today in history posts: {len(today_in_history_data_list)}" + ) + logger.info("Creating homepage.json file ... ") + + recent_post_summ = "" + if len(active_data_list) > 0 or len(recent_data_list) > 0: + + # Generate the header summary from recent posts, + # and if no recent data is collected then from active posts + if len(recent_data_list) > 0: + recent_post_summ = json_gen.generate_recent_posts_summary(recent_data_list) + else: + recent_post_summ = json_gen.generate_recent_posts_summary(active_data_list) + logger.success(recent_post_summ) + + # Compile recent posts data + recent_page_data = page_data_handling(recent_data_list) + + # Compile active posts data + active_page_data = page_data_handling(active_data_list) + + else: + logger.error(f"'Active' and 'Recent' data list empty! Please check the data again.") + recent_page_data, active_page_data = [], [] + + # Compile today in history posts + if history_data_collected_from_yesterday: + logger.info("No change in 'Today in History' data posts, gathering data from yesterday's post.") + today_in_history_data = yesterday_data.get('today_in_history_posts', []) + else: + if len(today_in_history_data_list) > 0: + today_in_history_data = page_data_handling(today_in_history_data_list, get_unique_per_dev=True) + else: + logger.error(f"'Today in history' data list empty! 
Please check the data again.") + today_in_history_data = [] + + json_string = { + "header_summary": recent_post_summ, + "recent_posts": recent_page_data, + "active_posts": active_page_data, + "today_in_history_posts": today_in_history_data + } + json_gen.write_json_file(json_string, json_file_path) + + archive_json_file_path = fr"static/homepage/{str_month_year}/{current_date_str}-homepage.json" + json_gen.store_file_in_archive(json_file_path, archive_json_file_path) + break + + except Exception as ex: + logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}") + time.sleep(delay) + count += 1 + if count > 1: + sys.exit(f"{ex}") + else: + # If no changes found in Recent or Active posts, + # simply gather all data from yesterday's stored json file and save it with an updated name in the archive directory + logger.info("No change in 'Recent' or 'Active' posts.") + rewrite_json_file = False + + if history_data_collected_from_yesterday: + logger.info("No change in 'Today in History' data posts, gathering data from yesterday's post.") + today_in_history_data = yesterday_data.get('today_in_history_posts', []) + else: + rewrite_json_file = True + if len(today_in_history_data_list) > 0: + today_in_history_data = page_data_handling(today_in_history_data_list, get_unique_per_dev=True) + else: + logger.error(f"'Today in history' data list empty! Please check the data again.") + today_in_history_data = [] + + if rewrite_json_file: + logger.info(f'Rewriting the homepage.json file') + json_string = { + "header_summary": yesterday_data.get('header_summary', []), + "recent_posts": yesterday_data.get('recent_posts', []), + "active_posts": yesterday_data.get('active_posts', []), + "today_in_history_posts": today_in_history_data + } + json_gen.write_json_file(json_string, json_file_path) + else: + logger.info("No need to rewrite homepage.json file") + + if os.path.exists(full_path): + archive_json_file_path = fr"static/homepage/{str_month_year}/{current_date_str}-homepage.json" + json_gen.store_file_in_archive(json_file_path, archive_json_file_path) diff --git a/generate_weekly_newsletter_json.py b/generate_weekly_newsletter_json.py index ce5de7781b..f425fd34eb 100644 --- a/generate_weekly_newsletter_json.py +++ b/generate_weekly_newsletter_json.py @@ -1,23 +1,25 @@ +import json +import os +import sys import time import traceback from datetime import datetime, timedelta + from loguru import logger -import os -import sys -import json from tqdm import tqdm from src.config import ES_INDEX from src.elasticsearch_utils import ElasticSearchClient from src.json_utils import GenerateJSON -from src.xml_utils import GenerateXML from src.utils import month_dict +from src.xml_utils import GenerateXML if __name__ == "__main__": - gen = GenerateJSON() + json_gen = GenerateJSON() xml_gen = GenerateXML() elastic_search = ElasticSearchClient() + dev_urls = [ ["https://lists.linuxfoundation.org/pipermail/bitcoin-dev/", "https://gnusha.org/pi/bitcoindev/"], @@ -25,40 +27,35 @@ "https://delvingbitcoin.org/" ] + # Set the date range for data extraction: last week to yesterday.
current_date = datetime.now() - current_date_str = current_date.strftime("%Y-%m-%d") - start_date = current_date - timedelta(days=7) - start_date_str = start_date.strftime("%Y-%m-%d") - end_date = current_date - timedelta(days=1) + + current_date_str = current_date.strftime("%Y-%m-%d") + start_date_str = start_date.strftime("%Y-%m-%d") end_date_str = end_date.strftime("%Y-%m-%d") logger.info(f"Newsletter publish date: {current_date_str}") logger.info(f"Gathering data for newsletter from {start_date_str} to {end_date_str}") + # Convert month from number to name for filename construction month_name = month_dict[int(current_date.month)] str_month_year = f"{month_name}_{int(current_date.year)}" active_data_list = [] new_threads_list = [] + # Process each URL in the dev_urls list for dev_url in dev_urls: - data_list = elastic_search.extract_data_from_es( ES_INDEX, dev_url, start_date_str, end_date_str, exclude_combined_summary_docs=True ) - if isinstance(dev_url, list): - dev_name = dev_url[0].split("/")[-2] - else: - dev_name = dev_url.split("/")[-2] - - logger.success(f"TOTAL THREADS RECEIVED FOR '{dev_name}': {len(data_list)}") + dev_name = dev_url[0].split("/")[-2] if isinstance(dev_url, list) else dev_url.split("/")[-2] + logger.success(f"Retrieved {len(data_list)} threads for {dev_name}") # NEW THREADS POSTS - # @TODO you already identify the original post by type==original_post - # so you could get the posts in order by date and check if the original posts is there seen_titles = set() for i in data_list: this_title = i['_source']['title'] @@ -66,27 +63,25 @@ continue seen_titles.add(this_title) - # check if the first post for this title is in the past week - original_post = elastic_search.get_earliest_posts_by_title(es_index=ES_INDEX, url=dev_url, title=this_title) - - if original_post['_source'] and i['_source']['created_at'] == original_post['_source']['created_at']: - logger.success(f"new thread created on: {original_post['_source']['created_at']} || TITLE: {this_title}") + # Check if any new thread started in given week + if i['_source']['type'] == 'original_post': + logger.success(f"New thread created on: {i['_source']['created_at']} || TITLE: {this_title}") counts, contributors = elastic_search.es_fetch_contributors_and_threads( es_index=ES_INDEX, title=this_title, domain=dev_url ) - + # Separate an original author and contributors for author in i['_source']['authors']: contributors.remove(author) i['_source']['n_threads'] = counts i['_source']['contributors'] = contributors i['_source']['dev_name'] = dev_name new_threads_list.append(i) - logger.info(f"number of new threads started this week: {len(new_threads_list)}") + logger.info(f"No. of new threads started this week: {len(new_threads_list)}") # TOP ACTIVE POSTS active_posts_data = elastic_search.filter_top_active_posts(es_results=data_list, top_n=15) - logger.info(f"number of filtered top active post: {len(active_posts_data)}") + logger.info(f"No. of filtered top active post: {len(active_posts_data)}") new_threads_titles_list = [i['_source']['title'] for i in new_threads_list] @@ -103,14 +98,15 @@ seen_titles.add(title) active_data_list.append(data) # active_posts_data_counter += 1 - logger.info(f"number of active posts collected: {len(active_data_list)}") + logger.info(f"No. 
of active posts collected: {len(active_data_list)}") - # gather titles of docs from json file + # Determine if there's any update in the data compared to stored JSON + # Gather titles from stored JSON file json_file_path = fr"static/newsletters/newsletter.json" current_directory = os.getcwd() json_full_path = os.path.join(current_directory, json_file_path) - json_xml_ids = set() + stored_json_titles = set() if os.path.exists(json_full_path): with open(json_full_path, 'r') as j: try: @@ -119,22 +115,22 @@ logger.info(f"Error reading json file:{json_full_path} :: {e}") json_data = {} - json_xml_ids = set( + stored_json_titles = set( [item['title'] for item in json_data.get('new_threads_this_week', [])] + [item['title'] for item in json_data.get('active_posts_this_week', [])] ) else: logger.warning(f"No existing newsletter.json file found: {json_full_path}") - # gather ids of docs from active posts and new thread posts - filtered_docs_ids = set( + # Gather titles from collected Active data and New Threads list + collected_json_titles = set( [data['_source']['title'] for data in active_data_list] + [data['_source']['title'] for data in new_threads_list] ) - # check if there are any updates in the xml file - if filtered_docs_ids != json_xml_ids: - logger.info("changes found in recent posts ... ") + # Generate a new newsletter.json file if changes found in stored JSON file + if collected_json_titles != stored_json_titles: + logger.info("Changes found as compared to previously stored JSON file... ") delay = 5 count = 0 @@ -144,23 +140,21 @@ logger.success(f"Total no. of active posts collected: {len(active_data_list)}") logger.success(f"Total no. of new threads started this week: {len(new_threads_list)}") - logger.info("creating newsletter.json file ... ") + logger.info("Creating newsletter.json file ... 
") if len(active_data_list) > 0 or len(new_threads_list) > 0: + # Prepare New Threads data for newsletter new_threads_page_data = [] - active_page_data = [] new_threads_summary = "" - if new_threads_list: - new_threads_summary += gen.generate_recent_posts_summary(new_threads_list, verbose=True) + new_threads_summary += json_gen.generate_recent_posts_summary(new_threads_list, verbose=True) logger.success(new_threads_summary) for data in tqdm(new_threads_list): try: - # check and generate any missing file + # Generate all XML files for given title, if not present xml_gen.start(dict_data=[data], url=data['_source']['domain']) - - entry_data = gen.create_single_entry( + entry_data = json_gen.create_single_entry( data, base_url_for_xml="https://tldr.bitcoinsearch.xyz/summary", look_for_combined_summary=True, @@ -173,16 +167,17 @@ else: logger.warning(f"No new threads started this week, generating summary of active posts this " f"week ...") - # if no new threads started this week, generate summary from active post this week - new_threads_summary += gen.generate_recent_posts_summary(active_data_list) + # If no new threads started this week, generate summary from active posts of the given week + new_threads_summary += json_gen.generate_recent_posts_summary(active_data_list) logger.success(new_threads_summary) + # Prepare active posts data for newsletter + active_page_data = [] for data in tqdm(active_data_list): try: - # check and generate any missing file + # Generate all XML files for given title, if not present xml_gen.start(dict_data=[data], url=data['_source']['domain']) - - entry_data = gen.create_single_entry( + entry_data = json_gen.create_single_entry( data, base_url_for_xml="https://tldr.bitcoinsearch.xyz/summary", look_for_combined_summary=True, remove_xml_extension=True ) @@ -191,19 +186,17 @@ logger.error( f"Error occurred for doc id: {data['_source']['id']}\n{ex} \n{traceback.format_exc()}") + # Compile and save data for newsletter file json_string = { "summary_of_threads_started_this_week": new_threads_summary, "new_threads_this_week": new_threads_page_data, "active_posts_this_week": active_page_data } - gen.write_json_file(json_string, json_file_path) - + json_gen.write_json_file(json_string, json_file_path) archive_json_file_path = fr"static/newsletters/{str_month_year}/{current_date_str}-newsletter.json" - gen.store_file_in_archive(json_file_path, archive_json_file_path) - + json_gen.store_file_in_archive(json_file_path, archive_json_file_path) else: logger.error(f"Data list empty! 
Please check the data again.") - break except Exception as ex: logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}") @@ -212,8 +205,8 @@ if count > 1: sys.exit(f"{ex}") else: + # If no changes found in stored JSON file, save the previous one with updated name in the archive directory logger.success("No change in the posts, no need to update newsletter.json file") - # save the previous one with updated name in archive if os.path.exists(json_full_path): archive_json_file_path = fr"static/newsletters/{str_month_year}/{current_date_str}-newsletter.json" - gen.store_file_in_archive(json_file_path, archive_json_file_path) + json_gen.store_file_in_archive(json_file_path, archive_json_file_path) diff --git a/json_generator_production.py b/json_generator_production.py index 9e80db8d94..9ccb1e1d35 100644 --- a/json_generator_production.py +++ b/json_generator_production.py @@ -1,15 +1,18 @@ +import os +import sys import time +import warnings from datetime import datetime, timedelta -import sys -import os + from loguru import logger -from tqdm import tqdm -import warnings from openai.error import APIError, PermissionError, AuthenticationError, InvalidAPIType, ServiceUnavailableError +from tqdm import tqdm + from src.config import ES_INDEX from src.elasticsearch_utils import ElasticSearchClient from src.gpt_utils import generate_summary_for_transcript from src.json_utils import GenerateJSON + warnings.filterwarnings("ignore") @@ -29,6 +32,7 @@ def get_json_full_path(base_path, url): "https://btctranscripts.com/", ] + # Set the date range for data extraction if APPLY_DATE_RANGE: current_date_str = None if not current_date_str: @@ -41,7 +45,9 @@ def get_json_full_path(base_path, url): start_date_str = None current_date_str = None + # Process each URL in the dev_urls list for dev_url in dev_urls: + # Fetch data with an empty summary field data_list = elastic_search.fetch_raw_data_for_url_with_empty_summary( es_index=ES_INDEX, url=dev_url, start_date_str=start_date_str, end_date_str=current_date_str ) @@ -63,11 +69,11 @@ def get_json_full_path(base_path, url): logger.info(f"{this_id = }") this_data = doc['_source'] - # check locally if file exists + # Check locally if file exists this_url = this_data['url'] json_full_path = get_json_full_path(BASE_PATH, this_url) - # check if the summary exists in a local file + # Check if the summary exists in a local file if os.path.exists(json_full_path): json_data = gen.load_json_file(json_full_path) this_summary = json_data.get('summary', "") @@ -75,12 +81,12 @@ def get_json_full_path(base_path, url): else: this_summary = "" - # generate summary if not found + # Generate summary if not found in the locally stored file if not this_summary: speaker = this_data.get('authors', "") this_summary = generate_summary_for_transcript(body=this_data['body'], speaker=speaker) - # update ES doc with summary + # Update the doc with summary in Elasticsearch Index if this_summary: elastic_search.es_client.update( index=ES_INDEX, @@ -92,11 +98,11 @@ def get_json_full_path(base_path, url): } ) - # update local json file + # Update local json file with added summary field this_data['summary'] = this_summary gen.write_json_file(this_data, json_full_path) - # break after n entries + # Break after N entries temp_counter += 1 if temp_counter == 50: logger.info(f"Break statement called after: {temp_counter} entries.") diff --git a/push_combined_summary_to_es.py b/push_combined_summary_to_es.py index 018a838def..79a3487a1d 100644 --- a/push_combined_summary_to_es.py +++ 
b/push_combined_summary_to_es.py @@ -16,7 +16,7 @@ xml_reader = XMLReader() elastic_search = ElasticSearchClient() - total_combined_files = [] + # Static directory names to look into for respective combined summary xml files static_dirs = [ 'bitcoin-dev', 'lightning-dev', @@ -24,29 +24,30 @@ ] pattern = "combined*.xml" + total_combined_files = [] for static_dir in static_dirs: combined_files = glob.glob(f"static/{static_dir}/**/{pattern}") total_combined_files.extend(combined_files) logger.info(f"Total combined files: {(len(total_combined_files))}") - # get unique combined file paths + # Get unique combined file paths total_combined_files_dict = {os.path.splitext(os.path.basename(i))[0]: i for i in total_combined_files} - logger.info(f"Total unique combined files: {len(total_combined_files_dict)}") + # Loop through all locally stored combined summary XML files and insert/update them accordingly for file_name, full_path in tqdm.tqdm(total_combined_files_dict.items()): try: - # get data from xml file + # Get data from xml file xml_file_data = xml_reader.read_xml_file(full_path) if REMOVE_TIMESTAMPS_IN_AUTHORS: - # remove timestamps from author's names and collect unique names only + # Remove timestamps from author's names and collect unique names only xml_file_data['authors'] = remove_timestamps_from_author_names(xml_file_data['authors']) - # check if doc exist in ES index + # Check if doc exist in ES index doc_exists = elastic_search.es_client.exists(index=ES_INDEX, id=file_name) - # insert the doc in ES index if it does not exist, else update it + # Insert the doc in ES index if it does not exist, else update it if not doc_exists: res = elastic_search.es_client.index( index=ES_INDEX, diff --git a/push_summary_to_es.py b/push_summary_to_es.py index 930e46c0b7..fd885537e2 100644 --- a/push_summary_to_es.py +++ b/push_summary_to_es.py @@ -6,7 +6,6 @@ from src.elasticsearch_utils import ElasticSearchClient from src.xml_utils import XMLReader - if __name__ == "__main__": APPLY_DATE_RANGE = False @@ -21,8 +20,9 @@ "https://gnusha.org/pi/bitcoindev/" ] + # Process each URL in the dev_urls list for dev_url in dev_urls: - + # Set the date range for data extraction if APPLY_DATE_RANGE: current_date_str = None if not current_date_str: @@ -35,21 +35,20 @@ start_date_str = None current_date_str = None + # Fetch doc with an empty summary field docs_list = elastic_search.fetch_data_with_empty_summary(ES_INDEX, dev_url, start_date_str, current_date_str) - if isinstance(dev_url, list): - dev_name = dev_url[0].split("/")[-2] - else: - dev_name = dev_url.split("/")[-2] - + dev_name = dev_url[0].split("/")[-2] if isinstance(dev_url, list) else dev_url.split("/")[-2] logger.success(f"Total threads received with empty summary for '{dev_name}': {len(docs_list)}") + # Loop through all fetched docs and update them by adding the summary from xml files for doc in tqdm.tqdm(docs_list): res = None try: doc_id = doc['_id'] doc_index = doc['_index'] if not doc['_source'].get('summary'): + # Get summary text from locally stored XML files xml_summary = xml_reader.get_xml_summary(doc, dev_name) if xml_summary: elastic_search.es_client.update( diff --git a/scheduler.py b/scheduler.py deleted file mode 100644 index 7ef84fc5c6..0000000000 --- a/scheduler.py +++ /dev/null @@ -1,22 +0,0 @@ -import schedule -import time -from generate_xmls import GenerateXML, ElasticSearchClient -from src.config import ES_CLOUD_ID, ES_USERNAME, ES_PASSWORD, ES_INDEX - - -def generate_xml(url): - gen = GenerateXML() - # scrap data and save json 
and then pass it to xml - elastic_search = ElasticSearchClient(es_cloud_id=ES_CLOUD_ID, es_username=ES_USERNAME, - es_password=ES_PASSWORD) - data_list = elastic_search.extract_data_from_es(ES_INDEX, url) - gen.start(data_list, url) - - -schedule.every().day.at("23:00").do(generate_xml(url="https://lists.linuxfoundation.org/pipermail/bitcoin-dev/")) -schedule.every().day.at("02:00").do(generate_xml(url="https://lists.linuxfoundation.org/pipermail/lightning-dev/")) - - -while True: - schedule.run_pending() - time.sleep(1) diff --git a/src/config.py b/src/config.py index 9b2dba2806..a07c318927 100644 --- a/src/config.py +++ b/src/config.py @@ -1,7 +1,7 @@ import os -import openai import warnings +import openai import tiktoken from dotenv import load_dotenv diff --git a/src/elasticsearch_utils.py b/src/elasticsearch_utils.py index c45eb0bd5e..29ae4f3159 100644 --- a/src/elasticsearch_utils.py +++ b/src/elasticsearch_utils.py @@ -1,5 +1,6 @@ import time from datetime import datetime + from elasticsearch import Elasticsearch, NotFoundError from elasticsearch.helpers import scan from loguru import logger @@ -90,15 +91,15 @@ def fetch_data_based_on_title(self, es_index, title, url): "must": [ { "match_phrase": - { - "title.keyword": title - } + { + "title.keyword": title + } }, { "term": - { - "domain.keyword": str(url) - } + { + "domain.keyword": str(url) + } } ] } @@ -236,7 +237,7 @@ def filter_top_recent_posts(self, es_results, top_n): def filter_top_active_posts(self, es_results, top_n): unique_results = [] - thread_dict = {} # maps post titles to their respective activity levels + thread_dict = {} # maps post titles to their respective activity levels # create dictionary with title as key and thread count as value for result in es_results: title = result['_source']['title'] @@ -386,7 +387,7 @@ def es_fetch_contributors_and_threads(self, es_index, title, domain): # The search query domain_query = self.get_domain_query(domain) query = { - "size": 0, # no search hits are returned, the focus is solely on the aggregations and counts + "size": 0, # no search hits are returned, the focus is solely on the aggregations and counts "query": { "bool": { "must": [ diff --git a/src/gpt_utils.py b/src/gpt_utils.py index 7ec3ed2005..07d9cded13 100644 --- a/src/gpt_utils.py +++ b/src/gpt_utils.py @@ -1,10 +1,10 @@ import sys -import openai -from tqdm import tqdm -from openai.error import APIError, PermissionError, AuthenticationError, InvalidAPIType, ServiceUnavailableError import time import traceback + +import openai from loguru import logger +from openai.error import APIError, PermissionError, AuthenticationError, InvalidAPIType, ServiceUnavailableError from src.config import TOKENIZER, OPENAI_API_KEY, OPENAI_ORG_KEY, CHAT_COMPLETION_MODEL, COMPLETION_MODEL @@ -194,7 +194,17 @@ def create_n_bullets(body_summary, n=3): return response_str -def split_prompt_into_chunks(prompt, chunk_size): +def split_prompt_into_chunks(prompt: str, chunk_size: int) -> list[str]: + """ + Split a given prompt into chunks of specified size. + + Args: + prompt (str): The input text prompt to be split into chunks. + chunk_size (int): The maximum size of each chunk in tokens. + + Returns: + list: A list containing the chunks of the input prompt. 
+ """ tokens = TOKENIZER.encode(prompt) chunks = [] @@ -209,10 +219,20 @@ def split_prompt_into_chunks(prompt, chunk_size): return chunks -def get_summary_chunks(body, tokens_per_sub_body, custom_prompt=None): +def get_summary_chunks(body: str, tokens_per_sub_body: int, custom_prompt: str = None) -> list[str]: + """ + Generate summary chunks for a given body of text. + + Args: + body (str): The text body for which summaries are to be generated. + tokens_per_sub_body (int): The maximum number of tokens per summary chunk. + custom_prompt (str, optional): Custom prompt to be used for summarization. Defaults to None. + + Returns: + list: A list containing summary chunks generated from the input body. + """ chunks = split_prompt_into_chunks(body, tokens_per_sub_body) summaries = [] - # logger.info(f"Total chunks created: {len(chunks)}") for chunk in chunks: count_gen_sum = 0 @@ -232,7 +252,19 @@ def get_summary_chunks(body, tokens_per_sub_body, custom_prompt=None): return summaries -def recursive_summary(body, tokens_per_sub_body, max_length, custom_prompt=None): +def recursive_summary(body: str, tokens_per_sub_body: int, max_length: int, custom_prompt: str = None) -> list[str]: + """ + Generate summary recursively for a given body of text. + + Args: + body (str): The text body for which summaries are to be generated. + tokens_per_sub_body (int): The maximum number of tokens per summary chunk. + max_length (int): The maximum length of the summary in tokens. + custom_prompt (str, optional): Custom prompt to be used for summarization. Defaults to None. + + Returns: + list: A list containing the generated summaries. + """ summaries = get_summary_chunks(body, tokens_per_sub_body, custom_prompt) summary_length = sum([len(TOKENIZER.encode(s)) for s in summaries]) @@ -246,7 +278,17 @@ def recursive_summary(body, tokens_per_sub_body, max_length, custom_prompt=None) return summaries -def gpt_api(body, custom_prompt=None): +def gpt_api(body: str, custom_prompt: str = None) -> str: + """ + Interface function for generating summaries using GPT models. + + Args: + body (str): The text body for which summaries are to be generated. + custom_prompt (str, optional): Custom prompt to be used for summarization. Defaults to None. + + Returns: + str: The generated summary. + """ body_length_limit = 127000 tokens_per_sub_body = 127000 summaries = recursive_summary(body, tokens_per_sub_body, body_length_limit, custom_prompt) @@ -274,12 +316,22 @@ def gpt_api(body, custom_prompt=None): return "\n".join(summaries) -def create_summary(body, custom_prompt=None): +def create_summary(body: str, custom_prompt: str = None) -> str: + """ + Create a summary for the given body of text. + + Args: + body (str): The text body for which summaries are to be generated. + custom_prompt (str, optional): Custom prompt to be used for summarization. Defaults to None. + + Returns: + str: The generated summary. 
+ """ summ = gpt_api(body, custom_prompt) return summ -def generate_chatgpt_summary_for_prompt(summarization_prompt, max_tokens): +def generate_chatgpt_summary_for_prompt(summarization_prompt: str, max_tokens: int) -> str: response = openai.ChatCompletion.create( model=CHAT_COMPLETION_MODEL, messages=[ diff --git a/src/json_utils.py b/src/json_utils.py index 7ba75e5d4d..5f0f4ee036 100644 --- a/src/json_utils.py +++ b/src/json_utils.py @@ -1,8 +1,9 @@ -from xml.etree import ElementTree as ET -from datetime import datetime -import os import json +import os import shutil +from datetime import datetime +from xml.etree import ElementTree as ET + import nltk import pytz from loguru import logger @@ -17,6 +18,17 @@ class GenerateJSON: def get_xml_summary(self, data, verbose=False): + """ + Extracts summary information from an XML file based on provided data. + + Args: + self: The instance of the class invoking the method. + data (dict): A dictionary containing data from a source, expected to have keys '_source', 'id', 'title', and 'created_at'. + verbose (bool, optional): If True, verbose logging is enabled. Defaults to False. + + Returns: + str: A string containing author information and summary extracted from the XML file. + """ number = get_id(data["_source"]["id"]) title = data["_source"]["title"] xml_name = clean_title(title) @@ -56,6 +68,17 @@ def get_xml_summary(self, data, verbose=False): return "" def generate_recent_posts_summary(self, dict_list, verbose=False): + """ + Generate a concise summary from a compilation of condensed recent discussions. + + Args: + self: The instance of the class invoking the method. + dict_list (list): A list of dictionaries containing data from recent posts. + verbose (bool, optional): If True, verbose logging is enabled. Defaults to False. + + Returns: + str: A concise summary of the recent discussions. + """ logger.info("working on given post's summary") recent_post_data = "" @@ -92,6 +115,23 @@ def generate_recent_posts_summary(self, dict_list, verbose=False): def create_single_entry(self, data, base_url_for_xml="static", look_for_combined_summary=False, remove_xml_extension=False, add_combined_summary_field=False): + """ + Create a single entry data structure from the provided data. + + Args: + self: The instance of the class invoking the method. + data (dict): A dictionary containing data from a source, expected to have keys '_source', 'id', 'title', 'created_at', 'contributors', 'url', 'authors', and 'body'. + base_url_for_xml (str, optional): The base URL for the XML files. Defaults to "static". + look_for_combined_summary (bool, optional): If True, look for combined summary files. Defaults to False. + remove_xml_extension (bool, optional): If True, remove the XML file extension from the file paths. Defaults to False. + add_combined_summary_field (bool, optional): If True, add a field for combined summary. Defaults to False. + + Returns: + dict: A dictionary containing the entry data with keys 'id', 'title', 'link', 'authors', 'published_at', + 'summary', 'n_threads', 'dev_name', 'contributors', 'file_path', and 'combined_summ_file_path'. If + add_combined_summary_field is True, it also contains 'combined_summary' key. 
+ + """ number = get_id(data["_source"]["id"]) title = data["_source"]["title"] published_at = datetime.strptime(data['_source']['created_at'], '%Y-%m-%dT%H:%M:%S.%fZ') @@ -164,6 +204,16 @@ def create_single_entry(self, data, base_url_for_xml="static", look_for_combined return entry_data def get_existing_json_title(self, file_path): + """ + Retrieve existing titles from a JSON file located at the specified path. + + Args: + self: The instance of the class invoking the method. + file_path (str): The path to the JSON file. + + Returns: + list: A list containing titles extracted from the JSON file's 'recent_posts' and 'active_posts' fields. + """ current_directory = os.getcwd() full_path = os.path.join(current_directory, file_path) if os.path.exists(full_path): @@ -181,6 +231,19 @@ def get_existing_json_title(self, file_path): return [] def is_body_text_long(self, data, sent_threshold=2): + """ + Check if the body text of the provided data is longer than a specified number of sentences. + + Args: + self: The instance of the class invoking the method. + data (dict): A dictionary containing data from a source, expected to have a '_source' key with a 'body' key + containing the body text. + sent_threshold (int, optional): The threshold number of sentences to consider the body text as long. + Defaults to 2. + + Returns: + bool: True if the body text contains more sentences than the threshold, False otherwise. + """ body_text = data['_source']['body'] body_text = preprocess_email(body_text) body_token = sent_tokenize(body_text) @@ -188,12 +251,35 @@ def is_body_text_long(self, data, sent_threshold=2): return len(body_token) > sent_threshold def store_file_in_archive(self, json_file_path, archive_file_path): + """ + Store a JSON file in an archive file. + + Args: + self: The instance of the class invoking the method. + json_file_path (str): The path to the JSON file to be archived. + archive_file_path (str): The path to the archive file where the JSON file will be stored. + + Returns: + None + + """ archive_dirname = os.path.dirname(archive_file_path) os.makedirs(archive_dirname, exist_ok=True) shutil.copy(json_file_path, archive_file_path) logger.success(f'archive updated with file: {archive_file_path}') def load_json_file(self, path): + """ + Load data from a JSON file located at the specified path. + + Args: + self: The instance of the class invoking the method. + path (str): The path to the JSON file. + + Returns: + dict: A dictionary containing the data loaded from the JSON file, or an empty dictionary if the file cannot be read. + + """ with open(path, 'r') as j: try: return json.load(j) @@ -202,6 +288,18 @@ def load_json_file(self, path): return {} def write_json_file(self, data, path): + """ + Write data to a JSON file located at the specified path. + + Args: + self: The instance of the class invoking the method. + data (dict): The data to be written to the JSON file. + path (str): The path to the JSON file. 
+ + Returns: + None + + """ os.makedirs(os.path.dirname(path), exist_ok=True) try: with open(path, 'w') as f: diff --git a/src/utils.py b/src/utils.py index 38fc7b8e32..6e416a9ed2 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,18 +1,20 @@ -import requests -from bs4 import BeautifulSoup -import pandas as pd +import datetime import os -from tqdm import tqdm import re from ast import literal_eval + +import pandas as pd +import pytz +import requests +from bs4 import BeautifulSoup from dateutil.parser import parse from dateutil.relativedelta import relativedelta from loguru import logger -import pytz -import datetime +from tqdm import tqdm + +from src.config import TOKENIZER, CHATGPT from src.gpt_utils import generate_chatgpt_summary, generate_summary, generate_title, generate_chatgpt_title, \ consolidate_chatgpt_summary, consolidate_summary -from src.config import TOKENIZER, CHATGPT CURRENT_TIME = datetime.datetime.now(datetime.timezone.utc) CURRENT_TIMESTAMP = str(CURRENT_TIME.timestamp()).replace(".", "_") @@ -25,6 +27,19 @@ def add_utc_if_not_present(datetime_str, iso_format=True): + """ + Parses the given datetime string and adds UTC timezone information if not already present. + + Args: + datetime_str (str): The datetime string to parse. + iso_format (bool, optional): Whether to return the result in ISO 8601 format. Default is True. + + Returns: + datetime.datetime or str: If `iso_format` is True, returns the datetime string in ISO 8601 format with UTC timezone. + Otherwise, returns a datetime object with UTC timezone. + Raises: + ValueError: If no valid date format is found for the given datetime string. + """ time_formats = [ "%Y-%m-%d %H:%M:%S%z", "%Y-%m-%dT%H:%M:%S", @@ -50,6 +65,15 @@ def add_utc_if_not_present(datetime_str, iso_format=True): def remove_timestamps_from_author_names(author_list): + """ + Removes timestamps from author names in the given list. + + Args: + author_list (list of str): A list containing author names, possibly with timestamps. + + Returns: + list of str: A list of unique author names with timestamps removed. + """ preprocessed_list = [] for author in author_list: name = author.split(" ")[0:-2] @@ -58,6 +82,16 @@ def remove_timestamps_from_author_names(author_list): def convert_to_tuple(x): + """ + Converts the input to a tuple. + + Args: + x (str or any): The input to be converted. If a string, it's evaluated as a Python literal using `ast.literal_eval`. + + Returns: + tuple: The input converted to a tuple. + + """ try: if isinstance(x, str): x = literal_eval(x) @@ -67,6 +101,16 @@ def convert_to_tuple(x): def clean_title(xml_name): + """ + Cleans the title extracted from an XML file by replacing special characters with hyphens. + + Args: + xml_name (str): The title extracted from an XML file. + + Returns: + str: The cleaned title with special characters replaced by hyphens. + + """ special_characters = ['/', ':', '@', '#', '$', '*', '&', '<', '>', '\\', '?'] xml_name = re.sub(r'[^A-Za-z0-9]+', '-', xml_name) for sc in special_characters: @@ -75,18 +119,56 @@ def clean_title(xml_name): def get_id(id): + """ + Extracts the last part of an ID string after splitting by hyphens. + + Args: + id (str): The ID string. + + Returns: + str: The last part of the ID string. + + """ return str(id).split("-")[-1] def create_folder(month_year): + """ + Creates a folder with the given name if it doesn't exist. + + Args: + month_year (str): The name of the folder to be created. 
+ + """ os.makedirs(month_year, exist_ok=True) def remove_multiple_whitespaces(text): + """ + Removes multiple whitespaces from the given text. + + Args: + text (str): The input text. + + Returns: + str: The text with multiple whitespaces replaced by a single whitespace. + + """ return re.sub('\s+', ' ', text).strip() def normalize_text(s, sep_token=" \n "): + """ + Normalizes the given text by removing extra whitespaces, punctuation, and special characters. + + Args: + s (str): The input text to be normalized. + sep_token (str, optional): The separator token to be used. Default is ' \n '. + + Returns: + str: The normalized text. + + """ s = re.sub(r'\s+', ' ', s).strip() s = re.sub(r". ,", "", s) s = s.replace("..", ".") @@ -98,6 +180,17 @@ def normalize_text(s, sep_token=" \n "): def is_date(string, fuzzy=False): + """ + Checks if the given string represents a date. + + Args: + string (str): The input string to be checked. + fuzzy (bool, optional): Whether to use fuzzy parsing. Default is False. + + Returns: + bool: True if the string represents a date, False otherwise. + + """ try: parse(string, fuzzy=fuzzy) return True @@ -106,6 +199,15 @@ def is_date(string, fuzzy=False): def preprocess_email(email_body): + """ + Preprocesses the given email body by removing unnecessary parts and normalizing the text. + + Args: + email_body (str): The email body to be preprocessed. + + Returns: + str: The preprocessed and normalized email body. + """ email_body = email_body.split("-------------- next part --------------")[0] email_lines = email_body.split('\n') temp_ = [] @@ -142,6 +244,15 @@ def preprocess_email(email_body): def scrape_email_data(url_): + """ + Scrapes email data from the given URL and preprocesses it. + + Args: + url_ (str): The URL of the email content. + + Returns: + tuple: A tuple containing author name, timestamp, subject, and preprocessed email body. + """ r = requests.get(url_) body_soup = BeautifulSoup(r.content, 'html.parser').body subject = body_soup.find('h1').text @@ -155,6 +266,16 @@ def scrape_email_data(url_): def get_past_week_data(dataframe): + """ + Retrieves data from the past week from the given DataFrame. + + Args: + dataframe (pd.DataFrame): The DataFrame containing email data. + + Returns: + pd.DataFrame: A DataFrame containing data from the past week. + + """ dt_now = CURRENT_TIME dt_min = dt_now - datetime.timedelta(days=7) dataframe['timestamp'] = pd.to_datetime(dataframe['timestamp'], utc=True) @@ -165,6 +286,15 @@ def get_past_week_data(dataframe): def get_datetime_format(dataframe): + """ + Converts the 'date' column in the DataFrame to a standard datetime format. + + Args: + dataframe (pd.DataFrame): The DataFrame containing email data. + + Returns: + pd.DataFrame: The DataFrame with the 'date' column converted to a standard datetime format. 
+ """ date_list = [] for i, r in dataframe.iterrows(): date_string = str(r['date']) diff --git a/src/xml_utils.py b/src/xml_utils.py index cf4fe58fe3..95b7e53702 100644 --- a/src/xml_utils.py +++ b/src/xml_utils.py @@ -1,27 +1,38 @@ -import re -import pandas as pd -from feedgen.feed import FeedGenerator -from tqdm import tqdm +import glob +import os import platform +import re import shutil +import traceback +import xml.etree.ElementTree as ET from datetime import datetime, timezone + +import pandas as pd import pytz -import glob -import xml.etree.ElementTree as ET -import os -import traceback +from feedgen.feed import FeedGenerator from loguru import logger +from tqdm import tqdm +from src.config import ES_INDEX +from src.elasticsearch_utils import ElasticSearchClient +from src.gpt_utils import create_summary from src.utils import preprocess_email, month_dict, get_id, clean_title, convert_to_tuple, create_folder, \ remove_multiple_whitespaces, add_utc_if_not_present -from src.gpt_utils import create_summary -from src.config import ES_INDEX -from src.elasticsearch_utils import ElasticSearchClient elastic_search = ElasticSearchClient() def get_base_directory(url): + """ + Determine the base directory for storing files based on the provided URL. + + Args: + url (str): The URL used to determine the base directory. + + Returns: + str: The base directory determined based on the URL. Possible values are "bitcoin-dev", "lightning-dev", "delvingbitcoin", or "others". + + """ if "bitcoin-dev" in url or "bitcoindev" in url: directory = "bitcoin-dev" elif "lightning-dev" in url: @@ -36,6 +47,22 @@ def get_base_directory(url): class XMLReader: def get_xml_summary(self, data, dev_name): + """ + Retrieve the summary from an XML file associated with the provided data. + + Args: + self: The instance of the class invoking the method. + data (dict): A dictionary containing data from a source, expected to have keys '_source', 'id', 'title', and 'created_at'. + dev_name (str): The name of the development platform associated with the data. + + Returns: + str or None: The summary extracted from the XML file, or None if no summary is found. + + Notes: + This method assumes the availability of the functions get_id, clean_title, get_base_directory, and month_dict. + It also assumes the existence of the os, pytz, and ET modules for file operations, datetime handling, and XML parsing, respectively. + + """ number = get_id(data["_source"]["id"]) title = data["_source"]["title"] xml_name = clean_title(title) @@ -69,6 +96,18 @@ def get_xml_summary(self, data, dev_name): return None def read_xml_file(self, full_path): + """ + Read and extract information from an XML file located at the specified path. + + Args: + self: The instance of the class invoking the method. + full_path (str): The path to the XML file. + + Returns: + dict: A dictionary containing extracted information from the XML file, including 'id', 'title', 'summary', + 'body', 'url', 'authors', 'created_at', 'body_type', 'type', 'domain', and 'indexed_at'. + + """ namespaces = {'atom': 'http://www.w3.org/2005/Atom'} tree = ET.parse(full_path) root = tree.getroot() @@ -112,6 +151,19 @@ def read_xml_file(self, full_path): class GenerateXML: def generate_xml(self, feed_data, xml_file): + """ + Generate an Atom XML feed based on the provided feed data and write it to the specified XML file. + + Args: + self: The instance of the class invoking the method. 
+ feed_data (dict): A dictionary containing data for generating the feed, including 'id', 'title', 'authors', 'links', + 'url', 'created_at', and 'summary'. + xml_file (str): The path to the XML file where the generated feed will be written. + + Returns: + None + + """ # create feed generator fg = FeedGenerator() fg.id(feed_data['id']) @@ -135,9 +187,19 @@ def generate_xml(self, feed_data, xml_file): def append_columns(self, df_dict, file, title, namespace): """ - Extract specific information from the given XML file corresponding to - a single post or reply within a thread and append this information to - the given dictionary (df_dict) + Append specific information from the given XML file corresponding to a single post or reply within a thread + to the given dictionary (df_dict). + + Args: + self: The instance of the class invoking the method. + df_dict (dict): A dictionary containing columns as keys and lists as values, representing DataFrame columns. + file (str): The path to the XML file. + title (str): The title of the post or reply. + namespace (dict): A dictionary containing XML namespace mappings. + + Returns: + None + """ # Append default values for columns that will not be directly filled from the XML df_dict["body_type"].append(0) @@ -149,8 +211,6 @@ def append_columns(self, df_dict, file, title, namespace): # The title is directly provided as a parameter df_dict["title"].append(title) - # formatted_file_name = file.split("/static")[1] - # logger.info(formatted_file_name) # Parse the XML file to extract and append relevant data tree = ET.parse(file) @@ -174,51 +234,46 @@ def append_columns(self, df_dict, file, title, namespace): df_dict["authors"].append([author_result.strip()]) def file_not_present_df(self, columns, source_cols, df_dict, files_list, dict_data, data, - title, combined_filename, namespace): + title, namespace): """ Processes data directly from the given document (`data`) as no XML summary is available for that document. Also, for each individual summary (XML file) that already exists for the given thread, extracts and appends its content to the dictionary. """ - # Append basic data from dict_data for each column into df_dict + # Append basic metadata from dict_data for each column into df_dict for col in columns: df_dict[col].append(dict_data[data][col]) + # Append source fields, converting 'created_at' to a UTC-aware datetime for col in source_cols: + value = dict_data[data]['_source'][col] if "created_at" in col: - datetime_obj = add_utc_if_not_present(dict_data[data]['_source'][col], iso_format=False) - df_dict[col].append(datetime_obj) - else: - df_dict[col].append(dict_data[data]['_source'][col]) + value = add_utc_if_not_present(value, iso_format=False) + df_dict[col].append(value) + + # Normalize file path separators + files_list = [file.replace("\\", "/") for file in files_list] + + # Use a dictionary to cache parsed XML trees and avoid redundant parsing + parsed_files = {} # For each individual summary (XML file) that exists for the # given thread, extract and append their content to the dictionary - # TODO: - # This method is called for every post without a summary, which means that - # existing inidividual summaries for a thread are added n-1 times the amount - # of new posts in the thread at the time of execution of the cron job. - # this is not an issue because we then drop duplicates, but it's extra complexity.
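+ # The loop below parses each XML file at most once, caching the (tree, root) pair in parsed_files,
+ # and appends the summary of every file whose title matches the current thread to df_dict["body"]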
for file in files_list: - file = file.replace("\\", "/") if os.path.exists(file): - tree = ET.parse(file) - root = tree.getroot() + if file not in parsed_files: + tree = ET.parse(file) + root = tree.getroot() + parsed_files[file] = (tree, root) + + tree, root = parsed_files[file] file_title = root.find('atom:entry/atom:title', namespace).text if title == file_title: self.append_columns(df_dict, file, title, namespace) - if combined_filename in file: - # TODO: the code will never reach this point - # as we are already filtering per thread title so no - # "Combined summary - X" filename will pass though - tree = ET.parse(file) - root = tree.getroot() - summary = root.find('atom:entry/atom:summary', namespace).text - df_dict["body"].append(summary) - else: - summary = root.find('atom:entry/atom:summary', namespace).text - df_dict["body"].append(summary) + summary = root.find('atom:entry/atom:summary', namespace).text + df_dict["body"].append(summary) else: logger.info(f"file not present: {file}") @@ -230,39 +285,45 @@ def file_present_df(self, files_list, namespace, combined_filename, title, indiv summary exists, it extracts the content of individual summaries, appending it to the data dictionary. """ - combined_file_fullpath = None # the combined XML file if found # List to keep track of the month folders that contain # the XML files for the posts of the current thread - month_folders = [] + month_folders = set() + + # Cached listdir calls to avoid repeated disk access + folder_contents = {} + + # Identifying combined file and processing individual summaries in a single loop + combined_file_fullpath = None - # Iterate through the list of local XML file paths for file in files_list: - file = file.replace("\\", "/") + normalized_file = file.replace("\\", "/") # Check if the current file is the combined XML file for the thread - if combined_filename in file: - combined_file_fullpath = file + if combined_filename in normalized_file: + combined_file_fullpath = normalized_file # Parse the XML file to find the title and compare it with the current title # in order to understand if the post/file is part of the current thread - tree = ET.parse(file) + tree = ET.parse(normalized_file) root = tree.getroot() file_title = root.find('atom:entry/atom:title', namespace).text # If titles match, add the file to the list of relevant XMLs and track its month folder if title == file_title: - individual_summaries_xmls_list.append(file) - month_folder_path = "/".join(file.split("/")[:-1]) - if month_folder_path not in month_folders: - month_folders.append(month_folder_path) + individual_summaries_xmls_list.append(normalized_file) + month_folder_path = "/".join(normalized_file.split("/")[:-1]) + month_folders.add(month_folder_path) # Ensure the combined XML file is copied to all relevant month folders for month_folder in month_folders: - if combined_file_fullpath and combined_filename not in os.listdir(month_folder): - if combined_filename not in os.listdir(month_folder): - shutil.copy(combined_file_fullpath, month_folder) + if month_folder not in folder_contents: + folder_contents[month_folder] = os.listdir(month_folder) + + if combined_file_fullpath and combined_filename not in folder_contents[month_folder]: + shutil.copy(combined_file_fullpath, month_folder) # If individual summaries exist but no combined summary, # extract and append their content to the dictionary - if len(individual_summaries_xmls_list) > 0 and not any(combined_filename in item for item in files_list): - logger.info("individual summaries are 
present but not combined ones ...") + combined_exists = any(combined_filename in item for item in files_list) + if individual_summaries_xmls_list and not combined_exists: + logger.info("Individual summaries are present but not combined ones.") for file in individual_summaries_xmls_list: self.append_columns(df_dict, file, title, namespace) tree = ET.parse(file) @@ -271,24 +332,70 @@ def file_present_df(self, files_list, namespace, combined_filename, title, indiv df_dict["body"].append(summary) def preprocess_authors_name(self, author_tuple): + """ + Preprocess author names by removing '+' characters and stripping whitespace. + + Args: + self: The instance of the class invoking the method. + author_tuple (tuple): A tuple containing author names. + + Returns: + tuple: A tuple with author names processed by removing '+' characters and stripping whitespace. + + """ author_tuple = tuple(s.replace('+', '').strip() for s in author_tuple) return author_tuple def get_local_xml_file_paths(self, dev_url): """ - Retrieve paths for all relevant local XML files based on the given domain + Retrieve paths for all relevant local XML files based on the given domain. + + Args: + self: The instance of the class invoking the method. + dev_url (str): The URL of the development platform. + + Returns: + list: A list containing paths to relevant local XML files. + """ current_directory = os.getcwd() directory = get_base_directory(dev_url) files_list = glob.glob(os.path.join(current_directory, "static", directory, "**/*.xml"), recursive=True) return files_list + def get_local_xml_file_paths_for_title(self, dev_url, title): + """ + Retrieve paths for all relevant local XML files based on the given domain and title. + + Args: + self: The instance of the class invoking the method. + dev_url (str): The URL of the development platform. + title (str): The title of the post or reply. + + Returns: + list: A list containing paths to relevant local XML files. + + """ + current_directory = os.getcwd() + directory = get_base_directory(dev_url) + files_list = glob.glob(os.path.join(current_directory, "static", directory, f"**/*{title}.xml"), recursive=True) + return files_list + def generate_new_emails_df(self, main_dict_data, dev_url): + """ + Generate a new DataFrame containing email data from the given main dictionary data and development URL. + + Args: + self: The instance of the class invoking the method. + main_dict_data (list): A list of dictionaries containing data fetched from Elasticsearch Index. + dev_url (str): The URL of the development platform. + + Returns: + pandas.DataFrame: A DataFrame containing email data extracted from the main dictionary data. 
+ + """ # Define XML namespace for parsing XML files namespaces = {'atom': 'http://www.w3.org/2005/Atom'} - - # Retrieve all existing XML files (summaries) for the given source - files_list = self.get_local_xml_file_paths(dev_url) # Initialize a dictionary to store data for DataFrame construction, with predefined columns columns = ['_index', '_id', '_score'] @@ -297,9 +404,9 @@ def generate_new_emails_df(self, main_dict_data, dev_url): df_dict = {col: [] for col in (columns + source_cols)} seen_titles = set() - # Process each document in the input data + # Process each document in the input data for idx in range(len(main_dict_data)): - xmls_list = [] # the existing XML files for the thread that the fetched document is part of + xmls_list = [] # the existing XML files for the thread that the fetched document is part of thread_title = main_dict_data[idx]["_source"]["title"] if thread_title in seen_titles: continue @@ -322,11 +429,14 @@ def generate_new_emails_df(self, main_dict_data, dev_url): combined_filename = f"combined_{xml_name}.xml" created_at = title_dict_data[data_idx]["_source"]["created_at"] + # Retrieve all existing XML files (summaries) for the given source and title + files_list = self.get_local_xml_file_paths_for_title(dev_url=dev_url, title=xml_name) + # Check if the XML file for the document exists if not any(file_name in item for item in files_list): logger.info(f"Not present: {created_at} | {file_name}") self.file_not_present_df(columns, source_cols, df_dict, files_list, title_dict_data, data_idx, - title, combined_filename, namespaces) + title, namespaces) else: logger.info(f"Present: {created_at} | {file_name}") self.file_present_df(files_list, namespaces, combined_filename, title, xmls_list, df_dict) @@ -345,12 +455,36 @@ def generate_new_emails_df(self, main_dict_data, dev_url): return emails_df def start(self, dict_data, url): + """ + Start the process of generating local XML files and combined summaries based on the provided dictionary data and URL. + + Args: + self: The instance of the class invoking the method. + dict_data (list): A list of dictionaries containing data fetched from Elasticsearch Index. + url (str): The URL of the development platform. + + Returns: + None + + """ if len(dict_data) > 0: emails_df = self.generate_new_emails_df(dict_data, url) if len(emails_df) > 0: emails_df['created_at_org'] = emails_df['created_at'].astype(str) def generate_local_xml(cols, combine_flag, url): + """ + Generate a local XML file based on the provided columns data, combine flag, and URL. + + Args: + cols (dict): A dictionary containing column data, including 'id', 'created_at', 'title', 'body', 'authors', and 'url'. + combine_flag (str): A flag indicating whether to combine the XML file with others. + url (str): The URL of the development platform. + + Returns: + str: The file path of the generated local XML file. 
+ + """ if isinstance(cols['created_at'], str): cols['created_at'] = add_utc_if_not_present(cols['created_at'], iso_format=False) month_name = month_dict[int(cols['created_at'].month)] diff --git a/xmls_generator_production.py b/xmls_generator_production.py index e87792a387..9979f6fafd 100644 --- a/xmls_generator_production.py +++ b/xmls_generator_production.py @@ -1,9 +1,11 @@ +import sys import time +import warnings from datetime import datetime, timedelta -import sys + from loguru import logger -import warnings from openai.error import APIError, PermissionError, AuthenticationError, InvalidAPIType, ServiceUnavailableError + from src.config import ES_INDEX from src.elasticsearch_utils import ElasticSearchClient from src.xml_utils import GenerateXML @@ -11,8 +13,10 @@ warnings.filterwarnings("ignore") if __name__ == "__main__": + gen = GenerateXML() elastic_search = ElasticSearchClient() + dev_urls = [ "https://lists.linuxfoundation.org/pipermail/bitcoin-dev/", "https://lists.linuxfoundation.org/pipermail/lightning-dev/", @@ -20,15 +24,17 @@ "https://gnusha.org/pi/bitcoindev/" ] + # Set the date range for data extraction end_date = datetime.now() start_date = end_date - timedelta(days=30) - # yyyy-mm-dd end_date_str = end_date.strftime("%Y-%m-%d") start_date_str = start_date.strftime("%Y-%m-%d") + logger.info(f"start_data: {start_date_str}") logger.info(f"end_date_str: {end_date_str}") + # Process each URL in the dev_urls list for dev_url in dev_urls: data_list = elastic_search.extract_data_from_es( ES_INDEX, dev_url, start_date_str, end_date_str, exclude_combined_summary_docs=True