Commit dff0548

Realtime: move demo audio to separate thread (#1141)
Without this, audio playback contends for CPU with the event queue and mic input, which makes playback choppy.
1 parent 479c171 commit dff0548

File tree

7 files changed: +109 -22 lines changed
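
For context, the core of the demo.py change is to push the blocking playback call off the asyncio event loop. Below is a minimal sketch of that pattern, assuming a blocking ui.play_audio method and an event carrying raw int16 PCM; the names mirror the diff, but the handler itself is illustrative rather than repo code.

import asyncio

import numpy as np


async def handle_audio_event(event, ui) -> None:
    """Illustrative handler: offload blocking playback to a worker thread."""
    np_audio = np.frombuffer(event.audio.data, dtype=np.int16)
    # asyncio.to_thread runs the blocking call in a worker thread, so the event
    # loop stays free to service mic input and the realtime event queue.
    await asyncio.to_thread(ui.play_audio, np_audio)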

examples/realtime/demo.py

Lines changed: 2 additions & 1 deletion
@@ -93,7 +93,8 @@ async def _on_event(self, event: RealtimeSessionEvent) -> None:
             self.ui.add_transcript("Audio ended")
         elif event.type == "audio":
             np_audio = np.frombuffer(event.audio.data, dtype=np.int16)
-            self.ui.play_audio(np_audio)
+            # Play audio in a separate thread to avoid blocking the event loop
+            await asyncio.to_thread(self.ui.play_audio, np_audio)
         elif event.type == "audio_interrupted":
             self.ui.add_transcript("Audio interrupted")
         elif event.type == "error":

examples/realtime/no_ui_demo.py

Lines changed: 82 additions & 7 deletions
@@ -1,5 +1,8 @@
 import asyncio
+import queue
 import sys
+import threading
+from typing import Any
 
 import numpy as np
 import sounddevice as sd
@@ -46,14 +49,77 @@ def __init__(self) -> None:
         self.audio_player: sd.OutputStream | None = None
         self.recording = False
 
+        # Audio output state for callback system
+        self.output_queue: queue.Queue[Any] = queue.Queue(maxsize=10)  # Buffer more chunks
+        self.interrupt_event = threading.Event()
+        self.current_audio_chunk: np.ndarray | None = None  # type: ignore
+        self.chunk_position = 0
+
+    def _output_callback(self, outdata, frames: int, time, status) -> None:
+        """Callback for audio output - handles continuous audio stream from server."""
+        if status:
+            print(f"Output callback status: {status}")
+
+        # Check if we should clear the queue due to interrupt
+        if self.interrupt_event.is_set():
+            # Clear the queue and current chunk state
+            while not self.output_queue.empty():
+                try:
+                    self.output_queue.get_nowait()
+                except queue.Empty:
+                    break
+            self.current_audio_chunk = None
+            self.chunk_position = 0
+            self.interrupt_event.clear()
+            outdata.fill(0)
+            return
+
+        # Fill output buffer from queue and current chunk
+        outdata.fill(0)  # Start with silence
+        samples_filled = 0
+
+        while samples_filled < len(outdata):
+            # If we don't have a current chunk, try to get one from queue
+            if self.current_audio_chunk is None:
+                try:
+                    self.current_audio_chunk = self.output_queue.get_nowait()
+                    self.chunk_position = 0
+                except queue.Empty:
+                    # No more audio data available - this causes choppiness
+                    # Uncomment next line to debug underruns:
+                    # print(f"Audio underrun: {samples_filled}/{len(outdata)} samples filled")
+                    break
+
+            # Copy data from current chunk to output buffer
+            remaining_output = len(outdata) - samples_filled
+            remaining_chunk = len(self.current_audio_chunk) - self.chunk_position
+            samples_to_copy = min(remaining_output, remaining_chunk)
+
+            if samples_to_copy > 0:
+                chunk_data = self.current_audio_chunk[
+                    self.chunk_position : self.chunk_position + samples_to_copy
+                ]
+                # More efficient: direct assignment for mono audio instead of reshape
+                outdata[samples_filled : samples_filled + samples_to_copy, 0] = chunk_data
+                samples_filled += samples_to_copy
+                self.chunk_position += samples_to_copy
+
+            # If we've used up the entire chunk, reset for next iteration
+            if self.chunk_position >= len(self.current_audio_chunk):
+                self.current_audio_chunk = None
+                self.chunk_position = 0
+
     async def run(self) -> None:
         print("Connecting, may take a few seconds...")
 
-        # Initialize audio player
+        # Initialize audio player with callback
+        chunk_size = int(SAMPLE_RATE * CHUNK_LENGTH_S)
         self.audio_player = sd.OutputStream(
             channels=CHANNELS,
             samplerate=SAMPLE_RATE,
             dtype=FORMAT,
+            callback=self._output_callback,
+            blocksize=chunk_size,  # Match our chunk timing for better alignment
         )
         self.audio_player.start()
 
@@ -146,15 +212,24 @@ async def _on_event(self, event: RealtimeSessionEvent) -> None:
         elif event.type == "audio_end":
             print("Audio ended")
         elif event.type == "audio":
-            # Play audio through speakers
+            # Enqueue audio for callback-based playback
             np_audio = np.frombuffer(event.audio.data, dtype=np.int16)
-            if self.audio_player:
-                try:
-                    self.audio_player.write(np_audio)
-                except Exception as e:
-                    print(f"Audio playback error: {e}")
+            try:
+                self.output_queue.put_nowait(np_audio)
+            except queue.Full:
+                # Queue is full - only drop if we have significant backlog
+                # This prevents aggressive dropping that could cause choppiness
+                if self.output_queue.qsize() > 8:  # Keep some buffer
+                    try:
+                        self.output_queue.get_nowait()
+                        self.output_queue.put_nowait(np_audio)
+                    except queue.Empty:
+                        pass
+                # If queue isn't too full, just skip this chunk to avoid blocking
         elif event.type == "audio_interrupted":
             print("Audio interrupted")
+            # Signal the output callback to clear its queue and state
+            self.interrupt_event.set()
         elif event.type == "error":
             print(f"Error: {event.error}")
         elif event.type == "history_updated":
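
In short, no_ui_demo.py replaces blocking OutputStream.write calls with a pull model: the event handler only enqueues PCM chunks, and a sounddevice callback drains the queue on the audio thread, padding with silence on underrun. A stripped-down sketch of that pattern follows; the sample rate is an assumption, and unlike the diff above it does not carry a partially consumed chunk across callbacks.

import queue

import numpy as np
import sounddevice as sd

SAMPLE_RATE = 24000  # assumed; the demo uses its own SAMPLE_RATE constant
output_queue: queue.Queue = queue.Queue(maxsize=10)


def output_callback(outdata, frames, time, status) -> None:
    # Runs on sounddevice's audio thread; never block here.
    outdata.fill(0)  # default to silence
    try:
        chunk = output_queue.get_nowait()
    except queue.Empty:
        return  # underrun: play silence for this block
    n = min(frames, len(chunk))
    outdata[:n, 0] = chunk[:n]  # mono int16 samples


stream = sd.OutputStream(
    samplerate=SAMPLE_RATE,
    channels=1,
    dtype="int16",
    callback=output_callback,
)
stream.start()

# Producer side (e.g. the "audio" event handler) only enqueues and never blocks:
# output_queue.put_nowait(np.frombuffer(event.audio.data, dtype=np.int16))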

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@ requires-python = ">=3.9"
 license = "MIT"
 authors = [{ name = "OpenAI", email = "[email protected]" }]
 dependencies = [
-    "openai>=1.96.0, <2",
+    "openai>=1.96.1, <2",
     "pydantic>=2.10, <3",
     "griffe>=1.5.6, <2",
     "typing-extensions>=4.12.2, <5",

src/agents/realtime/items.py

Lines changed: 10 additions & 1 deletion
@@ -30,6 +30,15 @@ class AssistantText(BaseModel):
     model_config = ConfigDict(extra="allow")
 
 
+class AssistantAudio(BaseModel):
+    type: Literal["audio"] = "audio"
+    audio: str | None = None
+    transcript: str | None = None
+
+    # Allow extra data
+    model_config = ConfigDict(extra="allow")
+
+
 class SystemMessageItem(BaseModel):
     item_id: str
     previous_item_id: str | None = None
@@ -58,7 +67,7 @@ class AssistantMessageItem(BaseModel):
     type: Literal["message"] = "message"
     role: Literal["assistant"] = "assistant"
     status: Literal["in_progress", "completed", "incomplete"] | None = None
-    content: list[AssistantText]
+    content: list[Annotated[AssistantText | AssistantAudio, Field(discriminator="type")]]
 
     # Allow extra data
     model_config = ConfigDict(extra="allow")
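
With this change, the content field of AssistantMessageItem is a discriminated union, so pydantic picks the model by each part's "type" value during validation. A small standalone illustration of that behavior (field shapes mirror the diff, but this snippet is not repo code):

from typing import Annotated, Literal, Optional, Union

from pydantic import BaseModel, ConfigDict, Field, TypeAdapter


class AssistantText(BaseModel):
    type: Literal["text"] = "text"
    text: Optional[str] = None

    model_config = ConfigDict(extra="allow")


class AssistantAudio(BaseModel):
    type: Literal["audio"] = "audio"
    audio: Optional[str] = None
    transcript: Optional[str] = None

    model_config = ConfigDict(extra="allow")


# The discriminator tells pydantic to dispatch on the "type" field
Content = Annotated[Union[AssistantText, AssistantAudio], Field(discriminator="type")]

parts = TypeAdapter(list[Content]).validate_python(
    [
        {"type": "text", "text": "Hello"},
        {"type": "audio", "transcript": "Hello (spoken)"},
    ]
)
assert isinstance(parts[0], AssistantText)
assert isinstance(parts[1], AssistantAudio)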

src/agents/realtime/openai_realtime.py

Lines changed: 3 additions & 1 deletion
@@ -364,7 +364,9 @@ async def _handle_output_item(self, item: ConversationItem) -> None:
                     "item_id": item.id or "",
                     "type": item.type,
                     "role": item.role,
-                    "content": item.content,
+                    "content": (
+                        [content.model_dump() for content in item.content] if item.content else []
+                    ),
                     "status": "in_progress",
                 }
             )

tests/realtime/test_session.py

Lines changed: 7 additions & 7 deletions
@@ -295,7 +295,7 @@ async def test_item_updated_event_updates_existing_item(self, mock_model, mock_a
         # Check that item was updated
         assert len(session._history) == 1
         updated_item = cast(AssistantMessageItem, session._history[0])
-        assert updated_item.content[0].text == "Updated"
+        assert updated_item.content[0].text == "Updated"  # type: ignore
 
         # Should have 2 events: raw + history updated (not added)
         assert session._event_queue.qsize() == 2
@@ -526,7 +526,7 @@ def test_update_existing_item_by_id(self):
         # Item should be updated
         result_item = cast(AssistantMessageItem, new_history[0])
         assert result_item.item_id == "item_1"
-        assert result_item.content[0].text == "Updated"
+        assert result_item.content[0].text == "Updated"  # type: ignore
 
     def test_update_existing_item_preserves_order(self):
         """Test that updating existing item preserves its position in history"""
@@ -559,13 +559,13 @@ def test_update_existing_item_preserves_order(self):
 
         # Middle item should be updated
         updated_result = cast(AssistantMessageItem, new_history[1])
-        assert updated_result.content[0].text == "Updated Second"
+        assert updated_result.content[0].text == "Updated Second"  # type: ignore
 
         # Other items should be unchanged
         item1_result = cast(AssistantMessageItem, new_history[0])
         item3_result = cast(AssistantMessageItem, new_history[2])
-        assert item1_result.content[0].text == "First"
-        assert item3_result.content[0].text == "Third"
+        assert item1_result.content[0].text == "First"  # type: ignore
+        assert item3_result.content[0].text == "Third"  # type: ignore
 
     def test_insert_new_item_after_previous_item(self):
         """Test inserting new item after specified previous_item_id"""
@@ -600,7 +600,7 @@ def test_insert_new_item_after_previous_item(self):
 
         # Content should be correct
         item2_result = cast(AssistantMessageItem, new_history[1])
-        assert item2_result.content[0].text == "Second"
+        assert item2_result.content[0].text == "Second"  # type: ignore
 
     def test_insert_new_item_after_nonexistent_previous_item(self):
         """Test that item with nonexistent previous_item_id gets added to end"""
@@ -703,7 +703,7 @@ def test_complex_insertion_scenario(self):
         assert len(history) == 4
         assert [item.item_id for item in history] == ["A", "B", "D", "C"]
         itemB_result = cast(AssistantMessageItem, history[1])
-        assert itemB_result.content[0].text == "Updated B"
+        assert itemB_result.content[0].text == "Updated B"  # type: ignore
 
 
     # Test 3: Tool call execution flow (_handle_tool_call method)

uv.lock

Lines changed: 4 additions & 4 deletions
Generated lockfile; diff not rendered.
