diff --git a/cli/server.go b/cli/server.go index 79846ac140..be3a7fefd2 100644 --- a/cli/server.go +++ b/cli/server.go @@ -7,7 +7,6 @@ import ( "crypto/ecdsa" "crypto/elliptic" "crypto/rand" - "crypto/sha256" "crypto/tls" "crypto/x509" "database/sql" @@ -98,7 +97,6 @@ import ( "github.com/coder/coder/v2/coderd/workspaceapps/appurl" "github.com/coder/coder/v2/coderd/workspacestats" "github.com/coder/coder/v2/coderd/wsbuilder" - natspubsub "github.com/coder/coder/v2/coderd/x/nats" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/drpcsdk" "github.com/coder/coder/v2/cryptorand" @@ -779,25 +777,13 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. } options.Database = database.New(sqlDB) - experiments := coderd.ReadExperiments(options.Logger, options.DeploymentValues.Experiments.Value()) - if experiments.Enabled(codersdk.ExperimentNATSPubsub) { - token := fmt.Sprintf("%x", sha256.Sum256([]byte(dbURL))) - ps, err := natspubsub.New(ctx, logger.Named("pubsub"), natspubsub.Options{ - ClusterAuthToken: token, - }) - if err != nil { - return xerrors.Errorf("create nats pubsub: %w", err) - } - options.Pubsub = ps - } else { - ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL) - if err != nil { - return xerrors.Errorf("create pubsub: %w", err) - } - options.Pubsub = ps - if options.DeploymentValues.Prometheus.Enable { - options.PrometheusRegistry.MustRegister(ps) - } + ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL) + if err != nil { + return xerrors.Errorf("create pubsub: %w", err) + } + options.Pubsub = ps + if options.DeploymentValues.Prometheus.Enable { + options.PrometheusRegistry.MustRegister(ps) } defer options.Pubsub.Close() diff --git a/enterprise/cli/server.go b/enterprise/cli/server.go index 37febd028b..ef739a6f24 100644 --- a/enterprise/cli/server.go +++ b/enterprise/cli/server.go @@ -4,9 +4,11 @@ package cli import ( "context" + "crypto/sha256" "database/sql" "encoding/base64" "errors" + "fmt" "io" "net/url" "time" @@ -17,6 +19,8 @@ import ( agplcoderd "github.com/coder/coder/v2/coderd" "github.com/coder/coder/v2/coderd/database" + natspubsub "github.com/coder/coder/v2/coderd/x/nats" + "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/enterprise/audit" "github.com/coder/coder/v2/enterprise/audit/backends" @@ -24,6 +28,7 @@ import ( "github.com/coder/coder/v2/enterprise/coderd/dormancy" "github.com/coder/coder/v2/enterprise/coderd/usage" "github.com/coder/coder/v2/enterprise/dbcrypt" + "github.com/coder/coder/v2/enterprise/replicasync" "github.com/coder/coder/v2/enterprise/trialer" "github.com/coder/coder/v2/tailnet" "github.com/coder/quartz" @@ -128,11 +133,47 @@ func (r *RootCmd) Server(_ func()) *serpent.Command { closers := &multiCloser{} + meshTLSConfig, err := replicasync.CreateDERPMeshTLSConfig(options.AccessURL.Hostname(), options.TLSCertificates) + if err != nil { + return nil, nil, xerrors.Errorf("create DERP mesh TLS config: %w", err) + } + replicaManager, err := replicasync.New(ctx, options.Logger, options.Database, options.Pubsub, &replicasync.Options{ + RelayAddress: options.DeploymentValues.DERP.Server.RelayURL.String(), + // #nosec G115 - DERP region IDs are small and fit in int32 + RegionID: int32(options.DeploymentValues.DERP.Server.RegionID.Value()), + TLSConfig: meshTLSConfig, + UpdateInterval: o.ReplicaSyncUpdateInterval, + }) + if err != nil { + return nil, nil, xerrors.Errorf("initialize replica: %w", err) + } + o.ReplicaManager = replicaManager + replicaManagerOwnedByAPI := false + defer func() { + if !replicaManagerOwnedByAPI { + _ = replicaManager.Close() + } + }() + + if agplcoderd.ReadExperiments(options.Logger, options.DeploymentValues.Experiments.Value()).Enabled(codersdk.ExperimentNATSPubsub) { + token := fmt.Sprintf("%x", sha256.Sum256([]byte(options.DeploymentValues.PostgresURL.String()))) + natsPubsub, err := natspubsub.New(ctx, options.Logger.Named("pubsub"), natspubsub.Options{ + ClusterAuthToken: token, + }) + if err != nil { + return nil, nil, xerrors.Errorf("create nats pubsub: %w", err) + } + options.Pubsub = natsPubsub + closers.Add(natsPubsub) + } + // Create the enterprise API. api, err := coderd.New(ctx, o) if err != nil { + _ = closers.Close() return nil, nil, err } + replicaManagerOwnedByAPI = true closers.Add(api) // Start the enterprise usage publisher routine. This won't do anything diff --git a/enterprise/coderd/coderd.go b/enterprise/coderd/coderd.go index 9ecd11c068..f3bbe20208 100644 --- a/enterprise/coderd/coderd.go +++ b/enterprise/coderd/coderd.go @@ -689,17 +689,10 @@ func New(ctx context.Context, options *Options) (_ *API, err error) { // We always want to run the replica manager even if we don't have DERP // enabled, since it's used to detect other coder servers for licensing. - api.replicaManager, err = replicasync.New(ctx, options.Logger, options.Database, options.Pubsub, &replicasync.Options{ - ID: api.AGPL.ID, - RelayAddress: options.DERPServerRelayAddress, - // #nosec G115 - DERP region IDs are small and fit in int32 - RegionID: int32(options.DERPServerRegionID), - TLSConfig: meshTLSConfig, - UpdateInterval: options.ReplicaSyncUpdateInterval, - }) - if err != nil { - return nil, xerrors.Errorf("initialize replica: %w", err) + if options.ReplicaManager == nil { + return nil, xerrors.New("replica manager is required") } + api.replicaManager = options.ReplicaManager replicaManagerPtr.Store(api.replicaManager) if api.AGPL.Experiments.Enabled(codersdk.ExperimentNATSPubsub) { if natsPubsub, ok := api.Pubsub.(*natspubsub.Pubsub); ok { @@ -798,6 +791,10 @@ type Options struct { ExternalTokenEncryption []dbcrypt.Cipher + // ReplicaManager detects and syncs multiple Coder replicas. When provided, + // the API owns and closes it. + ReplicaManager *replicasync.Manager + // Used for high availability. ReplicaSyncUpdateInterval time.Duration ReplicaErrorGracePeriod time.Duration diff --git a/enterprise/coderd/coderd_test.go b/enterprise/coderd/coderd_test.go index 4e9b7063f7..9d9238c6be 100644 --- a/enterprise/coderd/coderd_test.go +++ b/enterprise/coderd/coderd_test.go @@ -629,7 +629,7 @@ func TestMultiReplica_NATSPubsubPeers(t *testing.T) { t.Parallel() ctx := testutil.Context(t, testutil.WaitLong) - db, _ := dbtestutil.NewDB(t) + db, pgPubsub := dbtestutil.NewDB(t) logger := testutil.Logger(t) clusterToken := "shared-token" @@ -641,11 +641,20 @@ func TestMultiReplica_NATSPubsubPeers(t *testing.T) { }) require.NoError(t, err) t.Cleanup(func() { _ = natsA.Close() }) + replicaA, err := replicasync.New(ctx, logger.Named("replica-a"), db, pgPubsub, &replicasync.Options{ + ID: uuid.New(), + RelayAddress: fmt.Sprintf("nats://127.0.0.1:%d", portA), + RegionID: 12344, + UpdateInterval: testutil.IntervalFast, + }) + require.NoError(t, err) + t.Cleanup(func() { _ = replicaA.Close() }) dv := coderdtest.DeploymentValues(t) dv.Experiments = []string{string(codersdk.ExperimentNATSPubsub)} coderdenttest.NewWithAPI(t, &coderdenttest.Options{ EntitlementsUpdateInterval: 25 * time.Millisecond, + ReplicaManager: replicaA, ReplicaSyncUpdateInterval: 25 * time.Millisecond, Options: &coderdtest.Options{ Logger: &logger, @@ -664,7 +673,7 @@ func TestMultiReplica_NATSPubsubPeers(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { _ = natsB.Close() }) - mgr, err := replicasync.New(ctx, logger.Named("replica-b"), db, natsB, &replicasync.Options{ + mgr, err := replicasync.New(ctx, logger.Named("replica-b"), db, pgPubsub, &replicasync.Options{ ID: uuid.New(), RelayAddress: fmt.Sprintf("nats://127.0.0.1:%d", portB), RegionID: 12345, diff --git a/enterprise/coderd/coderdenttest/coderdenttest.go b/enterprise/coderd/coderdenttest/coderdenttest.go index 1115ba1211..5d45591b68 100644 --- a/enterprise/coderd/coderdenttest/coderdenttest.go +++ b/enterprise/coderd/coderdenttest/coderdenttest.go @@ -31,6 +31,7 @@ import ( "github.com/coder/coder/v2/enterprise/coderd/license" entprebuilds "github.com/coder/coder/v2/enterprise/coderd/prebuilds" "github.com/coder/coder/v2/enterprise/dbcrypt" + "github.com/coder/coder/v2/enterprise/replicasync" "github.com/coder/coder/v2/provisioner/echo" "github.com/coder/coder/v2/provisioner/terraform" "github.com/coder/coder/v2/provisionerd" @@ -73,6 +74,7 @@ type Options struct { LicenseOptions *LicenseOptions DontAddLicense bool DontAddFirstUser bool + ReplicaManager *replicasync.Manager ReplicaSyncUpdateInterval time.Duration ReplicaErrorGracePeriod time.Duration ExternalTokenEncryption []dbcrypt.Cipher @@ -103,6 +105,23 @@ func NewWithAPI(t *testing.T, options *Options) ( } require.False(t, options.DontAddFirstUser && !options.DontAddLicense, "DontAddFirstUser requires DontAddLicense") setHandler, cancelFunc, serverURL, oop := coderdtest.NewOptions(t, options.Options) + replicaManager := options.ReplicaManager + if replicaManager == nil { + var err error + replicaManager, err = replicasync.New(context.Background(), oop.Logger, oop.Database, oop.Pubsub, &replicasync.Options{ + RelayAddress: serverURL.String(), + // #nosec G115 - DERP region IDs are small and fit in int32. + RegionID: int32(oop.DeploymentValues.DERP.Server.RegionID.Value()), + UpdateInterval: options.ReplicaSyncUpdateInterval, + }) + require.NoError(t, err) + } + replicaManagerOwnedByAPI := false + defer func() { + if !replicaManagerOwnedByAPI { + _ = replicaManager.Close() + } + }() coderAPI, err := coderd.New(context.Background(), &coderd.Options{ RBAC: true, ConnectionLogging: options.ConnectionLogging, @@ -112,6 +131,7 @@ func NewWithAPI(t *testing.T, options *Options) ( UseLegacySCIM: options.UseLegacySCIM, DERPServerRelayAddress: serverURL.String(), DERPServerRegionID: int(oop.DeploymentValues.DERP.Server.RegionID.Value()), + ReplicaManager: replicaManager, ReplicaSyncUpdateInterval: options.ReplicaSyncUpdateInterval, ReplicaErrorGracePeriod: options.ReplicaErrorGracePeriod, Options: oop, @@ -123,6 +143,7 @@ func NewWithAPI(t *testing.T, options *Options) ( ExternalTokenEncryption: options.ExternalTokenEncryption, }) require.NoError(t, err) + replicaManagerOwnedByAPI = true setHandler(coderAPI.AGPL.RootHandler) var provisionerCloser io.Closer = nopcloser{} if options.IncludeProvisionerDaemon {