串流平台如 Kafka 可外部化資料函式庫的預寫日誌(WAL),將交易資料提供給其他系統,如同次要 OLTP 資料函式庫的資料複製機制。此方法讓原本隱藏的 WAL 公開可用,成為跨組織同步資料函式庫的有效工具。透過訂閱特定主題,系統能建立原始 OLTP 資料函式庫中表格的副本,實作資料同步。文章進一步探討了物化檢視在提升查詢效能方面的作用,以及點選流資料在使用者行為分析中的應用。點選流資料包含使用者在數位平台上的互動記錄,可用於個人化推薦、定向廣告等場景。此外,文章也闡述瞭如何將點選事件資料與客戶和產品資訊結合,以豐富資料的上下文,並透過串流平台 Kafka 進行處理和分析,最終提供更具價值的商業洞察。

串流基礎與資料函式庫外部化

在探討串流資料函式庫之前,我們需要了解串流平台如何與傳統的線上交易處理(OLTP)資料函式庫互動。OLTP資料函式庫在儲存資料到磁碟時,會自然地寫入預寫日誌(WAL)。WAL可用於將資料複製到次要OLTP資料函式庫,以應對災難還原場景。

串流平台與資料函式庫WAL的外部化

串流平台(如Kafka)可以將資料函式庫WAL外部化,使用主題抽象的分割槽來提供原本在WAL中的交易給其他系統。這些系統訂閱主題,以便像次要OLTP資料函式庫一樣建立原始主OLTP資料函式庫中表格的副本。因此,串流平台可用於使先前隱藏在OLTP資料函式庫系統中的WAL公開可用,成為跨組織同步資料函式庫系統的工具。

具體實作與觀念解析

在串流平台中,分割槽可以儲存來自源OLTP資料函式庫的交易,並將這些交易發布給其他系統,以便建立表格的副本。這種方法類別似於使用WAL進行資料複製。

-- 在Postgres中重新整理物化檢視
REFRESH MATERIALIZED VIEW CONCURRENTLY product_sales;

內容解密:

這段程式碼展示瞭如何在Postgres資料函式庫中重新整理物化檢視。REFRESH MATERIALIZED VIEW CONCURRENTLY陳述式用於更新物化檢視中的資料,使其保持最新。CONCURRENTLY關鍵字表示重新整理操作將以非阻塞方式進行,不會鎖定檢視,從而允許在重新整理期間繼續查詢該檢視。

物化檢視

在典型的OLTP資料函式庫中,物化檢視是一種特殊的資料函式庫物件,儲存預先計算的查詢或聚合結果。與虛擬的普通檢視不同,物化檢視實際儲存資料,使其物理儲存在資料函式庫中。

物化檢視的優點與實務應用

物化檢視的目的是透過預先計算和儲存結果來提高複雜查詢或聚合的效能。當查詢參照物化檢視時,資料函式庫可以快速從物化檢視中檢索預先計算的資料,而不必從基礎資料表重新計算。這可以顯著減少查詢執行時間並提高整體資料函式庫效能,尤其是在大型且耗費資源的查詢中。

點選流分析的應用場景

點選流資料是指捕捉使用者在瀏覽網站、應用程式或數位平台時的動作和互動的一系列記錄事件。它提供了使用者線上上會話期間執行的點選、頁面檢視和其他互動的詳細記錄。

點選流分析的實務意義

點選流資料可用於各種目的,如個人化、定向廣告、使用者分段、詐欺檢測和轉換率最佳化。它在網頁分析、行銷分析、使用者經驗研究和其他資料驅動的學科中扮演著至關重要的角色。

串流平台在點選流分析中的角色

在我們的應用場景中,一位居住在紐約Woodstock的24歲男性客戶使用手機應用程式點選了一件綠色T恤。我們的目標是向終端使用者提供點選流資料,以便他們能夠進行分析並得出有助於做出資料驅動決策的洞察。

技術實作與挑戰

我們將從導向使用者的應用程式捕捉的點選事件稱為事件。這些事件最終將進入像Kafka這樣的串流平台,以便我們最終能夠將它們與現有的客戶資訊結合起來。

此圖示展示了點選事件從使用者互動到被分析的整個流程。

事件與交易的理解及其在領域驅動設計中的重要性

