Skip to content

Commit

Permalink
feat: add trace log (#59)
Browse files Browse the repository at this point in the history
* feat: close watcher gracefully in error responder

* feat: upstream info filter

* feat: add trace log

* feat: tracing metric

* fix: typo Reqeust -> Request
  • Loading branch information
xuqingyun authored Mar 26, 2024
1 parent 7a52a11 commit d051695
Show file tree
Hide file tree
Showing 25 changed files with 778 additions and 112 deletions.
3 changes: 3 additions & 0 deletions cmd/kube-gateway/app/options/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ProxyOptions struct {
SecureServing *proxyoptions.SecureServingOptions
ProcessInfo *genericoptions.ProcessInfo
Logging *proxyoptions.LoggingOptions
Tracing *proxyoptions.TracingOptions
ServerRun *proxyoptions.ServerRunOptions
RateLimiter *proxyoptions.RateLimiterOptions
}
Expand All @@ -38,6 +39,7 @@ func NewProxyOptions() *ProxyOptions {
SecureServing: proxyoptions.NewSecureServingOptions(),
ProcessInfo: genericoptions.NewProcessInfo("kube-gateway-proxy", "kube-system"),
Logging: proxyoptions.NewLoggingOptions(),
Tracing: proxyoptions.NewTracingOptions(),
ServerRun: proxyoptions.NewServerRunOptions(),
RateLimiter: proxyoptions.NewRateLimiterOptions(),
}
Expand All @@ -50,6 +52,7 @@ func (s *ProxyOptions) Flags() (fss cliflag.NamedFlagSets) {
s.Authorization.AddFlags(fs)
s.SecureServing.AddFlags(fs)
s.Logging.AddFlags(fs)
s.Tracing.AddFlags(fs)
s.ServerRun.AddFlags(fs)
s.RateLimiter.AddFlags(fs)
return
Expand Down
6 changes: 5 additions & 1 deletion cmd/kube-gateway/app/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func CreateProxyConfig(
// Proxy handler
oo := &proxyHandlerOptions{
clusterManager: clusterController,
enableProxyTracing: o.Tracing.EnableProxyTracing,
enableAccessLog: o.Logging.EnableProxyAccessLog,
maxInflightThreshold: o.ServerRun.MaxInflightThreshold,
maxQPSThreshold: o.ServerRun.MaxQPSThreshold,
Expand Down Expand Up @@ -127,6 +128,7 @@ func buildProxyRecommenedOptions(o *options.ProxyOptions, controlplaneOptions *o

type proxyHandlerOptions struct {
clusterManager clusters.Manager
enableProxyTracing bool
enableAccessLog bool
maxInflightThreshold int32
maxQPSThreshold int32
Expand Down Expand Up @@ -159,7 +161,9 @@ func buildProxyHandlerChainFunc(o *proxyHandlerOptions) func(apiHandler http.Han
handler = gatewayfilters.WithRequestRate(handler, c.LongRunningFunc, rateMonitor)

handler = gatewayfilters.WithPreProcessingMetrics(handler)
handler = gatewayfilters.WithExtraRequestInfo(handler, &request.ExtraRequestInfoFactory{})
handler = gatewayfilters.WithTraceLog(handler, o.enableProxyTracing, c.LongRunningFunc)
handler = gatewayfilters.WithUpstreamInfo(handler, o.clusterManager, c.Serializer)
handler = gatewayfilters.WithExtraRequestInfo(handler, &request.ExtraRequestInfoFactory{}, c.Serializer)
handler = gatewayfilters.WithTerminationMetrics(handler)
handler = gatewayfilters.WithRequestInfo(handler, c.RequestInfoResolver)
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && o.goawayChance > 0 {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/go-openapi/spec v0.19.3
github.com/gobeam/stringy v0.0.5
github.com/gogo/protobuf v1.3.2
github.com/google/uuid v1.3.1 // indirect
github.com/kubewharf/apiserver-runtime v0.0.0
github.com/libp2p/go-reuseport v0.4.0
github.com/pires/go-proxyproto v0.6.2
Expand Down Expand Up @@ -68,7 +69,6 @@ require (
github.com/golang/protobuf v1.3.4 // indirect
github.com/google/go-cmp v0.3.1 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/googleapis/gnostic v0.3.1 // indirect
github.com/gophercloud/gophercloud v0.1.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
Expand Down
8 changes: 5 additions & 3 deletions pkg/clusters/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (

// GlobalRateLimiter enable remote limiter for proxy
GlobalRateLimiter featuregate.Feature = "GlobalRateLimiter"

// Tracing enable proxy tracing for trace log and trace metric
Tracing featuregate.Feature = "Tracing"
)

var (
Expand All @@ -54,19 +57,18 @@ var (
CloseConnectionWhenIdle: {Default: false, PreRelease: featuregate.Alpha},
DenyAllRequests: {Default: false, PreRelease: featuregate.Alpha},
GlobalRateLimiter: {Default: false, PreRelease: featuregate.Alpha},
Tracing: {Default: false, PreRelease: featuregate.Alpha},
}

defaultKnownFeatures []string
)

func init() {
runtime.Must(DefaultMutableFeatureGate.Add(defaultFeatureGates))
defaultKnownFeatures = DefaultMutableFeatureGate.KnownFeatures()
}

func IsDefault(fg featuregate.FeatureGate) bool {
// output features is already sorted
inputFeatures := fg.KnownFeatures()
defaultKnownFeatures := DefaultMutableFeatureGate.KnownFeatures()
if len(defaultKnownFeatures) != len(inputFeatures) {
return false
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/gateway/endpoints/filters/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
package filters

import (
"net"
"net/http"

gatewaynet "github.com/kubewharf/kubegateway/pkg/gateway/net"
"github.com/kubewharf/kubegateway/pkg/gateway/endpoints/request"
)

func WithDispatcher(handler http.Handler, dispacher http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
hostname := gatewaynet.HostWithoutPort(req.Host)
if ip := net.ParseIP(hostname); ip != nil {
info, ok := request.ExtraRequestInfoFrom(req.Context())
if !ok || !info.IsProxyRequest {
handler.ServeHTTP(w, req)
return
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/gateway/endpoints/filters/requestinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ import (
"fmt"
"net/http"

"k8s.io/apimachinery/pkg/runtime"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"

"github.com/kubewharf/kubegateway/pkg/gateway/endpoints/request"
)

// WithHost attaches a request host to the context.
func WithExtraRequestInfo(handler http.Handler, resolver request.ExtraRequestInfoResolver) http.Handler {
// WithExtraRequestInfo attaches a request host to the context.
func WithExtraRequestInfo(handler http.Handler, resolver request.ExtraRequestInfoResolver, s runtime.NegotiatedSerializer) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
info, err := resolver.NewExtraRequestInfo(req)
Expand All @@ -39,7 +40,7 @@ func WithExtraRequestInfo(handler http.Handler, resolver request.ExtraRequestInf
})
}

// record impersonator because request user will be replaced by impersonatee
// WithImpersonator record impersonator because request user will be replaced by impersonatee
func WithImpersonator(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
Expand Down
109 changes: 109 additions & 0 deletions pkg/gateway/endpoints/filters/tracelog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package filters

import (
"errors"
"fmt"
"io"
"net/http"

utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog"

"github.com/kubewharf/kubegateway/pkg/clusters/features"
"github.com/kubewharf/kubegateway/pkg/gateway/endpoints/request"
"github.com/kubewharf/kubegateway/pkg/gateway/metrics"
"github.com/kubewharf/kubegateway/pkg/util/tracing"
)

// WithTraceLog is a filter that record trace log.
func WithTraceLog(handler http.Handler, enableTracing bool, longRunningRequestCheck apirequest.LongRunningRequestCheck) http.Handler {
if !enableTracing {
return handler
}

klog.V(2).Infof("Enable proxy tracing, ShortRequestLogThreshold=%v, ListRequestLogThreshold=%v", request.ShortRequestLogThreshold, request.ListRequestLogThreshold)

return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()

requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
// if this happens, the handler chain isn't setup correctly because there is no request info
responsewriters.InternalError(w, req, errors.New("no RequestInfo found in the context"))
return
}

// Skip tracing long-running requests.
if longRunningRequestCheck(req, requestInfo) {
handler.ServeHTTP(w, req)
return
}

extraInfo, ok := request.ExtraRequestInfoFrom(ctx)
if !ok {
responsewriters.InternalError(w, req, fmt.Errorf("failed to get extra request info from context"))
return
}

cluster := extraInfo.UpstreamCluster
if cluster == nil || !cluster.FeatureEnabled(features.Tracing) {
handler.ServeHTTP(w, req)
return
}

tr := tracing.New(fmt.Sprintf("Trace for %v %v", req.Method, req.RequestURI))
ctx = tracing.WithRequestTraceInfo(ctx, tr)

req = req.WithContext(ctx)

defer func() {
tr.End()

metrics.RecordProxyTraceLatency(tr.StageLatency(), extraInfo.Hostname, requestInfo)

threshold := request.LogThreshold(requestInfo.Verb)
if req.Header.Get("x-debug-trace-log") == "1" || tr.IfLong(threshold) {
tr.WithAttributes(traceFields(req, requestInfo)...)
tr.Log()
}
}()

rd := &traceReader{
ReadCloser: req.Body,
trace: tr,
}
req.Body = rd

handler.ServeHTTP(w, req)
})
}

func traceFields(req *http.Request, requestInfo *apirequest.RequestInfo) []tracing.KeyValue {
sourceIPs := utilnet.SourceIPs(req)
return []tracing.KeyValue{
tracing.StringKeyValue("verb", requestInfo.Verb),
tracing.StringKeyValue("resource", requestInfo.Resource),
tracing.StringKeyValue("name", requestInfo.Name),
tracing.StringKeyValue("host", req.Host),
tracing.StringKeyValue("user-agent", req.Header.Get("User-Agent")),
tracing.StringKeyValue("srcIP", fmt.Sprintf("%v", sourceIPs)),
}
}

var _ io.ReadCloser = &traceReader{}

type traceReader struct {
io.ReadCloser
trace *tracing.RequestTraceInfo
}

// Write implements io.ReadCloser
func (r *traceReader) Read(p []byte) (int, error) {
n, err := r.ReadCloser.Read(p)
if err == io.EOF {
r.trace.Step(tracing.StepReadRequest)
}
return n, err
}
67 changes: 67 additions & 0 deletions pkg/gateway/endpoints/filters/upstreaminfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2022 ByteDance and its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filters

import (
"fmt"
"github.com/kubewharf/kubegateway/pkg/clusters"
"github.com/kubewharf/kubegateway/pkg/clusters/features"
"github.com/kubewharf/kubegateway/pkg/gateway/endpoints/response"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"net"
"net/http"

"github.com/kubewharf/kubegateway/pkg/gateway/endpoints/request"
)

// WithUpstreamInfo attaches upstream cluster info to ExtraRequestInfo
func WithUpstreamInfo(handler http.Handler, clusterManager clusters.Manager, s runtime.NegotiatedSerializer) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()

info, ok := request.ExtraRequestInfoFrom(ctx)
if !ok {
handler.ServeHTTP(w, req)
return
}

if ip := net.ParseIP(info.Hostname); ip == nil {
info.IsProxyRequest = true
cluster, ok := clusterManager.Get(info.Hostname)
if !ok {
response.TerminateWithError(s,
errors.NewServiceUnavailable(fmt.Sprintf("the request cluster(%s) is not being proxied", info.Hostname)),
response.TerminationReasonClusterNotBeingProxied, w, req)
return
}
info.UpstreamCluster = cluster

if cluster.FeatureEnabled(features.CloseConnectionWhenIdle) {
// Send a GOAWAY and tear down the TCP connection when idle.
w.Header().Set("Connection", "close")
}

if cluster.FeatureEnabled(features.DenyAllRequests) {
response.TerminateWithError(s, errors.NewServiceUnavailable(fmt.Sprintf("request for %v denied by featureGate(DenyAllRequests)", info.Hostname)),
response.TerminationReasonCircuitBreaker, w, req)
return
}
}

req = req.WithContext(request.WithExtraRequestInfo(ctx, info))
handler.ServeHTTP(w, req)
})
}
3 changes: 3 additions & 0 deletions pkg/gateway/endpoints/request/requestinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
authenticationv1 "k8s.io/api/authentication/v1"
"k8s.io/apiserver/pkg/authentication/user"

"github.com/kubewharf/kubegateway/pkg/clusters"
"github.com/kubewharf/kubegateway/pkg/gateway/net"
)

Expand Down Expand Up @@ -56,6 +57,8 @@ type ExtraRequestInfo struct {
Hostname string // hostname without port
IsImpersonateRequest bool
Impersonator user.Info
UpstreamCluster *clusters.ClusterInfo
IsProxyRequest bool
}

// WithExtraRequestInfo returns a copy of parent in which the ExtraRequestInfo value is set
Expand Down
42 changes: 42 additions & 0 deletions pkg/gateway/endpoints/request/requestlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package request

import (
"k8s.io/klog"
"os"
"strconv"
"strings"
"time"
)

var (
ShortRequestLogThreshold = time.Second * 5
ListRequestLogThreshold = time.Second * 30
)

func init() {
if val := os.Getenv("SHORT_REQUEST_LOG_THRESHOLD_SECONDS"); len(val) > 0 {
i, err := strconv.Atoi(val)
if err != nil {
klog.Warningf("Illegal REQUEST_TRACE_LOG_THRESHOLD_SECONDS: %v", val)
} else {
ShortRequestLogThreshold = time.Second * time.Duration(i)
}
}

if val := os.Getenv("LIST_REQUEST_LOG_THRESHOLD_SECONDS"); len(val) > 0 {
i, err := strconv.Atoi(val)
if err != nil {
klog.Warningf("Illegal LIST_REQUEST_TRACE_LOG_THRESHOLD_SECONDS: %v", val)
} else {
ListRequestLogThreshold = time.Second * time.Duration(i)
}
}
}

func LogThreshold(verb string) time.Duration {
threshold := ShortRequestLogThreshold
if strings.Contains(strings.ToLower(verb), "list") {
threshold = ListRequestLogThreshold
}
return threshold
}
Loading

0 comments on commit d051695

Please sign in to comment.