fix: reduce excessive logging when database is unreachable (#17363)

Fixes #17045

---------

Signed-off-by: Danny Kopping <dannykopping@gmail.com>
This commit is contained in:
Danny Kopping
2025-04-15 10:55:30 +02:00
committed by GitHub
parent 2f99d70640
commit 0b18e458f4
10 changed files with 160 additions and 20 deletions
+1
View File
@@ -679,6 +679,7 @@ func New(options *Options) *API {
DERPFn: api.DERPMap, DERPFn: api.DERPMap,
Logger: options.Logger, Logger: options.Logger,
ClientID: uuid.New(), ClientID: uuid.New(),
DatabaseHealthCheck: api.Database,
} }
stn, err := NewServerTailnet(api.ctx, stn, err := NewServerTailnet(api.ctx,
options.Logger, options.Logger,
+15 -1
View File
@@ -24,9 +24,11 @@ import (
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"cdr.dev/slog" "cdr.dev/slog"
"github.com/coder/coder/v2/coderd/tracing" "github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/coderd/workspaceapps/appurl" "github.com/coder/coder/v2/coderd/workspaceapps/appurl"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/site" "github.com/coder/coder/v2/site"
"github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/tailnet"
@@ -534,6 +536,10 @@ func NewMultiAgentController(ctx context.Context, logger slog.Logger, tracer tra
return m return m
} }
type Pinger interface {
Ping(context.Context) (time.Duration, error)
}
// InmemTailnetDialer is a tailnet.ControlProtocolDialer that connects to a Coordinator and DERPMap // InmemTailnetDialer is a tailnet.ControlProtocolDialer that connects to a Coordinator and DERPMap
// service running in the same memory space. // service running in the same memory space.
type InmemTailnetDialer struct { type InmemTailnetDialer struct {
@@ -541,9 +547,17 @@ type InmemTailnetDialer struct {
DERPFn func() *tailcfg.DERPMap DERPFn func() *tailcfg.DERPMap
Logger slog.Logger Logger slog.Logger
ClientID uuid.UUID ClientID uuid.UUID
// DatabaseHealthCheck is used to validate that the store is reachable.
DatabaseHealthCheck Pinger
}
func (a *InmemTailnetDialer) Dial(ctx context.Context, _ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients, error) {
if a.DatabaseHealthCheck != nil {
if _, err := a.DatabaseHealthCheck.Ping(ctx); err != nil {
return tailnet.ControlProtocolClients{}, xerrors.Errorf("%w: %v", codersdk.ErrDatabaseNotReachable, err)
}
} }
func (a *InmemTailnetDialer) Dial(_ context.Context, _ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients, error) {
coord := a.CoordPtr.Load() coord := a.CoordPtr.Load()
if coord == nil { if coord == nil {
return tailnet.ControlProtocolClients{}, xerrors.Errorf("tailnet coordinator not initialized") return tailnet.ControlProtocolClients{}, xerrors.Errorf("tailnet coordinator not initialized")
+41
View File
@@ -11,6 +11,7 @@ import (
"strconv" "strconv"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@@ -18,6 +19,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"golang.org/x/xerrors"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"github.com/coder/coder/v2/agent" "github.com/coder/coder/v2/agent"
@@ -25,6 +27,7 @@ import (
"github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd" "github.com/coder/coder/v2/coderd"
"github.com/coder/coder/v2/coderd/workspaceapps/appurl" "github.com/coder/coder/v2/coderd/workspaceapps/appurl"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/tailnet"
@@ -365,6 +368,44 @@ func TestServerTailnet_ReverseProxy(t *testing.T) {
}) })
} }
func TestDialFailure(t *testing.T) {
t.Parallel()
// Setup.
ctx := testutil.Context(t, testutil.WaitShort)
logger := testutil.Logger(t)
// Given: a tailnet coordinator.
coord := tailnet.NewCoordinator(logger)
t.Cleanup(func() {
_ = coord.Close()
})
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
coordPtr.Store(&coord)
// Given: a fake DB healthchecker which will always fail.
fch := &failingHealthcheck{}
// When: dialing the in-memory coordinator.
dialer := &coderd.InmemTailnetDialer{
CoordPtr: &coordPtr,
Logger: logger,
ClientID: uuid.UUID{5},
DatabaseHealthCheck: fch,
}
_, err := dialer.Dial(ctx, nil)
// Then: the error returned reflects the database has failed its healthcheck.
require.ErrorIs(t, err, codersdk.ErrDatabaseNotReachable)
}
type failingHealthcheck struct{}
func (failingHealthcheck) Ping(context.Context) (time.Duration, error) {
// Simulate a database connection error.
return 0, xerrors.New("oops")
}
type wrappedListener struct { type wrappedListener struct {
net.Listener net.Listener
dials int32 dials int32
+10
View File
@@ -997,6 +997,16 @@ func (api *API) derpMapUpdates(rw http.ResponseWriter, r *http.Request) {
func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.Request) { func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
// Ensure the database is reachable before proceeding.
_, err := api.Database.Ping(ctx)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: codersdk.DatabaseNotReachable,
Detail: err.Error(),
})
return
}
// This route accepts user API key auth and workspace proxy auth. The moon actor has // This route accepts user API key auth and workspace proxy auth. The moon actor has
// full permissions so should be able to pass this authz check. // full permissions so should be able to pass this authz check.
workspace := httpmw.WorkspaceParam(r) workspace := httpmw.WorkspaceParam(r)
+7
View File
@@ -0,0 +1,7 @@
package codersdk
import "golang.org/x/xerrors"
const DatabaseNotReachable = "database not reachable"
var ErrDatabaseNotReachable = xerrors.New(DatabaseNotReachable)
+8 -1
View File
@@ -11,17 +11,19 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"cdr.dev/slog" "cdr.dev/slog"
"github.com/coder/websocket"
"github.com/coder/coder/v2/buildinfo" "github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto" "github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/websocket"
) )
var permanentErrorStatuses = []int{ var permanentErrorStatuses = []int{
http.StatusConflict, // returned if client/agent connections disabled (browser only) http.StatusConflict, // returned if client/agent connections disabled (browser only)
http.StatusBadRequest, // returned if API mismatch http.StatusBadRequest, // returned if API mismatch
http.StatusNotFound, // returned if user doesn't have permission or agent doesn't exist http.StatusNotFound, // returned if user doesn't have permission or agent doesn't exist
http.StatusInternalServerError, // returned if database is not reachable,
} }
type WebsocketDialer struct { type WebsocketDialer struct {
@@ -89,6 +91,11 @@ func (w *WebsocketDialer) Dial(ctx context.Context, r tailnet.ResumeTokenControl
"Ensure your client release version (%s, different than the API version) matches the server release version", "Ensure your client release version (%s, different than the API version) matches the server release version",
buildinfo.Version()) buildinfo.Version())
} }
if sdkErr.Message == codersdk.DatabaseNotReachable &&
sdkErr.StatusCode() == http.StatusInternalServerError {
err = xerrors.Errorf("%w: %v", codersdk.ErrDatabaseNotReachable, err)
}
} }
w.connected <- err w.connected <- err
return tailnet.ControlProtocolClients{}, err return tailnet.ControlProtocolClients{}, err
@@ -1,13 +1,21 @@
package workspacesdk_test package workspacesdk_test
import ( import (
"net/http"
"net/http/httptest"
"net/url" "net/url"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"github.com/coder/websocket"
"github.com/coder/coder/v2/coderd/httpapi"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/testutil"
) )
func TestWorkspaceRewriteDERPMap(t *testing.T) { func TestWorkspaceRewriteDERPMap(t *testing.T) {
@@ -37,3 +45,30 @@ func TestWorkspaceRewriteDERPMap(t *testing.T) {
require.Equal(t, "coconuts.org", node.HostName) require.Equal(t, "coconuts.org", node.HostName)
require.Equal(t, 44558, node.DERPPort) require.Equal(t, 44558, node.DERPPort)
} }
func TestWorkspaceDialerFailure(t *testing.T) {
t.Parallel()
// Setup.
ctx := testutil.Context(t, testutil.WaitShort)
logger := testutil.Logger(t)
// Given: a mock HTTP server which mimicks an unreachable database when calling the coordination endpoint.
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{
Message: codersdk.DatabaseNotReachable,
Detail: "oops",
})
}))
t.Cleanup(srv.Close)
u, err := url.Parse(srv.URL)
require.NoError(t, err)
// When: calling the coordination endpoint.
dialer := workspacesdk.NewWebsocketDialer(logger, u, &websocket.DialOptions{})
_, err = dialer.Dial(ctx, nil)
// Then: an error indicating a database issue is returned, to conditionalize the behavior of the caller.
require.ErrorIs(t, err, codersdk.ErrDatabaseNotReachable)
}
+21 -9
View File
@@ -20,12 +20,13 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"cdr.dev/slog" "cdr.dev/slog"
"github.com/coder/retry"
"github.com/coder/coder/v2/coderd/tracing" "github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/provisionerd/proto" "github.com/coder/coder/v2/provisionerd/proto"
"github.com/coder/coder/v2/provisionerd/runner" "github.com/coder/coder/v2/provisionerd/runner"
sdkproto "github.com/coder/coder/v2/provisionersdk/proto" sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
"github.com/coder/retry"
) )
// Dialer represents the function to create a daemon client connection. // Dialer represents the function to create a daemon client connection.
@@ -290,7 +291,7 @@ func (p *Server) acquireLoop() {
defer p.wg.Done() defer p.wg.Done()
defer func() { close(p.acquireDoneCh) }() defer func() { close(p.acquireDoneCh) }()
ctx := p.closeContext ctx := p.closeContext
for { for retrier := retry.New(10*time.Millisecond, 1*time.Second); retrier.Wait(ctx); {
if p.acquireExit() { if p.acquireExit() {
return return
} }
@@ -299,7 +300,17 @@ func (p *Server) acquireLoop() {
p.opts.Logger.Debug(ctx, "shut down before client (re) connected") p.opts.Logger.Debug(ctx, "shut down before client (re) connected")
return return
} }
p.acquireAndRunOne(client) err := p.acquireAndRunOne(client)
if err != nil && ctx.Err() == nil { // Only log if context is not done.
// Short-circuit: don't wait for the retry delay to exit, if required.
if p.acquireExit() {
return
}
p.opts.Logger.Warn(ctx, "failed to acquire job, retrying", slog.F("delay", fmt.Sprintf("%vms", retrier.Delay.Milliseconds())), slog.Error(err))
} else {
// Reset the retrier after each successful acquisition.
retrier.Reset()
}
} }
} }
@@ -318,7 +329,7 @@ func (p *Server) acquireExit() bool {
return false return false
} }
func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) { func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) error {
ctx := p.closeContext ctx := p.closeContext
p.opts.Logger.Debug(ctx, "start of acquireAndRunOne") p.opts.Logger.Debug(ctx, "start of acquireAndRunOne")
job, err := p.acquireGraceful(client) job, err := p.acquireGraceful(client)
@@ -327,15 +338,15 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
if errors.Is(err, context.Canceled) || if errors.Is(err, context.Canceled) ||
errors.Is(err, yamux.ErrSessionShutdown) || errors.Is(err, yamux.ErrSessionShutdown) ||
errors.Is(err, fasthttputil.ErrInmemoryListenerClosed) { errors.Is(err, fasthttputil.ErrInmemoryListenerClosed) {
return return err
} }
p.opts.Logger.Warn(ctx, "provisionerd was unable to acquire job", slog.Error(err)) p.opts.Logger.Warn(ctx, "provisionerd was unable to acquire job", slog.Error(err))
return return xerrors.Errorf("failed to acquire job: %w", err)
} }
if job.JobId == "" { if job.JobId == "" {
p.opts.Logger.Debug(ctx, "acquire job successfully canceled") p.opts.Logger.Debug(ctx, "acquire job successfully canceled")
return return nil
} }
if len(job.TraceMetadata) > 0 { if len(job.TraceMetadata) > 0 {
@@ -392,9 +403,9 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
Error: fmt.Sprintf("failed to connect to provisioner: %s", resp.Error), Error: fmt.Sprintf("failed to connect to provisioner: %s", resp.Error),
}) })
if err != nil { if err != nil {
p.opts.Logger.Error(ctx, "provisioner job failed", slog.F("job_id", job.JobId), slog.Error(err)) p.opts.Logger.Error(ctx, "failed to report provisioner job failed", slog.F("job_id", job.JobId), slog.Error(err))
} }
return return xerrors.Errorf("failed to report provisioner job failed: %w", err)
} }
p.mutex.Lock() p.mutex.Lock()
@@ -418,6 +429,7 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
p.mutex.Lock() p.mutex.Lock()
p.activeJob = nil p.activeJob = nil
p.mutex.Unlock() p.mutex.Unlock()
return nil
} }
// acquireGraceful attempts to acquire a job from the server, handling canceling the acquisition if we gracefully shut // acquireGraceful attempts to acquire a job from the server, handling canceling the acquisition if we gracefully shut
+3
View File
@@ -591,6 +591,9 @@ export interface DangerousConfig {
readonly allow_all_cors: boolean; readonly allow_all_cors: boolean;
} }
// From codersdk/database.go
export const DatabaseNotReachable = "database not reachable";
// From healthsdk/healthsdk.go // From healthsdk/healthsdk.go
export interface DatabaseReport extends BaseReport { export interface DatabaseReport extends BaseReport {
readonly healthy: boolean; readonly healthy: boolean;
+12 -2
View File
@@ -2,6 +2,7 @@ package tailnet
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"maps" "maps"
@@ -21,11 +22,12 @@ import (
"tailscale.com/util/dnsname" "tailscale.com/util/dnsname"
"cdr.dev/slog" "cdr.dev/slog"
"github.com/coder/quartz"
"github.com/coder/retry"
"github.com/coder/coder/v2/coderd/util/ptr" "github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/tailnet/proto" "github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/quartz"
"github.com/coder/retry"
) )
// A Controller connects to the tailnet control plane, and then uses the control protocols to // A Controller connects to the tailnet control plane, and then uses the control protocols to
@@ -1381,6 +1383,14 @@ func (c *Controller) Run(ctx context.Context) {
if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) { if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
return return
} }
// If the database is unreachable by the control plane, there's not much we can do, so we'll just retry later.
if errors.Is(err, codersdk.ErrDatabaseNotReachable) {
c.logger.Warn(c.ctx, "control plane lost connection to database, retrying",
slog.Error(err), slog.F("delay", fmt.Sprintf("%vms", retrier.Delay.Milliseconds())))
continue
}
errF := slog.Error(err) errF := slog.Error(err)
var sdkErr *codersdk.Error var sdkErr *codersdk.Error
if xerrors.As(err, &sdkErr) { if xerrors.As(err, &sdkErr) {