mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
fix: make TestInterruptAutoPromotionIgnoresLaterUsageLimitIncrease deterministic (#23279)
Eliminates the timing flake in
`TestInterruptAutoPromotionIgnoresLaterUsageLimitIncrease` by making the
chatd worker loop clock-controllable.
## Changes
**`coderd/chatd/chatd.go`**
- Replace `time.NewTicker` calls in `Server.start()` with
`p.clock.NewTicker` using named quartz tags `("chatd", "acquire")` and
`("chatd", "stale-recovery")`.
**`coderd/chatd/chatd_test.go`**
- Inject `quartz.NewMock(t)` into the test via `newActiveTestServer`
config override.
- Trap the acquire ticker so the test controls exactly when pending
chats are reacquired.
- Rewrite the test flow as explicit clock-advance steps instead of
wall-clock polling.
**`AGENTS.md`**
- Document the PR title scope rule (scope must be a real path containing
all changed files).
## Validation
- `go test ./coderd/chatd -run
TestInterruptAutoPromotionIgnoresLaterUsageLimitIncrease -count=100` ✅
- `go test ./coderd/chatd` ✅
- `make lint` ✅
This commit is contained in:
@@ -208,6 +208,11 @@ seems like it should use `time.Sleep`, read through https://github.com/coder/qua
|
||||
|
||||
- Follow [Uber Go Style Guide](https://github.com/uber-go/guide/blob/master/style.md)
|
||||
- Commit format: `type(scope): message`
|
||||
- PR titles follow the same `type(scope): message` format.
|
||||
- When you use a scope, it must be a real filesystem path containing every
|
||||
changed file.
|
||||
- Use a broader path scope, or omit the scope, for cross-cutting changes.
|
||||
- Example: `fix(coderd/chatd): ...` for changes only in `coderd/chatd/`.
|
||||
|
||||
### Frontend Patterns
|
||||
|
||||
|
||||
+21
-11
@@ -21,19 +21,25 @@ import (
|
||||
func TestUserOIDCClaims(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
fake := oidctest.NewFakeIDP(t,
|
||||
oidctest.WithServing(),
|
||||
)
|
||||
cfg := fake.OIDCConfig(t, nil, func(cfg *coderd.OIDCConfig) {
|
||||
cfg.AllowSignups = true
|
||||
})
|
||||
ownerClient := coderdtest.New(t, &coderdtest.Options{
|
||||
OIDCConfig: cfg,
|
||||
})
|
||||
newOIDCTest := func(t *testing.T) (*oidctest.FakeIDP, *codersdk.Client) {
|
||||
t.Helper()
|
||||
|
||||
fake := oidctest.NewFakeIDP(t,
|
||||
oidctest.WithServing(),
|
||||
)
|
||||
cfg := fake.OIDCConfig(t, nil, func(cfg *coderd.OIDCConfig) {
|
||||
cfg.AllowSignups = true
|
||||
})
|
||||
ownerClient := coderdtest.New(t, &coderdtest.Options{
|
||||
OIDCConfig: cfg,
|
||||
})
|
||||
return fake, ownerClient
|
||||
}
|
||||
|
||||
t.Run("OwnClaims", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
fake, ownerClient := newOIDCTest(t)
|
||||
claims := jwt.MapClaims{
|
||||
"email": "alice@coder.com",
|
||||
"email_verified": true,
|
||||
@@ -61,6 +67,7 @@ func TestUserOIDCClaims(t *testing.T) {
|
||||
t.Run("Table", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
fake, ownerClient := newOIDCTest(t)
|
||||
claims := jwt.MapClaims{
|
||||
"email": "bob@coder.com",
|
||||
"email_verified": true,
|
||||
@@ -102,20 +109,22 @@ func TestUserOIDCClaims(t *testing.T) {
|
||||
t.Run("OnlyOwnClaims", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
aliceFake, aliceOwnerClient := newOIDCTest(t)
|
||||
aliceClaims := jwt.MapClaims{
|
||||
"email": "alice-isolation@coder.com",
|
||||
"email_verified": true,
|
||||
"sub": uuid.NewString(),
|
||||
}
|
||||
aliceClient, aliceLoginResp := fake.Login(t, ownerClient, aliceClaims)
|
||||
aliceClient, aliceLoginResp := aliceFake.Login(t, aliceOwnerClient, aliceClaims)
|
||||
defer aliceLoginResp.Body.Close()
|
||||
|
||||
bobFake, bobOwnerClient := newOIDCTest(t)
|
||||
bobClaims := jwt.MapClaims{
|
||||
"email": "bob-isolation@coder.com",
|
||||
"email_verified": true,
|
||||
"sub": uuid.NewString(),
|
||||
}
|
||||
bobClient, bobLoginResp := fake.Login(t, ownerClient, bobClaims)
|
||||
bobClient, bobLoginResp := bobFake.Login(t, bobOwnerClient, bobClaims)
|
||||
defer bobLoginResp.Body.Close()
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitMedium)
|
||||
@@ -134,6 +143,7 @@ func TestUserOIDCClaims(t *testing.T) {
|
||||
t.Run("ClaimsNeverNull", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
fake, ownerClient := newOIDCTest(t)
|
||||
// Use minimal claims — just enough for OIDC login.
|
||||
claims := jwt.MapClaims{
|
||||
"email": "minimal@coder.com",
|
||||
|
||||
+10
-2
@@ -1400,11 +1400,19 @@ func (p *Server) start(ctx context.Context) {
|
||||
// to handle chats orphaned by crashed or redeployed workers.
|
||||
p.recoverStaleChats(ctx)
|
||||
|
||||
acquireTicker := time.NewTicker(p.pendingChatAcquireInterval)
|
||||
acquireTicker := p.clock.NewTicker(
|
||||
p.pendingChatAcquireInterval,
|
||||
"chatd",
|
||||
"acquire",
|
||||
)
|
||||
defer acquireTicker.Stop()
|
||||
|
||||
staleRecoveryInterval := p.inFlightChatStaleAfter / staleRecoveryIntervalDivisor
|
||||
staleTicker := time.NewTicker(staleRecoveryInterval)
|
||||
staleTicker := p.clock.NewTicker(
|
||||
staleRecoveryInterval,
|
||||
"chatd",
|
||||
"stale-recovery",
|
||||
)
|
||||
defer staleTicker.Stop()
|
||||
|
||||
for {
|
||||
|
||||
+62
-33
@@ -41,6 +41,7 @@ import (
|
||||
"github.com/coder/coder/v2/provisioner/echo"
|
||||
proto "github.com/coder/coder/v2/provisionersdk/proto"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
"github.com/coder/quartz"
|
||||
)
|
||||
|
||||
func TestInterruptChatBroadcastsStatusAcrossInstances(t *testing.T) {
|
||||
@@ -878,6 +879,8 @@ func TestPromoteQueuedAllowsAlreadyQueuedMessageWhenUsageLimitReached(t *testing
|
||||
func TestInterruptAutoPromotionIgnoresLaterUsageLimitIncrease(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
const acquireInterval = 10 * time.Millisecond
|
||||
|
||||
db, ps := dbtestutil.NewDB(t)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
|
||||
@@ -888,6 +891,10 @@ func TestInterruptAutoPromotionIgnoresLaterUsageLimitIncrease(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
clock := quartz.NewMock(t)
|
||||
acquireTrap := clock.Trap().NewTicker("chatd", "acquire")
|
||||
defer acquireTrap.Close()
|
||||
|
||||
streamStarted := make(chan struct{})
|
||||
interrupted := make(chan struct{})
|
||||
allowFinish := make(chan struct{})
|
||||
@@ -921,18 +928,12 @@ func TestInterruptAutoPromotionIgnoresLaterUsageLimitIncrease(t *testing.T) {
|
||||
)
|
||||
})
|
||||
|
||||
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
|
||||
server := chatd.New(chatd.Config{
|
||||
Logger: logger,
|
||||
Database: db,
|
||||
ReplicaID: uuid.New(),
|
||||
Pubsub: ps,
|
||||
PendingChatAcquireInterval: 10 * time.Millisecond,
|
||||
InFlightChatStaleAfter: testutil.WaitSuperLong,
|
||||
})
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, server.Close())
|
||||
server := newActiveTestServer(t, db, ps, func(cfg *chatd.Config) {
|
||||
cfg.Clock = clock
|
||||
cfg.PendingChatAcquireInterval = acquireInterval
|
||||
cfg.InFlightChatStaleAfter = testutil.WaitSuperLong
|
||||
})
|
||||
acquireTrap.MustWait(ctx).MustRelease(ctx)
|
||||
|
||||
user, model := seedChatDependencies(ctx, t, db)
|
||||
setOpenAIProviderBaseURL(ctx, t, db, openAIURL)
|
||||
@@ -945,13 +946,7 @@ func TestInterruptAutoPromotionIgnoresLaterUsageLimitIncrease(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
fromDB, dbErr := db.GetChatByID(ctx, chat.ID)
|
||||
if dbErr != nil {
|
||||
return false
|
||||
}
|
||||
return fromDB.Status == database.ChatStatusRunning && fromDB.WorkerID.Valid
|
||||
}, testutil.WaitMedium, testutil.IntervalFast)
|
||||
clock.Advance(acquireInterval).MustWait(ctx)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
select {
|
||||
@@ -971,19 +966,6 @@ func TestInterruptAutoPromotionIgnoresLaterUsageLimitIncrease(t *testing.T) {
|
||||
require.True(t, queuedResult.Queued)
|
||||
require.NotNil(t, queuedResult.QueuedMessage)
|
||||
|
||||
// Send "later queued" immediately after "queued" while the first
|
||||
// message is still in chat_queued_messages. The existing backlog
|
||||
// (len(existingQueued) > 0) guarantees this is queued regardless
|
||||
// of chat status, avoiding a race where the auto-promoted "queued"
|
||||
// message finishes processing before we can send this.
|
||||
laterQueuedResult, err := server.SendMessage(ctx, chatd.SendMessageOptions{
|
||||
ChatID: chat.ID,
|
||||
Content: []codersdk.ChatMessagePart{codersdk.ChatMessageText("later queued")},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.True(t, laterQueuedResult.Queued)
|
||||
require.NotNil(t, laterQueuedResult.QueuedMessage)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
select {
|
||||
case <-interrupted:
|
||||
@@ -993,6 +975,32 @@ func TestInterruptAutoPromotionIgnoresLaterUsageLimitIncrease(t *testing.T) {
|
||||
}
|
||||
}, testutil.WaitMedium, testutil.IntervalFast)
|
||||
|
||||
close(allowFinish)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
queued, dbErr := db.GetChatQueuedMessages(ctx, chat.ID)
|
||||
if dbErr != nil || len(queued) != 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
fromDB, dbErr := db.GetChatByID(ctx, chat.ID)
|
||||
if dbErr != nil {
|
||||
return false
|
||||
}
|
||||
return fromDB.Status == database.ChatStatusPending && !fromDB.WorkerID.Valid
|
||||
}, testutil.WaitMedium, testutil.IntervalFast)
|
||||
|
||||
// Keep the acquire loop frozen here so "queued" stays pending.
|
||||
// That makes the later send queue because the chat is still busy,
|
||||
// rather than because the scheduler happened to be slow.
|
||||
laterQueuedResult, err := server.SendMessage(ctx, chatd.SendMessageOptions{
|
||||
ChatID: chat.ID,
|
||||
Content: []codersdk.ChatMessagePart{codersdk.ChatMessageText("later queued")},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.True(t, laterQueuedResult.Queued)
|
||||
require.NotNil(t, laterQueuedResult.QueuedMessage)
|
||||
|
||||
spendChat, err := db.InsertChat(ctx, database.InsertChatParams{
|
||||
OwnerID: user.ID,
|
||||
WorkspaceID: uuid.NullUUID{},
|
||||
@@ -1030,7 +1038,25 @@ func TestInterruptAutoPromotionIgnoresLaterUsageLimitIncrease(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
close(allowFinish)
|
||||
clock.Advance(acquireInterval).MustWait(ctx)
|
||||
require.Eventually(t, func() bool {
|
||||
return requestCount.Load() >= 2
|
||||
}, testutil.WaitMedium, testutil.IntervalFast)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
queued, dbErr := db.GetChatQueuedMessages(ctx, chat.ID)
|
||||
if dbErr != nil || len(queued) != 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
fromDB, dbErr := db.GetChatByID(ctx, chat.ID)
|
||||
if dbErr != nil {
|
||||
return false
|
||||
}
|
||||
return fromDB.Status == database.ChatStatusPending && !fromDB.WorkerID.Valid
|
||||
}, testutil.WaitMedium, testutil.IntervalFast)
|
||||
|
||||
clock.Advance(acquireInterval).MustWait(ctx)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
queued, dbErr := db.GetChatQueuedMessages(ctx, chat.ID)
|
||||
@@ -1065,7 +1091,10 @@ func TestInterruptAutoPromotionIgnoresLaterUsageLimitIncrease(t *testing.T) {
|
||||
if len(userTexts) != 3 {
|
||||
return false
|
||||
}
|
||||
return userTexts[0] == "hello" && userTexts[1] == "queued" && userTexts[2] == "later queued"
|
||||
return requestCount.Load() >= 3 &&
|
||||
userTexts[0] == "hello" &&
|
||||
userTexts[1] == "queued" &&
|
||||
userTexts[2] == "later queued"
|
||||
}, testutil.WaitLong, testutil.IntervalFast)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user