在當代資訊安全的防禦體系中,即時監控使用者的登入行為已成為不可或缺的一環。每當有人嘗試存取系統時,背後都可能隱藏著正常的業務需求,或者是潛在的安全威脅。本文將帶領讀者建構一套完整的登入監控系統,透過 Python 程式語言結合正規表示式的強大功能,實現智慧化的日誌分析與異常偵測機制。同時,我們也將深入探討 Scapy 這個網路安全領域的利器,從基礎的封包捕捉到進階的協定操作,展現其在網路分析與安全測試中的多元應用。無論您是負責系統維運的工程師,或是專注於資訊安全的研究人員,都能從本文獲得實用的技術洞察與實作經驗。

登入監控系統的架構設計思維

建構一個可靠的登入監控系統,首先需要釐清系統的核心目標與運作邏輯。這類系統的主要任務是持續追蹤使用者的認證行為,識別可能的暴力破解攻擊或異常登入模式,並在發現威脅時即時採取防護措施。在 Linux 系統環境中,所有的認證活動都會被記錄在系統日誌檔案中,這些日誌就成為我們監控的主要資料來源。

系統的設計核心在於建立一套彈性的監控框架,這個框架需要能夠即時讀取日誌內容,透過預先定義的模式比對規則來識別登入事件,並根據事件的性質執行相應的處理邏輯。為了達成這個目標,我們需要定義幾個關鍵的組態參數,包括要監控的日誌檔案路徑、允許的最大失敗嘗試次數、觸發防護機制時要執行的命令等。此外,系統還需要維護一個動態的狀態追蹤機制,記錄每個使用者的失敗登入次數,以便在達到閾值時觸發警告或防護措施。

在實作層面,我們採用正規表示式來解析日誌內容。正規表示式提供了強大且彈性的文字比對能力,能夠從複雜的日誌格式中精確地擷取出使用者識別碼、來源主機位址等關鍵資訊。為了應對不同的認證機制與日誌格式,系統設計成支援多組比對模式,分別用於識別成功與失敗的登入事件。這種模組化的設計使得系統能夠輕易地擴充支援新的認證方式或日誌格式。

import os
import re
import random
import logging
from typing import Dict, List, Pattern
from pathlib import Path

