在現代資料工程中,建立高效、可靠的 ETL (Extract, Transform, Load) 管線是核心任務之一。Apache Airflow 和 Apache NiFi 作為開源社群中最受歡迎的兩款工具,分別代表了兩種截然不同的設計哲學。本文將以一個統一的實戰案例——「將 PostgreSQL 的使用者資料匯入 Elasticsearch」——來深入比較這兩種工具,幫助您理解它們的核心差異,並為您的專案做出最佳的技術選型。
統一任務:從 PostgreSQL 到 Elasticsearch
我們的目標是建立一個自動化管線,完成以下三個步驟:
- Extract: 從 PostgreSQL 資料庫的
users
表中提取資料。 - Transform: 將提取出的表格式資料轉換為 Elasticsearch 所需的 JSON 格式。
- Load: 將轉換後的 JSON 資料載入到 Elasticsearch 的
users
索引中。
方案一:Airflow - 程式碼即管線 (Pipeline as Code)
Airflow 的核心哲學是將工作流程定義為 Python 程式碼,這使其具備極高的靈活性、可擴展性,並且易於進行版本控制。
1. 任務拆分
我們將整個 ETL 流程拆分為三個獨立的 Python 函式,每個函式對應一個 Airflow 任務。它們之間透過 Airflow 的 XComs (跨任務通訊機制) 來傳遞資料。
# functions/etl.py
def extract_from_postgres(**kwargs):
# 使用 psycopg2 或 SQLAlchemy 連接到 PostgreSQL
# 查詢資料並以 Pandas DataFrame 格式回傳
df = pd.read_sql("SELECT id, name, email FROM users", conn)
return df
def transform_to_json(**kwargs):
# 從 XComs 獲取上一步的 DataFrame
df = kwargs['ti'].xcom_pull(task_ids='extract_task')
# 轉換為 JSON 字串
json_data = df.to_json(orient='records')
return json_data
def load_to_elasticsearch(**kwargs):
# 從 XComs 獲取上一步的 JSON 資料
json_data = kwargs['ti'].xcom_pull(task_ids='transform_task')
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
# ... 執行批次匯入 ...
2. 編寫 DAG
接著,我們編寫一個 DAG 檔案來編排這三個任務。
# dags/postgres_to_es_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from functions.etl import extract_from_postgres, transform_to_json, load_to_elasticsearch
with DAG(...) as dag:
extract_task = PythonOperator(
task_id='extract_from_postgres',
python_callable=extract_from_postgres
)
transform_task = PythonOperator(
task_id='transform_to_json',
python_callable=transform_to_json
)
load_task = PythonOperator(
task_id='load_to_elasticsearch',
python_callable=load_to_elasticsearch
)
extract_task >> transform_task >> load_task
caption="圖表一:Airflow ETL 管線活動圖。此活動圖展示了 Airflow DAG 如何依序執行三個任務來完成 ETL 流程。"
alt="一個展示 Airflow ETL 流程的活動圖。流程包含三個任務:Extract 任務從資料庫提取資料並透過 XComs 輸出 DataFrame;Transform 任務接收 DataFrame 並輸出 JSON 字串;Load 任務接收 JSON 字串並載入到目標系統。"
方案二:NiFi - 視覺化流程編排 (Visual Flow-Based Programming)
NiFi 提供了圖形化使用者介面 (GUI),讓使用者可以透過拖放「處理器」(Processor) 並將它們連接起來的方式,快速構建資料流。
1. 核心處理器
要完成相同的 ETL 任務,我們需要使用以下幾個核心的 NiFi 處理器:
QueryDatabaseRecord
: 連接到 PostgreSQL 並執行 SQL 查詢,輸出 Avro 格式的資料。ConvertRecord
: 將資料格式從 Avro 轉換為 JSON。SplitJson
: (可選) 如果需要,可以將一個包含多個記錄的 JSON 陣列拆分為多個獨立的 JSON 物件。PutElasticsearchHttp
: 將傳入的 FlowFile (包含 JSON 資料) 寫入 Elasticsearch。
2. 搭建資料流
在 NiFi 的畫布上,我們將這些處理器拖曳出來,並用滑鼠將它們連接起來,形成一個視覺化的資料管線。每個處理器的詳細設定(如資料庫連線資訊、SQL 查詢語句、Elasticsearch 主機位址等)都在其屬性視窗中以圖形化方式完成。
caption="圖表二:NiFi ETL 資料流組件圖。此組件圖展示了在 NiFi 中實現相同 ETL 任務的視覺化流程。"
alt="一個展示 NiFi ETL 資料流的組件圖。資料從 PostgreSQL 資料庫流向 NiFi Flow。在 NiFi Flow 中,資料依次經過 QueryDatabaseRecord、ConvertRecord 和 PutElasticsearchHttp 處理器,最終載入到 Elasticsearch 資料庫。"
Airflow vs. NiFi:如何選擇?
特性 | Apache Airflow | Apache NiFi |
---|---|---|
核心哲學 | 程式碼即管線 (Pipeline as Code) | 視覺化流程編排 (Flow-Based Programming) |
開發模式 | 編寫 Python DAG 檔案,適合開發者 | 拖放處理器,設定屬性,適合資料分析師與維運人員 |
複雜轉換 | 極度靈活,可執行任何 Python 程式碼 | 依賴現有處理器,複雜邏輯需開發自定義處理器 |
排程能力 | 強大且靈活的排程與時間驅動 | 強調即時、事件驅動的資料流 |
版本控制 | 極佳,DAG 檔案可直接納入 Git 管理 | 較弱,需匯出流程範本 (XML/JSON) 進行管理 |
適用場景 | 複雜的、批次處理為主的 ETL/ELT、ML 管線 | 即時資料擷取、路由、簡單轉換、資料注入 |
結論:
- 如果您的團隊以開發者為主,重視版本控制,且需要處理複雜的業務邏輯轉換,Airflow 是更佳的選擇。
- 如果您的團隊需要快速搭建和調整資料流,處理即時資料注入,且團隊成員更偏好視覺化操作,NiFi 會更有效率。
兩種工具並非互斥,在複雜的資料平台中,它們甚至可以協同工作:例如,使用 NiFi 負責從各種來源即時擷取資料並將其落地到資料湖,然後由 Airflow 負責對資料湖中的資料進行複雜的、定時的批次處理和分析。