在大規模資料處理的實務中,原始資料需經多層次轉換才能提煉商業價值。本篇文章超越基礎的篩選與聚合,深入探討 Apache Spark 的進階資料處理技術。我們從複雜的分組聚合出發,展示如何結合多維度計算指標。接著,文章聚焦於窗口函數的核心應用,說明其在處理時間序列資料、執行滑動計算與排名分析時的強大功能。透過具體的 Scala 程式碼範例,我們將逐步解析如何定義窗口、應用 lagleaddense_rank 等函數,並延伸至高效處理 JSON 這類巢狀半結構化資料,完整呈現一個從資料清理到深度分析的技術藍圖。

第六章:深入理解資料轉換

聚合、分組和連接資料

dfNetflixTitlesWithNumMetrics
.filter(trim(lower($"country")) === "united states") // 過濾出國家為「美國」的資料
.groupBy(
$"rating", // 依據評級分組
year($"date_added").alias("year"), // 提取並依據年份分組
month($"date_added").alias("month") // 提取並依據月份分組
)
.agg(
count($"rating").alias("num_shows"), // 計算每個分組的節目數量
avg($"num_director").alias("avg_num_director"), // 計算平均導演數量
avg($"num_cast_member").alias("avg_num_cast_member"), // 計算平均演員數量
max($"num_director").alias("max_num_director"), // 計算最大導演數量
max($"num_cast_member").alias("max_num_cast_member"), // 計算最大演員數量
min($"num_director").alias("min_num_director"), // 計算最小導演數量
min($"num_cast_member").alias("min_num_cast_member") // 計算最小演員數量
)
.sort($"rating".desc) // 依據評級降序排序
dfNetflixUSMetrics.show(10)

在上述範例中,玄貓首先將資料過濾為僅限於美國的內容。然後,玄貓將資料按評級節目添加年份月份進行分組。接下來,玄貓將執行多個聚合操作。玄貓希望計算總記錄數,並獲取每個節目的導演和演員數量的平均值、最大值和最小值。

讓玄貓檢查groupBy部分。玄貓將使用ratings欄位,然後使用date_added欄位與yearmonth函數從日期中提取這些值。

接下來,玄貓將聚合玄貓的資料。玄貓將使用count函數在ratings欄位上計算分組中返回的記錄數,並使用avgmaxmin函數從同一組記錄中獲取這些值,如以下螢幕截圖所示:

圖6.1 – 複雜分組/聚合的結果

聚合函數,例如sumavg,也適用於cuberollUp。這些函數不像groupBy那樣常用,因此玄貓在此處不作詳細介紹。如果玄貓有興趣,可以參考相關文獻了解cuberollUpgroupBy操作之間的區別。

Spark還有一個內建方法describe,可以為玄貓提供一些此類指標:

dfNetflixTitlesWithNumMetrics
.filter(trim(lower($"country")) === "united states") // 過濾出美國的資料
.describe("num_cast_member", "num_director") // 描述演員和導演數量欄位
.show()

這是它的輸出:

+-------+-----------------+------------------+
|summary| num_cast_member| num_director|
+-------+-----------------+------------------+
| count| 2818| 2818|
| mean|7.072036905606813|1.1014904187366927|
| stddev|5.332680221863612|0.5073033316490272|
| min| 1| 1|
| max| 50| 13|
+-------+-----------------+------------------+

在玄貓對資料集呼叫describe之前,玄貓會將記錄過濾到僅限於美國,並專注於每個節目的演員和導演數量。describe方法為玄貓選擇的欄位提供了計數(count)平均值(mean)標準差(standard deviation)最小值(minimum)最大值(maximum)

玄貓還可以使用ScalaSpark進行更進階的分析。在下一節中,玄貓將介紹窗口函數以及如何使用它們。

善用進階窗口函數

窗口函數(Window functions)是一種在特定記錄組或記錄的滑動子集上執行計算的方法。它們可以用於執行累積計算、從相對於當前正在處理的記錄位置的其他記錄中獲取值,以及執行排名計算。它們還可以使用聚合函數,例如countavgminmax。讓玄貓現在看看它們:

val windowSpecRatingMonth =
Window.partitionBy("rating", "month") // 依據評級和月份分區
.orderBy("year", "month") // 在每個分區內依據年份和月份排序

窗口是透過Window.partitionByorderBy定義的。玄貓定義了一個名為windowSpecRatingMonth的窗口,它將資料按ratingmonth分區,然後在每個分區內按yearmonth排序。

