在微服務架構中,服務間的非同步通訊仰賴訊息佇列系統。本文以 RabbitMQ 為例,探討如何有效地實作間接訊息傳遞機制。首先,為瞭解決微服務與 RabbitMQ 啟動順序相依性的問題,我們可以利用 wait-port 工具確保 RabbitMQ 服務啟動完成後,微服務才開始運作,避免連線錯誤。接著,文章將深入探討如何利用 RabbitMQ 的交換器和佇列機制,實作單一接收者和多重接收者訊息的處理。針對多重接收者的情境,我們會介紹如何使用 fanout 交換器進行訊息廣播,讓所有訂閱的微服務都能接收到訊息。此外,文章也會說明如何設定匿名佇列,確保訊息的有效傳遞和資源的自動釋放。最後,我們將提供完整的程式碼範例和流程圖,幫助讀者理解 RabbitMQ 在微服務架構中的實際應用。

微服務之間的通訊:解決RabbitMQ啟動順序問題

在微服務架構中,各個服務之間的通訊是非常重要的。然而,在實際佈署中,可能會遇到服務啟動順序的問題。例如,當我們的微服務試圖連線RabbitMQ伺服器時,如果RabbitMQ尚未完全啟動並準備好接受連線,則會導致錯誤和中止。

為瞭解決這個問題,我們可以使用一個簡單的工作-around:在Dockerfile中增加一個額外的命令,延遲啟動微服務直到RabbitMQ伺服器準備好。這裡,我們使用wait-port命令,該命令可以等待指定的埠號是否已經開放。

首先,我們需要安裝wait-port命令:

npm install --save wait-port

然後,我們更新history微服務的Dockerfile以包含wait-port命令:

FROM node:18.17.1
WORKDIR /usr/src/app

COPY package*.json./

# 安裝wait-port命令
RUN npm install --save wait-port

# 設定RabbitMQ連線URL
ENV RABBIT_URL=amqp://guest:guest@rabbit:5672

# 等待RabbitMQ埠號開放
CMD ["wait-port", "5672", "&&", "npm", "start"]

在這個Dockerfile中,我們首先安裝wait-port命令,然後設定RabbitMQ連線URL。最後,我們使用wait-port命令等待RabbitMQ埠號(5672)開放,如果開放則執行npm start命令啟動微服務。

這樣,當我們啟動history微服務時,它將會等待RabbitMQ伺服器準備好後才啟動,避免了啟動順序問題。

圖表翻譯:

  sequenceDiagram
    participant History微服務
    participant RabbitMQ伺服器
    History微服務->>RabbitMQ伺服器: 連線請求
    RabbitMQ伺服器->>History微服務: 埠號未開放
    History微服務->>wait-port: 等待埠號開放
    wait-port->>History微服務:埠號已開放
    History微服務->>RabbitMQ伺服器: 連線請求
    RabbitMQ伺服器->>History微服務: 連線成功

這個圖表顯示了history微服務和RabbitMQ伺服器之間的通訊過程,包括等待埠號開放和連線請求的過程。

使用 RabbitMQ 實作間接訊息傳遞

在微服務架構中,間接訊息傳遞是一種重要的溝通方式。RabbitMQ 是一種流行的訊息代理軟體,可以用於實作間接訊息傳遞。在本文中,我們將介紹如何使用 RabbitMQ 實作單一接收者間接訊息傳遞。

單一接收者間接訊息傳遞

單一接收者間接訊息傳遞是一種一對一的訊息傳遞方式,保證每個訊息只會被一個微服務接收。這種方式適合於分配任務給多個微服務,但每個任務只需要被一個微服務處理。

使用 wait-port 等待 RabbitMQ 伺服器啟動

在啟動微服務之前,我們需要等待 RabbitMQ 伺服器啟動並開始接受連線。可以使用 wait-port 命令來實作這個功能。

npx wait-port rabbit:5672

接收單一接收者間接訊息

要接收單一接收者間接訊息,需要先連線到 RabbitMQ 伺服器,然後宣告一個訊息佇列。可以使用 assertQueue 方法來宣告一個訊息佇列,如果佇列不存在則會自動建立。

async function main() {
  //...
  await messageChannel.assertQueue("viewed", {});
  //...
}

處理訊息

當訊息到達時,需要使用 consume 方法來處理訊息。可以使用 insertOne 方法將訊息儲存到資料函式庫中。

messageChannel.consume("viewed", (msg) => {
  const parsedMsg = JSON.parse(msg.content.toString());
  historyCollection.insertOne({
    videoPath: parsedMsg.videoPath,
  });
  messageChannel.ack(msg);
});
圖表翻譯:

以下是使用 Mermaid 語法繪製的 RabbitMQ 訊息佇列圖表:

  graph LR
    A[Microservice] -->| send message | B[RabbitMQ]
    B -->| route message | C[Message Queue]
    C -->| deliver message | D[Microservice]

這個圖表展示了微服務之間的訊息傳遞過程,包括傳送訊息、路由訊息和接收訊息。

使用 RabbitMQ 進行微服務間的溝通

在微服務架構中,各個服務之間的溝通是一個非常重要的議題。RabbitMQ 是一個流行的訊息佇列系統,能夠幫助我們實作微服務間的溝通。在本文中,我們將探討如何使用 RabbitMQ 傳送和接收訊息。

接收訊息

首先,我們需要建立一個 RabbitMQ 連線,並宣告一個佇列來接收訊息。以下是示例程式碼:

const amqp = require('amqplib');

async function receiveViewedMessage() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  await channel.assertQueue('viewed', { durable: true });

  channel.consume('viewed', (msg) => {
    if (msg!== null) {
      const message = JSON.parse(msg.content.toString());
      console.log(`Received message: ${message.videoPath}`);
      channel.ack(msg);
    }
  });
}

在這個示例中,我們建立了一個 RabbitMQ 連線,並聲明瞭一個名為 viewed 的佇列。然後,我們使用 consume 方法來接收佇列中的訊息。當我們接收到一條訊息時,我們將其解析為 JSON 物件,並記錄影片路徑。

傳送訊息

傳送訊息比接收訊息更簡單。以下是示例程式碼:

async function sendViewedMessage(messageChannel, videoPath) {
  const msg = { videoPath: videoPath };
  const jsonMsg = JSON.stringify(msg);

  messageChannel.publish('', 'viewed', Buffer.from(jsonMsg));
}

在這個示例中,我們建立了一個 JSON 物件,並將其轉換為字串。然後,我們使用 publish 方法來傳送訊息到 viewed 佇列。

使用 RabbitMQ 的優點

使用 RabbitMQ 有幾個優點:

  • 解耦: RabbitMQ 能夠幫助我們解耦微服務之間的依賴關係,使得系統更加靈活和可擴充套件。
  • 可靠性: RabbitMQ 提供了可靠的訊息傳遞機制,能夠確保訊息不會丟失或重複傳遞。
  • 高效能: RabbitMQ 能夠處理大量的訊息傳遞,具有高效能和低延遲的特點。

圖表翻譯:

  graph LR
    A[微服務 A] -->|傳送訊息|> B[RabbitMQ]
    B -->|轉發訊息|> C[微服務 B]
    C -->|處理訊息|> D[記錄影片路徑]

在這個圖表中,我們展示了微服務 A 如何傳送訊息到 RabbitMQ,然後 RabbitMQ 如何轉發訊息到微服務 B。微服務 B 收到訊息後,會處理訊息並記錄影片路徑。

內容解密:

在上面的程式碼中,我們使用了 amqplib 函式庫來連線 RabbitMQ 伺服器,並聲明瞭一個佇列來接收訊息。當我們接收到一條訊息時,我們將其解析為 JSON 物件,並記錄影片路徑。在傳送訊息的程式碼中,我們建立了一個 JSON 物件,並將其轉換為字串。然後,我們使用 publish 方法來傳送訊息到 viewed 佇列。

使用 RabbitMQ 實作間接訊息傳遞

在微服務架構中,間接訊息傳遞是一種常見的溝通方式。RabbitMQ 是一種流行的訊息代理伺服器,提供了高效、可靠的訊息傳遞機制。在本文中,我們將探討如何使用 RabbitMQ 實作間接訊息傳遞。

單一接收者訊息

首先,我們來看一下單一接收者訊息的實作。當一個微服務想要傳送訊息給另一個微服務時,可以使用 RabbitMQ 的佇列(Queue)功能。傳送者將訊息傳送到佇列中,接收者從佇列中取出訊息進行處理。

以下是傳送「Viewed」訊息的範例:

// 定義訊息 payload
const message = {
  videoId: '123',
  userId: '456'
};

// 將訊息轉換為 JSON 格式
const jsonMessage = JSON.stringify(message);

// 傳送訊息到 viewed 佇列
channel.sendToQueue('viewed', Buffer.from(jsonMessage));

在這個範例中,傳送者將「Viewed」訊息傳送到 viewed 佇列中,接收者可以從佇列中取出訊息進行處理。

多接收者訊息