class LoginMonitorConfig:
    """登入監控系統組態管理類別"""
    
    def __init__(self):
        self.log_file = Path('/var/log/auth.log')
        self.max_failed_attempts = 5
        self.shutdown_command = 'shutdown -h now'
        self.enable_voice_alert = True
        self.alert_sound_device = 'default'
        
        self.failed_login_counter: Dict[str, int] = {}
        
        self.success_patterns: List[Pattern] = [
            re.compile(
                r"authentication success.*?logname=(?P<logname>\S+)\s+"
                r"uid=(?P<user>\d+).*?ruser=(?P<ruser>\S+)\s+"
                r"rhost=(?P<host>\S+)"
            ),
            re.compile(
                r"Accepted\s+(?P<method>\S+)\s+for\s+(?P<user>\S+)\s+"
                r"from\s+(?P<host>\S+)\s+port\s+(?P<port>\d+)"
            ),
            re.compile(
                r"session opened for user (?P<user>\S+).*?by.*?\(uid=(?P<uid>\d+)\)"
            )
        ]
        
        self.failed_patterns: List[Pattern] = [
            re.compile(
                r"authentication failure.*?logname=(?P<logname>\S+)\s+"
                r"uid=(?P<user>\d+).*?ruser=(?P<ruser>\S+)\s+"
                r"rhost=(?P<host>\S+)"
            ),
            re.compile(
                r"Failed\s+(?P<method>\S+)\s+for\s+(?P<user>\S+)\s+"
                r"from\s+(?P<host>\S+)\s+port\s+(?P<port>\d+)"
            ),
            re.compile(
                r"authentication failure.*?user=(?P<user>\S+)"
            ),
            re.compile(
                r"Invalid user (?P<user>\S+) from (?P<host>\S+)"
            )
        ]
        
        self.shutdown_messages = [
            "偵測到異常登入行為,系統即將關閉以保護資料安全",
            "登入失敗次數過多,啟動防護機制",
            "未經授權的存取嘗試,系統進入保護模式",
            "安全警告,偵測到可能的暴力破解攻擊"
        ]
        
        self.setup_logging()
    
    def setup_logging(self):
        """設定日誌記錄機制"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/login_monitor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger('LoginMonitor')

這個組態管理類別封裝了系統運作所需的所有參數與狀態資訊。透過物件導向的設計方式,我們能夠輕鬆地管理複雜的組態項目,並為未來的擴充預留了彈性空間。類別中定義的正規表示式模式涵蓋了常見的認證機制,包括 SSH 登入、PAM 認證等場景。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

package "登入監控系統架構" {
    component "日誌監控模組" as log_monitor {
        [檔案監聽器]
        [即時讀取]
        [緩衝管理]
    }
    
    component "模式比對引擎" as pattern_engine {
        [正規表示式處理]
        [事件分類]
        [資訊擷取]
    }
    
    component "狀態管理器" as state_manager {
        [失敗計數器]
        [使用者追蹤]
        [閾值檢查]
    }
    
    component "警示系統" as alert_system {
        [語音提示]
        [日誌記錄]
        [通知發送]
    }
    
    component "防護機制" as defense {
        [IP 封鎖]
        [帳號鎖定]
        [系統關閉]
    }
}

database "系統日誌" as syslog
cloud "管理員" as admin

syslog --> log_monitor: 寫入認證事件
log_monitor --> pattern_engine: 傳遞日誌行
pattern_engine --> state_manager: 更新狀態
state_manager --> alert_system: 觸發警示
state_manager --> defense: 啟動防護

alert_system --> admin: 通知
defense --> admin: 報告

note right of pattern_engine
  支援多種認證機制
  - SSH 遠端登入
  - 本地終端機登入
  - PAM 認證
  - sudo 權限提升
end note

note right of state_manager
  智慧追蹤機制
  - 每使用者獨立計數
  - 時間視窗管理
  - 自動重置邏輯
end note

@enduml

核心監控邏輯的程式實作

登入監控系統的核心在於如何處理每一條日誌記錄,並根據其內容做出正確的判斷與回應。這個處理流程需要考慮多種情境,包括遠端登入與本地登入的差異、特殊帳號如系統排程任務的過濾、以及失敗次數的累計與重置邏輯。透過精心設計的處理函式,我們能夠建立一個既準確又有效率的監控機制。

當系統捕捉到一條日誌記錄時,首先會使用預先定義的正規表示式模式進行比對。如果比對成功,就能從中擷取出使用者識別碼與來源主機等關鍵資訊。接下來,系統需要判斷這是一次成功的登入還是失敗的嘗試,並據此更新該使用者的失敗計數器。對於成功的登入事件,系統會將該使用者的失敗計數歸零,表示之前的失敗嘗試可能只是使用者忘記密碼等正常情況。而對於失敗的登入,系統會遞增計數器,並檢查是否已經達到預設的閾值。

為了提升系統的可用性與管理者的感知能力,我們整合了語音提示功能。當發生重要的認證事件時,系統會透過文字轉語音引擎發出即時的語音通知,這對於需要即時監控的場景特別有用。此外,所有的事件都會被完整地記錄在專門的監控日誌中,提供事後分析與稽核的依據。

class LoginMonitor:
    """登入監控核心類別"""
    
    def __init__(self, config: LoginMonitorConfig):
        self.config = config
        self.logger = config.logger
        
    def speak_alert(self, message: str):
        """發送語音警示"""
        if not self.config.enable_voice_alert:
            return
            
        try:
            import subprocess
            subprocess.run(
                ['espeak', '-v', 'zh', message],
                capture_output=True,
                timeout=5
            )
        except Exception as e:
            self.logger.warning(f"語音提示失敗: {e}")
    
    def analyze_log_entry(self, log_line: str, pattern: Pattern, 
                         is_failure: bool) -> bool:
        """
        分析單一日誌條目
        
        Args:
            log_line: 日誌內容
            pattern: 比對模式
            is_failure: 是否為失敗事件
            
        Returns:
            bool: 是否比對成功
        """
        match = pattern.search(log_line)
        if not match:
            return False
        
        extracted_data = match.groupdict()
        username = extracted_data.get('user', 'unknown')
        hostname = extracted_data.get('host', '')
        
        if username not in self.config.failed_login_counter:
            self.config.failed_login_counter[username] = 0
        
        if hostname and is_failure:
            alert_msg = f"使用者 {username} 從主機 {hostname} 登入失敗"
            self.logger.warning(alert_msg)
            self.speak_alert(alert_msg)
            self.config.failed_login_counter[username] += 1
            
        elif hostname and not is_failure:
            info_msg = f"使用者 {username} 從主機 {hostname} 成功登入"
            self.logger.info(info_msg)
            self.speak_alert(f"使用者 {username} 已登入系統")
            self.config.failed_login_counter[username] = 0
            
        elif username != "CRON" and is_failure:
            alert_msg = f"本地使用者 {username} 登入失敗"
            self.logger.warning(alert_msg)
            self.speak_alert(alert_msg)
            self.config.failed_login_counter[username] += 1
            
        elif username != "CRON" and not is_failure:
            info_msg = f"本地使用者 {username} 成功登入"
            self.logger.info(info_msg)
            self.speak_alert(f"使用者 {username} 已登入")
            self.config.failed_login_counter[username] = 0
        
        if self.config.failed_login_counter[username] >= self.config.max_failed_attempts:
            self.trigger_security_action(username)
        
        return True
    
    def trigger_security_action(self, username: str):
        """觸發安全防護措施"""
        warning_msg = random.choice(self.config.shutdown_messages)
        critical_msg = f"嚴重安全警告: 使用者 {username} 失敗次數達到 {self.config.max_failed_attempts} 次"
        
        self.logger.critical(critical_msg)
        self.speak_alert(warning_msg)
        
        try:
            import subprocess
            subprocess.run(
                ['wall', f'安全警告: {critical_msg}'],
                timeout=5
            )
            
            if self.config.shutdown_command:
                self.logger.critical("執行系統保護措施")
                
        except Exception as e:
            self.logger.error(f"執行防護措施時發生錯誤: {e}")
    
    def process_log_stream(self):
        """處理日誌串流"""
        self.logger.info("開始監控登入活動")
        
        try:
            with open(self.config.log_file, 'r') as log_file:
                log_file.seek(0, 2)
                
                while True:
                    line = log_file.readline()
                    if not line:
                        import time
                        time.sleep(0.1)
                        continue
                    
                    self.process_single_line(line.strip())
                    
        except KeyboardInterrupt:
            self.logger.info("監控系統正常停止")
        except Exception as e:
            self.logger.error(f"監控過程發生錯誤: {e}")
    
    def process_single_line(self, line: str):
        """處理單一日誌行"""
        for pattern in self.config.failed_patterns:
            if self.analyze_log_entry(line, pattern, is_failure=True):
                return
        
        for pattern in self.config.success_patterns:
            if self.analyze_log_entry(line, pattern, is_failure=False):
                return

這個監控類別實作了完整的事件處理邏輯,從日誌讀取到模式比對,再到警示發送與防護措施的觸發,形成了一個完整的處理鏈。程式碼中特別注意了錯誤處理與例外狀況的管理,確保監控系統能夠穩定運作。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

start

:監聽日誌檔案;

repeat
  :讀取新增日誌行;
  
  :嘗試比對失敗模式;
  
  if (比對成功?) then (是)
    :擷取使用者資訊;
    :更新失敗計數;
    
    if (來自遠端主機?) then (是)
      :記錄遠端失敗登入;
      :發送語音警示;
    else (否)
      if (非系統帳號?) then (是)
        :記錄本地失敗登入;
        :發送語音警示;
      endif
    endif
    
    if (失敗次數超過閾值?) then (是)
      :發送嚴重警告;
      :執行防護措施;
      stop
    endif
    
  else (否)
    :嘗試比對成功模式;
    
    if (比對成功?) then (是)
      :擷取使用者資訊;
      :重置失敗計數;
      
      if (來自遠端主機?) then (是)
        :記錄遠端成功登入;
        :發送通知;
      else (否)
        if (非系統帳號?) then (是)
          :記錄本地成功登入;
          :發送通知;
        endif
      endif
    endif
  endif

repeat while (繼續監控?)

stop

@enduml

Scapy 網路封包分析框架

在網路安全與協定分析的領域中,Scapy 堪稱是最具彈性與強大功能的 Python 工具之一。不同於傳統的網路分析工具如 Wireshark 或 tcpdump,Scapy 提供了程式化的介面,讓開發者能夠精確地控制封包的生成、傳送、捕捉與解析過程。這種程式化的特性使得 Scapy 特別適合用於自動化測試、安全稽核、協定研究等場景。

Scapy 的核心優勢在於其對網路協定棧的完整支援。從資料鏈結層到應用層,Scapy 內建了數百種協定的解析器與構造器。無論是經典的 TCP/IP 協定族,還是新興的物聯網通訊協定,Scapy 都能提供細緻的操作能力。更重要的是,Scapy 的設計允許使用者輕鬆地擴充新的協定支援,這對於研究專有協定或是客製化的網路應用特別有價值。

在實際應用中,Scapy 常被用於進行網路探測與安全測試。透過精心構造的封包,研究人員能夠測試目標系統對於各種異常或惡意封包的反應,從而發現潛在的安全漏洞。在協定開發階段,Scapy 也是驗證協定實作正確性的利器,開發者可以快速地生成各種測試案例,確保協定實作符合規範。此外,Scapy 也被廣泛應用在網路教學中,其直觀的 API 設計讓學習者能夠快速理解網路協定的運作原理。

from scapy.all import *
from typing import Optional, List, Dict, Any
import logging

class ScapyNetworkAnalyzer:
    """Scapy 網路分析工具類別"""
    
    def __init__(self, interface: str = "eth0"):
        self.interface = interface
        self.captured_packets: List[Packet] = []
        self.setup_logging()
        
    def setup_logging(self):
        """設定日誌系統"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger('ScapyAnalyzer')
    
    def capture_wifi_beacons(self, duration: int = 60, 
                            channel: Optional[int] = None) -> List[Packet]:
        """
        捕捉 Wi-Fi Beacon 框架
        
        Args:
            duration: 捕捉持續時間(秒)
            channel: 指定監聽頻道
            
        Returns:
            List[Packet]: 捕捉到的封包列表
        """
        self.logger.info(f"開始捕捉 Wi-Fi Beacon 框架,持續 {duration} 秒")
        
        beacon_packets = []
        
        def process_packet(pkt):
            if pkt.haslayer(Dot11Beacon):
                try:
                    ssid = pkt[Dot11Elt].info.decode('utf-8', errors='ignore')
                    bssid = pkt[Dot11].addr3
                    
                    stats = pkt[Dot11Beacon].network_stats()
                    channel_info = stats.get("channel", "N/A")
                    
                    beacon_info = {
                        'ssid': ssid,
                        'bssid': bssid,
                        'channel': channel_info,
                        'timestamp': pkt.time
                    }
                    
                    self.logger.info(f"發現 AP: {ssid} ({bssid}) - 頻道 {channel_info}")
                    beacon_packets.append(pkt)
                    
                except Exception as e:
                    self.logger.warning(f"解析 Beacon 框架時發生錯誤: {e}")
        
        try:
            sniff(
                iface=self.interface,
                prn=process_packet,
                timeout=duration,
                store=False
            )
        except Exception as e:
            self.logger.error(f"封包捕捉過程發生錯誤: {e}")
        
        self.logger.info(f"捕捉完成,共收集 {len(beacon_packets)} 個 Beacon 框架")
        return beacon_packets
    
    def analyze_dhcp_traffic(self, duration: int = 30) -> Dict[str, Any]:
        """
        分析 DHCP 流量
        
        Args:
            duration: 監聽持續時間(秒)
            
        Returns:
            Dict: DHCP 流量統計資訊
        """
        self.logger.info(f"開始分析 DHCP 流量,持續 {duration} 秒")
        
        dhcp_stats = {
            'discover': 0,
            'offer': 0,
            'request': 0,
            'ack': 0,
            'nak': 0,
            'clients': set()
        }
        
        def process_dhcp_packet(pkt):
            if DHCP in pkt:
                for opt in pkt[DHCP].options:
                    if isinstance(opt, tuple) and opt[0] == 'message-type':
                        msg_type = opt[1]
                        
                        if msg_type == 1:
                            dhcp_stats['discover'] += 1
                            mac = pkt[Ether].src
                            dhcp_stats['clients'].add(mac)
                            self.logger.info(f"DHCP Discover from {mac}")
                            
                        elif msg_type == 2:
                            dhcp_stats['offer'] += 1
                            self.logger.info("DHCP Offer detected")
                            
                        elif msg_type == 3:
                            dhcp_stats['request'] += 1
                            self.logger.info("DHCP Request detected")
                            
                        elif msg_type == 5:
                            dhcp_stats['ack'] += 1
                            if BOOTP in pkt:
                                assigned_ip = pkt[BOOTP].yiaddr
                                self.logger.info(f"DHCP ACK: IP {assigned_ip} assigned")
                                
                        elif msg_type == 6:
                            dhcp_stats['nak'] += 1
                            self.logger.warning("DHCP NAK detected")
        
        try:
            sniff(
                iface=self.interface,
                filter="udp and (port 67 or port 68)",
                prn=process_dhcp_packet,
                timeout=duration,
                store=False
            )
        except Exception as e:
            self.logger.error(f"DHCP 流量分析發生錯誤: {e}")
        
        dhcp_stats['clients'] = list(dhcp_stats['clients'])
        self.logger.info(f"DHCP 分析完成: {dhcp_stats}")
        
        return dhcp_stats
    
    def generate_dhcp_discover(self, mac_addr: Optional[str] = None) -> Packet:
        """
        生成 DHCP Discover 封包
        
        Args:
            mac_addr: 客戶端 MAC 位址(選填,預設隨機生成)
            
        Returns:
            Packet: 構造的 DHCP Discover 封包
        """
        if mac_addr is None:
            mac_addr = RandMAC()
        
        dhcp_discover = (
            Ether(dst="ff:ff:ff:ff:ff:ff", src=mac_addr) /
            IP(src="0.0.0.0", dst="255.255.255.255") /
            UDP(sport=68, dport=67) /
            BOOTP(
                chaddr=[mac2str(mac_addr)],
                xid=random.randint(1, 0xFFFFFFFF),
                flags=0x8000
            ) /
            DHCP(options=[
                ("message-type", "discover"),
                ("hostname", "scapy-client"),
                ("param_req_list", [1, 3, 6, 15, 28, 51, 58, 59]),
                "end"
            ])
        )
        
        self.logger.info(f"生成 DHCP Discover 封包: MAC={mac_addr}")
        return dhcp_discover
    
    def send_dhcp_discover(self, mac_addr: Optional[str] = None) -> List[Packet]:
        """
        發送 DHCP Discover 並等待回應
        
        Args:
            mac_addr: 客戶端 MAC 位址
            
        Returns:
            List[Packet]: 接收到的回應封包
        """
        discover_pkt = self.generate_dhcp_discover(mac_addr)
        
        self.logger.info("發送 DHCP Discover 封包")
        
        try:
            responses = srp1(
                discover_pkt,
                iface=self.interface,
                timeout=5,
                verbose=False
            )
            
            if responses:
                self.logger.info("收到 DHCP 伺服器回應")
                return responses
            else:
                self.logger.warning("未收到 DHCP 伺服器回應")
                return []
                
        except Exception as e:
            self.logger.error(f"發送 DHCP Discover 時發生錯誤: {e}")
            return []

這個 Scapy 分析工具類別展現了如何將 Scapy 的功能封裝成易於使用的介面。透過物件導向的設計,我們將複雜的封包操作簡化為直觀的方法呼叫,使得網路分析任務變得更加容易實現。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

package "Scapy 協定支援架構" {
    component "資料鏈結層" as datalink {
        [Ethernet]
        [Wi-Fi 802.11]
        [Bluetooth LE]
        [PPP]
    }
    
    component "網路層" as network {
        [IPv4]
        [IPv6]
        [ICMP]
        [ARP]
    }
    
    component "傳輸層" as transport {
        [TCP]
        [UDP]
        [SCTP]
    }
    
    component "應用層" as application {
        [HTTP]
        [DNS]
        [DHCP]
        [TLS]
    }
    
    component "特殊協定" as special {
        [GATT]
        [BTLE]
        [NTP]
        [SNMP]
    }
}

actor "分析者" as analyst
database "封包捕捉" as capture
cloud "網路環境" as network_env

analyst --> datalink: 選擇介面
datalink --> capture: 捕捉封包
network_env --> capture: 流量來源

datalink --> network: 解析
network --> transport: 解析
transport --> application: 解析

note right of datalink
  支援多種實體介面
  - 有線網路卡
  - 無線網卡(監聽模式)
  - 虛擬介面
  - Loopback
