Skip to content

Commit

Permalink
refactor decode/dispatch for new reordering thread
Browse files Browse the repository at this point in the history
  • Loading branch information
github-af committed Jul 12, 2024
1 parent 9f8fb68 commit 4bc0653
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 65 deletions.
9 changes: 4 additions & 5 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,10 @@ impl Message {

impl fmt::Display for Message {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
let msg_type =
match self.message_type() {
Err(e) => format!("UNKNOWN {e}"),
Ok(t) => t.to_string(),
};
let msg_type = match self.message_type() {
Err(e) => format!("UNKNOWN {e}"),
Ok(t) => t.to_string(),
};
write!(
fmt,
"client {:x} message = {} data = {} byte(s)",
Expand Down
51 changes: 16 additions & 35 deletions src/receive/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@
use crate::{protocol, receive};

pub(crate) fn start<F>(
receiver: &receive::Receiver<F>,
nb_decoding_threads: u8,
) -> Result<(), receive::Error> {
pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::Error> {
let encoding_block_size = receiver.object_transmission_info.transfer_length();

loop {
let (block_id, packets) = receiver.for_decoding.recv()?;

let packets = match packets {
None => {
log::warn!("synchronization lost received, propagating");
// Sending lost synchronization signal to reorder thread
receiver.to_reordering.send((block_id, None))?;
continue;
}
Some(packets) => packets,
};

log::trace!(
"trying to decode block {block_id} with {} packets",
packets.len()
Expand All @@ -25,40 +32,14 @@ pub(crate) fn start<F>(
match decoder.decode(packets) {
None => {
log::error!("lost block {block_id}, synchronization lost");
// Sending lost synchronization signal to dispatch
receiver.to_dispatch.send(None)?;
continue;
// Sending lost synchronization signal to reorder thread
receiver.to_reordering.send((block_id, None))?;
}
Some(block) => {
log::trace!("block {block_id} decoded with {} bytes!", block.len());

let mut retried = 0;

loop {
let mut to_receive = receiver.block_to_receive.lock().expect("acquire lock");

if *to_receive == block_id {
// The decoded block is the expected one, dispatching it
receiver
.to_dispatch
.send(Some(protocol::Message::deserialize(block)))?;
*to_receive = to_receive.wrapping_add(1);
break;
} else {
// The decoded block is not the expected one
// Retrying until all decoding threads had one chance to dispatch their block
if nb_decoding_threads < retried {
// All decoding threads should have had one chance to dispatch their block
log::warn!("dropping block {block_id} after trying to dispatch it {retried} times");

// Sending lost synchronization signal to dispatch
receiver.to_dispatch.send(None)?;

break;
}
retried += 1;
}
}
receiver
.to_reordering
.send((block_id, Some(protocol::Message::deserialize(block))))?;
}
}
}
Expand Down
15 changes: 7 additions & 8 deletions src/receive/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,13 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E
continue;
}

let message_type =
match message.message_type() {
Err(e) => {
log::error!("message of UNKNOWN type received ({e}), dropping it");
continue;
}
Ok(mt) => mt,
};
let message_type = match message.message_type() {
Err(e) => {
log::error!("message of UNKNOWN type received ({e}), dropping it");
continue;
}
Ok(mt) => mt,
};

let mut will_end = false;

Expand Down
44 changes: 31 additions & 13 deletions src/receive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ use std::{
io::{self, Write},
net,
os::fd::AsRawFd,
sync, thread, time,
thread, time,
};

mod client;
mod clients;
mod decoding;
mod dispatch;
mod reblock;
mod reordering;
mod udp;

pub struct Config {
Expand Down Expand Up @@ -61,7 +62,8 @@ impl Config {
pub enum Error {
Io(io::Error),
SendPackets(crossbeam_channel::SendError<Vec<raptorq::EncodingPacket>>),
SendBlockPackets(crossbeam_channel::SendError<(u8, Vec<raptorq::EncodingPacket>)>),
SendBlockPackets(crossbeam_channel::SendError<(u8, Option<Vec<raptorq::EncodingPacket>>)>),
SendBlockMessage(crossbeam_channel::SendError<(u8, Option<protocol::Message>)>),
SendMessage(crossbeam_channel::SendError<Option<protocol::Message>>),
SendClients(
crossbeam_channel::SendError<(
Expand All @@ -80,6 +82,7 @@ impl fmt::Display for Error {
Self::Io(e) => write!(fmt, "I/O error: {e}"),
Self::SendPackets(e) => write!(fmt, "crossbeam send packets error: {e}"),
Self::SendBlockPackets(e) => write!(fmt, "crossbeam send block packets error: {e}"),
Self::SendBlockMessage(e) => write!(fmt, "crossbeam send block/message error: {e}"),
Self::SendMessage(e) => write!(fmt, "crossbeam send message error: {e}"),
Self::SendClients(e) => write!(fmt, "crossbeam send client error: {e}"),
Self::Receive(e) => write!(fmt, "crossbeam receive error: {e}"),
Expand All @@ -101,12 +104,18 @@ impl From<crossbeam_channel::SendError<Vec<raptorq::EncodingPacket>>> for Error
}
}

impl From<crossbeam_channel::SendError<(u8, Vec<raptorq::EncodingPacket>)>> for Error {
fn from(e: crossbeam_channel::SendError<(u8, Vec<raptorq::EncodingPacket>)>) -> Self {
impl From<crossbeam_channel::SendError<(u8, Option<Vec<raptorq::EncodingPacket>>)>> for Error {
fn from(e: crossbeam_channel::SendError<(u8, Option<Vec<raptorq::EncodingPacket>>)>) -> Self {
Self::SendBlockPackets(e)
}
}

impl From<crossbeam_channel::SendError<(u8, Option<protocol::Message>)>> for Error {
fn from(oe: crossbeam_channel::SendError<(u8, Option<protocol::Message>)>) -> Self {
Self::SendBlockMessage(oe)
}
}

impl From<crossbeam_channel::SendError<Option<protocol::Message>>> for Error {
fn from(oe: crossbeam_channel::SendError<Option<protocol::Message>>) -> Self {
Self::SendMessage(oe)
Expand Down Expand Up @@ -157,11 +166,14 @@ pub struct Receiver<F> {
pub(crate) to_buffer_size: usize,
pub(crate) from_max_messages: u16,
pub(crate) multiplex_control: semaphore::Semaphore,
pub(crate) block_to_receive: sync::Mutex<u8>,
pub(crate) resync_needed_block_id: crossbeam_utils::atomic::AtomicCell<(bool, u8)>,
pub(crate) to_reblock: crossbeam_channel::Sender<Vec<raptorq::EncodingPacket>>,
pub(crate) for_reblock: crossbeam_channel::Receiver<Vec<raptorq::EncodingPacket>>,
pub(crate) to_decoding: crossbeam_channel::Sender<(u8, Vec<raptorq::EncodingPacket>)>,
pub(crate) for_decoding: crossbeam_channel::Receiver<(u8, Vec<raptorq::EncodingPacket>)>,
pub(crate) to_decoding: crossbeam_channel::Sender<(u8, Option<Vec<raptorq::EncodingPacket>>)>,
pub(crate) for_decoding:
crossbeam_channel::Receiver<(u8, Option<Vec<raptorq::EncodingPacket>>)>,
pub(crate) to_reordering: crossbeam_channel::Sender<(u8, Option<protocol::Message>)>,
pub(crate) for_reordering: crossbeam_channel::Receiver<(u8, Option<protocol::Message>)>,
pub(crate) to_dispatch: crossbeam_channel::Sender<Option<protocol::Message>>,
pub(crate) for_dispatch: crossbeam_channel::Receiver<Option<protocol::Message>>,
pub(crate) to_clients: crossbeam_channel::Sender<(
Expand Down Expand Up @@ -198,12 +210,14 @@ where

let multiplex_control = semaphore::Semaphore::new(config.nb_clients as usize);

let block_to_receive = sync::Mutex::new(0);
let resync_needed_block_id = crossbeam_utils::atomic::AtomicCell::default();

let (to_reblock, for_reblock) =
crossbeam_channel::unbounded::<Vec<raptorq::EncodingPacket>>();
let (to_decoding, for_decoding) =
crossbeam_channel::unbounded::<(u8, Vec<raptorq::EncodingPacket>)>();
crossbeam_channel::unbounded::<(u8, Option<Vec<raptorq::EncodingPacket>>)>();
let (to_reordering, for_reordering) =
crossbeam_channel::unbounded::<(u8, Option<protocol::Message>)>();
let (to_dispatch, for_dispatch) =
crossbeam_channel::unbounded::<Option<protocol::Message>>();

Expand All @@ -218,11 +232,13 @@ where
to_buffer_size,
from_max_messages,
multiplex_control,
block_to_receive,
resync_needed_block_id,
to_reblock,
for_reblock,
to_decoding,
for_decoding,
to_reordering,
for_reordering,
to_dispatch,
for_dispatch,
to_clients,
Expand Down Expand Up @@ -273,12 +289,14 @@ where
.name("dispatch".to_string())
.spawn_scoped(scope, || dispatch::start(self))?;

thread::Builder::new()
.name("reordering".to_string())
.spawn_scoped(scope, || reordering::start(self))?;

for i in 0..self.config.nb_decoding_threads {
thread::Builder::new()
.name(format!("decoding_{i}"))
.spawn_scoped(scope, || {
decoding::start(self, self.config.nb_decoding_threads)
})?;
.spawn_scoped(scope, || decoding::start(self))?;
}

thread::Builder::new()
Expand Down
11 changes: 7 additions & 4 deletions src/receive/reblock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E
// no more traffic but ongoing block, trying to decode
if nb_normal_packets as usize <= qlen {
log::debug!("flushing block {block_id} with {qlen} packets");
receiver.to_decoding.send((block_id, queue))?;
receiver.to_decoding.send((block_id, Some(queue)))?;
block_id = block_id.wrapping_add(1);
} else {
log::debug!(
"not enough packets ({qlen} packets) to decode block {block_id}"
);
log::warn!("lost block {block_id}");
receiver.to_decoding.send((block_id, None))?;
desynchro = true;
}
queue = Vec::with_capacity(capacity);
Expand All @@ -54,7 +55,7 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E

if desynchro {
block_id = message_block_id;
*receiver.block_to_receive.lock().expect("acquire lock") = block_id;
receiver.resync_needed_block_id.store((true, block_id));
desynchro = false;
}

Expand All @@ -70,7 +71,9 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E
pqueue.push(packet);
if nb_normal_packets as usize <= pqueue.len() {
//now there is enough packets to decode it
receiver.to_decoding.send((message_block_id, pqueue))?;
receiver
.to_decoding
.send((message_block_id, Some(pqueue)))?;
prev_queue = None;
} else {
prev_queue = Some(pqueue);
Expand All @@ -88,7 +91,7 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E

if nb_normal_packets as usize <= queue.len() {
//enough packets in the current block to decode it
receiver.to_decoding.send((block_id, queue))?;
receiver.to_decoding.send((block_id, Some(queue)))?;
if prev_queue.is_some() {
log::warn!("lost block {}", block_id.wrapping_sub(1));
}
Expand Down
62 changes: 62 additions & 0 deletions src/receive/reordering.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//! Worker that reorders received messages according to block numbers
use crate::receive;

pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::Error> {
let mut block_to_receive = 0;
let mut pending_messages = [const { None }; u8::MAX as usize + 1];

loop {
let (block_id, message) = receiver.for_reordering.recv()?;

if message.is_none() {
// Synchronization lost, dropping everything
log::warn!("synchronization lost received, dropping everything, propagating it");
pending_messages.fill_with(|| None);
receiver.to_dispatch.send(None)?;
continue;
}

let (resync_needed, resync_block_id) = receiver.resync_needed_block_id.take();

if resync_needed {
log::debug!("forced resynchronization");
if pending_messages.iter().any(Option::is_some) {
log::warn!("forced resynchronization with pending messages, dropping everything");
pending_messages.fill_with(|| None);
}
block_to_receive = resync_block_id;
}

log::debug!("received block {block_id}, expecting block {block_to_receive}");

if block_to_receive == block_id {
let message = if pending_messages[block_to_receive as usize].is_some() {
// a message was already pending
// using the old one, storing the newly received one
pending_messages[block_to_receive as usize]
.replace(message)
.expect("infallible")
} else {
// no message was pending, using the newly received one
message
};

receiver.to_dispatch.send(message)?;
block_to_receive = block_to_receive.wrapping_add(1);

// flushing as much as possible further pending blocks
while let Some(message) = pending_messages[block_to_receive as usize].take() {
receiver.to_dispatch.send(message)?;
block_to_receive = block_to_receive.wrapping_add(1);
}
} else if pending_messages[block_id as usize]
.replace(message)
.is_some()
{
log::error!("received a new block {block_id} but existing one was not sent to dispatch, synchronization lost, dropping everything");
pending_messages.fill_with(|| None);
receiver.to_dispatch.send(None)?;
}
}
}

0 comments on commit 4bc0653

Please sign in to comment.