Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Possible fix for PredictionThreshold error dropping rollback requests causing desync #77

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/input_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,12 @@ impl<T: Config> InputQueue<T> {
self.frame_delay = delay;
}

pub(crate) fn reset_first_incorrect_frame(&mut self) {
self.first_incorrect_frame = NULL_FRAME;
}

pub(crate) fn reset_prediction(&mut self) {
self.prediction.frame = NULL_FRAME;
self.first_incorrect_frame = NULL_FRAME;
self.last_requested_frame = NULL_FRAME;
}

Expand Down Expand Up @@ -104,7 +107,9 @@ impl<T: Config> InputQueue<T> {
pub(crate) fn input(&mut self, requested_frame: Frame) -> (T::Input, InputStatus) {
// No one should ever try to grab any input when we have a prediction error.
// Doing so means that we're just going further down the wrong path. Assert this to verify that it's true.
assert!(self.first_incorrect_frame == NULL_FRAME);
//
// TODO: We no longer reset this until certain rollback will be performed. Figure out what to do with assert?
// assert!(self.first_incorrect_frame == NULL_FRAME);

// Remember the last requested frame number for later. We'll need this in add_input() to drop out of prediction mode.
self.last_requested_frame = requested_frame;
Expand Down
37 changes: 34 additions & 3 deletions src/sessions/p2p_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,14 @@ impl<T: Config> P2PSession<T> {
let first_incorrect = self
.sync_layer
.check_simulation_consistency(self.disconnect_frame);
let mut adjusted_gamestate = false;
if first_incorrect != NULL_FRAME {
self.adjust_gamestate(first_incorrect, confirmed_frame, &mut requests);

// TODO: This seems to assume we will successfully rollback to disconnected frame (or other first incorrect)
// probably should verify requests return without error before clearing.
self.disconnect_frame = NULL_FRAME;
adjusted_gamestate = true;
}

let last_saved = self.sync_layer.last_saved_frame();
Expand All @@ -305,9 +310,33 @@ impl<T: Config> P2PSession<T> {
// send confirmed inputs to spectators before throwing them away
self.send_confirmed_inputs_to_spectators(confirmed_frame);

// set the last confirmed frame and discard all saved inputs before that frame
self.sync_layer
.set_last_confirmed_frame(confirmed_frame, self.sparse_saving);
let mut input_error: Option<GgrsError> = None;
{
// Determine if we hit prediction window, using speculative new confirmed_frame.
// (We only commit new confirmed_frame to sync layer if we are certain we do not error here,
// and that requests are returned for potential rollback to be processed.)
let frames_ahead = self.sync_layer.current_frame() - confirmed_frame;
if self.sync_layer.current_frame() >= self.max_prediction as i32
&& frames_ahead >= self.max_prediction as i32
{
input_error = Some(GgrsError::PredictionThreshold);
}
}

if input_error.is_none() {
// If we requested a rollback: Reset first incorrect frame, as it will be corrected by requests.
// ( This must not be reset on input error, as this would cause requests to be dropped, meaning we reset
// incorrect frame without actually rolling back ).
if adjusted_gamestate {
self.sync_layer.reset_first_incorrect_frame();
}

// set the last confirmed frame and discard all saved inputs before that frame. We only do this
// if we are certain requests will actually be processed (no error).
//
self.sync_layer
.set_last_confirmed_frame(confirmed_frame, self.sparse_saving);
}

/*
* DESYNC DETECTION
Expand Down Expand Up @@ -341,6 +370,8 @@ impl<T: Config> P2PSession<T> {
self.local_connect_status[handle].last_frame = actual_frame;
}
None => {
// TODO: This error may drop requests and miss a correction causing desync, handle like PredictionThreshold?
// Or be ok with desync due to mis-use of api?
return Err(GgrsError::InvalidRequest {
info: "Missing local input while calling advance_frame().".to_owned(),
});
Expand Down
6 changes: 6 additions & 0 deletions src/sync_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ impl<T: Config> SyncLayer<T> {
self.input_queues[player_handle].set_frame_delay(delay);
}

pub(crate) fn reset_first_incorrect_frame(&mut self) {
for i in 0..self.num_players {
self.input_queues[i].reset_first_incorrect_frame();
}
}

pub(crate) fn reset_prediction(&mut self) {
for i in 0..self.num_players {
self.input_queues[i].reset_prediction();
Expand Down
103 changes: 103 additions & 0 deletions tests/debug_socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use std::{
collections::HashMap,
hash::Hash,
sync::{Arc, Mutex},
};

pub type MessageBuffer<A> = Vec<(A, ggrs::Message)>;

/// A dummy socket for reproducing controlled delays in delivering messages.
///
/// No messages sent will be made available to receiver unless test implementor
/// explicitly flushes message. This allows implementing tests that wish to reproduce
/// precise delays in message delivery.
///
/// [`DebugSocket::build_sockets`] will generate connected sockets for all addresses.
#[derive(Default, Clone)]
pub struct DebugSocket<A: Clone + PartialEq + Eq + Hash> {
/// Messages sent, but not yet flushed to be made available to receiver.
sent_messages: HashMap<A, Arc<Mutex<Vec<ggrs::Message>>>>,

/// Message buffers per address that are shared between all sockets.
/// When socket flushes messages to recepient, messages moved from sent buffer
/// to remote buffer.
remote_delivery_buffers: HashMap<A, Arc<Mutex<MessageBuffer<A>>>>,

/// Delivered messages ready for consumption for local owner of socket
received_messages: Arc<Mutex<MessageBuffer<A>>>,

/// Address of local socket
local_addr: A,
}

impl<A> DebugSocket<A>
where
A: Clone + PartialEq + Eq + Hash,
{
/// Build socket for each address such that each one can write to
/// any other address.
pub fn build_sockets(addrs: Vec<A>) -> Vec<DebugSocket<A>> {
// Create shared buffer for each address
let receive_buffers: HashMap<A, Arc<Mutex<MessageBuffer<A>>>> =
addrs.iter().fold(Default::default(), |mut map, addr| {
map.insert(addr.clone(), Arc::new(Mutex::new(vec![])));
map
});

let mut sockets = Vec::<DebugSocket<A>>::default();
for addr in addrs.clone() {
sockets.push(DebugSocket {
sent_messages: addrs.iter().fold(Default::default(), |mut map, addr| {
map.insert(addr.clone(), Arc::new(Mutex::new(vec![])));
map
}),
remote_delivery_buffers: receive_buffers.clone(),
// Receive message from delivery buffer for this address
received_messages: receive_buffers.get(&addr).unwrap().clone(),
local_addr: addr,
})
}
sockets
}

/// Deliver messages sent to other receiving sockets
pub fn flush_message(&mut self) {
for (addr, sent) in self.sent_messages.iter_mut() {
let mut sent = sent.lock().unwrap();
let mut remote_buffer = self
.remote_delivery_buffers
.get_mut(addr)
.unwrap()
.lock()
.unwrap();

remote_buffer.extend(sent.drain(..).map(|m| (self.local_addr.clone(), m)));
}
}

/// Deliver messages sent to specified address
#[allow(dead_code)]
pub fn flush_message_for_addr(&mut self, addr: A) {
let mut sent = self.sent_messages.get_mut(&addr).unwrap().lock().unwrap();
let mut remote_buffer = self
.remote_delivery_buffers
.get_mut(&addr)
.unwrap()
.lock()
.unwrap();

remote_buffer.extend(sent.drain(..).map(|m| (self.local_addr.clone(), m)));
}
}

impl<A: Clone + PartialEq + Eq + Hash> ggrs::NonBlockingSocket<A> for DebugSocket<A> {
fn send_to(&mut self, msg: &ggrs::Message, addr: &A) {
let mut sent = self.sent_messages.get_mut(addr).unwrap().lock().unwrap();
sent.push(msg.clone());
}

fn receive_all_messages(&mut self) -> Vec<(A, ggrs::Message)> {
let mut messages = self.received_messages.lock().unwrap();
messages.drain(..).collect()
}
}
Loading
Loading