refactor(site/src/pages/AgentsPage): use createReconnectingWebSocket in git and workspace watchers (#23736)

This commit is contained in:
Danielle Maywood
2026-03-30 14:05:05 +01:00
committed by GitHub
parent 027f93c913
commit 6a2f389110
3 changed files with 166 additions and 124 deletions
+98 -28
View File
@@ -37,6 +37,7 @@ import {
import { isMobileViewport } from "#/utils/mobile";
import { pageTitle } from "#/utils/page";
import { rewriteLocalhostURL } from "#/utils/portForward";
import { createReconnectingWebSocket } from "#/utils/reconnectingWebSocket";
import {
AgentChatPageLoadingView,
AgentChatPageNotFoundView,
@@ -283,6 +284,56 @@ function resolveCompactionThreshold(
return config.compression_threshold;
}
// Compile-time guard: ensures the workspace watcher bailout comparison
// covers every WorkspaceAgent field the UI reads. If WorkspaceAgent
// gains a new field, this will error until the field is either added
// to the comparison or explicitly excluded here.
type _UncoveredAgentFields = Omit<
TypesGen.WorkspaceAgent,
| "id"
| "status"
| "name"
| "expanded_directory"
// Fields below are intentionally not compared. They change
// frequently (stats, metadata) or are objects/arrays that would
// require deep comparison, and the UI does not read them.
| "parent_id"
| "created_at"
| "updated_at"
| "first_connected_at"
| "last_connected_at"
| "disconnected_at"
| "started_at"
| "ready_at"
| "lifecycle_state"
| "resource_id"
| "instance_id"
| "architecture"
| "environment_variables"
| "operating_system"
| "logs_length"
| "logs_overflowed"
| "directory"
| "version"
| "api_version"
| "apps"
| "latency"
| "connection_timeout_seconds"
| "troubleshooting_url"
| "subsystems"
| "health"
| "display_apps"
| "log_sources"
| "scripts"
| "startup_script_behavior"
>;
// If this errors, a new field was added to WorkspaceAgent.
// Decide: does the UI read it? If yes, add it to the first
// section of the Omit above and to the bailout comparison
// in the workspace watcher message handler. If no, add it
// to the excluded section of the Omit.
const _agentFieldGuard: Record<keyof _UncoveredAgentFields, true> = {};
const AgentChatPage: FC = () => {
const { agentId } = useParams<{ agentId: string }>();
const {
@@ -387,35 +438,54 @@ const AgentChatPage: FC = () => {
if (!workspaceId) {
return;
}
const socket = watchWorkspace(workspaceId);
socket.addEventListener("message", (event) => {
if (event.parseError) {
return;
}
if (event.parsedMessage.type === "data") {
const next = event.parsedMessage.data as TypesGen.Workspace;
queryClient.setQueryData<TypesGen.Workspace | undefined>(
workspaceByIdKey(workspaceId),
(prev) => {
// Return the same reference when nothing the UI
// reads has changed. This prevents react-query
// from notifying subscribers and avoids a full
// AgentChatPage re-render on every heartbeat.
if (
prev &&
prev.latest_build.status === next.latest_build.status &&
prev.latest_build.resources === next.latest_build.resources &&
prev.name === next.name &&
prev.owner_name === next.owner_name
) {
return prev;
}
return next;
},
);
}
return createReconnectingWebSocket({
connect() {
const socket = watchWorkspace(workspaceId);
socket.addEventListener("message", (event) => {
if (event.parseError) {
return;
}
if (event.parsedMessage.type === "data") {
const next = event.parsedMessage.data as TypesGen.Workspace;
queryClient.setQueryData<TypesGen.Workspace | undefined>(
workspaceByIdKey(workspaceId),
(prev) => {
// Return the same reference when nothing the UI
// reads has changed. This prevents react-query
// from notifying subscribers and avoids a full
// AgentChatPage re-render on every heartbeat.
const prevAgent = getWorkspaceAgent(prev, undefined);
const nextAgent = getWorkspaceAgent(next, undefined);
if (
prev &&
prev.latest_build.status === next.latest_build.status &&
prev.name === next.name &&
prev.owner_name === next.owner_name &&
prevAgent?.id === nextAgent?.id &&
prevAgent?.status === nextAgent?.status &&
prevAgent?.name === nextAgent?.name &&
prevAgent?.expanded_directory ===
nextAgent?.expanded_directory
) {
return prev;
}
return next;
},
);
}
});
return socket;
},
onOpen() {
// Refetch workspace data on reconnection to cover
// events missed while disconnected. Also fires on the
// initial connection (harmless, may deduplicate with
// the in-flight useQuery fetch).
void queryClient.invalidateQueries({
queryKey: workspaceByIdKey(workspaceId),
});
},
});
return () => socket.close();
}, [workspaceId, queryClient]);
const sshConfigQuery = useQuery(deploymentSSHConfig());
const workspace = workspaceQuery.data;
@@ -169,7 +169,8 @@ describe("useGitWatcher", () => {
expect(result.current.isConnected).toBe(false);
// Simulate the browser firing the close event after
// socket.close() — the disposedRef guard must prevent
// socket.close(). The dispose guard inside
// createReconnectingWebSocket must prevent
// the reconnect handler from scheduling a new attempt.
mockWatchChatGit.mockClear();
act(() => socket.simulateClose());
@@ -403,7 +404,8 @@ describe("useGitWatcher", () => {
expect(socket.close).toHaveBeenCalledTimes(1);
// Simulate the browser firing the close event after
// socket.close() — the disposedRef guard must prevent
// socket.close(). The dispose guard inside
// createReconnectingWebSocket must prevent
// the reconnect handler from scheduling a new attempt.
mockWatchChatGit.mockClear();
act(() => socket.simulateClose());
@@ -6,6 +6,7 @@ import type {
WorkspaceAgentRepoChanges,
WorkspaceAgentStatus,
} from "#/api/typesGenerated";
import { createReconnectingWebSocket } from "#/utils/reconnectingWebSocket";
// Compile-time guard: ensures the bailout comparison in setRepositories
// covers every data field. If WorkspaceAgentRepoChanges gains a new
@@ -34,8 +35,6 @@ interface UseGitWatcherResult {
refresh: () => boolean;
}
const MAX_BACKOFF_MS = 30_000;
export function useGitWatcher({
chatId,
agentStatus,
@@ -46,10 +45,6 @@ export function useGitWatcher({
const [isConnected, setIsConnected] = useState(false);
const socketRef = useRef<WebSocket | null>(null);
const reconnectAttemptRef = useRef(0);
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
// Track whether we've been disposed to avoid reconnecting after unmount.
const disposedRef = useRef(false);
const sendMessage = (msg: WorkspaceAgentGitClientMessage): boolean => {
const socket = socketRef.current;
@@ -69,107 +64,82 @@ export function useGitWatcher({
return;
}
disposedRef.current = false;
const activeChatId = chatId;
function connect() {
if (disposedRef.current) {
return;
}
const dispose = createReconnectingWebSocket({
connect() {
const socket = watchChatGit(activeChatId);
socketRef.current = socket;
const socket = watchChatGit(chatId!);
socketRef.current = socket;
socket.addEventListener("message", (event) => {
// Ignore messages from superseded connections.
if (socketRef.current !== socket) {
return;
}
let data: WorkspaceAgentGitServerMessage;
try {
data = JSON.parse(
String((event as MessageEvent).data),
) as WorkspaceAgentGitServerMessage;
} catch {
// Ignore unparsable messages.
return;
}
socket.addEventListener("open", () => {
// Ignore open events from superseded connections.
if (socketRef.current !== socket) {
return;
}
setIsConnected(true);
reconnectAttemptRef.current = 0;
});
socket.addEventListener("message", (event) => {
// Ignore messages from superseded connections.
if (socketRef.current !== socket) {
return;
}
let data: WorkspaceAgentGitServerMessage;
try {
data = JSON.parse(
String(event.data),
) as WorkspaceAgentGitServerMessage;
} catch {
// Ignore unparsable messages.
return;
}
if (data.type === "changes" && data.repositories) {
setRepositories((prev) => {
let changed = false;
const next = new Map(prev);
for (const repo of data.repositories!) {
if (repo.removed) {
if (next.has(repo.repo_root)) {
next.delete(repo.repo_root);
changed = true;
}
} else {
const existing = next.get(repo.repo_root);
if (
!existing ||
existing.branch !== repo.branch ||
existing.remote_origin !== repo.remote_origin ||
existing.unified_diff !== repo.unified_diff
) {
next.set(repo.repo_root, repo);
changed = true;
if (data.type === "changes" && data.repositories) {
setRepositories((prev) => {
let changed = false;
const next = new Map(prev);
for (const repo of data.repositories!) {
if (repo.removed) {
if (next.has(repo.repo_root)) {
next.delete(repo.repo_root);
changed = true;
}
} else {
const existing = next.get(repo.repo_root);
if (
!existing ||
existing.branch !== repo.branch ||
existing.remote_origin !== repo.remote_origin ||
existing.unified_diff !== repo.unified_diff
) {
next.set(repo.repo_root, repo);
changed = true;
}
}
}
}
return changed ? next : prev;
});
} else if (data.type === "error") {
console.warn("[useGitWatcher] server error:", data.message);
}
});
return changed ? next : prev;
});
} else if (data.type === "error") {
console.warn("[useGitWatcher] server error:", data.message);
}
});
// Note: WebSocket "error" events are always followed by a "close"
// event, so reconnection is handled here.
socket.addEventListener("close", () => {
// Ignore close events from superseded connections.
if (socketRef.current !== socket) {
return;
}
return socket;
},
onOpen() {
setIsConnected(true);
},
onDisconnect() {
setIsConnected(false);
socketRef.current = null;
},
if (disposedRef.current) {
return;
}
// Reconnect with exponential backoff.
const attempt = reconnectAttemptRef.current;
const delay = Math.min(1000 * 2 ** attempt, MAX_BACKOFF_MS);
reconnectAttemptRef.current = attempt + 1;
reconnectTimerRef.current = setTimeout(connect, delay);
});
}
connect();
// 30s cap instead of the utility default 10s. The git
// endpoint may be slow to respond after a workspace wakes.
maxMs: 30_000,
});
return () => {
disposedRef.current = true;
if (reconnectTimerRef.current !== null) {
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = null;
}
if (socketRef.current) {
socketRef.current.close();
socketRef.current = null;
}
// dispose() suppresses onDisconnect, so reset state
// explicitly.
dispose();
setIsConnected(false);
setRepositories(new Map());
reconnectAttemptRef.current = 0;
socketRef.current = null;
};
}, [chatId, agentStatus]);