Skip to content

Commit

Permalink
Introduce separate structs for distribution
Browse files Browse the repository at this point in the history
The existing distribution implementation didn't cleanly separate the different kinds of distribution; for example, some fields applied to only one of the variants and not the others.

This splits the distributions out into separate structs to make the logic clearer. The structs have been moved to separate files, which makes it much easier to grok the logic for a specific distribution variant.

Signed-off-by: Scott Fleener <[email protected]>
  • Loading branch information
sfleen committed Nov 7, 2024
1 parent 0ecf5bf commit 0f06d0f
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 158 deletions.
19 changes: 11 additions & 8 deletions linkerd/distribute/src/keys.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
use ahash::{HashMap, HashMapExt};
use rand::{
distributions::{WeightedError, WeightedIndex},
prelude::Distribution as _,
Rng,
};
use std::{collections::HashMap, hash::Hash};
use std::hash::Hash;

/// Uniquely identifies a key/backend pair for a distribution. This allows
/// backends to have the same key and still participate in request distribution.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KeyId {
pub(crate) struct KeyId {
idx: usize,
}

impl KeyId {
pub(crate) fn new(idx: usize) -> Self {
Self { idx }
}
}

#[derive(Debug)]
pub struct ServiceKeys<K> {
ids: Vec<KeyId>,
Expand All @@ -37,6 +32,14 @@ pub(crate) struct WeightedKeySelector<'a, K> {
index: WeightedIndex<u32>,
}

// === impl KeyId ===

impl KeyId {
/// Creates a `KeyId` that stores `idx` as its index.
pub(crate) fn new(idx: usize) -> Self {
Self { idx }
}
}

// === impl UnweightedKeys ===

// PartialEq, Eq, and Hash are all valid to implement for UnweightedKeys since
Expand Down
7 changes: 3 additions & 4 deletions linkerd/distribute/src/params.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use ahash::AHashSet;
use rand::distributions::WeightedError;
use std::{fmt::Debug, hash::Hash, sync::Arc};

use crate::{
keys::{ServiceKeys, WeightedKey},
WeightedServiceKeys,
};
use ahash::AHashSet;
use rand::distributions::WeightedError;
use std::{fmt::Debug, hash::Hash, sync::Arc};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Backends<K>(pub(crate) Arc<AHashSet<K>>)
Expand Down
213 changes: 67 additions & 146 deletions linkerd/distribute/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,26 @@
use self::{first::FirstAvailableSelection, random::RandomAvailableSelection};
use super::Distribution;
use crate::keys::{KeyId, ServiceKeys, WeightedServiceKeys};
use linkerd_stack::{NewService, Service};
use rand::{distributions::WeightedError, rngs::SmallRng, SeedableRng};
use std::{
collections::HashMap,
hash::Hash,
sync::Arc,
task::{Context, Poll},
};

mod first;
mod random;

/// A service that distributes requests over a set of backends.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Distribute<K, S> {
backends: HashMap<KeyId, S>,
selection: Selection<K>,

/// Stores the index of the backend that has been polled to ready. The
/// service at this index will be used on the next invocation of
/// `Service::call`.
ready_idx: Option<KeyId>,
selection: Selection<K, S>,
}

