線上上廣告系統中,精準的廣告點選率預測至關重要。本文首先探討如何利用 Redis 儲存廣告資訊、點選率資料和排序分數,構建高效的廣告定向引擎。接著,說明如何利用 pipeline 批次處理 Redis 指令,並使用 MGET 和 ZMGET 一次取得多個值,有效降低 Redis 查詢次數,提升系統效能。此外,文章也探討了引入機器學習模型、多維度資料分析和實時資料處理等未來最佳化方向。下半部分則示範如何使用 Redis 構建簡化的社交網路平台,包含使用者資訊、狀態訊息、主時間軸和關注者列表等功能的設計與實作。文章提供 Python 程式碼範例,說明如何使用 Redis 的 Hash、Set 和 ZSET 結構儲存和管理資料,並探討效能最佳化、安全性考量和未來發展方向,例如分片技術、實時訊息推播和使用者推薦等功能。

廣告點選率預測與最佳化:深入解析 Redis 在廣告定向中的應用

在現代的線上廣告系統中,廣告點選率(Click-Through Rate, CTR)的預測與最佳化是至關重要的。準確的 CTR 預測能夠幫助廣告系統選擇最合適的廣告進行展示,從而提高廣告的投放效果和收益。本篇文章將探討如何利用 Redis 實作一個廣告定向引擎,並對其進行最佳化。

Redis 在廣告定向中的角色

Redis 是一種高效能的鍵值資料函式庫,廣泛應用於需要快速讀寫操作的場景。在廣告定向中,Redis 可以用來儲存廣告的相關資訊、點選率資料以及用於排序的相關分數。

廣告定向引擎的實作

廣告資料結構設計

在廣告定向引擎中,我們需要設計合適的資料結構來儲存廣告的相關資訊。以下是一些關鍵的資料結構:

  1. 廣告基本資訊:使用 Hash 結構儲存廣告的基本資訊,如廣告 ID、廣告型別、基礎價值等。
  2. 廣告關鍵字:使用 Set 結構儲存與廣告相關的關鍵字。
  3. 廣告點選率資料:使用 ZSET 結構儲存廣告的點選率資料,用於排序和篩選。

更新廣告點選率

更新廣告點選率是廣告定向引擎中的一個重要步驟。以下是一個更新廣告點選率的範例程式碼:

def update_cpms(conn, ad_id):
    pipeline = conn.pipeline()
    
    # 取得廣告型別、基礎價值和關鍵字
    pipeline.hget('type:', ad_id)
    pipeline.zscore('ad:base_value:', ad_id)
    pipeline.smembers('terms:' + ad_id)
    type, base_value, words = pipeline.execute()
    
    # 根據廣告型別決定使用點選率還是轉換率
    which = 'clicks'
    if type == 'cpa':
        which = 'actions'
    
    # 取得廣告型別的點選率和瀏覽量
    pipeline.get('type:%s:views:' % type)
    pipeline.get('type:%s:%s' % (type, which))
    type_views, type_clicks = pipeline.execute()
    
    # 更新全域點選率字典
    AVERAGE_PER_1K[type] = (
        1000. * int(type_clicks or '1') / int(type_views or '1'))
    
    # 如果是 CPM 廣告,則不更新 eCPM
    if type == 'cpm':
        return
    
    # 取得廣告的瀏覽量和點選量
    view_key = 'views:%s' % ad_id
    click_key = '%s:%s' % (which, ad_id)
    pipeline.zscore(view_key, '')
    pipeline.zscore(click_key, '')
    ad_views, ad_clicks = pipeline.execute()
    
    # 計算廣告的 eCPM
    to_ecpm = TO_ECPM[type]
    if (ad_clicks or 0) < 1:
        ad_ecpm = conn.zscore('idx:ad:value:', ad_id)
    else:
        ad_ecpm = to_ecpm(ad_views or 1, ad_clicks or 0, base_value)
    pipeline.zadd('idx:ad:value:', ad_id, ad_ecpm)
    
    # 更新關鍵字的 eCPM 和獎勵值
    for word in words:
        pipeline.zscore(view_key, word)
        pipeline.zscore(click_key, word)
        views, clicks = pipeline.execute()[-2:]
        if (clicks or 0) < 1:
            continue
        word_ecpm = to_ecpm(views or 1, clicks or 0, base_value)
        bonus = word_ecpm - ad_ecpm
        pipeline.zadd('idx:' + word, ad_id, bonus)
    
    pipeline.execute()

#### 內容解密:
此函式首先取得廣告的型別基礎價值和相關關鍵字然後根據廣告型別決定是使用點選率還是轉換率來計算 eCPM接著它更新全域的點選率字典並根據廣告的瀏覽量和點選量計算廣告的 eCPM最後它更新每個關鍵字的 eCPM 和獎勵值並將結果寫回 Redis

