|
| 1 | +# Queued A2A Protocol Adapter |
| 2 | + |
| 3 | +## What is Queued A2A? |
| 4 | + |
| 5 | +Queued A2A describes agents that use the **A2A wire protocol over an async message broker** instead of HTTP. The message format — task requests, status updates, results — is identical to standard A2A. What changes is the transport: callers publish messages to a queue or topic on a broker (RabbitMQ, Azure Service Bus) and receive responses asynchronously on a reply topic, rather than making a synchronous HTTP call. |
| 6 | + |
| 7 | +This pattern is common in: |
| 8 | + |
| 9 | +- **KEDA-scaled workers** — agents that are scaled to zero when idle. A KEDA trigger watches the queue depth; the worker spins up when messages arrive. |
| 10 | +- **Long-running tasks** — research, code generation, batch processing where a 30-second HTTP timeout is impractical. |
| 11 | +- **Decoupled pipelines** — agent chains where intermediate results pass through a broker, enabling retries, dead-lettering, and fan-out without tight coupling between agents. |
| 12 | + |
| 13 | +The rockbot project demonstrates this pattern concretely: `SampleAgent` and `ResearchAgent` run as KEDA-scalable workers, communicate via RabbitMQ, and use A2A message shapes for all task exchange. |
| 14 | + |
| 15 | +## Supported broker technologies |
| 16 | + |
| 17 | +| Technology | `TransportType` | Key fields | |
| 18 | +|---|---|---| |
| 19 | +| RabbitMQ (AMQP 0-9-1) | `Amqp` | `host`, `port`, `virtualHost`, `exchange`, `taskTopic` | |
| 20 | +| Azure Service Bus | `AzureServiceBus` | `namespace`, `entityPath`, `taskTopic` | |
| 21 | + |
| 22 | +## The QueuedAgentCard |
| 23 | + |
| 24 | +The discovery and registration format is a `QueuedAgentCard` — an A2A agent card extended with a `queueEndpoint` object: |
| 25 | + |
| 26 | +```json |
| 27 | +{ |
| 28 | + "name": "ResearchAgent", |
| 29 | + "description": "On-demand research agent using web search and page fetching", |
| 30 | + "version": "1.0", |
| 31 | + "skills": [ |
| 32 | + { |
| 33 | + "id": "research", |
| 34 | + "name": "Research", |
| 35 | + "description": "Research a topic using web search", |
| 36 | + "tags": ["search", "web"] |
| 37 | + } |
| 38 | + ], |
| 39 | + "defaultInputModes": ["application/json"], |
| 40 | + "defaultOutputModes": ["application/json"], |
| 41 | + "queueEndpoint": { |
| 42 | + "technology": "rabbitmq", |
| 43 | + "host": "rabbitmq.example.com", |
| 44 | + "port": 5672, |
| 45 | + "virtualHost": "/", |
| 46 | + "exchange": "rockbot", |
| 47 | + "taskTopic": "agent.task.ResearchAgent", |
| 48 | + "responseTopic": "agent.response.{callerName}" |
| 49 | + }, |
| 50 | + "id": "3fa85f64-...", |
| 51 | + "isLive": true |
| 52 | +} |
| 53 | +``` |
| 54 | + |
| 55 | +### `queueEndpoint` fields |
| 56 | + |
| 57 | +| Field | Required | Description | |
| 58 | +|---|---|---| |
| 59 | +| `technology` | Yes | `"rabbitmq"` or `"azure-service-bus"` | |
| 60 | +| `taskTopic` | Yes | Routing key or topic path that callers publish task messages to | |
| 61 | +| `host` | RabbitMQ | Broker hostname (e.g. `rabbitmq.example.com`) | |
| 62 | +| `port` | No | Broker port; defaults to 5672 (AMQP) or 5671 (AMQPS) if omitted | |
| 63 | +| `virtualHost` | No | AMQP virtual host; typically `/` | |
| 64 | +| `exchange` | No | AMQP exchange name (topic exchange, e.g. `rockbot`) | |
| 65 | +| `responseTopic` | No | Pattern callers subscribe to for responses (e.g. `agent.response.{callerName}`) | |
| 66 | +| `namespace` | Azure SB | Fully-qualified Service Bus namespace (e.g. `mybus.servicebus.windows.net`) | |
| 67 | +| `entityPath` | Azure SB | Service Bus queue or topic path | |
| 68 | + |
| 69 | +### Registry-added fields |
| 70 | + |
| 71 | +`id` and `isLive` are added by the registry on discovery responses. Omit them when registering. |
| 72 | + |
| 73 | +## API endpoints |
| 74 | + |
| 75 | +| Method | Path | Auth | Description | |
| 76 | +|---|---|---|---| |
| 77 | +| `POST` | `/a2a/async/agents` | Agent or Admin | Register an agent with queue endpoint details | |
| 78 | +| `GET` | `/a2a/async/agents` | Public | List / discover queued agents (paginated) | |
| 79 | +| `GET` | `/a2a/async/agents/{id}` | Public | Retrieve a specific queued agent card | |
| 80 | + |
| 81 | +The list endpoint supports the same query parameters as other protocol list endpoints: `capability`, `tags`, `liveOnly` (default `true`), `page`, `pageSize` (max 100). |
| 82 | + |
| 83 | +## Design decisions |
| 84 | + |
| 85 | +### ProtocolType stays A2A |
| 86 | + |
| 87 | +The A2A message shapes (task request, status update, result) are unchanged. Only the transport layer differs. Using a new `ProtocolType` would fragment discovery — a consumer searching for A2A agents would miss queue-backed ones. Keeping `ProtocolType.A2A` means `GET /discover/agents?protocol=A2A` returns both HTTP and queued A2A agents. |
| 88 | + |
| 89 | +`TransportType` distinguishes them: `Http` for classic A2A over HTTP, `Amqp` for RabbitMQ, `AzureServiceBus` for Azure. |
| 90 | + |
| 91 | +### Separate adapter from the HTTP A2A adapter |
| 92 | + |
| 93 | +The HTTP A2A adapter serves and accepts standard A2A agent cards where `supportedInterfaces[].url` is an HTTP URL. For queued agents, the relevant connection information (exchange, routing key, virtual host) doesn't fit cleanly into a URL field. A dedicated `/a2a/async/agents` surface with a `queueEndpoint` object is clearer for consumers than trying to encode broker details into a URL. |
| 94 | + |
| 95 | +The existing `GET /a2a/agents/{id}` endpoint continues to work for HTTP A2A agents and will return a synthetic placeholder URL for any non-HTTP endpoints it encounters. The dedicated queued endpoint is the intended surface for agents that are truly queue-native. |
| 96 | + |
| 97 | +### Connection details in ProtocolMetadata |
| 98 | + |
| 99 | +All `queueEndpoint` fields and the full card (version, skills, I/O modes) are serialised into `Endpoint.ProtocolMetadata` at registration time. On discovery, the mapper deserialises them to reconstruct the original card exactly. This is the same round-trip strategy used by A2A, MCP, and ACP — no domain model changes required for new fields. |
| 100 | + |
| 101 | +`Endpoint.Address` holds `taskTopic` — the routing key / entity path where callers publish. This is the primary "address" of the agent from the registry's perspective, analogous to an HTTP URL. |
| 102 | + |
| 103 | +### No credentials in the card |
| 104 | + |
| 105 | +Connection strings with usernames, passwords, or SAS tokens are not stored. The card contains only the structural connection details (host, port, exchange, topic). Callers are expected to hold broker credentials separately — via Kubernetes secrets, Azure Key Vault, or equivalent — and combine them with the structural details from the registry. |
| 106 | + |
| 107 | +### Skills and domain capabilities |
| 108 | + |
| 109 | +Skills in the `QueuedAgentCard` map directly to registry capabilities, enabling queued agents to be discovered through the generic `GET /discover/agents?capability=research` endpoint alongside HTTP agents. When an agent registers with multiple skills, all skills become capabilities in the domain model. |
| 110 | + |
| 111 | +On discovery, the stored skills are returned verbatim (preserving original string IDs like `"research"` rather than internal Guid IDs) because they are read from `ProtocolMetadata`, not reconstructed from capabilities. |
| 112 | + |
| 113 | +### isLive reflects heartbeat state |
| 114 | + |
| 115 | +Queued agents typically use the `Persistent` liveness model and call `POST /agents/{id}/endpoints/{eid}/heartbeat` periodically. KEDA-scaled workers may use `Ephemeral` liveness — registering when a pod starts, calling `POST /agents/{id}/endpoints/{eid}/renew` on each task invocation, and relying on TTL expiry when the pod scales to zero. |
| 116 | + |
| 117 | +`isLive: false` does not mean the agent's queue is unavailable — it means the registry has not seen a heartbeat or renewal within the grace period. Consumers can choose to route work to stale endpoints if they know the queue is durable. |
| 118 | + |
| 119 | +## Registration flows |
| 120 | + |
| 121 | +### RabbitMQ agent |
| 122 | + |
| 123 | +```json |
| 124 | +POST /a2a/async/agents |
| 125 | +Authorization: X-Api-Key: ar_... |
| 126 | + |
| 127 | +{ |
| 128 | + "name": "ResearchAgent", |
| 129 | + "description": "On-demand research agent", |
| 130 | + "version": "1.0", |
| 131 | + "skills": [ |
| 132 | + { "id": "research", "name": "Research", "description": "Researches a topic", "tags": ["search"] } |
| 133 | + ], |
| 134 | + "defaultInputModes": ["application/json"], |
| 135 | + "defaultOutputModes": ["application/json"], |
| 136 | + "queueEndpoint": { |
| 137 | + "technology": "rabbitmq", |
| 138 | + "host": "rabbitmq.prod.example.com", |
| 139 | + "port": 5672, |
| 140 | + "virtualHost": "/", |
| 141 | + "exchange": "rockbot", |
| 142 | + "taskTopic": "agent.task.ResearchAgent", |
| 143 | + "responseTopic": "agent.response.{callerName}" |
| 144 | + } |
| 145 | +} |
| 146 | +``` |
| 147 | + |
| 148 | +Response `201 Created`: |
| 149 | +```json |
| 150 | +{ |
| 151 | + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", |
| 152 | + "name": "ResearchAgent", |
| 153 | + "description": "On-demand research agent", |
| 154 | + "version": "1.0", |
| 155 | + "skills": [{ "id": "research", "name": "Research", "description": "Researches a topic", "tags": ["search"] }], |
| 156 | + "defaultInputModes": ["application/json"], |
| 157 | + "defaultOutputModes": ["application/json"], |
| 158 | + "queueEndpoint": { |
| 159 | + "technology": "rabbitmq", |
| 160 | + "host": "rabbitmq.prod.example.com", |
| 161 | + "port": 5672, |
| 162 | + "virtualHost": "/", |
| 163 | + "exchange": "rockbot", |
| 164 | + "taskTopic": "agent.task.ResearchAgent", |
| 165 | + "responseTopic": "agent.response.{callerName}" |
| 166 | + }, |
| 167 | + "isLive": true |
| 168 | +} |
| 169 | +``` |
| 170 | + |
| 171 | +### Azure Service Bus agent |
| 172 | + |
| 173 | +```json |
| 174 | +POST /a2a/async/agents |
| 175 | +{ |
| 176 | + "name": "InvoiceProcessor", |
| 177 | + "description": "Processes invoice documents", |
| 178 | + "version": "1.0", |
| 179 | + "skills": [ |
| 180 | + { "id": "process-invoice", "name": "Process Invoice", "description": "Extracts and validates invoice data", "tags": ["finance", "ocr"] } |
| 181 | + ], |
| 182 | + "defaultInputModes": ["application/json", "application/pdf"], |
| 183 | + "defaultOutputModes": ["application/json"], |
| 184 | + "queueEndpoint": { |
| 185 | + "technology": "azure-service-bus", |
| 186 | + "namespace": "mybus.servicebus.windows.net", |
| 187 | + "entityPath": "invoice-processor-tasks", |
| 188 | + "taskTopic": "invoice-processor-tasks", |
| 189 | + "responseTopic": "invoice-processor-responses" |
| 190 | + } |
| 191 | +} |
| 192 | +``` |
| 193 | + |
| 194 | +### Keeping liveness alive (KEDA worker pattern) |
| 195 | + |
| 196 | +An ephemeral, KEDA-scaled worker registers on pod startup and renews on each task: |
| 197 | + |
| 198 | +```bash |
| 199 | +# On pod startup — register with 5-minute TTL |
| 200 | +curl -X POST https://registry.example.com/a2a/async/agents \ |
| 201 | + -H "X-Api-Key: $REGISTRY_KEY" \ |
| 202 | + -d '{ "name": "ResearchAgent", ..., "livenessModel": "Ephemeral", "ttlSeconds": 300 }' |
| 203 | + |
| 204 | +# Capture the endpoint ID from the response, then on each task invocation: |
| 205 | +curl -X POST https://registry.example.com/agents/$AGENT_ID/endpoints/$ENDPOINT_ID/renew \ |
| 206 | + -H "X-Api-Key: $REGISTRY_KEY" |
| 207 | +``` |
| 208 | + |
| 209 | +When the pod scales to zero, the TTL expires and the agent is no longer returned in `liveOnly=true` queries. The queue address in the card remains valid — KEDA will scale a new pod when messages arrive. |
| 210 | + |
| 211 | +For persistent workers, use `heartbeatIntervalSeconds` instead and call `POST .../heartbeat` on schedule. |
| 212 | + |
| 213 | +## Discovering queued agents |
| 214 | + |
| 215 | +```bash |
| 216 | +# All live queued A2A agents |
| 217 | +GET /a2a/async/agents |
| 218 | + |
| 219 | +# Filter by capability |
| 220 | +GET /a2a/async/agents?capability=research&liveOnly=true |
| 221 | + |
| 222 | +# Filter by tag |
| 223 | +GET /a2a/async/agents?tags=finance,ocr |
| 224 | +``` |
| 225 | + |
| 226 | +Discovery returns a paginated list: |
| 227 | + |
| 228 | +```json |
| 229 | +{ |
| 230 | + "agents": [...], |
| 231 | + "totalCount": 12, |
| 232 | + "page": 1, |
| 233 | + "pageSize": 20, |
| 234 | + "totalPages": 1, |
| 235 | + "hasNextPage": false |
| 236 | +} |
| 237 | +``` |
| 238 | + |
| 239 | +Queued agents also appear in the generic discovery endpoint (`GET /discover/agents?protocol=A2A`) alongside HTTP A2A agents, since they share `ProtocolType.A2A`. |
0 commit comments