From 30dae97c3e7800c1f7b7544841e61096bb048715 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 27 Jan 2022 09:14:52 -0600 Subject: [PATCH] chore: Buffer remote candidates like local (#77) * chore: Buffer remote candidates like local This was added for local candidates, and is required for remote to prevent a race where they are added before a negotiation is complete. I removed the mutex earlier, because it would cause a different race. I didn't realize the remote candidates wouldn't be buffered, but with this change they are! * Use local description instead * Add logging for candidate flush * Fix race with atomic bool * Simplify locks * Add mutex to flush * Reset buffer * Remove leak dependency to limit confusion * Fix ordering * Revert channel close * Flush candidates after remote session description is set * Bump up count to ensure race is fixed * Use custom ICE dependency * Fix data race * Lower timeout to make for fast CI * Add back mutex to prevent race * Improve debug logging * Lock on local description * Flush local candidates uniquely * Fix race * Move mutex to prevent candidate send race * Move lock to handshake so no race can occur * Reduce timeout to improve test times * Move unlock to defer * Use flushed bool instead of checking remote --- peer/channel.go | 1 + peer/conn.go | 34 ++++++++++++++++++++++++++-------- peer/conn_test.go | 4 ++-- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/peer/channel.go b/peer/channel.go index 8c3f511899..2c95d495c4 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -281,6 +281,7 @@ func (c *Channel) closeWithError(err error) error { c.conn.dcDisconnectListeners.Sub(1) c.conn.dcFailedListeners.Sub(1) c.conn.dcClosedWaitGroup.Done() + return err } diff --git a/peer/conn.go b/peer/conn.go index 11e2a05f0c..6eddee070a 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -72,8 +72,8 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp dcDisconnectChannel: make(chan struct{}), dcFailedChannel: make(chan struct{}), localCandidateChannel: make(chan webrtc.ICECandidateInit), - localSessionDescriptionChannel: make(chan webrtc.SessionDescription), pendingCandidates: make([]webrtc.ICECandidateInit, 0), + localSessionDescriptionChannel: make(chan webrtc.SessionDescription), remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription), } if client { @@ -120,8 +120,9 @@ type Conn struct { localSessionDescriptionChannel chan webrtc.SessionDescription remoteSessionDescriptionChannel chan webrtc.SessionDescription - pendingCandidates []webrtc.ICECandidateInit - pendingCandidatesMutex sync.Mutex + pendingCandidates []webrtc.ICECandidateInit + pendingCandidatesMutex sync.Mutex + pendingCandidatesFlushed bool pingChannelID uint16 pingEchoChannelID uint16 @@ -141,15 +142,15 @@ func (c *Conn) init() error { if iceCandidate == nil { return } - // ICE Candidates on a remote peer are reset when an offer - // is received. We must wait until the offer<->answer has - // been negotiated to flush candidates. c.pendingCandidatesMutex.Lock() defer c.pendingCandidatesMutex.Unlock() - if c.rtc.RemoteDescription() == nil { + + if !c.pendingCandidatesFlushed { + c.opts.Logger.Debug(context.Background(), "adding local candidate to buffer") c.pendingCandidates = append(c.pendingCandidates, iceCandidate.ToJSON()) return } + c.opts.Logger.Debug(context.Background(), "adding local candidate") select { case <-c.closed: break @@ -282,10 +283,17 @@ func (c *Conn) negotiate() { err := c.rtc.SetRemoteDescription(remoteDescription) if err != nil { + c.pendingCandidatesMutex.Unlock() _ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err)) return } + if c.offerrer { + // ICE candidates reset when an offer/answer is set for the first + // time. If candidates flush before this point, a connection could fail. + c.flushPendingCandidates() + } + if !c.offerrer { answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{}) if err != nil { @@ -305,11 +313,19 @@ func (c *Conn) negotiate() { return case c.localSessionDescriptionChannel <- answer: } - } + // Wait until the local description is set to flush candidates. + c.flushPendingCandidates() + } +} + +// flushPendingCandidates writes all local candidates to the candidate send channel. +// The localCandidateChannel is expected to be serviced, otherwise this could block. +func (c *Conn) flushPendingCandidates() { c.pendingCandidatesMutex.Lock() defer c.pendingCandidatesMutex.Unlock() for _, pendingCandidate := range c.pendingCandidates { + c.opts.Logger.Debug(context.Background(), "flushing local candidate") select { case <-c.closed: return @@ -317,6 +333,7 @@ func (c *Conn) negotiate() { } } c.pendingCandidates = make([]webrtc.ICECandidateInit, 0) + c.pendingCandidatesFlushed = true c.opts.Logger.Debug(context.Background(), "flushed candidates") } @@ -328,6 +345,7 @@ func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { // AddRemoteCandidate adds a remote candidate to the RTC connection. func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { + c.opts.Logger.Debug(context.Background(), "adding remote candidate") return c.rtc.AddICECandidate(i) } diff --git a/peer/conn_test.go b/peer/conn_test.go index 0b954579c9..f964a61bfe 100644 --- a/peer/conn_test.go +++ b/peer/conn_test.go @@ -35,11 +35,11 @@ var ( // In CI resources are frequently contended, so increasing this value // results in less flakes. if os.Getenv("CI") == "true" { - return 4 * time.Second + return 3 * time.Second } return 100 * time.Millisecond }() - failedTimeout = disconnectedTimeout * 4 + failedTimeout = disconnectedTimeout * 3 keepAliveInterval = time.Millisecond * 2 // There's a global race in the vnet library allocation code.