在現代雲端基礎設施中,Kubernetes已然成為容器協調的主流標準。然而,其真正的價值不僅在於高效的容器管理,更在於其優異的擴充套件性。今天玄貓要帶領大家探討如何運用Kubernetes的擴充套件機制,建立一個能夠監控叢集外部資源的自定義運算元(Operator)。

為何需要外部資源監控?

在多年的技術諮詢經驗中,玄貓發現許多企業面臨著需要統一管理內外部資源的挑戰。像Deckhouse Commander、Argo CD和Crossplane等專案已經展現了Kubernetes管理外部資源的潛力,它們能夠透過叢集內的資源定義來操作外部的叢集或應用程式佈署。

深入理解Kubernetes控制器架構

Kubebuilder框架概觀

在開發Kubernetes控制器時,有多種工具可供選擇。其中controller-runtime是最基礎的函式庫Kubebuilder和Operator SDK則是建立在其上的高階框架。根據多年開發經驗,玄貓特別推薦使用Kubebuilder,因為它提供了更簡潔的開發體驗。

Kubebuilder建立的控制器主要包含三個核心元件:

  1. 管理器(Manager)

    • 負責協調各元件運作
    • 處理資源快取機制
    • 管理長官者選舉(Leader Election)
    • 負責系統訊號處理
  2. Webhook

    • 可選元件
    • 處理資源規格的預設值設定
    • 執行資源驗證
  3. 控制器(Controller)

    • 監控特定類別資源的變化
    • 處理相關子資源
    • 實作核心業務邏輯

控制器運作機制

當探討控制器的核心功能時,我們需要關注它如何處理資源事件。在玄貓實作的專案中,控制器的工作流程通常包含:

  1. 事件監聽:接收資源的建立、刪除和修改事件
  2. 事件過濾:根據業務需求篩選需要處理的事件
  3. 調解處理:透過Reconciler比較目標狀態與實際狀態,並執行必要的調整

對於叢集內部資源的監控,Kubebuilder提供了For和Owns方法來設定監聽機制。然而,當我們要處理外部資源時,就需要一個不同的方法。

在接下來的章節中,玄貓將展示如何擴充套件這個基礎框架,建立一個能夠監控外部HTTP伺服器的運算元。這個實作不僅展示了Kubernetes的擴充套件性,也為處理類別似的外部資源監控需求提供了參考範本。

在 Kubernetes 的生態系統中,Operator 模式已經成為管理複雜應用程式的標準方法。今天玄貓要帶大家探討如何開發一個監控外部 HTTP 伺服器的 Operator,這個案例將展示 Controller 如何管理叢集外部資源。

專案需求與架構設計

核心功能需求

我們要開發的 Operator 主要功能是監控叢集外部的 HTTP 伺服器。每當檢測到伺服器狀態變化時,Controller 會即時更新對應資源的狀態。這個案例雖然簡單,但足以展示 Controller 處理外部資源的核心概念。

技術架構

  • 使用 kubebuilder 框架開發 Operator
  • 實作自定義資源(Custom Resource)來描述監控目標
  • 建立控制器持續監控伺服器狀態
  • 使用 Go channel 處理非同步探測任務

專案初始化與環境設定

首先,我們需要建立專案結構:

mkdir probes && cd probes
kubebuilder init --domain network.io --repo probes

接著建立自定義資源:

kubebuilder create api --group test --version v1alpha1 --kind WebChecker --namespaced=true

定義資源結構

api/v1alpha1/webchecker_types.go 中定義我們的資源結構:

type WebCheckerSpec struct {
    Host string `json:"host"`
    // +kubebuilder:default:="/"
    Path string `json:"path,omitempty"`
}

type WebCheckerStatus struct {
    Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
}