end note

note right of application
  應用層協定分析
  - 完整的協定解析
  - 欄位存取
  - 修改與重組
end note

@enduml

Wi-Fi 協定分析與應用實例

無線網路的普及使得 Wi-Fi 協定分析成為網路安全研究的重要課題。IEEE 802.11 標準定義了無線區域網路的運作方式,其中包含了多種不同類型的管理框架與控制框架。Beacon 框架是無線存取點定期廣播的訊號,其中包含了 SSID、支援的傳輸速率、加密方式等重要資訊。透過分析 Beacon 框架,我們能夠繪製出周遭的無線網路拓撲,了解各個存取點的設定與安全狀態。

關聯請求與認證框架則涉及客戶端與存取點建立連線的過程。當無線裝置嘗試連接到存取點時,會先發送關聯請求框架,其中包含了裝置的能力參數與希望使用的傳輸選項。存取點收到請求後,會回覆關聯回應框架,告知客戶端連線是否被接受。這個過程的分析對於理解無線網路的存取控制機制非常重要,也是進行無線安全測試的基礎。

Scapy 對 Wi-Fi 協定的支援使得研究人員能夠深入探索無線通訊的細節。透過將無線網卡設定為監聽模式,我們可以捕捉到網路中所有的 Wi-Fi 框架,包括那些不是發送給我們裝置的封包。這種被動式的監聽技術是無線網路分析的核心,它讓我們能夠在不影響網路正常運作的前提下,收集大量的協定互動資訊。

