diff --git a/Gopkg.lock b/Gopkg.lock index 55310b9f0723e..417273c465318 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -105,6 +105,12 @@ ] revision = "104e2578924bb3b211150c19414d0144b82165bb" +[[projects]] + name = "github.com/gorilla/websocket" + packages = ["."] + revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b" + version = "v1.2.0" + [[projects]] name = "github.com/grpc-ecosystem/go-grpc-prometheus" packages = ["."] @@ -636,6 +642,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "54e0dd7c4513ef8ceddf3cbb7e4d20a7b868c4a850b8c5efce81065181e2b745" + inputs-digest = "51bf169451e7cef474b4402438cb4a0cf0cf43159a08fc5bdf29498c46031dc1" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index b011401d8f51c..c1f21cdd1c1fe 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -48,6 +48,9 @@ required = ["github.com/golang/protobuf/protoc-gen-go"] name = "github.com/prometheus/client_golang" revision = "9bb6ab929dcbe1c8393cd9ef70387cb69811bd1c" +[[constraint]] + name = "github.com/gorilla/websocket" + version = "1.2.0" # # k8s.io/kubernetes dependency fixes # taken from https://github.com/kubernetes/kubernetes/blob/master/Godeps/Godeps.json diff --git a/cli/Dockerfile-bin b/cli/Dockerfile-bin index 0bb137c48cd32..01008bb8c9c4c 100644 --- a/cli/Dockerfile-bin +++ b/cli/Dockerfile-bin @@ -1,5 +1,5 @@ ## compile binaries -FROM gcr.io/linkerd-io/go-deps:baf323e4 as golang +FROM gcr.io/linkerd-io/go-deps:766a0983 as golang WORKDIR /go/src/github.com/linkerd/linkerd2 COPY cli cli COPY controller/k8s controller/k8s diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index 2b53544cc8360..35b95d345dbce 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -5,14 +5,13 @@ import ( "fmt" "io" "os" + "strings" "text/tabwriter" "github.com/linkerd/linkerd2/controller/api/util" pb "github.com/linkerd/linkerd2/controller/gen/public" - "github.com/linkerd/linkerd2/pkg/addr" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "google.golang.org/grpc/codes" ) type tapOptions struct { @@ -74,7 +73,19 @@ func newCmdTap() *cobra.Command { Args: cobra.RangeArgs(1, 2), ValidArgs: util.ValidTargets, RunE: func(cmd *cobra.Command, args []string) error { - req, err := buildTapByResourceRequest(args, options) + requestParams := util.TapRequestParams{ + Resource: strings.Join(args, "/"), + Namespace: options.namespace, + ToResource: options.toResource, + ToNamespace: options.toNamespace, + MaxRps: options.maxRps, + Scheme: options.scheme, + Method: options.method, + Authority: options.authority, + Path: options.path, + } + + req, err := util.BuildTapByResourceRequest(requestParams) if err != nil { return err } @@ -108,103 +119,11 @@ func newCmdTap() *cobra.Command { return cmd } -func buildTapByResourceRequest( - resource []string, - options *tapOptions, -) (*pb.TapByResourceRequest, error) { - - target, err := util.BuildResource(options.namespace, resource...) - if err != nil { - return nil, fmt.Errorf("target resource invalid: %s", err) - } - if !contains(util.ValidTargets, target.Type) { - return nil, fmt.Errorf("unsupported resource type [%s]", target.Type) - } - - matches := []*pb.TapByResourceRequest_Match{} - - if options.toResource != "" { - destination, err := util.BuildResource(options.toNamespace, options.toResource) - if err != nil { - return nil, fmt.Errorf("destination resource invalid: %s", err) - } - if !contains(util.ValidDestinations, destination.Type) { - return nil, fmt.Errorf("unsupported resource type [%s]", target.Type) - } - - match := pb.TapByResourceRequest_Match{ - Match: &pb.TapByResourceRequest_Match_Destinations{ - Destinations: &pb.ResourceSelection{ - Resource: &destination, - }, - }, - } - matches = append(matches, &match) - } - - if options.scheme != "" { - match := buildMatchHTTP(&pb.TapByResourceRequest_Match_Http{ - Match: &pb.TapByResourceRequest_Match_Http_Scheme{Scheme: options.scheme}, - }) - matches = append(matches, &match) - } - if options.method != "" { - match := buildMatchHTTP(&pb.TapByResourceRequest_Match_Http{ - Match: &pb.TapByResourceRequest_Match_Http_Method{Method: options.method}, - }) - matches = append(matches, &match) - } - if options.authority != "" { - match := buildMatchHTTP(&pb.TapByResourceRequest_Match_Http{ - Match: &pb.TapByResourceRequest_Match_Http_Authority{Authority: options.authority}, - }) - matches = append(matches, &match) - } - if options.path != "" { - match := buildMatchHTTP(&pb.TapByResourceRequest_Match_Http{ - Match: &pb.TapByResourceRequest_Match_Http_Path{Path: options.path}, - }) - matches = append(matches, &match) - } - - return &pb.TapByResourceRequest{ - Target: &pb.ResourceSelection{ - Resource: &target, - }, - MaxRps: options.maxRps, - Match: &pb.TapByResourceRequest_Match{ - Match: &pb.TapByResourceRequest_Match_All{ - All: &pb.TapByResourceRequest_Match_Seq{ - Matches: matches, - }, - }, - }, - }, nil -} - -func contains(list []string, s string) bool { - for _, elem := range list { - if s == elem { - return true - } - } - return false -} - -func buildMatchHTTP(match *pb.TapByResourceRequest_Match_Http) pb.TapByResourceRequest_Match { - return pb.TapByResourceRequest_Match{ - Match: &pb.TapByResourceRequest_Match_Http_{ - Http: match, - }, - } -} - func requestTapByResourceFromAPI(w io.Writer, client pb.ApiClient, req *pb.TapByResourceRequest) error { rsp, err := client.TapByResource(context.Background(), req) if err != nil { return err } - return renderTap(w, rsp) } @@ -230,7 +149,7 @@ func writeTapEventsToBuffer(tapClient pb.Api_TapByResourceClient, w *tabwriter.W fmt.Fprintln(os.Stderr, err) break } - _, err = fmt.Fprintln(w, renderTapEvent(event)) + _, err = fmt.Fprintln(w, util.RenderTapEvent(event)) if err != nil { return err } @@ -238,89 +157,3 @@ func writeTapEventsToBuffer(tapClient pb.Api_TapByResourceClient, w *tabwriter.W return nil } - -func renderTapEvent(event *pb.TapEvent) string { - dstLabels := event.GetDestinationMeta().GetLabels() - - dst := addr.PublicAddressToString(event.GetDestination()) - if pod := dstLabels["pod"]; pod != "" { - dst = fmt.Sprintf("%s:%d", pod, event.GetDestination().GetPort()) - } - - proxy := "???" - tls := "" - switch event.GetProxyDirection() { - case pb.TapEvent_INBOUND: - proxy = "in " // A space is added so it aligns with `out`. - srcLabels := event.GetSourceMeta().GetLabels() - tls = srcLabels["tls"] - case pb.TapEvent_OUTBOUND: - proxy = "out" - tls = dstLabels["tls"] - default: - // Too old for TLS. - } - - flow := fmt.Sprintf("proxy=%s src=%s dst=%s tls=%s", - proxy, - addr.PublicAddressToString(event.GetSource()), - dst, - tls, - ) - - switch ev := event.GetHttp().GetEvent().(type) { - case *pb.TapEvent_Http_RequestInit_: - return fmt.Sprintf("req id=%d:%d %s :method=%s :authority=%s :path=%s", - ev.RequestInit.GetId().GetBase(), - ev.RequestInit.GetId().GetStream(), - flow, - ev.RequestInit.GetMethod().GetRegistered().String(), - ev.RequestInit.GetAuthority(), - ev.RequestInit.GetPath(), - ) - - case *pb.TapEvent_Http_ResponseInit_: - return fmt.Sprintf("rsp id=%d:%d %s :status=%d latency=%dµs", - ev.ResponseInit.GetId().GetBase(), - ev.ResponseInit.GetId().GetStream(), - flow, - ev.ResponseInit.GetHttpStatus(), - ev.ResponseInit.GetSinceRequestInit().GetNanos()/1000, - ) - - case *pb.TapEvent_Http_ResponseEnd_: - switch eos := ev.ResponseEnd.GetEos().GetEnd().(type) { - case *pb.Eos_GrpcStatusCode: - return fmt.Sprintf("end id=%d:%d %s grpc-status=%s duration=%dµs response-length=%dB", - ev.ResponseEnd.GetId().GetBase(), - ev.ResponseEnd.GetId().GetStream(), - flow, - codes.Code(eos.GrpcStatusCode), - ev.ResponseEnd.GetSinceResponseInit().GetNanos()/1000, - ev.ResponseEnd.GetResponseBytes(), - ) - - case *pb.Eos_ResetErrorCode: - return fmt.Sprintf("end id=%d:%d %s reset-error=%+v duration=%dµs response-length=%dB", - ev.ResponseEnd.GetId().GetBase(), - ev.ResponseEnd.GetId().GetStream(), - flow, - eos.ResetErrorCode, - ev.ResponseEnd.GetSinceResponseInit().GetNanos()/1000, - ev.ResponseEnd.GetResponseBytes(), - ) - - default: - return fmt.Sprintf("end id=%d:%d %s duration=%dµs response-length=%dB", - ev.ResponseEnd.GetId().GetBase(), - ev.ResponseEnd.GetId().GetStream(), - flow, - ev.ResponseEnd.GetSinceResponseInit().GetNanos()/1000, - ev.ResponseEnd.GetResponseBytes(), - ) - } - - default: - return fmt.Sprintf("unknown %s", flow) - } -} diff --git a/cli/cmd/tap_test.go b/cli/cmd/tap_test.go index 0960d3f638c56..9b2b7671eb916 100644 --- a/cli/cmd/tap_test.go +++ b/cli/cmd/tap_test.go @@ -9,6 +9,7 @@ import ( "github.com/golang/protobuf/ptypes/duration" "github.com/linkerd/linkerd2/controller/api/public" + "github.com/linkerd/linkerd2/controller/api/util" pb "github.com/linkerd/linkerd2/controller/gen/public" "github.com/linkerd/linkerd2/pkg/addr" "github.com/linkerd/linkerd2/pkg/k8s" @@ -19,17 +20,15 @@ func TestRequestTapByResourceFromAPI(t *testing.T) { t.Run("Should render busy response if everything went well", func(t *testing.T) { resourceType := k8s.Pod targetName := "pod-666" - options := &tapOptions{ - scheme: "https", - method: "GET", - authority: "localhost", - path: "/some/path", + params := util.TapRequestParams{ + Resource: resourceType + "/" + targetName, + Scheme: "https", + Method: "GET", + Authority: "localhost", + Path: "/some/path", } - req, err := buildTapByResourceRequest( - []string{resourceType, targetName}, - options, - ) + req, err := util.BuildTapByResourceRequest(params) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -41,8 +40,8 @@ func TestRequestTapByResourceFromAPI(t *testing.T) { Id: &pb.TapEvent_Http_StreamId{ Base: 1, }, - Authority: options.authority, - Path: options.path, + Authority: params.Authority, + Path: params.Path, }, }, }, @@ -98,17 +97,15 @@ func TestRequestTapByResourceFromAPI(t *testing.T) { t.Run("Should render empty response if no events returned", func(t *testing.T) { resourceType := k8s.Pod targetName := "pod-666" - options := &tapOptions{ - scheme: "https", - method: "GET", - authority: "localhost", - path: "/some/path", + params := util.TapRequestParams{ + Resource: resourceType + "/" + targetName, + Scheme: "https", + Method: "GET", + Authority: "localhost", + Path: "/some/path", } - req, err := buildTapByResourceRequest( - []string{resourceType, targetName}, - options, - ) + req, err := util.BuildTapByResourceRequest(params) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -139,17 +136,15 @@ func TestRequestTapByResourceFromAPI(t *testing.T) { t.SkipNow() resourceType := k8s.Pod targetName := "pod-666" - options := &tapOptions{ - scheme: "https", - method: "GET", - authority: "localhost", - path: "/some/path", + params := util.TapRequestParams{ + Resource: resourceType + "/" + targetName, + Scheme: "https", + Method: "GET", + Authority: "localhost", + Path: "/some/path", } - req, err := buildTapByResourceRequest( - []string{resourceType, targetName}, - options, - ) + req, err := util.BuildTapByResourceRequest(params) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -218,7 +213,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "req id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= :method=POST :authority=hello.default:7777 :path=/hello.v1.HelloService/Hello" - output := renderTapEvent(event) + output := util.RenderTapEvent(event) if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -235,7 +230,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "rsp id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= :status=200 latency=999µs" - output := renderTapEvent(event) + output := util.RenderTapEvent(event) if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -256,7 +251,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= grpc-status=OK duration=888µs response-length=111B" - output := renderTapEvent(event) + output := util.RenderTapEvent(event) if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -277,7 +272,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= reset-error=123 duration=888µs response-length=111B" - output := renderTapEvent(event) + output := util.RenderTapEvent(event) if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -296,7 +291,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= duration=888µs response-length=111B" - output := renderTapEvent(event) + output := util.RenderTapEvent(event) if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -314,7 +309,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= duration=888µs response-length=111B" - output := renderTapEvent(event) + output := util.RenderTapEvent(event) if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -324,7 +319,7 @@ func TestEventToString(t *testing.T) { event := toTapEvent(&pb.TapEvent_Http{}) expectedOutput := "unknown proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls=" - output := renderTapEvent(event) + output := util.RenderTapEvent(event) if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } diff --git a/controller/Dockerfile b/controller/Dockerfile index 9495d90d26390..bf291d542df49 100644 --- a/controller/Dockerfile +++ b/controller/Dockerfile @@ -1,5 +1,5 @@ ## compile controller services -FROM gcr.io/linkerd-io/go-deps:baf323e4 as golang +FROM gcr.io/linkerd-io/go-deps:766a0983 as golang WORKDIR /go/src/github.com/linkerd/linkerd2 COPY controller/gen controller/gen COPY pkg pkg diff --git a/controller/api/util/api_utils.go b/controller/api/util/api_utils.go index 9eb0407770d8d..c15bb5919845a 100644 --- a/controller/api/util/api_utils.go +++ b/controller/api/util/api_utils.go @@ -2,10 +2,12 @@ package util import ( "errors" + "fmt" "strings" "time" pb "github.com/linkerd/linkerd2/controller/gen/public" + "github.com/linkerd/linkerd2/pkg/addr" "github.com/linkerd/linkerd2/pkg/k8s" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -59,6 +61,18 @@ type StatSummaryRequestParams struct { AllNamespaces bool } +type TapRequestParams struct { + Resource string + Namespace string + ToResource string + ToNamespace string + MaxRps float32 + Scheme string + Method string + Authority string + Path string +} + // GRPCError generates a gRPC error code, as defined in // google.golang.org/grpc/status. // If the error is nil or already a gRPC error, return the error. @@ -231,3 +245,176 @@ func buildResource(namespace string, resType string, name string) (pb.Resource, Name: name, }, nil } + +func BuildTapByResourceRequest(params TapRequestParams) (*pb.TapByResourceRequest, error) { + target, err := BuildResource(params.Namespace, params.Resource) + if err != nil { + return nil, fmt.Errorf("target resource invalid: %s", err) + } + if !contains(ValidTargets, target.Type) { + return nil, fmt.Errorf("unsupported resource type [%s]", target.Type) + } + + matches := []*pb.TapByResourceRequest_Match{} + + if params.ToResource != "" { + destination, err := BuildResource(params.ToNamespace, params.ToResource) + if err != nil { + return nil, fmt.Errorf("destination resource invalid: %s", err) + } + if !contains(ValidDestinations, destination.Type) { + return nil, fmt.Errorf("unsupported resource type [%s]", target.Type) + } + + match := pb.TapByResourceRequest_Match{ + Match: &pb.TapByResourceRequest_Match_Destinations{ + Destinations: &pb.ResourceSelection{ + Resource: &destination, + }, + }, + } + matches = append(matches, &match) + } + + if params.Scheme != "" { + match := buildMatchHTTP(&pb.TapByResourceRequest_Match_Http{ + Match: &pb.TapByResourceRequest_Match_Http_Scheme{Scheme: params.Scheme}, + }) + matches = append(matches, &match) + } + if params.Method != "" { + match := buildMatchHTTP(&pb.TapByResourceRequest_Match_Http{ + Match: &pb.TapByResourceRequest_Match_Http_Method{Method: params.Method}, + }) + matches = append(matches, &match) + } + if params.Authority != "" { + match := buildMatchHTTP(&pb.TapByResourceRequest_Match_Http{ + Match: &pb.TapByResourceRequest_Match_Http_Authority{Authority: params.Authority}, + }) + matches = append(matches, &match) + } + if params.Path != "" { + match := buildMatchHTTP(&pb.TapByResourceRequest_Match_Http{ + Match: &pb.TapByResourceRequest_Match_Http_Path{Path: params.Path}, + }) + matches = append(matches, &match) + } + + return &pb.TapByResourceRequest{ + Target: &pb.ResourceSelection{ + Resource: &target, + }, + MaxRps: params.MaxRps, + Match: &pb.TapByResourceRequest_Match{ + Match: &pb.TapByResourceRequest_Match_All{ + All: &pb.TapByResourceRequest_Match_Seq{ + Matches: matches, + }, + }, + }, + }, nil +} + +func buildMatchHTTP(match *pb.TapByResourceRequest_Match_Http) pb.TapByResourceRequest_Match { + return pb.TapByResourceRequest_Match{ + Match: &pb.TapByResourceRequest_Match_Http_{ + Http: match, + }, + } +} + +func contains(list []string, s string) bool { + for _, elem := range list { + if s == elem { + return true + } + } + return false +} + +func RenderTapEvent(event *pb.TapEvent) string { + dstLabels := event.GetDestinationMeta().GetLabels() + + dst := addr.PublicAddressToString(event.GetDestination()) + if pod := dstLabels["pod"]; pod != "" { + dst = fmt.Sprintf("%s:%d", pod, event.GetDestination().GetPort()) + } + + proxy := "???" + tls := "" + switch event.GetProxyDirection() { + case pb.TapEvent_INBOUND: + proxy = "in " // A space is added so it aligns with `out`. + srcLabels := event.GetSourceMeta().GetLabels() + tls = srcLabels["tls"] + case pb.TapEvent_OUTBOUND: + proxy = "out" + tls = dstLabels["tls"] + default: + // Too old for TLS. + } + + flow := fmt.Sprintf("proxy=%s src=%s dst=%s tls=%s", + proxy, + addr.PublicAddressToString(event.GetSource()), + dst, + tls, + ) + + switch ev := event.GetHttp().GetEvent().(type) { + case *pb.TapEvent_Http_RequestInit_: + return fmt.Sprintf("req id=%d:%d %s :method=%s :authority=%s :path=%s", + ev.RequestInit.GetId().GetBase(), + ev.RequestInit.GetId().GetStream(), + flow, + ev.RequestInit.GetMethod().GetRegistered().String(), + ev.RequestInit.GetAuthority(), + ev.RequestInit.GetPath(), + ) + + case *pb.TapEvent_Http_ResponseInit_: + return fmt.Sprintf("rsp id=%d:%d %s :status=%d latency=%dµs", + ev.ResponseInit.GetId().GetBase(), + ev.ResponseInit.GetId().GetStream(), + flow, + ev.ResponseInit.GetHttpStatus(), + ev.ResponseInit.GetSinceRequestInit().GetNanos()/1000, + ) + + case *pb.TapEvent_Http_ResponseEnd_: + switch eos := ev.ResponseEnd.GetEos().GetEnd().(type) { + case *pb.Eos_GrpcStatusCode: + return fmt.Sprintf("end id=%d:%d %s grpc-status=%s duration=%dµs response-length=%dB", + ev.ResponseEnd.GetId().GetBase(), + ev.ResponseEnd.GetId().GetStream(), + flow, + codes.Code(eos.GrpcStatusCode), + ev.ResponseEnd.GetSinceResponseInit().GetNanos()/1000, + ev.ResponseEnd.GetResponseBytes(), + ) + + case *pb.Eos_ResetErrorCode: + return fmt.Sprintf("end id=%d:%d %s reset-error=%+v duration=%dµs response-length=%dB", + ev.ResponseEnd.GetId().GetBase(), + ev.ResponseEnd.GetId().GetStream(), + flow, + eos.ResetErrorCode, + ev.ResponseEnd.GetSinceResponseInit().GetNanos()/1000, + ev.ResponseEnd.GetResponseBytes(), + ) + + default: + return fmt.Sprintf("end id=%d:%d %s duration=%dµs response-length=%dB", + ev.ResponseEnd.GetId().GetBase(), + ev.ResponseEnd.GetId().GetStream(), + flow, + ev.ResponseEnd.GetSinceResponseInit().GetNanos()/1000, + ev.ResponseEnd.GetResponseBytes(), + ) + } + + default: + return fmt.Sprintf("unknown %s", flow) + } +} diff --git a/proxy-init/Dockerfile b/proxy-init/Dockerfile index f27770692ca3c..cc3c6fab80558 100644 --- a/proxy-init/Dockerfile +++ b/proxy-init/Dockerfile @@ -1,5 +1,5 @@ ## compile proxy-init utility -FROM gcr.io/linkerd-io/go-deps:baf323e4 as golang +FROM gcr.io/linkerd-io/go-deps:766a0983 as golang WORKDIR /go/src/github.com/linkerd/linkerd2 COPY ./proxy-init ./proxy-init RUN CGO_ENABLED=0 GOOS=linux go install -v ./proxy-init/ diff --git a/web/Dockerfile b/web/Dockerfile index bb8f7c2c74a26..f0fd4da7f89e3 100644 --- a/web/Dockerfile +++ b/web/Dockerfile @@ -24,7 +24,7 @@ ENV NODE_ENV production RUN $ROOT/bin/web build ## compile go server -FROM gcr.io/linkerd-io/go-deps:baf323e4 as golang +FROM gcr.io/linkerd-io/go-deps:766a0983 as golang WORKDIR /go/src/github.com/linkerd/linkerd2 COPY web web COPY controller controller diff --git a/web/app/css/tap.css b/web/app/css/tap.css new file mode 100644 index 0000000000000..06329b3556e39 --- /dev/null +++ b/web/app/css/tap.css @@ -0,0 +1,17 @@ +@import 'styles.css'; + +.tap-display { + font-size: 10px; + margin-top: calc(4 * var(--base-width)); +} + +button.ant-btn-primary { + &.tap-start { + background-color: var(--green); + border-color: var(--green); + } + &.tap-stop { + background-color: var(--siennared); + border-color: var(--siennared); + } +} diff --git a/web/app/js/components/Sidebar.jsx b/web/app/js/components/Sidebar.jsx index 9a5ca014f41f7..38edb16465764 100644 --- a/web/app/js/components/Sidebar.jsx +++ b/web/app/js/components/Sidebar.jsx @@ -161,6 +161,13 @@ class Sidebar extends React.Component { + {/* + + + Tap + + */} + { _.map(_.take(this.state.namespaces, this.state.maxNsItemsToShow), ns => { return ( diff --git a/web/app/js/components/Tap.jsx b/web/app/js/components/Tap.jsx new file mode 100644 index 0000000000000..4d3c88aa1d1aa --- /dev/null +++ b/web/app/js/components/Tap.jsx @@ -0,0 +1,144 @@ +import _ from 'lodash'; +import ErrorBanner from './ErrorBanner.jsx'; +import PageHeader from './PageHeader.jsx'; +import PropTypes from 'prop-types'; +import React from 'react'; +import { withContext } from './util/AppContext.jsx'; +import { Button, Form, Icon, Input } from 'antd'; +import './../../css/tap.css'; + +class Tap extends React.Component { + static propTypes = { + pathPrefix: PropTypes.string.isRequired + } + + constructor(props) { + super(props); + + this.state = { + errors: "", + resource: "", + namespace: "", + messages: [], + maxLinesToDisplay: 40, + webSocketRequestSent: false + }; + } + + componentWillUnmount() { + this.ws.close(1000); + this.stopTapStreaming(); + this.stopServerPolling(); + } + + onWebsocketRecv = e => { + this.setState({ + messages: _(this.state.messages) + .push(e.data) + .takeRight(this.state.maxLinesToDisplay) + .value() + }); + } + + onWebsocketClose = e => { + if (!e.wasClean) { + this.setState({ + errors: `Websocket [${e.code}] ${e.reason}` + }); + } + this.stopTapStreaming(); + } + + onWebsocketOpen = () => { + this.ws.send(JSON.stringify({ + id: "tap-web", + resource: this.state.resource, + namespace: this.state.namespace + })); + this.setState({ + webSocketRequestSent: true, + errors: "" + }); + } + + startTapSteaming() { + this.setState({ + messages: [] + }); + + let protocol = window.location.protocol === "https:" ? "wss" : "ws"; + let tapWebSocket = `${protocol}://${window.location.host}${this.props.pathPrefix}/api/tap`; + + this.ws = new WebSocket(tapWebSocket); + this.ws.onmessage = this.onWebsocketRecv; + this.ws.onclose = this.onWebsocketClose; + this.ws.onopen = this.onWebsocketOpen; + } + + stopTapStreaming() { + this.setState({ + webSocketRequestSent: false + }); + + window.clearInterval(this.timerId); + } + + handleTapStart = e => { + e.preventDefault(); + this.startTapSteaming(); + } + + handleTapStop = () => { + this.ws.close(1000); + } + + handleFormChange = formVal => { + let state = {}; + return e => { + state[formVal] = e.target.value; + this.setState(state); + }; + } + + renderTapForm = () => { + return ( +
+ + + + + + + + + + { this.state.webSocketRequestSent ? + : + + } + +
+ ); + } + + render() { + return ( +
+ {!this.state.errors ? null : } + + + {this.renderTapForm()} + +
+ + { this.state.webSocketRequestSent && _.size(this.state.messages) === 0 ? +
Starting tap on {this.state.resource} in namespace {this.state.namespace}
: null } + { _.map(this.state.messages, (m, i) =>
{m}
)} +
+
+
+ ); + } +} + +export default withContext(Tap); diff --git a/web/app/js/index.js b/web/app/js/index.js index dfdf685dc73d0..d898a52219898 100644 --- a/web/app/js/index.js +++ b/web/app/js/index.js @@ -8,6 +8,7 @@ import ReactDOM from 'react-dom'; import ResourceList from './components/ResourceList.jsx'; import ServiceMesh from './components/ServiceMesh.jsx'; import Sidebar from './components/Sidebar.jsx'; +import Tap from './components/Tap.jsx'; import { BrowserRouter, Redirect, Route, Switch } from 'react-router-dom'; import './../css/styles.css'; import './../img/favicon.png'; // needs to be referenced somewhere so webpack bundles it @@ -40,6 +41,7 @@ let applicationHtml = ( + } /> diff --git a/web/srv/api_handlers.go b/web/srv/api_handlers.go index a824a3ef5471d..2fbfc207a9ad8 100644 --- a/web/srv/api_handlers.go +++ b/web/srv/api_handlers.go @@ -2,10 +2,12 @@ package srv import ( "encoding/json" + "io" "net/http" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" + "github.com/gorilla/websocket" "github.com/julienschmidt/httprouter" "github.com/linkerd/linkerd2/controller/api/util" pb "github.com/linkerd/linkerd2/controller/gen/public" @@ -22,6 +24,11 @@ type ( var ( defaultResourceType = k8s.Deployment pbMarshaler = jsonpb.Marshaler{EmitDefaults: true} + maxMessageSize = 2048 + websocketUpgrader = websocket.Upgrader{ + ReadBufferSize: maxMessageSize, + WriteBufferSize: maxMessageSize, + } ) func renderJsonError(w http.ResponseWriter, err error, status int) { @@ -110,3 +117,78 @@ func (h *handler) handleApiStat(w http.ResponseWriter, req *http.Request, p http } renderJsonPb(w, result) } + +func (h *handler) handleApiTap(w http.ResponseWriter, req *http.Request, p httprouter.Params) { + ws, err := websocketUpgrader.Upgrade(w, req, nil) + if err != nil { + renderJsonError(w, err, http.StatusInternalServerError) + return + } + defer ws.Close() + + messageType, message, err := ws.ReadMessage() + if err != nil { + ws.WriteMessage(websocket.CloseMessage, []byte(err.Error())) + return + } + + if messageType != websocket.TextMessage { + ws.WriteMessage(websocket.CloseMessage, []byte("MessageType not supported")) + return + } + + var requestParams util.TapRequestParams + err = json.Unmarshal(message, &requestParams) + if err != nil { + ws.WriteMessage(websocket.CloseMessage, []byte(err.Error())) + return + } + + if requestParams.MaxRps == 0.0 { + requestParams.MaxRps = 1.0 + } + + tapReq, err := util.BuildTapByResourceRequest(requestParams) + if err != nil { + ws.WriteMessage(websocket.CloseMessage, []byte(err.Error())) + return + } + + tapClient, err := h.apiClient.TapByResource(req.Context(), tapReq) + if err != nil { + ws.WriteMessage(websocket.CloseMessage, []byte(err.Error())) + return + } + defer tapClient.CloseSend() + + go func() { + for { + rsp, err := tapClient.Recv() + if err == io.EOF { + break + } + if err != nil { + ws.WriteMessage(websocket.CloseMessage, []byte(err.Error())) + break + } + + tapEvent := util.RenderTapEvent(rsp) + if err := ws.WriteMessage(websocket.TextMessage, []byte(tapEvent)); err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { + log.Error(err) + } + break + } + } + }() + + for { + _, _, err := ws.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { + log.Errorf("Unexpected close error: %s", err) + } + return + } + } +} diff --git a/web/srv/server.go b/web/srv/server.go index 98f2a0073c966..6a84299e8791d 100644 --- a/web/srv/server.go +++ b/web/srv/server.go @@ -90,6 +90,7 @@ func NewServer(addr, templateDir, staticDir, uuid, controllerNamespace, webpackD server.router.GET("/replicationcontrollers", handler.handleIndex) server.router.GET("/pods", handler.handleIndex) server.router.GET("/authorities", handler.handleIndex) + server.router.GET("/tap", handler.handleIndex) server.router.ServeFiles( "/dist/*filepath", // add catch-all parameter to match all files in dir filesonly.FileSystem(server.staticDir)) @@ -98,6 +99,7 @@ func NewServer(addr, templateDir, staticDir, uuid, controllerNamespace, webpackD server.router.GET("/api/version", handler.handleApiVersion) server.router.GET("/api/tps-reports", handler.handleApiStat) server.router.GET("/api/pods", handler.handleApiPods) + server.router.GET("/api/tap", handler.handleApiTap) return httpServer }