# SAM-bilibili/main.py

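"""Batch crawler for bilibili video metadata, danmaku, and comments.

Reads BV IDs from targets.txt and, for each video, fetches basic
information (plus, optionally, danmaku, comments, and uploader info)
through bilibili's public web APIs, then writes the results to CSV
files under data/<category>/<bvid>/.
"""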
import csv
import os
import re
import time
from random import uniform
from time import sleep

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent


def load_targets():
    """Load the list of BV IDs from targets.txt."""
    try:
        with open('targets.txt', 'r', encoding='utf-8') as f:
            return [line.strip() for line in f if line.strip().startswith('BV')]
    except FileNotFoundError:
        print("targets.txt not found; create it and add one BV ID per line.")
        return []
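
# A targets.txt might look like this (placeholder BV IDs for illustration;
# any line that does not start with "BV" is skipped):
#
#   BV1aaaaaaaaa
#   BV1bbbbbbbbb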


class BiliWebCrawler:
    def __init__(self, url, cookie=None):
        self.url = url
        self.bvid = self._extract_bvid()
        self.user_agent = UserAgent()
        self.headers = {
            'User-Agent': self.user_agent.random,
            'Referer': 'https://www.bilibili.com/',
        }
        if cookie:
            self.headers['Cookie'] = cookie
        self.session = requests.Session()
        self.aid = self._get_video_id()

    def _extract_bvid(self):
        """Extract the BV ID from the video URL."""
        match = re.search(r'(BV[A-Za-z0-9]+)', self.url)
        if match:
            return match.group(1)
        return None

    def _get_video_id(self):
        """Resolve the numeric aid of the video via the view API."""
        if not self.bvid:
            return None
        url = f'https://api.bilibili.com/x/web-interface/view?bvid={self.bvid}'
        resp = self._safe_request(url)
        if resp:
            data = resp.json()
            return data.get('data', {}).get('aid')
        return None

    def _safe_request(self, url, params=None, retry=3):
        """GET with retries, random backoff, and optional query params."""
        for i in range(retry):
            try:
                resp = self.session.get(url, headers=self.headers, timeout=10, params=params)
                resp.raise_for_status()
                return resp
            except Exception as e:
                print(f"Request failed: {e}, retry {i + 1}...")
                sleep(uniform(1, 3))
        return None

    def get_video_info(self):
        """Fetch basic metadata for the video."""
        if not self.bvid:
            return None
        url = f'https://api.bilibili.com/x/web-interface/view?bvid={self.bvid}'
        resp = self._safe_request(url)
        if not resp:
            return None
        data = resp.json()
        video_data = data.get('data')
        if not video_data:
            return None
        # Resolution from the dimension object; rotate == 1 means the frame
        # is stored rotated, so width and height are swapped.
        dimension = video_data.get('dimension', {})
        width = dimension.get('width', 0)
        height = dimension.get('height', 0)
        if dimension.get('rotate', 0) == 1:
            width, height = height, width
        resolution_str = f"{width}x{height}" if width and height else "unknown"
        # Fetch the video's tags.
        tag_url = f'https://api.bilibili.com/x/web-interface/view/detail/tag?bvid={self.bvid}'
        tag_resp = self._safe_request(tag_url)
        tag_data = []
        if tag_resp:
            tag_json = tag_resp.json()
            tag_data = [tag['tag_name'] for tag in tag_json.get('data', [])]
        stat = video_data.get('stat', {})
        info = {
            'bvid': self.bvid,
            'title': video_data.get('title', ''),
            'uploader': video_data.get('owner', {}).get('name', ''),
            'uploader_uid': video_data.get('owner', {}).get('mid', ''),
            'views': stat.get('view', 0),
            'danmaku': stat.get('danmaku', 0),
            'likes': stat.get('like', 0),
            'coins': stat.get('coin', 0),
            'favorites': stat.get('favorite', 0),
            'shares': stat.get('share', 0),
            'comments': stat.get('reply', 0),
            'pubdate_timestamp': video_data.get('pubdate', 0),
            'pubdate': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(video_data.get('pubdate', 0))),
            'category': video_data.get('tname', ''),
            'tags': tag_data,
            'orientation': self._get_video_orientation(video_data.get('dimension', {})),
            'max_resolution': resolution_str,
            # copyright: 1 = original upload, 2 = repost.
            'copyright': ["", "original", "repost"][video_data.get('copyright', 0)],
            'page_count': len(video_data.get('pages', [])),
            'total_duration': self.get_video_length(video_data.get('pages', [])),
            'cover_url': video_data.get('pic', ''),
            'description': video_data.get('desc', '').replace('\n', '\\n'),
        }
        return info
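
    # Illustrative shape of the returned dict (values made up):
    # {'bvid': 'BV1aaaaaaaaa', 'title': '...', 'uploader': '...',
    #  'views': 123456, 'pubdate': '2020-01-01 12:00:00', 'tags': [...],
    #  'orientation': 'landscape', 'max_resolution': '1920x1080', ...}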

    def get_video_length(self, pages):
        """Total duration across all pages, in seconds."""
        return sum(page.get('duration', 0) for page in pages)

    def _get_video_orientation(self, dimension):
        """Classify the video as landscape or portrait."""
        width = dimension.get('width', 0)
        height = dimension.get('height', 0)
        # rotate 1/3 means a 90- or 270-degree rotation, so swap the axes.
        if dimension.get('rotate', 0) in (1, 3):
            width, height = height, width
        return "landscape" if width >= height else "portrait"

    def get_up_info(self, mid):
        """Fetch detailed information about the uploader."""
        if not mid:
            return None
        url = f"https://api.bilibili.com/x/web-interface/card?mid={mid}&photo=false"
        resp = self._safe_request(url)
        if not resp:
            return None
        try:
            data = resp.json().get('data', {})
            card = data.get('card')
            up_info = {
                'uid': mid,
                'name': card['name'],
                'sex': card['sex'],
                'avatar': card['face'],
                'level': card['level_info']['current_level'],
                'followers': card['fans'],
                'video_count': data['archive_count'],
                'total_likes': data['like_num'],
            }
        except Exception as e:
            print(f"Failed to parse uploader data: {e}")
            return None
        try:
            # Fetch the uploader's 30 most recent submissions.
            archive_url = f'https://api.bilibili.com/x/space/arc/search?mid={mid}&ps=30'
            archive_resp = self._safe_request(archive_url)
            if archive_resp and archive_resp.status_code == 200:
                archive_data = archive_resp.json()
                videos = archive_data.get('data', {}).get('list', {}).get('vlist', [])
                # Count videos published within the last 30 days.
                month_ago = time.time() - 30 * 86400
                recent_count = sum(1 for v in videos if v.get('created', 0) > month_ago)
                up_info['uploads_last_30_days'] = recent_count
        except Exception as e:
            print(f"Failed to fetch submission data: {e}")
        return up_info
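
    # Note (an assumption from observed behaviour, not an official spec):
    # bilibili has been gating x/space/arc/search behind risk control and
    # WBI-signed queries. If it starts returning code -352 or -412, pass a
    # logged-in Cookie to BiliWebCrawler or switch to the WBI-signed
    # x/space/wbi/arc/search endpoint.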

    def get_danmaku(self):
        """Fetch the video's danmaku (bullet comments)."""
        if not self.bvid:
            print("No BV ID found; cannot fetch danmaku.")
            return []
        url = f"https://api.bilibili.com/x/web-interface/view?bvid={self.bvid}"
        resp = self._safe_request(url)
        if not resp:
            return []
        data = resp.json()
        cid = data.get('data', {}).get('cid')
        if not cid:
            print("No cid found; cannot fetch danmaku.")
            return []
        danmaku_url = f'https://comment.bilibili.com/{cid}.xml'
        resp = self._safe_request(danmaku_url)
        if not resp:
            return []
        danmaku = []
        soup = BeautifulSoup(resp.content, 'lxml-xml')
        for d in soup.find_all('d'):
            attrs = d['p'].split(',')
            danmaku.append({
                'time': float(attrs[0]),
                'mode': attrs[1],
                'color': f'#{int(attrs[3]):06X}',
                'text': d.text,
            })
        return danmaku
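
    # Per community (unofficial) API docs, the danmaku 'p' attribute is:
    # offset_seconds, mode, font_size, color, send_timestamp, pool,
    # sender_hash, dm_id -- only fields 0, 1 and 3 are used above.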

    def get_comments(self, max_pages=1000):
        """Fetch comments sorted by popularity, including replies."""
        if not self.aid:
            print("No video aid found; cannot fetch comments.")
            return []
        comments = []
        for page in range(1, max_pages + 1):
            # type=1 selects video comments; sort=2 sorts by popularity.
            url = f'https://api.bilibili.com/x/v2/reply?pn={page}&type=1&oid={self.aid}&sort=2'
            try:
                response = self._safe_request(url)
                if response and response.status_code == 200:
                    response.encoding = 'utf-8'
                    data = response.json()
                    if data and data.get('data') and data['data'].get('replies'):
                        for comment in data['data']['replies']:
                            comment_info = {
                                'username': comment['member']['uname'],
                                'content': comment['content']['message'],
                                'reply_to': '',
                                'level': 'top-level',
                                'sex': comment['member']['sex'],
                                'user_level': comment['member']['level_info']['current_level'],
                                'likes': comment['like'],
                                'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(comment['ctime'])),
                            }
                            comments.append(comment_info)
                            replies = self.fetch_comment_replies(comment['rpid'], comment['member']['uname'])
                            comments.extend(replies)
                    else:
                        # No comments on this page; stop paginating.
                        break
            except requests.RequestException as e:
                print(f"Request error: {e}")
                break
            # Throttle between pages.
            sleep(1)
        return comments

    def fetch_comment_replies(self, comment_id, parent_user_name, max_pages=1000):
        """Fetch second-level replies under a given root comment."""
        replies = []
        for page in range(1, max_pages + 1):
            url = f'https://api.bilibili.com/x/v2/reply/reply?oid={self.aid}&type=1&root={comment_id}&ps=10&pn={page}'
            try:
                response = self._safe_request(url)
                if response and response.status_code == 200:
                    response.encoding = 'utf-8'
                    data = response.json()
                    if data and data.get('data') and data['data'].get('replies'):
                        for reply in data['data']['replies']:
                            reply_info = {
                                'username': reply['member']['uname'],
                                'content': reply['content']['message'],
                                'reply_to': parent_user_name,
                                'level': 'reply',
                                'sex': reply['member']['sex'],
                                'user_level': reply['member']['level_info']['current_level'],
                                'likes': reply['like'],
                                'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(reply['ctime'])),
                            }
                            replies.append(reply_info)
                    else:
                        # No replies on this page; stop paginating.
                        break
            except requests.RequestException as e:
                print(f"Error fetching replies: {e}")
                break
            # Throttle between pages.
            sleep(1)
        return replies

    def _parse_count(self, text):
        """Normalise a count string such as '1.2万' to an integer."""
        # bilibili renders large counts with the 万 (x10,000) suffix.
        if '万' in text:
            return int(float(text.replace('万', '')) * 10000)
        return int(text)
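
    # For example, _parse_count('1.2万') returns 12000 and
    # _parse_count('987') returns 987.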

    def save_to_csv(self, data, filename, mode='w'):
        """Write a list of dicts to a CSV file."""
        if not data:
            return
        keys = data[0].keys()
        with open(filename, mode, newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=keys)
            if f.tell() == 0:  # Write the header only for a new/empty file.
                writer.writeheader()
            writer.writerows(data)

    def run(self):
        """Run the full crawl pipeline."""
        print("Fetching basic video info...")
        video_info = self.get_video_info()
        if not video_info:
            print("No video info retrieved; nothing to crawl.")
            return
        partition = video_info.get('category', 'other')
        base_dir = os.path.join('data', partition)
        video_dir = os.path.join(base_dir, self.bvid)
        os.makedirs(video_dir, exist_ok=True)
        # Append to the per-category log and write a per-video info file.
        info_csv_path = os.path.join(base_dir, 'info.csv')
        self.save_to_csv([video_info], info_csv_path, mode='a')
        play_count = video_info.get('views', 0)
        video_info_filename = os.path.join(video_dir, f'{self.bvid}_{play_count}_info.csv')
        self.save_to_csv([video_info], video_info_filename)
        # Danmaku crawling (currently disabled).
        print("Fetching danmaku...")
        # danmaku = self.get_danmaku()
        # danmaku_filename = os.path.join(video_dir, f'{self.bvid}_{len(danmaku)}_danmaku.csv')
        # self.save_to_csv(danmaku, danmaku_filename)
        # Comment crawling (currently disabled).
        print("Fetching comments...")
        # comments = self.get_comments()
        # comments_filename = os.path.join(video_dir, f'{self.bvid}_{len(comments)}_comments.csv')
        # self.save_to_csv(comments, comments_filename)
        # Uploader info (currently disabled).
        print("Fetching uploader info...")
        # up_info = self.get_up_info(video_info.get('uploader_uid'))
        # up_info['bvid'] = self.bvid
        # up_csv_path = os.path.join(base_dir, 'up_info.csv')
        # self.save_to_csv([up_info], up_csv_path, mode='a')
        print(f"Done! Results saved to {video_dir}/")


if __name__ == "__main__":
    # Batch-process the BV IDs listed in targets.txt.
    targets = load_targets()
    if not targets:
        print("No valid BV IDs found; exiting.")
        exit()
    for bvid in targets:
        print(f"\n{'=' * 30} Processing {bvid} {'=' * 30}")
        crawler = BiliWebCrawler(f"https://www.bilibili.com/video/{bvid}")
        crawler.run()
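
        # To crawl while logged in (some endpoints behave better with a
        # Cookie), a call might look like this; the SESSDATA value is a
        # placeholder, not a real token:
        # crawler = BiliWebCrawler(f"https://www.bilibili.com/video/{bvid}",
        #                          cookie="SESSDATA=<your session token>")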