Delphi 平行程式函式庫提供多種機制簡化多執行緒應用程式開發。TTask 類別讓建立和管理執行緒任務變得更直覺,搭配原子操作確保多執行緒環境下共用變數的資料一致性。執行緒池則透過重複使用執行緒降低建立執行緒的成本,TThreadPool 物件提供控制執行緒池行為的介面,例如設定最小和最大工作執行緒數量。Async/Await 模式進一步簡化非同步操作的處理,Async 函式啟動非同步任務,Await 方法指定完成後的回呼動作,提升程式碼可讀性和維護性。

平行處理的實踐探討

在現代軟體開發中,多執行緒技術已成為提升程式效能的重要手段。本文將探討Delphi中的平行處理技術,包括任務(Tasks)、執行緒池(Thread Pooling)以及非同步/等待(Async/Await)模式。

任務(Tasks)與執行緒管理

在Delphi的平行程式函式庫中,TTask類別提供了一種簡便的方式來建立和管理執行緒任務。以下是一個典型的任務執行範例:

Tasks 
311
running: int64;
max: int64;
begin
  running := TInterlocked.Increment(FNumRunning);
  max := TInterlocked.Read(FMaxRunning);
  if running > max then
    TInterlocked.CompareExchange(FMaxRunning, running, max);
  TInterlocked.Add(taskResult^, FindPrimes(lowBound, highBound));
  TInterlocked.Decrement(FNumRunning);
end;

內容解密:

  1. 使用TInterlocked.Increment來原子性地增加FNumRunning的值,表示有新的任務正在執行。
  2. 讀取FMaxRunning的當前值,並與新的running值比較,若running較大,則更新FMaxRunning
  3. 呼叫FindPrimes函式計算質數,並將結果加到taskResult指標所指的變數上。
  4. 任務完成後,使用TInterlocked.Decrement減少FNumRunning的值。

此段程式碼展示瞭如何使用原子操作來安全地更新共用變數,確保在多執行緒環境下的正確性。

執行緒池(Thread Pooling)

建立執行緒雖然比建立行程(Process)快,但仍然是一個相對耗時的作業。為瞭解決這個問題,大多數根據任務的平行程式函式庫都採用了執行緒池的概念。

執行緒池的工作原理

  1. 當應用程式首次建立任務時,平行程式函式庫會請求執行緒池來執行該任務。
  2. 執行緒池會建立一個新的執行緒來執行任務。
  3. 任務完成後,執行緒不會被銷毀,而是被儲存在一個非活躍執行緒的清單中。
  4. 當新的任務被啟動時,執行緒池會重複使用先前儲存的執行緒,而不是建立新的執行緒。

控制執行緒池

Delphi中的TThreadPool物件允許開發者控制執行緒池的行為。以下是其公開的介面:

function SetMaxWorkerThreads(Value: Integer): Boolean;
function SetMinWorkerThreads(Value: Integer): Boolean;
property MaxWorkerThreads: Integer read GetMaxWorkerThreads;
property MinWorkerThreads: Integer read GetMinWorkerThreads;
  • SetMinWorkerThreads用於設定最小工作執行緒數,但實際上這只是一個提示,因為實際的執行緒數量取決於實際需求。
  • SetMaxWorkerThreads用於限制最大工作執行緒數。如果達到這個限制,新的任務將必須等待其他任務完成。

非同步/等待(Async/Await)模式

非同步/等待模式提供了一種更簡潔的方式來處理非同步操作,使得程式碼更容易閱讀和維護。

Async/Await的使用範例

procedure TfrmAsyncAwait.btnLongTaskAsyncClick(Sender: TObject);
begin
  Log(Format('Button click in thread %d', [TThread.Current.ThreadID]));
  (Sender as TButton).Enabled := false;
  Async(LongTask)
    .Await(
      procedure
      begin
        Log(Format('Await in thread %d', [TThread.Current.ThreadID]));
        (Sender as TButton).Enabled := true;
      end
    );
end;

內容解密:

  1. 當使用者點選按鈕時,該按鈕被停用,以防止重複點選。
  2. Async(LongTask)啟動了一個非同步任務,執行LongTask方法。
  3. .Await方法指定了當LongTask完成後要執行的程式碼,這段程式碼會在主執行緒中執行,因此可以用來更新UI。

