Delphi 的多執行緒程式設計中,有效管理執行緒間的資料交換和同步至關重要。阻塞集合提供了一種安全可靠的機制,允許不同執行緒以非同步方式進行資料存取,避免競爭條件和資料不一致等問題。OmniThreadLibrary (OTL) 則進一步簡化了多執行緒程式的開發,提供多種平行模式,例如 Join、Future、平行任務和背景工作者等,讓開發者能更輕鬆地利用多核心處理器的優勢,提升程式效能。理解這些模式的運作原理和使用方法,對於開發高效能、高回應性的 Delphi 應用程式至關重要,尤其在處理 I/O 密集型或 CPU 密集型任務時,更能展現其優勢。

阻塞集合(Blocking Collections)詳解

在多執行緒程式設計中,阻塞集合是一種非常重要且實用的資料結構,它允許執行緒安全地分享資料,並且能夠有效地控制資料的生產和消費速率。本文將探討阻塞集合的概念、使用方法以及在實際應用中的範例。

阻塞集合的基本概念

阻塞集合是一種特殊的集合,它支援多個執行緒同時進行讀寫操作。當集合為空時,讀取操作將被阻塞,直到有新的資料被寫入。同樣地,當集合已滿時,寫入操作也將被阻塞,直到有空間可用。這種機制有效地避免了執行緒之間的競爭條件和資料不一致的問題。

使用阻塞集合的範例

以下是一個使用阻塞集合的簡單範例,其中兩個執行緒分享一個阻塞集合。第一個執行緒產生資料並寫入阻塞集合,第二個執行緒則從阻塞集合中讀取資料並進行處理。

procedure TfrmBlockingCollection.btnStartClick(Sender: TObject);
begin
  FQueue := TOmniBlockingCollection.Create;
  TThread.CreateAnonymousThread(CreateProducer(17, FQueue)).Start;
  TThread.CreateAnonymousThread(CreateProducer(42, FQueue)).Start;
  TThread.CreateAnonymousThread(CreateConsumer(FQueue)).Start;
  btnStart.Enabled := false;
  btnStop.Enabled := true;
end;

在這個範例中,我們建立了兩個生產者執行緒和一個消費者執行緒。生產者執行緒使用 CreateProducer 方法產生資料並寫入阻塞集合,而消費者執行緒則使用 CreateConsumer 方法從阻塞集合中讀取資料。

function TfrmBlockingCollection.CreateProducer(interval: integer; const queue: IOmniBlockingCollection): TProc;
begin
  Result := procedure
    var
      num: integer;
    begin
      num := interval;
      while queue.TryAdd(num) do begin
        Inc(num, interval);
        Sleep(250);
      end;
      QueueLog('END/' + IntToStr(interval));
    end;
end;

function TfrmBlockingCollection.CreateConsumer(const queue: IOmniBlockingCollection): TProc;
begin
  Result := procedure
    var
      value: integer;
    begin
      for value in queue do
        QueueLog(IntToStr(value));
      QueueLog('STOP');
    end;
end;

程式碼解密:

  1. CreateProducer 方法:此方法建立一個匿名執行緒,該執行緒不斷產生資料並寫入阻塞集合,直到阻塞集合被標記為完成狀態。

    • queue.TryAdd(num):嘗試將資料寫入阻塞集合。如果集合未滿,則寫入成功並傳回 True;如果集合已滿或已被標記為完成狀態,則傳回 False
    • Sleep(250):使執行緒暫停 250 毫秒,以控制資料生產的速率。
  2. CreateConsumer 方法:此方法建立一個匿名執行緒,該執行緒從阻塞集合中讀取資料並進行處理,直到阻塞集合為空且被標記為完成狀態。

    • for value in queue do:使用 for..in 迴圈從阻塞集合中讀取資料。如果集合為空,迴圈將被阻塞,直到有新的資料被寫入。

圖表說明

圖表翻譯: 此圖表展示了生產者執行緒、消費者執行緒以及阻塞集合之間的關係。生產者執行緒將資料寫入阻塞集合,而消費者執行緒則從中讀取資料。當停止按鈕被點選時,阻塞集合被標記為完成狀態,從而通知所有執行緒停止操作。

