statistics_model2025/RoBERTa_danmu_sentiment_analyzer.py

import os

# Use a HuggingFace mirror endpoint (for mainland China); this must be set
# before transformers downloads anything.
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'

import pandas as pd
import torch
from tqdm import tqdm
from transformers import AutoModelForSequenceClassification, AutoTokenizer

def load_data(file_path):
    """Load danmaku (bullet-comment) texts from a CSV file."""
    try:
        df = pd.read_csv(file_path, usecols=['弹幕内容'], engine='python', encoding='utf-8')
        print(f"Debug - columns in {file_path}: {list(df.columns)}")
        return df['弹幕内容'].dropna().astype(str).tolist()
    except Exception as e:
        print(f"Failed to load data: {e}")
        return []
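
# Usage sketch for load_data (the path and BV id below are hypothetical,
# following the {partition}/{bv}/{bv}*danmaku.csv layout that
# process_partition expects further down):
# texts = load_data("nohot_data/music/BV1xx411c7XX/BV1xx411c7XX_danmaku.csv")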

def analyze_sentiment(texts):
    """Return the mean positive-sentiment probability over a list of texts."""
    try:
        # If there are more than 500 danmaku, subsample ~500 of them uniformly
        if len(texts) > 500:
            step = len(texts) // 500
            texts = texts[::step][:500]
        # Load the pretrained sentiment model from HuggingFace
        model_path = "IDEA-CCNL/Erlangshen-Roberta-330M-Sentiment"
        tokenizer = AutoTokenizer.from_pretrained(model_path)
        model = AutoModelForSequenceClassification.from_pretrained(model_path)
        # Run on GPU when available
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = model.to(device)
        # Tokenize the whole sample as one batch for efficiency
        inputs = tokenizer(texts, padding=True, truncation=True, max_length=128, return_tensors="pt")
        inputs = {k: v.to(device) for k, v in inputs.items()}  # move inputs to the same device
        with torch.no_grad():
            outputs = model(**inputs)
        # Softmax over the two classes; column 1 is the positive class
        probs = torch.softmax(outputs.logits, dim=1)
        return probs[:, 1].mean().item()
    except Exception as e:
        print(f"Sentiment analysis failed: {e}")
        return 0.5  # fall back to a neutral score on error
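
# Note: analyze_sentiment reloads the 330M-parameter model on every call and
# tokenizes up to 500 texts in one batch, which can exhaust GPU memory on
# smaller cards. A minimal sketch of chunked inference with a preloaded
# tokenizer/model (this helper and its batch_size default are assumptions,
# not part of the original script):
def analyze_sentiment_chunked(texts, tokenizer, model, device, batch_size=64):
    """Mean positive-sentiment probability, computed in fixed-size batches."""
    positive_probs = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        inputs = tokenizer(batch, padding=True, truncation=True,
                           max_length=128, return_tensors="pt")
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            logits = model(**inputs).logits
        positive_probs.append(torch.softmax(logits, dim=1)[:, 1])
    return torch.cat(positive_probs).mean().item()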

def process_all_partitions(base_path):
    """Run process_partition on every partition directory under base_path."""
    # Collect all partition (category) directories
    partitions = [d for d in os.listdir(base_path)
                  if os.path.isdir(os.path.join(base_path, d))]
    for partition in partitions:
        partition_path = os.path.join(base_path, partition)
        print(f"Processing partition: {partition}")
        process_partition(partition_path)

def process_partition(partition_path):
    """Score every video in one partition and write results back to info.csv."""
    info_file = os.path.join(partition_path, 'info.csv')
    if not os.path.exists(info_file):
        print(f"info file not found: {info_file}")
        return
    info_df = pd.read_csv(info_file, encoding='utf-8')
    scores = [None] * len(info_df)
    # Progress bar over all videos listed in info.csv
    with tqdm(total=len(info_df), desc=f"Partition {os.path.basename(partition_path)}") as pbar:
        for idx, bv in enumerate(info_df['BV号']):
            danmu_dir = os.path.join(partition_path, bv)
            if not os.path.exists(danmu_dir):
                pbar.update(1)
                continue
            danmu_files = [f for f in os.listdir(danmu_dir)
                           if f.startswith(bv) and f.endswith('danmaku.csv')]
            if not danmu_files:
                pbar.update(1)
                continue
            danmu_file = os.path.join(danmu_dir, danmu_files[0])
            danmu_texts = load_data(danmu_file)
            if not danmu_texts:
                pbar.update(1)
                continue
            scores[idx] = analyze_sentiment(danmu_texts)
            pbar.update(1)
            pbar.set_postfix({'BV': bv, 'score': scores[idx]})
    info_df['弹幕情感评分RoBERTa'] = scores
    # utf-8-sig writes a BOM so the CSV opens correctly in Excel
    info_df.to_csv(info_file, index=False, encoding='utf-8-sig')

# Example usage - process all partitions
#process_all_partitions("hot_data")
process_all_partitions("nohot_data")

# Keep the process alive after finishing (original behavior); this blocks
# forever, so remove it if the script should exit on its own.
while True:
    pass