此範例展示瞭如何使用Async/Await模式來保持UI的回應性,同時執行耗時的操作。

Async/Await 模式的實作與應用

在 Delphi 的 Parallel Programming Library (PPL) 中,Async/Await 是一種簡化非同步程式設計的模式。它允許開發者在背景執行耗時的操作,並在操作完成後於主執行緒中執行回呼函式。

IAsync 介面與 Async 函式

IAsync 介面定義了 Await 方法,用於指定非同步操作完成後的回呼動作。Async 函式用於啟動非同步操作,並傳回一個實作 IAsync 介面的物件。

type
  IAsync = interface
    ['{190C1975-FFCF-47AD-B075-79BC8F4157DA}']
    procedure Await(const awaitProc: TProc);
  end;

function Async(const asyncProc: TProc): IAsync;

內容解密:

  • IAsync 介面提供了 Await 方法,讓開發者可以指定非同步操作完成後的處理邏輯。
  • Async 函式接收一個 TProc 型別的引數 asyncProc,並建立一個 TAsync 物件傳回其 IAsync 介面。

TAsync 類別的實作

TAsync 類別是 Async/Await 模式的核心實作。它負責儲存非同步操作的處理邏輯,並在背景執行該操作。

constructor TAsync.Create(const asyncProc: TProc);
begin
  inherited Create;
  FAsyncProc := asyncProc;
end;

procedure TAsync.Await(const awaitProc: TProc);
begin
  FSelf := Self;
  FAwaitProc := awaitProc;
  TTask.Run(Run);
end;

procedure TAsync.Run;
begin
  FAsyncProc();
  TThread.Queue(nil,
    procedure
    begin
      FAwaitProc();
      FSelf := nil;
    end
  );
end;

內容解密:

  1. TAsync.Create 建構函式儲存了傳入的非同步處理程式 asyncProc
  2. Await 方法儲存了完成後的回呼程式 awaitProc,並啟動了一個背景任務來執行 Run 方法。
  3. Run 方法中,首先執行了儲存的非同步處理程式 FAsyncProc
  4. 非同步處理完成後,透過 TThread.Queue 將回呼程式 FAwaitProc 安排到主執行緒中執行。
  5. 為了防止 IAsync 介面被過早釋放,Await 方法中將 Self 指定給 FSelf 以增加參照計數,待回呼程式執行完成後再將 FSelf 置為 nil 以釋放介面。

Join 模式的實作與應用

Join 模式用於平行啟動多個任務,並等待它們全部完成。PPL 中的 TParallel.Join 方法提供了此功能。

TParallel.Join 方法

TParallel.Join([Task1, Task2, Task3]);

或者對於兩個任務的情況,可以直接傳入兩個 TProc 引數:

TParallel.Join(Task1, Task2).Wait;

內容解密:

  • TParallel.Join 方法接受一個包含多個任務處理程式的陣列,平行啟動這些任務。
  • 傳回的 ITask 介面代表了一個組合任務,該任務在所有子任務完成後才會完成。
  • 可以對傳回的 ITask 介面呼叫 Wait 方法來等待所有任務完成。

示例程式碼

procedure TfrmParallelJoin.btnJoin2Click(Sender: TObject);
begin
  ListBox1.Items.Add('Starting tasks');
  TParallel.Join(Task1, Task2);
end;

procedure TfrmParallelJoin.Task1;
begin
  QueueLog('Task1 started in thread ' + TThread.Current.ThreadID.ToString);
  Sleep(1000);
  QueueLog('Task1 stopped in thread ' + TThread.Current.ThreadID.ToString);
end;

procedure TfrmParallelJoin.Task2;
begin
  QueueLog('Task2 started in thread ' + TThread.Current.ThreadID.ToString);
  Sleep(1000);
  QueueLog('Task2 stopped in thread ' + TThread.Current.ThreadID.ToString);
end;

圖表翻譯:

此範例展示了使用 TParallel.Join 同時啟動兩個任務。每個任務記錄其開始和結束的時間,以及執行緒 ID。透過此範例,可以觀察到任務確實是平行執行的。

自定義 Join/Await 模式

為了彌補 PPL 中 Join 模式無法在任務完成後通知主執行緒的不足,可以實作一個支援回呼功能的自定義 Join/Await 模式。

IJoin 介面與自定義 Join 方法

type
  IJoin = interface
    procedure Await(const awaitProc: TProc);
  end;

