import os

import numpy as np
import pandas as pd
from snownlp import SnowNLP


def load_data(file_path):
    """Load the danmaku (bullet-comment) texts from a CSV file.

    Only the '弹幕内容' column is read. Missing values are dropped and the
    remaining values are converted to ``str``.

    Args:
        file_path: Path of the danmaku CSV file.

    Returns:
        A list of comment strings. An empty list is returned when the file
        cannot be read or parsed; the error is printed rather than raised.
    """
    try:
        df = pd.read_csv(file_path, usecols=['弹幕内容'], engine='python')
        return df['弹幕内容'].dropna().astype(str).tolist()
    except Exception as e:
        # Deliberate best effort: one unreadable file must not abort the
        # whole run, so report the problem and let the caller skip the row.
        print(f"数据加载失败: {str(e)}")
        return []


def analyze_sentiment(danmu_texts):
    """Return the mean SnowNLP sentiment score of the given texts.

    Args:
        danmu_texts: Iterable of comment strings.

    Returns:
        The arithmetic mean of ``SnowNLP(text).sentiments`` over all texts,
        or NaN when there are no texts.
    """
    sentiment_scores = [SnowNLP(text).sentiments for text in danmu_texts]
    if not sentiment_scores:
        # np.mean([]) also yields NaN but emits a RuntimeWarning; return the
        # same value explicitly and quietly.
        return np.nan
    return np.mean(sentiment_scores)


def process_all_partitions(base_path):
    """Run ``process_partition`` on every sub-directory of *base_path*.

    Args:
        base_path: Directory whose immediate sub-directories are partitions.

    Raises:
        OSError: If *base_path* cannot be listed (e.g. it does not exist).
    """
    # Collect all partition directories. os.listdir returns entries in
    # arbitrary order, so sort them to make the processing order (and the
    # log output) reproducible.
    partitions = sorted(
        d for d in os.listdir(base_path)
        if os.path.isdir(os.path.join(base_path, d))
    )

    for partition in partitions:
        partition_path = os.path.join(base_path, partition)
        print(f"正在处理分区: {partition}")
        process_partition(partition_path)


def _find_danmu_file(partition_path, bv):
    """Return the path of the danmaku CSV for video *bv*, or None if absent.

    The file is looked up in ``<partition_path>/<bv>/`` and must start with
    *bv* and end with 'danmaku.csv'. When several files match, the
    alphabetically first one is chosen so the result is deterministic.
    """
    danmu_dir = os.path.join(partition_path, bv)
    # isdir (not exists): a regular file named like the BV id would make
    # os.listdir raise NotADirectoryError.
    if not os.path.isdir(danmu_dir):
        return None

    candidates = sorted(
        f for f in os.listdir(danmu_dir)
        if f.startswith(bv) and f.endswith('danmaku.csv')
    )
    if not candidates:
        return None
    return os.path.join(danmu_dir, candidates[0])


def process_partition(partition_path):
    """Score every video listed in a partition's info.csv and save the result.

    Reads ``<partition_path>/info.csv``, computes the mean danmaku sentiment
    for each row's 'BV号', stores it in the column '弹幕情感评分snowNLP' and
    writes the table back to the same file. Rows whose danmaku data is
    missing, unreadable or empty keep an empty score.

    Args:
        partition_path: Directory containing info.csv and one sub-directory
            per video, named after its BV id.

    Returns:
        None. If info.csv does not exist, a message is printed and nothing
        is written.
    """
    info_file = os.path.join(partition_path, 'info.csv')
    if not os.path.exists(info_file):
        print(f"未找到info文件: {info_file}")
        return

    info_df = pd.read_csv(info_file, encoding='utf-8')

    # One slot per row of info_df, pre-filled with None so that rows without
    # usable danmaku data stay aligned and end up empty in the output.
    scores = [None] * len(info_df)

    for idx, bv in enumerate(info_df['BV号']):
        # A blank cell is read as NaN (a float); os.path.join would raise
        # TypeError on it and abort the whole partition.
        if pd.isna(bv):
            continue

        danmu_file = _find_danmu_file(partition_path, str(bv))
        if danmu_file is None:
            continue  # keep None

        danmu_texts = load_data(danmu_file)
        if not danmu_texts:
            continue  # keep None

        # Store the result at the index of the matching info row.
        scores[idx] = analyze_sentiment(danmu_texts)

    info_df['弹幕情感评分snowNLP'] = scores
    info_df.to_csv(info_file, index=False, encoding='utf-8-sig')


# Example usage - process all partitions.
process_all_partitions("hot_data")
process_all_partitions("nohot_data")