2018年11月11日 星期日

當 ESP8266 遇上中華電信 IoT 智慧聯網大平台 { 入門 - 07 } - 結合 Arduino + ESP8266 實現 MQTT 主題訂閱與接收


網頁最後修改時間:2018/11/11


經過了前面幾篇關於 MQTT 控制包格式的講解和演示後,相信各位對於 MQTT 主題訂閱和訊息發佈都有了一定的了解。

作為入門系列的最後一篇,此篇將針對 "{ 入門 - 06 } - 了解 MQTT 協議,學習如何訂閱 MQTT 主題與接收 MQTT 發佈消息" 所說的內容,撰寫實現它的程式碼。

延續在 {入門 -04 } - 了解 MQTT 協議,學習如何發佈 MQTT 消息 所建立的專案,此篇網頁的程式碼將用來訂閱該專案設備中的三個感測器,並且取出由 MQTT Broker 發送回的 PUBLISH 控制封包中屬於 Payload 部分的 Json 字串,只要解析這個字串就可得到該感測器的相關數據。

解析的方式可參考 { 入門 - 02 } - 設備感測數據讀取與 (JSON) 解析 所提及的方法,或是用自己的方式,下面的程式主要的目的是展示怎麼訂閱主題以及輸出接收到的 PUBLISH 控制封包的 Json 字串,已說明過的程式碼就不再細述,請自行參照前面幾篇入門網頁的說明。

【參考電路圖】

注意下面的電路圖,與前幾篇網頁所用的有些許變更:
  • SoftwareSerial 的接腳變更為 <D4> (Rx)、<D5> (Tx);
  • <D2> 低態觸發時,程式送出 MQTT DISCONNEC 控制封包中斷連線;

參考電路圖
*   開機時可先將 <D2> 與 <5.0V> 相接,當要與 MQTT 斷線時,再將其與 <GND> 相接發出 DISCONNECT 控制封包。
** 只要開發板通電之後發生 ESP8266 一直重新開機或是網路連不上等......問題,絕大部分都是由於供電不足導致!輸入電源可直接使用 MicroUSB 電源直接輸入使用 ( 建議 5V / 1A 以上 ),但盡量不要直接使用電腦的 USB 埠 ( 除非能提供較大的電流 )。

【程式碼說明】

此程式將會訂閱專案裡,設備編號 ${device_id} 的設備所配置的三個感測器 ( ${sensor_id} 分別為:SayHelloSHT31_TemperatureSHT31_Humidity )。只要這三個感測器數據有所變化,MQTT Broker 就會發送 PUBLISH 控制封包給訂閱裝置。

下面的說明將根據 gist 上面的程式碼重點說明如下,其他部分已經盡量加上註解,有問題請自行研究,最下面的影片就是此程式碼運行的結果。

/*--*//**---/*///**---*-*////***--*/*///***----*///--*/*///**--*/*//**--**/*//
* 全局變數宣告與定義:
  • line 21 ... 22: 填入連線的 SSID 名稱和密碼在 "" 裡面;
  • line 29, 31, 33: TOPIC 的名稱;${device_id} 要根據專案所提供給你 (妳) 的來做填寫,這是固定值不會變,但SayHelloSHT31_TemperatureSHT31_Humidity 為要訂閱的感測器 ( ${sensor_id} ) 名稱,三者是不會相同的;
  • line 30, 32, 34: SUBSCRIBE 控制封包中的 PACKET_ID,可為任意值,但不可超過 2 個位元組的大小;
  • line 36: 專案存取的權限,可由專案頁面新增;這邊應該只要唯獨就可以,若不行則要更改權限為讀寫;
MQTT_Subscribe_ESP01_Demo01.ino, line 19 ... 38
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//--**  ROUTER INFOMATION **--//
// AT+CWJAP
#define ROUTER_NAME "SSID名稱"
#define ROUTER_PW   "SSID連線密碼"

