Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .envs/.local/.django
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
# ------------------------------------------------------------------------------
USE_DOCKER=yes
IPYTHONDIR=/app/.ipython

# Redis
# ------------------------------------------------------------------------------
REDIS_URL=redis://redis:6379/0

# Celery
# ------------------------------------------------------------------------------
CELERY_BROKER_URL=redis://redis:6379/0

# Flower
# ------------------------------------------------------------------------------
CELERY_FLOWER_USER=PhFRdLexbrsBvrrbSXxjcMMOcVOavCrZ
CELERY_FLOWER_PASSWORD=QgScyefPrYhHgO6onW61u0nazc5xdBuP4sM7jMRrBBFuA2RjsFhZLp7xbVYZbrwR
2 changes: 2 additions & 0 deletions .envs/.local/.postgres
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ POSTGRES_PORT=5432
POSTGRES_DB=scielo_usage
POSTGRES_USER=GVRFlLmcCNfGLhsFvSnCioYOPJPYpyfj
POSTGRES_PASSWORD=BQ4hSUL4rdj5WZLdR8ilDLRQMvCtzo0caMaXDO0olGsmycQjlcZlTVK9DepZR8kk

DATABASE_URL=postgres://GVRFlLmcCNfGLhsFvSnCioYOPJPYpyfj:BQ4hSUL4rdj5WZLdR8ilDLRQMvCtzo0caMaXDO0olGsmycQjlcZlTVK9DepZR8kk@postgres:5432/scielo_usage
1 change: 1 addition & 0 deletions .envs/.production/.django
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ REDIS_URL=redis://redis:6379/0

# Celery
# ------------------------------------------------------------------------------
CELERY_BROKER_URL=redis://redis:6379/0

# Flower
# ------------------------------------------------------------------------------
Expand Down
4 changes: 0 additions & 4 deletions .envs/.production/.minio

This file was deleted.

2 changes: 2 additions & 0 deletions .envs/.production/.postgres
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ POSTGRES_PORT=5432
POSTGRES_DB=scielo_usage
POSTGRES_USER=GVRFlLmcCNfGLhsFvSnCioYOPJPYpyfj
POSTGRES_PASSWORD=BQ4hSUL4rdj5WZLdR8ilDLRQMvCtzo0caMaXDO0olGsmycQjlcZlTVK9DepZR8kk

DATABASE_URL=postgres://GVRFlLmcCNfGLhsFvSnCioYOPJPYpyfj:BQ4hSUL4rdj5WZLdR8ilDLRQMvCtzo0caMaXDO0olGsmycQjlcZlTVK9DepZR8kk@postgres:5432/scielo_usage
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.13.0
1.13.1
8 changes: 4 additions & 4 deletions article/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

User = get_user_model()

@celery_app.task(bind=True, name=_('Load article data from Article Meta'), timelimit=-1)
@celery_app.task(bind=True, name=_('Load article data from Article Meta'), timelimit=-1, queue='load')
def task_load_article_from_article_meta(self, from_date=None, until_date=None, days_to_go_back=None, collection=None, issn=None, force_update=True, user_id=None, username=None):
user = _get_user(self.request, username=username, user_id=user_id)

