From a83cf2f837052509a1a0591f8cc0d435078ca970 Mon Sep 17 00:00:00 2001 From: Andrew Prudhomme Date: Mon, 21 Oct 2024 10:38:41 -0700 Subject: [PATCH] Use handleUnaryRequest for replication server methods (#769) --- .../server/grpc/NrtsearchServer.java | 24 +++++----- .../server/handler/AddReplicaHandler.java | 34 +++----------- .../server/handler/CommitHandler.java | 42 +++++++---------- .../server/handler/CopyFilesHandler.java | 42 +++++++++-------- .../server/handler/DeleteByQueryHandler.java | 4 +- .../handler/DeleteDocumentsHandler.java | 2 +- .../server/handler/DeleteIndexHandler.java | 2 +- .../handler/ForceMergeDeletesHandler.java | 2 +- .../server/handler/ForceMergeHandler.java | 2 +- .../GetAllSnapshotIndexGenHandler.java | 6 +-- .../server/handler/GetNodesInfoHandler.java | 25 +++------- .../server/handler/NewNRTPointHandler.java | 47 ++++--------------- .../server/handler/RecvCopyStateHandler.java | 44 ++++------------- .../server/handler/RecvRawFileHandler.java | 3 +- .../server/handler/RecvRawFileV2Handler.java | 2 +- .../server/handler/RegisterFieldsHandler.java | 2 +- ...ReplicaCurrentSearchingVersionHandler.java | 29 ++---------- .../server/handler/SearchHandler.java | 30 +++--------- .../server/handler/SearchV2Handler.java | 29 +++--------- .../server/handler/SettingsHandler.java | 2 +- .../server/handler/StartIndexHandler.java | 2 +- .../server/handler/StartIndexV2Handler.java | 2 +- .../server/handler/StatusHandler.java | 2 +- .../server/handler/StopIndexHandler.java | 2 +- .../server/handler/UpdateFieldsHandler.java | 2 +- .../server/handler/WriteNRTPointHandler.java | 31 +++--------- .../server/field/DateTimeFieldDefTest.java | 6 +-- .../grpc/ReplicationServerClientTest.java | 4 +- 28 files changed, 130 insertions(+), 294 deletions(-) diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java index c67e78aec..8b064b685 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java @@ -694,9 +694,6 @@ public void custom(CustomRequest request, StreamObserver respons } static class ReplicationServerImpl extends ReplicationServerGrpc.ReplicationServerImplBase { - private final GlobalState globalState; - private final boolean verifyIndexId; - private final AddReplicaHandler addReplicaHandler; private final CopyFilesHandler copyFilesHandler; private final GetNodesInfoHandler getNodesInfoHandler; @@ -709,8 +706,6 @@ static class ReplicationServerImpl extends ReplicationServerGrpc.ReplicationServ private final WriteNRTPointHandler writeNRTPointHandler; public ReplicationServerImpl(GlobalState globalState, boolean verifyIndexId) { - this.globalState = globalState; - this.verifyIndexId = verifyIndexId; addReplicaHandler = new AddReplicaHandler(globalState, verifyIndexId); copyFilesHandler = new CopyFilesHandler(globalState, verifyIndexId); @@ -729,7 +724,8 @@ public ReplicationServerImpl(GlobalState globalState, boolean verifyIndexId) { public void addReplicas( AddReplicaRequest addReplicaRequest, StreamObserver responseStreamObserver) { - addReplicaHandler.handle(addReplicaRequest, responseStreamObserver); + Handler.handleUnaryRequest( + "addReplicas", addReplicaRequest, responseStreamObserver, addReplicaHandler); } @Override @@ -753,7 +749,7 @@ public StreamObserver recvRawFileV2( @Override public void recvCopyState( CopyStateRequest request, StreamObserver responseObserver) { - recvCopyStateHandler.handle(request, responseObserver); + Handler.handleUnaryRequest("recvCopyState", request, responseObserver, recvCopyStateHandler); } @Override @@ -763,25 +759,31 @@ public void copyFiles(CopyFiles request, StreamObserver response @Override public void newNRTPoint(NewNRTPoint request, StreamObserver responseObserver) { - newNRTPointHandler.handle(request, responseObserver); + Handler.handleUnaryRequest("newNRTPoint", request, responseObserver, newNRTPointHandler); } @Override public void writeNRTPoint( IndexName indexNameRequest, StreamObserver responseObserver) { - writeNRTPointHandler.handle(indexNameRequest, responseObserver); + Handler.handleUnaryRequest( + "writeNRTPoint", indexNameRequest, responseObserver, writeNRTPointHandler); } @Override public void getCurrentSearcherVersion( IndexName indexNameRequest, StreamObserver responseObserver) { - replicaCurrentSearchingVersionHandler.handle(indexNameRequest, responseObserver); + Handler.handleUnaryRequest( + "getCurrentSearcherVersion", + indexNameRequest, + responseObserver, + replicaCurrentSearchingVersionHandler); } @Override public void getConnectedNodes( GetNodesRequest getNodesRequest, StreamObserver responseObserver) { - getNodesInfoHandler.handle(getNodesRequest, responseObserver); + Handler.handleUnaryRequest( + "getConnectedNodes", getNodesRequest, responseObserver, getNodesInfoHandler); } } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/AddReplicaHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/AddReplicaHandler.java index 407898397..4c09c821b 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/AddReplicaHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/AddReplicaHandler.java @@ -22,9 +22,6 @@ import com.yelp.nrtsearch.server.index.IndexStateManager; import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,31 +39,14 @@ public AddReplicaHandler(GlobalState globalState, boolean verifyIndexId) { } @Override - public void handle( - AddReplicaRequest addReplicaRequest, StreamObserver responseObserver) { - try { - IndexStateManager indexStateManager = - getGlobalState().getIndexStateManagerOrThrow(addReplicaRequest.getIndexName()); - checkIndexId(addReplicaRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); + public AddReplicaResponse handle(AddReplicaRequest addReplicaRequest) throws Exception { + IndexStateManager indexStateManager = getIndexStateManager(addReplicaRequest.getIndexName()); + checkIndexId(addReplicaRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); - IndexState indexState = indexStateManager.getCurrent(); - AddReplicaResponse reply = handle(indexState, addReplicaRequest); - logger.info("AddReplicaHandler returned " + reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (StatusRuntimeException e) { - logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e); - responseObserver.onError(e); - } catch (Exception e) { - logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to addReplicas for index: " - + addReplicaRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + IndexState indexState = indexStateManager.getCurrent(); + AddReplicaResponse reply = handle(indexState, addReplicaRequest); + logger.info("AddReplicaHandler returned {}", reply); + return reply; } private AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addReplicaRequest) { diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/CommitHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/CommitHandler.java index b33469835..bb11ae164 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/CommitHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/CommitHandler.java @@ -15,15 +15,16 @@ */ package com.yelp.nrtsearch.server.handler; +import com.google.protobuf.InvalidProtocolBufferException; import com.yelp.nrtsearch.server.grpc.CommitRequest; import com.yelp.nrtsearch.server.grpc.CommitResponse; import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.state.GlobalState; +import com.yelp.nrtsearch.server.utils.ProtoMessagePrinter; import io.grpc.Context; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; -import java.io.IOException; import java.util.concurrent.RejectedExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,8 +45,7 @@ public void handle(CommitRequest commitRequest, StreamObserver r .wrap( () -> { try { - IndexState indexState = - getGlobalState().getIndexOrThrow(commitRequest.getIndexName()); + IndexState indexState = getIndexState(commitRequest.getIndexName()); long gen = indexState.commit(); CommitResponse reply = CommitResponse.newBuilder() @@ -53,36 +53,30 @@ public void handle(CommitRequest commitRequest, StreamObserver r .setPrimaryId(getGlobalState().getEphemeralId()) .build(); logger.debug( - String.format( - "CommitHandler committed to index: %s for sequenceId: %s", - commitRequest.getIndexName(), gen)); + "CommitHandler committed to index: {} for sequenceId: {}", + commitRequest.getIndexName(), + gen); responseObserver.onNext(reply); responseObserver.onCompleted(); - } catch (IOException e) { - logger.warn( - "error while trying to read index state dir for indexName: " - + commitRequest.getIndexName(), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to read index state dir for indexName: " - + commitRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); } catch (Exception e) { + String requestStr; + try { + requestStr = + ProtoMessagePrinter.omittingInsignificantWhitespace() + .print(commitRequest); + } catch (InvalidProtocolBufferException ignored) { + // Ignore as invalid proto would have thrown an exception earlier + requestStr = commitRequest.toString(); + } logger.warn( - "error while trying to commit to index " - + commitRequest.getIndexName(), - e); + String.format("Error handling commit request: %s", requestStr), e); if (e instanceof StatusRuntimeException) { responseObserver.onError(e); } else { responseObserver.onError( - Status.UNKNOWN + Status.INTERNAL .withDescription( - "error while trying to commit to index: " + "Error while trying to commit to index: " + commitRequest.getIndexName()) .augmentDescription(e.getMessage()) .asRuntimeException()); diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/CopyFilesHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/CopyFilesHandler.java index cabf2073d..58daa0573 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/CopyFilesHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/CopyFilesHandler.java @@ -15,6 +15,7 @@ */ package com.yelp.nrtsearch.server.handler; +import com.google.protobuf.InvalidProtocolBufferException; import com.yelp.nrtsearch.server.grpc.CopyFiles; import com.yelp.nrtsearch.server.grpc.TransferStatus; import com.yelp.nrtsearch.server.grpc.TransferStatusCode; @@ -24,6 +25,7 @@ import com.yelp.nrtsearch.server.monitoring.NrtMetrics; import com.yelp.nrtsearch.server.nrt.NRTReplicaNode; import com.yelp.nrtsearch.server.state.GlobalState; +import com.yelp.nrtsearch.server.utils.ProtoMessagePrinter; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -49,31 +51,33 @@ public CopyFilesHandler(GlobalState globalState, boolean verifyIndexId) { @Override public void handle(CopyFiles request, StreamObserver responseObserver) { try { - IndexStateManager indexStateManager = - getGlobalState().getIndexStateManagerOrThrow(request.getIndexName()); + IndexStateManager indexStateManager = getIndexStateManager(request.getIndexName()); checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); IndexState indexState = indexStateManager.getCurrent(); // we need to send multiple responses to client from this method handle(indexState, request, responseObserver); - logger.info("CopyFilesHandler returned successfully"); - } catch (StatusRuntimeException e) { - logger.warn("error while trying copyFiles " + request.getIndexName(), e); - responseObserver.onError(e); } catch (Exception e) { - logger.warn( - String.format( - "error on copyFiles for primaryGen: %s, for index: %s", - request.getPrimaryGen(), request.getIndexName()), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - String.format( - "error on copyFiles for primaryGen: %s, for index: %s", - request.getPrimaryGen(), request.getIndexName())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); + String requestStr; + try { + requestStr = ProtoMessagePrinter.omittingInsignificantWhitespace().print(request); + } catch (InvalidProtocolBufferException ignored) { + // Ignore as invalid proto would have thrown an exception earlier + requestStr = request.toString(); + } + logger.warn(String.format("Error handling copyFiles request: %s", requestStr), e); + if (e instanceof StatusRuntimeException) { + responseObserver.onError(e); + } else { + responseObserver.onError( + Status.INTERNAL + .withDescription( + String.format( + "Error on copyFiles for primaryGen: %s, for index: %s", + request.getPrimaryGen(), request.getIndexName())) + .augmentDescription(e.getMessage()) + .asRuntimeException()); + } } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteByQueryHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteByQueryHandler.java index 3a30a7979..980fdac5c 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteByQueryHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteByQueryHandler.java @@ -38,9 +38,9 @@ public DeleteByQueryHandler(GlobalState globalState) { @Override public AddDocumentResponse handle(DeleteByQueryRequest deleteByQueryRequest) throws Exception { - IndexState indexState = getGlobalState().getIndexOrThrow(deleteByQueryRequest.getIndexName()); + IndexState indexState = getIndexState(deleteByQueryRequest.getIndexName()); AddDocumentResponse reply = handle(indexState, deleteByQueryRequest); - logger.debug("DeleteDocumentsHandler returned " + reply); + logger.debug("DeleteDocumentsHandler returned {}", reply); return reply; } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteDocumentsHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteDocumentsHandler.java index 7bdb39c4c..687365f50 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteDocumentsHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteDocumentsHandler.java @@ -39,7 +39,7 @@ public DeleteDocumentsHandler(GlobalState globalState) { @Override public AddDocumentResponse handle(AddDocumentRequest addDocumentRequest) throws Exception { - IndexState indexState = getGlobalState().getIndexOrThrow(addDocumentRequest.getIndexName()); + IndexState indexState = getIndexState(addDocumentRequest.getIndexName()); AddDocumentResponse reply = handleInternal(indexState, addDocumentRequest); logger.debug("DeleteDocumentsHandler returned {}", reply); return reply; diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteIndexHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteIndexHandler.java index bd2d0fd39..88e420a3d 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteIndexHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteIndexHandler.java @@ -35,7 +35,7 @@ public DeleteIndexResponse handle(DeleteIndexRequest deleteIndexRequest) throws logger.info("Received delete index request: {}", deleteIndexRequest); IndexState indexState = getIndexState(deleteIndexRequest.getIndexName()); DeleteIndexResponse reply = handle(indexState); - logger.info("DeleteIndexHandler returned " + reply); + logger.info("DeleteIndexHandler returned {}", reply); return reply; } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeDeletesHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeDeletesHandler.java index 0d076cd34..d7a64e78c 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeDeletesHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeDeletesHandler.java @@ -42,7 +42,7 @@ public ForceMergeDeletesResponse handle(ForceMergeDeletesRequest forceMergeReque } try { - IndexState indexState = getGlobalState().getIndexOrThrow(forceMergeRequest.getIndexName()); + IndexState indexState = getIndexState(forceMergeRequest.getIndexName()); ShardState shardState = indexState.getShards().get(0); logger.info("Beginning force merge deletes for index: {}", forceMergeRequest.getIndexName()); shardState.writer.forceMergeDeletes(forceMergeRequest.getDoWait()); diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeHandler.java index df2fe6548..98d85705e 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeHandler.java @@ -40,7 +40,7 @@ public ForceMergeResponse handle(ForceMergeRequest forceMergeRequest) throws Exc throw new IllegalArgumentException("Cannot have 0 max segments"); } - IndexState indexState = getGlobalState().getIndexOrThrow(forceMergeRequest.getIndexName()); + IndexState indexState = getIndexState(forceMergeRequest.getIndexName()); ShardState shardState = indexState.getShards().get(0); logger.info("Beginning force merge for index: {}", forceMergeRequest.getIndexName()); shardState.writer.forceMerge( diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/GetAllSnapshotIndexGenHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/GetAllSnapshotIndexGenHandler.java index 8c7386de3..7723a1204 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/GetAllSnapshotIndexGenHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/GetAllSnapshotIndexGenHandler.java @@ -33,11 +33,7 @@ public GetAllSnapshotIndexGenHandler(GlobalState globalState) { @Override public GetAllSnapshotGenResponse handle(GetAllSnapshotGenRequest request) throws Exception { Set snapshotGens = - getGlobalState() - .getIndexOrThrow(request.getIndexName()) - .getShard(0) - .snapshotGenToVersion - .keySet(); + getIndexState(request.getIndexName()).getShard(0).snapshotGenToVersion.keySet(); return GetAllSnapshotGenResponse.newBuilder().addAllIndexGens(snapshotGens).build(); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/GetNodesInfoHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/GetNodesInfoHandler.java index 6d50d9352..b29843659 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/GetNodesInfoHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/GetNodesInfoHandler.java @@ -23,8 +23,6 @@ import com.yelp.nrtsearch.server.nrt.NRTPrimaryNode; import com.yelp.nrtsearch.server.state.GlobalState; import com.yelp.nrtsearch.server.utils.HostPort; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.util.Collection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,29 +35,18 @@ public GetNodesInfoHandler(GlobalState globalState) { } @Override - public void handle( - GetNodesRequest getNodesRequest, StreamObserver responseObserver) { - try { - IndexState indexState = getGlobalState().getIndexOrThrow(getNodesRequest.getIndexName()); - GetNodesResponse reply = handle(indexState); - logger.debug("GetNodesInfoHandler returned GetNodeResponse of size " + reply.getNodesCount()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn("error on GetNodesInfoHandler", e); - responseObserver.onError( - Status.INTERNAL - .withDescription("error on GetNodesInfoHandler") - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public GetNodesResponse handle(GetNodesRequest getNodesRequest) throws Exception { + IndexState indexState = getIndexState(getNodesRequest.getIndexName()); + GetNodesResponse reply = handle(indexState); + logger.debug("GetNodesInfoHandler returned GetNodeResponse of size {}", reply.getNodesCount()); + return reply; } private GetNodesResponse handle(IndexState indexState) { GetNodesResponse.Builder builder = GetNodesResponse.newBuilder(); ShardState shardState = indexState.getShard(0); if (!shardState.isPrimary() || !shardState.isStarted()) { - logger.warn("index \"" + indexState.getName() + "\" is not a primary or was not started yet"); + logger.warn("index \"{}\" is not a primary or was not started yet", indexState.getName()); } else { // shard is a primary and started Collection replicasInfo = shardState.nrtPrimaryNode.getNodesInfo(); diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/NewNRTPointHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/NewNRTPointHandler.java index f6cea8dd0..5c8da8c78 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/NewNRTPointHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/NewNRTPointHandler.java @@ -22,9 +22,6 @@ import com.yelp.nrtsearch.server.index.IndexStateManager; import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,43 +36,15 @@ public NewNRTPointHandler(GlobalState globalState, boolean verifyIndexId) { } @Override - public void handle(NewNRTPoint request, StreamObserver responseObserver) { - try { - IndexStateManager indexStateManager = - getGlobalState().getIndexStateManagerOrThrow(request.getIndexName()); - checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); + public TransferStatus handle(NewNRTPoint request) throws Exception { + IndexStateManager indexStateManager = getIndexStateManager(request.getIndexName()); + checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); - IndexState indexState = indexStateManager.getCurrent(); - TransferStatus reply = handle(indexState, request); - logger.debug( - "NewNRTPointHandler returned status " - + reply.getCode() - + " message: " - + reply.getMessage()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (StatusRuntimeException e) { - logger.warn( - String.format( - "error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s", - request.getIndexName(), request.getVersion(), request.getPrimaryGen()), - e); - responseObserver.onError(e); - } catch (Exception e) { - logger.warn( - String.format( - "error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s", - request.getIndexName(), request.getVersion(), request.getPrimaryGen()), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - String.format( - "error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s", - request.getIndexName(), request.getVersion(), request.getPrimaryGen())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + IndexState indexState = indexStateManager.getCurrent(); + TransferStatus reply = handle(indexState, request); + logger.debug( + "NewNRTPointHandler returned status {} message: {}", reply.getCode(), reply.getMessage()); + return reply; } private TransferStatus handle(IndexState indexState, NewNRTPoint newNRTPointRequest) { diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/RecvCopyStateHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/RecvCopyStateHandler.java index 274b6e5ec..f9058ac97 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/RecvCopyStateHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/RecvCopyStateHandler.java @@ -25,9 +25,6 @@ import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.nrt.NRTPrimaryNode; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.Map; import org.apache.lucene.replicator.nrt.FileMetaData; @@ -45,37 +42,16 @@ public RecvCopyStateHandler(GlobalState globalState, boolean verifyIndexId) { } @Override - public void handle(CopyStateRequest request, StreamObserver responseObserver) { - try { - IndexStateManager indexStateManager = - getGlobalState().getIndexStateManagerOrThrow(request.getIndexName()); - checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); - - IndexState indexState = indexStateManager.getCurrent(); - CopyState reply = handle(indexState, request); - logger.debug( - "RecvCopyStateHandler returned, completedMergeFiles count: " - + reply.getCompletedMergeFilesCount()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (StatusRuntimeException e) { - logger.warn("error while trying recvCopyState " + request.getIndexName(), e); - responseObserver.onError(e); - } catch (Exception e) { - logger.warn( - String.format( - "error on recvCopyState for replicaId: %s, for index: %s", - request.getReplicaId(), request.getIndexName()), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - String.format( - "error on recvCopyState for replicaId: %s, for index: %s", - request.getReplicaId(), request.getIndexName())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public CopyState handle(CopyStateRequest request) throws Exception { + IndexStateManager indexStateManager = getIndexStateManager(request.getIndexName()); + checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); + + IndexState indexState = indexStateManager.getCurrent(); + CopyState reply = handle(indexState, request); + logger.debug( + "RecvCopyStateHandler returned, completedMergeFiles count: {}", + reply.getCompletedMergeFilesCount()); + return reply; } private CopyState handle(IndexState indexState, CopyStateRequest copyStateRequest) { diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/RecvRawFileHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/RecvRawFileHandler.java index ad4ba60e9..7a5e44f8b 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/RecvRawFileHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/RecvRawFileHandler.java @@ -42,8 +42,7 @@ public RecvRawFileHandler(GlobalState globalState, boolean verifyIndexId) { @Override public void handle(FileInfo fileInfoRequest, StreamObserver responseObserver) { try { - IndexStateManager indexStateManager = - getGlobalState().getIndexStateManagerOrThrow(fileInfoRequest.getIndexName()); + IndexStateManager indexStateManager = getIndexStateManager(fileInfoRequest.getIndexName()); checkIndexId(fileInfoRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); IndexState indexState = indexStateManager.getCurrent(); diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/RecvRawFileV2Handler.java b/src/main/java/com/yelp/nrtsearch/server/handler/RecvRawFileV2Handler.java index d575d26df..ef3a77108 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/RecvRawFileV2Handler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/RecvRawFileV2Handler.java @@ -59,7 +59,7 @@ public void onNext(FileInfo fileInfoRequest) { if (indexState == null) { // Start transfer IndexStateManager indexStateManager = - getGlobalState().getIndexStateManagerOrThrow(fileInfoRequest.getIndexName()); + getIndexStateManager(fileInfoRequest.getIndexName()); checkIndexId( fileInfoRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/RegisterFieldsHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/RegisterFieldsHandler.java index df01abdc1..95dd9c5af 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/RegisterFieldsHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/RegisterFieldsHandler.java @@ -35,7 +35,7 @@ public FieldDefResponse handle(FieldDefRequest fieldDefRequest) throws Exception IndexStateManager indexStateManager = getIndexStateManager(fieldDefRequest.getIndexName()); String updatedFields = indexStateManager.updateFields(fieldDefRequest.getFieldList()); FieldDefResponse reply = FieldDefResponse.newBuilder().setResponse(updatedFields).build(); - logger.info("RegisterFieldsHandler registered fields " + reply); + logger.info("RegisterFieldsHandler registered fields {}", reply); return reply; } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/ReplicaCurrentSearchingVersionHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/ReplicaCurrentSearchingVersionHandler.java index ce0910fec..791ef2a7b 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/ReplicaCurrentSearchingVersionHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/ReplicaCurrentSearchingVersionHandler.java @@ -20,8 +20,6 @@ import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,28 +33,11 @@ public ReplicaCurrentSearchingVersionHandler(GlobalState globalState) { } @Override - public void handle(IndexName indexNameRequest, StreamObserver responseObserver) { - try { - IndexState indexState = getGlobalState().getIndexOrThrow(indexNameRequest.getIndexName()); - SearcherVersion reply = handle(indexState, indexNameRequest); - logger.info("ReplicaCurrentSearchingVersionHandler returned version " + reply.getVersion()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn( - String.format( - "error on getCurrentSearcherVersion for indexName: %s", - indexNameRequest.getIndexName()), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - String.format( - "error on getCurrentSearcherVersion for indexName: %s", - indexNameRequest.getIndexName())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public SearcherVersion handle(IndexName indexNameRequest) throws Exception { + IndexState indexState = getIndexState(indexNameRequest.getIndexName()); + SearcherVersion reply = handle(indexState, indexNameRequest); + logger.info("ReplicaCurrentSearchingVersionHandler returned version {}", reply.getVersion()); + return reply; } private SearcherVersion handle(IndexState indexState, IndexName indexNameRequest) { diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/SearchHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/SearchHandler.java index 8313eea77..108171a18 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/SearchHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/SearchHandler.java @@ -108,39 +108,23 @@ public void handle(SearchRequest searchRequest, StreamObserver r setResponseCompression(searchRequest.getResponseCompression(), responseObserver); responseObserver.onNext(reply); responseObserver.onCompleted(); - } catch (IOException e) { - logger.warn( - "error while trying to read index state dir for indexName: {}", - searchRequest.getIndexName(), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to read index state dir for indexName: " - + searchRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); } catch (Exception e) { - String searchRequestJson = null; + String requestStr; try { - searchRequestJson = protoMessagePrinter.print(searchRequest); + requestStr = protoMessagePrinter.print(searchRequest); } catch (InvalidProtocolBufferException ignored) { // Ignore as invalid proto would have thrown an exception earlier + requestStr = searchRequest.toString(); } - logger.warn( - "error while trying to execute search for index {}: request: {}", - searchRequest.getIndexName(), - searchRequestJson, - e); + logger.warn("Error handling search request: {}", requestStr, e); if (e instanceof StatusRuntimeException) { responseObserver.onError(e); } else { responseObserver.onError( - Status.UNKNOWN + Status.INTERNAL .withDescription( String.format( - "error while trying to execute search for index %s. check logs for full searchRequest.", + "Error while trying to execute search for index %s. check logs for full searchRequest.", searchRequest.getIndexName())) .augmentDescription(e.getMessage()) .asRuntimeException()); @@ -150,7 +134,7 @@ public void handle(SearchRequest searchRequest, StreamObserver r public SearchResponse getSearchResponse(SearchRequest searchRequest) throws IOException, SearchHandlerException { - IndexState indexState = getGlobalState().getIndexOrThrow(searchRequest.getIndexName()); + IndexState indexState = getIndexState(searchRequest.getIndexName()); return handle(indexState, searchRequest); } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/SearchV2Handler.java b/src/main/java/com/yelp/nrtsearch/server/handler/SearchV2Handler.java index f05ab2c97..a0814e5db 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/SearchV2Handler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/SearchV2Handler.java @@ -25,7 +25,6 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; -import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,39 +47,23 @@ public void handle(SearchRequest searchRequest, StreamObserver responseObse setResponseCompression(searchRequest.getResponseCompression(), responseObserver); responseObserver.onNext(Any.pack(searchResponse)); responseObserver.onCompleted(); - } catch (IOException e) { - logger.warn( - "error while trying to read index state dir for indexName: {}", - searchRequest.getIndexName(), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to read index state dir for indexName: " - + searchRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); } catch (Exception e) { - String searchRequestJson = null; + String requestStr; try { - searchRequestJson = protoMessagePrinter.print(searchRequest); + requestStr = protoMessagePrinter.print(searchRequest); } catch (InvalidProtocolBufferException ignored) { // Ignore as invalid proto would have thrown an exception earlier + requestStr = searchRequest.toString(); } - logger.warn( - String.format( - "error while trying to execute search for index %s: request: %s", - searchRequest.getIndexName(), searchRequestJson), - e); + logger.warn(String.format("Error handling searchV2 request: %s", requestStr), e); if (e instanceof StatusRuntimeException) { responseObserver.onError(e); } else { responseObserver.onError( - Status.UNKNOWN + Status.INTERNAL .withDescription( String.format( - "error while trying to execute search for index %s. check logs for full searchRequest.", + "Error while trying to execute search for index %s. check logs for full searchRequest.", searchRequest.getIndexName())) .augmentDescription(e.getMessage()) .asRuntimeException()); diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/SettingsHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/SettingsHandler.java index 9cc37d0a7..c64de2830 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/SettingsHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/SettingsHandler.java @@ -42,7 +42,7 @@ public SettingsResponse handle(SettingsRequest settingsRequest) throws Exception logger.info("Received settings request: {}", settingsRequest); IndexState indexState = getIndexState(settingsRequest.getIndexName()); SettingsResponse reply = handle(indexState, settingsRequest); - logger.info("SettingsHandler returned " + reply); + logger.info("SettingsHandler returned {}", reply); return reply; } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexHandler.java index 4ce5f351f..e592b4e03 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexHandler.java @@ -40,7 +40,7 @@ public StartIndexResponse handle(StartIndexRequest startIndexRequest) throws Exc } StartIndexResponse reply = getGlobalState().startIndex(startIndexRequest); - logger.info("StartIndexHandler returned " + reply.toString()); + logger.info("StartIndexHandler returned {}", reply.toString()); return reply; } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexV2Handler.java b/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexV2Handler.java index d11cadff4..7758683fb 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexV2Handler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexV2Handler.java @@ -40,7 +40,7 @@ public StartIndexResponse handle(StartIndexV2Request startIndexRequest) throws E } StartIndexResponse reply = getGlobalState().startIndexV2(startIndexRequest); - logger.info("StartIndexV2Handler returned " + reply.toString()); + logger.info("StartIndexV2Handler returned {}", reply.toString()); return reply; } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/StatusHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/StatusHandler.java index 5d3777cbe..226b34b1d 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/StatusHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/StatusHandler.java @@ -32,7 +32,7 @@ public StatusHandler() { public HealthCheckResponse handle(HealthCheckRequest request) throws Exception { HealthCheckResponse reply = HealthCheckResponse.newBuilder().setHealth(TransferStatusCode.Done).build(); - logger.debug("HealthCheckResponse returned " + reply); + logger.debug("HealthCheckResponse returned {}", reply); return reply; } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/StopIndexHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/StopIndexHandler.java index 61c6345de..bbdd851e1 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/StopIndexHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/StopIndexHandler.java @@ -33,7 +33,7 @@ public DummyResponse handle(StopIndexRequest stopIndexRequest) throws Exception logger.info("Received stop index request: {}", stopIndexRequest); DummyResponse reply = getGlobalState().stopIndex(stopIndexRequest); - logger.info("StopIndexHandler returned " + reply); + logger.info("StopIndexHandler returned {}", reply); return reply; } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/UpdateFieldsHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/UpdateFieldsHandler.java index 98d5cb2de..cefd8cad6 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/UpdateFieldsHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/UpdateFieldsHandler.java @@ -35,7 +35,7 @@ public FieldDefResponse handle(FieldDefRequest fieldDefRequest) throws Exception IndexStateManager indexStateManager = getIndexStateManager(fieldDefRequest.getIndexName()); String updatedFields = indexStateManager.updateFields(fieldDefRequest.getFieldList()); FieldDefResponse reply = FieldDefResponse.newBuilder().setResponse(updatedFields).build(); - logger.info("UpdateFieldsHandler registered fields " + reply); + logger.info("UpdateFieldsHandler registered fields {}", reply); return reply; } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/WriteNRTPointHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/WriteNRTPointHandler.java index 33f968e76..41047a122 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/WriteNRTPointHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/WriteNRTPointHandler.java @@ -25,7 +25,6 @@ import com.yelp.nrtsearch.server.state.GlobalState; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.Collection; import java.util.Iterator; @@ -42,29 +41,13 @@ public WriteNRTPointHandler(GlobalState globalState) { } @Override - public void handle(IndexName indexNameRequest, StreamObserver responseObserver) { - try { - IndexStateManager indexStateManager = - getGlobalState().getIndexStateManagerOrThrow(indexNameRequest.getIndexName()); - String indexId = indexStateManager.getIndexId(); - IndexState indexState = indexStateManager.getCurrent(); - SearcherVersion reply = handle(indexState, indexId); - logger.debug("WriteNRTPointHandler returned version " + reply.getVersion()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn( - String.format( - "error on writeNRTPoint for indexName: %s", indexNameRequest.getIndexName()), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - String.format( - "error on writeNRTPoint for indexName: %s", indexNameRequest.getIndexName())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public SearcherVersion handle(IndexName indexNameRequest) throws Exception { + IndexStateManager indexStateManager = getIndexStateManager(indexNameRequest.getIndexName()); + String indexId = indexStateManager.getIndexId(); + IndexState indexState = indexStateManager.getCurrent(); + SearcherVersion reply = handle(indexState, indexId); + logger.debug("WriteNRTPointHandler returned version {}", reply.getVersion()); + return reply; } private SearcherVersion handle(IndexState indexState, String indexId) { diff --git a/src/test/java/com/yelp/nrtsearch/server/field/DateTimeFieldDefTest.java b/src/test/java/com/yelp/nrtsearch/server/field/DateTimeFieldDefTest.java index 34ce7669d..8aa4b6c56 100644 --- a/src/test/java/com/yelp/nrtsearch/server/field/DateTimeFieldDefTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/field/DateTimeFieldDefTest.java @@ -350,7 +350,7 @@ public void testRangeQueryEpochMillisInvalidFormat() { } catch (RuntimeException e) { assertEquals( String.format( - "UNKNOWN: error while trying to execute search for index test_index. check logs for full searchRequest.\n" + "INTERNAL: Error while trying to execute search for index test_index. check logs for full searchRequest.\n" + "Text \'%s\' could not be parsed, unparsed text found at index 23", dateTimeValueUpper), e.getMessage()); @@ -377,7 +377,7 @@ public void testRangeQueryDateOptionalTime() { } catch (RuntimeException e) { assertEquals( String.format( - "UNKNOWN: error while trying to execute search for index test_index. check logs for full searchRequest.\n" + "INTERNAL: Error while trying to execute search for index test_index. check logs for full searchRequest.\n" + "For input string: \"%s\"", dateTimeValueLower), e.getMessage()); @@ -404,7 +404,7 @@ public void testRangeQueryStringDateTimeInvalidFormat() { } catch (RuntimeException e) { assertEquals( String.format( - "UNKNOWN: error while trying to execute search for index test_index. check logs for full searchRequest.\n" + "INTERNAL: Error while trying to execute search for index test_index. check logs for full searchRequest.\n" + "Text '%s' could not be parsed at index 0", dateTimeValueLower), e.getMessage()); diff --git a/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClientTest.java b/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClientTest.java index 533984ca6..f59aead5c 100644 --- a/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClientTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClientTest.java @@ -72,8 +72,6 @@ private void writeNodeFile(List nodes) throws IOException { private Server getBasicReplicationServer() throws IOException { // we only need to test connectivity for now GlobalState mockGlobalState = mock(GlobalState.class); - when(mockGlobalState.getIndexOrThrow(any(String.class))) - .thenThrow(new RuntimeException("Expected")); NrtsearchConfig mockConfiguration = mock(NrtsearchConfig.class); when(mockGlobalState.getConfiguration()).thenReturn(mockConfiguration); when(mockConfiguration.getUseKeepAliveForReplication()).thenReturn(true); @@ -89,7 +87,7 @@ private void verifyConnected(ReplicationServerClient client) { client.getConnectedNodes("test_index"); fail(); } catch (StatusRuntimeException e) { - assertEquals("INTERNAL: error on GetNodesInfoHandler\nExpected", e.getMessage()); + assertEquals("NOT_FOUND: Index test_index not found", e.getMessage()); } }