現代分散式系統的設計與實作充滿挑戰,特別是在需要處理大量並發請求、維持系統高可用性,同時確保訊息可靠傳遞的場景中。Space Beaver 衛星通訊平台作為一個真實的生產環境案例,為我們展示了如何透過 Actor Model 行動者模式、Kubernetes 容器協調、Ray 分散式運算框架與 FastAPI 非同步處理技術,構建一個高效能、可擴展的通訊系統。本文將深入剖析這個案例中的核心技術決策、架構設計理念與實作細節,協助讀者理解現代分散式系統開發的關鍵要素與最佳實踐。

Actor Model 行動者模式核心概念

Actor Model 是一種並行運算的數學模型,它透過訊息傳遞的方式實現不同執行單元之間的通訊與協作。在這個模型中,每個 Actor 都是一個獨立的運算實體,擁有自己的狀態和行為。Actor 之間不共享記憶體,所有的互動都必須透過非同步訊息傳遞來完成。這種設計從根本上避免了傳統多執行緒程式設計中的鎖競爭、死鎖等常見問題。

Actor Model 的核心特性包含三個關鍵要素。首先是封裝性,每個 Actor 維護自己的內部狀態,外部無法直接存取或修改這些狀態。這種強制的狀態隔離大幅簡化了並行程式的推理與除錯。其次是非同步訊息傳遞,Actor 透過傳送訊息與其他 Actor 通訊,訊息的傳遞是非同步的,傳送者不需要等待接收者處理完成。最後是動態性,Actor 可以在執行時期動態建立新的 Actor,形成複雜的階層結構。

在 Space Beaver 系統中,Actor Model 被用於處理來自衛星裝置的訊息流。每個衛星裝置對應一個 Satellite Actor,負責與該裝置的通訊。使用者相關的操作則由 User Actor 處理。這種設計使得系統可以輕鬆應對大量並發連線,每個 Actor 獨立處理自己的任務,不會因為某個 Actor 的處理速度慢而影響其他 Actor 的運作。

相較於傳統的執行緒池模式,Actor Model 提供了更高層次的抽象。開發者不需要手動管理執行緒的生命週期、同步原語的使用,或者擔心資源競爭的問題。Actor 系統會自動處理訊息的排程與分發,將複雜的並行控制邏輯隱藏在框架內部。這使得應用程式碼更加清晰,專注於業務邏輯的實現。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 140

package "Actor System 架構" {
  
  rectangle "Satellite Actor Pool" {
    component "Satellite Actor 1" as sa1
    component "Satellite Actor 2" as sa2
    component "Satellite Actor N" as san
  }
  
  rectangle "User Actor Pool" {
    component "User Actor 1" as ua1
    component "User Actor 2" as ua2
    component "User Actor M" as uam
  }
  
  rectangle "支援服務" {
    component "Mail Client" as mail
    component "SMS Gateway" as sms
    component "Message Queue" as mq
  }
  
  cloud "衛星裝置網路" as devices
  cloud "使用者端點" as users
}

devices --> sa1
devices --> sa2
devices --> san

sa1 --> mq
sa2 --> mq
san --> mq

mq --> ua1
mq --> ua2
mq --> uam

ua1 --> mail
ua1 --> sms
ua2 --> mail
ua2 --> sms
uam --> mail
uam --> sms

mail --> users
sms --> users

@enduml

Satellite Actor 非同步訊息處理機制

Satellite Actor 在 Space Beaver 系統中扮演關鍵角色,負責與衛星裝置建立連線、接收來自太空的訊息,並將這些訊息轉發到適當的處理流程。由於衛星通訊的特殊性,包括不穩定的網路連線、高延遲的訊息傳輸,以及可能的訊號中斷,Satellite Actor 必須具備強健的錯誤處理與重試機制。

整個訊息處理流程採用非同步設計模式,充分利用 Python 的 asyncio 框架。在主要的執行迴圈中,Satellite Actor 首先嘗試與衛星裝置建立認證連線,這個過程本身就需要處理各種可能的網路異常。一旦成功登入,系統進入週期性的訊息檢查循環,透過可配置的延遲參數控制檢查頻率,在即時性與資源消耗之間取得平衡。

import asyncio
import logging
from typing import AsyncIterator
import base64
from google.protobuf import text_format