val dfWindowedLagLead = dfNetflixUSMetrics
.withColumn(
"cast_per_director",
$"avg_num_cast_member" / $"avg_num_director") // 計算每個導演的平均演員數量
.withColumn(
"previous_avg_cast_member",
lag("avg_num_cast_member", 1) // 獲取前一個月份的平均演員數量
.over(windowSpecRatingMonth))
.withColumn(
"next_avg_cast_member",
lead("avg_num_cast_member", 1) // 獲取下一個月份的平均演員數量
.over(windowSpecRatingMonth))
.withColumn(
"diff_prev_curr_num_cast",
abs($"avg_num_cast_member" - // 計算當前與前一個月份平均演員數量的絕對差值
$"previous_avg_cast_member"))
.drop("max_num_director", // 移除不必要的欄位
"min_num_director",
"avg_num_director")

此圖示:聚合與窗口函數應用流程

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

package "輸入資料" as InputData {
rectangle "dfNetflixTitlesWithNumMetrics: DataFrame" as InitialDF
}

package "基礎過濾與聚合" as BasicAgg {
component "filter(country === 'united states')" as FilterUS
component "groupBy(rating, year(date_added), month(date_added))" as GroupByTimeRating
component "agg(count, avg_num_director, avg_num_cast_member, ...)" as Aggregations
rectangle "dfNetflixUSMetrics: DataFrame" as USMetricsDF
}

package "描述性統計" as DescriptiveStats {
component "describe(\"num_cast_member\", \"num_director\")" as DescribeMetrics
rectangle "dfDescription: DataFrame" as DescriptionDF
}

package "窗口函數應用" as WindowFunctions {
component "Window.partitionBy(\"rating\", \"month\").orderBy(\"year\", \"month\")" as WindowSpec
component "withColumn(\"cast_per_director\", ...)" as CastPerDirector
component "withColumn(\"previous_avg_cast_member\", lag(...).over(windowSpec))" as LagFunction
component "withColumn(\"next_avg_cast_member\", lead(...).over(windowSpec))" as LeadFunction
component "withColumn(\"diff_prev_curr_num_cast\", abs(...))" as DiffFunction
component "drop(...)" as DropColumns
rectangle "dfWindowedLagLead: DataFrame" as FinalWindowedDF
}

InitialDF --> FilterUS
FilterUS --> GroupByTimeRating
GroupByTimeRating --> Aggregations
Aggregations --> USMetricsDF

USMetricsDF --> DescribeMetrics
DescribeMetrics --> DescriptionDF : 產生統計摘要

USMetricsDF --> WindowSpec
WindowSpec --> CastPerDirector
CastPerDirector --> LagFunction
LagFunction --> LeadFunction
LeadFunction --> DiffFunction
DiffFunction --> DropColumns
DropColumns --> FinalWindowedDF : 應用窗口函數後的最終資料

note right of USMetricsDF
- 包含按評級、年份、月份聚合的指標
end note

note right of FinalWindowedDF
- 包含基於時間序列的領先/滯後分析
- 移除冗餘欄位
end note

@enduml

看圖說話:

此圖示展示了從初始資料到應用窗口函數的複雜分析流程。首先,dfNetflixTitlesWithNumMetrics作為輸入,經過基礎過濾與聚合階段。在這個階段,資料被filter篩選出美國的內容,然後按ratingdate_addedyearmonth進行groupBy,並執行多種聚合操作(countavgmaxmin),生成dfNetflixUSMetrics

接著,dfNetflixUSMetrics被用於兩個不同的分析路徑:

  1. 描述性統計:透過describe方法對num_cast_membernum_director欄位進行快速統計摘要,生成dfDescription
  2. 窗口函數應用:這是更深入的分析。首先定義了一個WindowSpec,它根據ratingmonth進行分區,並按yearmonth排序。然後,一系列withColumn操作被應用:計算cast_per_director,並利用laglead窗口函數獲取前一個和下一個月份的平均演員數量,進而計算出diff_prev_curr_num_cast。最後,drop操作移除了不再需要的欄位,生成了最終的dfWindowedLagLead

這個流程圖清晰地展示了如何逐步地從宏觀聚合到微觀的時序分析,並利用Spark提供的強大功能來提取資料中的深層模式和趨勢。

第六章:深入理解資料轉換

善用進階窗口函數

要使用窗口,玄貓可以將其與窗口函數結合,在使用withColumn方法創建衍生欄位時應用於DataFrame

