Ray 提供了 Workflow 和 Datasets 兩大工具,協助開發者建構高效能的資料處理與機器學習流程。Workflow 允許開發者定義複雜的工作流程,並透過事件監聽器實作更精細的控制,而自訂事件監聽器則提供了更高的彈性。此外,Ray 也提供豐富的後設資料,方便開發者監控和分析工作流程的執行情況。在資料處理方面,Ray Datasets 提供了統一的資料格式,支援與 Spark、Modin、Dask 和 Mars 等工具的整合,簡化了跨平台資料分享的流程。同時,Ray Datasets 也支援多種檔案格式和檔案系統,方便開發者讀取和寫入資料。利用 Apache Arrow 的列式儲存和跨程式資料交換特性,Ray 能夠高效地載入和處理大規模資料。

Ray Workflow 實務應用與高階資料管理

在現代軟體開發中,工作流程(Workflow)管理是一個關鍵環節,特別是在處理複雜的資料處理、機器學習訓練及長期業務流程時。Ray Workflow 提供了一個強大的框架,讓開發者能夠建立動態的Pipeline,並提供豐富的工作流程管理功能。玄貓將探討如何利用 Ray Workflow 來實作這些功能,並結合實務經驗來分析其優勢與應用。

工作流程事件監聽器

在 Ray Workflow 中,事件監聽器(Event Listener)是一個重要的概念,它允許開發者定義特定的事件觸發條件,從而實作更靈活的工作流程控制。以下是一個簡單的例子,展示如何使用事件監聽器來控制工作流程的執行:

from ray import workflow
import time

@workflow.step
def event1_step():
    # 模擬事件1的處理
    return "Event 1 completed"

@workflow.step
def event2_step():
    # 模擬事件2的處理
    return "Event 2 completed"

# 定義一個計時器監聽器
timer_listener = workflow.event_listener.TimerListener(time.time() + 30)

# 定義一個收集步驟,當事件1和事件2完成後執行
@workflow.step
def gather(*args):
    return args

# 將事件1和事件2的步驟繫結到收集步驟上,並設定計時器監聽器
gather.step(event1_step, event2_step).run(timer_listener)

內容解密:

這段程式碼展示瞭如何使用 Ray Workflow 的事件監聽器來控制工作流程的執行。首先,我們定義了兩個步驟 event1_stepevent2_step,分別模擬了兩個事件的處理過程。接著,我們建立了一個計時器監聽器 TimerListener,設定了30秒後觸發的條件。

gather 函式中,我們使用 @workflow.step 裝飾器來標記這是一個工作流程步驟。這個函式接受多個引數,並傳回這些引數。最後,我們將 event1_stepevent2_step 的步驟繫結到 gather 上,並使用 .run() 方法啟動工作流程。

自訂事件監聽器

除了內建的事件監聽器外,Ray Workflow 還支援自訂事件監聽器。以下是一個範例,展示如何透過繼承 EventListener 介面來建立自訂事件監聽器:

from ray import workflow

class CustomEventListener:
    def __init__(self):
        """可選建構函式。僅僅只有無引數建構函式會被呼叫"""
        pass

    async def poll_for_event(self, *args, **kwargs) -> Event:
        """當接收到事件時應該傳回"""
        raise NotImplementedError

    async def event_checkpointed(self, event: Event) -> None:
        """可選。當事件已被檢查點且交易可以安全提交時呼叫"""
        pass

內容解密:

這段程式碼展示瞭如何建立一個自訂的事件監聽器。首先,我們繼承 EventListener 介面並定義了一個新類別 CustomEventListener。在這個類別中,我們實作了兩個方法:

  1. __init__:這是類別的建構函式,可以用來初始化一些狀態或資源。
  2. poll_for_event:這個方法應該在接收到事件時傳回。由於每個具體應用場景下的事件處理邏輯不同,因此這裡使用 NotImplementedError 提示開發者需要實作具體邏輯。
  3. event_checkpointed:這是一個可選方法,當事件已被檢查點且交易可以安全提交時會被呼叫。

工作流程後設資料

在工作流程執行過程中,觀察性(Observability)是一個重要需求。除了取得工作流程執行結果外,我們還需要了解內部狀態、執行路徑、效能以及變數值等資訊。Ray 提供了豐富的後設資料支援來滿足這些需求。

工作流程後設資料

Ray 的工作流程後設資料分為兩種級別:工作流程級別和步驟級別。

  • 工作流程級別後設資料:包含工作流程狀態(如 RUNNING、FAILED、RESUMABLE、CANCELED 或 SUCCESSFUL)、自訂後設資料和執行統計資訊(如開始和結束時間)。
  • 步驟級別後設資料:包含步驟名稱、步驟選項、自訂後設資料和執行統計資訊。

