前言:企業級自然語言處理的技術挑戰

當代企業在數位轉型過程中累積了龐大的非結構化文字資料,這些資料隱藏著寶貴的商業洞察,卻因為處理技術的限制而無法有效利用。自然語言處理技術提供了從海量文字中萃取價值的能力,無論是客戶意見分析、新聞輿情監測或是智慧客服系統,都需要處理大規模的文字資料集。然而傳統單機處理方式在面對企業級資料規模時,往往遭遇嚴重的效能瓶頸與擴展性限制,這使得許多具有高商業價值的分析專案難以落地實施。

Databricks 作為統一資料分析平台,整合了 Apache Spark 的分散式運算能力與雲端基礎設施的彈性擴展特性,為自然語言處理任務提供了理想的執行環境。透過將 SpaCy 這個功能完整的自然語言處理函式庫與 PySpark 的分散式資料框架結合,開發團隊能在保持程式碼可讀性的前提下,實現對大規模文字資料的高效處理。這種結合不僅解決了運算效能問題,更提供了從資料探索到生產佈署的完整解決方案,讓企業能夠真正將自然語言處理技術應用於實際業務場景。

自然語言處理技術在商業應用上具有廣泛的價值,企業可以透過文字分析了解客戶對產品的真實感受,可以從新聞報導中追蹤競爭對手動態,也可以自動化處理大量的客戶諮詢請求。這些應用場景的共同特點是需要處理的資料量龐大,傳統的處理方式無法在合理時間內完成分析任務。分散式處理架構提供了突破這個限制的途徑,透過將工作分散到多個運算節點並行執行,可以大幅縮短處理時間,讓原本需要數天完成的任務在數小時內完成。

本文以命名實體識別作為核心技術案例,完整說明在 Databricks 平台建置自然語言處理流程的技術實踐。命名實體識別是自然語言處理的基礎任務之一,其目標是從文字中識別具有特定意義的詞彙或詞組,並將其分類為預先定義的類別,例如人名、地名、組織名稱等。這個任務在資訊擷取、知識圖譜建構、智慧搜尋等場景都有重要應用。從環境準備、模型載入、資料處理到生產化佈署,每個環節都提供詳細的程式碼範例與技術說明,協助開發者與資料科學家掌握在雲端環境進行自然語言處理的完整技能。

Databricks 環境建置與模型佈署架構

在開始自然語言處理任務之前,必須完成 Databricks 環境的基礎建置。這個階段的核心目標是建立可重複使用的初始化機制,確保每次叢集啟動時都能自動完成必要函式庫的安裝與設定。透過這種基礎設施即程式碼的做法,不僅能節省手動設定時間,更能確保開發環境與生產環境的一致性,這在企業應用中至關重要。良好的環境建置策略能夠大幅降低維運負擔,同時避免因環境差異導致的各種疑難雜症。

環境建置涉及多個步驟,首先需要在分散式檔案系統中建立適當的目錄結構,接著上傳預訓練的語言模型,然後建立初始化指令碼確保模型能在叢集啟動時自動安裝。這些步驟環環相扣,任何一個環節出現問題都可能導致後續處理失敗。在企業級應用中,環境的一致性與可重複性是確保系統穩定運作的基礎,因此投入足夠的時間進行環境建置規劃是非常值得的。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 14
skinparam minClassWidth 100

title Databricks 環境建置流程

|準備階段|
start
:建立 DBFS 目錄結構;
note right
    使用 dbutils.fs.mkdirs
    建立模型存放目錄
end note
:上傳 SpaCy 語言模型;
note right
    從雲端儲存體複製
    預訓練模型檔案
end note
:驗證檔案完整性與權限;

|設定階段|
:撰寫初始化 Shell 指令碼;
note right
    定義模型安裝命令
    設定錯誤處理機制
end note
:設定叢集啟動組態;
note right
    指定初始化指令碼路徑
    配置記憶體與核心數
end note

|驗證階段|
:重新啟動叢集測試;
:確認模型載入正常;
note right
    執行簡單的 NLP 測試
    確保環境完整可用
end note
stop

@enduml

環境建置的第一步是在 Databricks 檔案系統中建立適當的目錄結構來存放 SpaCy 模型檔案。DBFS 是 Databricks 提供的分散式檔案系統,能讓叢集中所有節點存取相同的檔案資源。透過將模型檔案放置在 DBFS 中,可以確保分散式運算時每個工作節點都能載入所需模型。這種集中式檔案管理方式大幅簡化了分散式環境的資源共享問題,開發者不需要擔心模型檔案在各節點間的同步議題,系統會自動處理這些底層細節。

DBFS 的運作原理是在底層雲端儲存服務之上建立一個抽象層,讓使用者可以像操作本地檔案系統一樣存取雲端儲存的檔案。當開發者將模型檔案上傳到 DBFS 後,這些檔案實際上儲存在雲端物件儲存服務中,但透過 DBFS 的抽象層可以使用標準的檔案路徑存取。在叢集的每個節點上,DBFS 路徑會被掛載為本地路徑,因此程式碼可以直接使用本地檔案操作方式讀取這些檔案,這大幅簡化了分散式環境中的檔案存取邏輯。

# =============================================================================
# Databricks 環境初始化設定
# 目的:在 DBFS 建立目錄結構並上傳 SpaCy 語言模型
# 執行環境:Databricks Notebook
# 版本需求:Databricks Runtime 13.0 以上
# =============================================================================

# -----------------------------------------------------------------------------
# 步驟一:建立 DBFS 目錄結構
#
# dbutils 是 Databricks 提供的工具函式庫,整合了檔案系統操作、
# 秘密管理、筆記本工作流程等多種功能。這個工具函式庫在 Databricks
# 的 Notebook 環境中會自動載入,開發者可以直接使用。
#
# fs.mkdirs 方法用於建立目錄,其行為類似於 Unix 系統的 mkdir -p 命令,
# 如果目錄已經存在則不會產生錯誤,如果父目錄不存在則會自動建立。
# 這個特性讓開發者可以放心執行這個命令,不需要事先檢查目錄是否存在。
#
# 此目錄將作為所有自然語言處理模型的統一存放位置,建議採用有意義的
# 命名結構,例如 /databricks/models/{類別}/{模型名稱},以便於管理
# 多個不同用途的模型。
# -----------------------------------------------------------------------------
dbutils.fs.mkdirs("dbfs:/databricks/models/spacy")

# -----------------------------------------------------------------------------
# 步驟二:從雲端儲存體複製預訓練模型至 DBFS
#
# Databricks 支援多種雲端儲存服務的存取,包括 Amazon S3、Azure Blob
# Storage、Azure Data Lake Storage Gen2 以及 Google Cloud Storage。
# 不同的雲端服務使用不同的協定前綴:
#   Amazon S3:s3a:// 或 s3://,其中 s3a 效能較佳
#   Azure Blob Storage:wasbs://
#   Azure Data Lake Storage Gen2:abfss://
#   Google Cloud Storage:gs://
#
# dbutils.fs.cp 方法用於複製檔案或目錄,參數說明如下:
#   第一個參數:來源路徑,可以是雲端儲存體或 DBFS 路徑
#   第二個參數:目標路徑,通常是 DBFS 路徑
#   第三個參數:布林值,True 表示遞迴複製整個目錄結構
#
# 在正式環境中,建議將模型檔案儲存在受版本控制的儲存體中,
# 並使用明確的版本號碼作為目錄名稱,以便追蹤模型版本變更。
# -----------------------------------------------------------------------------
dbutils.fs.cp(
    "s3a://your-bucket/models/spacy/",  # 請替換為實際的儲存體路徑
    "dbfs:/databricks/models/spacy/",
    True
)

# -----------------------------------------------------------------------------
# 步驟三:驗證模型檔案是否成功複製
#
# 驗證步驟在環境建置中非常重要,透過確認檔案確實存在且大小正確,
# 可以避免後續處理時因為模型檔案缺失或損毀導致的錯誤。
#
# display 函式是 Databricks Notebook 提供的輸出函式,
# 會以美觀的表格形式呈現資料,包含檔案名稱、大小、修改時間等資訊。
# 相較於 print 函式,display 提供更豐富的視覺化效果。
#
# 預期應看到以下檔案(實際版本號碼可能不同):
#   en_core_web_lg-3.7.1.tar.gz:SpaCy 官方大型英文模型
#   zh_core_web_trf-3.7.0.tar.gz:中文 Transformer 模型(選用)
#   其他自訂訓練的模型套件
# -----------------------------------------------------------------------------
display(dbutils.fs.ls("dbfs:/databricks/models/spacy/"))

