Refactor open/close logic. Support OnConnect and OnDisconnect.

This commit is contained in:
Chris Rhodes 2016-01-20 22:47:34 -08:00
parent 654c78a419
commit cd55a59ff9
4 changed files with 122 additions and 141 deletions

View file

@ -111,20 +111,27 @@ func New(args ...interface{}) (s *Session, err error) {
} }
} }
// TODO: Add code here to fetch authenticated user info like settings, // The Session is now able to have RestAPI methods called on it.
// avatar, User ID, etc. If fails, return error. // It is recommended that you now call OpenAndListen so that events
// will begin to trigger.
// Open websocket connection return
}
// OpenAndListen is a helper method that opens the websocket connection,
// does the required handshake and then immediately begins listening.
// This is the preferred way to start listening for events and is safe
// to be called inside an OnDisconnect handler.
func (s *Session) OpenAndListen() (err error) {
// Open websocket connection.
err = s.Open() err = s.Open()
if err != nil { if err != nil {
fmt.Println(err)
return return
} }
// Do websocket handshake. // Do websocket handshake.
err = s.Handshake() err = s.Handshake()
if err != nil { if err != nil {
fmt.Println(err)
return return
} }
@ -133,34 +140,3 @@ func New(args ...interface{}) (s *Session, err error) {
return return
} }
// Close closes a Discord session
// TODO: Add support for Voice WS/UDP connections
func (s *Session) Close() {
s.DataReady = false
if s.heartbeatChan != nil {
select {
case <-s.heartbeatChan:
break
default:
close(s.heartbeatChan)
}
s.heartbeatChan = nil
}
if s.listenChan != nil {
select {
case <-s.listenChan:
break
default:
close(s.listenChan)
}
s.listenChan = nil
}
if s.wsConn != nil {
s.wsConn.Close()
}
}

View file

