Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 9 additions & 189 deletions bot/bot.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
import os
import logging
import importlib
from pathlib import Path
from .core import Core
from pyrogram import Client
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing import Generator, Optional
from pyrogram.handlers.handler import Handler
from config import Config, DataBase, PluginDatabase
from config import Config, DataBase


class BotMeta(type):
Expand All @@ -17,188 +10,15 @@ def __call__(cls, *args, **kwargs):
return instance


class Bot(Client, metaclass=BotMeta):
class Bot(Core, Client, metaclass=BotMeta):
def _post_init(self):
self.builtin_plugin = "bot/builtin_plugins"
DataBase.metadata.create_all(Config.engine)
self.load_plugins(folder=self.builtin_plugin)
DataBase.metadata.create_all(Config.engine)

def modules_list(
self, folder: Optional[str | list[str]] = None
) -> list[Path]:
modules = []

folders = (
folder
if isinstance(folder, list)
else [folder or self.plugins["root"]]
)

for f in folders:
for root, _, files in os.walk(
f.replace(".", "/"), followlinks=True
):
for file in files:
if not file.endswith(".py"):
continue
path = Path(root) / file
modules.append(path)

return sorted(modules)

def get_plugins(
self, folder: Optional[str | list[str]] = None
) -> list[str] | list[Path]:
plugins = []

folders = (
folder
if isinstance(folder, list)
else [folder or self.plugins["root"]]
)

for path in self.modules_list(folders):
module_path = ".".join(path.with_suffix("").parts)
module = importlib.import_module(module_path)
if getattr(module, "__plugin__", False):
plugins.append(path.stem)

return sorted(plugins)

def get_handlers(
self,
plugins: Optional[str | list[str]] = None,
folder: Optional[str | list[str]] = None,
) -> Generator[tuple[str, str] | tuple[Handler, int], None, None]:
if isinstance(plugins, str):
plugins = plugins.split(",")

group_offset = 0 if folder == self.builtin_plugin else 1
_plugins = self.get_plugins(folder=folder)

if plugins:
for plugin in plugins:
if plugin not in _plugins:
yield (plugin, "Plugin not found")

for path in self.modules_list(folder=folder):
if plugins and path.stem not in plugins:
for base_class in reversed(self.__class__.__mro__):
if base_class is self.__class__ or base_class is object:
continue

module_path = ".".join(path.parent.parts + (path.stem,))
module = importlib.import_module(module_path)
# TODO: reload the module after import

for name in vars(module).keys():
target_attr = getattr(module, name)
if hasattr(target_attr, "handlers"):
for handler, group in target_attr.handlers:
if isinstance(handler, Handler) and isinstance(
group, int
):
if group < 0 and group_offset != 0:
yield (handler, 0)
else:
yield (handler, group + group_offset)

def handler_is_loaded(self, handler: Handler, group: int = 0) -> bool:
if group not in self.dispatcher.groups:
return False
return handler in self.dispatcher.groups[group]

def set_plugin_status(self, plugin: str, enabled: bool = True):
with Session(Config.engine) as session:
session.merge(PluginDatabase(name=plugin, enabled=enabled))
session.commit()

def get_plugin_status(self, plugin: str) -> bool:
with Session(Config.engine) as session:
enabled = session.execute(
select(PluginDatabase.enabled).where(
PluginDatabase.name == plugin
)
).scalar()
return enabled or False

def load_plugins(
self,
plugins: Optional[str | list[str]] = None,
folder: Optional[str | list[str]] = None,
force_load: bool = False,
) -> dict[str, str]:
result = {}
if isinstance(plugins, str):
plugins = plugins.split(",")

_plugins = self.get_plugins(folder=folder)
plugins = plugins or _plugins

for plugin in plugins:
if plugin in _plugins:
with Session(Config.engine) as session:
if (
session.execute(
select(PluginDatabase.enabled).where(
PluginDatabase.name == plugin
)
).scalar()
is False
and not force_load
):
plugins.remove(plugin)
else:
self.set_plugin_status(plugin, True)

for handler in self.get_handlers(plugins, folder=folder):
if isinstance(handler[0], str):
result[handler[0]] = handler[1]
logging.warning(handler[1])
else:
callback_name = handler[0].callback.__name__
if not self.handler_is_loaded(*handler):
self.add_handler(*handler)
result[callback_name] = "Handler loaded"
logging.info(f"{callback_name} handler has been loaded")
else:
result[callback_name] = "Failed to load handler"
logging.warning(
f"Failed to load {callback_name} handler, "
"because it is already loaded"
)
DataBase.metadata.create_all(Config.engine)
return result

def unload_plugins(
self,
plugins: Optional[str | list[str]] = None,
folder: Optional[str | list[str]] = None,
):
result = {}
if isinstance(plugins, str):
plugins = plugins.split(",")

_plugins = self.get_plugins(folder=folder)
plugins = plugins or _plugins

for plugin in plugins:
if plugin in _plugins:
self.set_plugin_status(plugin, False)

