Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.11.1
1.12.0
143 changes: 47 additions & 96 deletions metrics/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,39 +213,6 @@ def _setup_parsing_environment(log_file, robots_list, mmdb):
return lp, utm


def _load_metrics_objs_cache(log_file):
"""
Loads the necessary objects into a cache for efficient access during log processing.

Args:
log_file (LogFile): The log file being processed.

Returns:
dict: A cache containing items and user agents.
"""
logging.info(f'Loading metrics objects cache for {log_file.collection}')
cache = {
'items': {},
'user_agents': {},
'user_sessions': {},
'item_accesses': {},
}

items_qs = Item.objects.filter(collection=log_file.collection).select_related('journal', 'article', 'collection')
for it in items_qs:
key = (it.collection.acron3, it.journal_id, it.article_id)
cache['items'][key] = it
logging.info(f'Loaded {len(cache["items"])} items for {log_file.collection}')

user_agents_qs = UserAgent.objects.all()
for ua in user_agents_qs:
key = (ua.name, ua.version)
cache['user_agents'][key] = ua
logging.info(f'Loaded {len(cache["user_agents"])} user agents')

return cache


def _fetch_art_jou_ids(utm, item_access_data):
"""
Fetches the journal and article IDs based on the item access data.
Expand Down Expand Up @@ -285,26 +252,22 @@ def _process_lines(lp, utm, log_file):
Returns:
None.
"""
logging.info(f'Loading metadata cache for {log_file.collection}')
cache = _load_metrics_objs_cache(log_file)

logging.info(f'Processing {lp.logfile}')
for line in lp.parse():
if not _process_line(line, utm, log_file, cache):
if not _process_line(line, utm, log_file):
continue

return True


def _process_line(line, utm, log_file, cache):
def _process_line(line, utm, log_file):
"""
Processes a single line from the log file, translating the URL and registering item access if valid.

Args:
line (dict): A dictionary representing a single log line.
utm (URLTranslationManager): The URL translation manager instance.
log_file (LogFile): The log file being processed.
cache (dict): A cache containing pre-fetched objects to avoid redundant database queries.

Returns:
bool: True if the line was processed successfully, False otherwise.
Expand Down Expand Up @@ -353,18 +316,26 @@ def _process_line(line, utm, log_file, cache):
return False

try:
_register_item_access(item_access_data, line, jou_id, art_id, cache)
_register_item_access(item_access_data, line, jou_id, art_id)
except Exception as e:
_log_discarded_line(log_file, line, tracker_choices.LOG_FILE_DISCARDED_LINE_REASON_DATABASE_ERROR, str(e))
return False

return True


def _register_item_access(item_access_data, line, jou_id, art_id, cache):
def _register_item_access(item_access_data, line, jou_id, art_id):
"""
Registers an item access in the database, creating necessary objects if they do not exist.
Handles potential deadlocks by retrying on database errors.

Args:
item_access_data (dict): A dictionary containing item access data, including collection, ISSN, PIDs, media format, language, and content type.
line (dict): The log line being processed.
jou_id (int): The journal ID.
art_id (int): The article ID.

