Pinecone 向量資料函式庫在處理百萬級資料時,效能和成本是關鍵考量。資料的預處理和分塊策略直接影響嵌入效率和後續查詢速度。選擇合適的 OpenAI 嵌入模型至關重要,需考量模型的效能、成本和 API 呼叫限制。此外,Pinecone 的雲端和區域選擇、讀寫單位和儲存成本都會影響整體開銷,需要仔細評估和監控。程式碼實作部分,示範瞭如何使用 Python 和 Pandas 進行資料處理、分塊,以及如何使用 OpenAI API 進行嵌入。分批處理和適當的暫停時間可以避免觸發 API 速率限制,確保嵌入過程順利進行。

Pipeline2:擴充套件 Pinecone 索引(向量儲存)

本章節的目標是使用我們的資料集建立 Pinecone 索引,並將其從 10,000 筆記錄擴充套件到 1,000,000 筆記錄。雖然我們建立在前幾章所獲得的知識之上,但擴充套件與管理樣本資料集是不同的。

Pipeline2 的流程

此 Pipeline 的每個過程看似簡單:資料準備、嵌入、上傳到向量儲存和查詢以檢索檔案。我們已經在第 2 章和第 3 章中經歷了這些過程。

步驟 1:資料準備

我們將使用 Python 對資料集進行分塊處理。

步驟 2:分塊和嵌入

我們將準備好的資料進行分塊,然後對分塊後的資料進行嵌入。

步驟 3:建立 Pinecone 索引

我們將建立一個 Pinecone 索引(向量儲存)。

步驟 4:上傳資料

我們將上傳嵌入的檔案(在本例中為客戶記錄)以及每個記錄的文字作為後設資料。

步驟 5:查詢 Pinecone 索引

最後,我們將執行查詢以檢索相關檔案,為 Pipeline3:RAG 生成式 AI 做準備。

向量儲存管理的挑戰

在擴充套件時,錯誤會被指數級放大。因此,在執行任何程式碼之前,我們需要考慮專案管理決策。

OpenAI 模型的挑戰

  1. 嵌入模型的選擇:OpenAI 不斷改進並提供新的嵌入模型。在嵌入之前,請確保檢查每個模型的特性,包括速度、成本、輸入限制和 API 呼叫率。

    https://platform.openai.com/docs/models/embeddings
    
  2. 生成模型的選擇:OpenAI 不斷發布新模型並放棄舊模型。選擇最有效的模型以提高速度和降低成本。

    https://platform.openai.com/docs/models
    

Pinecone 的約束

  1. 雲端和區域選擇:雲端(AWS、Google 等)和區域(無伺服器儲存的位置)的選擇對定價有影響。

  2. 使用情況:包括讀取單位、寫入單位和儲存成本(包括雲端備份)的費用。

    https://docs.pinecone.io/guides/indexes/back-up-an-index
    

監控成本和用量

您需要持續監控 Pinecone 的價格和用量,就像監控其他雲端環境一樣。

https://www.pinecone.io/pricing/
https://docs.pinecone.io/guides/operations/monitoring

實作 Pipeline2

現在,讓我們開始實作 Pipeline2,重點關注超出前幾章所探討的功能性的痛點。

安裝環境

首先,我們安裝 OpenAI 和 Pinecone:

!pip install openai==1.40.3
!pip install pinecone-client==5.0.1

初始化 API 金鑰

f = open("drive/MyDrive/files/pinecone.txt", "r")
PINECONE_API_KEY = f.readline()
f.close()

f = open("drive/MyDrive/files/api_key.txt", "r")
API_KEY = f.readline()
f.close()

import os
import openai
os.environ['OPENAI_API_KEY'] = API_KEY
openai.api_key = os.getenv("OPENAI_API_KEY")

處理資料集

本文將重點放在為分塊準備資料集,將其分成最佳化的文字區塊以進行嵌入。

!cp /content/drive/MyDrive/files/rag_c6/data1.csv /content/data1
import pandas as pd

# 載入 CSV 檔案
df = pd.read_csv('/content/data1.csv')

內容解密:

上述程式碼首先將 data1.csv 檔案從 Google Drive 複製到當前工作目錄。然後,使用 pandas 函式庫將 CSV 檔案載入到 DataFrame 中,以便進一步處理。

未來步驟

  1. 對資料進行分塊和嵌入。
  2. 建立 Pinecone 索引並上傳嵌入的資料。
  3. 查詢 Pinecone 索引以檢索相關檔案。

