From 3174151ad85481bdf105c7d20aa4a18244ca9e5d Mon Sep 17 00:00:00 2001 From: Sasha Pourcelot Date: Wed, 3 Feb 2021 18:25:08 +0100 Subject: [PATCH 1/9] Add initial implementation for MessageHandler This commit adds very basic implementation of MessageHandler. This structure stores a SignedMessage, and calls, depending on which kind of message it is, and on its underlying type, either a specified closure, or a fallback one. The goal is to provide an API that would be nicer to work with than the msg! macro. Current implementation features a state-machine like algorithm and currently only handles messages that can responded to (aka "questions"). --- src/bastion/src/message.rs | 73 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/src/bastion/src/message.rs b/src/bastion/src/message.rs index cba09d79..6d0f800f 100644 --- a/src/bastion/src/message.rs +++ b/src/bastion/src/message.rs @@ -865,3 +865,76 @@ macro_rules! answer { sender.send($answer, sign) }}; } + +#[derive(Debug)] +pub struct MessageHandler { + msg: Option, +} + +impl MessageHandler { + pub fn new(msg: SignedMessage) -> MessageHandler { + let msg = Some(msg); + MessageHandler { msg } + } + + pub fn on_question(self, f: F) -> MessageHandler + where + T: 'static, + F: FnOnce(T, AnswerSender), + { + match self.try_into_question::() { + Ok((arg, sender)) => { + f(arg, sender); + MessageHandler::empty() + } + Err(this) => this, + } + } + + pub fn on_fallback(mut self, f: impl FnOnce(&dyn Any)) { + if let Some(msg) = self.fallback_arg() { + f(msg); + self.msg.take(); + } + } + + fn empty() -> MessageHandler { + MessageHandler { msg: None } + } + + fn try_into_question(mut self) -> Result<(T, AnswerSender), MessageHandler> { + match self.msg.take() { + Some(SignedMessage { + msg: + Msg(MsgInner::Ask { + msg, + sender: Some(sender), + }), + .. + }) if msg.is::() => { + let msg: Box = msg; + Ok((*msg.downcast::().unwrap(), sender)) + } + + _ => Err(self), + } + } + + fn fallback_arg(&self) -> Option<&dyn Any> { + let msg = match self.msg.as_ref()?.msg.0 { + MsgInner::Broadcast(ref msg) => msg.as_ref(), + MsgInner::Tell(ref msg) => msg.as_ref(), + MsgInner::Ask { ref msg, .. } => msg.as_ref(), + }; + + Some(msg) + } +} + +impl Drop for MessageHandler { + fn drop(&mut self) { + if let Some(msg) = self.fallback_arg() { + panic!("Unhandled message: {:#?}", msg); + } + } +} From 2801fda3293edba66c29b72634e03b53819c3dde Mon Sep 17 00:00:00 2001 From: Sasha Pourcelot Date: Wed, 3 Feb 2021 23:58:59 +0100 Subject: [PATCH 2/9] Make AnswerSender carry its own signature. This allows us not to trust caller of AnswerSender::reply to provide a correct signature. As such, the corresponding method can be documented. This is necessary because such method may be called in the closure that are passed to MessageHandler::with_question. Note: this commit renames AnswerSender::send to AnswerSender::respond, and removes the signature part. This method is public but not documented. As such, this theorically breaking change should not break any code. --- src/bastion/src/child_ref.rs | 2 +- src/bastion/src/context.rs | 2 +- src/bastion/src/message.rs | 32 +++++++++++++++++++------------- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/bastion/src/child_ref.rs b/src/bastion/src/child_ref.rs index 0dcfedf3..689c5668 100644 --- a/src/bastion/src/child_ref.rs +++ b/src/bastion/src/child_ref.rs @@ -306,7 +306,7 @@ impl ChildRef { /// [`Answer`]: message/struct.Answer.html pub fn ask_anonymously(&self, msg: M) -> Result { debug!("ChildRef({}): Asking message: {:?}", self.id(), msg); - let (msg, answer) = BastionMessage::ask(msg); + let (msg, answer) = BastionMessage::ask(msg, self.addr()); let env = Envelope::from_dead_letters(msg); // FIXME: panics? self.send(env).map_err(|env| env.into_msg().unwrap())?; diff --git a/src/bastion/src/context.rs b/src/bastion/src/context.rs index 8bd76b4c..46071385 100644 --- a/src/bastion/src/context.rs +++ b/src/bastion/src/context.rs @@ -738,7 +738,7 @@ impl BastionContext { msg, to ); - let (msg, answer) = BastionMessage::ask(msg); + let (msg, answer) = BastionMessage::ask(msg, self.signature()); let env = Envelope::new_with_sign(msg, self.signature()); // FIXME: panics? to.sender() diff --git a/src/bastion/src/message.rs b/src/bastion/src/message.rs index 6d0f800f..c827d0c6 100644 --- a/src/bastion/src/message.rs +++ b/src/bastion/src/message.rs @@ -33,9 +33,12 @@ use tracing::{debug, trace}; pub trait Message: Any + Send + Sync + Debug {} impl Message for T where T: Any + Send + Sync + Debug {} +/// Allows to respond to questions. +/// +/// This type features the [`respond`] method, that allows to respond to a +/// question. #[derive(Debug)] -#[doc(hidden)] -pub struct AnswerSender(oneshot::Sender); +pub struct AnswerSender(oneshot::Sender, RefAddr); #[derive(Debug)] /// A [`Future`] returned when successfully "asking" a @@ -260,14 +263,17 @@ pub(crate) enum Deployment { } impl AnswerSender { - // FIXME: we can't let manipulating Signature in a public API - // but now it's being called only by a macro so we are trusting it - #[doc(hidden)] - pub fn send(self, msg: M, sign: RefAddr) -> Result<(), M> { + /// Sends data back to the original sender. + /// + /// Returns `Ok` if the data was sent successfully, otherwise returns the + /// original data. + pub fn reply(self, msg: M) -> Result<(), M> { debug!("{:?}: Sending answer: {:?}", self, msg); let msg = Msg::tell(msg); trace!("{:?}: Sending message: {:?}", self, msg); - self.0 + + let AnswerSender(sender, sign) = self; + sender .send(SignedMessage::new(msg, sign)) .map_err(|smsg| smsg.msg.try_unwrap().unwrap()) } @@ -284,10 +290,10 @@ impl Msg { Msg(inner) } - pub(crate) fn ask(msg: M) -> (Self, Answer) { + pub(crate) fn ask(msg: M, sign: RefAddr) -> (Self, Answer) { let msg = Box::new(msg); let (sender, recver) = oneshot::channel(); - let sender = AnswerSender(sender); + let sender = AnswerSender(sender, sign); let answer = Answer(recver); let sender = Some(sender); @@ -459,8 +465,8 @@ impl BastionMessage { BastionMessage::Message(msg) } - pub(crate) fn ask(msg: M) -> (Self, Answer) { - let (msg, answer) = Msg::ask(msg); + pub(crate) fn ask(msg: M, sign: RefAddr) -> (Self, Answer) { + let (msg, answer) = Msg::ask(msg, sign); (BastionMessage::Message(msg), answer) } @@ -767,7 +773,7 @@ macro_rules! msg { ($ctx:expr, $answer:expr) => { { let sign = $ctx.signature(); - sender.send($answer, sign) + sender.reply($answer) } }; } @@ -862,7 +868,7 @@ macro_rules! answer { ($msg:expr, $answer:expr) => {{ let (mut msg, sign) = $msg.extract(); let sender = msg.take_sender().expect("failed to take render"); - sender.send($answer, sign) + sender.reply($answer) }}; } From 5d9865b47923d8a598a5ef9a359dfe4850204414 Mon Sep 17 00:00:00 2001 From: Sasha Pourcelot Date: Fri, 5 Feb 2021 10:18:26 +0100 Subject: [PATCH 3/9] Add on_* functions This allows us to match on both regular messages (the ones we can't respond to) as well as the broadcasts. It follows the same model established previously. --- src/bastion/src/message.rs | 61 +++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/src/bastion/src/message.rs b/src/bastion/src/message.rs index c827d0c6..11acf50b 100644 --- a/src/bastion/src/message.rs +++ b/src/bastion/src/message.rs @@ -897,13 +897,44 @@ impl MessageHandler { } } - pub fn on_fallback(mut self, f: impl FnOnce(&dyn Any)) { + pub fn on_fallback(mut self, f: F) + where + F: FnOnce(&dyn Any), + { if let Some(msg) = self.fallback_arg() { f(msg); self.msg.take(); } } + pub fn on_broadcast(self, f: F) -> MessageHandler + where + T: 'static + Send + Sync, + F: FnOnce(&T), + { + match self.try_into_broadcast::() { + Ok(arg) => { + f(arg.as_ref()); + MessageHandler::empty() + } + Err(this) => this, + } + } + + pub fn on_tell(self, f: F) -> MessageHandler + where + T: 'static, + F: FnOnce(T), + { + match self.try_into_tell::() { + Ok(msg) => { + f(msg); + MessageHandler::empty() + } + Err(this) => this, + } + } + fn empty() -> MessageHandler { MessageHandler { msg: None } } @@ -926,6 +957,34 @@ impl MessageHandler { } } + fn try_into_broadcast(mut self) -> Result, MessageHandler> { + match self.msg.take() { + Some(SignedMessage { + msg: Msg(MsgInner::Broadcast(msg)), + .. + }) if msg.is::() => { + let msg: Arc = msg; + Ok(msg.downcast::().unwrap()) + } + + _ => Err(self), + } + } + + fn try_into_tell(mut self) -> Result { + match self.msg.take() { + Some(SignedMessage { + msg: Msg(MsgInner::Tell(msg)), + .. + }) if msg.is::() => { + let msg: Box = msg; + Ok(*msg.downcast::().unwrap()) + } + + _ => Err(self), + } + } + fn fallback_arg(&self) -> Option<&dyn Any> { let msg = match self.msg.as_ref()?.msg.0 { MsgInner::Broadcast(ref msg) => msg.as_ref(), From 5c42e29e11f60fa4343659644734affb8bd66ce1 Mon Sep 17 00:00:00 2001 From: Sasha Pourcelot Date: Fri, 5 Feb 2021 17:57:17 +0100 Subject: [PATCH 4/9] Add documentation for MessageHandler API --- src/bastion/src/message.rs | 114 +++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/src/bastion/src/message.rs b/src/bastion/src/message.rs index 11acf50b..2ddb1026 100644 --- a/src/bastion/src/message.rs +++ b/src/bastion/src/message.rs @@ -37,6 +37,8 @@ impl Message for T where T: Any + Send + Sync + Debug {} /// /// This type features the [`respond`] method, that allows to respond to a /// question. +/// +/// [`respond`]: #method.respond #[derive(Debug)] pub struct AnswerSender(oneshot::Sender, RefAddr); @@ -872,17 +874,121 @@ macro_rules! answer { }}; } +/// Matches a [`Msg`] (as returned by [`BastionContext::recv`] +/// or [`BastionContext::try_recv`]) with different types. +/// +/// This type may replace the [`msg!`] macro in the future. +/// +/// The [`new`] function creates a new [`MessageHandler`], which is then +/// matched on with the `on_*` functions. +/// +/// There are different kind of messages: +/// - messages that are broadcasted, which can be matched with the +/// [`on_broadcast`] method, +/// - messages that can be responded to, which are matched with the +/// [`on_question`] method, +/// - messages that can not be responded to, which are matched with +/// [`on_tell`], +/// - fallback case, which matches everything, entitled [`on_fallback`]. +/// +/// The closure passed to the functions described previously must return the +/// same type. This value is retrieved when [`on_fallback`] is invoked. +/// +/// Questions can be responded to by calling [`reply`] on the provided +/// sender. +/// +/// # Example +/// +/// ```rust +/// # use bastion::prelude::*; +/// # use bastion::message::MessageHandler; +/// # +/// # #[cfg(feature = "tokio-runtime")] +/// # #[tokio::main] +/// # async fn main() { +/// # run(); +/// # } +/// # +/// # #[cfg(not(feature = "tokio-runtime"))] +/// # fn main() { +/// # run(); +/// # } +/// # +/// # fn run() { +/// # Bastion::init(); +/// // The message that will be broadcasted... +/// const BCAST_MSG: &'static str = "A message containing data (broadcast)."; +/// // The message that will be "told" to the child... +/// const TELL_MSG: &'static str = "A message containing data (tell)."; +/// // The message that will be "asked" to the child... +/// const ASK_MSG: &'static str = "A message containing data (ask)."; +/// +/// Bastion::children(|children| { +/// children.with_exec(|ctx: BastionContext| { +/// async move { +/// # ctx.tell(&ctx.current().addr(), TELL_MSG).unwrap(); +/// # ctx.ask(&ctx.current().addr(), ASK_MSG).unwrap(); +/// # +/// loop { +/// MessageHandler::new(ctx.recv().await?) +/// // We match on broadcasts of &str +/// .on_broadcast(|msg: &&str| { +/// assert_eq!(*msg, BCAST_MSG); +/// // Handle the message... +/// }) +/// // We match on messages of &str +/// .on_tell(|msg: &str| { +/// assert_eq!(msg, TELL_MSG); +/// // Handle the message... +/// }) +/// // We match on questions of &str +/// .on_question(|msg: &str, sender| { +/// assert_eq!(msg, ASK_MSG); +/// // Handle the message... +/// +/// // ...and eventually answer to it... +/// sender.reply("An answer to the message."); +/// }) +/// // We are only broadcasting, "telling" and "asking" a +/// // `&str` in this example, so we know that this won't +/// // happen... +/// .on_fallback(|msg| ()); +/// } +/// } +/// }) +/// }).expect("Couldn't start the children group."); +/// # +/// # Bastion::start(); +/// # Bastion::broadcast(BCAST_MSG).unwrap(); +/// # Bastion::stop(); +/// # Bastion::block_until_stopped(); +/// # } +/// ``` +/// +/// [`BastionContext::recv`]: crate::context::BastionContext::recv +/// [`BastionContext::try_recv`]: crate::context::BastionContext::try_recv +/// [`new`]: Self::new +/// [`on_broadcast`]: Self::on_broadcast +/// [`on_question`]: Self::on_question +/// [`on_tell`]: Self::on_tell +/// [`on_fallback`]: Self::on_fallback +/// [`reply`]: AnswerSender::reply #[derive(Debug)] pub struct MessageHandler { msg: Option, } impl MessageHandler { + /// Creates a new [`MessageHandler`] with an incoming message. pub fn new(msg: SignedMessage) -> MessageHandler { let msg = Some(msg); MessageHandler { msg } } + /// Matches on a question of a specific type. + /// + /// This will consume the inner data and call `f` if the contained message + /// can be replied to. pub fn on_question(self, f: F) -> MessageHandler where T: 'static, @@ -897,6 +1003,10 @@ impl MessageHandler { } } + /// Calls a fallback function if the message has still not matched yet. + /// + /// This consumes the [`MessageHandler`], so that no matching can be + /// performed anymore. pub fn on_fallback(mut self, f: F) where F: FnOnce(&dyn Any), @@ -907,6 +1017,8 @@ impl MessageHandler { } } + /// Calls a function if the incoming message is a broadcast and has a + /// specific type. pub fn on_broadcast(self, f: F) -> MessageHandler where T: 'static + Send + Sync, @@ -921,6 +1033,8 @@ impl MessageHandler { } } + /// Calls a function if the incoming message can't be replied to and has a + /// specific type. pub fn on_tell(self, f: F) -> MessageHandler where T: 'static, From 6734428dd15f89da45d271ba804470e3cd450317 Mon Sep 17 00:00:00 2001 From: Sasha Pourcelot Date: Fri, 5 Feb 2021 17:57:59 +0100 Subject: [PATCH 5/9] Make sender address available for each on_* function --- src/bastion/src/message.rs | 52 +++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/src/bastion/src/message.rs b/src/bastion/src/message.rs index 2ddb1026..22258d0c 100644 --- a/src/bastion/src/message.rs +++ b/src/bastion/src/message.rs @@ -279,6 +279,11 @@ impl AnswerSender { .send(SignedMessage::new(msg, sign)) .map_err(|smsg| smsg.msg.try_unwrap().unwrap()) } + + /// Returns the sender signature. + pub fn signature(&self) -> &RefAddr { + &self.1 + } } impl Msg { @@ -932,12 +937,12 @@ macro_rules! answer { /// loop { /// MessageHandler::new(ctx.recv().await?) /// // We match on broadcasts of &str -/// .on_broadcast(|msg: &&str| { +/// .on_broadcast(|msg: &&str, _sender_addr| { /// assert_eq!(*msg, BCAST_MSG); /// // Handle the message... /// }) /// // We match on messages of &str -/// .on_tell(|msg: &str| { +/// .on_tell(|msg: &str, _sender_addr| { /// assert_eq!(msg, TELL_MSG); /// // Handle the message... /// }) @@ -952,7 +957,7 @@ macro_rules! answer { /// // We are only broadcasting, "telling" and "asking" a /// // `&str` in this example, so we know that this won't /// // happen... -/// .on_fallback(|msg| ()); +/// .on_fallback(|msg, _sender_addr| ()); /// } /// } /// }) @@ -1009,10 +1014,10 @@ impl MessageHandler { /// performed anymore. pub fn on_fallback(mut self, f: F) where - F: FnOnce(&dyn Any), + F: FnOnce(&dyn Any, RefAddr), { - if let Some(msg) = self.fallback_arg() { - f(msg); + if let Some((msg, addr)) = self.fallback_arg() { + f(msg, addr.clone()); self.msg.take(); } } @@ -1022,11 +1027,11 @@ impl MessageHandler { pub fn on_broadcast(self, f: F) -> MessageHandler where T: 'static + Send + Sync, - F: FnOnce(&T), + F: FnOnce(&T, RefAddr), { match self.try_into_broadcast::() { - Ok(arg) => { - f(arg.as_ref()); + Ok((arg, addr)) => { + f(arg.as_ref(), addr); MessageHandler::empty() } Err(this) => this, @@ -1038,11 +1043,11 @@ impl MessageHandler { pub fn on_tell(self, f: F) -> MessageHandler where T: 'static, - F: FnOnce(T), + F: FnOnce(T, RefAddr), { match self.try_into_tell::() { - Ok(msg) => { - f(msg); + Ok((msg, addr)) => { + f(msg, addr); MessageHandler::empty() } Err(this) => this, @@ -1071,42 +1076,47 @@ impl MessageHandler { } } - fn try_into_broadcast(mut self) -> Result, MessageHandler> { + fn try_into_broadcast( + mut self, + ) -> Result<(Arc, RefAddr), MessageHandler> { match self.msg.take() { Some(SignedMessage { msg: Msg(MsgInner::Broadcast(msg)), - .. + sign, }) if msg.is::() => { let msg: Arc = msg; - Ok(msg.downcast::().unwrap()) + Ok((msg.downcast::().unwrap(), sign)) } _ => Err(self), } } - fn try_into_tell(mut self) -> Result { + fn try_into_tell(mut self) -> Result<(T, RefAddr), MessageHandler> { match self.msg.take() { Some(SignedMessage { msg: Msg(MsgInner::Tell(msg)), - .. + sign, }) if msg.is::() => { let msg: Box = msg; - Ok(*msg.downcast::().unwrap()) + Ok((*msg.downcast::().unwrap(), sign)) } _ => Err(self), } } - fn fallback_arg(&self) -> Option<&dyn Any> { - let msg = match self.msg.as_ref()?.msg.0 { + fn fallback_arg(&self) -> Option<(&dyn Any, &RefAddr)> { + let inner_message = self.msg.as_ref()?; + let addr = inner_message.signature(); + + let msg = match inner_message.msg.0 { MsgInner::Broadcast(ref msg) => msg.as_ref(), MsgInner::Tell(ref msg) => msg.as_ref(), MsgInner::Ask { ref msg, .. } => msg.as_ref(), }; - Some(msg) + Some((msg, addr)) } } From b5d5dfc98505b5d79f8863d9ad26d69d615d7d93 Mon Sep 17 00:00:00 2001 From: Sasha Pourcelot Date: Thu, 4 Feb 2021 23:09:05 +0100 Subject: [PATCH 6/9] Allow MessageHandler to return something Previous implementation of MessageHandler always returned nothing, as it was not considered important. However, returning something is important at least in the fibonacci example. This commit allows the MessageHandler to return some data. It requires every matcher to return the same data type. This data is stored in the MessageHandler and returned by the on_fallback function. --- src/bastion/src/message.rs | 133 ++++++++++++++++++++----------------- 1 file changed, 73 insertions(+), 60 deletions(-) diff --git a/src/bastion/src/message.rs b/src/bastion/src/message.rs index 22258d0c..3d6e9057 100644 --- a/src/bastion/src/message.rs +++ b/src/bastion/src/message.rs @@ -413,6 +413,16 @@ impl Msg { } } +impl AsRef for Msg { + fn as_ref(&self) -> &dyn Any { + match &self.0 { + MsgInner::Broadcast(msg) => msg.as_ref(), + MsgInner::Tell(msg) => msg.as_ref(), + MsgInner::Ask { msg, .. } => msg.as_ref(), + } + } +} + impl BastionMessage { pub(crate) fn start() -> Self { BastionMessage::Start @@ -879,6 +889,28 @@ macro_rules! answer { }}; } +#[derive(Debug)] +enum MessageHandlerState { + Matched(O), + Unmatched(SignedMessage), +} + +impl MessageHandlerState { + fn take_message(self) -> Result { + match self { + MessageHandlerState::Unmatched(msg) => Ok(msg), + MessageHandlerState::Matched(output) => Err(output), + } + } + + fn output_or_else(self, f: impl FnOnce(SignedMessage) -> O) -> O { + match self { + MessageHandlerState::Matched(output) => output, + MessageHandlerState::Unmatched(msg) => f(msg), + } + } +} + /// Matches a [`Msg`] (as returned by [`BastionContext::recv`] /// or [`BastionContext::try_recv`]) with different types. /// @@ -979,30 +1011,30 @@ macro_rules! answer { /// [`on_fallback`]: Self::on_fallback /// [`reply`]: AnswerSender::reply #[derive(Debug)] -pub struct MessageHandler { - msg: Option, +pub struct MessageHandler { + state: MessageHandlerState, } -impl MessageHandler { +impl MessageHandler { /// Creates a new [`MessageHandler`] with an incoming message. - pub fn new(msg: SignedMessage) -> MessageHandler { - let msg = Some(msg); - MessageHandler { msg } + pub fn new(msg: SignedMessage) -> MessageHandler { + let state = MessageHandlerState::Unmatched(msg); + MessageHandler { state } } /// Matches on a question of a specific type. /// /// This will consume the inner data and call `f` if the contained message /// can be replied to. - pub fn on_question(self, f: F) -> MessageHandler + pub fn on_question(self, f: F) -> MessageHandler where T: 'static, - F: FnOnce(T, AnswerSender), + F: FnOnce(T, AnswerSender) -> O, { match self.try_into_question::() { Ok((arg, sender)) => { - f(arg, sender); - MessageHandler::empty() + let val = f(arg, sender); + MessageHandler::matched(val) } Err(this) => this, } @@ -1012,27 +1044,25 @@ impl MessageHandler { /// /// This consumes the [`MessageHandler`], so that no matching can be /// performed anymore. - pub fn on_fallback(mut self, f: F) + pub fn on_fallback(self, f: F) -> O where - F: FnOnce(&dyn Any, RefAddr), + F: FnOnce(&dyn Any, RefAddr) -> O, { - if let Some((msg, addr)) = self.fallback_arg() { - f(msg, addr.clone()); - self.msg.take(); - } + self.state + .output_or_else(|msg| f(msg.msg.as_ref(), msg.signature().clone())) } /// Calls a function if the incoming message is a broadcast and has a /// specific type. - pub fn on_broadcast(self, f: F) -> MessageHandler + pub fn on_broadcast(self, f: F) -> MessageHandler where T: 'static + Send + Sync, - F: FnOnce(&T, RefAddr), + F: FnOnce(&T, RefAddr) -> O, { match self.try_into_broadcast::() { Ok((arg, addr)) => { - f(arg.as_ref(), addr); - MessageHandler::empty() + let val = f(arg.as_ref(), addr); + MessageHandler::matched(val) } Err(this) => this, } @@ -1040,27 +1070,28 @@ impl MessageHandler { /// Calls a function if the incoming message can't be replied to and has a /// specific type. - pub fn on_tell(self, f: F) -> MessageHandler + pub fn on_tell(self, f: F) -> MessageHandler where T: 'static, - F: FnOnce(T, RefAddr), + F: FnOnce(T, RefAddr) -> O, { match self.try_into_tell::() { Ok((msg, addr)) => { - f(msg, addr); - MessageHandler::empty() + let val = f(msg, addr); + MessageHandler::matched(val) } Err(this) => this, } } - fn empty() -> MessageHandler { - MessageHandler { msg: None } + fn matched(output: O) -> MessageHandler { + let state = MessageHandlerState::Matched(output); + MessageHandler { state } } - fn try_into_question(mut self) -> Result<(T, AnswerSender), MessageHandler> { - match self.msg.take() { - Some(SignedMessage { + fn try_into_question(self) -> Result<(T, AnswerSender), MessageHandler> { + match self.state.take_message() { + Ok(SignedMessage { msg: Msg(MsgInner::Ask { msg, @@ -1072,15 +1103,16 @@ impl MessageHandler { Ok((*msg.downcast::().unwrap(), sender)) } - _ => Err(self), + Ok(anything) => Err(MessageHandler::new(anything)), + Err(output) => Err(MessageHandler::matched(output)), } } fn try_into_broadcast( - mut self, - ) -> Result<(Arc, RefAddr), MessageHandler> { - match self.msg.take() { - Some(SignedMessage { + self, + ) -> Result<(Arc, RefAddr), MessageHandler> { + match self.state.take_message() { + Ok(SignedMessage { msg: Msg(MsgInner::Broadcast(msg)), sign, }) if msg.is::() => { @@ -1088,13 +1120,14 @@ impl MessageHandler { Ok((msg.downcast::().unwrap(), sign)) } - _ => Err(self), + Ok(anything) => Err(MessageHandler::new(anything)), + Err(output) => Err(MessageHandler::matched(output)), } } - fn try_into_tell(mut self) -> Result<(T, RefAddr), MessageHandler> { - match self.msg.take() { - Some(SignedMessage { + fn try_into_tell(self) -> Result<(T, RefAddr), MessageHandler> { + match self.state.take_message() { + Ok(SignedMessage { msg: Msg(MsgInner::Tell(msg)), sign, }) if msg.is::() => { @@ -1102,28 +1135,8 @@ impl MessageHandler { Ok((*msg.downcast::().unwrap(), sign)) } - _ => Err(self), - } - } - - fn fallback_arg(&self) -> Option<(&dyn Any, &RefAddr)> { - let inner_message = self.msg.as_ref()?; - let addr = inner_message.signature(); - - let msg = match inner_message.msg.0 { - MsgInner::Broadcast(ref msg) => msg.as_ref(), - MsgInner::Tell(ref msg) => msg.as_ref(), - MsgInner::Ask { ref msg, .. } => msg.as_ref(), - }; - - Some((msg, addr)) - } -} - -impl Drop for MessageHandler { - fn drop(&mut self) { - if let Some(msg) = self.fallback_arg() { - panic!("Unhandled message: {:#?}", msg); + Ok(anything) => Err(MessageHandler::new(anything)), + Err(output) => Err(MessageHandler::matched(output)), } } } From d56c37ebd4c82eea58962aea7bfde0320fc42cea Mon Sep 17 00:00:00 2001 From: Sasha Pourcelot Date: Sat, 6 Feb 2021 18:17:29 +0100 Subject: [PATCH 7/9] Rewrite the fibonacci example with MessageHanlder --- .../examples/fibonacci_message_handler.rs | 147 ++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 src/bastion/examples/fibonacci_message_handler.rs diff --git a/src/bastion/examples/fibonacci_message_handler.rs b/src/bastion/examples/fibonacci_message_handler.rs new file mode 100644 index 00000000..28dd3fd7 --- /dev/null +++ b/src/bastion/examples/fibonacci_message_handler.rs @@ -0,0 +1,147 @@ +use bastion::{message::MessageHandler, prelude::*}; + +use tracing::{error, info}; + +// This terribly slow implementation +// will allow us to be rough on the cpu +fn fib(n: usize) -> usize { + if n == 0 || n == 1 { + n + } else { + fib(n - 1) + fib(n - 2) + } +} + +// This terrible helper is converting `fib 50` into a tuple ("fib", 50) +// we might want to use actual serializable / deserializable structures +// in the real world +fn deserialize_into_fib_command(message: String) -> (String, usize) { + let arguments: Vec<&str> = message.split(' ').collect(); + let command = arguments.first().map(|s| s.to_string()).unwrap_or_default(); + let number = usize::from_str_radix(arguments.get(1).unwrap_or(&"0"), 10).unwrap_or(0); + (command, number) +} + +// This is the heavylifting. +// A child will wait for a message, and try to process it. +async fn fib_child_task(ctx: BastionContext) -> Result<(), ()> { + loop { + MessageHandler::new(ctx.recv().await?) + .on_question(|request: String, sender| { + let (command, number) = deserialize_into_fib_command(request); + if command == "fib" { + sender + .reply(format!("{}", fib(number))) + .expect("couldn't reply :("); + } else { + sender + .reply(format!( + "I'm sorry I didn't understand the task I was supposed to do" + )) + .expect("couldn't reply :("); + } + }) + .on_broadcast(|broadcast: &String, _sender_addr| { + info!("received broadcast: {:?}", *broadcast); + }) + .on_tell(|message: String, _sender_addr| { + info!("someone told me something: {}", message); + }) + .on_fallback(|unknown, _sender_addr| { + error!( + "uh oh, I received a message I didn't understand\n {:?}", + unknown + ); + }); + } +} + +// This little helper allows me to send a request, and get a reply. +// The types are `String` for this quick example, but there's a way for us to do better. +// We will see this in other examples. +async fn request(child: &ChildRef, body: String) -> std::io::Result { + let answer = child + .ask_anonymously(body) + .expect("couldn't perform request"); + + Ok( + MessageHandler::new(answer.await.expect("couldn't receive answer")) + .on_tell(|reply, _sender_addr| reply) + .on_fallback(|unknown, _sender_addr| { + error!( + "uh oh, I received a message I didn't understand: {:?}", + unknown + ); + "".to_string() + }), + ) +} + +// RUST_LOG=info cargo run --example fibonacci_message_handler +fn main() { + // This will allow us to have nice colored logs when we run the program + env_logger::init(); + + // We need a bastion in order to run everything + Bastion::init(); + Bastion::start(); + + // Spawn 4 children that will execute our fibonacci task + let children = + Bastion::children(|children| children.with_redundancy(4).with_exec(fib_child_task)) + .expect("couldn't create children"); + + // Broadcasting 1 message to the children + // Have a look at the console output + // to see 1 log entry for each child! + children + .broadcast("Hello there :)".to_string()) + .expect("Couldn't broadcast to the children."); + + let mut fib_to_compute = 35; + for child in children.elems() { + child + .tell_anonymously("shhh here's a message, don't tell anyone.".to_string()) + .expect("Couldn't whisper to child."); + + let now = std::time::Instant::now(); + // by using run!, we are blocking. + // we could have used spawn! instead, + // to run everything in parallel. + let fib_reply = run!(request(child, format!("fib {}", fib_to_compute))) + .expect("send_command_to_child failed"); + + println!( + "fib({}) = {} - Computed in {}ms", + fib_to_compute, + fib_reply, + now.elapsed().as_millis() + ); + // Let's not go too far with the fib sequence + // Otherwise the computer may take a while! + fib_to_compute += 2; + } + Bastion::stop(); + Bastion::block_until_stopped(); +} + +// Compiling bastion v0.3.5-alpha (/home/ignition/Projects/oss/bastion/src/bastion) +// Finished dev [unoptimized + debuginfo] target(s) in 1.07s +// Running `target/debug/examples/fibonacci` +// [2020-05-08T14:00:53Z INFO bastion::system] System: Initializing. +// [2020-05-08T14:00:53Z INFO bastion::system] System: Launched. +// [2020-05-08T14:00:53Z INFO bastion::system] System: Starting. +// [2020-05-08T14:00:53Z INFO bastion::system] System: Launching Supervisor(00000000-0000-0000-0000-000000000000). +// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone. +// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)" +// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)" +// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)" +// fib(35) = 9227465 - Computed in 78ms +// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)" +// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone. +// fib(37) = 24157817 - Computed in 196ms +// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone. +// fib(39) = 63245986 - Computed in 512ms +// [2020-05-08T14:00:54Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone. +// fib(41) = 165580141 - Computed in 1327ms +// [2020-05-08T14:00:55Z INFO bastion::system] System: Stopping. From ef6a050df4510fd4e9378f7d0557b708f4d7c9fa Mon Sep 17 00:00:00 2001 From: Sasha Pourcelot Date: Sun, 14 Feb 2021 21:54:06 +0100 Subject: [PATCH 8/9] Remove useless clone by destructuring on fallback. This allows us to remove additional code. --- src/bastion/src/message.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/bastion/src/message.rs b/src/bastion/src/message.rs index 3d6e9057..8d36145f 100644 --- a/src/bastion/src/message.rs +++ b/src/bastion/src/message.rs @@ -279,11 +279,6 @@ impl AnswerSender { .send(SignedMessage::new(msg, sign)) .map_err(|smsg| smsg.msg.try_unwrap().unwrap()) } - - /// Returns the sender signature. - pub fn signature(&self) -> &RefAddr { - &self.1 - } } impl Msg { @@ -1049,7 +1044,7 @@ impl MessageHandler { F: FnOnce(&dyn Any, RefAddr) -> O, { self.state - .output_or_else(|msg| f(msg.msg.as_ref(), msg.signature().clone())) + .output_or_else(|SignedMessage { msg, sign }| f(msg.as_ref(), sign)) } /// Calls a function if the incoming message is a broadcast and has a From 04e8b7df345777b1ccbe8c39bda79fc5c415a014 Mon Sep 17 00:00:00 2001 From: Sasha Pourcelot Date: Wed, 17 Feb 2021 19:34:28 +0100 Subject: [PATCH 9/9] Add a proof of concept that we can match over different types using the MessageHandler --- .../message_handler_multiple_types.rs | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 src/bastion/examples/message_handler_multiple_types.rs diff --git a/src/bastion/examples/message_handler_multiple_types.rs b/src/bastion/examples/message_handler_multiple_types.rs new file mode 100644 index 00000000..0f93e09d --- /dev/null +++ b/src/bastion/examples/message_handler_multiple_types.rs @@ -0,0 +1,63 @@ +use bastion::message::MessageHandler; +use bastion::prelude::*; +use std::fmt::Debug; +use tracing::error; + +// This example shows that it is possible to use the MessageHandler to match +// over different types of message. + +async fn child_task(ctx: BastionContext) -> Result<(), ()> { + loop { + MessageHandler::new(ctx.recv().await?) + .on_question(|n: i32, sender| { + if n == 42 { + sender.reply(101).expect("Failed to reply to sender"); + } else { + error!("Expected number `42`, found `{}`", n); + } + }) + .on_question(|s: &str, sender| { + if s == "marco" { + sender.reply("polo").expect("Failed to reply to sender"); + } else { + panic!("Expected string `marco`, found `{}`", s); + } + }) + .on_fallback(|v, addr| panic!("Wrong message from {:?}: got {:?}", addr, v)) + } +} + +async fn request( + child: &ChildRef, + body: T, +) -> std::io::Result<()> { + let answer = child + .ask_anonymously(body) + .expect("Couldn't perform request") + .await + .expect("Couldn't receive answer"); + + MessageHandler::new(answer) + .on_tell(|n: i32, _| assert_eq!(n, 101)) + .on_tell(|s: &str, _| assert_eq!(s, "polo")) + .on_fallback(|_, _| panic!("Unknown message")); + + Ok(()) +} + +fn main() { + env_logger::init(); + + Bastion::init(); + Bastion::start(); + + let children = + Bastion::children(|c| c.with_exec(child_task)).expect("Failed to spawn children"); + + let child = &children.elems()[0]; + + run!(request(child, 42)).unwrap(); + run!(request(child, "marco")).unwrap(); + + // run!(request(child, "foo")).unwrap(); +}