for handler in self.get_handlers(plugins, folder=folder):
if isinstance(handler[0], str):
result[handler[0]] = handler[1]
logging.warning(handler[1])
else:
callback_name = handler[0].callback.__name__
if self.handler_is_loaded(*handler):
self.remove_handler(*handler)
result[callback_name] = "Handler unloaded"
logging.info(f"{callback_name} handler has been unloaded")
else:
result[callback_name] = "Failed to unload handler"
logging.warning(
f"Failed to unload {callback_name} handler, "
"it is not loaded already."
)
return result
if hasattr(base_class, "_post_init"):
_post_init = getattr(base_class, "_post_init")
if callable(_post_init):
_post_init(self)
118 changes: 95 additions & 23 deletions bot/builtin_plugins/manager.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,85 @@
from bot import Bot
from typing import Optional
from pyrogram import filters
from pyrogram.types import (
Message,
InlineKeyboardMarkup,
InlineKeyboardButton,
CallbackQuery,
)
from sqlalchemy import select
from sqlalchemy.orm import Session

from config import Config
from config import Config, PluginDatabase


def plugins_keyboard(app: Bot):
plugins = app.get_plugins()
keyboard = [
[
InlineKeyboardButton(
plugin.replace("-", " ").replace("_", " "), f"plugins {plugin}"
),
InlineKeyboardButton(
{True: "✅", False: "❌"}[app.get_plugin_status(plugin)],
f"plugins {plugin}",
),
def plugins_keyboard(client: Bot, plugin: Optional[str] = None):

with Session(Config.engine) as session:
if plugin:
enabled, public = session.execute(
select(
PluginDatabase.enabled,
PluginDatabase.is_public_use,
).where(PluginDatabase.name == plugin)
).one()

return [
[
InlineKeyboardButton(
"Status", f"plugins {plugin} status2"
),
InlineKeyboardButton(
{True: "✅", False: "❌"}[enabled],
f"plugins {plugin} status2",
),
],
[
InlineKeyboardButton(
"Public Use", f"plugins {plugin} public"
),
InlineKeyboardButton(
{True: "✅", False: "❌"}[public],
f"plugins {plugin} public",
),
],
[
InlineKeyboardButton("Back", "plugins"),
],
]

keyboard = [[InlineKeyboardButton("No were plugin found.", "None")]]
_plugins = client.get_plugins()
plugins = session.execute(
select(
PluginDatabase.name,
PluginDatabase.enabled,
PluginDatabase.is_public_use,
)
).all()
plugins: dict[str, list[bool]] = {
plugin[0]: plugin[1] for plugin in plugins
}

if len(plugins) == 0:
return keyboard

keyboard = [
[
InlineKeyboardButton(
plugin.replace("-", " ").replace("_", " "),
f"plugins {plugin}",
),
InlineKeyboardButton(
{True: "✅", False: "❌"}[plugins[plugin]],
f"plugins {plugin} status1",
),
]
for plugin in plugins
if plugin in _plugins
]
for plugin in plugins
]
return keyboard or [
[InlineKeyboardButton("No were plugin found.", "None")]
]

return keyboard


@Bot.on_message(
Expand All @@ -40,17 +93,36 @@ async def plugins(app: Bot, message: Message):


@Bot.on_callback_query(
Config.IS_ADMIN & filters.regex(r"^plugins (?P<plugin>[\w\-]+)$")
Config.IS_ADMIN
& filters.regex(r"^plugins(?: (?P<plugin>[\w\-]+))?(?: (?P<mode>\w+))?$")
)
async def plugins_callback(app: Bot, query: CallbackQuery):
plugin: str = query.matches[0].group("plugin")
if app.get_plugin_status(plugin):
app.unload_plugins(plugin)
mode: str = query.matches[0].group("mode")
text = "**Plugins**:"

if not plugin:
keyboard = plugins_keyboard(app)
elif mode == "status1":
if app.get_plugin_status(plugin):
app.unload_plugins(plugin)
else:
app.load_plugins(plugin, force_load=True)
keyboard = plugins_keyboard(app)
else:
app.load_plugins(plugin, force_load=True)
text = f"Plugin **{plugin}**:"
if mode == "status2":
if app.get_plugin_status(plugin):
app.unload_plugins(plugin)
else:
app.load_plugins(plugin, force_load=True)
elif mode == "public":
is_public_use = app.get_plugin_data(plugin, "is_public_use")
app.set_plugin_data(plugin, "is_public_use", not is_public_use)
keyboard = plugins_keyboard(app, plugin=plugin)

await query.edit_message_text(
"**Plugins**:",
reply_markup=InlineKeyboardMarkup(plugins_keyboard(app)),
text, reply_markup=InlineKeyboardMarkup(keyboard)
)


Expand Down
9 changes: 9 additions & 0 deletions bot/core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .is_public_use import IsPublicUse
from .plugin_manager import PluginManager


class Core(PluginManager, IsPublicUse):
pass


__all__ = ["Core"]
27 changes: 27 additions & 0 deletions bot/core/is_public_use.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import bot
import functools
from typing import Callable
from sqlalchemy import select
from sqlalchemy.orm import Session
from config import Config, PluginDatabase
from pyrogram.types import Message, InlineQuery


class IsPublicUse:
def is_public_use(func: Callable):
@functools.wraps(func)
async def decorator(client: "bot.Bot", update: Message | InlineQuery):
if await Config.IS_ADMIN(client, update):
return await func(client, update)

with Session(Config.engine) as session:
if session.execute(
select(PluginDatabase.is_public_use).where(
PluginDatabase.name == func.__module__.split(".")[-1]
)
).scalar():
return await func(client, update)

return None

return decorator
Loading