### 最佳化廣告點選率預測

#### 減少 Redis 查詢次數

在上述的 `update_cpms` 函式中我們進行了多次 Redis 查詢為了提高效能我們可以將這些查詢進行批次處理以減少與 Redis 的互動次數

```python
def update_cpms_optimized(conn, ad_id):
    pipeline = conn.pipeline()
    
    # ... 省略部分程式碼
    
    # 使用 MGET 一次性取得多個值
    keys = ['type:%s:views:' % type, 'type:%s:%s' % (type, which)]
    pipeline.mget(keys)
    type_views, type_clicks = pipeline.execute()
    
    # ... 省略部分程式碼
    
    # 對每個關鍵字進行 ZScore 查詢時,使用一次性查詢多個關鍵字的分數
    word_keys = [view_key, click_key]
    for word in words:
        word_keys.extend([view_key + ':' + word, click_key + ':' + word])
    pipeline.zmget(word_keys, words)
    results = pipeline.execute()
    
    # 處理查詢結果
    for i, word in enumerate(words):
        views, clicks = results[i*2], results[i*2+1]
        # ... 省略部分程式碼
    
    pipeline.execute()

#### 內容解密:
此最佳化版本的函式透過使用 `MGET`  `ZMGET` 命令一次性取得多個值從而減少了與 Redis 的互動次數提高了效能



1. **引入機器學習模型**利用機器學習模型來預測廣告的點選率從而進一步提高廣告定向的準確性
2. **多維度資料分析**結合更多的資料維度如使用者行為廣告位等來進行更精準的廣告定向
3. **實時資料處理**實作實時資料處理和分析以更快地回應廣告效果的變化

### 參考資料

- Redis 官方檔案https://redis.io/documentation
- 廣告定向技術相關論文和研究資料

### 附錄

#### Redis 命令參考

- `HGET`:取得 Hash 中特定欄位的值
- `ZADD`: ZSET 中新增成員並設定分數
- `ZSCORE`:取得 ZSET 中成員的分數
- `SMEMBERS`:取得 Set 中的所有成員
- `MGET`:一次性取得多個鍵的值
- `ZMGET`:一次性取得 ZSET 中多個成員的分數

#### 程式碼範例

完整的 `update_cpms` 函式和最佳化版本 `update_cpms_optimized` 的程式碼範例可參考上述內容

## 廣告定向引擎架構圖
```mermaid
graph LR
    A[廣告資料儲存] -->|Redis|> B[廣告定向引擎]
    B -->|取得廣告資訊|> C[計算廣告 eCPM]
    C -->|更新廣告點選率|> D[廣告排序]
    D -->|傳回最佳廣告|> E[廣告展示]
    E -->|收集點選資料|> F[更新點選率資料]
    F -->|回饋|> B

圖表翻譯: 此圖表展示了廣告定向引擎的整體架構。首先,廣告資料儲存在 Redis 中。廣告定向引擎從 Redis 取得廣告資訊,並計算廣告的 eCPM。然後,根據計算出的 eCPM 更新廣告的點選率,並進行廣告排序。排序後,傳回最優的廣告進行展示。展示後,收集使用者的點選資料,並更新點選率資料。最後,將更新後的點選率資料回饋給廣告定向引擎,以進行下一次的廣告定向。

7.4 搜尋工作機會

在過去的某個時刻,你是否曾經像許多人一樣,花費大量時間瀏覽分類別廣告和線上求職頁面,或是透過徵才機構尋找工作機會?在篩選工作機會時,除了地點之外,第一件被檢查的事情通常是所需的經驗和技能。

本文將討論如何利用Redis的SETs和ZSETs資料結構,來找出候選人具備所有必要技能的工作機會。閱讀完本文後,你將瞭解如何利用Redis的資料模型來解決實際問題。

7.4.1 逐一處理工作機會

乍看之下,我們可能會考慮一個直接了當的解決方案:為每一個工作機會建立一個SET,將該工作所需的技能新增為SET的成員。要檢查候選人是否具備某個工作機會所需的所有技能,我們可以先將候選人的技能新增到另一個SET中,然後對工作機會的SET和候選人技能的SET執行SDIFF操作。如果結果SDIFF中沒有任何技能,代表候選人具備該工作所需的所有技能。以下是新增工作機會和檢查候選人技能是否足夠的程式碼範例:

def add_job(conn, job_id, required_skills):
    conn.sadd('job:' + job_id, *required_skills)

