mirror of
https://github.com/coder/coder.git
synced 2026-06-03 13:08:25 +00:00
1f37df4db3
## Summary Scale-tested the `chatd` package with mock-based benchmarks to identify performance bottlenecks. This PR fixes 6 of the 8 identified issues, ranked by severity. ## Changes ### 1. Parallel tool execution (HIGH) — `chatloop.go` `executeTools` ran tool calls sequentially. Now dispatches all calls concurrently via goroutines with `sync.WaitGroup`. Results are pre-allocated by index (no mutex needed). `onResult` callbacks fire as each tool completes. ### 2. Pubsub-backed subagent await (HIGH) — `subagent.go` `awaitSubagentCompletion` polled the DB every 200ms. Now subscribes to the child chat's `ChatStreamNotifyChannel` via pubsub for near-instant notifications. Fallback poll reduced to 5s. Falls back to 200ms only when `pubsub == nil` (single-instance / in-memory). ### 3. Per-chat stream locking (MEDIUM) — `chatd.go` Replaced single global `streamMu` + `map[uuid.UUID]*chatStreamState` with `sync.Map` where each `chatStreamState` has its own `sync.Mutex`. Zero cross-chat contention. ### 4. Batch chat acquisition (MEDIUM) — `chatd.go` `processOnce` acquired 1 chat per tick. Now loops up to `maxChatsPerAcquire = 10` per tick, avoiding idle time when many chats are pending. ### 5. Reduced heartbeat frequency (LOW-MEDIUM) — `chatd.go` `chatHeartbeatInterval` changed from 30s to 60s. Safe given the 5-minute `DefaultInFlightChatStaleAfter`. ### 6. O(depth) descendant check (LOW) — `subagent.go` Replaced top-down BFS (`O(total_descendants)` queries) with bottom-up parent-chain walk (`O(depth)` queries). Includes cycle protection. ## Not addressed (intentionally) - Message serialization overhead - Buffer eviction (`buffer[1:]` pattern)
508 lines
11 KiB
SQL
508 lines
11 KiB
SQL
-- name: ArchiveChatByID :exec
-- Archives a chat along with every chat whose root_chat_id points at it,
-- bumping updated_at on each affected row.
UPDATE
	chats
SET
	archived = true,
	updated_at = NOW()
WHERE
	id = @id
	OR root_chat_id = @id;
|
|
|
|
-- name: UnarchiveChatByID :exec
-- Restores a chat from the archive. This mirrors ArchiveChatByID: when the
-- given chat is a root chat, every chat whose root_chat_id points at it is
-- unarchived as well. Otherwise a restored tree would keep archived
-- descendants, which archived-aware queries (e.g.
-- AcquireStaleChatDiffStatuses filters on archived = FALSE) would skip.
-- When the id belongs to a non-root chat, no other row has it as
-- root_chat_id, so only that chat is touched.
UPDATE
	chats
SET
	archived = false,
	updated_at = NOW()
WHERE
	id = @id::uuid
	OR root_chat_id = @id::uuid;
|
|
|
|
-- name: DeleteChatMessagesByChatID :exec
-- Removes every message that belongs to the given chat.
DELETE FROM chat_messages
WHERE chat_id = @chat_id::uuid;
|
|
|
|
-- name: DeleteChatMessagesAfterID :exec
-- Truncates a chat's history by deleting its messages whose id is strictly
-- greater than after_id. The message with id = after_id itself is kept.
DELETE FROM chat_messages
WHERE chat_id = @chat_id::uuid AND id > @after_id::bigint;
|
|
|
|
-- name: GetChatByID :one
-- Fetches the chat row with the given id.
SELECT * FROM chats WHERE id = @id::uuid;
|
|
|
|
-- name: GetChatMessageByID :one
-- Fetches the chat message with the given id.
SELECT * FROM chat_messages WHERE id = @id::bigint;
|
|
|
|
-- name: GetChatMessagesByChatID :many
-- Returns a chat's user-visible messages (visibility 'user' or 'both') whose
-- id is greater than after_id, oldest first.
-- id is the tie-breaker so that messages sharing a created_at value come
-- back in a deterministic order. Such ties happen, for example, with rows
-- written in one transaction, if created_at defaults to NOW(), which is
-- constant per transaction. This matches GetChatMessagesForPromptByChatID.
SELECT
	*
FROM
	chat_messages
WHERE
	chat_id = @chat_id::uuid
	AND id > @after_id::bigint
	AND visibility IN ('user', 'both')
ORDER BY
	created_at ASC,
	id ASC;
|
|
|
|
-- name: GetChatMessagesForPromptByChatID :many
-- Builds the message list sent to the model for a chat. Only rows with
-- visibility 'model' or 'both' are considered.
-- The CTE selects the most recent compressed summary: compressed = TRUE,
-- visibility 'model', newest by created_at and then id.
-- A message is returned when it is:
--   * an uncompressed system message, or
--   * any uncompressed message, if there is no summary, or
--   * an uncompressed message with a higher id than the summary, or
--   * the summary row itself.
-- Results are ordered oldest first, with id as the tie-breaker.
WITH latest_compressed_summary AS (
	SELECT id
	FROM chat_messages
	WHERE chat_id = @chat_id::uuid
		AND compressed = TRUE
		AND visibility = 'model'
	ORDER BY created_at DESC, id DESC
	LIMIT 1
)
SELECT *
FROM chat_messages
WHERE chat_id = @chat_id::uuid
	AND visibility IN ('model', 'both')
	AND (
		(
			compressed = FALSE
			AND (
				role = 'system'
				-- NOT EXISTS is required: with no summary the id comparison
				-- below would be against NULL and never be true.
				OR NOT EXISTS (SELECT 1 FROM latest_compressed_summary)
				OR id > (SELECT id FROM latest_compressed_summary)
			)
		)
		OR id = (SELECT id FROM latest_compressed_summary)
	)
ORDER BY created_at ASC, id ASC;
|
|
|
|
-- name: GetChatsByOwnerID :many
-- Returns one page of the chats owned by owner_id, most recently updated
-- first.
--   archived    NULL applies no archived filter; otherwise only chats whose
--               archived flag equals the value are returned.
--   after_id    keyset cursor. When not the nil UUID, only chats that sort
--               after that chat are returned, i.e. those whose
--               (updated_at, id) is smaller than the cursor row's. Scripts
--               can pass the last id of the previous page to paginate
--               without duplicating or missing rows.
--   offset_opt  number of rows to skip after filtering.
--   limit_opt   page size. 0 falls back to 50 because the chat list is
--               unbounded and expected to grow large.
SELECT
	*
FROM
	chats
WHERE
	owner_id = @owner_id::uuid
	AND (
		sqlc.narg('archived')::boolean IS NULL
		OR chats.archived = sqlc.narg('archived')::boolean
	)
	AND CASE
		WHEN @after_id::uuid != '00000000-0000-0000-0000-000000000000'::uuid
			THEN (updated_at, id) < (
				SELECT
					updated_at,
					id
				FROM
					chats
				WHERE
					id = @after_id
			)
		ELSE true
	END
-- Ordering on the (updated_at, id) pair is total even when timestamps
-- collide, which the keyset cursor above depends on.
ORDER BY
	(updated_at, id) DESC
OFFSET @offset_opt
LIMIT COALESCE(NULLIF(@limit_opt::int, 0), 50);
|
|
|
|
-- name: ListChildChatsByParentID :many
-- Lists the direct children of a chat (parent_chat_id = the given id),
-- oldest first. id breaks ties between children created at the same
-- instant, so the result order is deterministic across calls.
SELECT
	*
FROM
	chats
WHERE
	parent_chat_id = @parent_chat_id::uuid
ORDER BY
	created_at ASC,
	id ASC;
|
|
|
|
-- name: ListChatsByRootID :many
-- Lists every chat whose root_chat_id is the given id, oldest first. id
-- breaks ties between chats created at the same instant, so the result
-- order is deterministic across calls.
SELECT
	*
FROM
	chats
WHERE
	root_chat_id = @root_chat_id::uuid
ORDER BY
	created_at ASC,
	id ASC;
