在社群網路平台中,狀態訊息的發布和刪除是至關重要的功能。本文將根據 Redis 設計實作一個高效且可擴充套件的狀態訊息管理方案。此方案利用 Redis 的資料結構特性,最佳化了訊息發布和刪除的流程,並能有效處理大量使用者和高頻操作。我們將詳細介紹如何利用 Redis 的 Hash 和 Sorted Set 結構儲存和管理狀態訊息,並提供 Python 程式碼範例說明具體的實作細節。同時,我們也將探討如何透過非同步處理和鎖機制來提升系統的並發處理能力和資料一致性,並簡述建構串流 API 伺服器的步驟,包括使用 Python 建立多執行緒 HTTP 伺服器、處理 GET 和 POST 請求,以及解析客戶端識別碼和處理篩選條件等。

實作社群網路中的狀態訊息發布與刪除功能

在開發類別似Twitter的社群網路時,狀態訊息的發布與刪除是兩個核心功能。本文將探討如何利用Redis實作這兩個功能,並確保系統的高效能與擴充套件性。

狀態訊息發布流程

狀態訊息的發布涉及將訊息推播到使用者的個人時間軸以及其所有關注者的首頁時間軸。以下是具體實作步驟:

  1. 發布狀態訊息:使用者發布一則狀態訊息,該訊息會被儲存在Redis中的一個HASH結構中,鍵名格式為status:<status_id>
  2. 推播到關注者首頁時間軸:利用Redis的ZSET結構,將狀態訊息ID新增到所有關注者首頁時間軸中。鍵名格式為home:<follower_id>

程式碼實作:發布狀態訊息

def post_status(conn, uid, message):
    # 生成狀態訊息ID
    status_id = str(conn.incr('status:id'))
    
    # 儲存狀態訊息
    pipeline = conn.pipeline(True)
    pipeline.hmset('status:' + status_id, {
        'uid': uid,
        'message': message,
        'posted': time.time()
    })
    pipeline.zadd('profile:' + uid, {status_id: time.time()})
    pipeline.execute()
    
    # 推播狀態訊息到關注者首頁時間軸
    syndicate_status(conn, uid, status_id)

def syndicate_status(conn, uid, status_id):
    # 取得使用者的關注者
    followers = conn.zrangebyscore('followers:' + uid, 0, 'inf', start=0, num=1000, withscores=True)
    
    pipeline = conn.pipeline(False)
    for follower, _ in followers:
        pipeline.zadd('home:' + follower, {status_id: time.time()})
        pipeline.zremrangebyrank('home:' + follower, 0, -1000-1)  # 保持首頁時間軸大小為1000
    pipeline.execute()
    
    # 如果關注者數量超過1000,非同步處理剩餘的關注者
    if len(followers) >= 1000:
        execute_later(conn, 'default', 'syndicate_status', [conn, uid, status_id, followers[-1][1]])

內容解密:

  1. post_status函式負責發布狀態訊息,首先生成一個唯一的狀態訊息ID,然後將訊息儲存在Redis的HASH結構中,並將狀態訊息ID新增到使用者的個人時間軸中。
  2. syndicate_status函式負責將狀態訊息推播到使用者的關注者首頁時間軸中。它首先取得使用者的關注者列表,然後將狀態訊息ID新增到每個關注者的首頁時間軸中。如果關注者數量超過1000,則非同步處理剩餘的關注者。

狀態訊息刪除流程

當使用者刪除一則狀態訊息時,需要從Redis中移除該訊息,並更新相關的時間軸。

程式碼實作:刪除狀態訊息

def delete_status(conn, uid, status_id):
    key = 'status:' + status_id
    lock = acquire_lock_with_timeout(conn, key, 1)
    if not lock:
        return None
    
    # 驗證狀態訊息所有者
    if conn.hget(key, 'uid') != str(uid):
        return None
    
    pipeline = conn.pipeline(True)
    pipeline.delete(key)
    pipeline.zrem('profile:' + uid, status_id)
    pipeline.zrem('home:' + uid, status_id)
    pipeline.hincrby('user:' + uid, 'posts', -1)
    pipeline.execute()
    
    release_lock(conn, key, lock)
    return True

內容解密:

  1. delete_status函式首先嘗試取得狀態訊息的鎖,以防止並發刪除操作。
  2. 驗證狀態訊息的所有者是否與當前使用者匹配。
  3. 刪除狀態訊息HASH,並從使用者的個人時間軸和首頁時間軸中移除該訊息ID。
  4. 更新使用者的狀態訊息數量。
  5. 釋放鎖。

額外功能

除了基本的狀態訊息發布與刪除功能外,還可以考慮實作以下額外功能:

  • 私人使用者與關注請求
  • 訊息收藏與轉發
  • 直接訊息與回覆功能
  • @提及與#標籤功能
  • 垃圾資訊與濫用報告機制

這些功能將進一步豐富社群網路的功能性與使用者經驗。

8.5 串流API

隨著我們的社交網路開發持續進行,我們會想要了解系統中正在發生的事情——例如,每小時有多少篇文章被釋出、哪些話題最受關注,或者哪些使用者被頻繁提及。有三種方法可以實作這一點:第一種是透過API呼叫來收集這些資訊;第二種是在執行各種操作的函式內部記錄這些資訊;第三種,也是我們將在本文中探討的方法,是讓我們的函式廣播簡單的事件,這些事件被事件監聽器接收和處理,以分析資料。

在本文中,我將描述如何為我們的社交網路建立一個類別似於Twitter提供的串流API的後端。

與我們已經建立的系統的其他部分不同,串流API是一組完全不同的功能。我們之前建立的支援典型操作的函式旨在快速執行和完成。另一方面,串流API請求旨在長時間傳回資料。

大多數現代社交網路都提供透過某種API從系統中收集資訊的功能。Twitter在過去幾年中展示的一個優勢是,透過向第三方提供即時事件,這些第三方可以開發出獨特且有趣的資料分析,這些分析可能是Twitter本身沒有時間或興趣開發的。

建立串流API的第一步是瞭解我們將處理和生成的資料型別。

8.5.1 要串流的資料

當人們在我們的社交網路中執行各種操作時,這些操作會在定義API的各種函式中被觀察到。特別是,我們花了很多時間建立了關注/取消關注使用者和釋出/刪除訊息的功能。如果我們建立了社交網路的其他部分,我們還會發現由於使用者行為而發生的各種其他事件。串流API旨在隨著時間的推移產生一系列這些事件,以便讓客戶端或其他服務瞭解整個網路中正在發生的事情的一個子集。

在建立串流API的過程中,必須做出各種決定,這些決定通常可以歸結為三個主要問題: ■ 哪些事件應該被公開? ■ 應該存在哪些存取限制(如果有的話)? ■ 應該提供哪些型別的篩選選項?

目前,我不會回答關於存取限制的第二個問題。那是我們在建立社交網路時根據隱私和系統資源的預期需要回答的問題。但我會回答其他兩個問題。

因為我們專注於釋出/刪除訊息和關注/取消關注使用者,所以我們至少應該提供其中一些事件。為了保持簡單,我們目前只會產生訊息釋出和刪除事件。但是,根據我們建立和傳遞的結構,新增功能以支援關注/取消關注事件或其他我們新增的操作的事件應該很容易。

我們將提供的篩選選項型別將與Twitter在公共方面提供的API功能和特性有很大的重疊。特別是,我們將提供根據跟隨(使用者)、追蹤(關鍵字)和位置篩選訊息的功能,以及隨機選取的子集訊息,類別似於Twitter的firehose和sample streams。

現在我們知道了我們將要存取的資料,讓我們開始看看如何提供這些資料。

8.5.2 提供資料

在前面的章節中,當我們展示了對Redis進行呼叫的函式時,我們假設已經有一個現有的web伺服器會在適當的時候呼叫這些函式。在串流API的情況下,將資料串流到客戶端的細節可能比將這些函式插入現有的web服務堆積疊中更為複雜。特別是,大多數web伺服器都在假設下運作,即我們將一次性傳回對請求的整個回應,但對於串流API來說,情況絕對不是這樣。

串流API的回應是按照產生的順序接收狀態訊息,並進行匹配。儘管像WebSockets和SPDY這樣的現代技術可以提供增量資料生成,甚至是伺服器端推播訊息,但相關的協定仍在最終確定中,而且在許多程式語言中的客戶端支援並不完整。但是,有一種使用HTTP伺服器產生增量內容的方法——使用分塊傳輸編碼傳送資料。

在本文中,我們將建立一個簡單的web伺服器,支援串流到可以處理分塊HTTP回應的客戶端。這是為了支援我們後面的章節,這些章節將實際實作對串流訊息資料的篩選選項。

要建立這個串流HTTP web伺服器,我們必須更深入地瞭解Python程式語言。過去,我們試圖將所有內容保持在標準函式的範圍內,甚至在第6章中開始使用生成器(那是包含yield的程式碼)。但是在這裡,我們必須使用Python類別。這主要是因為我們不想從頭開始建立一個完整的web伺服器,而Python已經包含了可以混合使用以處理web服務的所有困難部分的伺服器。如果你曾經在其他語言中使用過類別,那麼你將對Python感到熟悉,因為Python中的類別是相似的。它們旨在封裝資料,並使用方法來操作資料。在我們的例子中,我們想要使用的大部分功能已經在現有的程式函式庫中可用;我們只需要將它們組合在一起。

一個串流HTTP伺服器

在Python中,我們有一系列的通訊端伺服器程式函式庫,可以混合在一起提供不同型別的功能。首先,我們將建立一個使用執行緒的伺服器。

from http.server import BaseHTTPRequestHandler, HTTPServer
import threading

class RequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # 處理GET請求
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        
        # 模擬串流資料
        for i in range(10):
            self.wfile.write(f"資料 {i}\n".encode())
            self.wfile.flush()  # 強制將資料傳送給客戶端

def run_server():
    server_address = ('', 8000)
    httpd = HTTPServer(server_address, RequestHandler)
    print('啟動串流HTTP伺服器...')
    httpd.serve_forever()

