Skip to content

Commit

Permalink
Add updated versions of nrt_utils snapshot/restore commands
Browse files Browse the repository at this point in the history
  • Loading branch information
aprudhomme committed Sep 17, 2024
1 parent df81b1e commit 462094b
Show file tree
Hide file tree
Showing 25 changed files with 2,746 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import com.yelp.nrtsearch.server.luceneserver.nrt.state.NrtFileMetaData;
import com.yelp.nrtsearch.server.luceneserver.nrt.state.NrtPointState;
import com.yelp.nrtsearch.server.remote.RemoteBackend;
import com.yelp.nrtsearch.server.remote.RemoteUtils;
import com.yelp.nrtsearch.server.utils.FileUtil;
import com.yelp.nrtsearch.server.utils.TimeStringUtil;
import java.io.Closeable;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -180,7 +182,9 @@ public void restoreIfNeeded(Path shardDataDir) throws IOException {

if (hasRestoreData()) {
logger.info("Restoring index data for service: {}, index: {}", serviceName, indexIdentifier);
NrtPointState pointState = remoteBackend.downloadPointState(serviceName, indexIdentifier);
InputStream pointStateStream = remoteBackend.downloadPointState(serviceName, indexIdentifier);
byte[] pointStateBytes = pointStateStream.readAllBytes();
NrtPointState pointState = RemoteUtils.pointStateFromUtf8(pointStateBytes);

long start = System.nanoTime();
try {
Expand Down Expand Up @@ -292,7 +296,8 @@ public void run() {
task.copyState.version);
Map<String, NrtFileMetaData> versionFiles = uploadDiff(task.copyState);
NrtPointState pointState = new NrtPointState(task.copyState, versionFiles, ephemeralId);
remoteBackend.uploadPointState(serviceName, indexIdentifier, pointState);
byte[] data = RemoteUtils.pointStateToUtf8(pointState);
remoteBackend.uploadPointState(serviceName, indexIdentifier, pointState, data);
lastPointState = pointState;
} else {
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.yelp.nrtsearch.server.luceneserver.nrt.state;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -31,6 +33,8 @@
* primaryId and timeString.
*/
@JsonDeserialize(using = NrtFileMetaDataDeserializer.class)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class NrtFileMetaData extends FileMetaData {

public String primaryId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package com.yelp.nrtsearch.server.luceneserver.nrt.state;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -26,6 +28,8 @@
import org.apache.lucene.replicator.nrt.FileMetaData;

/** State of a single NRT point, including the files and metadata associated with it. */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public final class NrtPointState {

public Map<String, NrtFileMetaData> files;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,21 @@ void downloadIndexFiles(
*
* @param service service name
* @param indexIdentifier unique index identifier
* @param nrtPointState NRT point state to upload
* @param nrtPointState NRT point state
* @param data point state data to upload
* @throws IOException on error uploading point state
*/
void uploadPointState(String service, String indexIdentifier, NrtPointState nrtPointState)
void uploadPointState(
String service, String indexIdentifier, NrtPointState nrtPointState, byte[] data)
throws IOException;

/**
* Download NRT point state from the remote backend.
*
* @param service service name
* @param indexIdentifier unique index identifier
* @return downloaded NRT point state
* @return input stream of point state data
* @throws IOException on error downloading point state
*/
NrtPointState downloadPointState(String service, String indexIdentifier) throws IOException;
InputStream downloadPointState(String service, String indexIdentifier) throws IOException;
}
50 changes: 50 additions & 0 deletions src/main/java/com/yelp/nrtsearch/server/remote/RemoteUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2024 Yelp Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yelp.nrtsearch.server.remote;

import com.yelp.nrtsearch.server.luceneserver.nrt.state.NrtPointState;
import com.yelp.nrtsearch.server.luceneserver.state.StateUtils;
import com.yelp.nrtsearch.server.utils.JsonUtils;
import java.io.IOException;

public class RemoteUtils {

private RemoteUtils() {}

/**
* Convert a {@link NrtPointState} to a UTF-8 encoded byte array.
*
* @param pointState point state
* @return UTF-8 encoded byte array
* @throws IOException on error converting to UTF-8
*/
public static byte[] pointStateToUtf8(NrtPointState pointState) throws IOException {
String jsonString = JsonUtils.objectToJsonStr(pointState);
return StateUtils.toUTF8(jsonString);
}

/**
* Convert a UTF-8 encoded byte array to a {@link NrtPointState}.
*
* @param data UTF-8 encoded byte array
* @return NrtPointState
* @throws IOException on error converting from UTF-8
*/
public static NrtPointState pointStateFromUtf8(byte[] data) throws IOException {
String jsonString = StateUtils.fromUTF8(data);
return JsonUtils.readValue(jsonString, NrtPointState.class);
}
}
132 changes: 93 additions & 39 deletions src/main/java/com/yelp/nrtsearch/server/remote/s3/S3Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.yelp.nrtsearch.server.config.LuceneServerConfiguration;
import com.yelp.nrtsearch.server.luceneserver.nrt.state.NrtFileMetaData;
Expand Down Expand Up @@ -58,9 +56,6 @@

/** Backend implementation that stored data in amazon s3 object storage. */
public class S3Backend implements RemoteBackend {
private static final ObjectMapper OBJECT_MAPPER =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

static final String GLOBAL_STATE_PREFIX_FORMAT = "%s/%s/";
static final String INDEX_RESOURCE_PREFIX_FORMAT = "%s/%s/%s/";
static final String GLOBAL_STATE_FILE_FORMAT = "%s-%s";
Expand Down Expand Up @@ -124,6 +119,10 @@ public S3Backend(String serviceBucket, boolean savePluginBeforeUnzip, AmazonS3 s
.build();
}

public AmazonS3 getS3() {
return s3;
}

@Override
public String downloadPluginIfNeeded(String pluginNameOrPath, Path destPath) {
if (S3Util.isValidS3FilePath(pluginNameOrPath) && pluginNameOrPath.endsWith(ZIP_EXTENSION)) {
Expand Down Expand Up @@ -276,8 +275,35 @@ static String getGlobalStateResourcePrefix(String service) {
return String.format(GLOBAL_STATE_PREFIX_FORMAT, service, GLOBAL_STATE);
}

@VisibleForTesting
static String getIndexResourcePrefix(
/**
* Get the S3 prefix for the specified index resource type.
*
* @param service service name
* @param indexIdentifier index identifier
* @param resourceType resource type
* @return S3 prefix
*/
public static String getIndexResourcePrefix(
String service, String indexIdentifier, IndexResourceType resourceType) {
return switch (resourceType) {
case WARMING_QUERIES -> getIndexResourcePrefix(service, indexIdentifier, WARMING);
case POINT_STATE -> getIndexResourcePrefix(service, indexIdentifier, POINT_STATE);
case INDEX_STATE -> getIndexResourcePrefix(service, indexIdentifier, INDEX_STATE);
};
}

/**
* Get the S3 prefix for index data files.
*
* @param service service name
* @param indexIdentifier index identifier
* @return S3 prefix
*/
public static String getIndexDataPrefix(String service, String indexIdentifier) {
return getIndexResourcePrefix(service, indexIdentifier, DATA);
}

private static String getIndexResourcePrefix(
String service, String indexIdentifier, String resourceType) {
return String.format(INDEX_RESOURCE_PREFIX_FORMAT, service, indexIdentifier, resourceType);
}
Expand All @@ -298,29 +324,31 @@ public InputStream downloadGlobalState(String service) throws IOException {
@Override
public void uploadIndexState(String service, String indexIdentifier, byte[] data)
throws IOException {
String prefix = getIndexResourcePrefix(service, indexIdentifier, INDEX_STATE);
String prefix = getIndexResourcePrefix(service, indexIdentifier, IndexResourceType.INDEX_STATE);
String fileName = getIndexStateFileName();
uploadResource(prefix, fileName, service, indexIdentifier, data);
}

@Override
public InputStream downloadIndexState(String service, String indexIdentifier) throws IOException {
String prefix = getIndexResourcePrefix(service, indexIdentifier, INDEX_STATE);
String prefix = getIndexResourcePrefix(service, indexIdentifier, IndexResourceType.INDEX_STATE);
return downloadResource(prefix);
}

@Override
public void uploadWarmingQueries(String service, String indexIdentifier, byte[] data)
throws IOException {
String prefix = getIndexResourcePrefix(service, indexIdentifier, WARMING);
String prefix =
getIndexResourcePrefix(service, indexIdentifier, IndexResourceType.WARMING_QUERIES);
String fileName = getWarmingQueriesFileName();
uploadResource(prefix, fileName, service, indexIdentifier, data);
}

@Override
public InputStream downloadWarmingQueries(String service, String indexIdentifier)
throws IOException {
String prefix = getIndexResourcePrefix(service, indexIdentifier, WARMING);
String prefix =
getIndexResourcePrefix(service, indexIdentifier, IndexResourceType.WARMING_QUERIES);
return downloadResource(prefix);
}

Expand All @@ -329,7 +357,7 @@ public void uploadIndexFiles(
String service, String indexIdentifier, Path indexDir, Map<String, NrtFileMetaData> files)
throws IOException {
List<FileNamePair> fileList = getFileNamePairs(files);
String backendPrefix = getIndexResourcePrefix(service, indexIdentifier, DATA);
String backendPrefix = getIndexDataPrefix(service, indexIdentifier);
List<Upload> uploadList = new LinkedList<>();
boolean hasFailure = false;
Throwable failureCause = null;
Expand Down Expand Up @@ -372,7 +400,7 @@ public void downloadIndexFiles(
String service, String indexIdentifier, Path indexDir, Map<String, NrtFileMetaData> files)
throws IOException {
List<FileNamePair> fileList = getFileNamePairs(files);
String backendPrefix = getIndexResourcePrefix(service, indexIdentifier, DATA);
String backendPrefix = getIndexDataPrefix(service, indexIdentifier);
List<Download> downloadList = new LinkedList<>();
boolean hasFailure = false;
Throwable failureCause = null;
Expand Down Expand Up @@ -421,17 +449,16 @@ static List<FileNamePair> getFileNamePairs(Map<String, NrtFileMetaData> files) {
}

@Override
public void uploadPointState(String service, String indexIdentifier, NrtPointState nrtPointState)
public void uploadPointState(
String service, String indexIdentifier, NrtPointState nrtPointState, byte[] data)
throws IOException {
String prefix = getIndexResourcePrefix(service, indexIdentifier, POINT_STATE);
String prefix = getIndexResourcePrefix(service, indexIdentifier, IndexResourceType.POINT_STATE);
String fileName = getPointStateFileName(nrtPointState);
String backendKey = prefix + fileName;
String jsonString = OBJECT_MAPPER.writeValueAsString(nrtPointState);
byte[] bytes = StateUtils.toUTF8(jsonString);
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(bytes.length);
metadata.setContentLength(data.length);
PutObjectRequest request =
new PutObjectRequest(serviceBucket, backendKey, new ByteArrayInputStream(bytes), metadata);
new PutObjectRequest(serviceBucket, backendKey, new ByteArrayInputStream(data), metadata);
request.setGeneralProgressListener(
new S3ProgressListenerImpl(service, indexIdentifier, "upload_point_state"));
s3.putObject(request);
Expand All @@ -440,18 +467,11 @@ public void uploadPointState(String service, String indexIdentifier, NrtPointSta
}

@Override
public NrtPointState downloadPointState(String service, String indexIdentifier)
throws IOException {
String prefix = getIndexResourcePrefix(service, indexIdentifier, POINT_STATE);
public InputStream downloadPointState(String service, String indexIdentifier) throws IOException {
String prefix = getIndexResourcePrefix(service, indexIdentifier, IndexResourceType.POINT_STATE);
String fileName = getCurrentResourceName(prefix);
String backendKey = prefix + fileName;
GetObjectRequest request = new GetObjectRequest(serviceBucket, backendKey);
request.setGeneralProgressListener(
new S3ProgressListenerImpl(service, indexIdentifier, "download_point_state"));
S3Object s3Object = s3.getObject(request);
byte[] stateBytes = IOUtils.toByteArray(s3Object.getObjectContent());
String jsonString = StateUtils.fromUTF8(stateBytes);
return OBJECT_MAPPER.readValue(jsonString, NrtPointState.class);
return downloadFromS3Path(serviceBucket, backendKey);
}

private void uploadResource(
Expand Down Expand Up @@ -486,31 +506,59 @@ static String getGlobalStateFileName() {
return String.format(GLOBAL_STATE_FILE_FORMAT, timestamp, UUID.randomUUID());
}

@VisibleForTesting
static String getIndexStateFileName() {
/**
* Get file name to use to store index state in S3. The file name will be unique for each call.
*
* @return file name
*/
public static String getIndexStateFileName() {
String timestamp = generateTimeStringSec();
return String.format(INDEX_STATE_FILE_FORMAT, timestamp, UUID.randomUUID());
}

@VisibleForTesting
static String getPointStateFileName(NrtPointState nrtPointState) {
/**
* Get file name to use to store point state in S3. The file name will depend on the point state
* index version.
*
* @param nrtPointState
* @return
*/
public static String getPointStateFileName(NrtPointState nrtPointState) {
String timestamp = generateTimeStringSec();
return String.format(
POINT_STATE_FILE_FORMAT, timestamp, nrtPointState.primaryId, nrtPointState.version);
}

@VisibleForTesting
static String getIndexBackendFileName(String fileName, NrtFileMetaData fileMetaData) {
/**
* Get the file name to use to store lucene index file in S3.
*
* @param fileName local file name
* @param fileMetaData file metadata
* @return backend file name
*/
public static String getIndexBackendFileName(String fileName, NrtFileMetaData fileMetaData) {
return String.format(
INDEX_BACKEND_FILE_FORMAT, fileMetaData.timeString, fileMetaData.primaryId, fileName);
}

@VisibleForTesting
static String getWarmingQueriesFileName() {
/**
* Get the file name to use to store warming queries in S3. The file name will be unique for each
* call.
*
* @return file name
*/
public static String getWarmingQueriesFileName() {
return String.format(WARMING_QUERIES_FILE_FORMAT, generateTimeStringSec(), UUID.randomUUID());
}

String getCurrentResourceName(String prefix) throws IOException {
/**
* Get the current blessed resource version for the specified prefix.
*
* @param prefix resource prefix
* @return current resource version
* @throws IOException if an error occurs while fetching the current resource version
*/
public String getCurrentResourceName(String prefix) throws IOException {
String key = prefix + CURRENT_VERSION;
try {
S3Object s3Object = s3.getObject(serviceBucket, key);
Expand All @@ -525,7 +573,13 @@ String getCurrentResourceName(String prefix) throws IOException {
}
}

void setCurrentResource(String prefix, String version) {
/**
* Set the current blessed resource version for the specified prefix.
*
* @param prefix resource prefix
* @param version resource version
*/
public void setCurrentResource(String prefix, String version) {
String key = prefix + CURRENT_VERSION;
byte[] bytes = StateUtils.toUTF8(version);
ObjectMetadata metadata = new ObjectMetadata();
Expand Down
Loading

0 comments on commit 462094b

Please sign in to comment.