From 4c5b737368e91d0b9b022d8d88cd7076c7a9cb4c Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 18 Dec 2024 11:26:38 +0200 Subject: [PATCH] fix: accumulate agentstats until reported and fix insights DAU offset (#15832) --- agent/stats.go | 17 ++++++++++++----- agent/stats_internal_test.go | 18 +++++++++++++++--- coderd/insights.go | 2 +- coderd/insights_test.go | 25 ++++++++++++++++++++++--- 4 files changed, 50 insertions(+), 12 deletions(-) diff --git a/agent/stats.go b/agent/stats.go index 2615ab3396..898d7117c6 100644 --- a/agent/stats.go +++ b/agent/stats.go @@ -2,6 +2,7 @@ package agent import ( "context" + "maps" "sync" "time" @@ -32,7 +33,7 @@ type statsDest interface { // statsDest (agent API in prod) type statsReporter struct { *sync.Cond - networkStats *map[netlogtype.Connection]netlogtype.Counts + networkStats map[netlogtype.Connection]netlogtype.Counts unreported bool lastInterval time.Duration @@ -54,8 +55,15 @@ func (s *statsReporter) callback(_, _ time.Time, virtual, _ map[netlogtype.Conne s.L.Lock() defer s.L.Unlock() s.logger.Debug(context.Background(), "got stats callback") - s.networkStats = &virtual - s.unreported = true + // Accumulate stats until they've been reported. 
+ if s.unreported && len(s.networkStats) > 0 { + for k, v := range virtual { + s.networkStats[k] = s.networkStats[k].Add(v) + } + } else { + s.networkStats = maps.Clone(virtual) + s.unreported = true + } s.Broadcast() } @@ -96,9 +104,8 @@ func (s *statsReporter) reportLoop(ctx context.Context, dest statsDest) error { if ctxDone { return nil } - networkStats := *s.networkStats s.unreported = false - if err = s.reportLocked(ctx, dest, networkStats); err != nil { + if err = s.reportLocked(ctx, dest, s.networkStats); err != nil { return xerrors.Errorf("report stats: %w", err) } } diff --git a/agent/stats_internal_test.go b/agent/stats_internal_test.go index e032910ee3..9fd6aa102a 100644 --- a/agent/stats_internal_test.go +++ b/agent/stats_internal_test.go @@ -64,7 +64,7 @@ func TestStatsReporter(t *testing.T) { require.Equal(t, netStats, gotNetStats) // while we are collecting the stats, send in two new netStats to simulate - // what happens if we don't keep up. Only the latest should be kept. + // what happens if we don't keep up. The stats should be accumulated. 
netStats0 := map[netlogtype.Connection]netlogtype.Counts{ { Proto: ipproto.TCP, @@ -102,9 +102,21 @@ func TestStatsReporter(t *testing.T) { require.Equal(t, stats, update.Stats) testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.UpdateStatsResponse{ReportInterval: durationpb.New(interval)}) - // second update -- only netStats1 is reported + // second update -- netStats0 and netStats1 are accumulated and reported + wantNetStats := map[netlogtype.Connection]netlogtype.Counts{ + { + Proto: ipproto.TCP, + Src: netip.MustParseAddrPort("192.168.1.33:4887"), + Dst: netip.MustParseAddrPort("192.168.2.99:9999"), + }: { + TxPackets: 21, + TxBytes: 21, + RxPackets: 21, + RxBytes: 21, + }, + } gotNetStats = testutil.RequireRecvCtx(ctx, t, fCollector.calls) - require.Equal(t, netStats1, gotNetStats) + require.Equal(t, wantNetStats, gotNetStats) stats = &proto.Stats{SessionCountJetbrains: 66} testutil.RequireSendCtx(ctx, t, fCollector.stats, stats) update = testutil.RequireRecvCtx(ctx, t, fDest.reqs) diff --git a/coderd/insights.go b/coderd/insights.go index 7234a88d44..d5faacee90 100644 --- a/coderd/insights.go +++ b/coderd/insights.go @@ -89,7 +89,7 @@ func (api *API) returnDAUsInternal(rw http.ResponseWriter, r *http.Request, temp } for _, row := range rows { resp.Entries = append(resp.Entries, codersdk.DAUEntry{ - Date: row.StartTime.Format(time.DateOnly), + Date: row.StartTime.In(loc).Format(time.DateOnly), Amount: int(row.ActiveUsers), }) } diff --git a/coderd/insights_test.go b/coderd/insights_test.go index b47bc8ada5..43ef04435c 100644 --- a/coderd/insights_test.go +++ b/coderd/insights_test.go @@ -48,16 +48,27 @@ func TestDeploymentInsights(t *testing.T) { db, ps := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure()) logger := testutil.Logger(t) rollupEvents := make(chan dbrollup.Event) + statsInterval := 500 * time.Millisecond + // Speed up the test by controlling batch size and interval. 
+ batcher, closeBatcher, err := workspacestats.NewBatcher(context.Background(), + workspacestats.BatcherWithLogger(logger.Named("batcher").Leveled(slog.LevelDebug)), + workspacestats.BatcherWithStore(db), + workspacestats.BatcherWithBatchSize(1), + workspacestats.BatcherWithInterval(statsInterval), + ) + require.NoError(t, err) + defer closeBatcher() client := coderdtest.New(t, &coderdtest.Options{ Database: db, Pubsub: ps, Logger: &logger, IncludeProvisionerDaemon: true, - AgentStatsRefreshInterval: time.Millisecond * 100, + AgentStatsRefreshInterval: statsInterval, + StatsBatcher: batcher, DatabaseRolluper: dbrollup.New( logger.Named("dbrollup").Leveled(slog.LevelDebug), db, - dbrollup.WithInterval(time.Millisecond*100), + dbrollup.WithInterval(statsInterval/2), dbrollup.WithEventChannel(rollupEvents), ), }) @@ -76,7 +87,7 @@ func TestDeploymentInsights(t *testing.T) { workspace := coderdtest.CreateWorkspace(t, client, template.ID) coderdtest.AwaitWorkspaceBuildJobCompleted(t, client, workspace.LatestBuild.ID) - ctx := testutil.Context(t, testutil.WaitLong) + ctx := testutil.Context(t, testutil.WaitSuperLong) // Pre-check, no permission issues. daus, err := client.DeploymentDAUs(ctx, codersdk.TimezoneOffsetHour(clientTz)) @@ -108,6 +119,13 @@ func TestDeploymentInsights(t *testing.T) { err = sess.Start("cat") require.NoError(t, err) + select { + case <-ctx.Done(): + require.Fail(t, "timed out waiting for initial rollup event", ctx.Err()) + case ev := <-rollupEvents: + require.True(t, ev.Init, "want init event") + } + for { select { case <-ctx.Done(): @@ -120,6 +138,7 @@ func TestDeploymentInsights(t *testing.T) { if len(daus.Entries) > 0 && daus.Entries[len(daus.Entries)-1].Amount > 0 { break } + t.Logf("waiting for deployment daus to update: %+v", daus) } }