class SatelliteActor:
    """
    衛星通訊 Actor 負責處理與衛星裝置的訊息交換
    實現可靠的訊息接收與錯誤恢復機制
    """
    
    def __init__(self, device_id: str, delay: int = 30, 
                 max_internal_retries: int = 5):
        self.device_id = device_id
        self.delay = delay
        self.max_internal_retries = max_internal_retries
        self.running = True
        self.authenticated = False
        self.message_count = 0
        
        logging.info(f"初始化 Satellite Actor: {device_id}")
    
    async def run(self):
        """
        主要執行迴圈處理衛星訊息接收與錯誤恢復
        實現自動重連與狀態管理機制
        """
        internal_retries = 0
        
        while self.running:
            try:
                await self._establish_connection()
                
                while self.authenticated and self.running:
                    await asyncio.sleep(self.delay)
                    await self._check_and_process_messages()
                    internal_retries = 0
                    
            except ConnectionError as e:
                logging.error(f"連線錯誤 {e},嘗試重新建立連線")
                self.authenticated = False
                internal_retries += 1
                
                if internal_retries > self.max_internal_retries:
                    logging.critical(
                        f"達到最大重試次數 {self.max_internal_retries},停止服務"
                    )
                    raise
                
                backoff_delay = min(300, self.delay * (2 ** internal_retries))
                logging.info(f"等待 {backoff_delay} 秒後重試")
                await asyncio.sleep(backoff_delay)
                
            except Exception as e:
                logging.exception(f"處理訊息時發生未預期錯誤: {e}")
                internal_retries += 1
                
                if internal_retries > self.max_internal_retries:
                    raise
    
    async def _establish_connection(self):
        """
        與衛星裝置建立認證連線
        包含身份驗證與通訊通道初始化
        """
        try:
            logging.info(f"正在連線到衛星裝置 {self.device_id}")
            
            credentials = await self._fetch_device_credentials()
            await self._authenticate(credentials)
            await self._initialize_communication_channel()
            
            self.authenticated = True
            logging.info(f"成功連線到衛星裝置 {self.device_id}")
            
        except Exception as e:
            logging.error(f"連線建立失敗: {e}")
            raise ConnectionError(f"無法連線到裝置 {self.device_id}") from e
    
    async def _check_and_process_messages(self):
        """
        檢查並處理來自衛星裝置的新訊息
        實現訊息批次處理與錯誤隔離
        """
        try:
            messages = await self._fetch_messages()
            
            if not messages:
                logging.debug(f"裝置 {self.device_id} 無新訊息")
                return
            
            logging.info(f"接收到 {len(messages)} 則新訊息")
            
            for message_item in messages:
                try:
                    await self._process_single_message(message_item)
                    self.message_count += 1
                except Exception as e:
                    logging.error(
                        f"處理訊息失敗 {message_item.get('id', 'unknown')}: {e}",
                        exc_info=True
                    )
                    
        except Exception as e:
            logging.error(f"訊息檢查過程發生錯誤: {e}")
            raise
    
    async def _fetch_messages(self) -> list:
        """
        從衛星裝置 API 取得待處理訊息
        實現分頁與限流控制
        """
        pass
    
    async def _fetch_device_credentials(self) -> dict:
        """
        取得裝置認證憑證
        支援多種認證方式
        """
        pass
    
    async def _authenticate(self, credentials: dict):
        """
        執行裝置身份認證
        """
        pass
    
    async def _initialize_communication_channel(self):
        """
        初始化通訊通道
        """
        pass
    
    async def _process_single_message(self, message_item: dict):
        """
        處理單一訊息項目
        """
        pass

錯誤處理策略採用多層次的設計。對於可恢復的錯誤,例如暫時性的網路故障,系統會自動重試,並使用指數退避演算法計算重試間隔,避免在服務異常時造成額外的負載。重試計數器在成功處理訊息後重置,確保偶發性錯誤不會累積導致服務中斷。對於嚴重的錯誤,系統會在達到最大重試次數後主動拋出例外,觸發上層的容錯機制。

這種設計確保了系統在面對不穩定的衛星通訊環境時,仍能保持高可用性。即使某個 Satellite Actor 因為特定裝置的問題而暫時無法工作,其他 Actor 依然可以正常處理各自負責的裝置訊息,不會出現單點故障導致整個系統癱瘓的情況。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 120

