mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
feat: purge old provisioner daemons (#10949)
This commit is contained in:
@@ -791,6 +791,13 @@ func (q *querier) DeleteLicense(ctx context.Context, id int32) (int32, error) {
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (q *querier) DeleteOldProvisionerDaemons(ctx context.Context) error {
|
||||
if err := q.authorizeContext(ctx, rbac.ActionDelete, rbac.ResourceSystem); err != nil {
|
||||
return err
|
||||
}
|
||||
return q.db.DeleteOldProvisionerDaemons(ctx)
|
||||
}
|
||||
|
||||
func (q *querier) DeleteOldWorkspaceAgentLogs(ctx context.Context) error {
|
||||
if err := q.authorizeContext(ctx, rbac.ActionDelete, rbac.ResourceSystem); err != nil {
|
||||
return err
|
||||
|
||||
@@ -1112,8 +1112,27 @@ func (q *FakeQuerier) DeleteLicense(_ context.Context, id int32) (int32, error)
|
||||
return 0, sql.ErrNoRows
|
||||
}
|
||||
|
||||
func (q *FakeQuerier) DeleteOldProvisionerDaemons(_ context.Context) error {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
|
||||
now := dbtime.Now()
|
||||
weekInterval := 7 * 24 * time.Hour
|
||||
weekAgo := now.Add(-weekInterval)
|
||||
|
||||
var validDaemons []database.ProvisionerDaemon
|
||||
for _, p := range q.provisionerDaemons {
|
||||
if (p.CreatedAt.Before(weekAgo) && !p.UpdatedAt.Valid) || (p.UpdatedAt.Valid && p.UpdatedAt.Time.Before(weekAgo)) {
|
||||
continue
|
||||
}
|
||||
validDaemons = append(validDaemons, p)
|
||||
}
|
||||
q.provisionerDaemons = validDaemons
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*FakeQuerier) DeleteOldWorkspaceAgentLogs(_ context.Context) error {
|
||||
// noop
|
||||
// no-op
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -4845,6 +4864,7 @@ func (q *FakeQuerier) InsertProvisionerDaemon(_ context.Context, arg database.In
|
||||
Name: arg.Name,
|
||||
Provisioners: arg.Provisioners,
|
||||
Tags: arg.Tags,
|
||||
UpdatedAt: arg.UpdatedAt,
|
||||
}
|
||||
q.provisionerDaemons = append(q.provisionerDaemons, daemon)
|
||||
return daemon, nil
|
||||
|
||||
@@ -211,6 +211,13 @@ func (m metricsStore) DeleteLicense(ctx context.Context, id int32) (int32, error
|
||||
return licenseID, err
|
||||
}
|
||||
|
||||
func (m metricsStore) DeleteOldProvisionerDaemons(ctx context.Context) error {
|
||||
start := time.Now()
|
||||
r0 := m.s.DeleteOldProvisionerDaemons(ctx)
|
||||
m.queryLatencies.WithLabelValues("DeleteOldProvisionerDaemons").Observe(time.Since(start).Seconds())
|
||||
return r0
|
||||
}
|
||||
|
||||
func (m metricsStore) DeleteOldWorkspaceAgentLogs(ctx context.Context) error {
|
||||
start := time.Now()
|
||||
r0 := m.s.DeleteOldWorkspaceAgentLogs(ctx)
|
||||
|
||||
@@ -309,6 +309,20 @@ func (mr *MockStoreMockRecorder) DeleteLicense(arg0, arg1 interface{}) *gomock.C
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteLicense", reflect.TypeOf((*MockStore)(nil).DeleteLicense), arg0, arg1)
|
||||
}
|
||||
|
||||
// DeleteOldProvisionerDaemons mocks base method.
|
||||
func (m *MockStore) DeleteOldProvisionerDaemons(arg0 context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "DeleteOldProvisionerDaemons", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// DeleteOldProvisionerDaemons indicates an expected call of DeleteOldProvisionerDaemons.
|
||||
func (mr *MockStoreMockRecorder) DeleteOldProvisionerDaemons(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteOldProvisionerDaemons", reflect.TypeOf((*MockStore)(nil).DeleteOldProvisionerDaemons), arg0)
|
||||
}
|
||||
|
||||
// DeleteOldWorkspaceAgentLogs mocks base method.
|
||||
func (m *MockStore) DeleteOldWorkspaceAgentLogs(arg0 context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
|
||||
@@ -9,12 +9,13 @@ import (
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"cdr.dev/slog"
|
||||
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbauthz"
|
||||
)
|
||||
|
||||
const (
|
||||
delay = 24 * time.Hour
|
||||
delay = 10 * time.Minute
|
||||
)
|
||||
|
||||
// New creates a new periodically purging database instance.
|
||||
@@ -23,37 +24,47 @@ const (
|
||||
// This is for cleaning up old, unused resources from the database that take up space.
|
||||
func New(ctx context.Context, logger slog.Logger, db database.Store) io.Closer {
|
||||
closed := make(chan struct{})
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(ctx)
|
||||
//nolint:gocritic // The system purges old db records without user input.
|
||||
ctx = dbauthz.AsSystemRestricted(ctx)
|
||||
|
||||
// Use time.Nanosecond to force an initial tick. It will be reset to the
|
||||
// correct duration after executing once.
|
||||
ticker := time.NewTicker(time.Nanosecond)
|
||||
doTick := func() {
|
||||
defer ticker.Reset(delay)
|
||||
|
||||
var eg errgroup.Group
|
||||
eg.Go(func() error {
|
||||
return db.DeleteOldWorkspaceAgentLogs(ctx)
|
||||
})
|
||||
eg.Go(func() error {
|
||||
return db.DeleteOldWorkspaceAgentStats(ctx)
|
||||
})
|
||||
eg.Go(func() error {
|
||||
return db.DeleteOldProvisionerDaemons(ctx)
|
||||
})
|
||||
err := eg.Wait()
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
logger.Error(ctx, "failed to purge old database entries", slog.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(closed)
|
||||
|
||||
ticker := time.NewTicker(delay)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
ticker.Stop()
|
||||
doTick()
|
||||
}
|
||||
|
||||
var eg errgroup.Group
|
||||
eg.Go(func() error {
|
||||
return db.DeleteOldWorkspaceAgentLogs(ctx)
|
||||
})
|
||||
eg.Go(func() error {
|
||||
return db.DeleteOldWorkspaceAgentStats(ctx)
|
||||
})
|
||||
err := eg.Wait()
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
logger.Error(ctx, "failed to purge old database entries", slog.Error(err))
|
||||
}
|
||||
|
||||
ticker.Reset(delay)
|
||||
}
|
||||
}()
|
||||
return &instance{
|
||||
|
||||
@@ -2,15 +2,23 @@ package dbpurge_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.uber.org/goleak"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/goleak"
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"cdr.dev/slog/sloggers/slogtest"
|
||||
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbmem"
|
||||
"github.com/coder/coder/v2/coderd/database/dbpurge"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtestutil"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
@@ -24,3 +32,72 @@ func TestPurge(t *testing.T) {
|
||||
err := purger.Close()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestDeleteOldProvisionerDaemons(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db, _ := dbtestutil.NewDB(t)
|
||||
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
|
||||
defer cancel()
|
||||
|
||||
now := dbtime.Now()
|
||||
|
||||
// given
|
||||
_, err := db.InsertProvisionerDaemon(ctx, database.InsertProvisionerDaemonParams{
|
||||
// Provisioner daemon created 14 days ago, and checked in just before 7 days deadline.
|
||||
ID: uuid.New(),
|
||||
Name: "external-0",
|
||||
Provisioners: []database.ProvisionerType{"echo"},
|
||||
CreatedAt: now.Add(-14 * 24 * time.Hour),
|
||||
UpdatedAt: sql.NullTime{Valid: true, Time: now.Add(-7 * 24 * time.Hour).Add(time.Minute)},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
_, err = db.InsertProvisionerDaemon(ctx, database.InsertProvisionerDaemonParams{
|
||||
// Provisioner daemon created 8 days ago, and checked in last time an hour after creation.
|
||||
ID: uuid.New(),
|
||||
Name: "external-1",
|
||||
Provisioners: []database.ProvisionerType{"echo"},
|
||||
CreatedAt: now.Add(-8 * 24 * time.Hour),
|
||||
UpdatedAt: sql.NullTime{Valid: true, Time: now.Add(-8 * 24 * time.Hour).Add(time.Hour)},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
_, err = db.InsertProvisionerDaemon(ctx, database.InsertProvisionerDaemonParams{
|
||||
// Provisioner daemon created 9 days ago, and never checked in.
|
||||
ID: uuid.New(),
|
||||
Name: "external-2",
|
||||
Provisioners: []database.ProvisionerType{"echo"},
|
||||
CreatedAt: now.Add(-9 * 24 * time.Hour),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
_, err = db.InsertProvisionerDaemon(ctx, database.InsertProvisionerDaemonParams{
|
||||
// Provisioner daemon created 6 days ago, and never checked in.
|
||||
ID: uuid.New(),
|
||||
Name: "external-3",
|
||||
Provisioners: []database.ProvisionerType{"echo"},
|
||||
CreatedAt: now.Add(-6 * 24 * time.Hour),
|
||||
UpdatedAt: sql.NullTime{Valid: true, Time: now.Add(-6 * 24 * time.Hour)},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// when
|
||||
closer := dbpurge.New(ctx, logger, db)
|
||||
defer closer.Close()
|
||||
|
||||
// then
|
||||
require.Eventually(t, func() bool {
|
||||
daemons, err := db.GetProvisionerDaemons(ctx)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return contains(daemons, "external-0") &&
|
||||
contains(daemons, "external-3")
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
}
|
||||
|
||||
func contains(daemons []database.ProvisionerDaemon, name string) bool {
|
||||
return slices.ContainsFunc(daemons, func(d database.ProvisionerDaemon) bool {
|
||||
return d.Name == name
|
||||
})
|
||||
}
|
||||
|
||||
@@ -56,6 +56,11 @@ type sqlcQuerier interface {
|
||||
DeleteGroupMemberFromGroup(ctx context.Context, arg DeleteGroupMemberFromGroupParams) error
|
||||
DeleteGroupMembersByOrgAndUser(ctx context.Context, arg DeleteGroupMembersByOrgAndUserParams) error
|
||||
DeleteLicense(ctx context.Context, id int32) (int32, error)
|
||||
// Delete provisioner daemons that have been created at least a week ago
|
||||
// and have not connected to coderd since a week.
|
||||
// A provisioner daemon with "zeroed" updated_at column indicates possible
|
||||
// connectivity issues (no provisioner daemon activity since registration).
|
||||
DeleteOldProvisionerDaemons(ctx context.Context) error
|
||||
// If an agent hasn't connected in the last 7 days, we purge it's logs.
|
||||
// Logs can take up a lot of space, so it's important we clean up frequently.
|
||||
DeleteOldWorkspaceAgentLogs(ctx context.Context) error
|
||||
|
||||
@@ -2986,6 +2986,22 @@ func (q *sqlQuerier) GetParameterSchemasByJobID(ctx context.Context, jobID uuid.
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const deleteOldProvisionerDaemons = `-- name: DeleteOldProvisionerDaemons :exec
|
||||
DELETE FROM provisioner_daemons WHERE (
|
||||
(created_at < (NOW() - INTERVAL '7 days') AND updated_at IS NULL) OR
|
||||
(updated_at IS NOT NULL AND updated_at < (NOW() - INTERVAL '7 days'))
|
||||
)
|
||||
`
|
||||
|
||||
// Delete provisioner daemons that have been created at least a week ago
|
||||
// and have not connected to coderd since a week.
|
||||
// A provisioner daemon with "zeroed" updated_at column indicates possible
|
||||
// connectivity issues (no provisioner daemon activity since registration).
|
||||
func (q *sqlQuerier) DeleteOldProvisionerDaemons(ctx context.Context) error {
|
||||
_, err := q.db.ExecContext(ctx, deleteOldProvisionerDaemons)
|
||||
return err
|
||||
}
|
||||
|
||||
const getProvisionerDaemons = `-- name: GetProvisionerDaemons :many
|
||||
SELECT
|
||||
id, created_at, updated_at, name, provisioners, replica_id, tags
|
||||
@@ -3031,10 +3047,11 @@ INSERT INTO
|
||||
created_at,
|
||||
"name",
|
||||
provisioners,
|
||||
tags
|
||||
tags,
|
||||
updated_at
|
||||
)
|
||||
VALUES
|
||||
($1, $2, $3, $4, $5) RETURNING id, created_at, updated_at, name, provisioners, replica_id, tags
|
||||
($1, $2, $3, $4, $5, $6) RETURNING id, created_at, updated_at, name, provisioners, replica_id, tags
|
||||
`
|
||||
|
||||
type InsertProvisionerDaemonParams struct {
|
||||
@@ -3043,6 +3060,7 @@ type InsertProvisionerDaemonParams struct {
|
||||
Name string `db:"name" json:"name"`
|
||||
Provisioners []ProvisionerType `db:"provisioners" json:"provisioners"`
|
||||
Tags StringMap `db:"tags" json:"tags"`
|
||||
UpdatedAt sql.NullTime `db:"updated_at" json:"updated_at"`
|
||||
}
|
||||
|
||||
func (q *sqlQuerier) InsertProvisionerDaemon(ctx context.Context, arg InsertProvisionerDaemonParams) (ProvisionerDaemon, error) {
|
||||
@@ -3052,6 +3070,7 @@ func (q *sqlQuerier) InsertProvisionerDaemon(ctx context.Context, arg InsertProv
|
||||
arg.Name,
|
||||
pq.Array(arg.Provisioners),
|
||||
arg.Tags,
|
||||
arg.UpdatedAt,
|
||||
)
|
||||
var i ProvisionerDaemon
|
||||
err := row.Scan(
|
||||
|
||||
@@ -11,7 +11,18 @@ INSERT INTO
|
||||
created_at,
|
||||
"name",
|
||||
provisioners,
|
||||
tags
|
||||
tags,
|
||||
updated_at
|
||||
)
|
||||
VALUES
|
||||
($1, $2, $3, $4, $5) RETURNING *;
|
||||
($1, $2, $3, $4, $5, $6) RETURNING *;
|
||||
|
||||
-- name: DeleteOldProvisionerDaemons :exec
|
||||
-- Delete provisioner daemons that have been created at least a week ago
|
||||
-- and have not connected to coderd since a week.
|
||||
-- A provisioner daemon with "zeroed" updated_at column indicates possible
|
||||
-- connectivity issues (no provisioner daemon activity since registration).
|
||||
DELETE FROM provisioner_daemons WHERE (
|
||||
(created_at < (NOW() - INTERVAL '7 days') AND updated_at IS NULL) OR
|
||||
(updated_at IS NOT NULL AND updated_at < (NOW() - INTERVAL '7 days'))
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user