以下是如何取得這些後設資料的範例:

# 取得工作流程元資料
workflow_metadata = workflow.get_metadata(workflow_id)

# 取得特定步驟的元資料
step_metadata = workflow.get_metadata(workflow_id, name="step_name")

自訂後設資料

除了標準後設資料外,Ray 還允許新增自訂後設資料來捕捉感興趣的引數。以下是如何新增自訂後設資料的範例:

# 新增工作流程級別自定義元資料
workflow.run(metadata={"custom_key": "custom_value"})

# 新增步驟級別自定義元資料
@workflow.step(metadata={"custom_key": "custom_value"})
def step_function():
    pass

# 或者透過 options 方法新增
step_function.options(metadata={"custom_key": "custom_value"})

虛擬演員後設資料

Ray 還支援從虛擬演員執行中暴露後設資料,並取得工作流程/步驟後設資料以控制執行。

Ray Datasets 與多工具整合

隨著資料生態系統的快速發展,我們經常需要在資料管道中使用多種工具。Ray Datasets 提供了一種分享格式,讓不同工具之間可以無縫協作。Ray Datasets 支援 Spark、Modin、Dask 和 Mars ,並且可以與 TensorFlow 等機器學習工具配合使用。

此圖示說明瞭 Ray Datasets 的基本結構及其與其他工具的整合方式:

  graph TD;
    A[Ray Datasets] --> B[Spark];
    A --> C[Modin];
    A --> D[Dask];
    A --> E[Mars];
    A --> F[TensorFlow];

內容解密:

此圖示展示了 Ray Datasets 的核心概念及其與其他工具之間的關係。Ray Datasets 作為一種分享格式,可以讓不同工具之間無縫協作。例如:Spark、Modin、Dask 和 Mars 等資料處理工具都可以透過 Ray Datasets 來分享資料;TensorFlow 則可以透過 Ray Datasets 與其他工具進行協同操作。

寫在最後

Ray Workflow 和 Ray Datasets 提供了一種靈活且強大的方式來管理複雜的資料處理和機器學習管道。透過結合實務經驗和技術選型分析,「玄貓」深信 Ray 是未來資料處理和機器學習領域中的重要工具之一。

隨著技術不斷進步,「玄貓」預測未來將會有更多高階功能和最佳化措施被新增到 Ray 中,「玄貓」也將持續關注這些變化並分享最新進展給大家。「玄貓」認為,「玄貓」希望大家能夠充分利用這些強大工具來推動技術創新與應用。

雲端運算框架 Ray 的資料集處理

Ray 是一個功能強大的雲端運算框架,能夠高效地處理大規模資料。在本篇文章中,玄貓將探討如何使用 Ray 進行資料集的建立與儲存,並介紹與其他工具的整合方法。

從本地集合建立資料集

在 Ray 中,你可以使用 ray.data.from_items 從本地集合建立資料集。然而,本地集合的資料量有限,因此 Ray 支援多種其他選項來處理大規模資料。

Apache Arrow

Apache Arrow 是一個語言無關的記憶體格式,用於表示平面和階層化資料。其主要組成部分包括:

  • 豐富的資料型別:支援 SQL 和 JSON 型別,如整數、大整數、十進位制、變長字串、對映、結構和陣列。
  • 記憶體中的列式表示:支援複雜的記錄結構,並根據定義的資料型別構建。
  • 資料結構:支援選單、雜湊表和佇列等資料結構。
  • 跨程式資料交換:使用分享記憶體、TCP/IP 和遠端直接記憶體存取(RDMA)。
  • 多語言支援:包括 Java、C++、Python、Ruby、Rust、Go 和 JavaScript。
  • 操作演算法:支援點陣圖選擇、雜湊、過濾、分桶、排序和比對等操作。
  • 記憶體壓縮:透過記憶體中的列式壓縮來提高效率。
  • 記憶體持久化工具:透過非揮發性記憶體、SSD 或 HDD 進行短期持久化。

Ray 使用 Apache Arrow 載入外部資料到資料集中,並支援多種檔案格式和檔案系統。目前支援的格式包括 CSV、JSON、Parquet、NumPy、文字和原始二進位制檔案。這些載入資料的函式遵循 read_[format] 模式,並位於 ray.data 模組中。

載入本地資料

以下是一個載入本地 CSV 資料的範例:

ds = ray.data.read_csv(
    "2021",
    partition_filter=None # 由於檔案不以 .csv 結尾
)

在載入過程中,你可以指定目標平行度,但 Ray 可能會受到正在載入的檔案數量限制。選擇適當的目標平行度是複雜的,需要考慮多種因素。一般來說,平行度應該使資料可以輕鬆適應記憶體,並利用叢集中的所有機器,同時避免過高的開銷。