start
:Satellite Actor 啟動;
:初始化重試計數器;
while (系統運作中?) is (是)
  :嘗試建立連線;
  if (連線成功?) then (是)
    :設定認證狀態;
    while (已認證且運作中?) is (是)
      :等待檢查週期;
      :取得待處理訊息;
      if (有新訊息?) then (是)
        :處理訊息批次;
        fork
          :解碼訊息內容;
        fork again
          :驗證訊息格式;
        fork again
          :轉發到 User Actor;
        end fork
        :重置錯誤計數器;
      else (否)
        :繼續監聽;
      endif
    endwhile (否)
  else (否)
    :記錄連線錯誤;
    :增加重試計數;
    if (達到最大重試次數?) then (是)
      :觸發告警;
      stop
    else (否)
      :計算退避延遲;
      :等待後重試;
    endif
  endif
endwhile (否)
stop

@enduml

訊息解碼與非同步處理流程

從衛星裝置接收到的原始資料通常採用緊湊的二進位格式,以最小化傳輸頻寬的使用。這些資料經過 Base64 編碼後透過 HTTP API 傳輸,在 Satellite Actor 端需要進行解碼與解析。整個解碼流程設計為非同步迭代器模式,能夠高效處理包含多條訊息的批次資料。

Protocol Buffers 作為訊息序列化格式,提供了高效的二進位編碼與跨語言支援。定義好的訊息結構確保了前後端的資料格式一致性,同時也便於未來的版本演進。解碼過程首先將 Base64 編碼的字串還原為二進位資料,接著使用 Protocol Buffers 的解析器將二進位資料轉換為結構化的訊息物件。

import base64
import logging
from typing import AsyncIterator
from google.protobuf import text_format
from dataclasses import dataclass

@dataclass
class CombinedMessage:
    """
    統一的訊息格式封裝來自不同來源的訊息
    支援衛星裝置、電子郵件與 SMS 等多種協定
    """
    text: str
    to: str
    protocol: str
    msg_from: str
    from_device: bool = False
    timestamp: int = 0
    priority: int = 0

class MessageDecoder:
    """
    訊息解碼器處理二進位訊息的解析與驗證
    支援批次處理與錯誤恢復
    """
    
    def __init__(self):
        self.decoded_count = 0
        self.error_count = 0
    
    async def decode_message(self, item: dict) -> AsyncIterator[CombinedMessage]:
        """
        解碼單一訊息項目並產生標準化的訊息物件
        使用非同步迭代器支援批次訊息處理
        """
        raw_msg_data = item.get("data", "")
        device_id = item.get("deviceId", "unknown")
        
        if not raw_msg_data:
            logging.warning(f"裝置 {device_id} 傳送空訊息")
            return
        
        logging.info(f"開始解碼來自裝置 {device_id} 的訊息")
        
        try:
            messagedata = MessageDataPB()
            bin_data = base64.b64decode(raw_msg_data)
            messagedata.ParseFromString(bin_data)
            
            logging.debug(
                f"解碼結果: {text_format.MessageToString(messagedata)}"
            )
            
            if len(messagedata.message) < 1:
                logging.warning(
                    f"裝置 {device_id} 的訊息封包不包含任何訊息內容"
                )
                return
            
            for message in messagedata.message:
                if not self._validate_message(message):
                    logging.error(f"訊息驗證失敗: {message}")
                    self.error_count += 1
                    continue
                
                yield CombinedMessage(
                    text=message.text,
                    to=message.to,
                    protocol=message.protocol,
                    msg_from=device_id,
                    from_device=True,
                    timestamp=message.timestamp if hasattr(message, 'timestamp') else 0,
                    priority=message.priority if hasattr(message, 'priority') else 0
                )
                
                self.decoded_count += 1
                
        except Exception as e:
            logging.exception(
                f"解碼訊息時發生錯誤,裝置 {device_id}: {e}"
            )
            self.error_count += 1
            raise
    
    def _validate_message(self, message) -> bool:
        """
        驗證訊息內容的完整性與合法性
        檢查必要欄位與格式約束
        """
        if not hasattr(message, 'text') or not message.text:
            logging.error("訊息缺少文字內容")
            return False
        
        if not hasattr(message, 'to') or not message.to:
            logging.error("訊息缺少接收者資訊")
            return False
        
        if not hasattr(message, 'protocol') or not message.protocol:
            logging.error("訊息缺少協定資訊")
            return False
        
        if len(message.text) > 10000:
            logging.error(f"訊息內容超過長度限制: {len(message.text)}")
            return False
        
        return True

