From bd5497705b01bf0e713dee3ca24160e7e13d3fe6 Mon Sep 17 00:00:00 2001
From: Marc Kriguer
Date: Mon, 29 Aug 2022 15:50:51 -0700
Subject: [PATCH 1/3] Write a new utility to copy an existing "transaction"
 avro file into a new location, shifting the consensus_timestamp field by a
 multiple of 3 years.

Signed-off-by: Marc Kriguer
---
 .../util/ShiftAvroConsensusTimes.java         | 158 ++++++++++++++++++
 1 file changed, 158 insertions(+)
 create mode 100644 src/main/java/com/swirlds/streamloader/util/ShiftAvroConsensusTimes.java

diff --git a/src/main/java/com/swirlds/streamloader/util/ShiftAvroConsensusTimes.java b/src/main/java/com/swirlds/streamloader/util/ShiftAvroConsensusTimes.java
new file mode 100644
index 0000000..c017efb
--- /dev/null
+++ b/src/main/java/com/swirlds/streamloader/util/ShiftAvroConsensusTimes.java
@@ -0,0 +1,158 @@
+package com.swirlds.streamloader.util;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class ShiftAvroConsensusTimes {
+    // copied from ./src/main/java/com/swirlds/streamloader/processing/TransactionProcessingBlock.java; check
+    // there for latest version!
+    private static final Schema TRANSACTION_AVRO_SCHEMA = new Schema.Parser().parse("""
+            {"namespace": "com.swirlds",
+            "type": "record",
+            "name": "transaction",
+            "fields": [
+                {"name": "consensus_timestamp", "type": "long"},
+                {"name": "entityId", "type": "long"},
+                {"name": "type", "type": "string"},
+                {"name": "index", "type": "long"},
+                {"name": "transaction_id", "type": "string"},
+                {"name": "result", "type": "string"},
+                {"name": "fields", "type": "string"},
+                {"name": "transfers_tokens", "type": "string"},
+                {"name": "transfers_hbar", "type": "string"},
+                {"name": "transfers_nft", "type": "string"},
+                {"name": "contract_logs", "type": "string", "default" : ""},
+                {"name": "contract_results", "type": "string", "default" : ""},
+                {"name": "contract_state_change", "type": "string"},
+                {"name": "nonce", "type": "int"},
+                {"name": "scheduled", "type": "boolean"},
+                {"name": "assessed_custom_fees", "type": "string"},
+                {"name": "ids", "type": {"type" : "array", "items" : "long"}},
+                {"name": "credited_ids", "type": {"type" : "array", "items" : "long"}},
+                {"name": "debited_ids", "type": {"type" : "array", "items" : "long"}}
+            ]
+            }""");
+
+    public static void main(String[] args) {
+        Schema schema = TRANSACTION_AVRO_SCHEMA;
+        List<String> jvmArgs = Arrays.asList(args);
+        if (jvmArgs.size() != 3) {
+            System.out.println("Usage: ShiftAvroConsensusTimes <input file> <output file> <# of 3-year offsets>");
+            System.exit(1);
+        }
+        final String inputFilename = jvmArgs.get(0);
+        final String outputFilename = jvmArgs.get(1);
+        long offsetInNanos = 0L;
+        try {
+            int years = 3 * Integer.parseInt(jvmArgs.get(2));
+            if (years < -15) {
+                System.out.println("Warning: it is possible that your timestamps will end up pre-epoch "
+                        + "(negative longs).\nThis is not necessarily an error - just be cautious.");
+            }
+            // multiply with a long factor up front so the year-to-nanosecond conversion
+            // cannot overflow int arithmetic before being widened
+            offsetInNanos = years * 365L * 24 * 3600 * 1_000_000_000L;
+            System.out.println("offset: " + offsetInNanos);
+        } catch (NumberFormatException ex) {
+            ex.printStackTrace();
+            System.exit(2);
+        }
+
+        File inputFile = new File(inputFilename);
+        File outputFile = new File(outputFilename);
+        if (!inputFile.exists()) {
+            System.out.println("Input file '" + inputFilename + "' does not exist!");
+            System.exit(3);
+        }
+        if (outputFile.exists()) {
+            System.out.println("Output file '" + outputFilename + "' already exists!");
+            System.exit(4);
+        }
+
+        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
+        try {
+            dataFileWriter.create(schema, outputFile);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        GenericRecord record = null;
+        GenericRecord outputRecord;
+        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+        long recordNumber = 0;
+        long minimumInputTimestamp = Long.MAX_VALUE;
+        long maximumInputTimestamp = Long.MIN_VALUE;
+        long minimumOutputTimestamp = Long.MAX_VALUE;
+        long maximumOutputTimestamp = Long.MIN_VALUE;
+        try {
+            DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(inputFile, datumReader);
+
+            while (dataFileReader.hasNext()) {
+                record = dataFileReader.next(record);
+                outputRecord = new GenericData.Record(schema);
+                Object consensusTimestampAsObject = record.get("consensus_timestamp");
+                Long consensusTimestamp = 0L;
+                try {
+                    if (consensusTimestampAsObject instanceof Long) {
+                        consensusTimestamp = ((Long) consensusTimestampAsObject).longValue();
+                    } else if (consensusTimestampAsObject instanceof Integer) {
+                        consensusTimestamp = ((Integer) consensusTimestampAsObject).longValue();
+                    } else if (consensusTimestampAsObject instanceof String) {
+                        consensusTimestamp = Long.valueOf((String) consensusTimestampAsObject);
+                    } else {
+                        System.out.println("Found invalid consensus_timestamp " + consensusTimestampAsObject
+                                + " in record #" + recordNumber + " of " + inputFilename);
+                    }
+                } catch (Exception e) {
+                    System.out.println("Found invalid consensus_timestamp " + consensusTimestampAsObject
+                            + " in record #" + recordNumber + " of " + inputFilename);
+                }
+                minimumInputTimestamp = Math.min(minimumInputTimestamp, consensusTimestamp);
+                maximumInputTimestamp = Math.max(maximumInputTimestamp, consensusTimestamp);
+                consensusTimestamp += offsetInNanos;
+                minimumOutputTimestamp = Math.min(minimumOutputTimestamp, consensusTimestamp);
+                maximumOutputTimestamp = Math.max(maximumOutputTimestamp, consensusTimestamp);
+                outputRecord.put("consensus_timestamp", consensusTimestamp);
+                outputRecord.put("entityId", record.get("entityId"));
+                outputRecord.put("type", record.get("type"));
+                outputRecord.put("index", record.get("index"));
+                outputRecord.put("transaction_id", record.get("transaction_id"));
+                outputRecord.put("result", record.get("result"));
+                outputRecord.put("fields", record.get("fields"));
+                outputRecord.put("transfers_tokens", record.get("transfers_tokens"));
+                outputRecord.put("transfers_hbar", record.get("transfers_hbar"));
+                outputRecord.put("transfers_nft", record.get("transfers_nft"));
+                outputRecord.put("contract_logs", record.get("contract_logs"));
+                outputRecord.put("contract_results", record.get("contract_results"));
+                outputRecord.put("contract_state_change", record.get("contract_state_change"));
+                outputRecord.put("nonce", record.get("nonce"));
+                outputRecord.put("scheduled", record.get("scheduled"));
+                outputRecord.put("assessed_custom_fees", record.get("assessed_custom_fees"));
+                outputRecord.put("ids", record.get("ids"));
+                outputRecord.put("credited_ids", record.get("credited_ids"));
+                outputRecord.put("debited_ids", record.get("debited_ids"));
+                recordNumber++;
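+                // every field other than consensus_timestamp was copied through unchanged;
+                // only the timestamp was shifted, so the adjusted record can now be appended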
+                dataFileWriter.append(outputRecord);
+            }
+            dataFileReader.close();
+            dataFileWriter.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        System.out.println("A total of " + recordNumber + " records were copied/modified.");
+        System.out.println("Input consensus timestamps ranged from " + minimumInputTimestamp + " to "
+                + maximumInputTimestamp);
+        System.out.println("Output consensus timestamps ranged from " + minimumOutputTimestamp + " to "
+                + maximumOutputTimestamp);
+    }
+}

From 3afc588a5298d5208f17da88f71611c68ee7023b Mon Sep 17 00:00:00 2001
From: Marc Kriguer
Date: Mon, 29 Aug 2022 17:31:25 -0700
Subject: [PATCH 2/3] Since we don't have sufficient disk space to copy all
 the transaction.avro files multiple times, add an option to upload the
 (now-temporary) output file and then remove it.

Signed-off-by: Marc Kriguer
---
 .../util/GoogleStorageHelper.java             | 18 ++++++++++++++++-
 .../util/ShiftAvroConsensusTimes.java         | 20 ++++++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/src/main/java/com/swirlds/streamloader/util/GoogleStorageHelper.java b/src/main/java/com/swirlds/streamloader/util/GoogleStorageHelper.java
index 7997f04..285172c 100644
--- a/src/main/java/com/swirlds/streamloader/util/GoogleStorageHelper.java
+++ b/src/main/java/com/swirlds/streamloader/util/GoogleStorageHelper.java
@@ -99,6 +99,23 @@ private static ByteBuffer getBuffer(ThreadLocal<ByteBuffer> threadLocal, int siz
         }
         return buf.clear();
     }
+
+    public static void uploadFile(String bucketName, Path filePath, String filePathInBucket) {
+        try {
+            // read whole file into a ByteBuffer
+            final int fileSize = (int)Files.size(filePath);
+            final ByteBuffer inputBuffer = getBuffer(inputBuffers, fileSize, false);
+            try (final var channel = Files.newByteChannel(filePath, READ)) {
+                channel.read(inputBuffer);
+            }
+            inputBuffer.flip();
+            // upload only the bytes read - the pooled buffer's backing array may be larger than the file
+            uploadBlob(bucketName, filePathInBucket, inputBuffer.array(), 0, inputBuffer.limit());
+        } catch (IOException e) {
+            Utils.failWithError(e);
+        }
+    }
+
     public static void compressAndUploadFile(String bucketName, Path filePath, String filePathInBucket) {
         try {
             // read whole file
@@ -124,5 +141,4 @@ public static void compressAndUploadFile(String bucketName, Path filePath, Strin
         }
     }
-
 }
diff --git a/src/main/java/com/swirlds/streamloader/util/ShiftAvroConsensusTimes.java b/src/main/java/com/swirlds/streamloader/util/ShiftAvroConsensusTimes.java
index c017efb..3e1527e 100644
--- a/src/main/java/com/swirlds/streamloader/util/ShiftAvroConsensusTimes.java
+++ b/src/main/java/com/swirlds/streamloader/util/ShiftAvroConsensusTimes.java
@@ -12,9 +12,12 @@
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
 
+import static com.swirlds.streamloader.util.GoogleStorageHelper.uploadFile;
+
 public class ShiftAvroConsensusTimes {
     // copied from ./src/main/java/com/swirlds/streamloader/processing/TransactionProcessingBlock.java; check
     // there for latest version!
@@ -45,6 +48,8 @@ public class ShiftAvroConsensusTimes {
             ]
             }""");
 
+    private static final boolean uploadAndRemoveGeneratedFiles = true;
+
     public static void main(String[] args) {
         Schema schema = TRANSACTION_AVRO_SCHEMA;
         List<String> jvmArgs = Arrays.asList(args);
@@ -52,7 +57,7 @@ public static void main(String[] args) {
             System.out.println("Usage: ShiftAvroConsensusTimes <input file> <output file> <# of 3-year offsets>");
             System.exit(1);
         }
-        final String inputFilename = jvmArgs.get(0);
+        final String inputFilename = jvmArgs.get(0);
         final String outputFilename = jvmArgs.get(1);
         long offsetInNanos = 0L;
         try {
@@ -154,5 +159,18 @@ record = dataFileReader.next(record);
                 maximumInputTimestamp);
         System.out.println("Output consensus timestamps ranged from " + minimumOutputTimestamp + " to "
                 + maximumOutputTimestamp);
+        if (uploadAndRemoveGeneratedFiles) {
+            try {
+                uploadFile("pinot-ingestion", Paths.get(outputFilename), schema.getName() + "/" +
+                        outputFilename.substring(outputFilename.lastIndexOf("/") + 1));
+                if (outputFile.delete()) {
+                    System.out.println("File " + outputFilename + " successfully uploaded and deleted locally.");
+                } else {
+                    System.out.println("*** File " + outputFilename + " was uploaded, but could not be deleted locally.");
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 }

From 74a6326afedb6cf34a04eaea48e908d886f146a1 Mon Sep 17 00:00:00 2001
From: Marc Kriguer
Date: Tue, 30 Aug 2022 16:24:02 -0700
Subject: [PATCH 3/3] Get compressAndUploadFile() working again. Also, get
 "snappy" codec for avro files to work. But choose one or the other, not
 both.

Signed-off-by: Marc Kriguer
---
 .../util/GoogleStorageHelper.java             | 21 +++++++++----------
 .../util/ShiftAvroConsensusTimes.java         | 11 +++++++---
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/src/main/java/com/swirlds/streamloader/util/GoogleStorageHelper.java b/src/main/java/com/swirlds/streamloader/util/GoogleStorageHelper.java
index 285172c..df4e623 100644
--- a/src/main/java/com/swirlds/streamloader/util/GoogleStorageHelper.java
+++ b/src/main/java/com/swirlds/streamloader/util/GoogleStorageHelper.java
@@ -9,6 +9,7 @@
 import javax.json.Json;
 import javax.json.stream.JsonParser;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.URL;
 import java.nio.ByteBuffer;
@@ -16,7 +17,7 @@
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.Iterator;
-import java.util.zip.Deflater;
+import java.util.zip.GZIPOutputStream;
 
 import static java.nio.file.StandardOpenOption.READ;
 
@@ -120,22 +121,20 @@ public static void compressAndUploadFile(String bucketName, Path filePath, Strin
         try {
             // read whole file
             final int fileSize = (int)Files.size(filePath);
-            final ByteBuffer inputBuffer = getBuffer(inputBuffers, fileSize, true);
+            final ByteBuffer inputBuffer = getBuffer(inputBuffers, fileSize, false);
             try (final var channel = Files.newByteChannel(filePath, READ)) {
                 channel.read(inputBuffer);
             }
             inputBuffer.flip();
             // Compress the bytes
-            final ByteBuffer outputBuffer = getBuffer(outputBuffers, fileSize/2, false); // assume at least half size
-            final Deflater compressor = new Deflater();
-            compressor.setInput(inputBuffer);
-            compressor.finish();
-            final int compressedDataLength = compressor.deflate(outputBuffer);
-            compressor.end();
-            outputBuffer.flip();
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
+                // compress only the bytes actually read, not the whole pooled backing array
+                gzip.write(inputBuffer.array(), 0, inputBuffer.limit());
+            } catch (IOException e) {
+                Utils.failWithError(e);
+            }
             // upload
-            GoogleStorageHelper.uploadBlob(bucketName, filePathInBucket,
-                    outputBuffer.array(), 0, outputBuffer.limit());
+            GoogleStorageHelper.uploadBlob(bucketName, filePathInBucket, baos.toByteArray());
         } catch (IOException e) {
             Utils.failWithError(e);
         }
diff --git a/src/main/java/com/swirlds/streamloader/util/ShiftAvroConsensusTimes.java b/src/main/java/com/swirlds/streamloader/util/ShiftAvroConsensusTimes.java
index 3e1527e..0c38c2d 100644
--- a/src/main/java/com/swirlds/streamloader/util/ShiftAvroConsensusTimes.java
+++ b/src/main/java/com/swirlds/streamloader/util/ShiftAvroConsensusTimes.java
@@ -1,6 +1,7 @@
 package com.swirlds.streamloader.util;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
@@ -16,7 +17,7 @@
 import java.util.Arrays;
 import java.util.List;
 
-import static com.swirlds.streamloader.util.GoogleStorageHelper.uploadFile;
+import static com.swirlds.streamloader.util.GoogleStorageHelper.compressAndUploadFile;
 
 public class ShiftAvroConsensusTimes {
     // copied from ./src/main/java/com/swirlds/streamloader/processing/TransactionProcessingBlock.java; check
@@ -87,6 +88,8 @@ public static void main(String[] args) {
         DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
         DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
         try {
+            // smallest files tend to be gzipped, non-snappy .avro files.
+            // dataFileWriter.setCodec(CodecFactory.snappyCodec());
             dataFileWriter.create(schema, outputFile);
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -161,8 +164,10 @@ record = dataFileReader.next(record);
                 maximumOutputTimestamp);
         if (uploadAndRemoveGeneratedFiles) {
             try {
-                uploadFile("pinot-ingestion", Paths.get(outputFilename), schema.getName() + "/" +
-                        outputFilename.substring(outputFilename.lastIndexOf("/") + 1));
+                // don't try to gzip an avro file written with the "snappy" codec -- the gzipped file
+                // ends up larger than the avro file!
+                compressAndUploadFile("pinot-ingestion", Paths.get(outputFilename), schema.getName() + "/" +
+                        outputFilename.substring(outputFilename.lastIndexOf("/") + 1) + ".gz");
                 if (outputFile.delete()) {
                     System.out.println("File " + outputFilename + " successfully uploaded and deleted locally.");
                 } else {
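
For reference, a typical invocation of the finished utility might look like the
following (the jar name and file names here are illustrative, not taken from the
patches above):

    java -cp swirlds-streamloader.jar com.swirlds.streamloader.util.ShiftAvroConsensusTimes \
            transaction.avro transaction-shifted.avro 1

This would shift every consensus_timestamp forward by one 3-year offset, write the
shifted copy locally, then (with uploadAndRemoveGeneratedFiles set) gzip the copy,
upload it to the "pinot-ingestion" bucket, and delete the local file.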