Files
script/update_files.py

131 lines
4.2 KiB
Python
Raw Normal View History

#!/usr/bin/env python
import os
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import yaml
import requests
import subprocess
import logging
# 初始化日志记录器
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(sys.stdout)]
)
LOCK = threading.Lock()
modified = False
def download_file(url):
"""使用 requests 下载文件内容,限制超时时间为 3 分钟"""
try:
response = requests.get(url, timeout=180)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
logging.error(f"❌ 下载 {url} 时出错: {e}")
return None
def process_item(item):
global modified
file_path = item.get("file")
source = item.get("source")
if not file_path or not source:
logging.warning(f"⚠️ 条目缺少 file 或 source 字段: {item}")
return
logging.info(f"🛠️ 处理 {file_path}{source}")
new_content = download_file(source)
if new_content is None:
return
# 确保文件路径是相对路径且安全
rel_path = os.path.relpath(file_path)
os.makedirs(os.path.dirname(rel_path), exist_ok=True)
updated = False
if os.path.exists(rel_path):
with open(rel_path, "r", encoding="utf-8") as f:
old_content = f.read()
if old_content != new_content:
updated = True
else:
updated = True
if updated:
with open(rel_path, "w", encoding="utf-8") as f:
f.write(new_content)
logging.info(f"🚀 更新了 {rel_path}")
with LOCK:
modified = True
def check_env_vars():
"""检查必要的环境变量是否存在"""
required_vars = ["THE_REPO", "REPO_TOKEN"]
missing_vars = [var for var in required_vars if not os.environ.get(var)]
if missing_vars:
logging.error(f"❌ 缺少必要的环境变量: {', '.join(missing_vars)}")
sys.exit(1)
def main():
global modified
# 检查环境变量
check_env_vars()
# 加载 list.yaml 文件
try:
with open("list.yaml", "r", encoding="utf-8") as f:
items = yaml.safe_load(f) or []
except FileNotFoundError:
logging.error("❌ list.yaml 文件不存在。")
sys.exit(1)
except yaml.YAMLError as e:
logging.error(f"❌ 解析 list.yaml 文件时出错: {e}")
sys.exit(1)
logging.info("🚀 开始并发更新文件")
# 使用线程池执行多线程任务
max_workers = int(os.getenv("MAX_WORKERS", 5)) # 允许通过环境变量配置并发数
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(process_item, item) for item in items]
for future in as_completed(futures):
try:
future.result() # 确保异常被捕获
except Exception as e:
logging.error(f"❌ 处理任务时出错: {e}")
if modified:
try:
# 配置 Git 用户信息
subprocess.run(["git", "config", "--global", "user.email", "cicd@sugarscat.cn"], check=True)
subprocess.run(["git", "config", "--global", "user.name", "CI/CD"], check=True)
# 设置远程仓库 URL隐藏敏感信息
repo_url = os.environ["THE_REPO"]
token = os.environ["REPO_TOKEN"]
new_repo_url = repo_url.replace("https://", f"https://oauth2:{token}@")
subprocess.run(["git", "remote", "set-url", "origin", new_repo_url], check=True)
# 提交并推送更改
subprocess.run(["git", "checkout", "-B", "main"], check=True)
subprocess.run(["git", "add", "."], check=True)
commit_message = "feat: 自动更新从 list.yaml 下载最新文件"
subprocess.run(["git", "commit", "-m", commit_message], check=False)
subprocess.run(["git", "push", "origin", "main"], check=True)
except subprocess.CalledProcessError as e:
logging.error(f"❌ Git 操作失败: {e}")
sys.exit(1)
else:
logging.info("📦 没有需要更新的文件。")
if __name__ == "__main__":
main()