在機器學習應用佈署至生產環境後,系統的效能和可擴充套件性面臨諸多挑戰。本文介紹如何利用過濾模型識別並過濾掉對主模型來說過於複雜或不確定的樣本,從而減少主模型的計算負擔,提升整體系統效率。文章將詳細說明如何訓練、評估和應用過濾模型,並提供 Python 程式碼範例。同時,也將探討快取機制、有向無環圖(DAG)和使用者反饋機制等策略,以確保機器學習應用在生產環境中的穩定運作和持續最佳化。這些策略能有效提升系統的效能和可擴充套件性,並能根據使用者行為和模型表現進行動態調整,確保模型的有效性和及時性。

使用過濾模型最佳化生產環境中的機器學習應用

在生產環境中佈署機器學習模型時,維持系統的效能和效率是一項重大挑戰。為了應對這一挑戰,可以採用多種策略,包括使用過濾模型、擴充套件模型以支援多使用者、以及最佳化訓練流程等。

訓練過濾模型

過濾模型是一種特殊的機器學習模型,用於識別那些對於主模型來說過於困難或不確定的輸入樣本。透過預先篩選這些樣本,過濾模型可以幫助減少主模型的計算負擔,從而提高整體系統的效率。

def get_filtering_model(classifier, features, labels):
    """
    取得用於過濾模型的訓練資料
    :param classifier: 已訓練的分類別器
    :param features: 輸入特徵
    :param labels: 真實標籤
    """
    predictions = classifier.predict(features)
    # 建立標籤,其中錯誤為1,正確猜測為0
    is_error = [pred != truth for pred, truth in zip(predictions, labels)]
    filtering_model = RandomForestClassifier()
    filtering_model.fit(features, is_error)
    return filtering_model

內容解密:

  1. 取得預測結果:首先使用已訓練的分類別器對輸入特徵進行預測,得到預測結果。
  2. 標記錯誤樣本:比較預測結果與真實標籤,將預測錯誤的樣本標記為1,正確的樣本標記為0。
  3. 訓練過濾模型:使用標記好的資料訓練一個隨機森林分類別器作為過濾模型。
  4. 傳回過濾模型:訓練完成後,傳回過濾模型。

過濾模型的評估與應用

一個有效的過濾模型需要滿足兩個主要條件:足夠快的執行速度和良好的篩選效果。執行速度的快慢直接影響到整體系統的效率,而篩選效果則決定了過濾模型能否有效地減少主模型的計算負擔。

假設主模型的平均推理時間為 $i$,過濾模型的執行時間為 $f$,且過濾模型能夠篩選掉的樣本比例為 $b$。那麼,使用過濾模型後的平均推理時間可以表示為 $f + i(1 - b)$。要使用過濾模型後的平均推理時間小於單獨使用主模型的時間,需要滿足 $f + i(1 - b) < i$,即 $f/i < b$。

例如,如果過濾模型的執行速度是主模型的20倍($f/i = 5%$),那麼它至少需要能夠篩選掉超過5%的樣本($b > 5%$)才能帶來實際的好處。

圖表說明:過濾模型的運作流程

圖表翻譯: 此圖示展示了過濾模型的運作流程。首先,輸入樣本進入過濾模型進行篩選。過濾模型將樣本分為兩類別:一類別是被篩選掉的困難樣本,這些樣本可能不被主模型處理或進行特殊處理;另一類別是透過篩選的樣本,這些樣本將被送入主模型進行進一步處理並輸出結果。

擴充套件機器學習應用以支援多使用者

隨著使用者數量的增加,機器學習應用的規模也需要相應擴充套件。這可以透過增加伺服器資源來實作,尤其是對於深度學習模型,可能需要使用GPU加速推理過程。

快取機制

快取是一種常見的最佳化技術,透過儲存函式呼叫的結果來避免重複計算。在機器學習應用中,可以使用最近最少使用(LRU)快取來儲存模型的推理結果。當相同的輸入再次出現時,可以直接從快取中檢索結果,而無需重新執行模型。

from functools import lru_cache

@lru_cache(maxsize=128)
def run_model(question_data):
    # 在此插入慢速的模型推理程式碼
    pass

內容解密:

  1. 使用LRU快取裝飾器:透過在run_model函式上新增@lru_cache裝飾器,可以啟用LRU快取功能。
  2. 設定快取大小maxsize引數用於設定快取的最大容量。
  3. 快取推理結果:當run_model函式被呼叫時,其結果會被自動快取。當相同的輸入再次出現時,可以直接從快取中傳回結果,而無需重新執行模型。

透過採用上述策略,可以顯著提高機器學習應用在生產環境中的效能和可擴充套件性。

快取機制在機器學習中的應用與挑戰

在機器學習的實際應用中,快取(Caching)是一種能夠有效提升系統效能的技術。快取的核心概念是將頻繁使用或計算成本較高的資料暫存起來,以便在未來需要時能夠快速存取,從而避免重複計算或查詢。

快取的適用場景

快取最適用於那些檢索特徵、處理資料和執行推論(inference)比存取快取更耗時的場景。根據快取實作方式(例如,記憶體快取 vs. 磁碟快取)和所使用的模型複雜度,快取帶來的效能提升程度會有所不同。

索引快取技術

當輸入資料具有唯一性時,傳統的快取方法可能不再適用。然而,我們可以快取其他可以預先計算的 pipeline 部分。這種做法在模型不完全依賴使用者輸入的情況下尤其有效。

舉例來說,在建立一個允許使用者根據文字查詢或提供的圖片搜尋相關內容的系統時,由於使用者查詢的多樣性,直接快取使用者查詢的內容可能無法顯著提升效能。但是,我們可以預先計算與目錄中專案相關的模型層面。這種方法在搜尋系統中特別常見。

搜尋系統中的嵌入式快取

在搜尋系統中,一種常見的做法是預先將所有索引檔案嵌入到一個有意義的向量空間中(詳見「向量化」章節)。一旦生成了嵌入向量(embeddings),就可以將它們儲存在資料函式庫中。當使用者提交搜尋查詢時,查詢內容會在推論時被嵌入,然後在資料函式庫中執行查詢,以找出最相似的嵌入向量並傳回對應的專案。這種方法顯著加快了推論速度,因為大部分計算已經預先完成。

Twitter 和 Airbnb 等公司已成功在其大規模生產 pipeline 中應用嵌入向量技術。

快取帶來的挑戰

儘管快取可以提升效能,但它也增加了系統的複雜度。快取的大小成為了一個需要根據應用程式工作負載進行調整的超引數。此外,每當模型或底層資料更新時,都需要清除快取以避免傳回過時的結果。

模型與資料生命週期管理

保持快取和模型的更新是一項挑戰。許多模型需要定期重新訓練以維持其效能。在佈署更新模型到使用者時,需要謹慎處理。通常,一個訓練好的模型被儲存為一個二進位檔案,包含了其型別、架構和學習到的引數。大多數生產應用程式在啟動時將訓練好的模型載入記憶體,並呼叫它來提供結果。

更新模型的簡單方法

一種簡單的更新模型的方法是替換應用程式載入的二進位檔案。然而,在實際操作中,這個過程往往更加複雜。理想情況下,ML 應用程式應該產生可重現的結果,能夠抵禦模型更新,並且足夠靈活以處理重大的模型和資料處理變更。

可重現性

為了追蹤和重現錯誤,需要知道哪個模型正在生產環境中執行。這需要儲存訓練好的模型和它們所訓練的資料集的存檔。每個模型/資料集對都應該被賦予一個唯一的識別碼。每次在生產環境中使用模型時,都應該記錄這個識別碼。

彈性

使應用程式能夠在更新模型後載入新模型,需要建立一個流程來載入新模型,理想情況下不中斷服務。這可能涉及啟動一個新的伺服器來服務更新後的模型,並逐漸將流量轉移到新伺服器。

Pipeline 的靈活性

