Skip to content

Commit

Permalink
Refactor BackendManager / BackendStorage.
Browse files Browse the repository at this point in the history
This also fixes kubernetes-sigs#294
  • Loading branch information
jkh52 committed Apr 12, 2024
1 parent a650c3b commit 1a61ebb
Show file tree
Hide file tree
Showing 10 changed files with 985 additions and 842 deletions.
358 changes: 75 additions & 283 deletions pkg/server/backend_manager.go

Large diffs are not rendered by default.

655 changes: 312 additions & 343 deletions pkg/server/backend_manager_test.go

Large diffs are not rendered by default.

59 changes: 0 additions & 59 deletions pkg/server/default_route_backend_manager.go

This file was deleted.

86 changes: 0 additions & 86 deletions pkg/server/desthost_backend_manager.go

This file was deleted.

9 changes: 1 addition & 8 deletions pkg/server/readiness_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,4 @@ type ReadinessManager interface {
Ready() (bool, string)
}

var _ ReadinessManager = &DefaultBackendStorage{}

func (s *DefaultBackendStorage) Ready() (bool, string) {
if s.NumBackends() == 0 {
return false, "no connection to any proxy agent"
}
return true, ""
}
var _ ReadinessManager = &DefaultBackendManager{}
61 changes: 13 additions & 48 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ func (pm *PendingDialManager) removeForStream(streamUID string) []*ProxyClientCo

// ProxyServer
type ProxyServer struct {
// BackendManagers contains a list of BackendManagers
BackendManagers []BackendManager
BackendManager BackendManager

// Readiness reports if the proxy server is ready, i.e., if the proxy
// server has connections to proxy agents (backends). Note that the
Expand All @@ -215,9 +214,6 @@ type ProxyServer struct {

// agent authentication
AgentAuthenticationOptions *AgentTokenAuthenticationOptions

// TODO: move strategies into BackendStorage
proxyStrategies []ProxyStrategy
}

// AgentTokenAuthenticationOptions contains list of parameters required for agent token based authentication
Expand All @@ -233,45 +229,28 @@ var _ agent.AgentServiceServer = &ProxyServer{}

var _ client.ProxyServiceServer = &ProxyServer{}

func genContext(proxyStrategies []ProxyStrategy, reqHost string) context.Context {
func genContext(reqHost string) context.Context {
ctx := context.Background()
for _, ps := range proxyStrategies {
switch ps {
case ProxyStrategyDestHost:
addr := util.RemovePortFromHost(reqHost)
ctx = context.WithValue(ctx, destHostKey, addr)
}
}
addr := util.RemovePortFromHost(reqHost)
ctx = context.WithValue(ctx, destHostKey, addr)
return ctx
}

func (s *ProxyServer) getBackend(reqHost string) (Backend, error) {
ctx := genContext(s.proxyStrategies, reqHost)
for _, bm := range s.BackendManagers {
be, err := bm.Backend(ctx)
if err == nil {
return be, nil
}
if ignoreNotFound(err) != nil {
// if can't find a backend through current BackendManager, move on
// to the next one
return nil, err
}
ctx := genContext(reqHost)
be, err := s.BackendManager.Backend(ctx)
if err == nil {
return be, nil
}
return nil, &ErrNotFound{}
}

func (s *ProxyServer) addBackend(backend Backend) {
// TODO: refactor BackendStorage to acquire lock once, not up to 3 times.
for _, bm := range s.BackendManagers {
bm.AddBackend(backend)
}
s.BackendManager.AddBackend(backend)
}

func (s *ProxyServer) removeBackend(backend Backend) {
for _, bm := range s.BackendManagers {
bm.RemoveBackend(backend)
}
s.BackendManager.RemoveBackend(backend)
}

func (s *ProxyServer) addEstablished(agentID string, connID int64, p *ProxyClientConnection) {
Expand Down Expand Up @@ -377,30 +356,16 @@ func (s *ProxyServer) removeEstablishedForStream(streamUID string) []*ProxyClien

// NewProxyServer creates a new ProxyServer instance
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions) *ProxyServer {
var bms []BackendManager
for _, ps := range proxyStrategies {
switch ps {
case ProxyStrategyDestHost:
bms = append(bms, NewDestHostBackendManager())
case ProxyStrategyDefault:
bms = append(bms, NewDefaultBackendManager())
case ProxyStrategyDefaultRoute:
bms = append(bms, NewDefaultRouteBackendManager())
default:
klog.ErrorS(nil, "Unknown proxy strategy", "strategy", ps)
}
}
bm := NewDefaultBackendManager(proxyStrategies)

return &ProxyServer{
established: make(map[string](map[int64]*ProxyClientConnection)),
PendingDial: NewPendingDialManager(),
serverID: serverID,
serverCount: serverCount,
BackendManagers: bms,
BackendManager: bm,
AgentAuthenticationOptions: agentAuthenticationOptions,
// use the first backend-manager as the Readiness Manager
Readiness: bms[0],
proxyStrategies: proxyStrategies,
Readiness: bm,
}
}

Expand Down
Loading

0 comments on commit 1a61ebb

Please sign in to comment.