![]()
作者:東燈原標題:用 MoonBit 實現 CRDTs 算法并構建實時協作應用
一.引言
當你在 Google Docs 中與同事同時編輯一份文檔,或者在 Figma 中與設計師協作調整界面時,你有沒有想過:為什么兩個人同時在同一位置輸入文字,最終結果卻不會互相覆蓋或產生亂碼?
這背后隱藏著分布式系統中最具挑戰性的問題之一:并發沖突( Concurrent Conflict)。當多個用戶同時編輯同一份數據,且這些編輯需要在不同設備之間同步時,如何保證所有人最終看到相同的結果?
本文將帶你深入理解實時協作的核心算法演進——從經典的操作轉換(OT)到經典的 CRDTs 算法 RGA,再到現代的 EG-Walker。我們不僅會解釋這些算法的原理,還會用 MoonBit 實現算法的核心邏輯,并最終展示如何用它構建一個簡單的、斷網重連可合并的協作編輯器。
二.實時協作的核心挑戰
假設我們要構建一個多人協作的文本編輯器。每個用戶在自己的設備上都有一份文檔副本,當用戶進行編輯時,操作會通過網絡同步給其他用戶。為了保證流暢的編輯體驗,用戶的輸入應該立即生效,而不是等待服務器確認。
問題來了:當兩個用戶同時編輯時,會發生什么?讓我們考慮這個場景:

這就是并發沖突的本質:相同的操作序列,以不同順序應用,可能產生不同的結果 。
我們需要的是一種機制,無論操作以什么順序到達,最終所有人看到的結果都相同,而且尊重所有編輯參與者的貢獻。這個性質叫做 最終一致性 (Eventual Consistency) 。
三.操作轉換( OT )
1、原理與簡單實現
操作轉換(Operational Transformation,OT)是最早用于解決實時協作沖突的算法,誕生于 1989 年。Google Docs、Etherpad 等早期協作工具都采用了這種方案。OT 的基本思路是:既然操作之間會互相影響,那就在應用操作之前,根據已發生的操作對其進行“轉換”,使其適應當前狀態。

看看 OT 是如何解決沖突的:
初始文檔是 AB 。Alice 在位置 1 插入 X ,本地變成 AXB ;Bob 在位置 1 插入 Y ,本地變成 AYB 。現在雙方需要同步對方的操作。
當 Alice 收到 Bob 的操作 insert(1, 'Y') 時,她不能直接執行——因為她已經在位置 1 插入了 X ,后面的字符都向右移了一位。OT 發現 Bob 的插入位置(1)不在 Alice 插入位置之前,于是把位置 +1,變成 insert(2, 'Y') 。Alice 執行后得到 AXYB 。
同樣,Bob 收到 Alice 的 insert(1, 'X') 。但 Alice 的插入位置(1)不比 Bob 的(1)大,所以不調整。Bob 直接執行,在位置 1 插入 X ,也得到 AXYB 。
不妨簡單用 MoonBit 實現一下:
enum Op {
Insert(Int, Char) // Insert(位置, 字符)
Delete(Int) // Delete(位置)
}/// 轉換函數:將 op1 轉換為在 op2 已經執行后的等效操作
fn transform(op1 : Op, op2 : Op) -> Op {
match (op1, op2) {
(Insert(pos1, ch), Insert(pos2, _)) =>
Insert(if pos1 <= pos2 { pos1 } else { pos1 + 1 }, ch)
(Insert(pos1, ch), Delete(pos2)) =>
Insert(if pos1 <= pos2 { pos1 } else { pos1 - 1 }, ch)
// ... 其他 case 類似
}
}
2、OT 存在的問題
OT 在工業界有廣泛應用(Google Docs 就基于 OT),但它有一些根本性的問題:
1、需要中央服務器 :OT 需要一個權威的服務器來確定操作的全局順序。沒有服務器,就無法確定誰的操作“先”發生。
2、轉換規則復雜度爆炸 :如果有 N 種操作類型,就需要定義 N2 個轉換規則。當操作類型增多(如富文本的加粗、斜體、鏈接等),復雜度急劇上升。
3、長分支合并極慢 :如果兩個用戶離線編輯了很長時間,重新連接時需要轉換大量操作,性能很差。

