foundations/server/docs/tx-ordering-middleware-implementation.md
The TxOrderingMiddleware ensures that transactions for the same document are broadcasted to clients in the correct order, preventing race conditions that cause unnecessary getCurrentDoc calls.
When multiple transactions for the same document are processed concurrently:
modifiedOn=101modifiedOn=102handleBroadcast() might complete before tx1'sgetCurrentDoc unnecessarilyThis was confirmed in the unit test liveQuery.race.test.ts.
client.tsLooking at how transactions flow in client.ts:
async txRaw(ctx: ClientSessionCtx, tx: Tx): Promise<{...}> {
// 1. Process transaction
result = await ctx.pipeline.tx(ctx.ctx, [tx])
// 2. Send result to client
await ctx.sendResponse(ctx.requestId, result)
// 3. Broadcast to other clients (returns promise)
const broadcastPromise = ctx.pipeline.handleBroadcast(ctx.ctx)
return { result, broadcastPromise, asyncsPromise }
}
The critical observation: Multiple txRaw() calls can be in-flight simultaneously, and their handleBroadcast() promises may resolve out-of-order.
tx() - Record Transaction OrderWhen a transaction is processed:
Map<docId, TxOrderEntry[]>interface TxOrderEntry {
txId: string
modifiedOn: number
broadcastPromise?: Promise<void>
broadcastResolve?: () => void
}
handleBroadcast() - Wait Before BroadcastingWhen broadcasting transactions:
await Promise.all(waitPromises)handleBroadcast() to perform actual broadcastTime | Thread 1 (tx1, modifiedOn=101) | Thread 2 (tx2, modifiedOn=102)
------|-------------------------------------|-----------------------------------
T1 | tx() called |
| - Create promise1 |
| - Queue: [tx1] |
------|-------------------------------------|-----------------------------------
T2 | | tx() called
| | - Create promise2
| | - Queue: [tx1, tx2]
------|-------------------------------------|-----------------------------------
T3 | | handleBroadcast() called
| | - Find tx1 before tx2
| | - await promise1 (BLOCKS)
------|-------------------------------------|-----------------------------------
T4 | handleBroadcast() called |
| - No previous txes | (still waiting on promise1)
| - Broadcast tx1 |
| - Resolve promise1 |
| - Remove tx1 from queue |
------|-------------------------------------|-----------------------------------
T5 | | promise1 resolved, continues
| | - Broadcast tx2
| | - Resolve promise2
| | - Remove tx2 from queue
private readonly docTxQueue = new Map<Ref<Doc>, TxOrderEntry[]>()
To prevent memory leaks:
tx(ctx, txes)Records transaction order:
for (const tx of txes) {
const docId = this.getTargetDocId(tx)
if (docId !== undefined) {
let queue = this.docTxQueue.get(docId)
if (queue === undefined) {
queue = []
this.docTxQueue.set(docId, queue)
}
// Create promise for this transaction's broadcast
let broadcastResolve
const broadcastPromise = new Promise<void>((resolve) => {
broadcastResolve = resolve
})
queue.push({
txId: tx._id,
modifiedOn: tx.modifiedOn,
broadcastPromise,
broadcastResolve
})
}
}
handleBroadcast(ctx)Enforces order and waits:
const waitPromises: Promise<void>[] = []
for (const tx of txes) {
const queue = this.docTxQueue.get(docId)
const txIndex = queue.findIndex((entry) => entry.txId === tx._id)
// Wait for all previous transactions
for (let i = 0; i < txIndex; i++) {
const prevEntry = queue[i]
if (prevEntry.broadcastPromise !== undefined) {
waitPromises.push(prevEntry.broadcastPromise)
}
}
}
// Block until all previous broadcasts complete
await Promise.all(waitPromises)
// Now broadcast this transaction
await this.next?.handleBroadcast(ctx)
// Mark as complete
for (const tx of txes) {
// Resolve promise and remove from queue
entry.broadcastResolve()
queue.splice(txIndex, 1)
}
txOrdering.test.ts)Simulates the full pipeline flow with mocked next middleware.
getCurrentDoc calls on clientIn your middleware configuration:
import { TxOrderingMiddleware } from '@hcengineering/middleware'
const pipeline = [
// ... other middleware
TxOrderingMiddleware.create()
// ... more middleware
]
/docs/livequery-race-condition-analysis.md - Detailed problem analysis/packages/middleware/src/tests/liveQuery.race.test.ts - Original race condition test/packages/middleware/src/tests/txOrdering.test.ts - Middleware unit tests