2018年9月21日 星期五

當 ESP8266 遇上中華電信 IoT 智慧聯網大平台 { 入門 - 05 } - 結合 ESP8266 發佈 MQTT 消息

網頁最後修改時間:2018/11/10


經過了上一篇的說明,已對於發佈 MQTT 消息到中華電信 IoT 智慧聯網大平台 (下面簡稱 CHT IoT SP 或 平台),不論是通訊的過程或是控制封包格式有了基本的了解。而在實際的應用上,是不會直接 (也不應該) 自己去合成控制封包來發送,因為這也太折磨人了!

在這一篇,我們來換個與 MQTT Broker 溝通的方法,改採用 Arduino 開發板加上 ESP8266 WiFi 模組的方式。

ESP8266 負責 WiFi 通訊,Arduino 開發板負責 ESP8266 的操作和 MQTT 控制封包的發送與接收,與定時連續發佈三個感測器的資料,並持續維持與 MQTT Broker 之間的聯繫。

*********************************************************************************
此網頁所用的材料可自行準備,或選用新版本的升級套件
更多 ESP8266 相關商品,請至分類賣場
*********************************************************************************

不同於在入門 3網頁使用 RESTful API 更新感測器資料的方法,此篇在不改變既有硬體接線與 ESP8266 透傳模式下,改用 MQTT 協議來處理資料更新的部分。

不管採用何種通訊方式,兩者各有其適用的場合以及優缺點,從 IoT 實作的觀點[ref]可簡述比較如下:
  • REST 是單方向通訊,MQTT 是雙向通訊 (即時回應)
    REST 對於伺服器 (Server) 的連線是斷續的,只有在需要向伺服器更新或是取回感測器資料時,才會與其連線通訊。它並不會自動提供被更新的感測器資料給客戶端 (Client),只有客戶端向伺服端要求時才會有。
    反之,MQTT 讓伺服端與客戶端隨時處於雙向通訊環境下,使得在感測器資料被更新的當下,終端主題訂閱裝置端 "幾乎即時" 就能獲得該感測器當下更新的數據,不需要再等待客戶端連線才能提供。
  • 功耗
    由於 REST 資源大部分都花在客戶端和伺服端連線、斷線和資源清理上,在傳輸相同數據資料的情況下,使用 MQTT 通訊比用 REST 通訊可減少 20% 的功率消耗。所以在建立以電池驅動的裝置上,使用 MQTT 協議通訊方式可以比 REST 擁有較長的運作時間。
  • 安全性
    MQTT 可以解決裝置處於防火牆後面開發的問題。
  • 工作效能
    MQTT 資料傳輸率快於 REST 20 - 30 倍,且可同時處理的連線也比 REST 多上許多。

參考電路圖:

參考電路圖如下圖所示。

其中,中間部分也可以使用 Arduino 開發板外接 ESP8266 無線模組來取代 (ESP8266 與 Arduino 開發板 TX / RX 連接部分要接電壓準位轉換模組),其餘的接線不變。

參考電路圖

程式碼說明:

在撰寫 CHT IoT SP MQTT 程式的時候,原先的設想是使用 MQTT 函式庫配合 ESP8266 AT 指令的方式來做,但是在嘗試過多種的函式庫組合之後就放棄了;不是 AT 函式庫連線有問題,就是 MQTT 函式庫與 AT 函式庫不能配合,零零總總的測試一堆都不能用,雖然也有想過直接用 ESP8266 來做不就好,但這終究不是我想要實現 MQTT 的方式!

所以,既然已經能夠在 ESP8266 AT 指令下的透傳模式手動發送與接收 CHT IoT SP MQTT 的數據資料,那麼為何不自己在既有的透傳模式的程式碼下加入 MQTT 協議通訊的支援呢 ?

就在搜尋了一些參考資料及把握既有的資源之後,終於可以開始寫程式了!

/*--*//**---/*///**---*-*////***--*/*///***----*///--*/*///**--*/*//**--**/*//
* 程式流程圖:

為了方便理解,下面將整個程式做成流程圖,主要是由 setup()loop() 這兩個部分構成 (點擊圖面可看原圖)。

CHT IoT SP MQTT 訊息發佈程式流程圖
START 之後,對 Serial SHT31 部分的硬體做初始化,並且發送 AT 指令給 ESP8266 加入到 AP 並建立與 MQTT Broker 的 TCP 連線,然後進入到透傳模式。

