diff --git a/README.md b/README.md index 84eb893..eb7e3ed 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,8 @@ It implements the Model Context Protocol specification, handling model context r - Manages tool registration and invocation - Supports prompt registration and execution - Supports resource registration and retrieval +- Supports stdio & Streamable HTTP (including SSE) transports +- Supports notifications for list changes (tools, prompts, resources) ### Supported Methods - `initialize` - Initializes the protocol and returns server capabilities @@ -45,20 +47,53 @@ It implements the Model Context Protocol specification, handling model context r - `resources/read` - Retrieves a specific resource by name - `resources/templates/list` - Lists all registered resource templates and their schemas +### Notifications + +The server supports sending notifications to clients when lists of tools, prompts, or resources change. This enables real-time updates without polling. + +#### Notification Methods + +The server provides three notification methods: +- `notify_tools_list_changed()` - Send a notification when the tools list changes +- `notify_prompts_list_changed()` - Send a notification when the prompts list changes +- `notify_resources_list_changed()` - Send a notification when the resources list changes + +#### Notification Format + +Notifications follow the JSON-RPC 2.0 specification and use these method names: +- `notifications/tools/list_changed` +- `notifications/prompts/list_changed` +- `notifications/resources/list_changed` + +#### Transport Support + +- **HTTP Transport**: Notifications are sent as Server-Sent Events (SSE) to all connected sessions +- **Stdio Transport**: Notifications are sent as JSON-RPC 2.0 messages to stdout + +#### Usage Example + +```ruby +server = MCP::Server.new(name: "my_server") +transport = MCP::Transports::HTTP.new(server) +server.transport = transport + +# When tools change, notify clients +server.define_tool(name: "new_tool") { |**args| { result: "ok" } } +server.notify_tools_list_changed() +``` + ### Unsupported Features ( to be implemented in future versions ) -- Notifications - Log Level - Resource subscriptions - Completions -- Complete StreamableHTTP implementation with streaming responses ### Usage #### Rails Controller When added to a Rails controller on a route that handles POST requests, your server will be compliant with non-streaming -[StreamableHTTP](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) transport +[Streamable HTTP](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) transport requests. You can use the `Server#handle_json` method to handle requests. diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..6d39e28 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,174 @@ +# MCP Ruby Examples + +This directory contains examples of how to use the Model Context Protocol (MCP) Ruby library. + +## Available Examples + +### 1. STDIO Server (`stdio_server.rb`) +A simple server that communicates over standard input/output. This is useful for desktop applications and command-line tools. + +**Usage:** +```bash +ruby examples/stdio_server.rb +{"jsonrpc":"2.0","id":0,"method":"tools/list"} +``` + +### 2. HTTP Server (`http_server.rb`) +A standalone HTTP server built with Rack that implements the MCP Streamable HTTP transport protocol. This demonstrates how to create a web-based MCP server with session management and Server-Sent Events (SSE) support. + +**Features:** +- HTTP transport with Server-Sent Events (SSE) for streaming +- Session management with unique session IDs +- Example tools, prompts, and resources +- JSON-RPC 2.0 protocol implementation +- Full MCP protocol compliance + +**Usage:** +```bash +ruby examples/http_server.rb +``` + +The server will start on `http://localhost:9292` and provide: +- **Tools**: + - `ExampleTool` - adds two numbers + - `echo` - echoes back messages +- **Prompts**: `ExamplePrompt` - echoes back arguments as a prompt +- **Resources**: `test_resource` - returns example content + +### 3. HTTP Client Example (`http_client.rb`) +A client that demonstrates how to interact with the HTTP server using all MCP protocol methods. + +**Usage:** +1. Start the HTTP server in one terminal: + ```bash + ruby examples/http_server.rb + ``` + +2. Run the client example in another terminal: + ```bash + ruby examples/http_client.rb + ``` + +The client will demonstrate: +- Session initialization +- Ping requests +- Listing and calling tools +- Listing and getting prompts +- Listing and reading resources +- Session cleanup + +### 4. SSE Test Server (`sse_test_server.rb`) +A specialized HTTP server designed to test and demonstrate Server-Sent Events (SSE) functionality in the MCP protocol. + +**Features:** +- Tools specifically designed to trigger SSE notifications +- Real-time progress updates and notifications +- Detailed SSE-specific logging + +**Available Tools:** +- `NotificationTool` - Send custom SSE notifications with optional delays +- `echo` - Simple echo tool for basic testing + +**Usage:** +```bash +ruby examples/sse_test_server.rb +``` + +The server will start on `http://localhost:9393` and provide detailed instructions for testing SSE functionality. + +### 5. SSE Test Client (`sse_test_client.rb`) +An interactive client that connects to the SSE stream and provides a menu-driven interface for testing SSE functionality. + +**Features:** +- Automatic SSE stream connection +- Interactive menu for triggering various SSE events +- Real-time display of received SSE notifications +- Session management + +**Usage:** +1. Start the SSE test server in one terminal: + ```bash + ruby examples/sse_test_server.rb + ``` + +2. Run the SSE test client in another terminal: + ```bash + ruby examples/sse_test_client.rb + ``` + +The client will: +- Initialize a session automatically +- Connect to the SSE stream +- Provide an interactive menu to trigger notifications +- Display all received SSE events in real-time + +### Testing SSE with cURL + +You can also test SSE functionality manually using cURL: + +1. Initialize a session: +```bash +SESSION_ID=$(curl -D - -s -o /dev/null -X POST http://localhost:9393 \ + -H "Content-Type: application/json" \ + -d '{"jsonrpc":"2.0","method":"initialize","id":1,"params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"curl-test","version":"1.0"}}}' |grep -i "Mcp-Session-Id:" | cut -d' ' -f2- | tr -d '\r') +``` + +2. Connect to SSE stream (in one terminal): +```bash +curl -N -H "Mcp-Session-Id: $SESSION_ID" http://localhost:9393 +``` + +3. Trigger notifications (in another terminal): +```bash +# Send immediate notification +curl -X POST http://localhost:9393 \ + -H "Content-Type: application/json" \ + -H "Mcp-Session-Id: $SESSION_ID" \ + -d '{"jsonrpc":"2.0","method":"tools/call","id":2,"params":{"name":"notification_tool","arguments":{"message":"Hello from cURL!"}}}' +``` + +## Streamable HTTP Transport Details + +### Protocol Flow + +The HTTP server implements the MCP Streamable HTTP transport protocol: + +1. **Initialize Session**: + - Client sends POST request with `initialize` method + - Server responds with session ID in `Mcp-Session-Id` header + +2. **Establish SSE Connection** (optional): + - Client sends GET request with `Mcp-Session-Id` header + - Server establishes Server-Sent Events stream for notifications + +3. **Send Requests**: + - Client sends POST requests with JSON-RPC 2.0 format + - Server processes and responds with results + +4. **Close Session**: + - Client sends DELETE request with `Mcp-Session-Id` header + +### Example cURL Commands + +Initialize a session: +```bash +curl -X POST http://localhost:9292 \ + -H "Content-Type: application/json" \ + -d '{"jsonrpc":"2.0","method":"initialize","id":1,"params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}' +``` + +List tools (using the session ID from initialization): +```bash +curl -X POST http://localhost:9292 \ + -H "Content-Type: application/json" \ + -H "Mcp-Session-Id: YOUR_SESSION_ID" \ + -d '{"jsonrpc":"2.0","method":"tools/list","id":2}' +``` + +Call a tool: +```bash +curl -X POST http://localhost:9292 \ + -H "Content-Type: application/json" \ + -H "Mcp-Session-Id: YOUR_SESSION_ID" \ + -d '{"jsonrpc":"2.0","method":"tools/call","id":3,"params":{"name":"ExampleTool","arguments":{"a":5,"b":3}}}' +``` diff --git a/examples/http_client.rb b/examples/http_client.rb new file mode 100644 index 0000000..992b9fd --- /dev/null +++ b/examples/http_client.rb @@ -0,0 +1,185 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require "net/http" +require "json" +require "uri" + +# Simple HTTP client example for interacting with the MCP HTTP server +class MCPHTTPClient + def initialize(base_url = "http://localhost:9292") + @base_url = base_url + @session_id = nil + end + + def send_request(method, params = nil, id = nil) + uri = URI(@base_url) + http = Net::HTTP.new(uri.host, uri.port) + + request = Net::HTTP::Post.new(uri.path.empty? ? "/" : uri.path) + request["Content-Type"] = "application/json" + request["Mcp-Session-Id"] = @session_id if @session_id + + body = { + jsonrpc: "2.0", + method: method, + params: params, + id: id || rand(10000), + }.compact + + request.body = body.to_json + + response = http.request(request) + + # Store session ID if provided + if response["Mcp-Session-Id"] + @session_id = response["Mcp-Session-Id"] + puts "Session ID: #{@session_id}" + end + + JSON.parse(response.body) + end + + def initialize_session + puts "=== Initializing session ===" + result = send_request("initialize", { + protocolVersion: "2024-11-05", + capabilities: {}, + clientInfo: { + name: "example_client", + version: "1.0", + }, + }) + puts "Response: #{JSON.pretty_generate(result)}" + puts + result + end + + def ping + puts "=== Sending ping ===" + result = send_request("ping") + puts "Response: #{JSON.pretty_generate(result)}" + puts + result + end + + def list_tools + puts "=== Listing tools ===" + result = send_request("tools/list") + puts "Response: #{JSON.pretty_generate(result)}" + puts + result + end + + def call_tool(name, arguments) + puts "=== Calling tool: #{name} ===" + result = send_request("tools/call", { + name: name, + arguments: arguments, + }) + puts "Response: #{JSON.pretty_generate(result)}" + puts + result + end + + def list_prompts + puts "=== Listing prompts ===" + result = send_request("prompts/list") + puts "Response: #{JSON.pretty_generate(result)}" + puts + result + end + + def get_prompt(name, arguments) + puts "=== Getting prompt: #{name} ===" + result = send_request("prompts/get", { + name: name, + arguments: arguments, + }) + puts "Response: #{JSON.pretty_generate(result)}" + puts + result + end + + def list_resources + puts "=== Listing resources ===" + result = send_request("resources/list") + puts "Response: #{JSON.pretty_generate(result)}" + puts + result + end + + def read_resource(uri) + puts "=== Reading resource: #{uri} ===" + result = send_request("resources/read", { + uri: uri, + }) + puts "Response: #{JSON.pretty_generate(result)}" + puts + result + end + + def close_session + return unless @session_id + + puts "=== Closing session ===" + uri = URI(@base_url) + http = Net::HTTP.new(uri.host, uri.port) + + request = Net::HTTP::Delete.new(uri.path.empty? ? "/" : uri.path) + request["Mcp-Session-Id"] = @session_id + + response = http.request(request) + result = JSON.parse(response.body) + puts "Response: #{JSON.pretty_generate(result)}" + puts + + @session_id = nil + result + end +end + +# Main script +if __FILE__ == $PROGRAM_NAME + puts "MCP HTTP Client Example" + puts "Make sure the HTTP server is running (ruby examples/http_server.rb)" + puts "=" * 50 + puts + + client = MCPHTTPClient.new + + begin + # Initialize session + client.initialize_session + + # Test ping + client.ping + + # List available tools + client.list_tools + + # Call the example_tool (note: snake_case name) + client.call_tool("example_tool", { a: 5, b: 3 }) + + # Call the echo tool + client.call_tool("echo", { message: "Hello from client!" }) + + # List prompts + client.list_prompts + + # Get a prompt (note: snake_case name) + client.get_prompt("example_prompt", { message: "This is a test message" }) + + # List resources + client.list_resources + + # Read a resource + client.read_resource("test_resource") + rescue => e + puts "Error: #{e.message}" + puts e.backtrace + ensure + # Clean up session + client.close_session + end +end diff --git a/examples/http_server.rb b/examples/http_server.rb new file mode 100644 index 0000000..811775c --- /dev/null +++ b/examples/http_server.rb @@ -0,0 +1,169 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +$LOAD_PATH.unshift(File.expand_path("../lib", __dir__)) +require "mcp" +require "mcp/server/transports/streamable_http_transport" +require "rack" +require "rackup" +require "json" +require "logger" + +# Create a simple tool +class ExampleTool < MCP::Tool + description "A simple example tool that adds two numbers" + input_schema( + properties: { + a: { type: "number" }, + b: { type: "number" }, + }, + required: ["a", "b"], + ) + + class << self + def call(a:, b:) + MCP::Tool::Response.new([{ + type: "text", + text: "The sum of #{a} and #{b} is #{a + b}", + }]) + end + end +end + +# Create a simple prompt +class ExamplePrompt < MCP::Prompt + description "A simple example prompt that echoes back its arguments" + arguments [ + MCP::Prompt::Argument.new( + name: "message", + description: "The message to echo back", + required: true, + ), + ] + + class << self + def template(args, server_context:) + MCP::Prompt::Result.new( + messages: [ + MCP::Prompt::Message.new( + role: "user", + content: MCP::Content::Text.new(args[:message]), + ), + ], + ) + end + end +end + +# Set up the server +server = MCP::Server.new( + name: "example_http_server", + tools: [ExampleTool], + prompts: [ExamplePrompt], + resources: [ + MCP::Resource.new( + uri: "test_resource", + name: "Test resource", + description: "Test resource that echoes back the uri as its content", + mime_type: "text/plain", + ), + ], +) + +server.define_tool( + name: "echo", + description: "A simple example tool that echoes back its arguments", + input_schema: { properties: { message: { type: "string" } }, required: ["message"] }, +) do |message:| + MCP::Tool::Response.new( + [ + { + type: "text", + text: "Hello from echo tool! Message: #{message}", + }, + ], + ) +end + +server.resources_read_handler do |params| + [{ + uri: params[:uri], + mimeType: "text/plain", + text: "Hello from HTTP server resource!", + }] +end + +# Create the Streamable HTTP transport +transport = MCP::Server::Transports::StreamableHTTPTransport.new(server) +server.transport = transport + +# Create a logger for MCP-specific logging +mcp_logger = Logger.new($stdout) +mcp_logger.formatter = proc do |_severity, _datetime, _progname, msg| + "[MCP] #{msg}\n" +end + +# Create a Rack application with logging +app = proc do |env| + request = Rack::Request.new(env) + + # Log MCP-specific details for POST requests + if request.post? + body = request.body.read + request.body.rewind + begin + parsed_body = JSON.parse(body) + mcp_logger.info("Request: #{parsed_body["method"]} (id: #{parsed_body["id"]})") + mcp_logger.debug("Request body: #{JSON.pretty_generate(parsed_body)}") + rescue JSON::ParserError + mcp_logger.warn("Request body (raw): #{body}") + end + end + + # Handle the request + response = transport.handle_request(request) + + # Log the MCP response details + _, _, body = response + if body.is_a?(Array) && !body.empty? && body.first + begin + parsed_response = JSON.parse(body.first) + if parsed_response["error"] + mcp_logger.error("Response error: #{parsed_response["error"]["message"]}") + else + mcp_logger.info("Response: #{parsed_response["result"] ? "success" : "empty"} (id: #{parsed_response["id"]})") + end + mcp_logger.debug("Response body: #{JSON.pretty_generate(parsed_response)}") + rescue JSON::ParserError + mcp_logger.warn("Response body (raw): #{body}") + end + end + + response +end + +# Wrap the app with Rack middleware +rack_app = Rack::Builder.new do + # Use CommonLogger for standard HTTP request logging + use(Rack::CommonLogger, Logger.new($stdout)) + + # Add other useful middleware + use(Rack::ShowExceptions) + + run(app) +end + +# Start the server +puts "Starting MCP HTTP server on http://localhost:9292" +puts "Use POST requests to initialize and send JSON-RPC commands" +puts "Example initialization:" +puts ' curl -X POST http://localhost:9292 -H "Content-Type: application/json" -d \'{"jsonrpc":"2.0","method":"initialize","id":1,"params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}\'' +puts "" +puts "The server will return a session ID in the Mcp-Session-Id header." +puts "Use this session ID for subsequent requests." +puts "" +puts "Press Ctrl+C to stop the server" + +# Run the server +# Use Rackup to run the server +Rackup::Handler.get("puma").run(rack_app, Port: 9292, Host: "localhost") diff --git a/examples/streaming_http_test_client.rb b/examples/streaming_http_test_client.rb new file mode 100644 index 0000000..cd24ab3 --- /dev/null +++ b/examples/streaming_http_test_client.rb @@ -0,0 +1,207 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require "net/http" +require "uri" +require "json" +require "logger" + +# Logger for client operations +logger = Logger.new($stdout) +logger.formatter = proc do |severity, datetime, _progname, msg| + "[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" +end + +# Server configuration +SERVER_URL = "http://localhost:9393/mcp" +PROTOCOL_VERSION = "2024-11-05" + +# Helper method to make JSON-RPC requests +def make_request(session_id, method, params = {}, id = nil) + uri = URI(SERVER_URL) + http = Net::HTTP.new(uri.host, uri.port) + + request = Net::HTTP::Post.new(uri) + request["Content-Type"] = "application/json" + request["Mcp-Session-Id"] = session_id if session_id + + body = { + jsonrpc: "2.0", + method: method, + params: params, + id: id || SecureRandom.uuid, + } + + request.body = body.to_json + response = http.request(request) + + { + status: response.code, + headers: response.to_hash, + body: JSON.parse(response.body), + } +rescue => e + { error: e.message } +end + +# Connect to SSE stream +def connect_sse(session_id, logger) + uri = URI(SERVER_URL) + + logger.info("Connecting to SSE stream...") + + Net::HTTP.start(uri.host, uri.port) do |http| + request = Net::HTTP::Get.new(uri) + request["Mcp-Session-Id"] = session_id + request["Accept"] = "text/event-stream" + request["Cache-Control"] = "no-cache" + + http.request(request) do |response| + if response.code == "200" + logger.info("SSE stream connected successfully") + + response.read_body do |chunk| + chunk.split("\n").each do |line| + if line.start_with?("data: ") + data = line[6..-1] + begin + logger.info("SSE data: #{data}") + rescue JSON::ParserError + logger.debug("Non-JSON SSE data: #{data}") + end + elsif line.start_with?(": ") + logger.debug("SSE keepalive received: #{line}") + end + end + end + else + logger.error("Failed to connect to SSE: #{response.code} #{response.message}") + end + end + end +rescue Interrupt + logger.info("SSE connection interrupted by user") +rescue => e + logger.error("SSE connection error: #{e.message}") +end + +# Main client flow +def main + logger = Logger.new($stdout) + logger.formatter = proc do |severity, datetime, _progname, msg| + "[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" + end + + puts "=== MCP SSE Test Client ===" + puts "" + + # Step 1: Initialize session + logger.info("Initializing session...") + + init_response = make_request( + nil, + "initialize", + { + protocolVersion: PROTOCOL_VERSION, + capabilities: {}, + clientInfo: { + name: "sse-test-client", + version: "1.0", + }, + }, + "init-1", + ) + + if init_response[:error] + logger.error("Failed to initialize: #{init_response[:error]}") + exit(1) + end + + session_id = init_response[:headers]["mcp-session-id"]&.first + + if session_id.nil? + logger.error("No session ID received") + exit(1) + end + + logger.info("Session initialized: #{session_id}") + logger.info("Server info: #{init_response[:body]["result"]["serverInfo"]}") + + # Step 2: Start SSE connection in a separate thread + sse_thread = Thread.new { connect_sse(session_id, logger) } + + # Give SSE time to connect + sleep(1) + + # Step 3: Interactive menu + loop do + puts "\n=== Available Actions ===" + puts "1. Send custom notification" + puts "2. Test echo" + puts "3. List tools" + puts "0. Exit" + puts "" + print("Choose an action: ") + + choice = gets.chomp + + case choice + when "1" + print("Enter notification message: ") + message = gets.chomp + print("Enter delay in seconds (0 for immediate): ") + delay = gets.chomp.to_f + + response = make_request( + session_id, + "tools/call", + { + name: "notification_tool", + arguments: { + message: message, + delay: delay, + }, + }, + ) + + if response[:body]["result"] + logger.info("Notification tool response: #{response[:body]["result"]["content"]}") + else + logger.error("Error: #{response[:body]["error"]}") + end + + when "2" + print("Enter message to echo: ") + message = gets.chomp + make_request(session_id, "tools/call", { name: "echo", arguments: { message: message } }) + + when "3" + make_request(session_id, "tools/list") + + when "0" + logger.info("Exiting...") + break + + else + puts "Invalid choice" + end + end + + # Clean up + sse_thread.kill if sse_thread.alive? + + # Close session + logger.info("Closing session...") + make_request(session_id, "close") + logger.info("Session closed") +rescue Interrupt + logger.info("Client interrupted by user") +rescue => e + logger.error("Client error: #{e.message}") + logger.error(e.backtrace.join("\n")) +end + +# Run the client +if __FILE__ == $PROGRAM_NAME + main +end diff --git a/examples/streaming_http_test_server.rb b/examples/streaming_http_test_server.rb new file mode 100644 index 0000000..2ff9d96 --- /dev/null +++ b/examples/streaming_http_test_server.rb @@ -0,0 +1,173 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +$LOAD_PATH.unshift(File.expand_path("../lib", __dir__)) +require "mcp" +require "mcp/server/transports/streamable_http_transport" +require "rack" +require "rackup" +require "json" +require "logger" + +# Create a logger for SSE-specific logging +sse_logger = Logger.new($stdout) +sse_logger.formatter = proc do |severity, datetime, _progname, msg| + "[SSE] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" +end + +# Tool that returns a response that will be sent via SSE if a stream is active +class NotificationTool < MCP::Tool + tool_name "notification_tool" + description "Returns a notification message that will be sent via SSE if stream is active" + input_schema( + properties: { + message: { type: "string", description: "Message to send via SSE" }, + delay: { type: "number", description: "Delay in seconds before returning (optional)" }, + }, + required: ["message"], + ) + + class << self + attr_accessor :logger + + def call(message:, delay: 0) + sleep(delay) if delay > 0 + + logger&.info("Returning notification message: #{message}") + + MCP::Tool::Response.new([{ + type: "text", + text: "Notification: #{message} (timestamp: #{Time.now.iso8601})", + }]) + end + end +end + +# Create the server +server = MCP::Server.new( + name: "sse_test_server", + tools: [NotificationTool], + prompts: [], + resources: [], +) + +# Set logger for tools +NotificationTool.logger = sse_logger + +# Add a simple echo tool for basic testing +server.define_tool( + name: "echo", + description: "Simple echo tool", + input_schema: { properties: { message: { type: "string" } }, required: ["message"] }, +) do |message:| + MCP::Tool::Response.new([{ type: "text", text: "Echo: #{message}" }]) +end + +# Create the Streamable HTTP transport +transport = MCP::Server::Transports::StreamableHTTPTransport.new(server) +server.transport = transport + +# Create a logger for MCP request/response logging +mcp_logger = Logger.new($stdout) +mcp_logger.formatter = proc do |_severity, _datetime, _progname, msg| + "[MCP] #{msg}\n" +end + +# Create the Rack application +app = proc do |env| + request = Rack::Request.new(env) + + # Log request details + if request.post? + body = request.body.read + request.body.rewind + begin + parsed_body = JSON.parse(body) + mcp_logger.info("Request: #{parsed_body["method"]} (id: #{parsed_body["id"]})") + + # Log SSE-specific setup + if parsed_body["method"] == "initialize" + sse_logger.info("New client initializing session") + end + rescue JSON::ParserError + mcp_logger.warn("Invalid JSON in request") + end + elsif request.get? + session_id = request.env["HTTP_MCP_SESSION_ID"] || + Rack::Utils.parse_query(request.env["QUERY_STRING"])["sessionId"] + sse_logger.info("SSE connection request for session: #{session_id}") + end + + # Handle the request + response = transport.handle_request(request) + + # Log response details + status, headers, body = response + if body.is_a?(Array) && !body.empty? && request.post? + begin + parsed_response = JSON.parse(body.first) + if parsed_response["error"] + mcp_logger.error("Response error: #{parsed_response["error"]["message"]}") + elsif parsed_response["accepted"] + # Response was sent via SSE + sse_logger.info("Response sent via SSE stream") + else + mcp_logger.info("Response: success (id: #{parsed_response["id"]})") + + # Log session ID for initialization + if headers["Mcp-Session-Id"] + sse_logger.info("Session created: #{headers["Mcp-Session-Id"]}") + end + end + rescue JSON::ParserError + mcp_logger.warn("Invalid JSON in response") + end + elsif request.get? && status == 200 + sse_logger.info("SSE stream established") + end + + response +end + +# Build the Rack application with middleware +rack_app = Rack::Builder.new do + use(Rack::CommonLogger, Logger.new($stdout)) + use(Rack::ShowExceptions) + run(app) +end + +# Print usage instructions +puts "=== MCP Streaming HTTP Test Server ===" +puts "" +puts "Starting server on http://localhost:9393" +puts "" +puts "Available Tools:" +puts "1. NotificationTool - Returns messages that are sent via SSE when stream is active" +puts "2. echo - Simple echo tool" +puts "" +puts "Testing SSE:" +puts "" +puts "1. Initialize session:" +puts ' curl -X POST http://localhost:9393 -H "Content-Type: application/json" \\' +puts ' -d \'{"jsonrpc":"2.0","method":"initialize","id":1,"params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"sse-test","version":"1.0"}}}\'' +puts "" +puts "2. Connect SSE stream (use the session ID from step 1):" +puts ' curl -N -H "Mcp-Session-Id: YOUR_SESSION_ID" http://localhost:9393' +puts "" +puts "3. In another terminal, test tools (responses will be sent via SSE if stream is active):" +puts "" +puts " Echo tool:" +puts ' curl -X POST http://localhost:9393 -H "Content-Type: application/json" -H "Mcp-Session-Id: YOUR_SESSION_ID" \\' +puts ' -d \'{"jsonrpc":"2.0","method":"tools/call","id":2,"params":{"name":"echo","arguments":{"message":"Hello SSE!"}}}\'' +puts "" +puts " Notification tool (with 2 second delay):" +puts ' curl -X POST http://localhost:9393 -H "Content-Type: application/json" -H "Mcp-Session-Id: YOUR_SESSION_ID" \\' +puts ' -d \'{"jsonrpc":"2.0","method":"tools/call","id":3,"params":{"name":"notification_tool","arguments":{"message":"Hello SSE!", "delay": 2}}}\'' +puts "" +puts "Note: When an SSE stream is active, tool responses will appear in the SSE stream and the POST request will return {\"accepted\": true}" +puts "" +puts "Press Ctrl+C to stop the server" +puts "" + +# Start the server +Rackup::Handler.get("puma").run(rack_app, Port: 9393, Host: "localhost") diff --git a/lib/mcp.rb b/lib/mcp.rb index eff9de8..84ccce4 100644 --- a/lib/mcp.rb +++ b/lib/mcp.rb @@ -13,6 +13,7 @@ require_relative "mcp/resource/embedded" require_relative "mcp/resource_template" require_relative "mcp/server" +require_relative "mcp/server/transports/streamable_http_transport" require_relative "mcp/server/transports/stdio_transport" require_relative "mcp/string_utils" require_relative "mcp/tool" diff --git a/lib/mcp/methods.rb b/lib/mcp/methods.rb index 4f0b0b2..9a2db26 100644 --- a/lib/mcp/methods.rb +++ b/lib/mcp/methods.rb @@ -21,6 +21,11 @@ module Methods SAMPLING_CREATE_MESSAGE = "sampling/createMessage" + # Notification methods + NOTIFICATIONS_TOOLS_LIST_CHANGED = "notifications/tools/list_changed" + NOTIFICATIONS_PROMPTS_LIST_CHANGED = "notifications/prompts/list_changed" + NOTIFICATIONS_RESOURCES_LIST_CHANGED = "notifications/resources/list_changed" + class MissingRequiredCapabilityError < StandardError attr_reader :method attr_reader :capability diff --git a/lib/mcp/server.rb b/lib/mcp/server.rb index 2c39fb0..d246121 100644 --- a/lib/mcp/server.rb +++ b/lib/mcp/server.rb @@ -24,7 +24,7 @@ def initialize(message, request, error_type: :internal_error, original_error: ni include Instrumentation attr_writer :capabilities - attr_accessor :name, :version, :tools, :prompts, :resources, :server_context, :configuration + attr_accessor :name, :version, :tools, :prompts, :resources, :server_context, :configuration, :transport def initialize( name: "model_context_protocol", @@ -35,7 +35,8 @@ def initialize( resource_templates: [], server_context: nil, configuration: nil, - capabilities: nil + capabilities: nil, + transport: nil ) @name = name @version = version @@ -68,6 +69,7 @@ def initialize( Methods::COMPLETION_COMPLETE => ->(_) {}, Methods::LOGGING_SET_LEVEL => ->(_) {}, } + @transport = transport end def capabilities @@ -96,6 +98,30 @@ def define_prompt(name: nil, description: nil, arguments: [], &block) @prompts[prompt.name_value] = prompt end + def notify_tools_list_changed + return unless @transport + + @transport.send_notification(Methods::NOTIFICATIONS_TOOLS_LIST_CHANGED) + rescue => e + report_exception(e, { notification: "tools_list_changed" }) + end + + def notify_prompts_list_changed + return unless @transport + + @transport.send_notification(Methods::NOTIFICATIONS_PROMPTS_LIST_CHANGED) + rescue => e + report_exception(e, { notification: "prompts_list_changed" }) + end + + def notify_resources_list_changed + return unless @transport + + @transport.send_notification(Methods::NOTIFICATIONS_RESOURCES_LIST_CHANGED) + rescue => e + report_exception(e, { notification: "resources_list_changed" }) + end + def resources_list_handler(&block) @capabilities.support_resources @handlers[Methods::RESOURCES_LIST] = block diff --git a/lib/mcp/server/transports/stdio_transport.rb b/lib/mcp/server/transports/stdio_transport.rb index d4aae27..702af08 100644 --- a/lib/mcp/server/transports/stdio_transport.rb +++ b/lib/mcp/server/transports/stdio_transport.rb @@ -37,6 +37,20 @@ def send_response(message) $stdout.puts(json_message) $stdout.flush end + + def send_notification(method, params = nil) + notification = { + jsonrpc: "2.0", + method: method, + } + notification[:params] = params if params + + send_response(notification) + true + rescue => e + MCP.configuration.exception_reporter.call(e, { error: "Failed to send notification" }) + false + end end end end diff --git a/lib/mcp/server/transports/streamable_http_transport.rb b/lib/mcp/server/transports/streamable_http_transport.rb new file mode 100644 index 0000000..4c91f4c --- /dev/null +++ b/lib/mcp/server/transports/streamable_http_transport.rb @@ -0,0 +1,289 @@ +# frozen_string_literal: true + +require_relative "../../transport" +require "json" +require "securerandom" + +module MCP + class Server + module Transports + class StreamableHTTPTransport < Transport + def initialize(server) + super + # { session_id => { stream: stream_object } + @sessions = {} + @mutex = Mutex.new + end + + def handle_request(request) + case request.env["REQUEST_METHOD"] + when "POST" + handle_post(request) + when "GET" + handle_get(request) + when "DELETE" + handle_delete(request) + else + [405, { "Content-Type" => "application/json" }, [{ error: "Method not allowed" }.to_json]] + end + end + + def close + @mutex.synchronize do + @sessions.each_key { |session_id| cleanup_session_unsafe(session_id) } + end + end + + def send_notification(notification, session_id: nil) + @mutex.synchronize do + if session_id + # Send to specific session + session = @sessions[session_id] + return false unless session && session[:stream] + + begin + send_to_stream(session[:stream], notification) + true + rescue IOError, Errno::EPIPE => e + MCP.configuration.exception_reporter.call( + e, + { session_id: session_id, error: "Failed to send notification" }, + ) + cleanup_session_unsafe(session_id) + false + end + else + # Broadcast to all connected SSE sessions + sent_count = 0 + failed_sessions = [] + + @sessions.each do |sid, session| + next unless session[:stream] + + begin + send_to_stream(session[:stream], notification) + sent_count += 1 + rescue IOError, Errno::EPIPE => e + MCP.configuration.exception_reporter.call( + e, + { session_id: sid, error: "Failed to send notification" }, + ) + failed_sessions << sid + end + end + + # Clean up failed sessions + failed_sessions.each { |sid| cleanup_session_unsafe(sid) } + + sent_count + end + end + end + + private + + def send_to_stream(stream, data) + message = data.is_a?(String) ? data : data.to_json + stream.write("data: #{message}\n\n") + stream.flush if stream.respond_to?(:flush) + end + + def send_ping_to_stream(stream) + stream.write(": ping #{Time.now.iso8601}\n\n") + stream.flush if stream.respond_to?(:flush) + end + + def handle_post(request) + body_string = request.body.read + session_id = extract_session_id(request) + + body = parse_request_body(body_string) + return body unless body.is_a?(Hash) # Error response + + if body["method"] == "initialize" + handle_initialization(body_string, body) + else + handle_regular_request(body_string, session_id) + end + rescue StandardError => e + ModelContextProtocol.configuration.exception_reporter.call(e, { request: body_string }) + [500, { "Content-Type" => "application/json" }, [{ error: "Internal server error" }.to_json]] + end + + def handle_get(request) + session_id = extract_session_id(request) + + return missing_session_id_response unless session_id + return session_not_found_response unless session_exists?(session_id) + + setup_sse_stream(session_id) + end + + def handle_delete(request) + session_id = request.env["HTTP_MCP_SESSION_ID"] + + return [ + 400, + { "Content-Type" => "application/json" }, + [{ error: "Missing session ID" }.to_json], + ] unless session_id + + cleanup_session(session_id) + [200, { "Content-Type" => "application/json" }, [{ success: true }.to_json]] + end + + def cleanup_session(session_id) + @mutex.synchronize do + cleanup_session_unsafe(session_id) + end + end + + def cleanup_session_unsafe(session_id) + session = @sessions[session_id] + return unless session + + begin + session[:stream]&.close + rescue + nil + end + @sessions.delete(session_id) + end + + def extract_session_id(request) + request.env["HTTP_MCP_SESSION_ID"] + end + + def parse_request_body(body_string) + JSON.parse(body_string) + rescue JSON::ParserError, TypeError + [400, { "Content-Type" => "application/json" }, [{ error: "Invalid JSON" }.to_json]] + end + + def handle_initialization(body_string, body) + session_id = SecureRandom.uuid + + @mutex.synchronize do + @sessions[session_id] = { + stream: nil, + } + end + + response = @server.handle_json(body_string) + + headers = { + "Content-Type" => "application/json", + "Mcp-Session-Id" => session_id, + } + + [200, headers, [response]] + end + + def handle_regular_request(body_string, session_id) + # If session ID is provided, but not in the sessions hash, return an error + if session_id && !@sessions.key?(session_id) + return [400, { "Content-Type" => "application/json" }, [{ error: "Invalid session ID" }.to_json]] + end + + response = @server.handle_json(body_string) + stream = get_session_stream(session_id) if session_id + + if stream + send_response_to_stream(stream, response, session_id) + else + [200, { "Content-Type" => "application/json" }, [response]] + end + end + + def get_session_stream(session_id) + @mutex.synchronize { @sessions[session_id]&.fetch(:stream, nil) } + end + + def send_response_to_stream(stream, response, session_id) + message = JSON.parse(response) + send_to_stream(stream, message) + [200, { "Content-Type" => "application/json" }, [{ accepted: true }.to_json]] + rescue IOError, Errno::EPIPE => e + ModelContextProtocol.configuration.exception_reporter.call( + e, + { session_id: session_id, error: "Stream closed during response" }, + ) + cleanup_session(session_id) + [200, { "Content-Type" => "application/json" }, [response]] + end + + def session_exists?(session_id) + @mutex.synchronize { @sessions.key?(session_id) } + end + + def missing_session_id_response + [400, { "Content-Type" => "application/json" }, [{ error: "Missing session ID" }.to_json]] + end + + def session_not_found_response + [404, { "Content-Type" => "application/json" }, [{ error: "Session not found" }.to_json]] + end + + def setup_sse_stream(session_id) + body = create_sse_body(session_id) + + headers = { + "Content-Type" => "text/event-stream", + "Cache-Control" => "no-cache", + "Connection" => "keep-alive", + } + + [200, headers, body] + end + + def create_sse_body(session_id) + proc do |stream| + store_stream_for_session(session_id, stream) + start_keepalive_thread(session_id) + end + end + + def store_stream_for_session(session_id, stream) + @mutex.synchronize do + if @sessions[session_id] + @sessions[session_id][:stream] = stream + else + stream.close + end + end + end + + def start_keepalive_thread(session_id) + Thread.new do + while session_active_with_stream?(session_id) + sleep(30) + send_keepalive_ping(session_id) + end + rescue StandardError => e + MCP.configuration.exception_reporter.call(e, { session_id: session_id }) + ensure + cleanup_session(session_id) + end + end + + def session_active_with_stream?(session_id) + @mutex.synchronize { @sessions.key?(session_id) && @sessions[session_id][:stream] } + end + + def send_keepalive_ping(session_id) + @mutex.synchronize do + if @sessions[session_id] && @sessions[session_id][:stream] + send_ping_to_stream(@sessions[session_id][:stream]) + end + end + rescue IOError, Errno::EPIPE => e + MCP.configuration.exception_reporter.call( + e, + { session_id: session_id, error: "Stream closed" }, + ) + raise # Re-raise to exit the keepalive loop + end + end + end + end +end diff --git a/lib/mcp/transport.rb b/lib/mcp/transport.rb index 7e4b63e..a995c3a 100644 --- a/lib/mcp/transport.rb +++ b/lib/mcp/transport.rb @@ -2,32 +2,44 @@ module MCP class Transport + # Initialize the transport with the server instance def initialize(server) @server = server end + # Send a response to the client def send_response(response) raise NotImplementedError, "Subclasses must implement send_response" end + # Open the transport connection def open raise NotImplementedError, "Subclasses must implement open" end + # Close the transport connection def close raise NotImplementedError, "Subclasses must implement close" end - private + # Handle a JSON request + # Returns a response that should be sent back to the client + def handle_json_request(request) + response = @server.handle_json(request) + send_response(response) if response + end + # Handle an incoming request + # Returns a response that should be sent back to the client def handle_request(request) response = @server.handle(request) send_response(response) if response end - def handle_json_request(request) - response = @server.handle_json(request) - send_response(response) if response + # Send a notification to the client + # Returns true if the notification was sent successfully + def send_notification(method, params = nil) + raise NotImplementedError, "Subclasses must implement send_notification" end end end diff --git a/mcp.gemspec b/mcp.gemspec index c598f45..1228881 100644 --- a/mcp.gemspec +++ b/mcp.gemspec @@ -29,5 +29,8 @@ Gem::Specification.new do |spec| spec.add_dependency("json_rpc_handler", "~> 0.1") spec.add_development_dependency("activesupport") + spec.add_development_dependency("puma", ">= 5.0.0") + spec.add_development_dependency("rack", ">= 2.0.0") + spec.add_development_dependency("rackup", ">= 2.1.0") spec.add_development_dependency("sorbet-static-and-runtime") end diff --git a/test/mcp/server/transports/stdio_notification_integration_test.rb b/test/mcp/server/transports/stdio_notification_integration_test.rb new file mode 100644 index 0000000..88b494e --- /dev/null +++ b/test/mcp/server/transports/stdio_notification_integration_test.rb @@ -0,0 +1,249 @@ +# frozen_string_literal: true + +require "test_helper" +require "json" +require "mcp/server" +require "mcp/server/transports/stdio_transport" + +module MCP + class Server + module Transports + class StdioNotificationIntegrationTest < ActiveSupport::TestCase + class MockIO + attr_reader :output + + def initialize + @output = [] + @closed = false + end + + def puts(message) + @output << message + end + + def write(message) + @output << message + message.length + end + + def gets + nil # Simulate end of input + end + + def set_encoding(encoding) # rubocop:disable Naming/AccessorMethodName + # Mock implementation + end + + def flush + # Mock implementation + end + + def close + @closed = true + end + + def closed? + @closed + end + end + + setup do + @original_stdout = $stdout + @original_stdin = $stdin + + @mock_stdout = MockIO.new + @mock_stdin = MockIO.new + + $stdout = @mock_stdout + $stdin = @mock_stdin + + @server = Server.new( + name: "test_server", + tools: [], + prompts: [], + resources: [], + ) + @transport = StdioTransport.new(@server) + @server.transport = @transport + end + + teardown do + $stdout = @original_stdout + $stdin = @original_stdin + end + + test "server notification methods send JSON-RPC notifications through StdioTransport" do + # Test tools notification + @server.notify_tools_list_changed + + # Test prompts notification + @server.notify_prompts_list_changed + + # Test resources notification + @server.notify_resources_list_changed + + # Check the notifications were sent + assert_equal 3, @mock_stdout.output.size + + # Parse and verify each notification + notifications = @mock_stdout.output.map { |msg| JSON.parse(msg) } + + assert_equal "2.0", notifications[0]["jsonrpc"] + assert_equal Methods::NOTIFICATIONS_TOOLS_LIST_CHANGED, notifications[0]["method"] + assert_nil notifications[0]["params"] + + assert_equal "2.0", notifications[1]["jsonrpc"] + assert_equal Methods::NOTIFICATIONS_PROMPTS_LIST_CHANGED, notifications[1]["method"] + assert_nil notifications[1]["params"] + + assert_equal "2.0", notifications[2]["jsonrpc"] + assert_equal Methods::NOTIFICATIONS_RESOURCES_LIST_CHANGED, notifications[2]["method"] + assert_nil notifications[2]["params"] + end + + test "notifications include params when provided" do + # Test the transport's send_notification directly with params + result = @transport.send_notification("test/notification", { data: "test_value" }) + + assert result + assert_equal 1, @mock_stdout.output.size + + notification = JSON.parse(@mock_stdout.output.first) + assert_equal "2.0", notification["jsonrpc"] + assert_equal "test/notification", notification["method"] + assert_equal({ "data" => "test_value" }, notification["params"]) + end + + test "server continues to work when stdout is closed" do + # Close stdout + @mock_stdout.close + + # Server notifications should not raise errors + assert_nothing_raised do + @server.notify_tools_list_changed + @server.notify_prompts_list_changed + @server.notify_resources_list_changed + end + end + + test "notifications work with dynamic tool additions" do + # Define a new tool + @server.define_tool( + name: "dynamic_tool", + description: "A dynamically added tool", + ) do |**_args| + { result: "success" } + end + + # Clear previous output + @mock_stdout.output.clear + + # Manually trigger notification + @server.notify_tools_list_changed + + # Check the notification was sent + assert_equal 1, @mock_stdout.output.size + + notification = JSON.parse(@mock_stdout.output.first) + assert_equal Methods::NOTIFICATIONS_TOOLS_LIST_CHANGED, notification["method"] + + # Verify the tool was added to the server + assert @server.tools.key?("dynamic_tool") + end + + test "notifications are properly formatted JSON-RPC 2.0 messages" do + # Send a notification + @server.notify_prompts_list_changed + + # Verify format + assert_equal 1, @mock_stdout.output.size + output = @mock_stdout.output.first + + # Should be valid JSON + notification = JSON.parse(output) + + # Should have required JSON-RPC 2.0 fields + assert_equal "2.0", notification["jsonrpc"] + assert notification.key?("method") + refute notification.key?("id") # Notifications should not have an id + + # Method should be the expected notification type + assert_equal Methods::NOTIFICATIONS_PROMPTS_LIST_CHANGED, notification["method"] + end + + test "multiple notifications are sent as separate JSON messages" do + # Send multiple notifications rapidly + 5.times do + @server.notify_tools_list_changed + end + + # Each should be a separate JSON message + assert_equal 5, @mock_stdout.output.size + + # All should be parseable as JSON + @mock_stdout.output.each do |msg| + notification = JSON.parse(msg) + assert_equal "2.0", notification["jsonrpc"] + assert_equal Methods::NOTIFICATIONS_TOOLS_LIST_CHANGED, notification["method"] + end + end + + test "transport handles errors gracefully" do + # Create a stdout that raises errors + error_stdout = Class.new(MockIO) do + def puts(message) + raise IOError, "Simulated IO error" + end + end.new + + $stdout = error_stdout + + # Notification should return false but not raise + result = @transport.send_notification("test/notification") + refute result + end + + test "server notification flow works end-to-end with StdioTransport" do + # This test verifies the complete integration from server to transport + + # Start with no output + assert_empty @mock_stdout.output + + # Add a prompt and notify + @server.define_prompt( + name: "test_prompt", + description: "Test prompt", + ) do |_args, _server_context:| + MCP::PromptResponse.new(messages: [{ role: "user", content: "Test" }]) + end + + # Manually trigger notification + @server.notify_prompts_list_changed + + # Verify notification was sent + assert_equal 1, @mock_stdout.output.size + notification = JSON.parse(@mock_stdout.output.first) + assert_equal Methods::NOTIFICATIONS_PROMPTS_LIST_CHANGED, notification["method"] + + # Add a resource and notify + @server.resources = [ + MCP::Resource.new( + uri: "test://resource", + name: "Test Resource", + description: "A test resource", + mime_type: "text/plain", + ), + ] + + # Manually trigger notification + @server.notify_resources_list_changed + + # Verify both notifications were sent + assert_equal 2, @mock_stdout.output.size + second_notification = JSON.parse(@mock_stdout.output.last) + assert_equal Methods::NOTIFICATIONS_RESOURCES_LIST_CHANGED, second_notification["method"] + end + end + end + end +end diff --git a/test/mcp/server/transports/streamable_http_notification_integration_test.rb b/test/mcp/server/transports/streamable_http_notification_integration_test.rb new file mode 100644 index 0000000..b3362e1 --- /dev/null +++ b/test/mcp/server/transports/streamable_http_notification_integration_test.rb @@ -0,0 +1,249 @@ +# frozen_string_literal: true + +require "test_helper" +require "json" +require "rack" +require "mcp/server" +require "mcp/server/transports/streamable_http_transport" + +module MCP + class Server + module Transports + class StreamableHTTPNotificationIntegrationTest < ActiveSupport::TestCase + setup do + @server = Server.new( + name: "test_server", + tools: [], + prompts: [], + resources: [], + ) + @transport = StreamableHTTPTransport.new(@server) + @server.transport = @transport + end + + test "server notification methods send SSE notifications through HTTP transport" do + # Initialize a session + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "init" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Connect with SSE + io = StringIO.new + get_request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + response = @transport.handle_request(get_request) + response[2].call(io) if response[2].is_a?(Proc) + + # Give the stream time to set up + sleep(0.1) + + # Test tools notification + @server.notify_tools_list_changed + + # Test prompts notification + @server.notify_prompts_list_changed + + # Test resources notification + @server.notify_resources_list_changed + + # Check the notifications were received + io.rewind + output = io.read + + assert_includes output, "data: #{Methods::NOTIFICATIONS_TOOLS_LIST_CHANGED}" + assert_includes output, "data: #{Methods::NOTIFICATIONS_PROMPTS_LIST_CHANGED}" + assert_includes output, "data: #{Methods::NOTIFICATIONS_RESOURCES_LIST_CHANGED}" + end + + test "notifications are broadcast to all connected sessions" do + # Create two sessions + init_request1 = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "123" }.to_json, + ) + init_response1 = @transport.handle_request(init_request1) + session_id1 = init_response1[1]["Mcp-Session-Id"] + + init_request2 = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "456" }.to_json, + ) + init_response2 = @transport.handle_request(init_request2) + session_id2 = init_response2[1]["Mcp-Session-Id"] + + # Connect both sessions with SSE + io1 = StringIO.new + get_request1 = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id1 }, + ) + response1 = @transport.handle_request(get_request1) + response1[2].call(io1) if response1[2].is_a?(Proc) + + io2 = StringIO.new + get_request2 = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id2 }, + ) + response2 = @transport.handle_request(get_request2) + response2[2].call(io2) if response2[2].is_a?(Proc) + + # Give the streams time to set up + sleep(0.1) + + # Send notification through server + @server.notify_tools_list_changed + + # Check both sessions received the notification + io1.rewind + output1 = io1.read + assert_includes output1, "data: #{Methods::NOTIFICATIONS_TOOLS_LIST_CHANGED}" + + io2.rewind + output2 = io2.read + assert_includes output2, "data: #{Methods::NOTIFICATIONS_TOOLS_LIST_CHANGED}" + end + + test "server continues to work when SSE connection is closed" do + # Initialize a session + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "init" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Connect with SSE + io = StringIO.new + get_request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + response = @transport.handle_request(get_request) + response[2].call(io) if response[2].is_a?(Proc) + + # Give the stream time to set up + sleep(0.1) + + # Close the stream + io.close + + # Server notifications should not raise errors + assert_nothing_raised do + @server.notify_tools_list_changed + @server.notify_prompts_list_changed + @server.notify_resources_list_changed + end + end + + test "notifications work with dynamic tool additions" do + # Initialize a session + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "init" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Connect with SSE + io = StringIO.new + get_request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + response = @transport.handle_request(get_request) + response[2].call(io) if response[2].is_a?(Proc) + + # Give the stream time to set up + sleep(0.1) + + # Define a new tool (simulating dynamic tool addition) + @server.define_tool( + name: "dynamic_tool", + description: "A dynamically added tool", + ) do |**_args| + { result: "success" } + end + + # Manually trigger notification (since we removed the automatic triggers) + @server.notify_tools_list_changed + + # Check the notification was received + io.rewind + output = io.read + assert_includes output, "data: #{Methods::NOTIFICATIONS_TOOLS_LIST_CHANGED}" + + # Verify the tool was added to the server + assert @server.tools.key?("dynamic_tool") + end + + test "SSE format is correct for notifications" do + # Initialize a session + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "init" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Connect with SSE + io = StringIO.new + get_request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + response = @transport.handle_request(get_request) + response[2].call(io) if response[2].is_a?(Proc) + + # Give the stream time to set up + sleep(0.1) + + # Send a notification + @server.notify_tools_list_changed + + # Check SSE format + io.rewind + output = io.read + + # SSE format should be "data: \n\n" + assert_match(/data: #{Regexp.escape(Methods::NOTIFICATIONS_TOOLS_LIST_CHANGED)}\n/, output) + end + + private + + def create_rack_request(method, path, headers, body = nil) + env = { + "REQUEST_METHOD" => method, + "PATH_INFO" => path, + "rack.input" => StringIO.new(body.to_s), + }.merge(headers) + + Rack::Request.new(env) + end + end + end + end +end diff --git a/test/mcp/server/transports/streamable_http_transport_test.rb b/test/mcp/server/transports/streamable_http_transport_test.rb new file mode 100644 index 0000000..eb0cf7e --- /dev/null +++ b/test/mcp/server/transports/streamable_http_transport_test.rb @@ -0,0 +1,483 @@ +# frozen_string_literal: true + +require "test_helper" +require "json" +require "rack" +require "mcp/server" +require "mcp/server/transports/streamable_http_transport" + +module MCP + class Server + module Transports + class StreamableHTTPTransportTest < ActiveSupport::TestCase + setup do + @server = Server.new( + name: "test_server", + tools: [], + prompts: [], + resources: [], + ) + @transport = StreamableHTTPTransport.new(@server) + end + + test "handles POST request with valid JSON-RPC message" do + # First create a session + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "init" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Now make the ping request with the session ID + request = create_rack_request( + "POST", + "/", + { + "CONTENT_TYPE" => "application/json", + "HTTP_MCP_SESSION_ID" => session_id, + }, + { jsonrpc: "2.0", method: "ping", id: "123" }.to_json, + ) + + response = @transport.handle_request(request) + assert_equal 200, response[0] + assert_equal({ "Content-Type" => "application/json" }, response[1]) + + body = JSON.parse(response[2][0]) + assert_equal "2.0", body["jsonrpc"] + assert_equal "123", body["id"] + assert_empty(body["result"]) + end + + test "handles POST request with invalid JSON" do + request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + "invalid json", + ) + + response = @transport.handle_request(request) + assert_equal 400, response[0] + assert_equal({ "Content-Type" => "application/json" }, response[1]) + + body = JSON.parse(response[2][0]) + assert_equal "Invalid JSON", body["error"] + end + + test "handles POST request with initialize method" do + request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "123" }.to_json, + ) + + response = @transport.handle_request(request) + assert_equal 200, response[0] + assert_equal "application/json", response[1]["Content-Type"] + assert response[1]["Mcp-Session-Id"] + + body = JSON.parse(response[2][0]) + assert_equal "2.0", body["jsonrpc"] + assert_equal "123", body["id"] + assert_equal "2024-11-05", body["result"]["protocolVersion"] + end + + test "handles GET request with valid session ID" do + # First create a session with initialize + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "123" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Then try to connect with GET + request = create_rack_request( + "GET", + "/", + { + "HTTP_MCP_SESSION_ID" => session_id, + }, + ) + + response = @transport.handle_request(request) + assert_equal 200, response[0] + assert_equal "text/event-stream", response[1]["Content-Type"] + assert response[2].is_a?(Proc) # The body should be a Proc for streaming + end + + test "handles GET request with missing session ID" do + request = create_rack_request( + "GET", + "/", + {}, + ) + + response = @transport.handle_request(request) + assert_equal 400, response[0] + assert_equal({ "Content-Type" => "application/json" }, response[1]) + + body = JSON.parse(response[2][0]) + assert_equal "Missing session ID", body["error"] + end + + test "handles GET request with invalid session ID" do + request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => "invalid_id" }, + ) + + response = @transport.handle_request(request) + assert_equal 404, response[0] + assert_equal({ "Content-Type" => "application/json" }, response[1]) + + body = JSON.parse(response[2][0]) + assert_equal "Session not found", body["error"] + end + + test "handles DELETE request with valid session ID" do + # First create a session with initialize + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "123" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Then try to delete it + request = create_rack_request( + "DELETE", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + + response = @transport.handle_request(request) + assert_equal 200, response[0] + assert_equal({ "Content-Type" => "application/json" }, response[1]) + + body = JSON.parse(response[2][0]) + assert body["success"] + end + + test "handles DELETE request with missing session ID" do + request = create_rack_request( + "DELETE", + "/", + {}, + ) + + response = @transport.handle_request(request) + assert_equal 400, response[0] + assert_equal({ "Content-Type" => "application/json" }, response[1]) + + body = JSON.parse(response[2][0]) + assert_equal "Missing session ID", body["error"] + end + + test "closes transport and cleans up session" do + # First create a session with initialize + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "123" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Then connect with GET + io = StringIO.new + request = create_rack_request( + "GET", + "/", + { + "HTTP_MCP_SESSION_ID" => session_id, + }, + ) + response = @transport.handle_request(request) + # Call the body proc with our StringIO + response[2].call(io) if response[2].is_a?(Proc) + + # Give the background thread a moment to set up + sleep(0.01) + + # Verify session exists before closing + assert @transport.instance_variable_get(:@sessions).key?(session_id) + + # Close the transport without session context (closes all sessions) + @transport.close + + # Verify session was cleaned up + assert_empty @transport.instance_variable_get(:@sessions) + end + + test "sends notification to correct session with multiple active sessions" do + # Create first session + init_request1 = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "123" }.to_json, + ) + init_response1 = @transport.handle_request(init_request1) + session_id1 = init_response1[1]["Mcp-Session-Id"] + + # Create second session + init_request2 = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "456" }.to_json, + ) + init_response2 = @transport.handle_request(init_request2) + session_id2 = init_response2[1]["Mcp-Session-Id"] + + # Connect first session with GET + io1 = StringIO.new + get_request1 = create_rack_request( + "GET", + "/", + { + "HTTP_MCP_SESSION_ID" => session_id1, + }, + ) + response1 = @transport.handle_request(get_request1) + response1[2].call(io1) if response1[2].is_a?(Proc) + + # Connect second session with GET + io2 = StringIO.new + get_request2 = create_rack_request( + "GET", + "/", + { + "HTTP_MCP_SESSION_ID" => session_id2, + }, + ) + response2 = @transport.handle_request(get_request2) + response2[2].call(io2) if response2[2].is_a?(Proc) + + # Give the streams time to be fully set up + sleep(0.2) + + # Verify sessions are set up + assert @transport.instance_variable_get(:@sessions).key?(session_id1), "Session 1 not found in @sessions" + assert @transport.instance_variable_get(:@sessions).key?(session_id2), "Session 2 not found in @sessions" + + # Test that notifications go to the correct session based on the request context + # First, make a request as session 1 + request_as_session1 = create_rack_request( + "POST", + "/", + { + "CONTENT_TYPE" => "application/json", + "HTTP_MCP_SESSION_ID" => session_id1, + }, + { jsonrpc: "2.0", method: "ping", id: "789" }.to_json, + ) + + # Monkey-patch handle_json on the server to send a notification when called + original_handle_json = @server.method(:handle_json) + transport = @transport # Capture the transport in a local variable + @server.define_singleton_method(:handle_json) do |request| + result = original_handle_json.call(request) + # Send notification while still in request context - broadcast to all sessions + notification = { jsonrpc: "2.0", method: "test_notification", params: { session: "current" } } + transport.send_notification(notification) + result + end + + # Handle request from session 1 + @transport.handle_request(request_as_session1) + + # Make a request as session 2 + request_as_session2 = create_rack_request( + "POST", + "/", + { + "CONTENT_TYPE" => "application/json", + "HTTP_MCP_SESSION_ID" => session_id2, + }, + { jsonrpc: "2.0", method: "ping", id: "890" }.to_json, + ) + + # Handle request from session 2 + @transport.handle_request(request_as_session2) + + # Check that each session received one notification + io1.rewind + output1 = io1.read + # Session 1 should have received two notifications (one from each request since we broadcast) + assert_equal 2, + output1.scan(/data: {"jsonrpc":"2.0","method":"test_notification","params":{"session":"current"}}/).count + + io2.rewind + output2 = io2.read + # Session 2 should have received two notifications (one from each request since we broadcast) + assert_equal 2, + output2.scan(/data: {"jsonrpc":"2.0","method":"test_notification","params":{"session":"current"}}/).count + end + + test "send_notification to specific session" do + # Create and initialize a session + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "123" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Connect with SSE + io = StringIO.new + get_request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + response = @transport.handle_request(get_request) + response[2].call(io) if response[2].is_a?(Proc) + + # Give the stream time to set up + sleep(0.1) + + # Send notification to specific session + notification = { jsonrpc: "2.0", method: "test_notification", params: { message: "Hello" } } + result = @transport.send_notification(notification, session_id: session_id) + + assert result + + # Check the notification was received + io.rewind + output = io.read + assert_includes output, + "data: {\"jsonrpc\":\"2.0\",\"method\":\"test_notification\",\"params\":{\"message\":\"Hello\"}}" + end + + test "send_notification broadcasts to all sessions when no session_id" do + # Create two sessions + init_request1 = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "123" }.to_json, + ) + init_response1 = @transport.handle_request(init_request1) + session_id1 = init_response1[1]["Mcp-Session-Id"] + + init_request2 = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "456" }.to_json, + ) + init_response2 = @transport.handle_request(init_request2) + session_id2 = init_response2[1]["Mcp-Session-Id"] + + # Connect both sessions with SSE + io1 = StringIO.new + get_request1 = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id1 }, + ) + response1 = @transport.handle_request(get_request1) + response1[2].call(io1) if response1[2].is_a?(Proc) + + io2 = StringIO.new + get_request2 = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id2 }, + ) + response2 = @transport.handle_request(get_request2) + response2[2].call(io2) if response2[2].is_a?(Proc) + + # Give the streams time to set up + sleep(0.1) + + # Broadcast notification to all sessions + notification = { jsonrpc: "2.0", method: "broadcast", params: { message: "Hello everyone" } } + sent_count = @transport.send_notification(notification) + + assert_equal 2, sent_count + + # Check both sessions received the notification + io1.rewind + output1 = io1.read + assert_includes output1, + "data: {\"jsonrpc\":\"2.0\",\"method\":\"broadcast\",\"params\":{\"message\":\"Hello everyone\"}}" + + io2.rewind + output2 = io2.read + assert_includes output2, + "data: {\"jsonrpc\":\"2.0\",\"method\":\"broadcast\",\"params\":{\"message\":\"Hello everyone\"}}" + end + + test "send_notification returns false for non-existent session" do + result = @transport.send_notification({ message: "test" }, session_id: "non_existent") + refute result + end + + test "send_notification handles closed streams gracefully" do + # Create and initialize a session + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "123" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Connect with SSE + io = StringIO.new + get_request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + response = @transport.handle_request(get_request) + response[2].call(io) if response[2].is_a?(Proc) + + # Give the stream time to set up + sleep(0.1) + + # Close the stream + io.close + + # Try to send notification + result = @transport.send_notification({ message: "test" }, session_id: session_id) + + # Should return false and clean up the session + refute result + + # Verify session was cleaned up + assert_not @transport.instance_variable_get(:@sessions).key?(session_id) + end + + private + + def create_rack_request(method, path, headers, body = nil) + env = { + "REQUEST_METHOD" => method, + "PATH_INFO" => path, + "rack.input" => StringIO.new(body.to_s), + }.merge(headers) + + Rack::Request.new(env) + end + end + end + end +end diff --git a/test/mcp/server_notification_test.rb b/test/mcp/server_notification_test.rb new file mode 100644 index 0000000..af35936 --- /dev/null +++ b/test/mcp/server_notification_test.rb @@ -0,0 +1,130 @@ +# typed: true +# frozen_string_literal: true + +require "test_helper" + +module MCP + class ServerNotificationTest < ActiveSupport::TestCase + include InstrumentationTestHelper + + class MockTransport < Transport + attr_reader :notifications + + def initialize(server) + super + @notifications = [] + end + + def send_notification(method, params = nil) + @notifications << { method: method, params: params } + true + end + + def send_response(response); end + def open; end + def close; end + def handle_request(request); end + end + + setup do + configuration = MCP::Configuration.new + configuration.instrumentation_callback = instrumentation_helper.callback + + @server = Server.new( + name: "test_server", + version: "1.0.0", + configuration: configuration, + ) + + @mock_transport = MockTransport.new(@server) + @server.transport = @mock_transport + end + + test "#notify_tools_list_changed sends notification through transport" do + @server.notify_tools_list_changed + + assert_equal 1, @mock_transport.notifications.size + notification = @mock_transport.notifications.first + assert_equal Methods::NOTIFICATIONS_TOOLS_LIST_CHANGED, notification[:method] + assert_nil notification[:params] + end + + test "#notify_prompts_list_changed sends notification through transport" do + @server.notify_prompts_list_changed + + assert_equal 1, @mock_transport.notifications.size + notification = @mock_transport.notifications.first + assert_equal Methods::NOTIFICATIONS_PROMPTS_LIST_CHANGED, notification[:method] + assert_nil notification[:params] + end + + test "#notify_resources_list_changed sends notification through transport" do + @server.notify_resources_list_changed + + assert_equal 1, @mock_transport.notifications.size + notification = @mock_transport.notifications.first + assert_equal Methods::NOTIFICATIONS_RESOURCES_LIST_CHANGED, notification[:method] + assert_nil notification[:params] + end + + test "notification methods work without transport" do + server_without_transport = Server.new(name: "test_server") + + # Should not raise any errors + assert_nothing_raised do + server_without_transport.notify_tools_list_changed + server_without_transport.notify_prompts_list_changed + server_without_transport.notify_resources_list_changed + end + end + + test "notification methods handle transport errors gracefully" do + # Create a transport that raises errors + error_transport = Class.new(MockTransport) do + def send_notification(method, params = nil) + raise StandardError, "Transport error" + end + end.new(@server) + + @server.transport = error_transport + + # Mock the exception reporter + expected_contexts = [ + { notification: "tools_list_changed" }, + { notification: "prompts_list_changed" }, + { notification: "resources_list_changed" }, + ] + + call_count = 0 + @server.configuration.exception_reporter.expects(:call).times(3).with do |exception, context| + assert_kind_of StandardError, exception + assert_equal "Transport error", exception.message + assert_includes expected_contexts, context + call_count += 1 + true + end + + # Should not raise errors to the caller + assert_nothing_raised do + @server.notify_tools_list_changed + @server.notify_prompts_list_changed + @server.notify_resources_list_changed + end + + assert_equal 3, call_count + end + + test "multiple notification methods can be called in sequence" do + @server.notify_tools_list_changed + @server.notify_prompts_list_changed + @server.notify_resources_list_changed + + assert_equal 3, @mock_transport.notifications.size + + notifications = @mock_transport.notifications + assert_equal Methods::NOTIFICATIONS_TOOLS_LIST_CHANGED, notifications[0][:method] + assert_equal Methods::NOTIFICATIONS_PROMPTS_LIST_CHANGED, notifications[1][:method] + assert_equal Methods::NOTIFICATIONS_RESOURCES_LIST_CHANGED, notifications[2][:method] + end + end +end diff --git a/test/mcp/transport_test.rb b/test/mcp/transport_test.rb new file mode 100644 index 0000000..dc94881 --- /dev/null +++ b/test/mcp/transport_test.rb @@ -0,0 +1,50 @@ +# frozen_string_literal: true + +require "test_helper" +require "mcp/server" +require "mcp/transport" + +module MCP + class TransportTest < ActiveSupport::TestCase + class TestTransport < Transport + def handle_request(request) + [200, {}, ["OK"]] + end + + def send_request(method, params = nil) + true + end + + def close + true + end + end + + setup do + @server = Server.new( + name: "test_server", + tools: [], + prompts: [], + resources: [], + ) + @transport = TestTransport.new(@server) + end + + test "initializes with server instance" do + assert_equal @server, @transport.instance_variable_get(:@server) + end + + test "handles request" do + response = @transport.handle_request(nil) + assert_equal [200, {}, ["OK"]], response + end + + test "sends request" do + assert @transport.send_request("test_method", { foo: "bar" }) + end + + test "closes connection" do + assert @transport.close + end + end +end