type WebChecker struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`
    
    Spec   WebCheckerSpec   `json:"spec,omitempty"`
    Status WebCheckerStatus `json:"status,omitempty"`
}

這個結構定義了我們需要監控的 HTTP 伺服器資訊:

  • Host:伺服器主機位址
  • Path:要檢查的路徑,預設為 “/”
  • Conditions:用於記錄伺服器狀態的條件陣列

Controller 核心邏輯實作

internal/controller/webchecker_controller.go 中,我們需要定義一些重要的常數和資料結構:

const (
    WebCheckerWorkersCount           = 2
    WebCheckerProbeTaskChannelSize   = 100
    WebCheckerProbeResultChannelSize = 100
)

type WebCheckerState struct {
    Host          string
    Path          string
    LastCheckTime time.Time
    IsSuccessful  bool
    LastError     string
}

type WebCheckerProbeTask struct {
    NamespacedName types.NamespacedName
    Host           string
    Path           string
}

type WebCheckerProbeResult struct {
    NamespacedName types.NamespacedName
    IsSuccessful   bool
}

這些定義的意義:

  • WebCheckerWorkersCount:同時執行的工作者數量
  • WebCheckerProbeTaskChannelSize:探測任務佇列大小
  • WebCheckerProbeResultChannelSize:結果佇列大小
  • WebCheckerState:記錄檢查狀態的結構
  • WebCheckerProbeTask:探測任務的資料結構
  • WebCheckerProbeResult:探測結果的資料結構 接下來讓我們深入解析這個 Kubernetes Operator 的核心實作。首先,我們來看 WebCheckerReconciler 這個關鍵結構體:
// WebCheckerReconciler 用於調和 WebChecker 物件
type WebCheckerReconciler struct {
    client.Client
    Scheme *runtime.Scheme
    events chan event.GenericEvent
    mu sync.RWMutex
    webCheckersStates map[types.NamespacedName]WebCheckerState
    tasks chan WebCheckerProbeTask 
    tasksResults chan WebCheckerProbeResult
    cancelFunc context.CancelFunc
}

這個結構體包含了以下重要欄位:

  • Client:用於與 Kubernetes API 伺服器互動
  • Scheme:負責序列化和反序列化 Kubernetes 資源
  • events:事件通知通道
  • mu:用於狀態同步的讀寫鎖
  • webCheckersStates:維護所有 WebChecker 資源的狀態對映
  • tasks 與 tasksResults:處理檢查任務的通道
  • cancelFunc:用於取消操作的連貫的背景與環境函式

接著是核心的 Reconcile 方法實作:

func (r *WebCheckerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)
    logger.Info("Reconciling WebChecker", "request", req)

    var reconciledResource testv1alpha1.WebChecker
    
    // 從 API 伺服器取得資源
    if err := r.Get(ctx, req.NamespacedName, &reconciledResource); err != nil {
        logger.Error(err, "Failed to get WebChecker resource")
        if errors.IsNotFound(err) {
            r.mu.Lock()
            delete(r.webCheckersStates, req.NamespacedName)
            r.mu.Unlock()
        }
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // 處理資源刪除邏輯
    if reconciledResource.DeletionTimestamp != nil {
        logger.Info("WebChecker resource is being deleted")
        r.mu.Lock()
        delete(r.webCheckersStates, req.NamespacedName)
        r.mu.Unlock()
        return ctrl.Result{}, nil
    }

    // 處理資源建立與更新邏輯
    r.mu.Lock()
    defer r.mu.Unlock()
    
    var cachedState WebCheckerState
    var ok bool
    
    // 檢查是否為新建立的資源
    if cachedState, ok = r.webCheckersStates[req.NamespacedName]; !ok {
        r.webCheckersStates[req.NamespacedName] = WebCheckerState{
            Host: reconciledResource.Spec.Host,
            Path: reconciledResource.Spec.Path,
        }
        return ctrl.Result{}, nil
    }

    // 檢查資源是否需要更新
    if cachedState.Host != reconciledResource.Spec.Host || 
       cachedState.Path != reconciledResource.Spec.Path {
        r.webCheckersStates[req.NamespacedName] = WebCheckerState{
            Host: reconciledResource.Spec.Host,
            Path: reconciledResource.Spec.Path,
        }
        return ctrl.Result{}, nil
    }

    // 更新資源狀態
    webCheckerStatus := testv1alpha1.WebCheckerStatus{
        Conditions: []metav1.Condition{
            statusFromWebCheckerState(cachedState),
        },
    }

    reconciledResource.Status = webCheckerStatus
    if err := r.Status().Update(ctx, &reconciledResource); err != nil {
        logger.Error(err, "Failed to update WebChecker status")
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil
}

這個 Reconcile 方法實作了完整的資源生命週期管理:

  1. 資源取得

    • 使用 client.Get 從 API 伺服器取得資源
    • 處理資源不存在的情況
  2. 刪除處理

    • 檢查資源是否被標記為刪除
    • 從內部狀態中移除相關記錄
  3. 狀態管理

    • 維護資源的內部狀態
    • 處理新建立的資源
    • 檢測並處理資源更新
  4. 狀態更新

    • 根據內部狀態更新資源的狀態列位
    • 將更新後的狀態同步到 API 伺服器

這個控制器的設計展現了以下幾個重要的最佳實踐:

  1. 並發控制:使用互斥鎖確保狀態更新的安全性
  2. 資源管理:完整處理資源的建立、更新和刪除操作
  3. 錯誤處理:適當的錯誤處理和日誌記錄
  4. 狀態同步:確保內部狀態與叢集狀態的一致性

在實際應用中,這種設計模式能夠有效地管理自定義資源,並確保其在 Kubernetes 叢集中的正確運作。透過這種方式,我們可以擴充套件 Kubernetes 的功能,實作更複雜的業務邏輯和自動化流程。

// 監控網站健康狀態的任務排程器
func (r *WebCheckerReconciler) runTasksScheduler(ctx context.Context) {
    logger := log.FromContext(ctx)
    logger.Info("啟動任務排程器")

    // 建立定時器,每10秒執行一次檢查
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            logger.Info("排程器停止運作") 
            return
        case <-ticker.C:
            r.mu.Lock()
            // 遍歷所有需要監控的網站
            for namespacedName, state := range r.webCheckersStates {
                // 檢查是否需要執行新的檢查任務
                if time.Since(state.LastCheckTime) >= 10*time.Second {
                    // 建立新的檢查任務
                    task := WebCheckerTask{
                        NamespacedName: namespacedName,
                        Host:          state.Host,
                        Path:          state.Path,
                    }
                    // 將任務送入任務佇列
                    r.tasks <- task
                }
            }
            r.mu.Unlock()
        }
    }
}

// WebCheckerProbe 用於執行實際的網站健康檢查
type WebCheckerProbe struct {
    NamespacedName types.NamespacedName
    Host           string 
    Path           string
}

// PerformCheck 執行網站可用性檢查
func (p *WebCheckerProbe) PerformCheck(ctx context.Context) WebCheckerTaskResult {
    result := WebCheckerTaskResult{
        NamespacedName: p.NamespacedName,
        IsSuccessful:   false,
    }

    // 建立HTTP請求
    url := fmt.Sprintf("http://%s%s", p.Host, p.Path)
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        result.LastError = fmt.Sprintf("建立請求失敗: %v", err)
        return result
    }

    // 執行請求
    client := &http.Client{
        Timeout: 5 * time.Second,
    }
    resp, err := client.Do(req)
    if err != nil {
        result.LastError = fmt.Sprintf("執行請求失敗: %v", err)
        return result
    }
    defer resp.Body.Close()

    // 檢查HTTP狀態碼
    if resp.StatusCode >= 200 && resp.StatusCode < 300 {
        result.IsSuccessful = true
    } else {
        result.LastError = fmt.Sprintf("HTTP狀態碼異常: %d", resp.StatusCode)
    }

    return result
}
  1. runTasksScheduler 函式
  • 這是一個持續執行的排程器,負責定期觸發網站健康檢查
  • 使用 time.Ticker 設定每10秒執行一次檢查
  • 使用互斥鎖(mutex)保護分享資源的存取
  • 檢查每個網站的最後檢查時間,若超過10秒則建立新的檢查任務
  1. WebCheckerProbe 結構體
  • 定義了執行網站健康檢查所需的基本資訊
  • 包含名稱空間、主機名稱和路徑等資訊
  • 代表一個具體的檢查任務實體
  1. PerformCheck 方法
  • 實作實際的網站健康檢查邏輯
  • 使用 HTTP GET 請求檢查目標網站的可用性
  • 設定5秒的請求超時間
  • 根據 HTTP 狀態碼判斷網站是否正常運作
  • 記錄檢查結果與錯誤訊息

這段程式碼展示了一個完整的網站監控系統的核心功能:

  • 定期排程執行檢查的機制
  • 具體執行檢查的邏輯
  • 錯誤處理和結果回報
  • 使用 Go 語言的併發特性處理多個檢查任務

這樣的設計讓系統能夠有效地監控多個網站的健康狀態,並及時發現異常情況。

在開發Kubernetes控制器時,週期性檢查和狀態管理是兩個關鍵的技術要點。今天就讓玄貓帶大家探討如何實作一個具有週期性檢查功能的自定義資源控制器。

實作週期性檢查邏輯

首先,讓我們來看控制器的核心檢查邏輯:

// 週期性檢查機制
defer ticker.Stop()
for {
    select {
    case <-ticker.C:
        r.mu.RLock()
        for namespacedName, state := range r.webCheckersStates {
            now := time.Now()
            // 若資源超過10秒未檢查,建立新的檢查任務
            if now.Sub(state.LastCheckTime) > 10*time.Second {
                r.tasks <- WebCheckerProbeTask{
                    NamespacedName: namespacedName,
                    Host:           state.Host,
                    Path:           state.Path,
                }
            }
        }
        r.mu.RUnlock()
    case <-ctx.Done():
        return
    }
}
  1. defer ticker.Stop():確保在函式結束時停止計時器,避免資源洩漏
  2. select結構:同時處理計時器事件和連貫的背景與環境取消訊號
  3. 使用讀寫鎖RLock()確保在讀取狀態時的執行緒安全
  4. 檢查邏輯使用時間差值判斷,若超過10秒則建立新的檢查任務

HTTP檢查機制實作

接下來看如何實作HTTP可用性檢查:

package controller

import (
    "context"
    "fmt"
    "net/http"
    "slices"
    "time"
    "k8s.io/apimachinery/pkg/types"
)

var (
    successfulStatuses = []int{200, 300, 301, 302, 303}
)

type WebCheckerProbe struct {
    NamespacedName types.NamespacedName
    Host           string
    Path           string
}

func (p *WebCheckerProbe) PerformCheck(ctx context.Context) WebCheckerProbeResult {
    c := http.Client{
        Timeout: 1 * time.Second,
    }
    
    req, err := http.NewRequest("GET", p.Host+p.Path, nil)
    if err != nil {
        return WebCheckerProbeResult{
            NamespacedName: p.NamespacedName,
            IsSuccessful:   false,
            LastError:      err.Error(),
        }
    }
    
    resp, err := c.Do(req)
    if err != nil {
        return WebCheckerProbeResult{
            NamespacedName: p.NamespacedName,
            IsSuccessful:   false,
            LastError:      err.Error(),
        }
    }
    defer resp.Body.Close()
    
    if slices.Contains(successfulStatuses, resp.StatusCode) {
        return WebCheckerProbeResult{
            NamespacedName: p.NamespacedName,
            IsSuccessful:   true,
            LastError:      "",
        }
    }
    
    return WebCheckerProbeResult{
        NamespacedName: p.NamespacedName,
        IsSuccessful:   false,
        LastError:      fmt.Sprintf("Status code: %d", resp.StatusCode),
    }
}
  1. successfulStatuses:定義成功的HTTP狀態碼清單
  2. WebCheckerProbe結構:封裝檢查所需的基本資訊
  3. PerformCheck方法:
    • 設定1秒超時的HTTP客戶端
    • 建立並執行GET請求
    • 根據回應狀態碼判定檢查結果
    • 使用defer確保回應體正確關閉
  4. 錯誤處理:詳細記錄錯誤資訊,便於問題診斷

控制器的啟動與佈署

在主程式中設定控制器:

webCheckerController := controller.NewWebChecker(mgr.GetClient(), mgr.GetScheme())
if err = webCheckerController.SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "WebChecker")
    os.Exit(1)
}
webCheckerController.RunWorkers(context.Background())

佈署測試資源:

apiVersion: test.network.io/v1alpha1
kind: WebChecker
metadata:
  name: flant
spec:
  host: http://flant.ru
apiVersion: test.network.io/v1alpha1
kind: WebChecker
metadata:
  name: badnews
spec:
  host: http://badnews.me

在實際開發過程中,玄貓發現這種設計雖然簡單,但已經能滿足基本的監控需求。不過在生產環境中,我們還需要考慮更多的最佳化點,例如連線池管理、重試機制、更細緻的錯誤處理等。特別是在高併發場景下,這些最佳化對系統的穩定性至關重要。

從玄貓多年的經驗來看,實作自定義控制器時,保持程式碼的簡潔和可維護性非常重要。雖然可以加入更多進階功能,但首要目標是確保核心功能的可靠性和可測試性。這個基礎實作為後續的功能擴充套件提供了良好的起點。

在Kubernetes生態系統中,自定義控制器扮演著越來越重要的角色。透過這樣的實作,我們不僅能更好地理解Kubernetes的運作機制,還能根據實際需求開發更符合業務場景的解決方案。後續若要擴充套件功能,建議優先考慮加入指標收集、告警機制等維運必需功能。

在多年開發和維運 Kubernetes 叢集的經驗中,玄貓發現有效監控外部服務的可用性是確保系統穩定性的關鍵。今天就讓我分享如何透過建立自定義的 Kubernetes Operator 來實作這個目標。

Operator 狀態檢查實作

讓我們先來看如何檢查我們佈署的資源狀態。以下是兩個實際的監控範例:

監控範例一:無法存取的服務

apiVersion: test.network.io/v1alpha1
kind: WebChecker
metadata:
  name: badnews
  namespace: default
spec:
  host: http://badnews.me
  path: /
status:
  conditions:
    - lastTransitionTime: "2025-02-04T12:33:07Z"
      message: 'Get "http://badnews.me/": dial tcp: lookup badnews.me: no such host'
      reason: WebResourceReady
      status: "False"
      type: Ready

監控範例二:正常運作的服務

apiVersion: test.network.io/v1alpha1
kind: WebChecker
metadata:
  name: flant
  namespace: default
spec:
  host: http://flant.ru
  path: /
status:
  conditions:
    - lastTransitionTime: "2025-02-04T12:33:02Z"
      message: WebChecker is ready
      reason: WebResourceReady
      status: "True"
      type: Ready

從這兩個範例中,我們可以清楚看到 Operator 如何監控不同狀態的服務。在第一個例子中,服務無法存取,Operator 記錄了詳細的錯誤資訊。而第二個例子顯示了正常運作的服務狀態。

實作細節與技術考量

在開發這個監控系統時,玄貓特別注意了幾個關鍵點:

  1. 狀態追蹤的精確性:我們使用 conditions 欄位來記錄服務的詳細狀態,包含最後轉換時間、具體訊息和狀態原因。

  2. 錯誤處理的完整性:當服務無法存取時,系統提供了詳細的錯誤訊息,這對於問題診斷非常重要。

  3. 即時性:透過 lastTransitionTime 欄位,我們可以準確追蹤狀態變更的時間點。

在實際佈署中,玄貓建議根據以下準則來擴充套件這個基礎框架:

  • 建立更完善的重試機制
  • 實作更細緻的錯誤分類別
  • 加入警示通知功能
  • 提供詳細的監控指標

透過這個 Operator,我們不僅可以監控簡單的 HTTP 服務,還可以擴充套件到更複雜的應用場景。例如,在我之前負責的一個大型金融科技專案中,我們就根據類別似的架構,建立了一個可以監控多種外部服務(包括資料函式庫息佇列和第三方 API)的綜合監控系統。

雖然本文展示的是一個基礎範例,但這個架構具有極大的擴充套件潛力。從我的經驗來看,這種方式特別適合需要深度整合 Kubernetes 生態系統的場景,例如多雲端環境的服務健康檢查,或是微服務架構中的依賴服務監控。

在深入實作自己的 Operator 之前,建議先充分理解 Kubernetes 的控制器模式,並仔細評估是否真的需要這樣的客製化解決方案。有時候,現有的監控工具可能已經足夠滿足需求。但如果確實需要深度整合和客製化,這個方案將是一個很好的起點。

透過這個實作經驗,我們不僅學會瞭如何建立基本的 Kubernetes Operator,更重要的是理解了如何將外部服務監控整合到 Kubernetes 的原生管理流程中。這種整合方式不僅提高了系統的可維護性,也為未來的擴充套件提供了堅實的基礎。