diff --git a/QSBK.py b/QSBK.py index 42e9e4f..4496c77 100644 --- a/QSBK.py +++ b/QSBK.py @@ -1,91 +1,217 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -__author__='WYY' -__date__='2017.03.13' - -#实战小项目:糗事百科—我的第一个小爬虫 import re -import requests import time -import codecs +import requests +from pathlib import Path +from requests.exceptions import RequestException, ConnectionError, Timeout + +__author__ = 'WYY' +__date__ = '2024.10.30' # 更新为当前优化时间 + + +class QiubaiSpider: + """糗事百科爬虫,支持多页数据爬取、格式化输出与文件保存""" + # 基础配置 + BASE_URL = "https://www.qiushibaike.com" + SAVE_DIR = Path("qiubai_data") # 数据保存目录(脚本所在路径) + # 现代浏览器请求头,避免被反爬 + HEADERS = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Referer": BASE_URL + } + # 糗事内容解析正则(适配当前糗事百科列表页结构) + CONTENT_PATTERN = re.compile( + r'
(.*?)
' # 2. 作者性别/年龄 + r'.*?
.*?(.*?)' # 3. 糗事内容 + r'.*?(.*?)' # 4. 好笑数 + r'.*?(.*?)' # 5. 评论数 + r'.*?' # 6. 赞数 + r'.*?', # 7. 踩数 + re.S + ) -#定义一个Tool类,方便用replace方法把换行符等删除 -class Tool(): - def replace(self,x): - x=re.sub(re.compile('
|
|/>|(.*?)
.*?
.*?(.*?).*?(.*?).*?stats-comments.*?number">(.*?).*?up.*?number hidden">(.*?).*?down.*?number hidden">(.*?)',re.S) - items=re.findall(pattern,source) - number=1 + # 初始化会话(复用连接,提升效率) + self.session = requests.Session() + self.session.headers.update(self.HEADERS) + # 创建保存目录(自动处理不存在的情况) + self.SAVE_DIR.mkdir(parents=True, exist_ok=True) + print("===== 糗事百科爬虫初始化完成 =====") + + + def fetch_html(self, url, max_retries=2): + """获取网页HTML,带重试机制和异常处理""" + for retry in range(max_retries + 1): + try: + response = self.session.get(url, timeout=10) + response.raise_for_status() # 触发HTTP错误(如404、500) + response.encoding = "utf-8" # 明确编码,避免乱码 + return response.text + except (ConnectionError, Timeout): + if retry < max_retries: + print(f"网络异常,第{retry+1}次重试(URL: {url})...") + time.sleep(1) + continue + print(f"请求失败:网络连接超时(URL: {url})") + except RequestException as e: + print(f"请求失败:{str(e)}(URL: {url})") + return None + + + def clean_text(self, text): + """清理文本:去除HTML标签、换行符、空格""" + if not text: + return "未知" + # 移除
等标签和多余空格 + clean_pattern = re.compile(r'|
|[\n\r\s]+') + return re.sub(clean_pattern, ' ', text).strip() + + + def parse_qiubai_page(self, html): + """解析糗事百科页面,提取结构化数据""" + if not html: + return [] + + items = self.CONTENT_PATTERN.findall(html) + parsed_data = [] for item in items: - print u'' - print number,u'楼',u'\n楼主:',item[0],u'',item[1],u'岁',u'\n发言:',self.tool.replace(item[2]),u'\n好笑:',item[3],u'\n评论:',item[4],u'\n赞:',item[5],u'\n踩:',item[6] - time.sleep(0.1) - number+=1 - return items - - #保存信息写入文件 - def saveDetailPage(self,data,name): - fileName='page'+name+'.'+'txt' - f=codecs.open(fileName,'wb') - f.write(data) - print u'',u'成功将数据保存入文件',fileName - f.close() - - #对一页的操作 - def OnePage(self,detailURL,name): - data=self.getDetailPage(detailURL) - self.saveDetailPage(str(data),str(name)) - - #对很多页的操作 - #分两种情况讨论,start页等于1\start页大于1 - def getAllPage(self,start,end): - if start==1: - print u'正在获取第1页的数据...' - detailURL=self.siteURL - self.OnePage(detailURL,start) - number=2 - for page in range(2, end+1): - print u'正在获取第', number, u'页的数据...' - detailURL = self.siteURL + '8hr/page/' + str(page) + '/?s=4964625' - self.OnePage(detailURL,number) - time.sleep(2) - number +=1 - if number==end+1: - print u'',u'\n加载结束!' - return False - - elif start>1: - number=start - for page in range(start,end+1): - print u'',u'\n正在获取第',number,u'页的数据...' - detailURL=self.siteURL + '8hr/page/' +str(page)+ '/?s=4964625' - self.OnePage(detailURL,number) - time.sleep(2) - number += 1 - if number==end+1: - print u'',u'加载结束!' - return False - -spider=Spider() -spider.getAllPage(start=int(raw_input(u'请输入起始页数:')),end=int(raw_input(u'请输入结束页数:'))) + # 清理每个字段的文本 + author = self.clean_text(item[0]) + gender_age = self.clean_text(item[1]) # 格式如“男 28” + content = self.clean_text(item[2]) + laugh_count = self.clean_text(item[3]) or "0" + comment_count = self.clean_text(item[4]) or "0" + like_count = self.clean_text(item[5]) or "0" + dislike_count = self.clean_text(item[6]) or "0" + + # 结构化数据,便于后续使用 + parsed_data.append({ + "author": author, + "gender_age": gender_age, + "content": content, + "laugh_count": laugh_count, + "comment_count": comment_count, + "like_count": like_count, + "dislike_count": dislike_count + }) + return parsed_data + + + def format_output(self, parsed_data, page_num): + """格式化输出糗事数据(控制台友好显示)""" + if not parsed_data: + print(f"第{page_num}页未解析到糗事内容") + return "" + + output_text = f"===== 糗事百科第{page_num}页内容(共{len(parsed_data)}条)=====\n" + for idx, data in enumerate(parsed_data, 1): + output_text += ( + f"\n{idx}楼\n" + f"楼主:{data['author']}({data['gender_age']})\n" + f"内容:{data['content']}\n" + f"好笑:{data['laugh_count']} | 评论:{data['comment_count']} | " + f"赞:{data['like_count']} | 踩:{data['dislike_count']}\n" + f"{'='*50}\n" + ) + # 控制台逐条打印,避免一次性输出过多 + print( + f"\n{idx}楼\n" + f"楼主:{data['author']}({data['gender_age']})\n" + f"内容:{data['content']}\n" + f"好笑:{data['laugh_count']} | 评论:{data['comment_count']}" + ) + time.sleep(0.1) # 控制打印速度,提升可读性 + return output_text + + + def save_to_file(self, content, page_num): + """将糗事数据保存到TXT文件(UTF-8编码,避免乱码)""" + if not content: + return False + + file_path = self.SAVE_DIR / f"qiubai_page{page_num}.txt" + try: + with open(file_path, "w", encoding="utf-8") as f: + f.write(content) + print(f"\n✅ 第{page_num}页数据已保存至:{file_path.resolve()}") + return True + except OSError as e: + print(f"❌ 保存第{page_num}页文件失败:{str(e)}") + return False + + + def crawl_single_page(self, page_num): + """爬取单页糗事数据(获取→解析→输出→保存)""" + print(f"\n===== 开始爬取第{page_num}页 =====") + # 构建分页URL(第1页URL特殊,无page参数) + if page_num == 1: + page_url = self.BASE_URL + else: + page_url = f"{self.BASE_URL}/8hr/page/{page_num}/" + + # 1. 获取页面HTML + html = self.fetch_html(page_url) + if not html: + print(f"❌ 第{page_num}页爬取失败,跳过该页") + return False + + # 2. 解析页面数据 + parsed_data = self.parse_qiubai_page(html) + if not parsed_data: + print(f"❌ 第{page_num}页无有效内容,跳过该页") + return False + + # 3. 格式化输出与保存 + output_content = self.format_output(parsed_data, page_num) + self.save_to_file(output_content, page_num) + return True + + + def crawl_multi_pages(self, start_page, end_page): + """爬取多页糗事数据(从start_page到end_page)""" + # 校验输入参数合法性 + if not (isinstance(start_page, int) and isinstance(end_page, int)): + print("❌ 页码必须为整数") + return + if start_page < 1 or end_page < start_page: + print(f"❌ 页码范围错误(需满足 1 ≤ 起始页 ≤ 结束页)") + return + + print(f"\n===== 开始批量爬取(第{start_page}页至第{end_page}页)=====") + success_count = 0 # 成功爬取的页数 + + for page in range(start_page, end_page + 1): + if self.crawl_single_page(page): + success_count += 1 + # 页间等待,避免高频请求被反爬 + if page != end_page: + wait_time = 2.5 # 等待2.5秒 + print(f"\n等待{wait_time}秒后爬取下一页...") + time.sleep(wait_time) + + # 爬取完成总结 + total_pages = end_page - start_page + 1 + print(f"\n===== 批量爬取结束 =====") + print(f"总任务:{total_pages}页 | 成功:{success_count}页 | 失败:{total_pages - success_count}页") + print(f"数据保存目录:{self.SAVE_DIR.resolve()}") + + +if __name__ == "__main__": + try: + # 获取用户输入的页码范围(适配Python3的input) + start = int(input("请输入起始页数:").strip()) + end = int(input("请输入结束页数:").strip()) + + # 初始化爬虫并执行 + spider = QiubaiSpider() + spider.crawl_multi_pages(start, end) + except ValueError: + print("❌ 输入错误:页码必须为整数") + except Exception as e: + print(f"❌ 程序运行出错:{str(e)}") \ No newline at end of file diff --git a/README_TASK_MANAGER.md b/README_TASK_MANAGER.md new file mode 100644 index 0000000..b88641c --- /dev/null +++ b/README_TASK_MANAGER.md @@ -0,0 +1,199 @@ +# 定时任务管理系统 + +这是一个基于Flask和APScheduler的定时任务管理系统,专为Python爬虫项目设计,支持Python脚本、Scrapy爬虫和Flask应用的定时执行。 + +## 功能特点 + +- 📅 **灵活的调度器**:支持Cron表达式、间隔执行和指定日期三种触发方式 +- 🕷️ **多类型任务支持**:支持Python脚本、Scrapy爬虫和Flask应用三种任务类型 +- 📥 **任务导入导出**:支持JSON格式的任务配置导入导出,方便备份和迁移 +- 🌐 **友好的Web界面**:基于Bootstrap的响应式界面,操作简单直观 +- 📊 **执行历史记录**:记录任务执行历史,方便排查问题 +- ⚡ **实时任务控制**:支持立即执行、暂停、恢复任务操作 + +## 安装与配置 + +### 1. 安装依赖 + +```bash +pip install flask apscheduler +``` + +### 2. 启动系统 + +```bash +python run_task_manager.py +``` + +启动后访问 http://localhost:5000 即可使用Web界面管理定时任务。 + +## 使用说明 + +### 创建任务 + +1. 点击"新建任务"按钮 +2. 填写任务基本信息: + - 任务名称:唯一标识符 + - 任务描述:可选的任务说明 + - 任务类型:选择Python脚本、Scrapy爬虫或Flask应用 + - 启用状态:是否立即启用任务 + +3. 根据任务类型配置相应参数: + - **Python脚本**:脚本路径(相对于项目根目录) + - **Scrapy爬虫**:项目路径和爬虫名称 + - **Flask应用**:应用路径和函数名称 + +4. 配置触发器: + - **Cron表达式**:设置分钟、小时、日、月、星期 + - **间隔执行**:设置秒、分钟、小时的间隔 + - **指定日期**:设置具体的执行日期和时间 + +5. 点击"创建任务"保存 + +### 管理任务 + +在任务列表页面,您可以: + +- 查看所有任务的状态和下次执行时间 +- 立即执行任务 +- 编辑任务配置 +- 暂停/恢复任务 +- 删除任务 +- 查看任务详情和执行历史 + +### 导入导出任务 + +#### 导出任务 + +1. 点击"导出任务"按钮 +2. 选择要导出的任务 +3. 设置文件名(可选) +4. 选择是否包含已禁用的任务 +5. 点击"下载文件"获取JSON格式的任务配置 + +#### 导入任务 + +1. 点击"导入任务"按钮 +2. 选择JSON格式的任务配置文件 +3. 选择是否覆盖已存在的任务 +4. 预览文件内容 +5. 确认导入 + +## 任务配置示例 + +### Python脚本任务 + +```json +{ + "id": "daily_qsbk", + "config": { + "name": "每日糗事百科爬取", + "description": "每天早上8点爬取糗事百科最新内容", + "type": "script", + "enabled": true, + "trigger_type": "cron", + "minute": "0", + "hour": "8", + "day": "*", + "month": "*", + "day_of_week": "*", + "script_path": "QSBK.py" + } +} +``` + +### Scrapy爬虫任务 + +```json +{ + "id": "weekly_xiaohua", + "config": { + "name": "每周笑话图片爬取", + "description": "每周一早上9点爬取笑话图片", + "type": "scrapy", + "enabled": true, + "trigger_type": "cron", + "minute": "0", + "hour": "9", + "day": "*", + "month": "*", + "day_of_week": "0", + "project_path": "XiaoHua", + "spider_name": "xiaohua" + } +} +``` + +### Flask应用任务 + +```json +{ + "id": "hourly_data_sync", + "config": { + "name": "每小时数据同步", + "description": "每小时同步一次数据", + "type": "flask", + "enabled": true, + "trigger_type": "interval", + "seconds": 0, + "minutes": 0, + "hours": 1, + "app_path": "server.py", + "function_name": "sync_data" + } +} +``` + +## 项目结构 + +``` +d:\Python-web-scraping\ +├── task_manager_app.py # Flask应用主文件 +├── task_scheduler.py # 任务调度器核心模块 +├── task_config_manager.py # 任务配置管理模块 +├── run_task_manager.py # 启动脚本 +├── templates/ # HTML模板目录 +│ ├── base.html # 基础模板 +│ ├── tasks.html # 任务列表页面 +│ ├── task_form.html # 任务表单页面 +│ ├── task_detail.html # 任务详情页面 +│ ├── import_tasks.html # 任务导入页面 +│ └── export_tasks.html # 任务导出页面 +├── static/ # 静态资源目录 +├── exports/ # 导出文件目录 +└── task_manager.log # 日志文件 +``` + +## 注意事项 + +1. 确保Python脚本、Scrapy项目和Flask应用的路径正确 +2. 任务执行时会记录日志,可通过日志排查问题 +3. 导入任务时会验证配置格式,格式错误的任务会被跳过 +4. 系统会自动创建必要的目录(如exports目录) +5. 建议定期导出任务配置作为备份 + +## 常见问题 + +### Q: 任务执行失败怎么办? + +A: 查看任务详情页面的执行历史,可以看到具体的错误信息。也可以查看task_manager.log日志文件获取更详细的错误信息。 + +### Q: 如何修改已存在的任务? + +A: 在任务列表页面点击任务的"编辑"按钮,或者点击任务名称进入详情页面后点击"编辑"按钮。 + +### Q: 任务可以同时运行多个实例吗? + +A: 不可以,同一个任务同时只能有一个实例在运行。如果任务正在执行,再次点击"立即执行"会提示任务正在运行。 + +### Q: 如何备份所有任务配置? + +A: 使用"导出任务"功能,选择所有任务并导出为JSON文件。建议定期备份以防数据丢失。 + +## 技术栈 + +- **后端框架**:Flask +- **任务调度**:APScheduler +- **前端框架**:Bootstrap 5 +- **前端交互**:jQuery +- **图标库**:Font Awesome \ No newline at end of file diff --git a/run_task_manager.py b/run_task_manager.py new file mode 100644 index 0000000..0bf22fb --- /dev/null +++ b/run_task_manager.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +定时任务管理系统启动脚本 +""" + +import os +import sys +import logging +from task_manager_app import app + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('task_manager.log'), + logging.StreamHandler() + ] +) + +logger = logging.getLogger(__name__) + +def main(): + """主函数""" + try: + logger.info("启动定时任务管理系统...") + + # 确保导出目录存在 + os.makedirs('exports', exist_ok=True) + + # 启动Flask应用 + logger.info("Web界面启动成功,访问 http://localhost:5000") + app.run(host='0.0.0.0', port=5000, debug=True) + + except Exception as e: + logger.error(f"启动失败: {e}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/task_config_manager.py b/task_config_manager.py new file mode 100644 index 0000000..3b97f0f --- /dev/null +++ b/task_config_manager.py @@ -0,0 +1,293 @@ +# -*- coding: utf-8 -*- +""" +任务配置管理模块 +提供任务配置的导入导出功能 +""" +import os +import json +import datetime +from typing import Dict, List, Any, Optional + +class TaskConfigManager: + """任务配置管理器""" + + def __init__(self, config_dir: str = None): + """ + 初始化任务配置管理器 + + Args: + config_dir: 配置文件目录,默认为当前目录下的configs文件夹 + """ + if config_dir is None: + self.config_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'configs') + else: + self.config_dir = config_dir + + # 确保配置目录存在 + os.makedirs(self.config_dir, exist_ok=True) + + # 默认配置文件路径 + self.default_config_file = os.path.join(self.config_dir, 'default_tasks.json') + self.custom_config_file = os.path.join(self.config_dir, 'custom_tasks.json') + + # 初始化默认配置 + self._init_default_config() + + def _init_default_config(self): + """初始化默认任务配置""" + if not os.path.exists(self.default_config_file): + default_tasks = { + "qsbk_daily": { + "name": "糗事百科每日爬取", + "description": "每天早上8点爬取糗事百科最新内容", + "type": "script", + "script_path": os.path.join(os.path.dirname(os.path.abspath(__file__)), "QSBK.py"), + "trigger_type": "cron", + "hour": "8", + "minute": "0", + "enabled": True, + "params": { + "pages": 3 + } + }, + "douban_weekly": { + "name": "豆瓣每周爬取", + "description": "每周一早上9点爬取豆瓣内容", + "type": "script", + "script_path": os.path.join(os.path.dirname(os.path.abspath(__file__)), "DouBan", "DouBan.py"), + "trigger_type": "cron", + "day_of_week": "1", + "hour": "9", + "minute": "0", + "enabled": True + }, + "xiaohua_daily": { + "name": "小花图片每日爬取", + "description": "每天下午2点爬取小花图片", + "type": "scrapy", + "project_path": os.path.join(os.path.dirname(os.path.abspath(__file__)), "XiaoHua"), + "spider_name": "XiaoHua", + "trigger_type": "cron", + "hour": "14", + "minute": "0", + "enabled": True + } + } + + with open(self.default_config_file, 'w', encoding='utf-8') as f: + json.dump(default_tasks, f, ensure_ascii=False, indent=2) + + def get_default_tasks(self) -> Dict[str, Any]: + """获取默认任务配置""" + try: + with open(self.default_config_file, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + print(f"读取默认任务配置失败: {str(e)}") + return {} + + def get_custom_tasks(self) -> Dict[str, Any]: + """获取自定义任务配置""" + if not os.path.exists(self.custom_config_file): + return {} + + try: + with open(self.custom_config_file, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + print(f"读取自定义任务配置失败: {str(e)}") + return {} + + def save_custom_tasks(self, tasks: Dict[str, Any]) -> bool: + """保存自定义任务配置""" + try: + with open(self.custom_config_file, 'w', encoding='utf-8') as f: + json.dump(tasks, f, ensure_ascii=False, indent=2) + return True + except Exception as e: + print(f"保存自定义任务配置失败: {str(e)}") + return False + + def export_tasks(self, tasks: Dict[str, Any], file_path: str = None) -> Optional[str]: + """ + 导出任务配置到文件 + + Args: + tasks: 要导出的任务配置 + file_path: 导出文件路径,如果不指定则自动生成 + + Returns: + 导出文件的路径,失败返回None + """ + if file_path is None: + timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') + file_path = os.path.join(self.config_dir, f"tasks_export_{timestamp}.json") + + try: + # 添加导出元数据 + export_data = { + "export_time": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + "export_version": "1.0", + "tasks": tasks + } + + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(export_data, f, ensure_ascii=False, indent=2) + + print(f"任务配置已导出到: {file_path}") + return file_path + except Exception as e: + print(f"导出任务配置失败: {str(e)}") + return None + + def import_tasks(self, file_path: str, merge: bool = True) -> Optional[Dict[str, Any]]: + """ + 从文件导入任务配置 + + Args: + file_path: 导入文件路径 + merge: 是否与现有配置合并,False则完全替换 + + Returns: + 导入的任务配置,失败返回None + """ + if not os.path.exists(file_path): + print(f"导入文件不存在: {file_path}") + return None + + try: + with open(file_path, 'r', encoding='utf-8') as f: + import_data = json.load(f) + + # 检查是否是导出格式的文件 + if "tasks" in import_data: + tasks = import_data["tasks"] + export_info = { + "export_time": import_data.get("export_time", "未知"), + "export_version": import_data.get("export_version", "未知") + } + print(f"导入文件信息 - 导出时间: {export_info['export_time']}, 版本: {export_info['export_version']}") + else: + # 直接是任务配置 + tasks = import_data + + if merge: + # 与现有配置合并 + custom_tasks = self.get_custom_tasks() + custom_tasks.update(tasks) + self.save_custom_tasks(custom_tasks) + print(f"已合并导入 {len(tasks)} 个任务配置") + else: + # 完全替换 + self.save_custom_tasks(tasks) + print(f"已替换导入 {len(tasks)} 个任务配置") + + return tasks + except Exception as e: + print(f"导入任务配置失败: {str(e)}") + return None + + def validate_task_config(self, task_config: Dict[str, Any]) -> List[str]: + """ + 验证任务配置的有效性 + + Args: + task_config: 任务配置 + + Returns: + 错误信息列表,空列表表示验证通过 + """ + errors = [] + + # 检查必需字段 + if "name" not in task_config: + errors.append("缺少任务名称 (name)") + + if "type" not in task_config: + errors.append("缺少任务类型 (type)") + elif task_config["type"] not in ["script", "scrapy", "flask"]: + errors.append(f"不支持的任务类型: {task_config['type']}") + + if "trigger_type" not in task_config: + errors.append("缺少触发器类型 (trigger_type)") + elif task_config["trigger_type"] not in ["cron", "interval", "date"]: + errors.append(f"不支持的触发器类型: {task_config['trigger_type']}") + + # 根据任务类型检查特定字段 + if task_config.get("type") == "script": + if "script_path" not in task_config: + errors.append("脚本类型任务缺少脚本路径 (script_path)") + elif not os.path.exists(task_config.get("script_path", "")): + errors.append(f"脚本文件不存在: {task_config.get('script_path', '')}") + + elif task_config.get("type") == "scrapy": + if "project_path" not in task_config: + errors.append("Scrapy类型任务缺少项目路径 (project_path)") + elif not os.path.exists(task_config.get("project_path", "")): + errors.append(f"Scrapy项目路径不存在: {task_config.get('project_path', '')}") + + if "spider_name" not in task_config: + errors.append("Scrapy类型任务缺少爬虫名称 (spider_name)") + + elif task_config.get("type") == "flask": + if "app_path" not in task_config: + errors.append("Flask类型任务缺少应用路径 (app_path)") + elif not os.path.exists(task_config.get("app_path", "")): + errors.append(f"Flask应用文件不存在: {task_config.get('app_path', '')}") + + if "function_name" not in task_config: + errors.append("Flask类型任务缺少函数名称 (function_name)") + + # 根据触发器类型检查特定字段 + trigger_type = task_config.get("trigger_type") + if trigger_type == "date": + if "run_date" not in task_config: + errors.append("日期触发器缺少运行日期 (run_date)") + + return errors + + def create_task_template(self, task_type: str) -> Dict[str, Any]: + """ + 创建任务配置模板 + + Args: + task_type: 任务类型 (script, scrapy, flask) + + Returns: + 任务配置模板 + """ + base_template = { + "name": "新任务", + "description": "任务描述", + "enabled": True, + "trigger_type": "cron" + } + + if task_type == "script": + return { + **base_template, + "type": "script", + "script_path": "path/to/script.py", + "minute": "0", + "hour": "0" + } + elif task_type == "scrapy": + return { + **base_template, + "type": "scrapy", + "project_path": "path/to/scrapy/project", + "spider_name": "spider_name", + "minute": "0", + "hour": "0" + } + elif task_type == "flask": + return { + **base_template, + "type": "flask", + "app_path": "path/to/app.py", + "function_name": "function_name", + "minute": "0", + "hour": "0" + } + else: + return base_template \ No newline at end of file diff --git a/task_manager_app.py b/task_manager_app.py new file mode 100644 index 0000000..6113f98 --- /dev/null +++ b/task_manager_app.py @@ -0,0 +1,187 @@ +# -*- coding: utf-8 -*- +""" +定时任务管理Web应用 +提供Web界面管理定时任务的导入导出功能 +""" +import os +import json +from flask import Flask, render_template, request, jsonify, send_file, redirect, url_for +from werkzeug.utils import secure_filename +from task_scheduler import task_scheduler +from task_config_manager import TaskConfigManager + +# 创建Flask应用 +app = Flask(__name__) +app.config['SECRET_KEY'] = 'your-secret-key-here' +app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads') +app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 限制上传文件大小为16MB + +# 确保上传目录存在 +os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) + +# 初始化任务配置管理器 +config_manager = TaskConfigManager() + +@app.route('/') +def index(): + """主页 - 显示所有任务""" + tasks_status = task_scheduler.get_task_status() + return render_template('tasks.html', tasks=tasks_status) + +@app.route('/api/tasks', methods=['GET']) +def api_get_tasks(): + """获取所有任务状态API""" + tasks = task_scheduler.get_task_status() + return jsonify(tasks) + +@app.route('/api/tasks/', methods=['GET']) +def api_get_task(task_id): + """获取指定任务状态API""" + task = task_scheduler.get_task_status(task_id) + if task is None: + return jsonify({'error': '任务不存在'}), 404 + return jsonify(task) + +@app.route('/api/tasks', methods=['POST']) +def api_add_task(): + """添加任务API""" + task_data = request.json + if not task_data or 'id' not in task_data or 'config' not in task_data: + return jsonify({'error': '请求数据格式错误'}), 400 + + task_id = task_data['id'] + task_config = task_data['config'] + + # 验证任务配置 + errors = config_manager.validate_task_config(task_config) + if errors: + return jsonify({'error': '任务配置无效', 'details': errors}), 400 + + # 添加任务 + if task_scheduler.add_task(task_id, task_config): + return jsonify({'success': True, 'message': f'任务 {task_id} 添加成功'}) + else: + return jsonify({'error': '添加任务失败'}), 500 + +@app.route('/api/tasks/', methods=['PUT']) +def api_update_task(task_id): + """更新任务API""" + task_config = request.json + if not task_config: + return jsonify({'error': '请求数据格式错误'}), 400 + + # 验证任务配置 + errors = config_manager.validate_task_config(task_config) + if errors: + return jsonify({'error': '任务配置无效', 'details': errors}), 400 + + # 更新任务 + if task_scheduler.add_task(task_id, task_config): + return jsonify({'success': True, 'message': f'任务 {task_id} 更新成功'}) + else: + return jsonify({'error': '更新任务失败'}), 500 + +@app.route('/api/tasks/', methods=['DELETE']) +def api_delete_task(task_id): + """删除任务API""" + if task_scheduler.remove_task(task_id): + return jsonify({'success': True, 'message': f'任务 {task_id} 删除成功'}) + else: + return jsonify({'error': '删除任务失败'}), 500 + +@app.route('/api/tasks//run', methods=['POST']) +def api_run_task(task_id): + """立即运行任务API""" + if task_scheduler.run_task_now(task_id): + return jsonify({'success': True, 'message': f'任务 {task_id} 已开始执行'}) + else: + return jsonify({'error': '执行任务失败'}), 500 + +@app.route('/api/tasks/export', methods=['GET']) +def api_export_tasks(): + """导出任务配置API""" + # 获取当前所有任务配置 + tasks = task_scheduler.tasks + + # 导出任务配置 + file_path = config_manager.export_tasks(tasks) + if file_path: + return send_file(file_path, as_attachment=True) + else: + return jsonify({'error': '导出任务配置失败'}), 500 + +@app.route('/api/tasks/import', methods=['POST']) +def api_import_tasks(): + """导入任务配置API""" + if 'file' not in request.files: + return jsonify({'error': '没有上传文件'}), 400 + + file = request.files['file'] + if file.filename == '': + return jsonify({'error': '没有选择文件'}), 400 + + # 保存上传的文件 + filename = secure_filename(file.filename) + file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) + file.save(file_path) + + # 获取合并选项 + merge = request.form.get('merge', 'true').lower() == 'true' + + # 导入任务配置 + tasks = config_manager.import_tasks(file_path, merge) + + # 删除临时文件 + os.remove(file_path) + + if tasks is not None: + # 更新调度器中的任务 + for task_id, task_config in tasks.items(): + task_scheduler.add_task(task_id, task_config) + + return jsonify({'success': True, 'message': f'成功导入 {len(tasks)} 个任务配置'}) + else: + return jsonify({'error': '导入任务配置失败'}), 500 + +@app.route('/api/templates/') +def api_get_template(task_type): + """获取任务配置模板API""" + template = config_manager.create_task_template(task_type) + return jsonify(template) + +@app.route('/tasks/new') +def new_task(): + """新建任务页面""" + task_type = request.args.get('type', 'script') + template = config_manager.create_task_template(task_type) + return render_template('task_form.html', task=None, template=template, edit=False) + +@app.route('/tasks//edit') +def edit_task(task_id): + """编辑任务页面""" + task = task_scheduler.get_task_status(task_id) + if task is None: + return redirect(url_for('index')) + return render_template('task_form.html', task=task, template=None, edit=True) + +@app.route('/tasks/import') +def import_tasks(): + """导入任务页面""" + return render_template('import_tasks.html') + +@app.route('/tasks/export') +def export_tasks(): + """导出任务页面""" + tasks = task_scheduler.tasks + return render_template('export_tasks.html', tasks=tasks) + +if __name__ == '__main__': + # 启动任务调度器 + task_scheduler.start() + + try: + # 启动Flask应用 + app.run(host='0.0.0.0', port=5001, debug=True) + finally: + # 停止任务调度器 + task_scheduler.stop() \ No newline at end of file diff --git a/task_scheduler.py b/task_scheduler.py new file mode 100644 index 0000000..485f773 --- /dev/null +++ b/task_scheduler.py @@ -0,0 +1,379 @@ +# -*- coding: utf-8 -*- +""" +定时任务调度器 +支持调度不同类型的爬虫任务 +""" +import os +import sys +import json +import logging +import importlib +import subprocess +from datetime import datetime +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.date import DateTrigger +from flask import Flask + +# 添加项目根目录到Python路径 +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +class TaskScheduler: + """定时任务调度器""" + + def __init__(self): + self.scheduler = BackgroundScheduler() + self.tasks = {} + self.task_results = {} + self.tasks_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tasks.json') + self.logger = self._setup_logger() + self._load_tasks() + + def _setup_logger(self): + """设置日志记录器""" + logger = logging.getLogger('TaskScheduler') + logger.setLevel(logging.INFO) + + # 创建文件处理器 + file_handler = logging.FileHandler( + os.path.join(os.path.dirname(os.path.abspath(__file__)), 'scheduler.log'), + encoding='utf-8' + ) + file_handler.setLevel(logging.INFO) + + # 创建控制台处理器 + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + + # 创建格式化器 + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + file_handler.setFormatter(formatter) + console_handler.setFormatter(formatter) + + # 添加处理器 + logger.addHandler(file_handler) + logger.addHandler(console_handler) + + return logger + + def _load_tasks(self): + """从文件加载任务配置""" + if os.path.exists(self.tasks_file): + try: + with open(self.tasks_file, 'r', encoding='utf-8') as f: + self.tasks = json.load(f) + self.logger.info(f"已加载 {len(self.tasks)} 个任务配置") + except Exception as e: + self.logger.error(f"加载任务配置失败: {str(e)}") + self.tasks = {} + else: + self.tasks = {} + + def _save_tasks(self): + """保存任务配置到文件""" + try: + with open(self.tasks_file, 'w', encoding='utf-8') as f: + json.dump(self.tasks, f, ensure_ascii=False, indent=2) + self.logger.info("任务配置已保存") + except Exception as e: + self.logger.error(f"保存任务配置失败: {str(e)}") + + def add_task(self, task_id, task_config): + """添加任务""" + if task_id in self.tasks: + self.logger.warning(f"任务 {task_id} 已存在,将被覆盖") + + self.tasks[task_id] = task_config + self._save_tasks() + + # 如果调度器已启动,添加任务到调度器 + if self.scheduler.running: + self._schedule_task(task_id, task_config) + + self.logger.info(f"已添加任务: {task_id}") + return True + + def remove_task(self, task_id): + """移除任务""" + if task_id not in self.tasks: + self.logger.warning(f"任务 {task_id} 不存在") + return False + + # 从调度器中移除 + if self.scheduler.get_job(task_id): + self.scheduler.remove_job(task_id) + + # 从配置中移除 + del self.tasks[task_id] + self._save_tasks() + + self.logger.info(f"已移除任务: {task_id}") + return True + + def _schedule_task(self, task_id, task_config): + """将任务添加到调度器""" + trigger_type = task_config.get('trigger_type', 'cron') + + try: + if trigger_type == 'cron': + # Cron表达式触发 + trigger = CronTrigger( + minute=task_config.get('minute', '*'), + hour=task_config.get('hour', '*'), + day=task_config.get('day', '*'), + month=task_config.get('month', '*'), + day_of_week=task_config.get('day_of_week', '*') + ) + elif trigger_type == 'interval': + # 间隔触发 + trigger = IntervalTrigger( + seconds=task_config.get('seconds', 0), + minutes=task_config.get('minutes', 0), + hours=task_config.get('hours', 0), + days=task_config.get('days', 0) + ) + elif trigger_type == 'date': + # 指定日期触发 + run_date = task_config.get('run_date') + if not run_date: + self.logger.error(f"任务 {task_id} 缺少 run_date 参数") + return False + trigger = DateTrigger(run_date=run_date) + else: + self.logger.error(f"任务 {task_id} 不支持的触发器类型: {trigger_type}") + return False + + # 添加任务到调度器 + self.scheduler.add_job( + func=self._execute_task, + trigger=trigger, + args=[task_id, task_config], + id=task_id, + replace_existing=True + ) + + self.logger.info(f"已调度任务: {task_id}") + return True + + except Exception as e: + self.logger.error(f"调度任务 {task_id} 失败: {str(e)}") + return False + + def _execute_task(self, task_id, task_config): + """执行任务""" + self.logger.info(f"开始执行任务: {task_id}") + start_time = datetime.now() + + try: + task_type = task_config.get('type', 'script') + + if task_type == 'script': + # 执行Python脚本 + script_path = task_config.get('script_path') + if not script_path or not os.path.exists(script_path): + raise Exception(f"脚本文件不存在: {script_path}") + + # 使用subprocess执行脚本 + result = subprocess.run( + [sys.executable, script_path], + capture_output=True, + text=True, + encoding='utf-8' + ) + + if result.returncode != 0: + raise Exception(f"脚本执行失败: {result.stderr}") + + output = result.stdout + + elif task_type == 'scrapy': + # 执行Scrapy爬虫 + project_path = task_config.get('project_path') + spider_name = task_config.get('spider_name') + + if not project_path or not spider_name: + raise Exception("Scrapy任务缺少 project_path 或 spider_name 参数") + + # 切换到Scrapy项目目录 + original_dir = os.getcwd() + os.chdir(project_path) + + try: + # 执行Scrapy命令 + result = subprocess.run( + [sys.executable, '-m', 'scrapy', 'crawl', spider_name], + capture_output=True, + text=True, + encoding='utf-8' + ) + + if result.returncode != 0: + raise Exception(f"Scrapy爬虫执行失败: {result.stderr}") + + output = result.stdout + finally: + # 恢复原始目录 + os.chdir(original_dir) + + elif task_type == 'flask': + # 调用Flask应用中的函数 + app_path = task_config.get('app_path') + function_name = task_config.get('function_name') + + if not app_path or not function_name: + raise Exception("Flask任务缺少 app_path 或 function_name 参数") + + # 动态导入模块 + module_name = os.path.splitext(os.path.basename(app_path))[0] + spec = importlib.util.spec_from_file_location(module_name, app_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # 获取函数并执行 + if not hasattr(module, function_name): + raise Exception(f"模块 {module_name} 中不存在函数 {function_name}") + + func = getattr(module, function_name) + output = func() + + else: + raise Exception(f"不支持的任务类型: {task_type}") + + # 记录任务执行结果 + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + + self.task_results[task_id] = { + 'status': 'success', + 'start_time': start_time.strftime('%Y-%m-%d %H:%M:%S'), + 'end_time': end_time.strftime('%Y-%m-%d %H:%M:%S'), + 'duration': f"{duration:.2f}秒", + 'output': output[:1000] if output else "", # 限制输出长度 + 'last_run': end_time.strftime('%Y-%m-%d %H:%M:%S') + } + + self.logger.info(f"任务 {task_id} 执行成功,耗时 {duration:.2f}秒") + + except Exception as e: + # 记录任务执行失败 + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + + self.task_results[task_id] = { + 'status': 'failed', + 'start_time': start_time.strftime('%Y-%m-%d %H:%M:%S'), + 'end_time': end_time.strftime('%Y-%m-%d %H:%M:%S'), + 'duration': f"{duration:.2f}秒", + 'error': str(e), + 'last_run': end_time.strftime('%Y-%m-%d %H:%M:%S') + } + + self.logger.error(f"任务 {task_id} 执行失败: {str(e)}") + + def start(self): + """启动调度器""" + if not self.scheduler.running: + # 添加所有已配置的任务 + for task_id, task_config in self.tasks.items(): + if task_config.get('enabled', True): # 只添加启用的任务 + self._schedule_task(task_id, task_config) + + self.scheduler.start() + self.logger.info("任务调度器已启动") + + def stop(self): + """停止调度器""" + if self.scheduler.running: + self.scheduler.shutdown() + self.logger.info("任务调度器已停止") + + def get_task_status(self, task_id=None): + """获取任务状态""" + if task_id: + if task_id not in self.tasks: + return None + + job = self.scheduler.get_job(task_id) + task_config = self.tasks[task_id] + task_result = self.task_results.get(task_id, {}) + + return { + 'id': task_id, + 'config': task_config, + 'scheduled': job is not None, + 'next_run': job.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if job and job.next_run_time else None, + 'result': task_result + } + else: + # 返回所有任务状态 + tasks_status = {} + for tid in self.tasks: + tasks_status[tid] = self.get_task_status(tid) + return tasks_status + + def run_task_now(self, task_id): + """立即运行指定任务""" + if task_id not in self.tasks: + self.logger.error(f"任务 {task_id} 不存在") + return False + + task_config = self.tasks[task_id] + self._execute_task(task_id, task_config) + self.logger.info(f"已手动执行任务: {task_id}") + return True + + def export_tasks(self, file_path=None): + """导出任务配置""" + if not file_path: + file_path = f"tasks_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + + try: + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(self.tasks, f, ensure_ascii=False, indent=2) + self.logger.info(f"任务配置已导出到: {file_path}") + return file_path + except Exception as e: + self.logger.error(f"导出任务配置失败: {str(e)}") + return None + + def import_tasks(self, file_path, merge=False): + """导入任务配置""" + if not os.path.exists(file_path): + self.logger.error(f"导入文件不存在: {file_path}") + return False + + try: + with open(file_path, 'r', encoding='utf-8') as f: + imported_tasks = json.load(f) + + if not merge: + # 不合并,直接替换 + self.tasks = imported_tasks + else: + # 合并导入 + self.tasks.update(imported_tasks) + + self._save_tasks() + + # 如果调度器已运行,重新调度所有任务 + if self.scheduler.running: + # 先移除所有任务 + for job in self.scheduler.get_jobs(): + self.scheduler.remove_job(job.id) + + # 重新添加所有任务 + for task_id, task_config in self.tasks.items(): + if task_config.get('enabled', True): + self._schedule_task(task_id, task_config) + + self.logger.info(f"已导入 {len(imported_tasks)} 个任务配置") + return True + + except Exception as e: + self.logger.error(f"导入任务配置失败: {str(e)}") + return False + + +# 全局调度器实例 +task_scheduler = TaskScheduler() \ No newline at end of file diff --git a/templates/base.html b/templates/base.html new file mode 100644 index 0000000..e29e4f8 --- /dev/null +++ b/templates/base.html @@ -0,0 +1,140 @@ + + + + + + {% block title %}定时任务管理系统{% endblock %} + + + + + + + + + {% block head %}{% endblock %} + + + + +
+ {% block content %}{% endblock %} +
+ +
+
+