透傳模式下,裝置端發送 MQTT CONNECT 控制封包與 MQTT Broker 進行連接,然後發送字串 "hELLo" 給 SayHello 感測器,表示 MQTT 連接和發佈成功。接著下來就進入到了迴圈 loop() 運行的部分。

迴圈中設定了三個以秒計時的變數,分別負責 PUBLISHPINGREQ DISCONNECT 控制封包發送的時間,用戶需要完全了解上一篇所談及的 MQTT 控制封包的組成格式,才能掌握和了解程式碼所代表的意思。

程式以 nopnop2002 的程式作為一開始的雛形參考,但是由於該程式所連接的 MQTT Broker 和使用的 MQTT 版本 (v3.1) 都與我們的不同,所以最後是以之前撰寫的透傳程式作為基礎,加入了自行修改的部分,融合之後有了下面的程式。

/*-/--*-*/*/*/*/***//-*-*-**-*/*-*-/*/*/*-*-/-////--/**/**--**/--///--//**----**//--**//**----***//*-**//*
環境設置:
  • MQTT v3.1.1
  • Software
    • Arduino IDE v1.8.5
  • Hardware (下面兩種型式皆可)
    • Arduino UNO + ESP8266 (WiFi) 二合一開發板**
    • Arduino 開發板外接 ESP8266 無線模組**
    • SHT31-D 溫溼度感測器模組
** ESP8266 AT 韌體版本 AT: 1.2.0.0, SDK: v1.5.4.1
/*-/--*-*/*/*/*/***//-*-*-**-*/*-*-/*/*/*-*-/-////--/**/**--**/--///--//**----**//--**//**----***//*-**//*

/*--*//**---/*///**---*-*////***--*/*///***----*///--*/*///**--*/*//**--**/*//
* 主程式的部分:

setup() 的部分在上面已經講解過了,下面程式碼也有註解,就不再說明了。
MQTT_Pulish_ESP01_Demo01.ino, ( 01/10 ), setup()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void setup() {
  Serial.begin( 115200 );
  swSerialOutput.begin( 115200 );
  delay( 3000 );

  pinMode( _builtin_led, OUTPUT );
  swSerialOutput.print( F("SoftwareSerial Ready!\r\n") );  

  /** SHT31 初始化 */
  Wire.begin();
  softResetSHT31();
  swSerialOutput.print( F("SHT31 Ready!\r\n") );

  //--** 開機時退出 TCP 連線與退出 AP
  if( !exitPassthroughMode() )  // +++
    ledErrorIndicator( 1 );
  // 關閉 TCP 連接
  closeTCPConnection();         // AT+CIPCLOSE
  // 斷開與 AP 的連接
  disconnectFromAP();           // AT+CWQAP

  //--** MQTT Broker 連線  **--//
  if( !station_createTCPPassthrough() )
    ledErrorIndicator( 2 );
  dbgOutput( F( "MQTT CONNECT..." ) );
  //-- Client requests a connection to a server
  // void mqttConnect( int keep_alive, char *client_id, char *user_name, char *password ) {
  mqttConnect( MQTT_KEEP_ALIVE, CLIENT_ID, PROJECT_KEY, PROJECT_KEY );
  //-----------------------------
  publishSelect( PUBLISH_SAYHELLO, "hELLo" );
  lastmillis = millis();  
}

loop() 主要依計時的時間執行發送相對應的 MQTT 控制封包格式:
  • line   9 - 11: 避免 now() 函式溢位
  • line 14 - 79: 大概每秒鐘計時,和檢查是否到了要發送 MQTT 控制封包的時間
    • line 28 - 37: 當 publishtimer 計數到了,就會連續發送三個 PUBLISH 控制封包,而且網頁上面的感測器數值也會 "幾乎即時" 的更新。
      SayHello 每次發佈都會先將數字加一再上傳,起始值是 1。
      SHT31 的溫溼度藉由 publishSelect 的第一個參數做選擇,而第二個參數只要不是空白字元,任意的字元與字串都可作為引數。
      最後要將 publishtimer pingreq 歸零。
    • line 40 - 49: 當 pingreqtimer 時間到,發送 PINGREQ 的控制封包,然後等待回應;否則,若有接收到其他東西則將其輸出到 SoftwareSerial
    • line 52 - 73: 當 disconnecttimer 時間到,清除 SayHelloSHT31_Temperature SHT31_Humidity 三個感測器數值為空白,接著再發送 DISCONNECT 的控制封包。
      最後,依照 MQTT 3.1.1 規範的要求;此時的發送端 (Client) 要自行關閉網路連線。
      完成動作之後,讓 LED 每秒閃爍 5 次。