# 在執行緒中執行伺服器
server_thread = threading.Thread(target=run_server)
server_thread.daemon = True
server_thread.start()

# 保持主執行緒執行
while True:
    pass

內容解密:

這段程式碼建立了一個簡單的HTTP伺服器,支援串流資料到客戶端。RequestHandler類別處理客戶端的GET請求,並使用do_GET方法模擬串流資料到客戶端。run_server函式啟動HTTP伺服器,並在執行緒中執行,以避免阻塞主執行緒。

此範例展示瞭如何使用Python的http.server模組建立一個簡單的串流HTTP伺服器,並使用執行緒來執行伺服器。

  graph LR
    A[客戶端] -->|GET請求|> B[HTTP伺服器]
    B -->|串流資料|> A
    B -->|使用執行緒|> C[執行緒執行伺服器]

圖表翻譯: 此圖表展示了客戶端與HTTP伺服器之間的互動流程。客戶端傳送GET請求給HTTP伺服器,伺服器使用執行緒來處理請求並串流資料回客戶端。

建構簡易社群網路串流API伺服器

簡介

本章節將介紹如何建構一個簡易的社群網路串流API伺服器。該伺服器將支援基本的HTTP請求處理,並實作串流資料的傳輸。

架構設計

為了實作串流API伺服器,我們需要設計一個能夠處理多個平行請求的伺服器。為此,我們將使用Python的SocketServer.ThreadingMixInBaseHTTPServer.HTTPServer來建立一個多執行緒的HTTP伺服器。

class StreamingAPIServer(
    SocketServer.ThreadingMixIn,
    BaseHTTPServer.HTTPServer):
    daemon_threads = True

class StreamingAPIRequestHandler(
    BaseHTTPServer.BaseHTTPRequestHandler):
    def do_GET(self):
        parse_identifier(self)
        if self.path != '/statuses/sample.json':
            return self.send_error(404)
        process_filters(self)

    def do_POST(self):
        parse_identifier(self)
        if self.path != '/statuses/filter.json':
            return self.send_error(404)
        process_filters(self)

內容解密:

  1. StreamingAPIServer類別繼承自SocketServer.ThreadingMixInBaseHTTPServer.HTTPServer,使其能夠為每個請求建立新的執行緒進行處理。
  2. daemon_threads = True確保當主執行緒終止時,所有客戶端請求執行緒也會被終止。
  3. StreamingAPIRequestHandler類別定義瞭如何處理HTTP請求,特別是GET和POST請求。
  4. do_GETdo_POST方法分別處理GET和POST請求,並呼叫parse_identifier函式來解析客戶端識別碼。
  5. 如果請求路徑不符合預期,將傳回404錯誤;否則,將呼叫process_filters函式進行進一步處理。

啟動伺服器

為了啟動伺服器,我們需要例項化StreamingAPIServer並呼叫其serve_forever方法。

if __name__ == '__main__':
    server = StreamingAPIServer(
        ('localhost', 8080), StreamingAPIRequestHandler)
    print 'Starting server, use <Ctrl-C> to stop'
    server.serve_forever()

內容解密:

  1. 當該模組被直接執行時,將建立一個StreamingAPIServer例項,監聽本地8080埠,並使用StreamingAPIRequestHandler處理請求。
  2. serve_forever方法啟動伺服器,使其持續執行直到被手動停止。

解析客戶端識別碼

parse_identifier函式負責從請求查詢引數中提取客戶端識別碼。

def parse_identifier(handler):
    handler.identifier = None
    handler.query = {}
    if '?' in handler.path:
        handler.path, _, query = handler.path.partition('?')
        handler.query = urlparse.parse_qs(query)
    identifier = handler.query.get('identifier') or [None]
    handler.identifier = identifier[0]

內容解密:

  1. 初始化identifierquery屬性。
  2. 如果請求路徑中包含查詢引數,則解析這些引數並儲存在handler.query中。
  3. 從查詢引數中提取identifier值。

處理HTTP串流

process_filters函式負責驗證請求並串流資料給客戶端。

FILTERS = ('track', 'filter', 'location')

def process_filters(handler):
    id = handler.identifier
    if not id:
        return handler.send_error(401, "identifier missing")

內容解密:

  1. 定義了支援的篩選器型別。

  2. 檢查客戶端識別碼是否存在;如果不存在,則傳回401錯誤。

  3. 增加身份驗證機制,確保只有授權客戶端能夠存取API。

  4. 擴充套件篩選器功能,支援更多型別的篩選條件。

  5. 最佳化伺服器效能,提高平行處理能力。

技術選型分析

  1. 使用Python的http.serversocketserver模組簡化了HTTP伺服器的開發。
  2. 多執行緒設計提高了伺服器的平行處理能力。
  3. 使用查詢引數傳遞客戶端識別碼簡化了客戶端的實作。

安全性考量

  1. 目前的實作沒有任何身份驗證機制,需要增加相關功能以提高安全性。
  2. 需要對輸入的查詢引數進行驗證,防止潛在的安全漏洞。