只有一個元件的 pipeline 並不是很有趣。在下一個例子中,我將自定義輕量級 Python 函式的容器。我們將建立一個新的 pipeline,它安裝並匯入額外的 Python 函式庫,從指定的基礎映像建構,並在容器之間傳遞輸出。

我們將建立一個 pipeline,它將一個數字除以另一個數字,然後加上第三個數字。首先,讓我們建立一個簡單的加法函式:

def add(a: float, b: float) -> float:
    '''計算兩個引數的總和'''
    return a + b

add_op = comp.func_to_container_op(add)

這個函式非常簡單,接受兩個浮點數引數 ab,並回傳它們的和。我們使用 func_to_container_op 將這個函式轉換為 Kubeflow Pipeline 操作,儲存在 add_op 變數中,以便後續在 pipeline 中使用。

接下來,讓我們建立一個稍微複雜的函式。此外,讓這個函式需要並從非標準 Python 函式庫 numpy 匯入。這必須在函式內部完成,因為筆記本中的全域匯入不會被封裝到我們建立的容器中。當然,確保我們的容器已安裝了我們正在匯入的函式庫也很重要。

為此,我們將把要使用的特定容器作為基礎映像傳遞給 .func_to_container_op()

from typing import NamedTuple

def my_divmod(dividend: float, divisor: float) -> \
    NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float)]):
    '''將兩個數字相除並計算商和餘數'''
    # 在元件函式內部匯入
    import numpy as np
    
    # 這個函式演示如何在元件函式內使用巢狀函式
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)
    
    (quotient, remainder) = divmod_helper(dividend, divisor)
    
    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput', ['quotient', 'remainder'])
    return divmod_output(quotient, remainder)

divmod_op = comp.func_to_container_op(
    my_divmod, base_image='tensorflow/tensorflow:1.14.0-py3')

這個函式更加複雜,它執行除法運算並回傳商和餘數。以下是關鍵點:

  1. 函式內部匯入 numpy 函式庫,確保它在容器執行時可用
  2. 使用巢狀函式 divmod_helper 來執行實際計算
  3. 回傳一個命名元組,包含商和餘數兩個值
  4. 指定基礎映像為 TensorFlow 的 Python 3 容器,這確保容器中已安裝了 numpy 函式庫

現在我們將構建一個 pipeline。這個 pipeline 使用前面定義的函式 my_divmodadd 作為階段:

@dsl.pipeline(
    name='Calculation pipeline',
    description='A toy pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
    a='a',
    b='7',
    c='17',
):
    # 將 pipeline 引數和常數值作為操作引數傳遞
    add_task = add_op(a, 4)  # 回傳 dsl.ContainerOp 類別例項
    
    # 將任務輸出參照作為操作引數傳遞
    # 對於具有單個回傳值的操作,可以使用 `task.output` 
    # 或 `task.outputs['output_name']` 語法存取輸出參照
    divmod_task = divmod_op(add_task.output, b)
    
    # 對於具有多個回傳值的操作,可以使用 `task.outputs['output_name']` 語法存取輸出參照
    result_task = add_op(divmod_task.outputs['quotient'], c)

這個 pipeline 定義了一個計算流程,展示瞭如何在 Kubeflow Pipelines 中組合多個操作:

  1. 首先,add_task 將輸入引數 a 與常數 4 相加
  2. 然後,divmod_task 使用前一步的輸出(add_task.output)與引數 b 進行除法運算
  3. 最後,result_task 將除法運算的商(divmod_task.outputs['quotient'])與引數 c 相加

注意操作之間的依賴關係是透過資料流自動推斷的。當一個操作使用另一個操作的輸出作為輸入時,Kubeflow Pipelines 會自動確保它們按正確的順序執行。

Pipeline 元件間的資料傳遞機制

在構建複雜的機器學習工作流程時,理解元件間如何傳遞資料至關重要。Kubeflow Pipelines 提供了靈活的機制來處理這一點:

  1. 單一回傳值:對於只回傳一個值的操作,可以使用 task.output 直接存取其輸出
  2. 多回傳值:對於回傳多個值的操作,使用 task.outputs['output_name'] 來存取特定輸出

這種設計使得構建複雜的資料處理流程變得直觀。資料從一個元件流向另一個元件的方式清晰可見,同時 Kubeflow 會自動處理底層的資料傳輸細節。

擴充套件 Pipeline 的功能

雖然我們的範例相對簡單,但 Kubeflow Pipelines 的功能遠不止於此。在實際的機器學習專案中,你可以:

  1. 整合複雜的 ML 框架:如 TensorFlow、PyTorch 或 scikit-learn
  2. 處理大規模資料:透過掛載外部儲存或使用資料處理函式庫
  3. 平行化工作流程:讓獨立的任務同時執行,提高效率
  4. 條件執行:根據前一步驟的結果決定下一步操作
  5. 重用元件:將常用操作封裝為可重用的元件

深入理解 Pipeline 的執行模型

Kubeflow Pipelines 的執行建立在 Argo 工作流程引擎之上。當你提交一個 pipeline 時,它會被轉換為 Argo 工作流程,然後在 Kubernetes 叢集上執行。每個步驟都在獨立的容器中執行,這提供了隔離性和可重現性。

這種根據容器的執行模型帶來了幾個重要優勢:

  1. 可重現性:每個步驟都在定義明確的環境中執行
  2. 隔離性:不同步驟之間不會相互幹擾
  3. 可擴充套件性:可以輕鬆擴充套件到大規模計算資源
  4. 可觀察性:每個步驟的執行都可以被監控和記錄

使用 Pipeline 的最佳實踐

在我使用 Kubeflow Pipelines 的經驗中,發現以下幾點最佳實踐特別有用:

  1. 模組化設計:將 pipeline 拆分為獨立、可重用的元件
  2. 引數化:使用引數使 pipeline 可設定,而不是硬編碼值
  3. 錯誤處理:在每個步驟中增加適當的錯誤處理邏輯
  4. 資源管理:為每個步驟指定合適的資源需求(CPU、記憶體、GPU)
  5. 版本控制:對 pipeline 定義進行版本控制,確保可重現性

結合 Notebook 與 Pipeline 開發

Kubeflow 的一個強大特性是能夠無縫結合互動式筆記本開發和自動化 pipeline 執行。這種方法允許資料科學家在筆記本中探索和開發,然後輕鬆將工作轉換為可重複執行的 pipeline。

在開發流程中,我通常遵循這樣的模式:

  1. 在筆記本中探索資料和開發模型
  2. 將成熟的程式碼重構為 pipeline 元件
  3. 組合這些元件成為完整的 pipeline
  4. 測試並迭代 pipeline
  5. 佈署 pipeline 用於生產或定期執行

這種方法結合了筆記本的靈活性和 pipeline 的自動化能力,大提高了機器學習工作流程的效率。

Kubeflow Pipelines 為機器學習工作流程自動化提供了強大而靈活的框架。透過本文介紹的基礎知識和範例,你應該能夠開始構建自己的 pipeline,無論是簡單的資料處理流程還是複雜的模型訓練和佈署工作流程。隨著你對 Kubeflow Pipelines 的深入瞭解,你將發現它能夠處理越來越複雜的機器學習場景,成為你 ML 工程工具箱中不可或缺的一部分。

提交管道並執行:從開發到佈署

完成管道定義後,我們需要使用 Kubeflow 客戶端將其提交至執行環境。這個過程會回傳執行與實驗的連結,讓我們能夠追蹤整個工作流程的進度。

client = kfp.Client()
# 指定管道引數值
# arguments = {'a': '7', 'b': '8'} # 根據新版本調整適當引數
# 提交管道執行
client.create_run_from_pipeline_func(calc_pipeline, arguments=arguments)

這段程式碼實際上是管道執行的關鍵。kfp.Client() 初始化一個連線到 Kubeflow 後端的客戶端,而 create_run_from_pipeline_func 方法則直接從我們定義的函式建立並執行管道。這比起先編譯成 YAML 或 zip 檔再上載的方式更為直接。arguments 字典用於傳遞引數到管道中,相當於管道的入口引數。

當你執行這段程式碼後,Kubeflow 會回傳一個網頁連結,點選後可以看到如圖所示的執行介面。這個介面會視覺化整個管道的執行流程,顯示每個步驟的狀態、結果和日誌,讓你能夠直觀地監控整個工作流程。

輕量級 Python 函式的強大能力

值得注意的是,所謂的「輕量級」Python 函式指的是建立這些步驟的便利性,而非功能的限制。事實上,我們可以:

  1. 使用自定義的 Python 套件
  2. 指定基礎映像檔
  3. 在容器間傳遞小型結果

這種靈活性讓我們能夠建構複雜而強大的機器學習工作流程,同時保持程式碼的簡潔與可維護性。

接下來,我將探討如何透過掛載儲存卷的方式在容器間傳輸更大型的資料檔案。

使用註解簡化管道開發流程

你可能已經注意到,直接呼叫 comp.func_to_container_op 的方式有些冗長。為了簡化這個過程,我們可以建立一個回傳 kfp.dsl.ContainerOp 的函式。雖然在實際開發中,過於龐大的函式並不總是理想選擇,但這裡提供一個簡化寫法的參考。

值得注意的是,增加 @kfp.dsl.component 註解會指示 Kubeflow 編譯器啟用靜態型別檢查:

@kfp.dsl.component
def my_component(my_param):
    ...
    return kfp.dsl.ContainerOp(
        name='My component name',
        image='gcr.io/path/to/container/image'
    )

這段程式碼展示瞭如何使用 @kfp.dsl.component 註解來簡化元件定義。這個裝飾器會自動處理將函式轉換為容器操作的過程,同時啟用靜態型別檢查,提高程式碼的穩定性。函式內部回傳一個 ContainerOp 物件,指定了元件名稱和容器映像檔。這種方式比起每次手動呼叫 func_to_container_op 更為簡潔。

將這些元件整合到管道中的方式如下:

@kfp.dsl.pipeline(
    name='My pipeline',
    description='My machine learning pipeline'
)
def my_pipeline(param_1: PipelineParam, param_2: PipelineParam):
    my_step = my_component(my_param='a')

這段程式碼展示瞭如何使用 @kfp.dsl.pipeline 裝飾器定義一個完整的管道。裝飾器接受管道的名稱和描述作為引數。管道函式本身可以接受多個 PipelineParam 型別的引數,這些引數可以在管道執行時從外部傳入。在管道函式內部,我們可以直接呼叫前面定義的元件函式,傳入所需引數,從而建立管道的執行步驟。這種方式讓管道的定義變得更加直觀和簡潔。

步驟間的資料儲存策略

在前面的例子中,容器之間傳遞的資料量較小,與為基本型別(如數字、字串、列表和陣列)。然而,在實際應用中,我們通常需要傳遞更大型的資料(例如整個資料集)。Kubeflow 提供了兩種主要方法來實作這一點:

  1. Kubernetes 叢集內的永續性儲存區
  2. 雲端儲存選項(如 S3)

但每種方法都有其固有的問題和限制。

永續性儲存區抽象化儲存層

永續性儲存區抽象化了底層儲存。根據供應商不同,永續性儲存區的佈建可能較慢,與有 I/O 限制。你應該確認你的供應商是否支援 ReadWriteMany 儲存類別,這允許多個 Pod 存取儲存,這在某些型別的平行處理中是必要的。

儲存類別可以是以下之一:

  • ReadWriteOnce:卷可以被單個節點掛載為讀寫
  • ReadOnlyMany:卷可以被多個節點掛載為只讀
  • ReadWriteMany:卷可以被多個節點掛載為讀寫

你的系統或叢集管理員可能夠增加 ReadWriteMany 支援。此外,許多雲端提供商包含他們專有的 ReadWriteMany 實作,例如 GKE 上的動態佈建,但請確保沒有單節點瓶頸。

Kubeflow Pipelines 的 VolumeOp 允許你建立一個自動管理的永續性儲存區:

dvop = dsl.VolumeOp(name="create_pvc",
                    resource_name="my-pvc-2",
                    size="5Gi",
                    modes=dsl.VOLUME_MODE_RWO)

這段程式碼建立了一個 5GB 大小的永續性儲存區宣告(PVC)。VolumeOp 是 Kubeflow 提供的一個便捷操作,它會自動處理 Kubernetes 永續性儲存區的建立和生命週期管理。name 引數指定了操作的名稱,resource_name 是建立的 PVC 資源名稱,size 定義了儲存大小,而 modes 指定了存取模式(這裡是 ReadWriteOnce)。

要將此卷增加到你的操作中,只需呼叫 add_pvolumes 並提供一個掛載點到卷的字典:

download_data_op(year).add_pvolumes({"/data_processing": dvop.volume})

物件儲存解決方案

雖然在 Kubeflow 範例中較少見,但在某些情況下,使用物件儲存解決方案可能更為合適。MinIO 提供了雲原生物件儲存,它可以作為現有物件儲存引擎的閘道器或獨立執行。這種方法在需要確保解決方案在多個雲端提供商之間的可移植性時特別有用。

Kubeflow 內建的 file_output 機制會自動在管道步驟之間將指定的本地檔案傳輸到 MinIO。要使用 file_output,只需在容器中本地寫入檔案,並在 ContainerOp 中指定引數:

fetch = kfp.dsl.ContainerOp(name='download',
                           image='busybox',
                           command=['sh', '-c'],
                           arguments=[
                               'sleep 1;'
                               'mkdir -p /tmp/data;'
                               'wget ' + data_url +
                               ' -O /tmp/data/results.csv'
                           ],
                           file_outputs={'downloaded': '/tmp/data'})
# 這裡期望一個輸入目錄,而非單個檔案

這段程式碼定義了一個下載資料的容器操作。它使用 busybox 映像檔執行 shell 命令,建立一個臨時目錄,然後從指定的 URL 下載資料到該目錄中。關鍵在於 file_outputs 引數,它定義了一個鍵值對,其中 ‘downloaded’ 是輸出的名稱,而 ‘/tmp/data’ 是容器中的路徑。Kubeflow 會自動將該路徑中的內容上載到 MinIO,並使其可用於下一個管道步驟。這種機制大簡化了管道步驟間的資料傳遞。

如果你不想使用 MinIO,也可以直接使用你的提供商的物件儲存,但這可能會影響一些可移植性。

在任何機器學習管道中,能夠本地掛載資料都是一個基本任務。這裡我們簡要概述了多種方法,並為每種方法提供了範例。

Kubeflow Pipelines 元件介紹

Kubeflow Pipelines 建立在 Argo Workflows 之上,後者是一個開放原始碼的、容器原生的 Kubernetes 工作流程引擎。在這一節中,我將描述 Argo 的工作原理、功能,以及 Kubeflow Pipeline 如何補充 Argo,使其更容易被資料科學家使用。

Argo:管道的基礎