完成模型檔案上傳後,接下來需要建立初始化指令碼。初始化指令碼是 Databricks 叢集管理的重要功能,它允許開發者定義在叢集啟動時自動執行的命令。對於自然語言處理任務而言,這個機制特別重要,因為 SpaCy 模型需要透過 pip 安裝才能在 Python 環境中使用。如果沒有初始化指令碼,開發者就需要在每次叢集啟動後手動執行安裝命令,這不僅繁瑣而且容易遺忘。

初始化指令碼的執行時機是在叢集啟動過程中,在 Spark 服務啟動之前。每個新啟動的節點都會執行這個指令碼,包括主節點和工作節點。這表示無論是叢集首次啟動,還是透過自動擴展機制新增工作節點,初始化指令碼都會自動執行,確保所有節點具備相同的執行環境。這種自動化機制消除了手動設定的繁瑣步驟,也避免了因為環境不一致導致的執行錯誤。

# =============================================================================
# 建立叢集初始化指令碼
# 目的:確保每個節點啟動時自動安裝 SpaCy 模型
# 設計原則:冪等性、錯誤處理、日誌記錄
# =============================================================================

# -----------------------------------------------------------------------------
# 建立指令碼存放目錄
#
# 將初始化指令碼與模型檔案分開存放是一個良好的實踐做法,
# 這樣可以更清楚地區分不同類型的資源,也便於版本管理。
# 建議的目錄結構如下:
#   /databricks/models/:存放各種模型檔案
#   /databricks/scripts/:存放初始化指令碼
#   /databricks/configs/:存放設定檔案
# -----------------------------------------------------------------------------
dbutils.fs.mkdirs("dbfs:/databricks/scripts/")

# -----------------------------------------------------------------------------
# 定義初始化指令碼內容
#
# 這是一個標準的 Bash Shell 指令碼,會在每個節點啟動時執行。
# 指令碼設計時需要考慮以下幾個重點:
#
# 1. 冪等性:指令碼應該可以重複執行而不會產生錯誤或副作用
# 2. 錯誤處理:當某個步驟失敗時,應該適當地處理錯誤
# 3. 日誌記錄:記錄足夠的資訊以便於除錯
# 4. 效能考量:避免不必要的重複安裝
#
# 使用 /dbfs 路徑存取 DBFS 檔案是因為在叢集節點上,
# DBFS 會被掛載為本地檔案系統,掛載點就是 /dbfs 目錄。
# 因此 dbfs:/databricks/models/spacy/ 在節點上可以透過
# /dbfs/databricks/models/spacy/ 路徑存取。
# -----------------------------------------------------------------------------
init_script_content = """#!/bin/bash
# =============================================================================
# SpaCy 模型自動安裝指令碼
# 執行時機:叢集每個節點啟動時
# 目的:確保所有節點都具備相同的 NLP 模型環境
# 維護資訊:請配合模型更新調整版本號碼
# =============================================================================

# 啟用嚴格模式,任何命令失敗都會中斷指令碼執行
# -e:命令失敗時立即結束
# -u:使用未定義變數時視為錯誤
# -o pipefail:管線中任何命令失敗都視為整個管線失敗
set -euo pipefail

# 記錄開始時間,用於效能監控
echo "[$(date '+%Y-%m-%d %H:%M:%S')] 開始安裝 SpaCy 模型"

# -----------------------------------------------------------------------------
# 安裝 SpaCy 官方預訓練大型英文語言模型
#
# en_core_web_lg 是 SpaCy 提供的大型英文語言模型,包含以下功能:
#   完整詞彙表:約 500,000 個詞條
#   詞向量:685,000 維度的詞向量,支援語意相似度計算
#   命名實體識別:可識別 18 種實體類別
#   詞性標註:精確的詞性分析
#   依存句法分析:完整的語法結構分析
#
# 模型檔案大小約 800MB,首次載入需要數秒時間。
# 在分散式環境中,每個節點都需要載入自己的模型實例,
# 因此需要確保每個節點都有足夠的記憶體空間。
#
# 使用 pip install 從本地檔案安裝模型的好處是:
#   不需要網路連線,適合封閉網路環境
#   安裝速度快,因為不需要下載
#   版本可控,確保所有環境使用相同版本
# -----------------------------------------------------------------------------
pip install /dbfs/databricks/models/spacy/en_core_web_lg-3.7.1.tar.gz

# -----------------------------------------------------------------------------
# 安裝自訂訓練的命名實體識別模型(選用)
#
# 如果有針對特定領域訓練的自訂模型,可以在此一併安裝。
# 自訂模型能識別標準模型無法辨識的領域專有實體,例如:
#   醫療領域:藥物名稱、疾病名稱、醫療程序
#   法律領域:法條編號、案件名稱、法律術語
#   金融領域:金融產品名稱、財務指標、公司代碼
#
# 取消下方註解以啟用自訂模型安裝
# -----------------------------------------------------------------------------
# pip install /dbfs/databricks/models/spacy/custom_ner_model-0.0.1.tar.gz

# 記錄完成時間
echo "[$(date '+%Y-%m-%d %H:%M:%S')] SpaCy 模型安裝完成"

# 驗證安裝結果
python -c "import spacy; print(f'SpaCy 版本: {spacy.__version__}')"
python -c "import en_core_web_lg; print('en_core_web_lg 模型載入成功')"

echo "[$(date '+%Y-%m-%d %H:%M:%S')] 模型驗證完成"
"""

# -----------------------------------------------------------------------------
# 將指令碼寫入 DBFS
#
# dbutils.fs.put 方法用於將字串內容寫入檔案,參數說明:
#   第一個參數:目標檔案的完整路徑
#   第二個參數:要寫入的字串內容
#   第三個參數:布林值,True 表示如果檔案已存在則覆寫
#
# 指令碼建立後,需要在叢集組態中指定此路徑才會在啟動時執行。
# 設定方式是進入叢集編輯頁面,展開「進階選項」區塊,
# 在「初始化指令碼」項目中新增指令碼路徑。
# -----------------------------------------------------------------------------
dbutils.fs.put(
    "dbfs:/databricks/scripts/spacy_init.sh",
    init_script_content,
    True
)

# -----------------------------------------------------------------------------
# 驗證指令碼內容
#
# 使用 head 方法讀取指令碼的前 2000 個位元組,確認內容正確。
# 這個驗證步驟可以及早發現語法錯誤或格式問題,
# 避免在叢集啟動時才發現問題。
# -----------------------------------------------------------------------------
print("指令碼內容預覽:")
print(dbutils.fs.head("dbfs:/databricks/scripts/spacy_init.sh"))

建立初始化指令碼後,需要在叢集組態中指定這個指令碼的路徑。開發者需要進入 Databricks 網頁介面的「Compute」頁面,選擇要設定的叢集並點選「Edit」按鈕進入編輯模式。在叢集編輯頁面的下方有一個「Advanced Options」區塊,展開後可以看到「Init Scripts」設定項目。在這個設定項目中點選「Add」按鈕,選擇「DBFS」作為來源類型,然後輸入初始化指令碼的完整路徑,也就是剛才建立的 dbfs:/databricks/scripts/spacy_init.sh

設定完成後儲存叢集組態並重新啟動叢集,系統會在每個節點的啟動過程中執行初始化指令碼。可以在叢集的「Driver Logs」或「Event Log」中查看指令碼的執行結果,確認模型是否成功安裝。如果初始化指令碼執行失敗,叢集可能無法正常啟動,此時需要檢查指令碼內容是否有語法錯誤或路徑錯誤。

