77from mcp import StdioServerParameters
88
99from mcpadapt .core import MCPAdapt , ToolAdapter
10+ from tests ._server_utils import launch_mcp_server , terminate_mcp_server
1011
1112
1213class DummyAdapter (ToolAdapter ):
@@ -81,9 +82,12 @@ def echo_tool(text: str) -> str:
8182def echo_server_sse_script ():
8283 return dedent (
8384 '''
85+ import os
8486 from mcp.server.fastmcp import FastMCP
8587
86- mcp = FastMCP("Echo Server", host="127.0.0.1", port=8000)
88+ port = int(os.environ.get("MCP_TEST_PORT", "8000"))
89+
90+ mcp = FastMCP("Echo Server", host="127.0.0.1", port=port)
8791
8892 @mcp.tool()
8993 def echo_tool(text: str) -> str:
@@ -97,31 +101,27 @@ def echo_tool(text: str) -> str:
97101
98102@pytest .fixture
99103async def echo_sse_server (echo_server_sse_script ):
100- import subprocess
101-
102- # Start the SSE server process with its own process group
103- process = subprocess .Popen (
104- ["python" , "-c" , echo_server_sse_script ],
105- )
106-
107- # Give the server a moment to start up
108- time .sleep (1 )
104+ try :
105+ process , port = launch_mcp_server (echo_server_sse_script )
106+ except RuntimeError as exc :
107+ pytest .skip (str (exc ))
109108
110109 try :
111- yield {"url" : "http://127.0.0.1:8000 /sse" }
110+ yield {"url" : f "http://127.0.0.1:{ port } /sse" }
112111 finally :
113- # Clean up the process when test is done
114- process .kill ()
115- process .wait ()
112+ terminate_mcp_server (process )
116113
117114
118115@pytest .fixture
119116def echo_server_streamable_http_script ():
120117 return dedent (
121118 '''
119+ import os
122120 from mcp.server.fastmcp import FastMCP
123121
124- mcp = FastMCP("Echo Server", host="127.0.0.1", port=8000, stateless_http=True, json_response=True)
122+ port = int(os.environ.get("MCP_TEST_PORT", "8000"))
123+
124+ mcp = FastMCP("Echo Server", host="127.0.0.1", port=port, stateless_http=True, json_response=True)
125125
126126 @mcp.tool()
127127 def echo_tool(text: str) -> str:
@@ -135,22 +135,15 @@ def echo_tool(text: str) -> str:
135135
136136@pytest .fixture
137137async def echo_streamable_http_server (echo_server_streamable_http_script ):
138- import subprocess
139-
140- # Start the SSE server process with its own process group
141- process = subprocess .Popen (
142- ["python" , "-c" , echo_server_streamable_http_script ],
143- )
144-
145- # Give the server a moment to start up
146- time .sleep (1 )
138+ try :
139+ process , port = launch_mcp_server (echo_server_streamable_http_script )
140+ except RuntimeError as exc :
141+ pytest .skip (str (exc ))
147142
148143 try :
149- yield {"url" : "http://127.0.0.1:8000 /mcp" , "transport" : "streamable-http" }
144+ yield {"url" : f "http://127.0.0.1:{ port } /mcp" , "transport" : "streamable-http" }
150145 finally :
151- # Clean up the process when test is done
152- process .kill ()
153- process .wait ()
146+ terminate_mcp_server (process )
154147
155148
156149@pytest .fixture
0 commit comments