在資料驅動的應用程式開發中,建構穩健且高效的資料管道至關重要。本文將以 Apache NiFi 為核心,示範如何從 SeeClickFix API 抓取資料,利用 Jython 處理 HTTP 請求和 JSON 回應,並將資料轉換後載入至 Elasticsearch,最後透過 Kibana 進行視覺化呈現。過程中將使用 GenerateFlowFile 處理器啟動資料流程,並以 ExecuteScript 處理器執行 Jython 指令碼,實作客製化的資料轉換邏輯,例如新增地理坐標欄位、日期格式調整等。同時,我們也會探討如何使用 Great Expectations 進行資料驗證,確保資料品質與一致性,提升資料管道的可靠性。

啟動資料管道

在前面的章節中,我提到需要觸發資料管道的啟動。記住,這個管道將連線到一個 API 端點,為了做到這一點,NiFi 的 HTTP 端點呼叫處理器以及您將使用的 ExecuteScript 處理器都需要一個傳入的 flowfile 來啟動它們。這個處理器不能是資料管道中的第一個處理器。

建立 311 資料管道

為了啟動資料管道,您將使用 GenerateFlowFile 處理器。將處理器拖曳到畫布上,然後雙擊它來更改組態。在設定標籤中,命名處理器。我將其命名為「Start Flow Fake Data」,這讓我們知道這個處理器只會傳送假資料來啟動流程。組態將使用所有預設值,並且看起來像以下的截圖:

最後,在排程標籤中,設定處理器在您想要的間隔內執行。我使用 8,因為我不想讓 API 過載。

當處理器執行時,它將生成一個單一的 flowfile,具有 0 個位元組的資料。它是空的,但它包含由玄貓生成的元資料。然而,這個空的 flowfile 將會啟動下一個處理器。那是工作開始的地方。

查詢 SeeClickFix

在前面的 NiFi 範例中,您沒有使用任何程式碼,只使用組態來使處理器做您需要的事情。現在是引入使用 Python - Jython - 在管道中的好時機。

將 ExecuteScript 處理器拖曳到畫布上,然後雙擊它來編輯組態。從設定標籤開始,命名它以便您知道它查詢 SeeClickFix。我將其命名為「Query SCF」。在屬性標籤中,設定 Script Engine 為 Python。在 Script Body 引數中,您將撰寫 Python 程式碼,該程式碼將由處理器執行。查詢步驟如下:

1.您需要匯入所需的函式庫。以下程式碼是您始終需要包含的函式庫:

from java.nio.charset import StandardCharsets

2.接下來,您將建立一個類別,該類別將被呼叫來處理工作。process 函式將包含執行任務的程式碼:

class ModJSON(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        # 工作任務在此

3.最後,假設沒有錯誤發生,然後檢查是否有一個 flowfile。如果有一個,則寫入 flowfile 並呼叫類別。接下來,檢查是否發生錯誤。如果發生錯誤,則將 flowfile 傳送到失敗關係,否則將其傳送到成功關係:

errorOccurred = False
flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, ModJSON())
    # flowFile = session.putAttribute(flowFile)
if (errorOccurred):
    session.transfer(flowFile, REL_FAILURE)
else:
    session.transfer(flowFile, REL_SUCCESS)

前面的程式碼是任何 Python ExecuteScript 處理器的範本。您唯一需要更改的是 process 函式中的程式碼,我們將在下一步中進行。

由於 NiFi 使用 Jython,您可以將許多 Python 函式庫新增到 Jython 環境中,但這超出了本章的範圍。目前,您將使用標準函式庫。

4.為了呼叫 SeeClickFix API,您需要匯入 urllib 函式庫。

import urllib.request

您可以使用以下程式碼來呼叫 API:

url = "https://example.com/api/endpoint"
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
data = response.read()

