Ray和Dask的結合為大規模資料處理提供了強大的解決方案。Ray的分散式計算框架和Dask的資料處理能力相輔相成,能有效提升資料處理效率。利用Ray的to_arrow_refs,可以實作與其他工具(如Apache Spark、Parquet等)的零複製資料交換。資料分割槽是提升效率的關鍵,Ray資料集能有效分割資料,平衡平行計算和排程開銷。Dask on Ray的優勢在於利用分享記憶體儲存資料,尤其在廣播連線等操作中效率更高。需要注意的是,Dask的懶惰評估特性在除錯時需要特別留意。Dask DataFrame的索引功能與pandas略有不同,預設不支援按位置索引行,需自行計算分割槽大小以實作此功能,但效率較低。

使用Ray資料集與Dask進行分散式資料處理

為何選擇Ray和Dask

在現代資料處理領域,Ray和Dask是兩個強大的工具,能夠幫助我們高效地處理大規模資料。Ray提供了靈活的分散式計算框架,而Dask則擁有強大的分散式資料處理能力。這兩者結合使用,能夠顯著提升資料處理的效率和靈活性。

首先,Ray並不限制於內建的工具。如果你有新的支援Arrow的工具,並且使用Arrow支援的型別,to_arrow_refs可以為你提供一個零複製的Arrow表示法。這樣你就可以將這些Ray Arrow物件傳遞給你的工具,無論是用於模型訓練還是其他目的。

許多工具和語言都可以與Arrow和Ray連線,包括Apache Spark、Dask、Apache Parquet、Modin、pandas、TensorFlow、R、JSON和MATLAB。然而,Dask和Spark都有非DataFrame的集合(如bags、arrays和RDDs),這些集合無法透過這些API進行轉換。

資料分割槽

資料分割槽是控制處理資料所需任務數量的關鍵技術。如果你有數十億行資料,使用單一任務來處理它可能需要很長時間。相反地,為每一行分配一個任務會導致排程任務的時間超過實際工作時間。因此,適當的資料分割槽是高效處理資料的一個基本要求。

使用Ray資料集可以讓你有效地將資料分割成分割槽或塊,從而平行化計算同時保持排程任務的開銷在可接受範圍內。隨著分割槽數量的增加,最大平行性也會增加,但排程和通訊開銷也會隨之增加。

Dask與Ray

Dask是一個平行計算函式庫,能夠擴充套件現有的Python生態系統。它提供高層次的Array、Bag和DataFrame集合,這些集合模仿NumPy、列表和pandas,但可以平行操作不適合記憶體中的大型資料集。

在Ray上使用Dask有一些顯著優勢。首先,Dask on Ray能夠利用Ray每個節點/容器中的分享記憶體儲存資料。這對於進行廣播連線等操作特別重要;在Dask中,相同的資料需要在每個工作程式中儲存一次。然而在Ray中,它只需要在每個節點或容器中儲存一次。

與Ray不同的是,Dask通常是懶惰評估的,這意味著它不會立即評估資料直到被強制執行。這可能會使除錯變得困難,因為錯誤可能出現在根本原因幾行之外。

Dask DataFrame索引

索引是pandas中一個強大的功能,但當移至像Dask這樣的分散式系統時會有些限制。由於Dask預設情況下不追蹤每個分割槽的大小,所以不支援按位置索引行。你可以對列進行位置索引,也可以對列或行進行標籤索引。

例如,我們曾經對舊金山COVID-19資料進行過篩選操作:```python mini_sf_covid_df = sf_covid_df[sf_covid_df[‘vaccination_status’] == ‘All’][[‘specimen_collection_date’, ’new_cases’]]


如果你真的需要按位置索引行,你可以自行實作這一功能,計算每個分割槽的大小並使用它來選擇所需的分割槽子集。但是這種方法非常低效,所以Dask避免直接實作這一功能以便你在做出選擇之前意識到這一點。

### 在Ray資料集上使用工具

以下是一些使用Dask和Ray資料集的一些具體案例:

```python
import ray
from ray.data import from_pandas
import dask.dataframe as dd

# 初始化Ray
ray.init()

# 將pandas DataFrame轉換為Ray Dataset
pdf = pd.DataFrame({'a': range(1000), 'b': range(1000, 2000)})
ray_df = from_pandas(pdf)

# 將Ray Dataset轉換為Dask DataFrame
dask_df = dd.from_pandas(ray_df.to_pandas(), npartitions=4)

內容解密:

  • 初始化Ray:首先我們需要初始化Ray環境。
  • 將pandas DataFrame轉換為Ray Dataset:我們建立了一個pandas DataFrame並將其轉換為Ray Dataset。
  • 將Ray Dataset轉換為Dask DataFrame:最後我們將Ray Dataset轉換為Dask DataFrame以進行進一步處理。

