Files
script/update_files.py

136 lines
4.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python
import os
import sys
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import yaml
import requests
import logging
# 初始化日志记录器
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(sys.stdout)]
)
LOCK = threading.Lock()
modified = False
def download_file(url):
"""使用 requests 下载文件内容,限制超时时间为 3 分钟"""
try:
response = requests.get(url, timeout=180)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
logging.error(f"❌ 下载 {url} 时出错: {e}")
return None
def process_item(item):
global modified
file_path = item.get("file")
source = item.get("source")
if not file_path or not source:
logging.warning(f"⚠️ 条目缺少 file 或 source 字段: {item}")
return
logging.info(f"🛠️ 处理 {file_path}{source}")
new_content = download_file(source)
if new_content is None:
return
# 构建最终文件路径
abs_file_path = os.path.join(os.getcwd(), file_path)
os.makedirs(os.path.dirname(abs_file_path), exist_ok=True)
updated = False
if os.path.exists(abs_file_path):
with open(abs_file_path, "r", encoding="utf-8") as f:
old_content = f.read()
if old_content != new_content:
updated = True
else:
logging.warning(f"⚠️ 文件 {abs_file_path} 不存在,将创建。")
updated = True
if updated:
with open(abs_file_path, "w", encoding="utf-8") as f:
f.write(new_content)
logging.info(f"🚀 更新了 {file_path}")
with LOCK:
modified = True
def check_env_vars():
"""检查必要的环境变量是否存在"""
required_vars = ["THE_REPO", "REPO_TOKEN", "CI_EMAIL"]
missing_vars = [var for var in required_vars if not os.environ.get(var)]
if missing_vars:
logging.error(f"❌ 缺少必要的环境变量: {', '.join(missing_vars)}")
sys.exit(1)
def main():
global modified
# 检查环境变量
check_env_vars()
# 加载 list.yaml 文件
try:
list_yaml_path = os.path.join(os.getcwd(), "list.yaml")
with open(list_yaml_path, "r", encoding="utf-8") as f:
items = yaml.safe_load(f) or []
except FileNotFoundError:
logging.error("❌ list.yaml 文件不存在。")
sys.exit(0)
except yaml.YAMLError as e:
logging.error(f"❌ 解析 list.yaml 文件时出错: {e}")
sys.exit(1)
logging.info("🚀 开始并发更新文件")
# 使用线程池执行多线程任务
max_workers = int(os.getenv("MAX_WORKERS", 5)) # 允许通过环境变量配置并发数
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(process_item, item) for item in items]
for future in as_completed(futures):
try:
future.result() # 确保异常被捕获
except Exception as e:
logging.error(f"❌ 处理任务时出错: {e}")
if modified:
logging.info("📦 更新完成,正在提交更改...")
try:
# 配置 Git 用户信息
ci_email = os.environ["CI_EMAIL"]
subprocess.run(["git", "config", "--global", "user.email", ci_email], check=True)
subprocess.run(["git", "config", "--global", "user.name", "CI/CD"], check=True)
# 设置远程仓库 URL隐藏敏感信息
repo_url = os.environ["THE_REPO"]
token = os.environ["REPO_TOKEN"]
new_repo_url = repo_url.replace("https://", f"https://oauth2:{token}@")
subprocess.run(["git", "remote", "set-url", "origin", new_repo_url], check=True)
# 提交并推送更改
subprocess.run(["git", "checkout", "-B", "main"], check=True)
subprocess.run(["git", "add", "."], check=True)
commit_message = "feat: 自动更新从 list.yaml 下载最新文件"
subprocess.run(["git", "commit", "-m", commit_message], check=False)
subprocess.run(["git", "push", "origin", "main"], check=True)
except subprocess.CalledProcessError as e:
logging.error(f"❌ Git 操作失败: {e}")
sys.exit(1)
else:
logging.info("📦 没有需要更新的文件。")
if __name__ == "__main__":
main()