這些步驟將在後續章節中詳細介紹。

程式碼範例

安裝必要的套件

!pip install openai==1.40.3
!pip install pinecone-client==5.0.1

內容解密:

這段程式碼安裝了 openaipinecone-client 套件,分別用於與 OpenAI API 和 Pinecone 向量資料函式庫互動。指定了特定的版本,以確保相容性和穩定性。

初始化 API 金鑰

import os
import openai

# 從檔案讀取 API 金鑰
with open("drive/MyDrive/files/pinecone.txt", "r") as f:
    PINECONE_API_KEY = f.readline().strip()

with open("drive/MyDrive/files/api_key.txt", "r") as f:
    API_KEY = f.readline().strip()

# 設定 OpenAI API 金鑰
os.environ['OPENAI_API_KEY'] = API_KEY
openai.api_key = os.getenv("OPENAI_API_KEY")

內容解密:

這段程式碼從指定的檔案中讀取 Pinecone 和 OpenAI 的 API 金鑰,並將 OpenAI 的 API 金鑰設定到環境變數和 openai 函式庫中。這樣可以確保在後續的操作中能夠正確地使用這些 API。

載入資料集

import pandas as pd

# 載入 CSV 檔案
df = pd.read_csv('/content/data1.csv')

內容解密:

這段程式碼使用 pandas 函式庫載入 CSV 檔案到 DataFrame 中,方便進行後續的資料處理和分析。

Pipeline2 流程圖

  graph LR
    A[資料準備] --> B[分塊和嵌入]
    B --> C[建立 Pinecone 索引]
    C --> D[上傳嵌入資料]
    D --> E[查詢 Pinecone 索引]

圖表翻譯: 此圖示展示了 Pipeline2 的主要步驟,包括資料準備、分塊和嵌入、建立 Pinecone 索引、上傳嵌入資料以及查詢 Pinecone 索引。這些步驟按順序執行,共同構成了擴充套件 Pinecone 索引的流程。

隨著資料量的增加,如何高效地管理和查詢向量資料成為了一個重要的課題。未來,我們將進一步探討如何最佳化 Pinecone 索引的效能和成本,以及如何結合其他技術來構建更強大的 AI 應用。

參考資料

資料集載入與預處理:客戶資料分析

在資料科學專案中,正確載入和預處理資料是至關重要的第一步。本章節將詳細介紹如何使用Python的pandas函式庫載入CSV檔案並進行初步的資料處理。

資料載入與驗證

首先,我們需要載入所需的函式庫並讀取資料檔案。以下程式碼展示瞭如何載入CSV檔案並驗證資料的行數:

import pandas as pd

# 定義檔案路徑
file_path = '/content/data1.csv'

# 使用pandas讀取CSV檔案
data1 = pd.read_csv(file_path)

# 計算資料的行數
number_of_lines = len(data1)
print("資料總行數:", number_of_lines)

內容解密:

  1. 檔案路徑定義file_path變數儲存了資料檔案的路徑。
  2. 資料讀取:使用pd.read_csv()函式將CSV檔案讀入DataFrame物件data1中。
  3. 行數計算len(data1)用於計算DataFrame中的總行數。
  4. 結果輸出:列印出資料的總行數,用於驗證資料是否正確載入。

資料轉換:建立客戶記錄文字

接下來,我們需要將DataFrame中的每一行資料轉換為特定的文字格式。以下程式碼展示瞭如何實作這一步驟:

# 初始化空列表用於儲存轉換後的資料
output_lines = []

# 遍歷DataFrame的每一行
for index, row in data1.iterrows():
    # 為每一列建立「欄位名稱: 值」的格式
    row_data = [f"{col}: {row[col]}" for col in data1.columns]
    # 將列表中的元素以空格連線成單一字串
    line = ' '.join(row_data)
    # 將處理好的字串加入列表中
    output_lines.append(line)

# 顯示前5行資料進行驗證
for line in output_lines[:5]:
    print(line)

內容解密:

  1. 列表初始化:建立空列表output_lines用於儲存處理後的客戶資料。
  2. 資料轉換:使用列表推導式將每列的欄位名稱和對應值組合成特定格式。
  3. 字串連線:使用' '.join()方法將列表元素連線成單一字串。
  4. 結果驗證:列印前5行資料以確認轉換結果正確。

資料完整性驗證