class MessageProcessor:
    """
    訊息處理器協調解碼後訊息的分發與路由
    實現訊息佇列與負載平衡
    """
    
    def __init__(self, user_pool, decoder: MessageDecoder):
        self.user_pool = user_pool
        self.decoder = decoder
        self.processed_count = 0
    
    async def process_message(self, item: dict):
        """
        處理來自衛星裝置的訊息項目
        包含解碼、驗證與分發到對應的 User Actor
        """
        try:
            async for message in self.decoder.decode_message(item):
                await self._route_message(message)
                self.processed_count += 1
                
        except Exception as e:
            logging.error(f"處理訊息項目失敗: {e}", exc_info=True)
            raise
    
    async def _route_message(self, message: CombinedMessage):
        """
        根據訊息目標路由到對應的處理 Actor
        實現智慧負載分配策略
        """
        try:
            target_actor = self._select_target_actor(message)
            
            await target_actor.handle_message.remote(message)
            
            logging.debug(
                f"訊息已路由到 Actor,目標: {message.to}"
            )
            
        except Exception as e:
            logging.error(f"訊息路由失敗: {e}")
            raise
    
    def _select_target_actor(self, message: CombinedMessage):
        """
        選擇適當的 User Actor 處理訊息
        考慮負載平衡與訊息親和性
        """
        user_hash = hash(message.to)
        pool_size = self.user_pool.size()
        actor_index = user_hash % pool_size
        
        return self.user_pool.get_actor(actor_index)

非同步迭代器的使用是這段程式碼的亮點之一。由於一個訊息封包可能包含多條獨立的訊息,使用迭代器模式可以逐一處理每條訊息,而不需要將所有訊息載入記憶體。這種設計在處理大量訊息時特別有效,能夠保持記憶體使用的穩定,同時也便於在處理過程中實現細粒度的錯誤處理。

訊息驗證是確保系統穩定性的重要環節。在將訊息傳遞給下游處理流程之前,驗證器會檢查訊息的完整性,包括必要欄位的存在性、資料格式的正確性,以及內容長度的合理性。不符合規範的訊息會被記錄並丟棄,避免異常資料對系統造成影響。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 120

start
:接收原始訊息資料;
:Base64 解碼;
if (解碼成功?) then (是)
  :Protocol Buffers 解析;
  if (解析成功?) then (是)
    :提取訊息陣列;
    if (包含訊息?) then (是)
      while (遍歷每條訊息) is (有更多訊息)
        :驗證訊息格式;
        if (驗證通過?) then (是)
          :建立標準化訊息物件;
          :記錄解碼成功;
          :產生訊息給迭代器;
        else (否)
          :記錄驗證錯誤;
          :增加錯誤計數;
        endif
      endwhile (無)
      :解碼流程完成;
    else (否)
      :記錄空訊息警告;
    endif
  else (否)
    :記錄解析錯誤;
    :增加錯誤計數;
  endif
else (否)
  :記錄解碼失敗;
  :增加錯誤計數;
endif
stop

@enduml

User Actor 設計與訊息路由

User Actor 在 Space Beaver 系統中扮演核心的業務邏輯處理角色。它負責將來自衛星裝置的訊息轉換為使用者可以接收的格式,並透過適當的通道傳遞給使用者。同時,它也處理使用者發起的訊息,將這些訊息轉發到對應的衛星裝置。這種雙向的訊息處理能力使得 Space Beaver 成為一個完整的通訊平台。

User Actor 的設計遵循單一職責原則,每個 Actor 實例專注於處理特定範圍內的使用者請求。透過將使用者空間分片,系統可以水平擴展 User Actor 的數量,從而支援更多的並發使用者。Actor Pool 的管理策略確保了負載的均衡分配,避免某些 Actor 過載而其他 Actor 閒置的情況。

import logging
import platform
from dataclasses import dataclass
from typing import Optional
import ray
from prometheus_client import Counter

EMAIL_PROTOCOL = "email"
SMS_PROTOCOL = "sms"
SATELLITE_PROTOCOL = "satellite"

@dataclass
class User:
    """使用者資料模型"""
    username: str
    email: str
    twilio_number: Optional[str]
    device_serial: Optional[str]

@dataclass
class Device:
    """裝置資料模型"""
    serial_number: str
    user_id: str

