當企業將單體應用程式拆解為數十個甚至上百個微服務時,服務間的通訊安全立即成為資訊安全團隊必須面對的挑戰。傳統的網路邊界防護已無法有效保護這些服務之間的東西向流量,任何一個服務的安全漏洞都可能成為攻擊者橫向移動的跳板。Istio 服務網格透過在每個服務旁部署 Envoy Sidecar Proxy 的方式,將安全控制從應用程式層剝離出來,讓開發團隊能專注於業務邏輯開發,同時由基礎設施層統一處理身份驗證、流量加密與授權控制等安全機制。

零信任架構的設計理念

Istio 的安全架構建立在零信任模型之上,這個模型的核心原則是預設不信任任何請求來源。無論請求來自叢集內部的服務或是外部的客戶端,系統都會要求進行身份驗證並檢查授權許可。這種設計徹底改變了傳統的「信任但驗證」思維,轉變為「永不信任,持續驗證」的安全策略。當攻擊者即使成功滲透到叢集內部,也無法輕易地橫向移動到其他服務,因為每一次服務呼叫都需要經過嚴格的身份驗證與授權檢查。

服務網格的安全防護可以從三個層面來理解。傳輸層安全透過 mTLS 加密所有服務間的通訊內容,防止中間人攻擊與資料竊聽。身份驗證機制確保每個發起請求的實體都具有可驗證的身份,這包含 Peer Authentication 用於服務對服務的身份驗證,以及 Request Authentication 用於終端使用者的身份驗證。授權控制層則決定哪些已驗證的身份可以存取哪些資源,實現最小權限原則的存取管理。這三個層面相互配合,共同建構起完整的縱深防禦體系。

SPIFFE 身份識別框架的運作機制

SPIFFE 框架為分散式系統中的工作負載提供了標準化的身份識別方式。在 Kubernetes 環境中,每個服務帳戶都會對應到一個唯一的 SPIFFE ID,其格式遵循 spiffe://cluster.local/ns/{namespace}/sa/{serviceaccount} 的規範。Istio 的 Citadel 元件負責為每個工作負載簽發 X.509 證書,這些證書會定期自動輪換以維持安全性。透過 SPIFFE 標準,不同廠商的服務網格實作之間能夠實現互通,為多叢集與多雲端的部署場景奠定基礎。

當服務 A 需要呼叫服務 B 時,mTLS 雙向認證流程會自動在背景執行。應用程式本身完全不需要處理任何 TLS 相關邏輯,所有的加密與解密操作都由 Envoy Proxy 代為處理。服務 A 的 Envoy Proxy 會載入由 Citadel 簽發的證書與私鑰,這些憑證通常掛載在 /var/run/secrets/ 路徑下。當建立 TLS 連線時,雙方的 Envoy Proxy 會互相驗證對方證書的有效性,確認證書是否由受信任的憑證授權單位簽發,以及 SPIFFE ID 是否符合預期格式。完成驗證後,所有應用層資料都會經過加密傳輸,即使在不安全的網路環境中也能確保資料的機密性與完整性。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 14
skinparam minClassWidth 100

participant "服務 A\n應用程式" as AppA
participant "Envoy Proxy A" as ProxyA
participant "Envoy Proxy B" as ProxyB
participant "服務 B\n應用程式" as AppB

AppA -> ProxyA: 發起 HTTP 請求
note right of AppA
  應用程式無需處理
  任何加密邏輯
end note

ProxyA -> ProxyA: 載入 SPIFFE 證書
note right of ProxyA
  證書路徑:
  /var/run/secrets/
end note

ProxyA -> ProxyB: 建立 TLS 連線
ProxyA -> ProxyB: 傳送客戶端證書

ProxyB -> ProxyB: 驗證客戶端證書
note right of ProxyB
  檢查 CA 簽章
  檢查 SPIFFE ID
  檢查有效期限
end note

ProxyB -> ProxyA: 傳送伺服器證書

ProxyA -> ProxyA: 驗證伺服器證書
ProxyA -> ProxyB: TLS 交握完成

ProxyA -> ProxyB: 傳送加密資料

ProxyB -> ProxyB: 執行授權檢查
note right of ProxyB
  根據 AuthorizationPolicy
  決定是否允許存取
end note

alt 授權通過
    ProxyB -> AppB: 轉發請求
    AppB -> ProxyB: 回傳處理結果
    ProxyB -> ProxyA: 傳送加密回應
    ProxyA -> AppA: 解密後回傳
else 授權拒絕
    ProxyB -> ProxyA: 回傳 403 Forbidden
    note right of ProxyB
      記錄拒絕事件
      供安全稽核
    end note
end

@enduml

mTLS 配置模式與最佳實務

Istio 提供三種 mTLS 配置模式供管理者選擇。STRICT 模式要求所有流量都必須使用 mTLS 加密,這是生產環境建議的配置方式,能提供最高等級的安全防護。PERMISSIVE 模式同時接受 mTLS 與純文字流量,適合用於遷移期間讓新舊服務能夠共存。DISABLE 模式則完全關閉 mTLS 功能,僅建議在特殊測試場景使用。透過 PeerAuthentication 資源可以在網格層級、命名空間層級或工作負載層級設定這些模式,實現彈性的安全策略配置。

