diff --git a/scripts/populate_notification_subscriptions.py b/scripts/populate_notification_subscriptions.py deleted file mode 100644 index 557b9f2a47d..00000000000 --- a/scripts/populate_notification_subscriptions.py +++ /dev/null @@ -1,113 +0,0 @@ -import django -django.setup() - -from website.app import init_app -init_app(routes=False) - -from framework.celery_tasks import app as celery_app -from django.contrib.contenttypes.models import ContentType -from django.db.models import Count, F, OuterRef, Subquery, IntegerField, CharField -from django.db.models.functions import Cast -from osf.models import OSFUser, Node, NotificationSubscription, NotificationType - - -@celery_app.task(name='scripts.populate_notification_subscriptions') -def populate_notification_subscriptions(): - created = 0 - user_file_nt = NotificationType.Type.USER_FILE_UPDATED.instance - review_nt = NotificationType.Type.REVIEWS_SUBMISSION_STATUS.instance - node_file_nt = NotificationType.Type.NODE_FILE_UPDATED.instance - - user_ct = ContentType.objects.get_for_model(OSFUser) - node_ct = ContentType.objects.get_for_model(Node) - - reviews_qs = OSFUser.objects.exclude(subscriptions__notification_type__name=NotificationType.Type.REVIEWS_SUBMISSION_STATUS).distinct('id') - files_qs = OSFUser.objects.exclude(subscriptions__notification_type__name=NotificationType.Type.USER_FILE_UPDATED).distinct('id') - - node_notifications_sq = ( - NotificationSubscription.objects.filter( - content_type=node_ct, - notification_type=node_file_nt, - object_id=Cast(OuterRef('pk'), CharField()), - ).values( - 'object_id' - ).annotate( - cnt=Count('id') - ).values('cnt')[:1] - ) - - nodes_qs = ( - Node.objects - .annotate( - contributors_count=Count('_contributors', distinct=True), - notifications_count=Subquery( - node_notifications_sq, - output_field=IntegerField(), - ), - ).exclude(contributors_count=F('notifications_count')) - ) - - print(f"Creating REVIEWS_SUBMISSION_STATUS subscriptions for {reviews_qs.count()} users.") - for id, user in enumerate(reviews_qs, 1): - print(f"Processing user {id} / {reviews_qs.count()}") - try: - _, is_created = NotificationSubscription.objects.get_or_create( - notification_type=review_nt, - user=user, - content_type=user_ct, - object_id=user.id, - defaults={ - 'message_frequency': 'none', - }, - ) - if is_created: - created += 1 - except Exception as exeption: - print(exeption) - continue - - print(f"Creating USER_FILE_UPDATED subscriptions for {files_qs.count()} users.") - for id, user in enumerate(files_qs, 1): - print(f"Processing user {id} / {files_qs.count()}") - try: - _, is_created = NotificationSubscription.objects.get_or_create( - notification_type=user_file_nt, - user=user, - content_type=user_ct, - object_id=user.id, - defaults={ - '_is_digest': True, - 'message_frequency': 'none', - }, - ) - if is_created: - created += 1 - except Exception as exeption: - print(exeption) - continue - - print(f"Creating NODE_FILE_UPDATED subscriptions for {nodes_qs.count()} nodes.") - for id, node in enumerate(nodes_qs, 1): - print(f"Processing node {id} / {nodes_qs.count()}") - for contributor in node.contributors.all(): - try: - _, is_created = NotificationSubscription.objects.get_or_create( - notification_type=node_file_nt, - user=contributor, - content_type=node_ct, - object_id=node.id, - defaults={ - '_is_digest': True, - 'message_frequency': 'none', - }, - ) - if is_created: - created += 1 - except Exception as exeption: - print(exeption) - continue - - print(f"Created {created} subscriptions") - -if __name__ == '__main__': - populate_notification_subscriptions.delay() diff --git a/scripts/populate_notification_subscriptions_node_file_updated.py b/scripts/populate_notification_subscriptions_node_file_updated.py new file mode 100644 index 00000000000..0eb2549beab --- /dev/null +++ b/scripts/populate_notification_subscriptions_node_file_updated.py @@ -0,0 +1,124 @@ +import django +django.setup() + +from website.app import init_app +init_app(routes=False) + +from datetime import datetime +from framework.celery_tasks import app as celery_app +from django.contrib.contenttypes.models import ContentType +from django.db.models import Count, F, OuterRef, Subquery, IntegerField, CharField +from django.db.models.functions import Cast, Coalesce +from osf.models import Node, NotificationSubscription, NotificationType + + +@celery_app.task(name='scripts.populate_nodes_notification_subscriptions') +def populate_nodes_notification_subscriptions(): + print('---Starting NODE_FILE_UPDATED subscriptions population script----') + global_start = datetime.now() + + batch_size = 1000 + node_file_nt = NotificationType.Type.NODE_FILE_UPDATED + + node_ct = ContentType.objects.get_for_model(Node) + + node_notifications_sq = ( + NotificationSubscription.objects.filter( + content_type=node_ct, + notification_type=node_file_nt.instance, + object_id=Cast(OuterRef('pk'), CharField()), + ).values( + 'object_id' + ).annotate( + cnt=Count('id') + ).values('cnt')[:1] + ) + + nodes_qs = ( + Node.objects + .annotate( + contributors_count=Count('_contributors', distinct=True), + notifications_count=Coalesce( + Subquery( + node_notifications_sq, + output_field=IntegerField(), + ), + 0 + ), + ).exclude(contributors_count=F('notifications_count')) + ).iterator(chunk_size=batch_size) + + items_to_create = [] + total_created = 0 + batch_start = datetime.now() + count_nodes = 0 + count_contributors = 0 + for node in nodes_qs: + count_nodes += 1 + for contributor in node.contributors.all(): + count_contributors += 1 + items_to_create.append( + NotificationSubscription( + notification_type=node_file_nt.instance, + user=contributor, + content_type=node_ct, + object_id=node.id, + _is_digest=True, + message_frequency='none', + ) + ) + if len(items_to_create) >= batch_size: + print(f'Creating batch of {len(items_to_create)} subscriptions...') + try: + NotificationSubscription.objects.bulk_create( + items_to_create, + batch_size=batch_size, + ignore_conflicts=True, + ) + total_created += len(items_to_create) + items_to_create = [] + except Exception as exeption: + print(f"Error during bulk_create: {exeption}") + continue + finally: + items_to_create.clear() + batch_end = datetime.now() + print(f'Batch took {batch_end - batch_start}') + + if count_contributors % batch_size == 0: + print(f'Processed {count_nodes} nodes with {count_contributors} contributors, created {total_created} subscriptions') + + if items_to_create: + print(f'Creating final batch of {len(items_to_create)} subscriptions...') + try: + NotificationSubscription.objects.bulk_create( + items_to_create, + batch_size=batch_size, + ignore_conflicts=True, + ) + total_created += len(items_to_create) + except Exception as exeption: + print(f"Error during bulk_create: {exeption}") + + global_end = datetime.now() + print(f'Total time for NODE_FILE_UPDATED subscription population: {global_end - global_start}') + print(f'Created {total_created} subscriptions.') + print('----Creation finished----') + +@celery_app.task(name='scripts.update_nodes_notification_subscriptions') +def update_nodes_notification_subscriptions(): + print('---Starting NODE_FILE_UPDATED subscriptions update script----') + + node_file_nt = NotificationType.Type.NODE_FILE_UPDATED + + updated_start = datetime.now() + updated = ( + NotificationSubscription.objects.filter( + notification_type__name=node_file_nt, + _is_digest=False, + ) + .update(_is_digest=True) + ) + updated_end = datetime.now() + print(f'Updated {updated} subscriptions. Took time: {updated_end - updated_start}') + print('Update finished.') \ No newline at end of file diff --git a/scripts/populate_notification_subscriptions_user_global_file_updated.py b/scripts/populate_notification_subscriptions_user_global_file_updated.py new file mode 100644 index 00000000000..fa6657160cc --- /dev/null +++ b/scripts/populate_notification_subscriptions_user_global_file_updated.py @@ -0,0 +1,99 @@ +import django +django.setup() + +from website.app import init_app +init_app(routes=False) + +from datetime import datetime +from framework.celery_tasks import app as celery_app +from django.contrib.contenttypes.models import ContentType +from osf.models import OSFUser, NotificationSubscription, NotificationType + + +@celery_app.task(name='scripts.populate_notification_subscriptions_user_global_file_updated') +def populate_notification_subscriptions_user_global_file_updated(): + print('---Starting USER_FILE_UPDATED subscriptions population script----') + global_start = datetime.now() + + batch_size = 1000 + user_file_updated_nt = NotificationType.Type.USER_FILE_UPDATED + + user_ct = ContentType.objects.get_for_model(OSFUser) + user_qs = (OSFUser.objects + .exclude(subscriptions__notification_type__name=user_file_updated_nt) + .distinct() + .order_by('id') + .iterator(chunk_size=batch_size) + ) + + items_to_create = [] + total_created = 0 + + batch_start = datetime.now() + for count, user in enumerate(user_qs, 1): + items_to_create.append( + NotificationSubscription( + notification_type=user_file_updated_nt.instance, + user=user, + content_type=user_ct, + object_id=user.id, + _is_digest=True, + message_frequency='none', + ) + ) + if len(items_to_create) >= batch_size: + print(f'Creating batch of {len(items_to_create)} subscriptions...') + try: + NotificationSubscription.objects.bulk_create( + items_to_create, + batch_size=batch_size, + ignore_conflicts=True, + ) + total_created += len(items_to_create) + except Exception as e: + print(f'Error during bulk_create: {e}') + finally: + items_to_create.clear() + batch_end = datetime.now() + print(f'Batch took {batch_end - batch_start}') + + if count % batch_size == 0: + print(f'Processed {count}, created {total_created}') + batch_start = datetime.now() + + if items_to_create: + print(f'Creating final batch of {len(items_to_create)} subscriptions...') + try: + NotificationSubscription.objects.bulk_create( + items_to_create, + batch_size=batch_size, + ignore_conflicts=True, + ) + total_created += len(items_to_create) + except Exception as e: + print(f'Error during bulk_create: {e}') + + global_end = datetime.now() + print(f'Total time for USER_FILE_UPDATED subscription population: {global_end - global_start}') + print(f'Created {total_created} subscriptions.') + print('----Creation finished----') + +@celery_app.task(name='scripts.update_notification_subscriptions_user_global_file_updated') +def update_notification_subscriptions_user_global_file_updated(): + print('---Starting USER_FILE_UPDATED subscriptions updating script----') + + user_file_updated_nt = NotificationType.Type.USER_FILE_UPDATED + + update_start = datetime.now() + updated = ( + NotificationSubscription.objects + .filter( + notification_type__name=user_file_updated_nt, + _is_digest=False, + ) + .update(_is_digest=True) + ) + update_end = datetime.now() + + print(f'Updated {updated} subscriptions. Took time: {update_end - update_start}') + print('Update finished.') \ No newline at end of file diff --git a/scripts/populate_notification_subscriptions_user_global_reviews.py b/scripts/populate_notification_subscriptions_user_global_reviews.py new file mode 100644 index 00000000000..16303941ad2 --- /dev/null +++ b/scripts/populate_notification_subscriptions_user_global_reviews.py @@ -0,0 +1,95 @@ +import django +django.setup() + +from website.app import init_app +init_app(routes=False) + +from datetime import datetime +from framework.celery_tasks import app as celery_app +from django.contrib.contenttypes.models import ContentType +from osf.models import OSFUser, NotificationSubscription, NotificationType + + +@celery_app.task(name='scripts.populate_reviews_notification_subscriptions') +def populate_reviews_notification_subscriptions(): + print('---Starting REVIEWS_SUBMISSION_STATUS subscriptions population script----') + global_start = datetime.now() + + batch_size = 1000 + review_nt = NotificationType.Type.REVIEWS_SUBMISSION_STATUS + + user_ct = ContentType.objects.get_for_model(OSFUser) + + user_qs = OSFUser.objects.exclude( + subscriptions__notification_type__name=NotificationType.Type.REVIEWS_SUBMISSION_STATUS.instance + ).distinct('id') + + items_to_create = [] + total_created = 0 + batch_start = datetime.now() + for count, user in enumerate(user_qs, 1): + items_to_create.append( + NotificationSubscription( + notification_type=review_nt.instance, + user=user, + content_type=user_ct, + object_id=user.id, + _is_digest=True, + message_frequency='none', + ) + ) + if len(items_to_create) >= batch_size: + print(f'Creating batch of {len(items_to_create)} subscriptions...') + try: + NotificationSubscription.objects.bulk_create( + items_to_create, + batch_size=batch_size, + ignore_conflicts=True, + ) + total_created += len(items_to_create) + except Exception as e: + print(f'Error during bulk_create: {e}') + finally: + items_to_create.clear() + batch_end = datetime.now() + print(f'Batch took {batch_end - batch_start}') + + if count % batch_size == 0: + print(f'Processed {count}, created {total_created}') + batch_start = datetime.now() + + if items_to_create: + print(f'Creating final batch of {len(items_to_create)} subscriptions...') + try: + NotificationSubscription.objects.bulk_create( + items_to_create, + batch_size=batch_size, + ignore_conflicts=True, + ) + total_created += len(items_to_create) + except Exception as e: + print(f'Error during bulk_create: {e}') + + global_end = datetime.now() + print(f'Total time for REVIEWS_SUBMISSION_STATUS subscription population: {global_end - global_start}') + print(f'Created {total_created} subscriptions.') + print('----Creation finished----') + +@celery_app.task(name='scripts.update_reviews_notification_subscriptions') +def update_reviews_notification_subscriptions(): + print('---Starting REVIEWS_SUBMISSION_STATUS subscriptions updating script----') + + review_nt = NotificationType.Type.REVIEWS_SUBMISSION_STATUS + + updated_start = datetime.now() + updated = ( + NotificationSubscription.objects.filter( + notification_type__name=review_nt, + _is_digest=False, + ) + .update(_is_digest=True) + ) + updated_end = datetime.now() + + print(f'Updated {updated} subscriptions. Took time: {updated_end - updated_start}') + print('Update finished.')