From 2597672df5df387818396105e471fa6128dd008e Mon Sep 17 00:00:00 2001 From: Sylvain Bellemare Date: Tue, 3 Jul 2018 11:17:52 +0100 Subject: [PATCH] Add CONF message phase to BA Addresses #59 --- honeybadgerbft/core/binaryagreement.py | 65 ++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 5 deletions(-) diff --git a/honeybadgerbft/core/binaryagreement.py b/honeybadgerbft/core/binaryagreement.py index 142a0d0..f38370e 100644 --- a/honeybadgerbft/core/binaryagreement.py +++ b/honeybadgerbft/core/binaryagreement.py @@ -2,12 +2,15 @@ from gevent.event import Event from collections import defaultdict +from distutils.util import strtobool +from os import environ import logging from honeybadgerbft.exceptions import RedundantMessageError, AbandonedNodeError logger = logging.getLogger(__name__) +CONF_PHASE = strtobool(environ.get('CONF_PHASE', '1')) def binaryagreement(sid, pid, N, f, coin, input, decide, broadcast, receive): @@ -28,7 +31,9 @@ def binaryagreement(sid, pid, N, f, coin, input, decide, broadcast, receive): # Messages received are routed to either a shared coin, the broadcast, or AUX est_values = defaultdict(lambda: [set(), set()]) aux_values = defaultdict(lambda: [set(), set()]) + conf_values = defaultdict(lambda: {(0,): set(), (1,): set(), (0, 1): set()}) est_sent = defaultdict(lambda: [False, False]) + conf_sent = defaultdict(lambda: {(0,): False, (1,): False, (0, 1): False}) bin_values = defaultdict(set) # This event is triggered whenever bin_values or aux_values changes @@ -100,6 +105,27 @@ def _recv(): bv_signal.set() + elif msg[0] == 'CONF' and CONF_PHASE: + _, r, v = msg + assert v in ((0,), (1,), (0, 1)) + if sender in conf_values[r][v]: + logger.warn(f'Redundant CONF received {msg} by {sender}', + extra={'nodeid': pid, 'epoch': r}) + # FIXME: Raise for now to simplify things & be consistent + # with how other TAGs are handled. Will replace the raise + # with a continue statement as part of + # https://github.com/initc3/HoneyBadgerBFT-Python/issues/10 + raise RedundantMessageError( + 'Redundant CONF received {}'.format(msg)) + + conf_values[r][v].add(sender) + logger.debug( + f'add v = {v} to conf_value[{r}] = {conf_values[r]}', + extra={'nodeid': pid, 'epoch': r}, + ) + + bv_signal.set() + # Translate mmr14 broadcast into coin.broadcast # _coin_broadcast = lambda (r, sig): broadcast(('COIN', r, sig)) # _coin_recv = Queue() @@ -159,6 +185,39 @@ def _recv(): logger.debug(f'Completed AUX phase with values = {values}', extra={'nodeid': pid, 'epoch': r}) + + # XXX CONF phase + logger.debug( + f'block until at least N-f ({N-f}) CONF values are received', + extra={'nodeid': pid, 'epoch': r}) + if CONF_PHASE and not conf_sent[r][tuple(values)]: + conf_sent[r][tuple(values)] = True + logger.debug(f"broadcast {('CONF', r, tuple(values))}", + extra={'nodeid': pid, 'epoch': r}) + broadcast(('CONF', r, tuple(bin_values[r]))) + while True: + logger.debug( + f'looping ... conf_values[r] is: {conf_values[r]}', + extra={'nodeid': pid, 'epoch': r}, + ) + if 1 in bin_values[r] and len(conf_values[r][(1,)]) >= N - f: + values = set((1,)) + break + if 0 in bin_values[r] and len(conf_values[r][(0,)]) >= N - f: + values = set((0,)) + break + if (sum(len(senders) for conf_value, senders in + conf_values[r].items() if senders and + set(conf_value).issubset(bin_values[r])) >= N - f): + values = set((0, 1)) + break + + bv_signal.clear() + bv_signal.wait() + + logger.debug(f'Completed CONF phase with values = {values}', + extra={'nodeid': pid, 'epoch': r}) + logger.debug( f'Block until receiving the common coin value', extra={'nodeid': pid, 'epoch': r}, @@ -174,8 +233,6 @@ def _recv(): s=s, already_decided=already_decided, decide=decide, - nodeid=pid, - epoch=r, ) except AbandonedNodeError: # print('[sid:%s] [pid:%d] QUITTING in round %d' % (sid,pid,r))) @@ -187,13 +244,11 @@ def _recv(): r += 1 -def set_new_estimate(*, values, s, already_decided, decide, nodeid, epoch): +def set_new_estimate(*, values, s, already_decided, decide): if len(values) == 1: v = next(iter(values)) if v == s: if already_decided is None: - logger.debug(f'DECIDE on value {v}', - extra={'nodeid': pid, 'epoch': epoch}) already_decided = v decide(v) elif already_decided == v: