apps/opik-backend/docs/webhook-event-debouncing-implementation.md
This document describes the implementation of webhook alert debouncing in the Opik backend. The system aggregates multiple alert events within a configurable time window and sends consolidated webhook notifications.
┌────────────────────────────────────────────────────────────────────────┐
│ Alert Evaluation │
│ (Checks if events match alert config and adds to bucket) │
└─────────────────────────────────┬──────────────────────────────────────┘
│
↓
┌────────────────────────────────────────────────────────────────────────┐
│ AlertBucketService │
│ (Manages alert event buckets in Redis) │
│ │
│ • addEventToBucket() - Adds event to Redis bucket │
│ • getBucketsReadyToProcess() - Returns buckets past debounce window │
│ • getBucketEventIds() - Retrieves aggregated event IDs │
│ • deleteBucket() - Removes processed bucket │
└─────────────────────────────────┬──────────────────────────────────────┘
│
↓
┌────────────────────────────────────────────────────────────────────────┐
│ AlertJob │
│ (Scheduled job runs every 5 seconds) │
│ │
│ 1. Check buckets ready to process │
│ 2. For each ready bucket: │
│ • Retrieve bucket data (event IDs, workspace ID) │
│ • Fetch alert configuration from AlertService │
│ • Create payload with alert and event data │
│ • Publish webhook via WebhookPublisher │
│ • Delete bucket │
└─────────────────────────────────┬──────────────────────────────────────┘
│
↓
┌────────────────────────────────────────────────────────────────────────┐
│ WebhookPublisher │
│ (Publishes webhooks to message queue) │
│ │
│ • publishWebhookEvent() - Publishes event to webhook queue │
└────────────────────────────────────────────────────────────────────────┘
| Component | Responsibility |
|---|---|
| Alert Evaluation | Checks if event matches alert config, adds to bucket via AlertBucketService |
| AlertBucketService | Stores and retrieves alert event buckets in Redis with workspace context |
| AlertJob | Scheduled job that checks buckets, retrieves alert config from Alert Service, creates payload, publishes webhooks |
| WebhookPublisher | ONLY publishes webhook events - no knowledge of aggregation |
// Alert evaluation logic (to be implemented)
if (eventMatchesAlertConfig(event, alert)) {
    alertBucketService.addEventToBucket(
            alert.getId(),
            workspaceId,
            eventType,
            event.getId()
    ).subscribe();
}
Bucket Key Format: alert_bucket:{alertId}:{eventType}
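Because event types such as `trace:errors` themselves contain a colon, a bucket key cannot be split naively on every `:`. The following is a minimal sketch of building and parsing keys in this format; the class `BucketKeys` and its methods are hypothetical helpers for illustration, not part of the Opik codebase.

```java
import java.util.UUID;

/** Illustrative helper for the alert_bucket:{alertId}:{eventType} format; names are hypothetical. */
final class BucketKeys {
    static final String PREFIX = "alert_bucket";

    private BucketKeys() {}

    /** Builds alert_bucket:{alertId}:{eventType}. */
    static String format(UUID alertId, String eventType) {
        return PREFIX + ":" + alertId + ":" + eventType;
    }

    /** Returns {alertId, eventType}; a split limit of 3 keeps colons inside eventType intact. */
    static String[] parse(String bucketKey) {
        String[] parts = bucketKey.split(":", 3);
        if (parts.length != 3 || !PREFIX.equals(parts[0])) {
            throw new IllegalArgumentException("Not a bucket key: " + bucketKey);
        }
        UUID.fromString(parts[1]); // throws IllegalArgumentException if the alert ID is malformed
        return new String[] {parts[1], parts[2]};
    }
}
```

With this sketch, parsing a key built for event type `trace:errors` yields the alert ID as the first element and `trace:errors`, colon included, as the second.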
Bucket Data Structure:
{
"eventIds": "[\"event-1\", \"event-2\", \"event-3\"]",
"firstSeen": "1704067200000",
"windowSize": "60000",
"workspaceId": "workspace-123"
}
Note: The bucket stores:
- windowSize: The debouncing window (in milliseconds) active when the bucket was created
- workspaceId: The workspace ID for retrieving alert configuration without RequestContext

These fields ensure that configuration changes do not affect existing buckets and allow background jobs to access alert data.

AlertJob (runs every 5 seconds):
@Every("5s")
public class AlertJob extends Job {
    @Override
    public void doJob(JobExecutionContext context) {
        // Check buckets ready to process
        bucketService.getBucketsReadyToProcess()
                .flatMap(this::processBucket)
                .blockLast();
    }
}
When a bucket is ready:
private Mono<Void> processBucket(String bucketKey) {
    // 1. Parse alert ID and event type from the bucket key
    // 2. Read bucket data (aggregated event IDs, workspaceId)
    // 3. Retrieve alert configuration from AlertService using the stored workspaceId
    // 4. Create consolidated webhook event
    // 5. Publish via WebhookPublisher
    // 6. Delete bucket
}
webhook:
  debouncing:
    enabled: true
    windowSize: 60 seconds # Time to wait before sending consolidated notification
    bucketTtl: 3 minutes # Bucket expiration time (safety cleanup)
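The two durations interact: if `bucketTtl` were shorter than `windowSize`, Redis could expire a bucket before AlertJob ever saw it as ready. The sketch below models the settings with `java.time.Duration` and rejects that combination. `DebouncingSettings` is a hypothetical stand-in for the real configuration class, and the validation rule is an assumption drawn from the "safety cleanup" role of the TTL, not something the source states.

```java
import java.time.Duration;

/** Hypothetical holder mirroring the webhook.debouncing settings. */
final class DebouncingSettings {
    final boolean enabled;
    final Duration windowSize;
    final Duration bucketTtl;

    DebouncingSettings(boolean enabled, Duration windowSize, Duration bucketTtl) {
        if (windowSize.isZero() || windowSize.isNegative()) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        // Assumption: the TTL has to outlast the window, otherwise buckets expire unprocessed.
        if (bucketTtl.compareTo(windowSize) <= 0) {
            throw new IllegalArgumentException("bucketTtl must be longer than windowSize");
        }
        this.enabled = enabled;
        this.windowSize = windowSize;
        this.bucketTtl = bucketTtl;
    }

    /** The values shown in the configuration above. */
    static DebouncingSettings defaults() {
        return new DebouncingSettings(true, Duration.ofSeconds(60), Duration.ofMinutes(3));
    }

    /** Window in milliseconds, the unit stored in each bucket. */
    long windowSizeMillis() {
        return windowSize.toMillis();
    }
}
```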
Alert {
id: UUID
name: String
enabled: Boolean
webhook: {
url: String
headers: Map<String, String>
secretToken: String
}
triggers: List<AlertTrigger> {
eventType: AlertEventType
triggerConfigs: List<TriggerConfig>
}
}
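The alert evaluation step is marked "to be implemented", so the following is only one possible reading of `eventMatchesAlertConfig`, based on the model above: an event matches when the alert is enabled and one of its triggers has the same event type. The nested records are simplified stand-ins for `Alert` and `AlertTrigger`, event types are plain strings rather than `AlertEventType`, and `triggerConfigs` is ignored entirely.

```java
import java.util.List;

/** Hypothetical matching rule; the real evaluation logic may differ. */
final class AlertMatching {
    /** Simplified stand-in for AlertTrigger (triggerConfigs omitted). */
    record Trigger(String eventType) {}

    /** Simplified stand-in for Alert (webhook and name omitted). */
    record Alert(String id, boolean enabled, List<Trigger> triggers) {}

    private AlertMatching() {}

    /** True if the alert is enabled and at least one trigger targets the given event type. */
    static boolean eventMatchesAlertConfig(String eventType, Alert alert) {
        if (!alert.enabled() || alert.triggers() == null) {
            return false;
        }
        return alert.triggers().stream()
                .anyMatch(trigger -> trigger.eventType().equals(eventType));
    }
}
```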
AlertBucketService:
public Mono<Void> addEventToBucket(
UUID alertId,
String workspaceId,
AlertEventType eventType,
String eventId) {
String bucketKey = "alert_bucket:" + alertId + ":" + eventType.getValue();
RMapReactive<String, String> bucket = redissonClient.getMap(bucketKey);
long currentWindowSizeMillis = webhookConfig.getDebouncing().getWindowSize().toMilliseconds();
// Add event ID to set
// If first event in bucket:
// - Store firstSeen timestamp
// - Store windowSize (preserves config at creation time)
// - Store workspaceId (enables background job to retrieve alert)
// - Set bucket TTL (ONLY on first event, NOT refreshed on subsequent events)
// Subsequent events:
// - DO NOT update windowSize or workspaceId (preserves original config and context)
// - DO NOT refresh TTL (bucket expires based on original TTL)
}
public Flux<String> getBucketsReadyToProcess() {
// Scan all alert_bucket:* keys
// For each bucket:
// - Retrieve firstSeen timestamp AND windowSize from bucket
// - If (now - firstSeen) >= stored windowSize
// - Return bucket key
// Note: Uses the windowSize stored in the bucket, not current config
}
public Mono<BucketData> getBucketData(String bucketKey) {
// Retrieves complete bucket data including:
// - eventIds: Set of aggregated event IDs
// - firstSeen: Timestamp when first event was added
// - windowSize: Debouncing window size (milliseconds)
// - workspaceId: Workspace ID for accessing alert configuration
}
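The first-event-only semantics described in the comments above can be illustrated without Redis or Reactor. The sketch below is an in-memory stand-in, not the Redisson-backed implementation: `InMemoryBucketStore` and its method names are hypothetical, the clock is passed in explicitly so behavior is easy to check, and TTL handling is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** In-memory stand-in for the Redis-backed bucket store; all names are illustrative. */
final class InMemoryBucketStore {

    /** One bucket: values fixed by the first event, plus the growing set of event IDs. */
    static final class Bucket {
        final long firstSeenMillis;
        final long windowSizeMillis;
        final String workspaceId;
        final Set<String> eventIds = new LinkedHashSet<>();

        Bucket(long firstSeenMillis, long windowSizeMillis, String workspaceId) {
            this.firstSeenMillis = firstSeenMillis;
            this.windowSizeMillis = windowSizeMillis;
            this.workspaceId = workspaceId;
        }
    }

    private final Map<String, Bucket> buckets = new LinkedHashMap<>();

    /** Adds an event; firstSeen, windowSize and workspaceId are written only when the bucket is created. */
    void addEvent(String bucketKey, String workspaceId, String eventId,
                  long nowMillis, long currentWindowSizeMillis) {
        buckets.computeIfAbsent(bucketKey,
                        key -> new Bucket(nowMillis, currentWindowSizeMillis, workspaceId))
                .eventIds.add(eventId);
    }

    /** Returns keys whose stored window has elapsed, regardless of the current configuration. */
    List<String> readyToProcess(long nowMillis) {
        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, Bucket> entry : buckets.entrySet()) {
            Bucket bucket = entry.getValue();
            if (nowMillis - bucket.firstSeenMillis >= bucket.windowSizeMillis) {
                ready.add(entry.getKey());
            }
        }
        return ready;
    }

    Bucket get(String bucketKey) {
        return buckets.get(bucketKey);
    }

    void delete(String bucketKey) {
        buckets.remove(bucketKey);
    }
}
```

In this sketch, a bucket created while the window is 60000 ms keeps that value even if later events arrive after the configured window has changed; only buckets created after the change pick up the new value.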
Event Timeline:
t0: First event arrives → Store firstSeen = t0, windowSize = 60000ms, Set TTL = 3 minutes
t1: Second event arrives → Keep firstSeen = t0, Keep windowSize = 60000ms, Keep TTL (not refreshed)
t2: Third event arrives → Keep firstSeen = t0, Keep windowSize = 60000ms, Keep TTL (not refreshed)
...
t60: AlertJob runs → (t60 - t0) >= 60s → Send consolidated webhook
t60+: Bucket deleted by AlertJob after processing
(If not deleted by AlertJob, Redis automatically expires bucket after TTL)
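Since AlertJob polls every 5 seconds, the consolidated webhook is not sent exactly when the window closes but at the first job run after that point. The following sketch works through that arithmetic under simplifying assumptions: job runs happen exactly on schedule, processing is instantaneous, and `DebounceTiming` with its methods is a hypothetical helper.

```java
/** Worked arithmetic for the timeline; the polling model is a simplifying assumption. */
final class DebounceTiming {
    private DebounceTiming() {}

    /**
     * First job run at which a bucket is ready, given runs at firstRun, firstRun + interval, ...
     * A bucket is ready once (now - firstSeen) >= windowSize.
     */
    static long firstReadyRunMillis(long firstSeenMillis, long windowSizeMillis,
                                    long firstRunMillis, long intervalMillis) {
        long readyAt = firstSeenMillis + windowSizeMillis;
        if (readyAt <= firstRunMillis) {
            return firstRunMillis;
        }
        // Ceiling division: number of intervals needed to reach or pass readyAt.
        long runsNeeded = (readyAt - firstRunMillis + intervalMillis - 1) / intervalMillis;
        return firstRunMillis + runsNeeded * intervalMillis;
    }

    /** True while the TTL, set once at bucket creation and never refreshed, has not run out. */
    static boolean aliveAt(long firstSeenMillis, long ttlMillis, long nowMillis) {
        return nowMillis - firstSeenMillis < ttlMillis;
    }
}
```

For example, with job runs at multiples of 5 seconds, a bucket whose first event arrived at 2 s becomes ready at 62 s and is picked up by the run at 65 s. Under these assumptions the delay lies between `windowSize` and `windowSize` plus one polling interval, which is well inside the 3 minute TTL.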
TTL Behavior:
- The bucketTtl configuration (default: 3 minutes) is the safety cleanup mechanism

When the debouncing windowSize configuration is changed, the system handles existing and new buckets correctly:
Scenario:
Behavior:
Bucket A (created with 60s window):
New Bucket B (created after config change):
Key Implementation Details:
- addEventToBucket() stores windowSize only for the first event in a bucket
- Subsequent events do not update windowSize
- getBucketsReadyToProcess() reads the windowSize from each bucket (not from current config)

Test Coverage:
See AlertBucketServiceTest.java for tests covering this behavior.
{
"id": "alert-{alertId}-{uuid}",
"eventType": "alert.fired",
"alertId": "{alertId}",
"alertName": "High Error Rate",
"workspaceId": "{workspaceId}",
"userName": "system",
"url": "{webhookUrl}",
"payload": {
"alertId": "{alertId}",
"alertName": "High Error Rate",
"eventType": "trace:errors",
"eventIds": ["event-1", "event-2", "event-3"],
"eventCount": 3,
"aggregationType": "consolidated",
"message": "Alert 'High Error Rate': 3 trace:errors events aggregated"
},
"headers": {
"X-Custom-Header": "value"
}
}
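The `payload` object in the event above is derived entirely from the alert and the aggregated event IDs. The sketch below shows one way to assemble it; `AlertPayloads.consolidated` is a hypothetical helper, and the message format is inferred from the single example shown, so the real builder may differ.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical builder for the "payload" object of a consolidated alert event. */
final class AlertPayloads {
    private AlertPayloads() {}

    static Map<String, Object> consolidated(String alertId, String alertName,
                                            String eventType, List<String> eventIds) {
        Map<String, Object> payload = new LinkedHashMap<>(); // keeps field order stable
        payload.put("alertId", alertId);
        payload.put("alertName", alertName);
        payload.put("eventType", eventType);
        payload.put("eventIds", List.copyOf(eventIds));
        payload.put("eventCount", eventIds.size());
        payload.put("aggregationType", "consolidated");
        // Message format inferred from the example event; treat it as an assumption.
        payload.put("message", String.format("Alert '%s': %d %s events aggregated",
                alertName, eventIds.size(), eventType));
        return payload;
    }
}
```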
AlertJob does not run concurrently with itself (@DisallowConcurrentExecution).
1. getBucketsReadyToProcess() → Flux<String> of bucket keys
2. For each bucketKey:
   a. Parse alertId and eventType
   b. Get bucket data (aggregated event IDs, workspaceId)
   c. Retrieve Alert configuration
   d. Create consolidated webhook event
   e. Publish via WebhookPublisher
   f. Delete bucket
3. Error handling: Continue on error, log failures
bucketService.getBucketsReadyToProcess()
        .flatMap(this::processBucket)
        .onErrorContinue((throwable, bucketKey) -> {
            log.error("Failed to process bucket '{}': {}",
                    bucketKey, throwable.getMessage(), throwable);
        })
        .blockLast();
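The intent of the pipeline above is that one failing bucket must not stop the remaining buckets from being processed. As a plain-Java analogue (not the Reactor pipeline itself), the sketch below processes each key in a loop, logs failures, and keeps going; `ContinueOnError` and `processAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java analogue of continue-on-error processing; illustrative only. */
final class ContinueOnError {
    private ContinueOnError() {}

    /** Runs the processor for every key and returns the keys whose processing failed. */
    static List<String> processAll(List<String> bucketKeys, Consumer<String> processor) {
        List<String> failed = new ArrayList<>();
        for (String bucketKey : bucketKeys) {
            try {
                processor.accept(bucketKey);
            } catch (RuntimeException e) {
                // Log and move on so the remaining buckets are still handled.
                System.err.printf("Failed to process bucket '%s': %s%n", bucketKey, e.getMessage());
                failed.add(bucketKey);
            }
        }
        return failed;
    }
}
```

Because deleting the bucket is the last processing step, a bucket that fails before that point would stay in Redis and be picked up again by a later job run, until its TTL expires.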
// If alert is disabled
if (!alert.enabled()) {
    log.warn("Alert '{}' is disabled, skipping webhook", alert.id());
    return Mono.empty();
}
// If webhook configuration is missing
if (alert.webhook() == null || alert.webhook().url() == null) {
    log.error("Alert '{}' has no webhook configuration, skipping", alert.id());
    return Mono.empty();
}
return webhookPublisher.publishWebhookEvent(webhookEvent)
        .doOnSuccess(__ -> log.info("Successfully published webhook for alert '{}'", alert.id()))
        .doOnError(error -> log.error("Failed to publish webhook for alert '{}': {}",
                alert.id(), error.getMessage(), error));
@Test
void shouldAddEventToBucket() {
// Test event addition to Redis bucket
}
@Test
void shouldReturnBucketsReadyToProcess() {
// Test bucket readiness based on firstSeen timestamp
}
@Test
void shouldProcessBucketAndSendWebhook() {
// Test AlertJob processing and webhook sending
}
@Test
void shouldHandleDisabledAlert() {
// Test skipping webhooks for disabled alerts
}
- AlertBucketService.java - Redis bucket management
- AlertJob.java - Scheduled job for processing buckets
- WebhookPublisher.java - Webhook event publisher
- WebhookConfig.java - Debouncing configuration
- AlertService.java - Alert configuration service (includes getByIdAndWorkspace)
- Alert.java - Alert configuration
- AlertTrigger.java - Alert trigger configuration
- WebhookEvent.java - Webhook event structure
- WebhookEventTypes.java - Event type enum (includes ALERT_FIRED)
- AlertService.java - Alert CRUD operations
- AlertDAO.java - Alert database access

- workspaceId storage in buckets for background job access
- getByIdAndWorkspace() method to AlertService
- getBucketData() to retrieve complete bucket information
- WebhookPublisher instead of WebhookSubscriber
- windowSize
- AlertBucketServiceTest.java
- windowSize field
- getBucketsReadyToProcess() to use stored window size instead of current config