//--** CHT IOT Smart Platform **--//
// AT+CIPSTART
#define IOT_IP                    "iot.cht.com.tw"        // CHT IoT Smart Platform
#define IOT_PORT                  "1883"
// MQTT
#define SUBSCRIBE_TOPIC_SAYHELLO  "/v1/device/${device_id}/sensor/SayHello/rawdata"
#define PACKET_ID_SAYHELLO        0x0001
#define SUBSCRIBE_TOPIC_SHT31T    "/v1/device/${device_id}/sensor/SHT31_Temperature/rawdata"
#define PACKET_ID_SHT31T          0x0002
#define SUBSCRIBE_TOPIC_SHT31H    "/v1/device/${device_id}/sensor/SHT31_Humidity/rawdata"
#define PACKET_ID_SHT31H          0x0003
#define MQTT_PUBLISH_TOPIC        "/v1/device/${device_id}/rawdata"
#define PROJECT_KEY               "${PROJECT_KEY}"
#define CLIENT_ID                 "MQTT_ESP_01S"
#define MQTT_KEEP_ALIVE           35        // PINGREQ 發送的時間設定


/*--*//**---/*///**---*-*////***--*/*///***----*///--*/*///**--*/*//**--**/*//
* 主程式的部分:

setup() 的部分,主要是建立 TCP 和與 MQTT Broker 的連線 (line 418 ... 434),並且在成功連上 MQTT Broker 後接著訂閱三個主題 (line 439, 442, 445),並且處理來自 MQTT Broker 的 SUBACK 回應 (line 440, 443, 446)。

另外,為了能夠在不再想要接收訂閱主題訊息時正常斷開連線,程式中採用了中斷處理,預設輸入上拉 (line 410) 並且設定為下緣觸發中斷 (line 411)。

而在 loop() 的部分主要分為三個部分:
  • line 454 ... 489: 處理接收到的 PUBLISH 控制封包,並輸出最後的 Payload (Json) 字串;
  • line 491 ... 518: 在設定的 MQTT_KEEP_ALIVE 時間時,送出 PINGREQ 控制封包;
  • line 520 ... 538: 當接收到中斷訊號時,發送 MQTT DISCONNECT 控制封包以及關閉 TCP 連線;
後面那兩個部份很簡單,前幾篇應該已經做過說明,所以下面針對第一個部分做說明。

第一個部分是針對 PUBLISH 控制封包做解析 (不熟的另開一下 {入門 04} 網頁對照),所以一但有接收到資料 (line 455),馬上將資料讀進來直到接收到 '}' 字元停止 (line 456),然後在接收緩衝的 buffer 陣列接收到的最後一個字元處補上 '}' 字元 (line 457),完成一整個 PUBLISH 控制封包的接收,最後輸出此控制封包的總字元數目 (line 458)。

/*-/--*-*/*/*/*/***//-*-*-**-*/*-*-/*/*/*-*-/-////--/**/**--**/--///--//**----**//--**//**----***//*-**//*
不過這邊有一點要注意一下,因為 CHT_IoT_SP 現在只支援 QoS=0 的通訊方式,所以不需要回應;但若是使用其他的 MQTT Broker 或許就需要再進一步處理回應的指令,要不然很可能會被 MQTT Broker 斷開連線。
/*-/--*-*/*/*/*/***//-*-*-**-*/*-*-/*/*/*-*-/-////--/**/**--**/--///--//**----**//--**//**----***//*-**//*

整個接收到的 PUBLISH 控制封包會儲存在 buffer[] 字元緩衝陣列中,然後開始做解析 (line 460 ... 488)。

首先必須確認傳進來的控制封包是 PUBLSIH,根據控制封包格式可以得知 buffer[0] 必須要是 0x30 ... 0x3F 這個值,但因為低位元組是 Bits Flags,所以可以把她剔除掉,只留下高位元組 (line 461) 的部分。

確定是 PUBLISH 控制封包之後,緊接著就是要取出 Remaining Length。這個部分至少有一個位元組 (byte)、至多有 4 個,只要值沒有超過 127 就沒有下一個位元組需要再取 (line 469 ... 473);但因為 buffer[] 限定大小在 200,所以只會取到 2 個位元組。最後經過計算 (line 475) 就可得到 Remaining Length 的值,完成 PUBLISH Fixed Header 的部分。

緊接著下來就是 Variable Header (line 477 ... 481) 的部分,此部分包含:Topic Length (line 477 ... 481)和 Topic Name (X)。其中,Topic Name 不需要特別取出來,而是利用已知的 Topic Length 得到計算得到最後一段 Payload 的 Json 字串長度 (line 482 ... 483),再移動 count 到 Json 字串起頭的位置 (line 485),然後將整段 Json 字串輸出 (line 486 ... 487)。

