mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
fix(coderd/x/chatd): remove cache-miss check blocking agent recovery (#24634)
The cache-miss isAgentUnreachable check added in #24336 runs before dialWithLazyValidation, preventing the existing switch mechanism from discovering the new agent after a workspace rebuild. The chat's stale agent binding is never repaired, causing an infinite loop of 'agent is disconnected' errors. Remove the cache-miss check. The cache-hit check remains (it verifies the agent behind an established connection). The dial timeout and dialWithLazyValidation already bound the cache-miss failure path. Closes CODAGT-248
This commit is contained in:
committed by
GitHub
parent
514b4994c6
commit
1ace519c6e
@@ -614,13 +614,6 @@ func (c *turnWorkspaceContext) getWorkspaceConn(ctx context.Context) (workspaces
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Status check on cache miss: the freshly fetched
|
||||
// agent row may already show disconnected.
|
||||
if isAgentUnreachable(c.server.clock.Now(), agent, c.server.agentInactiveDisconnectTimeout) {
|
||||
c.clearCachedWorkspaceState()
|
||||
return nil, errChatAgentDisconnected
|
||||
}
|
||||
|
||||
// Wrap the dial in a timeout to bound the time spent
|
||||
// waiting for an unreachable agent. The timeout scopes
|
||||
// only dialWithLazyValidation, not ensureWorkspaceAgent
|
||||
|
||||
@@ -3709,34 +3709,239 @@ func TestSafeSweepIdleStreams_RecoversFromPanic(t *testing.T) {
|
||||
}, "safeSweepIdleStreams must recover panics so the janitor loop keeps running")
|
||||
}
|
||||
|
||||
func TestGetWorkspaceConn_StaleAgentRecovery(t *testing.T) {
|
||||
// Regression test: when a workspace is rebuilt, the chat's stored
|
||||
// agent ID points to a disconnected agent from the old build. The
|
||||
// cache-miss path must let dialWithLazyValidation discover the new
|
||||
// agent instead of rejecting the old one immediately.
|
||||
t.Parallel()
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
db := dbmock.NewMockStore(ctrl)
|
||||
|
||||
workspaceID := uuid.New()
|
||||
oldAgentID := uuid.New()
|
||||
newAgentID := uuid.New()
|
||||
buildID := uuid.New()
|
||||
|
||||
// Old agent: disconnected (from previous build).
|
||||
oldAgent := database.WorkspaceAgent{
|
||||
ID: oldAgentID,
|
||||
FirstConnectedAt: sql.NullTime{
|
||||
Time: time.Now().Add(-10 * time.Minute),
|
||||
Valid: true,
|
||||
},
|
||||
LastConnectedAt: sql.NullTime{
|
||||
Time: time.Now().Add(-10 * time.Minute),
|
||||
Valid: true,
|
||||
},
|
||||
DisconnectedAt: sql.NullTime{
|
||||
Time: time.Now().Add(-9 * time.Minute),
|
||||
Valid: true,
|
||||
},
|
||||
}
|
||||
|
||||
// New agent: connected (from latest build).
|
||||
newAgent := database.WorkspaceAgent{
|
||||
ID: newAgentID,
|
||||
Name: "main",
|
||||
FirstConnectedAt: sql.NullTime{
|
||||
Time: time.Now().Add(-1 * time.Minute),
|
||||
Valid: true,
|
||||
},
|
||||
LastConnectedAt: sql.NullTime{
|
||||
Time: time.Now(),
|
||||
Valid: true,
|
||||
},
|
||||
}
|
||||
|
||||
chat := database.Chat{
|
||||
ID: uuid.New(),
|
||||
WorkspaceID: uuid.NullUUID{
|
||||
UUID: workspaceID,
|
||||
Valid: true,
|
||||
},
|
||||
AgentID: uuid.NullUUID{
|
||||
UUID: oldAgentID,
|
||||
Valid: true,
|
||||
},
|
||||
}
|
||||
|
||||
// ensureWorkspaceAgent fetches the stale agent.
|
||||
db.EXPECT().GetWorkspaceAgentByID(gomock.Any(), oldAgentID).
|
||||
Return(oldAgent, nil).Times(1)
|
||||
// Lazy validation discovers the new agent.
|
||||
db.EXPECT().GetWorkspaceAgentsInLatestBuildByWorkspaceID(gomock.Any(), workspaceID).
|
||||
Return([]database.WorkspaceAgent{newAgent}, nil).Times(1)
|
||||
// Post-switch: persist the new binding.
|
||||
db.EXPECT().GetLatestWorkspaceBuildByWorkspaceID(gomock.Any(), workspaceID).
|
||||
Return(database.WorkspaceBuild{ID: buildID}, nil).Times(1)
|
||||
db.EXPECT().GetWorkspaceAgentByID(gomock.Any(), newAgentID).
|
||||
Return(newAgent, nil).Times(1)
|
||||
|
||||
updatedChat := chat
|
||||
updatedChat.AgentID = uuid.NullUUID{UUID: newAgentID, Valid: true}
|
||||
updatedChat.BuildID = uuid.NullUUID{UUID: buildID, Valid: true}
|
||||
db.EXPECT().UpdateChatBuildAgentBinding(gomock.Any(), database.UpdateChatBuildAgentBindingParams{
|
||||
ID: chat.ID,
|
||||
BuildID: uuid.NullUUID{UUID: buildID, Valid: true},
|
||||
AgentID: uuid.NullUUID{UUID: newAgentID, Valid: true},
|
||||
}).Return(updatedChat, nil).Times(1)
|
||||
|
||||
newConn := agentconnmock.NewMockAgentConn(ctrl)
|
||||
newConn.EXPECT().SetExtraHeaders(gomock.Any()).Times(1)
|
||||
|
||||
server := &Server{
|
||||
db: db,
|
||||
logger: slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}),
|
||||
clock: quartz.NewReal(),
|
||||
agentInactiveDisconnectTimeout: 30 * time.Second,
|
||||
dialTimeout: defaultDialTimeout,
|
||||
}
|
||||
server.agentConnFn = func(_ context.Context, id uuid.UUID) (workspacesdk.AgentConn, func(), error) {
|
||||
switch id {
|
||||
case oldAgentID:
|
||||
return nil, nil, xerrors.New("agent is not connected")
|
||||
case newAgentID:
|
||||
return newConn, func() {}, nil
|
||||
default:
|
||||
return nil, nil, xerrors.Errorf("unexpected agent ID: %s", id)
|
||||
}
|
||||
}
|
||||
|
||||
chatStateMu := &sync.Mutex{}
|
||||
currentChat := chat
|
||||
workspaceCtx := turnWorkspaceContext{
|
||||
server: server,
|
||||
chatStateMu: chatStateMu,
|
||||
currentChat: ¤tChat,
|
||||
loadChatSnapshot: func(context.Context, uuid.UUID) (database.Chat, error) {
|
||||
return database.Chat{}, nil
|
||||
},
|
||||
}
|
||||
defer workspaceCtx.close()
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitMedium)
|
||||
gotConn, err := workspaceCtx.getWorkspaceConn(ctx)
|
||||
require.NoError(t, err, "getWorkspaceConn should recover stale agent binding")
|
||||
require.Same(t, newConn, gotConn, "should return the connection to the new agent")
|
||||
|
||||
// Verify the cache was updated to the new agent so subsequent
|
||||
// cache-hit calls use the correct agent ID.
|
||||
workspaceCtx.mu.Lock()
|
||||
defer workspaceCtx.mu.Unlock()
|
||||
require.Equal(t, newAgentID, workspaceCtx.agent.ID, "cached agent should be the new agent")
|
||||
require.True(t, workspaceCtx.agentLoaded)
|
||||
require.Same(t, newConn, workspaceCtx.conn, "connection should be cached for subsequent calls")
|
||||
}
|
||||
|
||||
func TestGetWorkspaceConn_SameBuildAgentCrash(t *testing.T) {
|
||||
// When an agent crashes on the same build (disconnected, but still
|
||||
// in the latest build), dialWithLazyValidation dials, fails fast,
|
||||
// validation finds the same agent, and the retry also fails. The
|
||||
// wrapped dial error propagates (not errChatAgentDisconnected).
|
||||
t.Parallel()
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
db := dbmock.NewMockStore(ctrl)
|
||||
|
||||
workspaceID := uuid.New()
|
||||
agentID := uuid.New()
|
||||
|
||||
// Agent: disconnected (crashed on current build).
|
||||
agent := database.WorkspaceAgent{
|
||||
ID: agentID,
|
||||
Name: "main",
|
||||
FirstConnectedAt: sql.NullTime{
|
||||
Time: time.Now().Add(-10 * time.Minute),
|
||||
Valid: true,
|
||||
},
|
||||
LastConnectedAt: sql.NullTime{
|
||||
Time: time.Now().Add(-10 * time.Minute),
|
||||
Valid: true,
|
||||
},
|
||||
DisconnectedAt: sql.NullTime{
|
||||
Time: time.Now().Add(-9 * time.Minute),
|
||||
Valid: true,
|
||||
},
|
||||
}
|
||||
|
||||
chat := database.Chat{
|
||||
ID: uuid.New(),
|
||||
WorkspaceID: uuid.NullUUID{
|
||||
UUID: workspaceID,
|
||||
Valid: true,
|
||||
},
|
||||
AgentID: uuid.NullUUID{
|
||||
UUID: agentID,
|
||||
Valid: true,
|
||||
},
|
||||
}
|
||||
|
||||
// ensureWorkspaceAgent fetches the (crashed) agent.
|
||||
db.EXPECT().GetWorkspaceAgentByID(gomock.Any(), agentID).
|
||||
Return(agent, nil).Times(1)
|
||||
// Validation finds the same agent in the latest build.
|
||||
db.EXPECT().GetWorkspaceAgentsInLatestBuildByWorkspaceID(gomock.Any(), workspaceID).
|
||||
Return([]database.WorkspaceAgent{agent}, nil).Times(1)
|
||||
|
||||
dialErr := xerrors.New("agent is not connected")
|
||||
server := &Server{
|
||||
db: db,
|
||||
logger: slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}),
|
||||
clock: quartz.NewReal(),
|
||||
agentInactiveDisconnectTimeout: 30 * time.Second,
|
||||
dialTimeout: defaultDialTimeout,
|
||||
}
|
||||
server.agentConnFn = func(_ context.Context, _ uuid.UUID) (workspacesdk.AgentConn, func(), error) {
|
||||
return nil, nil, dialErr
|
||||
}
|
||||
|
||||
chatStateMu := &sync.Mutex{}
|
||||
currentChat := chat
|
||||
workspaceCtx := turnWorkspaceContext{
|
||||
server: server,
|
||||
chatStateMu: chatStateMu,
|
||||
currentChat: ¤tChat,
|
||||
loadChatSnapshot: func(context.Context, uuid.UUID) (database.Chat, error) {
|
||||
return database.Chat{}, nil
|
||||
},
|
||||
}
|
||||
defer workspaceCtx.close()
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitMedium)
|
||||
gotConn, err := workspaceCtx.getWorkspaceConn(ctx)
|
||||
require.Nil(t, gotConn)
|
||||
require.Error(t, err)
|
||||
// The error should be a wrapped dial error, not the
|
||||
// agent-disconnected sentinel.
|
||||
require.NotErrorIs(t, err, errChatAgentDisconnected)
|
||||
require.ErrorIs(t, err, dialErr)
|
||||
|
||||
// Cache should not have a connection, but the agent should
|
||||
// still be loaded (ensureWorkspaceAgent cached it).
|
||||
workspaceCtx.mu.Lock()
|
||||
defer workspaceCtx.mu.Unlock()
|
||||
require.True(t, workspaceCtx.agentLoaded)
|
||||
require.Nil(t, workspaceCtx.conn)
|
||||
}
|
||||
|
||||
func TestGetWorkspaceConn_StatusCheck(t *testing.T) {
|
||||
// The cache-hit status check re-fetches the agent row for a fresh
|
||||
// heartbeat timestamp. These tests verify that path detects
|
||||
// disconnected or timed-out agents and that healthy or DB-error
|
||||
// paths return the cached connection.
|
||||
t.Parallel()
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
agent database.WorkspaceAgent
|
||||
cacheHit bool
|
||||
dbError bool
|
||||
wantErr error
|
||||
wantDialCalled bool
|
||||
wantReleaseCalled bool
|
||||
}
|
||||
|
||||
tests := []testCase{
|
||||
{
|
||||
name: "DisconnectedAgentCacheMiss",
|
||||
agent: database.WorkspaceAgent{
|
||||
FirstConnectedAt: sql.NullTime{
|
||||
Time: time.Now().Add(-10 * time.Minute),
|
||||
Valid: true,
|
||||
},
|
||||
LastConnectedAt: sql.NullTime{
|
||||
Time: time.Now().Add(-10 * time.Minute),
|
||||
Valid: true,
|
||||
},
|
||||
},
|
||||
wantErr: errChatAgentDisconnected,
|
||||
},
|
||||
{
|
||||
name: "DisconnectedAgentCacheHit",
|
||||
agent: database.WorkspaceAgent{
|
||||
@@ -3749,24 +3954,20 @@ func TestGetWorkspaceConn_StatusCheck(t *testing.T) {
|
||||
Valid: true,
|
||||
},
|
||||
},
|
||||
cacheHit: true,
|
||||
wantErr: errChatAgentDisconnected,
|
||||
wantReleaseCalled: true,
|
||||
},
|
||||
{
|
||||
name: "TimedOutAgentCacheMiss",
|
||||
// Agent never connected and the connection timeout
|
||||
// has elapsed. This is the cache-hit timeout branch
|
||||
// of isAgentUnreachable.
|
||||
name: "TimedOutAgentCacheHit",
|
||||
agent: database.WorkspaceAgent{
|
||||
CreatedAt: time.Now().Add(-10 * time.Minute),
|
||||
ConnectionTimeoutSeconds: 60,
|
||||
},
|
||||
wantErr: errChatAgentDisconnected,
|
||||
},
|
||||
{
|
||||
// A "connecting" agent (never connected, normal after
|
||||
// fresh build) must NOT be blocked by the status check.
|
||||
name: "ConnectingAgentProceeds",
|
||||
agent: database.WorkspaceAgent{},
|
||||
wantDialCalled: true,
|
||||
wantErr: errChatAgentDisconnected,
|
||||
wantReleaseCalled: true,
|
||||
},
|
||||
{
|
||||
name: "CacheHitHealthyAgent",
|
||||
@@ -3780,7 +3981,6 @@ func TestGetWorkspaceConn_StatusCheck(t *testing.T) {
|
||||
Valid: true,
|
||||
},
|
||||
},
|
||||
cacheHit: true,
|
||||
},
|
||||
{
|
||||
// When GetWorkspaceAgentByID returns an error on
|
||||
@@ -3796,8 +3996,7 @@ func TestGetWorkspaceConn_StatusCheck(t *testing.T) {
|
||||
Valid: true,
|
||||
},
|
||||
},
|
||||
cacheHit: true,
|
||||
dbError: true,
|
||||
dbError: true,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -3838,17 +4037,8 @@ func TestGetWorkspaceConn_StatusCheck(t *testing.T) {
|
||||
Times(1)
|
||||
}
|
||||
|
||||
var dialCalled bool
|
||||
var releaseCalled bool
|
||||
|
||||
// For ConnectingAgentProceeds the dial returns a real
|
||||
// mock conn; for all others it should not be reached.
|
||||
var dialConn *agentconnmock.MockAgentConn
|
||||
if tc.wantDialCalled {
|
||||
dialConn = agentconnmock.NewMockAgentConn(ctrl)
|
||||
dialConn.EXPECT().SetExtraHeaders(gomock.Any()).Times(1)
|
||||
}
|
||||
|
||||
server := &Server{
|
||||
db: db,
|
||||
logger: slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}),
|
||||
@@ -3856,16 +4046,13 @@ func TestGetWorkspaceConn_StatusCheck(t *testing.T) {
|
||||
agentInactiveDisconnectTimeout: 30 * time.Second,
|
||||
dialTimeout: defaultDialTimeout,
|
||||
}
|
||||
server.agentConnFn = func(_ context.Context, id uuid.UUID) (workspacesdk.AgentConn, func(), error) {
|
||||
dialCalled = true
|
||||
if dialConn != nil {
|
||||
return dialConn, func() {}, nil
|
||||
}
|
||||
server.agentConnFn = func(context.Context, uuid.UUID) (workspacesdk.AgentConn, func(), error) {
|
||||
return nil, nil, xerrors.New("should not be called")
|
||||
}
|
||||
|
||||
chatStateMu := &sync.Mutex{}
|
||||
currentChat := chat
|
||||
cachedConn := agentconnmock.NewMockAgentConn(ctrl)
|
||||
workspaceCtx := turnWorkspaceContext{
|
||||
server: server,
|
||||
chatStateMu: chatStateMu,
|
||||
@@ -3873,51 +4060,29 @@ func TestGetWorkspaceConn_StatusCheck(t *testing.T) {
|
||||
loadChatSnapshot: func(context.Context, uuid.UUID) (database.Chat, error) {
|
||||
return database.Chat{}, nil
|
||||
},
|
||||
agent: agent,
|
||||
agentLoaded: true,
|
||||
conn: cachedConn,
|
||||
releaseConn: func() { releaseCalled = true },
|
||||
cachedWorkspaceID: chat.WorkspaceID,
|
||||
}
|
||||
defer workspaceCtx.close()
|
||||
|
||||
// For cache-hit tests, pre-populate the cached
|
||||
// connection state.
|
||||
var cachedConn *agentconnmock.MockAgentConn
|
||||
if tc.cacheHit {
|
||||
cachedConn = agentconnmock.NewMockAgentConn(ctrl)
|
||||
workspaceCtx.agent = agent
|
||||
workspaceCtx.agentLoaded = true
|
||||
workspaceCtx.conn = cachedConn
|
||||
workspaceCtx.releaseConn = func() { releaseCalled = true }
|
||||
workspaceCtx.cachedWorkspaceID = chat.WorkspaceID
|
||||
}
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
gotConn, err := workspaceCtx.getWorkspaceConn(ctx)
|
||||
|
||||
switch {
|
||||
case tc.wantErr != nil:
|
||||
if tc.wantErr != nil {
|
||||
require.Nil(t, gotConn)
|
||||
require.ErrorIs(t, err, tc.wantErr)
|
||||
case tc.cacheHit:
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.Same(t, cachedConn, gotConn)
|
||||
default:
|
||||
// Cache-miss success (ConnectingAgentProceeds).
|
||||
require.NoError(t, err)
|
||||
require.Same(t, dialConn, gotConn)
|
||||
}
|
||||
|
||||
require.Equal(t, tc.wantDialCalled, dialCalled, "dial called")
|
||||
require.Equal(t, tc.wantReleaseCalled, releaseCalled, "release called")
|
||||
|
||||
// For error cases on cache-miss, the cache should be
|
||||
// cleared.
|
||||
if tc.wantErr != nil && !tc.cacheHit {
|
||||
workspaceCtx.mu.Lock()
|
||||
defer workspaceCtx.mu.Unlock()
|
||||
require.False(t, workspaceCtx.agentLoaded)
|
||||
require.Nil(t, workspaceCtx.conn)
|
||||
}
|
||||
// For cache-hit disconnect, the cache should also be
|
||||
// cleared.
|
||||
if tc.wantErr != nil && tc.cacheHit {
|
||||
// For cache-hit disconnect, the cache should be cleared.
|
||||
if tc.wantErr != nil {
|
||||
workspaceCtx.mu.Lock()
|
||||
defer workspaceCtx.mu.Unlock()
|
||||
require.False(t, workspaceCtx.agentLoaded)
|
||||
|
||||
Reference in New Issue
Block a user