diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java index 44a726f06..c67e78aec 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/NrtsearchServer.java @@ -457,56 +457,61 @@ public GlobalState getGlobalState() { @Override public void createIndex( CreateIndexRequest req, StreamObserver responseObserver) { - createIndexHandler.handle(req, responseObserver); + Handler.handleUnaryRequest("createIndex", req, responseObserver, createIndexHandler); } @Override public void liveSettings( LiveSettingsRequest req, StreamObserver responseObserver) { - liveSettingsHandler.handle(req, responseObserver); + Handler.handleUnaryRequest("liveSettings", req, responseObserver, liveSettingsHandler); } @Override public void liveSettingsV2( LiveSettingsV2Request req, StreamObserver responseObserver) { - liveSettingsV2Handler.handle(req, responseObserver); + Handler.handleUnaryRequest("liveSettingsV2", req, responseObserver, liveSettingsV2Handler); } @Override public void registerFields( FieldDefRequest fieldDefRequest, StreamObserver responseObserver) { - registerFieldsHandler.handle(fieldDefRequest, responseObserver); + Handler.handleUnaryRequest( + "registerFields", fieldDefRequest, responseObserver, registerFieldsHandler); } @Override public void updateFields( FieldDefRequest fieldDefRequest, StreamObserver responseObserver) { - updateFieldsHandler.handle(fieldDefRequest, responseObserver); + Handler.handleUnaryRequest( + "updateFields", fieldDefRequest, responseObserver, updateFieldsHandler); } @Override public void settings( SettingsRequest settingsRequest, StreamObserver responseObserver) { - settingsHandler.handle(settingsRequest, responseObserver); + Handler.handleUnaryRequest("settings", settingsRequest, responseObserver, settingsHandler); } @Override public void settingsV2( SettingsV2Request settingsRequest, StreamObserver responseObserver) { - settingsV2Handler.handle(settingsRequest, responseObserver); + Handler.handleUnaryRequest( + "settingsV2", settingsRequest, responseObserver, settingsV2Handler); } @Override public void startIndex( StartIndexRequest startIndexRequest, StreamObserver responseObserver) { - startIndexHandler.handle(startIndexRequest, responseObserver); + Handler.handleUnaryRequest( + "startIndex", startIndexRequest, responseObserver, startIndexHandler); } @Override public void startIndexV2( StartIndexV2Request startIndexRequest, StreamObserver responseObserver) { - startIndexV2Handler.handle(startIndexRequest, responseObserver); + Handler.handleUnaryRequest( + "startIndexV2", startIndexRequest, responseObserver, startIndexV2Handler); } @Override @@ -519,7 +524,8 @@ public StreamObserver addDocuments( public void refresh( RefreshRequest refreshRequest, StreamObserver refreshResponseStreamObserver) { - refreshHandler.handle(refreshRequest, refreshResponseStreamObserver); + Handler.handleUnaryRequest( + "refresh", refreshRequest, refreshResponseStreamObserver, refreshHandler); } @Override @@ -531,7 +537,7 @@ public void commit( @Override public void stats( StatsRequest statsRequest, StreamObserver statsResponseStreamObserver) { - statsHandler.handle(statsRequest, statsResponseStreamObserver); + Handler.handleUnaryRequest("stats", statsRequest, statsResponseStreamObserver, statsHandler); } @Override @@ -550,46 +556,50 @@ public void searchV2( public void delete( AddDocumentRequest addDocumentRequest, StreamObserver responseObserver) { - deleteDocumentsHandler.handle(addDocumentRequest, responseObserver); + Handler.handleUnaryRequest( + "delete", addDocumentRequest, responseObserver, deleteDocumentsHandler); } @Override public void deleteByQuery( DeleteByQueryRequest deleteByQueryRequest, StreamObserver responseObserver) { - deleteByQueryHandler.handle(deleteByQueryRequest, responseObserver); + Handler.handleUnaryRequest( + "deleteByQuery", deleteByQueryRequest, responseObserver, deleteByQueryHandler); } @Override public void deleteAll( DeleteAllDocumentsRequest deleteAllDocumentsRequest, StreamObserver responseObserver) { - deleteAllDocumentsHandler.handle(deleteAllDocumentsRequest, responseObserver); + Handler.handleUnaryRequest( + "deleteAll", deleteAllDocumentsRequest, responseObserver, deleteAllDocumentsHandler); } @Override public void deleteIndex( DeleteIndexRequest deleteIndexRequest, StreamObserver responseObserver) { - deleteIndexHandler.handle(deleteIndexRequest, responseObserver); + Handler.handleUnaryRequest( + "deleteIndex", deleteIndexRequest, responseObserver, deleteIndexHandler); } @Override public void stopIndex( StopIndexRequest stopIndexRequest, StreamObserver responseObserver) { - stopIndexHandler.handle(stopIndexRequest, responseObserver); + Handler.handleUnaryRequest("stopIndex", stopIndexRequest, responseObserver, stopIndexHandler); } @Override public void reloadState( ReloadStateRequest request, StreamObserver responseObserver) { - reloadStateHandler.handle(request, responseObserver); + Handler.handleUnaryRequest("reloadState", request, responseObserver, reloadStateHandler); } @Override public void status( HealthCheckRequest request, StreamObserver responseObserver) { - statusHandler.handle(request, responseObserver); + Handler.handleUnaryRequest("status", request, responseObserver, statusHandler); } /** @@ -600,45 +610,49 @@ public void status( @Override public void ready( ReadyCheckRequest request, StreamObserver responseObserver) { - readyHandler.handle(request, responseObserver); + Handler.handleUnaryRequest("ready", request, responseObserver, readyHandler); } @Override public void createSnapshot( CreateSnapshotRequest createSnapshotRequest, StreamObserver responseObserver) { - createSnapshotHandler.handle(createSnapshotRequest, responseObserver); + Handler.handleUnaryRequest( + "createSnapshot", createSnapshotRequest, responseObserver, createSnapshotHandler); } @Override public void releaseSnapshot( ReleaseSnapshotRequest releaseSnapshotRequest, StreamObserver responseObserver) { - releaseSnapshotHandler.handle(releaseSnapshotRequest, responseObserver); + Handler.handleUnaryRequest( + "releaseSnapshot", releaseSnapshotRequest, responseObserver, releaseSnapshotHandler); } @Override public void getAllSnapshotIndexGen( GetAllSnapshotGenRequest request, StreamObserver responseObserver) { - getAllSnapshotIndexGenHandler.handle(request, responseObserver); + Handler.handleUnaryRequest( + "getAllSnapshotIndexGen", request, responseObserver, getAllSnapshotIndexGenHandler); } @Override public void backupWarmingQueries( BackupWarmingQueriesRequest request, StreamObserver responseObserver) { - backupWarmingQueriesHandler.handle(request, responseObserver); + Handler.handleUnaryRequest( + "backupWarmingQueries", request, responseObserver, backupWarmingQueriesHandler); } @Override public void metrics(Empty request, StreamObserver responseObserver) { - metricsHandler.handle(request, responseObserver); + Handler.handleUnaryRequest("metrics", request, responseObserver, metricsHandler); } @Override public void indices(IndicesRequest request, StreamObserver responseObserver) { - indicesHandler.handle(request, responseObserver); + Handler.handleUnaryRequest("indices", request, responseObserver, indicesHandler); } @Override @@ -650,30 +664,32 @@ public void nodeInfo( @Override public void globalState( GlobalStateRequest request, StreamObserver responseObserver) { - globalStateHandler.handle(request, responseObserver); + Handler.handleUnaryRequest("globalState", request, responseObserver, globalStateHandler); } @Override public void state(StateRequest request, StreamObserver responseObserver) { - getStateHandler.handle(request, responseObserver); + Handler.handleUnaryRequest("state", request, responseObserver, getStateHandler); } @Override public void forceMerge( ForceMergeRequest forceMergeRequest, StreamObserver responseObserver) { - forceMergeHandler.handle(forceMergeRequest, responseObserver); + Handler.handleUnaryRequest( + "forceMerge", forceMergeRequest, responseObserver, forceMergeHandler); } @Override public void forceMergeDeletes( ForceMergeDeletesRequest forceMergeRequest, StreamObserver responseObserver) { - forceMergeDeletesHandler.handle(forceMergeRequest, responseObserver); + Handler.handleUnaryRequest( + "forceMergeDeletes", forceMergeRequest, responseObserver, forceMergeDeletesHandler); } @Override public void custom(CustomRequest request, StreamObserver responseObserver) { - customHandler.handle(request, responseObserver); + Handler.handleUnaryRequest("custom", request, responseObserver, customHandler); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/BackupWarmingQueriesHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/BackupWarmingQueriesHandler.java index ac85c6c3a..ca94bbdd1 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/BackupWarmingQueriesHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/BackupWarmingQueriesHandler.java @@ -21,8 +21,6 @@ import com.yelp.nrtsearch.server.state.GlobalState; import com.yelp.nrtsearch.server.warming.Warmer; import io.grpc.Status; -import io.grpc.stub.StreamObserver; -import java.io.IOException; import java.lang.management.ManagementFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,74 +34,47 @@ public BackupWarmingQueriesHandler(GlobalState globalState) { } @Override - public void handle( - BackupWarmingQueriesRequest request, - StreamObserver responseObserver) { + public BackupWarmingQueriesResponse handle(BackupWarmingQueriesRequest request) throws Exception { logger.info("Received backup warming queries request: {}", request); String index = request.getIndex(); - try { - IndexState indexState = getGlobalState().getIndexOrThrow(index); - Warmer warmer = indexState.getWarmer(); - if (warmer == null) { - logger.warn("Unable to backup warming queries as warmer not found for index: {}", index); - responseObserver.onError( - Status.UNKNOWN - .withDescription( - "Unable to backup warming queries as warmer not found for index: " + index) - .asRuntimeException()); - return; - } - int numQueriesThreshold = request.getNumQueriesThreshold(); - int numWarmingRequests = warmer.getNumWarmingRequests(); - if (numQueriesThreshold > 0 && numWarmingRequests < numQueriesThreshold) { - logger.warn( - "Unable to backup warming queries since warmer has {} requests, which is less than threshold {}", - numWarmingRequests, - numQueriesThreshold); - responseObserver.onError( - Status.UNKNOWN - .withDescription( - String.format( - "Unable to backup warming queries since warmer has %s requests, which is less than threshold %s", - numWarmingRequests, numQueriesThreshold)) - .asRuntimeException()); - return; - } - int uptimeMinutesThreshold = request.getUptimeMinutesThreshold(); - int currUptimeMinutes = - (int) (ManagementFactory.getRuntimeMXBean().getUptime() / 1000L / 60L); - if (uptimeMinutesThreshold > 0 && currUptimeMinutes < uptimeMinutesThreshold) { - logger.warn( - "Unable to backup warming queries since uptime is {} minutes, which is less than threshold {}", - currUptimeMinutes, - uptimeMinutesThreshold); - responseObserver.onError( - Status.UNKNOWN - .withDescription( - String.format( - "Unable to backup warming queries since uptime is %s minutes, which is less than threshold %s", - currUptimeMinutes, uptimeMinutesThreshold)) - .asRuntimeException()); - return; - } - warmer.backupWarmingQueriesToS3(request.getServiceName()); - responseObserver.onNext(BackupWarmingQueriesResponse.newBuilder().build()); - responseObserver.onCompleted(); - } catch (IOException e) { - logger.error( - "Unable to backup warming queries for index: {}, service: {}", - index, - request.getServiceName(), - e); - responseObserver.onError( - Status.UNKNOWN - .withCause(e) - .withDescription( - String.format( - "Unable to backup warming queries for index: %s, service: %s", - index, request.getServiceName())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); + IndexState indexState = getIndexState(index); + Warmer warmer = indexState.getWarmer(); + if (warmer == null) { + logger.warn("Unable to backup warming queries as warmer not found for index: {}", index); + throw Status.UNKNOWN + .withDescription( + "Unable to backup warming queries as warmer not found for index: " + index) + .asRuntimeException(); } + int numQueriesThreshold = request.getNumQueriesThreshold(); + int numWarmingRequests = warmer.getNumWarmingRequests(); + if (numQueriesThreshold > 0 && numWarmingRequests < numQueriesThreshold) { + logger.warn( + "Unable to backup warming queries since warmer has {} requests, which is less than threshold {}", + numWarmingRequests, + numQueriesThreshold); + throw Status.UNKNOWN + .withDescription( + String.format( + "Unable to backup warming queries since warmer has %s requests, which is less than threshold %s", + numWarmingRequests, numQueriesThreshold)) + .asRuntimeException(); + } + int uptimeMinutesThreshold = request.getUptimeMinutesThreshold(); + int currUptimeMinutes = (int) (ManagementFactory.getRuntimeMXBean().getUptime() / 1000L / 60L); + if (uptimeMinutesThreshold > 0 && currUptimeMinutes < uptimeMinutesThreshold) { + logger.warn( + "Unable to backup warming queries since uptime is {} minutes, which is less than threshold {}", + currUptimeMinutes, + uptimeMinutesThreshold); + throw Status.UNKNOWN + .withDescription( + String.format( + "Unable to backup warming queries since uptime is %s minutes, which is less than threshold %s", + currUptimeMinutes, uptimeMinutesThreshold)) + .asRuntimeException(); + } + warmer.backupWarmingQueriesToS3(request.getServiceName()); + return BackupWarmingQueriesResponse.newBuilder().build(); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/CreateIndexHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/CreateIndexHandler.java index 782488573..baeeda8e6 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/CreateIndexHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/CreateIndexHandler.java @@ -19,7 +19,6 @@ import com.yelp.nrtsearch.server.grpc.CreateIndexResponse; import com.yelp.nrtsearch.server.state.GlobalState; import io.grpc.Status; -import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,43 +30,27 @@ public CreateIndexHandler(GlobalState globalState) { } @Override - public void handle(CreateIndexRequest req, StreamObserver responseObserver) { + public CreateIndexResponse handle(CreateIndexRequest req) throws Exception { logger.info("Received create index request: {}", req); String indexName = req.getIndexName(); String validIndexNameRegex = "[A-z0-9_-]+"; if (!indexName.matches(validIndexNameRegex)) { - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - String.format( - "Index name %s is invalid - must contain only a-z, A-Z or 0-9", indexName)) - .asRuntimeException()); - return; + throw Status.INVALID_ARGUMENT + .withDescription( + String.format( + "Index name %s is invalid - must contain only a-z, A-Z or 0-9", indexName)) + .asRuntimeException(); } - try { + synchronized (getGlobalState()) { + if (getGlobalState().getIndex(indexName) != null) { + throw Status.ALREADY_EXISTS + .withDescription(String.format("Index %s already exists", indexName)) + .asRuntimeException(); + } getGlobalState().createIndex(req); - String response = String.format("Created Index name: %s", indexName); - CreateIndexResponse reply = CreateIndexResponse.newBuilder().setResponse(response).build(); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (IllegalArgumentException e) { - logger.warn("invalid IndexName: " + indexName, e); - responseObserver.onError( - Status.ALREADY_EXISTS - .withDescription("invalid indexName: " + indexName) - .augmentDescription("IllegalArgumentException()") - .withCause(e) - .asRuntimeException()); - } catch (Exception e) { - logger.warn("error while trying to save index state to disk for indexName: " + indexName, e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to save index state to disk for indexName: " + indexName) - .augmentDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); } + String response = String.format("Created Index name: %s", indexName); + return CreateIndexResponse.newBuilder().setResponse(response).build(); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/CreateSnapshotHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/CreateSnapshotHandler.java index b7b0d54b5..bcbc8cb91 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/CreateSnapshotHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/CreateSnapshotHandler.java @@ -22,8 +22,6 @@ import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.apache.lucene.facet.taxonomy.SearcherTaxonomyManager; import org.apache.lucene.index.DirectoryReader; @@ -42,31 +40,12 @@ public CreateSnapshotHandler(GlobalState globalState) { } @Override - public void handle( - CreateSnapshotRequest createSnapshotRequest, - StreamObserver responseObserver) { - try { - IndexState indexState = - getGlobalState().getIndexOrThrow(createSnapshotRequest.getIndexName()); - CreateSnapshotResponse reply = createSnapshot(indexState, createSnapshotRequest); - logger.info(String.format("CreateSnapshotHandler returned results %s", reply.toString())); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn( - String.format( - "error while trying to createSnapshot for index %s", - createSnapshotRequest.getIndexName()), - e); - responseObserver.onError( - Status.UNKNOWN - .withDescription( - String.format( - "error while trying to createSnapshot for index %s", - createSnapshotRequest.getIndexName())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public CreateSnapshotResponse handle(CreateSnapshotRequest createSnapshotRequest) + throws Exception { + IndexState indexState = getIndexState(createSnapshotRequest.getIndexName()); + CreateSnapshotResponse reply = createSnapshot(indexState, createSnapshotRequest); + logger.info(String.format("CreateSnapshotHandler returned results %s", reply)); + return reply; } private CreateSnapshotResponse createSnapshot( diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/CustomHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/CustomHandler.java index cb221407f..bf08946b1 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/CustomHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/CustomHandler.java @@ -19,8 +19,6 @@ import com.yelp.nrtsearch.server.grpc.CustomRequest; import com.yelp.nrtsearch.server.grpc.CustomResponse; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,17 +30,8 @@ public CustomHandler(GlobalState globalState) { } @Override - public void handle(CustomRequest request, StreamObserver responseObserver) { + public CustomResponse handle(CustomRequest request) throws Exception { logger.info("Received custom request: {}", request); - try { - CustomResponse response = CustomRequestProcessor.processCustomRequest(request); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } catch (Exception e) { - String error = - String.format("Error processing custom request %s, error: %s", request, e.getMessage()); - logger.error(error); - responseObserver.onError(Status.INTERNAL.withDescription(error).withCause(e).asException()); - } + return CustomRequestProcessor.processCustomRequest(request); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteAllDocumentsHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteAllDocumentsHandler.java index 999e5a06d..f28731ad6 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteAllDocumentsHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteAllDocumentsHandler.java @@ -19,8 +19,6 @@ import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,29 +33,13 @@ public DeleteAllDocumentsHandler(GlobalState globalState) { } @Override - public void handle( - DeleteAllDocumentsRequest deleteAllDocumentsRequest, - StreamObserver responseObserver) { + public DeleteAllDocumentsResponse handle(DeleteAllDocumentsRequest deleteAllDocumentsRequest) + throws Exception { logger.info("Received delete all documents request: {}", deleteAllDocumentsRequest); - try { - IndexState indexState = - getGlobalState().getIndexOrThrow(deleteAllDocumentsRequest.getIndexName()); - DeleteAllDocumentsResponse reply = handle(indexState); - logger.info("DeleteAllDocumentsHandler returned " + reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn( - "error while trying to deleteAll for index " + deleteAllDocumentsRequest.getIndexName(), - e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - "error while trying to deleteAll for index: " - + deleteAllDocumentsRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + IndexState indexState = getIndexState(deleteAllDocumentsRequest.getIndexName()); + DeleteAllDocumentsResponse reply = handle(indexState); + logger.info("DeleteAllDocumentsHandler returned " + reply); + return reply; } private DeleteAllDocumentsResponse handle(IndexState indexState) diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteByQueryHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteByQueryHandler.java index 49ece8cf7..3a30a7979 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteByQueryHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteByQueryHandler.java @@ -21,8 +21,6 @@ import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.query.QueryNodeMapper; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.List; import java.util.stream.Collectors; @@ -39,28 +37,11 @@ public DeleteByQueryHandler(GlobalState globalState) { } @Override - public void handle( - DeleteByQueryRequest deleteByQueryRequest, - StreamObserver responseObserver) { - try { - IndexState indexState = getGlobalState().getIndexOrThrow(deleteByQueryRequest.getIndexName()); - AddDocumentResponse reply = handle(indexState, deleteByQueryRequest); - logger.debug("DeleteDocumentsHandler returned " + reply.toString()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn( - "Error while trying to delete documents from index: {}", - deleteByQueryRequest.getIndexName(), - e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - "Error while trying to delete documents from index: " - + deleteByQueryRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public AddDocumentResponse handle(DeleteByQueryRequest deleteByQueryRequest) throws Exception { + IndexState indexState = getGlobalState().getIndexOrThrow(deleteByQueryRequest.getIndexName()); + AddDocumentResponse reply = handle(indexState, deleteByQueryRequest); + logger.debug("DeleteDocumentsHandler returned " + reply); + return reply; } private AddDocumentResponse handle( diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteDocumentsHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteDocumentsHandler.java index 3156a1e66..7bdb39c4c 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteDocumentsHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteDocumentsHandler.java @@ -21,8 +21,6 @@ import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -40,27 +38,11 @@ public DeleteDocumentsHandler(GlobalState globalState) { } @Override - public void handle( - AddDocumentRequest addDocumentRequest, StreamObserver responseObserver) { - try { - IndexState indexState = getGlobalState().getIndexOrThrow(addDocumentRequest.getIndexName()); - AddDocumentResponse reply = handleInternal(indexState, addDocumentRequest); - logger.debug("DeleteDocumentsHandler returned {}", reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.error( - "error while trying to delete documents for index {}", - addDocumentRequest.getIndexName(), - e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - "error while trying to delete documents for index: " - + addDocumentRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public AddDocumentResponse handle(AddDocumentRequest addDocumentRequest) throws Exception { + IndexState indexState = getGlobalState().getIndexOrThrow(addDocumentRequest.getIndexName()); + AddDocumentResponse reply = handleInternal(indexState, addDocumentRequest); + logger.debug("DeleteDocumentsHandler returned {}", reply); + return reply; } private AddDocumentResponse handleInternal( diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteIndexHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteIndexHandler.java index 74dbd1a64..bd2d0fd39 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/DeleteIndexHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/DeleteIndexHandler.java @@ -19,8 +19,6 @@ import com.yelp.nrtsearch.server.grpc.DeleteIndexResponse; import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,24 +31,12 @@ public DeleteIndexHandler(GlobalState globalState) { } @Override - public void handle( - DeleteIndexRequest deleteIndexRequest, StreamObserver responseObserver) { + public DeleteIndexResponse handle(DeleteIndexRequest deleteIndexRequest) throws Exception { logger.info("Received delete index request: {}", deleteIndexRequest); - try { - IndexState indexState = getGlobalState().getIndexOrThrow(deleteIndexRequest.getIndexName()); - DeleteIndexResponse reply = handle(indexState); - logger.info("DeleteIndexHandler returned " + reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn("error while trying to delete index " + deleteIndexRequest.getIndexName(), e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - "error while trying to delete index: " + deleteIndexRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + IndexState indexState = getIndexState(deleteIndexRequest.getIndexName()); + DeleteIndexResponse reply = handle(indexState); + logger.info("DeleteIndexHandler returned " + reply); + return reply; } private DeleteIndexResponse handle(IndexState indexState) throws DeleteIndexHandlerException { diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeDeletesHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeDeletesHandler.java index b252954a0..0d076cd34 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeDeletesHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeDeletesHandler.java @@ -21,7 +21,6 @@ import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.state.GlobalState; import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,13 +34,11 @@ public ForceMergeDeletesHandler(GlobalState globalState) { } @Override - public void handle( - ForceMergeDeletesRequest forceMergeRequest, - StreamObserver responseObserver) { + public ForceMergeDeletesResponse handle(ForceMergeDeletesRequest forceMergeRequest) + throws Exception { logger.info("Received force merge deletes request: {}", forceMergeRequest); if (forceMergeRequest.getIndexName().isEmpty()) { - responseObserver.onError(new IllegalArgumentException("Index name in request is empty")); - return; + throw new IllegalArgumentException("Index name in request is empty"); } try { @@ -52,13 +49,11 @@ public void handle( } catch (IOException e) { logger.warn( "Error during force merge deletes for index {} ", forceMergeRequest.getIndexName(), e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "Error during force merge deletes for index " + forceMergeRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - return; + throw Status.INTERNAL + .withDescription( + "Error during force merge deletes for index " + forceMergeRequest.getIndexName()) + .augmentDescription(e.getMessage()) + .asRuntimeException(); } ForceMergeDeletesResponse.Status status = @@ -66,9 +61,6 @@ public void handle( ? ForceMergeDeletesResponse.Status.FORCE_MERGE_DELETES_COMPLETED : ForceMergeDeletesResponse.Status.FORCE_MERGE_DELETES_SUBMITTED; logger.info("Force merge deletes status: {}", status); - ForceMergeDeletesResponse response = - ForceMergeDeletesResponse.newBuilder().setStatus(status).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); + return ForceMergeDeletesResponse.newBuilder().setStatus(status).build(); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeHandler.java index 5ccd4d9de..df2fe6548 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/ForceMergeHandler.java @@ -20,57 +20,37 @@ import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; -import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ForceMergeHandler extends Handler { private static final Logger logger = LoggerFactory.getLogger(ForceMergeHandler.class); - private static ForceMergeHandler instance; public ForceMergeHandler(GlobalState globalState) { super(globalState); } @Override - public void handle( - ForceMergeRequest forceMergeRequest, StreamObserver responseObserver) { + public ForceMergeResponse handle(ForceMergeRequest forceMergeRequest) throws Exception { logger.info("Received force merge request: {}", forceMergeRequest); if (forceMergeRequest.getIndexName().isEmpty()) { - responseObserver.onError(new IllegalArgumentException("Index name in request is empty")); - return; + throw new IllegalArgumentException("Index name in request is empty"); } if (forceMergeRequest.getMaxNumSegments() == 0) { - responseObserver.onError(new IllegalArgumentException("Cannot have 0 max segments")); - return; + throw new IllegalArgumentException("Cannot have 0 max segments"); } - try { - IndexState indexState = getGlobalState().getIndexOrThrow(forceMergeRequest.getIndexName()); - ShardState shardState = indexState.getShards().get(0); - logger.info("Beginning force merge for index: {}", forceMergeRequest.getIndexName()); - shardState.writer.forceMerge( - forceMergeRequest.getMaxNumSegments(), forceMergeRequest.getDoWait()); - } catch (IOException e) { - logger.warn("Error during force merge for index {} ", forceMergeRequest.getIndexName(), e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "Error during force merge for index " + forceMergeRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - return; - } + IndexState indexState = getGlobalState().getIndexOrThrow(forceMergeRequest.getIndexName()); + ShardState shardState = indexState.getShards().get(0); + logger.info("Beginning force merge for index: {}", forceMergeRequest.getIndexName()); + shardState.writer.forceMerge( + forceMergeRequest.getMaxNumSegments(), forceMergeRequest.getDoWait()); ForceMergeResponse.Status status = forceMergeRequest.getDoWait() ? ForceMergeResponse.Status.FORCE_MERGE_COMPLETED : ForceMergeResponse.Status.FORCE_MERGE_SUBMITTED; logger.info("Force merge status: {}", status); - ForceMergeResponse response = ForceMergeResponse.newBuilder().setStatus(status).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); + return ForceMergeResponse.newBuilder().setStatus(status).build(); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/GetAllSnapshotIndexGenHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/GetAllSnapshotIndexGenHandler.java index 944428033..8c7386de3 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/GetAllSnapshotIndexGenHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/GetAllSnapshotIndexGenHandler.java @@ -18,8 +18,6 @@ import com.yelp.nrtsearch.server.grpc.GetAllSnapshotGenRequest; import com.yelp.nrtsearch.server.grpc.GetAllSnapshotGenResponse; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.stub.StreamObserver; -import java.io.IOException; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,24 +31,13 @@ public GetAllSnapshotIndexGenHandler(GlobalState globalState) { } @Override - public void handle( - GetAllSnapshotGenRequest request, - StreamObserver responseObserver) { - try { - Set snapshotGens = - getGlobalState() - .getIndexOrThrow(request.getIndexName()) - .getShard(0) - .snapshotGenToVersion - .keySet(); - GetAllSnapshotGenResponse response = - GetAllSnapshotGenResponse.newBuilder().addAllIndexGens(snapshotGens).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } catch (IOException e) { - logger.error( - "Error getting all snapshotted index gens for index: {}", request.getIndexName(), e); - responseObserver.onError(e); - } + public GetAllSnapshotGenResponse handle(GetAllSnapshotGenRequest request) throws Exception { + Set snapshotGens = + getGlobalState() + .getIndexOrThrow(request.getIndexName()) + .getShard(0) + .snapshotGenToVersion + .keySet(); + return GetAllSnapshotGenResponse.newBuilder().addAllIndexGens(snapshotGens).build(); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/GetStateHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/GetStateHandler.java index c8df29217..0c611dc85 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/GetStateHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/GetStateHandler.java @@ -20,8 +20,6 @@ import com.yelp.nrtsearch.server.grpc.StateResponse; import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,22 +33,11 @@ public GetStateHandler(GlobalState globalState) { } @Override - public void handle(StateRequest request, StreamObserver responseObserver) { - try { - IndexState indexState = getGlobalState().getIndexOrThrow(request.getIndexName()); - StateResponse reply = handle(indexState); - logger.debug("GetStateHandler returned " + reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn("error while trying to get state for index " + request.getIndexName(), e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - "error while trying to get state for index " + request.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public StateResponse handle(StateRequest request) throws Exception { + IndexState indexState = getIndexState(request.getIndexName()); + StateResponse reply = handle(indexState); + logger.debug("GetStateHandler returned {}", reply); + return reply; } private StateResponse handle(IndexState indexState) throws HandlerException { diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/GlobalStateHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/GlobalStateHandler.java index 46784fbc8..a788e1351 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/GlobalStateHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/GlobalStateHandler.java @@ -18,8 +18,6 @@ import com.yelp.nrtsearch.server.grpc.GlobalStateRequest; import com.yelp.nrtsearch.server.grpc.GlobalStateResponse; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,19 +29,7 @@ public GlobalStateHandler(GlobalState globalState) { } @Override - public void handle( - GlobalStateRequest request, StreamObserver responseObserver) { - try { - responseObserver.onNext( - GlobalStateResponse.newBuilder().setGlobalState(getGlobalState().getStateInfo()).build()); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn("error while trying to get global state", e); - responseObserver.onError( - Status.UNKNOWN - .withDescription("error while trying to get global state") - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public GlobalStateResponse handle(GlobalStateRequest request) throws Exception { + return GlobalStateResponse.newBuilder().setGlobalState(getGlobalState().getStateInfo()).build(); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/Handler.java b/src/main/java/com/yelp/nrtsearch/server/handler/Handler.java index 3547f81d0..d26819bcc 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/Handler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/Handler.java @@ -21,12 +21,15 @@ import com.yelp.nrtsearch.server.grpc.LuceneServerStubBuilder; import com.yelp.nrtsearch.server.grpc.NrtsearchServer; import com.yelp.nrtsearch.server.grpc.ReplicationServerClient; +import com.yelp.nrtsearch.server.index.IndexState; +import com.yelp.nrtsearch.server.index.IndexStateManager; import com.yelp.nrtsearch.server.state.GlobalState; import com.yelp.nrtsearch.server.utils.ProtoMessagePrinter; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; +import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +61,7 @@ public void handle(T protoRequest, StreamObserver responseObserver) { throw new UnsupportedOperationException("This method is not supported"); } - public S handle(T protoRequest) { + public S handle(T protoRequest) throws Exception { throw new UnsupportedOperationException("This method is not supported"); } @@ -103,6 +106,27 @@ void handleUnaryRequest( } } + protected IndexState getIndexState(String indexName) throws IOException, StatusRuntimeException { + IndexState indexState = getGlobalState().getIndex(indexName); + if (indexState == null) { + throw Status.NOT_FOUND + .withDescription("Index " + indexName + " not found") + .asRuntimeException(); + } + return indexState; + } + + protected IndexStateManager getIndexStateManager(String indexName) + throws IOException, StatusRuntimeException { + IndexStateManager indexStateManager = getGlobalState().getIndexStateManager(indexName); + if (indexStateManager == null) { + throw Status.NOT_FOUND + .withDescription("Index " + indexName + " not found") + .asRuntimeException(); + } + return indexStateManager; + } + /** * Set response compression on the provided {@link StreamObserver}. Should be a valid compression * type from the {@link LuceneServerStubBuilder#COMPRESSOR_REGISTRY}, or empty string for default. diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/IndicesHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/IndicesHandler.java index f90888863..c388d643d 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/IndicesHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/IndicesHandler.java @@ -21,8 +21,6 @@ import com.yelp.nrtsearch.server.grpc.StatsResponse; import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.Set; import org.slf4j.Logger; @@ -36,21 +34,10 @@ public IndicesHandler(GlobalState globalState) { } @Override - public void handle( - IndicesRequest indicesRequest, StreamObserver responseObserver) { - try { - IndicesResponse reply = getIndicesResponse(getGlobalState()); - logger.debug("IndicesRequestHandler returned " + reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn("error while trying to get indices stats", e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription("error while trying to get indices stats") - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public IndicesResponse handle(IndicesRequest indicesRequest) throws Exception { + IndicesResponse reply = getIndicesResponse(getGlobalState()); + logger.debug("IndicesRequestHandler returned {}", reply); + return reply; } private static IndicesResponse getIndicesResponse(GlobalState globalState) throws IOException { diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/LiveSettingsHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/LiveSettingsHandler.java index 5141bddf8..e99baba0f 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/LiveSettingsHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/LiveSettingsHandler.java @@ -24,8 +24,6 @@ import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.index.IndexStateManager; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,36 +36,12 @@ public LiveSettingsHandler(GlobalState globalState) { } @Override - public void handle( - LiveSettingsRequest req, StreamObserver responseObserver) { + public LiveSettingsResponse handle(LiveSettingsRequest req) throws Exception { logger.info("Received live settings request: {}", req); - try { - IndexState indexState = getGlobalState().getIndexOrThrow(req.getIndexName()); - LiveSettingsResponse reply = handle(indexState, req); - logger.info("LiveSettingsHandler returned {}", reply.toString()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (IllegalArgumentException e) { - logger.warn("index: {} was not yet created", req.getIndexName(), e); - responseObserver.onError( - Status.ALREADY_EXISTS - .withDescription("invalid indexName: " + req.getIndexName()) - .augmentDescription("IllegalArgumentException()") - .withCause(e) - .asRuntimeException()); - } catch (Exception e) { - logger.warn( - "error while trying to read index state dir for indexName: " + req.getIndexName(), e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to read index state dir for indexName: " - + req.getIndexName() - + "at rootDir: ") - .augmentDescription("IOException()") - .withCause(e) - .asRuntimeException()); - } + IndexState indexState = getIndexState(req.getIndexName()); + LiveSettingsResponse reply = handle(indexState, req); + logger.info("LiveSettingsHandler returned {}", reply.toString()); + return reply; } public LiveSettingsResponse handle( diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/LiveSettingsV2Handler.java b/src/main/java/com/yelp/nrtsearch/server/handler/LiveSettingsV2Handler.java index 1ddf9d220..b2bcd8880 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/LiveSettingsV2Handler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/LiveSettingsV2Handler.java @@ -21,8 +21,6 @@ import com.yelp.nrtsearch.server.grpc.LiveSettingsV2Response; import com.yelp.nrtsearch.server.index.IndexStateManager; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,36 +34,12 @@ public LiveSettingsV2Handler(GlobalState globalState) { } @Override - public void handle( - LiveSettingsV2Request req, StreamObserver responseObserver) { + public LiveSettingsV2Response handle(LiveSettingsV2Request req) throws Exception { logger.info("Received live settings V2 request: {}", req); - try { - IndexStateManager indexStateManager = - getGlobalState().getIndexStateManagerOrThrow(req.getIndexName()); - LiveSettingsV2Response reply = handle(indexStateManager, req); - logger.info("LiveSettingsV2Handler returned " + JsonFormat.printer().print(reply)); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (IllegalArgumentException e) { - logger.warn("index: " + req.getIndexName() + " was not yet created", e); - responseObserver.onError( - Status.ALREADY_EXISTS - .withDescription("invalid indexName: " + req.getIndexName()) - .augmentDescription("IllegalArgumentException()") - .withCause(e) - .asRuntimeException()); - } catch (Exception e) { - logger.warn( - "error while trying to process live settings for indexName: " + req.getIndexName(), e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to process live settings for indexName: " - + req.getIndexName()) - .augmentDescription("Exception()") - .withCause(e) - .asRuntimeException()); - } + IndexStateManager indexStateManager = getIndexStateManager(req.getIndexName()); + LiveSettingsV2Response reply = handle(indexStateManager, req); + logger.info("LiveSettingsV2Handler returned " + JsonFormat.printer().print(reply)); + return reply; } /** diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/MetricsHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/MetricsHandler.java index 9bb9c6ec1..93c6a3a0e 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/MetricsHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/MetricsHandler.java @@ -18,8 +18,6 @@ import com.google.api.HttpBody; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; import io.prometheus.metrics.model.registry.PrometheusRegistry; import java.io.ByteArrayOutputStream; @@ -37,20 +35,10 @@ public MetricsHandler(PrometheusRegistry prometheusRegistry) { } @Override - public void handle(Empty request, StreamObserver responseObserver) { - try { - HttpBody reply = process(); - logger.debug("MetricsRequestHandler returned " + reply.toString()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn("error while trying to get metrics", e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription("error while trying to get metrics") - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public HttpBody handle(Empty request) throws Exception { + HttpBody reply = process(); + logger.debug("MetricsRequestHandler returned {}", reply); + return reply; } private HttpBody process() throws IOException { diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/ReadyHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/ReadyHandler.java index f81c60abc..1e843701d 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/ReadyHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/ReadyHandler.java @@ -23,7 +23,6 @@ import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.state.GlobalState; import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -40,8 +39,7 @@ public ReadyHandler(GlobalState globalState) { } @Override - public void handle( - ReadyCheckRequest request, StreamObserver responseObserver) { + public HealthCheckResponse handle(ReadyCheckRequest request) throws Exception { Set indexNames; // If specific index names are provided we will check only those indices, otherwise check all @@ -56,46 +54,33 @@ public void handle( Sets.difference(Set.copyOf(indexNamesToCheck), allIndices); if (!nonExistentIndices.isEmpty()) { logger.warn("Indices: {} do not exist", nonExistentIndices); - responseObserver.onError( - Status.UNAVAILABLE - .withDescription(String.format("Indices do not exist: %s", nonExistentIndices)) - .asRuntimeException()); - return; + throw Status.UNAVAILABLE + .withDescription(String.format("Indices do not exist: %s", nonExistentIndices)) + .asRuntimeException(); } indexNames = allIndices.stream().filter(indexNamesToCheck::contains).collect(Collectors.toSet()); } - try { - List indicesNotStarted = new ArrayList<>(); - for (String indexName : indexNames) { - IndexState indexState = getGlobalState().getIndexOrThrow(indexName); - if (!indexState.isStarted()) { - indicesNotStarted.add(indexName); - } + List indicesNotStarted = new ArrayList<>(); + for (String indexName : indexNames) { + IndexState indexState = getGlobalState().getIndexOrThrow(indexName); + if (!indexState.isStarted()) { + indicesNotStarted.add(indexName); } + } - if (indicesNotStarted.isEmpty()) { - HealthCheckResponse reply = - HealthCheckResponse.newBuilder().setHealth(TransferStatusCode.Done).build(); - logger.debug("Ready check returned " + reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } else { - logger.warn("Indices not started: {}", indicesNotStarted); - responseObserver.onError( - Status.UNAVAILABLE - .withDescription(String.format("Indices not started: %s", indicesNotStarted)) - .asRuntimeException()); - } - } catch (Exception e) { - logger.warn("error while trying to check if all required indices are started", e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription("error while trying to check if all required indices are started") - .augmentDescription(e.getMessage()) - .asRuntimeException()); + if (indicesNotStarted.isEmpty()) { + HealthCheckResponse reply = + HealthCheckResponse.newBuilder().setHealth(TransferStatusCode.Done).build(); + logger.debug("Ready check returned " + reply); + return reply; + } else { + logger.warn("Indices not started: {}", indicesNotStarted); + throw Status.UNAVAILABLE + .withDescription(String.format("Indices not started: %s", indicesNotStarted)) + .asRuntimeException(); } } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/RefreshHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/RefreshHandler.java index 30d19c901..603328837 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/RefreshHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/RefreshHandler.java @@ -20,9 +20,6 @@ import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; -import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,43 +31,18 @@ public RefreshHandler(GlobalState globalState) { } @Override - public void handle( - RefreshRequest refreshRequest, StreamObserver responseObserver) { - try { - IndexState indexState = getGlobalState().getIndexOrThrow(refreshRequest.getIndexName()); - final ShardState shardState = indexState.getShard(0); - long t0 = System.nanoTime(); - shardState.maybeRefreshBlocking(); - long t1 = System.nanoTime(); - double refreshTimeMs = (t1 - t0) / 1000000.0; - RefreshResponse reply = RefreshResponse.newBuilder().setRefreshTimeMS(refreshTimeMs).build(); - logger.info( - String.format( - "RefreshHandler refreshed index: %s in %f", - refreshRequest.getIndexName(), refreshTimeMs)); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (IOException e) { - logger.warn( - "error while trying to read index state dir for indexName: " - + refreshRequest.getIndexName(), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to read index state dir for indexName: " - + refreshRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); - } catch (Exception e) { - logger.warn("error while trying to refresh index " + refreshRequest.getIndexName(), e); - responseObserver.onError( - Status.UNKNOWN - .withDescription( - "error while trying to refresh index: " + refreshRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public RefreshResponse handle(RefreshRequest refreshRequest) throws Exception { + IndexState indexState = getIndexState(refreshRequest.getIndexName()); + final ShardState shardState = indexState.getShard(0); + long t0 = System.nanoTime(); + shardState.maybeRefreshBlocking(); + long t1 = System.nanoTime(); + double refreshTimeMs = (t1 - t0) / 1000000.0; + RefreshResponse reply = RefreshResponse.newBuilder().setRefreshTimeMS(refreshTimeMs).build(); + logger.info( + String.format( + "RefreshHandler refreshed index: %s in %f", + refreshRequest.getIndexName(), refreshTimeMs)); + return reply; } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/RegisterFieldsHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/RegisterFieldsHandler.java index 15f44e2db..df01abdc1 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/RegisterFieldsHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/RegisterFieldsHandler.java @@ -19,9 +19,6 @@ import com.yelp.nrtsearch.server.grpc.FieldDefResponse; import com.yelp.nrtsearch.server.index.IndexStateManager; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; -import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,40 +30,12 @@ public RegisterFieldsHandler(GlobalState globalState) { } @Override - public void handle( - FieldDefRequest fieldDefRequest, StreamObserver responseObserver) { + public FieldDefResponse handle(FieldDefRequest fieldDefRequest) throws Exception { logger.info("Received register fields request: {}", fieldDefRequest); - try { - IndexStateManager indexStateManager = - getGlobalState().getIndexStateManagerOrThrow(fieldDefRequest.getIndexName()); - String updatedFields = indexStateManager.updateFields(fieldDefRequest.getFieldList()); - FieldDefResponse reply = FieldDefResponse.newBuilder().setResponse(updatedFields).build(); - logger.info("RegisterFieldsHandler registered fields " + reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (IOException e) { - logger.warn( - "error while trying to read index state dir for indexName: " - + fieldDefRequest.getIndexName(), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to read index state dir for indexName: " - + fieldDefRequest.getIndexName()) - .augmentDescription("IOException()") - .withCause(e) - .asRuntimeException()); - } catch (Exception e) { - logger.warn( - "error while trying to RegisterFields for index " + fieldDefRequest.getIndexName(), e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - "error while trying to RegisterFields for index: " - + fieldDefRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + IndexStateManager indexStateManager = getIndexStateManager(fieldDefRequest.getIndexName()); + String updatedFields = indexStateManager.updateFields(fieldDefRequest.getFieldList()); + FieldDefResponse reply = FieldDefResponse.newBuilder().setResponse(updatedFields).build(); + logger.info("RegisterFieldsHandler registered fields " + reply); + return reply; } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/ReleaseSnapshotHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/ReleaseSnapshotHandler.java index 700671451..0dc2ec0e5 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/ReleaseSnapshotHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/ReleaseSnapshotHandler.java @@ -20,8 +20,6 @@ import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,31 +33,12 @@ public ReleaseSnapshotHandler(GlobalState globalState) { } @Override - public void handle( - ReleaseSnapshotRequest releaseSnapshotRequest, - StreamObserver responseObserver) { - try { - IndexState indexState = - getGlobalState().getIndexOrThrow(releaseSnapshotRequest.getIndexName()); - ReleaseSnapshotResponse reply = handle(indexState, releaseSnapshotRequest); - logger.info(String.format("CreateSnapshotHandler returned results %s", reply.toString())); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn( - String.format( - "error while trying to releaseSnapshot for index %s", - releaseSnapshotRequest.getIndexName()), - e); - responseObserver.onError( - Status.UNKNOWN - .withDescription( - String.format( - "error while trying to releaseSnapshot for index %s", - releaseSnapshotRequest.getIndexName())) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public ReleaseSnapshotResponse handle(ReleaseSnapshotRequest releaseSnapshotRequest) + throws Exception { + IndexState indexState = getIndexState(releaseSnapshotRequest.getIndexName()); + ReleaseSnapshotResponse reply = handle(indexState, releaseSnapshotRequest); + logger.info(String.format("CreateSnapshotHandler returned results %s", reply)); + return reply; } private ReleaseSnapshotResponse handle( diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/ReloadStateHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/ReloadStateHandler.java index 446c15834..cd78ad1ff 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/ReloadStateHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/ReloadStateHandler.java @@ -19,8 +19,6 @@ import com.yelp.nrtsearch.server.grpc.ReloadStateRequest; import com.yelp.nrtsearch.server.grpc.ReloadStateResponse; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,28 +30,12 @@ public ReloadStateHandler(GlobalState globalState) { } @Override - public void handle( - ReloadStateRequest request, StreamObserver responseObserver) { - try { - if (getGlobalState() - .getConfiguration() - .getIndexStartConfig() - .getMode() - .equals(Mode.REPLICA)) { - getGlobalState().reloadStateFromBackend(); - } else { - logger.info("Skip reloading state since it is not replica"); - } - ReloadStateResponse reloadStateResponse = ReloadStateResponse.newBuilder().build(); - responseObserver.onNext(reloadStateResponse); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn("error while trying to sync the index state", e); - responseObserver.onError( - Status.INTERNAL - .withDescription("error while trying to sync the index state") - .augmentDescription(e.getMessage()) - .asRuntimeException()); + public ReloadStateResponse handle(ReloadStateRequest request) throws Exception { + if (getGlobalState().getConfiguration().getIndexStartConfig().getMode().equals(Mode.REPLICA)) { + getGlobalState().reloadStateFromBackend(); + } else { + logger.info("Skip reloading state since it is not replica"); } + return ReloadStateResponse.newBuilder().build(); } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/SettingsHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/SettingsHandler.java index 2107ee190..9cc37d0a7 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/SettingsHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/SettingsHandler.java @@ -26,8 +26,6 @@ import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.index.IndexStateManager; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,40 +38,12 @@ public SettingsHandler(GlobalState globalState) { } @Override - public void handle( - SettingsRequest settingsRequest, StreamObserver responseObserver) { + public SettingsResponse handle(SettingsRequest settingsRequest) throws Exception { logger.info("Received settings request: {}", settingsRequest); - try { - IndexState indexState = getGlobalState().getIndexOrThrow(settingsRequest.getIndexName()); - SettingsResponse reply = handle(indexState, settingsRequest); - logger.info("SettingsHandler returned " + reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (IOException e) { - logger.warn( - "error while trying to read index state dir for indexName: " - + settingsRequest.getIndexName(), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to read index state dir for indexName: " - + settingsRequest.getIndexName()) - .augmentDescription("IOException()") - .withCause(e) - .asRuntimeException()); - } catch (Exception e) { - logger.warn( - "error while trying to update/get settings for index " + settingsRequest.getIndexName(), - e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - "error while trying to update/get settings for index: " - + settingsRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + IndexState indexState = getIndexState(settingsRequest.getIndexName()); + SettingsResponse reply = handle(indexState, settingsRequest); + logger.info("SettingsHandler returned " + reply); + return reply; } private SettingsResponse handle(final IndexState indexStateIn, SettingsRequest settingsRequest) diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/SettingsV2Handler.java b/src/main/java/com/yelp/nrtsearch/server/handler/SettingsV2Handler.java index 7ad43b545..f0f0aced1 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/SettingsV2Handler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/SettingsV2Handler.java @@ -21,8 +21,6 @@ import com.yelp.nrtsearch.server.grpc.SettingsV2Response; import com.yelp.nrtsearch.server.index.IndexStateManager; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,41 +34,12 @@ public SettingsV2Handler(GlobalState globalState) { } @Override - public void handle( - SettingsV2Request settingsRequest, StreamObserver responseObserver) { + public SettingsV2Response handle(SettingsV2Request settingsRequest) throws Exception { logger.info("Received settings V2 request: {}", settingsRequest); - try { - IndexStateManager indexStateManager = - getGlobalState().getIndexStateManagerOrThrow(settingsRequest.getIndexName()); - SettingsV2Response reply = handle(indexStateManager, settingsRequest); - logger.info("SettingsV2Handler returned: " + JsonFormat.printer().print(reply)); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (IOException e) { - logger.warn( - "error while trying to read index state dir for indexName: " - + settingsRequest.getIndexName(), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to read index state dir for indexName: " - + settingsRequest.getIndexName()) - .augmentDescription("IOException()") - .withCause(e) - .asRuntimeException()); - } catch (Exception e) { - logger.warn( - "error while trying to update/get settings for index " + settingsRequest.getIndexName(), - e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - "error while trying to update/get settings for index: " - + settingsRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + IndexStateManager indexStateManager = getIndexStateManager(settingsRequest.getIndexName()); + SettingsV2Response reply = handle(indexStateManager, settingsRequest); + logger.info("SettingsV2Handler returned: " + JsonFormat.printer().print(reply)); + return reply; } /** diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexHandler.java index d92906bec..4ce5f351f 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexHandler.java @@ -19,8 +19,6 @@ import com.yelp.nrtsearch.server.grpc.StartIndexResponse; import com.yelp.nrtsearch.server.state.GlobalState; import io.grpc.Status; -import io.grpc.stub.StreamObserver; -import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,46 +30,17 @@ public StartIndexHandler(GlobalState globalState) { } @Override - public void handle( - StartIndexRequest startIndexRequest, StreamObserver responseObserver) { + public StartIndexResponse handle(StartIndexRequest startIndexRequest) throws Exception { logger.info("Received start index request: {}", startIndexRequest); if (startIndexRequest.getIndexName().isEmpty()) { logger.warn("error while trying to start index with empty index name."); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - String.format("error while trying to start index since indexName was empty.")) - .asRuntimeException()); - return; + throw Status.INVALID_ARGUMENT + .withDescription("error while trying to start index since indexName was empty.") + .asRuntimeException(); } - try { - StartIndexResponse reply = getGlobalState().startIndex(startIndexRequest); - logger.info("StartIndexHandler returned " + reply.toString()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - - } catch (IOException e) { - logger.warn( - "error while trying to read index state dir for indexName: " - + startIndexRequest.getIndexName(), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to read index state dir for indexName: " - + startIndexRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); - } catch (Exception e) { - logger.warn("error while trying to start index " + startIndexRequest.getIndexName(), e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - "error while trying to start index: " + startIndexRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + StartIndexResponse reply = getGlobalState().startIndex(startIndexRequest); + logger.info("StartIndexHandler returned " + reply.toString()); + return reply; } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexV2Handler.java b/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexV2Handler.java index 967ef117f..d11cadff4 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexV2Handler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/StartIndexV2Handler.java @@ -19,8 +19,6 @@ import com.yelp.nrtsearch.server.grpc.StartIndexV2Request; import com.yelp.nrtsearch.server.state.GlobalState; import io.grpc.Status; -import io.grpc.stub.StreamObserver; -import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,35 +30,17 @@ public StartIndexV2Handler(GlobalState globalState) { } @Override - public void handle( - StartIndexV2Request startIndexRequest, StreamObserver responseObserver) { + public StartIndexResponse handle(StartIndexV2Request startIndexRequest) throws Exception { logger.info("Received start index v2 request: {}", startIndexRequest); - try { - StartIndexResponse reply = getGlobalState().startIndexV2(startIndexRequest); - logger.info("StartIndexV2Handler returned " + reply.toString()); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (IOException e) { - logger.warn( - "error while trying to read index state dir for indexName: " - + startIndexRequest.getIndexName(), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to read index state dir for indexName: " - + startIndexRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); - } catch (Exception e) { - logger.warn("error while trying to start index " + startIndexRequest.getIndexName(), e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - "error while trying to start index: " + startIndexRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); + if (startIndexRequest.getIndexName().isEmpty()) { + logger.warn("error while trying to start index with empty index name."); + throw Status.INVALID_ARGUMENT + .withDescription("error while trying to start index since indexName was empty.") + .asRuntimeException(); } + + StartIndexResponse reply = getGlobalState().startIndexV2(startIndexRequest); + logger.info("StartIndexV2Handler returned " + reply.toString()); + return reply; } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/StatsHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/StatsHandler.java index 0244b2191..69f10a3f2 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/StatsHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/StatsHandler.java @@ -22,8 +22,6 @@ import com.yelp.nrtsearch.server.index.IndexState; import com.yelp.nrtsearch.server.index.ShardState; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -46,37 +44,12 @@ public StatsHandler(GlobalState globalState) { } @Override - public void handle(StatsRequest statsRequest, StreamObserver responseObserver) { - try { - IndexState indexState = getGlobalState().getIndexOrThrow(statsRequest.getIndexName()); - indexState.verifyStarted(); - StatsResponse reply = process(indexState); - logger.debug("StatsHandler retrieved stats for index: {} ", reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (IOException e) { - logger.warn( - "error while trying to read index state dir for indexName: {}", - statsRequest.getIndexName(), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to read index state dir for indexName: " - + statsRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); - } catch (Exception e) { - logger.warn( - "error while trying to retrieve stats for index {}", statsRequest.getIndexName(), e); - responseObserver.onError( - Status.UNKNOWN - .withDescription( - "error while trying to retrieve stats for index: " + statsRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public StatsResponse handle(StatsRequest statsRequest) throws Exception { + IndexState indexState = getIndexState(statsRequest.getIndexName()); + indexState.verifyStarted(); + StatsResponse reply = process(indexState); + logger.debug("StatsHandler retrieved stats for index: {} ", reply); + return reply; } // Public because it's used by IndicesHandler diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/StatusHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/StatusHandler.java index 3975ea808..5d3777cbe 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/StatusHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/StatusHandler.java @@ -18,8 +18,6 @@ import com.yelp.nrtsearch.server.grpc.HealthCheckRequest; import com.yelp.nrtsearch.server.grpc.HealthCheckResponse; import com.yelp.nrtsearch.server.grpc.TransferStatusCode; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,21 +29,10 @@ public StatusHandler() { } @Override - public void handle( - HealthCheckRequest request, StreamObserver responseObserver) { - try { - HealthCheckResponse reply = - HealthCheckResponse.newBuilder().setHealth(TransferStatusCode.Done).build(); - logger.debug("HealthCheckResponse returned " + reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn("error while trying to get status", e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription("error while trying to get status") - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + public HealthCheckResponse handle(HealthCheckRequest request) throws Exception { + HealthCheckResponse reply = + HealthCheckResponse.newBuilder().setHealth(TransferStatusCode.Done).build(); + logger.debug("HealthCheckResponse returned " + reply); + return reply; } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/StopIndexHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/StopIndexHandler.java index f19d444c4..61c6345de 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/StopIndexHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/StopIndexHandler.java @@ -18,8 +18,6 @@ import com.yelp.nrtsearch.server.grpc.DummyResponse; import com.yelp.nrtsearch.server.grpc.StopIndexRequest; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,23 +29,11 @@ public StopIndexHandler(GlobalState globalState) { } @Override - public void handle( - StopIndexRequest stopIndexRequest, StreamObserver responseObserver) { + public DummyResponse handle(StopIndexRequest stopIndexRequest) throws Exception { logger.info("Received stop index request: {}", stopIndexRequest); - try { - DummyResponse reply = getGlobalState().stopIndex(stopIndexRequest); + DummyResponse reply = getGlobalState().stopIndex(stopIndexRequest); - logger.info("StopIndexHandler returned " + reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.warn("error while trying to stop index " + stopIndexRequest.getIndexName(), e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - "error while trying to stop index: " + stopIndexRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + logger.info("StopIndexHandler returned " + reply); + return reply; } } diff --git a/src/main/java/com/yelp/nrtsearch/server/handler/UpdateFieldsHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/UpdateFieldsHandler.java index bfa4f0f9b..98d5cb2de 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/UpdateFieldsHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/UpdateFieldsHandler.java @@ -19,9 +19,6 @@ import com.yelp.nrtsearch.server.grpc.FieldDefResponse; import com.yelp.nrtsearch.server.index.IndexStateManager; import com.yelp.nrtsearch.server.state.GlobalState; -import io.grpc.Status; -import io.grpc.stub.StreamObserver; -import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,41 +30,12 @@ public UpdateFieldsHandler(GlobalState globalState) { } @Override - public void handle( - FieldDefRequest fieldDefRequest, StreamObserver responseObserver) { + public FieldDefResponse handle(FieldDefRequest fieldDefRequest) throws Exception { logger.info("Received update fields request: {}", fieldDefRequest); - try { - IndexStateManager indexStateManager = - getGlobalState().getIndexStateManagerOrThrow(fieldDefRequest.getIndexName()); - String updatedFields = indexStateManager.updateFields(fieldDefRequest.getFieldList()); - FieldDefResponse reply = FieldDefResponse.newBuilder().setResponse(updatedFields).build(); - logger.info("UpdateFieldsHandler registered fields " + reply); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (IOException e) { - logger.warn( - "error while trying to read index state dir for indexName: " - + fieldDefRequest.getIndexName(), - e); - responseObserver.onError( - Status.INTERNAL - .withDescription( - "error while trying to read index state dir for indexName: " - + fieldDefRequest.getIndexName()) - .augmentDescription("IOException()") - .withCause(e) - .asRuntimeException()); - } catch (Exception e) { - logger.warn( - "error while trying to UpdateFieldsHandler for index " + fieldDefRequest.getIndexName(), - e); - responseObserver.onError( - Status.INVALID_ARGUMENT - .withDescription( - "error while trying to UpdateFieldsHandler for index: " - + fieldDefRequest.getIndexName()) - .augmentDescription(e.getMessage()) - .asRuntimeException()); - } + IndexStateManager indexStateManager = getIndexStateManager(fieldDefRequest.getIndexName()); + String updatedFields = indexStateManager.updateFields(fieldDefRequest.getFieldList()); + FieldDefResponse reply = FieldDefResponse.newBuilder().setResponse(updatedFields).build(); + logger.info("UpdateFieldsHandler registered fields " + reply); + return reply; } } diff --git a/src/test/java/com/yelp/nrtsearch/server/grpc/NrtsearchServerIdFieldTest.java b/src/test/java/com/yelp/nrtsearch/server/grpc/NrtsearchServerIdFieldTest.java index 191b85c5c..d16df63c9 100644 --- a/src/test/java/com/yelp/nrtsearch/server/grpc/NrtsearchServerIdFieldTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/grpc/NrtsearchServerIdFieldTest.java @@ -263,7 +263,7 @@ public void testDuplicateIdFieldInIndexState() throws IOException, InterruptedEx .build()); } catch (RuntimeException e) { String message = - "INVALID_ARGUMENT: error while trying to UpdateFieldsHandler for index: test_index\n" + "INTERNAL: Error handling updateFields request\n" + "Index can only register one id field, found: doc_id and new_text_field"; assertEquals(message, e.getMessage()); throw e; @@ -276,7 +276,7 @@ public void testMultiValued() throws Exception { registerFields(List.of(getFieldBuilder("doc_id", true, true, true))); } catch (RuntimeException e) { String message = - "INVALID_ARGUMENT: error while trying to RegisterFields for index: test_index\n" + "INTERNAL: Error handling registerFields request\n" + "field: doc_id cannot have multivalued fields as it's an _ID field"; assertEquals(message, e.getMessage()); throw e; @@ -289,7 +289,7 @@ public void testStoreAndDocValuesFalse() throws IOException { registerFields(List.of(getFieldBuilder("doc_id", false, false, false))); } catch (RuntimeException e) { String message = - "INVALID_ARGUMENT: error while trying to RegisterFields for index: test_index\n" + "INTERNAL: Error handling registerFields request\n" + "field: doc_id is an _ID field and should be retrievable by either store=true or storeDocValues=true"; assertEquals(message, e.getMessage()); throw e; @@ -315,7 +315,7 @@ public void testMultipleDocIds() throws Exception { getFieldBuilder("doc_id_2", true, true, false))); } catch (RuntimeException e) { String message = - "INVALID_ARGUMENT: error while trying to RegisterFields for index: test_index\n" + "INTERNAL: Error handling registerFields request\n" + "Index can only register one id field, found: doc_id and doc_id_2"; assertEquals(message, e.getMessage()); throw e; diff --git a/src/test/java/com/yelp/nrtsearch/server/grpc/StateBackendServerTest.java b/src/test/java/com/yelp/nrtsearch/server/grpc/StateBackendServerTest.java index 82958aacf..40aa1eb1e 100644 --- a/src/test/java/com/yelp/nrtsearch/server/grpc/StateBackendServerTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/grpc/StateBackendServerTest.java @@ -1251,9 +1251,7 @@ public void testIndexAlreadyExists() throws IOException { fail(); } catch (StatusRuntimeException e) { assertEquals(Status.ALREADY_EXISTS.getCode(), e.getStatus().getCode()); - assertEquals( - "ALREADY_EXISTS: invalid indexName: test_index\nIllegalArgumentException()", - e.getMessage()); + assertEquals("ALREADY_EXISTS: Index test_index already exists", e.getMessage()); } } @@ -1452,7 +1450,7 @@ public void testIndexAlreadyStarted() throws IOException { fail(); } catch (StatusRuntimeException e) { assertEquals( - "INVALID_ARGUMENT: error while trying to start index: test_index\nIndex test_index is already started", + "INTERNAL: Error handling startIndex request\nIndex test_index is already started", e.getMessage()); } }