/// Holds per-distribution state for a [`Distribute`] service.
#[derive(Debug)]
enum Selection<K> {
enum Selection<K, S> {
Empty,
FirstAvailable {
keys: Arc<ServiceKeys<K>>,
},
RandomAvailable {
keys: Arc<WeightedServiceKeys<K>>,
rng: SmallRng,
},
FirstAvailable(FirstAvailableSelection<S>),
RandomAvailable(RandomAvailableSelection<K, S>),
}

// === impl Distribute ===
Expand All @@ -41,30 +30,25 @@ impl<K: Hash + Eq, S> Distribute<K, S> {
where
N: for<'a> NewService<&'a K, Service = S>,
{
let backends = Self::make_backends(&dist, make_svc);
Self {
backends,
selection: dist.into(),
ready_idx: None,
selection: Self::make_selection(&dist, make_svc),
}
}

fn make_backends<N>(dist: &Distribution<K>, make_svc: N) -> HashMap<KeyId, S>
fn make_selection<N>(dist: &Distribution<K>, make_svc: N) -> Selection<K, S>
where
N: for<'a> NewService<&'a K, Service = S>,
{
// Build the backends needed for this distribution, in the required
// order (so that weighted indices align).
match dist {
Distribution::Empty => HashMap::new(),
Distribution::FirstAvailable(keys) => keys
.iter()
.map(|&id| (id, make_svc.new_service(keys.get(id))))
.collect(),
Distribution::RandomAvailable(keys) => keys
.iter()
.map(|&id| (id, make_svc.new_service(&keys.get(id).key)))
.collect(),
Distribution::Empty => Selection::Empty,
Distribution::FirstAvailable(keys) => {
Selection::FirstAvailable(FirstAvailableSelection::new(keys, make_svc))
}
Distribution::RandomAvailable(keys) => {
Selection::RandomAvailable(RandomAvailableSelection::new(keys, make_svc))
}
}
}
}
Expand All @@ -84,132 +68,49 @@ where
/// readiness. We expect that these inner services should be buffered or
/// otherwise drive themselves to readiness (i.e. via SpawnReady).
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// If we've already chosen a ready index, then skip polling.
if self.ready_idx.is_some() {
return Poll::Ready(Ok(()));
}

match self.selection {
match &mut self.selection {
Selection::Empty => {
tracing::debug!("empty distribution will never become ready");
Poll::Pending
}

Selection::FirstAvailable { ref keys } => {
for id in keys.iter() {
let svc = self
.backends
.get_mut(id)
.expect("distributions must not reference unknown backends");
if svc.poll_ready(cx)?.is_ready() {
self.ready_idx = Some(*id);
return Poll::Ready(Ok(()));
}
}
}

// Choose a random index (via the weighted distribution) to try to
// poll the backend. Continue selecting endpoints until we find one
// that is ready or we've tried all backends in the distribution.
Selection::RandomAvailable {
ref keys,
ref mut rng,
} => {
let mut selector = keys.selector();
loop {
let id = selector.select_weighted(rng);
let svc = self
.backends
.get_mut(&id)
.expect("distributions must not reference unknown backends");

if svc.poll_ready(cx)?.is_ready() {
self.ready_idx = Some(id);
return Poll::Ready(Ok(()));
}

// Since the backend we just tried isn't ready, zero out the weight
// so that it's not tried again in this round, i.e. subsequent calls
// to `poll_ready` can try this backend again.
match selector.disable_backend(id) {
Ok(()) => {}
Err(WeightedError::AllWeightsZero) => {
// There are no backends remaining.
break;
}
Err(error) => {
tracing::error!(%error, "unexpected error updating weights; giving up");
break;
}
}
}
}
Selection::FirstAvailable(s) => s.poll_ready(cx),
Selection::RandomAvailable(s) => s.poll_ready(cx),
}

debug_assert!(self.ready_idx.is_none());
tracing::trace!("no ready services in distribution");
Poll::Pending
}

fn call(&mut self, req: Req) -> Self::Future {
let id = self
.ready_idx
.take()
.expect("poll_ready must be called first");

let svc = self.backends.get_mut(&id).expect("index must exist");

svc.call(req)
}
}