除了單一接收者訊息外,RabbitMQ 還支援多接收者訊息,也就是廣播式訊息傳遞。當一個微服務想要傳送訊息給多個其他微服務時,可以使用 RabbitMQ 的交換器(Exchange)功能。

以下是使用交換器傳送「Viewed」訊息的範例:

// 定義訊息 payload
const message = {
  videoId: '123',
  userId: '456'
};

// 將訊息轉換為 JSON 格式
const jsonMessage = JSON.stringify(message);

// 傳送訊息到 viewed 交換器
channel.publish('viewed', '', Buffer.from(jsonMessage));

在這個範例中,傳送者將「Viewed」訊息傳送到 viewed 交換器中,交換器會將訊息路由到多個匿名佇列中,由多個微服務接收和處理。

圖表翻譯:

以下是使用 Mermaid 語法繪製的圖表,展示了 RabbitMQ 的交換器和佇列之間的關係:

  graph LR
    A[傳送者] -->|傳送訊息|> B[交換器]
    B -->|路由訊息|> C[佇列1]
    B -->|路由訊息|> D[佇列2]
    C -->|處理訊息|> E[接收者1]
    D -->|處理訊息|> F[接收者2]

這個圖表展示了傳送者將訊息傳送到交換器,交換器將訊息路由到多個佇列中,由多個接收者接收和處理。

如何實作訊息廣播給多個微服務

在微服務架構中,實作訊息廣播給多個微服務是一個常見的需求。這可以透過使用訊息佇列(Message Queue)來實作。以下是使用 RabbitMQ 實作訊息廣播的步驟:

步驟 1:建立匿名佇列

每個微服務需要建立一個匿名佇列(Anonymous Queue),這個佇列是唯一的,並且只屬於該微服務。這可以透過 RabbitMQ 的 assertQueue 方法來實作,指定 { exclusive: true } 選項以建立一個唯一的佇列。

步驟 2:繫結佇列到交換器

建立佇列後,需要將佇列繫結到一個交換器(Exchange)。交換器是 RabbitMQ 中的一個重要概念,它負責將訊息路由到不同的佇列。這裡,我們使用的是 fanout 交換器,它會將訊息廣播到所有繫結到它的佇列。

步驟 3:發布訊息到交換器

當需要廣播訊息時,發布者只需要將訊息發布到交換器即可。交換器會自動將訊息路由到所有繫結到它的佇列。

步驟 4:消費者接收訊息

每個微服務都可以從自己的匿名佇列中接收訊息。這可以透過 RabbitMQ 的 consume 方法來實作。

優點

使用 RabbitMQ 實作訊息廣播有以下優點:

  • 解耦: 生產者和消費者之間解耦,生產者不需要知道消費者的存在。
  • 可擴充套件: 可以輕鬆地增加或減少消費者數量,而不需要修改生產者。
  • 容錯: 如果有一個消費者失敗,其他消費者仍然可以接收訊息。

程式碼示例

以下是使用 Node.js 和 RabbitMQ 實作訊息廣播的程式碼示例:

const amqp = require('amqplib');

// 建立連線
async function createConnection() {
  const connection = await amqp.connect('amqp://localhost');
  return connection;
}

// 建立通道
async function createChannel(connection) {
  const channel = await connection.createChannel();
  return channel;
}

// 建立匿名佇列
async function createQueue(channel) {
  const queue = await channel.assertQueue('', { exclusive: true });
  return queue;
}

// 繫結佇列到交換器
async function bindQueue(channel, queue, exchange) {
  await channel.bindQueue(queue.queue, exchange, '');
}

// 發布訊息到交換器
async function publishMessage(channel, exchange, message) {
  await channel.publish(exchange, '', Buffer.from(message));
}

// 消費者接收訊息
async function consumeMessage(channel, queue) {
  await channel.consume(queue.queue, (msg) => {
    console.log(`Received message: ${msg.content.toString()}`);
    channel.ack(msg);
  });
}

// 主函式
async function main() {
  const connection = await createConnection();
  const channel = await createChannel(connection);

  // 建立匿名佇列
  const queue = await createQueue(channel);

  // 繫結佇列到交換器
  const exchange = 'viewed';
  await bindQueue(channel, queue, exchange);

  // 發布訊息到交換器
  const message = 'Hello, world!';
  await publishMessage(channel, exchange, message);

  // 消費者接收訊息
  await consumeMessage(channel, queue);
}

main();

這個程式碼示例展示瞭如何建立匿名佇列,繫結佇列到交換器,發布訊息到交換器,和消費者接收訊息。

RabbitMQ 中的匿名佇列設定