為了要拍攝影片的關係,所以必須在短時間內一次完成 MQTT CONNECTPUBLISHPINGREQ DISCONNECT 的動作,一般使用上不會使用這麼短的 PUBLISH PINGREQ 的時間,DISCONNECT 也會配合硬體按鈕來做關閉網路連接的動作,來提高電池驅動時的運作時間;不過,這卻是一個完整的 MQTT 消息發佈的流程。
MQTT_Pulish_ESP01_Demo01.ino, ( 02/10 ), loop()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
void loop() {
  static int publishtimer     = 0;
  static int pingreqtimer     = 0;
  static int disconnecttimer  = 0;
  static int loop             = 1;
  static int running_state    = 1;

  unsigned long now = millis();
  if ( (now - lastmillis) < 0) {    // 避免計時溢位
    lastmillis = now;
  }

  //----**** 主要計時迴圈 ****--//
  if ( (now - lastmillis) > 1000) {
    lastmillis = now;
    publishtimer++;
    pingreqtimer++;
    disconnecttimer++;

    //--** 每秒切換一次 LED 狀態都耗
    digitalWrite( _builtin_led, running_state );
    running_state = !running_state;
    //--** 每秒計數輸出一個標示
    if ( (publishtimer % 10) == 0) dbgOutput( F("+") );
    if ( (publishtimer % 10) != 0) dbgOutput( F(".") );

    //--** PUBLISH **--//
    if( publishtimer == MQTT_PUBLISH_INTERVAL ) {
      dbgOutputln( F("Sending PUBLISH") );
      publishSelect( PUBLISH_SAYHELLO, (String(loop++)).c_str() );
      if( readTempHumSHT31() ) {
        publishSelect( PUBLISH_SHT31H, "H" );   // 第 2 個參數隨便丟一個值進去就是輸出溫溼度,但不能是 ""
        publishSelect( PUBLISH_SHT31T, "T" );       
      }
      publishtimer = 0;
      pingreqtimer = 0;
    }   // PUBLISH

    //--** PINGREQ **--//
    if( pingreqtimer == MQTT_KEEP_ALIVE ) {
      dbgOutputln( F("Sending PINGREQ") );
      // Sending MQTT PINGREQ
      Serial.write( mqtt_pingreq_pack[0] );
      Serial.write( mqtt_pingreq_pack[1] );
      getResponse( 1000 );
      pingreqtimer = 0;
    } else {
      getResponse(10);
    }   // PINGREQ

    //--** MQTT DISCONNECT **--//
    if( disconnecttimer == MQTT_DISCONNECT_INTERVAL ) {    // DISCONNECT
      //--** 恢復感測器欄位值
      publishSelect( PUBLISH_SAYHELLO );    // 只有一個參數表示以空白填入感測器欄位
      publishSelect( PUBLISH_SHT31H );
      publishSelect( PUBLISH_SHT31T );
      //--** sending DISCONNECT CPF
      dbgOutputln( F("Sending DISCONNECT") );
      // sending MQTT DISCONNECT
      Serial.write( mqtt_disconnect_pack[0] );
      Serial.write( mqtt_disconnect_pack[1] );
      //--** mqtt-v3.1.1-os29 October 2014, 3.14 DISCONNECT, 3.14.4 Respnse
      // After sending a DISCONNECT Packet the Client:
      //    - MUST close the Network Connection[MQTT-3.14.4-1].
      //    - MUST NOT send any more Control Packets on that Network Connection[MQTT-3.14.4-2].
      //
      if( exitPassthroughMode() ) {
        closeTCPConnection();   // AT+CIPCLOSE
        disconnectFromAP();     // AT+WQAP
      } else {
        dbgOutputln( F("AT+CLOSE Fail.") );
        stopRunning(); 
      }
      clearSerialBuffer();
      dbgOutputln();
      dbgOutputln( F("PUBLISH end, DISCONNECT from CHT IoT SP") );
      ledErrorIndicator( 5 );   // 此處並非錯誤產生,而是顯示已完成 DISCONNECT 的動作      
    }   // MQTT_DISCONNECT_INTERVAL
  }   // 1000ms loop
}   // loop

下面是程式用到的標頭檔與變數宣告和定義,大多看程式裡面註解或是之前的網頁就能清楚,這裡只解釋 line 43line 44

