diff --git a/.env.example b/.env.example index d5ac9cf..b72c89a 100644 --- a/.env.example +++ b/.env.example @@ -1,5 +1,10 @@ # Django SECRET_KEY=# +DEBUG=# + +# redis +CELERY_BROKER_IP=# +REDIS_HOST=# # PostgreSQL DB_NAME=postgres diff --git a/core/__init__.py b/core/__init__.py index 51982bb..3fdf743 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -16,4 +16,4 @@ from .celery import app as celery_app -__all__ = ["celery_app"] +__all__ = ("celery_app",) diff --git a/core/settings.py b/core/settings.py index 9def112..dd025fb 100644 --- a/core/settings.py +++ b/core/settings.py @@ -13,6 +13,7 @@ import os from pathlib import Path +from celery.schedules import timedelta from dotenv import load_dotenv load_dotenv() @@ -30,6 +31,7 @@ # SECURITY WARNING: don't run with debug turned on in production! DEBUG = os.environ.get("DEBUG") == "True" + ALLOWED_HOSTS = ( [ os.environ.get("DOMAIN", "*"), @@ -152,13 +154,17 @@ # Celery settings - CELERY_BROKER_URL = os.environ.get("REDIS_URL") CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL") CELERY_ACCEPT_CONTENT = ["application/json"] CELERY_TASK_SERIALIZER = "json" CELERY_RESULT_SERIALIZER = "json" +CELERY_BEAT_SCHEDULE = { + "send_orders_task": { + "task": "tracker.tasks.check_for_new_issues", + "schedule": timedelta(seconds=10), + } +} # Custom app settings - DEFAULT_SCHEDULE_INTERVAL = 3600 diff --git a/tracker/migrations/0002_telegramuser_is_allowed_notification.py b/tracker/migrations/0002_telegramuser_is_allowed_notification.py new file mode 100644 index 0000000..70919ed --- /dev/null +++ b/tracker/migrations/0002_telegramuser_is_allowed_notification.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.3 on 2024-11-21 19:32 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("tracker", "0001_initial"), + ] + + operations = [ + migrations.AddField( + model_name="telegramuser", + name="is_allowed_notification", + field=models.BooleanField(default=False), + ), + ] diff --git a/tracker/models.py b/tracker/models.py index 6e68fda..6682ff4 100644 --- a/tracker/models.py +++ b/tracker/models.py @@ -223,6 +223,7 @@ class TelegramUser(AbstractModel): user = models.OneToOneField(CustomUser, on_delete=models.CASCADE) telegram_id = models.CharField(unique=True) + is_allowed_notification = models.BooleanField(default=False) def __str__(self) -> str: """ diff --git a/tracker/tasks.py b/tracker/tasks.py index 83ac928..6a2ad42 100644 --- a/tracker/tasks.py +++ b/tracker/tasks.py @@ -1,12 +1,74 @@ """ A `tracker.tasks` module that contains all celery tasks. """ + +import json +import logging +import os + +import redis from asgiref.sync import async_to_sync from celery import shared_task +from dotenv import load_dotenv + +from tracker.models import Repository, TelegramUser +from tracker.telegram.bot import send_new_issue_notification, send_revision_messages +from tracker.utils import ( + compare_two_repo_dicts, + get_existing_issues_for_subscribed_users, + get_user_revisions, +) + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +load_dotenv() +# Connect to Redis + + +@shared_task() +def get_relevant_recipients() -> None: + """ + Retrieves a mapping of Telegram users subscribed for + new-issue-notifications to the repositories they are subscribed to. + + :return: A dictionary where keys are Telegram user IDs, and values are lists of subscribed repository names. + """ + subscribed_users = ( + TelegramUser.objects.filter(notify_about_new_issues=True).first().user + ) + repositories = Repository.objects.filter(user__in=subscribed_users).values( + "author", "name" + ) + existing_issues = get_existing_issues_for_subscribed_users(repositories) + + cache = redis.Redis( + host=os.environ.get("REDIS_HOST"), port=6379, decode_responses=True + ) + if not cache.exists("task_first_run_flag"): + cache.set("existing:issues", json.dumps(existing_issues)) + return + + cached_existing_issues = cache.get("existing:issues") + cached_existing_issues = json.loads(str(cached_existing_issues)) + new_issues = compare_two_repo_dicts(existing_issues, cached_existing_issues) + repos_with_new_issues = [key for key in new_issues] + + user_repo_map = {} + for telegram_user in subscribed_users: + repos = Repository.objects.filter( + user=telegram_user.user, name__in=repos_with_new_issues + ) + + logger.info(f"Telegram User: {telegram_user.telegram_id}") + for repo in repos: + if telegram_user.telegram_id not in user_repo_map: + user_repo_map[telegram_user.telegram_id] = [] + + user_repo_map[telegram_user.telegram_id].append(repo.name) -from .models import TelegramUser -from .telegram.bot import send_revision_messages -from .utils import get_user_revisions + async_to_sync(send_new_issue_notification)(user_repo_map, new_issues) @shared_task diff --git a/tracker/telegram/bot.py b/tracker/telegram/bot.py index fdb0a42..b032ea7 100644 --- a/tracker/telegram/bot.py +++ b/tracker/telegram/bot.py @@ -9,19 +9,20 @@ from aiogram import Bot, Dispatcher, F from aiogram.client.default import DefaultBotProperties -from aiogram.filters import CommandObject, CommandStart +from aiogram.filters import Command, CommandObject, CommandStart from aiogram.types.message import Message from aiogram.utils.deep_linking import create_start_link from aiogram.utils.keyboard import ReplyKeyboardBuilder, ReplyKeyboardMarkup from dotenv import load_dotenv from tracker import ISSUES_URL, PULLS_URL, get_issues_without_pull_requests +from tracker.models import TelegramUser from tracker.telegram.templates import TEMPLATES from tracker.utils import ( attach_link_to_issue, create_telegram_user, get_all_available_issues, - get_all_repostitories, + get_all_repositories, get_contributor_issues, get_repository_support, get_support_link, @@ -80,6 +81,32 @@ async def start_message(message: Message) -> None: ) +@dp.message(Command("notify_about_new_issues")) +async def subscribe_to_issue_notifications(msg: Message): + """ + + Updates telegram user subscription status, and responds with the new subscription status. + + :param msg: Message instance used to retrieve telegram id. + :return: None + """ + try: + telegram_user = TelegramUser.objects.filter( + telegram_id=msg.from_user.id + ).first() + if not telegram_user: + await msg.answer(f"Telegram user with ID {msg.from_user.id} not found.") + return + + telegram_user.is_subscribed = not telegram_user.is_subscribed + telegram_user.save(update_fields=["is_subscribed"]) + status = "subscribed" if telegram_user.is_subscribed else "unsubscribed" + await msg.answer(f'Your status was successfully changed to "{status}"') + + except Exception as e: + logger.info(f"During the execution, unexpected error occurred: {e}") + + @dp.message(F.text == "đź““get missed deadlinesđź““") async def send_deprecated_issue_assignees(msg: Message) -> None: """ @@ -87,7 +114,7 @@ async def send_deprecated_issue_assignees(msg: Message) -> None: :param msg: Message instance for communication with a user :return: None """ - all_repositories = await get_all_repostitories(msg.from_user.id) + all_repositories = await get_all_repositories(msg.from_user.id) for repository in all_repositories: @@ -130,7 +157,7 @@ async def send_available_issues(msg: Message) -> None: :param msg: Message instance for communication with a user :return: None """ - all_repositories = await get_all_repostitories(msg.from_user.id) + all_repositories = await get_all_repositories(msg.from_user.id) for repository in all_repositories: repo_message = TEMPLATES.repo_header.substitute( @@ -156,7 +183,19 @@ async def send_available_issues(msg: Message) -> None: message = repo_message + issue_messages - await msg.reply(message, parse_mode="HTML") + await msg.reply(message) + + +async def send_new_issue_notification( + id_to_repos_map: dict[str, list], repo_to_issues_map: dict[str, list] +): + for tg_id, repos in id_to_repos_map.values(): + for repo in repos: + message = f"There are new issues in {repo}!\n" + repo_issues = repo_to_issues_map[repo] + for issue in repo_issues: + message += f"
{issue}
" + await bot.send_message(tg_id, message) @dp.message(F.text.contains("/issues ")) @@ -215,7 +254,7 @@ async def send_support_contacts(msg: Message) -> None: :param msg: Message instance for communication with a user :return: None """ - all_repositories = await get_all_repostitories(msg.from_user.id) + all_repositories = await get_all_repositories(msg.from_user.id) for repository in all_repositories: repo_message = TEMPLATES.repo_header.substitute( diff --git a/tracker/utils.py b/tracker/utils.py index 67d78fa..e81d71e 100644 --- a/tracker/utils.py +++ b/tracker/utils.py @@ -16,6 +16,7 @@ DATETIME_FORMAT, HEADERS, ISSUES_SEARCH, + ISSUES_URL, PULLS_REVIEWS_URL, PULLS_URL, SECONDS_IN_AN_HOUR, @@ -38,7 +39,7 @@ def escape_html(text: str) -> str: @sync_to_async -def get_all_repostitories(tele_id: str) -> list[dict]: +def get_all_repositories(tele_id: str) -> list[dict]: """ A function that returns a list of repositories asyncronously. :param tele_id: str @@ -272,9 +273,8 @@ def get_pull_reviews(url: str) -> list[dict]: try: response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT) response.raise_for_status() + return response.json() - if response.ok: - return response.json() except requests.exceptions.RequestException as e: logger.info(e) return [] @@ -286,7 +286,7 @@ def get_user_revisions(telegram_id: str) -> list[dict]: :params tele_id: The TelegramUser id of the user :return: A list of reviews for all the user repos open PRS """ - repos = async_to_sync(get_all_repostitories)(telegram_id) + repos = async_to_sync(get_all_repositories)(telegram_id) reviews_list = [] for repo in repos: pulls = get_all_open_pull_requests( @@ -302,6 +302,7 @@ def get_user_revisions(telegram_id: str) -> list[dict]: pull_number=pull["number"], ) ) + if reviews_data: return_data["reviews"] = reviews_data reviews_list.append(return_data.copy()) @@ -342,6 +343,69 @@ def get_contributor_issues( return [] +def get_all_opened_issues(url: str) -> list[dict]: + """ + Retrieves all opened issues from the given URL. + :param url: An API endpoint for issues. + :return: A list of dictionaries representing opened issues. + """ + try: + response = requests.get(url, headers=HEADERS) + response.raise_for_status() + issues = response.json() + + opened_issues = list( + filter( + lambda issue: issue.get("state") == "open" + and not issue.get("draft") + and not issue.get("pull_request"), + issues, + ) + ) + + return opened_issues + except requests.exceptions.RequestException as e: + logger.info(e) + return [] + + +def get_existing_issues_for_subscribed_users( + repositories: list[dict], +) -> dict[str, list[str]]: + """ + Retrieves open issues for a given list of repositories. + + :param repositories: List of repositories with "author" and "name". + :return: Dictionary with repository names as keys and lists of open issue titles as values. + """ + + repository_data = {} + for repository in repositories: + issues = get_all_opened_issues( + ISSUES_URL.format( + owner=repository.get("author", str()), + repo=repository.get("name", str()), + ) + ) + repository_data[repository.get("name", str())] = [ + issue.get("title") for issue in issues + ] + return repository_data + + +def compare_two_repo_dicts( + dict1: dict[str, list[str]], dict2: dict[str, list[str]] +) -> dict[str, list[str]]: + diff = {} + for key in dict1: + len_1 = len(dict1[key]) + len_2 = len(dict2[key]) + if len_1 > len_2: + new_issues = len_1 - len_2 + diff.update({key: dict1[key][:new_issues]}) + return diff + + def attach_link_to_issue(issue: dict) -> str: """ Attaches the issue link to the issue title