訊息代理伺服器扮演著物聯網平臺中重要的資料傳輸角色,但其本身存在一些限制,例如無法直接通知客戶端存取控制的結果。為確保訊息代理伺服器的穩定性,我們可以利用 Forever 工具來監控和管理 Mosquitto 伺服器,使其在崩潰時自動重啟。此外,設計良好的 REST API 對於有效管理和存取物聯網平臺的資料至關重要。本文將深入探討如何使用 Node-RED 和 MySQL 建立一個 RESTful API,以實現資料的查詢、過濾和刪除等功能。同時,我們也會探討如何處理身份驗證和資料完整性等議題,確保 API 的安全性和可靠性。

訊息代理伺服器的限制

訊息代理伺服器有一些限制,例如,無法通知客戶端存取控制的結果。然而,訊息代理伺服器可以記錄存取控制的結果,例如,記錄哪些使用者存取了哪些主題。

圖表翻譯:

此圖表示訊息代理伺服器的設定和存取控制流程。設定檔案是用於定義訊息代理伺服器的行為和存取控制。一般設定、使用者設定和模式設定是設定檔案的三個主要部分。存取控制是用於控制使用者存取主題的許可權。使用者名稱和客戶端ID是用於識別使用者和客戶端的唯一標識。主題名稱是用於識別主題的唯一標識。訊息代理伺服器的限制包括無法通知客戶端存取控制的結果。

設定訊息代理

為了確保 Node-RED 的穩定執行,我們使用 Forever 工具。現在,我們也要設定訊息代理,以便在代理器崩潰時,Forever 可以自動重啟它。雖然 Forever 工具可以輕易地與 Node.js 應用程式合作,但要使用它來控制 Mosquitto 代理器則需要額外的工作。

首先,我們建立一個 shell 指令碼,該指令碼呼叫 mosquitto 命令。然後,我們使用 Forever 工具執行這個指令碼。建立一個 shell 指令碼,名稱為 mqtt-sh.sh,內容如下:

#!/bin/sh
mosquitto

儲存這個檔案後,我們使其可執行,然後使用 Forever 工具執行它(先結束之前的 Mosquitto 程序):

# chmod +x mqtt-sh.sh
# pkill mosquitto
# forever start -l mqtt.log --append -c sh /root/mqtt-sh.sh

這個命令在背景中啟動訊息代理,並保持它的執行。所有代理器的輸出都會記錄在 mqtt.log 檔案中,以便檢視。要檢視目前使用 Forever 工具執行的應用程式數量,請在命令列中輸入 forever list,這將列出所有執行中的應用程式及其執行時間和其他資訊。

內容解密:

上述命令使用 Forever 工具來保持 Mosquitto 代理器的執行。Forever 工具是一個 Node.js 模組,用於保持 Node.js 應用程式的執行。如果應用程式崩潰或意外終止,Forever 工具將自動重啟它。使用 Forever 工具可以確保 Mosquitto 代理器始終保持執行,從而確保整個系統的穩定性。

圖表翻譯:

這個流程圖描述了使用 Forever 工具來保持 Mosquitto 代理器執行的過程。首先,啟動 Forever 工具,然後執行 Mosquitto 代理器。代理器的輸出將被記錄到 mqtt.log 檔案中。Forever 工具將保持代理器的執行,如果代理器崩潰或終止,Forever 工具將自動重啟它。

建立REST介面

在前面的章節中,我們已經為構建有意義的API奠定了基礎。在本章中,我們將建立資料存取API和實用API。

資料存取API

根據我們的願望清單,要求D1和D2主要是為了訪問時間序列資料儲存中的資料記錄。在第7章中,我們已經實現了核心服務,並且實現了D4要求,即透過HTTP釋出訊息API。

接下來,我們將建立一個API,根據指定的主題或有效載荷條件(或兩者兼有)來查詢一條或多條資料記錄。這個條件可以是一個主題或有效載荷模式,並且可以依賴時間戳,例如需要特定時間段的資料。

所有資料存取API都具有相似的結構,每個API都有一個HTTP輸入節點,用於建立查詢塊,然後由MySQL查詢輸出節點和HTTP響應節點組成。每個HTTP輸入節點都有一個唯一的端點結構和方法,可以用於API呼叫來呼叫該流序列。

在建立查詢節點中,我們將構建一個適合的查詢來查詢所請求的資料,並將其傳遞給MySQL節點,然後由MySQL節點執行實際查詢工作。在準備響應節點中,我們可以新增一些格式化或新增一些其他物件到響應JSON中,以進一步增強API功能。

