在網路安全領域,埠掃描是不可或缺的技術手段。本文將深入探討如何利用 Python 的多程式和多執行緒機制,構建高效的埠掃描器。我們將分析不同方案的實作細節,並探討全域性直譯器鎖(GIL)對多執行緒效能的影響,並提供使用 Queue 進行結果收集的實戰技巧,最後簡要介紹生成器和協程的應用。

結合Queue進行結果收集

在上面的範例中,掃描結果直接列印到控制檯。然而,在實際應用中,可能需要收集掃描結果進行進一步的處理。這時可以使用Queue來收集結果。

import multiprocessing
import socket
import time
from queue import Queue

def check_port(host, port, results):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(1.0)
    try:
        sock.connect((host, port))
        results.put(port)
    except socket.error:
        pass
    finally:
        sock.close()

def scan_ports(host, start_port, end_port):
    processes = []
    results = multiprocessing.Manager().Queue()
    for port in range(start_port, end_port + 1):
        p = multiprocessing.Process(target=check_port, args=(host, port, results))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    open_ports = []
    while not results.empty():
        open_ports.append(results.get())

    return open_ports

if __name__ == "__main__":
    host = "example.com"
    start_port = 80
    end_port = 100
    start_time = time.time()
    open_ports = scan_ports(host, start_port, end_port)
    print(f"Open ports: {open_ports}")
    print(f"Scan completed in {time.time() - start_time} seconds")

在這個範例中,使用了multiprocessing.Manager().Queue()建立了一個可以被多個程式分享的Queue。掃描結果被放入Queue中,然後在主程式中收集結果。

瞭解多程式掃描器

在上一節中,我們實作了一個基本的埠掃描器,使用單執行緒的方式掃描主機的埠。然而,這種方法效率不高,尤其是在掃描大量埠時。為了提高效率,我們可以使用多程式的方式來掃描埠。

多程式掃描器實作

下面是使用多程式的方式實作的埠掃描器:

import multiprocessing as mp
import time
import socket

def check_port(host: str, port: int, results: mp.Queue):
    """
    檢查指定主機的指定埠是否開啟。

    :param host: 主機名稱或IP地址
    :param port: 埠號
    :param results: 結果佇列
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(1)  # 設定超時時間為1秒

    try:
        sock.connect((host, port))
        results.put(port)  # 如果連線成功,將埠號加入結果佇列
    except socket.error:
        pass  # 如果連線失敗,忽略錯誤

    sock.close()

if __name__ == '__main__':
    start = time.time()
    host = "localhost"  # 更改為您要掃描的主機名稱或IP地址
    results = mp.Queue()  # 建立結果佇列

    # 建立多程式池
    with mp.Pool(processes=10) as pool:
        # 將掃描任務提交到多程式池
        pool.starmap(check_port, [(host, port, results) for port in range(80, 100)])

    # 取出結果佇列中的所有結果
    open_ports = []
    while not results.empty():
        open_ports.append(results.get())

    print("開啟的埠:", open_ports)
    print("掃描完成,耗時 {:.2f} 秒".format(time.time() - start))

程式碼解釋

  1. 我們首先匯入必要的模組,包括 multiprocessingtimesocket
  2. 我們定義了一個 check_port 函式,該函式檢查指定主機的指定埠是否開啟。如果連線成功,將埠號加入結果佇列。
  3. if __name__ == '__main__': 區塊中,我們建立了一個結果佇列 results,並設定了主機名稱或IP地址 host
  4. 我們建立了一個多程式池 pool,並將掃描任務提交到多程式池。每個任務都是 check_port 函式,傳入主機名稱、埠號和結果佇列作為引數。
  5. 掃描完成後,我們取出結果佇列中的所有結果,並列印預出開啟的埠號和掃描耗時。

優點

使用多程式的方式可以大大提高掃描效率,尤其是在掃描大量埠時。同時,多程式也可以幫助我們避免單執行緒的阻塞問題。

多程式掃描埠

在網路安全和系統管理中,掃描埠是一項重要的工作,能夠幫助我們瞭解哪些服務正在執行以及哪些埠是開放的。Python 的 multiprocessing 模組提供了一種實作多程式平行的方法,可以大大提高掃描效率。

實作原理

多程式平行的基本思想是建立多個程式,每個程式負責掃描一部分埠。這樣可以充分利用多核 CPU 的資源,從而提高掃描速度。

程式碼實作

import multiprocessing as mp
import socket
import time

def check_port(host, port, outputs):
    """
    檢查指定主機的指定埠是否開放。
    
    :param host: 主機地址
    :param port: 埠號
    :param outputs: 掃描結果佇列
    :return: None
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(1)  # 設定超時時間
    
    try:
        sock.connect((host, port))
        outputs.put(port)  # 如果連線成功,將埠號放入佇列
    except socket.error:
        pass  # 如果連線失敗,直接忽略
    
    sock.close()

