Skip to content

Commit 62f1996

Browse files
authored
feat: streamable-http server support (#545)
1 parent d1c3d83 commit 62f1996

8 files changed

Lines changed: 250 additions & 39 deletions

File tree

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,14 @@ This creates a powerful **tool aggregation hub** where kubectl-ai acts as both:
430430
- **MCP Server**: Exposing kubectl tools to clients
431431
- **MCP Client**: Consuming tools from other MCP servers
432432

433+
To serve clients over HTTP using the streamable transport, run:
434+
435+
```bash
436+
kubectl-ai --mcp-server --mcp-server-mode streamable-http --http-port 9080
437+
```
438+
439+
This starts an MCP endpoint at `http://localhost:9080/mcp`.
440+
433441
The enhanced mode provides AI clients with access to both Kubernetes operations and general-purpose tools (filesystem, web search, databases, etc.) through a single MCP endpoint.
434442

435443
📖 **For detailed configuration, examples, and troubleshooting, see the [MCP Server Documentation](./docs/mcp-server.md).**

cmd/main.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ type Options struct {
9898
MaxIterations int `json:"maxIterations,omitempty"`
9999
// MCPServerMode is the mode of the MCP server. only works with --mcp-server.
100100
MCPServerMode string `json:"mcpServerMode,omitempty"`
101-
// Set the SSEndpoint port for the MCP server. only works with --mcp-server and --mcp-server-mode=sse.
102-
SSEndpointPort int `json:"sseEndpointPort,omitempty"`
101+
// Set the HTTP endpoint port for the MCP server when using HTTP transports like SSE or streamable-http.
102+
HTTPPort int `json:"httpPort,omitempty"`
103103
// KubeConfigPath is the path to the kubeconfig file.
104104
// If not provided, the default kubeconfig path will be used.
105105
KubeConfigPath string `json:"kubeConfigPath,omitempty"`
@@ -166,9 +166,9 @@ func (o *Options) InitDefaults() {
166166
// Default to not skipping SSL verification
167167
o.SkipVerifySSL = false
168168
// Default MCP server mode is stdio
169-
o.MCPServerMode = "stdio"
170-
// Default port for SSE endpoint
171-
o.SSEndpointPort = 9080
169+
o.MCPServerMode = "stdio" // Default port for HTTP endpoint when using SSE or streamable-http modes
170+
// Default port for HTTP endpoint when using SSE or streamable-http modes
171+
o.HTTPPort = 9080
172172

173173
// Session management options
174174
o.ResumeSession = ""
@@ -307,8 +307,8 @@ func (opt *Options) bindCLIFlags(f *pflag.FlagSet) error {
307307
f.BoolVar(&opt.ExternalTools, "external-tools", opt.ExternalTools, "in MCP server mode, discover and expose external MCP tools")
308308
f.StringArrayVar(&opt.ToolConfigPaths, "custom-tools-config", opt.ToolConfigPaths, "path to custom tools config file or directory")
309309
f.BoolVar(&opt.MCPClient, "mcp-client", opt.MCPClient, "enable MCP client mode to connect to external MCP servers")
310-
f.StringVar(&opt.MCPServerMode, "mcp-server-mode", opt.MCPServerMode, "mode of the MCP server. Supported values: stdio, sse")
311-
f.IntVar(&opt.SSEndpointPort, "sse-endpoint-port", opt.SSEndpointPort, "port for the SSE endpoint in MCP server mode (only works with --mcp-server and --mcp-server-mode=sse)")
310+
f.StringVar(&opt.MCPServerMode, "mcp-server-mode", opt.MCPServerMode, "mode of the MCP server. Supported values: stdio, sse, streamable-http")
311+
f.IntVar(&opt.HTTPPort, "http-port", opt.HTTPPort, "port for the HTTP endpoint in MCP server mode (used with --mcp-server when --mcp-server-mode is sse or streamable-http)")
312312
f.BoolVar(&opt.EnableToolUseShim, "enable-tool-use-shim", opt.EnableToolUseShim, "enable tool use shim")
313313
f.BoolVar(&opt.Quiet, "quiet", opt.Quiet, "run in non-interactive mode, requires a query to be provided as a positional argument")
314314

@@ -670,7 +670,7 @@ func startMCPServer(ctx context.Context, opt Options) error {
670670
if err := os.MkdirAll(workDir, 0o755); err != nil {
671671
return fmt.Errorf("error creating work directory: %w", err)
672672
}
673-
mcpServer, err := newKubectlMCPServer(ctx, opt.KubeConfigPath, tools.Default(), workDir, opt.ExternalTools, opt.MCPServerMode, opt.SSEndpointPort)
673+
mcpServer, err := newKubectlMCPServer(ctx, opt.KubeConfigPath, tools.Default(), workDir, opt.ExternalTools, opt.MCPServerMode, opt.HTTPPort)
674674
if err != nil {
675675
return fmt.Errorf("creating mcp server: %w", err)
676676
}

cmd/mcp.go

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ type kubectlMCPServer struct {
3232
tools tools.Tools
3333
workDir string
3434
mcpManager *mcp.Manager // Add MCP manager for external tool calls
35-
mcpServerMode string // Server mode (e.g., "mcd", "sse")
36-
sseEndpoint int // SSE endpoint for server mode
35+
mcpServerMode string // Server mode (e.g., "streamable-http", "sse")
36+
httpPort int // Port for HTTP-based server modes
3737
}
3838

39-
func newKubectlMCPServer(ctx context.Context, kubectlConfig string, tools tools.Tools, workDir string, exposeExternalTools bool, serverMode string, sseEndpoint int) (*kubectlMCPServer, error) {
39+
func newKubectlMCPServer(ctx context.Context, kubectlConfig string, tools tools.Tools, workDir string, exposeExternalTools bool, serverMode string, httpPort int) (*kubectlMCPServer, error) {
4040
s := &kubectlMCPServer{
4141
kubectlConfig: kubectlConfig,
4242
workDir: workDir,
@@ -47,7 +47,7 @@ func newKubectlMCPServer(ctx context.Context, kubectlConfig string, tools tools.
4747
),
4848
tools: tools,
4949
mcpServerMode: serverMode,
50-
sseEndpoint: sseEndpoint,
50+
httpPort: httpPort,
5151
}
5252

5353
// Add built-in tools
@@ -95,15 +95,8 @@ func newKubectlMCPServer(ctx context.Context, kubectlConfig string, tools tools.
9595
klog.V(2).Infof("Processing tools from MCP server %s: %d tools found", serverName, len(tools))
9696

9797
for _, tool := range tools {
98-
// Create unique tool name to avoid conflicts between servers
99-
uniqueToolName := tool.Name
100-
if tool.Server != "" {
101-
// If tool already has server info, use it as-is
102-
uniqueToolName = tool.Name
103-
} else {
104-
// Add server prefix to avoid name conflicts
105-
uniqueToolName = fmt.Sprintf("%s_%s", serverName, tool.Name)
106-
}
98+
// Create unique tool name to avoid conflicts with built-in tools or from other servers
99+
uniqueToolName := fmt.Sprintf("%s_%s", serverName, tool.Name)
107100

108101
// Use the actual tool schema instead of creating a generic wrapper
109102
var schema *gollm.FunctionDefinition
@@ -170,16 +163,24 @@ func (s *kubectlMCPServer) Serve(ctx context.Context) error {
170163

171164
klog.Info("Starting kubectl-ai MCP server")
172165

173-
if s.mcpServerMode == "sse" {
166+
switch s.mcpServerMode {
167+
case "sse":
174168
// Start the server in SSE mode
175-
klog.Infof("Starting MCP server in SSE mode on endpoint %d", s.sseEndpoint)
169+
klog.Infof("Starting MCP server in SSE mode on port %d", s.httpPort)
176170
sseServer := server.NewSSEServer(s.server)
177-
endpoint := fmt.Sprintf(":%d", s.sseEndpoint)
178-
klog.Infof("Listening for SSE connections on port %d", s.sseEndpoint)
171+
endpoint := fmt.Sprintf(":%d", s.httpPort)
172+
klog.Infof("Listening for SSE connections on port %d", s.httpPort)
179173
return sseServer.Start(endpoint)
174+
case "streamable-http":
175+
// Start the server in streamable HTTP mode
176+
klog.Infof("Starting MCP server in streamable HTTP mode on port %d", s.httpPort)
177+
httpServer := server.NewStreamableHTTPServer(s.server)
178+
endpoint := fmt.Sprintf(":%d", s.httpPort)
179+
klog.Infof("Listening for streamable HTTP connections on port %d", s.httpPort)
180+
return httpServer.Start(endpoint)
181+
default:
182+
return server.ServeStdio(s.server)
180183
}
181-
182-
return server.ServeStdio(s.server)
183184
}
184185

185186
func (s *kubectlMCPServer) handleToolCall(ctx context.Context, request mcpgo.CallToolRequest) (*mcpgo.CallToolResult, error) {
@@ -285,14 +286,8 @@ func (s *kubectlMCPServer) handleExternalMCPToolCall(ctx context.Context, reques
285286
// Look for the tool by checking both original name and server-prefixed name
286287
for serverName, tools := range serverTools {
287288
for _, tool := range tools {
288-
// Check if this matches the requested tool (either direct name or server-prefixed name)
289-
uniqueToolName := tool.Name
290-
if tool.Server == "" {
291-
// Add server prefix to match what was registered
292-
uniqueToolName = fmt.Sprintf("%s_%s", serverName, tool.Name)
293-
}
294-
295-
if uniqueToolName == toolName || tool.Name == toolName {
289+
uniqueToolName := fmt.Sprintf("%s_%s", serverName, tool.Name)
290+
if uniqueToolName == toolName {
296291
targetServerName = serverName
297292
originalToolName = tool.Name // Use the original tool name for the MCP call
298293
break

cmd/mcp_test.go

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"net"
21+
"testing"
22+
"time"
23+
24+
"github.com/GoogleCloudPlatform/kubectl-ai/gollm"
25+
"github.com/GoogleCloudPlatform/kubectl-ai/pkg/mcp"
26+
"github.com/GoogleCloudPlatform/kubectl-ai/pkg/tools"
27+
)
28+
29+
func TestKubectlMCPServerHTTPClientIntegration(t *testing.T) {
30+
ctx, cancel := context.WithCancel(context.Background())
31+
defer cancel()
32+
33+
toolset := tools.Tools{}
34+
toolset.Init()
35+
toolset.RegisterTool(&stubTool{})
36+
37+
port := getFreePort(t)
38+
39+
workDir := t.TempDir()
40+
41+
server, err := newKubectlMCPServer(ctx, "", toolset, workDir, false, "streamable-http", port)
42+
if err != nil {
43+
t.Fatalf("failed to create MCP server: %v", err)
44+
}
45+
46+
serverErr := make(chan error, 1)
47+
go func() {
48+
serverErr <- server.Serve(ctx)
49+
}()
50+
51+
waitForHTTPServer(t, port)
52+
53+
select {
54+
case err := <-serverErr:
55+
if err != nil {
56+
t.Fatalf("server exited early: %v", err)
57+
}
58+
default:
59+
}
60+
61+
clientConfig := mcp.ClientConfig{
62+
Name: "test-client",
63+
URL: fmt.Sprintf("http://127.0.0.1:%d/mcp", port),
64+
UseStreaming: true,
65+
Timeout: 5,
66+
}
67+
68+
client := mcp.NewClient(clientConfig)
69+
70+
connectCtx, connectCancel := context.WithTimeout(ctx, 5*time.Second)
71+
defer connectCancel()
72+
73+
t.Log("connecting client")
74+
75+
if err := client.Connect(connectCtx); err != nil {
76+
t.Fatalf("failed to connect client to MCP server: %v", err)
77+
}
78+
defer func() {
79+
if err := client.Close(); err != nil {
80+
t.Errorf("failed to close MCP client: %v", err)
81+
}
82+
}()
83+
84+
toolsCtx, toolsCancel := context.WithTimeout(ctx, 5*time.Second)
85+
defer toolsCancel()
86+
87+
t.Log("listing tools")
88+
89+
availableTools, err := client.ListTools(toolsCtx)
90+
if err != nil {
91+
t.Fatalf("failed to list tools from MCP server: %v", err)
92+
}
93+
94+
t.Logf("retrieved %d tool(s)", len(availableTools))
95+
96+
if !toolExists("stub", availableTools) {
97+
t.Fatalf("expected to find stub tool, got %v", availableTools)
98+
}
99+
100+
cancel()
101+
102+
select {
103+
case <-serverErr:
104+
case <-time.After(500 * time.Millisecond):
105+
}
106+
}
107+
108+
func waitForHTTPServer(t *testing.T, port int) {
109+
t.Helper()
110+
111+
deadline := time.Now().Add(5 * time.Second)
112+
address := fmt.Sprintf("127.0.0.1:%d", port)
113+
114+
for {
115+
conn, err := net.DialTimeout("tcp", address, 100*time.Millisecond)
116+
if err == nil {
117+
conn.Close()
118+
return
119+
}
120+
if time.Now().After(deadline) {
121+
t.Fatalf("server did not start listening on %s: %v", address, err)
122+
}
123+
time.Sleep(50 * time.Millisecond)
124+
}
125+
}
126+
127+
func getFreePort(t *testing.T) int {
128+
t.Helper()
129+
130+
l, err := net.Listen("tcp", "127.0.0.1:0")
131+
if err != nil {
132+
t.Fatalf("failed to acquire free port: %v", err)
133+
}
134+
defer l.Close()
135+
136+
return l.Addr().(*net.TCPAddr).Port
137+
}
138+
139+
func toolExists(name string, tools []mcp.Tool) bool {
140+
for _, tool := range tools {
141+
if tool.Name == name {
142+
return true
143+
}
144+
}
145+
return false
146+
}
147+
148+
type stubTool struct{}
149+
150+
func (stubTool) Name() string {
151+
return "stub"
152+
}
153+
154+
func (stubTool) Description() string {
155+
return "stub tool"
156+
}
157+
158+
func (stubTool) FunctionDefinition() *gollm.FunctionDefinition {
159+
return &gollm.FunctionDefinition{
160+
Name: "stub",
161+
Description: "stub tool",
162+
Parameters: &gollm.Schema{
163+
Type: gollm.TypeObject,
164+
},
165+
}
166+
}
167+
168+
func (stubTool) Run(context.Context, map[string]any) (any, error) {
169+
return "ok", nil
170+
}
171+
172+
func (stubTool) IsInteractive(map[string]any) (bool, error) {
173+
return false, nil
174+
}
175+
176+
func (stubTool) CheckModifiesResource(map[string]any) string {
177+
return "no"
178+
}

docs/mcp-server.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@ Start the MCP server with external MCP tool discovery enabled:
2323
kubectl-ai --mcp-server --external-tools
2424
```
2525

26+
### Expose an HTTP endpoint for MCP clients
27+
28+
Run the server with the streamable HTTP transport to serve compatible MCP clients (including kubectl-ai's MCP client mode) over HTTP:
29+
30+
```bash
31+
kubectl-ai --mcp-server --mcp-server-mode streamable-http --http-port 9080
32+
```
33+
34+
This listens on `http://localhost:9080/mcp` by default. Use `--mcp-server-mode sse` for legacy HTTP+SSE clients.
35+
2636
## Configuration
2737

2838
The enhanced MCP server will automatically discover and expose tools from configured MCP servers when `--external-tools` is enabled. Configure MCP servers using the standard MCP client configuration.
@@ -122,6 +132,8 @@ Additional tools available depend on configured MCP servers:
122132
| `--mcp-server` | `false` | Run in MCP server mode |
123133
| `--external-tools` | `false` | Discover and expose external MCP tools (requires --mcp-server) |
124134
| `--kubeconfig` | `~/.kube/config` | Path to kubeconfig file |
135+
| `--mcp-server-mode` | `stdio` | Transport for the MCP server (`stdio`, `sse`, or `streamable-http`) |
136+
| `--http-port` | `9080` | Port for the HTTP endpoint when using `sse` or `streamable-http` modes |
125137

126138
## Architecture
127139

pkg/agent/mcp_client.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,14 @@ func (a *Agent) InitializeMCPClient(ctx context.Context) error {
4242
return err
4343
}
4444

45-
// Create and register MCP tool wrapper
45+
// Create an MCPTool wrapper first to get the unique name
4646
mcpTool := tools.NewMCPTool(serverName, toolInfo.Name, toolInfo.Description, schema, manager)
47+
48+
// Update schema with unique name and better description to avoid conflicts
49+
schema.Name = mcpTool.UniqueToolName()
50+
schema.Description = fmt.Sprintf("%s (from %s)", toolInfo.Description, serverName)
51+
52+
// Create and register MCP tool wrapper
4753
tools.RegisterTool(mcpTool)
4854
return nil
4955
})

pkg/tools/mcp_tool.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ func (t *MCPTool) Name() string {
6868
return t.toolName
6969
}
7070

71+
func (t *MCPTool) UniqueToolName() string {
72+
return fmt.Sprintf("%s_%s", t.serverName, t.toolName)
73+
}
74+
7175
// ServerName returns the MCP server name.
7276
func (t *MCPTool) ServerName() string {
7377
return t.serverName

0 commit comments

Comments
 (0)