Skip to content

Commit

Permalink
Use nodeName instead of replicaId
Browse files Browse the repository at this point in the history
  • Loading branch information
aprudhomme committed Oct 22, 2024
1 parent 287dd5f commit 820af12
Show file tree
Hide file tree
Showing 12 changed files with 695 additions and 631 deletions.
5 changes: 4 additions & 1 deletion clientlib/src/main/proto/yelp/nrtsearch/luceneserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -917,11 +917,13 @@ message StateResponse {
message AddReplicaRequest {
int32 magicNumber = 1; //magic number send on all requests since these are meant for internal communication only
string indexName = 2; //index name
int32 replicaId = 3; //replica Id
int32 replicaId = 3 [deprecated = true]; //replica Id
string hostName = 4; // replica host name
int32 port = 5; // replica port number
// index id
string indexId = 6;
// node name
string nodeName = 7;
}

message AddReplicaResponse {
Expand Down Expand Up @@ -1046,6 +1048,7 @@ message GetNodesResponse {
message NodeInfo {
string hostname = 1; //name or ip address of the remote host that this node is connected to for binary replication
int32 port = 2; //port number of the remote host that this node is connected to for binary replication
string nodeName = 3; //name of the remote node
}

message DeleteByQueryRequest {
Expand Down
1,203 changes: 613 additions & 590 deletions grpc-gateway/luceneserver.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions grpc-gateway/luceneserver.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -4142,6 +4142,9 @@
"port": {
"type": "integer",
"format": "int32"
},
"nodeName": {
"type": "string"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public NrtsearchConfig(InputStream yamlStream) {
configReader.getLong("maxConnectionAgeForReplication", AS_LARGE_AS_INFINITE);
maxConnectionAgeGraceForReplication =
configReader.getLong("maxConnectionAgeGraceForReplication", AS_LARGE_AS_INFINITE);
nodeName = configReader.getString("nodeName", DEFAULT_NODE_NAME);
nodeName = substituteEnvVariables(configReader.getString("nodeName", DEFAULT_NODE_NAME));
hostName = substituteEnvVariables(configReader.getString("hostName", DEFAULT_HOSTNAME));
stateDir = configReader.getString("stateDir", DEFAULT_STATE_DIR.toString());
indexDir = configReader.getString("indexDir", DEFAULT_INDEX_DIR.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,13 @@ private void shutdown() throws InterruptedException {
}

public void addReplicas(
String indexName, String indexId, int replicaId, String hostName, int port) {
String indexName, String indexId, String nodeName, String hostName, int port) {
AddReplicaRequest addReplicaRequest =
AddReplicaRequest.newBuilder()
.setMagicNumber(BINARY_MAGIC)
.setIndexName(indexName)
.setIndexId(indexId)
.setReplicaId(replicaId)
.setNodeName(nodeName)
.setHostName(hostName)
.setPort(port)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addRe
}
try {
shardState.nrtPrimaryNode.addReplica(
addReplicaRequest.getReplicaId(),
addReplicaRequest.getNodeName(),
// channel for primary to talk to replica
new ReplicationServerClient(
addReplicaRequest.getHostName(), addReplicaRequest.getPort(), useKeepAlive));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ private GetNodesResponse handle(IndexState indexState) {
NodeInfo.newBuilder()
.setHostname(hostPort.getHostName())
.setPort(hostPort.getPort())
.setNodeName(replica.getNodeName())
.build());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private SearcherVersion handle(IndexState indexState, String indexId) {
Iterator<NRTPrimaryNode.ReplicaDetails> it = replicasInfos.iterator();
while (it.hasNext()) {
NRTPrimaryNode.ReplicaDetails replicaDetails = it.next();
int replicaID = replicaDetails.getReplicaId();
String nodeName = replicaDetails.getNodeName();
ReplicationServerClient currentReplicaServerClient =
replicaDetails.getReplicationServerClient();
try {
Expand All @@ -89,8 +89,8 @@ private SearcherVersion handle(IndexState indexState, String indexId) {
Status status = e.getStatus();
if (status.getCode().equals(Status.UNAVAILABLE.getCode())) {
logger.info(
"NRTPRimaryNode: sendNRTPoint, lost connection to replicaId: "
+ replicaDetails.getReplicaId()
"NRTPRimaryNode: sendNRTPoint, lost connection to nodeName: "
+ nodeName
+ " host: "
+ replicaDetails.getHostPort().getHostName()
+ " port: "
Expand All @@ -99,13 +99,13 @@ private SearcherVersion handle(IndexState indexState, String indexId) {
}
} catch (Exception e) {
shardState.nrtPrimaryNode.message(
"top: failed to connect R"
+ replicaID
"top: failed to connect "
+ nodeName
+ " for newNRTPoint; skipping: "
+ e.getMessage());
logger.info(
"top: failed to connect R"
+ replicaID
"top: failed to connect "
+ nodeName
+ " for newNRTPoint; skipping: "
+ e.getMessage());
}
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/com/yelp/nrtsearch/server/index/ShardState.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@

public class ShardState implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(ShardState.class);
public static final int REPLICA_ID = 0;
public static final String INDEX_DATA_DIR_NAME = "index";
public static final String TAXONOMY_DATA_DIR_NAME = "taxonomy";
final ExecutorService searchExecutor;
Expand Down Expand Up @@ -980,7 +979,7 @@ public synchronized void startReplica(
indexStateManager.getIndexId(),
primaryAddress,
hostPort,
REPLICA_ID,
configuration.getNodeName(),
indexDir,
new ShardSearcherFactory(true, false),
verbose ? System.out : new PrintStream(OutputStream.nullOutputStream()),
Expand Down Expand Up @@ -1115,7 +1114,7 @@ public void run() {
.addReplicas(
shardState.indexStateManager.getCurrent().getName(),
shardState.indexStateManager.getIndexId(),
REPLICA_ID,
nrtReplicaNode.getNodeName(),
nrtReplicaNode.getHostPort().getHostName(),
nrtReplicaNode.getHostPort().getPort());
}
Expand Down
69 changes: 48 additions & 21 deletions src/main/java/com/yelp/nrtsearch/server/nrt/NRTPrimaryNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ public NrtDataManager getNrtDataManager() {
}

public static class ReplicaDetails {
private final int replicaId;
private final String nodeName;
private final HostPort hostPort;
private final ReplicationServerClient replicationServerClient;

public int getReplicaId() {
return replicaId;
public String getNodeName() {
return nodeName;
}

public ReplicationServerClient getReplicationServerClient() {
Expand All @@ -108,9 +108,9 @@ public HostPort getHostPort() {
return hostPort;
}

ReplicaDetails(int replicaId, ReplicationServerClient replicationServerClient) {
this.replicaId = replicaId;
ReplicaDetails(String nodeName, ReplicationServerClient replicationServerClient) {
this.replicationServerClient = replicationServerClient;
this.nodeName = nodeName;
this.hostPort =
new HostPort(replicationServerClient.getHost(), replicationServerClient.getPort());
}
Expand All @@ -124,7 +124,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReplicaDetails that = (ReplicaDetails) o;
return replicaId == that.replicaId && Objects.equals(hostPort, that.hostPort);
return Objects.equals(nodeName, that.nodeName) && Objects.equals(hostPort, that.hostPort);
}

/*
Expand All @@ -133,7 +133,7 @@ public boolean equals(Object o) {
* */
@Override
public int hashCode() {
return Objects.hash(replicaId, hostPort);
return Objects.hash(nodeName, hostPort);
}
}

Expand Down Expand Up @@ -207,7 +207,7 @@ void sendNewNRTPointToReplicas() {
Iterator<ReplicaDetails> it = replicasInfos.iterator();
while (it.hasNext()) {
ReplicaDetails replicaDetails = it.next();
int replicaID = replicaDetails.replicaId;
String nodeName = replicaDetails.getNodeName();
ReplicationServerClient currentReplicaServerClient = replicaDetails.replicationServerClient;
try {
currentReplicaServerClient.newNRTPoint(
Expand All @@ -216,17 +216,17 @@ void sendNewNRTPointToReplicas() {
Status status = e.getStatus();
if (status.getCode().equals(Status.UNAVAILABLE.getCode())) {
logger.warn(
"NRTPrimaryNode: sendNRTPoint, lost connection to replicaId: {} host: {} port: {}",
replicaDetails.replicaId,
"NRTPrimaryNode: sendNRTPoint, lost connection to nodeName: {} host: {} port: {}",
nodeName,
replicaDetails.replicationServerClient.getHost(),
replicaDetails.replicationServerClient.getPort());
currentReplicaServerClient.close();
it.remove();
} else if (status.getCode().equals(Status.FAILED_PRECONDITION.getCode())
|| status.getCode().equals(Status.NOT_FOUND.getCode())) {
logger.warn(
"NRTPrimaryNode: sendNRTPoint, replicaId: {} host: {} port: {} cannot process nrt point, closing connection: {}",
replicaDetails.replicaId,
"NRTPrimaryNode: sendNRTPoint, nodeName: {} host: {} port: {} cannot process nrt point, closing connection: {}",
nodeName,
replicaDetails.replicationServerClient.getHost(),
replicaDetails.replicationServerClient.getPort(),
status);
Expand All @@ -236,8 +236,8 @@ void sendNewNRTPointToReplicas() {
} catch (Exception e) {
String msg =
String.format(
"top: failed to connect R%d for newNRTPoint; skipping: %s",
replicaID, e.getMessage());
"top: failed to connect %s for newNRTPoint; skipping: %s",
nodeName, e.getMessage());
message(msg);
logger.warn(msg);
}
Expand Down Expand Up @@ -476,11 +476,34 @@ public void setRAMBufferSizeMB(double mb) {
writer.getConfig().setRAMBufferSizeMB(mb);
}

public void addReplica(int replicaID, ReplicationServerClient replicationServerClient)
public void addReplica(String nodeName, ReplicationServerClient replicationServerClient)
throws IOException {
logMessage("add replica: " + warmingSegments.size() + " current warming merges ");
ReplicaDetails replicaDetails = new ReplicaDetails(replicaID, replicationServerClient);
ReplicaDetails replicaDetails = new ReplicaDetails(nodeName, replicationServerClient);
logMessage(
String.format(
"Add replica %s (%s:%d) : %d current warming merges ",
replicaDetails.nodeName,
replicaDetails.replicationServerClient.getHost(),
replicaDetails.replicationServerClient.getPort(),
warmingSegments.size()));
if (!replicasInfos.contains(replicaDetails)) {
Iterator<ReplicaDetails> it = replicasInfos.iterator();
while (it.hasNext()) {
ReplicaDetails existingReplicaDetails = it.next();
// This replica may have reused the address of a previous one that has not been cleaned up
// yet. We should remove any existing replica with the same address to avoid duplicate
// requests.
if (existingReplicaDetails.hostPort.equals(replicaDetails.hostPort)) {
logMessage(
String.format(
"Removing existing replica with same address %s (%s:%d)",
existingReplicaDetails.nodeName,
existingReplicaDetails.hostPort.getHostName(),
existingReplicaDetails.hostPort.getPort()));
existingReplicaDetails.replicationServerClient.close();
it.remove();
}
}
replicasInfos.add(replicaDetails);
}
// Step through all currently warming segments and try to add this replica if it isn't there
Expand All @@ -492,7 +515,8 @@ public void addReplica(int replicaID, ReplicationServerClient replicationServerC
if (preCopy.connections.contains(replicationServerClient)) {
logMessage(
String.format(
"Replica %s:%d is already warming this segment",
"Replica %s (%s:%d) is already warming this segment",
replicaDetails.nodeName,
replicaDetails.replicationServerClient.getHost(),
replicaDetails.replicationServerClient.getPort()));
// It's possible (maybe) that the replica started up, then a merge kicked off, and it
Expand All @@ -513,7 +537,8 @@ public void addReplica(int replicaID, ReplicationServerClient replicationServerC
filesMetadata)) {
logMessage(
String.format(
"Start precopying merged segments for new replica %s:%d",
"Start precopying merged segments for new replica %s (%s:%d)",
replicaDetails.nodeName,
replicaDetails.replicationServerClient.getHost(),
replicaDetails.replicationServerClient.getPort()));
} else {
Expand All @@ -522,7 +547,8 @@ public void addReplica(int replicaID, ReplicationServerClient replicationServerC
// nrt point sent to this replica
logMessage(
String.format(
"Merge precopy already completed, unable to add new replica %s:%d",
"Merge precopy already completed, unable to add new replica %s (%s:%d)",
replicaDetails.nodeName,
replicaDetails.replicationServerClient.getHost(),
replicaDetails.replicationServerClient.getPort()));
}
Expand All @@ -543,7 +569,8 @@ public void close() throws IOException {
ReplicationServerClient replicationServerClient = replicaDetails.getReplicationServerClient();
HostPort replicaHostPort = replicaDetails.getHostPort();
logger.info(
"CLOSE NRT PRIMARY, closing replica channel host:{}, port:{}",
"CLOSE NRT PRIMARY, closing replica channel nodeName: {} host:{}, port:{}",
replicaDetails.getNodeName(),
replicaHostPort.getHostName(),
replicaHostPort.getPort());
replicationServerClient.close();
Expand Down
16 changes: 12 additions & 4 deletions src/main/java/com/yelp/nrtsearch/server/nrt/NRTReplicaNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class NRTReplicaNode extends ReplicaNode {
private final ReplicaDeleterManager replicaDeleterManager;
private final String indexName;
private final String indexId;
private final String nodeName;
private final boolean ackedCopy;
private final boolean filterIncompatibleSegmentReaders;
final NrtCopyThread nrtCopyThread;
Expand All @@ -62,7 +63,7 @@ public NRTReplicaNode(
String indexId,
ReplicationServerClient primaryAddress,
HostPort hostPort,
int replicaId,
String nodeName,
Directory indexDir,
SearcherFactory searcherFactory,
PrintStream printStream,
Expand All @@ -71,10 +72,12 @@ public NRTReplicaNode(
boolean filterIncompatibleSegmentReaders,
int lowPriorityCopyPercentage)
throws IOException {
super(replicaId, indexDir, searcherFactory, printStream);
// the id is always 0, the nodeName is the identifier
super(0, indexDir, searcherFactory, printStream);
this.primaryAddress = primaryAddress;
this.indexName = indexName;
this.indexId = indexId;
this.nodeName = nodeName;
this.ackedCopy = ackedCopy;
this.hostPort = hostPort;
replicaDeleterManager = decInitialCommit ? new ReplicaDeleterManager(this) : null;
Expand Down Expand Up @@ -232,7 +235,7 @@ protected void launch(CopyJob job) {
protected void sendNewReplica() throws IOException {
logger.info(String.format("send new_replica to primary: %s", primaryAddress));
primaryAddress.addReplicas(
indexName, this.indexId, this.id, hostPort.getHostName(), hostPort.getPort());
indexName, this.indexId, this.nodeName, hostPort.getHostName(), hostPort.getPort());
}

public CopyJob launchPreCopyFiles(
Expand Down Expand Up @@ -281,6 +284,10 @@ public ReplicationServerClient getPrimaryAddress() {
return primaryAddress;
}

public String getNodeName() {
return nodeName;
}

public HostPort getHostPort() {
return hostPort;
}
Expand All @@ -290,7 +297,8 @@ public HostPort getHostPort() {
public boolean isKnownToPrimary() {
GetNodesResponse getNodesResponse = primaryAddress.getConnectedNodes(indexName);
for (NodeInfo nodeInfo : getNodesResponse.getNodesList()) {
if (hostPort.equals(new HostPort(nodeInfo.getHostname(), nodeInfo.getPort()))) {
if (nodeName.equals(nodeInfo.getNodeName())
&& hostPort.equals(new HostPort(nodeInfo.getHostname(), nodeInfo.getPort()))) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ public void registerWithPrimary(String indexName, long timeoutMs) throws IOExcep
.setMagicNumber(BINARY_MAGIC)
.setIndexName(indexName)
.setIndexId(indexStateManager.getIndexId())
.setReplicaId(ShardState.REPLICA_ID)
.setNodeName(getGlobalState().getNodeName())
.setHostName("localhost")
.setPort(getGlobalState().getReplicationPort())
.build();
Expand Down

0 comments on commit 820af12

Please sign in to comment.