line 486 ... 487 輸出的就是需要再進一步解析的 Json 字串,解析之後就能得到相關的欄位值:
  • id: 感測器的名稱;例如,SayHelloSHT31_Temperature 等......;
  • deviceId: 專案給予的設備編號;
  • time: 此筆數據紀錄的時間;
  • value: 數據值;
要解析 Json 字串,就是在此處加入程式碼。

/*--*//**---/*///**---*-*////***--*/*///***----*///--*/*///**--*/*//**--**/*//
* 副程式的部分:

大部分使用到的副程式都已經在前幾篇入門網頁說明過了,這裡只針對重要部分和修改過的 getResponse() 函式與新增的 mqttSubscribe() 函式做解釋。

連線的 SSID 名稱和密碼記得要在 line 162 ... 163 進行修改,否則無法正常連線。

MQTT_Subscribe_ESP01_Demo01.ino, bool station_createTCPPassthrough(), line 162 ... 163
162
163
  Serial.println( "AT+CWJAP_CUR=\"SSID名稱\",\"SSID密碼\"" );     // SSID 名稱和密碼要改
  dbgOutput( "AT+CWJAP_CUR=\"SSID名稱\",\"SSID密碼\"\r\n" );      // SSID 名稱和密碼要改

因為訂閱主題需要接收來自 MQTT Broker 的回應訊息,而這些回應訊息長短不一但有跡可循,所以在程式開頭處 (line 45) 宣告定義了下面的回應訊息作為給 getResponse() 函式的第二個參數使用

MQTT_Subscribe_ESP01_Demo01.ino, line 44 ... 45
44
45
// MQTT 控制封包的類型
enum { NNE=0, CONNACK, PUBACK, PINGRESP, SUBACK };

根據這參數決定要接收的字元數目 (line 222 ... 226),收滿就離開

MQTT_Subscribe_ESP01_Demo01.ino, getResponse( int timeout, int mqtt_res_type), line 222 ... 226
222
223
224
225
226
    if( mqtt_res_type == NNE ) continue;
    // PUBACK 和 SUBSCRIBE 回應都是固定長度
    if( ( (mqtt_res_type == PUBACK)    || (mqtt_res_type == PINGRESP) ||
          (mqtt_res_type == CONNACK) ) && flag == 4 ) break;
    if( mqtt_res_type == SUBACK        && flag == 5 ) break;

再來就是這程式的重頭戲:MQTT SUBSCRIBE 控制封包的處理函式。

mqttSubscribe( uint16_t packet_identifier, char *mqtt_subscribe_topic, uint8_t QoS_level ) 函式的第三個參數宣告定義在 line 47,但因為是使用在 CHT_IoT_SP 所以只支援 QoS0

MQTT_Subscribe_ESP01_Demo01.ino, line 46 ... 47
46
47
// MQTT 採用的 QoS
enum { QoS0=0, QoS1, QoS2 };

根據 MQTT SUBSCRIBE 控制封包的格式說明,可以寫出下面 mqttSubscribe() 函式,其中:
  • line 360: 用 pos 來標記現在處理到的封包位置,2 是起始值;
  • line 361: 由於限定字元陣列的大小為 127 bytes,因此 Fixed HeaderVariable Header 所使用的字元數目是可預知的 ( Remaining Length 只使用 1 byte );
  • line 363 ... 372: 計算主題的字元數 (line 365) 和 MQTT SUBSCRIBE 控制封包的總字元數 (line 367),若是總字元數超出 buf[] 所能緩衝的大小(line 368),則輸出錯誤訊息 (line 369 ... 370) 並停止程式 (line 371);
  • line 374 ... 378: Variable Header 的部分,只包含 Packet Identifier (2 bytes);
  • line 380 ... 389: Payload 的部分,包含:Topic Length (主題長度,2 bytes)、Topic Filter (主題名稱,mqtt_subscribe_topic_len bytes) 和 Requested QoS ( QoS Level,1 byte);
  • line 392: Fixed Header byte 1;根據前面參數的說明,此為固定值 0x82
  • line 393: Fixed Header byte 2 (Remaining Length),因為總長度限制在 127 bytes,所以只需要一個 byte 的大小,扣除 Fixed Header 所占用的位元組數目,就是需要填進去的值;
  • line 395: 以 16 進制的方式輸出 buf 陣列裡所有的數據到 SoftwareSerial
  • line 397: 將 buf 陣列裡的資料以無符號字元的方式全部輸出至 Serial
  • line 399: 在 1000ms 的時間內,等待 SUBACK 的回覆;
