fix: accumulate agentstats until reported and fix insights DAU offset (#15832)

This commit is contained in:
Mathias Fredriksson
2024-12-18 11:26:38 +02:00
committed by GitHub
parent 77dc510a45
commit 4c5b737368
4 changed files with 50 additions and 12 deletions
+12 -5
View File
@@ -2,6 +2,7 @@ package agent
import (
"context"
"maps"
"sync"
"time"
@@ -32,7 +33,7 @@ type statsDest interface {
// statsDest (agent API in prod)
type statsReporter struct {
*sync.Cond
networkStats *map[netlogtype.Connection]netlogtype.Counts
networkStats map[netlogtype.Connection]netlogtype.Counts
unreported bool
lastInterval time.Duration
@@ -54,8 +55,15 @@ func (s *statsReporter) callback(_, _ time.Time, virtual, _ map[netlogtype.Conne
s.L.Lock()
defer s.L.Unlock()
s.logger.Debug(context.Background(), "got stats callback")
s.networkStats = &virtual
s.unreported = true
// Accumulate stats until they've been reported.
if s.unreported && len(s.networkStats) > 0 {
for k, v := range virtual {
s.networkStats[k] = s.networkStats[k].Add(v)
}
} else {
s.networkStats = maps.Clone(virtual)
s.unreported = true
}
s.Broadcast()
}
@@ -96,9 +104,8 @@ func (s *statsReporter) reportLoop(ctx context.Context, dest statsDest) error {
if ctxDone {
return nil
}
networkStats := *s.networkStats
s.unreported = false
if err = s.reportLocked(ctx, dest, networkStats); err != nil {
if err = s.reportLocked(ctx, dest, s.networkStats); err != nil {
return xerrors.Errorf("report stats: %w", err)
}
}
+15 -3
View File
@@ -64,7 +64,7 @@ func TestStatsReporter(t *testing.T) {
require.Equal(t, netStats, gotNetStats)
// while we are collecting the stats, send in two new netStats to simulate
// what happens if we don't keep up. Only the latest should be kept.
// what happens if we don't keep up. The stats should be accumulated.
netStats0 := map[netlogtype.Connection]netlogtype.Counts{
{
Proto: ipproto.TCP,
@@ -102,9 +102,21 @@ func TestStatsReporter(t *testing.T) {
require.Equal(t, stats, update.Stats)
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.UpdateStatsResponse{ReportInterval: durationpb.New(interval)})
// second update -- only netStats1 is reported
// second update -- netStat0 and netStats1 are accumulated and reported
wantNetStats := map[netlogtype.Connection]netlogtype.Counts{
{
Proto: ipproto.TCP,
Src: netip.MustParseAddrPort("192.168.1.33:4887"),
Dst: netip.MustParseAddrPort("192.168.2.99:9999"),
}: {
TxPackets: 21,
TxBytes: 21,
RxPackets: 21,
RxBytes: 21,
},
}
gotNetStats = testutil.RequireRecvCtx(ctx, t, fCollector.calls)
require.Equal(t, netStats1, gotNetStats)
require.Equal(t, wantNetStats, gotNetStats)
stats = &proto.Stats{SessionCountJetbrains: 66}
testutil.RequireSendCtx(ctx, t, fCollector.stats, stats)
update = testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+1 -1
View File
@@ -89,7 +89,7 @@ func (api *API) returnDAUsInternal(rw http.ResponseWriter, r *http.Request, temp
}
for _, row := range rows {
resp.Entries = append(resp.Entries, codersdk.DAUEntry{
Date: row.StartTime.Format(time.DateOnly),
Date: row.StartTime.In(loc).Format(time.DateOnly),
Amount: int(row.ActiveUsers),
})
}
+22 -3
View File
@@ -48,16 +48,27 @@ func TestDeploymentInsights(t *testing.T) {
db, ps := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure())
logger := testutil.Logger(t)
rollupEvents := make(chan dbrollup.Event)
statsInterval := 500 * time.Millisecond
// Speed up the test by controlling batch size and interval.
batcher, closeBatcher, err := workspacestats.NewBatcher(context.Background(),
workspacestats.BatcherWithLogger(logger.Named("batcher").Leveled(slog.LevelDebug)),
workspacestats.BatcherWithStore(db),
workspacestats.BatcherWithBatchSize(1),
workspacestats.BatcherWithInterval(statsInterval),
)
require.NoError(t, err)
defer closeBatcher()
client := coderdtest.New(t, &coderdtest.Options{
Database: db,
Pubsub: ps,
Logger: &logger,
IncludeProvisionerDaemon: true,
AgentStatsRefreshInterval: time.Millisecond * 100,
AgentStatsRefreshInterval: statsInterval,
StatsBatcher: batcher,
DatabaseRolluper: dbrollup.New(
logger.Named("dbrollup").Leveled(slog.LevelDebug),
db,
dbrollup.WithInterval(time.Millisecond*100),
dbrollup.WithInterval(statsInterval/2),
dbrollup.WithEventChannel(rollupEvents),
),
})
@@ -76,7 +87,7 @@ func TestDeploymentInsights(t *testing.T) {
workspace := coderdtest.CreateWorkspace(t, client, template.ID)
coderdtest.AwaitWorkspaceBuildJobCompleted(t, client, workspace.LatestBuild.ID)
ctx := testutil.Context(t, testutil.WaitLong)
ctx := testutil.Context(t, testutil.WaitSuperLong)
// Pre-check, no permission issues.
daus, err := client.DeploymentDAUs(ctx, codersdk.TimezoneOffsetHour(clientTz))
@@ -108,6 +119,13 @@ func TestDeploymentInsights(t *testing.T) {
err = sess.Start("cat")
require.NoError(t, err)
select {
case <-ctx.Done():
require.Fail(t, "timed out waiting for initial rollup event", ctx.Err())
case ev := <-rollupEvents:
require.True(t, ev.Init, "want init event")
}
for {
select {
case <-ctx.Done():
@@ -120,6 +138,7 @@ func TestDeploymentInsights(t *testing.T) {
if len(daus.Entries) > 0 && daus.Entries[len(daus.Entries)-1].Amount > 0 {
break
}
t.Logf("waiting for deployment daus to update: %+v", daus)
}
}