訊息代理伺服器扮演著物聯網平臺中重要的資料傳輸角色,但其本身存在一些限制,例如無法直接通知客戶端存取控制的結果。為確保訊息代理伺服器的穩定性,我們可以利用 Forever 工具來監控和管理 Mosquitto 伺服器,使其在崩潰時自動重啟。此外,設計良好的 REST API 對於有效管理和存取物聯網平臺的資料至關重要。本文將深入探討如何使用 Node-RED 和 MySQL 建立一個 RESTful API,以實現資料的查詢、過濾和刪除等功能。同時,我們也會探討如何處理身份驗證和資料完整性等議題,確保 API 的安全性和可靠性。

訊息代理伺服器的限制

訊息代理伺服器有一些限制,例如,無法通知客戶端存取控制的結果。然而,訊息代理伺服器可以記錄存取控制的結果,例如,記錄哪些使用者存取了哪些主題。

圖表翻譯:

此圖表示訊息代理伺服器的設定和存取控制流程。設定檔案是用於定義訊息代理伺服器的行為和存取控制。一般設定、使用者設定和模式設定是設定檔案的三個主要部分。存取控制是用於控制使用者存取主題的許可權。使用者名稱和客戶端ID是用於識別使用者和客戶端的唯一標識。主題名稱是用於識別主題的唯一標識。訊息代理伺服器的限制包括無法通知客戶端存取控制的結果。

設定訊息代理

為了確保 Node-RED 的穩定執行,我們使用 Forever 工具。現在,我們也要設定訊息代理,以便在代理器崩潰時,Forever 可以自動重啟它。雖然 Forever 工具可以輕易地與 Node.js 應用程式合作,但要使用它來控制 Mosquitto 代理器則需要額外的工作。

首先,我們建立一個 shell 指令碼,該指令碼呼叫 mosquitto 命令。然後,我們使用 Forever 工具執行這個指令碼。建立一個 shell 指令碼,名稱為 mqtt-sh.sh,內容如下:

#!/bin/sh
mosquitto

儲存這個檔案後,我們使其可執行,然後使用 Forever 工具執行它(先結束之前的 Mosquitto 程序):

# chmod +x mqtt-sh.sh
# pkill mosquitto
# forever start -l mqtt.log --append -c sh /root/mqtt-sh.sh

這個命令在背景中啟動訊息代理,並保持它的執行。所有代理器的輸出都會記錄在 mqtt.log 檔案中,以便檢視。要檢視目前使用 Forever 工具執行的應用程式數量,請在命令列中輸入 forever list,這將列出所有執行中的應用程式及其執行時間和其他資訊。

內容解密:

上述命令使用 Forever 工具來保持 Mosquitto 代理器的執行。Forever 工具是一個 Node.js 模組,用於保持 Node.js 應用程式的執行。如果應用程式崩潰或意外終止,Forever 工具將自動重啟它。使用 Forever 工具可以確保 Mosquitto 代理器始終保持執行,從而確保整個系統的穩定性。

圖表翻譯:

  flowchart TD
    A[啟動 Forever 工具] --> B[執行 Mosquitto 代理器]
    B --> C[記錄輸出到 mqtt.log]
    C --> D[保持代理器執行]
    D --> E[自動重啟代理器]

這個流程圖描述了使用 Forever 工具來保持 Mosquitto 代理器執行的過程。首先,啟動 Forever 工具,然後執行 Mosquitto 代理器。代理器的輸出將被記錄到 mqtt.log 檔案中。Forever 工具將保持代理器的執行,如果代理器崩潰或終止,Forever 工具將自動重啟它。

建立REST介面

在前面的章節中,我們已經為構建有意義的API奠定了基礎。在本章中,我們將建立資料存取API和實用API。

資料存取API

根據我們的願望清單,要求D1和D2主要是為了訪問時間序列資料儲存中的資料記錄。在第7章中,我們已經實現了核心服務,並且實現了D4要求,即透過HTTP釋出訊息API。

接下來,我們將建立一個API,根據指定的主題或有效載荷條件(或兩者兼有)來查詢一條或多條資料記錄。這個條件可以是一個主題或有效載荷模式,並且可以依賴時間戳,例如需要特定時間段的資料。

所有資料存取API都具有相似的結構,每個API都有一個HTTP輸入節點,用於建立查詢塊,然後由MySQL查詢輸出節點和HTTP響應節點組成。每個HTTP輸入節點都有一個唯一的端點結構和方法,可以用於API呼叫來呼叫該流序列。