在上述程式碼中,玄貓使用了來自前一節的dfNetflixUSMetrics DataFrame。玄貓將使用withColumn創建一個名為cast_per_director的新欄位,計算每個導演的平均演員數量。接著,玄貓將使用玄貓的窗口函數創建兩個新欄位。第一個使用窗口函數的欄位將獲取相對於當前正在處理的行,前一個平均演員數量。玄貓透過使用lag函數來做到這一點,傳遞兩個參數:玄貓感興趣的欄位和1,這告訴lag函數從前一條記錄中獲取值。玄貓將使用的第二個函數是lead函數。這與玄貓的lag函數類似,但在這種情況下,它從下一條記錄中獲取玄貓想要的欄位值。玄貓可以在以下結果集中看到模式:

+------+----+-----+-------------+-------------+------------+
|rating|year|month| prev| curr| next|
+------+----+-----+-------------+-------------+------------+
| PG|2017| 4| null| 6.0| 13.0|
| PG|2018| 4| 6.0| 13.0|8.3333333334|
| PG|2019| 4| 13.0|8.33333333334| 10.75|
| PG|2020| 4|8.33333333334| 10.75| 9.0|
| PG|2021| 4| 10.75| 9.0| null|
+------+----+-----+-------------+-------------+------------+

現在,讓玄貓看看排名如何與窗口一起工作。玄貓將定義一個新的窗口規範並向玄貓的資料集添加一個新欄位,如下所示:

val windowSpecRating =
Window.partitionBy("rating", "year") // 依據評級和年份分區
.orderBy($"avg_num_cast_member".desc) // 在每個分區內依據平均演員數量降序排序

val dfWindowedRank = dfNetflixUSMetrics
.withColumn("dense_rank", dense_rank() // 添加 dense_rank 欄位
.over(windowSpecRating))

在玄貓的新窗口中,玄貓正在按ratingyear進行分區,並按avg_num_cast_member降序排序。玄貓使用withColumn向玄貓的資料集添加一個名為dense_rank的新欄位。

玄貓使用dense_rank函數定義此欄位,並在玄貓的窗口規範上運行排名。dense_rank函數在排名中不會留下任何間隙。這意味著在排名並列的情況下,並列之後的下一條記錄將是下一個值。

看看玄貓操作的結果:

+------+----+-------------------+----------+
|rating|year|avg_num_cast_member|dense_rank|
+------+----+-------------------+----------+
| PG|2012| 2.0| 1|
| PG|2013| 10.0| 1|
| PG|2014| 8.0| 1|
| PG|2014| 5.0| 2|
| PG|2015| 10.0| 1|
| PG|2016| 11.0| 1|
| PG|2017| 10.5| 1|
| PG|2017| 10.0| 2|
| PG|2017| 8.0| 3|
| PG|2017| 8.0| 3|
| PG|2017| 6.0| 4|
| PG|2018| 15.0| 1|
| PG|2018| 13.0| 2|
| PG|2018| 12.0| 3|
| PG|2018| 10.5| 4|
| PG|2018| 10.5| 4|
| PG|2018| 9.0| 5|
| PG|2018| 9.0| 5|
| PG|2018| 8.5| 6|
| PG|2018| 6.0| 7|
+------+----+-------------------+----------+

有許多不同類型的窗口函數。玄貓可以參考Spark文件以了解更多關於窗口及其函數的資訊。

在下一節中,玄貓將學習如何處理複雜資料類型

處理複雜資料集類型

在現實世界中,玄貓經常需要處理不符合標準表格格式的資料,即每條記錄的每個欄位只有一個值。玄貓之前在netflix titles CSV檔案的castdirector欄位中看到了一點,但是當玄貓遇到更複雜的結構時會發生什麼?

在本節中,玄貓將向玄貓展示如何管理半結構化資料中的巢狀資料,例如XMLJSON。考慮以下程式碼:

val dfDevicesJson = spark.read.json(
"src/main/scala/com/packt/dewithscala/chapter6/data/devices.json")
dfDevicesJson.printSchema()

這是輸出:

root
|-- country: string (nullable = true)
|-- device_id: string (nullable = true)
|-- event_ts: timestamp (nullable = true)
|-- event_type: string (nullable = true)
|-- id: long (nullable = true)
|-- line: string (nullable = true)
|-- manufacturer: string (nullable = true)
|-- observations: array (nullable = true)
| |-- element: double (containsNull = true)

此圖示:進階窗口函數與複雜資料處理流程

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

package "輸入資料" as InputData {
rectangle "dfNetflixUSMetrics: DataFrame" as USMetricsDF
}

