隨著企業資料量的爆炸性成長以及機器學習模型日趨複雜,如何有效地將模型從開發階段順利部署到生產環境,成為資料科學團隊面臨的核心挑戰。Databricks 作為一個統一的資料分析平台,整合了 Apache Spark 的強大運算能力與雲端服務的彈性擴展特性,為企業提供了一套完整的解決方案來處理模型開發、部署和維護的全生命週期管理。本文將深入探討如何運用 Databricks 平台來建構穩健的機器學習應用,並提供具體的程式碼範例與實務技巧,協助讀者掌握從環境設定、叢集建立到安全強化的完整部署流程。
機器學習模型部署的團隊協作模式
在機器學習專案中,成功的模型部署不僅僅是技術問題,更是團隊協作的成果。資料科學家、資料工程師和資料分析師在模型生命週期的不同階段各自扮演著關鍵角色,彼此之間的有效溝通與配合決定了專案的成敗。
原型開發階段的職責分工
在模型開發的初期階段,資料科學家是主要的推動者,負責從原始資料中提取有價值的資訊並建構預測模型。這個階段的工作涵蓋了資料探索與清理、特徵工程設計、演算法選擇與調參、模型訓練與評估等核心任務。資料科學家需要深入理解業務問題,並將其轉化為可以透過機器學習解決的技術問題。
在這個過程中,資料工程師扮演著重要的支援角色,負責建立資料管線(Data Pipeline)來整合來自不同來源的資料。他們確保資料科學家能夠存取到乾淨、一致且及時更新的資料集,這對於模型品質的提升至關重要。資料分析師則協助進行初步的資料探索分析,透過視覺化和統計方法來發現資料中的模式和異常,為特徵工程提供有價值的洞察。
模型部署階段的工程實踐
當模型經過驗證並達到預期的效能指標後,便進入部署階段。此時,資料工程師和機器學習工程師成為主要負責人,他們的任務是將資料科學家開發的原型程式碼轉化為可以在生產環境中穩定執行的應用程式。這個轉化過程包含了程式碼重構以提升執行效率、錯誤處理機制的建立、API 端點的開發、以及與現有系統的整合等工作。
工程師需要考慮模型在面對大規模資料時的擴展性,確保系統能夠處理高並發的預測請求而不會出現效能瓶頸。同時,他們也需要設計完善的監控機制來追蹤模型的執行狀態和預測品質,以便在問題發生時能夠快速定位和修復。在這個階段,資料科學家持續提供技術支援,解答工程師關於模型運作原理和參數設定的疑問。
持續維護與模型監控
模型部署上線後並不代表工作的結束,反而是新挑戰的開始。資料分析師在這個階段扮演著關鍵角色,負責解讀模型的輸出結果並將技術分析轉譯為業務洞察,與各部門的利害關係人進行溝通。他們需要建立定期的效能報告機制,追蹤模型預測的準確度和業務指標的變化。
工程師則持續監控模型的技術指標,包括回應時間、錯誤率、資源使用率等,確保系統維持高可用性。當模型效能開始下降時,這可能是因為資料分佈發生了改變(概念漂移),團隊需要協作進行問題診斷並決定是否需要重新訓練模型。這種持續迭代的流程確保了機器學習系統能夠長期維持其商業價值。
團隊協作流程圖
以下圖表展示了機器學習模型從開發到維護的完整協作流程,說明各角色在不同階段的職責與互動關係。
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
start
partition "原型開發階段" {
:資料科學家進行資料探索;
:設計特徵工程方案;
:選擇演算法並訓練模型;
:評估模型效能;
if (模型效能達標?) then (是)
:準備模型移交文件;
else (否)
:調整特徵或演算法;
:重新訓練模型;
detach
endif
}
partition "部署階段" {
:工程師重構程式碼;
:建立 API 端點;
:整合現有系統;
:執行壓力測試;
if (通過測試?) then (是)
:部署至生產環境;
else (否)
:修復問題並重測;
detach
endif
}
partition "維護階段" {
:監控模型效能指標;
:分析師解讀預測結果;
if (效能持續達標?) then (是)
:持續監控;
else (否)
:觸發模型重訓練流程;
endif
}
stop
@enduml這個流程圖清楚地展示了機器學習專案的三個主要階段以及各階段的核心任務。在原型開發階段,重點在於模型的建構與驗證;部署階段則著重於程式碼的工程化與系統整合;維護階段則專注於持續的效能監控與迭代改進。各階段之間的反饋迴路確保了問題能夠被及時發現和處理。
Databricks 平台的核心技術優勢
Databricks 之所以成為機器學習模型部署的首選平台之一,在於它成功地整合了多項關鍵技術,解決了資料科學團隊在日常工作中面臨的各種痛點。這些技術優勢讓團隊能夠更有效率地協作,並大幅降低從原型到生產的轉換成本。
Apache Spark 驅動的大資料處理能力
Databricks 建構在 Apache Spark 之上,這意味著它天生具備處理大規模分散式資料的能力。在模型開發階段,資料科學家通常使用相對較小的樣本資料進行實驗和驗證。然而,當模型進入生產環境時,就必須能夠處理企業級的海量資料。Spark 的記憶體運算架構和智慧查詢最佳化器讓這種擴展變得無縫且高效。
以下程式碼展示了如何使用 PySpark 進行典型的資料處理任務,這些操作可以輕鬆地從單機擴展到數百個節點的叢集。
# 匯入必要的 PySpark 模組
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, when
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
# 建立 SparkSession 物件
# 這是與 Spark 叢集互動的入口點
spark = SparkSession.builder \
.appName("CustomerChurnPrediction") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# 從 S3 載入客戶資料
# 使用 inferSchema 自動推斷欄位型別
customer_data = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("s3://company-data-lake/customers/")
# 進行資料品質檢查
# 計算各欄位的空值比例
null_counts = customer_data.select([
(count(when(col(c).isNull(), c)) / count("*")).alias(c)
for c in customer_data.columns
])
# 資料轉換與特徵工程
# 建立新的衍生特徵
processed_data = customer_data \
.withColumn("tenure_group",
when(col("tenure") < 12, "short")
.when(col("tenure") < 36, "medium")
.otherwise("long")) \
.withColumn("avg_monthly_charges",
col("TotalCharges") / col("tenure"))
# 準備特徵向量
# 選擇用於模型訓練的數值型特徵
feature_columns = ["tenure", "MonthlyCharges", "TotalCharges"]
assembler = VectorAssembler(
inputCols=feature_columns,
outputCol="features"
)
# 標準化特徵
# 確保不同量綱的特徵具有可比性
scaler = StandardScaler(
inputCol="features",
outputCol="scaled_features",
withStd=True,
withMean=True
)
# 套用轉換並檢視結果
feature_data = assembler.transform(processed_data)
scaled_data = scaler.fit(feature_data).transform(feature_data)
# 顯示處理後的資料樣本
scaled_data.select("customerID", "scaled_features", "Churn").show(5)
這段程式碼展示了完整的資料處理流程,從資料載入、品質檢查、特徵工程到特徵標準化。透過 Spark 的 DataFrame API,這些操作可以在分散式叢集上平行執行,處理數十億筆記錄也能在合理的時間內完成。Spark 的延遲執行(Lazy Evaluation)機制會自動最佳化執行計畫,確保資源被有效利用。
多語言環境的協作支援
Databricks Notebook 支援在同一個工作流程中混合使用 Python、Scala、SQL 和 R 等多種程式語言,這對於跨職能團隊的協作極為有利。資料科學家可以使用熟悉的 Python 和其豐富的機器學習生態系統進行模型開發,工程師則可以運用 Scala 的型別安全特性來建構更穩健的生產程式碼,而分析師可以直接撰寫 SQL 查詢來探索資料或驗證結果。
這種語言彈性消除了團隊成員之間的技術障礙,讓每個人都能使用最適合自己專業領域的工具來貢獻價值。更重要的是,不同語言之間可以透過臨時表(Temporary View)或 Delta Lake 表格來共享資料,實現無縫的工作銜接。
主流機器學習框架的原生整合
Databricks 環境預先安裝並最佳化了 TensorFlow、PyTorch、Scikit-learn、XGBoost 等主流機器學習框架,讓資料科學家能夠立即開始模型開發而無需處理繁瑣的環境設定問題。此外,Databricks 的 MLflow 整合提供了完整的實驗追蹤、模型註冊和部署功能,實現了端對端的 MLOps 工作流程。
以下程式碼展示了如何在 Databricks 環境中使用 MLflow 來追蹤模型訓練實驗並註冊最佳模型。
# 匯入 MLflow 和機器學習相關模組
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd
import numpy as np
# 設定 MLflow 實驗名稱
# 所有執行記錄都會歸類在這個實驗下
mlflow.set_experiment("/Users/datascience/customer-churn-prediction")
# 載入和準備資料
# 使用 Pandas 處理小型資料集
data = pd.read_csv("/dbfs/mnt/data/customer_churn.csv")
X = data.drop(columns=["Churn", "customerID"])
y = data["Churn"].map({"Yes": 1, "No": 0})
# 分割訓練和測試資料集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# 定義要嘗試的超參數組合
param_grid = [
{"n_estimators": 100, "max_depth": 10, "min_samples_split": 2},
{"n_estimators": 200, "max_depth": 15, "min_samples_split": 5},
{"n_estimators": 300, "max_depth": 20, "min_samples_split": 10}
]
# 執行超參數搜尋實驗
best_f1 = 0
best_run_id = None
for params in param_grid:
# 開始一個新的 MLflow 執行
with mlflow.start_run():
# 記錄超參數
mlflow.log_params(params)
# 訓練隨機森林模型
model = RandomForestClassifier(
n_estimators=params["n_estimators"],
max_depth=params["max_depth"],
min_samples_split=params["min_samples_split"],
random_state=42,
n_jobs=-1 # 使用所有可用的 CPU 核心
)
model.fit(X_train, y_train)
# 進行預測並計算評估指標
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# 記錄評估指標
mlflow.log_metrics({
"accuracy": accuracy,
"precision": precision,
"recall": recall,
"f1_score": f1
})
# 記錄模型
mlflow.sklearn.log_model(model, "random_forest_model")
# 記錄特徵重要性
feature_importance = pd.DataFrame({
"feature": X.columns,
"importance": model.feature_importances_
}).sort_values("importance", ascending=False)
mlflow.log_text(
feature_importance.to_string(),
"feature_importance.txt"
)
# 追蹤最佳模型
if f1 > best_f1:
best_f1 = f1
best_run_id = mlflow.active_run().info.run_id
print(f"參數: {params}")
print(f"F1 Score: {f1:.4f}\n")
# 將最佳模型註冊到 Model Registry
if best_run_id:
model_uri = f"runs:/{best_run_id}/random_forest_model"
model_details = mlflow.register_model(
model_uri=model_uri,
name="customer_churn_classifier"
)
print(f"最佳模型已註冊,版本: {model_details.version}")
這段程式碼展示了一個完整的模型訓練工作流程,包含超參數搜尋、實驗追蹤和模型註冊。MLflow 會自動記錄每次執行的參數、指標和產出物,讓資料科學家能夠輕鬆比較不同實驗的結果並追溯最佳模型的訓練過程。Model Registry 則提供了版本控制和階段管理功能,支援模型從開發到生產的平滑過渡。
機器學習工作流程圖
以下圖表說明了在 Databricks 平台上執行機器學習專案的標準工作流程。
@startuml
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
start
:載入原始資料;
note right
支援 S3、ADLS、
GCS 等雲端儲存
end note
:資料品質檢查與清理;
:特徵工程與轉換;
:分割訓練與測試資料;
:設定 MLflow 實驗;
while (超參數組合未完成?) is (是)
:訓練模型;
:評估模型效能;
:記錄參數與指標;
endwhile (否)
:選擇最佳模型;
:註冊模型至 Model Registry;
if (通過審核?) then (是)
:推進至 Staging 環境;
:執行整合測試;
if (測試通過?) then (是)
:部署至 Production;
else (否)
:退回修正;
endif
else (否)
:修改模型;
endif
stop
@endumlDatabricks 環境設定與資源準備
在開始使用 Databricks 進行機器學習專案之前,需要完成一系列的環境設定工作。這包括雲端儲存的設定、身份驗證的配置、以及工作區函式庫的安裝。正確的環境設定是確保後續開發和部署工作順利進行的基礎。
建立雲端儲存並組織資料結構
機器學習專案通常需要處理大量的訓練資料、模型檔案和中間產出物,因此需要一個可靠且可擴展的儲存解決方案。以 AWS 為例,S3 是最常用的選擇。在建立儲存桶時,應該根據資料的用途和存取模式來設計目錄結構,以便於管理和維護。
一個典型的機器學習專案儲存結構可能包含原始資料目錄用於存放未經處理的原始資料集、處理資料目錄用於存放經過清理和轉換的資料、模型目錄用於存放訓練好的模型檔案、以及輸出目錄用於存放預測結果和報告。這種結構化的設計讓團隊成員能夠快速找到所需的資源,也便於實施細粒度的存取控制。
以下程式碼展示了如何使用 boto3 來建立 S3 儲存桶並上傳資料檔案。
# 匯入 AWS SDK
import boto3
from botocore.exceptions import ClientError
import os
# 建立 S3 客戶端
# 確保已設定 AWS 認證(透過環境變數或 IAM 角色)
s3_client = boto3.client('s3')
def create_ml_project_bucket(bucket_name, region='ap-northeast-1'):
"""
建立 S3 儲存桶並設定基本的目錄結構
Args:
bucket_name: 儲存桶名稱(必須全球唯一)
region: AWS 區域
Returns:
bool: 建立成功返回 True,否則返回 False
"""
try:
# 建立儲存桶
# 注意:us-east-1 區域不需要指定 LocationConstraint
if region == 'us-east-1':
s3_client.create_bucket(Bucket=bucket_name)
else:
s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={'LocationConstraint': region}
)
# 啟用儲存桶版本控制
# 這對於追蹤模型版本很有用
s3_client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={'Status': 'Enabled'}
)
# 建立目錄結構
# S3 沒有真正的目錄概念,透過建立空物件來模擬
directories = [
'raw-data/',
'processed-data/',
'models/',
'outputs/',
'configs/'
]
for directory in directories:
s3_client.put_object(
Bucket=bucket_name,
Key=directory
)
print(f"儲存桶 {bucket_name} 建立成功")
return True
except ClientError as e:
print(f"建立儲存桶失敗: {e}")
return False
def upload_training_data(bucket_name, local_path, s3_prefix='raw-data/'):
"""
上傳訓練資料到 S3
Args:
bucket_name: 儲存桶名稱
local_path: 本機檔案或目錄路徑
s3_prefix: S3 目標路徑前綴
"""
try:
if os.path.isfile(local_path):
# 上傳單一檔案
file_name = os.path.basename(local_path)
s3_key = f"{s3_prefix}{file_name}"
s3_client.upload_file(
local_path,
bucket_name,
s3_key
)
print(f"已上傳: {local_path} -> s3://{bucket_name}/{s3_key}")
elif os.path.isdir(local_path):
# 遞迴上傳目錄中的所有檔案
for root, dirs, files in os.walk(local_path):
for file in files:
local_file = os.path.join(root, file)
relative_path = os.path.relpath(local_file, local_path)
s3_key = f"{s3_prefix}{relative_path}"
s3_client.upload_file(
local_file,
bucket_name,
s3_key
)
print(f"已上傳: {local_file} -> s3://{bucket_name}/{s3_key}")
except ClientError as e:
print(f"上傳失敗: {e}")
# 使用範例
bucket_name = "ml-project-databricks-demo"
create_ml_project_bucket(bucket_name, region='ap-northeast-1')
upload_training_data(bucket_name, "./training_data/", "raw-data/")
這段程式碼提供了建立儲存桶和上傳資料的完整功能。儲存桶版本控制的啟用讓我們能夠追蹤資料和模型的歷史版本,這在需要回溯或比較不同版本時非常有用。目錄結構的預先建立則確保了專案資源的有序組織。
設定 Instance Profile 實現安全存取
為了讓 Databricks 叢集能夠存取 S3 儲存桶中的資料,需要透過 Instance Profile 來授予適當的權限。這種方式比直接在程式碼中嵌入存取金鑰更加安全,因為它利用 IAM 角色的臨時認證機制,避免了長期憑證外洩的風險。
以下程式碼展示了如何建立 IAM 角色和 Instance Profile,並設定適當的權限政策。
# 匯入 boto3 IAM 模組
import boto3
import json
# 建立 IAM 客戶端
iam_client = boto3.client('iam')
def create_databricks_instance_profile(
role_name,
instance_profile_name,
bucket_name
):
"""
建立 Databricks 使用的 Instance Profile
Args:
role_name: IAM 角色名稱
instance_profile_name: Instance Profile 名稱
bucket_name: 要存取的 S3 儲存桶名稱
Returns:
str: Instance Profile ARN
"""
# 定義信任政策
# 允許 EC2 服務代入此角色
assume_role_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
# 建立 IAM 角色
try:
role_response = iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(assume_role_policy),
Description="Databricks cluster role for S3 access"
)
role_arn = role_response['Role']['Arn']
print(f"已建立角色: {role_arn}")
except iam_client.exceptions.EntityAlreadyExistsException:
role_response = iam_client.get_role(RoleName=role_name)
role_arn = role_response['Role']['Arn']
print(f"角色已存在: {role_arn}")
# 定義 S3 存取政策
# 遵循最小權限原則,只授予必要的權限
s3_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
],
"Resource": [
f"arn:aws:s3:::{bucket_name}",
f"arn:aws:s3:::{bucket_name}/*"
]
}
]
}
# 建立並附加內聯政策
policy_name = f"{role_name}-s3-policy"
try:
iam_client.put_role_policy(
RoleName=role_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(s3_policy)
)
print(f"已附加政策: {policy_name}")
except Exception as e:
print(f"附加政策失敗: {e}")
# 建立 Instance Profile
try:
iam_client.create_instance_profile(
InstanceProfileName=instance_profile_name
)
print(f"已建立 Instance Profile: {instance_profile_name}")
except iam_client.exceptions.EntityAlreadyExistsException:
print(f"Instance Profile 已存在: {instance_profile_name}")
# 將角色加入 Instance Profile
try:
iam_client.add_role_to_instance_profile(
InstanceProfileName=instance_profile_name,
RoleName=role_name
)
print(f"已將角色 {role_name} 加入 Instance Profile")
except iam_client.exceptions.LimitExceededException:
print("Instance Profile 已包含此角色")
# 取得 Instance Profile ARN
profile_response = iam_client.get_instance_profile(
InstanceProfileName=instance_profile_name
)
profile_arn = profile_response['InstanceProfile']['Arn']
return profile_arn
# 使用範例
profile_arn = create_databricks_instance_profile(
role_name="databricks-ml-project-role",
instance_profile_name="databricks-ml-project-profile",
bucket_name="ml-project-databricks-demo"
)
print(f"Instance Profile ARN: {profile_arn}")
這段程式碼完整展示了 Instance Profile 的建立流程。信任政策定義了哪些服務可以代入此角色,而 S3 存取政策則精確控制了可以執行的操作和可以存取的資源。這種細粒度的權限控制是雲端安全最佳實踐的核心要素。
資料存取架構圖
以下圖表說明了 Databricks 叢集透過 Instance Profile 存取 S3 儲存桶的安全架構。
@startuml
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
package "Databricks 工作區" {
[Databricks 叢集] as Cluster
note bottom of Cluster
運行 Spark 作業
執行模型訓練
end note
}
package "AWS IAM" {
[Instance Profile] as Profile
[IAM 角色] as Role
[S3 存取政策] as Policy
Profile --> Role
Role --> Policy
}
package "AWS S3" {
[訓練資料] as TrainData
[模型檔案] as Models
[輸出結果] as Outputs
}
Cluster --> Profile : 關聯
Profile --> TrainData : 授權存取
Profile --> Models : 授權存取
Profile --> Outputs : 授權存取
note right of Policy
最小權限原則
僅授予必要權限
end note
@endumlDatabricks 叢集組態與最佳化策略
叢集是 Databricks 中執行運算任務的基礎單元。正確的叢集組態不僅影響工作負載的執行效率,也直接關係到成本控制。根據不同的使用場景,需要選擇適當的節點類型、叢集大小和執行模式。
叢集組態的核心參數
以下 JSON 展示了一個典型的 Databricks 叢集組態,其中包含了關鍵的設定參數。
{
"cluster_name": "ml-training-cluster",
"spark_version": "13.3.x-ml-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 4,
"spark_conf": {
"spark.databricks.delta.preview.enabled": "true",
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.databricks.io.cache.enabled": "true"
},
"aws_attributes": {
"first_on_demand": 1,
"availability": "SPOT_WITH_FALLBACK",
"zone_id": "ap-northeast-1a",
"instance_profile_arn": "arn:aws:iam::123456789012:instance-profile/databricks-ml-project-profile",
"spot_bid_price_percent": 100,
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 1,
"ebs_volume_size": 100
},
"autotermination_minutes": 60,
"enable_elastic_disk": true,
"cluster_log_conf": {
"s3": {
"destination": "s3://ml-project-databricks-demo/cluster-logs",
"region": "ap-northeast-1"
}
},
"init_scripts": [
{
"s3": {
"destination": "s3://ml-project-databricks-demo/init-scripts/install-packages.sh"
}
}
]
}
這個組態檔案涵蓋了叢集設定的各個面向。spark_version 選擇了 ML Runtime 版本,這個版本預先安裝了常用的機器學習函式庫。node_type_id 指定了 i3.xlarge 實例類型,這種實例配備了 NVMe SSD,特別適合需要大量磁碟 I/O 的資料處理任務。
spark_conf 區塊中的設定啟用了多項效能最佳化功能。Adaptive Query Execution 能夠根據執行時期的統計資訊動態調整查詢計畫,而 Delta Cache 則透過 SSD 快取來加速資料存取。這些設定通常能夠帶來顯著的效能提升。
aws_attributes 區塊中的 availability 設定為 SPOT_WITH_FALLBACK,這意味著叢集會優先使用價格較低的 Spot 實例,但在 Spot 實例不可用時會自動切換到 On-Demand 實例,確保工作負載的連續性。這種策略在成本和可靠性之間取得了良好的平衡。
自動擴展與成本控制
對於工作負載變動較大的場景,可以啟用叢集自動擴展功能,讓系統根據實際需求動態調整工作節點的數量。
{
"cluster_name": "ml-autoscaling-cluster",
"spark_version": "13.3.x-ml-scala2.12",
"node_type_id": "i3.xlarge",
"autoscale": {
"min_workers": 2,
"max_workers": 8
},
"spark_conf": {
"spark.databricks.cluster.profile": "serverless",
"spark.databricks.repl.allowedLanguages": "python,sql"
},
"custom_tags": {
"Environment": "Production",
"Project": "CustomerChurn",
"CostCenter": "DataScience"
},
"autotermination_minutes": 30
}
autoscale 區塊定義了工作節點數量的範圍,Databricks 會根據待處理任務的佇列長度來自動增減節點。這種彈性確保了在尖峰時期有足夠的運算資源,而在閒置時期則節省成本。custom_tags 則用於成本分攤和資源管理,讓財務團隊能夠追蹤各專案的雲端支出。
叢集監控與告警設定
持續監控叢集的資源使用狀況對於維持系統健康和最佳化成本至關重要。Databricks 提供了內建的監控儀表板,同時也支援與外部監控系統的整合。
以下程式碼展示了如何使用 Databricks API 來取得叢集的執行時期指標。
# 匯入必要模組
import requests
import json
from datetime import datetime, timedelta
class DatabricksClusterMonitor:
"""
Databricks 叢集監控類別
提供叢集狀態查詢和指標收集功能
"""
def __init__(self, workspace_url, token):
"""
初始化監控器
Args:
workspace_url: Databricks 工作區 URL
token: Personal Access Token
"""
self.workspace_url = workspace_url.rstrip('/')
self.headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
def get_cluster_info(self, cluster_id):
"""
取得叢集詳細資訊
Args:
cluster_id: 叢集識別碼
Returns:
dict: 叢集資訊
"""
endpoint = f"{self.workspace_url}/api/2.0/clusters/get"
response = requests.get(
endpoint,
headers=self.headers,
params={"cluster_id": cluster_id}
)
response.raise_for_status()
return response.json()
def get_cluster_events(self, cluster_id, start_time=None, end_time=None):
"""
取得叢集事件記錄
Args:
cluster_id: 叢集識別碼
start_time: 開始時間(毫秒時間戳)
end_time: 結束時間(毫秒時間戳)
Returns:
list: 事件列表
"""
if start_time is None:
# 預設查詢過去 24 小時的事件
start_time = int((datetime.now() - timedelta(hours=24)).timestamp() * 1000)
if end_time is None:
end_time = int(datetime.now().timestamp() * 1000)
endpoint = f"{self.workspace_url}/api/2.0/clusters/events"
payload = {
"cluster_id": cluster_id,
"start_time": start_time,
"end_time": end_time,
"limit": 500
}
response = requests.post(
endpoint,
headers=self.headers,
json=payload
)
response.raise_for_status()
return response.json().get("events", [])
def analyze_cluster_utilization(self, cluster_id):
"""
分析叢集資源使用率
Args:
cluster_id: 叢集識別碼
Returns:
dict: 使用率分析結果
"""
# 取得叢集資訊
cluster_info = self.get_cluster_info(cluster_id)
# 取得最近的事件
events = self.get_cluster_events(cluster_id)
# 計算執行時間
state = cluster_info.get("state", "UNKNOWN")
# 分析自動擴展事件
scale_up_events = [e for e in events if e.get("type") == "AUTOSCALING_STATS_REPORT"]
analysis = {
"cluster_id": cluster_id,
"cluster_name": cluster_info.get("cluster_name"),
"current_state": state,
"num_workers": cluster_info.get("num_workers", 0),
"driver_node_type": cluster_info.get("driver_node_type_id"),
"worker_node_type": cluster_info.get("node_type_id"),
"spark_version": cluster_info.get("spark_version"),
"autoscaling_enabled": "autoscale" in cluster_info,
"total_events_24h": len(events),
"scale_events": len(scale_up_events)
}
return analysis
# 使用範例
monitor = DatabricksClusterMonitor(
workspace_url="https://adb-1234567890123456.7.azuredatabricks.net",
token="dapi1234567890abcdef"
)
cluster_analysis = monitor.analyze_cluster_utilization("0123-456789-abcdef")
print(json.dumps(cluster_analysis, indent=2, ensure_ascii=False))
這個監控類別提供了查詢叢集狀態和分析使用率的功能。透過定期收集這些指標,團隊可以識別資源使用的模式,進而調整叢集組態以達到更好的成本效益。例如,如果發現叢集在特定時段經常閒置,可以考慮縮短自動終止時間或調整自動擴展的閾值。
叢集資源監控架構
以下圖表展示了叢集監控系統的整體架構。
@startuml
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
package "Databricks 叢集" {
[Driver 節點] as Driver
[Worker 節點群] as Workers
Driver --> Workers : 分配任務
}
package "監控系統" {
[Metrics Collector] as Collector
[Time Series DB] as TSDB
[Alert Manager] as AlertMgr
[Dashboard] as Dashboard
Collector --> TSDB : 儲存指標
TSDB --> Dashboard : 視覺化
TSDB --> AlertMgr : 觸發條件
}
Driver --> Collector : CPU、記憶體、I/O
Workers --> Collector : 資源使用率
AlertMgr --> [通知服務] : Email、Slack
note bottom of Dashboard
即時監控
歷史趨勢
成本分析
end note
@enduml安全強化與合規性考量
在企業環境中部署機器學習系統時,安全性和合規性是不可忽視的重要面向。Databricks 提供了多層次的安全機制,從網路隔離、身份驗證到資料加密,協助企業滿足各種安全和監管要求。
網路安全與存取控制
建議將 Databricks 工作區部署在 VPC 中,並透過 VPC 端點來存取雲端服務,這樣可以確保所有流量都保持在私有網路內,不會經過公共網際網路。同時,應該設定安全群組規則來限制入站和出站流量,只允許必要的連接埠和 IP 範圍。
以下程式碼展示了如何設定 IAM 政策來實施細粒度的存取控制。
# 定義 Databricks 工作區的 IAM 政策
# 這個政策限制使用者只能存取特定的資源
workspace_access_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowDatabricksWorkspaceAccess",
"Effect": "Allow",
"Action": [
"databricks:*"
],
"Resource": [
"arn:aws:databricks:ap-northeast-1:123456789012:workspace/1234567890123456"
]
},
{
"Sid": "DenyClusterCreationWithoutTags",
"Effect": "Deny",
"Action": [
"databricks:CreateCluster"
],
"Resource": "*",
"Condition": {
"Null": {
"databricks:RequestTag/CostCenter": "true"
}
}
},
{
"Sid": "RestrictInstanceTypes",
"Effect": "Deny",
"Action": [
"databricks:CreateCluster",
"databricks:EditCluster"
],
"Resource": "*",
"Condition": {
"ForAnyValue:StringNotLike": {
"databricks:InstanceType": [
"i3.*",
"r5.*",
"m5.*"
]
}
}
}
]
}
# 這個政策實現了以下控制:
# 1. 允許存取特定的 Databricks 工作區
# 2. 強制所有叢集必須標記 CostCenter 標籤
# 3. 限制可使用的實例類型,防止使用過於昂貴的實例
這個 IAM 政策展示了如何透過條件語句來實施精細的存取控制。透過強制標籤和限制實例類型,組織可以有效控制雲端成本並確保資源使用符合公司政策。
資料加密與金鑰管理
敏感資料應該在儲存和傳輸過程中都進行加密。對於 S3 儲存桶,建議啟用伺服器端加密(SSE-S3 或 SSE-KMS)。如果使用 SSE-KMS,可以透過 AWS KMS 來管理加密金鑰,並設定金鑰輪換政策。
以下程式碼展示了如何為 S3 儲存桶設定加密組態。
# 匯入 boto3
import boto3
# 建立 S3 客戶端
s3_client = boto3.client('s3')
def configure_bucket_encryption(bucket_name, kms_key_id=None):
"""
設定 S3 儲存桶的加密組態
Args:
bucket_name: 儲存桶名稱
kms_key_id: KMS 金鑰 ID(可選,若不提供則使用 SSE-S3)
"""
if kms_key_id:
# 使用 KMS 管理的金鑰進行加密
encryption_configuration = {
'Rules': [
{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'aws:kms',
'KMSMasterKeyID': kms_key_id
},
'BucketKeyEnabled': True # 啟用 Bucket Key 以降低 KMS 成本
}
]
}
else:
# 使用 S3 管理的金鑰進行加密
encryption_configuration = {
'Rules': [
{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'AES256'
}
}
]
}
# 套用加密組態
s3_client.put_bucket_encryption(
Bucket=bucket_name,
ServerSideEncryptionConfiguration=encryption_configuration
)
print(f"已為 {bucket_name} 設定加密")
def configure_bucket_policy_for_encryption(bucket_name):
"""
設定儲存桶政策以強制加密上傳
Args:
bucket_name: 儲存桶名稱
"""
# 這個政策會拒絕任何未加密的上傳請求
bucket_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DenyUnencryptedObjectUploads",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:PutObject",
"Resource": f"arn:aws:s3:::{bucket_name}/*",
"Condition": {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": ["AES256", "aws:kms"]
}
}
},
{
"Sid": "DenyUnencryptedTransport",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:*",
"Resource": [
f"arn:aws:s3:::{bucket_name}",
f"arn:aws:s3:::{bucket_name}/*"
],
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
}
]
}
s3_client.put_bucket_policy(
Bucket=bucket_name,
Policy=json.dumps(bucket_policy)
)
print(f"已為 {bucket_name} 設定強制加密政策")
# 使用範例
configure_bucket_encryption("ml-project-databricks-demo")
configure_bucket_policy_for_encryption("ml-project-databricks-demo")
這段程式碼設定了兩層加密保護。首先是預設的伺服器端加密,確保所有新上傳的物件都會被自動加密。其次是儲存桶政策,它會拒絕任何未加密的上傳請求以及非 HTTPS 的存取,提供額外的安全保障。
安全架構總覽
以下圖表展示了 Databricks 環境的完整安全架構。
@startuml
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
package "企業網路" {
[使用者] as User
}
package "AWS VPC" {
package "Private Subnet" {
[Databricks 叢集] as Cluster
}
[VPC Endpoint] as VPCE
[NAT Gateway] as NAT
}
package "AWS 服務" {
[S3 儲存桶] as S3
[KMS] as KMS
[CloudWatch] as CW
}
package "安全控制" {
[IAM 角色] as IAM
[Security Group] as SG
[Network ACL] as NACL
}
User --> Cluster : HTTPS
Cluster --> VPCE : 私有連線
VPCE --> S3 : 存取資料
S3 --> KMS : 加密金鑰
Cluster --> CW : 日誌與指標
IAM --> Cluster : 身份驗證
SG --> Cluster : 流量控制
NACL --> Cluster : 網路過濾
note bottom of S3
SSE-KMS 加密
版本控制
存取日誌
end note
@enduml總結
Databricks 平台為機器學習模型的部署提供了一個功能完整且高度整合的解決方案。透過本文的探討,我們了解到成功的模型部署需要資料科學家、工程師和分析師之間的緊密協作,而 Databricks 的多語言支援和協作功能正好滿足了這個需求。
在技術實作方面,Databricks 建構在 Apache Spark 之上的架構確保了優異的大資料處理能力,讓模型能夠從開發階段的小規模實驗無縫擴展到生產環境的大規模應用。MLflow 的整合則提供了完整的 MLOps 工作流程支援,從實驗追蹤到模型註冊再到部署監控,形成了閉環的生命週期管理。
安全性和成本控制是企業採用雲端平台時的重要考量。透過 Instance Profile 和 IAM 政策的正確設定,可以實現最小權限原則並確保資料安全。叢集的自動擴展和 Spot 實例的使用則有效降低了運算成本,而自動終止功能避免了資源的浪費。
對於計劃在 Databricks 上部署機器學習模型的團隊,建議從小規模的概念驗證專案開始,逐步熟悉平台的各項功能和最佳實踐。隨著經驗的累積,再逐步擴展到更複雜的生產應用。持續關注 Databricks 的更新和新功能,特別是在 AutoML 和 Serverless 運算方面的發展,這些技術有望進一步簡化機器學習的部署流程並降低使用門檻。