四.RGA:一種經典的序列 CRDT
CRDT(Conflict-free Replicated Data Type)采用了完全不同的思路: 不是在收到操作后轉換它,而是設計一種數據結構,使得無論操作以什么順序應用,結果都相同 。這就像設計一種特殊的“加法”——無論你按什么順序把數字加起來,結果都一樣。數學上,這要求操作滿足 交換律 和 結合律 。
1、RGA:一種經典的序列 CRDT
RGA(Replicated Growable Array)是 2011 年提出的一種序列 CRDT,專門用于解決協同文本編輯中的沖突問題。它的核心思想很簡單: 用相對位置代替絕對位置 。

還是用 Alice 和 Bob 的例子。初始文檔是 AB ,Alice 和 Bob 同時在位置 1 插入字符——Alice 插入 X ,Bob 插入 Y 。
OT 的做法是調整位置坐標。但 RGA 換了個思路: 不用數字位置,用“插在誰后面”來描述插入點 。
具體來說:
Alice 的操作不再是“在位置 1 插入 X”,而是“在 A 后面插入 X”
Bob 的操作是“在 A 后面插入 Y”
兩個操作都想插在 A 后面,怎么辦?RGA 給每個字符分配一個 全局唯一的 ID 。這個 ID 由兩部分組成:
用戶 ID :每個用戶有一個唯一標識(如 Alice 是 A ,Bob 是 B )
本地計數器 :每個用戶維護一個遞增的計數器,每次插入新字符時 +1
所以 A@1 表示“Alice 的第 1 個操作”, B@1 表示“Bob 的第 1 個操作”。當兩個字符想插入同一位置時,比較 ID 來決定順序——先比本地計數器,計數器相同時再比用戶 ID(作為 tie-breaker)。這里兩個計數器都是 1,所以比較用戶 ID: B > A ,因此 B@1 排在 A@1 前面,結果就是 A → Y → X → B ,即 AYXB 。
如果用 MoonBit 實現,我們可以先定義每個節點的類型和它的 compare 規則:
/// 唯一標識符
struct ID {
peer : UInt64 // 用戶 ID
counter : Int // 本地計數器
} derive(Eq)/// 比較兩個 ID,用于解決并發插入沖突
impl Compare for ID with compare(self, other) {
// 先比較 counter,再比較 peer(打破平局)
if self.counter != other.counter {
other.counter - self.counter
} else if self.peer > other.peer { -1 }
else if self.peer < other.peer { 1 }
else { 0 }
}
插入時,在目標位置之后找到正確的插入點——跳過所有 ID 更大的節點:
/// 在 target 之后插入,返回插入位置
fn find_insert_pos(order : Array[ID], target_pos : Int, new_id : ID) -> Int {
let mut pos = target_pos + 1
while pos < order.length() && new_id.compare(order[pos]) > 0 {
pos = pos + 1 // new_id 更小,繼續往后找
}
pos
}
我們剛才假設了只有插入的情況,對于刪除問題,RGA 采用墓碑(Tombstone)策略:刪除字符時不真正移除,只標記為“已刪除”。
為什么不能真刪除?考慮這個場景:Alice 刪除了 B,同時 Bob(還沒收到刪除)在 B 后面插入 X。如果 B 真的沒了,Bob 的“在 B 后面插入”就找不到參照物了。墓碑讓 B 保留在數據結構中,只是渲染時跳過,這樣 Bob 的操作仍然有效。
/// RGA 節點
struct RGANode {
id : ID
char : Char
mut deleted : Bool // 墓碑標記
}
/// 刪除:標記為墓碑
fn RGANode::delete(self : RGANode) -> Unit {
self.deleted = true
}/// 渲染:跳過墓碑
fn render(nodes : Array[RGANode]) -> String {
let sb = StringBuilder::new()
for node in nodes {
if !node.deleted { sb.write_char(node.char) }
}
sb.to_string()
}
在上面的簡單實現當中,為了更加簡潔易懂,我們采用的是數組來存儲 RGA 的節點。而熟悉數據結構的讀者很輕松就可以發現:RGA 會存在頻繁的插入情況,因此鏈表也許更適配這種算法。而實際的工程中則經常使用更加穩健高效的結構如 B+ Tree 或跳表實現它。
2、RGA 的問題
RGA 解決了并發沖突問題,不需要中央服務器,支持 P2P 同步,但它也有顯著的缺點:
元數據膨脹 :每個字符都需要存儲 ID(工程上很容易達到 16+ 字節)和前驅引用,一篇 10 萬字的文檔,元數據可能比內容還大。
墓碑累積 :刪除的字符永遠保留在內存中。一篇編輯多次的文檔,可能 90% 的數據都是墓碑,而且文字上可能還有其他維度,比如富文本,會進一步加劇這個缺點造成的影響。
加載緩慢 :從磁盤加載文檔時,需要重建整個數據結構,這是 O(n) 甚至 O(n log n) 的操作。

