From 1e610848980012e5fe49c1b5d68bdef08854d419 Mon Sep 17 00:00:00 2001 From: enoperm <61619504+enoperm@users.noreply.github.com> Date: Sat, 21 Sep 2024 12:05:36 +0200 Subject: [PATCH] Add compatibility with only websocket-capable clients (#2132) * handle control protocol through websocket The necessary behaviour is already in place, but the wasm build only issued GETs, and the handler was not invoked. * get DERP-over-websocket working for wasm clients * Prepare for testing builtin websocket-over-DERP Still needs some way to assert that clients are connected through websockets, rather than the TCP hijacking version of DERP. * integration tests: properly differentiate between DERP transports * do not touch unrelated code * linter fixes * integration testing: unexport common implementation of derp server scenario * fixup! integration testing: unexport common implementation of derp server scenario * dockertestutil/logs: remove unhelpful comment * update changelog --------- Co-authored-by: Csaba Sarkadi --- .github/workflows/test-integration.yaml | 1 + CHANGELOG.md | 6 +- go.mod | 2 +- hscontrol/app.go | 2 +- hscontrol/derp/server/derp_server.go | 53 +++++++++++ hscontrol/types/users.go | 2 +- hscontrol/util/net.go | 1 - integration/dns_test.go | 1 - integration/dockertestutil/logs.go | 42 +++++---- integration/embedded_derp_test.go | 118 +++++++++++++++++++++--- integration/hsic/hsic.go | 6 ++ integration/tailscale.go | 3 + integration/tsic/tsic.go | 37 ++++++++ integration/utils.go | 43 ++++++++- 14 files changed, 280 insertions(+), 37 deletions(-) diff --git a/.github/workflows/test-integration.yaml b/.github/workflows/test-integration.yaml index d6c7eff2..80daf20a 100644 --- a/.github/workflows/test-integration.yaml +++ b/.github/workflows/test-integration.yaml @@ -41,6 +41,7 @@ jobs: - TestResolveMagicDNS - TestValidateResolvConf - TestDERPServerScenario + - TestDERPServerWebsocketScenario - TestPingAllByIP - TestPingAllByIPPublicDERP - TestAuthKeyLogoutAndRelogin diff --git a/CHANGELOG.md b/CHANGELOG.md index f8035d51..7e91082c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # CHANGELOG -## 0.23.0 (2023-09-18) +## Next + +- Improved compatibilty of built-in DERP server with clients connecting over WebSocket. + +## 0.23.0 (2024-09-18) This release was intended to be mainly a code reorganisation and refactoring, significantly improving the maintainability of the codebase. This should allow us to improve further and make it easier for the maintainers to keep on top of the project. However, as you all have noticed, it turned out to become a much larger, much longer release cycle than anticipated. It has ended up to be a release with a lot of rewrites and changes to the code base and functionality of Headscale, cleaning up a lot of technical debt and introducing a lot of improvements. This does come with some breaking changes, diff --git a/go.mod b/go.mod index 18089bbd..73893d82 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.23.0 require ( github.com/AlecAivazis/survey/v2 v2.3.7 + github.com/coder/websocket v1.8.12 github.com/coreos/go-oidc/v3 v3.11.0 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/deckarep/golang-set/v2 v2.6.0 @@ -79,7 +80,6 @@ require ( github.com/bits-and-blooms/bitset v1.13.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/coder/websocket v1.8.12 // indirect github.com/containerd/console v1.0.4 // indirect github.com/containerd/continuity v0.4.3 // indirect github.com/coreos/go-iptables v0.7.1-0.20240112124308-65c67c9f46e6 // indirect diff --git a/hscontrol/app.go b/hscontrol/app.go index 4a5b4679..1d3cb629 100644 --- a/hscontrol/app.go +++ b/hscontrol/app.go @@ -425,7 +425,7 @@ func (h *Headscale) createRouter(grpcMux *grpcRuntime.ServeMux) *mux.Router { router := mux.NewRouter() router.Use(prometheusMiddleware) - router.HandleFunc(ts2021UpgradePath, h.NoiseUpgradeHandler).Methods(http.MethodPost) + router.HandleFunc(ts2021UpgradePath, h.NoiseUpgradeHandler).Methods(http.MethodPost, http.MethodGet) router.HandleFunc("/health", h.HealthHandler).Methods(http.MethodGet) router.HandleFunc("/key", h.KeyHandler).Methods(http.MethodGet) diff --git a/hscontrol/derp/server/derp_server.go b/hscontrol/derp/server/derp_server.go index 0b0c9b16..0c97806f 100644 --- a/hscontrol/derp/server/derp_server.go +++ b/hscontrol/derp/server/derp_server.go @@ -1,6 +1,7 @@ package server import ( + "bufio" "context" "encoding/json" "fmt" @@ -12,11 +13,13 @@ import ( "strings" "time" + "github.com/coder/websocket" "github.com/juanfont/headscale/hscontrol/types" "github.com/juanfont/headscale/hscontrol/util" "github.com/rs/zerolog/log" "tailscale.com/derp" "tailscale.com/net/stun" + "tailscale.com/net/wsconn" "tailscale.com/tailcfg" "tailscale.com/types/key" ) @@ -132,6 +135,56 @@ func (d *DERPServer) DERPHandler( return } + if strings.Contains(req.Header.Get("Sec-Websocket-Protocol"), "derp") { + d.serveWebsocket(writer, req) + } else { + d.servePlain(writer, req) + } +} + +func (d *DERPServer) serveWebsocket(writer http.ResponseWriter, req *http.Request) { + websocketConn, err := websocket.Accept(writer, req, &websocket.AcceptOptions{ + Subprotocols: []string{"derp"}, + OriginPatterns: []string{"*"}, + // Disable compression because DERP transmits WireGuard messages that + // are not compressible. + // Additionally, Safari has a broken implementation of compression + // (see https://github.com/nhooyr/websocket/issues/218) that makes + // enabling it actively harmful. + CompressionMode: websocket.CompressionDisabled, + }) + if err != nil { + log.Error(). + Caller(). + Err(err). + Msg("Failed to upgrade websocket request") + + writer.Header().Set("Content-Type", "text/plain") + writer.WriteHeader(http.StatusInternalServerError) + + _, err = writer.Write([]byte("Failed to upgrade websocket request")) + if err != nil { + log.Error(). + Caller(). + Err(err). + Msg("Failed to write response") + } + + return + } + defer websocketConn.Close(websocket.StatusInternalError, "closing") + if websocketConn.Subprotocol() != "derp" { + websocketConn.Close(websocket.StatusPolicyViolation, "client must speak the derp subprotocol") + + return + } + + wc := wsconn.NetConn(req.Context(), websocketConn, websocket.MessageBinary, req.RemoteAddr) + brw := bufio.NewReadWriter(bufio.NewReader(wc), bufio.NewWriter(wc)) + d.tailscaleDERP.Accept(req.Context(), wc, brw, req.RemoteAddr) +} + +func (d *DERPServer) servePlain(writer http.ResponseWriter, req *http.Request) { fastStart := req.Header.Get(fastStartHeader) == "1" hijacker, ok := writer.(http.Hijacker) diff --git a/hscontrol/types/users.go b/hscontrol/types/users.go index 63e73a56..3e934e34 100644 --- a/hscontrol/types/users.go +++ b/hscontrol/types/users.go @@ -19,7 +19,7 @@ type User struct { Name string `gorm:"unique"` } -// TODO(kradalby): See if we can fill in Gravatar here +// TODO(kradalby): See if we can fill in Gravatar here. func (u *User) profilePicURL() string { return "" } diff --git a/hscontrol/util/net.go b/hscontrol/util/net.go index c44b7287..59a8d635 100644 --- a/hscontrol/util/net.go +++ b/hscontrol/util/net.go @@ -13,7 +13,6 @@ func GrpcSocketDialer(ctx context.Context, addr string) (net.Conn, error) { return d.DialContext(ctx, "unix", addr) } - // TODO(kradalby): Remove after go 1.24, will be in stdlib. // Compare returns an integer comparing two prefixes. // The result will be 0 if p == p2, -1 if p < p2, and +1 if p > p2. diff --git a/integration/dns_test.go b/integration/dns_test.go index 085448c5..efe702e9 100644 --- a/integration/dns_test.go +++ b/integration/dns_test.go @@ -242,5 +242,4 @@ func TestValidateResolvConf(t *testing.T) { } }) } - } diff --git a/integration/dockertestutil/logs.go b/integration/dockertestutil/logs.go index 64c3c9ac..7d104e43 100644 --- a/integration/dockertestutil/logs.go +++ b/integration/dockertestutil/logs.go @@ -3,6 +3,7 @@ package dockertestutil import ( "bytes" "context" + "io" "log" "os" "path" @@ -13,6 +14,28 @@ import ( const filePerm = 0o644 +func WriteLog( + pool *dockertest.Pool, + resource *dockertest.Resource, + stdout io.Writer, + stderr io.Writer, +) error { + return pool.Client.Logs( + docker.LogsOptions{ + Context: context.TODO(), + Container: resource.Container.ID, + OutputStream: stdout, + ErrorStream: stderr, + Tail: "all", + RawTerminal: false, + Stdout: true, + Stderr: true, + Follow: false, + Timestamps: false, + }, + ) +} + func SaveLog( pool *dockertest.Pool, resource *dockertest.Resource, @@ -23,23 +46,8 @@ func SaveLog( return "", "", err } - var stdout bytes.Buffer - var stderr bytes.Buffer - - err = pool.Client.Logs( - docker.LogsOptions{ - Context: context.TODO(), - Container: resource.Container.ID, - OutputStream: &stdout, - ErrorStream: &stderr, - Tail: "all", - RawTerminal: false, - Stdout: true, - Stderr: true, - Follow: false, - Timestamps: false, - }, - ) + var stdout, stderr bytes.Buffer + err = WriteLog(pool, resource, &stdout, &stderr) if err != nil { return "", "", err } diff --git a/integration/embedded_derp_test.go b/integration/embedded_derp_test.go index 259c565a..6009aed5 100644 --- a/integration/embedded_derp_test.go +++ b/integration/embedded_derp_test.go @@ -15,6 +15,11 @@ import ( "github.com/ory/dockertest/v3" ) +type ClientsSpec struct { + Plain int + WebsocketDERP int +} + type EmbeddedDERPServerScenario struct { *Scenario @@ -22,6 +27,65 @@ type EmbeddedDERPServerScenario struct { } func TestDERPServerScenario(t *testing.T) { + spec := map[string]ClientsSpec{ + "user1": { + Plain: len(MustTestVersions), + WebsocketDERP: 0, + }, + } + + derpServerScenario(t, spec, func(scenario *EmbeddedDERPServerScenario) { + allClients, err := scenario.ListTailscaleClients() + assertNoErrListClients(t, err) + t.Logf("checking %d clients for websocket connections", len(allClients)) + + for _, client := range allClients { + if didClientUseWebsocketForDERP(t, client) { + t.Logf( + "client %q used websocket a connection, but was not expected to", + client.Hostname(), + ) + t.Fail() + } + } + }) +} + +func TestDERPServerWebsocketScenario(t *testing.T) { + spec := map[string]ClientsSpec{ + "user1": { + Plain: 0, + WebsocketDERP: len(MustTestVersions), + }, + } + + derpServerScenario(t, spec, func(scenario *EmbeddedDERPServerScenario) { + allClients, err := scenario.ListTailscaleClients() + assertNoErrListClients(t, err) + t.Logf("checking %d clients for websocket connections", len(allClients)) + + for _, client := range allClients { + if !didClientUseWebsocketForDERP(t, client) { + t.Logf( + "client %q does not seem to have used a websocket connection, even though it was expected to do so", + client.Hostname(), + ) + t.Fail() + } + } + }) +} + +// This function implements the common parts of a DERP scenario, +// we *want* it to show up in stacktraces, +// so marking it as a test helper would be counterproductive. +// +//nolint:thelper +func derpServerScenario( + t *testing.T, + spec map[string]ClientsSpec, + furtherAssertions ...func(*EmbeddedDERPServerScenario), +) { IntegrationSkip(t) // t.Parallel() @@ -34,20 +98,18 @@ func TestDERPServerScenario(t *testing.T) { } defer scenario.ShutdownAssertNoPanics(t) - spec := map[string]int{ - "user1": len(MustTestVersions), - } - err = scenario.CreateHeadscaleEnv( spec, hsic.WithTestName("derpserver"), hsic.WithExtraPorts([]string{"3478/udp"}), hsic.WithEmbeddedDERPServerOnly(), + hsic.WithPort(443), hsic.WithTLS(), hsic.WithHostnameAsServerURL(), hsic.WithConfigEnv(map[string]string{ "HEADSCALE_DERP_AUTO_UPDATE_ENABLED": "true", "HEADSCALE_DERP_UPDATE_FREQUENCY": "10s", + "HEADSCALE_LISTEN_ADDR": "0.0.0.0:443", }), ) assertNoErrHeadscaleEnv(t, err) @@ -76,6 +138,11 @@ func TestDERPServerScenario(t *testing.T) { } success := pingDerpAllHelper(t, allClients, allHostnames) + if len(allHostnames)*len(allClients) > success { + t.FailNow() + + return + } for _, client := range allClients { status, err := client.Status() @@ -98,6 +165,9 @@ func TestDERPServerScenario(t *testing.T) { time.Sleep(30 * time.Second) success = pingDerpAllHelper(t, allClients, allHostnames) + if len(allHostnames)*len(allClients) > success { + t.Fail() + } for _, client := range allClients { status, err := client.Status() @@ -114,10 +184,14 @@ func TestDERPServerScenario(t *testing.T) { } t.Logf("Run2: %d successful pings out of %d", success, len(allClients)*len(allHostnames)) + + for _, check := range furtherAssertions { + check(&scenario) + } } func (s *EmbeddedDERPServerScenario) CreateHeadscaleEnv( - users map[string]int, + users map[string]ClientsSpec, opts ...hsic.Option, ) error { hsServer, err := s.Headscale(opts...) @@ -137,6 +211,7 @@ func (s *EmbeddedDERPServerScenario) CreateHeadscaleEnv( if err != nil { return err } + log.Printf("headscale server ip address: %s", hsServer.GetIP()) hash, err := util.GenerateRandomStringDNSSafe(scenarioHashLength) if err != nil { @@ -149,14 +224,31 @@ func (s *EmbeddedDERPServerScenario) CreateHeadscaleEnv( return err } - err = s.CreateTailscaleIsolatedNodesInUser( - hash, - userName, - "all", - clientCount, - ) - if err != nil { - return err + if clientCount.Plain > 0 { + // Containers that use default DERP config + err = s.CreateTailscaleIsolatedNodesInUser( + hash, + userName, + "all", + clientCount.Plain, + ) + if err != nil { + return err + } + } + + if clientCount.WebsocketDERP > 0 { + // Containers that use DERP-over-WebSocket + err = s.CreateTailscaleIsolatedNodesInUser( + hash, + userName, + "all", + clientCount.WebsocketDERP, + tsic.WithWebsocketDERP(true), + ) + if err != nil { + return err + } } key, err := s.CreatePreAuthKey(userName, true, false) diff --git a/integration/hsic/hsic.go b/integration/hsic/hsic.go index 20a778b8..c2ae3336 100644 --- a/integration/hsic/hsic.go +++ b/integration/hsic/hsic.go @@ -461,6 +461,12 @@ func (t *HeadscaleInContainer) Shutdown() (string, string, error) { return stdoutPath, stderrPath, t.pool.Purge(t.container) } +// WriteLogs writes the current stdout/stderr log of the container to +// the given io.Writers. +func (t *HeadscaleInContainer) WriteLogs(stdout, stderr io.Writer) error { + return dockertestutil.WriteLog(t.pool, t.container, stdout, stderr) +} + // SaveLog saves the current stdout log of the container to a path // on the host system. func (t *HeadscaleInContainer) SaveLog(path string) (string, string, error) { diff --git a/integration/tailscale.go b/integration/tailscale.go index 5b1baf1b..f858d2c2 100644 --- a/integration/tailscale.go +++ b/integration/tailscale.go @@ -1,6 +1,7 @@ package integration import ( + "io" "net/netip" "net/url" @@ -41,4 +42,6 @@ type TailscaleClient interface { // FailingPeersAsString returns a formatted-ish multi-line-string of peers in the client // and a bool indicating if the clients online count and peer count is equal. FailingPeersAsString() (string, bool, error) + + WriteLogs(stdout, stderr io.Writer) error } diff --git a/integration/tsic/tsic.go b/integration/tsic/tsic.go index a3fac17c..944bb94d 100644 --- a/integration/tsic/tsic.go +++ b/integration/tsic/tsic.go @@ -67,6 +67,7 @@ type TailscaleInContainer struct { // optional config headscaleCert []byte headscaleHostname string + withWebsocketDERP bool withSSH bool withTags []string withEntrypoint []string @@ -126,6 +127,14 @@ func WithTags(tags []string) Option { } } +// WithWebsocketDERP toggles a development knob to +// force enable DERP connection through the new websocket protocol. +func WithWebsocketDERP(enabled bool) Option { + return func(tsic *TailscaleInContainer) { + tsic.withWebsocketDERP = enabled + } +} + // WithSSH enables SSH for the Tailscale instance. func WithSSH() Option { return func(tsic *TailscaleInContainer) { @@ -206,6 +215,14 @@ func New( // }, Entrypoint: tsic.withEntrypoint, ExtraHosts: tsic.withExtraHosts, + Env: []string{}, + } + + if tsic.withWebsocketDERP { + tailscaleOptions.Env = append( + tailscaleOptions.Env, + fmt.Sprintf("TS_DEBUG_DERP_WS_CLIENT=%t", tsic.withWebsocketDERP), + ) } if tsic.headscaleHostname != "" { @@ -351,6 +368,15 @@ func (t *TailscaleInContainer) Execute( return stdout, stderr, nil } +// Retrieve container logs. +func (t *TailscaleInContainer) Logs(stdout, stderr io.Writer) error { + return dockertestutil.WriteLog( + t.pool, + t.container, + stdout, stderr, + ) +} + // Up runs the login routine on the given Tailscale instance. // This login mechanism uses the authorised key for authentication. func (t *TailscaleInContainer) Login( @@ -999,10 +1025,21 @@ func (t *TailscaleInContainer) WriteFile(path string, data []byte) error { // on the host system. func (t *TailscaleInContainer) SaveLog(path string) error { // TODO(kradalby): Assert if tailscale logs contains panics. + // NOTE(enoperm): `t.WriteLog | countMatchingLines` + // is probably most of what is for that, + // but I'd rather not change the behaviour here, + // as it may affect all the other tests + // I have not otherwise touched. _, _, err := dockertestutil.SaveLog(t.pool, t.container, path) return err } +// WriteLogs writes the current stdout/stderr log of the container to +// the given io.Writers. +func (t *TailscaleInContainer) WriteLogs(stdout, stderr io.Writer) error { + return dockertestutil.WriteLog(t.pool, t.container, stdout, stderr) +} + // ReadFile reads a file from the Tailscale container. // It returns the content of the file as a byte slice. func (t *TailscaleInContainer) ReadFile(path string) ([]byte, error) { diff --git a/integration/utils.go b/integration/utils.go index 840dbc4c..ec6aeecf 100644 --- a/integration/utils.go +++ b/integration/utils.go @@ -1,6 +1,9 @@ package integration import ( + "bufio" + "bytes" + "io" "os" "strings" "sync" @@ -78,6 +81,25 @@ func assertContains(t *testing.T, str, subStr string) { } } +func didClientUseWebsocketForDERP(t *testing.T, client TailscaleClient) bool { + t.Helper() + + buf := &bytes.Buffer{} + err := client.WriteLogs(buf, buf) + if err != nil { + t.Fatalf("failed to fetch client logs: %s: %s", client.Hostname(), err) + } + + count, err := countMatchingLines(buf, func(line string) bool { + return strings.Contains(line, "websocket: connected to ") + }) + if err != nil { + t.Fatalf("failed to process client logs: %s: %s", client.Hostname(), err) + } + + return count > 0 +} + func pingAllHelper(t *testing.T, clients []TailscaleClient, addrs []string, opts ...tsic.PingOption) int { t.Helper() success := 0 @@ -113,7 +135,7 @@ func pingDerpAllHelper(t *testing.T, clients []TailscaleClient, addrs []string) tsic.WithPingUntilDirect(false), ) if err != nil { - t.Fatalf("failed to ping %s from %s: %s", addr, client.Hostname(), err) + t.Logf("failed to ping %s from %s: %s", addr, client.Hostname(), err) } else { success++ } @@ -321,6 +343,25 @@ func dockertestMaxWait() time.Duration { return wait } +func countMatchingLines(in io.Reader, predicate func(string) bool) (int, error) { + count := 0 + scanner := bufio.NewScanner(in) + { + const logBufferInitialSize = 1024 << 10 // preallocate 1 MiB + buff := make([]byte, logBufferInitialSize) + scanner.Buffer(buff, len(buff)) + scanner.Split(bufio.ScanLines) + } + + for scanner.Scan() { + if predicate(scanner.Text()) { + count += 1 + } + } + + return count, scanner.Err() +} + // func dockertestCommandTimeout() time.Duration { // timeout := 10 * time.Second //nolint //