import requests
import re
import csv
import datetime
import os
import time
from time import sleep
from random import uniform

from bs4 import BeautifulSoup
from fake_useragent import UserAgent

ONEBOT_HOST = "http://sheyiyuan.cn:63000"  # OneBot service address
USER_ID = "1040843522"  # QQ id; passed as group_id to the endpoints below


def send_notification(error_msg):
    """Push an alert to QQ via the OneBot group-message API."""
    try:
        url = f"{ONEBOT_HOST}/send_group_msg"
        data = {
            "group_id": USER_ID,
            "message": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                       + f"[SMA-bilibili监控告警]\n{error_msg}"
        }
        requests.post(url, json=data, timeout=5)
    except Exception as e:
        print(f"Failed to send QQ notification: {e}")


def send_info_log(log_msg):
    """Push a progress log to QQ via the same OneBot group-message API."""
    try:
        url = f"{ONEBOT_HOST}/send_group_msg"
        data = {
            "group_id": USER_ID,
            "message": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                       + f"[SMA-bilibili监控日志]\n{log_msg}"
        }
        requests.post(url, json=data, timeout=5)
    except Exception as e:
        print(f"Failed to send QQ notification: {e}")


def load_targets():
    """Load the BV id list from targets.txt (one id per line)."""
    try:
        with open('targets.txt', 'r', encoding='utf-8') as f:
            return [line.strip() for line in f if line.strip().startswith('BV')]
    except FileNotFoundError:
        print("targets.txt not found; create it and add BV ids")
        return []
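
# A minimal sketch of the private-message variant of the two helpers above,
# assuming an OneBot v11 backend that exposes /send_private_msg on the same
# host. The function name is illustrative and is not used by the batch flow.
def send_private_notification(msg):
    """Sketch: deliver `msg` as a QQ private message instead of a group post."""
    try:
        url = f"{ONEBOT_HOST}/send_private_msg"  # OneBot v11 private-message action
        data = {
            "user_id": USER_ID,  # assumes USER_ID is a personal QQ id
            "message": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                       + f"[SMA-bilibili监控告警]\n{msg}"
        }
        requests.post(url, json=data, timeout=5)
    except Exception as e:
        print(f"Failed to send QQ private message: {e}")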

class BiliWebCrawler:
    def __init__(self, url, cookie=None):
        self.url = url
        self.bvid = self._extract_bvid()
        self.user_agent = UserAgent()
        self.headers = {
            'User-Agent': self.user_agent.random,  # rotate UAs via fake_useragent
            'Referer': f'https://www.bilibili.com/video/{self.bvid}',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        }
        if cookie:
            self.headers['Cookie'] = cookie
        self.session = requests.Session()
        self.aid = self._get_video_id()

    def _extract_bvid(self):
        """Extract the BV id from the video URL."""
        match = re.search(r'(BV[A-Za-z0-9]+)', self.url)
        if match:
            return match.group(1)
        return None

    def _get_video_id(self):
        """Resolve the numeric aid for the current BV id."""
        if not self.bvid:
            return None
        url = f'https://api.bilibili.com/x/web-interface/view?bvid={self.bvid}'
        resp = self._safe_request(url)
        if resp:
            return resp.json().get('data', {}).get('aid')
        return None

    def _safe_request(self, url, params=None, retry=3):
        """GET with retries, random backoff, and QQ alerts on failure."""
        for i in range(retry):
            try:
                resp = self.session.get(url, headers=self.headers, timeout=10, params=params)
                resp.raise_for_status()
                return resp
            except Exception as e:
                print(f"Request failed: {e}, retry {i + 1}...")
                send_notification(f"Request failed: {e}, retry {i + 1}...")
                sleep(uniform(1, 3))
        return None

    def get_video_info(self):
        """Fetch the video's basic metadata."""
        if not self.bvid:
            return None

        url = f'https://api.bilibili.com/x/web-interface/view?bvid={self.bvid}'
        resp = self._safe_request(url)
        if not resp:
            return None
        video_data = resp.json().get('data')
        if not video_data:
            return None

        # Resolution from the dimension object; width/height swap when rotate == 1
        dimension = video_data.get('dimension', {})
        width = dimension.get('width', 0)
        height = dimension.get('height', 0)
        if dimension.get('rotate', 0) == 1:
            width, height = height, width
        resolution_str = f"{width}x{height}" if width and height else "未知"

        sleep(uniform(1, 2))

        # Video tags
        tag_url = f'https://api.bilibili.com/x/web-interface/view/detail/tag?bvid={self.bvid}'
        tag_resp = self._safe_request(tag_url)
        tag_data = []
        try:
            if tag_resp:
                tag_json = tag_resp.json()
                tag_data = [tag['tag_name'] for tag in tag_json.get('data') or []]
        except Exception as e:
            print(f"Failed to fetch video tags: {e}")
            send_notification(f"Failed to fetch tags for {self.bvid}: {e}")
            if tag_resp:
                send_info_log(f"Raw response body: {tag_resp.text[:500]}")

        # CC subtitles (the list is empty for most videos)
        subtitle_data = []
        for subtitle in (video_data.get('subtitle') or {}).get('list') or []:
            subtitle_data.append({
                '语言': subtitle.get('lan_doc', ''),
                '字幕数量': subtitle.get('subtitles', []),
                '字幕URL': subtitle.get('subtitle_url', ''),
            })

        # Honor badges; the field may be a list of dicts or a plain string
        honor_data = []
        honors = video_data.get('honor', [])
        if isinstance(honors, list):
            for honor in honors:
                if isinstance(honor, dict):
                    honor_data.append({
                        'type': honor.get('type', 0),
                        'desc': honor.get('desc', '')
                    })
        elif isinstance(honors, str):
            honor_data.append({'desc': honors})

        # A non-empty staff list marks a joint submission
        is_unity_up = bool(video_data.get('staff'))

        info = {
            'BV号': self.bvid,
            'title': video_data.get('title', ''),
            'up主名称': video_data.get('owner', {}).get('name', ''),
            'up主UID': video_data.get('owner', {}).get('mid', ''),
            '播放量': video_data.get('stat', {}).get('view', 0),
            '弹幕量': video_data.get('stat', {}).get('danmaku', 0),
            '点赞量': video_data.get('stat', {}).get('like', 0),
            '投币量': video_data.get('stat', {}).get('coin', 0),
            '收藏量': video_data.get('stat', {}).get('favorite', 0),
            '分享量': video_data.get('stat', {}).get('share', 0),
            '评论量': video_data.get('stat', {}).get('reply', 0),
            '发布时间的timestamp': video_data.get('pubdate', 0),
            '视频荣誉': honor_data,
            '发布时间': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(video_data.get('pubdate', 0))),
            '分区': video_data.get('tname', ''),
            '标签': tag_data,
            '是否为联合投稿': is_unity_up,
            '视频方向': self._get_video_orientation(video_data.get('dimension', {})),
            '视频最高分辨率': resolution_str,
            '视频类型': {1: '自制', 2: '转载'}.get(video_data.get('copyright', 0), ''),
            '视频分p数': len(video_data.get('pages', [])),
            '视频字幕': subtitle_data,
            '视频总时长': self.get_video_length(video_data.get('pages', [])),
            '视频封面URL': video_data.get('pic', ''),
            '简介': video_data.get('desc', '').replace('\n', '\\n'),
        }
        return info

    def get_video_length(self, pages):
        """Total duration across all parts, in seconds."""
        return sum(page.get('duration', 0) for page in pages)

    def _get_video_orientation(self, dimension):
        """Classify the video as landscape (横屏) or portrait (竖屏)."""
        width = dimension.get('width', 0)
        height = dimension.get('height', 0)
        # Swap width/height for 90/270 degree rotations
        if dimension.get('rotate', 0) in (1, 3):
            width, height = height, width
        return "横屏" if width >= height else "竖屏"

    def get_up_info(self, mid):
        """Fetch the uploader's profile card; returns None on failure."""
        if not mid:
            return None
        url = f"https://api.bilibili.com/x/web-interface/card?mid={mid}&photo=false"
        resp = self._safe_request(url)
        if not resp:
            return None
        try:
            data = resp.json().get('data', {})
            card = data.get('card')
            up_info = {
                'uid': mid,
                '昵称': card['name'],
                '性别': card['sex'],
                '头像': card['face'],
                '等级': card['level_info']['current_level'],
                '粉丝数': card['fans'],
                '稿件数': data['archive_count'],
                '获赞数': data['like_num'],
            }
        except Exception as e:
            print(f"Failed to parse uploader data: {e}")
            send_notification(f"Failed to parse uploader data: {e}")
            return None

        # try:
        #     # Fetch the uploader's submission list
        #     archive_url = f'https://api.bilibili.com/x/space/arc/search?mid={mid}&ps=30'
        #     sleep(1)
        #     archive_resp = self._safe_request(archive_url)
        #     if archive_resp and archive_resp.status_code == 200:
        #         archive_data = archive_resp.json()
        #         videos = archive_data.get('data', {}).get('list', {}).get('vlist', [])
        #         # Timestamp for 30 days ago
        #         month_ago = time.time() - 30 * 86400
        #         # Count submissions published within the last month
        #         recent_count = sum(1 for v in videos if v.get('created') > month_ago)
        #         up_info['近一个月投稿数'] = recent_count
        # except Exception as e:
        #     print(f"Failed to fetch submission data: {e}")
        #     send_notification(f"Failed to fetch submission data: {e}")

        return up_info

    def get_danmaku(self):
        """Fetch the danmaku (bullet-comment) track as a list of dicts."""
        if not self.bvid:
            send_notification("No BVID found; cannot fetch danmaku.")
            print("No BVID found; cannot fetch danmaku.")
            return []
        url = f"https://api.bilibili.com/x/web-interface/view?bvid={self.bvid}"
        resp = self._safe_request(url)
        if not resp:
            return []
        cid = resp.json().get('data', {}).get('cid')
        if not cid:
            send_notification("No cid found; cannot fetch danmaku.")
            print("No cid found; cannot fetch danmaku.")
            return []
        danmaku_url = f'https://comment.bilibili.com/{cid}.xml'
        resp = self._safe_request(danmaku_url)
        if not resp:
            return []
        danmaku = []
        soup = BeautifulSoup(resp.content, 'lxml-xml')  # requires the lxml package
        for d in soup.find_all('d'):
            attrs = d['p'].split(',')
            danmaku.append({
                '时间': float(attrs[0]),
                '模式': attrs[1],
                '颜色': f'#{int(attrs[3]):06X}',
                '弹幕内容': d.text
            })
        return danmaku

    def get_comments(self, max_pages=5):
        """Fetch hot comments, keeping top-level replies only."""
        if not self.aid:
            send_notification("No video id found; cannot fetch comments.")
            print("No video id found; cannot fetch comments.")
            return []
        comments = []
        for page in range(1, max_pages + 1):
            url = f'https://api.bilibili.com/x/v2/reply?pn={page}&type=1&oid={self.aid}&sort=2'
            try:
                response = self._safe_request(url)
                if response and response.status_code == 200:
                    response.encoding = 'utf-8'
                    data = response.json()
                    if data and data.get('data') and data['data'].get('replies'):
                        for comment in data['data']['replies']:
                            comments.append({
                                '用户昵称': comment['member']['uname'],
                                '评论内容': comment['content']['message'],
                                '被回复用户': '',
                                '评论层级': '一级评论',
                                '性别': comment['member']['sex'],
                                '用户当前等级': comment['member']['level_info']['current_level'],
                                '点赞数量': comment['like'],
                                '回复时间': time.strftime('%Y-%m-%d %H:%M:%S',
                                                      time.localtime(comment['ctime']))
                            })
                    else:
                        # No replies on this page: stop paging
                        break
            except requests.RequestException as e:
                send_notification(f"Comment request failed: {e}")
                print(f"Comment request failed: {e}")
                break
            sleep(1)  # throttle between pages
        return comments

    def _parse_count(self, text):
        """Normalize count strings such as "1.2万" into integers."""
        if '万' in text:
            return int(float(text.replace('万', '')) * 10000)
        return int(text)

    def save_to_csv(self, data, filename, mode='w'):
        """Write a list of dicts to CSV, emitting a header only for empty files."""
        if not data:
            return
        keys = data[0].keys()
        with open(filename, mode, newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=keys)
            if f.tell() == 0:  # empty/new file: write the header row
                writer.writeheader()
            writer.writerows(data)

    def run(self):
        """Full pipeline: video info, danmaku, comments, uploader info."""
        print("Fetching basic video info...")
        video_info = self.get_video_info()
        if not video_info:
            print("No video info retrieved; aborting this target.")
            return

        partition = video_info.get('分区', '其他')
        base_dir = os.path.join('data', partition)
        video_dir = os.path.join(base_dir, self.bvid)
        os.makedirs(video_dir, exist_ok=True)  # also creates base_dir

        # Rolling per-partition info file plus a per-video snapshot
        info_csv_path = os.path.join(base_dir, 'info.csv')
        self.save_to_csv([video_info], info_csv_path, mode='a')
        play_count = video_info.get('播放量', 0)
        video_info_filename = os.path.join(video_dir, f'{self.bvid}_{play_count}_info.csv')
        self.save_to_csv([video_info], video_info_filename)
        sleep(1)

        print("Fetching danmaku...")
        danmaku = self.get_danmaku()
        danmaku_filename = os.path.join(video_dir, f'{self.bvid}_{len(danmaku)}_danmaku.csv')
        self.save_to_csv(danmaku, danmaku_filename)
        sleep(1)

        print("Fetching comments...")
        comments = self.get_comments()
        comments_filename = os.path.join(video_dir, f'{self.bvid}_{len(comments)}_comments.csv')
        self.save_to_csv(comments, comments_filename)
        sleep(1)

        print("Fetching uploader info...")
        up_info = self.get_up_info(video_info.get('up主UID'))
        if up_info:  # get_up_info returns None on failure
            up_info['BV号'] = self.bvid
            up_csv_path = os.path.join(base_dir, 'up_info.csv')
            self.save_to_csv([up_info], up_csv_path, mode='a')

        print(f"Done! Results saved to {video_dir}/")
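
# A minimal single-video usage sketch, mirroring the batch loop in __main__
# below; the function name and the BV id in the docstring are placeholders,
# not part of the original flow.
def crawl_single(bvid, cookie=None):
    """Crawl one video end to end, e.g. crawl_single("BV1xx411c7mD")."""
    crawler = BiliWebCrawler(f"https://www.bilibili.com/video/{bvid}", cookie)
    crawler.run()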

if __name__ == "__main__":
    # Logged-in session cookie; consider loading it from an environment
    # variable instead of hard-coding credentials in the source.
    cookie = "buvid3=669D9192-9030-AE04-8149-45A24D82CBB985872infoc; b_nut=1728112285; _uuid=BDD29A64-331010-1578-7AB2-6985DCD1EC10586028infoc; enable_web_push=DISABLE; buvid4=02E86127-F707-C9D6-1E0E-62127CDB94EA86683-024100507-5HwKLZoKiRRAzdUiyP1DtUb99jVdMKHnip8nMCxMvnyDueJx41kzeR6uEnG0C2HY; DedeUserID=399209972; DedeUserID__ckMd5=9ad9de58e979dbdf; header_theme_version=CLOSE; rpdid=0zbfAHZlOT|utNc6ahG|2XT|3w1SWYYy; buvid_fp_plain=undefined; hit-dyn-v2=1; is-2022-channel=1; LIVE_BUVID=AUTO7717291772934669; PVID=2; enable_feed_channel=ENABLE; CURRENT_QUALITY=80; fingerprint=262ed395815a48ea928a2b3cf305da95; buvid_fp=262ed395815a48ea928a2b3cf305da95; bsource=search_bing; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3NDI3ODE1NzgsImlhdCI6MTc0MjUyMjMxOCwicGx0IjotMX0.yseN1sbG5qDIlo5L0mUKqRr2tCL0OqPBnSnnHUYCWtE; bili_ticket_expires=1742781518; home_feed_column=5; browser_resolution=1702-986; SESSDATA=d0449397%2C1758291057%2C666e5%2A32CjDmj_WpEXUbv2oTIls3PPvM1wODDzR-gnPMRP5gwm09UIZ7YGdhTsJ7ssNg5Tb19qESVmlUMktwdlhkTnNlM0dTU05kZlBOTERNM3JqeTVSNGFaRzdwZmtNWUpjTHhPWmdxTzJiWmdDSVZxNkpEb0VGNHJFTEdaTlJtcVhBUlMzbEZJTzdCeXNnIIEC; bili_jct=e7f4029b2be38fe915678f49aa5b36f7; sid=6samqc5x; CURRENT_FNVAL=4048; bp_t_offset_399209972=1047699216786259968; b_lsid=919AF1F2_195C5ADCCC6"

    # Batch-process the BV ids listed in targets.txt
    targets = load_targets()
    if not targets:
        print("No valid BV ids found; exiting")
        send_notification("No valid BV ids found; exiting")
        raise SystemExit()
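
    # Expected targets.txt layout, one BV id per line (placeholder ids shown);
    # load_targets() silently skips any line that does not start with "BV":
    #
    #   BV1xx411c7mD
    #   BV1yy4y1z7mE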

    send_info_log(f"Starting batch run over {len(targets)} videos")
    for i, bvid in enumerate(targets, start=1):
        print(f"\n{'=' * 30} Processing {bvid} {'=' * 30}")
        crawler = BiliWebCrawler(f"https://www.bilibili.com/video/{bvid}", cookie)
        crawler.run()
        if i % 20 == 0:
            send_info_log(f"Finished {i} videos, progress: {i / len(targets) * 100:.2f}%")
        sleep(1)
    send_info_log("Batch run complete!")