在當今資料驅動的時代,有效率地處理和管理資料至關重要。Apache NiFi 作為一個強大的資料流管理工具,提供視覺化介面和豐富的處理器,讓使用者能輕鬆建構、監控和調整資料流程。本文將引導您完成 NiFi 的安裝和設定,並示範如何使用它來處理不同格式的資料,並與其他資料函式庫和工具整合,建立完整的資料工程基礎設施。我們將會深入探討 NiFi 的核心概念,例如處理器、連線和關係,以及如何組態這些元件以滿足您的特定需求。此外,我們還會介紹如何使用 Python 的 csvjson 模組讀寫檔案,以及如何使用 Faker 函式庫生成測試資料。

安裝Apache NiFi

要安裝Apache NiFi,您需要從官網下載它。您可以使用以下命令下載NiFi:

wget https://downloads.apache.org/nifi/1.12.1/nifi-1.12.1-bin.tar.gz

然後,您需要解壓NiFi檔案:

tar xvzf nifi-1.12.1-bin.tar.gz

這將建立一個名為nifi-1.12.1的資料夾。您可以使用以下命令執行NiFi:

bin/nifi.sh start

如果您已經安裝和組態了Java,您可以使用以下命令檢視NiFi的狀態:

sudo bin/nifi.sh status

如果您沒有看到JAVA_HOME設定,您可能需要安裝Java:

sudo apt install openjdk-11-jre-headless

然後,您需要編輯.bash_profile檔案以包含以下行:

export JAVA_HOME=/usr/lib/jvm/java11-openjdk-amd64

最後,您需要重新載入.bash_profile檔案:

source .bash_profile

當您執行NiFi的狀態命令時,您應該會看到JAVA_HOME的路徑。

設定Apache NiFi

當NiFi準備就緒後,您可以開啟網頁瀏覽器並前往http://localhost:8080/nifi。您將看到NiFi的GUI介面。在後續章節中,您將學習到NiFi的許多可用組態,但目前,您只需要更改NiFi執行的埠。在conf/nifi.properties檔案中,您可以找到以下行:

nifi.web.http.port=8080

您可以將其更改為您想要的埠號。例如:

nifi.web.http.port=8081

然後,您需要重新啟動NiFi:

bin/nifi.sh restart

NiFi現在將執行在新的埠號上。

內容解密:

在這個過程中,我們安裝和組態了Apache NiFi。NiFi是一種強大的工具,用於建立資料工程管道。它允許您使用預先構建的處理器來構建資料管道,您可以根據自己的需求進行組態。透過這個過程,您現在可以使用NiFi建立自己的資料管道。

圖表翻譯:

  graph LR
    A[下載NiFi] --> B[解壓NiFi]
    B --> C[執行NiFi]
    C --> D[設定JAVA_HOME]
    D --> E[重新載入.bash_profile]
    E --> F[執行NiFi]
    F --> G[設定NiFi]
    G --> H[重新啟動NiFi]

這個圖表展示了安裝和組態Apache NiFi的過程。從下載NiFi開始,然後解壓、執行、設定JAVA_HOME、重新載入.bash_profile,最後執行和設定NiFi。

Apache NiFi 安裝與設定

Apache NiFi 是一個強大的資料處理工具,能夠幫助您管理和處理大量的資料。在本節中,我們將介紹如何安裝和設定 Apache NiFi。

修改 NiFi 設定

首先,您需要修改 NiFi 的設定檔,以便將 NiFi 的 Web 介面設定為 9300 埠。您可以在 nifi.properties 檔案中找到這個設定。

# web properties #
nifi.web.http.port=9300

如果您的防火牆是開啟的,您可能需要開啟 9300 埠,以便存取 NiFi 的 Web 介面。

sudo ufw allow 9300/tcp

NiFi 介面概覽

當您第一次啟動 NiFi 時,NiFi 的 Web 介面將是空白的,因為您尚未新增任何處理器或處理器群組。在 NiFi 的 Web 介面中,您可以看到工具列和狀態列。工具列提供了建立資料流程所需的工具,而狀態列則提供了 NiFi 例項的目前狀態。