前面幾篇用到的 checkOK 函式都只檢查 "OK",但這裡提供可指定 "OK" 之外的選擇;預設沒有使用參數時,是採用 "OK" 作為輸入。

publishSelect 預設在不使用第二個參數時,第二個參數的值會被設為 "" 空白字元,這在程式中會將第一個參數指定的感測器數值的欄位自動設定為 "" 空白字元,發佈後該欄位就會被自動清除掉;若填入任意字元或是字串,則會以該值作為指定感測器的數值。
MQTT_Pulish_ESP01_Demo01.ino, ( 03/10 )
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <SoftwareSerial.h>
#include <Wire.h>

//----**** 基本輸出入  ****----//
//--** Software Serial
#define _rxpin  2
#define _txpin  3
SoftwareSerial swSerialOutput( _rxpin, _txpin );       // RX, TX
//--** 硬體資訊
#define SHT31_IIC_ADDRESS   0x44  // SHT31 預設的 IIC 地址
#define _builtin_led        13
//--** 除錯輸出
#define DEBUG
#ifdef DEBUG
  #define dbgOutput( str )    (swSerialOutput.print( str ))
  #define dbgOutputln( str )  (swSerialOutput.println( str ))
#else
  #define dbgOutput( str )
  #define dbgOutputln( str )
#endif
//-------------------------------------------------------*-

//----**** CHT IoT Smart Platform ****----//
//--** MQTT Broker
#define MQTT_PUBLISH_TOPIC    "/v1/device/7581817213/rawdata"
#define PROJECT_KEY           "PK4LQMHCHE1ZTQFQRW"
#define CLIENT_ID             "MQTT_ESP_01S"
#define MQTT_PUBLISH_INTERVAL 18
#define MQTT_KEEP_ALIVE       8
#define MQTT_DISCONNECT_INTERVAL 60
//--** MQTT PINGREQ and PINGRESP Control Packet Format
// MQTT Control Packets of the PINGREQ ( PING REQuest )
static uint8_t mqtt_pingreq_pack[] = { 0xC0, 0x00 };
// MQTT Control Packets of the DISCONNECT
static uint8_t mqtt_disconnect_pack[] = { 0xE0, 0x00 };
//--** MQTT PUBLISH 感測器選擇
#define PUBLISH_SAYHELLO  1
#define PUBLISH_SHT31T    2
#define PUBLISH_SHT31H    3
//-------------------------------------------------------*-

//----**** 函式宣告 ****----//
bool checkOK( char* fstring = "OK" );
void publishSelect( uint8_t type, char* text = "" );
static unsigned long lastmillis;
//-------------------------------------------------------*-

//----**** 全局變數 ****----//
//--** 儲存 SHT31-D 的溫溼度值
struct sht31_t{
  float temperature;
  float humidity;
} sht31 = { 0.0, 0.0 };
//-------------------------------------------------------*-

station_createTCPPassthrough 函式將 ESP8266 加入到 AP 再連線到 CHT IoT SP,最後設定 ESP8266 處於透傳模式下。

相關資訊如下所示,請依自己實際的環境進行修改
  • ROUTER 連線資訊
    • SSID: Proteus-WiFi
    • PASSWORD: asdfghjkl
  • MQTT Broker IP address / Port number
    • IP: iot.cht.com.tw
    • Port: 1883

station_createTCPPassthrough()
  • line 10: 在這裡填入 ROUTER 連線資訊
  • line 15: 在這裡輸入 MQTT Broker IP / Port 的資料
MQTT_Pulish_ESP01_Demo01.ino, ( 04/10 ), station_createTCPPassthrough()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
bool station_createTCPPassthrough() { // station
  swSerialOutput.print( F("Connecting to CHT IoT SP...") );

  dbgOutput( "\r\n" );
  // 建立 station
  Serial.println( "AT+CWMODE_CUR=1" );    // station mode
  dbgOutput( F("AT+CWMODE_CUR=1\r\n") );
  if( !checkOK() ) return false;
  // 連線到 ROUTER
  Serial.println( "AT+CWJAP_CUR=\"Proteus-WiFi\",\"asdfghjkl\"" );
  dbgOutput( "AT+CWJAP_CUR=\"Proteus-WiFi\",\"asdfghjkl\"\r\n" );
  if( !checkOK() ) return false;
  // Establish TCP Transmission
  // <type>,<remote IP>,<remote port>
  Serial.println( "AT+CIPSTART=\"TCP\",\"iot.cht.com.tw\",1883");
  dbgOutput( "AT+CIPSTART=\"TCP\",\"iot.cht.com.tw\",1883\r\n");
  if( !checkOK() ) return false;
  // 設定傳輸模式為透傳模式
  Serial.println( "AT+CIPMODE=1" );
  dbgOutput( "AT+CIPMODE=1\r\n" );
  if( !checkOK() ) return false;
  // 開始傳送資料
  Serial.println( "AT+CIPSEND" );
  dbgOutput( "AT+CIPSEND\r\n" );
  if( !Serial.find( ">" ) ) return false;

  swSerialOutput.print( "done.\r\n\r\n" );
  return true;
}

