時間序列資料庫在 IoT 平臺中扮演著儲存和分析時序資料的重要角色。本文將逐步說明如何利用 MySQL 建立時間序列資料庫,並結合 Node-RED 和 MQTT 訊息代理,打造一個完整的 IoT 平臺。過程中將會使用 phpMyAdmin 建立資料庫和使用者帳戶,並在 Node-RED 中安裝必要的 MySQL 節點。此外,我們也會建立資料庫監聽器、設計 REST API 供外部系統存取資料,並探討如何整合 WebSocket 與 MQTT 以提升平臺的即時互動能力。最後,我們將會設定訊息代理的存取控制清單(ACL),確保平臺的安全性。

建立時間序列資料庫

時間序列資料庫是IoT平臺的關鍵元件之一。我們將建立一個名為tSeriesDB的資料庫,並新增一個使用者帳戶和密碼。接下來,我們將新增一個資料表結構,根據之前定義的schema。

建立資料庫和使用者帳戶

  1. 使用phpMyAdmin存取MySQL介面。
  2. 建立一個名為tSeriesDB的新資料庫。
  3. 新增一個使用者帳戶和密碼。

新增資料表結構

  1. 選擇tSeriesDB資料庫。
  2. 點選「New」選項新增一個資料表。
  3. 輸入資料表名稱thingData
  4. 新增一欄deleted,型別為二進位制。

安裝Node-RED所需的節點

Node-RED的預設安裝不包含MySQL節點,因此我們需要新增一個MySQL節點。

  1. 開啟Node-RED介面。
  2. 選擇「Manage Palette」選項。
  3. 搜尋MySQL節點並安裝。

建立第一個流程

  1. 拖曳一個「inject」節點到工作區。
  2. 拖曳一個「debug」節點到工作區。
  3. 連線兩個節點。
  4. 點選「Deploy」按鈕啟用流程。

建立資料庫接聽器

接下來,我們需要建立一個資料庫接聽器,監聽資料庫的變化。

建立REST API

我們需要建立一個REST API,提供資料的釋出和檢索功能。

建立REST API的流程

  1. 建立一個REST API節點。
  2. 定義API的路由和方法。
  3. 實現API的邏輯。

圖表翻譯

  graph LR
    A[開始] --> B[建立時間序列資料庫]
    B --> C[安裝Node-RED所需的節點]
    C --> D[建立第一個流程]
    D --> E[建立資料庫接聽器]
    E --> F[建立REST API]

圖表翻譯

此圖表描述了建立IoT平臺的流程,從建立時間序列資料庫開始,接著安裝Node-RED所需的節點,建立第一個流程,建立資料庫接聽器,最後建立REST API。

Node-RED 編輯器和第一個流程式列

Node-RED 是一個根據 Node.js 的視覺化程式設計工具,允許使用者建立流程式列來處理和轉換資料。在本節中,我們將介紹 Node-RED 編輯器和建立第一個流程式列的步驟。

新增 MQTT 釋出功能

要新增 MQTT 釋出功能,我們需要將 mqtt out 節點新增到流程式列中。這個節點需要配置,包括提供訊息代理的詳細資訊和憑證。配置完成後,流程式列將如下所示:

  flowchart TD
    A[Inject Node] --> B[Mqtt Out Node]
    B --> C[Debug Node]

REST API 訊息釋出器

現在,我們將建立一個 REST API 來啟用相同的功能。這個 API 將允許裝置或應用程式使用 HTTP 協議釋出訊息。

  flowchart TD
    A[Http In Node] --> B[Function Node]
    B --> C[Mqtt Out Node]
    C --> D[Http Response Node]

在這個流程式列中,我們使用 http in 節點來處理來自網路的請求。然後,我們使用 function 節點來建立一個訊息,並將其釋出到 MQTT 主題中。最後,我們使用 http response 節點來回應 API 請求。

內容解密:

在這個流程式列中,我們使用 function 節點來建立一個訊息,並將其釋出到 MQTT 主題中。以下是 function 節點的程式碼:

// 建立訊息
msg.topic = msg.req.params.topic;
msg.payload = msg.req.params.payload;
msg.qos = 2;
msg.retain = false;
return msg;

圖表翻譯:

以下是流程式列的 Mermaid 圖表:

  flowchart TD
    A[Http In Node] --> B[Function Node]
    B --> C[Mqtt Out Node]
    C --> D[Http Response Node]

這個圖表顯示了流程式列的邏輯關係,包括 http in 節點、function 節點、mqtt out 節點和 http response 節點。

建立資料庫監聽器

為了建立資料庫監聽器,我們需要建立一個節點,該節點可以監聽MQTT訊息串流,並將所有訊息儲存到時間序列資料庫中。

步驟1:新增MQTT輸入節點

首先,我們需要新增一個MQTT輸入節點到工作區中。這個節點將用於監聽MQTT訊息串流。

步驟2:新增除錯節點

接下來,我們需要新增一個除錯節點,該節點將用於顯示MQTT訊息串流中的訊息。

步驟3:配置MQTT輸入節點

雙擊MQTT輸入節點,配置其訂閱資訊和MQTT代理詳細資訊。這裡,我們訂閱所有訊息,使用#訂閱和QoS = 2進行可靠的訂閱。

步驟4:部署流程

部署流程後,監視除錯側欄中的訊息。已經有一個活躍的時間戳發布者,每15秒發布一次當前的時間戳。這些訊息應該每15秒顯示在除錯側欄中。

步驟5:修改流程

現在,如果我們使用/pubs API發布任何新訊息,該訊息也應該顯示在除錯輸出中。一次這個驗證後,讓我們修改相同的流程式列。新增一個MySQL節點,配置其設定,使用我們的時間序列資料庫憑證。

步驟6:建立查詢函式

建立一個查詢函式,該函式將用於將MQTT訊息插入到資料庫中。以下是查詢函式的程式碼:

// 建立查詢
// 獲取微秒時間
var timestamp = new Date().getTime()/1000;
// 將其填充為尾隨零
timestamp = timestamp.toString() + "000";
// 修剪到確切長度 10 + 1 + 3
timestamp = timestamp.substring(0, 14);
var strQuery = "INSERT INTO thingData (topic, payload, timestamp, deleted) VALUES ('" + escape(msg.topic) + "','" + escape(msg.payload) + "','" + timestamp + "', 0);";
msg.topic = strQuery;
return msg;

這個函式首先獲得最新的時間戳,並將其轉換為一個填充零的字串,然後建立一個INSERT查詢,使用標準MySQL插入語法。

步驟7:部署流程

部署流程後,我們可以使用cURL或等待15秒發布時間戳。然後,登入phpMyAdmin,驗證資料庫中是否添加了新記錄。

結果

現在,任何發布到MQTT訊息串流的訊息都將被記錄到資料庫中。我們的資料庫監聽器現在已經功能正常。

REST API 訊息擷取器

現在,我們要建立一個 API 來擷取儲存在資料庫中的訊息。在我們的平臺願望清單中,我們列出了兩個需求:

  • D1. 擷取單一資料記錄:允許應用程式和裝置根據指定的主題或主題模式查詢單一資料記錄。
  • D2. 擷取多個資料記錄:允許應用程式和裝置根據指定的主題或主題模式查詢多個資料記錄。

這兩個 API 會以類似的方式建立,但這次,我們會使用 MySQL 節點來存取和擷取資料庫值,使用 SELECT SQL 命令。請參考圖 7-13 的設定配置和流程式列。

圖 7-13. 從時間序列資料儲存中擷取訊息

我們將兩個 HTTP 輸入節點的輸出繫結到相同的流程式列。透過玄貓,我們可以容納兩種變化的 /get/:topic/get/:topic/last/:count。第一個只會從時間序列資料庫中擷取一個訊息。第二個指定要擷取的最新訊息數量。

以下程式碼片段顯示了建立查詢函式區塊的程式碼:

