SAM-bilibili/main.py

295 lines
12 KiB
Python
Raw Normal View History

2025-03-18 19:13:56 +08:00
import requests
import re
import csv
from bs4 import BeautifulSoup
from time import sleep
from random import uniform
from fake_useragent import UserAgent
import time
def load_targets(path='targets.txt'):
    """Load the list of BV ids to crawl from a text file.

    Each line is stripped of surrounding whitespace; only lines that then
    start with ``BV`` are kept, so blank lines and anything else (comments,
    av numbers, ...) are ignored.

    Args:
        path: File to read. Defaults to ``targets.txt`` in the current
            working directory.

    Returns:
        The BV ids in file order, or an empty list when the file does not
        exist (a hint is printed in that case).
    """
    try:
        # 'utf-8-sig' drops a leading byte-order mark if present. With plain
        # 'utf-8' the BOM stays attached to the first line, the startswith
        # check fails, and the first id is silently lost.
        with open(path, 'r', encoding='utf-8-sig') as f:
            stripped_lines = (line.strip() for line in f)
            return [bvid for bvid in stripped_lines if bvid.startswith('BV')]
    except FileNotFoundError:
        print("未找到targets.txt文件请创建文件并添加BV号")
        return []
class BiliWebCrawler:
    """Crawl one Bilibili video: basic info, danmaku and comments.

    The constructor extracts the BV id from the given URL and immediately
    performs one HTTP request to resolve the numeric ``aid`` that the comment
    endpoints need. ``run()`` then fetches everything and writes CSV files
    under ``data/<partition>/``.
    """

    def __init__(self, url, cookie=None):
        """Prepare headers and session, then resolve the video's ``aid``.

        Args:
            url: Any string containing a BV id (normally the video page URL).
            cookie: Optional raw ``Cookie`` header value sent with every
                request.
        """
        self.url = url
        self.bvid = self._extract_bvid()
        self.user_agent = UserAgent()
        self.headers = {
            'User-Agent': self.user_agent.random,
            'Referer': 'https://www.bilibili.com/',
        }
        if cookie:
            self.headers['Cookie'] = cookie
        self.session = requests.Session()
        # Cache for the view-API ``data`` object; filled on first success so
        # that aid lookup, video info and danmaku share a single request.
        self._view_data = None
        self.aid = self._get_video_id()

    def _extract_bvid(self):
        """Return the first ``BV...`` token found in ``self.url``, or None."""
        match = re.search(r'(BV[A-Za-z0-9]+)', self.url)
        return match.group(1) if match else None

    def _safe_request(self, url, params=None, retry=3):
        """GET ``url`` with up to ``retry`` attempts.

        Args:
            url: Target URL.
            params: Optional query parameters passed through to the session.
            retry: Maximum number of attempts.

        Returns:
            The response of the first attempt that neither raised nor failed
            ``raise_for_status()``; None when every attempt failed.
        """
        for attempt in range(1, retry + 1):
            try:
                resp = self.session.get(url, headers=self.headers, timeout=10, params=params)
                resp.raise_for_status()
                return resp
            except Exception as e:
                # Deliberately broad: the crawler is best-effort and a failed
                # request must never abort the run.
                print(f"请求失败: {str(e)}, 第{attempt}次重试...")
                # Back off only when another attempt follows; sleeping after
                # the last failure would just delay the None return.
                if attempt < retry:
                    sleep(uniform(1, 3))
        return None

    @staticmethod
    def _json_or_none(resp):
        """Decode ``resp`` as a JSON object.

        Returns None when ``resp`` is None, when the body is not valid JSON,
        or when the decoded value is not a dict.
        """
        if resp is None:
            return None
        try:
            payload = resp.json()
        except ValueError:
            return None
        return payload if isinstance(payload, dict) else None

    def _fetch_view_data(self):
        """Return the ``data`` object of the view API for this video.

        The result is cached on the instance after the first successful
        fetch. Returns None when there is no BV id, the request failed, or
        the response carries no usable ``data`` object.
        """
        cached = getattr(self, '_view_data', None)
        if cached is not None:
            return cached
        if not self.bvid:
            return None
        url = f'https://api.bilibili.com/x/web-interface/view?bvid={self.bvid}'
        payload = self._json_or_none(self._safe_request(url))
        view = payload.get('data') if payload else None
        # ``data`` may be absent or null on error responses.
        if not isinstance(view, dict) or not view:
            return None
        self._view_data = view
        return view

    def _get_video_id(self):
        """Return the video's ``aid`` from the view API, or None."""
        view = self._fetch_view_data()
        return view.get('aid') if view else None

    @staticmethod
    def _max_resolution(video_data):
        """Return the largest resolution in ``video_data`` as ``"WxH"``.

        Dimension objects are collected from every entry of ``formats``, from
        the top-level ``dimension`` and from every entry of ``pages``; the one
        with the most pixels wins. Returns ``"未知"`` when none is usable.

        NOTE(review): the ``dimension`` / ``pages[].dimension`` fallback is
        based on the public API notes, not on anything visible in this file -
        confirm against a live response.
        """
        candidates = [
            fmt.get('dimension')
            for fmt in video_data.get('formats') or []
            if isinstance(fmt, dict)
        ]
        candidates.append(video_data.get('dimension'))
        candidates.extend(
            page.get('dimension')
            for page in video_data.get('pages') or []
            if isinstance(page, dict)
        )

        max_width = max_height = 0
        for dimension in candidates:
            if not isinstance(dimension, dict):
                continue
            width = dimension.get('width') or 0
            height = dimension.get('height') or 0
            # rotate == 1 means the stored width/height are swapped.
            if dimension.get('rotate') == 1:
                width, height = height, width
            # Compare by total pixel count.
            if width * height > max_width * max_height:
                max_width, max_height = width, height
        return f"{max_width}x{max_height}" if max_width and max_height else "未知"

    def _fetch_tags(self):
        """Return the video's tag names; an empty list on any failure."""
        tag_url = f'https://api.bilibili.com/x/web-interface/view/detail/tag?bvid={self.bvid}'
        payload = self._json_or_none(self._safe_request(tag_url))
        tags = payload.get('data') if payload else None
        if not isinstance(tags, list):
            return []
        return [tag['tag_name'] for tag in tags if isinstance(tag, dict) and 'tag_name' in tag]

    def get_video_info(self):
        """Fetch the video's basic information.

        Returns:
            A dict with title, uploader, counters, publish time, partition,
            tags, highest resolution, copyright type and page count; None
            when the view data could not be obtained.
        """
        video_data = self._fetch_view_data()
        if not video_data:
            return None

        stat = video_data.get('stat') or {}
        owner = video_data.get('owner') or {}
        published = time.localtime(video_data.get('pubdate') or 0)
        return {
            'title': video_data.get('title', ''),
            'up主': owner.get('name', ''),
            '播放量': stat.get('view', 0),
            '弹幕量': stat.get('danmaku', 0),
            '点赞量': stat.get('like', 0),
            '投币量': stat.get('coin', 0),
            '收藏量': stat.get('favorite', 0),
            '发布时间': time.strftime('%Y-%m-%d %H:%M:%S', published),
            '分区': video_data.get('tname', ''),
            '标签': self._fetch_tags(),
            '视频最高分辨率': self._max_resolution(video_data),
            '视频类型': video_data.get('copyright', 0),
            '视频分p数': len(video_data.get('pages') or []),
        }

    @staticmethod
    def _parse_danmaku(p_attr, text):
        """Convert one ``<d p="...">text</d>`` element into a row dict.

        Args:
            p_attr: The comma-separated ``p`` attribute (may be None).
            text: The danmaku text.

        Returns:
            A dict with time, mode, ``#RRGGBB`` colour and text, or None when
            the attribute has too few fields or non-numeric values.
        """
        attrs = (p_attr or '').split(',')
        try:
            return {
                '时间': float(attrs[0]),
                '模式': attrs[1],
                '颜色': f'#{int(attrs[3]):06X}',
                '弹幕内容': text,
            }
        except (IndexError, ValueError):
            return None

    def get_danmaku(self):
        """Fetch the danmaku of the video's ``cid`` as a list of row dicts.

        Returns an empty list when the BV id or cid is unknown or the XML
        download fails. Malformed entries are skipped.
        """
        if not self.bvid:
            print("未找到 BVID无法获取弹幕数据。")
            return []
        view = self._fetch_view_data()
        cid = view.get('cid') if view else None
        if not cid:
            print("未找到 cid无法获取弹幕数据。")
            return []

        danmaku_url = f'https://comment.bilibili.com/{cid}.xml'
        resp = self._safe_request(danmaku_url)
        if resp is None:
            return []

        soup = BeautifulSoup(resp.content, 'lxml-xml')
        parsed = (self._parse_danmaku(d.get('p'), d.text) for d in soup.find_all('d'))
        return [row for row in parsed if row is not None]

    @staticmethod
    def _format_reply(item, level, replied_to=''):
        """Build the CSV row for one comment or reply object.

        Missing fields fall back to empty/zero values instead of raising, so
        a single odd entry cannot abort the crawl.

        Args:
            item: One element of the API's ``replies`` list.
            level: Value for the comment-level column.
            replied_to: Name of the user being replied to ('' for top level).
        """
        member = item.get('member') or {}
        content = item.get('content') or {}
        level_info = member.get('level_info') or {}
        posted = time.localtime(item.get('ctime') or 0)
        return {
            '用户昵称': member.get('uname', ''),
            '评论内容': content.get('message', ''),
            '被回复用户': replied_to,
            '评论层级': level,
            '性别': member.get('sex', ''),
            '用户当前等级': level_info.get('current_level', 0),
            '点赞数量': item.get('like', 0),
            '回复时间': time.strftime('%Y-%m-%d %H:%M:%S', posted),
        }

    def _fetch_reply_page(self, url):
        """Return the ``data.replies`` list found at ``url``.

        Returns an empty list when the request failed, the status is not 200,
        the body is not JSON, or the page has no replies.
        """
        response = self._safe_request(url)
        if response is None or response.status_code != 200:
            return []
        response.encoding = 'utf-8'
        payload = self._json_or_none(response)
        data = payload.get('data') if payload else None
        replies = data.get('replies') if isinstance(data, dict) else None
        if not isinstance(replies, list):
            return []
        return [entry for entry in replies if isinstance(entry, dict)]

    def get_comments(self, max_pages=1000):
        """Fetch top-level comments together with their replies.

        Pages are requested one by one until a page comes back empty or
        ``max_pages`` is reached. Every top-level comment is immediately
        followed by its replies in the returned list.

        Args:
            max_pages: Upper bound on the number of comment pages requested.

        Returns:
            A list of row dicts; empty when the ``aid`` is unknown.
        """
        if not self.aid:
            print("未找到视频 ID无法获取评论数据。")
            return []

        comments = []
        for page in range(1, max_pages + 1):
            url = f'https://api.bilibili.com/x/v2/reply?pn={page}&type=1&oid={self.aid}&sort=2'
            page_comments = self._fetch_reply_page(url)
            if not page_comments:
                # An empty page marks the end of the comment list.
                break
            for comment in page_comments:
                row = self._format_reply(comment, '一级评论')
                comments.append(row)
                rpid = comment.get('rpid')
                if rpid is not None:
                    comments.extend(self.fetch_comment_replies(rpid, row['用户昵称']))
            # Throttle between pages.
            sleep(1)
        return comments

    def fetch_comment_replies(self, comment_id, parent_user_name, max_pages=1000):
        """Fetch the replies posted under one top-level comment.

        Args:
            comment_id: ``rpid`` of the top-level comment.
            parent_user_name: Stored in each row as the replied-to user.
            max_pages: Upper bound on the number of reply pages requested.

        Returns:
            A list of row dicts, possibly empty.
        """
        replies = []
        for page in range(1, max_pages + 1):
            url = f'https://api.bilibili.com/x/v2/reply/reply?oid={self.aid}&type=1&root={comment_id}&ps=10&pn={page}'
            page_replies = self._fetch_reply_page(url)
            if not page_replies:
                # An empty page marks the end of the reply list.
                break
            replies.extend(
                self._format_reply(reply, '二级评论', parent_user_name)
                for reply in page_replies
            )
            # Throttle between pages.
            sleep(1)
        return replies

    def _parse_count(self, text):
        """Convert a displayed count such as ``"1.2万"`` into an int.

        A ``万`` suffix multiplies by 10**4 and ``亿`` by 10**8; anything else
        is parsed as a plain integer.

        Raises:
            ValueError: If the numeric part cannot be parsed.
        """
        text = str(text).strip()
        for unit, factor in (('亿', 100_000_000), ('万', 10_000)):
            if unit in text:
                # round() rather than int(): float products such as
                # 4.35 * 10000 land just below the intended integer.
                return round(float(text.replace(unit, '')) * factor)
        return int(text)

    def save_to_csv(self, data, filename, mode='w'):
        """Write a list of row dicts to ``filename`` as CSV.

        Nothing is written when ``data`` is empty. Column names come from the
        first row. The header is written only when the file position is 0
        after opening, i.e. for a new or truncated file, so appending to an
        existing file adds rows only.

        Args:
            data: List of dicts sharing the same keys.
            filename: Destination path.
            mode: File mode passed to ``open`` ('w' or 'a').
        """
        if not data:
            return
        fieldnames = list(data[0])
        with open(filename, mode, newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            if f.tell() == 0:
                writer.writeheader()
            writer.writerows(data)

    def run(self):
        """Run the full crawl and write all CSV files.

        Layout: ``data/<partition>/info.csv`` accumulates one row per crawled
        video; ``data/<partition>/<bvid>/`` receives that video's info,
        danmaku and comment files.
        """
        import os

        print("正在获取视频基本信息...")
        video_info = self.get_video_info()
        if not video_info:
            print("未获取到视频信息,无法进行抓取。")
            return

        # The key always exists, so a plain .get default would never apply;
        # fall back explicitly when the partition name is empty.
        partition = video_info.get('分区') or '其他'
        # Keep the name usable as a single directory component.
        partition = re.sub(r'[\\/:*?"<>|]', '_', partition).strip() or '其他'
        base_dir = os.path.join('data', partition)
        video_dir = os.path.join(base_dir, self.bvid)
        os.makedirs(video_dir, exist_ok=True)

        # Video info: one shared per-partition file plus a per-video copy.
        info_csv_path = os.path.join(base_dir, 'info.csv')
        self.save_to_csv([video_info], info_csv_path, mode='a')
        play_count = video_info.get('播放量', 0)
        video_info_filename = os.path.join(video_dir, f'{self.bvid}_{play_count}_info.csv')
        self.save_to_csv([video_info], video_info_filename)

        print("正在抓取弹幕数据...")
        danmaku = self.get_danmaku()
        danmaku_filename = os.path.join(video_dir, f'{self.bvid}_{len(danmaku)}_danmaku.csv')
        self.save_to_csv(danmaku, danmaku_filename)

        print("正在抓取评论数据...")
        comments = self.get_comments()
        comments_filename = os.path.join(video_dir, f'{self.bvid}_{len(comments)}_comments.csv')
        self.save_to_csv(comments, comments_filename)

        print(f"抓取完成!结果已保存到 {video_dir}/")
if __name__ == "__main__":
    # Batch-process every BV id listed in targets.txt.
    targets = load_targets()
    if not targets:
        print("未找到有效的BV号程序退出")
        # The bare `exit` helper is injected by the `site` module and is
        # missing under `python -S` or in frozen builds; raising SystemExit
        # always works and exits with the same status.
        raise SystemExit
    for bvid in targets:
        print(f"\n{'=' * 30} 开始处理 {bvid} {'=' * 30}")
        try:
            crawler = BiliWebCrawler(f"https://www.bilibili.com/video/{bvid}")
            crawler.run()
        except Exception as e:
            # Top-level boundary: report the failure and continue with the
            # next video instead of aborting the whole batch.
            print(f"处理 {bvid} 时出错: {e}")