建立資料流程

NiFi 的資料流程是由處理器、連線和關係組成的。NiFi 提供了超過 100 個處理器,供您使用。您可以使用工具列中的處理器工具來新增處理器到您的資料流程中。

新增處理器

使用工具列中的處理器工具,您可以新增處理器到您的資料流程中。例如,您可以新增 GenerateFlowFile 處理器來建立 FlowFiles。

  graph LR
    A[GenerateFlowFile] --> B[PutFile]

組態處理器

新增處理器後,您需要組態它。您可以雙擊處理器或右鍵點選並選擇屬性來組態它。

  graph LR
    A[GenerateFlowFile] -->|組態|> B[屬性]

建立連線

建立連線是將處理器連線起來的過程。您可以使用工具列中的連線工具來建立連線。

  graph LR
    A[GenerateFlowFile] -->|連線|> B[PutFile]

執行資料流程

最後,您可以右鍵點選處理器並選擇執行來執行您的資料流程。

  graph LR
    A[GenerateFlowFile] -->|執行|> B[資料流程]

安裝和設定 Apache NiFi 和 Apache Airflow

在本章中,我們將探討如何安裝和設定 Apache NiFi 和 Apache Airflow 這兩個強大的資料工程工具。Apache NiFi 是一個資料流管理工具,允許您建立和管理資料流,而 Apache Airflow 是一個工作流程管理工具,允許您建立和管理工作流程。

安裝和設定 Apache NiFi

首先,我們需要安裝 Apache NiFi。您可以從 Apache NiFi 官方網站下載 NiFi 的安裝包。安裝完成後,請啟動 NiFi 服務。

接下來,我們需要組態 NiFi 的 processor。Processor 是 NiFi 中的核心元件,負責處理資料。您可以建立一個新的 processor 並組態其屬性。

例如,您可以建立一個 PutFile processor 並組態其屬性,如下所示:

# PutFile processor 組態
processor = PutFile()
processor.file_path = '/opt/nifi/output'
processor.file_name = 'output.txt'

這個 processor 會將資料寫入到 /opt/nifi/output 目錄下的 output.txt 檔案中。

安裝和設定 Apache Airflow

接下來,我們需要安裝 Apache Airflow。您可以使用 pip 安裝 Airflow:

pip install 'apache-airflow[postgres,slack,celery]'

安裝完成後,請初始化 Airflow 的資料函式庫:

airflow initdb

Airflow 的預設資料函式庫是 SQLite,但您可以更改為其他資料函式庫,如 PostgreSQL。

組態 Airflow 的資料函式庫

如果您想要使用 PostgreSQL 作為 Airflow 的資料函式庫,您需要下載 PostgreSQL 的 JDBC 驅動程式。您可以從 PostgreSQL 官方網站下載驅動程式。

接下來,您需要組態 Airflow 的 airflow.cfg 檔案,指定資料函式庫的連線設定:

# airflow.cfg 組態
[core]
sql_alchemy_conn = postgresql+psycopg2://username:password@host:port/dbname

這個設定指定了 Airflow 的資料函式庫連線設定。

內容解密:
  • Apache NiFi 是一個資料流管理工具,允許您建立和管理資料流。
  • Apache Airflow 是一個工作流程管理工具,允許您建立和管理工作流程。
  • NiFi 的 processor 是核心元件,負責處理資料。
  • Airflow 的資料函式庫可以更改為其他資料函式庫,如 PostgreSQL。
  • 您需要下載 PostgreSQL 的 JDBC 驅動程式並組態 Airflow 的 airflow.cfg 檔案。

圖表翻譯:

  graph LR
    A[Apache NiFi] --> B[Processor]
    B --> C[PutFile]
    C --> D[File]
    E[Apache Airflow] --> F[Database]
    F --> G[PostgreSQL]
    G --> H[JDBC Driver]

