串流資料應用日益普及,從 IoT 裝置到社群媒體,都需要即時處理大量資料。AWS 提供了 Kinesis 和 MSK 兩種服務來應對此需求。Kinesis 以其無伺服器的特性簡化了組態和管理,適合快速上手的應用場景。MSK 則根據開源的 Kafka,提供更豐富的組態選項和社群支援,適合需要深度客製化的進階使用者。選擇哪種服務取決於團隊的技術能力和專案需求。除了串流處理,文章也介紹瞭如何使用 AWS DMS 將資料從 MySQL 遷移到 S3,並利用 Athena 進行查詢分析,形成完整的資料處理流程。
串流資料擷取
串流資料來源
越來越多的分析型專案依賴持續生成的串流資料,並需要近乎即時地擷取。常見的串流資料來源包括:
- IoT裝置資料(如智慧手錶、智慧家電等)
- 各類別車輛的遙測資料(汽車、飛機等)
- 感測器資料(製造機器、氣象站等)
- 手機遊戲的即時遊戲資料
- 社群媒體上對公司品牌的提及
例如,飛機製造商波音有一套名為Airplane Health Management(AHM)的系統,可以收集飛行中的飛機資料並即時傳送到波音系統。波音處理這些資訊並透過網頁入口網站立即提供給航空公司的維修人員。
Amazon Kinesis與Amazon Managed Streaming for Kafka (MSK)的比較
AWS內用於擷取串流資料的主要服務是Amazon Kinesis和Amazon MSK。這兩項服務在第3章「The AWS Engineer’s Toolkit」中有詳細介紹,請務必先閱讀相關章節。
簡而言之,Amazon Kinesis和Amazon MSK都是AWS提供的pub-sub訊息處理服務。生產者建立訊息並寫入串流服務(Kinesis或MSK),消費者訂閱並接收來自服務的訊息。這種模式常用於解耦生產串流資料的應用程式和消費資料的應用程式。兩者皆可擴充套件以處理每秒數百萬條訊息。
本文將探討這兩項服務的主要差異,並分析決定選擇哪項服務的因素。
無伺服器服務與受管服務
Amazon Kinesis是一種無伺服器服務,這意味著使用者無需關心、管理和了解執行該服務的底層伺服器。例如,使用Kinesis Data Streams時,使用者組態串流的分片數量,AWS自動組態所需的運算基礎設施(Kinesis中的分片是Kinesis資料串流的基本吞吐量單位)。使用Amazon Kinesis Data Firehose時,甚至無需指定要組態的分片數量,因為Kinesis Data Firehose會根據訊息吞吐量的變化自動擴充套件和縮減,無需任何組態。
Amazon MSK則是一種受管服務,這意味著AWS管理基礎設施,但使用者仍需瞭解和決定底層運算基礎設施和軟體。例如,使用者需要從例項型別列表中選擇以驅動MSK叢集,組態VPC網路設定,並微調一系列Kafka組態設定。使用者還需要選擇要使用的Kafka版本。
作為無伺服器服務,Kinesis比Amazon MSK更快、更容易設定和組態。然而,Amazon MSK提供了更多的選項來組態和微調底層軟體。
如果您的團隊具備使用Apache Kafka的經驗,並需要微調串流效能,那麼MSK可能是更好的選擇。如果您剛開始使用串流,且您的應用場景不需要微調效能,那麼Amazon Kinesis可能更合適。
開放原始碼彈性與具強大AWS整合的專有軟體
Amazon MSK是Apache Kafka的受管版本,Apache Kafka是一種流行的開放原始碼解決方案。Amazon Kinesis是AWS開發的專有軟體,但有一些有限的開放原始碼元素,例如Kinesis Agent。
內容解密:
本段落主要比較了Amazon Kinesis和Amazon MSK兩種AWS服務在處理串流資料時的差異。Amazon Kinesis是無伺服器服務,提供更簡單快速的設定,但彈性較低;而Amazon MSK則提供更多的組態選項,但需要更多專業知識和管理。選擇哪種服務取決於團隊的技術背景和具體需求。開放原始碼的MSK適合具備Kafka經驗的團隊,而專有的Kinesis則適合初學者或不需要高度自定義的場景。兩者的核心功能都是提供pub-sub訊息處理,能夠擴充套件處理大量訊息。
資料串流處理:Apache Kafka 與 Amazon Kinesis 的比較
在處理串流資料時,選擇適當的工具至關重要。Apache Kafka 和 Amazon Kinesis 是兩個流行的選項,它們各有其優缺點。本篇文章將探討這兩者的差異,並提供實務上的考量。
整合能力
Apache Kafka 擁有龐大的社群和豐富的生態系統,提供數百種事件來源和事件接收器的整合,包括 AWS 服務如 Amazon S3,以及其他流行產品如 PostgreSQL 和 Elasticsearch。
Amazon Kinesis 也提供與多個 AWS 服務的整合,如 Amazon S3、Amazon Redshift 和 Amazon Elasticsearch。此外,Kinesis Data Firehose 支援與一些外部服務如 Splunk 和 DataDog 的整合。
程式碼範例:Kafka 與 Kinesis 的整合設定
// Kafka 生產者範例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my_topic", "key", "value"));
內容解密:
- Kafka 生產者設定:在這段程式碼中,我們首先建立了一個
Properties物件,用於設定 Kafka 生產者的屬性。我們指定了bootstrap.servers為localhost:9092,表示 Kafka 伺服器執行在本機的 9092 連線埠上。 - 序列化設定:我們設定了
key.serializer和value.serializer為字串序列化器,這意味著我們將傳送的字串作為訊息的鍵和值。 - 建立生產者:使用設定的屬性,建立了一個
KafkaProducer物件。 - 傳送訊息:透過
producer.send()方法,傳送了一條訊息到名為my_topic的主題中。
訊息處理保證
Amazon Kinesis 提供至少一次(at-least-once)的訊息處理保證,這意味著每個訊息至少會被傳送到消費者一次。然而,在某些情況下,訊息可能會被重複傳送。
Apache Kafka(以及 Amazon MSK)從 0.11 版本開始支援恰好一次(exactly-once)的訊息處理保證。透過設定 processing.guarantee=exactly_once,可以確保訊息只被處理一次。
單一處理引擎 vs. 特殊工具
Apache Kafka 和 Amazon Kinesis Data Streams 都提供了強大的串流訊息處理能力。然而,Amazon Kinesis 還提供了多個子服務,以滿足特定的使用場景,如 Kinesis Video Streams 用於處理視訊和音訊資料,Kinesis Data Firehose 用於簡化串流資料的寫入操作。
選擇串流處理工具
在選擇 AWS 服務進行串流資料處理時,需要考慮多個因素。Amazon Kinesis 需要較少的初始組態和維護任務,並且提供了多個子服務以滿足特殊使用場景。如果您的使用場景需要恰好一次的訊息傳遞、精細的效能調校,或是與第三方產品的整合,那麼 Amazon MSK 可能是一個更好的選擇。
實務操作:使用 AWS DMS 進行資料擷取
本章節將指導您如何使用 AWS DMS 從資料函式庫中擷取資料並存入 Amazon S3。
建立 MySQL 資料函式庫例項
- 登入 AWS 管理控制檯。
- 在頂部搜尋欄中搜尋並選擇 RDS。
- 在左側選單中點選「資料函式庫」。
- 點選「建立資料函式庫」。
- 選擇「簡單建立」並選取 MySQL 資料函式庫引擎。
- 設定資料函式庫例項大小為免費層(db.t2.micro)。
- 提供資料函式庫例項識別碼和主密碼。
建立 MySQL 資料函式庫流程
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title 串流資料擷取與處理比較 Kinesis 與 Kafka
package "AWS 雲端架構" {
package "網路層" {
component [VPC] as vpc
component [Subnet] as subnet
component [Security Group] as sg
component [Route Table] as rt
}
package "運算層" {
component [EC2] as ec2
component [Lambda] as lambda
component [ECS/EKS] as container
}
package "儲存層" {
database [RDS] as rds
database [DynamoDB] as dynamo
storage [S3] as s3
}
package "服務層" {
component [API Gateway] as apigw
component [ALB/NLB] as lb
queue [SQS] as sqs
}
}
apigw --> lambda
apigw --> lb
lb --> ec2
lb --> container
lambda --> dynamo
lambda --> s3
ec2 --> rds
container --> rds
vpc --> subnet
subnet --> sg
sg --> rt
@enduml此圖示顯示了建立 MySQL 資料函式庫例項的步驟。
詳細解說:
- 步驟 A:首先,您需要登入 AWS 管理控制檯。
- 步驟 B:在頂部搜尋欄中搜尋並選擇 RDS,以進入 RDS 控制檯。
- 步驟 C 和 D:在左側選單中點選「資料函式庫」,然後點選「建立資料函式庫」按鈕開始建立新的資料函式庫例項。
- 步驟 E 和 F:選擇「簡單建立」方法,並選取 MySQL 作為資料函式庫引擎。然後,設定資料函式庫例項大小為免費層(db.t2.micro)。
- 步驟 G:最後,提供一個唯一的資料函式庫例項識別碼和主密碼,以完成資料函式庫的建立。
使用AWS DMS進行資料擷取的實作練習
使用Amazon EC2執行個體載入示範資料
我們需要建立一個MySQL的示範資料函式庫,然後使用AWS DMS將資料載入到Amazon S3中。為了載入示範資料到資料函式庫,我們將使用一個Amazon EC2執行個體:
- 在AWS管理主控台中,使用頂部的搜尋欄搜尋並選擇EC2。
- 在左側選單中,點選Instances。
- 在右上角,點選Launch instances。
- 選擇Amazon Linux 2 AMI(HVM),SSD Volume Type。
- 對於Instance type,選擇t2.micro,然後點選Next: Configure Instance Details。
- 在Configure instance details頁面中,確保Auto-assign Public IP設定為Enable。
- 在頁面底部有一個User data區塊。將以下bash指令碼貼入此區塊;該指令碼將在執行個體首次啟動時執行。請務必將
<PASSWORD>替換為你在「建立新的MySQL資料函式庫執行個體」步驟9中設定的密碼,並將<HOST>替換為你在「建立新的MySQL資料函式庫執行個體」步驟11中記錄的MySQL資料函式庫執行個體端點名稱:
#!/bin/bash
yum install -y mariadb
curl https://downloads.mysql.com/docs/sakila-db.zip -o sakila.zip
unzip sakila.zip
cd sakila-db
mysql --host=<HOST> --user=admin --password=<PASSWORD> -f < sakila-schema.sql
mysql --host=<HOST> --user=admin --password=<PASSWORD> -f < sakila-data.sql
內容解密:
此bash指令碼執行以下操作:
- 安裝MariaDB(其中包含MySQL客戶端,以便我們能夠連線到MySQL伺服器)。
- 從MySQL網站下載Sakila示範資料函式庫,然後解壓縮檔案並切換到sakila-db目錄。
- 連線到MySQL並執行sakila-schema.sql檔案中的SQL內容,該檔案建立了Sakila schema(資料函式庫、表格、檢視等)。
- 再次連線到MySQL並執行sakila-data.sql檔案中的SQL內容,該檔案將示範資料插入到Sakila資料函式庫的表格中。
- 點選Next: Add storage,並保留所有預設設定不變。
- 點選Next: Add Tags。點選Add Tag並將Key設為Name,Value設為dataeng-book-ec2。
- 點選Next: Configure Security Group,並選擇Select an existing security group。
- 選擇名為default的安全群組。
- 點選Review and Launch。
- 點選Launch。
- 在彈出視窗中,選擇Create a new key pair,並為你的新金鑰對提供一個名稱(例如dataeng-book-key)。然後,點選Download Key Pair並確保將金鑰對檔案儲存在你可以輕易從命令列存取的位置。
- 點選Launch Instances。
為DMS建立IAM政策和角色
在本文中,我們將建立一個IAM政策和角色,以便允許DMS寫入到我們的目標S3儲存桶:
- 在AWS管理主控台中,使用頂部的搜尋欄搜尋並選擇IAM。
- 在左側選單中,點選Policies,然後點選Create policy。
- 預設選擇Visual editor,因此點選JSON標籤以切換到文字輸入模式。
- 將文字框中的樣板程式碼替換為以下政策定義。請務必將
<initials>替換為你在第3章《AWS Data Engineers Toolkit》中建立的landing-zone儲存桶名稱的正確首字母縮寫:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:*"
],
"Resource": [
"arn:aws:s3:::dataeng-landing-zone-<initials>",
"arn:aws:s3:::dataeng-landing-zone-<initials>/*"
]
}
]
}
內容解密:
此政策授予對dataeng-landing-zone-
- 點選Next: Tags,然後點選Next: Review。
- 提供一個描述性的政策名稱,例如DataEngDMSLandingS3BucketPolicy,然後點選Create policy。
組態DMS設定並執行從MySQL到S3的完整載入
在本文中,我們將建立一個DMS複製執行個體(一個受管理的EC2執行個體,連線到源端點、檢索資料並寫入到目標端點),並組態源和目標端點。然後,我們將建立一個資料函式庫遷移任務,提供遷移的組態設定。
設定DMS並啟動完整載入任務的步驟:
- 在AWS管理主控台中,使用頂部的搜尋欄搜尋DMS,然後點選Database Migration Service。
- 在左側選單中,點選Replication Instances。
- 在頁面頂部,點選Create replication instance。
使用AWS DMS實作MySQL資料函式庫到Amazon S3的資料遷移
建立DMS複製例項
為了將MySQL資料函式庫中的資料遷移到Amazon S3,我們首先需要建立一個AWS Database Migration Service(DMS)的複製例項。以下是建立複製例項的步驟:
- 在AWS管理控制檯中,搜尋並選擇DMS服務。
- 點選「複製例項」,然後點選「建立複製例項」。
- 為複製例項命名,例如
mysql-s3-replication。 - 選擇例項類別為
dms.t3.micro。 - 組態分配的儲存空間為10 GB,因為我們的資料函式庫規模較小。
- 選擇預設的VPC。
- 將「多可用區」設定為「開發或測試工作負載(單一AZ)」。
- 保留其他設定為預設值,然後點選「建立」。
建立來源和目標端點
接下來,我們需要建立來源和目標端點。
- 在左側選單中,點選「端點」,然後點選「建立端點」。
- 選擇「來源端點」,並選擇之前建立的MySQL資料函式庫例項。
- 手動輸入資料函式庫的連線資訊,包括密碼。
- 點選「建立端點」。
# Python範例:使用Boto3建立DMS端點
import boto3
dms = boto3.client('dms')
response = dms.create_endpoint(
EndpointIdentifier='dataeng-mysql-1',
EndpointType='source',
EngineName='mysql',
# 其他引數...
)
內容解密:
EndpointIdentifier:端點的唯一識別碼。EndpointType:端點型別,這裡是source,表示這是一個來源端點。EngineName:資料函式庫引擎名稱,這裡是mysql。
- 重複上述步驟,建立目標端點,選擇Amazon S3作為目標引擎,並提供相關的IAM角色ARN和S3儲存桶名稱。
建立資料遷移任務
現在,我們可以建立一個資料遷移任務,將資料從MySQL資料函式庫遷移到S3。
- 在左側選單中,點選「資料函式庫遷移任務」,然後點選「建立任務」。
- 為任務命名,並選擇之前建立的複製例項、來源端點和目標端點。
- 設定遷移型別為「遷移現有資料」。
# Python範例:使用Boto3建立DMS資料遷移任務
response = dms.create_replication_task(
ReplicationTaskIdentifier='dataeng-mysql-s3-sakila-task',
SourceEndpointArn='arn:aws:dms:REGION:ACCOUNT_ID:endpoint:ENDPOINT_ID',
TargetEndpointArn='arn:aws:dms:REGION:ACCOUNT_ID:endpoint:ENDPOINT_ID',
ReplicationInstanceArn='arn:aws:dms:REGION:ACCOUNT_ID:rep:REPLICATION_INSTANCE_ID',
MigrationType='full-load',
# 其他引數...
)
內容解密:
ReplicationTaskIdentifier:任務的唯一識別碼。SourceEndpointArn和TargetEndpointArn:來源和目標端點的ARN。ReplicationInstanceArn:複製例項的ARN。MigrationType:遷移型別,這裡是full-load,表示完整載入資料。
使用Amazon Athena查詢資料
資料遷移完成後,我們可以使用Amazon Athena查詢S3中的資料。
- 建立一個新的S3儲存桶,用於存放Athena查詢結果。
- 在Athena中,建立一個新的查詢編輯器,並選擇
sakila資料函式庫。 - 執行查詢,例如
SELECT * FROM film LIMIT 20;。
-- SQL範例:在Athena中查詢film表格
SELECT * FROM sakila.film LIMIT 20;
內容解密:
SELECT * FROM film LIMIT 20;:查詢film表格的前20行資料。