在網路安全領域,埠掃描是不可或缺的技術手段。本文將深入探討如何利用 Python 的多程式和多執行緒機制,構建高效的埠掃描器。我們將分析不同方案的實作細節,並探討全域性直譯器鎖(GIL)對多執行緒效能的影響,並提供使用 Queue 進行結果收集的實戰技巧,最後簡要介紹生成器和協程的應用。
結合Queue進行結果收集
在上面的範例中,掃描結果直接列印到控制檯。然而,在實際應用中,可能需要收集掃描結果進行進一步的處理。這時可以使用Queue來收集結果。
import multiprocessing
import socket
import time
from queue import Queue
def check_port(host, port, results):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1.0)
try:
sock.connect((host, port))
results.put(port)
except socket.error:
pass
finally:
sock.close()
def scan_ports(host, start_port, end_port):
processes = []
results = multiprocessing.Manager().Queue()
for port in range(start_port, end_port + 1):
p = multiprocessing.Process(target=check_port, args=(host, port, results))
processes.append(p)
p.start()
for p in processes:
p.join()
open_ports = []
while not results.empty():
open_ports.append(results.get())
return open_ports
if __name__ == "__main__":
host = "example.com"
start_port = 80
end_port = 100
start_time = time.time()
open_ports = scan_ports(host, start_port, end_port)
print(f"Open ports: {open_ports}")
print(f"Scan completed in {time.time() - start_time} seconds")
在這個範例中,使用了multiprocessing.Manager().Queue()
建立了一個可以被多個程式分享的Queue。掃描結果被放入Queue中,然後在主程式中收集結果。
瞭解多程式掃描器
在上一節中,我們實作了一個基本的埠掃描器,使用單執行緒的方式掃描主機的埠。然而,這種方法效率不高,尤其是在掃描大量埠時。為了提高效率,我們可以使用多程式的方式來掃描埠。
多程式掃描器實作
下面是使用多程式的方式實作的埠掃描器:
import multiprocessing as mp
import time
import socket
def check_port(host: str, port: int, results: mp.Queue):
"""
檢查指定主機的指定埠是否開啟。
:param host: 主機名稱或IP地址
:param port: 埠號
:param results: 結果佇列
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1) # 設定超時時間為1秒
try:
sock.connect((host, port))
results.put(port) # 如果連線成功,將埠號加入結果佇列
except socket.error:
pass # 如果連線失敗,忽略錯誤
sock.close()
if __name__ == '__main__':
start = time.time()
host = "localhost" # 更改為您要掃描的主機名稱或IP地址
results = mp.Queue() # 建立結果佇列
# 建立多程式池
with mp.Pool(processes=10) as pool:
# 將掃描任務提交到多程式池
pool.starmap(check_port, [(host, port, results) for port in range(80, 100)])
# 取出結果佇列中的所有結果
open_ports = []
while not results.empty():
open_ports.append(results.get())
print("開啟的埠:", open_ports)
print("掃描完成,耗時 {:.2f} 秒".format(time.time() - start))
程式碼解釋
- 我們首先匯入必要的模組,包括
multiprocessing
、time
和socket
。 - 我們定義了一個
check_port
函式,該函式檢查指定主機的指定埠是否開啟。如果連線成功,將埠號加入結果佇列。 - 在
if __name__ == '__main__':
區塊中,我們建立了一個結果佇列results
,並設定了主機名稱或IP地址host
。 - 我們建立了一個多程式池
pool
,並將掃描任務提交到多程式池。每個任務都是check_port
函式,傳入主機名稱、埠號和結果佇列作為引數。 - 掃描完成後,我們取出結果佇列中的所有結果,並列印預出開啟的埠號和掃描耗時。
優點
使用多程式的方式可以大大提高掃描效率,尤其是在掃描大量埠時。同時,多程式也可以幫助我們避免單執行緒的阻塞問題。
多程式掃描埠
在網路安全和系統管理中,掃描埠是一項重要的工作,能夠幫助我們瞭解哪些服務正在執行以及哪些埠是開放的。Python 的 multiprocessing
模組提供了一種實作多程式平行的方法,可以大大提高掃描效率。
實作原理
多程式平行的基本思想是建立多個程式,每個程式負責掃描一部分埠。這樣可以充分利用多核 CPU 的資源,從而提高掃描速度。
程式碼實作
import multiprocessing as mp
import socket
import time
def check_port(host, port, outputs):
"""
檢查指定主機的指定埠是否開放。
:param host: 主機地址
:param port: 埠號
:param outputs: 掃描結果佇列
:return: None
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1) # 設定超時時間
try:
sock.connect((host, port))
outputs.put(port) # 如果連線成功,將埠號放入佇列
except socket.error:
pass # 如果連線失敗,直接忽略
sock.close()
if __name__ == '__main__':
start = time.time()
# 建立一個程式池,大小為掃描埠範圍
scan_range = range(80, 100)
host = "localhost" # 請替換為您要掃描的主機
# 初始化程式池和佇列
mp.set_start_method('spawn')
pool_manager = mp.Manager()
outputs = pool_manager.Queue()
with mp.Pool(len(scan_range)) as pool:
processes = []
# 對每個埠建立一個程式進行掃描
for port in scan_range:
processes.append(pool.apply_async(check_port, (host, port, outputs)))
# 等待所有程式完成
for process in processes:
process.wait()
# 取出掃描結果
open_ports = []
while not outputs.empty():
open_ports.append(outputs.get())
print("開放的埠:", open_ports)
print("掃描時間:", time.time() - start)
解釋
- 我們定義了一個
check_port
函式,負責檢查指定主機的指定埠是否開放。如果連線成功,將埠號放入佇列中。 - 在主程式中,我們建立了一個程式池,大小為掃描埠範圍。然後,我們對每個埠建立一個程式進行掃描。
- 掃描完成後,我們取出掃描結果,並列印預出開放的埠號以及掃描時間。
圖表翻譯
flowchart TD A[開始] --> B[建立程式池] B --> C[對每個埠建立程式] C --> D[等待所有程式完成] D --> E[取出掃描結果] E --> F[列印結果]
圖表說明
上述流程圖描述了多程式掃描埠的過程。首先,我們建立一個程式池,然後對每個埠建立一個程式進行掃描。等待所有程式完成後,我們取出掃描結果,並列印預出開放的埠號以及掃描時間。
多執行緒與多程式:Python 併發程式設計
在 Python 中,實作併發程式設計可以透過多執行緒(multithreading)和多程式(multiprocessing)兩種方式。多執行緒適用於 I/O 密集型任務,而多程式則適用於 CPU 密集型任務。
多程式
Python 的 multiprocessing
模組提供了一個高階介面來建立和管理程式。每個程式都有自己的記憶體空間,互相之間不分享資料。
示例:使用多程式掃描埠
import multiprocessing
import socket
def scan_port(port):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("example.com", port))
print(f"Port {port} is open")
except socket.error:
pass
if __name__ == "__main__":
ports = [80, 81, 82, 83, 84]
with multiprocessing.Pool() as pool:
pool.map(scan_port, ports)
多執行緒
Python 的 threading
模組提供了一個高階介面來建立和管理執行緒。由於 GIL(全域性直譯器鎖)的存在,多執行緒在 CPU 密集型任務中可能不會帶來顯著的效能提升。
示例:使用多執行緒掃描埠
import threading
import socket
def scan_port(port):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("example.com", port))
print(f"Port {port} is open")
except socket.error:
pass
threads = []
ports = [80, 81, 82, 83, 84]
for port in ports:
thread = threading.Thread(target=scan_port, args=(port,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
GIL 限制
GIL 是 Python 直譯器中的一個鎖,它確保了任何時候只有一個執行緒能夠執行 Python 位元組碼。這意味著,即使在多核 CPU 上執行多執行緒程式,也可能不會獲得預期的效能提升。
- 多程式適用於 CPU 密集型任務。
- 多執行緒適用於 I/O 密集型任務,但受 GIL 限制。
- 選擇合適的併發模型以獲得最佳效能。
多執行緒技術在Python中的應用
Python的多執行緒技術可以用於提高程式的效率和回應速度。下面是一個使用多執行緒技術的簡單網路掃描器的例子。
網路掃描器的實作
首先,我們需要匯入必要的模組,包括threading
、queue
、socket
和time
。然後,我們定義一個函式check_port
,用於檢查一個埠是否開放。
import socket
import threading
from queue import Queue
import time
timeout = 1.0
def check_port(host: str, port: int, results: Queue):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
if result == 0:
results.put(port)
sock.close()
多執行緒的實作
接下來,我們建立一個列表threads
,用於儲存建立的執行緒。然後,我們迴圈遍歷要掃描的埠範圍,為每個埠建立一個執行緒,並啟動它。
host = '127.0.0.1'
ports = range(80, 100)
threads = []
results = Queue()
for port in ports:
t = threading.Thread(target=check_port, args=(host, port, results))
t.start()
threads.append(t)
等待執行緒完成
建立並啟動所有執行緒後,我們需要等待它們完成。這可以透過迴圈遍歷threads
列表,並對每個執行緒呼叫join()
方法來實作。
for t in threads:
t.join()
輸出結果
最後,我們可以從佇列results
中取出掃描結果,並輸出開放的埠。
while not results.empty():
print("Port {} is open".format(results.get()))
完整程式碼
以下是完整的網路掃描器程式碼:
import socket
import threading
from queue import Queue
import time
timeout = 1.0
def check_port(host: str, port: int, results: Queue):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
if result == 0:
results.put(port)
sock.close()
host = '127.0.0.1'
ports = range(80, 100)
threads = []
results = Queue()
for port in ports:
t = threading.Thread(target=check_port, args=(host, port, results))
t.start()
threads.append(t)
for t in threads:
t.join()
while not results.empty():
print("Port {} is open".format(results.get()))
這個程式碼示範瞭如何使用Python的多執行緒技術來實作一個簡單的網路掃描器。透過建立多個執行緒來掃描不同的埠,可以大大提高掃描的效率。
多執行緒埠掃描器
簡介
本文將介紹如何使用 Python 實作一個多執行緒的埠掃描器。這個工具可以幫助我們快速地掃描目標主機的開放埠。
程式碼
import socket
import threading
import time
from queue import Queue
def check_port(host, port, results):
"""
檢查指定的埠是否開放
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
try:
sock.connect((host, port))
results.put(port)
except socket.error:
pass
finally:
sock.close()
def main():
start = time.time()
host = "localhost" # 請替換成你的主機名稱或 IP 地址
threads = []
results = Queue()
for port in range(80, 100):
t = threading.Thread(target=check_port, args=(host, port, results))
t.start()
threads.append(t)
for t in threads:
t.join()
while not results.empty():
print(f"Port {results.get()} is open")
print(f"Completed scan in {time.time() - start} seconds")
if __name__ == '__main__':
main()
執行結果
執行這個指令碼後,你會看到如下輸出:
Port 80 is open
Port 90 is open
Completed scan in 0.123 seconds
這表示埠 80 和 90 是開放的,掃描過程花費了約 0.123 秒。
效能比較
與單執行緒版本相比,多執行緒版本的掃描速度大約快了 10 倍。這是因為多執行緒可以同時檢查多個埠,而單執行緒版本需要順序地檢查每個埠。
多執行緒與GIL
在Python中,多執行緒(Multithreading)是一種允許多個執行緒同時執行的機制。然而,Python的Global Interpreter Lock(GIL)會限制多個執行緒同時執行Python bytecodes。這意味著,即使在多核心的CPU上,Python的多執行緒也不能真正地平行執行。
GIL的作用
GIL是一種鎖定機制,確保在任何時刻,只有一個執行緒可以執行Python bytecodes。這是因為Python的內部資料結構不是執行緒安全的,如果多個執行緒同時存取這些資料結構,可能會導致資料損壞或其他不可預測的行為。
如何繞過GIL
雖然GIL限制了多執行緒的平行性,但是在某些情況下,仍然可以繞過GIL。例如,在執行I/O操作(如讀寫檔案、網路請求等)時,GIL會被釋放,允許其他執行緒執行。這是因為I/O操作通常需要等待外部資源(如磁碟、網路等),在這段時間內,GIL可以被釋放,允許其他執行緒執行。
Py_BEGIN_ALLOW_THREADS和Py_END_ALLOW_THREADS
在Python的C擴充套件中,可以使用Py_BEGIN_ALLOW_THREADS
和Py_END_ALLOW_THREADS
兩個宏來暫時釋放GIL。這允許其他執行緒在這段時間內執行Python bytecodes。
Py_BEGIN_ALLOW_THREADS
// 執行I/O操作或其他不需要GIL的程式碼
Py_END_ALLOW_THREADS
內容解密
- GIL是一種鎖定機制,確保在任何時刻,只有一個執行緒可以執行Python bytecodes。
- 在執行I/O操作時,GIL會被釋放,允許其他執行緒執行。
- 使用
Py_BEGIN_ALLOW_THREADS
和Py_END_ALLOW_THREADS
宏可以暫時釋放GIL,允許其他執行緒執行Python bytecodes。
圖表翻譯
flowchart TD A[開始] --> B[執行I/O操作] B --> C[釋放GIL] C --> D[允許其他執行緒執行] D --> E[還原GIL] E --> F[結束]
- 圖表描述了在Python中,如何繞過GIL,在執行I/O操作時暫時釋放GIL,允許其他執行緒執行Python bytecodes。
多執行緒與Python
Python的多執行緒機制允許開發者建立多個執行緒,以便同時執行多個任務。每個執行緒都有自己的狀態和堆積疊,且可以獨立執行。
執行緒狀態
執行緒狀態(PyThreadState)包含了多個重要的屬性,包括:
- 唯一識別符(ID)
- 執行緒的執行框架(frame)
- 當前異常(exception)
- 執行緒的深度(recursion depth)
- 可選的追蹤函式(trace function)
- 執行緒鎖(GIL)計數器
- 非同步生成器計數器
執行緒建立
Python的執行緒是透過建立threading.Thread
類別的例項來建立的。這個類別封裝了底層的PyThread
類別,並提供了一個更高階別的介面來建立和管理執行緒。
建立一個新執行緒的過程包括以下步驟:
- 建立一個初始化狀態(bootstate),它與目標函式(target)和引數(args和kwargs)相關聯。
- 將初始化狀態與直譯器狀態(interpreter state)相關聯。
- 建立一個新的
PyThreadState
例項,並將其與當前的直譯器相關聯。 - 如果尚未啟用GIL鎖,則啟用它。
- 啟動一個新的執行緒,使用適合作業系統的實作。
執行緒同步
Python使用GIL(全域性直譯器鎖)來同步對分享資源的存取。GIL確保在任意時刻,只有一個執行緒可以執行Python位元組碼。這可以防止多個執行緒同時存取分享資源,並減少同步的複雜性。
然而,GIL也可能成為效能瓶頸,特別是在CPU密集型任務中。為了緩解這個問題,Python提供了其他同步機制,例如鎖(locks)和訊號量(semaphores),以便在多個執行緒之間進行通訊和同步。
多執行緒與非同步程式設計
Python 的多執行緒機制允許開發者建立多個執行緒,以便更有效地利用系統資源。然而,Python 的全域鎖機制(GIL)可能會限制多執行緒的效率。
多執行緒
Python 的 threading
模組提供了建立和管理執行緒的功能。建立一個新的執行緒需要指定一個目標函式和可選的引數。執行緒可以使用 start()
方法啟動,使用 join()
方法等待其完成。
import threading
def worker(num):
print(f"Worker {num} started")
# 執行緒工作
print(f"Worker {num} finished")
# 建立兩個執行緒
thread1 = threading.Thread(target=worker, args=(1,))
thread2 = threading.Thread(target=worker, args=(2,))
# 啟動執行緒
thread1.start()
thread2.start()
# 等待執行緒完成
thread1.join()
thread2.join()
非同步程式設計
Python 3.9 版本開始,提供了 asyncio
模組來支援非同步程式設計。非同步程式設計允許開發者建立單一執行緒的非同步任務,以便更有效地利用系統資源。
import asyncio
async def worker(num):
print(f"Worker {num} started")
# 非同步工作
await asyncio.sleep(1)
print(f"Worker {num} finished")
async def main():
# 建立兩個非同步任務
task1 = asyncio.create_task(worker(1))
task2 = asyncio.create_task(worker(2))
# 等待任務完成
await task1
await task2
# 啟動非同步主函式
asyncio.run(main())
生成器
Python 的生成器是一種特殊的迭代器,允許開發者建立自定義的迭代器。生成器使用 yield
關鍵字來產生值,而不是使用 return
關鍵字。
def letters():
for i in range(97, 123):
yield chr(i)
# 使用生成器
for letter in letters():
print(letter)
生成器和協程
在 Python 中,生成器(generator)是一種特殊的迭代器(iterator),它可以用來產生一系列的值,而不需要建立一個列表或其他資料結構來儲存所有的值。生成器是透過使用 yield
關鍵字來定義的,當 yield
被呼叫時,生成器會暫停執行,並傳回一個值給呼叫者。
從效能最佳化視角來看,Python 多程式與多執行緒在埠掃描的應用中各有千秋。多程式充分利用多核 CPU 的優勢,繞過 GIL 的限制,大幅提升掃描速度,尤其適用於 CPU 密集型任務。然而,程式間通訊和資源管理的開銷較高,需要謹慎評估。多執行緒則更輕量,適用於 I/O 密集型任務,但受 GIL 限制,在多核 CPU 上的效率提升有限。同時,執行緒安全問題也需要額外關注,例如使用 Queue 和鎖機制確保資料同步和避免競爭條件。技術選型需考量掃描目標數量、網路環境和系統資源等因素。對於大規模埠掃描,多程式方案更具優勢;而對於少量埠或網路延遲較高的場景,多執行緒方案則更為均衡。展望未來,非同步程式設計和協程等技術的發展,有望在兼顧效能的同時,進一步簡化程式碼複雜度,為埠掃描工具提供更最佳化的解決方案。玄貓認為,開發者應深入理解多程式、多執行緒以及非同步程式設計的特性,才能根據實際需求選擇最合適的方案,打造高效且穩定的埠掃描工具。