class LazyNamedPool:
    """延遲初始化的命名 Actor Pool"""
    
    def __init__(self, name: str, size: int):
        self.name = name
        self.size = size
        self._pool = None
    
    def get_pool(self):
        """取得或初始化 Actor Pool"""
        if self._pool is None:
            self._pool = self._initialize_pool()
        return self._pool
    
    def _initialize_pool(self):
        """初始化 Actor Pool 的實際邏輯"""
        pass

class MailClient:
    """郵件客戶端封裝"""
    
    def __init__(self, settings):
        self.settings = settings
    
    async def send_message(self, data: str, msg_from: str, msg_to: str):
        """發送電子郵件"""
        logging.info(f"發送郵件從 {msg_from}{msg_to}")

class UserActorBase:
    """
    User Actor 基礎類別處理使用者相關的訊息操作
    支援多種通訊協定的訊息轉發與路由
    """
    
    def __init__(self, settings, idx: int, poolsize: int):
        self.settings = settings
        self.idx = idx
        self.poolsize = poolsize
        
        logging.info(f"初始化 User Actor {idx},運行於 {platform.machine()}")
        
        self.satellite_pool = LazyNamedPool("satellite", poolsize)
        self.outbound_sms = LazyNamedPool("sms", poolsize)
        self.mail_client = MailClient(settings)
        
        self.messages_forwarded = Counter(
            "messages_forwarded",
            "已轉發的訊息數量",
            labelnames=["actor_idx", "protocol"]
        )
        
        self.messages_rejected = Counter(
            "messages_rejected",
            "被拒絕的訊息數量",
            labelnames=["actor_idx", "reason"]
        )
        
        self.user_cache = {}
        self.cache_hits = 0
        self.cache_misses = 0
        
        logging.info(f"User Actor {idx} 初始化完成")
    
    def _fetch_user(self, msg: CombinedMessage) -> User:
        """
        根據訊息來源與協定取得對應的使用者資料
        實現多層快取策略提升查詢效能
        """
        cache_key = self._generate_cache_key(msg)
        
        if cache_key in self.user_cache:
            self.cache_hits += 1
            return self.user_cache[cache_key]
        
        self.cache_misses += 1
        
        try:
            if msg.from_device:
                user = self._fetch_user_by_device(msg.msg_from)
            elif msg.protocol == EMAIL_PROTOCOL:
                user = self._fetch_user_by_email(msg.to)
            elif msg.protocol == SMS_PROTOCOL:
                user = self._fetch_user_by_phone(msg.to)
            else:
                raise ValueError(f"不支援的協定類型: {msg.protocol}")
            
            self.user_cache[cache_key] = user
            return user
            
        except Exception as e:
            logging.error(f"取得使用者資料失敗: {e}")
            self.messages_rejected.labels(
                actor_idx=str(self.idx),
                reason="user_not_found"
            ).inc()
            raise
    
    def _generate_cache_key(self, msg: CombinedMessage) -> str:
        """生成快取鍵值"""
        if msg.from_device:
            return f"device:{msg.msg_from}"
        elif msg.protocol == EMAIL_PROTOCOL:
            return f"email:{msg.to}"
        elif msg.protocol == SMS_PROTOCOL:
            return f"phone:{msg.to}"
        return ""
    
    def _fetch_user_by_device(self, device_id: str) -> User:
        """透過裝置序號查詢使用者"""
        logging.info(f"查詢裝置 {device_id} 的使用者")
        device = Device.objects.get(serial_number=device_id)
        return User.objects.get(id=device.user_id)
    
    def _fetch_user_by_email(self, email: str) -> User:
        """透過電子郵件查詢使用者"""
        logging.info(f"查詢電子郵件 {email} 的使用者")
        return User.objects.get(email=email)
    
    def _fetch_user_by_phone(self, phone: str) -> User:
        """透過電話號碼查詢使用者"""
        logging.info(f"查詢電話號碼 {phone} 的使用者")
        return User.objects.get(twilio_number=str(phone))
    
    async def handle_message(self, input_msg: CombinedMessage):
        """
        處理接收到的訊息並根據來源與目標進行路由
        支援裝置到使用者與使用者到裝置的雙向通訊
        """
        logging.info(f"處理訊息: {input_msg}")
        
        try:
            user = self._fetch_user(input_msg)
            
            if input_msg.from_device:
                await self._forward_device_to_user(user, input_msg)
            else:
                await self._forward_user_to_device(user, input_msg)
            
            self.messages_forwarded.labels(
                actor_idx=str(self.idx),
                protocol=input_msg.protocol
            ).inc()
            
            logging.info(f"訊息處理完成: {input_msg.msg_from} -> {input_msg.to}")
            
        except Exception as e:
            logging.error(f"訊息處理失敗: {e}", exc_info=True)
            self.messages_rejected.labels(
                actor_idx=str(self.idx),
                reason="processing_error"
            ).inc()
            raise
    
    async def _forward_device_to_user(self, user: User, msg: CombinedMessage):
        """
        將來自裝置的訊息轉發給使用者
        根據使用者偏好選擇適當的通知通道
        """
        email_msg = {
            "data": msg.text,
            "msg_from": f"{user.username}@spacebeaver.com",
            "msg_to": user.email
        }
        
        await self.mail_client.send_message(**email_msg)
        logging.info(f"訊息已透過電子郵件轉發給 {user.username}")
    
    async def _forward_user_to_device(self, user: User, msg: CombinedMessage):
        """
        將使用者的訊息轉發到對應的衛星裝置
        確保訊息格式符合裝置通訊協定
        """
        if not user.device_serial:
            raise ValueError(f"使用者 {user.username} 未綁定衛星裝置")
        
        device_msg = {
            "protocol": SATELLITE_PROTOCOL,
            "msg_from": msg.msg_from,
            "msg_to": user.device_serial,
            "data": msg.text
        }
        
        satellite_actor = self.satellite_pool.get_pool().get_actor_by_device(
            user.device_serial
        )
        
        await satellite_actor.send_message.remote(**device_msg)
        logging.info(f"訊息已轉發到裝置 {user.device_serial}")

