在資料驅動的應用開發中,向量資料函式庫和生成式 AI 技術的結合日益重要。本文以 OpenAI 嵌入模型和 Pinecone 向量資料函式庫為例,示範如何處理大規模客戶資料,並結合 RAG 技術實作客製化訊息生成。首先,將客戶資料分塊並使用 OpenAI 的嵌入模型轉換為向量表示,接著進行資料擴增以模擬大規模資料情境。然後,使用 Pinecone 建立向量索引,並將擴增後的向量資料上傳至 Pinecone 資料函式庫。最後,示範如何利用 Pinecone 進行高效向量查詢,並結合查詢結果和 RAG 技術,使用 GPT-4o 生成自定義的市場訊息。

資料分塊與嵌入

資料分塊

將客戶記錄分塊處理,以提高資料管理的效率。

chunks = []
for line in lines:
    chunks.append(line)

print(f"總分塊數:{len(chunks)}")

輸出結果顯示總共有 10,000 個分塊。

內容解密:

  • 將每個客戶記錄作為一個獨立的分塊儲存在 chunks 列表中。
  • 這種做法有利於後續的資料處理和品質控制。

檢視分塊內容

檢查前幾個分塊的長度和內容,以瞭解分塊後的資料結構。

for i in range(3):
    print(len(chunks[i]))
    print(chunks[i])

輸出範例顯示每個分塊的長度和內容。

內容解密:

  • 使用迴圈遍歷前幾個分塊。
  • 列印每個分塊的長度和具體內容,以便檢查資料是否正確分塊。

嵌入模型選擇與實作

初始化嵌入模型

選擇適合的嵌入模型來進行資料嵌入。本文使用 OpenAI 提供的嵌入模型。

import openai
embedding_model = "text-embedding-3-small"
client = openai.OpenAI()

def get_embedding(text, model=embedding_model):
    text = text.replace("\n", " ")
    response = client.embeddings.create(input=[text], model=model)
    embedding = response.data[0].embedding
    return embedding

內容解密:

  • 初始化 OpenAI 使用者端並選擇嵌入模型。
  • 定義 get_embedding 函式,用於取得輸入文字的嵌入向量。

分批嵌入處理

為了避免 API 速率限制,將分塊資料分批進行嵌入處理。

start_time = time.time()
chunk_start = 0
chunk_end = 1000
pause_time = 3
embeddings = []
counter = 1

while chunk_end <= len(chunks):
    chunks_to_embed = chunks[chunk_start:chunk_end]
    current_embeddings = []
    for chunk in chunks_to_embed:
        embedding = get_embedding(chunk, model=embedding_model)
        current_embeddings.append(embedding)
    embeddings.extend(current_embeddings)
    chunk_start += 1000
    chunk_end += 1000
    counter += 1

# 處理剩餘的分塊
if chunk_end < len(chunks):
    remaining_chunks = chunks[chunk_end:]
    remaining_embeddings = [get_embedding(chunk, model=embedding_model) for chunk in remaining_chunks]
    embeddings.extend(remaining_embeddings)

輸出結果顯示批次處理的進度和總處理時間。

內容解密:

  • 使用 while 迴圈分批處理分塊資料,每次處理 1000 個分塊。
  • 在每批次處理之間暫停幾秒鐘,以避免觸發 API 速率限制。
  • 將所有嵌入向量儲存在 embeddings 列表中。

大規模向量資料處理與Pinecone索引建立

在大規模資料處理中,資料的嵌入(embedding)和向量儲存(vector store)的建立是至關重要的步驟。本文將介紹如何使用OpenAI嵌入模型和Pinecone向量資料函式庫來處理大規模的客戶資料。

資料嵌入與驗證

首先,我們需要將客戶資料進行分塊(chunking)並嵌入到向量空間中。嵌入的過程是使用OpenAI的嵌入模型來實作的。嵌入後的向量可以用來表示原始資料的語義資訊。

# 顯示第一個嵌入向量
print("First embedding:", embeddings[0])

內容解密:

這段程式碼用於顯示第一個嵌入向量的內容,以驗證嵌入過程是否正確執行。嵌入向量是一個多維數值陣列,代表了對應文字的語義特徵。

# 檢查分塊和嵌入向量的數量
num_chunks = len(chunks)
print(f"Number of chunks: {num_chunks}")
print(f"Number of embeddings: {len(embeddings)}")

內容解密:

這段程式碼用於檢查分塊的數量和嵌入向量的數量是否一致,以確保資料處理的正確性。如果數量不一致,可能會導致後續的處理出現錯誤。

資料擴增

為了模擬大規模資料的處理,我們需要對現有的資料進行擴增。資料擴增的過程是透過複製現有的分塊和嵌入向量來實作的。

# 定義擴增大小
dsize = 5 
total = dsize * len(chunks)
print("Total size", total)

# 初始化新的列表來儲存擴增後的資料
duplicated_chunks = []
duplicated_embeddings = []

# 複製分塊和嵌入向量
for i in range(len(chunks)):
    for _ in range(dsize):
        duplicated_chunks.append(chunks[i])
        duplicated_embeddings.append(embeddings[i])

# 檢查擴增後的資料數量
print(f"Number of duplicated chunks: {len(duplicated_chunks)}")
print(f"Number of duplicated embeddings: {len(duplicated_embeddings)}")

內容解密:

這段程式碼用於將現有的分塊和嵌入向量複製指定的次數,以達到擴增資料的目的。擴增後的資料可以用來模擬大規模資料的處理。

建立Pinecone索引

Pinecone是一個用於向量相似性搜尋的資料函式庫。我們需要建立一個Pinecone索引來儲存我們的向量資料。

# 初始化Pinecone連線
api_key = os.environ.get('PINECONE_API_KEY') 
pc = Pinecone(api_key=api_key)