class WiFiAnalyzer:
    """Wi-Fi 協定分析工具"""
    
    def __init__(self, interface: str = "wlan0"):
        self.interface = interface
        self.logger = logging.getLogger('WiFiAnalyzer')
        self.access_points: Dict[str, Dict[str, Any]] = {}
        self.client_activities: List[Dict[str, Any]] = []
        
    def enable_monitor_mode(self) -> bool:
        """啟用網卡監聽模式"""
        try:
            import subprocess
            
            subprocess.run(['ip', 'link', 'set', self.interface, 'down'], check=True)
            subprocess.run(['iw', 'dev', self.interface, 'set', 'type', 'monitor'], check=True)
            subprocess.run(['ip', 'link', 'set', self.interface, 'up'], check=True)
            
            self.logger.info(f"已將 {self.interface} 設定為監聽模式")
            return True
            
        except Exception as e:
            self.logger.error(f"啟用監聽模式失敗: {e}")
            return False
    
    def analyze_beacon_frame(self, packet: Packet) -> Dict[str, Any]:
        """分析 Beacon 框架"""
        if not packet.haslayer(Dot11Beacon):
            return {}
        
        bssid = packet[Dot11].addr3
        
        try:
            ssid = packet[Dot11Elt].info.decode('utf-8', errors='ignore')
        except:
            ssid = "<Hidden SSID>"
        
        ap_info = {
            'bssid': bssid,
            'ssid': ssid,
            'timestamp': packet.time
        }
        
        current_elt = packet[Dot11Elt]
        while current_elt:
            if current_elt.ID == 3:
                ap_info['channel'] = ord(current_elt.info)
            elif current_elt.ID == 48:
                ap_info['encryption'] = 'WPA2'
            elif current_elt.ID == 221:
                if b'\x00P\xf2\x01\x01\x00' in current_elt.info:
                    ap_info['encryption'] = 'WPA'
            
            current_elt = current_elt.payload.getlayer(Dot11Elt)
        
        if bssid not in self.access_points:
            self.access_points[bssid] = ap_info
            self.logger.info(f"發現新的 AP: {ssid} ({bssid})")
        
        return ap_info
    
    def analyze_probe_request(self, packet: Packet) -> Dict[str, Any]:
        """分析探測請求框架"""
        if not packet.haslayer(Dot11ProbeReq):
            return {}
        
        client_mac = packet[Dot11].addr2
        
        try:
            requested_ssid = packet[Dot11Elt].info.decode('utf-8', errors='ignore')
        except:
            requested_ssid = "<Broadcast>"
        
        probe_info = {
            'client_mac': client_mac,
            'requested_ssid': requested_ssid,
            'timestamp': packet.time
        }
        
        self.client_activities.append(probe_info)
        self.logger.info(f"客戶端 {client_mac} 正在尋找 AP: {requested_ssid}")
        
        return probe_info
    
    def start_comprehensive_scan(self, duration: int = 120):
        """執行完整的 Wi-Fi 掃描"""
        self.logger.info(f"開始全面性 Wi-Fi 掃描,持續 {duration} 秒")
        
        if not self.enable_monitor_mode():
            self.logger.error("無法啟用監聽模式,掃描終止")
            return
        
        def packet_handler(pkt):
            if pkt.haslayer(Dot11Beacon):
                self.analyze_beacon_frame(pkt)
            elif pkt.haslayer(Dot11ProbeReq):
                self.analyze_probe_request(pkt)
            elif pkt.haslayer(Dot11AssoReq):
                client = pkt[Dot11].addr2
                ap = pkt[Dot11].addr1
                self.logger.info(f"客戶端 {client} 嘗試連接 AP {ap}")
            elif pkt.haslayer(Dot11Auth):
                client = pkt[Dot11].addr2
                ap = pkt[Dot11].addr1
                self.logger.info(f"認證過程: 客戶端 {client} <-> AP {ap}")
        
        try:
            sniff(
                iface=self.interface,
                prn=packet_handler,
                timeout=duration,
                store=False
            )
        except KeyboardInterrupt:
            self.logger.info("掃描被使用者中斷")
        except Exception as e:
            self.logger.error(f"掃描過程發生錯誤: {e}")
        
        self.generate_scan_report()
    
    def generate_scan_report(self):
        """產生掃描報告"""
        self.logger.info("\n" + "="*70)
        self.logger.info("Wi-Fi 掃描報告")
        self.logger.info("="*70)
        
        self.logger.info(f"\n發現的存取點總數: {len(self.access_points)}")
        for bssid, info in self.access_points.items():
            encryption = info.get('encryption', 'Open')
            channel = info.get('channel', 'N/A')
            ssid = info.get('ssid', 'N/A')
            self.logger.info(f"  SSID: {ssid}")
            self.logger.info(f"  BSSID: {bssid}")
            self.logger.info(f"  頻道: {channel}")
            self.logger.info(f"  加密方式: {encryption}")
            self.logger.info("-" * 50)
        
        self.logger.info(f"\n偵測到的客戶端活動: {len(self.client_activities)} 筆")

