Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use handleUnaryRequest for replication server methods #769

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -694,9 +694,6 @@ public void custom(CustomRequest request, StreamObserver<CustomResponse> respons
}

static class ReplicationServerImpl extends ReplicationServerGrpc.ReplicationServerImplBase {
private final GlobalState globalState;
private final boolean verifyIndexId;

private final AddReplicaHandler addReplicaHandler;
private final CopyFilesHandler copyFilesHandler;
private final GetNodesInfoHandler getNodesInfoHandler;
Expand All @@ -709,8 +706,6 @@ static class ReplicationServerImpl extends ReplicationServerGrpc.ReplicationServ
private final WriteNRTPointHandler writeNRTPointHandler;

public ReplicationServerImpl(GlobalState globalState, boolean verifyIndexId) {
this.globalState = globalState;
this.verifyIndexId = verifyIndexId;

addReplicaHandler = new AddReplicaHandler(globalState, verifyIndexId);
copyFilesHandler = new CopyFilesHandler(globalState, verifyIndexId);
Expand All @@ -729,7 +724,8 @@ public ReplicationServerImpl(GlobalState globalState, boolean verifyIndexId) {
public void addReplicas(
AddReplicaRequest addReplicaRequest,
StreamObserver<AddReplicaResponse> responseStreamObserver) {
addReplicaHandler.handle(addReplicaRequest, responseStreamObserver);
Handler.handleUnaryRequest(
"addReplicas", addReplicaRequest, responseStreamObserver, addReplicaHandler);
}

@Override
Expand All @@ -753,7 +749,7 @@ public StreamObserver<FileInfo> recvRawFileV2(
@Override
public void recvCopyState(
CopyStateRequest request, StreamObserver<CopyState> responseObserver) {
recvCopyStateHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("recvCopyState", request, responseObserver, recvCopyStateHandler);
}

@Override
Expand All @@ -763,25 +759,31 @@ public void copyFiles(CopyFiles request, StreamObserver<TransferStatus> response

@Override
public void newNRTPoint(NewNRTPoint request, StreamObserver<TransferStatus> responseObserver) {
newNRTPointHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("newNRTPoint", request, responseObserver, newNRTPointHandler);
}

@Override
public void writeNRTPoint(
IndexName indexNameRequest, StreamObserver<SearcherVersion> responseObserver) {
writeNRTPointHandler.handle(indexNameRequest, responseObserver);
Handler.handleUnaryRequest(
"writeNRTPoint", indexNameRequest, responseObserver, writeNRTPointHandler);
}

@Override
public void getCurrentSearcherVersion(
IndexName indexNameRequest, StreamObserver<SearcherVersion> responseObserver) {
replicaCurrentSearchingVersionHandler.handle(indexNameRequest, responseObserver);
Handler.handleUnaryRequest(
"getCurrentSearcherVersion",
indexNameRequest,
responseObserver,
replicaCurrentSearchingVersionHandler);
}

@Override
public void getConnectedNodes(
GetNodesRequest getNodesRequest, StreamObserver<GetNodesResponse> responseObserver) {
getNodesInfoHandler.handle(getNodesRequest, responseObserver);
Handler.handleUnaryRequest(
"getConnectedNodes", getNodesRequest, responseObserver, getNodesInfoHandler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import com.yelp.nrtsearch.server.index.IndexStateManager;
import com.yelp.nrtsearch.server.index.ShardState;
import com.yelp.nrtsearch.server.state.GlobalState;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,31 +39,14 @@ public AddReplicaHandler(GlobalState globalState, boolean verifyIndexId) {
}

@Override
public void handle(
AddReplicaRequest addReplicaRequest, StreamObserver<AddReplicaResponse> responseObserver) {
try {
IndexStateManager indexStateManager =
getGlobalState().getIndexStateManagerOrThrow(addReplicaRequest.getIndexName());
checkIndexId(addReplicaRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);
public AddReplicaResponse handle(AddReplicaRequest addReplicaRequest) throws Exception {
IndexStateManager indexStateManager = getIndexStateManager(addReplicaRequest.getIndexName());
checkIndexId(addReplicaRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
AddReplicaResponse reply = handle(indexState, addReplicaRequest);
logger.info("AddReplicaHandler returned " + reply);
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e);
responseObserver.onError(e);
} catch (Exception e) {
logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
"error while trying to addReplicas for index: "
+ addReplicaRequest.getIndexName())
.augmentDescription(e.getMessage())
.asRuntimeException());
}
IndexState indexState = indexStateManager.getCurrent();
AddReplicaResponse reply = handle(indexState, addReplicaRequest);
logger.info("AddReplicaHandler returned {}", reply);
return reply;
}

private AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addReplicaRequest) {
Expand Down
42 changes: 18 additions & 24 deletions src/main/java/com/yelp/nrtsearch/server/handler/CommitHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
*/
package com.yelp.nrtsearch.server.handler;

import com.google.protobuf.InvalidProtocolBufferException;
import com.yelp.nrtsearch.server.grpc.CommitRequest;
import com.yelp.nrtsearch.server.grpc.CommitResponse;
import com.yelp.nrtsearch.server.index.IndexState;
import com.yelp.nrtsearch.server.state.GlobalState;
import com.yelp.nrtsearch.server.utils.ProtoMessagePrinter;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.RejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,45 +45,38 @@ public void handle(CommitRequest commitRequest, StreamObserver<CommitResponse> r
.wrap(
() -> {
try {
IndexState indexState =
getGlobalState().getIndexOrThrow(commitRequest.getIndexName());
IndexState indexState = getIndexState(commitRequest.getIndexName());
long gen = indexState.commit();
CommitResponse reply =
CommitResponse.newBuilder()
.setGen(gen)
.setPrimaryId(getGlobalState().getEphemeralId())
.build();
logger.debug(
String.format(
"CommitHandler committed to index: %s for sequenceId: %s",
commitRequest.getIndexName(), gen));
"CommitHandler committed to index: {} for sequenceId: {}",
commitRequest.getIndexName(),
gen);
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (IOException e) {
logger.warn(
"error while trying to read index state dir for indexName: "
+ commitRequest.getIndexName(),
e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
"error while trying to read index state dir for indexName: "
+ commitRequest.getIndexName())
.augmentDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
} catch (Exception e) {
String requestStr;
try {
requestStr =
ProtoMessagePrinter.omittingInsignificantWhitespace()
.print(commitRequest);
} catch (InvalidProtocolBufferException ignored) {
// Ignore as invalid proto would have thrown an exception earlier
requestStr = commitRequest.toString();
}
logger.warn(
"error while trying to commit to index "
+ commitRequest.getIndexName(),
e);
String.format("Error handling commit request: %s", requestStr), e);
if (e instanceof StatusRuntimeException) {
responseObserver.onError(e);
} else {
responseObserver.onError(
Status.UNKNOWN
Status.INTERNAL
.withDescription(
"error while trying to commit to index: "
"Error while trying to commit to index: "
+ commitRequest.getIndexName())
.augmentDescription(e.getMessage())
.asRuntimeException());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.yelp.nrtsearch.server.handler;

import com.google.protobuf.InvalidProtocolBufferException;
import com.yelp.nrtsearch.server.grpc.CopyFiles;
import com.yelp.nrtsearch.server.grpc.TransferStatus;
import com.yelp.nrtsearch.server.grpc.TransferStatusCode;
Expand All @@ -24,6 +25,7 @@
import com.yelp.nrtsearch.server.monitoring.NrtMetrics;
import com.yelp.nrtsearch.server.nrt.NRTReplicaNode;
import com.yelp.nrtsearch.server.state.GlobalState;
import com.yelp.nrtsearch.server.utils.ProtoMessagePrinter;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
Expand All @@ -49,31 +51,33 @@ public CopyFilesHandler(GlobalState globalState, boolean verifyIndexId) {
@Override
public void handle(CopyFiles request, StreamObserver<TransferStatus> responseObserver) {
try {
IndexStateManager indexStateManager =
getGlobalState().getIndexStateManagerOrThrow(request.getIndexName());
IndexStateManager indexStateManager = getIndexStateManager(request.getIndexName());
checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
// we need to send multiple responses to client from this method
handle(indexState, request, responseObserver);
logger.info("CopyFilesHandler returned successfully");
} catch (StatusRuntimeException e) {
logger.warn("error while trying copyFiles " + request.getIndexName(), e);
responseObserver.onError(e);
} catch (Exception e) {
logger.warn(
String.format(
"error on copyFiles for primaryGen: %s, for index: %s",
request.getPrimaryGen(), request.getIndexName()),
e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
String.format(
"error on copyFiles for primaryGen: %s, for index: %s",
request.getPrimaryGen(), request.getIndexName()))
.augmentDescription(e.getMessage())
.asRuntimeException());
String requestStr;
try {
requestStr = ProtoMessagePrinter.omittingInsignificantWhitespace().print(request);
} catch (InvalidProtocolBufferException ignored) {
// Ignore as invalid proto would have thrown an exception earlier
requestStr = request.toString();
}
logger.warn(String.format("Error handling copyFiles request: %s", requestStr), e);
if (e instanceof StatusRuntimeException) {
responseObserver.onError(e);
} else {
responseObserver.onError(
Status.INTERNAL
.withDescription(
String.format(
"Error on copyFiles for primaryGen: %s, for index: %s",
request.getPrimaryGen(), request.getIndexName()))
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public DeleteByQueryHandler(GlobalState globalState) {

@Override
public AddDocumentResponse handle(DeleteByQueryRequest deleteByQueryRequest) throws Exception {
IndexState indexState = getGlobalState().getIndexOrThrow(deleteByQueryRequest.getIndexName());
IndexState indexState = getIndexState(deleteByQueryRequest.getIndexName());
AddDocumentResponse reply = handle(indexState, deleteByQueryRequest);
logger.debug("DeleteDocumentsHandler returned " + reply);
logger.debug("DeleteDocumentsHandler returned {}", reply);
return reply;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public DeleteDocumentsHandler(GlobalState globalState) {

@Override
public AddDocumentResponse handle(AddDocumentRequest addDocumentRequest) throws Exception {
IndexState indexState = getGlobalState().getIndexOrThrow(addDocumentRequest.getIndexName());
IndexState indexState = getIndexState(addDocumentRequest.getIndexName());
AddDocumentResponse reply = handleInternal(indexState, addDocumentRequest);
logger.debug("DeleteDocumentsHandler returned {}", reply);
return reply;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public DeleteIndexResponse handle(DeleteIndexRequest deleteIndexRequest) throws
logger.info("Received delete index request: {}", deleteIndexRequest);
IndexState indexState = getIndexState(deleteIndexRequest.getIndexName());
DeleteIndexResponse reply = handle(indexState);
logger.info("DeleteIndexHandler returned " + reply);
logger.info("DeleteIndexHandler returned {}", reply);
return reply;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public ForceMergeDeletesResponse handle(ForceMergeDeletesRequest forceMergeReque
}

try {
IndexState indexState = getGlobalState().getIndexOrThrow(forceMergeRequest.getIndexName());
IndexState indexState = getIndexState(forceMergeRequest.getIndexName());
ShardState shardState = indexState.getShards().get(0);
logger.info("Beginning force merge deletes for index: {}", forceMergeRequest.getIndexName());
shardState.writer.forceMergeDeletes(forceMergeRequest.getDoWait());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public ForceMergeResponse handle(ForceMergeRequest forceMergeRequest) throws Exc
throw new IllegalArgumentException("Cannot have 0 max segments");
}

IndexState indexState = getGlobalState().getIndexOrThrow(forceMergeRequest.getIndexName());
IndexState indexState = getIndexState(forceMergeRequest.getIndexName());
ShardState shardState = indexState.getShards().get(0);
logger.info("Beginning force merge for index: {}", forceMergeRequest.getIndexName());
shardState.writer.forceMerge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ public GetAllSnapshotIndexGenHandler(GlobalState globalState) {
@Override
public GetAllSnapshotGenResponse handle(GetAllSnapshotGenRequest request) throws Exception {
Set<Long> snapshotGens =
getGlobalState()
.getIndexOrThrow(request.getIndexName())
.getShard(0)
.snapshotGenToVersion
.keySet();
getIndexState(request.getIndexName()).getShard(0).snapshotGenToVersion.keySet();
return GetAllSnapshotGenResponse.newBuilder().addAllIndexGens(snapshotGens).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.yelp.nrtsearch.server.nrt.NRTPrimaryNode;
import com.yelp.nrtsearch.server.state.GlobalState;
import com.yelp.nrtsearch.server.utils.HostPort;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,29 +35,18 @@ public GetNodesInfoHandler(GlobalState globalState) {
}

@Override
public void handle(
GetNodesRequest getNodesRequest, StreamObserver<GetNodesResponse> responseObserver) {
try {
IndexState indexState = getGlobalState().getIndexOrThrow(getNodesRequest.getIndexName());
GetNodesResponse reply = handle(indexState);
logger.debug("GetNodesInfoHandler returned GetNodeResponse of size " + reply.getNodesCount());
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Exception e) {
logger.warn("error on GetNodesInfoHandler", e);
responseObserver.onError(
Status.INTERNAL
.withDescription("error on GetNodesInfoHandler")
.augmentDescription(e.getMessage())
.asRuntimeException());
}
public GetNodesResponse handle(GetNodesRequest getNodesRequest) throws Exception {
IndexState indexState = getIndexState(getNodesRequest.getIndexName());
GetNodesResponse reply = handle(indexState);
logger.debug("GetNodesInfoHandler returned GetNodeResponse of size {}", reply.getNodesCount());
return reply;
}

private GetNodesResponse handle(IndexState indexState) {
GetNodesResponse.Builder builder = GetNodesResponse.newBuilder();
ShardState shardState = indexState.getShard(0);
if (!shardState.isPrimary() || !shardState.isStarted()) {
logger.warn("index \"" + indexState.getName() + "\" is not a primary or was not started yet");
logger.warn("index \"{}\" is not a primary or was not started yet", indexState.getName());
} else { // shard is a primary and started
Collection<NRTPrimaryNode.ReplicaDetails> replicasInfo =
shardState.nrtPrimaryNode.getNodesInfo();
Expand Down
Loading
Loading