網頁最後修改時間:2018/11/10
在這一篇,我們來換個與 MQTT Broker 溝通的方法,改採用 Arduino 開發板加上 ESP8266 WiFi 模組的方式。
ESP8266 負責 WiFi 通訊,Arduino 開發板負責 ESP8266 的操作和 MQTT 控制封包的發送與接收,與定時連續發佈三個感測器的資料,並持續維持與 MQTT Broker 之間的聯繫。
*********************************************************************************
此網頁所用的材料可自行準備,或選用新版本的升級套件
- V2.1 版 { 萬物皆聯網-ESP8266 IoT(Internet of Things)入門學習套件 }
- Arduino UNO + ESP8266 (Wi-Fi) 二合一開發板 ( 可選擇獨立與偕同開發 )
- SHT31-D 溫溼度感測器
*********************************************************************************
不同於在入門 3網頁使用 RESTful API 更新感測器資料的方法,此篇在不改變既有硬體接線與 ESP8266 透傳模式下,改用 MQTT 協議來處理資料更新的部分。
不管採用何種通訊方式,兩者各有其適用的場合以及優缺點,從 IoT 實作的觀點[ref]可簡述比較如下:
- REST 是單方向通訊,MQTT 是雙向通訊 (即時回應)
REST 對於伺服器 (Server) 的連線是斷續的,只有在需要向伺服器更新或是取回感測器資料時,才會與其連線通訊。它並不會自動提供被更新的感測器資料給客戶端 (Client),只有客戶端向伺服端要求時才會有。
反之,MQTT 讓伺服端與客戶端隨時處於雙向通訊環境下,使得在感測器資料被更新的當下,終端主題訂閱裝置端 "幾乎即時" 就能獲得該感測器當下更新的數據,不需要再等待客戶端連線才能提供。 - 功耗
由於 REST 資源大部分都花在客戶端和伺服端連線、斷線和資源清理上,在傳輸相同數據資料的情況下,使用 MQTT 通訊比用 REST 通訊可減少 20% 的功率消耗。所以在建立以電池驅動的裝置上,使用 MQTT 協議通訊方式可以比 REST 擁有較長的運作時間。 - 安全性
MQTT 可以解決裝置處於防火牆後面開發的問題。 - 工作效能
MQTT 資料傳輸率快於 REST 20 - 30 倍,且可同時處理的連線也比 REST 多上許多。
參考電路圖:
參考電路圖如下圖所示。
其中,中間部分也可以使用 Arduino 開發板外接 ESP8266 無線模組來取代 (ESP8266 與 Arduino 開發板 TX / RX 連接部分要接電壓準位轉換模組),其餘的接線不變。
參考電路圖 |
程式碼說明:
在撰寫 CHT IoT SP MQTT 程式的時候,原先的設想是使用 MQTT 函式庫配合 ESP8266 AT 指令的方式來做,但是在嘗試過多種的函式庫組合之後就放棄了;不是 AT 函式庫連線有問題,就是 MQTT 函式庫與 AT 函式庫不能配合,零零總總的測試一堆都不能用,雖然也有想過直接用 ESP8266 來做不就好,但這終究不是我想要實現 MQTT 的方式!
所以,既然已經能夠在 ESP8266 AT 指令下的透傳模式手動發送與接收 CHT IoT SP MQTT 的數據資料,那麼為何不自己在既有的透傳模式的程式碼下加入 MQTT 協議通訊的支援呢 ?
就在搜尋了一些參考資料及把握既有的資源之後,終於可以開始寫程式了!
/*--*//**---/*///**---*-*////***--*/*///***----*///--*/*///**--*/*//**--**/*//
* 程式流程圖:
為了方便理解,下面將整個程式做成流程圖,主要是由 setup() 和 loop() 這兩個部分構成 (點擊圖面可看原圖)。
CHT IoT SP MQTT 訊息發佈程式流程圖 |
透傳模式下,裝置端發送 MQTT CONNECT 控制封包與 MQTT Broker 進行連接,然後發送字串 "hELLo" 給 SayHello 感測器,表示 MQTT 連接和發佈成功。接著下來就進入到了迴圈 loop() 運行的部分。
迴圈中設定了三個以秒計時的變數,分別負責 PUBLISH、PINGREQ 和 DISCONNECT 控制封包發送的時間,用戶需要完全了解上一篇所談及的 MQTT 控制封包的組成格式,才能掌握和了解程式碼所代表的意思。
程式以 nopnop2002 的程式作為一開始的雛形參考,但是由於該程式所連接的 MQTT Broker 和使用的 MQTT 版本 (v3.1) 都與我們的不同,所以最後是以之前撰寫的透傳程式作為基礎,加入了自行修改的部分,融合之後有了下面的程式。
/*-/--*-*/*/*/*/***//-*-*-**-*/*-*-/*/*/*-*-/-////--/**/**--**/--///--//**----**//--**//**----***//*-**//*
環境設置:
- MQTT v3.1.1
- Software
- Arduino IDE v1.8.5
- Hardware (下面兩種型式皆可)
- Arduino UNO + ESP8266 (WiFi) 二合一開發板**
- Arduino 開發板外接 ESP8266 無線模組**
- SHT31-D 溫溼度感測器模組
** ESP8266 AT 韌體版本 AT: 1.2.0.0, SDK: v1.5.4.1
/*-/--*-*/*/*/*/***//-*-*-**-*/*-*-/*/*/*-*-/-////--/**/**--**/--///--//**----**//--**//**----***//*-**//*/*--*//**---/*///**---*-*////***--*/*///***----*///--*/*///**--*/*//**--**/*//
* 主程式的部分:
setup() 的部分在上面已經講解過了,下面程式碼也有註解,就不再說明了。
MQTT_Pulish_ESP01_Demo01.ino, ( 01/10 ), setup()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | void setup() { Serial.begin( 115200 ); swSerialOutput.begin( 115200 ); delay( 3000 ); pinMode( _builtin_led, OUTPUT ); swSerialOutput.print( F("SoftwareSerial Ready!\r\n") ); /** SHT31 初始化 */ Wire.begin(); softResetSHT31(); swSerialOutput.print( F("SHT31 Ready!\r\n") ); //--** 開機時退出 TCP 連線與退出 AP if( !exitPassthroughMode() ) // +++ ledErrorIndicator( 1 ); // 關閉 TCP 連接 closeTCPConnection(); // AT+CIPCLOSE // 斷開與 AP 的連接 disconnectFromAP(); // AT+CWQAP //--** MQTT Broker 連線 **--// if( !station_createTCPPassthrough() ) ledErrorIndicator( 2 ); dbgOutput( F( "MQTT CONNECT..." ) ); //-- Client requests a connection to a server // void mqttConnect( int keep_alive, char *client_id, char *user_name, char *password ) { mqttConnect( MQTT_KEEP_ALIVE, CLIENT_ID, PROJECT_KEY, PROJECT_KEY ); //----------------------------- publishSelect( PUBLISH_SAYHELLO, "hELLo" ); lastmillis = millis(); } |
loop() 主要依計時的時間執行發送相對應的 MQTT 控制封包格式:
- line 9 - 11: 避免 now() 函式溢位
- line 14 - 79: 大概每秒鐘計時,和檢查是否到了要發送 MQTT 控制封包的時間
- line 28 - 37: 當 publishtimer 計數到了,就會連續發送三個 PUBLISH 控制封包,而且網頁上面的感測器數值也會 "幾乎即時" 的更新。
SayHello 每次發佈都會先將數字加一再上傳,起始值是 1。
SHT31 的溫溼度藉由 publishSelect 的第一個參數做選擇,而第二個參數只要不是空白字元,任意的字元與字串都可作為引數。
最後要將 publishtimer 和 pingreq 歸零。 - line 40 - 49: 當 pingreqtimer 時間到,發送 PINGREQ 的控制封包,然後等待回應;否則,若有接收到其他東西則將其輸出到 SoftwareSerial
- line 52 - 73: 當 disconnecttimer 時間到,清除 SayHello、SHT31_Temperature 和 SHT31_Humidity 三個感測器數值為空白,接著再發送 DISCONNECT 的控制封包。
最後,依照 MQTT 3.1.1 規範的要求;此時的發送端 (Client) 要自行關閉網路連線。
完成動作之後,讓 LED 每秒閃爍 5 次。
為了要拍攝影片的關係,所以必須在短時間內一次完成 MQTT CONNECT、PUBLISH、PINGREQ 和 DISCONNECT 的動作,一般使用上不會使用這麼短的 PUBLISH 和 PINGREQ 的時間,DISCONNECT 也會配合硬體按鈕來做關閉網路連接的動作,來提高電池驅動時的運作時間;不過,這卻是一個完整的 MQTT 消息發佈的流程。
MQTT_Pulish_ESP01_Demo01.ino, ( 02/10 ), loop()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | void loop() { static int publishtimer = 0; static int pingreqtimer = 0; static int disconnecttimer = 0; static int loop = 1; static int running_state = 1; unsigned long now = millis(); if ( (now - lastmillis) < 0) { // 避免計時溢位 lastmillis = now; } //----**** 主要計時迴圈 ****--// if ( (now - lastmillis) > 1000) { lastmillis = now; publishtimer++; pingreqtimer++; disconnecttimer++; //--** 每秒切換一次 LED 狀態都耗 digitalWrite( _builtin_led, running_state ); running_state = !running_state; //--** 每秒計數輸出一個標示 if ( (publishtimer % 10) == 0) dbgOutput( F("+") ); if ( (publishtimer % 10) != 0) dbgOutput( F(".") ); //--** PUBLISH **--// if( publishtimer == MQTT_PUBLISH_INTERVAL ) { dbgOutputln( F("Sending PUBLISH") ); publishSelect( PUBLISH_SAYHELLO, (String(loop++)).c_str() ); if( readTempHumSHT31() ) { publishSelect( PUBLISH_SHT31H, "H" ); // 第 2 個參數隨便丟一個值進去就是輸出溫溼度,但不能是 "" publishSelect( PUBLISH_SHT31T, "T" ); } publishtimer = 0; pingreqtimer = 0; } // PUBLISH //--** PINGREQ **--// if( pingreqtimer == MQTT_KEEP_ALIVE ) { dbgOutputln( F("Sending PINGREQ") ); // Sending MQTT PINGREQ Serial.write( mqtt_pingreq_pack[0] ); Serial.write( mqtt_pingreq_pack[1] ); getResponse( 1000 ); pingreqtimer = 0; } else { getResponse(10); } // PINGREQ //--** MQTT DISCONNECT **--// if( disconnecttimer == MQTT_DISCONNECT_INTERVAL ) { // DISCONNECT //--** 恢復感測器欄位值 publishSelect( PUBLISH_SAYHELLO ); // 只有一個參數表示以空白填入感測器欄位 publishSelect( PUBLISH_SHT31H ); publishSelect( PUBLISH_SHT31T ); //--** sending DISCONNECT CPF dbgOutputln( F("Sending DISCONNECT") ); // sending MQTT DISCONNECT Serial.write( mqtt_disconnect_pack[0] ); Serial.write( mqtt_disconnect_pack[1] ); //--** mqtt-v3.1.1-os29 October 2014, 3.14 DISCONNECT, 3.14.4 Respnse // After sending a DISCONNECT Packet the Client: // - MUST close the Network Connection[MQTT-3.14.4-1]. // - MUST NOT send any more Control Packets on that Network Connection[MQTT-3.14.4-2]. // if( exitPassthroughMode() ) { closeTCPConnection(); // AT+CIPCLOSE disconnectFromAP(); // AT+WQAP } else { dbgOutputln( F("AT+CLOSE Fail.") ); stopRunning(); } clearSerialBuffer(); dbgOutputln(); dbgOutputln( F("PUBLISH end, DISCONNECT from CHT IoT SP") ); ledErrorIndicator( 5 ); // 此處並非錯誤產生,而是顯示已完成 DISCONNECT 的動作 } // MQTT_DISCONNECT_INTERVAL } // 1000ms loop } // loop |
下面是程式用到的標頭檔與變數宣告和定義,大多看程式裡面註解或是之前的網頁就能清楚,這裡只解釋 line 43 和 line 44。
前面幾篇用到的 checkOK 函式都只檢查 "OK",但這裡提供可指定 "OK" 之外的選擇;預設沒有使用參數時,是採用 "OK" 作為輸入。
publishSelect 預設在不使用第二個參數時,第二個參數的值會被設為 "" 空白字元,這在程式中會將第一個參數指定的感測器數值的欄位自動設定為 "" 空白字元,發佈後該欄位就會被自動清除掉;若填入任意字元或是字串,則會以該值作為指定感測器的數值。
MQTT_Pulish_ESP01_Demo01.ino, ( 03/10 )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | #include <SoftwareSerial.h> #include <Wire.h> //----**** 基本輸出入 ****----// //--** Software Serial #define _rxpin 2 #define _txpin 3 SoftwareSerial swSerialOutput( _rxpin, _txpin ); // RX, TX //--** 硬體資訊 #define SHT31_IIC_ADDRESS 0x44 // SHT31 預設的 IIC 地址 #define _builtin_led 13 //--** 除錯輸出 #define DEBUG #ifdef DEBUG #define dbgOutput( str ) (swSerialOutput.print( str )) #define dbgOutputln( str ) (swSerialOutput.println( str )) #else #define dbgOutput( str ) #define dbgOutputln( str ) #endif //-------------------------------------------------------*- //----**** CHT IoT Smart Platform ****----// //--** MQTT Broker #define MQTT_PUBLISH_TOPIC "/v1/device/7581817213/rawdata" #define PROJECT_KEY "PK4LQMHCHE1ZTQFQRW" #define CLIENT_ID "MQTT_ESP_01S" #define MQTT_PUBLISH_INTERVAL 18 #define MQTT_KEEP_ALIVE 8 #define MQTT_DISCONNECT_INTERVAL 60 //--** MQTT PINGREQ and PINGRESP Control Packet Format // MQTT Control Packets of the PINGREQ ( PING REQuest ) static uint8_t mqtt_pingreq_pack[] = { 0xC0, 0x00 }; // MQTT Control Packets of the DISCONNECT static uint8_t mqtt_disconnect_pack[] = { 0xE0, 0x00 }; //--** MQTT PUBLISH 感測器選擇 #define PUBLISH_SAYHELLO 1 #define PUBLISH_SHT31T 2 #define PUBLISH_SHT31H 3 //-------------------------------------------------------*- //----**** 函式宣告 ****----// bool checkOK( char* fstring = "OK" ); void publishSelect( uint8_t type, char* text = "" ); static unsigned long lastmillis; //-------------------------------------------------------*- //----**** 全局變數 ****----// //--** 儲存 SHT31-D 的溫溼度值 struct sht31_t{ float temperature; float humidity; } sht31 = { 0.0, 0.0 }; //-------------------------------------------------------*- |
station_createTCPPassthrough 函式將 ESP8266 加入到 AP 再連線到 CHT IoT SP,最後設定 ESP8266 處於透傳模式下。
相關資訊如下所示,請依自己實際的環境進行修改
- ROUTER 連線資訊
- SSID: Proteus-WiFi
- PASSWORD: asdfghjkl
- MQTT Broker IP address / Port number
- IP: iot.cht.com.tw
- Port: 1883
- line 10: 在這裡填入 ROUTER 連線資訊
- line 15: 在這裡輸入 MQTT Broker IP / Port 的資料
MQTT_Pulish_ESP01_Demo01.ino, ( 04/10 ), station_createTCPPassthrough()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | bool station_createTCPPassthrough() { // station swSerialOutput.print( F("Connecting to CHT IoT SP...") ); dbgOutput( "\r\n" ); // 建立 station Serial.println( "AT+CWMODE_CUR=1" ); // station mode dbgOutput( F("AT+CWMODE_CUR=1\r\n") ); if( !checkOK() ) return false; // 連線到 ROUTER Serial.println( "AT+CWJAP_CUR=\"Proteus-WiFi\",\"asdfghjkl\"" ); dbgOutput( "AT+CWJAP_CUR=\"Proteus-WiFi\",\"asdfghjkl\"\r\n" ); if( !checkOK() ) return false; // Establish TCP Transmission // <type>,<remote IP>,<remote port> Serial.println( "AT+CIPSTART=\"TCP\",\"iot.cht.com.tw\",1883"); dbgOutput( "AT+CIPSTART=\"TCP\",\"iot.cht.com.tw\",1883\r\n"); if( !checkOK() ) return false; // 設定傳輸模式為透傳模式 Serial.println( "AT+CIPMODE=1" ); dbgOutput( "AT+CIPMODE=1\r\n" ); if( !checkOK() ) return false; // 開始傳送資料 Serial.println( "AT+CIPSEND" ); dbgOutput( "AT+CIPSEND\r\n" ); if( !Serial.find( ">" ) ) return false; swSerialOutput.print( "done.\r\n\r\n" ); return true; } |
下面是幾個一般輔助函式的程式碼,其中幾個已在前面幾篇網頁做過說明,這裡只針對做過變更與新增的幾個作補充:
- line 1 - 7: checkOK 函式;修改 line 4 的數值,增加某些指令確認等待的次數與時間。
- line 30 - 37: stopRunning 函式;修改程式執行時為 LED 快速變換閃爍,而不是直接停止往下運作而已。
MQTT_Pulish_ESP01_Demo01.ino, ( 05/10 ), Functions( 01/02 )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | bool checkOK( char* fstring ) { uint8_t i = 0; while( !Serial.find( fstring ) ) { if( ++i >= 8 ) return false; } return true; } bool exitPassthroughMode() { Serial.println( "AT" ); if( !checkOK() ) { Serial.print( "+++" ); delay( 1000 ); Serial.println( "AT" ); if( !checkOK() ) return false; } return true; } void closeTCPConnection() { Serial.println( "AT+CIPCLOSE" ); checkOK(); } void disconnectFromAP() { Serial.println( "AT+CWQAP" ); checkOK(); } void stopRunning() { int stat = 0; while(1) { digitalWrite( _builtin_led, stat ); stat = !stat; delay(100); } } void ledErrorIndicator( uint8_t times ) { static uint16_t offms[] = {900, 400, 250, 150, 100}; while(1) { for( int i = 0; i < times; i++ ) { digitalWrite( _builtin_led, HIGH ); delay( 100 ); digitalWrite( _builtin_led, LOW ); delay( offms[times - 1] ); } delay( 1000 ); } } |
下面是這個程式另外新增的一般輔助函式:
- line 1 - 12: hexDump 函式;將 buf 裡的內容以 16 進制的方式輸出。
- line 14 - 17: clearSerialBuffer 函式;讀取 Serial 裡面所有未讀出的字元,但不儲存。
MQTT_Pulish_ESP01_Demo01.ino, ( 06/10 ), Functions( 02/02 )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | void hexDump( uint8_t *buf, int msize ) { #ifdef DEBUG swSerialOutput.print( "msize=" ); swSerialOutput.println( msize ); for( int i = 0; i < msize; i++ ) { if( buf[i] < 0x10 ) swSerialOutput.print( F("0") ); swSerialOutput.print( buf[i], HEX ); swSerialOutput.print( " " ); } swSerialOutput.println(); #endif } void clearSerialBuffer() { while (Serial.available()) Serial.read(); } |
getResponse 函式主要是用來接收 MQTT 控制封包發送之後,直接將對方所回應的資料以字元或 16 進制的方式輸出,但不做任何的動作。這也就說明了,為什麼程式流程圖判斷菱形處,為何只出一條線。
- line 7 - 24: 在指定的 (timeout) 時間內,若 Serial 接收到字元,則
- 若字元是 0x0d (表示 ASCII 的 Enter 鍵;CR;'\r' ),不做任何動作!
- 若字元是 0x0a (表示 ASCII 的換行鍵;LF;'\n' ),輸出 "\r\n"。
- 若字元小於 0x20,將此字元以16 進制的方式輸出,並以中擴號 [] 框住再輸出。
- 若都不是上面所提及的部分,則直接輸出
- line 23: 若有接收到任何字元的話,則在函式的最後輸出 "\r\n"。
MQTT_Pulish_ESP01_Demo01.ino, ( 07/10 ), getResponse()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | void getResponse( int timeout ){ char c; bool flag = false; char tmp[10]; long int time = millis() + timeout; while( time > millis() ) { if( Serial.available() ) { flag = true; c = Serial.read(); if(c == 0x0d) { } else if( c == 0x0a ) { dbgOutputln(); } else if( c < 0x20 ) { uint8_t cc = c; sprintf( tmp, "[0x%.2X]", cc ); dbgOutput( tmp ); } else { dbgOutput( c ); } } // end if } // end while if( flag ) dbgOutputln(); } |
解釋下面函式的時候,有不清楚的地方可以借助 Wireshark,會很有幫助!
在開始之前,要事先知道幾個重點參數:
- Protocol Name: MQTT
- MQTT Version: v3.1.1 (0x04)
- Connect Flag: 0xc2
- User Name Flag: 1
- Passwrod Flag: 1
- Clean Session Flag: 1
- others: 0
- Keep Alive: 8
- Client ID: MQTT_ESP_01S
- User Name: PK4LQMHCHE1ZTQFQRW
- Password: PK4LQMHCHE1ZTQFQRW
- line 4: 用 pos 來標記現在處理到的封包位置;12 是起始值,看 line 5 的解釋。
- line 5: 由於限定字元陣列的大小為 127 bytes,因此 Fixed Header 和 Variable Header 所使用的字元數目是可預知的 ( Remaining Length 只使用 1 byte )。
- line 7 - 13: 預先獲得 client_id、user_name 和 password 的字元數目,這些數值可做為計算 Remaining Length 使用。
- line 16: Fixed Header 的 Head Flag;這是固定值 0x10
- line 17: Remaining Length 的計算,是下面兩個項目的總和
- Variable Header: { Protocol Name Length } + { Protocol Name } + { Protocol Level } +
{ Connect Flags } + { Keep Alive } = 2 + 4 + 1 + 1 + 2 = 10 bytes - Payload: { Client ID Length [2-byte] } + { Client ID [n bytes] } +
{ User Name Length [2 bytes] } + { User Name [m bytes] } +
{ Password Length [2 bytes] } + { Password [m bytes] }
= 2 + client_id_len + 2 + user_name_len + 2 + password_len - line 18 - 23: 確認總輸出的 MQTT CONNECT 控制封包是否超出 127 bytes ? 若超出範圍,則停止全部動作;否則,繼續往下執行。
- line 24: buf 夠用的情況之下,將計算後的 Remaining Length 值填入 buf[1]
- line 26 - 36: Variable Header 的字元填入;除了 buf[9...11] 之外,這部分大多是固定的字元,可由協議規範中查到。
- line 38 - 59: 依序將 client_id、user_name 和 password 的內容值,填入到 buf 陣列裡。
- line 61: 以 16 進制的方式輸出 buf 陣列裡所有的數據到 SoftwareSerial。
- line 62: 將 buf 陣列裡的資料以無符號字元的方式全部輸出至 Serial。
- line 64: 在 1000ms 的時間內,等待 CONNACK 的回覆。
MQTT_Pulish_ESP01_Demo01.ino, ( 08/10 ), mqttConnect()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | void mqttConnect( int keep_alive, char *client_id, char *user_name, char *password ) { //--** for MQTT Version 3.1.1 **--// int pos = 12; uint8_t buf[128] ={ 0 }; //----**** Length ****----// //--** Payload, Client ID int client_id_len = strlen(client_id); //--** User Name Length int user_name_len = strlen( PROJECT_KEY ); //--** Password Length int password_len = strlen( PROJECT_KEY ); //----**** Fixed Headers ****----// buf[0] = 0x10; // Head Flags, Connect Command (1) int remaining_length = (10) + (2 + client_id_len + 2 + user_name_len + 2 + password_len ); int total_len = 2 + remaining_length; if( total_len > 127 ) { // 超出 buf 範圍,直接停止執行 dbgOutput( F("Remining Length: ") ); dbgOutputln( total_len ); dbgOutputln( F("CONNECT: out of buffer space, stop continue!") ); stopRunning(); } buf[1] = remaining_length; // Remaining Length //----**** Variable Headers ****----// buf[2] = 0x00; // [2 bytes] Protocol Name Length buf[3] = 0x04; buf[4] = 'M'; // [4 bytes] Protocol Name buf[5] = 'Q'; buf[6] = 'T'; buf[7] = 'T'; buf[8] = 0x04; // Version buf[9] = 0xc2; // Connect Flags: 0xc2 buf[10] = keep_alive >> 8; // Keep Alive buf[11] = keep_alive & 0x00FF; //----**** Payload ****----// //--** [ 2 bytes ] Client ID Length buf[pos++] = 0x00; buf[pos++] = client_id_len; //--** Client ID for( int i = 0; i < client_id_len; i++ ) { buf[pos++] = client_id[i]; } //--** [ 2 bytes ] User Name Length buf[pos++] = 0x00; buf[pos++] = user_name_len; //--** User Name for( int i = 0; i < user_name_len; i++ ) { buf[pos++] = user_name[i]; } //--** [ 2 bytes ] Password Length buf[pos++] = 0x00; buf[pos++] = password_len; //--** User Name for( int i = 0; i < password_len; i++ ) { buf[pos++] = password[i]; } hexDump( buf,pos ); for( int i = 0; i < pos; i++ ) Serial.write( buf[i] ); getResponse( 1000 ); dbgOutputln( "OK" ); dbgOutputln(); } |
publishSelect 函式,主要是用來組建 PUBLISH 的 Payload 部分,是 JSON 格式。第一個參數 type,是用來區分選擇的是三個感測器的哪一個;第二個參數 text,為要發佈的感測器值。
- line 6 - 12: SayHello 感測器 Payload 的處理。
- line 13 - 18: SHT31_Temperature 感測器 Payload 的處理。
- line 19 - 24: SHT31_Humidity 感測器 Payload 的處理。
- line 31: MQTT PUBLISH 發送。
MQTT_Pulish_ESP01_Demo01.ino, ( 09/10 ), publishSelect()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | void publishSelect( uint8_t type, char* text) { String payload = ""; switch( type ) { case PUBLISH_SAYHELLO: { payload = "[{\"id\":\"SayHello\",\"save\":false,\"value\":[\""; payload += String( text ); payload += "\"]}]"; } break; case PUBLISH_SHT31T: payload = "[{\"id\":\"SHT31_Temperature\",\"save\":false,\"value\":[\""; if( strcmp( text , "" ) == 0 ) payload += ""; else payload += String( sht31.temperature ); payload += "\"]}]"; break; case PUBLISH_SHT31H: payload = "[{\"id\":\"SHT31_Humidity\",\"save\":false,\"value\":[\""; if( strcmp( text , "" ) == 0 ) payload += ""; else payload += String( sht31.humidity ); payload += "\"]}]"; break; default: dbgOutputln( F("Wrong payload type!") ); stopRunning(); } dbgOutputln( payload ); mqttPublish( MQTT_PUBLISH_TOPIC, payload.c_str() ); } |
mqttPublish 函式,基本上與 mqttConnect 函式的解釋差不多!
- line 4: 用 pos 來標記現在處理到的封包位置,2 是起始值;
- line 5: 由於限定字元陣列的大小為 127 bytes,因此 Fixed Header 和 Variable Header 所使用的字元數目是可預知的 ( Remaining Length 只使用 1 byte );
- line 7 - 11: 預先獲得 mqtt_public_topic 和 payload 的字元數目,這些數值可做為計算 Remaining Length 使用;
- line 14: Fixed Header byte 1;根據前面參數的說明,此值為 0x30;
- line 15: Remaining Length 的計算,是下面兩個項目的總和
- Variable Header: { Topic Name Length [2 bytes] } + { Topic Name [n bytes] }
= 2 + mqtt_public_topic_len - Payload: { Payload [m bytes] }
= payload_len - line 16 - 20: 確認總輸出的 MQTT PUBLISH 控制封包是否超出 127 bytes ? 若超出範圍,則停止全部動作;否則,繼續往下執行;
- line 21: buf 夠用的情況之下,將計算後的 Remaining Length 值填入 buf[1];
- line 24 - 29: Variable Header 的部分;主要是輸入發佈主題 (Topic) 的長度與名稱;
- line 33 - 35: 將 payload 陣列的內容加入到 buf 陣列裡;
- line 37: 以 16 進制的方式輸出 buf 陣列裡所有的數據到 SoftwareSerial;
- line 38: 將 buf 陣列裡的資料以無符號字元的方式全部輸出至 Serial;
- line 39: 在 1000ms 的時間內,等待 PUBACK 的回覆;
MQTT_Pulish_ESP01_Demo01.ino, ( 10/10 ), mqttPublish()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | void mqttPublish( char *mqtt_publish_topic, char *payload) { //--** for MQTT Version 3.1.1 **--// int pos = 2; uint8_t buf[127] = { 0 }; //----**** Length ****----// //--** Variable Headers / Topic int mqtt_publish_topic_len = strlen( mqtt_publish_topic ); //--** Payload int payload_len = strlen( payload ); //----**** Fixed Headers ****----// buf[0] = 0x30; // Header Flags: 0x30 int remaining_length = 2 + mqtt_publish_topic_len + payload_len; int total_len = 2 + remaining_length; if( total_len > 127 ) { // 超出 buf 範圍,直接停止執行 dbgOutput( F("Remining Length: ") ); dbgOutputln( total_len ); dbgOutputln( F("PUBLISH: out of buffer space, stop continue!") ); stopRunning(); } buf[1] = remaining_length; //----**** Variable Headers ****----// buf[pos++] = 0x00; buf[pos++] = mqtt_publish_topic_len; //--** Topic for( int i = 0; i < mqtt_publish_topic_len; i++ ) { buf[pos++] = mqtt_publish_topic[ i ]; } //----**** Payload ****----// //--** Message for( int i = 0; i < payload_len; i++ ) { buf[pos++] = payload[i]; } hexDump( buf, pos ); for( int i = 0; i < pos; i++ ) Serial.write( buf[i] ); getResponse( 1000 ); dbgOutputln( "OK" ); dbgOutputln(); } |
/*--*//**---/*///**---*-*////***--*/*///***----*///--*/*///**--*/*//**--**/*//
* 程式執行結果:
整理上面的程式碼,編譯之後上傳到 Arduino 開發板就可以開始進行測試。
進入到 CHT IoT SP "專案管理" 頁面,點擊為此測試所建立的專案設備,就能開始通電測試;測試過程與結果就如下面影片所示。
結論:
網頁中的程式碼,實作與演示了上一篇 MQTT 消息發佈的整個流程,從影片中可以看到各控制封包的內容輸出,也能看到在 Wireshark 抓取的相同內容,這也解釋了前一篇網頁對這一篇的重要性!
另外,對於 getResponse 函式只抓取回應沒有處理的問題;如果真的有需要,可以自己加入處理回應的程式碼。即便不處理也沒有關係,但是發送端一定要收到回應,否則就是發送的控制格式出現問題,導致沒有收到接收端的回應。
最後,在關於 Remaining Length 的處理上,畢竟微控制器不會有那麼多的記憶體可以使用,因此除了能夠預先知道發送與接收的數據大小為何之外,是無法預先得知要使用的 Remaining Length 會使用多少個 bytes 的。而且就算知道了,也不一定就會有那麼多的記憶體空間來做為暫存 (像是影像、聲音檔....等),所以 Wireshark 配合 MQTT.fx 就是一個好用的工具;不但能夠抓取封包,也能用來得知發送與接收的封包大小,作為限定數據暫存區的上限 (Remaining Length 的計算可以參考上一篇網頁)。
MQTT 發佈消息這一部分寫好之後,接下來來看看怎麼做消息的訂閱 (SUBSCRIBE) 與接收了!
<< 部落格相關文章 >>
- 當 ESP8266 遇上中華電信 IoT 智慧聯網大平台 { 入門 - 07 } - 結合 Arduino + ESP8266 實現 MQTT 主題訂閱與接收
- 當 ESP8266 遇上中華電信 IoT 智慧聯網大平台 { 入門 - 06 } - 了解 MQTT 協議,學習如何訂閱 MQTT 主題與接收 MQTT 發佈消息
- 當 ESP8266 遇上中華電信 IoT 智慧聯網大平台 { 入門 - 04 } - 了解 MQTT 協議,學習如何發佈 MQTT 消息
ruten-proteus b logspot com/2018/09/esp8266AT-meets-CHT-IoT-SP-introduction-04.html - 當 ESP8266 遇上中華電信 IoT 智慧聯網大平台 { 入門 - 03 } - 使用 Arduino + ESP8266 上傳 SHT31 溫溼度數據
ruten-proteus b logspot com/2018/08/esp8266AT-meets-CHT-IoT-SP-introduction-03.html - 當 ESP8266 遇上中華電信 IoT 智慧聯網大平台 { 入門 - 02 } - 設備感測數據讀取與 (JSON) 解析
ruten-proteus blogspot com/2018/08/esp8266AT-meets-CHT-IoT-SP-introduction-02.html - 當 ESP8266 遇上中華電信 IoT 智慧聯網大平台 { 入門 - 01 } - 如何使用 ESP8266 利用 AT 指令取回中華電信 IoT 智慧聯網大平台上的設備感測器數據
ruten-proteus b logspot com/2018/08/esp8266AT-meets-CHT-IoT-SP-introduction-01.html - 如何使用 MCU 建立與其他 ESP8266 的 UDP 透傳通訊
ruten-proteus b logspot com/2018/08/mcu-pc-esp8266-UDP-passthrough-demo.html - ESP8266 AT 指令下的透傳模式
ruten-proteus b logspot com/2018/07/esp8266-at-uart-wifi-passthrough.html - 如何使用 AT 指令讓同在 AP+STA 模式下的 ESP8266 互相通訊 ?
ruten-proteus b logspot com/2017/11/esp8266-apsta-at-cmd.html - 操控 ESP8266 無線模組 - 經由 AP、STA 和 AP+STA 三種模式,學習 ESP8266 AT 指令
ruten-proteus b logspot com/2014/12/esp8266-at-command.html
沒有留言:
張貼留言
留言屬名為"Unknown"或"不明"的用戶,大多這樣的留言都會直接被刪除掉,不會得到任何回覆!
發問問題,請描述清楚你(妳)的問題,別人回答前不會想去 "猜" 問題是什麼?
不知道怎麼發問,請看 [公告] 部落格提問須知 - 如何問問題 !