在現代無伺服器應用程式中,有效率的訊息處理至關重要。本文將探討如何利用 AWS Lambda 和 SQS 構建一個訊息處理系統,實作訊息的批次接收、處理和轉發。此係統能從 SQS 輸入佇列批次讀取訊息,經過 Lambda 函式處理後,再批次傳送到 SQS 輸出佇列,提升整體訊息處理效率。過程中會詳細說明 Java Lambda 函式的程式碼撰寫、SQS 相關操作的介面設計與實作,以及如何使用 AWS CLI 進行 Lambda 函式的佈署、設定和測試。此外,還會示範如何設定必要的 IAM Role 和 Policy,確保 Lambda 函式擁有存取 SQS 的許可權,並提供使用 CloudFormation 建立 SNS 主題的範例,方便讀者擴充應用。

使用 SQS 和 SNS 進行訊息傳遞與通知(第六章)

在無伺服器架構中,訊息傳遞與通知是非常重要的一環。Amazon SQS(Simple Queue Service)和 SNS(Simple Notification Service)是 AWS 提供的兩種訊息處理服務。本章將介紹如何使用 Java Lambda 與 SQS 進行訊息的接收、處理和轉發。

建立 Java Lambda 專案

首先,我們需要建立一個 Java Lambda 專案。專案結構如下:

  • Request.java:對應輸入的 JSON 物件,包含輸入佇列 URL、輸出佇列 URL、最大接收訊息數和延遲時間等屬性。
  • Response.java:包含回傳給呼叫者的訊息。
  • SqsService.java:定義 SQS 相關操作的介面。
  • SqsServiceImpl.java:實作 SQS 相關操作的類別。
  • LambdaSqsSdkReceiveSendBatchHandler.java:Lambda 的處理函式類別。

Request.java

@Data
public class Request {
    private String inputQueueURL;
    private String outputQueueURL;
    private int maxMessagesToReceive;
    private int delay;
}

Response.java

@Data
@AllArgsConstructor
public class Response {
    private String message;
}

SQS 服務介面與實作

SqsService.java

public interface SqsService {
    Response sendMessage(Request request, LambdaLogger logger);
}

SqsServiceImpl.java

SqsServiceImpl.java 中,我們實作了從輸入佇列接收訊息、將訊息轉發到輸出佇列以及刪除已處理訊息的功能。

final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
    .withQueueUrl(request.getInputQueueURL())
    .withMaxNumberOfMessages(request.getMaxMessagesToReceive());
final List<Message> messages = this.sqsClient.receiveMessage(receiveMessageRequest).getMessages();

Collection<SendMessageBatchRequestEntry> entries = new ArrayList<>();
int idVal = 1;
for (Message m : messages) {
    logger.log("Adding message: " + m.getBody());
    entries.add(new SendMessageBatchRequestEntry("id_" + idVal, m.getBody()).withDelaySeconds(request.getDelay()));
    idVal++;
}
final SendMessageBatchRequest sendBatchRequest = new SendMessageBatchRequest()
    .withQueueUrl(request.getOutputQueueURL())
    .withEntries(entries);
this.sqsClient.sendMessageBatch(sendBatchRequest);

for (Message m : messages) {
    this.sqsClient.deleteMessage(request.getInputQueueURL(), m.getReceiptHandle());
}

LambdaSqsSdkReceiveSendBatchHandler.java

在 Lambda 處理函式中,我們初始化了 SQS 客戶端並呼叫了 SqsService 的方法。

private final AmazonSQS sqsClient;

public LambdaSqsSdkReceiveSendBatchHandler() {
    this.sqsClient = AmazonSQSClientBuilder.standard()
        .withRegion(System.getenv("AWS_REGION"))
        .build();
}

public Response handleRequest(final Request request, final Context context) {
    final SqsService sqsService = new SqsServiceImpl(this.sqsClient);
    return sqsService.sendMessage(request, context.getLogger());
}

設定佇列和資料

在佈署 Lambda 之前,我們需要建立輸入和輸出佇列,並向輸入佇列傳送一些測試訊息。

  1. 建立兩個 SQS 佇列:my-input-queuemy-output-queue
  2. my-input-queue 傳送 6 到 7 條訊息。

佈署和測試 Lambda(使用 AWS CLI)

  1. 編譯 Lambda 專案並產生 Uber JAR。
  2. 將 Uber JAR 上傳到 S3。
  3. 建立具有適當信任關係的 IAM Role。
  4. 建立具有必要 SQS 許可權的 IAM Policy 並將其附加到 Role。
  5. 使用 AWS CLI 建立 Lambda 函式。
  6. 使用測試 payload 呼叫 Lambda 函式。

建立 IAM Role 和 Policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:SendMessageBatch",
                "sqs:DeleteMessage"
            ],
            "Resource": [
                "arn:aws:sqs:*:*:*"
            ]
        }
    ]
}

建立和呼叫 Lambda 函式