總結來說,結合使用Ray和Dask能夠顯著提升大規模資料處理的效率和靈活性。透過適當地分割槽和利用分享記憶體儲存等技術,我們可以高效地平行化計算同時保持排程任務的開銷在可接受範圍內。

Dask中的資料洗牌與最佳化技術

Dask 是一個強大的平行計算框架,特別適合處理大規模資料。在 Dask 中,資料洗牌(shuffle)是一個昂貴的操作,因為它涉及網路傳輸和資料序列化,這些操作相對於從記憶體中讀取資料來說是較慢的。為了減少洗牌的成本,Dask 提供了多種技術來降低資料洗牌的量。這些技術通常依賴於資料的特定屬性或正在執行的操作。然而,理解洗牌對於效能最佳化是至關重要的,如果你的程式碼運作良好,可以跳過這部分內容。

滾動視窗與 map_overlap

滾動視窗是一種在資料處理中常見的技術,當你在分割槽邊緣需要鄰近分割槽的記錄時,會觸發資料洗牌。Dask DataFrame 提供了一個特殊的 map_overlap 函式,讓你可以指定看前視窗(look-before window)和看後視窗(look-after window),這些視窗可以是整數或時間間隔。最簡單的例子就是滾動平均值,如下所示:

def process_overlapped(df):
    return df.rolling('5D').mean()

rolling_avg = partitioned_df.map_overlap(process_overlapped, pd.Timedelta('5D'), 0)

內容解密:

  • 滾動視窗:滾動視窗是指在時間序列或資料序列中,對於每個時間點或資料點,計算一個固定長度範圍內的統計量。
  • map_overlapmap_overlap 函式允許你在處理每個分割槽時,引入來自鄰近分割槽的一些記錄。這樣可以避免不必要的洗牌操作。
  • 看前視窗與看後視窗:這些視窗定義了從鄰近分割槽引入的記錄範圍。看前視窗指的是從當前分割槽之前的分割槽引入記錄,看後視窗指的是從當前分割槽之後的分割槽引入記錄。
  • 最小分割槽大小:為了正確執行 map_overlap 函式,最小分割槽大小必須大於最大視窗大小。

Dask 的滾動視窗不會跨越多個分割槽。如果你的 DataFrame 分割槽方式使得看後或看前超過鄰居分割槽的長度,結果可能會失敗或不正確。Dask 會對時間間隔的看後進行驗證,但對於看前或整數看後不會進行此類別驗證。

聚合操作

聚合操作是另一種可以減少網路傳輸資料量的特殊情況。聚合操作是將記錄組合起來的一種函式。如果你來自 map/reduce 或 Spark 背景,reduceByKey 是經典的聚合操作。聚合可以根據鍵進行,也可以在整個 DataFrame 上進行。

按鍵聚合

要按鍵進行聚合,你首先需要呼叫 groupby 函式來指定聚合的列(key)。例如:

df.groupby("PostCode")

這樣會將 DataFrame 按郵遞區號進行分組。如果你有多個列需要分組,可以使用:

df.groupby(["PostCode", "SicCodes"])

函式聚合

在 Dask 中使用函式聚合時,許多 Pandas 的聚合函式都可用,但效能與本地 Pandas DataFrame 不同。如果按照分割槽鍵進行聚合,Dask 可以在不需要洗牌的情況下計算聚合結果。

加速聚合

加速聚合的方法包括:

  1. 減少聚合列:最快處理的是沒有任何列需要處理。
  2. 同時進行多個聚合:這樣可以減少相同資料需要洗牌的次數。

例如,同時計算平均值和最大值:

dask.compute(
    raw_grouped[["new_cases"]].max(),
    raw_grouped[["new_cases"]].mean()
)

部分聚合

在分散式系統中如 Dask,如果一個聚合操作可以部分評估然後合併,就可以在洗牌之前潛在地結合一些記錄。並非所有部分聚合都是相同效果。部分聚合中的關鍵點是當與同一鍵值進行比較時,所節省出來的儲存空間相比於原始多值所佔用空間而定。

高效聚合

最高效的聚合操作無需線性增加儲存空間。例如:

  • 總和
  • 計數
  • 第一
  • 最小值
  • 最大值
  • 平均值
  • 標準差

更複雜的一些任務如四分位數和唯一計數也有次線性近似選項。這些近似選項很有用,因為精確答案可能需要線性增長儲存空間。

自訂聚合

有些聚合函式不是次線性增長,但可能不會快速增加儲存空間。例如唯一值計數就屬於這一類別。

自訂加權平均

自訂加權平均需要三個函式:chunkaggfinalize

def process_chunk(chunk):
    def weighted_func(df):
        return (df["EmployerSize"] * df["DiffMeanHourlyPercent"]).sum()
    return (chunk.apply(weighted_func), chunk.sum()["EmployerSize"])

def agg(total, weights):
    return (total.sum(), weights.sum())

def finalize(total, weights):
    return total / weights

