Skip to content

Commit

Permalink
Add MessageHandler (#309)
Browse files Browse the repository at this point in the history
* Add initial implementation for MessageHandler

This commit adds very basic implementation of MessageHandler. This structure
stores a SignedMessage, and calls, depending on which kind of message it is, and
on its underlying type, either a specified closure, or a fallback one.

The goal is to provide an API that would be nicer to work with than the msg!
macro.

Current implementation features a state-machine like algorithm and currently
only handles messages that can responded to (aka "questions").

* Make AnswerSender carry its own signature.

This allows us not to trust caller of AnswerSender::reply to provide a
correct signature. As such, the corresponding method can be documented.

This is necessary because such method may be called in the closure that
are passed to MessageHandler::with_question.

Note: this commit renames AnswerSender::send to AnswerSender::respond, and
removes the signature part. This method is public but not documented. As
such, this theorically breaking change should not break any code.

* Add on_* functions

This allows us to match on both regular messages (the ones we can't
respond to) as well as the broadcasts. It follows the same model
established previously.

* Add documentation for MessageHandler API

* Make sender address available for each on_* function

* Allow MessageHandler to return something

Previous implementation of MessageHandler always returned nothing, as it
was not considered important. However, returning something is important
at least in the fibonacci example.

This commit allows the MessageHandler to return some data. It requires
every matcher to return the same data type. This data is stored in the
MessageHandler and returned by the on_fallback function.

* Rewrite the fibonacci example with MessageHanlder

* Remove useless clone by destructuring on fallback.

This allows us to remove additional code.

* Add a proof of concept that we can match over different types using the MessageHandler

Co-authored-by: Jeremy Lempereur <[email protected]>
  • Loading branch information
scrabsha and o0Ignition0o authored Mar 2, 2021
1 parent 8d6707f commit 835be05
Show file tree
Hide file tree
Showing 5 changed files with 495 additions and 15 deletions.
147 changes: 147 additions & 0 deletions src/bastion/examples/fibonacci_message_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use bastion::{message::MessageHandler, prelude::*};

use tracing::{error, info};

// This terribly slow implementation
// will allow us to be rough on the cpu
fn fib(n: usize) -> usize {
if n == 0 || n == 1 {
n
} else {
fib(n - 1) + fib(n - 2)
}
}

// This terrible helper is converting `fib 50` into a tuple ("fib", 50)
// we might want to use actual serializable / deserializable structures
// in the real world
fn deserialize_into_fib_command(message: String) -> (String, usize) {
let arguments: Vec<&str> = message.split(' ').collect();
let command = arguments.first().map(|s| s.to_string()).unwrap_or_default();
let number = usize::from_str_radix(arguments.get(1).unwrap_or(&"0"), 10).unwrap_or(0);
(command, number)
}

// This is the heavylifting.
// A child will wait for a message, and try to process it.
async fn fib_child_task(ctx: BastionContext) -> Result<(), ()> {
loop {
MessageHandler::new(ctx.recv().await?)
.on_question(|request: String, sender| {
let (command, number) = deserialize_into_fib_command(request);
if command == "fib" {
sender
.reply(format!("{}", fib(number)))
.expect("couldn't reply :(");
} else {
sender
.reply(format!(
"I'm sorry I didn't understand the task I was supposed to do"
))
.expect("couldn't reply :(");
}
})
.on_broadcast(|broadcast: &String, _sender_addr| {
info!("received broadcast: {:?}", *broadcast);
})
.on_tell(|message: String, _sender_addr| {
info!("someone told me something: {}", message);
})
.on_fallback(|unknown, _sender_addr| {
error!(
"uh oh, I received a message I didn't understand\n {:?}",
unknown
);
});
}
}

// This little helper allows me to send a request, and get a reply.
// The types are `String` for this quick example, but there's a way for us to do better.
// We will see this in other examples.
async fn request(child: &ChildRef, body: String) -> std::io::Result<String> {
let answer = child
.ask_anonymously(body)
.expect("couldn't perform request");

Ok(
MessageHandler::new(answer.await.expect("couldn't receive answer"))
.on_tell(|reply, _sender_addr| reply)
.on_fallback(|unknown, _sender_addr| {
error!(
"uh oh, I received a message I didn't understand: {:?}",
unknown
);
"".to_string()
}),
)
}

// RUST_LOG=info cargo run --example fibonacci_message_handler
fn main() {
// This will allow us to have nice colored logs when we run the program
env_logger::init();

// We need a bastion in order to run everything
Bastion::init();
Bastion::start();

// Spawn 4 children that will execute our fibonacci task
let children =
Bastion::children(|children| children.with_redundancy(4).with_exec(fib_child_task))
.expect("couldn't create children");

// Broadcasting 1 message to the children
// Have a look at the console output
// to see 1 log entry for each child!
children
.broadcast("Hello there :)".to_string())
.expect("Couldn't broadcast to the children.");

let mut fib_to_compute = 35;
for child in children.elems() {
child
.tell_anonymously("shhh here's a message, don't tell anyone.".to_string())
.expect("Couldn't whisper to child.");

let now = std::time::Instant::now();
// by using run!, we are blocking.
// we could have used spawn! instead,
// to run everything in parallel.
let fib_reply = run!(request(child, format!("fib {}", fib_to_compute)))
.expect("send_command_to_child failed");

println!(
"fib({}) = {} - Computed in {}ms",
fib_to_compute,
fib_reply,
now.elapsed().as_millis()
);
// Let's not go too far with the fib sequence
// Otherwise the computer may take a while!
fib_to_compute += 2;
}
Bastion::stop();
Bastion::block_until_stopped();
}

// Compiling bastion v0.3.5-alpha (/home/ignition/Projects/oss/bastion/src/bastion)
// Finished dev [unoptimized + debuginfo] target(s) in 1.07s
// Running `target/debug/examples/fibonacci`
// [2020-05-08T14:00:53Z INFO bastion::system] System: Initializing.
// [2020-05-08T14:00:53Z INFO bastion::system] System: Launched.
// [2020-05-08T14:00:53Z INFO bastion::system] System: Starting.
// [2020-05-08T14:00:53Z INFO bastion::system] System: Launching Supervisor(00000000-0000-0000-0000-000000000000).
// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone.
// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)"
// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)"
// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)"
// fib(35) = 9227465 - Computed in 78ms
// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)"
// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone.
// fib(37) = 24157817 - Computed in 196ms
// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone.
// fib(39) = 63245986 - Computed in 512ms
// [2020-05-08T14:00:54Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone.
// fib(41) = 165580141 - Computed in 1327ms
// [2020-05-08T14:00:55Z INFO bastion::system] System: Stopping.
63 changes: 63 additions & 0 deletions src/bastion/examples/message_handler_multiple_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use bastion::message::MessageHandler;
use bastion::prelude::*;
use std::fmt::Debug;
use tracing::error;

// This example shows that it is possible to use the MessageHandler to match
// over different types of message.

async fn child_task(ctx: BastionContext) -> Result<(), ()> {
loop {
MessageHandler::new(ctx.recv().await?)
.on_question(|n: i32, sender| {
if n == 42 {
sender.reply(101).expect("Failed to reply to sender");
} else {
error!("Expected number `42`, found `{}`", n);
}
})
.on_question(|s: &str, sender| {
if s == "marco" {
sender.reply("polo").expect("Failed to reply to sender");
} else {
panic!("Expected string `marco`, found `{}`", s);
}
})
.on_fallback(|v, addr| panic!("Wrong message from {:?}: got {:?}", addr, v))
}
}

async fn request<T: 'static + Debug + Send + Sync>(
child: &ChildRef,
body: T,
) -> std::io::Result<()> {
let answer = child
.ask_anonymously(body)
.expect("Couldn't perform request")
.await
.expect("Couldn't receive answer");

MessageHandler::new(answer)
.on_tell(|n: i32, _| assert_eq!(n, 101))
.on_tell(|s: &str, _| assert_eq!(s, "polo"))
.on_fallback(|_, _| panic!("Unknown message"));

Ok(())
}

