Pinecone 向量資料函式庫在處理百萬級資料時,效能和成本是關鍵考量。資料的預處理和分塊策略直接影響嵌入效率和後續查詢速度。選擇合適的 OpenAI 嵌入模型至關重要,需考量模型的效能、成本和 API 呼叫限制。此外,Pinecone 的雲端和區域選擇、讀寫單位和儲存成本都會影響整體開銷,需要仔細評估和監控。程式碼實作部分,示範瞭如何使用 Python 和 Pandas 進行資料處理、分塊,以及如何使用 OpenAI API 進行嵌入。分批處理和適當的暫停時間可以避免觸發 API 速率限制,確保嵌入過程順利進行。
Pipeline2:擴充套件 Pinecone 索引(向量儲存)
本章節的目標是使用我們的資料集建立 Pinecone 索引,並將其從 10,000 筆記錄擴充套件到 1,000,000 筆記錄。雖然我們建立在前幾章所獲得的知識之上,但擴充套件與管理樣本資料集是不同的。
Pipeline2 的流程
此 Pipeline 的每個過程看似簡單:資料準備、嵌入、上傳到向量儲存和查詢以檢索檔案。我們已經在第 2 章和第 3 章中經歷了這些過程。
步驟 1:資料準備
我們將使用 Python 對資料集進行分塊處理。
步驟 2:分塊和嵌入
我們將準備好的資料進行分塊,然後對分塊後的資料進行嵌入。
步驟 3:建立 Pinecone 索引
我們將建立一個 Pinecone 索引(向量儲存)。
步驟 4:上傳資料
我們將上傳嵌入的檔案(在本例中為客戶記錄)以及每個記錄的文字作為後設資料。
步驟 5:查詢 Pinecone 索引
最後,我們將執行查詢以檢索相關檔案,為 Pipeline3:RAG 生成式 AI 做準備。
向量儲存管理的挑戰
在擴充套件時,錯誤會被指數級放大。因此,在執行任何程式碼之前,我們需要考慮專案管理決策。
OpenAI 模型的挑戰
-
嵌入模型的選擇:OpenAI 不斷改進並提供新的嵌入模型。在嵌入之前,請確保檢查每個模型的特性,包括速度、成本、輸入限制和 API 呼叫率。
https://platform.openai.com/docs/models/embeddings -
生成模型的選擇:OpenAI 不斷發布新模型並放棄舊模型。選擇最有效的模型以提高速度和降低成本。
https://platform.openai.com/docs/models
Pinecone 的約束
-
雲端和區域選擇:雲端(AWS、Google 等)和區域(無伺服器儲存的位置)的選擇對定價有影響。
-
使用情況:包括讀取單位、寫入單位和儲存成本(包括雲端備份)的費用。
https://docs.pinecone.io/guides/indexes/back-up-an-index
監控成本和用量
您需要持續監控 Pinecone 的價格和用量,就像監控其他雲端環境一樣。
https://www.pinecone.io/pricing/
https://docs.pinecone.io/guides/operations/monitoring
實作 Pipeline2
現在,讓我們開始實作 Pipeline2,重點關注超出前幾章所探討的功能性的痛點。
安裝環境
首先,我們安裝 OpenAI 和 Pinecone:
!pip install openai==1.40.3
!pip install pinecone-client==5.0.1
初始化 API 金鑰
f = open("drive/MyDrive/files/pinecone.txt", "r")
PINECONE_API_KEY = f.readline()
f.close()
f = open("drive/MyDrive/files/api_key.txt", "r")
API_KEY = f.readline()
f.close()
import os
import openai
os.environ['OPENAI_API_KEY'] = API_KEY
openai.api_key = os.getenv("OPENAI_API_KEY")
處理資料集
本文將重點放在為分塊準備資料集,將其分成最佳化的文字區塊以進行嵌入。
!cp /content/drive/MyDrive/files/rag_c6/data1.csv /content/data1
import pandas as pd
# 載入 CSV 檔案
df = pd.read_csv('/content/data1.csv')
內容解密:
上述程式碼首先將 data1.csv 檔案從 Google Drive 複製到當前工作目錄。然後,使用 pandas 函式庫將 CSV 檔案載入到 DataFrame 中,以便進一步處理。
未來步驟
- 對資料進行分塊和嵌入。
- 建立 Pinecone 索引並上傳嵌入的資料。
- 查詢 Pinecone 索引以檢索相關檔案。
這些步驟將在後續章節中詳細介紹。
程式碼範例
安裝必要的套件
!pip install openai==1.40.3
!pip install pinecone-client==5.0.1
內容解密:
這段程式碼安裝了 openai 和 pinecone-client 套件,分別用於與 OpenAI API 和 Pinecone 向量資料函式庫互動。指定了特定的版本,以確保相容性和穩定性。
初始化 API 金鑰
import os
import openai
# 從檔案讀取 API 金鑰
with open("drive/MyDrive/files/pinecone.txt", "r") as f:
PINECONE_API_KEY = f.readline().strip()
with open("drive/MyDrive/files/api_key.txt", "r") as f:
API_KEY = f.readline().strip()
# 設定 OpenAI API 金鑰
os.environ['OPENAI_API_KEY'] = API_KEY
openai.api_key = os.getenv("OPENAI_API_KEY")
內容解密:
這段程式碼從指定的檔案中讀取 Pinecone 和 OpenAI 的 API 金鑰,並將 OpenAI 的 API 金鑰設定到環境變數和 openai 函式庫中。這樣可以確保在後續的操作中能夠正確地使用這些 API。
載入資料集
import pandas as pd
# 載入 CSV 檔案
df = pd.read_csv('/content/data1.csv')
內容解密:
這段程式碼使用 pandas 函式庫載入 CSV 檔案到 DataFrame 中,方便進行後續的資料處理和分析。
Pipeline2 流程圖
graph LR
A[資料準備] --> B[分塊和嵌入]
B --> C[建立 Pinecone 索引]
C --> D[上傳嵌入資料]
D --> E[查詢 Pinecone 索引]
圖表翻譯: 此圖示展示了 Pipeline2 的主要步驟,包括資料準備、分塊和嵌入、建立 Pinecone 索引、上傳嵌入資料以及查詢 Pinecone 索引。這些步驟按順序執行,共同構成了擴充套件 Pinecone 索引的流程。
隨著資料量的增加,如何高效地管理和查詢向量資料成為了一個重要的課題。未來,我們將進一步探討如何最佳化 Pinecone 索引的效能和成本,以及如何結合其他技術來構建更強大的 AI 應用。
參考資料
資料集載入與預處理:客戶資料分析
在資料科學專案中,正確載入和預處理資料是至關重要的第一步。本章節將詳細介紹如何使用Python的pandas函式庫載入CSV檔案並進行初步的資料處理。
資料載入與驗證
首先,我們需要載入所需的函式庫並讀取資料檔案。以下程式碼展示瞭如何載入CSV檔案並驗證資料的行數:
import pandas as pd
# 定義檔案路徑
file_path = '/content/data1.csv'
# 使用pandas讀取CSV檔案
data1 = pd.read_csv(file_path)
# 計算資料的行數
number_of_lines = len(data1)
print("資料總行數:", number_of_lines)
內容解密:
- 檔案路徑定義:
file_path變數儲存了資料檔案的路徑。 - 資料讀取:使用
pd.read_csv()函式將CSV檔案讀入DataFrame物件data1中。 - 行數計算:
len(data1)用於計算DataFrame中的總行數。 - 結果輸出:列印出資料的總行數,用於驗證資料是否正確載入。
資料轉換:建立客戶記錄文字
接下來,我們需要將DataFrame中的每一行資料轉換為特定的文字格式。以下程式碼展示瞭如何實作這一步驟:
# 初始化空列表用於儲存轉換後的資料
output_lines = []
# 遍歷DataFrame的每一行
for index, row in data1.iterrows():
# 為每一列建立「欄位名稱: 值」的格式
row_data = [f"{col}: {row[col]}" for col in data1.columns]
# 將列表中的元素以空格連線成單一字串
line = ' '.join(row_data)
# 將處理好的字串加入列表中
output_lines.append(line)
# 顯示前5行資料進行驗證
for line in output_lines[:5]:
print(line)
內容解密:
- 列表初始化:建立空列表
output_lines用於儲存處理後的客戶資料。 - 資料轉換:使用列表推導式將每列的欄位名稱和對應值組合成特定格式。
- 字串連線:使用
' '.join()方法將列表元素連線成單一字串。 - 結果驗證:列印前5行資料以確認轉換結果正確。
資料完整性驗證
在完成資料轉換後,我們需要驗證資料的完整性:
# 複製output_lines到新的列表
lines = output_lines.copy()
# 驗證資料行數
number_of_lines = len(lines)
print("資料總行數:", number_of_lines)
內容解密:
- 資料複製:將
output_lines列表複製到lines,確保原始資料不被修改。 - 行數驗證:再次檢查資料的總行數,確保在轉換過程中沒有遺失任何資料。
資料分塊與嵌入處理
在完成資料預處理後,我們需要對資料進行分塊和嵌入處理,以準備後續的機器學習模型訓練。
資料分塊
資料分塊是將預處理後的資料分割成較小的區塊,以便於後續處理:
# 初始化空列表用於儲存資料區塊
chunks = []
# 將每一行資料作為獨立的區塊
for line in lines:
chunks.append(line)
# 列印總區塊數
print(f"總區塊數:{len(chunks)}")
內容解密:
- 區塊列表初始化:建立空列表
chunks用於儲存資料區塊。 - 資料分塊:將
lines中的每一行資料直接作為一個獨立的區塊加入chunks。 - 結果輸出:列印總區塊數,確認資料分塊的結果。
區塊內容檢視
為了更好地理解分塊後的資料結構,我們可以檢視前幾個區塊的內容和長度:
# 列印前3個區塊的長度和內容
for i in range(3):
print(len(chunks[i]))
print(chunks[i])
內容解密:
- 區塊長度計算:使用
len()函式計算每個區塊的字元長度。 - 區塊內容輸出:列印區塊的實際內容,用於人工檢查資料的正確性。
嵌入處理
嵌入(Embedding)是將文字資料轉換為數值向量的過程,是許多自然語言處理任務的關鍵步驟。
嵌入模型選擇
目前OpenAI提供了多種嵌入模型可供選擇:
import openai
import time
# 選擇嵌入模型
embedding_model = "text-embedding-3-small"
# embedding_model = "text-embedding-3-large"
# embedding_model = "text-embedding-ada-002"
# 初始化OpenAI客戶端
client = openai.OpenAI()
def get_embedding(text, model=embedding_model):
# 預處理文字,移除換行符號
text = text.replace("\n", " ")
# 取得嵌入向量
response = client.embeddings.create(input=[text], model=model)
embedding = response.data[0].embedding
return embedding
內容解密:
- 模型選擇:目前選擇了
text-embedding-3-small模型,可以根據需求切換其他模型。 - 客戶端初始化:建立OpenAI的API客戶端,用於後續的嵌入請求。
- 嵌入函式定義:
get_embedding()函式負責將輸入文字轉換為嵌入向量。
分批嵌入處理
為了避免觸發API的速率限制,我們採用分批處理的方式進行嵌入:
# 初始化變數
start_time = time.time()
chunk_start = 0
chunk_end = 1000
pause_time = 3
embeddings = []
counter = 1
while chunk_end <= len(chunks):
# 取得當前批次的區塊
chunks_to_embed = chunks[chunk_start:chunk_end]
# 對當前批次進行嵌入處理
current_embeddings = []
for chunk in chunks_to_embed:
embedding = get_embedding(chunk, model=embedding_model)
current_embeddings.append(embedding)
# 將當前批次的嵌入結果加入總列表
embeddings.extend(current_embeddings)
# 更新區塊索引
chunk_start += 1000
chunk_end += 1000
# 處理剩餘區塊
if chunk_end > len(chunks):
remaining_chunks = chunks[chunk_start:]
remaining_embeddings = [get_embedding(chunk, model=embedding_model) for chunk in remaining_chunks]
embeddings.extend(remaining_embeddings)
break
# 列印進度資訊
print(f"批次 {counter} 已處理完成")
counter += 1
# 暫停以避免觸發API速率限制
time.sleep(pause_time)
# 列印總處理時間
print(f"總處理時間:{time.time() - start_time} 秒")
內容解密:
- 分批處理:將資料分成多個批次,每批次1000個區塊進行嵌入處理。
- 速率控制:在批次之間加入暫停時間,避免觸發API的速率限制。
- 進度追蹤:列印每個批次的處理進度,方便監控整個嵌入過程。
- 剩餘資料處理:最後處理剩餘的資料區塊,確保所有資料都被正確嵌入。