Kafka Streams 提供了便捷的測試工具,讓開發者得以驗證串流處理邏輯的正確性。對於個別處理單元的測試,MockProcessorContext 可以模擬 Kafka Streams 的執行環境,方便開發者對特定功能進行隔離測試。而 TopologyTestDriver 則允許在不啟動真實 Kafka 叢集的情況下,測試整個串流拓撲的行為,驗證資料的轉換和流動是否符合預期。除了功能驗證,效能測試也同樣重要。透過 TestOutputTopic 讀取輸出記錄,並結合 JMH 等基準測試框架,可以精確測量拓撲的吞吐量,及早發現潛在的效能瓶頸。此外,文章也提及了使用 Kafka 內建工具進行生產者和消費者效能測試,以及透過 JMX 監控 Kafka Streams 和 ksqlDB 應用程式,提供全面的測試與監控策略。

Kafka Streams 測試:單元測試與行為測試

Kafka Streams 為開發者提供了強大的串流處理能力,但要確保程式碼的正確性和可靠性,測試是不可或缺的一環。本文將探討如何使用 Kafka Streams 提供的測試工具進行單元測試和行為測試。

單元測試:使用 MockProcessorContext

在進行單元測試時,我們需要隔離被測試的元件,並模擬其依賴的環境。Kafka Streams 提供了一個名為 MockProcessorContext 的類別,可以用來模擬處理器上下文。

設定 MockProcessorContext

@BeforeEach
public void setup() {
    processorContext = new MockProcessorContext();
    // 設定必要的 Kafka Streams 屬性
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    processorContext = new MockProcessorContext(props);
    // 初始化狀態儲存
    transformer = new CountTransformer();
    transformer.init(processorContext);
}

測試 CountTransformer

@Test
public void testTransformer() {
    String key = "123";
    String value = "some value";
    assertThat(transformer.transform(key, value)).isEqualTo(1L);
    assertThat(transformer.transform(key, value)).isEqualTo(2L);
    assertThat(transformer.transform(key, null)).isNull(); // 測試墓碑記錄(tombstone)
    assertThat(transformer.transform(key, value)).isEqualTo(1L);
}

內容解密:

  1. MockProcessorContext 的建立:我們使用 MockProcessorContext 來模擬 Kafka Streams 的處理器上下文,並提供必要的組態引數。這讓我們能夠在不依賴真實 Kafka 環境的情況下進行單元測試。
  2. CountTransformer 的初始化:在測試前,我們初始化 CountTransformer,使其準備好處理輸入記錄。
  3. 斷言驗證:透過一系列的斷言,我們驗證了 CountTransformer 的計數功能是否正確,以及它如何處理墓碑記錄。

行為測試:使用 TopologyTestDriver

除了單元測試外,我們還需要測試整個 Kafka Streams 拓撲的行為。這可以透過 TopologyTestDriver 來實作。

建立測試拓撲

class GreeterTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("users", Consumed.with(Serdes.Void(), Serdes.String()))
               .filterNot((key, value) -> value.toLowerCase().equals("randy"))
               .mapValues(GreeterTopology::generateGreeting)
               .to("greetings", Produced.with(Serdes.Void(), Serdes.String()));
        return builder.build();
    }
}

使用 TopologyTestDriver 進行測試

class GreeterTopologyTest {
    private TopologyTestDriver testDriver;
    private TestInputTopic<Void, String> inputTopic;
    private TestOutputTopic<Void, String> outputTopic;

    @BeforeEach
    void setup() {
        Topology topology = GreeterTopology.build();
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        testDriver = new TopologyTestDriver(topology, props);
        inputTopic = testDriver.createInputTopic("users", Serdes.Void().serializer(), Serdes.String().serializer());
        outputTopic = testDriver.createOutputTopic("greetings", Serdes.Void().deserializer(), Serdes.String().deserializer());
    }

    @Test
    void testUsersGreeted() {
        String value = "Izzy";
        inputTopic.pipeInput(value);
        assertThat(outputTopic.isEmpty()).isFalse();
        List<TestRecord<Void, String>> outRecords = outputTopic.readRecordsToList();
        assertThat(outRecords).hasSize(1);
        String greeting = outRecords.get(0).getValue();
        assertThat(greeting).isEqualTo("Hello Izzy");
    }
}

內容解密:

  1. TopologyTestDriver 的使用:透過 TopologyTestDriver,我們可以在模擬環境中測試整個 Kafka Streams 拓撲,而無需真實的 Kafka 叢集。
  2. 輸入與輸出主題的建立:我們使用 createInputTopiccreateOutputTopic 方法來建立輸入和輸出主題,這使得我們可以向拓撲中輸入資料並檢查輸出結果。
  3. 斷言驗證:在測試中,我們透過斷言來驗證輸出結果是否符合預期,確保拓撲的行為正確。