這個圖表展示了 Apache NiFi 和 Apache Airflow 的關係,以及如何組態資料函式庫和驅動程式。

安裝和設定 Apache Airflow

Apache Airflow 是一個強大的工作流程管理系統,允許您定義和管理複雜的資料處理流程。在本節中,我們將介紹如何安裝和設定 Apache Airflow。

安裝 Airflow

首先,您需要安裝 Airflow。您可以使用 pip 安裝 Airflow:

pip install apache-airflow

初始化資料函式庫

安裝完成後,您需要初始化 Airflow 的資料函式庫。您可以使用以下命令初始化資料函式庫:

airflow initdb

啟動 Web 伺服器

初始化資料函式庫後,您可以啟動 Airflow 的 Web 伺服器。您可以使用以下命令啟動 Web 伺服器:

airflow webserver

啟動排程器

啟動 Web 伺服器後,您需要啟動 Airflow 的排程器。您可以使用以下命令啟動排程器:

airflow scheduler

移除範例 DAG

Airflow 預設會安裝一些範例 DAG。如果您不需要這些範例 DAG,您可以移除它們。您可以編輯 airflow.cfg 檔案,將 load_examples 引數設為 False

load_examples = False

然後,您需要重置資料函式庫:

airflow resetdb

重新啟動 Web 伺服器

重置資料函式庫後,您需要重新啟動 Web 伺服器:

airflow webserver

現在,Airflow 已經安裝和設定完成。您可以使用 Airflow 的 Web 介面來管理您的 DAG。

DAG 的基本概念

DAG(Directed Acyclic Graph)是 Airflow 的核心概念。DAG 是一個有向非迴圈圖,代表了一個工作流程。每個節點在圖中代表了一個任務,邊緣代表了任務之間的依賴關係。

DAG 的生命週期

DAG 的生命週期包括以下幾個階段:

  1. 建立: DAG 被建立和定義。
  2. 觸發: DAG 被觸發,開始執行。
  3. 執行: DAG 的任務被執行。
  4. 完成: DAG 的任務完成,DAG 結束。

DAG 的管理

Airflow 提供了一個 Web 介面來管理 DAG。您可以使用 Web 介面來建立、觸發、監控和管理您的 DAG。

DAG 的圖形化

Airflow 提供了一個圖形化的介面來顯示 DAG 的結構。您可以使用圖形化的介面來檢視 DAG 的任務和依賴關係。

DAG 的觸發

您可以使用 Web 介面來觸發 DAG。您可以設定 DAG 的觸發條件,例如時間或事件。

DAG 的監控

您可以使用 Web 介面來監控 DAG 的執行狀態。您可以檢視 DAG 的任務執行狀態、日誌和其他相關資訊。

安裝和設定Elasticsearch

Elasticsearch是一種搜尋引擎,在本章中,您將使用它作為NoSQL資料函式庫。您將在Elasticsearch和其他位置之間移動資料。要下載Elasticsearch,請按照以下步驟:

  1. 使用curl下載檔案,如下所示:
curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.0-darwin-x86_64.tar.gz
  1. 使用以下命令解壓檔案:
tar xvzf elasticsearch.tar.gz
  1. 您可以編輯config/elasticsearch.yml檔案以命名您的節點和叢集。稍後在本章中,您將設定一個具有多個節點的Elasticsearch叢集。目前,我已經更改了以下屬性:
cluster.name: DataEngineeringWithPython
node.name: OnlyNode

建立資料工程基礎設施

在本章中,我們將探討如何建立資料工程基礎設施,包括安裝和組態Elasticsearch、Kibana、PostgreSQL和pgAdmin 4。

安裝和組態Elasticsearch和Kibana

Elasticsearch是一個開源的搜尋和分析引擎,而Kibana則是一個根據Web的介面,用於探索和視覺化Elasticsearch中的資料。安裝和組態Elasticsearch和Kibana後,我們可以使用Kibana的Discover功能來探索資料,建立視覺化和儀錶板。

