Skip to content

Commit

Permalink
Use handleUnaryRequest for replication server methods (#769)
Browse files Browse the repository at this point in the history
  • Loading branch information
aprudhomme authored Oct 21, 2024
1 parent 6849a4e commit a83cf2f
Show file tree
Hide file tree
Showing 28 changed files with 130 additions and 294 deletions.
24 changes: 13 additions & 11 deletions src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -694,9 +694,6 @@ public void custom(CustomRequest request, StreamObserver<CustomResponse> respons
}

static class ReplicationServerImpl extends ReplicationServerGrpc.ReplicationServerImplBase {
private final GlobalState globalState;
private final boolean verifyIndexId;

private final AddReplicaHandler addReplicaHandler;
private final CopyFilesHandler copyFilesHandler;
private final GetNodesInfoHandler getNodesInfoHandler;
Expand All @@ -709,8 +706,6 @@ static class ReplicationServerImpl extends ReplicationServerGrpc.ReplicationServ
private final WriteNRTPointHandler writeNRTPointHandler;

public ReplicationServerImpl(GlobalState globalState, boolean verifyIndexId) {
this.globalState = globalState;
this.verifyIndexId = verifyIndexId;

addReplicaHandler = new AddReplicaHandler(globalState, verifyIndexId);
copyFilesHandler = new CopyFilesHandler(globalState, verifyIndexId);
Expand All @@ -729,7 +724,8 @@ public ReplicationServerImpl(GlobalState globalState, boolean verifyIndexId) {
public void addReplicas(
AddReplicaRequest addReplicaRequest,
StreamObserver<AddReplicaResponse> responseStreamObserver) {
addReplicaHandler.handle(addReplicaRequest, responseStreamObserver);
Handler.handleUnaryRequest(
"addReplicas", addReplicaRequest, responseStreamObserver, addReplicaHandler);
}

@Override
Expand All @@ -753,7 +749,7 @@ public StreamObserver<FileInfo> recvRawFileV2(
@Override
public void recvCopyState(
CopyStateRequest request, StreamObserver<CopyState> responseObserver) {
recvCopyStateHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("recvCopyState", request, responseObserver, recvCopyStateHandler);
}

@Override
Expand All @@ -763,25 +759,31 @@ public void copyFiles(CopyFiles request, StreamObserver<TransferStatus> response

@Override
public void newNRTPoint(NewNRTPoint request, StreamObserver<TransferStatus> responseObserver) {
newNRTPointHandler.handle(request, responseObserver);
Handler.handleUnaryRequest("newNRTPoint", request, responseObserver, newNRTPointHandler);
}

@Override
public void writeNRTPoint(
IndexName indexNameRequest, StreamObserver<SearcherVersion> responseObserver) {
writeNRTPointHandler.handle(indexNameRequest, responseObserver);
Handler.handleUnaryRequest(
"writeNRTPoint", indexNameRequest, responseObserver, writeNRTPointHandler);
}

@Override
public void getCurrentSearcherVersion(
IndexName indexNameRequest, StreamObserver<SearcherVersion> responseObserver) {
replicaCurrentSearchingVersionHandler.handle(indexNameRequest, responseObserver);
Handler.handleUnaryRequest(
"getCurrentSearcherVersion",
indexNameRequest,
responseObserver,
replicaCurrentSearchingVersionHandler);
}

@Override
public void getConnectedNodes(
GetNodesRequest getNodesRequest, StreamObserver<GetNodesResponse> responseObserver) {
getNodesInfoHandler.handle(getNodesRequest, responseObserver);
Handler.handleUnaryRequest(
"getConnectedNodes", getNodesRequest, responseObserver, getNodesInfoHandler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import com.yelp.nrtsearch.server.index.IndexStateManager;
import com.yelp.nrtsearch.server.index.ShardState;
import com.yelp.nrtsearch.server.state.GlobalState;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,31 +39,14 @@ public AddReplicaHandler(GlobalState globalState, boolean verifyIndexId) {
}

@Override
public void handle(
AddReplicaRequest addReplicaRequest, StreamObserver<AddReplicaResponse> responseObserver) {
try {
IndexStateManager indexStateManager =
getGlobalState().getIndexStateManagerOrThrow(addReplicaRequest.getIndexName());
checkIndexId(addReplicaRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);
public AddReplicaResponse handle(AddReplicaRequest addReplicaRequest) throws Exception {
IndexStateManager indexStateManager = getIndexStateManager(addReplicaRequest.getIndexName());
checkIndexId(addReplicaRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
AddReplicaResponse reply = handle(indexState, addReplicaRequest);
logger.info("AddReplicaHandler returned " + reply);
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e);
responseObserver.onError(e);
} catch (Exception e) {
logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
"error while trying to addReplicas for index: "
+ addReplicaRequest.getIndexName())
.augmentDescription(e.getMessage())
.asRuntimeException());
}
IndexState indexState = indexStateManager.getCurrent();
AddReplicaResponse reply = handle(indexState, addReplicaRequest);
logger.info("AddReplicaHandler returned {}", reply);
return reply;
}

private AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addReplicaRequest) {
Expand Down
42 changes: 18 additions & 24 deletions src/main/java/com/yelp/nrtsearch/server/handler/CommitHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
*/
package com.yelp.nrtsearch.server.handler;

import com.google.protobuf.InvalidProtocolBufferException;
import com.yelp.nrtsearch.server.grpc.CommitRequest;
import com.yelp.nrtsearch.server.grpc.CommitResponse;
import com.yelp.nrtsearch.server.index.IndexState;
import com.yelp.nrtsearch.server.state.GlobalState;
import com.yelp.nrtsearch.server.utils.ProtoMessagePrinter;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.RejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,45 +45,38 @@ public void handle(CommitRequest commitRequest, StreamObserver<CommitResponse> r
.wrap(
() -> {
try {
IndexState indexState =
getGlobalState().getIndexOrThrow(commitRequest.getIndexName());
IndexState indexState = getIndexState(commitRequest.getIndexName());
long gen = indexState.commit();
CommitResponse reply =
CommitResponse.newBuilder()
.setGen(gen)
.setPrimaryId(getGlobalState().getEphemeralId())
.build();
logger.debug(
String.format(
"CommitHandler committed to index: %s for sequenceId: %s",
commitRequest.getIndexName(), gen));
"CommitHandler committed to index: {} for sequenceId: {}",
commitRequest.getIndexName(),
gen);
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (IOException e) {
logger.warn(
"error while trying to read index state dir for indexName: "
+ commitRequest.getIndexName(),
e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
"error while trying to read index state dir for indexName: "
+ commitRequest.getIndexName())
.augmentDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
} catch (Exception e) {
String requestStr;
try {
requestStr =
ProtoMessagePrinter.omittingInsignificantWhitespace()
.print(commitRequest);
} catch (InvalidProtocolBufferException ignored) {
// Ignore as invalid proto would have thrown an exception earlier
requestStr = commitRequest.toString();
}
logger.warn(
"error while trying to commit to index "
+ commitRequest.getIndexName(),
e);
String.format("Error handling commit request: %s", requestStr), e);
if (e instanceof StatusRuntimeException) {
responseObserver.onError(e);
} else {
responseObserver.onError(
Status.UNKNOWN
Status.INTERNAL
.withDescription(
"error while trying to commit to index: "
"Error while trying to commit to index: "
+ commitRequest.getIndexName())
.augmentDescription(e.getMessage())
.asRuntimeException());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.yelp.nrtsearch.server.handler;

import com.google.protobuf.InvalidProtocolBufferException;
import com.yelp.nrtsearch.server.grpc.CopyFiles;
import com.yelp.nrtsearch.server.grpc.TransferStatus;
import com.yelp.nrtsearch.server.grpc.TransferStatusCode;
Expand All @@ -24,6 +25,7 @@
import com.yelp.nrtsearch.server.monitoring.NrtMetrics;
import com.yelp.nrtsearch.server.nrt.NRTReplicaNode;
import com.yelp.nrtsearch.server.state.GlobalState;
import com.yelp.nrtsearch.server.utils.ProtoMessagePrinter;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
Expand All @@ -49,31 +51,33 @@ public CopyFilesHandler(GlobalState globalState, boolean verifyIndexId) {
@Override
public void handle(CopyFiles request, StreamObserver<TransferStatus> responseObserver) {
try {
IndexStateManager indexStateManager =
getGlobalState().getIndexStateManagerOrThrow(request.getIndexName());
IndexStateManager indexStateManager = getIndexStateManager(request.getIndexName());
checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
// we need to send multiple responses to client from this method
handle(indexState, request, responseObserver);
logger.info("CopyFilesHandler returned successfully");
} catch (StatusRuntimeException e) {
logger.warn("error while trying copyFiles " + request.getIndexName(), e);
responseObserver.onError(e);
} catch (Exception e) {
logger.warn(
String.format(
"error on copyFiles for primaryGen: %s, for index: %s",
request.getPrimaryGen(), request.getIndexName()),
e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
String.format(
"error on copyFiles for primaryGen: %s, for index: %s",
request.getPrimaryGen(), request.getIndexName()))
.augmentDescription(e.getMessage())
.asRuntimeException());
String requestStr;
try {
requestStr = ProtoMessagePrinter.omittingInsignificantWhitespace().print(request);
} catch (InvalidProtocolBufferException ignored) {
// Ignore as invalid proto would have thrown an exception earlier
requestStr = request.toString();
}
logger.warn(String.format("Error handling copyFiles request: %s", requestStr), e);
if (e instanceof StatusRuntimeException) {
responseObserver.onError(e);
} else {
responseObserver.onError(
Status.INTERNAL
.withDescription(
String.format(
"Error on copyFiles for primaryGen: %s, for index: %s",
request.getPrimaryGen(), request.getIndexName()))
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public DeleteByQueryHandler(GlobalState globalState) {

@Override
public AddDocumentResponse handle(DeleteByQueryRequest deleteByQueryRequest) throws Exception {
IndexState indexState = getGlobalState().getIndexOrThrow(deleteByQueryRequest.getIndexName());
IndexState indexState = getIndexState(deleteByQueryRequest.getIndexName());
AddDocumentResponse reply = handle(indexState, deleteByQueryRequest);
logger.debug("DeleteDocumentsHandler returned " + reply);
logger.debug("DeleteDocumentsHandler returned {}", reply);
return reply;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public DeleteDocumentsHandler(GlobalState globalState) {

@Override
public AddDocumentResponse handle(AddDocumentRequest addDocumentRequest) throws Exception {
IndexState indexState = getGlobalState().getIndexOrThrow(addDocumentRequest.getIndexName());
IndexState indexState = getIndexState(addDocumentRequest.getIndexName());
AddDocumentResponse reply = handleInternal(indexState, addDocumentRequest);
logger.debug("DeleteDocumentsHandler returned {}", reply);
return reply;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public DeleteIndexResponse handle(DeleteIndexRequest deleteIndexRequest) throws
logger.info("Received delete index request: {}", deleteIndexRequest);
IndexState indexState = getIndexState(deleteIndexRequest.getIndexName());
DeleteIndexResponse reply = handle(indexState);
logger.info("DeleteIndexHandler returned " + reply);
logger.info("DeleteIndexHandler returned {}", reply);
return reply;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public ForceMergeDeletesResponse handle(ForceMergeDeletesRequest forceMergeReque
}

try {
IndexState indexState = getGlobalState().getIndexOrThrow(forceMergeRequest.getIndexName());
IndexState indexState = getIndexState(forceMergeRequest.getIndexName());
ShardState shardState = indexState.getShards().get(0);
logger.info("Beginning force merge deletes for index: {}", forceMergeRequest.getIndexName());
shardState.writer.forceMergeDeletes(forceMergeRequest.getDoWait());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public ForceMergeResponse handle(ForceMergeRequest forceMergeRequest) throws Exc
throw new IllegalArgumentException("Cannot have 0 max segments");
}

IndexState indexState = getGlobalState().getIndexOrThrow(forceMergeRequest.getIndexName());
IndexState indexState = getIndexState(forceMergeRequest.getIndexName());
ShardState shardState = indexState.getShards().get(0);
logger.info("Beginning force merge for index: {}", forceMergeRequest.getIndexName());
shardState.writer.forceMerge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ public GetAllSnapshotIndexGenHandler(GlobalState globalState) {
@Override
public GetAllSnapshotGenResponse handle(GetAllSnapshotGenRequest request) throws Exception {
Set<Long> snapshotGens =
getGlobalState()
.getIndexOrThrow(request.getIndexName())
.getShard(0)
.snapshotGenToVersion
.keySet();
getIndexState(request.getIndexName()).getShard(0).snapshotGenToVersion.keySet();
return GetAllSnapshotGenResponse.newBuilder().addAllIndexGens(snapshotGens).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.yelp.nrtsearch.server.nrt.NRTPrimaryNode;
import com.yelp.nrtsearch.server.state.GlobalState;
import com.yelp.nrtsearch.server.utils.HostPort;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,29 +35,18 @@ public GetNodesInfoHandler(GlobalState globalState) {
}

@Override
public void handle(
GetNodesRequest getNodesRequest, StreamObserver<GetNodesResponse> responseObserver) {
try {
IndexState indexState = getGlobalState().getIndexOrThrow(getNodesRequest.getIndexName());
GetNodesResponse reply = handle(indexState);
logger.debug("GetNodesInfoHandler returned GetNodeResponse of size " + reply.getNodesCount());
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Exception e) {
logger.warn("error on GetNodesInfoHandler", e);
responseObserver.onError(
Status.INTERNAL
.withDescription("error on GetNodesInfoHandler")
.augmentDescription(e.getMessage())
.asRuntimeException());
}
public GetNodesResponse handle(GetNodesRequest getNodesRequest) throws Exception {
IndexState indexState = getIndexState(getNodesRequest.getIndexName());
GetNodesResponse reply = handle(indexState);
logger.debug("GetNodesInfoHandler returned GetNodeResponse of size {}", reply.getNodesCount());
return reply;
}

private GetNodesResponse handle(IndexState indexState) {
GetNodesResponse.Builder builder = GetNodesResponse.newBuilder();
ShardState shardState = indexState.getShard(0);
if (!shardState.isPrimary() || !shardState.isStarted()) {
logger.warn("index \"" + indexState.getName() + "\" is not a primary or was not started yet");
logger.warn("index \"{}\" is not a primary or was not started yet", indexState.getName());
} else { // shard is a primary and started
Collection<NRTPrimaryNode.ReplicaDetails> replicasInfo =
shardState.nrtPrimaryNode.getNodesInfo();
Expand Down
Loading

0 comments on commit a83cf2f

Please sign in to comment.