測試與基準測試

在前面的章節中,我們已經討論瞭如何使用 Kafka Streams 的測試工具來測試我們的拓撲結構。在本文中,我們將繼續探討如何讀取輸出主題中的記錄,以及如何執行基準測試以評估我們的 Kafka Streams 應用程式的效能。

讀取輸出主題中的記錄

有多種方法可以讀取輸出主題中的記錄。在這裡,我們使用 readRecordsToList() 方法將所有輸出記錄讀入列表中。其他可用的方法包括 readValue()readKeyValue()readRecord()readKeyValuesToMap()

首先,我們斷言只有一條輸出記錄(使用 flatMap 運算子的拓撲結構可能會具有 1:N 的輸入輸出記錄比例)。然後,我們讀取輸出記錄的值。還有其他方法可以用於存取更多的記錄資料,包括 getKey()getRecordTime()getHeaders()

使用 TestOutputTopic 的方法

TestOutputTopic.readRecordsToList() 方法在處理流資料時非常有用,因為它包含了整個輸出事件序列。另一方面,TestOutputTopic.readKeyValuesToMap() 方法在處理表格資料時很有用,因為它只包含了每個鍵的最新表示形式。

基準測試

雖然 ksqlDB 會為我們組成底層的 Kafka Streams 拓撲結構,但是直接使用 Kafka Streams DSL 或 Processor API 會引入更多的效能迴歸向量。因此,我們需要在對 Kafka Streams 應用程式進行更改時執行基準測試,以保護我們免受效能迴歸的影響。

使用 JMH 進行基準測試

我們可以使用 kafka-streams-test-utils 套件提供的模擬執行環境與 JMH 等基準測試框架相結合來實作基準測試。首先,我們需要在 build.gradle 檔案中新增 me.champeau.gradle.jmh 外掛,並組態該外掛建立的 jmh 任務。

plugins {
    id 'me.champeau.gradle.jmh' version '0.5.2'
}

jmh {
    iterations = 4
    benchmarkMode = ['thrpt']
    threads = 1
    fork = 1
    timeOnIteration = '3s'
    resultFormat = 'TEXT'
    profilers = []
    warmupIterations = 3
    warmup = '1s'
}

接下來,我們需要建立一個類別來執行基準測試。與其他 Kafka Streams 測試程式碼不同,基準測試程式碼應位於 src/jmh/java 目錄中。我們的基準測試類別如下所示:

public class TopologyBench {
    @State(org.openjdk.jmh.annotations.Scope.Thread)
    public static class MyState {
        public TestInputTopic<Void, String> inputTopic;

        @Setup(Level.Trial)
        public void setupState() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

            // build the topology
            Topology topology = GreeterTopology.build();

            // create a test driver. we will use this to pipe data to our topology
            TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
            testDriver = new TopologyTestDriver(topology, props);

            // create the test input topic
            inputTopic =
                    testDriver.createInputTopic(
                            "users", Serdes.Void().serializer(), Serdes.String().serializer());
        }
    }

    @Benchmark
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.SECONDS)
    public void benchmarkTopology(MyState state) {
        state.inputTopic.pipeInput("Izzy");
    }
}

執行基準測試

現在,我們可以使用以下命令使用 Gradle 執行基準測試:

$ ./gradlew jmh

您應該會看到類別似以下的輸出:

Benchmark Mode Cnt Score Error Units
TopologyBench.benchmarkTopology thrpt 4 264794.572 ± 39462.097 ops/s

Kafka 叢集基準測試

無論您是使用 Kafka Streams 還是 ksqlDB,您可能還需要在 Kafka 叢集級別執行效能測試。Kafka 包含一些控制檯指令碼,可以幫助您測量讀寫吞吐量,對叢集執行負載和壓力測試,並確定某些客戶端設定(批次大小、緩衝區記憶體、生產者確認、消費者執行緒數)和輸入特性(記錄大小、訊息量)如何影響叢集的效能。

如果您想分析向主題生成資料的效能,可以使用 kafka-producer-perf-test 命令。該命令有許多可用的選項,以下示例演示了其中幾個:

kafka-producer-perf-test --topic test-topic --num-records 100000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092

這個命令將向名為 test-topic 的主題生成 100,000 條記錄,每條記錄大小為 1KB,並測量生產者的吞吐量。您可以根據需要調整選項以滿足您的測試需求。

測試與監控 Kafka Streams 與 ksqlDB 應用程式

在開發和佈署 Kafka Streams 與 ksqlDB 應用程式時,測試和監控是非常重要的環節。本篇文章將介紹如何使用各種工具和技術來測試和監控這些應用程式。

