#!/usr/bin/env python import os import sys import subprocess import threading from concurrent.futures import ThreadPoolExecutor, as_completed import yaml import requests import logging # 初始化日志记录器 logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.StreamHandler(sys.stdout)] ) LOCK = threading.Lock() modified = False def download_file(url): """使用 requests 下载文件内容,限制超时时间为 3 分钟""" try: response = requests.get(url, timeout=180) response.raise_for_status() return response.text except requests.exceptions.RequestException as e: logging.error(f"❌ 下载 {url} 时出错: {e}") return None def process_item(item): global modified file_path = item.get("file") source = item.get("source") if not file_path or not source: logging.warning(f"⚠️ 条目缺少 file 或 source 字段: {item}") return logging.info(f"🛠️ 处理 {file_path} 从 {source}") new_content = download_file(source) if new_content is None: return # 构建最终文件路径 abs_file_path = os.path.join(os.getcwd(), file_path) os.makedirs(os.path.dirname(abs_file_path), exist_ok=True) updated = False if os.path.exists(abs_file_path): with open(abs_file_path, "r", encoding="utf-8") as f: old_content = f.read() if old_content != new_content: updated = True else: updated = True if updated: with open(abs_file_path, "w", encoding="utf-8") as f: f.write(new_content) logging.info(f"🚀 更新了 {abs_file_path}") with LOCK: modified = True def check_env_vars(): """检查必要的环境变量是否存在""" required_vars = ["THE_REPO", "REPO_TOKEN", "CI_EMAIL"] missing_vars = [var for var in required_vars if not os.environ.get(var)] if missing_vars: logging.error(f"❌ 缺少必要的环境变量: {', '.join(missing_vars)}") sys.exit(1) def main(): global modified # 检查环境变量 check_env_vars() # 加载 list.yaml 文件 try: list_yaml_path = os.path.join(os.getcwd(), "list.yaml") with open(list_yaml_path, "r", encoding="utf-8") as f: items = yaml.safe_load(f) or [] except FileNotFoundError: logging.error("❌ list.yaml 文件不存在。") sys.exit(1) except yaml.YAMLError as e: logging.error(f"❌ 解析 list.yaml 文件时出错: {e}") sys.exit(1) logging.info("🚀 开始并发更新文件") # 使用线程池执行多线程任务 max_workers = int(os.getenv("MAX_WORKERS", 5)) # 允许通过环境变量配置并发数 with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [executor.submit(process_item, item) for item in items] for future in as_completed(futures): try: future.result() # 确保异常被捕获 except Exception as e: logging.error(f"❌ 处理任务时出错: {e}") if modified: try: # 配置 Git 用户信息 ci_email = os.environ["CI_EMAIL"] subprocess.run(["git", "config", "--global", "user.email", ci_email], check=True) subprocess.run(["git", "config", "--global", "user.name", "CI/CD"], check=True) # 设置远程仓库 URL(隐藏敏感信息) repo_url = os.environ["THE_REPO"] token = os.environ["REPO_TOKEN"] new_repo_url = repo_url.replace("https://", f"https://oauth2:{token}@") subprocess.run(["git", "remote", "set-url", "origin", new_repo_url], check=True) # 提交并推送更改 subprocess.run(["git", "checkout", "-B", "main"], check=True) subprocess.run(["git", "add", "."], check=True) commit_message = "feat: 自动更新从 list.yaml 下载最新文件" subprocess.run(["git", "commit", "-m", commit_message], check=False) subprocess.run(["git", "push", "origin", "main"], check=True) except subprocess.CalledProcessError as e: logging.error(f"❌ Git 操作失败: {e}") sys.exit(1) else: logging.info("📦 没有需要更新的文件。") if __name__ == "__main__": main()