您需要將這段程式碼新增到 process 函式中,以便從 API 中讀取資料。

  flowchart TD
    A[啟動資料管道] --> B[查詢 SeeClickFix]
    B --> C[呼叫 API]
    C --> D[讀取資料]
    D --> E[處理資料]
    E --> F[傳送資料]

圖表翻譯:

此圖表展示了資料管道的流程。首先,資料管道被啟動,然後查詢 SeeClickFix。接下來,API 被呼叫,然後讀取資料。資料被處理,然後傳送到下一個步驟。

內容解密:

以上程式碼展示瞭如何使用 Python 在 NiFi 中查詢 SeeClickFix API。首先,匯入所需的函式庫,然後建立一個類別來處理工作。process 函式包含執行任務的程式碼。您需要將呼叫 API 的程式碼新增到 process 函式中,以便從 API 中讀取資料。

處理 HTTP 請求和 JSON 回應

在此章節中,我們將探討如何使用 Python 處理 HTTP 請求和 JSON 回應。這涉及到使用 urlliburllib2 模組傳送 HTTP 請求,以及使用 json 模組解析 JSON 回應。

傳送 HTTP 請求

首先,我們需要匯入必要的模組,包括 urlliburllib2json。然後,我們定義了一個引數字典 param,其中包含了我們想要傳送的引數。在這個例子中,我們設定了 place_urlper_page 引數。

import urllib
import urllib2
import json

param = {'place_url':'bernalillo-county', 'per_page':'100'}

接下來,我們使用 urllib.urlencode() 函式將引數字典編碼為 URL 查詢字串。這個字串將被追加到 URL 的末尾,以形成最終的請求 URL。

query_string = urllib.urlencode(param)

處理 HTTP 回應

現在,我們可以使用 urllib2.urlopen() 函式傳送 HTTP 請求,並讀取回應內容。

url = "https://example.com/api/endpoint"  # 請求 URL
raw_reply = urllib2.urlopen(url).read()

然後,我們使用 json.loads() 函式解析 JSON 回應內容。

reply = json.loads(raw_reply)

寫入回應內容

最後,我們使用 outputStream.write() 函式將 JSON 回應內容寫入輸出流中。這裡,我們使用 bytearray() 函式將 JSON 字串轉換為 byte 陣列,並設定編碼為 UTF-8。

outputStream.write(bytearray(json.dumps(reply, indent=4).encode('utf-8')))

錯誤處理

如果發生錯誤,我們可以使用 try-except 塊捕捉異常,並設定 errorOccurred 變數為 True,以觸發錯誤處理流程。

try:
    # ...
except:
    global errorOccurred
    errorOccurred = True

內容解密:

在這個例子中,我們使用了 urlliburllib2 模組傳送 HTTP 請求,並使用 json 模組解析 JSON 回應。然後,我們將 JSON 回應內容寫入輸出流中,並進行錯誤處理。

圖表翻譯:

  flowchart TD
    A[傳送 HTTP 請求] --> B[解析 JSON 回應]
    B --> C[寫入回應內容]
    C --> D[錯誤處理]

此圖表顯示了 HTTP 請求和 JSON 回應的處理流程,包括傳送請求、解析回應、寫入內容和錯誤處理。

建立資料管道

為了將資料從原始來源轉換成 Elasticsearch 可以接受的格式,我們需要建立一個資料管道。這個管道將會負責抓取資料、轉換資料格式,並將資料送到 Elasticsearch。

步驟1:抓取資料

首先,我們需要抓取資料。假設我們想要抓取某個縣市的 100 個議題,我們可以使用 API 來抓取這些資料。抓取到的資料會包含一些 metadata 和一個議題的陣列。

步驟2:轉換資料格式

接下來,我們需要將抓取到的資料轉換成 Elasticsearch 可以接受的格式。為了做到這一點,我們需要使用一些處理器(processor)來轉換資料。

使用 SplitJson 處理器

