no concurrent writes to voice ws, log cleanup

This commit is contained in:
Bruce Marriner 2016-05-01 19:54:28 -05:00
parent 5dc57e19c7
commit cb9e30670f
2 changed files with 86 additions and 34 deletions

108
voice.go
View file

@ -45,6 +45,7 @@ type VoiceConnection struct {
OpusRecv chan *Packet // Chan for receiving opus audio OpusRecv chan *Packet // Chan for receiving opus audio
wsConn *websocket.Conn wsConn *websocket.Conn
wsMutex sync.Mutex
udpConn *net.UDPConn udpConn *net.UDPConn
session *Session session *Session
@ -92,7 +93,9 @@ func (v *VoiceConnection) Speaking(b bool) (err error) {
} }
data := voiceSpeakingOp{5, voiceSpeakingData{b, 0}} data := voiceSpeakingOp{5, voiceSpeakingData{b, 0}}
v.wsMutex.Lock()
err = v.wsConn.WriteJSON(data) err = v.wsConn.WriteJSON(data)
v.wsMutex.Unlock()
if err != nil { if err != nil {
log.Println("Speaking() write json error:", err) log.Println("Speaking() write json error:", err)
return return
@ -106,7 +109,9 @@ func (v *VoiceConnection) Speaking(b bool) (err error) {
func (v *VoiceConnection) ChangeChannel(channelID string, mute, deaf bool) (err error) { func (v *VoiceConnection) ChangeChannel(channelID string, mute, deaf bool) (err error) {
data := voiceChannelJoinOp{4, voiceChannelJoinData{&v.GuildID, &channelID, mute, deaf}} data := voiceChannelJoinOp{4, voiceChannelJoinData{&v.GuildID, &channelID, mute, deaf}}
v.wsMutex.Lock()
err = v.session.wsConn.WriteJSON(data) err = v.session.wsConn.WriteJSON(data)
v.wsMutex.Unlock()
return return
} }
@ -119,7 +124,9 @@ func (v *VoiceConnection) Disconnect() (err error) {
// Send a OP4 with a nil channel to disconnect // Send a OP4 with a nil channel to disconnect
if v.sessionID != "" { if v.sessionID != "" {
data := voiceChannelJoinOp{4, voiceChannelJoinData{&v.GuildID, nil, true, true}} data := voiceChannelJoinOp{4, voiceChannelJoinData{&v.GuildID, nil, true, true}}
v.wsMutex.Lock()
err = v.session.wsConn.WriteJSON(data) err = v.session.wsConn.WriteJSON(data)
v.wsMutex.Unlock()
v.sessionID = "" v.sessionID = ""
} }
@ -244,9 +251,10 @@ func (v *VoiceConnection) open() (err error) {
// Connect to VoiceConnection Websocket // Connect to VoiceConnection Websocket
vg := fmt.Sprintf("wss://%s", strings.TrimSuffix(v.endpoint, ":80")) vg := fmt.Sprintf("wss://%s", strings.TrimSuffix(v.endpoint, ":80"))
v.log(LogInformational, "connecting to voice endpoint %s", vg)
v.wsConn, _, err = websocket.DefaultDialer.Dial(vg, nil) v.wsConn, _, err = websocket.DefaultDialer.Dial(vg, nil)
if err != nil { if err != nil {
log.Println("VOICE error opening websocket:", err) v.log(LogWarning, "error connecting to voice endpoint %s, %s", vg, err)
return return
} }
@ -282,7 +290,7 @@ func (v *VoiceConnection) open() (err error) {
func (v *VoiceConnection) wsListen(wsConn *websocket.Conn, close <-chan struct{}) { func (v *VoiceConnection) wsListen(wsConn *websocket.Conn, close <-chan struct{}) {
for { for {
messageType, message, err := v.wsConn.ReadMessage() _, message, err := v.wsConn.ReadMessage()
if err != nil { if err != nil {
// Detect if we have been closed manually. If a Close() has already // Detect if we have been closed manually. If a Close() has already
// happened, the websocket we are listening on will be different to the // happened, the websocket we are listening on will be different to the
@ -292,8 +300,20 @@ func (v *VoiceConnection) wsListen(wsConn *websocket.Conn, close <-chan struct{}
v.RUnlock() v.RUnlock()
if sameConnection { if sameConnection {
log.Println("voice websocket closed unexpectantly,", err) log.Println("voice endpoint %s websocket closed unexpectantly,", v.endpoint, err)
// temp code.
neterr, ok := err.(net.Error)
if ok {
if neterr.Timeout() {
v.log(LogDebug, "neterr udp timeout error")
}
if neterr.Temporary() {
v.log(LogDebug, "neterr udp tempoary error")
}
v.log(LogDebug, "neterr udp error %s", neterr.Error())
}
// There has been an error reading, Close() the websocket so that // There has been an error reading, Close() the websocket so that
// OnDisconnect is fired. // OnDisconnect is fired.
// TODO add Voice OnDisconnect event :) // TODO add Voice OnDisconnect event :)
@ -311,23 +331,20 @@ func (v *VoiceConnection) wsListen(wsConn *websocket.Conn, close <-chan struct{}
case <-close: case <-close:
return return
default: default:
go v.wsEvent(messageType, message) go v.onEvent(message)
} }
} }
} }
// wsEvent handles any voice websocket events. This is only called by the // wsEvent handles any voice websocket events. This is only called by the
// wsListen() function. // wsListen() function.
func (v *VoiceConnection) wsEvent(messageType int, message []byte) { func (v *VoiceConnection) onEvent(message []byte) {
if v.Debug { v.log(LogDebug, "received: %s", string(message))
log.Println("wsEvent received: ", messageType)
printJSON(message)
}
var e Event var e Event
if err := json.Unmarshal(message, &e); err != nil { if err := json.Unmarshal(message, &e); err != nil {
log.Println("wsEvent Unmarshall error: ", err) log.Println("unmarshall error, %s", err)
return return
} }
@ -336,8 +353,7 @@ func (v *VoiceConnection) wsEvent(messageType int, message []byte) {
case 2: // READY case 2: // READY
if err := json.Unmarshal(e.RawData, &v.op2); err != nil { if err := json.Unmarshal(e.RawData, &v.op2); err != nil {
log.Println("voiceWS.onEvent OP2 Unmarshall error: ", err) v.log(LogError, "OP2 unmarshall error, %s, %s", err, string(e.RawData))
printJSON(e.RawData) // TODO: Better error logging
return return
} }
@ -348,7 +364,7 @@ func (v *VoiceConnection) wsEvent(messageType int, message []byte) {
// Start the UDP connection // Start the UDP connection
err := v.udpOpen() err := v.udpOpen()
if err != nil { if err != nil {
log.Println("Error opening udp connection: ", err) v.log(LogError, "error opening udp connection, %s", err)
return return
} }
@ -379,8 +395,7 @@ func (v *VoiceConnection) wsEvent(messageType int, message []byte) {
case 4: // udp encryption secret key case 4: // udp encryption secret key
v.op4 = voiceOP4{} v.op4 = voiceOP4{}
if err := json.Unmarshal(e.RawData, &v.op4); err != nil { if err := json.Unmarshal(e.RawData, &v.op4); err != nil {
log.Println("voiceWS.onEvent OP4 Unmarshall error: ", err) v.log(LogError, "OP4 unmarshall error, %s, %s", err, string(e.RawData))
printJSON(e.RawData)
return return
} }
return return
@ -392,8 +407,7 @@ func (v *VoiceConnection) wsEvent(messageType int, message []byte) {
voiceSpeakingUpdate := &VoiceSpeakingUpdate{} voiceSpeakingUpdate := &VoiceSpeakingUpdate{}
if err := json.Unmarshal(e.RawData, voiceSpeakingUpdate); err != nil { if err := json.Unmarshal(e.RawData, voiceSpeakingUpdate); err != nil {
log.Println("voiceWS.onEvent VoiceSpeakingUpdate Unmarshal error: ", err) v.log(LogError, "OP5 unmarshall error, %s, %s", err, string(e.RawData))
printJSON(e.RawData)
return return
} }
@ -402,8 +416,7 @@ func (v *VoiceConnection) wsEvent(messageType int, message []byte) {
} }
default: default:
log.Println("UNKNOWN VOICE OP: ", e.Operation) v.log(LogError, "unknown voice operation, %d, %s", e.Operation, string(e.RawData))
printJSON(e.RawData)
} }
return return
@ -429,10 +442,12 @@ func (v *VoiceConnection) wsHeartbeat(wsConn *websocket.Conn, close <-chan struc
var err error var err error
ticker := time.NewTicker(i * time.Millisecond) ticker := time.NewTicker(i * time.Millisecond)
for { for {
v.log(LogDebug, "Sending heartbeat packet") v.log(LogDebug, "sending heartbeat packet")
v.wsMutex.Lock()
err = wsConn.WriteJSON(voiceHeartbeatOp{3, int(time.Now().Unix())}) err = wsConn.WriteJSON(voiceHeartbeatOp{3, int(time.Now().Unix())})
v.wsMutex.Unlock()
if err != nil { if err != nil {
log.Println("wsHeartbeat send error: ", err) v.log(LogError, "error sending heartbeat to voice endpoint %s, %s", v.endpoint, err)
return return
} }
@ -493,15 +508,14 @@ func (v *VoiceConnection) udpOpen() (err error) {
host := fmt.Sprintf("%s:%d", strings.TrimSuffix(v.endpoint, ":80"), v.op2.Port) host := fmt.Sprintf("%s:%d", strings.TrimSuffix(v.endpoint, ":80"), v.op2.Port)
addr, err := net.ResolveUDPAddr("udp", host) addr, err := net.ResolveUDPAddr("udp", host)
if err != nil { if err != nil {
log.Println("udpOpen resolve addr error: ", err) v.log(LogWarning, "error resolving udp host %s, %s", host, err)
// TODO better logging
return return
} }
v.log(LogInformational, "connecting to udp addr %s", addr.String())
v.udpConn, err = net.DialUDP("udp", nil, addr) v.udpConn, err = net.DialUDP("udp", nil, addr)
if err != nil { if err != nil {
log.Println("udpOpen dial udp error: ", err) v.log(LogWarning, "error connecting to udp addr %s, %s", addr.String(), err)
// TODO better logging
return return
} }
@ -511,8 +525,7 @@ func (v *VoiceConnection) udpOpen() (err error) {
binary.BigEndian.PutUint32(sb, v.op2.SSRC) binary.BigEndian.PutUint32(sb, v.op2.SSRC)
_, err = v.udpConn.Write(sb) _, err = v.udpConn.Write(sb)
if err != nil { if err != nil {
log.Println("udpOpen udp write error : ", err) v.log(LogWarning, "udp write error to %s, %s", addr.String(), err)
// TODO better logging
return return
} }
@ -523,12 +536,13 @@ func (v *VoiceConnection) udpOpen() (err error) {
rb := make([]byte, 70) rb := make([]byte, 70)
rlen, _, err := v.udpConn.ReadFromUDP(rb) rlen, _, err := v.udpConn.ReadFromUDP(rb)
if err != nil { if err != nil {
log.Println("udpOpen udp read error : ", err) v.log(LogWarning, "udp read error, %s, %s", addr.String(), err)
// TODO better logging
return return
} }
if rlen < 70 { if rlen < 70 {
log.Println("VoiceConnection RLEN should be 70 but isn't") v.log(LogWarning, "received udp packet too small")
return fmt.Errorf("received udp packet too small")
} }
// Loop over position 4 though 20 to grab the IP address // Loop over position 4 though 20 to grab the IP address
@ -548,9 +562,11 @@ func (v *VoiceConnection) udpOpen() (err error) {
// the UDP connection handshake. // the UDP connection handshake.
data := voiceUDPOp{1, voiceUDPD{"udp", voiceUDPData{ip, port, "xsalsa20_poly1305"}}} data := voiceUDPOp{1, voiceUDPD{"udp", voiceUDPData{ip, port, "xsalsa20_poly1305"}}}
v.wsMutex.Lock()
err = v.wsConn.WriteJSON(data) err = v.wsConn.WriteJSON(data)
v.wsMutex.Unlock()
if err != nil { if err != nil {
log.Println("udpOpen write json error:", err) v.log(LogWarning, "udp write error, %#v, %s", data, err)
return return
} }
@ -582,7 +598,7 @@ func (v *VoiceConnection) udpKeepAlive(udpConn *net.UDPConn, close <-chan struct
_, err = udpConn.Write(packet) _, err = udpConn.Write(packet)
if err != nil { if err != nil {
log.Println("udpKeepAlive udp write error : ", err) v.log(LogError, "write error, %s")
return return
} }
@ -656,7 +672,18 @@ func (v *VoiceConnection) opusSender(udpConn *net.UDPConn, close <-chan struct{}
_, err := udpConn.Write(sendbuf) _, err := udpConn.Write(sendbuf)
if err != nil { if err != nil {
log.Println("error writing to udp connection: ", err) v.log(LogError, "udp write error, %s", err)
neterr, ok := err.(net.Error)
if ok {
if neterr.Timeout() {
v.log(LogDebug, "neterr udp timeout error")
}
if neterr.Temporary() {
v.log(LogDebug, "neterr udp tempoary error")
}
v.log(LogDebug, "neterr udp error %s", neterr.Error())
}
return return
} }
@ -708,7 +735,20 @@ func (v *VoiceConnection) opusReceiver(udpConn *net.UDPConn, close <-chan struct
v.RUnlock() v.RUnlock()
if sameConnection { if sameConnection {
log.Println("voice udp connection closed unexpectantly,", err) v.log(LogError, "udp read error, %s, %s", v.endpoint, err)
// temp code.
neterr, ok := err.(net.Error)
if ok {
if neterr.Timeout() {
v.log(LogDebug, "neterr udp timeout error")
}
if neterr.Temporary() {
v.log(LogDebug, "neterr udp tempoary error")
}
v.log(LogDebug, "neterr udp error %s", neterr.Error())
}
// There has been an error reading, Close() the websocket so that // There has been an error reading, Close() the websocket so that
// OnDisconnect is fired. // OnDisconnect is fired.

View file

@ -204,6 +204,18 @@ func (s *Session) listen(wsConn *websocket.Conn, listening <-chan interface{}) {
if sameConnection { if sameConnection {
neterr, ok := err.(net.Error)
if ok {
if neterr.Timeout() {
v.log(LogDebug, "neterr udp timeout error")
}
if neterr.Temporary() {
v.log(LogDebug, "neterr udp tempoary error")
}
v.log(LogDebug, "neterr udp error %s", neterr.Error())
}
s.log(LogWarning, "error reading from gateway %s websocket, %s", s.gateway, err) s.log(LogWarning, "error reading from gateway %s websocket, %s", s.gateway, err)
// There has been an error reading, close the websocket so that // There has been an error reading, close the websocket so that
// OnDisconnect event is emitted. // OnDisconnect event is emitted.