The goal of this post is to provide an in-depth look at standardizing how heterogeneous agents discover, authenticate, and communicate with one another by examining Google's Agent-to-Agent (A2A) protocol [1], covering theory, practical deployment, and security.
The post presents the foundations of the A2A protocol, one of the most promising standards for agent communication. A2A provides the plumbing for building multi-agent ecosystems and addresses the interoperability problem by letting agents built with different stacks and frameworks communicate seamlessly.
Anthropic's Model Context Protocol (MCP) [2] is built around a similar idea. It standardizes how tools are delivered to agents (more precisely, to the foundation models or LLMs behind them) so that they can interact with the external environment, that is, take actions beyond text generation, while avoiding the M × N integration bottleneck. In other words, MCP standardizes vertical interaction (agent-to-environment), whereas A2A standardizes horizontal interaction (agent-to-agent communication). A2A and MCP are therefore complementary rather than competing: an A2A agent may, for instance, use MCP internally to reach its tools.
Both A2A and MCP offer significant benefits, including interoperability, extensibility, and scalability. However, integrating A2A is more involved than integrating MCP, which makes a solid understanding of its core concepts critical for effective integration, secure adoption, smart coordination, and the development of reliable multi-agent systems (MAS). In this post, we take a deep dive into its building blocks so that you can understand and leverage the A2A protocol with confidence.

Agent-to-Agent (A2A) is a protocol built on top of JSON-RPC 2.0 and launched by Google in April 2025. It defines a standard for agent-to-agent communication, which is essential for building scalable and secure multi-agent systems (MAS). Just as MCP standardizes tool discovery and invocation, A2A defines a standard way for agents to discover each other's capabilities through Agent Cards (served from /.well-known/agent-card.json, discussed later), delegate tasks, and exchange messages in order to collaborate.
The core concepts of the A2A protocol include Agent Cards, Messages, Tasks, and Artifacts, which are covered in a moment.

An Agent Card is declarative metadata that describes an agent's identity, capabilities, interfaces and required authentication schemes.
```python
class AgentCard(_message.Message):
    __slots__ = ("name", "description", "supported_interfaces", "provider", "version", "documentation_url", "capabilities", "security_schemes", "security_requirements", "default_input_modes", "default_output_modes", "skills", "signatures", "icon_url")
    ...


agent_card = AgentCard(
    name='Calendar Agent',
    description="An agent that can manage a user's calendar",
    url=f'http://{host}:{port}/',
    version='1.0.0',
    default_input_modes=['text'],
    default_output_modes=['text'],
    capabilities=AgentCapabilities(streaming=True),
    skills=[skill],
    security_schemes={OAUTH_SCHEME_NAME: SecurityScheme(root=oauth_scheme)},
    # Declare that this scheme is required to use the agent's skills
    security=[
        {OAUTH_SCHEME_NAME: ['https://www.googleapis.com/auth/calendar']}
    ],
)
```

Example from https://github.com/a2aproject/a2a-samples. Note that this sample still sets the top-level url and security fields of an earlier AgentCard schema; the class shown first exposes supported_interfaces and security_requirements instead.
Agent Cards enable autonomous agents to discover the remote agents best suited for the task at hand.
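To make "best suited" concrete, here is a minimal sketch that ranks Agent Card JSON documents by naive keyword overlap between a query and each card's description, skill descriptions, and tags. The scoring heuristic, the stopword list, and the Calendar Agent's skill entry are hypothetical illustrations, not part of A2A. An LLM-based orchestrator would normally make this choice itself from the same fields.

```python
import re
from typing import Any

# Words too common to say anything about an agent's specialty (hypothetical list).
STOPWORDS = {"a", "an", "and", "can", "for", "of", "that", "the", "to", "with"}


def _tokens(text: str) -> set[str]:
    """Lower-cased word tokens, minus stopwords."""
    return {w for w in re.findall(r"[a-z0-9]+", text.lower()) if w not in STOPWORDS}


def rank_agents(query: str, cards: list[dict[str, Any]]) -> list[tuple[str, int]]:
    """Score each Agent Card by keyword overlap between the query and its skills."""
    query_words = _tokens(query)
    scored = []
    for card in cards:
        vocabulary = _tokens(card.get("description", ""))
        for skill in card.get("skills", []):
            vocabulary |= _tokens(skill.get("description", ""))
            vocabulary |= _tokens(" ".join(skill.get("tags", [])))
        scored.append((card["name"], len(query_words & vocabulary)))
    # Best match first; sorted() is stable, so ties keep their input order.
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


cards = [
    {"name": "Calendar Agent",
     "description": "An agent that can manage a user's calendar",
     "skills": [{"description": "Create and list calendar events",  # hypothetical skill
                 "tags": ["calendar", "scheduling"]}]},
    {"name": "Hello World Agent",
     "description": "Just a hello world agent",
     "skills": [{"description": "An example agent that acknowledges client request "
                                "and responds with a \"Hello World\" message.",
                 "tags": ["a2a", "echo-example"]}]},
]
print(rank_agents("schedule a calendar event for tomorrow", cards))
```

Exact keyword matching is brittle ("event" does not match "events"), which is one reason real orchestrators usually hand the card contents to an LLM or an embedding model instead.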

In A2A, every agent exposes an Agent Card describing its identity, capabilities, supported skills, and other metadata. The A2A server serves this description through the Agent Card endpoint: a GET request to http://<a2a-base-url>/.well-known/agent-card.json returns a structured JSON document describing the agent. The response looks as follows:
```json
{
  "name": "Hello World Agent",
  "description": "Just a hello world agent",
  "supportedInterfaces": [
    {
      "url": "http://127.0.0.1:9999",
      "protocolBinding": "JSONRPC"
    }
  ],
  "version": "0.0.1",
  "capabilities": {
    "streaming": true,
    "extendedAgentCard": true
  },
  "defaultInputModes": ["text/plain"],
  "defaultOutputModes": ["text/plain"],
  "skills": [
    {
      "id": "echo_bot",
      "name": "Echo Bot",
      "description": "An example agent that acknowledges client request and responds with a \"Hello World\" message.",
      "tags": ["a2a", "echo-example"],
      "examples": ["hi", "how are you"],
      "inputModes": ["text/plain"],
      "outputModes": ["text/plain"]
    }
  ],
  "preferredTransport": "JSONRPC",
  "protocolVersion": "0.3",
  "supportsAuthenticatedExtendedCard": true,
  "url": "http://127.0.0.1:9999"
}
```

Using the Agent Cards of remote agents, orchestrating, routing, or planning agents can dynamically discover available capabilities and delegate subtasks to the most appropriate subagent. For example, an orchestrator may first retrieve the Agent Cards of the available A2A agents and use their names, descriptions, skills, and capabilities to determine which agent is best suited for a given task or query. In LLM-based orchestrators, this information is typically injected into the model's context (usually the system prompt), allowing the language model to decide whether to delegate and to which agent. The ability to communicate with a remote agent can be exposed as a tool or a subagent, with each remote connection backed by an A2AClient that handles communication with the target A2A agent. Once the remote agent responds, the orchestrator injects the result back into the conversation, so that the language model can continue reasoning as if it had invoked any other tool. Many other orchestration patterns are possible.
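The discovery-then-prompt pattern can be sketched with the standard library alone. The code assumes the well-known path /.well-known/agent-card.json and the camelCase card fields shown in the Hello World example. The prompt wording and both function names are hypothetical, and the real SDK ships its own card resolver and client, which this sketch does not reproduce.

```python
import json
import urllib.request
from typing import Any

WELL_KNOWN_PATH = "/.well-known/agent-card.json"


def fetch_agent_card(base_url: str, timeout: float = 5.0) -> dict[str, Any]:
    """GET the Agent Card that an A2A server publishes at its well-known path."""
    url = base_url.rstrip("/") + WELL_KNOWN_PATH
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)


def render_routing_prompt(cards: list[dict[str, Any]]) -> str:
    """Turn Agent Cards into a plain-text section for an orchestrator's system prompt."""
    lines = ["You can delegate work to the following remote agents:"]
    for card in cards:
        lines.append(f"- {card['name']}: {card.get('description', '')}")
        for skill in card.get("skills", []):
            examples = ", ".join(skill.get("examples", []))
            suffix = f" (e.g. {examples})" if examples else ""
            lines.append(f"  * skill {skill['id']}: {skill.get('description', '')}{suffix}")
    return "\n".join(lines)


# Offline usage with a card shaped like the Hello World example (description shortened).
card = {
    "name": "Hello World Agent",
    "description": "Just a hello world agent",
    "skills": [{"id": "echo_bot", "description": "Responds with a greeting",
                "examples": ["hi", "how are you"]}],
}
print(render_routing_prompt([card]))
# Against a running server: render_routing_prompt([fetch_agent_card("http://127.0.0.1:9999")])
```

Keeping the rendering separate from the fetching makes it easy to cache cards and to re-render the prompt when the set of available agents changes.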
The A2A server exposes an HTTPS endpoint that implements the A2A methods defined by the protocol over JSON-RPC 2.0, supporting both streaming and non-streaming interactions. It receives incoming requests from A2A clients, manages the A2A task lifecycle (task creation, execution, state transitions, and subscriptions), and returns responses through the A2A protocol. The server-side request handler coordinates task execution with the underlying agent and maintains the task state throughout its lifecycle.
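To make "state transitions" concrete, the following sketch mirrors the TaskState values that appear later in the SDK (lower-cased, with UNSPECIFIED omitted). It groups them into terminal states (completed, failed, canceled, rejected) and interrupted states (input-required, auth-required), which is how the SDK's TERMINAL_TASK_STATES and INTERRUPTED_TASK_STATES sets are used. The transition table itself is an illustrative assumption for this sketch, not a normative rule copied from the specification.

```python
from enum import Enum


class TaskState(str, Enum):
    """Simplified stand-in for the SDK's TaskState enum (UNSPECIFIED omitted)."""
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    AUTH_REQUIRED = "auth-required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"
    REJECTED = "rejected"


# Finished: the task will not change state again.
TERMINAL = {TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELED, TaskState.REJECTED}
# Paused: the agent waits for the client (more input or credentials).
INTERRUPTED = {TaskState.INPUT_REQUIRED, TaskState.AUTH_REQUIRED}

# Illustrative transition table; an assumption for this sketch, not taken from the spec.
ALLOWED = {
    TaskState.SUBMITTED: {TaskState.WORKING, TaskState.REJECTED, TaskState.CANCELED},
    TaskState.WORKING: INTERRUPTED | TERMINAL,
    TaskState.INPUT_REQUIRED: {TaskState.WORKING, TaskState.CANCELED, TaskState.FAILED},
    TaskState.AUTH_REQUIRED: {TaskState.WORKING, TaskState.CANCELED, TaskState.FAILED},
}


def transition(current: TaskState, new: TaskState) -> TaskState:
    """Return the new state, refusing moves out of terminal states or off the table."""
    if current in TERMINAL:
        raise ValueError(f"task is already {current.value}")
    if new not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    return new


def returns_to_blocking_caller(state: TaskState) -> bool:
    """A non-streaming SendMessage call hands back the Task once this is true."""
    return state in TERMINAL or state in INTERRUPTED


state = TaskState.SUBMITTED
for nxt in (TaskState.WORKING, TaskState.INPUT_REQUIRED):
    state = transition(state, nxt)
    print(state.value, returns_to_blocking_caller(state))
```

The split between terminal and interrupted states explains why a blocking call can return a Task that is not yet finished. An input-required task hands control back to the client, which can resume it by sending another message that carries the same task ID.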

In the non-streaming interaction mode, a client invokes the SendMessage method using a standard JSON-RPC request.
```python
async def send_message(
    self,
    request: SendMessageRequest,
    *,
    context: ClientCallContext | None = None,
) -> SendMessageResponse:
    """Sends a non-streaming message request to the agent."""
    rpc_request = JSONRPC20Request(
        method='SendMessage',
        params=json_format.MessageToDict(request),
        _id=str(uuid4()),
    )
```
The server returns either a Message or a Task:

```python
@abstractmethod
async def on_message_send(
    self,
    params: SendMessageRequest,
    context: ServerCallContext,
) -> Task | Message:
    ...
```

A Task includes fields such as id, contextId, status, artifacts, history, and metadata:

```python
class Task(_message.Message):
    __slots__ = ("id", "context_id", "status", "artifacts", "history", "metadata")
```

A Message, in turn, includes fields such as messageId, contextId, taskId, role, parts, metadata, extensions, and referenceTaskIds:

```python
class Message(_message.Message):
    __slots__ = ("message_id", "context_id", "task_id", "role", "parts", "metadata", "extensions", "reference_task_ids")
```

Message and Task objects are discussed in more detail later.
Specifically, in non-streaming mode, the task is returned once it transitions to a terminal or interrupted state, or right away if the request sets return_immediately in its configuration:
```python
@validate_request_params
async def on_message_send(  # noqa: D102
    self,
    params: SendMessageRequest,
    context: ServerCallContext,
) -> Message | Task:
    active_task, request_context = await self._setup_active_task(
        params, context
    )
    task_id = cast('str', request_context.task_id)
    result: Message | Task | None = None
    async for raw_event in active_task.subscribe(
        request=request_context,
        include_initial_task=False,
        replace_status_update_with_task=True,
    ):
        event = raw_event
        logger.debug(
            'Processing[%s] event [%s] %s',
            params.message.task_id,
            type(event).__name__,
            event,
        )
        if isinstance(event, Task) and (
            params.configuration.return_immediately
            or event.status.state
            in (TERMINAL_TASK_STATES | INTERRUPTED_TASK_STATES)
        ):
            self._validate_task_id_match(task_id, event.id)
            result = event
            # DO break here as it's "return_immediately".
            # AgentExecutor will continue to run in the background.
            break
        if isinstance(event, Message):
            result = event
            # Do NOT break here as Message is supposed to be the only
            # event in "Message-only" interaction.
            # ActiveTask consumer (see active_task.py) validates the event
            # stream and raises InvalidAgentResponseError if more events are
            # pushed after a Message.
```

In the streaming interaction mode, a client calls the SendStreamingMessage method with a JSON-RPC request:
```python
async def send_message_streaming(
    self,
    request: SendMessageRequest,
    *,
    context: ClientCallContext | None = None,
) -> AsyncGenerator[StreamResponse]:
    """Sends a streaming message request to the agent and yields responses as they arrive."""
    rpc_request = JSONRPC20Request(
        method='SendStreamingMessage',
        params=json_format.MessageToDict(request),
        _id=str(uuid4()),
    )
    async for event in self._send_stream_request(
        dict(rpc_request.data),
        context,
    ):
        yield event
```

Rather than returning a single response, the server opens a Server-Sent Events (SSE) stream and sends incremental updates as the task progresses. On the client side, as shown above, these updates are exposed through an AsyncGenerator.
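At the wire level, an SSE body is a sequence of `data:` lines separated by blank lines, and each A2A event travels as one JSON-RPC response. The following stdlib-only sketch decodes such a stream from an iterable of text lines. The JSON payload shapes in the sample stream (a `result` holding a `status-update` with a `status.state`) are illustrative assumptions, and the SDK's own streaming client handles this for you.

```python
import json
from collections.abc import Iterable, Iterator
from typing import Any


def iter_sse_json(lines: Iterable[str]) -> Iterator[Any]:
    """Yield one decoded JSON value per Server-Sent Events message.

    An SSE message is a group of "data:" lines terminated by a blank line;
    several data lines in one message are joined with newlines.
    """
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the buffered message, if any.
            if data_lines:
                yield json.loads("\n".join(data_lines))
                data_lines = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format strips a single leading space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)
        # "event:", "id:", "retry:" fields and ":" comments are ignored here.
    # A trailing message without a terminating blank line is discarded, as in SSE.


# Illustrative stream: each data line carries a JSON-RPC response (shapes assumed).
stream = [
    'data: {"jsonrpc": "2.0", "id": 1, "result": {"kind": "status-update", "status": {"state": "working"}}}\n',
    "\n",
    'data: {"jsonrpc": "2.0", "id": 1, "result": {"kind": "status-update", "status": {"state": "completed"}}}\n',
    "\n",
]
for message in iter_sse_json(stream):
    print(message["result"]["status"]["state"])
```

Because the parser works on any iterable of lines, it can be fed from an HTTP response that is read line by line, or from a list in tests as above.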

The updates are emitted as A2A events, such as TaskStatusUpdateEvent, which reports task state transitions, and TaskArtifactUpdateEvent, which delivers newly generated artifacts as they become available. The stream remains open until the task reaches a terminal state.
```python
async def on_message_send_stream(  # noqa: D102
    self,
    params: SendMessageRequest,
    context: ServerCallContext,
) -> AsyncGenerator[Event, None]:
    active_task, request_context = await self._setup_active_task(
        params, context
    )
    task_id = cast('str', request_context.task_id)
    async for event in active_task.subscribe(
        request=request_context,
        include_initial_task=False,
    ):
        # Do NOT break here as we rely on AgentExecutor to yield control.
        # ActiveTask consumer (see active_task.py) validates the event
        # stream and raises InvalidAgentResponseError on misbehaving agents:
        # - an event after a Message
        # - Message after entering task mode
        # - an event after a terminal state
        if isinstance(event, Task):
            self._validate_task_id_match(task_id, event.id)
            yield apply_history_length(event, params.configuration)
        else:
            yield event
```

The A2A client is typically an LLM-based application or an autonomous agent that communicates with remote agents by sending A2A JSON-RPC requests to an A2A server. The client discovers the target agent through its Agent Card, as explained earlier, constructs protocol-compliant requests, sends tasks or messages, and handles the A2A responses or streamed updates returned by the agent server.
```python
@trace_class(kind=SpanKind.CLIENT)
class JsonRpcTransport(ClientTransport):
    """A JSON-RPC transport for the A2A client."""

    def __init__(
        self,
        httpx_client: httpx.AsyncClient,
        agent_card: AgentCard,
        url: str,
    ):
        """Initializes the JsonRpcTransport."""
        self.url = url
        self.httpx_client = httpx_client
        self.agent_card = agent_card

    async def send_message(
        self,
        request: SendMessageRequest,
        *,
        context: ClientCallContext | None = None,
    ) -> SendMessageResponse:
        """Sends a non-streaming message request to the agent."""
        rpc_request = JSONRPC20Request(
            method='SendMessage',
            params=json_format.MessageToDict(request),
            _id=str(uuid4()),
        )
        response_data = await self._send_request(
            dict(rpc_request.data), context
        )
        json_rpc_response = JSONRPC20Response(**response_data)
        if json_rpc_response.error:
            raise self._create_jsonrpc_error(json_rpc_response.error)
        response: SendMessageResponse = json_format.ParseDict(
            json_rpc_response.result, SendMessageResponse()
        )
        return response
```

Here is what a request looks like:
```http
POST /rpc HTTP/1.1
Host: agent.example.com
Content-Type: application/json
Authorization: Bearer token
A2A-Version: 0.3
A2A-Extensions: https://example.com/extensions/geolocation/v1,https://standards.org/extensions/citations/v1

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "SendMessage",
  "params": { /* SendMessageRequest */ }
}
```

Then, on the server side, the routing is performed by the request handler:
```python
@validate_version(constants.PROTOCOL_VERSION_1_0)
async def _process_non_streaming_request(  # noqa: PLR0911
    self,
    request_obj: Any,
    context: ServerCallContext,
) -> dict[str, Any] | None:
    """Processes non-streaming requests.

    Args:
        request_obj: The proto request message.
        context: The ServerCallContext for the request.

    Returns:
        A dict containing the result or error.
    """
    method = context.state.get('method')
    match method:
        case 'SendMessage':
            return await self._handle_send_message(request_obj, context)
        case 'CancelTask':
            return await self._handle_cancel_task(request_obj, context)
        case 'GetTask':
            return await self._handle_get_task(request_obj, context)
        # ...
```

A Task is the fundamental unit of work in the A2A protocol.
```python
class Task(_message.Message):
    __slots__ = ("id", "context_id", "status", "artifacts", "history", "metadata")
    ID_FIELD_NUMBER: _ClassVar[int]
    CONTEXT_ID_FIELD_NUMBER: _ClassVar[int]
    STATUS_FIELD_NUMBER: _ClassVar[int]
    ARTIFACTS_FIELD_NUMBER: _ClassVar[int]
    HISTORY_FIELD_NUMBER: _ClassVar[int]
    METADATA_FIELD_NUMBER: _ClassVar[int]
    id: str
    context_id: str
    status: TaskStatus
    artifacts: _containers.RepeatedCompositeFieldContainer[Artifact]
    history: _containers.RepeatedCompositeFieldContainer[Message]
    metadata: _struct_pb2.Struct
    def __init__(self, id: _Optional[str] = ..., context_id: _Optional[str] = ..., status: _Optional[_Union[TaskStatus, _Mapping]] = ..., artifacts: _Optional[_Iterable[_Union[Artifact, _Mapping]]] = ..., history: _Optional[_Iterable[_Union[Message, _Mapping]]] = ..., metadata: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ...) -> None: ...
```
Every request submitted to an agent is associated with a task that represents the lifecycle of that piece of work.
```python
@validate_request_params
async def on_message_send(  # noqa: D102
    self,
    params: SendMessageRequest,
    context: ServerCallContext,
) -> Message | Task:
    active_task, request_context = await self._setup_active_task(
        params, context
    )
```

When a request is received, the server first determines whether it references an existing active task. If so, the request is attached to that task; otherwise, a new task is created and persisted in the task store, so that its state and results can be retrieved throughout its lifetime.
```python
active_task = await self._active_task_registry.get_or_create(
    task_id,
    context_id=context_id,
    call_context=call_context,
    create_task_if_missing=True,
)
```

A task progresses through a well-defined lifecycle represented by a set of standardized states:
```python
class TaskStatus(_message.Message):
    __slots__ = ("state", "message", "timestamp")
    STATE_FIELD_NUMBER: _ClassVar[int]
    MESSAGE_FIELD_NUMBER: _ClassVar[int]
    TIMESTAMP_FIELD_NUMBER: _ClassVar[int]
    state: TaskState
    message: Message
    timestamp: _timestamp_pb2.Timestamp
    def __init__(self, state: _Optional[_Union[TaskState, str]] = ..., message: _Optional[_Union[Message, _Mapping]] = ..., timestamp: _Optional[_Union[datetime.datetime, _timestamp_pb2.Timestamp, _Mapping]] = ...) -> None: ...

...

class TaskState(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
    __slots__ = ()
    TASK_STATE_UNSPECIFIED: _ClassVar[TaskState]
    TASK_STATE_SUBMITTED: _ClassVar[TaskState]
    TASK_STATE_WORKING: _ClassVar[TaskState]
    TASK_STATE_COMPLETED: _ClassVar[TaskState]
    TASK_STATE_FAILED: _ClassVar[TaskState]
    TASK_STATE_CANCELED: _ClassVar[TaskState]
    TASK_STATE_INPUT_REQUIRED: _ClassVar[TaskState]
    TASK_STATE_REJECTED: _ClassVar[TaskState]
    TASK_STATE_AUTH_REQUIRED: _ClassVar[TaskState]
```

A TaskManager is typically used to manage a task's lifecycle while a request is being executed. It references a TaskStore, which is used for fetching, saving, and updating the Task based on the events produced by the underlying agent.
```python
class TaskManager:
    ...

    async def get_task(self) -> Task | None:
        """Retrieves the current task object, either from memory or the store.

        If `task_id` is set, it first checks the in-memory `_current_task`,
        then attempts to load it from the `task_store`.

        Returns:
            The `Task` object if found, otherwise `None`.
        """
        if not self.task_id:
            logger.debug('task_id is not set, cannot get task.')
            return None
        if self._current_task:
            return self._current_task
        logger.debug(
            'Attempting to get task from store with id: %s', self.task_id
        )
        self._current_task = await self.task_store.get(
            self.task_id, self._call_context
        )
        if self._current_task:
            logger.debug('Task %s retrieved successfully.', self.task_id)
        else:
            logger.debug('Task %s not found.', self.task_id)
        return self._current_task
```

The response to a client request in A2A can be either a Task or a Message. In non-streaming mode, when a task completes successfully, the server typically returns the completed task, which contains the final state, any generated artifacts, and the agent's response.
```python
async def _handle_send_message(
    self, request_obj: SendMessageRequest, context: ServerCallContext
) -> dict[str, Any]:
    task_or_message = await self.request_handler.on_message_send(
        request_obj, context
    )
    if isinstance(task_or_message, Task):
        return MessageToDict(SendMessageResponse(task=task_or_message))
    return MessageToDict(SendMessageResponse(message=task_or_message))
```

Alternatively, the server may return a standalone Message, which exposes less data and thus minimizes unnecessary exposure of sensitive information [4]. This suits, for example, simple conversational exchanges in which the client only needs the immediate response rather than the complete task lifecycle.
```python
def new_agent_message(
    self,
    parts: list[Part],
    metadata: dict[str, Any] | None = None,
) -> Message:
    """Creates a new message object sent by the agent for this task/context.

    Note: This method only *creates* the message object. It does not
    automatically enqueue it.

    Args:
        parts: A list of `Part` objects for the message content.
        metadata: Optional metadata for the message.

    Returns:
        A new `Message` object.
    """
    return Message(
        role=Role.ROLE_AGENT,
        task_id=self.task_id,
        context_id=self.context_id,
        message_id=self._message_id_generator.generate(
            IDGeneratorContext(
                task_id=self.task_id, context_id=self.context_id
            )
        ),
        metadata=metadata,
        parts=parts,
    )
```
As shown above, each message consists of a role (e.g., ROLE_USER for the client or ROLE_AGENT for the server) and one or more parts:

```python
request_message = Message(
    role=Role.user,
    parts=[Part(root=TextPart(text=message))],
    message_id=message_id,
    context_id=context_id,
    task_id=task_id,
)
```

A part is a unit of content within a message or an artifact (i.e., an output generated by the A2A agent). A part may be, for example, a TextPart, a FilePart, or a DataPart, carrying plain text, a file, or structured data, respectively.
```python
async def convert_part(part: Part, tool_context: ToolContext) -> object:
    """Convert a single A2A Part into an ADK-friendly representation."""
    if part.root.kind == 'text':
        return part.root.text
    if part.root.kind == 'data':
        return part.root.data
    if part.root.kind == 'file':
        # Repackage A2A FilePart to google.genai Blob
        # Currently not considering plain text as files
        file_id = part.root.file.name
        file_bytes = base64.b64decode(part.root.file.bytes)
        file_part = types.Part(
            inline_data=types.Blob(
                mime_type=part.root.file.mime_type, data=file_bytes
            )
        )
        await tool_context.save_artifact(file_id, file_part)
        tool_context.actions.skip_summarization = True
        tool_context.actions.escalate = True
        return DataPart(data={'artifact-file-id': file_id})
    # The discriminator lives on the wrapped model, hence part.root.kind.
    return f'Unknown type: {part.root.kind}'
```

Example from the a2a-samples repository
Note that even when the client receives only a standalone message, the server still represents the interaction internally as an active task within the A2A framework. The task exists to manage execution and state on the server side; it is simply not fully exposed to the client in the response. The task ID is nevertheless surfaced in the message payload, through the task_id field:
```python
class Message(_message.Message):
    __slots__ = ("message_id", "context_id", "task_id", "role", "parts", "metadata", "extensions", "reference_task_ids")
```

The A2A protocol also defines a mechanism for servers to push asynchronous notifications to clients through webhooks.
```python
PushNotificationEvent = Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent


class PushNotificationSender(ABC):
    """Interface for sending push notifications for tasks."""

    @abstractmethod
    async def send_notification(
        self, task_id: str, event: PushNotificationEvent
    ) -> None:
        """Sends a push notification containing the latest task state."""
```

Instead of continuously polling for task updates, an A2A client can provide a webhook URL to which the server sends notifications whenever significant task events occur, such as the task reaching a terminal state, transitioning to the input-required state, or producing an artifact.
```python
async def _update_task_state(
    self,
    updated_task: Task,
    event: Task
    | TaskStatusUpdateEvent
    | TaskArtifactUpdateEvent
    | PushNotificationEvent,
) -> None:
    is_terminal = updated_task.status.state in TERMINAL_TASK_STATES
    if is_terminal:
        await self._handle_terminal_state(updated_task)
    if (
        self.active_task._push_sender
        and self.active_task._task_id
        and isinstance(event, PushNotificationEvent)
    ):
        logger.debug(
            'Consumer[%s]: Sending push notification',
            self.active_task._task_id,
        )
        await self.active_task._push_sender.send_notification(
            self.active_task._task_id, event
        )
```

Upon receiving a push notification, the A2A client should first validate the notification and then call the GetTask JSON-RPC method with the taskId contained in the notification to retrieve the latest state of the task, including any updated artifacts, messages, or status information.
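The validate-then-fetch step can be sketched as a framework-agnostic webhook handler. The code makes several assumptions. The token is the one the client registered in its TaskPushNotificationConfig, and it arrives in a header assumed to be named X-A2A-Notification-Token. The body is the bare event JSON, where a Task carries `id` and update events carry `taskId`. The GetTask params shape `{"id": ...}` is also assumed. Verify all of these against the protocol version you target.

```python
import hmac
import json
import uuid
from typing import Any

# Hypothetical: the token the client registered in its TaskPushNotificationConfig.
EXPECTED_TOKEN = "client-chosen-secret"
# Assumed header name for the notification token.
TOKEN_HEADER = "X-A2A-Notification-Token"


def handle_push_notification(headers: dict[str, str], body: bytes) -> dict[str, Any]:
    """Validate a push notification and build the follow-up GetTask request."""
    # Plain dict lookup: real web frameworks expose case-insensitive headers.
    received = headers.get(TOKEN_HEADER, "")
    # Constant-time comparison, so the check does not leak the token via timing.
    if not hmac.compare_digest(received.encode(), EXPECTED_TOKEN.encode()):
        raise PermissionError("invalid notification token")
    event = json.loads(body)
    # A Task carries its own "id"; status/artifact update events carry "taskId".
    task_id = event.get("taskId") or event.get("id")
    if not isinstance(task_id, str) or not task_id:
        raise ValueError("notification does not identify a task")
    # Do not trust the payload itself: fetch the authoritative state with GetTask.
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "GetTask",
        "params": {"id": task_id},  # assumed params shape
    }


request = handle_push_notification(
    {TOKEN_HEADER: EXPECTED_TOKEN},
    b'{"taskId": "t-1", "status": {"state": "completed"}}',
)
print(request["method"], request["params"])
```

The returned dictionary is only the JSON-RPC envelope. Sending it, for example through the same client used for SendMessage, is left out to keep the sketch self-contained.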
Before push notifications can be used, the A2A server must advertise support for the feature in its Agent Card by setting capabilities.pushNotifications to true:
```python
class AgentCapabilities(_message.Message):
    __slots__ = ("streaming", "push_notifications", "extensions", "extended_agent_card")
    STREAMING_FIELD_NUMBER: _ClassVar[int]
    PUSH_NOTIFICATIONS_FIELD_NUMBER: _ClassVar[int]
    EXTENSIONS_FIELD_NUMBER: _ClassVar[int]
    EXTENDED_AGENT_CARD_FIELD_NUMBER: _ClassVar[int]
    streaming: bool
    push_notifications: bool
    extensions: _containers.RepeatedCompositeFieldContainer[AgentExtension]
    extended_agent_card: bool
    def __init__(self, streaming: _Optional[bool] = ..., push_notifications: _Optional[bool] = ..., extensions: _Optional[_Iterable[_Union[AgentExtension, _Mapping]]] = ..., extended_agent_card: _Optional[bool] = ...) -> None: ...
```
The client can then provide a TaskPushNotificationConfig:

```python
class TaskPushNotificationConfig(_message.Message):
    __slots__ = ("tenant", "id", "task_id", "url", "token", "authentication")
    TENANT_FIELD_NUMBER: _ClassVar[int]
    ID_FIELD_NUMBER: _ClassVar[int]
    TASK_ID_FIELD_NUMBER: _ClassVar[int]
    URL_FIELD_NUMBER: _ClassVar[int]
    TOKEN_FIELD_NUMBER: _ClassVar[int]
    AUTHENTICATION_FIELD_NUMBER: _ClassVar[int]
    tenant: str
    id: str
    task_id: str
    url: str
    token: str
    authentication: AuthenticationInfo
    def __init__(self, tenant: _Optional[str] = ..., id: _Optional[str] = ..., task_id: _Optional[str] = ..., url: _Optional[str] = ..., token: _Optional[str] = ..., authentication: _Optional[_Union[AuthenticationInfo, _Mapping]] = ...) -> None: ...
```
either as part of the initial request that creates the task, through the task_push_notification_config field of SendMessageConfiguration:

```python
class SendMessageConfiguration(_message.Message):
    __slots__ = ("accepted_output_modes", "task_push_notification_config", "history_length", "return_immediately")
    ACCEPTED_OUTPUT_MODES_FIELD_NUMBER: _ClassVar[int]
    TASK_PUSH_NOTIFICATION_CONFIG_FIELD_NUMBER: _ClassVar[int]
    HISTORY_LENGTH_FIELD_NUMBER: _ClassVar[int]
    RETURN_IMMEDIATELY_FIELD_NUMBER: _ClassVar[int]
    accepted_output_modes: _containers.RepeatedScalarFieldContainer[str]
    task_push_notification_config: TaskPushNotificationConfig
    history_length: int
    return_immediately: bool
    def __init__(self, accepted_output_modes: _Optional[_Iterable[str]] = ..., task_push_notification_config: _Optional[_Union[TaskPushNotificationConfig, _Mapping]] = ..., history_length: _Optional[int] = ..., return_immediately: _Optional[bool] = ...) -> None: ...
```
or later, for an existing task, by invoking the CreateTaskPushNotificationConfig JSON-RPC method:

```python
async def _handle_create_task_push_notification_config(
    self,
    request_obj: TaskPushNotificationConfig,
    context: ServerCallContext,
) -> dict[str, Any]:
    result_config = (
        await self.request_handler.on_create_task_push_notification_config(
            request_obj, context
        )
    )
    return MessageToDict(result_config, preserving_proto_field_name=False)

...

@validate_request_params
@validate(
    lambda self: self._agent_card.capabilities.push_notifications,
    error_message='Push notifications are not supported by the agent',
    error_type=PushNotificationNotSupportedError,
)
async def on_create_task_push_notification_config(  # noqa: D102
    self,
    params: TaskPushNotificationConfig,
    context: ServerCallContext,
) -> TaskPushNotificationConfig:
    if not self._push_config_store:
        raise PushNotificationNotSupportedError
    task_id = params.task_id
    task: Task | None = await self.task_store.get(task_id, context)
    if not task:
        raise TaskNotFoundError
    await self._push_config_store.set_info(
        task_id,
        params,
        context,
    )
    return params
```

A2A clients communicate with an A2A server through JSON-RPC requests sent over HTTP POST. The server exposes a JSON-RPC endpoint that routes each request to the appropriate handler based on the method field of the request payload.
```python
# Method-to-model mapping for centralized routing
# Proto types don't have model_fields, so we define the mapping explicitly
# Method names match gRPC service method names
METHOD_TO_MODEL: dict[str, type] = {
    'SendMessage': SendMessageRequest,
    'SendStreamingMessage': SendMessageRequest,  # Same proto type as SendMessage
    'GetTask': GetTaskRequest,
    'ListTasks': ListTasksRequest,
    'CancelTask': CancelTaskRequest,
    'CreateTaskPushNotificationConfig': TaskPushNotificationConfig,
    'GetTaskPushNotificationConfig': GetTaskPushNotificationConfigRequest,
    'ListTaskPushNotificationConfigs': ListTaskPushNotificationConfigsRequest,
    'DeleteTaskPushNotificationConfig': DeleteTaskPushNotificationConfigRequest,
    'SubscribeToTask': SubscribeToTaskRequest,
    'GetExtendedAgentCard': GetExtendedAgentCardRequest,
}
```

For example, when the method is SendMessage, the client is sending a message to the remote agent in non-streaming mode. Other request types also exist, such as SubscribeToTask, which allows a client to subscribe to an active A2A task and receive updates while the underlying agent processes the request, similarly to streaming message execution.
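The method-based routing can be reduced to a small JSON-RPC 2.0 dispatcher. The following sketch uses the standard JSON-RPC error codes (-32700 parse error, -32600 invalid request, -32601 method not found, -32603 internal error) and a hypothetical in-memory GetTask handler. Unlike the SDK, it does not convert params into the proto types listed in METHOD_TO_MODEL, and it omits batching, notifications, and A2A-specific error codes.

```python
import asyncio
import json
from collections.abc import Awaitable, Callable
from typing import Any

# A handler receives the decoded "params" object and returns a JSON-serializable result.
Handler = Callable[[dict[str, Any]], Awaitable[Any]]


def _error(req_id: Any, code: int, message: str) -> dict[str, Any]:
    """Build a JSON-RPC 2.0 error response object."""
    return {"jsonrpc": "2.0", "id": req_id, "error": {"code": code, "message": message}}


async def dispatch(raw: str, handlers: dict[str, Handler]) -> dict[str, Any]:
    """Route one JSON-RPC 2.0 request to the handler registered for its method."""
    try:
        request = json.loads(raw)
    except json.JSONDecodeError:
        return _error(None, -32700, "Parse error")
    if not isinstance(request, dict):
        return _error(None, -32600, "Invalid Request")
    req_id = request.get("id")
    if request.get("jsonrpc") != "2.0" or not isinstance(request.get("method"), str):
        return _error(req_id, -32600, "Invalid Request")
    handler = handlers.get(request["method"])
    if handler is None:
        return _error(req_id, -32601, "Method not found")
    try:
        result = await handler(request.get("params", {}))
    except Exception:  # keep the sketch simple: any handler failure is internal
        return _error(req_id, -32603, "Internal error")
    return {"jsonrpc": "2.0", "id": req_id, "result": result}


async def get_task(params: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical GetTask handler backed by an in-memory dict."""
    tasks = {"t-1": {"id": "t-1", "status": {"state": "working"}}}
    return tasks[params["id"]]


HANDLERS: dict[str, Handler] = {"GetTask": get_task}

request = '{"jsonrpc": "2.0", "id": 7, "method": "GetTask", "params": {"id": "t-1"}}'
print(asyncio.run(dispatch(request, HANDLERS)))
```

Keeping the routing table as data, as METHOD_TO_MODEL does, means adding a method only requires registering one more entry rather than touching the dispatch logic.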
Let us focus on the SendMessage request, which does not involve streaming: the HTTP response is not kept open as an event stream, and exactly one response is eventually returned to the client. That response can be either a Task or a Message.
```python
raw_result = await self._process_non_streaming_request(
    specific_request, call_context
)
handler_result = JSONRPC20Response(
    result=raw_result, _id=request_id
).data
```

Each A2A request is associated with an A2A task. Therefore, the first responsibility of the handler is to set up the task that will process the request.
If the client request includes a taskId, the server attempts to resolve it as an existing task.
```python
async def get_or_create(
    self,
    task_id: str,
    call_context: ServerCallContext,
    context_id: str | None = None,
    create_task_if_missing: bool = False,
) -> ActiveTask:
    """Retrieves an existing ActiveTask or creates a new one."""
    async with self._lock:
        if task_id in self._active_tasks:
            return self._active_tasks[task_id]
```

Tasks are stored in a task store and can be retrieved when needed. Active tasks are also maintained in memory through an ActiveTaskRegistry.
```python
class ActiveTaskRegistry:
    """A registry for active ActiveTask instances."""

    def __init__(
        self,
        agent_executor: AgentExecutor,
        task_store: TaskStore,
        push_sender: PushNotificationSender | None = None,
    ):
        self._agent_executor = agent_executor
        self._task_store = task_store
        self._push_sender = push_sender
        self._active_tasks: dict[str, ActiveTask] = {}
        self._lock = asyncio.Lock()
        self._cleanup_tasks: set[asyncio.Task[None]] = set()
```

If the request does not refer to a known task, a new ActiveTask instance is created together with a TaskManager bound to its task ID (and context ID); the active task is then registered and started:
```python
async def get_or_create(
    self,
    task_id: str,
    call_context: ServerCallContext,
    context_id: str | None = None,
    create_task_if_missing: bool = False,
) -> ActiveTask:
    """Retrieves an existing ActiveTask or creates a new one."""
    async with self._lock:
        if task_id in self._active_tasks:
            return self._active_tasks[task_id]
        task_manager = TaskManager(
            task_id=task_id,
            context_id=context_id,
            task_store=self._task_store,
            initial_message=None,
            context=call_context,
        )
        active_task = ActiveTask(
            agent_executor=self._agent_executor,
            task_id=task_id,
            task_manager=task_manager,
            push_sender=self._push_sender,
            on_cleanup=self._on_active_task_cleanup,
        )
        self._active_tasks[task_id] = active_task
        await active_task.start(
            call_context=call_context,
            create_task_if_missing=create_task_if_missing,
        )
        return active_task
```

The active task initializes three internal queues:
class ActiveTask:
"""Manages the lifecycle and execution of an active A2A task.
It coordinates between the agent's execution (the producer), the
persistence and state management (the TaskManager), and the event
distribution to subscribers (the consumer).
Concurrency Guarantees:
- This class is designed to be highly concurrent. It manages an internal
producer-consumer model using `asyncio.Task`s.
- `self._lock` (asyncio.Lock) ensures mutually exclusive access for critical
lifecycle state changes, such as starting the task, subscribing, and
    determining if cleanup is safe to trigger.
    - `self._is_finished` (asyncio.Event) provides a thread-safe, non-blocking way
      for external observers and internal loops to check if the ActiveTask has
      permanently ceased execution and closed its queues.
    """

    def __init__(
        self,
        agent_executor: AgentExecutor,
        task_id: str,
        task_manager: TaskManager,
        push_sender: PushNotificationSender | None = None,
        on_cleanup: Callable[[ActiveTask], None] | None = None,
    ) -> None:
        """Initializes the ActiveTask.

        Args:
            agent_executor: The executor to run the agent logic (producer).
            task_id: The unique identifier of the task being managed.
            task_manager: The manager for task state and database persistence.
            push_sender: Optional sender for out-of-band push notifications.
            on_cleanup: Optional callback triggered when the task is fully finished
                and the last subscriber has disconnected. Used to prune
                the task from the ActiveTaskRegistry.
        """
        # --- Core Dependencies ---
        self._agent_executor = agent_executor
        self._task_id = task_id
        self._event_queue_agent = EventQueueSource()
        self._event_queue_subscribers = EventQueueSource(
            create_default_sink=False
        )
        self._task_manager = task_manager
        self._push_sender = push_sender
        self._on_cleanup = on_cleanup

        # --- Synchronization Primitives ---
        # `_lock` protects structural lifecycle changes: start(), subscribe() counting,
        # and _maybe_cleanup() race conditions.
        self._lock = asyncio.Lock()
        # `_request_lock` protects parallel request processing.
        self._request_lock = asyncio.Lock()
        # _task_created is set when initial version of task is stored in DB.
        self._task_created = asyncio.Event()
        # `_is_finished` is set EXACTLY ONCE when the consumer loop exits, signifying
        # the absolute end of the task's active lifecycle.
        self._is_finished = asyncio.Event()

        # --- Lifecycle State ---
        # The background task executing the agent logic.
        self._producer_task: asyncio.Task[None] | None = None
        # The background task reading from _event_queue and updating the DB.
        self._consumer_task: asyncio.Task[None] | None = None
        # Tracks how many active SSE/gRPC streams are currently tailing this task.
        # Protected by `_lock`.
        self._reference_count = 0
        # Queue for incoming requests
        self._request_queue: AsyncQueue[tuple[RequestContext, uuid.UUID]] = (
            create_async_queue()
        )
_request_queue receives the requests to be processed by the agent, _event_queue_agent stores the events produced by the agent, and _event_queue_subscribers forwards those agent events to the registered subscribers tailing the task.
A TaskManager is then associated with the active task. It is responsible for retrieving the corresponding task, either from the persistent task store or directly from memory.
Next, two background tasks are started. The first (the producer) drains _request_queue and runs the agent through the AgentExecutor; the events it produces are pushed into _event_queue_agent.
    async def _run_producer(self) -> None:
        """Executes the agent logic.

        This method encapsulates the external `AgentExecutor.execute` call. It ensures
        that regardless of how the agent finishes (success, unhandled exception, or
        cancellation), the underlying `_event_queue` is safely closed, which signals
        the consumer to wind down.

        Concurrency Guarantee:
            Runs as a detached asyncio.Task. Safe to cancel.
        """
        logger.debug('Producer[%s]: Started', self._task_id)
        request_context = None
        try:
            while True:
                (
                    request_context,
                    request_id,
                ) = await self._request_queue.get()
                await self._request_lock.acquire()
                # TODO: Should we create task manager every time?
                self._task_manager._call_context = request_context.call_context
                request_context.current_task = (
                    await self._task_manager.get_task()
                )
                logger.debug(
                    'Producer[%s]: Executing agent task %s',
                    self._task_id,
                    request_context.current_task,
                )
                try:
                    await self._event_queue_agent.enqueue_event(
                        cast(
                            'Event',
                            _RequestStarted(request_id, request_context),
                        )
                    )
                    await self._agent_executor.execute(
                        request_context, self._event_queue_agent
                    )
                    logger.debug(
                        'Producer[%s]: Execution finished successfully',
                        self._task_id,
                    )
                    await self._event_queue_agent.enqueue_event(
                        cast('Event', _RequestCompleted(request_id))
                    )
                finally:
                    self._request_queue.task_done()
        # (the remaining exception handling of the outer try is elided in this excerpt)
The second background task (the consumer) drains _event_queue_agent, processes each event, updates the task state when needed, enriches events with the latest task object, and forwards them to subscribers.
    async def _run_consumer(self) -> None:
        """Consumes events from the agent and updates system state."""
        try:
            await EventConsumer(self).run()
        finally:
            self._is_finished.set()
            self._request_queue.shutdown(immediate=True)
            await self._event_queue_agent.close(immediate=True)
            async with self._lock:
                self._reference_count -= 1
            logger.debug('Consumer[%s]: Finishing', self._task_id)
            await self._maybe_cleanup()

    ...


class EventConsumer:
    """Consumes events from the agent and updates system state."""

    def __init__(self, active_task: ActiveTask) -> None:
        self.active_task = active_task
        self.task_mode: bool | None = None
        self.message_to_save: Message | None = None

    async def run(self) -> None:
        """Consumes events from the agent and updates system state."""
        logger.debug('Consumer[%s]: Started', self.active_task._task_id)
        try:
            while True:
                logger.debug(
                    'Consumer[%s]: Waiting for event',
                    self.active_task._task_id,
                )
                event = (
                    await self.active_task._event_queue_agent.dequeue_event()
                )
                logger.debug(
                    'Consumer[%s]: Dequeued event %s',
                    self.active_task._task_id,
                    type(event).__name__,
                )
                await self._process_event(event)
        # (the exception handling of this try is elided in this excerpt)
For example, in non-streaming mode, a task status update event may be replaced with the fully updated task before being returned. Once the producer task completes, the consumer task eventually shuts down as well.
For subscriptions, a tapped queue is created for each subscriber to receive the events published by the agent logic. The subscriber drains this tapped queue, and the events are yielded back to the request handler.
In non-streaming mode, if the event is a Message, it is the only payload returned for that interaction. If the event is a Task whose state is terminal (e.g., completed) or interrupted (e.g., input required), or if the client asked the server to return immediately, the handler stops waiting and returns that task.
    @validate_request_params
    async def on_message_send(  # noqa: D102
        self,
        params: SendMessageRequest,
        context: ServerCallContext,
    ) -> Message | Task:
        active_task, request_context = await self._setup_active_task(
            params, context
        )
        task_id = cast('str', request_context.task_id)
        result: Message | Task | None = None
        async for raw_event in active_task.subscribe(
            request=request_context,
            include_initial_task=False,
            replace_status_update_with_task=True,
        ):
            event = raw_event
            logger.debug(
                'Processing[%s] event [%s] %s',
                params.message.task_id,
                type(event).__name__,
                event,
            )
            if isinstance(event, Task) and (
                params.configuration.return_immediately
                or event.status.state
                in (TERMINAL_TASK_STATES | INTERRUPTED_TASK_STATES)
            ):
                self._validate_task_id_match(task_id, event.id)
                result = event
                # DO break here as it's "return_immediately".
                # AgentExecutor will continue to run in the background.
                break
            if isinstance(event, Message):
                result = event
                # Do NOT break here as Message is supposed to be the only
                # event in "Message-only" interaction.
                # ActiveTask consumer (see active_task.py) validates the event
                # stream and raises InvalidAgentResponseError if more events are
                # pushed after a Message.
        if result is None:
            logger.debug('Missing result for task %s', request_context.task_id)
            result = await active_task.get_task()
        if isinstance(result, Task):
            result = apply_history_length(result, params.configuration)
        logger.debug(
            'Returning result for task %s: %s',
            request_context.task_id,
            result,
        )
        return result
The JSON-RPC response is then created and sent back to the client:
handler_result = JSONRPC20Response(
    result=raw_result,
    _id=request_id,
).data
Naturally, non-streaming requests are not ideal for long-running tasks, especially when the client needs progress updates. For this reason, A2A provides a dedicated method, SendStreamingMessage, which uses Server-Sent Events (SSE). The method sends a message to the agent and yields events as they are produced, including task updates, message chunks, and artifact updates. It is similar to SendMessage, except that events are streamed incrementally instead of being returned only once at the end. On the server side, the handler simply forwards each event as it arrives:
async for event in active_task.subscribe(
    request=request_context,
    include_initial_task=False,
):
    # Do NOT break here as we rely on AgentExecutor to yield control.
    # ActiveTask consumer validates the event stream and raises
    # InvalidAgentResponseError on misbehaving agents:
    # - an event after a Message
    # - a Message after entering task mode
    # - an event after a terminal state
    if isinstance(event, Task):
        self._validate_task_id_match(task_id, event.id)
        yield apply_history_length(event, params.configuration)
    else:
        yield event
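When a message without a contextId is sent to an A2A agent (typically the first message of a conversation), the server generates one; likewise, a taskId is generated when none is provided, while ids supplied by the client must match the stored task: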
class RequestContext:
    ...
        # If the task id and context id were provided, make sure they
        # match the request. Otherwise, create them
        if self._params:
            if task_id:
                self._params.message.task_id = task_id
                if task and task.id != task_id:
                    raise InvalidParamsError(message='bad task id')
            else:
                self._check_or_generate_task_id()
            if context_id:
                self._params.message.context_id = context_id
                if task and task.context_id != context_id:
                    raise InvalidParamsError(message='bad context id')
            else:
                self._check_or_generate_context_id()
This identifier is used to logically group multiple Tasks and Messages that belong to the same conversation or execution session. It differs from the taskId, which identifies a specific unit of work. Providing a taskId in a subsequent SendMessage request indicates that the client wishes to continue an existing task that has not yet reached a terminal state. By contrast, the contextId spans the entire conversation and can be used to relate completed tasks, interrupted tasks, or even multiple concurrent tasks that belong to the same interaction. Clients can further help the agent by including referenceTaskIds in the Message object, allowing the agent to explicitly reference previous tasks as discussed earlier.
The A2A task store maintains the execution history for each task; however, the protocol deliberately does not define how this history should be projected into the agent's execution context. It is therefore the developer's responsibility to ensure that the underlying agent retrieves the relevant conversation history associated with the contextId and injects it into the prompt or execution context of the underlying foundation model. In other words, while A2A standardizes communication between agents, context management remains an implementation concern. For LLM-based agents in particular, the contextId serves as the key for reconstructing the conversational state and maintaining continuity across multiple requests. An example is shown below (from the a2a-samples repository):
    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        error = self._validate_request(context)
        if error:
            raise ServerError(error=InvalidParamsError())
        input_event = self._get_input_event(context)
        context_id = context.context_id
        task_id = context.task_id
        try:
            ctx = None
            handler = None
            # Check if we have a saved context state for this session
            print(f'Len of ctx_states: {len(self.ctx_states)}', flush=True)
            saved_ctx_state = self.ctx_states.get(context_id, None)
            if saved_ctx_state is not None:
                # Resume with existing context
                logger.info(f'Resuming session {context_id} with saved context')
                ctx = Context.from_dict(self.agent, saved_ctx_state)
                handler = self.agent.run(
                    start_event=input_event,
                    ctx=ctx,
                )
            else:
                # New session!
                logger.info(f'Starting new session {context_id}')
                handler = self.agent.run(
                    start_event=input_event,
                )
            # Emit an initial task object
            updater = TaskUpdater(event_queue, task_id, context_id)
            await updater.submit()
            async for event in handler.stream_events():
                if isinstance(event, LogEvent):
                    # Send log event as intermediate message
                    await updater.update_status(
                        TaskState.working,
                        new_agent_text_message(event.msg, context_id, task_id),
                    )
            # Wait for final response
            final_response = await handler
            if isinstance(final_response, ChatResponseEvent):
                content = final_response.response
                metadata = (
                    final_response.citations
                    if hasattr(final_response, 'citations')
                    else None
                )
                if metadata is not None:
                    # ensure metadata is a dict of str keys
                    metadata = {str(k): v for k, v in metadata.items()}
                # save the context state to resume the current session
                self.ctx_states[context_id] = handler.ctx.to_dict()
                await updater.add_artifact(
                    [Part(root=TextPart(text=content))],
                    name='llama_summary',
                    metadata=metadata,
                )
                await updater.complete()
            else:
                await updater.failed(f'Unexpected completion {final_response}')
        # (the except clause of this try is elided in this excerpt)
During execution, an agent typically produces intermediate events such as progress updates, partial outputs, artifact updates, or task status changes. These events are published through a task updater or an internal event queue. In streaming mode, the A2A client receives these events incrementally as they are produced, enabling real-time monitoring of the agent's execution. In non-streaming mode, the client receives only the final task: typically a completed task containing the final response, the generated artifacts, and the terminal (or interrupted) task state.
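A streaming client that wants the same final view a non-streaming caller receives has to fold the incremental events itself. The sketch below assumes each streamed result is a dict with exactly one of the keys `task`, `statusUpdate` or `artifactUpdate`; these names are modelled on the A2A 1.0 JSON naming seen in this post and should be checked against the specification version you target. It also simplifies artifact updates by replacing artifacts by `artifactId` instead of handling append/chunk semantics, and it ignores `message` results.

```python
from typing import Any


def fold_stream(results: list[dict[str, Any]]) -> dict[str, Any]:
    """Rebuild a task snapshot from streamed results (simplified sketch)."""
    task: dict[str, Any] = {}
    artifacts: dict[str, dict[str, Any]] = {}
    for result in results:
        if "task" in result:
            task = dict(result["task"])  # a full task replaces the snapshot
            for artifact in task.get("artifacts", []):
                artifacts[artifact["artifactId"]] = artifact
        elif "statusUpdate" in result:
            task["status"] = result["statusUpdate"]["status"]
        elif "artifactUpdate" in result:
            artifact = result["artifactUpdate"]["artifact"]
            artifacts[artifact["artifactId"]] = artifact  # no append/chunk handling
    task["artifacts"] = list(artifacts.values())
    return task
```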
A2A supports interruptible execution through the TASK_STATE_INPUT_REQUIRED task state. During the agent execution loop, if the agent determines that additional information is needed from the user, it suspends its execution by setting the task status to input_required, signaling to the client that additional data is needed.
elif require_user_input:
    await updater.update_status(
        TaskState.input_required,
        new_agent_text_message(
            item["content"],
            task.context_id,
            task.id,
        ),
        final=True,
    )
    break
At this point, the agent loop stops (i.e., the agent executor yields control), the task transitions to TASK_STATE_INPUT_REQUIRED, and the A2A client receives the request for additional input.
The client resumes the interaction by sending another SendMessage request whose Message.taskId references the existing task. The A2A server then continues processing that task from its current state rather than creating a new one: when the user's response is received, the A2A framework invokes the agent executor again (which is why the executor implementation must return when it yields control).
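On the client side, the continuation rule comes down to which identifiers the next message carries. The hypothetical helper below reuses the JSON message shape of the SendMessage request used later in this post (`role`, `parts`, `messageId`, plus `taskId`, `contextId` and `referenceTaskIds`); the state names assume the `TASK_STATE_*` spelling seen in the responses. An interrupted task is resumed through its taskId; a task that has already finished is only referenced, anticipating the terminal-task rule described next.

```python
import uuid
from typing import Any

INTERRUPTED = {"TASK_STATE_INPUT_REQUIRED", "TASK_STATE_AUTH_REQUIRED"}
TERMINAL = {
    "TASK_STATE_COMPLETED",
    "TASK_STATE_FAILED",
    "TASK_STATE_CANCELED",
    "TASK_STATE_REJECTED",
}


def follow_up_message(text: str, previous_task: dict[str, Any]) -> dict[str, Any]:
    """Build the next user message in a conversation (hypothetical helper)."""
    state = previous_task["status"]["state"]
    message: dict[str, Any] = {
        "role": "ROLE_USER",
        "parts": [{"text": text}],
        "messageId": str(uuid.uuid4()),
        "contextId": previous_task["contextId"],  # same conversation in both cases
    }
    if state in INTERRUPTED:
        message["taskId"] = previous_task["id"]  # resume the suspended task
    elif state in TERMINAL:
        message["referenceTaskIds"] = [previous_task["id"]]  # new task, linked to the old one
    else:
        # This sketch refuses to send while the previous task is still running.
        raise ValueError(f"task {previous_task['id']} is still running ({state})")
    return message
```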

Tasks in a terminal state are immutable. Once a task reaches a terminal state (completed, failed, or canceled), it cannot be modified or resumed directly. Instead, continuing the interaction creates a new task that references the same contextId and, where appropriate, the previous task through referenceTaskIds, thereby preserving the conversational context while maintaining an immutable history of task executions. As discussed earlier, the agent executor implementation is responsible for managing the conversational state associated with a given contextId. The ActiveTask.start() method below enforces this on the server side by refusing to start a task that is already in a terminal state:
    async def start(
        self,
        call_context: ServerCallContext,
        create_task_if_missing: bool = False,
    ) -> None:
        """Starts the active task background processes.

        Concurrency Guarantee:
            Uses `self._lock` to ensure the producer and consumer tasks are strictly
            singleton instances for the lifetime of this ActiveTask.
        """
        logger.debug('ActiveTask[%s]: Starting', self._task_id)
        async with self._lock:
            if self._is_finished.is_set():
                raise InvalidParamsError(
                    f'Task {self._task_id} is already completed. Cannot start it again.'
                )
            if (
                self._producer_task is not None
                and self._consumer_task is not None
            ):
                logger.debug(
                    'ActiveTask[%s]: Already started, ignoring start request',
                    self._task_id,
                )
                return
            logger.debug(
                'ActiveTask[%s]: Executing setup (call_context: %s, create_task_if_missing: %s)',
                self._task_id,
                call_context,
                create_task_if_missing,
            )
            try:
                self._task_manager._call_context = call_context
                task = await self._task_manager.get_task()
                logger.debug('TASK (start): %s', task)
                if task:
                    self._task_created.set()
                    if task.status.state in TERMINAL_TASK_STATES:
                        raise InvalidParamsError(
                            message=f'Task {task.id} is in terminal state: {task.status.state}'
                        )
                elif not create_task_if_missing:
                    raise TaskNotFoundError
            # (the rest of the setup and its exception handling are elided in this excerpt)
The A2A initiative addresses the lack of uniform standards for agent-to-agent communication by defining a structured communication flow between an A2A client and an A2A server, providing a robust foundation for building sophisticated agentic systems. However, special attention must be paid to the security risks introduced when using the protocol.
Security risks include Agent Card Spoofing [3], where an A2A client is tricked into trusting a forged Agent Card and then sends sensitive tasks to a rogue A2A server; A2A Task Replay [3], where an attacker captures a valid A2A task and replays it to trigger repeated execution; and injection attacks [3], where malformed or malicious messages manipulate the receiving agent. Network-level attacks may also enable A2A Server Impersonation [3], and further threats include supply chain attacks through compromised A2A dependencies, insecure storage or transmission of authentication tokens, and other identity-related attacks [3].
Agent Cards deserve particular attention because they describe an agent's capabilities and are typically incorporated into the orchestrator's system prompt during planning. Consequently, a poisoned Agent Card can become a vector for prompt injection, influencing the planner to make incorrect delegation decisions or to execute unintended actions or tools (e.g., to achieve remote code execution (RCE) or data exfiltration).
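One way to limit what a poisoned Agent Card can inject is to treat every card field as untrusted data before it reaches the planner. The sketch below is one conservative option; the function names, length limits and phrase pattern are assumptions. It collapses whitespace (so a description cannot fake prompt structure with newlines), drops control and format characters and angle brackets (so it cannot close the delimiter), truncates long fields, rejects a few instruction-like phrases, and wraps the JSON summary in explicit delimiters. Pattern matching like this reduces prompt-injection risk but does not eliminate it.

```python
import json
import re
import unicodedata
from typing import Any

MAX_FIELD_LEN = 300  # assumed limit for text shown to the planner
SUSPICIOUS = re.compile(
    r"ignore (all |any )?(previous|prior) instructions|system prompt", re.IGNORECASE
)


def clean_text(value: str, limit: int = MAX_FIELD_LEN) -> str:
    value = re.sub(r"\s+", " ", value)  # collapse newlines and tabs
    value = value.replace("<", "").replace(">", "")  # cannot forge the delimiters
    value = "".join(ch for ch in value if unicodedata.category(ch)[0] != "C")
    return value.strip()[:limit]


def card_for_prompt(card: dict[str, Any]) -> str:
    """Render a sanitized, delimited Agent Card summary for a planner prompt."""
    summary = {
        "name": clean_text(str(card.get("name", "")), 80),
        "description": clean_text(str(card.get("description", ""))),
        "skills": [
            {
                "id": clean_text(str(skill.get("id", "")), 64),
                "description": clean_text(str(skill.get("description", ""))),
            }
            for skill in card.get("skills", [])
        ],
    }
    text = json.dumps(summary, ensure_ascii=True)
    if SUSPICIOUS.search(text):
        raise ValueError(f"agent card {summary['name']!r} contains instruction-like text")
    return f"<agent_card>\n{text}\n</agent_card>"
```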
Mitigations include secure Agent Card resolution through trusted registries, Agent Card sanitization before exposing descriptions to the planning agent, the use of short-lived OAuth 2.0 access tokens with granular scopes for sensitive tools, timestamp and nonce verification to prevent task replay, and designing tasks to be idempotent whenever possible.
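Timestamp and nonce verification can be sketched as a small guard that rejects requests outside an allowed clock-skew window and any nonce already seen within that window. Where the timestamp and nonce come from (for example, signed request metadata or headers) is an assumption left to the deployment. This in-memory version only suits a single process; behind a load balancer, the seen-nonce set would need a shared store.

```python
import time
from collections.abc import Callable


class ReplayGuard:
    """Reject stale requests and reused nonces (single-process, in-memory sketch)."""

    def __init__(
        self, window_seconds: float = 300.0, clock: Callable[[], float] = time.time
    ) -> None:
        self._window = window_seconds
        self._clock = clock
        self._seen: dict[str, float] = {}  # nonce -> timestamp of the accepted request

    def check(self, nonce: str, timestamp: float) -> bool:
        now = self._clock()
        # Forget nonces whose timestamp left the window: a replay of them is stale anyway.
        self._seen = {n: ts for n, ts in self._seen.items() if now - ts <= self._window}
        if abs(now - timestamp) > self._window:
            return False  # too old, or too far in the future
        if nonce in self._seen:
            return False  # replay inside the window
        self._seen[nonce] = timestamp
        return True
```

Pruning by the request's own timestamp keeps memory bounded without opening a gap: once a nonce is forgotten, any replay of it carries the same old timestamp and fails the window check.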
Implementations should also enforce strict schema validation, the principle of least privilege, mTLS, strong JWT validation, credential verification for every request, and secure token storage.
In addition, all uploaded files should be scanned for malicious content before being processed by an agent. Finally, when using A2A notifications or webhooks, webhook URLs should be validated to prevent Server-Side Request Forgery (SSRF) attacks.
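A minimal standard-library sketch of webhook URL validation follows. It requires HTTPS, resolves the host, and rejects any address that is private, loopback, link-local (which covers the cloud metadata address 169.254.169.254), multicast, reserved or unspecified. The function names are hypothetical and the resolver is injectable so the check can be tested without DNS. Checking only at registration time does not stop DNS rebinding, so the address should be re-validated (or pinned) when the notification is actually sent.

```python
import ipaddress
import socket
from collections.abc import Callable
from urllib.parse import urlsplit


def default_resolver(host: str) -> list[str]:
    return [info[4][0] for info in socket.getaddrinfo(host, None)]


def is_safe_webhook_url(
    url: str, resolve: Callable[[str], list[str]] = default_resolver
) -> bool:
    parts = urlsplit(url)
    if parts.scheme != "https" or not parts.hostname:
        return False
    try:
        addresses = [ipaddress.ip_address(parts.hostname)]  # host is an IP literal
    except ValueError:
        try:
            addresses = [ipaddress.ip_address(a) for a in resolve(parts.hostname)]
        except (OSError, ValueError):
            return False  # unresolvable or malformed hosts are rejected
    if not addresses:
        return False
    # Every resolved address must be public; one internal address is enough to refuse.
    return all(
        not (a.is_private or a.is_loopback or a.is_link_local
             or a.is_multicast or a.is_reserved or a.is_unspecified)
        for a in addresses
    )
```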
Let’s now see how we can deploy two A2A servers, each hosting a different agent: one for hotel booking and one for flight booking. For simplicity, the responses are hardcoded, but you can imagine replacing them with more sophisticated agents built with MCP servers, Skills, and multi-agent patterns (handoff, agent as a tool, ...).
First, we define the plumbing required to expose the Agent Card and process JSON-RPC requests. Security interceptors are omitted for simplicity.
from collections.abc import Callable

import uvicorn
from starlette.applications import Starlette

# The A2A SDK imports (AgentExecutor, RequestContext, EventQueue, TaskUpdater,
# TaskState, AgentCard, AgentSkill, AgentCapabilities, AgentInterface,
# DefaultRequestHandler, InMemoryTaskStore, create_agent_card_routes,
# create_jsonrpc_routes and the message/task helpers) are omitted here;
# their module paths depend on the a2a-sdk version in use.


class BookingAgentExecutor(AgentExecutor):
    def __init__(self, responder: Callable[[str], str]) -> None:
        self._responder = responder

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        # Continue the current task, or create a new one from the user message.
        task = context.current_task or new_task_from_user_message(context.message)
        if context.current_task is None:
            # Publish the newly created task so that the server stores it.
            await event_queue.enqueue_event(task)
        updater = TaskUpdater(
            event_queue=event_queue,
            task_id=task.id,
            context_id=task.context_id,
        )
        await updater.update_status(
            state=TaskState.TASK_STATE_WORKING,
            message=new_text_message("Processing booking request..."),
        )
        user_text = get_message_text(context.message) or ""
        result = self._responder(user_text)
        await updater.add_artifact(
            parts=[new_text_part(text=result, media_type="text/plain")]
        )
        await updater.update_status(
            state=TaskState.TASK_STATE_COMPLETED,
            message=new_text_message("Booking request completed."),
        )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        raise NotImplementedError("Cancellation is not implemented for this demo.")


def build_agent_card(
    *,
    name: str,
    description: str,
    url: str,
    skill_id: str,
    skill_name: str,
    skill_description: str,
    examples: list[str],
) -> AgentCard:
    skill = AgentSkill(
        id=skill_id,
        name=skill_name,
        description=skill_description,
        input_modes=["text/plain"],
        output_modes=["text/plain"],
        tags=["travel", "booking", skill_id],
        examples=examples,
    )
    return AgentCard(
        name=name,
        description=description,
        version="0.1.0",
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        capabilities=AgentCapabilities(
            streaming=False,
            extended_agent_card=False,
        ),
        supported_interfaces=[
            AgentInterface(
                protocol_binding="JSONRPC",
                url=url,
                protocol_version="1.0",
            )
        ],
        skills=[skill],
    )


def run_a2a_server(
    card: AgentCard,
    executor: AgentExecutor,
    host: str,
    port: int,
) -> None:
    handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore(),
        agent_card=card,
    )
    routes = []
    # Serve the Agent Card at its well-known path and JSON-RPC at "/".
    routes.extend(create_agent_card_routes(card))
    routes.extend(create_jsonrpc_routes(handler, "/"))
    app = Starlette(routes=routes)
    uvicorn.run(app, host=host, port=port)
Now, here is our flight agent:
import argparse

# build_agent_card, run_a2a_server and BookingAgentExecutor are the helpers defined above.


def flight_response(user_text: str) -> str:
    # Hardcoded demo response; user_text is not parsed.
    return (
        "FLIGHT_BOOKING_CONFIRMED\n"
        "Booking reference: FL-A2A-0427\n"
    )


def main() -> None:
    parser = argparse.ArgumentParser(description="Run the Flight Booking A2A server.")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=10001)
    args = parser.parse_args()
    url = f"http://{args.host}:{args.port}"
    card = build_agent_card(
        name="Flight Booking Agent",
        description="Books round-trip flights for a requested travel period.",
        url=url,
        skill_id="book_flight",
        skill_name="Book flight",
        skill_description=(
            "Given a user request containing a travel period, "
            "return a flight booking confirmation."
        ),
        examples=["Book me a flight from 2026-08-10 to 2026-08-15"],
    )
    run_a2a_server(
        card,
        BookingAgentExecutor(flight_response),
        args.host,
        args.port,
    )


if __name__ == "__main__":
    main()
Followed by our hotel agent:
import argparse

# build_agent_card, run_a2a_server and BookingAgentExecutor are the helpers defined above.


def hotel_response(user_text: str) -> str:
    # Hardcoded demo response; user_text is not parsed.
    return (
        "HOTEL_BOOKING_CONFIRMED\n"
        "Booking reference: HT-A2A-3819\n"
    )


def main() -> None:
    parser = argparse.ArgumentParser(description="Run the Hotel Booking A2A server.")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=10002)
    args = parser.parse_args()
    url = f"http://{args.host}:{args.port}"
    card = build_agent_card(
        name="Hotel Booking Agent",
        description="Books hotels for a requested stay period.",
        url=url,
        skill_id="book_hotel",
        skill_name="Book hotel",
        skill_description=(
            "Given a user request containing a stay period, "
            "return a hotel booking confirmation."
        ),
        examples=["Book me a hotel from 2026-08-10 to 2026-08-15"],
    )
    run_a2a_server(
        card,
        BookingAgentExecutor(hotel_response),
        args.host,
        args.port,
    )


if __name__ == "__main__":
    main()
Once the two servers are deployed, we can define the following Bash helper, send_a2a_jsonrpc.sh, to send a non-streaming A2A JSON-RPC request:
#!/usr/bin/env bash
# Usage: ./send_a2a_jsonrpc.sh [URL] [TEXT]
URL="${1:-http://127.0.0.1:10001}"
TEXT="${2:-Book me for 2026-08-24 to 2026-08-29}"
# Note: $TEXT is inserted verbatim into the JSON body, so it must not contain double quotes.
curl -sS -X POST "$URL/" \
  -H 'Content-Type: application/json' \
  -H 'A2A-Version: 1.0' \
  -d @- <<JSON | python -m json.tool
{
  "jsonrpc": "2.0",
  "id": "id-1",
  "method": "SendMessage",
  "params": {
    "message": {
      "role": "ROLE_USER",
      "parts": [
        {
          "text": "$TEXT"
        }
      ],
      "messageId": "message-1"
    }
  }
}
JSON
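The same request can be sent from Python with only the standard library, which also avoids the quoting limitation of interpolating `$TEXT` into the heredoc. This sketch mirrors the script's payload and headers; the function names are hypothetical, and generating a fresh `messageId` per call is a choice of the sketch (the script always sends `message-1`).

```python
import json
import urllib.request
import uuid
from typing import Any


def build_send_message(url: str, text: str, request_id: str = "id-1") -> urllib.request.Request:
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "SendMessage",
        "params": {
            "message": {
                "role": "ROLE_USER",
                "parts": [{"text": text}],  # json.dumps escapes quotes and newlines
                "messageId": str(uuid.uuid4()),  # unique per message
            }
        },
    }
    return urllib.request.Request(
        url.rstrip("/") + "/",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "A2A-Version": "1.0"},
        method="POST",
    )


def send_message(url: str, text: str, timeout: float = 30.0) -> dict[str, Any]:
    with urllib.request.urlopen(build_send_message(url, text), timeout=timeout) as resp:
        return json.load(resp)
```

With the flight server running, `send_message("http://127.0.0.1:10001", "Book me a flight from 2026-08-10 to 2026-08-15")` returns the decoded JSON-RPC response as a dict.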
We can also test Agent Card discovery with:
curl -s http://127.0.0.1:10001/.well-known/agent-card.json | python -m json.tool
{
    "name": "Flight Booking Agent",
    "description": "Books round-trip flights for a requested travel period.",
    "supportedInterfaces": [
        {
            "url": "http://127.0.0.1:10001",
            "protocolBinding": "JSONRPC",
            "protocolVersion": "1.0"
        }
    ],
    "version": "0.1.0",
    "capabilities": {
        "streaming": false,
        "extendedAgentCard": false
    },
    "defaultInputModes": [
        "text/plain"
    ],
    "defaultOutputModes": [
        "text/plain"
    ],
    "skills": [
        {
            "id": "book_flight",
            "name": "Book flight",
            "description": "Given a user request containing a travel period, return a flight booking confirmation.",
            "tags": [
                "travel",
                "booking",
                "book_flight"
            ],
            "examples": [
                "Book me a flight from 2026-08-10 to 2026-08-15"
            ],
            "inputModes": [
                "text/plain"
            ],
            "outputModes": [
                "text/plain"
            ]
        }
    ]
}
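A client typically fetches this card once and selects an interface whose protocol binding and version it supports. The sketch below does that with the standard library; the function names are hypothetical, while the field names (`supportedInterfaces`, `protocolBinding`, `protocolVersion`) are taken from the card printed above.

```python
import json
import urllib.request
from typing import Any

WELL_KNOWN_PATH = "/.well-known/agent-card.json"


def fetch_agent_card(base_url: str, timeout: float = 10.0) -> dict[str, Any]:
    with urllib.request.urlopen(base_url.rstrip("/") + WELL_KNOWN_PATH, timeout=timeout) as resp:
        return json.load(resp)


def select_interface(card: dict[str, Any], binding: str = "JSONRPC", version: str = "1.0") -> str:
    """Return the URL of the first interface matching the binding and protocol version."""
    for interface in card.get("supportedInterfaces", []):
        if interface.get("protocolBinding") == binding and interface.get("protocolVersion") == version:
            return interface["url"]
    raise LookupError(f"{card.get('name', 'agent')} exposes no {binding} {version} interface")
```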
Next, we send a message to the flight agent:
./send_a2a_jsonrpc.sh http://127.0.0.1:10001 \
  "Book me a flight from 2026-08-10 to 2026-08-15"
This returns a completed A2A task containing the final artifact, along with the message history produced by the executor implementation shown earlier:
{
    "result": {
        "task": {
            "id": "0bcddc7b-3687-4494-9232-d8f1c17b7af9",
            "contextId": "4ac3986c-6e6d-40dd-923b-ecbc899145ff",
            "status": {
                "state": "TASK_STATE_COMPLETED",
                "message": {
                    "messageId": "a751238a-37c5-49ff-8342-686273c916dd",
                    "role": "ROLE_AGENT",
                    "parts": [
                        {
                            "text": "Booking request completed."
                        }
                    ]
                },
                "timestamp": "2026-07-05T10:36:26.927722Z"
            },
            "artifacts": [
                {
                    "artifactId": "20343763-c6d1-43c9-81d4-012924dd982a",
                    "parts": [
                        {
                            "text": "FLIGHT_BOOKING_CONFIRMED\nBooking reference: FL-A2A-0427\n",
                            "mediaType": "text/plain"
                        }
                    ]
                }
            ],
            "history": [
                {
                    "messageId": "message-1",
                    "contextId": "4ac3986c-6e6d-40dd-923b-ecbc899145ff",
                    "taskId": "0bcddc7b-3687-4494-9232-d8f1c17b7af9",
                    "role": "ROLE_USER",
                    "parts": [
                        {
                            "text": "Book me a flight from 2026-08-10 to 2026-08-15"
                        }
                    ]
                },
                {
                    "messageId": "13895840-5dd7-4072-9a47-b9af324988c2",
                    "role": "ROLE_AGENT",
                    "parts": [
                        {
                            "text": "Processing booking request..."
                        }
                    ]
                }
            ]
        }
    },
    "id": "id-1",
    "jsonrpc": "2.0"
}
At this point, we have a completed A2A task with artifacts and intermediate updates.
To integrate these agents into a multi-agent ecosystem, a routing agent can first fetch and list the available Agent Cards (e.g., from a privately maintained registry in your organization), append their capabilities to its system prompt, and then expose the remote agents in one of two ways: either as a send_message tool that takes the target agent name and the message as input, or as remote A2A sub-agents, as in https://adk.dev/a2a/quickstart-consuming/ (many other patterns are possible).
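The first option, a single send_message tool, can be sketched independently of any agent framework. In the hypothetical sketch below, the registry is a mapping from agent name to Agent Card JSON, capabilities are rendered into a system-prompt section, and the tool looks up the target agent and delegates to an injectable transport (for example, an HTTP client or an A2A SDK client). All names are assumptions, and in practice the card text should be sanitized before it reaches the prompt, as discussed in the security section.

```python
from collections.abc import Callable
from typing import Any

# Hypothetical transport: (agent URL, text) -> decoded JSON-RPC response.
Transport = Callable[[str, str], dict[str, Any]]


class RemoteAgentRouter:
    """Expose registered A2A agents to a planner through one send_message tool."""

    def __init__(self, cards: dict[str, dict[str, Any]], transport: Transport) -> None:
        self._cards = cards  # agent name -> Agent Card (JSON), e.g. from a trusted registry
        self._transport = transport

    def system_prompt_section(self) -> str:
        """Describe each remote agent and its skill ids for the planner prompt."""
        lines = ["Remote agents available through send_message(agent, text):"]
        for name, card in sorted(self._cards.items()):
            skills = ", ".join(skill["id"] for skill in card.get("skills", []))
            lines.append(f"- {name}: {card.get('description', '')} (skills: {skills})")
        return "\n".join(lines)

    def send_message(self, agent: str, text: str) -> dict[str, Any]:
        """Tool body: route the text to the named agent's first advertised interface."""
        card = self._cards.get(agent)
        if card is None:
            raise KeyError(f"unknown agent {agent!r}")
        url = card["supportedInterfaces"][0]["url"]  # simplification: no binding negotiation
        return self._transport(url, text)
```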
@a2a_experimental
class RemoteA2aAgent(BaseAgent):
    """Agent that communicates with a remote A2A agent via A2A client.

    This agent supports multiple ways to specify the remote agent:
    1. Direct AgentCard object
    2. URL to agent card JSON
    3. File path to agent card JSON

    The agent handles:
    - Agent card resolution and validation
    - HTTP client management with proper resource cleanup
    - A2A message conversion and error handling
    - Session state management across requests
    """
source: https://github.com/google/adk-python/blob/main/src/google/adk/agents/remote_a2a_agent.py
As shown below, Google ADK uses the A2A client from the official Python SDK to send messages to the remote A2A server; by default, the current implementation constructs this client in non-streaming mode.
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        """Core implementation for async agent execution."""
        try:
            await self._ensure_resolved()
        except Exception as e:
            yield Event(
                author=self.name,
                error_message=f"Failed to initialize remote A2A agent: {e}",
                invocation_id=ctx.invocation_id,
                branch=ctx.branch,
            )
            return

        # Create A2A request for function response or regular message
        a2a_request = self._create_a2a_request_for_user_function_response(ctx)
        if not a2a_request:
            message_parts, context_id = self._construct_message_parts_from_session(
                ctx
            )
            if not message_parts:
                logger.warning(
                    "No parts to send to remote A2A agent. Emitting empty event."
                )
                yield Event(
                    author=self.name,
                    content=genai_types.Content(),
                    invocation_id=ctx.invocation_id,
                    branch=ctx.branch,
                )
                return
            a2a_request = A2AMessage(
                message_id=platform_uuid.new_uuid(),
                parts=message_parts,
                role="user",
                context_id=context_id,
            )
        logger.debug(build_a2a_request_log(a2a_request))
        try:
            a2a_request, parameters = await execute_before_request_interceptors(
                self._config.request_interceptors, ctx, a2a_request
            )
            if isinstance(a2a_request, Event):
                yield a2a_request
                return
            # Backward compatibility
            if self._a2a_request_meta_provider:
                parameters.request_metadata = self._a2a_request_meta_provider(
                    ctx, a2a_request
                )
            # TODO: Add support for requested_extension and
            # message_send_configuration once they are supported by the A2A client.
            async for a2a_response in self._a2a_client.send_message(
                request=a2a_request,
                request_metadata=parameters.request_metadata,
                context=parameters.client_call_context,
            ):
                logger.debug(build_a2a_response_log(a2a_response))
                metadata = None
                if isinstance(a2a_response, tuple):
                    task = a2a_response[0]
                    if task:
                        metadata = task.metadata
                else:
                    metadata = a2a_response.metadata
                if metadata and metadata.get(_NEW_A2A_ADK_INTEGRATION_EXTENSION):
                    event = await self._handle_a2a_response_v2(a2a_response, ctx)
                else:
                    event = await self._handle_a2a_response(a2a_response, ctx)
                if not event:
                    continue
                event = await execute_after_request_interceptors(
                    self._config.request_interceptors, ctx, a2a_response, event
                )
                if not event:
                    continue
                # Add metadata about the request and response
                event.custom_metadata = event.custom_metadata or {}
                event.custom_metadata[A2A_METADATA_PREFIX + "request"] = (
                    a2a_request.model_dump(exclude_none=True, by_alias=True)
                )
                # If the response is a ClientEvent, record the task state; otherwise,
                # record the message object.
                if isinstance(a2a_response, tuple):
                    event.custom_metadata[A2A_METADATA_PREFIX + "response"] = (
                        a2a_response[0].model_dump(exclude_none=True, by_alias=True)
                    )
                else:
                    event.custom_metadata[A2A_METADATA_PREFIX + "response"] = (
                        a2a_response.model_dump(exclude_none=True, by_alias=True)
                    )
                yield event
        # (the except clauses of this try are elided in this excerpt)
The A2A protocol is a peer-to-peer framework that defines a standard for how agents discover and communicate with one another. An agentic application discovers available remote agents by retrieving their Agent Cards and then uses its underlying model or orchestration logic to determine which A2A agent should handle a given request. Each A2A agent is hosted behind an A2A Server, which routes incoming A2A messages to an AgentExecutor responsible for executing the request (e.g., running an agent, whether or not it is built with a particular framework) and, in streaming mode, streaming results back to the client over an established SSE stream. By connecting trusted remote A2A agents to your agentic application, you can build systems that combine local agents with remotely hosted A2A agents. Likewise, your own application can expose an A2A Server, allowing other agentic applications to interact with its agents. Over time, these interconnected applications can evolve into sophisticated multi-agent systems (MAS) or agent meshes. Note that each local agent may implement a different orchestration or multi-agent pattern (swarm, agent as a tool, ...) independently of the A2A protocol.
[1] https://github.com/a2aproject/A2A
[2] https://modelcontextprotocol.io