首先,我們需要使用 SplitJson 處理器來將抓取到的資料分割成單個的議題。這個處理器會將議題的陣列分割成單個的議題,並將每個議題轉換成一個新的 flowfile。

使用 ExecuteScript 處理器

接下來,我們需要使用 ExecuteScript 處理器來新增坐標資料。這個處理器會使用 Python 指令碼來轉換資料。指令碼會將 input stream 轉換成字串,然後使用 json 函式來載入 json 資料。接下來,指令碼會新增一個新的欄位叫做 coords,並將其值設為經度和緯度的字串。最後,指令碼會將轉換好的 json 資料寫回 output stream。

def process(self, inputStream, outputStream):
    try:
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        reply = json.loads(text)
        reply['coords'] = str(reply['lat']) + ',' + str(reply['lng'])
        d = reply['created_at'].split('T')
        reply['opendate'] = d[0]
        outputStream.write(bytearray(json.dumps(reply, indent=4).encode('utf-8')))
    except:
        global errorOccurred
        errorOccurred = True
        outputStream.write(bytearray(json.dumps(reply, indent=4).encode('utf-8')))

步驟3:將資料送到 Elasticsearch

最後,我們需要將轉換好的資料送到 Elasticsearch。這個步驟可以使用 Elasticsearch 的 API 來完成。

使用 Elasticsearch API

我們可以使用 Elasticsearch 的 API 來將資料送到 Elasticsearch。這個 API 會將資料轉換成 Elasticsearch 的格式,並將其儲存到 Elasticsearch 的索引中。

import requests

def send_to_elasticsearch(data):
    url = 'https://your-elasticsearch-url.com/index/_doc'
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, headers=headers, data=json.dumps(data))
    if response.status_code == 201:
        print('資料送到 Elasticsearch 成功')
    else:
        print('資料送到 Elasticsearch 失敗')

Elasticsearch 資料載入與處理

資料準備

為了將資料載入 Elasticsearch,我們需要進行一些資料準備工作。首先,我們需要為每個 issue 建立一個唯一的 ID。這可以透過使用 EvaluateJsonPath 處理器來實作。

{
  "id": "$.id"
}

Elasticsearch 輸出

接下來,我們需要將資料輸出到 Elasticsearch。這可以透過使用 PutElasticsearchHttp 處理器來實作。

{
  "Elasticsearch URL": "http://localhost:9200",
  "Index": "scf",
  "Type": "doc",
  "Index Operation": "upsert"
}

資料抓取

為了抓取所有的資料,我們需要使用 ExecuteScript 處理器來實作。這個處理器可以用於執行 Python 指令碼,從而抓取所有的資料。

import json
import urllib2

def process(session, inputStream):
    text = inputStream.read()
    asjson = json.loads(text)
    if asjson['metadata']['pagination']['page'] <= asjson['metadata']['pagination']['pages']:
        url = asjson['metadata']['pagination']['next_page_url']
        rawreply = urllib2.urlopen(url).read()
        session.write(rawreply)
    else:
        session.transfer(inputStream, 'STOP')

資料載入

現在,我們可以將資料載入 Elasticsearch。這可以透過使用 PutElasticsearchHttp 處理器來實作。

{
  "Elasticsearch URL": "http://localhost:9200",
  "Index": "scf",
  "Type": "doc",
  "Index Operation": "upsert"
}

結果

經過以上步驟,我們可以將所有的資料載入 Elasticsearch。現在,我們可以使用 Elasticsearch 的查詢功能來查詢資料。

curl -X GET 'http://localhost:9200/scf/_search' -H 'Content-Type: application/json' -d '{"query": {"match_all": {}}}'

這個查詢會傳回所有的資料。現在,我們可以使用 Elasticsearch 的功能來進行資料分析和查詢。

建立資料管線和Kibana儀錶板

在上一章中,我們已經建立了一個資料管線,從SeeClickFix提取資料並將其儲存到Elasticsearch中。現在,我們將建立一個Kibana儀錶板來視覺化這些資料。

建立Kibana儀錶板

首先,開啟Kibana並瀏覽到管理頁面。點選左側工具列底部的管理圖示,然後選擇「Create new Index Pattern」。這將允許我們建立一個新的索引模式,以便Kibana可以理解我們的Elasticsearch資料。

接下來,選擇我們的Elasticsearch索引,並設定索引模式。這將告訴Kibana如何處理我們的資料。

設定儀錶板

建立索引模式後,我們可以開始建立儀錶板。點選左側工具列中的「Dashboard」圖示,然後選擇「Create new Dashboard」。這將開啟一個新的儀錶板頁面。

在這裡,我們可以新增各種視覺化元件,例如圖表、表格和地圖,以展示我們的資料。例如,我們可以新增一個地圖元件來顯示SeeClickFix的問題位置。

新增視覺化元件

要新增視覺化元件,點選右上角的「Add」按鈕,然後選擇所需的元件型別。例如,我們可以選擇「Map」元件來顯示問題位置。

接下來,設定元件的設定,例如選擇資料來源和設定地圖的範圍。這將允許我們看到問題的位置和分佈。

儀錶板範例

以下是建立的儀錶板範例:

  • 地圖元件:顯示SeeClickFix的問題位置
  • 條形圖元件:顯示問題型別的分佈
  • 表格元件:顯示問題的詳細資料

這些元件可以根據需要進行自訂,以提供更好的資料視覺化。

圖表翻譯:

此圖表顯示了從SeeClickFix提取資料、儲存到Elasticsearch、視覺化到Kibana儀錶板的過程。最終,儀錶板提供了資料分析的結果。

建立 Kibana 儀錶板

在本節中,我們將學習如何使用 Kibana 建立一個儀錶板,以視覺化呈現我們的 311 資料。首先,我們需要建立一個索引模式(index pattern)。

建立索引模式

要建立索引模式,請按照以下步驟進行:

  1. 點選 Kibana 的工具欄中的「索引模式」圖示。
  2. 點選「建立索引模式」按鈕。
  3. 輸入索引模式的名稱,例如 scf*
  4. 選擇時間過濾欄位,例如 created_at

建立視覺化

接下來,我們需要建立一些視覺化,以展示我們的資料。Kibana 提供了多種視覺化型別,包括條形圖、折線圖、餅圖等。

  1. 點選 Kibana 的工具欄中的「視覺化」圖示。
  2. 點選「建立視覺化」按鈕。
  3. 選擇視覺化型別,例如「垂直條形圖」。
  4. 組態視覺化的設定,例如選擇 x 軸和 y 軸的欄位。

建立儀錶板

現在,我們可以建立一個儀錶板,以展示我們的視覺化。

  1. 點選 Kibana 的工具欄中的「儀錶板」圖示。
  2. 點選「建立儀錶板」按鈕。
  3. 新增視覺化到儀錶板中。
  4. 組態儀錶板的設定,例如選擇時間過濾欄位。

篩選資料

最後,我們可以使用篩選器來篩選我們的資料。

  1. 點選 Kibana 的工具欄中的「篩選器」圖示。
  2. 選擇篩選器型別,例如「時間篩選器」。
  3. 組態篩選器的設定,例如選擇時間範圍。

以下是建立 Kibana 儀錶板的步驟總結:

  • 建立索引模式
  • 建立視覺化
  • 建立儀錶板
  • 篩選資料

透過這些步驟,我們可以建立一個強大的 Kibana 儀錶板,以視覺化呈現我們的 311 資料。

建立生產級資料管線

在前面的章節中,我們學習瞭如何建立資料管線並將其佈署到生產環境中。然而,生產級資料管線需要更多的功能和考量,以確保資料的正確性和可靠性。在本章中,我們將探討建立生產級資料管線的重要性和相關的技術。

資料管線的特點

