Merge branch 'release/2.34' into backport/25585-to-2.34

This commit is contained in:
TJ
2026-05-28 14:21:10 -07:00
committed by GitHub
18 changed files with 704 additions and 125 deletions
+51 -28
View File
@@ -37,6 +37,7 @@ func newAIBridgeDaemon(coderAPI *coderd.API, providers []aibridge.Provider, cfg
reg := prometheus.WrapRegistererWithPrefix("coder_aibridged_", coderAPI.PrometheusRegistry)
metrics := aibridge.NewMetrics(reg)
providerMetrics := aibridged.NewMetrics(reg)
tracer := coderAPI.TracerProvider.Tracer(tracing.TracerName)
// Create pool for reusable stateful [aibridge.RequestBridge] instances (one per user).
@@ -50,10 +51,11 @@ func newAIBridgeDaemon(coderAPI *coderd.API, providers []aibridge.Provider, cfg
// derives from env config and serves as a fallback if the database
// load fails inside the reloader.
reloader := &poolDBReloader{
pool: pool,
db: coderAPI.Database,
cfg: cfg,
logger: logger.Named("provider-loader"),
pool: pool,
db: coderAPI.Database,
cfg: cfg,
logger: logger.Named("provider-loader"),
metrics: providerMetrics,
}
unsubscribe, err := aibridged.SubscribeProviderReload(ctx, coderAPI.Pubsub, reloader, logger.Named("provider-reload"))
if err != nil {
@@ -78,14 +80,16 @@ func newAIBridgeDaemon(coderAPI *coderd.API, providers []aibridge.Provider, cfg
// the live provider set from the database and forwarding it to the
// pool.
type poolDBReloader struct {
pool *aibridged.CachedBridgePool
db database.Store
cfg codersdk.AIBridgeConfig
logger slog.Logger
pool *aibridged.CachedBridgePool
db database.Store
cfg codersdk.AIBridgeConfig
logger slog.Logger
metrics *aibridged.Metrics
}
func (r *poolDBReloader) Reload(ctx context.Context) error {
providers, err := BuildProviders(ctx, r.db, r.cfg, r.logger)
r.metrics.RecordReloadAttempt()
providers, outcomes, err := BuildProviders(ctx, r.db, r.cfg, r.logger)
if err != nil {
// Keep the previous snapshot in place: dropping all providers
// because the DB read failed would compound the visible failure
@@ -93,19 +97,15 @@ func (r *poolDBReloader) Reload(ctx context.Context) error {
return xerrors.Errorf("load ai providers from database: %w", err)
}
r.pool.ReplaceProviders(providers)
r.metrics.RecordReloadSuccess(outcomes)
return nil
}
// BuildProviders loads every enabled ai_providers row, attaches its
// keys, and constructs the equivalent [aibridge.Provider] instances.
// The database is the single source of truth for runtime provider
// configuration.
//
// Per-provider construction errors are logged and the offending row is
// excluded from the returned snapshot; only a failure of the DB query
// itself is propagated. This keeps a single misconfigured row from
// taking the whole daemon down.
func BuildProviders(ctx context.Context, db database.Store, cfg codersdk.AIBridgeConfig, logger slog.Logger) ([]aibridge.Provider, error) {
// BuildProviders loads every ai_providers row (including disabled)
// and returns the active provider list plus per-row outcomes. Per-row
// build errors are logged and excluded from providers but recorded in
// outcomes; only DB query failures propagate.
func BuildProviders(ctx context.Context, db database.Store, cfg codersdk.AIBridgeConfig, logger slog.Logger) ([]aibridge.Provider, []aibridged.ProviderOutcome, error) {
//nolint:gocritic // AsAIBridged has a minimal permission set for this purpose.
authCtx := dbauthz.AsAIBridged(ctx)
@@ -117,7 +117,7 @@ func BuildProviders(ctx context.Context, db database.Store, cfg codersdk.AIBridg
err := db.InTx(func(tx database.Store) error {
var err error
rows, err = tx.GetAIProviders(authCtx, database.GetAIProvidersParams{
IncludeDisabled: false,
IncludeDisabled: true,
})
if err != nil {
return xerrors.Errorf("load ai providers: %w", err)
@@ -129,9 +129,15 @@ func BuildProviders(ctx context.Context, db database.Store, cfg codersdk.AIBridg
// Load keys only for the enabled providers to avoid materializing
// secrets for disabled rows.
ids := make([]uuid.UUID, len(rows))
for i, r := range rows {
ids[i] = r.ID
ids := make([]uuid.UUID, 0, len(rows))
for _, r := range rows {
if !r.Enabled {
continue
}
ids = append(ids, r.ID)
}
if len(ids) == 0 {
return nil
}
keyRows, err := tx.GetAIProviderKeysByProviderIDs(authCtx, ids)
if err != nil {
@@ -143,13 +149,28 @@ func BuildProviders(ctx context.Context, db database.Store, cfg codersdk.AIBridg
return nil
}, &database.TxOptions{ReadOnly: true, TxIdentifier: "build_ai_providers"})
if err != nil {
return nil, err
return nil, nil, err
}
out := make([]aibridge.Provider, 0, len(rows))
providers := make([]aibridge.Provider, 0, len(rows))
outcomes := make([]aibridged.ProviderOutcome, 0, len(rows))
enabledCount := 0
for _, row := range rows {
outcome := aibridged.ProviderOutcome{
Name: row.Name,
Type: string(row.Type),
}
if !row.Enabled {
outcome.Status = aibridged.ProviderStatusDisabled
outcomes = append(outcomes, outcome)
continue
}
enabledCount++
prov, err := buildAIProviderFromRow(row, keysByProvider[row.ID], cfg)
if err != nil {
outcome.Status = aibridged.ProviderStatusError
outcome.Err = err
outcomes = append(outcomes, outcome)
logger.Error(ctx, "skipping misconfigured ai provider",
slog.F("provider_id", row.ID),
slog.F("provider_name", row.Name),
@@ -158,14 +179,16 @@ func BuildProviders(ctx context.Context, db database.Store, cfg codersdk.AIBridg
)
continue
}
out = append(out, prov)
outcome.Status = aibridged.ProviderStatusEnabled
outcomes = append(outcomes, outcome)
providers = append(providers, prov)
}
if len(rows) > 0 && len(out) == 0 {
if enabledCount > 0 && len(providers) == 0 {
logger.Warn(ctx, "all enabled ai providers failed to build; daemon will start with zero providers")
}
return out, nil
return providers, outcomes, nil
}
// buildAIProviderFromRow decodes the settings blob and constructs the
+46 -7
View File
@@ -13,6 +13,7 @@ import (
"github.com/coder/coder/v2/aibridge"
"github.com/coder/coder/v2/coderd"
agplaibridge "github.com/coder/coder/v2/coderd/aibridge"
"github.com/coder/coder/v2/coderd/aibridged"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbgen"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
@@ -35,7 +36,8 @@ func buildFromEnv(t *testing.T, cfg codersdk.AIBridgeConfig) ([]aibridge.Provide
if err := coderd.SeedAIProvidersFromEnv(ctx, db, cfg, logger); err != nil {
return nil, err
}
return BuildProviders(ctx, db, cfg, logger)
providers, _, err := BuildProviders(ctx, db, cfg, logger)
return providers, err
}
func TestBuildProviders(t *testing.T) {
@@ -323,28 +325,35 @@ func TestBuildProvidersSkipsBadRows(t *testing.T) {
Settings: sql.NullString{String: "not-json", Valid: true},
})
providers, err := BuildProviders(ctx, db, codersdk.AIBridgeConfig{}, logger)
providers, outcomes, err := BuildProviders(ctx, db, codersdk.AIBridgeConfig{}, logger)
require.NoError(t, err)
assert.Empty(t, providers)
require.Len(t, outcomes, 1)
assert.Equal(t, "anthropic-broken", outcomes[0].Name)
assert.Equal(t, aibridged.ProviderStatusError, outcomes[0].Status)
assert.Error(t, outcomes[0].Err)
})
t.Run("UnsupportedType", func(t *testing.T) {
t.Run("EnabledButNoKeys", func(t *testing.T) {
t.Parallel()
db, _ := dbtestutil.NewDB(t)
ctx := testutil.Context(t, testutil.WaitShort)
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
// Azure is a valid DB-level provider type but has no runtime
// builder yet; it must hit the default branch and be skipped.
// Azure routes through the OpenAI-family builder, which rejects
// rows without keys when BYOK is disabled. The row must be
// classified as error and excluded from the snapshot.
dbgen.AIProvider(t, db, database.AIProvider{
Type: database.AiProviderTypeAzure,
Name: "azure-openai",
BaseUrl: "https://example.openai.azure.com/",
})
providers, err := BuildProviders(ctx, db, codersdk.AIBridgeConfig{}, logger)
providers, outcomes, err := BuildProviders(ctx, db, codersdk.AIBridgeConfig{}, logger)
require.NoError(t, err)
assert.Empty(t, providers)
require.Len(t, outcomes, 1)
assert.Equal(t, aibridged.ProviderStatusError, outcomes[0].Status)
})
t.Run("BadRowDoesNotBlockGoodRow", func(t *testing.T) {
@@ -369,10 +378,40 @@ func TestBuildProvidersSkipsBadRows(t *testing.T) {
APIKey: "sk-good",
})
providers, err := BuildProviders(ctx, db, codersdk.AIBridgeConfig{}, logger)
providers, outcomes, err := BuildProviders(ctx, db, codersdk.AIBridgeConfig{}, logger)
require.NoError(t, err)
require.Len(t, providers, 1)
assert.Equal(t, "openai-good", providers[0].Name())
require.Len(t, outcomes, 2)
byName := map[string]aibridged.ProviderOutcome{}
for _, o := range outcomes {
byName[o.Name] = o
}
assert.Equal(t, aibridged.ProviderStatusError, byName["anthropic-broken"].Status)
assert.Equal(t, aibridged.ProviderStatusEnabled, byName["openai-good"].Status)
})
t.Run("DisabledRowClassifiedAsDisabled", func(t *testing.T) {
t.Parallel()
db, _ := dbtestutil.NewDB(t)
ctx := testutil.Context(t, testutil.WaitShort)
logger := slogtest.Make(t, nil)
dbgen.AIProvider(t, db, database.AIProvider{
Type: database.AiProviderTypeOpenai,
Name: "openai-off",
BaseUrl: "https://api.openai.com/",
}, func(p *database.InsertAIProviderParams) {
p.Enabled = false
})
providers, outcomes, err := BuildProviders(ctx, db, codersdk.AIBridgeConfig{}, logger)
require.NoError(t, err)
assert.Empty(t, providers, "disabled providers must not be in the active snapshot")
require.Len(t, outcomes, 1)
assert.Equal(t, "openai-off", outcomes[0].Name)
assert.Equal(t, aibridged.ProviderStatusDisabled, outcomes[0].Status)
assert.NoError(t, outcomes[0].Err)
})
}
+1 -1
View File
@@ -1041,7 +1041,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
// unconditionally when the bridge feature is enabled by config so
// chatd can use it regardless of license entitlement.
if vals.AI.BridgeConfig.Enabled.Value() {
aibridgeProviders, err := BuildProviders(aibridgeInitCtx, options.Database, vals.AI.BridgeConfig, logger.Named("aibridge.providers"))
aibridgeProviders, _, err := BuildProviders(aibridgeInitCtx, options.Database, vals.AI.BridgeConfig, logger.Named("aibridge.providers"))
if err != nil {
return xerrors.Errorf("build AI providers: %w", err)
}
+94
View File
@@ -0,0 +1,94 @@
package aibridged
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// Metrics is the prometheus surface for aibridged provider reloads.
type Metrics struct {
registerer prometheus.Registerer
// ProviderInfo is one series per configured provider; value is
// always 1 and the status label carries the alertable signal.
// Labels: provider_name, provider_type, status.
ProviderInfo *prometheus.GaugeVec
// ProvidersLastReloadTimestampSeconds is the unix timestamp of the
// last reload attempt, success or failure.
ProvidersLastReloadTimestampSeconds prometheus.Gauge
// ProvidersLastReloadSuccessTimestampSeconds is the unix timestamp
// of the last reload that successfully refreshed the pool. A gap
// against ProvidersLastReloadTimestampSeconds means the loop is
// firing but the refresh function is failing.
ProvidersLastReloadSuccessTimestampSeconds prometheus.Gauge
}
// NewMetrics registers the provider metrics against reg.
func NewMetrics(reg prometheus.Registerer) *Metrics {
factory := promauto.With(reg)
return &Metrics{
registerer: reg,
ProviderInfo: factory.NewGaugeVec(prometheus.GaugeOpts{
Name: "provider_info",
Help: "One series per configured AI provider. Value is always 1; the status label (enabled, disabled, error) carries the alertable signal.",
}, []string{"provider_name", "provider_type", "status"}),
ProvidersLastReloadTimestampSeconds: factory.NewGauge(prometheus.GaugeOpts{
Name: "providers_last_reload_timestamp_seconds",
Help: "Unix timestamp of the last provider reload attempt, success or failure.",
}),
ProvidersLastReloadSuccessTimestampSeconds: factory.NewGauge(prometheus.GaugeOpts{
Name: "providers_last_reload_success_timestamp_seconds",
Help: "Unix timestamp of the last provider reload that successfully refreshed the pool. A gap against coder_aibridged_providers_last_reload_timestamp_seconds means the loop is firing but the refresh function is failing.",
}),
}
}
// Unregister removes the provider metrics from the registerer.
func (m *Metrics) Unregister() {
if m == nil {
return
}
m.registerer.Unregister(m.ProviderInfo)
m.registerer.Unregister(m.ProvidersLastReloadTimestampSeconds)
m.registerer.Unregister(m.ProvidersLastReloadSuccessTimestampSeconds)
}
// RecordReloadAttempt stamps the attempt-time gauge at the start of a
// reload. A reload that hangs mid-flight is detected by watching the
// gap between this gauge and ProvidersLastReloadSuccessTimestampSeconds.
func (m *Metrics) RecordReloadAttempt() {
if m == nil {
return
}
m.ProvidersLastReloadTimestampSeconds.Set(float64(time.Now().Unix()))
}
// RecordReloadSuccess rewrites the ProviderInfo GaugeVec from the
// outcomes and stamps the success-time gauge. Reset clears series for
// providers that have left the configuration so they don't linger as
// stale.
func (m *Metrics) RecordReloadSuccess(outcomes []ProviderOutcome) {
if m == nil {
return
}
WriteProviderInfoSnapshot(m.ProviderInfo, outcomes)
m.ProvidersLastReloadSuccessTimestampSeconds.Set(float64(time.Now().Unix()))
}
// WriteProviderInfoSnapshot Resets info and writes one series per
// outcome. Both aibridged and aibridgeproxyd use this so the
// provider_info recording contract stays in one place.
func WriteProviderInfoSnapshot(info *prometheus.GaugeVec, outcomes []ProviderOutcome) {
info.Reset()
for _, o := range outcomes {
info.WithLabelValues(o.Name, o.Type, string(o.Status)).Set(1)
}
}
+84
View File
@@ -0,0 +1,84 @@
package aibridged_test
import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/coder/coder/v2/coderd/aibridged"
)
// TestMetricsRecordReloadSuccess covers the provider_info GaugeVec
// surface: every reload pass rewrites the series for the current
// outcomes and the Reset on each pass drops stale series.
func TestMetricsRecordReloadSuccess(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
m := aibridged.NewMetrics(reg)
outcomes := []aibridged.ProviderOutcome{
{Name: "alpha", Type: "openai", Status: aibridged.ProviderStatusEnabled},
{Name: "beta", Type: "anthropic", Status: aibridged.ProviderStatusDisabled},
{Name: "gamma", Type: "openai", Status: aibridged.ProviderStatusError, Err: xerrors.New("bad config")},
}
before := time.Now().Unix()
m.RecordReloadAttempt()
m.RecordReloadSuccess(outcomes)
after := time.Now().Unix()
assert.Equal(t, 1.0, promtest.ToFloat64(m.ProviderInfo.WithLabelValues("alpha", "openai", "enabled")))
assert.Equal(t, 1.0, promtest.ToFloat64(m.ProviderInfo.WithLabelValues("beta", "anthropic", "disabled")))
assert.Equal(t, 1.0, promtest.ToFloat64(m.ProviderInfo.WithLabelValues("gamma", "openai", "error")))
attemptTS := int64(promtest.ToFloat64(m.ProvidersLastReloadTimestampSeconds))
successTS := int64(promtest.ToFloat64(m.ProvidersLastReloadSuccessTimestampSeconds))
assert.GreaterOrEqual(t, attemptTS, before)
assert.LessOrEqual(t, attemptTS, after)
assert.GreaterOrEqual(t, successTS, before)
assert.LessOrEqual(t, successTS, after)
}
// TestMetricsResetsStaleProviderSeries verifies that providers removed
// from the outcome set between reloads do not leave behind stale
// series.
func TestMetricsResetsStaleProviderSeries(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
m := aibridged.NewMetrics(reg)
m.RecordReloadSuccess([]aibridged.ProviderOutcome{
{Name: "alpha", Type: "openai", Status: aibridged.ProviderStatusEnabled},
{Name: "beta", Type: "anthropic", Status: aibridged.ProviderStatusEnabled},
})
require.Equal(t, 2, promtest.CollectAndCount(m.ProviderInfo))
m.RecordReloadSuccess([]aibridged.ProviderOutcome{
{Name: "alpha", Type: "openai", Status: aibridged.ProviderStatusEnabled},
})
assert.Equal(t, 1, promtest.CollectAndCount(m.ProviderInfo),
"beta should have been Reset out of the GaugeVec")
assert.Equal(t, 1.0, promtest.ToFloat64(m.ProviderInfo.WithLabelValues("alpha", "openai", "enabled")))
}
// TestMetricsNilSafe asserts the helpers tolerate a nil receiver so
// callers can pass `nil` to disable metric updates without guarding
// every call site.
func TestMetricsNilSafe(t *testing.T) {
t.Parallel()
var m *aibridged.Metrics
require.NotPanics(t, func() {
m.RecordReloadAttempt()
m.RecordReloadSuccess(nil)
m.Unregister()
})
}
+28
View File
@@ -0,0 +1,28 @@
package aibridged
// ProviderStatus is the lifecycle state of a configured AI provider.
type ProviderStatus string
const (
// ProviderStatusEnabled indicates the provider is configured and
// valid, and is included in the active pool snapshot.
ProviderStatusEnabled ProviderStatus = "enabled"
// ProviderStatusDisabled indicates the provider is configured but
// intentionally turned off by an operator.
ProviderStatusDisabled ProviderStatus = "disabled"
// ProviderStatusError indicates the provider is configured but
// cannot be constructed (missing keys, unsupported type, malformed
// settings).
ProviderStatusError ProviderStatus = "error"
)
// ProviderOutcome classifies one ai_providers row, including disabled
// and errored rows the pool excludes. Err is populated only when
// Status == ProviderStatusError; the build error is already logged at
// the call site.
type ProviderOutcome struct {
Name string
Type string
Status ProviderStatus
Err error
}
+6
View File
@@ -120,11 +120,17 @@ deployment. They will always be available from the agent.
| `coder_aibridged_non_injected_tool_selections_total` | counter | The number of times an AI model selected a tool to be invoked by the client. | `model` `name` `provider` |
| `coder_aibridged_passthrough_total` | counter | The count of requests which were not intercepted but passed through to the upstream. | `method` `provider` `route` |
| `coder_aibridged_prompts_total` | counter | The number of prompts issued by users (initiators). | `initiator_id` `model` `provider` |
| `coder_aibridged_provider_info` | gauge | One series per configured AI provider. Value is always 1; the status label (enabled, disabled, error) carries the alertable signal. | `provider_name` `provider_type` `status` |
| `coder_aibridged_providers_last_reload_success_timestamp_seconds` | gauge | Unix timestamp of the last provider reload that successfully refreshed the pool. A gap against coder_aibridged_providers_last_reload_timestamp_seconds means the loop is firing but the refresh function is failing. | |
| `coder_aibridged_providers_last_reload_timestamp_seconds` | gauge | Unix timestamp of the last provider reload attempt, success or failure. | |
| `coder_aibridged_tokens_total` | counter | The number of tokens used by intercepted requests. | `initiator_id` `model` `provider` `type` |
| `coder_aibridgeproxyd_connect_sessions_total` | counter | Total number of CONNECT sessions established. | `type` |
| `coder_aibridgeproxyd_inflight_mitm_requests` | gauge | Number of MITM requests currently being processed. | `provider` |
| `coder_aibridgeproxyd_mitm_requests_total` | counter | Total number of MITM requests handled by the proxy. | `provider` |
| `coder_aibridgeproxyd_mitm_responses_total` | counter | Total number of MITM responses by HTTP status code class. | `code` `provider` |
| `coder_aibridgeproxyd_provider_info` | gauge | One series per configured AI provider. Value is always 1; the status label (enabled, disabled, error) carries the alertable signal. | `provider_name` `provider_type` `status` |
| `coder_aibridgeproxyd_providers_last_reload_success_timestamp_seconds` | gauge | Unix timestamp of the last provider reload that successfully refreshed the router. A gap against coder_aibridgeproxyd_providers_last_reload_timestamp_seconds means the loop is firing but the refresh function is failing. | |
| `coder_aibridgeproxyd_providers_last_reload_timestamp_seconds` | gauge | Unix timestamp of the last provider reload attempt, success or failure. | |
| `coder_derp_server_accepts_total` | counter | Total DERP connections accepted. | |
| `coder_derp_server_average_queue_duration_ms` | gauge | Average queue duration in milliseconds. | |
| `coder_derp_server_bytes_received_total` | counter | Total bytes received. | |
@@ -35,6 +35,7 @@ import (
"cdr.dev/slog/v3/sloggers/slogtest"
"github.com/coder/coder/v2/aibridge"
agplaibridge "github.com/coder/coder/v2/coderd/aibridge"
"github.com/coder/coder/v2/coderd/aibridged"
"github.com/coder/coder/v2/enterprise/aibridgeproxyd"
"github.com/coder/coder/v2/testutil"
)
@@ -209,10 +210,12 @@ func withProviderHosts(hosts ...string) testProxyOption {
host = h
}
providers = append(providers, aibridgeproxyd.ReloadedProvider{
Name: name,
Type: "openai",
Host: strings.ToLower(host),
Status: aibridgeproxyd.ProviderStatusEnabled,
ProviderOutcome: aibridged.ProviderOutcome{
Name: name,
Type: "openai",
Status: aibridged.ProviderStatusEnabled,
},
Host: strings.ToLower(host),
})
}
cfg.providers = providers
@@ -296,8 +299,8 @@ func newTestProxy(t *testing.T, opts ...testProxyOption) *aibridgeproxyd.Server
// loopback, are reachable. Tests that verify IP blocking override this.
allowedPrivateCIDRs: []string{"127.0.0.1/32"},
providers: []aibridgeproxyd.ReloadedProvider{
{Name: "test-provider", Type: "openai", Host: "127.0.0.1", Status: aibridgeproxyd.ProviderStatusEnabled},
{Name: "test-provider", Type: "openai", Host: "localhost", Status: aibridgeproxyd.ProviderStatusEnabled},
{ProviderOutcome: aibridged.ProviderOutcome{Name: "test-provider", Type: "openai", Status: aibridged.ProviderStatusEnabled}, Host: "127.0.0.1"},
{ProviderOutcome: aibridged.ProviderOutcome{Name: "test-provider", Type: "openai", Status: aibridged.ProviderStatusEnabled}, Host: "localhost"},
},
}
for _, opt := range opts {
@@ -2077,10 +2080,12 @@ func TestProxy_MITM_CustomProvider(t *testing.T) {
srv := newTestProxy(t,
withCoderAccessURL(aibridgedServer.URL),
withProviders(aibridgeproxyd.ReloadedProvider{
Name: openrouterProvider,
Type: "openai",
Host: openrouterDomain,
Status: aibridgeproxyd.ProviderStatusEnabled,
ProviderOutcome: aibridged.ProviderOutcome{
Name: openrouterProvider,
Type: "openai",
Status: aibridged.ProviderStatusEnabled,
},
Host: openrouterDomain,
}),
)
+33
View File
@@ -30,6 +30,21 @@ type Metrics struct {
// Labels: code (HTTP status code), provider
// Cardinality is bounded: ~100 used status codes x few providers.
MITMResponsesTotal *prometheus.CounterVec
// ProviderInfo is one series per configured provider; value is
// always 1 and the status label carries the alertable signal.
// Labels: provider_name, provider_type, status.
ProviderInfo *prometheus.GaugeVec
// ProvidersLastReloadTimestampSeconds is the unix timestamp of the
// last reload attempt, success or failure.
ProvidersLastReloadTimestampSeconds prometheus.Gauge
// ProvidersLastReloadSuccessTimestampSeconds is the unix timestamp
// of the last reload that successfully refreshed the router. A gap
// against ProvidersLastReloadTimestampSeconds means the loop is
// firing but the refresh function is failing.
ProvidersLastReloadSuccessTimestampSeconds prometheus.Gauge
}
// NewMetrics creates and registers all metrics for aibridgeproxyd.
@@ -58,6 +73,21 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
Name: "mitm_responses_total",
Help: "Total number of MITM responses by HTTP status code class.",
}, []string{"code", "provider"}),
ProviderInfo: factory.NewGaugeVec(prometheus.GaugeOpts{
Name: "provider_info",
Help: "One series per configured AI provider. Value is always 1; the status label (enabled, disabled, error) carries the alertable signal.",
}, []string{"provider_name", "provider_type", "status"}),
ProvidersLastReloadTimestampSeconds: factory.NewGauge(prometheus.GaugeOpts{
Name: "providers_last_reload_timestamp_seconds",
Help: "Unix timestamp of the last provider reload attempt, success or failure.",
}),
ProvidersLastReloadSuccessTimestampSeconds: factory.NewGauge(prometheus.GaugeOpts{
Name: "providers_last_reload_success_timestamp_seconds",
Help: "Unix timestamp of the last provider reload that successfully refreshed the router. A gap against coder_aibridgeproxyd_providers_last_reload_timestamp_seconds means the loop is firing but the refresh function is failing.",
}),
}
}
@@ -67,4 +97,7 @@ func (m *Metrics) Unregister() {
m.registerer.Unregister(m.MITMRequestsTotal)
m.registerer.Unregister(m.InflightMITMRequests)
m.registerer.Unregister(m.MITMResponsesTotal)
m.registerer.Unregister(m.ProviderInfo)
m.registerer.Unregister(m.ProvidersLastReloadTimestampSeconds)
m.registerer.Unregister(m.ProvidersLastReloadSuccessTimestampSeconds)
}
@@ -0,0 +1,135 @@
package aibridgeproxyd
import (
"context"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"cdr.dev/slog/v3/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/aibridged"
"github.com/coder/coder/v2/testutil"
)
// TestReloadUpdatesProviderMetrics covers the provider_info GaugeVec
// surface: every reload pass rewrites the series for the current
// snapshot, including disabled and errored rows; the Reset on each
// reload drops series for providers that have left the configuration.
func TestReloadUpdatesProviderMetrics(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
metrics := NewMetrics(reg)
reload := ProviderReload{Providers: []ReloadedProvider{
{ProviderOutcome: aibridged.ProviderOutcome{Name: "alpha", Type: "openai", Status: aibridged.ProviderStatusEnabled}, Host: "alpha.example.com"},
{ProviderOutcome: aibridged.ProviderOutcome{Name: "beta", Type: "anthropic", Status: aibridged.ProviderStatusDisabled}},
{ProviderOutcome: aibridged.ProviderOutcome{Name: "gamma", Type: "openai", Status: aibridged.ProviderStatusError, Err: xerrors.New("bad config")}},
}}
ctx := testutil.Context(t, testutil.WaitShort)
srv := &Server{
ctx: ctx,
logger: slogtest.Make(t, nil),
allowedPorts: []string{"443"},
metrics: metrics,
refreshProviders: func(context.Context) (ProviderReload, error) {
return reload, nil
},
}
srv.providerRouter.Store(emptyProviderRouter)
before := time.Now().Unix()
require.NoError(t, srv.Reload(ctx))
after := time.Now().Unix()
assert.Equal(t, 1.0, promtest.ToFloat64(metrics.ProviderInfo.WithLabelValues("alpha", "openai", "enabled")))
assert.Equal(t, 1.0, promtest.ToFloat64(metrics.ProviderInfo.WithLabelValues("beta", "anthropic", "disabled")))
assert.Equal(t, 1.0, promtest.ToFloat64(metrics.ProviderInfo.WithLabelValues("gamma", "openai", "error")))
attemptTS := int64(promtest.ToFloat64(metrics.ProvidersLastReloadTimestampSeconds))
successTS := int64(promtest.ToFloat64(metrics.ProvidersLastReloadSuccessTimestampSeconds))
assert.GreaterOrEqual(t, attemptTS, before)
assert.LessOrEqual(t, attemptTS, after)
assert.GreaterOrEqual(t, successTS, before)
assert.LessOrEqual(t, successTS, after)
}
// TestReloadResetsStaleProviderSeries verifies that providers removed
// between reloads do not leave behind stale series. Without Reset, a
// removed provider's last-seen value would persist for 5+ minutes and
// could fire alerts despite the provider no longer being configured.
func TestReloadResetsStaleProviderSeries(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
metrics := NewMetrics(reg)
current := ProviderReload{Providers: []ReloadedProvider{
{ProviderOutcome: aibridged.ProviderOutcome{Name: "alpha", Type: "openai", Status: aibridged.ProviderStatusEnabled}, Host: "alpha.example.com"},
{ProviderOutcome: aibridged.ProviderOutcome{Name: "beta", Type: "anthropic", Status: aibridged.ProviderStatusEnabled}, Host: "beta.example.com"},
}}
ctx := testutil.Context(t, testutil.WaitShort)
srv := &Server{
ctx: ctx,
logger: slogtest.Make(t, nil),
allowedPorts: []string{"443"},
metrics: metrics,
refreshProviders: func(context.Context) (ProviderReload, error) {
return current, nil
},
}
srv.providerRouter.Store(emptyProviderRouter)
require.NoError(t, srv.Reload(ctx))
require.Equal(t, 2, promtest.CollectAndCount(metrics.ProviderInfo))
current = ProviderReload{Providers: []ReloadedProvider{
{ProviderOutcome: aibridged.ProviderOutcome{Name: "alpha", Type: "openai", Status: aibridged.ProviderStatusEnabled}, Host: "alpha.example.com"},
}}
require.NoError(t, srv.Reload(ctx))
assert.Equal(t, 1, promtest.CollectAndCount(metrics.ProviderInfo),
"beta should have been Reset out of the GaugeVec")
assert.Equal(t, 1.0, promtest.ToFloat64(metrics.ProviderInfo.WithLabelValues("alpha", "openai", "enabled")))
}
// TestReloadAttemptTimestampUpdatesOnFailure asserts the attempt-time
// gauge advances even when the refresh function fails, while the
// success-time gauge does not.
func TestReloadAttemptTimestampUpdatesOnFailure(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
metrics := NewMetrics(reg)
refreshErr := xerrors.New("simulated failure")
ctx := testutil.Context(t, testutil.WaitShort)
srv := &Server{
ctx: ctx,
logger: slogtest.Make(t, nil),
allowedPorts: []string{"443"},
metrics: metrics,
refreshProviders: func(context.Context) (ProviderReload, error) {
return ProviderReload{}, refreshErr
},
}
srv.providerRouter.Store(emptyProviderRouter)
before := time.Now().Unix()
err := srv.Reload(ctx)
require.ErrorIs(t, err, refreshErr)
after := time.Now().Unix()
attemptTS := int64(promtest.ToFloat64(metrics.ProvidersLastReloadTimestampSeconds))
successTS := int64(promtest.ToFloat64(metrics.ProvidersLastReloadSuccessTimestampSeconds))
assert.GreaterOrEqual(t, attemptTS, before)
assert.LessOrEqual(t, attemptTS, after)
assert.Equal(t, int64(0), successTS, "success timestamp must not advance on failure")
}
+45 -35
View File
@@ -5,40 +5,21 @@ import (
"net/http"
"slices"
"strings"
"time"
"github.com/elazarl/goproxy"
"golang.org/x/xerrors"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/coderd/aibridged"
)
// ProviderStatus describes the lifecycle state of a configured AI
// provider for observability and routing purposes.
type ProviderStatus string
const (
// ProviderStatusEnabled means the provider is configured, valid, and
// included in the active routing snapshot.
ProviderStatusEnabled ProviderStatus = "enabled"
// ProviderStatusDisabled means the provider exists in configuration
// but is intentionally turned off by an operator.
ProviderStatusDisabled ProviderStatus = "disabled"
// ProviderStatusError means the provider exists in configuration but
// cannot be routed to because of a validation failure (missing or
// invalid base URL, duplicate host, etc.).
ProviderStatusError ProviderStatus = "error"
)
// ReloadedProvider is one row from the provider configuration together
// with the outcome of evaluating it for routing. Host is populated only
// when Status == ProviderStatusEnabled; Err is populated only when
// Status == ProviderStatusError.
// ReloadedProvider is the classification of one ai_providers row.
// Host is the routable hostname; it's populated only when the embedded
// outcome's Status == aibridged.ProviderStatusEnabled.
type ReloadedProvider struct {
Name string
Type string
Host string
Status ProviderStatus
Err error
aibridged.ProviderOutcome
Host string
}
// ProviderReload is the result of a single refresh pass: every
@@ -47,8 +28,8 @@ type ProviderReload struct {
Providers []ReloadedProvider
}
// RefreshProvidersFunc returns the live provider classification used by
// Reload to rebuild the proxy's routing snapshot.
// RefreshProvidersFunc returns the live provider classification used
// by Reload to rebuild the proxy's routing snapshot.
type RefreshProvidersFunc func(ctx context.Context) (ProviderReload, error)
// Reload refreshes proxy routing from the configured provider source.
@@ -57,6 +38,7 @@ func (s *Server) Reload(ctx context.Context) error {
if s.refreshProviders == nil {
return nil
}
s.recordReloadAttempt()
reload, err := s.refreshProviders(ctx)
if err != nil {
return xerrors.Errorf("refresh ai providers for proxy routing: %w", err)
@@ -67,13 +49,14 @@ func (s *Server) Reload(ctx context.Context) error {
}
s.providerRouter.Store(router)
for _, p := range reload.Providers {
if p.Status == ProviderStatusError {
if p.Status == aibridged.ProviderStatusError {
s.logger.Warn(s.ctx, "provider excluded from routing",
slog.F("provider", p.Name),
slog.Error(p.Err),
)
}
}
s.recordReloadSuccess(reload)
s.logger.Debug(s.ctx, "aibridgeproxyd router reloaded",
slog.F("provider_count", len(reload.Providers)),
slog.F("mitm_host_count", len(router.mitmHosts)),
@@ -82,6 +65,32 @@ func (s *Server) Reload(ctx context.Context) error {
return nil
}
// recordReloadAttempt stamps the attempt-time gauge at the start of a
// Reload. A reload that hangs mid-flight is detected by watching the
// gap between this gauge and ProvidersLastReloadSuccessTimestampSeconds.
func (s *Server) recordReloadAttempt() {
if s.metrics == nil {
return
}
s.metrics.ProvidersLastReloadTimestampSeconds.Set(float64(time.Now().Unix()))
}
// recordReloadSuccess rewrites the provider_info GaugeVec from the
// classified reload and stamps the success-time gauge. Reset clears
// series for providers that have left the configuration so they don't
// linger as stale.
func (s *Server) recordReloadSuccess(reload ProviderReload) {
if s.metrics == nil {
return
}
outcomes := make([]aibridged.ProviderOutcome, len(reload.Providers))
for i, p := range reload.Providers {
outcomes[i] = p.ProviderOutcome
}
aibridged.WriteProviderInfoSnapshot(s.metrics.ProviderInfo, outcomes)
s.metrics.ProvidersLastReloadSuccessTimestampSeconds.Set(float64(time.Now().Unix()))
}
func (s *Server) loadProviderRouter() *providerRouter {
if p := s.providerRouter.Load(); p != nil {
return p
@@ -103,16 +112,17 @@ func (s *Server) mitmHostsCondition() goproxy.ReqConditionFunc {
}
// buildProviderRouter constructs a router snapshot from a classified
// provider reload. Only providers with Status == ProviderStatusEnabled
// are included in the active routing tables; the refresh function is
// responsible for classifying disabled and errored rows. First entry
// wins on duplicate hostnames as a defense-in-depth measure even though
// the refresh function should mark duplicates as errors.
// provider reload. Only providers with Status ==
// aibridged.ProviderStatusEnabled are included in the active routing
// tables; the refresh function is responsible for classifying disabled
// and errored rows. First entry wins on duplicate hostnames as a
// defense-in-depth measure even though the refresh function should
// mark duplicates as errors.
func buildProviderRouter(reload ProviderReload, allowedPorts []string) (*providerRouter, error) {
nameByHost := make(map[string]string, len(reload.Providers))
domains := make([]string, 0, len(reload.Providers))
for _, p := range reload.Providers {
if p.Status != ProviderStatusEnabled {
if p.Status != aibridged.ProviderStatusEnabled {
continue
}
host := strings.ToLower(p.Host)
@@ -9,15 +9,18 @@ import (
"golang.org/x/xerrors"
"cdr.dev/slog/v3/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/aibridged"
"github.com/coder/coder/v2/testutil"
)
func enabledProvider(name, host string) ReloadedProvider {
return ReloadedProvider{
Name: name,
Type: "openai",
Host: host,
Status: ProviderStatusEnabled,
ProviderOutcome: aibridged.ProviderOutcome{
Name: name,
Type: "openai",
Status: aibridged.ProviderStatusEnabled,
},
Host: host,
}
}
@@ -96,8 +99,8 @@ func TestBuildProviderRouter(t *testing.T) {
enabledProvider("custom", "custom-llm.example.com"),
// Host is populated on the non-enabled rows so the Status
// guard, not the empty-host guard, is what excludes them.
{Name: "off", Type: "openai", Host: "disabled.example.com", Status: ProviderStatusDisabled},
{Name: "bad", Type: "openai", Host: "errored.example.com", Status: ProviderStatusError, Err: xerrors.New("nope")},
{ProviderOutcome: aibridged.ProviderOutcome{Name: "off", Type: "openai", Status: aibridged.ProviderStatusDisabled}, Host: "disabled.example.com"},
{ProviderOutcome: aibridged.ProviderOutcome{Name: "bad", Type: "openai", Status: aibridged.ProviderStatusError, Err: xerrors.New("nope")}, Host: "errored.example.com"},
}}
router, err := buildProviderRouter(reload, []string{"443"})
@@ -121,7 +124,7 @@ func TestBuildProviderRouter(t *testing.T) {
t.Parallel()
reload := ProviderReload{Providers: []ReloadedProvider{
{Name: "provider", Type: "openai", Host: "API.Example.COM", Status: ProviderStatusEnabled},
{ProviderOutcome: aibridged.ProviderOutcome{Name: "provider", Type: "openai", Status: aibridged.ProviderStatusEnabled}, Host: "API.Example.COM"},
}}
router, err := buildProviderRouter(reload, []string{"443"})
@@ -152,7 +155,7 @@ func TestBuildProviderRouter(t *testing.T) {
t.Parallel()
reload := ProviderReload{Providers: []ReloadedProvider{
{Name: "no-host", Type: "openai", Status: ProviderStatusEnabled},
{ProviderOutcome: aibridged.ProviderOutcome{Name: "no-host", Type: "openai", Status: aibridged.ProviderStatusEnabled}},
enabledProvider("good", "api.good.example.com"),
}}
+58 -8
View File
@@ -11,10 +11,13 @@ import (
"sync"
"testing"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/coder/coder/v2/coderd/aibridged"
"github.com/coder/coder/v2/enterprise/aibridgeproxyd"
"github.com/coder/coder/v2/testutil"
)
@@ -28,6 +31,7 @@ type reloadTestHarness struct {
client *http.Client
bridged *httptest.Server
recorder *aibridgedRecorder
metrics *aibridgeproxyd.Metrics
}
// aibridgedRecorder captures the path of the last request received by
@@ -106,32 +110,34 @@ func (s *providerStore) refresh(context.Context) (aibridgeproxyd.ProviderReload,
// classifyRaw mirrors the production classifier in enterprise/cli so
// the reload tests exercise the same validation rules end-to-end.
func classifyRaw(p rawProvider, seenHost map[string]string) aibridgeproxyd.ReloadedProvider {
out := aibridgeproxyd.ReloadedProvider{Name: p.name, Type: "openai"}
out := aibridgeproxyd.ReloadedProvider{
ProviderOutcome: aibridged.ProviderOutcome{Name: p.name, Type: "openai"},
}
if strings.TrimSpace(p.baseURL) == "" {
out.Status = aibridgeproxyd.ProviderStatusError
out.Status = aibridged.ProviderStatusError
out.Err = xerrors.New("base url is empty")
return out
}
u, err := url.Parse(p.baseURL)
if err != nil {
out.Status = aibridgeproxyd.ProviderStatusError
out.Status = aibridged.ProviderStatusError
out.Err = xerrors.Errorf("invalid base url %q: %w", p.baseURL, err)
return out
}
host := strings.ToLower(u.Hostname())
if host == "" {
out.Status = aibridgeproxyd.ProviderStatusError
out.Status = aibridged.ProviderStatusError
out.Err = xerrors.Errorf("base url %q has no hostname", p.baseURL)
return out
}
if claimedBy, taken := seenHost[host]; taken {
out.Status = aibridgeproxyd.ProviderStatusError
out.Status = aibridged.ProviderStatusError
out.Err = xerrors.Errorf("hostname %q already claimed by provider %q", host, claimedBy)
return out
}
seenHost[host] = p.name
out.Host = host
out.Status = aibridgeproxyd.ProviderStatusEnabled
out.Status = aibridged.ProviderStatusEnabled
return out
}
@@ -151,10 +157,12 @@ func newReloadTestHarness(t *testing.T) *reloadTestHarness {
t.Cleanup(bridged.Close)
store := &providerStore{}
metrics := aibridgeproxyd.NewMetrics(prometheus.NewRegistry())
srv := newTestProxy(t,
withCoderAccessURL(bridged.URL),
withAllowedPorts("443"),
withRefreshProviders(store.refresh),
withMetrics(metrics),
)
certPool := getProxyCertPool(t)
@@ -169,6 +177,7 @@ func newReloadTestHarness(t *testing.T) *reloadTestHarness {
return &reloadTestHarness{
srv: srv,
store: store,
metrics: metrics,
client: client,
bridged: bridged,
recorder: recorder,
@@ -236,6 +245,25 @@ func (h *reloadTestHarness) expectNotRouted(t *testing.T, targetURL string) {
"aibridged must not be reached for non-routed host %s", targetURL)
}
// expectProviderStatus asserts the provider_info series for (name,
// status) is present with value 1.
func (h *reloadTestHarness) expectProviderStatus(t *testing.T, name, status string) {
t.Helper()
assert.Equal(t, 1.0, promtest.ToFloat64(h.metrics.ProviderInfo.WithLabelValues(name, "openai", status)),
"expected provider_info{provider_name=%q, status=%q} == 1", name, status)
}
// expectProviderAbsent asserts no series exists for the provider name
// in any status. This verifies the GaugeVec.Reset on each reload
// clears stale entries.
func (h *reloadTestHarness) expectProviderAbsent(t *testing.T, name string) {
t.Helper()
for _, status := range []string{"enabled", "disabled", "error"} {
assert.Equal(t, 0.0, promtest.ToFloat64(h.metrics.ProviderInfo.WithLabelValues(name, "openai", status)),
"expected no provider_info series for %q, found status %q", name, status)
}
}
// TestProxy_StaleTunnelStopsRoutingAfterProviderChange is the
// regression test for a bug where a long-lived CONNECT tunnel that was
// established while a provider was enabled kept routing decrypted
@@ -377,14 +405,18 @@ func TestProxy_HotReloadRoutingCRUD(t *testing.T) {
})
require.NoError(t, h.srv.Reload(t.Context()))
h.expectRoutedTo(t, "https://alpha.invalid/v1/messages", "/api/v2/aibridge/alpha/v1/messages")
h.expectProviderStatus(t, "alpha", "enabled")
// UpdateProviderName: the same BaseURL with a new name must route
// under the new name on the next Reload.
// under the new name on the next Reload. The renamed provider must
// not leave a stale alpha series behind.
h.store.set([]rawProvider{
{name: "alpha-v2", baseURL: "https://alpha.invalid/v1"},
})
require.NoError(t, h.srv.Reload(t.Context()))
h.expectRoutedTo(t, "https://alpha.invalid/v1/messages", "/api/v2/aibridge/alpha-v2/v1/messages")
h.expectProviderStatus(t, "alpha-v2", "enabled")
h.expectProviderAbsent(t, "alpha")
// UpdateProviderBaseURLHost: moving the provider to a new host must
// start MITM'ing the new host and stop MITM'ing the old one.
@@ -394,6 +426,7 @@ func TestProxy_HotReloadRoutingCRUD(t *testing.T) {
require.NoError(t, h.srv.Reload(t.Context()))
h.expectRoutedTo(t, "https://alpha-new.invalid/v1/messages", "/api/v2/aibridge/alpha-v2/v1/messages")
h.expectNotRouted(t, "https://alpha.invalid/v1/messages")
h.expectProviderStatus(t, "alpha-v2", "enabled")
// AddSecondProvider: a second provider added in the same Reload must
// route independently from the first.
@@ -404,15 +437,19 @@ func TestProxy_HotReloadRoutingCRUD(t *testing.T) {
require.NoError(t, h.srv.Reload(t.Context()))
h.expectRoutedTo(t, "https://alpha-new.invalid/v1/messages", "/api/v2/aibridge/alpha-v2/v1/messages")
h.expectRoutedTo(t, "https://beta.invalid/v1/chat/completions", "/api/v2/aibridge/beta/v1/chat/completions")
h.expectProviderStatus(t, "alpha-v2", "enabled")
h.expectProviderStatus(t, "beta", "enabled")
// DeleteOneProvider: removing alpha must keep beta routed and stop
// routing alpha.
// routing alpha. The deleted name disappears from provider_info.
h.store.set([]rawProvider{
{name: "beta", baseURL: "https://beta.invalid/v1"},
})
require.NoError(t, h.srv.Reload(t.Context()))
h.expectRoutedTo(t, "https://beta.invalid/v1/chat/completions", "/api/v2/aibridge/beta/v1/chat/completions")
h.expectNotRouted(t, "https://alpha-new.invalid/v1/messages")
h.expectProviderStatus(t, "beta", "enabled")
h.expectProviderAbsent(t, "alpha-v2")
// DeleteAllProviders: an empty Reload must collapse the router to
// the fail-closed state with no host MITM'd.
@@ -420,6 +457,7 @@ func TestProxy_HotReloadRoutingCRUD(t *testing.T) {
require.NoError(t, h.srv.Reload(t.Context()))
h.expectNotRouted(t, "https://beta.invalid/v1/chat/completions")
h.expectNotRouted(t, "https://alpha-new.invalid/v1/messages")
h.expectProviderAbsent(t, "beta")
// RecreateAfterDelete: reintroducing a previously-deleted provider
// must route again without restart, confirming the swap is
@@ -429,6 +467,11 @@ func TestProxy_HotReloadRoutingCRUD(t *testing.T) {
})
require.NoError(t, h.srv.Reload(t.Context()))
h.expectRoutedTo(t, "https://alpha.invalid/v1/messages", "/api/v2/aibridge/alpha/v1/messages")
h.expectProviderStatus(t, "alpha", "enabled")
// Both timestamp gauges must have advanced through this sequence.
assert.Positive(t, promtest.ToFloat64(h.metrics.ProvidersLastReloadTimestampSeconds))
assert.Positive(t, promtest.ToFloat64(h.metrics.ProvidersLastReloadSuccessTimestampSeconds))
}
// TestProxy_HotReloadRoutingInvalidProviders covers the resilience
@@ -453,6 +496,8 @@ func TestProxy_HotReloadRoutingInvalidProviders(t *testing.T) {
require.NoError(t, h.srv.Reload(t.Context()))
h.expectRoutedTo(t, "https://valid.invalid/v1/messages", "/api/v2/aibridge/valid/v1/messages")
h.expectProviderStatus(t, "no-url", "error")
h.expectProviderStatus(t, "valid", "enabled")
})
t.Run("MalformedBaseURLSkipped", func(t *testing.T) {
@@ -470,6 +515,9 @@ func TestProxy_HotReloadRoutingInvalidProviders(t *testing.T) {
require.NoError(t, h.srv.Reload(t.Context()))
h.expectRoutedTo(t, "https://valid.invalid/v1/messages", "/api/v2/aibridge/valid/v1/messages")
h.expectProviderStatus(t, "malformed", "error")
h.expectProviderStatus(t, "no-host", "error")
h.expectProviderStatus(t, "valid", "enabled")
})
t.Run("DuplicateHostFirstWins", func(t *testing.T) {
@@ -485,6 +533,8 @@ func TestProxy_HotReloadRoutingInvalidProviders(t *testing.T) {
require.NoError(t, h.srv.Reload(t.Context()))
h.expectRoutedTo(t, "https://shared.invalid/v1/messages", "/api/v2/aibridge/first/v1/messages")
h.expectProviderStatus(t, "first", "enabled")
h.expectProviderStatus(t, "second", "error")
})
t.Run("AllInvalidYieldsEmptyRouter", func(t *testing.T) {
+10 -8
View File
@@ -118,37 +118,39 @@ func refreshProxyProviders(db database.Store) aibridgeproxyd.RefreshProvidersFun
// hostname so later duplicates can be flagged as errors.
func classifyProviderRow(row database.AIProvider, seenHost map[string]string) aibridgeproxyd.ReloadedProvider {
out := aibridgeproxyd.ReloadedProvider{
Name: row.Name,
Type: string(row.Type),
ProviderOutcome: aibridged.ProviderOutcome{
Name: row.Name,
Type: string(row.Type),
},
}
if !row.Enabled {
out.Status = aibridgeproxyd.ProviderStatusDisabled
out.Status = aibridged.ProviderStatusDisabled
return out
}
if strings.TrimSpace(row.BaseUrl) == "" {
out.Status = aibridgeproxyd.ProviderStatusError
out.Status = aibridged.ProviderStatusError
out.Err = xerrors.New("base url is empty")
return out
}
u, err := url.Parse(row.BaseUrl)
if err != nil {
out.Status = aibridgeproxyd.ProviderStatusError
out.Status = aibridged.ProviderStatusError
out.Err = xerrors.Errorf("invalid base url %q: %w", row.BaseUrl, err)
return out
}
host := strings.ToLower(u.Hostname())
if host == "" {
out.Status = aibridgeproxyd.ProviderStatusError
out.Status = aibridged.ProviderStatusError
out.Err = xerrors.Errorf("base url %q has no hostname", row.BaseUrl)
return out
}
if claimedBy, taken := seenHost[host]; taken {
out.Status = aibridgeproxyd.ProviderStatusError
out.Status = aibridged.ProviderStatusError
out.Err = xerrors.Errorf("hostname %q already claimed by provider %q", host, claimedBy)
return out
}
seenHost[host] = row.Name
out.Host = host
out.Status = aibridgeproxyd.ProviderStatusEnabled
out.Status = aibridged.ProviderStatusEnabled
return out
}
@@ -7,8 +7,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/coder/coder/v2/coderd/aibridged"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/enterprise/aibridgeproxyd"
)
// TestClassifyProviderRow covers every branch of the classifier so the
@@ -34,7 +34,7 @@ func TestClassifyProviderRow(t *testing.T) {
got := classifyProviderRow(enabledRow("openai", "https://api.openai.com/v1"), seen)
assert.Equal(t, "openai", got.Name)
assert.Equal(t, string(database.AiProviderTypeOpenai), got.Type)
assert.Equal(t, aibridgeproxyd.ProviderStatusEnabled, got.Status)
assert.Equal(t, aibridged.ProviderStatusEnabled, got.Status)
assert.Equal(t, "api.openai.com", got.Host)
assert.NoError(t, got.Err)
assert.Equal(t, "openai", seen["api.openai.com"])
@@ -47,7 +47,7 @@ func TestClassifyProviderRow(t *testing.T) {
row := enabledRow("off", "https://api.off.example.com/v1")
row.Enabled = false
got := classifyProviderRow(row, seen)
assert.Equal(t, aibridgeproxyd.ProviderStatusDisabled, got.Status)
assert.Equal(t, aibridged.ProviderStatusDisabled, got.Status)
assert.Empty(t, got.Host, "disabled provider must not claim a host")
assert.NoError(t, got.Err)
assert.Empty(t, seen, "disabled provider must not occupy a host slot")
@@ -58,7 +58,7 @@ func TestClassifyProviderRow(t *testing.T) {
seen := map[string]string{}
got := classifyProviderRow(enabledRow("no-url", " "), seen)
assert.Equal(t, aibridgeproxyd.ProviderStatusError, got.Status)
assert.Equal(t, aibridged.ProviderStatusError, got.Status)
assert.Empty(t, got.Host)
assert.ErrorContains(t, got.Err, "base url is empty")
})
@@ -68,7 +68,7 @@ func TestClassifyProviderRow(t *testing.T) {
seen := map[string]string{}
got := classifyProviderRow(enabledRow("bad", "://not-a-url"), seen)
assert.Equal(t, aibridgeproxyd.ProviderStatusError, got.Status)
assert.Equal(t, aibridged.ProviderStatusError, got.Status)
assert.ErrorContains(t, got.Err, "invalid base url")
})
@@ -77,7 +77,7 @@ func TestClassifyProviderRow(t *testing.T) {
seen := map[string]string{}
got := classifyProviderRow(enabledRow("no-host", "https://"), seen)
assert.Equal(t, aibridgeproxyd.ProviderStatusError, got.Status)
assert.Equal(t, aibridged.ProviderStatusError, got.Status)
assert.ErrorContains(t, got.Err, "no hostname")
})
@@ -86,10 +86,10 @@ func TestClassifyProviderRow(t *testing.T) {
seen := map[string]string{}
first := classifyProviderRow(enabledRow("first", "https://shared.example.com/v1"), seen)
assert.Equal(t, aibridgeproxyd.ProviderStatusEnabled, first.Status)
assert.Equal(t, aibridged.ProviderStatusEnabled, first.Status)
second := classifyProviderRow(enabledRow("second", "https://shared.example.com/v2"), seen)
assert.Equal(t, aibridgeproxyd.ProviderStatusError, second.Status)
assert.Equal(t, aibridged.ProviderStatusError, second.Status)
assert.ErrorContains(t, second.Err, "already claimed by provider \"first\"")
assert.Equal(t, "first", seen["shared.example.com"], "first wins must not be overwritten")
})
@@ -99,7 +99,7 @@ func TestClassifyProviderRow(t *testing.T) {
seen := map[string]string{}
got := classifyProviderRow(enabledRow("mixed", "https://API.Example.COM/v1"), seen)
assert.Equal(t, aibridgeproxyd.ProviderStatusEnabled, got.Status)
assert.Equal(t, aibridged.ProviderStatusEnabled, got.Status)
assert.Equal(t, "api.example.com", got.Host)
})
}
+59 -11
View File
@@ -9,6 +9,8 @@ import (
"sync/atomic"
"testing"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
@@ -54,7 +56,7 @@ func newMockUpstream(t *testing.T, name string) *mockUpstream {
// the supplied API and subscribes it to ai_providers change events.
// This mirrors what cli/server.go does in production so /api/v2/aibridge
// requests dispatch through the real pool and reloader.
func startTestAIBridgeDaemon(t *testing.T, api *coderd.API) {
func startTestAIBridgeDaemon(t *testing.T, api *coderd.API) *aibridged.Metrics {
t.Helper()
ctx := context.Background()
@@ -62,14 +64,15 @@ func startTestAIBridgeDaemon(t *testing.T, api *coderd.API) {
cfg := api.DeploymentValues.AI.BridgeConfig
tracer := otel.Tracer("aibridge-reload-test")
providers, err := cli.BuildProviders(ctx, api.Database, cfg, logger)
providers, _, err := cli.BuildProviders(ctx, api.Database, cfg, logger)
require.NoError(t, err)
pool, err := aibridged.NewCachedBridgePool(aibridged.DefaultPoolOptions, providers, logger.Named("pool"), nil, tracer)
require.NoError(t, err)
t.Cleanup(func() { _ = pool.Shutdown(context.Background()) })
reloader := &testPoolReloader{pool: pool, db: api.Database, cfg: cfg, logger: logger.Named("reloader")}
metrics := aibridged.NewMetrics(prometheus.NewRegistry())
reloader := &testPoolReloader{pool: pool, db: api.Database, cfg: cfg, logger: logger.Named("reloader"), metrics: metrics}
unsubscribe, err := aibridged.SubscribeProviderReload(ctx, api.Pubsub, reloader, logger.Named("subscriber"))
require.NoError(t, err)
t.Cleanup(unsubscribe)
@@ -81,21 +84,25 @@ func startTestAIBridgeDaemon(t *testing.T, api *coderd.API) {
t.Cleanup(func() { _ = srv.Close() })
api.RegisterInMemoryAIBridgedHTTPHandler(srv)
return metrics
}
type testPoolReloader struct {
pool *aibridged.CachedBridgePool
db database.Store
cfg codersdk.AIBridgeConfig
logger slog.Logger
pool *aibridged.CachedBridgePool
db database.Store
cfg codersdk.AIBridgeConfig
logger slog.Logger
metrics *aibridged.Metrics
}
func (r *testPoolReloader) Reload(ctx context.Context) error {
providers, err := cli.BuildProviders(ctx, r.db, r.cfg, r.logger)
defer r.metrics.RecordReloadAttempt()
providers, outcomes, err := cli.BuildProviders(ctx, r.db, r.cfg, r.logger)
if err != nil {
return err
}
r.pool.ReplaceProviders(providers)
r.metrics.RecordReloadSuccess(outcomes)
return nil
}
@@ -124,7 +131,34 @@ func TestAIBridgeProviderHotReload(t *testing.T) {
},
})
startTestAIBridgeDaemon(t, api.AGPL)
metrics := startTestAIBridgeDaemon(t, api.AGPL)
// requireProviderStatus polls until the provider_info series for
// (name, status) settles to value 1. Reloads happen via pubsub, so
// the assertion has to be eventual.
requireProviderStatus := func(t *testing.T, name, status string) {
t.Helper()
require.Eventuallyf(t, func() bool {
return promtest.ToFloat64(metrics.ProviderInfo.WithLabelValues(name, "openai", status)) == 1
}, testutil.WaitShort, testutil.IntervalFast,
"expected provider_info{provider_name=%q, status=%q} == 1", name, status)
}
// requireProviderAbsent polls until no series exists for the
// provider name in any status. After a delete the Reset on the
// next reload must clear all previous status labels for the name.
requireProviderAbsent := func(t *testing.T, name string) {
t.Helper()
require.Eventuallyf(t, func() bool {
for _, status := range []string{"enabled", "disabled", "error"} {
if promtest.ToFloat64(metrics.ProviderInfo.WithLabelValues(name, "openai", status)) != 0 {
return false
}
}
return true
}, testutil.WaitShort, testutil.IntervalFast,
"expected provider_info series for %q to be cleared after delete", name)
}
ctx := testutil.Context(t, testutil.WaitLong)
@@ -188,6 +222,7 @@ func TestAIBridgeProviderHotReload(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "primary", created.Name)
requireRoutesTo(t, "primary", upstreamA)
requireProviderStatus(t, "primary", "enabled")
// 2. Update BaseURL: same name, now points at upstream B.
newBaseURL := upstreamB.server.URL
@@ -196,15 +231,17 @@ func TestAIBridgeProviderHotReload(t *testing.T) {
})
require.NoError(t, err)
requireRoutesTo(t, "primary", upstreamB)
requireProviderStatus(t, "primary", "enabled")
// 3. Disable: the provider drops out of the snapshot, requests
// stop reaching any upstream.
// stop reaching any upstream. The metric flips to "disabled".
disabled := false
_, err = client.UpdateAIProvider(ctx, "primary", codersdk.UpdateAIProviderRequest{
Enabled: &disabled,
})
require.NoError(t, err)
requireRoutingGone(t, "primary")
requireProviderStatus(t, "primary", "disabled")
// 4. Re-enable: routing comes back at the most recent BaseURL.
enabled := true
@@ -213,6 +250,7 @@ func TestAIBridgeProviderHotReload(t *testing.T) {
})
require.NoError(t, err)
requireRoutesTo(t, "primary", upstreamB)
requireProviderStatus(t, "primary", "enabled")
// 5. Add a second provider; both names must route independently.
_, err = client.CreateAIProvider(ctx, codersdk.CreateAIProviderRequest{
@@ -225,9 +263,19 @@ func TestAIBridgeProviderHotReload(t *testing.T) {
require.NoError(t, err)
requireRoutesTo(t, "primary", upstreamB)
requireRoutesTo(t, "secondary", upstreamA)
requireProviderStatus(t, "primary", "enabled")
requireProviderStatus(t, "secondary", "enabled")
// 6. Delete primary: only secondary remains routable.
// 6. Delete primary: only secondary remains routable. The
// provider_info series for primary disappears entirely on the
// next reload's Reset.
require.NoError(t, client.DeleteAIProvider(ctx, "primary"))
requireRoutingGone(t, "primary")
requireRoutesTo(t, "secondary", upstreamA)
requireProviderAbsent(t, "primary")
requireProviderStatus(t, "secondary", "enabled")
// Both timestamp gauges must have advanced during this test.
assert.Positive(t, promtest.ToFloat64(metrics.ProvidersLastReloadTimestampSeconds))
assert.Positive(t, promtest.ToFloat64(metrics.ProvidersLastReloadSuccessTimestampSeconds))
}
+18
View File
@@ -208,3 +208,21 @@ coder_aibridgeproxyd_mitm_requests_total{provider=""} 0
# HELP coder_aibridgeproxyd_mitm_responses_total Total number of MITM responses by HTTP status code class.
# TYPE coder_aibridgeproxyd_mitm_responses_total counter
coder_aibridgeproxyd_mitm_responses_total{code="",provider=""} 0
# HELP coder_aibridged_provider_info One series per configured AI provider. Value is always 1; the status label (enabled, disabled, error) carries the alertable signal.
# TYPE coder_aibridged_provider_info gauge
coder_aibridged_provider_info{provider_name="",provider_type="",status=""} 0
# HELP coder_aibridged_providers_last_reload_timestamp_seconds Unix timestamp of the last provider reload attempt, success or failure.
# TYPE coder_aibridged_providers_last_reload_timestamp_seconds gauge
coder_aibridged_providers_last_reload_timestamp_seconds 0
# HELP coder_aibridged_providers_last_reload_success_timestamp_seconds Unix timestamp of the last provider reload that successfully refreshed the pool. A gap against coder_aibridged_providers_last_reload_timestamp_seconds means the loop is firing but the refresh function is failing.
# TYPE coder_aibridged_providers_last_reload_success_timestamp_seconds gauge
coder_aibridged_providers_last_reload_success_timestamp_seconds 0
# HELP coder_aibridgeproxyd_provider_info One series per configured AI provider. Value is always 1; the status label (enabled, disabled, error) carries the alertable signal.
# TYPE coder_aibridgeproxyd_provider_info gauge
coder_aibridgeproxyd_provider_info{provider_name="",provider_type="",status=""} 0
# HELP coder_aibridgeproxyd_providers_last_reload_timestamp_seconds Unix timestamp of the last provider reload attempt, success or failure.
# TYPE coder_aibridgeproxyd_providers_last_reload_timestamp_seconds gauge
coder_aibridgeproxyd_providers_last_reload_timestamp_seconds 0
# HELP coder_aibridgeproxyd_providers_last_reload_success_timestamp_seconds Unix timestamp of the last provider reload that successfully refreshed the router. A gap against coder_aibridgeproxyd_providers_last_reload_timestamp_seconds means the loop is firing but the refresh function is failing.
# TYPE coder_aibridgeproxyd_providers_last_reload_success_timestamp_seconds gauge
coder_aibridgeproxyd_providers_last_reload_success_timestamp_seconds 0
+1
View File
@@ -40,6 +40,7 @@ var scanDirs = []string{
//
// eliminate the need for this skip list.
var skipPaths = []string{
"coderd/aibridged/metrics.go",
"enterprise/aibridgeproxyd/metrics.go",
}