Merge pull request #346 from kradalby/integration-test-concurrent-join

Fix ip allocation bug, make integration tests faster
This commit is contained in:
Kristoffer Dalby 2022-02-25 10:37:35 +01:00 committed by GitHub
commit 08c7076667
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 138 additions and 76 deletions

View file

@ -29,4 +29,4 @@ jobs:
- name: Run Integration tests - name: Run Integration tests
if: steps.changed-files.outputs.any_changed == 'true' if: steps.changed-files.outputs.any_changed == 'true'
run: go test -tags integration -timeout 30m run: make test_integration

View file

@ -2,7 +2,13 @@
**TBD (TBD):** **TBD (TBD):**
**0.14.0 (2022-xx-xx):** **0.15.0 (2022-xx-xx):**
**Changes**:
- Fix a bug were the same IP could be assigned to multiple hosts if joined in quick succession [#346](https://github.com/juanfont/headscale/pull/346)
**0.14.0 (2022-02-25):**
**UPCOMING BREAKING**: **UPCOMING BREAKING**:
From the **next** version (`0.15.0`), all machines will be able to communicate regardless of From the **next** version (`0.15.0`), all machines will be able to communicate regardless of

View file

@ -18,7 +18,7 @@ test:
@go test -coverprofile=coverage.out ./... @go test -coverprofile=coverage.out ./...
test_integration: test_integration:
go test -tags integration -timeout 30m -count=1 ./... go test -failfast -tags integration -timeout 30m -count=1 ./...
test_integration_cli: test_integration_cli:
go test -tags integration -v integration_cli_test.go integration_common_test.go go test -tags integration -v integration_cli_test.go integration_common_test.go

5
api.go
View file

@ -574,6 +574,9 @@ func (h *Headscale) handleAuthKey(
Str("func", "handleAuthKey"). Str("func", "handleAuthKey").
Str("machine", machine.Name). Str("machine", machine.Name).
Msg("Authentication key was valid, proceeding to acquire IP addresses") Msg("Authentication key was valid, proceeding to acquire IP addresses")
h.ipAllocationMutex.Lock()
ips, err := h.getAvailableIPs() ips, err := h.getAvailableIPs()
if err != nil { if err != nil {
log.Error(). log.Error().
@ -602,6 +605,8 @@ func (h *Headscale) handleAuthKey(
machine.Registered = true machine.Registered = true
machine.RegisterMethod = RegisterMethodAuthKey machine.RegisterMethod = RegisterMethodAuthKey
h.db.Save(&machine) h.db.Save(&machine)
h.ipAllocationMutex.Unlock()
} }
pak.Used = true pak.Used = true

2
app.go
View file

@ -153,6 +153,8 @@ type Headscale struct {
oidcStateCache *cache.Cache oidcStateCache *cache.Cache
requestedExpiryCache *cache.Cache requestedExpiryCache *cache.Cache
ipAllocationMutex sync.Mutex
} }
// Look up the TLS constant relative to user-supplied TLS client // Look up the TLS constant relative to user-supplied TLS client

View file

@ -15,6 +15,7 @@ import (
"os" "os"
"path" "path"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -44,6 +45,8 @@ type IntegrationTestSuite struct {
headscale dockertest.Resource headscale dockertest.Resource
namespaces map[string]TestNamespace namespaces map[string]TestNamespace
joinWaitGroup sync.WaitGroup
} }
func TestIntegrationTestSuite(t *testing.T) { func TestIntegrationTestSuite(t *testing.T) {
@ -118,7 +121,7 @@ func (s *IntegrationTestSuite) saveLog(
return err return err
} }
fmt.Printf("Saving logs for %s to %s\n", resource.Container.Name, basePath) log.Printf("Saving logs for %s to %s\n", resource.Container.Name, basePath)
err = ioutil.WriteFile( err = ioutil.WriteFile(
path.Join(basePath, resource.Container.Name+".stdout.log"), path.Join(basePath, resource.Container.Name+".stdout.log"),
@ -141,6 +144,34 @@ func (s *IntegrationTestSuite) saveLog(
return nil return nil
} }
func (s *IntegrationTestSuite) Join(
endpoint, key, hostname string,
tailscale dockertest.Resource,
) {
defer s.joinWaitGroup.Done()
command := []string{
"tailscale",
"up",
"-login-server",
endpoint,
"--authkey",
key,
"--hostname",
hostname,
}
log.Println("Join command:", command)
log.Printf("Running join command for %s\n", hostname)
_, err := ExecuteCommand(
&tailscale,
command,
[]string{},
)
assert.Nil(s.T(), err)
log.Printf("%s joined\n", hostname)
}
func (s *IntegrationTestSuite) tailscaleContainer( func (s *IntegrationTestSuite) tailscaleContainer(
namespace, identifier, version string, namespace, identifier, version string,
) (string, *dockertest.Resource) { ) (string, *dockertest.Resource) {
@ -178,7 +209,7 @@ func (s *IntegrationTestSuite) tailscaleContainer(
if err != nil { if err != nil {
log.Fatalf("Could not start resource: %s", err) log.Fatalf("Could not start resource: %s", err)
} }
fmt.Printf("Created %s container\n", hostname) log.Printf("Created %s container\n", hostname)
return hostname, pts return hostname, pts
} }
@ -221,15 +252,15 @@ func (s *IntegrationTestSuite) SetupSuite() {
Cmd: []string{"headscale", "serve"}, Cmd: []string{"headscale", "serve"},
} }
fmt.Println("Creating headscale container") log.Println("Creating headscale container")
if pheadscale, err := s.pool.BuildAndRunWithBuildOptions(headscaleBuildOptions, headscaleOptions, DockerRestartPolicy); err == nil { if pheadscale, err := s.pool.BuildAndRunWithBuildOptions(headscaleBuildOptions, headscaleOptions, DockerRestartPolicy); err == nil {
s.headscale = *pheadscale s.headscale = *pheadscale
} else { } else {
log.Fatalf("Could not start resource: %s", err) log.Fatalf("Could not start resource: %s", err)
} }
fmt.Println("Created headscale container") log.Println("Created headscale container")
fmt.Println("Creating tailscale containers") log.Println("Creating tailscale containers")
for namespace, scales := range s.namespaces { for namespace, scales := range s.namespaces {
for i := 0; i < scales.count; i++ { for i := 0; i < scales.count; i++ {
version := tailscaleVersions[i%len(tailscaleVersions)] version := tailscaleVersions[i%len(tailscaleVersions)]
@ -243,7 +274,7 @@ func (s *IntegrationTestSuite) SetupSuite() {
} }
} }
fmt.Println("Waiting for headscale to be ready") log.Println("Waiting for headscale to be ready")
hostEndpoint := fmt.Sprintf("localhost:%s", s.headscale.GetPort("8080/tcp")) hostEndpoint := fmt.Sprintf("localhost:%s", s.headscale.GetPort("8080/tcp"))
if err := s.pool.Retry(func() error { if err := s.pool.Retry(func() error {
@ -266,19 +297,19 @@ func (s *IntegrationTestSuite) SetupSuite() {
// https://github.com/stretchr/testify/issues/849 // https://github.com/stretchr/testify/issues/849
return // fmt.Errorf("Could not connect to headscale: %s", err) return // fmt.Errorf("Could not connect to headscale: %s", err)
} }
fmt.Println("headscale container is ready") log.Println("headscale container is ready")
for namespace, scales := range s.namespaces { for namespace, scales := range s.namespaces {
fmt.Printf("Creating headscale namespace: %s\n", namespace) log.Printf("Creating headscale namespace: %s\n", namespace)
result, err := ExecuteCommand( result, err := ExecuteCommand(
&s.headscale, &s.headscale,
[]string{"headscale", "namespaces", "create", namespace}, []string{"headscale", "namespaces", "create", namespace},
[]string{}, []string{},
) )
fmt.Println("headscale create namespace result: ", result) log.Println("headscale create namespace result: ", result)
assert.Nil(s.T(), err) assert.Nil(s.T(), err)
fmt.Printf("Creating pre auth key for %s\n", namespace) log.Printf("Creating pre auth key for %s\n", namespace)
preAuthResult, err := ExecuteCommand( preAuthResult, err := ExecuteCommand(
&s.headscale, &s.headscale,
[]string{ []string{
@ -304,33 +335,16 @@ func (s *IntegrationTestSuite) SetupSuite() {
headscaleEndpoint := "http://headscale:8080" headscaleEndpoint := "http://headscale:8080"
fmt.Printf( log.Printf(
"Joining tailscale containers to headscale at %s\n", "Joining tailscale containers to headscale at %s\n",
headscaleEndpoint, headscaleEndpoint,
) )
for hostname, tailscale := range scales.tailscales { for hostname, tailscale := range scales.tailscales {
command := []string{ s.joinWaitGroup.Add(1)
"tailscale", go s.Join(headscaleEndpoint, preAuthKey.Key, hostname, tailscale)
"up",
"-login-server",
headscaleEndpoint,
"--authkey",
preAuthKey.Key,
"--hostname",
hostname,
}
fmt.Println("Join command:", command)
fmt.Printf("Running join command for %s\n", hostname)
result, err := ExecuteCommand(
&tailscale,
command,
[]string{},
)
fmt.Println("tailscale result: ", result)
assert.Nil(s.T(), err)
fmt.Printf("%s joined\n", hostname)
} }
s.joinWaitGroup.Wait()
} }
// The nodes need a bit of time to get their updated maps from headscale // The nodes need a bit of time to get their updated maps from headscale
@ -350,7 +364,7 @@ func (s *IntegrationTestSuite) HandleStats(
func (s *IntegrationTestSuite) TestListNodes() { func (s *IntegrationTestSuite) TestListNodes() {
for namespace, scales := range s.namespaces { for namespace, scales := range s.namespaces {
fmt.Println("Listing nodes") log.Println("Listing nodes")
result, err := ExecuteCommand( result, err := ExecuteCommand(
&s.headscale, &s.headscale,
[]string{"headscale", "--namespace", namespace, "nodes", "list"}, []string{"headscale", "--namespace", namespace, "nodes", "list"},
@ -358,7 +372,7 @@ func (s *IntegrationTestSuite) TestListNodes() {
) )
assert.Nil(s.T(), err) assert.Nil(s.T(), err)
fmt.Printf("List nodes: \n%s\n", result) log.Printf("List nodes: \n%s\n", result)
// Chck that the correct count of host is present in node list // Chck that the correct count of host is present in node list
lines := strings.Split(result, "\n") lines := strings.Split(result, "\n")
@ -381,7 +395,7 @@ func (s *IntegrationTestSuite) TestGetIpAddresses() {
s.T().Run(hostname, func(t *testing.T) { s.T().Run(hostname, func(t *testing.T) {
assert.NotNil(t, ip) assert.NotNil(t, ip)
fmt.Printf("IP for %s: %s\n", hostname, ip) log.Printf("IP for %s: %s\n", hostname, ip)
// c.Assert(ip.Valid(), check.IsTrue) // c.Assert(ip.Valid(), check.IsTrue)
assert.True(t, ip.Is4() || ip.Is6()) assert.True(t, ip.Is4() || ip.Is6())
@ -410,7 +424,7 @@ func (s *IntegrationTestSuite) TestGetIpAddresses() {
// s.T().Run(hostname, func(t *testing.T) { // s.T().Run(hostname, func(t *testing.T) {
// command := []string{"tailscale", "status", "--json"} // command := []string{"tailscale", "status", "--json"}
// //
// fmt.Printf("Getting status for %s\n", hostname) // log.Printf("Getting status for %s\n", hostname)
// result, err := ExecuteCommand( // result, err := ExecuteCommand(
// &tailscale, // &tailscale,
// command, // command,
@ -477,7 +491,7 @@ func (s *IntegrationTestSuite) TestPingAllPeersByAddress() {
ip.String(), ip.String(),
} }
fmt.Printf( log.Printf(
"Pinging from %s to %s (%s)\n", "Pinging from %s to %s (%s)\n",
hostname, hostname,
peername, peername,
@ -489,7 +503,7 @@ func (s *IntegrationTestSuite) TestPingAllPeersByAddress() {
[]string{}, []string{},
) )
assert.Nil(t, err) assert.Nil(t, err)
fmt.Printf("Result for %s: %s\n", hostname, result) log.Printf("Result for %s: %s\n", hostname, result)
assert.Contains(t, result, "pong") assert.Contains(t, result, "pong")
}) })
} }
@ -512,6 +526,7 @@ func (s *IntegrationTestSuite) TestTailDrop() {
} }
time.Sleep(sleepInverval) time.Sleep(sleepInverval)
} }
return return
} }
@ -534,7 +549,7 @@ func (s *IntegrationTestSuite) TestTailDrop() {
fmt.Sprintf("%s:", peername), fmt.Sprintf("%s:", peername),
} }
retry(10, 1*time.Second, func() error { retry(10, 1*time.Second, func() error {
fmt.Printf( log.Printf(
"Sending file from %s to %s\n", "Sending file from %s to %s\n",
hostname, hostname,
peername, peername,
@ -573,7 +588,7 @@ func (s *IntegrationTestSuite) TestTailDrop() {
"ls", "ls",
fmt.Sprintf("/tmp/file_from_%s", peername), fmt.Sprintf("/tmp/file_from_%s", peername),
} }
fmt.Printf( log.Printf(
"Checking file in %s (%s) from %s (%s)\n", "Checking file in %s (%s) from %s (%s)\n",
hostname, hostname,
ips[hostname], ips[hostname],
@ -586,7 +601,7 @@ func (s *IntegrationTestSuite) TestTailDrop() {
[]string{}, []string{},
) )
assert.Nil(t, err) assert.Nil(t, err)
fmt.Printf("Result for %s: %s\n", peername, result) log.Printf("Result for %s: %s\n", peername, result)
assert.Equal( assert.Equal(
t, t,
fmt.Sprintf("/tmp/file_from_%s\n", peername), fmt.Sprintf("/tmp/file_from_%s\n", peername),
@ -616,7 +631,7 @@ func (s *IntegrationTestSuite) TestPingAllPeersByHostname() {
fmt.Sprintf("%s.%s.headscale.net", peername, namespace), fmt.Sprintf("%s.%s.headscale.net", peername, namespace),
} }
fmt.Printf( log.Printf(
"Pinging using hostname from %s to %s\n", "Pinging using hostname from %s to %s\n",
hostname, hostname,
peername, peername,
@ -627,7 +642,7 @@ func (s *IntegrationTestSuite) TestPingAllPeersByHostname() {
[]string{}, []string{},
) )
assert.Nil(t, err) assert.Nil(t, err)
fmt.Printf("Result for %s: %s\n", hostname, result) log.Printf("Result for %s: %s\n", hostname, result)
assert.Contains(t, result, "pong") assert.Contains(t, result, "pong")
}) })
} }
@ -650,7 +665,7 @@ func (s *IntegrationTestSuite) TestMagicDNS() {
fmt.Sprintf("%s.%s.headscale.net", peername, namespace), fmt.Sprintf("%s.%s.headscale.net", peername, namespace),
} }
fmt.Printf( log.Printf(
"Resolving name %s from %s\n", "Resolving name %s from %s\n",
peername, peername,
hostname, hostname,
@ -661,7 +676,7 @@ func (s *IntegrationTestSuite) TestMagicDNS() {
[]string{}, []string{},
) )
assert.Nil(t, err) assert.Nil(t, err)
fmt.Printf("Result for %s: %s\n", hostname, result) log.Printf("Result for %s: %s\n", hostname, result)
for _, ip := range ips { for _, ip := range ips {
assert.Contains(t, result, ip.String()) assert.Contains(t, result, ip.String())

View file

@ -755,6 +755,9 @@ func (h *Headscale) RegisterMachine(
return nil, err return nil, err
} }
h.ipAllocationMutex.Lock()
defer h.ipAllocationMutex.Unlock()
ips, err := h.getAvailableIPs() ips, err := h.getAvailableIPs()
if err != nil { if err != nil {
log.Error(). log.Error().

View file

@ -317,6 +317,8 @@ func (h *Headscale) OIDCCallback(ctx *gin.Context) {
return return
} }
h.ipAllocationMutex.Lock()
ips, err := h.getAvailableIPs() ips, err := h.getAvailableIPs()
if err != nil { if err != nil {
log.Error(). log.Error().
@ -338,6 +340,8 @@ func (h *Headscale) OIDCCallback(ctx *gin.Context) {
machine.LastSuccessfulUpdate = &now machine.LastSuccessfulUpdate = &now
machine.Expiry = &requestedTime machine.Expiry = &requestedTime
h.db.Save(&machine) h.db.Save(&machine)
h.ipAllocationMutex.Unlock()
} }
var content bytes.Buffer var content bytes.Buffer

View file

@ -157,9 +157,6 @@ func GetIPPrefixEndpoints(na netaddr.IPPrefix) (network, broadcast netaddr.IP) {
return return
} }
// TODO: Is this concurrency safe?
// What would happen if multiple hosts were to register at the same time?
// Would we attempt to assign the same addresses to multiple nodes?
func (h *Headscale) getAvailableIP(ipPrefix netaddr.IPPrefix) (*netaddr.IP, error) { func (h *Headscale) getAvailableIP(ipPrefix netaddr.IPPrefix) (*netaddr.IP, error) {
usedIps, err := h.getUsedIPs() usedIps, err := h.getUsedIPs()
if err != nil { if err != nil {
@ -179,7 +176,7 @@ func (h *Headscale) getAvailableIP(ipPrefix netaddr.IPPrefix) (*netaddr.IP, erro
switch { switch {
case ip.Compare(ipPrefixBroadcastAddress) == 0: case ip.Compare(ipPrefixBroadcastAddress) == 0:
fallthrough fallthrough
case containsIPs(usedIps, ip): case usedIps.Contains(ip):
fallthrough fallthrough
case ip.IsZero() || ip.IsLoopback(): case ip.IsZero() || ip.IsLoopback():
ip = ip.Next() ip = ip.Next()
@ -192,24 +189,46 @@ func (h *Headscale) getAvailableIP(ipPrefix netaddr.IPPrefix) (*netaddr.IP, erro
} }
} }
func (h *Headscale) getUsedIPs() ([]netaddr.IP, error) { func (h *Headscale) getUsedIPs() (*netaddr.IPSet, error) {
// FIXME: This really deserves a better data model, // FIXME: This really deserves a better data model,
// but this was quick to get running and it should be enough // but this was quick to get running and it should be enough
// to begin experimenting with a dual stack tailnet. // to begin experimenting with a dual stack tailnet.
var addressesSlices []string var addressesSlices []string
h.db.Model(&Machine{}).Pluck("ip_addresses", &addressesSlices) h.db.Model(&Machine{}).Pluck("ip_addresses", &addressesSlices)
ips := make([]netaddr.IP, 0, len(h.cfg.IPPrefixes)*len(addressesSlices)) log.Trace().
Strs("addresses", addressesSlices).
Msg("Got allocated ip addresses from databases")
var ips netaddr.IPSetBuilder
for _, slice := range addressesSlices { for _, slice := range addressesSlices {
var a MachineAddresses var machineAddresses MachineAddresses
err := a.Scan(slice) err := machineAddresses.Scan(slice)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to read ip from database: %w", err) return &netaddr.IPSet{}, fmt.Errorf(
"failed to read ip from database: %w",
err,
)
}
for _, ip := range machineAddresses {
ips.Add(ip)
} }
ips = append(ips, a...)
} }
return ips, nil log.Trace().
Interface("addresses", ips).
Msg("Parsed ip addresses that has been allocated from databases")
ipSet, err := ips.IPSet()
if err != nil {
return &netaddr.IPSet{}, fmt.Errorf(
"failed to build IP Set: %w",
err,
)
}
return ipSet, nil
} }
func containsString(ss []string, s string) bool { func containsString(ss []string, s string) bool {
@ -222,16 +241,6 @@ func containsString(ss []string, s string) bool {
return false return false
} }
func containsIPs(ips []netaddr.IP, ip netaddr.IP) bool {
for _, v := range ips {
if v == ip {
return true
}
}
return false
}
func tailNodesToString(nodes []*tailcfg.Node) string { func tailNodesToString(nodes []*tailcfg.Node) string {
temp := make([]string, len(nodes)) temp := make([]string, len(nodes))

View file

@ -48,9 +48,12 @@ func (s *Suite) TestGetUsedIps(c *check.C) {
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
expected := netaddr.MustParseIP("10.27.0.1") expected := netaddr.MustParseIP("10.27.0.1")
expectedIPSetBuilder := netaddr.IPSetBuilder{}
expectedIPSetBuilder.Add(expected)
expectedIPSet, _ := expectedIPSetBuilder.IPSet()
c.Assert(len(usedIps), check.Equals, 1) c.Assert(usedIps.Equal(expectedIPSet), check.Equals, true)
c.Assert(usedIps[0], check.Equals, expected) c.Assert(usedIps.Contains(expected), check.Equals, true)
machine1, err := app.GetMachineByID(0) machine1, err := app.GetMachineByID(0)
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
@ -64,6 +67,8 @@ func (s *Suite) TestGetMultiIp(c *check.C) {
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
for index := 1; index <= 350; index++ { for index := 1; index <= 350; index++ {
app.ipAllocationMutex.Lock()
ips, err := app.getAvailableIPs() ips, err := app.getAvailableIPs()
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
@ -86,17 +91,30 @@ func (s *Suite) TestGetMultiIp(c *check.C) {
IPAddresses: ips, IPAddresses: ips,
} }
app.db.Save(&machine) app.db.Save(&machine)
app.ipAllocationMutex.Unlock()
} }
usedIps, err := app.getUsedIPs() usedIps, err := app.getUsedIPs()
c.Assert(err, check.IsNil) c.Assert(err, check.IsNil)
c.Assert(len(usedIps), check.Equals, 350) expected0 := netaddr.MustParseIP("10.27.0.1")
expected9 := netaddr.MustParseIP("10.27.0.10")
expected300 := netaddr.MustParseIP("10.27.0.45")
c.Assert(usedIps[0], check.Equals, netaddr.MustParseIP("10.27.0.1")) notExpectedIPSetBuilder := netaddr.IPSetBuilder{}
c.Assert(usedIps[9], check.Equals, netaddr.MustParseIP("10.27.0.10")) notExpectedIPSetBuilder.Add(expected0)
c.Assert(usedIps[300], check.Equals, netaddr.MustParseIP("10.27.1.45")) notExpectedIPSetBuilder.Add(expected9)
notExpectedIPSetBuilder.Add(expected300)
notExpectedIPSet, err := notExpectedIPSetBuilder.IPSet()
c.Assert(err, check.IsNil)
// We actually expect it to be a lot larger
c.Assert(usedIps.Equal(notExpectedIPSet), check.Equals, false)
c.Assert(usedIps.Contains(expected0), check.Equals, true)
c.Assert(usedIps.Contains(expected9), check.Equals, true)
c.Assert(usedIps.Contains(expected300), check.Equals, true)
// Check that we can read back the IPs // Check that we can read back the IPs
machine1, err := app.GetMachineByID(1) machine1, err := app.GetMachineByID(1)