在完成資料轉換後,我們需要驗證資料的完整性:

# 複製output_lines到新的列表
lines = output_lines.copy()

# 驗證資料行數
number_of_lines = len(lines)
print("資料總行數:", number_of_lines)

內容解密:

  1. 資料複製:將output_lines列表複製到lines,確保原始資料不被修改。
  2. 行數驗證:再次檢查資料的總行數,確保在轉換過程中沒有遺失任何資料。

資料分塊與嵌入處理

在完成資料預處理後,我們需要對資料進行分塊和嵌入處理,以準備後續的機器學習模型訓練。

資料分塊

資料分塊是將預處理後的資料分割成較小的區塊,以便於後續處理:

# 初始化空列表用於儲存資料區塊
chunks = []

# 將每一行資料作為獨立的區塊
for line in lines:
    chunks.append(line)

# 列印總區塊數
print(f"總區塊數:{len(chunks)}")

內容解密:

  1. 區塊列表初始化:建立空列表chunks用於儲存資料區塊。
  2. 資料分塊:將lines中的每一行資料直接作為一個獨立的區塊加入chunks
  3. 結果輸出:列印總區塊數,確認資料分塊的結果。

區塊內容檢視

為了更好地理解分塊後的資料結構,我們可以檢視前幾個區塊的內容和長度:

# 列印前3個區塊的長度和內容
for i in range(3):
    print(len(chunks[i]))
    print(chunks[i])

內容解密:

  1. 區塊長度計算:使用len()函式計算每個區塊的字元長度。
  2. 區塊內容輸出:列印區塊的實際內容,用於人工檢查資料的正確性。

嵌入處理

嵌入(Embedding)是將文字資料轉換為數值向量的過程,是許多自然語言處理任務的關鍵步驟。

嵌入模型選擇

目前OpenAI提供了多種嵌入模型可供選擇:

import openai
import time

# 選擇嵌入模型
embedding_model = "text-embedding-3-small"
# embedding_model = "text-embedding-3-large"
# embedding_model = "text-embedding-ada-002"

# 初始化OpenAI客戶端
client = openai.OpenAI()

def get_embedding(text, model=embedding_model):
    # 預處理文字,移除換行符號
    text = text.replace("\n", " ")
    # 取得嵌入向量
    response = client.embeddings.create(input=[text], model=model)
    embedding = response.data[0].embedding
    return embedding

內容解密:

  1. 模型選擇:目前選擇了text-embedding-3-small模型,可以根據需求切換其他模型。
  2. 客戶端初始化:建立OpenAI的API客戶端,用於後續的嵌入請求。
  3. 嵌入函式定義get_embedding()函式負責將輸入文字轉換為嵌入向量。

分批嵌入處理

為了避免觸發API的速率限制,我們採用分批處理的方式進行嵌入:

# 初始化變數
start_time = time.time()
chunk_start = 0
chunk_end = 1000
pause_time = 3
embeddings = []
counter = 1

while chunk_end <= len(chunks):
    # 取得當前批次的區塊
    chunks_to_embed = chunks[chunk_start:chunk_end]
    
    # 對當前批次進行嵌入處理
    current_embeddings = []
    for chunk in chunks_to_embed:
        embedding = get_embedding(chunk, model=embedding_model)
        current_embeddings.append(embedding)
    
    # 將當前批次的嵌入結果加入總列表
    embeddings.extend(current_embeddings)
    
    # 更新區塊索引
    chunk_start += 1000
    chunk_end += 1000
    
    # 處理剩餘區塊
    if chunk_end > len(chunks):
        remaining_chunks = chunks[chunk_start:]
        remaining_embeddings = [get_embedding(chunk, model=embedding_model) for chunk in remaining_chunks]
        embeddings.extend(remaining_embeddings)
        break
    
    # 列印進度資訊
    print(f"批次 {counter} 已處理完成")
    counter += 1
    
    # 暫停以避免觸發API速率限制
    time.sleep(pause_time)

# 列印總處理時間
print(f"總處理時間:{time.time() - start_time} 秒")

內容解密:

  1. 分批處理:將資料分成多個批次,每批次1000個區塊進行嵌入處理。
  2. 速率控制:在批次之間加入暫停時間,避免觸發API的速率限制。
  3. 進度追蹤:列印每個批次的處理進度,方便監控整個嵌入過程。
  4. 剩餘資料處理:最後處理剩餘的資料區塊,確保所有資料都被正確嵌入。