function Join(const tasks: array of TProc): IJoin;

內容解密:

  • 自定義的 Join 方法接受一個包含多個任務處理程式的陣列,並傳回一個實作 IJoin 介面的物件。
  • IJoin 介面提供了 Await 方法,用於指定所有任務完成後的回呼動作。

深入理解Join/Await模式及其實作細節

在平行程式設計中,Join/Await模式是一種重要的設計模式,用於處理多個非同步任務的協調與例外處理。本文將詳細探討該模式的實作原理及其在Delphi中的應用。

Join/Await模式的核心概念

Join/Await模式主要用於啟動多個平行任務,並在所有任務完成後執行特定的回呼函式。其核心介面IJoin定義了兩個過載的Await方法,一個用於簡單等待所有任務完成,另一個則用於捕捉任務中可能丟擲的異常。

type
  IJoin = interface
    ['{ED4B4531-B233-4A02-A09A-13EE488FCCA3}']
    procedure Await(const awaitProc: TProc); overload;
    procedure Await(const awaitProc: TProc<Exception>); overload;
  end;

實作細節:TJoin類別

TJoin類別是Join/Await模式的具體實作者。其建構子負責複製傳入的非同步任務陣列。Await方法的實作則啟動所有任務,並在任務完成後執行指定的回呼函式。

procedure TJoin.Await(const awaitProc: TProc<Exception>);
var
  i: integer;
begin
  FAwaitProc := awaitProc;
  FNumRunning := Length(FAsyncProc);
  FSelf := Self;
  for i := Low(FAsyncProc) to High(FAsyncProc) do
    TTask.Run(GetAsyncProc(i));
end;

執行非同步任務:Run方法

每個非同步任務由Run方法啟動。它負責執行指定的任務程式碼,並在任務完成後更新計數器。當所有任務完成時,計數器歸零,此時會佇列一個匿名方法來呼叫Await處理常式。

procedure TJoin.Run(const asyncProc: TProc);
begin
  try
    asyncProc();
  except
    on E: Exception do
      AppendException(AcquireExceptionObject as Exception);
  end;
  
  if TInterlocked.Decrement(FNumRunning) = 0 then
    TThread.Queue(nil,
      procedure
      begin
        FAwaitProc(CreateAggregateException);
        FSelf := nil;
      end);
end;

例外處理機制

在任務執行過程中,如果發生異常,AppendException方法會將異常物件新增至FExceptions陣列中。由於多個任務可能同時丟擲異常,因此需要使用鎖定機制來同步對該陣列的存取。

procedure TJoin.AppendException(E: Exception);
begin
  TMonitor.Enter(Self);
  try
    SetLength(FExceptions, Length(FExceptions) + 1);
    FExceptions[High(FExceptions)] := E;
  finally
    TMonitor.Exit(Self);
  end;
end;

使用範例

ParallelJoin示範程式中,按下"DHP Join Exception"按鈕會啟動三個任務,其中兩個任務會丟擲異常。所有任務完成後,TasksStopped回呼函式會被呼叫,並處理可能存在的異常。

procedure TfrmParallelJoin.btnDHPJoinExcClick(Sender: TObject);
begin
  ListBox1.Items.Add('Starting tasks');
  Join([Task1, Task2E, Task3E]).Await(TasksStopped);
end;

procedure TfrmParallelJoin.TasksStopped(E: Exception);
var
  i: Integer;
begin
  ListBox1.Items.Add('Tasks stopped');
  if Assigned(E) then
    for i := 0 to EAggregateException(E).Count - 1 do
      ListBox1.Items.Add('Task raised exception: ' + EAggregateException(E)[i].Message);
end;

Future模式簡介

除了Join/Await模式外,Delphi的平行程式函式庫還提供了Future模式,用於表示一個非同步操作並傳回結果。IFuture<T>介面繼承自ITask,支援查詢狀態、取消任務等操作。

function CountPrimes: integer;
var
  i: Integer;
begin
  // ... 省略實作細節
end;

procedure TfrmParallelFuture.btnCreateFutureClick(Sender: TObject);
begin
  FFuture := TTask.Future<integer>(CountPrimes);
end;

procedure TfrmParallelFuture.btnGetValueClick(Sender: TObject);
begin
  ListBox1.Items.Add('Result: ' + FFuture.Value.ToString);
end;