def is_qualified(conn, job_id, candidate_skills):
    temp = str(uuid.uuid4())
    pipeline = conn.pipeline(True)
    pipeline.sadd(temp, *candidate_skills)
    pipeline.expire(temp, 5)
    pipeline.sdiff('job:' + job_id, temp)
    return not pipeline.execute()[-1]

內容解密:

  1. add_job函式將特定工作機會所需的所有技能新增到以job_id為名的SET中。
  2. is_qualified函式檢查候選人的技能是否滿足特定工作機會的需求。它首先建立一個臨時的SET來儲存候選人的技能,並設定該SET在5秒後過期,以避免佔用過多記憶體。
  3. 執行SDIFF操作來找出該工作機會所需但候選人缺乏的技能。
  4. 如果SDIFF的結果為空,表示候選人具備所有必要技能,函式傳回True

然而,這種方法在擴充套件性上存在問題,因為我們需要逐一檢查每個工作機會,以確定候選人是否具備所需的技能。

7.4.2 以搜尋方式處理問題

如同在7.3.3節中,我們利用SETs和ZSETs來儲存廣告定向引數的可選獎勵。同樣地,我們也可以用類別似的方法處理所需的技能。

我們將問題轉換為:為每個技能建立一個SET,儲存需要該技能的所有工作機會。同時,我們建立一個ZSET來儲存每個工作機會所需的技能總數。以下是建立索引的程式碼範例:

def index_job(conn, job_id, skills):
    pipeline = conn.pipeline(True)
    for skill in skills:
        pipeline.sadd('idx:skill:' + skill, job_id)
    pipeline.zadd('idx:jobs:req', {job_id: len(set(skills))})
    pipeline.execute()

內容解密:

  1. index_job函式為每個技能建立一個SET,並將相關的工作機會ID新增到該SET中。
  2. 同時,將每個工作機會所需的技能總數新增到名為idx:jobs:req的ZSET中,其中job_id為成員,對應的分數是所需的技能數量。

要搜尋候選人具備所有必要技能的工作機會,我們執行以下步驟:

  1. 對候選人具備的技能對應的SET執行ZUNIONSTORE操作,以計算每個工作機會的總分(即候選人具備的技能數量)。
  2. 對候選人的技能ZSET和所需技能ZSET執行ZINTERSTORE操作,分別賦予權重-1和1。最終ZSET中分數為0的工作機會ID,即代表候選人具備該工作所需的所有技能。

以下是執行搜尋操作的程式碼範例:

def find_jobs(conn, candidate_skills):
    # 首先對候選人具備的技能對應的SET執行ZUNIONSTORE
    pipeline = conn.pipeline(True)
    pipeline.zunionstore('candidate:skills', dict(('idx:skill:' + skill, 1) for skill in candidate_skills), aggregate='sum')
    
    # 然後對候選人的技能ZSET和所需技能ZSET執行ZINTERSTORE
    pipeline.zinterstore('matches', {'candidate:skills': 1, 'idx:jobs:req': -1}, aggregate='sum')
    
    # 取得分數為0的工作機會ID,即候選人具備所有必要技能的工作機會
    pipeline.zrangebyscore('matches', 0, 0)
    return pipeline.execute()[-1]

內容解密:

  1. find_jobs函式首先計算候選人具備的技能對應的工作機會ID的總分。
  2. 然後執行ZINTERSTORE操作,以找出候選人具備所有必要技能的工作機會。
  3. 最後傳回分數為0的工作機會ID列表,即候選人完全具備所需技能的工作機會。

使用Redis構建簡單社交網路

隨著社群媒體的興起,開發一個類別似Twitter的平台變得越來越重要。在本章中,我們將探討如何使用Redis構建一個簡單的社交網路,涵蓋使用者和狀態物件、主時間軸、關注者列表、發布狀態訊息等功能。

使用者和狀態訊息

在任何社交網路中,使用者和狀態訊息都是最基本的組成部分。使用者物件包含了使用者的基本資訊,如使用者名稱、關注者數量、發布的狀態訊息數量等。狀態訊息則是使用者表達自己和與他人互動的主要方式。

使用者資訊

在我們的Twitter類別似平台中,我們將使用者資訊儲存在Redis的HASH資料結構中。每個使用者物件包含以下資訊:

  • 使用者名稱
  • 關注者數量
  • 正在關注的使用者數量
  • 發布的狀態訊息數量
  • 註冊時間戳

建立新使用者時,我們初始化這些欄位,並將關注者、正在關注的使用者和發布的狀態訊息數量設為0。

def create_user(conn, login, name):
    # 生成使用者ID
    user_id = conn.incr('user:id')

    # 將使用者資訊儲存在HASH中
    conn.hmset(f'user:{user_id}', {
        'login': login,
        'id': user_id,
        'name': name,
        'followers': 0,
        'following': 0,
        'posts': 0,
        'signup': time.time()
    })

    return user_id

