在 Delphi 開發中,處理大量資料或執行複雜任務時,多執行緒程式設計是提升效率的關鍵。OmniThreadLibrary (OTL) 框架提供了一系列簡化多執行緒開發的模式,其中管線模式、對映模式和定時任務尤其值得關注。本文將以檔案統計分析為例,示範如何應用這些模式。首先,管線模式能將檔案統計分析過程分解為掃描資料夾、讀取檔案、收集統計資料和彙總結果等多個階段,每個階段可獨立運作,提升整體效率。接著,對映模式能簡化對大量檔案的相同操作,例如檢查檔案是否符合特定條件。最後,定時任務能定期執行特定操作,例如定時備份或網路檢查,而 OTL 提供了更彈性的設定,例如動態調整間隔時間。

管道模式在檔案統計分析中的應用

管道模式是一種在平行處理中非常實用的設計模式,尤其是在處理大量資料時,能夠有效地將任務分解為多個階段,每個階段負責特定的處理工作,從而提高整體的處理效率和系統的可擴充套件性。

管道模式的基本概念

在管道模式中,資料透過一系列的階段進行處理,每個階段都是一個獨立的任務或執行緒。這些階段透過輸入和輸出通道進行通訊,前一個階段的輸出作為後一個階段的輸入。這種模式非常適合於處理Pipeline式的任務,如檔案處理、資料分析等。

檔案統計分析管道的實作

以下是一個使用管道模式進行檔案統計分析的例子。這個管道包含四個階段:FolderScannerFileReaderStatCollectorStatAggregator

第一階段:FolderScanner

FolderScanner負責掃描指定的資料夾及其子資料夾,找出所有的檔案,並將檔案的路徑傳送到輸出通道。

procedure TfrmPipeline.FolderScanner(
  const input, output: IOmniBlockingCollection;
  const task: IOmniTask);
var
  value: TOmniValue;
  stopEnum: boolean;
begin
  for value in input do begin
    if task.CancellationToken.IsSignalled then
      break;
    stopEnum := false;
    DSiEnumFilesEx(value, '*.txt', faAnyFile, 
      procedure(const folder, filename: string)
      begin
        if task.CancellationToken.IsSignalled then
          stopEnum := true
        else
          output.TryAdd(TPath.Combine(folder, filename));
      end,
      false, false, true);
    if stopEnum then
      break;
  end;
end;

內容解密:

  1. FolderScanner方法接收輸入和輸出通道,以及任務控制物件。它遍歷輸入通道中的值,通常是資料夾路徑。
  2. DSiEnumFilesEx函式用於掃描資料夾及其子資料夾,並對每個找到的檔案執行匿名方法。
  3. 在匿名方法中,檢查是否應該因取消訊號而停止列舉。如果沒有取消,則將檔案的完整路徑新增到輸出通道。
  4. TryAdd方法用於新增資料到輸出通道,這是一種非阻塞呼叫,如果管道被取消,它可能會失敗。

第二階段:FileReader

FileReader讀取檔案內容,並將其儲存在TStringList中,然後將其作為一個物件傳送到輸出通道。

procedure TfrmPipeline.FileReader(
  const input, output: IOmniBlockingCollection;
  const task: IOmniTask);
var
  sl: TStringList;
  value: TOmniValue;
  outValue: TOmniValue;
begin
  for value in input do begin
    if task.CancellationToken.IsSignalled then
      break;
    sl := TStringList.Create;
    try
      sl.LoadFromFile(value);
      outValue.AsOwnedObject := sl;
      sl := nil;
      output.TryAdd(outValue);
      task.Comm.Send(MSG_OK, value);
    except
      task.Comm.Send(MSG_ERROR, value);
      FreeAndNil(sl);
    end;
  end;
end;

內容解密:

  1. FileReader遍歷輸入通道中的檔案路徑,並嘗試載入每個檔案的內容到TStringList
  2. TStringList被封裝在TOmniValue中作為一個擁有的物件,這樣可以確保它在適當的時候被銷毀。
  3. 成功載入後,將TStringList新增到輸出通道,並向主執行緒傳送一個成功訊息。
  4. 如果載入失敗,向主執行緒傳送一個錯誤訊息,並釋放TStringList

平行模式的進階應用:管線與對映

在平行程式設計中,管線(Pipeline)與對映(Map)是兩種非常實用的模式,能夠有效地簡化平行任務的實作。本章將探討這兩種模式的原理、實作方法及其在實際應用中的優勢。

管線模式:資料處理的最佳實踐

管線模式是一種將任務分解為多個階段,每個階段處理資料並將結果傳遞給下一個階段的平行處理方式。這種模式非常適合處理需要多步驟處理的資料流,例如影象處理、資料分析等。

管線模式的實作

以下是一個使用管線模式處理檔案統計的範例:

procedure TfrmPipeline.btnStartClick(Sender: TObject);
begin
  FPipeline := Parallel.Pipeline.NumStages(3)
    .Stage(CollectFiles)
    .Stage(ProcessFile)
    .Stage(Aggregate)
    .OnStopInvoke(ShowResult)
    .Run;
end;

在上述範例中,管線被分為三個階段:收集檔案、處理檔案和匯總結果。每個階段都是一個獨立的任務,平行執行以提高效率。

#### 內容解密:

  • Parallel.Pipeline用於建立管線。
  • NumStages(3)指定管線包含三個階段。
  • Stage方法用於定義每個階段的任務。
  • OnStopInvoke(ShowResult)指定管線完成後的回呼函式。

