Fix issues in the poll loop

This commit is contained in:
Juan Font Alonso 2022-06-20 21:40:28 +02:00
parent 8e63b53b0c
commit 5e9004c407

77
poll.go
View file

@ -293,17 +293,17 @@ func (h *Headscale) PollNetMapStream(
machine, machine,
) )
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Hostname).
Msg("Waiting for data to stream...")
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Hostname).
Msgf("pollData is %#v, keepAliveChan is %#v, updateChan is %#v", pollDataChan, keepAliveChan, updateChan)
for { for {
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Hostname).
Msg("Waiting for data to stream...")
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Hostname).
Msgf("pollData is %#v, keepAliveChan is %#v, updateChan is %#v", pollDataChan, keepAliveChan, updateChan)
select { select {
case data := <-pollDataChan: case data := <-pollDataChan:
log.Trace(). log.Trace().
@ -321,8 +321,9 @@ func (h *Headscale) PollNetMapStream(
Err(err). Err(err).
Msg("Cannot write data") Msg("Cannot write data")
break return
} }
log.Trace(). log.Trace().
Str("handler", "PollNetMapStream"). Str("handler", "PollNetMapStream").
Str("machine", machine.Hostname). Str("machine", machine.Hostname).
@ -343,7 +344,7 @@ func (h *Headscale) PollNetMapStream(
// client has been removed from database // client has been removed from database
// since the stream opened, terminate connection. // since the stream opened, terminate connection.
break return
} }
now := time.Now().UTC() now := time.Now().UTC()
machine.LastSeen = &now machine.LastSeen = &now
@ -360,16 +361,16 @@ func (h *Headscale) PollNetMapStream(
Str("channel", "pollData"). Str("channel", "pollData").
Err(err). Err(err).
Msg("Cannot update machine LastSuccessfulUpdate") Msg("Cannot update machine LastSuccessfulUpdate")
} else {
log.Trace(). return
Str("handler", "PollNetMapStream").
Str("machine", machine.Hostname).
Str("channel", "pollData").
Int("bytes", len(data)).
Msg("Machine entry in database updated successfully after sending pollData")
} }
break log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Hostname).
Str("channel", "pollData").
Int("bytes", len(data)).
Msg("Machine entry in database updated successfully after sending data")
case data := <-keepAliveChan: case data := <-keepAliveChan:
log.Trace(). log.Trace().
@ -387,8 +388,9 @@ func (h *Headscale) PollNetMapStream(
Err(err). Err(err).
Msg("Cannot write keep alive message") Msg("Cannot write keep alive message")
break return
} }
log.Trace(). log.Trace().
Str("handler", "PollNetMapStream"). Str("handler", "PollNetMapStream").
Str("machine", machine.Hostname). Str("machine", machine.Hostname).
@ -409,7 +411,7 @@ func (h *Headscale) PollNetMapStream(
// client has been removed from database // client has been removed from database
// since the stream opened, terminate connection. // since the stream opened, terminate connection.
break return
} }
now := time.Now().UTC() now := time.Now().UTC()
machine.LastSeen = &now machine.LastSeen = &now
@ -421,16 +423,16 @@ func (h *Headscale) PollNetMapStream(
Str("channel", "keepAlive"). Str("channel", "keepAlive").
Err(err). Err(err).
Msg("Cannot update machine LastSeen") Msg("Cannot update machine LastSeen")
} else {
log.Trace(). return
Str("handler", "PollNetMapStream").
Str("machine", machine.Hostname).
Str("channel", "keepAlive").
Int("bytes", len(data)).
Msg("Machine updated successfully after sending keep alive")
} }
break log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Hostname).
Str("channel", "keepAlive").
Int("bytes", len(data)).
Msg("Machine updated successfully after sending keep alive")
case <-updateChan: case <-updateChan:
log.Trace(). log.Trace().
@ -440,6 +442,7 @@ func (h *Headscale) PollNetMapStream(
Msg("Received a request for update") Msg("Received a request for update")
updateRequestsReceivedOnChannel.WithLabelValues(machine.Namespace.Name, machine.Hostname). updateRequestsReceivedOnChannel.WithLabelValues(machine.Namespace.Name, machine.Hostname).
Inc() Inc()
if h.isOutdated(machine) { if h.isOutdated(machine) {
var lastUpdate time.Time var lastUpdate time.Time
if machine.LastSuccessfulUpdate != nil { if machine.LastSuccessfulUpdate != nil {
@ -459,6 +462,8 @@ func (h *Headscale) PollNetMapStream(
Str("channel", "update"). Str("channel", "update").
Err(err). Err(err).
Msg("Could not get the map update") Msg("Could not get the map update")
return
} }
_, err = w.Write(data) _, err = w.Write(data)
if err != nil { if err != nil {
@ -515,7 +520,10 @@ func (h *Headscale) PollNetMapStream(
Str("channel", "update"). Str("channel", "update").
Err(err). Err(err).
Msg("Cannot update machine LastSuccessfulUpdate") Msg("Cannot update machine LastSuccessfulUpdate")
return
} }
} else { } else {
var lastUpdate time.Time var lastUpdate time.Time
if machine.LastSuccessfulUpdate != nil { if machine.LastSuccessfulUpdate != nil {
@ -529,8 +537,6 @@ func (h *Headscale) PollNetMapStream(
Msgf("%s is up to date", machine.Hostname) Msgf("%s is up to date", machine.Hostname)
} }
return
case <-ctx.Done(): case <-ctx.Done():
log.Info(). log.Info().
Str("handler", "PollNetMapStream"). Str("handler", "PollNetMapStream").
@ -550,7 +556,7 @@ func (h *Headscale) PollNetMapStream(
// client has been removed from database // client has been removed from database
// since the stream opened, terminate connection. // since the stream opened, terminate connection.
break return
} }
now := time.Now().UTC() now := time.Now().UTC()
machine.LastSeen = &now machine.LastSeen = &now
@ -564,11 +570,10 @@ func (h *Headscale) PollNetMapStream(
Msg("Cannot update machine LastSeen") Msg("Cannot update machine LastSeen")
} }
break // The connection has been closed, so we can stop polling.
return
} }
} }
log.Info().Msgf("Closing poll loop to %s", machine.Hostname)
} }
func (h *Headscale) scheduledPollWorker( func (h *Headscale) scheduledPollWorker(