這個 Wi-Fi 分析工具展現了如何利用 Scapy 進行深入的無線網路研究。從基本的 AP 發現到客戶端行為追蹤,這些功能為無線安全研究提供了強大的基礎。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

participant "無線客戶端" as client
participant "存取點 AP" as ap
participant "Scapy 監聽器" as scapy

ap -> ap: 定期廣播 Beacon
ap -> scapy: Beacon 框架\n(SSID, 頻道, 加密)
scapy -> scapy: 記錄 AP 資訊

client -> client: 搜尋可用網路
client -> scapy: Probe Request\n(尋找特定 SSID)
scapy -> scapy: 記錄客戶端活動

client -> ap: Association Request\n(請求連線)
client -> scapy: 捕捉關聯請求
ap -> client: Association Response\n(接受/拒絕)
ap -> scapy: 捕捉關聯回應

scapy -> scapy: 分析連線參數\n能力協商\n安全機制

alt 使用加密
    client -> ap: Authentication\n(身份認證)
    client -> scapy: 捕捉認證過程
    ap -> client: Authentication Response
    ap -> scapy: 捕捉認證回應
    
    client -> ap: 4-way Handshake\n(金鑰交換)
    client -> scapy: 捕捉握手過程
end

client -> ap: 正常資料傳輸
client -> scapy: 持續監聽流量

note right of scapy
  分析收集的資訊
  - AP 配置與安全設定
  - 客戶端行為模式
  - 連線建立過程
  - 可能的安全弱點
end note

@enduml

DHCP 協定的深度剖析與操作

動態主機組態協定在現代網路環境中扮演著自動化 IP 位址分配的關鍵角色。當裝置接入網路時,透過 DHCP 協定可以自動取得 IP 位址、子網路遮罩、預設閘道與 DNS 伺服器等網路參數,大幅簡化了網路管理的複雜度。DHCP 的運作過程涉及四個主要階段,分別是發現、提供、請求與確認,這個過程通常被稱為 DORA 流程。

在發現階段,客戶端會廣播 DHCP Discover 封包,因為此時客戶端還沒有 IP 位址,所以使用 0.0.0.0 作為來源位址,並以 255.255.255.255 作為目的位址進行廣播。網路上的 DHCP 伺服器收到這個廣播後,會回覆 DHCP Offer 封包,其中包含了可用的 IP 位址與其他網路參數。客戶端可能會收到多個 DHCP 伺服器的 Offer,它會選擇其中一個進行後續的請求。