這種環境建置方式的核心優勢在於實現了基礎設施即程式碼的理念,所有的環境設定都以程式碼形式記錄,可以納入版本控制系統進行管理。當團隊成員需要建立新叢集或在不同環境間遷移時,只需執行相同的設定程式碼,就能確保環境完全一致。這種一致性對於避免「在我的機器上可以執行」這類問題非常重要,也是維持生產系統穩定性的基礎。

資料載入與前處理策略

完成環境建置後,下一步是載入待處理的文字資料。資料載入看似簡單,但在分散式環境中有許多需要注意的細節。正確的資料載入策略可以避免後續處理過程中的各種問題,同時也能為效能優化奠定基礎。資料載入階段的主要任務包括讀取資料檔案、進行資料品質檢查、執行必要的資料清理,以及設定適當的資料分區策略。

本文使用 AG News Dataset 作為示範資料集,這是自然語言處理領域廣泛使用的公開資料集。AG News Dataset 包含來自各大新聞網站的新聞標題與摘要,涵蓋世界新聞、體育、商業與科技四個類別。這個資料集總計約 120,000 筆訓練資料與 7,600 筆測試資料,規模適中,既能展示分散式處理效果,又不會因資料量過大而影響實驗便利性。對於正在學習分散式自然語言處理的開發者來說,AG News Dataset 是理想的練習資料集。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 14
skinparam minClassWidth 100

title 資料載入與前處理流程

partition 資料載入 {
    :讀取 CSV 檔案;
    note right
        使用 Spark CSV Reader
        設定正確的解析參數
    end note
    :自動推斷資料型別;
}

partition 資料品質檢查 {
    :偵測空值與異常;
    note right
        統計各欄位空值比例
        識別資料品質問題
    end note
    :分析文字長度分布;
    note right
        了解資料特性
        規劃處理策略
    end note
}

partition 資料清理 {
    :過濾空值資料;
    :移除過短文字;
    note right
        過短文字缺乏語境
        實體識別效果差
    end note
}

partition 效能優化 {
    :調整資料分區數量;
    note right
        平衡運算資源利用
        避免資料傾斜
    end note
    :快取至記憶體;
    note right
        減少重複讀取
        提升處理效率
    end note
}

@enduml

在 Databricks 載入資料時,PySpark 提供了豐富的資料讀取功能,能處理各種格式的資料來源。對於 CSV 格式的文字資料,需要特別注意引號與跳脫字元的處理,因為新聞內容經常包含引號與特殊字元。如果沒有正確設定這些參數,可能導致資料解析錯誤,某些欄位的內容會被錯誤地切割或合併。PySpark 的 CSV 讀取器提供了多種參數來處理這些邊界情況,開發者需要根據實際資料的格式特性選擇適當的設定。

CSV 格式雖然簡單普遍,但在處理包含特殊字元的文字資料時有其限制。新聞摘要經常包含逗號、引號、換行符號等字元,這些字元在 CSV 格式中具有特殊意義,如果不正確處理會導致解析錯誤。常見的解決方式是使用引號包圍欄位值,並使用跳脫字元處理內容中的引號。PySpark 的 CSV 讀取器透過 quoteescape 參數提供了這種處理能力,讓開發者可以正確讀取包含特殊字元的資料。

# =============================================================================
# 資料載入與前處理模組
# 目的:載入新聞資料集並進行品質檢查與清理
# 適用資料集:AG News Dataset 或類似結構的新聞資料
# =============================================================================

# -----------------------------------------------------------------------------
# 匯入必要的函式庫
#
# 這些函式庫各自負責不同的功能,在自然語言處理流程中都扮演重要角色:
#   spacy:自然語言處理的核心功能
#   numpy:數值運算與陣列操作
#   pandas:資料處理與分析
#   pyspark.sql.functions:Spark SQL 的各種操作函式
#   pyspark.sql.types:Spark SQL 的資料型別定義
# -----------------------------------------------------------------------------

# SpaCy:自然語言處理核心函式庫
# 提供詞性標註、命名實體識別、依存句法分析等功能
# SpaCy 的設計理念是「提供最佳實踐」,預設設定已經過優化
import spacy

# NumPy:數值運算函式庫
# 提供高效的多維陣列運算能力,是許多科學計算套件的基礎
# 在自然語言處理中常用於處理詞向量與數值特徵
import numpy as np

# Pandas:資料分析與處理函式庫
# 提供 DataFrame 資料結構與豐富的資料操作方法
# 在 Pandas UDF 中會使用到 Pandas 的資料結構
import pandas as pd

# PySpark SQL 函式庫
# 這些函式用於 DataFrame 的各種操作,是建構資料處理流程的基本工具
from pyspark.sql.functions import (
    udf,        # 使用者定義函式,將 Python 函式轉換為 Spark 可用的分散式函式
    col,        # 選取 DataFrame 欄位的便捷函式,等同於 df["column_name"]
    length,     # 計算字串長度的內建函式
    when,       # 條件判斷函式,類似 SQL 的 CASE WHEN 語法
    isnull      # 檢查空值的函式,回傳布林值
)

# PySpark 資料型別定義
# 這些型別用於定義 UDF 的輸入輸出型別與 DataFrame Schema
# 正確的型別定義對於 Spark 的效能優化非常重要
from pyspark.sql.types import (
    ArrayType,      # 陣列型別,用於表示元素列表
    StructType,     # 結構型別,類似 JSON 物件或資料表的一列
    StructField,    # 結構欄位定義,包含欄位名稱與型別
    StringType,     # 字串型別
    IntegerType     # 整數型別
)

# -----------------------------------------------------------------------------
# 定義資料來源路徑
#
# 使用雲端物件儲存作為資料來源是企業資料平台的標準做法,
# 這種架構具有以下優點:
#   高可用性:雲端儲存服務提供多區域複製與故障轉移
#   彈性擴展:可以儲存任意大小的資料集
#   成本效益:只需支付實際使用的儲存空間
#   安全性:支援存取控制與加密
#
# 對於部署位置的選擇,建議使用地理位置較近的區域來儲存資料,
# 例如 AWS 的東京區域或 Azure 的日本區域,以降低網路延遲。
# -----------------------------------------------------------------------------
INPUT_PATH = "s3a://nlp-demo/ag_dataset/train.csv"

# -----------------------------------------------------------------------------
# 使用 Spark 讀取 CSV 檔案
#
# spark 是 SparkSession 物件,在 Databricks Notebook 環境中會自動建立。
# SparkSession 是 Spark 2.0 以後的統一入口點,整合了 SparkContext、
# SQLContext 和 HiveContext 的功能。
#
# 讀取 CSV 檔案時,各參數的作用說明如下:
#
#   header='true':
#     表示 CSV 檔案的第一列是欄位名稱,Spark 會使用這一列作為
#     DataFrame 的欄位名稱。如果設為 'false',Spark 會自動產生
#     _c0、_c1 等預設欄位名稱。
#
#   inferSchema='true':
#     啟用自動型別推斷功能。Spark 會掃描資料的前幾列,
#     推斷每個欄位的資料型別。這個功能方便但會有額外的掃描成本,
#     對於大型資料集,建議手動定義 Schema 以提升效能。
#
#   quote='"':
#     指定引號字元。當欄位值包含逗號或換行符號時,
#     會使用引號將整個值包圍起來。這個設定告訴 Spark 使用
#     雙引號作為引號字元。
#
#   escape='"':
#     指定跳脫字元。當欄位值本身包含引號時,需要使用跳脫字元
#     來區分。在標準 CSV 中,通常使用連續兩個引號表示一個引號,
#     因此這裡也設定為雙引號。
#
#   multiLine='true':
#     允許欄位值跨越多行。新聞摘要可能包含換行符號,
#     啟用這個選項可以正確處理這種情況。但要注意這會降低
#     讀取效能,因為無法並行處理單一檔案的不同部分。
# -----------------------------------------------------------------------------
df = spark.read.format('csv').options(
    header='true',
    inferSchema='true',
    quote='"',
    escape='"',
    multiLine='true'
).load(INPUT_PATH)

# -----------------------------------------------------------------------------
# 顯示資料集基本資訊
#
# 在開始資料處理之前,了解資料的基本特性是非常重要的步驟。
# 這些資訊可以幫助開發者規劃後續的處理策略,
# 例如資料量決定需要多少運算資源,欄位結構決定如何設計處理邏輯。
# -----------------------------------------------------------------------------
print(f"資料集總筆數:{df.count():,}")
print(f"資料集欄位:{df.columns}")

