Fluentd 作為一款開源日誌收集器,其靈活性和擴充套件性仰賴於豐富的外掛生態。本文將引導讀者開發一個 Redis 列表外掛,包含輸入和輸出兩個部分,以實作與 Redis 的高效日誌互動。輸出外掛首先探討了同步和非同步處理模式,並以同步批次操作提升效率。接著,輸入外掛則以輪詢機制從 Redis 列表讀取資料,並解析 JSON 格式的日誌事件。文章同時也涵蓋了 Fluentd 的緩衝機制,說明如何利用緩衝區最佳化 I/O 效能。為了確保外掛的品質,示範瞭如何使用單元測試框架驗證功能的正確性,以及如何使用工具生成完善的檔案。最後,文章也提到了程式碼重構的重要性,以提升外掛的可維護性。
實作 Redis 輸出邏輯
在成功組態、啟動和關閉外掛後,下一步是實作將事件傳送到 Redis 的邏輯。有幾種方法可以執行此邏輯:
實作方法選擇
- 同步處理:透過實作
def process(tag, es)方法處理每個事件。這是最直接的方法,但效能較低,因為它不使用任何緩衝。 - 同步緩衝:透過實作
def write(chunk)方法輸出資料。 - 非同步緩衝:透過實作
def try_write(chunk)方法輸出資料。
是否使用 <buffer> 區段的組態將決定採用哪種實作方法。除非組態了某些覆寫標準行為的設定,否則將採用同步模型進行第一個實作。
同步模型實作流程
我們的實作過程需要標記傳遞的事件流 (es) 並迭代處理事件。由於流中可能包含多個事件,因此可以透過告訴 Redis 批次執行事件插入來提高效率。這是透過使用 redis.multi 命令告訴 Redis 它將接收多個事務呼叫來實作的。迭代完事件後,使用 redis.exec 呼叫告訴 Redis 執行事務。
迭代處理事件時的必要操作
- 建立日誌事件的 JSON 表示:如果檢視輸出介面,會注意到有一個預定義的格式化函式。我們選擇不覆寫或使用它,因為不想影響此方法在外掛基礎類別中的其他應用;因此,可以以任何所需的方式格式化表示,例如使用 msgpack。
- 執行 Redis 列表推播功能。
程式碼實作
def redisFormat(tag, time, record)
redis_entry = Hash.new
redis_entry.store(RedislistOutput::TagAttributeLabel, tag.to_s)
redis_entry.store(RedislistOutput::TimeAttributeLabel, time.to_i)
redis_entry.store(RedislistOutput::RecordAttributeLabel, record.to_s)
redis_out = JSON.generate(redis_entry)
return redis_out
end
def write(chunk)
log.trace "write:", chunk
@redis.multi
chunk.each do |time, record|
log.debug "write sync redis push ", chunk.metadata.tag, time, record, @listname
@redis.lpush(@listname, redisFormat(chunk.metadata.tag, time, record))
end
@redis.exec
end
內容解密:
redisFormat方法:將日誌事件轉換為 JSON 表示。建立一個雜湊結構來儲存事件的標籤、時間和記錄,並將其轉換為 JSON 字串。write方法:實作同步緩衝輸出邏輯。首先記錄追蹤資訊,然後告訴 Redis 接受多個陳述式。使用lpush命令將格式化後的 JSON 事件推播到 Redis 列表中。最後,執行所有陳述式。
測試同步輸出
重新啟動 Fluentd,並使用 Redis 命令檢查 Redis 中的列表結構和彈出條目。預期結果是 Redis 列表結構會增長並包含 JSON 內容,例如:
{"tag": "dummy", "time": "2014-12-14 23:23:38", "record": {"hello": "world", "counter":1}}
日誌事件的計數器值會隨著源組態而遞增,因此 Redis 列表長度與計數器屬性值之間應該存在相關性。可以使用 Redis CLI 中的 llen 命令確認這一點。
繼續完成輸入外掛的實作
在實作其他寫入方法之前,先完成輸入外掛的實作。使用相同的工具生成輸入外掛的骨架檔案。需要實作一些不同的函式,因為生成的程式碼擴充套件了不同的基礎類別。
輸入外掛實作重點
- 需要實作不同於輸出外掛的函式。
- 使用工具生成輸入外掛骨架,並根據需要進行修改。
Redis 輸入外掛的實作
Redis 的輸入外掛本質上是一種輪詢活動,因為大多數解決方案不支援回撥或 Webhook(值得注意的是,Redis 有一個 Webhook 的概念)。這意味著我們需要一個組態值來決定外掛需要多快地輪詢 Redis。與輸出外掛一樣,我們需要連線到 Redis 的必要資訊。對於後者,我們可以複製為輸出外掛編寫的程式碼。雖然這不符合 DRY(不要重複自己)這一優秀的編碼原則,但稍後我們有很多機會來改進我們的程式碼。
組態屬性處理
輸入外掛處理組態屬性的方法與輸出外掛相同。有兩個關鍵函式需要在輸入外掛上實作:處理 run 命令和 emit 函式(如清單 9.6 所示)。run 方法將負責啟動我們的排程執行緒。emit 函式處理呼叫 Redis 並將日誌事件傳送到 Fluentd 中由組態檔案定義的下一個程式。
emit 函式實作
def emit
log.trace "emit triggered"
if !@redis
log.debug "reconnecting Redis ", @hostaddr, ":", @port
connect_redis()
end
if @redis
keep_popping = true
while keep_popping
if (@fifo)
popped = @redis.rpop(@listname)
else
popped = @redis.lpop(@listname)
end
log.debug "Popped", @listname, ": ", popped
if popped
data = JSON.parse(popped)
if (@use_original_time)
time = data[TimeAttributeLabel]
else
time = Fluent::EventTime.now
end
if (@use_original_tag)
tag = data[RedislistInput::TagAttributeLabel]
else
tag = @tag
end
data_record = data.fetch(RecordAttributeLabel).to_s
log.debug "original data record=>", data_record
if (@add_original_time && !(data_record.include? '"' + @add_original_time_name + '"'))
data_record = inject_original_value(data, data_record, RedislistInput::TimeAttributeLabel, @add_original_time_name)
end
if @add_original_tag && !(data_record.include? '"' + @add_original_tag_name + '"')
data_record = inject_original_value(data, data_record, RedislistInput::TagAttributeLabel, @add_original_tag_name)
end
log.debug "Emitting -->", tag, " ", time, " ", data_record
router.emit(tag, time, data_record)
else
keep_popping = false
end
end
else
log.warn "No Redis - ", @redis
end
end
內容解密:
emit方法觸發時,首先檢查是否已建立 Redis 連線。如果沒有,則嘗試重新連線。@redis存在時,進入迴圈持續從 Redis 列表中彈出資料,直到沒有更多資料可供處理。- 根據
@fifo組態決定使用rpop或lpop從列表中取出資料,實作 FIFO 或 LIFO 的行為。 - 解析彈出的 JSON 資料,並根據組態決定是否使用原始的時間戳和標籤,或使用目前時間和預設標籤。
- 將原始資料轉換為字串,並根據組態新增原始時間和標籤到資料記錄中(如果尚未存在)。
- 最終,透過
router.emit將處理好的日誌事件傳送到 Fluentd 的下一個處理步驟。
run 方法實作
def run
log.trace ("run triggered")
while thread_current_running?
current_time = Time.now.to_i
emit() if thread_current_running?
while thread_current_running? && Time.now.to_i <= current_time
sleep @run_interval
end
end
end
內容解密:
run方法在執行緒執行期間持續執行。- 每次迴圈開始時,記錄當前時間戳,並呼叫
emit方法處理日誌事件。 - 隨後進入內層迴圈,根據
@run_interval組態間隔時間後再次檢查是否需要繼續執行。
輸入與輸出外掛測試
實作輸入外掛後,可以進行簡單測試。可以將輸入和輸出外掛都納入單一組態中,但日誌資訊會混雜在一起,因此可以啟動兩個 Fluentd 例項,每個例項使用自己的組態檔案,這樣可以更容易觀察發生了什麼。
使用緩衝區擴充套件輸出外掛
透過使用緩衝區,可以最佳化 I/O 處理過程。目前已經透過組態同步處理程式將 Redis 的推播操作批次處理,從而提升了效能。進一步,可以利用緩衝區將更大的資料群組以單一交易在 Redis 中處理。
Fluentd 緩衝機制
Fluentd 預設提供了兩種緩衝區實作:檔案和記憶體。對於針對記憶體解決方案的外掛,使用記憶體緩衝區更有意義。
圖表說明:Fluentd 輸出外掛處理流程圖
此圖示展示了 Fluentd 輸出外掛的處理流程,根據是否使用緩衝區以及緩衝區的處理方式(同步或非同步),最終將日誌事件傳送至 Redis。
9.8.1 改善場景:將可維護性付諸實踐
我們已經展示了 Redis 列表外掛的雛型實作,說明瞭如何開發新的外掛。在開發過程中,輸入和輸出程式碼之間存在一些共通性。因此,我們獲得了改進的許可。首先,我們將重構輸入和輸出來使用共同的基底類別。
9.9 單元測試
到目前為止,我們所進行的測試是手動的,這並不是最佳實踐。在實際開發中,我們應該先進行單元測試,然後再進行其他測試。理想情況下,程式碼的變更應該觸發持續整合和持續交付流程,自動執行單元測試和端對端測試。
Fluentd 團隊已經開發了一些支援函式庫,可以與任何主要的單元測試框架一起使用,包括 test-unit、RSpec 和 minitest。我們的範例使用 test-unit,因為它是一個被廣泛採用的框架,並且與其他主要單元測試框架(如 NUnit、JUnit 等)類別似。
當我們使用工具生成外掛骨架並產生主要的 Ruby 程式碼時,該工具還在基礎外掛資料夾中生成了一個名為 test 的資料夾結構。這包括提供一個骨架類別來幫助我們開始。測試類別與外掛具有相同的名稱,但具有 test_ 字首。這是一小段輔助程式碼(helper.rb),位於測試路徑的根目錄下。這將框架的輔助程式碼載入到 test-unit 工具中。
為了說明可能性,我們為輸出外掛建立了幾個測試,如清單 9.8 所示。這些測試重點驗證組態相關邏輯,該邏輯驅動外掛的行為。這是透過使用 Fluentd 測試框架的一部分來實作的,該框架實作了不同型別的驅動程式。所需的驅動程式型別由外掛型別決定,並模擬 Fluentd 的核心。我們可以使用驅動程式觸發必要的操作,包括將日誌事件饋送到外掛。驅動程式還提供了檢索和評估結果的方法,例如事件透過特定階段(例如 emit、write)的次數。可以使用驅動程式存取和檢查已處理的事件來進行評估。
使用 Plantuml 圖表呈現測試流程
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title Redis 日誌外掛開發與實踐
package "資料庫架構" {
package "應用層" {
component [連線池] as pool
component [ORM 框架] as orm
}
package "資料庫引擎" {
component [查詢解析器] as parser
component [優化器] as optimizer
component [執行引擎] as executor
}
package "儲存層" {
database [主資料庫] as master
database [讀取副本] as replica
database [快取層] as cache
}
}
pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中
master --> replica : 資料同步
note right of cache
Redis/Memcached
減少資料庫負載
end note
@enduml此圖示描述了測試流程,從載入測試框架到評估結果。
內容解密:
- 測試流程:圖表展示了從開始測試到評估結果的整個流程。
graph LR:表示圖表採用從左到右的佈局方式。- 節點定義:每個節點代表測試流程中的一個步驟,例如「開始測試」、「載入測試框架」等。
- 連線線:箭頭表示步驟之間的流程方向,表明每個步驟的先後順序。
程式碼範例:組態相關邏輯驗證
def test_config
# 組態相關邏輯驗證
config = {
'@type' => 'redislist',
'listname' => 'test_list'
}
# 使用 Fluentd 測試框架驗證組態
driver = Fluent::Test::OutputTestDriver.new(Fluent::Plugin::RedisListOutput, 'tag')
driver.configure(config)
assert_equal('test_list', driver.instance.listname)
end
內容解密:
test_config方法:定義了一個測試案例,用於驗證組態相關邏輯。config雜湊:包含了外掛的組態選項,例如@type和listname。Fluent::Test::OutputTestDriver:使用 Fluentd 測試框架建立輸出測試驅動程式。driver.configure(config):將組態應用於驅動程式例項。assert_equal:斷言listname組態是否正確設定為test_list。
將單元測試的開發投入實際應用
在開發單元測試的過程中,我們不僅需要驗證外掛的功能是否正確,還需要確保測試的全面性和有效性。單元測試框架提供了一系列工具來幫助我們實作這一目標,例如測試驅動開發(TDD)和行為驅動開發(BDD)。
測試輸出外掛的組態和寫入操作
為了測試輸出外掛的組態和寫入操作,我們需要建立一個測試使用案例。在這個測試使用案例中,我們將驗證外掛是否能夠正確地讀取組態資訊,並將資料寫入到 Redis 列表中。
test 'advanced config' do
conf = %[
host 127.0.0.1
port 24229
]
captured_string = capture_stdout do
d = create_driver(conf)
assert_equal 24229, d.instance.port
assert_equal '127.0.0.1', d.instance.hostaddr
end
assert_true (captured_string.include? "Non standard Redis port in use")
d.shutdown
end
內容解密:
- 測試使用案例定義:使用
test方法定義了一個名為 ‘advanced config’ 的測試使用案例。 - 組態資訊:定義了一個包含主機和埠資訊的組態字串
conf。 - 捕捉標準輸出:使用
capture_stdout方法捕捉在程式碼塊中輸出的標準輸出內容。 - 建立驅動例項:使用
create_driver方法建立了一個驅動例項d,並傳入組態資訊conf。 - 斷言驗證:使用
assert_equal方法驗證了驅動例項的埠和主機地址是否正確。 - 檢查警告資訊:使用
assert_true方法驗證了捕捉的標準輸出中是否包含特定的警告資訊。 - 關閉驅動例項:呼叫了
shutdown方法關閉驅動例項,釋放資源。
執行單元測試
要執行單元測試,我們可以使用 Ruby 直譯器直接執行測試檔案。例如:
ruby Chapter9/fluent-plugin-out-redislist/test/plugin/test_out_redislist.rb
將單元測試開發投入實際應用
在之前的章節中,我們已經確定了需要透過執行 Fluentd 不同的組態檔案來測試不同的組態。這些應該被替換為單元測試。由於輸出外掛限制了我們使用緩衝的方式,我們需要進一步測試組態處理。這最好透過單元測試來完成。
封裝和佈署
完成測試後,我們可以考慮封裝和佈署我們的外掛。這包括準備後設資料檔案和檔案。
檔案編寫
編寫檔案是封裝解決方案的重要部分。範本實用程式將提供標準的許可證檔案和基本的 README 檔案。最重要的事情是確保 README 檔案清晰完整。像任何好的產品一樣,外掛的成功使用取決於人們對如何使用它的理解,因此良好的檔案將產生有意義的影響。
使用 Fluentd 提供的工具生成檔案
Fluentd 提供了一個實用程式 fluent-plugin-config-format,可以根據外掛的引數生成檔案。我們可以使用這個工具來生成外掛組態資訊的檔案。
fluent-plugin-config-format output redislist -f markdown -p lib/fluent/plugin/
使用 RDoc 或 YARD 生成開發者檔案
我們還可以使用 RDoc 或 YARD 來生成開發者級別的檔案。YARD 提供了一些額外的功能,因此我們選擇使用 YARD。
yard doc lib/fluent/plugin/out_redislist.rb
yard doc lib/fluent/plugin/in_redislist.rb
內容解密:
- 生成檔案:使用
fluent-plugin-config-format命令生成外掛組態資訊的檔案。 - 指定格式:使用
-f引數指定生成的檔案格式為 Markdown。 - 指定外掛路徑:使用
-p引數指定外掛程式碼的路徑。 - YARD 檔案生成:使用
yard doc命令生成開發者級別的檔案。 - 指定 Ruby 檔案:指定需要生成檔案的 Ruby 檔案路徑。