測試 Kafka 生產者和消費者

Kafka 提供了多種工具來測試生產者和消費者的效能。以下是一些範例:

生產者效能測試

可以使用 kafka-producer-perf-test 命令來測試 Kafka 生產者的效能。例如:

kafka-producer-perf-test \
--topic users \
--num-records 1000000 \
--record-size 100 \
--throughput -1 \
--producer-props acks=1 \
bootstrap.servers=kafka:9092 \
buffer.memory=67108864 \
batch.size=8196

這個命令將測試生產者向 users 主題傳送 100 萬條記錄的效能。

使用 payload 檔案進行生產者效能測試

也可以使用 --payload-file 引數來指定一個包含測試記錄的檔案。例如:

cat <<EOF >./input.json
{"username": "Mitch", "user_id": 1}
{"username": "Isabelle", "user_id": 2}
{"username": "Elyse", "user_id": 3}
EOF

kafka-producer-perf-test \
--topic users \
--num-records 1000000 \
--payload-file input.json \
--throughput -1 \
--producer-props acks=1 \
bootstrap.servers=kafka:9092 \
buffer.memory=67108864 \
batch.size=8196

這個命令將使用 input.json 檔案中的記錄來進行生產者效能測試。

消費者效能測試

可以使用 kafka-consumer-perf-test 命令來測試 Kafka 消費者的效能。例如:

kafka-consumer-perf-test \
--bootstrap-server kafka:9092 \
--messages 100000 \
--topic users \
--threads 1

這個命令將測試消費者從 users 主題讀取 10 萬條訊息的效能。

JMX 監控

Kafka Streams 和 ksqlDB 都提供了 JMX(Java Management Extensions)監控功能,可以用來監控應用程式的效能和狀態。

啟用 JMX 監控

可以在啟動 ksqlDB 時啟用 JMX 監控,例如:

docker run \
--net=chapter-12_default \
-p 1099:1099 \
-v "$(pwd)/ksqldb":/ksqldb \
-e KSQL_JMX_OPTS="\
-Dcom.sun.management.jmxremote \
-Djava.rmi.server.hostname=$MY_IP \
-Dcom.sun.management.jmxremote.port=1099 \
-Dcom.sun.management.jmxremote.rmi.port=1099 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false" \
-ti confluentinc/ksqldb-server:0.14.0 \
ksql-server-start /ksqldb/config/server.properties

這個命令將啟用 JMX 監控,並將監控資料暴露在 1099 連線埠上。

使用 JConsole 檢視 JMX 指標

可以使用 JConsole 工具來檢視 JMX 指標,例如:

jconsole $MY_IP:1099

這個命令將開啟 JConsole,並連線到指定的 JMX 連線埠。

監控檢查清單

以下是一些需要監控的專案:

  • Cluster monitoring:監控 Kafka 叢集的健康狀態,例如 under-replicated partitions、consumer lag 等。
  • Log monitoring:監控應用程式的日誌,例如錯誤日誌的數量等。
  • Metric monitoring:監控應用程式的指標,例如消費率、生產率等。
  • Custom instrumentation:監控自定義的業務指標。
  • Profiling:監控應用程式的效能,例如死鎖、熱點等。
  • Visualizations:視覺化監控資料,例如使用 Grafana 等工具。
  • Alerting:設定警示,例如當 SLO(Service-Level Objective)未達成時觸發警示。

監控架構圖示

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 監控檢查清單

rectangle "JMX" as node1
rectangle "Metrics" as node2
rectangle "Visualization" as node3
rectangle "Alerting" as node4

node1 --> node2
node2 --> node3
node3 --> node4

@enduml

此圖示展示了 Kafka Streams 的監控架構,包括 JMX、Prometheus、Grafana 和 Alertmanager 等元件。

程式碼範例與內容解密

以下是一個使用 Prometheus 和 Grafana 進行監控的範例程式碼:

import prometheus_client

# 定義一個 Prometheus 指標
counter = prometheus_client.Counter('my_counter', 'An example counter')

# 更新指標值
counter.inc()

# 啟動 Prometheus HTTP 伺服器
prometheus_client.start_http_server(8000)

內容解密:

  1. 首先,我們匯入了 prometheus_client 模組,這是 Prometheus 的 Python 客戶端函式庫。
  2. 然後,我們定義了一個名為 my_counter 的 Counter 指標,用於計數。
  3. 接著,我們更新了指標值,使用 inc() 方法將計數器加一。
  4. 最後,我們啟動了 Prometheus HTTP 伺服器,監聽在 8000 連線埠上。

這個範例展示瞭如何使用 Prometheus 和 Grafana 進行監控,包括定義指標、更新指標值和啟動 HTTP 伺服器等步驟。