新的模型版本往往需要額外的預處理步驟或不同的特徵。因此,應用程式版本也應該在模型進行預測時被記錄下來,以使預測結果可重現。這增加了 pipeline 的複雜度,因為預處理和後處理步驟現在也需要是可重現和可修改的。

資料處理和 DAGs

為了產生可重現的結果,訓練 pipeline 也應該是可重現和決定性的。對於給定的資料集、預處理步驟和模型的組合,訓練 pipeline 應該在每次訓練執行中產生相同的訓練模型。

在建立模型的過程中,需要許多連續的轉換步驟,因此 pipeline 經常會在不同的位置中斷。這使得保證每個部分都成功執行,並且它們都按照正確的順序執行變得非常重要。

# 示例程式碼:簡單的快取機制實作
import functools

def cache_results(func):
    """簡單的函式結果快取裝飾器"""
    cache = {}
    @functools.wraps(func)
    def wrapper(*args):
        if args in cache:
            return cache[args]
        result = func(*args)
        cache[args] = result
        return result
    return wrapper

@cache_results
def expensive_computation(x):
    # 模擬昂貴的計算
    import time
    time.sleep(2)
    return x * x

print(expensive_computation(10))  # 第一次計算需要時間
print(expensive_computation(10))  # 第二次直接從快取傳回結果

內容解密:

此範例展示了一個簡單的函式結果快取裝飾器 cache_results。該裝飾器用於快取函式 expensive_computation 的結果,避免重複執行昂貴的計算。當第一次呼叫 expensive_computation(10) 時,函式會執行並傳回結果,同時將結果快取起來。當第二次呼叫 expensive_computation(10) 時,直接從快取中傳回結果,無需再次執行函式,從而顯著提升效能。

圖表翻譯: 此圖示展示了一個簡單的快取流程。首先檢查是否有現成的快取結果,若有則直接傳回;若無,則執行相應的函式運算,並將結果存入快取中以供未來使用。

透過有向無環圖(DAG)簡化機器學習流程

在機器學習(ML)領域,將原始資料轉換為訓練好的模型這一過程,可以透過將其表示為有向無環圖(DAG)來簡化。每個節點代表一個處理步驟,而每個步驟之間的依賴關係則由圖的結構來表達。這種方法源自資料流程式設計的概念,而TensorFlow這一流行的ML函式庫正是根據此理念設計的。

DAG在預處理中的應用

DAG提供了一種自然的方式來視覺化預處理流程。如圖10-11所示,每個箭頭代表一個依賴於另一個任務的任務。這種表示方法使每個任務保持簡單,並利用圖的結構來表達複雜性。

圖表翻譯:

此圖示展示了一個典型的DAG結構,其中每個節點代表一個處理步驟,而箭頭則表示步驟之間的依賴關係。這種結構使得整個預處理流程變得清晰且易於管理。

一旦我們有了DAG,就可以確保每次生成模型時都遵循相同的操作集。有多種解決方案可以用來定義ML的DAG,包括開源專案如Apache Airflow或Spotify的Luigi。這些工具包允許使用者定義DAG,並提供一系列儀錶板來監控DAG的進度以及相關的日誌。

為何使用DAG

在建立MLPipeline的初期,使用DAG可能顯得不必要地複雜。然而,一旦模型成為生產系統的核心部分,重現性的需求使得DAG變得非常有吸引力。當模型被定期重新訓練和佈署時,任何有助於系統化、除錯和版本控制Pipeline的工具都將成為節省時間的關鍵。

向使用者尋求反饋

本章最後將介紹一種直接保證模型表現良好的方法——向使用者尋求反饋。

收集使用者反饋

您可以透過明確詢問使用者或測量隱含訊號來收集反饋。在顯示模型預測結果時,可以附帶提供讓使用者判斷和糾正預測結果的途徑。這可以簡單到一個對話方塊,詢問“這個預測有用嗎?”或者更為微妙的方式。

範例程式碼:收集使用者反饋

def collect_user_feedback(prediction, user_input):
    # 記錄使用者對預測結果的反饋
    feedback = {
        'prediction': prediction,
        'user_input': user_input,
        'useful': user_input == prediction  # 簡單判斷使用者輸入是否與預測一致
    }
    # 將反饋儲存到資料函式庫或日誌中
    save_feedback_to_database(feedback)
    return feedback