// 建立查詢
// 如果需要的記錄數未指定
// 設定預設值為 1
// 建立 SQL 查詢
msg.topic = "SELECT id, topic, payload, timestamp " +
"FROM thingData " +
"WHERE topic='" + escape(msg.req.params.topic) + "' " +
"AND deleted=0 " +
"ORDER BY 玄貓";
return msg;

在這段程式碼中,前兩行檢查引數計數的存在。請注意,這個引數只在要求多個最新訊息時需要。因此,在單一訊息查詢中,這個引數是不存在的。如果它不存在,我們將其設為預設值 1。

然後,我們使用標準的 SELECT 查詢來擷取資料庫記錄。在這個查詢中,我們使用 WHERE 來搜尋指定的主題和 deleted=0 來選擇未刪除的記錄。此外,我們使用 ORDER BY 玄貓。

由於這是一個時間序列資料庫,且由於我們建立資料庫監聽器的方式,所有值都會按照時間順序排列,最新的值在最上面(如果按時間排序)。現在,我們來檢查這兩個 API,首先使用 cURL:

輸出 1

HTTP/1.1 200 OK
Server: Apache
X-Powered-By: Express
Access-Control-Allow-Origin: *
Content-Type: application/json; charset=utf-8
Content-Length: 79
ETag: W/"38-O0I0tXOkbEG/goFLAbvDMSnHdqE"
[{"id":8,"topic":"myTopic","payload":"myPayload","timestamp":"1543717154.899"}]

輸出 2

[{"id":8,"topic":"myTopic","payload":"myPayload","timestamp":"1543717154.899"},
{"id":7,"topic":"myTopic","payload":"myPayload","timestamp":"1543716966.189"},
{"id":6,"topic":"myTopic","payload":"myPayload","timestamp":"1543716787.289"}]

內容解密:

這個程式碼片段使用了 MySQL 節點來存取和擷取資料庫值,使用 SELECT SQL 命令。它會根據指定的主題或主題模式查詢單一或多個資料記錄。這個 API 會傳回 JSON 格式的資料,包含 id、topic、payload 和 timestamp 等欄位。

圖表翻譯:

  flowchart TD
    A[HTTP 輸入節點] --> B[MySQL 節點]
    B --> C[SELECT SQL 查詢]
    C --> D[資料庫記錄]
    D --> E[JSON 格式的資料]
    E --> F[HTTP 輸出節點]

這個圖表顯示了 API 的流程式列,從 HTTP 輸入節點到 MySQL 節點,然後到 SELECT SQL 查詢,最後到資料庫記錄和 JSON 格式的資料。

建立 IoT 平臺的關鍵元件

在上一章中,我們建立了時間序列資料庫和資料庫監聽器,並建立了兩個關鍵元件:REST API 介面和MQTT 訊息代理。在本章中,我們將繼續建立 IoT 平臺的關鍵元件,包括執行 Node-RED 在背景中、建立 WebSocket 連線以及更新存取控制以增強安全性。

執行 Node-RED 在背景中

為了確保 Node-RED 在背景中持續執行,我們可以使用 forever 公用程式。這個公用程式可以確保 Node.js 應用程式持續執行,即使遇到錯誤或崩潰。首先,我們需要安裝 forever

# npm install forever -g

安裝完成後,我們可以使用以下命令執行 Node-RED:

# forever start -l node-red.log --append /usr/local/bin/node red

這個命令會將 Node-RED 的輸出記錄到 node-red.log 檔案中,我們可以隨時檢查這個檔案。

建立 WebSocket 連線

為了建立 WebSocket 連線,我們需要在 MQTT 訊息代理中啟用 WebSocket 支援。這樣,MQTT 連線的應用程式就可以使用 WebSocket 連線到 MQTT 訊息代理。

更新存取控制

為了增強安全性,我們需要更新存取控制。這包括設定使用者帳戶、許可權和存取控制清單 (ACL)。這樣, 我們就可以控制哪些使用者可以存取哪些資源和資料。

