Modern software development has evolved from mere code writing into systems engineering that spans the entire lifecycle. From requirements gathering, architecture design, and implementation through testing, deployment, and operations monitoring, every stage affects the final quality of the software and the user experience. Introducing artificial intelligence into this complex process opens new possibilities, enabling a higher degree of automation and intelligence at every stage.

This article explores how AI techniques can contribute at different stages of the software lifecycle. Beyond the concepts, we use concrete code examples to show how these techniques apply in real projects. In the operations stage especially, AI-driven log analysis and anomaly detection have become core capabilities of modern AIOps, helping teams find and fix problems before they affect users.

Intelligent Requirement Management and Analysis

The first step of software development is understanding and managing requirements. Traditional requirement analysis relies on manual reading and organization, which is time-consuming and prone to omissions and misinterpretation. Advances in natural language processing make it possible to analyze requirement documents automatically, extracting key information and flagging potential problems.

Common challenges in requirement analysis include incomplete requirements, contradictions between requirements, and difficulty in setting priorities. By analyzing historical project data, AI can learn to recognize the patterns behind these problems and warn about them early. This not only saves the cost of late-stage rework but, more importantly, improves the odds of project success.

# Intelligent requirement analysis system
# Uses NLP techniques to analyze requirement documents, extract key information, and flag issues

import re
from collections import defaultdict
from typing import List, Dict, Tuple, Set
from dataclasses import dataclass
from enum import Enum

class RequirementType(Enum):
    """Requirement categories"""
    FUNCTIONAL = "functional"
    NON_FUNCTIONAL = "non-functional"
    CONSTRAINT = "constraint"
    ASSUMPTION = "assumption"

class IssueType(Enum):
    """Issue categories"""
    AMBIGUOUS = "ambiguous"
    INCOMPLETE = "incomplete"
    CONFLICTING = "conflicting"
    UNTESTABLE = "untestable"

@dataclass
class Requirement:
    """A single analyzed requirement"""
    id: str
    text: str
    req_type: RequirementType
    priority: int
    keywords: List[str]
    issues: List[IssueType]

class RequirementAnalyzer:
    """
    Intelligent requirement analyzer.
    Analyzes requirement documents (written in Chinese), extracts
    key information, and flags potential problems.
    """

    def __init__(self):
        # Vague wording patterns ("appropriate", "reasonable", "if necessary", ...).
        # Note: \b word boundaries never fire inside CJK runs (Chinese characters
        # count as word characters), so the patterns must match bare.
        self.ambiguous_patterns = [
            r'(適當|合理|足夠|必要時|盡可能|一般|大約|某些)',
            r'(等|等等|之類|其他)',
            r'(應該|可能|或許|大概)',
        ]

        # Indicators that a requirement is measurable, hence testable
        self.measurable_patterns = [
            r'\d+\s*(秒|毫秒|ms|s|分鐘|小時)',   # durations
            r'\d+\s*(MB|GB|KB|bytes)',            # sizes
            r'\d+\s*(%|百分比)',                   # percentages
            r'\d+\s*(次|個|筆|條)',                # counts
        ]

        # Keywords used to classify requirement types
        self.type_keywords = {
            RequirementType.FUNCTIONAL: [
                '系統應', '使用者可以', '功能', '操作', '處理', '顯示', '儲存'
            ],
            RequirementType.NON_FUNCTIONAL: [
                '效能', '安全', '可用性', '擴展性', '可靠性', '回應時間'
            ],
            RequirementType.CONSTRAINT: [
                '必須', '限制', '不得', '禁止', '僅能', '只能'
            ],
            RequirementType.ASSUMPTION: [
                '假設', '前提', '預期', '預設'
            ]
        }

        # Keyword pairs that often signal conflicting requirements
        self.conflict_pairs = [
            ('即時', '批次'),      # real-time vs. batch
            ('自動', '手動'),      # automatic vs. manual
            ('本地', '雲端'),      # on-premises vs. cloud
            ('同步', '非同步'),    # synchronous vs. asynchronous
        ]

    def analyze_requirement(self, req_id: str, text: str) -> Requirement:
        """
        Analyze a single requirement.

        Args:
            req_id: requirement identifier
            text: requirement description

        Returns:
            the analyzed Requirement object
        """
        issues = []

        # Check for vague wording
        for pattern in self.ambiguous_patterns:
            if re.search(pattern, text):
                issues.append(IssueType.AMBIGUOUS)
                break

        # Check testability: performance requirements need measurable figures
        has_measurable = any(
            re.search(pattern, text)
            for pattern in self.measurable_patterns
        )
        if not has_measurable and '效能' in text:
            issues.append(IssueType.UNTESTABLE)

        # Check completeness (very short text, or no complete sentence)
        if len(text) < 20 or text.count('。') == 0:
            issues.append(IssueType.INCOMPLETE)

        # Classify the requirement type
        req_type = self._identify_type(text)

        # Extract keywords
        keywords = self._extract_keywords(text)

        # Compute priority (based on keywords and type)
        priority = self._calculate_priority(text, req_type)

        return Requirement(
            id=req_id,
            text=text,
            req_type=req_type,
            priority=priority,
            keywords=keywords,
            issues=issues
        )

    def _identify_type(self, text: str) -> RequirementType:
        """Classify the requirement type by keyword votes"""
        type_scores = defaultdict(int)

        for req_type, keywords in self.type_keywords.items():
            for keyword in keywords:
                if keyword in text:
                    type_scores[req_type] += 1

        if type_scores:
            return max(type_scores.keys(), key=lambda k: type_scores[k])
        return RequirementType.FUNCTIONAL

    def _extract_keywords(self, text: str) -> List[str]:
        """Extract requirement keywords (simplified heuristic)"""
        keywords = []

        # Verb + object and actor + modal + action combinations
        patterns = [
            r'(建立|刪除|修改|查詢|匯入|匯出)\s*(\w+)',
            r'(使用者|管理員|系統)\s*(可以|應該|必須)\s*(\w+)',
        ]

        for pattern in patterns:
            for match in re.findall(pattern, text):
                # findall returns tuples when the pattern has multiple groups
                keywords.extend([m for m in match if m])

        return list(set(keywords))

    def _calculate_priority(self, text: str, req_type: RequirementType) -> int:
        """Compute requirement priority (1-5, 5 is highest)"""
        priority = 3  # default: medium

        # Signals for high priority ("critical", "core", "must", ...)
        high_priority_keywords = ['關鍵', '核心', '必須', '緊急', '安全']
        for keyword in high_priority_keywords:
            if keyword in text:
                priority = min(5, priority + 1)

        # Signals for low priority ("future", "optional", "enhancement", ...)
        low_priority_keywords = ['未來', '選擇性', '可選', '增強']
        for keyword in low_priority_keywords:
            if keyword in text:
                priority = max(1, priority - 1)

        # Constraints usually deserve higher priority
        if req_type == RequirementType.CONSTRAINT:
            priority = min(5, priority + 1)

        return priority

    def detect_conflicts(self, requirements: List[Requirement]) -> List[Tuple[str, str, str]]:
        """
        Detect potential conflicts between requirements.

        Args:
            requirements: list of requirements

        Returns:
            list of conflicts, each as (req 1 ID, req 2 ID, description)
        """
        conflicts = []

        for i, req1 in enumerate(requirements):
            for req2 in requirements[i+1:]:
                # Check for conflicting keyword pairs
                for word1, word2 in self.conflict_pairs:
                    if word1 in req1.text and word2 in req2.text:
                        conflicts.append((
                            req1.id, req2.id,
                            f"possible conflict between '{word1}' and '{word2}'"
                        ))
                    elif word2 in req1.text and word1 in req2.text:
                        conflicts.append((
                            req1.id, req2.id,
                            f"possible conflict between '{word2}' and '{word1}'"
                        ))

        return conflicts

    def generate_report(self, requirements: List[Requirement]) -> str:
        """Generate a requirement analysis report"""
        report = ["Requirement Analysis Report", "=" * 50, ""]

        # Summary statistics
        total = len(requirements)
        by_type = defaultdict(int)
        by_priority = defaultdict(int)
        issues_count = 0

        for req in requirements:
            by_type[req.req_type] += 1
            by_priority[req.priority] += 1
            if req.issues:
                issues_count += 1

        report.append("Summary:")
        report.append(f"  total requirements: {total}")
        report.append(f"  with issues: {issues_count}")
        report.append("")

        report.append("By type:")
        for req_type in RequirementType:
            count = by_type.get(req_type, 0)
            report.append(f"  {req_type.value}: {count}")
        report.append("")

        report.append("By priority:")
        for p in range(5, 0, -1):
            count = by_priority.get(p, 0)
            report.append(f"  priority {p}: {count}")
        report.append("")

        # Issue list
        report.append("Issues found:")
        report.append("-" * 50)

        for req in requirements:
            if req.issues:
                issues_str = ', '.join([i.value for i in req.issues])
                report.append(f"\n[{req.id}] {issues_str}")
                report.append(f"  {req.text[:100]}")

        # Conflict detection
        conflicts = self.detect_conflicts(requirements)
        if conflicts:
            report.append("")
            report.append("Potential conflicts:")
            report.append("-" * 50)
            for req1_id, req2_id, desc in conflicts:
                report.append(f"\n{req1_id} <-> {req2_id}")
                report.append(f"  {desc}")

        return "\n".join(report)

# Usage example (the sample requirements are Chinese-language input,
# which is what the analyzer's patterns target)
analyzer = RequirementAnalyzer()

sample_requirements = [
    ("REQ-001", "系統應在適當的時間內回應使用者的查詢請求。"),
    ("REQ-002", "系統必須在 2 秒內完成登入驗證,並顯示使用者儀表板。"),
    ("REQ-003", "使用者可以即時查看銷售報表。"),
    ("REQ-004", "系統應支援批次匯出所有銷售資料。"),
    ("REQ-005", "系統安全性應符合標準。"),
]

# Analyze all requirements
requirements = [
    analyzer.analyze_requirement(req_id, text)
    for req_id, text in sample_requirements
]

# Generate the report
report = analyzer.generate_report(requirements)
print(report)

This requirement analysis system shows how NLP techniques can automatically analyze requirement documents: it identifies vague wording, checks testability, classifies requirement types, and detects potential conflicts between requirements. In practice, these functions can be combined with more powerful language models for more accurate results.
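As a hint of that direction, the keyword-pair conflict detector can be paired with a similarity pre-filter that surfaces requirement pairs with heavily overlapping wording as candidates for closer human or LLM review. The sketch below is a hypothetical illustration that uses the standard library's `SequenceMatcher` as a lightweight stand-in for an embedding-based similarity model:

```python
# Hypothetical sketch: a textual-similarity pre-filter for conflict review.
# difflib's SequenceMatcher stands in for an embedding model; in a real
# system the flagged pairs would be sent to a reviewer or an LLM.
from difflib import SequenceMatcher
from itertools import combinations
from typing import List, Tuple

def similar_pairs(
    requirements: List[Tuple[str, str]],   # (id, text) pairs
    threshold: float = 0.5
) -> List[Tuple[str, str, float]]:
    """Return requirement pairs whose wording overlaps beyond the threshold."""
    pairs = []
    for (id1, t1), (id2, t2) in combinations(requirements, 2):
        ratio = SequenceMatcher(None, t1, t2).ratio()
        if ratio >= threshold:
            pairs.append((id1, id2, round(ratio, 2)))
    return pairs

# REQ-006 is a hypothetical extra requirement added for illustration
reqs = [
    ("REQ-003", "使用者可以即時查看銷售報表。"),
    ("REQ-004", "系統應支援批次匯出所有銷售資料。"),
    ("REQ-006", "使用者可以即時查看庫存報表。"),
]
candidates = similar_pairs(reqs)
print(candidates)  # REQ-003 and REQ-006 differ only in the report type
```

Pairs that clear the threshold are merely candidates; whether two near-identical requirements actually conflict still needs semantic judgment, which is where a stronger model would take over.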

Intelligent Decision-Making in Continuous Integration

Continuous integration (CI) is a cornerstone of modern software development: it ensures that code changes are integrated into the mainline quickly and safely. As projects grow, however, CI pipelines face challenges such as long run times and insufficient test coverage. AI can optimize the CI process through smarter decisions, for example predicting which tests are most likely to fail, automatically selecting a subset of tests to run, or identifying tasks that can run in parallel.

# Smart CI decision system
# Predicts test failure probability and optimizes test execution order

from dataclasses import dataclass
from typing import List, Dict, Set
from datetime import datetime

@dataclass
class TestCase:
    """A test case and its execution history"""
    id: str
    name: str
    duration_seconds: float
    last_failure: datetime = None
    failure_count: int = 0
    execution_count: int = 0
    affected_files: Set[str] = None

@dataclass
class CodeChange:
    """A code change (commit)"""
    commit_id: str
    author: str
    changed_files: List[str]
    lines_added: int
    lines_deleted: int
    timestamp: datetime

class SmartCIOptimizer:
    """
    Smart CI optimizer.
    Predicts test outcomes from history and the incoming code change,
    then optimizes which tests to run and in what order.
    """

    def __init__(self):
        self.tests: Dict[str, TestCase] = {}
        self.file_test_mapping: Dict[str, Set[str]] = {}  # file -> related tests

    def register_test(self, test: TestCase):
        """Register a test case"""
        self.tests[test.id] = test

        # Build the file-to-test mapping
        if test.affected_files:
            for file in test.affected_files:
                if file not in self.file_test_mapping:
                    self.file_test_mapping[file] = set()
                self.file_test_mapping[file].add(test.id)

    def record_result(self, test_id: str, passed: bool, duration: float):
        """
        Record a test execution result.

        Args:
            test_id: test ID
            passed: whether the test passed
            duration: execution time in seconds
        """
        if test_id in self.tests:
            test = self.tests[test_id]
            test.execution_count += 1
            test.duration_seconds = (
                test.duration_seconds * 0.8 + duration * 0.2
            )  # exponential moving average

            if not passed:
                test.failure_count += 1
                test.last_failure = datetime.now()

    def calculate_failure_probability(
        self,
        test: TestCase,
        change: CodeChange
    ) -> float:
        """
        Estimate the probability that a test will fail.

        Args:
            test: the test case
            change: the code change

        Returns:
            failure probability in [0, 1]
        """
        probability = 0.0

        # Factor 1: historical failure rate
        if test.execution_count > 0:
            historical_rate = test.failure_count / test.execution_count
            probability += historical_rate * 0.3

        # Factor 2: how recently the test last failed
        if test.last_failure:
            days_since_failure = (
                datetime.now() - test.last_failure
            ).days
            recency_factor = max(0, 1 - days_since_failure / 30)
            probability += recency_factor * 0.2

        # Factor 3: whether the changed files are related to this test
        if test.affected_files:
            overlap = set(change.changed_files) & test.affected_files
            if overlap:
                # Related files were modified, so failure is more likely
                relevance_factor = len(overlap) / len(test.affected_files)
                probability += relevance_factor * 0.3

        # Factor 4: size of the change
        total_changes = change.lines_added + change.lines_deleted
        if total_changes > 500:
            probability += 0.1
        elif total_changes > 100:
            probability += 0.05

        # Factor 5: author risk (simplified here; a real system would
        # base this on per-author historical statistics)
        probability += 0.05

        return min(1.0, probability)

    def select_tests(
        self,
        change: CodeChange,
        time_budget_seconds: float = 3600,
        min_coverage: float = 0.8
    ) -> List[TestCase]:
        """
        Intelligently select the tests to run.

        Args:
            change: the code change
            time_budget_seconds: time budget in seconds
            min_coverage: minimum fraction of change-related tests to include

        Returns:
            selected tests, ordered by priority
        """
        # Score every registered test
        test_scores = []

        for test_id, test in self.tests.items():
            failure_prob = self.calculate_failure_probability(test, change)

            # Efficiency score = failure probability / duration:
            # prefer tests that are likely to fail and quick to run
            if test.duration_seconds > 0:
                efficiency = failure_prob / test.duration_seconds
            else:
                efficiency = failure_prob

            # Is the test related to the change?
            is_related = False
            if test.affected_files:
                is_related = bool(
                    set(change.changed_files) & test.affected_files
                )

            test_scores.append({
                'test': test,
                'failure_prob': failure_prob,
                'efficiency': efficiency,
                'is_related': is_related
            })

        # Sort: change-related tests first, then by efficiency
        test_scores.sort(
            key=lambda x: (not x['is_related'], -x['efficiency'])
        )

        # Select tests within the time budget
        selected = []
        total_time = 0
        related_selected = 0
        total_related = sum(1 for t in test_scores if t['is_related'])

        for item in test_scores:
            test = item['test']

            # Check the time budget
            if total_time + test.duration_seconds > time_budget_seconds:
                # Change-related tests may exceed the budget; others are skipped
                if not item['is_related']:
                    continue

            selected.append(test)
            total_time += test.duration_seconds

            if item['is_related']:
                related_selected += 1

        # Enforce the minimum coverage of change-related tests
        if total_related > 0:
            coverage = related_selected / total_related
            if coverage < min_coverage:
                # Add more related tests until coverage is met
                for item in test_scores:
                    if item['is_related'] and item['test'] not in selected:
                        selected.append(item['test'])
                        related_selected += 1
                        if related_selected / total_related >= min_coverage:
                            break

        return selected

    def optimize_execution_order(
        self,
        tests: List[TestCase],
        change: CodeChange
    ) -> List[TestCase]:
        """
        Optimize test execution order: run the tests most likely to
        fail first, so problems surface as early as possible.

        Args:
            tests: tests to order
            change: the code change

        Returns:
            the reordered test list
        """
        # Compute each test's failure probability
        test_probs = [
            (test, self.calculate_failure_probability(test, change))
            for test in tests
        ]

        # Sort by failure probability, descending
        test_probs.sort(key=lambda x: -x[1])

        return [test for test, _ in test_probs]

    def generate_execution_plan(
        self,
        change: CodeChange,
        time_budget_seconds: float = 3600
    ) -> str:
        """Generate a test execution plan"""
        selected = self.select_tests(change, time_budget_seconds)
        optimized = self.optimize_execution_order(selected, change)

        total_time = sum(t.duration_seconds for t in optimized)

        plan = ["CI Test Execution Plan", "=" * 50, ""]
        plan.append(f"Code change: {change.commit_id}")
        plan.append(f"Changed files: {len(change.changed_files)}")
        plan.append(f"Changed lines: +{change.lines_added} -{change.lines_deleted}")
        plan.append("")
        plan.append(f"Selected tests: {len(optimized)}")
        plan.append(f"Estimated time: {total_time:.1f} s")
        plan.append("")
        plan.append("Execution order:")
        plan.append("-" * 50)

        for i, test in enumerate(optimized, 1):
            prob = self.calculate_failure_probability(test, change)
            plan.append(
                f"{i:3d}. [{test.id}] {test.name}"
            )
            plan.append(
                f"     failure probability: {prob:.1%}, estimated time: {test.duration_seconds:.1f}s"
            )

        return "\n".join(plan)

# Usage example
optimizer = SmartCIOptimizer()

# Register test cases
tests = [
    TestCase("T001", "login flow test", 30.0, affected_files={"auth.py", "user.py"}),
    TestCase("T002", "database connection test", 15.0, affected_files={"db.py", "config.py"}),
    TestCase("T003", "API endpoint test", 120.0, affected_files={"api.py", "routes.py"}),
    TestCase("T004", "performance test", 300.0, affected_files={"api.py", "db.py"}),
    TestCase("T005", "UI integration test", 180.0, affected_files={"frontend.js", "api.py"}),
]

for test in tests:
    optimizer.register_test(test)

# Simulate historical execution records
optimizer.record_result("T001", True, 28.0)
optimizer.record_result("T001", False, 32.0)
optimizer.record_result("T002", True, 14.0)
optimizer.record_result("T003", True, 125.0)

# Create a code change
change = CodeChange(
    commit_id="abc123",
    author="developer",
    changed_files=["auth.py", "api.py"],
    lines_added=150,
    lines_deleted=50,
    timestamp=datetime.now()
)

# Generate the execution plan
plan = optimizer.generate_execution_plan(change, time_budget_seconds=600)
print(plan)

AIOps: Intelligent Operations Monitoring

Operations is the longest stage of the software lifecycle, and the one with the most direct impact on user experience. Traditional operations rely on manual monitoring and fixed threshold alerts, which usually can only react after a problem has occurred. AIOps (AI for IT operations) applies machine learning to analyze system behavior intelligently, predicting potential problems and acting before they affect users.

# AIOps log analysis and anomaly detection system
# Learns per-service baselines from historical logs and flags anomalous patterns

import re
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List, Dict
from dataclasses import dataclass
from enum import Enum

class LogLevel(Enum):
    """Log level"""
    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4

@dataclass
class LogEntry:
    """A single log entry"""
    timestamp: datetime
    level: LogLevel
    service: str
    message: str
    metadata: Dict = None

@dataclass
class Anomaly:
    """A detected anomaly"""
    timestamp: datetime
    anomaly_type: str
    severity: float
    description: str
    affected_service: str
    related_logs: List[LogEntry]

class LogAnalyzer:
    """
    Intelligent log analyzer.
    Learns per-service baselines from historical logs and detects anomalies.
    """

    def __init__(self, window_size_minutes: int = 5):
        self.window_size = timedelta(minutes=window_size_minutes)
        self.baselines: Dict[str, Dict] = {}  # per-service baselines
        self.pattern_cache: Dict[str, str] = {}  # message -> pattern cache

    def extract_pattern(self, message: str) -> str:
        """
        Extract the template of a log message by replacing variable
        parts with placeholders. Structured tokens (timestamps, UUIDs,
        IP addresses, paths) must be replaced before bare numbers,
        since replacing digits first would destroy them.

        Args:
            message: raw log message

        Returns:
            the pattern string
        """
        if message in self.pattern_cache:
            return self.pattern_cache[message]

        pattern = message

        # Replace timestamps
        pattern = re.sub(
            r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}',
            '<TIMESTAMP>',
            pattern
        )

        # Replace UUIDs
        pattern = re.sub(
            r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
            '<UUID>',
            pattern,
            flags=re.IGNORECASE
        )

        # Replace IP addresses
        pattern = re.sub(
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
            '<IP>',
            pattern
        )

        # Replace file paths
        pattern = re.sub(r'/[\w/.-]+', '<PATH>', pattern)

        # Replace remaining numbers
        pattern = re.sub(r'\d+', '<NUM>', pattern)

        self.pattern_cache[message] = pattern
        return pattern

    def build_baseline(self, logs: List[LogEntry], service: str):
        """
        Build the normal-behavior baseline for a service.

        Args:
            logs: historical logs
            service: service name
        """
        service_logs = [l for l in logs if l.service == service]

        if not service_logs:
            return

        # Count frequencies of each log level and message pattern
        level_counts = defaultdict(int)
        pattern_counts = defaultdict(int)

        for log in service_logs:
            level_counts[log.level] += 1
            pattern = self.extract_pattern(log.message)
            pattern_counts[pattern] += 1

        total = len(service_logs)

        self.baselines[service] = {
            'level_distribution': {
                level: count / total
                for level, count in level_counts.items()
            },
            'common_patterns': {
                pattern: count / total
                for pattern, count in pattern_counts.items()
                if count / total > 0.01  # keep only patterns above 1%
            },
            'avg_rate': total  # total log count as a naive volume baseline
        }

    def detect_anomalies(
        self,
        logs: List[LogEntry],
        threshold: float = 0.3
    ) -> List[Anomaly]:
        """
        Detect anomalies in a batch of logs.

        Args:
            logs: logs to analyze
            threshold: anomaly threshold

        Returns:
            list of detected anomalies
        """
        anomalies = []

        # Group by service
        by_service = defaultdict(list)
        for log in logs:
            by_service[log.service].append(log)

        for service, service_logs in by_service.items():
            if service not in self.baselines:
                continue

            baseline = self.baselines[service]
            service_anomalies = []

            # Check 1: abnormal error rate
            error_anomaly = self._detect_error_rate_anomaly(
                service_logs, baseline, service, threshold
            )
            if error_anomaly:
                service_anomalies.append(error_anomaly)

            # Check 2: previously unseen message patterns
            new_pattern_anomaly = self._detect_new_patterns(
                service_logs, baseline, service
            )
            if new_pattern_anomaly:
                service_anomalies.append(new_pattern_anomaly)

            # Check 3: abnormal log volume
            rate_anomaly = self._detect_rate_anomaly(
                service_logs, baseline, service, threshold
            )
            if rate_anomaly:
                service_anomalies.append(rate_anomaly)

            anomalies.extend(service_anomalies)

        return anomalies

    def _detect_error_rate_anomaly(
        self,
        logs: List[LogEntry],
        baseline: Dict,
        service: str,
        threshold: float
    ) -> Anomaly:
        """Detect an abnormal error rate"""
        error_logs = [
            l for l in logs
            if l.level in [LogLevel.ERROR, LogLevel.CRITICAL]
        ]
        error_rate = len(error_logs) / len(logs) if logs else 0

        # Baseline error rate
        baseline_error_rate = sum(
            baseline['level_distribution'].get(level, 0)
            for level in [LogLevel.ERROR, LogLevel.CRITICAL]
        )

        # Is the error rate significantly above the baseline?
        if error_rate > baseline_error_rate + threshold:
            severity = min(1.0, (error_rate - baseline_error_rate) / threshold)
            return Anomaly(
                timestamp=datetime.now(),
                anomaly_type="error-rate anomaly",
                severity=severity,
                description=f"error rate {error_rate:.1%} exceeds baseline {baseline_error_rate:.1%}",
                affected_service=service,
                related_logs=error_logs[:10]  # keep at most 10 related logs
            )

        return None

    def _detect_new_patterns(
        self,
        logs: List[LogEntry],
        baseline: Dict,
        service: str
    ) -> Anomaly:
        """Detect previously unseen log patterns"""
        common_patterns = baseline['common_patterns']
        new_patterns = []

        for log in logs:
            pattern = self.extract_pattern(log.message)
            if pattern not in common_patterns:
                new_patterns.append(log)

        # Flag when new patterns exceed 20% of the batch
        if len(new_patterns) > len(logs) * 0.2:
            return Anomaly(
                timestamp=datetime.now(),
                anomaly_type="new log patterns",
                severity=0.5,
                description=f"found {len(new_patterns)} log entries with uncommon patterns",
                affected_service=service,
                related_logs=new_patterns[:10]
            )

        return None

    def _detect_rate_anomaly(
        self,
        logs: List[LogEntry],
        baseline: Dict,
        service: str,
        threshold: float
    ) -> Anomaly:
        """Detect abnormal log volume"""
        current_rate = len(logs)
        baseline_rate = baseline['avg_rate']

        # Relative deviation from the baseline
        if baseline_rate > 0:
            deviation = abs(current_rate - baseline_rate) / baseline_rate
        else:
            deviation = 1.0 if current_rate > 0 else 0

        if deviation > threshold:
            if current_rate > baseline_rate:
                description = f"log volume surged: {current_rate} (baseline: {baseline_rate})"
            else:
                description = f"log volume dropped: {current_rate} (baseline: {baseline_rate})"

            return Anomaly(
                timestamp=datetime.now(),
                anomaly_type="log-volume anomaly",
                severity=min(1.0, deviation / 2),
                description=description,
                affected_service=service,
                related_logs=logs[:5]
            )

        return None

    def generate_alert(self, anomaly: Anomaly) -> str:
        """Generate an alert message for an anomaly"""
        severity_levels = {
            (0, 0.3): "low",
            (0.3, 0.6): "medium",
            (0.6, 0.8): "high",
            (0.8, 1.1): "critical"
        }

        severity_text = "unknown"
        for (low, high), text in severity_levels.items():
            if low <= anomaly.severity < high:
                severity_text = text
                break

        alert = [
            f"[{severity_text}] {anomaly.anomaly_type}",
            f"service: {anomaly.affected_service}",
            f"time: {anomaly.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
            f"description: {anomaly.description}",
            "",
            "related logs:",
        ]

        for log in anomaly.related_logs[:5]:
            alert.append(f"  [{log.level.name}] {log.message[:80]}")

        return "\n".join(alert)

# Usage example
analyzer = LogAnalyzer(window_size_minutes=5)

# Simulated historical logs (used to build the baseline)
historical_logs = [
    LogEntry(datetime.now() - timedelta(hours=1), LogLevel.INFO,
             "auth-service", "User login successful for user_id=12345"),
    LogEntry(datetime.now() - timedelta(hours=1), LogLevel.INFO,
             "auth-service", "Token generated for user_id=67890"),
    LogEntry(datetime.now() - timedelta(hours=1), LogLevel.DEBUG,
             "auth-service", "Cache hit for session abc-123"),
    LogEntry(datetime.now() - timedelta(hours=1), LogLevel.WARNING,
             "auth-service", "Rate limit approaching for IP 192.168.1.1"),
]

# Build the baseline
analyzer.build_baseline(historical_logs, "auth-service")

# Simulated current logs (containing anomalies)
current_logs = [
    LogEntry(datetime.now(), LogLevel.ERROR,
             "auth-service", "Database connection failed"),
    LogEntry(datetime.now(), LogLevel.ERROR,
             "auth-service", "Authentication service unavailable"),
    LogEntry(datetime.now(), LogLevel.CRITICAL,
             "auth-service", "All login attempts failing"),
    LogEntry(datetime.now(), LogLevel.INFO,
             "auth-service", "User login successful for user_id=11111"),
]

# Detect anomalies
anomalies = analyzer.detect_anomalies(current_logs)

print("Anomaly detection results:")
print("=" * 50)
for anomaly in anomalies:
    alert = analyzer.generate_alert(anomaly)
    print(alert)
    print("-" * 50)

This log analysis system demonstrates the core of AIOps. It first builds a normal-behavior baseline for each service from historical logs, then uses that baseline to detect anomalies in current logs. The system recognizes several kinds of anomalies, including abnormal error rates, the appearance of new log patterns, and abnormal changes in log volume.

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "AI-Driven Software Lifecycle" as title

rectangle "Requirements stage" as req {
    rectangle "Intelligent requirement analysis"
    rectangle "Conflict detection"
    rectangle "Priority suggestions"
}

rectangle "Development stage" as dev {
    rectangle "Code generation"
    rectangle "Intelligent completion"
}

rectangle "CI/CD stage" as ci {
    rectangle "Intelligent test selection"
    rectangle "Execution order optimization"
    rectangle "Failure prediction"
}

rectangle "Operations stage" as ops {
    rectangle "Log analysis"
    rectangle "Anomaly detection"
    rectangle "Performance optimization"
}

req -[hidden]down-> dev
dev -[hidden]down-> ci
ci -[hidden]down-> ops

@enduml

Performance Optimization Recommendation System

Beyond anomaly detection, AI can also analyze system metrics to identify performance bottlenecks automatically and propose optimization suggestions. This proactive style of performance management addresses problems before they reach users.
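A full recommender is beyond this article's scope, but a minimal rule-based sketch conveys the idea: map collected metrics to suggestions when they cross a threshold. The metric names and thresholds below are illustrative assumptions, not values from a real system:

```python
# Minimal sketch of a metric-driven performance recommender.
# Metric names and thresholds are illustrative assumptions only.
from typing import Dict, List

THRESHOLDS = {
    "p95_latency_ms": (500, "p95 latency is high; consider caching hot queries or adding an index."),
    "cpu_percent": (85, "CPU is saturated; consider horizontal scaling or profiling hot paths."),
    "error_rate": (0.05, "Error rate is elevated; inspect recent deploys and dependency health."),
    "gc_pause_ms": (200, "Long GC pauses; review heap sizing and allocation patterns."),
}

def recommend(metrics: Dict[str, float]) -> List[str]:
    """Return optimization suggestions for metrics exceeding their thresholds."""
    suggestions = []
    for name, value in metrics.items():
        if name in THRESHOLDS:
            limit, advice = THRESHOLDS[name]
            if value > limit:
                suggestions.append(f"[{name}={value}] {advice}")
    return suggestions

# Hypothetical metric snapshot: latency and error rate exceed their limits
sample = {"p95_latency_ms": 820.0, "cpu_percent": 62.0, "error_rate": 0.08}
for s in recommend(sample):
    print(s)
```

In practice the fixed thresholds would be replaced by learned per-service baselines, like those the log analyzer above builds, so recommendations adapt to each system's normal behavior.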

By combining multiple AI techniques, modern development teams gain intelligent support at every stage of the lifecycle: issue identification during requirement analysis, smart test selection in the CI pipeline, and anomaly detection and performance optimization in operations. AI is becoming an indispensable partner in software development.

Applying these techniques successfully, however, requires continuous data accumulation and model tuning. Teams need solid data-collection mechanisms so that models have enough training data, and they must evaluate model performance regularly and adjust based on real-world feedback. Only then can AI deliver substantive improvements to software development.
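One simple form of that feedback loop is scoring a detector's alerts against operator-confirmed incidents. The sketch below (with hypothetical alert and incident IDs) computes alert precision and recall, which can be tracked over time to decide when thresholds or baselines need retuning:

```python
# Sketch of the evaluation step: score raised alerts against
# operator-labeled incidents. IDs below are hypothetical.
from typing import Set, Tuple

def alert_quality(alerts: Set[str], incidents: Set[str]) -> Tuple[float, float]:
    """Return (precision, recall) of alert IDs vs. confirmed incident IDs."""
    if not alerts or not incidents:
        return 0.0, 0.0
    true_positives = len(alerts & incidents)
    return true_positives / len(alerts), true_positives / len(incidents)

alerts = {"A1", "A2", "A3", "A4"}   # what the detector raised
incidents = {"A1", "A3", "A5"}      # what operators confirmed
precision, recall = alert_quality(alerts, incidents)
print(f"precision={precision:.2f} recall={recall:.2f}")
```

Low precision means noisy alerts (thresholds too tight); low recall means missed incidents (baselines too loose). Both signals feed directly back into model adjustment.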