diff --git a/src/receive/decoding.rs b/src/receive/decoding.rs index 8e1e989..a0c908e 100644 --- a/src/receive/decoding.rs +++ b/src/receive/decoding.rs @@ -1,10 +1,11 @@ //! Worker that decodes RaptorQ packets into protocol messages -use std::{cmp::Ordering, thread::yield_now}; - use crate::{protocol, receive}; -pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::Error> { +pub(crate) fn start( + receiver: &receive::Receiver, + nb_decoding_threads: u8, +) -> Result<(), receive::Error> { let encoding_block_size = receiver.object_transmission_info.transfer_length(); loop { @@ -23,38 +24,32 @@ pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::E match decoder.decode(packets) { None => { - log::warn!("lost block {block_id}"); + log::error!("lost block {block_id}, synchronization lost"); continue; } Some(block) => { - log::trace!("block {} decoded with {} bytes!", block_id, block.len()); - - let mut retry_cnt = 0; + log::trace!("block {block_id} decoded with {} bytes!", block.len()); loop { + let mut retried = 0; let mut to_receive = receiver.block_to_receive.lock().expect("acquire lock"); - match block_id.cmp(&to_receive) { - Ordering::Equal => { - receiver - .to_dispatch - .send(protocol::Message::deserialize(block))?; - *to_receive = to_receive.wrapping_add(1); - break; - } - Ordering::Greater => { - // Thread is too late, drop the packet and kill the current job - log::warn!("Dropping the packet {block_id}"); + + if *to_receive == block_id { + // The decoded block is the expected one, dispatching it + receiver + .to_dispatch + .send(protocol::Message::deserialize(block))?; + *to_receive = to_receive.wrapping_add(1); + break; + } else { + // The decoded block is not the expected one + // Retrying until all decoding threads had one chance to dispatch their block + retried += 1; + if nb_decoding_threads < retried { + // All decoding threads should have had one chance to dispatch their block + log::warn!("dropping block {block_id} after trying to dispatch it {retried} times"); break; } - Ordering::Less => { - if retry_cnt < 10 { - retry_cnt +=1; - yield_now(); - } else { - break; - } - - } } } } diff --git a/src/receive/dispatch.rs b/src/receive/dispatch.rs index ade2cfc..611f79c 100644 --- a/src/receive/dispatch.rs +++ b/src/receive/dispatch.rs @@ -59,9 +59,7 @@ pub(crate) fn start(receiver: &receive::Receiver) -> Result<(), receive::E protocol::MessageType::Start => { let (client_sendq, client_recvq) = crossbeam_channel::unbounded::(); - active_transfers.insert(client_id, client_sendq); - receiver.to_clients.send((client_id, client_recvq))?; } diff --git a/src/receive/mod.rs b/src/receive/mod.rs index e2e8b3e..6e078b1 100644 --- a/src/receive/mod.rs +++ b/src/receive/mod.rs @@ -276,7 +276,9 @@ where for i in 0..self.config.nb_decoding_threads { thread::Builder::new() .name(format!("decoding_{i}")) - .spawn_scoped(scope, || decoding::start(self))?; + .spawn_scoped(scope, || { + decoding::start(self, self.config.nb_decoding_threads) + })?; } thread::Builder::new()