diff --git a/VERSION b/VERSION index 6f165bc..f88cf52 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.12.1 \ No newline at end of file +1.13.0 \ No newline at end of file diff --git a/metrics/tests/test_models.py b/article/management/__init__.py similarity index 100% rename from metrics/tests/test_models.py rename to article/management/__init__.py diff --git a/article/management/commands/__init__.py b/article/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/article/management/commands/load_articles_by_year.py b/article/management/commands/load_articles_by_year.py new file mode 100644 index 0000000..335598e --- /dev/null +++ b/article/management/commands/load_articles_by_year.py @@ -0,0 +1,80 @@ +from django.core.management.base import BaseCommand + +from article.tasks import task_load_article_from_opac, task_load_article_from_article_meta + + +class Command(BaseCommand): + help = 'Generate task requests for loading article data from Article Meta for each year from 1900 to 2025' + + def add_arguments(self, parser): + parser.add_argument( + '--start-year', + type=int, + default=1990, + help='Start year (default: 1990)' + ) + parser.add_argument( + '--end-year', + type=int, + default=2025, + help='End year (default: 2025)' + ) + parser.add_argument( + '--collection', + type=str, + default='scl', + help='Collection code (default: scl)' + ) + parser.add_argument( + '--task', + choices=['load_article_from_opac', 'load_article_from_article_meta'], + default='load_article_from_opac', + help='Task to execute (default: load_article_from_opac)', + ) + + def handle(self, *args, **options): + start_year = options['start_year'] + end_year = options['end_year'] + collection = options['collection'] + + self.stdout.write( + self.style.SUCCESS( + f'Generating task requests from {start_year} to {end_year} for collection: {collection}' + ) + ) + + total_tasks = 0 + + for year in range(start_year, end_year + 1): + from_date = f'{year}-01-01' + until_date = f'{year}-12-31' + + self.stdout.write(f'Queuing task for year {year}...') + + # Queue the task for each year + if options['task'] == 'load_article_from_article_meta': + task_result = task_load_article_from_article_meta.delay( + from_date=from_date, + until_date=until_date, + collection=collection + ) + else: + task_result = task_load_article_from_opac.delay( + from_date=from_date, + until_date=until_date, + collection=collection + ) + + total_tasks += 1 + + self.stdout.write( + self.style.SUCCESS( + f'✓ Task queued for year {year}: {from_date} to {until_date} (Task ID: {task_result.id})' + ) + ) + + self.stdout.write( + self.style.SUCCESS( + f'\nCompleted! {total_tasks} tasks have been queued successfully.' 
+ ) + ) diff --git a/article/models.py b/article/models.py index 71f3e96..80d2a97 100644 --- a/article/models.py +++ b/article/models.py @@ -114,7 +114,6 @@ def metadata(cls, collection=None): for a in qs.iterator(): yield { - 'id': a.id, 'collection': a.collection.acron3, 'default_lang': a.default_lang, 'files': a.files, diff --git a/article/tasks.py b/article/tasks.py index 5ab75d2..958a336 100644 --- a/article/tasks.py +++ b/article/tasks.py @@ -60,7 +60,7 @@ def task_load_article_from_article_meta(self, from_date=None, until_date=None, d try: article, created = models.Article.objects.get_or_create(collection=col_obj, scielo_issn=jou.scielo_issn, pid_v2=obj.get('code')) if created or force_update: - article.files = obj.get('files') or {} + article.files = obj.get('pdfs') or {} article.processing_date = obj.get('processing_date') or '' article.publication_date = obj.get('publication_date') or '' article.publication_year = obj.get('publication_year') or '' @@ -121,8 +121,8 @@ def task_load_article_from_opac(self, collection='scl', from_date=None, until_da article.pid_v3 = doc.get('pid_v3') or '' if not created: article.pid_v2 = doc.get('pid_v2') or '' - article.publication_date = doc.get('publication_date') or '' - article.default_lang = doc.get('default_language') or '' + article.publication_date = doc.get('publication_date') or article.publication_date or '' + article.default_lang = doc.get('default_language') or article.default_lang or '' try: article.publication_year = article.publication_date[:4] diff --git a/article/wagtail_hooks.py b/article/wagtail_hooks.py index 777aa02..4cf55bd 100644 --- a/article/wagtail_hooks.py +++ b/article/wagtail_hooks.py @@ -22,7 +22,7 @@ class ArticleSnippetViewSet(SnippetViewSet): "pid_v3", "pid_generic", "files", - "publication_date", + "publication_year", ) list_filter = ( "collection", diff --git a/compose/local/django/celery/worker/start b/compose/local/django/celery/worker/start index 124ed93..e89cc9e 100644 --- a/compose/local/django/celery/worker/start +++ b/compose/local/django/celery/worker/start @@ -4,4 +4,4 @@ set -o errexit set -o nounset -watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=4 +watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1 diff --git a/compose/production/django/celery/worker/start b/compose/production/django/celery/worker/start index 7691ec1..8cbc7d9 100644 --- a/compose/production/django/celery/worker/start +++ b/compose/production/django/celery/worker/start @@ -5,4 +5,4 @@ set -o pipefail set -o nounset -exec celery -A config.celery_app worker -l INFO --concurrency=4 +exec celery -A config.celery_app worker -l INFO --concurrency=1 diff --git a/config/settings/base.py b/config/settings/base.py index db7c967..a914ec2 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -407,7 +407,7 @@ # Elasticsearch # ------------------------------------------------------------------------------ ES_URL = env("ES_URL", default="http://192.168.0.33:9200/") -ES_INDEX_NAME = env("ES_INDEX_NAME", default="usage-daily") +ES_INDEX_NAME = env("ES_INDEX_NAME", default="usage") ES_API_KEY = env("ES_API_KEY", default="") ES_BASIC_AUTH = env("ES_BASIC_AUTH", default=("elastic", "iHktg66E")) ES_VERIFY_CERTS = env.bool("ES_VERIFY_CERTS", default=False) diff --git a/core/tests_standardizer.py b/core/tests_standardizer.py index a12f88a..a50ff87 100644 --- a/core/tests_standardizer.py +++ b/core/tests_standardizer.py @@ -137,3 +137,65 @@ def 
test_standardize_doi_is_valid_with_doi_prefix_and_https_prefix(self):
         doi = 'doi:https://doi.org/10.1590/S0102-67202020000100001'
         standardized = standardizer.standardize_doi(doi)
         self.assertEqual(standardized, '10.1590/S0102-67202020000100001')
+
+
+class TestStandardizeYearOfPublication(TestCase):
+    def test_standardize_year_of_publication_four_digit_year(self):
+        """Test that a four-digit year is returned as-is"""
+        year = "2023"
+        result = standardizer.standardize_year_of_publication(year)
+        self.assertEqual(result, "2023")
+
+    def test_standardize_year_of_publication_integer_year(self):
+        """Test that an integer year is converted to a string"""
+        year = 2023
+        result = standardizer.standardize_year_of_publication(year)
+        self.assertEqual(result, "2023")
+
+    def test_standardize_year_of_publication_year_range(self):
+        """Test that a year range returns the first year"""
+        year = "2020-2023"
+        result = standardizer.standardize_year_of_publication(year)
+        self.assertEqual(result, "2020")
+
+    def test_standardize_year_of_publication_year_with_slash(self):
+        """Test that a year with a slash returns the first year"""
+        year = "2020/2023"
+        result = standardizer.standardize_year_of_publication(year)
+        self.assertEqual(result, "2020")
+
+    def test_standardize_year_of_publication_year_with_extra_text(self):
+        """Test that a year preceded by extra text is rejected and returns an empty string"""
+        year = "Published in 2023"
+        result = standardizer.standardize_year_of_publication(year)
+        self.assertEqual(result, "")
+
+    def test_standardize_year_of_publication_invalid_year(self):
+        """Test that an invalid year returns an empty string"""
+        year = "invalid"
+        result = standardizer.standardize_year_of_publication(year)
+        self.assertEqual(result, '')
+
+    def test_standardize_year_of_publication_empty_string(self):
+        """Test that an empty string returns an empty string"""
+        year = ""
+        result = standardizer.standardize_year_of_publication(year)
+        self.assertEqual(result, '')
+
+    def test_standardize_year_of_publication_none_input(self):
+        """Test that None input returns an empty string"""
+        year = None
+        result = standardizer.standardize_year_of_publication(year)
+        self.assertEqual(result, '')
+
+    def test_standardize_year_of_publication_two_digit_year(self):
+        """Test that a two-digit year is out of range and returns an empty string"""
+        year = "23"
+        result = standardizer.standardize_year_of_publication(year)
+        self.assertEqual(result, '')
+
+    def test_standardize_year_of_publication_year_with_parentheses(self):
+        """Test that a year wrapped in parentheses is rejected and returns an empty string"""
+        year = "(2023)"
+        result = standardizer.standardize_year_of_publication(year)
+        self.assertEqual(result, '')
diff --git a/core/utils/standardizer.py b/core/utils/standardizer.py
index dda02e9..27b5cba 100644
--- a/core/utils/standardizer.py
+++ b/core/utils/standardizer.py
@@ -218,3 +218,30 @@ def standardize_pid_generic(pid_generic):
         return pid_generic_based_on_doi
 
     return pid_generic.strip().upper()
+
+
+def standardize_year_of_publication(year_of_publication):
+    """
+    Standardizes a year of publication.
+
+    Parameters:
+        year_of_publication (str or int): The year of publication to be standardized.
+
+    Returns:
+        str: The standardized year of publication or an empty string if the input is not a valid year.
+ """ + if not year_of_publication: + return '' + + # Truncate to 4 characters if longer + if isinstance(year_of_publication, str) and len(year_of_publication) > 4: + year_of_publication = year_of_publication[:4] + + try: + year = int(year_of_publication) + if 1500 <= year <= 2100: + return str(year) + except ValueError: + pass + + return '' diff --git a/journal/models.py b/journal/models.py index e3bed0d..0d830e9 100644 --- a/journal/models.py +++ b/journal/models.py @@ -80,7 +80,6 @@ def metadata(cls, collection=None): 'scielo_issn', 'subject_areas', 'title', 'wos_subject_areas' ): yield { - 'id': journal.id, 'acronym': journal.acronym, 'collection': journal.collection.acron3, 'issns': set([v for v in journal.issns.values() if v]), diff --git a/log_manager/choices.py b/log_manager/choices.py index 0c62e01..e98c8f2 100644 --- a/log_manager/choices.py +++ b/log_manager/choices.py @@ -5,6 +5,7 @@ LOG_FILE_STATUS_QUEUED = 'QUE' LOG_FILE_STATUS_PARSING = 'PAR' LOG_FILE_STATUS_PROCESSED = 'PRO' +LOG_FILE_STATUS_ERROR = 'ERR' LOG_FILE_STATUS_INVALIDATED = 'INV' LOG_FILE_STATUS_IGNORED = 'IGN' @@ -13,6 +14,7 @@ (LOG_FILE_STATUS_QUEUED, _("Queued")), (LOG_FILE_STATUS_PARSING, _("Parsing")), (LOG_FILE_STATUS_PROCESSED, _("Processed")), + (LOG_FILE_STATUS_ERROR, _("Error")), (LOG_FILE_STATUS_INVALIDATED, _("Invalidated")), (LOG_FILE_STATUS_IGNORED, _("Ignored")), ] diff --git a/log_manager/migrations/0006_logfile_last_processed_line.py b/log_manager/migrations/0006_logfile_last_processed_line.py new file mode 100644 index 0000000..44bf2db --- /dev/null +++ b/log_manager/migrations/0006_logfile_last_processed_line.py @@ -0,0 +1,19 @@ +# Generated by Django 5.0.7 on 2025-06-22 15:21 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("log_manager", "0005_alter_logfile_status_alter_logfiledate_date_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="logfile", + name="last_processed_line", + field=models.IntegerField( + blank=True, default=0, verbose_name="Last Processed Line" + ), + ), + ] diff --git a/log_manager/migrations/0007_logfile_summary.py b/log_manager/migrations/0007_logfile_summary.py new file mode 100644 index 0000000..2cbf65d --- /dev/null +++ b/log_manager/migrations/0007_logfile_summary.py @@ -0,0 +1,19 @@ +# Generated by Django 5.0.7 on 2025-06-22 17:30 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("log_manager", "0006_logfile_last_processed_line"), + ] + + operations = [ + migrations.AddField( + model_name="logfile", + name="summary", + field=models.JSONField( + blank=True, default=dict, null=True, verbose_name="Summary" + ), + ), + ] diff --git a/log_manager/migrations/0008_alter_logfile_status.py b/log_manager/migrations/0008_alter_logfile_status.py new file mode 100644 index 0000000..b6d5eee --- /dev/null +++ b/log_manager/migrations/0008_alter_logfile_status.py @@ -0,0 +1,29 @@ +# Generated by Django 5.0.7 on 2025-08-06 19:01 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("log_manager", "0007_logfile_summary"), + ] + + operations = [ + migrations.AlterField( + model_name="logfile", + name="status", + field=models.CharField( + choices=[ + ("CRE", "Created"), + ("QUE", "Queued"), + ("PAR", "Parsing"), + ("PRO", "Processed"), + ("ERR", "Error"), + ("INV", "Invalidated"), + ("IGN", "Ignored"), + ], + max_length=3, + verbose_name="Status", + ), + ), + ] diff --git 
a/log_manager/migrations/0009_collectionlogfiledatecount_exported_files_count.py b/log_manager/migrations/0009_collectionlogfiledatecount_exported_files_count.py new file mode 100644 index 0000000..dad5cdb --- /dev/null +++ b/log_manager/migrations/0009_collectionlogfiledatecount_exported_files_count.py @@ -0,0 +1,19 @@ +# Generated by Django 5.0.7 on 2025-08-07 00:14 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("log_manager", "0008_alter_logfile_status"), + ] + + operations = [ + migrations.AddField( + model_name="collectionlogfiledatecount", + name="exported_files_count", + field=models.SmallIntegerField( + default=0, verbose_name="Exported Files Count" + ), + ), + ] diff --git a/log_manager/models.py b/log_manager/models.py index b95a758..fc3a8b6 100644 --- a/log_manager/models.py +++ b/log_manager/models.py @@ -133,6 +133,11 @@ class CollectionLogFileDateCount(CommonControlField): verbose_name=_('Is Usage Metric Computed'), default=False, ) + + exported_files_count = models.SmallIntegerField( + verbose_name=_('Exported Files Count'), + default=0, + ) status = models.CharField( verbose_name=_('Status'), @@ -147,7 +152,11 @@ def set_status(self): self.status = choices.COLLECTION_LOG_FILE_DATE_COUNT_EXTRA_FILES else: self.status = choices.COLLECTION_LOG_FILE_DATE_COUNT_OK - + + def set_is_usage_metric_computed(self): + if self.exported_files_count == self.found_log_files: + self.is_usage_metric_computed = True + @classmethod def create_or_update(cls, user, collection, date, expected_log_files, found_log_files): obj, created = cls.objects.get_or_create( @@ -216,6 +225,13 @@ class LogFile(CommonControlField): default=dict, ) + summary = models.JSONField( + _("Summary"), + null=True, + blank=True, + default=dict, + ) + collection = models.ForeignKey( Collection, verbose_name=_("Collection"), @@ -224,12 +240,20 @@ class LogFile(CommonControlField): blank=False, ) + last_processed_line = models.IntegerField( + _("Last Processed Line"), + blank=True, + default=0, + ) + panels = [ FieldPanel('hash'), FieldPanel('path'), FieldPanel('stat_result'), FieldPanel('status'), FieldPanel('validation'), + FieldPanel('summary'), + FieldPanel('last_processed_line'), AutocompletePanel('collection'), ] diff --git a/log_manager/tasks.py b/log_manager/tasks.py index 93a8b3d..054a27a 100644 --- a/log_manager/tasks.py +++ b/log_manager/tasks.py @@ -46,11 +46,11 @@ def task_search_log_files(self, collections=[], from_date=None, until_date=None, col_configs_dirs = lmc_models.CollectionLogDirectory.objects.filter(collection__acron3=col, active=True) if len(col_configs_dirs) == 0: - raise lmc_exceptions.UndefinedCollectionLogDirectoryError(_(f'Error. Please, add a CollectionLogDirectory for the collection {col}.')) + logging.error(f'No CollectionLogDirectory found for collection {col}.') supported_logfile_extensions = lmc_models.SupportedLogFile.objects.values_list('file_extension', flat=True) if len(supported_logfile_extensions) == 0: - raise lmc_exceptions.UndefinedSupportedLogFile(_('Error. Please, add a SupportedLogFile for each of the supported log file formats.')) + logging.error('No SupportedLogFile found. Please, add a SupportedLogFile for each of the supported log file formats.') for cd in col_configs_dirs: for root, _sub_dirs, files in os.walk(cd.path): @@ -99,14 +99,14 @@ def task_validate_log_files(self, collections=[], from_date=None, until_date=Non username (str, optional): The username of the user initiating the task. Defaults to None. 
ignore_date (bool, optional): If True, ignore the date of the log file. Defaults to False. """ - logging.info(f'Validating log files for collections: {collections}.') + cols = collections or Collection.acron3_list() + logging.info(f'Validating log files for collections: {cols}.') visible_dates = _get_visible_dates(from_date, until_date, days_to_go_back) - if not ignore_date: logging.info(f'Interval: {visible_dates[0]} to {visible_dates[-1]}.') - for col in collections or Collection.acron3_list(): + for col in cols: for log_file in models.LogFile.objects.filter(status=choices.LOG_FILE_STATUS_CREATED, collection__acron3=col): file_ctime = date_utils.get_date_obj_from_timestamp(log_file.stat_result[LOGFILE_STAT_RESULT_CTIME_INDEX]) if file_ctime in visible_dates or ignore_date: @@ -191,11 +191,11 @@ def task_check_missing_logs_for_date_range(self, collections=[], from_date=None, for col in collections or Collection.acron3_list(): collection = Collection.objects.get(acron3=col) for date in date_utils.get_date_objs_from_date_range(from_date_str, until_date_str): - logging.info(f'Checking missings logs for collection {col} and date {date}') - _check_missing_logs_for_date(user, collection, date) + logging.info(f'Couting logs for collection {col} and date {date}') + count_logs_for_date(user, collection, date) -def _check_missing_logs_for_date(user, collection, date): +def count_logs_for_date(user, collection, date): try: n_expected_files = lmc_models.CollectionLogFilesPerDay.get_number_of_expected_files_by_day(collection=collection.acron3, date=date) except lmc_exceptions.UndefinedCollectionFilesPerDayError: diff --git a/log_manager/wagtail_hooks.py b/log_manager/wagtail_hooks.py index 2f0466e..aeb6908 100644 --- a/log_manager/wagtail_hooks.py +++ b/log_manager/wagtail_hooks.py @@ -40,11 +40,13 @@ class CollectionLogFileDateCountViewSet(SnippetViewSet): "found_log_files", "expected_log_files", "status", + "exported_files_count", "is_usage_metric_computed", ) list_filter = ( "collection", "status", + "exported_files_count", "is_usage_metric_computed", "year", "month" @@ -62,6 +64,8 @@ class LogFileSnippetViewSet(SnippetViewSet): "collection", "status", "validation", + "summary", + "last_processed_line", "hash" ) list_filter = ("status", "collection") diff --git a/metrics/es.py b/metrics/es.py index 6bc7630..f0d42f0 100644 --- a/metrics/es.py +++ b/metrics/es.py @@ -1,244 +1,382 @@ -from elasticsearch import Elasticsearch, helpers, NotFoundError -from django.conf import settings - import logging +from elasticsearch import Elasticsearch, helpers, NotFoundError +from django.conf import settings -def get_elasticsearch_client(url=None, basic_auth=None, api_key=None, verify_certs=False): - """ - Create an Elasticsearch client instance using Django settings. - - :param url: Elasticsearch URL. If None, it will be taken from Django settings. - :param basic_auth: Basic authentication credentials. If None, it will be taken from Django settings. - :param api_key: API key. If None, it will be taken from Django settings. - :param verify_certs: Whether to verify SSL certificates. If None, it will be taken from Django settings. 
- """ - if not url: - url = getattr(settings, "ES_URL", None) - - if not basic_auth: - basic_auth = getattr(settings, "ES_BASIC_AUTH", None) - - if not api_key: - api_key = getattr(settings, "ES_API_KEY", None) - - if not verify_certs: - verify_certs = getattr(settings, "ES_VERIFY_CERTS", False) - - if basic_auth: - client = Elasticsearch(url, basic_auth=basic_auth, verify_certs=verify_certs) - elif api_key: - client = Elasticsearch(url, api_key=api_key, verify_certs=verify_certs) - else: - client = Elasticsearch(url, verify_certs=verify_certs) - - return client +from .utils import index_utils -def create_index(index_name, mappings=None, client=None, url=None, basic_auth=None, api_key=None): - """ - Create an Elasticsearch index. - - :param index_name: Name of the index to create. - :param mappings: Mappings for the index. If None, default mappings will be used. - :param client: Elasticsearch client instance. If None, a new client will be created. - :param url: Elasticsearch URL. If None, it will be taken from Django settings. - :param basic_auth: Basic authentication credentials. If None, it will be taken from Django settings. - :param api_key: API key. If None, it will be taken from Django settings. - """ - if not client: - client = get_elasticsearch_client(url, basic_auth, api_key) - - if not mappings: - mappings = { +DEFAULT_ES_INDEX_USAGE_MAPPINGS = { + "properties": { + "collection": { + "type": "keyword" + }, + "journal": { "properties": { - "collection": { - "type": "keyword" - }, - "journal": { + "scielo_issn": { "type": "keyword" }, - "pid_v2": { + "main_title": { "type": "keyword" }, - "pid_v3": { + "subject_area_capes": { "type": "keyword" }, - "pid_generic": { + "subject_area_wos": { "type": "keyword" }, - "media_language": { + "acronym": { "type": "keyword" }, - "country_code": { - "type": "keyword" - }, - "date": { - "type": "date", - "format": "yyyy-MM-dd" - }, - "year": { - "type": "integer" - }, - "month": { - "type": "integer" - }, - "day": { - "type": "integer" - }, - "total_requests": { - "type": "integer" - }, - "total_investigations": { - "type": "integer" - }, - "unique_requests": { - "type": "integer" - }, - "unique_investigations": { - "type": "integer" - } } + }, + "pid": { + "type": "keyword" + }, + "pid_v2": { + "type": "keyword" + }, + "pid_v3": { + "type": "keyword" + }, + "pid_generic": { + "type": "keyword" + }, + "year_of_publication": { + "type": "integer" + }, + "media_language": { + "type": "keyword" + }, + "country_code": { + "type": "keyword" + }, + "date": { + "type": "date", + "format": "yyyy-MM-dd" + }, + "total_requests": { + "type": "integer" + }, + "total_investigations": { + "type": "integer" + }, + "unique_requests": { + "type": "integer" + }, + "unique_investigations": { + "type": "integer" } - - resp = client.indices.create( - index=index_name, - mappings=mappings, - ) - logging.info(f"Index {index_name} created: {resp}") + } +} -def delete_index(index_name, client=None, url=None, basic_auth=None, api_key=None): +class ElasticSearchUsageWrapper: """ - Delete an Elasticsearch index. - - :param index_name: Name of the index to delete. - :param client: Elasticsearch client instance. If None, a new client will be created. - :param url: Elasticsearch URL. If None, it will be taken from Django settings. - :param basic_auth: Basic authentication credentials. If None, it will be taken from Django settings. - :param api_key: API key. If None, it will be taken from Django settings. + Wrapper for Elasticsearch usage metrics operations. 
+ This class provides methods to interact with Elasticsearch for indexing, + deleting, and managing usage metrics data. """ - if not client: - client = get_elasticsearch_client(url, basic_auth, api_key) - client.indices.delete(index=index_name) + def __init__(self, url=None, basic_auth=None, api_key=None, verify_certs=False): + self.client = self.get_elasticsearch_client(url, basic_auth, api_key, verify_certs) -def index_document(index_name, doc_id, document, client=None, url=None, basic_auth=None, api_key=None): - """ - Index a document in Elasticsearch. - - :param index_name: Name of the index. - :param doc_id: ID of the document. - :param document: Document to index. - :param client: Elasticsearch client instance. If None, a new client will be created. - :param url: Elasticsearch URL. If None, it will be taken from Django settings. - :param basic_auth: Basic authentication credentials. If None, it will be taken from Django settings. - :param api_key: API key. If None, it will be taken from Django settings. - """ - if not client: - client = get_elasticsearch_client(url, basic_auth, api_key) - client.index(index=index_name, id=doc_id, document=document) + def get_elasticsearch_client(self, url=None, basic_auth=None, api_key=None, verify_certs=False): + """ + Create an Elasticsearch client instance using Django settings. -def index_documents(index_name, documents, client=None, url=None, basic_auth=None, api_key=None): - """ - Index multiple documents in Elasticsearch. - - :param index_name: Name of the index. - :param documents: Dictionary of documents to index, where keys are document IDs and values are the documents. - :param client: Elasticsearch client instance. If None, a new client will be created. - :param url: Elasticsearch URL. If None, it will be taken from Django settings. - :param basic_auth: Basic authentication credentials. If None, it will be taken from Django settings. - :param api_key: API key. If None, it will be taken from Django settings. - """ - if not client: - client = get_elasticsearch_client(url, basic_auth, api_key) - - helpers.bulk( - client, - ( - { - "_index": index_name, - "_id": doc_id, - "_source": document, - } - for doc_id, document in documents.items() - ), - ) + :param url: Elasticsearch URL. If None, it will be taken from Django settings. + :param basic_auth: Basic authentication credentials. If None, it will be taken from Django settings. + :param api_key: API key. If None, it will be taken from Django settings. + :param verify_certs: Whether to verify SSL certificates. If None, it will be taken from Django settings. + """ + if not url: + url = getattr(settings, "ES_URL", None) + if not basic_auth: + basic_auth = getattr(settings, "ES_BASIC_AUTH", None) -def delete_document(index_name, doc_id, client=None, url=None, basic_auth=None, api_key=None): - """ - Delete a document from Elasticsearch. - - :param index_name: Name of the index. - :param doc_id: ID of the document to delete. - :param client: Elasticsearch client instance. If None, a new client will be created. - :param url: Elasticsearch URL. If None, it will be taken from Django settings. - :param basic_auth: Basic authentication credentials. If None, it will be taken from Django settings. - :param api_key: API key. If None, it will be taken from Django settings. 
- """ - if not client: - client = get_elasticsearch_client(url, basic_auth, api_key) + if not api_key: + api_key = getattr(settings, "ES_API_KEY", None) - try: - client.delete(index=index_name, id=doc_id) - except NotFoundError as e: - logging.error(f"Failed to delete document {doc_id} from Elasticsearch: {e}") + if not verify_certs: + verify_certs = getattr(settings, "ES_VERIFY_CERTS", False) + if basic_auth: + client = Elasticsearch(url, basic_auth=basic_auth, verify_certs=verify_certs) + elif api_key: + client = Elasticsearch(url, api_key=api_key, verify_certs=verify_certs) + else: + client = Elasticsearch(url, verify_certs=verify_certs) -def delete_documents(index_name, doc_ids, client=None, url=None, basic_auth=None, api_key=None): - """ - Delete multiple documents from Elasticsearch using bulk. - :param index_name: Name of the index. - :param doc_ids: List of document IDs to delete. - :param client: Elasticsearch client instance. If None, a new client will be created. - :param url: Elasticsearch URL. If None, it will be taken from Django settings. - :param basic_auth: Basic authentication credentials. If None, it will be taken from Django settings. - :param api_key: API key. If None, it will be taken from Django settings. - """ - if not client: - client = get_elasticsearch_client(url, basic_auth, api_key) + return client - actions = ( - { - "_op_type": "delete", - "_index": index_name, - "_id": doc_id, - } - for doc_id in doc_ids - ) - - try: - helpers.bulk(client, actions) - except helpers.BulkIndexError as e: - logging.error(f"BulkIndexError occurred: {e.errors}") - - -def delete_documents_by_key(index_name, key, values, client=None, url=None, basic_auth=None, api_key=None): - """ - Delete multiple documents from Elasticsearch based on a specific key and its values. - - :param index_name: Name of the index. - :param key: Key to search for in the documents. - :param values: List of values to match against the key. - :param client: Elasticsearch client instance. If None, a new client will be created. - :param url: Elasticsearch URL. If None, it will be taken from Django settings. - :param basic_auth: Basic authentication credentials. If None, it will be taken from Django settings. - :param api_key: API key. If None, it will be taken from Django settings. - """ - if not client: - client = get_elasticsearch_client(url, basic_auth, api_key) - query = { - "query": { - "terms": { - key: values + def ping(self): + """ + Check if the Elasticsearch client is available. + Returns True if the client is available, False otherwise. + """ + try: + return self.client.ping() + except Exception as e: + logging.error(f"Error pinging Elasticsearch client: {e}") + return False + + + def create_index(self, index_name, mappings=None, ping_client=False): + """ + Create an Elasticsearch index. + + :param index_name: Name of the index to create. + :param mappings: Mappings for the index. If None, default mappings will be used. + :param ping_client: If True, checks if the Elasticsearch client is available before creating the index. + """ + if ping_client and not self.ping(): + return + + if not mappings: + mappings = DEFAULT_ES_INDEX_USAGE_MAPPINGS + + resp = self.client.indices.create( + index=index_name, + mappings=mappings, + ) + logging.info(f"Index {index_name} created: {resp}") + + + def create_index_if_not_exists(self, index_name, mappings=None, ping_client=False): + """ + Create an Elasticsearch index if it does not already exist. + + :param index_name: Name of the index to create. 
+ :param mappings: Mappings for the index. If None, default mappings will be used. + :param ping_client: If True, checks if the Elasticsearch client is available before creating the index. + """ + if ping_client and not self.ping(): + return + + if not self.client.indices.exists(index=index_name): + self.create_index(index_name, mappings, ping_client) + else: + logging.info(f"Index {index_name} already exists. Skipping creation.") + + + def delete_index(self, index_name, ping_client=False): + """ + Delete an Elasticsearch index. + + :param index_name: Name of the index to delete. + :param ping_client: If True, checks if the Elasticsearch client is available before deleting the index. + """ + if ping_client and not self.ping(): + return + + self.client.indices.delete(index=index_name) + + + def index_document(self, index_name, doc_id, document, ping_client=False): + """ + Index a document in Elasticsearch. + + :param index_name: Name of the index. + :param doc_id: ID of the document. + :param document: Document to index. + :param ping_client: If True, checks if the Elasticsearch client is available before indexing the document. + """ + if ping_client and not self.ping(): + return + + self.client.index(index=index_name, id=doc_id, document=document) + + + def index_documents(self, index_name, documents, ping_client=False): + """ + Index multiple documents in Elasticsearch. + + :param index_name: Name of the index. + :param documents: Dictionary of documents to index, where keys are document IDs and values are the documents. + :param ping_client: If True, checks if the Elasticsearch client is available before indexing the documents. + """ + if ping_client and not self.ping(): + return + + helpers.bulk( + self.client, + ( + { + "_index": index_name, + "_id": doc_id, + "_source": document, + } + for doc_id, document in documents.items() + ), + ) + + + def delete_document(self, index_name, doc_id, ping_client=False): + """ + Delete a document from Elasticsearch. + + :param index_name: Name of the index. + :param doc_id: ID of the document to delete. + :param ping_client: If True, checks if the Elasticsearch client is available before deleting the document. + """ + if ping_client and not self.ping(): + return + + try: + self.client.delete(index=index_name, id=doc_id) + except NotFoundError as e: + logging.error(f"Failed to delete document {doc_id} from Elasticsearch: {e}") + + + def delete_documents(self, index_name, doc_ids, ping_client=False): + """ + Delete multiple documents from Elasticsearch using bulk. + :param index_name: Name of the index. + :param doc_ids: List of document IDs to delete. + :param ping_client: If True, checks if the Elasticsearch client is available before deleting the documents. + """ + if ping_client and not self.ping(): + return + + actions = ( + { + "_op_type": "delete", + "_index": index_name, + "_id": doc_id, + } + for doc_id in doc_ids + ) + + try: + helpers.bulk(self.client, actions) + except helpers.BulkIndexError as e: + logging.error(f"BulkIndexError occurred: {e.errors}") + + + def delete_documents_by_key(self, index_name, data, ping_client=False): + """ + Delete multiple documents from Elasticsearch based on specific key-value pairs. + + :param index_name: Name of the index. + :param data: Dictionary where keys are field names and values are single values or lists of values. + :param ping_client: If True, checks if the Elasticsearch client is available before deleting the documents. 
+ """ + if ping_client and not self.ping(): + return + + query = { + "query": { + "bool": { + "must": [ + { + "terms": { + key: values if isinstance(values, list) else [values] + } + } + for key, values in data.items() + ] + } } } - } - try: - client.delete_by_query(index=index_name, body=query) - except Exception as e: - logging.error(f"Failed to delete documents by key {key} with values {values}: {e}") + try: + self.client.delete_by_query(index=index_name, body=query) + return True + except Exception as e: + logging.error(f"Failed to delete documents: {e}") + + return False + + + def fetch_and_update_documents_locally(self, index_name, documents, batch_size=5000, ping_client=False): + """ + Fetch existing documents from Elasticsearch and update local documents with accumulated metrics. + This function retrieves documents from Elasticsearch in batches and merges their metric fields + with the provided local documents. The merge operation adds values for specific metric fields + or sets them if they don't exist in the local documents. + + Args: + index_name (str): Name of the Elasticsearch index to fetch documents from. + documents (dict): Dictionary of documents to be updated, where keys are document IDs and values + are dictionaries containing metric data. + batch_size (int, optional): Number of documents to fetch in each batch from Elasticsearch. + Defaults to 5000. + ping_client (bool, optional): If True, checks if the Elasticsearch client is available before + fetching documents. Defaults to False. + + Returns: + None: The function modifies the input documents dictionary in-place. + """ + if ping_client and not self.ping(): + return + + existing_docs = {} + ids = list(documents.keys()) + + for i in range(0, len(ids), batch_size): + batch_ids = ids[i:i+batch_size] + resp = self.client.mget(index=index_name, ids=batch_ids) + for doc in resp.get('docs', []): + if doc.get('found'): + existing_docs[doc['_id']] = doc['_source'] + logging.info(f'Found {len(existing_docs)} existing documents in Elasticsearch for update.') + + for doc_id, existing in existing_docs.items(): + current = documents[doc_id] + for field in [ + "total_requests", + "unique_requests", + "total_investigations", + "unique_investigations", + ]: + if field in existing and field in current: + current[field] += existing[field] + elif field in existing: + current[field] = existing[field] + + + def export_to_index(self, index_name, data, batch_size=5000, ping_client=False): + """ + Export data to Elasticsearch index in bulk operations. + This function converts input data to index documents, processes them locally, + and then indexes them to Elasticsearch in batches to optimize performance. + + Args: + index_name (str): Name of the Elasticsearch index to export data to. + data: The data to be exported to the Elasticsearch index + batch_size (int, optional): Number of documents to process in each bulk operation. + Defaults to 5000. 
+ ping_client (bool, optional): If True, checks if the Elasticsearch client is available + + Returns: + None: Function performs side effects by indexing data to Elasticsearch + """ + if ping_client and not self.ping(): + return + + bulk_data = [] + documents = index_utils.convert_to_index_documents(data) + self.fetch_and_update_documents_locally(index_name=index_name, documents=documents) + + for key, metric_data in documents.items(): + metric_data['pid'] = metric_data.get('pid_v3') or metric_data.get('pid_v2') or metric_data.get('pid_generic', '') + bulk_data.append({ + "_id": key, + "_source": metric_data, + }) + + if len(bulk_data) >= batch_size: + self.index_documents( + index_name=index_name, + documents={doc["_id"]: doc["_source"] for doc in bulk_data}, + ) + bulk_data = [] + + self.index_documents( + index_name=index_name, + documents={doc["_id"]: doc["_source"] for doc in bulk_data}, + ) diff --git a/metrics/migrations/0008_remove_a_few_models.py b/metrics/migrations/0008_remove_a_few_models.py new file mode 100644 index 0000000..dfd14ec --- /dev/null +++ b/metrics/migrations/0008_remove_a_few_models.py @@ -0,0 +1,48 @@ +# Generated by Django 5.0.7 on 2025-06-22 17:45 + +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("metrics", "0007_alter_usersession_datetime_and_more"), + ] + + operations = [ + migrations.AlterUniqueTogether( + name="itemaccess", + unique_together=None, + ), + migrations.AlterUniqueTogether( + name="useragent", + unique_together=None, + ), + migrations.AlterUniqueTogether( + name="usersession", + unique_together=None, + ), + migrations.RemoveField( + model_name="itemaccess", + name="user_session", + ), + migrations.RemoveField( + model_name="usersession", + name="user_agent", + ), + migrations.RemoveField( + model_name="itemaccess", + name="item", + ), + migrations.DeleteModel( + name="Item", + ), + migrations.DeleteModel( + name="ItemAccess", + ), + migrations.DeleteModel( + name="UserAgent", + ), + migrations.DeleteModel( + name="UserSession", + ), + ] diff --git a/metrics/models.py b/metrics/models.py index d3e1c0b..e69de29 100644 --- a/metrics/models.py +++ b/metrics/models.py @@ -1,214 +0,0 @@ -from django.db import models -from django.utils.translation import gettext_lazy as _ - -from collection.models import Collection -from journal.models import Journal -from article.models import Article - - -class Item(models.Model): - collection = models.ForeignKey( - Collection, - verbose_name=_("Collection"), - null=False, - blank=False, - on_delete=models.CASCADE, - db_index=True, - ) - - journal = models.ForeignKey( - Journal, - verbose_name=_("Journal"), - null=False, - blank=False, - on_delete=models.CASCADE, - db_index=True, - ) - - article = models.ForeignKey( - Article, - verbose_name=_("Article"), - null=False, - blank=False, - on_delete=models.CASCADE, - db_index=True, - ) - - def __str__(self): - return '|'.join([ - self.collection.acron3, - self.journal.acronym, - self.article.pid_v2 or self.article.pid_v3 or self.article.pid_generic, - ]) - - class Meta: - verbose_name = _("Item") - verbose_name_plural = _("Items") - indexes = [ - models.Index(fields=['collection', 'journal', 'article']), - models.Index(fields=['collection', 'journal']), - ] - unique_together = ( - 'collection', - 'journal', - 'article', - ) - - -class UserAgent(models.Model): - name = models.CharField( - verbose_name=_("Name"), - max_length=255, - null=False, - blank=False, - db_index=True, - ) - - version = models.CharField( - 
verbose_name=_("Version"), - max_length=255, - null=False, - blank=False, - db_index=True, - ) - - def __str__(self): - return f"{self.name} {self.version}" - - class Meta: - verbose_name = _("User Agent") - verbose_name_plural = _("User Agents") - unique_together = ( - 'name', - 'version', - ) - - -class UserSession(models.Model): - datetime = models.DateTimeField( - verbose_name=_("Datetime"), - null=False, - blank=False, - db_index=True, - ) - - user_agent = models.ForeignKey( - UserAgent, - verbose_name=_("User Agent"), - null=False, - blank=False, - on_delete=models.CASCADE, - db_index=True, - ) - - user_ip = models.CharField( - verbose_name=_("User IP"), - max_length=255, - null=False, - blank=False, - db_index=True, - ) - - def user_session(self): - return '|'.join([ - self.user_agent.name, - self.user_agent.version, - self.user_ip, - self.datetime.strftime('%Y-%m-%d'), - self.datetime.strftime('%H'), - ]) - - def __str__(self): - return self.user_session() - - class Meta: - verbose_name = _("User Session") - verbose_name_plural = _("User Sessions") - unique_together = ( - 'datetime', - 'user_agent', - 'user_ip', - ) - - -class ItemAccess(models.Model): - item = models.ForeignKey( - 'Item', - verbose_name=_("Item"), - null=False, - blank=False, - on_delete=models.CASCADE, - db_index=True, - ) - - user_session = models.ForeignKey( - 'UserSession', - verbose_name=_("User Session"), - null=False, - blank=False, - on_delete=models.CASCADE, - db_index=True, - ) - - country_code = models.CharField( - verbose_name=_("Country"), - max_length=2, - null=False, - blank=False, - db_index=True, - ) - - media_language = models.CharField( - verbose_name=_("Media Language"), - max_length=2, - null=False, - blank=False, - db_index=True, - ) - - media_format = models.CharField( - verbose_name=_("Media Format"), - max_length=10, - null=False, - blank=False, - db_index=True, - ) - - content_type = models.CharField( - verbose_name=_("Content Type"), - max_length=32, - null=False, - blank=False, - ) - - click_timestamps = models.JSONField( - verbose_name=_("Click Timestamps"), - default=dict, - ) - - def __str__(self): - return '|'.join([ - self.item.collection.acron3, - self.item.journal.acronym, - self.item.article.pid_v2 or self.item.article.pid_v3 or self.item.article.pid_generic, - self.user_session.user_session(), - self.country_code, - self.media_language, - self.media_format, - self.content_type, - ]) - - class Meta: - verbose_name = _("Item Access") - verbose_name_plural = _("Items Access") - unique_together = ( - 'item', - 'user_session', - 'country_code', - 'media_format', - 'media_language', - 'content_type', - ) - indexes = [ - models.Index(fields=['item', 'user_session']), - ] diff --git a/metrics/tasks.py b/metrics/tasks.py index 78e1220..b850e8c 100644 --- a/metrics/tasks.py +++ b/metrics/tasks.py @@ -1,61 +1,43 @@ -from collections import defaultdict +import logging from django.conf import settings from django.contrib.auth import get_user_model -from django.utils import timezone from django.utils.translation import gettext as _ from scielo_usage_counter import log_handler from scielo_usage_counter import url_translator -from scielo_usage_counter.counter import compute_r5_metrics -from core.utils.utils import _get_user -from core.utils.date_utils import ( - get_date_obj, - get_date_str, - get_date_range_str, - get_date_objs_from_date_range, - extract_minute_second_key, - truncate_datetime_to_hour, -) from config import celery_app - +from core.utils.utils import _get_user +from 
core.utils.date_utils import get_date_obj, get_date_range_str from article.models import Article -from core.utils import standardizer from collection.models import Collection from journal.models import Journal from log_manager import choices -from log_manager_config.models import ( - CollectionURLTranslatorClass, - CollectionLogDirectory, -) +from log_manager_config.models import CollectionURLTranslatorClass, CollectionLogFilesPerDay, CollectionLogDirectory from log_manager.models import LogFile, CollectionLogFileDateCount, LogFileDate from resources.models import MMDB, RobotUserAgent from tracker.models import LogFileDiscardedLine -from tracker import choices as tracker_choices +from tracker.choices import LOG_FILE_DISCARDED_LINE_REASON_MISSING_ARTICLE, LOG_FILE_DISCARDED_LINE_REASON_MISSING_JOURNAL -from .es import create_index, delete_documents_by_key, index_documents, get_elasticsearch_client - -from .utils import ( - is_valid_item_access_data, - translator_class_name_to_obj, -) -from .models import UserAgent, UserSession, Item, ItemAccess - -import logging -import time +from . import es +from .utils import parser_utils, index_utils User = get_user_model() @celery_app.task(bind=True, name=_('Parse logs'), timelimit=-1) -def task_parse_logs(self, collections=[], from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None): +def task_parse_logs(self, collections=[], include_logs_with_error=True, batch_size=5000, replace=False, track_errors=False, from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None): """ Parses log files associated with a given collection. Args: collections (list, optional): List of collection acronyms to parse logs for. Defaults to all collections. + include_logs_with_error (bool, optional): Whether to include logs with errors. Defaults to True. + batch_size (int, optional): Number of records to process in a single batch. Defaults to 5000. + replace (bool, optional): Whether to replace existing records. Defaults to False. + track_errors (bool, optional): Whether to track errors in log parsing. Defaults to False. from_date (str, optional): Start date for log parsing in 'YYYY-MM-DD' format. Defaults to None. until_date (str, optional): End date for log parsing in 'YYYY-MM-DD' format. Defaults to None. days_to_go_back (int, optional): Number of days to go back from the current date to parse logs. Defaults to None. 
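The hunk that follows rewrites the queue-selection logic of task_parse_logs: instead of picking up only files with status QUE, it builds a list of eligible statuses from the new include_logs_with_error and replace flags. A minimal illustrative sketch (not part of the patch), using the status constants added to log_manager/choices.py earlier in this diff:

# Sketch only: how the parse task below decides which LogFile statuses
# are eligible for (re)parsing, based on the new flags.
from log_manager import choices

def eligible_statuses(include_logs_with_error=True, replace=False):
    statuses = [choices.LOG_FILE_STATUS_QUEUED]              # 'QUE' is always parsed
    if include_logs_with_error:
        statuses.append(choices.LOG_FILE_STATUS_ERROR)       # retry files that previously failed ('ERR')
    if replace:
        statuses.append(choices.LOG_FILE_STATUS_PROCESSED)   # reprocess already processed files ('PRO')
    return statuses

# eligible_statuses()             -> ['QUE', 'ERR']
# eligible_statuses(replace=True) -> ['QUE', 'ERR', 'PRO']
# eligible_statuses(False, False) -> ['QUE']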
@@ -70,45 +52,37 @@ def task_parse_logs(self, collections=[], from_date=None, until_date=None, days_ from_date_obj = get_date_obj(from_date) until_date_obj = get_date_obj(until_date) + # Set status filters based on the include_logs_with_error and replace flags + status_filters = [choices.LOG_FILE_STATUS_QUEUED] + if include_logs_with_error: + status_filters.append(choices.LOG_FILE_STATUS_ERROR) + if replace: + status_filters.append(choices.LOG_FILE_STATUS_PROCESSED) + for collection in collections or Collection.acron3_list(): - for lf in LogFile.objects.filter(status=choices.LOG_FILE_STATUS_QUEUED, collection__acron3=collection): - probably_date = _extract_date_from_validation(lf.validation) + for lf in LogFile.objects.filter(status__in=status_filters, collection__acron3=collection): + probably_date = parser_utils.extract_date_from_validation_dict(lf.validation) if not probably_date: logging.debug(f'Log file {lf.path} does not have a valid probably date.') continue - if probably_date <= from_date_obj or probably_date >= until_date_obj: + if probably_date < from_date_obj or probably_date > until_date_obj: continue logging.info(f'PARSING file {lf.path}') - task_parse_log.apply_async(args=(lf.hash, user_id, username)) - - -def _extract_date_from_validation(validation): - """ - Extracts the date from the validation dict of a log file. - - Args: - validation (dict): The validation dict of the log file. - - Returns: - datetime.date: The extracted date. - """ - try: - date_str = validation.get('probably_date') - return get_date_obj(date_str, '%Y-%m-%d') - except Exception as e: - logging.error(f"Failed to extract date from validation: {e}") - return None + task_parse_log.apply_async(args=(lf.hash, batch_size, replace, track_errors, user_id, username)) @celery_app.task(bind=True, name=_('Parse one log'), timelimit=-1) -def task_parse_log(self, log_file_hash, user_id=None, username=None): +def task_parse_log(self, log_file_hash, batch_size=5000, replace=False, track_errors=False, user_id=None, username=None): """ Parses a log file, extracts relevant information, and creates processed log records in the database. Args: log_file_hash (str): Hash representing the log file to be parsed. + batch_size (int, optional): Number of records to process in a single batch. Defaults to 5000. + replace (bool, optional): Whether to replace existing records. Defaults to False. + track_errors (bool, optional): Whether to track errors in log parsing. Defaults to False. 
user_id username @@ -123,19 +97,52 @@ def task_parse_log(self, log_file_hash, user_id=None, username=None): log_file = _initialize_log_file(log_file_hash) if not log_file: return + + clfdc = create_or_update_collection_log_file_date_count( + user=user, + collection=log_file.collection, + date=get_date_obj(log_file.validation.get('probably_date')) + ) + + if not replace and clfdc.is_usage_metric_computed: + logging.info(f'Usage metric already computed for {log_file.validation.get("probably_date")}') + return + + if replace: + clfdc.exported_files_count = 0 + clfdc.is_usage_metric_computed = False + clfdc.save() log_parser, url_translator_manager = _setup_parsing_environment(log_file, robots_list, mmdb) - success = _process_lines(log_parser, url_translator_manager, log_file) + success = _process_lines(lp=log_parser, utm=url_translator_manager, log_file=log_file, batch_size=batch_size, replace=replace, track_errors=track_errors) if not success: logging.error(f'Failed to parse log file {log_file.path}.') + log_file.status = choices.LOG_FILE_STATUS_ERROR + log_file.save() return log_file.status = choices.LOG_FILE_STATUS_PROCESSED log_file.save() + + _update_exported_files_count(clfdc) + logging.info(f'Log file {log_file.path} has been successfully parsed.') +def create_or_update_collection_log_file_date_count(user, collection, date): + n_expected_files = CollectionLogFilesPerDay.get_number_of_expected_files_by_day(collection=collection.acron3, date=date) + n_found_logs = LogFileDate.get_number_of_found_files_for_date(collection=collection.acron3, date=date) + + return CollectionLogFileDateCount.create_or_update( + user=user, + collection=collection, + date=date, + expected_log_files=n_expected_files, + found_log_files=n_found_logs, + ) + + def _initialize_log_file(log_file_hash): """ Initializes the log file for parsing by setting its status to 'parsing'. @@ -196,7 +203,7 @@ def _setup_parsing_environment(log_file, robots_list, mmdb): if cld.path in log_file.path: try: translator_class_name = CollectionURLTranslatorClass.objects.get(collection=log_file.collection, directory=cld).translator_class - translator_class = translator_class_name_to_obj(translator_class_name) + translator_class = parser_utils.translator_class_name_to_obj(translator_class_name) break except CollectionURLTranslatorClass.DoesNotExist: continue @@ -213,34 +220,7 @@ def _setup_parsing_environment(log_file, robots_list, mmdb): return lp, utm -def _fetch_art_jou_ids(utm, item_access_data): - """ - Fetches the journal and article IDs based on the item access data. - - Args: - utm (URLTranslationManager): The URL translation manager instance. - item_access_data (dict): A dictionary containing item access data, including ISSN and PIDs. - - Returns: - tuple: A tuple containing the journal ID and article ID, or (None, None) if not found. 
- """ - issn = item_access_data.get('scielo_issn') - if not issn: - return (None, None) - - pid_v2 = item_access_data.get('pid_v2') - pid_v3 = item_access_data.get('pid_v3') - pid_generic = item_access_data.get('pid_generic') - if not issn or not pid_v2 and not pid_v3 and not pid_generic: - return (None, None) - - jou_db_id = utm.journals_metadata['issn_to_db_id'].get(issn) - art_db_id = utm.articles_metadata['pid_v2_to_db_id'].get(pid_v2) or utm.articles_metadata['pid_v3_to_db_id'].get(pid_v3) or utm.articles_metadata['pid_generic_to_db_id'].get(pid_generic) - - return (jou_db_id, art_db_id) - - -def _process_lines(lp, utm, log_file): +def _process_lines(lp, utm, log_file, batch_size=5000, replace=False, track_errors=False): """ Processes each line of the log file, translating URLs and registering item accesses. @@ -248,221 +228,178 @@ def _process_lines(lp, utm, log_file): lp (LogParser): The log parser instance. utm (URLTranslationManager): The URL translation manager instance. log_file (LogFile): The log file being processed. + batch_size (int, optional): Number of records to process in a single batch. Defaults to 5000. + replace (bool, optional): Whether to replace existing records. Defaults to False. + track_errors (bool, optional): Whether to track errors in log parsing. Defaults to False. Returns: None. """ logging.info(f'Processing {lp.logfile}') + results = {} + errors = [] + + jump = log_file.last_processed_line if not replace else 0 + + es_manager = es.ElasticSearchUsageWrapper( + settings.ES_URL, + settings.ES_BASIC_AUTH, + settings.ES_API_KEY, + settings.ES_VERIFY_CERTS + ) + + if not es_manager.ping(): + logging.error('Elasticsearch client is not available.') + return False + + index_name = index_utils.generate_index_name( + index_prefix=settings.ES_INDEX_NAME, + collection=log_file.collection.acron3, + date=log_file.validation.get('probably_date') + ) + + es_manager.create_index_if_not_exists(index_name=index_name) + + if replace: + logging.info(f'Removing existing documents for collection {log_file.collection.acron3} and date {log_file.validation.get("probably_date")}') + delete_success = es_manager.delete_documents_by_key( + index_name=index_name, + data={'collection': log_file.collection.acron3, 'date': log_file.validation.get('probably_date')}, + ) + if not delete_success: + logging.error(f'Failed to delete existing documents for collection {log_file.collection.acron3} and date {log_file.validation.get("probably_date")}') + return False + for line in lp.parse(): - if not _process_line(line, utm, log_file): + if lp.stats.lines_parsed < jump: continue + if lp.stats.lines_parsed % batch_size == 0: + logging.info(f'Processing line {lp.stats.lines_parsed} of {lp.logfile}') + + is_valid_line, error_obj = _process_line(results, line, utm, log_file, track_errors) + if not is_valid_line: + if error_obj: + errors.append(error_obj) + + if len(errors) >= batch_size: + LogFileDiscardedLine.objects.bulk_create(errors) + errors = [] + continue + + if len(results) >= batch_size: + logging.info(f'Indexing data for log file {log_file.path}') + es_manager.export_to_index( + index_name=index_name, + data=results, + batch_size=batch_size + ) + results = {} + + _update_log_file_summary(log_file, lp.stats.get_stats()) + + logging.info(f'Indexing data for log file {log_file.path}') + es_manager.export_to_index( + index_name=index_name, + data=results, + batch_size=batch_size + ) + results = {} + + LogFileDiscardedLine.objects.bulk_create(errors) if errors else None + errors = [] + + 
_update_log_file_summary(log_file, lp.stats.get_stats()) + return True -def _process_line(line, utm, log_file): +def _update_log_file_summary(log_file, stats): + if not stats: + logging.warning(f'No stats available for log file {log_file.path}. Skipping summary update.') + return + + summary_k, summary_v = stats + log_file.summary = dict(zip(summary_k, summary_v)) + log_file.last_processed_line = log_file.summary.get('lines_parsed', 0) + log_file.save() + + +def _update_exported_files_count(collection_log_file_date: CollectionLogFileDateCount): + collection_log_file_date.exported_files_count += 1 + collection_log_file_date.set_is_usage_metric_computed() + collection_log_file_date.save() + + +def _process_line(results, line, utm, log_file, track_errors=False): """ - Processes a single line from the log file, translating the URL and registering item access if valid. + Process a single log line to extract and validate item access data. + This function translates a URL from the log line, extracts item access data, + validates the data, and updates the results if the data is valid. Args: - line (dict): A dictionary representing a single log line. - utm (URLTranslationManager): The URL translation manager instance. - log_file (LogFile): The log file being processed. + results: Dictionary or data structure to store processed results + line (dict): Log line containing URL and other access information + utm: URL translation manager for converting URLs + log_file: Log file object containing collection information (must have collection.acron3) + track_errors (bool): Whether to track errors in log parsing. Returns: - bool: True if the line was processed successfully, False otherwise. + tuple: A tuple containing a boolean indicating success or failure, and an optional LogFileDiscardedLine object. + + Raises: + Logs errors for URL translation failures and item access data extraction failures. + Logs debug messages for invalid item access data. 
""" try: translated_url = utm.translate(line.get('url')) except Exception as e: - _log_discarded_line(log_file, line, tracker_choices.LOG_FILE_DISCARDED_LINE_REASON_URL_TRANSLATION, str(e)) - return False + logging.error(f'Error translating URL {line.get("url")}: {e}') + return False, None try: - item_access_data = _extract_item_access_data(log_file.collection.acron3, translated_url) + item_access_data = index_utils.extract_item_access_data(log_file.collection.acron3, translated_url) except Exception as e: - _log_discarded_line(log_file, line, tracker_choices.LOG_FILE_DISCARDED_LINE_REASON_URL_TRANSLATION, str(e)) - return False - - if not is_valid_item_access_data(item_access_data): - _log_discarded_line( - log_file, line, - tracker_choices.LOG_FILE_DISCARDED_LINE_REASON_MISSING_METADATA, - _(f'It was not possible to identify the necessary information for the URL {line.get("url")}') - ) - return False - - jou_id, art_id = _fetch_art_jou_ids(utm, item_access_data) - - if not jou_id: - _log_discarded_line( - log_file, line, - tracker_choices.LOG_FILE_DISCARDED_LINE_REASON_MISSING_JOURNAL, - _('There is no journal registered for the given ISSN') - ) - return False - - if not art_id: - _log_discarded_line( - log_file, line, - tracker_choices.LOG_FILE_DISCARDED_LINE_REASON_MISSING_ARTICLE, - _('There is no article registered for the given PID') - ) - return False - - try: - _register_item_access(item_access_data, line, jou_id, art_id) - except Exception as e: - _log_discarded_line(log_file, line, tracker_choices.LOG_FILE_DISCARDED_LINE_REASON_DATABASE_ERROR, str(e)) - return False - - return True - - -def _extract_item_access_data(collection_acron3, translated_url): - """ - Extracts item access data from the translated URL and standardizes it. - - Args: - collection_acron3 (str): The acronym of the collection. - translated_url (dict): The translated URL containing metadata. + logging.error(f'Error extracting item access data from URL {line.get("url")}: {e}') + return False, None - Returns: - dict: A dictionary containing standardized item access data, or None if the data is invalid. - """ - item_access_data = { - 'collection': collection_acron3, - 'scielo_issn': translated_url.get('scielo_issn'), - 'pid_v2': standardizer.standardize_pid_v2(translated_url.get('pid_v2')), - 'pid_v3': standardizer.standardize_pid_v3(translated_url.get('pid_v3')), - 'pid_generic': standardizer.standardize_pid_generic(translated_url.get('pid_generic')), - 'media_language': standardizer.standardize_language_code(translated_url.get('media_language')), - 'media_format': translated_url.get('media_format'), - 'content_type': translated_url.get('content_type'), - } + ignore_utm_validation = not track_errors + is_valid, check_result = index_utils.is_valid_item_access_data(item_access_data, utm, ignore_utm_validation) + + if not is_valid: + if track_errors: + error_code = check_result.get('code') + + if error_code in { + 'invalid_scielo_issn', + 'invalid_pid_v3', + 'invalid_pid_v2', + 'invalid_pid_generic' + }: + if 'pid' in error_code: + tracker_error_type = LOG_FILE_DISCARDED_LINE_REASON_MISSING_ARTICLE + else: + tracker_error_type = LOG_FILE_DISCARDED_LINE_REASON_MISSING_JOURNAL + + lfdl = LogFileDiscardedLine.create( + log_file=log_file, + error_type=tracker_error_type, + message=check_result.get('message'), + data={'line': line, 'item_access_data': item_access_data}, + save=False, + ) + logging.debug(f'Invalid item access data: {check_result.get("message")}. Line: {line}. 
Item Access Data: {item_access_data}') + return False, lfdl - return item_access_data - - -def _register_item_access(item_access_data, line, jou_id, art_id): - """ - Registers an item access in the database, creating necessary objects if they do not exist. - - Args: - item_access_data (dict): A dictionary containing item access data, including collection, ISSN, PIDs, media format, language, and content type. - line (dict): The log line being processed. - jou_id (int): The journal ID. - art_id (int): The article ID. - - Returns: - None. - """ - col_acron3 = item_access_data.get('collection') - media_format = item_access_data.get('media_format') - media_language = item_access_data.get('media_language') - content_type = item_access_data.get('content_type') - - client_name = line.get('client_name') - client_version = line.get('client_version') - local_datetime = line.get('local_datetime') - country_code = line.get('country_code') - ip_address = line.get('ip_address') - - truncated_datetime = truncate_datetime_to_hour(local_datetime) - if timezone.is_naive(truncated_datetime): - truncated_datetime = timezone.make_aware(truncated_datetime) - ms_key = extract_minute_second_key(local_datetime) - - it = _get_or_create_item(col_acron3, jou_id, art_id) - ua = _get_or_create_user_agent(client_name, client_version) - us = _get_or_create_user_session(truncated_datetime, ua, ip_address) - ita = _get_or_create_item_access(it, us, media_format, media_language, country_code, content_type, ms_key) - - ita.click_timestamps[ms_key] = ita.click_timestamps.get(ms_key, 0) + 1 - ita.save() - - -def _get_or_create_item(col_acron3, jou_id, art_id, max_retries=3): - for attempt in range(max_retries): - try: - collection_obj = Collection.objects.get(acron3=col_acron3) - journal_obj = Journal.objects.get(id=jou_id) - article_obj = Article.objects.get(id=art_id) - it, _ = Item.objects.get_or_create( - collection=collection_obj, - journal=journal_obj, - article=article_obj, - ) - return it - except Exception as e: - if attempt == max_retries - 1: - raise - time.sleep(0.1) - return None - - -def _get_or_create_user_agent(client_name, client_version, max_retries=3): - for attempt in range(max_retries): - try: - ua, _ = UserAgent.objects.get_or_create( - name=client_name, - version=client_version - ) - return ua - except Exception as e: - if attempt == max_retries - 1: - raise - time.sleep(0.1) - return None - - -def _get_or_create_user_session(truncated_datetime, ua, ip_address, max_retries=3): - for attempt in range(max_retries): - try: - us, _ = UserSession.objects.get_or_create( - datetime=truncated_datetime, - user_agent=ua, - user_ip=ip_address - ) - return us - except Exception as e: - if attempt == max_retries - 1: - raise - time.sleep(0.1) - return None - - -def _get_or_create_item_access(it, us, media_format, media_language, country_code, content_type, ms_key, max_retries=3): - for attempt in range(max_retries): - try: - ita, _ = ItemAccess.objects.get_or_create( - item=it, - user_session=us, - media_format=media_format, - media_language=media_language, - country_code=country_code, - content_type=content_type, - defaults={'click_timestamps': {ms_key: 0}} - ) - return ita - except Exception as e: - if attempt == max_retries - 1: - raise - time.sleep(0.1) - return None - - -def _log_discarded_line(log_file, line, error_type, message): - """ - Logs a discarded line from the log file and creates a record in the database. 
+ return False, None + + index_utils.update_results_with_item_access_data( + results, + item_access_data, + line + ) - Args: - log_file (LogFile): The log file being processed. - line (dict): The log line that was discarded. - error_type (str): The type of error that caused the line to be discarded. - message (str): A message describing the reason for discarding the line. - """ - LogFileDiscardedLine.create(log_file=log_file, data=line, error_type=error_type, message=message) + return True, None @celery_app.task(bind=True, name=_('Create index'), timelimit=-1) @@ -480,14 +417,19 @@ def task_create_index(self, index_name, mappings=None, user_id=None, username=No None. """ user = _get_user(self.request, username=username, user_id=user_id) - es_client = get_elasticsearch_client(settings.ES_URL, settings.ES_BASIC_AUTH, settings.ES_API_KEY) + es_manager = es.ElasticSearchUsageWrapper( + settings.ES_URL, + settings.ES_BASIC_AUTH, + settings.ES_API_KEY, + settings.ES_VERIFY_CERTS + ) try: - if es_client.indices.exists(index=index_name): + if es_manager.client.indices.exists(index=index_name): logging.info(f"Index {index_name} already exists.") return - create_index(client=es_client, index_name=index_name, mappings=mappings) + es_manager.create_index(index_name=index_name, mappings=mappings) logging.info(f"Index {index_name} created successfully.") except Exception as e: logging.error(f"Failed to create index {index_name}: {e}") @@ -507,28 +449,32 @@ def task_delete_index(self, index_name, user_id=None, username=None): None. """ user = _get_user(self.request, username=username, user_id=user_id) - es_client = get_elasticsearch_client(settings.ES_URL, settings.ES_BASIC_AUTH, settings.ES_API_KEY) + es_manager = es.ElasticSearchUsageWrapper( + settings.ES_URL, + settings.ES_BASIC_AUTH, + settings.ES_API_KEY, + settings.ES_VERIFY_CERTS + ) try: - if not es_client.indices.exists(index=index_name): + if not es_manager.client.indices.exists(index=index_name): logging.info(f"Index {index_name} does not exist.") return - es_client.indices.delete(index=index_name) + es_manager.client.indices.delete(index=index_name) logging.info(f"Index {index_name} deleted successfully.") except Exception as e: logging.error(f"Failed to delete index {index_name}: {e}") @celery_app.task(bind=True, name=_('Delete documents by key'), timelimit=-1) -def task_delete_documents_by_key(self, keys, values, index_name=None, user_id=None, username=None): +def task_delete_documents_by_key(self, index_name, data, user_id=None, username=None): """ Deletes documents from Elasticsearch based on the provided keys and values. Args: - keys (list): List of document keys to delete. - values (dict): Additional values to filter documents for deletion. This is required. - index_name (str, optional): The name of the Elasticsearch index. Defaults to settings.ES_INDEX_NAME. + index_name (str): The name of the Elasticsearch index. Defaults to settings.ES_INDEX_NAME. + data (dict): A dictionary where keys are field names and values are the corresponding values to match for deletion. user_id (int, optional): The ID of the user initiating the task. Defaults to None. username (str, optional): The username of the user initiating the task. Defaults to None. @@ -536,280 +482,18 @@ def task_delete_documents_by_key(self, keys, values, index_name=None, user_id=No None. 
""" user = _get_user(self.request, username=username, user_id=user_id) - es_client = get_elasticsearch_client(settings.ES_URL, settings.ES_BASIC_AUTH, settings.ES_API_KEY) - - if not index_name: - index_name = settings.ES_INDEX_NAME + es_manager = es.ElasticSearchUsageWrapper( + settings.ES_URL, + settings.ES_BASIC_AUTH, + settings.ES_API_KEY, + settings.ES_VERIFY_CERTS + ) try: - delete_documents_by_key(client=es_client, index_name=index_name, keys=keys, values=values) - logging.info(f"Successfully deleted documents with keys: {keys} and values: {values} from index {index_name}.") + es_manager.delete_documents_by_key( + index_name=index_name, + data=data, + ) + logging.info(f"Successfully deleted documents with data: {data} from index {index_name}.") except Exception as e: - logging.error(f"Failed to delete documents with keys {keys} and values {values} from index {index_name}: {e}") - - -@celery_app.task(bind=True, name=_('Index metrics'), timelimit=-1) -def task_index_documents(self, collections=[], from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None, bulk_size=5000, replace=False): - """ - Task to compute and index metrics for specified collections within a given date range. - - This task retrieves metrics for the specified collections and indexes them into an Elasticsearch - index. The metrics are computed for the provided date range or a range derived from the given - parameters. - - Args: - collections (list, optional): List of collection identifiers to compute metrics for. Defaults to an empty list. - from_date (str, optional): Start date for the metrics computation in 'YYYY-MM-DD' format. Defaults to None. - until_date (str, optional): End date for the metrics computation in 'YYYY-MM-DD' format. Defaults to None. - days_to_go_back (int, optional): Number of days to go back from the current date to compute metrics. Defaults to None. - user_id (int, optional): ID of the user initiating the task. Defaults to None. - username (str, optional): Username of the user initiating the task. Defaults to None. - bulk_size (int, optional): Number of documents to send in each bulk request to Elasticsearch. Defaults to 5000. - replace (bool, optional): If True, replaces existing documents in Elasticsearch. Defaults to False. - - Raises: - Exception: Logs errors if bulk indexing to Elasticsearch fails. - - Notes: - - If no collections are provided, the task will compute metrics for all collections. - - The date range is determined by the combination of `from_date`, `until_date`, and `days_to_go_back`. - - Metrics are computed and indexed in bulk to optimize performance. 
- """ - user = _get_user(self.request, username=username, user_id=user_id) - - if not collections: - collections = Collection.acron3_list() - - from_date_str, until_date_str = get_date_range_str(from_date, until_date, days_to_go_back) - dates = get_date_objs_from_date_range(from_date_str, until_date_str) - - es_client = get_elasticsearch_client(settings.ES_URL, settings.ES_BASIC_AUTH, settings.ES_API_KEY) - - for collection in collections: - logging.info(f'Computing metrics for collection {collection} from {from_date_str} to {until_date_str}') - - clfdc_to_update = [] - - bulk_data = [] - metrics_result = compute_metrics_for_collection(collection, dates, replace, clfdc_to_update) - - for key, metric_data in metrics_result.items(): - bulk_data.append({ - "_id": key, - "_source": metric_data, - }) - - if len(bulk_data) >= bulk_size: - try: - index_documents( - index_name=settings.ES_INDEX_NAME, - documents={doc["_id"]: doc["_source"] for doc in bulk_data}, - client=es_client, - ) - bulk_data = [] - except Exception as e: - logging.error(f"Failed to send bulk metrics to Elasticsearch: {e}") - clfdc_to_update = [] - - if bulk_data: - try: - index_documents( - index_name=settings.ES_INDEX_NAME, - documents={doc["_id"]: doc["_source"] for doc in bulk_data}, - client=es_client, - ) - except Exception as e: - logging.error(f"Failed to send remaining bulk metrics to Elasticsearch: {e}") - clfdc_to_update = [] - - for clfdc in clfdc_to_update: - clfdc.is_usage_metric_computed = True - clfdc.save() - - -def compute_metrics_for_collection(collection, dates, replace=False, clfdc_to_update=None): - """ - Computes usage metrics for a given collection over a range of dates. - - Args: - collection (str): The acronym of the collection for which metrics - are to be computed. - dates (list[datetime.date]): A list of dates for which metrics - should be computed. - replace (bool, optional): A flag indicating whether to replace - existing metrics. Defaults to False. - clfdc_to_update (list, optional): List to append clfdc objects that should be marked as computed after successful export. - - Returns: - dict: A dictionary containing computed metrics, keyed by a - generated usage key. - """ - data = {} - - if clfdc_to_update is None: - clfdc_to_update = [] - - for date in dates: - date_str = get_date_str(date) - - if not _is_valid_log_file_status(collection, date_str): - continue - - is_valid, clfdc = _is_valid_collection_log_file_date(collection, date_str, replace) - if not is_valid: - continue - - logging.info(f"Computing metrics for {date_str}") - _process_user_sessions(collection, date, date_str, data) - clfdc_to_update.append(clfdc) - - return data - - -def _is_valid_collection_log_file_date(collection, date_str, replace): - """ - Checks if the CollectionLogFileDateCount exists and is valid for the given date. - - Args: - collection (str): The acronym of the collection. - date_str (str): The date string in 'YYYY-MM-DD' format. - replace (bool): Whether to replace existing metrics. - - Returns: - tuple: A tuple containing a boolean indicating if the date is valid and the CollectionLogFileDateCount object if it exists. 
- """ - try: - clfdc = CollectionLogFileDateCount.objects.get(date=date_str, collection__acron3=collection) - - if clfdc.status != choices.COLLECTION_LOG_FILE_DATE_COUNT_OK: - logging.info(f'CollectionLogFileDateCount status is not OK for {date_str}') - return False, None - - if not replace and clfdc.is_usage_metric_computed: - logging.info(f'Usage metric already computed for {date_str}') - return False, None - - return True, clfdc - - except CollectionLogFileDateCount.DoesNotExist: - logging.info(f'CollectionLogFileDateCount does not exist for {date_str}') - return False, None - - -def _is_valid_log_file_status(collection, date_str): - """ - Checks if all LogFileDate objects for the given date and collection have a valid status. - - Args: - collection (str): The acronym of the collection. - date_str (str): The date string in 'YYYY-MM-DD' format. - - Returns: - bool: True if all LogFileDate objects have a valid status, False otherwise. - """ - for lfd in LogFileDate.objects.filter(date=date_str, log_file__collection__acron3=collection): - if lfd.log_file.status not in (choices.LOG_FILE_STATUS_INVALIDATED, choices.LOG_FILE_STATUS_PROCESSED): - logging.info(f'LogFile status is not PROCESSED for {date_str}') - return False - return True - - -def _process_user_sessions(collection, date, date_str, data): - """ - Processes user sessions for a given collection and date, computing metrics for each item access. - - Args: - collection (str): The acronym of the collection. - date (datetime.date): The date for which to compute metrics. - date_str (str): The date string in 'YYYY-MM-DD' format. - data (dict): A dictionary to store computed metrics. - """ - all_item_accesses = ItemAccess.objects.filter( - item__collection__acron3=collection, - user_session__datetime__date=date_str - ).select_related( - 'item__journal', - 'item__article', - 'item__collection', - 'user_session' - ).only( - 'item__journal__scielo_issn', - 'item__article__pid_v2', - 'item__article__pid_v3', - 'item__article__pid_generic', - 'item__collection__acron3', - 'media_language', - 'country_code', - 'click_timestamps', - 'content_type', - 'user_session__datetime', - 'user_session__user_agent__name', - 'user_session__user_agent__version', - 'user_session__user_ip', - ).iterator() - - user_sessions_data = defaultdict(list) - for item_access in all_item_accesses: - if item_access.item.collection.acron3 != collection: - continue - user_sessions_data[item_access.user_session].append(item_access) - - for user_session, item_accesses_list in user_sessions_data.items(): - for item_access in item_accesses_list: - key = _generate_usage_key( - collection, - item_access.item.journal.scielo_issn, - item_access.item.article.pid_v2 or '', - item_access.item.article.pid_v3 or '', - item_access.item.article.pid_generic or '', - item_access.media_language, - item_access.country_code, - date_str, - ) - - compute_r5_metrics( - key, - data, - collection, - item_access.item.journal.scielo_issn, - item_access.item.article.pid_v2 or '', - item_access.item.article.pid_v3 or '', - item_access.item.article.pid_generic or '', - item_access.media_language, - item_access.country_code, - date_str, - date.year, - date.month, - date.day, - item_access.click_timestamps, - item_access.content_type, - ) - - return True - - -def _generate_usage_key(collection, journal, pid_v2, pid_v3, pid_generic, media_language, country_code, date_str): - """" - Generates a unique key for the given parameters. 
- - :param collection: collection acrononym with 3 characters - :param journal: journal ISSN (e.g., scielo_issn) - :param pid_v2: PID v2 - :param pid_v3: PID v3 - :param pid_generic: generic PID - :param media_language: media language - :param country_code: country code - :param date_str: date string in the format YYYY-MM-DD - - :return: a string that uniquely identifies the combination of parameters - """ - return '|'.join([ - collection, - journal, - pid_v2 or '', - pid_v3 or '', - pid_generic or '', - media_language, - country_code, - date_str - ]) + logging.error(f"Failed to delete documents with data {data} from index {index_name}: {e}") diff --git a/metrics/tests/test_utils.py b/metrics/tests/test_index_utils.py similarity index 76% rename from metrics/tests/test_utils.py rename to metrics/tests/test_index_utils.py index cff448e..47f1a0e 100644 --- a/metrics/tests/test_utils.py +++ b/metrics/tests/test_index_utils.py @@ -10,10 +10,10 @@ DEFAULT_SCIELO_ISSN, ) -from metrics.utils import is_valid_item_access_data +from metrics.utils import index_utils -class TestUtils(unittest.TestCase): +class TestIndexUtils(unittest.TestCase): def test_is_valid_item_access_data_valid(self): data = { 'scielo_issn': '1234-5678', @@ -22,7 +22,8 @@ def test_is_valid_item_access_data_valid(self): 'media_format': MEDIA_FORMAT_PDF, 'content_type': CONTENT_TYPE_FULL_TEXT, } - self.assertTrue(is_valid_item_access_data(data)) + result, _ = index_utils.is_valid_item_access_data(data) + self.assertTrue(result) def test_is_valid_item_access_data_missing_scielo_issn(self): data = { @@ -32,7 +33,8 @@ def test_is_valid_item_access_data_missing_scielo_issn(self): 'media_format': MEDIA_FORMAT_PDF, 'content_type': CONTENT_TYPE_FULL_TEXT, } - self.assertFalse(is_valid_item_access_data(data)) + result, _ = index_utils.is_valid_item_access_data(data) + self.assertFalse(result) def test_is_valid_item_access_data_undefined_media_format(self): data = { @@ -42,7 +44,8 @@ def test_is_valid_item_access_data_undefined_media_format(self): 'media_format': MEDIA_FORMAT_UNDEFINED, 'content_type': CONTENT_TYPE_FULL_TEXT, } - self.assertFalse(is_valid_item_access_data(data)) + result, _ = index_utils.is_valid_item_access_data(data) + self.assertFalse(result) def test_is_valid_item_access_data_undefined_content_type(self): data = { @@ -52,7 +55,8 @@ def test_is_valid_item_access_data_undefined_content_type(self): 'media_format': MEDIA_FORMAT_PDF, 'content_type': CONTENT_TYPE_UNDEFINED, } - self.assertFalse(is_valid_item_access_data(data)) + result, _ = index_utils.is_valid_item_access_data(data) + self.assertFalse(result) def test_is_valid_item_access_data_missing_pid_v2_and_pid_v3(self): data = { @@ -62,7 +66,8 @@ def test_is_valid_item_access_data_missing_pid_v2_and_pid_v3(self): 'media_format': MEDIA_FORMAT_PDF, 'content_type': CONTENT_TYPE_FULL_TEXT, } - self.assertFalse(is_valid_item_access_data(data)) + result, _ = index_utils.is_valid_item_access_data(data) + self.assertFalse(result) def test_is_valid_item_access_data_media_format_html(self): data = { @@ -72,7 +77,8 @@ def test_is_valid_item_access_data_media_format_html(self): 'media_format': MEDIA_FORMAT_HTML, 'content_type': CONTENT_TYPE_FULL_TEXT, } - self.assertTrue(is_valid_item_access_data(data)) + result, _ = index_utils.is_valid_item_access_data(data) + self.assertTrue(result) def test_is_valid_item_access_data_content_type_abstract(self): data = { @@ -82,7 +88,8 @@ def test_is_valid_item_access_data_content_type_abstract(self): 'media_format': MEDIA_FORMAT_PDF, 
'content_type': CONTENT_TYPE_ABSTRACT } - self.assertTrue(is_valid_item_access_data(data)) + result, _ = index_utils.is_valid_item_access_data(data) + self.assertTrue(result) def test_is_valid_item_acess_data_dataverse(self): data = { @@ -93,4 +100,5 @@ def test_is_valid_item_acess_data_dataverse(self): 'media_format': MEDIA_FORMAT_HTML, 'content_type': CONTENT_TYPE_ABSTRACT, } - self.assertTrue(is_valid_item_access_data(data)) \ No newline at end of file + result, _ = index_utils.is_valid_item_access_data(data) + self.assertTrue(result) diff --git a/metrics/utils/__init__.py b/metrics/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/metrics/utils.py b/metrics/utils/file_utils.py similarity index 50% rename from metrics/utils.py rename to metrics/utils/file_utils.py index e170e07..23d3949 100644 --- a/metrics/utils.py +++ b/metrics/utils/file_utils.py @@ -2,17 +2,6 @@ import io import tarfile -from scielo_usage_counter.values import ( - CONTENT_TYPE_UNDEFINED, - MEDIA_FORMAT_UNDEFINED, -) - -from scielo_usage_counter.translator.classic import URLTranslatorClassicSite -from scielo_usage_counter.translator.dataverse import URLTranslatorDataverseSite -from scielo_usage_counter.translator.opac import URLTranslatorOPACSite -from scielo_usage_counter.translator.opac_alpha import URLTranslatorOPACAlphaSite -from scielo_usage_counter.translator.preprints import URLTranslatorPreprintsSite - def get_load_data_function(file_path): """ @@ -81,58 +70,3 @@ def load_tar_gz(file_path, delimiter='\t'): delimiter=delimiter, is_stream=True ) - - -def is_valid_item_access_data(data): - """ - Validates the item access data based on the provided parameters. - - Parameters: - data (dict): A dictionary containing the following keys: - - scielo_issn (str): The ISSN of the SciELO journal. - - pid_v2 (str): The PID version 2 of the document. - - pid_v3 (str): The PID version 3 of the document. - - media_format (str): The media format of the document. - - content_type (str): The content type of the document. - - Returns: - bool: True if the item access data is valid, False otherwise." - """ - if not isinstance(data, dict): - return False - - scielo_issn = data.get('scielo_issn') - media_format = data.get('media_format') - content_type = data.get('content_type') - pid_v2 = data.get('pid_v2') - pid_v3 = data.get('pid_v3') - pid_generic = data.get('pid_generic') - - if not all([ - scielo_issn, - media_format and media_format != MEDIA_FORMAT_UNDEFINED, - content_type and content_type != CONTENT_TYPE_UNDEFINED, - pid_v2 or pid_v3 or pid_generic, - ]): - return False - return True - - -def translator_class_name_to_obj(name): - """ - Translates a class name to a class object." - - Parameters: - name (str): The name of the URL translator site. 
- """ - if not name: - return None - - translator_classes = { - 'classic': URLTranslatorClassicSite, - 'dataverse': URLTranslatorDataverseSite, - 'opac': URLTranslatorOPACSite, - 'opac_alpha': URLTranslatorOPACAlphaSite, - 'preprints': URLTranslatorPreprintsSite - } - return translator_classes.get(name.lower()) diff --git a/metrics/utils/index_utils.py b/metrics/utils/index_utils.py new file mode 100644 index 0000000..c597f4e --- /dev/null +++ b/metrics/utils/index_utils.py @@ -0,0 +1,329 @@ +from scielo_usage_counter.counter import compute_r5_metrics +from scielo_usage_counter.values import CONTENT_TYPE_UNDEFINED, MEDIA_FORMAT_UNDEFINED + +from core.utils import standardizer +from core.utils.date_utils import extract_minute_second_key, truncate_datetime_to_hour + + +def generate_user_session_id(client_name, client_version, ip_address, datetime, sep='|'): + """ + Generates a user session ID based on the provided parameters. + + Parameters: + client_name (str): The name of the client. + client_version (str): The version of the client. + ip_address (str): The IP address of the user. + datetime (datetime): The datetime object representing the session time. + sep (str): The separator to use in the ID. Default is '|'. + + Returns: + str: A user session ID formatted as a string. + """ + dt_year_month_day = datetime.strftime('%Y-%m-%d') + dt_hour = datetime.strftime('%H') + + return sep.join([ + str(client_name), + str(client_version), + str(ip_address), + str(dt_year_month_day), + str(dt_hour), + ]) + + +def generate_item_access_id(col_acron3, scielo_issn, pid_v2, pid_v3, pid_generic, user_session_id, country_code, media_language, media_format, content_type, sep='|'): + """ + Generates an item access ID based on the provided parameters. + + Parameters: + col_acron3 (str): The acronym of the collection. + scielo_issn (str): The ISSN of the SciELO journal. + pid_v2 (str): The PID version 2. + pid_v3 (str): The PID version 3. + pid_generic (str): The generic PID. + user_session_id (str): The user session ID. + country_code (str): The country code of the user. + media_language (str): The language of the media. + media_format (str): The format of the media. + content_type (str): The type of content. + sep (str): The separator to use in the ID. Default is '|'. + """ + return sep.join([ + col_acron3, + scielo_issn, + pid_v2 or '', + pid_v3 or '', + pid_generic or '', + user_session_id, + country_code, + media_language, + media_format, + content_type, + ]) + + +def generate_index_name(index_prefix: str, collection: str, date: str): + """ Generates an index name based on the provided parameters. + Parameters: + index_prefix (str): The prefix for the index name. + collection (str): The collection acronym. + date (str): The date string in 'YYYY-MM-DD' format. + Returns: + str: The formatted index name. + """ + if not date or not isinstance(date, str): + raise ValueError("Date must be a non-empty string in 'YYYY-MM-DD' format.") + + if not collection or not isinstance(collection, str): + raise ValueError("Collection must be a non-empty string.") + + if not index_prefix or not isinstance(index_prefix, str): + raise ValueError("Index prefix must be a non-empty string.") + + index_year, _, _ = date.split('-') + return f'{index_prefix}_{collection}_{index_year}' + + +def generate_index_id(collection, journal, pid_v2, pid_v3, pid_generic, media_language, country_code, date_str): + """ + Generates a unique index key based on the provided parameters. 
+ This is different from the item access ID as it does not include user session, media_format, and content_type information. + It is used for indexing purposes. + + Parameters: + collection (str): The collection acronym. + journal (str): The journal name. + pid_v2 (str): The PID version 2. + pid_v3 (str): The PID version 3. + pid_generic (str): The generic PID. + media_language (str): The media language code. + country_code (str): The country code. + date_str (str): The date string in 'YYYY-MM-DD' format. + + Returns: + str: A unique index key formatted as a string. + """ + return '|'.join([ + collection, + journal, + pid_v2 or '', + pid_v3 or '', + pid_generic or '', + media_language, + country_code, + date_str + ]) + + +def extract_item_access_data(collection_acron3:str, translated_url: dict): + """ + Extracts item access data from the translated URL and standardizes it. + + Args: + collection_acron3 (str): The acronym of the collection. + translated_url (dict): The translated URL containing metadata. + + Returns: + dict: A dictionary containing standardized item access data, or None if the data is invalid. + """ + if not translated_url or not isinstance(translated_url, dict): + return {} + + item_access_data = { + 'collection': collection_acron3, + 'scielo_issn': translated_url.get('scielo_issn'), + 'pid_v2': standardizer.standardize_pid_v2(translated_url.get('pid_v2')), + 'pid_v3': standardizer.standardize_pid_v3(translated_url.get('pid_v3')), + 'pid_generic': standardizer.standardize_pid_generic(translated_url.get('pid_generic')), + 'media_language': standardizer.standardize_language_code(translated_url.get('media_language')), + 'media_format': translated_url.get('media_format'), + 'content_type': translated_url.get('content_type'), + 'year_of_publication': standardizer.standardize_year_of_publication(translated_url.get('year_of_publication')), + 'journal_main_title': translated_url.get('journal_main_title'), + 'journal_subject_area_capes': translated_url.get('journal_subject_area_capes'), + 'journal_subject_area_wos': translated_url.get('journal_subject_area_wos'), + 'journal_acronym': translated_url.get('journal_acronym'), + } + + return item_access_data + + +def is_valid_item_access_data(data: dict, utm=None, ignore_utm_validation=False): + """ + Validates the item access data based on the provided parameters. + + Parameters: + data (dict): A dictionary containing the following keys: + - scielo_issn (str): The ISSN of the SciELO journal. + - pid_v2 (str): The PID version 2 of the document. + - pid_v3 (str): The PID version 3 of the document. + - media_format (str): The media format of the document. + - content_type (str): The content type of the document. + utm: URL translation manager for converting URLs + ignore_utm_validation (bool): If True, skips validation against the URL translation manager. + + Returns: + tuple: A tuple containing a boolean indicating whether the data is valid and a message. + If the data is valid, the first element is True and the second element is a success message. + If the data is invalid, the first element is False and the second element is an error message. + """ + if not isinstance(data, dict): + return False, {'message': 'Invalid data format. 
Expected a dictionary.', 'code': 'invalid_format'} + + scielo_issn = data.get('scielo_issn') + media_format = data.get('media_format') + content_type = data.get('content_type') + pid_v2 = data.get('pid_v2') + pid_v3 = data.get('pid_v3') + pid_generic = data.get('pid_generic') + + if not all([ + scielo_issn, + media_format and media_format != MEDIA_FORMAT_UNDEFINED, + content_type and content_type != CONTENT_TYPE_UNDEFINED, + pid_v2 or pid_v3 or pid_generic, + ]): + return False, {'message': 'Missing required fields in item access data.', 'code': 'missing_fields'} + + # Check ISSN and PIDs validity using the URL translation manager + if utm and not ignore_utm_validation: + if not utm.is_valid_code(scielo_issn, utm.journals_metadata['issn_set']): + return False, {'message': f'Invalid scielo_issn: {scielo_issn}', 'code': 'invalid_scielo_issn'} + + if pid_v2 and not utm.is_valid_code(pid_v2, utm.articles_metadata['pid_set']): + return False, {'message': f'Invalid pid_v2: {pid_v2}', 'code': 'invalid_pid_v2'} + + if pid_v3 and not utm.is_valid_code(pid_v3, utm.articles_metadata['pid_set']): + return False, {'message': f'Invalid pid_v3: {pid_v3}', 'code': 'invalid_pid_v3'} + + if pid_generic and not utm.is_valid_code(pid_generic, utm.articles_metadata['pid_set']): + return False, {'message': f'Invalid pid_generic: {pid_generic}', 'code': 'invalid_pid_generic'} + + return True, {'message': 'Item access data is valid.', 'code': 'valid'} + + +def update_results_with_item_access_data(results: dict, item_access_data: dict, line: dict): + """ + Updates the item access data with the information from the log line. + + Args: + data (dict): The dictionary to store item access data. + item_access_data (dict): The item access data extracted from the translated URL. + line (dict): The log line containing additional information. + + Returns: + None. 
+ """ + col_acron3 = item_access_data.get('collection') + scielo_issn = item_access_data.get('scielo_issn') + pid_v2 = item_access_data.get('pid_v2') + pid_v3 = item_access_data.get('pid_v3') + pid_generic = item_access_data.get('pid_generic') + + media_format = item_access_data.get('media_format') + media_language = item_access_data.get('media_language') + content_type = item_access_data.get('content_type') + + client_name = line.get('client_name') + client_version = line.get('client_version') + local_datetime = line.get('local_datetime') + country_code = line.get('country_code') + ip_address = line.get('ip_address') + + truncated_datetime = truncate_datetime_to_hour(local_datetime) + ms_key = extract_minute_second_key(local_datetime) + + user_session_id = generate_user_session_id( + client_name, + client_version, + ip_address, + truncated_datetime, + ) + + item_access_id = generate_item_access_id( + user_session_id=user_session_id, + col_acron3=col_acron3, + scielo_issn=scielo_issn, + pid_v2=pid_v2, + pid_v3=pid_v3, + pid_generic=pid_generic, + media_language=media_language, + country_code=country_code, + media_format=media_format, + content_type=content_type, + ) + + if item_access_id not in results: + results[item_access_id] = { + 'click_timestamps': {ms_key: 0}, + 'media_format': media_format, + 'media_language': media_language, + 'content_type': content_type, + 'country_code': country_code, + 'date_str': truncated_datetime.strftime('%Y-%m-%d'), + 'date': truncated_datetime, + 'year_of_publication': item_access_data.get('year_of_publication'), + 'journal': { + 'scielo_issn': item_access_data.get('scielo_issn'), + 'main_title': item_access_data.get('journal_main_title'), + 'subject_area_capes': item_access_data.get('journal_subject_area_capes'), + 'subject_area_wos': item_access_data.get('journal_subject_area_wos'), + 'acronym': item_access_data.get('journal_acronym'), + }, + } + + # Check if the click timestamp for this minute-second key exists, if not, initialize it + if ms_key not in results[item_access_id]['click_timestamps']: + results[item_access_id]['click_timestamps'][ms_key] = 0 + + # Increment the click timestamp count + results[item_access_id]['click_timestamps'][ms_key] += 1 + + +def convert_to_index_documents(data: dict, key_sep='|'): + """ + Converts the provided data into a format suitable for indexing metrics. + This function processes the data dictionary, extracting relevant fields and computing metrics. + + Args: + data (dict): A dictionary containing the metrics data to be processed. + + Returns: + dict: A dictionary containing the processed metrics data, ready for indexing. 
+ """ + if not isinstance(data, dict): + return {} + + metrics_data = {} + + for key, value in data.items(): + collection, scielo_issn, pid_v2, pid_v3, pid_generic, _, _, _, _, _, country_code, media_language, _, content_type = key.split(key_sep) + + document_id = generate_index_id( + collection, + scielo_issn, + pid_v2, + pid_v3, + pid_generic, + media_language, + country_code, + value.get('date_str') + ) + + compute_r5_metrics( + document_id, + metrics_data, + collection, + value.get('journal'), + pid_v2, + pid_v3, + pid_generic, + value.get('year_of_publication'), + media_language, + value.get('country_code'), + value.get('date_str'), + value.get('click_timestamps'), + content_type, + ) + + return metrics_data diff --git a/metrics/utils/parser_utils.py b/metrics/utils/parser_utils.py new file mode 100644 index 0000000..ef142e6 --- /dev/null +++ b/metrics/utils/parser_utils.py @@ -0,0 +1,47 @@ +import logging + +from scielo_usage_counter.translator.classic import URLTranslatorClassicSite +from scielo_usage_counter.translator.dataverse import URLTranslatorDataverseSite +from scielo_usage_counter.translator.opac import URLTranslatorOPACSite +from scielo_usage_counter.translator.opac_alpha import URLTranslatorOPACAlphaSite +from scielo_usage_counter.translator.preprints import URLTranslatorPreprintsSite + +from core.utils.date_utils import get_date_obj + + +def extract_date_from_validation_dict(validation): + """ + Extracts the date from the validation dict of a log file. + + Args: + validation (dict): The validation dict of the log file. + + Returns: + datetime.date: The extracted date. + """ + try: + date_str = validation.get('probably_date') + return get_date_obj(date_str, '%Y-%m-%d') + except Exception as e: + logging.error(f"Failed to extract date from validation: {e}") + return None + + +def translator_class_name_to_obj(name: str): + """ + Translates a class name to a class object." + + Parameters: + name (str): The name of the URL translator site. 
+ """ + if not name or not isinstance(name, str): + return None + + translator_classes = { + 'classic': URLTranslatorClassicSite, + 'dataverse': URLTranslatorDataverseSite, + 'opac': URLTranslatorOPACSite, + 'opac_alpha': URLTranslatorOPACAlphaSite, + 'preprints': URLTranslatorPreprintsSite + } + return translator_classes.get(name.lower()) diff --git a/metrics/wagtail_hooks.py b/metrics/wagtail_hooks.py deleted file mode 100644 index c6d027a..0000000 --- a/metrics/wagtail_hooks.py +++ /dev/null @@ -1,108 +0,0 @@ -from django.utils.translation import gettext_lazy as _ -from wagtail.snippets.views.snippets import SnippetViewSet, SnippetViewSetGroup -from wagtail.snippets.models import register_snippet - -from config.menu import get_menu_order -from .models import Item, ItemAccess, UserSession, UserAgent - - -class ItemSnippetViewSet(SnippetViewSet): - model = Item - menu_label = _("Item") - icon = "list-ol" - menu_order = 100 - - list_display = ( - 'collection', - 'journal', - 'article', - ) - list_filter = ( - "collection", - "journal", - ) - search_fields = ( - "journal", - "article", - ) - - -class ItemAccessSnippetViewSet(SnippetViewSet): - model = ItemAccess - menu_label = _("Item Access") - icon = "list-ol" - menu_order = 200 - - list_display = ( - 'item', - 'user_session', - 'country_code', - 'media_language', - 'media_format', - 'content_type', - 'click_timestamps', - ) - list_filter = ( - "item", - "item__collection", - "item__journal", - "country_code", - "media_language", - "media_format", - "content_type", - ) - search_fields = ( - "item", - ) - - -class UserSessionSnippetViewSet(SnippetViewSet): - model = UserSession - menu_label = _("User Session") - icon = "list-ol" - menu_order = 300 - - list_display = ( - 'datetime', - 'user_agent', - 'user_ip', - ) - list_filter = ( - "datetime", - ) - search_fields = ( - "user_agent", - "user_ip", - ) - - -class UserAgentSnippetViewSet(SnippetViewSet): - model = UserAgent - menu_label = _("User Agent") - icon = "list-ol" - menu_order = 400 - - list_display = ( - 'name', - 'version', - ) - search_fields = ( - "name", - ) - - -class MetricsViewSetGroup(SnippetViewSetGroup): - menu_name = 'metrics' - menu_label = _("Metrics") - icon = "folder-open-inverse" - menu_order = get_menu_order("metrics") - - items = ( - ItemAccessSnippetViewSet, - ItemSnippetViewSet, - UserSessionSnippetViewSet, - UserAgentSnippetViewSet, - ) - - -register_snippet(MetricsViewSetGroup) diff --git a/production.yml b/production.yml index 5fb928f..8cd684c 100644 --- a/production.yml +++ b/production.yml @@ -38,17 +38,6 @@ services: links: - postgres - solr: - image: solr:9.3 - restart: always - ports: - - "8983:8983" - volumes: - - ./index:/var/solr - environment: - - SOLR_JAVA_MEM=-Xms512m -Xmx512m - - SOLR_HEAP=512m - postgres: build: context: . 
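The two utility modules introduced above (metrics/utils/index_utils.py and metrics/utils/parser_utils.py) are pure helpers, so their behavior can be sketched in isolation. A minimal usage sketch; the 'usage' prefix and 'scl' collection acronym are illustrative assumptions, not values taken from this diff:

    from metrics.utils import index_utils, parser_utils

    # Yearly, per-collection index name: "<prefix>_<collection>_<year>",
    # where the year is taken from the log date.
    index_name = index_utils.generate_index_name(
        index_prefix='usage',      # assumed prefix
        collection='scl',          # assumed collection acronym
        date='2024-05-17',
    )
    # index_name == 'usage_scl_2024'

    # Translator lookup is case-insensitive on the configured name; unknown
    # names return None.
    translator_cls = parser_utils.translator_class_name_to_obj('OPAC')
    # translator_cls is URLTranslatorOPACSite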
diff --git a/requirements/base.txt b/requirements/base.txt index ad8c8ab..10800d0 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -66,7 +66,7 @@ reverse-geocode==1.6 # https://pypi.org/project/reverse-geocode/ -e git+https://github.com/scieloorg/scielo_log_validator@0.4.0#egg=scielo_log_validator # SciELO Usage COUNTER --e git+https://github.com/scieloorg/scielo_usage_counter@1.5.0#egg=scielo_usage_counter +-e git+https://github.com/scieloorg/scielo_usage_counter@1.5.1#egg=scielo_usage_counter # Device Detector device-detector==0.10 # https://github.com/thinkwelltwd/device_detector diff --git a/tracker/choices.py b/tracker/choices.py index de14077..e2c80e2 100644 --- a/tracker/choices.py +++ b/tracker/choices.py @@ -38,8 +38,8 @@ LOG_FILE_DISCARDED_LINE_REASON = [ (LOG_FILE_DISCARDED_LINE_REASON_MISSING_METADATA, _("Missing Metadata")), - (LOG_FILE_DISCARDED_LINE_REASON_MISSING_ARTICLE, _("Missing Article")), - (LOG_FILE_DISCARDED_LINE_REASON_MISSING_JOURNAL, _("Missing Journal")), + (LOG_FILE_DISCARDED_LINE_REASON_MISSING_ARTICLE, _("Missing PIDv2 or PIDv3 or PID Generic")), + (LOG_FILE_DISCARDED_LINE_REASON_MISSING_JOURNAL, _("Missing ISSN")), (LOG_FILE_DISCARDED_LINE_REASON_URL_TRANSLATION, _("URL Translation")), (LOG_FILE_DISCARDED_LINE_REASON_DATABASE_ERROR, _("Database Error")), ] diff --git a/tracker/migrations/0007_alter_logfilediscardedline_error_type.py b/tracker/migrations/0007_alter_logfilediscardedline_error_type.py new file mode 100644 index 0000000..f9ffebe --- /dev/null +++ b/tracker/migrations/0007_alter_logfilediscardedline_error_type.py @@ -0,0 +1,29 @@ +# Generated by Django 5.0.7 on 2025-08-09 21:04 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("tracker", "0006_alter_logfilediscardedline_error_type"), + ] + + operations = [ + migrations.AlterField( + model_name="logfilediscardedline", + name="error_type", + field=models.CharField( + blank=True, + choices=[ + ("MET", "Missing Metadata"), + ("ART", "Missing PIDv2 or PIDv3 or PID Generic"), + ("JOU", "Missing ISSN"), + ("URL", "URL Translation"), + ("DBE", "Database Error"), + ], + max_length=3, + null=True, + verbose_name="Error Type", + ), + ), + ] diff --git a/tracker/models.py b/tracker/models.py index 07b56ef..77086ee 100644 --- a/tracker/models.py +++ b/tracker/models.py @@ -89,14 +89,15 @@ class LogFileDiscardedLine(CommonControlField): ) @classmethod - def create(cls, log_file, error_type, data, message): + def create(cls, log_file, error_type, data, message, save=False): try: obj = cls() obj.log_file = log_file obj.error_type = error_type obj.data = data obj.message = message - obj.save() + if save: + obj.save() except Exception as exc: raise LogFileDiscardedLineCreateError( f"Unable to create LogFileDiscardedLine ({data} - {error_type} - {message}). EXCEPTION {exc}"
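The save flag added to LogFileDiscardedLine.create lets callers build unsaved instances and persist them in batches with bulk_create, which is how the reworked _process_lines/_process_line pair uses it. A rough sketch of that pairing; parsed_lines, results, utm and log_file are placeholder names assumed for illustration:

    pending = []
    for line in parsed_lines:
        ok, discarded = _process_line(results, line, utm, log_file, track_errors=True)
        if not ok and discarded is not None:
            pending.append(discarded)      # instance built with save=False
        if len(pending) >= 5000:
            LogFileDiscardedLine.objects.bulk_create(pending)
            pending = []
    if pending:
        LogFileDiscardedLine.objects.bulk_create(pending)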