在探討即時資料處理和分析的過程中,瞭解交易(transactions)和事件(events)之間的區別至關重要。交易通常指的是源自資料函式庫的變更,如插入(inserts)、更新(updates)和刪除(deletes),這些變更首先被寫入預寫式日誌(WAL),然後被傳輸到串流平台的主題(topic)中。這些交易也可以被稱為變更事件(change events)或簡稱為事件。

領域驅動設計

在軟體開發中,工程師使用與業務領域相關的物件來建模應用程式。例如,如果業務涉及客戶,那麼應用程式中就會有一個代表客戶的物件。這種做法適用於業務領域中的所有物件。

實體與事件

客戶和產品是我們使用案例中的兩個重要物件,它們是領域模型的一部分。這些物件被稱為實體(entities),實體儲存線上上交易處理(OLTP)資料函式庫中,並經歷插入、更新和刪除等變更事件。

另一方面,像點選事件(click events)這樣的事件捕捉了應用程式中實體之間的互動。例如,客戶點選產品,這裡的客戶和產品是物件,而點選動作是事件。這種關係可以用一個句子來描述:主語(客戶)執行動詞(點選)對受詞(產品)。

事件的豐富性

點選事件提供了豐富的資訊,可以擴充套件成更詳細的句子,例如:「具有IP 111.11.1111 的客戶在 2023 年 7 月 21 日東部時間上午 11:20 點選了產品 12345。」然而,這個事件缺乏客戶和產品的詳細資訊,如客戶的名字、位置、年齡,或產品的型別、顏色等。因此,在將點選流事件交付分析之前,需要用客戶和產品資訊來豐富它。

為何點選事件不儲存在資料函式庫中?

一個合理的問題是,為什麼不將點選事件儲存在資料函式庫中?主要原因是 OLTP 資料函式庫可能會因為儲存大量點選事件而耗盡空間。實體變更相對較少,而點選事件是不可變的,並且只會被插入資料函式庫,這是一種追加只(append-only)的模式。因此,使用微服務直接將點選事件寫入串流平台是更好的選擇。

上下文豐富化

所有的分析應用都需要事件發生的上下文。點選事件只包含與點選相關的資訊,而不包含客戶或產品的資訊。通常,實體資訊在點選事件發生時不可用。在實時資料管道下游進行豐富化可以提供額外的資訊,幫助做出更明智的決策。例如,如果知道客戶喜歡綠色襯衫且是二十多歲的男性,這將有助於實作更智慧的決策和個人化應用。

變更資料捕捉

實體變更被寫入 WAL,然後透過變更資料捕捉(change data capture)的過程被傳輸到串流平台的主題中,其他系統可以消費這些變更事件並建立實體的副本,以實作點選事件與產品和客戶資訊的豐富化。

此圖示說明瞭從 OLTP 資料函式庫到串流平台的資料流程,以及如何透過變更資料捕捉和微服務將不同來源的資料匯聚到串流處理器進行豐富化處理。

資料變更捕捉(Change Data Capture, CDC)技術解析

CDC 技術的核心概念

資料變更捕捉(CDC)是一種在資料函式庫和資料整合系統中使用的技術,用於即時捕捉和追蹤資料的變更。其主要目標是識別和捕捉對特定表格所做的任何變更交易(插入、更新或刪除),並將這些變更事件提供給下游系統或流程使用。

CDC 的實作方式

CDC 可以透過多種方式實作,包括:

  1. 監聽 WAL(Write-Ahead Logging):這是捕捉資料函式庫變更的首選方式,能夠即時捕捉變更並以串流方式處理。像是 PostgreSQL 和 MySQL 等關聯式 OLTP 資料函式庫通常採用此方式。

  2. 比較快照(Snapshots):此方法涉及拍攝表格的快照,並與之前的快照進行比較,以過濾出變更。這種方法可能非常消耗處理資源,尤其是在表格龐大的情況下。此外,這種方法並非真正的即時處理,因為快照是按照間隔拍攝的。在間隔之間發生的變更,包括還原變更,可能會丟失。

  3. 比較更新時間戳(Update Timestamps):這種方法儲存最後一批變更的時間戳,並過濾出更新時間戳在之後的記錄。這種方法需要在表格中包含一個更新欄位,每當記錄變更時都需要更新。這個方法也不是即時的。