Kubeflow 安裝時會包含所有 Argo 元件。雖然不需要在你的電腦上安裝 Argo 也能使用 Kubeflow Pipelines,但擁有 Argo 命令列工具可以更容易地理解和除錯你的管道。

預設情況下,Kubeflow 將 Argo 設定為使用 Docker 執行器。如果你的平台不支援 Docker API,你需要切換到相容的執行器。這可以透過更改 Argo 引數檔案中的 containerRuntimeExecutor 值來完成。

在 macOS 上,你可以使用 Homebrew 安裝 Argo:

#!/bin/bash
# 下載二進位檔
curl -sLO https://github.com/argoproj/argo/releases/download/v2.8.1/argo-linux-amd64
# 使二進位檔可執行
chmod +x argo-linux-amd64
# 移動二進位檔到路徑
mv ./argo-linux-amd64 ~/bin/argo

這個 shell 指令碼展示瞭如何手動安裝 Argo CLI 工具。首先使用 curl 下載特定版本的 Argo 二進位檔,然後透過 chmod 命令使其可執行,最後將其移動到使用者的 bin 目錄,使其可以在任何地方執行。雖然這是 Linux 版本的安裝方式,但在 macOS 上更推薦使用 Homebrew 進行安裝。擁有 Argo CLI 工具對於除錯和管理 Kubeflow 管道非常有幫助,尤其是當你需要深入瞭解工作流程執行細節時。

你可以透過在 Kubeflow 名稱空間中執行 Argo 範例來驗證你的 Argo 安裝:

$ argo list -n kubeflow
NAME                STATUS    AGE   DURATION
loops-maps-4mxp5    Succeeded 30m   12s
hello-world-wsxbr   Succeeded 39m   15s

這個命令列出了在 kubeflow 名稱空間中執行的所有 Argo 工作流程。輸出顯示了每個工作流程的名稱、狀態、年齡(已執行時間)以及持續時間。由於 Kubeflow 管道實際上是使用 Argo 工作流程實作的,因此這個命令也可以用來檢視管道執行情況。這為除錯提供了另一個視角,特別是當 Kubeflow UI 無法存取或提供足夠詳細訊息時。

由於管道是使用 Argo 實作的,你也可以使用相同的技術來檢視它們。你還可以取得特定工作流程執行的訊息:

$ argo get hello-world-wsxbr -n kubeflow
Name:                hello-world-wsxbr
Namespace:           kubeflow
ServiceAccount:      default
Status:              Succeeded
Created:             Tue Feb 12 10:05:04 -0600 (2 minutes ago)
Started:             Tue Feb 12 10:05:04 -0600 (2 minutes ago)
Finished:            Tue Feb 12 10:05:23 -0600 (1 minute ago)
Duration:            19 seconds

STEP                 PODNAME             DURATION  MESSAGE
✔ hello-world-wsxbr  hello-world-wsxbr   18s

這個命令取得了特定 Argo 工作流程的詳細訊息。它顯示了工作流程的基本訊息(名稱、名稱空間、服務賬戶)、狀態訊息(狀態、建立/開始/完成時間、總持續時間)以及執行步驟的詳細訊息。這種深入的檢視對於理解工作流程的執行過程和診斷潛在問題非常有價值。hello-world-wsxbr 是之前使用 argo list -n kubeflow 命令取得的工作流程名稱,在你的環境中會有所不同。

我們還可以透過使用以下命令檢視執行日誌:

$ argo logs hello-world-wsxbr -n kubeflow

這個命令顯示指定 Argo 工作流程的執行日誌內容對於除錯問題至關重要,因為它們包含了工作流程中每個步驟的詳細輸出。當管道執行失敗或行為異常時,這些日誌是診斷問題的第一手資料。透過 Argo CLI 檢視日誌比透過 Kubeflow UI 更靈活,特別是當你需要過濾、搜尋或處理大量日誌訊息時。

Kubeflow 與 Argo 的協同:實作機器學習工作流程自動化

Kubeflow Pipelines 之所以能夠提供強大的機器學習工作流程管理能力,很大程度上得益於其底層的 Argo Workflows 引擎。這種協同關係讓資料科學家能夠專注於模型開發,而不必深入瞭解 Kubernetes 的複雜性。

在實際的機器學習專案中,我發現 Kubeflow 的真正價值在於它能夠無縫連線不同階段的工作流程。從資料預處理、特徵工程到模型訓練和佈署,整個過程可以被協調成一個端對端的自動化管道。這不僅提高了效率,還增強了可重複性和可追蹤性。

對於團隊協作,Kubeflow 的元件化設計允許不同專長的成員(如資料工程師和機器學習工程師)獨立開發他們的部分,然後輕鬆地將這些部分組合成一個完整的工作流程。這種模組化方法大提高了團隊的生產力和程式碼的可維護性。

在處理大規模資料時,合理選擇資料傳遞方式至關重要。對於小型資料,直接透過引數傳遞是最簡單的方法;對於中等大小的資料,MinIO 提供了良好的平衡;而對於大型資料集,使用永續性儲存區或雲端原生儲存可能是更好的選擇。在實踐中,我通常會根據具體場景混合使用這些方法,以獲得最佳效能和可靠性。

Kubeflow Pipelines 的另一個強大特性是其可擴充套件性。透過自定義元件,你可以整合幾乎任何工具或服務到你的管道中。這使得它能夠適應各種機器學習工作流程的需求,無論是傳統的批處理任務還是實時推理服務。

隨著機器學習專案的複雜度增加,有效管理資源和依賴關係變得越來越重要。Kubeflow 的資源請求和限制設定,以及條件執行和錯誤處理能力,為構建健壯的生產級管道提供了必要的工具。

總之,掌握 Kubeflow Pipelines 不僅是學習一個工具,更是採納一種方法論,使機器學習工作流程更加結構化、自動化和可管理。透過本文介紹的技術和最佳實踐,你可以建立起高效、可靠的機器學習管道,加速從概念到產品的轉化過程。

掌握Argo執行日誌:Kubeflow Pipelines

Kubeflow Pipeline元件的靈活運用

在機器學習工作流程中,元件的靈活運用是提高開發效率的關鍵。Kubeflow Pipeline提供了多種方式來使用和整合元件,讓我們能夠構建強大而靈活的工作流程。

元件載入策略:確保版本穩定性

在前面的文章中,我們已經直接使用了Kubeflow的基礎構建模組和func_to_container元件。值得注意的是,Kubeflow元件有兩種主要型別:一種可以像普通Python程式碼一樣直接匯入,例如func_to_container;另一種則需要透過Kubeflow的component.yaml系統進行載入。

根據我的實踐經驗,處理Kubeflow元件的最佳方式是下載特定標籤(tag)的程式碼函式庫,這樣可以使用load_component_from_file函式來載入元件:

# 下載特定版本的Kubeflow Pipelines
wget https://github.com/kubeflow/pipelines/archive/0.2.5.tar.gz
# 解壓縮檔案
tar -xvf 0.2.5.tar.gz

這段命令首先下載了Kubeflow Pipelines的0.2.5版本,然後將其解壓縮。使用特定版本而非master分支的好處是確保穩定性,避免因為API變更而導致的問題。這是一種在生產環境中管理依賴的最佳實踐。

避免直接從GitHub載入

雖然Kubeflow提

資料與特徵準備:機器學習模型成功的根本

在機器學習專案中,資料與特徵準備往往是最耗時但也最關鍵的環節。根據玄貓多年開發經驗,一個模型的成敗有時並非取決於演算法的選擇,而是資料的品質和特徵的設計。優質的資料準備工作能顯著提升模型效能,而糟糕的特徵工程則可能導致重要關聯的遺失。

資料準備與特徵工程的重要性

特徵準備(Feature preparation)或稱特徵工程(Feature engineering)指的是將原始輸入資料轉換成機器學習模型可使用的特徵。這個過程若處理不當,可能導致重要關聯的遺失,例如:

  • 線性模型中未展開的非線性項
  • 深度學習模型中方向不一致的影像

資料和特徵準備的微小變化可能導致模型輸出的顯著差異。在實踐中,我發現採用迭代方法最為有效,隨著對問題和模型理解的深入,不斷回顧和最佳化資料與特徵準備過程。

Kubeflow Pipelines 正是為這種迭代過程設計的工具,它讓我們能夠更輕鬆地迭代資料和特徵準備工作。在後續章節中,我們還會探討如何利用超引數調整來進一步最佳化這個過程。

選擇合適的資料處理工具

資料和特徵準備工具種類別繁多,大致可分為分散式和本地兩類別:

  • 本地工具:在單機上執行,提供極大的靈活性
  • 分散式工具:在多台機器上執行,能處理更大、更複雜的任務

選擇錯誤的工具可能導致後期需要大幅修改程式碼。在實際專案中,我常根據以下原則做出選擇:

若輸入資料規模較小,單機工具就能滿足需求。但隨著資料量增長,通常需要使用分散式工具進行處理或抽樣。即使是較小的資料集,分散式系統如 Apache Spark、Dask 或 TFX with Beam 也可能提供更快的處理速度,但可能需要學習新工具。

值得注意的是,資料和特徵準備的各個環節不必使用相同的工具。在處理不同資料集時,混合使用多種工具是很常見的。Kubeflow Pipelines 允許我們將實作拆分為多個步驟,並將它們連線成一個完整的系統,即使這些步驟使用不同的程式語言。

本地資料與特徵準備

使用 Jupyter Notebook 進行資料探索

在本地環境中工作限制了資料規模,但提供了最全面的工具選擇。實作資料和特徵準備的常見方法是使用 Jupyter notebook。

使用 notebook 進行資料準備是開始探索資料的絕佳方式。在這個階段,notebook 特別有用,因為:

  1. 我們對資料的理解通常處於最初階段
  2. 視覺化工具能幫助我們更好地理解資料

取得資料

在郵件列表範例中,我們使用來自網路的公共存檔資料。在理想情況下,你可能想要連線到資料函式庫、串流或其他資料儲存函式庫。然而,即使在生產環境中,取得網路資料有時也是必要的。

以下是一個基本的資料取得演算法,它接受 Apache Software Foundation (ASF) 專案的郵件列表位置以及要取得訊息的年份,並回傳取得記錄的路徑,以便我們可以將其用作下一個管道階段的輸入:

def download_data(year: int) -> str:
    # 行內匯入以便 Kubeflow 能正確序列化函式
    from datetime import datetime
    from lxml import etree
    from requests import get
    from time import sleep
    import json
    
    def scrapeMailArchives(mailingList: str, year: int, month: int):
        # 這裡是 xpath 程式碼,詳見範例函式庫
        # ...
        
    datesToScrape = [(year, i) for i in range(1,2)]
    records = []
    for y,m in datesToScrape:
        print(m,"-",y)
        records += scrapeMailArchives("spark-dev", y, m)
    
    output_path = '/data_processing/data.json'
    with open(output_path, 'w') as f:
        json.dump(records, f)
    
    return output_path

這段程式碼會下載指定年份的郵件列表資料並儲存為 JSON 格式。函式回傳的是儲存資料的檔案路徑,這樣可以在 Pipeline 的下一階段使用。這裡需要注意幾點:

  1. 函式只下載最多一年的資料,並在呼叫之間休眠,以避免對 ASF 郵件存檔伺服器造成過大負擔
  2. 所有匯入都是行內的,這有助於 Kubeflow 正確序列化函式
  3. 資料會儲存到一個已知路徑,需要掛載永續性儲存區來允許資料在不同階段之間移動

對於 GCS 或永續性儲存區上的資料,也可以使用內建元件如 google-cloud/storage/downloadfilesystem/get_subdirectory 來載入資料,而不是編寫自定義函式。

資料清理:過濾雜訊

取得資料後,下一步是進行基本的資料清理。雖然資料清理通常依賴領域專業知識,但有一些標準工具可以協助完成常見任務。第一步可以是透過檢查模式來驗證輸入記錄,確保欄位存在與型別正確。

在郵件列表範例中,我們確保寄件者、主題和內容都存在。為了將其轉換為獨立元件,我們的函式接受輸入路徑引數並回傳清理後記錄的檔案路徑:

def clean_data(input_path: str) -> str:
    import json
    import pandas as pd
    
    print("loading records...")
    with open(input_path, 'r') as f:
        records = json.load(f)
    print("records loaded")
    
    df = pd.DataFrame(records)
    # 刪除沒有主題、內容或寄件者的記錄
    cleaned = df.dropna(subset=["subject", "body", "from"])
    
    output_path_hdf = '/data_processing/clean_data.hdf'
    cleaned.to_hdf(output_path_hdf, key="clean")
    
    return output_path_hdf

這段程式碼執行基本的資料清理工作:

  1. 從指定路徑載入 JSON 格式的記錄
  2. 將記錄轉換為 Pandas DataFrame
  3. 刪除缺少主題、內容或寄件者的記錄
  4. 將清理後的資料儲存為 HDF 格式(一種高效的二進位格式)
  5. 回傳清理後資料的檔案路徑

除了刪除缺失欄位外,還有許多其他標準資料品質技術,例如填補缺失資料和分析並移除可能是錯誤測量結果的異常值。無論你決定執行哪些額外的通用技術,都可以簡單地將它們增加到資料清理函式中。

領域特定的資料清理

領域特定的資料清理工具也很有價值。在郵件列表範例中,資料中的一個潛在噪音來源可能是垃圾郵件。解決這個問題的一種方法是使用 SpamAssassin。我們可以將這個套件增加到容器中:

ARG base
FROM $base
# 以 root 身份執行更新
USER root
# 安裝 SpamAssassin
RUN apt-get update && \
    apt-get install -yq spamassassin spamc && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* && \
    rm -rf /var/cache/apt
# 切換回預期的使用者
USER jovyan

這個 Dockerfile 在基礎映像上安裝 SpamAssassin 工具:

  1. 首先切換到 root 使用者以取得安裝許可權
  2. 安裝 SpamAssassin 和 spamc 客戶端
  3. 清理 apt 快取以減小映像大小
  4. 切換回 jovyan 使用者(Jupyter 容器的預設使用者)

由於 Jupyter 的安全性考量,notebook 容器以較低許可權的使用者執行,這與大多數以 root 身份執行的容器不同。這就是為什麼我們需要在 Dockerfile 中切換使用者的原因。

分散式資料處理工具

隨著資料量增長,本地工具可能無法有效處理。在這種情況下,分散式處理工具就顯得尤為重要。雖然這些工具需要學習新的 API,但它們提供了處理大規模資料的能力。

Apache Spark 與 PySpark

在處理大型資料集時,我經常使用 Apache Spark。它提供了高效的分散式資料處理能力,而 PySpark 則讓 Python 開發者能夠輕鬆上手。以下是使用 PySpark 進行資料清理的範例:

def spark_clean_data(input_path: str) -> str:
    from pyspark.sql import SparkSession
    
    # 建立 Spark 工作階段
    spark = SparkSession.builder \
        .appName("Email Cleaning") \
        .getOrCreate()
    
    # 讀取資料
    df = spark.read.json(input_path)
    
    # 資料清理
    cleaned = df.dropna(subset=["subject", "body", "from"])
    
    # 儲存結果
    output_path = '/data_processing/clean_data_spark'
    cleaned.write.parquet(output_path)
    
    # 關閉 Spark 工作階段
    spark.stop()
    
    return output_path

這段程式碼展示瞭如何使用 PySpark 進行分散式資料清理:

  1. 建立 SparkSession,這是 Spark 應用程式的入口點
  2. 從 JSON 檔案讀取資料到 Spark DataFrame
  3. 執行與之前相同的基本清理(刪除缺失值)
  4. 將結果儲存為 Parquet 格式(一種高效的分欄式儲存格式)
  5. 關閉 Spark 工作階段並回傳輸出路徑

使用 Spark 的優勢在於它能自動將處理分散到多台機器上,適合處理無法放入單機記憶體的大型資料集。

特徵工程:從原始資料到模型輸入

特徵工程是將原始資料轉換為機器學習模型可用形式的過程。這個階段對模型效能有著決定性影響。

文字資料的特徵提取

在郵件列表範例中,我們需要從文字中提取有意義的特徵。一個常見的方法是使用詞袋(Bag of Words)或 TF-IDF 表示:

def extract_features(input_path: str) -> str:
    import pandas as pd
    from sklearn.feature_extraction.text import TfidfVectorizer
    import joblib
    
    # 讀取清理後的資料
    df = pd.read_hdf(input_path, key='clean')
    
    # 建立 TF-IDF 向量化器
    vectorizer = TfidfVectorizer(
        max_features=5000,  # 限制特徵數量
        min_df=5,           # 至少出現在 5 個檔案中
        stop_words='english'  # 移除英文停用詞
    )
    
    # 從郵件內容提取特徵
    features = vectorizer.fit_transform(df['body'])
    
    # 儲存特徵和向量化器
    output_path = '/data_processing/features/'
    import os
    os.makedirs(output_path, exist_ok=True)
    
    joblib.dump(vectorizer, f"{output_path}/vectorizer.joblib")
    joblib.dump(features, f"{output_path}/features.joblib")
    
    return output_path

這段程式碼執行文字特徵提取:

  1. 從 HDF 檔案讀取清理後的資料
  2. 建立 TF-IDF 向量化器,它能將文字轉換為數值向量
    • max_features=5000 限制詞彙表大小為 5000 個最常見的詞
    • min_df=5 忽略在少於 5 個檔案中出現的詞
    • stop_words='english' 移除常見的英文停用詞(如 “the”, “and” 等)
  3. 將向量化器應用於郵件內容
  4. 儲存特徵矩陣和向量化器以便後續使用

TF-IDF 是一種常用的文字表示方法,它不僅考慮詞頻(Term Frequency),還考慮逆檔案頻率(Inverse Document Frequency),能更好地捕捉詞語的重要性。

進階特徵工程

對於更複雜的特徵工程,我們可以使用更先進的技術,如詞嵌入(Word Embeddings)或預訓練語言模型:

def extract_advanced_features(input_path: str) -> str:
    import pandas as pd
    import numpy as np
    from sentence_transformers import SentenceTransformer
    import joblib
    
    # 讀取清理後的資料
    df = pd.read_hdf(input_path, key='clean')
    
    # 載入預訓練模型
    model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
    
    # 生成嵌入
    embeddings = model.encode(df['body'].tolist(), show_progress_bar=True)
    
    # 儲存嵌入
    output_path = '/data_processing/advanced_features/'
    import os
    os.makedirs(output_path, exist_ok=True)
    
    np.save(f"{output_path}/embeddings.npy", embeddings)
    df[['from', 'subject']].to_csv(f"{output_path}/metadata.csv")
    
    return output_path

這段程式碼使用預訓練的語言模型生成文字嵌入:

  1. 讀取清理後的資料
  2. 載入 SentenceTransformer 預訓練模型(這是一個專為生成句子嵌入而設計的模型)
  3. 將每封郵件的內容轉換為固定長度的向量表示
  4. 將嵌入儲存為 NumPy 陣列,並將相關中繼資料儲存為 CSV

與簡單的 TF-IDF 相比,這種方法能更好地捕捉語義訊息,因為預訓練模型已經從大量文字中學習了詞語和句子的語義關係。

構建端對端的資料處理 Pipeline

在瞭解了各個資料和特徵準備的步驟後,現在我們可以將它們組合成一個完整的 Kubeflow Pipeline。

定義 Pipeline 元件

首先,我們需要將前面定義的函式轉換為 Kubeflow Pipeline 元件:

from kfp.components import create_component_from_func

# 建立元件
download_op = create_component_from_func(
    download_data,
    base_image='python:3.8',
    packages_to_install=['lxml', 'requests']
)

clean_op = create_component_from_func(
    clean_data,
    base_image='python:3.8',
    packages_to_install=['pandas', 'tables']
)

feature_op = create_component_from_func(
    extract_features,
    base_image='python:3.8',
    packages_to_install=['pandas', 'scikit-learn', 'tables', 'joblib']
)

這段程式碼將我們定義的函式轉換為 Kubeflow Pipeline 元件:

  1. 使用 create_component_from_func 函式建立元件
  2. 指定基礎映像(這裡使用 Python 3.8)
  3. 列出需要安裝的套件依賴

每個元件都封裝了特定的資料處理功能,並定義了明確的輸入和輸出。

定義完整 Pipeline

接下來,我們將這些元件組合成一個完整的 Pipeline:

from kfp.dsl import pipeline

@pipeline(
    name="Email Analysis Pipeline",
    description="A pipeline to process and analyze email data"
)
def email_pipeline(year: int = 2020):
    # 下載資料
    download_task = download_op(year)
    
    # 清理資料
    clean_task = clean_op(download_task.output)
    
    # 提取特徵
    feature_task = feature_op(clean_task.output)
    
    # 可以繼續增加更多步驟,如模型訓練、評估等
    
    return feature_task.output

這段程式碼定義了一個完整的 Kubeflow Pipeline:

  1. 使用 @pipeline 裝飾器定義 Pipeline 的名稱和描述
  2. 定義 Pipeline 函式,它接受一個年份引數
  3. 按順序連線各個任務:
    • 首先下載指定年份的資料
    • 然後清理下載的資料
    • 最後從清理後的資料中提取特徵
  4. 每個任務的輸出作為下一個任務的輸入,形成資料處理管道
  5. Pipeline 回傳特徵提取任務的輸出,這可以用於後續的模型訓練

這種方式使得整個資料處理過程變得模組化和可重複,便於迭代和改進。

資料與特徵準備的最佳實踐

在實際工作中,我總結了一些資料和特徵準備的最佳實踐,分享給大家:

1. 保持資料處理的可重複性

確保資料處理流程是可重複的,這對於模型的可靠性至關重要。使用 Kubeflow Pipeline 可以幫助實作這一點,它能夠記錄每一步的輸入、輸出和引數。

2. 注重資料品質而非數量

在機器學習中,資料品質往往比數量更重要。花時間清理和準備高品質的資料集,而不是盲目追求更多的資料。

3. 理解領域知識

深入理解問題領域有助於設計更有效的特徵。例如,在處理郵件資料時,瞭解郵件列表的特性和術語可以幫助提取更有意義的特徵。

4. 持續迭代和改進

資料和特徵準備是一個迭代過程。隨著對問題和模型的深入理解,不斷回顧和改進資料處理流程。Kubeflow Pipeline 使這種迭代變得更加容易。

5. 平衡自動化與靈活性

雖然自動化資料處理流程很重要,但也需要保持足夠的靈活性來應對新的資料挑戰。設計模組化的 Pipeline 元件可以幫助實作這種平衡。

資料與特徵準備的未來趨勢

隨著技術的發展,資料和特徵準備領域也在不斷演進。以下是一些值得關注的趨勢:

自動化特徵工程

自動化特徵工程工具(如 Featuretools、AutoML 系統)正變得越來越成熟,能夠自動發現和建立有用的特徵,減少人工干預。

特徵商店

特徵商店(Feature Store)的概念正在興起,它提供了一個集中管理、分享和重用特徵的平台,有助於提高團隊效率和特徵一致性。

端對端深度學習

端對端深度學習模型減少了對手動特徵工程的依賴,直接從原始資料學習表示。這種方法在某些領域(如電腦視覺和自然語言處理)已經取得了顯著成功。

資料和特徵準備是機器學習流程中最關鍵的步驟之一。透過使用 Kubeflow Pipeline,我們可以構建可重複、可擴充套件的資料處理流程,為後續的模型訓練奠定堅實基礎。無論是使用本地工具還是分散式系統,選擇合適的工具並遵循最佳實踐都能顯著提升模型效能。

在機器學習專案中,良好的資料與特徵準備往往是成功的關鍵。透過精心設計的資料處理流程,我們可以從原始資料中提取最有價值的訊息,為模型提供高品質的輸入,最終實作更好的預測效能。隨著技術的不斷進步,資料與特徵準備的方法也將持續演進,為機器學習應用開啟更多可能性。

Kubeflow 資料處理管道:從基礎到分散式架構

在機器學習工作流程中,資料處理與特徵準備是影響模型效能的關鍵環節。在 Kubeflow 環境中,我們可以利用各種工具和技術來最佳化這些流程,從而構建更加強大和可擴充套件的機器學習系統。本文將分享我在實際專案中積累的經驗,幫助你掌握 Kubeflow 中的資料處理技巧。

自訂容器與管道整合

當我們在 Kubeflow 中建立管道時,僅推播新容器是不夠的。Kubeflow 需要明確指示使用哪個容器。在使用 func_to_container_op 構建管道階段時,你需要透過 base_image 引數指定要使用的容器映像。

這體現了容器技術的強大之處—我們可以在 Kubeflow 提供的基礎元件上增加所需工具,而不必從頭開始構建一切。在資料清理完成後,下一步就是確保資料量足夠,或者探索資料增強的可能性。

資料格式化與特徵準備

資料格式選擇

選擇正確的資料格式取決於你使用的特徵準備工具。如果你打算繼續使用相同的工具進行資料準備,輸出格式可以與輸入相同。但在某些情況下,這是轉換格式的好時機。例如,當使用 Spark 進行資料準備而用 TensorFlow 進行訓練時,我通常會在這個階段實作轉換為 TFRecords 格式。

特徵處理實作

特徵處理的方法取決於具體問題。以郵件列表分析為例,我們可以編寫各種文書處理函式並將它們組合成特徵。下面是一個實際的範例:

df['domains'] = df['links'].apply(extract_domains)
df['isThreadStart'] = df['depth'] == '0'

# 從文字中提取特徵
from sklearn.feature_extraction.text import TfidfVectorizer
bodyV = TfidfVectorizer()
bodyFeatures = bodyV.fit_transform(df['body'])

domainV = TfidfVectorizer()
def makeDomainsAList(d):
    return ' '.join([a for a in d if not a is None])
    
domainFeatures = domainV.fit_transform(
    df['domains'].apply(makeDomainsAList))

# 組合特徵
from scipy.sparse import csr_matrix, hstack
data = hstack([
    csr_matrix(df[[
        'containsPythonStackTrace', 'containsJavaStackTrace',
        'containsExceptionInTaskBody', 'isThreadStart'
    ]].to_numpy()), bodyFeatures, domainFeatures
])

這段程式碼展示了文字資料的特徵工程過程。首先,我們從連結中提取網域名稱並標記是否為討論串的開始。接著使用 TF-IDF 向量化器從郵件內文和網域名稱中提取特徵,這是處理文字資料的常用方法,能有效捕捉詞彙的重要性。最後,我們使用 hstack 函式將多種特徵水平堆積積疊成一個統一的特徵矩陣,包括二元特徵(如是否包含堆積積疊追蹤)和文字特徵。這種模組化的特徵工程方法使得每個步驟都可以轉換為獨立的管道階段。

到目前為止,範例程式碼的結構允許你將每個函式轉換為單獨的管道階段,但還有其他選擇。我們將在後續的「整合管道」部分探討如何使用整個 Notebook 作為管道階段。

超越 Notebook:自訂容器與進階工具

Notebook 和 Python 並不總是最佳選擇——Notebook 在版本控制方面可能存在困難,而 Python 有時缺乏必要的函式庫或效能。因此,我們需要探索其他可用工具。

自訂容器的力量

Kubeflow 管道不僅限於 Notebook 或特定語言。根據專案需求,你可能需要使用一般 Python 專案、自訂工具、Python 2 甚至 FORTRAN 程式碼作為核心元件。

在我的實踐中,有時會使用 Scala 執行管道中的某個步驟,或者整合 R 語言容器來處理特定的統計分析任務。這種靈活性是 Kubeflow 的一大優勢。

有時你可能找不到完全符合需求的容器。在這種情況下,可以採用通用基礎映像並在其上構建。這種方法讓我們能夠精確定製處理環境,確保管道中的每個元件都能發揮最佳效能。

分散式工具:處理大規模資料

當問題規模超出單機記憶體容量或需要更高效能時,分散式平台就顯得尤為重要。通常,當我們的問題超出初始 Notebook 解決方案的能力時,就是開始使用分散式工具的時機。

Kubeflow 中的分散式系統

Kubeflow 中主要的資料平行分散式系統有 Apache Spark 和 Google 的 Dataflow(透過 Apache Beam)。Apache Spark 擁有更大的安裝基礎和更多支援的格式與函式庫。Apache Beam 支援 TensorFlow Extended (TFX)——一個端對端的機器學習工具,它能與 TFServing 無縫整合,用於模型推論。

由於 TFX 與 Kubeflow 整合最為緊密,我們將首先探討 Apache Beam 上的 TFX,然後再介紹更為標準的 Apache Spark。

TensorFlow Extended:端對端機器學習工具

TensorFlow 社群建立了一套出色的整合工具,涵蓋從資料驗證到模型服務的各個方面。目前,TFX 的資料工具都建立在 Apache Beam 之上,後者在 Google Cloud 上提供最完善的分散式處理支援。

值得注意的是,Apache Beam 對 Python 的支援在 Google Cloud Dataflow 之外並不成熟。TFX 是一個 Python 工具,因此其擴充套件性取決於 Apache Beam 對 Python 的支援。你可以透過使用 GCP 的 Dataflow 元件來擴充套件工作。隨著 Apache Beam 對 Apache Flink 和 Spark 的支援改進,未來可能會增加對可移植方式擴充套件 TFX 元件的支援。