fn main() {
env_logger::init();

Bastion::init();
Bastion::start();

let children =
Bastion::children(|c| c.with_exec(child_task)).expect("Failed to spawn children");

let child = &children.elems()[0];

run!(request(child, 42)).unwrap();
run!(request(child, "marco")).unwrap();

// run!(request(child, "foo")).unwrap();
}
2 changes: 1 addition & 1 deletion src/bastion/src/child_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl ChildRef {
///
pub fn ask_anonymously<M: Message>(&self, msg: M) -> Result<Answer, M> {
debug!("ChildRef({}): Asking message: {:?}", self.id(), msg);
let (msg, answer) = BastionMessage::ask(msg);
let (msg, answer) = BastionMessage::ask(msg, self.addr());
let env = Envelope::from_dead_letters(msg);
// FIXME: panics?
self.send(env).map_err(|env| env.into_msg().unwrap())?;
Expand Down
2 changes: 1 addition & 1 deletion src/bastion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ impl BastionContext {
msg,
to
);
let (msg, answer) = BastionMessage::ask(msg);
let (msg, answer) = BastionMessage::ask(msg, self.signature());
let env = Envelope::new_with_sign(msg, self.signature());
// FIXME: panics?
to.sender()
Expand Down
Loading

0 comments on commit 835be05

Please sign in to comment.