v3/docs/adr/ADR-109-receive-side-dispatch.md
In alpha.9, the federation plugin auto-binds a transport listener (transport.listen(port)) when config.port is set. Inbound bytes arrive at the WebSocket server and are queued in the transport's per-address message queue. But the coordinator never wakes up to consume them. Federation today is one-directional in a meaningful sense: peers can SEND to each other (transport.send works end-to-end), but the receiver's coordinator doesn't know any envelopes arrived.
Concretely, the WebSocketFallbackTransport's onmessage handler in agentic-flow/transport/quic-loader.ts:
    ws.on('message', (raw: RawData) => {
      const message = JSON.parse(raw.toString()) as AgentMessage;
      const queue = this.messageQueue.get(remoteAddr) ?? [];
      queue.push(message);
      this.messageQueue.set(remoteAddr, queue);
    });
The handler pushes to the in-memory queue, full stop. The federation plugin doesn't poll, doesn't subscribe, doesn't dispatch.
Add a receive loop in plugin.ts that:
- after transport.listen() succeeds, registers an inbound message handler
- for each inbound AgentMessage, reconstructs the FederationEnvelope from the payload field (the sender wrapped it there in sendToNode)
- routes on the envelope's messageType:
  - messageType: 'task' | 'task-assignment' → emit federation:inbound-task event
  - messageType: 'memory-query' → emit federation:inbound-query event
  - messageType: 'context-share' → store in PII-scrubbed inbound context
  - any other messageType → message_received with metadata.unknown=true

Inbound messages are integrator-routed. The federation plugin's job is to deliver the envelope safely (verified, scrubbed, audited) and let the host application decide what to do with it. The plugin's eventBus.emit('federation:inbound-task', envelope) is the contract; the integrator subscribes via context.eventBus.on(...).
This keeps the plugin responsibility-bounded: it's the trusted boundary between wire and app, not a task scheduler.
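The integrator side of that contract might look like the sketch below. It is illustrative only: Node's `EventEmitter` stands in for `context.eventBus`, the `FederationEnvelope` fields are an assumed shape, and `registerInboundTaskHandler` with its allow-list policy is a hypothetical host-application helper, not part of the plugin.

```typescript
import { EventEmitter } from 'node:events';

// Assumed envelope shape; the real FederationEnvelope may carry more fields.
interface FederationEnvelope {
  envelopeId: string;
  sourceNodeId: string;
  messageType: string;
  payload: unknown;
}

// Hypothetical host-side policy: queue tasks only from allow-listed peers.
function registerInboundTaskHandler(
  eventBus: EventEmitter,
  allowedPeers: ReadonlySet<string>,
  taskQueue: FederationEnvelope[],
): void {
  eventBus.on('federation:inbound-task', (envelope: FederationEnvelope) => {
    if (!allowedPeers.has(envelope.sourceNodeId)) {
      return; // the plugin delivered it; the host chooses to ignore it
    }
    taskQueue.push(envelope);
  });
}

// Usage: EventEmitter stands in for context.eventBus.
const eventBus = new EventEmitter();
const taskQueue: FederationEnvelope[] = [];
registerInboundTaskHandler(eventBus, new Set(['node-a']), taskQueue);

// Simulate what the plugin would emit after verifying two envelopes.
eventBus.emit('federation:inbound-task', {
  envelopeId: 'e1', sourceNodeId: 'node-a', messageType: 'task', payload: { goal: 'index repo' },
});
eventBus.emit('federation:inbound-task', {
  envelopeId: 'e2', sourceNodeId: 'node-z', messageType: 'task', payload: {},
});
```

Scheduling, prioritisation, and rejection all live in the subscriber, which is what keeps the plugin itself out of the task-scheduling business.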
onInboundMessage to the transport interface
The current AgentTransport interface (in agentic-flow/transport/loader) doesn't expose an inbound subscription. We need to extend it WITHOUT breaking existing consumers:
    // New optional method on AgentTransport
    onMessage?(handler: (address: string, message: AgentMessage) => void | Promise<void>): void;
Implementation in WebSocketFallbackTransport: add a private messageHandlers: Set<...> set, fire each registered handler on every onmessage. Keep the existing queue-based receive() API for callers that prefer poll over push.
Companion change to upstream agentic-flow's PR #153 (already open). Federation plugin uses optional chaining (transport.onMessage?.(...)) so it gracefully degrades if running against an older agentic-flow that doesn't have the hook yet.
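The graceful-degradation path can be sketched as follows. The `AgentMessage` fields and the `subscribeInbound` helper are assumptions made for illustration; only the optional `onMessage` hook itself comes from this ADR.

```typescript
// Assumed minimal message shape; the real AgentMessage has more fields.
interface AgentMessage {
  id: string;
  type: string;
  payload: unknown;
}

type InboundHandler = (address: string, message: AgentMessage) => void | Promise<void>;

interface AgentTransport {
  send(address: string, message: AgentMessage): Promise<void>;
  // Optional: absent on older agentic-flow builds.
  onMessage?(handler: InboundHandler): void;
}

// Hypothetical helper: subscribes when the hook exists, otherwise warns and
// reports that inbound dispatch is unavailable. It never throws.
function subscribeInbound(
  transport: AgentTransport,
  handler: InboundHandler,
  warn: (msg: string) => void,
): boolean {
  if (typeof transport.onMessage !== 'function') {
    warn('transport has no onMessage hook; inbound dispatch disabled');
    return false;
  }
  transport.onMessage(handler);
  return true;
}

// Usage: one transport without the hook, one with it.
const registered: InboundHandler[] = [];
const warnings: string[] = [];
const legacyTransport: AgentTransport = { send: async () => {} };
const pushTransport: AgentTransport = {
  send: async () => {},
  onMessage: (h) => { registered.push(h); },
};
const noopHandler: InboundHandler = () => {};

const legacySubscribed = subscribeInbound(legacyTransport, noopHandler, (m) => warnings.push(m));
const pushSubscribed = subscribeInbound(pushTransport, noopHandler, (m) => warnings.push(m));
```

Against the older transport the plugin stays send-only, which matches alpha.9 behaviour rather than failing at startup.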
    type InboundHandler = (address: string, message: AgentMessage) => void | Promise<void>;
address is the sender's address (e.g. 192.168.1.42:54321 from the WS upgrade headers). message.metadata.sourceNodeId is the cryptographic identity claim — handler must verify the signature against discovery.getPeer(sourceNodeId).publicKey before trusting any other field.
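The verify-before-trust rule can be illustrated with Node's built-in crypto. This is a sketch under stated assumptions: Ed25519 keys, a base64 signature in a hypothetical `metadata.signature` field, and a signature computed over the serialized payload string. The plugin's actual scheme (`verifyBytes`) may differ on all three points.

```typescript
import { generateKeyPairSync, sign, verify, KeyObject } from 'node:crypto';

// Assumed shapes for illustration only.
interface SignedAgentMessage {
  payload: string; // serialized envelope; assumed to be the signed bytes
  metadata: { sourceNodeId: string; signature: string }; // signature: base64 (assumed field)
}
interface Peer {
  publicKey: KeyObject;
}
type PeerLookup = (nodeId: string) => Peer | undefined;

// Returns true only if the claimed sourceNodeId is a known peer AND the
// signature verifies against that peer's public key.
function isAuthentic(message: SignedAgentMessage, getPeer: PeerLookup): boolean {
  const peer = getPeer(message.metadata.sourceNodeId);
  if (!peer) return false;
  return verify(
    null, // Ed25519 takes no separate digest algorithm
    Buffer.from(message.payload, 'utf8'),
    peer.publicKey,
    Buffer.from(message.metadata.signature, 'base64'),
  );
}

// Usage: a correctly signed message, a tampered one, and a spoofed identity.
const { publicKey, privateKey } = generateKeyPairSync('ed25519');
const peers = new Map<string, Peer>([['node-a', { publicKey }]]);
const getPeer: PeerLookup = (id) => peers.get(id);

const payload = JSON.stringify({ messageType: 'task', body: 'ping' });
const signed: SignedAgentMessage = {
  payload,
  metadata: {
    sourceNodeId: 'node-a',
    signature: sign(null, Buffer.from(payload, 'utf8'), privateKey).toString('base64'),
  },
};
const tampered: SignedAgentMessage = { ...signed, payload: payload.replace('ping', 'pong') };
const spoofed: SignedAgentMessage = {
  ...signed,
  metadata: { ...signed.metadata, sourceNodeId: 'node-b' },
};
```

The transport-level `address` plays no part in the check: it identifies a socket, not a node, so only the key lookup by `sourceNodeId` decides trust.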
onMessage hook (companion to PR #153)
In agentic-flow/src/transport/quic-loader.ts:
    private messageHandlers = new Set<(address: string, message: AgentMessage) => void | Promise<void>>();
    onMessage(handler: (address: string, message: AgentMessage) => void | Promise<void>): void {
      this.messageHandlers.add(handler);
    }
    // In the existing onmessage callbacks (both server-side and client-side):
    ws.on('message', (raw: RawData) => {
      try {
        const message = JSON.parse(raw.toString()) as AgentMessage;
        // Existing queue push (preserves receive() API)
        const queue = this.messageQueue.get(addr) ?? [];
        queue.push(message);
        this.messageQueue.set(addr, queue);
        // New: fan out to handlers
        for (const h of this.messageHandlers) {
          // Call h inside a Promise executor so that a synchronous throw becomes
          // a rejection too, instead of aborting the loop and skipping later handlers.
          new Promise<void>((resolve) => resolve(h(addr, message))).catch((err) =>
            logger.warn('Inbound handler threw', { addr, err })
          );
        }
      } catch (err) { /* ... */ }
    });
In v3/@claude-flow/plugin-agent-federation/src/plugin.ts, after transport.listen():
    if (transport && typeof transport.onMessage === 'function') {
      transport.onMessage(async (address, message) => {
        await dispatchInbound(address, message, {
          coordinator: this.coordinator!,
          discovery,
          audit,
          verifyEnvelope: verifyBytes,
          eventBus: context.eventBus,
          logger: context.logger,
        });
      });
    }
dispatchInbound to a new file
src/application/inbound-dispatcher.ts:
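A minimal sketch of what such a dispatcher could look like, following the routing and audit rules described in this ADR. Everything beyond the event names, the audit event names, and `PEER_SUSPENDED` is an assumption: the dependency shapes, the `storeContext` hook, the `metadata.signature` field, the `suspended` flag on a peer, and the `UNKNOWN_PEER` / `BAD_SIGNATURE` reason codes are all hypothetical.

```typescript
interface FederationEnvelope { messageType: string; body: unknown }
interface AgentMessage {
  id: string;
  payload: FederationEnvelope; // sender wrapped the envelope here
  metadata: { sourceNodeId: string; signature: string };
}
interface Peer { publicKey: string; suspended: boolean }
interface InboundDeps {
  discovery: { getPeer(nodeId: string): Peer | undefined };
  audit: { log(event: string, detail: Record<string, unknown>): void };
  verifyEnvelope(
    envelope: FederationEnvelope, signature: string, publicKey: string,
  ): boolean | Promise<boolean>;
  eventBus: { emit(event: string, payload: unknown): void };
  storeContext(envelope: FederationEnvelope): void; // hypothetical PII-scrubbed store
}
type Outcome = 'delivered' | 'rejected';

async function dispatchInbound(
  address: string, message: AgentMessage, deps: InboundDeps,
): Promise<Outcome> {
  const { sourceNodeId, signature } = message.metadata;
  const reject = (reason: string): Outcome => {
    deps.audit.log('message_rejected', { address, sourceNodeId, reason });
    return 'rejected';
  };

  // Nothing is trusted until the peer is known, active, and the signature checks out.
  const peer = deps.discovery.getPeer(sourceNodeId);
  if (!peer) return reject('UNKNOWN_PEER');
  if (peer.suspended) return reject('PEER_SUSPENDED');
  const envelope = message.payload;
  if (!(await deps.verifyEnvelope(envelope, signature, peer.publicKey))) {
    return reject('BAD_SIGNATURE');
  }

  let unknown = false;
  switch (envelope.messageType) {
    case 'task':
    case 'task-assignment':
      deps.eventBus.emit('federation:inbound-task', envelope);
      break;
    case 'memory-query':
      deps.eventBus.emit('federation:inbound-query', envelope);
      break;
    case 'context-share':
      deps.storeContext(envelope);
      break;
    default:
      unknown = true;
      deps.eventBus.emit('federation:inbound', envelope);
  }
  deps.audit.log('message_received', { address, sourceNodeId, unknown });
  return 'delivered';
}

// Usage with in-memory fakes.
const auditLog: string[] = [];
const emitted: string[] = [];
const deps: InboundDeps = {
  discovery: {
    getPeer: (id) => (id === 'node-a' ? { publicKey: 'pk-a', suspended: false } : undefined),
  },
  audit: { log: (event) => { auditLog.push(event); } },
  verifyEnvelope: (_envelope, signature) => signature === 'valid', // fake verifier
  eventBus: { emit: (event) => { emitted.push(event); } },
  storeContext: () => {},
};
```

The sketch does not isolate errors thrown by event subscribers; a fuller version would catch them and record an audit entry so the listener keeps running.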
__tests__/unit/inbound-dispatcher.test.ts:
- … message_received + event emitted
- … message_rejected + event NOT emitted
- … PEER_SUSPENDED (defense-in-depth: outbound side should already short-circuit, but receive side enforces too)
- … federation:inbound with metadata.unknown=true

- … message.id / metadata. Dispatcher emits the event, integrator's handler sends the reply via coordinator.sendMessage.
- … message_rejected)
- … message_rejected)
- dispatchInbound is async-tolerant: handler errors surface as audit log entries, never crash the listener

| Step | Status |
|---|---|
| Upstream onMessage hook in agentic-flow | Implemented this iteration (companion commit to PR #153) |
| inbound-dispatcher.ts | Implemented |
| plugin.ts subscription wiring | Implemented |
| Tests | Implemented (5 specs) |
| Validated mac↔ruvultra round-trip with both directions | Implemented — alpha.10 release smoke |
Re-open when: