diff --git a/src/bastion/examples/fibonacci_message_handler.rs b/src/bastion/examples/fibonacci_message_handler.rs new file mode 100644 index 00000000..28dd3fd7 --- /dev/null +++ b/src/bastion/examples/fibonacci_message_handler.rs @@ -0,0 +1,147 @@ +use bastion::{message::MessageHandler, prelude::*}; + +use tracing::{error, info}; + +// This terribly slow implementation +// will allow us to be rough on the cpu +fn fib(n: usize) -> usize { + if n == 0 || n == 1 { + n + } else { + fib(n - 1) + fib(n - 2) + } +} + +// This terrible helper is converting `fib 50` into a tuple ("fib", 50) +// we might want to use actual serializable / deserializable structures +// in the real world +fn deserialize_into_fib_command(message: String) -> (String, usize) { + let arguments: Vec<&str> = message.split(' ').collect(); + let command = arguments.first().map(|s| s.to_string()).unwrap_or_default(); + let number = usize::from_str_radix(arguments.get(1).unwrap_or(&"0"), 10).unwrap_or(0); + (command, number) +} + +// This is the heavylifting. +// A child will wait for a message, and try to process it. +async fn fib_child_task(ctx: BastionContext) -> Result<(), ()> { + loop { + MessageHandler::new(ctx.recv().await?) + .on_question(|request: String, sender| { + let (command, number) = deserialize_into_fib_command(request); + if command == "fib" { + sender + .reply(format!("{}", fib(number))) + .expect("couldn't reply :("); + } else { + sender + .reply(format!( + "I'm sorry I didn't understand the task I was supposed to do" + )) + .expect("couldn't reply :("); + } + }) + .on_broadcast(|broadcast: &String, _sender_addr| { + info!("received broadcast: {:?}", *broadcast); + }) + .on_tell(|message: String, _sender_addr| { + info!("someone told me something: {}", message); + }) + .on_fallback(|unknown, _sender_addr| { + error!( + "uh oh, I received a message I didn't understand\n {:?}", + unknown + ); + }); + } +} + +// This little helper allows me to send a request, and get a reply. +// The types are `String` for this quick example, but there's a way for us to do better. +// We will see this in other examples. +async fn request(child: &ChildRef, body: String) -> std::io::Result { + let answer = child + .ask_anonymously(body) + .expect("couldn't perform request"); + + Ok( + MessageHandler::new(answer.await.expect("couldn't receive answer")) + .on_tell(|reply, _sender_addr| reply) + .on_fallback(|unknown, _sender_addr| { + error!( + "uh oh, I received a message I didn't understand: {:?}", + unknown + ); + "".to_string() + }), + ) +} + +// RUST_LOG=info cargo run --example fibonacci_message_handler +fn main() { + // This will allow us to have nice colored logs when we run the program + env_logger::init(); + + // We need a bastion in order to run everything + Bastion::init(); + Bastion::start(); + + // Spawn 4 children that will execute our fibonacci task + let children = + Bastion::children(|children| children.with_redundancy(4).with_exec(fib_child_task)) + .expect("couldn't create children"); + + // Broadcasting 1 message to the children + // Have a look at the console output + // to see 1 log entry for each child! + children + .broadcast("Hello there :)".to_string()) + .expect("Couldn't broadcast to the children."); + + let mut fib_to_compute = 35; + for child in children.elems() { + child + .tell_anonymously("shhh here's a message, don't tell anyone.".to_string()) + .expect("Couldn't whisper to child."); + + let now = std::time::Instant::now(); + // by using run!, we are blocking. + // we could have used spawn! instead, + // to run everything in parallel. + let fib_reply = run!(request(child, format!("fib {}", fib_to_compute))) + .expect("send_command_to_child failed"); + + println!( + "fib({}) = {} - Computed in {}ms", + fib_to_compute, + fib_reply, + now.elapsed().as_millis() + ); + // Let's not go too far with the fib sequence + // Otherwise the computer may take a while! + fib_to_compute += 2; + } + Bastion::stop(); + Bastion::block_until_stopped(); +} + +// Compiling bastion v0.3.5-alpha (/home/ignition/Projects/oss/bastion/src/bastion) +// Finished dev [unoptimized + debuginfo] target(s) in 1.07s +// Running `target/debug/examples/fibonacci` +// [2020-05-08T14:00:53Z INFO bastion::system] System: Initializing. +// [2020-05-08T14:00:53Z INFO bastion::system] System: Launched. +// [2020-05-08T14:00:53Z INFO bastion::system] System: Starting. +// [2020-05-08T14:00:53Z INFO bastion::system] System: Launching Supervisor(00000000-0000-0000-0000-000000000000). +// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone. +// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)" +// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)" +// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)" +// fib(35) = 9227465 - Computed in 78ms +// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)" +// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone. +// fib(37) = 24157817 - Computed in 196ms +// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone. +// fib(39) = 63245986 - Computed in 512ms +// [2020-05-08T14:00:54Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone. +// fib(41) = 165580141 - Computed in 1327ms +// [2020-05-08T14:00:55Z INFO bastion::system] System: Stopping. diff --git a/src/bastion/examples/message_handler_multiple_types.rs b/src/bastion/examples/message_handler_multiple_types.rs new file mode 100644 index 00000000..0f93e09d --- /dev/null +++ b/src/bastion/examples/message_handler_multiple_types.rs @@ -0,0 +1,63 @@ +use bastion::message::MessageHandler; +use bastion::prelude::*; +use std::fmt::Debug; +use tracing::error; + +// This example shows that it is possible to use the MessageHandler to match +// over different types of message. + +async fn child_task(ctx: BastionContext) -> Result<(), ()> { + loop { + MessageHandler::new(ctx.recv().await?) + .on_question(|n: i32, sender| { + if n == 42 { + sender.reply(101).expect("Failed to reply to sender"); + } else { + error!("Expected number `42`, found `{}`", n); + } + }) + .on_question(|s: &str, sender| { + if s == "marco" { + sender.reply("polo").expect("Failed to reply to sender"); + } else { + panic!("Expected string `marco`, found `{}`", s); + } + }) + .on_fallback(|v, addr| panic!("Wrong message from {:?}: got {:?}", addr, v)) + } +} + +async fn request( + child: &ChildRef, + body: T, +) -> std::io::Result<()> { + let answer = child + .ask_anonymously(body) + .expect("Couldn't perform request") + .await + .expect("Couldn't receive answer"); + + MessageHandler::new(answer) + .on_tell(|n: i32, _| assert_eq!(n, 101)) + .on_tell(|s: &str, _| assert_eq!(s, "polo")) + .on_fallback(|_, _| panic!("Unknown message")); + + Ok(()) +} + +fn main() { + env_logger::init(); + + Bastion::init(); + Bastion::start(); + + let children = + Bastion::children(|c| c.with_exec(child_task)).expect("Failed to spawn children"); + + let child = &children.elems()[0]; + + run!(request(child, 42)).unwrap(); + run!(request(child, "marco")).unwrap(); + + // run!(request(child, "foo")).unwrap(); +} diff --git a/src/bastion/src/child_ref.rs b/src/bastion/src/child_ref.rs index e159bf6b..0ecef052 100644 --- a/src/bastion/src/child_ref.rs +++ b/src/bastion/src/child_ref.rs @@ -305,7 +305,7 @@ impl ChildRef { /// pub fn ask_anonymously(&self, msg: M) -> Result { debug!("ChildRef({}): Asking message: {:?}", self.id(), msg); - let (msg, answer) = BastionMessage::ask(msg); + let (msg, answer) = BastionMessage::ask(msg, self.addr()); let env = Envelope::from_dead_letters(msg); // FIXME: panics? self.send(env).map_err(|env| env.into_msg().unwrap())?; diff --git a/src/bastion/src/context.rs b/src/bastion/src/context.rs index 3c0311b2..a2d563cf 100644 --- a/src/bastion/src/context.rs +++ b/src/bastion/src/context.rs @@ -730,7 +730,7 @@ impl BastionContext { msg, to ); - let (msg, answer) = BastionMessage::ask(msg); + let (msg, answer) = BastionMessage::ask(msg, self.signature()); let env = Envelope::new_with_sign(msg, self.signature()); // FIXME: panics? to.sender() diff --git a/src/bastion/src/message.rs b/src/bastion/src/message.rs index a5749822..796936d8 100644 --- a/src/bastion/src/message.rs +++ b/src/bastion/src/message.rs @@ -33,9 +33,14 @@ use tracing::{debug, trace}; pub trait Message: Any + Send + Sync + Debug {} impl Message for T where T: Any + Send + Sync + Debug {} +/// Allows to respond to questions. +/// +/// This type features the [`respond`] method, that allows to respond to a +/// question. +/// +/// [`respond`]: #method.respond #[derive(Debug)] -#[doc(hidden)] -pub struct AnswerSender(oneshot::Sender); +pub struct AnswerSender(oneshot::Sender, RefAddr); #[derive(Debug)] /// A [`Future`] returned when successfully "asking" a @@ -257,14 +262,17 @@ pub(crate) enum Deployment { } impl AnswerSender { - // FIXME: we can't let manipulating Signature in a public API - // but now it's being called only by a macro so we are trusting it - #[doc(hidden)] - pub fn send(self, msg: M, sign: RefAddr) -> Result<(), M> { + /// Sends data back to the original sender. + /// + /// Returns `Ok` if the data was sent successfully, otherwise returns the + /// original data. + pub fn reply(self, msg: M) -> Result<(), M> { debug!("{:?}: Sending answer: {:?}", self, msg); let msg = Msg::tell(msg); trace!("{:?}: Sending message: {:?}", self, msg); - self.0 + + let AnswerSender(sender, sign) = self; + sender .send(SignedMessage::new(msg, sign)) .map_err(|smsg| smsg.msg.try_unwrap().unwrap()) } @@ -281,10 +289,10 @@ impl Msg { Msg(inner) } - pub(crate) fn ask(msg: M) -> (Self, Answer) { + pub(crate) fn ask(msg: M, sign: RefAddr) -> (Self, Answer) { let msg = Box::new(msg); let (sender, recver) = oneshot::channel(); - let sender = AnswerSender(sender); + let sender = AnswerSender(sender, sign); let answer = Answer(recver); let sender = Some(sender); @@ -397,6 +405,16 @@ impl Msg { } } +impl AsRef for Msg { + fn as_ref(&self) -> &dyn Any { + match &self.0 { + MsgInner::Broadcast(msg) => msg.as_ref(), + MsgInner::Tell(msg) => msg.as_ref(), + MsgInner::Ask { msg, .. } => msg.as_ref(), + } + } +} + impl BastionMessage { pub(crate) fn start() -> Self { BastionMessage::Start @@ -456,8 +474,8 @@ impl BastionMessage { BastionMessage::Message(msg) } - pub(crate) fn ask(msg: M) -> (Self, Answer) { - let (msg, answer) = Msg::ask(msg); + pub(crate) fn ask(msg: M, sign: RefAddr) -> (Self, Answer) { + let (msg, answer) = Msg::ask(msg, sign); (BastionMessage::Message(msg), answer) } @@ -763,7 +781,7 @@ macro_rules! msg { ($ctx:expr, $answer:expr) => { { let sign = $ctx.signature(); - sender.send($answer, sign) + sender.reply($answer) } }; } @@ -858,6 +876,258 @@ macro_rules! answer { ($msg:expr, $answer:expr) => {{ let (mut msg, sign) = $msg.extract(); let sender = msg.take_sender().expect("failed to take render"); - sender.send($answer, sign) + sender.reply($answer) }}; } + +#[derive(Debug)] +enum MessageHandlerState { + Matched(O), + Unmatched(SignedMessage), +} + +impl MessageHandlerState { + fn take_message(self) -> Result { + match self { + MessageHandlerState::Unmatched(msg) => Ok(msg), + MessageHandlerState::Matched(output) => Err(output), + } + } + + fn output_or_else(self, f: impl FnOnce(SignedMessage) -> O) -> O { + match self { + MessageHandlerState::Matched(output) => output, + MessageHandlerState::Unmatched(msg) => f(msg), + } + } +} + +/// Matches a [`Msg`] (as returned by [`BastionContext::recv`] +/// or [`BastionContext::try_recv`]) with different types. +/// +/// This type may replace the [`msg!`] macro in the future. +/// +/// The [`new`] function creates a new [`MessageHandler`], which is then +/// matched on with the `on_*` functions. +/// +/// There are different kind of messages: +/// - messages that are broadcasted, which can be matched with the +/// [`on_broadcast`] method, +/// - messages that can be responded to, which are matched with the +/// [`on_question`] method, +/// - messages that can not be responded to, which are matched with +/// [`on_tell`], +/// - fallback case, which matches everything, entitled [`on_fallback`]. +/// +/// The closure passed to the functions described previously must return the +/// same type. This value is retrieved when [`on_fallback`] is invoked. +/// +/// Questions can be responded to by calling [`reply`] on the provided +/// sender. +/// +/// # Example +/// +/// ```rust +/// # use bastion::prelude::*; +/// # use bastion::message::MessageHandler; +/// # +/// # #[cfg(feature = "tokio-runtime")] +/// # #[tokio::main] +/// # async fn main() { +/// # run(); +/// # } +/// # +/// # #[cfg(not(feature = "tokio-runtime"))] +/// # fn main() { +/// # run(); +/// # } +/// # +/// # fn run() { +/// # Bastion::init(); +/// // The message that will be broadcasted... +/// const BCAST_MSG: &'static str = "A message containing data (broadcast)."; +/// // The message that will be "told" to the child... +/// const TELL_MSG: &'static str = "A message containing data (tell)."; +/// // The message that will be "asked" to the child... +/// const ASK_MSG: &'static str = "A message containing data (ask)."; +/// +/// Bastion::children(|children| { +/// children.with_exec(|ctx: BastionContext| { +/// async move { +/// # ctx.tell(&ctx.current().addr(), TELL_MSG).unwrap(); +/// # ctx.ask(&ctx.current().addr(), ASK_MSG).unwrap(); +/// # +/// loop { +/// MessageHandler::new(ctx.recv().await?) +/// // We match on broadcasts of &str +/// .on_broadcast(|msg: &&str, _sender_addr| { +/// assert_eq!(*msg, BCAST_MSG); +/// // Handle the message... +/// }) +/// // We match on messages of &str +/// .on_tell(|msg: &str, _sender_addr| { +/// assert_eq!(msg, TELL_MSG); +/// // Handle the message... +/// }) +/// // We match on questions of &str +/// .on_question(|msg: &str, sender| { +/// assert_eq!(msg, ASK_MSG); +/// // Handle the message... +/// +/// // ...and eventually answer to it... +/// sender.reply("An answer to the message."); +/// }) +/// // We are only broadcasting, "telling" and "asking" a +/// // `&str` in this example, so we know that this won't +/// // happen... +/// .on_fallback(|msg, _sender_addr| ()); +/// } +/// } +/// }) +/// }).expect("Couldn't start the children group."); +/// # +/// # Bastion::start(); +/// # Bastion::broadcast(BCAST_MSG).unwrap(); +/// # Bastion::stop(); +/// # Bastion::block_until_stopped(); +/// # } +/// ``` +/// +/// [`BastionContext::recv`]: crate::context::BastionContext::recv +/// [`BastionContext::try_recv`]: crate::context::BastionContext::try_recv +/// [`new`]: Self::new +/// [`on_broadcast`]: Self::on_broadcast +/// [`on_question`]: Self::on_question +/// [`on_tell`]: Self::on_tell +/// [`on_fallback`]: Self::on_fallback +/// [`reply`]: AnswerSender::reply +#[derive(Debug)] +pub struct MessageHandler { + state: MessageHandlerState, +} + +impl MessageHandler { + /// Creates a new [`MessageHandler`] with an incoming message. + pub fn new(msg: SignedMessage) -> MessageHandler { + let state = MessageHandlerState::Unmatched(msg); + MessageHandler { state } + } + + /// Matches on a question of a specific type. + /// + /// This will consume the inner data and call `f` if the contained message + /// can be replied to. + pub fn on_question(self, f: F) -> MessageHandler + where + T: 'static, + F: FnOnce(T, AnswerSender) -> O, + { + match self.try_into_question::() { + Ok((arg, sender)) => { + let val = f(arg, sender); + MessageHandler::matched(val) + } + Err(this) => this, + } + } + + /// Calls a fallback function if the message has still not matched yet. + /// + /// This consumes the [`MessageHandler`], so that no matching can be + /// performed anymore. + pub fn on_fallback(self, f: F) -> O + where + F: FnOnce(&dyn Any, RefAddr) -> O, + { + self.state + .output_or_else(|SignedMessage { msg, sign }| f(msg.as_ref(), sign)) + } + + /// Calls a function if the incoming message is a broadcast and has a + /// specific type. + pub fn on_broadcast(self, f: F) -> MessageHandler + where + T: 'static + Send + Sync, + F: FnOnce(&T, RefAddr) -> O, + { + match self.try_into_broadcast::() { + Ok((arg, addr)) => { + let val = f(arg.as_ref(), addr); + MessageHandler::matched(val) + } + Err(this) => this, + } + } + + /// Calls a function if the incoming message can't be replied to and has a + /// specific type. + pub fn on_tell(self, f: F) -> MessageHandler + where + T: 'static, + F: FnOnce(T, RefAddr) -> O, + { + match self.try_into_tell::() { + Ok((msg, addr)) => { + let val = f(msg, addr); + MessageHandler::matched(val) + } + Err(this) => this, + } + } + + fn matched(output: O) -> MessageHandler { + let state = MessageHandlerState::Matched(output); + MessageHandler { state } + } + + fn try_into_question(self) -> Result<(T, AnswerSender), MessageHandler> { + match self.state.take_message() { + Ok(SignedMessage { + msg: + Msg(MsgInner::Ask { + msg, + sender: Some(sender), + }), + .. + }) if msg.is::() => { + let msg: Box = msg; + Ok((*msg.downcast::().unwrap(), sender)) + } + + Ok(anything) => Err(MessageHandler::new(anything)), + Err(output) => Err(MessageHandler::matched(output)), + } + } + + fn try_into_broadcast( + self, + ) -> Result<(Arc, RefAddr), MessageHandler> { + match self.state.take_message() { + Ok(SignedMessage { + msg: Msg(MsgInner::Broadcast(msg)), + sign, + }) if msg.is::() => { + let msg: Arc = msg; + Ok((msg.downcast::().unwrap(), sign)) + } + + Ok(anything) => Err(MessageHandler::new(anything)), + Err(output) => Err(MessageHandler::matched(output)), + } + } + + fn try_into_tell(self) -> Result<(T, RefAddr), MessageHandler> { + match self.state.take_message() { + Ok(SignedMessage { + msg: Msg(MsgInner::Tell(msg)), + sign, + }) if msg.is::() => { + let msg: Box = msg; + Ok((*msg.downcast::().unwrap(), sign)) + } + + Ok(anything) => Err(MessageHandler::new(anything)), + Err(output) => Err(MessageHandler::matched(output)), + } + } +}