add:covers_analyser

This commit is contained in:
Bairly 2025-04-01 19:46:25 +08:00
parent 76e27bf6a7
commit 29cadea1a4

View File

@ -5,40 +5,17 @@ import requests
from io import BytesIO from io import BytesIO
from PIL import Image from PIL import Image
import os import os
from colorthief import ColorThief
import pytesseract
from multiprocessing import Pool from multiprocessing import Pool
from cnsenti import Sentiment from sklearn.cluster import MiniBatchKMeans
import pynlpir from tqdm import tqdm
from collections import defaultdict
import warnings import warnings
warnings.filterwarnings('ignore') warnings.filterwarnings('ignore')
#设置OCR路径
pytesseract.pytesseract.tesseract_cmd = r'D:Program files\Tesseract-OCR\tesseract.exe'
# ------------------图像处理-初始化配置 --------------------- # ------------------图像处理-初始化配置 ---------------------
# 人脸检测模型初始化 # 人脸检测模型初始化
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
# 图像情感模型系数基于IAPS数据集校准
VALENCE_WEIGHTS = {
'warm_ratio': 0.35,
'brightness': 0.15,
'symmetry': 0.20,
'colorfulness': 0.30
}
AROUSAL_WEIGHTS = {
'contrast': 0.40,
'edge_density': 0.35,
'saturation_std': 0.25
}
# 暖色调定义HSV色相范围
WARM_HUE_RANGE = (0, 60) # 红色到黄色
# ------------------处理图像 --------------------- # ------------------处理图像 ---------------------
def get_image(url): def get_image(url):
"""从URL获取图像并预处理""" """从URL获取图像并预处理"""
@ -51,56 +28,72 @@ def get_image(url):
return None return None
def extract_color_palette(img_rgb, color_count=5):
    """Extract the dominant colours of an image and the pixel share of each.

    The pixels are clustered with MiniBatchKMeans; each cluster centre becomes
    one palette entry.

    Args:
        img_rgb: Image array whose last axis holds the three colour channels
            (it is flattened with ``reshape(-1, 3)``).
        color_count: Number of clusters, i.e. palette entries, to produce.

    Returns:
        A list of ``(color, ratio)`` tuples sorted by descending ratio.
        ``color`` is the cluster centre cast to int and ``ratio`` is the
        fraction of pixels assigned to that cluster. Clusters that received
        no pixels are reported with a ratio of 0.
    """
    pixels = img_rgb.reshape(-1, 3)
    kmeans = MiniBatchKMeans(n_clusters=color_count, random_state=0)
    labels = kmeans.fit_predict(pixels)

    # minlength guarantees one count per cluster. Without it, bincount stops at
    # the highest label actually used, so an image with fewer distinct colours
    # than clusters would make the per-cluster lookup run out of range.
    counts = np.bincount(labels, minlength=color_count)
    total = counts.sum()

    palette = [
        (center.astype(int), count / total)
        for center, count in zip(kmeans.cluster_centers_, counts)
    ]
    # Largest share first.
    return sorted(palette, key=lambda entry: -entry[1])
def classify_hsv_color(rgb_color):
    """Classify an RGB colour as 'warm', 'cool' or 'neutral'.

    Args:
        rgb_color: Three channel values in R, G, B order; values outside
            0..255 are clipped.

    Returns:
        'neutral' when saturation or value is below 0.2 (the original author's
        note attributes this threshold to the Palmer criterion), 'warm' when
        the hue lies in [0, 90) or [270, 360] degrees, and 'cool' otherwise.
        Any exception during conversion is printed and also yields 'neutral'.
    """
    try:
        # Clamp to the valid 8-bit range before the uint8 cast.
        rgb_color = np.clip(rgb_color, 0, 255)
        hsv = cv2.cvtColor(np.uint8([[rgb_color]]), cv2.COLOR_RGB2HSV)[0][0]

        # OpenCV stores 8-bit hue as degrees / 2 (0..179). Convert back to
        # degrees so the thresholds below mean what they say; go through float
        # so doubling cannot wrap around in uint8.
        hue_deg = float(hsv[0]) * 2.0
        s, v = hsv[1] / 255.0, hsv[2] / 255.0  # normalise to 0..1

        # Washed-out or very dark colours carry no warm/cool impression.
        if s < 0.2 or v < 0.2:
            return 'neutral'

        # Warm hues: reds/oranges/yellows plus the magenta-to-red end.
        if hue_deg < 90 or hue_deg >= 270:
            return 'warm'
        return 'cool'
    except Exception as e:
        # Best effort: a colour that cannot be converted is treated as neutral.
        print(f"颜色越界: {str(e)}")
        return 'neutral'
