在開發類別似推特的社交網路應用時,高效管理使用者資料和狀態訊息至關重要。本文以 Python 和 Redis 為例,示範如何實作使用者建立、狀態訊息釋出、首頁時間軸、以及使用者追蹤等核心功能。程式碼中使用 Redis 的鎖定機制確保使用者名稱唯一性,並利用 pipeline 批次操作提升效能。為處理大量使用者關注的狀況,狀態訊息推播採用分批策略,優先更新前 1000 位關注者的首頁時間軸,確保快速回應。文章也探討了資料儲存結構設計,使用 HASH 儲存使用者和狀態訊息資料,ZSET 儲存時間軸,並以時間戳記作為排序依據。最後,文章提出了未來發展方向,例如加入使用者互動功能、搜尋功能等,以及效能最佳化策略,例如使用非同步任務佇列、Lua 指令碼等。
使用者與狀態管理實作
在建立一個類別似推特(Twitter)的社交網路時,使用者資料與狀態訊息的管理是核心功能之一。本章節將探討如何設計與實作這些功能。
8.1 使用者與狀態訊息的建立
8.1.1 建立使用者
建立使用者的過程中,我們需要確保使用者名稱(login name)的唯一性。以下是一個範例程式碼:
def create_user(conn, login, name):
llogin = login.lower()
lock = acquire_lock_with_timeout(conn, 'user:' + llogin, 1)
if not lock:
return None
if conn.hget('users:', llogin):
return None
id = conn.incr('user:id:')
pipeline = conn.pipeline(True)
pipeline.hset('users:', llogin, id)
pipeline.hmset('user:%s' % id, {
'login': login,
'id': id,
'name': name,
'followers': 0,
'following': 0,
'posts': 0,
'signup': time.time(),
})
pipeline.execute()
release_lock(conn, 'user:' + llogin, lock)
return id
內容解密:
- 鎖定機制:為了避免同時建立相同的使用者名稱,我們使用鎖定機制來確保唯一性。
- 使用者名稱轉換:將使用者名稱轉換為小寫,以避免大小寫不同導致的重複註冊。
- 檢查使用者名稱:在鎖定後再次檢查使用者名稱是否已被註冊。
- 生成唯一ID:使用 Redis 的
incr
命令生成一個唯一的使用者ID。 - 儲存使用者資料:將使用者的基本資料儲存到 Redis 的 HASH 中。
8.1.2 狀態訊息
狀態訊息是用來儲存使用者釋出的內容。以下是一個建立狀態訊息的範例:
def create_status(conn, uid, message, **data):
pipeline = conn.pipeline(True)
pipeline.hget('user:%s' % uid, 'login')
pipeline.incr('status:id:')
login, id = pipeline.execute()
if not login:
return None
data.update({
'message': message,
'posted': time.time(),
'id': id,
'uid': uid,
'login': login,
})
pipeline.hmset('status:%s' % id, data)
pipeline.hincrby('user:%s' % uid, 'posts')
pipeline.execute()
return id
內容解密:
- 取得使用者登入名稱:根據使用者ID取得登入名稱。
- 生成狀態訊息ID:使用 Redis 的
incr
命令生成一個唯一的狀態訊息ID。 - 儲存狀態訊息:將狀態訊息的詳細資料儲存到 Redis 的 HASH 中。
- 更新使用者釋出數量:增加使用者的釋出數量。
8.2 首頁時間軸
首頁時間軸(Home Timeline)是使用者登入後看到的頁面,包含了使用者自己和他們關注的人的狀態訊息。
取得狀態訊息
def get_status_messages(conn, uid, timeline='home:', page=1, count=30):
statuses = conn.zrevrange('%s%s' % (timeline, uid), (page-1)*count, page*count-1)
pipeline = conn.pipeline(True)
for id in statuses:
pipeline.hgetall('status:%s' % id)
return filter(None, pipeline.execute())
內容解密:
- 取得狀態訊息ID列表:根據時間軸和使用者ID取得狀態訊息的ID列表。
- 取得狀態訊息詳細資料:使用 pipeline 批次取得狀態訊息的詳細資料。
- 過濾遺失的訊息:移除可能已被刪除的狀態訊息。
資料儲存結構
- 使用者資料:儲存在 HASH 中,鍵值為
user:<id>
。 - 狀態訊息:儲存在 HASH 中,鍵值為
status:<id>
。 - 首頁時間軸:儲存在 ZSET 中,鍵值為
home:<uid>
,分數為狀態訊息的釋出時間。
在未來的開發中,我們可以考慮加入更多的功能,例如:
- 使用者關注與粉絲管理:實作關注和取消關注的功能,並更新使用者的關注和粉絲列表。
- 狀態訊息的互動功能:加入按讚、轉發和回覆等功能,以增加使用者之間的互動。
- 搜尋功能:實作搜尋使用者和狀態訊息的功能,以提升使用者的體驗。
透過這些功能的擴充,我們可以建立一個更為完善和豐富的社交網路應用。
使用者追蹤功能實作:關注與取消關注
8.3 追蹤者與追蹤物件列表管理
在類別似Twitter的社交平台中,使用者可以透過追蹤他人來取得他們的最新動態。本文將討論如何管理使用者之間的追蹤關係,包括維護追蹤者列表和追蹤物件列表,並探討在開始或停止追蹤某人時如何更新使用者的首頁時間軸。
資料結構設計
為了實作追蹤功能,我們使用有序集合(ZSET)來儲存使用者的追蹤者和追蹤物件。ZSET中的成員是使用者ID,分數是追蹤開始的時間戳。
graph LR A[使用者A] -->|追蹤|> B[使用者B] B -->|被追蹤者|> A
圖表翻譯:
此圖示展示了使用者之間的追蹤關係。使用者A追蹤了使用者B,同時使用者B是被使用者A追蹤的物件。
程式碼實作
以下是實作追蹤功能的程式碼範例:
HOME_TIMELINE_SIZE = 1000
def follow_user(conn, uid, other_uid):
# 定義追蹤者和被追蹤者的ZSET鍵名
fkey1 = 'following:%s' % uid
fkey2 = 'followers:%s' % other_uid
# 檢查是否已經追蹤
if conn.zscore(fkey1, other_uid):
return None
now = time.time()
pipeline = conn.pipeline(True)
# 將使用者ID加入追蹤者和被追蹤者的ZSET中
pipeline.zadd(fkey1, {other_uid: now})
pipeline.zadd(fkey2, {uid: now})
# 取得追蹤者和被追蹤者的數量
pipeline.zcard(fkey1)
pipeline.zcard(fkey2)
# 取得被追蹤者的最新狀態訊息
pipeline.zrevrange('profile:%s' % other_uid, 0, HOME_TIMELINE_SIZE-1, withscores=True)
following, followers, status_and_score = pipeline.execute()[-3:]
# 更新使用者的追蹤者和被追蹤者數量
pipeline.hset('user:%s' % uid, 'following', following)
pipeline.hset('user:%s' % other_uid, 'followers', followers)
# 將被追蹤者的最新狀態訊息加入使用者的首頁時間軸
if status_and_score:
pipeline.zadd('home:%s' % uid, **dict(status_and_score))
# 保留首頁時間軸中最新的1000條狀態訊息
pipeline.zremrangebyrank('home:%s' % uid, 0, -HOME_TIMELINE_SIZE-1)
pipeline.execute()
return True
#### 內容解密:
1. **`follow_user`函式**:實作了使用者追蹤功能的邏輯,包括更新追蹤者和被追蹤者的ZSET,以及使用者的首頁時間軸。
2. **`HOME_TIMELINE_SIZE`變數**:定義了首頁時間軸中保留的狀態訊息數量。
3. **`zadd`方法**:將使用者ID和時間戳加入ZSET中,實作追蹤關係的建立。
4. **`zcard`方法**:取得ZSET中的成員數量,用於更新使用者的追蹤者和被追蹤者數量。
5. **`zrevrange`方法**:取得被追蹤者的最新狀態訊息,用於更新使用者的首頁時間軸。
6. **`hset`方法**:更新使用者的追蹤者和被追蹤者數量。
7. **`zremrangebyrank`方法**:保留首頁時間軸中最新的狀態訊息,刪除舊的狀態訊息。
#### 取消追蹤功能
取消追蹤功能的實作與追蹤功能相反,主要包括從ZSET中移除使用者ID,以及更新使用者的首頁時間軸。
```python
def unfollow_user(conn, uid, other_uid):
fkey1 = 'following:%s' % uid
fkey2 = 'followers:%s' % other_uid
# 檢查是否正在追蹤
if not conn.zscore(fkey1, other_uid):
return None
pipeline = conn.pipeline(True)
# 從ZSET中移除使用者ID
pipeline.zrem(fkey1, other_uid)
pipeline.zrem(fkey2, uid)
# 取得追蹤者和被追蹤者的數量
pipeline.zcard(fkey1)
pipeline.zcard(fkey2)
# 取得被取消追蹤者的最新狀態訊息
pipeline.zrevrange('profile:%s' % other_uid, 0, HOME_TIMELINE_SIZE-1)
following, followers, statuses = pipeline.execute()[-3:]
# 更新使用者的追蹤者和被追蹤者數量
pipeline.hset('user:%s' % uid, 'following', following)
pipeline.hset('user:%s' % other_uid, 'followers', followers)
# 從使用者的首頁時間軸中移除被取消追蹤者的狀態訊息
# 注意:這裡沒有實作,因為需要遍歷所有狀態訊息並刪除相關的狀態訊息
pipeline.execute()
return True
#### 內容解密:
1. **`unfollow_user`函式**:實作了取消追蹤功能的邏輯,包括更新追蹤者和被追蹤者的ZSET,以及使用者的首頁時間軸。
2. **`zrem`方法**:從ZSET中移除使用者ID,實作取消追蹤關係。
3. **`zcard`方法**:取得ZSET中的成員數量,用於更新使用者的追蹤者和被追蹤者數量。
未來可以進一步最佳化追蹤功能的實作,例如:
1. **提高效能**:最佳化資料函式庫查詢和更新操作,提高系統的整體效能。
2. **增加功能**:實作更多的社交功能,例如私信、通知等。
3. **改善使用者經驗**:提供更好的使用者介面和互動體驗,例如即時更新追蹤者和被追蹤者的數量。
透過不斷地改進和最佳化,我們可以建立一個更加完善和高效的社交平台。
## 發布或刪除狀態更新
在社群網路服務中,發布狀態更新是一項基本操作。人們發布狀態更新來分享想法,而其他人閱讀這些更新是因為他們對這些想法感興趣。上一節展示瞭如何建立狀態訊息,但沒有說明如何將狀態訊息放入個人時間軸或使用者關注者的首頁時間軸中。
### 狀態更新的處理流程
當使用者發布新的狀態更新時,狀態訊息需要被推播到其關注者的首頁時間軸中。處理這一操作的具體方法取決於發布使用者的關注者數量。如果使用者的關注者數量相對較少(例如,少於1,000),我們可以立即更新他們的首頁時間軸。但對於擁有大量關注者的使用者(例如,100萬甚至2,500萬),直接執行這些插入操作所需的時間可能超過使用者可以接受的等待時間。
為了確保發布操作的快速回應,我們將採取兩步走的策略。首先,我們將狀態ID新增到前1,000位關注者的首頁時間軸中,作為發布狀態訊息呼叫的一部分。根據像Twitter這樣的網站的統計資料,這應該能夠覆寫至少99.9%的發布使用者(Twitter的分析資料表明,大約有100,000至250,000名使用者的關注者超過1,000,這大約佔活躍使用者總數的0.1%)。這意味著只有頂部的0.1%的使用者需要額外的處理步驟。
#### 發布狀態訊息的程式碼實作
```python
def post_status(conn, uid, message, **data):
id = create_status(conn, uid, message, **data)
if not id:
return None
posted = conn.hget('status:%s' % id, 'posted')
if not posted:
return None
post = {str(id): float(posted)}
conn.zadd('profile:%s' % uid, **post)
syndicate_status(conn, uid, post)
return id
將狀態訊息推播給關注者的程式碼實作
POSTS_PER_PASS = 1000
def syndicate_status(conn, uid, post, start=0):
# 程式碼實作細節...
#### 內容解密:
post_status
函式:該函式負責發布新的狀態訊息。它首先呼叫create_status
函式來建立狀態訊息。如果建立成功,它將狀態ID新增到使用者的個人時間軸中,並呼叫syndicate_status
函式將狀態訊息推播給使用者的關注者。create_status
函式:雖然未在此列出,但該函式負責建立狀態訊息並傳回狀態ID。syndicate_status
函式:該函式負責將狀態訊息推播給使用者的關注者。它採用分批處理的方式,每次處理POSTS_PER_PASS
(1000)個關注者,以避免一次性處理大量資料導致的效能問題。狀態訊息的分發:對於擁有大量關注者的使用者,狀態訊息的分發被設計為一個非同步任務,以避免阻塞主執行緒,影響使用者經驗。
狀態更新的最佳化
為了最佳化狀態更新的處理,可以考慮以下幾點:
非同步處理:對於擁有大量關注者的使用者,可以使用非同步任務佇列來處理狀態訊息的分發,以避免阻塞主執行緒。
分批處理:將狀態訊息的分發分成多個批次進行,每次處理一定數量的關注者,以避免一次性處理大量資料導致的效能問題。
使用高效的資料結構:使用Redis的有序集合(ZSET)來儲存時間軸資料,可以高效地進行插入和刪除操作。
練習題:自定義關注者列表
除了基本的關注者列表外,Twitter還支援建立自定義的關注者列表。這些列表可以包含特定的使用者,並且可以檢視這些使用者的時間軸。請嘗試更新follow_user
和unfollow_user
函式,以支援自定義關注者列表的建立和更新。同時,請嘗試建立函式來取得自定義列表的時間軸。
提示
- 可以將自定義列表視為一種特殊的關注者列表。
- 可以使用Redis的HASH資料結構來儲存自定義列表的後設資料。
- 可以使用Redis的有序集合(ZSET)來儲存自定義列表中的使用者時間軸。
進一步最佳化
- 使用Pipeline操作:在進行多個Redis操作時,可以使用Pipeline操作來減少網路延遲。
- 使用Lua指令碼:對於複雜的操作,可以使用Lua指令碼來減少網路延遲和提高效能。
- 監控和調優:定期監控系統的效能,並根據需要進行調優,以確保系統的穩定性和可擴充套件性。
圖表翻譯:
此圖示描述了狀態更新的處理流程,包括建立狀態訊息、將狀態訊息新增到使用者的個人時間軸、以及將狀態訊息推播給使用者的關注者。
graph LR A[建立狀態訊息] --> B[新增到個人時間軸] B --> C[推播給關注者] C --> D[非同步處理大量關注者]
圖表翻譯: 此圖表呈現了狀態更新的處理流程,從建立狀態訊息開始,到將狀態訊息推播給使用者的關注者為止。對於擁有大量關注者的使用者,狀態訊息的分發被設計為非同步任務,以避免影響使用者經驗。