aws lambda create-function \
    --function-name lambda-sqs-sdk-receive-send-batch \
    --runtime java8 \
    --role arn:aws:iam::<account id>:role/lambda-sqs-sdk-receive-send-batch-role \
    --handler tech.heartin.books.serverlesscookbook.LambdaSqsSdkReceiveSendBatchHandler::handleRequest \
    --code S3Bucket=serverless-cookbook,S3Key=lambda-sqs-sdk-receive-send-batch-0.0.1-SNAPSHOT.jar \
    --timeout 15 \
    --memory-size 512 \
    --region us-east-1 \
    --profile admin

aws lambda invoke \
    --invocation-type RequestResponse \
    --function-name lambda-sqs-sdk-receive-send-batch \
    --log-type Tail \
    --payload file://payload.json \
    --region us-east-1 \
    --profile admin \
    outputfile.txt

Payload JSON 示例:

{
    "inputQueueURL": "https://queue.amazonaws.com/855923912133/my-input-queue",
    "outputQueueURL": "https://queue.amazonaws.com/855923912133/my-output-queue",
    "maxMessagesToReceive": 5,
    "delay": 10
}

使用 SQS 和 SNS 進行訊息傳遞與通知(第 6 章節)

使用 AWS Lambda 與 SQS 進行訊息批次處理

在前面的章節中,我們已經探討瞭如何使用 AWS Lambda 進行伺服器less 運算。本章節將重點放在如何使用 AWS Lambda 與 Amazon Simple Queue Service(SQS)進行訊息傳遞與批次處理。

如何使用 AWS Lambda 與 SQS 進行訊息批次處理

首先,我們需要建立一個 Java Lambda 專案,並在 pom.xml 檔案中加入 aws-java-sdk-sqs 相依性:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-sqs</artifactId>
    <version>${aws.sdk.version}</version>
</dependency>

接下來,建立一個 SqsService 介面,用於定義處理 SQS 事件的方法:

public interface SqsService {
    Boolean processEvent(SQSEvent event, String outputQueueURL, LambdaLogger logger);
}

然後,建立一個 SqsServiceImpl 類別,實作 SqsService 介面。在這個類別中,我們將從 SQS 事件中擷取訊息,並建立一個 SendMessageBatchRequestEntry 物件集合:

Collection<SendMessageBatchRequestEntry> entries = new ArrayList<>();
int idVal = 1;
for (SQSMessage m : event.getRecords()) {
    logger.log("Adding message: " + m.getBody());
    entries.add(new SendMessageBatchRequestEntry("id_" + idVal, m.getBody()));
    idVal++;
}

將訊息批次傳送到輸出佇列

建立一個 SendMessageBatchRequest 物件,並將訊息批次傳送到輸出佇列:

final SendMessageBatchRequest sendBatchRequest = new SendMessageBatchRequest()
        .withQueueUrl(request.getOutputQueueURL())
        .withEntries(entries);
this.sqsClient.sendMessageBatch(sendBatchRequest);

Lambda 處理程式

建立一個 LambdaSqsEventHandler 類別,作為 Lambda 處理程式。在這個類別中,我們將初始化 SQS 客戶端,並將其傳遞給 SqsService 類別:

private final AmazonSQS sqsClient;
public LambdaSqsSdkReceiveSendBatchHandler() {
    this.sqsClient = AmazonSQSClientBuilder.standard()
            .withRegion(System.getenv("AWS_REGION"))
            .build();
}

然後,呼叫 SqsService 類別的 processEvent 方法,處理 SQS 事件:

public Boolean handleRequest(final SQSEvent sqsEvent, final Context context) {
    // ...
}

建立 SQS 佇列和 Lambda 函式

使用 AWS CLI 建立兩個 SQS 佇列,分別作為輸入佇列和輸出佇列。然後,建立一個 Lambda 函式,並將其設定為在輸入佇列接收到訊息時觸發。

#### 內容解密:

上述程式碼和步驟展示瞭如何使用 AWS Lambda 和 SQS 進行訊息批次處理。首先,我們建立了一個 Java Lambda 專案,並加入了必要的相依性。然後,我們定義了一個 SqsService 介面和其實作類別,用於處理 SQS 事件。接下來,我們建立了一個 LambdaSqsEventHandler 類別,作為 Lambda 處理程式。最後,我們使用 AWS CLI 建立了 SQS 佇列和 Lambda 函式,並設定了觸發器。

使用 AWS CLI 驗證結果

使用 AWS CLI 命令來驗證 Lambda 函式是否正確地將訊息批次傳送到輸出佇列:

aws sqs receive-message \
    --queue-url https://queue.amazonaws.com/855923912133/my-output-queue \
    --max-number-of-messages 7 \
    --profile admin

如果成功,您應該會收到最多 7 個訊息。

未來趨勢與實務應用評估

隨著無伺服器運算的普及,使用 AWS Lambda 和 SQS 進行訊息批次處理將變得越來越常見。這種架構可以提供高度的擴充套件性和可靠性,並且可以減少營運成本。因此,學習如何使用 AWS Lambda 和 SQS 將對開發人員和企業有所裨益。

圖表說明

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 圖表說明

rectangle "觸發" as node1
rectangle "接收訊息" as node2
rectangle "處理訊息" as node3
rectangle "傳送訊息" as node4