Expand Down Expand Up @@ -91,7 +91,7 @@ def task_load_article_from_article_meta(self, from_date=None, until_date=None, d
return True


@celery_app.task(bind=True, name=_('Load article data from OPAC'), timelimit=-1)
@celery_app.task(bind=True, name=_('Load article data from OPAC'), timelimit=-1, queue='load')
def task_load_article_from_opac(self, collection='scl', from_date=None, until_date=None, days_to_go_back=None, page=1, force_update=True, user_id=None, username=None):
user = _get_user(self.request, username=username, user_id=user_id)

Expand Down Expand Up @@ -156,7 +156,7 @@ def task_load_article_from_opac(self, collection='scl', from_date=None, until_da
return True


@celery_app.task(bind=True, name=_('Load preprint data from SciELO Preprints'), timelimit=-1)
@celery_app.task(bind=True, name=_('Load preprint data from SciELO Preprints'), timelimit=-1, queue='load')
def task_load_preprints_from_preprints_api(self, from_date=None, until_date=None, days_to_go_back=None, force_update=True, user_id=None, username=None):
user = _get_user(self.request, username=username, user_id=user_id)

Expand Down Expand Up @@ -206,7 +206,7 @@ def task_load_preprints_from_preprints_api(self, from_date=None, until_date=None
continue


@celery_app.task(bind=True, name=_('Load dataset metadata from Dataverse'), timelimit=-1)
@celery_app.task(bind=True, name=_('Load dataset metadata from Dataverse'), timelimit=-1, queue='load')
def task_load_dataset_metadata_from_dataverse(self, from_date=None, until_date=None, days_to_go_back=None, force_update=True, user_id=None, username=None):
user = _get_user(self.request, username=username, user_id=user_id)

Expand Down
19 changes: 18 additions & 1 deletion compose/local/django/celery/worker/start
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,22 @@
set -o errexit
set -o nounset

# Worker padrão
watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1 -n worker.default@%h &

watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1
# Worker para load e validação de dados
watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=4 -Q load -n worker.load@%h &

# Worker para scl
watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1 -Q parse_xlarge -n worker.parse_xlarge@%h &

# Worker para chl col mex
watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1 -Q parse_large -n worker.parse_large@%h &

# Worker para cri esp psi prt ven
watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1 -Q parse_medium -n worker.parse_medium@%h &

# Worker para arg bol cub data ecu per preprints pry rve spa sss sza ury wid
watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1 -Q parse_small -n worker.parse_small@%h &

wait
19 changes: 18 additions & 1 deletion compose/production/django/celery/worker/start
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,22 @@ set -o errexit
set -o pipefail
set -o nounset

# Worker padrão
celery -A config.celery_app worker -l INFO --concurrency=1 -n worker.default@%h &

exec celery -A config.celery_app worker -l INFO --concurrency=1
# Worker para load e validação de dados
celery -A config.celery_app worker -l INFO --concurrency=4 -Q load -n worker.load@%h &

# Worker para parse_scl (coleções extra-grandes)
celery -A config.celery_app worker -l INFO --concurrency=1 -Q parse_xlarge -n worker.parse_xlarge@%h &

# Worker para chl col mex (coleções grandes)
celery -A config.celery_app worker -l INFO --concurrency=1 -Q parse_large -n worker.parse_large@%h &

# Worker para cri esp psi prt ven (coleções médias)
celery -A config.celery_app worker -l INFO --concurrency=1 -Q parse_medium -n worker.parse_medium@%h &

# Worker para arg bol cub data ecu per preprints pry rve spa sss sza ury wid (coleções pequenas)
celery -A config.celery_app worker -l INFO --concurrency=1 -Q parse_small -n worker.parse_small@%h &

wait
26 changes: 26 additions & 0 deletions config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,29 @@
ES_API_KEY = env("ES_API_KEY", default="")
ES_BASIC_AUTH = env("ES_BASIC_AUTH", default=("elastic", "iHktg66E"))
ES_VERIFY_CERTS = env.bool("ES_VERIFY_CERTS", default=False)

# Collection size categories
# ------------------------------------------------------------------------------
EXTRA_LARGE_COLLECTIONS = env.list("EXTRA_LARGE_COLLECTIONS", default=["scl"])
LARGE_COLLECTIONS = env.list("LARGE_COLLECTIONS", default=["chl", "col", "mex"])
MEDIUM_COLLECTIONS = env.list("MEDIUM_COLLECTIONS", default=["cri", "esp", "psi", "prt", "ven"])
SMALL_COLLECTIONS = env.list("SMALL_COLLECTIONS", default=["arg", "bol", "cub", "data", "ecu", "per", "preprints", "pry", "rve", "spa", "sss", "sza", "ury", "wid"])

# Collection size mapping
def _build_collection_size_map():
"""Build mapping of collection acronyms to their size categories."""
size_map = {}
size_categories = {
"xlarge": EXTRA_LARGE_COLLECTIONS,
"large": LARGE_COLLECTIONS,
"medium": MEDIUM_COLLECTIONS,
"small": SMALL_COLLECTIONS,
}

for size, collections in size_categories.items():
for acron3 in collections:
size_map[acron3] = size

return size_map

COLLECTION_ACRON3_SIZE_MAP = _build_collection_size_map()
4 changes: 4 additions & 0 deletions config/settings/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@
CELERY_TASK_EAGER_PROPAGATES = True
# Your stuff...
# ------------------------------------------------------------------------------

# Redis
# ------------------------------------------------------------------------------
REDIS_URL = env("REDIS_URL", default="redis://localhost:6379/0")
4 changes: 4 additions & 0 deletions config/settings/production.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,7 @@
environment=env("SENTRY_ENVIRONMENT", default="production"),
traces_sample_rate=env.float("SENTRY_TRACES_SAMPLE_RATE", default=0.0),
)

# Redis
# ------------------------------------------------------------------------------
REDIS_URL = env("REDIS_URL", default="redis://localhost:6379/0")
2 changes: 1 addition & 1 deletion journal/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
User = get_user_model()


@celery_app.task(bind=True, name=_('Load journal data from Article Meta'))
@celery_app.task(bind=True, name=_('Load journal data from Article Meta'), queue='load')
def task_load_journal_data_from_article_meta(self, collections=[], force_update=True, user_id=None, username=None, mode='thrift'):
user = _get_user(user_id, username)

Expand Down
6 changes: 3 additions & 3 deletions log_manager/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
User = get_user_model()


@celery_app.task(bind=True, name=_('Search for log files'))
@celery_app.task(bind=True, name=_('Search for log files'), queue='load')
def task_search_log_files(self, collections=[], from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None):
"""
Task to search for log files in the directories defined in the CollectionLogDirectory model.
Expand Down Expand Up @@ -85,7 +85,7 @@ def _add_log_file(user, collection, root, name, visible_dates):
)


@celery_app.task(bind=True, name=_('Validate log files'), timelimit=-1)
@celery_app.task(bind=True, name=_('Validate log files'), timelimit=-1, queue='load')
def task_validate_log_files(self, collections=[], from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None, ignore_date=False):
"""
Task to validate log files in the database.
Expand Down Expand Up @@ -113,7 +113,7 @@ def task_validate_log_files(self, collections=[], from_date=None, until_date=Non
task_validate_log_file.apply_async(args=(log_file.hash, user_id, username))


@celery_app.task(bind=True, name=_('Validate log file'), timelimit=-1)
@celery_app.task(bind=True, name=_('Validate log file'), timelimit=-1, queue='load')
def task_validate_log_file(self, log_file_hash, user_id=None, username=None):
"""
Task to validate a specific log file.
Expand Down
3 changes: 3 additions & 0 deletions metrics/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"acronym": {
"type": "keyword"
},
"publisher": {
"type": "keyword"
}
}
},
"pid": {
Expand Down
11 changes: 10 additions & 1 deletion metrics/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
User = get_user_model()


def extract_celery_queue_name(collection_acronym):
    """Return the Celery parse-queue name for a collection acronym.

    Looks the acronym up in ``settings.COLLECTION_ACRON3_SIZE_MAP``
    (e.g. 'scl' -> 'xlarge') and prefixes it with ``parse_``. Acronyms
    missing from the map fall back to the 'small' queue.
    """
    size = settings.COLLECTION_ACRON3_SIZE_MAP.get(collection_acronym, "small")
    return "parse_" + size


@celery_app.task(bind=True, name=_('Parse logs'), timelimit=-1)
def task_parse_logs(self, collections=[], include_logs_with_error=True, batch_size=5000, replace=False, track_errors=False, from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None):
"""
Expand Down Expand Up @@ -69,8 +73,13 @@ def task_parse_logs(self, collections=[], include_logs_with_error=True, batch_si
if probably_date < from_date_obj or probably_date > until_date_obj:
continue

queue_name = extract_celery_queue_name(collection)

logging.info(f'PARSING file {lf.path}')
task_parse_log.apply_async(args=(lf.hash, batch_size, replace, track_errors, user_id, username))
task_parse_log.apply_async(
args=(lf.hash, batch_size, replace, track_errors, user_id, username),
queue=queue_name,
)


@celery_app.task(bind=True, name=_('Parse one log'), timelimit=-1)
Expand Down
2 changes: 2 additions & 0 deletions metrics/utils/index_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def extract_item_access_data(collection_acron3:str, translated_url: dict):
'journal_subject_area_capes': translated_url.get('journal_subject_area_capes'),
'journal_subject_area_wos': translated_url.get('journal_subject_area_wos'),
'journal_acronym': translated_url.get('journal_acronym'),
'journal_publisher_name': translated_url.get('journal_publisher_name'),
}

return item_access_data
Expand Down Expand Up @@ -269,6 +270,7 @@ def update_results_with_item_access_data(results: dict, item_access_data: dict,
'subject_area_capes': item_access_data.get('journal_subject_area_capes'),
'subject_area_wos': item_access_data.get('journal_subject_area_wos'),
'acronym': item_access_data.get('journal_acronym'),
'publisher_name': item_access_data.get('journal_publisher_name'),
},
}

Expand Down