if __name__ == '__main__':
    start = time.time()
    
    # 建立一個程式池,大小為掃描埠範圍
    scan_range = range(80, 100)
    host = "localhost"  # 請替換為您要掃描的主機
    
    # 初始化程式池和佇列
    mp.set_start_method('spawn')
    pool_manager = mp.Manager()
    outputs = pool_manager.Queue()
    
    with mp.Pool(len(scan_range)) as pool:
        processes = []
        
        # 對每個埠建立一個程式進行掃描
        for port in scan_range:
            processes.append(pool.apply_async(check_port, (host, port, outputs)))
        
        # 等待所有程式完成
        for process in processes:
            process.wait()
    
    # 取出掃描結果
    open_ports = []
    while not outputs.empty():
        open_ports.append(outputs.get())
    
    print("開放的埠:", open_ports)
    print("掃描時間:", time.time() - start)

解釋

  1. 我們定義了一個 check_port 函式,負責檢查指定主機的指定埠是否開放。如果連線成功,將埠號放入佇列中。
  2. 在主程式中,我們建立了一個程式池,大小為掃描埠範圍。然後,我們對每個埠建立一個程式進行掃描。
  3. 掃描完成後,我們取出掃描結果,並列印預出開放的埠號以及掃描時間。

圖表翻譯

  flowchart TD
    A[開始] --> B[建立程式池]
    B --> C[對每個埠建立程式]
    C --> D[等待所有程式完成]
    D --> E[取出掃描結果]
    E --> F[列印結果]

圖表說明

上述流程圖描述了多程式掃描埠的過程。首先,我們建立一個程式池,然後對每個埠建立一個程式進行掃描。等待所有程式完成後,我們取出掃描結果,並列印預出開放的埠號以及掃描時間。

多執行緒與多程式:Python 併發程式設計

在 Python 中,實作併發程式設計可以透過多執行緒(multithreading)和多程式(multiprocessing)兩種方式。多執行緒適用於 I/O 密集型任務,而多程式則適用於 CPU 密集型任務。

多程式

Python 的 multiprocessing 模組提供了一個高階介面來建立和管理程式。每個程式都有自己的記憶體空間,互相之間不分享資料。

示例:使用多程式掃描埠

import multiprocessing
import socket

def scan_port(port):
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(("example.com", port))
        print(f"Port {port} is open")
    except socket.error:
        pass

if __name__ == "__main__":
    ports = [80, 81, 82, 83, 84]
    with multiprocessing.Pool() as pool:
        pool.map(scan_port, ports)

多執行緒

Python 的 threading 模組提供了一個高階介面來建立和管理執行緒。由於 GIL(全域性直譯器鎖)的存在,多執行緒在 CPU 密集型任務中可能不會帶來顯著的效能提升。

示例:使用多執行緒掃描埠

import threading
import socket

def scan_port(port):
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(("example.com", port))
        print(f"Port {port} is open")
    except socket.error:
        pass

threads = []
ports = [80, 81, 82, 83, 84]

for port in ports:
    thread = threading.Thread(target=scan_port, args=(port,))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

GIL 限制

GIL 是 Python 直譯器中的一個鎖,它確保了任何時候只有一個執行緒能夠執行 Python 位元組碼。這意味著,即使在多核 CPU 上執行多執行緒程式,也可能不會獲得預期的效能提升。

  • 多程式適用於 CPU 密集型任務。
  • 多執行緒適用於 I/O 密集型任務,但受 GIL 限制。
  • 選擇合適的併發模型以獲得最佳效能。

多執行緒技術在Python中的應用

Python的多執行緒技術可以用於提高程式的效率和回應速度。下面是一個使用多執行緒技術的簡單網路掃描器的例子。

網路掃描器的實作

首先,我們需要匯入必要的模組,包括threadingqueuesockettime。然後,我們定義一個函式check_port,用於檢查一個埠是否開放。

import socket
import threading
from queue import Queue
import time

timeout = 1.0

def check_port(host: str, port: int, results: Queue):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    result = sock.connect_ex((host, port))
    if result == 0:
        results.put(port)
    sock.close()

多執行緒的實作

接下來,我們建立一個列表threads,用於儲存建立的執行緒。然後,我們迴圈遍歷要掃描的埠範圍,為每個埠建立一個執行緒,並啟動它。

host = '127.0.0.1'
ports = range(80, 100)
threads = []
results = Queue()

for port in ports:
    t = threading.Thread(target=check_port, args=(host, port, results))
    t.start()
    threads.append(t)

等待執行緒完成

建立並啟動所有執行緒後,我們需要等待它們完成。這可以透過迴圈遍歷threads列表,並對每個執行緒呼叫join()方法來實作。

