在現今社群網路應用中,即時訊息傳遞至關重要。本文將探討如何利用 Redis 的發布/訂閱機制,結合 Python,建構一個具備串流 API 的簡易社群網路,實作訊息的即時過濾與傳遞。我們將深入程式碼細節,解析串流 HTTP 伺服器的運作方式,以及如何利用 Redis 有效地發布和刪除狀態訊息。同時,我們也將探討如何實作不同型別的訊息過濾器,例如根據取樣率的隨機過濾、根據關鍵字的追蹤過濾、根據使用者關係的關注過濾,以及根據地理位置的區域過濾,以滿足不同應用場景的需求。最後,我們將簡要討論未來可能的發展方向,例如效能最佳化和更進階的過濾機制。
建構簡單社群網路的串流 API
簡介
在本章中,我們將探討如何建構一個簡單的社群網路,並實作串流 API 以支援即時訊息傳遞。我們將介紹如何使用 Redis 的 PUBLISH 和 SUBSCRIBE 功能來實作訊息的即時過濾和傳遞。
串流 HTTP 伺服器的實作
為了實作串流 API,我們需要建立一個能夠處理 HTTP 請求並傳回串流回應的伺服器。以下是一個簡單的實作範例:
def handle_request(handler):
method = handler.path.rsplit('/')[-1].split('.')[0]
name = None
args = None
if method == 'filter':
data = cgi.FieldStorage(
fp=handler.rfile,
headers=handler.headers,
environ={'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': handler.headers['Content-Type']}
)
for name in data:
if name in FILTERS:
args = data.getfirst(name).lower().split(',')
break
if not args:
return handler.send_error(401, "no filter provided")
else:
args = handler.query
handler.send_response(200)
handler.send_header('Transfer-Encoding', 'chunked')
handler.end_headers()
quit = [False]
for item in filter_content(id, method, name, args, quit):
try:
handler.wfile.write('%X\r\n%s\r\n' % (len(item), item))
except socket.error:
quit[0] = True
if not quit[0]:
handler.wfile.write('0\r\n\r\n')
程式碼解析:
- 首先,我們從請求路徑中提取方法名稱,並檢查是否為過濾請求。
- 如果是過濾請求,我們從 POST 請求中解析過濾引數。
- 如果沒有提供過濾引數,則傳回錯誤回應。
- 否則,我們將查詢引數作為過濾引數。
- 我們傳送 HTTP 200 回應,並設定 Transfer-Encoding 為 chunked。
- 我們使用
filter_content
函式生成符合過濾條件的訊息,並將其寫入回應串流中。 - 如果發生 socket 錯誤,我們設定 quit 旗標為 True。
- 如果沒有發生錯誤,我們傳送結束 chunk 的訊息。
串流訊息的過濾
為了支援即時訊息傳遞,我們需要實作訊息的過濾功能。我們將使用 Redis 的 PUBLISH 和 SUBSCRIBE 功能來實作此功能。
更新狀態訊息發布和刪除功能
首先,我們需要更新狀態訊息的發布和刪除功能,以支援串流訊息的過濾。以下是更新後的 create_status
和 delete_status
函式:
def create_status(conn, uid, message, **data):
pipeline = conn.pipeline(True)
pipeline.hget('user:%s' % uid, 'login')
pipeline.incr('status:id:')
login, id = pipeline.execute()
if not login:
return None
data.update({
'message': message,
'posted': time.time(),
'id': id,
'uid': uid,
'login': login,
})
pipeline.hmset('status:%s' % id, data)
pipeline.hincrby('user:%s' % uid, 'posts')
pipeline.publish('streaming:status:', json.dumps(data))
pipeline.execute()
return id
def delete_status(conn, uid, status_id):
key = 'status:%s' % status_id
lock = acquire_lock_with_timeout(conn, key, 1)
# ... (略)
程式碼解析:
- 在
create_status
函式中,我們新增了一行程式碼,將訊息發布到 Redis 的streaming:status:
頻道。 - 這樣,當使用者發布新訊息時,訊息將被發布到 Redis 頻道,並被串流 API 監聽。
串流 API 的實作
為了實作串流 API,我們需要建立一個能夠監聽 Redis 頻道並將訊息傳遞給使用者的機制。以下是簡單的實作範例:
def filter_content(id, method, name, args, quit):
# ... (略)
conn = redis.Redis()
pubsub = conn.pubsub()
pubsub.subscribe('streaming:status:')
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
# ... (略)
yield json.dumps(data)
程式碼解析:
- 我們建立了一個 Redis 連線,並使用
pubsub
方法監聽streaming:status:
頻道。 - 當收到訊息時,我們將訊息解析為 JSON 格式,並根據過濾條件進行處理。
- 如果訊息符合過濾條件,我們將其傳遞給使用者。
在未來,我們可以進一步擴充套件串流 API 的功能,例如支援更多種過濾條件、提高訊息傳遞的效率等。同時,我們也可以考慮使用更先進的技術,例如 WebSocket 或 gRPC,以實作更高效的即時通訊。
建構簡單社群網路的串流 API
在前面的章節中,我們已經探討瞭如何使用 Redis 實作一個簡單的社群網路。在本章節中,我們將繼續探討如何為這個社群網路新增串流 API,以實作即時更新的功能。
發布與刪除狀態訊息
首先,我們需要實作發布和刪除狀態訊息的功能。當使用者發布新的狀態訊息時,我們需要將該訊息發布到 Redis 的特定頻道中,以便其他使用者可以接收到該訊息。同樣地,當使用者刪除狀態訊息時,我們也需要將刪除的訊息發布到相同的頻道中。
def post_status(conn, uid, message, **data):
# ... (省略其他程式碼)
pipeline = conn.pipeline(True)
pipeline.hset(key, 'message', message)
pipeline.hset(key, 'uid', uid)
pipeline.hset(key, 'posted', int(time.time()))
# ... (省略其他程式碼)
pipeline.publish('streaming:status:', json.dumps(status))
pipeline.execute()
# ... (省略其他程式碼)
def delete_status(conn, uid, status_id):
key = 'status:%s' % status_id
lock = acquire_lock(conn, key)
if not lock:
return None
if conn.hget(key, 'uid') != str(uid):
return None
pipeline = conn.pipeline(True)
status = conn.hgetall(key)
status['deleted'] = True
pipeline.publish('streaming:status:', json.dumps(status))
pipeline.delete(key)
pipeline.zrem('profile:%s' % uid, status_id)
pipeline.zrem('home:%s' % uid, status_id)
pipeline.hincrby('user:%s' % uid, 'posts', -1)
pipeline.execute()
release_lock(conn, key, lock)
return True
內容解密:
在 post_status
函式中,我們使用 Redis 的 PUBLISH
命令將新發布的狀態訊息發布到 streaming:status:
頻道中。
在 delete_status
函式中,我們同樣將刪除的狀態訊息發布到相同的頻道中,並標記為已刪除。
這些訊息將被訂閱該頻道的客戶端接收,用於即時更新。
接收串流訊息
為了接收這些串流訊息,我們需要訂閱 streaming:status:
頻道。我們可以使用 Redis 的 pubsub
物件來實作這一點。
@redis_connection('social-network')
def filter_content(conn, id, method, name, args, quit):
match = create_filters(id, method, name, args)
pubsub = conn.pubsub()
pubsub.subscribe(['streaming:status:'])
for item in pubsub.listen():
message = item['data']
decoded = json.loads(message)
if match(decoded):
if decoded.get('deleted'):
yield json.dumps({'id': decoded['id'], 'deleted': True})
else:
yield message
if quit[0]:
break
pubsub.reset()
內容解密:
我們首先建立一個 pubsub
物件並訂閱 streaming:status:
頻道。
然後,我們進入一個迴圈,不斷接收來自該頻道的訊息。
對於每個接收到的訊息,我們使用 create_filters
函式建立的篩選器進行過濾。
如果訊息符合篩選條件,我們將根據訊息是否被刪除來產生不同的輸出。
篩選訊息
篩選訊息是串流 API 中的一個重要環節。我們需要根據不同的篩選條件來決定哪些訊息應該被傳送給客戶端。
def create_filters(id, method, name, args):
if method == 'sample':
return SampleFilter(id, args)
elif name == 'track':
return TrackFilter(args)
elif name == 'follow':
return FollowFilter(args)
elif name == 'location':
return LocationFilter(args)
raise Exception("Unknown filter")
內容解密:
create_filters
函式根據不同的篩選方法和引數建立不同的篩選器。
例如,如果方法是 sample
,則建立 SampleFilter
。
如果名稱是 track
、follow
或 location
,則分別建立對應的篩選器。
如果篩選方法或名稱未知,則丟擲異常。
實作篩選器
不同的篩選器實作了不同的篩選邏輯。例如,SampleFilter
實作了 Twitter 風格的 firehose、gardenhose 和 spritzer 存取級別的功能。
class SampleFilter:
def __init__(self, id, args):
self.id = id
self.sample_rate = float(args.get('sample_rate', 0.1))
def __call__(self, message):
import random
return random.random() < self.sample_rate
內容解密:
SampleFilter
根據設定的取樣率來決定是否保留訊息。
在初始化時,我們設定了取樣率。
在呼叫 SampleFilter
例項時,我們使用亂數來決定是否保留訊息。
未來,我們可以進一步最佳化串流 API 的效能,例如透過增加更多的篩選條件或最佳化訊息處理邏輯。
此外,我們也可以考慮使用更先進的技術,如機器學習,來改進訊息篩選的準確性。
graph LR A[使用者發布狀態訊息] --> B[Redis發布訊息到頻道] B --> C[客戶端訂閱頻道] C --> D[篩選器過濾訊息] D --> E[傳送符合條件的訊息給客戶端]
圖表翻譯:
此圖表展示了使用者發布狀態訊息後,訊息如何透過 Redis 的發布/訂閱機制傳遞給客戶端的流程。
首先,使用者發布狀態訊息,Redis 將訊息發布到特定的頻道。
客戶端訂閱該頻道後,可以接收到訊息。
篩選器對接收到的訊息進行過濾,只有符合條件的訊息才會被傳送給客戶端。
參考資料
- Redis官方檔案:https://redis.io/docs/
- Redis發布/訂閱命令:https://redis.io/commands/pubsub/
建構簡單社交網路的串流API過濾器實作
在開發社交網路應用程式時,串流API扮演著至關重要的角色,特別是在處理即時資料時。Twitter的串流API提供了一種高效的方式來取得即時推文資料。本章節將探討如何利用Python建構簡單的社交網路串流API過濾器,主要涵蓋取樣過濾器(SampleFilter)、追蹤過濾器(TrackFilter)、關注過濾器(FollowFilter)和位置過濾器(LocationFilter)的實作。
取樣過濾器(SampleFilter)
取樣過濾器允許開發者根據特定的百分比隨機取樣推文。以下是SampleFilter
的實作程式碼:
import random
def SampleFilter(id, args):
percent = int(args.get('percent', ['10'])[0], 10)
ids = range(100)
shuffler = random.Random(id)
shuffler.shuffle(ids)
keep = set(ids[:max(percent, 1)])
def check(status):
return (status['id'] % 100) in keep
return check
內容解密:
SampleFilter
函式接收id
和args
作為引數,其中args
是一個字典,包含了GET請求中的引數。- 使用
id
作為種子值初始化隨機數生成器,以確保對於相同的id
,取樣結果是一致的。 - 透過
random.Random(id)
建立一個隨機數生成器,並使用它來打亂ids
列表。 - 根據指定的百分比(
percent
)決定保留的ID數量,並將其儲存在keep
集合中。 check
函式檢查推文的ID是否在保留的ID集合中。
追蹤過濾器(TrackFilter)
追蹤過濾器允許開發者根據特定的片語或單詞來過濾推文。以下是TrackFilter
的實作:
def TrackFilter(list_of_strings):
groups = []
for group in list_of_strings:
group = set(group.lower().split())
if group:
groups.append(group)
def check(status):
message_words = set(status['message'].lower().split())
for group in groups:
if len(group & message_words) == len(group):
return True
return False
return check
內容解密:
TrackFilter
接收一個字串列表,並將每個字串轉換為小寫並分割成單詞集合。check
函式檢查推文中的單詞是否包含任何一個片語。- 使用Python集合的交集運算(
&
)來檢查推文是否匹配指定的片語。
關注過濾器(FollowFilter)
關注過濾器允許開發者根據特定的使用者名稱來過濾推文,無論是推文的釋出者還是提及的使用者。以下是FollowFilter
的實作:
def FollowFilter(names):
names = set('@' + name.lower().lstrip('@') for name in names)
def check(status):
message_words = set(status['message'].lower().split())
message_words.add('@' + status['login'].lower())
return bool(message_words & names)
return check
內容解密:
FollowFilter
將使用者名稱轉換為小寫並統一加上@
字首,以保持一致性。check
函式檢查推文中的單詞(包括釋出者名稱)是否與指定的使用者名稱匹配。- 使用集合的交集運算來高效地檢查匹配情況。
位置過濾器(LocationFilter)
位置過濾器允許開發者根據地理位置(經緯度)來過濾推文。以下是LocationFilter
的實作:
def LocationFilter(list_of_boxes):
boxes = []
for start in range(0, len(list_of_boxes) - 3, 4):
boxes.append(list(map(float, list_of_boxes[start:start + 4])))
def check(status):
location = status.get('location')
if not location:
return False
lat, lon = map(float, location.split(','))
for box in boxes:
if box[0] <= lat <= box[2] and box[1] <= lon <= box[3]:
return True
return False
return check
內容解密:
LocationFilter
接收一個定義地理區域的座標列表,並將其轉換為浮點數。check
函式檢查推文中的位置資訊是否在指定的地理區域內。- 透過比較推文的經緯度與區域邊界來確定是否匹配。