訊息代理伺服器扮演著物聯網平臺中重要的資料傳輸角色,但其本身存在一些限制,例如無法直接通知客戶端存取控制的結果。為確保訊息代理伺服器的穩定性,我們可以利用 Forever 工具來監控和管理 Mosquitto 伺服器,使其在崩潰時自動重啟。此外,設計良好的 REST API 對於有效管理和存取物聯網平臺的資料至關重要。本文將深入探討如何使用 Node-RED 和 MySQL 建立一個 RESTful API,以實現資料的查詢、過濾和刪除等功能。同時,我們也會探討如何處理身份驗證和資料完整性等議題,確保 API 的安全性和可靠性。
訊息代理伺服器的限制
訊息代理伺服器有一些限制,例如,無法通知客戶端存取控制的結果。然而,訊息代理伺服器可以記錄存取控制的結果,例如,記錄哪些使用者存取了哪些主題。
圖表翻譯:
此圖表示訊息代理伺服器的設定和存取控制流程。設定檔案是用於定義訊息代理伺服器的行為和存取控制。一般設定、使用者設定和模式設定是設定檔案的三個主要部分。存取控制是用於控制使用者存取主題的許可權。使用者名稱和客戶端ID是用於識別使用者和客戶端的唯一標識。主題名稱是用於識別主題的唯一標識。訊息代理伺服器的限制包括無法通知客戶端存取控制的結果。
設定訊息代理
為了確保 Node-RED 的穩定執行,我們使用 Forever 工具。現在,我們也要設定訊息代理,以便在代理器崩潰時,Forever 可以自動重啟它。雖然 Forever 工具可以輕易地與 Node.js 應用程式合作,但要使用它來控制 Mosquitto 代理器則需要額外的工作。
首先,我們建立一個 shell 指令碼,該指令碼呼叫 mosquitto
命令。然後,我們使用 Forever 工具執行這個指令碼。建立一個 shell 指令碼,名稱為 mqtt-sh.sh
,內容如下:
#!/bin/sh
mosquitto
儲存這個檔案後,我們使其可執行,然後使用 Forever 工具執行它(先結束之前的 Mosquitto 程序):
# chmod +x mqtt-sh.sh
# pkill mosquitto
# forever start -l mqtt.log --append -c sh /root/mqtt-sh.sh
這個命令在背景中啟動訊息代理,並保持它的執行。所有代理器的輸出都會記錄在 mqtt.log
檔案中,以便檢視。要檢視目前使用 Forever 工具執行的應用程式數量,請在命令列中輸入 forever list
,這將列出所有執行中的應用程式及其執行時間和其他資訊。
內容解密:
上述命令使用 Forever 工具來保持 Mosquitto 代理器的執行。Forever 工具是一個 Node.js 模組,用於保持 Node.js 應用程式的執行。如果應用程式崩潰或意外終止,Forever 工具將自動重啟它。使用 Forever 工具可以確保 Mosquitto 代理器始終保持執行,從而確保整個系統的穩定性。
圖表翻譯:
flowchart TD A[啟動 Forever 工具] --> B[執行 Mosquitto 代理器] B --> C[記錄輸出到 mqtt.log] C --> D[保持代理器執行] D --> E[自動重啟代理器]
這個流程圖描述了使用 Forever 工具來保持 Mosquitto 代理器執行的過程。首先,啟動 Forever 工具,然後執行 Mosquitto 代理器。代理器的輸出將被記錄到 mqtt.log
檔案中。Forever 工具將保持代理器的執行,如果代理器崩潰或終止,Forever 工具將自動重啟它。
建立REST介面
在前面的章節中,我們已經為構建有意義的API奠定了基礎。在本章中,我們將建立資料存取API和實用API。
資料存取API
根據我們的願望清單,要求D1和D2主要是為了訪問時間序列資料儲存中的資料記錄。在第7章中,我們已經實現了核心服務,並且實現了D4要求,即透過HTTP釋出訊息API。
接下來,我們將建立一個API,根據指定的主題或有效載荷條件(或兩者兼有)來查詢一條或多條資料記錄。這個條件可以是一個主題或有效載荷模式,並且可以依賴時間戳,例如需要特定時間段的資料。
所有資料存取API都具有相似的結構,每個API都有一個HTTP輸入節點,用於建立查詢塊,然後由MySQL查詢輸出節點和HTTP響應節點組成。每個HTTP輸入節點都有一個唯一的端點結構和方法,可以用於API呼叫來呼叫該流序列。
在建立查詢節點中,我們將構建一個適合的查詢來查詢所請求的資料,並將其傳遞給MySQL節點,然後由MySQL節點執行實際查詢工作。在準備響應節點中,我們可以新增一些格式化或新增一些其他物件到響應JSON中,以進一步增強API功能。
條件基礎資料存取API
flowchart TD A[HTTP輸入節點] --> B[建立查詢節點] B --> C[MySQL查詢輸出節點] C --> D[HTTP響應節點]
API端點結構
我們的端點結構為 /get/topicLike/:topic/payloadLike/:payload/last/:count
,其中主題和有效載荷輸入是根據萬用字元的。由於我們正在查詢不確定的記錄數,因此我們強制要求新增所請求記錄的數量作為計數引數。
建立查詢函式塊程式碼
// 建立查詢
// 如果沒有身份驗證過濾器定義或可用
// 則設定預設值為1
if (!msg.req.authFilter) {
msg.req.authFilter = 1;
}
// 萬用字元用於API查詢是*,需要轉換為SQL中的通配字元%
msg.topic = "SELECT id, topic, payload, timestamp" +
" FROM thingData WHERE" +
" topic LIKE '" + msg.req.params.topic.replace(/\*/g, "%") + "'" +
" AND" +
" payload LIKE '" + msg.req.params.payload.replace(/\*/g, "%") + "'" +
" AND deleted=0" +
" AND (" + msg.req.authFilter + ")";
圖表翻譯:
此圖表示了條件基礎資料存取API的流序列。HTTP輸入節點接收API呼叫,然後建立查詢節點構建適合的查詢。MySQL查詢輸出節點執行實際查詢工作,然後HTTP響應節點傳回查詢結果。
建立 REST 介面
在本章中,我們將建立一個 REST 介面來存取和管理 IoT 平臺的資料。首先,我們需要準備好程式碼以適應身份驗證過濾。我們檢查 authFilter
物件是否可用,如果不可用,則設定其預設值為 1。這確保我們的程式碼和 API 在新增身份驗證功能之前可以正常工作。
查詢構建
在查詢中,我們使用 *
作為萬用字元,而不是 %
,以避免與 HTML 編碼混淆。同時,我們也檢查非刪除記錄,並新增 authFilter
條件。如果 authFilter
未提供,則預設值為 1,條件變為 AND (1)
,不會改變查詢結果。如果 authFilter
值為 0,則條件變為 AND (0)
,查詢結果將為空。
時間基礎過濾
除了根據模式的資料 API 之外,我們還添加了時間基礎 API。這裡,我們建立了三個端點:一個用於獲取指定主題或主題模式在指定時間戳之後建立的記錄,另一個用於獲取在指定時間戳之前建立的記錄,最後一個用於獲取在兩個時間戳之間建立的記錄。
端點
我們有三個端點:
/get/:topic/after/:time/last/:count
:查詢指定主題或主題模式在指定時間戳之後建立的記錄。/get/:topic/before/:time/last/:count
:查詢指定主題或主題模式在指定時間戳之前建立的記錄。/get/:topic/during/:start/:end/last/:count
:查詢指定主題或主題模式在兩個時間戳之間建立的記錄。
示例
以下是使用 cURL 呼叫這些 API 的示例:
payloadLike/*/last/5
輸出 1:
[
{"id": 18, "topic": "mytopic", "payload": "myplayload", "timestamp": "1543731089.784"},
{"id": 8, "topic": "myTopic", "payload": "myPayload", "timestamp": "1543717154.899"},
{"id": 7, "topic": "myTopic", "payload": "myPayload", "timestamp": "1543716966.189"}
]
payloadLike/*/last/2
輸出 2:
[
{"id": 31, "topic": "timestamp", "payload": "1544011400243", "timestamp": "1544011400.245"},
{"id": 30, "topic": "timestamp", "payload": "1544011399074", "timestamp": "1544011399.078"}
]
payloadLike/*88*/last/2
輸出 3:
[
{"id": 28, "topic": "timestamp", "payload": "1544011288907", "timestamp": "1544011288.910"}
]
在輸出 3 中,雖然我們要求 2 條記錄,但只有一條記錄滿足主題和有效負載模式的要求。
圖表翻譯:
flowchart TD A[查詢構建] --> B[時間基礎過濾] B --> C[端點] C --> D[查詢指定主題或主題模式] D --> E[查詢記錄] E --> F[傳回記錄]
內容解密:
以上程式碼示例展示瞭如何構建查詢和使用時間基礎過濾。同時,也展示瞭如何使用 cURL 呼叫這些 API。輸出結果顯示了查詢結果的 JSON 格式。
時間戳記查詢
在查詢記錄時,提供開始和結束的時間戳記是非常重要的。這樣可以根據時間戳記來篩選出特定時間段內的記錄。根據使用的端點不同,建立查詢函式的方式也會略有不同,因為查詢語句不同。以下是三個程式碼片段:
建立「之後」查詢
如果沒有定義或可用的身份驗證過濾器,則設定預設值為 1。
if (!msg.req.authFilter) {
msg.req.authFilter = 1;
}
// 將 API 查詢中的萬字元 * 轉換為 SQL 中的萬字元 %
msg.topic = "SELECT id, topic, payload, timestamp" +
" FROM thingData WHERE" +
" topic LIKE '" + msg.req.params.topic.replace(/\*/g, "%") + "'" +
" AND" +
" timestamp >= '" + msg.req.params.time + "'" +
" AND deleted=0" +
" AND (" + msg.req.authFilter + ")" +
" ORDER BY timestamp";
return msg;
建立「之前」查詢
如果沒有定義或可用的身份驗證過濾器,則設定預設值為 1。
if (!msg.req.authFilter) {
msg.req.authFilter = 1;
}
// 將 API 查詢中的萬字元 * 轉換為 SQL 中的萬字元 %
msg.topic = "SELECT id, topic, payload, timestamp" +
" FROM thingData WHERE" +
" topic LIKE '" + msg.req.params.topic.replace(/\*/g, "%") + "'" +
" AND" +
" timestamp <= '" + msg.req.params.time + "'" +
" AND deleted=0" +
" AND (" + msg.req.authFilter + ")" +
" ORDER BY timestamp";
return msg;
內容解密:
上述程式碼片段展示瞭如何根據時間戳記查詢記錄。首先,檢查是否有身份驗證過濾器,如果沒有,則設定預設值為 1。然後,將 API 查詢中的萬字元 * 轉換為 SQL 中的萬字元 %。接著,構建查詢語句,根據時間戳記篩選出記錄,並根據時間戳記排序。
圖表翻譯:
flowchart TD A[開始] --> B[檢查身份驗證過濾器] B --> C[設定預設值] C --> D[轉換萬字元] D --> E[構建查詢語句] E --> F[篩選記錄] F --> G[排序記錄] G --> H[傳回結果]
上述流程圖展示了查詢記錄的過程,從檢查身份驗證過濾器開始,到設定預設值、轉換萬字元、構建查詢語句、篩選記錄、排序記錄,最終傳回結果。
資料查詢與過濾
在進行資料查詢時,需要考慮到查詢條件的設定,包括主題(topic)、時間範圍(timestamp)等。以下是資料查詢的程式碼實現:
def query_data(msg):
# 將 wildcard 字元 * 轉換為 SQL 的 LIKE 字元 %
topic = msg.req.params.topic.replace('*', '%')
# 設定查詢條件
query = "SELECT id, topic, payload, timestamp "
query += "FROM thingData "
query += "WHERE topic LIKE '" + topic + "' "
query += "AND timestamp <= '" + msg.req.params.time + "' "
query += "AND deleted = 0 "
# 如果沒有設定驗證過濾器,則使用預設值 1
if not msg.req.authFilter:
msg.req.authFilter = 1
query += "AND (" + msg.req.authFilter + ") "
query += "ORDER BY timestamp "
return query
def query_data_during(msg):
# 將 wildcard 字元 * 轉換為 SQL 的 LIKE 字元 %
topic = msg.req.params.topic.replace('*', '%')
# 設定查詢條件
query = "SELECT id, topic, payload, timestamp "
query += "FROM thingData "
query += "WHERE topic LIKE '" + topic + "' "
query += "AND timestamp >= '" + msg.req.params.start + "' "
query += "AND deleted = 0 "
# 如果沒有設定驗證過濾器,則使用預設值 1
if not msg.req.authFilter:
msg.req.authFilter = 1
query += "AND (" + msg.req.authFilter + ") "
query += "ORDER BY timestamp "
return query
內容解密:
以上程式碼實現了資料查詢的功能,包括主題查詢、時間範圍查詢等。其中,query_data
函式實現了資料查詢的基本功能,而 query_data_during
函式則實現了特定時間範圍內的資料查詢。
圖表翻譯:
以下是資料查詢流程的視覺化圖表:
flowchart TD A[收到查詢請求] --> B[設定查詢條件] B --> C[轉換 wildcard 字元] C --> D[組裝查詢語句] D --> E[執行查詢] E --> F[傳回查詢結果]
這個圖表展示了資料查詢的流程,從收到查詢請求開始,到傳回查詢結果為止。每一步驟都對應到程式碼中的特定功能。
建立一個 RESTful 介面
在本章中,我們將建立一個 RESTful 介面來管理資料。REST(Representational State of Resource)是一種設計風格,提供了一種簡單、統一的方式來訪問和操作資源。
查詢資料
首先,我們需要建立一個查詢資料的 API。以下是查詢資料的 SQL 語句:
SELECT * FROM data
WHERE topic = 'mytopic'
AND timestamp <=' + msg.req.params.end + '
AND deleted=0
AND (' + msg.req.authFilter + ')
ORDER BY id;
這個查詢語句查詢 mytopic
主題下的所有資料,時間戳小於或等於 end
引數,且 deleted
欄位為 0。
刪除資料
接下來,我們需要建立一個刪除資料的 API。刪除資料可以分為兩種方式:可恢復刪除和不可恢復刪除。可恢復刪除是指將資料標記為已刪除,但不真正刪除資料。不可恢復刪除是指真正刪除資料。
可恢復刪除
可恢復刪除是指將資料的 deleted
欄位設為 1。以下是可恢復刪除的 API 端點:
@app.route('/delete/:topic/id/:id', methods=['DELETE'])
def delete_data(topic, id):
# 查詢資料
data = Data.query.filter_by(topic=topic, id=id).first()
if data:
# 標記資料為已刪除
data.deleted = 1
db.session.commit()
return {'message': '資料已刪除'}
else:
return {'message': '資料不存在'}
不可恢復刪除
不可恢復刪除是指真正刪除資料。以下是不可恢復刪除的 API 端點:
@app.route('/delete/:topic/id/:id/permanent', methods=['DELETE'])
def delete_data_permanent(topic, id):
# 查詢資料
data = Data.query.filter_by(topic=topic, id=id).first()
if data:
# 刪除資料
db.session.delete(data)
db.session.commit()
return {'message': '資料已刪除'}
else:
return {'message': '資料不存在'}
API 端點
以下是所有 API 端點:
GET /data/:topic
: 查詢mytopic
主題下的所有資料DELETE /delete/:topic/id/:id
: 可恢復刪除資料DELETE /delete/:topic/id/:id/permanent
: 不可恢復刪除資料
測試 API
以下是測試 API 的範例:
curl -X GET 'http://localhost:5000/data/mytopic'
curl -X DELETE 'http://localhost:5000/delete/mytopic/id/1'
curl -X DELETE 'http://localhost:5000/delete/mytopic/id/1/permanent'
建立一個 RESTful 介面
在本章中,我們將探討如何建立一個 RESTful 介面來管理物聯網裝置的資料。REST(Representational State of Resource)是一種廣泛使用的網路架構風格,根據 HTTP 協議,使用 JSON 或 XML 格式來交換資料。
刪除裝置資料
首先,我們來看一下如何刪除裝置資料。以下是刪除裝置資料的 API 程式碼:
if not msg.req.authFilter:
msg.req.authFilter = 1
msg.topic = "UPDATE thingData" + \
" SET deleted=1" + \
" WHERE" + \
" topic='" + msg.req.params.topic + "'" + \
" AND (" + msg.req.authFilter + ")" + \
" AND id=" + msg.req.params.id + ";"
return msg
在這段程式碼中,我們更新了裝置資料表中的記錄,將 deleted
欄位設為 1,以標記記錄為已刪除。注意,我們並沒有真正地刪除記錄,而是將其標記為已刪除,以便於日後的恢復。
批次刪除裝置資料
接下來,我們來看一下如何批次刪除裝置資料。以下是批次刪除裝置資料的 API 程式碼:
if not msg.req.authFilter:
msg.req.authFilter = 1
msg.topic = "UPDATE thingData" + \
" SET deleted=1" + \
" WHERE" + \
" topic='" + msg.req.params.topic + "'" + \
" AND (" + msg.req.authFilter + ");"
return msg
在這段程式碼中,我們批次更新了裝置資料表中的記錄,將所有與給定主題相關的記錄標記為已刪除。
處理 MySQL 節點的響應
最後,我們需要修改 prepare response
函式來處理 MySQL 節點的響應。以下是修改過的程式碼:
// Prepare response
在這段程式碼中,我們需要處理 MySQL 節點傳回的資料,提取出我們需要的資訊,並傳回給使用者。
圖表翻譯:
flowchart TD A[刪除裝置資料] --> B[更新裝置資料表] B --> C[標記記錄為已刪除] C --> D[傳回結果] D --> E[批次刪除裝置資料] E --> F[更新裝置資料表] F --> G[標記記錄為已刪除] G --> H[傳回結果]
在這個流程圖中,我們展示了刪除裝置資料和批次刪除裝置資料的流程。首先,我們更新裝置資料表中的記錄,將 deleted
欄位設為 1,以標記記錄為已刪除。然後,我們傳回結果給使用者。批次刪除裝置資料的流程類似,但我們批次更新了裝置資料表中的記錄。
建立 REST 介面
在本章中,我們將探討如何建立一個 REST 介面來管理時間序列資料。REST(Representational State of Resource)是一種廣泛使用的網路架構風格,允許使用者端和伺服器之間進行通訊。
刪除記錄
首先,我們來看一下刪除記錄的 API。這個 API 的路徑是 /delete/:topic/last/:count
,其中 :topic
是主題名稱,:count
是要刪除的記錄數量。
if (!msg.req.authFilter) {
msg.req.authFilter = 1;
}
msg.topic = "DELETE thingData" +
" WHERE deleted=1" +
" AND" +
" topic='" + msg.req.params.topic + "'" +
" AND (" + msg.req.authFilter + ")";
return msg;
這個 API 的作用是刪除指定主題下最新的 :count
條記錄。若沒有指定記錄數量,則預設刪除 1 條記錄。
刪除記錄(從開始)
另一個 API 的路徑是 /delete/:topic/first/:count
,這個 API 的作用是刪除指定主題下最舊的 :count
條記錄。
// ...
這個 API 的實現與前一個 API 類似,不過排序方式不同。前一個 API 排序方式是 DESC(降序),而這個 API 排序方式是 ASC(升序)。
時間序列資料儲存
在時間序列資料儲存中,刪除記錄並不會真正地從資料庫中刪除,而是將記錄的 deleted
標誌設定為 1。這樣,記錄仍然存在於資料庫中,但不會被查詢出來。
// ...
這種方式可以保留資料的完整性和可追溯性。
示例
以下是使用 cURL 命令測試這些 API 的示例:
curl -X DELETE 'http://example.com/delete/timestamp/last/1'
curl -X DELETE 'http://example.com/delete/timestamp/first/1'
這些命令分別刪除最新的一條記錄和最舊的一條記錄。
內容解密:
上述 API 的實現使用了 JavaScript 和 Node.js。API 的路徑和查詢引數使用了 Express.js 框架。資料庫操作使用了 MySQL。
const express = require('express');
const mysql = require('mysql');
const app = express();
// ...
這個示例展示瞭如何使用 Node.js 和 Express.js 建立一個 REST 介面來管理時間序列資料。
圖表翻譯:
以下是 API 的流程圖:
flowchart TD A[刪除記錄] --> B[查詢記錄] B --> C[排序記錄] C --> D[刪除記錄] D --> E[傳回結果]
這個流程圖展示了 API 的主要步驟,包括刪除記錄、查詢記錄、排序記錄和傳回結果。
時序資料儲存的資料刪除和清除
在時序資料儲存中,刪除和清除資料是兩個不同的過程。刪除資料是指標記資料為已刪除,但不會立即從儲存中移除。清除資料是指完全從儲存中移除已刪除的資料。
從系統架構視角來看,訊息代理伺服器在物聯網平臺中扮演著關鍵角色,負責訊息的路由和轉發,但其本身的限制,例如無法直接通知客戶端存取控制結果,需要透過額外的機制,例如日誌記錄來彌補。本文深入探討瞭如何利用 Forever 工具確保 Mosquitto 訊息代理伺服器的穩定執行,以及如何構建 RESTful 介面來實現資料的查詢、過濾和刪除,包含條件式查詢、時間戳記查詢以及批次刪除等功能。文章中提供的程式碼範例和流程圖清晰地展示了 API 的設計和運作方式,並特別強調了軟刪除機制,以確保資料的完整性和可追溯性。然而,目前方案的 RESTful 介面設計仍有改進空間,例如可以匯入更完善的錯誤處理機制和版本控制。展望未來,隨著物聯網裝置的普及和資料量的增長,訊息代理伺服器和 RESTful 介面的效能和可擴充套件性將面臨更大的挑戰,需要持續最佳化和改進,例如引入更進階的訊息佇列技術和資料庫分片策略。玄貓認為,掌握這些關鍵技術,才能構建更穩健、高效的物聯網平臺。