OTL 的 Join 模式與 Future 模式

Join 模式的實作與應用

OTL(OmniThreadLibrary)的 Join 模式與 Delphi 原生的 PPL(Parallel Programming Library)實作方式有顯著不同,主要差異在於任務啟動方式和例外處理機制。OTL 提供了更靈活的任務管理方式,特別是在非同步執行的控制上。

實作範例

以下範例展示如何使用 OTL 的 Parallel.Join 啟動多個平行任務,並等待它們完成:

procedure TfrmParallelJoin.btnJoin2Click(Sender: TObject);
begin
  ListBox1.Items.Add('Starting tasks');
  Parallel.Join([Task1, Task2]).Execute;
  QueueLog('Join finished');
end;

在這個例子中,Task1Task2 將平行執行,而主執行緒會等待它們完成後才繼續執行。

非同步執行的實作

若要啟動非同步任務並在完成時收到通知,可以使用 NoWaitOnStopInvoke 方法:

procedure TfrmParallelJoin.btnJoinNoWaitClick(Sender: TObject);
begin
  ListBox1.Items.Add('Starting tasks');
  Parallel.Join([Task1, Task2, Task3])
    .NoWait
    .OnStopInvoke(
      procedure
      begin
        ListBox1.Items.Add('Tasks stopped');
      end)
    .Execute;
  ListBox1.Items.Add('Tasks started');
end;

Fluent 程式設計介面

OTL 的 OtlParallel 單元採用 Fluent 介面設計,大多數方法都傳回 Self,允許方法鏈式呼叫,如 .NoWait.OnStopInvoke(...).Execute

例外處理

OTL 的 Join 模式會將任務中未處理的例外收集到 EJoinException 物件中,並在主執行緒中引發。以下範例展示如何在同步和非同步情況下處理這些例外:

同步執行的例外處理

procedure TfrmParallelJoin.btnJoin3EClick(Sender: TObject);
var
  i: integer;
begin
  ListBox1.Items.Add('Starting tasks');
  try
    Parallel.Join([Task1, Task2, Task3E]).Execute;
  except
    on E: EJoinException do
    begin
      for i := 0 to EJoinException(E).Count - 1 do
        QueueLog('Task raised exception: ' + EJoinException(E)[i].FatalException.Message);
      ReleaseExceptionObject;
    end;
  end;
  QueueLog('Join finished');
end;

非同步執行的例外處理

procedure TfrmParallelJoin.btnJoinNoWaitEClick(Sender: TObject);
begin
  ListBox1.Items.Add('Starting tasks');
  FJoin := Parallel.Join([Task1, Task2E, Task3E])
    .NoWait
    .OnStopInvoke(TasksStopped)
    .Execute;
end;

procedure TfrmParallelJoin.TasksStopped;
var
  i: Integer;
begin
  QueueLog('Tasks stopped');
  try
    try
      FJoin.WaitFor(0);
    except
      on E: EJoinException do
      begin
        for i := 0 to EJoinException(E).Count - 1 do
          QueueLog('Task raised exception: ' + EJoinException(E)[i].FatalException.Message);
        ReleaseExceptionObject;
      end;
    end;
  finally
    FJoin := nil;
  end;
end;

Future 模式的實作與應用

OTL 的 Future 模式與 PPL 版本類別似,但提供了更強大的通知機制。以下是一個基本的使用範例:

procedure TfrmFuture.btnFutureClick(Sender: TObject);
begin
  FFuture := Parallel.Future<integer>(CountPrimes);
end;

主要差異

  • OTL 使用 Parallel.Future 取代 PPL 的 TParallel.Future
  • OTL 提供任務組態區塊來通知主執行緒計算完成,而 PPL 需要在背景計算程式碼中使用 TThread.Queue

OTL 平行模式探討

在前一章節中,我們已經探討了幾個基本的平行模式,本章節將進一步深入研究 OTL(OmniThreadLibrary)所提供的更多平行模式。這些模式能夠簡化多執行緒程式的開發,提高程式的效能和回應性。

