mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
fix(coderd): handle ignored errors across coderd packages (#22851)
Handle previously ignored error return values in coderd: - coderd/chats.go: check sendEvent errors, log on failure - coderd/chatd/chattest: thread testing.TB through server structs, replace log.Printf with t.Logf, check writeSSEEvent errors - coderd/chatd/chattool/createworkspace.go: log UpdateChatWorkspace failure instead of discarding both return values - coderd/chatd/chattool/execute.go: surface ProcessOutput error in the timeout message returned to the caller - coderd/provisionerdserver: log stream.Send failure in the DownloadFile error helper
This commit is contained in:
committed by
GitHub
parent
3bd840fe27
commit
9d33c340ec
@@ -2557,6 +2557,7 @@ func (p *Server) runChat(
|
||||
CreateFn: p.createWorkspaceFn,
|
||||
AgentConnFn: chattool.AgentConnFunc(p.agentConnFn),
|
||||
WorkspaceMu: &workspaceMu,
|
||||
Logger: p.logger,
|
||||
}),
|
||||
chattool.StartWorkspace(chattool.StartWorkspaceOptions{
|
||||
DB: p.db,
|
||||
|
||||
@@ -96,6 +96,7 @@ type AnthropicDeltaBlock struct {
|
||||
// anthropicServer is a test server that mocks the Anthropic API.
|
||||
type anthropicServer struct {
|
||||
mu sync.Mutex
|
||||
t testing.TB
|
||||
server *httptest.Server
|
||||
handler AnthropicHandler
|
||||
request *AnthropicRequest
|
||||
@@ -109,6 +110,7 @@ func NewAnthropic(t testing.TB, handler AnthropicHandler) string {
|
||||
t.Helper()
|
||||
|
||||
s := &anthropicServer{
|
||||
t: t,
|
||||
handler: handler,
|
||||
}
|
||||
|
||||
@@ -143,7 +145,7 @@ func (s *anthropicServer) handleMessages(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
func (s *anthropicServer) writeResponse(w http.ResponseWriter, req *AnthropicRequest, resp AnthropicResponse) {
|
||||
if resp.Error != nil {
|
||||
writeErrorResponse(w, resp.Error)
|
||||
writeErrorResponse(s.t, w, resp.Error)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -223,7 +225,6 @@ func (s *anthropicServer) writeStreamingResponse(w http.ResponseWriter, chunks <
|
||||
}
|
||||
|
||||
func (s *anthropicServer) writeNonStreamingResponse(w http.ResponseWriter, resp *AnthropicMessage) {
|
||||
_ = s // receiver unused but kept for consistency
|
||||
response := map[string]interface{}{
|
||||
"id": resp.ID,
|
||||
"type": resp.Type,
|
||||
@@ -241,7 +242,9 @@ func (s *anthropicServer) writeNonStreamingResponse(w http.ResponseWriter, resp
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("anthropic-version", "2023-06-01")
|
||||
_ = json.NewEncoder(w).Encode(response)
|
||||
if err := json.NewEncoder(w).Encode(response); err != nil {
|
||||
s.t.Logf("writeNonStreamingResponse: failed to encode response: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// AnthropicStreamingResponse creates a streaming response from chunks.
|
||||
|
||||
@@ -3,6 +3,7 @@ package chattest
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// ErrorResponse describes an HTTP error that a test server should return
|
||||
@@ -15,7 +16,7 @@ type ErrorResponse struct {
|
||||
|
||||
// writeErrorResponse writes a JSON error response matching the common
|
||||
// provider error format used by both Anthropic and OpenAI.
|
||||
func writeErrorResponse(w http.ResponseWriter, errResp *ErrorResponse) {
|
||||
func writeErrorResponse(t testing.TB, w http.ResponseWriter, errResp *ErrorResponse) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(errResp.StatusCode)
|
||||
body := map[string]interface{}{
|
||||
@@ -24,7 +25,9 @@ func writeErrorResponse(w http.ResponseWriter, errResp *ErrorResponse) {
|
||||
"message": errResp.Message,
|
||||
},
|
||||
}
|
||||
_ = json.NewEncoder(w).Encode(body)
|
||||
if err := json.NewEncoder(w).Encode(body); err != nil {
|
||||
t.Logf("writeErrorResponse: failed to encode error response: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// AnthropicErrorResponse returns an AnthropicResponse that causes the
|
||||
|
||||
@@ -113,6 +113,7 @@ type OpenAICompletion struct {
|
||||
// openAIServer is a test server that mocks the OpenAI API.
|
||||
type openAIServer struct {
|
||||
mu sync.Mutex
|
||||
t testing.TB
|
||||
server *httptest.Server
|
||||
handler OpenAIHandler
|
||||
request *OpenAIRequest
|
||||
@@ -126,6 +127,7 @@ func NewOpenAI(t testing.TB, handler OpenAIHandler) string {
|
||||
t.Helper()
|
||||
|
||||
s := &openAIServer{
|
||||
t: t,
|
||||
handler: handler,
|
||||
}
|
||||
|
||||
@@ -176,7 +178,7 @@ func (s *openAIServer) handleResponses(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
func (s *openAIServer) writeChatCompletionsResponse(w http.ResponseWriter, req *OpenAIRequest, resp OpenAIResponse) {
|
||||
if resp.Error != nil {
|
||||
writeErrorResponse(w, resp.Error)
|
||||
writeErrorResponse(s.t, w, resp.Error)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -205,7 +207,7 @@ func (s *openAIServer) writeChatCompletionsResponse(w http.ResponseWriter, req *
|
||||
|
||||
func (s *openAIServer) writeResponsesAPIResponse(w http.ResponseWriter, req *OpenAIRequest, resp OpenAIResponse) {
|
||||
if resp.Error != nil {
|
||||
writeErrorResponse(w, resp.Error)
|
||||
writeErrorResponse(s.t, w, resp.Error)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -345,19 +347,25 @@ func writeResponsesAPIStreaming(w http.ResponseWriter, r *http.Request, chunks <
|
||||
// the fantasy client closes open text
|
||||
// blocks and persists the step content.
|
||||
for outputIndex, itemID := range itemIDs {
|
||||
_ = writeSSEEvent(w, responses.ResponseTextDoneEvent{
|
||||
if err := writeSSEEvent(w, responses.ResponseTextDoneEvent{
|
||||
ItemID: itemID,
|
||||
OutputIndex: int64(outputIndex),
|
||||
})
|
||||
_ = writeSSEEvent(w, responses.ResponseOutputItemDoneEvent{
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
if err := writeSSEEvent(w, responses.ResponseOutputItemDoneEvent{
|
||||
OutputIndex: int64(outputIndex),
|
||||
Item: responses.ResponseOutputItemUnion{
|
||||
ID: itemID,
|
||||
Type: "message",
|
||||
},
|
||||
})
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := writeSSEEvent(w, responses.ResponseCompletedEvent{}); err != nil {
|
||||
return
|
||||
}
|
||||
_ = writeSSEEvent(w, responses.ResponseCompletedEvent{})
|
||||
flusher.Flush()
|
||||
return
|
||||
}
|
||||
@@ -411,13 +419,13 @@ func writeResponsesAPIStreaming(w http.ResponseWriter, r *http.Request, chunks <
|
||||
}
|
||||
|
||||
func (s *openAIServer) writeChatCompletionsNonStreaming(w http.ResponseWriter, resp *OpenAICompletion) {
|
||||
_ = s // receiver unused but kept for consistency
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(resp)
|
||||
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
||||
s.t.Logf("writeChatCompletionsNonStreaming: failed to encode response: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *openAIServer) writeResponsesAPINonStreaming(w http.ResponseWriter, resp *OpenAICompletion) {
|
||||
_ = s // receiver unused but kept for consistency
|
||||
// Convert all choices to output format
|
||||
outputs := make([]map[string]interface{}, len(resp.Choices))
|
||||
for i, choice := range resp.Choices {
|
||||
@@ -443,7 +451,9 @@ func (s *openAIServer) writeResponsesAPINonStreaming(w http.ResponseWriter, resp
|
||||
"usage": resp.Usage,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(response)
|
||||
if err := json.NewEncoder(w).Encode(response); err != nil {
|
||||
s.t.Logf("writeResponsesAPINonStreaming: failed to encode response: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// OpenAIStreamingResponse creates a streaming response from chunks.
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/util/namesgenerator"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
@@ -67,6 +68,7 @@ type CreateWorkspaceOptions struct {
|
||||
CreateFn CreateWorkspaceFn
|
||||
AgentConnFn AgentConnFunc
|
||||
WorkspaceMu *sync.Mutex
|
||||
Logger slog.Logger
|
||||
}
|
||||
|
||||
type createWorkspaceArgs struct {
|
||||
@@ -192,13 +194,19 @@ func CreateWorkspace(options CreateWorkspaceOptions) fantasy.AgentTool {
|
||||
|
||||
// Persist workspace + agent association on the chat.
|
||||
if options.DB != nil && options.ChatID != uuid.Nil {
|
||||
_, _ = options.DB.UpdateChatWorkspace(ctx, database.UpdateChatWorkspaceParams{
|
||||
if _, err := options.DB.UpdateChatWorkspace(ctx, database.UpdateChatWorkspaceParams{
|
||||
ID: options.ChatID,
|
||||
WorkspaceID: uuid.NullUUID{
|
||||
UUID: workspace.ID,
|
||||
Valid: true,
|
||||
},
|
||||
})
|
||||
}); err != nil {
|
||||
options.Logger.Warn(ctx, "failed to persist chat workspace association",
|
||||
slog.F("chat_id", options.ChatID),
|
||||
slog.F("workspace_id", workspace.ID),
|
||||
slog.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the agent to come online and startup scripts to finish.
|
||||
|
||||
@@ -245,14 +245,18 @@ func pollProcess(
|
||||
context.Background(),
|
||||
5*time.Second,
|
||||
)
|
||||
outputResp, _ := conn.ProcessOutput(bgCtx, processID)
|
||||
outputResp, outputErr := conn.ProcessOutput(bgCtx, processID)
|
||||
bgCancel()
|
||||
output := truncateOutput(outputResp.Output)
|
||||
timeoutMsg := fmt.Sprintf("command timed out after %s", timeout)
|
||||
if outputErr != nil {
|
||||
timeoutMsg += fmt.Sprintf(" (failed to get output: %v)", outputErr)
|
||||
}
|
||||
return ExecuteResult{
|
||||
Success: false,
|
||||
Output: output,
|
||||
ExitCode: -1,
|
||||
Error: fmt.Sprintf("command timed out after %s", timeout),
|
||||
Error: timeoutMsg,
|
||||
Truncated: outputResp.Truncated,
|
||||
}
|
||||
case <-ticker.C:
|
||||
|
||||
+19
-9
@@ -104,28 +104,34 @@ func (api *API) watchChats(rw http.ResponseWriter, r *http.Request) {
|
||||
api.Logger.Error(ctx, "chat event subscription error", slog.Error(err))
|
||||
return
|
||||
}
|
||||
_ = sendEvent(codersdk.ServerSentEvent{
|
||||
if err := sendEvent(codersdk.ServerSentEvent{
|
||||
Type: codersdk.ServerSentEventTypeData,
|
||||
Data: payload,
|
||||
})
|
||||
}); err != nil {
|
||||
api.Logger.Debug(ctx, "failed to send chat event", slog.Error(err))
|
||||
}
|
||||
},
|
||||
))
|
||||
if err != nil {
|
||||
_ = sendEvent(codersdk.ServerSentEvent{
|
||||
if err := sendEvent(codersdk.ServerSentEvent{
|
||||
Type: codersdk.ServerSentEventTypeError,
|
||||
Data: codersdk.Response{
|
||||
Message: "Internal error subscribing to chat events.",
|
||||
Detail: err.Error(),
|
||||
},
|
||||
})
|
||||
}); err != nil {
|
||||
api.Logger.Debug(ctx, "failed to send chat subscribe error event", slog.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
defer cancelSubscribe()
|
||||
|
||||
// Send initial ping to signal the connection is ready.
|
||||
_ = sendEvent(codersdk.ServerSentEvent{
|
||||
if err := sendEvent(codersdk.ServerSentEvent{
|
||||
Type: codersdk.ServerSentEventTypePing,
|
||||
})
|
||||
}); err != nil {
|
||||
api.Logger.Debug(ctx, "failed to send chat ping event", slog.Error(err))
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -1040,13 +1046,15 @@ func (api *API) streamChat(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
snapshot, events, cancel, ok := api.chatDaemon.Subscribe(ctx, chatID, r.Header, afterMessageID)
|
||||
if !ok {
|
||||
_ = sendEvent(codersdk.ServerSentEvent{
|
||||
if err := sendEvent(codersdk.ServerSentEvent{
|
||||
Type: codersdk.ServerSentEventTypeError,
|
||||
Data: codersdk.Response{
|
||||
Message: "Chat streaming is not available.",
|
||||
Detail: "Chat stream state is not configured.",
|
||||
},
|
||||
})
|
||||
}); err != nil {
|
||||
api.Logger.Debug(ctx, "failed to send chat stream unavailable event", slog.Error(err))
|
||||
}
|
||||
// Ensure the WebSocket is closed so senderClosed
|
||||
// completes and the handler can return.
|
||||
<-senderClosed
|
||||
@@ -2160,7 +2168,9 @@ func (api *API) chatFileByID(rw http.ResponseWriter, r *http.Request) {
|
||||
rw.Header().Set("Cache-Control", "private, max-age=31536000, immutable")
|
||||
rw.Header().Set("Content-Length", strconv.Itoa(len(chatFile.Data)))
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
_, _ = rw.Write(chatFile.Data)
|
||||
if _, err := rw.Write(chatFile.Data); err != nil {
|
||||
api.Logger.Debug(ctx, "failed to write chat file response", slog.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func createChatInputFromRequest(ctx context.Context, db database.Store, req codersdk.CreateChatRequest) (
|
||||
|
||||
@@ -1528,13 +1528,18 @@ func (s *server) DownloadFile(request *proto.FileRequest, stream proto.DRPCProvi
|
||||
|
||||
// A graceful error message will help debugging.
|
||||
fail := func(err error) error {
|
||||
_ = stream.Send(&sdkproto.FileUpload{
|
||||
if sendErr := stream.Send(&sdkproto.FileUpload{
|
||||
Type: &sdkproto.FileUpload_Error{
|
||||
Error: &sdkproto.FailedFile{
|
||||
Error: err.Error(),
|
||||
},
|
||||
},
|
||||
})
|
||||
}); sendErr != nil {
|
||||
s.Logger.Warn(ctx, "failed to send error response on download stream",
|
||||
slog.Error(sendErr),
|
||||
slog.F("original_error", err.Error()),
|
||||
)
|
||||
}
|
||||
return err
|
||||
}
|
||||
if request.FileId == "" || request.FileId == uuid.Nil.String() {
|
||||
|
||||
Reference in New Issue
Block a user