diff --git a/.envs/.local/.django b/.envs/.local/.django index de24db4..168e012 100644 --- a/.envs/.local/.django +++ b/.envs/.local/.django @@ -2,13 +2,16 @@ # ------------------------------------------------------------------------------ USE_DOCKER=yes IPYTHONDIR=/app/.ipython + # Redis # ------------------------------------------------------------------------------ REDIS_URL=redis://redis:6379/0 # Celery # ------------------------------------------------------------------------------ +CELERY_BROKER_URL=redis://redis:6379/0 # Flower +# ------------------------------------------------------------------------------ CELERY_FLOWER_USER=PhFRdLexbrsBvrrbSXxjcMMOcVOavCrZ CELERY_FLOWER_PASSWORD=QgScyefPrYhHgO6onW61u0nazc5xdBuP4sM7jMRrBBFuA2RjsFhZLp7xbVYZbrwR diff --git a/.envs/.local/.postgres b/.envs/.local/.postgres index 94a9fdd..8a29307 100644 --- a/.envs/.local/.postgres +++ b/.envs/.local/.postgres @@ -5,3 +5,5 @@ POSTGRES_PORT=5432 POSTGRES_DB=scielo_usage POSTGRES_USER=GVRFlLmcCNfGLhsFvSnCioYOPJPYpyfj POSTGRES_PASSWORD=BQ4hSUL4rdj5WZLdR8ilDLRQMvCtzo0caMaXDO0olGsmycQjlcZlTVK9DepZR8kk + +DATABASE_URL=postgres://GVRFlLmcCNfGLhsFvSnCioYOPJPYpyfj:BQ4hSUL4rdj5WZLdR8ilDLRQMvCtzo0caMaXDO0olGsmycQjlcZlTVK9DepZR8kk@postgres:5432/scielo_usage diff --git a/.envs/.production/.django b/.envs/.production/.django index 134e2d1..fd9137e 100644 --- a/.envs/.production/.django +++ b/.envs/.production/.django @@ -9,6 +9,7 @@ REDIS_URL=redis://redis:6379/0 # Celery # ------------------------------------------------------------------------------ +CELERY_BROKER_URL=redis://redis:6379/0 # Flower # ------------------------------------------------------------------------------ diff --git a/.envs/.production/.minio b/.envs/.production/.minio deleted file mode 100644 index e721371..0000000 --- a/.envs/.production/.minio +++ /dev/null @@ -1,4 +0,0 @@ -# Minio -# ------------------------------------------------------------------------------ -MINIO_ROOT_USER= -MINIO_ROOT_PASSWORD= \ No newline at end of file diff --git a/.envs/.production/.postgres b/.envs/.production/.postgres index 94a9fdd..8a29307 100644 --- a/.envs/.production/.postgres +++ b/.envs/.production/.postgres @@ -5,3 +5,5 @@ POSTGRES_PORT=5432 POSTGRES_DB=scielo_usage POSTGRES_USER=GVRFlLmcCNfGLhsFvSnCioYOPJPYpyfj POSTGRES_PASSWORD=BQ4hSUL4rdj5WZLdR8ilDLRQMvCtzo0caMaXDO0olGsmycQjlcZlTVK9DepZR8kk + +DATABASE_URL=postgres://GVRFlLmcCNfGLhsFvSnCioYOPJPYpyfj:BQ4hSUL4rdj5WZLdR8ilDLRQMvCtzo0caMaXDO0olGsmycQjlcZlTVK9DepZR8kk@postgres:5432/scielo_usage diff --git a/VERSION b/VERSION index f88cf52..da38e07 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.13.0 \ No newline at end of file +1.13.1 \ No newline at end of file diff --git a/article/tasks.py b/article/tasks.py index 958a336..3514fca 100644 --- a/article/tasks.py +++ b/article/tasks.py @@ -20,7 +20,7 @@ User = get_user_model() -@celery_app.task(bind=True, name=_('Load article data from Article Meta'), timelimit=-1) +@celery_app.task(bind=True, name=_('Load article data from Article Meta'), timelimit=-1, queue='load') def task_load_article_from_article_meta(self, from_date=None, until_date=None, days_to_go_back=None, collection=None, issn=None, force_update=True, user_id=None, username=None): user = _get_user(self.request, username=username, user_id=user_id) @@ -91,7 +91,7 @@ def task_load_article_from_article_meta(self, from_date=None, until_date=None, d return True -@celery_app.task(bind=True, name=_('Load article data from OPAC'), timelimit=-1) +@celery_app.task(bind=True, name=_('Load article data from OPAC'), timelimit=-1, queue='load') def task_load_article_from_opac(self, collection='scl', from_date=None, until_date=None, days_to_go_back=None, page=1, force_update=True, user_id=None, username=None): user = _get_user(self.request, username=username, user_id=user_id) @@ -156,7 +156,7 @@ def task_load_article_from_opac(self, collection='scl', from_date=None, until_da return True -@celery_app.task(bind=True, name=_('Load preprint data from SciELO Preprints'), timelimit=-1) +@celery_app.task(bind=True, name=_('Load preprint data from SciELO Preprints'), timelimit=-1, queue='load') def task_load_preprints_from_preprints_api(self, from_date=None, until_date=None, days_to_go_back=None, force_update=True, user_id=None, username=None): user = _get_user(self.request, username=username, user_id=user_id) @@ -206,7 +206,7 @@ def task_load_preprints_from_preprints_api(self, from_date=None, until_date=None continue -@celery_app.task(bind=True, name=_('Load dataset metadata from Dataverse'), timelimit=-1) +@celery_app.task(bind=True, name=_('Load dataset metadata from Dataverse'), timelimit=-1, queue='load') def task_load_dataset_metadata_from_dataverse(self, from_date=None, until_date=None, days_to_go_back=None, force_update=True, user_id=None, username=None): user = _get_user(self.request, username=username, user_id=user_id) diff --git a/compose/local/django/celery/worker/start b/compose/local/django/celery/worker/start index e89cc9e..7db6f27 100644 --- a/compose/local/django/celery/worker/start +++ b/compose/local/django/celery/worker/start @@ -3,5 +3,22 @@ set -o errexit set -o nounset +# Worker padrão +watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1 -n worker.default@%h & -watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1 +# Worker para load e validação de dados +watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=4 -Q load -n worker.load@%h & + +# Worker para scl +watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1 -Q parse_xlarge -n worker.parse_xlarge@%h & + +# Worker para chl col mex +watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1 -Q parse_large -n worker.parse_large@%h & + +# Worker para cri esp psi prt ven +watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1 -Q parse_medium -n worker.parse_medium@%h & + +# Worker para arg bol cub data ecu per preprints pry rve spa sss sza ury wid +watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1 -Q parse_small -n worker.parse_small@%h & + +wait \ No newline at end of file diff --git a/compose/production/django/celery/worker/start b/compose/production/django/celery/worker/start index 8cbc7d9..4fb112e 100644 --- a/compose/production/django/celery/worker/start +++ b/compose/production/django/celery/worker/start @@ -4,5 +4,22 @@ set -o errexit set -o pipefail set -o nounset +# Worker padrão +celery -A config.celery_app worker -l INFO --concurrency=1 -n worker.default@%h & -exec celery -A config.celery_app worker -l INFO --concurrency=1 +# Worker para load e validação de dados +celery -A config.celery_app worker -l INFO --concurrency=4 -Q load -n worker.load@%h & + +# Worker para parse_scl (coleções extra-grandes) +celery -A config.celery_app worker -l INFO --concurrency=1 -Q parse_xlarge -n worker.parse_xlarge@%h & + +# Worker para chl col mex (coleções grandes) +celery -A config.celery_app worker -l INFO --concurrency=1 -Q parse_large -n worker.parse_large@%h & + +# Worker para cri esp psi prt ven (coleções médias) +celery -A config.celery_app worker -l INFO --concurrency=1 -Q parse_medium -n worker.parse_medium@%h & + +# Worker para arg bol cub data ecu per preprints pry rve spa sss sza ury wid (coleções pequenas) +celery -A config.celery_app worker -l INFO --concurrency=1 -Q parse_small -n worker.parse_small@%h & + +wait \ No newline at end of file diff --git a/config/settings/base.py b/config/settings/base.py index a914ec2..fc00a61 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -411,3 +411,29 @@ ES_API_KEY = env("ES_API_KEY", default="") ES_BASIC_AUTH = env("ES_BASIC_AUTH", default=("elastic", "iHktg66E")) ES_VERIFY_CERTS = env.bool("ES_VERIFY_CERTS", default=False) + +# Collection size categories +# ------------------------------------------------------------------------------ +EXTRA_LARGE_COLLECTIONS = env.list("EXTRA_LARGE_COLLECTIONS", default=["scl"]) +LARGE_COLLECTIONS = env.list("LARGE_COLLECTIONS", default=["chl", "col", "mex"]) +MEDIUM_COLLECTIONS = env.list("MEDIUM_COLLECTIONS", default=["cri", "esp", "psi", "prt", "ven"]) +SMALL_COLLECTIONS = env.list("SMALL_COLLECTIONS", default=["arg", "bol", "cub", "data", "ecu", "per", "preprints", "pry", "rve", "spa", "sss", "sza", "ury", "wid"]) + +# Collection size mapping +def _build_collection_size_map(): + """Build mapping of collection acronyms to their size categories.""" + size_map = {} + size_categories = { + "xlarge": EXTRA_LARGE_COLLECTIONS, + "large": LARGE_COLLECTIONS, + "medium": MEDIUM_COLLECTIONS, + "small": SMALL_COLLECTIONS, + } + + for size, collections in size_categories.items(): + for acron3 in collections: + size_map[acron3] = size + + return size_map + +COLLECTION_ACRON3_SIZE_MAP = _build_collection_size_map() diff --git a/config/settings/local.py b/config/settings/local.py index 3bbf644..e5bcb3c 100644 --- a/config/settings/local.py +++ b/config/settings/local.py @@ -73,3 +73,7 @@ CELERY_TASK_EAGER_PROPAGATES = True # Your stuff... # ------------------------------------------------------------------------------ + +# Redis +# ------------------------------------------------------------------------------ +REDIS_URL = env("REDIS_URL", default="redis://localhost:6379/0") diff --git a/config/settings/production.py b/config/settings/production.py index 0cc417c..373fa25 100644 --- a/config/settings/production.py +++ b/config/settings/production.py @@ -198,3 +198,7 @@ environment=env("SENTRY_ENVIRONMENT", default="production"), traces_sample_rate=env.float("SENTRY_TRACES_SAMPLE_RATE", default=0.0), ) + +# Redis +# ------------------------------------------------------------------------------ +REDIS_URL = env("REDIS_URL", default="redis://localhost:6379/0") diff --git a/journal/tasks.py b/journal/tasks.py index 59479d7..71681cb 100644 --- a/journal/tasks.py +++ b/journal/tasks.py @@ -15,7 +15,7 @@ User = get_user_model() -@celery_app.task(bind=True, name=_('Load journal data from Article Meta')) +@celery_app.task(bind=True, name=_('Load journal data from Article Meta'), queue='load') def task_load_journal_data_from_article_meta(self, collections=[], force_update=True, user_id=None, username=None, mode='thrift'): user = _get_user(user_id, username) diff --git a/log_manager/tasks.py b/log_manager/tasks.py index 054a27a..e14fe92 100644 --- a/log_manager/tasks.py +++ b/log_manager/tasks.py @@ -26,7 +26,7 @@ User = get_user_model() -@celery_app.task(bind=True, name=_('Search for log files')) +@celery_app.task(bind=True, name=_('Search for log files'), queue='load') def task_search_log_files(self, collections=[], from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None): """ Task to search for log files in the directories defined in the CollectionLogDirectory model. @@ -85,7 +85,7 @@ def _add_log_file(user, collection, root, name, visible_dates): ) -@celery_app.task(bind=True, name=_('Validate log files'), timelimit=-1) +@celery_app.task(bind=True, name=_('Validate log files'), timelimit=-1, queue='load') def task_validate_log_files(self, collections=[], from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None, ignore_date=False): """ Task to validate log files in the database. @@ -113,7 +113,7 @@ def task_validate_log_files(self, collections=[], from_date=None, until_date=Non task_validate_log_file.apply_async(args=(log_file.hash, user_id, username)) -@celery_app.task(bind=True, name=_('Validate log file'), timelimit=-1) +@celery_app.task(bind=True, name=_('Validate log file'), timelimit=-1, queue='load') def task_validate_log_file(self, log_file_hash, user_id=None, username=None): """ Task to validate a specific log file. diff --git a/metrics/es.py b/metrics/es.py index f0d42f0..25ad701 100644 --- a/metrics/es.py +++ b/metrics/es.py @@ -28,6 +28,9 @@ "acronym": { "type": "keyword" }, + "publisher": { + "type": "keyword" + } } }, "pid": { diff --git a/metrics/tasks.py b/metrics/tasks.py index b850e8c..026bfb5 100644 --- a/metrics/tasks.py +++ b/metrics/tasks.py @@ -27,6 +27,10 @@ User = get_user_model() +def extract_celery_queue_name(collection_acronym): + return f"parse_{settings.COLLECTION_ACRON3_SIZE_MAP.get(collection_acronym, 'small')}" + + @celery_app.task(bind=True, name=_('Parse logs'), timelimit=-1) def task_parse_logs(self, collections=[], include_logs_with_error=True, batch_size=5000, replace=False, track_errors=False, from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None): """ @@ -69,8 +73,13 @@ def task_parse_logs(self, collections=[], include_logs_with_error=True, batch_si if probably_date < from_date_obj or probably_date > until_date_obj: continue + queue_name = extract_celery_queue_name(collection) + logging.info(f'PARSING file {lf.path}') - task_parse_log.apply_async(args=(lf.hash, batch_size, replace, track_errors, user_id, username)) + task_parse_log.apply_async( + args=(lf.hash, batch_size, replace, track_errors, user_id, username), + queue=queue_name, + ) @celery_app.task(bind=True, name=_('Parse one log'), timelimit=-1) diff --git a/metrics/utils/index_utils.py b/metrics/utils/index_utils.py index c597f4e..76af8c2 100644 --- a/metrics/utils/index_utils.py +++ b/metrics/utils/index_utils.py @@ -143,6 +143,7 @@ def extract_item_access_data(collection_acron3:str, translated_url: dict): 'journal_subject_area_capes': translated_url.get('journal_subject_area_capes'), 'journal_subject_area_wos': translated_url.get('journal_subject_area_wos'), 'journal_acronym': translated_url.get('journal_acronym'), + 'journal_publisher_name': translated_url.get('journal_publisher_name'), } return item_access_data @@ -269,6 +270,7 @@ def update_results_with_item_access_data(results: dict, item_access_data: dict, 'subject_area_capes': item_access_data.get('journal_subject_area_capes'), 'subject_area_wos': item_access_data.get('journal_subject_area_wos'), 'acronym': item_access_data.get('journal_acronym'), + 'publisher_name': item_access_data.get('journal_publisher_name'), }, }