在資料驅動的應用開發中,向量資料函式庫和生成式 AI 技術的結合日益重要。本文以 OpenAI 嵌入模型和 Pinecone 向量資料函式庫為例,示範如何處理大規模客戶資料,並結合 RAG 技術實作客製化訊息生成。首先,將客戶資料分塊並使用 OpenAI 的嵌入模型轉換為向量表示,接著進行資料擴增以模擬大規模資料情境。然後,使用 Pinecone 建立向量索引,並將擴增後的向量資料上傳至 Pinecone 資料函式庫。最後,示範如何利用 Pinecone 進行高效向量查詢,並結合查詢結果和 RAG 技術,使用 GPT-4o 生成自定義的市場訊息。
資料分塊與嵌入
資料分塊
將客戶記錄分塊處理,以提高資料管理的效率。
chunks = []
for line in lines:
chunks.append(line)
print(f"總分塊數:{len(chunks)}")
輸出結果顯示總共有 10,000 個分塊。
內容解密:
- 將每個客戶記錄作為一個獨立的分塊儲存在
chunks
列表中。 - 這種做法有利於後續的資料處理和品質控制。
檢視分塊內容
檢查前幾個分塊的長度和內容,以瞭解分塊後的資料結構。
for i in range(3):
print(len(chunks[i]))
print(chunks[i])
輸出範例顯示每個分塊的長度和內容。
內容解密:
- 使用迴圈遍歷前幾個分塊。
- 列印每個分塊的長度和具體內容,以便檢查資料是否正確分塊。
嵌入模型選擇與實作
初始化嵌入模型
選擇適合的嵌入模型來進行資料嵌入。本文使用 OpenAI 提供的嵌入模型。
import openai
embedding_model = "text-embedding-3-small"
client = openai.OpenAI()
def get_embedding(text, model=embedding_model):
text = text.replace("\n", " ")
response = client.embeddings.create(input=[text], model=model)
embedding = response.data[0].embedding
return embedding
內容解密:
- 初始化 OpenAI 使用者端並選擇嵌入模型。
- 定義
get_embedding
函式,用於取得輸入文字的嵌入向量。
分批嵌入處理
為了避免 API 速率限制,將分塊資料分批進行嵌入處理。
start_time = time.time()
chunk_start = 0
chunk_end = 1000
pause_time = 3
embeddings = []
counter = 1
while chunk_end <= len(chunks):
chunks_to_embed = chunks[chunk_start:chunk_end]
current_embeddings = []
for chunk in chunks_to_embed:
embedding = get_embedding(chunk, model=embedding_model)
current_embeddings.append(embedding)
embeddings.extend(current_embeddings)
chunk_start += 1000
chunk_end += 1000
counter += 1
# 處理剩餘的分塊
if chunk_end < len(chunks):
remaining_chunks = chunks[chunk_end:]
remaining_embeddings = [get_embedding(chunk, model=embedding_model) for chunk in remaining_chunks]
embeddings.extend(remaining_embeddings)
輸出結果顯示批次處理的進度和總處理時間。
內容解密:
- 使用 while 迴圈分批處理分塊資料,每次處理 1000 個分塊。
- 在每批次處理之間暫停幾秒鐘,以避免觸發 API 速率限制。
- 將所有嵌入向量儲存在
embeddings
列表中。
大規模向量資料處理與Pinecone索引建立
在大規模資料處理中,資料的嵌入(embedding)和向量儲存(vector store)的建立是至關重要的步驟。本文將介紹如何使用OpenAI嵌入模型和Pinecone向量資料函式庫來處理大規模的客戶資料。
資料嵌入與驗證
首先,我們需要將客戶資料進行分塊(chunking)並嵌入到向量空間中。嵌入的過程是使用OpenAI的嵌入模型來實作的。嵌入後的向量可以用來表示原始資料的語義資訊。
# 顯示第一個嵌入向量
print("First embedding:", embeddings[0])
內容解密:
這段程式碼用於顯示第一個嵌入向量的內容,以驗證嵌入過程是否正確執行。嵌入向量是一個多維數值陣列,代表了對應文字的語義特徵。
# 檢查分塊和嵌入向量的數量
num_chunks = len(chunks)
print(f"Number of chunks: {num_chunks}")
print(f"Number of embeddings: {len(embeddings)}")
內容解密:
這段程式碼用於檢查分塊的數量和嵌入向量的數量是否一致,以確保資料處理的正確性。如果數量不一致,可能會導致後續的處理出現錯誤。
資料擴增
為了模擬大規模資料的處理,我們需要對現有的資料進行擴增。資料擴增的過程是透過複製現有的分塊和嵌入向量來實作的。
# 定義擴增大小
dsize = 5
total = dsize * len(chunks)
print("Total size", total)
# 初始化新的列表來儲存擴增後的資料
duplicated_chunks = []
duplicated_embeddings = []
# 複製分塊和嵌入向量
for i in range(len(chunks)):
for _ in range(dsize):
duplicated_chunks.append(chunks[i])
duplicated_embeddings.append(embeddings[i])
# 檢查擴增後的資料數量
print(f"Number of duplicated chunks: {len(duplicated_chunks)}")
print(f"Number of duplicated embeddings: {len(duplicated_embeddings)}")
內容解密:
這段程式碼用於將現有的分塊和嵌入向量複製指定的次數,以達到擴增資料的目的。擴增後的資料可以用來模擬大規模資料的處理。
建立Pinecone索引
Pinecone是一個用於向量相似性搜尋的資料函式庫。我們需要建立一個Pinecone索引來儲存我們的向量資料。
# 初始化Pinecone連線
api_key = os.environ.get('PINECONE_API_KEY')
pc = Pinecone(api_key=api_key)
# 定義索引名稱、雲端提供者和地區
index_name = 'bank-index-50000'
cloud = 'aws'
region = 'us-east-1'
spec = ServerlessSpec(cloud=cloud, region=region)
# 建立索引
if index_name not in pc.list_indexes().names():
pc.create_index(
index_name,
dimension=1536,
metric='cosine',
spec=spec
)
# 連線到索引
index = pc.Index(index_name)
index.describe_index_stats()
內容解密:
這段程式碼用於建立一個Pinecone索引,並定義了索引的名稱、維度、相似性度量指標等引數。建立索引後,我們可以連線到該索引並檢視其狀態。
上傳向量資料到Pinecone索引
上傳向量資料到Pinecone索引是透過upsert
操作來實作的。我們需要將嵌入向量和對應的中繼資料上傳到索引中。
# 定義上傳函式
def upsert_to_pinecone(data, batch_size):
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
index.upsert(vectors=batch)
# 定義批次大小計算函式
def get_batch_size(data, limit=4000000):
total_size = 0
batch_size = 0
for item in data:
item_size = sum([sys.getsizeof(v) for v in item.values()])
if total_size + item_size > limit:
break
total_size += item_size
batch_size += 1
return batch_size
# 上傳資料到Pinecone索引
def batch_upsert(data):
total = len(data)
i = 0
while i < total:
batch_size = get_batch_size(data[i:])
batch = data[i:i + batch_size]
if batch:
upsert_to_pinecone(batch, batch_size)
i += batch_size
print(f"Upserted {i}/{total} items...")
else:
break
內容解密:
這段程式碼定義了上傳函式upsert_to_pinecone
和批次大小計算函式get_batch_size
,用於將資料分批上傳到Pinecone索引中,以避免單次請求過大導致的效能問題。batch_upsert
函式則負責協調整個上傳過程,並顯示上傳進度。
使用Pinecone向量資料函式庫進行高效查詢與RAG生成式AI應用
在現代化的資料驅動應用中,高效的向量檢索和生成式AI技術正逐漸成為關鍵的核心能力。本文將探討如何利用Pinecone向量資料函式庫進行大規模資料的索引與查詢,並結合RAG(Retrieval-Augmented Generation)技術實作自定義的生成式AI應用。
建立Pinecone向量索引
首先,我們需要為資料建立向量索引。以下程式碼展示瞭如何將資料批次嵌入並上傳至Pinecone:
# 為每個資料專案生成唯一ID
ids = [str(i) for i in range(1, len(duplicated_chunks) + 1)]
# 準備上傳資料
data_for_upsert = [
{"id": str(id), "values": emb, "metadata": {"text": chunk}}
for id, (chunk, emb) in zip(ids, zip(duplicated_chunks, duplicated_embeddings))
]
# 批次上傳資料至Pinecone
batch_upsert(data_for_upsert)
# 測量上傳時間
response_time = time.time() - start_time
print(f"上傳回應時間:{response_time:.2f} 秒")
內容解密:
- ID生成:為每個資料區塊生成唯一的ID,確保資料的可識別性。
- 資料準備:將資料區塊、對應的向量嵌入和元資料封裝成Pinecone所需的格式。
- 批次上傳:使用
batch_upsert
函式將準備好的資料分批次上傳至Pinecone索引。 - 效能測量:記錄上傳過程所需的時間,以評估系統效能。
查詢Pinecone向量儲存
接下來,我們需要驗證查詢的回應時間和結果品質。以下程式碼展示瞭如何查詢向量儲存並顯示結果:
# 定義查詢結果顯示函式
def display_results(query_results):
for match in query_results['matches']:
print(f"ID: {match['id']}, 相似度: {match['score']}")
if 'metadata' in match and 'text' in match['metadata']:
print(f"文字: {match['metadata']['text']}")
# 取得查詢嵌入向量
query_embedding = get_embedding(query_text, model=embedding_model)
# 執行查詢並顯示結果
query_results = index.query(vector=query_embedding, top_k=1, include_metadata=True)
display_results(query_results)
# 測量查詢回應時間
response_time = time.time() - start_time
print(f"查詢回應時間:{response_time:.2f} 秒")
內容解密:
- 查詢結果處理:定義
display_results
函式來解析和顯示查詢結果,包括匹配的ID、相似度和原始文字。 - 查詢嵌入:使用相同的嵌入模型將查詢文字轉換為向量表示。
- 執行查詢:在Pinecone索引中執行查詢,檢索最相似的向量。
- 效能評估:測量並輸出查詢所需的時間,以確保系統回應速度。
RAG生成式AI應用
最後,我們結合RAG技術,利用Pinecone的查詢結果生成自定義的市場訊息:
# 使用GPT-4o生成自定義訊息
def generate_custom_message(target_vector):
# 查詢Pinecone索引取得相似向量
query_results = index.query(vector=target_vector, top_k=5, include_metadata=True)
# 處理查詢結果並生成自定義訊息
# ...(省略具體實作細節)
return custom_message
# 示例:為特定市場細分生成訊息
target_vector = get_embedding("目標市場細分描述", model=embedding_model)
custom_message = generate_custom_message(target_vector)
print("生成的自定義訊息:", custom_message)
內容解密:
- 目標向量:使用特定的市場細分描述生成目標向量。
- 查詢與處理:在Pinecone中查詢最相似的向量,並提取相關資訊。
- 生成訊息:結合檢索到的資訊,利用GPT-4o生成自定義的市場訊息。