在建立查詢節點中,我們將構建一個適合的查詢來查詢所請求的資料,並將其傳遞給MySQL節點,然後由MySQL節點執行實際查詢工作。在準備響應節點中,我們可以新增一些格式化或新增一些其他物件到響應JSON中,以進一步增強API功能。

條件基礎資料存取API

  flowchart TD
    A[HTTP輸入節點] --> B[建立查詢節點]
    B --> C[MySQL查詢輸出節點]
    C --> D[HTTP響應節點]

API端點結構

我們的端點結構為 /get/topicLike/:topic/payloadLike/:payload/last/:count,其中主題和有效載荷輸入是根據萬用字元的。由於我們正在查詢不確定的記錄數,因此我們強制要求新增所請求記錄的數量作為計數引數。

建立查詢函式塊程式碼

// 建立查詢
// 如果沒有身份驗證過濾器定義或可用
// 則設定預設值為1
if (!msg.req.authFilter) {
    msg.req.authFilter = 1;
}
// 萬用字元用於API查詢是*,需要轉換為SQL中的通配字元%
msg.topic = "SELECT id, topic, payload, timestamp" +
    " FROM thingData WHERE" +
    " topic LIKE '" + msg.req.params.topic.replace(/\*/g, "%") + "'" +
    " AND" +
    " payload LIKE '" + msg.req.params.payload.replace(/\*/g, "%") + "'" +
    " AND deleted=0" +
    " AND (" + msg.req.authFilter + ")";

圖表翻譯:

此圖表示了條件基礎資料存取API的流序列。HTTP輸入節點接收API呼叫,然後建立查詢節點構建適合的查詢。MySQL查詢輸出節點執行實際查詢工作,然後HTTP響應節點傳回查詢結果。

建立 REST 介面

在本章中,我們將建立一個 REST 介面來存取和管理 IoT 平臺的資料。首先,我們需要準備好程式碼以適應身份驗證過濾。我們檢查 authFilter 物件是否可用,如果不可用,則設定其預設值為 1。這確保我們的程式碼和 API 在新增身份驗證功能之前可以正常工作。

查詢構建

在查詢中,我們使用 * 作為萬用字元,而不是 %,以避免與 HTML 編碼混淆。同時,我們也檢查非刪除記錄,並新增 authFilter 條件。如果 authFilter 未提供,則預設值為 1,條件變為 AND (1),不會改變查詢結果。如果 authFilter 值為 0,則條件變為 AND (0),查詢結果將為空。

時間基礎過濾

除了根據模式的資料 API 之外,我們還添加了時間基礎 API。這裡,我們建立了三個端點:一個用於獲取指定主題或主題模式在指定時間戳之後建立的記錄,另一個用於獲取在指定時間戳之前建立的記錄,最後一個用於獲取在兩個時間戳之間建立的記錄。

端點

我們有三個端點:

  • /get/:topic/after/:time/last/:count:查詢指定主題或主題模式在指定時間戳之後建立的記錄。
  • /get/:topic/before/:time/last/:count:查詢指定主題或主題模式在指定時間戳之前建立的記錄。
  • /get/:topic/during/:start/:end/last/:count:查詢指定主題或主題模式在兩個時間戳之間建立的記錄。

示例

以下是使用 cURL 呼叫這些 API 的示例:

payloadLike/*/last/5

輸出 1:

[
  {"id": 18, "topic": "mytopic", "payload": "myplayload", "timestamp": "1543731089.784"},
  {"id": 8, "topic": "myTopic", "payload": "myPayload", "timestamp": "1543717154.899"},
  {"id": 7, "topic": "myTopic", "payload": "myPayload", "timestamp": "1543716966.189"}
]
payloadLike/*/last/2

輸出 2:

[
  {"id": 31, "topic": "timestamp", "payload": "1544011400243", "timestamp": "1544011400.245"},
  {"id": 30, "topic": "timestamp", "payload": "1544011399074", "timestamp": "1544011399.078"}
]
payloadLike/*88*/last/2

輸出 3:

[
  {"id": 28, "topic": "timestamp", "payload": "1544011288907", "timestamp": "1544011288.910"}
]

在輸出 3 中,雖然我們要求 2 條記錄,但只有一條記錄滿足主題和有效負載模式的要求。

圖表翻譯:

  flowchart TD
    A[查詢構建] --> B[時間基礎過濾]
    B --> C[端點]
    C --> D[查詢指定主題或主題模式]
    D --> E[查詢記錄]
    E --> F[傳回記錄]

內容解密:

以上程式碼示例展示瞭如何構建查詢和使用時間基礎過濾。同時,也展示瞭如何使用 cURL 呼叫這些 API。輸出結果顯示了查詢結果的 JSON 格式。

時間戳記查詢

