在資料驅動的應用程式開發中,建構穩健且高效的資料管道至關重要。本文將以 Apache NiFi 為核心,示範如何從 SeeClickFix API 抓取資料,利用 Jython 處理 HTTP 請求和 JSON 回應,並將資料轉換後載入至 Elasticsearch,最後透過 Kibana 進行視覺化呈現。過程中將使用 GenerateFlowFile 處理器啟動資料流程,並以 ExecuteScript 處理器執行 Jython 指令碼,實作客製化的資料轉換邏輯,例如新增地理坐標欄位、日期格式調整等。同時,我們也會探討如何使用 Great Expectations 進行資料驗證,確保資料品質與一致性,提升資料管道的可靠性。
啟動資料管道
在前面的章節中,我提到需要觸發資料管道的啟動。記住,這個管道將連線到一個 API 端點,為了做到這一點,NiFi 的 HTTP 端點呼叫處理器以及您將使用的 ExecuteScript 處理器都需要一個傳入的 flowfile 來啟動它們。這個處理器不能是資料管道中的第一個處理器。
建立 311 資料管道
為了啟動資料管道,您將使用 GenerateFlowFile 處理器。將處理器拖曳到畫布上,然後雙擊它來更改組態。在設定標籤中,命名處理器。我將其命名為「Start Flow Fake Data」,這讓我們知道這個處理器只會傳送假資料來啟動流程。組態將使用所有預設值,並且看起來像以下的截圖:
最後,在排程標籤中,設定處理器在您想要的間隔內執行。我使用 8,因為我不想讓 API 過載。
當處理器執行時,它將生成一個單一的 flowfile,具有 0 個位元組的資料。它是空的,但它包含由玄貓生成的元資料。然而,這個空的 flowfile 將會啟動下一個處理器。那是工作開始的地方。
查詢 SeeClickFix
在前面的 NiFi 範例中,您沒有使用任何程式碼,只使用組態來使處理器做您需要的事情。現在是引入使用 Python - Jython - 在管道中的好時機。
將 ExecuteScript 處理器拖曳到畫布上,然後雙擊它來編輯組態。從設定標籤開始,命名它以便您知道它查詢 SeeClickFix。我將其命名為「Query SCF」。在屬性標籤中,設定 Script Engine 為 Python。在 Script Body 引數中,您將撰寫 Python 程式碼,該程式碼將由處理器執行。查詢步驟如下:
1.您需要匯入所需的函式庫。以下程式碼是您始終需要包含的函式庫:
from java.nio.charset import StandardCharsets
2.接下來,您將建立一個類別,該類別將被呼叫來處理工作。process 函式將包含執行任務的程式碼:
class ModJSON(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
# 工作任務在此
3.最後,假設沒有錯誤發生,然後檢查是否有一個 flowfile。如果有一個,則寫入 flowfile 並呼叫類別。接下來,檢查是否發生錯誤。如果發生錯誤,則將 flowfile 傳送到失敗關係,否則將其傳送到成功關係:
errorOccurred = False
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, ModJSON())
# flowFile = session.putAttribute(flowFile)
if (errorOccurred):
session.transfer(flowFile, REL_FAILURE)
else:
session.transfer(flowFile, REL_SUCCESS)
前面的程式碼是任何 Python ExecuteScript 處理器的範本。您唯一需要更改的是 process 函式中的程式碼,我們將在下一步中進行。
由於 NiFi 使用 Jython,您可以將許多 Python 函式庫新增到 Jython 環境中,但這超出了本章的範圍。目前,您將使用標準函式庫。
4.為了呼叫 SeeClickFix API,您需要匯入 urllib 函式庫。
import urllib.request
您可以使用以下程式碼來呼叫 API:
url = "https://example.com/api/endpoint"
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
data = response.read()
您需要將這段程式碼新增到 process 函式中,以便從 API 中讀取資料。
flowchart TD A[啟動資料管道] --> B[查詢 SeeClickFix] B --> C[呼叫 API] C --> D[讀取資料] D --> E[處理資料] E --> F[傳送資料]
圖表翻譯:
此圖表展示了資料管道的流程。首先,資料管道被啟動,然後查詢 SeeClickFix。接下來,API 被呼叫,然後讀取資料。資料被處理,然後傳送到下一個步驟。
內容解密:
以上程式碼展示瞭如何使用 Python 在 NiFi 中查詢 SeeClickFix API。首先,匯入所需的函式庫,然後建立一個類別來處理工作。process 函式包含執行任務的程式碼。您需要將呼叫 API 的程式碼新增到 process 函式中,以便從 API 中讀取資料。
處理 HTTP 請求和 JSON 回應
在此章節中,我們將探討如何使用 Python 處理 HTTP 請求和 JSON 回應。這涉及到使用 urllib
和 urllib2
模組傳送 HTTP 請求,以及使用 json
模組解析 JSON 回應。
傳送 HTTP 請求
首先,我們需要匯入必要的模組,包括 urllib
、urllib2
和 json
。然後,我們定義了一個引數字典 param
,其中包含了我們想要傳送的引數。在這個例子中,我們設定了 place_url
和 per_page
引數。
import urllib
import urllib2
import json
param = {'place_url':'bernalillo-county', 'per_page':'100'}
接下來,我們使用 urllib.urlencode()
函式將引數字典編碼為 URL 查詢字串。這個字串將被追加到 URL 的末尾,以形成最終的請求 URL。
query_string = urllib.urlencode(param)
處理 HTTP 回應
現在,我們可以使用 urllib2.urlopen()
函式傳送 HTTP 請求,並讀取回應內容。
url = "https://example.com/api/endpoint" # 請求 URL
raw_reply = urllib2.urlopen(url).read()
然後,我們使用 json.loads()
函式解析 JSON 回應內容。
reply = json.loads(raw_reply)
寫入回應內容
最後,我們使用 outputStream.write()
函式將 JSON 回應內容寫入輸出流中。這裡,我們使用 bytearray()
函式將 JSON 字串轉換為 byte 陣列,並設定編碼為 UTF-8。
outputStream.write(bytearray(json.dumps(reply, indent=4).encode('utf-8')))
錯誤處理
如果發生錯誤,我們可以使用 try-except 塊捕捉異常,並設定 errorOccurred
變數為 True,以觸發錯誤處理流程。
try:
# ...
except:
global errorOccurred
errorOccurred = True
內容解密:
在這個例子中,我們使用了 urllib
和 urllib2
模組傳送 HTTP 請求,並使用 json
模組解析 JSON 回應。然後,我們將 JSON 回應內容寫入輸出流中,並進行錯誤處理。
圖表翻譯:
flowchart TD A[傳送 HTTP 請求] --> B[解析 JSON 回應] B --> C[寫入回應內容] C --> D[錯誤處理]
此圖表顯示了 HTTP 請求和 JSON 回應的處理流程,包括傳送請求、解析回應、寫入內容和錯誤處理。
建立資料管道
為了將資料從原始來源轉換成 Elasticsearch 可以接受的格式,我們需要建立一個資料管道。這個管道將會負責抓取資料、轉換資料格式,並將資料送到 Elasticsearch。
步驟1:抓取資料
首先,我們需要抓取資料。假設我們想要抓取某個縣市的 100 個議題,我們可以使用 API 來抓取這些資料。抓取到的資料會包含一些 metadata 和一個議題的陣列。
步驟2:轉換資料格式
接下來,我們需要將抓取到的資料轉換成 Elasticsearch 可以接受的格式。為了做到這一點,我們需要使用一些處理器(processor)來轉換資料。
使用 SplitJson 處理器
首先,我們需要使用 SplitJson 處理器來將抓取到的資料分割成單個的議題。這個處理器會將議題的陣列分割成單個的議題,並將每個議題轉換成一個新的 flowfile。
使用 ExecuteScript 處理器
接下來,我們需要使用 ExecuteScript 處理器來新增坐標資料。這個處理器會使用 Python 指令碼來轉換資料。指令碼會將 input stream 轉換成字串,然後使用 json 函式來載入 json 資料。接下來,指令碼會新增一個新的欄位叫做 coords
,並將其值設為經度和緯度的字串。最後,指令碼會將轉換好的 json 資料寫回 output stream。
def process(self, inputStream, outputStream):
try:
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
reply = json.loads(text)
reply['coords'] = str(reply['lat']) + ',' + str(reply['lng'])
d = reply['created_at'].split('T')
reply['opendate'] = d[0]
outputStream.write(bytearray(json.dumps(reply, indent=4).encode('utf-8')))
except:
global errorOccurred
errorOccurred = True
outputStream.write(bytearray(json.dumps(reply, indent=4).encode('utf-8')))
步驟3:將資料送到 Elasticsearch
最後,我們需要將轉換好的資料送到 Elasticsearch。這個步驟可以使用 Elasticsearch 的 API 來完成。
使用 Elasticsearch API
我們可以使用 Elasticsearch 的 API 來將資料送到 Elasticsearch。這個 API 會將資料轉換成 Elasticsearch 的格式,並將其儲存到 Elasticsearch 的索引中。
import requests
def send_to_elasticsearch(data):
url = 'https://your-elasticsearch-url.com/index/_doc'
headers = {'Content-Type': 'application/json'}
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code == 201:
print('資料送到 Elasticsearch 成功')
else:
print('資料送到 Elasticsearch 失敗')
Elasticsearch 資料載入與處理
資料準備
為了將資料載入 Elasticsearch,我們需要進行一些資料準備工作。首先,我們需要為每個 issue 建立一個唯一的 ID。這可以透過使用 EvaluateJsonPath
處理器來實作。
{
"id": "$.id"
}
Elasticsearch 輸出
接下來,我們需要將資料輸出到 Elasticsearch。這可以透過使用 PutElasticsearchHttp
處理器來實作。
{
"Elasticsearch URL": "http://localhost:9200",
"Index": "scf",
"Type": "doc",
"Index Operation": "upsert"
}
資料抓取
為了抓取所有的資料,我們需要使用 ExecuteScript
處理器來實作。這個處理器可以用於執行 Python 指令碼,從而抓取所有的資料。
import json
import urllib2
def process(session, inputStream):
text = inputStream.read()
asjson = json.loads(text)
if asjson['metadata']['pagination']['page'] <= asjson['metadata']['pagination']['pages']:
url = asjson['metadata']['pagination']['next_page_url']
rawreply = urllib2.urlopen(url).read()
session.write(rawreply)
else:
session.transfer(inputStream, 'STOP')
資料載入
現在,我們可以將資料載入 Elasticsearch。這可以透過使用 PutElasticsearchHttp
處理器來實作。
{
"Elasticsearch URL": "http://localhost:9200",
"Index": "scf",
"Type": "doc",
"Index Operation": "upsert"
}
結果
經過以上步驟,我們可以將所有的資料載入 Elasticsearch。現在,我們可以使用 Elasticsearch 的查詢功能來查詢資料。
curl -X GET 'http://localhost:9200/scf/_search' -H 'Content-Type: application/json' -d '{"query": {"match_all": {}}}'
這個查詢會傳回所有的資料。現在,我們可以使用 Elasticsearch 的功能來進行資料分析和查詢。
建立資料管線和Kibana儀錶板
在上一章中,我們已經建立了一個資料管線,從SeeClickFix提取資料並將其儲存到Elasticsearch中。現在,我們將建立一個Kibana儀錶板來視覺化這些資料。
建立Kibana儀錶板
首先,開啟Kibana並瀏覽到管理頁面。點選左側工具列底部的管理圖示,然後選擇「Create new Index Pattern」。這將允許我們建立一個新的索引模式,以便Kibana可以理解我們的Elasticsearch資料。
接下來,選擇我們的Elasticsearch索引,並設定索引模式。這將告訴Kibana如何處理我們的資料。
設定儀錶板
建立索引模式後,我們可以開始建立儀錶板。點選左側工具列中的「Dashboard」圖示,然後選擇「Create new Dashboard」。這將開啟一個新的儀錶板頁面。
在這裡,我們可以新增各種視覺化元件,例如圖表、表格和地圖,以展示我們的資料。例如,我們可以新增一個地圖元件來顯示SeeClickFix的問題位置。
新增視覺化元件
要新增視覺化元件,點選右上角的「Add」按鈕,然後選擇所需的元件型別。例如,我們可以選擇「Map」元件來顯示問題位置。
接下來,設定元件的設定,例如選擇資料來源和設定地圖的範圍。這將允許我們看到問題的位置和分佈。
儀錶板範例
以下是建立的儀錶板範例:
- 地圖元件:顯示SeeClickFix的問題位置
- 條形圖元件:顯示問題型別的分佈
- 表格元件:顯示問題的詳細資料
這些元件可以根據需要進行自訂,以提供更好的資料視覺化。
圖表翻譯:
此圖表顯示了從SeeClickFix提取資料、儲存到Elasticsearch、視覺化到Kibana儀錶板的過程。最終,儀錶板提供了資料分析的結果。
建立 Kibana 儀錶板
在本節中,我們將學習如何使用 Kibana 建立一個儀錶板,以視覺化呈現我們的 311 資料。首先,我們需要建立一個索引模式(index pattern)。
建立索引模式
要建立索引模式,請按照以下步驟進行:
- 點選 Kibana 的工具欄中的「索引模式」圖示。
- 點選「建立索引模式」按鈕。
- 輸入索引模式的名稱,例如
scf*
。 - 選擇時間過濾欄位,例如
created_at
。
建立視覺化
接下來,我們需要建立一些視覺化,以展示我們的資料。Kibana 提供了多種視覺化型別,包括條形圖、折線圖、餅圖等。
- 點選 Kibana 的工具欄中的「視覺化」圖示。
- 點選「建立視覺化」按鈕。
- 選擇視覺化型別,例如「垂直條形圖」。
- 組態視覺化的設定,例如選擇 x 軸和 y 軸的欄位。
建立儀錶板
現在,我們可以建立一個儀錶板,以展示我們的視覺化。
- 點選 Kibana 的工具欄中的「儀錶板」圖示。
- 點選「建立儀錶板」按鈕。
- 新增視覺化到儀錶板中。
- 組態儀錶板的設定,例如選擇時間過濾欄位。
篩選資料
最後,我們可以使用篩選器來篩選我們的資料。
- 點選 Kibana 的工具欄中的「篩選器」圖示。
- 選擇篩選器型別,例如「時間篩選器」。
- 組態篩選器的設定,例如選擇時間範圍。
以下是建立 Kibana 儀錶板的步驟總結:
- 建立索引模式
- 建立視覺化
- 建立儀錶板
- 篩選資料
透過這些步驟,我們可以建立一個強大的 Kibana 儀錶板,以視覺化呈現我們的 311 資料。
建立生產級資料管線
在前面的章節中,我們學習瞭如何建立資料管線並將其佈署到生產環境中。然而,生產級資料管線需要更多的功能和考量,以確保資料的正確性和可靠性。在本章中,我們將探討建立生產級資料管線的重要性和相關的技術。
資料管線的特點
一個生產級資料管線應該具備以下特點:
- idempotent:資料管線應該可以多次執行而不改變結果。
- atomicity:如果交易失敗,資料管線應該可以回復到之前的狀態。
- 資料驗證:資料管線應該可以驗證資料的正確性和完整性。
資料分段和驗證
在生產級資料管線中,資料分段和驗證是非常重要的步驟。資料分段是指將資料分成小塊,以便於處理和驗證。驗證是指檢查資料的正確性和完整性,以確保資料的品質。
建立idempotent資料管線
建立idempotent資料管線需要考慮以下幾點:
- 資料處理:資料處理應該是可以多次執行而不改變結果的。
- 資料儲存:資料儲存應該是可以多次執行而不改變結果的。
建立atomic資料管線
建立atomic資料管線需要考慮以下幾點:
- 交易管理:交易管理應該可以回復到之前的狀態,如果交易失敗。
- 資料儲存:資料儲存應該可以回復到之前的狀態,如果交易失敗。
資料驗證
資料驗證是指檢查資料的正確性和完整性,以確保資料的品質。資料驗證可以在資料管線中的各個階段進行,例如資料提取、資料轉換和資料儲存。
範例程式碼
以下是使用Python和Apache NiFi建立一個生產級資料管線的範例程式碼:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# 建立SparkSession
spark = SparkSession.builder.appName("Production Data Pipeline").getOrCreate()
# 讀取資料
data = spark.read.csv("data.csv", header=True, inferSchema=True)
# 資料轉換
data = data.withColumn("date", col("date").cast("timestamp"))
# 資料儲存
data.write.parquet("data.parquet")
# 資料驗證
data = spark.read.parquet("data.parquet")
data.count()
這個範例程式碼使用Apache Spark和Apache NiFi建立一個生產級資料管線,包括資料讀取、資料轉換、資料儲存和資料驗證。
資料管道中的資料分段和驗證
資料分段是資料管道中的一個重要步驟,尤其是在處理交易性資料函式庫時。交易性資料函式庫中的資料是經常變動的,若不進行分段,可能會導致資料的不一致性和錯誤。
資料分段的好處
資料分段可以提供以下好處:
- 資料的一致性:透過分段,可以確保資料的一致性,即使資料函式庫中的資料發生變化,也可以透過分段來保持資料的一致性。
- 錯誤的處理:分段可以幫助處理錯誤,若某個步驟出現錯誤,可以透過分段來重新執行該步驟,而不需要重新執行整個資料管道。
- 資料的驗證:分段可以幫助驗證資料,透過分段可以對資料進行驗證,確保資料的正確性和完整性。
資料分段的實作
資料分段可以透過以下步驟實作:
- 資料的抽取:從資料函式庫中抽取資料。
- 資料的分段:將抽取的資料分段成小塊,例如,將資料分段成檔案或資料表。
- 資料的轉換:對分段的資料進行轉換,例如,將資料從一個格式轉換成另一個格式。
- 資料的載入:將轉換的資料載入到目標資料函式庫中。
Great Expectations 的使用
Great Expectations 是一個 Python 函式庫,用於驗證資料。它提供了一個簡單的方式來定義資料的期望,例如,資料中的某個欄位不應該為空。
以下是 Great Expectations 的使用步驟:
- 安裝 Great Expectations:使用 pip 安裝 Great Expectations。
- 定義資料的期望:使用 Great Expectations 的 API 定義資料的期望,例如,資料中的某個欄位不應該為空。
- 驗證資料:使用 Great Expectations 驗證資料,確保資料符合定義的期望。
資料驗證的重要性
資料驗證是資料管道中的一個重要步驟,透過驗證可以確保資料的正確性和完整性,避免錯誤的發生。
以下是資料驗證的重要性:
- 確保資料的正確性:驗證可以確保資料的正確性,避免錯誤的發生。
- 確保資料的完整性:驗證可以確保資料的完整性,避免資料的丟失或損壞。
- 提高資料的品質:驗證可以提高資料的品質,確保資料的可靠性和可用性。
初始化Great Expectations專案
要初始化Great Expectations專案,首先需要安裝必要的套件,包括jupyter
。可以使用以下命令安裝:
pip3 install jupyter
安裝完成後,建立一個目錄用於專案,並切換到該目錄:
mkdir $HOME/peoplepipeline
cd $HOME/peoplepipeline
接下來,生成一個示例資料檔案people.csv
,包含1000條與人相關的記錄。使用以下程式碼:
from faker import Faker
import csv
output = open('people.csv', 'w')
fake = Faker()
header = ['name', 'age', 'street', 'city', 'state', 'zip', 'lng', 'lat']
mywriter = csv.writer(output)
mywriter.writerow(header)
for r in range(1000):
mywriter.writerow([fake.name(), fake.random_int(min=18, max=80, step=1), fake.street_address(), fake.city(), fake.state(), fake.zipcode(), fake.longitude(), fake.latitude()])
output.close()
這個程式碼建立了一個CSV檔案,包含人們的基本資訊。
初始化Great Expectations
現在,可以初始化Great Expectations專案了。使用以下命令:
great_expectations init
Great Expectations會提示您是否準備好繼續。輸入Y
並按Enter鍵後,Great Expectations會問您一系列問題,包括:
- 您想讓Great Expectations連線到哪些資料?
- 您正在使用什麼來處理檔案?
- 輸入資料檔案的路徑(相對或絕對)。
- 為新的期望套件命名。
回答這些問題後,Great Expectations會生成一個檔案,包含11個期望。這些期望包括記錄數量、欄位名稱和順序,以及年齡範圍等基本資訊。
期望生成
Great Expectations生成的期望包括:
- 記錄數量:1000條記錄。
- 欄位名稱和順序:
name
、age
、street
、city
、state
、zip
、lng
、lat
。 - 年齡範圍:年齡必須大於17歲,少於81歲。
這些期望可以用於驗證資料的品質和一致性。
內容解密:
Great Expectations是一個強大的工具,用於驗證和監控資料品質。透過初始化Great Expectations專案和生成期望,可以確保資料的一致性和品質。這對於資料驅動的應用程式和商業決策至關重要。
圖表翻譯:
graph LR A[初始化Great Expectations] --> B[生成示例資料] B --> C[初始化Great Expectations專案] C --> D[生成期望] D --> E[驗證資料品質]
這個流程圖展示了初始化Great Expectations專案、生成示例資料、初始化Great Expectations專案、生成期望和驗證資料品質的過程。
從資料管道啟動、資料抓取、格式轉換到最終載入 Elasticsearch 並以 Kibana 儀錶板呈現的完整流程來看,本文提供了一個建構 311 資料視覺化平臺的實用。透過 NiFi 的視覺化介面搭配 Python 指令碼的彈性,有效簡化了資料處理流程,尤其是 SplitJson 和 ExecuteScript 處理器的運用,展現了 ETL 流程的效率。然而,文章對於錯誤處理機制著墨不多,僅在程式碼片段中簡略提及 errorOccurred
變數,缺乏實際的錯誤處理策略說明,這在生產環境中是至關重要的。此外,雖然提到了資料分段和驗證的重要性,並引入了 Great Expectations 這個工具,但並未深入探討如何整合至 NiFi 流程中,也缺乏實際應用案例的說明,這對於讀者理解其價值和應用方式造成一定的困難。展望未來,除了完善錯誤處理和資料驗證機制外,可以進一步探討如何提升管線的效能和可擴充套件性,例如引入更進階的 NiFi 功能,或是整合 Spark 等分散式運算框架,以應對更大規模的資料處理需求。玄貓認為,隨著資料量的增長和應用場景的複雜化,建構更強健、更具彈性的資料管線將是未來發展的關鍵。