# 顯示資料結構與型別
# printSchema 方法會以樹狀結構顯示 DataFrame 的 Schema,
# 包含每個欄位的名稱、型別與是否允許空值
df.printSchema()

# 預覽前幾筆資料,了解資料格式與內容
# show 方法顯示 DataFrame 的前 n 筆資料
# truncate 參數限制每個欄位最多顯示的字元數,避免輸出過長
df.show(5, truncate=80)

資料載入後,進行基本的資料品質檢查與前處理是不可或缺的步驟。對於自然語言處理任務,需要特別關注文字欄位是否存在空值、文字長度的分布情況,以及是否存在明顯的資料異常。這些檢查有助於預防後續處理過程可能發生的錯誤,同時也能幫助開發者了解資料特性,為後續模型調整提供依據。在實務經驗中,資料品質問題往往是導致自然語言處理專案失敗的主要原因之一,因此投入足夠的時間進行資料品質檢查是非常值得的。

# =============================================================================
# 資料品質檢查與清理
# =============================================================================

# -----------------------------------------------------------------------------
# 檢查各欄位的空值數量
# 空值會導致 SpaCy 模型處理錯誤,必須在前處理階段妥善處理
# 在大型資料集中,即使空值比例很低,絕對數量也可能相當可觀
# -----------------------------------------------------------------------------
print("=== 資料品質檢查 ===")

# 計算 description 欄位中空值的數量
# filter() 篩選符合條件的資料列
# isnull() 檢查欄位是否為空值,回傳布林值
# count() 計算篩選後的資料筆數
null_count = df.filter(isnull(col("description"))).count()
total_count = df.count()
null_percentage = (null_count / total_count) * 100

print(f"description 欄位空值數量:{null_count:,} 筆 ({null_percentage:.2f}%)")

# -----------------------------------------------------------------------------
# 分析文字長度分布
# 了解文字長度有助於評估處理時間與記憶體需求
# 過長的文字可能需要分段處理或設定 SpaCy 的 max_length 參數
# 過短的文字可能缺乏足夠的語境資訊進行有效的實體識別
# -----------------------------------------------------------------------------

# withColumn 方法新增一個名為 text_length 的計算欄位
df_with_length = df.withColumn(
    "text_length",
    length(col("description"))
)

# 計算文字長度的描述性統計
# summary() 計算最小值、各分位數、最大值、平均值等統計資訊
print("\n文字長度統計:")
length_stats = df_with_length.select("text_length").summary(
    "min", "25%", "50%", "75%", "max", "mean"
)
length_stats.show()

# -----------------------------------------------------------------------------
# 資料清理:過濾空值與過短文字
# isNotNull() 確保欄位值不為空值
# length() > 20 過濾掉長度過短的文字(20 字元約為 3-5 個英文單詞)
# 過短的文字通常缺乏足夠語境資訊,實體識別效果不佳
# -----------------------------------------------------------------------------
df_cleaned = df.filter(
    (col("description").isNotNull()) &
    (length(col("description")) > 20)
)

cleaned_count = df_cleaned.count()
removed_count = total_count - cleaned_count
print(f"\n清理前資料筆數:{total_count:,}")
print(f"清理後資料筆數:{cleaned_count:,}")
print(f"移除資料筆數:{removed_count:,} ({(removed_count/total_count)*100:.2f}%)")

# -----------------------------------------------------------------------------
# 快取清理後的資料
# cache() 將 DataFrame 保留在各節點的記憶體中
# 這是延遲執行操作,會在第一次行動操作時實際執行
# 對於需要多次存取的資料,快取能大幅提升後續操作的執行效能
# -----------------------------------------------------------------------------
df_cleaned.cache()

# 觸發快取並統計實際資料量
print(f"\n資料已快取,準備進行 NLP 處理")

資料分區策略是影響處理效能的重要因素。在 Spark 中,資料被切分為多個分區,每個分區獨立處理,這是實現平行運算的基礎。分區數量的設定會直接影響運算平行度與資源利用率,分區數量過少會導致部分運算資源閒置,分區數量過多則會增加任務排程的額外負擔。根據實務經驗,每個 CPU 核心處理二到四個分區是比較理想的設定,這樣可以確保在某些任務完成時,還有待處理任務可接續執行,充分利用運算資源。

# =============================================================================
# 資料分區優化
# =============================================================================

# -----------------------------------------------------------------------------
# 檢查目前的分區數量
# 了解資料分布狀況,判斷是否需要調整
# -----------------------------------------------------------------------------
current_partitions = df_cleaned.rdd.getNumPartitions()
print(f"目前分區數量:{current_partitions}")

# -----------------------------------------------------------------------------
# 根據叢集規模計算最佳分區數量
# 假設使用的叢集配置:
#   - 4 個 Worker 節點
#   - 每個節點 8 個 CPU 核心
#   - 總共 32 個核心
# 建議分區數量:核心數 × 2 到 核心數 × 4
# 這個範圍能確保所有核心都有任務可執行,同時不會有過多的排程開銷
# -----------------------------------------------------------------------------
CLUSTER_CORES = 32  # 請根據實際叢集配置調整
PARTITIONS_PER_CORE = 3
optimal_partitions = CLUSTER_CORES * PARTITIONS_PER_CORE

print(f"叢集核心總數:{CLUSTER_CORES}")
print(f"建議分區數量:{optimal_partitions}")

# -----------------------------------------------------------------------------
# 調整分區數量
# repartition() 會觸發完整的資料重新洗牌(Shuffle)
# Shuffle 會將資料重新分配到各個分區,確保資料均勻分布
# 這個操作相對耗時,但對於後續大量運算來說是值得的投資
# -----------------------------------------------------------------------------
if current_partitions != optimal_partitions:
    df_repartitioned = df_cleaned.repartition(optimal_partitions)
    df_repartitioned.cache()
    
    # 釋放原本的快取,避免佔用過多記憶體
    df_cleaned.unpersist()
    
    print(f"已調整分區數量為:{optimal_partitions}")
else:
    df_repartitioned = df_cleaned
    print("分區數量已為最佳值,無需調整")

命名實體識別模型整合與實作

命名實體識別是自然語言處理的基礎任務之一,其目標是從文字中識別具有特定意義的詞彙或詞組,並將其分類為預先定義的類別。常見的實體類別包括人名、地名、組織名稱、日期時間、金額等。在商業應用中,命名實體識別廣泛用於資訊擷取、知識圖譜建構、智慧搜尋、風險監控等場景。透過自動化的實體識別,企業能從大量非結構化文字快速擷取結構化資訊,為後續資料分析與決策支援提供堅實的基礎。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 14
skinparam minClassWidth 100

title 命名實體識別處理流程

|輸入處理|
start
:接收原始新聞文字;
note right
    來自 DataFrame 的
    description 欄位
end note

|自然語言處理|
:文字斷詞與標記化;
note right
    Tokenization
    將文字拆分為詞彙單位
end note
:詞性標註分析;
note right
    POS Tagging
    識別名詞、動詞等詞性
end note
:命名實體識別模型;
note right
    SpaCy en_core_web_lg 模型
    使用深度學習識別實體
    支援 18 種實體類別
end note

|結果輸出|
:實體文字內容;
:實體位置資訊;
:實體類別標籤;
note right
    PERSON:人名
    ORG:組織名稱
    GPE:地理政治實體
    DATE:日期
    MONEY:金額
end note
stop

@enduml

要在 PySpark 中使用 SpaCy 進行命名實體識別,需要將 SpaCy 的處理邏輯封裝為使用者定義函式。UDF 是 Spark SQL 提供的擴展機制,允許開發者使用 Python 撰寫自訂資料處理邏輯,並將其應用於 DataFrame 的欄位運算。透過 UDF,開發者可以在分散式環境中使用任何 Python 函式庫,這大幅擴展了 Spark 的應用範圍。然而 UDF 也有其效能代價,因為資料需要在 JVM 與 Python 直譯器之間序列化與反序列化,這個過程會消耗額外的運算資源。

