在機器學習應用中,高效的資料流程設計至關重要。本文將探討如何處理靜態和動態資料,並利用向量嵌入技術提升模型效能。同時也說明如何使用同步或非同步方法更新向量嵌入,確保資料一致性。最後,將介紹如何使用 MongoDB 的變化流程功能,結合 Python 和 LangChain,實作對資料變化的實時監控和嵌入更新,以維持資料的即時性和準確性,並提供一個使用 OpenAI 嵌入 MDN 文章標題和摘要的 Python 範例。
設計高效的資料流程
在人工智慧和機器學習應用中,資料流程對於提供準確、相關且快速的結果至關重要。這一節將探討如何設計高效的資料流程,以處理資料來源、資料處理、LLM提示和模型嵌入等。
處理靜態資料來源
靜態資料可以透過 mongoimport
工具匯入到 MongoDB Atlas 中。這個工具支援 JSON、CSV 和 TSV 格式,適合初始載入或批次更新。透過增加插入工作者的數量,可以提高匯入速度。
mongoimport --uri=<連線字串> --db=mdn --collection=subscribers --mode=merge --file=github-20240719.json --upsertFields=github_id --numInsertionWorkers=4
處理動態資料來源
動態資料可以透過 Apache Kafka 聯結器或 Atlas Stream Processing 等方法匯入到 MongoDB 中。這些方法可以提供連續的資料聚合和-schema 驗證,確保資料的一致性和完整性。
儲存操作資料與向量嵌入
當原始資料被儲存或更新時,其對應的向量嵌入必須被重新整理,以準確反映內容。這可以透過同步或非同步的方式完成。
同步方法
同步方法可以在資料函式庫操作之前獲得更新的向量嵌入,並將其寫入資料函式庫中。這個方法適合於快速、簡單的嵌入模型或本地主機模型。
非同步方法
非同步方法可以在資料函式庫操作之後提示嵌入模型,然後更新向量嵌入。這個方法提供了可擴充套件性和可靠性,但可能會引入延遲。
使用變化流程保持嵌入更新
變化流程可以提供實時的資料變化存取,允許應用程式立即反應。這個方法可以使用 Atlas Triggers、Atlas Stream Processing 或 Kafka 聯結器等工具實作。
使用 Atlas Triggers
Atlas Triggers 可以執行應用程式和資料函式庫邏輯,回應特定事件並呼叫 Atlas Functions。
使用 Atlas Stream Processing
Atlas Stream Processing 可以處理複雜的資料流,並提供連續的聚合和-schema 驗證。
使用 Kafka 聯結器
Kafka 聯結器可以將 Apache Kafka 主題中的資料流到 MongoDB 集合中。
使用 Python 實作變化流程
以下是使用 Python 和 LangChain 實作變化流程的範例,使用 OpenAI 嵌入 MDN 文章的標題和摘要:
import os
from langchain_openai import OpenAIEmbeddings
from pymongo import MongoClient
from pymongo.errors import PyMongoError
# 設定 OpenAI API 金鑰
os.environ["OPENAI_API_KEY"] = "YOUR-OPENAI-API-KEY"
# 建立 MongoDB 客戶端
client = MongoClient()
# 設定資料函式庫和集合
db = client["mdn"]
collection = db["articles"]
# 定義變化流程
def change_stream():
# 建立變化流程
pipeline = [
{"$match": {"operationType": "insert"}},
{"$addFields": {"title_embedding": {"$function": {"name": "get_embedding", "args": ["$title"]}}}},
{"$addFields": {"summary_embedding": {"$function": {"name": "get_embedding", "args": ["$summary"]}}}}
]
# 執行變化流程
for change in collection.watch(pipeline=pipeline):
print(change)
# 執行變化流程
change_stream()
這個範例使用 LangChain 和 OpenAI 嵌入 MDN 文章的標題和摘要,並將其儲存在 MongoDB 中。變化流程可以實時監測資料變化,並更新嵌入以反映最新的內容。
MongoDB Atlas 連線與檔案嵌入
連線設定
首先,我們需要定義MongoDB Atlas的連線字串。這個字串包含了連線到MongoDB Atlas叢集所需的所有資訊,包括主機名稱、埠號、資料函式庫名稱、使用者名稱和密碼等。
MONGODB_ATLAS_CONNSTRING = "mongodb+srv://使用者名稱:密碼@主機名稱/資料函式庫名稱?retryWrites=true&w=majority"
客戶端例項化
接下來,我們需要建立一個MongoClient例項來連線到MongoDB Atlas。這個例項將用於執行對資料函式庫的各種操作,包括查詢、插入、更新和刪除等。
client = MongoClient(MONGODB_ATLAS_CONNSTRING, tls=True, tlsAllowInvalidCertificates=True)
集合選擇
然後,我們需要選擇要操作的集合。在這個例子中,我們選擇了名為"articles"的集合,它儲存在名為"mdn"的資料函式庫中。
coll = client["mdn"]["articles"]
嵌入模型例項化
為了生成檔案嵌入,我們需要例項化一個OpenAIEmbeddings模型。這個模型將用於將檔案轉換為向量嵌入,從而可以進行語義相似度計算等操作。
embedding_model = OpenAIEmbeddings(model="text-embedding-3-large", dimensions=1024, disallowed_special=())
變化處理函式
定義一個函式來處理MongoDB集合中偵測到的變化。這個函式將在集合中有檔案更新時被呼叫。
def handle_changes(change):
# 從變化事件中提取檔案ID
doc_id = change["documentKey"]["_id"]
# 建立一個過濾器來識別集合中的檔案
doc_filter = {"_id": doc_id}
# 結合檔案的標題和摘要成一個單一的文字字串
text = [change["fullDocument"]["title"] + " " + change["fullDocument"]["summary"]]
# 為文字生成嵌入
embeddings = embedding_model.embed_documents(text)
# 建立一個更新檔案來設定'semantic_embedding'欄位與生成的嵌入
update_doc = {"$set": {"semantic_embedding": embeddings[0]}}
# 更新集合中的檔案
coll.update_one(doc_filter, update_doc)
圖表翻譯:
flowchart TD A[變化偵測] --> B[提取檔案ID] B --> C[建立過濾器] C --> D[結合標題和摘要] D --> E[生成嵌入] E --> F[更新檔案] F --> G[更新集合]
內容解密:
上述程式碼展示瞭如何連線到MongoDB Atlas,選擇一個集合,並使用OpenAIEmbeddings模型生成檔案嵌入。當集合中偵測到變化時,函式handle_changes
將被呼叫,以更新檔案的語義嵌入。這個過程涉及到提取檔案ID,建立過濾器,結合標題和摘要,生成嵌入,然後更新集合中的檔案。這樣可以保證MongoDB集合中的檔案始終具有最新的語義嵌入,方便進行語義相似度查詢等高階應用。
MongoDB 變更流處理
MongoDB 的變更流(Change Stream)功能允許我們實時監控集合中的變更,包括插入、更新和刪除操作。以下是如何使用 Python 的 PyMongo 函式庫來處理變更流的範例。
更新檔案嵌入式查詢
首先,我們需要更新檔案中的嵌入式查詢欄位。這可以透過以下程式碼實作:
set_fields = {
"$set": {
"semantic_embedding": embeddings[0]
}
}
# 更新檔案
coll.update_one(doc_filter, set_fields)
print(f"Updated embeddings for document {doc_id}")
監控 MongoDB 集合變更
接下來,我們需要監控 MongoDB 集合的變更。這可以透過使用 watch()
方法來實作:
try:
# 定義一個過濾器來匹配插入和更新操作
stream_filter = [
{
"$match": {
"$or": [
{"operationType": "insert"},
{
"$and": [
{"operationType": "update"},
{
"$or": [
{
"updateDescription.updatedFields.title": {
"$exists": True
}
},
{
"updateDescription.updatedFields.summary": {
"$exists": True
}
},
]
},
]
},
]
}
}
]
# 開啟一個變更流來監控集合變更
with coll.watch(stream_filter, full_document="updateLookup") as stream:
print("Listening for changes...")
for change in stream:
print(f"Change detected: {change}. Processing")
handle_changes(change)
except PyMongoError as e:
print(f"Error: {e}")
在上面的程式碼中,我們定義了一個過濾器 stream_filter
來匹配插入和更新操作。然後,我們使用 watch()
方法來開啟一個變更流,並將過濾器應用於變更流。當變更流偵測到變更時,將呼叫 handle_changes()
函式來處理變更。
處理變更
handle_changes()
函式將被呼叫來處理變更流中的每個變更。這個函式可以根據具體需求來實作,例如更新應用程式中的快取或觸發其他業務邏輯。
內容解密:
上述程式碼展示瞭如何使用 PyMongo 函式庫來監控 MongoDB 集合的變更。透過使用變更流,我們可以實時監控集合中的變更,並根據具體需求來處理變更。這個功能在許多應用程式中非常有用,例如實時更新應用程式中的快取或觸發其他業務邏輯。
圖表翻譯:
graph LR A[MongoDB 集合] -->|變更|> B[變更流] B -->|過濾器|> C[過濾後的變更] C -->|處理|> D[handle_changes()] D -->|更新應用程式|> E[應用程式]
上述 Mermaid 圖表展示了 MongoDB 集合、變更流、過濾器、處理函式和應用程式之間的關係。當 MongoDB 集合發生變更時,將觸發變更流。過濾器將被應用於變更流,以過濾出需要處理的變更。然後,處理函式將被呼叫來處理過濾後的變更。最終,應用程式將被更新以反映變更。
從資料擷取、處理到向量嵌入的完整流程建構來看,本文提供的資料流程設計方案,有效地整合了 MongoDB Atlas、Apache Kafka、Atlas Triggers 等工具,實作了從靜態和動態資料來源到向量資料函式庫的無縫銜接。深入分析同步與非同步嵌入更新機制,更凸顯了方案設計的靈活性,能適應不同規模和延遲需求的應用場景。然而,方案的效能瓶頸仍需關注,例如在高吞吐量情境下,非同步更新機制可能引入的延遲累積,以及變化流程的資源消耗。展望未來,隨著向量搜尋技術的持續發展,預期將出現更輕量級、更高效的嵌入模型和資料函式庫索引技術,進一步提升資料流程的效率。對於追求極致效能的應用,建議持續關注向量資料函式庫的效能調校,並探索結合硬體加速方案,以充分釋放向量搜尋的潛力。玄貓認為,掌握資料流程的設計和最佳化,將是未來構建高效能 AI 應用,尤其是根據大語言模型的應用,不可或缺的核心能力。