Skip to content

Commit

Permalink
defined the output from the Common Subset algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
vkomenda committed May 7, 2018
1 parent 8b7f7b4 commit 15353e8
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 44 deletions.
8 changes: 7 additions & 1 deletion src/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
let mut outgoing = VecDeque::new();

match *message {
_ if self.terminated => {
// The algorithm instance has already terminated.
Err(Error::Terminated)
}

AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => {
update_map_of_sets(&mut self.received_bval, uid, b);
let count_bval = self.received_bval.iter().fold(0, |count, (_, values)| {
Expand Down Expand Up @@ -113,7 +118,7 @@ impl<NodeUid: Clone + Eq + Hash> Agreement<NodeUid> {
}
}

AgreementMessage::Aux((_epoch, b)) => {
AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => {
update_map_of_sets(&mut self.received_aux, uid, b);
if !self.bin_values.is_empty() {
let coin_result = self.try_coin();
Expand Down Expand Up @@ -231,5 +236,6 @@ where

#[derive(Clone, Debug)]
pub enum Error {
Terminated,
NotImplemented,
}
88 changes: 45 additions & 43 deletions src/common_subset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use proto::{AgreementMessage, BroadcastMessage};

// TODO: Make this a generic argument of `Broadcast`.
type ProposedValue = Vec<u8>;
// Type of output from the Common Subset message handler.
type CommonSubsetOutput<NodeUid> = (Option<HashSet<ProposedValue>>, VecDeque<Output<NodeUid>>);

/// Input from a remote node to Common Subset.
pub enum Input<NodeUid> {
Expand All @@ -27,9 +29,6 @@ pub enum Input<NodeUid> {
}

/// Output from Common Subset to remote nodes.
///
/// FIXME: We can do an interface that doesn't need this type and instead works
/// directly with the `TargetBroadcastMessage` and `AgreementMessage`.
pub enum Output<NodeUid> {
/// A broadcast message to be sent to the destination set in the
/// `TargetedBroadcastMessage`.
Expand All @@ -46,6 +45,8 @@ pub struct CommonSubset<NodeUid: Eq + Hash> {
broadcast_instances: HashMap<NodeUid, Broadcast<NodeUid>>,
agreement_instances: HashMap<NodeUid, Agreement<NodeUid>>,
broadcast_results: HashMap<NodeUid, ProposedValue>,
/// FIXME: The result may be a set of bool rather than a single bool due to
/// the ability of Agreement to output multiple values.
agreement_results: HashMap<NodeUid, bool>,
}

Expand Down Expand Up @@ -100,10 +101,7 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {

/// Upon delivery of v_j from RBC_j, if input has not yet been provided to
/// BA_j, then provide input 1 to BA_j. See Figure 11.
pub fn on_broadcast_result(
&mut self,
uid: &NodeUid,
) -> Result<Option<AgreementMessage>, Error> {
fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result<Option<AgreementMessage>, Error> {
if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) {
if !agreement_instance.has_input() {
Ok(Some(agreement_instance.set_input(true)))
Expand All @@ -115,45 +113,55 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
}
}

/// Receive input from a remote node.
/// Receive input from a remote node. The output contains an optional result
/// of the Common Subset algorithm - a set of proposed values - and a queue
/// of messages to be sent to remote nodes, or an error.
pub fn on_input(
&mut self,
message: Input<NodeUid>,
) -> Result<VecDeque<Output<NodeUid>>, Error> {
) -> Result<CommonSubsetOutput<NodeUid>, Error> {
match message {
Input::Broadcast(uid, bmessage) => {
let mut instance_result = None;
let input_result = {
let input_result: Result<VecDeque<Output<NodeUid>>, Error> = {
if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) {
broadcast_instance
.handle_broadcast_message(&uid, bmessage)
.map(|(value, queue)| {
instance_result = value;
.map(|(opt_value, queue)| {
instance_result = opt_value;
queue.into_iter().map(Output::Broadcast).collect()
})
.map_err(Error::from)
} else {
Err(Error::NoSuchBroadcastInstance)
}
};
if instance_result.is_some() {
self.on_broadcast_result(&uid)?;
let mut opt_message: Option<AgreementMessage> = None;
if let Some(value) = instance_result {
self.broadcast_results.insert(uid.clone(), value);
opt_message = self.on_broadcast_result(&uid)?;
}
input_result
input_result.map(|mut queue| {
if let Some(agreement_message) = opt_message {
// Append the message to agreement nodes to the common output queue.
queue.push_back(Output::Agreement(agreement_message))
}
(None, queue)
})
}

Input::Agreement(uid, amessage) => {
// The result defaults to error.
let mut result = Err(Error::NoSuchAgreementInstance);

// FIXME: send the message to the Agreement instance and
if let Some(mut agreement_instance) = self.agreement_instances.get_mut(&uid) {
// Optional output of agreement and outgoing agreement
// messages to remote nodes.
result = if agreement_instance.terminated() {
// This instance has terminated and does not accept input.
Ok((None, VecDeque::new()))
} else {
// Send the message to the agreement instance.
agreement_instance
.on_input(uid.clone(), &amessage)
.map_err(Error::from)
Expand All @@ -164,37 +172,36 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
if let Some(b) = output {
outgoing.append(&mut self.on_agreement_result(uid, b));
}
Ok(outgoing.into_iter().map(Output::Agreement).collect())
Ok((
self.try_agreement_completion(),
outgoing.into_iter().map(Output::Agreement).collect(),
))
} else {
// error
result
.map(|(_, messages)| messages.into_iter().map(Output::Agreement).collect())
result.map(|(_, messages)| {
(None, messages.into_iter().map(Output::Agreement).collect())
})
}
}
}
}

/// Callback to be invoked on receipt of a returned value of the Agreement
/// instance `uid`.
///
/// FIXME: It is likely that only one `AgreementMessage` is required because
/// Figure 11 does not count the number of messages but the number of nodes
/// that sent messages.
fn on_agreement_result(&mut self, uid: NodeUid, result: bool) -> VecDeque<AgreementMessage> {
let mut outgoing = VecDeque::new();
// Upon delivery of value 1 from at least N − f instances of BA, provide
// input 0 to each instance of BA that has not yet been provided input.
if result {
self.agreement_results.insert(uid, result);
let results1: Vec<bool> = self.agreement_results
.iter()
.map(|(_, v)| *v)
.filter(|b| *b)
.collect();
// The number of instances of BA that output 1.
let results1: usize =
self.agreement_results
.iter()
.fold(0, |count, (_, v)| if *v { count + 1 } else { count });

if results1.len() >= self.num_nodes - self.num_faulty_nodes {
let instances = &mut self.agreement_instances;
for (_uid0, instance) in instances.iter_mut() {
if results1 >= self.num_nodes - self.num_faulty_nodes {
for instance in self.agreement_instances.values_mut() {
if !instance.has_input() {
outgoing.push_back(instance.set_input(false));
}
Expand All @@ -204,24 +211,19 @@ impl<NodeUid: Clone + Debug + Display + Eq + Hash + Ord> CommonSubset<NodeUid> {
outgoing
}

pub fn on_agreement_completion(&self) -> Option<HashSet<ProposedValue>> {
fn try_agreement_completion(&self) -> Option<HashSet<ProposedValue>> {
// Once all instances of BA have completed, let C ⊂ [1..N] be
// the indexes of each BA that delivered 1. Wait for the output
// v_j for each RBC_j such that j∈C. Finally output ∪ j∈C v_j.
let instance_uids: HashSet<NodeUid> = self.agreement_instances
.iter()
.map(|(k, _)| k.clone())
.collect();
let completed_uids: HashSet<NodeUid> = self.agreement_results
if self.agreement_instances
.iter()
.map(|(k, _)| k.clone())
.collect();
if instance_uids == completed_uids {
// All instances of Agreement that delivered `true`.
let delivered_1: HashSet<NodeUid> = self.agreement_results
.all(|(_, instance)| instance.terminated())
{
// All instances of Agreement that delivered `true` (or "1" in the paper).
let delivered_1: HashSet<&NodeUid> = self.agreement_results
.iter()
.filter(|(_, v)| **v)
.map(|(k, _)| k.clone())
.map(|(k, _)| k)
.collect();
// Results of Broadcast instances in `delivered_1`
let broadcast_results: HashSet<ProposedValue> = self.broadcast_results
Expand Down

0 comments on commit 15353e8

Please sign in to comment.