Skip to content

Commit

Permalink
fix reconnect between SC and SPU: #43
Browse files Browse the repository at this point in the history
always register a valid SPU even if an active SPU connection already exists
  • Loading branch information
sehz committed Jun 3, 2020
1 parent f898eae commit 46707f5
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 32 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.0
0.5.1
1 change: 1 addition & 0 deletions src/cli/src/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
VERSION
2 changes: 1 addition & 1 deletion src/cli/src/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.0
0.5.1
3 changes: 2 additions & 1 deletion src/sc-core/src/core/spus/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::iter::FromIterator;
use std::io::Error as IoError;
use std::io::ErrorKind;


use flv_util::socket_helpers::ServerAddress;
use flv_types::SpuId;
use flv_metadata::spu::{Endpoint, SpuSpec, SpuStatus, IngressPort};
Expand Down Expand Up @@ -290,7 +291,7 @@ impl SpuLocalStore {
// check if spu can be registered
pub fn validate_spu_for_registered(&self, id: &SpuId) -> bool {
for (_, spu) in (self.inner_store().read()).iter() {
if spu.id() == id && spu.status.is_offline() {
if spu.id() == id {
return true;
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/sc-core/src/services/private_api/private_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ impl KfService<TcpStream> for ScInternalService

api_loop!(
api_stream,
InternalScRequest::UpdateLrsRequest(msg) => {
debug!("received lrs request: {}",msg);
context.send_lrs_to_sender(msg.request).await;
},
InternalScRequest::RegisterSpuRequest(_request) => {
error!("registration req only valid during initialization");
return Err(KfSocketError::IoError(IoError::new(ErrorKind::InvalidData,"register spu request is only valid beggining")))
}
InternalScRequest::UpdateLrsRequest(msg) => {
debug!("received lrs request: {}",msg);
context.send_lrs_to_sender(msg.request).await;
},
InternalScRequest::RegisterSpuRequest(_request) => {
error!("registration req only valid during initialization");
return Err(KfSocketError::IoError(IoError::new(ErrorKind::InvalidData,"register spu request is only valid at init")))
}
);

debug!("api loop terminated; clearing sink");
Expand Down
55 changes: 34 additions & 21 deletions src/spu-server/src/controllers/sc/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::time::Duration;
use std::process;
use std::io::Error as IoError;
use std::sync::Arc;

Expand Down Expand Up @@ -83,6 +82,9 @@ impl ScDispatcher<FileReplica> {
async fn dispatch_loop(mut self) {
let mut counter: u64 = 0;

const WAIT_RECONNECT_INTERVAL: u64 = 3000;


loop {
debug!("entering SC dispatch loop: {}", counter);

Expand All @@ -93,27 +95,35 @@ impl ScDispatcher<FileReplica> {
);

// register and exit on error
match self.send_spu_registeration(&mut socket).await {
Ok(_) => {}
let status = match self.send_spu_registeration(&mut socket).await {
Ok(status) => status,
Err(err) => {
print_cli_err!(format!("cannot register with sc: {}", err));
process::exit(-1);
}
}

// continuously process updates from and send back status to SC
match self.sc_request_loop(socket).await {
Ok(_) => {
debug!("sc request loop finished: {}", counter);
// give a little bit of time before trying to reconnect
sleep(Duration::from_millis(100)).await;
counter = counter + 1;
print_cli_err!(format!(
"spu registeration failed with sc due to error: {}",
err
));
false
}
Err(err) => {
warn!("error, connecting to sc: {:#?}", err);
// The connection to the SC failed. Retry again.
// Currently we wait 3 seconds before retrying, but this should use a backoff algorithm
sleep(Duration::from_millis(3000)).await;
};

if !status {
warn!("sleeping 3 seconds before re-trying re-register");
sleep(Duration::from_millis(WAIT_RECONNECT_INTERVAL)).await;
} else {
// continuously process updates from and send back status to SC
match self.sc_request_loop(socket).await {
Ok(_) => {
debug!("sc connection terminated: {}, waiting before reconnecting", counter);
// give a little bit of time before trying to reconnect
sleep(Duration::from_millis(10)).await;
counter = counter + 1;
}
Err(err) => {
warn!("error connecting to sc: {:#?}, waiting before reconnecting", err);
// The connection to the SC failed. Retry again.
// Currently we wait 3 seconds before retrying, but this should use a backoff algorithm
sleep(Duration::from_millis(WAIT_RECONNECT_INTERVAL)).await;
}
}
}
}
Expand Down Expand Up @@ -177,6 +187,9 @@ impl ScDispatcher<FileReplica> {
}
}

drop(api_stream);
drop(shared_sink);

Ok(())
}

Expand Down Expand Up @@ -209,7 +222,7 @@ impl ScDispatcher<FileReplica> {

Ok(false)
} else {
debug!("spu '{}' registration Ok", local_spu_id);
info!("spu '{}' registration successful", local_spu_id);

Ok(true)
}
Expand Down

0 comments on commit 46707f5

Please sign in to comment.