下面是幾個一般輔助函式的程式碼,其中幾個已在前面幾篇網頁做過說明,這裡只針對做過變更與新增的幾個作補充:
  • line   1 -  7: checkOK 函式;修改 line 4 的數值,增加某些指令確認等待的次數與時間。
  • line 30 - 37: stopRunning 函式;修改程式執行時為 LED 快速變換閃爍,而不是直接停止往下運作而已。
MQTT_Pulish_ESP01_Demo01.ino, ( 05/10 ), Functions( 01/02 )
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
bool checkOK( char* fstring ) {
  uint8_t i = 0;
  while( !Serial.find( fstring ) ) {
    if( ++i >= 8 ) return false;
  }
  return true;
}

bool exitPassthroughMode() {
  Serial.println( "AT" );
  if( !checkOK() ) {
    Serial.print( "+++" );
    delay( 1000 );
    Serial.println( "AT" );
    if( !checkOK() ) return false;
  }
  return true;
}

void closeTCPConnection() {
  Serial.println( "AT+CIPCLOSE" );
  checkOK();
}

void disconnectFromAP() {
  Serial.println( "AT+CWQAP" );
  checkOK();
}

void stopRunning() {
  int stat = 0;
  while(1) {
    digitalWrite( _builtin_led, stat );
    stat = !stat;
    delay(100);
  }
}

void ledErrorIndicator( uint8_t times ) {
  static uint16_t offms[] = {900, 400, 250, 150, 100};
  while(1) {
    for( int i = 0; i < times; i++ ) {
      digitalWrite( _builtin_led, HIGH );
      delay( 100 );
      digitalWrite( _builtin_led, LOW );
      delay( offms[times - 1] );
    }
    delay( 1000 );
  }
}

下面是這個程式另外新增的一般輔助函式:
  • line   1 - 12: hexDump 函式;將 buf 裡的內容以 16 進制的方式輸出。
  • line 14 - 17: clearSerialBuffer 函式;讀取 Serial 裡面所有未讀出的字元,但不儲存。
MQTT_Pulish_ESP01_Demo01.ino, ( 06/10 ), Functions( 02/02 )
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
void hexDump( uint8_t *buf, int msize ) {
  #ifdef DEBUG
    swSerialOutput.print( "msize=" );
    swSerialOutput.println( msize );
    for( int i = 0; i < msize; i++ ) {
      if( buf[i] < 0x10 ) swSerialOutput.print( F("0") );
      swSerialOutput.print( buf[i], HEX );
      swSerialOutput.print( " " );
    }
    swSerialOutput.println();
  #endif
}

void clearSerialBuffer() {
  while (Serial.available())
    Serial.read();
}

getResponse 函式主要是用來接收 MQTT 控制封包發送之後,直接將對方所回應的資料以字元或 16 進制的方式輸出,但不做任何的動作。這也就說明了,為什麼程式流程圖判斷菱形處,為何只出一條線。
  • line   7 - 24: 在指定的 (timeout) 時間內,若 Serial 接收到字元,則
    • 若字元是 0x0d (表示 ASCII 的 Enter 鍵;CR;'\r' ),不做任何動作!
    • 若字元是 0x0a (表示 ASCII 的換行鍵;LF;'\n' ),輸出 "\r\n"。
    • 若字元小於 0x20,將此字元以16 進制的方式輸出,並以中擴號 [] 框住再輸出。
    • 若都不是上面所提及的部分,則直接輸出
  • line 23: 若有接收到任何字元的話,則在函式的最後輸出 "\r\n"。