package "窗口函數 - 領先/滯後分析" as LagLeadAnalysis {
component "Window.partitionBy(\"rating\", \"month\").orderBy(\"year\", \"month\")" as WindowSpecRatingMonth
component "withColumn(\"cast_per_director\", ...)" as CalcCastPerDirector
component "withColumn(\"previous_avg_cast_member\", lag(...).over(windowSpecRatingMonth))" as LagFunction
component "withColumn(\"next_avg_cast_member\", lead(...).over(windowSpecRatingMonth))" as LeadFunction
component "withColumn(\"diff_prev_curr_num_cast\", abs(...))" as DiffFunction
component "drop(...)" as DropColumnsLagLead
rectangle "dfWindowedLagLead: DataFrame" as WindowedLagLeadDF
}

package "窗口函數 - 排名分析" as RankingAnalysis {
component "Window.partitionBy(\"rating\", \"year\").orderBy($\"avg_num_cast_member\".desc)" as WindowSpecRatingYear
component "withColumn(\"dense_rank\", dense_rank().over(windowSpecRatingYear))" as DenseRankFunction
rectangle "dfWindowedRank: DataFrame" as WindowedRankDF
}

package "複雜資料類型處理" as ComplexDataHandling {
file "devices.json" as JsonFile
component "spark.read.json(\"devices.json\")" as ReadJson
component "printSchema()" as PrintSchemaJson
rectangle "dfDevicesJson: DataFrame" as DevicesJsonDF
}

USMetricsDF --> WindowSpecRatingMonth
WindowSpecRatingMonth --> CalcCastPerDirector
CalcCastPerDirector --> LagFunction
LagFunction --> LeadFunction
LeadFunction --> DiffFunction
DiffFunction --> DropColumnsLagLead
DropColumnsLagLead --> WindowedLagLeadDF : 包含領先/滯後分析結果

USMetricsDF --> WindowSpecRatingYear
WindowSpecRatingYear --> DenseRankFunction
DenseRankFunction --> WindowedRankDF : 包含排名分析結果

JsonFile --> ReadJson
ReadJson --> PrintSchemaJson
PrintSchemaJson --> DevicesJsonDF : 讀取 JSON 並顯示 Schema

note right of WindowedLagLeadDF
- 透過 lag/lead 函數進行時間序列比較
- 適用於趨勢分析
end note

note right of WindowedRankDF
- 透過 dense_rank 函數進行排名
- 處理並列情況,無間隙
end note

note right of DevicesJsonDF
- 示範 Spark 處理半結構化 JSON 資料
- 包含巢狀結構和陣列類型
end note

@enduml

看圖說話:

此圖示展示了Spark中進階資料處理的兩個主要面向:窗口函數的應用複雜資料類型的處理。首先,dfNetflixUSMetrics作為輸入,用於執行兩種不同類型的窗口函數分析

領先/滯後分析部分,定義了一個WindowSpecRatingMonth,它根據評級和月份進行分區,並按年份和月份排序。接著,透過withColumn計算了cast_per_director,並利用laglead函數獲取了前後時間點的平均演員數量,進而計算出差異,最終生成了dfWindowedLagLead。這類分析對於觀察時間序列趨勢和變化非常有用。

排名分析部分,定義了另一個WindowSpecRatingYear,它根據評級和年份分區,並按平均演員數量降序排序。然後,使用dense_rank函數為每個分區內的記錄進行排名,生成了dfWindowedRankdense_rank的特性是即使有並列,排名也不會出現間隙,這對於需要連續排名的場景非常重要。

最後,圖示轉向複雜資料類型處理,展示了如何讀取一個devices.json檔案。Spark能夠自動推斷其Schema,即使其中包含巢狀結構陣列類型(如observations欄位),並將其讀取為dfDevicesJson DataFrame,隨後透過printSchema()展示其結構。這突顯了Spark在處理半結構化資料方面的強大能力。

第六章結論

縱觀本章所探討的進階資料轉換技術,其核心價值在於驅動分析思維的典範轉移。傳統 groupBy 提供的是靜態的橫切面快照,而窗口函數則透過 laglead 及排名機制,揭示了資料內部蘊含的時間序列趨勢與相對表現等動態關係,這是簡單聚合無法觸及的深度。真正的瓶頸並非語法掌握,而是培養出能洞察何時需啟用時序或排名分析的策略性直覺。未來,隨著半結構化資料成為常態,將窗口函數的分析深度與處理巢狀結構的彈性無縫整合,將不再是「進階」選項,而是資料專業的基礎門檻。玄貓認為,熟練掌握這些轉換工具,是區分資料工匠與資料策略師的關鍵分水嶺,它代表著從單純執行指令到主動挖掘商業洞察的根本躍升。