diff --git a/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java b/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java index 426e5892a..4de7c6e0f 100644 --- a/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java +++ b/src/main/java/com/yelp/nrtsearch/server/config/NrtsearchConfig.java @@ -19,6 +19,7 @@ import com.google.protobuf.util.JsonFormat; import com.yelp.nrtsearch.server.grpc.IndexLiveSettings; import com.yelp.nrtsearch.server.grpc.ReplicationServerClient; +import com.yelp.nrtsearch.server.index.DirectoryFactory; import com.yelp.nrtsearch.server.utils.JsonUtils; import com.yelp.nrtsearch.server.warming.WarmerConfig; import java.io.IOException; @@ -108,6 +109,7 @@ public class NrtsearchConfig { private final int lowPriorityCopyPercentage; private final boolean verifyReplicationIndexId; private final boolean useKeepAliveForReplication; + private final DirectoryFactory.MMapGrouping mmapGrouping; @Inject public NrtsearchConfig(InputStream yamlStream) { @@ -180,6 +182,11 @@ public NrtsearchConfig(InputStream yamlStream) { lowPriorityCopyPercentage = configReader.getInteger("lowPriorityCopyPercentage", 0); verifyReplicationIndexId = configReader.getBoolean("verifyReplicationIndexId", true); useKeepAliveForReplication = configReader.getBoolean("useKeepAliveForReplication", false); + mmapGrouping = + configReader.get( + "mmapGrouping", + o -> DirectoryFactory.parseMMapGrouping(o.toString()), + DirectoryFactory.MMapGrouping.SEGMENT); List<String> indicesWithOverrides = configReader.getKeysOrEmpty("indexLiveSettingsOverrides"); Map<String, IndexLiveSettings> liveSettingsMap = new HashMap<>(); @@ -360,6 +367,10 @@ public boolean getUseKeepAliveForReplication() { return useKeepAliveForReplication; } + public DirectoryFactory.MMapGrouping getMMapGrouping() { + return mmapGrouping; + } + public IndexLiveSettings getLiveSettingsOverride(String indexName) { return indexLiveSettingsOverrides.getOrDefault( indexName, IndexLiveSettings.newBuilder().build()); diff 
--git a/src/main/java/com/yelp/nrtsearch/server/handler/RecvRawFileHandler.java b/src/main/java/com/yelp/nrtsearch/server/handler/RecvRawFileHandler.java index 7a5e44f8b..bb370aeb1 100644 --- a/src/main/java/com/yelp/nrtsearch/server/handler/RecvRawFileHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/handler/RecvRawFileHandler.java @@ -48,7 +48,7 @@ public void handle(FileInfo fileInfoRequest, StreamObserver respon IndexState indexState = indexStateManager.getCurrent(); ShardState shardState = indexState.getShard(0); try (IndexInput luceneFile = - shardState.indexDir.openInput(fileInfoRequest.getFileName(), IOContext.DEFAULT)) { + shardState.indexDir.openInput(fileInfoRequest.getFileName(), IOContext.READONCE)) { long len = luceneFile.length(); long pos = fileInfoRequest.getFpStart(); luceneFile.seek(pos); diff --git a/src/main/java/com/yelp/nrtsearch/server/index/DirectoryFactory.java b/src/main/java/com/yelp/nrtsearch/server/index/DirectoryFactory.java index fbb0647c4..3a858fdf5 100644 --- a/src/main/java/com/yelp/nrtsearch/server/index/DirectoryFactory.java +++ b/src/main/java/com/yelp/nrtsearch/server/index/DirectoryFactory.java @@ -15,11 +15,15 @@ */ package com.yelp.nrtsearch.server.index; +import com.google.common.annotations.VisibleForTesting; import com.yelp.nrtsearch.server.config.IndexPreloadConfig; +import com.yelp.nrtsearch.server.config.NrtsearchConfig; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.nio.file.Path; +import java.util.Optional; +import java.util.function.Function; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.MMapDirectory; @@ -28,6 +32,13 @@ /** A factory to open a {@link Directory} from a provided filesystem path. 
*/ public abstract class DirectoryFactory { + // How files should be grouped in memory Arenas when using MMapDirectory + public enum MMapGrouping { + SEGMENT, + SEGMENT_EXCEPT_SI, + NONE + } + /** Sole constructor. */ public DirectoryFactory() {} @@ -43,7 +54,7 @@ public DirectoryFactory() {} * Returns an instance, using the specified implementation {FSDirectory, MMapDirectory, * NIOFSDirectory}. */ - public static DirectoryFactory get(final String dirImpl) { + public static DirectoryFactory get(final String dirImpl, NrtsearchConfig config) { if (dirImpl.equals("FSDirectory")) { return new DirectoryFactory() { @Override @@ -51,6 +62,7 @@ public Directory open(Path path, IndexPreloadConfig preloadConfig) throws IOExce Directory directory = FSDirectory.open(path); if (directory instanceof MMapDirectory mMapDirectory) { mMapDirectory.setPreload(preloadConfig.preloadPredicate()); + setMMapGrouping(mMapDirectory, config.getMMapGrouping()); } return directory; } @@ -61,6 +73,7 @@ public Directory open(Path path, IndexPreloadConfig preloadConfig) throws IOExce public Directory open(Path path, IndexPreloadConfig preloadConfig) throws IOException { MMapDirectory mMapDirectory = new MMapDirectory(path); mMapDirectory.setPreload(preloadConfig.preloadPredicate()); + setMMapGrouping(mMapDirectory, config.getMMapGrouping()); return mMapDirectory; } }; @@ -105,7 +118,7 @@ public Directory open(Path path, IndexPreloadConfig preloadConfig) throws IOExce } else { return finalCtor.newInstance(path, preloadConfig); } - } catch (InstantiationException ie) { + } catch (InstantiationException | InvocationTargetException | IllegalAccessException ie) { throw new RuntimeException( "failed to instantiate directory class \"" + dirImpl @@ -113,25 +126,49 @@ public Directory open(Path path, IndexPreloadConfig preloadConfig) throws IOExce + path + "\"", ie); - } catch (InvocationTargetException ite) { - throw new RuntimeException( - "failed to instantiate directory class \"" - + dirImpl - + "\" 
on path=\"" - + path - + "\"", - ite); - } catch (IllegalAccessException iae) { - throw new RuntimeException( - "failed to instantiate directory class \"" - + dirImpl - + "\" on path=\"" - + path - + "\"", - iae); } } }; } } + + // Function to group segments by their names, excluding ".si" files + public static final Function<String, Optional<String>> SEGMENT_EXCEPT_SI_FUNCTION = + (filename) -> { + if (filename.endsWith(".si")) { + return Optional.empty(); + } + return MMapDirectory.GROUP_BY_SEGMENT.apply(filename); + }; + + /** + * Set MMapGrouping for the directory. + * + * @param directory the MMapDirectory + * @param grouping the MMapGrouping + */ + @VisibleForTesting + static void setMMapGrouping(MMapDirectory directory, MMapGrouping grouping) { + switch (grouping) { + case SEGMENT -> directory.setGroupingFunction(MMapDirectory.GROUP_BY_SEGMENT); + case SEGMENT_EXCEPT_SI -> directory.setGroupingFunction(SEGMENT_EXCEPT_SI_FUNCTION); + case NONE -> directory.setGroupingFunction(MMapDirectory.NO_GROUPING); + } + } + + /** + * Parse MMapGrouping from string. 
+ * + * @param grouping the string representation of the grouping + * @return MMapGrouping + * @throws IllegalArgumentException if the grouping is invalid + */ + public static MMapGrouping parseMMapGrouping(String grouping) { + return switch (grouping) { + case "SEGMENT" -> MMapGrouping.SEGMENT; + case "SEGMENT_EXCEPT_SI" -> MMapGrouping.SEGMENT_EXCEPT_SI; + case "NONE" -> MMapGrouping.NONE; + default -> throw new IllegalArgumentException("Invalid MMapGrouping: " + grouping); + }; + } } diff --git a/src/main/java/com/yelp/nrtsearch/server/index/ImmutableIndexState.java b/src/main/java/com/yelp/nrtsearch/server/index/ImmutableIndexState.java index 30c01dc18..8fbaffe34 100644 --- a/src/main/java/com/yelp/nrtsearch/server/index/ImmutableIndexState.java +++ b/src/main/java/com/yelp/nrtsearch/server/index/ImmutableIndexState.java @@ -237,7 +237,9 @@ public ImmutableIndexState( } indexMergeSchedulerAutoThrottle = mergedSettings.getIndexMergeSchedulerAutoThrottle().getValue(); - directoryFactory = DirectoryFactory.get(mergedSettings.getDirectory().getValue()); + directoryFactory = + DirectoryFactory.get( + mergedSettings.getDirectory().getValue(), globalState.getConfiguration()); // live settings mergedLiveSettings = diff --git a/src/main/java/com/yelp/nrtsearch/server/nrt/NRTPrimaryNode.java b/src/main/java/com/yelp/nrtsearch/server/nrt/NRTPrimaryNode.java index 53006c80a..659560a2e 100644 --- a/src/main/java/com/yelp/nrtsearch/server/nrt/NRTPrimaryNode.java +++ b/src/main/java/com/yelp/nrtsearch/server/nrt/NRTPrimaryNode.java @@ -579,4 +579,9 @@ public void close() throws IOException { nrtDataManager.close(); super.close(); } + + @Override + public FileMetaData readLocalFileMetaData(String fileName) throws IOException { + return NrtUtils.readOnceLocalFileMetaData(fileName, lastFileMetaData, this); + } } diff --git a/src/main/java/com/yelp/nrtsearch/server/nrt/NRTReplicaNode.java b/src/main/java/com/yelp/nrtsearch/server/nrt/NRTReplicaNode.java index 6320dbd5b..fb8c61708 
100644 --- a/src/main/java/com/yelp/nrtsearch/server/nrt/NRTReplicaNode.java +++ b/src/main/java/com/yelp/nrtsearch/server/nrt/NRTReplicaNode.java @@ -383,4 +383,9 @@ public void syncFromCurrentPrimary(long primaryWaitMs, long maxTimeMs) throws IO } logger.info("Finished syncing nrt point from current primary, current version: {}", curVersion); } + + @Override + public FileMetaData readLocalFileMetaData(String fileName) throws IOException { + return NrtUtils.readOnceLocalFileMetaData(fileName, lastFileMetaData, this); + } } diff --git a/src/main/java/com/yelp/nrtsearch/server/nrt/NrtUtils.java b/src/main/java/com/yelp/nrtsearch/server/nrt/NrtUtils.java new file mode 100644 index 000000000..e50d256c1 --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/server/nrt/NrtUtils.java @@ -0,0 +1,95 @@ +/* + * Copyright 2024 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.yelp.nrtsearch.server.nrt; + +import static org.apache.lucene.replicator.nrt.Node.VERBOSE_FILES; +import static org.apache.lucene.replicator.nrt.Node.bytesToString; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Map; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.replicator.nrt.FileMetaData; +import org.apache.lucene.replicator.nrt.Node; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; + +public class NrtUtils { + + /** + * Modified version of {@link Node} implementation that opens files with the READONCE io context. + * + * @param fileName the name of the file to read + * @param cache previously computed file metadata to consult first, may be null + * @param node the nrt node reading the metadata + * @return the metadata of the file, or null if the file is corrupt or does not exist + * @throws IOException if an I/O error occurs + */ + public static FileMetaData readOnceLocalFileMetaData( + String fileName, Map<String, FileMetaData> cache, Node node) throws IOException { + + FileMetaData result; + if (cache != null) { + // We may already have this file cached from the last NRT point: + result = cache.get(fileName); + } else { + result = null; + } + + if (result == null) { + // Pull from the filesystem + long checksum; + long length; + byte[] header; + byte[] footer; + try (IndexInput in = node.getDirectory().openInput(fileName, IOContext.READONCE)) { + try { + length = in.length(); + header = CodecUtil.readIndexHeader(in); + footer = CodecUtil.readFooter(in); + checksum = CodecUtil.retrieveChecksum(in); + } catch (@SuppressWarnings("unused") EOFException | CorruptIndexException cie) { + // File exists but is busted: we must copy it. This happens when node had crashed, + // corrupting an un-fsync'd file. 
On init we try + // to delete such unreferenced files, but virus checker can block that, leaving this bad + // file. + if (VERBOSE_FILES) { + node.message("file " + fileName + ": will copy [existing file is corrupt]"); + } + return null; + } + if (VERBOSE_FILES) { + node.message("file " + fileName + " has length=" + bytesToString(length)); + } + } catch (@SuppressWarnings("unused") FileNotFoundException | NoSuchFileException e) { + if (VERBOSE_FILES) { + node.message("file " + fileName + ": will copy [file does not exist]"); + } + return null; + } + + // NOTE: checksum is redundant w/ footer, but we break it out separately because when the bits + // cross the wire we need direct access to + // checksum when copying to catch bit flips: + result = new FileMetaData(header, footer, length, checksum); + } + + return result; + } +} diff --git a/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java b/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java index 95a30019d..5d42e0b44 100644 --- a/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/config/NrtsearchConfigTest.java @@ -21,6 +21,7 @@ import com.google.protobuf.Int32Value; import com.yelp.nrtsearch.server.grpc.IndexLiveSettings; import com.yelp.nrtsearch.server.grpc.ReplicationServerClient; +import com.yelp.nrtsearch.server.index.DirectoryFactory; import java.io.ByteArrayInputStream; import org.apache.lucene.search.suggest.document.CompletionPostingsFormat.FSTLoadMode; import org.junit.Test; @@ -206,4 +207,18 @@ public void testVerifyReplicationIndexId_set() { NrtsearchConfig luceneConfig = getForConfig(config); assertFalse(luceneConfig.getVerifyReplicationIndexId()); } + + @Test + public void testMMapGrouping_default() { + String config = "nodeName: \"lucene_server_foo\""; + NrtsearchConfig luceneConfig = getForConfig(config); + assertEquals(DirectoryFactory.MMapGrouping.SEGMENT, luceneConfig.getMMapGrouping()); 
+ } + + @Test + public void testMMapGrouping_set() { + String config = "mmapGrouping: NONE"; + NrtsearchConfig luceneConfig = getForConfig(config); + assertEquals(DirectoryFactory.MMapGrouping.NONE, luceneConfig.getMMapGrouping()); + } } diff --git a/src/test/java/com/yelp/nrtsearch/server/index/DirectoryFactoryTest.java b/src/test/java/com/yelp/nrtsearch/server/index/DirectoryFactoryTest.java index a0ed35119..8dcbd834e 100644 --- a/src/test/java/com/yelp/nrtsearch/server/index/DirectoryFactoryTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/index/DirectoryFactoryTest.java @@ -15,10 +15,17 @@ */ package com.yelp.nrtsearch.server.index; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import com.yelp.nrtsearch.server.config.IndexPreloadConfig; +import com.yelp.nrtsearch.server.config.NrtsearchConfig; import com.yelp.nrtsearch.server.config.YamlConfigReader; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -33,8 +40,16 @@ public class DirectoryFactoryTest { @ClassRule public static final TemporaryFolder folder = new TemporaryFolder(); - private static final DirectoryFactory mmFactory = DirectoryFactory.get("MMapDirectory"); - private static final DirectoryFactory fsFactory = DirectoryFactory.get("FSDirectory"); + private static final DirectoryFactory mmFactory = + DirectoryFactory.get( + "MMapDirectory", + new NrtsearchConfig( + new ByteArrayInputStream("nodeName: \"lucene_server_foo\"".getBytes()))); + private static final DirectoryFactory fsFactory = + DirectoryFactory.get( + "FSDirectory", + new NrtsearchConfig( + new ByteArrayInputStream("nodeName: \"lucene_server_foo\"".getBytes()))); @Test public void testMMapDefault() throws IOException { @@ -117,4 +132,41 @@ public void 
testFSPreloadAll() throws IOException { assertTrue(((MMapDirectory) directory).getPreload()); } } + + @Test + public void testParseMMapGrouping() { + assertSame( + DirectoryFactory.MMapGrouping.SEGMENT, DirectoryFactory.parseMMapGrouping("SEGMENT")); + assertSame( + DirectoryFactory.MMapGrouping.SEGMENT_EXCEPT_SI, + DirectoryFactory.parseMMapGrouping("SEGMENT_EXCEPT_SI")); + assertSame(DirectoryFactory.MMapGrouping.NONE, DirectoryFactory.parseMMapGrouping("NONE")); + } + + @Test + public void testParseMMapGroupingInvalid() { + try { + DirectoryFactory.parseMMapGrouping("INVALID"); + fail("Expected IllegalArgumentException for an unknown grouping"); + } catch (IllegalArgumentException e) { + assertEquals("Invalid MMapGrouping: INVALID", e.getMessage()); + } + } + + @Test + public void testSetMMapGrouping() { + MMapDirectory mockMMapDirectory = mock(MMapDirectory.class); + DirectoryFactory.setMMapGrouping(mockMMapDirectory, DirectoryFactory.MMapGrouping.SEGMENT); + verify(mockMMapDirectory, times(1)).setGroupingFunction(MMapDirectory.GROUP_BY_SEGMENT); + + mockMMapDirectory = mock(MMapDirectory.class); + DirectoryFactory.setMMapGrouping( + mockMMapDirectory, DirectoryFactory.MMapGrouping.SEGMENT_EXCEPT_SI); + verify(mockMMapDirectory, times(1)) + .setGroupingFunction(DirectoryFactory.SEGMENT_EXCEPT_SI_FUNCTION); + + mockMMapDirectory = mock(MMapDirectory.class); + DirectoryFactory.setMMapGrouping(mockMMapDirectory, DirectoryFactory.MMapGrouping.NONE); + verify(mockMMapDirectory, times(1)).setGroupingFunction(MMapDirectory.NO_GROUPING); + } }