The Responses API supports streaming responses, allowing you to receive output as it is generated in real time. This is particularly useful for chat interfaces and applications that need to display responses progressively.

Enabling Streaming

To enable streaming, set stream: true in your request:
import requests
import json

# Open the request with stream=True so the response body is not buffered.
response = requests.post(
    "https://api.anannas.ai/api/v1/responses",
    headers={
        "Authorization": "Bearer <ANANNAS_API_KEY>",
        "Content-Type": "application/json",
    },
    json={
        "model": "openai/gpt-5-mini",
        "input": "Tell me a story about a robot.",
        "stream": True,
        "max_output_tokens": 2000,
    },
    stream=True,
)

# Read the stream line by line as Server-Sent Events.
for line in response.iter_lines():
    if line:
        line_text = line.decode('utf-8')
        if line_text.startswith('data: '):
            data_str = line_text[6:]  # strip the 'data: ' prefix
            if data_str == '[DONE]':
                break  # end-of-stream sentinel
            try:
                event = json.loads(data_str)
                # Print incremental text as it arrives.
                if event.get('type') == 'response.output_text.delta':
                    delta = event.get('data', {}).get('delta', '')
                    if delta:
                        print(delta, end='', flush=True)
            except json.JSONDecodeError:
                pass  # ignore lines that are not valid JSON

Stream Event Types

The streaming API uses the Server-Sent Events (SSE) format. Each event has a type field indicating what it represents:
  • response.created: Initial response creation event
  • response.output_item.added: New output item added
  • response.output_text.delta: Text delta (incremental text)
  • response.output_item.done: Output item completed
  • response.completed: Response generation completed
Example Stream Events:
data: {"type":"response.created","data":{"id":"resp_123","created_at":1693350000,"model":"openai/gpt-5-mini"}}

data: {"type":"response.output_item.added","data":{"output_index":0,"item":{"type":"message","role":"assistant"}}}

data: {"type":"response.output_text.delta","data":{"output_index":0,"delta":"Once"}}

data: {"type":"response.output_text.delta","data":{"output_index":0,"delta":" upon"}}

data: {"type":"response.output_text.delta","data":{"output_index":0,"delta":" a"}}

data: {"type":"response.output_text.delta","data":{"output_index":0,"delta":" time"}}

data: {"type":"response.output_item.done","data":{"output_index":0}}

data: {"type":"response.completed","data":{"id":"resp_123"}}

data: [DONE]
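
For illustration, the delta events above can be concatenated to rebuild the full message text. The following is a minimal sketch that operates on already-decoded event dicts; the events list and the accumulate_text helper are illustrative, not part of the API:

import json

# Illustrative only: a few decoded events like the ones shown above.
events = [
    {"type": "response.output_text.delta", "data": {"output_index": 0, "delta": "Once"}},
    {"type": "response.output_text.delta", "data": {"output_index": 0, "delta": " upon"}},
    {"type": "response.output_text.delta", "data": {"output_index": 0, "delta": " a"}},
    {"type": "response.output_text.delta", "data": {"output_index": 0, "delta": " time"}},
    {"type": "response.completed", "data": {"id": "resp_123"}},
]

def accumulate_text(events):
    """Concatenate the delta fields of output_text.delta events."""
    return "".join(
        e.get("data", {}).get("delta", "")
        for e in events
        if e.get("type") == "response.output_text.delta"
    )

print(accumulate_text(events))  # prints: Once upon a time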

Processing Stream Events

Here’s a more complete example that handles all event types:
import requests
import json

response = requests.post(
    "https://api.anannas.ai/api/v1/responses",
    headers={
        "Authorization": "Bearer <ANANNAS_API_KEY>",
        "Content-Type": "application/json",
    },
    json={
        "model": "openai/gpt-5-mini",
        "input": "Explain quantum computing.",
        "stream": True,
        "max_output_tokens": 2000,
    },
    stream=True,
)

full_text = ""

for line in response.iter_lines():
    if line:
        line_text = line.decode('utf-8')
        
        # Skip heartbeat messages
        if line_text.startswith(':ANANNAS PROCESSING'):
            continue
            
        if line_text.startswith('data: '):
            data_str = line_text[6:]
            
            if data_str == '[DONE]':
                print("\n\nStream completed.")
                break
                
            try:
                event = json.loads(data_str)
                event_type = event.get('type')
                event_data = event.get('data', {})
                
                if event_type == 'response.created':
                    print(f"Response ID: {event_data.get('id')}")
                    print(f"Model: {event_data.get('model')}\n")
                    
                elif event_type == 'response.output_text.delta':
                    delta = event_data.get('delta', '')
                    if delta:
                        full_text += delta
                        print(delta, end='', flush=True)
                        
                elif event_type == 'response.output_item.done':
                    print("\n\nOutput item completed.")
                    
                elif event_type == 'response.completed':
                    print("\n\nResponse generation completed.")
                    
            except json.JSONDecodeError:
                pass

print(f"\n\nFull text: {full_text}")

Heartbeat Messages

The streaming API sends heartbeat messages (:ANANNAS PROCESSING) to keep the connection alive. These should be ignored when processing events.
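
As a minimal sketch (assuming a requests response opened with stream=True, as in the examples above), heartbeats can be filtered out before JSON parsing; the helper name iter_data_lines is illustrative, not part of the API:

def iter_data_lines(response):
    """Yield only the 'data: ' payload strings from an SSE stream,
    skipping ':ANANNAS PROCESSING' heartbeats and blank lines.
    Illustrative helper only."""
    for line in response.iter_lines():
        if not line:
            continue
        line_text = line.decode('utf-8')
        if line_text.startswith(':ANANNAS PROCESSING'):
            continue  # keep-alive heartbeat, no event payload
        if line_text.startswith('data: '):
            yield line_text[6:]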

Error Handling in Streams

Errors in streaming responses are sent as events:
{
  "error": {
    "message": "Rate limit exceeded",
    "type": "rate_limit_error",
    "code": "rate_limit_exceeded"
  }
}
Always check for error events in your stream processing:
for line in response.iter_lines():
    if line:
        line_text = line.decode('utf-8')
        if line_text.startswith('data: '):
            data_str = line_text[6:]
            try:
                event = json.loads(data_str)
                
                # Check for errors
                if 'error' in event:
                    print(f"Error: {event['error']['message']}")
                    break
                    
                # Process normal events
                # ...
            except json.JSONDecodeError:
                pass

Best Practices

  1. Handle heartbeats: Ignore :ANANNAS PROCESSING messages
  2. Check for [DONE]: Stop processing when you receive [DONE]
  3. Error handling: Always check for error events in the stream
  4. Buffer management: For long streams, consider buffering and processing in chunks
  5. Connection management: Handle connection drops gracefully and implement retry logic (see the sketch below)
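
The sketch below illustrates point 5: a simple retry loop with exponential backoff around the streaming request. The retry policy (3 attempts, doubling delay, retrying only connection errors and timeouts) and the helper name stream_with_retries are assumptions for illustration, not requirements of the API:

import time
import requests

def stream_with_retries(payload, api_key, max_attempts=3):
    """Open the streaming request, retrying on connection errors.

    Illustrative sketch: only connection errors and timeouts are
    retried; HTTP errors (4xx/5xx) propagate immediately.
    """
    delay = 1.0
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.post(
                "https://api.anannas.ai/api/v1/responses",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json=payload,
                stream=True,
                timeout=60,
            )
            response.raise_for_status()
            return response
        except (requests.ConnectionError, requests.Timeout) as exc:
            if attempt == max_attempts:
                raise
            print(f"Connection error ({exc}), retrying in {delay:.0f}s...")
            time.sleep(delay)
            delay *= 2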