
作者 | 諳流科技魏祥臣
沒有意外,隨著模型規模的持續增長和應用場景的日益復雜,AI Infra 也自然的從"單體架構" -> "分布式架構"進行演進,例如:
在大模型訓練和推理階段,隨著模型規模的增長,需要通過多維度并行技術(數據并行、張量并行、流水線并行等)并發使用數百甚至數千個 GPU 才能滿足訓練需求;
在智能體應用階段,從能對話、寫文案的 Chatbot 到如今能自主規劃、工具調用、多 Agent 協作,工具越來越智能,調用鏈也越來越復雜;
再到各行業落地時,應用的業務主路徑開始集成 AI 能力,也對部署架構本身的高可靠、高可用及高性能提出了更多的要求
然而這個從單體架構到分布式架構的升級,最核心的變化就是通過消息中間件讓數據、模型、服務之間能夠異步、可靠、松耦合地協同工作,從而構建可擴展、可維護、可演進的 AI 平臺的基礎設施。
Pulsar 作為消息中間件的中流砥柱,以其更鮮明的存算分離、云原生特性,可發揮著更大的價值。
1 多模態:讓 Pulsar 直接吞進 超大消息,多模態訓練“零”切片
傳統的單模態模型,如自然語言處理(NLP)模型僅處理文本,計算機視覺(CV)模型僅處理圖像,自動語音識別(ASR)模型僅處理音頻,它們彼此獨立。多模態 AI 旨在讓機器能夠像人類一樣,通過融合和理解來自多種感官通道(如視覺、聽覺、語言)的信息來進行感知、推理和交互。這個給多模態訓練增加了不小的難度;
多模態 AI 系統處理的數據類型遠超傳統文本,包含了圖像、視頻、音頻、3D 點云等大體積的非結構化數據。這些數據單個文件的大小就可能從幾 MB 到幾 GB 不等。其他的消息隊列系統往往對單條消息的大小有嚴格限制(例如,Kafka 默認單條消息上限約 1 MB,調參后雖可放大,但需權衡副本同步壓力。),這迫使開發者在傳輸大文件時采用復雜的變通方案,如將文件存儲在對象存儲中,然后在消息中只傳遞文件的路徑或 URL。
這種方式雖然可行,但增加了系統的復雜性和處理延遲,并且無法充分利用消息隊列在數據流管理和處理方面的優勢 。
然而 Pulsar 原生支持超大消息體,即 pulsar 的 chunk message,Pulsar 的 Chunk Message 是多模態訓練的數據管道利器,它解決了大消息傳輸的完整性、順序性、簡化性三大問題,可顯著降低多模態數據管道的工程負擔,使開發者聚焦模型邏輯而非傳輸細節。
![]()
Pulsar Chunk Message(分塊消息)是 Apache Pulsar 提供的一種用于透明處理超大消息(>5MB)的機制。它允許生產者端自動將大消息拆分為多個小塊傳輸,并在消費者端自動重組,業務層無需感知分塊細節。
2 多模態:用 Pulsar 把文本、圖像、音頻流綁定到一起
多模態 AI 需要處理和融合的數據類型極其多樣化。系統需要同時處理文本(自然語言描述)、圖像(像素矩陣)、音頻(波形信號)、視頻(圖像序列和音頻流的組合)等多種異構數據。
在許多場景中,不同模態的數據在時間上存在緊密的依賴關系。例如,在視頻理解任務中,音頻中的對話內容需要與視頻中人物的口型、動作在時間上精確對齊;在自動駕駛場景中,激光雷達的點云數據、攝像頭的圖像數據和 GPS 的定位數據必須在同一時間點或時間窗口內進行融合,才能構建出對周圍環境準確的感知。因此,消息中間件不僅要能傳輸數據,還需要提供機制來保證跨模態數據的時間同步和順序性 。
利用 Pulsar 的 keyshare 消費模型,可以將同一 key 的數據總是被路由分配到同一實例完成聚合,方案如下:
時間同步:選定一個物理時鐘源(PTP/NTP/ 幀時鐘),所有模態 Producer 在本地打時間桶 ID(t-bucket),粒度 = 1 ms 或 1 幀間隔。
Produce 發送:每條消息把 t-bucket 放在 Pulsar 的事件時間( eventTime ,SDK 原生字段)里,同時作為路由 Key。
消費者使用 Key_Shared 訂閱,Key = t-bucket,Pulsar 可保證相同 Key 的消息只會發給同一消費者實例
收到模態 A、B、C 的同一桶消息后,再打包成一條 MultiModalFrame 喂給模型;
![]()
Key_Shared(鍵共享)是 Pulsar 的一種訂閱模式,它在 Shared 模式的基礎上增加了按消息 Key 的路由規則:相同 Key 的消息始終被分配到同一個消費者,而不同 Key 的消息可分布在多個消費者上并行處理,實現Key 級別的有序性與負載均衡。
3 模型訓練:用好 Pulsar 壓縮 Topic,實現 Checkpoint 秒級斷點續訓
模型訓練周期長、數據量大、集群規模大,出現中斷的概率顯著提高,且重啟代價高昂;
所以通常會使用Checkpoint機制來加速恢復的過程,但保存 Checkpoint 耗時較高,若存儲服務瞬時故障,寫入請求直接丟失,導致訓練狀態丟失。
引入 pulsar 作為中間層后,可以將異常數據跳過、Checkpoint 異步緩存、任務級重試等操作都交給 pulsar 的特性來解決,方案如下:
![]()
Checkpoint 數據具有明顯的歷史消息無效的特性,如果發生積壓時,只有最新的一條 checkpoint 才有價值,這時可以使用 Pulsar 的壓縮 Topic(Compaction Topic),壓縮 topic 將 Checkpoint Topic 從日志流變為 KV 存儲,僅保留每個 Key 的最新消息,自動清理歷史版本,這樣對比傳統方案(掃描 S3 文件列表 → 排序 → 下載)需要耗時 3-5 分鐘到直接接收最新 Key 的方案,耗時<1S;
![]()
Compaction Topic是 Apache Pulsar 提供的一種基于消息 Key 的日志壓縮機制,它會自動清理主題中每個 Key 的舊版本消息,僅保留最新版本,從而顯著減少主題體積、加速消費回溯,適用于"只關心最終狀態"的場景。
4 模型訓練:以 Pulsar 為“輸油管”:優化模型訓練中的 GPU 饑餓
在大規模模型訓練中,數據是驅動整個訓練過程的“燃料”,特別是針對擁有數十億甚至萬億級參數的深度學習模型,能高效且穩定的確保“燃料”能夠持續、穩定地供應給計算引擎(如 GPU 集群)是關鍵所在。
訓練這些龐然大物需要海量的訓練數據,這些數據通常以 TB 甚至 PB 計。數據加載和預處理的速度直接決定了 GPU 這一昂貴計算資源的利用率。有數據表明 I/O 延遲使 GPU 每步等待數百毫秒,空閑率可達30-50%。為了充分利用昂貴的計算資源,必須確保數據能夠以足夠快的速度被加載到每個計算節點的內存中,如果數據供給速度跟不上模型消耗數據的速度,就會有大量時間浪費在等待數據上,即所謂的“數據饑餓”問題。
歷史的架構中,數據預處理模塊與訓練模塊存在耦合的情況,然而耦合的模塊可能相互影響從而降低了 GPU 的讀取效率;
![]()
這種架構中,非常適合引入 Pulsar 在其中作為緩沖層,在數據平面預處理服務獨立擴展,訓練節點只專注消費,利用 Pulsar 的高吞吐特性,“喂養”GPU 的數據高速且穩定;
并且當 GPU 消費慢時,還可以利用 Pulsar 的背壓機制,預處理消費時自動降低預取速率,避免 OOM,從而讓整個鏈路更加健壯;
不止如此,還可以繼續針對 topic 的消費進行積壓監控,如果出現積壓,輔以 K8S 的KEDA 機制+ Pulsar 的 Share 消費類型可以整個擴縮容過程更加平滑和穩定;
背壓(Backpressure)是 Pulsar 中用于防止生產者過載消費者的流量控制機制。當消費者處理速度跟不上生產者發送速度時,系統通過多級反饋控制主動減緩上游生產速率,避免內存溢出、數據丟失和系統崩潰。
KEDA(Kubernetes Event-driven Autoscaling),是一種基于事件驅動的自動擴容解決方案,支持通過外部事件源動態調整 Pod 副本數;
Share 消費類型(Shared Subscription)是 Apache Pulsar 的一種訂閱模式,允許多個消費者綁定到同一個訂閱名上,消息通過輪詢機制分發給不同的消費者,每個消息僅會被分發給一個消費者,不保證消息順序,適合高吞吐、無需順序消費的場景。
5 智能體:利用 Pulsar 輕量化主題(non-persistent)解決 AI 應用的異步通信難題
模型迭代日新月異,企業正在積極把 AI 能力嵌入業務流程。然而,企業應用從調用傳統微服務應用 API 接口 到 調用大模型“生成式”的 API 接口過程中,一個顯著的特征是任務處理時耗變的很長,傳統微服務應用通常能實現毫秒級響應,而 AI 應用的處理周期跨度極大——從幾分鐘到數小時不等;
這就意味著原本微服務間的同步調用就不再適用,可將同步調用改為異步通知來解決長耗時的阻塞;改為異步通知后,那又如何能實現同步調用的即時通信吶,可以采取以下模型:
Agent1 在啟動時注冊一個專屬于自己的用于接收回包的非持久化 Topic(non-persistent Topic),非持久化 topic 非常輕量化,數據不落盤存儲,生命周期可由 TTL 自動 或人工回收,Agent1 可使用獨占消費模型進行消費該 topic
當 Agent1 有長耗時的調用模型請求時,向正常 topic 發送請求,并由模型處理模塊處理;該 Topic 為常規 topic,具備消息持久化、消息回放、海量積壓等隊列特性
當 LLM 處理模塊完成后,根據請求包中的回包地址進行回包投遞
基于此模型,可以利用 Pulsar 的 Persistent-topic,將長時耗任務進行異步化處理,利用 pulsar 的高可用、低延時的特性來保障請求任務的可靠、解耦和削峰填谷;又可以利用 pulsar 的 Non-Persistent-topic 的輕量化,實現百萬級創建,快速回收等能力
![]()
Non-persistent Topic:是 pulsar 的一種 topic 類型,是“不落盤、純內存” 的消息通道——數據不會寫入磁盤、不會做副本復制,Broker 宕機或進程重啟即丟失,因此極致輕量、低延遲,適合“可丟、可重試、要快、要大量”的短時消息場景。
6 智能體:Pulsar 可為事件驅動的智能體提供“新基建”
AI Agent 的概念正在經歷一場深刻的變革,從簡單的對話式 AI(Chatbot)向復雜的獨立實體轉變。AI Agent 就是將一個大模型(大腦)和一系列工具(感官與四肢)組裝起來,形成的一個能夠感知和改變外部環境的智能程序。
以創建一個營銷 Agent 為例,采用 ReAct 的模型,Agent 可能首先從 CRM 中提取客戶數據,使用 API 收集市場趨勢,并在新信息出現時不斷調整策略。通過通過記憶保留上下文并迭代查詢,Agent 能夠生成更準確、更相關的輸出。
![]()
當外部接口越來越豐富,Agent 需要不斷的擴展收集信息來源,包括其他 Agent、工具和外部系統等等,以便做出更精準的決策。
![]()
而這,從系統架構設計的角度上講,就是一個分布式系統問題。這和微服務時代面臨的挑戰相似,因為在微服務中,各個組件必須高效地進行通信,而不產生瓶頸或僵化的依賴關系。也和微服務架構系統一樣,它們的輸出不僅僅應該回流到 AI 應用程序中,它們還應該流入其他關鍵系統,如數據倉庫、CRM、CDP 和客戶成功平臺。所以完全可以將 Agent 理解為:有“大腦”的微服務;
從微服務的架構演進來看,Agent 的未來是事件驅動的,事件驅動的架構需要一個高效的消息中間件作為“基建”,因為消息中間的特性可以很好的匹配事件驅動需要的橫向擴展性、低延遲、松耦合、事件持久化等訴求
![]()
Pulsar 除了以上在消息中間件的優勢外,還提供了 Function Mesh 的能力,利用 Function 的能力可以更近一步簡化 AI Agent 的架構:
![]()
ReAct 模式:ReAct(Reasoning and Action)是目前應用最廣泛、最經典的 AI Agent 運行模式之一 。其核心思想是模擬人類解決復雜問題的過程,通過一個 “思考(Thought)→ 行動(Action)→ 觀察(Observation)”的循環來逐步推進任務 。
Pulsar Function :Pulsar 提供的輕量級、Serverless 流處理框架,定位是“用寫普通函數的代碼量,完成 ETL、過濾、聚合、打標簽等實時計算”。它把“消息 → 計算 → 消息”的閉環直接跑在 Pulsar 集群內部,簡單場景不需要額外部署 Flink、Storm 等重型流處理引擎。
![]()
7 智能體:具身智能需要“傳感器流 + 任務隊列”
在具身智能的場景中,既需要處理傳感器讀數流(連續、有序的數據),也需要處理獨立的指令或任務(這些任務需要獨立處理);
例如:一個機器人 agent 在處理任務時,首先機器人的視覺或遙測傳感器持續發布事件流,這些事件需要按順序來處理或者來理解當前所處環境的變化;然后當機器人的 AI 決定采取行動,例如“拾取物體”或“導航到位置”時,這些任務會被添加到工作隊列中。這些行動消息可能需要多個執行器模塊(消費者)會分擔這些任務。每個任務消息會被分配給一個執行器,執行器在完成任務后會進行確認。如果任務失敗,執行器可以發送負向確認(表示失敗),然后另一個實例可以重試。
我們回顧上述的過程,雖然都是利用消息管道進行消息傳遞,但是這是兩種不同的數據類型:
類似傳感器流,生產者將數據追加到一個無界、有序的日志(即流)中。消費者隨后按順序從這個日志中讀取數據,并維護流中的偏移量(offset)。每個分區內的順序是有保障的,消息在消費時不會被移除,這就是 kafka 專注的 stream(流)場景,它提供了高吞吐量和分區的嚴格排序,這使得它非常適合處理有序的事件流。
類似任務消息,生產者將消息發送到隊列,每條消息只由一個消費者處理(即使有多個消費者在監聽)。消費者從隊列中拉取消息,并在處理完成后確認每條消息,消息隨后會從隊列中移除。隊列擅長分發可以并行處理且無需全局排序要求的任務或工作。這就是 rabbitmq、rocketmq 專注的 queues(消息)場景,專注于每個消費者只處理一條消息,并具備消息重試和死信隊列等能力。
然后,在越來越多的 ai 場景,需要兩者兼具,因為 AI 代理在實時環境中觀測,同時必須執行可靠的操作。這正是 pulsar 持續專注的方向,將流 + 消息進行融合,Pulsar 原生支持多種消息語義。其靈活的訂閱模式(獨占、共享、故障轉移、鍵共享)讓你能在同一平臺上為不同任務選擇合適的工具。這意味著系統擴展更少,組件間集成更簡單——這對復雜的 AI 代理架構來說是一個很大的優勢。
Pulsar 的流(Streaming)和消息(Messaging)場景結合,通過 Key-shard(鍵共享)、Failover(故障轉移)、Exclusive(獨占)、Shared(共享)四種訂閱模式來實現
![]()
參考
https://huggingface.co/spaces/nanotron/ultrascale-playbook
https://seanfalconer.medium.com/the-future-of-ai-agents-is-event-driven-9e25124060d6
https://www.linkedin.com/pulse/kafkas-role-powering-next-wave-event-driven-agentic-ai-jeyaraman-xq0kc
https://streamnative.io/blog/streams-vs-queues-why-your-agents-need-both--and-why-pulsar-protocol-delivers
https://dzone.com/articles/agentic-ai-using-apache-kafka-as-event-broker-with-agent2agent-protocol
特別聲明:以上內容(如有圖片或視頻亦包括在內)為自媒體平臺“網易號”用戶上傳并發布,本平臺僅提供信息存儲服務。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.