在 RabbitMQ 中,當我們需要建立一個臨時的、不需要持久化的佇列時,可以使用匿名佇列。這種佇列通常用於微服務之間的通訊,尤其是在需要自動管理佇列生命週期的情況下。

設定 exclusive 選項

為了確保當微服務斷開連線時,佇列能夠自動被釋放,我們需要將 exclusive 選項設定為 true。這樣做的好處是避免了記憶體洩漏,因為當微服務不再需要這個佇列時,佇列會被自動刪除。

channel.queue_declare(exclusive=True)

取得匿名佇列名稱

當我們建立一個匿名佇列時,RabbitMQ 會自動為其生成一個唯一的名稱。為了能夠在後續的操作中參照這個佇列,我們需要提取出這個生成的名稱。

queue_name = channel.queue_declare(exclusive=True).method.queue

實際應用場景

在微服務架構中,匿名佇列可以用於實作服務之間的解耦。例如,當一個服務需要向另一個服務傳送訊息時,可以將訊息傳送到一個匿名佇列中。接收端的服務可以從這個佇列中消費訊息,並在處理完訊息後自動確認訊息已被處理,從而實作了訊息的可靠傳遞。

內容解密:

上述程式碼片段展示瞭如何在 RabbitMQ 中建立一個匿名佇列,並提取出其名稱。這個過程涉及到設定 exclusive 選項以確保佇列的自動釋放,以及取得生成的佇列名稱以便後續操作。

  flowchart TD
    A[建立匿名佇列] --> B[設定 exclusive 選項]
    B --> C[取得生成的佇列名稱]
    C --> D[使用佇列名稱進行訊息傳送或接收]

圖表翻譯:

此圖表描述了建立和使用 RabbitMQ 匿名佇列的流程。首先,建立一個匿名佇列,然後設定 exclusive 選項以確保其自動釋放。接著,提取出生成的佇列名稱,以便在後續的訊息傳送或接收操作中使用。這個流程確保了匿名佇列的正確使用和管理,避免了記憶體洩漏和其他潛在問題。

訊息處理流程

在訊息佇列(Queue)與交換器(Exchange)之間建立連結,讓訊息可以從交換器正確地路由到對應的佇列中。這個過程確保了訊息的正確傳遞和處理。

訊息接收

接收來自匿名佇列的訊息,這些訊息可能包含了特定的資料或指令,需要被解析和處理。匿名佇列允許不需要明確宣告就能傳送訊息,這增加了系統的靈活性和可擴充套件性。

JSON 訊息解析

將接收到的 JSON 格式的訊息解析成 JavaScript 物件。這個步驟是將字串形式的 JSON 資料轉換成可在程式中直接操作的物件,方便後續的處理和分析。

錄製歷史

將解析後的檢視或相關資料記錄到歷史資料函式庫中。這個步驟對於追蹤系統的使用歷史、分析使用者行為以及進行後續的最佳化和維護非常重要。

錯誤處理和確認

如果在整個處理流程中沒有發生錯誤,則對於成功接收和處理的訊息進行確認。這通常涉及向訊息傳送方傳送確認訊息,以確保訊息已被成功處理,避免重複處理或遺失訊息。

// 範例程式碼:接收和處理訊息
function processMessage(message) {
  try {
    // 解析 JSON 訊息
    const jsonData = JSON.parse(message);
    
    // 錄製歷史
    recordHistory(jsonData);
    
    // 確認訊息
    acknowledgeMessage(message);
  } catch (error) {
    // 處理錯誤
    handleError(error);
  }
}

// 解析 JSON 訊息並錄製歷史
function recordHistory(data) {
  // 將資料記錄到歷史資料函式庫
  db.record(data);
}

// 確認訊息
function acknowledgeMessage(message) {
  // 向訊息傳送方傳送確認訊息
  sender.acknowledge(message);
}

// 處理錯誤
function handleError(error) {
  // 錄製錯誤日誌並通知管理員
  logError(error);
  notifyAdmin(error);
}

圖表翻譯:

  flowchart TD
    A[接收訊息] --> B[解析 JSON]
    B --> C[錄製歷史]
    C --> D[確認訊息]
    D --> E[錯誤處理]

在這個流程圖中,我們可以看到從接收訊息開始,到解析 JSON、錄製歷史、確認訊息,最後到錯誤處理的完整流程。每一步驟都對應著特定的功能和邏輯,確保了系統的穩定性和可靠性。

使用 RabbitMQ 實作間接訊息傳遞

