批次處理是許多企業應用的基礎,特別適合處理不需要即時回應的大量資料。在批次處理中整合模型服務有幾種常見方式:

直接整合模型服務API

最簡單的方式是在批次處理作業中直接呼叫模型服務API。這種方法實作簡單,但在處理大量資料時可能會遇到效能瓶頸。

# 批次處理中直接呼叫模型服務的範例
import requests
import pandas as pd

def process_batch(data_chunk):
    results = []
    
    # 逐筆處理資料並呼叫模型服務
    for item in data_chunk:
        payload = {"instances": [item]}
        response = requests.post(
            "http://model-service:8501/v1/models/my_model:predict", 
            json=payload
        )
        prediction = response.json()["predictions"][0]
        results.append(prediction)
    
    return results

# 讀取批次資料
data = pd.read_csv("large_dataset.csv")
batch_size = 1000

all_results = []

# 分批處理
for i in range(0, len(data), batch_size):
    batch = data.iloc[i:i+batch_size].to_dict("records")
    batch_results = process_batch(batch)
    all_results.extend(batch_results)

這段程式碼展示了批次處理中最直接的模型服務整合方法。它將大型資料集分割成較小的批次,然後逐一呼叫模型服務API。雖然這種方法簡單明瞭,但它有幾個潛在問題:

  1. 每次呼叫都需要建立HTTP連線,造成額外開銷
  2. 序列處理導致整體效能不佳
  3. 缺乏錯誤處理與重試機制
  4. 網路延遲可能成為主要瓶頸

這種方法適合小型資料集或原型開發,但在生產環境處理大量資料時,我們需要更高效的解決方案。

使用批次推論端點

許多模型服務框架提供批次推論端點,允許一次請求處理多筆資料。這種方法可以顯著減少網路開銷,提升整體效能。

# 使用批次推論端點的改進版本
import requests
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

def process_batch(batch):
    # 一次傳送整個批次
    payload = {"instances": batch}
    response = requests.post(
        "http://model-service:8501/v1/models/my_model:predict", 
        json=payload
    )
    return response.json()["predictions"]

# 讀取批次資料
data = pd.read_csv("large_dataset.csv")
batch_size = 1000
all_results = []

# 使用多執行緒處理多個批次
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    
    for i in range(0, len(data), batch_size):
        batch = data.iloc[i:i+batch_size].to_dict("records")
        futures.append(executor.submit(process_batch, batch))
    
    # 收集結果
    for future in futures:
        all_results.extend(future.result())

這個改進版本有兩個主要最佳化:

  1. 使用批次推論端點一次處理多筆資料,減少網路請求次數
  2. 引入多執行緒平行處理多個批次,進一步提升效能

這種方法顯著提高了處理效率,特別是當模型服務支援高效的批次處理時。然而,這種方法仍然受限於HTTP請求的開銷,與需要手動管理批次大小與平行度的平衡。

根據串流處理的批次架構

對於真正大規模的批次處理,我們可以利用串流處理框架來實作更高效的模型服務整合。這種架構結合了批次處理的資料量與串流處理的效率。

上圖展示了一個根據串流處理的批次服務實作,包含三個主要層次:

  1. Cloudflow串流處理層:呼叫模型服務處理串流中的每個元素,每個串流元件可以獨立擴充套件以提供所需吞吐量
  2. 模型伺服器層:執行實際模型推論,可透過增加模型伺服器數量獨立擴充套件
  3. 負載平衡層:使用Istio或Ambassador等工具為推論請求提供負載平衡

這種架構的優勢在於每一層都可以獨立擴充套件,使得整體解決方案在處理大規模批次資料時具有極高的彈性。

串流應用中的模型服務整合

串流應用需要即時處理持續產生的資料,並在低延遲內提供回應。在串流環境中整合模型服務面臨獨特的挑戰,包括處理速率波動、保證低延遲和維持高用性。

串流處理引擎與函式庫

在實作串流模型服務時,我們可以選擇使用完整的處理引擎或專門的函式庫:

  • 串流處理引擎:如Apache Flink、Apache Spark Streaming等,提供完整的分散式處理能力
  • 串流處理函式庫:如Akka Streams、RxJava等,提供在單一應用中處理串流的能力

