The Responses API supports streaming responses, allowing you to receive output in real time as it is generated. This is particularly useful for chat interfaces and applications that need to display responses progressively.
Enabling Streaming
To enable streaming, set stream: true in your request:
import requests
import json

response = requests.post(
    "https://api.anannas.ai/api/v1/responses",
    headers={
        "Authorization": "Bearer <ANANNAS_API_KEY>",
        "Content-Type": "application/json",
    },
    json={
        "model": "openai/gpt-5-mini",
        "input": "Tell me a story about a robot.",
        "stream": True,
        "max_output_tokens": 2000,
    },
    stream=True,
)

for line in response.iter_lines():
    if line:
        line_text = line.decode('utf-8')
        if line_text.startswith('data: '):
            data_str = line_text[6:]
            if data_str == '[DONE]':
                break
            try:
                event = json.loads(data_str)
                if event.get('type') == 'response.output_text.delta':
                    delta = event.get('data', {}).get('delta', '')
                    if delta:
                        print(delta, end='', flush=True)
            except json.JSONDecodeError:
                pass
Stream Event Types
The streaming API uses the Server-Sent Events (SSE) format. Each event includes a type field that identifies the kind of event:
Event Types:
response.created: Initial response creation event
response.output_item.added: New output item added
response.output_text.delta: Text delta (incremental text)
response.output_item.done: Output item completed
response.completed: Response generation completed
Example Stream Events:
data: {"type":"response.created","data":{"id":"resp_123","created_at":1693350000,"model":"openai/gpt-5-mini"}}
data: {"type":"response.output_item.added","data":{"output_index":0,"item":{"type":"message","role":"assistant"}}}
data: {"type":"response.output_text.delta","data":{"output_index":0,"delta":"Once"}}
data: {"type":"response.output_text.delta","data":{"output_index":0,"delta":" upon"}}
data: {"type":"response.output_text.delta","data":{"output_index":0,"delta":" a"}}
data: {"type":"response.output_text.delta","data":{"output_index":0,"delta":" time"}}
data: {"type":"response.output_item.done","data":{"output_index":0}}
data: {"type":"response.completed","data":{"id":"resp_123"}}
data: [DONE]
Processing Stream Events
Here’s a more complete example that handles all event types:
import requests
import json

response = requests.post(
    "https://api.anannas.ai/api/v1/responses",
    headers={
        "Authorization": "Bearer <ANANNAS_API_KEY>",
        "Content-Type": "application/json",
    },
    json={
        "model": "openai/gpt-5-mini",
        "input": "Explain quantum computing.",
        "stream": True,
        "max_output_tokens": 2000,
    },
    stream=True,
)

full_text = ""

for line in response.iter_lines():
    if line:
        line_text = line.decode('utf-8')

        # Skip heartbeat messages
        if line_text.startswith(':ANANNAS PROCESSING'):
            continue

        if line_text.startswith('data: '):
            data_str = line_text[6:]
            if data_str == '[DONE]':
                print("\n\nStream completed.")
                break
            try:
                event = json.loads(data_str)
                event_type = event.get('type')
                event_data = event.get('data', {})

                if event_type == 'response.created':
                    print(f"Response ID: {event_data.get('id')}")
                    print(f"Model: {event_data.get('model')}\n")
                elif event_type == 'response.output_text.delta':
                    delta = event_data.get('delta', '')
                    if delta:
                        full_text += delta
                        print(delta, end='', flush=True)
                elif event_type == 'response.output_item.done':
                    print("\n\nOutput item completed.")
                elif event_type == 'response.completed':
                    print("\n\nResponse generation completed.")
            except json.JSONDecodeError:
                pass

print(f"\n\nFull text: {full_text}")
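If you handle many event types, a dispatch table keyed on the type field is an alternative to the if/elif chain above. A minimal sketch, assuming the same event structure as the examples in this section (the handler names are illustrative, not part of the API):

def on_created(data):
    print(f"Response ID: {data.get('id')}")

def on_delta(data):
    print(data.get('delta', ''), end='', flush=True)

def on_completed(data):
    print("\n\nResponse generation completed.")

# Map event types to handlers; unknown types are simply ignored.
handlers = {
    'response.created': on_created,
    'response.output_text.delta': on_delta,
    'response.completed': on_completed,
}

def process_event(event):
    """Dispatch a parsed stream event to its handler, if one is registered."""
    handler = handlers.get(event.get('type'))
    if handler:
        handler(event.get('data', {}))

In the loop above, you would call process_event(event) in place of the if/elif chain. This keeps per-event logic in small, testable functions.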
Heartbeat Messages
The streaming API sends heartbeat messages (:ANANNAS PROCESSING) to keep the connection alive. These should be ignored when processing events.
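Because heartbeats are SSE comment lines (they begin with a colon), a generic guard that skips any comment line also covers them. A minimal sketch of that guard inside the read loop, assuming the response object from the examples above:

for line in response.iter_lines():
    if not line:
        continue
    line_text = line.decode('utf-8')

    # SSE comment lines begin with ':'; this skips the
    # :ANANNAS PROCESSING heartbeat and any other comment lines.
    if line_text.startswith(':'):
        continue

    # ... process 'data: ' lines as shown above ...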
Error Handling in Streams
Errors in streaming responses are sent as events:
{
  "error": {
    "message": "Rate limit exceeded",
    "type": "rate_limit_error",
    "code": "rate_limit_exceeded"
  }
}
Always check for error events in your stream processing:
for line in response.iter_lines():
    if line:
        line_text = line.decode('utf-8')
        if line_text.startswith('data: '):
            data_str = line_text[6:]
            try:
                event = json.loads(data_str)

                # Check for errors
                if 'error' in event:
                    print(f"Error: {event['error']['message']}")
                    break

                # Process normal events
                # ...
            except json.JSONDecodeError:
                pass
Best Practices
Handle heartbeats: Ignore :ANANNAS PROCESSING messages
Check for [DONE]: Stop processing when you receive [DONE]
Error handling: Always check for error events in the stream
Buffer management: For long streams, consider buffering and processing in chunks
Connection management: Handle connection drops gracefully and implement retry logic (see the sketch after this list)
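One way to implement retry logic is to wrap the streaming request in a loop with exponential backoff and re-issue it when the connection cannot be opened. A minimal sketch, assuming the same endpoint, headers, and payload as the examples above (the helper name, backoff values, and timeout are illustrative, not part of the API):

import time
import requests

def stream_with_retries(payload, headers, max_retries=3):
    """Open the streaming request, retrying with exponential backoff on connection errors."""
    for attempt in range(max_retries):
        try:
            response = requests.post(
                "https://api.anannas.ai/api/v1/responses",
                headers=headers,
                json=payload,
                stream=True,
                timeout=60,
            )
            response.raise_for_status()
            return response
        except (requests.ConnectionError, requests.Timeout) as exc:
            if attempt == max_retries - 1:
                raise
            wait = 2 ** attempt  # 1s, 2s, 4s, ...
            print(f"Connection error ({exc}); retrying in {wait}s...")
            time.sleep(wait)

Note that this sketch only retries opening the stream. If the connection drops mid-stream, you would need to decide whether to re-send the request and deduplicate any text you already received.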