Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MessageHandler #309

Merged
merged 10 commits into from
Mar 2, 2021
147 changes: 147 additions & 0 deletions src/bastion/examples/fibonacci_message_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use bastion::{message::MessageHandler, prelude::*};

use tracing::{error, info};

// This terribly slow implementation
// will allow us to be rough on the cpu
fn fib(n: usize) -> usize {
if n == 0 || n == 1 {
n
} else {
fib(n - 1) + fib(n - 2)
}
}

// This terrible helper is converting `fib 50` into a tuple ("fib", 50)
// we might want to use actual serializable / deserializable structures
// in the real world
fn deserialize_into_fib_command(message: String) -> (String, usize) {
let arguments: Vec<&str> = message.split(' ').collect();
let command = arguments.first().map(|s| s.to_string()).unwrap_or_default();
let number = usize::from_str_radix(arguments.get(1).unwrap_or(&"0"), 10).unwrap_or(0);
(command, number)
}

// This is the heavylifting.
// A child will wait for a message, and try to process it.
async fn fib_child_task(ctx: BastionContext) -> Result<(), ()> {
loop {
MessageHandler::new(ctx.recv().await?)
.on_question(|request: String, sender| {
let (command, number) = deserialize_into_fib_command(request);
if command == "fib" {
sender
.reply(format!("{}", fib(number)))
.expect("couldn't reply :(");
} else {
sender
.reply(format!(
"I'm sorry I didn't understand the task I was supposed to do"
))
.expect("couldn't reply :(");
}
})
.on_broadcast(|broadcast: &String, _sender_addr| {
info!("received broadcast: {:?}", *broadcast);
})
.on_tell(|message: String, _sender_addr| {
info!("someone told me something: {}", message);
})
.on_fallback(|unknown, _sender_addr| {
error!(
"uh oh, I received a message I didn't understand\n {:?}",
unknown
);
});
}
}

// This little helper allows me to send a request, and get a reply.
// The types are `String` for this quick example, but there's a way for us to do better.
// We will see this in other examples.
async fn request(child: &ChildRef, body: String) -> std::io::Result<String> {
let answer = child
.ask_anonymously(body)
.expect("couldn't perform request");

Ok(
MessageHandler::new(answer.await.expect("couldn't receive answer"))
.on_tell(|reply, _sender_addr| reply)
.on_fallback(|unknown, _sender_addr| {
error!(
"uh oh, I received a message I didn't understand: {:?}",
unknown
);
"".to_string()
}),
)
}

// RUST_LOG=info cargo run --example fibonacci_message_handler
fn main() {
// This will allow us to have nice colored logs when we run the program
env_logger::init();

// We need a bastion in order to run everything
Bastion::init();
Bastion::start();

// Spawn 4 children that will execute our fibonacci task
let children =
Bastion::children(|children| children.with_redundancy(4).with_exec(fib_child_task))
.expect("couldn't create children");

// Broadcasting 1 message to the children
// Have a look at the console output
// to see 1 log entry for each child!
children
.broadcast("Hello there :)".to_string())
.expect("Couldn't broadcast to the children.");

let mut fib_to_compute = 35;
for child in children.elems() {
child
.tell_anonymously("shhh here's a message, don't tell anyone.".to_string())
.expect("Couldn't whisper to child.");

let now = std::time::Instant::now();
// by using run!, we are blocking.
// we could have used spawn! instead,
// to run everything in parallel.
let fib_reply = run!(request(child, format!("fib {}", fib_to_compute)))
.expect("send_command_to_child failed");

println!(
"fib({}) = {} - Computed in {}ms",
fib_to_compute,
fib_reply,
now.elapsed().as_millis()
);
// Let's not go too far with the fib sequence
// Otherwise the computer may take a while!
fib_to_compute += 2;
}
Bastion::stop();
Bastion::block_until_stopped();
}