範例應用

在下一章中,我們將看到一些範例應用,示範如何使用我們的 IoT 平臺與其他應用程式和系統進行互動。這包括使用 MQTT 連線進行即時資料交換。

內容解密:

在這個章節中,我們建立了 IoT 平臺的關鍵元件,包括執行 Node-RED 在背景中、建立 WebSocket 連線和更新存取控制。這些元件對於建立一個安全和可靠的 IoT 平臺至關重要。透過使用 forever 公用程式,我們可以確保 Node-RED 持續執行,即使遇到錯誤或崩潰。透過啟用 WebSocket 支援,我們可以讓 MQTT 連線的應用程式使用 WebSocket 連線到 MQTT 訊息代理。最後,透過更新存取控制,我們可以控制哪些使用者可以存取哪些資源和資料。

圖表翻譯:

  flowchart TD
    A[Node-RED] --> B[Forever]
    B --> C[MQTT 訊息代理]
    C --> D[WebSocket 連線]
    D --> E[存取控制]
    E --> F[安全性增強]

這個圖表示範了我們的 IoT 平臺的關鍵元件,包括 Node-RED、Forever、MQTT 訊息代理、WebSocket 連線和存取控制。透過這些元件,我們可以建立一個安全和可靠的 IoT 平臺。

配置訊息代理商

在之前的章節中,我們已經建立了一個功能性的訊息代理商和雲端例項,並添加了核心能力到我們的IoT平臺中。現在,我們將修改訊息代理商的配置,使其更有用,特別是在互操作性方面。在本章中,我們將學習WebSocket和MQTT的區別,瞭解WebSocket的重要性,新增WebSocket功能到代理商中,並測試它。

WebSocket和MQTT的區別

WebSocket提供了一個始終開啟的通訊頻道,而不是像正常的HTTP一樣,每次請求都會開啟和關閉頻道。WebSocket提供了一個雙向的通訊頻道,但不一定遵循MQTT協議。您可以實現一個原始的WebSocket,並讓兩個裝置透過它進行通訊。MQTT在WebSocket上添加了強大的功能,例如讓客戶端可以選擇接收什麼訊息,並允許客戶端發布訊息或資訊給其他客戶端透過代理商。

為什麼WebSocket重要?

從我們自己的IoT平臺的角度來看,提供多種方法來連線平臺到各種裝置是一個基本的期望。這就是為什麼我們選擇了HTTP REST介面和MQTT來實現這個目的。由於這兩種協議和技術都有自己的優缺點,結合它們可以在應用架構中提供顯著的提升。想象一下,如果您的網頁應用程式可以與所有其他應用程式使用者和連線到平臺的裝置進行實時通訊,會如何。這種力量是由於WebSocket在MQTT上啟用而提供的。

新增WebSocket到MQTT配置

新增WebSocket支援到我們的代理商是一個簡單的練習。在前面的章節中,您已經看到如何新增一個listener到不同的埠口。現在,我們將新增另一個listener埠定義到配置檔案中。

# 安全開啟埠口只供localhost使用
listener 1883 localhost

# 監聽安全連線並使用SSL證書
listener 8883
certfile /etc/letsencrypt/live/in24hrs.xyz/cert.pem
cafile /etc/letsencrypt/live/in24hrs.xyz/chain.pem
keyfile /etc/letsencrypt/live/in24hrs.xyz/privkey.pem

# 監聽安全的WebSocket
listener 8443
protocol websockets
certfile /etc/letsencrypt/live/in24hrs.xyz/cert.pem

內容解密:

上述程式碼片段展示瞭如何新增WebSocket支援到MQTT代理商的配置檔案中。首先,我們定義了一個安全開啟埠口只供localhost使用,然後定義了一個監聽安全連線並使用SSL證書的埠口。最後,我們定義了一個監聽安全的WebSocket的埠口,並指定了WebSocket協議和SSL證書檔案。

圖表翻譯:

以下是MQTT代理商的架構圖,展示了WebSocket和MQTT的關係。

  flowchart TD
    A[MQTT代理商] --> B[WebSocket]
    B --> C[客戶端]
    C --> D[裝置]
    D --> E[MQTT代理商]
    E --> F[WebSocket]
    F --> G[網頁應用程式]

上述圖表展示了MQTT代理商如何使用WebSocket與客戶端和裝置進行通訊,並如何讓客戶端和裝置透過MQTT代理商進行通訊。

配置訊息代理

為了增強訊息代理的安全性和控制性,我們需要配置使用者存取控制。首先,我們需要啟用ACL(存取控制列表)功能。這可以透過修改代理的配置檔案來實現。

啟用ACL

要啟用ACL,我們需要在代理的配置檔案中新增以下內容:

acl_file /etc/mosquitto/conf.d/broker.acl

這行程式碼指定了ACL檔案的位置。

配置ACL檔案

接下來,我們需要建立ACL檔案。這個檔案將定義使用者對於不同主題的存取許可權。以下是一個範例:

user test
topic read #/test/#
topic write #/test/#

user admin
topic read #
topic write #

這個範例定義了兩個使用者:testadmin。使用者test只能讀取和寫入#/test/#主題,而使用者admin可以讀取和寫入所有主題。

啟用ACL

修改完ACL檔案後,我們需要重啟代理服務以啟用ACL功能。可以使用以下命令重啟代理:

pkill mosquitto

測試ACL

現在,我們可以使用WebSocket工具測試ACL功能。首先,連線到代理並使用使用者test登入。然後,嘗試發布一條訊息到#/test/#主題。這應該是成功的。

接下來,嘗試發布一條訊息到#/other/#主題。這應該會失敗,因為使用者test沒有寫入許可權。

程式碼示例

以下是使用Python和Paho MQTT庫連線到代理並發布一條訊息的範例:

import paho.mqtt.client as mqtt

# 連線到代理
client = mqtt.Client()
client.username_pw_set("test", "password")
client.connect("localhost", 1883)

# 發布一條訊息
client.publish("#/test/#", "Hello, world!")

# 關閉連線
client.disconnect()

這個範例使用使用者test連線到代理並發布一條訊息到#/test/#主題。

Mermaid圖表

以下是使用Mermaid繪製的代理架構圖:

  graph LR
    A[使用者] -->|連線|> B[代理]
    B -->|授權|> C[ACL]
    C -->|存取控制|> D[主題]
    D -->|發布|> E[訊息]

這個圖表展示了使用者連線到代理、授權、存取控制和發布訊息的過程。

MQTT 伺服器的存取控制清單(ACL)設定

為了增強MQTT伺服器的安全性,設定存取控制清單(ACL)是一個重要的步驟。以下將介紹如何設定ACL檔案,從而控制使用者和應用程式對MQTT主題的存取許可權。

建立ACL檔案

首先,需要建立一個ACL檔案。假設我們將ACL檔案命名為broker.acl,位於/etc/mosquitto/conf.d/目錄下。可以使用以下命令建立檔案:

touch /etc/mosquitto/conf.d/broker.acl

編輯ACL檔案

接下來,需要編輯ACL檔案以新增存取控制規則。可以使用以下命令開啟檔案:

nano /etc/mosquitto/conf.d/broker.acl

ACL檔案內容

ACL檔案的內容分為三個部分:一般存取控制、使用者存取控制和模式存取控制。以下是ACL檔案的範例內容:

# GENERAL
topic read timestamp/#

# USERS
user admin
topic readwrite #

# APPLICATION AS A USER
user my_app_name
topic read timestamp/#
topic readwrite myapp/%c/#

# PATTERNS
topic read timestamp/#
pattern readwrite users/%u/#
pattern write %c/up/#
pattern read %c/dn/#