建立視覺化和儀錶板

Kibana提供了多種視覺化工具,包括條形圖、餅圖、熱圖和地圖等。使用這些工具,我們可以建立多種視覺化和儀錶板,以展示資料的趨勢和模式。

安裝和組態PostgreSQL和pgAdmin 4

PostgreSQL是一個開源的關係型資料函式庫,而pgAdmin 4則是一個根據Web的介面,用於管理PostgreSQL資料函式庫。安裝和組態PostgreSQL和pgAdmin 4後,我們可以使用pgAdmin 4來建立和管理資料函式庫,包括建立表格、插入資料和執行查詢。

管理PostgreSQL資料函式庫

使用pgAdmin 4,我們可以輕鬆地管理PostgreSQL資料函式庫,包括建立和刪除表格、插入和更新資料、以及執行查詢和分析。

結合Elasticsearch和PostgreSQL

透過結合Elasticsearch和PostgreSQL,我們可以建立一個強大的資料工程基礎設施,用於支援多種資料分析和應用。

以下是使用Python、Rust和Mojo混合設計的程式碼範例:

# 使用Python和Rust混合設計
from rust_io import read_sensors  # Rust資料採集
from mojo_compute import transform_data  # Mojo計算
from transformers import pipeline  # Python和Hugging Face

# 混合處理流程: Rust採集 -> Mojo處理 -> Python推理
device_data = read_sensors("MEDICAL_DEVICE")  # Rust部分
processed_data = transform_data(device_data)  # Mojo部分
anomaly_result = pipeline("anomaly-detection", model="medical/transformer")(processed_data)  # Python+HF部分

這個範例展示瞭如何使用Python、Rust和Mojo混合設計來處理資料,包括資料採集、計算和推理。

安裝和組態pgAdmin 4

pgAdmin 4是一個強大的PostgreSQL管理工具,提供了圖形化的介面來管理您的資料函式庫。要安裝pgAdmin 4,您需要先下載並安裝PostgreSQL資料函式倉管理系統。安裝完成後,您可以啟動pgAdmin 4並登入到系統。

登入pgAdmin 4

啟動pgAdmin 4後,您會看到登入介面。輸入您剛剛建立的使用者名稱和密碼,即可登入到系統。

新增新伺服器

登入後,您會看到一個儀錶板,左側有一個伺服器圖示。因為目前尚未組態任何伺服器,因此您需要新增剛剛安裝的PostgreSQL例項。點選新增新伺服器圖示,然後輸入您的PostgreSQL例項的相關資訊。

管理資料函式庫

新增伺服器後,您可以展開伺服器圖示,檢視您建立的資料函式庫。您可以右擊資料函式庫,新增新表格、索引、觸發器等。

建立表格

要建立表格,您需要右擊資料函式庫,選擇「建立」>「表格」。然後,您可以輸入表格名稱、欄位名稱和欄位型別。

填充表格資料

要填充表格資料,您可以使用Python的faker函式庫,生成假資料。您需要安裝faker函式庫,然後使用它來生成資料。

Python中的檔案讀寫

Python提供了多種方式來讀寫檔案,包括CSV、JSON等格式。

寫入CSV檔案

要寫入CSV檔案,您需要使用Python的csv函式庫。您可以使用csv函式庫的writer函式來寫入CSV檔案。

讀取CSV檔案

要讀取CSV檔案,您需要使用Python的csv函式庫。您可以使用csv函式庫的reader函式來讀取CSV檔案。

寫入JSON檔案

要寫入JSON檔案,您需要使用Python的json函式庫。您可以使用json函式庫的dump函式來寫入JSON檔案。

讀取JSON檔案

要讀取JSON檔案,您需要使用Python的json函式庫。您可以使用json函式庫的load函式來讀取JSON檔案。

Apache NiFi和Apache Airflow

Apache NiFi和Apache Airflow是兩個強大的資料工程工具,提供了多種方式來處理和管理資料。

