2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.8.0
1.9.0
70 changes: 43 additions & 27 deletions log_manager/tasks.py
@@ -100,8 +100,6 @@ def task_validate_log_files(self, collections=[], from_date=None, until_date=Non
username (str, optional): The username of the user initiating the task. Defaults to None.
ignore_date (bool, optional): If True, ignore the date of the log file. Defaults to False.
"""
user = _get_user(self.request, username=username, user_id=user_id)

logging.info(f'Validating log files for collections: {collections}.')

visible_dates = _get_visible_dates(from_date, until_date, days_to_go_back)
@@ -113,34 +111,53 @@ def task_validate_log_files(self, collections=[], from_date=None, until_date=Non
for log_file in models.LogFile.objects.filter(status=choices.LOG_FILE_STATUS_CREATED, collection__acron3=col):
file_ctime = date_utils.get_date_obj_from_timestamp(log_file.stat_result[LOGFILE_STAT_RESULT_CTIME_INDEX])
if file_ctime in visible_dates or ignore_date:
logging.info(f'Validating log file {log_file.path} for collection {log_file.collection.acron3}.')
task_validate_log_file.apply_async(args=(log_file.hash, user_id, username))

buffer_size, sample_size = _fetch_validation_parameters(col)

val_result = utils.validate_file(path=log_file.path, buffer_size=buffer_size, sample_size=sample_size)
if 'datetimes' in val_result.get('content', {}).get('summary', {}):
del val_result['content']['summary']['datetimes']

try:
log_file.validation['result'] = json.dumps(val_result, cls=DjangoJSONEncoder) if val_result else {}
log_file.validation['parameters'] = {'buffer_size': buffer_size, 'sample_size': sample_size}
except json.JSONDecodeError as e:
logging.error(f'Error serializing validation result: {e}')
log_file.validation = {}
@celery_app.task(bind=True, name=_('Validate log file'), timelimit=-1)
def task_validate_log_file(self, log_file_hash, user_id=None, username=None):
"""
Task to validate a specific log file.

if val_result.get('is_valid', {}).get('all', False):
models.LogFileDate.create_or_update(
user=user,
log_file=log_file,
date=val_result.get('probably_date', ''),
)
log_file.status = choices.LOG_FILE_STATUS_QUEUED
Parameters:
log_file_hash (str): The hash of the log file to validate.
user_id (int, optional): The ID of the user initiating the task. Defaults to None.
username (str, optional): The username of the user initiating the task. Defaults to None.
"""
user = _get_user(self.request, username=username, user_id=user_id)
log_file = models.LogFile.objects.get(hash=log_file_hash)
collection = log_file.collection.acron3

else:
log_file.status = choices.LOG_FILE_STATUS_INVALIDATED
buffer_size, sample_size = _fetch_validation_parameters(collection)

logging.info(f'Log file {log_file.path} ({log_file.collection.acron3}) has status {log_file.status}.')
log_file.save()
logging.info(f'Validating log file {log_file.path}.')
val_result = utils.validate_file(path=log_file.path, buffer_size=buffer_size, sample_size=sample_size)
if 'datetimes' in val_result.get('content', {}).get('summary', {}):
del val_result['content']['summary']['datetimes']

if 'probably_date' in val_result:
val_result['probably_date'] = date_utils.get_date_str(val_result['probably_date'])

try:
log_file.validation = val_result
log_file.validation.update({'buffer_size': buffer_size, 'sample_size': sample_size})
except json.JSONDecodeError as e:
logging.error(f'Error serializing validation result: {e}')
log_file.validation = {}

if val_result.get('is_valid', {}).get('all', False):
models.LogFileDate.create_or_update(
user=user,
log_file=log_file,
date=val_result.get('probably_date', ''),
)
log_file.status = choices.LOG_FILE_STATUS_QUEUED

else:
log_file.status = choices.LOG_FILE_STATUS_INVALIDATED

logging.info(f'Log file {log_file.path} ({log_file.collection.acron3}) has status {log_file.status}.')
log_file.save()


def _fetch_validation_parameters(collection, default_buffer_size=0.1, default_sample_size=2048):
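The body of _fetch_validation_parameters is collapsed in this view; only its signature and defaults are visible. As a rough mental model only — the per-collection lookup below is hypothetical and may not match the real implementation — it resolves a buffer size and sample size for a collection, falling back to the defaults from the signature:

# Hypothetical sketch; the real body is collapsed in the diff and may read from
# a settings model or environment variables instead of a module-level dict.
COLLECTION_VALIDATION_SETTINGS = {
    'scl': {'buffer_size': 0.2, 'sample_size': 4096},  # 'scl' is an illustrative acronym
}

def fetch_validation_parameters(collection, default_buffer_size=0.1, default_sample_size=2048):
    settings = COLLECTION_VALIDATION_SETTINGS.get(collection, {})
    return (
        settings.get('buffer_size', default_buffer_size),
        settings.get('sample_size', default_sample_size),
    )

# Usage mirrors the call sites above:
buffer_size, sample_size = fetch_validation_parameters('scl')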
@@ -199,8 +216,7 @@ def _check_missing_logs_for_date(user, collection, date):
@celery_app.task(bind=True, name=_('Generate log files count report'))
def task_log_files_count_status_report(self, collections=[], from_date=None, until_date=None, user_id=None, username=None):
from_date, until_date = date_utils.get_date_range_str(from_date, until_date)
possible_dates_n = len(date_utils.get_date_objs_from_date_range(from_date, until_date))


from_date_obj = date_utils.get_date_obj(from_date)
until_date_obj = date_utils.get_date_obj(until_date)

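Taken together, the changes to this file split validation into a fan-out: task_validate_log_files now only filters queued files by date and enqueues one task_validate_log_file per file hash, and the per-file task runs the validator and records the result. A minimal, self-contained sketch of that pattern, with a stand-in validator and an in-memory broker instead of the project's real Celery configuration:

from celery import Celery

app = Celery('log_manager_sketch', broker='memory://')

@app.task(bind=True)
def validate_log_files(self, hashes):
    # Batch task: fan out one per-file validation task per log file hash.
    for log_file_hash in hashes:
        validate_log_file.apply_async(args=(log_file_hash,))

@app.task(bind=True)
def validate_log_file(self, log_file_hash):
    # Per-file task: run a (stand-in) validator and decide the resulting status,
    # mirroring the QUEUED/INVALIDATED branch in the diff above.
    val_result = {'is_valid': {'all': True}, 'probably_date': '2024-01-01'}  # stand-in for utils.validate_file(...)
    status = 'queued' if val_result.get('is_valid', {}).get('all', False) else 'invalidated'
    return {'hash': log_file_hash, 'status': status, 'validation': val_result}

Splitting the work this way lets each file be validated by its own worker and retried independently, instead of inside one long-running batch task.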
51 changes: 44 additions & 7 deletions metrics/tasks.py
@@ -9,6 +9,7 @@

from core.utils.utils import _get_user
from core.utils.date_utils import (
get_date_obj,
get_date_str,
get_date_range_str,
get_date_objs_from_date_range,
@@ -40,27 +41,63 @@
from .models import UserAgent, UserSession, Item, ItemAccess

import logging
import json


User = get_user_model()


@celery_app.task(bind=True, name=_('Compute access'), timelimit=-1)
def task_parse_logs(self, collection, user_id=None, username=None):
def task_parse_logs(self, collections=[], from_date=None, until_date=None, user_id=None, username=None):
"""
Parses log files associated with a given collection.

Args:
collection (str): Acronym associated with the collection for which logs are being parsed.
collections (list, optional): List of collection acronyms to parse logs for. Defaults to all collections.
from_date (str, optional): Start date for log parsing in 'YYYY-MM-DD' format. Defaults to None.
until_date (str, optional): End date for log parsing in 'YYYY-MM-DD' format. Defaults to None.
user_id (int, optional): The ID of the user initiating the task. Defaults to None.
username (str, optional): The username of the user initiating the task. Defaults to None.

Returns:
None.
"""
for lf in LogFile.objects.filter(status=choices.LOG_FILE_STATUS_QUEUED, collection__acron3=collection):
logging.info(f'PARSING file {lf.path}')
task_parse_log.apply_async(args=(lf.hash, user_id, username))

"""
from_date, until_date = get_date_range_str(from_date, until_date)

from_date_obj = get_date_obj(from_date)
until_date_obj = get_date_obj(until_date)

for collection in collections or Collection.acron3_list():
for lf in LogFile.objects.filter(status=choices.LOG_FILE_STATUS_QUEUED, collection__acron3=collection):
probably_date = _extract_date_from_validation(lf.validation)
if not probably_date:
logging.debug(f'Log file {lf.path} does not have a valid probably date.')
continue

if probably_date <= from_date_obj or probably_date >= until_date_obj:
continue

logging.info(f'PARSING file {lf.path}')
task_parse_log.apply_async(args=(lf.hash, user_id, username))


def _extract_date_from_validation(validation):
"""
Extracts the date from the validation dict of a log file.

Args:
validation (dict): The validation dict of the log file.

Returns:
datetime.date: The extracted date.
"""
try:
date_str = validation.get('probably_date')
return get_date_obj(date_str, '%Y-%m-%d')
except Exception as e:
logging.error(f"Failed to extract date from validation: {e}")
return None


@celery_app.task(bind=True, name=_('Parse one log'), timelimit=-1)
def task_parse_log(self, log_file_hash, user_id=None, username=None):
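In metrics/tasks.py, task_parse_logs now gates each queued file on the 'probably_date' recorded during validation, skipping files whose date cannot be parsed or falls outside the requested range. A self-contained sketch of that gate — the helper below mirrors _extract_date_from_validation in spirit but is illustrative, not the project's code:

from datetime import date, datetime
import logging

def extract_probable_date(validation):
    # Return the 'probably_date' from a validation dict as a date, or None on failure.
    try:
        return datetime.strptime(validation.get('probably_date'), '%Y-%m-%d').date()
    except (TypeError, ValueError) as exc:
        logging.error('Failed to extract date from validation: %s', exc)
        return None

def should_parse(validation, from_date_obj, until_date_obj):
    # True only when the probable date lies strictly between the range bounds,
    # matching the <= / >= skip conditions in the diff above.
    probable = extract_probable_date(validation)
    if probable is None:
        return False
    return from_date_obj < probable < until_date_obj

print(should_parse({'probably_date': '2024-03-15'}, date(2024, 3, 1), date(2024, 3, 31)))  # True
print(should_parse({'probably_date': '2024-03-01'}, date(2024, 3, 1), date(2024, 3, 31)))  # False: boundary dates are skipped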