chore!: route connection logs to new table (#18340)

### Breaking Change (changelog note):
> User connections to workspaces, and the opening of workspace apps or ports will no longer create entries in the audit log. Those events will now be included in the 'Connection Log'.
Please see the 'Connection Log' page in the dashboard, and the Connection Log [documentation](https://coder.com/docs/admin/monitoring/connection-logs) for details. Those with permission to view the Audit Log will also be able to view the Connection Log. The new Connection Log has the same licensing restrictions as the Audit Log, and requires a Premium Coder deployment.

### Context

This is the first PR of a few for moving connection events out of the audit log, and into a new database table and web UI page called the 'Connection Log'.

This PR:
- Creates the new table
- Adds and tests queries for inserting and reading, including reading with an RBAC filter.
- Implements the corresponding RBAC changes, such that anyone who can view the audit log can read from the table
- Implements, under the enterprise package, a `ConnectionLogger` abstraction to replace the `Auditor` abstraction for these logs. (No-op'd in AGPL, like the `Auditor`)
- Routes SSH connection and Workspace App events into the new `ConnectionLogger`
- Updates all existing tests to check the values of the `ConnectionLogger` instead of the `Auditor`.

Future PRs:
- Add filtering to the query
- Add an enterprise endpoint to query the new table
- Write a query to delete old events from the audit log, call it from dbpurge.
- Implement a table in the Web UI for viewing connection logs.


> [!NOTE]
> The PRs in this stack obviously won't be (completely) atomic. Whilst they'll each pass CI, the stack is designed to be merged all at once. I'm splitting them up for the sake of those reviewing, and so changes can be reviewed as early as possible.  Despite this, it's really hard to make this PR any smaller than it already is. I'll be keeping it in draft until it's actually ready to merge.
This commit is contained in:
Ethan
2025-07-15 14:36:06 +10:00
committed by GitHub
parent 43b0bb7f61
commit 08e17a07fc
54 changed files with 2199 additions and 493 deletions
+8 -8
View File
@@ -19,7 +19,7 @@ import (
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor"
"github.com/coder/coder/v2/coderd/appearance"
"github.com/coder/coder/v2/coderd/audit"
"github.com/coder/coder/v2/coderd/connectionlog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/externalauth"
@@ -50,7 +50,7 @@ type API struct {
*ResourcesMonitoringAPI
*LogsAPI
*ScriptsAPI
*AuditAPI
*ConnLogAPI
*SubAgentAPI
*tailnet.DRPCService
@@ -71,7 +71,7 @@ type Options struct {
Database database.Store
NotificationsEnqueuer notifications.Enqueuer
Pubsub pubsub.Pubsub
Auditor *atomic.Pointer[audit.Auditor]
ConnectionLogger *atomic.Pointer[connectionlog.ConnectionLogger]
DerpMapFn func() *tailcfg.DERPMap
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
StatsReporter *workspacestats.Reporter
@@ -180,11 +180,11 @@ func New(opts Options) *API {
Database: opts.Database,
}
api.AuditAPI = &AuditAPI{
AgentFn: api.agent,
Auditor: opts.Auditor,
Database: opts.Database,
Log: opts.Log,
api.ConnLogAPI = &ConnLogAPI{
AgentFn: api.agent,
ConnectionLogger: opts.ConnectionLogger,
Database: opts.Database,
Log: opts.Log,
}
api.DRPCService = &tailnet.DRPCService{
-105
View File
@@ -1,105 +0,0 @@
package agentapi
import (
"context"
"encoding/json"
"strconv"
"sync/atomic"
"github.com/google/uuid"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/emptypb"
"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/audit"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/db2sdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
)
type AuditAPI struct {
AgentFn func(context.Context) (database.WorkspaceAgent, error)
Auditor *atomic.Pointer[audit.Auditor]
Database database.Store
Log slog.Logger
}
func (a *AuditAPI) ReportConnection(ctx context.Context, req *agentproto.ReportConnectionRequest) (*emptypb.Empty, error) {
// We will use connection ID as request ID, typically this is the
// SSH session ID as reported by the agent.
connectionID, err := uuid.FromBytes(req.GetConnection().GetId())
if err != nil {
return nil, xerrors.Errorf("connection id from bytes: %w", err)
}
action, err := db2sdk.AuditActionFromAgentProtoConnectionAction(req.GetConnection().GetAction())
if err != nil {
return nil, err
}
connectionType, err := agentsdk.ConnectionTypeFromProto(req.GetConnection().GetType())
if err != nil {
return nil, err
}
// Fetch contextual data for this audit event.
workspaceAgent, err := a.AgentFn(ctx)
if err != nil {
return nil, xerrors.Errorf("get agent: %w", err)
}
workspace, err := a.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
if err != nil {
return nil, xerrors.Errorf("get workspace by agent id: %w", err)
}
build, err := a.Database.GetLatestWorkspaceBuildByWorkspaceID(ctx, workspace.ID)
if err != nil {
return nil, xerrors.Errorf("get latest workspace build by workspace id: %w", err)
}
// We pass the below information to the Auditor so that it
// can form a friendly string for the user to view in the UI.
type additionalFields struct {
audit.AdditionalFields
ConnectionType agentsdk.ConnectionType `json:"connection_type"`
Reason string `json:"reason,omitempty"`
}
resourceInfo := additionalFields{
AdditionalFields: audit.AdditionalFields{
WorkspaceID: workspace.ID,
WorkspaceName: workspace.Name,
WorkspaceOwner: workspace.OwnerUsername,
BuildNumber: strconv.FormatInt(int64(build.BuildNumber), 10),
BuildReason: database.BuildReason(string(build.Reason)),
},
ConnectionType: connectionType,
Reason: req.GetConnection().GetReason(),
}
riBytes, err := json.Marshal(resourceInfo)
if err != nil {
a.Log.Error(ctx, "marshal resource info for agent connection failed", slog.Error(err))
riBytes = []byte("{}")
}
audit.BackgroundAudit(ctx, &audit.BackgroundAuditParams[database.WorkspaceAgent]{
Audit: *a.Auditor.Load(),
Log: a.Log,
Time: req.GetConnection().GetTimestamp().AsTime(),
OrganizationID: workspace.OrganizationID,
RequestID: connectionID,
Action: action,
New: workspaceAgent,
Old: workspaceAgent,
IP: req.GetConnection().GetIp(),
Status: int(req.GetConnection().GetStatusCode()),
AdditionalFields: riBytes,
// It's not possible to tell which user connected. Once we have
// the capability, this may be reported by the agent.
UserID: uuid.Nil,
})
return &emptypb.Empty{}, nil
}
+106
View File
@@ -0,0 +1,106 @@
package agentapi
import (
"context"
"database/sql"
"sync/atomic"
"github.com/google/uuid"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/emptypb"
"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/connectionlog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/db2sdk"
)
type ConnLogAPI struct {
AgentFn func(context.Context) (database.WorkspaceAgent, error)
ConnectionLogger *atomic.Pointer[connectionlog.ConnectionLogger]
Database database.Store
Log slog.Logger
}
func (a *ConnLogAPI) ReportConnection(ctx context.Context, req *agentproto.ReportConnectionRequest) (*emptypb.Empty, error) {
// We use the connection ID to identify which connection log event to mark
// as closed, when we receive a close action for that ID.
connectionID, err := uuid.FromBytes(req.GetConnection().GetId())
if err != nil {
return nil, xerrors.Errorf("connection id from bytes: %w", err)
}
if connectionID == uuid.Nil {
return nil, xerrors.New("connection ID cannot be nil")
}
action, err := db2sdk.ConnectionLogStatusFromAgentProtoConnectionAction(req.GetConnection().GetAction())
if err != nil {
return nil, err
}
connectionType, err := db2sdk.ConnectionLogConnectionTypeFromAgentProtoConnectionType(req.GetConnection().GetType())
if err != nil {
return nil, err
}
var code sql.NullInt32
if action == database.ConnectionStatusDisconnected {
code = sql.NullInt32{
Int32: req.GetConnection().GetStatusCode(),
Valid: true,
}
}
// Fetch contextual data for this connection log event.
workspaceAgent, err := a.AgentFn(ctx)
if err != nil {
return nil, xerrors.Errorf("get agent: %w", err)
}
workspace, err := a.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
if err != nil {
return nil, xerrors.Errorf("get workspace by agent id: %w", err)
}
reason := req.GetConnection().GetReason()
connLogger := *a.ConnectionLogger.Load()
err = connLogger.Upsert(ctx, database.UpsertConnectionLogParams{
ID: uuid.New(),
Time: req.GetConnection().GetTimestamp().AsTime(),
OrganizationID: workspace.OrganizationID,
WorkspaceOwnerID: workspace.OwnerID,
WorkspaceID: workspace.ID,
WorkspaceName: workspace.Name,
AgentName: workspaceAgent.Name,
Type: connectionType,
Code: code,
Ip: database.ParseIP(req.GetConnection().GetIp()),
ConnectionID: uuid.NullUUID{
UUID: connectionID,
Valid: true,
},
DisconnectReason: sql.NullString{
String: reason,
Valid: reason != "",
},
// We supply the action:
// - So the DB can handle duplicate connections or disconnections properly.
// - To make it clear whether this is a connection or disconnection
// prior to it's insertion into the DB (logs)
ConnectionStatus: action,
// It's not possible to tell which user connected. Once we have
// the capability, this may be reported by the agent.
UserID: uuid.NullUUID{
Valid: false,
},
// N/A
UserAgent: sql.NullString{},
// N/A
SlugOrPort: sql.NullString{},
})
if err != nil {
return nil, xerrors.Errorf("export connection log: %w", err)
}
return &emptypb.Empty{}, nil
}
@@ -2,7 +2,7 @@ package agentapi_test
import (
"context"
"encoding/json"
"database/sql"
"net"
"sync/atomic"
"testing"
@@ -16,15 +16,14 @@ import (
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/agentapi"
"github.com/coder/coder/v2/coderd/audit"
"github.com/coder/coder/v2/coderd/connectionlog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/db2sdk"
"github.com/coder/coder/v2/coderd/database/dbmock"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk/agentsdk"
)
func TestAuditReport(t *testing.T) {
func TestConnectionLog(t *testing.T) {
t.Parallel()
var (
@@ -38,10 +37,6 @@ func TestAuditReport(t *testing.T) {
OwnerID: owner.ID,
Name: "cool-workspace",
}
build = database.WorkspaceBuild{
ID: uuid.New(),
WorkspaceID: workspace.ID,
}
agent = database.WorkspaceAgent{
ID: uuid.New(),
}
@@ -62,7 +57,7 @@ func TestAuditReport(t *testing.T) {
id: uuid.New(),
action: agentproto.Connection_CONNECT.Enum(),
typ: agentproto.Connection_SSH.Enum(),
time: time.Now(),
time: dbtime.Now(),
ip: "127.0.0.1",
status: 200,
},
@@ -71,7 +66,7 @@ func TestAuditReport(t *testing.T) {
id: uuid.New(),
action: agentproto.Connection_CONNECT.Enum(),
typ: agentproto.Connection_VSCODE.Enum(),
time: time.Now(),
time: dbtime.Now(),
ip: "8.8.8.8",
},
{
@@ -79,28 +74,28 @@ func TestAuditReport(t *testing.T) {
id: uuid.New(),
action: agentproto.Connection_CONNECT.Enum(),
typ: agentproto.Connection_JETBRAINS.Enum(),
time: time.Now(),
time: dbtime.Now(),
},
{
name: "Reconnecting PTY Connect",
id: uuid.New(),
action: agentproto.Connection_CONNECT.Enum(),
typ: agentproto.Connection_RECONNECTING_PTY.Enum(),
time: time.Now(),
time: dbtime.Now(),
},
{
name: "SSH Disconnect",
id: uuid.New(),
action: agentproto.Connection_DISCONNECT.Enum(),
typ: agentproto.Connection_SSH.Enum(),
time: time.Now(),
time: dbtime.Now(),
},
{
name: "SSH Disconnect",
id: uuid.New(),
action: agentproto.Connection_DISCONNECT.Enum(),
typ: agentproto.Connection_SSH.Enum(),
time: time.Now(),
time: dbtime.Now(),
status: 500,
reason: "because error says so",
},
@@ -110,15 +105,14 @@ func TestAuditReport(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
mAudit := audit.NewMock()
connLogger := connectionlog.NewFake()
mDB := dbmock.NewMockStore(gomock.NewController(t))
mDB.EXPECT().GetWorkspaceByAgentID(gomock.Any(), agent.ID).Return(workspace, nil)
mDB.EXPECT().GetLatestWorkspaceBuildByWorkspaceID(gomock.Any(), workspace.ID).Return(build, nil)
api := &agentapi.AuditAPI{
Auditor: asAtomicPointer[audit.Auditor](mAudit),
Database: mDB,
api := &agentapi.ConnLogAPI{
ConnectionLogger: asAtomicPointer[connectionlog.ConnectionLogger](connLogger),
Database: mDB,
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
@@ -135,41 +129,48 @@ func TestAuditReport(t *testing.T) {
},
})
require.True(t, mAudit.Contains(t, database.AuditLog{
Time: dbtime.Time(tt.time).In(time.UTC),
Action: agentProtoConnectionActionToAudit(t, *tt.action),
OrganizationID: workspace.OrganizationID,
UserID: uuid.Nil,
RequestID: tt.id,
ResourceType: database.ResourceTypeWorkspaceAgent,
ResourceID: agent.ID,
ResourceTarget: agent.Name,
Ip: pqtype.Inet{Valid: true, IPNet: net.IPNet{IP: net.ParseIP(tt.ip), Mask: net.CIDRMask(32, 32)}},
StatusCode: tt.status,
}))
require.True(t, connLogger.Contains(t, database.UpsertConnectionLogParams{
Time: dbtime.Time(tt.time).In(time.UTC),
OrganizationID: workspace.OrganizationID,
WorkspaceOwnerID: workspace.OwnerID,
WorkspaceID: workspace.ID,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
UserID: uuid.NullUUID{
UUID: uuid.Nil,
Valid: false,
},
ConnectionStatus: agentProtoConnectionActionToConnectionLog(t, *tt.action),
// Check some additional fields.
var m map[string]any
err := json.Unmarshal(mAudit.AuditLogs()[0].AdditionalFields, &m)
require.NoError(t, err)
require.Equal(t, string(agentProtoConnectionTypeToSDK(t, *tt.typ)), m["connection_type"].(string))
if tt.reason != "" {
require.Equal(t, tt.reason, m["reason"])
}
Code: sql.NullInt32{
Int32: tt.status,
Valid: *tt.action == agentproto.Connection_DISCONNECT,
},
Ip: pqtype.Inet{Valid: true, IPNet: net.IPNet{IP: net.ParseIP(tt.ip), Mask: net.CIDRMask(32, 32)}},
Type: agentProtoConnectionTypeToConnectionLog(t, *tt.typ),
DisconnectReason: sql.NullString{
String: tt.reason,
Valid: tt.reason != "",
},
ConnectionID: uuid.NullUUID{
UUID: tt.id,
Valid: tt.id != uuid.Nil,
},
}))
})
}
}
func agentProtoConnectionActionToAudit(t *testing.T, action agentproto.Connection_Action) database.AuditAction {
a, err := db2sdk.AuditActionFromAgentProtoConnectionAction(action)
func agentProtoConnectionTypeToConnectionLog(t *testing.T, typ agentproto.Connection_Type) database.ConnectionType {
a, err := db2sdk.ConnectionLogConnectionTypeFromAgentProtoConnectionType(typ)
require.NoError(t, err)
return a
}
func agentProtoConnectionTypeToSDK(t *testing.T, typ agentproto.Connection_Type) agentsdk.ConnectionType {
action, err := agentsdk.ConnectionTypeFromProto(typ)
func agentProtoConnectionActionToConnectionLog(t *testing.T, action agentproto.Connection_Action) database.ConnectionStatus {
a, err := db2sdk.ConnectionLogStatusFromAgentProtoConnectionAction(action)
require.NoError(t, err)
return action
return a
}
func asAtomicPointer[T any](v T) *atomic.Pointer[T] {