在現代資料架構中,將分散的資料處理任務串連成穩定可靠的自動化流程,是資料工程領域的核心挑戰。隨著容器化技術普及,Kubernetes 已成為執行大規模應用程式的標準平台,而 Apache Spark 則是分散式運算的領導框架。然而,僅僅在 Kubernetes 上運行 Spark 作業並不足以構成完整的資料管道,仍需一個強大的編排工具來管理任務之間的複雜依賴關係、執行順序與錯誤處理。Argo Workflows 作為一個專為 Kubernetes 設計的雲原生工作流程引擎,正好填補了此一缺口。它透過宣告式 YAML 定義,讓開發者能以基礎設施即程式碼(IaC)的理念,將 Spark 作業無縫整合為工作流程中的一個步驟。這種模式不僅提升了資料管道的可視性與可管理性,也實現了更具彈性與擴展性的雲原生資料處理方案。

第十章:資料管道編排

使用Argo Workflows

helm install my-release spark-operator/spark-operator \
--namespace spark-operator --create-namespace

資料管道編排

208

  1. 然後,玄貓需要創建Spark所需的服務帳戶
kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role \
--clusterrole=cluster-admin --serviceaccount=spark-app:spark \
--namespace=spark-app
  1. 完成這些步驟後,玄貓可以透過以下方式運行玄貓的Spark應用程式:
kind: SparkApplication
metadata:
name: "spark-pi"
namespace: spark-app
spec:
timeToLiveSeconds: 3600
type: Scala
mode: cluster
image: apache/spark:v3.3.1
imagePullPolicy: IfNotPresent
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local://///opt/spark/examples/jars/spark-examples_2.12-3.3.1.jar"
sparkConf:
spark.kubernetes.authenticate.driver.serviceAccountName: spark
sparkVersion: 3.0.0
driver:
memory: 1G
executor:
instances: 1
cores: 1
memory: 1G
  1. 為了運行Spark應用程式,將上述內容保存到一個名為spark-pi.yaml的檔案中,然後運行以下命令:
kubectl apply -f ./spark-pi.yaml
  1. 玄貓可以使用以下命令檢查Spark應用程式的狀態:
kubectl get sparkapp spark-pi -n spark-app

使用Argo Workflows

209

玄貓會看到以下內容:

NAME STATUS ATTEMPTS START FINISH AGE
spark-pi COMPLETED 1 2023-08-18T12:34:06Z 2023-08-18T12:34:19Z 5m49s
  1. 為了檢查pi的評估值,玄貓可以檢查驅動程式日誌:
kubectl logs spark-pi-driver -n spark-app | grep ^Pi

輸出將類似於:Pi is roughly 3.145675728378642

接下來,玄貓將轉到Argo工作流程。

創建一個Argo工作流程

為了本章的目的,玄貓將創建一個簡單的工作流程,它包含以下步驟:

  1. 工作流程開始時列印一條訊息。
  2. 運行一個Spark作業。
  3. 工作流程結束時列印一條訊息。

根據玄貓目前所涵蓋的內容,基本工作流程結構將如下所示:

kind: Workflow #k8s resource kind
metadata:
generateName: example-workflow- #有助於避免名稱衝突
namespace: spark-app #此工作流程將運行的k8s命名空間
spec:
entrypoint: dag-seq #工作流程將從dag-seq開始
templates:
- name: print-start-message
container:
#模板類型 container。將用於列印訊息
- name: calculate-pi
resource:
#模板類型 resource。將用於啟動一個spark作業
- name: print-termination-message
container:
#模板類型 container。將用於列印訊息
- name: dag-seq
dag: #模板類型 dag
tasks:
- name: start-message #任務名稱
template: print-start-message #要使用的模板名稱
- name: launch-spark-job
depends: start-message #launch-spark-job 在 start-message 之後啟動
template: calculate-pi
- name: termination-message
depends: launch-spark-job
template: print-termination-message

資料管道編排

210

玄貓所需要做的就是為各個模板提供詳細資訊。為了列印訊息,玄貓將使用Docker Hub中可用的busybox映像。以下是完整的工作流程:

kind: Workflow #k8s resource kind
metadata:
generateName: example-workflow- #有助於避免名稱衝突
namespace: spark-app #此工作流程將運行的k8s命名空間
spec:
entrypoint: dag-seq #工作流程將從dag-seq開始
templates:
- name: print-start-message
container:
image: busybox
imagePullPolicy: IfNotPresent
command: [echo]
args: ["Starting Argo Workflow!"]
- name: calculate-pi
resource:
action: create
successCondition: status.applicationState.state == COMPLETED
failureCondition: status.applicationState.state == FAILED
manifest: |
kind: SparkApplication
metadata:
generateName: "spark-pi-"

此圖示:Argo Workflows 整合 Spark 應用流程

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

actor "資料工程師" as DataEngineer

package "Kubernetes 環境" as K8sEnv {
rectangle "minikube" as Minikube
rectangle "kubectl CLI" as Kubectl
rectangle "spark-app 命名空間" as SparkNamespace
rectangle "Spark Operator" as SparkOperator
rectangle "Spark Service Account" as SparkSA
}

package "Argo Workflows" as ArgoWF {
rectangle "Argo Server & Controller" as ArgoServer
rectangle "Argo CLI" as ArgoCLI
rectangle "Argo Workflow 定義 (YAML)" as ArgoWorkflowDef
}

package "Spark 應用程式" as SparkApp {
rectangle "SparkApplication YAML" as SparkAppYAML
rectangle "Spark Pi Job" as SparkPiJob
}