def determine_warm_tone(img_rgb):
    """Summarise how much of an image's palette is warm, cool and neutral.

    The palette from ``extract_color_palette`` is classified entry by entry
    with ``classify_hsv_color`` and the pixel shares are summed per category.

    Args:
        img_rgb: Image array passed straight to ``extract_color_palette``.

    Returns:
        A ``(warm_ratio, cool_ratio, neutral_ratio)`` tuple.
    """
    totals = {'warm': 0.0, 'cool': 0.0, 'neutral': 0.0}
    for rgb, share in extract_color_palette(img_rgb):
        tone = classify_hsv_color(rgb)
        # Anything that is not explicitly warm or cool counts as neutral.
        bucket = tone if tone in ('warm', 'cool') else 'neutral'
        totals[bucket] += share
    return totals['warm'], totals['cool'], totals['neutral']
def detect_human(img):
    """Report whether the Haar face cascade finds at least one face.

    Args:
        img: Image in BGR channel order (it is converted with COLOR_BGR2GRAY).

    Returns:
        True when ``face_cascade`` reports one or more detections.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Tuned parameter set (intent as stated by the original author).
    detection_params = {
        'scaleFactor': 1.02,   # smaller scale step, finer detection granularity
        'minNeighbors': 5,     # require more neighbours to cut false positives
        'minSize': (50, 50),   # smallest face size expected on Bilibili covers
        'flags': cv2.CASCADE_FIND_BIGGEST_OBJECT,  # prefer the largest face
    }
    detections = face_cascade.detectMultiScale(grayscale, **detection_params)
    return len(detections) > 0
@ -110,24 +103,18 @@ def process_url(url):
if img is None: if img is None:
return None return None
features = analyze_image(img) # 颜色分析
affect = calculate_affect(features) img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
warm_ratio, cool_ratio, neutral_ratio = determine_warm_tone(img_rgb)
# 主色分析
color_thief = ColorThief(BytesIO(requests.get(url).content))
dominant_color = color_thief.get_color(quality=1)
hsv_color = cv2.cvtColor(np.uint8([[dominant_color]]), cv2.COLOR_RGB2HSV)[0][0]
warm = 1 if WARM_HUE_RANGE[0] <= hsv_color[0] <= WARM_HUE_RANGE[1] else 0
# 人像检测
has_human = detect_human(img)
return { return {
'url': url, 'url': url,
**affect, 'Portrait': int(detect_human(img)),
'Warm': warm, 'WarmRatio': round(warm_ratio, 3),
'Portrait': int(has_human) 'CoolRatio': round(cool_ratio, 3),
'NeutralRatio': round(neutral_ratio, 3)
} }
except Exception as e: except Exception as e:
print(f"Error processing {url}: {str(e)}") print(f"Error processing {url}: {str(e)}")
return None return None
@ -136,41 +123,27 @@ def process_url(url):
# 批量处理 # 批量处理
def batch_process(urls, workers=4):
    """Analyse many cover URLs in parallel and collect the results.

    Args:
        urls: Sized collection of image URLs (``len(urls)`` drives the
            progress bar total).
        workers: Number of worker processes in the pool.

    Returns:
        A pandas DataFrame with one row per URL for which ``process_url``
        returned a result; URLs that yielded ``None`` are dropped.
    """
    results = []
    with Pool(workers) as pool:
        # imap yields results in input order, so the bar advances per image.
        progress = tqdm(pool.imap(process_url, urls),
                        total=len(urls),
                        desc="处理进度")
        for outcome in progress:
            if outcome is not None:
                results.append(outcome)
    return pd.DataFrame(results)
# 使用示例 # 使用示例
if __name__ == "__main__":
    # Input CSV holding the cover URLs, and the location of the result file.
    input_csv = "data_all_first_ver.csv"
    output_csv = "result/analysis_results.csv"
    os.makedirs('./result', exist_ok=True)

    # Full run: analyse every cover URL listed in the input file.
    df = pd.read_csv(input_csv)
    urls = df['视频封面URL'].tolist()

    # Run the analysis and persist the per-image results.
    result_df = batch_process(urls)
    result_df.to_csv(output_csv, index=False, encoding='utf-8-sig')

    print(f"成功处理 {len(result_df)}/{len(urls)} 张图片")
    print("分析完成!结果已保存至", output_csv)