@ray.remote(max_restarts=-1, max_task_retries=3)
class UserActor(UserActorBase):
    """
    Ray 分散式 User Actor 實作
    支援自動重啟與任務重試機制
    """
    pass

訊息路由邏輯是 User Actor 的核心功能。根據訊息的來源判斷處理方向,如果訊息來自衛星裝置,則需要將其轉發給對應的使用者,通常透過電子郵件或簡訊。反之,如果訊息來自使用者,則需要找到使用者綁定的衛星裝置,將訊息透過衛星通道傳送出去。這種雙向路由機制實現了完整的通訊閉環。

快取策略的引入大幅提升了系統效能。使用者資料的查詢通常涉及資料庫存取,在高並發場景下可能成為效能瓶頸。透過在 Actor 層級實現快取,相同使用者的重複查詢可以直接從記憶體中獲取結果,避免不必要的資料庫操作。快取的失效策略需要考慮資料一致性與記憶體使用之間的平衡。

監控指標的收集是生產環境運維的重要組成部分。透過 Prometheus 客戶端函式庫,User Actor 暴露了訊息轉發數量、拒絕原因等關鍵指標。這些指標不僅協助運維人員了解系統的運作狀態,也為容量規劃與效能優化提供了資料支援。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 130

start
:User Actor 接收訊息;
:生成快取鍵值;
if (快取命中?) then (是)
  :從快取取得使用者;
  :增加快取命中計數;
else (否)
  :增加快取未命中計數;
  if (訊息來自裝置?) then (是)
    :透過裝置序號查詢使用者;
  elseif (協定為電子郵件?) then (是)
    :透過電子郵件查詢使用者;
  elseif (協定為 SMS?) then (是)
    :透過電話號碼查詢使用者;
  else (其他)
    :記錄不支援的協定;
    :增加拒絕計數;
    stop
  endif
  :快取使用者資料;
endif

if (訊息來自裝置?) then (是)
  :準備電子郵件格式;
  :透過郵件客戶端發送;
  :記錄轉發成功;
else (否)
  :檢查裝置綁定狀態;
  if (已綁定裝置?) then (是)
    :準備衛星訊息格式;
    :選擇目標 Satellite Actor;
    :轉發到衛星裝置;
    :記錄轉發成功;
  else (否)
    :記錄裝置未綁定錯誤;
    :增加拒絕計數;
    stop
  endif
endif

:更新轉發指標;
stop

@enduml

Ray Serve 與 FastAPI 服務整合

Ray Serve 是建構於 Ray 分散式運算框架之上的模型服務層,專為機器學習模型與 Python 應用程式的線上服務設計。它提供了自動擴展、負載平衡與故障恢復等企業級功能,同時保持了簡潔的 API 介面。在 Space Beaver 系統中,Ray Serve 被用於暴露 RESTful API,使得外部系統可以透過標準的 HTTP 協定與系統互動。

