Skip to content

Commit

Permalink
feat: implement rpc call sync state populate logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
Zurcusa committed Aug 28, 2024
1 parent a201b48 commit 23aaff7
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 68 deletions.
2 changes: 2 additions & 0 deletions src/main/java/com/limechain/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.limechain.rpc.Function;
import com.limechain.rpc.RpcClient;
import com.limechain.rpc.server.RpcApp;
import com.limechain.storage.LocalStorage;
import com.limechain.utils.DivLogger;
import org.teavm.jso.JSBody;
import org.teavm.jso.core.JSString;
Expand All @@ -17,6 +18,7 @@ public class Main {

public static void main(String[] args) {
log.log("Starting LimeChain node...");
LocalStorage.clear();
RpcApp rpcApp = new RpcApp();
rpcApp.start();

Expand Down
30 changes: 20 additions & 10 deletions src/main/java/com/limechain/client/LightClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
import com.limechain.network.Network;
import com.limechain.rpc.server.AppBean;
import com.limechain.sync.warpsync.WarpSyncMachine;
import com.limechain.utils.DivLogger;
import lombok.SneakyThrows;
import lombok.extern.java.Log;

import java.util.logging.Level;

/**
* Main light client class that starts and stops execution of
* the client and hold references to dependencies
*/
@Log
public class LightClient implements HostNode {
// TODO: Add service dependencies i.e rpc, sync, network, etc.
// TODO: Do we need those as fields here...?
private final Network network;
private WarpSyncMachine warpSyncMachine;

private static final DivLogger log = new DivLogger();

/**
* @implNote the RpcApp is assumed to have been started before constructing the client,
Expand All @@ -33,16 +33,26 @@ public LightClient() {
@SneakyThrows
public void start() {
this.network.start();
while (true) {
WarpSyncMachine warpSyncMachine = AppBean.getBean(WarpSyncMachine.class);

log.log(Level.INFO, "Syncing to latest finalized block state...");

int retryCount = 0;
while (retryCount < 3) {
this.network.updateCurrentSelectedPeer();

if (this.network.getKademliaService().getSuccessfulBootNodes() > 0) {
log.log(Level.INFO, "Node successfully connected to a peer! Sync can start!");
this.warpSyncMachine = AppBean.getBean(WarpSyncMachine.class);
this.warpSyncMachine.start();
break;
System.out.println("Node successfully connected to a peer! Sync will use warp protocol!");
warpSyncMachine.start(false);
return;
} else {
retryCount++;
System.out.println("Waiting to retry peer connection...");
Thread.sleep(1000);
}
log.log(Level.INFO, "Waiting for peer connection...");
Thread.sleep(10000);
}

System.out.println("Node failed to connect to peer! Sync will use RPC calls!!");
warpSyncMachine.start(true);
}
}
12 changes: 3 additions & 9 deletions src/main/java/com/limechain/config/SystemInfo.java
Original file line number Diff line number Diff line change
@@ -1,33 +1,28 @@
package com.limechain.config;

import com.limechain.chain.Chain;
import com.limechain.storage.block.SyncState;
import com.limechain.utils.DivLogger;
import lombok.Getter;

import java.math.BigInteger;
import java.util.Map;
import java.util.logging.Level;

/**
* Configuration class used to hold and information used by the system rpc methods
*/
@Getter
public class SystemInfo {
// private final String role;
// private final String role;
private final Chain chain;
// private final String hostIdentity;
// private final String hostIdentity;
private String hostName = "Fruzhin";
private String hostVersion = "0.0.1";
private final BigInteger highestBlock;

private static final DivLogger log = new DivLogger();

public SystemInfo(HostConfig hostConfig, SyncState syncState) {
public SystemInfo(HostConfig hostConfig) {
// this.role = network.getNodeRole().name();
this.chain = hostConfig.getChain();
// this.hostIdentity = network.getHost().getPeerId().toString();
this.highestBlock = syncState.getLastFinalizedBlockNumber();
logSystemInfo();
}

Expand All @@ -48,6 +43,5 @@ public void logSystemInfo() {
// log.log(Level.INFO, authEmoji + "Role: " + role);
// log.log(Level.INFO, "Local node identity is: " + hostIdentity);
log.log(Level.INFO, "Operating System: " + System.getProperty("os.name"));
log.log(Level.INFO, "Highest known block at #" + highestBlock);
}
}
6 changes: 3 additions & 3 deletions src/main/java/com/limechain/rpc/server/CommonConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected static Object getBean(Class<?> beanClass) {
beans.put(beanClass, syncState);
return syncState;
case "SystemInfo":
SystemInfo systemInfo = systemInfo((HostConfig) getBean(HostConfig.class), (SyncState) getBean(SyncState.class));
SystemInfo systemInfo = systemInfo((HostConfig) getBean(HostConfig.class));
beans.put(beanClass, systemInfo);
return systemInfo;
case "Network":
Expand Down Expand Up @@ -79,8 +79,8 @@ private static SyncState syncState() {
return new SyncState();
}

private static SystemInfo systemInfo(HostConfig hostConfig, SyncState syncState) {
return new SystemInfo(hostConfig, syncState);
private static SystemInfo systemInfo(HostConfig hostConfig) {
return new SystemInfo(hostConfig);
}

private static Network network(ChainService chainService, HostConfig hostConfig) {
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/com/limechain/storage/block/SyncState.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class SyncState {
@Setter
private Authority[] authoritySet;
private BigInteger latestRound;
@Setter
private BigInteger setId;

public SyncState() {
Expand Down Expand Up @@ -88,8 +89,4 @@ public void setLightSyncState(LightSyncState initState) {
setAuthoritySet(initState.getGrandpaAuthoritySet().getCurrentAuthorities());
finalizeHeader(initState.getFinalizedBlockHeader());
}

public String getStateRoot() {
return null;
}
}
18 changes: 5 additions & 13 deletions src/main/java/com/limechain/sync/JustificationVerifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.security.InvalidKeyException;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.PublicKey;
import java.security.Signature;
import java.security.SignatureException;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.X509EncodedKeySpec;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -49,7 +41,7 @@ public static boolean verify(Precommit[] precommits, BigInteger round) {

Set<Hash256> seenPublicKeys = new HashSet<>();
Set<Hash256> authorityKeys =
Arrays.stream(authorities).map(Authority::getPublicKey).map(Hash256::new).collect(Collectors.toSet());
Arrays.stream(authorities).map(Authority::getPublicKey).map(Hash256::new).collect(Collectors.toSet());

for (Precommit precommit : precommits) {
if (!authorityKeys.contains(precommit.getAuthorityPublicKey())) {
Expand All @@ -68,9 +60,9 @@ public static boolean verify(Precommit[] precommits, BigInteger round) {
byte[] data = getDataToVerify(precommit, authoritiesSetId, round);

boolean isValid = verifySignature(
StringUtils.toHex(precommit.getAuthorityPublicKey().getBytes()),
StringUtils.toHex(precommit.getSignature().getBytes()),
StringUtils.toHex(data));
StringUtils.toHex(precommit.getAuthorityPublicKey().getBytes()),
StringUtils.toHex(precommit.getSignature().getBytes()),
StringUtils.toHex(data));

if (!isValid) {
log.log(Level.WARNING, "Failed to verify signature");
Expand Down Expand Up @@ -142,7 +134,7 @@ private static byte[] getDataToVerify(Precommit precommit, BigInteger authoritie
}

@JSBody(params = {"publicKeyHex", "signatureHex",
"messageHex"}, script = "return verifyAsync(signatureHex, messageHex, publicKeyHex);")
"messageHex"}, script = "return verifyAsync(signatureHex, messageHex, publicKeyHex);")
public static native JSPromise<JSBoolean> verifyAsync(String publicKeyHex, String signatureHex,
String messageHex);
}
50 changes: 29 additions & 21 deletions src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
import com.limechain.storage.block.SyncState;
import com.limechain.sync.warpsync.action.FinishedAction;
import com.limechain.sync.warpsync.action.RequestFragmentsAction;
import com.limechain.sync.warpsync.action.RpcFallbackAction;
import com.limechain.sync.warpsync.action.WarpSyncAction;
import com.limechain.tuple.Pair;
import com.limechain.utils.DivLogger;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.java.Log;
import com.limechain.tuple.Pair;

import java.math.BigInteger;
import java.util.ArrayList;
Expand All @@ -23,11 +24,12 @@
import java.util.Queue;
import java.util.logging.Level;

@Log
@Getter
@Setter
public class WarpSyncMachine {

private static final DivLogger log = new DivLogger();

private final PriorityQueue<Pair<BigInteger, Authority[]>> scheduledAuthorityChanges;
private final ChainInformation chainInformation;
private Queue<WarpSyncFragment> fragmentsQueue;
Expand Down Expand Up @@ -62,22 +64,27 @@ public boolean isSyncing() {
return this.warpSyncAction.getClass() != FinishedAction.class;
}

public void start() {
LightSyncState initState = LightSyncState.decode(this.chainService.getChainSpec().getLightSyncState());

if (this.syncState.getLastFinalizedBlockNumber()
.compareTo(initState.getFinalizedBlockHeader().getBlockNumber()) < 0) {
this.syncState.setLightSyncState(initState);
public void start(boolean useRpc) {
if (!useRpc) {
LightSyncState initState = LightSyncState.decode(this.chainService.getChainSpec().getLightSyncState());

if (this.syncState.getLastFinalizedBlockNumber()
.compareTo(initState.getFinalizedBlockHeader().getBlockNumber()) < 0) {
this.syncState.setLightSyncState(initState);
}
System.out.println(this.syncState.getLastFinalizedBlockHash());
System.out.println(this.syncState.getLastFinalizedBlockNumber());

final Hash256 initStateHash = this.syncState.getLastFinalizedBlockHash();
// Always start with requesting fragments
System.out.println("Requesting fragments... " + initStateHash);
this.networkService.updateCurrentSelectedPeerWithNextBootnode();
this.warpSyncAction = new RequestFragmentsAction(initStateHash);
} else {
System.out.println("Warping via RPC... ");
this.networkService.updateCurrentSelectedPeerWithNextBootnode();
this.warpSyncAction = new RpcFallbackAction();
}
System.out.println(this.syncState.getLastFinalizedBlockHash());
System.out.println(this.syncState.getLastFinalizedBlockNumber());

final Hash256 initStateHash = this.syncState.getLastFinalizedBlockHash();

// Always start with requesting fragments
log.log(Level.INFO, "Requesting fragments... " + initStateHash);
this.networkService.updateCurrentSelectedPeerWithNextBootnode();
this.warpSyncAction = new RequestFragmentsAction(initStateHash);

// new Thread(() -> {
while (this.warpSyncAction.getClass() != FinishedAction.class) {
Expand All @@ -90,16 +97,17 @@ public void start() {
}

public void stop() {
log.info("Stopping warp sync machine");
System.out.println("Stopping warp sync machine");
this.warpSyncAction = null;
log.info("Warp sync machine stopped.");
System.out.println("Warp sync machine stopped.");
}

private void finishWarpSync() {
this.warpState.setWarpSyncFinished(true);
// this.networkService.handshakeBootNodes();
this.syncState.persistState();
log.info("Warp sync finished.");
System.out.println("Warp sync finished.");
log.log(Level.INFO, "Highest known block at #" + syncState.getLastFinalizedBlockNumber());
this.onFinishCallbacks.forEach(Runnable::run);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.limechain.sync.warpsync.action;

import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.network.protocol.warp.dto.HeaderDigest;
import com.limechain.network.protocol.warp.scale.reader.HeaderDigestReader;
import com.limechain.polkaj.Hash256;
import com.limechain.polkaj.reader.ScaleCodecReader;
import com.limechain.rpc.BlockRpcClient;
import com.limechain.rpc.dto.ChainGetHeaderResult;
import com.limechain.rpc.dto.GrandpaRoundStateResult;
import com.limechain.rpc.server.AppBean;
import com.limechain.storage.block.SyncState;
import com.limechain.sync.warpsync.WarpSyncMachine;
import com.limechain.utils.StringUtils;
import lombok.extern.java.Log;

import java.math.BigInteger;
import java.util.List;
import java.util.logging.Level;

@Log
public class RpcFallbackAction implements WarpSyncAction {
private final SyncState syncState;
private Exception error;

public RpcFallbackAction() {
this.syncState = AppBean.getBean(SyncState.class);
}

@Override
public void next(WarpSyncMachine sync) {
if (this.error != null) {
sync.setWarpSyncAction(new RequestFragmentsAction(syncState.getLastFinalizedBlockHash()));
return;
}

log.log(Level.INFO, "Populated sync state from RPC results. Block hash is now at #"
+ syncState.getLastFinalizedBlockNumber() + ": "
+ syncState.getLastFinalizedBlockHash().toString()
+ " with state root " + syncState.getStateRoot());

sync.setWarpSyncAction(new FinishedAction());
}

@Override
public void handle(WarpSyncMachine sync) {
try {
Hash256 latestFinalizedHashResult = BlockRpcClient.getLastFinalizedBlockHash();
ChainGetHeaderResult headerResult = BlockRpcClient.getHeader(latestFinalizedHashResult.toString());
GrandpaRoundStateResult roundStateResult = BlockRpcClient.getGrandpaRoundState();

BlockHeader latestFinalizedHeader = new BlockHeader();
latestFinalizedHeader.setBlockNumber(new BigInteger(
StringUtils.remove0xPrefix(headerResult.getNumber()), 16));
latestFinalizedHeader.setParentHash(Hash256.from(headerResult.getParentHash()));
latestFinalizedHeader.setStateRoot(Hash256.from(headerResult.getStateRoot()));
latestFinalizedHeader.setExtrinsicsRoot(Hash256.from(headerResult.getExtrinsicsRoot()));

List<String> digestHexes = headerResult.getDigest().getLogs();
HeaderDigest[] digests = new HeaderDigest[digestHexes.size()];
for (int i = 0; i < digestHexes.size(); i++) {
digests[i] = new HeaderDigestReader().read(
new ScaleCodecReader(StringUtils.hexToBytes(digestHexes.get(i))));
}
latestFinalizedHeader.setDigest(digests);

syncState.finalizeHeader(latestFinalizedHeader);
syncState.setSetId(BigInteger.valueOf(roundStateResult.getSetId()));
syncState.resetRound();

} catch (Exception e) {
log.log(Level.WARNING, "Error while calling rpc endpoints: " + e.getMessage());
this.error = e;
}
}
}
Loading

0 comments on commit 23aaff7

Please sign in to comment.