SAM-bilibili/main.py

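"""Batch crawler for Bilibili videos.

Reads BV ids from targets.txt and, for each video, collects basic metadata,
danmaku (bullet comments), and hot comments with their replies via Bilibili's
public web APIs, then writes the results to CSV files under
data/<partition>/<bvid>/.
"""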

import csv
import os
import re
import time
from random import uniform
from time import sleep

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent


def load_targets():
    """Load the list of BV ids from targets.txt."""
    try:
        with open('targets.txt', 'r', encoding='utf-8') as f:
            return [line.strip() for line in f if line.strip().startswith('BV')]
    except FileNotFoundError:
        print("targets.txt not found; please create the file and add BV ids")
        return []
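
# Expected targets.txt format: one BV id per line, e.g.
#   BV1xxxxxxxxx   (placeholder id, not a real video)
# Lines that do not start with "BV" are ignored.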


class BiliWebCrawler:
    def __init__(self, url, cookie=None):
        self.url = url
        self.bvid = self._extract_bvid()
        self.user_agent = UserAgent()
        self.headers = {
            'User-Agent': self.user_agent.random,
            'Referer': 'https://www.bilibili.com/',
        }
        if cookie:
            self.headers['Cookie'] = cookie
        self.session = requests.Session()
        self.aid = self._get_video_id()

    def _extract_bvid(self):
        """Extract the BV id from the video URL."""
        match = re.search(r'(BV[A-Za-z0-9]+)', self.url)
        if match:
            return match.group(1)
        return None

    def _get_video_id(self):
        """Resolve the BV id to the numeric aid (used as `oid` by the comment API)."""
        if not self.bvid:
            return None
        url = f'https://api.bilibili.com/x/web-interface/view?bvid={self.bvid}'
        resp = self._safe_request(url)
        if resp:
            data = resp.json()
            return data.get('data', {}).get('aid')
        return None

    def _safe_request(self, url, params=None, retry=3):
        """GET with retries and a randomized back-off between attempts."""
        for i in range(retry):
            try:
                resp = self.session.get(url, headers=self.headers, timeout=10, params=params)
                resp.raise_for_status()
                return resp
            except Exception as e:
                print(f"Request failed: {e}, retry {i + 1}...")
                sleep(uniform(1, 3))
        return None

    def get_video_info(self):
        """Fetch basic metadata for the video."""
        if not self.bvid:
            return None
        # Fetch the video's metadata from the web-interface view API
        url = f'https://api.bilibili.com/x/web-interface/view?bvid={self.bvid}'
        resp = self._safe_request(url)
        if not resp:
            return None
        data = resp.json()
        video_data = data.get('data')
        if not video_data:
            return None
        # Determine the highest resolution from the per-page `dimension` objects
        # (the view API reports width/height/rotate for each part of the video)
        max_width = 0
        max_height = 0
        for page in video_data.get('pages', []):
            dimension = page.get('dimension', {})
            width = dimension.get('width', 0)
            height = dimension.get('height', 0)
            rotate = dimension.get('rotate', 0)
            # rotate == 1 means the video is rotated, so swap width and height
            if rotate == 1:
                width, height = height, width
            # Compare resolutions by total pixel count
            if (width * height) > (max_width * max_height):
                max_width = width
                max_height = height
        # Format the resolution as a "WIDTHxHEIGHT" string
        resolution_str = f"{max_width}x{max_height}" if max_width and max_height else "未知"
        # Fetch the video's tags
        tag_url = f'https://api.bilibili.com/x/web-interface/view/detail/tag?bvid={self.bvid}'
        tag_resp = self._safe_request(tag_url)
        tag_data = []
        if tag_resp:
            tag_json = tag_resp.json()
            tag_data = [tag['tag_name'] for tag in (tag_json.get('data') or [])]
        # Dict keys double as the CSV column headers, so they are kept as-is
        info = {
            'title': video_data.get('title', ''),
            'up主': video_data.get('owner', {}).get('name', ''),
            '播放量': video_data.get('stat', {}).get('view', 0),
            '弹幕量': video_data.get('stat', {}).get('danmaku', 0),
            '点赞量': video_data.get('stat', {}).get('like', 0),
            '投币量': video_data.get('stat', {}).get('coin', 0),
            '收藏量': video_data.get('stat', {}).get('favorite', 0),
            '发布时间': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(video_data.get('pubdate', 0))),
            '分区': video_data.get('tname', ''),
            '标签': tag_data,
            '视频最高分辨率': resolution_str,
            '视频类型': video_data.get('copyright', 0),
            '视频分p数': len(video_data.get('pages', []))
        }
        return info
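
    # NOTE: the view API's top-level `cid` used below belongs to the first part
    # of the video, so for multi-part videos only part 1's danmaku are fetched.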

    def get_danmaku(self):
        """Fetch danmaku (bullet comments) via the XML endpoint."""
        if not self.bvid:
            print("No BVID found; cannot fetch danmaku.")
            return []
        url = f"https://api.bilibili.com/x/web-interface/view?bvid={self.bvid}"
        resp = self._safe_request(url)
        if not resp:
            return []
        data = resp.json()
        cid = data.get('data', {}).get('cid')
        if not cid:
            print("No cid found; cannot fetch danmaku.")
            return []
        danmaku_url = f'https://comment.bilibili.com/{cid}.xml'
        resp = self._safe_request(danmaku_url)
        if not resp:
            return []
        danmaku = []
        soup = BeautifulSoup(resp.content, 'lxml-xml')
        for d in soup.find_all('d'):
            # The `p` attribute packs metadata as a comma-separated list:
            # offset seconds, mode, font size, decimal RGB color, send timestamp, ...
            attrs = d['p'].split(',')
            danmaku.append({
                '时间': float(attrs[0]),
                '模式': attrs[1],
                '颜色': f'#{int(attrs[3]):06X}',
                '弹幕内容': d.text
            })
        return danmaku

    def get_comments(self, max_pages=1000):
        """Fetch hot-sorted comments (sort=2), including second-level replies."""
        if not self.aid:
            print("No video id found; cannot fetch comments.")
            return []
        comments = []
        for page in range(1, max_pages + 1):
            url = f'https://api.bilibili.com/x/v2/reply?pn={page}&type=1&oid={self.aid}&sort=2'
            try:
                response = self._safe_request(url)
                if response and response.status_code == 200:
                    response.encoding = 'utf-8'
                    data = response.json()
                    if data and data.get('data') and data['data'].get('replies'):
                        for comment in data['data']['replies']:
                            comment_info = {
                                '用户昵称': comment['member']['uname'],
                                '评论内容': comment['content']['message'],
                                '被回复用户': '',
                                '评论层级': '一级评论',
                                '性别': comment['member']['sex'],
                                '用户当前等级': comment['member']['level_info']['current_level'],
                                '点赞数量': comment['like'],
                                '回复时间': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(comment['ctime']))
                            }
                            comments.append(comment_info)
                            replies = self.fetch_comment_replies(comment['rpid'], comment['member']['uname'])
                            comments.extend(replies)
                    else:
                        # No comments on this page; stop paginating
                        break
            except requests.RequestException as e:
                print(f"Request error: {e}")
                break
            # Throttle between pages
            sleep(1)
        return comments

    def fetch_comment_replies(self, comment_id, parent_user_name, max_pages=1000):
        """Fetch second-level replies under a root comment, 10 per page."""
        replies = []
        for page in range(1, max_pages + 1):
            url = f'https://api.bilibili.com/x/v2/reply/reply?oid={self.aid}&type=1&root={comment_id}&ps=10&pn={page}'
            try:
                response = self._safe_request(url)
                if response and response.status_code == 200:
                    response.encoding = 'utf-8'
                    data = response.json()
                    if data and data.get('data') and data['data'].get('replies'):
                        for reply in data['data']['replies']:
                            reply_info = {
                                '用户昵称': reply['member']['uname'],
                                '评论内容': reply['content']['message'],
                                '被回复用户': parent_user_name,
                                '评论层级': '二级评论',
                                '性别': reply['member']['sex'],
                                '用户当前等级': reply['member']['level_info']['current_level'],
                                '点赞数量': reply['like'],
                                '回复时间': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(reply['ctime']))
                            }
                            replies.append(reply_info)
                    else:
                        # No second-level replies on this page; stop paginating
                        break
            except requests.RequestException as e:
                print(f"Error fetching second-level replies: {e}")
                break
            # Throttle between pages
            sleep(1)
        return replies

    def _parse_count(self, text):
        """Normalize count text such as '1.2万' to an integer (not called elsewhere in this file)."""
        if '万' in text:
            return int(float(text.replace('万', '')) * 10000)
        return int(text)

    def save_to_csv(self, data, filename, mode='w'):
        """Save a list of dicts to CSV (utf-8-sig so Excel detects the encoding)."""
        if not data:
            return
        keys = data[0].keys()
        with open(filename, mode, newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=keys)
            if f.tell() == 0:  # new/empty file: write the header row
                writer.writeheader()
            writer.writerows(data)
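
    # Output layout produced by run():
    #   data/<partition>/info.csv                            one appended row per video
    #   data/<partition>/<bvid>/<bvid>_<plays>_info.csv
    #   data/<partition>/<bvid>/<bvid>_<count>_danmaku.csv
    #   data/<partition>/<bvid>/<bvid>_<count>_comments.csv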

    def run(self):
        """Run the full crawl for one video."""
        print("Fetching video metadata...")
        video_info = self.get_video_info()
        if video_info:
            partition = video_info.get('分区', '其他')
            base_dir = os.path.join('data', partition)
            video_dir = os.path.join(base_dir, self.bvid)
            os.makedirs(base_dir, exist_ok=True)
            os.makedirs(video_dir, exist_ok=True)
            # Save the video metadata
            info_csv_path = os.path.join(base_dir, 'info.csv')
            self.save_to_csv([video_info], info_csv_path, mode='a')
            play_count = video_info.get('播放量', 0)
            video_info_filename = os.path.join(video_dir, f'{self.bvid}_{play_count}_info.csv')
            self.save_to_csv([video_info], video_info_filename)
            # Crawl danmaku
            print("Fetching danmaku...")
            danmaku = self.get_danmaku()
            danmaku_filename = os.path.join(video_dir, f'{self.bvid}_{len(danmaku)}_danmaku.csv')
            self.save_to_csv(danmaku, danmaku_filename)
            # Crawl comments
            print("Fetching comments...")
            comments = self.get_comments()
            comments_filename = os.path.join(video_dir, f'{self.bvid}_{len(comments)}_comments.csv')
            self.save_to_csv(comments, comments_filename)
            print(f"Done! Results saved to {video_dir}/")
        else:
            print("No video metadata retrieved; nothing to crawl.")


if __name__ == "__main__":
    # Process every BV id listed in targets.txt
    targets = load_targets()
    if not targets:
        print("No valid BV ids found; exiting")
        exit()
    for bvid in targets:
        print(f"\n{'=' * 30} Processing {bvid} {'=' * 30}")
        crawler = BiliWebCrawler(f"https://www.bilibili.com/video/{bvid}")
        crawler.run()
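
# Usage sketch (assumes the dependencies below are installed):
#   pip install requests beautifulsoup4 lxml fake-useragent
#   echo "BV1xxxxxxxxx" > targets.txt   # placeholder id, use a real one
#   python main.py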