© 2023 定时任务管理系统 - 基于Python爬虫项目

+
+
+ + + + + + + + + {% block scripts %}{% endblock %} + + \ No newline at end of file diff --git a/templates/export_tasks.html b/templates/export_tasks.html new file mode 100644 index 0000000..3ff8cc5 --- /dev/null +++ b/templates/export_tasks.html @@ -0,0 +1,244 @@ +{% extends "base.html" %} + +{% block title %} + 导出任务 - 定时任务管理系统 +{% endblock %} + +{% block content %} +
+
+
+
+

+ 导出任务 +

+
+
+
+

选择要导出的任务,系统将生成JSON格式的配置文件。

+
+ +
+
选择任务
+
+ + +
+ +
+
+
+ 加载中... +
+

加载任务列表...

+
+
+
+ +
+ + + 返回 + +
+
+
+
+
+ + + +{% endblock %} + +{% block scripts %} + +{% endblock %} \ No newline at end of file diff --git a/templates/import_tasks.html b/templates/import_tasks.html new file mode 100644 index 0000000..4a7de3c --- /dev/null +++ b/templates/import_tasks.html @@ -0,0 +1,210 @@ +{% extends "base.html" %} + +{% block title %} + 导入任务 - 定时任务管理系统 +{% endblock %} + +{% block content %} +
+
+
+
+