|
|
|
|
-- name: InsertChat :one
-- Creates a chat and returns the stored row. workspace_id, parent_chat_id
-- and root_chat_id are optional (NULL when omitted); the other columns
-- listed are required.
INSERT INTO chats (owner_id, workspace_id, parent_chat_id, root_chat_id, last_model_config_id, title)
VALUES (
	@owner_id::uuid,
	sqlc.narg('workspace_id')::uuid,
	sqlc.narg('parent_chat_id')::uuid,
	sqlc.narg('root_chat_id')::uuid,
	@last_model_config_id::uuid,
	@title::text
)
RETURNING *;
|
|
|
|
-- name: InsertChatMessage :one
-- Appends a message to a chat and returns the inserted row.
-- When model_config_id is supplied, it is also recorded as the chat's
-- last_model_config_id. That UPDATE lives in a data-modifying CTE, which
-- PostgreSQL executes even though the INSERT never reads from it.
-- compressed defaults to FALSE when omitted; the other nullable arguments
-- are stored as given (NULL when omitted).
WITH chat_model_bump AS (
	UPDATE chats
	SET last_model_config_id = sqlc.narg('model_config_id')::uuid
	WHERE id = @chat_id::uuid
		AND sqlc.narg('model_config_id')::uuid IS NOT NULL
)
INSERT INTO chat_messages (
	chat_id, created_by, model_config_id, role, content, visibility,
	input_tokens, output_tokens, total_tokens, reasoning_tokens,
	cache_creation_tokens, cache_read_tokens, context_limit, compressed
) VALUES (
	@chat_id::uuid,
	sqlc.narg('created_by')::uuid,
	sqlc.narg('model_config_id')::uuid,
	@role::text,
	sqlc.narg('content')::jsonb,
	@visibility::chat_message_visibility,
	sqlc.narg('input_tokens')::bigint,
	sqlc.narg('output_tokens')::bigint,
	sqlc.narg('total_tokens')::bigint,
	sqlc.narg('reasoning_tokens')::bigint,
	sqlc.narg('cache_creation_tokens')::bigint,
	sqlc.narg('cache_read_tokens')::bigint,
	sqlc.narg('context_limit')::bigint,
	COALESCE(sqlc.narg('compressed')::boolean, FALSE)
)
RETURNING *;
|
|
|
|
-- name: UpdateChatMessageByID :one
-- Rewrites a message's content and returns the updated row.
-- model_config_id is only changed when a non-NULL value is supplied.
-- content, by contrast, is always overwritten, so omitting it stores NULL.
UPDATE chat_messages
SET
	model_config_id = COALESCE(sqlc.narg('model_config_id')::uuid, model_config_id),
	content = sqlc.narg('content')::jsonb
WHERE id = @id::bigint
RETURNING *;
|
|
|
|
-- name: UpdateChatByID :one
-- Renames a chat, bumps updated_at, and returns the updated row.
UPDATE chats
SET title = @title::text, updated_at = NOW()
WHERE id = @id::uuid
RETURNING *;
|
|
|
|
-- name: UpdateChatWorkspace :one
-- Points a chat at a workspace (NULL detaches it), bumps updated_at, and
-- returns the updated row.
UPDATE chats
SET workspace_id = sqlc.narg('workspace_id')::uuid, updated_at = NOW()
WHERE id = @id::uuid
RETURNING *;
|
|
|
|
-- name: AcquireChats :many
-- Claims up to num_chats pending chats for worker_id, oldest updated_at
-- first. Each claimed chat is marked running, and its started_at,
-- heartbeat_at and updated_at are all set to started_at.
-- SKIP LOCKED makes concurrently polling replicas pass over rows another
-- replica is already claiming, so the same chat is never acquired twice.
UPDATE chats
SET
	status = 'running'::chat_status,
	started_at = @started_at::timestamptz,
	heartbeat_at = @started_at::timestamptz,
	updated_at = @started_at::timestamptz,
	worker_id = @worker_id::uuid
WHERE id = ANY(
	SELECT id
	FROM chats
	WHERE status = 'pending'::chat_status
	ORDER BY updated_at ASC
	LIMIT @num_chats::int
	FOR UPDATE SKIP LOCKED
)
RETURNING *;
|
|
|
|
-- name: UpdateChatStatus :one
-- Sets a chat's status and returns the updated row. worker_id, started_at,
-- heartbeat_at and last_error are always overwritten; each becomes NULL
-- when omitted. updated_at is bumped to NOW().
UPDATE chats
SET
	status = @status::chat_status,
	worker_id = sqlc.narg('worker_id')::uuid,
	started_at = sqlc.narg('started_at')::timestamptz,
	heartbeat_at = sqlc.narg('heartbeat_at')::timestamptz,
	last_error = sqlc.narg('last_error')::text,
	updated_at = NOW()
