Return simple responses immediatly

This commit rearranges the poll handler to immediatly accept
updates and notify its peers and return, not travel down the
function for a bit. This reduces the DB calls and other
holdups that isnt necessary to send a "lite response", a
map response without peers, or accepting an endpoint update.

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
This commit is contained in:
Kristoffer Dalby 2023-09-11 06:18:31 -05:00 committed by Kristoffer Dalby
parent 217ccd6540
commit c957f893bd
3 changed files with 221 additions and 137 deletions

View file

@ -94,8 +94,6 @@ type Headscale struct {
shutdownChan chan struct{} shutdownChan chan struct{}
pollNetMapStreamWG sync.WaitGroup pollNetMapStreamWG sync.WaitGroup
pollStreamOpenMu sync.Mutex
} }
func NewHeadscale(cfg *types.Config) (*Headscale, error) { func NewHeadscale(cfg *types.Config) (*Headscale, error) {

View file

@ -362,13 +362,15 @@ func (hsdb *HSDatabase) deleteMachine(machine *types.Machine) error {
return nil return nil
} }
func (hsdb *HSDatabase) TouchMachine(machine *types.Machine) error { // UpdateLastSeen sets a machine's last seen field indicating that we
hsdb.mu.Lock() // have recently communicating with this machine.
defer hsdb.mu.Unlock() // This is mostly used to indicate if a machine is online and is not
// extremely important to make sure is fully correct and to avoid
// holding up the hot path, does not contain any locks and isnt
// concurrency safe. But that should be ok.
func (hsdb *HSDatabase) UpdateLastSeen(machine *types.Machine) error {
return hsdb.db.Model(machine).Updates(types.Machine{ return hsdb.db.Model(machine).Updates(types.Machine{
LastSeen: machine.LastSeen, LastSeen: machine.LastSeen,
LastSuccessfulUpdate: machine.LastSuccessfulUpdate,
}).Error }).Error
} }

View file

@ -64,24 +64,80 @@ func (h *Headscale) handlePoll(
mapRequest tailcfg.MapRequest, mapRequest tailcfg.MapRequest,
isNoise bool, isNoise bool,
) { ) {
// Immediate open the channel and register it if the client wants logInfo, logErr := logPollFunc(mapRequest, machine, isNoise)
// a stream of MapResponses to prevent initial map response and
// following updates missing
var updateChan chan types.StateUpdate
if mapRequest.Stream {
h.pollStreamOpenMu.Lock()
h.pollNetMapStreamWG.Add(1)
defer h.pollNetMapStreamWG.Done()
updateChan = make(chan types.StateUpdate) // If OmitPeers is true, Stream is false, and ReadOnly is false,
defer closeChanWithLog(updateChan, machine.Hostname, "updateChan") // then te server will let clients update their endpoints without
// breaking existing long-polling (Stream == true) connections.
// In this case, the server can omit the entire response; the client
// only checks the HTTP response status code.
if mapRequest.OmitPeers && !mapRequest.Stream && !mapRequest.ReadOnly {
log.Info().
Caller().
Bool("noise", isNoise).
Bool("readOnly", mapRequest.ReadOnly).
Bool("omitPeers", mapRequest.OmitPeers).
Bool("stream", mapRequest.Stream).
Str("node_key", machine.NodeKey).
Str("machine", machine.Hostname).
Strs("endpoints", machine.Endpoints).
Msg("Received endpoint update")
// Register the node's update channel now := time.Now().UTC()
h.nodeNotifier.AddNode(machine.MachineKey, updateChan) machine.Endpoints = mapRequest.Endpoints
defer h.nodeNotifier.RemoveNode(machine.MachineKey) machine.LastSeen = &now
if err := h.db.MachineSave(machine); err != nil {
logErr(err, "Failed to persist/update machine in the database")
http.Error(writer, "", http.StatusInternalServerError)
return
}
h.nodeNotifier.NotifyWithIgnore(
types.StateUpdate{
Type: types.StatePeerChanged,
Changed: types.Machines{machine},
},
machine.MachineKey)
writer.WriteHeader(http.StatusOK)
if f, ok := writer.(http.Flusher); ok {
f.Flush()
}
return
// ReadOnly is whether the client just wants to fetch the
// MapResponse, without updating their Endpoints. The
// Endpoints field will be ignored and LastSeen will not be
// updated and peers will not be notified of changes.
//
// The intended use is for clients to discover the DERP map at
// start-up before their first real endpoint update.
} else if mapRequest.OmitPeers && !mapRequest.Stream && mapRequest.ReadOnly {
h.handleLiteRequest(writer, machine, mapRequest, isNoise)
return
} else if mapRequest.OmitPeers && mapRequest.Stream {
logErr(nil, "Ignoring request, don't know how to handle it")
return
} }
logInfo, logErr := logPollFunc(mapRequest, machine, isNoise) // Handle requests not related to continouos updates immediately.
// TODO(kradalby): I am not sure if this has any function based on
// incoming requests from clients.
if mapRequest.ReadOnly && !mapRequest.Stream {
h.handleReadOnly(writer, machine, mapRequest, isNoise)
return
}
machine.Hostname = mapRequest.Hostinfo.Hostname
machine.HostInfo = types.HostInfo(*mapRequest.Hostinfo)
machine.DiscoKey = util.DiscoPublicKeyStripPrefix(mapRequest.DiscoKey)
machine.Endpoints = mapRequest.Endpoints
// When a node connects to control, list the peers it has at // When a node connects to control, list the peers it has at
// that given point, further updates are kept in memory in // that given point, further updates are kept in memory in
@ -107,11 +163,6 @@ func (h *Headscale) handlePoll(
h.cfg.RandomizeClientPort, h.cfg.RandomizeClientPort,
) )
machine.Hostname = mapRequest.Hostinfo.Hostname
machine.HostInfo = types.HostInfo(*mapRequest.Hostinfo)
machine.DiscoKey = util.DiscoPublicKeyStripPrefix(mapRequest.DiscoKey)
now := time.Now().UTC()
err = h.db.SaveMachineRoutes(machine) err = h.db.SaveMachineRoutes(machine)
if err != nil { if err != nil {
logErr(err, "Error processing machine routes") logErr(err, "Error processing machine routes")
@ -126,19 +177,6 @@ func (h *Headscale) handlePoll(
} }
} }
// From Tailscale client:
//
// ReadOnly is whether the client just wants to fetch the MapResponse,
// without updating their Endpoints. The Endpoints field will be ignored and
// LastSeen will not be updated and peers will not be notified of changes.
//
// The intended use is for clients to discover the DERP map at start-up
// before their first real endpoint update.
if !mapRequest.ReadOnly {
machine.Endpoints = mapRequest.Endpoints
machine.LastSeen = &now
}
// TODO(kradalby): Save specific stuff, not whole object. // TODO(kradalby): Save specific stuff, not whole object.
if err := h.db.MachineSave(machine); err != nil { if err := h.db.MachineSave(machine); err != nil {
logErr(err, "Failed to persist/update machine in the database") logErr(err, "Failed to persist/update machine in the database")
@ -147,82 +185,6 @@ func (h *Headscale) handlePoll(
return return
} }
if !mapRequest.ReadOnly {
// It sounds like we should update the nodes when we have received a endpoint update
// even tho the comments in the tailscale code dont explicitly say so.
updateRequestsFromNode.WithLabelValues(machine.User.Name, machine.Hostname, "endpoint-update").
Inc()
// Tell all the other nodes about the new endpoint, but dont update ourselves.
h.nodeNotifier.NotifyWithIgnore(
types.StateUpdate{
Type: types.StatePeerChanged,
Changed: types.Machines{machine},
},
machine.MachineKey)
}
// We update our peers if the client is not sending ReadOnly in the MapRequest
// so we don't distribute its initial request (it comes with
// empty endpoints to peers)
// Details on the protocol can be found in https://github.com/tailscale/tailscale/blob/main/tailcfg/tailcfg.go#L696
logInfo("Client map request processed")
if mapRequest.ReadOnly {
logInfo("Client is starting up. Probably interested in a DERP map")
mapResp, err := mapp.FullMapResponse(mapRequest, machine, h.ACLPolicy)
if err != nil {
logErr(err, "Failed to create MapResponse")
http.Error(writer, "", http.StatusInternalServerError)
return
}
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
writer.WriteHeader(http.StatusOK)
_, err = writer.Write(mapResp)
if err != nil {
logErr(err, "Failed to write response")
}
if f, ok := writer.(http.Flusher); ok {
f.Flush()
}
return
}
if mapRequest.OmitPeers && !mapRequest.Stream {
logInfo("Client sent endpoint update and is ok with a response without peer list")
mapResp, err := mapp.LiteMapResponse(mapRequest, machine, h.ACLPolicy)
if err != nil {
logErr(err, "Failed to create MapResponse")
http.Error(writer, "", http.StatusInternalServerError)
return
}
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
writer.WriteHeader(http.StatusOK)
_, err = writer.Write(mapResp)
if err != nil {
logErr(err, "Failed to write response")
}
return
} else if mapRequest.OmitPeers && mapRequest.Stream {
log.Warn().
Str("handler", "PollNetMap").
Bool("noise", isNoise).
Str("machine", machine.Hostname).
Msg("Ignoring request, don't know how to handle it")
return
}
logInfo("Sending initial map") logInfo("Sending initial map")
mapResp, err := mapp.FullMapResponse(mapRequest, machine, h.ACLPolicy) mapResp, err := mapp.FullMapResponse(mapRequest, machine, h.ACLPolicy)
@ -247,6 +209,24 @@ func (h *Headscale) handlePoll(
return return
} }
h.nodeNotifier.NotifyWithIgnore(
types.StateUpdate{
Type: types.StatePeerChanged,
Changed: types.Machines{machine},
},
machine.MachineKey)
// Set up the client stream
h.pollNetMapStreamWG.Add(1)
defer h.pollNetMapStreamWG.Done()
updateChan := make(chan types.StateUpdate)
defer closeChanWithLog(updateChan, machine.Hostname, "updateChan")
// Register the node's update channel
h.nodeNotifier.AddNode(machine.MachineKey, updateChan)
defer h.nodeNotifier.RemoveNode(machine.MachineKey)
keepAliveTicker := time.NewTicker(keepAliveInterval) keepAliveTicker := time.NewTicker(keepAliveInterval)
ctx = context.WithValue(ctx, machineNameContextKey, machine.Hostname) ctx = context.WithValue(ctx, machineNameContextKey, machine.Hostname)
@ -254,8 +234,6 @@ func (h *Headscale) handlePoll(
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
h.pollStreamOpenMu.Unlock()
for { for {
logInfo("Waiting for update on stream channel") logInfo("Waiting for update on stream channel")
select { select {
@ -280,15 +258,23 @@ func (h *Headscale) handlePoll(
return return
} }
err = h.db.TouchMachine(machine) // This goroutine is not ideal, but we have a potential issue here
if err != nil { // where it blocks too long and that holds up updates.
logErr(err, "Cannot update machine LastSeen") // One alternative is to split these different channels into
// goroutines, but then you might have a problem without a lock
// if a keepalive is written at the same time as an update.
go func() {
err = h.db.UpdateLastSeen(machine)
if err != nil {
logErr(err, "Cannot update machine LastSeen")
return return
} }
}()
case update := <-updateChan: case update := <-updateChan:
logInfo("Received update") logInfo("Received update")
now := time.Now()
var data []byte var data []byte
var err error var err error
@ -332,25 +318,37 @@ func (h *Headscale) handlePoll(
return return
} }
// Keep track of the last successful update, // See comment in keepAliveTicker
// we sometimes end in a state were the update go func() {
// is not picked up by a client and we use this err = h.db.UpdateLastSeen(machine)
// to determine if we should "force" an update. if err != nil {
err = h.db.TouchMachine(machine) logErr(err, "Cannot update machine LastSeen")
if err != nil {
logErr(err, "Cannot update machine LastSuccessfulUpdate")
return return
} }
}()
logInfo("Update sent") log.Info().
Caller().
Bool("noise", isNoise).
Bool("readOnly", mapRequest.ReadOnly).
Bool("omitPeers", mapRequest.OmitPeers).
Bool("stream", mapRequest.Stream).
Str("node_key", machine.NodeKey).
Str("machine", machine.Hostname).
TimeDiff("timeSpent", time.Now(), now).
Msg("update sent")
case <-ctx.Done(): case <-ctx.Done():
logInfo("The client has closed the connection") logInfo("The client has closed the connection")
err := h.db.TouchMachine(machine) go func() {
if err != nil { err = h.db.UpdateLastSeen(machine)
logErr(err, "Cannot update machine LastSeen") if err != nil {
} logErr(err, "Cannot update machine LastSeen")
return
}
}()
// The connection has been closed, so we can stop polling. // The connection has been closed, so we can stop polling.
return return
@ -372,3 +370,89 @@ func closeChanWithLog[C chan []byte | chan struct{} | chan types.StateUpdate](ch
close(channel) close(channel)
} }
// TODO(kradalby): This might not actually be used,
// observing incoming client requests indicates it
// is not.
func (h *Headscale) handleReadOnly(
writer http.ResponseWriter,
machine *types.Machine,
mapRequest tailcfg.MapRequest,
isNoise bool,
) {
logInfo, logErr := logPollFunc(mapRequest, machine, isNoise)
mapp := mapper.NewMapper(
machine,
// TODO(kradalby): It might not be acceptable to send
// an empty peer list here.
types.Machines{},
h.privateKey2019,
isNoise,
h.DERPMap,
h.cfg.BaseDomain,
h.cfg.DNSConfig,
h.cfg.LogTail.Enabled,
h.cfg.RandomizeClientPort,
)
logInfo("Client is starting up. Probably interested in a DERP map")
mapResp, err := mapp.FullMapResponse(mapRequest, machine, h.ACLPolicy)
if err != nil {
logErr(err, "Failed to create MapResponse")
http.Error(writer, "", http.StatusInternalServerError)
return
}
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
writer.WriteHeader(http.StatusOK)
_, err = writer.Write(mapResp)
if err != nil {
logErr(err, "Failed to write response")
}
if f, ok := writer.(http.Flusher); ok {
f.Flush()
}
}
func (h *Headscale) handleLiteRequest(
writer http.ResponseWriter,
machine *types.Machine,
mapRequest tailcfg.MapRequest,
isNoise bool,
) {
logInfo, logErr := logPollFunc(mapRequest, machine, isNoise)
mapp := mapper.NewMapper(
machine,
// TODO(kradalby): It might not be acceptable to send
// an empty peer list here.
types.Machines{},
h.privateKey2019,
isNoise,
h.DERPMap,
h.cfg.BaseDomain,
h.cfg.DNSConfig,
h.cfg.LogTail.Enabled,
h.cfg.RandomizeClientPort,
)
logInfo("Client asked for a lite update, responding without peers")
mapResp, err := mapp.LiteMapResponse(mapRequest, machine, h.ACLPolicy)
if err != nil {
logErr(err, "Failed to create MapResponse")
http.Error(writer, "", http.StatusInternalServerError)
return
}
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
writer.WriteHeader(http.StatusOK)
_, err = writer.Write(mapResp)
if err != nil {
logErr(err, "Failed to write response")
}
}