資料函式庫的 CDC 支援

幸運的是,大多數 OLTP 資料函式庫都有辦法讀取其 WAL。一些 OLTP 資料函式庫也原生支援將事件提交到串流平台或其他系統。例如,CockroachDB 提供了一種方式,可以直接建立從自身到 Kafka、Google Cloud Pub/Sub、Cloud Storage(Amazon S3、Google Cloud Storage、Azure Storage)或 Webhook 的變更饋送(Change Feed)。這種推播機制能夠顯著降低串流資料管線的架構複雜度。

建立變更饋送範例

CREATE CHANGEFEED FOR TABLE customer, product INTO 'kafka://localhost:9092';

內容解密:

  • CREATE CHANGEFEED 是 CockroachDB 用於建立變更饋送的 SQL 陳述式。
  • FOR TABLE customer, product 指定了要監控變更的表格。
  • INTO 'kafka://localhost:9092' 指定了變更事件要推播到的 Kafka 叢集地址。

這種原生支援使得 OLTP 資料函式庫更接近串流資料函式庫。若資料函式庫不具備此功能,則需要額外的元件,稱為聯結器(Connectors),來提取資料並將其發布到串流平台的主題中。

聯結器(Connectors)

在串流處理中,我們區分兩種主要的聯結器:

  1. 源聯結器(Source Connectors):從資料來源系統讀取資料,並將資料提供為事件串流。

  2. 接收聯結器(Sink Connectors):從事件串流中消費資料,並將資料寫入接收系統。

這兩種型別的聯結器如圖 1-8 所示。大多數情況下,源聯結器將靜態資料轉換為串流資料,而接收聯結器則將串流資料轉換為靜態資料。

圖 1-8:源聯結器與接收聯結器示意圖

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title 串流資料函式庫整合與CDC技術解析

package "系統架構" {
    package "前端層" {
        component [使用者介面] as ui
        component [API 客戶端] as client
    }

    package "後端層" {
        component [API 服務] as api
        component [業務邏輯] as logic
        component [資料存取] as dao
    }

    package "資料層" {
        database [主資料庫] as db
        database [快取] as cache
    }
}

ui --> client : 使用者操作
client --> api : HTTP 請求
api --> logic : 處理邏輯
logic --> dao : 資料操作
dao --> db : 持久化
dao --> cache : 快取

note right of api
  RESTful API
  或 GraphQL
end note

@enduml

此圖示說明瞭源聯結器和接收聯結器在串流資料處理中的角色。

靜態資料是指存放在資料函式庫或檔案系統中、不處於移動狀態的資料。靜態資料通常使用批次處理或微批次處理技術進行處理。與此相反,串流資料(或稱移動中的資料)意味著資料沒有明確的開始或結束。處理串流資料的應用程式始終在執行,並監聽串流中的新資料到達。

串流資料處理的中介軟體與聯結器實作

在串流資料處理的架構中,聯結器(Connector)扮演著至關重要的角色,用於在不同的資料來源與目的地之間建立資料流動。目前有多種實作聯結器的方式,包括使用聯結器中介軟體(Connector Middleware)、內建聯結器(Embedded Connector)以及自定義聯結器(Custom-Built Connector)。

聯結器中介軟體

聯結器中介軟體解決方案,如 Kafka Connect、Meroxa、Striim 和 StreamSets,提供了大量的預設聯結器,並且通常具備擴充套件性,能夠支援更多的資料來源和目的地。這些中介軟體還提供了水平擴充套件、監控等功能,尤其是在生產環境的佈署中至關重要。

Kafka Connect 是 Apache Kafka 專案的一部分,作為分散式叢集執行,Kafka 聯結器在其中佈署平行執行。然而,這類別佈署會增加串流架構的複雜性,且維護這些叢集往往十分繁瑣。當資料來源和目的地數量龐大時,這些叢集不僅成本高昂,還會消耗大量資源。因此,將聯結器嵌入系統本身可以更好地解決整合問題。

內建聯結器