首先需要定義 UDF 的輸出結構。由於命名實體識別的結果包含多個實體,而每個實體又包含多個屬性,因此需要使用巢狀資料結構來表示。這種結構化輸出格式便於後續資料分析與視覺化,也能直接儲存為 Parquet 等欄位式格式,保留完整的結構資訊。

# =============================================================================
# 命名實體識別 UDF 定義
# =============================================================================

# -----------------------------------------------------------------------------
# 定義輸出資料結構
# Spark 需要明確的型別定義才能正確處理資料
# 每筆文字可能包含零到多個實體,因此外層使用 ArrayType 表示陣列
# 每個實體使用 StructType 定義結構,包含四個屬性:
#   - text:實體的文字內容(如 "蘋果公司"、"台北市")
#   - start_char:實體在原文中的起始字元位置(索引從 0 開始)
#   - end_char:實體在原文中的結束字元位置(不包含該位置字元)
#   - label:實體的分類標籤(如 PERSON、ORG、GPE、DATE)
# -----------------------------------------------------------------------------
entity_schema = ArrayType(
    StructType([
        StructField("text", StringType(), nullable=False),
        StructField("start_char", IntegerType(), nullable=False),
        StructField("end_char", IntegerType(), nullable=False),
        StructField("label", StringType(), nullable=False)
    ])
)

# -----------------------------------------------------------------------------
# 定義命名實體識別函式
# 此函式將被轉換為 PySpark UDF,在各工作節點上平行執行
# 函式設計要點:
#   1. 處理空值與異常輸入
#   2. 使用全域變數快取模型,避免重複載入
#   3. 回傳結構化結果,符合預定義的 Schema
# -----------------------------------------------------------------------------
def extract_entities(text):
    """
    從輸入文字中擷取命名實體
    
    此函式使用 SpaCy 的預訓練語言模型處理輸入文字,
    識別其中的命名實體並回傳結構化結果。
    為了在分散式環境中維持效能,採用全域變數快取已載入的模型。
    
    Parameters
    ----------
    text : str
        待處理的文字字串,可以是任何長度的英文文字
    
    Returns
    -------
    list
        包含所有識別實體的串列,每個實體為一個包含
        [文字, 起始位置, 結束位置, 標籤] 的串列
        若無識別到任何實體,回傳空串列
    
    Examples
    --------
    >>> extract_entities("Apple CEO Tim Cook announced new products in Cupertino.")
    [['Apple', 0, 5, 'ORG'], ['Tim Cook', 10, 18, 'PERSON'], ['Cupertino', 47, 56, 'GPE']]
    """
    # 使用全域變數儲存已載入的模型
    # 這樣可以避免每次函式呼叫都重新載入模型,大幅提升效能
    # 在分散式環境中,每個 Worker 節點會維護自己的模型實例
    # 模型實例在 Executor 的生命週期內會被重複使用
    global nlp_model
    
    # 處理空值輸入,避免模型處理錯誤
    if text is None or str(text).strip() == '':
        return []
    
    try:
        # 嘗試使用已載入的模型處理文字
        doc = nlp_model(str(text))
    except NameError:
        # 若模型尚未載入(nlp_model 變數不存在),則進行載入
        # en_core_web_lg 是 SpaCy 提供的大型英文語言模型
        # 包含完整的命名實體識別、詞向量與語法分析能力
        # 模型大小約 800MB,首次載入需要數秒時間
        nlp_model = spacy.load('en_core_web_lg')
        doc = nlp_model(str(text))
    
    # 從處理結果中擷取所有實體
    # doc.ents 是一個包含所有識別實體的元組
    # 每個實體物件(Entity)具有以下重要屬性:
    #   - text:實體的原始文字內容
    #   - start_char:起始字元位置
    #   - end_char:結束字元位置
    #   - label_:類別標籤字串(注意有底線)
    entities = [
        [ent.text, ent.start_char, ent.end_char, ent.label_]
        for ent in doc.ents
    ]
    
    return entities

# -----------------------------------------------------------------------------
# 將 Python 函式轉換為 PySpark UDF
# udf() 函式接收 Python 函式與輸出型別定義作為參數
# 轉換後的 UDF 可以在 DataFrame 的 withColumn、select 等操作中使用
# UDF 會在 Spark 執行時自動序列化並分發到各 Worker 節點
# -----------------------------------------------------------------------------
extract_entities_udf = udf(extract_entities, entity_schema)

定義好 UDF 後,就可將其應用於 DataFrame 進行批次處理。PySpark 會自動將資料分散到叢集各節點平行處理,充分利用分散式運算優勢。這種平行處理方式能線性擴展處理能力,當資料量增加時,只需增加叢集節點即可維持處理效率,這是單機處理方式無法達到的擴展性。

# =============================================================================
# 執行命名實體識別
# =============================================================================

# -----------------------------------------------------------------------------
# 將 NER UDF 應用於 DataFrame
# withColumn 方法會新增一個名為 entities 的欄位
# 該欄位包含從 description 欄位擷取出的所有命名實體
# 這是延遲執行(Lazy Evaluation)操作,定義轉換但不立即執行
# Spark 會自動優化執行計畫,並在觸發行動操作時才實際執行
# -----------------------------------------------------------------------------
documents_df = df_repartitioned.withColumn(
    'entities',
    extract_entities_udf(col('description'))
)

# -----------------------------------------------------------------------------
# 預覽處理結果
# show() 是行動操作,會觸發實際的分散式運算
# 此時 Spark 會開始在各節點平行執行 NER 處理
# truncate=80 限制每個欄位的顯示長度
# -----------------------------------------------------------------------------
print("=== NER 處理結果預覽 ===")
documents_df.select('description', 'entities').show(5, truncate=80)

# -----------------------------------------------------------------------------
# 統計各類別實體的數量分布
# 這個分析能幫助了解資料集中實體類型的組成
# 對於評估模型效果與規劃後續應用都很有價值
# -----------------------------------------------------------------------------
from pyspark.sql.functions import explode, size

# 先統計每筆文字識別出的實體數量
entity_count_stats = documents_df.withColumn(
    'entity_count',
    size(col('entities'))
).select('entity_count').summary()
print("\n每筆文字的實體數量統計:")
entity_count_stats.show()

# 展開實體陣列,將每個實體變成獨立的資料列
# explode() 函式會將陣列欄位中的每個元素展開為獨立的列
# 原本一列包含多個實體的資料,會被展開為多列,每列一個實體
entities_exploded = documents_df.select(
    explode(col('entities')).alias('entity')
)

# 按實體類別統計數量
# col('entity.label') 存取巢狀結構中的 label 欄位
# groupBy 將相同標籤的實體歸為一組
# count() 計算每組的數量
# orderBy 按數量降序排列
entity_type_counts = entities_exploded.select(
    col('entity.label').alias('entity_type')
).groupBy('entity_type').count().orderBy(col('count').desc())

print("\n各類別命名實體數量分布:")
entity_type_counts.show(15)

SpaCy 的預訓練模型 en_core_web_lg 能識別十八種命名實體類別,涵蓋了大多數常見的實體類型。這些類別包括人名 PERSON、國籍或宗教政治團體 NORP、設施名稱 FAC、組織名稱 ORG、地理政治實體 GPE、地點 LOC、產品名稱 PRODUCT、事件名稱 EVENT、藝術作品名稱 WORK_OF_ART、法律文件 LAW、語言 LANGUAGE、日期 DATE、時間 TIME、百分比 PERCENT、金額 MONEY、數量 QUANTITY、序數 ORDINAL 以及基數 CARDINAL。對於特定領域應用,若預訓練模型無法滿足需求,開發者也可使用 Prodigy 等標註工具訓練自訂模型,以識別領域專有的實體類型。

效能優化策略與 Pandas UDF 實作

在大規模自然語言處理任務中,效能優化是確保系統能在合理時間內完成處理的關鍵。傳統的 Python UDF 雖然使用方便,但因為需要在 JVM 與 Python 之間逐列傳輸資料,會產生顯著的序列化開銷。Pandas UDF 是 Spark 2.3 版本引入的功能,透過 Apache Arrow 實現高效的批次資料傳輸,能大幅減少這種開銷,根據官方測試,效能提升可達三到一百倍不等。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 14
skinparam minClassWidth 100