在查詢記錄時,提供開始和結束的時間戳記是非常重要的。這樣可以根據時間戳記來篩選出特定時間段內的記錄。根據使用的端點不同,建立查詢函式的方式也會略有不同,因為查詢語句不同。以下是三個程式碼片段:

建立「之後」查詢

如果沒有定義或可用的身份驗證過濾器,則設定預設值為 1。

if (!msg.req.authFilter) {
  msg.req.authFilter = 1;
}
// 將 API 查詢中的萬字元 * 轉換為 SQL 中的萬字元 %
msg.topic = "SELECT id, topic, payload, timestamp" +
  " FROM thingData WHERE" +
  " topic LIKE '" + msg.req.params.topic.replace(/\*/g, "%") + "'" +
  " AND" +
  " timestamp >= '" + msg.req.params.time + "'" +
  " AND deleted=0" +
  " AND (" + msg.req.authFilter + ")" +
  " ORDER BY timestamp";
return msg;

建立「之前」查詢

如果沒有定義或可用的身份驗證過濾器,則設定預設值為 1。

if (!msg.req.authFilter) {
  msg.req.authFilter = 1;
}
// 將 API 查詢中的萬字元 * 轉換為 SQL 中的萬字元 %
msg.topic = "SELECT id, topic, payload, timestamp" +
  " FROM thingData WHERE" +
  " topic LIKE '" + msg.req.params.topic.replace(/\*/g, "%") + "'" +
  " AND" +
  " timestamp <= '" + msg.req.params.time + "'" +
  " AND deleted=0" +
  " AND (" + msg.req.authFilter + ")" +
  " ORDER BY timestamp";
return msg;

內容解密:

上述程式碼片段展示瞭如何根據時間戳記查詢記錄。首先,檢查是否有身份驗證過濾器,如果沒有,則設定預設值為 1。然後,將 API 查詢中的萬字元 * 轉換為 SQL 中的萬字元 %。接著,構建查詢語句,根據時間戳記篩選出記錄,並根據時間戳記排序。

圖表翻譯:

  flowchart TD
    A[開始] --> B[檢查身份驗證過濾器]
    B --> C[設定預設值]
    C --> D[轉換萬字元]
    D --> E[構建查詢語句]
    E --> F[篩選記錄]
    F --> G[排序記錄]
    G --> H[傳回結果]

上述流程圖展示了查詢記錄的過程,從檢查身份驗證過濾器開始,到設定預設值、轉換萬字元、構建查詢語句、篩選記錄、排序記錄,最終傳回結果。

資料查詢與過濾

在進行資料查詢時,需要考慮到查詢條件的設定,包括主題(topic)、時間範圍(timestamp)等。以下是資料查詢的程式碼實現:

def query_data(msg):
    # 將 wildcard 字元 * 轉換為 SQL 的 LIKE 字元 %
    topic = msg.req.params.topic.replace('*', '%')
    
    # 設定查詢條件
    query = "SELECT id, topic, payload, timestamp "
    query += "FROM thingData "
    query += "WHERE topic LIKE '" + topic + "' "
    query += "AND timestamp <= '" + msg.req.params.time + "' "
    query += "AND deleted = 0 "
    
    # 如果沒有設定驗證過濾器,則使用預設值 1
    if not msg.req.authFilter:
        msg.req.authFilter = 1
    query += "AND (" + msg.req.authFilter + ") "
    query += "ORDER BY timestamp "
    
    return query

def query_data_during(msg):
    # 將 wildcard 字元 * 轉換為 SQL 的 LIKE 字元 %
    topic = msg.req.params.topic.replace('*', '%')
    
    # 設定查詢條件
    query = "SELECT id, topic, payload, timestamp "
    query += "FROM thingData "
    query += "WHERE topic LIKE '" + topic + "' "
    query += "AND timestamp >= '" + msg.req.params.start + "' "
    query += "AND deleted = 0 "
    
    # 如果沒有設定驗證過濾器,則使用預設值 1
    if not msg.req.authFilter:
        msg.req.authFilter = 1
    query += "AND (" + msg.req.authFilter + ") "
    query += "ORDER BY timestamp "
    
    return query

內容解密:

以上程式碼實現了資料查詢的功能,包括主題查詢、時間範圍查詢等。其中,query_data 函式實現了資料查詢的基本功能,而 query_data_during 函式則實現了特定時間範圍內的資料查詢。

圖表翻譯:

以下是資料查詢流程的視覺化圖表:

  flowchart TD
    A[收到查詢請求] --> B[設定查詢條件]
    B --> C[轉換 wildcard 字元]
    C --> D[組裝查詢語句]
    D --> E[執行查詢]
    E --> F[傳回查詢結果]

