mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
fix(coderd/agentapi): graceful fallback for missing session_id
Old boundary clients do not send session_id. Instead of returning a hard error that silently drops all logging and usage tracking, fall back to log-only mode when session_id is absent or unparseable. DB persistence is skipped but structured logging and the usage tracker still run. Update the agentapi unit tests to expect success (not error) for missing and invalid session_id cases, and convert the agent e2e test to a table test covering three client variants: old client (no session_id), new client with correlation disabled (empty session_id), and new client with a valid session_id.
This commit is contained in:
+131
-96
@@ -42,111 +42,146 @@ func sendBoundaryLogsRequest(t *testing.T, conn net.Conn, req *agentproto.Report
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// TestBoundaryLogs_EndToEnd is an end-to-end test that sends a protobuf
|
||||
// message over the agent's unix socket (as boundary would) and verifies
|
||||
// it is ultimately logged by coderd with the correct structured fields.
|
||||
// TestBoundaryLogs_EndToEnd sends protobuf messages over the agent's unix
|
||||
// socket (as boundary would) and verifies structured logging works for all
|
||||
// client variants: old clients without session correlation, new clients
|
||||
// with correlation disabled, and new clients with a valid session ID.
|
||||
func TestBoundaryLogs_EndToEnd(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
err := srv.Start()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { require.NoError(t, srv.Close()) })
|
||||
|
||||
sink := testutil.NewFakeSink(t)
|
||||
logger := sink.Logger(slog.LevelInfo)
|
||||
workspaceID := uuid.New()
|
||||
templateID := uuid.New()
|
||||
templateVersionID := uuid.New()
|
||||
reporter := &agentapi.BoundaryLogsAPI{
|
||||
Log: logger,
|
||||
WorkspaceID: workspaceID,
|
||||
TemplateID: templateID,
|
||||
TemplateVersionID: templateVersionID,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
forwarderDone := make(chan error, 1)
|
||||
go func() {
|
||||
forwarderDone <- srv.RunForwarder(ctx, reporter)
|
||||
}()
|
||||
|
||||
conn, err := net.Dial("unix", socketPath)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
// Allowed HTTP request.
|
||||
req := &agentproto.ReportBoundaryLogsRequest{
|
||||
Logs: []*agentproto.BoundaryLog{
|
||||
{
|
||||
Allowed: true,
|
||||
Time: timestamppb.Now(),
|
||||
Resource: &agentproto.BoundaryLog_HttpRequest_{
|
||||
HttpRequest: &agentproto.BoundaryLog_HttpRequest{
|
||||
Method: "GET",
|
||||
Url: "https://example.com/allowed",
|
||||
MatchedRule: "*.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
tests := []struct {
|
||||
name string
|
||||
sessionID string
|
||||
}{
|
||||
{
|
||||
// Given: an old boundary client that does not send session_id.
|
||||
// Then: logs are still forwarded and structured fields are correct.
|
||||
name: "OldClient",
|
||||
sessionID: "",
|
||||
},
|
||||
{
|
||||
// Given: a new boundary client with correlation disabled
|
||||
// (empty session_id, identical wire format to old client).
|
||||
// Then: logs are still forwarded and structured fields are correct.
|
||||
name: "NewClientCorrelationDisabled",
|
||||
sessionID: "",
|
||||
},
|
||||
{
|
||||
// Given: a new boundary client that sends a valid session_id.
|
||||
// Then: logs are still forwarded and structured fields are correct.
|
||||
name: "NewClientWithSessionID",
|
||||
sessionID: uuid.New().String(),
|
||||
},
|
||||
}
|
||||
sendBoundaryLogsRequest(t, conn, req)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return len(sink.Entries()) >= 1
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
for _, tc := range tests {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
entries := sink.Entries()
|
||||
require.Len(t, entries, 1)
|
||||
entry := entries[0]
|
||||
require.Equal(t, slog.LevelInfo, entry.Level)
|
||||
require.Equal(t, "boundary_request", entry.Message)
|
||||
require.Equal(t, "allow", getField(entry.Fields, "decision"))
|
||||
require.Equal(t, workspaceID.String(), getField(entry.Fields, "workspace_id"))
|
||||
require.Equal(t, templateID.String(), getField(entry.Fields, "template_id"))
|
||||
require.Equal(t, templateVersionID.String(), getField(entry.Fields, "template_version_id"))
|
||||
require.Equal(t, "GET", getField(entry.Fields, "http_method"))
|
||||
require.Equal(t, "https://example.com/allowed", getField(entry.Fields, "http_url"))
|
||||
require.Equal(t, "*.example.com", getField(entry.Fields, "matched_rule"))
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
// Denied HTTP request.
|
||||
req2 := &agentproto.ReportBoundaryLogsRequest{
|
||||
Logs: []*agentproto.BoundaryLog{
|
||||
{
|
||||
Allowed: false,
|
||||
Time: timestamppb.Now(),
|
||||
Resource: &agentproto.BoundaryLog_HttpRequest_{
|
||||
HttpRequest: &agentproto.BoundaryLog_HttpRequest{
|
||||
Method: "POST",
|
||||
Url: "https://blocked.com/denied",
|
||||
err := srv.Start()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { require.NoError(t, srv.Close()) })
|
||||
|
||||
sink := testutil.NewFakeSink(t)
|
||||
logger := sink.Logger(slog.LevelInfo)
|
||||
workspaceID := uuid.New()
|
||||
templateID := uuid.New()
|
||||
templateVersionID := uuid.New()
|
||||
reporter := &agentapi.BoundaryLogsAPI{
|
||||
Log: logger,
|
||||
WorkspaceID: workspaceID,
|
||||
TemplateID: templateID,
|
||||
TemplateVersionID: templateVersionID,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
forwarderDone := make(chan error, 1)
|
||||
go func() {
|
||||
forwarderDone <- srv.RunForwarder(ctx, reporter)
|
||||
}()
|
||||
|
||||
conn, err := net.Dial("unix", socketPath)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
// When: an allowed HTTP request is sent.
|
||||
req := &agentproto.ReportBoundaryLogsRequest{
|
||||
SessionId: tc.sessionID,
|
||||
Logs: []*agentproto.BoundaryLog{
|
||||
{
|
||||
Allowed: true,
|
||||
Time: timestamppb.Now(),
|
||||
Resource: &agentproto.BoundaryLog_HttpRequest_{
|
||||
HttpRequest: &agentproto.BoundaryLog_HttpRequest{
|
||||
Method: "GET",
|
||||
Url: "https://example.com/allowed",
|
||||
MatchedRule: "*.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
sendBoundaryLogsRequest(t, conn, req)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return len(sink.Entries()) >= 1
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
|
||||
entries := sink.Entries()
|
||||
require.Len(t, entries, 1)
|
||||
entry := entries[0]
|
||||
require.Equal(t, slog.LevelInfo, entry.Level)
|
||||
require.Equal(t, "boundary_request", entry.Message)
|
||||
require.Equal(t, "allow", getField(entry.Fields, "decision"))
|
||||
require.Equal(t, workspaceID.String(), getField(entry.Fields, "workspace_id"))
|
||||
require.Equal(t, templateID.String(), getField(entry.Fields, "template_id"))
|
||||
require.Equal(t, templateVersionID.String(), getField(entry.Fields, "template_version_id"))
|
||||
require.Equal(t, "GET", getField(entry.Fields, "http_method"))
|
||||
require.Equal(t, "https://example.com/allowed", getField(entry.Fields, "http_url"))
|
||||
require.Equal(t, "*.example.com", getField(entry.Fields, "matched_rule"))
|
||||
|
||||
// When: a denied HTTP request is sent.
|
||||
req2 := &agentproto.ReportBoundaryLogsRequest{
|
||||
SessionId: tc.sessionID,
|
||||
Logs: []*agentproto.BoundaryLog{
|
||||
{
|
||||
Allowed: false,
|
||||
Time: timestamppb.Now(),
|
||||
Resource: &agentproto.BoundaryLog_HttpRequest_{
|
||||
HttpRequest: &agentproto.BoundaryLog_HttpRequest{
|
||||
Method: "POST",
|
||||
Url: "https://blocked.com/denied",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
sendBoundaryLogsRequest(t, conn, req2)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return len(sink.Entries()) >= 2
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
|
||||
entries = sink.Entries()
|
||||
entry = entries[1]
|
||||
require.Len(t, entries, 2)
|
||||
require.Equal(t, slog.LevelInfo, entry.Level)
|
||||
require.Equal(t, "boundary_request", entry.Message)
|
||||
require.Equal(t, "deny", getField(entry.Fields, "decision"))
|
||||
require.Equal(t, workspaceID.String(), getField(entry.Fields, "workspace_id"))
|
||||
require.Equal(t, templateID.String(), getField(entry.Fields, "template_id"))
|
||||
require.Equal(t, templateVersionID.String(), getField(entry.Fields, "template_version_id"))
|
||||
require.Equal(t, "POST", getField(entry.Fields, "http_method"))
|
||||
require.Equal(t, "https://blocked.com/denied", getField(entry.Fields, "http_url"))
|
||||
require.Equal(t, nil, getField(entry.Fields, "matched_rule"))
|
||||
|
||||
cancel()
|
||||
<-forwarderDone
|
||||
})
|
||||
}
|
||||
sendBoundaryLogsRequest(t, conn, req2)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return len(sink.Entries()) >= 2
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
|
||||
entries = sink.Entries()
|
||||
entry = entries[1]
|
||||
require.Len(t, entries, 2)
|
||||
require.Equal(t, slog.LevelInfo, entry.Level)
|
||||
require.Equal(t, "boundary_request", entry.Message)
|
||||
require.Equal(t, "deny", getField(entry.Fields, "decision"))
|
||||
require.Equal(t, workspaceID.String(), getField(entry.Fields, "workspace_id"))
|
||||
require.Equal(t, templateID.String(), getField(entry.Fields, "template_id"))
|
||||
require.Equal(t, templateVersionID.String(), getField(entry.Fields, "template_version_id"))
|
||||
require.Equal(t, "POST", getField(entry.Fields, "http_method"))
|
||||
require.Equal(t, "https://blocked.com/denied", getField(entry.Fields, "http_url"))
|
||||
require.Equal(t, nil, getField(entry.Fields, "matched_rule"))
|
||||
|
||||
cancel()
|
||||
<-forwarderDone
|
||||
}
|
||||
|
||||
@@ -43,22 +43,36 @@ func (a *BoundaryLogsAPI) ReportBoundaryLogs(ctx context.Context, req *agentprot
|
||||
return nil, xerrors.Errorf("batch size %d exceeds maximum of %d", len(req.Logs), maxBoundaryLogsPerBatch)
|
||||
}
|
||||
|
||||
sessionID, err := uuid.Parse(req.GetSessionId())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("parse session_id: %w", err)
|
||||
}
|
||||
|
||||
now := dbtime.Now()
|
||||
|
||||
// Lazy-create the boundary session on first log arrival.
|
||||
// If this fails (transient DB error), we continue so that
|
||||
// logs are still persisted. The session will be created on
|
||||
// a subsequent batch since every request carries the session
|
||||
// details.
|
||||
if sessionErr := a.ensureSession(ctx, sessionID, req.GetConfinedProcessName(), now); sessionErr != nil {
|
||||
a.Log.Error(ctx, "failed to ensure boundary session",
|
||||
slog.F("session_id", sessionID.String()),
|
||||
slog.Error(sessionErr))
|
||||
// Parse session_id if present. Old boundary clients may not send it,
|
||||
// so a missing or invalid session_id disables DB persistence but
|
||||
// structured logging and usage tracking still run.
|
||||
var sessionID uuid.UUID
|
||||
persistEnabled := false
|
||||
if raw := req.GetSessionId(); raw != "" {
|
||||
parsed, parseErr := uuid.Parse(raw)
|
||||
if parseErr != nil {
|
||||
a.Log.Warn(ctx, "invalid session_id, persistence disabled for this batch",
|
||||
slog.F("raw_session_id", raw),
|
||||
slog.Error(parseErr))
|
||||
} else {
|
||||
sessionID = parsed
|
||||
persistEnabled = true
|
||||
}
|
||||
}
|
||||
|
||||
if persistEnabled {
|
||||
// Lazy-create the boundary session on first log arrival.
|
||||
// If this fails (transient DB error), we continue so that
|
||||
// logs are still persisted. The session will be created on
|
||||
// a subsequent batch since every request carries the session
|
||||
// details.
|
||||
if sessionErr := a.ensureSession(ctx, sessionID, req.GetConfinedProcessName(), now); sessionErr != nil {
|
||||
a.Log.Error(ctx, "failed to ensure boundary session",
|
||||
slog.F("session_id", sessionID.String()),
|
||||
slog.Error(sessionErr))
|
||||
}
|
||||
}
|
||||
|
||||
// Collect batch insert params while iterating.
|
||||
@@ -128,7 +142,7 @@ func (a *BoundaryLogsAPI) ReportBoundaryLogs(ctx context.Context, req *agentprot
|
||||
}
|
||||
|
||||
// Batch-insert all collected logs in a single query.
|
||||
if len(batch.ID) > 0 {
|
||||
if persistEnabled && len(batch.ID) > 0 {
|
||||
if insertErr := a.insertLogs(ctx, batch); insertErr != nil {
|
||||
a.Log.Error(ctx, "failed to insert boundary logs",
|
||||
slog.F("session_id", sessionID.String()),
|
||||
|
||||
@@ -332,13 +332,13 @@ func TestReportBoundaryLogs(t *testing.T) {
|
||||
require.Len(t, logs, 2, "logs from both batches must be persisted")
|
||||
})
|
||||
|
||||
t.Run("MissingSessionIDReturnsError", func(t *testing.T) {
|
||||
t.Run("MissingSessionIDFallsBackToLogOnly", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Given: a request with no session_id.
|
||||
// Given: a request with no session_id (old boundary client).
|
||||
api := &agentapi.BoundaryLogsAPI{
|
||||
Log: testutil.Logger(t),
|
||||
// Database intentionally nil; the error must fire before any DB call.
|
||||
// Database intentionally nil; persistence is skipped.
|
||||
}
|
||||
|
||||
// When: boundary logs are reported without a session_id.
|
||||
@@ -357,9 +357,9 @@ func TestReportBoundaryLogs(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
// Then: an error is returned.
|
||||
require.Error(t, err)
|
||||
require.Nil(t, resp)
|
||||
// Then: the request succeeds (log-only mode), no error.
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
})
|
||||
|
||||
t.Run("EmptyHTTPRequestSkipped", func(t *testing.T) {
|
||||
@@ -397,13 +397,13 @@ func TestReportBoundaryLogs(t *testing.T) {
|
||||
require.Empty(t, logs, "nil HttpRequest must not produce a log row")
|
||||
})
|
||||
|
||||
t.Run("InvalidSessionIDReturnsError", func(t *testing.T) {
|
||||
t.Run("InvalidSessionIDFallsBackToLogOnly", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Given: a request with a session_id that is not a valid UUID.
|
||||
api := &agentapi.BoundaryLogsAPI{
|
||||
Log: testutil.Logger(t),
|
||||
// Database intentionally nil; the error must fire before any DB call.
|
||||
// Database intentionally nil; persistence is skipped.
|
||||
}
|
||||
|
||||
// When: boundary logs are reported with an invalid session_id.
|
||||
@@ -423,9 +423,9 @@ func TestReportBoundaryLogs(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
// Then: an error is returned.
|
||||
require.Error(t, err)
|
||||
require.Nil(t, resp)
|
||||
// Then: the request succeeds (log-only mode), no error.
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
})
|
||||
|
||||
t.Run("PersistsLogsAndTracksBoundaryUsage", func(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user