node1 --> node2
node2 --> node3
node3 --> node4

@enduml

此圖示展示了使用 AWS Lambda 和 SQS 進行訊息批次處理的流程。Lambda 函式在接收到 SQS 輸入佇列的訊息時觸發,然後由 SqsService 處理訊息,最後將訊息傳送到 SQS 輸出佇列,下游服務可以從輸出佇列接收訊息。

使用 SQS 和 SNS 進行訊息傳遞和通知(第六章)

設定 SQS 事件處理的 Lambda 函式

在這個章節中,我們將學習如何建立一個能夠處理 SQS 事件的 Lambda 函式。這個 Lambda 函式將從一個輸入佇列中接收訊息,並將它們轉發到一個輸出佇列。

Lambda 函式程式碼

context.getLogger().log("Received SQS event: " + sqsEvent);
final SqsService sqsService = new SqsServiceImpl(this.sqsClient);
return sqsService.processEvent(sqsEvent,
System.getenv("SPC_OUTPUT_QUEUE_URL"), context.getLogger());

內容解密:

  1. 接收 SQS 事件:Lambda 函式被觸發時,會接收到一個 SQS 事件,包含多個訊息。
  2. 初始化 SQS 服務:使用 SqsServiceImpl 類別初始化一個 SQS 服務例項,傳入 sqsClient 物件。
  3. 處理 SQS 事件:呼叫 processEvent 方法處理 SQS 事件,將訊息轉發到輸出佇列。
  4. 取得輸出佇列 URL:從環境變數 SPC_OUTPUT_QUEUE_URL 中取得輸出佇列的 URL。

設定佇列和資料

在呼叫 Lambda 函式之前,我們需要建立輸入和輸出佇列,並將訊息傳送到輸入佇列。

步驟:

  1. 建立兩個 SQS 佇列:一個輸入佇列(my-input-queue)和一個輸出佇列(my-output-queue)。
  2. 傳送訊息到輸入佇列:使用 CLI 將六到七個訊息傳送到輸入佇列。

使用 AWS CLI 組態 Lambda 函式

步驟:

  1. 編譯和封裝 Lambda 函式:執行 mvn clean package 命令建立 Uber JAR 檔案。
  2. 上傳 Uber JAR 到 S3:使用 aws s3 cp 命令將 JAR 檔案上傳到 S3。
  3. 建立 Lambda 執行角色:使用 aws iam create-role 命令建立一個具有適當信任關係的角色。
  4. 建立和附加必要政策:建立具有必要 SQS 許可權的政策,並將其附加到角色。
  5. 建立 Lambda 函式:使用 aws lambda create-function 命令建立 Lambda 函式,指定角色和程式碼位置。
  6. 組態 SQS 事件來源:使用 aws lambda create-event-source-mapping 命令組態 SQS 事件來源。

Lambda 函式組態範例:

aws lambda create-function \
--function-name lambda-invoke-sqs-event \
--runtime java8 \
--role arn:aws:iam::855923912133:role/lambda-invoke-sqs-event-role \
--handler tech.heartin.books.serverlesscookbook.LambdaSqsEventHandler::handleRequest \
--code S3Bucket=serverless-cookbook,S3Key=lambda-invoke-sqs-event-0.0.1-SNAPSHOT.jar \
--environment Variables={SPC_OUTPUT_QUEUE_URL='https://queue.amazonaws.com/855923912133/my-output-queue'} \
--timeout 15 \
--memory-size 512 \
--region us-east-1 \
--profile admin

內容解密:

  1. create-function 命令:用於建立 Lambda 函式。
  2. --function-name 引數:指定 Lambda 函式的名稱。
  3. --runtime 引數:指定執行環境為 Java 8。
  4. --role 引數:指定 Lambda 函式的執行角色。
  5. --handler 引數:指定處理請求的方法。
  6. --code 引數:指定程式碼的位置(S3 Bucket 和 Key)。
  7. --environment 引數:設定環境變數。

使用 AWS CLI 測試 Lambda 函式

步驟:

  1. 傳送訊息到輸入佇列:使用 CLI 將五個訊息傳送到輸入佇列。
  2. 驗證輸出佇列中的訊息:使用 aws sqs receive-message 命令從輸出佇列中接收訊息。

使用 CloudFormation 建立 SNS 主題

範例範本:

Resources:
  SNSTopic:
    Type: AWS::SNS::Topic
    Properties:
      DisplayName: 'My first SNS topic'
      TopicName: my-first-sns-topic-cf

Output:
  SNSTopicARN:
    Value: !Ref SNSTopic
    Description: The ARN of the SNS topic

內容解密:

  1. Resources 部分:定義了要建立的 AWS 資源。
  2. SNSTopic 資源:定義了一個 SNS 主題資源。
  3. Type 屬性:指定資源型別為 AWS::SNS::Topic
  4. Properties 屬性:指定了 SNS 主題的屬性,如顯示名稱和主題名稱。
  5. Output 部分:定義了範本的輸出值。
  6. SNSTopicARN 輸出值:輸出了 SNS 主題的 ARN。