title 效能優化架構

package "資料層優化" {
    rectangle "資料分區調整" as d1
    note bottom of d1
        根據叢集規模設定
        最佳分區數量
    end note
    rectangle "快取策略設定" as d2
    note bottom of d2
        將頻繁存取的資料
        保留在記憶體中
    end note
    rectangle "資料格式選擇" as d3
    note bottom of d3
        使用 Parquet 等
        欄位式儲存格式
    end note
}

package "運算層優化" {
    rectangle "Pandas UDF 向量化" as c1
    note bottom of c1
        批次處理減少
        資料傳輸成本
    end note
    rectangle "SpaCy 批次處理" as c2
    note bottom of c2
        使用 nlp.pipe
        提升處理效率
    end note
    rectangle "模型載入快取" as c3
    note bottom of c3
        避免重複載入
        昂貴的模型資源
    end note
}

package "資源層優化" {
    rectangle "叢集規模配置" as r1
    rectangle "記憶體分配調整" as r2
    rectangle "執行器核心設定" as r3
}

d1 --> c1
d2 --> c2
d3 --> c3
c1 --> r1
c2 --> r2
c3 --> r3

@enduml

Pandas UDF 以 Pandas Series 或 DataFrame 為單位進行處理,而非逐列處理。這不僅減少了資料傳輸次數,還能讓開發者在 UDF 內部使用 SpaCy 的批次處理功能 nlp.pipe,進一步提升處理效率。nlp.pipe 方法能批次處理多段文字,相較於逐一處理,可更有效利用 CPU 的向量化運算能力,同時減少 Python 函式呼叫的額外負擔。

# =============================================================================
# Pandas UDF 實作:高效能命名實體識別
# =============================================================================

# -----------------------------------------------------------------------------
# 匯入 Pandas UDF 相關模組
# pandas_udf 是裝飾器,用於將 Python 函式轉換為 Pandas UDF
# PandasUDFType 定義 UDF 的類型(SCALAR 或 GROUPED_MAP)
# -----------------------------------------------------------------------------
from pyspark.sql.functions import pandas_udf
from typing import Iterator

# -----------------------------------------------------------------------------
# 定義 Pandas UDF 進行批次命名實體識別
# 使用 Iterator 型別提示,表示這是一個迭代器形式的 Pandas UDF
# 迭代器形式能更有效地處理大型資料集,減少記憶體使用
# 輸出型別使用先前定義的 entity_schema
# -----------------------------------------------------------------------------
@pandas_udf(entity_schema)
def extract_entities_optimized(
    text_iterator: Iterator[pd.Series]
) -> Iterator[pd.Series]:
    """
    使用 Pandas UDF 進行高效能批次命名實體識別
    
    此函式利用 Pandas UDF 的批次處理機制,結合 SpaCy 的 nlp.pipe 方法,
    實現高效能的分散式命名實體識別。相較於傳統 UDF,效能可提升 3-10 倍。
    
    Parameters
    ----------
    text_iterator : Iterator[pd.Series]
        文字資料的迭代器,每個批次為一個 Pandas Series
    
    Yields
    ------
    pd.Series
        對應的實體識別結果,每個元素為一個實體串列
    
    Notes
    -----
    迭代器形式的 Pandas UDF 特別適合需要初始化昂貴資源(如載入模型)的場景,
    因為初始化只需要在迭代開始時執行一次,而非每個批次都執行。
    """
    # 在迭代器開始時載入模型,整個迭代過程中只載入一次
    # 這比在每個批次中檢查並載入模型更有效率
    nlp = spacy.load('en_core_web_lg')
    
    # 設定 SpaCy 的批次處理參數
    # batch_size 控制每批處理的文件數量
    # 較大的批次能提升效能,但會增加記憶體使用
    # 根據實務測試,50-100 是比較好的平衡點
    BATCH_SIZE = 64
    
    # 遍歷所有文字批次
    for text_series in text_iterator:
        # 將 Series 轉換為串列,準備進行批次處理
        # fillna('') 將空值替換為空字串,避免 SpaCy 處理錯誤
        # astype(str) 確保所有值都是字串型別
        texts = text_series.fillna('').astype(str).tolist()
        
        # 使用 nlp.pipe 進行批次處理
        # pipe 方法接收可迭代物件,回傳處理結果的生成器
        # 這比逐一呼叫 nlp(text) 更有效率
        results = []
        for doc in nlp.pipe(texts, batch_size=BATCH_SIZE):
            # 擷取每篇文件中的所有實體
            entities = [
                [ent.text, ent.start_char, ent.end_char, ent.label_]
                for ent in doc.ents
            ]
            results.append(entities)
        
        # 將結果轉換為 Pandas Series 並回傳
        # 結果 Series 的長度必須與輸入 Series 相同
        yield pd.Series(results)

# -----------------------------------------------------------------------------
# 使用優化版 UDF 處理資料
# 語法與傳統 UDF 相同,但內部處理機制完全不同
# Pandas UDF 會將多筆資料打包成批次處理,大幅提升效能
# -----------------------------------------------------------------------------
documents_df_optimized = df_repartitioned.withColumn(
    'entities',
    extract_entities_optimized(col('description'))
)

# -----------------------------------------------------------------------------
# 執行處理並計時
# 透過實際執行來驗證效能提升效果
# -----------------------------------------------------------------------------
import time

print("開始執行優化版 NER 處理...")
start_time = time.time()

# 觸發實際運算(count 是行動操作)
processed_count = documents_df_optimized.count()

end_time = time.time()
processing_time = end_time - start_time

print(f"處理完成")
print(f"處理資料筆數:{processed_count:,}")
print(f"總處理時間:{processing_time:.2f} 秒")
print(f"平均每秒處理:{processed_count/processing_time:.0f} 筆")

除了 Pandas UDF 之外,記憶體設定也是效能調校的重要面向。SpaCy 的語言模型載入後會佔用相當的記憶體空間,en_core_web_lg 模型約需 800MB 至 1GB。加上處理過程產生的中間資料,每個 Worker 節點至少需要 4GB 以上可用記憶體才能穩定運作。在 Databricks 中,可透過 Spark 設定調整 Executor 的記憶體配置,確保有足夠記憶體載入模型與處理資料。記憶體不足可能導致頻繁的垃圾回收甚至任務失敗,因此在處理大量資料時,務必確認記憶體配置充足。

# =============================================================================
# 叢集資源設定檢視
# =============================================================================

# -----------------------------------------------------------------------------
# 檢視目前的 Spark 設定
# 這些設定會影響分散式運算的效能與穩定性
# 若發現設定不適當,需要在叢集組態中調整後重新啟動叢集
# -----------------------------------------------------------------------------
print("=== 目前 Spark 設定 ===")

# Executor 記憶體:每個執行器可使用的記憶體量
# 建議至少 8GB 以上,確保能載入語言模型並有足夠緩衝空間
executor_memory = spark.conf.get("spark.executor.memory", "未設定")
print(f"Executor 記憶體:{executor_memory}")

# Executor 核心數:每個執行器使用的 CPU 核心數
# 通常設定為 4-8 個核心,取決於任務類型
executor_cores = spark.conf.get("spark.executor.cores", "未設定")
print(f"Executor 核心數:{executor_cores}")

# Driver 記憶體:驅動程式可使用的記憶體量
# 需要足夠記憶體來處理結果彙整與廣播變數
driver_memory = spark.conf.get("spark.driver.memory", "未設定")
print(f"Driver 記憶體:{driver_memory}")

# Shuffle 分區數:資料洗牌時的分區數量
# 建議與資料處理時的分區數量保持一致
shuffle_partitions = spark.conf.get("spark.sql.shuffle.partitions", "未設定")
print(f"Shuffle 分區數:{shuffle_partitions}")

# -----------------------------------------------------------------------------
# 建議的叢集設定(需在 Databricks 叢集組態中設定)
# 以下設定適用於中型自然語言處理任務
# -----------------------------------------------------------------------------
print("\n=== 建議設定 ===")
print("spark.executor.memory: 8g 或更高")
print("spark.executor.cores: 4")
print("spark.driver.memory: 8g")
print("spark.sql.shuffle.partitions: 與資料分區數相同")
print("spark.python.worker.memory: 2g")