使用任務組態增強平行模式功能

許多 OTL 平行模式都支援使用 IOmniTaskConfig 介面來進行任務組態。這種方式允許開發者在建立平行任務時,傳遞額外的組態引數,例如取消機制、執行緒池設定、完成事件處理器等。在 ParallelFuture 專案中,就使用了任務組態來指定完成處理器。

程式碼範例:使用任務組態建立 Future

procedure TfrmFuture.btnFuture2Click(Sender: TObject);
begin
  FFuture := Parallel.Future<Integer>(
    function: Integer
    begin
      Result := CountPrimes;
    end,
    Parallel.TaskConfig.OnTerminated(ReportFuture))
end;

procedure TfrmFuture.ReportFuture;
begin
  ListBox1.Items.Add('Result = ' + FFuture.Value.ToString);
  FFuture := nil;
end;

內容解密:

  1. btnFuture2Click 事件處理器中,建立了一個 Future 任務,並傳遞了一個額外的任務組態引數。
  2. Parallel.TaskConfig.OnTerminated(ReportFuture) 設定了當 Future 任務完成時的處理器 ReportFuture
  3. ReportFuture 方法在主執行緒中被呼叫,用於記錄計算結果並釋放 FFuture 物件。

平行任務模式

Parallel Task 模式是一種概念簡單但功能強大的平行模式。它允許在多個背景執行緒中執行相同的程式碼,從而加速較長的操作。

程式碼範例:使用 Parallel Task 生成隨機資料

procedure TfrmParallelTask.CreateRandomData(
  fileSize: integer; output: TStream);
const
  CBlockSize = 1*1024*1024 {1 MB};
var
  buffer : TOmniValue;
  memStr : TMemoryStream;
  outQueue : IOmniBlockingCollection;
  unwritten: IOmniCounter;
begin
  outQueue := TOmniBlockingCollection.Create;
  unwritten := CreateCounter(fileSize);
  Parallel.ParallelTask.NoWait
    .NumTasks(SpinEdit1.Value)
    .OnStop(Parallel.CompleteQueue(outQueue))
    .Execute(
      procedure
      var
        buffer : TMemoryStream;
        bytesToWrite: integer;
        randomGen : TGpRandom;
      begin
        randomGen := TGpRandom.Create;
        try
          while unwritten.Take(CBlockSize, bytesToWrite) do begin
            buffer := TMemoryStream.Create;
            buffer.Size := bytesToWrite;
            FillBuffer(buffer.Memory, bytesToWrite, randomGen);
            outQueue.Add(buffer);
          end;
        finally FreeAndNil(randomGen); end;
      end);
  for buffer in outQueue do begin
    memStr := buffer.AsObject as TMemoryStream;
    output.CopyFrom(memStr, 0);
    FreeAndNil(memStr);
  end;
end;

內容解密:

  1. CreateRandomData 方法使用 Parallel Task 模式來生成大塊的隨機資料。
  2. 資料被寫入到一個輸出串流中,為了避免執行緒安全問題,每個背景執行緒都建立獨立的 TMemoryStream 物件,並將其加入到一個共用的阻塞集合 outQueue 中。
  3. 主執行緒從 outQueue 中讀取資料,並將其複製到輸出串流中。
  4. 使用 unwritten 計數器來控制每個背景執行緒需要生成的資料量。
  5. OnStop 事件用於在所有背景任務完成後,標記 outQueue 為完成狀態。

圖表說明

圖表翻譯: 此圖示展示了使用 Parallel Task 模式生成隨機資料的流程。首先,建立 Parallel Task 並設定相關引數,接著在背景執行緒中生成隨機資料並加入到共用佇列中,最後由主執行緒讀取佇列中的資料並寫入到輸出串流中。

平行任務模式詳解與背景工作者模式介紹

在多執行緒程式設計中,平行任務模式(Parallel Task Pattern)與背景工作者模式(Background Worker Pattern)是非常實用的設計模式,能夠有效提升程式的執行效率和回應性。本文將探討這兩種模式的原理、實作方法及其在實際應用中的優勢。

平行任務模式(Parallel Task Pattern)

