diff --git a/coderd/apidoc/docs.go b/coderd/apidoc/docs.go index 372303c320..48b550c9ed 100644 --- a/coderd/apidoc/docs.go +++ b/coderd/apidoc/docs.go @@ -3770,6 +3770,25 @@ const docTemplate = `{ } } }, + "/tailnet": { + "get": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "tags": [ + "Agents" + ], + "summary": "User-scoped tailnet RPC connection", + "operationId": "user-scoped-tailnet-rpc-connection", + "responses": { + "101": { + "description": "Switching Protocols" + } + } + } + }, "/templates": { "get": { "security": [ diff --git a/coderd/apidoc/swagger.json b/coderd/apidoc/swagger.json index db8b53e966..c9c79b443d 100644 --- a/coderd/apidoc/swagger.json +++ b/coderd/apidoc/swagger.json @@ -3316,6 +3316,23 @@ } } }, + "/tailnet": { + "get": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "tags": ["Agents"], + "summary": "User-scoped tailnet RPC connection", + "operationId": "user-scoped-tailnet-rpc-connection", + "responses": { + "101": { + "description": "Switching Protocols" + } + } + } + }, "/templates": { "get": { "security": [ diff --git a/coderd/coderd.go b/coderd/coderd.go index 70101b7020..39df674fec 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -493,6 +493,8 @@ func New(options *Options) *API { } } + updatesProvider := NewUpdatesProvider(options.Logger.Named("workspace_updates"), options.Pubsub, options.Database, options.Authorizer) + // Start a background process that rotates keys. We intentionally start this after the caches // are created to force initial requests for a key to populate the caches. This helps catch // bugs that may only occur when a key isn't precached in tests and the latency cost is minimal. @@ -523,6 +525,7 @@ func New(options *Options) *API { metricsCache: metricsCache, Auditor: atomic.Pointer[audit.Auditor]{}, TailnetCoordinator: atomic.Pointer[tailnet.Coordinator]{}, + UpdatesProvider: updatesProvider, TemplateScheduleStore: options.TemplateScheduleStore, UserQuietHoursScheduleStore: options.UserQuietHoursScheduleStore, AccessControlStore: options.AccessControlStore, @@ -652,12 +655,13 @@ func New(options *Options) *API { panic("CoordinatorResumeTokenProvider is nil") } api.TailnetClientService, err = tailnet.NewClientService(tailnet.ClientServiceOptions{ - Logger: api.Logger.Named("tailnetclient"), - CoordPtr: &api.TailnetCoordinator, - DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency, - DERPMapFn: api.DERPMap, - NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler, - ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider, + Logger: api.Logger.Named("tailnetclient"), + CoordPtr: &api.TailnetCoordinator, + DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency, + DERPMapFn: api.DERPMap, + NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler, + ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider, + WorkspaceUpdatesProvider: api.UpdatesProvider, }) if err != nil { api.Logger.Fatal(context.Background(), "failed to initialize tailnet client service", slog.Error(err)) @@ -1327,6 +1331,10 @@ func New(options *Options) *API { }) r.Get("/dispatch-methods", api.notificationDispatchMethods) }) + r.Route("/tailnet", func(r chi.Router) { + r.Use(apiKeyMiddleware) + r.Get("/", api.tailnetRPCConn) + }) }) if options.SwaggerEndpoint { @@ -1408,6 +1416,8 @@ type API struct { AccessControlStore *atomic.Pointer[dbauthz.AccessControlStore] PortSharer atomic.Pointer[portsharing.PortSharer] + UpdatesProvider tailnet.WorkspaceUpdatesProvider + HTTPAuth *HTTPAuthorizer // APIHandler serves "/api/v2" @@ -1489,6 +1499,7 @@ func (api *API) Close() error { _ = api.OIDCConvertKeyCache.Close() _ = api.AppSigningKeyCache.Close() _ = api.AppEncryptionKeyCache.Close() + _ = api.UpdatesProvider.Close() return nil } diff --git a/coderd/database/dbfake/dbfake.go b/coderd/database/dbfake/dbfake.go index 3ff9f59fa1..ca514479ca 100644 --- a/coderd/database/dbfake/dbfake.go +++ b/coderd/database/dbfake/dbfake.go @@ -224,6 +224,14 @@ func (b WorkspaceBuildBuilder) Do() WorkspaceResponse { } _ = dbgen.WorkspaceBuildParameters(b.t, b.db, b.params) + if b.ws.Deleted { + err = b.db.UpdateWorkspaceDeletedByID(ownerCtx, database.UpdateWorkspaceDeletedByIDParams{ + ID: b.ws.ID, + Deleted: true, + }) + require.NoError(b.t, err) + } + if b.ps != nil { msg, err := json.Marshal(wspubsub.WorkspaceEvent{ Kind: wspubsub.WorkspaceEventKindStateChange, diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 14e986123e..922d80f0e8 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -33,6 +33,7 @@ import ( "github.com/coder/coder/v2/coderd/httpapi" "github.com/coder/coder/v2/coderd/httpmw" "github.com/coder/coder/v2/coderd/jwtutils" + "github.com/coder/coder/v2/coderd/rbac" "github.com/coder/coder/v2/coderd/rbac/policy" "github.com/coder/coder/v2/coderd/wspubsub" "github.com/coder/coder/v2/codersdk" @@ -844,31 +845,10 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R return } - // Accept a resume_token query parameter to use the same peer ID. - var ( - peerID = uuid.New() - resumeToken = r.URL.Query().Get("resume_token") - ) - if resumeToken != "" { - var err error - peerID, err = api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(ctx, resumeToken) - // If the token is missing the key ID, it's probably an old token in which - // case we just want to generate a new peer ID. - if xerrors.Is(err, jwtutils.ErrMissingKeyID) { - peerID = uuid.New() - } else if err != nil { - httpapi.Write(ctx, rw, http.StatusUnauthorized, codersdk.Response{ - Message: workspacesdk.CoordinateAPIInvalidResumeToken, - Detail: err.Error(), - Validations: []codersdk.ValidationError{ - {Field: "resume_token", Detail: workspacesdk.CoordinateAPIInvalidResumeToken}, - }, - }) - return - } else { - api.Logger.Debug(ctx, "accepted coordinate resume token for peer", - slog.F("peer_id", peerID.String())) - } + peerID, err := api.handleResumeToken(ctx, rw, r) + if err != nil { + // handleResumeToken has already written the response. + return } api.WebsocketWaitMutex.Lock() @@ -891,13 +871,47 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R go httpapi.Heartbeat(ctx, conn) defer conn.Close(websocket.StatusNormalClosure, "") - err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, peerID, workspaceAgent.ID) + err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, tailnet.StreamID{ + Name: "client", + ID: peerID, + Auth: tailnet.ClientCoordinateeAuth{ + AgentID: workspaceAgent.ID, + }, + }) if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) { _ = conn.Close(websocket.StatusInternalError, err.Error()) return } } +// handleResumeToken accepts a resume_token query parameter to use the same peer ID +func (api *API) handleResumeToken(ctx context.Context, rw http.ResponseWriter, r *http.Request) (peerID uuid.UUID, err error) { + peerID = uuid.New() + resumeToken := r.URL.Query().Get("resume_token") + if resumeToken != "" { + peerID, err = api.Options.CoordinatorResumeTokenProvider.VerifyResumeToken(ctx, resumeToken) + // If the token is missing the key ID, it's probably an old token in which + // case we just want to generate a new peer ID. + if xerrors.Is(err, jwtutils.ErrMissingKeyID) { + peerID = uuid.New() + err = nil + } else if err != nil { + httpapi.Write(ctx, rw, http.StatusUnauthorized, codersdk.Response{ + Message: workspacesdk.CoordinateAPIInvalidResumeToken, + Detail: err.Error(), + Validations: []codersdk.ValidationError{ + {Field: "resume_token", Detail: workspacesdk.CoordinateAPIInvalidResumeToken}, + }, + }) + return peerID, err + } else { + api.Logger.Debug(ctx, "accepted coordinate resume token for peer", + slog.F("peer_id", peerID.String())) + } + } + return peerID, err +} + // @Summary Post workspace agent log source // @ID post-workspace-agent-log-source // @Security CoderSessionToken @@ -1469,6 +1483,80 @@ func (api *API) workspaceAgentsExternalAuthListen(ctx context.Context, rw http.R } } +// @Summary User-scoped tailnet RPC connection +// @ID user-scoped-tailnet-rpc-connection +// @Security CoderSessionToken +// @Tags Agents +// @Success 101 +// @Router /tailnet [get] +func (api *API) tailnetRPCConn(rw http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + version := "2.0" + qv := r.URL.Query().Get("version") + if qv != "" { + version = qv + } + if err := proto.CurrentVersion.Validate(version); err != nil { + httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ + Message: "Unknown or unsupported API version", + Validations: []codersdk.ValidationError{ + {Field: "version", Detail: err.Error()}, + }, + }) + return + } + + peerID, err := api.handleResumeToken(ctx, rw, r) + if err != nil { + // handleResumeToken has already written the response. + return + } + + // Used to authorize tunnel request + sshPrep, err := api.HTTPAuth.AuthorizeSQLFilter(r, policy.ActionSSH, rbac.ResourceWorkspace.Type) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error preparing sql filter.", + Detail: err.Error(), + }) + return + } + + api.WebsocketWaitMutex.Lock() + api.WebsocketWaitGroup.Add(1) + api.WebsocketWaitMutex.Unlock() + defer api.WebsocketWaitGroup.Done() + + conn, err := websocket.Accept(rw, r, nil) + if err != nil { + httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ + Message: "Failed to accept websocket.", + Detail: err.Error(), + }) + return + } + ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary) + defer wsNetConn.Close() + defer conn.Close(websocket.StatusNormalClosure, "") + + go httpapi.Heartbeat(ctx, conn) + err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, tailnet.StreamID{ + Name: "client", + ID: peerID, + Auth: tailnet.ClientUserCoordinateeAuth{ + Auth: &rbacAuthorizer{ + sshPrep: sshPrep, + db: api.Database, + }, + }, + }) + if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) { + _ = conn.Close(websocket.StatusInternalError, err.Error()) + return + } +} + // createExternalAuthResponse creates an ExternalAuthResponse based on the // provider type. This is to support legacy `/workspaceagents/me/gitauth` // which uses `Username` and `Password`. diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index ba67797547..1ab2eb64b8 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "maps" "net" "net/http" "runtime" @@ -38,6 +39,7 @@ import ( "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/externalauth" "github.com/coder/coder/v2/coderd/jwtutils" + "github.com/coder/coder/v2/coderd/rbac" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/workspacesdk" @@ -1930,6 +1932,106 @@ func TestWorkspaceAgentExternalAuthListen(t *testing.T) { }) } +func TestOwnedWorkspacesCoordinate(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitLong) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + firstClient, _, api := coderdtest.NewWithAPI(t, &coderdtest.Options{ + Coordinator: tailnet.NewCoordinator(logger), + }) + firstUser := coderdtest.CreateFirstUser(t, firstClient) + member, memberUser := coderdtest.CreateAnotherUser(t, firstClient, firstUser.OrganizationID, rbac.RoleTemplateAdmin()) + + // Create a workspace with an agent + firstWorkspace := buildWorkspaceWithAgent(t, member, firstUser.OrganizationID, memberUser.ID, api.Database, api.Pubsub) + + u, err := member.URL.Parse("/api/v2/tailnet") + require.NoError(t, err) + q := u.Query() + q.Set("version", "2.0") + u.RawQuery = q.Encode() + + //nolint:bodyclose // websocket package closes this for you + wsConn, resp, err := websocket.Dial(ctx, u.String(), &websocket.DialOptions{ + HTTPHeader: http.Header{ + "Coder-Session-Token": []string{member.SessionToken()}, + }, + }) + if err != nil { + if resp.StatusCode != http.StatusSwitchingProtocols { + err = codersdk.ReadBodyAsError(resp) + } + require.NoError(t, err) + } + defer wsConn.Close(websocket.StatusNormalClosure, "done") + + rpcClient, err := tailnet.NewDRPCClient( + websocket.NetConn(ctx, wsConn, websocket.MessageBinary), + logger, + ) + require.NoError(t, err) + + stream, err := rpcClient.WorkspaceUpdates(ctx, &tailnetproto.WorkspaceUpdatesRequest{ + WorkspaceOwnerId: tailnet.UUIDToByteSlice(memberUser.ID), + }) + require.NoError(t, err) + + // First update will contain the existing workspace and agent + update, err := stream.Recv() + require.NoError(t, err) + require.Len(t, update.UpsertedWorkspaces, 1) + require.EqualValues(t, update.UpsertedWorkspaces[0].Id, firstWorkspace.ID) + require.Len(t, update.UpsertedAgents, 1) + require.EqualValues(t, update.UpsertedAgents[0].WorkspaceId, firstWorkspace.ID) + require.Len(t, update.DeletedWorkspaces, 0) + require.Len(t, update.DeletedAgents, 0) + + // Build a second workspace + secondWorkspace := buildWorkspaceWithAgent(t, member, firstUser.OrganizationID, memberUser.ID, api.Database, api.Pubsub) + + // Wait for the second workspace to be running with an agent + expectedState := map[uuid.UUID]workspace{ + secondWorkspace.ID: { + Status: tailnetproto.Workspace_RUNNING, + NumAgents: 1, + }, + } + waitForUpdates(t, ctx, stream, map[uuid.UUID]workspace{}, expectedState) + + // Wait for the workspace and agent to be deleted + secondWorkspace.Deleted = true + dbfake.WorkspaceBuild(t, api.Database, secondWorkspace). + Seed(database.WorkspaceBuild{ + Transition: database.WorkspaceTransitionDelete, + BuildNumber: 2, + }).Do() + + waitForUpdates(t, ctx, stream, expectedState, map[uuid.UUID]workspace{ + secondWorkspace.ID: { + Status: tailnetproto.Workspace_DELETED, + NumAgents: 0, + }, + }) +} + +func buildWorkspaceWithAgent( + t *testing.T, + client *codersdk.Client, + orgID uuid.UUID, + ownerID uuid.UUID, + db database.Store, + ps pubsub.Pubsub, +) database.WorkspaceTable { + r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{ + OrganizationID: orgID, + OwnerID: ownerID, + }).WithAgent().Pubsub(ps).Do() + _ = agenttest.New(t, client.URL, r.AgentToken) + coderdtest.NewWorkspaceAgentWaiter(t, client, r.Workspace.ID).Wait() + return r.Workspace +} + func requireGetManifest(ctx context.Context, t testing.TB, aAPI agentproto.DRPCAgentClient) agentsdk.Manifest { mp, err := aAPI.GetManifest(ctx, &agentproto.GetManifestRequest{}) require.NoError(t, err) @@ -1949,3 +2051,91 @@ func postStartup(ctx context.Context, t testing.TB, client agent.Client, startup _, err = aAPI.UpdateStartup(ctx, &agentproto.UpdateStartupRequest{Startup: startup}) return err } + +type workspace struct { + Status tailnetproto.Workspace_Status + NumAgents int +} + +func waitForUpdates( + t *testing.T, + //nolint:revive // t takes precedence + ctx context.Context, + stream tailnetproto.DRPCTailnet_WorkspaceUpdatesClient, + currentState map[uuid.UUID]workspace, + expectedState map[uuid.UUID]workspace, +) { + t.Helper() + errCh := make(chan error, 1) + go func() { + for { + select { + case <-ctx.Done(): + errCh <- ctx.Err() + return + default: + } + update, err := stream.Recv() + if err != nil { + errCh <- err + return + } + for _, ws := range update.UpsertedWorkspaces { + id, err := uuid.FromBytes(ws.Id) + if err != nil { + errCh <- err + return + } + currentState[id] = workspace{ + Status: ws.Status, + NumAgents: currentState[id].NumAgents, + } + } + for _, ws := range update.DeletedWorkspaces { + id, err := uuid.FromBytes(ws.Id) + if err != nil { + errCh <- err + return + } + currentState[id] = workspace{ + Status: tailnetproto.Workspace_DELETED, + NumAgents: currentState[id].NumAgents, + } + } + for _, a := range update.UpsertedAgents { + id, err := uuid.FromBytes(a.WorkspaceId) + if err != nil { + errCh <- err + return + } + currentState[id] = workspace{ + Status: currentState[id].Status, + NumAgents: currentState[id].NumAgents + 1, + } + } + for _, a := range update.DeletedAgents { + id, err := uuid.FromBytes(a.WorkspaceId) + if err != nil { + errCh <- err + return + } + currentState[id] = workspace{ + Status: currentState[id].Status, + NumAgents: currentState[id].NumAgents - 1, + } + } + if maps.Equal(currentState, expectedState) { + errCh <- nil + return + } + } + }() + select { + case err := <-errCh: + if err != nil { + t.Fatal(err) + } + case <-ctx.Done(): + t.Fatal("Timeout waiting for desired state", currentState) + } +} diff --git a/coderd/workspacebuilds.go b/coderd/workspacebuilds.go index da785ac3a5..0974d85b54 100644 --- a/coderd/workspacebuilds.go +++ b/coderd/workspacebuilds.go @@ -916,7 +916,7 @@ func (api *API) convertWorkspaceBuild( MaxDeadline: codersdk.NewNullTime(build.MaxDeadline, !build.MaxDeadline.IsZero()), Reason: codersdk.BuildReason(build.Reason), Resources: apiResources, - Status: convertWorkspaceStatus(apiJob.Status, transition), + Status: codersdk.ConvertWorkspaceStatus(apiJob.Status, transition), DailyCost: build.DailyCost, }, nil } @@ -946,40 +946,6 @@ func convertWorkspaceResource(resource database.WorkspaceResource, agents []code } } -func convertWorkspaceStatus(jobStatus codersdk.ProvisionerJobStatus, transition codersdk.WorkspaceTransition) codersdk.WorkspaceStatus { - switch jobStatus { - case codersdk.ProvisionerJobPending: - return codersdk.WorkspaceStatusPending - case codersdk.ProvisionerJobRunning: - switch transition { - case codersdk.WorkspaceTransitionStart: - return codersdk.WorkspaceStatusStarting - case codersdk.WorkspaceTransitionStop: - return codersdk.WorkspaceStatusStopping - case codersdk.WorkspaceTransitionDelete: - return codersdk.WorkspaceStatusDeleting - } - case codersdk.ProvisionerJobSucceeded: - switch transition { - case codersdk.WorkspaceTransitionStart: - return codersdk.WorkspaceStatusRunning - case codersdk.WorkspaceTransitionStop: - return codersdk.WorkspaceStatusStopped - case codersdk.WorkspaceTransitionDelete: - return codersdk.WorkspaceStatusDeleted - } - case codersdk.ProvisionerJobCanceling: - return codersdk.WorkspaceStatusCanceling - case codersdk.ProvisionerJobCanceled: - return codersdk.WorkspaceStatusCanceled - case codersdk.ProvisionerJobFailed: - return codersdk.WorkspaceStatusFailed - } - - // return error status since we should never get here - return codersdk.WorkspaceStatusFailed -} - func (api *API) buildTimings(ctx context.Context, build database.WorkspaceBuild) (codersdk.WorkspaceBuildTimings, error) { provisionerTimings, err := api.Database.GetProvisionerJobTimingsByJobID(ctx, build.JobID) if err != nil && !errors.Is(err, sql.ErrNoRows) { diff --git a/coderd/workspaceupdates.go b/coderd/workspaceupdates.go new file mode 100644 index 0000000000..630a4be49e --- /dev/null +++ b/coderd/workspaceupdates.go @@ -0,0 +1,313 @@ +package coderd + +import ( + "context" + "fmt" + "sync" + + "github.com/google/uuid" + "golang.org/x/xerrors" + + "cdr.dev/slog" + + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbauthz" + "github.com/coder/coder/v2/coderd/database/pubsub" + "github.com/coder/coder/v2/coderd/rbac" + "github.com/coder/coder/v2/coderd/util/slice" + "github.com/coder/coder/v2/coderd/wspubsub" + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/tailnet" + "github.com/coder/coder/v2/tailnet/proto" +) + +type UpdatesQuerier interface { + // GetAuthorizedWorkspacesAndAgentsByOwnerID requires a context with an actor set + GetWorkspacesAndAgentsByOwnerID(ctx context.Context, ownerID uuid.UUID) ([]database.GetWorkspacesAndAgentsByOwnerIDRow, error) + GetWorkspaceByAgentID(ctx context.Context, agentID uuid.UUID) (database.Workspace, error) +} + +type workspacesByID = map[uuid.UUID]ownedWorkspace + +type ownedWorkspace struct { + WorkspaceName string + Status proto.Workspace_Status + Agents []database.AgentIDNamePair +} + +// Equal does not compare agents +func (w ownedWorkspace) Equal(other ownedWorkspace) bool { + return w.WorkspaceName == other.WorkspaceName && + w.Status == other.Status +} + +type sub struct { + // ALways contains an actor + ctx context.Context + cancelFn context.CancelFunc + + mu sync.RWMutex + userID uuid.UUID + ch chan *proto.WorkspaceUpdate + prev workspacesByID + + db UpdatesQuerier + ps pubsub.Pubsub + logger slog.Logger + + psCancelFn func() +} + +func (s *sub) handleEvent(ctx context.Context, event wspubsub.WorkspaceEvent, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + switch event.Kind { + case wspubsub.WorkspaceEventKindStateChange: + case wspubsub.WorkspaceEventKindAgentConnectionUpdate: + case wspubsub.WorkspaceEventKindAgentTimeout: + case wspubsub.WorkspaceEventKindAgentLifecycleUpdate: + default: + if err == nil { + return + } else { + // Always attempt an update if the pubsub lost connection + s.logger.Warn(ctx, "failed to handle workspace event", slog.Error(err)) + } + } + + // Use context containing actor + rows, err := s.db.GetWorkspacesAndAgentsByOwnerID(s.ctx, s.userID) + if err != nil { + s.logger.Warn(ctx, "failed to get workspaces and agents by owner ID", slog.Error(err)) + return + } + latest := convertRows(rows) + + out, updated := produceUpdate(s.prev, latest) + if !updated { + return + } + + s.prev = latest + select { + case <-s.ctx.Done(): + return + case s.ch <- out: + } +} + +func (s *sub) start(ctx context.Context) (err error) { + rows, err := s.db.GetWorkspacesAndAgentsByOwnerID(ctx, s.userID) + if err != nil { + return xerrors.Errorf("get workspaces and agents by owner ID: %w", err) + } + + latest := convertRows(rows) + initUpdate, _ := produceUpdate(workspacesByID{}, latest) + s.ch <- initUpdate + s.prev = latest + + cancel, err := s.ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(s.userID), wspubsub.HandleWorkspaceEvent(s.handleEvent)) + if err != nil { + return xerrors.Errorf("subscribe to workspace event channel: %w", err) + } + + s.psCancelFn = cancel + return nil +} + +func (s *sub) Close() error { + s.cancelFn() + + if s.psCancelFn != nil { + s.psCancelFn() + } + + close(s.ch) + return nil +} + +func (s *sub) Updates() <-chan *proto.WorkspaceUpdate { + return s.ch +} + +var _ tailnet.Subscription = (*sub)(nil) + +type updatesProvider struct { + ps pubsub.Pubsub + logger slog.Logger + db UpdatesQuerier + auth rbac.Authorizer + + ctx context.Context + cancelFn func() +} + +var _ tailnet.WorkspaceUpdatesProvider = (*updatesProvider)(nil) + +func NewUpdatesProvider( + logger slog.Logger, + ps pubsub.Pubsub, + db UpdatesQuerier, + auth rbac.Authorizer, +) tailnet.WorkspaceUpdatesProvider { + ctx, cancel := context.WithCancel(context.Background()) + out := &updatesProvider{ + auth: auth, + db: db, + ps: ps, + logger: logger, + ctx: ctx, + cancelFn: cancel, + } + return out +} + +func (u *updatesProvider) Close() error { + u.cancelFn() + return nil +} + +// Subscribe subscribes to workspace updates for a user, for the workspaces +// that user is authorized to `ActionRead` on. The provided context must have +// a dbauthz actor set. +func (u *updatesProvider) Subscribe(ctx context.Context, userID uuid.UUID) (tailnet.Subscription, error) { + actor, ok := dbauthz.ActorFromContext(ctx) + if !ok { + return nil, xerrors.Errorf("actor not found in context") + } + ctx, cancel := context.WithCancel(u.ctx) + ctx = dbauthz.As(ctx, actor) + ch := make(chan *proto.WorkspaceUpdate, 1) + sub := &sub{ + ctx: ctx, + cancelFn: cancel, + userID: userID, + ch: ch, + db: u.db, + ps: u.ps, + logger: u.logger.Named(fmt.Sprintf("workspace_updates_subscriber_%s", userID)), + prev: workspacesByID{}, + } + err := sub.start(ctx) + if err != nil { + _ = sub.Close() + return nil, err + } + + return sub, nil +} + +func produceUpdate(old, new workspacesByID) (out *proto.WorkspaceUpdate, updated bool) { + out = &proto.WorkspaceUpdate{ + UpsertedWorkspaces: []*proto.Workspace{}, + UpsertedAgents: []*proto.Agent{}, + DeletedWorkspaces: []*proto.Workspace{}, + DeletedAgents: []*proto.Agent{}, + } + + for wsID, newWorkspace := range new { + oldWorkspace, exists := old[wsID] + // Upsert both workspace and agents if the workspace is new + if !exists { + out.UpsertedWorkspaces = append(out.UpsertedWorkspaces, &proto.Workspace{ + Id: tailnet.UUIDToByteSlice(wsID), + Name: newWorkspace.WorkspaceName, + Status: newWorkspace.Status, + }) + for _, agent := range newWorkspace.Agents { + out.UpsertedAgents = append(out.UpsertedAgents, &proto.Agent{ + Id: tailnet.UUIDToByteSlice(agent.ID), + Name: agent.Name, + WorkspaceId: tailnet.UUIDToByteSlice(wsID), + }) + } + updated = true + continue + } + // Upsert workspace if the workspace is updated + if !newWorkspace.Equal(oldWorkspace) { + out.UpsertedWorkspaces = append(out.UpsertedWorkspaces, &proto.Workspace{ + Id: tailnet.UUIDToByteSlice(wsID), + Name: newWorkspace.WorkspaceName, + Status: newWorkspace.Status, + }) + updated = true + } + + add, remove := slice.SymmetricDifference(oldWorkspace.Agents, newWorkspace.Agents) + for _, agent := range add { + out.UpsertedAgents = append(out.UpsertedAgents, &proto.Agent{ + Id: tailnet.UUIDToByteSlice(agent.ID), + Name: agent.Name, + WorkspaceId: tailnet.UUIDToByteSlice(wsID), + }) + updated = true + } + for _, agent := range remove { + out.DeletedAgents = append(out.DeletedAgents, &proto.Agent{ + Id: tailnet.UUIDToByteSlice(agent.ID), + Name: agent.Name, + WorkspaceId: tailnet.UUIDToByteSlice(wsID), + }) + updated = true + } + } + + // Delete workspace and agents if the workspace is deleted + for wsID, oldWorkspace := range old { + if _, exists := new[wsID]; !exists { + out.DeletedWorkspaces = append(out.DeletedWorkspaces, &proto.Workspace{ + Id: tailnet.UUIDToByteSlice(wsID), + Name: oldWorkspace.WorkspaceName, + Status: oldWorkspace.Status, + }) + for _, agent := range oldWorkspace.Agents { + out.DeletedAgents = append(out.DeletedAgents, &proto.Agent{ + Id: tailnet.UUIDToByteSlice(agent.ID), + Name: agent.Name, + WorkspaceId: tailnet.UUIDToByteSlice(wsID), + }) + } + updated = true + } + } + + return out, updated +} + +func convertRows(rows []database.GetWorkspacesAndAgentsByOwnerIDRow) workspacesByID { + out := workspacesByID{} + for _, row := range rows { + agents := []database.AgentIDNamePair{} + for _, agent := range row.Agents { + agents = append(agents, database.AgentIDNamePair{ + ID: agent.ID, + Name: agent.Name, + }) + } + out[row.ID] = ownedWorkspace{ + WorkspaceName: row.Name, + Status: tailnet.WorkspaceStatusToProto(codersdk.ConvertWorkspaceStatus(codersdk.ProvisionerJobStatus(row.JobStatus), codersdk.WorkspaceTransition(row.Transition))), + Agents: agents, + } + } + return out +} + +type rbacAuthorizer struct { + sshPrep rbac.PreparedAuthorized + db UpdatesQuerier +} + +func (r *rbacAuthorizer) AuthorizeTunnel(ctx context.Context, agentID uuid.UUID) error { + ws, err := r.db.GetWorkspaceByAgentID(ctx, agentID) + if err != nil { + return xerrors.Errorf("get workspace by agent ID: %w", err) + } + // Authorizes against `ActionSSH` + return r.sshPrep.Authorize(ctx, ws.RBACObject()) +} + +var _ tailnet.TunnelAuthorizer = (*rbacAuthorizer)(nil) diff --git a/coderd/workspaceupdates_test.go b/coderd/workspaceupdates_test.go new file mode 100644 index 0000000000..7c01e6611f --- /dev/null +++ b/coderd/workspaceupdates_test.go @@ -0,0 +1,371 @@ +package coderd_test + +import ( + "context" + "encoding/json" + "slices" + "strings" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/coder/v2/coderd" + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbauthz" + "github.com/coder/coder/v2/coderd/database/pubsub" + "github.com/coder/coder/v2/coderd/rbac" + "github.com/coder/coder/v2/coderd/rbac/policy" + "github.com/coder/coder/v2/coderd/wspubsub" + "github.com/coder/coder/v2/tailnet" + "github.com/coder/coder/v2/tailnet/proto" + "github.com/coder/coder/v2/testutil" +) + +func TestWorkspaceUpdates(t *testing.T) { + t.Parallel() + + ws1ID := uuid.UUID{0x01} + ws1IDSlice := tailnet.UUIDToByteSlice(ws1ID) + agent1ID := uuid.UUID{0x02} + agent1IDSlice := tailnet.UUIDToByteSlice(agent1ID) + ws2ID := uuid.UUID{0x03} + ws2IDSlice := tailnet.UUIDToByteSlice(ws2ID) + ws3ID := uuid.UUID{0x04} + ws3IDSlice := tailnet.UUIDToByteSlice(ws3ID) + agent2ID := uuid.UUID{0x05} + agent2IDSlice := tailnet.UUIDToByteSlice(agent2ID) + ws4ID := uuid.UUID{0x06} + ws4IDSlice := tailnet.UUIDToByteSlice(ws4ID) + agent3ID := uuid.UUID{0x07} + agent3IDSlice := tailnet.UUIDToByteSlice(agent3ID) + + ownerID := uuid.UUID{0x08} + memberRole, err := rbac.RoleByName(rbac.RoleMember()) + require.NoError(t, err) + ownerSubject := rbac.Subject{ + FriendlyName: "member", + ID: ownerID.String(), + Roles: rbac.Roles{memberRole}, + Scope: rbac.ScopeAll, + } + + t.Run("Basic", func(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + + db := &mockWorkspaceStore{ + orderedRows: []database.GetWorkspacesAndAgentsByOwnerIDRow{ + // Gains agent2 + { + ID: ws1ID, + Name: "ws1", + JobStatus: database.ProvisionerJobStatusRunning, + Transition: database.WorkspaceTransitionStart, + Agents: []database.AgentIDNamePair{ + { + ID: agent1ID, + Name: "agent1", + }, + }, + }, + // Changes status + { + ID: ws2ID, + Name: "ws2", + JobStatus: database.ProvisionerJobStatusRunning, + Transition: database.WorkspaceTransitionStart, + }, + // Is deleted + { + ID: ws3ID, + Name: "ws3", + JobStatus: database.ProvisionerJobStatusSucceeded, + Transition: database.WorkspaceTransitionStop, + Agents: []database.AgentIDNamePair{ + { + ID: agent3ID, + Name: "agent3", + }, + }, + }, + }, + } + + ps := &mockPubsub{ + cbs: map[string]pubsub.ListenerWithErr{}, + } + + updateProvider := coderd.NewUpdatesProvider(slogtest.Make(t, nil), ps, db, &mockAuthorizer{}) + t.Cleanup(func() { + _ = updateProvider.Close() + }) + + sub, err := updateProvider.Subscribe(dbauthz.As(ctx, ownerSubject), ownerID) + require.NoError(t, err) + t.Cleanup(func() { + _ = sub.Close() + }) + + update := testutil.RequireRecvCtx(ctx, t, sub.Updates()) + slices.SortFunc(update.UpsertedWorkspaces, func(a, b *proto.Workspace) int { + return strings.Compare(a.Name, b.Name) + }) + slices.SortFunc(update.UpsertedAgents, func(a, b *proto.Agent) int { + return strings.Compare(a.Name, b.Name) + }) + require.Equal(t, &proto.WorkspaceUpdate{ + UpsertedWorkspaces: []*proto.Workspace{ + { + Id: ws1IDSlice, + Name: "ws1", + Status: proto.Workspace_STARTING, + }, + { + Id: ws2IDSlice, + Name: "ws2", + Status: proto.Workspace_STARTING, + }, + { + Id: ws3IDSlice, + Name: "ws3", + Status: proto.Workspace_STOPPED, + }, + }, + UpsertedAgents: []*proto.Agent{ + { + Id: agent1IDSlice, + Name: "agent1", + WorkspaceId: ws1IDSlice, + }, + { + Id: agent3IDSlice, + Name: "agent3", + WorkspaceId: ws3IDSlice, + }, + }, + DeletedWorkspaces: []*proto.Workspace{}, + DeletedAgents: []*proto.Agent{}, + }, update) + + // Update the database + db.orderedRows = []database.GetWorkspacesAndAgentsByOwnerIDRow{ + { + ID: ws1ID, + Name: "ws1", + JobStatus: database.ProvisionerJobStatusRunning, + Transition: database.WorkspaceTransitionStart, + Agents: []database.AgentIDNamePair{ + { + ID: agent1ID, + Name: "agent1", + }, + { + ID: agent2ID, + Name: "agent2", + }, + }, + }, + { + ID: ws2ID, + Name: "ws2", + JobStatus: database.ProvisionerJobStatusRunning, + Transition: database.WorkspaceTransitionStop, + }, + { + ID: ws4ID, + Name: "ws4", + JobStatus: database.ProvisionerJobStatusRunning, + Transition: database.WorkspaceTransitionStart, + }, + } + publishWorkspaceEvent(t, ps, ownerID, &wspubsub.WorkspaceEvent{ + Kind: wspubsub.WorkspaceEventKindStateChange, + WorkspaceID: ws1ID, + }) + + update = testutil.RequireRecvCtx(ctx, t, sub.Updates()) + slices.SortFunc(update.UpsertedWorkspaces, func(a, b *proto.Workspace) int { + return strings.Compare(a.Name, b.Name) + }) + require.Equal(t, &proto.WorkspaceUpdate{ + UpsertedWorkspaces: []*proto.Workspace{ + { + // Changed status + Id: ws2IDSlice, + Name: "ws2", + Status: proto.Workspace_STOPPING, + }, + { + // New workspace + Id: ws4IDSlice, + Name: "ws4", + Status: proto.Workspace_STARTING, + }, + }, + UpsertedAgents: []*proto.Agent{ + { + Id: agent2IDSlice, + Name: "agent2", + WorkspaceId: ws1IDSlice, + }, + }, + DeletedWorkspaces: []*proto.Workspace{ + { + Id: ws3IDSlice, + Name: "ws3", + Status: proto.Workspace_STOPPED, + }, + }, + DeletedAgents: []*proto.Agent{ + { + Id: agent3IDSlice, + Name: "agent3", + WorkspaceId: ws3IDSlice, + }, + }, + }, update) + }) + + t.Run("Resubscribe", func(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + + db := &mockWorkspaceStore{ + orderedRows: []database.GetWorkspacesAndAgentsByOwnerIDRow{ + { + ID: ws1ID, + Name: "ws1", + JobStatus: database.ProvisionerJobStatusRunning, + Transition: database.WorkspaceTransitionStart, + Agents: []database.AgentIDNamePair{ + { + ID: agent1ID, + Name: "agent1", + }, + }, + }, + }, + } + + ps := &mockPubsub{ + cbs: map[string]pubsub.ListenerWithErr{}, + } + + updateProvider := coderd.NewUpdatesProvider(slogtest.Make(t, nil), ps, db, &mockAuthorizer{}) + t.Cleanup(func() { + _ = updateProvider.Close() + }) + + sub, err := updateProvider.Subscribe(dbauthz.As(ctx, ownerSubject), ownerID) + require.NoError(t, err) + t.Cleanup(func() { + _ = sub.Close() + }) + + expected := &proto.WorkspaceUpdate{ + UpsertedWorkspaces: []*proto.Workspace{ + { + Id: ws1IDSlice, + Name: "ws1", + Status: proto.Workspace_STARTING, + }, + }, + UpsertedAgents: []*proto.Agent{ + { + Id: agent1IDSlice, + Name: "agent1", + WorkspaceId: ws1IDSlice, + }, + }, + DeletedWorkspaces: []*proto.Workspace{}, + DeletedAgents: []*proto.Agent{}, + } + + update := testutil.RequireRecvCtx(ctx, t, sub.Updates()) + slices.SortFunc(update.UpsertedWorkspaces, func(a, b *proto.Workspace) int { + return strings.Compare(a.Name, b.Name) + }) + require.Equal(t, expected, update) + + resub, err := updateProvider.Subscribe(dbauthz.As(ctx, ownerSubject), ownerID) + require.NoError(t, err) + t.Cleanup(func() { + _ = resub.Close() + }) + + update = testutil.RequireRecvCtx(ctx, t, resub.Updates()) + slices.SortFunc(update.UpsertedWorkspaces, func(a, b *proto.Workspace) int { + return strings.Compare(a.Name, b.Name) + }) + require.Equal(t, expected, update) + }) +} + +func publishWorkspaceEvent(t *testing.T, ps pubsub.Pubsub, ownerID uuid.UUID, event *wspubsub.WorkspaceEvent) { + msg, err := json.Marshal(event) + require.NoError(t, err) + ps.Publish(wspubsub.WorkspaceEventChannel(ownerID), msg) +} + +type mockWorkspaceStore struct { + orderedRows []database.GetWorkspacesAndAgentsByOwnerIDRow +} + +// GetAuthorizedWorkspacesAndAgentsByOwnerID implements coderd.UpdatesQuerier. +func (m *mockWorkspaceStore) GetWorkspacesAndAgentsByOwnerID(context.Context, uuid.UUID) ([]database.GetWorkspacesAndAgentsByOwnerIDRow, error) { + return m.orderedRows, nil +} + +// GetWorkspaceByAgentID implements coderd.UpdatesQuerier. +func (*mockWorkspaceStore) GetWorkspaceByAgentID(context.Context, uuid.UUID) (database.Workspace, error) { + return database.Workspace{}, nil +} + +var _ coderd.UpdatesQuerier = (*mockWorkspaceStore)(nil) + +type mockPubsub struct { + cbs map[string]pubsub.ListenerWithErr +} + +// Close implements pubsub.Pubsub. +func (*mockPubsub) Close() error { + panic("unimplemented") +} + +// Publish implements pubsub.Pubsub. +func (m *mockPubsub) Publish(event string, message []byte) error { + cb, ok := m.cbs[event] + if !ok { + return nil + } + cb(context.Background(), message, nil) + return nil +} + +func (*mockPubsub) Subscribe(string, pubsub.Listener) (cancel func(), err error) { + panic("unimplemented") +} + +func (m *mockPubsub) SubscribeWithErr(event string, listener pubsub.ListenerWithErr) (func(), error) { + m.cbs[event] = listener + return func() {}, nil +} + +var _ pubsub.Pubsub = (*mockPubsub)(nil) + +type mockAuthorizer struct{} + +func (*mockAuthorizer) Authorize(context.Context, rbac.Subject, policy.Action, rbac.Object) error { + return nil +} + +// Prepare implements rbac.Authorizer. +func (*mockAuthorizer) Prepare(context.Context, rbac.Subject, policy.Action, string) (rbac.PreparedAuthorized, error) { + return nil, nil +} + +var _ rbac.Authorizer = (*mockAuthorizer)(nil) diff --git a/codersdk/provisionerdaemons.go b/codersdk/provisionerdaemons.go index 7ba10539b6..7b14afbbb2 100644 --- a/codersdk/provisionerdaemons.go +++ b/codersdk/provisionerdaemons.go @@ -402,3 +402,37 @@ func (c *Client) DeleteProvisionerKey(ctx context.Context, organizationID uuid.U } return nil } + +func ConvertWorkspaceStatus(jobStatus ProvisionerJobStatus, transition WorkspaceTransition) WorkspaceStatus { + switch jobStatus { + case ProvisionerJobPending: + return WorkspaceStatusPending + case ProvisionerJobRunning: + switch transition { + case WorkspaceTransitionStart: + return WorkspaceStatusStarting + case WorkspaceTransitionStop: + return WorkspaceStatusStopping + case WorkspaceTransitionDelete: + return WorkspaceStatusDeleting + } + case ProvisionerJobSucceeded: + switch transition { + case WorkspaceTransitionStart: + return WorkspaceStatusRunning + case WorkspaceTransitionStop: + return WorkspaceStatusStopped + case WorkspaceTransitionDelete: + return WorkspaceStatusDeleted + } + case ProvisionerJobCanceling: + return WorkspaceStatusCanceling + case ProvisionerJobCanceled: + return WorkspaceStatusCanceled + case ProvisionerJobFailed: + return WorkspaceStatusFailed + } + + // return error status since we should never get here + return WorkspaceStatusFailed +} diff --git a/codersdk/workspacesdk/connector_internal_test.go b/codersdk/workspacesdk/connector_internal_test.go index 19f1930c89..009de5c6bf 100644 --- a/codersdk/workspacesdk/connector_internal_test.go +++ b/codersdk/workspacesdk/connector_internal_test.go @@ -580,6 +580,11 @@ func (f *fakeDRPCClient) RefreshResumeToken(_ context.Context, _ *proto.RefreshR }, nil } +// WorkspaceUpdates implements proto.DRPCTailnetClient. +func (*fakeDRPCClient) WorkspaceUpdates(context.Context, *proto.WorkspaceUpdatesRequest) (proto.DRPCTailnet_WorkspaceUpdatesClient, error) { + panic("unimplemented") +} + type fakeDRPCConn struct{} var _ drpc.Conn = &fakeDRPCConn{} diff --git a/docs/reference/api/agents.md b/docs/reference/api/agents.md index 8e7f46bc7d..6ccffeb823 100644 --- a/docs/reference/api/agents.md +++ b/docs/reference/api/agents.md @@ -20,6 +20,26 @@ curl -X GET http://coder-server:8080/api/v2/derp-map \ To perform this operation, you must be authenticated. [Learn more](authentication.md). +## User-scoped tailnet RPC connection + +### Code samples + +```shell +# Example request using curl +curl -X GET http://coder-server:8080/api/v2/tailnet \ + -H 'Coder-Session-Token: API_KEY' +``` + +`GET /tailnet` + +### Responses + +| Status | Meaning | Description | Schema | +| ------ | ------------------------------------------------------------------------ | ------------------- | ------ | +| 101 | [Switching Protocols](https://tools.ietf.org/html/rfc7231#section-6.2.2) | Switching Protocols | | + +To perform this operation, you must be authenticated. [Learn more](authentication.md). + ## Authenticate agent on AWS instance ### Code samples diff --git a/enterprise/tailnet/connio.go b/enterprise/tailnet/connio.go index fd2c99bdeb..923af4bee0 100644 --- a/enterprise/tailnet/connio.go +++ b/enterprise/tailnet/connio.go @@ -133,7 +133,7 @@ var errDisconnect = xerrors.New("graceful disconnect") func (c *connIO) handleRequest(req *proto.CoordinateRequest) error { c.logger.Debug(c.peerCtx, "got request") - err := c.auth.Authorize(req) + err := c.auth.Authorize(c.peerCtx, req) if err != nil { c.logger.Warn(c.peerCtx, "unauthorized request", slog.Error(err)) return xerrors.Errorf("authorize request: %w", err) diff --git a/enterprise/tailnet/pgcoord_test.go b/enterprise/tailnet/pgcoord_test.go index 08c0017a2d..c0d122aa74 100644 --- a/enterprise/tailnet/pgcoord_test.go +++ b/enterprise/tailnet/pgcoord_test.go @@ -913,6 +913,42 @@ func TestPGCoordinatorDual_PeerReconnect(t *testing.T) { p2.AssertNeverUpdateKind(p1.ID, proto.CoordinateResponse_PeerUpdate_DISCONNECTED) } +// TestPGCoordinatorPropogatedPeerContext tests that the context for a specific peer +// is propogated through to the `Authorize` method of the coordinatee auth +func TestPGCoordinatorPropogatedPeerContext(t *testing.T) { + t.Parallel() + + if !dbtestutil.WillUsePostgres() { + t.Skip("test only with postgres") + } + + ctx := testutil.Context(t, testutil.WaitShort) + store, ps := dbtestutil.NewDB(t) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + + peerCtx := context.WithValue(ctx, agpltest.FakeSubjectKey{}, struct{}{}) + peerID := uuid.UUID{0x01} + agentID := uuid.UUID{0x02} + + c1, err := tailnet.NewPGCoord(ctx, logger, ps, store) + require.NoError(t, err) + defer func() { + err := c1.Close() + require.NoError(t, err) + }() + + ch := make(chan struct{}) + auth := agpltest.FakeCoordinateeAuth{ + Chan: ch, + } + + reqs, _ := c1.Coordinate(peerCtx, peerID, "peer1", auth) + + testutil.RequireSendCtx(ctx, t, reqs, &proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{Id: agpl.UUIDToByteSlice(agentID)}}) + + _ = testutil.RequireRecvCtx(ctx, t, ch) +} + func assertEventuallyStatus(ctx context.Context, t *testing.T, store database.Store, agentID uuid.UUID, status database.TailnetStatus) { t.Helper() assert.Eventually(t, func() bool { diff --git a/tailnet/convert.go b/tailnet/convert.go index a7d224dc01..3ba97e443f 100644 --- a/tailnet/convert.go +++ b/tailnet/convert.go @@ -9,6 +9,7 @@ import ( "tailscale.com/tailcfg" "tailscale.com/types/key" + "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/tailnet/proto" ) @@ -270,3 +271,30 @@ func DERPNodeFromProto(node *proto.DERPMap_Region_Node) *tailcfg.DERPNode { CanPort80: node.CanPort_80, } } + +func WorkspaceStatusToProto(status codersdk.WorkspaceStatus) proto.Workspace_Status { + switch status { + case codersdk.WorkspaceStatusCanceled: + return proto.Workspace_CANCELED + case codersdk.WorkspaceStatusCanceling: + return proto.Workspace_CANCELING + case codersdk.WorkspaceStatusDeleted: + return proto.Workspace_DELETED + case codersdk.WorkspaceStatusDeleting: + return proto.Workspace_DELETING + case codersdk.WorkspaceStatusFailed: + return proto.Workspace_FAILED + case codersdk.WorkspaceStatusPending: + return proto.Workspace_PENDING + case codersdk.WorkspaceStatusRunning: + return proto.Workspace_RUNNING + case codersdk.WorkspaceStatusStarting: + return proto.Workspace_STARTING + case codersdk.WorkspaceStatusStopped: + return proto.Workspace_STOPPED + case codersdk.WorkspaceStatusStopping: + return proto.Workspace_STOPPING + default: + return proto.Workspace_UNKNOWN + } +} diff --git a/tailnet/coordinator.go b/tailnet/coordinator.go index 54ce868df9..b059259895 100644 --- a/tailnet/coordinator.go +++ b/tailnet/coordinator.go @@ -566,7 +566,7 @@ func (c *core) node(id uuid.UUID) *Node { return v1Node } -func (c *core) handleRequest(p *peer, req *proto.CoordinateRequest) error { +func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.CoordinateRequest) error { c.mutex.Lock() defer c.mutex.Unlock() if c.closed { @@ -577,7 +577,7 @@ func (c *core) handleRequest(p *peer, req *proto.CoordinateRequest) error { return ErrAlreadyRemoved } - if err := pr.auth.Authorize(req); err != nil { + if err := pr.auth.Authorize(ctx, req); err != nil { return xerrors.Errorf("authorize request: %w", err) } diff --git a/tailnet/coordinator_test.go b/tailnet/coordinator_test.go index 5ffffde824..b3a803cd6a 100644 --- a/tailnet/coordinator_test.go +++ b/tailnet/coordinator_test.go @@ -328,7 +328,13 @@ func TestRemoteCoordination(t *testing.T) { serveErr := make(chan error, 1) go func() { - err := svc.ServeClient(ctx, proto.CurrentVersion.String(), sC, clientID, agentID) + err := svc.ServeClient(ctx, proto.CurrentVersion.String(), sC, tailnet.StreamID{ + Name: "client", + ID: clientID, + Auth: tailnet.ClientCoordinateeAuth{ + AgentID: agentID, + }, + }) serveErr <- err }() @@ -377,7 +383,13 @@ func TestRemoteCoordination_SendsReadyForHandshake(t *testing.T) { serveErr := make(chan error, 1) go func() { - err := svc.ServeClient(ctx, proto.CurrentVersion.String(), sC, clientID, agentID) + err := svc.ServeClient(ctx, proto.CurrentVersion.String(), sC, tailnet.StreamID{ + Name: "client", + ID: clientID, + Auth: tailnet.ClientCoordinateeAuth{ + AgentID: agentID, + }, + }) serveErr <- err }() @@ -517,3 +529,36 @@ func (f *fakeCoordinatee) SetNodeCallback(callback func(*tailnet.Node)) { defer f.Unlock() f.callback = callback } + +// TestCoordinatorPropogatedPeerContext tests that the context for a specific peer +// is propogated through to the `Authorizeā€œ method of the coordinatee auth +func TestCoordinatorPropogatedPeerContext(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + + peerCtx := context.WithValue(ctx, test.FakeSubjectKey{}, struct{}{}) + peerCtx, peerCtxCancel := context.WithCancel(peerCtx) + peerID := uuid.UUID{0x01} + agentID := uuid.UUID{0x02} + + c1 := tailnet.NewCoordinator(logger) + t.Cleanup(func() { + err := c1.Close() + require.NoError(t, err) + }) + + ch := make(chan struct{}) + auth := test.FakeCoordinateeAuth{ + Chan: ch, + } + + reqs, _ := c1.Coordinate(peerCtx, peerID, "peer1", auth) + + testutil.RequireSendCtx(ctx, t, reqs, &proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{Id: tailnet.UUIDToByteSlice(agentID)}}) + _ = testutil.RequireRecvCtx(ctx, t, ch) + // If we don't cancel the context, the coordinator close will wait until the + // peer request loop finishes, which will be after the timeout + peerCtxCancel() +} diff --git a/tailnet/peer.go b/tailnet/peer.go index eadc882f5a..7d69764abe 100644 --- a/tailnet/peer.go +++ b/tailnet/peer.go @@ -121,7 +121,7 @@ func (p *peer) storeMappingLocked( }, nil } -func (p *peer) reqLoop(ctx context.Context, logger slog.Logger, handler func(*peer, *proto.CoordinateRequest) error) { +func (p *peer) reqLoop(ctx context.Context, logger slog.Logger, handler func(context.Context, *peer, *proto.CoordinateRequest) error) { for { select { case <-ctx.Done(): @@ -133,7 +133,7 @@ func (p *peer) reqLoop(ctx context.Context, logger slog.Logger, handler func(*pe return } logger.Debug(ctx, "peerReadLoop got request") - if err := handler(p, req); err != nil { + if err := handler(ctx, p, req); err != nil { if xerrors.Is(err, ErrAlreadyRemoved) || xerrors.Is(err, ErrClosed) { return } diff --git a/tailnet/proto/tailnet.pb.go b/tailnet/proto/tailnet.pb.go index c4302954c0..b2a03fa53f 100644 --- a/tailnet/proto/tailnet.pb.go +++ b/tailnet/proto/tailnet.pb.go @@ -228,6 +228,79 @@ func (TelemetryEvent_ClientType) EnumDescriptor() ([]byte, []int) { return file_tailnet_proto_tailnet_proto_rawDescGZIP(), []int{9, 1} } +type Workspace_Status int32 + +const ( + Workspace_UNKNOWN Workspace_Status = 0 + Workspace_PENDING Workspace_Status = 1 + Workspace_STARTING Workspace_Status = 2 + Workspace_RUNNING Workspace_Status = 3 + Workspace_STOPPING Workspace_Status = 4 + Workspace_STOPPED Workspace_Status = 5 + Workspace_FAILED Workspace_Status = 6 + Workspace_CANCELING Workspace_Status = 7 + Workspace_CANCELED Workspace_Status = 8 + Workspace_DELETING Workspace_Status = 9 + Workspace_DELETED Workspace_Status = 10 +) + +// Enum value maps for Workspace_Status. +var ( + Workspace_Status_name = map[int32]string{ + 0: "UNKNOWN", + 1: "PENDING", + 2: "STARTING", + 3: "RUNNING", + 4: "STOPPING", + 5: "STOPPED", + 6: "FAILED", + 7: "CANCELING", + 8: "CANCELED", + 9: "DELETING", + 10: "DELETED", + } + Workspace_Status_value = map[string]int32{ + "UNKNOWN": 0, + "PENDING": 1, + "STARTING": 2, + "RUNNING": 3, + "STOPPING": 4, + "STOPPED": 5, + "FAILED": 6, + "CANCELING": 7, + "CANCELED": 8, + "DELETING": 9, + "DELETED": 10, + } +) + +func (x Workspace_Status) Enum() *Workspace_Status { + p := new(Workspace_Status) + *p = x + return p +} + +func (x Workspace_Status) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Workspace_Status) Descriptor() protoreflect.EnumDescriptor { + return file_tailnet_proto_tailnet_proto_enumTypes[4].Descriptor() +} + +func (Workspace_Status) Type() protoreflect.EnumType { + return &file_tailnet_proto_tailnet_proto_enumTypes[4] +} + +func (x Workspace_Status) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Workspace_Status.Descriptor instead. +func (Workspace_Status) EnumDescriptor() ([]byte, []int) { + return file_tailnet_proto_tailnet_proto_rawDescGZIP(), []int{14, 0} +} + type DERPMap struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1174,6 +1247,250 @@ func (*TelemetryResponse) Descriptor() ([]byte, []int) { return file_tailnet_proto_tailnet_proto_rawDescGZIP(), []int{11} } +type WorkspaceUpdatesRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + WorkspaceOwnerId []byte `protobuf:"bytes,1,opt,name=workspace_owner_id,json=workspaceOwnerId,proto3" json:"workspace_owner_id,omitempty"` // UUID +} + +func (x *WorkspaceUpdatesRequest) Reset() { + *x = WorkspaceUpdatesRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_tailnet_proto_tailnet_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WorkspaceUpdatesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WorkspaceUpdatesRequest) ProtoMessage() {} + +func (x *WorkspaceUpdatesRequest) ProtoReflect() protoreflect.Message { + mi := &file_tailnet_proto_tailnet_proto_msgTypes[12] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WorkspaceUpdatesRequest.ProtoReflect.Descriptor instead. +func (*WorkspaceUpdatesRequest) Descriptor() ([]byte, []int) { + return file_tailnet_proto_tailnet_proto_rawDescGZIP(), []int{12} +} + +func (x *WorkspaceUpdatesRequest) GetWorkspaceOwnerId() []byte { + if x != nil { + return x.WorkspaceOwnerId + } + return nil +} + +type WorkspaceUpdate struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + UpsertedWorkspaces []*Workspace `protobuf:"bytes,1,rep,name=upserted_workspaces,json=upsertedWorkspaces,proto3" json:"upserted_workspaces,omitempty"` + UpsertedAgents []*Agent `protobuf:"bytes,2,rep,name=upserted_agents,json=upsertedAgents,proto3" json:"upserted_agents,omitempty"` + DeletedWorkspaces []*Workspace `protobuf:"bytes,3,rep,name=deleted_workspaces,json=deletedWorkspaces,proto3" json:"deleted_workspaces,omitempty"` + DeletedAgents []*Agent `protobuf:"bytes,4,rep,name=deleted_agents,json=deletedAgents,proto3" json:"deleted_agents,omitempty"` +} + +func (x *WorkspaceUpdate) Reset() { + *x = WorkspaceUpdate{} + if protoimpl.UnsafeEnabled { + mi := &file_tailnet_proto_tailnet_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WorkspaceUpdate) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WorkspaceUpdate) ProtoMessage() {} + +func (x *WorkspaceUpdate) ProtoReflect() protoreflect.Message { + mi := &file_tailnet_proto_tailnet_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WorkspaceUpdate.ProtoReflect.Descriptor instead. +func (*WorkspaceUpdate) Descriptor() ([]byte, []int) { + return file_tailnet_proto_tailnet_proto_rawDescGZIP(), []int{13} +} + +func (x *WorkspaceUpdate) GetUpsertedWorkspaces() []*Workspace { + if x != nil { + return x.UpsertedWorkspaces + } + return nil +} + +func (x *WorkspaceUpdate) GetUpsertedAgents() []*Agent { + if x != nil { + return x.UpsertedAgents + } + return nil +} + +func (x *WorkspaceUpdate) GetDeletedWorkspaces() []*Workspace { + if x != nil { + return x.DeletedWorkspaces + } + return nil +} + +func (x *WorkspaceUpdate) GetDeletedAgents() []*Agent { + if x != nil { + return x.DeletedAgents + } + return nil +} + +type Workspace struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // UUID + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Status Workspace_Status `protobuf:"varint,3,opt,name=status,proto3,enum=coder.tailnet.v2.Workspace_Status" json:"status,omitempty"` +} + +func (x *Workspace) Reset() { + *x = Workspace{} + if protoimpl.UnsafeEnabled { + mi := &file_tailnet_proto_tailnet_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Workspace) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Workspace) ProtoMessage() {} + +func (x *Workspace) ProtoReflect() protoreflect.Message { + mi := &file_tailnet_proto_tailnet_proto_msgTypes[14] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Workspace.ProtoReflect.Descriptor instead. +func (*Workspace) Descriptor() ([]byte, []int) { + return file_tailnet_proto_tailnet_proto_rawDescGZIP(), []int{14} +} + +func (x *Workspace) GetId() []byte { + if x != nil { + return x.Id + } + return nil +} + +func (x *Workspace) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *Workspace) GetStatus() Workspace_Status { + if x != nil { + return x.Status + } + return Workspace_UNKNOWN +} + +type Agent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // UUID + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + WorkspaceId []byte `protobuf:"bytes,3,opt,name=workspace_id,json=workspaceId,proto3" json:"workspace_id,omitempty"` // UUID +} + +func (x *Agent) Reset() { + *x = Agent{} + if protoimpl.UnsafeEnabled { + mi := &file_tailnet_proto_tailnet_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Agent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Agent) ProtoMessage() {} + +func (x *Agent) ProtoReflect() protoreflect.Message { + mi := &file_tailnet_proto_tailnet_proto_msgTypes[15] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Agent.ProtoReflect.Descriptor instead. +func (*Agent) Descriptor() ([]byte, []int) { + return file_tailnet_proto_tailnet_proto_rawDescGZIP(), []int{15} +} + +func (x *Agent) GetId() []byte { + if x != nil { + return x.Id + } + return nil +} + +func (x *Agent) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *Agent) GetWorkspaceId() []byte { + if x != nil { + return x.WorkspaceId + } + return nil +} + type DERPMap_HomeParams struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1185,7 +1502,7 @@ type DERPMap_HomeParams struct { func (x *DERPMap_HomeParams) Reset() { *x = DERPMap_HomeParams{} if protoimpl.UnsafeEnabled { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[12] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1198,7 +1515,7 @@ func (x *DERPMap_HomeParams) String() string { func (*DERPMap_HomeParams) ProtoMessage() {} func (x *DERPMap_HomeParams) ProtoReflect() protoreflect.Message { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[12] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[16] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1237,7 +1554,7 @@ type DERPMap_Region struct { func (x *DERPMap_Region) Reset() { *x = DERPMap_Region{} if protoimpl.UnsafeEnabled { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[13] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1250,7 +1567,7 @@ func (x *DERPMap_Region) String() string { func (*DERPMap_Region) ProtoMessage() {} func (x *DERPMap_Region) ProtoReflect() protoreflect.Message { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[13] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[17] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1331,7 +1648,7 @@ type DERPMap_Region_Node struct { func (x *DERPMap_Region_Node) Reset() { *x = DERPMap_Region_Node{} if protoimpl.UnsafeEnabled { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[16] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1344,7 +1661,7 @@ func (x *DERPMap_Region_Node) String() string { func (*DERPMap_Region_Node) ProtoMessage() {} func (x *DERPMap_Region_Node) ProtoReflect() protoreflect.Message { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[16] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[20] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1462,7 +1779,7 @@ type CoordinateRequest_UpdateSelf struct { func (x *CoordinateRequest_UpdateSelf) Reset() { *x = CoordinateRequest_UpdateSelf{} if protoimpl.UnsafeEnabled { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[19] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[23] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1475,7 +1792,7 @@ func (x *CoordinateRequest_UpdateSelf) String() string { func (*CoordinateRequest_UpdateSelf) ProtoMessage() {} func (x *CoordinateRequest_UpdateSelf) ProtoReflect() protoreflect.Message { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[19] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[23] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1507,7 +1824,7 @@ type CoordinateRequest_Disconnect struct { func (x *CoordinateRequest_Disconnect) Reset() { *x = CoordinateRequest_Disconnect{} if protoimpl.UnsafeEnabled { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[20] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[24] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1520,7 +1837,7 @@ func (x *CoordinateRequest_Disconnect) String() string { func (*CoordinateRequest_Disconnect) ProtoMessage() {} func (x *CoordinateRequest_Disconnect) ProtoReflect() protoreflect.Message { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[20] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[24] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1547,7 +1864,7 @@ type CoordinateRequest_Tunnel struct { func (x *CoordinateRequest_Tunnel) Reset() { *x = CoordinateRequest_Tunnel{} if protoimpl.UnsafeEnabled { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[21] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[25] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1560,7 +1877,7 @@ func (x *CoordinateRequest_Tunnel) String() string { func (*CoordinateRequest_Tunnel) ProtoMessage() {} func (x *CoordinateRequest_Tunnel) ProtoReflect() protoreflect.Message { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[21] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[25] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1598,7 +1915,7 @@ type CoordinateRequest_ReadyForHandshake struct { func (x *CoordinateRequest_ReadyForHandshake) Reset() { *x = CoordinateRequest_ReadyForHandshake{} if protoimpl.UnsafeEnabled { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[22] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[26] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1611,7 +1928,7 @@ func (x *CoordinateRequest_ReadyForHandshake) String() string { func (*CoordinateRequest_ReadyForHandshake) ProtoMessage() {} func (x *CoordinateRequest_ReadyForHandshake) ProtoReflect() protoreflect.Message { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[22] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[26] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1648,7 +1965,7 @@ type CoordinateResponse_PeerUpdate struct { func (x *CoordinateResponse_PeerUpdate) Reset() { *x = CoordinateResponse_PeerUpdate{} if protoimpl.UnsafeEnabled { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[23] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[27] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1661,7 +1978,7 @@ func (x *CoordinateResponse_PeerUpdate) String() string { func (*CoordinateResponse_PeerUpdate) ProtoMessage() {} func (x *CoordinateResponse_PeerUpdate) ProtoReflect() protoreflect.Message { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[23] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[27] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1717,7 +2034,7 @@ type Netcheck_NetcheckIP struct { func (x *Netcheck_NetcheckIP) Reset() { *x = Netcheck_NetcheckIP{} if protoimpl.UnsafeEnabled { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[26] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[30] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1730,7 +2047,7 @@ func (x *Netcheck_NetcheckIP) String() string { func (*Netcheck_NetcheckIP) ProtoMessage() {} func (x *Netcheck_NetcheckIP) ProtoReflect() protoreflect.Message { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[26] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[30] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1773,7 +2090,7 @@ type TelemetryEvent_P2PEndpoint struct { func (x *TelemetryEvent_P2PEndpoint) Reset() { *x = TelemetryEvent_P2PEndpoint{} if protoimpl.UnsafeEnabled { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[27] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[31] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1786,7 +2103,7 @@ func (x *TelemetryEvent_P2PEndpoint) String() string { func (*TelemetryEvent_P2PEndpoint) ProtoMessage() {} func (x *TelemetryEvent_P2PEndpoint) ProtoReflect() protoreflect.Message { - mi := &file_tailnet_proto_tailnet_proto_msgTypes[27] + mi := &file_tailnet_proto_tailnet_proto_msgTypes[31] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2171,35 +2488,86 @@ var file_tailnet_proto_tailnet_proto_rawDesc = []byte{ 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x13, 0x0a, 0x11, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x89, 0x03, 0x0a, 0x07, 0x54, 0x61, 0x69, 0x6c, - 0x6e, 0x65, 0x74, 0x12, 0x58, 0x0a, 0x0d, 0x50, 0x6f, 0x73, 0x74, 0x54, 0x65, 0x6c, 0x65, 0x6d, - 0x65, 0x74, 0x72, 0x79, 0x12, 0x22, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, - 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, - 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, - 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x65, 0x6c, 0x65, - 0x6d, 0x65, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x56, 0x0a, - 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x45, 0x52, 0x50, 0x4d, 0x61, 0x70, 0x73, 0x12, - 0x27, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, - 0x76, 0x32, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x45, 0x52, 0x50, 0x4d, 0x61, 0x70, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, - 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x45, 0x52, 0x50, - 0x4d, 0x61, 0x70, 0x30, 0x01, 0x12, 0x6f, 0x0a, 0x12, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, - 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x2b, 0x2e, 0x63, 0x6f, - 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x47, 0x0a, 0x17, 0x57, 0x6f, 0x72, 0x6b, 0x73, + 0x70, 0x61, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x12, 0x77, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x5f, + 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x10, + 0x77, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x49, 0x64, + 0x22, 0xad, 0x02, 0x0a, 0x0f, 0x57, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x55, 0x70, + 0x64, 0x61, 0x74, 0x65, 0x12, 0x4c, 0x0a, 0x13, 0x75, 0x70, 0x73, 0x65, 0x72, 0x74, 0x65, 0x64, + 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x1b, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, + 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x52, 0x12, + 0x75, 0x70, 0x73, 0x65, 0x72, 0x74, 0x65, 0x64, 0x57, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, + 0x65, 0x73, 0x12, 0x40, 0x0a, 0x0f, 0x75, 0x70, 0x73, 0x65, 0x72, 0x74, 0x65, 0x64, 0x5f, 0x61, + 0x67, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x63, 0x6f, + 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x41, + 0x67, 0x65, 0x6e, 0x74, 0x52, 0x0e, 0x75, 0x70, 0x73, 0x65, 0x72, 0x74, 0x65, 0x64, 0x41, 0x67, + 0x65, 0x6e, 0x74, 0x73, 0x12, 0x4a, 0x0a, 0x12, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, + 0x77, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x1b, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, + 0x2e, 0x76, 0x32, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x52, 0x11, 0x64, + 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x57, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x73, + 0x12, 0x3e, 0x0a, 0x0e, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x67, 0x65, 0x6e, + 0x74, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, + 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x41, 0x67, 0x65, 0x6e, + 0x74, 0x52, 0x0d, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x73, + 0x22, 0x8a, 0x02, 0x0a, 0x09, 0x57, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, + 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, + 0x6d, 0x65, 0x12, 0x3a, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x22, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, + 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x2e, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x9c, + 0x01, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, + 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, + 0x47, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x54, 0x41, 0x52, 0x54, 0x49, 0x4e, 0x47, 0x10, + 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x03, 0x12, 0x0c, + 0x0a, 0x08, 0x53, 0x54, 0x4f, 0x50, 0x50, 0x49, 0x4e, 0x47, 0x10, 0x04, 0x12, 0x0b, 0x0a, 0x07, + 0x53, 0x54, 0x4f, 0x50, 0x50, 0x45, 0x44, 0x10, 0x05, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, + 0x4c, 0x45, 0x44, 0x10, 0x06, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x49, + 0x4e, 0x47, 0x10, 0x07, 0x12, 0x0c, 0x0a, 0x08, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x45, 0x44, + 0x10, 0x08, 0x12, 0x0c, 0x0a, 0x08, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x49, 0x4e, 0x47, 0x10, 0x09, + 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x0a, 0x22, 0x4e, 0x0a, + 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x77, 0x6f, + 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x49, 0x64, 0x32, 0xed, 0x03, + 0x0a, 0x07, 0x54, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x12, 0x58, 0x0a, 0x0d, 0x50, 0x6f, 0x73, + 0x74, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x12, 0x22, 0x2e, 0x63, 0x6f, 0x64, + 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x65, + 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, + 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, + 0x32, 0x2e, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x56, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x45, 0x52, + 0x50, 0x4d, 0x61, 0x70, 0x73, 0x12, 0x27, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, + 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, + 0x45, 0x52, 0x50, 0x4d, 0x61, 0x70, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, + 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, + 0x32, 0x2e, 0x44, 0x45, 0x52, 0x50, 0x4d, 0x61, 0x70, 0x30, 0x01, 0x12, 0x6f, 0x0a, 0x12, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, - 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, - 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x52, 0x65, 0x66, 0x72, - 0x65, 0x73, 0x68, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5b, 0x0a, 0x0a, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, - 0x6e, 0x61, 0x74, 0x65, 0x12, 0x23, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, - 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, - 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x63, 0x6f, 0x64, 0x65, - 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x43, 0x6f, 0x6f, - 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, - 0x01, 0x30, 0x01, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, - 0x2f, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6e, 0x12, 0x2b, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, + 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x52, 0x65, 0x73, 0x75, + 0x6d, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, + 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, + 0x32, 0x2e, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x54, + 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5b, 0x0a, 0x0a, + 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x12, 0x23, 0x2e, 0x63, 0x6f, 0x64, + 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x43, 0x6f, + 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x24, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, + 0x76, 0x32, 0x2e, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x62, 0x0a, 0x10, 0x57, 0x6f, 0x72, + 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x12, 0x29, 0x2e, + 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, + 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, + 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x57, 0x6f, 0x72, 0x6b, + 0x73, 0x70, 0x61, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x30, 0x01, 0x42, 0x29, 0x5a, + 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x64, 0x65, + 0x72, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x74, 0x61, 0x69, 0x6c, 0x6e, + 0x65, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -2214,107 +2582,119 @@ func file_tailnet_proto_tailnet_proto_rawDescGZIP() []byte { return file_tailnet_proto_tailnet_proto_rawDescData } -var file_tailnet_proto_tailnet_proto_enumTypes = make([]protoimpl.EnumInfo, 4) -var file_tailnet_proto_tailnet_proto_msgTypes = make([]protoimpl.MessageInfo, 28) +var file_tailnet_proto_tailnet_proto_enumTypes = make([]protoimpl.EnumInfo, 5) +var file_tailnet_proto_tailnet_proto_msgTypes = make([]protoimpl.MessageInfo, 32) var file_tailnet_proto_tailnet_proto_goTypes = []interface{}{ (CoordinateResponse_PeerUpdate_Kind)(0), // 0: coder.tailnet.v2.CoordinateResponse.PeerUpdate.Kind (IPFields_IPClass)(0), // 1: coder.tailnet.v2.IPFields.IPClass (TelemetryEvent_Status)(0), // 2: coder.tailnet.v2.TelemetryEvent.Status (TelemetryEvent_ClientType)(0), // 3: coder.tailnet.v2.TelemetryEvent.ClientType - (*DERPMap)(nil), // 4: coder.tailnet.v2.DERPMap - (*StreamDERPMapsRequest)(nil), // 5: coder.tailnet.v2.StreamDERPMapsRequest - (*Node)(nil), // 6: coder.tailnet.v2.Node - (*RefreshResumeTokenRequest)(nil), // 7: coder.tailnet.v2.RefreshResumeTokenRequest - (*RefreshResumeTokenResponse)(nil), // 8: coder.tailnet.v2.RefreshResumeTokenResponse - (*CoordinateRequest)(nil), // 9: coder.tailnet.v2.CoordinateRequest - (*CoordinateResponse)(nil), // 10: coder.tailnet.v2.CoordinateResponse - (*IPFields)(nil), // 11: coder.tailnet.v2.IPFields - (*Netcheck)(nil), // 12: coder.tailnet.v2.Netcheck - (*TelemetryEvent)(nil), // 13: coder.tailnet.v2.TelemetryEvent - (*TelemetryRequest)(nil), // 14: coder.tailnet.v2.TelemetryRequest - (*TelemetryResponse)(nil), // 15: coder.tailnet.v2.TelemetryResponse - (*DERPMap_HomeParams)(nil), // 16: coder.tailnet.v2.DERPMap.HomeParams - (*DERPMap_Region)(nil), // 17: coder.tailnet.v2.DERPMap.Region - nil, // 18: coder.tailnet.v2.DERPMap.RegionsEntry - nil, // 19: coder.tailnet.v2.DERPMap.HomeParams.RegionScoreEntry - (*DERPMap_Region_Node)(nil), // 20: coder.tailnet.v2.DERPMap.Region.Node - nil, // 21: coder.tailnet.v2.Node.DerpLatencyEntry - nil, // 22: coder.tailnet.v2.Node.DerpForcedWebsocketEntry - (*CoordinateRequest_UpdateSelf)(nil), // 23: coder.tailnet.v2.CoordinateRequest.UpdateSelf - (*CoordinateRequest_Disconnect)(nil), // 24: coder.tailnet.v2.CoordinateRequest.Disconnect - (*CoordinateRequest_Tunnel)(nil), // 25: coder.tailnet.v2.CoordinateRequest.Tunnel - (*CoordinateRequest_ReadyForHandshake)(nil), // 26: coder.tailnet.v2.CoordinateRequest.ReadyForHandshake - (*CoordinateResponse_PeerUpdate)(nil), // 27: coder.tailnet.v2.CoordinateResponse.PeerUpdate - nil, // 28: coder.tailnet.v2.Netcheck.RegionV4LatencyEntry - nil, // 29: coder.tailnet.v2.Netcheck.RegionV6LatencyEntry - (*Netcheck_NetcheckIP)(nil), // 30: coder.tailnet.v2.Netcheck.NetcheckIP - (*TelemetryEvent_P2PEndpoint)(nil), // 31: coder.tailnet.v2.TelemetryEvent.P2PEndpoint - (*timestamppb.Timestamp)(nil), // 32: google.protobuf.Timestamp - (*durationpb.Duration)(nil), // 33: google.protobuf.Duration - (*wrapperspb.BoolValue)(nil), // 34: google.protobuf.BoolValue - (*wrapperspb.FloatValue)(nil), // 35: google.protobuf.FloatValue + (Workspace_Status)(0), // 4: coder.tailnet.v2.Workspace.Status + (*DERPMap)(nil), // 5: coder.tailnet.v2.DERPMap + (*StreamDERPMapsRequest)(nil), // 6: coder.tailnet.v2.StreamDERPMapsRequest + (*Node)(nil), // 7: coder.tailnet.v2.Node + (*RefreshResumeTokenRequest)(nil), // 8: coder.tailnet.v2.RefreshResumeTokenRequest + (*RefreshResumeTokenResponse)(nil), // 9: coder.tailnet.v2.RefreshResumeTokenResponse + (*CoordinateRequest)(nil), // 10: coder.tailnet.v2.CoordinateRequest + (*CoordinateResponse)(nil), // 11: coder.tailnet.v2.CoordinateResponse + (*IPFields)(nil), // 12: coder.tailnet.v2.IPFields + (*Netcheck)(nil), // 13: coder.tailnet.v2.Netcheck + (*TelemetryEvent)(nil), // 14: coder.tailnet.v2.TelemetryEvent + (*TelemetryRequest)(nil), // 15: coder.tailnet.v2.TelemetryRequest + (*TelemetryResponse)(nil), // 16: coder.tailnet.v2.TelemetryResponse + (*WorkspaceUpdatesRequest)(nil), // 17: coder.tailnet.v2.WorkspaceUpdatesRequest + (*WorkspaceUpdate)(nil), // 18: coder.tailnet.v2.WorkspaceUpdate + (*Workspace)(nil), // 19: coder.tailnet.v2.Workspace + (*Agent)(nil), // 20: coder.tailnet.v2.Agent + (*DERPMap_HomeParams)(nil), // 21: coder.tailnet.v2.DERPMap.HomeParams + (*DERPMap_Region)(nil), // 22: coder.tailnet.v2.DERPMap.Region + nil, // 23: coder.tailnet.v2.DERPMap.RegionsEntry + nil, // 24: coder.tailnet.v2.DERPMap.HomeParams.RegionScoreEntry + (*DERPMap_Region_Node)(nil), // 25: coder.tailnet.v2.DERPMap.Region.Node + nil, // 26: coder.tailnet.v2.Node.DerpLatencyEntry + nil, // 27: coder.tailnet.v2.Node.DerpForcedWebsocketEntry + (*CoordinateRequest_UpdateSelf)(nil), // 28: coder.tailnet.v2.CoordinateRequest.UpdateSelf + (*CoordinateRequest_Disconnect)(nil), // 29: coder.tailnet.v2.CoordinateRequest.Disconnect + (*CoordinateRequest_Tunnel)(nil), // 30: coder.tailnet.v2.CoordinateRequest.Tunnel + (*CoordinateRequest_ReadyForHandshake)(nil), // 31: coder.tailnet.v2.CoordinateRequest.ReadyForHandshake + (*CoordinateResponse_PeerUpdate)(nil), // 32: coder.tailnet.v2.CoordinateResponse.PeerUpdate + nil, // 33: coder.tailnet.v2.Netcheck.RegionV4LatencyEntry + nil, // 34: coder.tailnet.v2.Netcheck.RegionV6LatencyEntry + (*Netcheck_NetcheckIP)(nil), // 35: coder.tailnet.v2.Netcheck.NetcheckIP + (*TelemetryEvent_P2PEndpoint)(nil), // 36: coder.tailnet.v2.TelemetryEvent.P2PEndpoint + (*timestamppb.Timestamp)(nil), // 37: google.protobuf.Timestamp + (*durationpb.Duration)(nil), // 38: google.protobuf.Duration + (*wrapperspb.BoolValue)(nil), // 39: google.protobuf.BoolValue + (*wrapperspb.FloatValue)(nil), // 40: google.protobuf.FloatValue } var file_tailnet_proto_tailnet_proto_depIdxs = []int32{ - 16, // 0: coder.tailnet.v2.DERPMap.home_params:type_name -> coder.tailnet.v2.DERPMap.HomeParams - 18, // 1: coder.tailnet.v2.DERPMap.regions:type_name -> coder.tailnet.v2.DERPMap.RegionsEntry - 32, // 2: coder.tailnet.v2.Node.as_of:type_name -> google.protobuf.Timestamp - 21, // 3: coder.tailnet.v2.Node.derp_latency:type_name -> coder.tailnet.v2.Node.DerpLatencyEntry - 22, // 4: coder.tailnet.v2.Node.derp_forced_websocket:type_name -> coder.tailnet.v2.Node.DerpForcedWebsocketEntry - 33, // 5: coder.tailnet.v2.RefreshResumeTokenResponse.refresh_in:type_name -> google.protobuf.Duration - 32, // 6: coder.tailnet.v2.RefreshResumeTokenResponse.expires_at:type_name -> google.protobuf.Timestamp - 23, // 7: coder.tailnet.v2.CoordinateRequest.update_self:type_name -> coder.tailnet.v2.CoordinateRequest.UpdateSelf - 24, // 8: coder.tailnet.v2.CoordinateRequest.disconnect:type_name -> coder.tailnet.v2.CoordinateRequest.Disconnect - 25, // 9: coder.tailnet.v2.CoordinateRequest.add_tunnel:type_name -> coder.tailnet.v2.CoordinateRequest.Tunnel - 25, // 10: coder.tailnet.v2.CoordinateRequest.remove_tunnel:type_name -> coder.tailnet.v2.CoordinateRequest.Tunnel - 26, // 11: coder.tailnet.v2.CoordinateRequest.ready_for_handshake:type_name -> coder.tailnet.v2.CoordinateRequest.ReadyForHandshake - 27, // 12: coder.tailnet.v2.CoordinateResponse.peer_updates:type_name -> coder.tailnet.v2.CoordinateResponse.PeerUpdate + 21, // 0: coder.tailnet.v2.DERPMap.home_params:type_name -> coder.tailnet.v2.DERPMap.HomeParams + 23, // 1: coder.tailnet.v2.DERPMap.regions:type_name -> coder.tailnet.v2.DERPMap.RegionsEntry + 37, // 2: coder.tailnet.v2.Node.as_of:type_name -> google.protobuf.Timestamp + 26, // 3: coder.tailnet.v2.Node.derp_latency:type_name -> coder.tailnet.v2.Node.DerpLatencyEntry + 27, // 4: coder.tailnet.v2.Node.derp_forced_websocket:type_name -> coder.tailnet.v2.Node.DerpForcedWebsocketEntry + 38, // 5: coder.tailnet.v2.RefreshResumeTokenResponse.refresh_in:type_name -> google.protobuf.Duration + 37, // 6: coder.tailnet.v2.RefreshResumeTokenResponse.expires_at:type_name -> google.protobuf.Timestamp + 28, // 7: coder.tailnet.v2.CoordinateRequest.update_self:type_name -> coder.tailnet.v2.CoordinateRequest.UpdateSelf + 29, // 8: coder.tailnet.v2.CoordinateRequest.disconnect:type_name -> coder.tailnet.v2.CoordinateRequest.Disconnect + 30, // 9: coder.tailnet.v2.CoordinateRequest.add_tunnel:type_name -> coder.tailnet.v2.CoordinateRequest.Tunnel + 30, // 10: coder.tailnet.v2.CoordinateRequest.remove_tunnel:type_name -> coder.tailnet.v2.CoordinateRequest.Tunnel + 31, // 11: coder.tailnet.v2.CoordinateRequest.ready_for_handshake:type_name -> coder.tailnet.v2.CoordinateRequest.ReadyForHandshake + 32, // 12: coder.tailnet.v2.CoordinateResponse.peer_updates:type_name -> coder.tailnet.v2.CoordinateResponse.PeerUpdate 1, // 13: coder.tailnet.v2.IPFields.class:type_name -> coder.tailnet.v2.IPFields.IPClass - 34, // 14: coder.tailnet.v2.Netcheck.OSHasIPv6:type_name -> google.protobuf.BoolValue - 34, // 15: coder.tailnet.v2.Netcheck.MappingVariesByDestIP:type_name -> google.protobuf.BoolValue - 34, // 16: coder.tailnet.v2.Netcheck.HairPinning:type_name -> google.protobuf.BoolValue - 34, // 17: coder.tailnet.v2.Netcheck.UPnP:type_name -> google.protobuf.BoolValue - 34, // 18: coder.tailnet.v2.Netcheck.PMP:type_name -> google.protobuf.BoolValue - 34, // 19: coder.tailnet.v2.Netcheck.PCP:type_name -> google.protobuf.BoolValue - 28, // 20: coder.tailnet.v2.Netcheck.RegionV4Latency:type_name -> coder.tailnet.v2.Netcheck.RegionV4LatencyEntry - 29, // 21: coder.tailnet.v2.Netcheck.RegionV6Latency:type_name -> coder.tailnet.v2.Netcheck.RegionV6LatencyEntry - 30, // 22: coder.tailnet.v2.Netcheck.GlobalV4:type_name -> coder.tailnet.v2.Netcheck.NetcheckIP - 30, // 23: coder.tailnet.v2.Netcheck.GlobalV6:type_name -> coder.tailnet.v2.Netcheck.NetcheckIP - 32, // 24: coder.tailnet.v2.TelemetryEvent.time:type_name -> google.protobuf.Timestamp + 39, // 14: coder.tailnet.v2.Netcheck.OSHasIPv6:type_name -> google.protobuf.BoolValue + 39, // 15: coder.tailnet.v2.Netcheck.MappingVariesByDestIP:type_name -> google.protobuf.BoolValue + 39, // 16: coder.tailnet.v2.Netcheck.HairPinning:type_name -> google.protobuf.BoolValue + 39, // 17: coder.tailnet.v2.Netcheck.UPnP:type_name -> google.protobuf.BoolValue + 39, // 18: coder.tailnet.v2.Netcheck.PMP:type_name -> google.protobuf.BoolValue + 39, // 19: coder.tailnet.v2.Netcheck.PCP:type_name -> google.protobuf.BoolValue + 33, // 20: coder.tailnet.v2.Netcheck.RegionV4Latency:type_name -> coder.tailnet.v2.Netcheck.RegionV4LatencyEntry + 34, // 21: coder.tailnet.v2.Netcheck.RegionV6Latency:type_name -> coder.tailnet.v2.Netcheck.RegionV6LatencyEntry + 35, // 22: coder.tailnet.v2.Netcheck.GlobalV4:type_name -> coder.tailnet.v2.Netcheck.NetcheckIP + 35, // 23: coder.tailnet.v2.Netcheck.GlobalV6:type_name -> coder.tailnet.v2.Netcheck.NetcheckIP + 37, // 24: coder.tailnet.v2.TelemetryEvent.time:type_name -> google.protobuf.Timestamp 2, // 25: coder.tailnet.v2.TelemetryEvent.status:type_name -> coder.tailnet.v2.TelemetryEvent.Status 3, // 26: coder.tailnet.v2.TelemetryEvent.client_type:type_name -> coder.tailnet.v2.TelemetryEvent.ClientType - 31, // 27: coder.tailnet.v2.TelemetryEvent.p2p_endpoint:type_name -> coder.tailnet.v2.TelemetryEvent.P2PEndpoint - 4, // 28: coder.tailnet.v2.TelemetryEvent.derp_map:type_name -> coder.tailnet.v2.DERPMap - 12, // 29: coder.tailnet.v2.TelemetryEvent.latest_netcheck:type_name -> coder.tailnet.v2.Netcheck - 33, // 30: coder.tailnet.v2.TelemetryEvent.connection_age:type_name -> google.protobuf.Duration - 33, // 31: coder.tailnet.v2.TelemetryEvent.connection_setup:type_name -> google.protobuf.Duration - 33, // 32: coder.tailnet.v2.TelemetryEvent.p2p_setup:type_name -> google.protobuf.Duration - 33, // 33: coder.tailnet.v2.TelemetryEvent.derp_latency:type_name -> google.protobuf.Duration - 33, // 34: coder.tailnet.v2.TelemetryEvent.p2p_latency:type_name -> google.protobuf.Duration - 35, // 35: coder.tailnet.v2.TelemetryEvent.throughput_mbits:type_name -> google.protobuf.FloatValue - 13, // 36: coder.tailnet.v2.TelemetryRequest.events:type_name -> coder.tailnet.v2.TelemetryEvent - 19, // 37: coder.tailnet.v2.DERPMap.HomeParams.region_score:type_name -> coder.tailnet.v2.DERPMap.HomeParams.RegionScoreEntry - 20, // 38: coder.tailnet.v2.DERPMap.Region.nodes:type_name -> coder.tailnet.v2.DERPMap.Region.Node - 17, // 39: coder.tailnet.v2.DERPMap.RegionsEntry.value:type_name -> coder.tailnet.v2.DERPMap.Region - 6, // 40: coder.tailnet.v2.CoordinateRequest.UpdateSelf.node:type_name -> coder.tailnet.v2.Node - 6, // 41: coder.tailnet.v2.CoordinateResponse.PeerUpdate.node:type_name -> coder.tailnet.v2.Node - 0, // 42: coder.tailnet.v2.CoordinateResponse.PeerUpdate.kind:type_name -> coder.tailnet.v2.CoordinateResponse.PeerUpdate.Kind - 33, // 43: coder.tailnet.v2.Netcheck.RegionV4LatencyEntry.value:type_name -> google.protobuf.Duration - 33, // 44: coder.tailnet.v2.Netcheck.RegionV6LatencyEntry.value:type_name -> google.protobuf.Duration - 11, // 45: coder.tailnet.v2.Netcheck.NetcheckIP.fields:type_name -> coder.tailnet.v2.IPFields - 11, // 46: coder.tailnet.v2.TelemetryEvent.P2PEndpoint.fields:type_name -> coder.tailnet.v2.IPFields - 14, // 47: coder.tailnet.v2.Tailnet.PostTelemetry:input_type -> coder.tailnet.v2.TelemetryRequest - 5, // 48: coder.tailnet.v2.Tailnet.StreamDERPMaps:input_type -> coder.tailnet.v2.StreamDERPMapsRequest - 7, // 49: coder.tailnet.v2.Tailnet.RefreshResumeToken:input_type -> coder.tailnet.v2.RefreshResumeTokenRequest - 9, // 50: coder.tailnet.v2.Tailnet.Coordinate:input_type -> coder.tailnet.v2.CoordinateRequest - 15, // 51: coder.tailnet.v2.Tailnet.PostTelemetry:output_type -> coder.tailnet.v2.TelemetryResponse - 4, // 52: coder.tailnet.v2.Tailnet.StreamDERPMaps:output_type -> coder.tailnet.v2.DERPMap - 8, // 53: coder.tailnet.v2.Tailnet.RefreshResumeToken:output_type -> coder.tailnet.v2.RefreshResumeTokenResponse - 10, // 54: coder.tailnet.v2.Tailnet.Coordinate:output_type -> coder.tailnet.v2.CoordinateResponse - 51, // [51:55] is the sub-list for method output_type - 47, // [47:51] is the sub-list for method input_type - 47, // [47:47] is the sub-list for extension type_name - 47, // [47:47] is the sub-list for extension extendee - 0, // [0:47] is the sub-list for field type_name + 36, // 27: coder.tailnet.v2.TelemetryEvent.p2p_endpoint:type_name -> coder.tailnet.v2.TelemetryEvent.P2PEndpoint + 5, // 28: coder.tailnet.v2.TelemetryEvent.derp_map:type_name -> coder.tailnet.v2.DERPMap + 13, // 29: coder.tailnet.v2.TelemetryEvent.latest_netcheck:type_name -> coder.tailnet.v2.Netcheck + 38, // 30: coder.tailnet.v2.TelemetryEvent.connection_age:type_name -> google.protobuf.Duration + 38, // 31: coder.tailnet.v2.TelemetryEvent.connection_setup:type_name -> google.protobuf.Duration + 38, // 32: coder.tailnet.v2.TelemetryEvent.p2p_setup:type_name -> google.protobuf.Duration + 38, // 33: coder.tailnet.v2.TelemetryEvent.derp_latency:type_name -> google.protobuf.Duration + 38, // 34: coder.tailnet.v2.TelemetryEvent.p2p_latency:type_name -> google.protobuf.Duration + 40, // 35: coder.tailnet.v2.TelemetryEvent.throughput_mbits:type_name -> google.protobuf.FloatValue + 14, // 36: coder.tailnet.v2.TelemetryRequest.events:type_name -> coder.tailnet.v2.TelemetryEvent + 19, // 37: coder.tailnet.v2.WorkspaceUpdate.upserted_workspaces:type_name -> coder.tailnet.v2.Workspace + 20, // 38: coder.tailnet.v2.WorkspaceUpdate.upserted_agents:type_name -> coder.tailnet.v2.Agent + 19, // 39: coder.tailnet.v2.WorkspaceUpdate.deleted_workspaces:type_name -> coder.tailnet.v2.Workspace + 20, // 40: coder.tailnet.v2.WorkspaceUpdate.deleted_agents:type_name -> coder.tailnet.v2.Agent + 4, // 41: coder.tailnet.v2.Workspace.status:type_name -> coder.tailnet.v2.Workspace.Status + 24, // 42: coder.tailnet.v2.DERPMap.HomeParams.region_score:type_name -> coder.tailnet.v2.DERPMap.HomeParams.RegionScoreEntry + 25, // 43: coder.tailnet.v2.DERPMap.Region.nodes:type_name -> coder.tailnet.v2.DERPMap.Region.Node + 22, // 44: coder.tailnet.v2.DERPMap.RegionsEntry.value:type_name -> coder.tailnet.v2.DERPMap.Region + 7, // 45: coder.tailnet.v2.CoordinateRequest.UpdateSelf.node:type_name -> coder.tailnet.v2.Node + 7, // 46: coder.tailnet.v2.CoordinateResponse.PeerUpdate.node:type_name -> coder.tailnet.v2.Node + 0, // 47: coder.tailnet.v2.CoordinateResponse.PeerUpdate.kind:type_name -> coder.tailnet.v2.CoordinateResponse.PeerUpdate.Kind + 38, // 48: coder.tailnet.v2.Netcheck.RegionV4LatencyEntry.value:type_name -> google.protobuf.Duration + 38, // 49: coder.tailnet.v2.Netcheck.RegionV6LatencyEntry.value:type_name -> google.protobuf.Duration + 12, // 50: coder.tailnet.v2.Netcheck.NetcheckIP.fields:type_name -> coder.tailnet.v2.IPFields + 12, // 51: coder.tailnet.v2.TelemetryEvent.P2PEndpoint.fields:type_name -> coder.tailnet.v2.IPFields + 15, // 52: coder.tailnet.v2.Tailnet.PostTelemetry:input_type -> coder.tailnet.v2.TelemetryRequest + 6, // 53: coder.tailnet.v2.Tailnet.StreamDERPMaps:input_type -> coder.tailnet.v2.StreamDERPMapsRequest + 8, // 54: coder.tailnet.v2.Tailnet.RefreshResumeToken:input_type -> coder.tailnet.v2.RefreshResumeTokenRequest + 10, // 55: coder.tailnet.v2.Tailnet.Coordinate:input_type -> coder.tailnet.v2.CoordinateRequest + 17, // 56: coder.tailnet.v2.Tailnet.WorkspaceUpdates:input_type -> coder.tailnet.v2.WorkspaceUpdatesRequest + 16, // 57: coder.tailnet.v2.Tailnet.PostTelemetry:output_type -> coder.tailnet.v2.TelemetryResponse + 5, // 58: coder.tailnet.v2.Tailnet.StreamDERPMaps:output_type -> coder.tailnet.v2.DERPMap + 9, // 59: coder.tailnet.v2.Tailnet.RefreshResumeToken:output_type -> coder.tailnet.v2.RefreshResumeTokenResponse + 11, // 60: coder.tailnet.v2.Tailnet.Coordinate:output_type -> coder.tailnet.v2.CoordinateResponse + 18, // 61: coder.tailnet.v2.Tailnet.WorkspaceUpdates:output_type -> coder.tailnet.v2.WorkspaceUpdate + 57, // [57:62] is the sub-list for method output_type + 52, // [52:57] is the sub-list for method input_type + 52, // [52:52] is the sub-list for extension type_name + 52, // [52:52] is the sub-list for extension extendee + 0, // [0:52] is the sub-list for field type_name } func init() { file_tailnet_proto_tailnet_proto_init() } @@ -2468,7 +2848,7 @@ func file_tailnet_proto_tailnet_proto_init() { } } file_tailnet_proto_tailnet_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*DERPMap_HomeParams); i { + switch v := v.(*WorkspaceUpdatesRequest); i { case 0: return &v.state case 1: @@ -2480,7 +2860,31 @@ func file_tailnet_proto_tailnet_proto_init() { } } file_tailnet_proto_tailnet_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*DERPMap_Region); i { + switch v := v.(*WorkspaceUpdate); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tailnet_proto_tailnet_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Workspace); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tailnet_proto_tailnet_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Agent); i { case 0: return &v.state case 1: @@ -2492,7 +2896,7 @@ func file_tailnet_proto_tailnet_proto_init() { } } file_tailnet_proto_tailnet_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*DERPMap_Region_Node); i { + switch v := v.(*DERPMap_HomeParams); i { case 0: return &v.state case 1: @@ -2503,8 +2907,8 @@ func file_tailnet_proto_tailnet_proto_init() { return nil } } - file_tailnet_proto_tailnet_proto_msgTypes[19].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CoordinateRequest_UpdateSelf); i { + file_tailnet_proto_tailnet_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DERPMap_Region); i { case 0: return &v.state case 1: @@ -2516,31 +2920,7 @@ func file_tailnet_proto_tailnet_proto_init() { } } file_tailnet_proto_tailnet_proto_msgTypes[20].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CoordinateRequest_Disconnect); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_tailnet_proto_tailnet_proto_msgTypes[21].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CoordinateRequest_Tunnel); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_tailnet_proto_tailnet_proto_msgTypes[22].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CoordinateRequest_ReadyForHandshake); i { + switch v := v.(*DERPMap_Region_Node); i { case 0: return &v.state case 1: @@ -2552,7 +2932,31 @@ func file_tailnet_proto_tailnet_proto_init() { } } file_tailnet_proto_tailnet_proto_msgTypes[23].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CoordinateResponse_PeerUpdate); i { + switch v := v.(*CoordinateRequest_UpdateSelf); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tailnet_proto_tailnet_proto_msgTypes[24].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CoordinateRequest_Disconnect); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tailnet_proto_tailnet_proto_msgTypes[25].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CoordinateRequest_Tunnel); i { case 0: return &v.state case 1: @@ -2564,7 +2968,7 @@ func file_tailnet_proto_tailnet_proto_init() { } } file_tailnet_proto_tailnet_proto_msgTypes[26].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Netcheck_NetcheckIP); i { + switch v := v.(*CoordinateRequest_ReadyForHandshake); i { case 0: return &v.state case 1: @@ -2576,6 +2980,30 @@ func file_tailnet_proto_tailnet_proto_init() { } } file_tailnet_proto_tailnet_proto_msgTypes[27].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CoordinateResponse_PeerUpdate); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tailnet_proto_tailnet_proto_msgTypes[30].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Netcheck_NetcheckIP); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tailnet_proto_tailnet_proto_msgTypes[31].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*TelemetryEvent_P2PEndpoint); i { case 0: return &v.state @@ -2593,8 +3021,8 @@ func file_tailnet_proto_tailnet_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_tailnet_proto_tailnet_proto_rawDesc, - NumEnums: 4, - NumMessages: 28, + NumEnums: 5, + NumMessages: 32, NumExtensions: 0, NumServices: 1, }, diff --git a/tailnet/proto/tailnet.proto b/tailnet/proto/tailnet.proto index b375ead7c7..55af05c08a 100644 --- a/tailnet/proto/tailnet.proto +++ b/tailnet/proto/tailnet.proto @@ -198,9 +198,47 @@ message TelemetryRequest { message TelemetryResponse {} +message WorkspaceUpdatesRequest { + bytes workspace_owner_id = 1; // UUID +} + +message WorkspaceUpdate { + repeated Workspace upserted_workspaces = 1; + repeated Agent upserted_agents = 2; + repeated Workspace deleted_workspaces = 3; + repeated Agent deleted_agents = 4; +} + +message Workspace { + bytes id = 1; // UUID + string name = 2; + + enum Status { + UNKNOWN = 0; + PENDING = 1; + STARTING = 2; + RUNNING = 3; + STOPPING = 4; + STOPPED = 5; + FAILED = 6; + CANCELING = 7; + CANCELED = 8; + DELETING = 9; + DELETED = 10; + } + Status status = 3; +} + +message Agent { + bytes id = 1; // UUID + string name = 2; + bytes workspace_id = 3; // UUID +} + service Tailnet { rpc PostTelemetry(TelemetryRequest) returns (TelemetryResponse); rpc StreamDERPMaps(StreamDERPMapsRequest) returns (stream DERPMap); rpc RefreshResumeToken(RefreshResumeTokenRequest) returns (RefreshResumeTokenResponse); rpc Coordinate(stream CoordinateRequest) returns (stream CoordinateResponse); + rpc WorkspaceUpdates(WorkspaceUpdatesRequest) returns (stream WorkspaceUpdate); } diff --git a/tailnet/proto/tailnet_drpc.pb.go b/tailnet/proto/tailnet_drpc.pb.go index c0c3fcef65..9dac4c06f3 100644 --- a/tailnet/proto/tailnet_drpc.pb.go +++ b/tailnet/proto/tailnet_drpc.pb.go @@ -42,6 +42,7 @@ type DRPCTailnetClient interface { StreamDERPMaps(ctx context.Context, in *StreamDERPMapsRequest) (DRPCTailnet_StreamDERPMapsClient, error) RefreshResumeToken(ctx context.Context, in *RefreshResumeTokenRequest) (*RefreshResumeTokenResponse, error) Coordinate(ctx context.Context) (DRPCTailnet_CoordinateClient, error) + WorkspaceUpdates(ctx context.Context, in *WorkspaceUpdatesRequest) (DRPCTailnet_WorkspaceUpdatesClient, error) } type drpcTailnetClient struct { @@ -151,11 +152,52 @@ func (x *drpcTailnet_CoordinateClient) RecvMsg(m *CoordinateResponse) error { return x.MsgRecv(m, drpcEncoding_File_tailnet_proto_tailnet_proto{}) } +func (c *drpcTailnetClient) WorkspaceUpdates(ctx context.Context, in *WorkspaceUpdatesRequest) (DRPCTailnet_WorkspaceUpdatesClient, error) { + stream, err := c.cc.NewStream(ctx, "/coder.tailnet.v2.Tailnet/WorkspaceUpdates", drpcEncoding_File_tailnet_proto_tailnet_proto{}) + if err != nil { + return nil, err + } + x := &drpcTailnet_WorkspaceUpdatesClient{stream} + if err := x.MsgSend(in, drpcEncoding_File_tailnet_proto_tailnet_proto{}); err != nil { + return nil, err + } + if err := x.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type DRPCTailnet_WorkspaceUpdatesClient interface { + drpc.Stream + Recv() (*WorkspaceUpdate, error) +} + +type drpcTailnet_WorkspaceUpdatesClient struct { + drpc.Stream +} + +func (x *drpcTailnet_WorkspaceUpdatesClient) GetStream() drpc.Stream { + return x.Stream +} + +func (x *drpcTailnet_WorkspaceUpdatesClient) Recv() (*WorkspaceUpdate, error) { + m := new(WorkspaceUpdate) + if err := x.MsgRecv(m, drpcEncoding_File_tailnet_proto_tailnet_proto{}); err != nil { + return nil, err + } + return m, nil +} + +func (x *drpcTailnet_WorkspaceUpdatesClient) RecvMsg(m *WorkspaceUpdate) error { + return x.MsgRecv(m, drpcEncoding_File_tailnet_proto_tailnet_proto{}) +} + type DRPCTailnetServer interface { PostTelemetry(context.Context, *TelemetryRequest) (*TelemetryResponse, error) StreamDERPMaps(*StreamDERPMapsRequest, DRPCTailnet_StreamDERPMapsStream) error RefreshResumeToken(context.Context, *RefreshResumeTokenRequest) (*RefreshResumeTokenResponse, error) Coordinate(DRPCTailnet_CoordinateStream) error + WorkspaceUpdates(*WorkspaceUpdatesRequest, DRPCTailnet_WorkspaceUpdatesStream) error } type DRPCTailnetUnimplementedServer struct{} @@ -176,9 +218,13 @@ func (s *DRPCTailnetUnimplementedServer) Coordinate(DRPCTailnet_CoordinateStream return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } +func (s *DRPCTailnetUnimplementedServer) WorkspaceUpdates(*WorkspaceUpdatesRequest, DRPCTailnet_WorkspaceUpdatesStream) error { + return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) +} + type DRPCTailnetDescription struct{} -func (DRPCTailnetDescription) NumMethods() int { return 4 } +func (DRPCTailnetDescription) NumMethods() int { return 5 } func (DRPCTailnetDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) { switch n { @@ -217,6 +263,15 @@ func (DRPCTailnetDescription) Method(n int) (string, drpc.Encoding, drpc.Receive &drpcTailnet_CoordinateStream{in1.(drpc.Stream)}, ) }, DRPCTailnetServer.Coordinate, true + case 4: + return "/coder.tailnet.v2.Tailnet/WorkspaceUpdates", drpcEncoding_File_tailnet_proto_tailnet_proto{}, + func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { + return nil, srv.(DRPCTailnetServer). + WorkspaceUpdates( + in1.(*WorkspaceUpdatesRequest), + &drpcTailnet_WorkspaceUpdatesStream{in2.(drpc.Stream)}, + ) + }, DRPCTailnetServer.WorkspaceUpdates, true default: return "", nil, nil, nil, false } @@ -296,3 +351,16 @@ func (x *drpcTailnet_CoordinateStream) Recv() (*CoordinateRequest, error) { func (x *drpcTailnet_CoordinateStream) RecvMsg(m *CoordinateRequest) error { return x.MsgRecv(m, drpcEncoding_File_tailnet_proto_tailnet_proto{}) } + +type DRPCTailnet_WorkspaceUpdatesStream interface { + drpc.Stream + Send(*WorkspaceUpdate) error +} + +type drpcTailnet_WorkspaceUpdatesStream struct { + drpc.Stream +} + +func (x *drpcTailnet_WorkspaceUpdatesStream) Send(m *WorkspaceUpdate) error { + return x.MsgSend(m, drpcEncoding_File_tailnet_proto_tailnet_proto{}) +} diff --git a/tailnet/service.go b/tailnet/service.go index 7f38f63a58..cfbbb77a98 100644 --- a/tailnet/service.go +++ b/tailnet/service.go @@ -39,13 +39,28 @@ func WithStreamID(ctx context.Context, streamID StreamID) context.Context { return context.WithValue(ctx, streamIDContextKey{}, streamID) } +type WorkspaceUpdatesProvider interface { + io.Closer + Subscribe(ctx context.Context, userID uuid.UUID) (Subscription, error) +} + +type Subscription interface { + io.Closer + Updates() <-chan *proto.WorkspaceUpdate +} + +type TunnelAuthorizer interface { + AuthorizeTunnel(ctx context.Context, agentID uuid.UUID) error +} + type ClientServiceOptions struct { - Logger slog.Logger - CoordPtr *atomic.Pointer[Coordinator] - DERPMapUpdateFrequency time.Duration - DERPMapFn func() *tailcfg.DERPMap - NetworkTelemetryHandler func(batch []*proto.TelemetryEvent) - ResumeTokenProvider ResumeTokenProvider + Logger slog.Logger + CoordPtr *atomic.Pointer[Coordinator] + DERPMapUpdateFrequency time.Duration + DERPMapFn func() *tailcfg.DERPMap + NetworkTelemetryHandler func(batch []*proto.TelemetryEvent) + ResumeTokenProvider ResumeTokenProvider + WorkspaceUpdatesProvider WorkspaceUpdatesProvider } // ClientService is a tailnet coordination service that accepts a connection and version from a @@ -64,12 +79,13 @@ func NewClientService(options ClientServiceOptions) ( s := &ClientService{Logger: options.Logger, CoordPtr: options.CoordPtr} mux := drpcmux.New() drpcService := &DRPCService{ - CoordPtr: options.CoordPtr, - Logger: options.Logger, - DerpMapUpdateFrequency: options.DERPMapUpdateFrequency, - DerpMapFn: options.DERPMapFn, - NetworkTelemetryHandler: options.NetworkTelemetryHandler, - ResumeTokenProvider: options.ResumeTokenProvider, + CoordPtr: options.CoordPtr, + Logger: options.Logger, + DerpMapUpdateFrequency: options.DERPMapUpdateFrequency, + DerpMapFn: options.DERPMapFn, + NetworkTelemetryHandler: options.NetworkTelemetryHandler, + ResumeTokenProvider: options.ResumeTokenProvider, + WorkspaceUpdatesProvider: options.WorkspaceUpdatesProvider, } err := proto.DRPCRegisterTailnet(mux, drpcService) if err != nil { @@ -89,7 +105,7 @@ func NewClientService(options ClientServiceOptions) ( return s, nil } -func (s *ClientService) ServeClient(ctx context.Context, version string, conn net.Conn, id uuid.UUID, agent uuid.UUID) error { +func (s *ClientService) ServeClient(ctx context.Context, version string, conn net.Conn, streamID StreamID) error { major, _, err := apiversion.Parse(version) if err != nil { s.Logger.Warn(ctx, "serve client called with unparsable version", slog.Error(err)) @@ -97,12 +113,6 @@ func (s *ClientService) ServeClient(ctx context.Context, version string, conn ne } switch major { case 2: - auth := ClientCoordinateeAuth{AgentID: agent} - streamID := StreamID{ - Name: "client", - ID: id, - Auth: auth, - } return s.ServeConnV2(ctx, conn, streamID) default: s.Logger.Warn(ctx, "serve client called with unsupported version", slog.F("version", version)) @@ -125,12 +135,13 @@ func (s ClientService) ServeConnV2(ctx context.Context, conn net.Conn, streamID // DRPCService is the dRPC-based, version 2.x of the tailnet API and implements proto.DRPCClientServer type DRPCService struct { - CoordPtr *atomic.Pointer[Coordinator] - Logger slog.Logger - DerpMapUpdateFrequency time.Duration - DerpMapFn func() *tailcfg.DERPMap - NetworkTelemetryHandler func(batch []*proto.TelemetryEvent) - ResumeTokenProvider ResumeTokenProvider + CoordPtr *atomic.Pointer[Coordinator] + Logger slog.Logger + DerpMapUpdateFrequency time.Duration + DerpMapFn func() *tailcfg.DERPMap + NetworkTelemetryHandler func(batch []*proto.TelemetryEvent) + ResumeTokenProvider ResumeTokenProvider + WorkspaceUpdatesProvider WorkspaceUpdatesProvider } func (s *DRPCService) PostTelemetry(_ context.Context, req *proto.TelemetryRequest) (*proto.TelemetryResponse, error) { @@ -205,6 +216,38 @@ func (s *DRPCService) Coordinate(stream proto.DRPCTailnet_CoordinateStream) erro return nil } +func (s *DRPCService) WorkspaceUpdates(req *proto.WorkspaceUpdatesRequest, stream proto.DRPCTailnet_WorkspaceUpdatesStream) error { + defer stream.Close() + + ctx := stream.Context() + + ownerID, err := uuid.FromBytes(req.WorkspaceOwnerId) + if err != nil { + return xerrors.Errorf("parse workspace owner ID: %w", err) + } + + sub, err := s.WorkspaceUpdatesProvider.Subscribe(ctx, ownerID) + if err != nil { + return xerrors.Errorf("subscribe to workspace updates: %w", err) + } + defer sub.Close() + + for { + select { + case updates, ok := <-sub.Updates(): + if !ok { + return nil + } + err := stream.Send(updates) + if err != nil { + return xerrors.Errorf("send workspace update: %w", err) + } + case <-stream.Context().Done(): + return nil + } + } +} + type communicator struct { logger slog.Logger stream proto.DRPCTailnet_CoordinateStream diff --git a/tailnet/service_test.go b/tailnet/service_test.go index 0f4b4795c4..f5a01cc2fb 100644 --- a/tailnet/service_test.go +++ b/tailnet/service_test.go @@ -1,6 +1,7 @@ package tailnet_test import ( + "context" "io" "net" "sync/atomic" @@ -52,7 +53,13 @@ func TestClientService_ServeClient_V2(t *testing.T) { agentID := uuid.MustParse("20000001-0000-0000-0000-000000000000") errCh := make(chan error, 1) go func() { - err := uut.ServeClient(ctx, "2.0", s, clientID, agentID) + err := uut.ServeClient(ctx, "2.0", s, tailnet.StreamID{ + Name: "client", + ID: clientID, + Auth: tailnet.ClientCoordinateeAuth{ + AgentID: agentID, + }, + }) t.Logf("ServeClient returned; err=%v", err) errCh <- err }() @@ -74,7 +81,7 @@ func TestClientService_ServeClient_V2(t *testing.T) { require.NotNil(t, call) require.Equal(t, call.ID, clientID) require.Equal(t, call.Name, "client") - require.NoError(t, call.Auth.Authorize(&proto.CoordinateRequest{ + require.NoError(t, call.Auth.Authorize(ctx, &proto.CoordinateRequest{ AddTunnel: &proto.CoordinateRequest_Tunnel{Id: agentID[:]}, })) req := testutil.RequireRecvCtx(ctx, t, call.Reqs) @@ -157,7 +164,13 @@ func TestClientService_ServeClient_V1(t *testing.T) { agentID := uuid.MustParse("20000001-0000-0000-0000-000000000000") errCh := make(chan error, 1) go func() { - err := uut.ServeClient(ctx, "1.0", s, clientID, agentID) + err := uut.ServeClient(ctx, "1.0", s, tailnet.StreamID{ + Name: "client", + ID: clientID, + Auth: tailnet.ClientCoordinateeAuth{ + AgentID: agentID, + }, + }) t.Logf("ServeClient returned; err=%v", err) errCh <- err }() @@ -213,3 +226,170 @@ func TestNetworkTelemetryBatcher(t *testing.T) { require.Equal(t, "5", string(batch[0].Id)) require.Equal(t, "6", string(batch[1].Id)) } + +func TestClientUserCoordinateeAuth(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + + agentID := uuid.UUID{0x01} + agentID2 := uuid.UUID{0x02} + clientID := uuid.UUID{0x03} + + updatesCh := make(chan *proto.WorkspaceUpdate, 1) + updatesProvider := &fakeUpdatesProvider{ch: updatesCh} + + fCoord, client := createUpdateService(t, ctx, clientID, updatesProvider) + + // Coordinate + stream, err := client.Coordinate(ctx) + require.NoError(t, err) + defer stream.Close() + + err = stream.Send(&proto.CoordinateRequest{ + UpdateSelf: &proto.CoordinateRequest_UpdateSelf{Node: &proto.Node{PreferredDerp: 11}}, + }) + require.NoError(t, err) + + call := testutil.RequireRecvCtx(ctx, t, fCoord.CoordinateCalls) + require.NotNil(t, call) + require.Equal(t, call.ID, clientID) + require.Equal(t, call.Name, "client") + req := testutil.RequireRecvCtx(ctx, t, call.Reqs) + require.Equal(t, int32(11), req.GetUpdateSelf().GetNode().GetPreferredDerp()) + + // Authorize uses `ClientUserCoordinateeAuth` + require.NoError(t, call.Auth.Authorize(ctx, &proto.CoordinateRequest{ + AddTunnel: &proto.CoordinateRequest_Tunnel{Id: tailnet.UUIDToByteSlice(agentID)}, + })) + require.Error(t, call.Auth.Authorize(ctx, &proto.CoordinateRequest{ + AddTunnel: &proto.CoordinateRequest_Tunnel{Id: tailnet.UUIDToByteSlice(agentID2)}, + })) +} + +func TestWorkspaceUpdates(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + updatesCh := make(chan *proto.WorkspaceUpdate, 1) + updatesProvider := &fakeUpdatesProvider{ch: updatesCh} + + clientID := uuid.UUID{0x03} + wsID := uuid.UUID{0x04} + + _, client := createUpdateService(t, ctx, clientID, updatesProvider) + + // Workspace updates + expected := &proto.WorkspaceUpdate{ + UpsertedWorkspaces: []*proto.Workspace{ + { + Id: tailnet.UUIDToByteSlice(wsID), + Name: "ws1", + Status: proto.Workspace_RUNNING, + }, + }, + UpsertedAgents: []*proto.Agent{}, + DeletedWorkspaces: []*proto.Workspace{}, + DeletedAgents: []*proto.Agent{}, + } + updatesCh <- expected + + updatesStream, err := client.WorkspaceUpdates(ctx, &proto.WorkspaceUpdatesRequest{ + WorkspaceOwnerId: tailnet.UUIDToByteSlice(clientID), + }) + require.NoError(t, err) + defer updatesStream.Close() + + updates, err := updatesStream.Recv() + require.NoError(t, err) + require.Len(t, updates.GetUpsertedWorkspaces(), 1) + require.Equal(t, expected.GetUpsertedWorkspaces()[0].GetName(), updates.GetUpsertedWorkspaces()[0].GetName()) + require.Equal(t, expected.GetUpsertedWorkspaces()[0].GetStatus(), updates.GetUpsertedWorkspaces()[0].GetStatus()) + require.Equal(t, expected.GetUpsertedWorkspaces()[0].GetId(), updates.GetUpsertedWorkspaces()[0].GetId()) +} + +//nolint:revive // t takes precedence +func createUpdateService(t *testing.T, ctx context.Context, clientID uuid.UUID, updates tailnet.WorkspaceUpdatesProvider) (*tailnettest.FakeCoordinator, proto.DRPCTailnetClient) { + fCoord := tailnettest.NewFakeCoordinator() + var coord tailnet.Coordinator = fCoord + coordPtr := atomic.Pointer[tailnet.Coordinator]{} + coordPtr.Store(&coord) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + + uut, err := tailnet.NewClientService(tailnet.ClientServiceOptions{ + Logger: logger, + CoordPtr: &coordPtr, + WorkspaceUpdatesProvider: updates, + }) + require.NoError(t, err) + + c, s := net.Pipe() + t.Cleanup(func() { + _ = c.Close() + _ = s.Close() + }) + + errCh := make(chan error, 1) + go func() { + err := uut.ServeClient(ctx, "2.0", s, tailnet.StreamID{ + Name: "client", + ID: clientID, + Auth: tailnet.ClientUserCoordinateeAuth{ + Auth: &fakeTunnelAuth{}, + }, + }) + t.Logf("ServeClient returned; err=%v", err) + errCh <- err + }() + + client, err := tailnet.NewDRPCClient(c, logger) + require.NoError(t, err) + + t.Cleanup(func() { + err = c.Close() + require.NoError(t, err) + err = testutil.RequireRecvCtx(ctx, t, errCh) + require.True(t, xerrors.Is(err, io.EOF) || xerrors.Is(err, io.ErrClosedPipe)) + }) + return fCoord, client +} + +type fakeUpdatesProvider struct { + ch chan *proto.WorkspaceUpdate +} + +func (*fakeUpdatesProvider) Close() error { + return nil +} + +func (f *fakeUpdatesProvider) Subscribe(context.Context, uuid.UUID) (tailnet.Subscription, error) { + return &fakeSubscription{ch: f.ch}, nil +} + +type fakeSubscription struct { + ch chan *proto.WorkspaceUpdate +} + +func (*fakeSubscription) Close() error { + return nil +} + +func (f *fakeSubscription) Updates() <-chan *proto.WorkspaceUpdate { + return f.ch +} + +var _ tailnet.Subscription = (*fakeSubscription)(nil) + +var _ tailnet.WorkspaceUpdatesProvider = (*fakeUpdatesProvider)(nil) + +type fakeTunnelAuth struct{} + +// AuthorizeTunnel implements tailnet.TunnelAuthorizer. +func (*fakeTunnelAuth) AuthorizeTunnel(_ context.Context, agentID uuid.UUID) error { + if agentID[0] != 1 { + return xerrors.New("policy disallows request") + } + return nil +} + +var _ tailnet.TunnelAuthorizer = (*fakeTunnelAuth)(nil) diff --git a/tailnet/test/peer.go b/tailnet/test/peer.go index ce9a507499..9426beac86 100644 --- a/tailnet/test/peer.go +++ b/tailnet/test/peer.go @@ -370,3 +370,20 @@ func (p *Peer) UngracefulDisconnect(ctx context.Context) { close(p.reqs) p.Close(ctx) } + +type FakeSubjectKey struct{} + +type FakeCoordinateeAuth struct { + Chan chan struct{} +} + +func (f FakeCoordinateeAuth) Authorize(ctx context.Context, _ *proto.CoordinateRequest) error { + _, ok := ctx.Value(FakeSubjectKey{}).(struct{}) + if !ok { + return xerrors.New("unauthorized") + } + f.Chan <- struct{}{} + return nil +} + +var _ tailnet.CoordinateeAuth = (*FakeCoordinateeAuth)(nil) diff --git a/tailnet/tunnel.go b/tailnet/tunnel.go index 3e55abb955..c1335f4c17 100644 --- a/tailnet/tunnel.go +++ b/tailnet/tunnel.go @@ -1,6 +1,7 @@ package tailnet import ( + "context" "net/netip" "github.com/google/uuid" @@ -12,13 +13,13 @@ import ( var legacyWorkspaceAgentIP = netip.MustParseAddr("fd7a:115c:a1e0:49d6:b259:b7ac:b1b2:48f4") type CoordinateeAuth interface { - Authorize(req *proto.CoordinateRequest) error + Authorize(ctx context.Context, req *proto.CoordinateRequest) error } // SingleTailnetCoordinateeAuth allows all tunnels, since Coderd and wsproxy are allowed to initiate a tunnel to any agent type SingleTailnetCoordinateeAuth struct{} -func (SingleTailnetCoordinateeAuth) Authorize(*proto.CoordinateRequest) error { +func (SingleTailnetCoordinateeAuth) Authorize(context.Context, *proto.CoordinateRequest) error { return nil } @@ -27,7 +28,7 @@ type ClientCoordinateeAuth struct { AgentID uuid.UUID } -func (c ClientCoordinateeAuth) Authorize(req *proto.CoordinateRequest) error { +func (c ClientCoordinateeAuth) Authorize(_ context.Context, req *proto.CoordinateRequest) error { if tun := req.GetAddTunnel(); tun != nil { uid, err := uuid.FromBytes(tun.Id) if err != nil { @@ -39,24 +40,7 @@ func (c ClientCoordinateeAuth) Authorize(req *proto.CoordinateRequest) error { } } - if upd := req.GetUpdateSelf(); upd != nil { - for _, addrStr := range upd.Node.Addresses { - pre, err := netip.ParsePrefix(addrStr) - if err != nil { - return xerrors.Errorf("parse node address: %w", err) - } - - if pre.Bits() != 128 { - return xerrors.Errorf("invalid address bits, expected 128, got %d", pre.Bits()) - } - } - } - - if rfh := req.GetReadyForHandshake(); rfh != nil { - return xerrors.Errorf("clients may not send ready_for_handshake") - } - - return nil + return handleClientNodeRequests(req) } // AgentCoordinateeAuth disallows all tunnels, since agents are not allowed to initiate their own tunnels @@ -64,7 +48,7 @@ type AgentCoordinateeAuth struct { ID uuid.UUID } -func (a AgentCoordinateeAuth) Authorize(req *proto.CoordinateRequest) error { +func (a AgentCoordinateeAuth) Authorize(_ context.Context, req *proto.CoordinateRequest) error { if tun := req.GetAddTunnel(); tun != nil { return xerrors.New("agents cannot open tunnels") } @@ -91,6 +75,46 @@ func (a AgentCoordinateeAuth) Authorize(req *proto.CoordinateRequest) error { return nil } +type ClientUserCoordinateeAuth struct { + Auth TunnelAuthorizer +} + +func (a ClientUserCoordinateeAuth) Authorize(ctx context.Context, req *proto.CoordinateRequest) error { + if tun := req.GetAddTunnel(); tun != nil { + uid, err := uuid.FromBytes(tun.Id) + if err != nil { + return xerrors.Errorf("parse add tunnel id: %w", err) + } + err = a.Auth.AuthorizeTunnel(ctx, uid) + if err != nil { + return xerrors.Errorf("workspace agent not found or you do not have permission") + } + } + + return handleClientNodeRequests(req) +} + +// handleClientNodeRequests validates GetUpdateSelf requests and declines ReadyForHandshake requests +func handleClientNodeRequests(req *proto.CoordinateRequest) error { + if upd := req.GetUpdateSelf(); upd != nil { + for _, addrStr := range upd.Node.Addresses { + pre, err := netip.ParsePrefix(addrStr) + if err != nil { + return xerrors.Errorf("parse node address: %w", err) + } + + if pre.Bits() != 128 { + return xerrors.Errorf("invalid address bits, expected 128, got %d", pre.Bits()) + } + } + } + + if rfh := req.GetReadyForHandshake(); rfh != nil { + return xerrors.Errorf("clients may not send ready_for_handshake") + } + return nil +} + // tunnelStore contains tunnel information and allows querying it. It is not threadsafe and all // methods must be serialized by holding, e.g. the core mutex. type tunnelStore struct {