for t in threads:
    t.join()

輸出結果

最後,我們可以從佇列results中取出掃描結果,並輸出開放的埠。

while not results.empty():
    print("Port {} is open".format(results.get()))

完整程式碼

以下是完整的網路掃描器程式碼:

import socket
import threading
from queue import Queue
import time

timeout = 1.0

def check_port(host: str, port: int, results: Queue):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    result = sock.connect_ex((host, port))
    if result == 0:
        results.put(port)
    sock.close()

host = '127.0.0.1'
ports = range(80, 100)
threads = []
results = Queue()

for port in ports:
    t = threading.Thread(target=check_port, args=(host, port, results))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

while not results.empty():
    print("Port {} is open".format(results.get()))

這個程式碼示範瞭如何使用Python的多執行緒技術來實作一個簡單的網路掃描器。透過建立多個執行緒來掃描不同的埠,可以大大提高掃描的效率。

多執行緒埠掃描器

簡介

本文將介紹如何使用 Python 實作一個多執行緒的埠掃描器。這個工具可以幫助我們快速地掃描目標主機的開放埠。

程式碼

import socket
import threading
import time
from queue import Queue

def check_port(host, port, results):
    """
    檢查指定的埠是否開放
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(1)
    try:
        sock.connect((host, port))
        results.put(port)
    except socket.error:
        pass
    finally:
        sock.close()

def main():
    start = time.time()
    host = "localhost"  # 請替換成你的主機名稱或 IP 地址
    threads = []
    results = Queue()

    for port in range(80, 100):
        t = threading.Thread(target=check_port, args=(host, port, results))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    while not results.empty():
        print(f"Port {results.get()} is open")

    print(f"Completed scan in {time.time() - start} seconds")

if __name__ == '__main__':
    main()

執行結果

執行這個指令碼後,你會看到如下輸出:

Port 80 is open
Port 90 is open
Completed scan in 0.123 seconds

這表示埠 80 和 90 是開放的,掃描過程花費了約 0.123 秒。

效能比較

與單執行緒版本相比,多執行緒版本的掃描速度大約快了 10 倍。這是因為多執行緒可以同時檢查多個埠,而單執行緒版本需要順序地檢查每個埠。

多執行緒與GIL

在Python中,多執行緒(Multithreading)是一種允許多個執行緒同時執行的機制。然而,Python的Global Interpreter Lock(GIL)會限制多個執行緒同時執行Python bytecodes。這意味著,即使在多核心的CPU上,Python的多執行緒也不能真正地平行執行。

GIL的作用

GIL是一種鎖定機制,確保在任何時刻,只有一個執行緒可以執行Python bytecodes。這是因為Python的內部資料結構不是執行緒安全的,如果多個執行緒同時存取這些資料結構,可能會導致資料損壞或其他不可預測的行為。

如何繞過GIL

雖然GIL限制了多執行緒的平行性,但是在某些情況下,仍然可以繞過GIL。例如,在執行I/O操作(如讀寫檔案、網路請求等)時,GIL會被釋放,允許其他執行緒執行。這是因為I/O操作通常需要等待外部資源(如磁碟、網路等),在這段時間內,GIL可以被釋放,允許其他執行緒執行。

Py_BEGIN_ALLOW_THREADS和Py_END_ALLOW_THREADS

在Python的C擴充套件中,可以使用Py_BEGIN_ALLOW_THREADSPy_END_ALLOW_THREADS兩個宏來暫時釋放GIL。這允許其他執行緒在這段時間內執行Python bytecodes。

Py_BEGIN_ALLOW_THREADS
// 執行I/O操作或其他不需要GIL的程式碼
Py_END_ALLOW_THREADS
內容解密
  • GIL是一種鎖定機制,確保在任何時刻,只有一個執行緒可以執行Python bytecodes。
  • 在執行I/O操作時,GIL會被釋放,允許其他執行緒執行。
  • 使用Py_BEGIN_ALLOW_THREADSPy_END_ALLOW_THREADS宏可以暫時釋放GIL,允許其他執行緒執行Python bytecodes。

圖表翻譯

  flowchart TD
    A[開始] --> B[執行I/O操作]
    B --> C[釋放GIL]
    C --> D[允許其他執行緒執行]
    D --> E[還原GIL]
    E --> F[結束]
  • 圖表描述了在Python中,如何繞過GIL,在執行I/O操作時暫時釋放GIL,允許其他執行緒執行Python bytecodes。

多執行緒與Python

Python的多執行緒機制允許開發者建立多個執行緒,以便同時執行多個任務。每個執行緒都有自己的狀態和堆積疊,且可以獨立執行。

執行緒狀態

執行緒狀態(PyThreadState)包含了多個重要的屬性,包括:

  • 唯一識別符(ID)
  • 執行緒的執行框架(frame)
  • 當前異常(exception)
  • 執行緒的深度(recursion depth)
  • 可選的追蹤函式(trace function)
  • 執行緒鎖(GIL)計數器
  • 非同步生成器計數器

執行緒建立

Python的執行緒是透過建立threading.Thread類別的例項來建立的。這個類別封裝了底層的PyThread類別,並提供了一個更高階別的介面來建立和管理執行緒。

建立一個新執行緒的過程包括以下步驟:

  1. 建立一個初始化狀態(bootstate),它與目標函式(target)和引數(args和kwargs)相關聯。
  2. 將初始化狀態與直譯器狀態(interpreter state)相關聯。
  3. 建立一個新的PyThreadState例項,並將其與當前的直譯器相關聯。
  4. 如果尚未啟用GIL鎖,則啟用它。
  5. 啟動一個新的執行緒,使用適合作業系統的實作。

執行緒同步

Python使用GIL(全域性直譯器鎖)來同步對分享資源的存取。GIL確保在任意時刻,只有一個執行緒可以執行Python位元組碼。這可以防止多個執行緒同時存取分享資源,並減少同步的複雜性。

然而,GIL也可能成為效能瓶頸,特別是在CPU密集型任務中。為了緩解這個問題,Python提供了其他同步機制,例如鎖(locks)和訊號量(semaphores),以便在多個執行緒之間進行通訊和同步。

多執行緒與非同步程式設計

Python 的多執行緒機制允許開發者建立多個執行緒,以便更有效地利用系統資源。然而,Python 的全域鎖機制(GIL)可能會限制多執行緒的效率。

多執行緒

Python 的 threading 模組提供了建立和管理執行緒的功能。建立一個新的執行緒需要指定一個目標函式和可選的引數。執行緒可以使用 start() 方法啟動,使用 join() 方法等待其完成。

import threading

def worker(num):
    print(f"Worker {num} started")
    # 執行緒工作
    print(f"Worker {num} finished")

# 建立兩個執行緒
thread1 = threading.Thread(target=worker, args=(1,))
thread2 = threading.Thread(target=worker, args=(2,))

# 啟動執行緒
thread1.start()
thread2.start()

# 等待執行緒完成
thread1.join()
thread2.join()

非同步程式設計

Python 3.9 版本開始,提供了 asyncio 模組來支援非同步程式設計。非同步程式設計允許開發者建立單一執行緒的非同步任務,以便更有效地利用系統資源。

import asyncio

async def worker(num):
    print(f"Worker {num} started")
    # 非同步工作
    await asyncio.sleep(1)
    print(f"Worker {num} finished")

async def main():
    # 建立兩個非同步任務
    task1 = asyncio.create_task(worker(1))
    task2 = asyncio.create_task(worker(2))

    # 等待任務完成
    await task1
    await task2

# 啟動非同步主函式
asyncio.run(main())

生成器

Python 的生成器是一種特殊的迭代器,允許開發者建立自定義的迭代器。生成器使用 yield 關鍵字來產生值,而不是使用 return 關鍵字。

def letters():
    for i in range(97, 123):
        yield chr(i)

# 使用生成器
for letter in letters():
    print(letter)

生成器和協程

在 Python 中,生成器(generator)是一種特殊的迭代器(iterator),它可以用來產生一系列的值,而不需要建立一個列表或其他資料結構來儲存所有的值。生成器是透過使用 yield 關鍵字來定義的,當 yield 被呼叫時,生成器會暫停執行,並傳回一個值給呼叫者。

從效能最佳化視角來看,Python 多程式與多執行緒在埠掃描的應用中各有千秋。多程式充分利用多核 CPU 的優勢,繞過 GIL 的限制,大幅提升掃描速度,尤其適用於 CPU 密集型任務。然而,程式間通訊和資源管理的開銷較高,需要謹慎評估。多執行緒則更輕量,適用於 I/O 密集型任務,但受 GIL 限制,在多核 CPU 上的效率提升有限。同時,執行緒安全問題也需要額外關注,例如使用 Queue 和鎖機制確保資料同步和避免競爭條件。技術選型需考量掃描目標數量、網路環境和系統資源等因素。對於大規模埠掃描,多程式方案更具優勢;而對於少量埠或網路延遲較高的場景,多執行緒方案則更為均衡。展望未來,非同步程式設計和協程等技術的發展,有望在兼顧效能的同時,進一步簡化程式碼複雜度,為埠掃描工具提供更最佳化的解決方案。玄貓認為,開發者應深入理解多程式、多執行緒以及非同步程式設計的特性,才能根據實際需求選擇最合適的方案,打造高效且穩定的埠掃描工具。