Realtime: use SDK types for all messages #1134


Merged · 1 commit · Jul 15, 2025
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -7,7 +7,7 @@ requires-python = ">=3.9"
license = "MIT"
authors = [{ name = "OpenAI", email = "[email protected]" }]
dependencies = [
"openai>=1.93.1, <2",
"openai>=1.96.0, <2",
"pydantic>=2.10, <3",
"griffe>=1.5.6, <2",
"typing-extensions>=4.12.2, <5",
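For context, the floor moves from 1.93.1 to 1.96.0, presumably to pick up the beta realtime types the rewrite below imports (ConversationItemRetrieveEvent, SessionTracing, and friends). A minimal, hypothetical pre-flight check a consumer could run to confirm the constraint is satisfied (not part of this PR):

```python
# Hypothetical guard (not from this PR): confirm the installed openai SDK
# meets the bumped floor before importing the beta realtime types.
from importlib.metadata import version

installed = version("openai")
major, minor = (int(part) for part in installed.split(".")[:2])
if (major, minor) < (1, 96):
    raise RuntimeError(
        f"openai {installed} installed, but this package requires >=1.96.0, <2"
    )
```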
1 change: 1 addition & 0 deletions src/agents/realtime/items.py
@@ -73,6 +73,7 @@ class AssistantMessageItem(BaseModel):
class RealtimeToolCallItem(BaseModel):
item_id: str
previous_item_id: str | None = None
call_id: str | None
type: Literal["function_call"] = "function_call"
status: Literal["in_progress", "completed"]
arguments: str
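The new call_id field is nullable but has no default, so every construction site must now pass it explicitly; the diff below updates both producers. A hedged sketch of a construction (the ids and values are made up, and name comes from the portion of the class body the diff truncates):

```python
from agents.realtime.items import RealtimeToolCallItem

# Illustrative values only. item_id is the realtime conversation item's id;
# call_id is the SDK's function-call correlation id and may legitimately be
# None, but it must be supplied either way now that it has no default.
tool_call = RealtimeToolCallItem(
    item_id="item_abc",
    previous_item_id=None,
    call_id="call_def",
    type="function_call",
    status="in_progress",
    arguments='{"city": "Tokyo"}',
    name="get_weather",  # required field from the truncated class body
)
```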
262 changes: 165 additions & 97 deletions src/agents/realtime/openai_realtime.py
@@ -10,14 +10,47 @@

import pydantic
import websockets
from openai.types.beta.realtime.conversation_item import ConversationItem
from openai.types.beta.realtime.conversation_item import (
ConversationItem,
ConversationItem as OpenAIConversationItem,
)
from openai.types.beta.realtime.conversation_item_content import (
ConversationItemContent as OpenAIConversationItemContent,
)
from openai.types.beta.realtime.conversation_item_create_event import (
ConversationItemCreateEvent as OpenAIConversationItemCreateEvent,
)
from openai.types.beta.realtime.conversation_item_retrieve_event import (
ConversationItemRetrieveEvent as OpenAIConversationItemRetrieveEvent,
)
from openai.types.beta.realtime.conversation_item_truncate_event import (
ConversationItemTruncateEvent as OpenAIConversationItemTruncateEvent,
)
from openai.types.beta.realtime.input_audio_buffer_append_event import (
InputAudioBufferAppendEvent as OpenAIInputAudioBufferAppendEvent,
)
from openai.types.beta.realtime.input_audio_buffer_commit_event import (
InputAudioBufferCommitEvent as OpenAIInputAudioBufferCommitEvent,
)
from openai.types.beta.realtime.realtime_client_event import (
RealtimeClientEvent as OpenAIRealtimeClientEvent,
)
from openai.types.beta.realtime.realtime_server_event import (
RealtimeServerEvent as OpenAIRealtimeServerEvent,
)
from openai.types.beta.realtime.response_audio_delta_event import ResponseAudioDeltaEvent
from openai.types.beta.realtime.response_cancel_event import (
ResponseCancelEvent as OpenAIResponseCancelEvent,
)
from openai.types.beta.realtime.response_create_event import (
ResponseCreateEvent as OpenAIResponseCreateEvent,
)
from openai.types.beta.realtime.session_update_event import (
Session as OpenAISessionObject,
SessionTool as OpenAISessionTool,
SessionTracing as OpenAISessionTracing,
SessionTracingTracingConfiguration as OpenAISessionTracingConfiguration,
SessionUpdateEvent as OpenAISessionUpdateEvent,
)
from pydantic import TypeAdapter
from typing_extensions import assert_never
@@ -135,12 +168,11 @@ async def _send_tracing_config(
) -> None:
"""Update tracing configuration via session.update event."""
if tracing_config is not None:
converted_tracing_config = _ConversionHelper.convert_tracing_config(tracing_config)
await self._send_raw_message(
RealtimeModelSendRawMessage(
message={
"type": "session.update",
"other_data": {"session": {"tracing": tracing_config}},
}
OpenAISessionUpdateEvent(
session=OpenAISessionObject(tracing=converted_tracing_config),
type="session.update",
)
)
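A sketch (not part of the diff) of why typed events plus the exclude_none/exclude_unset serialization used by the new _send_raw_message below beat hand-built dicts: with only tracing set, the session.update payload collapses to just the tracing block. The group_id value is hypothetical.

```python
from openai.types.beta.realtime.session_update_event import (
    Session as OpenAISessionObject,
    SessionTracingTracingConfiguration,
    SessionUpdateEvent as OpenAISessionUpdateEvent,
)

event = OpenAISessionUpdateEvent(
    type="session.update",
    session=OpenAISessionObject(
        tracing=SessionTracingTracingConfiguration(group_id="grp_1")
    ),
)
# Unset Session fields are dropped, so only the tracing block goes on the wire:
print(event.model_dump_json(exclude_none=True, exclude_unset=True))
# {"session":{"tracing":{"group_id":"grp_1"}},"type":"session.update"}  (key order may vary)
```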

@@ -199,7 +231,11 @@ async def _listen_for_messages(self):
async def send_event(self, event: RealtimeModelSendEvent) -> None:
"""Send an event to the model."""
if isinstance(event, RealtimeModelSendRawMessage):
await self._send_raw_message(event)
converted = _ConversionHelper.try_convert_raw_message(event)
if converted is not None:
await self._send_raw_message(converted)
else:
logger.error(f"Failed to convert raw message: {event}")
elif isinstance(event, RealtimeModelSendUserInput):
await self._send_user_input(event)
elif isinstance(event, RealtimeModelSendAudio):
@@ -214,77 +250,33 @@ async def send_event(self, event: RealtimeModelSendEvent) -> None:
assert_never(event)
raise ValueError(f"Unknown event type: {type(event)}")

async def _send_raw_message(self, event: RealtimeModelSendRawMessage) -> None:
async def _send_raw_message(self, event: OpenAIRealtimeClientEvent) -> None:
"""Send a raw message to the model."""
assert self._websocket is not None, "Not connected"

converted_event = {
"type": event.message["type"],
}

converted_event.update(event.message.get("other_data", {}))

await self._websocket.send(json.dumps(converted_event))
await self._websocket.send(event.model_dump_json(exclude_none=True, exclude_unset=True))

async def _send_user_input(self, event: RealtimeModelSendUserInput) -> None:
message = (
event.user_input
if isinstance(event.user_input, dict)
else {
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": event.user_input}],
}
)
other_data = {
"item": message,
}

await self._send_raw_message(
RealtimeModelSendRawMessage(
message={"type": "conversation.item.create", "other_data": other_data}
)
)
await self._send_raw_message(
RealtimeModelSendRawMessage(message={"type": "response.create"})
)
converted = _ConversionHelper.convert_user_input_to_item_create(event)
await self._send_raw_message(converted)
await self._send_raw_message(OpenAIResponseCreateEvent(type="response.create"))

async def _send_audio(self, event: RealtimeModelSendAudio) -> None:
base64_audio = base64.b64encode(event.audio).decode("utf-8")
await self._send_raw_message(
RealtimeModelSendRawMessage(
message={
"type": "input_audio_buffer.append",
"other_data": {
"audio": base64_audio,
},
}
)
)
converted = _ConversionHelper.convert_audio_to_input_audio_buffer_append(event)
await self._send_raw_message(converted)
if event.commit:
await self._send_raw_message(
RealtimeModelSendRawMessage(message={"type": "input_audio_buffer.commit"})
OpenAIInputAudioBufferCommitEvent(type="input_audio_buffer.commit")
)

async def _send_tool_output(self, event: RealtimeModelSendToolOutput) -> None:
await self._send_raw_message(
RealtimeModelSendRawMessage(
message={
"type": "conversation.item.create",
"other_data": {
"item": {
"type": "function_call_output",
"output": event.output,
"call_id": event.tool_call.id,
},
},
}
)
)
converted = _ConversionHelper.convert_tool_output(event)
await self._send_raw_message(converted)

tool_item = RealtimeToolCallItem(
item_id=event.tool_call.id or "",
previous_item_id=event.tool_call.previous_item_id,
call_id=event.tool_call.call_id,
type="function_call",
status="completed",
arguments=event.tool_call.arguments,
@@ -294,9 +286,7 @@ async def _send_tool_output(self, event: RealtimeModelSendToolOutput) -> None:
await self._emit_event(RealtimeModelItemUpdatedEvent(item=tool_item))

if event.start_response:
await self._send_raw_message(
RealtimeModelSendRawMessage(message={"type": "response.create"})
)
await self._send_raw_message(OpenAIResponseCreateEvent(type="response.create"))

async def _send_interrupt(self, event: RealtimeModelSendInterrupt) -> None:
if not self._current_item_id or not self._audio_start_time:
@@ -307,18 +297,12 @@ async def _send_interrupt(self, event: RealtimeModelSendInterrupt) -> None:
elapsed_time_ms = (datetime.now() - self._audio_start_time).total_seconds() * 1000
if elapsed_time_ms > 0 and elapsed_time_ms < self._audio_length_ms:
await self._emit_event(RealtimeModelAudioInterruptedEvent())
await self._send_raw_message(
RealtimeModelSendRawMessage(
message={
"type": "conversation.item.truncate",
"other_data": {
"item_id": self._current_item_id,
"content_index": self._current_audio_content_index,
"audio_end_ms": elapsed_time_ms,
},
}
)
converted = _ConversionHelper.convert_interrupt(
self._current_item_id,
self._current_audio_content_index or 0,
int(elapsed_time_ms),
)
await self._send_raw_message(converted)

self._current_item_id = None
self._audio_start_time = None
@@ -354,6 +338,7 @@ async def _handle_output_item(self, item: ConversationItem) -> None:
tool_call = RealtimeToolCallItem(
item_id=item.id or "",
previous_item_id=None,
call_id=item.call_id,
type="function_call",
# We use the same item for tool call and output, so it will be completed by the
# output being added
@@ -365,7 +350,7 @@ async def _handle_output_item(self, item: ConversationItem) -> None:
await self._emit_event(RealtimeModelItemUpdatedEvent(item=tool_call))
await self._emit_event(
RealtimeModelToolCallEvent(
call_id=item.id or "",
call_id=item.call_id or "",
name=item.name or "",
arguments=item.arguments or "",
id=item.id or "",
@@ -404,9 +389,7 @@ async def close(self) -> None:

async def _cancel_response(self) -> None:
if self._ongoing_response:
await self._send_raw_message(
RealtimeModelSendRawMessage(message={"type": "response.cancel"})
)
await self._send_raw_message(OpenAIResponseCancelEvent(type="response.cancel"))
self._ongoing_response = False

async def _handle_ws_event(self, event: dict[str, Any]):
@@ -466,16 +449,13 @@ async def _handle_ws_event(self, event: dict[str, Any]):
parsed.type == "conversation.item.input_audio_transcription.completed"
or parsed.type == "conversation.item.truncated"
):
await self._send_raw_message(
RealtimeModelSendRawMessage(
message={
"type": "conversation.item.retrieve",
"other_data": {
"item_id": self._current_item_id,
},
}
if self._current_item_id:
await self._send_raw_message(
OpenAIConversationItemRetrieveEvent(
type="conversation.item.retrieve",
item_id=self._current_item_id,
)
)
)
if parsed.type == "conversation.item.input_audio_transcription.completed":
await self._emit_event(
RealtimeModelInputAudioTranscriptionCompletedEvent(
@@ -504,14 +484,7 @@ async def _handle_ws_event(self, event: dict[str, Any]):
async def _update_session_config(self, model_settings: RealtimeSessionModelSettings) -> None:
session_config = self._get_session_config(model_settings)
await self._send_raw_message(
RealtimeModelSendRawMessage(
message={
"type": "session.update",
"other_data": {
"session": session_config.model_dump(exclude_unset=True, exclude_none=True)
},
}
)
OpenAISessionUpdateEvent(session=session_config, type="session.update")
)

def _get_session_config(
Expand Down Expand Up @@ -582,3 +555,98 @@ def conversation_item_to_realtime_message_item(
"status": "in_progress",
},
)

@classmethod
def try_convert_raw_message(
cls, message: RealtimeModelSendRawMessage
) -> OpenAIRealtimeClientEvent | None:
try:
data = {}
data["type"] = message.message["type"]
data.update(message.message.get("other_data", {}))
return TypeAdapter(OpenAIRealtimeClientEvent).validate_python(data)
except Exception:
return None

@classmethod
def convert_tracing_config(
cls, tracing_config: RealtimeModelTracingConfig | Literal["auto"] | None
) -> OpenAISessionTracing | None:
if tracing_config is None:
return None
elif tracing_config == "auto":
return "auto"
return OpenAISessionTracingConfiguration(
group_id=tracing_config.get("group_id"),
metadata=tracing_config.get("metadata"),
workflow_name=tracing_config.get("workflow_name"),
)

@classmethod
def convert_user_input_to_conversation_item(
cls, event: RealtimeModelSendUserInput
) -> OpenAIConversationItem:
user_input = event.user_input

if isinstance(user_input, dict):
return OpenAIConversationItem(
type="message",
role="user",
content=[
OpenAIConversationItemContent(
type="input_text",
text=item.get("text"),
)
for item in user_input.get("content", [])
],
)
else:
return OpenAIConversationItem(
type="message",
role="user",
content=[OpenAIConversationItemContent(type="input_text", text=user_input)],
)

@classmethod
def convert_user_input_to_item_create(
cls, event: RealtimeModelSendUserInput
) -> OpenAIRealtimeClientEvent:
return OpenAIConversationItemCreateEvent(
type="conversation.item.create",
item=cls.convert_user_input_to_conversation_item(event),
)

@classmethod
def convert_audio_to_input_audio_buffer_append(
cls, event: RealtimeModelSendAudio
) -> OpenAIRealtimeClientEvent:
base64_audio = base64.b64encode(event.audio).decode("utf-8")
return OpenAIInputAudioBufferAppendEvent(
type="input_audio_buffer.append",
audio=base64_audio,
)

@classmethod
def convert_tool_output(cls, event: RealtimeModelSendToolOutput) -> OpenAIRealtimeClientEvent:
return OpenAIConversationItemCreateEvent(
type="conversation.item.create",
item=OpenAIConversationItem(
type="function_call_output",
output=event.output,
call_id=event.tool_call.call_id,
),
)

@classmethod
def convert_interrupt(
cls,
current_item_id: str,
current_audio_content_index: int,
elapsed_time_ms: int,
) -> OpenAIRealtimeClientEvent:
return OpenAIConversationItemTruncateEvent(
type="conversation.item.truncate",
item_id=current_item_id,
content_index=current_audio_content_index,
audio_end_ms=elapsed_time_ms,
)
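A small usage sketch of the new raw-message path (the import path is assumed; the payload is hypothetical): send_event() now routes RealtimeModelSendRawMessage through try_convert_raw_message(), forwarding only events that validate against the SDK's client-event union and logging an error otherwise.

```python
from agents.realtime.model_inputs import RealtimeModelSendRawMessage  # assumed path

raw = RealtimeModelSendRawMessage(
    message={
        "type": "input_audio_buffer.clear",  # any type in the SDK union works
        "other_data": {},                    # extra keys merge into the event payload
    }
)

converted = _ConversionHelper.try_convert_raw_message(raw)
if converted is not None:
    print(converted.type)  # input_audio_buffer.clear
else:
    # A misspelled "type" fails TypeAdapter validation and yields None,
    # which send_event() reports via logger.error instead of sending.
    ...
```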