packages/cli/src/modules/log-streaming.ee/README.md
The Log Streaming module provides enterprise-grade event logging that forwards n8n internal events to external destinations in real time. This allows organizations to centralize logs, integrate with monitoring solutions, and maintain audit trails.
Key Features:
License Requirement: feat:logStreaming
┌──────────────┐
│ n8n Workflow │  emits events
│  Execution   │─────────────────┐
└──────────────┘                 │
                                 ▼
                    ┌────────────────────────┐
                    │    MessageEventBus     │
                    │   (Event Publisher)    │
                    └────────────┬───────────┘
                                 │ message event
                                 ▼
              ┌─────────────────────────────────────┐
              │  LogStreamingDestinationService     │
              │  - Filters by subscription          │
              │  - Routes to destinations           │
              │  - Handles confirmations            │
              └──────────┬──────────────────────────┘
                         │
           ┌─────────────┼─────────────┐
           │             │             │
           ▼             ▼             ▼
      ┌────────┐    ┌────────┐    ┌────────┐
      │Webhook │    │ Sentry │    │ Syslog │
      │  Dest  │    │  Dest  │    │  Dest  │
      └────┬───┘    └────┬───┘    └────┬───┘
           │             │             │
           └─────────────┴─────────────┘
                         │ with Circuit Breaker
                         ▼
                 External Systems
Abstract base for all destination types. It provides the sendMessage() entry point and circuit breaker protection.

Key Method - sendMessage(): Entry point for all messages. Automatically handles:
- License check (license.isLogStreamingEnabled())
- Subscription check (hasSubscribedToEvent())
- Delegation to receiveFromEventBus() for actual sending

Circuit Breaker: Protects against cascading failures with configurable thresholds for timeout, max failures, and recovery testing.
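The gating sequence and the circuit breaker described above can be sketched as follows. This is a minimal, hypothetical illustration and not the module's actual code: `SimpleCircuitBreaker`, `BreakerOptions`, `GateDeps`, and `sendMessageSketch` are names invented here, and the three callbacks merely stand in for `license.isLogStreamingEnabled()`, `hasSubscribedToEvent()`, and `receiveFromEventBus()`.

```typescript
// Hypothetical, simplified circuit breaker; not n8n's actual implementation.
type BreakerState = 'closed' | 'open' | 'half-open';

interface BreakerOptions {
  maxFailures: number; // failures inside the window before the circuit opens
  failureWindow: number; // ms over which failures are counted
  timeout: number; // ms to stay open before allowing one trial call
}

class SimpleCircuitBreaker {
  private state: BreakerState = 'closed';
  private failures: number[] = []; // timestamps of recent failures
  private openedAt = 0;
  private readonly options: BreakerOptions;
  private readonly now: () => number;

  constructor(options: BreakerOptions, now: () => number = () => Date.now()) {
    this.options = options;
    this.now = now;
  }

  currentState(): BreakerState {
    return this.state;
  }

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      if (this.now() - this.openedAt < this.options.timeout) {
        throw new Error('Circuit is open; call skipped');
      }
      this.state = 'half-open'; // timeout elapsed: allow one trial call
    }
    try {
      const result = await fn();
      // A success closes the circuit and clears the failure history.
      this.state = 'closed';
      this.failures = [];
      return result;
    } catch (error) {
      this.recordFailure();
      throw error;
    }
  }

  private recordFailure(): void {
    const t = this.now();
    // Keep only failures that are still inside the counting window.
    this.failures = this.failures.filter((ts) => t - ts < this.options.failureWindow);
    this.failures.push(t);
    if (this.state === 'half-open' || this.failures.length >= this.options.maxFailures) {
      this.state = 'open';
      this.openedAt = t;
    }
  }
}

// Stand-ins for the checks that sendMessage() is described as performing.
interface GateDeps {
  isLicensed: () => boolean;
  hasSubscribed: (eventName: string) => boolean;
  deliver: (eventName: string) => Promise<boolean>;
}

async function sendMessageSketch(
  breaker: SimpleCircuitBreaker,
  deps: GateDeps,
  eventName: string,
): Promise<boolean> {
  if (!deps.isLicensed()) return false; // 1. license check
  if (!deps.hasSubscribed(eventName)) return false; // 2. subscription check
  return await breaker.execute(() => deps.deliver(eventName)); // 3. guarded delivery
}
```

In this sketch a rejected `deliver` call counts as a failure, which is why a destination is expected to rethrow errors rather than swallow them.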
REST API endpoints:
- GET /eventbus/eventnames - List available events
- GET /eventbus/destination - Get destination(s)
- POST /eventbus/destination - Create destination
- DELETE /eventbus/destination - Delete destination
- GET /eventbus/testmessage - Test destination

Table: event_destinations
{
  id: string;          // UUID
  destination: JSON;   // Serialized destination config
  createdAt: DateTime;
  updatedAt: DateTime;
}
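As a rough illustration of how a serialized destination config might map onto this table, the sketch below round-trips a config through a row. The `SerializedDestination` and `EventDestinationRow` shapes and the `toRow`/`fromRow` helpers are hypothetical, and storing the `destination` column as JSON text is an assumption made only to keep the example self-contained.

```typescript
// Hypothetical shapes for illustration; not the module's entity definitions.
interface SerializedDestination {
  __type: string; // discriminator later used to pick the destination class
  id: string;
  label: string;
  enabled: boolean;
  subscribedEvents: string[];
  [key: string]: unknown; // type-specific settings such as `host`
}

interface EventDestinationRow {
  id: string;
  destination: string; // JSON text in this sketch (assumption)
  createdAt: Date;
  updatedAt: Date;
}

// Build a row from a config, preserving createdAt when updating an existing row.
function toRow(
  config: SerializedDestination,
  now: Date,
  existing?: EventDestinationRow,
): EventDestinationRow {
  return {
    id: config.id,
    destination: JSON.stringify(config),
    createdAt: existing?.createdAt ?? now,
    updatedAt: now,
  };
}

// Recover the config so that `__type` can drive deserialization.
function fromRow(row: EventDestinationRow): SerializedDestination {
  return JSON.parse(row.destination) as SerializedDestination;
}
```

Keeping `__type` inside the serialized config is what lets a loader choose the matching destination class when rows are read back.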
Sends events to HTTP/HTTPS endpoints with authentication support, custom headers, and response validation.
Forwards events to Sentry with automatic severity detection and release tracking.
Sends events to syslog servers in RFC 5424 format via TCP/UDP/TLS.
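For orientation, an RFC 5424 message has the header layout `<PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID STRUCTURED-DATA MSG`, where `PRI = facility * 8 + severity` and `-` marks an absent field. The sketch below formats one such line; `formatRfc5424` and `SyslogFields` are hypothetical names, and this is not how the Syslog destination itself builds messages.

```typescript
// Severity codes from RFC 5424 (subset).
const SEVERITY = { error: 3, warning: 4, notice: 5, info: 6, debug: 7 } as const;
type SeverityName = keyof typeof SEVERITY;

interface SyslogFields {
  facility: number; // 0-23, for example 16 = local0
  severity: SeverityName;
  timestamp: Date;
  hostname: string;
  appName: string;
  procId?: string;
  msgId?: string;
  message: string;
}

function formatRfc5424(f: SyslogFields): string {
  const pri = f.facility * 8 + SEVERITY[f.severity];
  const nil = '-'; // RFC 5424 NILVALUE for absent fields
  return [
    `<${pri}>1`, // PRI followed by protocol version 1
    f.timestamp.toISOString(),
    f.hostname,
    f.appName,
    f.procId ?? nil,
    f.msgId ?? nil,
    nil, // STRUCTURED-DATA omitted in this sketch
    f.message,
  ].join(' ');
}

const exampleLine = formatRfc5424({
  facility: 16,
  severity: 'info',
  timestamp: new Date(Date.UTC(2024, 0, 15, 10, 30, 0)),
  hostname: 'n8n-host',
  appName: 'n8n',
  msgId: 'n8n.workflow.failed',
  message: '{"workflowId":"42"}',
});
// exampleLine:
// <134>1 2024-01-15T10:30:00.000Z n8n-host n8n - n8n.workflow.failed - {"workflowId":"42"}
```

Using the event name as `MSGID` is just one possible choice for the example; the field is limited to 32 printable ASCII characters, so longer names would need shortening.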
Add to packages/workflow/src/Interfaces.ts:
export enum MessageEventBusDestinationTypeNames {
  // ... existing types
  myNewType = 'myNewType',
}

export interface MessageEventBusDestinationMyNewTypeOptions
  extends MessageEventBusDestinationOptions {
  __type: MessageEventBusDestinationTypeNames.myNewType;
  // Add your config properties
  host: string;
  apiKey?: string;
}
Create destinations/message-event-bus-destination-mynewtype.ee.ts:
import { MessageEventBusDestinationTypeNames } from 'n8n-workflow';
import type { MessageWithCallback } from '@/eventbus/message-event-bus/message-event-bus';
import { MessageEventBusDestination } from './message-event-bus-destination.ee';

export class MessageEventBusDestinationMyNewType extends MessageEventBusDestination {
  host: string;

  constructor(eventBusInstance, options) {
    super(eventBusInstance, options);
    this.host = options.host;
    this.__type = MessageEventBusDestinationTypeNames.myNewType;
    // Initialize your client
  }

  async receiveFromEventBus(emitterPayload: MessageWithCallback): Promise<boolean> {
    const { msg, confirmCallback } = emitterPayload;
    try {
      const payload = this.anonymizeAuditMessages ? msg.anonymize() : msg.payload;
      // Send to your destination
      // await this.client.send({ ... });
      confirmCallback(msg, { id: this.id, name: this.label });
      return true;
    } catch (error) {
      this.logger.error(`Failed to send to ${this.label}`, { error });
      throw error; // Triggers circuit breaker
    }
  }

  serialize() {
    return { ...super.serialize(), host: this.host };
  }

  static deserialize(eventBusInstance, data) {
    if (data.__type === MessageEventBusDestinationTypeNames.myNewType) {
      return new MessageEventBusDestinationMyNewType(eventBusInstance, data);
    }
    return null;
  }
}
The confirmCallback is a critical part of the message delivery flow. You must call it when your destination successfully sends a message.
Why it's important:
When to call it:
// ✅ DO: Call after successful delivery
await this.client.send(payload);
confirmCallback(msg, { id: this.id, name: this.label });
// ❌ DON'T: Call before sending (message might fail)
confirmCallback(msg, { id: this.id, name: this.label });
await this.client.send(payload); // What if this fails?
// ❌ DON'T: Forget to call it (delivery won't be tracked)
await this.client.send(payload);
return true; // Missing confirmCallback!
Parameters:
- msg: The original event message being confirmed
- source: Object identifying your destination
  - id: Your destination's unique ID (this.id)
  - name: Human-readable label (this.label)

In destinations/message-event-bus-destination-from-db.ts:
case MessageEventBusDestinationTypeNames.myNewType:
  return MessageEventBusDestinationMyNewType.deserialize(eventBusInstance, destinationData);
In log-streaming.controller.ts:
case MessageEventBusDestinationTypeNames.myNewType:
  result = await this.destinationService.addDestination(
    new MessageEventBusDestinationMyNewType(this.eventBus, body),
  );
  break;
Update packages/@n8n/api-types with a Zod schema for the new destination type and add it to the CreateDestinationDto discriminated union.
Create destinations/__tests__/message-event-bus-destination-mynewtype.ee.test.ts following existing test patterns.
Subscribe to events using glob patterns:
subscribedEvents: [
  '*',                    // All events
  'n8n.workflow.*',       // All workflow events
  'n8n.workflow.failed',  // Specific event
  'n8n.audit.*',          // All audit events
]
Get available events: GET /eventbus/eventnames
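To make the pattern semantics concrete, here is a minimal sketch of glob matching against event names. It assumes `*` matches any run of characters (including dots) and that every other character is literal; `globToRegExp` and `isSubscribed` are hypothetical helpers, and the module's own matching logic may differ.

```typescript
// Escape regex metacharacters in each literal segment, then join segments with `.*`.
function globToRegExp(pattern: string): RegExp {
  const escaped = pattern
    .split('*')
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, '\\$&'))
    .join('.*');
  return new RegExp(`^${escaped}$`);
}

// True when at least one subscription pattern matches the event name.
function isSubscribed(subscribedEvents: string[], eventName: string): boolean {
  return subscribedEvents.some((pattern) => globToRegExp(pattern).test(eventName));
}

const workflowOnly = ['n8n.workflow.*'];
const matchesFailed = isSubscribed(workflowOnly, 'n8n.workflow.failed'); // true
const matchesAudit = isSubscribed(workflowOnly, 'n8n.audit.user.login'); // false
const matchesAll = isSubscribed(['*'], 'n8n.audit.user.login'); // true
```

Anchoring the expression with `^` and `$` keeps an exact pattern such as `n8n.workflow.failed` from also matching longer names that merely start with it.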
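- Throw errors from receiveFromEventBus() to trigger the circuit breaker
- Clean up resources in the close() method
- Respect this.anonymizeAuditMessages
- Test a destination with GET /eventbus/testmessage?id=<destination-id>
- Check that enabled: true is set and the subscription patterns match
- Tune the circuit breaker via maxFailures or failureWindow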