Building Execution Engines
Execution engines let you run Ark agents with custom frameworks (LangChain, CrewAI, etc.) while integrating with Ark’s resource management. They receive agent context via the A2A v0.3.0 query extension and return results over the A2A protocol.
For hosting raw A2A agents without Ark integration, see Building A2A Servers.
Quickstart
Install the Ark SDK:
```bash
pip install ark-sdk
```

Create a minimal executor:
```python
from ark_sdk.executor import BaseExecutor, ExecutionEngineRequest, Message
from ark_sdk.executor_app import ExecutorApp


class MyExecutor(BaseExecutor):
    async def execute_agent(self, request: ExecutionEngineRequest) -> list[Message]:
        # request.agent has the agent config (name, prompt, model, parameters)
        # request.userInput has the user message
        # request.mcpServers has MCP server connection info (url, transport, headers, tool allowlist)
        # request.conversationId has the conversation threading ID (from A2A contextId)
        prompt = self._resolve_prompt(request.agent)
        return [Message(role="assistant", content=f"Echo: {request.userInput.content}")]


app = ExecutorApp(
    executor=MyExecutor("my-engine"),
    engine_name="my-engine",
    description="A simple execution engine",
)

if __name__ == "__main__":
    app.run()
```

Run it:

```bash
python main.py
```

The engine serves an agent card at `/.well-known/agent-card.json` that declares support for the Ark query extension.
How It Works
When Ark dispatches a query to your engine:
- The A2A message carries only a `QueryRef` (name + namespace) via the query extension — no agent config, credentials, or tool definitions cross the wire
- The SDK extracts the `QueryRef` and the A2A `contextId` (conversation threading)
- Within your executor pod, the SDK fetches the Query CRD and resolves agent config, model (including API keys), and MCP servers from the cluster using the pod's service account
- Your `execute_agent()` receives a fully populated `ExecutionEngineRequest` with `conversationId`
Secrets never leave the cluster over A2A — they are resolved locally within your executor pod. You never interact with the extension or Kubernetes APIs directly.
Extension Spec
The query extension contract is defined at `ark/api/extensions/query/v1/`. The schema is minimal: just `name` and `namespace` fields identifying the Query resource.
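As an illustration of how small this contract is, the payload can be modeled with a plain dataclass (this is not the SDK's actual type, just a sketch of the schema):

```python
from dataclasses import asdict, dataclass


@dataclass
class QueryRef:
    """Illustrative model of the query-extension payload: it identifies
    the Query resource by name and namespace, and nothing else."""

    name: str
    namespace: str


# The wire payload carries only these two fields: no config, no secrets.
ref = QueryRef(name="my-query", namespace="default")
payload = asdict(ref)
```

Everything else (prompt, model, tools, credentials) is resolved inside your executor pod from the cluster.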
Deploying to Kubernetes
Create an ExecutionEngine resource pointing to your engine:
```yaml
apiVersion: ark.mckinsey.com/v1prealpha1
kind: ExecutionEngine
metadata:
  name: my-engine
spec:
  address:
    value: "http://my-engine-service:8000"
```

Then reference it from an Agent:
```yaml
apiVersion: ark.mckinsey.com/v1alpha1
kind: Agent
metadata:
  name: my-agent
spec:
  prompt: "You are a helpful assistant."
  executionEngine:
    name: my-engine
  modelRef:
    name: my-model
```

Queries targeting `my-agent` will be routed to your engine via A2A.
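For reference, a Query targeting this agent might look like the following. Treat the field names here as an assumption: check the Query CRD schema for your Ark version before relying on them.

```yaml
apiVersion: ark.mckinsey.com/v1alpha1
kind: Query
metadata:
  name: my-query
spec:
  input: "What is the weather today?"
  targets:
    - type: agent
      name: my-agent
```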
Streaming Support
If your engine writes streaming chunks to the broker (via the memory event stream), opt in by adding the streaming-supported annotation to your ExecutionEngine resource:
```yaml
apiVersion: ark.mckinsey.com/v1prealpha1
kind: ExecutionEngine
metadata:
  name: my-engine
  annotations:
    ark.mckinsey.com/streaming-supported: "true"
spec:
  address:
    value: "http://my-engine-service:8000"
```

Without this annotation, streaming requests are handled transparently — clients still receive a valid SSE response, just without incremental chunks.
Conversation Threading
When a query includes `spec.conversationId`, the controller sends it to your engine as the A2A `contextId`. The SDK extracts it and populates `request.conversationId`.
Use this for stateful engines that need multi-turn support:
```python
class StatefulExecutor(BaseExecutor):
    async def execute_agent(self, request: ExecutionEngineRequest) -> list[Message]:
        session = self.get_or_create_session(request.conversationId)
        response = session.process(request.userInput.content)
        return [Message(role="assistant", content=response)]
```

If `conversationId` is empty, the query is a single-turn interaction. The conversation ID returned in the A2A response is stored in `status.conversationId` for clients to use in follow-up queries.
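The `get_or_create_session` helper above is yours to implement. A minimal in-memory version keyed by conversation ID could look like this (illustrative only; `Session` is a trivial stand-in for whatever state your framework keeps):

```python
class Session:
    """Hypothetical per-conversation state: here, just a message log."""

    def __init__(self) -> None:
        self.history: list[str] = []

    def process(self, text: str) -> str:
        self.history.append(text)
        # Echo back with a turn counter so multi-turn state is visible.
        return f"[turn {len(self.history)}] {text}"


class SessionStore:
    """In-memory map from conversationId to Session. A production engine
    would add eviction and persistence."""

    def __init__(self) -> None:
        self._sessions: dict[str, Session] = {}

    def get_or_create(self, conversation_id: str) -> Session:
        # An empty conversationId means a single-turn query: use a fresh session.
        if not conversation_id:
            return Session()
        return self._sessions.setdefault(conversation_id, Session())


store = SessionStore()
first = store.get_or_create("conv-1").process("hello")
second = store.get_or_create("conv-1").process("again")
```

Because both calls pass the same conversation ID, the second `process` sees the history from the first.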
Key Types
Import from `ark_sdk.executor`:

| Type | Purpose |
|---|---|
| `BaseExecutor` | Abstract base class — implement `execute_agent()` |
| `ExecutionEngineRequest` | Input to your executor (agent, mcpServers, userInput, conversationId) |
| `AgentConfig` | Agent name, namespace, prompt, model, parameters |
| `Message` | Conversation message (role + content) |
| `MCPServerConfig` | MCP server connection info (name, url, transport, timeout, headers, tools allowlist) |
| `Model` | Model name, provider type, config |
Import from `ark_sdk.executor_app`:

| Type | Purpose |
|---|---|
| `ExecutorApp` | Wraps your executor in an A2A server with health check |
MCP Tools
When an agent has MCP-type tools, the SDK resolves the `MCPServer` CRDs and provides connection info via `request.mcpServers`. Each entry contains everything needed to establish an MCP client connection:

```python
for server in request.mcpServers:
    # server.name — MCPServer resource name
    # server.url — resolved server address
    # server.transport — "http" or "sse"
    # server.timeout — e.g. "30s"
    # server.headers — resolved headers (secrets already dereferenced)
    # server.tools — allowlist of tool names this agent can use
    ...
```

Your executor connects to each MCP server using its own MCP client (e.g., via LangChain, CrewAI, or the MCP SDK directly). The `tools` list is an allowlist — only register and use tools that appear in this list, even if the server exposes additional tools.
Non-MCP tool types (`http`, `agent`, `team`, `builtin`) are not included in `mcpServers`.