選擇取決於你的規模需求和現有技術堆積積疊。對於大規模分散式應用,完整的處理引擎通常是更好的選擇;而對於較小規模的應用,串流處理函式庫可能更為輕量與易於整合。

使用Cloudflow構建串流模型服務

Cloudflow是一個開放原始碼框架,專為構建和執行分散式串流應用而設計。它提供了一種宣告式方法來定義串流處理拓撲,並自動處理佈署和擴充套件。

以下是使用Cloudflow構建串流模型服務的基本架構:

// 定義串流處理拓撲
val blueprint = StreamletGraph
  .withStreamlets(
    dataIngress,          // 資料輸入
    dataPreprocessor,     // 資料預處理
    modelInference,       // 模型推論
    resultProcessor,      // 結果處理
    resultEgress          // 結果輸出
  )
  .withConnections(
    dataIngress.out ~> dataPreprocessor.in,
    dataPreprocessor.out ~> modelInference.in,
    modelInference.out ~> resultProcessor.in,
    resultProcessor.out ~> resultEgress.in
  )

這段Scala程式碼定義了一個Cloudflow串流處理拓撲,包含五個主要元件:

  1. dataIngress:負責從外部來源(如Kafka)取得輸入資料
  2. dataPreprocessor:執行資料清理和特徵轉換
  3. modelInference:呼叫模型服務進行推論
  4. resultProcessor:處理推論結果,可能包括決策邏輯或後處理
  5. resultEgress:將處理結果輸出到外部系統

這種拓撲結構的優勢在於每個元件都可以獨立擴充套件,並且整個流程是非阻塞的,確保高吞吐量和低延遲。

實作模型推論串流元件

在Cloudflow中,模型推論元件負責與模型服務通訊並執行實際預測。以下是一個簡化的實作範例:

class ModelInferenceStreamlet extends FlowProcessor[ModelData, PredictionResult] {

  override def createLogic() = new FlowGraphProcessor[ModelData, PredictionResult] {
    override def process(modelData: ModelData): PredictionResult = {
      // 準備請求資料
      val requestData = prepareRequestData(modelData)
      
      // 呼叫模型服務
      val response = callModelService(requestData)
      
      // 解析回應並回傳結果
      parsePredictionResult(response)
    }
    
    private def callModelService(requestData: RequestData): Response = {
      // 使用HTTP客戶端呼叫模型服務
      // 在生產環境中應加入重試邏輯、熔斷機制等
      val client = HttpClient.newHttpClient()
      val request = HttpRequest.newBuilder()
        .uri(URI.create("http://model-service:8501/v1/models/my_model:predict"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(requestData.toJson))
        .build()
      
      client.send(request, HttpResponse.BodyHandlers.ofString())
    }
  }
}

這個Scala類別定義了一個Cloudflow串流處理元件,專門負責模型推論。它接收預處理後的資料,呼叫模型服務API,然後回傳預測結果。在實際應用中,這個元件應該包含更多的錯誤處理邏輯,例如:

  1. 請求重試機制,處理暫時性失敗
  2. 熔斷器模式,防止故障級聯
  3. 快取常見請求,減少重複計算
  4. 監控和指標收集,追蹤效能和可靠性

這種模組化設計使得系統各部分可以獨立演化和擴充套件,是構建可靠串流應用的關鍵。

使用串流處理實作批次模型服務

有趣的是,串流處理架構也可以用來實作高效的批次模型服務。這種方法結合了批次處理的資料量與串流處理的效率,特別適合需要處理大量資料的場景。

批次服務的串流實作架構

批次服務的串流實作包含三個主要層次:

  1. Cloudflow串流處理層:呼叫模型服務處理串流中的每個元素
  2. 模型伺服器層:執行實際模型推論
  3. 負載平衡層:為推論請求提供負載平衡

這種架構的每一層都可以獨立擴充套件,提供高度彈性的解決方案,適用於處理大規模批次資料。

批次處理與串流處理的整合

在實際應用中,我們經常需要同時支援批次和串流處理。以下是一個整合兩種模式的架構範例:

// 定義支援批次和串流的處理拓撲
val blueprint = StreamletGraph
  .withStreamlets(
    // 串流輸入路徑
    streamingIngress,
    streamingPreprocessor,
    
    // 批次輸入路徑
    batchIngress,
    batchPreprocessor,
    
    // 分享的模型推論和後處理
    modelInference,
    resultProcessor,
    
    // 不同的輸出路徑
    streamingEgress,
    batchResultStorage
  )
  .withConnections(
    // 串流處理路徑
    streamingIngress.out ~> streamingPreprocessor.in,
    streamingPreprocessor.out ~> modelInference.in1,
    
    // 批次處理路徑
    batchIngress.out ~> batchPreprocessor.in,
    batchPreprocessor.out ~> modelInference.in2,
    
    // 分享的後處理和輸出
    modelInference.out ~> resultProcessor.in,
    resultProcessor.out1 ~> streamingEgress.in,
    resultProcessor.out2 ~> batchResultStorage.in
  )

這個拓撲結構展示瞭如何在同一系統中同時支援批次和串流處理:

  1. 分別為批次和串流定義獨立的輸入和預處理路徑
  2. 分享模型推論元件,提高資源利用率
  3. 在結果處理階段根據來源和需求分流
  4. 使用不同的輸出元件處理串流結果和批次結果

這種設計允許系統以統一的方式處理不同型別的工作負載,同時保持高效率和可擴充套件性。在實際應用中,這種靈活性非常寶貴,尤其是當業務需求同時包含批次和即時處理時。

模型服務的橫向考量

無論選擇哪種模型服務架構,都需要考慮以下幾個關鍵方面:

效能最佳化

在整合模型服務時,效能往往是主要考量因素。以下是一些提升效能的策略:

  1. 批次處理:盡可能使用批次API,減少網路往往返
  2. 模型最佳化:考慮模型量化、剪枝等技術減小模型大小和推論時間
  3. 快取機制:對常見請求實施快取,避免重複計算
  4. 非同步處理:使用非阻塞I/O和事件驅動架構
  5. 資源隔離:確保關鍵模型服務有足夠與專用的資源

可觀測性與監控

有效的監控對於維護模型服務至關重要:

  1. 請求指標:追蹤請求量、延遲和錯誤率
  2. 模型指標:監控模型效能和預測分佈
  3. 系統指標:追蹤CPU、記憶體和網路使用情況
  4. 分散式追蹤:實施跨服務追蹤,瞭解請求流程
  5. 警示系統:設定適當的警示門檻,及時發現問題

可靠性與彈性

在生產環境中,可靠性是模型服務的關鍵要求:

  1. 重試機制:處理暫時性失敗
  2. 熔斷器模式:防止故障級聯
  3. 負載平衡:分散請求到多個服務例項
  4. 自動擴充套件:根據負載自動調整資源
  5. 降級策略:當主要模型不可用時提供替代方案

在我的實踐中,我發現設計良好的降級策略特別重要。例如,當複雜模型不可用時,可以切換到較簡單但更穩定的備用模型,或使用預先計算的結果,確保服務不中斷。

案例分析:金融交易異常檢測系統

為了具體說明這些概念,讓我分享一個實際案例:一個金融交易異常檢測系統,需要同時支援批次處理(歷史交易分析)和串流處理(即時交易監控)。

系統架構

該系統採用了混合架構,包含以下元件:

  1. 資料來源

    • Kafka串流(即時交易)
    • S3資料湖(歷史交易)
  2. 處理層

    • Cloudflow串流處理拓撲
    • 特徵工程元件
    • 模型推論元件
  3. 模型服務層

    • 異常檢測模型(隔離森林演算法)
    • 交易分類別模型(XGBoost)
  4. 輸出層

    • 即時警示(Kafka)
    • 分析報告(S3)

關鍵設計決策

在實作過程中,我們做出了幾個關鍵決策:

  1. 分享特徵工程邏輯:批次和串流使用相同的特徵工程式碼,確保一致性
  2. 分層模型策略:先使用輕量級模型進行初步篩選,再對可疑交易使用複雜模型深入分析
  3. 快取常見模式:對重複出現的交易模式實施快取,顯著提升效能
  4. 漸進式降級:設計多層降級策略,確保系統在各種故障情況下仍能提供基本功能

這種架構成功處理了每秒數千筆交易,同時支援對數十億筆歷史交易的批次分析,展示了根據串流處理的模型服務架構在實際應用中的強大能力。

模型服務技術選型

根據應用需求選擇合適的模型服務架構是成功的關鍵。以下是一個簡單的選型:

適合批次處理的模型服務方案

如果你的應用主要處理批次資料,考慮以下方案:

  1. 直接API呼叫:適合小型資料集和簡單應用
  2. 批次推論端點:適閤中等規模資料集,需要較高效能
  3. 根據串流的批次架構:適合大規模資料集,需要高擴充套件性和彈性

適合串流處理的模型服務方案

對於需要即時處理的串流應用,考慮以下方案:

  1. 輕量級函式庫:適合低延遲要求,單一應用內的串流處理
  2. Cloudflow等框架:適合分散式串流應用,需要高擴充套件性
  3. 混合架構:同時支援批次和串流處理的綜合解決方案

技術選型考量因素

在做出選擇時,應考慮以下因素:

  1. 延遲要求:應用能接受的最大回應時間
  2. 吞吐量需求:系統需要處理的資料量
  3. 資源限制:可用的計算和網路資源
  4. 擴充套件性需求:系統未來的增長預期
  5. 團隊技術背景:團隊現有的技術熟悉度和偏好

我發現,許多團隊傾向於選擇最新、最熱門的技術,而不是最適合其需求的解決方案。在模型服務領域,適合性遠比新穎性更重要。一個簡單但適合的架構通常比複雜的"尖端"解決方案更可靠、更易於維護。

未來趨勢與發展方向

模型服務領域正在快速發展,以下是我認為值得關注的幾個趨勢:

  1. 邊緣計算整合:將模型服務擴充套件到邊緣裝置,減少延遲並提高隱私保護
  2. 自適應模型服務:根據負載和資源動態調整模型複雜度和批次大小
  3. 多模型協同:多個專業模型協同工作,處理複雜任務
  4. 模型即程式碼(MaaC):將模型作為應用程式的一部分直接佈署,而非獨立服務
  5. 聯邦學習與服務:在保護資料隱私的前提下實作分散式學習和推論

這些趨勢反映了一個共同主題:模型服務正從獨立系統演變為更加整合、適應性強與分散的架構。作為開發者,我們需要跟進這些發展,同時保持對基本原則的關注——效能、可靠性和可維護性。

實用建議與最佳實踐

根據多年實踐經驗,我想分享一些實用建議:

  1. 從簡單開始:先實作最簡單的解決方案,然後根據需要增加複雜性
  2. 分層設計:將模型服務架構設計為多層,每層專注於特定功能
  3. 充分測試:建立全面的測試套件,包括單元測試、整合測試和負載測試
  4. 漸進式佈署:使用藍綠佈署或金絲雀發布策略安全更新模型
  5. 考慮全生命週期:不僅關注初始佈署,還要考慮監控、更新和退役

特別值得一提的是監控策略。我建議實施"三層監控":

  • 技術指標:CPU、記憶體使用率等系統指標
  • 業務指標:請求量、成功率等營運指標
  • 模型指標:預測分佈、特徵重要性等模型健康指標

這種全面的監控方法能夠幫助你及早發現問題,無論是技術故障、業務變化還是模型效能下降。

模型服務是連線AI模型與實際應用的關鍵橋樑。透過選擇合適的架構和實作方法,我們可以充分發揮模型的價值,同時確保系統的可靠性、效能和可維護性。無論是批次處理還是串流應用,現代模型服務架構都提供了豐富的選擇,能夠滿足各種需求。

在實作過程中,請記住:最佳的解決方案不一定是最複雜的。通常,一個設計良好的簡單架構比過度工程化的複雜系統更可靠、更易於維護。從業務需求出發,選擇適合的技術,並在實踐中不斷最佳化和演進,這是成功整合模型服務的關鍵。

Kubeflow機器學習平台:為AI工作流程賦能

機器學習系統的佈署與管理一直是企業實施AI戰略的關鍵挑戰。隨著模型複雜度增加與計算需求擴充套件,傳統的機器學習工作流程已難以滿足現代企業需求。在這樣的背景下,Kubeflow作為一個專為Kubernetes開發的機器學習平台,正逐漸成為業界的標準解決方案。

Kubeflow的核心價值與定位

Kubeflow本質上是一個開放原始碼平台,旨在簡化在Kubernetes上佈署、監控和管理機器學習工作流程的過程。它不僅是一個工具,而是一個完整的生態系統,將機器學習的各個階段—從資料準備、模型訓練到模型服務—整合到一個統一的平台中。

在探索Kubeflow之前,我認為有必要理解它解決的核心問題:機器學習工作流程在生產環境中的複雜性。這包括資源排程、版本控制、模型佈署與監控等諸多挑戰。Kubeflow透過利用Kubernetes的協調能力,為這些挑戰提供了優雅的解決方案。

Kubeflow架構與核心元件解析

平台架構設計哲學

Kubeflow的架構設計遵循模組化原則,這使得使用者可以根據需求選擇性地佈署特定元件。這種靈活性是我在實際專案中最欣賞的特點之一。平台的核心是Kubernetes,它提供了底層的容器協調能力,而Kubeflow則在此基礎上增加了機器學習特定的功能層。

關鍵元件剖析

