Skip to content

Commit

Permalink
Add round-robin upstream policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Barenboim committed May 25, 2022
1 parent 5b436d3 commit 290c9d7
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 26 deletions.
28 changes: 22 additions & 6 deletions src/manager/UpstreamManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ class __UpstreamManager
std::vector<UPSGroupPolicy *> upstream_policies;
};

int UpstreamManager::upstream_create_round_robin(const std::string& name,
bool try_another)
{
WFNameService *ns = WFGlobal::get_name_service();
auto *policy = new UPSRoundRobinPolicy(try_another);

if (ns->add_policy(name.c_str(), policy) >= 0)
{
__UpstreamManager::get_instance()->add_upstream_policy(policy);
return 0;
}

delete policy;
return -1;
}

static unsigned int __default_consistent_hash(const char *path,
const char *query,
const char *fragment)
Expand All @@ -70,7 +86,7 @@ static unsigned int __default_consistent_hash(const char *path,
int UpstreamManager::upstream_create_consistent_hash(const std::string& name,
upstream_route_t consistent_hash)
{
auto *ns = WFGlobal::get_name_service();
WFNameService *ns = WFGlobal::get_name_service();
UPSConsistentHashPolicy *policy;

policy = new UPSConsistentHashPolicy(
Expand All @@ -89,8 +105,8 @@ int UpstreamManager::upstream_create_consistent_hash(const std::string& name,
int UpstreamManager::upstream_create_weighted_random(const std::string& name,
bool try_another)
{
auto *ns = WFGlobal::get_name_service();
UPSWeightedRandomPolicy *policy = new UPSWeightedRandomPolicy(try_another);
WFNameService *ns = WFGlobal::get_name_service();
auto *policy = new UPSWeightedRandomPolicy(try_another);

if (ns->add_policy(name.c_str(), policy) >= 0)
{
Expand All @@ -104,8 +120,8 @@ int UpstreamManager::upstream_create_weighted_random(const std::string& name,

int UpstreamManager::upstream_create_vnswrr(const std::string& name)
{
auto *ns = WFGlobal::get_name_service();
UPSWeightedRandomPolicy *policy = new UPSVNSWRRPolicy();
WFNameService *ns = WFGlobal::get_name_service();
auto *policy = new UPSVNSWRRPolicy();

if (ns->add_policy(name.c_str(), policy) >= 0)
{
Expand All @@ -122,7 +138,7 @@ int UpstreamManager::upstream_create_manual(const std::string& name,
bool try_another,
upstream_route_t consistent_hash)
{
auto *ns = WFGlobal::get_name_service();
WFNameService *ns = WFGlobal::get_name_service();
UPSManualPolicy *policy;

policy = new UPSManualPolicy(try_another, std::move(select),
Expand Down
15 changes: 15 additions & 0 deletions src/manager/UpstreamManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@
class UpstreamManager
{
public:
/**
* @brief MODE 0: round-robin select
* @param[in] name upstream name
* @param[in] try_another when first choice is failed, try another one or not
* @return success/fail
* @retval 0 success
* @retval -1 fail, more info see errno
* @note
* when first choose server is already down:
* - if try_another==false, request will be failed
* - if try_another==true, upstream will choose the next
*/
static int upstream_create_round_robin(const std::string& name,
bool try_another);

/**
* @brief MODE 1: consistent-hashing select
* @param[in] name upstream name
Expand Down
24 changes: 14 additions & 10 deletions src/nameservice/UpstreamPolicies.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ bool UPSGroupPolicy::select(const ParsedURI& uri, WFNSTracing *tracing,

this->check_breaker();

// select_addr == NULL will only happened in consistent_hash
// select_addr == NULL will happen in consistent_hash
EndpointAddress *select_addr = this->first_strategy(uri, tracing);

if (!select_addr || select_addr->fail_count >= select_addr->params->max_fails)
Expand Down Expand Up @@ -331,8 +331,6 @@ void UPSGroupPolicy::add_server_locked(EndpointAddress *addr)
group->backups.push_back(addr);
pthread_mutex_unlock(&group->mutex);
this->server_list_change(addr, ADD_SERVER);

return;
}

int UPSGroupPolicy::remove_server_locked(const std::string& address)
Expand Down Expand Up @@ -448,15 +446,26 @@ void UPSGroupPolicy::hash_map_remove_addr(const std::string& address)
}
}

EndpointAddress *UPSRoundRobinPolicy::first_strategy(const ParsedURI& uri,
WFNSTracing *tracing)
{
return this->servers[this->cur_idx++ % this->servers.size()];
}

EndpointAddress *UPSRoundRobinPolicy::another_strategy(const ParsedURI& uri,
WFNSTracing *tracing)
{
EndpointAddress *addr = this->servers[this->cur_idx++ % this->servers.size()];
return this->check_and_get(addr, false, tracing);
}

void UPSWeightedRandomPolicy::add_server_locked(EndpointAddress *addr)
{
UPSAddrParams *params = static_cast<UPSAddrParams *>(addr->params);

UPSGroupPolicy::add_server_locked(addr);
if (params->server_type == 0)
this->total_weight += params->weight;

return;
}

int UPSWeightedRandomPolicy::remove_server_locked(const std::string& address)
Expand Down Expand Up @@ -638,7 +647,6 @@ void UPSVNSWRRPolicy::add_server_locked(EndpointAddress *addr)
{
UPSWeightedRandomPolicy::add_server_locked(addr);
init();
return;
}

int UPSVNSWRRPolicy::remove_server_locked(const std::string& address)
Expand All @@ -662,8 +670,6 @@ void UPSConsistentHashPolicy::add_server_locked(EndpointAddress *addr)
{
UPSGroupPolicy::add_server_locked(addr);
this->hash_map_add_addr(addr);

return;
}

int UPSConsistentHashPolicy::remove_server_locked(const std::string& address)
Expand Down Expand Up @@ -702,8 +708,6 @@ void UPSManualPolicy::add_server_locked(EndpointAddress *addr)

if (this->try_another)
this->hash_map_add_addr(addr);

return;
}

int UPSManualPolicy::remove_server_locked(const std::string& address)
Expand Down
46 changes: 36 additions & 10 deletions src/nameservice/UpstreamPolicies.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
#include <utility>
#include <map>
#include <vector>
#include <atomic>
#include <functional>
#include "URIParser.h"
#include "EndpointParams.h"
#include "WFNameService.h"
#include "WFServiceGovernance.h"

using upstream_route_t = std::function<unsigned int (const char *, const char *, const char *)>;
using upstream_route_t = std::function<unsigned int (const char *path,
const char *query,
const char *fragment)>;

class EndpointGroup;
class UPSGroupPolicy;
Expand All @@ -50,8 +53,9 @@ class UPSGroupPolicy : public WFServiceGovernance
{
public:
UPSGroupPolicy();
~UPSGroupPolicy();
virtual ~UPSGroupPolicy();

public:
virtual bool select(const ParsedURI& uri, WFNSTracing *tracing,
EndpointAddress **addr);
virtual void add_server(const std::string& address,
Expand All @@ -72,8 +76,8 @@ class UPSGroupPolicy : public WFServiceGovernance
virtual void add_server_locked(EndpointAddress *addr);
virtual int remove_server_locked(const std::string& address);

EndpointAddress *check_and_get(EndpointAddress *addr,
bool addr_failed, WFNSTracing *tracing);
EndpointAddress *check_and_get(EndpointAddress *addr, bool addr_failed,
WFNSTracing *tracing);

bool is_alive(const EndpointAddress *addr) const;

Expand All @@ -86,6 +90,24 @@ class UPSGroupPolicy : public WFServiceGovernance
std::map<unsigned int, EndpointAddress *> addr_hash;
};

class UPSRoundRobinPolicy : public UPSGroupPolicy
{
public:
UPSRoundRobinPolicy(bool try_another)
{
this->try_another = try_another;
}

protected:
virtual EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
virtual EndpointAddress *another_strategy(const ParsedURI& uri,
WFNSTracing *tracing);

protected:
std::atomic<size_t> cur_idx;
};

class UPSWeightedRandomPolicy : public UPSGroupPolicy
{
public:
Expand All @@ -95,10 +117,12 @@ class UPSWeightedRandomPolicy : public UPSGroupPolicy
this->available_weight = 0;
this->try_another = try_another;
}
EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
EndpointAddress *another_strategy(const ParsedURI& uri,
WFNSTracing *tracing);

protected:
virtual EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
virtual EndpointAddress *another_strategy(const ParsedURI& uri,
WFNSTracing *tracing);

protected:
virtual void add_server_locked(EndpointAddress *addr);
Expand All @@ -120,8 +144,10 @@ class UPSVNSWRRPolicy : public UPSWeightedRandomPolicy
this->cur_idx = 0;
this->try_another = false;
};
EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);

protected:
virtual EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);

private:
virtual void add_server_locked(EndpointAddress *addr);
Expand Down

0 comments on commit 290c9d7

Please sign in to comment.