Delphi 平行程式函式庫提供多種機制簡化多執行緒應用程式開發。TTask 類別讓建立和管理執行緒任務變得更直覺,搭配原子操作確保多執行緒環境下共用變數的資料一致性。執行緒池則透過重複使用執行緒降低建立執行緒的成本,TThreadPool 物件提供控制執行緒池行為的介面,例如設定最小和最大工作執行緒數量。Async/Await 模式進一步簡化非同步操作的處理,Async 函式啟動非同步任務,Await 方法指定完成後的回呼動作,提升程式碼可讀性和維護性。
平行處理的實踐探討
在現代軟體開發中,多執行緒技術已成為提升程式效能的重要手段。本文將探討Delphi中的平行處理技術,包括任務(Tasks)、執行緒池(Thread Pooling)以及非同步/等待(Async/Await)模式。
任務(Tasks)與執行緒管理
在Delphi的平行程式函式庫中,TTask類別提供了一種簡便的方式來建立和管理執行緒任務。以下是一個典型的任務執行範例:
Tasks
311
running: int64;
max: int64;
begin
running := TInterlocked.Increment(FNumRunning);
max := TInterlocked.Read(FMaxRunning);
if running > max then
TInterlocked.CompareExchange(FMaxRunning, running, max);
TInterlocked.Add(taskResult^, FindPrimes(lowBound, highBound));
TInterlocked.Decrement(FNumRunning);
end;
內容解密:
- 使用
TInterlocked.Increment來原子性地增加FNumRunning的值,表示有新的任務正在執行。 - 讀取
FMaxRunning的當前值,並與新的running值比較,若running較大,則更新FMaxRunning。 - 呼叫
FindPrimes函式計算質數,並將結果加到taskResult指標所指的變數上。 - 任務完成後,使用
TInterlocked.Decrement減少FNumRunning的值。
此段程式碼展示瞭如何使用原子操作來安全地更新共用變數,確保在多執行緒環境下的正確性。
執行緒池(Thread Pooling)
建立執行緒雖然比建立行程(Process)快,但仍然是一個相對耗時的作業。為瞭解決這個問題,大多數根據任務的平行程式函式庫都採用了執行緒池的概念。
執行緒池的工作原理
- 當應用程式首次建立任務時,平行程式函式庫會請求執行緒池來執行該任務。
- 執行緒池會建立一個新的執行緒來執行任務。
- 任務完成後,執行緒不會被銷毀,而是被儲存在一個非活躍執行緒的清單中。
- 當新的任務被啟動時,執行緒池會重複使用先前儲存的執行緒,而不是建立新的執行緒。
控制執行緒池
Delphi中的TThreadPool物件允許開發者控制執行緒池的行為。以下是其公開的介面:
function SetMaxWorkerThreads(Value: Integer): Boolean;
function SetMinWorkerThreads(Value: Integer): Boolean;
property MaxWorkerThreads: Integer read GetMaxWorkerThreads;
property MinWorkerThreads: Integer read GetMinWorkerThreads;
SetMinWorkerThreads用於設定最小工作執行緒數,但實際上這只是一個提示,因為實際的執行緒數量取決於實際需求。SetMaxWorkerThreads用於限制最大工作執行緒數。如果達到這個限制,新的任務將必須等待其他任務完成。
非同步/等待(Async/Await)模式
非同步/等待模式提供了一種更簡潔的方式來處理非同步操作,使得程式碼更容易閱讀和維護。
Async/Await的使用範例
procedure TfrmAsyncAwait.btnLongTaskAsyncClick(Sender: TObject);
begin
Log(Format('Button click in thread %d', [TThread.Current.ThreadID]));
(Sender as TButton).Enabled := false;
Async(LongTask)
.Await(
procedure
begin
Log(Format('Await in thread %d', [TThread.Current.ThreadID]));
(Sender as TButton).Enabled := true;
end
);
end;
內容解密:
- 當使用者點選按鈕時,該按鈕被停用,以防止重複點選。
Async(LongTask)啟動了一個非同步任務,執行LongTask方法。.Await方法指定了當LongTask完成後要執行的程式碼,這段程式碼會在主執行緒中執行,因此可以用來更新UI。
此範例展示瞭如何使用Async/Await模式來保持UI的回應性,同時執行耗時的操作。
Async/Await 模式的實作與應用
在 Delphi 的 Parallel Programming Library (PPL) 中,Async/Await 是一種簡化非同步程式設計的模式。它允許開發者在背景執行耗時的操作,並在操作完成後於主執行緒中執行回呼函式。
IAsync 介面與 Async 函式
IAsync 介面定義了 Await 方法,用於指定非同步操作完成後的回呼動作。Async 函式用於啟動非同步操作,並傳回一個實作 IAsync 介面的物件。
type
IAsync = interface
['{190C1975-FFCF-47AD-B075-79BC8F4157DA}']
procedure Await(const awaitProc: TProc);
end;
function Async(const asyncProc: TProc): IAsync;
內容解密:
IAsync介面提供了Await方法,讓開發者可以指定非同步操作完成後的處理邏輯。Async函式接收一個TProc型別的引數asyncProc,並建立一個TAsync物件傳回其IAsync介面。
TAsync 類別的實作
TAsync 類別是 Async/Await 模式的核心實作。它負責儲存非同步操作的處理邏輯,並在背景執行該操作。
constructor TAsync.Create(const asyncProc: TProc);
begin
inherited Create;
FAsyncProc := asyncProc;
end;
procedure TAsync.Await(const awaitProc: TProc);
begin
FSelf := Self;
FAwaitProc := awaitProc;
TTask.Run(Run);
end;
procedure TAsync.Run;
begin
FAsyncProc();
TThread.Queue(nil,
procedure
begin
FAwaitProc();
FSelf := nil;
end
);
end;
內容解密:
TAsync.Create建構函式儲存了傳入的非同步處理程式asyncProc。Await方法儲存了完成後的回呼程式awaitProc,並啟動了一個背景任務來執行Run方法。- 在
Run方法中,首先執行了儲存的非同步處理程式FAsyncProc。 - 非同步處理完成後,透過
TThread.Queue將回呼程式FAwaitProc安排到主執行緒中執行。 - 為了防止
IAsync介面被過早釋放,Await方法中將Self指定給FSelf以增加參照計數,待回呼程式執行完成後再將FSelf置為nil以釋放介面。
Join 模式的實作與應用
Join 模式用於平行啟動多個任務,並等待它們全部完成。PPL 中的 TParallel.Join 方法提供了此功能。
TParallel.Join 方法
TParallel.Join([Task1, Task2, Task3]);
或者對於兩個任務的情況,可以直接傳入兩個 TProc 引數:
TParallel.Join(Task1, Task2).Wait;
內容解密:
TParallel.Join方法接受一個包含多個任務處理程式的陣列,平行啟動這些任務。- 傳回的
ITask介面代表了一個組合任務,該任務在所有子任務完成後才會完成。 - 可以對傳回的
ITask介面呼叫Wait方法來等待所有任務完成。
示例程式碼
procedure TfrmParallelJoin.btnJoin2Click(Sender: TObject);
begin
ListBox1.Items.Add('Starting tasks');
TParallel.Join(Task1, Task2);
end;
procedure TfrmParallelJoin.Task1;
begin
QueueLog('Task1 started in thread ' + TThread.Current.ThreadID.ToString);
Sleep(1000);
QueueLog('Task1 stopped in thread ' + TThread.Current.ThreadID.ToString);
end;
procedure TfrmParallelJoin.Task2;
begin
QueueLog('Task2 started in thread ' + TThread.Current.ThreadID.ToString);
Sleep(1000);
QueueLog('Task2 stopped in thread ' + TThread.Current.ThreadID.ToString);
end;
圖表翻譯:
此範例展示了使用 TParallel.Join 同時啟動兩個任務。每個任務記錄其開始和結束的時間,以及執行緒 ID。透過此範例,可以觀察到任務確實是平行執行的。
自定義 Join/Await 模式
為了彌補 PPL 中 Join 模式無法在任務完成後通知主執行緒的不足,可以實作一個支援回呼功能的自定義 Join/Await 模式。
IJoin 介面與自定義 Join 方法
type
IJoin = interface
procedure Await(const awaitProc: TProc);
end;
function Join(const tasks: array of TProc): IJoin;
內容解密:
- 自定義的
Join方法接受一個包含多個任務處理程式的陣列,並傳回一個實作IJoin介面的物件。 IJoin介面提供了Await方法,用於指定所有任務完成後的回呼動作。
深入理解Join/Await模式及其實作細節
在平行程式設計中,Join/Await模式是一種重要的設計模式,用於處理多個非同步任務的協調與例外處理。本文將詳細探討該模式的實作原理及其在Delphi中的應用。
Join/Await模式的核心概念
Join/Await模式主要用於啟動多個平行任務,並在所有任務完成後執行特定的回呼函式。其核心介面IJoin定義了兩個過載的Await方法,一個用於簡單等待所有任務完成,另一個則用於捕捉任務中可能丟擲的異常。
type
IJoin = interface
['{ED4B4531-B233-4A02-A09A-13EE488FCCA3}']
procedure Await(const awaitProc: TProc); overload;
procedure Await(const awaitProc: TProc<Exception>); overload;
end;
實作細節:TJoin類別
TJoin類別是Join/Await模式的具體實作者。其建構子負責複製傳入的非同步任務陣列。Await方法的實作則啟動所有任務,並在任務完成後執行指定的回呼函式。
procedure TJoin.Await(const awaitProc: TProc<Exception>);
var
i: integer;
begin
FAwaitProc := awaitProc;
FNumRunning := Length(FAsyncProc);
FSelf := Self;
for i := Low(FAsyncProc) to High(FAsyncProc) do
TTask.Run(GetAsyncProc(i));
end;
執行非同步任務:Run方法
每個非同步任務由Run方法啟動。它負責執行指定的任務程式碼,並在任務完成後更新計數器。當所有任務完成時,計數器歸零,此時會佇列一個匿名方法來呼叫Await處理常式。
procedure TJoin.Run(const asyncProc: TProc);
begin
try
asyncProc();
except
on E: Exception do
AppendException(AcquireExceptionObject as Exception);
end;
if TInterlocked.Decrement(FNumRunning) = 0 then
TThread.Queue(nil,
procedure
begin
FAwaitProc(CreateAggregateException);
FSelf := nil;
end);
end;
例外處理機制
在任務執行過程中,如果發生異常,AppendException方法會將異常物件新增至FExceptions陣列中。由於多個任務可能同時丟擲異常,因此需要使用鎖定機制來同步對該陣列的存取。
procedure TJoin.AppendException(E: Exception);
begin
TMonitor.Enter(Self);
try
SetLength(FExceptions, Length(FExceptions) + 1);
FExceptions[High(FExceptions)] := E;
finally
TMonitor.Exit(Self);
end;
end;
使用範例
在ParallelJoin示範程式中,按下"DHP Join Exception"按鈕會啟動三個任務,其中兩個任務會丟擲異常。所有任務完成後,TasksStopped回呼函式會被呼叫,並處理可能存在的異常。
procedure TfrmParallelJoin.btnDHPJoinExcClick(Sender: TObject);
begin
ListBox1.Items.Add('Starting tasks');
Join([Task1, Task2E, Task3E]).Await(TasksStopped);
end;
procedure TfrmParallelJoin.TasksStopped(E: Exception);
var
i: Integer;
begin
ListBox1.Items.Add('Tasks stopped');
if Assigned(E) then
for i := 0 to EAggregateException(E).Count - 1 do
ListBox1.Items.Add('Task raised exception: ' + EAggregateException(E)[i].Message);
end;
Future模式簡介
除了Join/Await模式外,Delphi的平行程式函式庫還提供了Future模式,用於表示一個非同步操作並傳回結果。IFuture<T>介面繼承自ITask,支援查詢狀態、取消任務等操作。
function CountPrimes: integer;
var
i: Integer;
begin
// ... 省略實作細節
end;
procedure TfrmParallelFuture.btnCreateFutureClick(Sender: TObject);
begin
FFuture := TTask.Future<integer>(CountPrimes);
end;
procedure TfrmParallelFuture.btnGetValueClick(Sender: TObject);
begin
ListBox1.Items.Add('Result: ' + FFuture.Value.ToString);
end;