隨著問答系統應用日益普及,提升其效能變得至關重要。本文首先介紹如何將 SubjQA 資料集轉換為 SQuAD 格式,以便進行模型訓練和評估。接著,我們將探討如何微調 Reader 模型,並使用評估指標來衡量其效能。此外,本文也介紹了生成式問答模型 RAG 和 RAG-Token 模型,它們能夠有效地整合多個檔案資訊,生成更準確的答案。最後,我們將探討如何在生產環境中提升 Transformers 模型的效率,包含知識蒸餾、量化、剪枝和 ONNX Runtime 等技術,並以意圖檢測為例,說明如何建立效能基準測試來評估模型的準確性、大小和延遲。
問答系統的進階最佳化與評估
將 SubjQA 資料集轉換為 SQuAD 格式
在進行問答系統的訓練與評估時,資料格式的統一是至關重要的。以下程式碼展示瞭如何將 SubjQA 資料集轉換為 SQuAD 格式:
for qid, question in id2question.items():
# 過濾單一問題 ID
question_df = df.query(f"id == '{qid}'").to_dict(orient="list")
ans_start_idxs = question_df["answers.answer_start"][0].tolist()
ans_text = question_df["answers.text"][0].tolist()
# 填充可回答的問題
if len(ans_start_idxs):
answers = [
{"text": text, "answer_start": answer_start}
for text, answer_start in zip(ans_text, ans_start_idxs)
]
is_impossible = False
else:
answers = []
is_impossible = True
# 將問答對新增到 qas
qas.append({"question": question, "id": qid, "is_impossible": is_impossible, "answers": answers})
# 將上下文和問答對新增到段落
paragraphs.append({"qas": qas, "context": review})
return paragraphs
內容解密:
- 問題 ID 過濾:使用
df.query根據問題 ID (qid) 過濾出對應的問題資料,並轉換為字典格式。 - 答案起始位置與文字提取:從篩選出的 DataFrame 中提取答案的起始位置 (
ans_start_idxs) 和答案文字 (ans_text)。 - 問答對構建:根據是否存在答案起始位置,決定是否將問題標記為不可回答 (
is_impossible),並構建問答對 (qas)。 - 段落構建:將問答對和對應的上下文 (
context) 新增到段落 (paragraphs) 中。
將資料集轉換為 SQuAD 格式並儲存
import json
def convert_to_squad(dfs):
for split, df in dfs.items():
subjqa_data = {}
# 為每個產品 ID 建立 `paragraphs`
groups = df.groupby("title").apply(create_paragraphs).to_frame(name="paragraphs").reset_index()
subjqa_data["data"] = groups.to_dict(orient="records")
# 將結果儲存到磁碟
with open(f"electronics-{split}.json", "w+", encoding="utf-8") as f:
json.dump(subjqa_data, f)
convert_to_squad(dfs)
內容解密:
- 分組處理:使用
groupby方法根據產品標題 (title) 對資料進行分組,並應用create_paragraphs函式生成 SQuAD 格式的段落。 - 資料儲存:將轉換後的資料以 JSON 格式儲存到檔案中,檔名根據資料集的分割 (
split) 命名。
微調 Reader 模型
train_filename = "electronics-train.json"
dev_filename = "electronics-validation.json"
reader.train(data_dir=".", use_gpu=True, n_epochs=1, batch_size=16,
train_filename=train_filename, dev_filename=dev_filename)
內容解密:
- 模型微調:使用訓練集 (
electronics-train.json) 和驗證集 (electronics-validation.json) 對 Reader 模型進行微調。 - 訓練引數設定:設定使用 GPU 加速,訓練輪數 (
n_epochs) 為 1,批次大小 (batch_size) 為 16。
評估整個問答系統
# 初始化檢索器管道
pipe = EvalRetrieverPipeline(es_retriever)
# 新增 Reader 節點
eval_reader = EvalAnswers()
pipe.pipeline.add_node(component=reader, name="QAReader", inputs=["EvalRetriever"])
pipe.pipeline.add_node(component=eval_reader, name="EvalReader", inputs=["QAReader"])
# 評估管道
run_pipeline(pipe)
# 提取 Reader 評估指標
reader_eval["QA Pipeline (top-1)"] = {k: v for k, v in eval_reader.__dict__.items() if k in ["top_1_em", "top_1_f1"]}
內容解密:
- 管道初始化:初始化一個包含檢索器 (
es_retriever) 的評估管道 (EvalRetrieverPipeline)。 - 新增節點:在管道中新增 Reader 節點 (
QAReader) 和評估節點 (EvalReader),以評估 Reader 的效能。 - 管道評估:執行管道評估,並提取 Reader 的評估指標 (
top_1_em和top_1_f1)。
超越抽取式問答:生成式問答
生成式問答是一種利用預訓練語言模型生成答案的方法,具有合成多個段落資訊的能力。RAG(Retrieval-Augmented Generation)是一種結合檢索器和生成器的端對端架構,能夠有效地生成答案。
RAG 架構的優勢:
- 端對端微調:RAG 允許對檢索器和生成器進行聯合微調,提高了模型的整體效能。
- 多檔案綜合:RAG 能夠綜合多個檢索到的檔案資訊,生成更準確和全面的答案。
RAG 的應用展示了生成式問答在未來問答系統中的巨大潛力。隨著研究的深入,生成式問答有望在工業界得到廣泛應用。
RAG-Token模型在問答系統中的應用
RAG-Token模型允許在生成答案的每個token時使用不同的檔案,這使得生成器能夠從多個檔案中綜合證據。由於RAG-Token模型的表現往往優於RAG-Sequence模型,因此我們將使用在NQ資料集上進行微調的token模型作為生成器。
初始化生成器
在Haystack中例項化生成器與例項化閱讀器類別似,但需要指定控制文字生成的超引數,而不是指定max_seq_length和doc_stride引數:
from haystack.generator.transformers import RAGenerator
generator = RAGenerator(model_name_or_path="facebook/rag-token-nq", embed_title=False, num_beams=5)
內容解密:
model_name_or_path="facebook/rag-token-nq":指定使用預訓練的RAG-Token模型。embed_title=False:不嵌入檔案標題,因為我們的語料函式庫總是按產品ID過濾。num_beams=5:指定在beam search中使用5個beam,用於控制文字生成的多樣性。
結合檢索器和生成器
使用Haystack的GenerativeQAPipeline將檢索器和生成器結合起來:
from haystack.pipeline import GenerativeQAPipeline
pipe = GenerativeQAPipeline(generator=generator, retriever=dpr_retriever)
內容解密:
generator=generator:傳入之前初始化的生成器。retriever=dpr_retriever:傳入DensePassageRetriever檢索器。
測試RAG模型
編寫一個簡單的函式來測試RAG模型:
def generate_answers(query, top_k_generator=3):
preds = pipe.run(query=query, top_k_generator=top_k_generator, top_k_retriever=5, filters={"item_id": ["B0074BW614"]})
print(f"Question: {preds['query']} \n")
for idx in range(top_k_generator):
print(f"Answer {idx+1}: {preds['answers'][idx]['answer']}")
內容解密:
pipe.run():執行GenerativeQAPipeline,傳入查詢和其他引數。top_k_generator=3:傳回前3個生成的答案。filters={"item_id": ["B0074BW614"]}:過濾特定的產品ID。
實驗結果
執行測試查詢:
generate_answers("Is it good for reading?")
generate_answers("What is the main drawback?")
內容解密:
- 第一個查詢的結果顯示,對於主觀性問題,模型的回答可能不夠精確。
- 第二個查詢的結果顯示,對於事實性問題,模型的回答更為合理。
未來研究方向
- 多模態問答:研究如何結合文字、表格和影像等多種模態來回答複雜的問題。
- 知識圖譜問答:探索如何利用知識圖譜來回答問題,透過將事實表示為(主體、謂詞、客體)三元組。
- 自動問題生成:研究如何自動生成問題,用於無監督或弱監督訓練。
這些研究方向有望推動問答系統的發展,使其能夠更好地服務於實際應用。
提升Transformers在生產環境中的效率
在前面的章節中,我們已經瞭解瞭如何微調Transformers以在多種任務中取得優異的結果。然而,在許多情況下,準確度(或其他最佳化指標)並不足夠;最先進的模型如果速度太慢或體積太大,無法滿足應用程式的業務需求,那麼它就不是很有用。一個明顯的替代方案是訓練一個更快、更緊湊的模型,但模型能力的降低往往伴隨著效能的下降。那麼,當你需要一個快速、緊湊且高度準確的模型時,你該怎麼辦?
在本章中,我們將探討四種互補的技術,這些技術可用於加快Transformers模型的預測速度並減少其記憶體佔用:知識蒸餾、量化、剪枝和利用Open Neural Network Exchange(ONNX)格式和ONNX Runtime(ORT)進行圖形最佳化。我們還將瞭解如何結合這些技術以獲得顯著的效能提升。例如,Roblox工程團隊在他們的文章“How We Scaled Bert to Serve 1+ Billion Daily Requests on CPUs”中就採用了這種方法,他們發現透過結合知識蒸餾和量化,可以將BERT分類別器的延遲和吞吐量提高30倍以上!
意圖檢測作為案例研究
假設我們正在嘗試為公司的呼叫中心建立一個根據文字的助手,以便客戶可以在無需與人工代理溝通的情況下查詢帳戶餘額或進行預訂。為了理解客戶的目標,我們的助手需要能夠將各種自然語言文字分類別為一組預定義的操作或意圖。例如,客戶可能會傳送如下訊息:
Hey, I’d like to rent a vehicle from Nov 1st to Nov 15th in Paris and I need a 15 passenger van
我們的意圖分類別器可以自動將其分類別為汽車租賃意圖,從而觸發操作和回應。為了在生產環境中保持穩健,我們的分類別器還需要能夠處理超出範圍的查詢,即客戶提出的問題不屬於任何預定義的意圖,並且系統應給出回退回應。
作為基線,我們已經微調了一個BERT-base模型,該模型在CLINC150資料集上達到了約94%的準確度。該資料集包含跨150個意圖和10個領域(如銀行和旅遊)的22,500個範圍內查詢,以及1,200個範圍外查詢,這些查詢屬於一個oos意圖類別。
建立效能基準
像其他機器學習模型一樣,在生產環境中佈署Transformers涉及多個約束之間的權衡,最常見的是:
- 模型效能:我們的模型在精心設計的測試集上的表現如何?這一點尤其重要,當錯誤的成本很高時(最好透過人工介入來緩解),或者當我們需要在數百萬個示例上執行推理時,模型指標的小幅改進就可以轉化為整體的大幅收益。
- 延遲:我們的模型可以多快地提供預測?我們通常關心的是在處理大量流量的實時環境中的延遲,例如Stack Overflow需要一個分類別器來快速檢測網站上的不良評論。
- 記憶體:我們如何佈署像GPT-2或T5這樣需要數GB磁碟儲存和RAM的億引數模型?記憶體在行動裝置或邊緣裝置中扮演著特別重要的角色,在這些裝置中,模型必須在無法存取強大的雲端伺服器的情況下生成預測。
為了探討如何透過各種壓縮技術來最佳化每個約束,讓我們首先建立一個簡單的基準測試,測量給定pipeline和測試集的每個數量。下面是我們需要的類別框架:
class PerformanceBenchmark:
def __init__(self, pipeline, dataset, optim_type="BERT baseline"):
self.pipeline = pipeline
建立效能基準測試
為了評估模型的效能,我們需要建立一個基準測試。這個基準測試將幫助我們瞭解模型的延遲、記憶體使用情況和準確度。讓我們繼續實作PerformanceBenchmark類別,以衡量這些指標。
內容解密:
PerformanceBenchmark類別初始化:我們定義了一個名為PerformanceBenchmark的類別,它的初始化方法接受一個pipeline、一個資料集和一個最佳化型別。這個類別將幫助我們評估模型的效能。- pipeline和資料集:pipeline代表了我們的模型,而資料集則是用於評估模型效能的測試資料。
- 最佳化型別:最佳化型別是一個字串,用於標識我們正在評估的模型的版本或組態。
透過建立這個類別,我們可以建立一個簡單而有效的基準測試,以評估我們的Transformers模型在生產環境中的效能。接下來,我們將探討如何使用這個基準測試來評估模型的延遲、記憶體使用情況和準確度。
效能基準測試的實作
在生產環境中最佳化Transformers的效率時,建立一個效能基準測試是非常重要的步驟。本章節將介紹如何實作一個簡單的效能基準測試類別PerformanceBenchmark,用於評估模型的準確性、大小和延遲。
PerformanceBenchmark類別的定義
首先,我們定義一個PerformanceBenchmark類別,用於封裝與效能相關的方法和屬性。
class PerformanceBenchmark:
def __init__(self, pipeline, dataset, optim_type):
self.pipeline = pipeline
self.dataset = dataset
self.optim_type = optim_type
def compute_accuracy(self):
# 將在後面實作
pass
def compute_size(self):
# 將在後面實作
pass
def time_pipeline(self):
# 將在後面實作
pass
def run_benchmark(self):
metrics = {}
metrics[self.optim_type] = self.compute_size()
metrics[self.optim_type].update(self.time_pipeline())
metrics[self.optim_type].update(self.compute_accuracy())
return metrics
載入CLINC150資料集
我們使用datasets函式庫載入CLINC150資料集,這是一個用於意圖分類別的資料集。
from datasets import load_dataset
clinc = load_dataset("clinc_oos", "plus")
實作compute_accuracy方法
我們使用datasets函式庫中的load_metric函式載入準確度指標,並實作compute_accuracy方法來計算模型的準確度。
from datasets import load_metric
def compute_accuracy(self):
"""覆寫PerformanceBenchmark.compute_accuracy()方法"""
preds, labels = [], []
intents = self.dataset.features["intent"]
for example in self.dataset:
pred = self.pipeline(example["text"])[0]["label"]
label = example["intent"]
preds.append(intents.str2int(pred))
labels.append(label)
accuracy_score = load_metric("accuracy")
accuracy = accuracy_score.compute(predictions=preds, references=labels)
print(f"測試集上的準確度 - {accuracy['accuracy']:.3f}")
return accuracy
PerformanceBenchmark.compute_accuracy = compute_accuracy
內容解密:
- 載入準確度指標:使用
load_metric("accuracy")載入準確度指標。 - 遍歷資料集:對資料集中的每個樣本,使用管道進行預測,並將預測結果和真實標籤轉換為整數ID。
- 計算準確度:使用
accuracy_score.compute計算預測結果和真實標籤之間的準確度。
實作compute_size方法
我們使用PyTorch的torch.save函式儲存模型的狀態字典到磁碟,並計算模型的大小。
import torch
from pathlib import Path
def compute_size(self):
"""覆寫PerformanceBenchmark.compute_size()方法"""
state_dict = self.pipeline.model.state_dict()
tmp_path = Path("model.pt")
torch.save(state_dict, tmp_path)
size_mb = Path(tmp_path).stat().st_size / (1024 * 1024)
tmp_path.unlink()
print(f"模型大小 (MB) - {size_mb:.2f}")
return {"size_mb": size_mb}
PerformanceBenchmark.compute_size = compute_size
內容解密:
- 儲存模型狀態字典:使用
torch.save將模型的狀態字典儲存到磁碟。 - 計算模型大小:使用
Path.stat().st_size取得儲存的檔案大小,並轉換為MB。 - 刪除臨時檔案:使用
tmp_path.unlink()刪除儲存的臨時檔案。
實作time_pipeline方法
我們使用Python的time模組中的perf_counter函式來測量管道的延遲。
from time import perf_counter
def time_pipeline(self):
# 將在後面實作
pass
內容解密:
- 使用perf_counter測量時間:使用
perf_counter函式測量程式碼執行的時間差。 - 計算平均延遲:透過多次測量並計算平均值來獲得管道的平均延遲。