在微服務架構中,服務間的非同步通訊仰賴訊息佇列系統。本文以 RabbitMQ 為例,探討如何有效地實作間接訊息傳遞機制。首先,為瞭解決微服務與 RabbitMQ 啟動順序相依性的問題,我們可以利用 wait-port
工具確保 RabbitMQ 服務啟動完成後,微服務才開始運作,避免連線錯誤。接著,文章將深入探討如何利用 RabbitMQ 的交換器和佇列機制,實作單一接收者和多重接收者訊息的處理。針對多重接收者的情境,我們會介紹如何使用 fanout
交換器進行訊息廣播,讓所有訂閱的微服務都能接收到訊息。此外,文章也會說明如何設定匿名佇列,確保訊息的有效傳遞和資源的自動釋放。最後,我們將提供完整的程式碼範例和流程圖,幫助讀者理解 RabbitMQ 在微服務架構中的實際應用。
微服務之間的通訊:解決RabbitMQ啟動順序問題
在微服務架構中,各個服務之間的通訊是非常重要的。然而,在實際佈署中,可能會遇到服務啟動順序的問題。例如,當我們的微服務試圖連線RabbitMQ伺服器時,如果RabbitMQ尚未完全啟動並準備好接受連線,則會導致錯誤和中止。
為瞭解決這個問題,我們可以使用一個簡單的工作-around:在Dockerfile中增加一個額外的命令,延遲啟動微服務直到RabbitMQ伺服器準備好。這裡,我們使用wait-port
命令,該命令可以等待指定的埠號是否已經開放。
首先,我們需要安裝wait-port
命令:
npm install --save wait-port
然後,我們更新history微服務的Dockerfile以包含wait-port
命令:
FROM node:18.17.1
WORKDIR /usr/src/app
COPY package*.json./
# 安裝wait-port命令
RUN npm install --save wait-port
# 設定RabbitMQ連線URL
ENV RABBIT_URL=amqp://guest:guest@rabbit:5672
# 等待RabbitMQ埠號開放
CMD ["wait-port", "5672", "&&", "npm", "start"]
在這個Dockerfile中,我們首先安裝wait-port
命令,然後設定RabbitMQ連線URL。最後,我們使用wait-port
命令等待RabbitMQ埠號(5672)開放,如果開放則執行npm start
命令啟動微服務。
這樣,當我們啟動history微服務時,它將會等待RabbitMQ伺服器準備好後才啟動,避免了啟動順序問題。
圖表翻譯:
sequenceDiagram participant History微服務 participant RabbitMQ伺服器 History微服務->>RabbitMQ伺服器: 連線請求 RabbitMQ伺服器->>History微服務: 埠號未開放 History微服務->>wait-port: 等待埠號開放 wait-port->>History微服務:埠號已開放 History微服務->>RabbitMQ伺服器: 連線請求 RabbitMQ伺服器->>History微服務: 連線成功
這個圖表顯示了history微服務和RabbitMQ伺服器之間的通訊過程,包括等待埠號開放和連線請求的過程。
使用 RabbitMQ 實作間接訊息傳遞
在微服務架構中,間接訊息傳遞是一種重要的溝通方式。RabbitMQ 是一種流行的訊息代理軟體,可以用於實作間接訊息傳遞。在本文中,我們將介紹如何使用 RabbitMQ 實作單一接收者間接訊息傳遞。
單一接收者間接訊息傳遞
單一接收者間接訊息傳遞是一種一對一的訊息傳遞方式,保證每個訊息只會被一個微服務接收。這種方式適合於分配任務給多個微服務,但每個任務只需要被一個微服務處理。
使用 wait-port 等待 RabbitMQ 伺服器啟動
在啟動微服務之前,我們需要等待 RabbitMQ 伺服器啟動並開始接受連線。可以使用 wait-port
命令來實作這個功能。
npx wait-port rabbit:5672
接收單一接收者間接訊息
要接收單一接收者間接訊息,需要先連線到 RabbitMQ 伺服器,然後宣告一個訊息佇列。可以使用 assertQueue
方法來宣告一個訊息佇列,如果佇列不存在則會自動建立。
async function main() {
//...
await messageChannel.assertQueue("viewed", {});
//...
}
處理訊息
當訊息到達時,需要使用 consume
方法來處理訊息。可以使用 insertOne
方法將訊息儲存到資料函式庫中。
messageChannel.consume("viewed", (msg) => {
const parsedMsg = JSON.parse(msg.content.toString());
historyCollection.insertOne({
videoPath: parsedMsg.videoPath,
});
messageChannel.ack(msg);
});
圖表翻譯:
以下是使用 Mermaid 語法繪製的 RabbitMQ 訊息佇列圖表:
graph LR A[Microservice] -->| send message | B[RabbitMQ] B -->| route message | C[Message Queue] C -->| deliver message | D[Microservice]
這個圖表展示了微服務之間的訊息傳遞過程,包括傳送訊息、路由訊息和接收訊息。
使用 RabbitMQ 進行微服務間的溝通
在微服務架構中,各個服務之間的溝通是一個非常重要的議題。RabbitMQ 是一個流行的訊息佇列系統,能夠幫助我們實作微服務間的溝通。在本文中,我們將探討如何使用 RabbitMQ 傳送和接收訊息。
接收訊息
首先,我們需要建立一個 RabbitMQ 連線,並宣告一個佇列來接收訊息。以下是示例程式碼:
const amqp = require('amqplib');
async function receiveViewedMessage() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertQueue('viewed', { durable: true });
channel.consume('viewed', (msg) => {
if (msg!== null) {
const message = JSON.parse(msg.content.toString());
console.log(`Received message: ${message.videoPath}`);
channel.ack(msg);
}
});
}
在這個示例中,我們建立了一個 RabbitMQ 連線,並聲明瞭一個名為 viewed
的佇列。然後,我們使用 consume
方法來接收佇列中的訊息。當我們接收到一條訊息時,我們將其解析為 JSON 物件,並記錄影片路徑。
傳送訊息
傳送訊息比接收訊息更簡單。以下是示例程式碼:
async function sendViewedMessage(messageChannel, videoPath) {
const msg = { videoPath: videoPath };
const jsonMsg = JSON.stringify(msg);
messageChannel.publish('', 'viewed', Buffer.from(jsonMsg));
}
在這個示例中,我們建立了一個 JSON 物件,並將其轉換為字串。然後,我們使用 publish
方法來傳送訊息到 viewed
佇列。
使用 RabbitMQ 的優點
使用 RabbitMQ 有幾個優點:
- 解耦: RabbitMQ 能夠幫助我們解耦微服務之間的依賴關係,使得系統更加靈活和可擴充套件。
- 可靠性: RabbitMQ 提供了可靠的訊息傳遞機制,能夠確保訊息不會丟失或重複傳遞。
- 高效能: RabbitMQ 能夠處理大量的訊息傳遞,具有高效能和低延遲的特點。
圖表翻譯:
graph LR A[微服務 A] -->|傳送訊息|> B[RabbitMQ] B -->|轉發訊息|> C[微服務 B] C -->|處理訊息|> D[記錄影片路徑]
在這個圖表中,我們展示了微服務 A 如何傳送訊息到 RabbitMQ,然後 RabbitMQ 如何轉發訊息到微服務 B。微服務 B 收到訊息後,會處理訊息並記錄影片路徑。
內容解密:
在上面的程式碼中,我們使用了 amqplib
函式庫來連線 RabbitMQ 伺服器,並聲明瞭一個佇列來接收訊息。當我們接收到一條訊息時,我們將其解析為 JSON 物件,並記錄影片路徑。在傳送訊息的程式碼中,我們建立了一個 JSON 物件,並將其轉換為字串。然後,我們使用 publish
方法來傳送訊息到 viewed
佇列。
使用 RabbitMQ 實作間接訊息傳遞
在微服務架構中,間接訊息傳遞是一種常見的溝通方式。RabbitMQ 是一種流行的訊息代理伺服器,提供了高效、可靠的訊息傳遞機制。在本文中,我們將探討如何使用 RabbitMQ 實作間接訊息傳遞。
單一接收者訊息
首先,我們來看一下單一接收者訊息的實作。當一個微服務想要傳送訊息給另一個微服務時,可以使用 RabbitMQ 的佇列(Queue)功能。傳送者將訊息傳送到佇列中,接收者從佇列中取出訊息進行處理。
以下是傳送「Viewed」訊息的範例:
// 定義訊息 payload
const message = {
videoId: '123',
userId: '456'
};
// 將訊息轉換為 JSON 格式
const jsonMessage = JSON.stringify(message);
// 傳送訊息到 viewed 佇列
channel.sendToQueue('viewed', Buffer.from(jsonMessage));
在這個範例中,傳送者將「Viewed」訊息傳送到 viewed
佇列中,接收者可以從佇列中取出訊息進行處理。
多接收者訊息
除了單一接收者訊息外,RabbitMQ 還支援多接收者訊息,也就是廣播式訊息傳遞。當一個微服務想要傳送訊息給多個其他微服務時,可以使用 RabbitMQ 的交換器(Exchange)功能。
以下是使用交換器傳送「Viewed」訊息的範例:
// 定義訊息 payload
const message = {
videoId: '123',
userId: '456'
};
// 將訊息轉換為 JSON 格式
const jsonMessage = JSON.stringify(message);
// 傳送訊息到 viewed 交換器
channel.publish('viewed', '', Buffer.from(jsonMessage));
在這個範例中,傳送者將「Viewed」訊息傳送到 viewed
交換器中,交換器會將訊息路由到多個匿名佇列中,由多個微服務接收和處理。
圖表翻譯:
以下是使用 Mermaid 語法繪製的圖表,展示了 RabbitMQ 的交換器和佇列之間的關係:
graph LR A[傳送者] -->|傳送訊息|> B[交換器] B -->|路由訊息|> C[佇列1] B -->|路由訊息|> D[佇列2] C -->|處理訊息|> E[接收者1] D -->|處理訊息|> F[接收者2]
這個圖表展示了傳送者將訊息傳送到交換器,交換器將訊息路由到多個佇列中,由多個接收者接收和處理。
如何實作訊息廣播給多個微服務
在微服務架構中,實作訊息廣播給多個微服務是一個常見的需求。這可以透過使用訊息佇列(Message Queue)來實作。以下是使用 RabbitMQ 實作訊息廣播的步驟:
步驟 1:建立匿名佇列
每個微服務需要建立一個匿名佇列(Anonymous Queue),這個佇列是唯一的,並且只屬於該微服務。這可以透過 RabbitMQ 的 assertQueue
方法來實作,指定 { exclusive: true }
選項以建立一個唯一的佇列。
步驟 2:繫結佇列到交換器
建立佇列後,需要將佇列繫結到一個交換器(Exchange)。交換器是 RabbitMQ 中的一個重要概念,它負責將訊息路由到不同的佇列。這裡,我們使用的是 fanout
交換器,它會將訊息廣播到所有繫結到它的佇列。
步驟 3:發布訊息到交換器
當需要廣播訊息時,發布者只需要將訊息發布到交換器即可。交換器會自動將訊息路由到所有繫結到它的佇列。
步驟 4:消費者接收訊息
每個微服務都可以從自己的匿名佇列中接收訊息。這可以透過 RabbitMQ 的 consume
方法來實作。
優點
使用 RabbitMQ 實作訊息廣播有以下優點:
- 解耦: 生產者和消費者之間解耦,生產者不需要知道消費者的存在。
- 可擴充套件: 可以輕鬆地增加或減少消費者數量,而不需要修改生產者。
- 容錯: 如果有一個消費者失敗,其他消費者仍然可以接收訊息。
程式碼示例
以下是使用 Node.js 和 RabbitMQ 實作訊息廣播的程式碼示例:
const amqp = require('amqplib');
// 建立連線
async function createConnection() {
const connection = await amqp.connect('amqp://localhost');
return connection;
}
// 建立通道
async function createChannel(connection) {
const channel = await connection.createChannel();
return channel;
}
// 建立匿名佇列
async function createQueue(channel) {
const queue = await channel.assertQueue('', { exclusive: true });
return queue;
}
// 繫結佇列到交換器
async function bindQueue(channel, queue, exchange) {
await channel.bindQueue(queue.queue, exchange, '');
}
// 發布訊息到交換器
async function publishMessage(channel, exchange, message) {
await channel.publish(exchange, '', Buffer.from(message));
}
// 消費者接收訊息
async function consumeMessage(channel, queue) {
await channel.consume(queue.queue, (msg) => {
console.log(`Received message: ${msg.content.toString()}`);
channel.ack(msg);
});
}
// 主函式
async function main() {
const connection = await createConnection();
const channel = await createChannel(connection);
// 建立匿名佇列
const queue = await createQueue(channel);
// 繫結佇列到交換器
const exchange = 'viewed';
await bindQueue(channel, queue, exchange);
// 發布訊息到交換器
const message = 'Hello, world!';
await publishMessage(channel, exchange, message);
// 消費者接收訊息
await consumeMessage(channel, queue);
}
main();
這個程式碼示例展示瞭如何建立匿名佇列,繫結佇列到交換器,發布訊息到交換器,和消費者接收訊息。
RabbitMQ 中的匿名佇列設定
在 RabbitMQ 中,當我們需要建立一個臨時的、不需要持久化的佇列時,可以使用匿名佇列。這種佇列通常用於微服務之間的通訊,尤其是在需要自動管理佇列生命週期的情況下。
設定 exclusive 選項
為了確保當微服務斷開連線時,佇列能夠自動被釋放,我們需要將 exclusive
選項設定為 true
。這樣做的好處是避免了記憶體洩漏,因為當微服務不再需要這個佇列時,佇列會被自動刪除。
channel.queue_declare(exclusive=True)
取得匿名佇列名稱
當我們建立一個匿名佇列時,RabbitMQ 會自動為其生成一個唯一的名稱。為了能夠在後續的操作中參照這個佇列,我們需要提取出這個生成的名稱。
queue_name = channel.queue_declare(exclusive=True).method.queue
實際應用場景
在微服務架構中,匿名佇列可以用於實作服務之間的解耦。例如,當一個服務需要向另一個服務傳送訊息時,可以將訊息傳送到一個匿名佇列中。接收端的服務可以從這個佇列中消費訊息,並在處理完訊息後自動確認訊息已被處理,從而實作了訊息的可靠傳遞。
內容解密:
上述程式碼片段展示瞭如何在 RabbitMQ 中建立一個匿名佇列,並提取出其名稱。這個過程涉及到設定 exclusive
選項以確保佇列的自動釋放,以及取得生成的佇列名稱以便後續操作。
flowchart TD A[建立匿名佇列] --> B[設定 exclusive 選項] B --> C[取得生成的佇列名稱] C --> D[使用佇列名稱進行訊息傳送或接收]
圖表翻譯:
此圖表描述了建立和使用 RabbitMQ 匿名佇列的流程。首先,建立一個匿名佇列,然後設定 exclusive
選項以確保其自動釋放。接著,提取出生成的佇列名稱,以便在後續的訊息傳送或接收操作中使用。這個流程確保了匿名佇列的正確使用和管理,避免了記憶體洩漏和其他潛在問題。
訊息處理流程
在訊息佇列(Queue)與交換器(Exchange)之間建立連結,讓訊息可以從交換器正確地路由到對應的佇列中。這個過程確保了訊息的正確傳遞和處理。
訊息接收
接收來自匿名佇列的訊息,這些訊息可能包含了特定的資料或指令,需要被解析和處理。匿名佇列允許不需要明確宣告就能傳送訊息,這增加了系統的靈活性和可擴充套件性。
JSON 訊息解析
將接收到的 JSON 格式的訊息解析成 JavaScript 物件。這個步驟是將字串形式的 JSON 資料轉換成可在程式中直接操作的物件,方便後續的處理和分析。
錄製歷史
將解析後的檢視或相關資料記錄到歷史資料函式庫中。這個步驟對於追蹤系統的使用歷史、分析使用者行為以及進行後續的最佳化和維護非常重要。
錯誤處理和確認
如果在整個處理流程中沒有發生錯誤,則對於成功接收和處理的訊息進行確認。這通常涉及向訊息傳送方傳送確認訊息,以確保訊息已被成功處理,避免重複處理或遺失訊息。
// 範例程式碼:接收和處理訊息
function processMessage(message) {
try {
// 解析 JSON 訊息
const jsonData = JSON.parse(message);
// 錄製歷史
recordHistory(jsonData);
// 確認訊息
acknowledgeMessage(message);
} catch (error) {
// 處理錯誤
handleError(error);
}
}
// 解析 JSON 訊息並錄製歷史
function recordHistory(data) {
// 將資料記錄到歷史資料函式庫
db.record(data);
}
// 確認訊息
function acknowledgeMessage(message) {
// 向訊息傳送方傳送確認訊息
sender.acknowledge(message);
}
// 處理錯誤
function handleError(error) {
// 錄製錯誤日誌並通知管理員
logError(error);
notifyAdmin(error);
}
圖表翻譯:
flowchart TD A[接收訊息] --> B[解析 JSON] B --> C[錄製歷史] C --> D[確認訊息] D --> E[錯誤處理]
在這個流程圖中,我們可以看到從接收訊息開始,到解析 JSON、錄製歷史、確認訊息,最後到錯誤處理的完整流程。每一步驟都對應著特定的功能和邏輯,確保了系統的穩定性和可靠性。
使用 RabbitMQ 實作間接訊息傳遞
在分散式系統中,訊息傳遞是一個重要的功能,尤其是在微服務架構中。RabbitMQ 是一個流行的訊息代理伺服器,可以用於實作間接訊息傳遞。在本文中,我們將探討如何使用 RabbitMQ 實作多重接收者訊息傳遞。
傳送多重接收者訊息
傳送多重接收者訊息與傳送單一接收者訊息類別似。以下是 video-streaming 微服務的 index.js
檔案的一個片段(chapter-4/example-4/video-streaming/src/index.js):
async function main() {
const messageChannel = await messagingConnection.createChannel();
await messageChannel.assertExchange(
"viewed",
"fanout"
);
function broadcastViewedMessage(messageChannel, videoPath) {
const msg = { videoPath: videoPath };
const jsonMsg = JSON.stringify(msg);
messageChannel.publish(
"viewed",
"",
Buffer.from(jsonMsg)
);
}
app.get("/video", async (req, res) => {
//...
sendViewedMessage(messageChannel, videoPath);
});
}
在這個片段中,我們首先建立了一個 RabbitMQ 連線,並建立了一個名為 “viewed” 的交換器(exchange)。然後,我們定義了一個 broadcastViewedMessage
函式,用於傳送 “Viewed” 訊息給所有連線到該交換器的接收者。
測試多重接收者訊息
為了測試我們的更新程式碼,我們增加了一個新的微服務:recommendations 微服務。這個微服務只是一個 stub,什麼也不做,只是列印預出它收到的訊息。這足以展示多重接收者訊息傳遞的功能。
以下是 broadcastViewedMessage
函式的詳細實作:
function broadcastViewedMessage(messageChannel, videoPath) {
const msg = { videoPath: videoPath };
const jsonMsg = JSON.stringify(msg);
messageChannel.publish(
"viewed",
"",
Buffer.from(jsonMsg)
);
}
在這個函式中,我們首先建立了一個包含影片路徑的訊息物件。然後,我們將該物件轉換為 JSON 字串,並使用 publish
函式將其傳送給 “viewed” 交換器。
使用 Mermaid 圖表展示訊息傳遞流程
以下是使用 Mermaid 圖表展示訊息傳遞流程:
flowchart TD A[傳送影片請求] --> B[建立影片路徑訊息] B --> C[傳送影片路徑訊息給 "viewed" 交換器] C --> D[交換器將訊息轉發給所有連線的接收者] D --> E[接收者處理影片路徑訊息]
這個圖表展示了影片請求如何觸發影片路徑訊息的建立和傳送,並且如何將該訊息傳送給 “viewed” 交換器,然後交換器將其轉發給所有連線的接收者。
圖表翻譯:
這個圖表展示了 RabbitMQ 中的間接訊息傳遞流程。當使用者傳送影片請求時,系統建立了一個包含影片路徑的訊息物件,並將其傳送給 “viewed” 交換器。交換器接收到該訊息後,將其轉發給所有連線到該交換器的接收者。每個接收者都可以處理該訊息,並執行相應的業務邏輯。這個流程展示了 RabbitMQ 如何實作間接訊息傳遞和解耦不同的微服務。
5.8間接訊息與微服務之間的溝通
在微服務架構中,間接訊息是一種重要的溝通方式。與直接訊息不同,間接訊息不需要指定特定的接收者,而是將訊息釋出到一個交換器(exchange),然後由多個佇列(queue)接收和處理。
微服務架構的興起,使得服務間的非同步通訊機制越發重要。本文深入探討了RabbitMQ在微服務通訊中的應用,特別是解決啟動順序問題和實作間接訊息傳遞的技巧。透過wait-port機制,有效地規避了微服務啟動時RabbitMQ尚未準備好的問題,確保系統的穩定性。此外,文章詳細闡述瞭如何利用RabbitMQ的交換器和佇列機制,實作單一和多重接收者的間接訊息傳遞,充分展現了RabbitMQ的靈活性與擴充套件性。然而,訊息傳遞機制也存在潛在的訊息丟失和延遲風險,需要完善的監控和錯誤處理機制。展望未來,隨著Serverless和事件驅動架構的普及,RabbitMQ等訊息佇列系統將扮演更關鍵的角色。對於追求高用性和可擴充套件性的微服務架構,深入理解和應用RabbitMQ的進階特性,例如訊息確認、重試機制和死信佇列等,將是提升系統穩定性和效能的關鍵所在。玄貓認為,RabbitMQ作為成熟的訊息佇列解決方案,在微服務架構中具有廣泛的應用前景,值得技術團隊深入研究和應用。