diff --git a/agent/x/agentmcp/manager.go b/agent/x/agentmcp/manager.go index 23cba06c18..cd6a705151 100644 --- a/agent/x/agentmcp/manager.go +++ b/agent/x/agentmcp/manager.go @@ -975,15 +975,19 @@ func (m *Manager) createTransport(ctx context.Context, cfg ServerConfig) (transp }), ), nil case "http", "": - return transport.NewStreamableHTTP( - cfg.URL, - transport.WithHTTPHeaders(cfg.Headers), - ) + var opts []transport.StreamableHTTPCOption + opts = append(opts, transport.WithHTTPHeaders(cfg.Headers)) + if c := mcpHTTPClient(); c != nil { + opts = append(opts, transport.WithHTTPBasicClient(c)) + } + return transport.NewStreamableHTTP(cfg.URL, opts...) case "sse": - return transport.NewSSE( - cfg.URL, - transport.WithHeaders(cfg.Headers), - ) + var sseOpts []transport.ClientOption + sseOpts = append(sseOpts, transport.WithHeaders(cfg.Headers)) + if c := mcpHTTPClient(); c != nil { + sseOpts = append(sseOpts, transport.WithHTTPClient(c)) + } + return transport.NewSSE(cfg.URL, sseOpts...) default: return nil, xerrors.Errorf("unsupported transport %q", cfg.Transport) } diff --git a/agent/x/agentmcp/mcphttpclient.go b/agent/x/agentmcp/mcphttpclient.go new file mode 100644 index 0000000000..7099c44281 --- /dev/null +++ b/agent/x/agentmcp/mcphttpclient.go @@ -0,0 +1,25 @@ +package agentmcp + +import ( + "net/http" + "testing" +) + +// mcpHTTPClient returns an isolated *http.Client when running +// inside tests, or nil for production. During tests, +// httptest.Server.Close() calls +// http.DefaultTransport.CloseIdleConnections(), which disrupts +// any MCP client sharing that transport. When DefaultTransport +// is a *http.Transport it is cloned; otherwise a minimal +// transport with ProxyFromEnvironment is created as a fallback. +func mcpHTTPClient() *http.Client { + if !testing.Testing() { + return nil + } + if dt, ok := http.DefaultTransport.(*http.Transport); ok { + return &http.Client{Transport: dt.Clone()} + } + return &http.Client{Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + }} +} diff --git a/aibridge/mcp/mcphttpclient.go b/aibridge/mcp/mcphttpclient.go new file mode 100644 index 0000000000..1685bcf795 --- /dev/null +++ b/aibridge/mcp/mcphttpclient.go @@ -0,0 +1,25 @@ +package mcp + +import ( + "net/http" + "testing" +) + +// mcpHTTPClient returns an isolated *http.Client when running +// inside tests, or nil for production. During tests, +// httptest.Server.Close() calls +// http.DefaultTransport.CloseIdleConnections(), which disrupts +// any MCP client sharing that transport. When DefaultTransport +// is a *http.Transport it is cloned; otherwise a minimal +// transport with ProxyFromEnvironment is created as a fallback. +func mcpHTTPClient() *http.Client { + if !testing.Testing() { + return nil + } + if dt, ok := http.DefaultTransport.(*http.Transport); ok { + return &http.Client{Transport: dt.Clone()} + } + return &http.Client{Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + }} +} diff --git a/aibridge/mcp/proxy_streamable_http.go b/aibridge/mcp/proxy_streamable_http.go index 132c03965a..108a710d19 100644 --- a/aibridge/mcp/proxy_streamable_http.go +++ b/aibridge/mcp/proxy_streamable_http.go @@ -39,6 +39,17 @@ func NewStreamableHTTPServerProxy(serverName, serverURL string, headers map[stri opts = append(opts, transport.WithHTTPHeaders(headers)) } + // Prepend an isolated HTTP client when running in tests so + // httptest.Server.Close() does not disrupt this proxy's + // connections via http.DefaultTransport.CloseIdleConnections(). + // Caller-provided WithHTTPBasicClient in opts overrides this + // (last-wins). + if c := mcpHTTPClient(); c != nil { + opts = append([]transport.StreamableHTTPCOption{ + transport.WithHTTPBasicClient(c), + }, opts...) + } + mcpClient, err := client.NewStreamableHttpClient(serverURL, opts...) if err != nil { return nil, xerrors.Errorf("create streamable http client: %w", err) diff --git a/coderd/mcp/mcp_e2e_test.go b/coderd/mcp/mcp_e2e_test.go index 5b374e36b8..633c68582a 100644 --- a/coderd/mcp/mcp_e2e_test.go +++ b/coderd/mcp/mcp_e2e_test.go @@ -12,6 +12,7 @@ import ( "os" "path/filepath" "strings" + "sync/atomic" "testing" "github.com/google/uuid" @@ -57,11 +58,10 @@ func TestMCPHTTP_E2E_ClientIntegration(t *testing.T) { mcpURL := api.AccessURL.String() + mcpserver.MCPEndpoint // Configure client with authentication headers using RFC 6750 Bearer token - mcpClient, err := mcpclient.NewStreamableHttpClient(mcpURL, + mcpClient := newIsolatedMCPClient(t, mcpURL, transport.WithHTTPHeaders(map[string]string{ "Authorization": "Bearer " + coderClient.SessionToken(), })) - require.NoError(t, err) defer func() { if closeErr := mcpClient.Close(); closeErr != nil { t.Logf("Failed to close MCP client: %v", closeErr) @@ -72,7 +72,7 @@ func TestMCPHTTP_E2E_ClientIntegration(t *testing.T) { defer cancel() // Start client - err = mcpClient.Start(ctx) + err := mcpClient.Start(ctx) require.NoError(t, err) // Initialize connection @@ -190,8 +190,7 @@ func TestMCPHTTP_E2E_UnauthenticatedAccess(t *testing.T) { require.Equal(t, http.StatusUnauthorized, resp.StatusCode, "Should get HTTP 401 for unauthenticated access") // Also test with MCP client to ensure it handles the error gracefully - mcpClient, err := mcpclient.NewStreamableHttpClient(mcpURL) - require.NoError(t, err, "Should be able to create MCP client without authentication") + mcpClient := newIsolatedMCPClient(t, mcpURL) defer func() { if closeErr := mcpClient.Close(); closeErr != nil { t.Logf("Failed to close MCP client: %v", closeErr) @@ -245,11 +244,10 @@ func TestMCPHTTP_E2E_ToolWithWorkspace(t *testing.T) { coderdtest.NewWorkspaceAgentWaiter(t, coderClient, r.Workspace.ID).Wait() mcpURL := api.AccessURL.String() + mcpserver.MCPEndpoint - mcpClient, err := mcpclient.NewStreamableHttpClient(mcpURL, + mcpClient := newIsolatedMCPClient(t, mcpURL, transport.WithHTTPHeaders(map[string]string{ "Authorization": "Bearer " + coderClient.SessionToken(), })) - require.NoError(t, err) defer func() { if closeErr := mcpClient.Close(); closeErr != nil { t.Logf("Failed to close MCP client: %v", closeErr) @@ -260,7 +258,7 @@ func TestMCPHTTP_E2E_ToolWithWorkspace(t *testing.T) { defer cancel() require.NoError(t, mcpClient.Start(ctx)) - _, err = mcpClient.Initialize(ctx, mcp.InitializeRequest{ + _, err := mcpClient.Initialize(ctx, mcp.InitializeRequest{ Params: mcp.InitializeParams{ ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION, ClientInfo: mcp.Implementation{ @@ -307,11 +305,10 @@ func TestMCPHTTP_E2E_ErrorHandling(t *testing.T) { // Create MCP client mcpURL := api.AccessURL.String() + mcpserver.MCPEndpoint - mcpClient, err := mcpclient.NewStreamableHttpClient(mcpURL, + mcpClient := newIsolatedMCPClient(t, mcpURL, transport.WithHTTPHeaders(map[string]string{ "Authorization": "Bearer " + coderClient.SessionToken(), })) - require.NoError(t, err) defer func() { if closeErr := mcpClient.Close(); closeErr != nil { t.Logf("Failed to close MCP client: %v", closeErr) @@ -322,7 +319,7 @@ func TestMCPHTTP_E2E_ErrorHandling(t *testing.T) { defer cancel() // Start and initialize client - err = mcpClient.Start(ctx) + err := mcpClient.Start(ctx) require.NoError(t, err) initReq := mcp.InitializeRequest{ @@ -366,11 +363,10 @@ func TestMCPHTTP_E2E_ConcurrentRequests(t *testing.T) { // Create MCP client mcpURL := api.AccessURL.String() + mcpserver.MCPEndpoint - mcpClient, err := mcpclient.NewStreamableHttpClient(mcpURL, + mcpClient := newIsolatedMCPClient(t, mcpURL, transport.WithHTTPHeaders(map[string]string{ "Authorization": "Bearer " + coderClient.SessionToken(), })) - require.NoError(t, err) defer func() { if closeErr := mcpClient.Close(); closeErr != nil { t.Logf("Failed to close MCP client: %v", closeErr) @@ -381,7 +377,7 @@ func TestMCPHTTP_E2E_ConcurrentRequests(t *testing.T) { defer cancel() // Start and initialize client - err = mcpClient.Start(ctx) + err := mcpClient.Start(ctx) require.NoError(t, err) initReq := mcp.InitializeRequest{ @@ -520,11 +516,10 @@ func TestMCPHTTP_E2E_OAuth2_EndToEnd(t *testing.T) { sessionToken := coderClient.SessionToken() mcpURL := api.AccessURL.String() + mcpserver.MCPEndpoint - mcpClient, err := mcpclient.NewStreamableHttpClient(mcpURL, + mcpClient := newIsolatedMCPClient(t, mcpURL, transport.WithHTTPHeaders(map[string]string{ "Authorization": "Bearer " + sessionToken, })) - require.NoError(t, err) defer func() { if closeErr := mcpClient.Close(); closeErr != nil { t.Logf("Failed to close MCP client: %v", closeErr) @@ -669,11 +664,10 @@ func TestMCPHTTP_E2E_OAuth2_EndToEnd(t *testing.T) { // Step 3: Use access token to authenticate with MCP endpoint mcpURL := api.AccessURL.String() + mcpserver.MCPEndpoint - mcpClient, err := mcpclient.NewStreamableHttpClient(mcpURL, + mcpClient := newIsolatedMCPClient(t, mcpURL, transport.WithHTTPHeaders(map[string]string{ "Authorization": "Bearer " + accessToken, })) - require.NoError(t, err) defer func() { if closeErr := mcpClient.Close(); closeErr != nil { t.Logf("Failed to close MCP client: %v", closeErr) @@ -762,11 +756,10 @@ func TestMCPHTTP_E2E_OAuth2_EndToEnd(t *testing.T) { t.Logf("Successfully refreshed token: %s...", newAccessToken[:10]) // Step 5: Use new access token to create another MCP connection - newMcpClient, err := mcpclient.NewStreamableHttpClient(mcpURL, + newMcpClient := newIsolatedMCPClient(t, mcpURL, transport.WithHTTPHeaders(map[string]string{ "Authorization": "Bearer " + newAccessToken, })) - require.NoError(t, err) defer func() { if closeErr := newMcpClient.Close(); closeErr != nil { t.Logf("Failed to close new MCP client: %v", closeErr) @@ -990,11 +983,10 @@ func TestMCPHTTP_E2E_OAuth2_EndToEnd(t *testing.T) { t.Logf("Successfully obtained access token: %s...", accessToken[:10]) // Step 5: Use access token to get user information via MCP - mcpClient, err := mcpclient.NewStreamableHttpClient(mcpURL, + mcpClient := newIsolatedMCPClient(t, mcpURL, transport.WithHTTPHeaders(map[string]string{ "Authorization": "Bearer " + accessToken, })) - require.NoError(t, err) defer func() { if closeErr := mcpClient.Close(); closeErr != nil { t.Logf("Failed to close MCP client: %v", closeErr) @@ -1088,11 +1080,10 @@ func TestMCPHTTP_E2E_OAuth2_EndToEnd(t *testing.T) { t.Logf("Successfully refreshed token: %s...", newAccessToken[:10]) // Step 7: Use refreshed token to get user information again via MCP - newMcpClient, err := mcpclient.NewStreamableHttpClient(mcpURL, + newMcpClient := newIsolatedMCPClient(t, mcpURL, transport.WithHTTPHeaders(map[string]string{ "Authorization": "Bearer " + newAccessToken, })) - require.NoError(t, err) defer func() { if closeErr := newMcpClient.Close(); closeErr != nil { t.Logf("Failed to close new MCP client: %v", closeErr) @@ -1268,11 +1259,10 @@ func TestMCPHTTP_E2E_ChatGPTEndpoint(t *testing.T) { mcpURL := api.AccessURL.String() + mcpserver.MCPEndpoint + "?toolset=chatgpt" // Configure client with authentication headers using RFC 6750 Bearer token - mcpClient, err := mcpclient.NewStreamableHttpClient(mcpURL, + mcpClient := newIsolatedMCPClient(t, mcpURL, transport.WithHTTPHeaders(map[string]string{ "Authorization": "Bearer " + coderClient.SessionToken(), })) - require.NoError(t, err) t.Cleanup(func() { if closeErr := mcpClient.Close(); closeErr != nil { t.Logf("Failed to close MCP client: %v", closeErr) @@ -1283,7 +1273,7 @@ func TestMCPHTTP_E2E_ChatGPTEndpoint(t *testing.T) { defer cancel() // Start client - err = mcpClient.Start(ctx) + err := mcpClient.Start(ctx) require.NoError(t, err) // Initialize connection @@ -1433,11 +1423,10 @@ func TestMCPHTTP_E2E_WorkspaceSSHAuthz(t *testing.T) { // Connect with the template-admin user. mcpURL := api.AccessURL.String() + mcpserver.MCPEndpoint - mcpClient, err := mcpclient.NewStreamableHttpClient(mcpURL, + mcpClient := newIsolatedMCPClient(t, mcpURL, transport.WithHTTPHeaders(map[string]string{ "Authorization": "Bearer " + tmplAdminClient.SessionToken(), })) - require.NoError(t, err) defer func() { _ = mcpClient.Close() }() @@ -1446,7 +1435,7 @@ func TestMCPHTTP_E2E_WorkspaceSSHAuthz(t *testing.T) { defer cancel() require.NoError(t, mcpClient.Start(ctx)) - _, err = mcpClient.Initialize(ctx, mcp.InitializeRequest{ + _, err := mcpClient.Initialize(ctx, mcp.InitializeRequest{ Params: mcp.InitializeParams{ ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION, ClientInfo: mcp.Implementation{ @@ -1489,3 +1478,91 @@ func mustParseURL(t *testing.T, rawURL string) *url.URL { require.NoError(t, err, "Failed to parse URL %q", rawURL) return u } + +// newIsolatedMCPClient creates a streamable HTTP MCP client that uses +// an isolated http.Transport cloned from http.DefaultTransport. +// This prevents httptest.Server.Close() (which calls +// http.DefaultTransport.CloseIdleConnections()) from disrupting the +// client's connections during parallel tests. +func newIsolatedMCPClient(t *testing.T, mcpURL string, opts ...transport.StreamableHTTPCOption) *mcpclient.Client { + t.Helper() + isolated := coderdtest.NewIsolatedHTTPClient(nil) + opts = append([]transport.StreamableHTTPCOption{transport.WithHTTPBasicClient(isolated)}, opts...) + client, err := mcpclient.NewStreamableHttpClient(mcpURL, opts...) + require.NoError(t, err) + return client +} + +// sentinelTransport wraps an http.RoundTripper and counts how many +// requests flow through it. Used as a test sentinel to verify +// whether a client is (or is not) using http.DefaultTransport. +type sentinelTransport struct { + inner http.RoundTripper + hits atomic.Int64 +} + +func (s *sentinelTransport) RoundTrip(req *http.Request) (*http.Response, error) { + s.hits.Add(1) + return s.inner.RoundTrip(req) +} + +// TestMCPHTTP_E2E_TransportIsolation verifies that the +// newIsolatedMCPClient helper creates clients that do NOT route +// requests through http.DefaultTransport, while raw +// mcpclient.NewStreamableHttpClient (without explicit +// WithHTTPBasicClient) does use it. +// +//nolint:paralleltest // Mutates http.DefaultTransport. +func TestMCPHTTP_E2E_TransportIsolation(t *testing.T) { + // Replace DefaultTransport with a counting sentinel. + original := http.DefaultTransport + sentinel := &sentinelTransport{inner: original} + http.DefaultTransport = sentinel + t.Cleanup(func() { http.DefaultTransport = original }) + + coderClient, closer, api := coderdtest.NewWithAPI(t, nil) + t.Cleanup(func() { closer.Close() }) + _ = coderdtest.CreateFirstUser(t, coderClient) + + mcpURL := api.AccessURL.String() + mcpserver.MCPEndpoint + authOpt := transport.WithHTTPHeaders(map[string]string{ + "Authorization": "Bearer " + coderClient.SessionToken(), + }) + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) + defer cancel() + + initReq := mcp.InitializeRequest{ + Params: mcp.InitializeParams{ + ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION, + ClientInfo: mcp.Implementation{Name: "sentinel-test", Version: "1.0.0"}, + }, + } + + t.Run("RawClientUsesDefaultTransport", func(t *testing.T) { + sentinel.hits.Store(0) + rawClient, err := mcpclient.NewStreamableHttpClient(mcpURL, authOpt) + require.NoError(t, err) + defer func() { _ = rawClient.Close() }() + + require.NoError(t, rawClient.Start(ctx)) + _, err = rawClient.Initialize(ctx, initReq) + require.NoError(t, err) + + require.Greater(t, sentinel.hits.Load(), int64(0), + "raw client should route requests through http.DefaultTransport") + }) + + t.Run("IsolatedClientBypassesDefaultTransport", func(t *testing.T) { + sentinel.hits.Store(0) + isoClient := newIsolatedMCPClient(t, mcpURL, authOpt) + defer func() { _ = isoClient.Close() }() + + require.NoError(t, isoClient.Start(ctx)) + _, err := isoClient.Initialize(ctx, initReq) + require.NoError(t, err) + + require.Equal(t, int64(0), sentinel.hits.Load(), + "isolated client must NOT route requests through http.DefaultTransport") + }) +} diff --git a/coderd/x/chatd/mcpclient/mcpclient.go b/coderd/x/chatd/mcpclient/mcpclient.go index 16ef5ed9fd..cb7e0322c2 100644 --- a/coderd/x/chatd/mcpclient/mcpclient.go +++ b/coderd/x/chatd/mcpclient/mcpclient.go @@ -285,31 +285,24 @@ func createTransport( cfg database.MCPServerConfig, headers map[string]string, ) (transport.Interface, error) { - // Each connection gets its own HTTP client with a dedicated - // transport so that httptest.Server.Close() (which calls - // CloseIdleConnections on http.DefaultTransport) does not - // disrupt unrelated connections during parallel tests. - var httpClient *http.Client - if dt, ok := http.DefaultTransport.(*http.Transport); ok { - httpClient = &http.Client{Transport: dt.Clone()} - } else { - httpClient = &http.Client{} - } + httpClient := mcpHTTPClient() switch cfg.Transport { case "sse": - return transport.NewSSE( - cfg.Url, - transport.WithHeaders(headers), - transport.WithHTTPClient(httpClient), - ) + var opts []transport.ClientOption + opts = append(opts, transport.WithHeaders(headers)) + if httpClient != nil { + opts = append(opts, transport.WithHTTPClient(httpClient)) + } + return transport.NewSSE(cfg.Url, opts...) case "", "streamable_http": // Default to streamable HTTP, the newer transport. - return transport.NewStreamableHTTP( - cfg.Url, - transport.WithHTTPHeaders(headers), - transport.WithHTTPBasicClient(httpClient), - ) + var opts []transport.StreamableHTTPCOption + opts = append(opts, transport.WithHTTPHeaders(headers)) + if httpClient != nil { + opts = append(opts, transport.WithHTTPBasicClient(httpClient)) + } + return transport.NewStreamableHTTP(cfg.Url, opts...) default: return nil, xerrors.Errorf( "unsupported transport %q", cfg.Transport, diff --git a/coderd/x/chatd/mcpclient/mcphttpclient.go b/coderd/x/chatd/mcpclient/mcphttpclient.go new file mode 100644 index 0000000000..c34ff59262 --- /dev/null +++ b/coderd/x/chatd/mcpclient/mcphttpclient.go @@ -0,0 +1,25 @@ +package mcpclient + +import ( + "net/http" + "testing" +) + +// mcpHTTPClient returns an isolated *http.Client when running +// inside tests, or nil for production. During tests, +// httptest.Server.Close() calls +// http.DefaultTransport.CloseIdleConnections(), which disrupts +// any MCP client sharing that transport. When DefaultTransport +// is a *http.Transport it is cloned; otherwise a minimal +// transport with ProxyFromEnvironment is created as a fallback. +func mcpHTTPClient() *http.Client { + if !testing.Testing() { + return nil + } + if dt, ok := http.DefaultTransport.(*http.Transport); ok { + return &http.Client{Transport: dt.Clone()} + } + return &http.Client{Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + }} +}