关于MQTT

MQTT(消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

搭建MQTT服务器

这里我们利用emqx在centos中来搭建mqtt服务器。官方提供了直观的安装方式,及其简单。这里我安装的是EMQX5.0(企业版)

wget https://www.emqx.com/zh/downloads/enterprise/v5.0.0/emqx-enterprise-5.0.0-el7-arm64.rpm
yum install emqx-enterprise-5.0.0-el7-arm64.rpm
sudo systemctl start emqx


需要注意的是,尽量不要下载最新版的。因为很多功能在最新版中要钱。更多历史版本可以在这里下载https://www.emqx.com/zh/downloads/enterprise

然后输入初始账号admin public登录。

配置esp8266

我们需要将DHT11的数据通过mqtt协议发送出来。烧录代码如下。

#include "DHT.h"
#include 
#include 

/************ WIFI and MQTT Information (CHANGE THESE FOR YOUR SETUP) ******************/
const char* ssid = "PDCN"; //type your WIFI information inside the quotes
const char* password = "1234567890";
const char* mqtt_server = "192.168.123.173";
const char* mqtt_username = "admin";
const char* mqtt_password = "admin";
const int mqtt_port = 1883;
const char* mqtt_sensor_topic = "dht11";
unsigned long last_send = 0;

#define DHTPIN 4 
#define DHTTYPE DHT11 
DHT dht(DHTPIN, DHTTYPE);

WiFiClient espClient;
PubSubClient client(espClient);

void setup() {
  Serial.begin(115200);
  dht.begin();

  setupWifi();
  client.setServer(mqtt_server, mqtt_port);
}

// 连接 Wifi
void setupWifi() {
  WiFi.mode(WIFI_STA);
  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

void loop()
{
  if ( !client.connected() ) { // MQTT 是否连接成功
    reconnect();
  }
  if ( millis() - last_send > 5000 ) { // 每5秒发布一次温湿度数据
    handleTemperatureAndHumidity();
    last_send = millis();
  }
  client.loop();
}

void reconnect() {
  // Loop until we're reconnected
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // Attempt to connect
    if (client.connect(mqtt_sensor_topic, mqtt_username, mqtt_password)) {
      Serial.println("connected");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
    }
  }
}

void handleTemperatureAndHumidity() {
  float h = dht.readHumidity();
  float t = dht.readTemperature();
  if (isnan(h) || isnan(t) ) {
    Serial.println(F("Failed to read from DHT sensor!"));
    return;
  }
  // 准备JSON Payload
  String payload = "{";
  payload += "\"temp\":"; payload += t; payload += ",";
  payload += "\"humi\":"; payload += h;
  payload += "}";

  // 发送温湿度数据
  char attributes[100];
  payload.toCharArray( attributes, 100 );
  client.publish( mqtt_sensor_topic, attributes );
  Serial.println( attributes );
}

在上述代码中,我们引入了PubSubClientDHT库。在ide中安装这两个库。(注意版本,过高可能编译不过)

上传代码到esp8266。过会儿我们在emqx中便可以看到设备已经上线了。

查看订阅数据

在上述代码中,我们的订阅主题为dht11。下载mqtt工具mqttx。新建一个连接。

具体配置如下

连接完成后,添加订阅

如下,成功得到esp8266中的dht11的数据。

需要注意的是,我们需将数据处理成json数据,方便后期入库。如{"temp":23.40,"humi":57.00}

将数据存入mysql

为了后期方便数据,我们可以将数据存放到mysql中。具体操作如下:
点击数据桥接-创建-资源类型为mysql,填写相应的mysql账号等信息,并进行测试,提示资源可用则证明没有问题。


sql模板内容如下

insert into wendu ( temp, humi,time) values (${temp}, ${humi},FROM_UNIXTIME(${time}/1000) )

简单的sql插入语句,即在wendu表的temp humi time字段中插入相应的值。
配置完成后,会自动提示配置相应的规则

代码如下

SELECT
  timestamp as time, payload.temp as temp ,payload.humi as humi #获取值赋值给变量,注意逗号隔开。
FROM
  "dht11" # 数据来子订阅的主题dht11

现在,我们在mysql中新建一个wendu的表。并创建temp humitime字段。
最终效果如下

最后修改:2023 年 08 月 04 日
如果觉得我的文章对你有用,请随意赞赏