MQTT_Pulish_ESP01_Demo01.ino, ( 07/10 ), getResponse()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void getResponse( int timeout ){
  char c;
  bool flag = false;
  char tmp[10];
  
  long int time = millis() + timeout;
  while( time > millis() ) {
    if( Serial.available() ) {
      flag = true;
      c = Serial.read();
      if(c == 0x0d) {           
      } else if( c == 0x0a ) {
        dbgOutputln();
      } else if( c < 0x20 ) {
        uint8_t cc = c;
        sprintf( tmp, "[0x%.2X]", cc );
        dbgOutput( tmp );
      } else {
        dbgOutput( c );
      } 
    } // end if
  } // end while
  if( flag ) dbgOutputln();
}

解釋下面函式的時候,有不清楚的地方可以借助 Wireshark,會很有幫助!

在開始之前,要事先知道幾個重點參數:
  • Protocol Name: MQTT
  • MQTT Version: v3.1.1 (0x04)
  • Connect Flag: 0xc2
    • User Name Flag: 1
    • Passwrod Flag: 1
    • Clean Session Flag: 1
    • others: 0
  • Keep Alive: 8
  • Client ID: MQTT_ESP_01S
  • User Name: PK4LQMHCHE1ZTQFQRW
  • Password: PK4LQMHCHE1ZTQFQRW
有了這些參數,配合 MQTT 協議規範和 Wireshark,就可以開始撰寫 MQTT CONNECT 控制封包格式合成的程式。
  • line   4:pos 來標記現在處理到的封包位置;12 是起始值,看 line 5 的解釋。
  • line   5: 由於限定字元陣列的大小為 127 bytes,因此 Fixed Header 和 Variable Header 所使用的字元數目是可預知的 ( Remaining Length 只使用 1 byte )。
  • line   7 - 13: 預先獲得 client_iduser_name password 的字元數目,這些數值可做為計算 Remaining Length 使用。
  • line 16: Fixed Header 的 Head Flag;這是固定值 0x10
  • line 17: Remaining Length 的計算,是下面兩個項目的總和
    • Variable Header: { Protocol Name Length } + { Protocol Name } + { Protocol Level } +
                                   { Connect Flags } + { Keep Alive } = 2 + 4 + 1 + 1 + 2 = 10 bytes
    • Payload: { Client ID Length [2-byte] } + { Client ID [n bytes] } +
                      { User Name Length [2 bytes] } + { User Name [m bytes] } +
                      { Password Length [2 bytes] } + { Password [m bytes] }
                  = 2 + client_id_len + 2 + user_name_len + 2 + password_len 
  • line 18 - 23: 確認總輸出的 MQTT CONNECT 控制封包是否超出 127 bytes ? 若超出範圍,則停止全部動作;否則,繼續往下執行。
  • line 24: buf 夠用的情況之下,將計算後的 Remaining Length 值填入 buf[1]
  • line 26 - 36: Variable Header 的字元填入;除了 buf[9...11] 之外,這部分大多是固定的字元,可由協議規範中查到。
  • line 38 - 59: 依序將 client_iduser_name password 的內容值,填入到 buf 陣列裡。
  • line 61: 以 16 進制的方式輸出 buf 陣列裡所有的數據到 SoftwareSerial
  • line 62:buf 陣列裡的資料以無符號字元的方式全部輸出至 Serial
  • line 64: 在 1000ms 的時間內,等待 CONNACK 的回覆。