ACL檔案解釋

  • # GENERAL部分定義一般存取控制規則,適用於所有使用者和應用程式。在這個範例中,所有使用者都可以讀取timestamp/#主題。
  • # USERS部分定義使用者存取控制規則。使用者admin可以讀寫所有主題(#)。
  • # APPLICATION AS A USER部分定義應用程式存取控制規則。應用程式my_app_name可以讀取timestamp/#主題和寫入myapp/%c/#主題。
  • # PATTERNS部分定義模式存取控制規則。這些規則適用於所有使用者和應用程式。在這個範例中,所有使用者都可以讀取timestamp/#主題,寫入users/%u/#主題,寫入%c/up/#主題,讀取%c/dn/#主題。

訊息代理伺服器設定

訊息代理伺服器(Message Broker)是物聯網(IoT)應用中的一個重要元件,負責管理和路由裝置之間的訊息。為了確保訊息代理伺服器的安全性和效率,需要進行適當的設定。

設定檔案

設定檔案是用於定義訊息代理伺服器的行為和存取控制。檔案中包含了多個部分,包括一般設定、使用者設定和模式設定。每個部分都有其特定的用途和設定方式。

一般設定

一般設定部分用於定義對所有使用者的存取控制。例如,可以設定允許或拒絕所有使用者存取特定主題。這部分的設定對所有使用者都有效。

使用者設定

使用者設定部分用於定義特定使用者的存取控制。例如,可以設定允許或拒絕特定使用者存取特定主題。這部分的設定只對指定的使用者有效。

模式設定

模式設定部分用於定義根據模式的存取控制。例如,可以設定允許或拒絕所有使用者存取特定主題,根據使用者名稱和客戶端ID的模式。這部分的設定對所有使用者都有效。

存取控制

存取控制是訊息代理伺服器的一個重要功能,負責控制使用者存取主題的許可權。存取控制可以根據使用者名稱、客戶端ID和主題名稱等因素進行設定。

使用者名稱和客戶端ID

使用者名稱和客戶端ID是用於識別使用者和客戶端的唯一標識。使用者名稱和客戶端ID可以用於設定存取控制,例如,允許或拒絕特定使用者存取特定主題。

主題名稱

主題名稱是用於識別主題的唯一標識。主題名稱可以用於設定存取控制,例如,允許或拒絕特定使用者存取特定主題。

從建置時間序列資料庫、安裝 Node-RED 必要節點、建立第一個流程、建置資料庫監聽器到 REST API 的設計與實作,本文完整闡述了建構一個基礎 IoT 平臺的流程。透過逐步的說明與程式碼範例,我們深入探討瞭如何利用 Node-RED、MQTT 和 MySQL 等技術整合資料庫、訊息佇列和 API,實現資料的收集、儲存、查詢和釋出。

分析流程中,我們特別著重於效能與安全的考量。例如,在資料庫監聽器章節中,我們強調了使用 QoS = 2 來確保訊息的可靠傳輸;在 REST API 設計中,我們使用了引數化查詢來避免 SQL 注入風險;在訊息代理設定中,更詳細說明瞭如何利用 ACL (Access Control List) 機制強化安全性,精細化控制使用者對不同主題的讀寫許可權,並以實際案例展示設定方式,有效提升系統的安全性。此外,使用forever公用程式確保 Node-RED 持續執行,提升了平臺的穩定性。

展望未來,此 IoT 平臺可以進一步整合更多功能,例如資料視覺化、機器學習模型部署、邊緣運算整合等。隨著 WebSocket 的應用,更可實現網頁應用程式與平臺裝置的即時雙向通訊,大幅提升平臺的互動性和應用範圍。同時,考量到系統的擴充套件性,未來可以評估匯入更進階的資料庫技術,例如時序資料庫 InfluxDB 或 TimescaleDB,以提升資料儲存和查詢的效率。持續最佳化訊息代理的配置,例如負載平衡和叢集部署,將能更好地應對大規模資料吞吐和高併發連線的需求。玄貓認為,持續關注這些新興技術和應用場景,將有助於打造更強大、更具彈性的 IoT 平臺,並在未來發展中取得更大的成功。