以下的 Python 程式碼實作了一個完整的 Istio 安全監控工具,能夠自動掃描叢集中的所有安全配置,分析 mTLS 覆蓋率,識別潛在的安全風險,並產生詳細的安全報告。這個工具支援透過 Kubernetes Python 客戶端直接存取 API Server,也可以在套件不可用時退回使用 kubectl 命令列工具,確保在各種環境下都能正常運作。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Istio 服務網格安全監控工具

提供完整的 Istio 安全配置監控功能,包括 mTLS 狀態檢測、
授權策略分析、安全風險識別與自動化報告產生。

作者:玄貓(BlackCat)
版本:1.0.0
"""

import json
import logging
import subprocess
import sys
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

# 嘗試匯入 Kubernetes Python 客戶端
# 如果套件不可用,將使用 kubectl 命令列工具作為替代方案
try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException
    KUBERNETES_AVAILABLE = True
except ImportError:
    KUBERNETES_AVAILABLE = False

# 配置日誌記錄器,使用模組名稱作為記錄器識別
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

class MTLSMode(Enum):
    """
    mTLS 模式列舉類別
    
    定義 Istio 支援的三種 mTLS 運作模式:
    - STRICT: 僅接受 mTLS 加密連線
    - PERMISSIVE: 同時接受 mTLS 與純文字連線
    - DISABLE: 完全停用 mTLS 功能
    - UNSET: 未明確設定,將繼承上層配置
    """
    STRICT = "STRICT"
    PERMISSIVE = "PERMISSIVE"
    DISABLE = "DISABLE"
    UNSET = "UNSET"

class AuthorizationAction(Enum):
    """授權策略動作類別"""
    ALLOW = "ALLOW"      # 允許符合規則的請求
    DENY = "DENY"        # 拒絕符合規則的請求
    CUSTOM = "CUSTOM"    # 使用外部授權服務
    AUDIT = "AUDIT"      # 僅記錄但不強制執行

class SecurityRiskLevel(Enum):
    """安全風險等級分類"""
    CRITICAL = auto()  # 嚴重:需立即處理的重大安全漏洞
    HIGH = auto()      # 高風險:重要的安全問題
    MEDIUM = auto()    # 中風險:一般性的安全改善建議
    LOW = auto()       # 低風險:輕微的最佳實務建議
    INFO = auto()      # 資訊:僅供參考的提示訊息

@dataclass
class ServiceIdentity:
    """
    服務身份識別資料類別
    
    封裝 SPIFFE ID 的組成元素,提供便利方法建構與解析服務身份。
    
    屬性:
        namespace: 服務所在的 Kubernetes 命名空間
        service_account: Kubernetes 服務帳戶名稱
        trust_domain: SPIFFE 信任網域,預設為 cluster.local
    """
    namespace: str
    service_account: str
    trust_domain: str = "cluster.local"

    @property
    def spiffe_id(self) -> str:
        """
        產生符合 SPIFFE 規範的完整身份識別碼
        
        回傳格式:spiffe://<trust_domain>/ns/<namespace>/sa/<service_account>
        """
        return (
            f"spiffe://{self.trust_domain}/"
            f"ns/{self.namespace}/"
            f"sa/{self.service_account}"
        )

    @classmethod
    def from_spiffe_id(cls, spiffe_id: str) -> Optional['ServiceIdentity']:
        """
        從 SPIFFE ID 字串解析出服務身份資訊
        
        參數:
            spiffe_id: 完整的 SPIFFE ID 字串
            
        回傳:
            ServiceIdentity 物件,解析失敗時回傳 None
        """
        try:
            if not spiffe_id.startswith("spiffe://"):
                return None

            # 解析格式:spiffe://trust_domain/ns/namespace/sa/service_account
            parts = spiffe_id[9:].split("/")
            if len(parts) != 5 or parts[1] != "ns" or parts[3] != "sa":
                return None

            return cls(
                trust_domain=parts[0],
                namespace=parts[2],
                service_account=parts[4]
            )
        except Exception as e:
            logger.warning(f"解析 SPIFFE ID 失敗:{e}")
            return None

@dataclass
class PeerAuthenticationConfig:
    """
    PeerAuthentication 配置資料類別
    
    封裝單一 PeerAuthentication 資源的完整配置資訊。
    """
    name: str
    namespace: str
    mtls_mode: MTLSMode
    selector: Optional[Dict[str, str]] = None
    port_level_mtls: Dict[int, MTLSMode] = field(default_factory=dict)

    @property
    def is_mesh_wide(self) -> bool:
        """檢查是否為網格層級的配置(位於 istio-system 命名空間且名為 default)"""
        return self.namespace == "istio-system" and self.name == "default"

    @property
    def is_namespace_wide(self) -> bool:
        """檢查是否為命名空間層級的配置(無 selector 且非網格層級)"""
        return self.selector is None and not self.is_mesh_wide

@dataclass
class AuthorizationPolicyConfig:
    """
    AuthorizationPolicy 配置資料類別
    
    封裝授權策略的完整設定,包括動作類型、目標選擇器與授權規則。
    """
    name: str
    namespace: str
    action: AuthorizationAction
    selector: Optional[Dict[str, str]] = None
    rules: List[Dict[str, Any]] = field(default_factory=list)

    @property
    def is_allow_all(self) -> bool:
        """檢查是否為允許所有請求的策略"""
        if self.action != AuthorizationAction.ALLOW:
            return False
        # 空規則代表允許所有請求
        return any(not rule for rule in self.rules)

    @property
    def is_deny_all(self) -> bool:
        """檢查是否為拒絕所有請求的策略(空 spec)"""
        return len(self.rules) == 0 and self.action != AuthorizationAction.ALLOW

@dataclass
class SecurityFinding:
    """
    安全發現資料類別
    
    記錄安全檢查過程中發現的問題、風險等級與改善建議。
    """
    title: str
    description: str
    risk_level: SecurityRiskLevel
    affected_resources: List[str] = field(default_factory=list)
    recommendation: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """轉換為字典格式以便序列化輸出"""
        return {
            "title": self.title,
            "description": self.description,
            "risk_level": self.risk_level.name,
            "affected_resources": self.affected_resources,
            "recommendation": self.recommendation
        }

class IstioSecurityMonitor:
    """
    Istio 安全監控器
    
    提供完整的 Istio 安全配置監控功能,包括自動發現
    PeerAuthentication 與 AuthorizationPolicy 配置、分析
    mTLS 覆蓋率、識別安全風險並產生詳細報告。
    """

    def __init__(self, kubeconfig_path: Optional[str] = None):
        """
        初始化安全監控器
        
        參數:
            kubeconfig_path: kubeconfig 檔案路徑,None 表示使用預設配置
        """
        self._kubeconfig_path = kubeconfig_path
        self._k8s_client: Optional[client.CustomObjectsApi] = None
        self._core_v1_client: Optional[client.CoreV1Api] = None
        
        # 初始化快取與結果容器
        self._peer_auth_cache: List[PeerAuthenticationConfig] = []
        self._authz_policy_cache: List[AuthorizationPolicyConfig] = []
        self._namespaces_cache: List[str] = []
        self._findings: List[SecurityFinding] = []
        
        # 嘗試初始化 Kubernetes 客戶端
        self._initialize_k8s_client()

    def _initialize_k8s_client(self) -> None:
        """初始化 Kubernetes API 客戶端,失敗時將使用 kubectl 工具"""
        if not KUBERNETES_AVAILABLE:
            logger.info("kubernetes 套件未安裝,將使用 kubectl 命令列工具")
            return

        try:
            if self._kubeconfig_path:
                config.load_kube_config(config_file=self._kubeconfig_path)
            else:
                try:
                    # 嘗試載入叢集內配置(適用於 Pod 內執行)
                    config.load_incluster_config()
                except config.ConfigException:
                    # 退回使用預設 kubeconfig
                    config.load_kube_config()

            self._k8s_client = client.CustomObjectsApi()
            self._core_v1_client = client.CoreV1Api()
            logger.info("成功初始化 Kubernetes 客戶端")

        except Exception as e:
            logger.warning(f"初始化 Kubernetes 客戶端失敗:{e}")
            logger.info("將改用 kubectl 命令列工具")

    def _run_kubectl(
        self,
        args: List[str],
        namespace: Optional[str] = None
    ) -> Tuple[bool, str]:
        """
        執行 kubectl 命令並回傳結果
        
        參數:
            args: kubectl 命令參數列表
            namespace: 目標命名空間,None 表示所有命名空間
            
        回傳:
            (執行成功與否, 命令輸出內容) 的元組
        """
        cmd = ["kubectl"]
        
        if self._kubeconfig_path:
            cmd.extend(["--kubeconfig", self._kubeconfig_path])
        
        if namespace:
            cmd.extend(["-n", namespace])
        elif "-A" not in args and "--all-namespaces" not in args:
            if "get" in args:
                cmd.append("-A")
        
        cmd.extend(args)

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30
            )
            
            if result.returncode == 0:
                return True, result.stdout
            else:
                logger.error(f"kubectl 命令失敗:{result.stderr}")
                return False, result.stderr

        except subprocess.TimeoutExpired:
            logger.error("kubectl 命令執行逾時")
            return False, "命令執行逾時"
        except Exception as e:
            logger.error(f"執行 kubectl 命令時發生錯誤:{e}")
            return False, str(e)

    def discover_namespaces(self) -> List[str]:
        """
        發現叢集中的所有命名空間
        
        回傳:
            命名空間名稱列表
        """
        if self._namespaces_cache:
            return self._namespaces_cache

        namespaces = []

        if self._core_v1_client:
            try:
                ns_list = self._core_v1_client.list_namespace()
                namespaces = [ns.metadata.name for ns in ns_list.items]
            except ApiException as e:
                logger.error(f"取得命名空間列表失敗:{e}")
        else:
            success, output = self._run_kubectl(
                ["get", "namespaces", "-o", "jsonpath={.items[*].metadata.name}"]
            )
            if success:
                namespaces = output.strip().split()

        self._namespaces_cache = namespaces
        logger.info(f"發現 {len(namespaces)} 個命名空間")
        return namespaces

    def analyze_mtls_coverage(self) -> Dict[str, Any]:
        """
        分析 mTLS 覆蓋率
        
        檢查每個命名空間的 mTLS 配置狀態,計算整體覆蓋率
        並識別未受保護的區域。
        
        回傳:
            包含完整分析結果的字典
        """
        namespaces = self.discover_namespaces()
        peer_auths = self.discover_peer_authentications()

        strict_count = 0
        permissive_count = 0
        disabled_count = 0
        unset_count = 0
        namespace_status = {}

        # 尋找網格層級的 mTLS 配置
        mesh_wide_mtls = None
        for pa in peer_auths:
            if pa.is_mesh_wide:
                mesh_wide_mtls = pa.mtls_mode
                break

        for ns in namespaces:
            # 跳過系統命名空間
            if ns.startswith("kube-") or ns == "istio-system":
                continue

            ns_mtls = mesh_wide_mtls

            # 尋找命名空間層級的配置
            for pa in peer_auths:
                if pa.namespace == ns and pa.is_namespace_wide:
                    ns_mtls = pa.mtls_mode
                    break

            namespace_status[ns] = {
                "mtls_mode": ns_mtls.value if ns_mtls else "UNSET",
                "inherited": ns_mtls == mesh_wide_mtls if mesh_wide_mtls else True
            }

            # 統計各模式數量
            if ns_mtls == MTLSMode.STRICT:
                strict_count += 1
            elif ns_mtls == MTLSMode.PERMISSIVE:
                permissive_count += 1
            elif ns_mtls == MTLSMode.DISABLE:
                disabled_count += 1
            else:
                unset_count += 1

        total = strict_count + permissive_count + disabled_count + unset_count
        coverage = (strict_count / total * 100) if total > 0 else 0

        # 產生安全發現報告
        if disabled_count > 0:
            self._findings.append(SecurityFinding(
                title="偵測到已停用 mTLS 的命名空間",
                description=f"發現 {disabled_count} 個命名空間明確停用 mTLS,"
                           "服務間通訊未受加密保護。",
                risk_level=SecurityRiskLevel.CRITICAL,
                affected_resources=[
                    ns for ns, status in namespace_status.items()
                    if status["mtls_mode"] == "DISABLE"
                ],
                recommendation="建議將 mTLS 模式設定為 STRICT,確保所有服務間"
                              "通訊都經過加密與身份驗證。"
            ))

        if permissive_count > 0:
            self._findings.append(SecurityFinding(
                title="偵測到使用 PERMISSIVE 模式的命名空間",
                description=f"發現 {permissive_count} 個命名空間使用 PERMISSIVE 模式,"
                           "允許純文字流量可能存在安全風險。",
                risk_level=SecurityRiskLevel.MEDIUM,
                affected_resources=[
                    ns for ns, status in namespace_status.items()
                    if status["mtls_mode"] == "PERMISSIVE"
                ],
                recommendation="遷移完成後建議升級為 STRICT 模式。"
            ))

        return {
            "mesh_wide_mtls": mesh_wide_mtls.value if mesh_wide_mtls else None,
            "namespace_status": namespace_status,
            "statistics": {
                "strict": strict_count,
                "permissive": permissive_count,
                "disabled": disabled_count,
                "unset": unset_count,
                "total": total,
                "coverage_percentage": round(coverage, 2)
            }
        }

    def generate_security_report(self) -> Dict[str, Any]:
        """
        產生完整的安全分析報告
        
        執行所有安全檢查並彙整結果,包括 mTLS 覆蓋率分析、
        授權策略檢查與安全風險評估。
        
        回傳:
            包含完整分析結果的字典
        """
        self._findings = []
        
        mtls_analysis = self.analyze_mtls_coverage()
        authz_analysis = self.analyze_authorization_policies()

        report = {
            "report_time": datetime.now().isoformat(),
            "summary": {
                "total_findings": len(self._findings),
                "critical_findings": sum(
                    1 for f in self._findings
                    if f.risk_level == SecurityRiskLevel.CRITICAL
                ),
                "high_findings": sum(
                    1 for f in self._findings
                    if f.risk_level == SecurityRiskLevel.HIGH
                ),
                "medium_findings": sum(
                    1 for f in self._findings
                    if f.risk_level == SecurityRiskLevel.MEDIUM
                ),
                "low_findings": sum(
                    1 for f in self._findings
                    if f.risk_level == SecurityRiskLevel.LOW
                ),
                "mtls_coverage": mtls_analysis["statistics"]["coverage_percentage"]
            },
            "mtls_analysis": mtls_analysis,
            "authorization_analysis": authz_analysis,
            "findings": [f.to_dict() for f in self._findings]
        }

        return report

def main():
    """主程式進入點,執行安全監控並輸出報告"""
    print("=" * 60)
    print("Istio 服務網格安全監控工具")
    print("=" * 60)

    monitor = IstioSecurityMonitor()
    report = monitor.generate_security_report()

    print(f"\n報告產生時間:{report['report_time']}")
    print(f"mTLS 覆蓋率:{report['summary']['mtls_coverage']}%")
    print(f"總發現數量:{report['summary']['total_findings']}")

    # 儲存完整報告
    report_path = Path("istio_security_report.json")
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)

    print(f"\n完整報告已儲存至:{report_path}")

    # 根據嚴重程度決定退出碼
    if report["summary"]["critical_findings"] > 0:
        sys.exit(2)
    elif report["summary"]["high_findings"] > 0:
        sys.exit(1)
    else:
        sys.exit(0)

if __name__ == "__main__":
    main()

JWT 驗證與授權策略評估

當外部請求進入 Istio 服務網格時,JWT 原始驗證機制扮演著關鍵角色。使用者在發起請求時會在 Authorization 標頭中附加 Bearer Token,這個 Token 包含了使用者的身份資訊與權限聲明。Istio Ingress Gateway 收到請求後會檢查是否配置了 RequestAuthentication 資源,如果存在則會提取 JWT Token 並使用 JWKS 端點取得的公開金鑰驗證簽章。驗證通過後會解析 Token 內容,檢查是否過期,並將使用者聲明資訊注入到請求標頭中供後續的授權決策使用。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 14
skinparam minClassWidth 100

participant "使用者端" as Client
participant "Ingress Gateway" as Gateway
participant "目標服務\nEnvoy Proxy" as Proxy
participant "目標服務\n應用程式" as App

Client -> Gateway: 發起 HTTP 請求\n(附加 JWT Token)
note right of Client
  Authorization: Bearer <token>
end note

Gateway -> Gateway: 檢查 RequestAuthentication 配置

alt 配置 JWT 驗證
    Gateway -> Gateway: 提取 JWT Token
    Gateway -> Gateway: 驗證 Token 簽章
    note right of Gateway
      使用 JWKS 端點
      取得公開金鑰驗證
    end note
    
    alt 簽章有效
        Gateway -> Gateway: 檢查 Token 有效期限
        
        alt Token 未過期
            Gateway -> Gateway: 解析使用者聲明
            note right of Gateway
              提取 issuer、subject、
              audience 等資訊
            end note
            Gateway -> Proxy: 轉發請求\n(注入身份資訊)
        else Token 已過期
            Gateway -> Client: 回傳 401 Unauthorized
        end
    else 簽章無效
        Gateway -> Client: 回傳 401 Unauthorized
        note right of Gateway
          Token 可能被竄改
        end note
    end
else 無 JWT 驗證配置
    Gateway -> Proxy: 直接轉發請求
end

Proxy -> Proxy: 查詢 AuthorizationPolicy

alt 存在 DENY 策略
    Proxy -> Proxy: 評估 DENY 規則
    
    alt 符合 DENY 規則
        Proxy -> Client: 回傳 403 Forbidden
        note right of Proxy
          DENY 策略優先
          於 ALLOW 評估
        end note
    end
end

alt 存在 ALLOW 策略
    Proxy -> Proxy: 評估 ALLOW 規則
    
    alt 符合 ALLOW 規則
        Proxy -> App: 轉發請求
        App -> Proxy: 回傳處理結果
        Proxy -> Client: 回傳回應
    else 不符合規則
        Proxy -> Client: 回傳 403 Forbidden
    end
else 無授權策略
    Proxy -> App: 預設允許存取
    App -> Proxy: 回傳處理結果
    Proxy -> Client: 回傳回應
end

@enduml

授權策略的評估採用「先 DENY 後 ALLOW」的順序,這種設計確保安全性優先。即使存在允許的規則,只要請求符合任何一條 DENY 規則就會被立即拒絕。當沒有配置任何 AuthorizationPolicy 時,系統預設允許所有請求通過,但在生產環境中強烈建議為每個命名空間配置明確的授權策略,遵循最小權限原則。透過組合使用 source 來源限制、to 操作限制與 when 條件檢查,管理者可以建構出符合業務需求的精細授權策略。

以下的 Bash 腳本提供了完整的自動化部署功能,能夠快速建立 PeerAuthentication、AuthorizationPolicy 與 RequestAuthentication 等安全配置。腳本採用函式化設計,支援多種部署模式,並內建完整的錯誤處理與驗證機制。

#!/usr/bin/env bash
# ================================================================
# Istio 服務網格安全配置自動化部署腳本
# ================================================================
#
# 提供完整的 Istio 安全配置自動化部署功能,包括:
# - mTLS PeerAuthentication 配置部署
# - AuthorizationPolicy 授權策略建立
# - RequestAuthentication JWT 驗證配置
# - 安全配置狀態驗證與測試
#
# 作者:玄貓(BlackCat)
# 版本:1.0.0
# ================================================================

# 啟用嚴格模式
# -e: 任何命令失敗時立即退出
# -u: 使用未定義變數時視為錯誤
# -o pipefail: 管線中任何命令失敗時回傳失敗狀態
set -euo pipefail

# ================================================================
# 全域變數定義
# ================================================================

readonly SCRIPT_NAME="$(basename "${BASH_SOURCE[0]}")"
readonly SCRIPT_VERSION="1.0.0"

# 預設配置值
NAMESPACE="default"
MTLS_MODE="STRICT"
VERBOSE=false
DRY_RUN=false
COMMAND=""

# ANSI 色彩碼定義
readonly COLOR_RESET='\033[0m'
readonly COLOR_RED='\033[0;31m'
readonly COLOR_GREEN='\033[0;32m'
readonly COLOR_YELLOW='\033[0;33m'
readonly COLOR_BLUE='\033[0;34m'

# Istio 資源的 API 版本
readonly ISTIO_SECURITY_API="security.istio.io/v1beta1"

# ================================================================
# 日誌輸出函式
# ================================================================

# 輸出資訊訊息(藍色)
log_info() {
    echo -e "${COLOR_BLUE}[INFO]${COLOR_RESET} $1"
}

# 輸出成功訊息(綠色)
log_success() {
    echo -e "${COLOR_GREEN}[SUCCESS]${COLOR_RESET} $1"
}

# 輸出警告訊息(黃色)
log_warning() {
    echo -e "${COLOR_YELLOW}[WARNING]${COLOR_RESET} $1"
}

# 輸出錯誤訊息(紅色,導向標準錯誤輸出)
log_error() {
    echo -e "${COLOR_RED}[ERROR]${COLOR_RESET} $1" >&2
}

# ================================================================
# 工具函式
# ================================================================

# 檢查必要的命令是否可用
check_prerequisites() {
    log_info "檢查先決條件..."

    # 檢查 kubectl 是否安裝
    if ! command -v kubectl &> /dev/null; then
        log_error "找不到 kubectl 命令,請先安裝 Kubernetes CLI"
        exit 1
    fi

    # 檢查叢集連線狀態
    if ! kubectl cluster-info &> /dev/null; then
        log_error "無法連線到 Kubernetes 叢集"
        exit 1
    fi

    # 檢查 Istio 是否已安裝
    if ! kubectl get namespace istio-system &> /dev/null; then
        log_error "找不到 istio-system 命名空間,請先安裝 Istio"
        exit 1
    fi

    # 檢查 istiod 是否正在運行
    local istiod_pods
    istiod_pods=$(kubectl get pods -n istio-system -l app=istiod \
        --field-selector=status.phase=Running -o name 2>/dev/null | wc -l)

    if [[ "${istiod_pods}" -eq 0 ]]; then
        log_error "istiod 未在運行狀態"
        exit 1
    fi

    log_success "所有先決條件檢查通過"
}

# 確保命名空間存在
ensure_namespace() {
    local ns="$1"

    if ! kubectl get namespace "${ns}" &> /dev/null; then
        log_warning "命名空間 ${ns} 不存在,正在建立..."

        if [[ "${DRY_RUN}" == "true" ]]; then
            log_info "[DRY-RUN] 將建立命名空間:${ns}"
        else
            kubectl create namespace "${ns}"
            log_success "已建立命名空間:${ns}"
        fi
    fi
}

# 為命名空間啟用 Istio Sidecar 自動注入
enable_sidecar_injection() {
    local ns="$1"

    log_info "為命名空間 ${ns} 啟用 Sidecar 自動注入..."

    if [[ "${DRY_RUN}" == "true" ]]; then
        log_info "[DRY-RUN] 將添加 istio-injection=enabled 標籤"
    else
        kubectl label namespace "${ns}" istio-injection=enabled \
            --overwrite 2>/dev/null || true
        log_success "已啟用 Sidecar 自動注入"
    fi
}

# ================================================================
# YAML 產生函式
# ================================================================

# 產生 PeerAuthentication YAML 配置
generate_peer_authentication_yaml() {
    local namespace="$1"
    local mode="$2"

    # 產生 PeerAuthentication 資源定義
    # 此資源控制服務間通訊的 mTLS 行為
    cat << EOF
---
apiVersion: ${ISTIO_SECURITY_API}
kind: PeerAuthentication
metadata:
  name: default
  namespace: ${namespace}
  labels:
    app.kubernetes.io/managed-by: istio-security-script
spec:
  mtls:
    # mTLS 模式設定
    # STRICT: 僅接受 mTLS 連線,提供最高安全等級
    # PERMISSIVE: 同時接受 mTLS 與純文字連線,適合遷移期間
    # DISABLE: 停用 mTLS 功能
    mode: ${mode}
EOF
}

# 產生拒絕所有請求的 AuthorizationPolicy
generate_deny_all_policy_yaml() {
    local namespace="$1"

    # 產生預設拒絕策略,實施零信任原則
    # 空的 spec 會導致拒絕所有請求
    cat << EOF
---
apiVersion: ${ISTIO_SECURITY_API}
kind: AuthorizationPolicy
metadata:
  name: deny-all-default
  namespace: ${namespace}
  labels:
    app.kubernetes.io/managed-by: istio-security-script
    policy-type: deny-all
spec:
  # 空的 spec 表示拒絕所有未明確允許的請求
  # 這是實施零信任架構的基礎
  {}
EOF
}

# 產生 RequestAuthentication YAML 配置
generate_request_authentication_yaml() {
    local namespace="$1"
    local issuer="$2"
    local jwks_uri="$3"

    # 產生 JWT 驗證配置
    cat << EOF
---
apiVersion: ${ISTIO_SECURITY_API}
kind: RequestAuthentication
metadata:
  name: jwt-auth-default
  namespace: ${namespace}
  labels:
    app.kubernetes.io/managed-by: istio-security-script
spec:
  jwtRules:
    # JWT 發行者設定
    - issuer: "${issuer}"
      # JWKS URI 用於取得公開金鑰驗證 JWT 簽章
      jwksUri: "${jwks_uri}"
      # 轉發原始 Token 供下游服務使用
      forwardOriginalToken: true
EOF
}

# ================================================================
# 部署函式
# ================================================================

# 套用 YAML 配置到叢集
apply_yaml() {
    local yaml_content="$1"
    local description="$2"

    log_info "部署 ${description}..."

    if [[ "${DRY_RUN}" == "true" ]]; then
        log_info "[DRY-RUN] 將套用以下配置:"
        echo "${yaml_content}"
        echo "---"
    else
        echo "${yaml_content}" | kubectl apply -f -
        log_success "已部署 ${description}"
    fi
}

# 部署 mTLS 配置
deploy_mtls() {
    log_info "開始部署 mTLS 配置..."
    log_info "目標命名空間:${NAMESPACE}"
    log_info "mTLS 模式:${MTLS_MODE}"

    ensure_namespace "${NAMESPACE}"
    enable_sidecar_injection "${NAMESPACE}"

    local peer_auth_yaml
    peer_auth_yaml=$(generate_peer_authentication_yaml "${NAMESPACE}" "${MTLS_MODE}")
    apply_yaml "${peer_auth_yaml}" "PeerAuthentication (${MTLS_MODE})"

    log_success "mTLS 配置部署完成"
}

# 部署授權策略
deploy_authz() {
    log_info "開始部署授權策略..."

    ensure_namespace "${NAMESPACE}"

    local deny_all_yaml
    deny_all_yaml=$(generate_deny_all_policy_yaml "${NAMESPACE}")
    apply_yaml "${deny_all_yaml}" "AuthorizationPolicy (deny-all)"

    log_success "授權策略部署完成"
}

# 部署 JWT 驗證配置
deploy_jwt() {
    log_info "開始部署 JWT 驗證配置..."

    ensure_namespace "${NAMESPACE}"

    # 使用示範用的 JWT 發行者設定
    local issuer="https://accounts.google.com"
    local jwks_uri="https://www.googleapis.com/oauth2/v3/certs"

    local jwt_auth_yaml
    jwt_auth_yaml=$(generate_request_authentication_yaml \
        "${NAMESPACE}" \
        "${issuer}" \
        "${jwks_uri}")
    apply_yaml "${jwt_auth_yaml}" "RequestAuthentication (JWT)"

    log_success "JWT 驗證配置部署完成"
}

# 部署所有安全配置
deploy_all() {
    log_info "開始部署所有安全配置..."

    deploy_mtls
    deploy_authz
    deploy_jwt

    log_success "所有安全配置部署完成"
}

# ================================================================
# 驗證函式
# ================================================================

# 驗證安全配置狀態
verify_security() {
    log_info "開始驗證安全配置..."
    
    local has_issues=false

    # 檢查 PeerAuthentication 配置
    log_info "檢查 PeerAuthentication 配置..."
    local peer_auth_count
    peer_auth_count=$(kubectl get peerauthentication -n "${NAMESPACE}" \
        --no-headers 2>/dev/null | wc -l)

    if [[ "${peer_auth_count}" -eq 0 ]]; then
        log_warning "命名空間 ${NAMESPACE} 未配置 PeerAuthentication"
        has_issues=true
    else
        log_success "找到 ${peer_auth_count} 個 PeerAuthentication 配置"
    fi

    # 檢查 AuthorizationPolicy 配置
    log_info "檢查 AuthorizationPolicy 配置..."
    local authz_policy_count
    authz_policy_count=$(kubectl get authorizationpolicy -n "${NAMESPACE}" \
        --no-headers 2>/dev/null | wc -l)

    if [[ "${authz_policy_count}" -eq 0 ]]; then
        log_warning "命名空間 ${NAMESPACE} 未配置 AuthorizationPolicy"
        has_issues=true
    else
        log_success "找到 ${authz_policy_count} 個 AuthorizationPolicy 配置"
    fi

    # 檢查 Sidecar 注入狀態
    log_info "檢查 Sidecar 注入狀態..."
    local injection_label
    injection_label=$(kubectl get namespace "${NAMESPACE}" \
        -o jsonpath='{.metadata.labels.istio-injection}' 2>/dev/null)

    if [[ "${injection_label}" != "enabled" ]]; then
        log_warning "命名空間 ${NAMESPACE} 未啟用 Sidecar 自動注入"
        has_issues=true
    else
        log_success "Sidecar 自動注入已啟用"
    fi

    if [[ "${has_issues}" == "true" ]]; then
        log_warning "驗證完成,發現一些需要注意的問題"
        return 1
    else
        log_success "驗證完成,所有安全配置正常"
        return 0
    fi
}

# ================================================================
# 主程式
# ================================================================

# 解析命令列參數
parse_arguments() {
    while [[ $# -gt 0 ]]; do
        case "$1" in
            -n|--namespace)
                NAMESPACE="$2"
                shift 2
                ;;
            -m|--mode)
                MTLS_MODE="$2"
                case "${MTLS_MODE}" in
                    STRICT|PERMISSIVE|DISABLE) ;;
                    *)
                        log_error "無效的 mTLS 模式:${MTLS_MODE}"
                        exit 1
                        ;;
                esac
                shift 2
                ;;
            -v|--verbose)
                VERBOSE=true
                shift
                ;;
            -d|--dry-run)
                DRY_RUN=true
                shift
                ;;
            deploy-mtls|deploy-authz|deploy-jwt|deploy-all|verify)
                COMMAND="$1"
                shift
                ;;
            *)
                log_error "未知的參數:$1"
                exit 1
                ;;
        esac
    done

    if [[ -z "${COMMAND:-}" ]]; then
        log_error "請指定要執行的命令"
        exit 1
    fi
}

# 主函式
main() {
    parse_arguments "$@"

    log_info "Istio 服務網格安全配置工具 v${SCRIPT_VERSION}"
    log_info "執行命令:${COMMAND}"

    if [[ "${DRY_RUN}" == "true" ]]; then
        log_warning "模擬執行模式(不會實際變更配置)"
    fi

    check_prerequisites

    case "${COMMAND}" in
        deploy-mtls) deploy_mtls ;;
        deploy-authz) deploy_authz ;;
        deploy-jwt) deploy_jwt ;;
        deploy-all) deploy_all ;;
        verify) verify_security ;;
        *)
            log_error "未知的命令:${COMMAND}"
            exit 1
            ;;
    esac

    log_success "操作完成"
}

main "$@"

安全配置部署與驗證流程

當運維人員執行部署腳本時,系統會依序進行先決條件檢查、命名空間準備、Sidecar 注入啟用,以及各項安全資源的建立。部署完成後,Istiod 控制平面會偵測到配置變更,產生對應的 Envoy 配置並透過 xDS API 推送至所有的 Sidecar Proxy。這個過程通常在數秒內完成,確保安全策略能夠即時生效。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 14
skinparam minClassWidth 100

|運維人員|
start

:執行安全部署腳本;
note right
  ./deploy.sh -n production \
  -m STRICT deploy-all
end note

|部署腳本|
:檢查先決條件;
:驗證 kubectl 可用性;
:確認叢集連線狀態;
:檢查 Istio 安裝狀態;

if (先決條件通過?) then (是)
  :確保目標命名空間存在;
  :啟用 Sidecar 自動注入;
  
  |Kubernetes API|
  :建立 PeerAuthentication 資源;
  :設定 mTLS 模式;
  
  |部署腳本|
  :建立 AuthorizationPolicy 資源;
  
  |Kubernetes API|
  :儲存授權策略配置;
  
  |Istiod 控制平面|
  :偵測配置變更事件;
  :產生 Envoy 配置;
  :透過 xDS API 推送配置;
  
  |部署腳本|
  :執行配置驗證;
  :檢查資源建立狀態;
  
  if (驗證通過?) then (是)
    :輸出部署成功訊息;
  else (否)
    :輸出錯誤資訊;
    :提供修復建議;
  endif
else (否)
  :輸出失敗訊息;
  :提供安裝指引;
endif

stop

@enduml

在企業環境實施 Istio 安全機制時,建議採用漸進式的部署策略。初期可以在非生產環境使用 PERMISSIVE 模式進行測試,確認所有服務都已正確注入 Sidecar 且能正常通訊。待驗證完成後再逐步將 mTLS 模式切換為 STRICT,確保所有服務間通訊都經過加密保護。授權策略的配置則建議採用白名單方式,先為命名空間建立預設拒絕策略,再針對每個服務定義明確的允許規則,這種方式能有效降低未授權存取的風險。

透過 Istio 提供的完整安全機制,企業能夠在享受微服務架構帶來的敏捷性與彈性的同時,也建立起符合零信任原則的安全防護體系。從 mTLS 雙向認證確保通訊加密,到 SPIFFE 身份識別提供可驗證的服務身份,再到 AuthorizationPolicy 實現精細的存取控制,這些機制共同構成了現代雲端原生應用程式的安全基石。本文提供的監控工具與部署腳本能協助運維團隊更有效率地管理這些安全配置,確保微服務架構在快速演進的同時也能維持高標準的安全防護能力。