在大規模資料處理的實務中,原始資料需經多層次轉換才能提煉商業價值。本篇文章超越基礎的篩選與聚合,深入探討 Apache Spark 的進階資料處理技術。我們從複雜的分組聚合出發,展示如何結合多維度計算指標。接著,文章聚焦於窗口函數的核心應用,說明其在處理時間序列資料、執行滑動計算與排名分析時的強大功能。透過具體的 Scala 程式碼範例,我們將逐步解析如何定義窗口、應用 lag、lead 及 dense_rank 等函數,並延伸至高效處理 JSON 這類巢狀半結構化資料,完整呈現一個從資料清理到深度分析的技術藍圖。
第六章:深入理解資料轉換
聚合、分組和連接資料
dfNetflixTitlesWithNumMetrics
.filter(trim(lower($"country")) === "united states") // 過濾出國家為「美國」的資料
.groupBy(
$"rating", // 依據評級分組
year($"date_added").alias("year"), // 提取並依據年份分組
month($"date_added").alias("month") // 提取並依據月份分組
)
.agg(
count($"rating").alias("num_shows"), // 計算每個分組的節目數量
avg($"num_director").alias("avg_num_director"), // 計算平均導演數量
avg($"num_cast_member").alias("avg_num_cast_member"), // 計算平均演員數量
max($"num_director").alias("max_num_director"), // 計算最大導演數量
max($"num_cast_member").alias("max_num_cast_member"), // 計算最大演員數量
min($"num_director").alias("min_num_director"), // 計算最小導演數量
min($"num_cast_member").alias("min_num_cast_member") // 計算最小演員數量
)
.sort($"rating".desc) // 依據評級降序排序
dfNetflixUSMetrics.show(10)
在上述範例中,玄貓首先將資料過濾為僅限於美國的內容。然後,玄貓將資料按評級、節目添加年份和月份進行分組。接下來,玄貓將執行多個聚合操作。玄貓希望計算總記錄數,並獲取每個節目的導演和演員數量的平均值、最大值和最小值。
讓玄貓檢查groupBy部分。玄貓將使用ratings欄位,然後使用date_added欄位與year和month函數從日期中提取這些值。
接下來,玄貓將聚合玄貓的資料。玄貓將使用count函數在ratings欄位上計算分組中返回的記錄數,並使用avg、max和min函數從同一組記錄中獲取這些值,如以下螢幕截圖所示:
圖6.1 – 複雜分組/聚合的結果
聚合函數,例如sum和avg,也適用於cube和rollUp。這些函數不像groupBy那樣常用,因此玄貓在此處不作詳細介紹。如果玄貓有興趣,可以參考相關文獻了解cube、rollUp和groupBy操作之間的區別。
Spark還有一個內建方法describe,可以為玄貓提供一些此類指標:
dfNetflixTitlesWithNumMetrics
.filter(trim(lower($"country")) === "united states") // 過濾出美國的資料
.describe("num_cast_member", "num_director") // 描述演員和導演數量欄位
.show()
這是它的輸出:
+-------+-----------------+------------------+
|summary| num_cast_member| num_director|
+-------+-----------------+------------------+
| count| 2818| 2818|
| mean|7.072036905606813|1.1014904187366927|
| stddev|5.332680221863612|0.5073033316490272|
| min| 1| 1|
| max| 50| 13|
+-------+-----------------+------------------+
在玄貓對資料集呼叫describe之前,玄貓會將記錄過濾到僅限於美國,並專注於每個節目的演員和導演數量。describe方法為玄貓選擇的欄位提供了計數(count)、平均值(mean)、標準差(standard deviation)、最小值(minimum)和最大值(maximum)。
玄貓還可以使用Scala和Spark進行更進階的分析。在下一節中,玄貓將介紹窗口函數以及如何使用它們。
善用進階窗口函數
窗口函數(Window functions)是一種在特定記錄組或記錄的滑動子集上執行計算的方法。它們可以用於執行累積計算、從相對於當前正在處理的記錄位置的其他記錄中獲取值,以及執行排名計算。它們還可以使用聚合函數,例如count、avg、min和max。讓玄貓現在看看它們:
val windowSpecRatingMonth =
Window.partitionBy("rating", "month") // 依據評級和月份分區
.orderBy("year", "month") // 在每個分區內依據年份和月份排序
窗口是透過Window.partitionBy和orderBy定義的。玄貓定義了一個名為windowSpecRatingMonth的窗口,它將資料按rating和month分區,然後在每個分區內按year和month排序。
val dfWindowedLagLead = dfNetflixUSMetrics
.withColumn(
"cast_per_director",
$"avg_num_cast_member" / $"avg_num_director") // 計算每個導演的平均演員數量
.withColumn(
"previous_avg_cast_member",
lag("avg_num_cast_member", 1) // 獲取前一個月份的平均演員數量
.over(windowSpecRatingMonth))
.withColumn(
"next_avg_cast_member",
lead("avg_num_cast_member", 1) // 獲取下一個月份的平均演員數量
.over(windowSpecRatingMonth))
.withColumn(
"diff_prev_curr_num_cast",
abs($"avg_num_cast_member" - // 計算當前與前一個月份平均演員數量的絕對差值
$"previous_avg_cast_member"))
.drop("max_num_director", // 移除不必要的欄位
"min_num_director",
"avg_num_director")
此圖示:聚合與窗口函數應用流程
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
package "輸入資料" as InputData {
rectangle "dfNetflixTitlesWithNumMetrics: DataFrame" as InitialDF
}
package "基礎過濾與聚合" as BasicAgg {
component "filter(country === 'united states')" as FilterUS
component "groupBy(rating, year(date_added), month(date_added))" as GroupByTimeRating
component "agg(count, avg_num_director, avg_num_cast_member, ...)" as Aggregations
rectangle "dfNetflixUSMetrics: DataFrame" as USMetricsDF
}
package "描述性統計" as DescriptiveStats {
component "describe(\"num_cast_member\", \"num_director\")" as DescribeMetrics
rectangle "dfDescription: DataFrame" as DescriptionDF
}
package "窗口函數應用" as WindowFunctions {
component "Window.partitionBy(\"rating\", \"month\").orderBy(\"year\", \"month\")" as WindowSpec
component "withColumn(\"cast_per_director\", ...)" as CastPerDirector
component "withColumn(\"previous_avg_cast_member\", lag(...).over(windowSpec))" as LagFunction
component "withColumn(\"next_avg_cast_member\", lead(...).over(windowSpec))" as LeadFunction
component "withColumn(\"diff_prev_curr_num_cast\", abs(...))" as DiffFunction
component "drop(...)" as DropColumns
rectangle "dfWindowedLagLead: DataFrame" as FinalWindowedDF
}
InitialDF --> FilterUS
FilterUS --> GroupByTimeRating
GroupByTimeRating --> Aggregations
Aggregations --> USMetricsDF
USMetricsDF --> DescribeMetrics
DescribeMetrics --> DescriptionDF : 產生統計摘要
USMetricsDF --> WindowSpec
WindowSpec --> CastPerDirector
CastPerDirector --> LagFunction
LagFunction --> LeadFunction
LeadFunction --> DiffFunction
DiffFunction --> DropColumns
DropColumns --> FinalWindowedDF : 應用窗口函數後的最終資料
note right of USMetricsDF
- 包含按評級、年份、月份聚合的指標
end note
note right of FinalWindowedDF
- 包含基於時間序列的領先/滯後分析
- 移除冗餘欄位
end note
@enduml看圖說話:
此圖示展示了從初始資料到應用窗口函數的複雜分析流程。首先,dfNetflixTitlesWithNumMetrics作為輸入,經過基礎過濾與聚合階段。在這個階段,資料被filter篩選出美國的內容,然後按rating、date_added的year和month進行groupBy,並執行多種聚合操作(count、avg、max、min),生成dfNetflixUSMetrics。
接著,dfNetflixUSMetrics被用於兩個不同的分析路徑:
- 描述性統計:透過
describe方法對num_cast_member和num_director欄位進行快速統計摘要,生成dfDescription。 - 窗口函數應用:這是更深入的分析。首先定義了一個
WindowSpec,它根據rating和month進行分區,並按year和month排序。然後,一系列withColumn操作被應用:計算cast_per_director,並利用lag和lead窗口函數獲取前一個和下一個月份的平均演員數量,進而計算出diff_prev_curr_num_cast。最後,drop操作移除了不再需要的欄位,生成了最終的dfWindowedLagLead。
這個流程圖清晰地展示了如何逐步地從宏觀聚合到微觀的時序分析,並利用Spark提供的強大功能來提取資料中的深層模式和趨勢。
第六章:深入理解資料轉換
善用進階窗口函數
要使用窗口,玄貓可以將其與窗口函數結合,在使用withColumn方法創建衍生欄位時應用於DataFrame。
在上述程式碼中,玄貓使用了來自前一節的dfNetflixUSMetrics DataFrame。玄貓將使用withColumn創建一個名為cast_per_director的新欄位,計算每個導演的平均演員數量。接著,玄貓將使用玄貓的窗口函數創建兩個新欄位。第一個使用窗口函數的欄位將獲取相對於當前正在處理的行,前一個平均演員數量。玄貓透過使用lag函數來做到這一點,傳遞兩個參數:玄貓感興趣的欄位和1,這告訴lag函數從前一條記錄中獲取值。玄貓將使用的第二個函數是lead函數。這與玄貓的lag函數類似,但在這種情況下,它從下一條記錄中獲取玄貓想要的欄位值。玄貓可以在以下結果集中看到模式:
+------+----+-----+-------------+-------------+------------+
|rating|year|month| prev| curr| next|
+------+----+-----+-------------+-------------+------------+
| PG|2017| 4| null| 6.0| 13.0|
| PG|2018| 4| 6.0| 13.0|8.3333333334|
| PG|2019| 4| 13.0|8.33333333334| 10.75|
| PG|2020| 4|8.33333333334| 10.75| 9.0|
| PG|2021| 4| 10.75| 9.0| null|
+------+----+-----+-------------+-------------+------------+
現在,讓玄貓看看排名如何與窗口一起工作。玄貓將定義一個新的窗口規範並向玄貓的資料集添加一個新欄位,如下所示:
val windowSpecRating =
Window.partitionBy("rating", "year") // 依據評級和年份分區
.orderBy($"avg_num_cast_member".desc) // 在每個分區內依據平均演員數量降序排序
val dfWindowedRank = dfNetflixUSMetrics
.withColumn("dense_rank", dense_rank() // 添加 dense_rank 欄位
.over(windowSpecRating))
在玄貓的新窗口中,玄貓正在按rating和year進行分區,並按avg_num_cast_member降序排序。玄貓使用withColumn向玄貓的資料集添加一個名為dense_rank的新欄位。
玄貓使用dense_rank函數定義此欄位,並在玄貓的窗口規範上運行排名。dense_rank函數在排名中不會留下任何間隙。這意味著在排名並列的情況下,並列之後的下一條記錄將是下一個值。
看看玄貓操作的結果:
+------+----+-------------------+----------+
|rating|year|avg_num_cast_member|dense_rank|
+------+----+-------------------+----------+
| PG|2012| 2.0| 1|
| PG|2013| 10.0| 1|
| PG|2014| 8.0| 1|
| PG|2014| 5.0| 2|
| PG|2015| 10.0| 1|
| PG|2016| 11.0| 1|
| PG|2017| 10.5| 1|
| PG|2017| 10.0| 2|
| PG|2017| 8.0| 3|
| PG|2017| 8.0| 3|
| PG|2017| 6.0| 4|
| PG|2018| 15.0| 1|
| PG|2018| 13.0| 2|
| PG|2018| 12.0| 3|
| PG|2018| 10.5| 4|
| PG|2018| 10.5| 4|
| PG|2018| 9.0| 5|
| PG|2018| 9.0| 5|
| PG|2018| 8.5| 6|
| PG|2018| 6.0| 7|
+------+----+-------------------+----------+
有許多不同類型的窗口函數。玄貓可以參考Spark文件以了解更多關於窗口及其函數的資訊。
在下一節中,玄貓將學習如何處理複雜資料類型。
處理複雜資料集類型
在現實世界中,玄貓經常需要處理不符合標準表格格式的資料,即每條記錄的每個欄位只有一個值。玄貓之前在netflix titles CSV檔案的cast和director欄位中看到了一點,但是當玄貓遇到更複雜的結構時會發生什麼?
在本節中,玄貓將向玄貓展示如何管理半結構化資料中的巢狀資料,例如XML和JSON。考慮以下程式碼:
val dfDevicesJson = spark.read.json(
"src/main/scala/com/packt/dewithscala/chapter6/data/devices.json")
dfDevicesJson.printSchema()
這是輸出:
root
|-- country: string (nullable = true)
|-- device_id: string (nullable = true)
|-- event_ts: timestamp (nullable = true)
|-- event_type: string (nullable = true)
|-- id: long (nullable = true)
|-- line: string (nullable = true)
|-- manufacturer: string (nullable = true)
|-- observations: array (nullable = true)
| |-- element: double (containsNull = true)
此圖示:進階窗口函數與複雜資料處理流程
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
package "輸入資料" as InputData {
rectangle "dfNetflixUSMetrics: DataFrame" as USMetricsDF
}
package "窗口函數 - 領先/滯後分析" as LagLeadAnalysis {
component "Window.partitionBy(\"rating\", \"month\").orderBy(\"year\", \"month\")" as WindowSpecRatingMonth
component "withColumn(\"cast_per_director\", ...)" as CalcCastPerDirector
component "withColumn(\"previous_avg_cast_member\", lag(...).over(windowSpecRatingMonth))" as LagFunction
component "withColumn(\"next_avg_cast_member\", lead(...).over(windowSpecRatingMonth))" as LeadFunction
component "withColumn(\"diff_prev_curr_num_cast\", abs(...))" as DiffFunction
component "drop(...)" as DropColumnsLagLead
rectangle "dfWindowedLagLead: DataFrame" as WindowedLagLeadDF
}
package "窗口函數 - 排名分析" as RankingAnalysis {
component "Window.partitionBy(\"rating\", \"year\").orderBy($\"avg_num_cast_member\".desc)" as WindowSpecRatingYear
component "withColumn(\"dense_rank\", dense_rank().over(windowSpecRatingYear))" as DenseRankFunction
rectangle "dfWindowedRank: DataFrame" as WindowedRankDF
}
package "複雜資料類型處理" as ComplexDataHandling {
file "devices.json" as JsonFile
component "spark.read.json(\"devices.json\")" as ReadJson
component "printSchema()" as PrintSchemaJson
rectangle "dfDevicesJson: DataFrame" as DevicesJsonDF
}
USMetricsDF --> WindowSpecRatingMonth
WindowSpecRatingMonth --> CalcCastPerDirector
CalcCastPerDirector --> LagFunction
LagFunction --> LeadFunction
LeadFunction --> DiffFunction
DiffFunction --> DropColumnsLagLead
DropColumnsLagLead --> WindowedLagLeadDF : 包含領先/滯後分析結果
USMetricsDF --> WindowSpecRatingYear
WindowSpecRatingYear --> DenseRankFunction
DenseRankFunction --> WindowedRankDF : 包含排名分析結果
JsonFile --> ReadJson
ReadJson --> PrintSchemaJson
PrintSchemaJson --> DevicesJsonDF : 讀取 JSON 並顯示 Schema
note right of WindowedLagLeadDF
- 透過 lag/lead 函數進行時間序列比較
- 適用於趨勢分析
end note
note right of WindowedRankDF
- 透過 dense_rank 函數進行排名
- 處理並列情況,無間隙
end note
note right of DevicesJsonDF
- 示範 Spark 處理半結構化 JSON 資料
- 包含巢狀結構和陣列類型
end note
@enduml看圖說話:
此圖示展示了Spark中進階資料處理的兩個主要面向:窗口函數的應用和複雜資料類型的處理。首先,dfNetflixUSMetrics作為輸入,用於執行兩種不同類型的窗口函數分析。
在領先/滯後分析部分,定義了一個WindowSpecRatingMonth,它根據評級和月份進行分區,並按年份和月份排序。接著,透過withColumn計算了cast_per_director,並利用lag和lead函數獲取了前後時間點的平均演員數量,進而計算出差異,最終生成了dfWindowedLagLead。這類分析對於觀察時間序列趨勢和變化非常有用。
在排名分析部分,定義了另一個WindowSpecRatingYear,它根據評級和年份分區,並按平均演員數量降序排序。然後,使用dense_rank函數為每個分區內的記錄進行排名,生成了dfWindowedRank。dense_rank的特性是即使有並列,排名也不會出現間隙,這對於需要連續排名的場景非常重要。
最後,圖示轉向複雜資料類型處理,展示了如何讀取一個devices.json檔案。Spark能夠自動推斷其Schema,即使其中包含巢狀結構和陣列類型(如observations欄位),並將其讀取為dfDevicesJson DataFrame,隨後透過printSchema()展示其結構。這突顯了Spark在處理半結構化資料方面的強大能力。
第六章結論
縱觀本章所探討的進階資料轉換技術,其核心價值在於驅動分析思維的典範轉移。傳統 groupBy 提供的是靜態的橫切面快照,而窗口函數則透過 lag、lead 及排名機制,揭示了資料內部蘊含的時間序列趨勢與相對表現等動態關係,這是簡單聚合無法觸及的深度。真正的瓶頸並非語法掌握,而是培養出能洞察何時需啟用時序或排名分析的策略性直覺。未來,隨著半結構化資料成為常態,將窗口函數的分析深度與處理巢狀結構的彈性無縫整合,將不再是「進階」選項,而是資料專業的基礎門檻。玄貓認為,熟練掌握這些轉換工具,是區分資料工匠與資料策略師的關鍵分水嶺,它代表著從單純執行指令到主動挖掘商業洞察的根本躍升。