請求階段中,客戶端發送 DHCP Request 封包,明確表示希望使用特定伺服器提供的 IP 位址。這個請求同樣以廣播方式發送,讓所有的 DHCP 伺服器都能知道客戶端的選擇。被選中的伺服器會回覆 DHCP ACK 封包進行確認,完成 IP 位址的分配。如果配置過程中出現問題,伺服器則會回覆 DHCP NAK 封包,表示無法完成配置。

class DHCPAnalyzer:
    """DHCP 協定分析與測試工具"""
    
    def __init__(self, interface: str = "eth0"):
        self.interface = interface
        self.logger = logging.getLogger('DHCPAnalyzer')
        self.dhcp_servers: Dict[str, Dict[str, Any]] = {}
        
    def craft_dhcp_discover(self, hostname: str = "test-client",
                           vendor_class: str = "Scapy") -> Packet:
        """
        構造 DHCP Discover 封包
        
        Args:
            hostname: 客戶端主機名稱
            vendor_class: 廠商類別識別
            
        Returns:
            Packet: 完整的 DHCP Discover 封包
        """
        client_mac = RandMAC()
        transaction_id = random.randint(1, 0xFFFFFFFF)
        
        discover_packet = (
            Ether(dst="ff:ff:ff:ff:ff:ff", src=str(client_mac)) /
            IP(src="0.0.0.0", dst="255.255.255.255") /
            UDP(sport=68, dport=67) /
            BOOTP(
                op=1,
                chaddr=mac2str(str(client_mac)),
                xid=transaction_id,
                flags=0x8000
            ) /
            DHCP(options=[
                ("message-type", "discover"),
                ("hostname", hostname),
                ("vendor_class_id", vendor_class),
                ("param_req_list", [
                    1,   # Subnet Mask
                    3,   # Router
                    6,   # Domain Name Server
                    15,  # Domain Name
                    28,  # Broadcast Address
                    51,  # IP Address Lease Time
                    58,  # Renewal Time
                    59,  # Rebinding Time
                ]),
                "end"
            ])
        )
        
        self.logger.info(f"構造 DHCP Discover: MAC={client_mac}, XID={hex(transaction_id)}")
        return discover_packet
    
    def perform_dhcp_discovery(self, timeout: int = 10) -> List[Dict[str, Any]]:
        """
        執行完整的 DHCP 發現流程
        
        Args:
            timeout: 等待回應的超時時間
            
        Returns:
            List[Dict]: 發現的 DHCP 伺服器資訊列表
        """
        discover_pkt = self.craft_dhcp_discover()
        
        self.logger.info("發送 DHCP Discover 封包")
        
        try:
            responses, unanswered = srp(
                discover_pkt,
                iface=self.interface,
                timeout=timeout,
                multi=True,
                verbose=False
            )
            
            discovered_servers = []
            
            for sent, received in responses:
                if DHCP in received:
                    server_info = self.parse_dhcp_offer(received)
                    if server_info:
                        discovered_servers.append(server_info)
                        
                        server_ip = server_info['server_ip']
                        if server_ip not in self.dhcp_servers:
                            self.dhcp_servers[server_ip] = server_info
                            self.logger.info(f"發現 DHCP 伺服器: {server_ip}")
            
            if not discovered_servers:
                self.logger.warning("未發現任何 DHCP 伺服器回應")
            
            return discovered_servers
            
        except Exception as e:
            self.logger.error(f"DHCP 發現過程發生錯誤: {e}")
            return []
    
    def parse_dhcp_offer(self, packet: Packet) -> Optional[Dict[str, Any]]:
        """解析 DHCP Offer 封包"""
        if not (DHCP in packet and BOOTP in packet):
            return None
        
        server_info = {
            'server_ip': packet[IP].src,
            'offered_ip': packet[BOOTP].yiaddr,
            'transaction_id': packet[BOOTP].xid,
            'options': {}
        }
        
        for option in packet[DHCP].options:
            if isinstance(option, tuple):
                opt_name, opt_value = option
                
                if opt_name == 'subnet_mask':
                    server_info['options']['subnet_mask'] = opt_value
                elif opt_name == 'router':
                    server_info['options']['gateway'] = opt_value
                elif opt_name == 'name_server':
                    server_info['options']['dns_servers'] = opt_value
                elif opt_name == 'domain':
                    server_info['options']['domain_name'] = opt_value
                elif opt_name == 'lease_time':
                    server_info['options']['lease_time'] = opt_value
                elif opt_name == 'server_id':
                    server_info['server_identifier'] = opt_value
        
        self.logger.info(f"解析 DHCP Offer: 提供 IP {server_info['offered_ip']}")
        return server_info
    
    def test_dhcp_starvation(self, count: int = 10, delay: float = 0.1):
        """
        測試 DHCP 資源消耗(僅供授權測試使用)
        
        Warning: 此功能僅供在受控環境中進行安全測試
        """
        self.logger.warning(f"開始 DHCP 壓力測試,將發送 {count} 個請求")
        
        for i in range(count):
            fake_mac = RandMAC()
            
            dhcp_request = (
                Ether(dst="ff:ff:ff:ff:ff:ff", src=str(fake_mac)) /
                IP(src="0.0.0.0", dst="255.255.255.255") /
                UDP(sport=68, dport=67) /
                BOOTP(
                    chaddr=mac2str(str(fake_mac)),
                    xid=random.randint(1, 0xFFFFFFFF),
                    flags=0x8000
                ) /
                DHCP(options=[
                    ("message-type", "discover"),
                    ("hostname", f"test-client-{i}"),
                    "end"
                ])
            )
            
            sendp(dhcp_request, iface=self.interface, verbose=False)
            
            if (i + 1) % 10 == 0:
                self.logger.info(f"已發送 {i + 1} 個 DHCP 請求")
            
            time.sleep(delay)
        
        self.logger.info(f"DHCP 壓力測試完成,共發送 {count} 個請求")