#### 內容解密:
此段程式碼定義了一個函式`collect_user_feedback`,用於收集使用者對模型預測結果的反饋它接受兩個引數:`prediction`(模型的預測結果`user_input`(使用者的實際輸入或糾正)。函式傳回一個包含預測結果使用者輸入和一個簡單的有用判斷的字典這個字典隨後被儲存到資料函式庫或日誌中用於後續分析和模型改進

像Mint這樣的預算應用程式會自動對每個交易進行分類別(類別包括旅行、食品等)。如圖10-12所示,每個類別在UI中顯示為一個可編輯的欄位,使用者可以在需要時進行編輯和糾正。這樣的系統允許以一種不那麼侵入式的方式收集寶貴的反饋,用於持續改進模型。

圖表翻譯:

此圖示展示了Mint應用程式如何允許使用者直接糾正模型的預測結果,從而收集到寶貴的使用者反饋。

隱含訊號的收集

由於使用者無法對模型的每個預測結果都提供反饋,因此收集隱含訊號是評估ML效能的重要方式。收集隱含訊號包括觀察使用者執行的操作,以推斷模型是否提供了有用的結果。

範例程式碼:收集隱含訊號

def collect_implicit_signals(user_actions, predictions):
    # 分析使用者行為以推斷模型的表現
    signals = []
    for action, prediction in zip(user_actions, predictions):
        if action == 'click':  # 使用者點選了推薦專案
            signals.append({'signal': 'positive', 'prediction': prediction})
        elif action == 'ignore':  # 使用者忽略了推薦專案
            signals.append({'signal': 'negative', 'prediction': prediction})
    # 分析收集到的訊號以評估模型的整體表現
    analyze_signals(signals)
    return signals

#### 內容解密:
此段程式碼定義了一個函式`collect_implicit_signals`,用於透過分析使用者行為來收集隱含訊號它接受兩個引數:`user_actions`(使用者的操作記錄`predictions`(模型的預測結果)。函式透過比較使用者的操作和模型的預測結果來推斷訊號並將這些訊號儲存起來以供後續分析

透過收集和分析這些資訊,如圖10-13所示,您可以估計使用者發現結果有用的頻率。隱含訊號的收集是有用的,但也伴隨著收集和儲存資料以及可能引入負面反饋迴圈的風險。

監控與更新模型:確保機器學習系統的穩定運作

在佈署模型後,其效能需要像其他軟體系統一樣受到監控。就像在「測試你的機器學習程式碼」中所做的那樣,正常的軟體最佳實踐同樣適用。而與「測試你的機器學習程式碼」一樣,在處理機器學習模型時,還有一些額外的考慮因素。

在本章中,我們將描述監控機器學習模型時需要牢記的關鍵方面。更具體地說,我們將回答三個問題:

  1. 為什麼我們應該監控我們的模型?
  2. 我們如何監控我們的模型?
  3. 我們的監控應該驅動什麼行動?

監控的重要性

監控的目標是追蹤系統的健康狀況。對於模型來說,這意味著監控其效能和預測的品質。如果使用者習慣的變化突然導致模型產生不佳的結果,一個好的監控系統將使我們能夠盡快注意到並做出反應。

監控以判斷更新頻率

大多數模型需要定期更新以保持一定的效能。監控可用於檢測模型何時不再新鮮並需要重新訓練。例如,如果我們持續監控模型的準確性,我們可以在準確性下降到某個閾值以下時訓練一個新的模型。圖11-1顯示了這個過程的時間軸,重新訓練事件發生在準確性下降到閾值以下時。

圖表翻譯:圖11-1展示了模型的準確性隨著時間的變化,並在準確性下降到閾值以下時觸發重新訓練。

圖表翻譯: 此圖示展示了模型的監控與重新訓練流程。首先開始監控,接著持續檢查模型的準確性。如果準確性下降到閾值以下,則重新訓練模型,並在驗證新模型後佈署。

為什麼要監控模型?

監控可以幫助我們在生產環境中發現問題。例如,當使用者習慣的變化導致模型的效能下降時,監控系統可以及時提醒我們。此外,監控還可以用於檢測潛在的濫用行為,例如在建立防濫用或詐騙檢測系統時。

監控以檢測濫用

在某些情況下,例如建立防濫用或詐騙檢測系統時,一部分使用者會積極地試圖擊敗模型。在這些情況下,監控成為檢測攻擊和評估其成功率的關鍵方法。監控系統可以使用異常檢測來檢測攻擊。例如,在跟蹤每次登入銀行線上門戶的嘗試時,監控系統可以在登入嘗試次數突然增加十倍時發出警示,這可能是攻擊的跡象。

圖表翻譯:圖11-2展示了登入嘗試次數的突然增加,並觸發了警示。

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title 過濾模型最佳化生產環境機器學習應用

package "過濾模型最佳化" {
    package "過濾機制" {
        component [過濾模型] as filter
        component [主模型] as main
        component [困難樣本識別] as hard
    }

    package "效能策略" {
        component [快取機制] as cache
        component [DAG 排程] as dag
        component [使用者反饋] as feedback
    }

    package "評估指標" {
        component [推理時間比 f/i] as ratio
        component [篩選比例 b] as bypass
        component [整體效率] as efficiency
    }
}

filter --> main : 預篩選
cache --> dag : 結果重用
ratio --> bypass : 效益條件

note bottom of filter
  f/i < b 時
  過濾才有效益
end note

collect --> clean : 原始資料
clean --> feature : 乾淨資料
feature --> select : 特徵向量
select --> tune : 基礎模型
tune --> cv : 最佳參數
cv --> eval : 訓練模型
eval --> deploy : 驗證模型
deploy --> monitor : 生產模型

note right of feature
  特徵工程包含:
  - 特徵選擇
  - 特徵轉換
  - 降維處理
end note

note right of eval
  評估指標:
  - 準確率/召回率
  - F1 Score
  - AUC-ROC
end note

@enduml

圖表翻譯: 此圖示展示了登入嘗試的監控流程。首先開始監控登入嘗試,接著持續檢查登入嘗試次數。如果登入嘗試次數超過閾值,則發出警示。

如何監控模型?

Stitch Fix公司透過建立內部工具,使資料科學家能夠直接修復佈署過程中或之後發生的任何錯誤,從而簡化了監控和除錯的過程。這種設定激勵資料科學家構建簡單而強壯的模型,從而減少了模型的破壞頻率。

使用內部工具進行監控和除錯

Stitch Fix建立了一個內部工具,可以接收建模管道並建立Docker容器,驗證引數和傳回型別,將推理管道作為API公開,佈署在基礎設施上,並在其上構建儀錶板。這種工具使資料科學家能夠直接修復佈署過程中或之後發生的任何錯誤。

import docker

def create_docker_container(model_pipeline):
    # 建立Docker客戶端
    client = docker.from_env()
    
    # 建立Docker容器
    container = client.containers.run(model_pipeline, detach=True)
    
    return container

# 使用範例
model_pipeline = "my_model_pipeline"
container = create_docker_container(model_pipeline)
print(f"Container {container.id} is running.")

內容解密:

  1. 匯入必要的函式庫:首先匯入docker函式庫,以便與Docker互動。
  2. 定義建立Docker容器的函式create_docker_container函式接收一個建模管道作為引數,使用Docker客戶端建立並執行一個Docker容器。
  3. 建立Docker客戶端:使用docker.from_env()建立一個Docker客戶端例項,這允許我們與本地Docker守護程式互動。
  4. 建立並執行Docker容器:使用client.containers.run()方法建立並執行一個新的Docker容器,將detach=True引數設定為在後台執行容器。
  5. 傳回容器物件:函式傳回建立的容器物件,可以用來進一步管理和檢查容器的狀態。
  6. 使用範例:最後,提供了一個使用範例,展示如何呼叫create_docker_container函式並列印預出容器的ID。