在機器學習應用中,高效的資料流程設計至關重要。本文將探討如何處理靜態和動態資料,並利用向量嵌入技術提升模型效能。同時也說明如何使用同步或非同步方法更新向量嵌入,確保資料一致性。最後,將介紹如何使用 MongoDB 的變化流程功能,結合 Python 和 LangChain,實作對資料變化的實時監控和嵌入更新,以維持資料的即時性和準確性,並提供一個使用 OpenAI 嵌入 MDN 文章標題和摘要的 Python 範例。

設計高效的資料流程

在人工智慧和機器學習應用中,資料流程對於提供準確、相關且快速的結果至關重要。這一節將探討如何設計高效的資料流程,以處理資料來源、資料處理、LLM提示和模型嵌入等。

處理靜態資料來源

靜態資料可以透過 mongoimport 工具匯入到 MongoDB Atlas 中。這個工具支援 JSON、CSV 和 TSV 格式,適合初始載入或批次更新。透過增加插入工作者的數量,可以提高匯入速度。

mongoimport --uri=<連線字串> --db=mdn --collection=subscribers --mode=merge --file=github-20240719.json --upsertFields=github_id --numInsertionWorkers=4

處理動態資料來源

動態資料可以透過 Apache Kafka 聯結器或 Atlas Stream Processing 等方法匯入到 MongoDB 中。這些方法可以提供連續的資料聚合和-schema 驗證,確保資料的一致性和完整性。

儲存操作資料與向量嵌入

當原始資料被儲存或更新時,其對應的向量嵌入必須被重新整理,以準確反映內容。這可以透過同步或非同步的方式完成。

同步方法

同步方法可以在資料函式庫操作之前獲得更新的向量嵌入,並將其寫入資料函式庫中。這個方法適合於快速、簡單的嵌入模型或本地主機模型。

非同步方法

非同步方法可以在資料函式庫操作之後提示嵌入模型,然後更新向量嵌入。這個方法提供了可擴充套件性和可靠性,但可能會引入延遲。

使用變化流程保持嵌入更新

變化流程可以提供實時的資料變化存取,允許應用程式立即反應。這個方法可以使用 Atlas Triggers、Atlas Stream Processing 或 Kafka 聯結器等工具實作。

使用 Atlas Triggers

Atlas Triggers 可以執行應用程式和資料函式庫邏輯,回應特定事件並呼叫 Atlas Functions。

使用 Atlas Stream Processing

Atlas Stream Processing 可以處理複雜的資料流,並提供連續的聚合和-schema 驗證。

使用 Kafka 聯結器

Kafka 聯結器可以將 Apache Kafka 主題中的資料流到 MongoDB 集合中。

使用 Python 實作變化流程

以下是使用 Python 和 LangChain 實作變化流程的範例,使用 OpenAI 嵌入 MDN 文章的標題和摘要:

import os
from langchain_openai import OpenAIEmbeddings
from pymongo import MongoClient
from pymongo.errors import PyMongoError

# 設定 OpenAI API 金鑰
os.environ["OPENAI_API_KEY"] = "YOUR-OPENAI-API-KEY"

# 建立 MongoDB 客戶端
client = MongoClient()

# 設定資料函式庫和集合
db = client["mdn"]
collection = db["articles"]

# 定義變化流程
def change_stream():
    # 建立變化流程
    pipeline = [
        {"$match": {"operationType": "insert"}},
        {"$addFields": {"title_embedding": {"$function": {"name": "get_embedding", "args": ["$title"]}}}},
        {"$addFields": {"summary_embedding": {"$function": {"name": "get_embedding", "args": ["$summary"]}}}}
    ]

    # 執行變化流程
    for change in collection.watch(pipeline=pipeline):
        print(change)

# 執行變化流程
change_stream()

這個範例使用 LangChain 和 OpenAI 嵌入 MDN 文章的標題和摘要,並將其儲存在 MongoDB 中。變化流程可以實時監測資料變化,並更新嵌入以反映最新的內容。

MongoDB Atlas 連線與檔案嵌入

連線設定

首先,我們需要定義MongoDB Atlas的連線字串。這個字串包含了連線到MongoDB Atlas叢集所需的所有資訊,包括主機名稱、埠號、資料函式庫名稱、使用者名稱和密碼等。

MONGODB_ATLAS_CONNSTRING = "mongodb+srv://使用者名稱:密碼@主機名稱/資料函式庫名稱?retryWrites=true&w=majority"

客戶端例項化

接下來,我們需要建立一個MongoClient例項來連線到MongoDB Atlas。這個例項將用於執行對資料函式庫的各種操作,包括查詢、插入、更新和刪除等。

client = MongoClient(MONGODB_ATLAS_CONNSTRING, tls=True, tlsAllowInvalidCertificates=True)

集合選擇

然後,我們需要選擇要操作的集合。在這個例子中,我們選擇了名為"articles"的集合,它儲存在名為"mdn"的資料函式庫中。

coll = client["mdn"]["articles"]

嵌入模型例項化

為了生成檔案嵌入,我們需要例項化一個OpenAIEmbeddings模型。這個模型將用於將檔案轉換為向量嵌入,從而可以進行語義相似度計算等操作。

embedding_model = OpenAIEmbeddings(model="text-embedding-3-large", dimensions=1024, disallowed_special=())

變化處理函式

定義一個函式來處理MongoDB集合中偵測到的變化。這個函式將在集合中有檔案更新時被呼叫。