越來越多的資料函式庫開始提供內建的串流平台聯結器。例如,CockroachDB 就是一個例子。還有更多資料函式庫實作了內建聯結器,能夠直接從事件流中消費資料,例如 Apache Druid、Apache Pinot、ClickHouse、StarRocks、Apache Doris 和 Rockset。讓資料函式庫直接解決與串流平台的整合問題,能夠使它們更接近成為串流資料函式庫。如果能夠讓資料函式庫具備提取和推播資料到串流平台的能力,串流將自然成為資料函式庫中的一等公民。

自定義聯結器

聯結器也可以透過自定義的方式構建,例如實作專門的微服務。這種方法的優勢在於其靈活性,但缺點是需要「重新造輪子」,特別是在已經有大量強大且可擴充套件的開源聯結器(如 Kafka Connect 中介軟體的 Debezium 源聯結器)的情況下,從頭開始實作聯結器往往不具備成本效益。

圖示說明:此圖示展示了實作聯結器的三種方式,分別是透過聯結器中介軟體、內建聯結器和自定義聯結器。

在後續章節中,我們將對聯結器的具體實作進行抽象處理,無論是根據聯結器中介軟體、內建聯結器還是自定義聯結器,都將被視為「聯結器」。

應使用案例項

回到我們的範例使用案例,我們希望用產品和客戶資訊來豐富點選事件。這些資訊很可能儲存在交易型資料函式庫或線上交易處理(OLTP)資料函式庫中。為了使這些資料能夠作為事件流使用,我們需要為該資料函式庫使用源聯結器。

OLTP 資料函式庫也被稱為操作型資料函式庫,是一種專門設計用來處理大量交易的資料函式庫型別。它們能夠提供快速的資料存取和更新,這對於需要即時資料處理的應用程式至關重要。

圖示說明:此圖示展示了客戶和產品資料在資料函式庫中的儲存情況,以及如何透過源聯結器將這些資料寫入主題。

串流處理平台

在第一章中,我們介紹了一個簡單的使用案例,即如何將即時資料提供給消費者。我們還介紹了聯結器,以及它們如何將靜態資料轉換為動態資料(或事件流),並將其釋出到串流平台的主題中。現在,這些事件流可以被讀取,但很可能還沒有達到消費者可以直接使用的格式。事件通常需要在進行分析處理之前進行清理和準備,並且需要豐富上下文資訊,以使其足夠有用來獲得洞察。分析處理嚴重依賴於資料的準確性和可靠性。透過解決缺失值、不一致性、重複資料和異常值等問題,可以提高資料品質,從而獲得更可靠和準確的分析結果。

圖示說明:此圖示展示了在事件資料到達目的地之前進行清理、準備和豐富的過程。

事件資料準備也對分析查詢的效能有著重要影響。透過最佳化資料佈局、索引和分割區,可以提高資料檢索和處理的效率。這包括針對分析工作負載量身定製的資料反正規化、欄位儲存和索引策略等技術。準備好的資料可以減少處理時間,並實作更快的洞察力。我們將在第四章討論如何向消費者提供分析資料時,涵蓋反正規化、欄位儲存和索引策略。

事件資料準備在確保資料治理和合規方面也發揮著至關重要的作用。這涉及執行資料安全措施、匿名化敏感資訊等,以滿足法規要求。

程式碼範例:

import pandas as pd

# 假設我們有一個包含點選事件的 DataFrame
click_events = pd.DataFrame({
    'user_id': [1, 2, 3],
    'product_id': [101, 102, 103],
    'click_time': ['2023-01-01 12:00:00', '2023-01-01 12:05:00', '2023-01-01 12:10:00']
})

# 將 click_time 欄位轉換為 datetime 型別
click_events['click_time'] = pd.to_datetime(click_events['click_time'])

#### 內容解密:
1. `import pandas as pd`:匯入 pandas 函式庫並賦予別名 `pd`,以便後續使用
2. `click_events = pd.DataFrame({...})`:建立一個 DataFrame 物件 `click_events`,用於儲存點選事件的相關資料
3. `click_events['click_time'] = pd.to_datetime(click_events['click_time'])`:`click_time` 欄位的資料型別轉換為 datetime以便於進行時間相關的操作

資料準備在串流資料處理中的重要性

在串流資料處理的過程中,資料準備是確保資料準確性、一致性和結構完整性的關鍵步驟。適當的資料準備可以提升分析結果的可靠性、改善系統效能,並使得從資料中擷取有意義的洞察變得更加容易。透過清理和準備資料,組織可以維護資料完整性、保護隱私,並符合法律和道德要求。