Returns:
None.
"""
col_acron3 = item_access_data.get('collection')
media_format = item_access_data.get('media_format')
Expand All @@ -382,31 +353,26 @@ def _register_item_access(item_access_data, line, jou_id, art_id, cache):
truncated_datetime = timezone.make_aware(truncated_datetime)
ms_key = extract_minute_second_key(local_datetime)

it = _get_or_create_item(col_acron3, jou_id, art_id, cache)
ua = _get_or_create_user_agent(client_name, client_version, cache)
us = _get_or_create_user_session(truncated_datetime, ua, ip_address, cache)
ita = _get_or_create_item_access(it, us, media_format, media_language, country_code, content_type, ms_key, cache)
it = _get_or_create_item(col_acron3, jou_id, art_id)
ua = _get_or_create_user_agent(client_name, client_version)
us = _get_or_create_user_session(truncated_datetime, ua, ip_address)
ita = _get_or_create_item_access(it, us, media_format, media_language, country_code, content_type, ms_key)

ita.click_timestamps[ms_key] = ita.click_timestamps.get(ms_key, 0) + 1
ita.save()


def _get_or_create_item(col_acron3, jou_id, art_id, cache, max_retries=3):
item_key = (col_acron3, jou_id, art_id)
def _get_or_create_item(col_acron3, jou_id, art_id, max_retries=3):
for attempt in range(max_retries):
try:
if item_key not in cache['items']:
collection_obj = Collection.objects.get(acron3=col_acron3)
journal_obj = Journal.objects.get(id=jou_id)
article_obj = Article.objects.get(id=art_id)
it, _ = Item.objects.get_or_create(
collection=collection_obj,
journal=journal_obj,
article=article_obj,
)
cache['items'][item_key] = it
else:
it = cache['items'][item_key]
collection_obj = Collection.objects.get(acron3=col_acron3)
journal_obj = Journal.objects.get(id=jou_id)
article_obj = Article.objects.get(id=art_id)
it, _ = Item.objects.get_or_create(
collection=collection_obj,
journal=journal_obj,
article=article_obj,
)
return it
except Exception as e:
if attempt == max_retries - 1:
Expand All @@ -415,18 +381,13 @@ def _get_or_create_item(col_acron3, jou_id, art_id, cache, max_retries=3):
return None


def _get_or_create_user_agent(client_name, client_version, cache, max_retries=3):
user_agent_key = (client_name, client_version)
def _get_or_create_user_agent(client_name, client_version, max_retries=3):
for attempt in range(max_retries):
try:
if user_agent_key not in cache['user_agents']:
ua, _ = UserAgent.objects.get_or_create(
name=client_name,
version=client_version
)
cache['user_agents'][user_agent_key] = ua
else:
ua = cache['user_agents'][user_agent_key]
ua, _ = UserAgent.objects.get_or_create(
name=client_name,
version=client_version
)
return ua
except Exception as e:
if attempt == max_retries - 1:
Expand All @@ -435,19 +396,14 @@ def _get_or_create_user_agent(client_name, client_version, cache, max_retries=3)
return None


def _get_or_create_user_session(truncated_datetime, ua, ip_address, cache, max_retries=3):
us_key = (truncated_datetime, ua.id, ip_address)
def _get_or_create_user_session(truncated_datetime, ua, ip_address, max_retries=3):
for attempt in range(max_retries):
try:
if us_key not in cache['user_sessions']:
us, _ = UserSession.objects.get_or_create(
datetime=truncated_datetime,
user_agent=ua,
user_ip=ip_address
)
cache['user_sessions'][us_key] = us
else:
us = cache['user_sessions'][us_key]
us, _ = UserSession.objects.get_or_create(
datetime=truncated_datetime,
user_agent=ua,
user_ip=ip_address
)
return us
except Exception as e:
if attempt == max_retries - 1:
Expand All @@ -456,23 +412,18 @@ def _get_or_create_user_session(truncated_datetime, ua, ip_address, cache, max_r
return None


def _get_or_create_item_access(it, us, media_format, media_language, country_code, content_type, ms_key, cache, max_retries=3):
item_access_key = (it.id, us.id, media_format, media_language, country_code, content_type)
def _get_or_create_item_access(it, us, media_format, media_language, country_code, content_type, ms_key, max_retries=3):
for attempt in range(max_retries):
try:
if item_access_key not in cache['item_accesses']:
ita, _ = ItemAccess.objects.get_or_create(
item=it,
user_session=us,
media_format=media_format,
media_language=media_language,
country_code=country_code,
content_type=content_type,
defaults={'click_timestamps': {ms_key: 1}}
)
cache['item_accesses'][item_access_key] = ita
else:
ita = cache['item_accesses'][item_access_key]
ita, _ = ItemAccess.objects.get_or_create(
item=it,
user_session=us,
media_format=media_format,
media_language=media_language,
country_code=country_code,
content_type=content_type,
defaults={'click_timestamps': {ms_key: 0}}
)
return ita
except Exception as e:
if attempt == max_retries - 1:
Expand Down