Files
coder/coderd/database/queries/chats.sql
T
Kyle Carberry 5eebd3829f fix: use cursor-based query for chat stream notifications (#22510)
## Problem

The pubsub notification handler in `chatd` re-fetched **all** messages
from the DB on every new message notification, then filtered in Go with
`msg.ID > lastMessageID`. This grows linearly with conversation length —
every new message triggers a full table scan of that chat's history.

The `AfterMessageID` field in the pubsub notification payload was
clearly designed for cursor-based fetching, but no matching query
existed.

## Fix

- Add `GetChatMessagesByChatIDAfter` SQL query with `WHERE id >
@after_id`, so the database does the filtering instead of Go.
- Use it in the pubsub notification handler in `chatd.go`, passing
`lastMessageID` as the cursor.
- Implement the dbauthz wrapper (was a `panic("not implemented")` stub
from codegen) with the same read-check-on-parent-chat pattern as
adjacent methods.
- Add dbauthz test coverage for the new method.

**Not changed:** The initial snapshot in `Subscribe()` still loads all
messages — that's correct, since a newly-connecting client needs the
full conversation state. The waste was only in the ongoing notification
path.
2026-03-02 16:31:04 -05:00

411 lines
8.5 KiB
SQL

-- name: ArchiveChatByID :exec
UPDATE chats SET archived = true, updated_at = NOW()
WHERE id = @id OR root_chat_id = @id;
-- name: UnarchiveChatByID :exec
UPDATE chats SET archived = false, updated_at = NOW() WHERE id = @id::uuid;
-- name: DeleteChatMessagesByChatID :exec
DELETE FROM
chat_messages
WHERE
chat_id = @chat_id::uuid;
-- name: DeleteChatMessagesAfterID :exec
DELETE FROM
chat_messages
WHERE
chat_id = @chat_id::uuid
AND id > @after_id::bigint;
-- name: GetChatByID :one
SELECT
*
FROM
chats
WHERE
id = @id::uuid;
-- name: GetChatMessageByID :one
SELECT
*
FROM
chat_messages
WHERE
id = @id::bigint;
-- name: GetChatMessagesByChatID :many
SELECT
*
FROM
chat_messages
WHERE
chat_id = @chat_id::uuid
AND id > @after_id::bigint
AND visibility IN ('user', 'both')
ORDER BY
created_at ASC;
-- name: GetChatMessagesForPromptByChatID :many
WITH latest_compressed_summary AS (
SELECT
id
FROM
chat_messages
WHERE
chat_id = @chat_id::uuid
AND role = 'system'
AND visibility IN ('model', 'both')
AND compressed = TRUE
ORDER BY
created_at DESC,
id DESC
LIMIT
1
)
SELECT
*
FROM
chat_messages
WHERE
chat_id = @chat_id::uuid
AND visibility IN ('model', 'both')
AND (
(
role = 'system'
AND compressed = FALSE
)
OR (
compressed = FALSE
AND (
NOT EXISTS (
SELECT
1
FROM
latest_compressed_summary
)
OR id > (
SELECT
id
FROM
latest_compressed_summary
)
)
)
OR id = (
SELECT
id
FROM
latest_compressed_summary
)
)
ORDER BY
created_at ASC,
id ASC;
-- name: GetChatsByOwnerID :many
SELECT
*
FROM
chats
WHERE
owner_id = @owner_id::uuid
AND archived = false
ORDER BY
updated_at DESC;
-- name: ListChildChatsByParentID :many
SELECT
*
FROM
chats
WHERE
parent_chat_id = @parent_chat_id::uuid
ORDER BY
created_at ASC;
-- name: ListChatsByRootID :many
SELECT
*
FROM
chats
WHERE
root_chat_id = @root_chat_id::uuid
ORDER BY
created_at ASC;
-- name: InsertChat :one
INSERT INTO chats (
owner_id,
workspace_id,
parent_chat_id,
root_chat_id,
last_model_config_id,
title
) VALUES (
@owner_id::uuid,
sqlc.narg('workspace_id')::uuid,
sqlc.narg('parent_chat_id')::uuid,
sqlc.narg('root_chat_id')::uuid,
@last_model_config_id::uuid,
@title::text
)
RETURNING
*;
-- name: InsertChatMessage :one
WITH updated_chat AS (
UPDATE
chats
SET
last_model_config_id = sqlc.narg('model_config_id')::uuid
WHERE
id = @chat_id::uuid
AND sqlc.narg('model_config_id')::uuid IS NOT NULL
)
INSERT INTO chat_messages (
chat_id,
model_config_id,
role,
content,
visibility,
input_tokens,
output_tokens,
total_tokens,
reasoning_tokens,
cache_creation_tokens,
cache_read_tokens,
context_limit,
compressed
) VALUES (
@chat_id::uuid,
sqlc.narg('model_config_id')::uuid,
@role::text,
sqlc.narg('content')::jsonb,
@visibility::chat_message_visibility,
sqlc.narg('input_tokens')::bigint,
sqlc.narg('output_tokens')::bigint,
sqlc.narg('total_tokens')::bigint,
sqlc.narg('reasoning_tokens')::bigint,
sqlc.narg('cache_creation_tokens')::bigint,
sqlc.narg('cache_read_tokens')::bigint,
sqlc.narg('context_limit')::bigint,
COALESCE(sqlc.narg('compressed')::boolean, FALSE)
)
RETURNING
*;
-- name: UpdateChatMessageByID :one
UPDATE
chat_messages
SET
model_config_id = COALESCE(sqlc.narg('model_config_id')::uuid, model_config_id),
content = sqlc.narg('content')::jsonb
WHERE
id = @id::bigint
RETURNING
*;
-- name: UpdateChatByID :one
UPDATE
chats
SET
title = @title::text,
updated_at = NOW()
WHERE
id = @id::uuid
RETURNING
*;
-- name: UpdateChatWorkspace :one
UPDATE
chats
SET
workspace_id = sqlc.narg('workspace_id')::uuid,
updated_at = NOW()
WHERE
id = @id::uuid
RETURNING
*;
-- name: AcquireChat :one
-- Acquires a pending chat for processing. Uses SKIP LOCKED to prevent
-- multiple replicas from acquiring the same chat.
UPDATE
chats
SET
status = 'running'::chat_status,
started_at = @started_at::timestamptz,
heartbeat_at = @started_at::timestamptz,
updated_at = @started_at::timestamptz,
worker_id = @worker_id::uuid
WHERE
id = (
SELECT
id
FROM
chats
WHERE
status = 'pending'::chat_status
ORDER BY
updated_at ASC
FOR UPDATE
SKIP LOCKED
LIMIT
1
)
RETURNING
*;
-- name: UpdateChatStatus :one
UPDATE
chats
SET
status = @status::chat_status,
worker_id = sqlc.narg('worker_id')::uuid,
started_at = sqlc.narg('started_at')::timestamptz,
heartbeat_at = sqlc.narg('heartbeat_at')::timestamptz,
last_error = sqlc.narg('last_error')::text,
updated_at = NOW()
WHERE
id = @id::uuid
RETURNING
*;
-- name: GetStaleChats :many
-- Find chats that appear stuck (running but heartbeat has expired).
-- Used for recovery after coderd crashes or long hangs.
SELECT
*
FROM
chats
WHERE
status = 'running'::chat_status
AND heartbeat_at < @stale_threshold::timestamptz;
-- name: UpdateChatHeartbeat :execrows
-- Bumps the heartbeat timestamp for a running chat so that other
-- replicas know the worker is still alive.
UPDATE
chats
SET
heartbeat_at = NOW()
WHERE
id = @id::uuid
AND worker_id = @worker_id::uuid
AND status = 'running'::chat_status;
-- name: GetChatDiffStatusByChatID :one
SELECT
*
FROM
chat_diff_statuses
WHERE
chat_id = @chat_id::uuid;
-- name: GetChatDiffStatusesByChatIDs :many
SELECT
*
FROM
chat_diff_statuses
WHERE
chat_id = ANY(@chat_ids::uuid[]);
-- name: UpsertChatDiffStatusReference :one
INSERT INTO chat_diff_statuses (
chat_id,
url,
git_branch,
git_remote_origin,
stale_at
) VALUES (
@chat_id::uuid,
sqlc.narg('url')::text,
@git_branch::text,
@git_remote_origin::text,
@stale_at::timestamptz
)
ON CONFLICT (chat_id) DO UPDATE
SET
url = CASE
WHEN EXCLUDED.url IS NOT NULL THEN EXCLUDED.url
ELSE chat_diff_statuses.url
END,
git_branch = CASE
WHEN EXCLUDED.git_branch != '' THEN EXCLUDED.git_branch
ELSE chat_diff_statuses.git_branch
END,
git_remote_origin = CASE
WHEN EXCLUDED.git_remote_origin != '' THEN EXCLUDED.git_remote_origin
ELSE chat_diff_statuses.git_remote_origin
END,
stale_at = EXCLUDED.stale_at,
updated_at = NOW()
RETURNING
*;
-- name: UpsertChatDiffStatus :one
INSERT INTO chat_diff_statuses (
chat_id,
url,
pull_request_state,
changes_requested,
additions,
deletions,
changed_files,
refreshed_at,
stale_at
) VALUES (
@chat_id::uuid,
sqlc.narg('url')::text,
sqlc.narg('pull_request_state')::text,
@changes_requested::boolean,
@additions::integer,
@deletions::integer,
@changed_files::integer,
@refreshed_at::timestamptz,
@stale_at::timestamptz
)
ON CONFLICT (chat_id) DO UPDATE
SET
url = EXCLUDED.url,
pull_request_state = EXCLUDED.pull_request_state,
changes_requested = EXCLUDED.changes_requested,
additions = EXCLUDED.additions,
deletions = EXCLUDED.deletions,
changed_files = EXCLUDED.changed_files,
refreshed_at = EXCLUDED.refreshed_at,
stale_at = EXCLUDED.stale_at,
updated_at = NOW()
RETURNING
*;
-- name: InsertChatQueuedMessage :one
INSERT INTO chat_queued_messages (chat_id, content)
VALUES (@chat_id, @content)
RETURNING *;
-- name: GetChatQueuedMessages :many
SELECT * FROM chat_queued_messages
WHERE chat_id = @chat_id
ORDER BY id ASC;
-- name: DeleteChatQueuedMessage :exec
DELETE FROM chat_queued_messages WHERE id = @id AND chat_id = @chat_id;
-- name: DeleteAllChatQueuedMessages :exec
DELETE FROM chat_queued_messages WHERE chat_id = @chat_id;
-- name: PopNextQueuedMessage :one
DELETE FROM chat_queued_messages
WHERE id = (
SELECT cqm.id FROM chat_queued_messages cqm
WHERE cqm.chat_id = @chat_id
ORDER BY cqm.id ASC
LIMIT 1
)
RETURNING *;
-- name: GetChatByIDForUpdate :one
SELECT * FROM chats WHERE id = @id::uuid FOR UPDATE;