
Dissecting the A2A Protocol: Foundations for Interoperable Multi-Agent Systems (MAS)

The goal of this post is to provide an in-depth look at how heterogeneous agents can discover, authenticate, and communicate with one another in a standardized way. It does so by examining Google's Agent-to-Agent (A2A) protocol [1], covering theory, practical deployment, and security.

The post presents the foundations of the A2A protocol, one of the most promising standards for agent communication. The protocol provides the plumbing for building multi-agent ecosystems. It addresses the interoperability problem by enabling agents built with different stacks and frameworks to communicate seamlessly.

Anthropic's Model Context Protocol (MCP) [2] is built around a similar idea. It standardizes how tools are delivered to agents (more precisely, to the foundation models or LLMs behind them) so that they can interact with the external environment. In practice, this means initiating actions and going beyond text generation, while avoiding the M × N integration bottleneck. In other words, MCP standardizes vertical interaction (agent-to-environment), whereas A2A standardizes horizontal interaction (agent-to-agent communication). A2A and MCP are therefore complementary rather than competing: an A2A agent may, for example, use MCP internally.

Both A2A and MCP provide significant benefits, including interoperability, extensibility, and scalability. However, integrating A2A is more complex than integrating MCP. A solid understanding of its core concepts is therefore critical for effective integration, secure adoption, smart coordination, and the development of reliable multi-agent systems (MAS). In this post, we take a deep dive into its building blocks so that you can confidently understand and leverage the A2A protocol.

A2A Protocol

The Agent-to-Agent (A2A) protocol, launched by Google in April 2025, is built on top of JSON-RPC. It defines a standard for agent-to-agent communication, which is paramount for building scalable and secure multi-agent systems (MAS). MCP standardizes tool discovery and invocation; in a similar way, A2A defines a standard for how agents communicate. Agents discover each other's capabilities via Agent Cards (served from /.well-known/agent-card.json, discussed later), delegate tasks, and exchange messages so that they can collaborate on a task.

The core concepts of the A2A protocol include Agent Cards, Messages, Tasks, and Artifacts, which will be covered in a moment.

Agent Discovery (Agent Cards)

An Agent Card is declarative metadata that describes an agent's identity, capabilities, interfaces and required authentication schemes.

class AgentCard(_message.Message):
    __slots__ = ("name", "description", "supported_interfaces", "provider", "version", "documentation_url", "capabilities", "security_schemes", "security_requirements", "default_input_modes", "default_output_modes", "skills", "signatures", "icon_url")
    ...

# The sample below uses the earlier pydantic-based SDK types, where `url` and
# `security` correspond to `supported_interfaces` and `security_requirements`
# in the protobuf-generated class above. `host`, `port`, `skill`,
# `OAUTH_SCHEME_NAME` and `oauth_scheme` are defined elsewhere in the sample.
agent_card = AgentCard(
    name='Calendar Agent',
    description="An agent that can manage a user's calendar",
    url=f'http://{host}:{port}/',
    version='1.0.0',
    default_input_modes=['text'],
    default_output_modes=['text'],
    capabilities=AgentCapabilities(streaming=True),
    skills=[skill],
    security_schemes={OAUTH_SCHEME_NAME: SecurityScheme(root=oauth_scheme)},
    # Declare that this scheme is required to use the agent's skills
    security=[
        {OAUTH_SCHEME_NAME: ['https://www.googleapis.com/auth/calendar']}
    ],
)

Example from https://github.com/a2aproject/a2a-samples

Agent Cards enable autonomous agents to discover the remote agents best suited for the task at hand.

In A2A, every agent exposes an Agent Card describing its identity, capabilities, supported skills, and other metadata. The A2A server serves this description through the Agent Card endpoint. A client sends a GET request to http://<a2a-base-url>/.well-known/agent-card.json and receives a structured JSON document describing the agent. The JSON response looks as follows:

{
  "name": "Hello World Agent",
  "description": "Just a hello world agent",
  "supportedInterfaces": [
    {
      "url": "http://127.0.0.1:9999",
      "protocolBinding": "JSONRPC"
    }
  ],
  "version": "0.0.1",
  "capabilities": {
    "streaming": true,
    "extendedAgentCard": true
  },
  "defaultInputModes": [
    "text/plain"
  ],
  "defaultOutputModes": [
    "text/plain"
  ],
  "skills": [
    {
      "id": "echo_bot",
      "name": "Echo Bot",
      "description": "An example agent that acknowledges client request and responds with a \"Hello World\" message.",
      "tags": [
        "a2a",
        "echo-example"
      ],
      "examples": [
        "hi",
        "how are you"
      ],
      "inputModes": [
        "text/plain"
      ],
      "outputModes": [
        "text/plain"
      ]
    }
  ],
  "preferredTransport": "JSONRPC",
  "protocolVersion": "0.3",
  "supportsAuthenticatedExtendedCard": true,
  "url": "http://127.0.0.1:9999"
}

Using the Agent Cards of remote agents, orchestrating, routing, or planning agents can dynamically discover available capabilities and delegate subtasks to the most appropriate subagent. For example, an orchestrator may first retrieve the Agent Cards of the available A2A agents. It then uses their names, descriptions, skills, and capabilities to determine which agent is best suited for a given task or query. In LLM-based orchestrators, this information is typically incorporated into the model's context, usually the system prompt. The language model can then decide when delegation is appropriate and to which agent. The ability to communicate with a remote agent can be exposed as a tool or subagent. Each remote connection is backed by an A2AClient responsible for communicating with the target A2A agent. Once the remote agent returns a response, the orchestrator injects the result back into the conversation. The language model then continues reasoning as if it had invoked any other tool. Many other orchestration patterns are possible.

A2A Server

The A2A server exposes an HTTPS endpoint that implements the A2A methods defined by the protocol over JSON-RPC 2.0, supporting both streaming and non-streaming interactions. It receives incoming requests from A2A clients and manages the A2A task lifecycle (task creation, execution, state transitions, and subscriptions). It then returns responses through the A2A protocol. The server-side request handler coordinates task execution with the underlying agent and maintains the task state throughout its lifecycle.

In the non-streaming interaction mode, a client invokes the SendMessage method using a standard JSON-RPC request.

    async def send_message(
        self,
        request: SendMessageRequest,
        *,
        context: ClientCallContext | None = None,
    ) -> SendMessageResponse:
        """Sends a non-streaming message request to the agent."""
        rpc_request = JSONRPC20Request(
            method='SendMessage',
            params=json_format.MessageToDict(request),
            _id=str(uuid4()),
        )

The server returns either a Message or a Task:

    @abstractmethod
    async def on_message_send(
        self,
        params: SendMessageRequest,
        context: ServerCallContext,
    ) -> Task | Message:

A Task includes fields such as id, contextId, status, artifacts, history, and metadata,

class Task(_message.Message):
    __slots__ = ("id", "context_id", "status", "artifacts", "history", "metadata")

while a Message includes fields such as messageId, contextId, taskId, role, parts, metadata, extensions, and referenceTaskIds.

class Message(_message.Message):
    __slots__ = ("message_id", "context_id", "task_id", "role", "parts", "metadata", "extensions", "reference_task_ids")

Message and Task objects are discussed in more detail later.

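Specifically, in non-streaming mode, the task is returned once it transitions to a terminal or interrupted state. It is returned right away instead if the client set return_immediately in the request configuration.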

    @validate_request_params
    async def on_message_send(  # noqa: D102
        self,
        params: SendMessageRequest,
        context: ServerCallContext,
    ) -> Message | Task:
        active_task, request_context = await self._setup_active_task(
            params, context
        )
        task_id = cast('str', request_context.task_id)

        result: Message | Task | None = None

        async for raw_event in active_task.subscribe(
            request=request_context,
            include_initial_task=False,
            replace_status_update_with_task=True,
        ):
            event = raw_event
            logger.debug(
                'Processing[%s] event [%s] %s',
                params.message.task_id,
                type(event).__name__,
                event,
            )
            if isinstance(event, Task) and (
                params.configuration.return_immediately
                or event.status.state
                in (TERMINAL_TASK_STATES | INTERRUPTED_TASK_STATES)
            ):
                self._validate_task_id_match(task_id, event.id)
                result = event
                # DO break here as it's "return_immediately".
                # AgentExecutor will continue to run in the background.
                break

            if isinstance(event, Message):
                result = event
                # Do NOT break here as Message is supposed to be the only
                # event in "Message-only" interaction.
                # ActiveTask consumer (see active_task.py) validates the event
                # stream and raises InvalidAgentResponseError if more events are
                # pushed after a Message.

In the streaming interaction mode, the client invokes the SendStreamingMessage method through a JSON-RPC request.

    async def send_message_streaming(
        self,
        request: SendMessageRequest,
        *,
        context: ClientCallContext | None = None,
    ) -> AsyncGenerator[StreamResponse]:
        """Sends a streaming message request to the agent and yields responses as they arrive."""
        rpc_request = JSONRPC20Request(
            method='SendStreamingMessage',
            params=json_format.MessageToDict(request),
            _id=str(uuid4()),
        )
        async for event in self._send_stream_request(
            dict(rpc_request.data),
            context,
        ):
            yield event

Rather than returning a single response, the server opens a Server-Sent Events (SSE) stream and sends incremental updates as the task progresses. On the client side, as shown above, these updates surface as an AsyncGenerator that yields events as they arrive.

The updates are emitted as A2A events, such as TaskStatusUpdateEvent, which reports task state transitions, and TaskArtifactUpdateEvent, which delivers newly generated artifacts as they become available. The stream remains open until the task reaches a terminal state.

    async def on_message_send_stream(  # noqa: D102
        self,
        params: SendMessageRequest,
        context: ServerCallContext,
    ) -> AsyncGenerator[Event, None]:
        active_task, request_context = await self._setup_active_task(
            params, context
        )

        task_id = cast('str', request_context.task_id)

        async for event in active_task.subscribe(
            request=request_context,
            include_initial_task=False,
        ):
            # Do NOT break here as we rely on AgentExecutor to yield control.
            # ActiveTask consumer (see active_task.py) validates the event
            # stream and raises InvalidAgentResponseError on misbehaving agents:
            #   - an event after a Message
            #   - Message after entering task mode
            #   - an event after a terminal state
            if isinstance(event, Task):
                self._validate_task_id_match(task_id, event.id)
                yield apply_history_length(event, params.configuration)
            else:
                yield event
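On the wire, each streamed update travels as an SSE event. Its data field holds a JSON-RPC response carrying the same id as the original request. The stdlib-only sketch below frames and parses such events and stops at the first terminal state. The result shape (a statusUpdate object with a nested status.state) is an assumption modeled on the TaskStatusUpdateEvent described above. The terminal set is assumed as well. Check the specification for the exact JSON field names.

```python
import json

# Assumed terminal states.
TERMINAL = {"TASK_STATE_COMPLETED", "TASK_STATE_FAILED", "TASK_STATE_CANCELED", "TASK_STATE_REJECTED"}


def format_sse_event(payload: dict) -> str:
    """Frame one JSON-RPC response as an SSE event: a `data:` line plus a blank line."""
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse_events(raw: str) -> list[dict]:
    """Parse SSE events (assuming \\n line endings), joining multi-line `data:` fields."""
    events = []
    for block in raw.split("\n\n"):
        data_lines = []
        for line in block.splitlines():
            if line.startswith("data:"):
                value = line[len("data:"):]
                # The SSE format strips a single leading space after the colon.
                data_lines.append(value[1:] if value.startswith(" ") else value)
        if data_lines:
            events.append(json.loads("\n".join(data_lines)))
    return events


def states_until_terminal(events: list[dict]) -> list[str]:
    """Collect task states from status updates, stopping at the first terminal state."""
    seen = []
    for event in events:
        update = event.get("result", {}).get("statusUpdate")  # assumed field name
        if update is None:
            continue  # e.g. an artifact update
        state = update["status"]["state"]
        seen.append(state)
        if state in TERMINAL:
            break
    return seen


stream = "".join(
    format_sse_event({"jsonrpc": "2.0", "id": "req-1", "result": result})
    for result in [
        {"statusUpdate": {"taskId": "t1", "status": {"state": "TASK_STATE_WORKING"}}},
        {"artifactUpdate": {"taskId": "t1", "artifact": {"artifactId": "a1", "parts": [{"text": "Hello World"}]}}},
        {"statusUpdate": {"taskId": "t1", "status": {"state": "TASK_STATE_COMPLETED"}}},
    ]
)
events = parse_sse_events(stream)
print(states_until_terminal(events))  # ['TASK_STATE_WORKING', 'TASK_STATE_COMPLETED']
```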

A2A Client

The A2A client is typically an LLM-based application or an autonomous agent. It communicates with remote agents by sending A2A JSON-RPC requests to an A2A server. The client discovers the target agent through its Agent Card, as explained earlier. It then constructs protocol-compliant requests and sends messages and task-management requests. Finally, it handles the returned A2A responses or the updates streamed by the agent server.

@trace_class(kind=SpanKind.CLIENT)
class JsonRpcTransport(ClientTransport):
    """A JSON-RPC transport for the A2A client."""

    def __init__(
        self,
        httpx_client: httpx.AsyncClient,
        agent_card: AgentCard,
        url: str,
    ):
        """Initializes the JsonRpcTransport."""
        self.url = url
        self.httpx_client = httpx_client
        self.agent_card = agent_card

    async def send_message(
        self,
        request: SendMessageRequest,
        *,
        context: ClientCallContext | None = None,
    ) -> SendMessageResponse:
        """Sends a non-streaming message request to the agent."""
        rpc_request = JSONRPC20Request(
            method='SendMessage',
            params=json_format.MessageToDict(request),
            _id=str(uuid4()),
        )
        response_data = await self._send_request(
            dict(rpc_request.data), context
        )
        json_rpc_response = JSONRPC20Response(**response_data)
        if json_rpc_response.error:
            raise self._create_jsonrpc_error(json_rpc_response.error)
        response: SendMessageResponse = json_format.ParseDict(
            json_rpc_response.result, SendMessageResponse()
        )
        return response

Here is what a request looks like:

POST /rpc HTTP/1.1
Host: agent.example.com
Content-Type: application/json
Authorization: Bearer token
A2A-Version: 0.3
A2A-Extensions: https://example.com/extensions/geolocation/v1,https://standards.org/extensions/citations/v1

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "SendMessage",
  "params": { /* SendMessageRequest */ }
}

https://github.com/a2aproject/a2a-python

Then on the server side, the routing is performed by the request handler:

    @validate_version(constants.PROTOCOL_VERSION_1_0)
    async def _process_non_streaming_request(  # noqa: PLR0911
        self,
        request_obj: Any,
        context: ServerCallContext,
    ) -> dict[str, Any] | None:
        """Processes non-streaming requests.

        Args:
            request_obj: The proto request message.
            context: The ServerCallContext for the request.

        Returns:
            A dict containing the result or error.
        """
        method = context.state.get('method')
        match method:
            case 'SendMessage':
                return await self._handle_send_message(request_obj, context)
            case 'CancelTask':
                return await self._handle_cancel_task(request_obj, context)
            case 'GetTask':
                return await self._handle_get_task(request_obj, context)
                ...
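Stripped of SDK details, the routing above is a table lookup from method name to handler. Unknown methods get the standard JSON-RPC 2.0 error. The sketch below uses hypothetical handlers and an in-memory dict in place of a task store. Only the error codes come from the JSON-RPC 2.0 specification: -32600 (Invalid Request) and -32601 (Method not found).

```python
import asyncio
from typing import Any, Awaitable, Callable

INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601

Handler = Callable[[dict], Awaitable[Any]]


def error_response(request_id: Any, code: int, message: str) -> dict:
    return {"jsonrpc": "2.0", "id": request_id, "error": {"code": code, "message": message}}


async def dispatch(request: dict, handlers: dict[str, Handler]) -> dict:
    """Route a parsed JSON-RPC request to the handler registered for its method."""
    request_id = request.get("id")
    if request.get("jsonrpc") != "2.0" or not isinstance(request.get("method"), str):
        return error_response(request_id, INVALID_REQUEST, "Invalid Request")
    handler = handlers.get(request["method"])
    if handler is None:
        return error_response(request_id, METHOD_NOT_FOUND, "Method not found")
    result = await handler(request.get("params", {}))
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


# Hypothetical in-memory tasks and handlers (no handling of unknown task ids here).
TASKS = {"t1": {"id": "t1", "status": {"state": "TASK_STATE_WORKING"}}}


async def handle_get_task(params: dict) -> dict:
    return TASKS[params["id"]]


async def handle_cancel_task(params: dict) -> dict:
    task = TASKS[params["id"]]
    task["status"] = {"state": "TASK_STATE_CANCELED"}
    return task


HANDLERS: dict[str, Handler] = {"GetTask": handle_get_task, "CancelTask": handle_cancel_task}

print(asyncio.run(dispatch({"jsonrpc": "2.0", "id": 1, "method": "GetTask", "params": {"id": "t1"}}, HANDLERS)))
# An unregistered method name yields a -32601 error response.
print(asyncio.run(dispatch({"jsonrpc": "2.0", "id": 2, "method": "tasks/get"}, HANDLERS)))
```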

A2A Task

Task is the fundamental unit of work in the A2A protocol.

class Task(_message.Message):
    __slots__ = ("id", "context_id", "status", "artifacts", "history", "metadata")
    ID_FIELD_NUMBER: _ClassVar[int]
    CONTEXT_ID_FIELD_NUMBER: _ClassVar[int]
    STATUS_FIELD_NUMBER: _ClassVar[int]
    ARTIFACTS_FIELD_NUMBER: _ClassVar[int]
    HISTORY_FIELD_NUMBER: _ClassVar[int]
    METADATA_FIELD_NUMBER: _ClassVar[int]
    id: str
    context_id: str
    status: TaskStatus
    artifacts: _containers.RepeatedCompositeFieldContainer[Artifact]
    history: _containers.RepeatedCompositeFieldContainer[Message]
    metadata: _struct_pb2.Struct
    def __init__(self, id: _Optional[str] = ..., context_id: _Optional[str] = ..., status: _Optional[_Union[TaskStatus, _Mapping]] = ..., artifacts: _Optional[_Iterable[_Union[Artifact, _Mapping]]] = ..., history: _Optional[_Iterable[_Union[Message, _Mapping]]] = ..., metadata: _Optional[_Union[_struct_pb2.Struct, _Mapping]] = ...) -> None: ...

Every request submitted to an agent is associated with a task that represents the lifecycle of that piece of work.

    @validate_request_params
    async def on_message_send(  # noqa: D102
        self,
        params: SendMessageRequest,
        context: ServerCallContext,
    ) -> Message | Task:
        active_task, request_context = await self._setup_active_task(
            params, context
        )

When a request is received, the server first determines whether it references an existing active task. If so, the request is associated with that task; otherwise, a new task is created and persisted in the task store, allowing its state and results to be retrieved throughout its lifetime.

        active_task = await self._active_task_registry.get_or_create(
            task_id,
            context_id=context_id,
            call_context=call_context,
            create_task_if_missing=True,
        )

A task progresses through a well-defined lifecycle represented by a set of standardized states:

class TaskStatus(_message.Message):
    __slots__ = ("state", "message", "timestamp")
    STATE_FIELD_NUMBER: _ClassVar[int]
    MESSAGE_FIELD_NUMBER: _ClassVar[int]
    TIMESTAMP_FIELD_NUMBER: _ClassVar[int]
    state: TaskState
    message: Message
    timestamp: _timestamp_pb2.Timestamp
    def __init__(self, state: _Optional[_Union[TaskState, str]] = ..., message: _Optional[_Union[Message, _Mapping]] = ..., timestamp: _Optional[_Union[datetime.datetime, _timestamp_pb2.Timestamp, _Mapping]] = ...) -> None: ...
...
class TaskState(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
    __slots__ = ()
    TASK_STATE_UNSPECIFIED: _ClassVar[TaskState]
    TASK_STATE_SUBMITTED: _ClassVar[TaskState]
    TASK_STATE_WORKING: _ClassVar[TaskState]
    TASK_STATE_COMPLETED: _ClassVar[TaskState]
    TASK_STATE_FAILED: _ClassVar[TaskState]
    TASK_STATE_CANCELED: _ClassVar[TaskState]
    TASK_STATE_INPUT_REQUIRED: _ClassVar[TaskState]
    TASK_STATE_REJECTED: _ClassVar[TaskState]
    TASK_STATE_AUTH_REQUIRED: _ClassVar[TaskState]

A TaskManager is typically used to manage a task's lifecycle while a request is executed. It references a TaskStore, which is used for fetching, saving, and updating the Task based on events produced by the underlying agent.

class TaskManager:
...
    async def get_task(self) -> Task | None:
        """Retrieves the current task object, either from memory or the store.

        If `task_id` is set, it first checks the in-memory `_current_task`,
        then attempts to load it from the `task_store`.

        Returns:
            The `Task` object if found, otherwise `None`.
        """
        if not self.task_id:
            logger.debug('task_id is not set, cannot get task.')
            return None

        if self._current_task:
            return self._current_task

        logger.debug(
            'Attempting to get task from store with id: %s', self.task_id
        )
        self._current_task = await self.task_store.get(
            self.task_id, self._call_context
        )
        if self._current_task:
            logger.debug('Task %s retrieved successfully.', self.task_id)
        else:
            logger.debug('Task %s not found.', self.task_id)
        return self._current_task

A2A Messages

The response to a client request in A2A can be either a Task or a Message. In non-streaming mode, when a task completes successfully, the server typically returns the completed task, which contains the final state, any generated artifacts, and the agent's response.

    async def _handle_send_message(
        self, request_obj: SendMessageRequest, context: ServerCallContext
    ) -> dict[str, Any]:
        task_or_message = await self.request_handler.on_message_send(
            request_obj, context
        )
        if isinstance(task_or_message, Task):
            return MessageToDict(SendMessageResponse(task=task_or_message))
        return MessageToDict(SendMessageResponse(message=task_or_message))
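Because SendMessageResponse holds either a task or a message, the serialized result a client sees has exactly one of a "task" or a "message" key. The client-side sketch below branches on that key. The helper is hypothetical. Reading text from parts assumes that text parts serialize with a "text" field.

```python
def describe_send_message_result(result: dict) -> str:
    """Summarize the `result` of a SendMessage call (hypothetical client helper)."""
    if "task" in result:
        task = result["task"]
        state = task.get("status", {}).get("state", "TASK_STATE_UNSPECIFIED")
        artifacts = task.get("artifacts", [])
        return f"task {task['id']} is {state} with {len(artifacts)} artifact(s)"
    if "message" in result:
        parts = result["message"].get("parts", [])
        return "message: " + " ".join(part["text"] for part in parts if "text" in part)
    raise ValueError("result holds neither a task nor a message")


print(describe_send_message_result(
    {"task": {"id": "t1", "status": {"state": "TASK_STATE_COMPLETED"}, "artifacts": [{}]}}
))  # task t1 is TASK_STATE_COMPLETED with 1 artifact(s)
print(describe_send_message_result(
    {"message": {"role": "ROLE_AGENT", "parts": [{"text": "Hello World"}]}}
))  # message: Hello World
```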

Alternatively, the server may return a standalone Message. This exposes less data and thus minimizes unnecessary exposure of sensitive information [4]. It suits, for example, simple conversational exchanges where the client only needs the immediate response rather than the complete task lifecycle.

    def new_agent_message(
        self,
        parts: list[Part],
        metadata: dict[str, Any] | None = None,
    ) -> Message:
        """Creates a new message object sent by the agent for this task/context.

        Note: This method only *creates* the message object. It does not
              automatically enqueue it.

        Args:
            parts: A list of `Part` objects for the message content.
            metadata: Optional metadata for the message.

        Returns:
            A new `Message` object.
        """
        return Message(
            role=Role.ROLE_AGENT,
            task_id=self.task_id,
            context_id=self.context_id,
            message_id=self._message_id_generator.generate(
                IDGeneratorContext(
                    task_id=self.task_id, context_id=self.context_id
                )
            ),
            metadata=metadata,
            parts=parts,
        )

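As shown above, each message consists of a role (e.g., ROLE_USER for the client or ROLE_AGENT for the server) and one or more parts. The next snippet comes from a sample built on the earlier pydantic-based SDK types. There, the user role is spelled Role.user and each concrete part is wrapped in a Part: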

        request_message = Message(
            role=Role.user,
            parts=[Part(root=TextPart(text=message))],
            message_id=message_id,
            context_id=context_id,
            task_id=task_id,
        )

A part is a unit of content within a message or an artifact (an output generated by the A2A agent). It may be, for example, a TextPart, FilePart, or DataPart, carrying plain text, files, or structured data, respectively.

import base64

from a2a.types import DataPart, Part
from google.adk.tools.tool_context import ToolContext
from google.genai import types


async def convert_part(part: Part, tool_context: ToolContext) -> object:
    """Convert a single A2A Part into an ADK-friendly representation."""
    if part.root.kind == 'text':
        return part.root.text
    if part.root.kind == 'data':
        return part.root.data
    if part.root.kind == 'file':
        # Repackage A2A FilePart to google.genai Blob
        # Currently not considering plain text as files
        # Assumes the file was sent inline as base64 bytes, not by URI
        file_id = part.root.file.name
        file_bytes = base64.b64decode(part.root.file.bytes)
        file_part = types.Part(
            inline_data=types.Blob(
                mime_type=part.root.file.mime_type, data=file_bytes
            )
        )
        await tool_context.save_artifact(file_id, file_part)
        tool_context.actions.skip_summarization = True
        tool_context.actions.escalate = True
        return DataPart(data={'artifact-file-id': file_id})
    # `Part` wraps the concrete part, so `kind` must be read from `part.root`
    return f'Unknown type: {part.root.kind}'

Example from the a2a-samples repository

Note that even when the client receives only a standalone message, the server still represents the interaction internally as an active task within the A2A framework. The task manages execution and state on the server side but is not fully exposed to the client in the response. The task ID is still surfaced in the message payload through its task_id field, as listed below:

class Message(_message.Message):
    __slots__ = ("message_id", "context_id", "task_id", "role", "parts", "metadata", "extensions", "reference_task_ids")

A2A Notifications

The A2A protocol also defines a mechanism for servers to push asynchronous notifications to clients through webhooks.

PushNotificationEvent = Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent


class PushNotificationSender(ABC):
    """Interface for sending push notifications for tasks."""

    @abstractmethod
    async def send_notification(
        self, task_id: str, event: PushNotificationEvent
    ) -> None:
        """Sends a push notification containing the latest task state."""

Instead of continuously polling for task updates, an A2A client can provide a webhook URL. The server sends notifications to that URL whenever significant task events occur, such as a task reaching a terminal state, transitioning to the input_required state, or producing an artifact.

    async def _update_task_state(
        self,
        updated_task: Task,
        event: Task
        | TaskStatusUpdateEvent
        | TaskArtifactUpdateEvent
        | PushNotificationEvent,
    ) -> None:
        is_terminal = updated_task.status.state in TERMINAL_TASK_STATES

        if is_terminal:
            await self._handle_terminal_state(updated_task)

        if (
            self.active_task._push_sender
            and self.active_task._task_id
            and isinstance(event, PushNotificationEvent)
        ):
            logger.debug(
                'Consumer[%s]: Sending push notification',
                self.active_task._task_id,
            )
            await self.active_task._push_sender.send_notification(
                self.active_task._task_id, event
            )

Upon receiving a push notification, the A2A client should first validate the notification payload and then invoke the GetTask JSON-RPC method using the taskId contained in the notification in order to retrieve the latest state of the task, including any updated artifacts, messages, or status information.
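A minimal receiver following that advice could check a shared token in constant time, extract the task ID, and prepare the follow-up GetTask call. This sketch rests on several assumptions that should all be checked against the specification:

- the token arrives in a header named X-A2A-Notification-Token;
- the payload is either a bare event or one wrapped under a task, statusUpdate, or artifactUpdate key;
- GetTask takes the task ID under a params key named id.

The helper names are hypothetical.

```python
import hmac
import json
import uuid

TOKEN_HEADER = "x-a2a-notification-token"  # assumed header name, matched case-insensitively


def extract_task_id(payload: dict) -> str:
    """Find the task ID in a Task, TaskStatusUpdateEvent or TaskArtifactUpdateEvent payload."""
    for wrapper in ("task", "statusUpdate", "artifactUpdate"):
        if wrapper in payload:  # assumed wrapped form
            payload = payload[wrapper]
            break
    # Update events carry taskId; a Task carries its own id.
    task_id = payload.get("taskId") or payload.get("id")
    if not task_id:
        raise ValueError("notification does not identify a task")
    return task_id


def handle_push_notification(headers: dict[str, str], body: bytes, expected_token: str) -> dict:
    """Validate a webhook call and return the GetTask request to send next."""
    received = {k.lower(): v for k, v in headers.items()}.get(TOKEN_HEADER, "")
    # Constant-time comparison avoids leaking the token through timing differences.
    if not hmac.compare_digest(received.encode(), expected_token.encode()):
        raise PermissionError("notification token mismatch")
    task_id = extract_task_id(json.loads(body))
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "GetTask",
        "params": {"id": task_id},  # assumed params key
    }


body = json.dumps({"statusUpdate": {"taskId": "t1", "status": {"state": "TASK_STATE_COMPLETED"}}}).encode()
print(handle_push_notification({"X-A2A-Notification-Token": "s3cret"}, body, "s3cret"))
```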

Before push notifications can be used, the A2A server must advertise support for the feature in its Agent Card by setting capabilities.pushNotifications to true.

class AgentCapabilities(_message.Message):
    __slots__ = ("streaming", "push_notifications", "extensions", "extended_agent_card")
    STREAMING_FIELD_NUMBER: _ClassVar[int]
    PUSH_NOTIFICATIONS_FIELD_NUMBER: _ClassVar[int]
    EXTENSIONS_FIELD_NUMBER: _ClassVar[int]
    EXTENDED_AGENT_CARD_FIELD_NUMBER: _ClassVar[int]
    streaming: bool
    push_notifications: bool
    extensions: _containers.RepeatedCompositeFieldContainer[AgentExtension]
    extended_agent_card: bool
    def __init__(self, streaming: _Optional[bool] = ..., push_notifications: _Optional[bool] = ..., extensions: _Optional[_Iterable[_Union[AgentExtension, _Mapping]]] = ..., extended_agent_card: _Optional[bool] = ...) -> None: ...

The client can then provide a TaskPushNotificationConfig, which carries the webhook url, an optional token, and authentication details,

class TaskPushNotificationConfig(_message.Message):
    __slots__ = ("tenant", "id", "task_id", "url", "token", "authentication")
    TENANT_FIELD_NUMBER: _ClassVar[int]
    ID_FIELD_NUMBER: _ClassVar[int]
    TASK_ID_FIELD_NUMBER: _ClassVar[int]
    URL_FIELD_NUMBER: _ClassVar[int]
    TOKEN_FIELD_NUMBER: _ClassVar[int]
    AUTHENTICATION_FIELD_NUMBER: _ClassVar[int]
    tenant: str
    id: str
    task_id: str
    url: str
    token: str
    authentication: AuthenticationInfo
    def __init__(self, tenant: _Optional[str] = ..., id: _Optional[str] = ..., task_id: _Optional[str] = ..., url: _Optional[str] = ..., token: _Optional[str] = ..., authentication: _Optional[_Union[AuthenticationInfo, _Mapping]] = ...) -> None: ...


either as part of the initial request when creating the task

class SendMessageConfiguration(_message.Message):
    __slots__ = ("accepted_output_modes", "task_push_notification_config", "history_length", "return_immediately")
    ACCEPTED_OUTPUT_MODES_FIELD_NUMBER: _ClassVar[int]
    TASK_PUSH_NOTIFICATION_CONFIG_FIELD_NUMBER: _ClassVar[int]
    HISTORY_LENGTH_FIELD_NUMBER: _ClassVar[int]
    RETURN_IMMEDIATELY_FIELD_NUMBER: _ClassVar[int]
    accepted_output_modes: _containers.RepeatedScalarFieldContainer[str]
    task_push_notification_config: TaskPushNotificationConfig
    history_length: int
    return_immediately: bool
    def __init__(self, accepted_output_modes: _Optional[_Iterable[str]] = ..., task_push_notification_config: _Optional[_Union[TaskPushNotificationConfig, _Mapping]] = ..., history_length: _Optional[int] = ..., return_immediately: _Optional[bool] = ...) -> None: ...

or later by invoking the CreateTaskPushNotificationConfig JSON-RPC method for an existing task.

    async def _handle_create_task_push_notification_config(
        self,
        request_obj: TaskPushNotificationConfig,
        context: ServerCallContext,
    ) -> dict[str, Any]:
        result_config = (
            await self.request_handler.on_create_task_push_notification_config(
                request_obj, context
            )
        )
        return MessageToDict(result_config, preserving_proto_field_name=False)
        ...
    @validate_request_params
    @validate(
        lambda self: self._agent_card.capabilities.push_notifications,
        error_message='Push notifications are not supported by the agent',
        error_type=PushNotificationNotSupportedError,
    )
    async def on_create_task_push_notification_config(  # noqa: D102
        self,
        params: TaskPushNotificationConfig,
        context: ServerCallContext,
    ) -> TaskPushNotificationConfig:
        if not self._push_config_store:
            raise PushNotificationNotSupportedError

        task_id = params.task_id
        task: Task | None = await self.task_store.get(task_id, context)
        if not task:
            raise TaskNotFoundError

        await self._push_config_store.set_info(
            task_id,
            params,
            context,
        )

        return params
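From the client's side, registering a webhook for an existing task can be sketched in three steps. First, check the capability advertised in the Agent Card. Second, generate a random token. Third, build a CreateTaskPushNotificationConfig request whose params follow the TaskPushNotificationConfig fields listed above (taskId, url, token in camelCase JSON form). The helper name is hypothetical. Rejecting non-HTTPS webhook URLs is our own policy choice, not a protocol rule.

```python
import secrets
import uuid


def build_push_config_request(card: dict, task_id: str, webhook_url: str) -> tuple[dict, str]:
    """Return (JSON-RPC request, token); keep the token to validate later callbacks."""
    if not card.get("capabilities", {}).get("pushNotifications", False):
        raise RuntimeError("agent does not advertise push notification support")
    if not webhook_url.startswith("https://"):
        raise ValueError("refusing a non-HTTPS webhook URL")  # our policy, not the spec's
    token = secrets.token_urlsafe(32)  # unguessable shared secret for this task's callbacks
    request = {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "CreateTaskPushNotificationConfig",
        "params": {"taskId": task_id, "url": webhook_url, "token": token},
    }
    return request, token


card = {"name": "Calendar Agent", "capabilities": {"streaming": True, "pushNotifications": True}}
request, token = build_push_config_request(card, "t1", "https://client.example.com/a2a/webhook")
print(request["method"], request["params"]["taskId"])  # CreateTaskPushNotificationConfig t1
```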

A2A Request Processing

A2A clients communicate with an A2A server through JSON-RPC requests sent over HTTP POST. The server exposes a JSON-RPC endpoint that routes each request to the appropriate handler based on the method field of the request payload.

    # Method-to-model mapping for centralized routing
    # Proto types don't have model_fields, so we define the mapping explicitly
    # Method names match gRPC service method names
    METHOD_TO_MODEL: dict[str, type] = {
        'SendMessage': SendMessageRequest,
        'SendStreamingMessage': SendMessageRequest,  # Same proto type as SendMessage
        'GetTask': GetTaskRequest,
        'ListTasks': ListTasksRequest,
        'CancelTask': CancelTaskRequest,
        'CreateTaskPushNotificationConfig': TaskPushNotificationConfig,
        'GetTaskPushNotificationConfig': GetTaskPushNotificationConfigRequest,
        'ListTaskPushNotificationConfigs': ListTaskPushNotificationConfigsRequest,
        'DeleteTaskPushNotificationConfig': DeleteTaskPushNotificationConfigRequest,
        'SubscribeToTask': SubscribeToTaskRequest,
        'GetExtendedAgentCard': GetExtendedAgentCardRequest,
    }

For example, when the method is SendMessage, the client is sending a message to the remote agent in a non-streaming mode. Other request types also exist, such as SubscribeToTask, which allows a client to subscribe to an active A2A task and receive updates while the underlying agent processes the request, similarly to streaming message execution.

Let us focus on the SendMessage request, which does not require streaming. The server does not open an SSE stream, and exactly one response is returned to the client. That response can be either a Task or a Message.

                    raw_result = await self._process_non_streaming_request(
                        specific_request, call_context
                    )
                    handler_result = JSONRPC20Response(
                        result=raw_result, _id=request_id
                    ).data

Each A2A request is associated with an A2A task. Therefore, the first responsibility of the handler is to set up the task that will process the request.

If the client request includes a taskId, the server attempts to resolve it as an existing task.

    async def get_or_create(
        self,
        task_id: str,
        call_context: ServerCallContext,
        context_id: str | None = None,
        create_task_if_missing: bool = False,
    ) -> ActiveTask:
        """Retrieves an existing ActiveTask or creates a new one."""
        async with self._lock:
            if task_id in self._active_tasks:
                return self._active_tasks[task_id]

Tasks are stored in a task store and can be retrieved when needed. Active tasks are also maintained in memory through an ActiveTaskRegistry.

class ActiveTaskRegistry:
    """A registry for active ActiveTask instances."""

    def __init__(
        self,
        agent_executor: AgentExecutor,
        task_store: TaskStore,
        push_sender: PushNotificationSender | None = None,
    ):
        self._agent_executor = agent_executor
        self._task_store = task_store
        self._push_sender = push_sender
        self._active_tasks: dict[str, ActiveTask] = {}
        self._lock = asyncio.Lock()
        self._cleanup_tasks: set[asyncio.Task[None]] = set()

If the request does not refer to a known task, a new ActiveTask instance is created for a newly generated taskId (and, if the client did not supply one, a new contextId).

    async def get_or_create(
        self,
        task_id: str,
        call_context: ServerCallContext,
        context_id: str | None = None,
        create_task_if_missing: bool = False,
    ) -> ActiveTask:
        """Retrieves an existing ActiveTask or creates a new one."""
        async with self._lock:
            if task_id in self._active_tasks:
                return self._active_tasks[task_id]

            task_manager = TaskManager(
                task_id=task_id,
                context_id=context_id,
                task_store=self._task_store,
                initial_message=None,
                context=call_context,
            )

            active_task = ActiveTask(
                agent_executor=self._agent_executor,
                task_id=task_id,
                task_manager=task_manager,
                push_sender=self._push_sender,
                on_cleanup=self._on_active_task_cleanup,
            )
            self._active_tasks[task_id] = active_task

        await active_task.start(
            call_context=call_context,
            create_task_if_missing=create_task_if_missing,
        )
        return active_task
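The key property of get_or_create is that concurrent requests for the same taskId share a single ActiveTask: the asyncio.Lock makes the lookup-then-insert step atomic, while the potentially slow start() call runs outside the lock. Below is a stripped-down sketch of that pattern. The `Worker` class is a hypothetical stand-in for ActiveTask, and `Registry` is not the SDK's class.

```python
import asyncio


class Worker:
    """Hypothetical stand-in for ActiveTask."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.started = False

    async def start(self) -> None:
        self.started = True


class Registry:
    def __init__(self) -> None:
        self._active: dict[str, Worker] = {}
        self._lock = asyncio.Lock()
        self.created = 0  # counts how many Worker objects were built

    async def get_or_create(self, task_id: str) -> Worker:
        async with self._lock:
            # Lookup and insert happen under one lock, so two concurrent
            # callers cannot both miss and build duplicate workers.
            worker = self._active.get(task_id)
            if worker is not None:
                return worker
            worker = Worker(task_id)
            self._active[task_id] = worker
            self.created += 1
        # As in the SDK excerpt, start() runs after the registry lock is released.
        await worker.start()
        return worker


async def demo_registry() -> tuple[bool, int]:
    registry = Registry()
    first, second = await asyncio.gather(
        registry.get_or_create("task-1"), registry.get_or_create("task-1")
    )
    return first is second, registry.created
```

Running `asyncio.run(demo_registry())` returns `(True, 1)`: both callers receive the same object, and only one worker is ever constructed.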

The active task initializes three internal queues:

class ActiveTask:
    """Manages the lifecycle and execution of an active A2A task.

    It coordinates between the agent's execution (the producer), the
    persistence and state management (the TaskManager), and the event
    distribution to subscribers (the consumer).

    Concurrency Guarantees:
    - This class is designed to be highly concurrent. It manages an internal
      producer-consumer model using `asyncio.Task`s.
    - `self._lock` (asyncio.Lock) ensures mutually exclusive access for critical
      lifecycle state changes, such as starting the task, subscribing, and
      determining if cleanup is safe to trigger.
    - `self._is_finished` (asyncio.Event) provides a thread-safe, non-blocking way
      for external observers and internal loops to check if the ActiveTask has
      permanently ceased execution and closed its queues.
    """

    def __init__(
        self,
        agent_executor: AgentExecutor,
        task_id: str,
        task_manager: TaskManager,
        push_sender: PushNotificationSender | None = None,
        on_cleanup: Callable[[ActiveTask], None] | None = None,
    ) -> None:
        """Initializes the ActiveTask.

        Args:
            agent_executor: The executor to run the agent logic (producer).
            task_id: The unique identifier of the task being managed.
            task_manager: The manager for task state and database persistence.
            push_sender: Optional sender for out-of-band push notifications.
            on_cleanup: Optional callback triggered when the task is fully finished
                        and the last subscriber has disconnected. Used to prune
                        the task from the ActiveTaskRegistry.
        """
        # --- Core Dependencies ---
        self._agent_executor = agent_executor
        self._task_id = task_id
        self._event_queue_agent = EventQueueSource()
        self._event_queue_subscribers = EventQueueSource(
            create_default_sink=False
        )
        self._task_manager = task_manager
        self._push_sender = push_sender
        self._on_cleanup = on_cleanup

        # --- Synchronization Primitives ---
        # `_lock` protects structural lifecycle changes: start(), subscribe() counting,
        # and _maybe_cleanup() race conditions.
        self._lock = asyncio.Lock()

        # `_request_lock` protects parallel request processing.
        self._request_lock = asyncio.Lock()

        # _task_created is set when initial version of task is stored in DB.
        self._task_created = asyncio.Event()

        # `_is_finished` is set EXACTLY ONCE when the consumer loop exits, signifying
        # the absolute end of the task's active lifecycle.
        self._is_finished = asyncio.Event()

        # --- Lifecycle State ---
        # The background task executing the agent logic.
        self._producer_task: asyncio.Task[None] | None = None
        # The background task reading from _event_queue and updating the DB.
        self._consumer_task: asyncio.Task[None] | None = None

        # Tracks how many active SSE/gRPC streams are currently tailing this task.
        # Protected by `_lock`.
        self._reference_count = 0

        # Queue for incoming requests
        self._request_queue: AsyncQueue[tuple[RequestContext, uuid.UUID]] = (
            create_async_queue()
        )

_request_queue receives the requests to be processed by the agent, _event_queue_agent stores the events produced by the agent, and _event_queue_subscribers forwards agent events to the registered subscribers tailing the task.

TaskManager is then associated with the active task. It is responsible for retrieving the corresponding task, either from the persistent task store or directly from memory.

Next, two background asyncio tasks are started. The first, the producer, drains _request_queue and runs the agent logic through the AgentExecutor, which pushes the events it produces into _event_queue_agent.

    async def _run_producer(self) -> None:
        """Executes the agent logic.

        This method encapsulates the external `AgentExecutor.execute` call. It ensures
        that regardless of how the agent finishes (success, unhandled exception, or
        cancellation), the underlying `_event_queue` is safely closed, which signals
        the consumer to wind down.

        Concurrency Guarantee:
        Runs as a detached asyncio.Task. Safe to cancel.
        """
        logger.debug('Producer[%s]: Started', self._task_id)
        request_context = None
        try:
            while True:
                (
                    request_context,
                    request_id,
                ) = await self._request_queue.get()
                await self._request_lock.acquire()
                # TODO: Should we create task manager every time?
                self._task_manager._call_context = request_context.call_context

                request_context.current_task = (
                    await self._task_manager.get_task()
                )

                logger.debug(
                    'Producer[%s]: Executing agent task %s',
                    self._task_id,
                    request_context.current_task,
                )

                try:
                    await self._event_queue_agent.enqueue_event(
                        cast(
                            'Event',
                            _RequestStarted(request_id, request_context),
                        )
                    )
                    await self._agent_executor.execute(
                        request_context, self._event_queue_agent
                    )
                    logger.debug(
                        'Producer[%s]: Execution finished successfully',
                        self._task_id,
                    )
                    await self._event_queue_agent.enqueue_event(
                        cast('Event', _RequestCompleted(request_id))
                    )
                finally:
                    self._request_queue.task_done()

The second, the consumer, drains _event_queue_agent, processes each event, updates the task state when needed, enriches events with the latest task object, and forwards them to subscribers.

    async def _run_consumer(self) -> None:
        """Consumes events from the agent and updates system state."""
        try:
            await EventConsumer(self).run()
        finally:
            self._is_finished.set()
            self._request_queue.shutdown(immediate=True)
            await self._event_queue_agent.close(immediate=True)
            async with self._lock:
                self._reference_count -= 1
            logger.debug('Consumer[%s]: Finishing', self._task_id)
            await self._maybe_cleanup()
    ...

class EventConsumer:
    """Consumes events from the agent and updates system state."""

    def __init__(self, active_task: ActiveTask) -> None:
        self.active_task = active_task
        self.task_mode: bool | None = None
        self.message_to_save: Message | None = None

    async def run(self) -> None:
        """Consumes events from the agent and updates system state."""
        logger.debug('Consumer[%s]: Started', self.active_task._task_id)
        try:
            while True:
                logger.debug(
                    'Consumer[%s]: Waiting for event',
                    self.active_task._task_id,
                )
                event = (
                    await self.active_task._event_queue_agent.dequeue_event()
                )
                logger.debug(
                    'Consumer[%s]: Dequeued event %s',
                    self.active_task._task_id,
                    type(event).__name__,
                )

                await self._process_event(event)

For example, in non-streaming mode, a task status update event may be replaced with the fully updated task before being returned. Once the producer task completes, the consumer task eventually shuts down as well.
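Stripped of locks, reference counting, and persistence, the producer/consumer split reduces to two asyncio tasks joined by a queue. A sentinel marks the end of the event stream, which tells the consumer when to stop. The sketch below illustrates the general pattern rather than the SDK's ActiveTask. `fake_agent`, the `_DONE` sentinel, and the dict-shaped events are hypothetical.

```python
import asyncio
from typing import Any

_DONE = object()  # sentinel: no more items will follow


async def fake_agent(text: str, events: asyncio.Queue) -> None:
    """Hypothetical agent logic: emits a few task events for one request."""
    await events.put({"kind": "status-update", "state": "TASK_STATE_WORKING"})
    await events.put({"kind": "artifact-update", "text": text.upper()})
    await events.put({"kind": "status-update", "state": "TASK_STATE_COMPLETED"})


async def producer(requests: asyncio.Queue, events: asyncio.Queue) -> None:
    # Drain the request queue and run the agent for each request.
    try:
        while True:
            text = await requests.get()
            if text is _DONE:
                break
            await fake_agent(text, events)
    finally:
        # However the producer ends, tell the consumer that no more events will come.
        await events.put(_DONE)


async def consumer(events: asyncio.Queue, task: dict[str, Any]) -> None:
    # Apply each event to the task state (the TaskManager's job in the SDK).
    while True:
        event = await events.get()
        if event is _DONE:
            break
        if event["kind"] == "status-update":
            task["state"] = event["state"]
        elif event["kind"] == "artifact-update":
            task.setdefault("artifacts", []).append(event["text"])


async def run_once(text: str) -> dict[str, Any]:
    requests: asyncio.Queue = asyncio.Queue()
    events: asyncio.Queue = asyncio.Queue()
    task: dict[str, Any] = {"state": "TASK_STATE_SUBMITTED"}
    await requests.put(text)
    await requests.put(_DONE)  # no further requests in this demo
    await asyncio.gather(producer(requests, events), consumer(events, task))
    return task
```

`asyncio.run(run_once("book flight"))` ends with the task in `TASK_STATE_COMPLETED` and one artifact. The `finally` block is what guarantees that the consumer always wakes up and exits, even if the agent raises.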

Regarding subscriptions, each subscriber receives a tapped queue that gets a copy of every event published by the agent logic. The subscription drains this tapped queue and yields each event back to the caller, i.e., the request handler.
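The tap mechanism is essentially a broadcast. Every subscriber gets its own queue, and each published event is copied into all of them, so one subscriber never consumes events meant for another. The following is a minimal sketch of the idea. `Broadcaster`, `tap`, `publish`, and `subscribe` are hypothetical names, not the SDK's EventQueueSource API.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Any

_CLOSED = object()  # sentinel pushed to every tap when the source closes


class Broadcaster:
    def __init__(self) -> None:
        self._taps: list[asyncio.Queue] = []

    def tap(self) -> asyncio.Queue:
        # Each subscriber gets a private queue; later events are copied into it.
        queue: asyncio.Queue = asyncio.Queue()
        self._taps.append(queue)
        return queue

    async def publish(self, event: Any) -> None:
        for queue in self._taps:
            await queue.put(event)

    async def close(self) -> None:
        for queue in self._taps:
            await queue.put(_CLOSED)


async def subscribe(queue: asyncio.Queue) -> AsyncIterator[Any]:
    # Drain the tapped queue and yield events back to the caller (e.g. a request handler).
    while True:
        event = await queue.get()
        if event is _CLOSED:
            return
        yield event


async def demo_fanout() -> tuple[list[Any], list[Any]]:
    source = Broadcaster()
    first, second = source.tap(), source.tap()
    for event in ("working", "artifact", "completed"):
        await source.publish(event)
    await source.close()
    got_first = [e async for e in subscribe(first)]
    got_second = [e async for e in subscribe(second)]
    return got_first, got_second
```

Both subscribers in `demo_fanout` receive the full sequence `["working", "artifact", "completed"]`.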

In non-streaming mode, if the event is a Message, it is the only payload returned for that interaction. If the event is a Task whose state is terminal (e.g., completed) or interrupted (e.g., input required), or if the client requested return_immediately, the handler stops waiting and returns that task.

    @validate_request_params
    async def on_message_send(  # noqa: D102
        self,
        params: SendMessageRequest,
        context: ServerCallContext,
    ) -> Message | Task:
        active_task, request_context = await self._setup_active_task(
            params, context
        )
        task_id = cast('str', request_context.task_id)

        result: Message | Task | None = None

        async for raw_event in active_task.subscribe(
            request=request_context,
            include_initial_task=False,
            replace_status_update_with_task=True,
        ):
            event = raw_event
            logger.debug(
                'Processing[%s] event [%s] %s',
                params.message.task_id,
                type(event).__name__,
                event,
            )
            if isinstance(event, Task) and (
                params.configuration.return_immediately
                or event.status.state
                in (TERMINAL_TASK_STATES | INTERRUPTED_TASK_STATES)
            ):
                self._validate_task_id_match(task_id, event.id)
                result = event
                # DO break here as it's "return_immediately".
                # AgentExecutor will continue to run in the background.
                break

            if isinstance(event, Message):
                result = event
                # Do NOT break here as Message is supposed to be the only
                # event in "Message-only" interaction.
                # ActiveTask consumer (see active_task.py) validates the event
                # stream and raises InvalidAgentResponseError if more events are
                # pushed after a Message.

        if result is None:
            logger.debug('Missing result for task %s', request_context.task_id)
            result = await active_task.get_task()

        if isinstance(result, Task):
            result = apply_history_length(result, params.configuration)

        logger.debug(
            'Returning result for task %s: %s',
            request_context.task_id,
            result,
        )
        return result
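Without the logging and validation, the stopping rule of on_message_send fits in a small function. The sketch below models events as plain dicts. Its state sets are assumed to mirror the SDK's TERMINAL_TASK_STATES and INTERRUPTED_TASK_STATES, and `pick_result` is a hypothetical name. The real handler also falls back to the stored task when the stream ends without a result.

```python
from collections.abc import Iterable
from typing import Any, Optional

# Assumed to mirror the SDK's TERMINAL_TASK_STATES / INTERRUPTED_TASK_STATES.
TERMINAL = {"TASK_STATE_COMPLETED", "TASK_STATE_FAILED",
            "TASK_STATE_CANCELED", "TASK_STATE_REJECTED"}
INTERRUPTED = {"TASK_STATE_INPUT_REQUIRED", "TASK_STATE_AUTH_REQUIRED"}


def pick_result(events: Iterable[dict[str, Any]],
                return_immediately: bool = False) -> Optional[dict[str, Any]]:
    """Return the single payload a non-streaming SendMessage call answers with."""
    result = None
    for event in events:
        if event["kind"] == "task" and (
            return_immediately or event["status"]["state"] in TERMINAL | INTERRUPTED
        ):
            # Stop waiting; in the SDK the agent may keep running in the background.
            return event
        if event["kind"] == "message":
            # A Message is expected to be the only event of the interaction.
            result = event
    return result
```

With the event sequence working → input_required → completed, the function returns the input_required task. With `return_immediately=True`, it returns the first task it sees.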

The JSON-RPC response is then created and sent back to the client:

handler_result = JSONRPC20Response(
    result=raw_result,
    _id=request_id,
).data

Naturally, non-streaming requests are not ideal for long-running tasks, especially when the client needs progress updates. For this reason, A2A provides a dedicated method, SendStreamingMessage, which uses Server-Sent Events (SSE). The method sends a message to the agent and yields events as they are produced, including task updates, message chunks, and artifact updates. It is similar to SendMessage, except that events are streamed incrementally instead of being returned once at the end.

async for event in active_task.subscribe(
    request=request_context,
    include_initial_task=False,
):
    # Do NOT break here as we rely on AgentExecutor to yield control.
    # ActiveTask consumer validates the event stream and raises
    # InvalidAgentResponseError on misbehaving agents:
    #   - an event after a Message
    #   - a Message after entering task mode
    #   - an event after a terminal state
    if isinstance(event, Task):
        self._validate_task_id_match(task_id, event.id)
        yield apply_history_length(event, params.configuration)
    else:
        yield event
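On the wire, each streamed event travels as a Server-Sent Events `data:` line. The sketch below shows both the framing and a matching client-side parser. It assumes that each event is wrapped in a JSON-RPC 2.0 response carrying the original request id, as in the non-streaming case. The helper names and event payloads are illustrative assumptions. The SSE rules themselves (a `data:` field, one optional space after the colon, events separated by a blank line) come from the HTML Living Standard.

```python
import json
from collections.abc import Iterable, Iterator
from typing import Any


def to_sse(request_id: str, events: Iterable[dict[str, Any]]) -> Iterator[str]:
    # One SSE event per A2A event: a single "data:" line followed by a blank line.
    for event in events:
        envelope = {"jsonrpc": "2.0", "id": request_id, "result": event}
        yield f"data: {json.dumps(envelope)}\n\n"


def parse_sse(stream: str) -> list[dict[str, Any]]:
    # Minimal client-side parser; assumes "\n" line endings (SSE also allows "\r\n").
    results = []
    for block in stream.split("\n\n"):
        data_lines = []
        for line in block.split("\n"):
            if line.startswith("data:"):
                value = line[len("data:"):]
                # SSE strips exactly one optional space after the colon.
                data_lines.append(value[1:] if value.startswith(" ") else value)
        if data_lines:
            results.append(json.loads("\n".join(data_lines))["result"])
    return results
```

Because `json.dumps` never emits raw newlines by default, each event fits on a single `data:` line.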

Agent Execution and Context Management

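When a message without a contextId is sent to an A2A agent, typically the first message of a conversation, the server generates one (and likewise generates a taskId when none is provided).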

class RequestContext:
...
        # If the task id and context id were provided, make sure they
        # match the request. Otherwise, create them
        if self._params:
            if task_id:
                self._params.message.task_id = task_id
                if task and task.id != task_id:
                    raise InvalidParamsError(message='bad task id')
            else:
                self._check_or_generate_task_id()
            if context_id:
                self._params.message.context_id = context_id
                if task and task.context_id != context_id:
                    raise InvalidParamsError(message='bad context id')
            else:
                self._check_or_generate_context_id()

This identifier is used to logically group multiple Tasks and Messages that belong to the same conversation or execution session. It differs from the taskId, which identifies a specific unit of work. Providing a taskId in a subsequent SendMessage request indicates that the client wishes to continue an existing task that has not yet reached a terminal state. By contrast, the contextId spans the entire conversation and can be used to relate completed tasks, interrupted tasks, or even multiple concurrent tasks that belong to the same interaction. Clients can further help the agent by including referenceTaskIds in the Message object, allowing the agent to explicitly reference previous tasks as discussed earlier.

The A2A task store maintains the execution history for each task; however, the protocol deliberately does not define how this history should be projected into the agent's execution context. It is therefore the developer's responsibility to ensure that the agent retrieves the relevant conversation history associated with the contextId and injects it into the prompt or execution context of the underlying foundation model. In other words, while A2A standardizes communication between agents, context management remains an implementation concern. For LLM-based agents in particular, the contextId serves as the mechanism for reconstructing the conversational state and maintaining continuity across multiple requests. An example is shown below (from the a2a-samples repository):

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        error = self._validate_request(context)
        if error:
            raise ServerError(error=InvalidParamsError())

        input_event = self._get_input_event(context)
        context_id = context.context_id
        task_id = context.task_id
        try:
            ctx = None
            handler = None

            # Check if we have a saved context state for this session
            print(f'Len of ctx_states: {len(self.ctx_states)}', flush=True)
            saved_ctx_state = self.ctx_states.get(context_id, None)

            if saved_ctx_state is not None:
                # Resume with existing context
                logger.info(f'Resuming session {context_id} with saved context')
                ctx = Context.from_dict(self.agent, saved_ctx_state)
                handler = self.agent.run(
                    start_event=input_event,
                    ctx=ctx,
                )
            else:
                # New session!
                logger.info(f'Starting new session {context_id}')
                handler = self.agent.run(
                    start_event=input_event,
                )

            # Emit an initial task object
            updater = TaskUpdater(event_queue, task_id, context_id)
            await updater.submit()
            async for event in handler.stream_events():
                if isinstance(event, LogEvent):
                    # Send log event as intermediate message
                    await updater.update_status(
                        TaskState.working,
                        new_agent_text_message(event.msg, context_id, task_id),
                    )

            # Wait for final response
            final_response = await handler
            if isinstance(final_response, ChatResponseEvent):
                content = final_response.response
                metadata = (
                    final_response.citations
                    if hasattr(final_response, 'citations')
                    else None
                )
                if metadata is not None:
                    # ensure metadata is a dict of str keys
                    metadata = {str(k): v for k, v in metadata.items()}

                # save the context state to resume the current session
                self.ctx_states[context_id] = handler.ctx.to_dict()

                await updater.add_artifact(
                    [Part(root=TextPart(text=content))],
                    name='llama_summary',
                    metadata=metadata,
                )
                await updater.complete()
            else:
                await updater.failed(f'Unexpected completion {final_response}')
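Because A2A leaves context projection to the executor, a framework-agnostic version of what this sample does with `ctx_states` is a history store keyed by contextId, whose contents are replayed into the model input on every call. In this sketch, `ContextMemory`, `answer`, and the role/content turn format are assumptions (a common chat-completion shape), and `call_model` is a placeholder for whatever LLM client the agent actually uses.

```python
from collections import defaultdict
from collections.abc import Callable

ChatTurn = dict[str, str]  # {"role": ..., "content": ...}, assumed chat format


class ContextMemory:
    """Per-contextId conversation history, kept by the executor (not by A2A)."""

    def __init__(self, max_turns: int = 20) -> None:
        self._history: dict[str, list[ChatTurn]] = defaultdict(list)
        self._max = max_turns

    def build_input(self, context_id: str, user_text: str) -> list[ChatTurn]:
        # Replay the most recent turns of this conversation, then the new user turn.
        recent = self._history[context_id][-self._max:]
        return recent + [{"role": "user", "content": user_text}]

    def record(self, context_id: str, user_text: str, reply: str) -> None:
        self._history[context_id] += [
            {"role": "user", "content": user_text},
            {"role": "assistant", "content": reply},
        ]


def answer(memory: ContextMemory, context_id: str, user_text: str,
           call_model: Callable[[list[ChatTurn]], str]) -> str:
    turns = memory.build_input(context_id, user_text)
    reply = call_model(turns)
    memory.record(context_id, user_text, reply)
    return reply
```

A second request with the same contextId sees the earlier exchange, while a request with a different contextId starts from an empty history.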

During execution, an agent typically produces intermediate events such as progress updates, partial outputs, artifact updates, or task status changes. These events are usually published through a task updater or an internal event queue. In streaming mode, the A2A client receives these events incrementally as they are produced, enabling real-time monitoring of the agent's execution. In non-streaming mode, the client receives only the final task, typically a task in a terminal (or interrupted) state that contains the final response and the generated artifacts.

A2A and Interruptible Execution

A2A supports interruptible execution through the TASK_STATE_INPUT_REQUIRED task state. During the agent execution loop, if the agent determines that it needs additional information from the user, it suspends its execution by setting the task status to input_required, signaling to the client that more data is needed.

elif require_user_input:
    await updater.update_status(
        TaskState.input_required,
        new_agent_text_message(
            item["content"],
            task.context_id,
            task.id,
        ),
        final=True,
    )
    break

At this point, the agent loop stops, i.e., the agent executor yields control, and the A2A client receives the request for additional input (the task is now in TASK_STATE_INPUT_REQUIRED).

The client resumes the interaction by sending another SendMessage request whose Message.taskId references the existing task. The A2A server then continues processing that task from its current state rather than creating a new one. When the user's response arrives, the A2A framework invokes the agent executor again, which is why the executor implementation must return when it yields control.
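In practice, resuming an interrupted task only requires echoing the identifiers returned by the server. The sketch below builds such a SendMessage request body. It reuses the field names visible in the demo request and response later in this post (role, parts, messageId, contextId) together with Message.taskId. The function name `build_resume_request` is hypothetical.

```python
import uuid
from typing import Any


def build_resume_request(task: dict[str, Any], user_text: str) -> dict[str, Any]:
    """Build a SendMessage JSON-RPC request that continues an interrupted task."""
    if task["status"]["state"] != "TASK_STATE_INPUT_REQUIRED":
        raise ValueError("only a task waiting for input can be resumed this way")
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "SendMessage",
        "params": {
            "message": {
                "role": "ROLE_USER",
                "messageId": str(uuid.uuid4()),
                # Echoing both ids asks the server to continue this task
                # instead of creating a new one.
                "taskId": task["id"],
                "contextId": task["contextId"],
                "parts": [{"text": user_text}],
            }
        },
    }
```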

Once a task reaches a terminal state (completed, failed, or canceled), it is immutable: it cannot be modified or resumed directly. Instead, continuing the interaction creates a new task that references the same contextId and, where appropriate, the previous task through referenceTaskIds. This preserves the conversational context while maintaining an immutable history of task executions. As discussed earlier, the agent executor implementation is responsible for managing the conversational state associated with a given contextId. On the server side, ActiveTask.start() enforces this rule by refusing to start a task that is already in a terminal state:

    async def start(
        self,
        call_context: ServerCallContext,
        create_task_if_missing: bool = False,
    ) -> None:
        """Starts the active task background processes.

        Concurrency Guarantee:
        Uses `self._lock` to ensure the producer and consumer tasks are strictly
        singleton instances for the lifetime of this ActiveTask.
        """
        logger.debug('ActiveTask[%s]: Starting', self._task_id)
        async with self._lock:
            if self._is_finished.is_set():
                raise InvalidParamsError(
                    f'Task {self._task_id} is already completed. Cannot start it again.'
                )

            if (
                self._producer_task is not None
                and self._consumer_task is not None
            ):
                logger.debug(
                    'ActiveTask[%s]: Already started, ignoring start request',
                    self._task_id,
                )
                return

            logger.debug(
                'ActiveTask[%s]: Executing setup (call_context: %s, create_task_if_missing: %s)',
                self._task_id,
                call_context,
                create_task_if_missing,
            )
            try:
                self._task_manager._call_context = call_context
                task = await self._task_manager.get_task()
                logger.debug('TASK (start): %s', task)

                if task:
                    self._task_created.set()
                    if task.status.state in TERMINAL_TASK_STATES:
                        raise InvalidParamsError(
                            message=f'Task {task.id} is in terminal state: {task.status.state}'
                        )
                elif not create_task_if_missing:
                    raise TaskNotFoundError
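These lifecycle rules can be captured in a small transition table: a task can pause in input_required and later resume, but it never leaves a terminal state. The A2A specification defines the states themselves. The particular set of allowed transitions below is a simplified assumption for illustration, however, and the `transition` helper is not taken from the SDK. Short state names are used for readability.

```python
# Terminal states never transition again.
TERMINAL = frozenset({"completed", "failed", "canceled", "rejected"})

# Simplified, assumed transition table: current state -> allowed next states.
ALLOWED: dict[str, frozenset[str]] = {
    "submitted": frozenset({"working", "rejected", "canceled", "failed"}),
    "working": frozenset({"working", "input_required", "auth_required",
                          "completed", "failed", "canceled"}),
    # Interrupted states resume into "working" when the client replies.
    "input_required": frozenset({"working", "canceled", "failed"}),
    "auth_required": frozenset({"working", "canceled", "failed"}),
}


def transition(current: str, new: str) -> str:
    """Validate a state change and return the new state, or raise ValueError."""
    if current in TERMINAL:
        # Same idea as ActiveTask.start(): a terminal task cannot be restarted.
        raise ValueError(f"task is in terminal state: {current}")
    if new not in ALLOWED.get(current, frozenset()):
        raise ValueError(f"illegal transition: {current} -> {new}")
    return new
```

The interruptible flow described above is then the path submitted, working, input_required, working, completed. Any further transition out of completed raises.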

A2A Security

The A2A initiative addresses the lack of uniform standards for agent-to-agent communication by defining a structured communication flow between an A2A client and an A2A server, providing a robust foundation for building sophisticated agentic systems. However, special attention must be paid to the security risks introduced when using the protocol.

Security risks include Agent Card Spoofing [3], where an A2A client is tricked into trusting a forged Agent Card and subsequently sends sensitive tasks to a rogue A2A server; A2A Task Replay [3], where an attacker captures a valid A2A task and replays it multiple times to trigger repeated execution; and injection attacks [3], where malformed or malicious messages manipulate the receiving agent. Network-level attacks may also result in A2A Server Impersonation [3]. Additional threats include supply chain attacks through compromised A2A dependencies, insecure storage or transmission of authentication tokens, and other identity-related attacks [3].

Agent Cards deserve particular attention because they describe an agent's capabilities and are typically incorporated into the orchestrator's system prompt during planning. Consequently, a poisoned Agent Card can become a vector for prompt injection, influencing the planner to make incorrect delegation decisions or to execute unintended actions or tools (e.g., to achieve remote code execution (RCE) and data exfiltration).

Mitigations include secure Agent Card resolution through trusted registries, Agent Card sanitization before exposing descriptions to the planning agent, the use of short-lived OAuth 2.0 access tokens with granular scopes for sensitive tools, timestamp and nonce verification to prevent task replay, and designing tasks to be idempotent whenever possible.
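Timestamp and nonce verification can be sketched as follows. The receiver rejects requests whose timestamp falls outside a tolerance window, and it remembers the nonces seen within that window so that a captured request cannot be replayed. `ReplayGuard`, the 300-second window, and the assumption that the client sends a nonce and timestamp with each request are all illustrative; these fields are not part of the requests shown in this post. In practice, they should also be covered by a signature (e.g., a signed JWT) so that an attacker cannot simply change them.

```python
import time
from collections.abc import Callable


class ReplayGuard:
    def __init__(self, window_seconds: float = 300.0,
                 clock: Callable[[], float] = time.time) -> None:
        self._window = window_seconds
        self._clock = clock
        self._seen: dict[str, float] = {}  # nonce -> timestamp it arrived with

    def check(self, nonce: str, timestamp: float) -> bool:
        """Return True if the request is fresh and its nonce has not been used."""
        now = self._clock()
        # Forget nonces whose timestamps have left the window; they fail the check below anyway.
        self._seen = {n: ts for n, ts in self._seen.items() if now - ts <= self._window}
        if abs(now - timestamp) > self._window:
            return False  # too old, or too far in the future
        if nonce in self._seen:
            return False  # replayed request
        self._seen[nonce] = timestamp
        return True
```

Injecting the clock makes the guard easy to test. A shared store (for example, a cache with TTLs) would be needed once several server replicas must see the same nonces.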

Implementations should also enforce strict schema validation, the principle of least privilege, mTLS, strong JWT validation, credential verification for every request, and secure token storage.

In addition, all uploaded files should be scanned for malicious content before an agent processes them. Finally, when using A2A push notifications or webhooks, webhook URLs should be validated to prevent Server-Side Request Forgery (SSRF) attacks.
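For the webhook case, a basic defense is to resolve the callback host and refuse URLs that are not HTTPS or that point at loopback, private, link-local, or otherwise non-public addresses. The following is a minimal sketch using only the standard library. `is_safe_webhook_url` is a hypothetical helper. A production check should also pin the resolved address for the actual outbound request (to defeat DNS rebinding) and prefer an allow-list where possible.

```python
import ipaddress
import socket
from urllib.parse import urlsplit


def is_safe_webhook_url(url: str) -> bool:
    parts = urlsplit(url)
    if parts.scheme != "https" or not parts.hostname:
        return False
    try:
        # Resolve every address the host maps to (IP literals resolve to themselves).
        # parts.port raises ValueError for an invalid port, which is caught below.
        infos = socket.getaddrinfo(parts.hostname, parts.port or 443)
    except (socket.gaierror, ValueError):
        return False
    for *_, sockaddr in infos:
        ip = ipaddress.ip_address(sockaddr[0].split("%")[0])  # drop any IPv6 zone id
        if not ip.is_global:
            return False  # loopback, private, link-local (e.g. cloud metadata), reserved...
    return True
```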

A2A Demo

Let’s now see how to deploy two A2A servers, each hosting a different agent: one for hotel booking and one for flight booking. For simplicity, the responses are hardcoded, but you can imagine replacing them with more sophisticated agents that use MCP servers, Skills, and multi-agent patterns (handoff, agent-as-a-tool, ...).

First, we define the plumbing required to expose the Agent Card and process JSON-RPC requests. Security interceptors are omitted for simplicity.

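from collections.abc import Callable

import uvicorn
from starlette.applications import Starlette

# The A2A SDK names used below (AgentExecutor, RequestContext, EventQueue,
# TaskUpdater, TaskState, AgentCard, DefaultRequestHandler, ...) must be
# imported as well; their module paths depend on the installed a2a-sdk version.


class BookingAgentExecutor(AgentExecutor):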
    def __init__(self, responder: Callable[[str], str]) -> None:
        self._responder = responder

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        task = context.current_task or new_task_from_user_message(context.message)
        if context.current_task is None:
            await event_queue.enqueue_event(task)

        updater = TaskUpdater(
            event_queue=event_queue,
            task_id=task.id,
            context_id=task.context_id,
        )

        await updater.update_status(
            state=TaskState.TASK_STATE_WORKING,
            message=new_text_message("Processing booking request..."),
        )

        user_text = get_message_text(context.message) or ""
        result = self._responder(user_text)

        await updater.add_artifact(
            parts=[new_text_part(text=result, media_type="text/plain")]
        )

        await updater.update_status(
            state=TaskState.TASK_STATE_COMPLETED,
            message=new_text_message("Booking request completed."),
        )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        raise NotImplementedError("Cancellation is not implemented for this demo.")


def build_agent_card(
    *,
    name: str,
    description: str,
    url: str,
    skill_id: str,
    skill_name: str,
    skill_description: str,
    examples: list[str],
) -> AgentCard:
    skill = AgentSkill(
        id=skill_id,
        name=skill_name,
        description=skill_description,
        input_modes=["text/plain"],
        output_modes=["text/plain"],
        tags=["travel", "booking", skill_id],
        examples=examples,
    )

    return AgentCard(
        name=name,
        description=description,
        version="0.1.0",
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        capabilities=AgentCapabilities(
            streaming=False,
            extended_agent_card=False,
        ),
        supported_interfaces=[
            AgentInterface(
                protocol_binding="JSONRPC",
                url=url,
                protocol_version="1.0",
            )
        ],
        skills=[skill],
    )


def run_a2a_server(
    card: AgentCard,
    executor: AgentExecutor,
    host: str,
    port: int,
) -> None:
    handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore(),
        agent_card=card,
    )

    routes = []
    routes.extend(create_agent_card_routes(card))
    routes.extend(create_jsonrpc_routes(handler, "/"))

    app = Starlette(routes=routes)
    uvicorn.run(app, host=host, port=port)

Now, here is our flight agent:

def flight_response(user_text: str) -> str:
    return (
        "FLIGHT_BOOKING_CONFIRMED\n"
        "Booking reference: FL-A2A-0427\n"
    )


def main() -> None:
    parser = argparse.ArgumentParser(description="Run the Flight Booking A2A server.")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=10001)
    args = parser.parse_args()

    url = f"http://{args.host}:{args.port}"

    card = build_agent_card(
        name="Flight Booking Agent",
        description="Books round-trip flights for a requested travel period.",
        url=url,
        skill_id="book_flight",
        skill_name="Book flight",
        skill_description=(
            "Given a user request containing a travel period, "
            "return a flight booking confirmation."
        ),
        examples=["Book me a flight from 2026-08-10 to 2026-08-15"],
    )

    run_a2a_server(
        card,
        BookingAgentExecutor(flight_response),
        args.host,
        args.port,
    )


if __name__ == "__main__":
    main()

Followed by our hotel agent:

def hotel_response(user_text: str) -> str:
    return (
        "HOTEL_BOOKING_CONFIRMED\n"
        "Booking reference: HT-A2A-3819\n"
    )


def main() -> None:
    parser = argparse.ArgumentParser(description="Run the Hotel Booking A2A server.")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=10002)
    args = parser.parse_args()

    url = f"http://{args.host}:{args.port}"

    card = build_agent_card(
        name="Hotel Booking Agent",
        description="Books hotels for a requested stay period.",
        url=url,
        skill_id="book_hotel",
        skill_name="Book hotel",
        skill_description=(
            "Given a user request containing a stay period, "
            "return a hotel booking confirmation."
        ),
        examples=["Book me a hotel from 2026-08-10 to 2026-08-15"],
    )

    run_a2a_server(
        card,
        BookingAgentExecutor(hotel_response),
        args.host,
        args.port,
    )


if __name__ == "__main__":
    main()

Once the two servers are deployed, we can define the following Bash helper, send_a2a_jsonrpc.sh, to send a non-streaming A2A JSON-RPC request:

#!/usr/bin/env bash

URL="${1:-http://127.0.0.1:10001}"
TEXT="${2:-Book me for 2026-08-24 to 2026-08-29}"

curl -sS -X POST "$URL/" \
  -H 'Content-Type: application/json' \
  -H 'A2A-Version: 1.0' \
  -d @- <<JSON | python -m json.tool
{
  "jsonrpc": "2.0",
  "id": "id-1",
  "method": "SendMessage",
  "params": {
    "message": {
      "role": "ROLE_USER",
      "parts": [
        {
          "text": "$TEXT"
        }
      ],
      "messageId": "message-1"
    }
  }
}
JSON
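The heredoc interpolates $TEXT directly into the JSON body, so a prompt containing a double quote or a backslash would produce an invalid request. A client written with the Python standard library avoids this by serializing the payload with json.dumps. This sketch assumes that the demo servers above are running. The function names are hypothetical, and a fresh uuid4 messageId replaces the fixed "message-1" used in the script.

```python
import json
import urllib.request
import uuid
from typing import Any


def build_send_message(text: str) -> dict[str, Any]:
    # Same request shape as the Bash helper, built as data rather than as a string.
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "SendMessage",
        "params": {
            "message": {
                "role": "ROLE_USER",
                "parts": [{"text": text}],
                "messageId": str(uuid.uuid4()),
            }
        },
    }


def send_message(url: str, text: str, timeout: float = 30.0) -> dict[str, Any]:
    body = json.dumps(build_send_message(text)).encode("utf-8")
    request = urllib.request.Request(
        url.rstrip("/") + "/",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "A2A-Version": "1.0"},
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

For example, `send_message("http://127.0.0.1:10001", 'Book me a flight "ASAP" from 2026-08-24 to 2026-08-30')` sends a prompt containing double quotes that the shell version would have broken.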

We can also test Agent Card discovery with:

curl -s http://127.0.0.1:10001/.well-known/agent-card.json | python -m json.tool
{
    "name": "Flight Booking Agent",
    "description": "Books round-trip flights for a requested travel period.",
    "supportedInterfaces": [
        {
            "url": "http://127.0.0.1:10001",
            "protocolBinding": "JSONRPC",
            "protocolVersion": "1.0"
        }
    ],
    "version": "0.1.0",
    "capabilities": {
        "streaming": false,
        "extendedAgentCard": false
    },
    "defaultInputModes": [
        "text/plain"
    ],
    "defaultOutputModes": [
        "text/plain"
    ],
    "skills": [
        {
            "id": "book_flight",
            "name": "Book flight",
            "description": "Given a user request containing a travel period, return a flight booking confirmation.",
            "tags": [
                "travel",
                "booking",
                "book_flight"
            ],
            "examples": [
                "Book me a flight from 2026-08-10 to 2026-08-15"
            ],
            "inputModes": [
                "text/plain"
            ],
            "outputModes": [
                "text/plain"
            ]
        }
    ]
}
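A client does not need to hard-code the endpoint: it can read it from supportedInterfaces and inspect capabilities before deciding whether to use a streaming or non-streaming call. The following is a minimal standard-library sketch that works on a card shaped like the one above; the helper names are hypothetical, and picking the first interface that matches both binding and version is a simplifying assumption.

```python
import json
import urllib.request


def fetch_agent_card(base_url: str, timeout: float = 10.0) -> dict:
    """Download the public Agent Card from the well-known path."""
    url = base_url.rstrip("/") + "/.well-known/agent-card.json"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def jsonrpc_endpoint(card: dict, protocol_version: str = "1.0") -> str:
    """Return the URL of the first JSONRPC interface with the given protocol version."""
    for iface in card.get("supportedInterfaces", []):
        if (
            iface.get("protocolBinding") == "JSONRPC"
            and iface.get("protocolVersion") == protocol_version
        ):
            return iface["url"]
    raise ValueError(
        f"{card.get('name')!r} exposes no JSONRPC {protocol_version} interface"
    )


def supports_streaming(card: dict) -> bool:
    """True if the agent advertises streaming in its capabilities."""
    return bool(card.get("capabilities", {}).get("streaming", False))


def skill_ids(card: dict) -> list[str]:
    """List the ids of the skills the agent advertises."""
    return [skill["id"] for skill in card.get("skills", [])]
```

With the Flight Booking Agent card above, jsonrpc_endpoint(card) returns "http://127.0.0.1:10001", supports_streaming(card) is False, and skill_ids(card) is ["book_flight"].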

Next, we send a message to the flight agent:

./send_a2a_jsonrpc.sh http://127.0.0.1:10001 \
  "Book me a flight from 2026-08-10 to 2026-08-15"

This returns a completed A2A task containing the final artifact, along with a history holding the user's message and the intermediate "Processing booking request..." update emitted by the executor we implemented earlier:

{
  "result": {
    "task": {
      "id": "0bcddc7b-3687-4494-9232-d8f1c17b7af9",
      "contextId": "4ac3986c-6e6d-40dd-923b-ecbc899145ff",
      "status": {
        "state": "TASK_STATE_COMPLETED",
        "message": {
          "messageId": "a751238a-37c5-49ff-8342-686273c916dd",
          "role": "ROLE_AGENT",
          "parts": [
            {
              "text": "Booking request completed."
            }
          ]
        },
        "timestamp": "2026-07-05T10:36:26.927722Z"
      },
      "artifacts": [
        {
          "artifactId": "20343763-c6d1-43c9-81d4-012924dd982a",
          "parts": [
            {
              "text": "FLIGHT_BOOKING_CONFIRMED\nBooking reference: FL-A2A-0427\n",
              "mediaType": "text/plain"
            }
          ]
        }
      ],
      "history": [
        {
          "messageId": "message-1",
          "contextId": "4ac3986c-6e6d-40dd-923b-ecbc899145ff",
          "taskId": "0bcddc7b-3687-4494-9232-d8f1c17b7af9",
          "role": "ROLE_USER",
          "parts": [
            {
              "text": "Book me a flight from 2026-08-10 to 2026-08-15"
            }
          ]
        },
        {
          "messageId": "13895840-5dd7-4072-9a47-b9af324988c2",
          "role": "ROLE_AGENT",
          "parts": [
            {
              "text": "Processing booking request..."
            }
          ]
        }
      ]
    }
  },
  "id": "id-1",
  "jsonrpc": "2.0"
}

At this point, we have a completed A2A task with artifacts and intermediate updates.
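A calling agent usually only needs a few fields from this response: the task state, the artifact text, and possibly the history. A minimal sketch that reads exactly the fields shown in the output above (the function name is hypothetical). It assumes, without showing it here, that a server replying with a direct message rather than a task puts it under result.message, and it ignores non-text parts.

```python
def _texts(parts: list[dict]) -> list[str]:
    # Keep only text parts; file or data parts are skipped in this sketch.
    return [p["text"] for p in parts if "text" in p]


def summarize_send_message_response(response: dict) -> dict:
    """Extract state, status text, artifact texts and history from a SendMessage response."""
    if "error" in response:
        raise RuntimeError(f"A2A JSON-RPC error: {response['error']}")
    result = response["result"]
    if "task" in result:
        task = result["task"]
        status = task["status"]
        return {
            "state": status["state"],
            "status_text": _texts(status.get("message", {}).get("parts", [])),
            "artifacts": [
                t for a in task.get("artifacts", []) for t in _texts(a.get("parts", []))
            ],
            "history": [
                (m["role"], t)
                for m in task.get("history", [])
                for t in _texts(m.get("parts", []))
            ],
        }
    # Assumption: a direct reply without a task arrives under "message".
    message = result["message"]
    return {
        "state": None,
        "status_text": _texts(message.get("parts", [])),
        "artifacts": [],
        "history": [],
    }
```

Applied to the response above, state is "TASK_STATE_COMPLETED" and artifacts holds the single "FLIGHT_BOOKING_CONFIRMED" text.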

To integrate this into a multi-agent ecosystem, the routing agent can first fetch and list the available Agent Cards (e.g., from a privately maintained registry in your organization), append their capabilities to its system prompt, and then expose the remote agents in one of two ways: either as a send_message tool that takes the target agent name and the message as input, or as remote A2A sub-agents, as in the ADK quickstart at https://adk.dev/a2a/quickstart-consuming/ (many other patterns are possible).
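A minimal sketch of the first option (the send_message tool), assuming the Agent Cards have already been fetched. describe_agents, SendMessageTool, and the injected transport callable are hypothetical names; the transport could be any function that posts a SendMessage request, such as a small HTTP wrapper around the request shown earlier. Using the first entry of supportedInterfaces is a simplification.

```python
from typing import Callable

# Hypothetical transport: takes (agent_url, text) and returns the JSON-RPC response dict.
Transport = Callable[[str, str], dict]


def describe_agents(cards: list[dict]) -> str:
    """Render Agent Cards as a capabilities section for the router's system prompt."""
    lines = ["You can delegate to the following remote A2A agents:"]
    for card in cards:
        lines.append(f"- {card['name']}: {card['description']}")
        for skill in card.get("skills", []):
            lines.append(f"  * skill {skill['id']}: {skill['description']}")
    return "\n".join(lines)


class SendMessageTool:
    """send_message tool: the model picks a target agent by name and supplies the text."""

    def __init__(self, cards: list[dict], transport: Transport) -> None:
        # Index agents by name, keeping the URL of each card's first interface.
        self._urls = {c["name"]: c["supportedInterfaces"][0]["url"] for c in cards}
        self._transport = transport

    def __call__(self, agent_name: str, message: str) -> dict:
        if agent_name not in self._urls:
            # Return the error to the model instead of raising, so it can retry.
            return {"error": f"unknown agent {agent_name!r}; known: {sorted(self._urls)}"}
        return self._transport(self._urls[agent_name], message)
```

Returning an error payload for an unknown agent name, rather than raising, lets the model see the list of valid names and correct its choice on the next turn.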

@a2a_experimental
class RemoteA2aAgent(BaseAgent):
  """Agent that communicates with a remote A2A agent via A2A client.

  This agent supports multiple ways to specify the remote agent:
  1. Direct AgentCard object
  2. URL to agent card JSON
  3. File path to agent card JSON

  The agent handles:
  - Agent card resolution and validation
  - HTTP client management with proper resource cleanup
  - A2A message conversion and error handling
  - Session state management across requests
  """

source: https://github.com/google/adk-python/blob/main/src/google/adk/agents/remote_a2a_agent.py
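The docstring lists three ways to point RemoteA2aAgent at a remote agent. The following is not ADK's code: it is a hypothetical standard-library sketch of what that resolution step could look like, with a deliberately minimal validation (the list of required fields is our assumption, not the A2A specification's).

```python
import json
import urllib.request
from pathlib import Path
from typing import Union

CardSource = Union[dict, str, Path]


def resolve_agent_card(source: CardSource, timeout: float = 10.0) -> dict:
    """Resolve an Agent Card given as a dict, an http(s) URL, or a local JSON file path."""
    if isinstance(source, dict):
        card = source
    elif isinstance(source, str) and source.startswith(("http://", "https://")):
        with urllib.request.urlopen(source, timeout=timeout) as resp:
            card = json.loads(resp.read().decode("utf-8"))
    else:
        card = json.loads(Path(source).read_text(encoding="utf-8"))
    # Minimal validation: the fields a client needs to route to and call the agent.
    missing = [k for k in ("name", "supportedInterfaces", "skills") if k not in card]
    if missing:
        raise ValueError(f"invalid Agent Card, missing fields: {missing}")
    return card
```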

As the excerpt below shows, Google ADK uses the A2A client provided by the official Python SDK to send messages to the remote A2A server; by default, the current implementation constructs this client in non-streaming mode.

  async def _run_async_impl(
      self, ctx: InvocationContext
  ) -> AsyncGenerator[Event, None]:
    """Core implementation for async agent execution."""
    try:
      await self._ensure_resolved()
    except Exception as e:
      yield Event(
          author=self.name,
          error_message=f"Failed to initialize remote A2A agent: {e}",
          invocation_id=ctx.invocation_id,
          branch=ctx.branch,
      )
      return

    # Create A2A request for function response or regular message
    a2a_request = self._create_a2a_request_for_user_function_response(ctx)
    if not a2a_request:
      message_parts, context_id = self._construct_message_parts_from_session(
          ctx
      )

      if not message_parts:
        logger.warning(
            "No parts to send to remote A2A agent. Emitting empty event."
        )
        yield Event(
            author=self.name,
            content=genai_types.Content(),
            invocation_id=ctx.invocation_id,
            branch=ctx.branch,
        )
        return

      a2a_request = A2AMessage(
          message_id=platform_uuid.new_uuid(),
          parts=message_parts,
          role="user",
          context_id=context_id,
      )

    logger.debug(build_a2a_request_log(a2a_request))

    try:
      a2a_request, parameters = await execute_before_request_interceptors(
          self._config.request_interceptors, ctx, a2a_request
      )

      if isinstance(a2a_request, Event):
        yield a2a_request
        return

      # Backward compatibility
      if self._a2a_request_meta_provider:
        parameters.request_metadata = self._a2a_request_meta_provider(
            ctx, a2a_request
        )

      # TODO: Add support for requested_extension and
      # message_send_configuration once they are supported by the A2A client.
      async for a2a_response in self._a2a_client.send_message(
          request=a2a_request,
          request_metadata=parameters.request_metadata,
          context=parameters.client_call_context,
      ):
        logger.debug(build_a2a_response_log(a2a_response))

        metadata = None
        if isinstance(a2a_response, tuple):
          task = a2a_response[0]
          if task:
            metadata = task.metadata
        else:
          metadata = a2a_response.metadata

        if metadata and metadata.get(_NEW_A2A_ADK_INTEGRATION_EXTENSION):
          event = await self._handle_a2a_response_v2(a2a_response, ctx)
        else:
          event = await self._handle_a2a_response(a2a_response, ctx)
        if not event:
          continue

        event = await execute_after_request_interceptors(
            self._config.request_interceptors, ctx, a2a_response, event
        )
        if not event:
          continue

        # Add metadata about the request and response
        event.custom_metadata = event.custom_metadata or {}
        event.custom_metadata[A2A_METADATA_PREFIX + "request"] = (
            a2a_request.model_dump(exclude_none=True, by_alias=True)
        )
        # If the response is a ClientEvent, record the task state; otherwise,
        # record the message object.
        if isinstance(a2a_response, tuple):
          event.custom_metadata[A2A_METADATA_PREFIX + "response"] = (
              a2a_response[0].model_dump(exclude_none=True, by_alias=True)
          )
        else:
          event.custom_metadata[A2A_METADATA_PREFIX + "response"] = (
              a2a_response.model_dump(exclude_none=True, by_alias=True)
          )

        yield event
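When an agent advertises capabilities.streaming, a client can use a streaming call instead, and the server answers over Server-Sent Events (SSE): each event's data field carries one JSON-RPC response (for example a task, a message, or a status or artifact update). A minimal standard-library parser sketch, not taken from the SDK; it follows the SSE rules of joining multiple data lines with newlines, ignoring comment lines starting with ":", and discarding an incomplete event at the end of the stream.

```python
import json
from typing import Iterable, Iterator


def iter_sse_jsonrpc(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded JSON-RPC response per SSE event.

    An event is a block of lines terminated by a blank line; its payload is the
    newline-joined value of its data: lines. Other fields (event:, id:) and
    comment lines are ignored in this sketch.
    """
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event accumulated so far.
            if data_lines:
                yield json.loads("\n".join(data_lines))
                data_lines = []
        elif line.startswith("data:"):
            value = line[5:]
            # Per the SSE rules, a single leading space after the colon is dropped.
            data_lines.append(value[1:] if value.startswith(" ") else value)
```

With urllib, the lines could come from iterating the HTTP response, e.g. iter_sse_jsonrpc(line.decode("utf-8") for line in resp).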

Conclusion

The A2A protocol is a peer-to-peer framework that defines a standard for how agents discover and communicate with one another. An agentic application discovers available remote agents by retrieving their Agent Cards, then uses its underlying model or orchestration logic to decide which A2A agent should handle a given request. Each A2A agent is hosted behind an A2A Server, which routes incoming A2A messages to an AgentExecutor responsible for executing the request (e.g., running an agent, which may or may not be built with a particular framework) and, in streaming mode, streaming results back to the client over an SSE connection.

By connecting trusted remote A2A agents to your agentic application, you can build systems that combine local agents with remotely hosted ones. Likewise, your own application can expose an A2A Server, allowing other agentic applications to interact with its agents. Over time, these interconnected applications can evolve into sophisticated multi-agent systems (MAS), or meshes of agents. Note that each local agent may implement its own orchestration or multi-agent pattern (e.g., swarm or agent-as-a-tool) independently of the A2A protocol.

[1] https://github.com/a2aproject/A2A

[2] https://modelcontextprotocol.io

[3] https://arxiv.org/pdf/2504.16902

[4] https://arxiv.org/pdf/2505.12490