  1. Jupyter Notebooks:提供互動式開發環境,支援多種語言和框架
  2. TF工作 & PyTorch Operators:用於管理分散式訓練任務
  3. Pipelines:定義和執行可重複的工作流程
  4. KFServing:簡化模型佈署與服務
  5. Katib:自動化超引數調優和神經網路架構搜尋

這些元件各司其職,共同構成了完整的機器學習生命週期管理系統。在實際應用中,我發現Pipelines元件尤其有價值,它使資料科學家能夠定義可重複、可分享的工作流程,大提高了團隊協作效率。

佈署Kubeflow:從理論到實踐

環境準備與前置需求

在佈署Kubeflow之前,需要確保Kubernetes叢集已經正確設定。我建議使用Kubernetes 1.15或更高版本,並確保叢集有足夠的資源處理機器學習工作負載。

對於開發和測試環境,可以使用minikube或kind等輕量級Kubernetes發行版。但對於生產環境,我通常推薦使用GKE、EKS或AKS等雲端服務商提供的管理型Kubernetes服務,這些服務提供了更好的可靠性和可擴充套件性。

安裝方法比較

Kubeflow提供了多種安裝方法,每種方法都有其優缺點:

  1. Kubeflow Manifests:使用kustomize管理YAML設定
  2. Kubeflow Distributions:預設設定的發行版,如AWS for Kubeflow
  3. Operator-based Installation:使用Kubernetes Operator進行管理

對於初學者,我建議從Kubeflow Distributions開始,因為它提供了預設設定的環境。而對於需要高度定製的場景,Manifests方法提供了更多的靈活性。

實用佈署指令碼

以下是使用kustomize佈署Kubeflow的基本指令碼:

# 克隆官方倉函式庫
git clone https://github.com/kubeflow/manifests.git
cd manifests

# 使用kustomize佈署
while ! kustomize build example | kubectl apply -f -; do
  echo "等待資源建立完成..."
  sleep 10
done

# 驗證佈署
kubectl get pods -n kubeflow

這個指令碼首先克隆Kubeflow的官方設定倉函式庫,然後使用kustomize工具將設定轉換為Kubernetes可識別的格式並應用到叢集。這裡使用了一個while迴圈來處理可能的依賴關係問題,確保所有資源都能正確建立。最後,透過檢查kubeflow名稱空間中的pod來驗證佈署是否成功。

在實際佈署中,我常遇到資源限制或網路問題。針對這些情況,建議增加超時設定並實施重試機制。同時,在生產環境中,應該考慮使用Helm或Operator來簡化管理流程。

Kubeflow Pipelines:構建可重複的機器學習工作流程

工作流程設計理念

Kubeflow Pipelines是Kubeflow中最強大的功能之一,它允許使用者定義端對端的機器學習工作流程。每個Pipeline由多個元件組成,這些元件可以是資料處理、模型訓練或評估等任務。

工作流程的設計遵循以下原則:

  • 模組化:每個元件應該專注於單一任務
  • 可重用性:元件應該設計為可在不同管道中重用
  • 可引數化:透過引數控制元件行為
  • 可追溯性:每個執行都有完整的歷史記錄

Pipeline定義與執行

以下是使用Python SDK定義一個簡單Pipeline的例子:

import kfp
from kfp import dsl
from kfp.components import func_to_container_op

# 定義元件
@func_to_container_op
def preprocess_data(data_path: str) -> str:
    # 資料預處理邏輯
    import pandas as pd
    
    # 載入資料
    df = pd.read_csv(data_path)
    
    # 執行預處理
    # ...預處理程式碼...
    