weighted_mean = dd.Aggregation(
    name='weighted_mean',
    chunk=process_chunk,
    agg=agg,
    finalize=finalize)

內容解密:

  • chunk:處理每個分組/片段。
  • agg:將 chunk 的結果之間結合。
  • finalize:取 agg 的結果並產生最終值。
aggregated = df_diff_with_emp_size.groupby("PostCode")\
    ["EmployerSize", "DiffMeanHourlyPercent"].agg(weighted_mean)

透過瞭解和應用這些技術和原則,你可以有效地減少 Dask 中洗牌操作帶來的成本,從而提高整體效能。

資料分析中的進階技術:Dask的聚合與分割槽管理

在大規模資料處理中,聚合和分割槽管理是兩個至關重要的環節。Dask 作為一個強大的平行計算函式庫,提供了靈活的聚合和分割槽操作功能,能夠有效地處理大規模資料。本文將探討 Dask 在聚合和分割槽管理方面的技術細節,並結合實際案例進行詳細解說。

Dask 的聚合操作

Dask 提供了多種聚合操作方式,能夠根據不同的需求進行資料聚合。以下是一些關鍵技術點:

精確量位數的計算

精確量位數的計算需要更多的洗牌操作來減少空間開銷。Dask 的自定義聚合介面僅在按鍵操作中公開,但這並不意味著所有的聚合都必須按鍵進行。例如,計算整個 DataFrame 的 COVID-19 疫情資料。

完整 DataFrame 的聚合

Dask 的內建完整 DataFrame 聚合使用了一個名為 apply_contact_apply 的低層介面來進行部分聚合。為了避免學習兩種不同的 API,Dask 使用靜態 groupby 操作來實作這一功能。這樣,只需掌握一個介面即可完成所有的聚合操作。

raw_grouped = sf_covid_df.groupby(lambda x: 0)

部分聚合

在某些情況下,部分聚合可能只部分實作。例如,Dask 的 HyperLogLog 只適用於完整 DataFrame。你可以透過複製塊函式、使用 aggcombine 引數 和 finalizeaggregate 引數 來轉換簡單的聚合。

from dask.dataframe import hyperloglog
approx_unique = dd.Aggregation(
    name='aprox_unique',
    chunk=hyperloglog.compute_hll_array,
    agg=hyperloglog.reduce_state,
    finalize=hyperloglog.estimate_count)
aggregated = df_diff_with_emp_size.groupby("PostCode")["EmployerSize", "DiffMeanHourlyPercent"].agg(weighted_mean)

慢速/無效率的聚合

有些聚合操作可能會導致記憶體不足或效率低下。例如,製作列表或計算精確量位數時,會消耗與記錄數量成比例的儲存空間。在這種情況下,使用 Dask 的聚合類別與 apply API 沒有顯著區別。

df.groupby("PostCode")["EmployerId"].apply(lambda g: list(g))

洗牌與分割槽

在分散式系統中,排序通常需要全量洗牌操作,這是一個昂貴且不可避免的過程。雖然全量洗牌本身較慢,但它可以加速未來根據相同分組鍵的操作。

分割槽管理

分割槽管理是高效資料處理的關鍵。Dask 提供了三種主要方法來控制 DataFrame 的分割槽:

set_index

當更改分割槽到新鍵/索引時使用 set_index。例如,將郵遞區號(PostCode)設為索引:

partitioned_df_as_part_of_set_index = mini_sf_covid_df.set_index('specimen_collection_date', divisions=divisions)

repartition

repartition 用於保持相同鍵/索引但改變切分點。例如,將資料重新分成 10 個區塊:

df.set_index("PostCode", npartitions=10)

shuffle

suffle 不會產生已知的分割槽方案,因此無法被 groupby 等操作利用。

分割槽策略

選擇正確的分割槽策略對於高效處理資料至關重要。以下是一些關鍵考量:

  1. 選擇索引:索引對於大多數按鍵操作都很有用,包括過濾、分組和索引。
  2. 決定分割槽大小:目標是保持每個機器忙碌但又不超過一般推薦的範圍(100 MB 到 1 GB)。
  3. 滾動視窗:確保每個分割槽包含適當範圍的記錄。

分割槽案例

以下是一個使用滾動視窗進行時間序列分析的例子:

divisions = pd.date_range(start="2021-01-01", end=datetime.today(), freq='7D').tolist()
partitioned_df_as_part_of_set_index = mini_sf_covid_df.set_index('specimen_collection_date', divisions=divisions)

滾動視窗與索引設定流程

@startuml
:選擇索引; --> :設定索引;
:B; --> :決定分割槽大小;
:C; --> :重新分割槽;
:D; --> :應用滾動視窗;
@enduml

內容解密:

此圖示展示瞭如何透過設定索引、決定分割槽大小、重新分割槽以及應用滾動視窗來進行高效資料處理。每一步都涉及到具體技術選型和設計考量,以確保資料處理的高效性和準確性。