串流資料擷取是現代資料工程中不可或缺的一環,如何有效地擷取、儲存和轉換串流資料,對於後續的資料分析至關重要。本文以 AWS Kinesis 和 Kinesis Data Firehose 作為串流資料擷取工具,示範如何將串流資料寫入 S3 儲存桶。同時,文章也討論了資料轉換的重要性,比較了 Apache Spark、Hadoop MapReduce 和 SQL 等不同轉換引擎的特性,並提供使用 AWS Glue Studio 和 PySpark 進行資料轉換的實務範例,讓讀者更深入地瞭解如何最佳化資料轉換流程,提升分析效率。
實戰練習:串流資料擷取
在本章的前面部分,我們探討了將串流資料擷取到AWS的兩種選擇,即Amazon Kinesis和Amazon MSK。在本文中,我們將使用無伺服器的Amazon Kinesis服務來擷取串流資料。為了產生串流資料,我們將使用開源的Amazon Kinesis Data Generator(KDG)。在本文中:
- 組態Amazon Kinesis Data Firehose以擷取串流資料,並將資料寫入Amazon S3。
- 組態Amazon KDG以建立假的串流資料來源。
組態Kinesis Data Firehose以將串流資料傳送到Amazon S3
Kinesis Data Firehose的設計目的是讓您能夠輕鬆地從串流來源擷取資料,然後將該資料寫入支援的目標(例如Amazon S3,我們將在本次練習中這樣做)。讓我們開始吧:
- 在AWS管理主控台中,使用頂部的搜尋列搜尋並選擇Kinesis。
- Kinesis登入頁面提供了使用Kinesis Data Streams、Kinesis Data Firehose或Kinesis Data Analytics建立新串流的連結。選擇Kinesis Data Firehose服務,然後點選建立傳遞串流。
- 在本次練習中,我們將使用KDG直接將資料傳送到Firehose,因此對於來源,從下拉清單中選擇Direct PUT。對於目的地,從下拉清單中選擇Amazon S3。
- 對於傳遞串流名稱,輸入一個描述性的名稱,例如
dataeng-firehose-streaming-s3。 - 對於使用AWS Lambda轉換記錄,保留預設的已停用。此功能可用於執行資料驗證任務或使用AWS Lambda對傳入的資料進行輕量級處理,但我們希望在不進行任何處理的情況下擷取資料,因此我們將保持此功能停用。
- 對於轉換記錄格式,我們也將保留預設的已停用。這可用於將傳入的資料轉換為Apache Parquet或Apache ORC格式。但是,要做到這一點,我們需要提前指定傳入資料的結構描述。我們將在不更改檔案格式的情況下擷取資料,因此我們將保持此功能停用。
設定詳細資訊
- 對於S3儲存桶,選擇您先前建立的Landing Zone儲存桶;例如,
dataeng-landing-zone-<initials>。 - 預設情況下,Kinesis Data Firehose會將資料寫入S3,並使用字首來按YYYY/MM/dd/HH分割傳入的資料。對於我們的資料集,我們希望將串流資料載入到一個串流字首中,並且只希望按資料被擷取的年份和月份進行分割。因此,我們必須將S3儲存桶字首設定為
streaming/!{timestamp:yyyy/MM/}。有關自定義字首的更多資訊,請參閱https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html。 - 如果我們為傳入的資料設定自定義字首,我們還必須設定自定義錯誤字首。將S3儲存桶錯誤輸出字首設定為
!{firehose:error-output-type}/!{timestamp:yyyy/MM/}。
緩衝區設定
- 展開緩衝區提示、壓縮和加密部分。
- S3緩衝區條件允許我們控制在將資料寫入目標之前,Kinesis緩衝傳入資料的引數。我們指定緩衝區大小(以MB為單位)和緩衝區間隔(以秒為單位),並且以先到達者為準,將觸發Kinesis寫入目標。如果我們使用最大緩衝區大小128 MB和最大緩衝區間隔900秒(15分鐘),我們將看到以下行為。如果我們每秒接收1 MB的資料,Kinesis Data Firehose將在約128秒後觸發(當128 MB的資料被緩衝時)。另一方面,如果我們每秒接收0.1 MB的資料,Kinesis Data Firehose將在900秒的最大緩衝區間隔後觸發。對於我們的使用案例,我們將把緩衝區大小設定為1 MB,將緩衝區間隔設定為60秒。
建立Kinesis Data Firehose串流並組態KDG
- 對於所有其他設定,保持預設設定不變,然後點選建立傳遞串流。
- 現在我們的Kinesis Data Firehose串流已經準備好接收資料。因此,在下一節中,我們將使用KDG工具生成一些資料以傳送到串流。
組態Amazon Kinesis Data Generator(KDG)
Amazon KDG是一個來自AWS的開源工具,可以用來生成自定義的資料串流,並可以將該資料傳送到Kinesis Data Streams或Kinesis Data Firehose。
KDG的使用步驟
- 在瀏覽器中開啟KDG幫助頁面,網址為https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html。
- 閱讀有關CloudFormation範本如何運作以在您的帳戶中建立Cognito憑據的資訊。當您準備好後,點選使用CloudFormation建立Cognito使用者按鈕。
- AWS管理主控台將開啟到CloudFormation建立堆積疊頁面。開啟連結時,區域可能會預設為俄勒岡州(us-west-2-),因此如果必要,請更改區域到您正在使用的區域,然後接受CloudFormation預設值並點選下一步。
- 在指定堆積疊詳細資訊頁面上,為您的Cognito使用者提供使用者名稱和密碼,然後點選下一步。
設定KDG
- 對於組態堆積疊選項,保持所有預設設定不變,然後點選下一步。
- 檢視要建立的堆積疊的詳細資訊,然後點選確認方塊以確認AWS CloudFormation可能會建立IAM資源。點選建立堆積疊。
使用KDG生成串流資料
// 使用KDG生成模擬串流資料
// 包含以下欄位:
// - 串流時間戳
// - 客戶是否租借、購買或觀看了預告片
// - 與Sakila影片資料函式庫相匹配的film_id
// - 發行合作夥伴名稱
// - 串流平台
// - 影片被串流播放的州份
程式碼解密:
此JavaScript程式碼片段展示瞭如何使用KDG生成模擬串流資料。它包含了多個欄位,例如串流時間戳、客戶行為、影片ID、發行合作夥伴名稱、串流平台和州份等,用於模擬真實世界的串流資料。
- 包含多個欄位: 程式碼定義了多個欄位,例如
streaming_timestamp、customer_action、film_id等,這些欄位用於生成模擬串流資料。 - 模擬真實資料: 透過使用特定的函式或範本引擎,KDG可以生成類別似真實世界的資料,從而幫助測試和開發根據串流資料的應用程式或系統。
圖表說明:使用Plantuml表示KDG資料流程圖
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 圖表說明:使用Plantuml表示KDG資料流程圖
rectangle "生成模擬資料" as node1
rectangle "寫入資料" as node2
rectangle "儲存資料" as node3
node1 --> node2
node2 --> node3
@enduml此圖示展示了使用KDG生成模擬串流資料,並透過Kinesis Data Firehose將資料寫入Amazon S3,最終儲存在資料湖中的流程。
圖表內容解密:
- KDG生成模擬資料: KDG根據設定的範本或規則生成模擬的串流資料。
- Kinesis Data Firehose寫入資料: KDG生成的資料被傳送到Kinesis Data Firehose,由其負責將資料寫入指定的目標,在本例中是Amazon S3。
- Amazon S3儲存資料: 資料被寫入S3儲存桶,並根據設定的字首規則進行組織,最終成為資料湖的一部分,用於進一步的分析和處理。
資料轉換以最佳化分析效能
在前面的章節中,我們討論瞭如何架構資料管線以及將資料匯入資料湖的常見方法。現在,我們將著重於轉換原始資料的過程,以最佳化資料用於分析,並為組織創造價值。
資料轉換的重要性
資料轉換是資料工程師的核心任務之一,目的是最佳化資料以供分析使用,並為組織創造價值。資料轉換有多種型別,有些是常見的,可以通用於各類別資料集,例如將原始檔案轉換為Parquet格式並對資料集進行分割槽。其他轉換則需要根據業務邏輯進行,並根據資料內容和特定業務需求進行變化。
常見的資料轉換型別
格式轉換
將原始資料從一種格式轉換為另一種更適合分析的格式,例如將CSV或JSON檔案轉換為Parquet或ORC格式,可以提高查詢效能和降低儲存成本。
-- 使用Athena將CSV檔案轉換為Parquet格式
CREATE TABLE my_table_parquet
WITH (
format = 'PARQUET',
parquet_compression = 'SNAPPY'
) AS
SELECT *
FROM my_table_csv;
內容解密:
此查詢使用Amazon Athena將名為my_table_csv的CSV格式表格轉換為Parquet格式,並儲存在my_table_parquet中。WITH子句指定輸出格式為Parquet,並使用SNAPPY壓縮演算法。這樣的轉換可以提高查詢效能並降低儲存成本。
資料分割槽
根據特定的欄位對資料進行分割槽,可以提高查詢效能,特別是在處理大規模資料集時。例如,根據日期或地區對資料進行分割槽。
-- 使用Athena對資料進行分割槽
CREATE TABLE my_partitioned_table (
id INT,
name STRING,
date DATE
)
PARTITIONED BY (region STRING);
內容解密:
此範例展示如何使用Athena建立一個根據region欄位進行分割槽的表格my_partitioned_table。分割槽可以顯著提高查詢特定地區資料時的效能,因為查詢可以只掃描相關的分割槽,而不必掃描整個資料集。
業務邏輯相關的轉換
除了常見的格式轉換和分割槽外,資料轉換還可能涉及根據業務邏輯進行的複雜處理,例如資料清理、資料標準化、以及根據特定業務規則進行的資料轉換。
資料轉換:最佳化分析的關鍵步驟
在資料工程領域中,資料轉換是解鎖資料價值的關鍵步驟。本章節將探討 AWS 中可用於執行資料轉換的引擎,並介紹常見的資料轉換技術。
資料轉換的重要性
正如本文前幾章節所討論的,資料是組織最有價值的資產之一。然而,原始且孤立的資料本身具有有限的價值。當我們將不同的原始資料集合併並透過分析管道進行轉換時,才能真正釋放組織資料的價值。
烹飪、烘焙與資料轉換的類別比
想像以下這些食材:
- 糖
- 奶油
- 雞蛋
- 牛奶
這些是常見的食材,有些可以直接食用(如雞蛋和牛奶),而其他的則通常與其他東西一起食用(如在咖啡或茶中新增糖,或在麵包上塗抹奶油)。
如果我們將這些食材與其他成分(如麵粉和泡打粉)結合,並以正確的方式混合,就可以烘焙出美味的蛋糕,這與原始食材完全不同。同樣地,我們的個別資料集對其來源的組織部分具有價值,但如果我們以正確的方式將這些資料集結合起來,就可以創造出全新的東西。
資料轉換作為管道的一部分
在第 5 章「設計資料工程管道」中,我們開發了資料管道的高層設計。首先,我們與業務使用者合作,瞭解他們的需求(決定是要製作蛋糕還是早餐)。然後,我們收集了三個廣泛領域的初始資訊:
- 資料消費者:誰將消費我們建立的資料,以及他們將使用哪些工具來收集資料(我們的客人)?
- 資料來源:我們有哪些可用的資料來源可以用來建立新的資料集(我們的原始食材)?
- 資料轉換:我們在高層次上檢視了管道中可能需要的轉換型別,以便準備和連線我們的資料集(製作蛋糕或煎蛋的配方)。
現在,我們需要為管道轉換開發低層設計,包括確定我們需要執行的轉換型別,以及將使用哪些資料轉換工具。在下一節中,我們將首先檢視可用的資料轉換引擎型別。
資料轉換工具的型別
正如第 3 章「AWS 資料工程師工具包」中所介紹的,AWS 提供了多種可用於資料轉換的服務。我們將在下面更廣泛地檢視不同型別的資料轉換引擎。
Apache Spark
Apache Spark 是一種記憶體引擎,用於處理大型資料集,提供了一種將資料集分割到叢集中多個節點以實作高效處理的機制。Spark 是一種極為流行的引擎,用於處理和轉換大型資料集,並且有多種方法可以在 AWS 中執行 Spark 任務。
常見的資料轉換技術
本章節將介紹常見的資料準備轉換、業務使用案例轉換,以及如何處理變更資料捕捉(CDC)資料。此外,還將透過 AWS Glue Studio 和 Apache Spark 進行實作練習,以建立資料轉換管道。
技術需求
要完成本章節中的實作任務,您需要能夠存取 AWS Glue 服務,包括 AWS Glue Studio。此外,您還需要能夠建立新的 S3 儲存桶和新的 IAM 策略。
程式碼範例:
from pyspark.sql import SparkSession
# 建立 SparkSession
spark = SparkSession.builder.appName("Data Transformation Example").getOrCreate()
# 載入範例資料
data = spark.read.csv("s3://example-bucket/data.csv", header=True, inferSchema=True)
# 進行資料轉換
transformed_data = data.filter(data["age"] > 30)
# 將結果寫入新的 CSV 檔案
transformed_data.write.csv("s3://example-bucket/transformed_data.csv", header=True)
# 結束 SparkSession
spark.stop()
內容解密:
- 建立 SparkSession:首先,我們需要建立一個 SparkSession,這是與 Spark 進行互動的主要入口點。我們使用
SparkSession.builder.appName("Data Transformation Example").getOrCreate()來建立一個名為 “Data Transformation Example” 的 SparkSession。 - 載入範例資料:接下來,我們使用
spark.read.csv()方法從 S3 儲存桶中載入範例 CSV 檔案。我們指定header=True以指示第一行包含欄位名稱,並使用inferSchema=True自動推斷欄位的資料型別。 - 進行資料轉換:在這個範例中,我們對載入的資料進行了一個簡單的篩選操作,使用
data.filter(data["age"] > 30)只保留年齡大於 30 的記錄。 - 將結果寫入新的 CSV 檔案:最後,我們使用
transformed_data.write.csv()方法將轉換後的結果寫入新的 CSV 檔案中,同樣儲存在 S3 儲存桶中。 - 結束 SparkSession:完成所有操作後,我們呼叫
spark.stop()方法來結束 SparkSession,釋放資源。
這個範例展示瞭如何使用 Apache Spark 和 PySpark 在 AWS 環境中進行基本的資料轉換任務。您可以根據實際需求修改和擴充套件這個範例,以滿足特定的業務需求。
資料轉換工具的多樣性與應用
在當今的大資料時代,資料轉換工具扮演著至關重要的角色。這些工具能夠幫助資料工程師和科學家有效地處理和分析海量資料。以下將介紹幾種常見的資料轉換工具及其在AWS上的應用。
Apache Spark:高效能的資料處理引擎
Apache Spark是一種高效能的開源資料處理引擎,能夠處理批次和串流資料。它提供了多種API,包括Spark SQL、Spark ML和GraphX,分別用於SQL查詢、機器學習和圖形計算。Spark的最大優勢在於其能夠在記憶體中進行資料處理,從而大大提高了處理速度。
在AWS上,可以使用多種服務來執行Spark任務。例如,AWS Glue提供了一種無伺服器的執行Spark的方式,而Amazon EMR則提供了一個託管服務,用於佈署和管理Spark叢集。此外,還可以使用AWS容器服務(ECS或EKS)來執行Spark引擎,或使用AWS合作夥伴提供的託管服務,如Databricks。
Hadoop和MapReduce:傳統的大資料處理框架
Apache Hadoop是一個由多個開源軟體包組成的框架,用於處理大規模資料集。它可以從單一伺服器擴充套件到數千個節點。在Spark出現之前,Hadoop框架中的工具(如Hive和MapReduce)是最受歡迎的大資料處理方式。
Apache Hive提供了一個類別似SQL的介面,用於處理大規模資料集,而MapReduce則提供了一種根據程式碼的處理大規模資料集的方法。與Spark相比,Hadoop MapReduce的主要差異在於它大量使用傳統的磁碟讀寫操作,這使得它在某些情況下可能比Spark更適合處理大規模資料集。
在AWS上,可以使用Amazon EMR來執行多種Hadoop工具,包括Hive、HBase、Yarn、Tez和Pig等。
SQL:廣泛使用的資料轉換方法
結構化查詢語言(SQL)是另一種常見的資料轉換方法。SQL的優勢在於其知識和經驗廣泛可用,使得它成為許多組織進行資料轉換的首選。然而,使用根據程式碼的方法(如Apache Spark)可以提供更強大和靈活的資料轉換能力。
內容解密:
在選擇資料轉換引擎時,資料工程師需要考慮組織內部的技能組合、工具集以及最終的資料目標。如果組織內部具備豐富的SQL技能,那麼使用SQL進行資料轉換可能是合理的選擇。然而,如果組織需要處理複雜的資料,並且對延遲和吞吐量有很高的要求,那麼投資於現代資料處理技術(如Spark)可能是更好的選擇。
資料轉換的未來趨勢
隨著大資料技術的不斷發展,資料轉換工具也在不斷演進。現代資料處理引擎如Apache Spark正在成為主流,而傳統的Hadoop系統仍然在許多組織中被廣泛使用。此外,SQL仍然是一種重要的資料轉換方法,尤其是在具備豐富SQL技能的組織中。
內容解密:
在選擇資料轉換工具時,需要考慮多種因素,包括組織的技能組合、資料的複雜度和對延遲和吞吐量的要求。透過選擇合適的工具,可以有效地提高資料處理效率並降低成本。
資料倉儲與資料湖的整合
在某些情況下,資料轉換的目標可能是資料倉儲系統,如Amazon Redshift或Snowflake。這時,可以採用Extract、Load、Transform(ELT)的方法,先將原始資料載入資料倉儲中,然後使用SQL進行資料轉換。或者,可以使用Apache Spark等工具進行資料轉換,然後將轉換後的資料載入資料倉儲或資料湖中。
-- 使用SQL進行資料轉換的範例
SELECT
column1,
column2,
SUM(column3) AS total_column3
FROM
table_name
GROUP BY
column1, column2;
內容解密:
上述SQL查詢用於對table_name中的資料進行分組並計算column3的總和。其中,column1和column2用於分組,而SUM(column3)則計算每個分組中column3的總和。
AWS Glue Studio:視覺化的ETL工具
AWS Glue Studio提供了一個視覺化的介面,用於設計ETL任務,包括使用SQL陳述式進行複雜的資料轉換。這使得沒有Spark程式設計技能的使用者也能夠使用Apache Spark引擎執行根據SQL的轉換任務。
# 使用PySpark進行資料轉換的範例
from pyspark.sql import SparkSession
# 建立SparkSession
spark = SparkSession.builder.appName("Data Transformation").getOrCreate()
# 讀取資料
df = spark.read.csv("s3://bucket-name/data.csv", header=True, inferSchema=True)
# 進行資料轉換
df_transformed = df.filter(df["column_name"] > 0).groupBy("column_name").count()
# 寫入資料
df_transformed.write.parquet("s3://bucket-name/output.parquet")
內容解密:
上述PySpark程式碼用於讀取儲存在S3上的CSV檔案,進行資料轉換後寫入Parquet格式的檔案。其中,filter方法用於篩選資料,而groupBy和count方法則用於對篩選後的資料進行分組並計算每個分組的數量。最終,轉換後的資料被寫入S3上的Parquet檔案中。