chore: implement CoderVPN client & tunnel (#15612)

Addresses #14734.

This PR wires up `tunnel.go` to a `tailnet.Conn` via the new `/tailnet` endpoint, with all the necessary controllers such that a VPN connection can be started, stopped and inspected via the CoderVPN protocol.
This commit is contained in:
Ethan
2024-12-05 13:30:22 +11:00
committed by GitHub
parent b5b0a0e746
commit ba48069325
14 changed files with 1431 additions and 157 deletions
+1 -1
View File
@@ -60,7 +60,7 @@ func (r *RootCmd) vpnDaemonRun() *serpent.Command {
defer pipe.Close()
logger.Info(ctx, "starting tunnel")
tunnel, err := vpn.NewTunnel(ctx, logger, pipe)
tunnel, err := vpn.NewTunnel(ctx, logger, pipe, vpn.NewClient())
if err != nil {
return xerrors.Errorf("create new tunnel for client: %w", err)
}
+1
View File
@@ -27,6 +27,7 @@ func (e *Encoder[T]) Encode(v T) error {
return nil
}
// nolint: revive // complains that Decoder has the same function name
func (e *Encoder[T]) Close(c websocket.StatusCode) error {
return e.conn.Close(c, "")
}
+17 -7
View File
@@ -14,6 +14,7 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/tailscale/wireguard-go/tun"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
@@ -113,6 +114,8 @@ type Options struct {
DNSConfigurator dns.OSConfigurator
// Router is optional, and is passed to the underlying wireguard engine.
Router router.Router
// TUNDev is optional, and is passed to the underlying wireguard engine.
TUNDev tun.Device
}
// TelemetrySink allows tailnet.Conn to send network telemetry to the Coder
@@ -143,6 +146,8 @@ func NewConn(options *Options) (conn *Conn, err error) {
return nil, xerrors.New("At least one IP range must be provided")
}
netns.SetEnabled(options.TUNDev != nil)
var telemetryStore *TelemetryStore
if options.TelemetrySink != nil {
var err error
@@ -187,6 +192,7 @@ func NewConn(options *Options) (conn *Conn, err error) {
SetSubsystem: sys.Set,
DNS: options.DNSConfigurator,
Router: options.Router,
Tun: options.TUNDev,
})
if err != nil {
return nil, xerrors.Errorf("create wgengine: %w", err)
@@ -197,11 +203,14 @@ func NewConn(options *Options) (conn *Conn, err error) {
}
}()
wireguardEngine.InstallCaptureHook(options.CaptureHook)
dialer.UseNetstackForIP = func(ip netip.Addr) bool {
_, ok := wireguardEngine.PeerForIP(ip)
return ok
if options.TUNDev == nil {
dialer.UseNetstackForIP = func(ip netip.Addr) bool {
_, ok := wireguardEngine.PeerForIP(ip)
return ok
}
}
wireguardEngine = wgengine.NewWatchdog(wireguardEngine)
sys.Set(wireguardEngine)
magicConn := sys.MagicSock.Get()
@@ -244,11 +253,12 @@ func NewConn(options *Options) (conn *Conn, err error) {
return nil, xerrors.Errorf("create netstack: %w", err)
}
dialer.NetstackDialTCP = func(ctx context.Context, dst netip.AddrPort) (net.Conn, error) {
return netStack.DialContextTCP(ctx, dst)
if options.TUNDev == nil {
dialer.NetstackDialTCP = func(ctx context.Context, dst netip.AddrPort) (net.Conn, error) {
return netStack.DialContextTCP(ctx, dst)
}
netStack.ProcessLocalIPs = true
}
netStack.ProcessLocalIPs = true
wireguardEngine = wgengine.NewWatchdog(wireguardEngine)
cfgMaps := newConfigMaps(
options.Logger,
+211 -61
View File
@@ -7,6 +7,7 @@ import (
"maps"
"math"
"net/netip"
"slices"
"strings"
"sync"
"time"
@@ -19,6 +20,7 @@ import (
"tailscale.com/util/dnsname"
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/quartz"
@@ -112,6 +114,11 @@ type DNSHostsSetter interface {
SetDNSHosts(hosts map[dnsname.FQDN][]netip.Addr) error
}
// UpdatesHandler is anything that expects a stream of workspace update diffs.
type UpdatesHandler interface {
Update(WorkspaceUpdate) error
}
// ControlProtocolClients represents an abstract interface to the tailnet control plane via a set
// of protocol clients. The Closer should close all the clients (e.g. by closing the underlying
// connection).
@@ -855,65 +862,121 @@ func (r *basicResumeTokenRefresher) refresh() {
r.timer.Reset(dur, "basicResumeTokenRefresher", "refresh")
}
type tunnelAllWorkspaceUpdatesController struct {
type TunnelAllWorkspaceUpdatesController struct {
coordCtrl *TunnelSrcCoordController
dnsHostSetter DNSHostsSetter
updateHandler UpdatesHandler
ownerUsername string
logger slog.Logger
mu sync.Mutex
updater *tunnelUpdater
}
type workspace struct {
id uuid.UUID
name string
agents map[uuid.UUID]agent
type Workspace struct {
ID uuid.UUID
Name string
Status proto.Workspace_Status
ownerUsername string
agents map[uuid.UUID]*Agent
}
// addAllDNSNames adds names for all of its agents to the given map of names
func (w workspace) addAllDNSNames(names map[dnsname.FQDN][]netip.Addr, owner string) error {
for _, a := range w.agents {
// updateDNSNames updates the DNS names for all agents in the workspace.
func (w *Workspace) updateDNSNames() error {
for id, a := range w.agents {
names := make(map[dnsname.FQDN][]netip.Addr)
// TODO: technically, DNS labels cannot start with numbers, but the rules are often not
// strictly enforced.
fqdn, err := dnsname.ToFQDN(fmt.Sprintf("%s.%s.me.coder.", a.name, w.name))
fqdn, err := dnsname.ToFQDN(fmt.Sprintf("%s.%s.me.coder.", a.Name, w.Name))
if err != nil {
return err
}
names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.id)}
fqdn, err = dnsname.ToFQDN(fmt.Sprintf("%s.%s.%s.coder.", a.name, w.name, owner))
names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.ID)}
fqdn, err = dnsname.ToFQDN(fmt.Sprintf("%s.%s.%s.coder.", a.Name, w.Name, w.ownerUsername))
if err != nil {
return err
}
names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.id)}
}
if len(w.agents) == 1 {
fqdn, err := dnsname.ToFQDN(fmt.Sprintf("%s.coder.", w.name))
if err != nil {
return err
}
for _, a := range w.agents {
names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.id)}
names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.ID)}
if len(w.agents) == 1 {
fqdn, err := dnsname.ToFQDN(fmt.Sprintf("%s.coder.", w.Name))
if err != nil {
return err
}
for _, a := range w.agents {
names[fqdn] = []netip.Addr{CoderServicePrefix.AddrFromUUID(a.ID)}
}
}
a.Hosts = names
w.agents[id] = a
}
return nil
}
type agent struct {
id uuid.UUID
name string
type Agent struct {
ID uuid.UUID
Name string
WorkspaceID uuid.UUID
Hosts map[dnsname.FQDN][]netip.Addr
}
func (t *tunnelAllWorkspaceUpdatesController) New(client WorkspaceUpdatesClient) CloserWaiter {
func (a *Agent) Clone() Agent {
hosts := make(map[dnsname.FQDN][]netip.Addr, len(a.Hosts))
for k, v := range a.Hosts {
hosts[k] = slices.Clone(v)
}
return Agent{
ID: a.ID,
Name: a.Name,
WorkspaceID: a.WorkspaceID,
Hosts: hosts,
}
}
func (t *TunnelAllWorkspaceUpdatesController) New(client WorkspaceUpdatesClient) CloserWaiter {
t.mu.Lock()
defer t.mu.Unlock()
updater := &tunnelUpdater{
client: client,
errChan: make(chan error, 1),
logger: t.logger,
coordCtrl: t.coordCtrl,
dnsHostsSetter: t.dnsHostSetter,
updateHandler: t.updateHandler,
ownerUsername: t.ownerUsername,
recvLoopDone: make(chan struct{}),
workspaces: make(map[uuid.UUID]*workspace),
workspaces: make(map[uuid.UUID]*Workspace),
}
go updater.recvLoop()
return updater
t.updater = updater
go t.updater.recvLoop()
return t.updater
}
func (t *TunnelAllWorkspaceUpdatesController) CurrentState() (WorkspaceUpdate, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.updater == nil {
return WorkspaceUpdate{}, xerrors.New("no updater")
}
t.updater.Lock()
defer t.updater.Unlock()
out := WorkspaceUpdate{
UpsertedWorkspaces: make([]*Workspace, 0, len(t.updater.workspaces)),
UpsertedAgents: make([]*Agent, 0, len(t.updater.workspaces)),
DeletedWorkspaces: make([]*Workspace, 0),
DeletedAgents: make([]*Agent, 0),
}
for _, w := range t.updater.workspaces {
out.UpsertedWorkspaces = append(out.UpsertedWorkspaces, &Workspace{
ID: w.ID,
Name: w.Name,
Status: w.Status,
})
for _, a := range w.agents {
out.UpsertedAgents = append(out.UpsertedAgents, ptr.Ref(a.Clone()))
}
}
return out, nil
}
type tunnelUpdater struct {
@@ -922,14 +985,13 @@ type tunnelUpdater struct {
client WorkspaceUpdatesClient
coordCtrl *TunnelSrcCoordController
dnsHostsSetter DNSHostsSetter
updateHandler UpdatesHandler
ownerUsername string
recvLoopDone chan struct{}
// don't need the mutex since only manipulated by the recvLoop
workspaces map[uuid.UUID]*workspace
sync.Mutex
closed bool
workspaces map[uuid.UUID]*Workspace
closed bool
}
func (t *tunnelUpdater) Close(ctx context.Context) error {
@@ -990,18 +1052,68 @@ func (t *tunnelUpdater) recvLoop() {
}
}
type WorkspaceUpdate struct {
UpsertedWorkspaces []*Workspace
UpsertedAgents []*Agent
DeletedWorkspaces []*Workspace
DeletedAgents []*Agent
}
func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
clone := WorkspaceUpdate{
UpsertedWorkspaces: make([]*Workspace, len(w.UpsertedWorkspaces)),
UpsertedAgents: make([]*Agent, len(w.UpsertedAgents)),
DeletedWorkspaces: make([]*Workspace, len(w.DeletedWorkspaces)),
DeletedAgents: make([]*Agent, len(w.DeletedAgents)),
}
for i, ws := range w.UpsertedWorkspaces {
clone.UpsertedWorkspaces[i] = &Workspace{
ID: ws.ID,
Name: ws.Name,
Status: ws.Status,
}
}
for i, a := range w.UpsertedAgents {
clone.UpsertedAgents[i] = ptr.Ref(a.Clone())
}
for i, ws := range w.DeletedWorkspaces {
clone.DeletedWorkspaces[i] = &Workspace{
ID: ws.ID,
Name: ws.Name,
Status: ws.Status,
}
}
for i, a := range w.DeletedAgents {
clone.DeletedAgents[i] = ptr.Ref(a.Clone())
}
return clone
}
func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
t.Lock()
defer t.Unlock()
currentUpdate := WorkspaceUpdate{
UpsertedWorkspaces: []*Workspace{},
UpsertedAgents: []*Agent{},
DeletedWorkspaces: []*Workspace{},
DeletedAgents: []*Agent{},
}
for _, uw := range update.UpsertedWorkspaces {
workspaceID, err := uuid.FromBytes(uw.Id)
if err != nil {
return xerrors.Errorf("failed to parse workspace ID: %w", err)
}
w := workspace{
id: workspaceID,
name: uw.Name,
agents: make(map[uuid.UUID]agent),
w := &Workspace{
ID: workspaceID,
Name: uw.Name,
Status: uw.Status,
ownerUsername: t.ownerUsername,
agents: make(map[uuid.UUID]*Agent),
}
t.upsertWorkspace(w)
t.upsertWorkspaceLocked(w)
currentUpdate.UpsertedWorkspaces = append(currentUpdate.UpsertedWorkspaces, w)
}
// delete agents before deleting workspaces, since the agents have workspace ID references
@@ -1014,17 +1126,22 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
if err != nil {
return xerrors.Errorf("failed to parse workspace ID: %w", err)
}
err = t.deleteAgent(workspaceID, agentID)
deletedAgent, err := t.deleteAgentLocked(workspaceID, agentID)
if err != nil {
return xerrors.Errorf("failed to delete agent: %w", err)
}
currentUpdate.DeletedAgents = append(currentUpdate.DeletedAgents, deletedAgent)
}
for _, dw := range update.DeletedWorkspaces {
workspaceID, err := uuid.FromBytes(dw.Id)
if err != nil {
return xerrors.Errorf("failed to parse workspace ID: %w", err)
}
t.deleteWorkspace(workspaceID)
deletedWorkspace, err := t.deleteWorkspaceLocked(workspaceID)
if err != nil {
return xerrors.Errorf("failed to delete workspace: %w", err)
}
currentUpdate.DeletedWorkspaces = append(currentUpdate.DeletedWorkspaces, deletedWorkspace)
}
// upsert agents last, after all workspaces have been added and deleted, since agents reference
@@ -1038,17 +1155,18 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
if err != nil {
return xerrors.Errorf("failed to parse workspace ID: %w", err)
}
a := agent{name: ua.Name, id: agentID}
err = t.upsertAgent(workspaceID, a)
a := &Agent{Name: ua.Name, ID: agentID, WorkspaceID: workspaceID}
err = t.upsertAgentLocked(workspaceID, a)
if err != nil {
return xerrors.Errorf("failed to upsert agent: %w", err)
}
currentUpdate.UpsertedAgents = append(currentUpdate.UpsertedAgents, a)
}
allAgents := t.allAgentIDs()
allAgents := t.allAgentIDsLocked()
t.coordCtrl.SyncDestinations(allAgents)
dnsNames := t.updateDNSNamesLocked()
if t.dnsHostsSetter != nil {
t.logger.Debug(context.Background(), "updating dns hosts")
dnsNames := t.allDNSNames()
err := t.dnsHostsSetter.SetDNSHosts(dnsNames)
if err != nil {
return xerrors.Errorf("failed to set DNS hosts: %w", err)
@@ -1056,41 +1174,60 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
} else {
t.logger.Debug(context.Background(), "skipping setting DNS names because we have no setter")
}
if t.updateHandler != nil {
t.logger.Debug(context.Background(), "calling update handler")
err := t.updateHandler.Update(currentUpdate.Clone())
if err != nil {
t.logger.Error(context.Background(), "failed to call update handler", slog.Error(err))
}
}
return nil
}
func (t *tunnelUpdater) upsertWorkspace(w workspace) {
old, ok := t.workspaces[w.id]
func (t *tunnelUpdater) upsertWorkspaceLocked(w *Workspace) *Workspace {
old, ok := t.workspaces[w.ID]
if !ok {
t.workspaces[w.id] = &w
return
t.workspaces[w.ID] = w
return w
}
old.name = w.name
old.Name = w.Name
old.Status = w.Status
old.ownerUsername = w.ownerUsername
return w
}
func (t *tunnelUpdater) deleteWorkspace(id uuid.UUID) {
func (t *tunnelUpdater) deleteWorkspaceLocked(id uuid.UUID) (*Workspace, error) {
w, ok := t.workspaces[id]
if !ok {
return nil, xerrors.Errorf("workspace %s not found", id)
}
delete(t.workspaces, id)
return w, nil
}
func (t *tunnelUpdater) upsertAgent(workspaceID uuid.UUID, a agent) error {
func (t *tunnelUpdater) upsertAgentLocked(workspaceID uuid.UUID, a *Agent) error {
w, ok := t.workspaces[workspaceID]
if !ok {
return xerrors.Errorf("workspace %s not found", workspaceID)
}
w.agents[a.id] = a
w.agents[a.ID] = a
return nil
}
func (t *tunnelUpdater) deleteAgent(workspaceID, id uuid.UUID) error {
func (t *tunnelUpdater) deleteAgentLocked(workspaceID, id uuid.UUID) (*Agent, error) {
w, ok := t.workspaces[workspaceID]
if !ok {
return xerrors.Errorf("workspace %s not found", workspaceID)
return nil, xerrors.Errorf("workspace %s not found", workspaceID)
}
a, ok := w.agents[id]
if !ok {
return nil, xerrors.Errorf("agent %s not found in workspace %s", id, workspaceID)
}
delete(w.agents, id)
return nil
return a, nil
}
func (t *tunnelUpdater) allAgentIDs() []uuid.UUID {
func (t *tunnelUpdater) allAgentIDsLocked() []uuid.UUID {
out := make([]uuid.UUID, 0, len(t.workspaces))
for _, w := range t.workspaces {
for id := range w.agents {
@@ -1100,41 +1237,54 @@ func (t *tunnelUpdater) allAgentIDs() []uuid.UUID {
return out
}
func (t *tunnelUpdater) allDNSNames() map[dnsname.FQDN][]netip.Addr {
// updateDNSNamesLocked updates the DNS names for all workspaces in the tunnelUpdater.
// t.Mutex must be held.
func (t *tunnelUpdater) updateDNSNamesLocked() map[dnsname.FQDN][]netip.Addr {
names := make(map[dnsname.FQDN][]netip.Addr)
for _, w := range t.workspaces {
err := w.addAllDNSNames(names, t.ownerUsername)
err := w.updateDNSNames()
if err != nil {
// This should never happen in production, because converting the FQDN only fails
// if names are too long, and we put strict length limits on agent, workspace, and user
// names.
t.logger.Critical(context.Background(),
"failed to include DNS name(s)",
slog.F("workspace_id", w.id),
slog.F("workspace_id", w.ID),
slog.Error(err))
}
for _, a := range w.agents {
for name, addrs := range a.Hosts {
names[name] = addrs
}
}
}
return names
}
type TunnelAllOption func(t *tunnelAllWorkspaceUpdatesController)
type TunnelAllOption func(t *TunnelAllWorkspaceUpdatesController)
// WithDNS configures the tunnelAllWorkspaceUpdatesController to set DNS names for all workspaces
// and agents it learns about.
func WithDNS(d DNSHostsSetter, ownerUsername string) TunnelAllOption {
return func(t *tunnelAllWorkspaceUpdatesController) {
return func(t *TunnelAllWorkspaceUpdatesController) {
t.dnsHostSetter = d
t.ownerUsername = ownerUsername
}
}
func WithHandler(h UpdatesHandler) TunnelAllOption {
return func(t *TunnelAllWorkspaceUpdatesController) {
t.updateHandler = h
}
}
// NewTunnelAllWorkspaceUpdatesController creates a WorkspaceUpdatesController that creates tunnels
// (via the TunnelSrcCoordController) to all agents received over the WorkspaceUpdates RPC. If a
// DNSHostSetter is provided, it also programs DNS hosts based on the agent and workspace names.
func NewTunnelAllWorkspaceUpdatesController(
logger slog.Logger, c *TunnelSrcCoordController, opts ...TunnelAllOption,
) WorkspaceUpdatesController {
t := &tunnelAllWorkspaceUpdatesController{logger: logger, coordCtrl: c}
) *TunnelAllWorkspaceUpdatesController {
t := &TunnelAllWorkspaceUpdatesController{logger: logger, coordCtrl: c}
for _, opt := range opts {
opt(t)
}
+176 -22
View File
@@ -7,6 +7,7 @@ import (
"net"
"net/netip"
"slices"
"strings"
"sync"
"sync/atomic"
"testing"
@@ -1451,10 +1452,35 @@ func (f *fakeDNSSetter) SetDNSHosts(hosts map[dnsname.FQDN][]netip.Addr) error {
}
}
func newFakeUpdateHandler(ctx context.Context, t testing.TB) *fakeUpdateHandler {
return &fakeUpdateHandler{
ctx: ctx,
t: t,
ch: make(chan tailnet.WorkspaceUpdate),
}
}
type fakeUpdateHandler struct {
ctx context.Context
t testing.TB
ch chan tailnet.WorkspaceUpdate
}
func (f *fakeUpdateHandler) Update(wu tailnet.WorkspaceUpdate) error {
f.t.Helper()
select {
case <-f.ctx.Done():
return timeoutOnFakeErr
case f.ch <- wu:
// OK
}
return nil
}
func setupConnectedAllWorkspaceUpdatesController(
ctx context.Context, t testing.TB, logger slog.Logger, opts ...tailnet.TunnelAllOption,
) (
*fakeCoordinatorClient, *fakeWorkspaceUpdateClient,
*fakeCoordinatorClient, *fakeWorkspaceUpdateClient, *tailnet.TunnelAllWorkspaceUpdatesController,
) {
fConn := &fakeCoordinatee{}
tsc := tailnet.NewTunnelSrcCoordController(logger, fConn)
@@ -1484,7 +1510,7 @@ func setupConnectedAllWorkspaceUpdatesController(
err := testutil.RequireRecvCtx(ctx, t, updateCW.Wait())
require.ErrorIs(t, err, io.EOF)
})
return coordC, updateC
return coordC, updateC, uut
}
func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
@@ -1492,9 +1518,12 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
ctx := testutil.Context(t, testutil.WaitShort)
logger := testutil.Logger(t)
fUH := newFakeUpdateHandler(ctx, t)
fDNS := newFakeDNSSetter(ctx, t)
coordC, updateC := setupConnectedAllWorkspaceUpdatesController(ctx, t, logger,
tailnet.WithDNS(fDNS, "testy"))
coordC, updateC, updateCtrl := setupConnectedAllWorkspaceUpdatesController(ctx, t, logger,
tailnet.WithDNS(fDNS, "testy"),
tailnet.WithHandler(fUH),
)
// Initial update contains 2 workspaces with 1 & 2 agents, respectively
w1ID := testUUID(1)
@@ -1528,19 +1557,71 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
require.Contains(t, adds, w2a1ID)
require.Contains(t, adds, w2a2ID)
ws1a1IP := netip.MustParseAddr("fd60:627a:a42b:0101::")
w2a1IP := netip.MustParseAddr("fd60:627a:a42b:0201::")
w2a2IP := netip.MustParseAddr("fd60:627a:a42b:0202::")
// Also triggers setting DNS hosts
expectedDNS := map[dnsname.FQDN][]netip.Addr{
"w1a1.w1.me.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
"w2a1.w2.me.coder.": {netip.MustParseAddr("fd60:627a:a42b:0201::")},
"w2a2.w2.me.coder.": {netip.MustParseAddr("fd60:627a:a42b:0202::")},
"w1a1.w1.testy.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
"w2a1.w2.testy.coder.": {netip.MustParseAddr("fd60:627a:a42b:0201::")},
"w2a2.w2.testy.coder.": {netip.MustParseAddr("fd60:627a:a42b:0202::")},
"w1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
"w1a1.w1.me.coder.": {ws1a1IP},
"w2a1.w2.me.coder.": {w2a1IP},
"w2a2.w2.me.coder.": {w2a2IP},
"w1a1.w1.testy.coder.": {ws1a1IP},
"w2a1.w2.testy.coder.": {w2a1IP},
"w2a2.w2.testy.coder.": {w2a2IP},
"w1.coder.": {ws1a1IP},
}
dnsCall := testutil.RequireRecvCtx(ctx, t, fDNS.calls)
require.Equal(t, expectedDNS, dnsCall.hosts)
testutil.RequireSendCtx(ctx, t, dnsCall.err, nil)
currentState := tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{ID: w1ID, Name: "w1"},
{ID: w2ID, Name: "w2"},
},
UpsertedAgents: []*tailnet.Agent{
{
ID: w1a1ID, Name: "w1a1", WorkspaceID: w1ID,
Hosts: map[dnsname.FQDN][]netip.Addr{
"w1.coder.": {ws1a1IP},
"w1a1.w1.me.coder.": {ws1a1IP},
"w1a1.w1.testy.coder.": {ws1a1IP},
},
},
{
ID: w2a1ID, Name: "w2a1", WorkspaceID: w2ID,
Hosts: map[dnsname.FQDN][]netip.Addr{
"w2a1.w2.me.coder.": {w2a1IP},
"w2a1.w2.testy.coder.": {w2a1IP},
},
},
{
ID: w2a2ID, Name: "w2a2", WorkspaceID: w2ID,
Hosts: map[dnsname.FQDN][]netip.Addr{
"w2a2.w2.me.coder.": {w2a2IP},
"w2a2.w2.testy.coder.": {w2a2IP},
},
},
},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
}
// And the callback
cbUpdate := testutil.RequireRecvCtx(ctx, t, fUH.ch)
require.Equal(t, currentState, cbUpdate)
// Current recvState should match
recvState, err := updateCtrl.CurrentState()
require.NoError(t, err)
slices.SortFunc(recvState.UpsertedWorkspaces, func(a, b *tailnet.Workspace) int {
return strings.Compare(a.Name, b.Name)
})
slices.SortFunc(recvState.UpsertedAgents, func(a, b *tailnet.Agent) int {
return strings.Compare(a.Name, b.Name)
})
require.Equal(t, currentState, recvState)
}
func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
@@ -1548,13 +1629,19 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
ctx := testutil.Context(t, testutil.WaitShort)
logger := testutil.Logger(t)
fUH := newFakeUpdateHandler(ctx, t)
fDNS := newFakeDNSSetter(ctx, t)
coordC, updateC := setupConnectedAllWorkspaceUpdatesController(ctx, t, logger,
tailnet.WithDNS(fDNS, "testy"))
coordC, updateC, updateCtrl := setupConnectedAllWorkspaceUpdatesController(ctx, t, logger,
tailnet.WithDNS(fDNS, "testy"),
tailnet.WithHandler(fUH),
)
w1ID := testUUID(1)
w1a1ID := testUUID(1, 1)
w1a2ID := testUUID(1, 2)
ws1a1IP := netip.MustParseAddr("fd60:627a:a42b:0101::")
ws1a2IP := netip.MustParseAddr("fd60:627a:a42b:0102::")
initUp := &proto.WorkspaceUpdate{
UpsertedWorkspaces: []*proto.Workspace{
{Id: w1ID[:], Name: "w1"},
@@ -1574,14 +1661,37 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
// DNS for w1a1
expectedDNS := map[dnsname.FQDN][]netip.Addr{
"w1a1.w1.testy.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
"w1a1.w1.me.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
"w1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
"w1a1.w1.testy.coder.": {ws1a1IP},
"w1a1.w1.me.coder.": {ws1a1IP},
"w1.coder.": {ws1a1IP},
}
dnsCall := testutil.RequireRecvCtx(ctx, t, fDNS.calls)
require.Equal(t, expectedDNS, dnsCall.hosts)
testutil.RequireSendCtx(ctx, t, dnsCall.err, nil)
initRecvUp := tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{ID: w1ID, Name: "w1"},
},
UpsertedAgents: []*tailnet.Agent{
{ID: w1a1ID, Name: "w1a1", WorkspaceID: w1ID, Hosts: map[dnsname.FQDN][]netip.Addr{
"w1a1.w1.testy.coder.": {ws1a1IP},
"w1a1.w1.me.coder.": {ws1a1IP},
"w1.coder.": {ws1a1IP},
}},
},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
}
cbUpdate := testutil.RequireRecvCtx(ctx, t, fUH.ch)
require.Equal(t, initRecvUp, cbUpdate)
// Current state should match initial
state, err := updateCtrl.CurrentState()
require.NoError(t, err)
require.Equal(t, initRecvUp, state)
// Send update that removes w1a1 and adds w1a2
agentUpdate := &proto.WorkspaceUpdate{
UpsertedAgents: []*proto.Agent{
@@ -1606,13 +1716,51 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
// DNS contains only w1a2
expectedDNS = map[dnsname.FQDN][]netip.Addr{
"w1a2.w1.testy.coder.": {netip.MustParseAddr("fd60:627a:a42b:0102::")},
"w1a2.w1.me.coder.": {netip.MustParseAddr("fd60:627a:a42b:0102::")},
"w1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0102::")},
"w1a2.w1.testy.coder.": {ws1a2IP},
"w1a2.w1.me.coder.": {ws1a2IP},
"w1.coder.": {ws1a2IP},
}
dnsCall = testutil.RequireRecvCtx(ctx, t, fDNS.calls)
require.Equal(t, expectedDNS, dnsCall.hosts)
testutil.RequireSendCtx(ctx, t, dnsCall.err, nil)
cbUpdate = testutil.RequireRecvCtx(ctx, t, fUH.ch)
sndRecvUpdate := tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{},
UpsertedAgents: []*tailnet.Agent{
{ID: w1a2ID, Name: "w1a2", WorkspaceID: w1ID, Hosts: map[dnsname.FQDN][]netip.Addr{
"w1a2.w1.testy.coder.": {ws1a2IP},
"w1a2.w1.me.coder.": {ws1a2IP},
"w1.coder.": {ws1a2IP},
}},
},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{
{ID: w1a1ID, Name: "w1a1", WorkspaceID: w1ID, Hosts: map[dnsname.FQDN][]netip.Addr{
"w1a1.w1.testy.coder.": {ws1a1IP},
"w1a1.w1.me.coder.": {ws1a1IP},
"w1.coder.": {ws1a1IP},
}},
},
}
require.Equal(t, sndRecvUpdate, cbUpdate)
state, err = updateCtrl.CurrentState()
require.NoError(t, err)
require.Equal(t, tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{ID: w1ID, Name: "w1"},
},
UpsertedAgents: []*tailnet.Agent{
{ID: w1a2ID, Name: "w1a2", WorkspaceID: w1ID, Hosts: map[dnsname.FQDN][]netip.Addr{
"w1a2.w1.testy.coder.": {ws1a2IP},
"w1a2.w1.me.coder.": {ws1a2IP},
"w1.coder.": {ws1a2IP},
}},
},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
}, state)
}
func TestTunnelAllWorkspaceUpdatesController_DNSError(t *testing.T) {
@@ -1635,6 +1783,8 @@ func TestTunnelAllWorkspaceUpdatesController_DNSError(t *testing.T) {
w1ID := testUUID(1)
w1a1ID := testUUID(1, 1)
ws1a1IP := netip.MustParseAddr("fd60:627a:a42b:0101::")
initUp := &proto.WorkspaceUpdate{
UpsertedWorkspaces: []*proto.Workspace{
{Id: w1ID[:], Name: "w1"},
@@ -1648,9 +1798,9 @@ func TestTunnelAllWorkspaceUpdatesController_DNSError(t *testing.T) {
// DNS for w1a1
expectedDNS := map[dnsname.FQDN][]netip.Addr{
"w1a1.w1.me.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
"w1a1.w1.testy.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
"w1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
"w1a1.w1.me.coder.": {ws1a1IP},
"w1a1.w1.testy.coder.": {ws1a1IP},
"w1.coder.": {ws1a1IP},
}
dnsCall := testutil.RequireRecvCtx(ctx, t, fDNS.calls)
require.Equal(t, expectedDNS, dnsCall.hosts)
@@ -1778,6 +1928,10 @@ type fakeWorkspaceUpdatesController struct {
calls chan *newWorkspaceUpdatesCall
}
func (*fakeWorkspaceUpdatesController) CurrentState() *proto.WorkspaceUpdate {
panic("unimplemented")
}
type newWorkspaceUpdatesCall struct {
client tailnet.WorkspaceUpdatesClient
resp chan<- tailnet.CloserWaiter
+180
View File
@@ -0,0 +1,180 @@
package vpn
import (
"context"
"net/http"
"net/netip"
"net/url"
"golang.org/x/xerrors"
"nhooyr.io/websocket"
"tailscale.com/net/dns"
"tailscale.com/wgengine/router"
"github.com/tailscale/wireguard-go/tun"
"cdr.dev/slog"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/quartz"
)
type Conn interface {
CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error)
Close() error
}
type vpnConn struct {
*tailnet.Conn
cancelFn func()
controller *tailnet.Controller
updatesCtrl *tailnet.TunnelAllWorkspaceUpdatesController
}
func (c *vpnConn) CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error) {
return c.updatesCtrl.CurrentState()
}
func (c *vpnConn) Close() error {
c.cancelFn()
<-c.controller.Closed()
return c.Conn.Close()
}
type client struct{}
type Client interface {
NewConn(ctx context.Context, serverURL *url.URL, token string, options *Options) (Conn, error)
}
func NewClient() Client {
return &client{}
}
type Options struct {
Headers http.Header
Logger slog.Logger
DNSConfigurator dns.OSConfigurator
Router router.Router
TUNFileDescriptor *int
UpdateHandler tailnet.UpdatesHandler
}
func (*client) NewConn(initCtx context.Context, serverURL *url.URL, token string, options *Options) (vpnC Conn, err error) {
if options == nil {
options = &Options{}
}
if options.Headers == nil {
options.Headers = http.Header{}
}
var dev tun.Device
if options.TUNFileDescriptor != nil {
// No-op on non-Darwin platforms.
dev, err = makeTUN(*options.TUNFileDescriptor)
if err != nil {
return nil, xerrors.Errorf("make TUN: %w", err)
}
}
headers := options.Headers
sdk := codersdk.New(serverURL)
sdk.SetSessionToken(token)
sdk.HTTPClient.Transport = &codersdk.HeaderTransport{
Transport: http.DefaultTransport,
Header: headers,
}
// New context, separate from initCtx. We don't want to cancel the
// connection if initCtx is canceled.
ctx, cancel := context.WithCancel(context.Background())
defer func() {
if err != nil {
cancel()
}
}()
rpcURL, err := sdk.URL.Parse("/api/v2/tailnet")
if err != nil {
return nil, xerrors.Errorf("parse rpc url: %w", err)
}
me, err := sdk.User(initCtx, codersdk.Me)
if err != nil {
return nil, xerrors.Errorf("get user: %w", err)
}
connInfo, err := workspacesdk.New(sdk).AgentConnectionInfoGeneric(initCtx)
if err != nil {
return nil, xerrors.Errorf("get connection info: %w", err)
}
headers.Set(codersdk.SessionTokenHeader, token)
dialer := workspacesdk.NewWebsocketDialer(options.Logger, rpcURL, &websocket.DialOptions{
HTTPClient: sdk.HTTPClient,
HTTPHeader: headers,
CompressionMode: websocket.CompressionDisabled,
}, workspacesdk.WithWorkspaceUpdates(&proto.WorkspaceUpdatesRequest{
WorkspaceOwnerId: tailnet.UUIDToByteSlice(me.ID),
}))
ip := tailnet.CoderServicePrefix.RandomAddr()
conn, err := tailnet.NewConn(&tailnet.Options{
Addresses: []netip.Prefix{netip.PrefixFrom(ip, 128)},
DERPMap: connInfo.DERPMap,
DERPHeader: &headers,
DERPForceWebSockets: connInfo.DERPForceWebSockets,
Logger: options.Logger,
BlockEndpoints: connInfo.DisableDirectConnections,
DNSConfigurator: options.DNSConfigurator,
Router: options.Router,
TUNDev: dev,
})
if err != nil {
return nil, xerrors.Errorf("create tailnet: %w", err)
}
defer func() {
if err != nil {
_ = conn.Close()
}
}()
clk := quartz.NewReal()
controller := tailnet.NewController(options.Logger, dialer)
coordCtrl := tailnet.NewTunnelSrcCoordController(options.Logger, conn)
controller.ResumeTokenCtrl = tailnet.NewBasicResumeTokenController(options.Logger, clk)
controller.CoordCtrl = coordCtrl
controller.DERPCtrl = tailnet.NewBasicDERPController(options.Logger, conn)
updatesCtrl := tailnet.NewTunnelAllWorkspaceUpdatesController(
options.Logger,
coordCtrl,
tailnet.WithDNS(conn, me.Name),
tailnet.WithHandler(options.UpdateHandler),
)
controller.WorkspaceUpdatesCtrl = updatesCtrl
controller.Run(ctx)
options.Logger.Debug(ctx, "running tailnet API v2+ connector")
select {
case <-initCtx.Done():
return nil, xerrors.Errorf("timed out waiting for coordinator and derp map: %w", initCtx.Err())
case err = <-dialer.Connected():
if err != nil {
options.Logger.Error(ctx, "failed to connect to tailnet v2+ API", slog.Error(err))
return nil, xerrors.Errorf("start connector: %w", err)
}
options.Logger.Debug(ctx, "connected to tailnet v2+ API")
}
return &vpnConn{
Conn: conn,
cancelFn: cancel,
controller: controller,
updatesCtrl: updatesCtrl,
}, nil
}
+188
View File
@@ -0,0 +1,188 @@
package vpn_test
import (
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"nhooyr.io/websocket"
"tailscale.com/net/dns"
"tailscale.com/tailcfg"
"github.com/coder/coder/v2/coderd/httpapi"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/coder/v2/tailnet/tailnettest"
"github.com/coder/coder/v2/testutil"
"github.com/coder/coder/v2/vpn"
)
func TestClient_WorkspaceUpdates(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
logger := testutil.Logger(t)
userID := uuid.UUID{1}
wsID := uuid.UUID{2}
peerID := uuid.UUID{3}
fCoord := tailnettest.NewFakeCoordinator()
var coord tailnet.Coordinator = fCoord
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
coordPtr.Store(&coord)
ctrl := gomock.NewController(t)
mProvider := tailnettest.NewMockWorkspaceUpdatesProvider(ctrl)
mSub := tailnettest.NewMockSubscription(ctrl)
outUpdateCh := make(chan *proto.WorkspaceUpdate, 1)
inUpdateCh := make(chan tailnet.WorkspaceUpdate, 1)
mProvider.EXPECT().Subscribe(gomock.Any(), userID).Times(1).Return(mSub, nil)
mSub.EXPECT().Updates().MinTimes(1).Return(outUpdateCh)
mSub.EXPECT().Close().Times(1).Return(nil)
svc, err := tailnet.NewClientService(tailnet.ClientServiceOptions{
Logger: logger,
CoordPtr: &coordPtr,
DERPMapUpdateFrequency: time.Hour,
DERPMapFn: func() *tailcfg.DERPMap { return &tailcfg.DERPMap{} },
WorkspaceUpdatesProvider: mProvider,
ResumeTokenProvider: tailnet.NewInsecureTestResumeTokenProvider(),
})
require.NoError(t, err)
user := make(chan struct{})
connInfo := make(chan struct{})
serveErrCh := make(chan error)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v2/users/me":
httpapi.Write(ctx, w, http.StatusOK, codersdk.User{
ReducedUser: codersdk.ReducedUser{
MinimalUser: codersdk.MinimalUser{
ID: userID,
},
},
})
user <- struct{}{}
case "/api/v2/workspaceagents/connection":
httpapi.Write(ctx, w, http.StatusOK, workspacesdk.AgentConnectionInfo{
DisableDirectConnections: false,
})
connInfo <- struct{}{}
case "/api/v2/tailnet":
// need 2.3 for WorkspaceUpdates RPC
cVer := r.URL.Query().Get("version")
assert.Equal(t, "2.3", cVer)
sws, err := websocket.Accept(w, r, nil)
if !assert.NoError(t, err) {
return
}
wsCtx, nc := codersdk.WebsocketNetConn(ctx, sws, websocket.MessageBinary)
serveErrCh <- svc.ServeConnV2(wsCtx, nc, tailnet.StreamID{
Name: "client",
ID: peerID,
// Auth can be nil as we use a mock update provider
Auth: tailnet.ClientUserCoordinateeAuth{
Auth: nil,
},
})
default:
http.NotFound(w, r)
}
}))
t.Cleanup(server.Close)
svrURL, err := url.Parse(server.URL)
require.NoError(t, err)
connErrCh := make(chan error)
connCh := make(chan vpn.Conn)
go func() {
conn, err := vpn.NewClient().NewConn(ctx, svrURL, "fakeToken", &vpn.Options{
UpdateHandler: updateHandler(func(wu tailnet.WorkspaceUpdate) error {
inUpdateCh <- wu
return nil
}),
DNSConfigurator: &noopConfigurator{},
})
connErrCh <- err
connCh <- conn
}()
testutil.RequireRecvCtx(ctx, t, user)
testutil.RequireRecvCtx(ctx, t, connInfo)
err = testutil.RequireRecvCtx(ctx, t, connErrCh)
require.NoError(t, err)
conn := testutil.RequireRecvCtx(ctx, t, connCh)
// Send a workspace update
update := &proto.WorkspaceUpdate{
UpsertedWorkspaces: []*proto.Workspace{
{
Id: wsID[:],
},
},
}
testutil.RequireSendCtx(ctx, t, outUpdateCh, update)
// It'll be received by the update handler
recvUpdate := testutil.RequireRecvCtx(ctx, t, inUpdateCh)
require.Len(t, recvUpdate.UpsertedWorkspaces, 1)
require.Equal(t, wsID, recvUpdate.UpsertedWorkspaces[0].ID)
// And be reflected on the Conn's state
state, err := conn.CurrentWorkspaceState()
require.NoError(t, err)
require.Equal(t, tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wsID,
},
},
UpsertedAgents: []*tailnet.Agent{},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
}, state)
// Close the conn
conn.Close()
err = testutil.RequireRecvCtx(ctx, t, serveErrCh)
require.NoError(t, err)
}
type updateHandler func(tailnet.WorkspaceUpdate) error
func (h updateHandler) Update(u tailnet.WorkspaceUpdate) error {
return h(u)
}
type noopConfigurator struct{}
func (*noopConfigurator) Close() error {
return nil
}
func (*noopConfigurator) GetBaseConfig() (dns.OSConfig, error) {
return dns.OSConfig{}, nil
}
func (*noopConfigurator) SetDNS(dns.OSConfig) error {
return nil
}
func (*noopConfigurator) SupportsSplitDNS() bool {
return true
}
var _ dns.OSConfigurator = (*noopConfigurator)(nil)
+6 -3
View File
@@ -22,7 +22,7 @@ const (
)
// OpenTunnel creates a new VPN tunnel by `dup`ing the provided 'PIPE'
// file descriptors for reading, writing, and logging.
// file descriptors for reading and writing.
//
//export OpenTunnel
func OpenTunnel(cReadFD, cWriteFD int32) int32 {
@@ -46,8 +46,11 @@ func OpenTunnel(cReadFD, cWriteFD int32) int32 {
return ErrOpenPipe
}
// Logs will be sent over the protocol
_, err = vpn.NewTunnel(ctx, slog.Make(), conn)
_, err = vpn.NewTunnel(ctx, slog.Make(), conn, vpn.NewClient(),
vpn.UseAsDNSConfig(),
vpn.UseAsRouter(),
vpn.UseAsLogger(),
)
if err != nil {
unix.Close(readFD)
unix.Close(writeFD)
+10
View File
@@ -0,0 +1,10 @@
//go:build !darwin
package vpn
import "github.com/tailscale/wireguard-go/tun"
// This is a no-op on non-Darwin platforms.
func makeTUN(int) (tun.Device, error) {
return nil, nil
}
+30
View File
@@ -0,0 +1,30 @@
//go:build darwin
package vpn
import (
"os"
"github.com/tailscale/wireguard-go/tun"
"golang.org/x/sys/unix"
"golang.org/x/xerrors"
)
func makeTUN(tunFD int) (tun.Device, error) {
dupTunFd, err := unix.Dup(tunFD)
if err != nil {
return nil, xerrors.Errorf("dup tun fd: %w", err)
}
err = unix.SetNonblock(dupTunFd, true)
if err != nil {
unix.Close(dupTunFd)
return nil, xerrors.Errorf("set nonblock: %w", err)
}
fileTun, err := tun.CreateTUNFromFile(os.NewFile(uintptr(dupTunFd), "/dev/tun"), 0)
if err != nil {
unix.Close(dupTunFd)
return nil, xerrors.Errorf("create TUN from File: %w", err)
}
return fileTun, nil
}
+195 -18
View File
@@ -6,32 +6,56 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"reflect"
"strconv"
"sync"
"unicode"
"golang.org/x/xerrors"
"tailscale.com/net/dns"
"tailscale.com/wgengine/router"
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/tailnet"
)
type Tunnel struct {
speaker[*TunnelMessage, *ManagerMessage, ManagerMessage]
ctx context.Context
logger slog.Logger
requestLoopDone chan struct{}
logger slog.Logger
logMu sync.Mutex
logs []*TunnelMessage
client Client
conn Conn
// clientLogger is a separate logger than `logger` when the `UseAsLogger`
// option is used, to avoid the tunnel using itself as a sink for it's own
// logs, which could lead to deadlocks.
clientLogger slog.Logger
// router and dnsConfigurator may be nil
router router.Router
dnsConfigurator dns.OSConfigurator
}
type TunnelOption func(t *Tunnel)
func NewTunnel(
ctx context.Context, logger slog.Logger, conn io.ReadWriteCloser,
ctx context.Context,
logger slog.Logger,
mgrConn io.ReadWriteCloser,
client Client,
opts ...TunnelOption,
) (*Tunnel, error) {
logger = logger.Named("vpn")
s, err := newSpeaker[*TunnelMessage, *ManagerMessage](
ctx, logger, conn, SpeakerRoleTunnel, SpeakerRoleManager)
ctx, logger, mgrConn, SpeakerRoleTunnel, SpeakerRoleManager)
if err != nil {
return nil, err
}
@@ -40,7 +64,13 @@ func NewTunnel(
speaker: *(s),
ctx: ctx,
logger: logger,
clientLogger: logger,
requestLoopDone: make(chan struct{}),
client: client,
}
for _, opt := range opts {
opt(t)
}
t.speaker.start()
go t.requestLoop()
@@ -55,6 +85,14 @@ func (t *Tunnel) requestLoop() {
if err := req.sendReply(resp); err != nil {
t.logger.Debug(t.ctx, "failed to send RPC reply", slog.Error(err))
}
if _, ok := resp.GetMsg().(*TunnelMessage_Stop); ok {
// TODO: Wait for the reply to be sent before closing the speaker.
// err := t.speaker.Close()
// if err != nil {
// t.logger.Error(t.ctx, "failed to close speaker", slog.Error(err))
// }
return
}
continue
}
// Not a unary RPC. We don't know of any message types that are neither a response nor a
@@ -70,12 +108,12 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
resp.Rpc = &RPC{ResponseTo: msgID}
switch msg := req.GetMsg().(type) {
case *ManagerMessage_GetPeerUpdate:
// TODO: actually get the peer updates
state, err := t.conn.CurrentWorkspaceState()
if err != nil {
t.logger.Critical(t.ctx, "failed to get current workspace state", slog.Error(err))
}
resp.Msg = &TunnelMessage_PeerUpdate{
PeerUpdate: &PeerUpdate{
UpsertedWorkspaces: nil,
UpsertedAgents: nil,
},
PeerUpdate: convertWorkspaceUpdate(state),
}
return resp
case *ManagerMessage_Start:
@@ -84,27 +122,35 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
slog.F("url", startReq.CoderUrl),
slog.F("tunnel_fd", startReq.TunnelFileDescriptor),
)
// TODO: actually start the tunnel
err := t.start(startReq)
var errStr string
if err != nil {
t.logger.Error(t.ctx, "failed to start tunnel", slog.Error(err))
errStr = err.Error()
}
resp.Msg = &TunnelMessage_Start{
Start: &StartResponse{
Success: true,
Success: err == nil,
ErrorMessage: errStr,
},
}
return resp
case *ManagerMessage_Stop:
t.logger.Info(t.ctx, "stopping CoderVPN tunnel")
// TODO: actually stop the tunnel
resp.Msg = &TunnelMessage_Stop{
Stop: &StopResponse{
Success: true,
},
}
err := t.speaker.Close()
err := t.stop(msg.Stop)
var errStr string
if err != nil {
t.logger.Error(t.ctx, "failed to close speaker", slog.Error(err))
t.logger.Error(t.ctx, "failed to stop tunnel", slog.Error(err))
errStr = err.Error()
} else {
t.logger.Info(t.ctx, "coderVPN tunnel stopped")
}
resp.Msg = &TunnelMessage_Stop{
Stop: &StopResponse{
Success: err == nil,
ErrorMessage: errStr,
},
}
return resp
default:
t.logger.Warn(t.ctx, "unhandled manager request", slog.F("request", msg))
@@ -112,6 +158,24 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
}
}
func UseAsRouter() TunnelOption {
return func(t *Tunnel) {
t.router = NewRouter(t)
}
}
func UseAsLogger() TunnelOption {
return func(t *Tunnel) {
t.clientLogger = slog.Make(t)
}
}
func UseAsDNSConfig() TunnelOption {
return func(t *Tunnel) {
t.dnsConfigurator = NewDNSConfigurator(t)
}
}
// ApplyNetworkSettings sends a request to the manager to apply the given network settings
func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRequest) error {
msg, err := t.speaker.unaryRPC(ctx, &TunnelMessage{
@@ -129,6 +193,65 @@ func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRe
return nil
}
func (t *Tunnel) Update(update tailnet.WorkspaceUpdate) error {
msg := &TunnelMessage{
Msg: &TunnelMessage_PeerUpdate{
PeerUpdate: convertWorkspaceUpdate(update),
},
}
select {
case <-t.ctx.Done():
return t.ctx.Err()
case t.sendCh <- msg:
}
return nil
}
func (t *Tunnel) start(req *StartRequest) error {
rawURL := req.GetCoderUrl()
if rawURL == "" {
return xerrors.New("missing coder url")
}
svrURL, err := url.Parse(rawURL)
if err != nil {
return xerrors.Errorf("parse url %q: %w", rawURL, err)
}
apiToken := req.GetApiToken()
if apiToken == "" {
return xerrors.New("missing api token")
}
var header http.Header
for _, h := range req.GetHeaders() {
header.Add(h.GetName(), h.GetValue())
}
if t.conn == nil {
t.conn, err = t.client.NewConn(
t.ctx,
svrURL,
apiToken,
&Options{
Headers: header,
Logger: t.clientLogger,
DNSConfigurator: t.dnsConfigurator,
Router: t.router,
TUNFileDescriptor: ptr.Ref(int(req.GetTunnelFileDescriptor())),
UpdateHandler: t,
},
)
} else {
t.logger.Warn(t.ctx, "asked to start tunnel, but tunnel is already running")
}
return err
}
func (t *Tunnel) stop(*StopRequest) error {
if t.conn == nil {
return nil
}
return t.conn.Close()
}
var _ slog.Sink = &Tunnel{}
func (t *Tunnel) LogEntry(_ context.Context, e slog.SinkEntry) {
@@ -170,6 +293,60 @@ func sinkEntryToPb(e slog.SinkEntry) *Log {
return l
}
func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
out := &PeerUpdate{
UpsertedWorkspaces: make([]*Workspace, len(update.UpsertedWorkspaces)),
UpsertedAgents: make([]*Agent, len(update.UpsertedAgents)),
DeletedWorkspaces: make([]*Workspace, len(update.DeletedWorkspaces)),
DeletedAgents: make([]*Agent, len(update.DeletedAgents)),
}
for i, ws := range update.UpsertedWorkspaces {
out.UpsertedWorkspaces[i] = &Workspace{
Id: tailnet.UUIDToByteSlice(ws.ID),
Name: ws.Name,
Status: Workspace_Status(ws.Status),
}
}
for i, agent := range update.UpsertedAgents {
fqdn := make([]string, 0, len(agent.Hosts))
for name := range agent.Hosts {
fqdn = append(fqdn, name.WithTrailingDot())
}
out.UpsertedAgents[i] = &Agent{
Id: tailnet.UUIDToByteSlice(agent.ID),
Name: agent.Name,
WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
Fqdn: fqdn,
IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
// TODO: Populate
LastHandshake: nil,
}
}
for i, ws := range update.DeletedWorkspaces {
out.DeletedWorkspaces[i] = &Workspace{
Id: tailnet.UUIDToByteSlice(ws.ID),
Name: ws.Name,
Status: Workspace_Status(ws.Status),
}
}
for i, agent := range update.DeletedAgents {
fqdn := make([]string, 0, len(agent.Hosts))
for name := range agent.Hosts {
fqdn = append(fqdn, name.WithTrailingDot())
}
out.DeletedAgents[i] = &Agent{
Id: tailnet.UUIDToByteSlice(agent.ID),
Name: agent.Name,
WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
Fqdn: fqdn,
IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
// TODO: Populate
LastHandshake: nil,
}
}
return out
}
// the following are taken from sloghuman:
func formatValue(v interface{}) string {
+280
View File
@@ -0,0 +1,280 @@
package vpn
import (
"context"
"net"
"net/url"
"sync"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/testutil"
)
func newFakeClient(ctx context.Context, t *testing.T) *fakeClient {
return &fakeClient{
t: t,
ctx: ctx,
ch: make(chan *fakeConn, 1),
}
}
type fakeClient struct {
t *testing.T
ctx context.Context
ch chan *fakeConn
}
var _ Client = (*fakeClient)(nil)
func (f *fakeClient) NewConn(context.Context, *url.URL, string, *Options) (Conn, error) {
select {
case <-f.ctx.Done():
return nil, f.ctx.Err()
case conn := <-f.ch:
return conn, nil
}
}
func newFakeConn(state tailnet.WorkspaceUpdate) *fakeConn {
return &fakeConn{
closed: make(chan struct{}),
state: state,
}
}
type fakeConn struct {
state tailnet.WorkspaceUpdate
closed chan struct{}
doClose sync.Once
}
var _ Conn = (*fakeConn)(nil)
func (f *fakeConn) CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error) {
return f.state, nil
}
func (f *fakeConn) Close() error {
f.doClose.Do(func() {
close(f.closed)
})
return nil
}
func TestTunnel_StartStop(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
client := newFakeClient(ctx, t)
conn := newFakeConn(tailnet.WorkspaceUpdate{})
_, mgr := setupTunnel(t, ctx, client)
errCh := make(chan error, 1)
var resp *TunnelMessage
// When: we start the tunnel
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Start{
Start: &StartRequest{
TunnelFileDescriptor: 2,
CoderUrl: "https://coder.example.com",
ApiToken: "fakeToken",
},
},
})
resp = r
errCh <- err
}()
// Then: `NewConn` is called,
testutil.RequireSendCtx(ctx, t, client.ch, conn)
// And: a response is received
err := testutil.RequireRecvCtx(ctx, t, errCh)
require.NoError(t, err)
_, ok := resp.Msg.(*TunnelMessage_Start)
require.True(t, ok)
// When: we stop the tunnel
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Stop{},
})
resp = r
errCh <- err
}()
// Then: `Close` is called on the connection
testutil.RequireRecvCtx(ctx, t, conn.closed)
// And: a Stop response is received
err = testutil.RequireRecvCtx(ctx, t, errCh)
require.NoError(t, err)
_, ok = resp.Msg.(*TunnelMessage_Stop)
require.True(t, ok)
err = mgr.Close()
require.NoError(t, err)
}
func TestTunnel_PeerUpdate(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
wsID1 := uuid.UUID{1}
wsID2 := uuid.UUID{2}
client := newFakeClient(ctx, t)
conn := newFakeConn(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wsID1,
},
{
ID: wsID2,
},
},
})
tun, mgr := setupTunnel(t, ctx, client)
errCh := make(chan error, 1)
var resp *TunnelMessage
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Start{
Start: &StartRequest{
TunnelFileDescriptor: 2,
CoderUrl: "https://coder.example.com",
ApiToken: "fakeToken",
},
},
})
resp = r
errCh <- err
}()
testutil.RequireSendCtx(ctx, t, client.ch, conn)
err := testutil.RequireRecvCtx(ctx, t, errCh)
require.NoError(t, err)
_, ok := resp.Msg.(*TunnelMessage_Start)
require.True(t, ok)
err = tun.Update(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wsID2,
},
},
})
require.NoError(t, err)
// Then: the tunnel sends a PeerUpdate message
req := testutil.RequireRecvCtx(ctx, t, mgr.requests)
require.Nil(t, req.msg.Rpc)
require.NotNil(t, req.msg.GetPeerUpdate())
require.Len(t, req.msg.GetPeerUpdate().UpsertedWorkspaces, 1)
require.Equal(t, wsID2[:], req.msg.GetPeerUpdate().UpsertedWorkspaces[0].Id)
// When: the manager requests a PeerUpdate
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_GetPeerUpdate{},
})
resp = r
errCh <- err
}()
// Then: a PeerUpdate message is sent using the Conn's state
err = testutil.RequireRecvCtx(ctx, t, errCh)
require.NoError(t, err)
_, ok = resp.Msg.(*TunnelMessage_PeerUpdate)
require.True(t, ok)
require.Len(t, resp.GetPeerUpdate().UpsertedWorkspaces, 2)
require.Equal(t, wsID1[:], resp.GetPeerUpdate().UpsertedWorkspaces[0].Id)
require.Equal(t, wsID2[:], resp.GetPeerUpdate().UpsertedWorkspaces[1].Id)
}
func TestTunnel_NetworkSettings(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
client := newFakeClient(ctx, t)
conn := newFakeConn(tailnet.WorkspaceUpdate{})
tun, mgr := setupTunnel(t, ctx, client)
errCh := make(chan error, 1)
var resp *TunnelMessage
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Start{
Start: &StartRequest{
TunnelFileDescriptor: 2,
CoderUrl: "https://coder.example.com",
ApiToken: "fakeToken",
},
},
})
resp = r
errCh <- err
}()
testutil.RequireSendCtx(ctx, t, client.ch, conn)
err := testutil.RequireRecvCtx(ctx, t, errCh)
require.NoError(t, err)
_, ok := resp.Msg.(*TunnelMessage_Start)
require.True(t, ok)
// When: we inform the tunnel of network settings
go func() {
err := tun.ApplyNetworkSettings(ctx, &NetworkSettingsRequest{
Mtu: 1200,
})
errCh <- err
}()
// Then: the tunnel sends a NetworkSettings message
req := testutil.RequireRecvCtx(ctx, t, mgr.requests)
require.NotNil(t, req.msg.Rpc)
require.Equal(t, uint32(1200), req.msg.GetNetworkSettings().Mtu)
go func() {
testutil.RequireSendCtx(ctx, t, mgr.sendCh, &ManagerMessage{
Rpc: &RPC{ResponseTo: req.msg.Rpc.MsgId},
Msg: &ManagerMessage_NetworkSettings{
NetworkSettings: &NetworkSettingsResponse{
Success: true,
},
},
})
}()
// And: `ApplyNetworkSettings` returns without error once the manager responds
err = testutil.RequireRecvCtx(ctx, t, errCh)
require.NoError(t, err)
}
//nolint:revive // t takes precedence
func setupTunnel(t *testing.T, ctx context.Context, client *fakeClient) (*Tunnel, *speaker[*ManagerMessage, *TunnelMessage, TunnelMessage]) {
mp, tp := net.Pipe()
t.Cleanup(func() { _ = mp.Close() })
t.Cleanup(func() { _ = tp.Close() })
logger := testutil.Logger(t)
var tun *Tunnel
var mgr *speaker[*ManagerMessage, *TunnelMessage, TunnelMessage]
errCh := make(chan error, 2)
go func() {
tunnel, err := NewTunnel(ctx, logger.Named("tunnel"), tp, client)
tun = tunnel
errCh <- err
}()
go func() {
manager, err := newSpeaker[*ManagerMessage, *TunnelMessage](ctx, logger.Named("manager"), mp, SpeakerRoleManager, SpeakerRoleTunnel)
mgr = manager
errCh <- err
}()
err := testutil.RequireRecvCtx(ctx, t, errCh)
require.NoError(t, err)
err = testutil.RequireRecvCtx(ctx, t, errCh)
require.NoError(t, err)
mgr.start()
return tun, mgr
}
+129 -44
View File
@@ -718,7 +718,7 @@ type Agent struct {
Id []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // UUID
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
WorkspaceId []byte `protobuf:"bytes,3,opt,name=workspace_id,json=workspaceId,proto3" json:"workspace_id,omitempty"` // UUID
Fqdn string `protobuf:"bytes,4,opt,name=fqdn,proto3" json:"fqdn,omitempty"`
Fqdn []string `protobuf:"bytes,4,rep,name=fqdn,proto3" json:"fqdn,omitempty"`
IpAddrs []string `protobuf:"bytes,5,rep,name=ip_addrs,json=ipAddrs,proto3" json:"ip_addrs,omitempty"`
// last_handshake is the primary indicator of whether we are connected to a peer. Zero value or
// anything longer than 5 minutes ago means there is a problem.
@@ -778,11 +778,11 @@ func (x *Agent) GetWorkspaceId() []byte {
return nil
}
func (x *Agent) GetFqdn() string {
func (x *Agent) GetFqdn() []string {
if x != nil {
return x.Fqdn
}
return ""
return nil
}
func (x *Agent) GetIpAddrs() []string {
@@ -953,9 +953,10 @@ type StartRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
TunnelFileDescriptor int32 `protobuf:"varint,1,opt,name=tunnel_file_descriptor,json=tunnelFileDescriptor,proto3" json:"tunnel_file_descriptor,omitempty"`
CoderUrl string `protobuf:"bytes,2,opt,name=coder_url,json=coderUrl,proto3" json:"coder_url,omitempty"`
ApiToken string `protobuf:"bytes,3,opt,name=api_token,json=apiToken,proto3" json:"api_token,omitempty"`
TunnelFileDescriptor int32 `protobuf:"varint,1,opt,name=tunnel_file_descriptor,json=tunnelFileDescriptor,proto3" json:"tunnel_file_descriptor,omitempty"`
CoderUrl string `protobuf:"bytes,2,opt,name=coder_url,json=coderUrl,proto3" json:"coder_url,omitempty"`
ApiToken string `protobuf:"bytes,3,opt,name=api_token,json=apiToken,proto3" json:"api_token,omitempty"`
Headers []*StartRequest_Header `protobuf:"bytes,4,rep,name=headers,proto3" json:"headers,omitempty"`
}
func (x *StartRequest) Reset() {
@@ -1011,6 +1012,13 @@ func (x *StartRequest) GetApiToken() string {
return ""
}
func (x *StartRequest) GetHeaders() []*StartRequest_Header {
if x != nil {
return x.Headers
}
return nil
}
type StartResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -1579,6 +1587,62 @@ func (x *NetworkSettingsRequest_IPv6Settings_IPv6Route) GetRouter() string {
return ""
}
// Additional HTTP headers added to all requests
type StartRequest_Header struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
}
func (x *StartRequest_Header) Reset() {
*x = StartRequest_Header{}
if protoimpl.UnsafeEnabled {
mi := &file_vpn_vpn_proto_msgTypes[20]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StartRequest_Header) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StartRequest_Header) ProtoMessage() {}
func (x *StartRequest_Header) ProtoReflect() protoreflect.Message {
mi := &file_vpn_vpn_proto_msgTypes[20]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StartRequest_Header.ProtoReflect.Descriptor instead.
func (*StartRequest_Header) Descriptor() ([]byte, []int) {
return file_vpn_vpn_proto_rawDescGZIP(), []int{10, 0}
}
func (x *StartRequest_Header) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *StartRequest_Header) GetValue() string {
if x != nil {
return x.Value
}
return ""
}
var File_vpn_vpn_proto protoreflect.FileDescriptor
var file_vpn_vpn_proto_rawDesc = []byte{
@@ -1680,7 +1744,7 @@ var file_vpn_vpn_proto_rawDesc = []byte{
0x6e, 0x61, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x77, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63,
0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x77, 0x6f, 0x72, 0x6b,
0x73, 0x70, 0x61, 0x63, 0x65, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x71, 0x64, 0x6e, 0x18,
0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x66, 0x71, 0x64, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x69,
0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x66, 0x71, 0x64, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x69,
0x70, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x69,
0x70, 0x41, 0x64, 0x64, 0x72, 0x73, 0x12, 0x41, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x68,
0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a,
@@ -1775,30 +1839,37 @@ var file_vpn_vpn_proto_rawDesc = []byte{
0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73,
0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f,
0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65,
0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x7e, 0x0a, 0x0c, 0x53,
0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x34, 0x0a, 0x16, 0x74,
0x75, 0x6e, 0x6e, 0x65, 0x6c, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x72,
0x69, 0x70, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x14, 0x74, 0x75, 0x6e,
0x6e, 0x65, 0x6c, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f,
0x72, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x75, 0x72, 0x6c, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x12, 0x1b,
0x0a, 0x09, 0x61, 0x70, 0x69, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x61, 0x70, 0x69, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x4e, 0x0a, 0x0d, 0x53,
0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07,
0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73,
0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f,
0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65,
0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x0d, 0x0a, 0x0b, 0x53,
0x74, 0x6f, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x4d, 0x0a, 0x0c, 0x53, 0x74,
0x6f, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75,
0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63,
0x63, 0x65, 0x73, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72,
0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x39, 0x5a, 0x1d, 0x67, 0x69, 0x74,
0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x63, 0x6f,
0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x76, 0x70, 0x6e, 0xaa, 0x02, 0x17, 0x43, 0x6f, 0x64,
0x65, 0x72, 0x2e, 0x44, 0x65, 0x73, 0x6b, 0x74, 0x6f, 0x70, 0x2e, 0x56, 0x70, 0x6e, 0x2e, 0x50,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xe6, 0x01, 0x0a, 0x0c,
0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x34, 0x0a, 0x16,
0x74, 0x75, 0x6e, 0x6e, 0x65, 0x6c, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x64, 0x65, 0x73, 0x63,
0x72, 0x69, 0x70, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x14, 0x74, 0x75,
0x6e, 0x6e, 0x65, 0x6c, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74,
0x6f, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x75, 0x72, 0x6c, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x12,
0x1b, 0x0a, 0x09, 0x61, 0x70, 0x69, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01,
0x28, 0x09, 0x52, 0x08, 0x61, 0x70, 0x69, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x32, 0x0a, 0x07,
0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e,
0x76, 0x70, 0x6e, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73,
0x1a, 0x32, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14,
0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76,
0x61, 0x6c, 0x75, 0x65, 0x22, 0x4e, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73,
0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12,
0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x22, 0x0d, 0x0a, 0x0b, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x22, 0x4d, 0x0a, 0x0c, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01,
0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x23, 0x0a,
0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x42, 0x39, 0x5a, 0x1d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f,
0x76, 0x70, 0x6e, 0xaa, 0x02, 0x17, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x44, 0x65, 0x73, 0x6b,
0x74, 0x6f, 0x70, 0x2e, 0x56, 0x70, 0x6e, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -1814,7 +1885,7 @@ func file_vpn_vpn_proto_rawDescGZIP() []byte {
}
var file_vpn_vpn_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
var file_vpn_vpn_proto_msgTypes = make([]protoimpl.MessageInfo, 20)
var file_vpn_vpn_proto_msgTypes = make([]protoimpl.MessageInfo, 21)
var file_vpn_vpn_proto_goTypes = []interface{}{
(Log_Level)(0), // 0: vpn.Log.Level
(Workspace_Status)(0), // 1: vpn.Workspace.Status
@@ -1838,7 +1909,8 @@ var file_vpn_vpn_proto_goTypes = []interface{}{
(*NetworkSettingsRequest_IPv6Settings)(nil), // 19: vpn.NetworkSettingsRequest.IPv6Settings
(*NetworkSettingsRequest_IPv4Settings_IPv4Route)(nil), // 20: vpn.NetworkSettingsRequest.IPv4Settings.IPv4Route
(*NetworkSettingsRequest_IPv6Settings_IPv6Route)(nil), // 21: vpn.NetworkSettingsRequest.IPv6Settings.IPv6Route
(*timestamppb.Timestamp)(nil), // 22: google.protobuf.Timestamp
(*StartRequest_Header)(nil), // 22: vpn.StartRequest.Header
(*timestamppb.Timestamp)(nil), // 23: google.protobuf.Timestamp
}
var file_vpn_vpn_proto_depIdxs = []int32{
2, // 0: vpn.ManagerMessage.rpc:type_name -> vpn.RPC
@@ -1859,19 +1931,20 @@ var file_vpn_vpn_proto_depIdxs = []int32{
8, // 15: vpn.PeerUpdate.deleted_workspaces:type_name -> vpn.Workspace
9, // 16: vpn.PeerUpdate.deleted_agents:type_name -> vpn.Agent
1, // 17: vpn.Workspace.status:type_name -> vpn.Workspace.Status
22, // 18: vpn.Agent.last_handshake:type_name -> google.protobuf.Timestamp
23, // 18: vpn.Agent.last_handshake:type_name -> google.protobuf.Timestamp
17, // 19: vpn.NetworkSettingsRequest.dns_settings:type_name -> vpn.NetworkSettingsRequest.DNSSettings
18, // 20: vpn.NetworkSettingsRequest.ipv4_settings:type_name -> vpn.NetworkSettingsRequest.IPv4Settings
19, // 21: vpn.NetworkSettingsRequest.ipv6_settings:type_name -> vpn.NetworkSettingsRequest.IPv6Settings
20, // 22: vpn.NetworkSettingsRequest.IPv4Settings.included_routes:type_name -> vpn.NetworkSettingsRequest.IPv4Settings.IPv4Route
20, // 23: vpn.NetworkSettingsRequest.IPv4Settings.excluded_routes:type_name -> vpn.NetworkSettingsRequest.IPv4Settings.IPv4Route
21, // 24: vpn.NetworkSettingsRequest.IPv6Settings.included_routes:type_name -> vpn.NetworkSettingsRequest.IPv6Settings.IPv6Route
21, // 25: vpn.NetworkSettingsRequest.IPv6Settings.excluded_routes:type_name -> vpn.NetworkSettingsRequest.IPv6Settings.IPv6Route
26, // [26:26] is the sub-list for method output_type
26, // [26:26] is the sub-list for method input_type
26, // [26:26] is the sub-list for extension type_name
26, // [26:26] is the sub-list for extension extendee
0, // [0:26] is the sub-list for field type_name
22, // 22: vpn.StartRequest.headers:type_name -> vpn.StartRequest.Header
20, // 23: vpn.NetworkSettingsRequest.IPv4Settings.included_routes:type_name -> vpn.NetworkSettingsRequest.IPv4Settings.IPv4Route
20, // 24: vpn.NetworkSettingsRequest.IPv4Settings.excluded_routes:type_name -> vpn.NetworkSettingsRequest.IPv4Settings.IPv4Route
21, // 25: vpn.NetworkSettingsRequest.IPv6Settings.included_routes:type_name -> vpn.NetworkSettingsRequest.IPv6Settings.IPv6Route
21, // 26: vpn.NetworkSettingsRequest.IPv6Settings.excluded_routes:type_name -> vpn.NetworkSettingsRequest.IPv6Settings.IPv6Route
27, // [27:27] is the sub-list for method output_type
27, // [27:27] is the sub-list for method input_type
27, // [27:27] is the sub-list for extension type_name
27, // [27:27] is the sub-list for extension extendee
0, // [0:27] is the sub-list for field type_name
}
func init() { file_vpn_vpn_proto_init() }
@@ -2120,6 +2193,18 @@ func file_vpn_vpn_proto_init() {
return nil
}
}
file_vpn_vpn_proto_msgTypes[20].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StartRequest_Header); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
file_vpn_vpn_proto_msgTypes[1].OneofWrappers = []interface{}{
(*ManagerMessage_GetPeerUpdate)(nil),
@@ -2140,7 +2225,7 @@ func file_vpn_vpn_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_vpn_vpn_proto_rawDesc,
NumEnums: 2,
NumMessages: 20,
NumMessages: 21,
NumExtensions: 0,
NumServices: 0,
},
+7 -1
View File
@@ -105,7 +105,7 @@ message Agent {
bytes id = 1; // UUID
string name = 2;
bytes workspace_id = 3; // UUID
string fqdn = 4;
repeated string fqdn = 4;
repeated string ip_addrs = 5;
// last_handshake is the primary indicator of whether we are connected to a peer. Zero value or
// anything longer than 5 minutes ago means there is a problem.
@@ -179,6 +179,12 @@ message StartRequest {
int32 tunnel_file_descriptor = 1;
string coder_url = 2;
string api_token = 3;
// Additional HTTP headers added to all requests
message Header {
string name = 1;
string value = 2;
}
repeated Header headers = 4;
}
message StartResponse {