mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
feat(agent): add subagent ID fields to devcontainers in manifest (#21848)
Update the agent protobuf schema (agent/proto/agent.proto) to include: - subagent_id field in WorkspaceAgentDevcontainer message - id field in CreateSubAgentRequest message Bump the Agent API version from v2.7 to v2.8 and update all client references throughout the codebase (ConnectRPC27 -> ConnectRPC28, DRPCAgentClient27 -> DRPCAgentClient28).
This commit is contained in:
+22
-22
@@ -108,8 +108,8 @@ type Options struct {
|
||||
}
|
||||
|
||||
type Client interface {
|
||||
ConnectRPC27(ctx context.Context) (
|
||||
proto.DRPCAgentClient27, tailnetproto.DRPCTailnetClient27, error,
|
||||
ConnectRPC28(ctx context.Context) (
|
||||
proto.DRPCAgentClient28, tailnetproto.DRPCTailnetClient28, error,
|
||||
)
|
||||
tailnet.DERPMapRewriter
|
||||
agentsdk.RefreshableSessionTokenProvider
|
||||
@@ -533,7 +533,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
|
||||
fn()
|
||||
}
|
||||
|
||||
func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
|
||||
func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient28) error {
|
||||
tickerDone := make(chan struct{})
|
||||
collectDone := make(chan struct{})
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
@@ -748,7 +748,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient27
|
||||
|
||||
// reportLifecycle reports the current lifecycle state once. All state
|
||||
// changes are reported in order.
|
||||
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
|
||||
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient28) error {
|
||||
for {
|
||||
select {
|
||||
case <-a.lifecycleUpdate:
|
||||
@@ -828,7 +828,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
|
||||
}
|
||||
|
||||
// reportConnectionsLoop reports connections to the agent for auditing.
|
||||
func (a *agent) reportConnectionsLoop(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
|
||||
func (a *agent) reportConnectionsLoop(ctx context.Context, aAPI proto.DRPCAgentClient28) error {
|
||||
for {
|
||||
select {
|
||||
case <-a.reportConnectionsUpdate:
|
||||
@@ -963,7 +963,7 @@ func (a *agent) reportConnection(id uuid.UUID, connectionType proto.Connection_T
|
||||
// fetchServiceBannerLoop fetches the service banner on an interval. It will
|
||||
// not be fetched immediately; the expectation is that it is primed elsewhere
|
||||
// (and must be done before the session actually starts).
|
||||
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
|
||||
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient28) error {
|
||||
ticker := time.NewTicker(a.announcementBannersRefreshInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
@@ -998,7 +998,7 @@ func (a *agent) run() (retErr error) {
|
||||
}
|
||||
|
||||
// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
|
||||
aAPI, tAPI, err := a.client.ConnectRPC27(a.hardCtx)
|
||||
aAPI, tAPI, err := a.client.ConnectRPC28(a.hardCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1015,7 +1015,7 @@ func (a *agent) run() (retErr error) {
|
||||
connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, aAPI, tAPI)
|
||||
|
||||
connMan.startAgentAPI("init notification banners", gracefulShutdownBehaviorStop,
|
||||
func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
|
||||
func(ctx context.Context, aAPI proto.DRPCAgentClient28) error {
|
||||
bannersProto, err := aAPI.GetAnnouncementBanners(ctx, &proto.GetAnnouncementBannersRequest{})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("fetch service banner: %w", err)
|
||||
@@ -1032,7 +1032,7 @@ func (a *agent) run() (retErr error) {
|
||||
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
|
||||
// shutdown scripts.
|
||||
connMan.startAgentAPI("send logs", gracefulShutdownBehaviorRemain,
|
||||
func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
|
||||
func(ctx context.Context, aAPI proto.DRPCAgentClient28) error {
|
||||
err := a.logSender.SendLoop(ctx, aAPI)
|
||||
if xerrors.Is(err, agentsdk.ErrLogLimitExceeded) {
|
||||
// we don't want this error to tear down the API connection and propagate to the
|
||||
@@ -1046,7 +1046,7 @@ func (a *agent) run() (retErr error) {
|
||||
// Forward boundary audit logs to coderd if boundary log forwarding is enabled.
|
||||
// These are audit logs so they should continue during graceful shutdown.
|
||||
if a.boundaryLogProxy != nil {
|
||||
proxyFunc := func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
|
||||
proxyFunc := func(ctx context.Context, aAPI proto.DRPCAgentClient28) error {
|
||||
return a.boundaryLogProxy.RunForwarder(ctx, aAPI)
|
||||
}
|
||||
connMan.startAgentAPI("boundary log proxy", gracefulShutdownBehaviorRemain, proxyFunc)
|
||||
@@ -1060,7 +1060,7 @@ func (a *agent) run() (retErr error) {
|
||||
connMan.startAgentAPI("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata)
|
||||
|
||||
// resources monitor can cease as soon as we start gracefully shutting down.
|
||||
connMan.startAgentAPI("resources monitor", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
|
||||
connMan.startAgentAPI("resources monitor", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient28) error {
|
||||
logger := a.logger.Named("resources_monitor")
|
||||
clk := quartz.NewReal()
|
||||
config, err := aAPI.GetResourcesMonitoringConfiguration(ctx, &proto.GetResourcesMonitoringConfigurationRequest{})
|
||||
@@ -1107,7 +1107,7 @@ func (a *agent) run() (retErr error) {
|
||||
connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))
|
||||
|
||||
connMan.startAgentAPI("app health reporter", gracefulShutdownBehaviorStop,
|
||||
func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
|
||||
func(ctx context.Context, aAPI proto.DRPCAgentClient28) error {
|
||||
if err := manifestOK.wait(ctx); err != nil {
|
||||
return xerrors.Errorf("no manifest: %w", err)
|
||||
}
|
||||
@@ -1140,7 +1140,7 @@ func (a *agent) run() (retErr error) {
|
||||
|
||||
connMan.startAgentAPI("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)
|
||||
|
||||
connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
|
||||
connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient28) error {
|
||||
if err := networkOK.wait(ctx); err != nil {
|
||||
return xerrors.Errorf("no network: %w", err)
|
||||
}
|
||||
@@ -1155,8 +1155,8 @@ func (a *agent) run() (retErr error) {
|
||||
}
|
||||
|
||||
// handleManifest returns a function that fetches and processes the manifest
|
||||
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
|
||||
return func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
|
||||
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient28) error {
|
||||
return func(ctx context.Context, aAPI proto.DRPCAgentClient28) error {
|
||||
var (
|
||||
sentResult = false
|
||||
err error
|
||||
@@ -1319,7 +1319,7 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
|
||||
|
||||
func (a *agent) createDevcontainer(
|
||||
ctx context.Context,
|
||||
aAPI proto.DRPCAgentClient27,
|
||||
aAPI proto.DRPCAgentClient28,
|
||||
dc codersdk.WorkspaceAgentDevcontainer,
|
||||
script codersdk.WorkspaceAgentScript,
|
||||
) (err error) {
|
||||
@@ -1351,8 +1351,8 @@ func (a *agent) createDevcontainer(
|
||||
|
||||
// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
|
||||
// the tailnet using the information in the manifest
|
||||
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient27) error {
|
||||
return func(ctx context.Context, aAPI proto.DRPCAgentClient27) (retErr error) {
|
||||
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient28) error {
|
||||
return func(ctx context.Context, aAPI proto.DRPCAgentClient28) (retErr error) {
|
||||
if err := manifestOK.wait(ctx); err != nil {
|
||||
return xerrors.Errorf("no manifest: %w", err)
|
||||
}
|
||||
@@ -2146,8 +2146,8 @@ const (
|
||||
|
||||
type apiConnRoutineManager struct {
|
||||
logger slog.Logger
|
||||
aAPI proto.DRPCAgentClient27
|
||||
tAPI tailnetproto.DRPCTailnetClient24
|
||||
aAPI proto.DRPCAgentClient28
|
||||
tAPI tailnetproto.DRPCTailnetClient28
|
||||
eg *errgroup.Group
|
||||
stopCtx context.Context
|
||||
remainCtx context.Context
|
||||
@@ -2155,7 +2155,7 @@ type apiConnRoutineManager struct {
|
||||
|
||||
func newAPIConnRoutineManager(
|
||||
gracefulCtx, hardCtx context.Context, logger slog.Logger,
|
||||
aAPI proto.DRPCAgentClient27, tAPI tailnetproto.DRPCTailnetClient24,
|
||||
aAPI proto.DRPCAgentClient28, tAPI tailnetproto.DRPCTailnetClient28,
|
||||
) *apiConnRoutineManager {
|
||||
// routines that remain in operation during graceful shutdown use the remainCtx. They'll still
|
||||
// exit if the errgroup hits an error, which usually means a problem with the conn.
|
||||
@@ -2188,7 +2188,7 @@ func newAPIConnRoutineManager(
|
||||
// but for Tailnet.
|
||||
func (a *apiConnRoutineManager) startAgentAPI(
|
||||
name string, behavior gracefulShutdownBehavior,
|
||||
f func(context.Context, proto.DRPCAgentClient27) error,
|
||||
f func(context.Context, proto.DRPCAgentClient28) error,
|
||||
) {
|
||||
logger := a.logger.With(slog.F("name", name))
|
||||
var ctx context.Context
|
||||
|
||||
@@ -146,12 +146,12 @@ type SubAgentClient interface {
|
||||
// agent API client.
|
||||
type subAgentAPIClient struct {
|
||||
logger slog.Logger
|
||||
api agentproto.DRPCAgentClient27
|
||||
api agentproto.DRPCAgentClient28
|
||||
}
|
||||
|
||||
var _ SubAgentClient = (*subAgentAPIClient)(nil)
|
||||
|
||||
func NewSubAgentClientFromAPI(logger slog.Logger, agentAPI agentproto.DRPCAgentClient27) SubAgentClient {
|
||||
func NewSubAgentClientFromAPI(logger slog.Logger, agentAPI agentproto.DRPCAgentClient28) SubAgentClient {
|
||||
if agentAPI == nil {
|
||||
panic("developer error: agentAPI cannot be nil")
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ func TestSubAgentClient_CreateWithDisplayApps(t *testing.T) {
|
||||
|
||||
agentAPI := agenttest.NewClient(t, logger, uuid.New(), agentsdk.Manifest{}, statsCh, tailnet.NewCoordinator(logger))
|
||||
|
||||
agentClient, _, err := agentAPI.ConnectRPC27(ctx)
|
||||
agentClient, _, err := agentAPI.ConnectRPC28(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
subAgentClient := agentcontainers.NewSubAgentClientFromAPI(logger, agentClient)
|
||||
@@ -245,7 +245,7 @@ func TestSubAgentClient_CreateWithDisplayApps(t *testing.T) {
|
||||
|
||||
agentAPI := agenttest.NewClient(t, logger, uuid.New(), agentsdk.Manifest{}, statsCh, tailnet.NewCoordinator(logger))
|
||||
|
||||
agentClient, _, err := agentAPI.ConnectRPC27(ctx)
|
||||
agentClient, _, err := agentAPI.ConnectRPC28(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
subAgentClient := agentcontainers.NewSubAgentClientFromAPI(logger, agentClient)
|
||||
|
||||
@@ -124,8 +124,8 @@ func (c *Client) Close() {
|
||||
c.derpMapOnce.Do(func() { close(c.derpMapUpdates) })
|
||||
}
|
||||
|
||||
func (c *Client) ConnectRPC27(ctx context.Context) (
|
||||
agentproto.DRPCAgentClient27, proto.DRPCTailnetClient27, error,
|
||||
func (c *Client) ConnectRPC28(ctx context.Context) (
|
||||
agentproto.DRPCAgentClient28, proto.DRPCTailnetClient28, error,
|
||||
) {
|
||||
conn, lis := drpcsdk.MemTransportPipe()
|
||||
c.LastWorkspaceAgent = func() {
|
||||
|
||||
+603
-580
File diff suppressed because it is too large
Load Diff
@@ -105,6 +105,7 @@ message WorkspaceAgentDevcontainer {
|
||||
string workspace_folder = 2;
|
||||
string config_path = 3;
|
||||
string name = 4;
|
||||
optional bytes subagent_id = 5;
|
||||
}
|
||||
|
||||
message GetManifestRequest {}
|
||||
@@ -435,6 +436,8 @@ message CreateSubAgentRequest {
|
||||
}
|
||||
|
||||
repeated DisplayApp display_apps = 6;
|
||||
|
||||
optional bytes id = 7;
|
||||
}
|
||||
|
||||
message CreateSubAgentResponse {
|
||||
|
||||
@@ -72,3 +72,10 @@ type DRPCAgentClient27 interface {
|
||||
DRPCAgentClient26
|
||||
ReportBoundaryLogs(ctx context.Context, in *ReportBoundaryLogsRequest) (*ReportBoundaryLogsResponse, error)
|
||||
}
|
||||
|
||||
// DRPCAgentClient28 is the Agent API at v2.8. It adds a SubagentId field to the
|
||||
// WorkspaceAgentDevcontainer message, and a Id field to the CreateSubAgentRequest
|
||||
// message. Compatible with Coder v2.31+
|
||||
type DRPCAgentClient28 interface {
|
||||
DRPCAgentClient27
|
||||
}
|
||||
|
||||
@@ -3096,7 +3096,7 @@ func requireGetManifest(ctx context.Context, t testing.TB, aAPI agentproto.DRPCA
|
||||
}
|
||||
|
||||
func postStartup(ctx context.Context, t testing.TB, client agent.Client, startup *agentproto.Startup) error {
|
||||
aAPI, _, err := client.ConnectRPC27(ctx)
|
||||
aAPI, _, err := client.ConnectRPC28(ctx)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
cErr := aAPI.DRPCConn().Close()
|
||||
|
||||
@@ -267,6 +267,18 @@ func (c *Client) ConnectRPC27(ctx context.Context) (
|
||||
return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil
|
||||
}
|
||||
|
||||
// ConnectRPC28 returns a dRPC client to the Agent API v2.8. It is useful when you want to be
|
||||
// maximally compatible with Coderd Release Versions from 2.31+
|
||||
func (c *Client) ConnectRPC28(ctx context.Context) (
|
||||
proto.DRPCAgentClient28, tailnetproto.DRPCTailnetClient28, error,
|
||||
) {
|
||||
conn, err := c.connectRPCVersion(ctx, apiversion.New(2, 8))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil
|
||||
}
|
||||
|
||||
// ConnectRPC connects to the workspace agent API and tailnet API
|
||||
func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) {
|
||||
return c.connectRPCVersion(ctx, proto.CurrentVersion)
|
||||
|
||||
@@ -51,7 +51,12 @@ type DRPCTailnetClient26 interface {
|
||||
DRPCTailnetClient25
|
||||
}
|
||||
|
||||
// DRPCTailnetClient26 is the Tailnet API at v2.7.
|
||||
// DRPCTailnetClient27 is the Tailnet API at v2.7.
|
||||
type DRPCTailnetClient27 interface {
|
||||
DRPCTailnetClient26
|
||||
}
|
||||
|
||||
// DRPCTailnetClient28 is the Tailnet API at v2.8.
|
||||
type DRPCTailnetClient28 interface {
|
||||
DRPCTailnetClient27
|
||||
}
|
||||
|
||||
@@ -60,9 +60,12 @@ import (
|
||||
// API v2.7:
|
||||
// - Added support for ReportBoundaryLogs RPC on the Agent API for forwarding
|
||||
// boundary audit logs to coderd.
|
||||
//
|
||||
// API v2.8:
|
||||
// - Added support for pre-created sub agents on the Agent API.
|
||||
const (
|
||||
CurrentMajor = 2
|
||||
CurrentMinor = 7
|
||||
CurrentMinor = 8
|
||||
)
|
||||
|
||||
var CurrentVersion = apiversion.New(CurrentMajor, CurrentMinor)
|
||||
|
||||
Reference in New Issue
Block a user