這個 DHCP 分析工具提供了從基本的協定互動到進階的安全測試功能。透過程式化的方式操作 DHCP 協定,研究人員能夠深入了解 DHCP 的運作細節,並評估網路環境中 DHCP 服務的安全性與可靠性。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

participant "DHCP 客戶端" as client
participant "網路廣播域" as network
participant "DHCP 伺服器 A" as server_a
participant "DHCP 伺服器 B" as server_b

== DHCP 發現階段 ==

client -> network: DHCP Discover\n(廣播 255.255.255.255)\nMAC: XX:XX:XX:XX:XX:XX\nXID: 0x12345678

network -> server_a: 轉發 Discover
network -> server_b: 轉發 Discover

note right of client
  客戶端資訊
  來源 IP: 0.0.0.0
  目的 IP: 255.255.255.255
  來源埠: 68
  目的埠: 67
  Transaction ID: 隨機
end note

== DHCP 提供階段 ==

server_a -> network: DHCP Offer\n提供 IP: 192.168.1.100\n租期: 86400 秒
server_b -> network: DHCP Offer\n提供 IP: 192.168.1.101\n租期: 86400 秒

network -> client: 轉發 Offer (伺服器 A)
network -> client: 轉發 Offer (伺服器 B)

client -> client: 選擇伺服器\n(通常選擇第一個回應)

note right of server_a
  Offer 內容
  - 提供的 IP 位址
  - 子網路遮罩
  - 預設閘道
  - DNS 伺服器
  - 租約時間
  - 伺服器識別碼
end note

== DHCP 請求階段 ==

client -> network: DHCP Request\n(廣播)\n請求 IP: 192.168.1.100\n伺服器: Server A

network -> server_a: 轉發 Request
network -> server_b: 轉發 Request

note right of client
  Request 訊息
  表明選擇的伺服器
  其他伺服器會釋放
  預留的 IP 位址
end note

== DHCP 確認階段 ==

server_a -> network: DHCP ACK\n確認配置\nIP: 192.168.1.100

network -> client: 轉發 ACK

client -> client: 套用網路配置\n開始使用 IP 位址

note right of server_a
  ACK 確認
  - 正式分配 IP
  - 記錄租約資訊
  - 啟動租期計時
end note

server_b -> server_b: 釋放預留的\n192.168.1.101

@enduml

Scapy 在現代網路安全中的應用趨勢

隨著網路技術的快速演進,Scapy 的應用範疇也在不斷擴展。物聯網裝置的普及帶來了大量新興的通訊協定,這些協定往往缺乏完善的分析工具,Scapy 的可擴充性使其成為研究這些新協定的理想選擇。藍牙低功耗通訊協定就是一個典型的例子,隨著智慧型穿戴裝置與智慧家居產品的興起,BLE 協定的安全性研究變得日益重要。Scapy 對 BTLE 與 GATT 協定的支援,讓研究人員能夠深入分析這些裝置的通訊行為,發現潛在的隱私洩露或安全漏洞。

在軟體定義網路與網路功能虛擬化的浪潮中,Scapy 也找到了新的應用場景。網路工程師可以使用 Scapy 來模擬各種網路場景,測試 SDN 控制器的行為,或是驗證虛擬化網路功能的正確性。相較於傳統的硬體測試環境,使用 Scapy 進行軟體化的測試不僅成本更低,而且更具彈性,能夠快速地構造各種測試案例。

容器化技術與微服務架構的普及也為 Scapy 帶來了新的應用機會。在這些分散式系統中,服務之間的網路通訊變得更加複雜,故障排查與效能分析都需要深入的封包層級洞察。Scapy 可以被整合到監控系統中,提供即時的網路流量分析能力。此外,在自動化測試與持續整合的流程中,Scapy 也能夠扮演重要的角色,用於驗證網路服務的功能與效能指標。

登入監控系統與網路協定分析代表了資訊安全防禦體系中兩個不同但互補的面向。前者關注於應用層的使用者行為,透過日誌分析來識別異常的認證模式,是防範帳號竊取與暴力破解的第一道防線。後者則深入到網路層面,透過封包分析來理解通訊協定的運作細節,這對於發現協定層級的漏洞與進行深度安全稽核至關重要。將這兩種技術結合運用,能夠建立起更全面的安全監控能力,從應用層到網路層提供多層次的防護。本文透過實際的程式碼範例與詳細的技術解析,展現了這些技術的實作細節與應用潛力,期望能為讀者在資訊安全領域的實務工作提供有價值的參考。