diff --git a/src/main/java/com/yelp/nrtsearch/server/cli/LiveSettingsCommand.java b/src/main/java/com/yelp/nrtsearch/server/cli/LiveSettingsCommand.java deleted file mode 100644 index 9afc9e679..000000000 --- a/src/main/java/com/yelp/nrtsearch/server/cli/LiveSettingsCommand.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Copyright 2020 Yelp Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yelp.nrtsearch.server.cli; - -import com.yelp.nrtsearch.server.grpc.LuceneServerClient; -import java.util.concurrent.Callable; -import picocli.CommandLine; - -@CommandLine.Command( - name = LiveSettingsCommand.LIVE_SETTINGS, - description = "Updates the lives settings for the the specified index") -public class LiveSettingsCommand implements Callable { - public static final String LIVE_SETTINGS = "liveSettings"; - - @CommandLine.ParentCommand private LuceneClientCommand baseCmd; - - @CommandLine.Option( - names = {"-i", "--indexName"}, - description = "Name of the index whose live settings are to be updated", - required = true) - private String indexName; - - @CommandLine.Option( - names = {"--maxRefreshSec"}, - description = - "Longest time to wait before reopening IndexSearcher (i.e., periodic background reopen), or 0 to keep current value. (default: ${DEFAULT-VALUE})", - defaultValue = "0") - private double maxRefreshSec; - - @CommandLine.Option( - names = {"--minRefreshSec"}, - description = - "Shortest time to wait before reopening IndexSearcher (i.e., when a search is waiting for a specific indexGen), or 0 to keep current value. (default: ${DEFAULT-VALUE})", - defaultValue = "0") - private double minRefreshSec; - - @CommandLine.Option( - names = {"--maxSearcherAgeSec"}, - description = "Non-current searchers older than this are pruned. (default: ${DEFAULT-VALUE})", - defaultValue = "60.0") - private double maxSearcherAgeSec; - - @CommandLine.Option( - names = {"--indexRamBufferSizeMB"}, - description = "Size (in MB) of IndexWriter's RAM buffer. (default: ${DEFAULT-VALUE})", - defaultValue = "250") - private double indexRamBufferSizeMB; - - @CommandLine.Option( - names = {"--addDocumentsMaxBufferLen"}, - description = "Max number of documents to add at a time. (default: ${DEFAULT-VALUE})", - defaultValue = "100") - private int addDocumentsMaxBufferLen; - - @CommandLine.Option( - names = {"--sliceMaxDocs"}, - description = - "Max documents per index slice, or 0 to keep current value. (default: ${DEFAULT-VALUE})", - defaultValue = "0") - private int sliceMaxDocs; - - @CommandLine.Option( - names = {"--sliceMaxSegments"}, - description = - "Max segments per index slice, or 0 to keep current value. (default: ${DEFAULT-VALUE})", - defaultValue = "0") - private int sliceMaxSegments; - - @CommandLine.Option( - names = {"--virtualShards"}, - description = - "Number of virtual shards to partition index into, or 0 to keep current value. (default: ${DEFAULT-VALUE})", - defaultValue = "0") - private int virtualShards; - - @CommandLine.Option( - names = {"--maxMergedSegmentMB"}, - description = - "Max sized segment to produce during normal merging (default: ${DEFAULT-VALUE})", - defaultValue = "0") - private int maxMergedSegmentMB; - - @CommandLine.Option( - names = {"--segmentsPerTier"}, - description = - "Number of segments per tier used by TieredMergePolicy, or 0 to keep current value. (default: ${DEFAULT-VALUE})", - defaultValue = "0") - private int segmentsPerTier; - - @CommandLine.Option( - names = {"--defaultSearchTimeoutSec"}, - description = - "Search timeout to use when not provided by the request, or -1 to keep current value. (default: ${DEFAULT-VALUE})", - defaultValue = "-1") - private double defaultSearchTimeoutSec; - - @CommandLine.Option( - names = {"--defaultSearchTimeoutCheckEvery"}, - description = - "Timeout check every value to use when not provided by the request, or -1 to keep current value. (default: ${DEFAULT-VALUE})", - defaultValue = "-1") - private int defaultSearchTimeoutCheckEvery; - - @CommandLine.Option( - names = {"--defaultTerminateAfter"}, - description = - "Terminate after to use when not provided by the request, or -1 to keep current value. (default: ${DEFAULT-VALUE})", - defaultValue = "-1") - private int defaultTerminateAfter; - - public String getIndexName() { - return indexName; - } - - public double getMaxRefreshSec() { - return maxRefreshSec; - } - - public double getMinRefreshSec() { - return minRefreshSec; - } - - public double getMaxSearcherAgeSec() { - return maxSearcherAgeSec; - } - - public double getIndexRamBufferSizeMB() { - return indexRamBufferSizeMB; - } - - public int getAddDocumentsMaxBufferLen() { - return addDocumentsMaxBufferLen; - } - - public int getSliceMaxDocs() { - return sliceMaxDocs; - } - - public int getSliceMaxSegments() { - return sliceMaxSegments; - } - - public int getVirtualShards() { - return virtualShards; - } - - public int getMaxMergedSegmentMB() { - return maxMergedSegmentMB; - } - - public int getSegmentsPerTier() { - return segmentsPerTier; - } - - public double getDefaultSearchTimeoutSec() { - return defaultSearchTimeoutSec; - } - - public int getDefaultSearchTimeoutCheckEvery() { - return defaultSearchTimeoutCheckEvery; - } - - public int getDefaultTerminateAfter() { - return defaultTerminateAfter; - } - - @Override - public Integer call() throws Exception { - LuceneServerClient client = baseCmd.getClient(); - try { - client.liveSettings( - getIndexName(), - getMaxRefreshSec(), - getMinRefreshSec(), - getMaxSearcherAgeSec(), - getIndexRamBufferSizeMB(), - getAddDocumentsMaxBufferLen(), - getSliceMaxDocs(), - getSliceMaxSegments(), - getVirtualShards(), - getMaxMergedSegmentMB(), - getSegmentsPerTier(), - getDefaultSearchTimeoutSec(), - getDefaultSearchTimeoutCheckEvery(), - getDefaultTerminateAfter()); - } finally { - client.shutdown(); - } - return 0; - } -} diff --git a/src/main/java/com/yelp/nrtsearch/server/cli/LuceneClientCommand.java b/src/main/java/com/yelp/nrtsearch/server/cli/LuceneClientCommand.java index cc73f23b8..4a85378c5 100644 --- a/src/main/java/com/yelp/nrtsearch/server/cli/LuceneClientCommand.java +++ b/src/main/java/com/yelp/nrtsearch/server/cli/LuceneClientCommand.java @@ -37,14 +37,12 @@ ForceMergeCommand.class, ForceMergeDeletesCommand.class, IndicesCommand.class, - LiveSettingsCommand.class, LiveSettingsV2Command.class, ReadyCommand.class, RefreshCommand.class, RegisterFieldsCommand.class, ReloadStateCommand.class, SearchCommand.class, - SettingsCommand.class, SettingsV2Command.class, StartIndexCommand.class, StartIndexV2Command.class, diff --git a/src/main/java/com/yelp/nrtsearch/server/cli/SettingsCommand.java b/src/main/java/com/yelp/nrtsearch/server/cli/SettingsCommand.java deleted file mode 100644 index eb3875836..000000000 --- a/src/main/java/com/yelp/nrtsearch/server/cli/SettingsCommand.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2020 Yelp Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yelp.nrtsearch.server.cli; - -import com.yelp.nrtsearch.server.grpc.LuceneServerClient; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.concurrent.Callable; -import picocli.CommandLine; - -@CommandLine.Command( - name = SettingsCommand.SETTINGS, - description = - "Updates the settings for the specified index from the file if no settings specified gets the current settings") -public class SettingsCommand implements Callable { - public static final String SETTINGS = "settings"; - - @CommandLine.ParentCommand private LuceneClientCommand baseCmd; - - @CommandLine.Option( - names = {"-f", "--fileName"}, - description = "Name of the file containing the settings to be updated", - required = true) - private String fileName; - - public String getFileName() { - return fileName; - } - - @Override - public Integer call() throws Exception { - LuceneServerClient client = baseCmd.getClient(); - try { - Path filePath = Paths.get(getFileName()); - client.settings(filePath); - } finally { - client.shutdown(); - } - return 0; - } -} diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServerClient.java b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServerClient.java index 61ab82cc5..1dfd9ca54 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServerClient.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServerClient.java @@ -26,6 +26,7 @@ import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -112,71 +113,8 @@ public void createIndex( try { response = blockingStub.createIndex(requestBuilder.build()); } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; - } - logger.info("Server returned : " + response.getResponse()); - } - - public void liveSettings( - String indexName, - double maxRefreshSec, - double minRefreshSec, - double maxSearcherAgeSec, - double indexRamBufferSizeMB, - int addDocumentsMaxBufferLen, - int sliceMaxDocs, - int sliceMaxSegments, - int virtualShards, - int maxMergedSegmentMB, - int segmentsPerTier, - double defaultSearchTimeoutSec, - int defaultSearchTimeoutCheckEvery, - int defaultTerminateAfter) { - logger.info( - String.format( - "will try to update liveSettings for indexName: %s, " - + "maxRefreshSec: %s, minRefreshSec: %s, maxSearcherAgeSec: %s, " - + "indexRamBufferSizeMB: %s, addDocumentsMaxBufferLen: %s, sliceMaxDocs: %s, " - + "sliceMaxSegments: %s, virtualShards: %s, maxMergedSegmentMB: %s, segmentsPerTier: %s, " - + "defaultSearchTimeoutSec: %s, defaultSearchTimeoutCheckEvery: %s, defaultTerminateAfter: %s ", - indexName, - maxRefreshSec, - minRefreshSec, - maxSearcherAgeSec, - indexRamBufferSizeMB, - addDocumentsMaxBufferLen, - sliceMaxDocs, - sliceMaxSegments, - virtualShards, - maxMergedSegmentMB, - segmentsPerTier, - defaultSearchTimeoutSec, - defaultSearchTimeoutCheckEvery, - defaultTerminateAfter)); - LiveSettingsRequest request = - LiveSettingsRequest.newBuilder() - .setIndexName(indexName) - .setMaxRefreshSec(maxRefreshSec) - .setMinRefreshSec(minRefreshSec) - .setMaxSearcherAgeSec(maxSearcherAgeSec) - .setIndexRamBufferSizeMB(indexRamBufferSizeMB) - .setAddDocumentsMaxBufferLen(addDocumentsMaxBufferLen) - .setSliceMaxDocs(sliceMaxDocs) - .setSliceMaxSegments(sliceMaxSegments) - .setVirtualShards(virtualShards) - .setMaxMergedSegmentMB(maxMergedSegmentMB) - .setSegmentsPerTier(segmentsPerTier) - .setDefaultSearchTimeoutSec(defaultSearchTimeoutSec) - .setDefaultSearchTimeoutCheckEvery(defaultSearchTimeoutCheckEvery) - .setDefaultTerminateAfter(defaultTerminateAfter) - .build(); - LiveSettingsResponse response; - try { - response = blockingStub.liveSettings(request); - } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; + logger.error("Unable to create index {}", indexName); + throw e; } logger.info("Server returned : " + response.getResponse()); } @@ -186,8 +124,8 @@ public void liveSettingsV2(LiveSettingsV2Request liveSettingsV2Request) { try { response = blockingStub.liveSettingsV2(liveSettingsV2Request); } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; + logger.error("Unable to update live settings"); + throw e; } try { logger.info("Server returned : " + JsonFormat.printer().print(response.getLiveSettings())); @@ -202,8 +140,8 @@ public void registerFields(String jsonStr) { try { response = blockingStub.registerFields(fieldDefRequest); } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; + logger.error("Unable to register fields"); + throw e; } logger.info("Server returned : " + response.getResponse()); } @@ -213,21 +151,9 @@ public void reloadState() { try { blockingStub.reloadState(reloadStateRequest); } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - } - } - - public void settings(Path filePath) throws IOException { - SettingsRequest settingsRequest = - new LuceneServerClientBuilder.SettingsClientBuilder().buildRequest(filePath); - SettingsResponse response; - try { - response = blockingStub.settings(settingsRequest); - } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; + logger.error("Unable to reload state"); + throw e; } - logger.info("Server returned : " + response.getResponse()); } public void settingsV2(String indexName, Path filePath) throws IOException { @@ -243,8 +169,8 @@ public void settingsV2(String indexName, Path filePath) throws IOException { try { response = blockingStub.settingsV2(settingsRequest); } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; + logger.error("Unable to apply settings"); + throw e; } try { logger.info("Server returned : " + JsonFormat.printer().print(response.getSettings())); @@ -260,8 +186,8 @@ public void startIndex(Path filePath) throws IOException { try { response = blockingStub.startIndex(startIndexRequest); } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; + logger.error("Unable to start index"); + throw e; } logger.info("Server returned : " + response.toString()); } @@ -273,8 +199,8 @@ public void startIndexV2(String indexName) { try { response = blockingStub.startIndexV2(startIndexRequest); } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; + logger.error("Unable to start index {}", indexName); + throw e; } logger.info("Server returned : " + response.toString()); } @@ -282,6 +208,7 @@ public void startIndexV2(String indexName) { public void addDocuments(Stream addDocumentRequestStream) throws InterruptedException { final CountDownLatch finishLatch = new CountDownLatch(1); + List exception = new ArrayList<>(); StreamObserver responseObserver = new StreamObserver<>() { @@ -298,6 +225,7 @@ public void onNext(AddDocumentResponse value) { @Override public void onError(Throwable t) { logger.error(t.getMessage(), t); + exception.add(t); finishLatch.countDown(); } @@ -326,7 +254,11 @@ public void onCompleted() { // Receiving happens asynchronously, so block here for 5 minutes if (!finishLatch.await(5, TimeUnit.MINUTES)) { - logger.warn("addDocuments can not finish within 5 minutes"); + throw new IllegalStateException("addDocuments can not finish within 5 minutes"); + } + + if (!exception.isEmpty()) { + throw new RuntimeException(exception.get(0)); } } @@ -337,8 +269,8 @@ public void refresh(String indexName) { try { response = blockingStub.refresh(request); } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; + logger.error("Unable to call refresh on index {}", indexName); + throw e; } logger.info("Server returned refreshTimeMS : " + response.getRefreshTimeMS()); } @@ -350,8 +282,8 @@ public void commit(String indexName) { try { response = blockingStub.commit(request); } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; + logger.error("Unable to commit index {}", indexName); + throw e; } logger.info( "Server returned sequence id: " @@ -367,8 +299,8 @@ public void stats(String indexName) { try { response = blockingStub.stats(request); } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; + logger.error("Unable to retrieve stats for index {}", indexName); + throw e; } logger.info("Server returned sequence id: " + response); } @@ -394,8 +326,8 @@ public void search(Path filePath) throws IOException { try { response = blockingStub.search(searchRequest); } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; + logger.error("Unable to search"); + throw e; } logger.info("Server returned : " + response.toString()); } @@ -407,8 +339,8 @@ public void delete(Path filePath) throws IOException { try { response = blockingStub.delete(addDocumentRequest); } catch (StatusRuntimeException e) { - logger.warn("RPC failed: {}", e.getStatus()); - return; + logger.error("Unable to delete documents"); + throw e; } logger.info( "Server returned indexGen : "