Kubeflow 在其管道系統中包含了許多 TFX 元件。TFX 也有自己的管道概念,這與 Kubeflow 管道是分開的,在某些情況下 TFX 可以作為 Kubeflow 的替代品。這裡我們將專注於資料和特徵準備元件,因為這些元件最容易與 Kubeflow 生態系統的其餘部分一起使用。

TensorFlow 資料驗證:確保資料品質

確保資料品質不會隨時間下降是至關重要的。資料驗證允許我們確保資料的結構和分佈僅以預期的方式演變,並在資料品質問題成為生產問題之前發現它們。TensorFlow Data Validation (TFDV) 提供了驗證資料的能力。

安裝與設定

為了簡化開發過程,建議在本地安裝 TFX 和 TFDV。雖然程式碼可以在 Kubeflow 內部評估,但在本地安裝這些函式庫將加速開發工作。安裝 TFX 和 TFDV 只需簡單的 pip 命令:

pip3 install tfx tensorflow-data-validation

在 Kubeflow 管道中使用 TFX 和 TFDV

現在讓我們看如何在 Kubeflow 管道中使用 TFX 和 TFDV。第一步是載入我們想要使用的相關元件。如前所述,雖然 Kubeflow 確實有 load_component 函式,但它會自動解析 master 分支,使其不適合生產使用案例。因此,我們將使用 load_component_from_file 以及 Kubeflow 元件的本地副本來載入 TFDV 元件。

我們需要載入的基本元件包括:範例生成器(相當於資料載入器)、結構定義器、統計生成器和驗證器本身。下面是載入元件的範例:

tfx_csv_gen = kfp.components.load_component_from_file(
    "pipelines-0.2.5/components/tfx/ExampleGen/CsvExampleGen/component.yaml")
tfx_statistic_gen = kfp.components.load_component_from_file(
    "pipelines-0.2.5/components/tfx/StatisticsGen/component.yaml")
tfx_schema_gen = kfp.components.load_component_from_file(
    "pipelines-0.2.5/components/tfx/SchemaGen/component.yaml")
tfx_example_validator = kfp.components.load_component_from_file(
    "pipelines-0.2.5/components/tfx/ExampleValidator/component.yaml")

這段程式碼展示瞭如何載入 TFX 元件以構建資料驗證管道。我們使用 load_component_from_file 函式從本地 YAML 檔案載入四個關鍵元件:

  1. tfx_csv_gen - CSV 範例生成器,用於從 CSV 檔案載入資料
  2. tfx_statistic_gen - 統計生成器,用於計算資料的統計訊息
  3. tfx_schema_gen - 結構定義器,用於從資料推斷結構
  4. tfx_example_validator - 範例驗證器,用於根據結構驗證資料

使用本地元件定義而非動態載入是一個重要的生產實踐,它確保了管道的穩定性和可重現性,避免了因元件更新導致的意外行為變化。

除了元件外,我們還需要資料。目前的 TFX 元件使用 Kubeflow 的 file_output 機制在管道階段之間傳遞資料。這會將輸出放入 MinIO 中,自動追蹤與管道相關的資料。

整合分散式工具與 Kubeflow

TFX 的元件設計讓我們能夠輕鬆地將資料驗證整合到 Kubeflow 管道中。這種整合提供了幾個關鍵優勢:

  1. 自動化資料品質監控:透過管道自動執行資料驗證,我們可以持續監控資料品質,及早發現問題。

  2. 可重現的資料處理:所有資料處理步驟都被明確定義為管道元件,確保處理過程的一致性和可重現性。

  3. 擴充套件性:當資料量增長時,我們可以利用分散式系統的能力來處理更大規模的資料。

  4. 工具整合:Kubeflow 的開放架構允許我們整合各種工具,從 TensorFlow 到 Spark,從 Python 到 R 和 Scala,使我們能夠為每個任務選擇最合適的工具。

在實際應用中,我發現結合使用 TFX 的資料驗證能力和 Apache Spark 的分散式處理能力是處理大規模機器學習專案的有效方法。TFX 提供了強大的資料驗證功能,而 Spark 則提供了處理大規模資料的能力。

實用策略與最佳實踐

透過多年的實踐,我總結了一些在 Kubeflow 中進行資料和特徵準備的最佳實踐:

  1. 模組化設計:將資料處理邏輯分解為獨立的函式,每個函式執行一個特定任務。這不僅提高了程式碼的可讀性,還使得將這些函式轉換為管道階段變得更加容易。

  2. 容器策略:為不同的處理任務建立專用容器,確保每個容器都包含所需的所有依賴項。這種方法比試圖建立一個包含所有可能工具的巨大容器更加靈活和高效。

  3. 資料驗證:在管道的早期階段實施資料驗證,以便及早發現和修復資料問題。這有助於避免在訓練階段或更糟糕的是在生產階段發現資料問題。

  4. 效能考量:對於大型資料集,考慮使用分散式工具如 Spark 或 Beam。但對於較小的資料集,簡單的 Python 指令碼可能更加高效。

  5. 版本控制:對資料處理程式碼和容器映像實施嚴格的版本控制,確保管道的可重現性。

在資料和特徵準備方面,Kubeflow 提供了豐富的工具和技術,使我們能夠構建強大、可擴充套件的機器學習管道。透過正確選擇工具、設計模組化管道並實施資料驗證,我們可以確保機器學習專案的成功。

機器學習工作流程中的資料處理階段往往被低估,但它實際上是整個過程中最關鍵的部分之一。高品質的資料和特徵是訓練出高效能模型的基礎。透過利用 Kubeflow 的強大功能和遵循最佳實踐,我們可以顯著提高資料處理的效率和效果,從而為整個機器學習專案奠定堅實的基礎。

Kubeflow中的資料與特徵準備工作流程

在機器學習專案中,資料準備和特徵工程往往佔據了整個開發週期的80%時間。在Kubeflow平台上,我們有多種工具可以幫助我們高效完成這些工作。本文將探討如何使用TensorFlow Data Validation (TFDV)進行資料驗證、TensorFlow Transform (TFT)進行特徵轉換,以及如何利用Apache Spark處理大規模資料集。

使用TensorFlow Data Validation進行資料驗證

TensorFlow Data Validation (TFDV)是TensorFlow Extended (TFX)生態系統中的重要工具,用於資料驗證和模式推斷。在推薦系統的範例中,我們可以透過以下步驟使用TFDV:

下載資料

首先,我們需要取得資料。在Kubeflow管道中,我們可以使用ContainerOp來下載資料:

fetch = kfp.dsl.ContainerOp(
    name='download',
    image='busybox',
    command=['sh', '-c'],
    arguments=[
        'sleep 1;'
        'mkdir -p /tmp/data;'
        'wget ' + data_url + ' -O /tmp/data/results.csv'
    ],
    file_outputs={'downloaded': '/tmp/data'}
)

這段程式碼建立了一個名為download的容器操作,使用基本的busybox映像來執行Shell命令。它建立了一個臨時目錄/tmp/data,然後使用wgetdata_url變數指定的URL下載資料,並儲存為results.csv檔案。最後,它將下載的資料目錄路徑作為輸出回傳,這樣後續步驟可以參照這個位置。

如果資料已經存在於永續性儲存區上(例如在前一個階段取得的資料),我們也可以使用filesystem/get_file元件來取得資料。

資料轉換為TFX範例

一旦資料下載完成,我們需要使用TFX的範例生成器將資料轉換為TFX可以處理的格式。TFX支援多種資料格式,包括CSV和TFRecord。在我們的推薦系統範例中,我們使用CSV元件:

records_example = tfx_csv_gen(input_base=fetch.output)

這行程式碼使用TFX的CSV生成器元件將下載的CSV檔案轉換為TFX的Example格式。它接受前一個步驟(fetch)的輸出作為輸入路徑,並生成TFX可以處理的Example格式資料。這是TFX管道中的標準步驟,將原始CSV資料轉換為結構化的TensorFlow Example格式,為後續的資料分析和處理做準備。

生成資料統計和模式

有了Example格式的資料後,我們可以使用TFDV來計算資料統計並推斷資料模式:

stats = tfx_statistic_gen(input_data=records_example.output)
schema_op = tfx_schema_gen(stats.output)

這兩行程式碼展示了TFX中資料分析的核心步驟:

  1. tfx_statistic_gen元件計算輸入資料的統計訊息,如特徵分佈、缺失值比例等。
  2. schema_gen元件根據這些統計訊息自動推斷資料的模式(schema),包括每個特徵的型別、值域範圍等。

這個過程非常重要,因為推斷出的模式將用於後續的資料驗證,以確保新的資料符合預期格式。在生產環境中,玄貓建議儲存這個模式並在未來的驗證中重複使用,而不是每次都重新推斷,這樣可以更容易檢測到資料結構的變化。

檢查和顯示模式

在使用模式進行驗證之前,我們應該檢查它是否符合我們的預期:

import tensorflow_data_validation as tfdv

schema = tfdv.load_schema_text("schema_info_2")
tfdv.display_schema(schema)

這段程式碼用於檢視和分析推斷出的資料模式。首先匯入tensorflow_data_validation函式庫,然後從檔案中載入模式定義,並使用display_schema函式以視覺化方式展示模式內容。這一步驟非常重要,因為它允許資料科學家確認自動推斷的模式是否符合預期,並在必要時進行修改。模式檢查可以幫助發現潛在的資料問題,如錯誤的資料型別推斷或不合理的值域範圍。

如果需要修改模式(無論是因為模式演化還是推斷不正確),可以使用TensorFlow GitHub儲存函式庫中的schema_util.py指令碼提供的工具。

驗證資料

確認模式正確後,我們可以使用它來驗證資料:

tfx_example_validator(
    stats=stats.outputs['output'],
    schema=schema_op.outputs['output']
)

這段程式碼執行資料驗證過程,它使用之前生成的統計訊息和模式作為輸入。驗證器會檢查資料是否符合模式定義的約束,如資料型別、值域範圍、缺失值比例等。如果發現異常,驗證器會生成報告指出問題所在。在生產環境中,玄貓強烈建議在推播到生產前檢查被拒絕的記錄數量 - 如果拒絕率過高,可能意味著資料格式已經變更,需要更新模式甚至整個管道。

這種自動化的資料驗證機制是確保機器學習系統穩定性的關鍵,可以及早發現資料問題,避免模型在生產環境中因資料異常而失效。

使用TensorFlow Transform進行特徵轉換

TensorFlow Transform (TFT)是TFX生態系統中用於特徵工程的工具。它的主要優勢在於能夠將特徵轉換邏輯整合到TensorFlow模型中,簡化了推論時的特徵準備工作。

設定TFT環境

首先,我們需要匯入必要的函式庫:

import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

這段程式碼匯入了使用TensorFlow Transform所需的核心函式庫:

  • tensorflow:基礎的TensorFlow函式庫,提供深度學習和張量運算功能
  • tensorflow_transform:特徵轉換的專用函式庫,提供了在大規模資料集上應用轉換的功能
  • schema_utils:用於處理和操作資料模式的實用工具

這些匯入為後續定義特徵轉換邏輯做準備,TFT將這些轉換整合到TensorFlow圖中,確保訓練和推論時使用相同的轉換邏輯。

定義預處理函式

TFT的核心是定義一個預處理函式,指定我們想要應用的轉換:

def preprocessing_fn(inputs):
    outputs = {}
    # TFT business logic goes here
    outputs["body_stuff"] = tft.compute_and_apply_vocabulary(inputs["body"], top_k=1000)
    return outputs

這個preprocessing_fn函式是TFT的核心,它定義瞭如何將原始輸入轉換為模型訓練所需的特徵。在這個例子中:

  1. 函式接收一個包含所有輸入特徵的字典inputs
  2. 建立一個空字典outputs來儲存轉換後的特徵
  3. 使用tft.compute_and_apply_vocabulary函式處理文欄位"body",這會:
    • 從所有訓練資料中計算出現頻率最高的1000個詞彙(由top_k=1000指定)
    • 將原始文字轉換為對應的詞彙索引

這種方法的強大之處在於,詞彙表是在整個訓練集上計算的,然後應用於每個樣本,這種全域和區域性處理的結合是TFT的獨特優勢。

需要注意的是,這個函式不支援任意的Python程式碼,所有轉換必須表示為TensorFlow或TensorFlow Transform操作。TensorFlow操作一次只能操作一個張量,而在資料準備中,我們通常想要計算所有輸入資料的某些統計量,TensorFlow Transform的操作提供了這種能力。

將轉換增加到管道

定義好預處理函式後,我們需要將其增加到Kubeflow管道中:

transformed_output = tfx_transform(
    input_data=records_example.output,
    schema=schema_op.outputs['output'],
    module_file=module_file  # 指向GCS/S3上的TFT程式碼路徑
)

這段程式碼將TFT轉換整合到Kubeflow管道中。它使用TFX的Transform元件,接受三個關鍵輸入:

  1. input_data:要轉換的資料,來自前面的CSV生成器
  2. schema:資料模式,來自模式生成器
  3. module_file:包含預處理函式的Python模組檔案的路徑

這個元件的獨特之處在於它需要將轉換程式碼作為一個上載到S3或GCS的檔案傳入,而不能直接內嵌程式碼。執行後,它會產生轉換後的資料集以及一個關鍵成果:可以在推論時使用的轉換模型,這大簡化了模型佈署過程。

透過TensorFlow Transform,我們不僅完成了特徵準備,還生成了一個關鍵的成果:可以在服務時間轉換請求的工具。TFT與TensorFlow的緊密整合可以使模型服務變得簡單許多。不過,Kubeflow元件中的TensorFlow Transform並不能滿足所有專案的需求,因此下面我們將探討分散式特徵準備。

使用Apache Spark進行分散式資料處理

Apache Spark是一個開放原始碼的分散式資料處理工具,可以在各種叢集上執行。Kubeflow透過不同的元件支援Apache Spark,讓你可以存取雲端特定的功能。

在Jupyter中設定Spark

Spark並未預先安裝在Kubeflow的筆記本映像中。雖然可以在筆記本內使用pip安裝Spark,但這不支援複雜的環境。更好的方法是從你正在使用的筆記本容器開始,透過新的Dockerfile增加Spark:

# 參考 https://www.kubeflow.org/docs/notebooks/custom-notebook/
ARG base
FROM $base
ARG sparkversion
ARG sparkrelease
ARG sparkserver https://www-us.apache.org/dist/spark

# 需要以root身份執行更新
USER root

# 設定Spark的環境變數
ENV SPARK_HOME /opt/spark

# 安裝Java,因為Spark需要它
RUN apt-get update && \
    apt-get install -yq openjdk-8-jre openjdk-8-jre-headless && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# 安裝Spark
