refactor(enterprise): inject replica manager and keep nats separate from sync

This commit is contained in:
Jon Ayers
2026-05-29 20:30:10 +00:00
parent 01db03f9f0
commit 283aad2c83
5 changed files with 87 additions and 33 deletions
+7 -21
View File
@@ -7,7 +7,6 @@ import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"database/sql"
@@ -98,7 +97,6 @@ import (
"github.com/coder/coder/v2/coderd/workspaceapps/appurl"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/coderd/wsbuilder"
natspubsub "github.com/coder/coder/v2/coderd/x/nats"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/drpcsdk"
"github.com/coder/coder/v2/cryptorand"
@@ -779,25 +777,13 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
}
options.Database = database.New(sqlDB)
experiments := coderd.ReadExperiments(options.Logger, options.DeploymentValues.Experiments.Value())
if experiments.Enabled(codersdk.ExperimentNATSPubsub) {
token := fmt.Sprintf("%x", sha256.Sum256([]byte(dbURL)))
ps, err := natspubsub.New(ctx, logger.Named("pubsub"), natspubsub.Options{
ClusterAuthToken: token,
})
if err != nil {
return xerrors.Errorf("create nats pubsub: %w", err)
}
options.Pubsub = ps
} else {
ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
if err != nil {
return xerrors.Errorf("create pubsub: %w", err)
}
options.Pubsub = ps
if options.DeploymentValues.Prometheus.Enable {
options.PrometheusRegistry.MustRegister(ps)
}
ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
if err != nil {
return xerrors.Errorf("create pubsub: %w", err)
}
options.Pubsub = ps
if options.DeploymentValues.Prometheus.Enable {
options.PrometheusRegistry.MustRegister(ps)
}
defer options.Pubsub.Close()
+41
View File
@@ -4,9 +4,11 @@ package cli
import (
"context"
"crypto/sha256"
"database/sql"
"encoding/base64"
"errors"
"fmt"
"io"
"net/url"
"time"
@@ -17,6 +19,8 @@ import (
agplcoderd "github.com/coder/coder/v2/coderd"
"github.com/coder/coder/v2/coderd/database"
natspubsub "github.com/coder/coder/v2/coderd/x/nats"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/enterprise/audit"
"github.com/coder/coder/v2/enterprise/audit/backends"
@@ -24,6 +28,7 @@ import (
"github.com/coder/coder/v2/enterprise/coderd/dormancy"
"github.com/coder/coder/v2/enterprise/coderd/usage"
"github.com/coder/coder/v2/enterprise/dbcrypt"
"github.com/coder/coder/v2/enterprise/replicasync"
"github.com/coder/coder/v2/enterprise/trialer"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/quartz"
@@ -128,11 +133,47 @@ func (r *RootCmd) Server(_ func()) *serpent.Command {
closers := &multiCloser{}
meshTLSConfig, err := replicasync.CreateDERPMeshTLSConfig(options.AccessURL.Hostname(), options.TLSCertificates)
if err != nil {
return nil, nil, xerrors.Errorf("create DERP mesh TLS config: %w", err)
}
replicaManager, err := replicasync.New(ctx, options.Logger, options.Database, options.Pubsub, &replicasync.Options{
RelayAddress: options.DeploymentValues.DERP.Server.RelayURL.String(),
// #nosec G115 - DERP region IDs are small and fit in int32
RegionID: int32(options.DeploymentValues.DERP.Server.RegionID.Value()),
TLSConfig: meshTLSConfig,
UpdateInterval: o.ReplicaSyncUpdateInterval,
})
if err != nil {
return nil, nil, xerrors.Errorf("initialize replica: %w", err)
}
o.ReplicaManager = replicaManager
replicaManagerOwnedByAPI := false
defer func() {
if !replicaManagerOwnedByAPI {
_ = replicaManager.Close()
}
}()
if agplcoderd.ReadExperiments(options.Logger, options.DeploymentValues.Experiments.Value()).Enabled(codersdk.ExperimentNATSPubsub) {
token := fmt.Sprintf("%x", sha256.Sum256([]byte(options.DeploymentValues.PostgresURL.String())))
natsPubsub, err := natspubsub.New(ctx, options.Logger.Named("pubsub"), natspubsub.Options{
ClusterAuthToken: token,
})
if err != nil {
return nil, nil, xerrors.Errorf("create nats pubsub: %w", err)
}
options.Pubsub = natsPubsub
closers.Add(natsPubsub)
}
// Create the enterprise API.
api, err := coderd.New(ctx, o)
if err != nil {
_ = closers.Close()
return nil, nil, err
}
replicaManagerOwnedByAPI = true
closers.Add(api)
// Start the enterprise usage publisher routine. This won't do anything
+7 -10
View File
@@ -689,17 +689,10 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
// We always want to run the replica manager even if we don't have DERP
// enabled, since it's used to detect other coder servers for licensing.
api.replicaManager, err = replicasync.New(ctx, options.Logger, options.Database, options.Pubsub, &replicasync.Options{
ID: api.AGPL.ID,
RelayAddress: options.DERPServerRelayAddress,
// #nosec G115 - DERP region IDs are small and fit in int32
RegionID: int32(options.DERPServerRegionID),
TLSConfig: meshTLSConfig,
UpdateInterval: options.ReplicaSyncUpdateInterval,
})
if err != nil {
return nil, xerrors.Errorf("initialize replica: %w", err)
if options.ReplicaManager == nil {
return nil, xerrors.New("replica manager is required")
}
api.replicaManager = options.ReplicaManager
replicaManagerPtr.Store(api.replicaManager)
if api.AGPL.Experiments.Enabled(codersdk.ExperimentNATSPubsub) {
if natsPubsub, ok := api.Pubsub.(*natspubsub.Pubsub); ok {
@@ -798,6 +791,10 @@ type Options struct {
ExternalTokenEncryption []dbcrypt.Cipher
// ReplicaManager detects and syncs multiple Coder replicas. When provided,
// the API owns and closes it.
ReplicaManager *replicasync.Manager
// Used for high availability.
ReplicaSyncUpdateInterval time.Duration
ReplicaErrorGracePeriod time.Duration
+11 -2
View File
@@ -629,7 +629,7 @@ func TestMultiReplica_NATSPubsubPeers(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitLong)
db, _ := dbtestutil.NewDB(t)
db, pgPubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
clusterToken := "shared-token"
@@ -641,11 +641,20 @@ func TestMultiReplica_NATSPubsubPeers(t *testing.T) {
})
require.NoError(t, err)
t.Cleanup(func() { _ = natsA.Close() })
replicaA, err := replicasync.New(ctx, logger.Named("replica-a"), db, pgPubsub, &replicasync.Options{
ID: uuid.New(),
RelayAddress: fmt.Sprintf("nats://127.0.0.1:%d", portA),
RegionID: 12344,
UpdateInterval: testutil.IntervalFast,
})
require.NoError(t, err)
t.Cleanup(func() { _ = replicaA.Close() })
dv := coderdtest.DeploymentValues(t)
dv.Experiments = []string{string(codersdk.ExperimentNATSPubsub)}
coderdenttest.NewWithAPI(t, &coderdenttest.Options{
EntitlementsUpdateInterval: 25 * time.Millisecond,
ReplicaManager: replicaA,
ReplicaSyncUpdateInterval: 25 * time.Millisecond,
Options: &coderdtest.Options{
Logger: &logger,
@@ -664,7 +673,7 @@ func TestMultiReplica_NATSPubsubPeers(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { _ = natsB.Close() })
mgr, err := replicasync.New(ctx, logger.Named("replica-b"), db, natsB, &replicasync.Options{
mgr, err := replicasync.New(ctx, logger.Named("replica-b"), db, pgPubsub, &replicasync.Options{
ID: uuid.New(),
RelayAddress: fmt.Sprintf("nats://127.0.0.1:%d", portB),
RegionID: 12345,
@@ -31,6 +31,7 @@ import (
"github.com/coder/coder/v2/enterprise/coderd/license"
entprebuilds "github.com/coder/coder/v2/enterprise/coderd/prebuilds"
"github.com/coder/coder/v2/enterprise/dbcrypt"
"github.com/coder/coder/v2/enterprise/replicasync"
"github.com/coder/coder/v2/provisioner/echo"
"github.com/coder/coder/v2/provisioner/terraform"
"github.com/coder/coder/v2/provisionerd"
@@ -73,6 +74,7 @@ type Options struct {
LicenseOptions *LicenseOptions
DontAddLicense bool
DontAddFirstUser bool
ReplicaManager *replicasync.Manager
ReplicaSyncUpdateInterval time.Duration
ReplicaErrorGracePeriod time.Duration
ExternalTokenEncryption []dbcrypt.Cipher
@@ -103,6 +105,23 @@ func NewWithAPI(t *testing.T, options *Options) (
}
require.False(t, options.DontAddFirstUser && !options.DontAddLicense, "DontAddFirstUser requires DontAddLicense")
setHandler, cancelFunc, serverURL, oop := coderdtest.NewOptions(t, options.Options)
replicaManager := options.ReplicaManager
if replicaManager == nil {
var err error
replicaManager, err = replicasync.New(context.Background(), oop.Logger, oop.Database, oop.Pubsub, &replicasync.Options{
RelayAddress: serverURL.String(),
// #nosec G115 - DERP region IDs are small and fit in int32.
RegionID: int32(oop.DeploymentValues.DERP.Server.RegionID.Value()),
UpdateInterval: options.ReplicaSyncUpdateInterval,
})
require.NoError(t, err)
}
replicaManagerOwnedByAPI := false
defer func() {
if !replicaManagerOwnedByAPI {
_ = replicaManager.Close()
}
}()
coderAPI, err := coderd.New(context.Background(), &coderd.Options{
RBAC: true,
ConnectionLogging: options.ConnectionLogging,
@@ -112,6 +131,7 @@ func NewWithAPI(t *testing.T, options *Options) (
UseLegacySCIM: options.UseLegacySCIM,
DERPServerRelayAddress: serverURL.String(),
DERPServerRegionID: int(oop.DeploymentValues.DERP.Server.RegionID.Value()),
ReplicaManager: replicaManager,
ReplicaSyncUpdateInterval: options.ReplicaSyncUpdateInterval,
ReplicaErrorGracePeriod: options.ReplicaErrorGracePeriod,
Options: oop,
@@ -123,6 +143,7 @@ func NewWithAPI(t *testing.T, options *Options) (
ExternalTokenEncryption: options.ExternalTokenEncryption,
})
require.NoError(t, err)
replicaManagerOwnedByAPI = true
setHandler(coderAPI.AGPL.RootHandler)
var provisionerCloser io.Closer = nopcloser{}
if options.IncludeProvisionerDaemon {