
Building Execution Engines

Execution engines let you run Ark agents with custom frameworks (LangChain, CrewAI, etc.) while integrating with Ark’s resource management. They receive agent context via the A2A v0.3.0 query extension and return results over the A2A protocol.

For hosting raw A2A agents without Ark integration, see Building A2A Servers.

Quickstart

Install the Ark SDK:

pip install ark-sdk

Create a minimal executor and save it as main.py:

    from ark_sdk.executor import BaseExecutor, ExecutionEngineRequest, Message
    from ark_sdk.executor_app import ExecutorApp


    class MyExecutor(BaseExecutor):
        async def execute_agent(self, request: ExecutionEngineRequest) -> list[Message]:
            # request.agent has the agent config (name, prompt, model, parameters)
            # request.userInput has the user message
            # request.mcpServers has MCP server connection info (url, transport, headers, tool allowlist)
            # request.conversationId has the conversation threading ID (from A2A contextId)
            prompt = self._resolve_prompt(request.agent)
            return [Message(role="assistant", content=f"Echo: {request.userInput.content}")]


    app = ExecutorApp(
        executor=MyExecutor("my-engine"),
        engine_name="my-engine",
        description="A simple execution engine",
    )

    if __name__ == "__main__":
        app.run()

Run it:

python main.py

The engine serves an agent card at /.well-known/agent-card.json that declares support for the Ark query extension.
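To confirm that a running engine advertises the extension, you can fetch the agent card and list the extension URIs it declares. The sketch below uses only the standard library. It assumes the card follows the A2A agent card layout, where extensions are listed under capabilities.extensions with a uri field. The base address and the extension URI in the usage note are placeholders, not values taken from Ark.

```python
import json
from urllib.parse import urljoin
from urllib.request import urlopen

AGENT_CARD_PATH = "/.well-known/agent-card.json"


def agent_card_url(base_url: str) -> str:
    """Build the agent card URL from the engine's base address."""
    return urljoin(base_url.rstrip("/") + "/", AGENT_CARD_PATH.lstrip("/"))


def declared_extension_uris(card: dict) -> list[str]:
    """Return the URIs listed under capabilities.extensions (assumed layout)."""
    extensions = card.get("capabilities", {}).get("extensions") or []
    return [ext["uri"] for ext in extensions if "uri" in ext]


def fetch_agent_card(base_url: str, timeout: float = 5.0) -> dict:
    """Download and decode the agent card; requires a running engine."""
    with urlopen(agent_card_url(base_url), timeout=timeout) as resp:
        return json.load(resp)
```

For example, declared_extension_uris(fetch_agent_card("http://localhost:8000")) would list the declared extensions, assuming the engine listens on that address.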

How It Works

When Ark dispatches a query to your engine:

  1. The A2A message carries only a QueryRef (name + namespace) via the query extension — no agent config, credentials, or tool definitions cross the wire
  2. The SDK extracts the QueryRef and the A2A contextId (conversation threading)
  3. Within your executor pod, the SDK fetches the Query CRD and resolves agent config, model (including API keys), and MCP servers from the cluster using the pod’s service account
  4. Your execute_agent() receives a fully populated ExecutionEngineRequest with conversationId

Secrets never leave the cluster over A2A — they are resolved locally within your executor pod. You never interact with the extension or Kubernetes APIs directly.
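The dispatch steps above can be pictured with a small simulation. This is not the SDK's implementation: QueryRef and ResolvedRequest are hypothetical stand-ins, the resource layout in FAKE_CLUSTER is invented, and an in-memory dictionary replaces the Kubernetes API. It only illustrates that the wire payload is a reference and that everything else is looked up locally.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class QueryRef:
    """The only payload sent over A2A: which Query resource to load."""
    name: str
    namespace: str


@dataclass
class ResolvedRequest:
    """Hypothetical stand-in for ExecutionEngineRequest."""
    agent: dict
    userInput: str
    mcpServers: list = field(default_factory=list)
    conversationId: str = ""


# Invented in-memory "cluster"; real resources live in Kubernetes.
FAKE_CLUSTER = {
    "queries": {("default", "q1"): {"agent": "my-agent", "input": "Hello"}},
    "agents": {
        ("default", "my-agent"): {
            "name": "my-agent",
            "prompt": "You are a helpful assistant.",
        }
    },
}


def parse_query_ref(payload: dict) -> QueryRef:
    """Step 2: extract the reference; reject payloads missing either field."""
    missing = [key for key in ("name", "namespace") if not payload.get(key)]
    if missing:
        raise ValueError(f"query ref missing fields: {missing}")
    return QueryRef(name=payload["name"], namespace=payload["namespace"])


def resolve(ref: QueryRef, context_id: str, cluster: dict = FAKE_CLUSTER) -> ResolvedRequest:
    """Steps 3-4: look up the Query, then the agent it targets, locally."""
    query = cluster["queries"][(ref.namespace, ref.name)]
    agent = cluster["agents"][(ref.namespace, query["agent"])]
    return ResolvedRequest(agent=agent, userInput=query["input"], conversationId=context_id)
```

Calling resolve(parse_query_ref({"name": "q1", "namespace": "default"}), "ctx-1") yields a populated request without any agent config having crossed the wire.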

Extension Spec

The query extension contract is defined at ark/api/extensions/query/v1/. The schema is minimal: just name and namespace fields identifying the Query resource.

Deploying to Kubernetes

Create an ExecutionEngine resource pointing to your engine:

    apiVersion: ark.mckinsey.com/v1prealpha1
    kind: ExecutionEngine
    metadata:
      name: my-engine
    spec:
      address:
        value: "http://my-engine-service:8000"

Then reference it from an Agent:

    apiVersion: ark.mckinsey.com/v1alpha1
    kind: Agent
    metadata:
      name: my-agent
    spec:
      prompt: "You are a helpful assistant."
      executionEngine:
        name: my-engine
      modelRef:
        name: my-model

Queries targeting my-agent will be routed to your engine via A2A.

Streaming Support

If your engine writes streaming chunks to the broker (via the memory event stream), opt in by adding the streaming-supported annotation to your ExecutionEngine resource:

    apiVersion: ark.mckinsey.com/v1prealpha1
    kind: ExecutionEngine
    metadata:
      name: my-engine
      annotations:
        ark.mckinsey.com/streaming-supported: "true"
    spec:
      address:
        value: "http://my-engine-service:8000"

Without this annotation, streaming requests are handled transparently — clients still receive a valid SSE response, just without incremental chunks.
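If you generate or lint manifests in a script, a check like the following can tell you whether an ExecutionEngine has opted in. It is a sketch that treats the manifest as an already-parsed dictionary and assumes the value must be exactly the string "true", as in the example above. How the Ark controller itself evaluates the annotation is not specified here.

```python
STREAMING_ANNOTATION = "ark.mckinsey.com/streaming-supported"


def supports_streaming(manifest: dict) -> bool:
    """Return True if the manifest carries the streaming opt-in annotation."""
    annotations = manifest.get("metadata", {}).get("annotations") or {}
    # Annotation values are strings in Kubernetes, so compare against "true".
    return annotations.get(STREAMING_ANNOTATION) == "true"
```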

Conversation Threading

When a query includes spec.conversationId, the controller sends it to your engine as the A2A contextId. The SDK extracts it and populates request.conversationId.

Use this for stateful engines that need multi-turn support:

    class StatefulExecutor(BaseExecutor):
        async def execute_agent(self, request: ExecutionEngineRequest) -> list[Message]:
            session = self.get_or_create_session(request.conversationId)
            response = session.process(request.userInput.content)
            return [Message(role="assistant", content=response)]

If conversationId is empty, the query is a single-turn interaction. The conversation ID returned in the A2A response is stored in status.conversationId so that clients can reuse it in follow-up queries.
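One way to back a helper such as get_or_create_session is a small in-memory store keyed by conversation ID. The Session and SessionStore classes below are hypothetical and not part of the SDK. The sketch assumes a single process with no expiry, so a production engine would likely need persistence and eviction.

```python
import uuid


class Session:
    """Keeps the turns of one conversation in memory."""

    def __init__(self, session_id: str) -> None:
        self.id = session_id
        self.history: list[str] = []

    def process(self, text: str) -> str:
        # Record the turn and report its position in the conversation.
        self.history.append(text)
        return f"Turn {len(self.history)}: {text}"


class SessionStore:
    """Maps conversation IDs to sessions; empty IDs get throwaway sessions."""

    def __init__(self) -> None:
        self._sessions: dict[str, Session] = {}

    def get_or_create_session(self, conversation_id: str) -> Session:
        if not conversation_id:
            # Single-turn query: do not retain any state.
            return Session(uuid.uuid4().hex)
        if conversation_id not in self._sessions:
            self._sessions[conversation_id] = Session(conversation_id)
        return self._sessions[conversation_id]
```

An executor could hold one SessionStore instance and delegate to it from execute_agent().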

Key Types

Import from ark_sdk.executor:

| Type | Purpose |
| --- | --- |
| BaseExecutor | Abstract base class; implement execute_agent() |
| ExecutionEngineRequest | Input to your executor (agent, mcpServers, userInput, conversationId) |
| AgentConfig | Agent name, namespace, prompt, model, parameters |
| Message | Conversation message (role + content) |
| MCPServerConfig | MCP server connection info (name, url, transport, timeout, headers, tools allowlist) |
| Model | Model name, provider type, config |

Import from ark_sdk.executor_app:

| Type | Purpose |
| --- | --- |
| ExecutorApp | Wraps your executor in an A2A server with health check |

MCP Tools

When an agent has MCP-type tools, the SDK resolves the MCPServer CRDs and provides connection info via request.mcpServers. Each entry contains everything needed to establish an MCP client connection:

    for server in request.mcpServers:
        # server.name: MCPServer resource name
        # server.url: resolved server address
        # server.transport: "http" or "sse"
        # server.timeout: e.g. "30s"
        # server.headers: resolved headers (secrets already dereferenced)
        # server.tools: allowlist of tool names this agent can use
        pass  # connect your framework's MCP client here

Your executor connects to each MCP server using its own MCP client (e.g., via LangChain, CrewAI, or the MCP SDK directly). The tools list is an allowlist — only register and use tools that appear in this list, even if the server exposes additional tools.
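Enforcing the allowlist can be as simple as filtering the tools a server reports before registering them with your framework. The sketch below assumes each discovered tool is a dictionary with a "name" key. Real MCP clients return their own tool objects, so adapt the lookup accordingly. The function name is illustrative.

```python
def filter_allowed_tools(discovered: list[dict], allowlist: list[str]) -> list[dict]:
    """Keep only the discovered tools whose names appear in the allowlist."""
    allowed = set(allowlist)
    # Preserve the server's ordering; drop anything not explicitly allowed.
    return [tool for tool in discovered if tool["name"] in allowed]
```

Here allowlist would come from server.tools, and discovered from whatever tool listing your MCP client provides.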

Non-MCP tool types (http, agent, team, builtin) are not included in mcpServers.
