背景

用阿里云 IoT、腾讯云 IoT 之类的平台,设备多了要交月费,数据也不在自己手里。换成开源方案,消息队列用 EMQX,时序数据库用 TimescaleDB(基于 PostgreSQL),自己掌控一切。

整体架构

ESP32 设备
   ↓ (MQTT over TLS)
EMQX Broker (消息代理)
   ↓ (规则引擎)
TimescaleDB (时序存储)
   ↓
Grafana (可视化)

EMQX 部署

Docker 一键跑起来:

docker run -d \
  --name emqx \
  -p 1883:1883      # MQTT TCP
  -p 8883:8883      # MQTT TLS
  -p 8083:8083      # MQTT/WS
  -p 8084:8084      # MQTT/WSS
  -p 18083:18083    # Dashboard
  -p 9001:9001      # API
  emqx/emqx:latest

认证配置

用内置的 HTTP 认证,设备连接时调用自己的认证服务:

# emqx.conf
auth.http.enable = on
auth.http.request.url = http://your-auth-server/auth
auth.http.request.method = post
auth.http.request.headers = Content-Type: application/json
auth.http.request.params = clientId, username, password

规则引擎

把设备消息直接转存 TimescaleDB:

# 创建规则
POST /api/v4/rules
{
  "name": "store_telemetry",
  "sql": "SELECT topic, payload FROM \"devices/+/telemetry\"",
  "actions": [{
    "function": "mysql",
    "args": {
      "table": "telemetry",
      "fields": ["topic", "payload"]
    }
  }]
}

ESP32 程序

用 Arduino + PubSubClient 库:

#include <WiFi.h>
#include <PubSubClient.h>
#include <esp32-hal-tls.h>

const char* ssid = "your_wifi";
const char* pass = "your_password";
const char* mqtt_server = "your-domain.com";
const int mqtt_port = 8883;

WiFiClientSecure espClient;
PubSubClient client(espClient);

void callback(char* topic, byte* payload, unsigned int len) {
  // 处理下行消息
}

void reconnect() {
  while (!client.connected()) {
    if (client.connect("esp32-client", "username", "password")) {
      client.subscribe("devices/mydevice/cmd");
    } else {
      delay(5000);
    }
  }
}

void sendTelemetry(float temp, float humi) {
  StaticJsonDocument<256> doc;
  doc["device"] = "esp32-001";
  doc["temp"] = temp;
  doc["humi"] = humi;
  doc["ts"] = millis();

  char buf[256];
  serializeJson(doc, buf);
  client.publish("devices/esp32-001/telemetry", buf);
}

TLS 配置

MQTT 用 TLS 加密,防止被窃听:

espClient.setCACert(root_ca_cert);
espClient.setCertificate(client_cert);
espClient.setPrivateKey(client_key);

TimescaleDB 时序存储

安装

# Docker 跑 TimescaleDB
docker run -d \
  --name timescaledb \
  -e POSTGRES_PASSWORD=secret \
  -e POSTGRES_DB=iot \
  -p 5432:5432 \
  timescale/timescaledb:latest-pg15

创建超表

-- 开启 TimescaleDB 扩展
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;

-- 创建普通表
CREATE TABLE telemetry (
  time        TIMESTAMPTZ NOT NULL,
  device_id   TEXT NOT NULL,
  temperature DOUBLE PRECISION,
  humidity    DOUBLE PRECISION,
  raw         JSONB
);

-- 转为超表(自动分区)
SELECT create_hypertable('telemetry', 'time',
  chunk_time_interval => INTERVAL '1 day');

-- 创建压缩策略(省存储)
ALTER TABLE telemetry SET (
  timescaledb.compress,
  timescaledb.compress_segmentby = 'device_id'
);
SELECT add_compression_policy('telemetry', INTERVAL '7 days');

保留策略

自动清理 90 天前的数据:

SELECT add_retention_policy('telemetry', INTERVAL '90 days');

Grafana 可视化

接 TimescaleDB 做图表:

# 跑 Grafana
docker run -d --name grafana -p 3000:3000 grafana/grafana

添加 DataSource → PostgreSQL → 连接 TimescaleDB,然后写 SQL 查询:

SELECT time_bucket('5 minutes', time) AS t,
       device_id,
       avg(temperature) as temp,
       avg(humidity) as humi
FROM telemetry
WHERE device_id = $1
  AND time > NOW() - INTERVAL '24 hours'
GROUP BY t, device_id
ORDER BY t;

成本对比

方案 100 设备/月 1000 设备/月
阿里云 IoT 高级版 ¥500+ ¥3000+
自建 EMQX + TimescaleDB ¥150(服务器+流量) ¥200

踩坑记录

  1. MQTT 遗嘱消息 — 设备断线时 EMQX 不会自动清理 session,要手动配置
  2. TLS 握手慢 — ESP32 跑 TLS 很吃性能,连接建立要 2-3 秒,用 Session Resumption 优化
  3. TimescaleDB 压缩影响写入 — 压缩在后台线程跑,但大 chunk 压缩时偶尔卡住写入,新版本改进了不少
  4. EMQX 规则引擎 JSON 解析 — 消息里字段名字要完全匹配才能提取,否则整条消息被丢弃