From 35812cc1fb688f79cf01675ea342fa81094535b1 Mon Sep 17 00:00:00 2001
From: devtalker
Date: Thu, 3 Jul 2025 18:20:10 +0800
Subject: [PATCH 1/4] feat: enable real-time streaming of function call arguments

---
 examples/basic/stream_function_call_args.py   | 222 ++++++++++++++++++
 src/agents/models/chatcmpl_stream_handler.py  | 178 +++++++++++---
 .../test_litellm_chatcompletions_stream.py    | 115 +++++++++
 tests/test_openai_chatcompletions_stream.py   | 138 +++++++++++
 4 files changed, 614 insertions(+), 39 deletions(-)
 create mode 100644 examples/basic/stream_function_call_args.py

diff --git a/examples/basic/stream_function_call_args.py b/examples/basic/stream_function_call_args.py
new file mode 100644
index 000000000..3c9c80274
--- /dev/null
+++ b/examples/basic/stream_function_call_args.py
@@ -0,0 +1,222 @@
+import asyncio
+import os
+from typing import Any
+
+from openai import AsyncOpenAI
+from openai.types.responses import ResponseFunctionCallArgumentsDeltaEvent
+
+from agents import Agent, OpenAIChatCompletionsModel, Runner, function_tool, set_tracing_disabled
+
+BASE_URL = os.getenv("EXAMPLE_BASE_URL") or ""
+API_KEY = os.getenv("EXAMPLE_API_KEY") or ""
+MODEL_NAME = os.getenv("EXAMPLE_MODEL_NAME") or ""
+
+if not BASE_URL or not API_KEY or not MODEL_NAME:
+    raise ValueError(
+        "Please set EXAMPLE_BASE_URL, EXAMPLE_API_KEY, EXAMPLE_MODEL_NAME via env var or code."
+    )
+
+client = AsyncOpenAI(base_url=BASE_URL, api_key=API_KEY)
+set_tracing_disabled(disabled=True)
+
+
+async def demo_single_function_call():
+    """
+    Demonstrates real-time streaming of function call arguments for a single function.
+
+    As the AI generates a function call, you can see the arguments
+    being built up incrementally, rather than waiting for the complete
+    function call to finish.
+    """
+    print("=" * 60)
+    print("DEMO 1: Single Function Call Streaming")
+    print("=" * 60)
+
+    @function_tool
+    def write_file(filename: str, content: str) -> str:
+        """Write content to a file."""
+        print(f"⚔ write_file: {filename}, {content}")
+        return f"File {filename} written successfully"
+
+    agent = Agent(
+        name="CodeGenerator",
+        instructions="""You are a helpful coding assistant. When asked to create files,
+        use the write_file tool with appropriate filenames and content.""",
+        model=OpenAIChatCompletionsModel(model=MODEL_NAME, openai_client=client),
+        tools=[write_file],
+    )
+
+    print("šŸ“ Request: Create a Python script that prints 'Hello, World!' and saves it as hello.py")
+    print("šŸš€ Starting single function call streaming...\n")
+
+    result = Runner.run_streamed(
+        agent,
+        input="Create a Python script that prints 'Hello, World!' and saves it as hello.py"
+    )
+
+    function_name = None
+    current_arguments = ""
+
+    async for event in result.stream_events():
+        if event.type == "raw_response_event":
+            # Function call started
+            if event.data.type == "response.output_item.added":
+                if hasattr(event.data.item, 'name'):
+                    function_name = event.data.item.name
+                    print(f"šŸ“ž Function call streaming started: {function_name}()")
+                    print("šŸ“ Arguments building...")
+
+            # Real-time argument streaming
+            elif isinstance(event.data, ResponseFunctionCallArgumentsDeltaEvent):
+                current_arguments += event.data.delta
+                print(f" + {event.data.delta}", end="", flush=True)
+
+            # Function call completed
+            elif event.data.type == "response.output_item.done":
+                if hasattr(event.data.item, 'name'):
+                    print(f"\nāœ… Function call streaming completed: {function_name}")
+                    print(f"šŸ”§ Final arguments: {current_arguments}")
+                    print()
+
+    print(f"šŸŽ‰ Result: {result.final_output}\n")
+
+
+async def demo_multiple_function_calls():
+    """
+    Demonstrates real-time streaming of function call arguments for multiple functions.
+
+    As the AI generates multiple function calls, you can see the arguments
+    for each function being built up incrementally, with clear identification
+    of which arguments belong to which function call.
+    """
+    print("=" * 60)
+    print("DEMO 2: Multiple Function Calls Streaming")
+    print("=" * 60)
+
+    # Create multiple tools for a comprehensive demo
+    @function_tool
+    def create_directory(path: str) -> str:
+        """Create a directory at the specified path."""
+        print(f"⚔ create_directory: {path}")
+        return f"Directory {path} created successfully"
+
+    @function_tool
+    def write_file(filename: str, content: str) -> str:
+        """Write content to a file."""
+        print(f"⚔ write_file: {filename}, {content}")
+        return f"File {filename} written successfully"
+
+    @function_tool
+    def create_config(project_name: str, version: str, dependencies: list[str]) -> str:
+        """Create a configuration file for a project."""
+        print(f"⚔ create_config: {project_name}, {version}, {dependencies}")
+        return f"Config for {project_name} v{version} created with {len(dependencies)} dependencies"
+
+    @function_tool
+    def add_readme(project_name: str, description: str) -> str:
+        """Add a README file to the project."""
+        print(f"⚔ add_readme: {project_name}, {description}")
+        return f"README for {project_name} added with description"
+
+    agent = Agent(
+        name="ProjectSetupAgent",
+        instructions="""You are a helpful project setup assistant. When asked to create
+        a new project, you should:
+        1. Create the project directory
+        2. Create the main application file
+        3. Create a configuration file
+        4. Add a README file
+
+        Use all the available tools to set up a complete project structure.""",
+        model=OpenAIChatCompletionsModel(model=MODEL_NAME, openai_client=client),
+        tools=[create_directory, write_file, create_config, add_readme],
+    )
+
+    print("šŸ“ Request: Create a new Python web project called 'my-web-app' with FastAPI")
+    print("šŸš€ Starting multiple function calls streaming...\n")
+
+    result = Runner.run_streamed(
+        agent,
+        input="Create a new Python web project called 'my-web-app' with FastAPI. Set it up with version 1.0.0 and include dependencies: fastapi, uvicorn, pydantic"
+    )
+
+    # Track function calls
+    function_calls: dict[Any, dict[str, Any]] = {}  # call_id -> {name, output_index, arguments}
+    current_active_call_id = None  # Track which function call is currently receiving arguments
+
+    async for event in result.stream_events():
+        if event.type == "raw_response_event":
+            # Function call started
+            if event.data.type == "response.output_item.added":
+                if hasattr(event.data.item, 'name') and hasattr(event.data.item, 'call_id'):
+                    output_index = event.data.output_index
+                    function_name = event.data.item.name
+                    call_id = event.data.item.call_id
+
+                    function_calls[call_id] = {
+                        'name': function_name,
+                        'output_index': output_index,
+                        'arguments': ""
+                    }
+                    # Set this as the current active function call
+                    current_active_call_id = call_id
+                    print(f"šŸ“ž Function call #{call_id} streaming started: {function_name}()")
+                    print("šŸ“ Arguments building...")
+
+            # Real-time argument streaming
+            elif isinstance(event.data, ResponseFunctionCallArgumentsDeltaEvent):
+                # Use the current active call_id to add arguments
+                if current_active_call_id and current_active_call_id in function_calls:
+                    # Ensure arguments is always a string
+                    prev_args = function_calls[current_active_call_id]['arguments']
+                    if not isinstance(prev_args, str):
+                        prev_args = str(prev_args)
+                    function_calls[current_active_call_id]['arguments'] = prev_args + str(event.data.delta)
+                    print(f" + {event.data.delta}", end="", flush=True)
+
+            # Function call completed
+            elif event.data.type == "response.output_item.done":
+                if hasattr(event.data.item, 'call_id'):
+                    output_index = event.data.output_index
+                    call_id = event.data.item.call_id
+
+                    if call_id in function_calls:
+                        function_info = function_calls[call_id]
+                        print(f"\nāœ… Function call #{call_id} streaming completed: {function_info['name']}")
+                        print(f"šŸ”§ Final arguments: {function_info['arguments']}")
+                        print()
+                        # Clear the current active call_id when this function call is done
+                        if current_active_call_id == call_id:
+                            current_active_call_id = None
+
+    print("šŸ“Š Summary of all function calls:")
+    for call_id, info in function_calls.items():
+        print(f"  - #{call_id}: {info['name']}({info['arguments']})")
+
+    print(f"\nšŸŽ‰ Result: {result.final_output}\n")
+
+
+async def main():
+    """
+    Main function that demonstrates both single and multiple function call streaming.
+
+    This comprehensive demo shows:
+    1. How function arguments are streamed for single function calls
+    2. How multiple function calls are handled with proper identification
+    3. Real-time argument building for complex workflows
+    """
+    print("šŸš€ Function Call Arguments Streaming Demo")
+    print("This demo shows real-time streaming of function arguments")
+    print("for both single and multiple function call scenarios.\n")
+
+    # Demo 1: Single function call
+    await demo_single_function_call()
+
+    await asyncio.sleep(1)
+
+    # Demo 2: Multiple function calls
+    await demo_multiple_function_calls()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/src/agents/models/chatcmpl_stream_handler.py b/src/agents/models/chatcmpl_stream_handler.py
index 0cf1e6a3e..684bc0a3e 100644
--- a/src/agents/models/chatcmpl_stream_handler.py
+++ b/src/agents/models/chatcmpl_stream_handler.py
@@ -53,6 +53,9 @@ class StreamingState:
     refusal_content_index_and_output: tuple[int, ResponseOutputRefusal] | None = None
     reasoning_content_index_and_output: tuple[int, ResponseReasoningItem] | None = None
     function_calls: dict[int, ResponseFunctionToolCall] = field(default_factory=dict)
+    # New fields for real-time function call streaming
+    function_call_streaming: dict[int, bool] = field(default_factory=dict)
+    function_call_output_idx: dict[int, int] = field(default_factory=dict)
 
 
 class SequenceNumber:
@@ -255,9 +258,7 @@ async def handle_stream(
                     # Accumulate the refusal string in the output part
                     state.refusal_content_index_and_output[1].refusal += delta.refusal
 
-                # Handle tool calls
-                # Because we don't know the name of the function until the end of the stream, we'll
-                # save everything and yield events at the end
+                # Handle tool calls with real-time streaming support
                 if delta.tool_calls:
                     for tc_delta in delta.tool_calls:
                         if tc_delta.index not in state.function_calls:
@@ -268,8 +269,11 @@ async def handle_stream(
                                 type="function_call",
                                 call_id="",
                             )
+                            state.function_call_streaming[tc_delta.index] = False
+
                         tc_function = tc_delta.function
 
+                        # Accumulate the data as before
                         state.function_calls[tc_delta.index].arguments += (
                             tc_function.arguments if tc_function else ""
                         ) or ""
@@ -278,6 +282,73 @@ async def handle_stream(
                         ) or ""
                         state.function_calls[tc_delta.index].call_id += tc_delta.id or ""
 
+                        # Check if we have enough info to start streaming this function call
+                        function_call = state.function_calls[tc_delta.index]
+
+                        # Strategy: Only start streaming when we see arguments coming in
+                        # but no new name information, indicating the name is finalized
+                        current_chunk_has_name = tc_function and tc_function.name
+                        current_chunk_has_args = tc_function and tc_function.arguments
+
+                        # If this chunk has a name, it means the function name might still be building
+                        # We should wait until we get a chunk with only arguments (no name)
+                        name_seems_finalized = not current_chunk_has_name and current_chunk_has_args
+
+                        if (not state.function_call_streaming[tc_delta.index] and
+                                function_call.name and
+                                function_call.call_id and
+                                # Only start streaming when we're confident the name is finalized
+                                # This happens when we get args but no new name chunk
+                                name_seems_finalized):
+
+                            # Calculate the output index for this function call
+                            function_call_starting_index = 0
+                            if state.reasoning_content_index_and_output:
+                                function_call_starting_index += 1
+                            if state.text_content_index_and_output:
+                                function_call_starting_index += 1
+                            if state.refusal_content_index_and_output:
+                                function_call_starting_index += 1
+
+                            # Add offset for already started function calls
+                            function_call_starting_index += sum(
+                                1 for streaming in state.function_call_streaming.values() if streaming
+                            )
+
+                            # Mark this function call as streaming and store its output index
+                            state.function_call_streaming[tc_delta.index] = True
+                            state.function_call_output_idx[
+                                tc_delta.index
+                            ] = function_call_starting_index
+
+                            # Send initial function call added event
+                            yield ResponseOutputItemAddedEvent(
+                                item=ResponseFunctionToolCall(
+                                    id=FAKE_RESPONSES_ID,
+                                    call_id=function_call.call_id,
+                                    arguments="",  # Start with empty arguments
+                                    name=function_call.name,
+                                    type="function_call",
+                                ),
+                                output_index=function_call_starting_index,
+                                type="response.output_item.added",
+                                sequence_number=sequence_number.get_and_increment(),
+                            )
+
+                        # Stream arguments if we've started streaming this function call
+                        if (state.function_call_streaming[tc_delta.index] and
+                                tc_function and
+                                tc_function.arguments):
+
+                            output_index = state.function_call_output_idx[tc_delta.index]
+                            yield ResponseFunctionCallArgumentsDeltaEvent(
+                                delta=tc_function.arguments,
+                                item_id=FAKE_RESPONSES_ID,
+                                output_index=output_index,
+                                type="response.function_call_arguments.delta",
+                                sequence_number=sequence_number.get_and_increment(),
+                            )
+
         if state.reasoning_content_index_and_output:
             yield ResponseReasoningSummaryPartDoneEvent(
                 item_id=FAKE_RESPONSES_ID,
@@ -327,42 +398,71 @@ async def handle_stream(
                 sequence_number=sequence_number.get_and_increment(),
             )
 
-        # Actually send events for the function calls
-        for function_call in state.function_calls.values():
-            # First, a ResponseOutputItemAdded for the function call
-            yield ResponseOutputItemAddedEvent(
-                item=ResponseFunctionToolCall(
-                    id=FAKE_RESPONSES_ID,
-                    call_id=function_call.call_id,
-                    arguments=function_call.arguments,
-                    name=function_call.name,
-                    type="function_call",
-                ),
-                output_index=function_call_starting_index,
-                type="response.output_item.added",
-                sequence_number=sequence_number.get_and_increment(),
-            )
-            # Then, yield the args
-            yield ResponseFunctionCallArgumentsDeltaEvent(
-                delta=function_call.arguments,
-                item_id=FAKE_RESPONSES_ID,
-                output_index=function_call_starting_index,
-                type="response.function_call_arguments.delta",
-                sequence_number=sequence_number.get_and_increment(),
-            )
-            # Finally, the ResponseOutputItemDone
-            yield ResponseOutputItemDoneEvent(
-                item=ResponseFunctionToolCall(
-                    id=FAKE_RESPONSES_ID,
-                    call_id=function_call.call_id,
-                    arguments=function_call.arguments,
-                    name=function_call.name,
-                    type="function_call",
-                ),
-                output_index=function_call_starting_index,
-                type="response.output_item.done",
-                sequence_number=sequence_number.get_and_increment(),
-            )
+        # Send completion events for function calls
+        for index, function_call in state.function_calls.items():
+            if state.function_call_streaming.get(index, False):
+                # Function call was streamed, just send the completion event
+                output_index = state.function_call_output_idx[index]
+                yield ResponseOutputItemDoneEvent(
+                    item=ResponseFunctionToolCall(
+                        id=FAKE_RESPONSES_ID,
+                        call_id=function_call.call_id,
+                        arguments=function_call.arguments,
+                        name=function_call.name,
+                        type="function_call",
+                    ),
+                    output_index=output_index,
+                    type="response.output_item.done",
+                    sequence_number=sequence_number.get_and_increment(),
+                )
+            else:
+                # Function call was not streamed (fallback to old behavior)
+                # This handles edge cases where function name never arrived
+                fallback_starting_index = 0
+                if state.reasoning_content_index_and_output:
+                    fallback_starting_index += 1
+                if state.text_content_index_and_output:
+                    fallback_starting_index += 1
+                if state.refusal_content_index_and_output:
+                    fallback_starting_index += 1
+
+                # Add offset for already started function calls
+                fallback_starting_index += sum(
+                    1 for streaming in state.function_call_streaming.values() if streaming
+                )
+
+                # Send all events at once (backward compatibility)
+                yield ResponseOutputItemAddedEvent(
+                    item=ResponseFunctionToolCall(
+                        id=FAKE_RESPONSES_ID,
+                        call_id=function_call.call_id,
+                        arguments=function_call.arguments,
+                        name=function_call.name,
+                        type="function_call",
+                    ),
+                    output_index=fallback_starting_index,
+                    type="response.output_item.added",
+                    sequence_number=sequence_number.get_and_increment(),
+                )
+                yield ResponseFunctionCallArgumentsDeltaEvent(
+                    delta=function_call.arguments,
+                    item_id=FAKE_RESPONSES_ID,
+                    output_index=fallback_starting_index,
+                    type="response.function_call_arguments.delta",
+                    sequence_number=sequence_number.get_and_increment(),
+                )
+                yield ResponseOutputItemDoneEvent(
+                    item=ResponseFunctionToolCall(
+                        id=FAKE_RESPONSES_ID,
+                        call_id=function_call.call_id,
+                        arguments=function_call.arguments,
+                        name=function_call.name,
+                        type="function_call",
+                    ),
+                    output_index=fallback_starting_index,
+                    type="response.output_item.done",
+                    sequence_number=sequence_number.get_and_increment(),
+                )
 
         # Finally, send the Response completed event
         outputs: list[ResponseOutputItem] = []
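The subtle part of the handler change above is the `name_seems_finalized` heuristic: streaming only starts once a chunk carries arguments but no further name fragment. A minimal standalone sketch of that decision, assuming chunks arrive the way OpenAI-style chat APIs typically send them (a name-bearing chunk first, then argument-only deltas):

```python
# Standalone trace of the streaming-start heuristic (illustrative only).
chunks = [
    {"name": "write_file", "arguments": ""},        # name still arriving: hold off
    {"name": None, "arguments": '{"filename": "'},  # args only: name looks final, start
    {"name": None, "arguments": 'hello.py"}'},      # args only: keep emitting deltas
]
streaming = False
for tc in chunks:
    has_name = bool(tc["name"])
    has_args = bool(tc["arguments"])
    if not streaming and not has_name and has_args:
        streaming = True
        print("emit response.output_item.added")
    if streaming and has_args:
        print(f"emit delta: {tc['arguments']}")
```

Calls whose name never finalizes this way are not lost; they fall through to the batched fallback branch at the end of the handler.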
diff --git a/tests/models/test_litellm_chatcompletions_stream.py b/tests/models/test_litellm_chatcompletions_stream.py
index cd342e444..900379c54 100644
--- a/tests/models/test_litellm_chatcompletions_stream.py
+++ b/tests/models/test_litellm_chatcompletions_stream.py
@@ -299,3 +299,118 @@ async def patched_fetch_response(self, *args, **kwargs):
     assert output_events[2].delta == "arg1arg2"
     assert output_events[3].type == "response.output_item.done"
     assert output_events[4].type == "response.completed"
+
+
+@pytest.mark.allow_call_model_methods
+@pytest.mark.asyncio
+async def test_stream_response_yields_real_time_function_call_arguments(monkeypatch) -> None:
+    """
+    Validate that LiteLLM `stream_response` also emits function call arguments in real-time
+    as they are received, ensuring consistent behavior across model providers.
+    """
+    # Simulate realistic chunks: name first, then arguments incrementally
+    tool_call_delta1 = ChoiceDeltaToolCall(
+        index=0,
+        id="litellm-call-456",
+        function=ChoiceDeltaToolCallFunction(name="generate_code", arguments=""),
+        type="function",
+    )
+    tool_call_delta2 = ChoiceDeltaToolCall(
+        index=0,
+        function=ChoiceDeltaToolCallFunction(arguments='{"language": "'),
+        type="function",
+    )
+    tool_call_delta3 = ChoiceDeltaToolCall(
+        index=0,
+        function=ChoiceDeltaToolCallFunction(arguments='python", "task": "'),
+        type="function",
+    )
+    tool_call_delta4 = ChoiceDeltaToolCall(
+        index=0,
+        function=ChoiceDeltaToolCallFunction(arguments='hello world"}'),
+        type="function",
+    )
+
+    chunk1 = ChatCompletionChunk(
+        id="chunk-id",
+        created=1,
+        model="fake",
+        object="chat.completion.chunk",
+        choices=[Choice(index=0, delta=ChoiceDelta(tool_calls=[tool_call_delta1]))],
+    )
+    chunk2 = ChatCompletionChunk(
+        id="chunk-id",
+        created=1,
+        model="fake",
+        object="chat.completion.chunk",
+        choices=[Choice(index=0, delta=ChoiceDelta(tool_calls=[tool_call_delta2]))],
+    )
+    chunk3 = ChatCompletionChunk(
+        id="chunk-id",
+        created=1,
+        model="fake",
+        object="chat.completion.chunk",
+        choices=[Choice(index=0, delta=ChoiceDelta(tool_calls=[tool_call_delta3]))],
+    )
+    chunk4 = ChatCompletionChunk(
+        id="chunk-id",
+        created=1,
+        model="fake",
+        object="chat.completion.chunk",
+        choices=[Choice(index=0, delta=ChoiceDelta(tool_calls=[tool_call_delta4]))],
+        usage=CompletionUsage(completion_tokens=1, prompt_tokens=1, total_tokens=2),
+    )
+
+    async def fake_stream() -> AsyncIterator[ChatCompletionChunk]:
+        for c in (chunk1, chunk2, chunk3, chunk4):
+            yield c
+
+    async def patched_fetch_response(self, *args, **kwargs):
+        resp = Response(
+            id="resp-id",
+            created_at=0,
+            model="fake-model",
+            object="response",
+            output=[],
+            tool_choice="none",
+            tools=[],
+            parallel_tool_calls=False,
+        )
+        return resp, fake_stream()
+
+    monkeypatch.setattr(LitellmModel, "_fetch_response", patched_fetch_response)
+    model = LitellmProvider().get_model("gpt-4")
+    output_events = []
+    async for event in model.stream_response(
+        system_instructions=None,
+        input="",
+        model_settings=ModelSettings(),
+        tools=[],
+        output_schema=None,
+        handoffs=[],
+        tracing=ModelTracing.DISABLED,
+        previous_response_id=None,
+        prompt=None,
+    ):
+        output_events.append(event)
+
+    # Extract events by type
+    function_args_delta_events = [
+        e for e in output_events if e.type == "response.function_call_arguments.delta"
+    ]
+    output_item_added_events = [e for e in output_events if e.type == "response.output_item.added"]
+
+    # Verify we got real-time streaming (3 argument delta events)
+    assert len(function_args_delta_events) == 3
+    assert len(output_item_added_events) == 1
+
+    # Verify the deltas were streamed correctly
+    expected_deltas = ['{"language": "', 'python", "task": "', 'hello world"}']
+    for i, delta_event in enumerate(function_args_delta_events):
+        assert delta_event.delta == expected_deltas[i]
+
+    # Verify function call metadata
+    added_event = output_item_added_events[0]
+    assert isinstance(added_event.item, ResponseFunctionToolCall)
+    assert added_event.item.name == "generate_code"
+    assert added_event.item.call_id == "litellm-call-456"
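This LiteLLM test and the OpenAI-flavored one below pin down the same wire order for a streamed call. Written out as a sketch (the delta count follows the four simulated chunks; `response.created` and `response.completed` are only asserted explicitly in the test below):

```python
# Event order the streaming tests expect for a single tool call (sketch).
expected_event_types = [
    "response.created",
    "response.output_item.added",              # once name and call_id are known
    "response.function_call_arguments.delta",  # one per argument-bearing chunk
    "response.function_call_arguments.delta",
    "response.function_call_arguments.delta",
    "response.output_item.done",               # carries the fully accumulated arguments
    "response.completed",
]
```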
diff --git a/tests/test_openai_chatcompletions_stream.py b/tests/test_openai_chatcompletions_stream.py
index 49e7bc2f4..f669e673e 100644
--- a/tests/test_openai_chatcompletions_stream.py
+++ b/tests/test_openai_chatcompletions_stream.py
@@ -299,3 +299,141 @@ async def patched_fetch_response(self, *args, **kwargs):
     assert output_events[2].delta == "arg1arg2"
     assert output_events[3].type == "response.output_item.done"
     assert output_events[4].type == "response.completed"
+
+
+@pytest.mark.allow_call_model_methods
+@pytest.mark.asyncio
+async def test_stream_response_yields_real_time_function_call_arguments(monkeypatch) -> None:
+    """
+    Validate that `stream_response` emits function call arguments in real-time as they
+    are received, not just at the end. This test simulates the real OpenAI API behavior
+    where function name comes first, then arguments are streamed incrementally.
+    """
+    # Simulate realistic OpenAI API chunks: name first, then arguments incrementally
+    tool_call_delta1 = ChoiceDeltaToolCall(
+        index=0,
+        id="tool-call-123",
+        function=ChoiceDeltaToolCallFunction(name="write_file", arguments=""),
+        type="function",
+    )
+    tool_call_delta2 = ChoiceDeltaToolCall(
+        index=0,
+        function=ChoiceDeltaToolCallFunction(arguments='{"filename": "'),
+        type="function",
+    )
+    tool_call_delta3 = ChoiceDeltaToolCall(
+        index=0,
+        function=ChoiceDeltaToolCallFunction(arguments='test.py", "content": "'),
+        type="function",
+    )
+    tool_call_delta4 = ChoiceDeltaToolCall(
+        index=0,
+        function=ChoiceDeltaToolCallFunction(arguments='print(hello)"}'),
+        type="function",
+    )
+
+    chunk1 = ChatCompletionChunk(
+        id="chunk-id",
+        created=1,
+        model="fake",
+        object="chat.completion.chunk",
+        choices=[Choice(index=0, delta=ChoiceDelta(tool_calls=[tool_call_delta1]))],
+    )
+    chunk2 = ChatCompletionChunk(
+        id="chunk-id",
+        created=1,
+        model="fake",
+        object="chat.completion.chunk",
+        choices=[Choice(index=0, delta=ChoiceDelta(tool_calls=[tool_call_delta2]))],
+    )
+    chunk3 = ChatCompletionChunk(
+        id="chunk-id",
+        created=1,
+        model="fake",
+        object="chat.completion.chunk",
+        choices=[Choice(index=0, delta=ChoiceDelta(tool_calls=[tool_call_delta3]))],
+    )
+    chunk4 = ChatCompletionChunk(
+        id="chunk-id",
+        created=1,
+        model="fake",
+        object="chat.completion.chunk",
+        choices=[Choice(index=0, delta=ChoiceDelta(tool_calls=[tool_call_delta4]))],
+        usage=CompletionUsage(completion_tokens=1, prompt_tokens=1, total_tokens=2),
+    )
+
+    async def fake_stream() -> AsyncIterator[ChatCompletionChunk]:
+        for c in (chunk1, chunk2, chunk3, chunk4):
+            yield c
+
+    async def patched_fetch_response(self, *args, **kwargs):
+        resp = Response(
+            id="resp-id",
+            created_at=0,
+            model="fake-model",
+            object="response",
+            output=[],
+            tool_choice="none",
+            tools=[],
+            parallel_tool_calls=False,
+        )
+        return resp, fake_stream()
+
+    monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", patched_fetch_response)
+    model = OpenAIProvider(use_responses=False).get_model("gpt-4")
+    output_events = []
+    async for event in model.stream_response(
+        system_instructions=None,
+        input="",
+        model_settings=ModelSettings(),
+        tools=[],
+        output_schema=None,
+        handoffs=[],
+        tracing=ModelTracing.DISABLED,
+        previous_response_id=None,
+        prompt=None,
+    ):
+        output_events.append(event)
+
+    # Extract events by type
+    created_events = [e for e in output_events if e.type == "response.created"]
+    output_item_added_events = [e for e in output_events if e.type == "response.output_item.added"]
+    function_args_delta_events = [
+        e for e in output_events if e.type == "response.function_call_arguments.delta"
+    ]
+    output_item_done_events = [e for e in output_events if e.type == "response.output_item.done"]
+    completed_events = [e for e in output_events if e.type == "response.completed"]
+
+    # Verify event structure
+    assert len(created_events) == 1
+    assert len(output_item_added_events) == 1
+    assert len(function_args_delta_events) == 3  # Three incremental argument chunks
+    assert len(output_item_done_events) == 1
+    assert len(completed_events) == 1
+
+    # Verify the function call started as soon as we had name and ID
+    added_event = output_item_added_events[0]
+    assert isinstance(added_event.item, ResponseFunctionToolCall)
+    assert added_event.item.name == "write_file"
+    assert added_event.item.call_id == "tool-call-123"
+    assert added_event.item.arguments == ""  # Should be empty at start
+
+    # Verify real-time argument streaming
+    expected_deltas = ['{"filename": "', 'test.py", "content": "', 'print(hello)"}']
+    for i, delta_event in enumerate(function_args_delta_events):
+        assert delta_event.delta == expected_deltas[i]
+        assert delta_event.item_id == "__fake_id__"  # FAKE_RESPONSES_ID
+        assert delta_event.output_index == 0
+
+    # Verify completion event has full arguments
+    done_event = output_item_done_events[0]
+    assert isinstance(done_event.item, ResponseFunctionToolCall)
+    assert done_event.item.name == "write_file"
+    assert done_event.item.arguments == '{"filename": "test.py", "content": "print(hello)"}'
+
+    # Verify final response
+    completed_event = completed_events[0]
+    function_call_output = completed_event.response.output[0]
+    assert isinstance(function_call_output, ResponseFunctionToolCall)
+    assert function_call_output.name == "write_file"
+    assert function_call_output.arguments == '{"filename": "test.py", "content": "print(hello)"}'
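One detail worth spelling out before the docs patch: the tests above see `output_index == 0` because nothing precedes the tool call in the simulated response. With earlier output items the handler shifts the index. A small sketch of that arithmetic, under an assumed response state containing a reasoning item, a text message, and one already-streaming tool call (a hypothetical scenario, mirroring the counting logic in the handler hunk):

```python
# Illustrative recomputation of function_call_starting_index (hypothetical state).
reasoning_present, text_present, refusal_present = True, True, False
already_streaming = {0: True}  # tc_delta.index -> streaming flag

starting_index = 0
starting_index += 1 if reasoning_present else 0  # reasoning item occupies an index
starting_index += 1 if text_present else 0       # text message occupies an index
starting_index += 1 if refusal_present else 0    # a refusal would occupy one too
starting_index += sum(1 for s in already_streaming.values() if s)
assert starting_index == 3  # the next streamed call is announced at output_index 3
```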
From e9b4cece40d0f911b0d7b0eee1e4859223d42978 Mon Sep 17 00:00:00 2001
From: devtalker
Date: Thu, 3 Jul 2025 18:47:28 +0800
Subject: [PATCH 2/4] docs: add documentation for real-time streaming of function call arguments

---
 docs/streaming.md | 67 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 67 insertions(+)

diff --git a/docs/streaming.md b/docs/streaming.md
index b2c7c095d..f4aa0446b 100644
--- a/docs/streaming.md
+++ b/docs/streaming.md
@@ -31,6 +31,73 @@ if __name__ == "__main__":
     asyncio.run(main())
 ```
 
+### Function call argument streaming
+
+Function call arguments are now streamed in real-time as they are generated, providing immediate feedback during large parameter generation. This is especially useful for scenarios like code generation, API request building, or complex configuration creation.
+
+```python
+import asyncio
+from openai.types.responses import ResponseFunctionCallArgumentsDeltaEvent
+from agents import Agent, Runner, function_tool
+
+@function_tool
+def write_file(filename: str, content: str) -> str:
+    """Write content to a file."""
+    return f"File {filename} written successfully"
+
+async def main():
+    agent = Agent(
+        name="CodeGenerator",
+        instructions="You are a helpful coding assistant. When asked to create files, use the write_file tool.",
+        tools=[write_file],
+    )
+
+    result = Runner.run_streamed(
+        agent,
+        input="Create a Python script that prints 'Hello, World!' and saves it as hello.py"
+    )
+
+    function_name = None
+    current_arguments = ""
+
+    async for event in result.stream_events():
+        if event.type == "raw_response_event":
+            # Function call started
+            if event.data.type == "response.output_item.added":
+                if hasattr(event.data.item, 'name'):
+                    function_name = event.data.item.name
+                    print(f"šŸ“ž Function call streaming started: {function_name}()")
+                    print("šŸ“ Arguments building...")
+
+            # Real-time argument streaming
+            elif isinstance(event.data, ResponseFunctionCallArgumentsDeltaEvent):
+                current_arguments += event.data.delta
+                print(f" + {event.data.delta}", end="", flush=True)
+
+            # Function call completed
+            elif event.data.type == "response.output_item.done":
+                if hasattr(event.data.item, 'name'):
+                    print(f"\nāœ… Function call streaming completed: {function_name}")
+                    print(f"šŸ”§ Final arguments: {current_arguments}")
+
+    print(f"\nšŸŽ‰ Result: {result.final_output}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
+```
+
+This will show the function arguments being built incrementally:
+```
+šŸ“ž Function call streaming started: write_file()
+šŸ“ Arguments building...
+ + {"filename": "
+ + hello.py", "content": "
+ + print('Hello, World!')"}
+āœ… Function call streaming completed: write_file
+šŸ”§ Final arguments: {"filename": "hello.py", "content": "print('Hello, World!')"}
+```
+
 ## Run item events and agent events
 
 [`RunItemStreamEvent`][agents.stream_events.RunItemStreamEvent]s are higher level events. They inform you when an item has been fully generated. This allows you to push progress updates at the level of "message generated", "tool ran", etc, instead of each token. Similarly, [`AgentUpdatedStreamEvent`][agents.stream_events.AgentUpdatedStreamEvent] gives you updates when the current agent changes (e.g. as the result of a handoff).
From 3ee4d6310fbe9d64b5033269bdf331e3a043f0be Mon Sep 17 00:00:00 2001
From: devtalker
Date: Mon, 7 Jul 2025 15:12:15 +0800
Subject: [PATCH 3/4] refactor: improve function call detection in streaming examples

---
 docs/streaming.md                           | 2 +-
 examples/basic/stream_function_call_args.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/streaming.md b/docs/streaming.md
index f4aa0446b..e55dcf0a9 100644
--- a/docs/streaming.md
+++ b/docs/streaming.md
@@ -64,7 +64,7 @@ async def main():
         if event.type == "raw_response_event":
             # Function call started
             if event.data.type == "response.output_item.added":
-                if hasattr(event.data.item, 'name'):
+                if getattr(event.data.item, "type", None) == "function_call":
                     function_name = event.data.item.name
                     print(f"šŸ“ž Function call streaming started: {function_name}()")
                     print("šŸ“ Arguments building...")
diff --git a/examples/basic/stream_function_call_args.py b/examples/basic/stream_function_call_args.py
index 3c9c80274..133edc8b2 100644
--- a/examples/basic/stream_function_call_args.py
+++ b/examples/basic/stream_function_call_args.py
@@ -61,7 +61,7 @@ def write_file(filename: str, content: str) -> str:
         if event.type == "raw_response_event":
             # Function call started
             if event.data.type == "response.output_item.added":
-                if hasattr(event.data.item, 'name'):
+                if getattr(event.data.item, "type", None) == "function_call":
                     function_name = event.data.item.name
                     print(f"šŸ“ž Function call streaming started: {function_name}()")
                     print("šŸ“ Arguments building...")
@@ -148,7 +148,7 @@ def add_readme(project_name: str, description: str) -> str:
         if event.type == "raw_response_event":
             # Function call started
             if event.data.type == "response.output_item.added":
-                if hasattr(event.data.item, 'name') and hasattr(event.data.item, 'call_id'):
+                if getattr(event.data.item, "type", None) == "function_call":
                     output_index = event.data.output_index
                     function_name = event.data.item.name
                     call_id = event.data.item.call_id

From 73cd8b4332fca5a2beabf7f628c48c91535f6a0a Mon Sep 17 00:00:00 2001
From: devtalker
Date: Mon, 7 Jul 2025 15:30:27 +0800
Subject: [PATCH 4/4] fix: resolve mypy type errors in function call streaming examples

---
 docs/streaming.md                           | 2 +-
 examples/basic/stream_function_call_args.py | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/docs/streaming.md b/docs/streaming.md
index e55dcf0a9..7b5e11f34 100644
--- a/docs/streaming.md
+++ b/docs/streaming.md
@@ -65,7 +65,7 @@ async def main():
             # Function call started
             if event.data.type == "response.output_item.added":
                 if getattr(event.data.item, "type", None) == "function_call":
-                    function_name = event.data.item.name
+                    function_name = getattr(event.data.item, "name", "unknown")
                     print(f"šŸ“ž Function call streaming started: {function_name}()")
                     print("šŸ“ Arguments building...")
diff --git a/examples/basic/stream_function_call_args.py b/examples/basic/stream_function_call_args.py
index 133edc8b2..a8b8765b6 100644
--- a/examples/basic/stream_function_call_args.py
+++ b/examples/basic/stream_function_call_args.py
@@ -62,7 +62,7 @@ def write_file(filename: str, content: str) -> str:
             # Function call started
             if event.data.type == "response.output_item.added":
                 if getattr(event.data.item, "type", None) == "function_call":
-                    function_name = event.data.item.name
+                    function_name = getattr(event.data.item, "name", "unknown")
                     print(f"šŸ“ž Function call streaming started: {function_name}()")
                     print("šŸ“ Arguments building...")
@@ -150,8 +150,8 @@ def add_readme(project_name: str, description: str) -> str:
             if event.data.type == "response.output_item.added":
                 if getattr(event.data.item, "type", None) == "function_call":
                     output_index = event.data.output_index
-                    function_name = event.data.item.name
-                    call_id = event.data.item.call_id
+                    function_name = getattr(event.data.item, "name", "unknown")
+                    call_id = getattr(event.data.item, "call_id", "unknown")
 
                     function_calls[call_id] = {
                         'name': function_name,
@@ -178,7 +178,7 @@ def add_readme(project_name: str, description: str) -> str:
         elif event.data.type == "response.output_item.done":
             if hasattr(event.data.item, 'call_id'):
                 output_index = event.data.output_index
-                call_id = event.data.item.call_id
+                call_id = getattr(event.data.item, "call_id", "unknown")
 
                 if call_id in function_calls:
                     function_info = function_calls[call_id]
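Taken together, patches 3 and 4 converge on one pattern: `event.data.item` is a union of output item types (messages, reasoning items, function calls), so the examples first probe the discriminating `type` field and then read attributes through `getattr` with a default, which sidesteps mypy's union-attr errors. A minimal sketch of that access pattern (the helper name is ours, not the SDK's):

```python
from typing import Any


def describe_function_call(item: Any) -> str | None:
    """Union-safe access pattern used by the examples after PATCH 3/4."""
    if getattr(item, "type", None) != "function_call":
        return None  # messages, reasoning items, etc. are skipped
    name = getattr(item, "name", "unknown")        # default keeps mypy satisfied
    call_id = getattr(item, "call_id", "unknown")  # same for call_id
    return f"{name}() [#{call_id}]"
```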