MQTT_Pulish_ESP01_Demo01.ino, ( 08/10 ), mqttConnect()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
void mqttConnect( int keep_alive, char *client_id, char *user_name, char *password ) {
  //--** for MQTT Version 3.1.1 **--//
  
  int pos = 12;
  uint8_t buf[128] ={ 0 };

  //----**** Length ****----//
  //--** Payload, Client ID
  int client_id_len = strlen(client_id);
  //--** User Name Length
  int user_name_len = strlen( PROJECT_KEY );
  //--** Password Length
  int password_len = strlen( PROJECT_KEY );

  //----**** Fixed Headers ****----//
  buf[0]  = 0x10;     // Head Flags, Connect Command (1)
  int remaining_length = (10) + (2 + client_id_len + 2 + user_name_len + 2 + password_len );
  int total_len = 2 + remaining_length;
  if( total_len > 127 ) {  // 超出 buf 範圍,直接停止執行
    dbgOutput( F("Remining Length: ") ); dbgOutputln( total_len );
    dbgOutputln( F("CONNECT: out of buffer space, stop continue!") );
    stopRunning();
  }
  buf[1]  = remaining_length;  // Remaining Length

  //----**** Variable Headers ****----//
  buf[2]  = 0x00;     // [2 bytes] Protocol Name Length
  buf[3]  = 0x04;
  buf[4]  = 'M';      // [4 bytes] Protocol Name
  buf[5]  = 'Q';
  buf[6]  = 'T';
  buf[7]  = 'T';
  buf[8]  = 0x04;     // Version
  buf[9]  = 0xc2;     // Connect Flags: 0xc2
  buf[10] = keep_alive >> 8;      // Keep Alive
  buf[11] = keep_alive & 0x00FF;

  //----**** Payload ****----//
  //--** [ 2 bytes ] Client ID Length
  buf[pos++] = 0x00;
  buf[pos++] = client_id_len;
  //--** Client ID
  for( int i = 0; i < client_id_len; i++ ) {
    buf[pos++] = client_id[i];
  }
  //--** [ 2 bytes ] User Name Length
  buf[pos++] = 0x00;
  buf[pos++] = user_name_len;
  //--** User Name
  for( int i = 0; i < user_name_len; i++ ) {
    buf[pos++] = user_name[i];
  }
  //--** [ 2 bytes ] Password Length
  buf[pos++] = 0x00;
  buf[pos++] = password_len;
  //--** User Name
  for( int i = 0; i < password_len; i++ ) {
    buf[pos++] = password[i];
  }  

  hexDump( buf,pos );
  for( int i = 0; i < pos; i++ ) Serial.write( buf[i] );

  getResponse( 1000 );
  dbgOutputln( "OK" ); dbgOutputln();
}

publishSelect 函式,主要是用來組建 PUBLISH Payload 部分,是 JSON 格式。第一個參數 type,是用來區分選擇的是三個感測器的哪一個;第二個參數 text,為要發佈的感測器值。
  • line   6 - 12: SayHello 感測器 Payload 的處理。
  • line 13 - 18: SHT31_Temperature 感測器 Payload 的處理。
  • line 19 - 24: SHT31_Humidity 感測器 Payload 的處理。
  • line 31: MQTT PUBLISH 發送。
由於只是做測試的緣故,發佈的感測器值並不儲存在伺服器,所以 JSON 的 save 欄位值要設定為 false;在這特別再次說明!
MQTT_Pulish_ESP01_Demo01.ino, ( 09/10 ), publishSelect()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void publishSelect( uint8_t type, char* text) {

  String payload = "";

  switch( type ) {
    case PUBLISH_SAYHELLO:
    {
      payload = "[{\"id\":\"SayHello\",\"save\":false,\"value\":[\"";
      payload += String( text );
      payload += "\"]}]";
    }
    break;
    case PUBLISH_SHT31T:
      payload  = "[{\"id\":\"SHT31_Temperature\",\"save\":false,\"value\":[\"";
      if( strcmp( text , "" ) == 0 ) payload += "";
      else payload += String( sht31.temperature );
      payload += "\"]}]";
    break;
    case PUBLISH_SHT31H:
      payload  = "[{\"id\":\"SHT31_Humidity\",\"save\":false,\"value\":[\"";
      if( strcmp( text , "" ) == 0 ) payload += "";
      else payload += String( sht31.humidity );
      payload += "\"]}]";
    break;
    default: 
      dbgOutputln( F("Wrong payload type!") );
      stopRunning();
  }

  dbgOutputln( payload );
  mqttPublish( MQTT_PUBLISH_TOPIC, payload.c_str() );
}

mqttPublish 函式,基本上與 mqttConnect 函式的解釋差不多!
  • line   4: 用 pos 來標記現在處理到的封包位置,2 是起始值;
  • line   5: 由於限定字元陣列的大小為 127 bytes,因此 Fixed Header 和 Variable Header 所使用的字元數目是可預知的 ( Remaining Length 只使用 1 byte );
  • line   7 - 11: 預先獲得 mqtt_public_topic 和 payload 的字元數目,這些數值可做為計算 Remaining Length 使用;
  • line 14: Fixed Header byte 1;根據前面參數的說明,此值為 0x30
  • line 15: Remaining Length 的計算,是下面兩個項目的總和
    • Variable Header: { Topic Name Length [2 bytes] } + { Topic Name [n bytes] }
                                  = 2 + mqtt_public_topic_len
    • Payload: { Payload [m bytes] }
                  = payload_len
  • line 16 - 20: 確認總輸出的 MQTT PUBLISH 控制封包是否超出 127 bytes ? 若超出範圍,則停止全部動作;否則,繼續往下執行;
  • line 21: buf 夠用的情況之下,將計算後的 Remaining Length 值填入 buf[1]
  • line 24 - 29: Variable Header 的部分;主要是輸入發佈主題 (Topic) 的長度與名稱;
  • line 33 - 35: 將 payload 陣列的內容加入到 buf 陣列裡;
  • line 37: 以 16 進制的方式輸出 buf 陣列裡所有的數據到 SoftwareSerial
  • line 38: 將 buf 陣列裡的資料以無符號字元的方式全部輸出至 Serial
  • line 39: 在 1000ms 的時間內,等待 PUBACK 的回覆;