一個生產級資料管線應該具備以下特點:

  • idempotent:資料管線應該可以多次執行而不改變結果。
  • atomicity:如果交易失敗,資料管線應該可以回復到之前的狀態。
  • 資料驗證:資料管線應該可以驗證資料的正確性和完整性。

資料分段和驗證

在生產級資料管線中,資料分段和驗證是非常重要的步驟。資料分段是指將資料分成小塊,以便於處理和驗證。驗證是指檢查資料的正確性和完整性,以確保資料的品質。

建立idempotent資料管線

建立idempotent資料管線需要考慮以下幾點:

  • 資料處理:資料處理應該是可以多次執行而不改變結果的。
  • 資料儲存:資料儲存應該是可以多次執行而不改變結果的。

建立atomic資料管線

建立atomic資料管線需要考慮以下幾點:

  • 交易管理:交易管理應該可以回復到之前的狀態,如果交易失敗。
  • 資料儲存:資料儲存應該可以回復到之前的狀態,如果交易失敗。

資料驗證

資料驗證是指檢查資料的正確性和完整性,以確保資料的品質。資料驗證可以在資料管線中的各個階段進行,例如資料提取、資料轉換和資料儲存。

範例程式碼

以下是使用Python和Apache NiFi建立一個生產級資料管線的範例程式碼:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# 建立SparkSession
spark = SparkSession.builder.appName("Production Data Pipeline").getOrCreate()

# 讀取資料
data = spark.read.csv("data.csv", header=True, inferSchema=True)

# 資料轉換
data = data.withColumn("date", col("date").cast("timestamp"))

# 資料儲存
data.write.parquet("data.parquet")

# 資料驗證
data = spark.read.parquet("data.parquet")
data.count()

這個範例程式碼使用Apache Spark和Apache NiFi建立一個生產級資料管線,包括資料讀取、資料轉換、資料儲存和資料驗證。

資料管道中的資料分段和驗證

資料分段是資料管道中的一個重要步驟,尤其是在處理交易性資料函式庫時。交易性資料函式庫中的資料是經常變動的,若不進行分段,可能會導致資料的不一致性和錯誤。

資料分段的好處

資料分段可以提供以下好處:

  • 資料的一致性:透過分段,可以確保資料的一致性,即使資料函式庫中的資料發生變化,也可以透過分段來保持資料的一致性。
  • 錯誤的處理:分段可以幫助處理錯誤,若某個步驟出現錯誤,可以透過分段來重新執行該步驟,而不需要重新執行整個資料管道。
  • 資料的驗證:分段可以幫助驗證資料,透過分段可以對資料進行驗證,確保資料的正確性和完整性。

資料分段的實作

資料分段可以透過以下步驟實作:

  1. 資料的抽取:從資料函式庫中抽取資料。
  2. 資料的分段:將抽取的資料分段成小塊,例如,將資料分段成檔案或資料表。
  3. 資料的轉換:對分段的資料進行轉換,例如,將資料從一個格式轉換成另一個格式。
  4. 資料的載入:將轉換的資料載入到目標資料函式庫中。

Great Expectations 的使用

Great Expectations 是一個 Python 函式庫,用於驗證資料。它提供了一個簡單的方式來定義資料的期望,例如,資料中的某個欄位不應該為空。

以下是 Great Expectations 的使用步驟:

  1. 安裝 Great Expectations:使用 pip 安裝 Great Expectations。
  2. 定義資料的期望:使用 Great Expectations 的 API 定義資料的期望,例如,資料中的某個欄位不應該為空。
  3. 驗證資料:使用 Great Expectations 驗證資料,確保資料符合定義的期望。

資料驗證的重要性

資料驗證是資料管道中的一個重要步驟,透過驗證可以確保資料的正確性和完整性,避免錯誤的發生。

以下是資料驗證的重要性:

  • 確保資料的正確性:驗證可以確保資料的正確性,避免錯誤的發生。
  • 確保資料的完整性:驗證可以確保資料的完整性,避免資料的丟失或損壞。
  • 提高資料的品質:驗證可以提高資料的品質,確保資料的可靠性和可用性。