處理結果儲存與後續應用

完成命名實體識別處理後,需要將結果儲存到永久儲存系統中,以便後續分析或應用程式使用。儲存格式的選擇會影響後續查詢效能與儲存成本。Parquet 是 Spark 生態系統中最常用的儲存格式,它採用欄位式儲存與高效壓縮技術,特別適合分析型查詢。欄位式儲存的優點在於查詢特定欄位時只需讀取相關資料,而不需掃描整個資料集,這在大型資料集上能帶來顯著效能提升,同時也能節省儲存空間與傳輸成本。

# =============================================================================
# 處理結果儲存
# =============================================================================

# -----------------------------------------------------------------------------
# 定義輸出路徑
# 使用雲端物件儲存作為永久儲存位置
# Parquet 格式能完整保留巢狀結構,且具有良好的壓縮效率
# -----------------------------------------------------------------------------
OUTPUT_PATH = "s3a://nlp-demo/ag_dataset/processed/entities_result"

# -----------------------------------------------------------------------------
# 儲存處理結果為 Parquet 格式
# write.parquet 是 DataFrame 的輸出方法
# mode 參數控制當目標路徑已存在時的處理方式:
#   - overwrite:覆寫已存在的資料
#   - append:追加到現有資料
#   - ignore:若已存在則不執行
#   - error:若已存在則拋出錯誤
# compression 參數指定壓縮演算法,snappy 提供良好的壓縮/解壓縮效能平衡
# -----------------------------------------------------------------------------
documents_df_optimized.write.parquet(
    OUTPUT_PATH,
    mode="overwrite",
    compression="snappy"
)

print(f"處理結果已儲存至:{OUTPUT_PATH}")

# -----------------------------------------------------------------------------
# 驗證儲存結果
# 讀取剛才儲存的資料,確認儲存成功且資料完整
# -----------------------------------------------------------------------------
result_df = spark.read.parquet(OUTPUT_PATH)
stored_count = result_df.count()

print(f"儲存的資料筆數:{stored_count:,}")

# 顯示資料結構,確認巢狀欄位正確保留
print("\n資料結構:")
result_df.printSchema()

# 預覽儲存的資料
print("\n資料預覽:")
result_df.select('description', 'entities').show(3, truncate=60)

儲存的結果可用於多種後續應用。開發者可以建立實體關係圖譜,分析不同實體間的共現關係,了解新聞報導中哪些組織與人物經常一起被提及。可以統計特定類型實體的出現頻率,了解新聞報導的焦點與趨勢變化。也可以將實體資訊作為特徵,輸入到機器學習模型中進行更進一步的分析,例如預測新聞類別、偵測假新聞或識別重要事件。

# =============================================================================
# 進階分析範例:實體統計與關聯分析
# =============================================================================

from pyspark.sql.functions import explode, col, collect_list, array_distinct

# -----------------------------------------------------------------------------
# 範例一:最常被提及的組織名稱
# 這類分析能幫助了解新聞報導的焦點與商業趨勢
# 對於輿情監測、競爭分析等應用場景非常有價值
# -----------------------------------------------------------------------------
print("=== 最常被提及的組織 (ORG) ===")

# 展開實體欄位並篩選組織類型
org_entities = documents_df_optimized.select(
    explode(col('entities')).alias('entity')
).filter(
    col('entity.label') == 'ORG'
).select(
    col('entity.text').alias('organization')
)

# 統計各組織出現次數並排序
org_frequency = org_entities.groupBy('organization').count().orderBy(
    col('count').desc()
)

org_frequency.show(15)

# -----------------------------------------------------------------------------
# 範例二:最常被提及的人物
# 識別新聞中的關鍵人物,有助於追蹤重要人士的新聞曝光度
# -----------------------------------------------------------------------------
print("\n=== 最常被提及的人物 (PERSON) ===")

person_entities = documents_df_optimized.select(
    explode(col('entities')).alias('entity')
).filter(
    col('entity.label') == 'PERSON'
).select(
    col('entity.text').alias('person')
)

person_frequency = person_entities.groupBy('person').count().orderBy(
    col('count').desc()
)

person_frequency.show(15)

# -----------------------------------------------------------------------------
# 範例三:地理位置分布
# 分析新聞報導的地理焦點,了解哪些地區受到較多關注
# GPE 代表地理政治實體,通常是國家、城市或地區名稱
# -----------------------------------------------------------------------------
print("\n=== 地理位置分布 (GPE) ===")

location_entities = documents_df_optimized.select(
    explode(col('entities')).alias('entity')
).filter(
    col('entity.label') == 'GPE'
).select(
    col('entity.text').alias('location')
)

location_frequency = location_entities.groupBy('location').count().orderBy(
    col('count').desc()
)

location_frequency.show(15)

# -----------------------------------------------------------------------------
# 範例四:輸出為其他格式供外部系統使用
# JSON 格式具有良好的可讀性與跨語言相容性
# 適合作為 API 回傳格式或匯入到 NoSQL 資料庫
# -----------------------------------------------------------------------------
JSON_OUTPUT_PATH = "s3a://nlp-demo/ag_dataset/processed/entities_result.json"

# 選擇需要的欄位並輸出為 JSON
documents_df_optimized.select(
    'description', 'entities'
).write.json(
    JSON_OUTPUT_PATH,
    mode="overwrite"
)

print(f"\nJSON 格式結果已儲存至:{JSON_OUTPUT_PATH}")

生產化佈署與自動化排程

完成開發與測試後,下一步是將自然語言處理流程佈署到生產環境中持續運作。生產化佈署涉及多個面向,包括工作排程、監控告警、錯誤處理與日誌記錄等。Databricks 提供了 Jobs 功能,能將 Notebook 或程式碼封裝為可排程執行的任務。透過工作排程,可實現資料處理流程的自動化,例如每日定時處理新進資料,或在上游資料更新時觸發處理任務,大幅減少人工介入的需求。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 14
skinparam minClassWidth 100

title 生產化佈署架構

|觸發機制|
start
split
    :排程觸發;
    note right
        使用 Cron Expression
        例如每日凌晨執行
    end note
split again
    :事件觸發;
    note right
        檔案到達時觸發
        使用 Delta Live Tables
    end note
split again
    :手動觸發;
    note right
        透過 API 或 UI
        按需執行任務
    end note
end split

|Databricks Workflows|
:參數驗證;
note right
    檢查輸入路徑
    驗證必要欄位
end note
:資料載入;
:NER 處理;
:結果儲存;
:品質檢查;
note right
    驗證輸出資料筆數
    計算品質指標
end note

|監控與通知|
fork
    :執行日誌;
fork again
    :效能指標;
fork again
    :告警通知;
    note right
        失敗時發送通知
        支援 Email、Slack
    end note
end fork
stop

@enduml

建立生產工作流程時,建議將處理邏輯封裝為可參數化的 Notebook。透過參數化設計,同一份程式碼可處理不同輸入來源或輸出目標,提高程式碼重用性。例如同一個命名實體識別 Notebook 可用於處理每日新聞資料、歷史資料回補或不同來源的資料集,只需在執行時傳入不同參數即可。Databricks 提供了 dbutils.widgets 函式來定義互動式參數,以及 getArgument 函式來取得 Jobs 執行時傳入的參數值。

# =============================================================================
# 生產化 NER 處理 Notebook
# 設計為可參數化執行,支援 Databricks Jobs 排程
# =============================================================================

# -----------------------------------------------------------------------------
# 定義執行參數
# 這些參數可以在 Jobs 設定中指定,也可以在互動式執行時手動輸入
# dbutils.widgets 提供互動式參數輸入介面
# getArgument 則用於取得 Jobs 傳入的參數值
# -----------------------------------------------------------------------------

# 定義參數與預設值
# 輸入路徑:待處理資料的位置
INPUT_PATH = getArgument("input_path", "s3a://nlp-demo/ag_dataset/train.csv")

# 輸出路徑:處理結果的儲存位置
OUTPUT_PATH = getArgument("output_path", "s3a://nlp-demo/ag_dataset/processed/entities")

