import os
import warnings
from io import BytesIO
from multiprocessing import Pool

import cv2
import numpy as np
import pandas as pd
import requests
from PIL import Image
from sklearn.cluster import MiniBatchKMeans
from tqdm import tqdm

# Suppress every warning category for the whole process.
warnings.filterwarnings('ignore')

# ------------------ Image processing: initial configuration ------------------
# Haar-cascade frontal-face detector shipped with OpenCV.
face_cascade = cv2.CascadeClassifier(
    cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
)


# ------------------ Image handling ------------------
def get_image(url):
    """Download an image from *url* and return it as a BGR ``numpy`` array.

    Args:
        url: HTTP(S) address of the image.

    Returns:
        A 3-channel BGR array, or ``None`` when the download or decoding
        fails (the error is printed, not raised).
    """
    try:
        response = requests.get(url, timeout=10)
        # Fail early on 4xx/5xx instead of handing an error page to PIL.
        response.raise_for_status()
        with Image.open(BytesIO(response.content)) as img:
            # Normalise grayscale / palette / RGBA / CMYK inputs to 3-channel
            # RGB so the colour conversion below always sees the same layout.
            rgb = np.array(img.convert('RGB'))
        return cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
    except Exception as e:
        print(f"Error loading {url}: {e}")
        return None


def extract_color_palette(img_rgb, color_count=5):
    """Extract the *color_count* dominant colours of an image and their shares.

    The pixels are clustered with mini-batch k-means; each cluster centre is
    one palette colour and its share is the fraction of pixels assigned to it.

    Args:
        img_rgb: Image array whose last axis holds the 3 colour channels.
        color_count: Number of clusters / palette entries.

    Returns:
        A list of ``(color, ratio)`` tuples sorted by ``ratio`` descending,
        where ``color`` is an integer array of 3 channel values.
    """
    pixels = img_rgb.reshape(-1, 3)
    kmeans = MiniBatchKMeans(n_clusters=color_count, random_state=0)
    labels = kmeans.fit_predict(pixels)

    # minlength guarantees one count per cluster even when the highest-indexed
    # clusters received no pixels; without it counts[i] can raise IndexError.
    counts = np.bincount(labels, minlength=color_count)
    total = counts.sum()

    palette = [
        (kmeans.cluster_centers_[i].astype(int), counts[i] / total)
        for i in range(color_count)
    ]
    # Negated key keeps the sort stable for equal ratios.
    return sorted(palette, key=lambda entry: -entry[1])


def classify_hsv_color(rgb_color):
    """Classify an RGB colour as ``'warm'``, ``'cool'`` or ``'neutral'``.

    Args:
        rgb_color: Sequence of 3 channel values; values are clipped to 0-255.

    Returns:
        ``'neutral'`` when saturation or value is below 0.2 (or when the
        conversion fails), ``'warm'`` for hues in [0, 90) or [270, 360]
        degrees, and ``'cool'`` otherwise.
    """
    try:
        # Make sure the input is a valid 8-bit RGB triple.
        rgb_color = np.clip(rgb_color, 0, 255)
        hsv = cv2.cvtColor(np.uint8([[rgb_color]]), cv2.COLOR_RGB2HSV)[0][0]

        # OpenCV stores 8-bit hue as degrees / 2 (0-179). Convert back to
        # degrees, going through a Python int so the doubling cannot overflow
        # uint8.
        hue_deg = int(hsv[0]) * 2
        saturation = hsv[1] / 255.0
        value = hsv[2] / 255.0

        # Neutral-colour test (per the Palmer criterion).
        if saturation < 0.2 or value < 0.2:
            return 'neutral'

        # Hue classification, thresholds in degrees.
        if hue_deg < 90 or hue_deg >= 270:
            return 'warm'
        return 'cool'
    except Exception as e:
        print(f"颜色越界: {e}")
        return 'neutral'  # Fall back to neutral on any failure.


def determine_warm_tone(img_rgb):
    """Return ``(warm_ratio, cool_ratio, neutral_ratio)`` for an image.

    Each dominant colour from ``extract_color_palette`` is classified with
    ``classify_hsv_color`` and its pixel share is added to the matching
    bucket.
    """
    palette = extract_color_palette(img_rgb)
    warm_ratio, cool_ratio, neutral_ratio = 0.0, 0.0, 0.0

    for color, ratio in palette:
        category = classify_hsv_color(color)
        if category == 'warm':
            warm_ratio += ratio
        elif category == 'cool':
            cool_ratio += ratio
        else:
            neutral_ratio += ratio

    return warm_ratio, cool_ratio, neutral_ratio


