import json import time import paho.mqtt.client as mqtt import psutil RETURN_TIME = 30 # 发布设备信息的时间间隔,单位为秒 # MQTT 服务器地址和端口 broker = "127.0.0.1" # 替换为你的 EMQX 服务器地址 port = 1883 # 默认端口 username = "" # 替换为你的用户名 password = "" # 替换为你的密码 # 获取设备 ID、设备名称、型号和制造商 device_id = "orangepizero301" # 例如 "device_12345" device_name = "Orange Pi Zero3 01" # 例如 "Device with Metrics" device_model = "Orange Pi Zero3" # 例如 "Sensor Model X" device_manufacturer = "Orange Pi" # 例如 "Your Company" # 设备的基本信息 device_info = { "device_id": device_id, "device_name": device_name, "device_type": "sensor", "status": "active", "model": device_model, "manufacturer": device_manufacturer } # 获取设备内存、CPU、磁盘、网络信息 def get_device_metrics(): memory = psutil.virtual_memory() total_memory = round(memory.total / (1024 * 1024 * 1024), 2) # GB,保留两位小数 used_memory = round(memory.used / (1024 * 1024 * 1024), 2) # GB,保留两位小数 cpu_usage = round(psutil.cpu_percent(interval=0.1), 2) # 减少阻塞时间 net_info = psutil.net_io_counters() bytes_sent = round(net_info.bytes_sent / (1024 * 1024), 2) # MB,保留两位小数 bytes_recv = round(net_info.bytes_recv / (1024 * 1024), 2) # MB,保留两位小数 disk_info = psutil.disk_usage('/') total_disk = round(disk_info.total / (1024 * 1024 * 1024), 2) # GB,保留两位小数 used_disk = round(disk_info.used / (1024 * 1024 * 1024), 2) # GB,保留两位小数 free_disk = round(disk_info.free / (1024 * 1024 * 1024), 2) # GB,保留两位小数 metrics = { "memory": { "total_memory_gb": total_memory, "used_memory_gb": used_memory }, "cpu_usage_percent": cpu_usage, "network": { "bytes_sent_mb": bytes_sent, "bytes_recv_mb": bytes_recv }, "disk": { "total_disk_gb": total_disk, "used_disk_gb": used_disk, "free_disk_gb": free_disk } } return metrics def create_discovery_payload(name, unit_of_measurement, value_template, unique_id, icon): return { "name": f"{name}", "state_topic": f"devices/{device_id}/info", "unit_of_measurement": unit_of_measurement, "value_template": value_template, "unique_id": f"{device_id}_{unique_id}", "device": { "identifiers": [device_id], "name": device_name, "model": device_model, "manufacturer": device_manufacturer }, "icon": icon } # 发布设备信息和自动发现配置 def publish_device_metrics(client): try: metrics = get_device_metrics() # 合并设备信息和性能数据 device_metrics = {**device_info, **metrics} # 主题名称 device_topic = f"devices/{device_id}/info" # 设备数据上传的主题 # 设备信息和性能数据的 JSON payload payload = json.dumps(device_metrics, indent=4) # 发布设备数据 client.publish(device_topic, payload) print(f"Device metrics published: {payload}") # 发布自动发现配置(发布传感器信息) discovery_payloads = [ create_discovery_payload("Memory", "GB", "{{ value_json.memory.used_memory_gb }}", "memory", "mdi:memory"), create_discovery_payload("CPU Usage", "%", "{{ value_json.cpu_usage_percent }}", "cpu", "mdi:cpu-64-bit"), create_discovery_payload("Disk Usage", "GB", "{{ value_json.disk.used_disk_gb }}", "disk", "mdi:harddisk"), create_discovery_payload("Network Sent", "MB", "{{ value_json.network.bytes_sent_mb }}", "network_sent", "mdi:network"), create_discovery_payload("Network Received", "MB", "{{ value_json.network.bytes_recv_mb }}", "network_received", "mdi:network") ] # 发布自动发现配置到相应主题 for discovery_payload in discovery_payloads: discovery_topic = f"homeassistant/sensor/{device_id}_{discovery_payload['unique_id']}/config" client.publish(discovery_topic, json.dumps(discovery_payload, indent=4)) print(f"Discovery message published: {json.dumps(discovery_payload, indent=4)}") except Exception as e: print(f"Error publishing device metrics: {e}") # 连接时的回调函数 def on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") # 断开时的回调函数 def on_disconnect(client, userdata, rc): print(f"Disconnected with result code {rc}") # 创建 MQTT 客户端实例 client = mqtt.Client() # 设置认证信息 client.username_pw_set(username, password) # 设置回调函数 client.on_connect = on_connect client.on_disconnect = on_disconnect # 连接到 EMQX client.connect(broker, port, 60) # 启动网络循环 client.loop_start() # 每30秒上传设备信息并进行发现发布 try: while True: publish_device_metrics(client) time.sleep(RETURN_TIME) except KeyboardInterrupt: print("Program interrupted, disconnecting...") finally: try: # 断开连接 client.disconnect() except Exception as e: print(f"Error disconnecting: {e}") finally: client.loop_stop()