MQTT_Pulish_ESP01_Demo01.ino, ( 10/10 ), mqttPublish()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void mqttPublish( char *mqtt_publish_topic, char *payload) {
  //--** for MQTT Version 3.1.1 **--//

  int pos = 2;
  uint8_t buf[127] = { 0 };

  //----**** Length ****----//
  //--** Variable Headers / Topic
  int mqtt_publish_topic_len = strlen( mqtt_publish_topic );
  //--** Payload
  int payload_len = strlen( payload );

  //----**** Fixed Headers ****----//
  buf[0]  = 0x30;   // Header Flags: 0x30
  int remaining_length = 2 + mqtt_publish_topic_len + payload_len;
  int total_len = 2 + remaining_length;
  if( total_len > 127 ) {  // 超出 buf 範圍,直接停止執行
    dbgOutput( F("Remining Length: ") ); dbgOutputln( total_len );
    dbgOutputln( F("PUBLISH: out of buffer space, stop continue!") );    stopRunning();
  }
  buf[1] = remaining_length;

  //----**** Variable Headers ****----//
  buf[pos++]  = 0x00;
  buf[pos++]  = mqtt_publish_topic_len;
  //--** Topic
  for( int i = 0; i < mqtt_publish_topic_len; i++ ) {
    buf[pos++] = mqtt_publish_topic[ i ];
  }

  //----**** Payload ****----//
  //--** Message
  for( int i = 0; i < payload_len; i++ ) {
    buf[pos++] = payload[i];
  }

  hexDump( buf, pos );
  for( int i = 0; i < pos; i++ ) Serial.write( buf[i] );
  getResponse( 1000 );
  dbgOutputln( "OK" ); dbgOutputln();
}

/*--*//**---/*///**---*-*////***--*/*///***----*///--*/*///**--*/*//**--**/*//
* 程式執行結果:

整理上面的程式碼,編譯之後上傳到 Arduino 開發板就可以開始進行測試。

進入到 CHT IoT SP "專案管理" 頁面,點擊為此測試所建立的專案設備,就能開始通電測試;測試過程與結果就如下面影片所示。



結論:

網頁中的程式碼,實作與演示了上一篇 MQTT 消息發佈的整個流程,從影片中可以看到各控制封包的內容輸出,也能看到在 Wireshark 抓取的相同內容,這也解釋了前一篇網頁對這一篇的重要性!

另外,對於 getResponse 函式只抓取回應沒有處理的問題;如果真的有需要,可以自己加入處理回應的程式碼。即便不處理也沒有關係,但是發送端一定要收到回應,否則就是發送的控制格式出現問題,導致沒有收到接收端的回應。

最後,在關於 Remaining Length 的處理上,畢竟微控制器不會有那麼多的記憶體可以使用,因此除了能夠預先知道發送與接收的數據大小為何之外,是無法預先得知要使用的 Remaining Length 會使用多少個 bytes 的。而且就算知道了,也不一定就會有那麼多的記憶體空間來做為暫存 (像是影像、聲音檔....等),所以 Wireshark 配合 MQTT.fx 就是一個好用的工具;不但能夠抓取封包,也能用來得知發送與接收的封包大小,作為限定數據暫存區的上限 (Remaining Length 的計算可以參考上一篇網頁)。

MQTT 發佈消息這一部分寫好之後,接下來來看看怎麼做消息的訂閱 (SUBSCRIBE) 與接收了!


<< 部落格相關文章 >>

沒有留言:

張貼留言

留言屬名為"Unknown"或"不明"的用戶,大多這樣的留言都會直接被刪除掉,不會得到任何回覆!

發問問題,請描述清楚你(妳)的問題,別人回答前不會想去 "猜" 問題是什麼?

不知道怎麼發問,請看 [公告] 部落格提問須知 - 如何問問題 !