docs/decisions/0071-multi-agent-orchestration.md
The industry is moving up the stack to build more complex systems using LLMs: from interacting with foundation models, to building RAG systems, to creating single AI agents that perform more complex tasks. The desire for multi-agent systems is now growing.
With the recent GA of the Semantic Kernel Agent Framework, which offers a stable agent abstraction/APIs and support for multiple agent services such as OpenAI Assistant and Chat Completion services, we are now able to build on top of it to create multi-agent systems. This will allow our customers to unlock even more complex scenarios.
In addition, the recent collaboration with the AutoGen team that resulted in the shared agent runtime abstraction allowed us to leverage their work as the foundation on which we can build our framework.
The current state of the Semantic Kernel Agent Framework is limited to single agents, i.e. agents cannot work collaboratively to solve user requests. We need to extend it to support multi-agent orchestration, which will allow our customers to unlock more possibilities using Semantic Kernel agents. Please refer to the Considerations section to see success criteria for this proposal.
Before we dive into the details, let's clarify some terminology that will be used throughout this document.
| Term | Definition |
|---|---|
| Actor | An entity in the runtime that can send and receive messages. |
| Runtime | Facilitates the communication between actors and manages their states and lifecycles. |
| Runtime Abstraction | An abstraction that provides a common interface for different runtime implementations. |
| Agent | A Semantic Kernel agent. |
| Orchestration | Contains actors and the rules governing how they interact with each other. |
We are using the term "actor" to avoid confusion with the term "agent" used in the Semantic Kernel Agent Framework. You may see the name "actor" used interchangeably with "agent" in the runtime documentation. To learn more about actors in software design, please refer to: https://en.wikipedia.org/wiki/Actor_model.
You may hear the term "pattern" in other contexts. "Pattern" is almost semantically identical to "orchestration" where the latter implies the management and execution of patterns. You can also think of "patterns" as types of "orchestrations". For example, "concurrent orchestration" is a type of orchestration that follows the concurrent pattern.
The runtime abstraction serves as the foundational layer for the system. A basic understanding of the runtime is recommended. For more details, refer to the AutoGen Core User Guide.
The AutoGen team has built a runtime abstraction (along with an in-process runtime implementation) that supports pub-sub communication between actors in a system. We have had the opportunity to leverage this work, which led to a shared agent runtime abstraction on which Semantic Kernel will depend.
Depending on the actual runtime implementation, actors can be local or distributed. Our agent framework is not tied to a specific runtime implementation, i.e. it is runtime agnostic.
The first version of the multi-agent orchestration framework will provide a set of pre-built orchestrations that cover the most common patterns listed below. As time goes on, we will add more orchestrations based on customer feedback and will allow customers to easily create their own orchestrations using the building blocks provided by the framework.
| Orchestrations | Description |
|---|---|
| Concurrent | Useful for tasks that will benefit from independent analysis from multiple agents. |
| Sequential | Useful for tasks that require a well-defined step-by-step approach. |
| Handoff | Useful for tasks that are dynamic in nature and don't have a well-defined step-by-step approach. |
| GroupChat | Useful for tasks that will benefit from inputs from multiple agents and a highly configurable conversation flow. |
| Magentic One | GroupChat-like, with a planner-based manager. Inspired by Magentic One. |
Please see Appendix A for more detailed descriptions of the pre-built orchestrations.
Using an orchestration should be as simple as the following:
```python
agent_1 = ChatCompletionAgent(...)
agent_2 = ChatCompletionAgent(...)

group_chat = GroupChatOrchestration(members=[agent_1, agent_2], manager=RoundRobinGroupChatManager())

# The runtime can be a context manager for better resource management and developer experience.
# We may also consider using a factory to create a default runtime instance.
runtime = InProcessRuntime()
runtime.start()

orchestration_result = await group_chat.invoke(task="Hello world", runtime=runtime)
result = await orchestration_result.get(timeout=20)
print(result)

await runtime.stop_when_idle()
```
We should consider an orchestration as a template that describes how the agents will interact with each other, similar to a directed graph. The actual execution of the orchestration should be done by the runtime. Therefore, the following must be true:
An orchestration can be invoked multiple times and each invocation should be independent and isolated from each other. Invocations can also share the same runtime instance. This will require us to define clear invocation boundaries to avoid collisions, such as actor names or IDs.
For example, in the following code snippet, the task_1 and task_2 are independent and don't share any context:
```python
agent_1 = ChatCompletionAgent(...)
agent_2 = ChatCompletionAgent(...)

group_chat = GroupChatOrchestration(members=[agent_1, agent_2], manager=RoundRobinGroupChatManager())

runtime = InProcessRuntime()
runtime.start()

task_1 = await group_chat.invoke(task=TASK_1, runtime=runtime)
task_2 = await group_chat.invoke(task=TASK_2, runtime=runtime)

result_1 = await task_1.get(timeout=20)
result_2 = await task_2.get(timeout=20)

await runtime.stop_when_idle()
```
We need the orchestrations to accept structured inputs and return structured outputs, so that they will be easier to work with from a code perspective. This will also make it easier for developers to work with orchestrations that are not chat-based (although internally the agents will still be chat-based).
The code snippets shown are not complete, but they provide enough context to understand the proposal.
| Component | Details |
|---|---|
| Agent actor | - Semantic Kernel agent |
```mermaid
graph TD
    %% Outer Block
    subgraph Orchestration
        subgraph Members[Members]
            subgraph AA0[Agent Actor]
                AG0[agent 0]
            end
            subgraph AA1[Agent Actor]
                AG1[agent 1]
            end
        end
        IT[Internal Topic]
        OA[Optional Actor]
    end
    %% Connections
    AA0 <-.Direct messaging.-> AA1
    AA0 <-.Direct messaging.-> OA
    AA1 <-.Direct messaging.-> OA
    IT <-.Broadcast.-> AA0
    IT <-.Broadcast.-> AA1
    IT <-.Broadcast.-> OA
```
This is a wrapper around a Semantic Kernel agent so that the agent can send and receive messages from the runtime. The AgentActorBase will inherit from the RoutedAgent class:
```python
class AgentActorBase(RoutedAgent):
    """An agent actor for multi-agent orchestration running on an agent runtime."""

    def __init__(self, agent: Agent) -> None:
        """Initialize the agent container.

        Args:
            agent (Agent): An agent to be run in the container.
        """
        self._agent = agent
        self._agent_thread = None
        # Chat history to temporarily store messages before the agent thread is created
        self._chat_history = ChatHistory()

        RoutedAgent.__init__(self, description=agent.description or "Semantic Kernel Agent")
```
Each orchestration will have its own agent actor that extends AgentActorBase, because each orchestration can define its own set of message handlers.
To learn more about messages and message handlers, please refer to the AutoGen documentation.
For example, for the group chat orchestration, the agent actor will look like this:
```python
class GroupChatAgentActor(AgentActorBase):
    """An agent actor for agents that process messages in a group chat."""

    @message_handler
    async def _handle_start_message(self, message: GroupChatStartMessage, ctx: MessageContext) -> None:
        """Handle the initial message(s) provided by the user."""
        ...

    @message_handler
    async def _handle_response_message(self, message: GroupChatResponseMessage, ctx: MessageContext) -> None:
        """Handle the response message from other agents in the group chat."""
        ...

    @message_handler
    async def _handle_request_message(self, message: GroupChatRequestMessage, ctx: MessageContext) -> None:
        """Handle the request message from the group manager."""
        ...
```
Agent actors in other orchestrations will handle different message types or a different number of message types. This proposal doesn't place any restrictions on how agent actors interact with each other inside an orchestration, i.e. the rules are defined by individual orchestrations.
The signature of the data transform logic will be as follows:
```python
DefaultTypeAlias = ChatMessageContent | list[ChatMessageContent]

TIn = TypeVar("TIn", default=DefaultTypeAlias)
TOut = TypeVar("TOut", default=DefaultTypeAlias)

input_transform: Callable[[TIn], Awaitable[DefaultTypeAlias] | DefaultTypeAlias]
output_transform: Callable[[DefaultTypeAlias], Awaitable[TOut] | TOut]
```
TIn denotes the type of input the orchestration will take, while TOut denotes the type of output the orchestration will return to the caller. We will use ChatMessageContent and list[ChatMessageContent] as the default types. This means that the orchestration will accept a single chat message or a list of chat messages as input and return a single chat message or a list of chat messages as output.
We can offer a set of default transforms to improve the developer quality of life. We can also have LLMs that automatically perform the transforms given the types.
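As an illustration, a user-defined input transform might map a domain object onto the default chat type. The sketch below is self-contained: `ArticleRequest` is a hypothetical domain type, and `ChatMessageContent` is a minimal stand-in defined locally (the real type lives in Semantic Kernel).

```python
from dataclasses import dataclass


# Stand-in for Semantic Kernel's ChatMessageContent, defined here only to
# keep the sketch self-contained.
@dataclass
class ChatMessageContent:
    role: str
    content: str


# Hypothetical structured input type (TIn) for an orchestration.
@dataclass
class ArticleRequest:
    topic: str
    audience: str


def input_transform(request: ArticleRequest) -> ChatMessageContent:
    """Map the structured input onto the default chat type."""
    return ChatMessageContent(
        role="user",
        content=f"Write an article about {request.topic} for {request.audience}.",
    )


message = input_transform(ArticleRequest(topic="solar power", audience="teens"))
print(message.role, message.content)
```

An output transform would do the reverse: parse the final chat message(s) back into a structured TOut value.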
An orchestration is simply a collection of Semantic Kernel agents and the rules that govern how they will interact with each other. Concrete implementations have to provide logic for how to start and prepare an invocation of the orchestration. "Preparing" an invocation simply means registering the actors with the runtime and setting up the communication channels between them based on the orchestration type.
```python
class OrchestrationBase(ABC, Generic[TIn, TOut]):
    def __init__(
        self,
        members: list[Agent],
        input_transform: Callable[[TIn], Awaitable[DefaultTypeAlias] | DefaultTypeAlias] | None = None,
        output_transform: Callable[[DefaultTypeAlias], Awaitable[TOut] | TOut] | None = None,
    ) -> None:
        """Initialize the orchestration base.

        Args:
            members (list[Agent]): The list of agents or orchestrations to be used.
            input_transform (Callable | None): A function that transforms the external input message.
            output_transform (Callable | None): A function that transforms the internal output message.
        """
        ...

    async def invoke(
        self,
        task: str | DefaultTypeAlias | TIn,
        runtime: AgentRuntime,
    ) -> OrchestrationResult:
        """Invoke the orchestration and return a result immediately, which can be awaited later.

        The runtime is supplied by the application at invocation time, not at creation time.
        Orchestrations are runtime-agnostic and can be used with any runtime that implements
        the runtime abstraction.
        """
        orchestration_result = OrchestrationResult[TOut]()

        async def result_callback(result: DefaultTypeAlias) -> None:
            """Callback function that is called when the result is ready."""
            ...

        ...
        # This unique topic type is used to isolate the invocation from others.
        internal_topic_type = uuid.uuid4().hex
        await self._prepare(runtime, internal_topic_type, result_callback)
        ...
        await self._start(runtime, internal_topic_type, orchestration_result.cancellation_token)

        return orchestration_result

    @abstractmethod
    async def _start(
        self,
        runtime: AgentRuntime,
        internal_topic_type: str,
        cancellation_token: CancellationToken,
    ) -> None:
        ...

    @abstractmethod
    async def _prepare(
        self,
        runtime: AgentRuntime,
        internal_topic_type: str,
        result_callback: Callable[[DefaultTypeAlias], Awaitable[None]] | None = None,
    ) -> str:
        ...
```
When using an orchestration, the user can optionally set TIn and TOut and provide the input and output transforms. For example, in Python, the user can do the following:
```python
class MyTypeA:
    pass

class MyTypeB:
    pass

sequential_orchestration = SequentialOrchestration[MyTypeA, MyTypeB](
    members=[agent_0, agent_1],
    input_transform=input_transform_func,
    output_transform=output_transform_func,
)
```
And depending on the language, we can offer defaults so that only advanced users will need to set TIn and TOut. For example, in Python, we can do the following:
```python
DefaultTypeAlias = ChatMessageContent | list[ChatMessageContent]

TIn = TypeVar("TIn", default=DefaultTypeAlias)
TOut = TypeVar("TOut", default=DefaultTypeAlias)
```
And in .Net, we can do the following:
```csharp
public class SequentialOrchestration<TIn, TOut> : AgentOrchestration<TIn, TOut>
{
    ...
}

public sealed class SequentialOrchestration : SequentialOrchestration<ChatMessageContent, ChatMessageContent>
{
    ...
}
```
The orchestration result will be represented as such:
```python
class OrchestrationResult(KernelBaseModel, Generic[TOut]):
    value: TOut | None = None
    event: asyncio.Event = Field(default_factory=lambda: asyncio.Event())
    cancellation_token: CancellationToken = Field(default_factory=lambda: CancellationToken())

    async def get(self, timeout: float | None = None) -> TOut:
        """Get the result of the invocation.

        Args:
            timeout (float | None): The timeout in seconds. If None, wait indefinitely.

        Raises:
            TimeoutError: If the timeout is reached before the result is ready.
            RuntimeError: If the invocation is cancelled.

        Returns:
            TOut: The result of the invocation.
        """
        ...

    def cancel(self) -> None:
        """Cancel the invocation.

        This method will cancel the invocation and set the cancellation token.
        Actors that have received messages will continue to process them, but no new
        messages will be processed.
        """
        ...
```
The following items are important topics we need to consider and need further discussion. However, they shouldn't block the initial implementation of the multi-agent orchestration framework.
Definitions for resume and restart before proceeding:
Orchestrations can be long-running (hours, days, even years) or short-lived (minutes, seconds, or less). The state of an orchestration can mean the following:
Resuming from an idle state will be handled by the runtime. The runtime is responsible for saving the state of the actors and rehydrating them when the orchestration is resumed.
Another type of state is the agents' conversational context. There is active work on agent threads and memories, and we should consider how these concepts fit into the framework. Ideally, we want the ability to restart an orchestration on some existing agent context. Please refer to Agent context for further discussion.
We mentioned in the State management section that orchestrations do not manage the state of the agents, while we do want to support the ability to invoke/restart an orchestration on some existing agent context. This means that we need to have a way to provide the state of the agents to the orchestrations.
An option is to have a context provider that provides agent contexts given an agent ID. The context provider will be attached to the agent actors for the agent actor to retrieve and update contexts. Each new invocation of an orchestration will return a text representation (see Support declarative orchestrations) of the orchestration, which can be used to rehydrate the orchestration.
We need a clear story for customers on how to handle errors in the runtime. The runtime is managed by the application. Orchestrations will not be able to capture errors that happen in the runtime and actor level.
The in-process runtime currently has a flag, ignore_unhandled_exceptions, which defaults to True and can be set at construction time. Setting this flag to False will cause the runtime to stop and raise if an exception occurs during execution.
It will get more complicated when we have distributed runtimes. We should also consider retries and idempotency at the runtime level.
Human-in-the-loop is a critical component in autonomous systems. We need to consider how to support human-in-the-loop in the multi-agent orchestration framework.
The group chat orchestration has an experimental feature that allows input from users. Please refer to the Group Chat Orchestration section for more details.
Composition allows users to take existing orchestrations and use them to build more powerful orchestrations. Think of replacing an agent in an orchestration with another orchestration. This will unlock more complex scenarios with less effort. However, this comes with challenges, including:
Although orchestrations are not tied to a specific runtime, we need to understand how actors and orchestrations will be distributed if a runtime allows distribution. The following questions need to be answered:
Declarative orchestrations provide a low-code solution for users. We are already working on declarative agents, and we can leverage that work to create declarative orchestrations.
Safety is also a priority. A powerful orchestration may accomplish a lot of things, but it may also do a lot of harm. We need to consider how to implement guardrails in the multi-agent orchestration framework, similar to what OpenAI has in their agent SDK.
Since SK is an enterprise solution, we should also consider observability.
We can consider adding a layer before the runtime that standardizes all messages between actors, for the following benefits:
The concurrent orchestration works in the following steps:
```mermaid
graph TD
    %% Outer Block
    subgraph Concurrent Orchestration
        subgraph Members[Members]
            AG0[agent 0]
            AG1[agent 1]
        end
        IT[Internal Topic]
        RC[Result Collector]
    end
    %% Connections
    IT --> |ConcurrentRequestMessage| AG0
    IT --> |ConcurrentRequestMessage| AG1
    AG0 --> |ConcurrentResponseMessage| RC
    AG1 --> |ConcurrentResponseMessage| RC
```
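Conceptually, the concurrent pattern is a fan-out/fan-in: the task is broadcast to every member, and the result collector gathers one response per member. In plain asyncio terms (the agent functions below are stand-ins, not the real agent API):

```python
import asyncio


async def agent_0(task: str) -> str:
    # Stand-in for invoking the first member agent.
    return f"agent 0 analysis of: {task}"


async def agent_1(task: str) -> str:
    # Stand-in for invoking the second member agent.
    return f"agent 1 analysis of: {task}"


async def concurrent_orchestration(task: str) -> list[str]:
    # Fan out: broadcast the request to all members.
    # Fan in: collect one response per member, like the result collector.
    return list(await asyncio.gather(agent_0(task), agent_1(task)))


results = asyncio.run(concurrent_orchestration("review this design"))
print(results)
```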
The sequential orchestration works in the following steps:
```mermaid
graph TD
    %% Outer Block
    subgraph Sequential Orchestration
        subgraph Members[Members]
            AG0[agent 0]
            AG1[agent 1]
        end
        RC[Result Collector]
    end
    %% Connections
    AG0 --> |SequentialRequestMessage| AG1
    AG1 --> |SequentialResponseMessage| RC
```
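Conceptually, the sequential pattern is a pipeline: each member's output becomes the next member's input, and only the last output reaches the result collector. A minimal asyncio sketch (stand-in agent functions, not the real API):

```python
import asyncio


async def agent_0(message: str) -> str:
    # Stand-in for the first member agent in the pipeline.
    return message + " -> refined by agent 0"


async def agent_1(message: str) -> str:
    # Stand-in for the second member agent in the pipeline.
    return message + " -> refined by agent 1"


async def sequential_orchestration(task: str) -> str:
    # Each member's output is the next member's input; the last output
    # goes to the result collector.
    intermediate = await agent_0(task)
    return await agent_1(intermediate)


print(asyncio.run(sequential_orchestration("draft")))
```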
The handoff orchestration works in the following steps:
```mermaid
graph TD
    %% Outer Block
    subgraph Handoff Orchestration
        subgraph Members[Members]
            AG0[agent 0]
            AG1[agent 1]
        end
        IT[Internal Topic]
    end
    %% Connections
    IT <--> |Broadcast| AG0
    IT <--> |Broadcast| AG1
```
The group chat orchestration works in the following steps:
```mermaid
graph TD
    %% Outer Block
    subgraph Group Chat Orchestration
        subgraph Members[Members]
            AG0[agent 0]
            AG1[agent 1]
        end
        IT[Internal Topic]
        GM[Group Manager]
    end
    %% Connections
    IT <--> |Broadcast| AG0
    IT <--> |Broadcast| AG1
    IT <--> |Broadcast| GM
```
The group chat manager is responsible for managing the conversation flow. It will have the following responsibilities:
```python
class GroupChatManager(KernelBaseModel, ABC):
    """A group chat manager that manages the flow of a group chat."""

    user_input_func: Callable[[ChatHistory], Awaitable[str]] | None = None

    @abstractmethod
    async def should_request_user_input(self, chat_history: ChatHistory) -> bool:
        raise NotImplementedError

    @abstractmethod
    async def should_terminate(self, chat_history: ChatHistory) -> bool:
        raise NotImplementedError

    @abstractmethod
    async def select_next_agent(self, chat_history: ChatHistory, participant_descriptions: dict[str, str]) -> str:
        raise NotImplementedError

    @abstractmethod
    async def filter_results(self, chat_history: ChatHistory) -> ChatMessageContent:
        raise NotImplementedError
```
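As an illustration of the select_next_agent responsibility, a round-robin strategy (similar in spirit to the RoundRobinGroupChatManager used in the earlier snippets) simply cycles through the participants in a fixed order. The sketch below uses stand-in types rather than the real ChatHistory-based signature:

```python
import itertools


class RoundRobinSelector:
    """Stand-in sketch of the select_next_agent responsibility:
    cycle through the participants in a fixed order, one turn each."""

    def __init__(self, participant_names: list[str]) -> None:
        self._cycle = itertools.cycle(participant_names)

    def select_next_agent(self) -> str:
        return next(self._cycle)


selector = RoundRobinSelector(["writer", "reviewer"])
print([selector.select_next_agent() for _ in range(4)])  # prints ['writer', 'reviewer', 'writer', 'reviewer']
```

An LLM-based manager would instead implement select_next_agent (and should_terminate) by reasoning over the chat history and participant descriptions.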
Magentic One is a group-chat-like orchestration with a special group manager. Refer to the Magentic One blog post or paper for more details.