From 754d64d339df65f07889218708bbbc52b5c3d301 Mon Sep 17 00:00:00 2001 From: Bruce Marriner Date: Thu, 26 May 2016 19:35:17 -0500 Subject: [PATCH] Initial code to support voice reconnecting Also includes some logging improvements and a small fix to track speaking state and to send speaking packet if needed before sending opus packets. --- voice.go | 99 +++++++++++++++++++++++++++++++++++++++++++++++--------- wsapi.go | 7 ++++ 2 files changed, 91 insertions(+), 15 deletions(-) diff --git a/voice.go b/voice.go index a16a04f..53f9443 100644 --- a/voice.go +++ b/voice.go @@ -32,14 +32,16 @@ import ( type VoiceConnection struct { sync.RWMutex - Debug bool // If true, print extra logging -- DEPRECATED - LogLevel int - Ready bool // If true, voice is ready to send/receive audio - UserID string - GuildID string - ChannelID string - deaf bool - mute bool + Debug bool // If true, print extra logging -- DEPRECATED + LogLevel int + Ready bool // If true, voice is ready to send/receive audio + UserID string + GuildID string + ChannelID string + deaf bool + mute bool + speaking bool + reconnecting bool // If true, voice connection is trying to reconnect OpusSend chan []byte // Chan for sending opus audio OpusRecv chan *Packet // Chan for receiving opus audio @@ -78,6 +80,8 @@ type VoiceSpeakingUpdateHandler func(vc *VoiceConnection, vs *VoiceSpeakingUpdat // b : Send true if speaking, false if not. func (v *VoiceConnection) Speaking(b bool) (err error) { + v.log(LogDebug, "called (%t)", b) + type voiceSpeakingData struct { Speaking bool `json:"speaking"` Delay int `json:"delay"` @@ -97,9 +101,11 @@ func (v *VoiceConnection) Speaking(b bool) (err error) { err = v.wsConn.WriteJSON(data) v.wsMutex.Unlock() if err != nil { + v.speaking = false log.Println("Speaking() write json error:", err) return } + v.speaking = b return } @@ -118,6 +124,7 @@ func (v *VoiceConnection) ChangeChannel(channelID string, mute, deaf bool) (err v.ChannelID = channelID v.deaf = deaf v.mute = mute + v.speaking = false return } @@ -148,10 +155,13 @@ func (v *VoiceConnection) Disconnect() (err error) { // Close closes the voice ws and udp connections func (v *VoiceConnection) Close() { + v.log(LogInformational, "called") + v.Lock() defer v.Unlock() v.Ready = false + v.speaking = false if v.close != nil { close(v.close) @@ -234,11 +244,14 @@ func (v *VoiceConnection) waitUntilConnected() error { // are captured. func (v *VoiceConnection) open() (err error) { + v.log(LogInformational, "called") + v.Lock() defer v.Unlock() // Don't open a websocket if one is already open if v.wsConn != nil { + v.log(LogWarning, "refusing to overwrite non-nil websocket") return } @@ -321,14 +334,10 @@ func (v *VoiceConnection) wsListen(wsConn *websocket.Conn, close <-chan struct{} } v.log(LogDebug, "neterr udp error %s", neterr.Error()) } - // There has been an error reading, Close() the websocket so that - // OnDisconnect is fired. - // TODO add Voice OnDisconnect event :) - v.Close() - // TODO: close should return errs like data websocket Close - // Attempt to reconnect, with expenonential backoff up to 10 minutes. - // TODO add reconnect code + // Start reconnect goroutine then exit. + go v.reconnect() + } return } @@ -660,6 +669,13 @@ func (v *VoiceConnection) opusSender(udpConn *net.UDPConn, close <-chan struct{} // else, continue loop } + if !v.speaking { + err := v.Speaking(true) + if err != nil { + v.log(LogError, "error sending speaking packet, %s", err) + } + } + // Add sequence and timestamp to udpPacket binary.BigEndian.PutUint16(udpHeader[2:], sequence) binary.BigEndian.PutUint32(udpHeader[4:], timestamp) @@ -797,3 +813,56 @@ func (v *VoiceConnection) opusReceiver(udpConn *net.UDPConn, close <-chan struct } } } + +// Reconnect will close down a voice connection then immediately try to +// reconnect to that session. +func (v *VoiceConnection) reconnect() { + + v.Lock() + if v.reconnecting { + return + } + v.reconnecting = true + v.Unlock() + + defer func() { v.reconnecting = false }() + + v.log(LogInformational, "called") + + v.Close() + + // Take a short nap to allow everything to close. + time.Sleep(1 * time.Second) + + wait := time.Duration(1) + + // TODO After X attempts abort. + // Right now this code has the potential to create abandoned goroutines + + for { + + if v.session == nil { + v.log(LogInformational, "cannot reconnect with nil session") + return + } + + if v.session.DataReady == false { + v.log(LogInformational, "cannot reconenct with unready session") + continue + } + + v.log(LogInformational, "trying to reconnect to voice") + + _, err := v.session.ChannelVoiceJoin(v.GuildID, v.ChannelID, v.mute, v.deaf) + if err == nil { + v.log(LogInformational, "successfully reconnected to voice") + return + } + + <-time.After(wait * time.Second) + wait *= 2 + if wait > 600 { + wait = 600 + } + } +} diff --git a/wsapi.go b/wsapi.go index c11c966..26e92df 100644 --- a/wsapi.go +++ b/wsapi.go @@ -236,6 +236,11 @@ func (s *Session) listen(wsConn *websocket.Conn, listening <-chan interface{}) { if s.Open() == nil { s.log(LogInformational, "successfully reconnected to gateway") + + // Now, if we have any VoiceConnections, reconnect all of them. + for _, v := range s.VoiceConnections { + go v.reconnect() + } return } @@ -607,6 +612,8 @@ func (s *Session) onVoiceStateUpdate(se *Session, st *VoiceStateUpdate) { // the new region endpoint. func (s *Session) onVoiceServerUpdate(se *Session, st *VoiceServerUpdate) { + s.log(LogInformational, "called") + voice, exists := s.VoiceConnections[st.GuildID] // If no VoiceConnection exists, just skip this