"""Batch crawler for Bilibili video metadata, danmaku and comments.

BV ids are read from ``targets.txt`` (one per line). For every id the script
writes CSV files under ``data/<partition>/<bvid>/``.
"""
import csv
import os
import re
import time
from random import uniform
from time import sleep

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent


def load_targets():
    """Load the list of BV ids from ``targets.txt``.

    Only non-empty lines that start with ``BV`` (after stripping whitespace)
    are kept.

    Returns:
        A list of BV id strings; an empty list when the file does not exist.
    """
    try:
        with open('targets.txt', 'r', encoding='utf-8') as f:
            return [line.strip() for line in f if line.strip().startswith('BV')]
    except FileNotFoundError:
        print("未找到targets.txt文件,请创建文件并添加BV号")
        return []


class BiliWebCrawler:
    """Crawl one Bilibili video: basic info, danmaku and comments."""

    def __init__(self, url, cookie=None):
        """Prepare the HTTP session and resolve the video's numeric id.

        Args:
            url: Video page URL (or any string) containing a ``BV...`` id.
            cookie: Optional raw ``Cookie`` header value sent with every
                request.
        """
        self.url = url
        self.bvid = self._extract_bvid()
        self.user_agent = UserAgent()
        self.headers = {
            'User-Agent': self.user_agent.random,
            'Referer': 'https://www.bilibili.com/',
        }
        if cookie:
            self.headers['Cookie'] = cookie
        self.session = requests.Session()
        # Cache for the "view" API payload so the same endpoint is not
        # requested again by get_video_info() and get_danmaku().
        self._view_data = None
        self.aid = self._get_video_id()

    def _extract_bvid(self):
        """Extract the BV id from ``self.url``; return ``None`` if absent."""
        match = re.search(r'(BV[A-Za-z0-9]+)', self.url)
        return match.group(1) if match else None

    def _get_video_id(self):
        """Return the ``aid`` reported by the view API, or ``None``."""
        view_data = self._get_view_data()
        return view_data.get('aid') if view_data else None

    def _safe_request(self, url, params=None, retry=3):
        """Send a GET request, retrying on failure.

        Args:
            url: Target URL.
            params: Optional query parameters passed to ``requests``.
            retry: Maximum number of attempts.

        Returns:
            The successful ``requests.Response``, or ``None`` when every
            attempt failed (network error, timeout or HTTP error status).
        """
        for i in range(retry):
            try:
                resp = self.session.get(url, headers=self.headers,
                                        timeout=10, params=params)
                resp.raise_for_status()
                return resp
            except requests.RequestException as e:
                print(f"请求失败: {str(e)}, 第{i + 1}次重试...")
                # No point in waiting after the last attempt.
                if i + 1 < retry:
                    sleep(uniform(1, 3))
        return None

    def _fetch_json(self, url, params=None):
        """GET ``url`` and decode the body as a JSON object.

        Returns:
            The decoded ``dict``, or ``None`` when the request failed, the
            body is not valid JSON, or the top-level value is not an object.
        """
        resp = self._safe_request(url, params=params)
        if resp is None:
            return None
        try:
            payload = resp.json()
        except ValueError as e:
            # JSON decode errors raised by requests/json/simplejson all
            # derive from ValueError.
            print(f"响应解析失败: {e}")
            return None
        return payload if isinstance(payload, dict) else None

    def _get_view_data(self):
        """Return the ``data`` object of the view API for this video.

        The first successful result is cached on the instance. Returns
        ``None`` when there is no BV id or the API gave no usable data
        (``data`` may be missing or ``null`` in the response).
        """
        if not self.bvid:
            return None
        cached = getattr(self, '_view_data', None)
        if cached:
            return cached
        url = f'https://api.bilibili.com/x/web-interface/view?bvid={self.bvid}'
        payload = self._fetch_json(url)
        view_data = (payload or {}).get('data')
        if not isinstance(view_data, dict) or not view_data:
            return None
        self._view_data = view_data
        return view_data

    @staticmethod
    def _max_resolution(video_data):
        """Return the largest resolution found in ``video_data`` as "WxH".

        Dimension objects are collected from ``formats[].dimension`` (the
        field the original code read), the top-level ``dimension`` and
        ``pages[].dimension``. Returns "未知" when none has a usable size.
        """
        candidates = [
            fmt.get('dimension')
            for fmt in video_data.get('formats') or []
            if isinstance(fmt, dict)
        ]
        candidates.append(video_data.get('dimension'))
        candidates.extend(
            page.get('dimension')
            for page in video_data.get('pages') or []
            if isinstance(page, dict)
        )

        max_width = max_height = 0
        for dimension in candidates:
            if not isinstance(dimension, dict):
                continue
            width = dimension.get('width') or 0
            height = dimension.get('height') or 0
            # rotate == 1 means width and height are swapped.
            if dimension.get('rotate') == 1:
                width, height = height, width
            # Compare resolutions by total pixel count.
            if width * height > max_width * max_height:
                max_width, max_height = width, height

        if max_width and max_height:
            return f"{max_width}x{max_height}"
        return "未知"

    def get_video_info(self):
        """Collect the basic information of the video.

        Returns:
            A dict with title, uploader, statistics, publish time, partition,
            tags, resolution, copyright type and page count; ``None`` when
            the view API returned nothing usable.
        """
        video_data = self._get_view_data()
        if not video_data:
            return None

        # Tags come from a separate endpoint; a failure here only yields an
        # empty tag list.
        tag_url = f'https://api.bilibili.com/x/web-interface/view/detail/tag?bvid={self.bvid}'
        tag_payload = self._fetch_json(tag_url)
        tag_data = [
            tag['tag_name']
            for tag in (tag_payload or {}).get('data') or []
            if isinstance(tag, dict) and 'tag_name' in tag
        ]

        owner = video_data.get('owner') or {}
        stat = video_data.get('stat') or {}
        return {
            'title': video_data.get('title', ''),
            'up主': owner.get('name', ''),
            '播放量': stat.get('view', 0),
            '弹幕量': stat.get('danmaku', 0),
            '点赞量': stat.get('like', 0),
            '投币量': stat.get('coin', 0),
            '收藏量': stat.get('favorite', 0),
            '发布时间': time.strftime(
                '%Y-%m-%d %H:%M:%S',
                time.localtime(video_data.get('pubdate') or 0)),
            '分区': video_data.get('tname', ''),
            '标签': tag_data,
            '视频最高分辨率': self._max_resolution(video_data),
            '视频类型': video_data.get('copyright', 0),
            '视频分p数': len(video_data.get('pages') or []),
        }

    def get_danmaku(self):
        """Download and parse the danmaku XML of the video.

        Returns:
            A list of dicts (time, mode, colour as ``#RRGGBB``, text).
            Entries whose ``p`` attribute is missing or malformed are
            skipped. An empty list is returned on any lookup/request failure.
        """
        if not self.bvid:
            print("未找到 BVID,无法获取弹幕数据。")
            return []

        view_data = self._get_view_data()
        if not view_data:
            return []

        cid = view_data.get('cid')
        if not cid:
            print("未找到 cid,无法获取弹幕数据。")
            return []

        danmaku_url = f'https://comment.bilibili.com/{cid}.xml'
        resp = self._safe_request(danmaku_url)
        if not resp:
            return []

        danmaku = []
        soup = BeautifulSoup(resp.content, 'lxml-xml')
        for node in soup.find_all('d'):
            try:
                attrs = node['p'].split(',')
                entry = {
                    '时间': float(attrs[0]),
                    '模式': attrs[1],
                    '颜色': f'#{int(attrs[3]):06X}',
                    '弹幕内容': node.text,
                }
            except (KeyError, IndexError, ValueError):
                # One broken element must not discard the whole file.
                continue
            danmaku.append(entry)
        return danmaku

    @staticmethod
    def _format_reply(item, level, replied_to=''):
        """Convert one API reply object into a flat CSV row dict.

        Raises:
            KeyError/TypeError: when ``item`` lacks an expected field.
        """
        member = item['member']
        return {
            '用户昵称': member['uname'],
            '评论内容': item['content']['message'],
            '被回复用户': replied_to,
            '评论层级': level,
            '性别': member['sex'],
            '用户当前等级': member['level_info']['current_level'],
            '点赞数量': item['like'],
            '回复时间': time.strftime('%Y-%m-%d %H:%M:%S',
                                  time.localtime(item['ctime'])),
        }

    def get_comments(self, max_pages=1000):
        """Fetch top-level comments together with their second-level replies.

        Args:
            max_pages: Upper bound on the number of comment pages requested.

        Returns:
            A list of row dicts; each top-level comment is followed by its
            replies. Paging stops at the first page without comments or at
            the first failed request.
        """
        if not self.aid:
            print("未找到视频 ID,无法获取评论数据。")
            return []

        comments = []
        for page in range(1, max_pages + 1):
            url = f'https://api.bilibili.com/x/v2/reply?pn={page}&type=1&oid={self.aid}&sort=2'
            payload = self._fetch_json(url)
            page_replies = ((payload or {}).get('data') or {}).get('replies')
            if not page_replies:
                # No comments on this page (or the request failed): stop.
                break

            for comment in page_replies:
                try:
                    row = self._format_reply(comment, '一级评论')
                    root_id = comment['rpid']
                except (KeyError, TypeError):
                    # Skip a malformed entry instead of losing everything.
                    continue
                comments.append(row)
                # `rcount` is read as the comment's reply counter; when the
                # field is absent the replies are requested as before.
                if comment.get('rcount', 1):
                    comments.extend(
                        self.fetch_comment_replies(root_id, row['用户昵称']))

            # Throttle between pages.
            sleep(1)
        return comments

    def fetch_comment_replies(self, comment_id, parent_user_name, max_pages=1000):
        """Fetch the second-level replies of one top-level comment.

        Args:
            comment_id: ``rpid`` of the root comment.
            parent_user_name: Name stored in the "replied-to user" column.
            max_pages: Upper bound on the number of reply pages requested.

        Returns:
            A list of row dicts; empty when there are no replies or the
            first request failed.
        """
        replies = []
        for page in range(1, max_pages + 1):
            url = f'https://api.bilibili.com/x/v2/reply/reply?oid={self.aid}&type=1&root={comment_id}&ps=10&pn={page}'
            payload = self._fetch_json(url)
            page_replies = ((payload or {}).get('data') or {}).get('replies')
            if not page_replies:
                # No replies on this page (or the request failed): stop.
                break

            for reply in page_replies:
                try:
                    replies.append(
                        self._format_reply(reply, '二级评论', parent_user_name))
                except (KeyError, TypeError):
                    continue

            # Throttle between pages.
            sleep(1)
        return replies

    def _parse_count(self, text):
        """Parse a count such as ``"1.2万"``, ``"3亿"`` or ``"345"`` to int.

        The scaled value is rounded so that float representation error
        cannot truncate the result one below the intended number.

        Raises:
            ValueError: when the numeric part cannot be parsed.
        """
        text = str(text).strip()
        for unit, factor in (('亿', 100000000), ('万', 10000)):
            if unit in text:
                return int(round(float(text.replace(unit, '')) * factor))
        return int(text)

    def save_to_csv(self, data, filename, mode='w'):
        """Write a list of row dicts to a CSV file.

        Args:
            data: Non-empty list of dicts sharing the keys of ``data[0]``;
                nothing is written when it is empty.
            filename: Destination path.
            mode: File mode, ``'w'`` to overwrite or ``'a'`` to append.
        """
        if not data:
            return
        fieldnames = list(data[0])
        with open(filename, mode, newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            # Position 0 means the file is new/empty: emit the header once.
            if f.tell() == 0:
                writer.writeheader()
            writer.writerows(data)

    def run(self):
        """Run the full pipeline: video info, danmaku, then comments."""
        print("正在获取视频基本信息...")
        video_info = self.get_video_info()
        if not video_info:
            print("未获取到视频信息,无法进行抓取。")
            return

        # The key always exists, so fall back on an empty value as well, and
        # replace characters that are not valid in a directory name.
        partition = re.sub(r'[\\/:*?"<>|]', '_',
                           video_info.get('分区') or '').strip() or '其他'
        base_dir = os.path.join('data', partition)
        video_dir = os.path.join(base_dir, self.bvid)
        os.makedirs(video_dir, exist_ok=True)  # also creates base_dir

        # Video info: appended to the partition-wide file and saved per video.
        info_csv_path = os.path.join(base_dir, 'info.csv')
        self.save_to_csv([video_info], info_csv_path, mode='a')

        play_count = video_info.get('播放量', 0)
        video_info_filename = os.path.join(
            video_dir, f'{self.bvid}_{play_count}_info.csv')
        self.save_to_csv([video_info], video_info_filename)

        # Danmaku.
        print("正在抓取弹幕数据...")
        danmaku = self.get_danmaku()
        danmaku_filename = os.path.join(
            video_dir, f'{self.bvid}_{len(danmaku)}_danmaku.csv')
        self.save_to_csv(danmaku, danmaku_filename)

        # Comments.
        print("正在抓取评论数据...")
        comments = self.get_comments()
        comments_filename = os.path.join(
            video_dir, f'{self.bvid}_{len(comments)}_comments.csv')
        self.save_to_csv(comments, comments_filename)

        print(f"抓取完成!结果已保存到 {video_dir}/")


def main():
    """Process every BV id listed in ``targets.txt``."""
    targets = load_targets()
    if not targets:
        print("未找到有效的BV号,程序退出")
        return

    for bvid in targets:
        print(f"\n{'=' * 30} 开始处理 {bvid} {'=' * 30}")
        crawler = BiliWebCrawler(f"https://www.bilibili.com/video/{bvid}")
        crawler.run()


if __name__ == "__main__":
    main()