對映模式:平行資料處理的簡化

對映模式是一種對集合中的每個元素應用某個函式,並傳回包含結果的新集合的平行處理模式。這種模式非常適合需要對大量資料進行相同操作的場景。

對映模式的實作

以下是一個使用對映模式檢查陣列中質數的範例:

procedure TfrmParallelMap.btnParallelClick(Sender: TObject);
var
  output: TArray<integer>;
  sw: TStopwatch;
begin
  PrepareTestData;
  sw := TStopwatch.StartNew;
  output := Parallel.Map<integer, integer>(FTestData,
    function(const source: integer; var target: integer): boolean
    begin
      Result := IsPrime(source);
      target := source;
    end);
  sw.Stop;
  LogResult(output, sw.ElapsedMilliseconds);
end;

在上述範例中,Parallel.Map函式對FTestData陣列中的每個元素應用IsPrime函式,並傳回包含質數的新陣列。

#### 內容解密:

  • Parallel.Map用於對陣列中的每個元素平行應用指定的函式。
  • 匿名函式檢查輸入元素是否為質數,並將結果儲存在輸出元素中。
  • 傳回值表示是否生成了有效的輸出元素。

定時任務:平行執行的簡易排程

定時任務是一種在背景執行緒中定期執行特定任務的機制。這種機制非常適合需要定期執行某些操作的場景,例如網路檢查、定時備份等。

定時任務的實作

以下是一個使用定時任務進行網路連線檢查的範例:

procedure TfrmParallelTimedTask.btnStartClick(Sender: TObject);
begin
  FTimer := Parallel.TimedTask(
    procedure
    begin
      // 執行網路檢查
    end)
    .Interval(10 * 1000) // 每10秒執行一次
    .OnStop(procedure
      begin
        // 清理工作
      end)
    .Run;
end;

在上述範例中,Parallel.TimedTask用於建立一個定時任務,每10秒執行一次網路檢查操作。

#### 內容解密:

  • Parallel.TimedTask用於建立定時任務。
  • Interval方法指定任務執行的間隔時間。
  • OnStop方法指定任務停止時的回呼函式。

探討OmniThreadLibrary中的平行模式

簡介

本章節主要探討開源框架OmniThreadLibrary(OTL)中實作的平行程式設計模式。OTL提供了一系列實用的平行模式,能夠簡化多執行緒程式的開發。

Timed Task模式詳解

初始化與設定

在表單的OnCreate事件中,程式碼使用Parallel.TimedTask工廠建立了一個定時任務:

procedure TfrmTimedTask.FormCreate(Sender: TObject);
begin
  FTimedTask := Parallel.TimedTask
    .Every(inpInterval.Value * 1000)
    .Execute(SendRequest_asy);
  FTimedTask.Stop;
  FTimedTask.TaskConfig(
    Parallel.TaskConfig.OnMessage(Self));
  btnStop.Enabled := false;
end;

啟動與停止定時任務

透過btnStartClick事件啟動定時任務,並強制立即執行一次:

procedure TfrmTimedTask.btnStartClick(Sender: TObject);
begin
  FTimedTask.Active := true;
  FTimedTask.ExecuteNow; // 強制立即執行
  btnStart.Enabled := false;
  btnStop.Enabled := true;
end;

停止定時任務則透過btnStopClick事件實作:

procedure TfrmTimedTask.btnStopClick(Sender: TObject);
begin
  FTimedTask.Active := false;
  btnStart.Enabled := true;
  btnStop.Enabled := false;
end;

動態調整間隔時間

inpInterval元件的值改變時,定時任務的間隔時間會相應更新:

procedure TfrmTimedTask.inpIntervalChange(Sender: TObject);
begin
  FTimedTask.Interval := inpInterval.Value * 1000;
end;

後台任務執行

SendRequest_asy方法在後台執行緒中執行,用於傳送HTTP請求並解析回應:

procedure TfrmTimedTask.SendRequest_asy(const task: IOmniTask);
var
  client: THttpClient;
  date: string;
  jsonObject: TJSonObject;
  origin: string;
  response: IHTTPResponse;
begin
  client := THttpClient.Create;
  try
    response := client.Get('https://httpbin.org/ip');
    date := response.GetHeaderValue('Date');
    jsonObject := TJSonObject.Create;
    try
      try
        origin := (jsonObject.ParseJSONValue(response.ContentAsString) as TJSONObject).Get('origin').JsonValue.Value;
      except
        origin := '';
      end;
    finally
      FreeAndNil(jsonObject);
    end;
    task.Comm.Send(MSG_NEWDATA, TOmniValue.CreateNamed(['Date', date, 'Origin', origin]));
  finally
    FreeAndNil(client);
  end;
end;

#### 內容解密:

此段程式碼建立了一個THttpClient物件,傳送GET請求到https://httpbin.org/ip,並解析回應內容。解析結果包括伺服器時間和客戶端IP,並透過task.Comm.Send將這些資料傳送到表單。

處理訊息

表單透過MSGNewData方法處理來自後台任務的訊息:

procedure TfrmTimedTask.MSGNewData(var msg: TOmniMessage);
begin
  outServerTime.Text := msg.MsgData['Date'];
  outYourIP.Text := msg.MsgData['Origin'];
end;

#### 內容解密:

此方法接收來自後台任務的訊息,並將伺服器時間和客戶端IP顯示在表單上。