RUN set -ex && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh
RUN echo "Setting up $sparkversion"
RUN cd /tmp && \
    (wget ${sparkserver}/spark-${sparkversion}/${sparkrelease}.tgz) && \
    cd /opt && tar -xvf /tmp/${sparkrelease}.tgz && \
    rm /tmp/${sparkrelease}.tgz && mv ${sparkrelease} spark

這個Dockerfile展示瞭如何在Kubeflow筆記本映像中增加Spark支援。主要步驟包括:

  1. 接受基礎映像和Spark版本作為構建引數
  2. 切換到root使用者以進行系統更新
  3. 設定SPARK_HOME環境變數
  4. 安裝Spark執行所需的Java
  5. 下載並安裝指定版本的Spark

這種方法允許你建立一個包含Spark的自定義筆記本映像,而不是在每次執行筆記本時都需要安裝Spark。在Kubeflow中使用自定義映像可以大提高開發效率,特別是當你需要重複使用相同的環境時。

Spark與TFX整合的實用策略

在實際專案中,玄貓發現將Spark與TFX工具結合使用可以提供強大的資料處理能力。以下是一些實用策略:

使用Spark進行初步資料清洗和轉換

Spark擅長處理大規模資料集的初步清洗和轉換。在將資料傳遞給TFX工具之前,可以使用Spark進行:

  1. 缺失值處理和異常值檢測
  2. 特徵工程和轉換
  3. 資料分割槽(訓練/驗證/測試)

將Spark轉換結果與TFX管道整合

雖然Spark和TFX各有優勢,但它們可以很好地協同工作:

  1. 使用Spark處理原始資料並生成中間結果
  2. 將Spark處理後的資料轉換為TFRecord或CSV格式
  3. 使用TFX工具(如TFDV和TFT)進行進一步處理和驗證

大規模資料處理的結構化方法

在處理大規模資料時,玄貓建議將程式碼結構化為單一階段,涵蓋所有特徵和資料準備工作。這是因為在大規模環境中,在步驟之間寫入和載入資料是成本高昂的操作。透過將相關的資料處理邏輯組合在一起,可以顯著提高效率。

資料驗證與特徵轉換的最佳實踐

根據實際經驗,玄貓總結了一些在Kubeflow中進行資料驗證和特徵轉換的最佳實踐:

資料驗證策略

  1. 儲存和重用模式:在生產環境中,應該儲存推斷的模式並在未來的驗證中重複使用它,而不是每次都重新推斷。這有助於捕捉資料分佈的變化。

  2. 檢查被拒絕的記錄:在推播到生產前,應該檢查被驗證器拒絕的記錄數量。如果拒絕率過高,可能意味著資料格式已經變更,需要更新模式。

  3. 定期重新評估模式:雖然應該重用模式進行驗證,但也應該定期重新評估它是否仍然適合當前的資料分佈。資料漂移是機器學習系統中的常見問題。

特徵轉換技巧

  1. 模組化轉換邏輯:將特徵轉換邏輯模組化,這樣可以更容易地測試和重用它們。

  2. 考慮計算成本:某些轉換操作可能在大規模資料集上非常昂貴。在設計轉換邏輯時,應該考慮計算成本和效率。

  3. 整合訓練和推論:利用TensorFlow Transform的優勢,確保訓練和推論時使用相同的轉換邏輯,避免訓練-服務偏差。

  4. 增量更新考量:對於需要定期更新的模型,考慮如何增量更新特徵轉換,而不是每次都從頭開始計算。

結論

在Kubeflow中,TensorFlow Data Validation和TensorFlow Transform提供了強大的工具來處理資料驗證和特徵轉換,而Apache Spark則為大規模資料處理提供了分散式計算能力。透過結合這些工具,我們可以構建健壯、可擴充套件的機器學習資料處理管道。

隨著資料量的不斷增長和模型複雜性的提高,高效的資料處理變得越來越重要。在未來,我們可能會看到更多專注於自動化資料驗證、人工智慧特徵工程和資料品質監控的工具和技術。這些進步將進一步簡化機器學習工作流程,使資料科學家和工程師能夠將更多精力放在解決業務問題上,而不是資料處理的繁瑣任務上。

在實際應用中,選擇合適的工具組合並遵循最佳實踐,可以顯著提高機器學習專案的效率和成功率。資料準備雖然不如模型訓練那樣受到關注,但它往往是決定專案成敗的關鍵因素。

Kubernetes與Spark的完美結合:分散式運算新境界

在現代資料工程領域,Spark已成為處理大規模資料的標準工具,而Kubernetes則是容器協調的首選平台。當這兩項技術結合時,我們可以實作高度彈性與強大的分散式資料處理能力。在這篇文章中,我將分享如何在Kubernetes環境,特別是Kubeflow平台上,正確設定和執行Spark任務。

容器環境中的許可權管理

在容器化的Spark環境中,正確的許可權設定至關重要。以下是一個關鍵的設定片段:

# 設定工作目錄
WORKDIR /opt/spark/work-dir
# 設定適當的許可權
RUN chmod -R 777 /opt/spark/
# 切換回使用者
USER jovyan
# 安裝常用工具
pip install pandas numpy scipy pyarrow

這段Dockerfile指令做了幾件重要的事:

  1. 設定了Spark的工作目錄為/opt/spark/work-dir
  2. 為Spark目錄賦予了完全的讀寫執行許可權(777)
  3. 切換回jovyan使用者(這是Jupyter基礎映像所依賴的使用者)
  4. 安裝了資料科學中常用的Python函式庫

值得注意的是,雖然設定777許可權在生產環境中通常不是最佳實踐(過於寬鬆的許可權可能帶來安全風險),但在這種容器化環境中,為了確保Spark能夠正常寫入工作目錄,有時這是必要的妥協。

解決Notebook與Worker間的通訊問題

在Kubeflow環境中使用Spark時,一個常見的挑戰是Spark workers無法直接連線到我們的notebook伺服器。為瞭解決這個問題,我們需要建立一個服務來實作服務發現:

apiVersion: v1
kind: Service
metadata:
  name: spark-driver
  namespace: kubeflow-programmerboo
spec:
  selector:
    notebook-name: spark-test-2
  ports:
  - port: 39235
    targetPort: 39235
    name: spark-driver-port
  - port: 39236
    targetPort: 39236
    name: spark-block-port

這個服務定義做了以下幾點:

  1. 建立了一個名為spark-driver的服務
  2. 指定了名稱空間為kubeflow-programmerboo
  3. 透過notebook-name: spark-test-2選擇器來識別目標Pod
  4. 暴露了兩個連線埠:
    • 39235:用於Spark驅動程式通訊
    • 39236:用於Spark塊管理器通訊

一旦定義了這個服務,只需執行kubectl apply -f my_spark_service.yaml就能應用這些設定。這樣,Spark驅動程式和執行器就能夠相互通訊了。

在建立SparkContext時,我們需要設定它使用這個服務作為主機名。這種方式非常適合探索階段,但當專案進入更成熟的階段時,應考慮使用Spark Operator。

Kubeflow中的Spark Operator:生產級佈署

當我們從實驗階段過渡到生產環境時,使用Kubeflow的原生Spark Operator、EMR或Dataproc是更好的選擇。其中最具可移植性的是原生Spark Operator,它不依賴於任何特定雲平台。

封裝Spark應用程式

要使用任何Operator,都需要對Spark程式進行封裝,並將其儲存在分散式檔案系統(如GCS、S3等)上,或放入容器中。對於Python或R程式,我建議構建Spark容器來安裝依賴項:

# 使用Spark operator映象作為基礎
FROM gcr.io/spark-operator/spark-py:v2.4.5
# 安裝Python依賴
COPY requirements.txt /
RUN pip3 install -r /requirements.txt
# 現在可以參照local:///job/my_file.py
RUN mkdir -p /job
COPY *.py /job
ENTRYPOINT ["/opt/entrypoint.sh"]

這個Dockerfile展示瞭如何建立自定義Spark容器:

  1. 以官方Spark operator映象為基礎
  2. 安裝Python依賴項
  3. 建立/job目錄並複製Python檔案到此目錄
  4. 設定入口點為Spark的標準入口指令碼

這種方法的優點是,即使決定更新應用程式,仍然可以使用同一個容器,只需將主資源設定為分散式檔案系統上的路徑即可。

雲端特定選項

在Kubeflow中執行Spark的兩個雲特定選項是Amazon EMR和Google Dataproc元件。但它們各自需要不同的引數,這意味著在不同雲平台間遷移時需要轉換管道設定。

EMR元件允許設定叢集、提交作業和清理叢集,相關元件包括用於啟動的aws/emr/create_cluster和用於清理的aws/emr/delete_cluster。執行PySpark作業的元件是aws/emr/submit_pyspark_job

Dataproc叢集的工作流程與EMR類別似,元件名稱也相似:gcp/dataproc/create_cluster/gcp/dataproc/delete_cluster/用於生命週期管理,gcp/dataproc/submit_pyspark_job/用於執行作業。

使用ResourceOp啟動Spark作業

與EMR和Dataproc元件不同,Spark operator沒有專門的元件。對於沒有元件的Kubernetes operators,我們可以使用dsl.ResourceOp來呼叫它們:

resource = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {
        "name": "boop",
        "namespace": "kubeflow"
    },
    "spec": {
        "type": "Python",
        "mode": "cluster",
        "image": "gcr.io/boos-demo-projects-are-rad/kf-steps/kubeflow/myspark",
        "imagePullPolicy": "Always",
        # 參見Dockerfile或使用GCS/S3/...
        "mainApplicationFile": "local:///job/job.py",
        "sparkVersion": "2.4.5",
        "restartPolicy": {
            "type": "Never"
        },
        "driver": {
            "cores": 1,
            "coreLimit": "1200m",
            "memory": "512m",
            "labels": {
                "version": "2.4.5",
            },
            # 也可嘗試spark-operatoroperator-sa
            "serviceAccount": "spark-operatoroperator-sa",
        },
        "executor": {
            "cores": 1,
            "instances": 2,
            "memory": "512m"
        },
        "labels": {
            "version": "2.4.5"
        },
    }
}

@dsl.pipeline(name="local Pipeline", description="No need to ask why.")
def local_pipeline():
    rop = dsl.ResourceOp(
        name="boop",
        k8s_resource=resource,
        action="create",
        success_condition="status.applicationState.state == COMPLETED")

這段程式碼展示瞭如何使用ResourceOp啟動Spark作業:

  1. 定義了一個SparkApplication資源,指定了型別、模式、映像和主應用檔案
  2. 設定了驅動程式和執行器的資源需求(核心數、記憶體等)
  3. 在管道函式中,建立了一個ResourceOp物件,並設定了成功條件

需要注意的是,Kubeflow不會對ResourceOp請求進行任何驗證。例如,在Spark中,作業名稱必須能夠用作有效DNS名稱的開頭,而ResourceOps只是直接傳遞請求,不像container ops會重寫容器名稱。

Spark基礎:建立工作階段與設定

Apache Spark提供了多種語言的API,包括Python、R、Scala和Java,以及一些第三方語言的支援。在機器學習社群中,Python介面最為流行。任何Spark程式的第一步是建立一個Spark工作階段或上下文:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import *

session = SparkSession.builder.getOrCreate()

這個例子之所以如此簡單,是因為它從呼叫環境中讀取設定。這在使用Spark operator時很有效,因為operator為我們處理了大部分設定。

然而,在notebook中工作時,我們需要提供一些額外訊息,以便執行器能夠連接回notebook。一旦設定了服務以便notebook和驅動程式可以通訊,就需要設定Spark工作階段,告訴執行器使用這個服務:

.config("spark.driver.bindAddress", "0.0.0.0")
.config("spark.kubernetes.namespace", "kubeflow-programmerboo")
.config("spark.master", "k8s://https://kubernetes.default")
.config("spark.driver.host", "spark-driver.kubeflow-programmerboo.svc.cluster.local")
.config("spark.kubernetes.executor.annotation.sidecar.istio.io/inject", "false")
.config("spark.driver.port", "39235")
.config("spark.blockManager.port", "39236")

這段設定做了以下幾件事:

  1. 設定驅動程式繫結到所有網路介面(0.0.0.0)
  2. 指定Kubernetes名稱空間
  3. 設定Spark主機為Kubernetes API伺服器
  4. 指定驅動程式主機為我們之前建立的服務
  5. 停用Istio sidecar注入(避免可能的網路問題)
  6. 設定驅動程式和塊管理器的連線埠

版本比對與環境一致性

在分散式環境中,版本比對至關重要。Python版本不比對可能導致序列化和函式錯誤。為瞭解決這個問題,我們需要在notebook中設定os.environ["PYSPARK_PYTHON"] = "python3.6",並在Spark的worker容器中安裝Python 3.6:

ARG base
FROM $base
USER root
# 安裝構建Python 3.6所需的函式庫
RUN apt-get update && \
    DEBIAN_FRONTEND=noninteractive apt-get install -y -q \
    make build-essential libssl-dev zlib1g-dev libbz2-dev \
    libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev \
    libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev && \
    rm -rf /var/cache/apt
# 安裝Python 3.6以比對notebook
RUN cd /tmp && \
    wget https://www.python.org/ftp/python/3.6.10/Python-3.6.10.tgz && \
    tar -xvf Python-3.6.10.tgz && \
    cd Python-3.6.10 && \
    ./configure && \
    make -j 8 && \
    make altinstall
RUN python3.6 -m pip install pandas pyarrow==0.11.0 spacy
# 我們依賴Spark在PYTHONPATH上,所以不透過pip安裝
USER 185

這個Dockerfile展示瞭如何在Spark worker容器中安裝特定版本的Python:

  1. 安裝編譯Python所需的依賴函式庫
  2. 下載、解壓、編譯並安裝Python 3.6.10
  3. 安裝必要的Python函式庫
  4. 切換回非root使用者(UID 185)

這種方法確保了notebook和executor使用相同版本的Python,避免了序列化問題。

使用MinIO作為儲存解決方案

Kubeflow內建了MinIO,這是一個與S3相容的儲存服務。要在Spark中使用MinIO需要一些額外設定。這對於儲存資料、模型和中間結果非常有用。

在設定Spark使用MinIO時,需要提供適當的端點、存取金鑰和金鑰ID。這使得Spark應用程式可以無縫地讀取和寫入物件儲存,就像使用本地檔案系統一樣。

分散式資料處理的最佳實踐

在Kubernetes上執行Spark時,有幾個最佳實踐值得注意:

資源分配與調優

適當的資源分配對於Spark作業的效能至關重要。在前面的例子中,我們為驅動程式分配了1個核心和512MB記憶體,為每個執行器分配了相同的資源。在實際應用中,應根據工作負載特性調整這些值。

對於記憶體密集型任務(如機器學習模型訓練),應增加執行器記憶體;對於計算密集型任務,應增加核心數。同時,執行器數量應根據資料大小和叢集容量來確定。

容錯與持久化

Spark提供了多種資料持久化選項,可以顯著提高迭代演算法的效能。在Kubernetes環境中,可以利用永續性儲存區宣告(PVC)來儲存中間結果,或使用MinIO等物件儲存。

此外,設定適當的檢查點機制可以在失敗時還原計算,減少重新計算的成本。在長時間執行的作業中,這一點尤為重要。

監控與故障排除

在Kubernetes上執行Spark時,可以利用多種工具進行監控和故障排除:

  1. Spark自帶的Web UI提供了作業執行的詳細訊息
  2. Kubernetes日誌可以檢視容器級別的問題
  3. Prometheus和Grafana可以用於長期監控和警示

當遇到問題時,首先檢查Spark驅動程式的日誌,然後是執行器的日誌。常見問題包括記憶體不足、序列化錯誤和網路連線問題。

從開發到生產:工作流程轉換

雖然Jupyter notebooks是探索和原型設計的絕佳工具,但當我們準備將Spark作業投入生產時,應考慮以下轉變:

  1. 將notebook程式碼轉換為獨立的Python模組或包
  2. 使用CI/CD流程自動構建和佈署Spark容器
  3. 實作引數化設定,使同一程式碼可以在不同環境中執行
  4. 整合監控和警示系統,及時發現問題

透過這些步驟,可以實作從開發到生產的平滑過渡,同時保持程式碼的可維護性和可擴充套件性。

在Kubernetes上執行Spark為資料處理提供了強大而靈活的平台。透過Kubeflow的整合,我們可以更輕鬆地管理和佈署Spark應用程式,實作真正的分散式資料科學工作流程。隨著資料量的增長和計算需求的提高,這種架構將變得越來越重要。

Spark 與 MinIO 的強力整合:開發彈性資料處理平台

在處理大規模資料時,選擇合適的儲存解決方案與資料處理框架至關重要。我在多個大型專案中發現,將 Apache Spark 與 MinIO 整合是一個極具彈性與高效的組合。MinIO 作為一個開放原始碼的高效能物件儲存系統,與 Spark 的分散式處理能力相輔相成,特別適合需要彈性擴充的雲端環境。

Spark 與 MinIO 的設定整合

MinIO 提供了相容 S3 的 API,這使得 Spark 能夠透過 S3A 檔案系統聯結器與其無縫整合。以下是設定 Spark 使用 MinIO 的關鍵設定:

spark = SparkSession.builder \
    .config("spark.hadoop.fs.s3a.endpoint", "minio-service.kubeflow.svc.cluster.local:9000") \
    .config("fs.s3a.connection.ssl.enabled", "false") \
    .config("fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.access.key", "minio") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio123") \
    .getOrCreate()
  • spark.hadoop.fs.s3a.endpoint 設定 MinIO 服務的位置,這裡是使用 Kubernetes 服務名稱
  • fs.s3a.connection.ssl.enabled 設為 false 表示不使用 SSL 連線,適用於內部測試環境
  • fs.s3a.path.style.access 設為 true 啟用路徑風格存取,這是 MinIO 推薦的設定方式
  • 最後兩行設定存取金鑰,用於身份驗證

值得注意的是,此整合方式僅適用於 Spark 3.0 或更高版本。在較舊版本中,可能需要額外的設定或使用不同的聯結器。

資料讀取與格式處理

Spark 的強大之處在於它支援多種資料來源,包括但不限於:Parquet、JSON、JDBC、ORC、Hive、CSV、ElasticSearch、MongoDB、Neo4j、Cassandra、Snowflake 和 Redis 等。

從不同格式讀取資料

在實際專案中,我發現最常用的格式是 Parquet,它提供了出色的壓縮比和列式存取效能。以下是從 Parquet 格式讀取郵件列表資料的範例:

# 讀取 Parquet 格式的資料
initial_posts = session.read.format("parquet").load(fs_prefix + "/initial_posts")
ids_in_reply = session.read.format("parquet").load(fs_prefix + "/ids_in_reply")

這段程式碼示範了 Spark 讀取 Parquet 格式資料的簡潔語法。session.read.format() 指定資料格式,而 load() 方法則指定資料的位置。如果需要讀取其他格式,只需將 “parquet” 替換為相應的格式名稱,例如 “json”、“csv” 等。

平行資料擷取技巧

當需要處理大量資料時,平行擷取是提升效能的關鍵。Spark 提供了 parallelizeflatMap 方法來實作這一點:

# 平行擷取多年的郵件列表資料
years = [2018, 2019, 2020, 2021, 2022]
yearly_data = sc.parallelize(years).flatMap(lambda year: fetchMailingListData(year))

這個範例展示瞭如何平行處理多年的資料。sc.parallelize(years) 建立一個分散式集合,然後 flatMap 在不同的執行器上平行執行 fetchMailingListData 函式。這種技術特別適合於需要從不同來源或時間段擷取資料的情境,能夠顯著減少資料載入時間。

資料驗證與清理

資料品質對模型效能至關重要。在我的經驗中,處理資料問題越早,後續工作就越順利。

驗證資料結構

明確定義並驗證資料結構是確保資料品質的第一步:

# 定義並驗證資料結構
ids_schema = StructType([
    StructField("In-Reply-To", StringType(), nullable=True),
    StructField("message-id", StringType(), nullable=True)
])
ids_in_reply = session.read.format("parquet").schema(ids_schema).load(
    fs_prefix + "/ids_in_reply")

這段程式碼定義了一個包含兩個字串欄位的結構,並在讀取資料時應用此結構。透過明確指定結構,Spark 會在載入階段驗證資料,而不是在後續處理或模型佈署時才發現問題。nullable=True 表示這些欄位可以為空。

處理缺失值

缺失值處理是資料清理的常見任務:

# 處理缺失值
initial_posts_count = initial_posts.count()
initial_posts_cleaned = initial_posts.na.drop(how='any', subset=['body', 'from'])
initial_posts_cleaned_count = initial_posts_cleaned.count()

# 計算移除比例
removal_percentage = (initial_posts_count - initial_posts_cleaned_count) / initial_posts_count * 100
print(f"移除了 {removal_percentage:.2f}% 的記錄")

這個範例使用 Spark 的 na.drop() 方法移除 ‘body’ 或 ‘from’ 欄位中有缺失值的記錄。how='any' 表示只要指定欄位中有任一個為空,就會移除該記錄。程式碼還計算並顯示了被移除記錄的百分比,這有助於評估資料清理的影響。

過濾不良資料

識別並移除不符合業務邏輯的資料是確保模型訓練品質的關鍵:

# 過濾不良資料
def is_valid_post(post):
    # 檢查郵件內容是否符合業務規則
    if post.body is None or len(post.body) < 10:
        return False
    if post.subject is None or len(post.subject) == 0:
        return False
    # 檢查日期格式
    try:
        if post.date and datetime.strptime(post.date, "%Y-%m-%d"):
            return True
    except:
        return False
    return True

spark_mailing_list_data_cleaned = spark_mailing_list_data_with_date.filter(is_valid_post)

這段程式碼定義了一個自訂函式 is_valid_post,用於驗證每筆郵件記錄是否符合特定的業務規則。它檢查郵件內容長度、主旨是否存在,以及日期格式是否有效。Spark 的 filter 方法將此函式應用於每筆記錄,只保留回傳 True 的記錄。這種方法比簡單的異常值移除更精確,因為它根據業務知識而非純統計方法。

使用 Spark SQL 進行資料處理

對於熟悉 SQL 的開發者,Spark SQL 提供了一個熟悉與強大的介面:

# 註冊臨時資料表並使用 SQL 查詢
ids_in_reply.registerTempTable("replies")
empty_messages = session.sql("SELECT * FROM replies WHERE body = '' AND subject = ''")
python_exceptions = session.sql("""
    SELECT * FROM replies 
    WHERE body LIKE '%Traceback (most recent call last)%' 
    AND body LIKE '%Python%'
""")

這個範例展示瞭如何使用 Spark SQL 處理資料。首先,使用 registerTempTable 將 DataFrame 註冊為臨時資料表。然後,使用 session.sql() 執行 SQL 查詢。第一個查詢找出沒有內容和主旨的郵件,第二個查詢則識別包含 Python 異常堆積積疊追蹤的郵件。這種方法特別適合於複雜的過濾條件或熟悉 SQL 的開發者。

儲存處理後的資料

完成資料處理後,需要儲存結果以供後續使用:

# 轉換為 Pandas DataFrame
pandas_posts = initial_posts_cleaned.toPandas()

# 儲存為 Parquet 格式
initial_posts_cleaned.write.format("parquet").mode('overwrite').save(fs_prefix + "/cleaned_posts")
ids_in_reply.write.format("parquet").mode('overwrite').save(fs_prefix + "/cleaned_replies")

這段程式碼展示了兩種儲存處理後資料的方法:

  1. 使用 toPandas() 將 Spark DataFrame 轉換為 Pandas DataFrame,適合後續使用單機工具處理
  2. 使用 write.format().mode().save() 將資料儲存為 Parquet 格式,適合後續使用分散式工具處理

mode('overwrite') 指定如果目標位置已存在資料,則覆寫它。這對於重複執行資料處理流程很有用。

使用 Spark 進行分散式特徵工程

Spark 的 ML 函式庫提供了豐富的特徵工程工具,特別適合處理大規模資料:

文字資料的特徵工程

對於郵件列表這類別文字資料,我們可以使用以下技術將文字轉換為機器學習模型可用的特徵:

# 文字特徵工程流程
tokenizer = Tokenizer(inputCol="body", outputCol="body_tokens")
body_hashing = HashingTF(inputCol="body_tokens", outputCol="raw_body_features", numFeatures=10000)
body_idf = IDF(inputCol="raw_body_features", outputCol="body_features")
body_word2Vec = Word2Vec(
    vectorSize=5,
    minCount=0,
    numPartitions=10,
    inputCol="body_tokens",
    outputCol="body_vecs"
)

# 組合所有特徵
assembler = VectorAssembler(
    inputCols=[
        "body_features", 
        "body_vecs", 
        "contains_python_stack_trace",
        "contains_java_stack_trace", 
        "contains_exception_in_task"
    ],
    outputCol="features"
)

這段程式碼建立了一個完整的文字特徵工程流程:

  1. Tokenizer 將文字分割為單詞
  2. HashingTF 使用雜湊技術將單詞轉換為特徵向量,numFeatures=10000 定義特徵向量的維度
  3. IDF 計算逆檔案頻率,增強重要但不常見單詞的權重
  4. Word2Vec 訓練詞嵌入模型,將單詞對映到語義空間中的向量
  5. VectorAssembler 將所有特徵組合成一個統一的特徵向量

這種特徵工程流程特別適合於處理電子郵件、論壇帖子等文字資料,能夠捕捉文字的語義訊息。

構建完整的特徵工程管道

Spark 的管道 (Pipeline) 功能允許我們將多個處理步驟組合成一個流程:

# 構建特徵工程管道
pipeline = Pipeline(stages=[
    tokenizer,
    body_hashing,
    body_idf,
    body_word2Vec,
    assembler
])

# 訓練並應用管道
pipeline_model = pipeline.fit(initial_posts_cleaned)
processed_data = pipeline_model.transform(initial_posts_cleaned)

這個範例展示瞭如何使用 Spark 的 Pipeline API 構建完整的特徵工程流程。管道將之前定義的所有特徵轉換器組合成一個統一的處理流程。pipeline.fit() 方法訓練管道(例如,學習 IDF 權重和 Word2Vec 模型),而 pipeline_model.transform() 則應用訓練好的管道處理資料。這種方法的優勢在於它確保了特徵工程步驟的一致性,並簡化了程式碼結構。

進階特徵工程技巧

自訂特徵轉換器

有時標準轉換器無法滿足特定需求,這時可以建立自訂轉換器:

# 自訂特徵轉換器
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

