163 lines
5.3 KiB
Python
163 lines
5.3 KiB
Python
import json
|
||
import time
|
||
|
||
import paho.mqtt.client as mqtt
|
||
import psutil
|
||
|
||
RETURN_TIME = 30 # 发布设备信息的时间间隔,单位为秒
|
||
|
||
# MQTT 服务器地址和端口
|
||
broker = "127.0.0.1" # 替换为你的 EMQX 服务器地址
|
||
port = 1883 # 默认端口
|
||
username = "" # 替换为你的用户名
|
||
password = "" # 替换为你的密码
|
||
|
||
# 获取设备 ID、设备名称、型号和制造商
|
||
device_id = "orangepizero301" # 例如 "device_12345"
|
||
device_name = "Orange Pi Zero3 01" # 例如 "Device with Metrics"
|
||
device_model = "Orange Pi Zero3" # 例如 "Sensor Model X"
|
||
device_manufacturer = "Orange Pi" # 例如 "Your Company"
|
||
|
||
# 设备的基本信息
|
||
device_info = {
|
||
"device_id": device_id,
|
||
"device_name": device_name,
|
||
"device_type": "sensor",
|
||
"status": "active",
|
||
"model": device_model,
|
||
"manufacturer": device_manufacturer
|
||
}
|
||
|
||
|
||
# 获取设备内存、CPU、磁盘、网络信息
|
||
def get_device_metrics():
|
||
memory = psutil.virtual_memory()
|
||
total_memory = round(memory.total / (1024 * 1024 * 1024), 2) # GB,保留两位小数
|
||
used_memory = round(memory.used / (1024 * 1024 * 1024), 2) # GB,保留两位小数
|
||
|
||
cpu_usage = round(psutil.cpu_percent(interval=0.1), 2) # 减少阻塞时间
|
||
|
||
net_info = psutil.net_io_counters()
|
||
bytes_sent = round(net_info.bytes_sent / (1024 * 1024), 2) # MB,保留两位小数
|
||
bytes_recv = round(net_info.bytes_recv / (1024 * 1024), 2) # MB,保留两位小数
|
||
|
||
disk_info = psutil.disk_usage('/')
|
||
total_disk = round(disk_info.total / (1024 * 1024 * 1024), 2) # GB,保留两位小数
|
||
used_disk = round(disk_info.used / (1024 * 1024 * 1024), 2) # GB,保留两位小数
|
||
free_disk = round(disk_info.free / (1024 * 1024 * 1024), 2) # GB,保留两位小数
|
||
|
||
metrics = {
|
||
"memory": {
|
||
"total_memory_gb": total_memory,
|
||
"used_memory_gb": used_memory
|
||
},
|
||
"cpu_usage_percent": cpu_usage,
|
||
"network": {
|
||
"bytes_sent_mb": bytes_sent,
|
||
"bytes_recv_mb": bytes_recv
|
||
},
|
||
"disk": {
|
||
"total_disk_gb": total_disk,
|
||
"used_disk_gb": used_disk,
|
||
"free_disk_gb": free_disk
|
||
}
|
||
}
|
||
|
||
return metrics
|
||
|
||
|
||
def create_discovery_payload(name, unit_of_measurement, value_template, unique_id, icon):
|
||
return {
|
||
"name": f"{name}",
|
||
"state_topic": f"devices/{device_id}/info",
|
||
"unit_of_measurement": unit_of_measurement,
|
||
"value_template": value_template,
|
||
"unique_id": f"{device_id}_{unique_id}",
|
||
"device": {
|
||
"identifiers": [device_id],
|
||
"name": device_name,
|
||
"model": device_model,
|
||
"manufacturer": device_manufacturer
|
||
},
|
||
"icon": icon
|
||
}
|
||
|
||
|
||
# 发布设备信息和自动发现配置
|
||
def publish_device_metrics(client):
|
||
try:
|
||
metrics = get_device_metrics()
|
||
|
||
# 合并设备信息和性能数据
|
||
device_metrics = {**device_info, **metrics}
|
||
|
||
# 主题名称
|
||
device_topic = f"devices/{device_id}/info" # 设备数据上传的主题
|
||
|
||
# 设备信息和性能数据的 JSON payload
|
||
payload = json.dumps(device_metrics, indent=4)
|
||
|
||
# 发布设备数据
|
||
client.publish(device_topic, payload)
|
||
|
||
# 发布自动发现配置(发布传感器信息)
|
||
discovery_payloads = [
|
||
create_discovery_payload("Memory", "GB", "{{ value_json.memory.used_memory_gb }}", "memory", "mdi:memory"),
|
||
create_discovery_payload("CPU Usage", "%", "{{ value_json.cpu_usage_percent }}", "cpu", "mdi:cpu-64-bit"),
|
||
create_discovery_payload("Disk Usage", "GB", "{{ value_json.disk.used_disk_gb }}", "disk", "mdi:harddisk"),
|
||
create_discovery_payload("Network Sent", "MB", "{{ value_json.network.bytes_sent_mb }}", "network_sent",
|
||
"mdi:network"),
|
||
create_discovery_payload("Network Received", "MB", "{{ value_json.network.bytes_recv_mb }}",
|
||
"network_received", "mdi:network")
|
||
]
|
||
|
||
# 发布自动发现配置到相应主题
|
||
for discovery_payload in discovery_payloads:
|
||
discovery_topic = f"homeassistant/sensor/{device_id}_{discovery_payload['unique_id']}/config"
|
||
client.publish(discovery_topic, json.dumps(discovery_payload, indent=4))
|
||
except Exception as e:
|
||
print(f"Error publishing device metrics: {e}")
|
||
|
||
|
||
# 连接时的回调函数
|
||
def on_connect(client, userdata, flags, rc):
|
||
print(f"Connected with result code {rc}")
|
||
|
||
|
||
# 断开时的回调函数
|
||
def on_disconnect(client, userdata, rc):
|
||
print(f"Disconnected with result code {rc}")
|
||
|
||
|
||
# 创建 MQTT 客户端实例
|
||
client = mqtt.Client()
|
||
|
||
# 设置认证信息
|
||
client.username_pw_set(username, password)
|
||
|
||
# 设置回调函数
|
||
client.on_connect = on_connect
|
||
client.on_disconnect = on_disconnect
|
||
|
||
# 连接到 EMQX
|
||
client.connect(broker, port, 60)
|
||
|
||
# 启动网络循环
|
||
client.loop_start()
|
||
|
||
# 每30秒上传设备信息并进行发现发布
|
||
try:
|
||
while True:
|
||
publish_device_metrics(client)
|
||
time.sleep(RETURN_TIME)
|
||
except KeyboardInterrupt:
|
||
print("Program interrupted, disconnecting...")
|
||
finally:
|
||
try:
|
||
# 断开连接
|
||
client.disconnect()
|
||
except Exception as e:
|
||
print(f"Error disconnecting: {e}")
|
||
finally:
|
||
client.loop_stop()
|