Streaming
Note: This is a work-in-progress specification for an upcoming feature. This document will change as the implementation evolves and is tested. Streaming is currently implemented as additional endpoints within a Memory resource. In future releases, a dedicated EventStream resource will be introduced. A single service can optionally implement both Memory and streaming APIs.
Real-time message streaming allows users to see agent responses as they’re generated during query execution.
Streaming is implemented in line with the OpenAI Streaming Responses specification, meaning that any client that supports the OpenAI spec can read streaming responses from Ark queries. The Ark OpenAI v1/chat/completions API streams responses when the stream: true parameter is set.
Enabling Streaming
Enable streaming for a query by setting the ark.mckinsey.com/streaming-enabled annotation:
apiVersion: ark.mckinsey.com/v1alpha1
kind: Query
metadata:
  name: my-query
  annotations:
    # Enable streaming for this query.
    ark.mckinsey.com/streaming-enabled: "true"
spec:
  input: "Hello"
  # Point to a memory resource that implements the event stream API.
  memory: "streaming-memory"
  targets:
    - name: my-agent
The Ark OpenAI completions API returns a streaming response when stream: true is set:
# Curl the ark api completions endpoint with streaming enabled
# by setting "stream": true in the request body.
curl -N -X POST http://ark-api.default.127.0.0.1.nip.io:8080/openai/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "model/gpt-4",
    "messages": [
      {"role": "user", "content": "Write a haiku about streaming data"}
    ],
    "stream": true
  }'
The response will be Server-Sent Events (SSE) with streaming chunks in OpenAI format, following the Read Stream API specification, which is fully OpenAI compatible. This means that consumers can query Ark and stream responses as if it were an LLM, and use existing OpenAI SDKs and tooling.
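For example, a client can consume an Ark streaming response with the standard OpenAI Python SDK. This is a minimal sketch, assuming the Ark API endpoint from the curl example above; the api_key value is a placeholder, since Ark handles model credentials server-side:

# Read a streaming Ark response with the standard OpenAI Python SDK.
# Assumes the Ark API endpoint from the curl example above; the api_key
# value is a placeholder, as Ark handles model credentials server-side.
from openai import OpenAI

client = OpenAI(
    base_url="http://ark-api.default.127.0.0.1.nip.io:8080/openai/v1",
    api_key="not-needed",
)

stream = client.chat.completions.create(
    model="model/gpt-4",
    messages=[{"role": "user", "content": "Write a haiku about streaming data"}],
    stream=True,
)

for chunk in stream:
    # Each chunk is an OpenAI chat.completion.chunk; print text deltas as they arrive.
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)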
Streaming Responses
Standard OpenAI streaming response chunks are sent for each query target type. An additional Ark metadata field is attached to each chunk, providing details such as the session ID and query target; this allows complex streaming responses from multi-target or team queries to be processed client-side.
Streaming responses follow the OpenAI specification exactly; the additional ark field will not interfere with regular clients, as the OpenAI spec allows additional fields to be included in the response (which standard clients will ignore).
If an LLM does not support streaming, the complete response from the model will be sent at the end of the query as a single chunk, as per the OpenAI specification. Streaming is currently supported in Ark for openai and azure models and is not yet supported for bedrock models.
Model Query Streaming
Direct model queries stream the LLM response, exactly as per the OpenAI Completions API specification:
curl -N "http://localhost:8083/stream/model-query-123"
data: {"choices":[{"delta":{"content":"The"}}],"ark":{"model":"gpt-4","query":"123","target":"model/gpt-4"}}
data: {"choices":[{"delta":{"content":" capital"}}],"ark":{"model":"gpt-4","query":"123","target":"model/gpt-4"}}
data: {"choices":[{"delta":{"content":" is"}}],"ark":{"model":"gpt-4","query":"123","target":"model/gpt-4"}}
Agent Query Streaming
Agent queries include intermediate tool calls in the stream. This allows you to show tool calls as they happen in real time:
curl -N "http://localhost:8083/stream/agent-query-456"
data: {"choices":[{"delta":{"tool_calls":[{"function":{"name":"web_search","arguments":"{\"query\":\"weather\"}"}}]}}],"ark":{"agent":"researcher","model":"gpt-4","query":"456"}}
data: {"choices":[{"delta":{"content":"Based"}}],"ark":{"agent":"researcher","model":"gpt-4","query":"456"}}
data: {"choices":[{"delta":{"content":" on"}}],"ark":{"agent":"researcher","model":"gpt-4","query":"456"}}
Tool call chunks are sent exactly as per the OpenAI specification.
Team Query Streaming
Team queries stream all LLM calls from every team member, providing full visibility into multi-agent execution responses and tool calls:
curl -N "http://localhost:8083/stream/team-query-789"
data: {"choices":[{"delta":{"content":"Let"}}],"ark":{"agent":"team-leader","team":"research-team","model":"gpt-4","query":"789","target":"team/research-team"}}
data: {"choices":[{"delta":{"content":" me"}}],"ark":{"agent":"team-leader","team":"research-team","model":"gpt-4","query":"789"}}
data: {"choices":[{"delta":{"content":"I'll"}}],"ark":{"agent":"analyst","team":"research-team","model":"gpt-4","query":"789"}}
If using the OpenAI completions API to query teams with streaming, it becomes the client's responsibility to decide how to render the chunks. With a regular, non-streaming call, Ark does exactly as OpenAI does and returns only the final response from the team (without any indication of which agent produced it). However, for a real-time streaming response it is not possible to 'filter' this down to only the final response, as team members can execute in parallel and the final message cannot be determined in advance deterministically.
This means that strict OpenAI compatibility is not possible when querying teams with streaming - the client must check the ark metadata fields to see which agent is currently responding.
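As a sketch of what this looks like client-side, a client can parse the SSE stream itself and label each delta with the responding agent. This assumes a team can be addressed as the model (team/research-team here is illustrative) and uses the Ark API endpoint from the earlier curl example:

# Route team streaming chunks by agent using the ark metadata field.
# Assumes a team can be addressed as the model ("team/research-team" is
# illustrative) and the Ark API endpoint from the earlier curl example.
import json
import requests

url = "http://ark-api.default.127.0.0.1.nip.io:8080/openai/v1/chat/completions"
payload = {
    "model": "team/research-team",
    "messages": [{"role": "user", "content": "Research streaming databases"}],
    "stream": True,
}

with requests.post(url, json=payload, stream=True) as response:
    for line in response.iter_lines(decode_unicode=True):
        if not line or not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        chunk = json.loads(data)
        # The ark field identifies which team member produced this chunk.
        agent = chunk.get("ark", {}).get("agent", "?")
        for choice in chunk.get("choices", []):
            content = choice.get("delta", {}).get("content")
            if content:
                print(f"[{agent}] {content}")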
The Ark Metadata Field
The ark metadata field identifies which agent is currently contributing, as well as other parameters for the query. This is essential when processing responses from targets such as teams, which will contain stream chunks from multiple agents and tools.
{
  "ark": {
    "query": "789",                    // Query ID
    "session": "sess-123",             // Session ID if applicable
    "target": "team/research-team",    // Query target (team/agent/model)
    "team": "research-team",           // Team name (for team queries)
    "agent": "analyst",                // Current agent name
    "model": "gpt-4"                   // Model being used
  }
}
Clients aware of this field can display rich multi-agent interactions and tool calls. The ark CLI demonstrates this with the chat function, which shows team member responses and intermediate tool calls.
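If you are using the OpenAI Python SDK rather than parsing SSE directly, the extra ark field is typically still accessible, since the SDK's response models tolerate unknown fields. This helper is a sketch under that assumption; verify against your SDK version:

# Pull the ark metadata off an OpenAI Python SDK stream chunk.
# Assumes the SDK preserves unknown response fields (its models allow
# extra attributes); verify against your SDK version.
def ark_agent(chunk):
    ark = getattr(chunk, "ark", None)
    if isinstance(ark, dict):
        return ark.get("agent")
    return None

# Usage: for chunk in stream: print(ark_agent(chunk))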
Event Streaming Architecture
Writing Stream (Query Execution):
┌──────────┐ ┌──────────┐ ┌──────────────────┐ ┌──────────────────────┐
│ Client │───→│ ARK API │───→│ Query Controller │───→│ Event Streaming │
│(request) │ └──────────┘ └──────────────────┘ │ Service │
└──────────┘ (Create Query) (Stream chunks) └──────────────────────┘
POST /stream/{id}
Reading Stream (Client Consumption):
┌──────────┐ ┌──────────┐ ┌──────────────────────┐
│ Client │───→│ ARK API │───→│ Event Streaming │
│(response)│←───└──────────┘←───│ Service │
└──────────┘ GET ../stream └──────────────────────┘
(SSE chunks) GET /stream/{id}
When streaming is enabled:
- Client creates a Query resource through the Kubernetes API
- Query Controller writes OpenAI-compatible chunks using the Write Stream API as they’re generated from LLMs
- Consumers connect and read events in real-time using the Read Stream API
- Query Controller calls the Complete Stream API when the entire query execution finishes (which may involve multiple LLM completions across agents and teams)
All OpenAI-compatible chunks are passed through as-is, including text content, tool calls, and finish reasons. When queries involve teams or multiple agents, intermediate finish_reason chunks are forwarded but do not close the stream - only the explicit completion signal from the Query Controller ends the stream with data: [DONE].
If a query target does not support streaming (as is currently the case for some models), then as per the OpenAI specification the streaming APIs will return the entire response in a single chunk at the end of query execution.
Event Streaming Service
An Event Streaming Service is any service that implements the Event Stream API. To configure event streaming, deploy a ConfigMap that specifies the service that should be used:
apiVersion: v1
kind: ConfigMap
metadata:
  name: ark-config-streaming
  namespace: default
data:
  enabled: "true"
  serviceRef: |
    name: ark-cluster-memory  # Service that implements the Event Stream API
    port: "http"              # Port name from the service definition
    namespace: ""             # Optional: namespace (defaults to current namespace)
When this ConfigMap is present and enabled is set to "true", the Query Controller will use the specified service for event streaming. If the ConfigMap is not present or enabled is "false", streaming is disabled. If a query is issued with the streaming-enabled annotation set to true and no event streaming service is available, a warning will be logged.
The ark-cluster-memory service implements this API when deployed with streaming enabled.
Tool/Function Calling in Streams
OpenAI's Chat Completions API supports streaming tool calls. Unlike text content, which appears in delta.content, tool calls appear in delta.tool_calls and must be accumulated by index.
Tool call delta structure:
- index: Identifies which function call the delta is for
- id: Tool call ID (only in first delta)
- function.name: Function name (only in first delta)
- function.arguments: Argument fragments to accumulate
- type: Always "function" (only in first delta)
Example chunks showing delta.tool_calls:
[{"index": 0, "id": "call_abc", "function": {"arguments": "", "name": "get_weather"}, "type": "function"}]
[{"index": 0, "function": {"arguments": "{\"location\":"}}]
[{"index": 0, "function": {"arguments": "\"Paris, France\"}"}}]
Clients must concatenate function.arguments across all deltas with the same index to reconstruct complete tool calls.
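A minimal accumulator sketch, using dicts shaped like the example deltas above:

# Accumulate streamed tool-call deltas by index into complete tool calls.
def accumulate_tool_calls(deltas):
    calls = {}  # index -> accumulated tool call
    for delta in deltas:
        for tc in delta:
            call = calls.setdefault(tc["index"], {"id": None, "name": "", "arguments": ""})
            if tc.get("id"):
                call["id"] = tc["id"]  # id arrives only in the first delta
            fn = tc.get("function", {})
            if fn.get("name"):
                call["name"] = fn["name"]  # name arrives only in the first delta
            call["arguments"] += fn.get("arguments", "")  # fragments concatenate
    return calls

deltas = [
    [{"index": 0, "id": "call_abc", "function": {"arguments": "", "name": "get_weather"}, "type": "function"}],
    [{"index": 0, "function": {"arguments": "{\"location\":"}}],
    [{"index": 0, "function": {"arguments": "\"Paris, France\"}"}}],
]
print(accumulate_tool_calls(deltas))
# {0: {'id': 'call_abc', 'name': 'get_weather', 'arguments': '{"location":"Paris, France"}'}}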
Event Stream API
The event stream API can be used to read and write message chunks.
Read Stream
GET /stream/{query_id}
Establishes an SSE connection for receiving real-time messages. On disconnect, reconnect with from-beginning=true to replay stored chunks.
Query Parameters:
- from-beginning=true - Send all existing messages first, then stream new ones
- wait-for-query=<timeout> - Wait for query execution to start (e.g., wait-for-query=30s)
Response Format:
OpenAI-compatible streaming chunks sent as SSE events:
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1704067200,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1704067200,"model":"gpt-4","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1704067200,"model":"gpt-4","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]
Completion Signal:
When streaming completes, the service sends:
- A chunk with finish_reason: "stop"
- Followed by data: [DONE]
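A consumer sketch against the Read Stream API, using the local endpoint from the examples above. Note that an intermediate finish_reason from a team query should not be treated as end-of-stream; only data: [DONE] is:

# Read a query's event stream, replaying stored chunks first.
# Uses the local endpoint from the examples above; only [DONE]
# (not an intermediate finish_reason) marks the end of the stream.
import json
import requests

url = "http://localhost:8083/stream/agent-query-456"
params = {"from-beginning": "true", "wait-for-query": "30s"}

with requests.get(url, params=params, stream=True) as response:
    for line in response.iter_lines(decode_unicode=True):
        if not line or not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break  # explicit completion signal from the Complete Stream API
        chunk = json.loads(data)
        for choice in chunk.get("choices", []):
            content = choice.get("delta", {}).get("content")
            if content:
                print(content, end="", flush=True)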
Write Stream
POST /stream/{query_id}
Write chunks to a stream. Controller disconnects (ECONNRESET) indicate timeout or network issues.
Content-Type: application/x-ndjson
Request Body:
Newline-delimited JSON chunks in OpenAI format:
{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1704067200,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1704067200,"model":"gpt-4","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}
{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1704067200,"model":"gpt-4","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
Complete Stream
/stream/{query_id}/complete
Marks query execution as complete. Closes connections to consumers.
Response:
{
  "status": "completed",
  "query": "my-query-id"
}
Ark's Query Controller orchestrates execution across multiple agents/models and must explicitly signal completion to the memory service.
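The signal itself is a simple request - a sketch, assuming the POST method to match the Write Stream API:

# Mark a query's stream as complete, closing consumer connections.
# The POST method is an assumption, matching the Write Stream API.
import requests

response = requests.post("http://localhost:8083/stream/my-query-id/complete")
print(response.json())  # {"status": "completed", "query": "my-query-id"}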