初始化Great Expectations專案

要初始化Great Expectations專案,首先需要安裝必要的套件,包括jupyter。可以使用以下命令安裝:

pip3 install jupyter

安裝完成後,建立一個目錄用於專案,並切換到該目錄:

mkdir $HOME/peoplepipeline
cd $HOME/peoplepipeline

接下來,生成一個示例資料檔案people.csv,包含1000條與人相關的記錄。使用以下程式碼:

from faker import Faker
import csv

output = open('people.csv', 'w')
fake = Faker()
header = ['name', 'age', 'street', 'city', 'state', 'zip', 'lng', 'lat']

mywriter = csv.writer(output)
mywriter.writerow(header)

for r in range(1000):
    mywriter.writerow([fake.name(), fake.random_int(min=18, max=80, step=1), fake.street_address(), fake.city(), fake.state(), fake.zipcode(), fake.longitude(), fake.latitude()])

output.close()

這個程式碼建立了一個CSV檔案,包含人們的基本資訊。

初始化Great Expectations

現在,可以初始化Great Expectations專案了。使用以下命令:

great_expectations init

Great Expectations會提示您是否準備好繼續。輸入Y並按Enter鍵後,Great Expectations會問您一系列問題,包括:

  • 您想讓Great Expectations連線到哪些資料?
  • 您正在使用什麼來處理檔案?
  • 輸入資料檔案的路徑(相對或絕對)。
  • 為新的期望套件命名。

回答這些問題後,Great Expectations會生成一個檔案,包含11個期望。這些期望包括記錄數量、欄位名稱和順序,以及年齡範圍等基本資訊。

期望生成

Great Expectations生成的期望包括:

  • 記錄數量:1000條記錄。
  • 欄位名稱和順序:nameagestreetcitystateziplnglat
  • 年齡範圍:年齡必須大於17歲,少於81歲。

這些期望可以用於驗證資料的品質和一致性。

內容解密:

Great Expectations是一個強大的工具,用於驗證和監控資料品質。透過初始化Great Expectations專案和生成期望,可以確保資料的一致性和品質。這對於資料驅動的應用程式和商業決策至關重要。

圖表翻譯:

  graph LR
    A[初始化Great Expectations] --> B[生成示例資料]
    B --> C[初始化Great Expectations專案]
    C --> D[生成期望]
    D --> E[驗證資料品質]

這個流程圖展示了初始化Great Expectations專案、生成示例資料、初始化Great Expectations專案、生成期望和驗證資料品質的過程。

從資料管道啟動、資料抓取、格式轉換到最終載入 Elasticsearch 並以 Kibana 儀錶板呈現的完整流程來看,本文提供了一個建構 311 資料視覺化平臺的實用。透過 NiFi 的視覺化介面搭配 Python 指令碼的彈性,有效簡化了資料處理流程,尤其是 SplitJson 和 ExecuteScript 處理器的運用,展現了 ETL 流程的效率。然而,文章對於錯誤處理機制著墨不多,僅在程式碼片段中簡略提及 errorOccurred 變數,缺乏實際的錯誤處理策略說明,這在生產環境中是至關重要的。此外,雖然提到了資料分段和驗證的重要性,並引入了 Great Expectations 這個工具,但並未深入探討如何整合至 NiFi 流程中,也缺乏實際應用案例的說明,這對於讀者理解其價值和應用方式造成一定的困難。展望未來,除了完善錯誤處理和資料驗證機制外,可以進一步探討如何提升管線的效能和可擴充套件性,例如引入更進階的 NiFi 功能,或是整合 Spark 等分散式運算框架,以應對更大規模的資料處理需求。玄貓認為,隨著資料量的增長和應用場景的複雜化,建構更強健、更具彈性的資料管線將是未來發展的關鍵。