mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
feat: add Prometheus metrics for boundary log drop reporting (#22521)
Add Prometheus metrics to the boundary log proxy for observability:

- batches_dropped_total (reason: buffer_full, forward_failed)
- logs_dropped_total (reason: buffer_full, forward_failed, boundary_channel_full, boundary_batch_full)
- batches_forwarded_total

Also add BoundaryStatus to the BoundaryMessage envelope so boundary can report dropped log counts as a separate wire message. The agent records these as Prometheus metrics, making boundary-side data loss visible. Backwards compatibility for older versions of boundary is maintained.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
+6
-1
@@ -420,7 +420,12 @@ func (a *agent) initSocketServer() {
|
||||
|
||||
// startBoundaryLogProxyServer starts the boundary log proxy socket server.
|
||||
func (a *agent) startBoundaryLogProxyServer() {
|
||||
proxy := boundarylogproxy.NewServer(a.logger, a.boundaryLogProxySocketPath)
|
||||
if a.boundaryLogProxySocketPath == "" {
|
||||
a.logger.Warn(a.hardCtx, "boundary log proxy socket path not defined; not starting proxy")
|
||||
return
|
||||
}
|
||||
|
||||
proxy := boundarylogproxy.NewServer(a.logger, a.boundaryLogProxySocketPath, a.prometheusRegistry)
|
||||
if err := proxy.Start(); err != nil {
|
||||
a.logger.Warn(a.hardCtx, "failed to start boundary log proxy", slog.Error(err))
|
||||
return
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
@@ -69,7 +70,7 @@ func TestBoundaryLogs_EndToEnd(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath)
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
err := srv.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -34,6 +34,7 @@ type BoundaryMessage struct {
|
||||
// Types that are assignable to Msg:
|
||||
//
|
||||
// *BoundaryMessage_Logs
|
||||
// *BoundaryMessage_Status
|
||||
Msg isBoundaryMessage_Msg `protobuf_oneof:"msg"`
|
||||
}
|
||||
|
||||
@@ -83,6 +84,13 @@ func (x *BoundaryMessage) GetLogs() *proto.ReportBoundaryLogsRequest {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *BoundaryMessage) GetStatus() *BoundaryStatus {
|
||||
if x, ok := x.GetMsg().(*BoundaryMessage_Status); ok {
|
||||
return x.Status
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type isBoundaryMessage_Msg interface {
|
||||
isBoundaryMessage_Msg()
|
||||
}
|
||||
@@ -91,8 +99,75 @@ type BoundaryMessage_Logs struct {
|
||||
Logs *proto.ReportBoundaryLogsRequest `protobuf:"bytes,1,opt,name=logs,proto3,oneof"`
|
||||
}
|
||||
|
||||
type BoundaryMessage_Status struct {
|
||||
Status *BoundaryStatus `protobuf:"bytes,2,opt,name=status,proto3,oneof"`
|
||||
}
|
||||
|
||||
func (*BoundaryMessage_Logs) isBoundaryMessage_Msg() {}
|
||||
|
||||
func (*BoundaryMessage_Status) isBoundaryMessage_Msg() {}
|
||||
|
||||
// BoundaryStatus carries operational metadata from boundary to the agent.
|
||||
// The agent records these values as Prometheus metrics. This message is
|
||||
// never forwarded to coderd.
|
||||
type BoundaryStatus struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
// Logs dropped because boundary's internal channel buffer was full.
|
||||
DroppedChannelFull int64 `protobuf:"varint,1,opt,name=dropped_channel_full,json=droppedChannelFull,proto3" json:"dropped_channel_full,omitempty"`
|
||||
// Logs dropped because boundary's batch buffer was full after a
|
||||
// failed flush attempt.
|
||||
DroppedBatchFull int64 `protobuf:"varint,2,opt,name=dropped_batch_full,json=droppedBatchFull,proto3" json:"dropped_batch_full,omitempty"`
|
||||
}
|
||||
|
||||
func (x *BoundaryStatus) Reset() {
|
||||
*x = BoundaryStatus{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_agent_boundarylogproxy_codec_boundary_proto_msgTypes[1]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
}
|
||||
|
||||
func (x *BoundaryStatus) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
func (*BoundaryStatus) ProtoMessage() {}
|
||||
|
||||
func (x *BoundaryStatus) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_agent_boundarylogproxy_codec_boundary_proto_msgTypes[1]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Deprecated: Use BoundaryStatus.ProtoReflect.Descriptor instead.
|
||||
func (*BoundaryStatus) Descriptor() ([]byte, []int) {
|
||||
return file_agent_boundarylogproxy_codec_boundary_proto_rawDescGZIP(), []int{1}
|
||||
}
|
||||
|
||||
func (x *BoundaryStatus) GetDroppedChannelFull() int64 {
|
||||
if x != nil {
|
||||
return x.DroppedChannelFull
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *BoundaryStatus) GetDroppedBatchFull() int64 {
|
||||
if x != nil {
|
||||
return x.DroppedBatchFull
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
var File_agent_boundarylogproxy_codec_boundary_proto protoreflect.FileDescriptor
|
||||
|
||||
var file_agent_boundarylogproxy_codec_boundary_proto_rawDesc = []byte{
|
||||
@@ -102,17 +177,29 @@ var file_agent_boundarylogproxy_codec_boundary_proto_rawDesc = []byte{
|
||||
0x6f, 0x64, 0x65, 0x72, 0x2e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x6c, 0x6f, 0x67,
|
||||
0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x63, 0x2e, 0x76, 0x31, 0x1a, 0x17,
|
||||
0x61, 0x67, 0x65, 0x6e, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x61, 0x67, 0x65, 0x6e,
|
||||
0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x59, 0x0a, 0x0f, 0x42, 0x6f, 0x75, 0x6e, 0x64,
|
||||
0x61, 0x72, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x3f, 0x0a, 0x04, 0x6c, 0x6f,
|
||||
0x67, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72,
|
||||
0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74,
|
||||
0x42, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75,
|
||||
0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x04, 0x6c, 0x6f, 0x67, 0x73, 0x42, 0x05, 0x0a, 0x03, 0x6d,
|
||||
0x73, 0x67, 0x42, 0x38, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
|
||||
0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f,
|
||||
0x61, 0x67, 0x65, 0x6e, 0x74, 0x2f, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x6c, 0x6f,
|
||||
0x67, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x63, 0x62, 0x06, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x33,
|
||||
0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa4, 0x01, 0x0a, 0x0f, 0x42, 0x6f, 0x75, 0x6e,
|
||||
0x64, 0x61, 0x72, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x3f, 0x0a, 0x04, 0x6c,
|
||||
0x6f, 0x67, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x63, 0x6f, 0x64, 0x65,
|
||||
0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x72,
|
||||
0x74, 0x42, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71,
|
||||
0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x04, 0x6c, 0x6f, 0x67, 0x73, 0x12, 0x49, 0x0a, 0x06,
|
||||
0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x63,
|
||||
0x6f, 0x64, 0x65, 0x72, 0x2e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x6c, 0x6f, 0x67,
|
||||
0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x42,
|
||||
0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x00, 0x52,
|
||||
0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x42, 0x05, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x22, 0x70,
|
||||
0x0a, 0x0e, 0x42, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73,
|
||||
0x12, 0x30, 0x0a, 0x14, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, 0x5f, 0x63, 0x68, 0x61, 0x6e,
|
||||
0x6e, 0x65, 0x6c, 0x5f, 0x66, 0x75, 0x6c, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x12,
|
||||
0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x46, 0x75,
|
||||
0x6c, 0x6c, 0x12, 0x2c, 0x0a, 0x12, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, 0x5f, 0x62, 0x61,
|
||||
0x74, 0x63, 0x68, 0x5f, 0x66, 0x75, 0x6c, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x10,
|
||||
0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, 0x42, 0x61, 0x74, 0x63, 0x68, 0x46, 0x75, 0x6c, 0x6c,
|
||||
0x42, 0x38, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63,
|
||||
0x6f, 0x64, 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x61, 0x67,
|
||||
0x65, 0x6e, 0x74, 0x2f, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x6c, 0x6f, 0x67, 0x70,
|
||||
0x72, 0x6f, 0x78, 0x79, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
|
||||
0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -127,18 +214,20 @@ func file_agent_boundarylogproxy_codec_boundary_proto_rawDescGZIP() []byte {
|
||||
return file_agent_boundarylogproxy_codec_boundary_proto_rawDescData
|
||||
}
|
||||
|
||||
var file_agent_boundarylogproxy_codec_boundary_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
|
||||
var file_agent_boundarylogproxy_codec_boundary_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
|
||||
var file_agent_boundarylogproxy_codec_boundary_proto_goTypes = []interface{}{
|
||||
(*BoundaryMessage)(nil), // 0: coder.boundarylogproxy.codec.v1.BoundaryMessage
|
||||
(*proto.ReportBoundaryLogsRequest)(nil), // 1: coder.agent.v2.ReportBoundaryLogsRequest
|
||||
(*BoundaryStatus)(nil), // 1: coder.boundarylogproxy.codec.v1.BoundaryStatus
|
||||
(*proto.ReportBoundaryLogsRequest)(nil), // 2: coder.agent.v2.ReportBoundaryLogsRequest
|
||||
}
|
||||
var file_agent_boundarylogproxy_codec_boundary_proto_depIdxs = []int32{
|
||||
1, // 0: coder.boundarylogproxy.codec.v1.BoundaryMessage.logs:type_name -> coder.agent.v2.ReportBoundaryLogsRequest
|
||||
1, // [1:1] is the sub-list for method output_type
|
||||
1, // [1:1] is the sub-list for method input_type
|
||||
1, // [1:1] is the sub-list for extension type_name
|
||||
1, // [1:1] is the sub-list for extension extendee
|
||||
0, // [0:1] is the sub-list for field type_name
|
||||
2, // 0: coder.boundarylogproxy.codec.v1.BoundaryMessage.logs:type_name -> coder.agent.v2.ReportBoundaryLogsRequest
|
||||
1, // 1: coder.boundarylogproxy.codec.v1.BoundaryMessage.status:type_name -> coder.boundarylogproxy.codec.v1.BoundaryStatus
|
||||
2, // [2:2] is the sub-list for method output_type
|
||||
2, // [2:2] is the sub-list for method input_type
|
||||
2, // [2:2] is the sub-list for extension type_name
|
||||
2, // [2:2] is the sub-list for extension extendee
|
||||
0, // [0:2] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_agent_boundarylogproxy_codec_boundary_proto_init() }
|
||||
@@ -159,9 +248,22 @@ func file_agent_boundarylogproxy_codec_boundary_proto_init() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_agent_boundarylogproxy_codec_boundary_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*BoundaryStatus); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
file_agent_boundarylogproxy_codec_boundary_proto_msgTypes[0].OneofWrappers = []interface{}{
|
||||
(*BoundaryMessage_Logs)(nil),
|
||||
(*BoundaryMessage_Status)(nil),
|
||||
}
|
||||
type x struct{}
|
||||
out := protoimpl.TypeBuilder{
|
||||
@@ -169,7 +271,7 @@ func file_agent_boundarylogproxy_codec_boundary_proto_init() {
|
||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||
RawDescriptor: file_agent_boundarylogproxy_codec_boundary_proto_rawDesc,
|
||||
NumEnums: 0,
|
||||
NumMessages: 1,
|
||||
NumMessages: 2,
|
||||
NumExtensions: 0,
|
||||
NumServices: 0,
|
||||
},
|
||||
|
||||
@@ -13,5 +13,17 @@ import "agent/proto/agent.proto";
|
||||
message BoundaryMessage {
|
||||
oneof msg {
|
||||
coder.agent.v2.ReportBoundaryLogsRequest logs = 1;
|
||||
BoundaryStatus status = 2;
|
||||
}
|
||||
}
|
||||
|
||||
// BoundaryStatus carries operational metadata from boundary to the agent.
|
||||
// The agent records these values as Prometheus metrics. This message is
|
||||
// never forwarded to coderd.
|
||||
message BoundaryStatus {
|
||||
// Logs dropped because boundary's internal channel buffer was full.
|
||||
int64 dropped_channel_full = 1;
|
||||
// Logs dropped because boundary's batch buffer was full after a
|
||||
// failed flush attempt.
|
||||
int64 dropped_batch_full = 2;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
package boundarylogproxy
|
||||
|
||||
import "github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
// Metrics tracks observability for the boundary -> agent -> coderd audit log
|
||||
// pipeline.
|
||||
//
|
||||
// Audit logs from boundary workspaces pass through several async buffers
|
||||
// before reaching coderd, and any stage can silently drop data. These
|
||||
// metrics make that loss visible so operators/devs can:
|
||||
//
|
||||
// - Bubble up data loss: a non-zero drop rate means audit logs are being
|
||||
// lost, which may have auditing implications.
|
||||
// - Identify the bottleneck: the reason label pinpoints where drops
|
||||
// occur: boundary's internal buffers, the agent's channel, or the
|
||||
// RPC to coderd.
|
||||
// - Tune buffer sizes: sustained "buffer_full" drops indicate the
|
||||
// agent's channel (or boundary's batch buffer) is too small for the
|
||||
// workload. Combined with batches_forwarded_total you can compute a
|
||||
// drop rate: drops / (drops + forwards).
|
||||
// - Detect batch forwarding issues: "forward_failed" drops increase when
|
||||
// the agent cannot reach coderd.
|
||||
//
|
||||
// Drops are captured at two stages:
|
||||
// - Agent-side: the agent's channel buffer overflows (reason
|
||||
// "buffer_full") or the RPC forward to coderd fails (reason
|
||||
// "forward_failed").
|
||||
// - Boundary-reported: boundary self-reports drops via BoundaryStatus
|
||||
// messages (reasons "boundary_channel_full", "boundary_batch_full").
|
||||
// These arrive on the next successful flush from boundary.
|
||||
//
|
||||
// There are circumstances where metrics could be lost e.g., agent restarts,
|
||||
// boundary crashes, or the agent shuts down when the DRPC connection is down.
|
||||
type Metrics struct {
|
||||
batchesDropped *prometheus.CounterVec
|
||||
logsDropped *prometheus.CounterVec
|
||||
batchesForwarded prometheus.Counter
|
||||
}
|
||||
|
||||
func newMetrics(registerer prometheus.Registerer) *Metrics {
|
||||
batchesDropped := prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "agent",
|
||||
Subsystem: "boundary_log_proxy",
|
||||
Name: "batches_dropped_total",
|
||||
Help: "Total number of boundary log batches dropped before reaching coderd. " +
|
||||
"Reason: buffer_full = the agent's internal buffer is full, meaning boundary is producing logs faster than the agent can forward them to coderd; " +
|
||||
"forward_failed = the agent failed to send the batch to coderd, potentially because coderd is unreachable or the connection was interrupted.",
|
||||
}, []string{"reason"})
|
||||
registerer.MustRegister(batchesDropped)
|
||||
|
||||
logsDropped := prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "agent",
|
||||
Subsystem: "boundary_log_proxy",
|
||||
Name: "logs_dropped_total",
|
||||
Help: "Total number of individual boundary log entries dropped before reaching coderd. " +
|
||||
"Reason: buffer_full = the agent's internal buffer is full; " +
|
||||
"forward_failed = the agent failed to send the batch to coderd; " +
|
||||
"boundary_channel_full = boundary's internal send channel overflowed, meaning boundary is generating logs faster than it can batch and send them; " +
|
||||
"boundary_batch_full = boundary's outgoing batch buffer overflowed after a failed flush, meaning boundary could not write to the agent's socket.",
|
||||
}, []string{"reason"})
|
||||
registerer.MustRegister(logsDropped)
|
||||
|
||||
batchesForwarded := prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: "agent",
|
||||
Subsystem: "boundary_log_proxy",
|
||||
Name: "batches_forwarded_total",
|
||||
Help: "Total number of boundary log batches successfully forwarded to coderd. " +
|
||||
"Compare with batches_dropped_total to compute a drop rate.",
|
||||
})
|
||||
registerer.MustRegister(batchesForwarded)
|
||||
|
||||
return &Metrics{
|
||||
batchesDropped: batchesDropped,
|
||||
logsDropped: logsDropped,
|
||||
batchesForwarded: batchesForwarded,
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"golang.org/x/xerrors"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
@@ -26,6 +27,13 @@ const (
|
||||
logBufferSize = 100
|
||||
)
|
||||
|
||||
// Values used for the "reason" label on the dropped-batch and dropped-log
// counters.
const (
	// Drops that happen inside the agent.
	droppedReasonBufferFull    = "buffer_full"
	droppedReasonForwardFailed = "forward_failed"

	// Drops that boundary reports about itself in a BoundaryStatus message.
	droppedReasonBoundaryChannelFull = "boundary_channel_full"
	droppedReasonBoundaryBatchFull   = "boundary_batch_full"
)
|
||||
|
||||
// DefaultSocketPath returns the default path for the boundary audit log socket.
|
||||
func DefaultSocketPath() string {
|
||||
return filepath.Join(os.TempDir(), "boundary-audit.sock")
|
||||
@@ -43,6 +51,7 @@ type Reporter interface {
|
||||
type Server struct {
|
||||
logger slog.Logger
|
||||
socketPath string
|
||||
metrics *Metrics
|
||||
|
||||
listener net.Listener
|
||||
cancel context.CancelFunc
|
||||
@@ -53,10 +62,11 @@ type Server struct {
|
||||
}
|
||||
|
||||
// NewServer creates a new boundary log proxy server.
|
||||
func NewServer(logger slog.Logger, socketPath string) *Server {
|
||||
func NewServer(logger slog.Logger, socketPath string, registerer prometheus.Registerer) *Server {
|
||||
return &Server{
|
||||
logger: logger.Named("boundary-log-proxy"),
|
||||
socketPath: socketPath,
|
||||
metrics: newMetrics(registerer),
|
||||
logs: make(chan *agentproto.ReportBoundaryLogsRequest, logBufferSize),
|
||||
}
|
||||
}
|
||||
@@ -100,9 +110,13 @@ func (s *Server) RunForwarder(ctx context.Context, sender Reporter) error {
|
||||
s.logger.Warn(ctx, "failed to forward boundary logs",
|
||||
slog.Error(err),
|
||||
slog.F("log_count", len(req.Logs)))
|
||||
s.metrics.batchesDropped.WithLabelValues(droppedReasonForwardFailed).Inc()
|
||||
s.metrics.logsDropped.WithLabelValues(droppedReasonForwardFailed).Add(float64(len(req.Logs)))
|
||||
// Continue forwarding other logs. The current batch is lost,
|
||||
// but the socket stays alive.
|
||||
continue
|
||||
}
|
||||
s.metrics.batchesForwarded.Inc()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -177,6 +191,8 @@ func (s *Server) handleMessage(ctx context.Context, msg proto.Message) {
|
||||
switch inner := m.Msg.(type) {
|
||||
case *codec.BoundaryMessage_Logs:
|
||||
s.bufferLogs(ctx, inner.Logs)
|
||||
case *codec.BoundaryMessage_Status:
|
||||
s.recordBoundaryStatus(inner.Status)
|
||||
default:
|
||||
s.logger.Warn(ctx, "unknown BoundaryMessage variant")
|
||||
}
|
||||
@@ -185,12 +201,23 @@ func (s *Server) handleMessage(ctx context.Context, msg proto.Message) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) recordBoundaryStatus(status *codec.BoundaryStatus) {
|
||||
if n := status.DroppedChannelFull; n > 0 {
|
||||
s.metrics.logsDropped.WithLabelValues(droppedReasonBoundaryChannelFull).Add(float64(n))
|
||||
}
|
||||
if n := status.DroppedBatchFull; n > 0 {
|
||||
s.metrics.logsDropped.WithLabelValues(droppedReasonBoundaryBatchFull).Add(float64(n))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) bufferLogs(ctx context.Context, req *agentproto.ReportBoundaryLogsRequest) {
|
||||
select {
|
||||
case s.logs <- req:
|
||||
default:
|
||||
s.logger.Warn(ctx, "dropping boundary logs, buffer full",
|
||||
slog.F("log_count", len(req.Logs)))
|
||||
s.metrics.batchesDropped.WithLabelValues(droppedReasonBufferFull).Inc()
|
||||
s.metrics.logsDropped.WithLabelValues(droppedReasonBufferFull).Add(float64(len(req.Logs)))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
@@ -45,6 +46,20 @@ func sendLogs(t *testing.T, conn net.Conn, req *agentproto.ReportBoundaryLogsReq
|
||||
}
|
||||
}
|
||||
|
||||
// sendStatus writes a BoundaryMessage envelope containing a BoundaryStatus
|
||||
// to the connection using TagV2.
|
||||
func sendStatus(t *testing.T, conn net.Conn, status *codec.BoundaryStatus) {
|
||||
t.Helper()
|
||||
|
||||
msg := &codec.BoundaryMessage{
|
||||
Msg: &codec.BoundaryMessage_Status{Status: status},
|
||||
}
|
||||
err := codec.WriteMessage(conn, codec.TagV2, msg)
|
||||
if err != nil {
|
||||
t.Errorf("write status: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// fakeReporter implements boundarylogproxy.Reporter for testing.
|
||||
type fakeReporter struct {
|
||||
mu sync.Mutex
|
||||
@@ -87,7 +102,7 @@ func TestServer_StartAndClose(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath)
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
err := srv.Start()
|
||||
require.NoError(t, err)
|
||||
@@ -106,7 +121,7 @@ func TestServer_ReceiveAndForwardLogs(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath)
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@@ -166,7 +181,7 @@ func TestServer_MultipleMessages(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath)
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@@ -218,7 +233,7 @@ func TestServer_MultipleConnections(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath)
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@@ -279,7 +294,7 @@ func TestServer_MessageTooLarge(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath)
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
err := srv.Start()
|
||||
require.NoError(t, err)
|
||||
@@ -307,7 +322,7 @@ func TestServer_ForwarderContinuesAfterError(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath)
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
err := srv.Start()
|
||||
require.NoError(t, err)
|
||||
@@ -392,7 +407,7 @@ func TestServer_CloseStopsForwarder(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath)
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
err := srv.Start()
|
||||
require.NoError(t, err)
|
||||
@@ -421,7 +436,7 @@ func TestServer_InvalidProtobuf(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath)
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
err := srv.Start()
|
||||
require.NoError(t, err)
|
||||
@@ -480,7 +495,7 @@ func TestServer_InvalidHeader(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath)
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
err := srv.Start()
|
||||
require.NoError(t, err)
|
||||
@@ -530,7 +545,7 @@ func TestServer_AllowRequest(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath)
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
err := srv.Start()
|
||||
require.NoError(t, err)
|
||||
@@ -588,7 +603,7 @@ func TestServer_TagV1BackwardsCompatibility(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath)
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, prometheus.NewRegistry())
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@@ -660,3 +675,181 @@ func TestServer_TagV1BackwardsCompatibility(t *testing.T) {
|
||||
cancel()
|
||||
<-forwarderDone
|
||||
}
|
||||
|
||||
func TestServer_Metrics(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
makeReq := func(n int) *agentproto.ReportBoundaryLogsRequest {
|
||||
logs := make([]*agentproto.BoundaryLog, n)
|
||||
for i := range n {
|
||||
logs[i] = &agentproto.BoundaryLog{
|
||||
Allowed: true,
|
||||
Time: timestamppb.Now(),
|
||||
Resource: &agentproto.BoundaryLog_HttpRequest_{
|
||||
HttpRequest: &agentproto.BoundaryLog_HttpRequest{
|
||||
Method: "GET",
|
||||
Url: "https://example.com",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
return &agentproto.ReportBoundaryLogsRequest{Logs: logs}
|
||||
}
|
||||
|
||||
// BufferFull needs its own setup because it intentionally does not run
|
||||
// a forwarder so the channel fills up.
|
||||
t.Run("BufferFull", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
reg := prometheus.NewRegistry()
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, reg)
|
||||
|
||||
err := srv.Start()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { require.NoError(t, srv.Close()) })
|
||||
|
||||
conn, err := net.Dial("unix", socketPath)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
// Fill the buffer (size 100) without running a forwarder so nothing
|
||||
// drains. Then send one more to trigger the drop path.
|
||||
for range 101 {
|
||||
sendLogs(t, conn, makeReq(1))
|
||||
}
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return getCounterVecValue(t, reg, "agent_boundary_log_proxy_batches_dropped_total", "buffer_full") >= 1
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
require.GreaterOrEqual(t,
|
||||
getCounterVecValue(t, reg, "agent_boundary_log_proxy_logs_dropped_total", "buffer_full"),
|
||||
float64(1))
|
||||
})
|
||||
|
||||
// The remaining metrics share one server, forwarder, and connection. The
|
||||
// phases run sequentially so metrics accumulate.
|
||||
t.Run("Forwarding", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
reg := prometheus.NewRegistry()
|
||||
socketPath := filepath.Join(testutil.TempDirUnixSocket(t), "boundary.sock")
|
||||
srv := boundarylogproxy.NewServer(testutil.Logger(t), socketPath, reg)
|
||||
|
||||
err := srv.Start()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { require.NoError(t, srv.Close()) })
|
||||
|
||||
reportNotify := make(chan struct{}, 4)
|
||||
reporter := &fakeReporter{
|
||||
err: context.DeadlineExceeded,
|
||||
errOnce: true,
|
||||
reportCb: func() {
|
||||
select {
|
||||
case reportNotify <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
forwarderDone := make(chan error, 1)
|
||||
go func() {
|
||||
forwarderDone <- srv.RunForwarder(ctx, reporter)
|
||||
}()
|
||||
|
||||
conn, err := net.Dial("unix", socketPath)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
// Phase 1: the first forward errors
|
||||
sendLogs(t, conn, makeReq(2))
|
||||
|
||||
select {
|
||||
case <-reportNotify:
|
||||
case <-time.After(testutil.WaitShort):
|
||||
t.Fatal("timed out waiting for forward attempt")
|
||||
}
|
||||
|
||||
// The metric is incremented after ReportBoundaryLogs returns, so we
|
||||
// need to poll briefly.
|
||||
require.Eventually(t, func() bool {
|
||||
return getCounterVecValue(t, reg, "agent_boundary_log_proxy_batches_dropped_total", "forward_failed") >= 1
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
require.Equal(t, float64(2),
|
||||
getCounterVecValue(t, reg, "agent_boundary_log_proxy_logs_dropped_total", "forward_failed"))
|
||||
|
||||
// Phase 2: forward succeeds.
|
||||
sendLogs(t, conn, makeReq(1))
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return len(reporter.getLogs()) >= 1
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
require.Equal(t, float64(1),
|
||||
getCounterValue(t, reg, "agent_boundary_log_proxy_batches_forwarded_total"))
|
||||
|
||||
// Phase 3: boundary-reported drop counts arrive as a separate BoundaryStatus
|
||||
// message, not piggybacked on log batches.
|
||||
sendStatus(t, conn, &codec.BoundaryStatus{
|
||||
DroppedChannelFull: 5,
|
||||
DroppedBatchFull: 3,
|
||||
})
|
||||
|
||||
// Status is handled immediately by the reader goroutine, not by the
|
||||
// forwarder, so poll metrics directly.
|
||||
require.Eventually(t, func() bool {
|
||||
return getCounterVecValue(t, reg, "agent_boundary_log_proxy_logs_dropped_total", "boundary_channel_full") >= 5
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
require.Equal(t, float64(5),
|
||||
getCounterVecValue(t, reg, "agent_boundary_log_proxy_logs_dropped_total", "boundary_channel_full"))
|
||||
require.Equal(t, float64(3),
|
||||
getCounterVecValue(t, reg, "agent_boundary_log_proxy_logs_dropped_total", "boundary_batch_full"))
|
||||
|
||||
cancel()
|
||||
<-forwarderDone
|
||||
})
|
||||
}
|
||||
|
||||
// getCounterVecValue returns the current value of a CounterVec metric filtered
|
||||
// by the given reason label.
|
||||
func getCounterVecValue(t *testing.T, reg *prometheus.Registry, name, reason string) float64 {
|
||||
t.Helper()
|
||||
|
||||
metrics, err := reg.Gather()
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, mf := range metrics {
|
||||
if mf.GetName() != name {
|
||||
continue
|
||||
}
|
||||
for _, m := range mf.GetMetric() {
|
||||
for _, lp := range m.GetLabel() {
|
||||
if lp.GetName() == "reason" && lp.GetValue() == reason {
|
||||
return m.GetCounter().GetValue()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
// getCounterValue returns the current value of a Counter metric.
|
||||
func getCounterValue(t *testing.T, reg *prometheus.Registry, name string) float64 {
|
||||
t.Helper()
|
||||
|
||||
metrics, err := reg.Gather()
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, mf := range metrics {
|
||||
if mf.GetName() != name {
|
||||
continue
|
||||
}
|
||||
for _, m := range mf.GetMetric() {
|
||||
return m.GetCounter().GetValue()
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
@@ -105,7 +105,10 @@ deployment. They will always be available from the agent.
|
||||
<!-- Code generated by 'make docs/admin/integrations/prometheus.md'. DO NOT EDIT -->
|
||||
|
||||
| Name | Type | Description | Labels |
|
||||
|-------------------------------------------------------------------------|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------|
|
||||
|-------------------------------------------------------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------|
|
||||
| `agent_boundary_log_proxy_batches_dropped_total` | counter | Total number of boundary log batches dropped before reaching coderd. Reason: buffer_full = the agent's internal buffer is full, meaning boundary is producing logs faster than the agent can forward them to coderd; forward_failed = the agent failed to send the batch to coderd, potentially because coderd is unreachable or the connection was interrupted. | `reason` |
|
||||
| `agent_boundary_log_proxy_batches_forwarded_total` | counter | Total number of boundary log batches successfully forwarded to coderd. Compare with batches_dropped_total to compute a drop rate. | |
|
||||
| `agent_boundary_log_proxy_logs_dropped_total` | counter | Total number of individual boundary log entries dropped before reaching coderd. Reason: buffer_full = the agent's internal buffer is full; forward_failed = the agent failed to send the batch to coderd; boundary_channel_full = boundary's internal send channel overflowed, meaning boundary is generating logs faster than it can batch and send them; boundary_batch_full = boundary's outgoing batch buffer overflowed after a failed flush, meaning boundary could not write to the agent's socket. | `reason` |
|
||||
| `agent_scripts_executed_total` | counter | Total number of scripts executed by the Coder agent. Includes cron scheduled scripts. | `agent_name` `success` `template_name` `username` `workspace_name` |
|
||||
| `coder_aibridged_circuit_breaker_rejects_total` | counter | Total number of requests rejected due to open circuit breaker. | `endpoint` `model` `provider` |
|
||||
| `coder_aibridged_circuit_breaker_state` | gauge | Current state of the circuit breaker (0=closed, 0.5=half-open, 1=open). | `endpoint` `model` `provider` |
|
||||
|
||||
@@ -1,3 +1,12 @@
|
||||
# HELP agent_boundary_log_proxy_batches_dropped_total Total number of boundary log batches dropped before reaching coderd. Reason: buffer_full = the agent's internal buffer is full, meaning boundary is producing logs faster than the agent can forward them to coderd; forward_failed = the agent failed to send the batch to coderd, potentially because coderd is unreachable or the connection was interrupted.
|
||||
# TYPE agent_boundary_log_proxy_batches_dropped_total counter
|
||||
agent_boundary_log_proxy_batches_dropped_total{reason=""} 0
|
||||
# HELP agent_boundary_log_proxy_batches_forwarded_total Total number of boundary log batches successfully forwarded to coderd. Compare with batches_dropped_total to compute a drop rate.
|
||||
# TYPE agent_boundary_log_proxy_batches_forwarded_total counter
|
||||
agent_boundary_log_proxy_batches_forwarded_total 0
|
||||
# HELP agent_boundary_log_proxy_logs_dropped_total Total number of individual boundary log entries dropped before reaching coderd. Reason: buffer_full = the agent's internal buffer is full; forward_failed = the agent failed to send the batch to coderd; boundary_channel_full = boundary's internal send channel overflowed, meaning boundary is generating logs faster than it can batch and send them; boundary_batch_full = boundary's outgoing batch buffer overflowed after a failed flush, meaning boundary could not write to the agent's socket.
|
||||
# TYPE agent_boundary_log_proxy_logs_dropped_total counter
|
||||
agent_boundary_log_proxy_logs_dropped_total{reason=""} 0
|
||||
# HELP coder_pubsub_connected Whether we are connected (1) or not connected (0) to postgres
|
||||
# TYPE coder_pubsub_connected gauge
|
||||
coder_pubsub_connected 0
|
||||
|
||||
Reference in New Issue
Block a user