MQTT_Subscribe_ESP01_Demo01.ino, mqttSubscribe(), line 357 ... 400
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
void mqttSubscribe( uint16_t packet_identifier, char *mqtt_subscribe_topic, uint8_t QoS_level ) {
  //--** for MQTT Version 3.1.1 **--//
  
  int pos = 2;
  uint8_t buf[127] = {0};    // MQTT SUBSCRIBE 格式暫存陣列,主題訂閱所需的陣列不需要大

  //----**** Length ****----//
  //--** Payload, len of the topic
  int mqtt_subscribe_topic_len = strlen( mqtt_subscribe_topic );
  // total len = [FH] + [VH] + [PL] = [2] + [2] + [2+mqtt_subscribe_topic_len+1]    // last byte is requested QoS
  int total_len = 2 + 2 + 2 + mqtt_subscribe_topic_len + 1;
  if( total_len > 127 ) {    // 超出 buf 範圍,直接停止執行
    dbgOutput( F("Remining Length: ") ); dbgOutputln( total_len );
    dbgOutputln( F("SUBSCRIBE: out of buffer space, stop continue!") );
    stopRunning();
  }

  //----**** Variable Headers ****----//
  // The variable header contains a Packet Identifier (2-byte).
  // packet_identifier_len = 2;
  buf[pos++] = packet_identifier >> 8;        // get the high byte of the packet identifier
  buf[pos++] = packet_identifier & 0x00ff ;   // get the low byte of the packet identifier
  
  //----**** Payload ****----//
  //--** Topic Length
  buf[pos++] = 0x00;
  buf[pos++] = mqtt_subscribe_topic_len;
  //--** Topic Filter
  for( int i = 0; i < mqtt_subscribe_topic_len; i++ ) {
    buf[pos++] = mqtt_subscribe_topic[i];
  }
  //--** Reqquested QoS
  buf[pos++] = QoS_level;  

  //----**** Fixed Headers ****----//
  buf[0] = 0x82;
  buf[1] = pos - 2;    // Only 1 byte needed, bz Remaining Length < 128

  hexDump( buf, pos );
  for( int i = 0; i < pos; i++ ) Serial.write( buf[i] );
  //-- wait SUBACK
  getResponse( 1000, SUBACK );
  dbgOutputln( "OK" );  
}

到此完成程式碼的說明,完整的程式碼如下面所示

範例程式碼下載

【程式執行結果】

下面的影片,展示了此篇網頁程式碼的運行過程,與同時接收多個數據發佈的情況。

影片中使用 Postman 作為數據發佈到 CHT_IoT_SP MQTT Broker 的工具軟體,接收到控制封包後只單純將主要的 Json 部分輸出,並未再進一步進行解析。



加入 Json 字串解析的程式碼之後,跟上面影片相同的步驟,解析各個欄位名稱與欄位值的結果就如下面影片所示。



【結論】

主題訂閱與接收的部分應該是整個 MQTT 通訊協議最複雜的部份,尤其是接收的情況會根據 QoS 的指定而有所不同。所幸,CHT_IoT_SP 現只支援 QoS=0 的通訊方式,因此簡單許多!

只不過對於 CHT_IoT_SP 來說,不管是發佈或是消息接收都是使用 Json 格式的字串,它的組合比較簡單,但是解析就比較麻煩一點!但不管如何,已經到了此入門系列的最後,即便一開始是以 CHT_IoT_SP 作為開始,但是這只是個遵循 MQTT 通訊規範的平台而已,有了 MQTT 通訊協議的概念和知識之後,相信往後改用其他的平台也能得心應手的使用。


<< 部落格相關文章 >>

沒有留言:

張貼留言

留言屬名為"Unknown"或"不明"的用戶,大多這樣的留言都會直接被刪除掉,不會得到任何回覆!

發問問題,請描述清楚你(妳)的問題,別人回答前不會想去 "猜" 問題是什麼?

不知道怎麼發問,請看 [公告] 部落格提問須知 - 如何問問題 !