def handle_changes(change):
    # 從變化事件中提取檔案ID
    doc_id = change["documentKey"]["_id"]
    
    # 建立一個過濾器來識別集合中的檔案
    doc_filter = {"_id": doc_id}
    
    # 結合檔案的標題和摘要成一個單一的文字字串
    text = [change["fullDocument"]["title"] + " " + change["fullDocument"]["summary"]]
    
    # 為文字生成嵌入
    embeddings = embedding_model.embed_documents(text)
    
    # 建立一個更新檔案來設定'semantic_embedding'欄位與生成的嵌入
    update_doc = {"$set": {"semantic_embedding": embeddings[0]}}
    
    # 更新集合中的檔案
    coll.update_one(doc_filter, update_doc)

圖表翻譯:

  flowchart TD
    A[變化偵測] --> B[提取檔案ID]
    B --> C[建立過濾器]
    C --> D[結合標題和摘要]
    D --> E[生成嵌入]
    E --> F[更新檔案]
    F --> G[更新集合]

內容解密:

上述程式碼展示瞭如何連線到MongoDB Atlas,選擇一個集合,並使用OpenAIEmbeddings模型生成檔案嵌入。當集合中偵測到變化時,函式handle_changes將被呼叫,以更新檔案的語義嵌入。這個過程涉及到提取檔案ID,建立過濾器,結合標題和摘要,生成嵌入,然後更新集合中的檔案。這樣可以保證MongoDB集合中的檔案始終具有最新的語義嵌入,方便進行語義相似度查詢等高階應用。

MongoDB 變更流處理

MongoDB 的變更流(Change Stream)功能允許我們實時監控集合中的變更,包括插入、更新和刪除操作。以下是如何使用 Python 的 PyMongo 函式庫來處理變更流的範例。

更新檔案嵌入式查詢

首先,我們需要更新檔案中的嵌入式查詢欄位。這可以透過以下程式碼實作:

set_fields = {
    "$set": {
        "semantic_embedding": embeddings[0]
    }
}

# 更新檔案
coll.update_one(doc_filter, set_fields)
print(f"Updated embeddings for document {doc_id}")

監控 MongoDB 集合變更

接下來,我們需要監控 MongoDB 集合的變更。這可以透過使用 watch() 方法來實作:

try:
    # 定義一個過濾器來匹配插入和更新操作
    stream_filter = [
        {
            "$match": {
                "$or": [
                    {"operationType": "insert"},
                    {
                        "$and": [
                            {"operationType": "update"},
                            {
                                "$or": [
                                    {
                                        "updateDescription.updatedFields.title": {
                                            "$exists": True
                                        }
                                    },
                                    {
                                        "updateDescription.updatedFields.summary": {
                                            "$exists": True
                                        }
                                    },
                                ]
                            },
                        ]
                    },
                ]
            }
        }
    ]

    # 開啟一個變更流來監控集合變更
    with coll.watch(stream_filter, full_document="updateLookup") as stream:
        print("Listening for changes...")
        for change in stream:
            print(f"Change detected: {change}. Processing")
            handle_changes(change)
except PyMongoError as e:
    print(f"Error: {e}")

在上面的程式碼中,我們定義了一個過濾器 stream_filter 來匹配插入和更新操作。然後,我們使用 watch() 方法來開啟一個變更流,並將過濾器應用於變更流。當變更流偵測到變更時,將呼叫 handle_changes() 函式來處理變更。

處理變更

handle_changes() 函式將被呼叫來處理變更流中的每個變更。這個函式可以根據具體需求來實作,例如更新應用程式中的快取或觸發其他業務邏輯。

內容解密:

上述程式碼展示瞭如何使用 PyMongo 函式庫來監控 MongoDB 集合的變更。透過使用變更流,我們可以實時監控集合中的變更,並根據具體需求來處理變更。這個功能在許多應用程式中非常有用,例如實時更新應用程式中的快取或觸發其他業務邏輯。

圖表翻譯:

  graph LR
    A[MongoDB 集合] -->|變更|> B[變更流]
    B -->|過濾器|> C[過濾後的變更]
    C -->|處理|> D[handle_changes()]
    D -->|更新應用程式|> E[應用程式]

上述 Mermaid 圖表展示了 MongoDB 集合、變更流、過濾器、處理函式和應用程式之間的關係。當 MongoDB 集合發生變更時,將觸發變更流。過濾器將被應用於變更流,以過濾出需要處理的變更。然後,處理函式將被呼叫來處理過濾後的變更。最終,應用程式將被更新以反映變更。

從資料擷取、處理到向量嵌入的完整流程建構來看,本文提供的資料流程設計方案,有效地整合了 MongoDB Atlas、Apache Kafka、Atlas Triggers 等工具,實作了從靜態和動態資料來源到向量資料函式庫的無縫銜接。深入分析同步與非同步嵌入更新機制,更凸顯了方案設計的靈活性,能適應不同規模和延遲需求的應用場景。然而,方案的效能瓶頸仍需關注,例如在高吞吐量情境下,非同步更新機制可能引入的延遲累積,以及變化流程的資源消耗。展望未來,隨著向量搜尋技術的持續發展,預期將出現更輕量級、更高效的嵌入模型和資料函式庫索引技術,進一步提升資料流程的效率。對於追求極致效能的應用,建議持續關注向量資料函式庫的效能調校,並探索結合硬體加速方案,以充分釋放向量搜尋的潛力。玄貓認為,掌握資料流程的設計和最佳化,將是未來構建高效能 AI 應用,尤其是根據大語言模型的應用,不可或缺的核心能力。