基本原理

平行任務模式是一種利用多執行緒技術,將大規模任務分解為多個子任務,並在多個執行緒中平行執行的設計模式。這種模式能夠充分利用多核心處理器的計算能力,顯著提高任務的完成速度。

實作方法

在 Delphi 中,可以使用 OmniThreadLibrary(OTL)來實作平行任務模式。OTL 提供了一套簡潔高效的 API,能夠輕鬆地建立和管理背景執行緒。

以下是一個簡單的範例,展示瞭如何使用 OTL 建立一個平行任務:

Parallel.Task(
  procedure
  var
    i: Integer;
  begin
    for i := 1 to 10 do
    begin
      // 執行某個任務
      DoSomething(i);
    end;
  end
).NoWait;

內容解密:

  • 上述程式碼建立了一個平行任務,該任務會在背景執行緒中執行一個迴圈。
  • NoWait 方法表示主執行緒不會等待該任務完成,而是立即繼續執行後續程式碼。

實際應用

在實際應用中,平行任務模式可以用於各種需要大量計算或 I/O 操作的場景,例如資料壓縮、加密、影像處理等。

背景工作者模式(Background Worker Pattern)

基本原理

背景工作者模式是一種設計模式,用於在背景執行緒中處理任務,並將處理結果傳回給主執行緒。這種模式能夠保持主執行緒的回應性,避免因為耗時操作而導致介面凍結。

實作方法

OTL 也提供了對背景工作者模式的支援。以下是一個簡單的範例,展示瞭如何使用 OTL 建立一個背景工作者:

FBackgroundWorker :=
  Parallel.BackgroundWorker
    .NumTasks(4)
    .Initialize(InitializeWorkerTask_asy)
    .Finalize(FinalizeWorkerTask_asy)
    .Execute(DownloadWebPage_asy)
    .OnRequestDone(ArticleDownloaded);

內容解密:

  • 上述程式碼建立了一個背景工作者,該工作者會使用 4 個執行緒來處理任務。
  • InitializeFinalize 方法分別用於初始化和清理背景工作者的資源。
  • Execute 方法指定了背景工作者要執行的任務。
  • OnRequestDone 事件處理器用於處理任務完成後的結果。

實際應用

在實際應用中,背景工作者模式可以用於各種需要背景處理的場景,例如網路請求、資料函式庫查詢、檔案處理等。

平行任務與背景工作者的比較

特性平行任務模式背景工作者模式
主要用途將大規模任務分解為多個子任務平行執行在背景執行緒中處理任務並傳回結果
執行方式多個執行緒平行執行背景執行緒處理任務
結果處理通常不需要傳回結果給主執行緒將處理結果傳回給主執行緒
適用場景大量計算或 I/O 操作需要保持主執行緒回應性的場景

此圖示

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Delphi多執行緒程式設計阻塞集合與OTL平行模式

package "安全架構" {
    package "網路安全" {
        component [防火牆] as firewall
        component [WAF] as waf
        component [DDoS 防護] as ddos
    }

    package "身份認證" {
        component [OAuth 2.0] as oauth
        component [JWT Token] as jwt
        component [MFA] as mfa
    }

    package "資料安全" {
        component [加密傳輸 TLS] as tls
        component [資料加密] as encrypt
        component [金鑰管理] as kms
    }

    package "監控審計" {
        component [日誌收集] as log
        component [威脅偵測] as threat
        component [合規審計] as audit
    }
}

firewall --> waf : 過濾流量
waf --> oauth : 驗證身份
oauth --> jwt : 簽發憑證
jwt --> tls : 加密傳輸
tls --> encrypt : 資料保護
log --> threat : 異常分析
threat --> audit : 報告生成

@enduml

圖表翻譯:

此圖示展示了平行任務模式和背景工作者模式之間的關係。兩種模式都利用多執行緒技術來提高程式的執行效率和回應性。平行任務模式主要用於將大規模任務分解為多個子任務平行執行,而背景工作者模式則用於在背景執行緒中處理任務並傳回結果給主執行緒。兩種模式都能夠有效地提升程式的效能和使用者經驗。