背景
用阿里云 IoT、腾讯云 IoT 之类的平台,设备多了要交月费,数据也不在自己手里。换成开源方案,消息队列用 EMQX,时序数据库用 TimescaleDB(基于 PostgreSQL),自己掌控一切。
整体架构
ESP32 设备
↓ (MQTT over TLS)
EMQX Broker (消息代理)
↓ (规则引擎)
TimescaleDB (时序存储)
↓
Grafana (可视化)
EMQX 部署
Docker 一键跑起来:
docker run -d \
--name emqx \
-p 1883:1883 # MQTT TCP
-p 8883:8883 # MQTT TLS
-p 8083:8083 # MQTT/WS
-p 8084:8084 # MQTT/WSS
-p 18083:18083 # Dashboard
-p 9001:9001 # API
emqx/emqx:latest
认证配置
用内置的 HTTP 认证,设备连接时调用自己的认证服务:
# emqx.conf
auth.http.enable = on
auth.http.request.url = http://your-auth-server/auth
auth.http.request.method = post
auth.http.request.headers = Content-Type: application/json
auth.http.request.params = clientId, username, password
规则引擎
把设备消息直接转存 TimescaleDB:
# 创建规则
POST /api/v4/rules
{
"name": "store_telemetry",
"sql": "SELECT topic, payload FROM \"devices/+/telemetry\"",
"actions": [{
"function": "mysql",
"args": {
"table": "telemetry",
"fields": ["topic", "payload"]
}
}]
}
ESP32 程序
用 Arduino + PubSubClient 库:
#include <WiFi.h>
#include <PubSubClient.h>
#include <esp32-hal-tls.h>
const char* ssid = "your_wifi";
const char* pass = "your_password";
const char* mqtt_server = "your-domain.com";
const int mqtt_port = 8883;
WiFiClientSecure espClient;
PubSubClient client(espClient);
void callback(char* topic, byte* payload, unsigned int len) {
// 处理下行消息
}
void reconnect() {
while (!client.connected()) {
if (client.connect("esp32-client", "username", "password")) {
client.subscribe("devices/mydevice/cmd");
} else {
delay(5000);
}
}
}
void sendTelemetry(float temp, float humi) {
StaticJsonDocument<256> doc;
doc["device"] = "esp32-001";
doc["temp"] = temp;
doc["humi"] = humi;
doc["ts"] = millis();
char buf[256];
serializeJson(doc, buf);
client.publish("devices/esp32-001/telemetry", buf);
}
TLS 配置
MQTT 用 TLS 加密,防止被窃听:
espClient.setCACert(root_ca_cert);
espClient.setCertificate(client_cert);
espClient.setPrivateKey(client_key);
TimescaleDB 时序存储
安装
# Docker 跑 TimescaleDB
docker run -d \
--name timescaledb \
-e POSTGRES_PASSWORD=secret \
-e POSTGRES_DB=iot \
-p 5432:5432 \
timescale/timescaledb:latest-pg15
创建超表
-- 开启 TimescaleDB 扩展
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;
-- 创建普通表
CREATE TABLE telemetry (
time TIMESTAMPTZ NOT NULL,
device_id TEXT NOT NULL,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION,
raw JSONB
);
-- 转为超表(自动分区)
SELECT create_hypertable('telemetry', 'time',
chunk_time_interval => INTERVAL '1 day');
-- 创建压缩策略(省存储)
ALTER TABLE telemetry SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'device_id'
);
SELECT add_compression_policy('telemetry', INTERVAL '7 days');
保留策略
自动清理 90 天前的数据:
SELECT add_retention_policy('telemetry', INTERVAL '90 days');
Grafana 可视化
接 TimescaleDB 做图表:
# 跑 Grafana
docker run -d --name grafana -p 3000:3000 grafana/grafana
添加 DataSource → PostgreSQL → 连接 TimescaleDB,然后写 SQL 查询:
SELECT time_bucket('5 minutes', time) AS t,
device_id,
avg(temperature) as temp,
avg(humidity) as humi
FROM telemetry
WHERE device_id = $1
AND time > NOW() - INTERVAL '24 hours'
GROUP BY t, device_id
ORDER BY t;
成本对比
| 方案 | 100 设备/月 | 1000 设备/月 |
|---|---|---|
| 阿里云 IoT 高级版 | ¥500+ | ¥3000+ |
| 自建 EMQX + TimescaleDB | ¥150(服务器+流量) | ¥200 |
踩坑记录
- MQTT 遗嘱消息 — 设备断线时 EMQX 不会自动清理 session,要手动配置
- TLS 握手慢 — ESP32 跑 TLS 很吃性能,连接建立要 2-3 秒,用 Session Resumption 优化
- TimescaleDB 压缩影响写入 — 压缩在后台线程跑,但大 chunk 压缩时偶尔卡住写入,新版本改进了不少
- EMQX 规则引擎 JSON 解析 — 消息里字段名字要完全匹配才能提取,否则整条消息被丢弃