WHERE id = @id::uuid
RETURNING *;
|
|
|
|
-- name: GetStaleChats :many
-- Lists chats that look stuck: still marked running, but with a
-- heartbeat_at older than stale_threshold. This supports recovery after a
-- coderd crash or long hang.
SELECT *
FROM chats
WHERE status = 'running'::chat_status
	AND heartbeat_at < @stale_threshold::timestamptz;
|
|
|
|
-- name: UpdateChatHeartbeat :execrows
-- Refreshes heartbeat_at so other replicas can see the worker is alive.
-- Only a running chat held by worker_id is touched. The affected-row count
-- is 0 when the chat does not exist, is not running, or belongs to a
-- different worker.
UPDATE chats
SET heartbeat_at = NOW()
WHERE id = @id::uuid
	AND worker_id = @worker_id::uuid
	AND status = 'running'::chat_status;
|
|
|
|
-- name: GetChatDiffStatusByChatID :one
-- Fetches the diff status row recorded for a chat.
SELECT * FROM chat_diff_statuses WHERE chat_id = @chat_id::uuid;
|
|
|
|
-- name: GetChatDiffStatusesByChatIDs :many
-- Batch variant of GetChatDiffStatusByChatID: fetches the diff status rows
-- for every chat id in chat_ids, in no particular order.
SELECT * FROM chat_diff_statuses WHERE chat_id = ANY(@chat_ids::uuid[]);
|
|
|
|
-- name: UpsertChatDiffStatusReference :one
-- Records where a chat's diff lives (url, git branch, git remote) and when
-- it next goes stale. On conflict, a stored value is only replaced by a
-- non-NULL url or a non-empty branch/remote, so a NULL or empty argument
-- never erases what is already known. stale_at is always replaced and
-- updated_at is bumped.
INSERT INTO chat_diff_statuses (chat_id, url, git_branch, git_remote_origin, stale_at)
VALUES (
	@chat_id::uuid,
	sqlc.narg('url')::text,
	@git_branch::text,
	@git_remote_origin::text,
	@stale_at::timestamptz
)
ON CONFLICT (chat_id) DO UPDATE SET
	url = COALESCE(EXCLUDED.url, chat_diff_statuses.url),
	git_branch = COALESCE(NULLIF(EXCLUDED.git_branch, ''), chat_diff_statuses.git_branch),
	git_remote_origin = COALESCE(NULLIF(EXCLUDED.git_remote_origin, ''), chat_diff_statuses.git_remote_origin),
	stale_at = EXCLUDED.stale_at,
	updated_at = NOW()
RETURNING *;
|
|
|
|
-- name: UpsertChatDiffStatus :one
-- Stores a freshly refreshed pull-request snapshot for a chat. On conflict,
-- every column listed here is replaced with the new value (url and
-- pull_request_state may be NULL) and updated_at is bumped.
INSERT INTO chat_diff_statuses (
	chat_id, url, pull_request_state, pull_request_title, pull_request_draft,
	changes_requested, additions, deletions, changed_files, refreshed_at, stale_at
) VALUES (
	@chat_id::uuid,
	sqlc.narg('url')::text,
	sqlc.narg('pull_request_state')::text,
	@pull_request_title::text,
	@pull_request_draft::boolean,
	@changes_requested::boolean,
	@additions::integer,
	@deletions::integer,
	@changed_files::integer,
	@refreshed_at::timestamptz,
	@stale_at::timestamptz
)
ON CONFLICT (chat_id) DO UPDATE SET
	url = EXCLUDED.url,
	pull_request_state = EXCLUDED.pull_request_state,
	pull_request_title = EXCLUDED.pull_request_title,
	pull_request_draft = EXCLUDED.pull_request_draft,
	changes_requested = EXCLUDED.changes_requested,
	additions = EXCLUDED.additions,
	deletions = EXCLUDED.deletions,
	changed_files = EXCLUDED.changed_files,
	refreshed_at = EXCLUDED.refreshed_at,
	stale_at = EXCLUDED.stale_at,
	updated_at = NOW()