impl<K: Clone, S: Clone> Clone for Distribute<K, S> {
fn clone(&self) -> Self {
Self {
backends: self.backends.clone(),
selection: self.selection.clone(),
// Clear the ready index so that the new clone must become ready
// independently.
ready_idx: None,
match &mut self.selection {
Selection::Empty => unreachable!("Empty selection is never ready"),
Selection::FirstAvailable(s) => s.call(req),
Selection::RandomAvailable(s) => s.call(req),
}
}
}

impl<K, S> Default for Distribute<K, S> {
// === impl Selection ===

impl<K, S> Default for Selection<K, S> {
/// Returns an empty distribution. This distribution will never become
/// ready.
fn default() -> Self {
Self {
backends: Default::default(),
selection: Selection::Empty,
ready_idx: None,
}
Self::Empty
}
}

// === impl Selection ===

impl<K> From<Distribution<K>> for Selection<K> {
fn from(dist: Distribution<K>) -> Self {
match dist {
Distribution::Empty => Self::Empty,
Distribution::FirstAvailable(keys) => Self::FirstAvailable { keys },
Distribution::RandomAvailable(keys) => Self::RandomAvailable {
keys,
rng: SmallRng::from_rng(rand::thread_rng()).expect("RNG must initialize"),
},
// Cloning a `Selection` preserves the variant and clones the state it holds.
impl<K, S: Clone> Clone for Selection<K, S> {
fn clone(&self) -> Self {
match self {
// `Empty` carries no state, so there is nothing to copy.
Self::Empty => Self::Empty,
// The remaining variants delegate to the inner selection's own `clone`.
Self::FirstAvailable(s) => Self::FirstAvailable(s.clone()),
Self::RandomAvailable(s) => Self::RandomAvailable(s.clone()),
}
}
}

impl<K> Clone for Selection<K> {
fn clone(&self) -> Self {
match self {
Self::Empty => Selection::Empty,
Self::FirstAvailable { keys } => Self::FirstAvailable { keys: keys.clone() },
Self::RandomAvailable { keys, .. } => Self::RandomAvailable {
keys: keys.clone(),
rng: SmallRng::from_rng(rand::thread_rng()).expect("RNG must initialize"),
},
impl<K, S> Default for Distribute<K, S> {
fn default() -> Self {
Self {
selection: Selection::default(),
}
}
}
Expand All @@ -218,6 +119,8 @@ impl<K> Clone for Selection<K> {
mod tests {
use std::cell::RefCell;

use crate::keys::KeyId;

use super::*;
use tokio_test::*;
use tower_test::mock;
Expand Down Expand Up @@ -254,7 +157,7 @@ mod tests {
Default::default(),
|_: &&str| panic!("Empty service should never call make_svc"),
));
assert_eq!(dist_svc.get_ref().backends.len(), 0);
assert!(matches!(dist_svc.get_ref().selection, Selection::Empty));
assert_pending!(dist_svc.poll_ready());
}

Expand Down Expand Up @@ -286,7 +189,10 @@ mod tests {
scully_ctl.allow(1);
mulder_ctl.allow(1);
assert_ready_ok!(dist_svc.poll_ready());
assert_eq!(dist_svc.get_ref().ready_idx, Some(KeyId::new(0)));
let Selection::FirstAvailable(selection) = &dist_svc.get_ref().selection else {
panic!()
};
assert_eq!(selection.get_ready_idx(), Some(0));
let mut call = task::spawn(dist_svc.call(()));
match assert_ready!(mulder_ctl.poll_request()) {
Some(((), rsp)) => rsp.send_response(()),
Expand All @@ -307,7 +213,10 @@ mod tests {
mulder_ctl.allow(0);
scully_ctl.allow(1);
assert_ready_ok!(dist_svc.poll_ready());
assert_eq!(dist_svc.get_ref().ready_idx, Some(KeyId::new(1)));
let Selection::FirstAvailable(selection) = &dist_svc.get_ref().selection else {
panic!()
};
assert_eq!(selection.get_ready_idx(), Some(1));
let mut call = task::spawn(dist_svc.call(()));
match assert_ready!(scully_ctl.poll_request()) {
Some(((), rsp)) => rsp.send_response(()),
Expand All @@ -328,7 +237,10 @@ mod tests {
mulder_2_ctl.allow(1);
mulder_1_ctl.allow(1);
assert_ready_ok!(dist_svc.poll_ready());
assert_eq!(dist_svc.get_ref().ready_idx, Some(KeyId::new(0)));
let Selection::FirstAvailable(selection) = &dist_svc.get_ref().selection else {
panic!()
};
assert_eq!(selection.get_ready_idx(), Some(0));
let mut call = task::spawn(dist_svc.call(()));
match assert_ready!(mulder_1_ctl.poll_request()) {
Some(((), rsp)) => rsp.send_response(()),
Expand Down Expand Up @@ -371,7 +283,10 @@ mod tests {
scully_ctl.allow(1);
skinner_ctl.allow(1);
assert_ready_ok!(dist_svc.poll_ready());
assert_eq!(dist_svc.get_ref().ready_idx, Some(KeyId::new(1)));
let Selection::RandomAvailable(selection) = &dist_svc.get_ref().selection else {
panic!()
};
assert_eq!(selection.get_ready_idx(), Some(KeyId::new(1)));
let mut call = task::spawn(dist_svc.call(()));
match assert_ready!(scully_ctl.poll_request()) {
Some(((), rsp)) => rsp.send_response(()),
Expand All @@ -395,7 +310,10 @@ mod tests {
scully_ctl.allow(0);
skinner_ctl.allow(0);
assert_ready_ok!(dist_svc.poll_ready());
assert_eq!(dist_svc.get_ref().ready_idx, Some(KeyId::new(0)));
let Selection::RandomAvailable(selection) = &dist_svc.get_ref().selection else {
panic!()
};
assert_eq!(selection.get_ready_idx(), Some(KeyId::new(0)));
let mut call = task::spawn(dist_svc.call(()));
match assert_ready!(mulder_ctl.poll_request()) {
Some(((), rsp)) => rsp.send_response(()),
Expand All @@ -419,7 +337,10 @@ mod tests {
mulder_2_ctl.allow(1);
mulder_3_ctl.allow(1);
assert_ready_ok!(dist_svc.poll_ready());
assert_eq!(dist_svc.get_ref().ready_idx, Some(KeyId::new(1)));
let Selection::RandomAvailable(selection) = &dist_svc.get_ref().selection else {
panic!()
};
assert_eq!(selection.get_ready_idx(), Some(KeyId::new(1)));
let mut call = task::spawn(dist_svc.call(()));
match assert_ready!(mulder_2_ctl.poll_request()) {
Some(((), rsp)) => rsp.send_response(()),
Expand Down
Loading

0 comments on commit 0f06d0f

Please sign in to comment.