diff --git a/VERSION b/VERSION
index afa2b35..abb1658 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.8.0
\ No newline at end of file
+1.9.0
\ No newline at end of file
diff --git a/log_manager/tasks.py b/log_manager/tasks.py
index f7552eb..5a04204 100644
--- a/log_manager/tasks.py
+++ b/log_manager/tasks.py
@@ -100,8 +100,6 @@ def task_validate_log_files(self, collections=[], from_date=None, until_date=Non
         username (str, optional): The username of the user initiating the task. Defaults to None.
         ignore_date (bool, optional): If True, ignore the date of the log file. Defaults to False.
     """
-    user = _get_user(self.request, username=username, user_id=user_id)
-
     logging.info(f'Validating log files for collections: {collections}.')
 
     visible_dates = _get_visible_dates(from_date, until_date, days_to_go_back)
@@ -113,34 +111,53 @@ def task_validate_log_files(self, collections=[], from_date=None, until_date=Non
         for log_file in models.LogFile.objects.filter(status=choices.LOG_FILE_STATUS_CREATED, collection__acron3=col):
             file_ctime = date_utils.get_date_obj_from_timestamp(log_file.stat_result[LOGFILE_STAT_RESULT_CTIME_INDEX])
             if file_ctime in visible_dates or ignore_date:
-                logging.info(f'Validating log file {log_file.path} for collection {log_file.collection.acron3}.')
+                task_validate_log_file.apply_async(args=(log_file.hash, user_id, username))
 
-                buffer_size, sample_size = _fetch_validation_parameters(col)
-
-                val_result = utils.validate_file(path=log_file.path, buffer_size=buffer_size, sample_size=sample_size)
-                if 'datetimes' in val_result.get('content', {}).get('summary', {}):
-                    del val_result['content']['summary']['datetimes']
-                try:
-                    log_file.validation['result'] = json.dumps(val_result, cls=DjangoJSONEncoder) if val_result else {}
-                    log_file.validation['parameters'] = {'buffer_size': buffer_size, 'sample_size': sample_size}
-                except json.JSONDecodeError as e:
-                    logging.error(f'Error serializing validation result: {e}')
-                    log_file.validation = {}
+
+@celery_app.task(bind=True, name=_('Validate log file'), timelimit=-1)
+def task_validate_log_file(self, log_file_hash, user_id=None, username=None):
+    """
+    Task to validate a specific log file.
 
-                if val_result.get('is_valid', {}).get('all', False):
-                    models.LogFileDate.create_or_update(
-                        user=user,
-                        log_file=log_file,
-                        date=val_result.get('probably_date', ''),
-                    )
-                    log_file.status = choices.LOG_FILE_STATUS_QUEUED
+
+    Parameters:
+        log_file_hash (str): The hash of the log file to validate.
+        user_id (int, optional): The ID of the user initiating the task. Defaults to None.
+        username (str, optional): The username of the user initiating the task. Defaults to None.
+ """ + user = _get_user(self.request, username=username, user_id=user_id) + log_file = models.LogFile.objects.get(hash=log_file_hash) + collection = log_file.collection.acron3 - else: - log_file.status = choices.LOG_FILE_STATUS_INVALIDATED + buffer_size, sample_size = _fetch_validation_parameters(collection) - logging.info(f'Log file {log_file.path} ({log_file.collection.acron3}) has status {log_file.status}.') - log_file.save() + logging.info(f'Validating log file {log_file.path}.') + val_result = utils.validate_file(path=log_file.path, buffer_size=buffer_size, sample_size=sample_size) + if 'datetimes' in val_result.get('content', {}).get('summary', {}): + del val_result['content']['summary']['datetimes'] + + if 'probably_date' in val_result: + val_result['probably_date'] = date_utils.get_date_str(val_result['probably_date']) + + try: + log_file.validation = val_result + log_file.validation.update({'buffer_size': buffer_size, 'sample_size': sample_size}) + except json.JSONDecodeError as e: + logging.error(f'Error serializing validation result: {e}') + log_file.validation = {} + + if val_result.get('is_valid', {}).get('all', False): + models.LogFileDate.create_or_update( + user=user, + log_file=log_file, + date=val_result.get('probably_date', ''), + ) + log_file.status = choices.LOG_FILE_STATUS_QUEUED + + else: + log_file.status = choices.LOG_FILE_STATUS_INVALIDATED + + logging.info(f'Log file {log_file.path} ({log_file.collection.acron3}) has status {log_file.status}.') + log_file.save() def _fetch_validation_parameters(collection, default_buffer_size=0.1, default_sample_size=2048): @@ -199,8 +216,7 @@ def _check_missing_logs_for_date(user, collection, date): @celery_app.task(bind=True, name=_('Generate log files count report')) def task_log_files_count_status_report(self, collections=[], from_date=None, until_date=None, user_id=None, username=None): from_date, until_date = date_utils.get_date_range_str(from_date, until_date) - possible_dates_n = len(date_utils.get_date_objs_from_date_range(from_date, until_date)) - + from_date_obj = date_utils.get_date_obj(from_date) until_date_obj = date_utils.get_date_obj(until_date) diff --git a/metrics/tasks.py b/metrics/tasks.py index d424d54..2e99174 100644 --- a/metrics/tasks.py +++ b/metrics/tasks.py @@ -9,6 +9,7 @@ from core.utils.utils import _get_user from core.utils.date_utils import ( + get_date_obj, get_date_str, get_date_range_str, get_date_objs_from_date_range, @@ -40,27 +41,63 @@ from .models import UserAgent, UserSession, Item, ItemAccess import logging +import json User = get_user_model() + @celery_app.task(bind=True, name=_('Compute access'), timelimit=-1) -def task_parse_logs(self, collection, user_id=None, username=None): +def task_parse_logs(self, collections=[], from_date=None, until_date=None, user_id=None, username=None): """ Parses log files associated with a given collection. Args: - collection (str): Acronym associated with the collection for which logs are being parsed. + collections (list, optional): List of collection acronyms to parse logs for. Defaults to all collections. + from_date (str, optional): Start date for log parsing in 'YYYY-MM-DD' format. Defaults to None. + until_date (str, optional): End date for log parsing in 'YYYY-MM-DD' format. Defaults to None. user_id username Returns: None. 
- """ - for lf in LogFile.objects.filter(status=choices.LOG_FILE_STATUS_QUEUED, collection__acron3=collection): - logging.info(f'PARSING file {lf.path}') - task_parse_log.apply_async(args=(lf.hash, user_id, username)) - + """ + from_date, until_date = get_date_range_str(from_date, until_date) + + from_date_obj = get_date_obj(from_date) + until_date_obj = get_date_obj(until_date) + + for collection in collections or Collection.acron3_list(): + for lf in LogFile.objects.filter(status=choices.LOG_FILE_STATUS_QUEUED, collection__acron3=collection): + probably_date = _extract_date_from_validation(lf.validation) + if not probably_date: + logging.debug(f'Log file {lf.path} does not have a valid probably date.') + continue + + if probably_date <= from_date_obj or probably_date >= until_date_obj: + continue + + logging.info(f'PARSING file {lf.path}') + task_parse_log.apply_async(args=(lf.hash, user_id, username)) + + +def _extract_date_from_validation(validation): + """ + Extracts the date from the validation dict of a log file. + + Args: + validation (dict): The validation dict of the log file. + + Returns: + datetime.date: The extracted date. + """ + try: + date_str = validation.get('probably_date') + return get_date_obj(date_str, '%Y-%m-%d') + except Exception as e: + logging.error(f"Failed to extract date from validation: {e}") + return None + @celery_app.task(bind=True, name=_('Parse one log'), timelimit=-1) def task_parse_log(self, log_file_hash, user_id=None, username=None):