在現代資料工程中,建立高效、可靠的 ETL (Extract, Transform, Load) 管線是核心任務之一。Apache Airflow 和 Apache NiFi 作為開源社群中最受歡迎的兩款工具,分別代表了兩種截然不同的設計哲學。本文將以一個統一的實戰案例——「將 PostgreSQL 的使用者資料匯入 Elasticsearch」——來深入比較這兩種工具,幫助您理解它們的核心差異,並為您的專案做出最佳的技術選型。

統一任務:從 PostgreSQL 到 Elasticsearch

我們的目標是建立一個自動化管線,完成以下三個步驟:

  1. Extract: 從 PostgreSQL 資料庫的 users 表中提取資料。
  2. Transform: 將提取出的表格式資料轉換為 Elasticsearch 所需的 JSON 格式。
  3. Load: 將轉換後的 JSON 資料載入到 Elasticsearch 的 users 索引中。

方案一:Airflow - 程式碼即管線 (Pipeline as Code)

Airflow 的核心哲學是將工作流程定義為 Python 程式碼,這使其具備極高的靈活性、可擴展性,並且易於進行版本控制。

1. 任務拆分

我們將整個 ETL 流程拆分為三個獨立的 Python 函式,每個函式對應一個 Airflow 任務。它們之間透過 Airflow 的 XComs (跨任務通訊機制) 來傳遞資料。

# functions/etl.py

def extract_from_postgres(**kwargs):
    # 使用 psycopg2 或 SQLAlchemy 連接到 PostgreSQL
    # 查詢資料並以 Pandas DataFrame 格式回傳
    df = pd.read_sql("SELECT id, name, email FROM users", conn)
    return df

def transform_to_json(**kwargs):
    # 從 XComs 獲取上一步的 DataFrame
    df = kwargs['ti'].xcom_pull(task_ids='extract_task')
    # 轉換為 JSON 字串
    json_data = df.to_json(orient='records')
    return json_data

def load_to_elasticsearch(**kwargs):
    # 從 XComs 獲取上一步的 JSON 資料
    json_data = kwargs['ti'].xcom_pull(task_ids='transform_task')
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    # ... 執行批次匯入 ...

2. 編寫 DAG

接著,我們編寫一個 DAG 檔案來編排這三個任務。

# dags/postgres_to_es_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from functions.etl import extract_from_postgres, transform_to_json, load_to_elasticsearch

with DAG(...) as dag:
    extract_task = PythonOperator(
        task_id='extract_from_postgres',
        python_callable=extract_from_postgres
    )
    transform_task = PythonOperator(
        task_id='transform_to_json',
        python_callable=transform_to_json
    )
    load_task = PythonOperator(
        task_id='load_to_elasticsearch',
        python_callable=load_to_elasticsearch
    )
    
    extract_task >> transform_task >> load_task
caption="圖表一:Airflow ETL 管線活動圖。此活動圖展示了 Airflow DAG 如何依序執行三個任務來完成 ETL 流程。"
alt="一個展示 Airflow ETL 流程的活動圖。流程包含三個任務:Extract 任務從資料庫提取資料並透過 XComs 輸出 DataFrame;Transform 任務接收 DataFrame 並輸出 JSON 字串;Load 任務接收 JSON 字串並載入到目標系統。"
PlantUML 圖表

方案二:NiFi - 視覺化流程編排 (Visual Flow-Based Programming)

NiFi 提供了圖形化使用者介面 (GUI),讓使用者可以透過拖放「處理器」(Processor) 並將它們連接起來的方式,快速構建資料流。

1. 核心處理器

要完成相同的 ETL 任務,我們需要使用以下幾個核心的 NiFi 處理器:

  • QueryDatabaseRecord: 連接到 PostgreSQL 並執行 SQL 查詢,輸出 Avro 格式的資料。
  • ConvertRecord: 將資料格式從 Avro 轉換為 JSON。
  • SplitJson: (可選) 如果需要,可以將一個包含多個記錄的 JSON 陣列拆分為多個獨立的 JSON 物件。
  • PutElasticsearchHttp: 將傳入的 FlowFile (包含 JSON 資料) 寫入 Elasticsearch。

2. 搭建資料流

在 NiFi 的畫布上,我們將這些處理器拖曳出來,並用滑鼠將它們連接起來,形成一個視覺化的資料管線。每個處理器的詳細設定(如資料庫連線資訊、SQL 查詢語句、Elasticsearch 主機位址等)都在其屬性視窗中以圖形化方式完成。

caption="圖表二:NiFi ETL 資料流組件圖。此組件圖展示了在 NiFi 中實現相同 ETL 任務的視覺化流程。"
alt="一個展示 NiFi ETL 資料流的組件圖。資料從 PostgreSQL 資料庫流向 NiFi Flow。在 NiFi Flow 中,資料依次經過 QueryDatabaseRecord、ConvertRecord 和 PutElasticsearchHttp 處理器,最終載入到 Elasticsearch 資料庫。"
PlantUML 圖表

Airflow vs. NiFi:如何選擇?

特性Apache AirflowApache NiFi
核心哲學程式碼即管線 (Pipeline as Code)視覺化流程編排 (Flow-Based Programming)
開發模式編寫 Python DAG 檔案,適合開發者拖放處理器,設定屬性,適合資料分析師與維運人員
複雜轉換極度靈活,可執行任何 Python 程式碼依賴現有處理器,複雜邏輯需開發自定義處理器
排程能力強大且靈活的排程與時間驅動強調即時、事件驅動的資料流
版本控制極佳,DAG 檔案可直接納入 Git 管理較弱,需匯出流程範本 (XML/JSON) 進行管理
適用場景複雜的、批次處理為主的 ETL/ELT、ML 管線即時資料擷取、路由、簡單轉換、資料注入

結論:

  • 如果您的團隊以開發者為主,重視版本控制,且需要處理複雜的業務邏輯轉換Airflow 是更佳的選擇。
  • 如果您的團隊需要快速搭建和調整資料流,處理即時資料注入,且團隊成員更偏好視覺化操作NiFi 會更有效率。

兩種工具並非互斥,在複雜的資料平台中,它們甚至可以協同工作:例如,使用 NiFi 負責從各種來源即時擷取資料並將其落地到資料湖,然後由 Airflow 負責對資料湖中的資料進行複雜的、定時的批次處理和分析。