mirror of
https://github.com/coder/coder.git
synced 2026-06-02 20:48:20 +00:00
fix: increase MaxMessageSize to 16 MiB (#24599)
This commit is contained in:
@@ -739,9 +739,12 @@ func (m *mapper) run() {
|
|||||||
m.logger.Debug(m.ctx, "skipping nil node update")
|
m.logger.Debug(m.ctx, "skipping nil node update")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := m.c.Enqueue(update); err != nil {
|
for _, chunk := range update.Chunked() {
|
||||||
// lots of reasons this could happen, most usually, the peer has disconnected.
|
if err := m.c.Enqueue(chunk); err != nil {
|
||||||
m.logger.Debug(m.ctx, "failed to enqueue node update", slog.Error(err))
|
// lots of reasons this could happen, most usually, the peer has disconnected.
|
||||||
|
m.logger.Debug(m.ctx, "failed to enqueue chunk", slog.Error(err))
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+9
-7
@@ -70,14 +70,16 @@ func (p *peer) batchUpdateMappingLocked(others []*peer, k proto.CoordinateRespon
|
|||||||
if len(req.PeerUpdates) == 0 {
|
if len(req.PeerUpdates) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
select {
|
for _, chunk := range req.Chunked() {
|
||||||
case p.resps <- req:
|
select {
|
||||||
p.lastWrite = time.Now()
|
case p.resps <- chunk:
|
||||||
p.logger.Debug(context.Background(), "wrote batched update", slog.F("num_peer_updates", len(req.PeerUpdates)))
|
p.lastWrite = time.Now()
|
||||||
return nil
|
default:
|
||||||
default:
|
return ErrWouldBlock
|
||||||
return ErrWouldBlock
|
}
|
||||||
}
|
}
|
||||||
|
p.logger.Debug(context.Background(), "wrote batched update", slog.F("num_peer_updates", len(req.PeerUpdates)))
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var errNoResp = xerrors.New("no response needed")
|
var errNoResp = xerrors.New("no response needed")
|
||||||
|
|||||||
@@ -0,0 +1,22 @@
|
|||||||
|
package proto
|
||||||
|
|
||||||
|
// maxPeerUpdatesPerMessage is the maximum number of peer updates that
|
||||||
|
// can be sent in a single CoordinateResponse to stay under DRPC
|
||||||
|
// message size limits.
|
||||||
|
const maxPeerUpdatesPerMessage = 1024
|
||||||
|
|
||||||
|
// Chunked splits the response into multiple responses, each containing
|
||||||
|
// at most maxPeerUpdatesPerMessage peer updates to stay under the DRPC
|
||||||
|
// 4 MiB transport limit.
|
||||||
|
func (r *CoordinateResponse) Chunked() []*CoordinateResponse {
|
||||||
|
updates := r.GetPeerUpdates()
|
||||||
|
if len(updates) <= maxPeerUpdatesPerMessage {
|
||||||
|
return []*CoordinateResponse{r}
|
||||||
|
}
|
||||||
|
var chunks []*CoordinateResponse
|
||||||
|
for i := 0; i < len(updates); i += maxPeerUpdatesPerMessage {
|
||||||
|
end := min(i+maxPeerUpdatesPerMessage, len(updates))
|
||||||
|
chunks = append(chunks, &CoordinateResponse{PeerUpdates: updates[i:end]})
|
||||||
|
}
|
||||||
|
return chunks
|
||||||
|
}
|
||||||
@@ -0,0 +1,55 @@
|
|||||||
|
package proto_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/coder/coder/v2/tailnet/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCoordinateResponse_Chunked(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
t.Run("NoChunkingNeeded", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
resp := &proto.CoordinateResponse{
|
||||||
|
PeerUpdates: make([]*proto.CoordinateResponse_PeerUpdate, 100),
|
||||||
|
}
|
||||||
|
chunks := resp.Chunked()
|
||||||
|
require.Len(t, chunks, 1)
|
||||||
|
require.Equal(t, resp, chunks[0])
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ExactLimit", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
resp := &proto.CoordinateResponse{
|
||||||
|
PeerUpdates: make([]*proto.CoordinateResponse_PeerUpdate, 1024),
|
||||||
|
}
|
||||||
|
chunks := resp.Chunked()
|
||||||
|
require.Len(t, chunks, 1)
|
||||||
|
require.Equal(t, resp, chunks[0])
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("MultipleChunks", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
n := 1024*3 + 500
|
||||||
|
resp := &proto.CoordinateResponse{
|
||||||
|
PeerUpdates: make([]*proto.CoordinateResponse_PeerUpdate, n),
|
||||||
|
}
|
||||||
|
chunks := resp.Chunked()
|
||||||
|
require.Len(t, chunks, 4)
|
||||||
|
total := 0
|
||||||
|
for _, c := range chunks {
|
||||||
|
total += len(c.GetPeerUpdates())
|
||||||
|
}
|
||||||
|
require.Equal(t, n, total)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("EmptyResponse", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
resp := &proto.CoordinateResponse{}
|
||||||
|
chunks := resp.Chunked()
|
||||||
|
require.Len(t, chunks, 1)
|
||||||
|
})
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user