Add rate limit buffering support (#430)

* Fix #406: reconnect() can be called while still connected

* Add memberMap to speed up member queries

* Fix error return value and remove deletion

* Fix GuildAdd member map initialization edge case

* Add rate limit buffering support

- Break request into requestInner
- Break LockBucket into LockBucketObject
- Change getBucket to GetBucket so it can be externally accessed
- Add RequestBuffer interface
- Add RequestBuffer pointer to Bucket struct
- Add RequestPostWithBuffer() function to Session

* Remove internal implementation, export ratelimiter instead
This commit is contained in:
Erik McClure 2017-10-07 10:54:46 -07:00 committed by Chris Rhodes
parent 9da2c9e76a
commit 97a510ca0a
4 changed files with 39 additions and 28 deletions

View file

@ -50,7 +50,7 @@ func New(args ...interface{}) (s *Session, err error) {
// Create an empty Session interface. // Create an empty Session interface.
s = &Session{ s = &Session{
State: NewState(), State: NewState(),
ratelimiter: NewRatelimiter(), Ratelimiter: NewRatelimiter(),
StateEnabled: true, StateEnabled: true,
Compress: true, Compress: true,
ShouldReconnectOnError: true, ShouldReconnectOnError: true,

View file

@ -41,8 +41,8 @@ func NewRatelimiter() *RateLimiter {
} }
} }
// getBucket retrieves or creates a bucket // GetBucket retrieves or creates a bucket
func (r *RateLimiter) getBucket(key string) *Bucket { func (r *RateLimiter) GetBucket(key string) *Bucket {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
@ -51,7 +51,7 @@ func (r *RateLimiter) getBucket(key string) *Bucket {
} }
b := &Bucket{ b := &Bucket{
remaining: 1, Remaining: 1,
Key: key, Key: key,
global: r.global, global: r.global,
} }
@ -67,28 +67,36 @@ func (r *RateLimiter) getBucket(key string) *Bucket {
r.buckets[key] = b r.buckets[key] = b
return b return b
} }
func (r *RateLimiter) GetWaitTime(b *Bucket, minRemaining int) time.Duration {
// LockBucket Locks until a request can be made
func (r *RateLimiter) LockBucket(bucketID string) *Bucket {
b := r.getBucket(bucketID)
b.Lock()
// If we ran out of calls and the reset time is still ahead of us // If we ran out of calls and the reset time is still ahead of us
// then we need to take it easy and relax a little // then we need to take it easy and relax a little
if b.remaining < 1 && b.reset.After(time.Now()) { if b.Remaining < minRemaining && b.reset.After(time.Now()) {
time.Sleep(b.reset.Sub(time.Now())) return b.reset.Sub(time.Now())
} }
// Check for global ratelimits // Check for global ratelimits
sleepTo := time.Unix(0, atomic.LoadInt64(r.global)) sleepTo := time.Unix(0, atomic.LoadInt64(r.global))
if now := time.Now(); now.Before(sleepTo) { if now := time.Now(); now.Before(sleepTo) {
time.Sleep(sleepTo.Sub(now)) return sleepTo.Sub(now)
} }
b.remaining-- return 0
}
// LockBucket Locks until a request can be made
func (r *RateLimiter) LockBucket(bucketID string) *Bucket {
return r.LockBucketObject(r.GetBucket(bucketID))
}
// LockBucketObject Locks an already resolved bucket until a request can be made
func (r *RateLimiter) LockBucketObject(b *Bucket) *Bucket {
b.Lock()
if wait := r.GetWaitTime(b, 1); wait > 0 {
time.Sleep(wait)
}
b.Remaining--
return b return b
} }
@ -96,13 +104,14 @@ func (r *RateLimiter) LockBucket(bucketID string) *Bucket {
type Bucket struct { type Bucket struct {
sync.Mutex sync.Mutex
Key string Key string
remaining int Remaining int
limit int limit int
reset time.Time reset time.Time
global *int64 global *int64
lastReset time.Time lastReset time.Time
customRateLimit *customRateLimit customRateLimit *customRateLimit
Userdata interface{}
} }
// Release unlocks the bucket and reads the headers to update the buckets ratelimit info // Release unlocks the bucket and reads the headers to update the buckets ratelimit info
@ -113,10 +122,10 @@ func (b *Bucket) Release(headers http.Header) error {
// Check if the bucket uses a custom ratelimiter // Check if the bucket uses a custom ratelimiter
if rl := b.customRateLimit; rl != nil { if rl := b.customRateLimit; rl != nil {
if time.Now().Sub(b.lastReset) >= rl.reset { if time.Now().Sub(b.lastReset) >= rl.reset {
b.remaining = rl.requests - 1 b.Remaining = rl.requests - 1
b.lastReset = time.Now() b.lastReset = time.Now()
} }
if b.remaining < 1 { if b.Remaining < 1 {
b.reset = time.Now().Add(rl.reset) b.reset = time.Now().Add(rl.reset)
} }
return nil return nil
@ -176,7 +185,7 @@ func (b *Bucket) Release(headers http.Header) error {
if err != nil { if err != nil {
return err return err
} }
b.remaining = int(parsedRemaining) b.Remaining = int(parsedRemaining)
} }
return nil return nil

View file

@ -65,9 +65,11 @@ func (s *Session) request(method, urlStr, contentType string, b []byte, bucketID
if bucketID == "" { if bucketID == "" {
bucketID = strings.SplitN(urlStr, "?", 2)[0] bucketID = strings.SplitN(urlStr, "?", 2)[0]
} }
return s.RequestWithLockedBucket(method, urlStr, contentType, b, s.Ratelimiter.LockBucket(bucketID), sequence)
}
bucket := s.ratelimiter.LockBucket(bucketID) // RequestWithLockedBucket makes a request using a bucket that's already been locked
func (s *Session) RequestWithLockedBucket(method, urlStr, contentType string, b []byte, bucket *Bucket, sequence int) (response []byte, err error) {
if s.Debug { if s.Debug {
log.Printf("API REQUEST %8s :: %s\n", method, urlStr) log.Printf("API REQUEST %8s :: %s\n", method, urlStr)
log.Printf("API REQUEST PAYLOAD :: [%s]\n", string(b)) log.Printf("API REQUEST PAYLOAD :: [%s]\n", string(b))
@ -139,7 +141,7 @@ func (s *Session) request(method, urlStr, contentType string, b []byte, bucketID
if sequence < s.MaxRestRetries { if sequence < s.MaxRestRetries {
s.log(LogInformational, "%s Failed (%s), Retrying...", urlStr, resp.Status) s.log(LogInformational, "%s Failed (%s), Retrying...", urlStr, resp.Status)
response, err = s.request(method, urlStr, contentType, b, bucketID, sequence+1) response, err = s.RequestWithLockedBucket(method, urlStr, contentType, b, s.Ratelimiter.LockBucketObject(bucket), sequence+1)
} else { } else {
err = fmt.Errorf("Exceeded Max retries HTTP %s, %s", resp.Status, response) err = fmt.Errorf("Exceeded Max retries HTTP %s, %s", resp.Status, response)
} }
@ -158,7 +160,7 @@ func (s *Session) request(method, urlStr, contentType string, b []byte, bucketID
// we can make the above smarter // we can make the above smarter
// this method can cause longer delays than required // this method can cause longer delays than required
response, err = s.request(method, urlStr, contentType, b, bucketID, sequence) response, err = s.RequestWithLockedBucket(method, urlStr, contentType, b, s.Ratelimiter.LockBucketObject(bucket), sequence)
default: // Error condition default: // Error condition
err = newRestError(req, resp, response) err = newRestError(req, resp, response)

View file

@ -83,6 +83,9 @@ type Session struct {
// Stores the last HeartbeatAck that was recieved (in UTC) // Stores the last HeartbeatAck that was recieved (in UTC)
LastHeartbeatAck time.Time LastHeartbeatAck time.Time
// used to deal with rate limits
Ratelimiter *RateLimiter
// Event handlers // Event handlers
handlersMu sync.RWMutex handlersMu sync.RWMutex
handlers map[string][]*eventHandlerInstance handlers map[string][]*eventHandlerInstance
@ -94,9 +97,6 @@ type Session struct {
// When nil, the session is not listening. // When nil, the session is not listening.
listening chan interface{} listening chan interface{}
// used to deal with rate limits
ratelimiter *RateLimiter
// sequence tracks the current gateway api websocket sequence number // sequence tracks the current gateway api websocket sequence number
sequence *int64 sequence *int64