// Compiling bastion v0.3.5-alpha (/home/ignition/Projects/oss/bastion/src/bastion)
// Finished dev [unoptimized + debuginfo] target(s) in 1.07s
// Running `target/debug/examples/fibonacci`
// [2020-05-08T14:00:53Z INFO bastion::system] System: Initializing.
// [2020-05-08T14:00:53Z INFO bastion::system] System: Launched.
// [2020-05-08T14:00:53Z INFO bastion::system] System: Starting.
// [2020-05-08T14:00:53Z INFO bastion::system] System: Launching Supervisor(00000000-0000-0000-0000-000000000000).
// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone.
// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)"
// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)"
// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)"
// fib(35) = 9227465 - Computed in 78ms
// [2020-05-08T14:00:53Z INFO fibonacci] received broadcast: "Hello there :)"
// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone.
// fib(37) = 24157817 - Computed in 196ms
// [2020-05-08T14:00:53Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone.
// fib(39) = 63245986 - Computed in 512ms
// [2020-05-08T14:00:54Z INFO fibonacci] someone told me something: shhh here's a message, don't tell anyone.
// fib(41) = 165580141 - Computed in 1327ms
// [2020-05-08T14:00:55Z INFO bastion::system] System: Stopping.
63 changes: 63 additions & 0 deletions src/bastion/examples/message_handler_multiple_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use bastion::message::MessageHandler;
use bastion::prelude::*;
use std::fmt::Debug;
use tracing::error;

// This example shows that it is possible to use the MessageHandler to match
// over different types of message.

async fn child_task(ctx: BastionContext) -> Result<(), ()> {
loop {
MessageHandler::new(ctx.recv().await?)
.on_question(|n: i32, sender| {
if n == 42 {
sender.reply(101).expect("Failed to reply to sender");
} else {
error!("Expected number `42`, found `{}`", n);
}
})
.on_question(|s: &str, sender| {
if s == "marco" {
sender.reply("polo").expect("Failed to reply to sender");
} else {
panic!("Expected string `marco`, found `{}`", s);
}
})
.on_fallback(|v, addr| panic!("Wrong message from {:?}: got {:?}", addr, v))
}
}

async fn request<T: 'static + Debug + Send + Sync>(
child: &ChildRef,
body: T,
) -> std::io::Result<()> {
let answer = child
.ask_anonymously(body)
.expect("Couldn't perform request")
.await
.expect("Couldn't receive answer");

MessageHandler::new(answer)
.on_tell(|n: i32, _| assert_eq!(n, 101))
.on_tell(|s: &str, _| assert_eq!(s, "polo"))
.on_fallback(|_, _| panic!("Unknown message"));

Ok(())
}

fn main() {
env_logger::init();

Bastion::init();
Bastion::start();

let children =
Bastion::children(|c| c.with_exec(child_task)).expect("Failed to spawn children");

let child = &children.elems()[0];

run!(request(child, 42)).unwrap();
run!(request(child, "marco")).unwrap();

// run!(request(child, "foo")).unwrap();
}
2 changes: 1 addition & 1 deletion src/bastion/src/child_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl ChildRef {
///
pub fn ask_anonymously<M: Message>(&self, msg: M) -> Result<Answer, M> {
debug!("ChildRef({}): Asking message: {:?}", self.id(), msg);
let (msg, answer) = BastionMessage::ask(msg);
let (msg, answer) = BastionMessage::ask(msg, self.addr());
let env = Envelope::from_dead_letters(msg);
// FIXME: panics?
self.send(env).map_err(|env| env.into_msg().unwrap())?;
Expand Down
2 changes: 1 addition & 1 deletion src/bastion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ impl BastionContext {
msg,
to
);
let (msg, answer) = BastionMessage::ask(msg);
let (msg, answer) = BastionMessage::ask(msg, self.signature());
let env = Envelope::new_with_sign(msg, self.signature());
// FIXME: panics?
to.sender()
Expand Down
Loading