條件基礎資料存取API

API端點結構

我們的端點結構為 /get/topicLike/:topic/payloadLike/:payload/last/:count,其中主題和有效載荷輸入是根據萬用字元的。由於我們正在查詢不確定的記錄數,因此我們強制要求新增所請求記錄的數量作為計數引數。

建立查詢函式塊程式碼

// 建立查詢
// 如果沒有身份驗證過濾器定義或可用
// 則設定預設值為1
if (!msg.req.authFilter) {
    msg.req.authFilter = 1;
}
// 萬用字元用於API查詢是*,需要轉換為SQL中的通配字元%
msg.topic = "SELECT id, topic, payload, timestamp" +
    " FROM thingData WHERE" +
    " topic LIKE '" + msg.req.params.topic.replace(/\*/g, "%") + "'" +
    " AND" +
    " payload LIKE '" + msg.req.params.payload.replace(/\*/g, "%") + "'" +
    " AND deleted=0" +
    " AND (" + msg.req.authFilter + ")";

圖表翻譯:

此圖表示了條件基礎資料存取API的流序列。HTTP輸入節點接收API呼叫,然後建立查詢節點構建適合的查詢。MySQL查詢輸出節點執行實際查詢工作,然後HTTP響應節點傳回查詢結果。

建立 REST 介面

在本章中,我們將建立一個 REST 介面來存取和管理 IoT 平臺的資料。首先,我們需要準備好程式碼以適應身份驗證過濾。我們檢查 authFilter 物件是否可用,如果不可用,則設定其預設值為 1。這確保我們的程式碼和 API 在新增身份驗證功能之前可以正常工作。

查詢構建

在查詢中,我們使用 * 作為萬用字元,而不是 %,以避免與 HTML 編碼混淆。同時,我們也檢查非刪除記錄,並新增 authFilter 條件。如果 authFilter 未提供,則預設值為 1,條件變為 AND (1),不會改變查詢結果。如果 authFilter 值為 0,則條件變為 AND (0),查詢結果將為空。

時間基礎過濾

除了根據模式的資料 API 之外,我們還添加了時間基礎 API。這裡,我們建立了三個端點:一個用於獲取指定主題或主題模式在指定時間戳之後建立的記錄,另一個用於獲取在指定時間戳之前建立的記錄,最後一個用於獲取在兩個時間戳之間建立的記錄。

端點

我們有三個端點:

  • /get/:topic/after/:time/last/:count:查詢指定主題或主題模式在指定時間戳之後建立的記錄。
  • /get/:topic/before/:time/last/:count:查詢指定主題或主題模式在指定時間戳之前建立的記錄。
  • /get/:topic/during/:start/:end/last/:count:查詢指定主題或主題模式在兩個時間戳之間建立的記錄。

示例

以下是使用 cURL 呼叫這些 API 的示例:

payloadLike/*/last/5

輸出 1:

[
  {"id": 18, "topic": "mytopic", "payload": "myplayload", "timestamp": "1543731089.784"},
  {"id": 8, "topic": "myTopic", "payload": "myPayload", "timestamp": "1543717154.899"},
  {"id": 7, "topic": "myTopic", "payload": "myPayload", "timestamp": "1543716966.189"}
]
payloadLike/*/last/2

輸出 2:

[
  {"id": 31, "topic": "timestamp", "payload": "1544011400243", "timestamp": "1544011400.245"},
  {"id": 30, "topic": "timestamp", "payload": "1544011399074", "timestamp": "1544011399.078"}
]
payloadLike/*88*/last/2

輸出 3:

[
  {"id": 28, "topic": "timestamp", "payload": "1544011288907", "timestamp": "1544011288.910"}
]

在輸出 3 中,雖然我們要求 2 條記錄,但只有一條記錄滿足主題和有效負載模式的要求。

圖表翻譯:

內容解密:

以上程式碼示例展示瞭如何構建查詢和使用時間基礎過濾。同時,也展示瞭如何使用 cURL 呼叫這些 API。輸出結果顯示了查詢結果的 JSON 格式。

時間戳記查詢

在查詢記錄時,提供開始和結束的時間戳記是非常重要的。這樣可以根據時間戳記來篩選出特定時間段內的記錄。根據使用的端點不同,建立查詢函式的方式也會略有不同,因為查詢語句不同。以下是三個程式碼片段:

建立「之後」查詢

如果沒有定義或可用的身份驗證過濾器,則設定預設值為 1。

if (!msg.req.authFilter) {
  msg.req.authFilter = 1;
}
// 將 API 查詢中的萬字元 * 轉換為 SQL 中的萬字元 %
msg.topic = "SELECT id, topic, payload, timestamp" +
  " FROM thingData WHERE" +
  " topic LIKE '" + msg.req.params.topic.replace(/\*/g, "%") + "'" +
  " AND" +
  " timestamp >= '" + msg.req.params.time + "'" +
  " AND deleted=0" +
  " AND (" + msg.req.authFilter + ")" +
  " ORDER BY timestamp";
return msg;

建立「之前」查詢

如果沒有定義或可用的身份驗證過濾器,則設定預設值為 1。

if (!msg.req.authFilter) {
  msg.req.authFilter = 1;
}
// 將 API 查詢中的萬字元 * 轉換為 SQL 中的萬字元 %
msg.topic = "SELECT id, topic, payload, timestamp" +
  " FROM thingData WHERE" +
  " topic LIKE '" + msg.req.params.topic.replace(/\*/g, "%") + "'" +
  " AND" +
  " timestamp <= '" + msg.req.params.time + "'" +
  " AND deleted=0" +
  " AND (" + msg.req.authFilter + ")" +
  " ORDER BY timestamp";
return msg;

內容解密:

上述程式碼片段展示瞭如何根據時間戳記查詢記錄。首先,檢查是否有身份驗證過濾器,如果沒有,則設定預設值為 1。然後,將 API 查詢中的萬字元 * 轉換為 SQL 中的萬字元 %。接著,構建查詢語句,根據時間戳記篩選出記錄,並根據時間戳記排序。

圖表翻譯:

上述流程圖展示了查詢記錄的過程,從檢查身份驗證過濾器開始,到設定預設值、轉換萬字元、構建查詢語句、篩選記錄、排序記錄,最終傳回結果。

資料查詢與過濾

在進行資料查詢時,需要考慮到查詢條件的設定,包括主題(topic)、時間範圍(timestamp)等。以下是資料查詢的程式碼實現:

def query_data(msg):
    # 將 wildcard 字元 * 轉換為 SQL 的 LIKE 字元 %
    topic = msg.req.params.topic.replace('*', '%')
    
    # 設定查詢條件
    query = "SELECT id, topic, payload, timestamp "
    query += "FROM thingData "
    query += "WHERE topic LIKE '" + topic + "' "
    query += "AND timestamp <= '" + msg.req.params.time + "' "
    query += "AND deleted = 0 "
    
    # 如果沒有設定驗證過濾器,則使用預設值 1
    if not msg.req.authFilter:
        msg.req.authFilter = 1
    query += "AND (" + msg.req.authFilter + ") "
    query += "ORDER BY timestamp "
    
    return query

def query_data_during(msg):
    # 將 wildcard 字元 * 轉換為 SQL 的 LIKE 字元 %
    topic = msg.req.params.topic.replace('*', '%')
    
    # 設定查詢條件
    query = "SELECT id, topic, payload, timestamp "
    query += "FROM thingData "
    query += "WHERE topic LIKE '" + topic + "' "
    query += "AND timestamp >= '" + msg.req.params.start + "' "
    query += "AND deleted = 0 "
    
    # 如果沒有設定驗證過濾器,則使用預設值 1
    if not msg.req.authFilter:
        msg.req.authFilter = 1
    query += "AND (" + msg.req.authFilter + ") "
    query += "ORDER BY timestamp "
    
    return query

內容解密:

以上程式碼實現了資料查詢的功能,包括主題查詢、時間範圍查詢等。其中,query_data 函式實現了資料查詢的基本功能,而 query_data_during 函式則實現了特定時間範圍內的資料查詢。

圖表翻譯:

以下是資料查詢流程的視覺化圖表: 這個圖表展示了資料查詢的流程,從收到查詢請求開始,到傳回查詢結果為止。每一步驟都對應到程式碼中的特定功能。

建立一個 RESTful 介面

在本章中,我們將建立一個 RESTful 介面來管理資料。REST(Representational State of Resource)是一種設計風格,提供了一種簡單、統一的方式來訪問和操作資源。

查詢資料

首先,我們需要建立一個查詢資料的 API。以下是查詢資料的 SQL 語句:

SELECT * FROM data 
WHERE topic = 'mytopic' 
AND timestamp <=' + msg.req.params.end + ' 
AND deleted=0 
AND (' + msg.req.authFilter + ') 
ORDER BY id;

這個查詢語句查詢 mytopic 主題下的所有資料,時間戳小於或等於 end 引數,且 deleted 欄位為 0。

刪除資料

接下來,我們需要建立一個刪除資料的 API。刪除資料可以分為兩種方式:可恢復刪除和不可恢復刪除。可恢復刪除是指將資料標記為已刪除,但不真正刪除資料。不可恢復刪除是指真正刪除資料。

可恢復刪除

可恢復刪除是指將資料的 deleted 欄位設為 1。以下是可恢復刪除的 API 端點:

@app.route('/delete/:topic/id/:id', methods=['DELETE'])
def delete_data(topic, id):
    # 查詢資料
    data = Data.query.filter_by(topic=topic, id=id).first()
    if data:
        # 標記資料為已刪除
        data.deleted = 1
        db.session.commit()
        return {'message': '資料已刪除'}
    else:
        return {'message': '資料不存在'}

不可恢復刪除

不可恢復刪除是指真正刪除資料。以下是不可恢復刪除的 API 端點:

@app.route('/delete/:topic/id/:id/permanent', methods=['DELETE'])
def delete_data_permanent(topic, id):
    # 查詢資料
    data = Data.query.filter_by(topic=topic, id=id).first()
    if data:
        # 刪除資料
        db.session.delete(data)
        db.session.commit()
        return {'message': '資料已刪除'}
    else:
        return {'message': '資料不存在'}

API 端點

以下是所有 API 端點:

  • GET /data/:topic: 查詢 mytopic 主題下的所有資料
  • DELETE /delete/:topic/id/:id: 可恢復刪除資料
  • DELETE /delete/:topic/id/:id/permanent: 不可恢復刪除資料

測試 API

以下是測試 API 的範例:

curl -X GET 'http://localhost:5000/data/mytopic'
curl -X DELETE 'http://localhost:5000/delete/mytopic/id/1'
curl -X DELETE 'http://localhost:5000/delete/mytopic/id/1/permanent'

建立一個 RESTful 介面

在本章中,我們將探討如何建立一個 RESTful 介面來管理物聯網裝置的資料。REST(Representational State of Resource)是一種廣泛使用的網路架構風格,根據 HTTP 協議,使用 JSON 或 XML 格式來交換資料。

刪除裝置資料

首先,我們來看一下如何刪除裝置資料。以下是刪除裝置資料的 API 程式碼:

if not msg.req.authFilter:
    msg.req.authFilter = 1

msg.topic = "UPDATE thingData" + \
            " SET deleted=1" + \
            " WHERE" + \
            " topic='" + msg.req.params.topic + "'" + \
            " AND (" + msg.req.authFilter + ")" + \
            " AND id=" + msg.req.params.id + ";"
return msg

在這段程式碼中,我們更新了裝置資料表中的記錄,將 deleted 欄位設為 1,以標記記錄為已刪除。注意,我們並沒有真正地刪除記錄,而是將其標記為已刪除,以便於日後的恢復。

批次刪除裝置資料

接下來,我們來看一下如何批次刪除裝置資料。以下是批次刪除裝置資料的 API 程式碼:

if not msg.req.authFilter:
    msg.req.authFilter = 1

msg.topic = "UPDATE thingData" + \
            " SET deleted=1" + \
            " WHERE" + \
            " topic='" + msg.req.params.topic + "'" + \
            " AND (" + msg.req.authFilter + ");"
return msg

在這段程式碼中,我們批次更新了裝置資料表中的記錄,將所有與給定主題相關的記錄標記為已刪除。

處理 MySQL 節點的響應

最後,我們需要修改 prepare response 函式來處理 MySQL 節點的響應。以下是修改過的程式碼:

// Prepare response

在這段程式碼中,我們需要處理 MySQL 節點傳回的資料,提取出我們需要的資訊,並傳回給使用者。

圖表翻譯:

在這個流程圖中,我們展示了刪除裝置資料和批次刪除裝置資料的流程。首先,我們更新裝置資料表中的記錄,將 deleted 欄位設為 1,以標記記錄為已刪除。然後,我們傳回結果給使用者。批次刪除裝置資料的流程類似,但我們批次更新了裝置資料表中的記錄。

建立 REST 介面

在本章中,我們將探討如何建立一個 REST 介面來管理時間序列資料。REST(Representational State of Resource)是一種廣泛使用的網路架構風格,允許使用者端和伺服器之間進行通訊。