class StackTraceDetector(Transformer, HasInputCol, HasOutputCol):
    def __init__(self, inputCol=None, outputCol=None, traceType="python"):
        super(StackTraceDetector, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.traceType = traceType
        
    def _transform(self, dataset):
        if self.traceType == "python":
            pattern = "Traceback (most recent call last)"
        elif self.traceType == "java":
            pattern = "Exception in thread"
        else:
            pattern = "Error:"
            
        detectUDF = udf(lambda text: pattern in text if text else False, BooleanType())
        return dataset.withColumn(self.outputCol, detectUDF(dataset[self.inputCol]))

這段程式碼定義了一個自訂的 StackTraceDetector 轉換器,用於檢測文字中是否包含特定型別的堆積積疊追蹤。它繼承自 Spark 的 Transformer 類別,並實作了 _transform 方法。

這個轉換器使用 UDF (User-Defined Function) 來檢查輸入文字是否包含特定模式。根據 traceType 引數,它可以檢測 Python、Java 或通用錯誤訊息。這種自訂轉換器可以無縫整合到 Spark 的管道中,與內建轉換器一起使用。

特徵選擇與降維

對於高維特徵,選擇最相關的特徵或降維通常很重要:

# 特徵選擇與降維
from pyspark.ml.feature import PCA, ChiSqSelector
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

# 主成分分析降維
pca = PCA(k=50, inputCol="body_features", outputCol="body_features_pca")

# 卡方特徵選擇(假設有標籤列 "category")
selector = ChiSqSelector(
    numTopFeatures=100,
    featuresCol="features",
    outputCol="selectedFeatures",
    labelCol="category"
)

# 應用特徵選擇
selected_features_model = selector.fit(processed_data)
reduced_features_data = selected_features_model.transform(processed_data)

這個範例展示了兩種減少特徵維度的技術:

  1. 主成分析 (PCA):一種無監督降維技術,將高維特徵投影到捕捉最大方差的低維空間
  2. 卡方特徵選擇:一種監督特徵選擇方法,選擇與目標變數最相關的特徵

這些技術有助於減少模型複雜度、提高訓練速度,並可能改善模型泛化能力。在處理文字特徵時特別有用,因為文字特徵通常是高維與稀疏的。

結合 Spark ML 進行模型訓練

完成特徵工程後,可以直接使用 Spark ML 進行模型訓練:

# 使用 Spark ML 訓練模型
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 分割訓練集和測試集
train_data, test_data = processed_data.randomSplit([0.8, 0.2], seed=42)

# 訓練隨機森林分類別器
rf = RandomForestClassifier(
    labelCol="category",
    featuresCol="selectedFeatures",
    numTrees=100
)
rf_model = rf.fit(train_data)

# 評估模型
predictions = rf_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(
    labelCol="category",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"模型準確率: {accuracy:.4f}")

這段程式碼展示瞭如何使用 Spark ML 訓練和評估機器學習模型:

  1. 使用 randomSplit 將資料分割為訓練集和測試集
  2. 建立 RandomForestClassifier 分類別器,使用前面特徵選擇步驟產生的特徵
  3. 使用 fit 方法訓練模型
  4. 使用 transform 方法生成預測結果
  5. 使用 MulticlassClassificationEvaluator 計算模型準確率

這種方法的優勢在於整個流程(從資料載入、特徵工程到模型訓練和評估)都在 Spark 中完成,充分利用了分散式計算的能力。

模型持久化與佈署

訓練完成後,可以持久化模型以供後續使用:

# 儲存管道模型和隨機森林模型
pipeline_model.save(fs_prefix + "/models/feature_pipeline")
rf_model.save(fs_prefix + "/models/random_forest_model")

# 載入儲存的模型
from pyspark.ml import PipelineModel
from pyspark.ml.classification import RandomForestClassificationModel

loaded_pipeline = PipelineModel.load(fs_prefix + "/models/feature_pipeline")
loaded_model = RandomForestClassificationModel.load(fs_prefix + "/models/random_forest_model")

這段程式碼展示瞭如何儲存和載入 Spark ML 模型。儲存管道模型確保了特徵工程流程的一致性,這對於生產環境中的模型佈署至關重要。saveload 方法支援各種儲存系統,包括本地檔案系統、HDFS 和 S3 相容的儲存系統(如 MinIO)。

Spark 與 MinIO 的整合為資料科學家和工程師提供了一個強大的工具組合,能夠高效處理大規模資料。從資料讀取、清理到特徵工程和模型訓練,Spark 提供了一站式解決方案,而 MinIO 則提供了彈性與高效的儲存層。這種組合特別適合於雲端原生環境,如 Kubernetes,可以根據需求輕鬆擴充套件計算和儲存資源。

在實際應用中,我發現這種架構不僅提高了開發效率,還增強了模型訓練流程的可重複性和可靠性。特別是在處理大規模文字資料時,Spark 的分散式特徵工程能力顯著縮短了處理時間,而 MinIO 的高效能物件儲存則確保了資料存取不會成為瓶頸。

透過本文介紹的技術和範例,你應該能夠建立起一個強大的資料處理和特徵工程流程,為後續的機器學習任務奠定堅實基礎。隨著資料量的增長和模型複雜度的提高,這種分散式架構的價值將變得更加明顯。

構建完整的 Kubeflow 資料處理管道

在前面的討論中,我們已經解決了資料和特徵準備的個別問題,現在需要將這些元件整合為一個完整的管道。當處理機器學習專案時,將各個處理步驟組織成一個連貫的管道不僅能提高效率,還能確保流程的可重複性與可靠性。

將獨立功能組織成管道

在本地開發環境中,我們設計的函式都有明確的輸入型別和回傳引數,這使得它們很容易整合到管道中。每個函式回傳輸出檔案的路徑,這些路徑可以作為下一步驟的輸入,從而自然地形成依賴關係圖。

以下是將這些函式組織成 Kubeflow 管道的範例:

@kfp.dsl.pipeline(name='Simple1', description='Simple1')
def my_pipeline_mini(year: int):
    # 建立永續性儲存區宣告來儲存處理資料
    dvop = dsl.VolumeOp(name="create_pvc",
                       resource_name="my-pvc-2",
                       size="5Gi",
                       modes=dsl.VOLUME_MODE_RWO)
    
    # 建立永續性儲存區宣告來儲存 TLD 資料
    tldvop = dsl.VolumeOp(name="create_pvc",
                         resource_name="tld-volume-2",
                         size="100Mi",
                         modes=dsl.VOLUME_MODE_RWO)
    
    # 將函式轉換為容器操作
    download_data_op = kfp.components.func_to_container_op(
        download_data, packages_to_install=['lxml', 'requests'])
    
    download_tld_info_op = kfp.components.func_to_container_op(
        download_tld_data,
        packages_to_install=['requests', 'pandas>=0.24', 'tables'])
    
    clean_data_op = kfp.components.func_to_container_op(
        clean_data, packages_to_install=['pandas>=0.24', 'tables'])
    
    # 定義管道步驟並設定依賴關係
    step1 = download_data_op(year).add_pvolumes(
        {"/data_processing": dvop.volume})
    
    step2 = clean_data_op(input_path=step1.output).add_pvolumes(
        {"/data_processing": dvop.volume})
    
    step3 = download_tld_info_op().add_pvolumes({"/tld_info": tldvop.volume})

    # 編譯管道
    kfp.compiler.Compiler().compile(my_pipeline_mini, 'local-data-prep-2.zip')

這段程式碼定義了一個簡單的 Kubeflow 管道,包含三個主要步驟:下載資料、清理資料和下載 TLD(頂級網域名稱)訊息。管道使用 VolumeOp 建立永續性儲存區來儲存處理過程中的資料,確保資料在不同步驟間能夠分享。func_to_container_op 函式將普通 Python 函式轉換為容器操作,同時指定所需的依賴包。步驟之間的依賴關係透過將一個步驟的輸出作為另一個步驟的輸入來建立,如 step2 使用 step1.output 作為輸入引數。最後,管道被編譯成一個 zip 檔案,可以上載到 Kubeflow 平台執行。

安裝特定依賴套件

特徵準備步驟可能需要不同的函式庫,我們可以在轉換為容器操作時指定所需的套件:

prepare_features_op = kfp.components.func_to_container_op(
    prepare_features,
    packages_to_install=['pandas>=0.24', 'tables', 'scikit-learn'])

這段程式碼建立了一個特徵準備操作,並指定了所需的 Python 套件。func_to_container_op 函式將 prepare_features 函式轉換為容器操作,同時安裝指定的套件。這裡需要 pandas、tables 和 scikit-learn 套件,這些是資料分析和機器學習中常用的工具。這種方式讓我們可以為每個步驟定製所需的環境,而不需要構建包含所有可能需要的套件的巨大容器映像。

當開始探索新資料集時,可能更容易先使用 Jupyter 筆記本進行實驗,而不是立即構建管道元件。不過,盡可能遵循與管道相同的結構,將使後續的生產化工作更加容易。

指定自定義容器

有時候,我們需要使用特定的容器映像,例如包含 SpamAssassin 的容器:

clean_data_op = kfp.components.func_to_container_op(
    clean_data,
    base_image="{0}/kubeflow/spammassisan".format(container_registry),
    packages_to_install=['pandas>=0.24', 'tables'])

這段程式碼展示瞭如何指定基礎映像來建立容器操作。base_image 引數指定了容器的基礎映像,這裡使用了一個包含 SpamAssassin 工具的自定義映像。container_registry 變數應該包含容器登入檔的地址。這種方法允許我們利用預先構建的、包含特定工具或設定的容器映像,而不是每次都從頭開始構建。

權衡單一階段與多階段管道

在某些情況下,將資料寫入磁碟的成本可能過高,因此我們可能選擇將資料和特徵準備合併到單一管道階段中:

  • 單一階段優勢:避免中間資料寫入磁碟,提高效率
  • 單一階段劣勢:可能使除錯變得複雜
  • 多階段優勢:更容易除錯,每個階段可以獨立測試
  • 多階段劣勢:需要將中間結果寫入磁碟,增加 I/O 成本

在推薦系統例子中,我選擇將資料和特徵準備合併為單一階段。在分散式郵件列表例子中,則構建了單一的 Spark 作業。這些決策都根據特定場景下的效率與可維護性之間的權衡。

使用整個筆記本作為資料準備管道階段

如果不想將資料準備筆記本的各個部分轉換為管道元件,可以將整個筆記本作為一個階段。我們可以使用 JupyterHub 使用的相同容器以程式化方式執行筆記本。

封裝筆記本到容器中

需要建立一個新的 Dockerfile,指定它根據另一個容器,然後增加 COPY 指令將筆記本封裝到新容器中:

FROM gcr.io/kubeflow-images-public/tensorflow-1.6.0-notebook-cpu
COPY ./ /workdir /

這個 Dockerfile 根據 Kubeflow 提供的 TensorFlow 筆記本容器映像,並將當前目錄的內容複製到容器的 /workdir 目錄中。這樣,筆記本及其相關檔案就被包含在容器中,可以作為管道的一個階段執行。這種方法特別適合已經有現成筆記本的情況,如 census 資料例子。

增加 Python 依賴

如果需要額外的 Python 依賴,可以使用 RUN 指令安裝它們:

RUN pip3 install --upgrade lxml pandas

這行 Dockerfile 命令使用 pip3 安裝或升級 lxml 和 pandas 套件。將依賴直接安裝在容器中可以加速管道,特別是對於複雜的套件。這個例子適用於郵件列表分析場景,其中需要這些特定的套件進行資料處理。

在管道中使用自定義容器

我們可以像使用任何其他容器一樣,在管道中使用這個容器,透過 dsl.ContainerOp

notebook_op = dsl.ContainerOp(
    name='run_notebook',
    image='my-notebook-image:latest',
    command=['jupyter', 'nbconvert', '--execute', '--to', 'html', '/workdir/my_notebook.ipynb']
)

這段程式碼建立了一個容器操作,用於執行封裝在容器中的筆記本。ContainerOp 指定了操作的名稱、使用的映像以及要執行的命令。這裡使用 jupyter nbconvert 命令以程式化方式執行筆記本,並將結果轉換為 HTML 格式。這提供了在 Kubeflow 中使用筆記本的另一種方式。

如果筆記本需要 GPU 資源,可以在指定 dsl.ContainerOp 時增加 set_gpu_limit 呼叫,並根據需要的 GPU 型別指定 nvidia 或 amd。

中繼資料與人工智慧資產管理

機器學習通常涉及處理大量原始和中間(轉換)資料,最終目標是建立和佈署模型。為了理解模型,必須能夠探索用於建立模型的資料集和轉換過程(資料血統)。這些資料集和應用於它們的轉換的集合稱為模型的中繼資料。

中繼資料在機器學習中的關鍵作用

模型中繼資料對於機器學習中的可重現性至關重要,而可重現性對於可靠的生產佈署又是不可或缺的。捕捉中繼資料允許我們瞭解重新執行作業或實驗時的變化。理解這些變化對於迭代開發和改進模型是必要的。它還為模型比較提供了堅實的基礎。

正如 Pete Warden 在他的文章中定義的:

為了重現結果,需要準確記錄程式碼、訓練資料和整體平台。

同樣的訊息也適用於其他常見的機器學習操作,如模型比較、可重現的模型建立等。

Kubeflow ML 中繼資料工具

有許多不同的選項可用於跟蹤模型的中繼資料。Kubeflow 有一個內建工具,叫做 Kubeflow ML Metadata。這個工具的目標是透過跟蹤和管理工作流程產生的中繼資料,幫助 Kubeflow 使用者理解和管理他們的機器學習工作流程。

Kubeflow ML Metadata 提供的功能包括:

  1. 資料血統追蹤:記錄資料從原始形式到最終模型的完整轉換路徑
  2. 實驗管理:組織和比較不同的機器學習實驗
  3. 引數記錄:記錄模型訓練中使用的所有引數
  4. 結果視覺化:提供介面來檢視和分析結果

MLflow 追蹤

另一個可以整合到 Kubeflow 管道中的中繼資料追蹤工具是 MLflow Tracking。它提供了 API 和 UI,用於在執行機器學習程式碼時記錄引數、程式碼版本、指標和輸出檔案,並在之後視覺化結果。

MLflow 最初由 Databricks 開發,現在是 Linux 基金會的一部分。它提供了一套全面的工具,不僅用於跟蹤實驗,還用於封裝模型、管理佈署和模型註冊。

現在我們已經準備好了用於訓練模型的資料。我們已經看到,對於特徵和資料準備,沒有一種通用的方法;我們的不同例子需要不同的工具。我們還看到,即使在同一個問題中,方法也可能需要改變,就像當我們擴大郵件列表例子的範圍以包括更多資料時一樣。

特徵的數量和品質,以及產生這些特徵的資料,對機器學習專案的成功至關重要。透過執行不同資料規模的例子並比較模型效果,可以驗證這一點。

重要的是要記住,資料和特徵準備不是一次性活動,在開發模型的過程中可能需要重新存取這一步驟。可能會發現需要某個特徵,或者某個原本認為表現良好的特徵實際上暗示了資料品質問題。在接下來的章節中,當我們訓練和佈署模型時,可以隨時重新審視資料和特徵準備過程。

在機器學習工作流程中,中繼資料追蹤是確保模型可重現性和可靠性的關鍵環節。透過整合 Kubeflow ML Metadata 或 MLflow 等工具,我們可以系統地記錄、追蹤和分析模型開發過程中的各種引數、資料和結果,為模型的迭代改進和生產佈署奠定堅實基礎。

機器學習中繼資料管理的重要性

在機器學習專案中,模型開發過程產生的中繼資料對於確保模型可重現性、可解釋性以及合規性至關重要。隨著機器學習應用的增長,我們需要有系統地記錄模型訓練過程中的各種資訊,包括使用的資料集、超引數設定、模型版本以及評估指標等。

在實際開發中,我經常遇到這樣的情況:團隊成員使用不同的方法追蹤實驗結果,導致難以比較不同模型的效能或重現先前的成功實驗。這促使我們需要一個標準化的中繼資料管理系統,而 Kubeflow ML Metadata 正是為解決這個問題而設計的工具。

Kubeflow ML Metadata 概述

Kubeflow ML Metadata 是一個專為記錄和檢索模型建立過程中產生的中繼資料而設計的函式庫。目前的實作中,Kubeflow Metadata 主要提供 Python API,如果需要使用其他語言,則需要實作特定語言的 Python 外掛。

基本元素與架構

Kubeflow Metadata 的資訊組織主要圍繞三個核心概念:

  1. Workspace(工作區):所有中繼資料的容器,用於組織和追蹤記錄
  2. Run(執行):在工作區中執行的特定工作流程或實驗
  3. Execution(執行):具體的執行例項,與特定的執行相關聯

這種層次結構使得中繼資料管理更有組織性,讓我們能夠在不同層級上查詢和分析資訊。

實作 Kubeflow Metadata

讓我們透過一個簡單的範例來瞭解 Kubeflow Metadata 的基本功能。以下是使用 Kubeflow Metadata 的基本步驟:

1. 必要的匯入

首先,我們需要匯入必要的函式庫:

from kfmd import metadata
import pandas
from datetime import datetime

2. 定義工作區、執行和執行

接下來,我們需要設定一個工作區,以便 Kubeflow 可以追蹤和組織記錄:

# 定義工作區
ws1 = metadata.Workspace(
    # 連線到 Kubeflow 名稱空間中的 metadata-service
    backend_url_prefix="metadata-service.kubeflow.svc.cluster.local:8080",
    name="ws1",
    description="測試用工作區",
    labels={"n1": "v1"})

# 定義執行
r = metadata.Run(
    workspace=ws1,
    name="run-" + datetime.utcnow().isoformat("T"),
    description="ws_1 中的執行",
)

# 定義執行
exec = metadata.Execution(
    name="execution" + datetime.utcnow().isoformat("T"),
    workspace=ws1,
    run=r,
    description="執行範例",
)

這段程式碼建立了中繼資料管理的三個核心層級:工作區、執行和執行。工作區 ws1 連線到 Kubeflow 的中繼資料服務,並設定名稱和描述。執行 r 是在此工作區中的一次實驗,使用當前時間作為識別。執行 exec 是此執行的具體例項。這三個層級可以在同一應用程式中多次定義,如果它們不存在,系統會建立它們;如果已存在,則會被重複使用。

值得注意的是,工作區、執行和執行可以在相同或不同的應用程式中多次定義。如果它們不存在,系統會建立它們;如果已存在,則會被重複使用。這種設計使得在不同的實驗和應用之間分享中繼資料成為可能。

3. 記錄資料集資訊

Kubeflow 不會自動追蹤應用程式使用的資料集,我們需要在程式碼中明確註冊它們:

# 註冊資料集
data_set = exec.log_input(
    metadata.DataSet(
        description="範例資料",
        name="mytable-dump",
        owner="owner@my-company.org",
        uri="file://path/to/dataset",
        version="v1.0.0",
        query="SELECT * FROM mytable"))

這段程式碼註冊了一個資料集作為執行的輸入。我們指定了資料集的描述、名稱、擁有者、URI 位置、版本以及用於取得資料的查詢。在實際的 MNIST 範例中,這將是手寫數字的資料集。這種明確的註冊允許我們追蹤模型使用的確切資料,這對於模型再現性和稽核非常重要。

4. 記錄模型和評估指標

除了資料,Kubeflow Metadata 還允許我們儲存有關模型及其評估指標的資訊:

# 記錄模型資訊
model = exec.log_output(
    metadata.Model(
        name="MNIST",
        description="識別手寫數字的模型",
        owner="someone@kubeflow.org",
        uri="gcs://my-bucket/mnist",
        model_type="neural network",
        training_framework={
            "name": "tensorflow",
            "version": "v1.0"
        },
        hyperparameters={
            "learning_rate": 0.5,
            "layers": [10, 3, 1],
            "early_stop": True
        },
        version="v0.0.1",
        labels={"mylabel": "l1"}))

# 記錄評估指標
metrics = exec.log_output(
    metadata.Metrics(
        name="MNIST-evaluation",
        description="驗證 MNIST 模型識別手寫數字的能力",
        owner="someone@kubeflow.org",
        uri="gcs://my-bucket/mnist-eval.csv",
        data_set_id=data_set.id,
        model_id=model.id,
        metrics_type=metadata.Metrics.VALIDATION,
        values={"accuracy": 0.95},
        labels={"mylabel": "l1"}))

這段程式碼記錄了兩個關鍵元素:模型本身和評估指標。對於模型,我們記錄了名稱、描述、擁有者、儲存位置、模型型別、訓練框架、超引數設定和版本。這些資訊對於理解模型的構建方式至關重要。

對於評估指標,我們記錄了名稱、描述、擁有者、指標資料的位置、使用的資料集和模型的 ID、指標型別(這裡是驗證),以及實際的指標值(準確率為 0.95)。這些指標對於評估模型效能和比較不同模型非常重要。

以上程式碼片段實作了儲存模型建立中繼資料的所有主要步驟:

  1. 定義工作區、執行和執行
  2. 儲存用於模型建立的資料資產資訊
  3. 儲存有關建立的模型的資訊,包括其版本、型別、訓練框架和建立時使用的超引數
  4. 儲存有關模型評估指標的資訊

在實際開發中,這些程式碼應該嵌入到負責資料準備、模型訓練等任務的實際程式碼中,以捕捉這些過程中產生的中繼資料。

查詢與檢視中繼資料

收集中繼資料的目的是為了後續的分析和使用。Kubeflow Metadata 提供了兩種檢視中繼資料的方式:程式化查詢和使用中繼資料 UI。

程式化查詢

以下是一些可用於程式化查詢的功能:

1. 列出工作區中的所有模型

pandas.DataFrame.from_dict(ws1.list(metadata.Model.ARTIFACT_TYPE_NAME))

在我們的程式碼中,我們只建立了一個模型,因此這個查詢的結果將只包含這一個模型。

2. 取得基本血統

print("model id is " + model.id)
# 回傳:model id is 2

3. 找出產生此模型的執行

output_events = ws1.client.list_events2(model.id).events
execution_id = output_events[0].execution_id
print(execution_id)
# 回傳:1

4. 找出與該執行相關的所有事件

all_events = ws1.client.list_events(execution_id).events
assert len(all_events) == 3
print("\n與此模型相關的所有事件:")
pandas.DataFrame.from_dict([e.to_dict() for e in all_events])

在這個例子中,我們使用了一個輸入資料集來建立一個模型和評估指標,所以查詢結果會顯示三個事件:一個輸入事件和兩個輸出事件。

Kubeflow Metadata UI

除了提供 API 進行程式化分析外,Kubeflow Metadata 還提供了一個使用者介面,讓我們可以不寫程式碼就能檢視中繼資料。Metadata UI 可以透過主要的 Kubeflow UI 進入。

在 Kubeflow UI 中點選「Artifact Store」後,我們可以看到可用的工件(記錄的中繼資料事件)列表。從這個檢視中,我們可以點選單個工件並檢視其詳細資訊。

Kubeflow Metadata 的侷限性與 MLflow 的替代方案

雖然 Kubeflow Metadata 提供了一些用於儲存和檢視機器學習中繼資料的基本功能,但其功能非常有限,特別是在檢視和操作儲存的中繼資料方面。在實際工作中,我發現這些限制會影響團隊的工作效率和實驗的可追蹤性。

一個更強大的機器學習中繼資料管理實作是 MLflow。雖然 MLflow 不是 Kubeflow 發行版的一部分,但將它佈署在 Kubeflow 旁邊並從根據 Kubeflow 的應用程式中使用它非常簡單。

MLflow 的優勢

MLflow 是一個開放原始碼平台,用於管理端對端的機器學習生命週期。它包括三個主要功能:

  1. MLflow Tracking:追蹤實驗以記錄和比較引數和結果
  2. MLflow Projects:將 ML 程式碼封裝成可重用、可複製的形式,以便與其他資料科學家分享或轉移到生產環境
  3. MLflow Models:從各種 ML 函式倉管理和佈署模型到各種模型服務和推論平台

在我的實踐中,MLflow 的實驗追蹤功能特別有用,它提供了更豐富的視覺化和更強大的查詢能力,使團隊成員能夠更有效地協作和分析實驗結果。

MLflow Tracking 的概念

MLflow Tracking 圍繞「執行」(runs)的概念組織,執行是資料科學程式碼的執行。每次執行記錄以下資訊:

  1. 程式碼版本:執行使用的 Git 提交雜湊,如果是從 MLflow Project 執行的
  2. 開始和結束時間:執行的開始和結束時間
  3. 來源:啟動執行的檔案名,或者如果從 MLflow Project 執行,則為專案名稱和入口點
  4. 引數:執行的引數,這些可以是模型超引數或其他設定
  5. 指標:執行產生的評估指標,如準確率、損失等
  6. 工件:執行產生的檔案,如模型檔案、圖表等

MLflow Tracking 讓我們可以使用 Python、REST、R 和 Java API 記錄和查詢實驗,這顯著擴充套件了 API 的覆寫範圍,讓我們能夠從不同的 ML 元件儲存和存取中繼資料。

在 Kubeflow 中整合 MLflow

在 Kubeflow 環境中整合 MLflow 可以彌補 Kubeflow Metadata 的不足,提供更強大的中繼資料管理功能。以下是整合 MLflow 的基本步驟:

  1. 佈署 MLflow 服務:將 MLflow 服務佈署到與 Kubeflow 相同的 Kubernetes 叢集中
  2. 設定 MLflow 追蹤伺服器:設定 MLflow 追蹤伺服器,使其可以從 Kubeflow 筆記本和管道中存取
  3. 在 Kubeflow 筆記本中使用 MLflow API:在 Kubeflow 筆記本中匯入 MLflow 函式庫並使用其 API 記錄實驗

MLflow 與 Kubeflow Metadata 的比較

在實際使用中,MLflow 和 Kubeflow Metadata 各有優勢:

  • API 豐富度:MLflow 提供更多語言的 API 支援,而 Kubeflow Metadata 主要支援 Python
  • 視覺化能力:MLflow 的 UI 提供更強大的視覺化和查詢功能
  • 生態系統整合:Kubeflow Metadata 與 Kubeflow 生態系統更緊密整合
  • 佈署複雜性:Kubeflow Metadata 作為 Kubeflow 的一部分,不需要額外佈署,而 MLflow 需要單獨佈署

根據專案需求和團隊偏好,我們可以選擇使用 Kubeflow Metadata、MLflow 或兩者結合的方式來管理機器學習中繼資料。

實際應用建議

在實際的機器學習專案中,我建議採取以下策略來有效管理中繼資料:

  1. 確定中繼資料需求:在專案開始時,確定需要追蹤的中繼資料型別,包括資料集、引數、指標等
  2. 選擇適當的工具:根據專案規模和需求,選擇 Kubeflow Metadata、MLflow 或兩者結合
  3. 標準化中繼資料記錄:制定團隊中繼資料記錄標準,確保所有成員以一致的方式記錄中繼資料
  4. 自動化中繼資料收集:盡可能將中繼資料收集整合到自動化工作流程中,減少手動記錄的需要
  5. 定期審查中繼資料:定期審查收集的中繼資料,確保其完整性和準確性
  6. 利用中繼資料進行決策:使用收集的中繼資料來指導模型選擇、引數調整和佈署決策

中繼資料管理是機器學習專案成功的關鍵因素之一。Kubeflow ML Metadata 提供了一個基本框架來追蹤模型建立過程中的關鍵資訊,而 MLflow 則提供了更強大的功能來管理整個機器學習生命週期。

透過有效地管理中繼資料,我們可以提高模型的可重現性、可解釋性和可靠性,並使團隊能夠更有效地協作。在選擇和實施中繼資料管理解決方案時,應考慮專案需求、團隊技能和現有基礎設施,以找到最適合的方法。

無論選擇哪種工具,重要的是建立一個一致的中繼資料管理實踐,使其成為機器學習工作流程的自然組成部分,而不是額外的負擔。這樣,中繼資料管理將成為提升機器學習專案品質和效率的強大工具,而不僅是一個合規性要求。

MLflow與Kubeflow的完美結合:開發企業級模型追蹤平台

在機器學習專案的生命週期中,實驗追蹤與中繼資料管理是確保模型可重現性與可靠性的關鍵環節。隨著模型實驗數量增加,如何有效管理這些實驗結果、引數設定和模型成品,成為每個資料科學團隊面臨的挑戰。MLflow作為一個開放原始碼的機器學習生命週期管理工具,提供了優雅的解決方案,而與Kubeflow的整合更能將其威力發揮到極致。

在我多年的MLOps實踐中發現,建立一個穩健的模型追蹤系統是避免「模型黑洞」的第一道防線。本文將分享如何在Kubernetes環境中佈署MLflow追蹤伺服器,並整合MinIO作為儲存後端,開發一個完整的模型實驗追蹤系統。

MLflow的核心功能與中繼資料分類別

MLflow的追蹤功能主要記錄三種型別的中繼資料:

引數 (Parameters)

這些是模型訓練過程中使用的輸入變數,例如學習率、隱藏層數量、正則化係數等。引數一旦設定,在實驗過程中通常不會改變。引數的記錄對於實驗重現和模型比較至關重要。

指標 (Metrics)

指標是數值型的評估標準,用於衡量模型效能。與引數不同,指標可以在訓練過程中不斷更新,例如每個epoch的損失函式值、準確率等。MLflow會記錄指標的完整歷史,讓你能夠視覺化模型的收斂過程。

成品 (Artifacts)

成品是模型訓練產生的任何格式的輸出檔案,包括:

  • 模型檔案(如序列化的Scikit-learn模型)
  • 視覺化圖表(PNG、SVG等格式)
  • 資料檔案(如Parquet格式的處理後資料)
  • 評估報告與其他分析結果

MLflow追蹤伺服器架構設計

大多數MLflow教學示範的是本地安裝,但在企業環境中,我們需要一個集中式的追蹤伺服器,讓不同的容器化訓練任務都能寫入中繼資料,並從中央位置檢視結果。

根據這個需求,我設計了以下架構:

MLflow元件佈署架構

這個架構的核心元件包括:

  1. MinIO伺服器 - 已作為Kubeflow安裝的一部分存在,用於儲存模型和其他成品
  2. MLflow追蹤伺服器 - 需額外增加到Kubeflow安裝中的元件,提供MLflow UI介面
  3. 使用者程式碼 - 如Jupyter筆記本、Python、R或Java應用程式

這種架構的優勢在於:

  • 集中管理所有實驗中繼資料
  • 支援多使用者平行實驗
  • 利用MinIO提供雲無關的物件儲存解決方案
  • 與Kubeflow整合,實作端對端的MLOps流程

建立與佈署MLflow追蹤伺服器

MLflow追蹤伺服器有兩個主要的儲存元件:後端儲存和成品儲存。

  • 後端儲存:用於儲存實驗和執行的中繼資料,包括引數、指標和標籤。
  • 成品儲存:用於儲存大型資料,如模型檔案、圖表等。

在我的實作中,為了簡化佈署,後端儲存使用檔案系統,而成品儲存則使用MinIO(相容於S3協定)。

構建MLflow追蹤伺服器Docker映像

首先,讓我們建立一個Docker映像來執行MLflow追蹤伺服器:

FROM python:3.7
RUN pip3 install --upgrade pip && \
    pip3 install mlflow --upgrade && \
    pip3 install awscli --upgrade && \
    pip3 install boto3 --upgrade

ENV PORT 5000
ENV AWS_BUCKET bucket
ENV AWS_ACCESS_KEY_ID aws_id
ENV AWS_SECRET_ACCESS_KEY aws_key
ENV FILE_DIR /tmp/mlflow

RUN mkdir -p /opt/mlflow
COPY run.sh /opt/mlflow
RUN chmod -R 777 /opt/mlflow/

ENTRYPOINT ["/opt/mlflow/run.sh"]

這個Dockerfile做了以下幾件事:

  1. 使用Python 3.7作為基礎映像
  2. 安裝MLflow及相關的AWS工具(用於與MinIO通訊)
  3. 設定環境變數,包括伺服器連線埠、S3/MinIO存取資訊
  4. 建立目錄並複製啟動指令碼
  5. 設定入口點為啟動指令碼