在 Delphi 開發中,處理大量資料或執行複雜任務時,多執行緒程式設計是提升效率的關鍵。OmniThreadLibrary (OTL) 框架提供了一系列簡化多執行緒開發的模式,其中管線模式、對映模式和定時任務尤其值得關注。本文將以檔案統計分析為例,示範如何應用這些模式。首先,管線模式能將檔案統計分析過程分解為掃描資料夾、讀取檔案、收集統計資料和彙總結果等多個階段,每個階段可獨立運作,提升整體效率。接著,對映模式能簡化對大量檔案的相同操作,例如檢查檔案是否符合特定條件。最後,定時任務能定期執行特定操作,例如定時備份或網路檢查,而 OTL 提供了更彈性的設定,例如動態調整間隔時間。
管道模式在檔案統計分析中的應用
管道模式是一種在平行處理中非常實用的設計模式,尤其是在處理大量資料時,能夠有效地將任務分解為多個階段,每個階段負責特定的處理工作,從而提高整體的處理效率和系統的可擴充套件性。
管道模式的基本概念
在管道模式中,資料透過一系列的階段進行處理,每個階段都是一個獨立的任務或執行緒。這些階段透過輸入和輸出通道進行通訊,前一個階段的輸出作為後一個階段的輸入。這種模式非常適合於處理Pipeline式的任務,如檔案處理、資料分析等。
檔案統計分析管道的實作
以下是一個使用管道模式進行檔案統計分析的例子。這個管道包含四個階段:FolderScanner、FileReader、StatCollector和StatAggregator。
第一階段:FolderScanner
FolderScanner負責掃描指定的資料夾及其子資料夾,找出所有的檔案,並將檔案的路徑傳送到輸出通道。
procedure TfrmPipeline.FolderScanner(
const input, output: IOmniBlockingCollection;
const task: IOmniTask);
var
value: TOmniValue;
stopEnum: boolean;
begin
for value in input do begin
if task.CancellationToken.IsSignalled then
break;
stopEnum := false;
DSiEnumFilesEx(value, '*.txt', faAnyFile,
procedure(const folder, filename: string)
begin
if task.CancellationToken.IsSignalled then
stopEnum := true
else
output.TryAdd(TPath.Combine(folder, filename));
end,
false, false, true);
if stopEnum then
break;
end;
end;
內容解密:
FolderScanner方法接收輸入和輸出通道,以及任務控制物件。它遍歷輸入通道中的值,通常是資料夾路徑。DSiEnumFilesEx函式用於掃描資料夾及其子資料夾,並對每個找到的檔案執行匿名方法。- 在匿名方法中,檢查是否應該因取消訊號而停止列舉。如果沒有取消,則將檔案的完整路徑新增到輸出通道。
TryAdd方法用於新增資料到輸出通道,這是一種非阻塞呼叫,如果管道被取消,它可能會失敗。
第二階段:FileReader
FileReader讀取檔案內容,並將其儲存在TStringList中,然後將其作為一個物件傳送到輸出通道。
procedure TfrmPipeline.FileReader(
const input, output: IOmniBlockingCollection;
const task: IOmniTask);
var
sl: TStringList;
value: TOmniValue;
outValue: TOmniValue;
begin
for value in input do begin
if task.CancellationToken.IsSignalled then
break;
sl := TStringList.Create;
try
sl.LoadFromFile(value);
outValue.AsOwnedObject := sl;
sl := nil;
output.TryAdd(outValue);
task.Comm.Send(MSG_OK, value);
except
task.Comm.Send(MSG_ERROR, value);
FreeAndNil(sl);
end;
end;
end;
內容解密:
FileReader遍歷輸入通道中的檔案路徑,並嘗試載入每個檔案的內容到TStringList。TStringList被封裝在TOmniValue中作為一個擁有的物件,這樣可以確保它在適當的時候被銷毀。- 成功載入後,將
TStringList新增到輸出通道,並向主執行緒傳送一個成功訊息。 - 如果載入失敗,向主執行緒傳送一個錯誤訊息,並釋放
TStringList。
平行模式的進階應用:管線與對映
在平行程式設計中,管線(Pipeline)與對映(Map)是兩種非常實用的模式,能夠有效地簡化平行任務的實作。本章將探討這兩種模式的原理、實作方法及其在實際應用中的優勢。
管線模式:資料處理的最佳實踐
管線模式是一種將任務分解為多個階段,每個階段處理資料並將結果傳遞給下一個階段的平行處理方式。這種模式非常適合處理需要多步驟處理的資料流,例如影象處理、資料分析等。
管線模式的實作
以下是一個使用管線模式處理檔案統計的範例:
procedure TfrmPipeline.btnStartClick(Sender: TObject);
begin
FPipeline := Parallel.Pipeline.NumStages(3)
.Stage(CollectFiles)
.Stage(ProcessFile)
.Stage(Aggregate)
.OnStopInvoke(ShowResult)
.Run;
end;
在上述範例中,管線被分為三個階段:收集檔案、處理檔案和匯總結果。每個階段都是一個獨立的任務,平行執行以提高效率。
#### 內容解密:
Parallel.Pipeline用於建立管線。NumStages(3)指定管線包含三個階段。Stage方法用於定義每個階段的任務。OnStopInvoke(ShowResult)指定管線完成後的回呼函式。
對映模式:平行資料處理的簡化
對映模式是一種對集合中的每個元素應用某個函式,並傳回包含結果的新集合的平行處理模式。這種模式非常適合需要對大量資料進行相同操作的場景。
對映模式的實作
以下是一個使用對映模式檢查陣列中質數的範例:
procedure TfrmParallelMap.btnParallelClick(Sender: TObject);
var
output: TArray<integer>;
sw: TStopwatch;
begin
PrepareTestData;
sw := TStopwatch.StartNew;
output := Parallel.Map<integer, integer>(FTestData,
function(const source: integer; var target: integer): boolean
begin
Result := IsPrime(source);
target := source;
end);
sw.Stop;
LogResult(output, sw.ElapsedMilliseconds);
end;
在上述範例中,Parallel.Map函式對FTestData陣列中的每個元素應用IsPrime函式,並傳回包含質數的新陣列。
#### 內容解密:
Parallel.Map用於對陣列中的每個元素平行應用指定的函式。- 匿名函式檢查輸入元素是否為質數,並將結果儲存在輸出元素中。
- 傳回值表示是否生成了有效的輸出元素。
定時任務:平行執行的簡易排程
定時任務是一種在背景執行緒中定期執行特定任務的機制。這種機制非常適合需要定期執行某些操作的場景,例如網路檢查、定時備份等。
定時任務的實作
以下是一個使用定時任務進行網路連線檢查的範例:
procedure TfrmParallelTimedTask.btnStartClick(Sender: TObject);
begin
FTimer := Parallel.TimedTask(
procedure
begin
// 執行網路檢查
end)
.Interval(10 * 1000) // 每10秒執行一次
.OnStop(procedure
begin
// 清理工作
end)
.Run;
end;
在上述範例中,Parallel.TimedTask用於建立一個定時任務,每10秒執行一次網路檢查操作。
#### 內容解密:
Parallel.TimedTask用於建立定時任務。Interval方法指定任務執行的間隔時間。OnStop方法指定任務停止時的回呼函式。
探討OmniThreadLibrary中的平行模式
簡介
本章節主要探討開源框架OmniThreadLibrary(OTL)中實作的平行程式設計模式。OTL提供了一系列實用的平行模式,能夠簡化多執行緒程式的開發。
Timed Task模式詳解
初始化與設定
在表單的OnCreate事件中,程式碼使用Parallel.TimedTask工廠建立了一個定時任務:
procedure TfrmTimedTask.FormCreate(Sender: TObject);
begin
FTimedTask := Parallel.TimedTask
.Every(inpInterval.Value * 1000)
.Execute(SendRequest_asy);
FTimedTask.Stop;
FTimedTask.TaskConfig(
Parallel.TaskConfig.OnMessage(Self));
btnStop.Enabled := false;
end;
啟動與停止定時任務
透過btnStartClick事件啟動定時任務,並強制立即執行一次:
procedure TfrmTimedTask.btnStartClick(Sender: TObject);
begin
FTimedTask.Active := true;
FTimedTask.ExecuteNow; // 強制立即執行
btnStart.Enabled := false;
btnStop.Enabled := true;
end;
停止定時任務則透過btnStopClick事件實作:
procedure TfrmTimedTask.btnStopClick(Sender: TObject);
begin
FTimedTask.Active := false;
btnStart.Enabled := true;
btnStop.Enabled := false;
end;
動態調整間隔時間
當inpInterval元件的值改變時,定時任務的間隔時間會相應更新:
procedure TfrmTimedTask.inpIntervalChange(Sender: TObject);
begin
FTimedTask.Interval := inpInterval.Value * 1000;
end;
後台任務執行
SendRequest_asy方法在後台執行緒中執行,用於傳送HTTP請求並解析回應:
procedure TfrmTimedTask.SendRequest_asy(const task: IOmniTask);
var
client: THttpClient;
date: string;
jsonObject: TJSonObject;
origin: string;
response: IHTTPResponse;
begin
client := THttpClient.Create;
try
response := client.Get('https://httpbin.org/ip');
date := response.GetHeaderValue('Date');
jsonObject := TJSonObject.Create;
try
try
origin := (jsonObject.ParseJSONValue(response.ContentAsString) as TJSONObject).Get('origin').JsonValue.Value;
except
origin := '';
end;
finally
FreeAndNil(jsonObject);
end;
task.Comm.Send(MSG_NEWDATA, TOmniValue.CreateNamed(['Date', date, 'Origin', origin]));
finally
FreeAndNil(client);
end;
end;
#### 內容解密:
此段程式碼建立了一個THttpClient物件,傳送GET請求到https://httpbin.org/ip,並解析回應內容。解析結果包括伺服器時間和客戶端IP,並透過task.Comm.Send將這些資料傳送到表單。
處理訊息
表單透過MSGNewData方法處理來自後台任務的訊息:
procedure TfrmTimedTask.MSGNewData(var msg: TOmniMessage);
begin
outServerTime.Text := msg.MsgData['Date'];
outYourIP.Text := msg.MsgData['Origin'];
end;
#### 內容解密:
此方法接收來自後台任務的訊息,並將伺服器時間和客戶端IP顯示在表單上。