自訂 Arrow 載入方式

你可以透過 arrow_open_stream_args 引數向 Arrow 傳遞額外引數來自訂載入方式,例如壓縮或緩衝區大小。

支援的檔案系統

Arrow 本身內建快速支援 S3、HDFS 和普通檔案系統。Ray 會根據路徑自動選擇正確的內建檔案系統驅動程式。當從本地檔案系統載入時,你需要確保在分散模式下所有工作節點都能存取該檔案。

此外,Arrow 還使用 fsspec 支援更廣泛的檔案系統,包括 HTTPS(當 aiohttp 安裝時)。與內建檔案系統不同的是,你需要手動指設定檔案系統。

載入 HTTPS 資料

以下是一個載入 HTTPS 資料的範例:

fs = fsspec.filesystem('https')
ds = ray.data.read_csv(
    "https://https://gender-pay-gap.service.gov.uk/viewing/download-data/2021",
    filesystem=fs,
    partition_filter=None # 由於檔案不以 .csv 結尾
)

目前這裡存在一個問題:協定會被錯誤地移除一次,因此你需要把它重複兩次。

資料集寫入

Ray 能夠寫入所有它能讀取的格式。寫入函式遵循 write_[format] 模式。與讀取路徑不同的是,寫入路徑始終以輸入資料集的平行度進行寫入。

輸出 CSV 資料

以下是一個將資料集寫入 S3 的範例:

word_count.write_csv("s3://ray-demo/wc")

如果 Ray 不支援你想要的格式或檔案系統 I/O 操作,你應該檢查是否有其他 Ray 支援的工具來完成此操作。然後,如下一節所述,你可以將資料集轉換為所需工具。

使用 Ray 資料集與不同工具

Ray 提供內建工具來在各種執行於 Ray 的資料工具之間分享資料。這些工具各自有其內部表示方式,但 Ray 能夠根據需要轉換資料。在第一次使用資料集與 Spark 或 Dask 之前,你需要執行一些設定程式碼以將它們委派給 Ray 排程式。

啟用 Dask 在 Ray 上

以下是啟用 Dask 在 Ray 上的一個範例:

from ray.util.dask import enable_dask_on_ray, disable_dask_on_ray
enable_dask_on_ray() # 將所有 Dask 呼叫路由到 Ray 排程式

啟用 Spark 在 Ray 上

以下是啟用 Spark 在 Ray 上的一個範例:

import raydp
spark = raydp.init_spark(
    app_name = "sleepy",
    num_executors = 2,
    executor_cores = 1,
    executor_memory = "2GB"
)

與讀取和載入資料集的函式類別似的是,「轉移到-Ray」函式定義在 ray.data 模組中並遵循 from_[x] 模式,「x」是工具名稱。同樣地,我們使用定義在資料集上的 to_[x] 函式將資料集轉換為工具,「x」是工具名稱。以下範例展示瞭如何使用這種模式將 Ray 資料集轉換為 Dask DataFrame:

轉換為 Dask DataFrame

dask_df = ds.to_dask()

請注意,資料集不使用 Ray 的執行環境來管理依賴項;因此,你必須確保所需工具已安裝在工作器映像中;參見附錄 B 。這對於 Spark 來說更加複雜,因為它需要 Java 虛擬機器(JVM)和其他非 Python 元件。

概括

Ray 的強大之處在於其靈活性和對多種工具及格式的支援。透過深入理解 Apache Arrow 和其他相關技術,玄貓希望能夠幫助你更好地利用 Ray 構建高效且可擴充套件的資料處理管道。

雲端運算技術趨勢與預測

隨著雲端運算技術的不斷演進,「雲原生」已成為許多企業追求的目標。「雲原生」不僅僅是技術上的轉變,更是業務模式和營運方式上的革新。未來我們可能會看到更多根據 Kubernetes 的容器協調技術融入雲端運算框架中。

此外,「Serverless」(無伺服器)架構也逐漸普及,「Serverless」最大的優點是無需管理伺服器基礎設施,「Serverless」架構可以極大地降低營運成本和提升開發效率。

最後,「AI 無處不在」已經成為現實,「AI」將深刻改變我們生活方式以及工作方式。「AI」與「雲端運算」技術相互促進共同發展並形成「正向反饋迴路」。

小段落標題1

繼續觀察「Serverless」架構發展趨勢對「AI 無處不在」理念實作具有什麼樣影響?

小段落標題2

隨著「雲原生」轉型推動企業業務革新及開源社群推動「AI 無處不在」,未來還有什麼樣潛在挑戰呢?