def detect_human(img):
    """Return ``True`` when the Haar cascade finds at least one face in *img*.

    Args:
        img: BGR image array.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Tuned parameter combination.
    faces = face_cascade.detectMultiScale(
        gray,
        scaleFactor=1.02,  # Smaller scale step for finer-grained detection.
        minNeighbors=5,  # More neighbours required, fewer false positives.
        minSize=(50, 50),  # Smallest face size expected on Bilibili covers.
        flags=cv2.CASCADE_FIND_BIGGEST_OBJECT,  # Prefer the largest face.
    )
    return len(faces) > 0


def process_url(url):
    """Analyse the image at *url*.

    Returns:
        A dict with keys ``url``, ``Portrait`` (1 if a face was detected,
        else 0), ``WarmRatio``, ``CoolRatio`` and ``NeutralRatio`` (each
        rounded to 3 decimals), or ``None`` when the image cannot be loaded
        or processed (the error is printed, not raised).
    """
    try:
        img = get_image(url)
        if img is None:
            return None

        # Colour analysis works on RGB, face detection on the BGR original.
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        warm_ratio, cool_ratio, neutral_ratio = determine_warm_tone(img_rgb)

        return {
            'url': url,
            'Portrait': int(detect_human(img)),
            'WarmRatio': round(warm_ratio, 3),
            'CoolRatio': round(cool_ratio, 3),
            'NeutralRatio': round(neutral_ratio, 3),
        }
    except Exception as e:
        print(f"Error processing {url}: {e}")
        return None


# Batch processing
def batch_process(urls, workers=4):
    """Run ``process_url`` over *urls* in a process pool.

    Args:
        urls: List of image URLs.
        workers: Number of worker processes.

    Returns:
        A ``pandas.DataFrame`` with one row per input URL, in input order.
        Every row has ``url`` and ``success``; successful rows also carry the
        fields produced by ``process_url``.
    """
    with Pool(workers) as pool:
        # imap yields results in input order, so rows line up with urls.
        processed = list(
            tqdm(pool.imap(process_url, urls), total=len(urls), desc="处理进度")
        )

    # Failed URLs keep a placeholder row so no input is silently dropped.
    results = [
        {'url': url, 'success': False} if res is None
        else {**res, 'success': True}
        for url, res in zip(urls, processed)
    ]
    return pd.DataFrame(results)


# Usage example
if __name__ == "__main__":
    # # Read the URL list
    # input_csv = "data_all_first_ver.csv"
    # # Output path
    # os.makedirs('./result', exist_ok=True)
    # output_csv = "result/analysis_results.csv"
    #
    # # Full run
    # df = pd.read_csv(input_csv)
    # urls = df['视频封面URL'].tolist()
    #
    # # Run the analysis
    # result_df = batch_process(urls)
    # result_df.to_csv(output_csv, index=False, encoding='utf-8-sig')
    # print(f"成功处理 {len(result_df)}/{len(urls)} 张图片")
    # print("分析完成!结果已保存至", output_csv)

    # Re-run the URLs that failed previously.
    urls_failed = [
        'http://i1.hdslb.com/bfs/archive/5c42e0fa42ec945106d2e167253889e8a05541c9.jpg',
        'http://i1.hdslb.com/bfs/archive/d2ca3e3f4c543245715937bf643e98b55badcc21.jpg',
        'http://i0.hdslb.com/bfs/archive/2b1cf64d70bf2036793e33b2de3067344a7ff77d.jpg',
        'http://i0.hdslb.com/bfs/archive/123ddc4cdf429968fa416f78f4049a728e8da3ab.jpg',
        'http://i2.hdslb.com/bfs/archive/b07446d2176cec63d42f204504f4cda7a940b05b.jpg',
    ]
    # The output directory must exist before to_csv can write into it.
    os.makedirs('result', exist_ok=True)
    result_failed = batch_process(urls_failed)
    result_failed.to_csv('result/reanalyze.csv', index=False, encoding='utf-8-sig')