2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.11.0
1.11.1
219 changes: 132 additions & 87 deletions metrics/tasks.py
@@ -43,12 +43,13 @@
from .models import UserAgent, UserSession, Item, ItemAccess

import logging
import time


User = get_user_model()


@celery_app.task(bind=True, name=_('Compute access'), timelimit=-1)
@celery_app.task(bind=True, name=_('Parse logs'), timelimit=-1)
def task_parse_logs(self, collections=[], from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None):
"""
Parses log files associated with a given collection.
@@ -124,7 +125,15 @@ def task_parse_log(self, log_file_hash, user_id=None, username=None):
return

log_parser, url_translator_manager = _setup_parsing_environment(log_file, robots_list, mmdb)
_process_lines(log_parser, url_translator_manager, log_file)
success = _process_lines(log_parser, url_translator_manager, log_file)

if not success:
logging.error(f'Failed to parse log file {log_file.path}.')
return

log_file.status = choices.LOG_FILE_STATUS_PROCESSED
log_file.save()
logging.info(f'Log file {log_file.path} has been successfully parsed.')


def _initialize_log_file(log_file_hash):
@@ -212,7 +221,7 @@ def _load_metrics_objs_cache(log_file):
log_file (LogFile): The log file being processed.

Returns:
dict: A cache containing items, user agents, user sessions, and item accesses.
dict: A cache containing items and user agents.
"""
logging.info(f'Loading metrics objects cache for {log_file.collection}')
cache = {
@@ -234,26 +243,6 @@ def _load_metrics_objs_cache(log_file):
cache['user_agents'][key] = ua
logging.info(f'Loaded {len(cache["user_agents"])} user agents')

date_str = log_file.validation.get('probably_date')
user_sessions_qs = UserSession.objects.filter(datetime__date=date_str).select_related('user_agent')
for us in user_sessions_qs:
key = (us.datetime, us.user_agent_id, us.user_ip)
cache['user_sessions'][key] = us
logging.info(f'Loaded {len(cache["user_sessions"])} user sessions for {date_str}')

item_accesses_qs = ItemAccess.objects.filter(item__collection=log_file.collection)
for ia in item_accesses_qs:
key = (
ia.item_id,
ia.user_session_id,
ia.media_format,
ia.media_language,
ia.country_code,
ia.content_type,
)
cache['item_accesses'][key] = ia
logging.info(f'Loaded {len(cache["item_accesses"])} item accesses for {log_file.collection}')

return cache


@@ -304,9 +293,7 @@ def _process_lines(lp, utm, log_file):
if not _process_line(line, utm, log_file, cache):
continue

logging.info(f'File {log_file.path} has been parsed.')
log_file.status = choices.LOG_FILE_STATUS_PROCESSED
log_file.save()
return True


def _process_line(line, utm, log_file, cache):
@@ -365,20 +352,19 @@ def _process_line(line, utm, log_file, cache):
)
return False

_register_item_access(item_access_data, line, jou_id, art_id, cache)
try:
_register_item_access(item_access_data, line, jou_id, art_id, cache)
except Exception as e:
_log_discarded_line(log_file, line, tracker_choices.LOG_FILE_DISCARDED_LINE_REASON_DATABASE_ERROR, str(e))
return False

return True


def _register_item_access(item_access_data, line, jou_id, art_id, cache):
"""
Registers an item access in the database, creating necessary objects if they do not exist.

Args:
item_access_data (dict): Data related to the item access, including collection, journal, article, media format, etc.
line (dict): The log line being processed.
jou_id (int): The ID of the journal.
art_id (int): The ID of the article.
cache (dict): A cache containing pre-fetched objects to avoid redundant database queries.
Handles potential deadlocks by retrying on database errors.
"""
col_acron3 = item_access_data.get('collection')
media_format = item_access_data.get('media_format')
@@ -390,64 +376,109 @@ def _register_item_access(item_access_data, line, jou_id, art_id, cache):
local_datetime = line.get('local_datetime')
country_code = line.get('country_code')
ip_address = line.get('ip_address')

truncated_datetime = truncate_datetime_to_hour(local_datetime)
if timezone.is_naive(truncated_datetime):
truncated_datetime = timezone.make_aware(truncated_datetime)
ms_key = extract_minute_second_key(local_datetime)

item_key = (col_acron3, jou_id, art_id)
if item_key not in cache['items']:
collection_obj = Collection.objects.get(acron3=col_acron3)
journal_obj = Journal.objects.get(id=jou_id)
article_obj = Article.objects.get(id=art_id)
it, _it = Item.objects.get_or_create(
collection=collection_obj,
journal=journal_obj,
article=article_obj,
)
cache['items'][item_key] = it
else:
it = cache['items'][item_key]
it = _get_or_create_item(col_acron3, jou_id, art_id, cache)
ua = _get_or_create_user_agent(client_name, client_version, cache)
us = _get_or_create_user_session(truncated_datetime, ua, ip_address, cache)
ita = _get_or_create_item_access(it, us, media_format, media_language, country_code, content_type, ms_key, cache)

user_agent_key = (client_name, client_version)
if user_agent_key not in cache['user_agents']:
ua, _ua = UserAgent.objects.get_or_create(
name=client_name,
version=client_version
)
cache['user_agents'][user_agent_key] = ua
else:
ua = cache['user_agents'][user_agent_key]
ita.click_timestamps[ms_key] = ita.click_timestamps.get(ms_key, 0) + 1
ita.save()

us_key = (truncated_datetime, ua.id, ip_address)
if us_key not in cache['user_sessions']:
us, _us = UserSession.objects.get_or_create(
datetime=truncated_datetime,
user_agent=ua,
user_ip=ip_address
)
cache['user_sessions'][us_key] = us
else:
us = cache['user_sessions'][us_key]

def _get_or_create_item(col_acron3, jou_id, art_id, cache, max_retries=3):
item_key = (col_acron3, jou_id, art_id)
for attempt in range(max_retries):
try:
if item_key not in cache['items']:
collection_obj = Collection.objects.get(acron3=col_acron3)
journal_obj = Journal.objects.get(id=jou_id)
article_obj = Article.objects.get(id=art_id)
it, _ = Item.objects.get_or_create(
collection=collection_obj,
journal=journal_obj,
article=article_obj,
)
cache['items'][item_key] = it
else:
it = cache['items'][item_key]
return it
except Exception as e:
if attempt == max_retries - 1:
raise
time.sleep(0.1)
return None


def _get_or_create_user_agent(client_name, client_version, cache, max_retries=3):
user_agent_key = (client_name, client_version)
for attempt in range(max_retries):
try:
if user_agent_key not in cache['user_agents']:
ua, _ = UserAgent.objects.get_or_create(
name=client_name,
version=client_version
)
cache['user_agents'][user_agent_key] = ua
else:
ua = cache['user_agents'][user_agent_key]
return ua
except Exception as e:
if attempt == max_retries - 1:
raise
time.sleep(0.1)
return None


def _get_or_create_user_session(truncated_datetime, ua, ip_address, cache, max_retries=3):
us_key = (truncated_datetime, ua.id, ip_address)
for attempt in range(max_retries):
try:
if us_key not in cache['user_sessions']:
us, _ = UserSession.objects.get_or_create(
datetime=truncated_datetime,
user_agent=ua,
user_ip=ip_address
)
cache['user_sessions'][us_key] = us
else:
us = cache['user_sessions'][us_key]
return us
except Exception as e:
if attempt == max_retries - 1:
raise
time.sleep(0.1)
return None


def _get_or_create_item_access(it, us, media_format, media_language, country_code, content_type, ms_key, cache, max_retries=3):
item_access_key = (it.id, us.id, media_format, media_language, country_code, content_type)
if item_access_key not in cache['item_accesses']:
ita, _ita = ItemAccess.objects.get_or_create(
item=it,
user_session=us,
media_format=media_format,
media_language=media_language,
country_code=country_code,
content_type=content_type,
click_timestamps={ms_key: 1}
)
cache['item_accesses'][item_access_key] = ita
else:
ita = cache['item_accesses'][item_access_key]

ita.click_timestamps[ms_key] = ita.click_timestamps.get(ms_key, 0) + 1
ita.save()
for attempt in range(max_retries):
try:
if item_access_key not in cache['item_accesses']:
ita, _ = ItemAccess.objects.get_or_create(
item=it,
user_session=us,
media_format=media_format,
media_language=media_language,
country_code=country_code,
content_type=content_type,
defaults={'click_timestamps': {ms_key: 1}}
)
cache['item_accesses'][item_access_key] = ita
else:
ita = cache['item_accesses'][item_access_key]
return ita
except Exception as e:
if attempt == max_retries - 1:
raise
time.sleep(0.1)
return None


def _log_discarded_line(log_file, line, error_type, message):
@@ -586,9 +617,12 @@ def task_index_documents(self, collections=[], from_date=None, until_date=None,
for collection in collections:
logging.info(f'Computing metrics for collection {collection} from {from_date_str} to {until_date_str}')

clfdc_to_update = []

bulk_data = []
metrics_result = compute_metrics_for_collection(collection, dates, replace, clfdc_to_update)

for key, metric_data in compute_metrics_for_collection(collection, dates, replace).items():
for key, metric_data in metrics_result.items():
bulk_data.append({
"_id": key,
"_source": metric_data,
Expand All @@ -604,6 +638,7 @@ def task_index_documents(self, collections=[], from_date=None, until_date=None,
bulk_data = []
except Exception as e:
logging.error(f"Failed to send bulk metrics to Elasticsearch: {e}")
clfdc_to_update = []

if bulk_data:
try:
@@ -614,9 +649,14 @@
)
except Exception as e:
logging.error(f"Failed to send remaining bulk metrics to Elasticsearch: {e}")
clfdc_to_update = []

for clfdc in clfdc_to_update:
clfdc.is_usage_metric_computed = True
clfdc.save()


def compute_metrics_for_collection(collection, dates, replace=False):
def compute_metrics_for_collection(collection, dates, replace=False, clfdc_to_update=None):
"""
Computes usage metrics for a given collection over a range of dates.

@@ -627,13 +667,17 @@ def compute_metrics_for_collection(collection, dates, replace=False):
should be computed.
replace (bool, optional): A flag indicating whether to replace
existing metrics. Defaults to False.
clfdc_to_update (list, optional): List to append clfdc objects that should be marked as computed after successful export.

Returns:
dict: A dictionary containing computed metrics, keyed by a
generated usage key.
"""
data = {}

if clfdc_to_update is None:
clfdc_to_update = []

for date in dates:
date_str = get_date_str(date)

@@ -646,8 +690,7 @@

logging.info(f"Computing metrics for {date_str}")
_process_user_sessions(collection, date, date_str, data)
clfdc.is_usage_metric_computed = True
clfdc.save()
clfdc_to_update.append(clfdc)

return data

@@ -771,6 +814,8 @@ def _process_user_sessions(collection, date, date_str, data):
item_access.content_type,
)

return True


def _generate_usage_key(collection, journal, pid_v2, pid_v3, pid_generic, media_language, country_code, date_str):
""""
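The four _get_or_create_* helpers added above repeat the same retry loop around a cache lookup and a get_or_create call. A minimal sketch of that shared pattern; the retrying_get_or_create helper and its use below are illustrative only, not part of this PR:

import time

def retrying_get_or_create(cache, key, create_fn, max_retries=3, delay=0.1):
    # Return cache[key]; on a cache miss, create the object with create_fn().
    # The database call is retried a few times before re-raising, which is how
    # the new helpers work around transient deadlocks while parsing logs in
    # parallel workers.
    for attempt in range(max_retries):
        try:
            if key not in cache:
                cache[key] = create_fn()
            return cache[key]
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(delay)

Expressed this way, _get_or_create_user_agent would reduce to a single call:

ua = retrying_get_or_create(
    cache['user_agents'],
    (client_name, client_version),
    lambda: UserAgent.objects.get_or_create(name=client_name, version=client_version)[0],
)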
2 changes: 2 additions & 0 deletions tracker/choices.py
@@ -34,12 +34,14 @@
LOG_FILE_DISCARDED_LINE_REASON_MISSING_ARTICLE = 'ART'
LOG_FILE_DISCARDED_LINE_REASON_MISSING_JOURNAL = 'JOU'
LOG_FILE_DISCARDED_LINE_REASON_URL_TRANSLATION = 'URL'
LOG_FILE_DISCARDED_LINE_REASON_DATABASE_ERROR = 'DBE'

LOG_FILE_DISCARDED_LINE_REASON = [
(LOG_FILE_DISCARDED_LINE_REASON_MISSING_METADATA, _("Missing Metadata")),
(LOG_FILE_DISCARDED_LINE_REASON_MISSING_ARTICLE, _("Missing Article")),
(LOG_FILE_DISCARDED_LINE_REASON_MISSING_JOURNAL, _("Missing Journal")),
(LOG_FILE_DISCARDED_LINE_REASON_URL_TRANSLATION, _("URL Translation")),
(LOG_FILE_DISCARDED_LINE_REASON_DATABASE_ERROR, _("Database Error")),
]


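With the new LOG_FILE_DISCARDED_LINE_REASON_DATABASE_ERROR choice in place, database-related discards can be filtered like any other reason. An illustrative query, assuming the LogFileDiscardedLine model that carries the error_type field (see the migration below) lives in tracker.models:

from tracker import choices
from tracker.models import LogFileDiscardedLine

# Lines discarded because of a database error during parsing.
db_error_lines = LogFileDiscardedLine.objects.filter(
    error_type=choices.LOG_FILE_DISCARDED_LINE_REASON_DATABASE_ERROR
)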
29 changes: 29 additions & 0 deletions tracker/migrations/0006_alter_logfilediscardedline_error_type.py
@@ -0,0 +1,29 @@
# Generated by Django 5.0.7 on 2025-06-14 10:46

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("tracker", "0005_articleevent"),
]

operations = [
migrations.AlterField(
model_name="logfilediscardedline",
name="error_type",
field=models.CharField(
blank=True,
choices=[
("MET", "Missing Metadata"),
("ART", "Missing Article"),
("JOU", "Missing Journal"),
("URL", "URL Translation"),
("DBE", "Database Error"),
],
max_length=3,
null=True,
verbose_name="Error Type",
),
),
]
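Because Django enforces field choices at the application level rather than in the database schema, the AlterField above does not change any table; it only records the new "DBE" option in the migration state. Applying it at deploy time is presumably the usual:

python manage.py migrate tracker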