diff --git a/tieba.py b/tieba.py
index 04dbd40..e88aa76 100644
--- a/tieba.py
+++ b/tieba.py
@@ -164,4 +164,478 @@ def getAllPage(self,Num):
     spider.getAllPage(Num)
+# #!/usr/bin/env python
+# # -*- coding: utf-8 -*-
+# """
+# Baidu Tieba spider tool
+# Purpose: crawl post text and images from a Tieba thread and save them locally
+# Author: WYY
+# Date: 2017.03.14
+# Refactored: 2024.01.20
+# """
+
+# import requests
+# import re
+# import os
+# import time
+# import random
+# import logging
+# from bs4 import BeautifulSoup
+# from typing import List, Tuple, Optional
+
+
+# class Tool:
+#     """
+#     Data-cleaning helper class
+#     Strips HTML tags and normalises text content
+#     """
+
+#     def __init__(self):
+#         # Regular expressions used to clean the raw HTML
+#         # NOTE: the tag patterns assume the classic Tieba post markup
+#         self.removeImg = re.compile(r'<img.*?>| {1,7}|&nbsp;')    # drop img tags, runs of 1-7 spaces, &nbsp;
+#         self.removeAddr = re.compile(r'<a.*?>|</a>')              # drop hyperlink tags
+#         self.replaceLine = re.compile(r'<tr>|<div>|</div>|</p>')  # replace line-breaking tags with \n
+#         self.replaceTD = re.compile(r'<td>')                      # replace table cells with \t
+#         self.replaceBR = re.compile(r'<br><br>|<br>')             # replace single/double <br> with \n
+#         self.removeExtraTag = re.compile(r'<.*?>')                # strip any remaining tags
+#         self.removeNoneLine = re.compile(r'\n+')                  # collapse extra blank lines
+
+#     def replace(self, text: str) -> str:
+#         """
+#         Clean an HTML fragment
+
+#         Args:
+#             text: HTML text to clean
+
+#         Returns:
+#             The cleaned plain text
+#         """
+#         if not text:
+#             return ""
+
+#         text = re.sub(self.removeImg, "", text)
+#         text = re.sub(self.removeAddr, "", text)
+#         text = re.sub(self.replaceLine, "\n", text)
+#         text = re.sub(self.replaceTD, "\t", text)
+#         text = re.sub(self.replaceBR, "\n", text)
+#         text = re.sub(self.removeExtraTag, "", text)
+#         text = re.sub(self.removeNoneLine, "\n", text)
+#         return text.strip()
+
+
+# class Spider:
+#     """
+#     Main spider class
+#     Crawls post content and images and saves them locally
+#     """
+
+#     def __init__(self, base_path: str = None, max_retries: int = 3):
+#         """
+#         Initialise the spider
+
+#         Args:
+#             base_path: base directory for saved files
+#             max_retries: maximum number of retries per request
+#         """
+#         self.tool = Tool()
+#         self.session = requests.Session()
+#         self.max_retries = max_retries
+
+#         # Base directory for output
+#         if base_path is None:
+#             self.base_path = os.path.join(os.path.expanduser('~'), 'TiebaSpider')
+#         else:
+#             self.base_path = base_path
+
+#         # Create the base directory
+#         if not os.path.exists(self.base_path):
+#             os.makedirs(self.base_path)
+
+#         # Configure logging
+#         self._setup_logging()
+
+#         # Pool of User-Agent strings
+#         self.user_agents = [
+#             'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
+#             'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
+#             'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
+#             'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0',
+#             'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36'
+#         ]
+
+#         logging.info(f"Spider initialised, files will be saved to: {self.base_path}")
+
+#     def _setup_logging(self):
+#         """Configure the logging system"""
+#         log_path = os.path.join(self.base_path, 'spider.log')
+#         logging.basicConfig(
+#             level=logging.INFO,
+#             format='%(asctime)s - %(levelname)s - %(message)s',
+#             handlers=[
+#                 logging.FileHandler(log_path, encoding='utf-8'),
+#                 logging.StreamHandler()
+#             ]
+#         )
+
+#     def _get_random_delay(self) -> float:
+#         """Return a random delay in seconds"""
+#         return random.uniform(0.5, 2.0)
+
+#     def get_source(self, url: str) -> Optional[str]:
+#         """
+#         Fetch a page's source
+
+#         Args:
+#             url: target URL
+
+#         Returns:
+#             The page source, or None on failure
+#         """
+#         for attempt in range(self.max_retries):
+#             try:
+#                 # Pick a random User-Agent
+#                 user_agent = random.choice(self.user_agents)
+#                 headers = {
+#                     'User-Agent': user_agent,
+#                     'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
+#                     'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
+#                     'Accept-Encoding': 'gzip, deflate',
+#                     'Connection': 'keep-alive',
+#                     'Upgrade-Insecure-Requests': '1',
+#                 }
+
+#                 response = self.session.get(url, headers=headers, timeout=10)
+#                 response.raise_for_status()
+#                 response.encoding = 'utf-8'
+
+#                 logging.info(f"Fetched page: {url}")
+#                 return response.text
+
+#             except requests.exceptions.RequestException as e:
+#                 logging.warning(f"Attempt {attempt + 1} to fetch {url} failed: {e}")
+#                 if attempt < self.max_retries - 1:
+#                     delay = self._get_random_delay() * (attempt + 1)
+#                     logging.info(f"Retrying in {delay:.2f} seconds...")
+#                     time.sleep(delay)
+#                 else:
+#                     logging.error(f"Failed to fetch {url}: max retries reached")
+#                     return None
+
+#     def get_title(self, url: str) -> Optional[str]:
+#         """
+#         Get the post title
+
+#         Args:
+#             url: post URL
+
+#         Returns:
+#             The post title, or None on failure
+#         """
+#         result = self.get_source(url)
+#         if not result:
+#             return None
+
+#         try:
+#             # Title selector; assumes the classic Tieba <h3> markup
+#             pattern = re.compile(r'<h3 class="core_title_txt.*?>(.*?)</h3>', re.S)
+#             match = re.search(pattern, result)
+#             if match:
+#                 title = self.tool.replace(match.group(1))
+#                 logging.info(f"Post title: {title}")
+#                 return title
+#             else:
+#                 logging.warning("Post title not found")
+#                 return None
+#         except Exception as e:
+#             logging.error(f"Failed to parse post title: {e}")
+#             return None
+
+#     def get_page_number(self, url: str) -> int:
+#         """
+#         Get the total number of pages in the post
+
+#         Args:
+#             url: post URL
+
+#         Returns:
+#             Total page count, or 1 on failure
+#         """
+#         result = self.get_source(url)
+#         if not result:
+#             return 1
+
+#         try:
+#             # Try several patterns for the page count
+#             # NOTE: the selectors assume the classic Tieba markup
+#             patterns = [
+#                 re.compile(r'<span class="red">(\d+)</span>', re.S),
+#                 re.compile(r'共(\d+)页', re.S),
+#                 re.compile(r'">(\d+)</a>', re.S),
+#             ]
+
+#             for pattern in patterns:
+#                 match = re.search(pattern, result)
+#                 if match:
+#                     pages = int(match.group(1))
+#                     logging.info(f"Total pages: {pages}")
+#                     return pages
+
+#             logging.warning("Page count not found, defaulting to 1")
+#             return 1
+#         except Exception as e:
+#             logging.error(f"Failed to parse page count: {e}")
+#             return 1
+
+#     def get_content(self, url: str) -> List[Tuple[str, str]]:
+#         """
+#         Get the posts (floors) on a page
+
+#         Args:
+#             url: page URL
+
+#         Returns:
+#             List of (author, content) tuples
+#         """
+#         result = self.get_source(url)
+#         if not result:
+#             return []
+
+#         try:
+#             # Author / content selectors; assume the classic Tieba markup
+#             pattern = re.compile(
+#                 r'<li class="d_name".*?><a.*?>(.*?)</a>.*?'
+#                 r'<div id="post_content_.*?>(.*?)</div>',
+#                 re.S
+#             )
+#             items = re.findall(pattern, result)
+
+#             contents = []
+#             for author, content in items:
+#                 contents.append((self.tool.replace(author), self.tool.replace(content)))
+
+#             logging.info(f"Extracted {len(contents)} posts")
+#             return contents
+#         except Exception as e:
+#             logging.error(f"Failed to parse post content: {e}")
+#             return []
+
+#     def get_images(self, url: str) -> List[str]:
+#         """
+#         Get the image links on a page
+
+#         Args:
+#             url: page URL
+
+#         Returns:
+#             List of image URLs
+#         """
+#         result = self.get_source(url)
+#         if not result:
+#             return []
+
+#         try:
+#             soup = BeautifulSoup(result, 'lxml')
+#             # Find all post images
+#             img_tags = soup.find_all('img', class_="BDE_Image")
+
+#             images = []
+#             for img in img_tags:
+#                 if img.get('src'):
+#                     images.append(img['src'])
+
+#             logging.info(f"Found {len(images)} images")
+#             return images
+
+#         except Exception as e:
+#             logging.error(f"Failed to parse image links: {e}")
+#             return []
+
+#     def make_dir(self, dir_name: str) -> bool:
+#         """
+#         Create a directory under the base path
+
+#         Args:
+#             dir_name: directory name, relative to the base path
+
+#         Returns:
+#             True on success, False otherwise
+#         """
+#         try:
+#             dir_path = os.path.join(self.base_path, dir_name.strip())
+
+#             if not os.path.exists(dir_path):
+#                 os.makedirs(dir_path)
+#                 logging.info(f"Created directory: {dir_path}")
+#                 return True
+#             else:
+#                 logging.info(f"Directory already exists: {dir_path}")
+#                 return True
+
+#         except Exception as e:
+#             logging.error(f"Failed to create directory: {e}")
+#             return False
+
+#     def save_image(self, image_url: str, save_path: str) -> bool:
+#         """
+#         Save an image to disk
+
+#         Args:
+#             image_url: image URL
+#             save_path: destination path
+
+#         Returns:
+#             True on success, False otherwise
+#         """
+#         for attempt in range(self.max_retries):
+#             try:
+#                 headers = {'User-Agent': random.choice(self.user_agents)}
+#                 response = self.session.get(image_url, headers=headers, timeout=15)
+#                 response.raise_for_status()
+
+#                 with open(save_path, 'wb') as f:
+#                     f.write(response.content)
+
+#                 logging.info(f"Saved image: {save_path}")
+#                 return True
+
+#             except Exception as e:
+#                 logging.warning(f"Attempt {attempt + 1} to save image {image_url} failed: {e}")
+#                 if attempt < self.max_retries - 1:
+#                     time.sleep(self._get_random_delay())
+
+#         logging.error(f"Failed to save image after max retries: {image_url}")
+#         return False
+
+#     def save_content_to_file(self, contents: List[Tuple[str, str]], file_path: str):
+#         """
+#         Save the text content to a file
+
+#         Args:
+#             contents: list of (author, content) tuples
+#             file_path: destination file path
+#         """
+#         try:
+#             with open(file_path, 'w', encoding='utf-8') as f:
+#                 f.write("Tieba post content\n")
+#                 f.write("=" * 50 + "\n\n")
+
+#                 for i, (author, content) in enumerate(contents, 1):
+#                     f.write(f"Floor {i}\n")
+#                     f.write(f"Author: {author}\n")
+#                     f.write(f"Content: {content}\n")
+#                     f.write("-" * 30 + "\n\n")
+
+#             logging.info(f"Saved text content to: {file_path}")
+#         except Exception as e:
+#             logging.error(f"Failed to save text content: {e}")
+
+#     def get_all_page(self, post_id: int, start_page: int = 1, end_page: int = None):
+#         """
+#         Crawl all requested pages of a post
+
+#         Args:
+#             post_id: post ID
+#             start_page: first page to crawl
+#             end_page: last page to crawl
+#         """
+#         base_url = f'https://tieba.baidu.com/p/{post_id}'
+
+#         logging.info(f"Started crawling post: {post_id}")
+
+#         # Get the post title and derive the output directory name
+#         title = self.get_title(base_url)
+#         if title:
+#             # Strip characters that are illegal in file names and truncate
+#             safe_title = re.sub(r'[\\/*?:"<>|]', "", title)[:50]
+#             post_dir = f"{post_id}_{safe_title}"
+#         else:
+#             post_dir = str(post_id)
+#         self.make_dir(post_dir)
+
+#         # Get the total page count
+#         total_pages = self.get_page_number(base_url)
+
+#         if end_page is None or end_page > total_pages:
+#             end_page = total_pages
+
+#         logging.info(f"Crawling pages {start_page} to {end_page} ({end_page - start_page + 1} pages)")
+
+#         for page in range(start_page, end_page + 1):
+#             logging.info(f"Processing page {page}...")
+
+#             # Build the page URL
+#             page_url = f'{base_url}?pn={page}'
+
+#             # Random delay between requests
+#             time.sleep(self._get_random_delay())
+
+#             # Get the posts on this page
+#             contents = self.get_content(page_url)
+
+#             # Get the images on this page
+#             images = self.get_images(page_url)
+
+#             # Create the per-page directory (make_dir expects a path relative to base_path)
+#             page_dir_name = f"page_{page}"
+#             page_dir_path = os.path.join(self.base_path, post_dir, page_dir_name)
+#             self.make_dir(os.path.join(post_dir, page_dir_name))
+
+#             # Save the text content
+#             if contents:
+#                 content_file = os.path.join(page_dir_path, f"content_page_{page}.txt")
+#                 self.save_content_to_file(contents, content_file)
+
+#             # Save the images
+#             if images:
+#                 logging.info(f"Saving {len(images)} images from page {page}...")
+#                 for i, img_url in enumerate(images, 1):
+#                     img_name = f"image_{i}.jpg"
+#                     img_path = os.path.join(page_dir_path, img_name)
+#                     self.save_image(img_url, img_path)
+
+#                     # Pause between image downloads
+#                     time.sleep(self._get_random_delay() / 2)
+
+#             logging.info(f"Finished page {page}")
+
+#         logging.info(f"Finished crawling post {post_id}!")
+
+
+# def main():
+#     """Entry point"""
+#     print("=" * 50)
+#     print("Baidu Tieba spider tool")
+#     print("=" * 50)
+
+#     try:
+#         # Read user input
+#         post_id = input("Enter the Tieba post ID: ").strip()
+#         if not post_id.isdigit():
+#             print("Error: the post ID must be a number!")
+#             return
+
+#         start_page = input("Enter the start page (default 1): ").strip()
+#         start_page = int(start_page) if start_page.isdigit() else 1
+
+#         end_page = input("Enter the end page (default: all pages): ").strip()
+#         end_page = int(end_page) if end_page.isdigit() else None
+
+#         # Create the spider
+#         spider = Spider()
+
+#         # Start crawling
+#         spider.get_all_page(int(post_id), start_page, end_page)
+
+#         print("\nCrawl finished!")
+#         print(f"Files saved to: {spider.base_path}")
+
+#     except KeyboardInterrupt:
+#         print("\nInterrupted by user")
+#     except Exception as e:
+#         print(f"Error while running: {e}")
+
+
+# if __name__ == '__main__':
+#     main()
+
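A minimal usage sketch of the rewritten Spider class, for reference. It assumes the commented-out module above is uncommented and saved as tieba_spider.py; the thread ID and output path below are placeholders, not values from the patch:

    from tieba_spider import Spider

    # Crawl pages 1-2 of a placeholder thread; text and images land under ./tieba_output
    spider = Spider(base_path='./tieba_output', max_retries=3)
    spider.get_all_page(123456789, start_page=1, end_page=2)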