Ray 作為新興的分散式計算平台,能有效應對大規模資料處理和機器學習訓練等複雜計算任務。其自動調整和多層次排程機制簡化了分散式應用程式的開發和佈署。本文將探討 Ray 的職務 API,它提供了一種便捷的方式將程式碼提交到 Ray 叢集執行,尤其適用於 Kubernetes 等雲原生環境。透過 HTTP 介面提交任務,規避了 gRPC 在某些網路環境下的限制。文章將以 Python 程式碼示例,逐步講解如何使用 Ray 職務 API 提交任務、設定執行環境、監控任務狀態及取得執行日誌。此外,本文也將探討 Kafka 作為分散式日誌系統的核心概念、架構設計以及 API 使用。從生產者和消費者角度出發,結合 Python 程式碼範例,解析如何利用 Kafka 進行資料串流處理。同時,文章也將涵蓋 Kafka 的安裝組態、主題與分割槽、代理與複制、消費者群組等關鍵機制,並探討序列化和反序列化的最佳實踐,最後展望 Kafka 在未來發展趨勢。
分散式計算平台 Ray 的實際應用場景
透過以上詳細描述與具體範例分析可知,Ray 作為一種高效、靈活且強大的分散式計算平台,適合處理各種複雜且大規模的計算需求。透過自動調整、多層次排程機制及靈活多樣化之依賴管理手段等技術手法能使其廣泛應用於機器學習、資料科學、人工智慧等領域;透過預先設定Conda虛擬環境之手段更能顯著提升啟動及擴充套件速度以滿足各類別高效能計算需求;而其職API則能有效解決函式庫不比對及網路不穩定問題使其在多元化實際應用場景中仍能表現卓越!
Ray職務API:為Ray叢集提交工作
在使用Ray進行分散式計算時,通常會面臨需要將程式碼提交到Ray叢集上執行的需求。雖然可以透過Ray Client附加到現有的Ray叢集,但這種方法在某些情況下並不適用,特別是當Ray佈署在Kubernetes叢集上時。因為Ray節點的gRPC介面使用了不安全的gRPC,這並不被大多數Kubernetes Ingress實作所支援。為瞭解決這個問題,Ray引入了一個新的Ray職務SDK,使用HTTP替代gRPC。
Ray職務API的核心功能
Ray職務API提供了以下核心功能:
- 提交一個新的工作到叢集,並傳回一個工作ID
- 根據執行ID取得工作的狀態,傳回提交工作的狀態
- 根據執行ID取得工作的執行日誌
職務請求結構
一個職務請求通常包含以下元素:
- 一個目錄,內含定義應用程式的檔案和組態集合
- 一個執行入口點
- 一個執行環境,包含所需的檔案、Python函式庫和環境變數
Ray職務API的實作範例
以下是如何使用Ray職務API將程式碼提交到Ray叢集進行執行的範例。
import argparse
import os
import ray
import requests
import qiskit
class ParseKwargs(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, dict())
for value in values:
key, value = value.split('=')
getattr(namespace, self.dest)[key] = value
parser = argparse.ArgumentParser()
parser.add_argument('-k', '--kwargs', nargs='*', action=ParseKwargs)
args = parser.parse_args()
numberOfIterations = int(args.kwargs["iterations"])
print(f"Requested number of iterations is: {numberOfIterations}")
print(f'Environment variable MY_VARIABLE has a value of {os.getenv("MY_VARIABLE")}')
ray.init()
@ray.remote
class Counter:
def __init__(self):
self.counter = 0
def inc(self):
self.counter += 1
def get_counter(self):
return self.counter
counter = Counter.remote()
for _ in range(numberOfIterations):
ray.get(counter.inc.remote())
print(ray.get(counter.get_counter.remote()))
print("Requests", requests.__version__)
print("Qiskit", qiskit.__version__)
內容解密:
以上程式碼展示瞭如何使用Ray職務API提交一個工作到Ray叢集進行執行。具體步驟如下:
- 解析命令列引數:透過
argparse模組解析命令列引數,並將其儲存在args.kwargs中。 - 初始化Ray:使用
ray.init()初始化Ray。 - 定義遠端類別:定義了一個名為
Counter的遠端類別,該類別包含一個計數器,可以透過inc方法增加計數器值,透過get_counter方法取得計數器值。 - 遠端例項化:使用
Counter.remote()遠端例項化Counter類別。 - 迴圈增加計數器:在一個迴圈中多次呼叫
counter.inc.remote()方法增加計數器值。 - 列印結果:列印最終的計數器值以及安裝的函式庫版本。
提交工作到Ray叢集
透過以下步驟將工作提交到Ray叢集:
from ray.job_submission import JobSubmissionClient
from ray.job_submission import JobStatus
client = JobSubmissionClient("<your Ray URL>")
job_id = client.submit_job(
# 執行入口點shell命令
entrypoint="python script_with_parameters.py --kwargs iterations=7",
# 工作目錄
runtime_env={
"working_dir": ".",
"pip": ["requests==2.26.0", "qiskit==0.34.2"],
"env_vars": {"MY_VARIABLE": "foo"}
}
)
print(f"Submitted job with ID : {job_id}")
while True:
status = client.get_job_status(job_id)
print(f"status: {status}")
if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}:
break
time.sleep(5)
logs = client.get_job_logs(job_id)
print(f"logs: {logs}")
內容解密:
以上程式碼展示瞭如何使用JobSubmissionClient將工作提交到Ray叢集進行執行。
- 初始化JobSubmissionClient:建立一個JobSubmissionClient例項,並指定Ray URL。
- 提交工作:使用
submit_job方法提交工作,並指定入口點shell命令和執行環境組態。 - 取得工作狀態:在一個迴圈中不斷檢查工作狀態,直到工作完成或失敗。
- 取得工作日誌:當工作完成或失敗後,取得並列印工作日誌。
Kafka 入門與技術深度分析
Kafka 這個分散式日誌系統已經成為現代資料處理與流式應用的核心技術之一。玄貓將為你深入解析 Kafka 的基本概念、安裝方法、核心架構以及 API 的使用,並探討其在實務中的應用與挑戰。
Kafka 安裝與組態
Kafka 可以在本地或雲端環境中執行。對於本地安裝,你可以參考 Kafka 的「快速開始」指引。如果你使用的是 Mac,可以透過 Homebrew 來簡化安裝流程。此外,雲端提供商如 Confluent 平台也提供了方便的 Kafka 安裝選項。對於 Kubernetes 環境,Strimzi 是一個非常好的選擇。
基本 Kafka 概念
Kafka 並不是傳統的訊息系統,它是一個分散式日誌系統,用來順序儲存記錄。每條記錄都是一個鍵值對,鍵和值都可選,並以位元組陣列形式儲存。生產者總是將資料寫入日誌的結尾,而消費者可以選擇從特定位置(偏移量)開始讀取。
Kafka 與傳統訊息系統的差異
- 日誌持久化:Kafka 的訊息是持久化的,可以重新播放,而傳統訊息系統中的訊息通常是暫時的。
- 消費者管理:在 Kafka 中,消費者需要管理自己的偏移量,這使得 Kafka 能夠支援大量的消費者。
- 主題與分割槽:Kafka 的主題是由多個分割槽組成的邏輯建構,這有助於提高系統的可擴充套件性。
Kafka 架構與設計
Kafka 的架構由多個佈局組成,包括生產者、消費者、主題、分割槽和代理(brokers)。以下是 Kafka 架構的一些關鍵概念:
主題與分割槽
主題是 Kafka 資料組織的基本單位,每個主題由一個或多個分割槽組成。每個分割槽內的資料是順序的且可被複制到多個代理上。
代理與複制
Kafka 以叢集形式佈署,叢集中包含多個代理(brokers)。每個分割槽可以在一個或多個代理上複制,這提高了系統的容錯性和吞吐量。
消費者群組
消費者群組是一組從相同主題讀取資料的消費者。Kafka 會將每個群組中的消費者分配到不同的分割槽上進行讀取,從而實作負載平衡。
Kafka API 與實務應用
Kafka 提供了五大核心 API:生產者 API、消費者 API、管理客戶端 API、流處理 API 和聯結器 API。這些 API 支援多種程式語言,包括 Java、C/C++、Go、C# 和 Python。
生產者 API
生產者 API 用於將資料流寫入 Kafka 主題。以下是使用 Python 生產者 API 的範例:
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
data = {'key': 'value'}
producer.send('my_topic', value=data)
producer.flush()
內容解密:
這段程式碼展示瞭如何使用 Python 的 Kafka 生產者 API 來將資料傳送到指定的主題。「bootstrap_servers」引數指定了 Kafka 叢集的地址,「value_serializer」引數用於將資料序列化為 JSON 格式。「producer.send」方法將資料傳送到指定的主題,「producer.flush」方法確保所有資料都已傳送完成。
消費者 API
消費者 API 用於從 Kafka 主題讀取資料。以下是使用 Python 消費者 API 的範例:
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer('my_topic',
bootstrap_servers='localhost:9092',
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
for message in consumer:
print(message.value)
內容解密:
這段程式碼展示瞭如何使用 Python 的 Kafka 消費者 API 來從指定主題讀取資料。「bootstrap_servers」引數指定了 Kafka 叢集的地址,「value_deserializer」引數用於將 JSON 格式的資料反序列化為 Python 物件。「consumer」物件會持續從指定主題讀取資料並列印預出來。
序列化與反序列化
Kafka 訊息是以位元組陣列形式儲存的,因此需要進行序列化(marshaling)和反序列化(unmarshaling)。常見的序列化格式包括 Apache Avro、Google Protocol Buffers 和 JSON。
JSON 序列化範例:
import json
data = {'key': 'value'}
serialized_data = json.dumps(data).encode('utf-8')
deserialized_data = json.loads(serialized_data.decode('utf-8'))
內容解密:
這段程式碼展示瞭如何使用 JSON 作為序列化格式。「json.dumps」方法將 Python 物件轉換為 JSON 字串並編碼為位元組陣列,「json.loads」方法則將位元組陣列解碼為 JSON 字串並轉換為 Python 物件。
技術選型與未來趨勢
在選擇序列化格式時,需要考慮其效能、訊息大小、擴充套件性以及語言間操作性。Apache Avro 和 Google Protocol Buffers 在效能和壓縮方面表現優異,但 JSON 則更具靈活性和可讀性。
未來,隨著 IoT 和大資料應用的增長,Kafka 作為一種高效且可擴充套件的日誌系統將會更加普及。此外,Kafka 的無伺服器版本也逐漸受到關注,這將進一步降低執行和維護成本。