# 定義索引名稱、雲端提供者和地區
index_name = 'bank-index-50000'
cloud = 'aws'
region = 'us-east-1'
spec = ServerlessSpec(cloud=cloud, region=region)

# 建立索引
if index_name not in pc.list_indexes().names():
    pc.create_index(
        index_name,
        dimension=1536, 
        metric='cosine',
        spec=spec
    )
    
# 連線到索引
index = pc.Index(index_name)
index.describe_index_stats()

內容解密:

這段程式碼用於建立一個Pinecone索引,並定義了索引的名稱、維度、相似性度量指標等引數。建立索引後,我們可以連線到該索引並檢視其狀態。

上傳向量資料到Pinecone索引

上傳向量資料到Pinecone索引是透過upsert操作來實作的。我們需要將嵌入向量和對應的中繼資料上傳到索引中。

# 定義上傳函式
def upsert_to_pinecone(data, batch_size):
    for i in range(0, len(data), batch_size):
        batch = data[i:i+batch_size]
        index.upsert(vectors=batch)

# 定義批次大小計算函式
def get_batch_size(data, limit=4000000): 
    total_size = 0
    batch_size = 0
    for item in data:
        item_size = sum([sys.getsizeof(v) for v in item.values()])
        if total_size + item_size > limit:
            break
        total_size += item_size
        batch_size += 1
    return batch_size

# 上傳資料到Pinecone索引
def batch_upsert(data):
    total = len(data)
    i = 0
    while i < total:
        batch_size = get_batch_size(data[i:])
        batch = data[i:i + batch_size]
        if batch:
            upsert_to_pinecone(batch, batch_size)
            i += batch_size
            print(f"Upserted {i}/{total} items...")
        else:
            break

內容解密:

這段程式碼定義了上傳函式upsert_to_pinecone和批次大小計算函式get_batch_size,用於將資料分批上傳到Pinecone索引中,以避免單次請求過大導致的效能問題。batch_upsert函式則負責協調整個上傳過程,並顯示上傳進度。

使用Pinecone向量資料函式庫進行高效查詢與RAG生成式AI應用

在現代化的資料驅動應用中,高效的向量檢索和生成式AI技術正逐漸成為關鍵的核心能力。本文將探討如何利用Pinecone向量資料函式庫進行大規模資料的索引與查詢,並結合RAG(Retrieval-Augmented Generation)技術實作自定義的生成式AI應用。

建立Pinecone向量索引

首先,我們需要為資料建立向量索引。以下程式碼展示瞭如何將資料批次嵌入並上傳至Pinecone:

# 為每個資料專案生成唯一ID
ids = [str(i) for i in range(1, len(duplicated_chunks) + 1)]

# 準備上傳資料
data_for_upsert = [
    {"id": str(id), "values": emb, "metadata": {"text": chunk}}
    for id, (chunk, emb) in zip(ids, zip(duplicated_chunks, duplicated_embeddings))
]

# 批次上傳資料至Pinecone
batch_upsert(data_for_upsert)

# 測量上傳時間
response_time = time.time() - start_time
print(f"上傳回應時間:{response_time:.2f} 秒")

內容解密:

  1. ID生成:為每個資料區塊生成唯一的ID,確保資料的可識別性。
  2. 資料準備:將資料區塊、對應的向量嵌入和元資料封裝成Pinecone所需的格式。
  3. 批次上傳:使用batch_upsert函式將準備好的資料分批次上傳至Pinecone索引。
  4. 效能測量:記錄上傳過程所需的時間,以評估系統效能。

查詢Pinecone向量儲存

接下來,我們需要驗證查詢的回應時間和結果品質。以下程式碼展示瞭如何查詢向量儲存並顯示結果:

# 定義查詢結果顯示函式
def display_results(query_results):
    for match in query_results['matches']:
        print(f"ID: {match['id']}, 相似度: {match['score']}")
        if 'metadata' in match and 'text' in match['metadata']:
            print(f"文字: {match['metadata']['text']}")

# 取得查詢嵌入向量
query_embedding = get_embedding(query_text, model=embedding_model)

# 執行查詢並顯示結果
query_results = index.query(vector=query_embedding, top_k=1, include_metadata=True)
display_results(query_results)

# 測量查詢回應時間
response_time = time.time() - start_time
print(f"查詢回應時間:{response_time:.2f} 秒")

內容解密:

  1. 查詢結果處理:定義display_results函式來解析和顯示查詢結果,包括匹配的ID、相似度和原始文字。
  2. 查詢嵌入:使用相同的嵌入模型將查詢文字轉換為向量表示。
  3. 執行查詢:在Pinecone索引中執行查詢,檢索最相似的向量。
  4. 效能評估:測量並輸出查詢所需的時間,以確保系統回應速度。

RAG生成式AI應用

最後,我們結合RAG技術,利用Pinecone的查詢結果生成自定義的市場訊息:

# 使用GPT-4o生成自定義訊息
def generate_custom_message(target_vector):
    # 查詢Pinecone索引取得相似向量
    query_results = index.query(vector=target_vector, top_k=5, include_metadata=True)
    
    # 處理查詢結果並生成自定義訊息
    # ...(省略具體實作細節)
    
    return custom_message

# 示例:為特定市場細分生成訊息
target_vector = get_embedding("目標市場細分描述", model=embedding_model)
custom_message = generate_custom_message(target_vector)
print("生成的自定義訊息:", custom_message)

內容解密:

  1. 目標向量:使用特定的市場細分描述生成目標向量。
  2. 查詢與處理:在Pinecone中查詢最相似的向量,並提取相關資訊。
  3. 生成訊息:結合檢索到的資訊,利用GPT-4o生成自定義的市場訊息。