doc/services/zbus/index.rst
.. _zbus:
Zephyr bus (zbus) #################
.. Note to documentation authors: the diagrams included in this documentation page were designed using the following Figma library: https://www.figma.com/community/file/1292866458780627559/zbus-diagram-assets
The :dfn:Zephyr bus - zbus is a lightweight and flexible software bus enabling a simple way for
threads to talk to one another in a many-to-many way.
.. contents:: :local: :depth: 2
Concepts
Threads can send messages to one or more observers using zbus. It makes the many-to-many communication possible. The bus implements message-passing and publish/subscribe communication paradigms that enable threads to communicate synchronously or asynchronously through shared memory.
The communication through zbus is channel-based. Threads (or callbacks) use channels to exchange messages. Additionally, besides other actions, threads can publish and observe channels. When a thread publishes a message on a channel, the bus will make the message available to all the published channel's observers. Based on the observer's type, it can access the message directly, receive a copy of it, or even receive only a reference of the published channel.
The figure below shows an example of a typical application using zbus in which the application logic (hardware independent) talks to other threads via software bus. Note that the threads are decoupled from each other because they only use zbus channels and do not need to know each other to talk.
.. figure:: images/zbus_overview.svg :alt: zbus usage overview :width: 75%
A typical zbus application architecture.
The bus comprises:
Virtual Distributed Event Dispatcher (VDED), the bus logic responsible for sending
notifications/messages to the observers. The VDED logic runs inside the publishing action in the same
thread context, giving the bus an idea of a distributed execution. When a thread publishes to a
channel, it also propagates the notifications to the observers;.. figure:: images/zbus_anatomy.svg :alt: ZBus anatomy :width: 70%
ZBus anatomy.
The bus makes the publish, read, claim, finish, notify, and subscribe actions available over channels. Publishing, reading, claiming, and finishing are available in all RTOS thread contexts, including ISRs. The publish and read operations are simple and fast; the procedure is channel locking followed by a memory copy to and from a shared memory region and then a channel unlocking. Another essential aspect of zbus is the observers. There are three types of observers:
.. figure:: images/zbus_type_of_observers.svg :alt: ZBus observers type :width: 70%
ZBus observers.
Channel observation structures define the relationship between a channel and its observers. For
every observation, a pair channel/observer. Developers can statically allocate observation using the
:c:macro:ZBUS_CHAN_DEFINE or :c:macro:ZBUS_CHAN_ADD_OBS. There are also runtime observers,
enabling developers to create runtime observations. It is possible to disable an observer entirely
or observations individually. The event dispatcher will ignore disabled observers and observations.
.. figure:: images/zbus_observation_mask.svg :alt: ZBus observation mask. :width: 75%
ZBus observation mask.
The above figure illustrates some states, from (a) to (d), for channels from C1 to C5,
Subscriber 1, and the observations. The last two are in orange to indicate they are dynamically
allocated (runtime observation). (a) shows that the observer and all observations are enabled. (b)
shows the observer is disabled, so the event dispatcher will ignore it. (c) shows the observer
enabled. However, there is one static observation disabled. The event dispatcher will only stop
sending notifications from channel C3. In (d), the event dispatcher will stop sending
notifications from channels C3 and C5 to Subscriber 1.
Suppose a usual sensor-based solution is in the figure below for illustration purposes. When
triggered, the timer publishes to the Trigger channel. As the sensor thread subscribed to the
Trigger channel, it receives the sensor data. Notice the VDED executes the Blink because it
also listens to the Trigger channel. When the sensor data is ready, the sensor thread publishes
it to the Sensor data channel. The core thread receives the message as a Sensor data channel
message subscriber, processes the sensor data, and stores it in an internal sample buffer. It
repeats until the sample buffer is full; when it happens, the core thread aggregates the sample
buffer information, prepares a package, and publishes that to the Payload channel. The Lora
thread receives that because it is a Payload channel message subscriber and sends the payload to
the cloud. When it completes the transmission, the Lora thread publishes to the Transmission done channel. The VDED executes the Blink again since it listens to the Transmission done
channel.
.. figure:: images/zbus_operations.svg :alt: ZBus sensor-based application :width: 85%
ZBus sensor-based application.
This way of implementing the solution makes the application more flexible, enabling us to change things independently. For example, we want to change the trigger from a timer to a button press. We can do that, and the change does not affect other parts of the system. Likewise, we would like to change the communication interface from LoRa to Bluetooth; we only need to change the LoRa thread. No other change is required in order to make that work. Thus, the developer would do that for every block of the image. Based on that, there is a sign zbus promotes decoupling in the system architecture.
Another important aspect of using zbus is the reuse of system modules. If a code portion with well-defined behaviors (we call that module) only uses zbus channels and not hardware interfaces, it can easily be reused in other solutions. The new solution must implement the interfaces (set of channels) the module needs to work. That indicates zbus could improve the module reuse.
The last important note is the zbus solution reach. We can count on many ways of using zbus to enable the developer to be as free as possible to create what they need. For example, messages can be dynamic or static allocated; notifications can be synchronous or asynchronous; the developer can control the channel in so many different ways claiming the channel, developers can add their metadata information to a channel by using the user-data field, the discretionary use of a validator enables the systems to be accurate over message format, and so on. Those characteristics increase the solutions that can be done with zbus and make it a good fit as an open-source community tool.
.. _Virtual Distributed Event Dispatcher:
The VDED execution always happens in the publisher's context. It can be a thread or an ISR. Be careful with publications inside ISR because the scheduler won't preempt the VDED. Use that wisely. The basic description of the execution is as follows:
memcpy);zbus_chan_const_msg
function) since the channel is still locked;To illustrate the VDED execution, consider the example illustrated below. We have four threads in
ascending priority S1, MS2, MS1, and T1 (the highest priority); two listeners,
L1 and L2; and channel A. Supposing L1, L2, MS1, MS2, and S1 observer
channel A.
.. figure:: images/zbus_publishing_process_example_scenario.svg :alt: ZBus example scenario :width: 45%
ZBus VDED execution example scenario.
The following code implements channel A. Note the struct a_msg is illustrative only.
.. code-block:: c
ZBUS_CHAN_DEFINE(a_chan, /* Name */
struct a_msg, /* Message type */
NULL, /* Validator */
NULL, /* User Data */
ZBUS_OBSERVERS(L1, L2, MS1, MS2, S1), /* observers */
ZBUS_MSG_INIT(0) /* Initial value {0} */
);
In the figure below, the letters indicate some action related to the VDED execution. The X-axis represents the time, and the Y-axis represents the priority of threads. Channel A's message, represented by a voice balloon, is only one memory portion (shared memory). It appears several times only as an illustration of the message at that point in time.
.. figure:: images/zbus_publishing_process_example.svg :alt: ZBus publish processing detail :width: 85%
ZBus VDED execution detail for priority T1 > MS1 > MS2 > S1.
The figure above illustrates the actions performed during the VDED execution when T1 publishes to channel A. Thus, the table below describes the activities (represented by a letter) of the VDED execution. The scenario considers the following priorities: T1 > MS1 > MS2 > S1. T1 has the highest priority.
.. list-table:: VDED execution steps in detail for priority T1 > MS1 > MS2 > S1. :widths: 5 65 :header-rows: 1
zbus_chan_const_msg function, which provides a direct constant
reference to channel A's message. It is quick, and no copy is needed here.The figure below illustrates the actions performed during the VDED execution when T1 publishes to channel A. The scenario considers the following priorities: T1 < MS1 < MS2 < S1.
.. figure:: images/zbus_publishing_process_example2.svg :alt: ZBus publish processing detail :width: 85%
ZBus VDED execution detail for priority T1 < MS1 < MS2 < S1.
Thus, the table below describes the activities (represented by a letter) of the VDED execution.
.. list-table:: VDED execution steps in detail for priority T1 < MS1 < MS2 < S1. :widths: 5 65 :header-rows: 1
zbus_chan_const_msg function, which provides a direct constant
reference to channel A's message. It is quick, and no copy is needed here.ZBus implements the Highest Locker Protocol that relies on the observers' thread priority to determine a temporary publisher priority. The protocol considers the channel's Highest Observer Priority (HOP); even if the observer is not waiting for a message on the channel, it is considered in the calculation. The VDED will elevate the publisher's priority based on the HOP to ensure small latency and as few preemptions as possible.
.. note::
The priority boost is enabled by default. To deactivate it, you must set the
:kconfig:option:CONFIG_ZBUS_PRIORITY_BOOST configuration.
.. warning:: ZBus priority boost does not consider runtime observers on the HOP calculations.
The figure below illustrates the actions performed during the VDED execution when T1 publishes to channel A. The scenario considers the priority boost feature and the following priorities: T1 < MS1 < MS2 < S1.
.. figure:: images/zbus_publishing_process_example_HLP.svg :alt: ZBus publishing process details using priority boost. :width: 85%
ZBus VDED execution detail with priority boost enabled and for priority T1 < MS1 < MS2 < S1.
To properly use the priority boost, attaching the observer to a thread is necessary. When the subscriber is attached to a thread, it assumes its priority, and the priority boost algorithm will consider the observer's priority. The following code illustrates the thread-attaching function.
.. code-block:: c :emphasize-lines: 10
ZBUS_SUBSCRIBER_DEFINE(s1, 4); void s1_thread(void *ptr1, void *ptr2, void *ptr3) { ARG_UNUSED(ptr1); ARG_UNUSED(ptr2); ARG_UNUSED(ptr3);
const struct zbus_channel *chan;
zbus_obs_attach_to_thread(&s1);
while (1) {
zbus_sub_wait(&s1, &chan, K_FOREVER);
/* Subscriber implementation */
}
} K_THREAD_DEFINE(s1_id, CONFIG_MAIN_STACK_SIZE, s1_thread, NULL, NULL, NULL, 2, 0, 0);
On the above code, the :c:func:zbus_obs_attach_to_thread will set the s1 observer with
priority two as the thread has that priority. It is possible to reverse that by detaching the
observer using the :c:func:zbus_obs_detach_from_thread. Only enabled observers and observations
will be considered on the channel HOP calculation. Masking a specific observation of a channel will
affect the channel HOP.
In summary, the benefits of the feature are:
Based on the fact that developers can use zbus to solve many different problems, some challenges
arise. ZBus will not solve every problem, so it is necessary to analyze the situation to be sure
zbus is applicable. For instance, based on the zbus benchmark, it would not be well suited to a
high-speed stream of bytes between threads. The :ref:Pipe <pipes_v2> kernel object solves this
kind of need.
ZBus always delivers the messages to the listeners, message subscribers, and async listeners. However, there are no message delivery guarantees for subscribers because zbus only sends the notification, but the message reading depends on the subscriber's implementation. It is possible to increase the delivery rate by following design tips:
.. warning::
ZBus uses :zephyr_file:include/zephyr/net_buf.h (network buffers) to exchange data with message
subscribers. Thus, choose carefully the configurations
:kconfig:option:CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE and
:kconfig:option:CONFIG_HEAP_MEM_POOL_ADD_SIZE_ZBUS are crucial to a proper VDED execution
(delivery guarantee) considering message subscribers and async listeners. If you want to keep an
isolated pool for a specific set of channels, you can use
:kconfig:option:CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION with a dedicated pool. Look
at the :zephyr:code-sample:zbus-msg-subscriber and :zephyr:code-sample:zbus-async-listeners
to see the isolation in action.
.. warning:: Subscribers will receive only the reference of the changing channel. A data loss may be perceived if the channel is published twice before the subscriber reads it. The second publication overwrites the value from the first. Thus, the subscriber will receive two notifications, but only the last data is there.
.. _zbus delivery sequence:
The message delivery will follow the precedence:
#. Observers defined in a channel using the :c:macro:ZBUS_CHAN_DEFINE (following the definition
sequence);
#. Observers defined using the :c:macro:ZBUS_CHAN_ADD_OBS based on the sequence priority
(parameter of the macro);
#. The latest is the runtime observers in the addition sequence using the
:c:func:zbus_chan_add_obs.
.. note:: The VDED will ignore all disabled observers or observations.
Usage
ZBus operation depends on channels and observers. Therefore, it is necessary to determine the
channel's message during the channel definition and its list of observers, which can be either
statically (in the channel definition), using the :c:macro:ZBUS_CHAN_ADD_OBS or at runtime (see
runtime observers_). A message is a regular C struct; the observer can be a listener
(synchronous), an async listener (asynchronous), a subscriber (asynchronous), or a message
subscriber (asynchronous).
The following code defines and initializes a regular channel and its dependencies. This channel exchanges accelerometer data, for example.
.. code-block:: c
struct acc_msg {
int x;
int y;
int z;
};
ZBUS_CHAN_DEFINE(acc_chan, /* Name */
struct acc_msg, /* Message type */
NULL, /* Validator */
NULL, /* User Data */
ZBUS_OBSERVERS(my_listener, my_subscriber,
my_msg_subscriber), /* observers */
ZBUS_MSG_INIT(.x = 0, .y = 0, .z = 0) /* Initial value */
);
void listener_callback_example(const struct zbus_channel *chan)
{
const struct acc_msg *acc;
if (&acc_chan == chan) {
acc = zbus_chan_const_msg(chan); // Direct message access
LOG_DBG("From listener -> Acc x=%d, y=%d, z=%d", acc->x, acc->y, acc->z);
}
}
ZBUS_LISTENER_DEFINE(my_listener, listener_callback_example);
ZBUS_LISTENER_DEFINE(my_listener2, listener_callback_example);
ZBUS_CHAN_ADD_OBS(acc_chan, my_listener2, 3);
ZBUS_SUBSCRIBER_DEFINE(my_subscriber, 4);
void subscriber_task(void)
{
const struct zbus_channel *chan;
while (!zbus_sub_wait(&my_subscriber, &chan, K_FOREVER)) {
struct acc_msg acc = {0};
if (&acc_chan == chan) {
// Indirect message access
zbus_chan_read(&acc_chan, &acc, K_NO_WAIT);
LOG_DBG("From subscriber -> Acc x=%d, y=%d, z=%d", acc.x, acc.y, acc.z);
}
}
}
K_THREAD_DEFINE(subscriber_task_id, 512, subscriber_task, NULL, NULL, NULL, 3, 0, 0);
ZBUS_MSG_SUBSCRIBER_DEFINE(my_msg_subscriber);
static void msg_subscriber_task(void *ptr1, void *ptr2, void *ptr3)
{
ARG_UNUSED(ptr1);
ARG_UNUSED(ptr2);
ARG_UNUSED(ptr3);
const struct zbus_channel *chan;
struct acc_msg acc = {0};
while (!zbus_sub_wait_msg(&my_msg_subscriber, &chan, &acc, K_FOREVER)) {
if (&acc_chan == chan) {
LOG_INF("From msg subscriber -> Acc x=%d, y=%d, z=%d", acc.x, acc.y, acc.z);
}
}
}
K_THREAD_DEFINE(msg_subscriber_task_id, 1024, msg_subscriber_task, NULL, NULL, NULL, 3, 0, 0);
It is possible to add static observers to a channel using the :c:macro:ZBUS_CHAN_ADD_OBS. We call
that a post-definition static observer. The command enables us to indicate an initialization
priority that affects the observers' initialization order. The sequence priority param only affects
the post-definition static observers. There is no possibility to overwrite the message delivery
sequence of the static observers.
.. note:: It is unnecessary to claim/lock a channel before accessing the message inside the listener since the event dispatcher calls listeners with the notifying channel already locked. Subscribers, however, must claim/lock that or use regular read operations to access the message after being notified.
Channels can have a validator function that enables a channel to accept only valid messages.
Publish attempts invalidated by hard channels will return immediately with an error code. This
allows original creators of a channel to exert some authority over other developers/publishers who
may want to piggy-back on their channels. The following code defines and initializes a :dfn:hard channel and its dependencies. Only valid messages can be published to a :dfn:hard channel. It is
possible because a validator function was passed to the channel's definition. In this example,
only messages with move equal to 0, -1, and 1 are valid. Publish function will discard all other
values to move.
.. code-block:: c
struct control_msg {
int move;
};
bool control_validator(const void* msg, size_t msg_size) {
const struct control_msg* cm = msg;
bool is_valid = (cm->move == -1) || (cm->move == 0) || (cm->move == 1);
return is_valid;
}
static int message_count = 0;
ZBUS_CHAN_DEFINE(control_chan, /* Name */
struct control_msg, /* Message type */
control_validator, /* Validator */
&message_count, /* User data */
ZBUS_OBSERVERS_EMPTY, /* observers */
ZBUS_MSG_INIT(.move = 0) /* Initial value */
);
The following sections describe in detail how to use zbus features.
.. _publishing to a channel:
Messages are published to a channel in zbus by calling :c:func:zbus_chan_pub. For example, the
following code builds on the examples above and publishes to channel acc_chan. The code is
trying to publish the message acc1 to channel acc_chan, and it will wait up to one second
for the message to be published. Otherwise, the operation fails. As can be inferred from the code
sample, it's OK to use stack allocated messages since VDED copies the data internally.
.. code-block:: c
struct acc_msg acc1 = {.x = 1, .y = 1, .z = 1};
zbus_chan_pub(&acc_chan, &acc1, K_SECONDS(1));
.. warning::
Only use this function inside an ISR with a :c:macro:K_NO_WAIT timeout.
.. _reading from a channel:
Messages are read from a channel in zbus by calling :c:func:zbus_chan_read. So, for example, the
following code tries to read the channel acc_chan, which will wait up to 500 milliseconds to
read the message. Otherwise, the operation fails.
.. code-block:: c
struct acc_msg acc = {0};
zbus_chan_read(&acc_chan, &acc, K_MSEC(500));
.. warning::
Only use this function inside an ISR with a :c:macro:K_NO_WAIT timeout.
.. warning::
Choose the timeout of :c:func:zbus_chan_read after receiving a notification from
:c:func:zbus_sub_wait carefully because the channel will always be unavailable during the VDED
execution. Using K_NO_WAIT for reading is highly likely to return a timeout error if there
are more than one subscriber. For example, consider the VDED illustration again and notice how
S1 read attempts would definitely fail with K_NO_WAIT. For more details, check
the Virtual Distributed Event Dispatcher_ section.
It is possible to force zbus to notify a channel's observers by calling :c:func:zbus_chan_notify.
For example, the following code builds on the examples above and forces a notification for the
channel acc_chan. Note this can send events with no message, which does not require any data
exchange. See the code example under Claim and finish a channel_ where this may become useful.
.. code-block:: c
zbus_chan_notify(&acc_chan, K_NO_WAIT);
.. warning::
Only use this function inside an ISR with a :c:macro:K_NO_WAIT timeout.
For accessing channels or observers from files other than its defining files, it is necessary to
declare them by calling :c:macro:ZBUS_CHAN_DECLARE and :c:macro:ZBUS_OBS_DECLARE. In other
words, zbus channel definitions and declarations with the same channel names in different files
would point to the same (global) channel. Thus, developers should be careful about existing
channels, and naming new channels or linking will fail. It is possible to declare more than one
channel or observer on the same call. The following code builds on the examples above and displays
the defined channels and observers.
.. code-block:: c
ZBUS_OBS_DECLARE(my_listener, my_subscriber);
ZBUS_CHAN_DECLARE(acc_chan, version_chan);
To simplify integrations with external entities, it is possible to assign a unique numeric identifier
to a channel. Users can then retrieve the channel reference by using the identifier with
:c:func:zbus_chan_from_id, rather than needing to obtain the reference at compile time with
:c:macro:ZBUS_CHAN_DECLARE. Channels using this feature are declared with
:c:func:ZBUS_CHAN_DEFINE_WITH_ID.
.. code-block:: c
ZBUS_CHAN_DEFINE_WITH_ID(control_chan, /* Name */
0x12345678, /* Unique channel identifier */
struct control_msg, /* Message type */
control_validator, /* Validator */
&message_count, /* User data */
ZBUS_OBSERVERS_EMPTY, /* observers */
ZBUS_MSG_INIT(.move = 0) /* Initial value */
);
static void channel_retrieve(void)
{
const struct zbus_channel *chan = zbus_chan_from_id(0x12345678);
...
}
ZBus subsystem also implements :ref:Iterable Sections <iterable_sections_api> for channels and
observers, for which there are supporting APIs like :c:func:zbus_iterate_over_channels,
:c:func:zbus_iterate_over_channels_with_user_data, :c:func:zbus_iterate_over_observers and
:c:func:zbus_iterate_over_observers_with_user_data. This feature enables developers to call a
procedure over all declared channels, where the procedure parameter is a :c:struct:zbus_channel.
The execution sequence is in the alphabetical name order of the channels (see :ref:Iterable Sections <iterable_sections_api> documentation for details). ZBus also implements this feature for
:c:struct:zbus_observer.
.. code-block:: c
static bool print_channel_data_iterator(const struct zbus_channel *chan, void *user_data) { int *count = user_data;
LOG_INF("%d - Channel %s:", *count, zbus_chan_name(chan));
LOG_INF(" Message size: %d", zbus_chan_msg_size(chan));
LOG_INF(" Observers:");
++(*count);
struct zbus_channel_observation *observation;
for (int16_t i = *chan->observers_start_idx, limit = *chan->observers_end_idx; i < limit;
++i) {
STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
LOG_INF(" - %s", observation->obs->name);
}
struct zbus_observer_node *obs_nd, *tmp;
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->observers, obs_nd, tmp, node) {
LOG_INF(" - %s", obs_nd->obs->name);
}
return true;
}
static bool print_observer_data_iterator(const struct zbus_observer *obs, void *user_data) { int *count = user_data;
LOG_INF("%d - %s %s", *count, obs->queue ? "Subscriber" : "Listener", zbus_obs_name(obs));
++(*count);
return true;
}
int main(void) { int count = 0;
LOG_INF("Channel list:");
zbus_iterate_over_channels_with_user_data(print_channel_data_iterator, &count);
count = 0;
LOG_INF("Observers list:");
zbus_iterate_over_observers_with_user_data(print_observer_data_iterator, &count);
return 0;
}
The code will log the following output:
.. code-block:: console
D: Channel list:
D: 0 - Channel acc_chan:
D: Message size: 12
D: Observers:
D: - my_listener
D: - my_subscriber
D: 1 - Channel version_chan:
D: Message size: 4
D: Observers:
D: Observers list:
D: 0 - Listener my_listener
D: 1 - Subscriber my_subscriber
.. _Claim and finish a channel:
ZBus was designed to be as flexible and extensible as possible. Thus, there are some features designed to provide some control and extensibility to the bus.
For performance purposes, listeners can access the receiving channel message directly since they
already have the channel locked for it. To access the channel's message, the listener should use the
:c:func:zbus_chan_const_msg because the channel passed as an argument to the listener function is
a constant pointer to the channel. The const pointer return type tells developers not to modify the
message.
.. code-block:: c
void listener_callback_example(const struct zbus_channel *chan)
{
const struct acc_msg *acc;
if (&acc_chan == chan) {
acc = zbus_chan_const_msg(chan); // Use this
// instead of zbus_chan_read(chan, &acc, K_MSEC(200))
// or zbus_chan_msg(chan)
LOG_DBG("From listener -> Acc x=%d, y=%d, z=%d", acc->x, acc->y, acc->z);
}
}
Async listeners are implemented by utilizing the established message subscriber infrastructure. They are executed in a work queue context rather than in the publisher's context. When using the system work queue, the user will experience similar behavior to the regular listeners. Since the system work queue is, by default, a cooperative thread with a priority of -1, async listeners would run before all other application threads (usually preemptive) after the VDED execution.
Async listeners can access the receiving channel's message directly via the message copy reference passed to the callback. Be aware that the message copy is freed just after the async listener execution. To access the channel's message, the async listener should only cast to the proper constant message format. The following example demonstrates how to access the message from an async listener.
.. code-block:: c
static void async_listener_callback(const struct zbus_channel *chan, const void *message) { if (chan != &chan_event) { LOG_ERR("Unexpected channel"); return; }
const struct msg_event *msg = message;
LOG_INF("From async listener -> Evt=%d | %s", msg->type,
k_thread_name_get(k_current_get()));
}
It is possible to pass custom data into the channel's user_data for various purposes, such as
writing channel metadata. That can be achieved by passing a pointer to the channel definition
macro's user_data field, which will then be accessible by others. Note that user_data is
individual for each channel. Also, note that user_data access is not thread-safe. For
thread-safe access to user_data, see the next section.
To take more control over channels, two functions were added :c:func:zbus_chan_claim and
:c:func:zbus_chan_finish. With these functions, it is possible to access the channel's metadata
safely. When a channel is claimed, no actions are available to that channel. After finishing the
channel, all the actions are available again.
.. warning:: Never change the fields of the channel struct directly. It may cause zbus behavior inconsistencies and scheduling issues.
.. warning::
Only use this function inside an ISR with a :c:macro:K_NO_WAIT timeout.
The following code builds on the examples above and claims the acc_chan to set the user_data
to the channel. Suppose we would like to count how many times the channels exchange messages. We
defined the user_data to have the 32 bits integer. This code could be added to the listener code
described above.
.. code-block:: c
if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) {
int *message_counting = (int *) zbus_chan_user_data(&acc_chan);
*message_counting += 1;
zbus_chan_finish(&acc_chan);
}
The following code has the exact behavior of the code in :ref:publishing to a channel.
.. code-block:: c
if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) {
struct acc_msg *acc1 = (struct acc_msg *) zbus_chan_msg(&acc_chan);
acc1.x = 1;
acc1.y = 1;
acc1.z = 1;
zbus_chan_finish(&acc_chan);
zbus_chan_notify(&acc_chan, K_SECONDS(1));
}
The following code has the exact behavior of the code in :ref:reading from a channel.
.. code-block:: c
if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) {
const struct acc_msg *acc1 = (const struct acc_msg *) zbus_chan_const_msg(&acc_chan);
// access the acc_msg fields directly.
zbus_chan_finish(&acc_chan);
}
.. _runtime observers:
It is possible to add observers to channels in runtime. Set the
:kconfig:option:CONFIG_ZBUS_RUNTIME_OBSERVERS to enable the feature. This feature uses the heap to
allocate the nodes dynamically, a memory slab to allocate the nodes statically, or user-provided
nodes. It depends on the :kconfig:option:CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC, which can be
:kconfig:option:CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC,
:kconfig:option:CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC, and
:kconfig:option:CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE. The dynamic is the default. When
:kconfig:option:CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC is enabled, you need to set the
number of runtime observers you are going to use by setting the
:kconfig:option:CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_POOL_SIZE configuration. The following example
illustrates the runtime registration usage.
.. code-block:: c
ZBUS_LISTENER_DEFINE(my_listener, callback);
// ...
void thread_entry(void) {
// ...
/* Adding the observer to channel chan1 */
zbus_chan_add_obs(&chan1, &my_listener, K_NO_WAIT);
/* Removing the observer from channel chan1 */
zbus_chan_rm_obs(&chan1, &my_listener, K_NO_WAIT);
.. warning::
The :c:struct:zbus_observer_node can only be reused in :c:func:zbus_chan_add_obs_with_node after removing
the channel observer it was first associated with through :c:func:zbus_chan_rm_obs.
.. _zbus_proxy_agent:
Proxy Agent Communication (Experimental)
.. warning:: Proxy agent communication is experimental and may change without deprecation.
ZBus supports proxy agent forwarding, enabling message passing between different execution domains such as CPU cores or separate devices.
.. figure:: images/zbus_proxy_agent.svg :alt: ZBus proxy agent communication :width: 75%
.. Image illustrating zbus proxy agent communication between domains.
Proxy agent communication introduces several key concepts:
Proxy agents are set up in code using :c:macro:ZBUS_PROXY_AGENT_DEFINE, specifying the transport
backend and configuration parameters.
Channels are defined using standard zbus macros (:c:macro:ZBUS_CHAN_DEFINE or
:c:macro:ZBUS_CHAN_DEFINE_WITH_ID) and shadow channels use :c:macro:ZBUS_SHADOW_CHAN_DEFINE
to create read-only mirrors linked to specific proxy agents.
ZBus proxy agent communication relies on transport backends to forward messages between different execution domains.
The IPC backend forwards messages between CPU cores within the same system using Inter-Process Communication mechanisms.
See the :zephyr:code-sample:zbus-proxy-agent-ipc sample for a complete implementation.
CONFIG_ZBUS_PROXY_AGENT and the desired backend option... code-block:: c
#include <zephyr/zbus/proxy_agent/zbus_proxy_agent.h>
#include <zephyr/zbus/proxy_agent/zbus_proxy_agent_ipc.h>
#define IPC_DEV_NODE DT_NODELABEL(ipc0)
ZBUS_PROXY_AGENT_DEFINE(proxy_agent, /* Proxy agent name */
ZBUS_PROXY_AGENT_BACKEND_IPC, /* Proxy agent type */
IPC_DEV_NODE /* Backend node */
);
Where:
.. code-block:: c
ZBUS_CHAN_DEFINE(my_channel, struct my_msg, NULL, NULL,
ZBUS_OBSERVERS_EMPTY, ZBUS_MSG_INIT(0));
ZBUS_PROXY_ADD_CHAN(proxy_agent, my_channel);
zbus_chan_pub(&my_channel, &msg, K_MSEC(100));
Any message published to "my_channel" will be forwarded by "proxy_agent" to remote domains.
.. code-block:: c
ZBUS_SHADOW_CHAN_DEFINE(my_channel_shadow, struct my_msg,
proxy_agent, NULL, ZBUS_OBSERVERS_EMPTY,
ZBUS_MSG_INIT(0));
Where the shadow channel can be used like any regular channel, but is read-only. For example, adding a listener:
.. code-block:: c
void my_listener_cb(const struct zbus_channel *chan)
{
const struct my_msg *data = zbus_chan_const_msg(chan);
printk("Received: %d\n", data->data);
}
ZBUS_LISTENER_DEFINE(my_listener, my_listener_cb);
ZBUS_CHAN_ADD_OBS(my_channel_shadow, my_listener, 0);
Samples
For a complete overview of zbus usage, take a look at the samples. There are the following samples available:
zbus-hello-world illustrates the code used above in action;zbus-work-queue shows how to define and use different kinds of observers.
Note there is an example of using a work queue instead of executing the listener as an execution
option;zbus-msg-subscriber illustrates how to use message subscribers;zbus-async-listeners illustrates how to use async listeners;zbus-dyn-channel demonstrates how to use dynamically allocated exchanging
data in zbus;zbus-uart-bridge shows an example of sending the operation of the channel to
a host via serial;zbus-remote-mock illustrates how to implement an external mock (on the host)
to send and receive messages to and from the bus;zbus-priority-boost illustrates zbus priority boost feature with a priority
inversion scenario;zbus-runtime-obs-registration illustrates a way of using the runtime
observer registration feature;zbus-confirmed-channel implements a way of implement confirmed channel only
with subscribers;zbus-benchmark implements a benchmark with different combinations of inputs.zbus-proxy-agent-ipc demonstrates multi-core communication using IPC proxy agents;Suggested Uses
Use zbus to transfer data (messages) between threads in one-to-one, one-to-many, and many-to-many
synchronously or asynchronously. Choosing the proper observer type is crucial. Use subscribers for
scenarios that can tolerate message losses and duplications; when they cannot, use message
subscribers (if you need a thread) or listeners (if you need to be lean and fast). In addition to
the listener, another asynchronous message processing mechanism (like :ref:message queues <message_queues_v2>) may be necessary to retain the pending message until it gets processed.
For proxy agent scenarios, use zbus to enable communication across execution boundaries:
.. note:: ZBus can be used to transfer streams from the producer to the consumer. However, this can increase zbus' communication latency. So maybe consider a Pipe a good alternative for this communication topology.
Configuration Options
For enabling zbus, it is necessary to enable the :kconfig:option:CONFIG_ZBUS option.
Related configuration options:
:kconfig:option:CONFIG_ZBUS_PRIORITY_BOOST zbus Highest Locker Protocol implementation;
:kconfig:option:CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY determine the :c:macro:SYS_INIT
priority used by zbus to organize the channels observations by channel;
:kconfig:option:CONFIG_ZBUS_CHANNEL_NAME enables the name of channels to be available inside the
channels metadata. The log uses this information to show the channels' names;
:kconfig:option:CONFIG_ZBUS_OBSERVER_NAME enables the name of observers to be available inside
the channels metadata;
:kconfig:option:CONFIG_ZBUS_PREFER_DYNAMIC_ALLOCATION instructs zbus to
use dynamic allocation for its internals. That can be disabled by the user and tuned later;
:kconfig:option:CONFIG_ZBUS_MSG_SUBSCRIBER enables the message subscriber observer type;
:kconfig:option:CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC uses the heap to allocate message
buffers;
:kconfig:option:CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_STATIC uses the stack to allocate message
buffers;
:kconfig:option:CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE the available number of message
buffers to be used simultaneously;
:kconfig:option:CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION enables the developer to isolate
a pool for the message subscriber for a set of channels;
:kconfig:option:CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE the biggest message of zbus
channels to be transported into a message buffer;
:kconfig:option:CONFIG_HEAP_MEM_POOL_ADD_SIZE_ZBUS the reserved heap size for ZBus in a whole
including message buffer allocation;
:kconfig:option:CONFIG_ZBUS_ASYNC_LISTENER enables the async listener observer type;
:kconfig:option:CONFIG_ZBUS_RUNTIME_OBSERVERS enables the runtime observer registration;
:kconfig:option:CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC allocate the runtime observers
dynamically using the heap;
:kconfig:option:CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC allocate the runtime observers
statically using a memory slab;
:kconfig:option:CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_POOL_SIZE the amount of enabled runtime
observers to statically allocate.
:kconfig:option:CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE use user-provided runtime
observers nodes;
:kconfig:option:CONFIG_ZBUS_PROXY_AGENT enable proxy agent communication support.
CONFIG_ZBUS_PROXY_AGENT_LOG_LEVEL log level for proxy agent communication;CONFIG_ZBUS_PROXY_AGENT_IPC enable IPC backend for proxy agent communication;CONFIG_ZBUS_PROXY_AGENT_IPC_LOG_LEVEL log level for IPC backend proxy agent
communication;CONFIG_ZBUS_PROXY_AGENT_MAX_MESSAGE_SIZE maximum message size for proxy agent
channels;CONFIG_ZBUS_PROXY_AGENT_MAX_CHANNEL_NAME_SIZE maximum size of channel names in
proxy agent communication;CONFIG_ZBUS_PROXY_AGENT_INIT_PRIORITY initialization priority for proxy agent
setup.CONFIG_ZBUS_PROXY_AGENT_WORK_QUEUE_STACK_SIZE stack size for the proxy agent
receive work queue thread;CONFIG_ZBUS_PROXY_AGENT_WORK_QUEUE_PRIORITY priority for the proxy agent receive
work queue thread.CONFIG_ZBUS_PROXY_AGENT_RX_QUEUE_DEPTH depth of the proxy agent receive queue
for incoming messages from remote domains.API Reference
.. doxygengroup:: zbus_apis