在現代無伺服器應用程式中,有效率的訊息處理至關重要。本文將探討如何利用 AWS Lambda 和 SQS 構建一個訊息處理系統,實作訊息的批次接收、處理和轉發。此係統能從 SQS 輸入佇列批次讀取訊息,經過 Lambda 函式處理後,再批次傳送到 SQS 輸出佇列,提升整體訊息處理效率。過程中會詳細說明 Java Lambda 函式的程式碼撰寫、SQS 相關操作的介面設計與實作,以及如何使用 AWS CLI 進行 Lambda 函式的佈署、設定和測試。此外,還會示範如何設定必要的 IAM Role 和 Policy,確保 Lambda 函式擁有存取 SQS 的許可權,並提供使用 CloudFormation 建立 SNS 主題的範例,方便讀者擴充應用。
使用 SQS 和 SNS 進行訊息傳遞與通知(第六章)
在無伺服器架構中,訊息傳遞與通知是非常重要的一環。Amazon SQS(Simple Queue Service)和 SNS(Simple Notification Service)是 AWS 提供的兩種訊息處理服務。本章將介紹如何使用 Java Lambda 與 SQS 進行訊息的接收、處理和轉發。
建立 Java Lambda 專案
首先,我們需要建立一個 Java Lambda 專案。專案結構如下:
Request.java:對應輸入的 JSON 物件,包含輸入佇列 URL、輸出佇列 URL、最大接收訊息數和延遲時間等屬性。Response.java:包含回傳給呼叫者的訊息。SqsService.java:定義 SQS 相關操作的介面。SqsServiceImpl.java:實作 SQS 相關操作的類別。LambdaSqsSdkReceiveSendBatchHandler.java:Lambda 的處理函式類別。
Request.java
@Data
public class Request {
private String inputQueueURL;
private String outputQueueURL;
private int maxMessagesToReceive;
private int delay;
}
Response.java
@Data
@AllArgsConstructor
public class Response {
private String message;
}
SQS 服務介面與實作
SqsService.java
public interface SqsService {
Response sendMessage(Request request, LambdaLogger logger);
}
SqsServiceImpl.java
在 SqsServiceImpl.java 中,我們實作了從輸入佇列接收訊息、將訊息轉發到輸出佇列以及刪除已處理訊息的功能。
final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
.withQueueUrl(request.getInputQueueURL())
.withMaxNumberOfMessages(request.getMaxMessagesToReceive());
final List<Message> messages = this.sqsClient.receiveMessage(receiveMessageRequest).getMessages();
Collection<SendMessageBatchRequestEntry> entries = new ArrayList<>();
int idVal = 1;
for (Message m : messages) {
logger.log("Adding message: " + m.getBody());
entries.add(new SendMessageBatchRequestEntry("id_" + idVal, m.getBody()).withDelaySeconds(request.getDelay()));
idVal++;
}
final SendMessageBatchRequest sendBatchRequest = new SendMessageBatchRequest()
.withQueueUrl(request.getOutputQueueURL())
.withEntries(entries);
this.sqsClient.sendMessageBatch(sendBatchRequest);
for (Message m : messages) {
this.sqsClient.deleteMessage(request.getInputQueueURL(), m.getReceiptHandle());
}
LambdaSqsSdkReceiveSendBatchHandler.java
在 Lambda 處理函式中,我們初始化了 SQS 客戶端並呼叫了 SqsService 的方法。
private final AmazonSQS sqsClient;
public LambdaSqsSdkReceiveSendBatchHandler() {
this.sqsClient = AmazonSQSClientBuilder.standard()
.withRegion(System.getenv("AWS_REGION"))
.build();
}
public Response handleRequest(final Request request, final Context context) {
final SqsService sqsService = new SqsServiceImpl(this.sqsClient);
return sqsService.sendMessage(request, context.getLogger());
}
設定佇列和資料
在佈署 Lambda 之前,我們需要建立輸入和輸出佇列,並向輸入佇列傳送一些測試訊息。
- 建立兩個 SQS 佇列:
my-input-queue和my-output-queue。 - 向
my-input-queue傳送 6 到 7 條訊息。
佈署和測試 Lambda(使用 AWS CLI)
- 編譯 Lambda 專案並產生 Uber JAR。
- 將 Uber JAR 上傳到 S3。
- 建立具有適當信任關係的 IAM Role。
- 建立具有必要 SQS 許可權的 IAM Policy 並將其附加到 Role。
- 使用 AWS CLI 建立 Lambda 函式。
- 使用測試 payload 呼叫 Lambda 函式。
建立 IAM Role 和 Policy
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:SendMessageBatch",
"sqs:DeleteMessage"
],
"Resource": [
"arn:aws:sqs:*:*:*"
]
}
]
}
建立和呼叫 Lambda 函式
aws lambda create-function \
--function-name lambda-sqs-sdk-receive-send-batch \
--runtime java8 \
--role arn:aws:iam::<account id>:role/lambda-sqs-sdk-receive-send-batch-role \
--handler tech.heartin.books.serverlesscookbook.LambdaSqsSdkReceiveSendBatchHandler::handleRequest \
--code S3Bucket=serverless-cookbook,S3Key=lambda-sqs-sdk-receive-send-batch-0.0.1-SNAPSHOT.jar \
--timeout 15 \
--memory-size 512 \
--region us-east-1 \
--profile admin
aws lambda invoke \
--invocation-type RequestResponse \
--function-name lambda-sqs-sdk-receive-send-batch \
--log-type Tail \
--payload file://payload.json \
--region us-east-1 \
--profile admin \
outputfile.txt
Payload JSON 示例:
{
"inputQueueURL": "https://queue.amazonaws.com/855923912133/my-input-queue",
"outputQueueURL": "https://queue.amazonaws.com/855923912133/my-output-queue",
"maxMessagesToReceive": 5,
"delay": 10
}
使用 SQS 和 SNS 進行訊息傳遞與通知(第 6 章節)
使用 AWS Lambda 與 SQS 進行訊息批次處理
在前面的章節中,我們已經探討瞭如何使用 AWS Lambda 進行伺服器less 運算。本章節將重點放在如何使用 AWS Lambda 與 Amazon Simple Queue Service(SQS)進行訊息傳遞與批次處理。
如何使用 AWS Lambda 與 SQS 進行訊息批次處理
首先,我們需要建立一個 Java Lambda 專案,並在 pom.xml 檔案中加入 aws-java-sdk-sqs 相依性:
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
接下來,建立一個 SqsService 介面,用於定義處理 SQS 事件的方法:
public interface SqsService {
Boolean processEvent(SQSEvent event, String outputQueueURL, LambdaLogger logger);
}
然後,建立一個 SqsServiceImpl 類別,實作 SqsService 介面。在這個類別中,我們將從 SQS 事件中擷取訊息,並建立一個 SendMessageBatchRequestEntry 物件集合:
Collection<SendMessageBatchRequestEntry> entries = new ArrayList<>();
int idVal = 1;
for (SQSMessage m : event.getRecords()) {
logger.log("Adding message: " + m.getBody());
entries.add(new SendMessageBatchRequestEntry("id_" + idVal, m.getBody()));
idVal++;
}
將訊息批次傳送到輸出佇列
建立一個 SendMessageBatchRequest 物件,並將訊息批次傳送到輸出佇列:
final SendMessageBatchRequest sendBatchRequest = new SendMessageBatchRequest()
.withQueueUrl(request.getOutputQueueURL())
.withEntries(entries);
this.sqsClient.sendMessageBatch(sendBatchRequest);
Lambda 處理程式
建立一個 LambdaSqsEventHandler 類別,作為 Lambda 處理程式。在這個類別中,我們將初始化 SQS 客戶端,並將其傳遞給 SqsService 類別:
private final AmazonSQS sqsClient;
public LambdaSqsSdkReceiveSendBatchHandler() {
this.sqsClient = AmazonSQSClientBuilder.standard()
.withRegion(System.getenv("AWS_REGION"))
.build();
}
然後,呼叫 SqsService 類別的 processEvent 方法,處理 SQS 事件:
public Boolean handleRequest(final SQSEvent sqsEvent, final Context context) {
// ...
}
建立 SQS 佇列和 Lambda 函式
使用 AWS CLI 建立兩個 SQS 佇列,分別作為輸入佇列和輸出佇列。然後,建立一個 Lambda 函式,並將其設定為在輸入佇列接收到訊息時觸發。
#### 內容解密:
上述程式碼和步驟展示瞭如何使用 AWS Lambda 和 SQS 進行訊息批次處理。首先,我們建立了一個 Java Lambda 專案,並加入了必要的相依性。然後,我們定義了一個 SqsService 介面和其實作類別,用於處理 SQS 事件。接下來,我們建立了一個 LambdaSqsEventHandler 類別,作為 Lambda 處理程式。最後,我們使用 AWS CLI 建立了 SQS 佇列和 Lambda 函式,並設定了觸發器。
使用 AWS CLI 驗證結果
使用 AWS CLI 命令來驗證 Lambda 函式是否正確地將訊息批次傳送到輸出佇列:
aws sqs receive-message \
--queue-url https://queue.amazonaws.com/855923912133/my-output-queue \
--max-number-of-messages 7 \
--profile admin
如果成功,您應該會收到最多 7 個訊息。
未來趨勢與實務應用評估
隨著無伺服器運算的普及,使用 AWS Lambda 和 SQS 進行訊息批次處理將變得越來越常見。這種架構可以提供高度的擴充套件性和可靠性,並且可以減少營運成本。因此,學習如何使用 AWS Lambda 和 SQS 將對開發人員和企業有所裨益。
圖表說明
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 圖表說明
rectangle "觸發" as node1
rectangle "接收訊息" as node2
rectangle "處理訊息" as node3
rectangle "傳送訊息" as node4
node1 --> node2
node2 --> node3
node3 --> node4
@enduml此圖示展示了使用 AWS Lambda 和 SQS 進行訊息批次處理的流程。Lambda 函式在接收到 SQS 輸入佇列的訊息時觸發,然後由 SqsService 處理訊息,最後將訊息傳送到 SQS 輸出佇列,下游服務可以從輸出佇列接收訊息。
使用 SQS 和 SNS 進行訊息傳遞和通知(第六章)
設定 SQS 事件處理的 Lambda 函式
在這個章節中,我們將學習如何建立一個能夠處理 SQS 事件的 Lambda 函式。這個 Lambda 函式將從一個輸入佇列中接收訊息,並將它們轉發到一個輸出佇列。
Lambda 函式程式碼
context.getLogger().log("Received SQS event: " + sqsEvent);
final SqsService sqsService = new SqsServiceImpl(this.sqsClient);
return sqsService.processEvent(sqsEvent,
System.getenv("SPC_OUTPUT_QUEUE_URL"), context.getLogger());
內容解密:
- 接收 SQS 事件:Lambda 函式被觸發時,會接收到一個 SQS 事件,包含多個訊息。
- 初始化 SQS 服務:使用
SqsServiceImpl類別初始化一個 SQS 服務例項,傳入sqsClient物件。 - 處理 SQS 事件:呼叫
processEvent方法處理 SQS 事件,將訊息轉發到輸出佇列。 - 取得輸出佇列 URL:從環境變數
SPC_OUTPUT_QUEUE_URL中取得輸出佇列的 URL。
設定佇列和資料
在呼叫 Lambda 函式之前,我們需要建立輸入和輸出佇列,並將訊息傳送到輸入佇列。
步驟:
- 建立兩個 SQS 佇列:一個輸入佇列(my-input-queue)和一個輸出佇列(my-output-queue)。
- 傳送訊息到輸入佇列:使用 CLI 將六到七個訊息傳送到輸入佇列。
使用 AWS CLI 組態 Lambda 函式
步驟:
- 編譯和封裝 Lambda 函式:執行
mvn clean package命令建立 Uber JAR 檔案。 - 上傳 Uber JAR 到 S3:使用
aws s3 cp命令將 JAR 檔案上傳到 S3。 - 建立 Lambda 執行角色:使用
aws iam create-role命令建立一個具有適當信任關係的角色。 - 建立和附加必要政策:建立具有必要 SQS 許可權的政策,並將其附加到角色。
- 建立 Lambda 函式:使用
aws lambda create-function命令建立 Lambda 函式,指定角色和程式碼位置。 - 組態 SQS 事件來源:使用
aws lambda create-event-source-mapping命令組態 SQS 事件來源。
Lambda 函式組態範例:
aws lambda create-function \
--function-name lambda-invoke-sqs-event \
--runtime java8 \
--role arn:aws:iam::855923912133:role/lambda-invoke-sqs-event-role \
--handler tech.heartin.books.serverlesscookbook.LambdaSqsEventHandler::handleRequest \
--code S3Bucket=serverless-cookbook,S3Key=lambda-invoke-sqs-event-0.0.1-SNAPSHOT.jar \
--environment Variables={SPC_OUTPUT_QUEUE_URL='https://queue.amazonaws.com/855923912133/my-output-queue'} \
--timeout 15 \
--memory-size 512 \
--region us-east-1 \
--profile admin
內容解密:
create-function命令:用於建立 Lambda 函式。--function-name引數:指定 Lambda 函式的名稱。--runtime引數:指定執行環境為 Java 8。--role引數:指定 Lambda 函式的執行角色。--handler引數:指定處理請求的方法。--code引數:指定程式碼的位置(S3 Bucket 和 Key)。--environment引數:設定環境變數。
使用 AWS CLI 測試 Lambda 函式
步驟:
- 傳送訊息到輸入佇列:使用 CLI 將五個訊息傳送到輸入佇列。
- 驗證輸出佇列中的訊息:使用
aws sqs receive-message命令從輸出佇列中接收訊息。
使用 CloudFormation 建立 SNS 主題
範例範本:
Resources:
SNSTopic:
Type: AWS::SNS::Topic
Properties:
DisplayName: 'My first SNS topic'
TopicName: my-first-sns-topic-cf
Output:
SNSTopicARN:
Value: !Ref SNSTopic
Description: The ARN of the SNS topic
內容解密:
Resources部分:定義了要建立的 AWS 資源。SNSTopic資源:定義了一個 SNS 主題資源。Type屬性:指定資源型別為AWS::SNS::Topic。Properties屬性:指定了 SNS 主題的屬性,如顯示名稱和主題名稱。Output部分:定義了範本的輸出值。SNSTopicARN輸出值:輸出了 SNS 主題的 ARN。