內容解密:

  1. conn.incr('user:id'):為新使用者生成一個唯一的ID。
  2. conn.hmset(f'user:{user_id}', {...}):將使用者的基本資訊儲存在Redis的HASH中。
  3. 初始化使用者的關注者數量、正在關注的使用者數量和發布的狀態訊息數量為0。

狀態訊息

狀態訊息是使用者在平台上發布的內容。每個狀態訊息都包含訊息內容、發布時間、使用者ID等資訊。我們同樣使用HASH來儲存狀態訊息。

def create_status(conn, uid, message):
    # 生成狀態訊息ID
    status_id = conn.incr('status:id')

    # 儲存狀態訊息
    conn.hmset(f'status:{status_id}', {
        'id': status_id,
        'uid': uid,
        'message': message,
        'posted': time.time()
    })

    # 更新使用者的狀態訊息數量
    conn.hincrby(f'user:{uid}', 'posts', 1)

    return status_id

內容解密:

  1. conn.incr('status:id'):為新狀態訊息生成一個唯一的ID。
  2. conn.hmset(f'status:{status_id}', {...}):將狀態訊息的詳細資訊儲存在Redis的HASH中。
  3. conn.hincrby(f'user:{uid}', 'posts', 1):更新使用者的狀態訊息數量。

主時間軸和關注者列表

主時間軸顯示了使用者及其關注者的最新狀態訊息。關注者列表則包含了關注某個使用者的所有使用者。

發布狀態訊息到主時間軸

當使用者發布新的狀態訊息時,我們需要將該訊息推播到其關注者的主時間軸中。

def post_status(conn, uid, message):
    status_id = create_status(conn, uid, message)
    # 取得使用者的關注者列表
    followers = conn.smembers(f'followers:{uid}')

    # 將狀態訊息ID推播到每個關注者的主時間軸
    for follower in followers:
        conn.zadd(f'timeline:{follower}', {status_id: time.time()})

    return status_id

內容解密:

  1. create_status(conn, uid, message):建立新的狀態訊息並傳回其ID。
  2. conn.smembers(f'followers:{uid}'):取得使用者的關注者列表。
  3. conn.zadd(f'timeline:{follower}', {status_id: time.time()}):將狀態訊息ID新增到每個關注者的主時間軸中,按時間排序。

關注和取消關注

使用者可以關注其他使用者,也可以取消關注。

關注某使用者

當使用者A關注使用者B時,我們需要更新使用者A的正在關注列表和使用者B的關注者列表。

def follow_user(conn, uid, follow_uid):
    # 將follow_uid新增到uid的正在關注列表
    conn.sadd(f'following:{uid}', follow_uid)
    # 將uid新增到follow_uid的關注者列表
    conn.sadd(f'followers:{follow_uid}', uid)

    # 更新使用者的關注數量
    conn.hincrby(f'user:{uid}', 'following', 1)
    conn.hincrby(f'user:{follow_uid}', 'followers', 1)

內容解密:

  1. conn.sadd(f'following:{uid}', follow_uid):將被關注的使用者ID新增到使用者的正在關注列表中。
  2. conn.sadd(f'followers:{follow_uid}', uid):將使用者ID新增到被關注使用者的關注者列表中。
  3. 更新使用者的關注和被關注數量。
  • 擴充套件到更大規模:透過分片技術將資料分佈到多個Redis例項上,以支援更大規模的使用者群。
  • 最佳化效能:進一步最佳化資料結構和查詢邏輯,以提高系統的回應速度和吞吐量。
  • 增加新功能:如實時訊息推播、使用者推薦等,以增強使用者經驗和平台的粘性。

效能最佳化分析

  1. 使用Pipeline技術:批次執行Redis命令,減少網路延遲。
  2. 合理設計資料結構:根據存取模式最佳化資料儲存結構,提高查詢效率。
  3. 利用Redis的過期機制:自動清理過期資料,減少記憶體佔用。

安全性考量

  1. 資料加密:對敏感資料進行加密儲存,保護使用者隱私。
  2. 存取控制:實施嚴格的存取控制策略,防止未授權存取。
  3. 備份和還原:定期備份資料,並確保能夠快速還原,以防資料丟失。

本章介紹瞭如何使用Redis構建一個簡單的社交網路,包括使用者和狀態訊息的管理、關注者列表和主時間軸的實作。這些技術和方法為構建更複雜的社交網路平台提供了基礎。接下來的章節將進一步探討如何擴充套件和最佳化這些功能,以支援更大規模和更複雜的應用場景。