+ 导入任务 +

+
+
+
+

从JSON文件导入任务配置。文件应包含任务列表,每个任务包含id和config字段。

+
+ +
+ + +
请选择JSON格式的任务配置文件
+
+ +
+
+ + +
+
+ +
+ + + 返回 + +
+ + +
+
+
+
+ + + +{% endblock %} + +{% block scripts %} + +{% endblock %} \ No newline at end of file diff --git a/templates/task_detail.html b/templates/task_detail.html new file mode 100644 index 0000000..9fee594 --- /dev/null +++ b/templates/task_detail.html @@ -0,0 +1,273 @@ +{% extends "base.html" %} + +{% block title %} + {{ task.config.name }} - 任务详情 - 定时任务管理系统 +{% endblock %} + +{% block content %} +
+
+
+
+

+ 任务详情 +

+
+ + 编辑 + + +
+
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
任务名称{{ task.config.name }}
任务描述{{ task.config.get('description', '无') }}
任务类型 + {% if task.config.type == 'script' %} + Python脚本 + {% elif task.config.type == 'scrapy' %} + Scrapy爬虫 + {% elif task.config.type == 'flask' %} + Flask应用 + {% endif %} +
任务状态 + {% if task.config.enabled %} + 已启用 + {% else %} + 已禁用 + {% endif %} +
触发器类型 + {% if task.config.trigger_type == 'cron' %} + Cron表达式 + {% elif task.config.trigger_type == 'interval' %} + 间隔执行 + {% elif task.config.trigger_type == 'date' %} + 指定日期 + {% endif %} +
触发器配置 + {% if task.config.trigger_type == 'cron' %} + {{ task.config.minute }} {{ task.config.hour }} {{ task.config.day }} {{ task.config.month }} {{ task.config.day_of_week }} + {% elif task.config.trigger_type == 'interval' %} + {% set total_seconds = task.config.get('seconds', 0) + task.config.get('minutes', 0) * 60 + task.config.get('hours', 0) * 3600 %} + {% if total_seconds > 0 %} + 每 + {% if task.config.get('hours', 0) > 0 %}{{ task.config.hours }} 小时{% endif %} + {% if task.config.get('minutes', 0) > 0 %}{{ task.config.minutes }} 分钟{% endif %} + {% if task.config.get('seconds', 0) > 0 %}{{ task.config.seconds }} 秒{% endif %} + 执行一次 + {% else %} + 未配置 + {% endif %} + {% elif task.config.trigger_type == 'date' %} + {{ task.config.run_date }} + {% endif %} +
任务配置 + {% if task.config.type == 'script' %} +
脚本路径: {{ task.config.script_path }}
+ {% elif task.config.type == 'scrapy' %} +
项目路径: {{ task.config.project_path }}
+
爬虫名称: {{ task.config.spider_name }}
+ {% elif task.config.type == 'flask' %} +
应用路径: {{ task.config.app_path }}
+
函数名称: {{ task.config.function_name }}
+ {% endif %} +
+ +
+ + {% if task.config.enabled %} + + {% else %} + + {% endif %} + + 返回 + +
+
+
+
+ +
+
+
+
+ 执行历史 +
+
+
+
+
+
+ 加载中... +
+