# 模型名稱:使用的 SpaCy 模型
MODEL_NAME = getArgument("model_name", "en_core_web_lg")

# 處理日期:用於資料分區與日誌記錄
PROCESS_DATE = getArgument("process_date", "2025-11-28")

# -----------------------------------------------------------------------------
# 記錄執行參數(用於追蹤與除錯)
# -----------------------------------------------------------------------------
print("=" * 60)
print("NER 處理任務開始")
print("=" * 60)
print(f"執行時間:{PROCESS_DATE}")
print(f"輸入路徑:{INPUT_PATH}")
print(f"輸出路徑:{OUTPUT_PATH}")
print(f"使用模型:{MODEL_NAME}")
print("=" * 60)

# -----------------------------------------------------------------------------
# 匯入必要函式庫
# -----------------------------------------------------------------------------
import spacy
import pandas as pd
import time
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType
from typing import Iterator

# -----------------------------------------------------------------------------
# 載入資料
# -----------------------------------------------------------------------------
print("\n[Step 1/5] 載入資料...")
start_time = time.time()

df = spark.read.format('csv').options(
    header='true',
    inferSchema='true',
    quote='"',
    escape='"'
).load(INPUT_PATH)

# 資料驗證:確保必要欄位存在
required_columns = ['description']
for col_name in required_columns:
    if col_name not in df.columns:
        raise ValueError(f"缺少必要欄位:{col_name}")

# 資料清理
df_cleaned = df.filter(
    (col("description").isNotNull()) &
    (col("description") != "")
)

input_count = df_cleaned.count()
print(f"載入資料筆數:{input_count:,}")
print(f"耗時:{time.time() - start_time:.2f} 秒")

# -----------------------------------------------------------------------------
# 定義 NER UDF
# -----------------------------------------------------------------------------
print("\n[Step 2/5] 初始化 NER 模型...")

entity_schema = ArrayType(StructType([
    StructField("text", StringType(), False),
    StructField("start_char", IntegerType(), False),
    StructField("end_char", IntegerType(), False),
    StructField("label", StringType(), False)
]))

@pandas_udf(entity_schema)
def extract_entities_production(
    text_iterator: Iterator[pd.Series]
) -> Iterator[pd.Series]:
    """
    生產環境用的 NER UDF
    包含完整的錯誤處理與日誌記錄
    """
    # 載入指定的模型
    nlp = spacy.load(MODEL_NAME)
    batch_num = 0
    
    for text_series in text_iterator:
        batch_num += 1
        texts = text_series.fillna('').astype(str).tolist()
        
        results = []
        for doc in nlp.pipe(texts, batch_size=64):
            entities = [
                [ent.text, ent.start_char, ent.end_char, ent.label_]
                for ent in doc.ents
            ]
            results.append(entities)
        
        yield pd.Series(results)

print("NER 模型初始化完成")

# -----------------------------------------------------------------------------
# 執行 NER 處理
# -----------------------------------------------------------------------------
print("\n[Step 3/5] 執行 NER 處理...")
start_time = time.time()

# 優化分區數量
df_partitioned = df_cleaned.repartition(96)

# 執行 NER
documents_df = df_partitioned.withColumn(
    'entities',
    extract_entities_production(col('description'))
)

# 快取結果(若需要多次使用)
documents_df.cache()

# 觸發實際運算
output_count = documents_df.count()
processing_time = time.time() - start_time

print(f"處理完成")
print(f"輸出資料筆數:{output_count:,}")
print(f"處理耗時:{processing_time:.2f} 秒")
print(f"處理速度:{output_count/processing_time:.0f} 筆/秒")

# -----------------------------------------------------------------------------
# 儲存結果
# -----------------------------------------------------------------------------
print("\n[Step 4/5] 儲存結果...")
start_time = time.time()

# 依日期分區儲存,便於後續管理
output_path_with_date = f"{OUTPUT_PATH}/date={PROCESS_DATE}"

documents_df.write.parquet(
    output_path_with_date,
    mode="overwrite",
    compression="snappy"
)

print(f"結果已儲存至:{output_path_with_date}")
print(f"儲存耗時:{time.time() - start_time:.2f} 秒")

# -----------------------------------------------------------------------------
# 品質檢查
# -----------------------------------------------------------------------------
print("\n[Step 5/5] 執行品質檢查...")

# 驗證儲存的資料
stored_df = spark.read.parquet(output_path_with_date)
stored_count = stored_df.count()

# 檢查資料完整性
if stored_count != output_count:
    raise ValueError(f"資料筆數不符!預期 {output_count},實際 {stored_count}")

# 計算品質指標
from pyspark.sql.functions import size

entity_stats = stored_df.withColumn(
    'entity_count', size(col('entities'))
).agg({
    'entity_count': 'avg',
    'entity_count': 'sum'
}).collect()[0]

print(f"品質檢查通過!")
print(f"儲存資料筆數:{stored_count:,}")

# -----------------------------------------------------------------------------
# 執行摘要
# -----------------------------------------------------------------------------
print("\n" + "=" * 60)
print("NER 處理任務完成")
print("=" * 60)
print(f"輸入筆數:{input_count:,}")
print(f"輸出筆數:{output_count:,}")
print(f"總耗時:{time.time() - start_time:.2f} 秒")
print("=" * 60)

建立 Databricks Jobs 的步驟是在 Workflows 頁面點選建立工作,指定工作名稱、選擇要執行的 Notebook、設定參數值、選擇叢集配置並設定排程時間。Databricks 支援多種排程選項,包括 Cron 表達式、固定間隔或手動觸發。Cron 表達式提供了最大彈性,可精確控制工作執行時間,例如每天凌晨兩點執行或每週一上午九點執行。工作設定中的重試策略與告警通知也是重要配置項目,重試策略定義了當工作失敗時是否自動重試,告警通知則確保相關人員能及時得知工作執行狀態,這些設定有助於建立穩健的生產系統。

結語:持續演進的技術實踐

隨著大型語言模型與生成式人工智慧的快速發展,自然語言處理技術正經歷前所未有的變革。傳統的命名實體識別、文字分類等任務,現在可透過大型語言模型以更簡潔直觀的方式實現。然而這並不意味著本文介紹的技術已經過時,對於需要處理海量文字資料的企業應用,基於 SpaCy 與 PySpark 的分散式處理方案仍然具有獨特優勢。

相較於大型語言模型,傳統自然語言處理模型的運算成本較低,推論速度更快,更適合大規模批次處理場景。同時傳統模型的行為更可預測,更容易進行錯誤分析與除錯,對於需要高可靠性與可解釋性的生產系統而言是重要考量。在許多應用場景中,結合傳統方法與大型語言模型的混合架構可能是最佳選擇,利用各自優勢達成最佳效能與成本平衡,例如使用傳統 NER 進行初步實體識別,再用大型語言模型進行實體消歧與關係擷取。

SpaCy 的 en_core_web_lg 模型在一般硬體上每秒可處理數百筆文字,而大型語言模型的處理速度通常慢上數十甚至數百倍。在需要處理數百萬筆文字的場景中,這個效能差異會轉化為巨大的時間與成本差異。傳統模型的另一個優勢是行為的可預測性與可解釋性。SpaCy 的模型經過大量測試與驗證,其行為在不同輸入下相對一致。開發者可以透過分析錯誤案例來了解模型的限制,並針對性地進行改進。相較之下,大型語言模型的輸出可能因為提示詞的微小變化而產生顯著差異,這種不確定性在需要高可靠性的生產系統中可能造成問題。

展望未來,自然語言處理技術將朝向多模態整合、知識增強與人機協作等方向發展。多模態處理能同時分析文字、圖像、語音等不同形式資料,提供更全面的資訊理解能力。知識增強技術將外部知識庫與語言模型結合,提升模型對特定領域的理解能力。人機協作則強調人類專家與人工智慧系統的配合,在保持人類判斷力的同時利用機器的處理效率。Databricks 等統一資料平台將在這個演進過程中扮演重要角色,提供整合多種資料類型與處理技術的基礎設施。持續學習與實踐是掌握自然語言處理技術的關鍵,期望本文介紹的技術實踐能為開發者與資料科學家提供有價值的參考。