五.Event Graph Walker:更好的方案
1、原理介紹
Event Graph Walker(簡稱 Eg-walker)是由 Joseph Gentle 和 Martin Kleppmann 在 2024 年提出的新 CRDT 算法。
前面我們看到,OT 操作簡單(只有位置索引)但需要中央服務器;CRDT 支持 P2P 但元數據膨脹嚴重。Eg-walker 的核心洞察是:兩者可以結合,即存儲時用簡單索引,合并時臨時構建 CRDT。
操作像 OT 一樣輕量,只記錄 insert(pos, char) 和 delete(pos) 。需要合并并發操作時,臨時重放歷史、構建 CRDT 狀態來解決沖突,合并完就丟掉。
可能很多讀者會這種“臨時構建 CRDT 解決問題的方式”存在一些性能方面的顧慮,我們的確要承認雖然臨時構建確實有開銷,但是由于大部分時間并不需要 CRDT 參與編輯工作,只有同步并發編輯的時候才需要,而且 Eg-Walker 的性質很明顯支持增量構建與局部構建,只需要從快照構建或者再沖突區域構建 CRDT 解決沖突即可。而且可以設想的是,在操作歷史越來越復雜的情況下,臨時構建會比維護一個會一直增長的結構更加穩健高效。

2、代碼實現 1)基礎數據結構
首先是操作的定義。與 RGA 使用“在某個 ID 后面插入”的相對定位不同,Eg-walker 直接使用數字位置索引,就像 OT 一樣簡單:
enum SimpleOp {
Insert(Int, String) // Insert(位置, 內容)
Delete(Int, Int) // Delete(位置, 長度)
}
接下來是 事件(Event) 的定義。事件是對操作的包裝,添加了因果關系信息:
struct Event {
id : ID // 唯一標識符
deps : Array[ID] // 依賴的事件(父節點)
op : SimpleOp // 實際的操作內容
lamport : Int // Lamport 時間戳,用于排序
}
然后我們就可以根據他們定義出一個事件圖(Event Graph):
struct EventGraph {
events : Map[ID, Event] // 所有已知事件
frontiers : Array[ID] // 當前最新的事件 ID 集合
}
這里定義中的 frontiers 記錄了“當前版本”——那些沒有被任何其他事件依賴的事件。如果讀者熟悉 Git 的一些概念,那么可以把它理解為 Git 中當前所有分支的 HEAD 指針集合。
2)添加事件與維護 Frontier
當收到新事件時,除了將事件存入當前的事件表中,還需要更新 frontier。由于 frontier 記錄的是“沒有后續事件的事件”,當新事件依賴某個舊 head 時,說明這個舊 head 已經有了后續,不再是“最新的”了,需要從 frontier 中移除,然后把新事件加入 frontier。
fn EventGraph::add_event(self : EventGraph, event : Event) -> Unit {
self.events[event.id] = event
self.frontiers = self.heads.retain(frontier => !event.deps.contains(frontier))
self.frontiers.push(event.id)
}
3)LCA(最近公共祖先) 與拓撲排序合并兩個版本需要解決兩個問題:
1、找到分叉點(在 Event Graph 上找到 LCA ) :確定從哪里開始重放
2、確定重放順序(根據 Lamport 拓撲排序 ) :按因果關系排序事件
這兩部分都屬于比較基本的圖論問題,相信讀者在查閱資料后可以很快的實現出來。不過需要注意的是,在工業實現 Eg-Walker 算法時,我們通常不使用常規介紹的算法求 LCA,而是對數據結構進行改進,應用一些緩存機制來提高效率。
4)合并算法
現在我們有了所有組件,可以實現完整的合并算法了:
fn EventGraph::merge(
self : EventGraph,
local_frontiers : Array[ID],
remote_frontiers : Array[ID], // 遠程 peer 的 frontiers,隨事件一起發送
remote_events : Array[Event]
) -> String {
// 步驟 1:將遠程事件添加到事件圖
for event in remote_events {
self.add_event(event)
}
// 步驟 2:找到本地版本和遠程版本的 LCA(用 VersionVector 取交集)
let lca = self.find_lca(local_frontiers, remote_frontiers)
// 步驟 4:收集從 LCA 到兩個分支的所有事件
let events_to_replay = self.collect_events_after(lca)
// 步驟 5:按 Lamport 時間戳拓撲排序
let sorted = self.topological_sort(events_to_replay)
// 步驟 6:創建臨時 RGA,重放所有事件
let temp_rga = RGA::new()
for event in sorted {
self.apply_to_rga(temp_rga, event)
}
// 步驟 7:返回最終文本,丟棄臨時 RGA
temp_rga.to_string()
}
合并流程可以總結為三個階段:
Retreat(回退) :找到 LCA,確定需要重放的事件范圍
Collect(收集) :收集兩個分支上的所有事件,按 Lamport 時間戳拓撲排序
Advance(推進) :創建臨時 RGA,按順序重放所有事件,用 CRDT 解決沖突
六.Lomo 與開發一個協作文本編輯器
1、什么是 Loro/Lomo
Loro 是一個基于 Eg-walker 算法的高性能 CRDT 庫,由 Rust 實現。它支持多種數據類型(文本、列表、Map、可移動列表、樹結構等),提供豐富的協作功能,被用于構建實時協作應用。而 Lomo 是 Loro 的 MoonBit 移植版本,與 Loro Rust 版本保持二進制兼容,這意味著用 lomo 生成的文檔可以被 Loro 讀取,反之亦然。
Lomo 的核心 API 非常簡潔:
let doc = LoroDoc::new()
doc.set_peer_id(1UL)
// 獲取文本容器并編輯
let text = doc.get_text("content")
doc.text_insert(text, 0, "Hello, World!")
doc.text_delete(text, 5, 2)
// 導出更新(用于同步)
let updates = doc.export_updates()
// 另一個 peer 導入更新
let doc2 = LoroDoc::new()
doc2.set_peer_id(2UL)
doc2.import_updates(updates) // 兩邊內容自動同步
2、做一個協同文本編輯器
因為協同需求經常發生在前端,因此 Loro 發行了 Wasm API 以保證前端也可以使用這一優秀的 CRDTs 庫。但 Rust 編譯的 Wasm 體積偏大,而且難以根據用戶某一項單獨需求進行 tree-sharking,因此成為很多前端開發者使用 Loro 的痛點。
但前端如果使用 MoonBit+Lomo 在 JavaScript 后端編寫,則編譯器只會按需編譯 API,最終編譯結果非常好。同時,MoonBit 的 Wasm 編譯結果往往會更小、更干凈,就算是使用 Wasm 后端進行發行也會得到很好的效果。
因此我們可以嘗試根據 JavaScript 后端制作一個協同文本編輯器來驗證這一點,下面展示了大致的實現方式:
首先在 MoonBit 一側封裝文檔操作,供 JavaScript 調用:
///| 創建文檔
pub fn create_doc(peer_id : Int) -> Int {
let doc = LoroDoc::new()
doc.set_peer_id(peer_id.to_uint64())
let text = doc.get_text("body")
// 訂閱本地更新,自動收集待發送的數據
let _ = doc.subscribe_local_update((bytes) => {
pending_updates.push(bytes)
true
}
// ...
}///| 應用編輯操作
pub fn apply_edit_utf16(doc_id : Int, start : Int, delete_len : Int, insert_text : String) -> Bool {
let doc = docs[doc_id]
let text = texts[doc_id]
if delete_len > 0 { doc.text_delete_utf16(text, start, delete_len)? }
if insert_text.length() > 0 { doc.text_insert_utf16(text, start, insert_text)? }
true
}
JavaScript 側處理用戶輸入和同步邏輯:
// 處理用戶輸入
function handleInput(side, other) {
const nextText = side.el.textContent;
const change = diffText(side.text, nextText); // 計算新舊文本的差異
// 應用到 CRDT(調用 MoonBit 導出的函數)
apply_edit_utf16(side.id, change.start, change.deleteCount, change.insertText);
side.text = nextText;
syncFrom(side, other); // 同步給另一方
}// 同步邏輯
function syncFrom(from, to) {
const updates = drain_updates(from.id); // 獲取待發送的更新(MoonBit 導出)
if (state.online) {
apply_updates(to.id, updates); // 在線:立即應用(MoonBit 導出)
} else {
from.outbox.push(...updates); // 離線:緩存到發件箱
}
}
最終經過一些樣式編寫和頁面編寫的工作,我們就可以得到一個基于 CRDTs 的協同編輯器:

該項目的源碼在文章末尾已經給出,感興趣的讀者可以自行參考并開發更有意思的項目。
七.總結
本文從并發沖突問題出發,介紹了實時協作算法的演進:
OT :通過轉換操作解決沖突,但需要中央服務器
RGA :用唯一 ID 和相對位置實現去中心化,但元數據膨脹
Eg-walker:結合兩者優點,存儲簡單操作,合并時臨時構建 CRDT
我們用 MoonBit 實現了上述算法的核心數據結構與關鍵計算部分、還介紹了 Loro/lomo 庫和他們的基本使用,并使用 Lomo 開發了一個簡單的協作編輯應用。
從 1989 年 OT 的誕生,到 2011 年 RGA 等 CRDT 的形式化,再到 2024 年 Eg-walker 的創新融合,實時協作算法經歷了三十余年的演進。而近年來隨著 Local-first 理念的興起,CRDT 正從學術論文走向生產實踐——Figma、Linear 背后都有它的身影。
未來,歷史壓縮、復雜數據結構、端到端加密等方向仍在快速推進;MoonBit 高效編譯到 WebAssembly 的能力,也為 CRDTs 在瀏覽器和邊緣設備上的部署提供了新可能。
八.參考項目/文獻
Lomo-Demo(編輯器)演示:
https://lampese.github.io/lomo-demo/
Lomo-Demo(編輯器)源碼:
https://github.com/Lampese/lomo-demo
Loro:https://loro.dev/
Lomo:https://github.com/Lampese/lomo
Eg-walker 論文:
https://arxiv.org/abs/2409.14252
特別聲明:以上內容(如有圖片或視頻亦包括在內)為自媒體平臺“網易號”用戶上傳并發布,本平臺僅提供信息存儲服務。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.