Apache NiFi

Apache NiFi是一個資料流管理工具,提供了多種方式來處理和管理資料。您可以使用NiFi的processor來處理資料,包括讀取和寫入檔案。

Apache Airflow

Apache Airflow是一個工作流程管理工具,提供了多種方式來管理和執行工作流程。您可以使用Airflow的operator來執行任務,包括讀取和寫入檔案。

使用 Python 寫入 CSV 檔案

Python 的 csv 模組提供了一個方便的方式來寫入 CSV 檔案。以下是基本的步驟:

建立 CSV 寫入器

首先,需要建立一個 CSV 寫入器。這可以透過 csv.writer() 函式來完成。這個函式需要一個檔案物件作為引數。

import csv

output = open('myCSV.CSV', 'w')
mywriter = csv.writer(output)

寫入標題

如果你想在 CSV 檔案中包含標題,需要使用 writerow() 函式來寫入標題。

header = ['name', 'age']
mywriter.writerow(header)

寫入資料

現在,你可以使用 writerow() 函式來寫入資料。

data = ['Bob Smith', 40]
mywriter.writerow(data)

關閉檔案

最後,需要關閉檔案以確保資料被正確寫入。

output.close()

使用 Faker 生成資料

如果你想生成大量的資料,可以使用 Faker 函式庫來生成假資料。

from faker import Faker
import csv

fake = Faker()
output = open('data.CSV', 'w')
mywriter = csv.writer(output)

for _ in range(1000):
    name = fake.name()
    age = fake.random_int(min=18, max=100)
    mywriter.writerow([name, age])

output.close()

內容解密:

在上面的例子中,我們使用 csv.writer() 函式來建立一個 CSV 寫入器。然後,我們使用 writerow() 函式來寫入標題和資料。最後,我們關閉檔案以確保資料被正確寫入。

圖表翻譯:

  flowchart TD
    A[建立 CSV 寫入器] --> B[寫入標題]
    B --> C[寫入資料]
    C --> D[關閉檔案]

這個圖表展示了寫入 CSV 檔案的基本步驟。首先,建立一個 CSV 寫入器。然後,寫入標題和資料。最後,關閉檔案。

讀寫 CSV 檔案

CSV(Comma Separated Values)檔案是一種常用的資料儲存格式,廣泛用於資料交換和儲存。Python 提供了 csv 模組來讀寫 CSV 檔案。

從資料工程基礎建設的建置流程來看,本文涵蓋了Apache NiFi、Apache Airflow、Elasticsearch、Kibana、PostgreSQL 以及 pgAdmin 4 等關鍵工具的安裝與設定。透過循序漸進的步驟說明,讀者能逐步掌握建構完整資料處理管線的技巧,從資料採集、轉換、儲存到視覺化分析,各環節都有明確的指引。然而,文章並未深入探討各工具間的整合方式以及實際應用場景,例如如何利用 NiFi 串接 PostgreSQL 與 Elasticsearch,或是如何運用 Airflow Orchestration 整合不同工具排程複雜的工作流程。

考量資料工程的複雜性與技術的快速迭代,未來發展趨勢將更著重於自動化佈署與管理、跨平臺整合以及雲原生架構的應用。預期 Infrastructure as Code、容器化技術以及 Serverless Computing 將扮演更重要的角色,簡化佈署流程並提升系統的可擴充套件性。此外,隨著機器學習和人工智慧的興起,資料工程也將與這些領域更緊密結合,例如利用 Airflow 管理機器學習模型的訓練和佈署流程。

對於初學者而言,建議先專注於理解各工具的核心功能,並透過實作練習逐步熟悉操作流程。在累積一定經驗後,可進一步探索不同工具的整合應用,並關注前述的技術發展趨勢,以提升自身在資料工程領域的專業能力。玄貓認為,掌握這些關鍵工具與技術,將有助於讀者在資料驅動的時代中,更有效地擷取資料價值,並將其轉化為商業洞察力。