diff --git a/VERSION b/VERSION
index 169f19b..b0f61c5 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.11.0
\ No newline at end of file
+1.11.1
\ No newline at end of file
diff --git a/metrics/tasks.py b/metrics/tasks.py
index 349b3f2..861f8a4 100644
--- a/metrics/tasks.py
+++ b/metrics/tasks.py
@@ -43,12 +43,13 @@
 from .models import UserAgent, UserSession, Item, ItemAccess
 
 import logging
+import time
 
 
 User = get_user_model()
 
 
-@celery_app.task(bind=True, name=_('Compute access'), timelimit=-1)
+@celery_app.task(bind=True, name=_('Parse logs'), timelimit=-1)
 def task_parse_logs(self, collections=[], from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None):
     """
     Parses log files associated with a given collection.
@@ -124,7 +125,15 @@ def task_parse_log(self, log_file_hash, user_id=None, username=None):
         return
 
     log_parser, url_translator_manager = _setup_parsing_environment(log_file, robots_list, mmdb)
-    _process_lines(log_parser, url_translator_manager, log_file)
+    success = _process_lines(log_parser, url_translator_manager, log_file)
+
+    if not success:
+        logging.error(f'Failed to parse log file {log_file.path}.')
+        return
+
+    log_file.status = choices.LOG_FILE_STATUS_PROCESSED
+    log_file.save()
+    logging.info(f'Log file {log_file.path} has been successfully parsed.')
 
 
 def _initialize_log_file(log_file_hash):
@@ -212,7 +221,7 @@ def _load_metrics_objs_cache(log_file):
         log_file (LogFile): The log file being processed.
 
     Returns:
-        dict: A cache containing items, user agents, user sessions, and item accesses.
+        dict: A cache containing items and user agents.
     """
     logging.info(f'Loading metrics objects cache for {log_file.collection}')
     cache = {
@@ -234,26 +243,6 @@ def _load_metrics_objs_cache(log_file):
         cache['user_agents'][key] = ua
     logging.info(f'Loaded {len(cache["user_agents"])} user agents')
 
-    date_str = log_file.validation.get('probably_date')
-    user_sessions_qs = UserSession.objects.filter(datetime__date=date_str).select_related('user_agent')
-    for us in user_sessions_qs:
-        key = (us.datetime, us.user_agent_id, us.user_ip)
-        cache['user_sessions'][key] = us
-    logging.info(f'Loaded {len(cache["user_sessions"])} user sessions for {date_str}')
-
-    item_accesses_qs = ItemAccess.objects.filter(item__collection=log_file.collection)
-    for ia in item_accesses_qs:
-        key = (
-            ia.item_id,
-            ia.user_session_id,
-            ia.media_format,
-            ia.media_language,
-            ia.country_code,
-            ia.content_type,
-        )
-        cache['item_accesses'][key] = ia
-    logging.info(f'Loaded {len(cache["item_accesses"])} item accesses for {log_file.collection}')
-
     return cache
 
 
@@ -304,9 +293,7 @@ def _process_lines(lp, utm, log_file):
         if not _process_line(line, utm, log_file, cache):
             continue
 
-    logging.info(f'File {log_file.path} has been parsed.')
-    log_file.status = choices.LOG_FILE_STATUS_PROCESSED
-    log_file.save()
+    return True
 
 
 def _process_line(line, utm, log_file, cache):
@@ -365,20 +352,19 @@ def _process_line(line, utm, log_file, cache):
         )
         return False
 
-    _register_item_access(item_access_data, line, jou_id, art_id, cache)
+    try:
+        _register_item_access(item_access_data, line, jou_id, art_id, cache)
+    except Exception as e:
+        _log_discarded_line(log_file, line, tracker_choices.LOG_FILE_DISCARDED_LINE_REASON_DATABASE_ERROR, str(e))
+        return False
+
     return True
 
 
 def _register_item_access(item_access_data, line, jou_id, art_id, cache):
     """
     Registers an item access in the database, creating necessary objects if they do not exist.
-
-    Args:
-        item_access_data (dict): Data related to the item access, including collection, journal, article, media format, etc.
-        line (dict): The log line being processed.
-        jou_id (int): The ID of the journal.
-        art_id (int): The ID of the article.
-        cache (dict): A cache containing pre-fetched objects to avoid redundant database queries.
+    Handles potential deadlocks by retrying on database errors.
     """
     col_acron3 = item_access_data.get('collection')
     media_format = item_access_data.get('media_format')
@@ -390,64 +376,109 @@ def _register_item_access(item_access_data, line, jou_id, art_id, cache):
     local_datetime = line.get('local_datetime')
     country_code = line.get('country_code')
     ip_address = line.get('ip_address')
-
+
     truncated_datetime = truncate_datetime_to_hour(local_datetime)
     if timezone.is_naive(truncated_datetime):
         truncated_datetime = timezone.make_aware(truncated_datetime)
 
     ms_key = extract_minute_second_key(local_datetime)
 
-    item_key = (col_acron3, jou_id, art_id)
-    if item_key not in cache['items']:
-        collection_obj = Collection.objects.get(acron3=col_acron3)
-        journal_obj = Journal.objects.get(id=jou_id)
-        article_obj = Article.objects.get(id=art_id)
-        it, _it = Item.objects.get_or_create(
-            collection=collection_obj,
-            journal=journal_obj,
-            article=article_obj,
-        )
-        cache['items'][item_key] = it
-    else:
-        it = cache['items'][item_key]
+    it = _get_or_create_item(col_acron3, jou_id, art_id, cache)
+    ua = _get_or_create_user_agent(client_name, client_version, cache)
+    us = _get_or_create_user_session(truncated_datetime, ua, ip_address, cache)
+    ita = _get_or_create_item_access(it, us, media_format, media_language, country_code, content_type, ms_key, cache)
 
-    user_agent_key = (client_name, client_version)
-    if user_agent_key not in cache['user_agents']:
-        ua, _ua = UserAgent.objects.get_or_create(
-            name=client_name,
-            version=client_version
-        )
-        cache['user_agents'][user_agent_key] = ua
-    else:
-        ua = cache['user_agents'][user_agent_key]
+    ita.click_timestamps[ms_key] = ita.click_timestamps.get(ms_key, 0) + 1
+    ita.save()
 
-    us_key = (truncated_datetime, ua.id, ip_address)
-    if us_key not in cache['user_sessions']:
-        us, _us = UserSession.objects.get_or_create(
-            datetime=truncated_datetime,
-            user_agent=ua,
-            user_ip=ip_address
-        )
-        cache['user_sessions'][us_key] = us
-    else:
-        us = cache['user_sessions'][us_key]
+
+def _get_or_create_item(col_acron3, jou_id, art_id, cache, max_retries=3):
+    item_key = (col_acron3, jou_id, art_id)
+    for attempt in range(max_retries):
+        try:
+            if item_key not in cache['items']:
+                collection_obj = Collection.objects.get(acron3=col_acron3)
+                journal_obj = Journal.objects.get(id=jou_id)
+                article_obj = Article.objects.get(id=art_id)
+                it, _ = Item.objects.get_or_create(
+                    collection=collection_obj,
+                    journal=journal_obj,
+                    article=article_obj,
+                )
+                cache['items'][item_key] = it
+            else:
+                it = cache['items'][item_key]
+            return it
+        except Exception as e:
+            if attempt == max_retries - 1:
+                raise
+            time.sleep(0.1)
+    return None
+
+
+def _get_or_create_user_agent(client_name, client_version, cache, max_retries=3):
+    user_agent_key = (client_name, client_version)
+    for attempt in range(max_retries):
+        try:
+            if user_agent_key not in cache['user_agents']:
+                ua, _ = UserAgent.objects.get_or_create(
+                    name=client_name,
+                    version=client_version
+                )
+                cache['user_agents'][user_agent_key] = ua
+            else:
+                ua = cache['user_agents'][user_agent_key]
+            return ua
+        except Exception as e:
+            if attempt == max_retries - 1:
+                raise
+            time.sleep(0.1)
+    return None
+
+
+def _get_or_create_user_session(truncated_datetime, ua, ip_address, cache, max_retries=3):
+    us_key = (truncated_datetime, ua.id, ip_address)
+    for attempt in range(max_retries):
+        try:
+            if us_key not in cache['user_sessions']:
+                us, _ = UserSession.objects.get_or_create(
+                    datetime=truncated_datetime,
+                    user_agent=ua,
+                    user_ip=ip_address
+                )
+                cache['user_sessions'][us_key] = us
+            else:
+                us = cache['user_sessions'][us_key]
+            return us
+        except Exception as e:
+            if attempt == max_retries - 1:
+                raise
+            time.sleep(0.1)
+    return None
+
+
+def _get_or_create_item_access(it, us, media_format, media_language, country_code, content_type, ms_key, cache, max_retries=3):
     item_access_key = (it.id, us.id, media_format, media_language, country_code, content_type)
-    if item_access_key not in cache['item_accesses']:
-        ita, _ita = ItemAccess.objects.get_or_create(
-            item=it,
-            user_session=us,
-            media_format=media_format,
-            media_language=media_language,
-            country_code=country_code,
-            content_type=content_type,
-            click_timestamps={ms_key: 1}
-        )
-        cache['item_accesses'][item_access_key] = ita
-    else:
-        ita = cache['item_accesses'][item_access_key]
-
-    ita.click_timestamps[ms_key] = ita.click_timestamps.get(ms_key, 0) + 1
-    ita.save()
+    for attempt in range(max_retries):
+        try:
+            if item_access_key not in cache['item_accesses']:
+                ita, _ = ItemAccess.objects.get_or_create(
+                    item=it,
+                    user_session=us,
+                    media_format=media_format,
+                    media_language=media_language,
+                    country_code=country_code,
+                    content_type=content_type,
+                    defaults={'click_timestamps': {ms_key: 1}}
+                )
+                cache['item_accesses'][item_access_key] = ita
+            else:
+                ita = cache['item_accesses'][item_access_key]
+            return ita
+        except Exception as e:
+            if attempt == max_retries - 1:
+                raise
+            time.sleep(0.1)
+    return None
 
 
 def _log_discarded_line(log_file, line, error_type, message):
@@ -586,9 +617,12 @@ def task_index_documents(self, collections=[], from_date=None, until_date=None,
 
     for collection in collections:
         logging.info(f'Computing metrics for collection {collection} from {from_date_str} to {until_date_str}')
 
+        clfdc_to_update = []
+        bulk_data = []
+        metrics_result = compute_metrics_for_collection(collection, dates, replace, clfdc_to_update)
 
-        for key, metric_data in compute_metrics_for_collection(collection, dates, replace).items():
+        for key, metric_data in metrics_result.items():
             bulk_data.append({
                 "_id": key,
                 "_source": metric_data,
@@ -604,6 +638,7 @@ def task_index_documents(self, collections=[], from_date=None, until_date=None,
                     bulk_data = []
                 except Exception as e:
                     logging.error(f"Failed to send bulk metrics to Elasticsearch: {e}")
+                    clfdc_to_update = []
 
         if bulk_data:
             try:
@@ -614,9 +649,14 @@ def task_index_documents(self, collections=[], from_date=None, until_date=None,
                 )
             except Exception as e:
                 logging.error(f"Failed to send remaining bulk metrics to Elasticsearch: {e}")
+                clfdc_to_update = []
+
+        for clfdc in clfdc_to_update:
+            clfdc.is_usage_metric_computed = True
+            clfdc.save()
 
 
-def compute_metrics_for_collection(collection, dates, replace=False):
+def compute_metrics_for_collection(collection, dates, replace=False, clfdc_to_update=None):
     """
     Computes usage metrics for a given collection over a range of dates.
 
@@ -627,6 +667,7 @@ def compute_metrics_for_collection(collection, dates, replace=False):
             should be computed.
         replace (bool, optional): A flag indicating whether to replace existing metrics.
            Defaults to False.
+        clfdc_to_update (list, optional): List to append clfdc objects that should be marked as computed after successful export.
 
     Returns:
         dict: A dictionary containing computed metrics, keyed by a
@@ -634,6 +675,9 @@
     """
     data = {}
 
+    if clfdc_to_update is None:
+        clfdc_to_update = []
+
     for date in dates:
         date_str = get_date_str(date)
 
@@ -646,8 +690,7 @@ def compute_metrics_for_collection(collection, dates, replace=False):
 
         logging.info(f"Computing metrics for {date_str}")
         _process_user_sessions(collection, date, date_str, data)
-        clfdc.is_usage_metric_computed = True
-        clfdc.save()
+        clfdc_to_update.append(clfdc)
 
     return data
 
@@ -771,6 +814,8 @@ def _process_user_sessions(collection, date, date_str, data):
                 item_access.content_type,
             )
 
+    return True
+
 
 def _generate_usage_key(collection, journal, pid_v2, pid_v3, pid_generic, media_language, country_code, date_str):
     """"
diff --git a/tracker/choices.py b/tracker/choices.py
index d82db64..de14077 100644
--- a/tracker/choices.py
+++ b/tracker/choices.py
@@ -34,12 +34,14 @@
 LOG_FILE_DISCARDED_LINE_REASON_MISSING_ARTICLE = 'ART'
 LOG_FILE_DISCARDED_LINE_REASON_MISSING_JOURNAL = 'JOU'
 LOG_FILE_DISCARDED_LINE_REASON_URL_TRANSLATION = 'URL'
+LOG_FILE_DISCARDED_LINE_REASON_DATABASE_ERROR = 'DBE'
 
 LOG_FILE_DISCARDED_LINE_REASON = [
     (LOG_FILE_DISCARDED_LINE_REASON_MISSING_METADATA, _("Missing Metadata")),
     (LOG_FILE_DISCARDED_LINE_REASON_MISSING_ARTICLE, _("Missing Article")),
     (LOG_FILE_DISCARDED_LINE_REASON_MISSING_JOURNAL, _("Missing Journal")),
     (LOG_FILE_DISCARDED_LINE_REASON_URL_TRANSLATION, _("URL Translation")),
+    (LOG_FILE_DISCARDED_LINE_REASON_DATABASE_ERROR, _("Database Error")),
 ]
 
 
diff --git a/tracker/migrations/0006_alter_logfilediscardedline_error_type.py b/tracker/migrations/0006_alter_logfilediscardedline_error_type.py
new file mode 100644
index 0000000..fb7f74a
--- /dev/null
+++ b/tracker/migrations/0006_alter_logfilediscardedline_error_type.py
@@ -0,0 +1,29 @@
+# Generated by Django 5.0.7 on 2025-06-14 10:46
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("tracker", "0005_articleevent"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="logfilediscardedline",
+            name="error_type",
+            field=models.CharField(
+                blank=True,
+                choices=[
+                    ("MET", "Missing Metadata"),
+                    ("ART", "Missing Article"),
+                    ("JOU", "Missing Journal"),
+                    ("URL", "URL Translation"),
+                    ("DBE", "Database Error"),
+                ],
+                max_length=3,
+                null=True,
+                verbose_name="Error Type",
+            ),
+        ),
+    ]
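
Reviewer note (not part of the diff): the four _get_or_create_* helpers added in metrics/tasks.py share one shape, namely a cache lookup, a fall-back get_or_create, and a short retry loop for transient database errors such as deadlocks. Below is a minimal standalone sketch of that shared pattern; the name get_or_create_cached, the fetch callable, and the 0.1-second delay are illustrative assumptions, not code from this change.

# Illustrative sketch only; not part of the diff above.
import time


def get_or_create_cached(cache, key, fetch, max_retries=3, delay=0.1):
    """Return cache[key], calling fetch() and caching its result on a miss.

    fetch is any zero-argument callable, e.g. a lambda wrapping
    Model.objects.get_or_create(...). Errors raised by fetch() are retried
    up to max_retries times; the last error propagates so the caller can
    discard the log line, as _process_line does above.
    """
    for attempt in range(max_retries):
        try:
            if key not in cache:
                cache[key] = fetch()
            return cache[key]
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(delay)


# Hypothetical usage, mirroring _get_or_create_user_agent:
# ua = get_or_create_cached(
#     cache['user_agents'],
#     (client_name, client_version),
#     lambda: UserAgent.objects.get_or_create(name=client_name, version=client_version)[0],
# )

The diff keeps four separate helpers rather than one generic function, which keeps each call site and its cache key explicit.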