mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
fix(agent): exorcise data race haunting contextConfigAPI on reconnect (#23946)
Fixes: coder/internal#1441 - Move `contextConfigAPI` init from `handleManifest` to `init()`, matching all other API fields - Change `agentcontextconfig.NewAPI` to accept `func() string` closure (lazy directory evaluation) - `Config()` and HTTP handler now compute on demand via `a.manifest.Load().Directory` - Widen `TestAgent_Reconnect` to loop 5 reconnections with a non-empty manifest directory - Add `TestContextConfigAPI_InitOnce` internal test verifying lazy eval across manifest changes - Add `TestNewAPI_LazyDirectory` unit test for the lazy contract > 🤖 Written by a Coder Agent. Reviewed by a human.
This commit is contained in:
+6
-6
@@ -403,6 +403,12 @@ func (a *agent) init() {
|
||||
a.desktopAPI = agentdesktop.NewAPI(a.logger.Named("desktop"), desktop, a.clock)
|
||||
a.mcpManager = agentmcp.NewManager(a.logger.Named("mcp"))
|
||||
a.mcpAPI = agentmcp.NewAPI(a.logger.Named("mcp"), a.mcpManager)
|
||||
a.contextConfigAPI = agentcontextconfig.NewAPI(func() string {
|
||||
if m := a.manifest.Load(); m != nil {
|
||||
return m.Directory
|
||||
}
|
||||
return ""
|
||||
})
|
||||
a.reconnectingPTYServer = reconnectingpty.NewServer(
|
||||
a.logger.Named("reconnecting-pty"),
|
||||
a.sshServer,
|
||||
@@ -1265,12 +1271,6 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
|
||||
return xerrors.Errorf("update workspace agent startup: %w", err)
|
||||
}
|
||||
|
||||
// Initialize the context config API with the expanded
|
||||
// working directory so that it is ready before the HTTP
|
||||
// handler is created (which happens after manifestOK).
|
||||
a.contextConfigAPI = agentcontextconfig.NewAPI(
|
||||
manifest.Directory,
|
||||
)
|
||||
oldManifest := a.manifest.Swap(&manifest)
|
||||
manifestOK.complete(nil)
|
||||
sentResult = true
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -8,10 +10,22 @@ import (
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"cdr.dev/slog/v3/sloggers/slogtest"
|
||||
"github.com/coder/coder/v2/agent/agentcontextconfig"
|
||||
"github.com/coder/coder/v2/agent/proto"
|
||||
agentsdk "github.com/coder/coder/v2/codersdk/agentsdk"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
// platformAbsPath constructs an absolute path that is valid
|
||||
// on the current platform. On Windows, paths must include a
|
||||
// drive letter to be considered absolute.
|
||||
func platformAbsPath(parts ...string) string {
|
||||
if runtime.GOOS == "windows" {
|
||||
return `C:\` + filepath.Join(parts...)
|
||||
}
|
||||
return "/" + filepath.Join(parts...)
|
||||
}
|
||||
|
||||
// TestReportConnectionEmpty tests that reportConnection() doesn't choke if given an empty IP string, which is what we
|
||||
// send if we cannot get the remote address.
|
||||
func TestReportConnectionEmpty(t *testing.T) {
|
||||
@@ -42,3 +56,41 @@ func TestReportConnectionEmpty(t *testing.T) {
|
||||
require.Equal(t, proto.Connection_DISCONNECT, req1.GetConnection().GetAction())
|
||||
require.Equal(t, "because", req1.GetConnection().GetReason())
|
||||
}
|
||||
|
||||
func TestContextConfigAPI_InitOnce(t *testing.T) {
|
||||
// Not parallel: uses t.Setenv to clear env vars.
|
||||
|
||||
// Clear env vars so defaults are used and the test is
|
||||
// hermetic regardless of the surrounding environment.
|
||||
t.Setenv(agentcontextconfig.EnvInstructionsDirs, "")
|
||||
t.Setenv(agentcontextconfig.EnvInstructionsFile, "")
|
||||
t.Setenv(agentcontextconfig.EnvSkillsDirs, "")
|
||||
t.Setenv(agentcontextconfig.EnvSkillMetaFile, "")
|
||||
t.Setenv(agentcontextconfig.EnvMCPConfigFiles, "")
|
||||
|
||||
// After the fix, contextConfigAPI is set once in init() and
|
||||
// never reassigned. Config() evaluates lazily via the
|
||||
// manifest, so there is no concurrent write to race with.
|
||||
dir1 := platformAbsPath("dir1")
|
||||
dir2 := platformAbsPath("dir2")
|
||||
|
||||
a := &agent{}
|
||||
a.manifest.Store(&agentsdk.Manifest{Directory: dir1})
|
||||
a.contextConfigAPI = agentcontextconfig.NewAPI(func() string {
|
||||
if m := a.manifest.Load(); m != nil {
|
||||
return m.Directory
|
||||
}
|
||||
return ""
|
||||
})
|
||||
|
||||
cfg1 := a.contextConfigAPI.Config()
|
||||
require.NotEmpty(t, cfg1.MCPConfigFiles)
|
||||
require.Contains(t, cfg1.MCPConfigFiles[0], dir1)
|
||||
|
||||
// Simulate manifest update on reconnection — no field
|
||||
// reassignment needed, the lazy closure picks it up.
|
||||
a.manifest.Store(&agentsdk.Manifest{Directory: dir2})
|
||||
cfg2 := a.contextConfigAPI.Config()
|
||||
require.NotEmpty(t, cfg2.MCPConfigFiles)
|
||||
require.Contains(t, cfg2.MCPConfigFiles[0], dir2)
|
||||
}
|
||||
|
||||
+15
-8
@@ -3007,7 +3007,7 @@ func TestAgent_Speedtest(t *testing.T) {
|
||||
|
||||
func TestAgent_Reconnect(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
logger := testutil.Logger(t)
|
||||
// After the agent is disconnected from a coordinator, it's supposed
|
||||
// to reconnect!
|
||||
@@ -3020,7 +3020,8 @@ func TestAgent_Reconnect(t *testing.T) {
|
||||
logger,
|
||||
agentID,
|
||||
agentsdk.Manifest{
|
||||
DERPMap: derpMap,
|
||||
DERPMap: derpMap,
|
||||
Directory: "/test/workspace",
|
||||
},
|
||||
statsCh,
|
||||
fCoordinator,
|
||||
@@ -3033,13 +3034,19 @@ func TestAgent_Reconnect(t *testing.T) {
|
||||
})
|
||||
defer closer.Close()
|
||||
|
||||
call1 := testutil.RequireReceive(ctx, t, fCoordinator.CoordinateCalls)
|
||||
require.Equal(t, client.GetNumRefreshTokenCalls(), 1)
|
||||
close(call1.Resps) // hang up
|
||||
// expect reconnect
|
||||
// Each iteration forces the agent to reconnect by closing
|
||||
// the current coordinate call while the tracked HTTP server
|
||||
// goroutine (from connection 1's createTailnet) is still
|
||||
// alive, widening the race window.
|
||||
const reconnections = 5
|
||||
for i := range reconnections {
|
||||
call := testutil.RequireReceive(ctx, t, fCoordinator.CoordinateCalls)
|
||||
require.Equal(t, i+1, client.GetNumRefreshTokenCalls())
|
||||
close(call.Resps) // hang up — triggers reconnect
|
||||
}
|
||||
// Verify final reconnect succeeds.
|
||||
testutil.RequireReceive(ctx, t, fCoordinator.CoordinateCalls)
|
||||
// Check that the agent refreshes the token when it reconnects.
|
||||
require.Equal(t, client.GetNumRefreshTokenCalls(), 2)
|
||||
require.Equal(t, reconnections+1, client.GetNumRefreshTokenCalls())
|
||||
closer.Close()
|
||||
}
|
||||
|
||||
|
||||
@@ -29,16 +29,17 @@ const (
|
||||
// API exposes the resolved context configuration through the
|
||||
// agent's HTTP API.
|
||||
type API struct {
|
||||
config workspacesdk.ContextConfigResponse
|
||||
workingDir func() string
|
||||
}
|
||||
|
||||
// NewAPI reads context configuration from environment variables,
|
||||
// resolves all paths relative to workingDir, and returns an API
|
||||
// handler that serves the result.
|
||||
func NewAPI(workingDir string) *API {
|
||||
return &API{
|
||||
config: Config(workingDir),
|
||||
// NewAPI accepts a closure that returns the working directory.
|
||||
// The directory is evaluated lazily on each call to Config(),
|
||||
// so the caller can update it after construction.
|
||||
func NewAPI(workingDir func() string) *API {
|
||||
if workingDir == nil {
|
||||
workingDir = func() string { return "" }
|
||||
}
|
||||
return &API{workingDir: workingDir}
|
||||
}
|
||||
|
||||
// Config reads env vars and resolves paths. Exported for use
|
||||
@@ -67,7 +68,7 @@ func Config(workingDir string) workspacesdk.ContextConfigResponse {
|
||||
// Config returns the resolved config for use by other agent
|
||||
// components (e.g. MCP manager).
|
||||
func (api *API) Config() workspacesdk.ContextConfigResponse {
|
||||
return api.config
|
||||
return Config(api.workingDir())
|
||||
}
|
||||
|
||||
// Routes returns the HTTP handler for the context config
|
||||
@@ -79,5 +80,5 @@ func (api *API) Routes() http.Handler {
|
||||
}
|
||||
|
||||
func (api *API) handleGet(rw http.ResponseWriter, r *http.Request) {
|
||||
httpapi.Write(r.Context(), rw, http.StatusOK, api.config)
|
||||
httpapi.Write(r.Context(), rw, http.StatusOK, api.Config())
|
||||
}
|
||||
|
||||
@@ -93,3 +93,25 @@ func TestConfig(t *testing.T) {
|
||||
require.Equal(t, []string{a, b}, cfg.InstructionsDirs)
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewAPI_LazyDirectory(t *testing.T) {
|
||||
t.Setenv(agentcontextconfig.EnvInstructionsDirs, "")
|
||||
t.Setenv(agentcontextconfig.EnvInstructionsFile, "")
|
||||
t.Setenv(agentcontextconfig.EnvSkillsDirs, "")
|
||||
t.Setenv(agentcontextconfig.EnvSkillMetaFile, "")
|
||||
t.Setenv(agentcontextconfig.EnvMCPConfigFiles, "")
|
||||
|
||||
dir := ""
|
||||
api := agentcontextconfig.NewAPI(func() string { return dir })
|
||||
|
||||
// Before directory is set, relative paths resolve to nothing.
|
||||
cfg := api.Config()
|
||||
require.Empty(t, cfg.SkillsDirs)
|
||||
require.Empty(t, cfg.MCPConfigFiles)
|
||||
|
||||
// After setting the directory, Config() picks it up lazily.
|
||||
dir = platformAbsPath("work")
|
||||
cfg = api.Config()
|
||||
require.NotEmpty(t, cfg.SkillsDirs)
|
||||
require.Equal(t, []string{filepath.Join(dir, ".agents", "skills")}, cfg.SkillsDirs)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user