串流處理平台的角色

串流處理平台在資料處理流程中扮演著重要的角色。它負責在資料到達最終儲存之前,進行清理、準備和豐富事件資料的預處理步驟。在本文中,我們將這些任務稱為「轉換」(transformations),它們在串流資料管道中執行。

轉換任務的特性

轉換任務通常需要大量的資源和處理能力。因此,盡早在資料管道中完成這些轉換任務是非常重要的。這樣可以避免在大量靜態資料上執行耗費資源的任務。

目的地資料儲存

目的地資料儲存通常是線上分析處理(OLAP)資料儲存,用於支援對資料的分析查詢。實時OLAP(RTOLAP)資料儲存則是針對服務實時分析資料進行了最佳化。

OLAP與OLTP的區別

  • OLAP是為分析讀取而最佳化的資料儲存,通常是根據列的儲存,用於支援導向使用者的儀錶板和應用程式。
  • OLTP是捕捉來自應用程式事件的資料函式庫,針對寫入和單行查詢進行了最佳化。OLTP資料函式庫往往是CDC提取的來源。

案例研究:點選流資料的豐富

在點選流案例中,我們將點選流資料與客戶和產品資訊進行豐富,以提供點選分析的背景資訊,如客戶的位置、產品型別、顏色等。這種豐富的資訊可以幫助企業減少時間、庫存損失和提高客戶滿意度。

有狀態轉換

有些轉換任務足夠複雜,需要記住資訊,例如需要進行連線或聚合的情況。這種情況下,資訊需要儲存在某個地方,被稱為狀態儲存。這些複雜的轉換被稱為有狀態轉換。

有狀態轉換的例子

  • 滾動平均值:計算滾動平均值需要跟蹤前幾個資料元素的總和和數量。
public class RollingAverage {
    private double sum;
    private int count;
    private Queue<Double> window;

    public RollingAverage(int windowSize) {
        this.sum = 0;
        this.count = 0;
        this.window = new LinkedList<>();
    }

    public void add(double value) {
        sum += value;
        count++;
        window.add(value);
        if (window.size() > 10) { // 假設視窗大小為10
            sum -= window.poll();
            count--;
        }
    }

    public double getAverage() {
        return sum / count;
    }
}

內容解密:

此範例展示了一個簡單的滾動平均值計算。add 方法用於新增新的資料元素並更新總和及計數,同時維護一個固定大小的視窗。當視窗滿時,移除最舊的元素以保持視窗大小不變。

  • 會話化:在網頁分析或事件處理中,會話化涉及根據特定標準將相關事件分組到會話中。
from collections import defaultdict

class Sessionizer:
    def __init__(self, timeout):
        self.timeout = timeout
        self.sessions = defaultdict(list)

    def add_event(self, user_id, event_time, event_data):
        if user_id not in self.sessions or event_time - self.sessions[user_id][-1][0] > self.timeout:
            self.sessions[user_id].append((event_time, event_data))
        else:
            self.sessions[user_id].append((event_time, event_data))

內容解密:

此Python範例展示了會話化的基本邏輯。Sessionizer 類別根據給定的超時時間來決定是否將新的事件新增到現有的會話中或是建立新的會話。

  • 重複資料刪除:從串流中刪除重複事件通常需要維護已見事件的記錄。
public class Deduplicator {
    private Set<String> seenEvents;

    public Deduplicator() {
        this.seenEvents = new HashSet<>();
    }

    public boolean isDuplicate(String eventId) {
        return seenEvents.contains(eventId);
    }

    public void addEvent(String eventId) {
        seenEvents.add(eventId);
    }
}

內容解密:

此Java範例展示瞭如何使用一個集合來跟蹤已處理的事件ID,從而實作重複資料刪除。

  • 視窗聚合:需要在每個視窗內累積值並定期產生聚合結果。
SELECT 
    window_start,
    window_end,
    SUM(value) AS total_value
FROM 
    stream_data
WINDOW BY 
    TUMBLING WINDOW (SIZE 1 MINUTE)

內容解密:

此SQL範例展示瞭如何在串流資料上執行視窗聚合,計算每分鐘的總值。