doc/source/ray-core/internals/ray-event-exporter.rst
.. _ray-event-exporter:
This document is based on Ray version 2.52.1.
Ray's event exporting infrastructure collects events from C++ components (GCS, workers) and Python components, buffers and merges them, and exports them to external HTTP services. This document explains how events flow through the system from creation to final export.
Ray's event system uses a multi-stage pipeline:

- C++ components (GCS, workers) create events through `RayEventInterface <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/observability/ray_event_interface.h#L24>`__. Raylet does not emit any Ray events, but there are no technical limitations preventing it from doing so.
- The `AggregatorAgent <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/python/ray/dashboard/modules/aggregator/aggregator_agent.py>`__ receives and buffers events.

The following diagram shows the high-level flow:

.. code-block:: text

   C++ Components (GCS, workers)
       ↓ (Create events via RayEventInterface)
   RayEventRecorder (C++)
       ↓ (Buffer & merge events)
       ↓ (gRPC export via EventAggregatorClient)
   AggregatorAgent (Python)
       ↓ (Add to MultiConsumerEventBuffer)
   RayEventPublisher
       ↓ (Filter & convert to JSON)
       ↓ (HTTP POST)
   External HTTP Service

Ray events are structured using protobuf messages with a base RayEvent message that contains event-specific nested messages.

Event Types
~~~~~~~~~~~
Events are categorized by type, defined in the `EventType` enum in `events_base_event.proto <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/protobuf/public/events_base_event.proto#L49>`__:
- **TASK_DEFINITION_EVENT**: Task definition information
- **TASK_LIFECYCLE_EVENT**: Task state transitions (this covers both normal tasks and actor tasks)
- **ACTOR_TASK_DEFINITION_EVENT**: Actor task definition
- **ACTOR_DEFINITION_EVENT**: Actor definition
- **ACTOR_LIFECYCLE_EVENT**: Actor state transitions
- **DRIVER_JOB_DEFINITION_EVENT**: Driver job definition
- **DRIVER_JOB_LIFECYCLE_EVENT**: Driver job state transitions
- **NODE_DEFINITION_EVENT**: Node definition
- **NODE_LIFECYCLE_EVENT**: Node state transitions
- **TASK_PROFILE_EVENT**: Task profiling data

Event Structure
~~~~~~~~~~~~~~~

The base `RayEvent <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/protobuf/public/events_base_event.proto#L32>`__ message contains:

- Event-specific nested messages (e.g., `task_definition_event`, `actor_lifecycle_event`)

Entity ID Concept
~~~~~~~~~~~~~~~~~
The entity ID is a unique identifier for the entity associated with an event. It's used for two purposes:
1. **Association**: Links execution events with definition events (e.g., task lifecycle events with task definition events)
2. **Merging**: Groups events with the same entity ID and type for merging before export
For example:
- Task events use `task_id + task_attempt` as the entity ID
- Actor events use `actor_id` as the entity ID
- Driver job events use `job_id` as the entity ID
Event Recording and Buffering (C++ Side)
-----------------------------------------
C++ components record events through the `RayEventRecorder <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/observability/ray_event_recorder.h>`__ class, which provides thread-safe event buffering and export.
RayEventRecorder
~~~~~~~~~~~~~~~~
The `RayEventRecorder` is a thread-safe event recorder that:
- Maintains a bounded circular buffer for events
- Merges events with the same entity ID and type before export
- Periodically exports events via gRPC to the aggregator agent using `EventAggregatorClient <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/rpc/event_aggregator_client.h>`__
- Tracks dropped events when the buffer is full
Adding Events
~~~~~~~~~~~~~
Events are added to the recorder via the `AddEvents() <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/observability/ray_event_recorder.cc#L92>`__ method, which accepts a vector of `RayEventInterface` pointers. The method:
1. Checks if event recording is enabled (via `enable_ray_event` config)
2. Calculates if adding events would exceed the buffer size
3. Drops old events if necessary and records metrics for dropped events
4. Adds new events to the circular buffer
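
The four steps above can be sketched in Python. This is only an illustration of the drop-oldest behavior, not the C++ implementation: the `BoundedEventBuffer` class, its `enabled` flag, and the `dropped` counter are hypothetical stand-ins for the recorder, the `enable_ray_event` config, and the dropped-events metric.

.. code-block:: python

   from collections import deque


   class BoundedEventBuffer:
       """Hypothetical stand-in for the recorder's bounded circular buffer."""

       def __init__(self, max_events: int, enabled: bool = True):
           self.enabled = enabled
           self.events = deque(maxlen=max_events)
           self.dropped = 0  # stands in for the dropped-events metric

       def add_events(self, new_events):
           # Step 1: do nothing when event recording is disabled.
           if not self.enabled:
               return
           # Step 2: work out how many events would not fit in the buffer.
           overflow = len(self.events) + len(new_events) - self.events.maxlen
           # Step 3: count the events that are about to be dropped.
           if overflow > 0:
               self.dropped += overflow
           # Step 4: append; a full deque discards its oldest entries.
           self.events.extend(new_events)


   buf = BoundedEventBuffer(max_events=3)
   buf.add_events(["a", "b"])
   buf.add_events(["c", "d"])  # "a" is dropped to make room

After the second call the buffer holds `b`, `c`, and `d`, and the drop counter is 1.
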

Buffer Management
~~~~~~~~~~~~~~~~~

The recorder uses a `boost::circular_buffer <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/observability/ray_event_recorder.h#L66>`__ to store events. When the buffer is full:

- The oldest events are dropped to make room for new ones
- Dropped events are counted by the `dropped_events_counter` metric
- The buffer size is set with the `RAY_ray_event_recorder_max_queued_events` environment variable

Events are exported from C++ components to the aggregator agent using gRPC. The export process is initiated by calling `StartExportingEvents() <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/observability/ray_event_recorder.cc#L37>`__.

StartExportingEvents
~~~~~~~~~~~~~~~~~~~~

This method:
1. Checks if event recording is enabled
2. Verifies it hasn't been called before (should only be called once)
3. Sets up a `PeriodicalRunner` to periodically call `ExportEvents()`
4. Uses the configured export interval (`ray_events_report_interval_ms`)

ExportEvents Process
~~~~~~~~~~~~~~~~~~~~

The `ExportEvents() <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/observability/ray_event_recorder.cc#L52>`__ method performs the following steps:

1. Merges events with the same entity ID and type using the `Merge() <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/observability/ray_event_interface.h#L55>`__ method
2. Serializes each event to a `RayEvent` protobuf via `Serialize() <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/observability/ray_event_interface.h#L58>`__
3. Sends the events to the aggregator agent via `EventAggregatorClient::AddEvents() <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/rpc/event_aggregator_client.h>`__

Event Merging Logic
~~~~~~~~~~~~~~~~~~~

Event merging is an optimization that reduces data size by combining related events. Events with the same entity ID and type are merged:
- **Definition Events**: Typically don't change when merged (e.g., actor definition)
- **Lifecycle Events**: State transitions are appended to form a time series (e.g., task state transitions: started → running → completed)
The merging maintains the order of events while combining them into a single event with all state transitions.
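
To make the merge rule concrete, here is a minimal Python sketch. It assumes events are plain dictionaries with hypothetical `entity_id`, `event_type`, and `state_transitions` keys; the real logic lives in the C++ `Merge()` implementations and operates on event objects, so treat this purely as an illustration of grouping by entity ID and type.

.. code-block:: python

   def merge_events(events):
       """Merge events that share (entity_id, event_type), keeping arrival order."""
       merged = {}  # dicts preserve insertion order, so first-seen order is kept
       for event in events:
           key = (event["entity_id"], event["event_type"])
           transitions = event.get("state_transitions", [])
           if key not in merged:
               # First event for this key: copy it so the input is not mutated.
               merged[key] = {**event, "state_transitions": list(transitions)}
           else:
               # Later event for the same key: append its state transitions.
               merged[key]["state_transitions"].extend(transitions)
       return list(merged.values())


   events = [
       {"entity_id": "task-1:0", "event_type": "TASK_LIFECYCLE_EVENT",
        "state_transitions": ["started"]},
       {"entity_id": "actor-7", "event_type": "ACTOR_DEFINITION_EVENT"},
       {"entity_id": "task-1:0", "event_type": "TASK_LIFECYCLE_EVENT",
        "state_transitions": ["running", "completed"]},
   ]
   merged = merge_events(events)

Here the two task lifecycle events collapse into one event whose transitions are `started`, `running`, `completed`, while the actor definition event passes through unchanged.
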
Error Handling
~~~~~~~~~~~~~~
If the gRPC export fails:
- An error is logged
- The process continues (doesn't crash)
- The next export interval will attempt to send events again
- Events remain in the buffer until successfully exported (or the buffer is full and old events are dropped)
Event Reception and Buffering (Python Side)
---------------------------------------------
The `AggregatorAgent <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/python/ray/dashboard/modules/aggregator/aggregator_agent.py>`__ receives events from C++ components via a gRPC service and buffers them for publishing.
AggregatorAgent
~~~~~~~~~~~~~~~
The `AggregatorAgent` is a dashboard agent module that:
- Implements `EventAggregatorServiceServicer` for gRPC event reception
- Maintains a `MultiConsumerEventBuffer` for event storage
- Manages `RayEventPublisher` instances for publishing to external HTTP endpoints
- Tracks metrics for received events and for buffer and publisher operations

AddEvents gRPC Handler
~~~~~~~~~~~~~~~~~~~~~~

The `AddEvents() <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/python/ray/dashboard/modules/aggregator/aggregator_agent.py#L165>`__ method is the gRPC handler that receives events:

- Adds each received event to the `MultiConsumerEventBuffer` via `add_event() <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/python/ray/dashboard/modules/aggregator/multi_consumer_event_buffer.py#L62>`__

MultiConsumerEventBuffer
~~~~~~~~~~~~~~~~~~~~~~~~

The `MultiConsumerEventBuffer <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/python/ray/dashboard/modules/aggregator/multi_consumer_event_buffer.py>`__ is an asyncio-friendly buffer that:
- **Supports Multiple Consumers**: Each consumer has an independent cursor index. RayEventPublisher and other consumers share this same buffer.
- **Tracks Evictions**: When the buffer is full, oldest events are dropped and tracked per consumer
- **Bounded Buffer**: Uses `deque` with `maxlen` to limit buffer size
- **Asyncio-Safe**: Uses `asyncio.Lock` and `asyncio.Condition` for synchronization
Key operations:
- **add_event()**: Adds an event to the buffer, dropping oldest if full
- **wait_for_batch()**: Waits for a batch of events up to `max_batch_size`, with timeout. The timeout only applies when there is at least one event in the buffer. If the buffer is empty, `wait_for_batch()` can block indefinitely.
- **register_consumer()**: Registers a new consumer with a unique name
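
The per-consumer cursor and eviction bookkeeping can be sketched as follows. This is a deliberately simplified model and not Ray's class: `TinyMultiConsumerBuffer` and its attributes are hypothetical, and `wait_for_batch()` here has no timeout. It blocks until the consumer has at least one unread event and then returns whatever is available.

.. code-block:: python

   import asyncio
   from collections import deque


   class TinyMultiConsumerBuffer:
       """Simplified sketch of a bounded buffer with independent consumer cursors."""

       def __init__(self, max_size: int):
           self._buffer = deque(maxlen=max_size)
           self._cursors = {}  # consumer name -> index of next unread event
           self.evicted = {}   # consumer name -> events dropped before being read
           self._cond = asyncio.Condition()

       async def register_consumer(self, name: str) -> None:
           async with self._cond:
               self._cursors[name] = 0
               self.evicted[name] = 0

       async def add_event(self, event) -> None:
           async with self._cond:
               if len(self._buffer) == self._buffer.maxlen:
                   # The oldest event is about to be dropped. Consumers that
                   # never read it record an eviction; the others shift left.
                   for name in list(self._cursors):
                       if self._cursors[name] == 0:
                           self.evicted[name] += 1
                       else:
                           self._cursors[name] -= 1
               self._buffer.append(event)
               self._cond.notify_all()

       async def wait_for_batch(self, name: str, max_batch_size: int):
           async with self._cond:
               # Block until this consumer has at least one unread event.
               await self._cond.wait_for(
                   lambda: self._cursors[name] < len(self._buffer)
               )
               start = self._cursors[name]
               end = min(start + max_batch_size, len(self._buffer))
               self._cursors[name] = end
               return [self._buffer[i] for i in range(start, end)]


   async def demo():
       buf = TinyMultiConsumerBuffer(max_size=2)
       await buf.register_consumer("fast")
       await buf.register_consumer("slow")
       await buf.add_event("e1")
       fast_first = await buf.wait_for_batch("fast", max_batch_size=10)
       await buf.add_event("e2")
       await buf.add_event("e3")  # buffer is full, so "e1" is evicted
       fast_second = await buf.wait_for_batch("fast", max_batch_size=10)
       slow_batch = await buf.wait_for_batch("slow", max_batch_size=10)
       return fast_first, fast_second, slow_batch, buf.evicted


   fast_first, fast_second, slow_batch, evicted = asyncio.run(demo())

In this run the `fast` consumer sees every event, while the `slow` consumer misses `e1` and has one eviction recorded against it.
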
Event Filtering
~~~~~~~~~~~~~~~
The agent checks if events can be exposed to external services via `_can_expose_event() <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/python/ray/dashboard/modules/aggregator/aggregator_agent.py#L195>`__. Only events whose type is in the `EXPOSABLE_EVENT_TYPES` set are allowed to be published externally.
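
The check amounts to a set-membership test. The sketch below uses plain dictionaries and string type names for readability; the `can_expose_event` function and the example set contents are hypothetical, and the real method operates on protobuf events.

.. code-block:: python

   # Example subset only; the real set is built from the agent's configuration.
   EXPOSABLE_EVENT_TYPES = {"TASK_DEFINITION_EVENT", "TASK_LIFECYCLE_EVENT"}


   def can_expose_event(event: dict) -> bool:
       """Return True when the event's type is in the exposable set."""
       return event["event_type"] in EXPOSABLE_EVENT_TYPES


   events = [
       {"event_type": "TASK_DEFINITION_EVENT", "entity_id": "task-1:0"},
       {"event_type": "TASK_PROFILE_EVENT", "entity_id": "task-1:0"},
   ]
   exposed = [e for e in events if can_expose_event(e)]
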
Event Publishing to HTTP
------------------------
Events are published to external HTTP services by the `RayEventPublisher`, which reads from the event buffer and sends HTTP POST requests.
RayEventPublisher
~~~~~~~~~~~~~~~~~
The `RayEventPublisher <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/python/ray/dashboard/modules/aggregator/publisher/ray_event_publisher.py>`__ runs a worker loop that:
1. Registers as a consumer of the `MultiConsumerEventBuffer`
2. Continuously waits for batches of events via `wait_for_batch()`
3. Publishes batches using the configured `PublisherClientInterface`
4. Handles retries with exponential backoff on failures
5. Records metrics for publish success, failures, and latency
The publisher runs in an async context and uses `asyncio` for non-blocking operations.

AsyncHttpPublisherClient
~~~~~~~~~~~~~~~~~~~~~~~~

The `AsyncHttpPublisherClient <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/python/ray/dashboard/modules/aggregator/publisher/async_publisher_client.py#L60>`__ handles HTTP publishing:

- Filters events using `events_filter_fn` (typically `_can_expose_event`)
- Converts events to JSON using `message_to_json()` from protobuf
- Runs the conversion in a `ThreadPoolExecutor` to avoid blocking the event loop
- Uses `aiohttp.ClientSession` for HTTP requests

Batch Publishing
~~~~~~~~~~~~~~~~

Events are published in batches:
- Batch size is limited by `max_batch_size` (default: 10,000 events)
- Batches are created by `wait_for_batch()` which waits up to a timeout for events
- Larger batches reduce HTTP request overhead but increase latency
Retry Logic
~~~~~~~~~~~
The publisher implements retry logic with exponential backoff:
- Retries failed publishes up to `max_retries` times (default: infinite)
- Uses exponential backoff with jitter between retries
- If the maximum number of retries is exhausted, the events are dropped and a metric for dropped events is recorded
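
A retry loop of this shape might look like the sketch below. The `publish_with_retry` function, its parameter names, and the finite defaults are hypothetical and chosen so the example terminates quickly; the actual publisher's signature and delay values may differ.

.. code-block:: python

   import asyncio
   import random


   async def publish_with_retry(publish, batch, max_retries=3,
                                base_delay=0.01, max_delay=1.0):
       """Return True on success, False if the batch is dropped after all retries."""
       attempt = 0
       while True:
           try:
               await publish(batch)
               return True
           except Exception:
               if attempt >= max_retries:
                   # Retries exhausted: the caller would record dropped events here.
                   return False
               # Exponential backoff capped at max_delay, with random jitter.
               delay = min(max_delay, base_delay * (2 ** attempt))
               await asyncio.sleep(random.uniform(0, delay))
               attempt += 1


   calls = []


   async def flaky_publish(batch):
       # Simulated endpoint: fails on the first two calls, then succeeds.
       calls.append(len(batch))
       if len(calls) < 3:
           raise ConnectionError("simulated failure")


   ok = asyncio.run(publish_with_retry(flaky_publish, ["e1", "e2"]))

The jitter spreads out retries from multiple publishers so that they do not all hit a recovering endpoint at the same moment.
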
Configuration
~~~~~~~~~~~~~
HTTP publishing is configured via environment variables:
- **RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR**: HTTP endpoint URL (e.g., `http://localhost:8080/events`)
- **RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES**: Comma-separated list of event types to expose
- **RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SERVICE**: Enable/disable flag (default: True)
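
As an illustration of how these variables fit together, the following sketch reads them into a dictionary. The `load_export_config` helper and the boolean parsing rule are assumptions made for this example; Ray's own parsing code may accept different values.

.. code-block:: python

   import os

   PREFIX = "RAY_DASHBOARD_AGGREGATOR_AGENT_"


   def load_export_config(env=os.environ):
       """Hypothetical helper: collect the export settings from the environment."""
       raw_types = env.get(PREFIX + "EXPOSABLE_EVENT_TYPES", "")
       raw_flag = env.get(PREFIX + "PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SERVICE", "True")
       return {
           "export_addr": env.get(PREFIX + "EVENTS_EXPORT_ADDR", ""),
           # Split the comma-separated list and ignore empty entries.
           "exposable_types": {t.strip() for t in raw_types.split(",") if t.strip()},
           # Assumed rule: "1" and "true" (any case) enable publishing.
           "publish_enabled": raw_flag.strip().lower() in ("1", "true"),
       }


   config = load_export_config({
       PREFIX + "EVENTS_EXPORT_ADDR": "http://localhost:8080/events",
       PREFIX + "EXPOSABLE_EVENT_TYPES": "TASK_DEFINITION_EVENT, ACTOR_LIFECYCLE_EVENT",
   })
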
Creating New Event Types
-------------------------
To create a new event type, follow these steps:

Step 1: Define Protobuf Message
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Create a new `.proto` file in `src/ray/protobuf/public/` following the naming convention `events_<name>_event.proto`. For example, see `events_task_definition_event.proto <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/protobuf/public/events_task_definition_event.proto>`__.

Define your event-specific message with the fields you need:

.. code-block:: protobuf

   syntax = "proto3";

   package ray.rpc.events;

   message MyNewEvent {
     // Define your event-specific fields here
     string entity_id = 1;
     // ... other fields
   }

Step 2: Add to Base Event
Update `events_base_event.proto <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/protobuf/public/events_base_event.proto>`__:
1. Add import for your new proto file
2. Add new `EventType` enum value (e.g., `MY_NEW_EVENT = 11`)
3. Add new field to `RayEvent` message (e.g., `MyNewEvent my_new_event = 18`)

Step 3: Implement RayEventInterface
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Create a C++ class that implements `RayEventInterface <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/observability/ray_event_interface.h>`__. The easiest approach is to extend the `RayEvent<T>` template class, as shown in `ray_actor_definition_event.h <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/observability/ray_actor_definition_event.h>`__.

You need to implement:

- Serialization of the event data to the `RayEvent` protobuf
- The `EventType` enum value for this event

See `ray_actor_definition_event.cc <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/observability/ray_actor_definition_event.cc>`__ for a complete example.

Step 4: Update Exposable Event Types (if needed)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If your event should be exposed to external HTTP services, add it to `DEFAULT_EXPOSABLE_EVENT_TYPES <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/python/ray/dashboard/modules/aggregator/aggregator_agent.py#L56>`__ in `aggregator_agent.py`. Alternatively, users can configure it via the `RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES` environment variable.

Step 5: Update RayEventRecorder to publish your new event type
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use `RayEventRecorder::AddEvents() <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/src/ray/observability/ray_event_recorder.cc#L92>`__ to add events of your new type to the buffer.

Step 6: Update AggregatorAgent to publish your new event type
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Update `AggregatorAgent <https://github.com/ray-project/ray/blob/4ebdc0abe5e5a551625fe7f87053c7e668a6ff74/python/ray/dashboard/modules/aggregator/aggregator_agent.py#L56>`__ to publish your new event type.