加载执行历史...

+
+
+
+
+
+
+ + + +{% endblock %} + +{% block scripts %} + +{% endblock %} \ No newline at end of file diff --git a/templates/task_form.html b/templates/task_form.html new file mode 100644 index 0000000..e22129a --- /dev/null +++ b/templates/task_form.html @@ -0,0 +1,310 @@ +{% extends "base.html" %} + +{% block title %} + {% if edit %}编辑任务{% else %}新建任务{% endif %} - 定时任务管理系统 +{% endblock %} + +{% block content %} +
+
+
+
+

+ {% if edit %} + 编辑任务 + {% else %} + 新建任务 + {% endif %} +

+
+
+
+ + + +
+
基本信息
+
+
+ +
+ + +
+ +
+ + +
+ +
+ + +
+ +
+ + +
+ + +
+
任务配置
+
+
+ + +
+
+ + +
相对于项目根目录的路径,如: QSBK.py
+
+
+ + + + + + + + +
+
触发器配置
+
+
+ +
+ + +
+ + +
+
+
+ + +
+
+ + +
+
+ +
+
+ + +
+
+ + +
+
+ +
+ + +
+
+ + + + + + + +
+ + 返回 + + +
+
+
+
+
+
+{% endblock %} + +{% block scripts %} + +{% endblock %} \ No newline at end of file diff --git a/templates/tasks.html b/templates/tasks.html new file mode 100644 index 0000000..bb4af9c --- /dev/null +++ b/templates/tasks.html @@ -0,0 +1,201 @@ +{% extends "base.html" %} + +{% block title %}任务列表 - 定时任务管理系统{% endblock %} + +{% block content %} + + +
+ {% for task_id, task in tasks.items() %} +
+
+
+
{{ task.config.name }}
+ + {{ '启用' if task.config.enabled else '禁用' }} + +
+
+

{{ task.config.get('description', '无描述') }}

+ +
+ 类型: + + {% if task.config.type == 'script' %} + Python脚本 + {% elif task.config.type == 'scrapy' %} + Scrapy爬虫 + {% elif task.config.type == 'flask' %} + Flask应用 + {% else %} + {{ task.config.type }} + {% endif %} + +
+ +
+ 触发器: + {% if task.config.trigger_type == 'cron' %} + Cron + + {% if task.config.hour %}{{ task.config.hour }}:{% endif %} + {{ task.config.get('minute', '0') }} + {% if task.config.day_of_week %} (周{{ task.config.day_of_week }}){% endif %} + + {% elif task.config.trigger_type == 'interval' %} + 间隔 + + {% if task.config.hours %}{{ task.config.hours }}小时{% endif %} + {% if task.config.minutes %}{{ task.config.minutes }}分钟{% endif %} + {% if task.config.seconds %}{{ task.config.seconds }}秒{% endif %} + + {% elif task.config.trigger_type == 'date' %} + 指定日期 + {{ task.config.run_date }} + {% endif %} +
+ +
+ 下次执行: + + {{ task.next_run or '未调度' }} + +
+ + {% if task.result %} +
+ 上次执行: + + {% if task.result.status == 'success' %} + 成功 + {% elif task.result.status == 'failed' %} + 失败 + {% else %} + 等待中 + {% endif %} + + ({{ task.result.last_run }}) +
+ {% endif %} +
+ +
+
+ {% endfor %} +
+ +{% if not tasks %} +
+ +

暂无任务

+

点击上方"新建任务"按钮创建您的第一个定时任务

+ + 新建任务 + +
+{% endif %} + + + +{% endblock %} + +{% block scripts %} + +{% endblock %} \ No newline at end of file