Files
coder/enterprise/derpmesh/derpmesh.go
T
Kyle Carberry 2ba4a62a0d feat: Add high availability for multiple replicas (#4555)
* feat: HA tailnet coordinator

* fixup! feat: HA tailnet coordinator

* fixup! feat: HA tailnet coordinator

* remove printlns

* close all connections on coordinator

* implement high availability feature

* fixup! implement high availability feature

* fixup! implement high availability feature

* fixup! implement high availability feature

* fixup! implement high availability feature

* Add replicas

* Add DERP meshing to arbitrary addresses

* Move packages to highavailability folder

* Move coordinator to high availability package

* Add flags for HA

* Rename to replicasync

* Denest packages for replicas

* Add test for multiple replicas

* Fix coordination test

* Add HA to the helm chart

* Rename function pointer

* Add warnings for HA

* Add the ability to block endpoints

* Add flag to disable P2P connections

* Wow, I made the tests pass

* Add replicas endpoint

* Ensure close kills replica

* Update sql

* Add database latency to high availability

* Pipe TLS to DERP mesh

* Fix DERP mesh with TLS

* Add tests for TLS

* Fix replica sync TLS

* Fix RootCA for replica meshing

* Remove ID from replicasync

* Fix getting certificates for meshing

* Remove excessive locking

* Fix linting

* Store mesh key in the database

* Fix replica key for tests

* Fix types gen

* Fix unlocking unlocked

* Fix race in tests

* Update enterprise/derpmesh/derpmesh.go

Co-authored-by: Colin Adler <colin1adler@gmail.com>

* Rename to syncReplicas

* Reuse http client

* Delete old replicas on a CRON

* Fix race condition in connection tests

* Fix linting

* Fix nil type

* Move pubsub to in-memory for twenty test

* Add comment for configuration tweaking

* Fix leak with transport

* Fix close leak in derpmesh

* Fix race when creating server

* Remove handler update

* Skip test on Windows

* Fix DERP mesh test

* Wrap HTTP handler replacement in mutex

* Fix error message for relay

* Fix API handler for normal tests

* Fix speedtest

* Fix replica resend

* Fix derpmesh send

* Ping async

* Increase wait time of template version jobd

* Fix race when closing replica sync

* Add name to client

* Log the derpmap being used

* Don't connect if DERP is empty

* Improve agent coordinator logging

* Fix lock in coordinator

* Fix relay addr

* Fix race when updating durations

* Fix client publish race

* Run pubsub loop in a queue

* Store agent nodes in order

* Fix coordinator locking

* Check for closed pipe

Co-authored-by: Colin Adler <colin1adler@gmail.com>
2022-10-17 13:43:30 +00:00

166 lines
3.9 KiB
Go

package derpmesh
import (
"context"
"crypto/tls"
"net"
"net/url"
"sync"
"golang.org/x/xerrors"
"tailscale.com/derp"
"tailscale.com/derp/derphttp"
"tailscale.com/types/key"
"github.com/coder/coder/tailnet"
"cdr.dev/slog"
)
// New returns a Mesh bound to the given DERP server.
//
// The returned mesh has no active addresses, is not closed, and uses
// context.Background() as its base context. logger, server and tlsConfig are
// stored as-is for use when addresses are added later.
func New(logger slog.Logger, server *derp.Server, tlsConfig *tls.Config) *Mesh {
	mesh := &Mesh{
		ctx:    context.Background(),
		closed: make(chan struct{}),
		active: map[string]context.CancelFunc{},
	}
	mesh.logger = logger
	mesh.server = server
	mesh.tlsConfig = tlsConfig
	return mesh
}
// Mesh tracks the DERP addresses that a local DERP server is meshed with.
// Construct it with New; the zero value has a nil active map and is not usable.
type Mesh struct {
	// Set once by New and only read afterwards.
	logger    slog.Logger
	server    *derp.Server
	tlsConfig *tls.Config
	// ctx is passed to log calls and is the parent of each per-address context.
	ctx context.Context

	// mutex is held whenever closed or active is inspected or modified.
	mutex sync.Mutex
	// closed is closed by Close; isClosed polls it without blocking.
	closed chan struct{}
	// active maps a DERP address to the function that stops meshing with it.
	active map[string]context.CancelFunc
}
// SetAddresses performs a diff of the incoming addresses and adds
// or removes DERP clients from the mesh.
//
// Every entry in addresses is parsed as a URL and then resolved against
// "/derp"; the resulting string is the key under which the address is tracked.
// Entries that fail to parse, or that cannot be added, are logged and skipped,
// so one bad entry does not prevent the others from being processed. Any
// address that is currently tracked but absent from the incoming set is
// removed.
//
// Connect is only used for testing to ensure DERPs are meshed before
// exchanging messages.
// nolint:revive
func (m *Mesh) SetAddresses(addresses []string, connect bool) {
	// desired is the set of normalized addresses that should stay meshed.
	desired := make(map[string]struct{}, len(addresses))
	for _, address := range addresses {
		addressURL, err := url.Parse(address)
		if err != nil {
			// Log the offending address itself so the bad entry can be identified.
			m.logger.Error(m.ctx, "invalid address", slog.F("address", address), slog.Error(err))
			continue
		}
		derpURL, err := addressURL.Parse("/derp")
		if err != nil {
			m.logger.Error(m.ctx, "parse derp", slog.F("address", address), slog.Error(err))
			continue
		}
		derpAddress := derpURL.String()
		// Record the address before attempting to add it, so that a failed add
		// never causes an existing entry for the same address to be removed below.
		desired[derpAddress] = struct{}{}
		added, err := m.addAddress(derpAddress, connect)
		if err != nil {
			m.logger.Error(m.ctx, "failed to add address", slog.F("address", derpAddress), slog.Error(err))
			continue
		}
		if added {
			m.logger.Debug(m.ctx, "added mesh address", slog.F("address", derpAddress))
		}
	}

	// addAddress takes the mutex itself, so it is only acquired here for the
	// removal pass; removeAddress expects its caller to hold it.
	m.mutex.Lock()
	defer m.mutex.Unlock()
	for address := range m.active {
		if _, found := desired[address]; found {
			continue
		}
		if m.removeAddress(address) {
			m.logger.Debug(m.ctx, "removed mesh address", slog.F("address", address))
		}
	}
}
// addAddress starts meshing with address and records it as active.
//
// The address is expected to be a full HTTP address including a path,
// e.g. http://127.0.0.1:8080/derp
//
// It reports false with a nil error when the mesh is already closed or the
// address is already active, and false with an error when the DERP client
// cannot be created. It acquires m.mutex for its whole duration.
// nolint:revive
func (m *Mesh) addAddress(address string, connect bool) (bool, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if m.isClosed() {
		return false, nil
	}
	if _, exists := m.active[address]; exists {
		return false, nil
	}

	derpClient, err := derphttp.NewClient(m.server.PrivateKey(), address, tailnet.Logger(m.logger.Named("client")))
	if err != nil {
		return false, xerrors.Errorf("create derp client: %w", err)
	}
	derpClient.TLSConfig = m.tlsConfig
	derpClient.MeshKey = m.server.MeshKey()
	// Dial with a zero-value net.Dialer.
	derpClient.SetURLDialer(func(ctx context.Context, network, addr string) (net.Conn, error) {
		return (&net.Dialer{}).DialContext(ctx, network, addr)
	})
	if connect {
		// The result of this eager connect is deliberately discarded.
		_ = derpClient.Connect(m.ctx)
	}

	loopCtx, cancel := context.WithCancel(m.ctx)
	done := make(chan struct{})
	// The stored function cancels the watch loop, closes the client and then
	// blocks until the loop goroutine has returned.
	m.active[address] = func() {
		cancel()
		_ = derpClient.Close()
		<-done
	}

	// The watch loop callbacks register or unregister this client as the
	// packet forwarder for the node key they are handed.
	onAdd := func(np key.NodePublic) {
		m.server.AddPacketForwarder(np, derpClient)
	}
	onRemove := func(np key.NodePublic) {
		m.server.RemovePacketForwarder(np, derpClient)
	}
	go func() {
		defer close(done)
		derpClient.RunWatchConnectionLoop(loopCtx, m.server.PublicKey(), tailnet.Logger(m.logger.Named("loop")), onAdd, onRemove)
	}()
	return true, nil
}
// removeAddress stops meshing with a given address and forgets it.
//
// It reports whether the address was being tracked. When it was, the stored
// stop function is invoked and the entry is deleted from m.active, so that a
// later addAddress for the same address is not mistaken for a duplicate and
// the stop function is never invoked a second time by a subsequent removal.
//
// This method does not lock m.mutex itself; SetAddresses calls it with the
// mutex held. Deleting the entry while the caller ranges over m.active is
// permitted by the Go specification.
func (m *Mesh) removeAddress(address string) bool {
	cancelFunc, isActive := m.active[address]
	if !isActive {
		return false
	}
	cancelFunc()
	delete(m.active, address)
	return true
}
// Close marks the mesh as closed and invokes the stop function of every
// active address. Calling it again after the mesh is closed is a no-op.
// It holds m.mutex for the whole call and always returns nil.
func (m *Mesh) Close() error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if !m.isClosed() {
		// Closing the channel is what makes isClosed report true from now on.
		close(m.closed)
		for address := range m.active {
			m.active[address]()
		}
	}
	return nil
}
// isClosed reports whether m.closed has been closed. It polls the channel
// with a non-blocking select, so it never waits.
func (m *Mesh) isClosed() bool {
	select {
	case <-m.closed:
		return true
	default:
		return false
	}
}