FastAPI 作為現代 Python Web 框架的代表,以其高效能、自動文件生成與型別檢查等特性受到開發者青睞。將 FastAPI 與 Ray Serve 結合,可以快速建構出既具備良好開發體驗,又擁有強大擴展能力的服務端點。這種組合特別適合需要處理高並發請求,同時包含複雜業務邏輯的應用場景。

from fastapi import FastAPI, HTTPException, Request, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, validator
from typing import Optional, List
import ray
from ray import serve
import logging
from datetime import datetime

app = FastAPI(
    title="Space Beaver API",
    description="衛星通訊平台 API 服務",
    version="1.0.0"
)

class SMSMessage(BaseModel):
    """SMS 訊息請求模型"""
    phone_number: str = Field(..., description="目標電話號碼")
    message_text: str = Field(..., min_length=1, max_length=1000, 
                               description="訊息內容")
    priority: int = Field(default=0, ge=0, le=10, 
                         description="訊息優先級")
    
    @validator('phone_number')
    def validate_phone(cls, v):
        if not v.startswith('+'):
            raise ValueError('電話號碼必須包含國碼,以 + 開頭')
        return v

class MessageResponse(BaseModel):
    """訊息回應模型"""
    message_id: str
    status: str
    timestamp: datetime

@serve.deployment(
    name="sms-service",
    num_replicas=3,
    max_concurrent_queries=100,
    ray_actor_options={
        "num_cpus": 0.5,
        "memory": 512 * 1024 * 1024
    }
)
@serve.ingress(app)
class SMSService:
    """
    SMS 服務部署類別處理簡訊相關的 API 請求
    整合 Ray Actor 系統實現訊息的分散式處理
    """
    
    def __init__(self):
        logging.info("初始化 SMS Service")
        self.user_actor_pool = self._initialize_user_pool()
        self.message_counter = 0
    
    def _initialize_user_pool(self):
        """初始化 User Actor Pool"""
        pool_size = 10
        actors = [
            UserActor.remote(settings=None, idx=i, poolsize=pool_size)
            for i in range(pool_size)
        ]
        return actors
    
    @app.post("/api/v1/sms/send", response_model=MessageResponse)
    async def send_sms(
        self,
        message: SMSMessage,
        background_tasks: BackgroundTasks
    ):
        """
        發送 SMS 訊息端點
        支援非同步處理與背景任務
        """
        try:
            logging.info(
                f"接收 SMS 請求: {message.phone_number}, "
                f"優先級: {message.priority}"
            )
            
            message_id = self._generate_message_id()
            
            combined_msg = CombinedMessage(
                text=message.message_text,
                to=message.phone_number,
                protocol=SMS_PROTOCOL,
                msg_from="api",
                from_device=False,
                priority=message.priority
            )
            
            actor_idx = self._select_actor(message.phone_number)
            actor = self.user_actor_pool[actor_idx]
            
            background_tasks.add_task(
                self._process_message_async,
                actor,
                combined_msg
            )
            
            self.message_counter += 1
            
            return MessageResponse(
                message_id=message_id,
                status="accepted",
                timestamp=datetime.utcnow()
            )
            
        except Exception as e:
            logging.error(f"處理 SMS 請求失敗: {e}", exc_info=True)
            raise HTTPException(
                status_code=500,
                detail=f"訊息處理失敗: {str(e)}"
            )
    
    @app.get("/api/v1/health")
    async def health_check(self):
        """健康檢查端點"""
        return {
            "status": "healthy",
            "service": "sms-service",
            "messages_processed": self.message_counter,
            "timestamp": datetime.utcnow().isoformat()
        }
    
    @app.get("/api/v1/metrics")
    async def metrics(self):
        """指標查詢端點"""
        return {
            "total_messages": self.message_counter,
            "actor_pool_size": len(self.user_actor_pool),
            "timestamp": datetime.utcnow().isoformat()
        }
    
    def _generate_message_id(self) -> str:
        """生成唯一訊息識別碼"""
        import uuid
        return f"msg_{uuid.uuid4().hex[:16]}"
    
    def _select_actor(self, phone_number: str) -> int:
        """選擇處理訊息的 Actor"""
        return hash(phone_number) % len(self.user_actor_pool)
    
    async def _process_message_async(self, actor, message: CombinedMessage):
        """非同步處理訊息"""
        try:
            await actor.handle_message.remote(message)
            logging.info(f"訊息處理完成: {message.to}")
        except Exception as e:
            logging.error(f"背景任務處理失敗: {e}")

