feat: remove loadtest cmd, add new scaletest cmd (#5310)

This commit is contained in:
Dean Sheather
2022-12-16 01:04:24 +10:00
committed by GitHub
parent 306fe4a91b
commit 6b6eac2518
41 changed files with 1887 additions and 1007 deletions
+89
View File
@@ -0,0 +1,89 @@
package agentconn
import (
"net/url"
"github.com/google/uuid"
"golang.org/x/xerrors"
"github.com/coder/coder/coderd/httpapi"
)
type ConnectionMode string
const (
ConnectionModeDirect ConnectionMode = "direct"
ConnectionModeDerp ConnectionMode = "derp"
)
type Config struct {
// AgentID is the ID of the agent to connect to.
AgentID uuid.UUID `json:"agent_id"`
// ConnectionMode is the strategy to use when connecting to the agent.
ConnectionMode ConnectionMode `json:"connection_mode"`
// HoldDuration is the duration to hold the connection open for. If set to
// 0, the connection will be closed immediately after making each request
// once.
HoldDuration httpapi.Duration `json:"hold_duration"`
// Connections is the list of connections to make to services running
// inside the workspace. Only HTTP connections are supported.
Connections []Connection `json:"connections"`
}
type Connection struct {
// URL is the address to connect to (e.g. "http://127.0.0.1:8080/path"). The
// endpoint must respond with a any response within timeout. The IP address
// is ignored and the connection is made to the agent's WireGuard IP
// instead.
URL string `json:"url"`
// Interval is the duration to wait between connections to this endpoint. If
// set to 0, the connection will only be made once. Must be set to 0 if
// the parent config's hold_duration is set to 0.
Interval httpapi.Duration `json:"interval"`
// Timeout is the duration to wait for a connection to this endpoint to
// succeed. If set to 0, the default timeout will be used.
Timeout httpapi.Duration `json:"timeout"`
}
func (c Config) Validate() error {
if c.AgentID == uuid.Nil {
return xerrors.New("agent_id must be set")
}
if c.ConnectionMode == "" {
return xerrors.New("connection_mode must be set")
}
switch c.ConnectionMode {
case ConnectionModeDirect:
case ConnectionModeDerp:
default:
return xerrors.Errorf("invalid connection_mode: %q", c.ConnectionMode)
}
if c.HoldDuration < 0 {
return xerrors.New("hold_duration must be a positive value")
}
for i, conn := range c.Connections {
if conn.URL == "" {
return xerrors.Errorf("connections[%d].url must be set", i)
}
u, err := url.Parse(conn.URL)
if err != nil {
return xerrors.Errorf("connections[%d].url is not a valid URL: %w", i, err)
}
if u.Scheme != "http" {
return xerrors.Errorf("connections[%d].url has an unsupported scheme %q, only http is supported", i, u.Scheme)
}
if conn.Interval < 0 {
return xerrors.Errorf("connections[%d].interval must be a positive value", i)
}
if conn.Interval > 0 && c.HoldDuration == 0 {
return xerrors.Errorf("connections[%d].interval must be 0 if hold_duration is 0", i)
}
if conn.Timeout < 0 {
return xerrors.Errorf("connections[%d].timeout must be a positive value", i)
}
}
return nil
}
+184
View File
@@ -0,0 +1,184 @@
package agentconn_test
import (
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/coder/coder/coderd/httpapi"
"github.com/coder/coder/scaletest/agentconn"
)
func Test_Config(t *testing.T) {
t.Parallel()
id := uuid.New()
cases := []struct {
name string
config agentconn.Config
errContains string
}{
{
name: "OK",
config: agentconn.Config{
AgentID: id,
ConnectionMode: agentconn.ConnectionModeDirect,
HoldDuration: httpapi.Duration(time.Minute),
Connections: []agentconn.Connection{
{
URL: "http://localhost:8080/path",
Interval: httpapi.Duration(time.Second),
Timeout: httpapi.Duration(time.Second),
},
{
URL: "http://localhost:8000/differentpath",
Interval: httpapi.Duration(2 * time.Second),
Timeout: httpapi.Duration(2 * time.Second),
},
},
},
},
{
name: "NoAgentID",
config: agentconn.Config{
AgentID: uuid.Nil,
ConnectionMode: agentconn.ConnectionModeDirect,
HoldDuration: 0,
Connections: nil,
},
errContains: "agent_id must be set",
},
{
name: "NoConnectionMode",
config: agentconn.Config{
AgentID: id,
ConnectionMode: "",
HoldDuration: 0,
Connections: nil,
},
errContains: "connection_mode must be set",
},
{
name: "InvalidConnectionMode",
config: agentconn.Config{
AgentID: id,
ConnectionMode: "blah",
HoldDuration: 0,
Connections: nil,
},
errContains: "invalid connection_mode",
},
{
name: "NegativeHoldDuration",
config: agentconn.Config{
AgentID: id,
ConnectionMode: agentconn.ConnectionModeDerp,
HoldDuration: -1,
Connections: nil,
},
errContains: "hold_duration must be a positive value",
},
{
name: "ConnectionNoURL",
config: agentconn.Config{
AgentID: id,
ConnectionMode: agentconn.ConnectionModeDirect,
HoldDuration: 1,
Connections: []agentconn.Connection{{
URL: "",
Interval: 0,
Timeout: 0,
}},
},
errContains: "connections[0].url must be set",
},
{
name: "ConnectionInvalidURL",
config: agentconn.Config{
AgentID: id,
ConnectionMode: agentconn.ConnectionModeDirect,
HoldDuration: 1,
Connections: []agentconn.Connection{{
URL: string([]byte{0x7f}),
Interval: 0,
Timeout: 0,
}},
},
errContains: "connections[0].url is not a valid URL",
},
{
name: "ConnectionInvalidURLScheme",
config: agentconn.Config{
AgentID: id,
ConnectionMode: agentconn.ConnectionModeDirect,
HoldDuration: 1,
Connections: []agentconn.Connection{{
URL: "blah://localhost:8080",
Interval: 0,
Timeout: 0,
}},
},
errContains: "connections[0].url has an unsupported scheme",
},
{
name: "ConnectionNegativeInterval",
config: agentconn.Config{
AgentID: id,
ConnectionMode: agentconn.ConnectionModeDirect,
HoldDuration: 1,
Connections: []agentconn.Connection{{
URL: "http://localhost:8080",
Interval: -1,
Timeout: 0,
}},
},
errContains: "connections[0].interval must be a positive value",
},
{
name: "ConnectionIntervalMustBeZero",
config: agentconn.Config{
AgentID: id,
ConnectionMode: agentconn.ConnectionModeDirect,
HoldDuration: 0,
Connections: []agentconn.Connection{{
URL: "http://localhost:8080",
Interval: 1,
Timeout: 0,
}},
},
errContains: "connections[0].interval must be 0 if hold_duration is 0",
},
{
name: "ConnectionNegativeTimeout",
config: agentconn.Config{
AgentID: id,
ConnectionMode: agentconn.ConnectionModeDirect,
HoldDuration: 1,
Connections: []agentconn.Connection{{
URL: "http://localhost:8080",
Interval: 0,
Timeout: -1,
}},
},
errContains: "connections[0].timeout must be a positive value",
},
}
for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
t.Parallel()
err := c.config.Validate()
if c.errContains != "" {
require.Error(t, err)
require.Contains(t, err.Error(), c.errContains)
} else {
require.NoError(t, err)
}
})
}
}
+382
View File
@@ -0,0 +1,382 @@
package agentconn
import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/netip"
"net/url"
"strconv"
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/scaletest/harness"
"github.com/coder/coder/scaletest/loadtestutil"
)
const defaultRequestTimeout = 5 * time.Second
type holdDurationEndedError struct{}
func (holdDurationEndedError) Error() string {
return "hold duration ended"
}
type Runner struct {
client *codersdk.Client
cfg Config
}
var _ harness.Runnable = &Runner{}
func NewRunner(client *codersdk.Client, cfg Config) *Runner {
return &Runner{
client: client,
cfg: cfg,
}
}
// Run implements Runnable.
func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {
ctx, span := tracing.StartSpan(ctx)
defer span.End()
logs = loadtestutil.NewSyncWriter(logs)
logger := slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug)
r.client.Logger = logger
r.client.LogBodies = true
_, _ = fmt.Fprintln(logs, "Opening connection to workspace agent")
switch r.cfg.ConnectionMode {
case ConnectionModeDirect:
_, _ = fmt.Fprintln(logs, "\tUsing direct connection...")
case ConnectionModeDerp:
_, _ = fmt.Fprintln(logs, "\tUsing proxied DERP connection through coder server...")
}
conn, err := r.client.DialWorkspaceAgent(ctx, r.cfg.AgentID, &codersdk.DialWorkspaceAgentOptions{
Logger: logger.Named("agentconn"),
// If the config requested DERP, then force DERP.
BlockEndpoints: r.cfg.ConnectionMode == ConnectionModeDerp,
})
if err != nil {
return xerrors.Errorf("dial workspace agent: %w", err)
}
defer conn.Close()
err = waitForDisco(ctx, logs, conn)
if err != nil {
return xerrors.Errorf("wait for discovery connection: %w", err)
}
// Wait for a direct connection if requested.
if r.cfg.ConnectionMode == ConnectionModeDirect {
err = waitForDirectConnection(ctx, logs, conn)
if err != nil {
return xerrors.Errorf("wait for direct connection: %w", err)
}
}
// Ensure DERP for completeness.
if r.cfg.ConnectionMode == ConnectionModeDerp {
status := conn.Status()
if len(status.Peers()) != 1 {
return xerrors.Errorf("check connection mode: expected 1 peer, got %d", len(status.Peers()))
}
peer := status.Peer[status.Peers()[0]]
if peer.Relay == "" || peer.CurAddr != "" {
return xerrors.Errorf("check connection mode: peer is connected directly, not via DERP")
}
}
_, _ = fmt.Fprint(logs, "\nConnection established.\n\n")
// HACK: even though the ping passed above, we still need to open a
// connection to the agent to ensure it's ready to accept connections. Not
// sure why this is the case but it seems to be necessary.
err = verifyConnection(ctx, logs, conn)
if err != nil {
return xerrors.Errorf("verify connection: %w", err)
}
_, _ = fmt.Fprint(logs, "\nConnection verified.\n\n")
// Make initial connections sequentially to ensure the services are
// reachable before we start spawning a bunch of goroutines and tickers.
err = performInitialConnections(ctx, logs, conn, r.cfg.Connections)
if err != nil {
return xerrors.Errorf("perform initial connections: %w", err)
}
if r.cfg.HoldDuration > 0 {
err = holdConnection(ctx, logs, conn, time.Duration(r.cfg.HoldDuration), r.cfg.Connections)
if err != nil {
return xerrors.Errorf("hold connection: %w", err)
}
}
err = conn.Close()
if err != nil {
return xerrors.Errorf("close connection: %w", err)
}
return nil
}
func waitForDisco(ctx context.Context, logs io.Writer, conn *codersdk.AgentConn) error {
const pingAttempts = 10
const pingDelay = 1 * time.Second
ctx, span := tracing.StartSpan(ctx)
defer span.End()
for i := 0; i < pingAttempts; i++ {
_, _ = fmt.Fprintf(logs, "\tDisco ping attempt %d/%d...\n", i+1, pingAttempts)
pingCtx, cancel := context.WithTimeout(ctx, defaultRequestTimeout)
_, err := conn.Ping(pingCtx)
cancel()
if err == nil {
break
}
if i == pingAttempts-1 {
return xerrors.Errorf("ping workspace agent: %w", err)
}
select {
case <-ctx.Done():
return xerrors.Errorf("wait for connection to be established: %w", ctx.Err())
// We use time.After here since it's a very short duration so leaking a
// timer is fine.
case <-time.After(pingDelay):
}
}
return nil
}
func waitForDirectConnection(ctx context.Context, logs io.Writer, conn *codersdk.AgentConn) error {
const directConnectionAttempts = 30
const directConnectionDelay = 1 * time.Second
ctx, span := tracing.StartSpan(ctx)
defer span.End()
for i := 0; i < directConnectionAttempts; i++ {
_, _ = fmt.Fprintf(logs, "\tDirect connection check %d/%d...\n", i+1, directConnectionAttempts)
status := conn.Status()
var err error
if len(status.Peers()) != 1 {
_, _ = fmt.Fprintf(logs, "\t\tExpected 1 peer, found %d", len(status.Peers()))
err = xerrors.Errorf("expected 1 peer, got %d", len(status.Peers()))
} else {
peer := status.Peer[status.Peers()[0]]
_, _ = fmt.Fprintf(logs, "\t\tCurAddr: %s\n", peer.CurAddr)
_, _ = fmt.Fprintf(logs, "\t\tRelay: %s\n", peer.Relay)
if peer.Relay != "" && peer.CurAddr == "" {
err = xerrors.Errorf("peer is connected via DERP, not direct")
}
}
if err == nil {
break
}
if i == directConnectionAttempts-1 {
return xerrors.Errorf("wait for direct connection to agent: %w", err)
}
select {
case <-ctx.Done():
return xerrors.Errorf("wait for direct connection to agent: %w", ctx.Err())
// We use time.After here since it's a very short duration so
// leaking a timer is fine.
case <-time.After(directConnectionDelay):
}
}
return nil
}
func verifyConnection(ctx context.Context, logs io.Writer, conn *codersdk.AgentConn) error {
const verifyConnectionAttempts = 30
const verifyConnectionDelay = 1 * time.Second
ctx, span := tracing.StartSpan(ctx)
defer span.End()
client := agentHTTPClient(conn)
for i := 0; i < verifyConnectionAttempts; i++ {
_, _ = fmt.Fprintf(logs, "\tVerify connection attempt %d/%d...\n", i+1, verifyConnectionAttempts)
verifyCtx, cancel := context.WithTimeout(ctx, defaultRequestTimeout)
u := &url.URL{
Scheme: "http",
Host: net.JoinHostPort("localhost", strconv.Itoa(codersdk.TailnetStatisticsPort)),
Path: "/",
}
req, err := http.NewRequestWithContext(verifyCtx, http.MethodGet, u.String(), nil)
if err != nil {
cancel()
return xerrors.Errorf("new verify connection request to %q: %w", u.String(), err)
}
resp, err := client.Do(req)
cancel()
if err == nil {
_ = resp.Body.Close()
break
}
if i == verifyConnectionAttempts-1 {
return xerrors.Errorf("verify connection: %w", err)
}
select {
case <-ctx.Done():
return xerrors.Errorf("verify connection: %w", ctx.Err())
case <-time.After(verifyConnectionDelay):
}
}
return nil
}
func performInitialConnections(ctx context.Context, logs io.Writer, conn *codersdk.AgentConn, specs []Connection) error {
if len(specs) == 0 {
return nil
}
ctx, span := tracing.StartSpan(ctx)
defer span.End()
_, _ = fmt.Fprintln(logs, "Performing initial service connections...")
client := agentHTTPClient(conn)
for i, connSpec := range specs {
_, _ = fmt.Fprintf(logs, "\t%d. %s\n", i, connSpec.URL)
timeout := defaultRequestTimeout
if connSpec.Timeout > 0 {
timeout = time.Duration(connSpec.Timeout)
}
ctx, cancel := context.WithTimeout(ctx, timeout)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, connSpec.URL, nil)
if err != nil {
cancel()
return xerrors.Errorf("create request: %w", err)
}
res, err := client.Do(req)
cancel()
if err != nil {
_, _ = fmt.Fprintf(logs, "\t\tFailed: %+v\n", err)
return xerrors.Errorf("make initial connection to conn spec %d %q: %w", i, connSpec.URL, err)
}
_ = res.Body.Close()
_, _ = fmt.Fprintln(logs, "\t\tOK")
}
return nil
}
func holdConnection(ctx context.Context, logs io.Writer, conn *codersdk.AgentConn, holdDur time.Duration, specs []Connection) error {
ctx, span := tracing.StartSpan(ctx)
defer span.End()
eg, egCtx := errgroup.WithContext(ctx)
client := agentHTTPClient(conn)
if len(specs) > 0 {
_, _ = fmt.Fprintln(logs, "\nStarting connection loops...")
}
for i, connSpec := range specs {
i, connSpec := i, connSpec
if connSpec.Interval <= 0 {
continue
}
eg.Go(func() error {
t := time.NewTicker(time.Duration(connSpec.Interval))
defer t.Stop()
timeout := defaultRequestTimeout
if connSpec.Timeout > 0 {
timeout = time.Duration(connSpec.Timeout)
}
for {
select {
case <-egCtx.Done():
return egCtx.Err()
case <-t.C:
ctx, cancel := context.WithTimeout(ctx, timeout)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, connSpec.URL, nil)
if err != nil {
cancel()
return xerrors.Errorf("create request: %w", err)
}
res, err := client.Do(req)
cancel()
if err != nil {
_, _ = fmt.Fprintf(logs, "\tERR: %s (%d): %+v\n", connSpec.URL, i, err)
return xerrors.Errorf("make connection to conn spec %d %q: %w", i, connSpec.URL, err)
}
res.Body.Close()
_, _ = fmt.Fprintf(logs, "\tOK: %s (%d)\n", connSpec.URL, i)
t.Reset(time.Duration(connSpec.Interval))
}
}
})
}
// Wait for the hold duration to end. We use a fake error to signal that
// the hold duration has ended.
_, _ = fmt.Fprintf(logs, "\nWaiting for %s...\n", holdDur)
eg.Go(func() error {
t := time.NewTicker(holdDur)
defer t.Stop()
select {
case <-egCtx.Done():
return egCtx.Err()
case <-t.C:
// Returning an error here will cause the errgroup context to
// be canceled, which is what we want. This fake error is
// ignored below.
return holdDurationEndedError{}
}
})
err := eg.Wait()
if err != nil && !xerrors.Is(err, holdDurationEndedError{}) {
return xerrors.Errorf("run connections loop: %w", err)
}
return nil
}
func agentHTTPClient(conn *codersdk.AgentConn) *http.Client {
return &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
_, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, xerrors.Errorf("split host port %q: %w", addr, err)
}
portUint, err := strconv.ParseUint(port, 10, 16)
if err != nil {
return nil, xerrors.Errorf("parse port %q: %w", port, err)
}
return conn.DialContextTCP(ctx, netip.AddrPortFrom(codersdk.TailnetIP, uint16(portUint)))
},
},
}
}
+283
View File
@@ -0,0 +1,283 @@
package agentconn_test
import (
"bytes"
"context"
"fmt"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/agent"
"github.com/coder/coder/coderd/coderdtest"
"github.com/coder/coder/coderd/httpapi"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/provisioner/echo"
"github.com/coder/coder/provisionersdk/proto"
"github.com/coder/coder/scaletest/agentconn"
"github.com/coder/coder/testutil"
)
func Test_Runner(t *testing.T) {
t.Parallel()
t.Run("Derp+Simple", func(t *testing.T) {
t.Parallel()
client, agentID := setupRunnerTest(t)
runner := agentconn.NewRunner(client, agentconn.Config{
AgentID: agentID,
ConnectionMode: agentconn.ConnectionModeDerp,
})
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
logs := bytes.NewBuffer(nil)
err := runner.Run(ctx, "1", logs)
logStr := logs.String()
t.Log("Runner logs:\n\n" + logStr)
require.NoError(t, err)
require.Contains(t, logStr, "Opening connection to workspace agent")
require.Contains(t, logStr, "Using proxied DERP connection")
require.Contains(t, logStr, "Disco ping attempt 1/10...")
require.Contains(t, logStr, "Connection established")
require.Contains(t, logStr, "Verify connection attempt 1/30...")
require.Contains(t, logStr, "Connection verified")
require.NotContains(t, logStr, "Performing initial service connections")
require.NotContains(t, logStr, "Starting connection loops")
require.NotContains(t, logStr, "Waiting for ")
})
//nolint:paralleltest // Measures timing as part of the test.
t.Run("Direct+Hold", func(t *testing.T) {
client, agentID := setupRunnerTest(t)
runner := agentconn.NewRunner(client, agentconn.Config{
AgentID: agentID,
ConnectionMode: agentconn.ConnectionModeDirect,
HoldDuration: httpapi.Duration(testutil.WaitShort),
})
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
logs := bytes.NewBuffer(nil)
start := time.Now()
err := runner.Run(ctx, "1", logs)
logStr := logs.String()
t.Log("Runner logs:\n\n" + logStr)
require.NoError(t, err)
require.WithinRange(t,
time.Now(),
start.Add(testutil.WaitShort-time.Second),
start.Add(testutil.WaitShort+5*time.Second),
)
require.Contains(t, logStr, "Opening connection to workspace agent")
require.Contains(t, logStr, "Using direct connection")
require.Contains(t, logStr, "Disco ping attempt 1/10...")
require.Contains(t, logStr, "Direct connection check 1/30...")
require.Contains(t, logStr, "Connection established")
require.Contains(t, logStr, "Verify connection attempt 1/30...")
require.Contains(t, logStr, "Connection verified")
require.NotContains(t, logStr, "Performing initial service connections")
require.NotContains(t, logStr, "Starting connection loops")
require.Contains(t, logStr, fmt.Sprintf("Waiting for %s", testutil.WaitShort))
})
t.Run("Derp+ServicesNoHold", func(t *testing.T) {
t.Parallel()
client, agentID := setupRunnerTest(t)
service1URL, service1Count := testServer(t)
service2URL, service2Count := testServer(t)
runner := agentconn.NewRunner(client, agentconn.Config{
AgentID: agentID,
ConnectionMode: agentconn.ConnectionModeDerp,
HoldDuration: 0,
Connections: []agentconn.Connection{
{
URL: service1URL,
Timeout: httpapi.Duration(time.Second),
},
{
URL: service2URL,
Timeout: httpapi.Duration(time.Second),
},
},
})
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
logs := bytes.NewBuffer(nil)
err := runner.Run(ctx, "1", logs)
logStr := logs.String()
t.Log("Runner logs:\n\n" + logStr)
require.NoError(t, err)
require.Contains(t, logStr, "Opening connection to workspace agent")
require.Contains(t, logStr, "Using proxied DERP connection")
require.Contains(t, logStr, "Disco ping attempt 1/10...")
require.Contains(t, logStr, "Connection established")
require.Contains(t, logStr, "Verify connection attempt 1/30...")
require.Contains(t, logStr, "Connection verified")
require.Contains(t, logStr, "Performing initial service connections")
require.Contains(t, logStr, "0. "+service1URL)
require.Contains(t, logStr, "1. "+service2URL)
require.NotContains(t, logStr, "Starting connection loops")
require.NotContains(t, logStr, "Waiting for ")
require.EqualValues(t, 1, service1Count())
require.EqualValues(t, 1, service2Count())
})
//nolint:paralleltest // Measures timing as part of the test.
t.Run("Derp+Hold+Services", func(t *testing.T) {
client, agentID := setupRunnerTest(t)
service1URL, service1Count := testServer(t)
service2URL, service2Count := testServer(t)
service3URL, service3Count := testServer(t)
runner := agentconn.NewRunner(client, agentconn.Config{
AgentID: agentID,
ConnectionMode: agentconn.ConnectionModeDerp,
HoldDuration: httpapi.Duration(testutil.WaitShort),
Connections: []agentconn.Connection{
{
URL: service1URL,
// No interval.
Timeout: httpapi.Duration(time.Second),
},
{
URL: service2URL,
Interval: httpapi.Duration(1 * time.Second),
Timeout: httpapi.Duration(time.Second),
},
{
URL: service3URL,
Interval: httpapi.Duration(500 * time.Millisecond),
Timeout: httpapi.Duration(time.Second),
},
},
})
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
logs := bytes.NewBuffer(nil)
start := time.Now()
err := runner.Run(ctx, "1", logs)
logStr := logs.String()
t.Log("Runner logs:\n\n" + logStr)
require.NoError(t, err)
require.WithinRange(t,
time.Now(),
start.Add(testutil.WaitShort-time.Second),
start.Add(testutil.WaitShort+5*time.Second),
)
require.Contains(t, logStr, "Opening connection to workspace agent")
require.Contains(t, logStr, "Using proxied DERP connection")
require.Contains(t, logStr, "Disco ping attempt 1/10...")
require.Contains(t, logStr, "Connection established")
require.Contains(t, logStr, "Verify connection attempt 1/30...")
require.Contains(t, logStr, "Connection verified")
require.Contains(t, logStr, "Performing initial service connections")
require.Contains(t, logStr, "0. "+service1URL)
require.Contains(t, logStr, "1. "+service2URL)
require.Contains(t, logStr, "Starting connection loops")
require.NotContains(t, logStr, fmt.Sprintf("OK: %s (0)", service1URL))
require.Contains(t, logStr, fmt.Sprintf("OK: %s (1)", service2URL))
require.Contains(t, logStr, fmt.Sprintf("OK: %s (2)", service3URL))
require.Contains(t, logStr, fmt.Sprintf("Waiting for %s", testutil.WaitShort))
t.Logf("service 1 called %d times", service1Count())
t.Logf("service 2 called %d times", service2Count())
t.Logf("service 3 called %d times", service3Count())
require.EqualValues(t, 1, service1Count())
require.NotEqualValues(t, 1, service2Count())
require.NotEqualValues(t, 1, service3Count())
// service 3 should've been called way more times than service 2
require.True(t, service3Count() > service2Count()+2)
})
}
func setupRunnerTest(t *testing.T) (client *codersdk.Client, agentID uuid.UUID) {
t.Helper()
client = coderdtest.New(t, &coderdtest.Options{
IncludeProvisionerDaemon: true,
})
user := coderdtest.CreateFirstUser(t, client)
authToken := uuid.NewString()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
ProvisionPlan: echo.ProvisionComplete,
ProvisionApply: []*proto.Provision_Response{{
Type: &proto.Provision_Response_Complete{
Complete: &proto.Provision_Complete{
Resources: []*proto.Resource{{
Name: "example",
Type: "aws_instance",
Agents: []*proto.Agent{{
Id: uuid.NewString(),
Name: "agent",
Auth: &proto.Agent_Token{
Token: authToken,
},
Apps: []*proto.App{},
}},
}},
},
},
}},
})
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
agentClient := codersdk.New(client.URL)
agentClient.SetSessionToken(authToken)
agentCloser := agent.New(agent.Options{
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent"),
})
t.Cleanup(func() {
_ = agentCloser.Close()
})
resources := coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID)
return client, resources[0].Agents[0].ID
}
func testServer(t *testing.T) (string, func() int64) {
t.Helper()
var count int64
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt64(&count, 1)
w.WriteHeader(http.StatusOK)
}))
t.Cleanup(srv.Close)
return srv.URL, func() int64 {
return atomic.LoadInt64(&count)
}
}