在分散式系統中,訊息傳遞是一個重要的功能,尤其是在微服務架構中。RabbitMQ 是一個流行的訊息代理伺服器,可以用於實作間接訊息傳遞。在本文中,我們將探討如何使用 RabbitMQ 實作多重接收者訊息傳遞。

傳送多重接收者訊息

傳送多重接收者訊息與傳送單一接收者訊息類別似。以下是 video-streaming 微服務的 index.js 檔案的一個片段(chapter-4/example-4/video-streaming/src/index.js):

async function main() {
  const messageChannel = await messagingConnection.createChannel();

  await messageChannel.assertExchange(
    "viewed",
    "fanout"
  );

  function broadcastViewedMessage(messageChannel, videoPath) {
    const msg = { videoPath: videoPath };
    const jsonMsg = JSON.stringify(msg);

    messageChannel.publish(
      "viewed",
      "",
      Buffer.from(jsonMsg)
    );
  }

  app.get("/video", async (req, res) => {
    //...
    sendViewedMessage(messageChannel, videoPath);
  });
}

在這個片段中,我們首先建立了一個 RabbitMQ 連線,並建立了一個名為 “viewed” 的交換器(exchange)。然後,我們定義了一個 broadcastViewedMessage 函式,用於傳送 “Viewed” 訊息給所有連線到該交換器的接收者。

測試多重接收者訊息

為了測試我們的更新程式碼,我們增加了一個新的微服務:recommendations 微服務。這個微服務只是一個 stub,什麼也不做,只是列印預出它收到的訊息。這足以展示多重接收者訊息傳遞的功能。

以下是 broadcastViewedMessage 函式的詳細實作:

function broadcastViewedMessage(messageChannel, videoPath) {
  const msg = { videoPath: videoPath };
  const jsonMsg = JSON.stringify(msg);

  messageChannel.publish(
    "viewed",
    "",
    Buffer.from(jsonMsg)
  );
}

在這個函式中,我們首先建立了一個包含影片路徑的訊息物件。然後,我們將該物件轉換為 JSON 字串,並使用 publish 函式將其傳送給 “viewed” 交換器。

使用 Mermaid 圖表展示訊息傳遞流程

以下是使用 Mermaid 圖表展示訊息傳遞流程:

  flowchart TD
    A[傳送影片請求] --> B[建立影片路徑訊息]
    B --> C[傳送影片路徑訊息給 "viewed" 交換器]
    C --> D[交換器將訊息轉發給所有連線的接收者]
    D --> E[接收者處理影片路徑訊息]

這個圖表展示了影片請求如何觸發影片路徑訊息的建立和傳送,並且如何將該訊息傳送給 “viewed” 交換器,然後交換器將其轉發給所有連線的接收者。

圖表翻譯:

這個圖表展示了 RabbitMQ 中的間接訊息傳遞流程。當使用者傳送影片請求時,系統建立了一個包含影片路徑的訊息物件,並將其傳送給 “viewed” 交換器。交換器接收到該訊息後,將其轉發給所有連線到該交換器的接收者。每個接收者都可以處理該訊息,並執行相應的業務邏輯。這個流程展示了 RabbitMQ 如何實作間接訊息傳遞和解耦不同的微服務。

5.8間接訊息與微服務之間的溝通

在微服務架構中,間接訊息是一種重要的溝通方式。與直接訊息不同,間接訊息不需要指定特定的接收者,而是將訊息釋出到一個交換器(exchange),然後由多個佇列(queue)接收和處理。

微服務架構的興起,使得服務間的非同步通訊機制越發重要。本文深入探討了RabbitMQ在微服務通訊中的應用,特別是解決啟動順序問題和實作間接訊息傳遞的技巧。透過wait-port機制,有效地規避了微服務啟動時RabbitMQ尚未準備好的問題,確保系統的穩定性。此外,文章詳細闡述瞭如何利用RabbitMQ的交換器和佇列機制,實作單一和多重接收者的間接訊息傳遞,充分展現了RabbitMQ的靈活性與擴充套件性。然而,訊息傳遞機制也存在潛在的訊息丟失和延遲風險,需要完善的監控和錯誤處理機制。展望未來,隨著Serverless和事件驅動架構的普及,RabbitMQ等訊息佇列系統將扮演更關鍵的角色。對於追求高用性和可擴充套件性的微服務架構,深入理解和應用RabbitMQ的進階特性,例如訊息確認、重試機制和死信佇列等,將是提升系統穩定性和效能的關鍵所在。玄貓認為,RabbitMQ作為成熟的訊息佇列解決方案,在微服務架構中具有廣泛的應用前景,值得技術團隊深入研究和應用。