def deploy_service():
    """部署 Ray Serve 服務"""
    ray.init(address="auto")
    serve.start(detached=True, http_options={"host": "0.0.0.0", "port": 8000})
    SMSService.deploy()
    logging.info("SMS Service 部署完成")

if __name__ == "__main__":
    deploy_service()

這個服務實作展示了如何將 FastAPI 的路由處理能力與 Ray Serve 的分散式執行能力結合。透過 @serve.deployment 裝飾器,整個 FastAPI 應用程式被包裝成一個可以在 Ray 叢集中分散式部署的服務。副本數量、並發查詢限制與資源分配等參數可以靈活配置,滿足不同的效能需求。

背景任務的使用是處理耗時操作的有效模式。當 API 接收到請求後,立即返回接受狀態給客戶端,而實際的訊息處理則在背景執行緒中非同步完成。這種模式既保證了 API 的回應速度,又確保了訊息處理的可靠性。如果背景任務失敗,可以透過監控系統發現並採取補救措施。

健康檢查與指標暴露端點是生產環境部署的標準配置。負載平衡器可以透過健康檢查端點判斷服務實例的可用性,將流量導向健康的實例。指標端點則提供了服務運作狀態的即時資訊,可以整合到監控系統中進行長期追蹤與分析。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 140

package "Ray Serve 部署架構" {
  
  cloud "外部請求" as client
  
  rectangle "HTTP 層" {
    component "負載平衡器" as lb
    component "Ray Serve Head" as head
  }
  
  rectangle "服務副本" {
    component "SMS Service 副本 1" as svc1
    component "SMS Service 副本 2" as svc2
    component "SMS Service 副本 3" as svc3
  }
  
  rectangle "Actor Pool" {
    component "User Actor Pool" as uap
    component "Satellite Actor Pool" as sap
  }
  
  rectangle "後端服務" {
    database "資料庫" as db
    component "郵件服務" as mail
    component "SMS Gateway" as sms_gw
  }
}

client --> lb
lb --> head
head --> svc1
head --> svc2
head --> svc3

svc1 --> uap
svc2 --> uap
svc3 --> uap

uap --> sap
uap --> db
uap --> mail
uap --> sms_gw

@enduml

Space Beaver 案例研究為我們展示了如何在真實的生產環境中應用現代分散式系統技術。Actor Model 提供了清晰的並行處理模型,Kubernetes 確保了容器化應用的可靠部署,Ray Serve 則簡化了分散式服務的建構過程。這些技術的組合不是隨意的堆疊,而是經過深思熟慮的架構決策,每個元件都在解決特定的問題。

玄貓認為,分散式系統的設計需要在複雜度與可維護性之間取得平衡。過度的抽象會增加系統理解的難度,而過於簡單的設計又無法滿足擴展性需求。Space Beaver 的架構展示了一種務實的方法,在保持核心邏輯清晰的同時,透過成熟的框架與工具來處理分散式系統的固有複雜性。

從技術選型的角度來看,Ray 生態系統的整合性是其主要優勢。無論是分散式運算、模型服務還是資料處理,都可以在統一的框架內完成,減少了不同工具之間的整合成本。但這也意味著團隊需要對 Ray 有深入的理解,才能充分發揮其潛力。對於已經在使用其他分散式框架的團隊,遷移成本是需要考慮的因素。

未來的發展方向可能包括更智慧的負載平衡策略、基於機器學習的異常檢測,以及更細粒度的資源管理。隨著 Serverless 技術的成熟,部分無狀態的服務元件可能會遷移到 FaaS 平台,進一步降低營運成本。同時,可觀測性的提升也是重要的演進方向,透過分散式追蹤與結構化日誌,可以更快速地定位與解決問題。

對於準備建構類似系統的團隊,玄貓建議從小規模開始,逐步驗證架構的可行性。充分的測試環境與監控系統是必不可少的基礎設施。同時,團隊成員對分散式系統原理的理解程度,往往比工具的選擇更為重要。只有深刻理解了並行、一致性、容錯等核心概念,才能在面對實際問題時做出正確的決策。