From 548d61c4065dcc8f5ffbfaa5881764896a8e43cd Mon Sep 17 00:00:00 2001 From: Bruce Marriner Date: Thu, 18 Feb 2016 21:41:37 -0600 Subject: [PATCH] Add ability to cleaned close voice websocket and udp connections --- voice.go | 188 ++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 129 insertions(+), 59 deletions(-) diff --git a/voice.go b/voice.go index a36c196..9c09d36 100644 --- a/voice.go +++ b/voice.go @@ -46,6 +46,9 @@ type Voice struct { guildID string channelID string userID string + + // Used to send a close signal to goroutines + close chan struct{} } // ------------------------------------------------------------------------------------------------ @@ -78,7 +81,8 @@ type voiceHandshakeOp struct { // are captured. func (v *Voice) Open() (err error) { - // TODO: How do we handle changing channels? + v.Lock() + defer v.Unlock() // Don't open a websocket if one is already open if v.wsConn != nil { @@ -104,44 +108,33 @@ func (v *Voice) Open() (err error) { // Start a listening for voice websocket events // TODO add a check here to make sure Listen worked by monitoring // a chan or bool? - go v.wsListen() + v.close = make(chan struct{}) + go v.wsListen(v.wsConn, v.close) return } -// Close closes the voice connection -func (v *Voice) Close() { - - if v.UDPConn != nil { - err := v.UDPConn.Close() - if err != nil { - fmt.Println("error closing udp connection: ", err) - } - } - - if v.wsConn != nil { - err := v.wsConn.Close() - if err != nil { - fmt.Println("error closing websocket connection: ", err) - } - } -} - // wsListen listens on the voice websocket for messages and passes them // to the voice event handler. This is automatically called by the Open func -func (v *Voice) wsListen() { +func (v *Voice) wsListen(wsConn *websocket.Conn, close <-chan struct{}) { for { messageType, message, err := v.wsConn.ReadMessage() if err != nil { + // TODO: add reconnect, matching wsapi.go:listen() // TODO: Handle this problem better. // TODO: needs proper logging fmt.Println("Voice Listen Error:", err) - break + return } // Pass received message to voice event handler - go v.wsEvent(messageType, message) + select { + case <-close: + return + default: + go v.wsEvent(messageType, message) + } } return @@ -174,7 +167,7 @@ func (v *Voice) wsEvent(messageType int, message []byte) { } // Start the voice websocket heartbeat to keep the connection alive - go v.wsHeartbeat(v.OP2.HeartbeatInterval) + go v.wsHeartbeat(v.wsConn, v.close, v.OP2.HeartbeatInterval) // TODO monitor a chan/bool to verify this was successful // Start the UDP connection @@ -186,12 +179,16 @@ func (v *Voice) wsEvent(messageType int, message []byte) { // Start the opusSender. // TODO: Should we allow 48000/960 values to be user defined? - v.OpusSend = make(chan []byte, 2) - go v.opusSender(v.OpusSend, 48000, 960) + if v.OpusSend == nil { + v.OpusSend = make(chan []byte, 2) + } + go v.opusSender(v.UDPConn, v.close, v.OpusSend, 48000, 960) // Start the opusReceiver - v.OpusRecv = make(chan *Packet, 2) - go v.opusReceiver(v.OpusRecv) + if v.OpusRecv == nil { + v.OpusRecv = make(chan *Packet, 2) + } + go v.opusReceiver(v.UDPConn, v.close, v.OpusRecv) return case 3: // HEARTBEAT response @@ -224,20 +221,33 @@ type voiceHeartbeatOp struct { Data int `json:"d"` } +// NOTE :: When a guild voice server changes how do we shut this down +// properly, so a new connection can be setup without fuss? +// // wsHeartbeat sends regular heartbeats to voice Discord so it knows the client // is still connected. If you do not send these heartbeats Discord will // disconnect the websocket connection after a few seconds. -func (v *Voice) wsHeartbeat(i time.Duration) { +func (v *Voice) wsHeartbeat(wsConn *websocket.Conn, close <-chan struct{}, i time.Duration) { + if close == nil || wsConn == nil { + return + } + + var err error ticker := time.NewTicker(i * time.Millisecond) for { - err := v.wsConn.WriteJSON(voiceHeartbeatOp{3, int(time.Now().Unix())}) + err = wsConn.WriteJSON(voiceHeartbeatOp{3, int(time.Now().Unix())}) if err != nil { - v.Ready = false fmt.Println("wsHeartbeat send error: ", err) - return // TODO better logging + return + } + + select { + case <-ticker.C: + // continue loop and send heartbeat + case <-close: + return } - <-ticker.C } } @@ -297,6 +307,21 @@ type voiceUDPOp struct { // from voice.wsEvent OP2 func (v *Voice) udpOpen() (err error) { + v.Lock() + defer v.Unlock() + + if v.wsConn == nil { + return fmt.Errorf("nil voice websocket") + } + + if v.close == nil { + return fmt.Errorf("nil close channel") + } + + if v.endpoint == "" { + return fmt.Errorf("empty endpoint") + } + host := fmt.Sprintf("%s:%d", strings.TrimSuffix(v.endpoint, ":80"), v.OP2.Port) addr, err := net.ResolveUDPAddr("udp", host) if err != nil { @@ -362,7 +387,7 @@ func (v *Voice) udpOpen() (err error) { } // start udpKeepAlive - go v.udpKeepAlive(5 * time.Second) + go v.udpKeepAlive(v.UDPConn, v.close, 5*time.Second) // TODO: find a way to check that it fired off okay return @@ -370,7 +395,11 @@ func (v *Voice) udpOpen() (err error) { // udpKeepAlive sends a udp packet to keep the udp connection open // This is still a bit of a "proof of concept" -func (v *Voice) udpKeepAlive(i time.Duration) { +func (v *Voice) udpKeepAlive(UDPConn *net.UDPConn, close <-chan struct{}, i time.Duration) { + + if UDPConn == nil || close == nil { + return + } var err error var sequence uint64 @@ -384,34 +413,37 @@ func (v *Voice) udpKeepAlive(i time.Duration) { binary.LittleEndian.PutUint64(packet, sequence) sequence++ - _, err = v.UDPConn.Write(packet) + _, err = UDPConn.Write(packet) if err != nil { fmt.Println("udpKeepAlive udp write error : ", err) return } - <-ticker.C + + select { + case <-ticker.C: + // continue loop and send keepalive + case <-close: + return + } } } // opusSender will listen on the given channel and send any // pre-encoded opus audio to Discord. Supposedly. -func (v *Voice) opusSender(opus <-chan []byte, rate, size int) { +func (v *Voice) opusSender(UDPConn *net.UDPConn, close <-chan struct{}, opus <-chan []byte, rate, size int) { - // TODO: Better checking to prevent this from running more than - // one instance at a time. - v.Lock() - if opus == nil { - v.Unlock() + if UDPConn == nil || close == nil { return } - v.Unlock() runtime.LockOSThread() // Voice is now ready to receive audio packets // TODO: this needs reviewed as I think there must be a better way. v.Ready = true - defer func() { v.Ready = false }() + defer func() { + v.Ready = false + }() var sequence uint16 var timestamp uint32 @@ -426,26 +458,32 @@ func (v *Voice) opusSender(opus <-chan []byte, rate, size int) { ticker := time.NewTicker(time.Millisecond * time.Duration(size/(rate/1000))) for { - // Add sequence and timestamp to udpPacket - binary.BigEndian.PutUint16(udpHeader[2:], sequence) - binary.BigEndian.PutUint32(udpHeader[4:], timestamp) - // Get data from chan. If chan is closed, return. recvbuf, ok := <-opus if !ok { return } + // Add sequence and timestamp to udpPacket + binary.BigEndian.PutUint16(udpHeader[2:], sequence) + binary.BigEndian.PutUint32(udpHeader[4:], timestamp) + // Combine the UDP Header and the opus data sendbuf := append(udpHeader, recvbuf...) // block here until we're exactly at the right time :) // Then send rtp audio packet to Discord over UDP - <-ticker.C - _, err := v.UDPConn.Write(sendbuf) + select { + case <-ticker.C: + // continue + case <-close: + return + } + _, err := UDPConn.Write(sendbuf) if err != nil { fmt.Println("error writing to udp connection: ", err) + return } if (sequence) == 0xFFFF { @@ -475,27 +513,29 @@ type Packet struct { // opusReceiver listens on the UDP socket for incoming packets // and sends them across the given channel // NOTE :: This function may change names later. -func (v *Voice) opusReceiver(c chan *Packet) { +func (v *Voice) opusReceiver(UDPConn *net.UDPConn, close <-chan struct{}, c chan *Packet) { - // TODO: Better checking to prevent this from running more than - // one instance at a time. - v.Lock() - if c == nil { - v.Unlock() + if UDPConn == nil || close == nil { return } - v.Unlock() p := Packet{} recvbuf := make([]byte, 1024) for { - rlen, err := v.UDPConn.Read(recvbuf) + rlen, err := UDPConn.Read(recvbuf) if err != nil { fmt.Println("opusReceiver UDP Read error:", err) return } + select { + case <-close: + return + default: + // continue loop + } + // For now, skip anything except audio. if rlen < 12 || recvbuf[0] != 0x80 { continue @@ -512,3 +552,33 @@ func (v *Voice) opusReceiver(c chan *Packet) { } } } + +// Close closes the voice ws and udp connections +func (v *Voice) Close() { + + v.Lock() + defer v.Unlock() + + v.Ready = false + + if v.close != nil { + close(v.close) + v.close = nil + } + + if v.UDPConn != nil { + err := v.UDPConn.Close() + if err != nil { + fmt.Println("error closing udp connection: ", err) + } + v.UDPConn = nil + } + + if v.wsConn != nil { + err := v.wsConn.Close() + if err != nil { + fmt.Println("error closing websocket connection: ", err) + } + v.wsConn = nil + } +}