DataEngineer --> Minikube : 啟動 minikube
DataEngineer --> Kubectl : 使用 kubectl
DataEngineer --> SparkNamespace : 創建 spark-app 命名空間
DataEngineer --> SparkOperator : 安裝 Spark Operator
DataEngineer --> SparkSA : 創建 Spark Service Account

DataEngineer --> ArgoServer : 部署 Argo Server & Controller
DataEngineer --> ArgoCLI : 安裝 Argo CLI

DataEngineer --> SparkAppYAML : 編寫 SparkApplication YAML
DataEngineer --> ArgoWorkflowDef : 編寫 Argo Workflow 定義

Kubectl --> SparkNamespace : 管理命名空間
Kubectl --> SparkOperator : 管理 Spark Operator
Kubectl --> SparkSA : 管理 Service Account
Kubectl --> SparkAppYAML : 應用 SparkApplication

ArgoCLI --> ArgoWorkflowDef : 提交 Argo Workflow

ArgoWorkflowDef --> ArgoServer : Argo Server 接收 Workflow
ArgoServer --> SparkNamespace : 在 spark-app 命名空間中執行
ArgoServer --> SparkAppYAML : 透過 Resource Template 創建 SparkApplication
SparkOperator --> SparkPiJob : Spark Operator 監控並運行 Spark Pi Job
SparkPiJob --> SparkNamespace : Spark Pi Job 在 spark-app 命名空間中運行

note right of ArgoWorkflowDef
- 包含 print-start-message (Container)
- 包含 calculate-pi (Resource, 嵌入 SparkApplication)
- 包含 print-termination-message (Container)
- 使用 DAG 模板定義任務依賴
end note

note right of SparkAppYAML
- 定義 Spark Pi 應用程式的詳細配置
- 如 image, mainClass, driver/executor 資源
end note

note right of SparkPiJob
- 運行結果可透過 driver log 查看
- 驗證 Pi 的計算值
end note

@enduml

看圖說話:

此圖示清晰地展示了Argo Workflows如何與Kubernetes環境中的Spark應用程式整合,以實現資料管道的編排。整個流程始於資料工程師Kubernetes環境中進行一系列準備工作,包括啟動minikube、使用kubectl CLI管理集群、創建專用的spark-app命名空間、安裝Spark Operator以及創建Spark Service Account。這些步驟為Spark應用程式在Kubernetes上運行提供了必要的基礎設施和權限。

同時,資料工程師也需要部署Argo Server & Controller並安裝Argo CLI,這是Argo Workflows的核心組件。一旦環境準備就緒,資料工程師便會編寫SparkApplication YAML來定義Spark作業的具體配置,並編寫Argo Workflow定義,其中包含了工作流程的邏輯和任務依賴。

Argo Workflow定義是一個關鍵點,它不僅定義了像「列印開始訊息」和「列印結束訊息」這樣的通用任務(透過Container模板實現),更重要的是,它透過Resource模板嵌入了SparkApplication YAML,使得Spark作業成為Argo工作流程中的一個步驟。這個工作流程使用DAG模板來定義任務之間的順序和依賴關係,確保Spark作業在正確的時機被觸發。

資料工程師透過Argo CLI提交Argo Workflow後,Argo Server會接收並解析這個工作流程。在執行到calculate-pi這個步驟時,Argo Server會根據Resource模板的定義,在spark-app命名空間中創建一個SparkApplication。隨後,Spark Operator會監控這個命名空間,一旦發現新的SparkApplication,便會負責啟動並運行Spark Pi Job。最終,資料工程師可以透過查看Spark驅動程式的日誌來驗證Pi的計算結果。整個過程展示了Argo Workflows如何作為一個強大的編排工具,將複雜的Spark應用程式無縫整合到自動化的資料管道中。

縱觀現代資料工程的複雜性,本章的核心價值不僅在於展示如何運行單一的 Spark 作業,而是揭示了一種更為優雅且具備高度擴展性的雲原生編排範式。Argo Workflows 與 Spark on Kubernetes 的整合,完美體現了「關注點分離 (Separation of Concerns)」此一關鍵的系統設計哲學。

此模式的突破性在於,Argo 作為編排層,無需深入理解 Spark 的內部運作機制;它僅需透過標準的 Kubernetes API,以宣告式(Declarative)的方式管理 SparkApplication 這個自定義資源(CRD)。這種透過 resource 模板直接操作 CRD 的整合方式,遠比傳統需仰賴特定 API 或外掛程式的排程器更為簡潔、通用且強大。它成功將複雜的資料作業抽象化為標準的 Kubernetes 工作負載,從而統一了監控、日誌與生命週期管理,顯著降低了維運的認知負擔與系統耦合度。

展望未來,這種「以編排器驅動 CRD」的模式將成為雲原生資料平台的標準架構。其價值遠不止於 Spark,從資料庫遷移、機器學習模型訓練到基礎設施配置,任何可被定義為 Kubernetes 資源的任務,都能被無縫納入 Argo 的統一管理框架。這預示著一個以 Kubernetes 為通用介面的「資料作業系統」生態正在加速成形。

玄貓認為,精通此種宣告式的編排範式,已不僅是提升效率的技術選項,更是高階資料工程師與架構師建立可觀測、高韌性自動化資料平台的關鍵能力。它代表著從「執行任務」到「設計系統」的思維躍遷,是通往成熟 DataOps 實踐的核心路徑。