這個圖表展示了資料查詢的流程,從收到查詢請求開始,到傳回查詢結果為止。每一步驟都對應到程式碼中的特定功能。

建立一個 RESTful 介面

在本章中,我們將建立一個 RESTful 介面來管理資料。REST(Representational State of Resource)是一種設計風格,提供了一種簡單、統一的方式來訪問和操作資源。

查詢資料

首先,我們需要建立一個查詢資料的 API。以下是查詢資料的 SQL 語句:

SELECT * FROM data 
WHERE topic = 'mytopic' 
AND timestamp <=' + msg.req.params.end + ' 
AND deleted=0 
AND (' + msg.req.authFilter + ') 
ORDER BY id;

這個查詢語句查詢 mytopic 主題下的所有資料,時間戳小於或等於 end 引數,且 deleted 欄位為 0。

刪除資料

接下來,我們需要建立一個刪除資料的 API。刪除資料可以分為兩種方式:可恢復刪除和不可恢復刪除。可恢復刪除是指將資料標記為已刪除,但不真正刪除資料。不可恢復刪除是指真正刪除資料。

可恢復刪除

可恢復刪除是指將資料的 deleted 欄位設為 1。以下是可恢復刪除的 API 端點:

@app.route('/delete/:topic/id/:id', methods=['DELETE'])
def delete_data(topic, id):
    # 查詢資料
    data = Data.query.filter_by(topic=topic, id=id).first()
    if data:
        # 標記資料為已刪除
        data.deleted = 1
        db.session.commit()
        return {'message': '資料已刪除'}
    else:
        return {'message': '資料不存在'}

不可恢復刪除

不可恢復刪除是指真正刪除資料。以下是不可恢復刪除的 API 端點:

@app.route('/delete/:topic/id/:id/permanent', methods=['DELETE'])
def delete_data_permanent(topic, id):
    # 查詢資料
    data = Data.query.filter_by(topic=topic, id=id).first()
    if data:
        # 刪除資料
        db.session.delete(data)
        db.session.commit()
        return {'message': '資料已刪除'}
    else:
        return {'message': '資料不存在'}

API 端點

以下是所有 API 端點:

  • GET /data/:topic: 查詢 mytopic 主題下的所有資料
  • DELETE /delete/:topic/id/:id: 可恢復刪除資料
  • DELETE /delete/:topic/id/:id/permanent: 不可恢復刪除資料

測試 API

以下是測試 API 的範例:

curl -X GET 'http://localhost:5000/data/mytopic'
curl -X DELETE 'http://localhost:5000/delete/mytopic/id/1'
curl -X DELETE 'http://localhost:5000/delete/mytopic/id/1/permanent'

建立一個 RESTful 介面

在本章中,我們將探討如何建立一個 RESTful 介面來管理物聯網裝置的資料。REST(Representational State of Resource)是一種廣泛使用的網路架構風格,根據 HTTP 協議,使用 JSON 或 XML 格式來交換資料。

刪除裝置資料

首先,我們來看一下如何刪除裝置資料。以下是刪除裝置資料的 API 程式碼:

if not msg.req.authFilter:
    msg.req.authFilter = 1

msg.topic = "UPDATE thingData" + \
            " SET deleted=1" + \
            " WHERE" + \
            " topic='" + msg.req.params.topic + "'" + \
            " AND (" + msg.req.authFilter + ")" + \
            " AND id=" + msg.req.params.id + ";"
return msg

在這段程式碼中,我們更新了裝置資料表中的記錄,將 deleted 欄位設為 1,以標記記錄為已刪除。注意,我們並沒有真正地刪除記錄,而是將其標記為已刪除,以便於日後的恢復。

批次刪除裝置資料

接下來,我們來看一下如何批次刪除裝置資料。以下是批次刪除裝置資料的 API 程式碼:

if not msg.req.authFilter:
    msg.req.authFilter = 1

msg.topic = "UPDATE thingData" + \
            " SET deleted=1" + \
            " WHERE" + \
            " topic='" + msg.req.params.topic + "'" + \
            " AND (" + msg.req.authFilter + ");"
return msg

在這段程式碼中,我們批次更新了裝置資料表中的記錄,將所有與給定主題相關的記錄標記為已刪除。

處理 MySQL 節點的響應

最後,我們需要修改 prepare response 函式來處理 MySQL 節點的響應。以下是修改過的程式碼:

// Prepare response

在這段程式碼中,我們需要處理 MySQL 節點傳回的資料,提取出我們需要的資訊,並傳回給使用者。