刪除記錄

首先,我們來看一下刪除記錄的 API。這個 API 的路徑是 /delete/:topic/last/:count,其中 :topic 是主題名稱,:count 是要刪除的記錄數量。

if (!msg.req.authFilter) {
  msg.req.authFilter = 1;
}

msg.topic = "DELETE thingData" +
  " WHERE deleted=1" +
  " AND" +
  " topic='" + msg.req.params.topic + "'" +
  " AND (" + msg.req.authFilter + ")";
return msg;

這個 API 的作用是刪除指定主題下最新的 :count 條記錄。若沒有指定記錄數量,則預設刪除 1 條記錄。

刪除記錄(從開始)

另一個 API 的路徑是 /delete/:topic/first/:count,這個 API 的作用是刪除指定主題下最舊的 :count 條記錄。

// ...

這個 API 的實現與前一個 API 類似,不過排序方式不同。前一個 API 排序方式是 DESC(降序),而這個 API 排序方式是 ASC(升序)。

時間序列資料儲存

在時間序列資料儲存中,刪除記錄並不會真正地從資料庫中刪除,而是將記錄的 deleted 標誌設定為 1。這樣,記錄仍然存在於資料庫中,但不會被查詢出來。

// ...

這種方式可以保留資料的完整性和可追溯性。

示例

以下是使用 cURL 命令測試這些 API 的示例:

curl -X DELETE 'http://example.com/delete/timestamp/last/1'
curl -X DELETE 'http://example.com/delete/timestamp/first/1'

這些命令分別刪除最新的一條記錄和最舊的一條記錄。

內容解密:

上述 API 的實現使用了 JavaScript 和 Node.js。API 的路徑和查詢引數使用了 Express.js 框架。資料庫操作使用了 MySQL。

const express = require('express');
const mysql = require('mysql');

const app = express();

// ...

這個示例展示瞭如何使用 Node.js 和 Express.js 建立一個 REST 介面來管理時間序列資料。

圖表翻譯:

以下是 API 的流程圖:

@startuml
skinparam backgroundColor #FEFEFE
skinparam sequenceArrowThickness 2

title MQTT 訊息代理伺服器設定與 REST API 設計

actor "客戶端" as client
participant "API Gateway" as gateway
participant "認證服務" as auth
participant "業務服務" as service
database "資料庫" as db
queue "訊息佇列" as mq

client -> gateway : HTTP 請求
gateway -> auth : 驗證 Token
auth --> gateway : 認證結果

alt 認證成功
    gateway -> service : 轉發請求
    service -> db : 查詢/更新資料
    db --> service : 回傳結果
    service -> mq : 發送事件
    service --> gateway : 回應資料
    gateway --> client : HTTP 200 OK
else 認證失敗
    gateway --> client : HTTP 401 Unauthorized
end

@enduml

這個流程圖展示了 API 的主要步驟,包括刪除記錄、查詢記錄、排序記錄和傳回結果。

時序資料儲存的資料刪除和清除

在時序資料儲存中,刪除和清除資料是兩個不同的過程。刪除資料是指標記資料為已刪除,但不會立即從儲存中移除。清除資料是指完全從儲存中移除已刪除的資料。

從系統架構視角來看,訊息代理伺服器在物聯網平臺中扮演著關鍵角色,負責訊息的路由和轉發,但其本身的限制,例如無法直接通知客戶端存取控制結果,需要透過額外的機制,例如日誌記錄來彌補。本文深入探討瞭如何利用 Forever 工具確保 Mosquitto 訊息代理伺服器的穩定執行,以及如何構建 RESTful 介面來實現資料的查詢、過濾和刪除,包含條件式查詢、時間戳記查詢以及批次刪除等功能。文章中提供的程式碼範例和流程圖清晰地展示了 API 的設計和運作方式,並特別強調了軟刪除機制,以確保資料的完整性和可追溯性。然而,目前方案的 RESTful 介面設計仍有改進空間,例如可以匯入更完善的錯誤處理機制和版本控制。展望未來,隨著物聯網裝置的普及和資料量的增長,訊息代理伺服器和 RESTful 介面的效能和可擴充套件性將面臨更大的挑戰,需要持續最佳化和改進,例如引入更進階的訊息佇列技術和資料庫分片策略。玄貓認為,掌握這些關鍵技術,才能構建更穩健、高效的物聯網平臺。