    # 儲存處理後的資料
    output_path = "/tmp/processed_data.csv"
    df.to_csv(output_path, index=False)
    
    return output_path

@func_to_container_op
def train_model(data_path: str, hyperparams: dict) -> str:
    # 模型訓練邏輯
    from sklearn.ensemble import RandomForestClassifier
    import pandas as pd
    import pickle
    
    # 載入資料
    df = pd.read_csv(data_path)
    X = df.drop('target', axis=1)
    y = df['target']
    
    # 訓練模型
    model = RandomForestClassifier(
        n_estimators=hyperparams.get('n_estimators', 100),
        max_depth=hyperparams.get('max_depth', None)
    )
    model.fit(X, y)
    
    # 儲存模型
    model_path = "/tmp/model.pkl"
    with open(model_path, 'wb') as f:
        pickle.dump(model, f)
    
    return model_path

# 定義Pipeline
@dsl.pipeline(
    name="Simple ML Pipeline",
    description="A simple ML pipeline with preprocessing and training steps"
)
def ml_pipeline(data_path: str = "gs://my-bucket/data.csv"):
    # 資料預處理
    preprocess_task = preprocess_data(data_path)
    
    # 模型訓練
    hyperparams = {'n_estimators': 200, 'max_depth': 10}
    train_task = train_model(preprocess_task.output, hyperparams)
    
    # 設定依賴關係
    train_task.after(preprocess_task)

# 編譯並執行Pipeline
if __name__ == "__main__":
    kfp.compiler.Compiler().compile(ml_pipeline, "ml_pipeline.yaml")
    client = kfp.Client()
    client.create_run_from_pipeline_func(
        ml_pipeline,
        arguments={}
    )

這個Pipeline範例展示了一個簡單的機器學習工作流程,包含資料預處理和模型訓練兩個步驟。

首先,我們使用@func_to_container_op裝飾器將Python函式轉換為Kubeflow Pipeline元件。這個裝飾器會自動將函式封裝成容器,使其能夠在Kubernetes叢集中執行。

preprocess_data函式負責資料清洗和特徵工程,它接收一個資料路徑引數,並回傳處理後的資料路徑。train_model函式則負責模型訓練,它接收處理後的資料和超引數,並回傳儲存的模型路徑。

在Pipeline定義中,我們使用@dsl.pipeline裝飾器來標記Pipeline函式。在函式內部,我們建立了兩個任務並設定了它們之間的依賴關係,確保訓練任務在預處理完成後執行。

最後,我們編譯Pipeline為YAML格式,並使用KFP客戶端建立一個執行例項。在實際應用中,這個Pipeline可以被儲存、分享和重複執行,大提高了機器學習工作流程的可重用性和可再現性。

高階Pipeline功能

在實際專案中,我們通常需要更複雜的Pipeline功能,如條件執行、迴圈和平行處理。以下是一些高階用法範例:

@dsl.pipeline(
    name="Advanced ML Pipeline",
    description="Pipeline with advanced features"
)
def advanced_ml_pipeline(data_path: str, model_type: str = "rf"):
    # 資料拆分
    with dsl.ParallelFor([0.7, 0.8, 0.9]) as train_ratio:
        split_task = split_data(data_path, train_ratio)
        
        # 條件執行不同型別的模型
        with dsl.Condition(model_type == "rf"):
            rf_task = train_rf_model(split_task.output)
            
        with dsl.Condition(model_type == "xgb"):
            xgb_task = train_xgb_model(split_task.output)
    
    # 取得最佳模型
    get_best_model_task = get_best_model([rf_task.output, xgb_task.output])

這個高階Pipeline範例展示了三個重要功能:

  1. 平行迭代:使用ParallelFor建構對不同的訓練資料比例進行平行處理,這可以幫助我們快速評估資料量對模型效能的影響。

  2. 條件執行:透過Condition建構根據指定的模型型別選擇性地執行不同的訓練任務。這使得一個Pipeline可以處理多種模型型別,增加了靈活性。

  3. 輸出聚合get_best_model任務接收所有模型的輸出,並選擇效能最佳的模型。

這種高階Pipeline設計允許我們在單個工作流程中嘗試多種設定和方法,大提高了實驗效率。在實際應用中,我通常會結合使用這些功能來構建複雜的機器學習工作流程,如自動化模型選擇、超引數最佳化和整合學習等。