RETURNING *;
|
|
|
|
-- name: InsertChatQueuedMessage :one
-- Queues a message for a chat and returns the stored row.
INSERT INTO chat_queued_messages (
	chat_id,
	content
) VALUES (
	@chat_id,
	@content
)
RETURNING
	*;
|
|
|
|
-- name: GetChatQueuedMessages :many
-- Lists a chat's queued messages in ascending id order.
SELECT
	*
FROM
	chat_queued_messages
WHERE
	chat_id = @chat_id
ORDER BY
	id ASC;
|
|
|
|
-- name: DeleteChatQueuedMessage :exec
-- Deletes one queued message. Requiring chat_id as well means the delete
-- has no effect if the id belongs to a different chat.
DELETE FROM
	chat_queued_messages
WHERE
	id = @id
	AND chat_id = @chat_id;
|
|
|
|
-- name: DeleteAllChatQueuedMessages :exec
-- Clears every queued message for a chat.
DELETE FROM
	chat_queued_messages
WHERE
	chat_id = @chat_id;
|
|
|
|
-- name: PopNextQueuedMessage :one
-- Removes and returns the oldest (lowest id) queued message for a chat.
-- The candidate row is locked with FOR UPDATE so that concurrent pops on
-- the same chat serialize. Without the lock, two transactions could both
-- select the same id; the one that lost the race would delete nothing and
-- report no rows even though other messages were still queued. With the
-- lock, a waiting pop under READ COMMITTED skips the row once the winner
-- deletes it and locks the next lowest id instead, preserving FIFO order.
-- SKIP LOCKED is deliberately not used, so a pop never jumps ahead of a
-- message that another transaction is still popping.
DELETE FROM chat_queued_messages
WHERE id = (
	SELECT cqm.id
	FROM chat_queued_messages AS cqm
	WHERE cqm.chat_id = @chat_id
	ORDER BY cqm.id ASC
	LIMIT 1
	FOR UPDATE
)
RETURNING *;
|
|
|
|
-- name: GetLastChatMessageByRole :one
-- Returns a chat's most recent message with the given role: newest
-- created_at, with the highest id breaking ties.
SELECT *
FROM chat_messages
WHERE chat_id = @chat_id::uuid AND role = @role::text
ORDER BY created_at DESC, id DESC
LIMIT 1;
|
|
|
|
-- name: GetChatByIDForUpdate :one
-- Fetches a chat and row-locks it (FOR UPDATE) until the surrounding
-- transaction ends.
SELECT
	*
FROM
	chats
WHERE
	id = @id::uuid
FOR UPDATE;
|
|
|
|
-- name: AcquireStaleChatDiffStatuses :many
-- Claims up to limit_val diff statuses that are due for a refresh and
-- returns them together with the owning chat's owner_id.
-- A row is eligible when stale_at has passed, git_remote_origin and
-- git_branch are both non-empty, and its chat is not archived. Rows are
-- taken oldest stale_at first. FOR UPDATE OF cds ... SKIP LOCKED lets
-- concurrent workers claim disjoint rows without waiting on each other.
-- Claiming pushes stale_at 5 minutes into the future. The worker sets the
-- real stale_at after refreshing; if it crashes first, the row becomes
-- eligible again once those 5 minutes elapse.
WITH acquired AS (
	UPDATE chat_diff_statuses
	SET
		stale_at = NOW() + INTERVAL '5 minutes',
		updated_at = NOW()
	WHERE chat_id IN (
		SELECT cds.chat_id
		FROM chat_diff_statuses AS cds
		INNER JOIN chats AS c ON c.id = cds.chat_id
		WHERE cds.stale_at <= NOW()
			AND cds.git_remote_origin != ''
			AND cds.git_branch != ''
			AND c.archived = FALSE
		ORDER BY cds.stale_at ASC
		LIMIT @limit_val::int
		FOR UPDATE OF cds SKIP LOCKED
	)
	RETURNING *
)
SELECT
	acquired.*,
	c.owner_id
FROM acquired
INNER JOIN chats AS c ON c.id = acquired.chat_id;
|
|
|
|
-- name: BackoffChatDiffStatus :exec
-- Pushes a chat's next diff refresh out to the given stale_at (e.g. after
-- a failed refresh) and bumps updated_at.
UPDATE chat_diff_statuses
SET stale_at = @stale_at::timestamptz, updated_at = NOW()
WHERE chat_id = @chat_id::uuid;
|