圖表翻譯:

  flowchart TD
    A[刪除裝置資料] --> B[更新裝置資料表]
    B --> C[標記記錄為已刪除]
    C --> D[傳回結果]
    D --> E[批次刪除裝置資料]
    E --> F[更新裝置資料表]
    F --> G[標記記錄為已刪除]
    G --> H[傳回結果]

在這個流程圖中,我們展示了刪除裝置資料和批次刪除裝置資料的流程。首先,我們更新裝置資料表中的記錄,將 deleted 欄位設為 1,以標記記錄為已刪除。然後,我們傳回結果給使用者。批次刪除裝置資料的流程類似,但我們批次更新了裝置資料表中的記錄。

建立 REST 介面

在本章中,我們將探討如何建立一個 REST 介面來管理時間序列資料。REST(Representational State of Resource)是一種廣泛使用的網路架構風格,允許使用者端和伺服器之間進行通訊。

刪除記錄

首先,我們來看一下刪除記錄的 API。這個 API 的路徑是 /delete/:topic/last/:count,其中 :topic 是主題名稱,:count 是要刪除的記錄數量。

if (!msg.req.authFilter) {
  msg.req.authFilter = 1;
}

msg.topic = "DELETE thingData" +
  " WHERE deleted=1" +
  " AND" +
  " topic='" + msg.req.params.topic + "'" +
  " AND (" + msg.req.authFilter + ")";
return msg;

這個 API 的作用是刪除指定主題下最新的 :count 條記錄。若沒有指定記錄數量,則預設刪除 1 條記錄。

刪除記錄(從開始)

另一個 API 的路徑是 /delete/:topic/first/:count,這個 API 的作用是刪除指定主題下最舊的 :count 條記錄。

// ...

這個 API 的實現與前一個 API 類似,不過排序方式不同。前一個 API 排序方式是 DESC(降序),而這個 API 排序方式是 ASC(升序)。

時間序列資料儲存

在時間序列資料儲存中,刪除記錄並不會真正地從資料庫中刪除,而是將記錄的 deleted 標誌設定為 1。這樣,記錄仍然存在於資料庫中,但不會被查詢出來。

// ...

這種方式可以保留資料的完整性和可追溯性。

示例

以下是使用 cURL 命令測試這些 API 的示例:

curl -X DELETE 'http://example.com/delete/timestamp/last/1'
curl -X DELETE 'http://example.com/delete/timestamp/first/1'

這些命令分別刪除最新的一條記錄和最舊的一條記錄。

內容解密:

上述 API 的實現使用了 JavaScript 和 Node.js。API 的路徑和查詢引數使用了 Express.js 框架。資料庫操作使用了 MySQL。

const express = require('express');
const mysql = require('mysql');

const app = express();

// ...

這個示例展示瞭如何使用 Node.js 和 Express.js 建立一個 REST 介面來管理時間序列資料。

圖表翻譯:

以下是 API 的流程圖:

  flowchart TD
  A[刪除記錄] --> B[查詢記錄]
  B --> C[排序記錄]
  C --> D[刪除記錄]
  D --> E[傳回結果]

這個流程圖展示了 API 的主要步驟,包括刪除記錄、查詢記錄、排序記錄和傳回結果。

時序資料儲存的資料刪除和清除

在時序資料儲存中,刪除和清除資料是兩個不同的過程。刪除資料是指標記資料為已刪除,但不會立即從儲存中移除。清除資料是指完全從儲存中移除已刪除的資料。

從系統架構視角來看,訊息代理伺服器在物聯網平臺中扮演著關鍵角色,負責訊息的路由和轉發,但其本身的限制,例如無法直接通知客戶端存取控制結果,需要透過額外的機制,例如日誌記錄來彌補。本文深入探討瞭如何利用 Forever 工具確保 Mosquitto 訊息代理伺服器的穩定執行,以及如何構建 RESTful 介面來實現資料的查詢、過濾和刪除,包含條件式查詢、時間戳記查詢以及批次刪除等功能。文章中提供的程式碼範例和流程圖清晰地展示了 API 的設計和運作方式,並特別強調了軟刪除機制,以確保資料的完整性和可追溯性。然而,目前方案的 RESTful 介面設計仍有改進空間,例如可以匯入更完善的錯誤處理機制和版本控制。展望未來,隨著物聯網裝置的普及和資料量的增長,訊息代理伺服器和 RESTful 介面的效能和可擴充套件性將面臨更大的挑戰,需要持續最佳化和改進,例如引入更進階的訊息佇列技術和資料庫分片策略。玄貓認為,掌握這些關鍵技術,才能構建更穩健、高效的物聯網平臺。