Files
script/python/send_info_for_homeassistant.py

163 lines
5.3 KiB
Python
Raw Normal View History

import json
import time
import paho.mqtt.client as mqtt
import psutil
RETURN_TIME = 30 # 发布设备信息的时间间隔,单位为秒
# MQTT 服务器地址和端口
broker = "127.0.0.1" # 替换为你的 EMQX 服务器地址
port = 1883 # 默认端口
username = "" # 替换为你的用户名
password = "" # 替换为你的密码
# 获取设备 ID、设备名称、型号和制造商
device_id = "orangepizero301" # 例如 "device_12345"
device_name = "Orange Pi Zero3 01" # 例如 "Device with Metrics"
device_model = "Orange Pi Zero3" # 例如 "Sensor Model X"
device_manufacturer = "Orange Pi" # 例如 "Your Company"
# 设备的基本信息
device_info = {
"device_id": device_id,
"device_name": device_name,
"device_type": "sensor",
"status": "active",
"model": device_model,
"manufacturer": device_manufacturer
}
# 获取设备内存、CPU、磁盘、网络信息
def get_device_metrics():
memory = psutil.virtual_memory()
total_memory = round(memory.total / (1024 * 1024 * 1024), 2) # GB保留两位小数
used_memory = round(memory.used / (1024 * 1024 * 1024), 2) # GB保留两位小数
cpu_usage = round(psutil.cpu_percent(interval=0.1), 2) # 减少阻塞时间
net_info = psutil.net_io_counters()
bytes_sent = round(net_info.bytes_sent / (1024 * 1024), 2) # MB保留两位小数
bytes_recv = round(net_info.bytes_recv / (1024 * 1024), 2) # MB保留两位小数
disk_info = psutil.disk_usage('/')
total_disk = round(disk_info.total / (1024 * 1024 * 1024), 2) # GB保留两位小数
used_disk = round(disk_info.used / (1024 * 1024 * 1024), 2) # GB保留两位小数
free_disk = round(disk_info.free / (1024 * 1024 * 1024), 2) # GB保留两位小数
metrics = {
"memory": {
"total_memory_gb": total_memory,
"used_memory_gb": used_memory
},
"cpu_usage_percent": cpu_usage,
"network": {
"bytes_sent_mb": bytes_sent,
"bytes_recv_mb": bytes_recv
},
"disk": {
"total_disk_gb": total_disk,
"used_disk_gb": used_disk,
"free_disk_gb": free_disk
}
}
return metrics
def create_discovery_payload(name, unit_of_measurement, value_template, unique_id, icon):
return {
"name": f"{name}",
"state_topic": f"devices/{device_id}/info",
"unit_of_measurement": unit_of_measurement,
"value_template": value_template,
"unique_id": f"{device_id}_{unique_id}",
"device": {
"identifiers": [device_id],
"name": device_name,
"model": device_model,
"manufacturer": device_manufacturer
},
"icon": icon
}
# 发布设备信息和自动发现配置
def publish_device_metrics(client):
try:
metrics = get_device_metrics()
# 合并设备信息和性能数据
device_metrics = {**device_info, **metrics}
# 主题名称
device_topic = f"devices/{device_id}/info" # 设备数据上传的主题
# 设备信息和性能数据的 JSON payload
payload = json.dumps(device_metrics, indent=4)
# 发布设备数据
client.publish(device_topic, payload)
# 发布自动发现配置(发布传感器信息)
discovery_payloads = [
create_discovery_payload("Memory", "GB", "{{ value_json.memory.used_memory_gb }}", "memory", "mdi:memory"),
create_discovery_payload("CPU Usage", "%", "{{ value_json.cpu_usage_percent }}", "cpu", "mdi:cpu-64-bit"),
create_discovery_payload("Disk Usage", "GB", "{{ value_json.disk.used_disk_gb }}", "disk", "mdi:harddisk"),
create_discovery_payload("Network Sent", "MB", "{{ value_json.network.bytes_sent_mb }}", "network_sent",
"mdi:network"),
create_discovery_payload("Network Received", "MB", "{{ value_json.network.bytes_recv_mb }}",
"network_received", "mdi:network")
]
# 发布自动发现配置到相应主题
for discovery_payload in discovery_payloads:
discovery_topic = f"homeassistant/sensor/{device_id}_{discovery_payload['unique_id']}/config"
client.publish(discovery_topic, json.dumps(discovery_payload, indent=4))
except Exception as e:
print(f"Error publishing device metrics: {e}")
# 连接时的回调函数
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# 断开时的回调函数
def on_disconnect(client, userdata, rc):
print(f"Disconnected with result code {rc}")
# 创建 MQTT 客户端实例
client = mqtt.Client()
# 设置认证信息
client.username_pw_set(username, password)
# 设置回调函数
client.on_connect = on_connect
client.on_disconnect = on_disconnect
# 连接到 EMQX
client.connect(broker, port, 60)
# 启动网络循环
client.loop_start()
# 每30秒上传设备信息并进行发现发布
try:
while True:
publish_device_metrics(client)
time.sleep(RETURN_TIME)
except KeyboardInterrupt:
print("Program interrupted, disconnecting...")
finally:
try:
# 断开连接
client.disconnect()
except Exception as e:
print(f"Error disconnecting: {e}")
finally:
client.loop_stop()