@ -31,6 +31,9 @@ func main() {
// Register messageCreate as a callback for the OnMessageCreate event. // Register messageCreate as a callback for the OnMessageCreate event.
dg.OnMessageCreate = messageCreate dg.OnMessageCreate = messageCreate
// Open the websocket and begin listening.
dg.OpenAndListen()
// Simple way to keep program running until any key press. // Simple way to keep program running until any key press.
var input string var input string
fmt.Scanln(&input) fmt.Scanln(&input)

View file

@ -23,10 +23,19 @@ import (
// token : The authentication token returned from Discord // token : The authentication token returned from Discord
// Debug : If set to ture debug logging will be displayed. // Debug : If set to ture debug logging will be displayed.
type Session struct { type Session struct {
sync.Mutex
// General configurable settings. // General configurable settings.
Token string // Authentication token for this session Token string // Authentication token for this session
Debug bool // Debug for printing JSON request/responses Debug bool // Debug for printing JSON request/responses
// Settable Callback functions for Internal Events
// OnConnect is called when the websocket connection opens.
OnConnect func(*Session)
// OnDisconnect is called when the websocket connection closes.
// This is a good handler to add reconnection logic to.
OnDisconnect func(*Session)
// Settable Callback functions for Websocket Events // Settable Callback functions for Websocket Events
OnEvent func(*Session, *Event) OnEvent func(*Session, *Event)
OnReady func(*Session, *Ready) OnReady func(*Session, *Ready)
@ -81,14 +90,8 @@ type Session struct {
StateEnabled bool StateEnabled bool
StateMaxMessageCount int StateMaxMessageCount int
// Mutex/Bools for locks that prevent accidents. // When nil, the session is not listening.
// TODO: Add channels. listening chan interface{}
heartbeatLock sync.Mutex
heartbeatChan chan struct{}
listenLock sync.Mutex
listenChan chan struct{}
} }
// A VoiceRegion stores data for a specific voice region server. // A VoiceRegion stores data for a specific voice region server.

201
wsapi.go
View file

@ -11,6 +11,7 @@
package discordgo package discordgo
import ( import (
"errors"
"fmt" "fmt"
"runtime" "runtime"
"time" "time"
@ -20,6 +21,20 @@ import (
// Open opens a websocket connection to Discord. // Open opens a websocket connection to Discord.
func (s *Session) Open() (err error) { func (s *Session) Open() (err error) {
s.Lock()
defer func() {
s.Unlock()
// Fire OnConnect after we have unlocked the mutex,
// otherwise we may deadlock.
if err == nil && s.OnConnect != nil {
s.OnConnect(s)
}
}()
if s.wsConn != nil {
err = errors.New("Web socket already opened.")
return
}
// Get the gateway to use for the Websocket connection // Get the gateway to use for the Websocket connection
g, err := s.Gateway() g, err := s.Gateway()
@ -30,6 +45,37 @@ func (s *Session) Open() (err error) {
// TODO: See if there's a use for the http response. // TODO: See if there's a use for the http response.
// conn, response, err := websocket.DefaultDialer.Dial(session.Gateway, nil) // conn, response, err := websocket.DefaultDialer.Dial(session.Gateway, nil)
s.wsConn, _, err = websocket.DefaultDialer.Dial(g, nil) s.wsConn, _, err = websocket.DefaultDialer.Dial(g, nil)
if err != nil {
return
}
return
}
// Close closes a websocket and stops all listening/heartbeat goroutines.
func (s *Session) Close() (err error) {
s.Lock()
defer func() {
s.Unlock()
// Fire OnDisconnect after we have unlocked the mutex
// otherwise we may deadlock, especially in reconnect logic.
if err == nil && s.OnDisconnect != nil {
s.OnDisconnect(s)
}
}()
s.DataReady = false
if s.listening != nil {
close(s.listening)
s.listening = nil
}
if s.wsConn != nil {
err = s.wsConn.Close()
s.wsConn = nil
}
return return
} }
@ -53,12 +99,9 @@ type handshakeOp struct {
} }
// Handshake sends the client data to Discord during websocket initial connection. // Handshake sends the client data to Discord during websocket initial connection.
func (s *Session) Handshake() (err error) { func (s *Session) Handshake() error {
// maybe this is SendOrigin? not sure the right name here
data := handshakeOp{2, handshakeData{3, s.Token, handshakeProperties{runtime.GOOS, "Discordgo v" + VERSION, "", "", ""}}} data := handshakeOp{2, handshakeData{3, s.Token, handshakeProperties{runtime.GOOS, "Discordgo v" + VERSION, "", "", ""}}}
err = s.wsConn.WriteJSON(data) return s.wsConn.WriteJSON(data)
return
} }
type updateStatusGame struct { type updateStatusGame struct {
@ -79,6 +122,11 @@ type updateStatusOp struct {
// If idle>0 then set status to idle. If game>0 then set game. // If idle>0 then set status to idle. If game>0 then set game.
// if otherwise, set status to active, and no game. // if otherwise, set status to active, and no game.
func (s *Session) UpdateStatus(idle int, game string) (err error) { func (s *Session) UpdateStatus(idle int, game string) (err error) {
s.RLock()
defer s.RUnlock()
if s.wsConn == nil {
return errors.New("No websocket connection exists.")
}
var usd updateStatusData var usd updateStatusData
if idle > 0 { if idle > 0 {
@ -96,65 +144,36 @@ func (s *Session) UpdateStatus(idle int, game string) (err error) {
// Listen starts listening to the websocket connection for events. // Listen starts listening to the websocket connection for events.
func (s *Session) Listen() (err error) { func (s *Session) Listen() (err error) {
// TODO: need a channel or something to communicate s.Lock()
// to this so I can tell it to stop listening
if s.wsConn == nil { if s.wsConn == nil {
fmt.Println("No websocket connection exists.") s.Unlock()
return // TODO need to return an error. return errors.New("No websocket connection exists.")
}
if s.listening != nil {
s.Unlock()
return errors.New("Already listening to websocket.")
} }
// Make sure Listen is not already running s.listening = make(chan interface{})
s.listenLock.Lock()
if s.listenChan != nil {
s.listenLock.Unlock()
return
}
s.listenChan = make(chan struct{})
s.listenLock.Unlock()
// this is ugly. s.Unlock()
defer func() {
if s.listenChan == nil {
return
}
select {
case <-s.listenChan:
break
default:
close(s.listenChan)
}
s.listenChan = nil
}()
// this is ugly. // Keep a reference, as s.listening can be nilled out.
defer func() { listening := s.listening
if s.heartbeatChan == nil {
return
}
select {
case <-s.heartbeatChan:
break
default:
close(s.heartbeatChan)
}
s.listenChan = nil
}()
for { for {
messageType, message, err := s.wsConn.ReadMessage() messageType, message, err := s.wsConn.ReadMessage()
if err != nil { if err != nil {
fmt.Println("Websocket Listen Error", err) // Defer so we get better log ordering.
// TODO Log error defer s.Close()
break return fmt.Errorf("Websocket Listen Error", err)
} }
go s.event(messageType, message)
// If our chan gets closed, exit out of this loop. select {
// TODO: Can we make this smarter, using select case <-listening:
// and some other trickery? http://www.goinggo.net/2013/10/my-channel-select-bug.html return
if s.listenChan == nil { default:
return nil go s.event(messageType, message)
} }
} }
@ -192,8 +211,8 @@ func (s *Session) event(messageType int, message []byte) (err error) {
} }
switch e.Type { switch e.Type {
case "READY": case "READY":
s.DataReady = true
var st *Ready var st *Ready
if err = unmarshalEvent(e, &st); err == nil { if err = unmarshalEvent(e, &st); err == nil {
if s.StateEnabled { if s.StateEnabled {
@ -202,8 +221,8 @@ func (s *Session) event(messageType int, message []byte) (err error) {
if s.OnReady != nil { if s.OnReady != nil {
s.OnReady(s, st) s.OnReady(s, st)
} }
go s.Heartbeat(st.HeartbeatInterval)
} }
go s.heartbeat(st.HeartbeatInterval)
if s.OnReady != nil { if s.OnReady != nil {
return return
} }
@ -541,58 +560,38 @@ func (s *Session) event(messageType int, message []byte) (err error) {
return return
} }
// Heartbeat sends regular heartbeats to Discord so it knows the client func (s *Session) sendHeartbeat() error {
// is still connected. If you do not send these heartbeats Discord will return s.wsConn.WriteJSON(map[string]int{
// disconnect the websocket connection after a few seconds.
func (s *Session) Heartbeat(i time.Duration) {
if s.wsConn == nil {
fmt.Println("No websocket connection exists.")
return // TODO need to return/log an error.
}
// Make sure Heartbeat is not already running
s.heartbeatLock.Lock()
if s.heartbeatChan != nil {
s.heartbeatLock.Unlock()
return
}
s.heartbeatChan = make(chan struct{})
s.heartbeatLock.Unlock()
// this is ugly.
defer func() {
if s.heartbeatChan == nil {
return
}
select {
case <-s.heartbeatChan:
break
default:
close(s.heartbeatChan)
}
s.listenChan = nil
}()
// send first heartbeat immediately because lag could put the
// first heartbeat outside the required heartbeat interval window
ticker := time.NewTicker(i * time.Millisecond)
for {
err := s.wsConn.WriteJSON(map[string]int{
"op": 1, "op": 1,
"d": int(time.Now().Unix()), "d": int(time.Now().Unix()),
}) })
if err != nil { }
fmt.Println("error sending data heartbeat:", err)
s.DataReady = false
return // TODO log error?
}
s.DataReady = true
// heartbeat sends regular heartbeats to Discord so it knows the client
// is still connected. If you do not send these heartbeats Discord will
// disconnect the websocket connection after a few seconds.
func (s *Session) heartbeat(i time.Duration) {
// Keep a reference, as s.listening can be nilled out.
listening := s.listening
// Send first heartbeat immediately because lag could put the
// first heartbeat outside the required heartbeat interval window.
err := s.sendHeartbeat()
if err != nil {
fmt.Println("Error sending initial heartbeat:", err)
return
}
ticker := time.NewTicker(i * time.Millisecond)
for {
select { select {
case <-ticker.C: case <-ticker.C:
case <-s.heartbeatChan: err := s.sendHeartbeat()
if err != nil {
fmt.Println("Error sending heartbeat:", err)
return
}
case <-listening:
return return
} }
} }