Files
script/python/send_info_for_homeassistant.py
牡蛎 d2ec4a1b65 refactor(frpc): 更新 frpc.toml 配置
- 更新 frpc.toml 文件,添加 web 管理后台配置
2025-06-05 00:17:12 +08:00

163 lines
5.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import paho.mqtt.client as mqtt
import psutil
RETURN_TIME = 30 # 发布设备信息的时间间隔,单位为秒
# MQTT 服务器地址和端口
broker = "127.0.0.1" # 替换为你的 EMQX 服务器地址
port = 1883 # 默认端口
username = "" # 替换为你的用户名
password = "" # 替换为你的密码
# 获取设备 ID、设备名称、型号和制造商
device_id = "orangepizero301" # 例如 "device_12345"
device_name = "Orange Pi Zero3 01" # 例如 "Device with Metrics"
device_model = "Orange Pi Zero3" # 例如 "Sensor Model X"
device_manufacturer = "Orange Pi" # 例如 "Your Company"
# 设备的基本信息
device_info = {
"device_id": device_id,
"device_name": device_name,
"device_type": "sensor",
"status": "active",
"model": device_model,
"manufacturer": device_manufacturer
}
# 获取设备内存、CPU、磁盘、网络信息
def get_device_metrics():
memory = psutil.virtual_memory()
total_memory = round(memory.total / (1024 * 1024 * 1024), 2) # GB保留两位小数
used_memory = round(memory.used / (1024 * 1024 * 1024), 2) # GB保留两位小数
cpu_usage = round(psutil.cpu_percent(interval=0.1), 2) # 减少阻塞时间
net_info = psutil.net_io_counters()
bytes_sent = round(net_info.bytes_sent / (1024 * 1024), 2) # MB保留两位小数
bytes_recv = round(net_info.bytes_recv / (1024 * 1024), 2) # MB保留两位小数
disk_info = psutil.disk_usage('/')
total_disk = round(disk_info.total / (1024 * 1024 * 1024), 2) # GB保留两位小数
used_disk = round(disk_info.used / (1024 * 1024 * 1024), 2) # GB保留两位小数
free_disk = round(disk_info.free / (1024 * 1024 * 1024), 2) # GB保留两位小数
metrics = {
"memory": {
"total_memory_gb": total_memory,
"used_memory_gb": used_memory
},
"cpu_usage_percent": cpu_usage,
"network": {
"bytes_sent_mb": bytes_sent,
"bytes_recv_mb": bytes_recv
},
"disk": {
"total_disk_gb": total_disk,
"used_disk_gb": used_disk,
"free_disk_gb": free_disk
}
}
return metrics
def create_discovery_payload(name, unit_of_measurement, value_template, unique_id, icon):
return {
"name": f"{name}",
"state_topic": f"devices/{device_id}/info",
"unit_of_measurement": unit_of_measurement,
"value_template": value_template,
"unique_id": f"{device_id}_{unique_id}",
"device": {
"identifiers": [device_id],
"name": device_name,
"model": device_model,
"manufacturer": device_manufacturer
},
"icon": icon
}
# 发布设备信息和自动发现配置
def publish_device_metrics(client):
try:
metrics = get_device_metrics()
# 合并设备信息和性能数据
device_metrics = {**device_info, **metrics}
# 主题名称
device_topic = f"devices/{device_id}/info" # 设备数据上传的主题
# 设备信息和性能数据的 JSON payload
payload = json.dumps(device_metrics, indent=4)
# 发布设备数据
client.publish(device_topic, payload)
# 发布自动发现配置(发布传感器信息)
discovery_payloads = [
create_discovery_payload("Memory", "GB", "{{ value_json.memory.used_memory_gb }}", "memory", "mdi:memory"),
create_discovery_payload("CPU Usage", "%", "{{ value_json.cpu_usage_percent }}", "cpu", "mdi:cpu-64-bit"),
create_discovery_payload("Disk Usage", "GB", "{{ value_json.disk.used_disk_gb }}", "disk", "mdi:harddisk"),
create_discovery_payload("Network Sent", "MB", "{{ value_json.network.bytes_sent_mb }}", "network_sent",
"mdi:network"),
create_discovery_payload("Network Received", "MB", "{{ value_json.network.bytes_recv_mb }}",
"network_received", "mdi:network")
]
# 发布自动发现配置到相应主题
for discovery_payload in discovery_payloads:
discovery_topic = f"homeassistant/sensor/{device_id}_{discovery_payload['unique_id']}/config"
client.publish(discovery_topic, json.dumps(discovery_payload, indent=4))
except Exception as e:
print(f"Error publishing device metrics: {e}")
# 连接时的回调函数
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# 断开时的回调函数
def on_disconnect(client, userdata, rc):
print(f"Disconnected with result code {rc}")
# 创建 MQTT 客户端实例
client = mqtt.Client()
# 设置认证信息
client.username_pw_set(username, password)
# 设置回调函数
client.on_connect = on_connect
client.on_disconnect = on_disconnect
# 连接到 EMQX
client.connect(broker, port, 60)
# 启动网络循环
client.loop_start()
# 每30秒上传设备信息并进行发现发布
try:
while True:
publish_device_metrics(client)
time.sleep(RETURN_TIME)
except KeyboardInterrupt:
print("Program interrupted, disconnecting...")
finally:
try:
# 断开连接
client.disconnect()
except Exception as e:
print(f"Error disconnecting: {e}")
finally:
client.loop_stop()