Skip to content

Commit

Permalink
bug: Fix issue where RPC warp sync results in consecutive warp syncs …
Browse files Browse the repository at this point in the history
…always failing and falling back to RPC.
  • Loading branch information
Zurcusa committed Aug 29, 2024
1 parent 23aaff7 commit c9d5ac2
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 18 deletions.
2 changes: 0 additions & 2 deletions src/main/java/com/limechain/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.limechain.rpc.Function;
import com.limechain.rpc.RpcClient;
import com.limechain.rpc.server.RpcApp;
import com.limechain.storage.LocalStorage;
import com.limechain.utils.DivLogger;
import org.teavm.jso.JSBody;
import org.teavm.jso.core.JSString;
Expand All @@ -18,7 +17,6 @@ public class Main {

public static void main(String[] args) {
log.log("Starting LimeChain node...");
LocalStorage.clear();
RpcApp rpcApp = new RpcApp();
rpcApp.start();

Expand Down
10 changes: 4 additions & 6 deletions src/main/java/com/limechain/client/LightClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,15 @@ public void start() {
this.network.updateCurrentSelectedPeer();

if (this.network.getKademliaService().getSuccessfulBootNodes() > 0) {
System.out.println("Node successfully connected to a peer! Sync will use warp protocol!");
warpSyncMachine.start(false);
return;
warpSyncMachine.setProtocolSync(true);
break;
} else {
retryCount++;
System.out.println("Waiting to retry peer connection...");
Thread.sleep(1000);
Thread.sleep(2000);
}
}

System.out.println("Node failed to connect to peer! Sync will use RPC calls!!");
warpSyncMachine.start(true);
warpSyncMachine.start();
}
}
1 change: 1 addition & 0 deletions src/main/java/com/limechain/storage/DBConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class DBConstants {
public static final String LATEST_ROUND = "ss::latestRound";
public static final String SET_ID = "ss::setId";
public static final String STATE_ROOT = "ss::stateRoot";
public static final String IS_PROTOCOL_SYNC = "ss::isProtocolSync";

//</editor-fold>
}
12 changes: 12 additions & 0 deletions src/main/java/com/limechain/storage/block/SyncState.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class SyncState {
public SyncState() {
this.genesisBlockHash = GenesisBlockHash.POLKADOT;

clearStoredStateIfNeeded();
loadState();
this.startingBlock = this.lastFinalizedBlockNumber;
}
Expand Down Expand Up @@ -89,4 +90,15 @@ public void setLightSyncState(LightSyncState initState) {
setAuthoritySet(initState.getGrandpaAuthoritySet().getCurrentAuthorities());
finalizeHeader(initState.getFinalizedBlockHeader());
}

public void saveIsProtocolSync(boolean isProtocolSync) {
LocalStorage.save(DBConstants.IS_PROTOCOL_SYNC, isProtocolSync);
}

private void clearStoredStateIfNeeded() {
boolean isProtocolSync = LocalStorage.find(DBConstants.IS_PROTOCOL_SYNC, boolean.class).orElse(false);
if (!isProtocolSync) {
LocalStorage.clear();
}
}
}
14 changes: 8 additions & 6 deletions src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ public class WarpSyncMachine {

private static final DivLogger log = new DivLogger();

private Queue<WarpSyncFragment> fragmentsQueue;
private WarpSyncAction warpSyncAction;
private boolean isProtocolSync;
private final PriorityQueue<Pair<BigInteger, Authority[]>> scheduledAuthorityChanges;
private final ChainInformation chainInformation;
private Queue<WarpSyncFragment> fragmentsQueue;
private final ChainService chainService;
private WarpSyncAction warpSyncAction;
private final WarpSyncState warpState;
private final Network networkService;
private final SyncState syncState;
Expand All @@ -50,6 +51,7 @@ public WarpSyncMachine(Network network, ChainService chainService, SyncState syn
this.scheduledAuthorityChanges = new PriorityQueue<>(Comparator.comparing(Pair::getValue0));
this.chainInformation = new ChainInformation();
this.onFinishCallbacks = new ArrayList<>();
this.isProtocolSync = false;
}

public void nextState() {
Expand All @@ -64,16 +66,15 @@ public boolean isSyncing() {
return this.warpSyncAction.getClass() != FinishedAction.class;
}

public void start(boolean useRpc) {
if (!useRpc) {
public void start() {
if (isProtocolSync) {
System.out.println("Warping via warp protocol... ");
LightSyncState initState = LightSyncState.decode(this.chainService.getChainSpec().getLightSyncState());

if (this.syncState.getLastFinalizedBlockNumber()
.compareTo(initState.getFinalizedBlockHeader().getBlockNumber()) < 0) {
this.syncState.setLightSyncState(initState);
}
System.out.println(this.syncState.getLastFinalizedBlockHash());
System.out.println(this.syncState.getLastFinalizedBlockNumber());

final Hash256 initStateHash = this.syncState.getLastFinalizedBlockHash();
// Always start with requesting fragments
Expand Down Expand Up @@ -106,6 +107,7 @@ private void finishWarpSync() {
this.warpState.setWarpSyncFinished(true);
// this.networkService.handshakeBootNodes();
this.syncState.persistState();
syncState.saveIsProtocolSync(isProtocolSync);
System.out.println("Warp sync finished.");
log.log(Level.INFO, "Highest known block at #" + syncState.getLastFinalizedBlockNumber());
this.onFinishCallbacks.forEach(Runnable::run);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;

@Log
Expand Down Expand Up @@ -38,7 +37,8 @@ public void next(WarpSyncMachine sync) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.log(Level.SEVERE, "Retry warp sync request fragment exception: "
+ e.getMessage(), e.getStackTrace());
+ e.getMessage(), e.getStackTrace());
sync.setWarpSyncAction(new RpcFallbackAction());
}
}
if (this.result != null) {
Expand All @@ -50,8 +50,8 @@ public void next(WarpSyncMachine sync) {

@Override
public void handle(WarpSyncMachine sync) {
WarpSyncResponse resp = sync.getNetworkService().makeWarpSyncRequest(blockHash.toString());
try {
WarpSyncResponse resp = sync.getNetworkService().makeWarpSyncRequest(blockHash.toString());
if (resp == null) {
throw new MissingObjectException("No response received.");
}
Expand All @@ -63,7 +63,7 @@ public void handle(WarpSyncMachine sync) {
}
warpSyncState.setWarpSyncFragmentsFinished(resp.isFinished());
sync.setFragmentsQueue(new ArrayDeque<>(
Arrays.stream(resp.getFragments()).toList())
Arrays.stream(resp.getFragments()).toList())
);

this.result = resp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void handle(WarpSyncMachine sync) {
syncState.setSetId(BigInteger.valueOf(roundStateResult.getSetId()));
syncState.resetRound();

sync.setProtocolSync(false);
} catch (Exception e) {
log.log(Level.WARNING, "Error while calling rpc endpoints: " + e.getMessage());
this.error = e;
Expand Down

0 comments on commit c9d5ac2

Please sign in to comment.