Add benchmarks for magnolify-parquet vs parquet-avro R/W #1040

Merged: 11 commits (Sep 20, 2024)
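
Since the benchmarks are added to the existing jmh subproject, they can be run like the other JMH suites there. A minimal sketch, assuming the project's usual sbt-jmh setup (the task invocation, regex filter, and flags below are illustrative and not taken from this PR):

    # Run only the new Parquet read/write benchmarks
    sbt 'jmh/Jmh/run -wi 3 -i 5 -f1 .*ParquetBench.*'

    # Same run, additionally writing machine-readable results (standard JMH flags)
    sbt 'jmh/Jmh/run -f1 -rf json -rff parquet-bench.json .*ParquetBench.*'
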
9 changes: 8 additions & 1 deletion build.sbt
@@ -747,6 +747,7 @@ lazy val jmh: Project = project
cats % Test,
datastore % Test,
guava % Test,
parquet % Test,
protobuf % "test->test",
scalacheck % Test,
tensorflow % Test,
@@ -766,7 +767,13 @@ lazy val jmh: Project = project
"com.google.apis" % "google-api-services-bigquery" % bigqueryVersion % Test,
"com.google.cloud.datastore" % "datastore-v1-proto-client" % datastoreVersion % Test,
"org.apache.avro" % "avro" % avroVersion % Test,
"org.tensorflow" % "tensorflow-core-api" % tensorflowVersion % Test
"org.tensorflow" % "tensorflow-core-api" % tensorflowVersion % Test,
"joda-time" % "joda-time" % jodaTimeVersion % Test,
"org.apache.parquet" % "parquet-avro" % parquetVersion % Test,
"org.apache.parquet" % "parquet-column" % parquetVersion % Test,
"org.apache.parquet" % "parquet-hadoop" % parquetVersion % Test,
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % Test,
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion % Test
)
)

147 changes: 145 additions & 2 deletions jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala
@@ -17,12 +17,14 @@
package magnolify.jmh

import java.util.concurrent.TimeUnit

import magnolify.scalacheck.auto._
import magnolify.test.Simple._
import org.scalacheck._
import org.openjdk.jmh.annotations._

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

object MagnolifyBench {
val seed: rng.Seed = rng.Seed(0)
val prms: Gen.Parameters = Gen.Parameters.default
@@ -157,7 +159,148 @@ class ExampleBench {
private val exampleNested = implicitly[Arbitrary[ExampleNested]].arbitrary(prms, seed).get
private val example = exampleType.to(exampleNested).build()
@Benchmark def exampleTo: Example.Builder = exampleType.to(exampleNested)
@Benchmark def exampleFrom: ExampleNested = exampleType.from(example)
@Benchmark def exampleFrom: ExampleNested =
exampleType.from(example.getFeatures.getFeatureMap.asScala.toMap)
}

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
class ParquetBench {
import MagnolifyBench._
import ParquetStates._
import magnolify.avro._
import org.apache.avro.generic.GenericRecord

private val genericRecord = AvroType[Nested].to(nested)

@Benchmark def parquetWriteMagnolify(state: ParquetCaseClassWriteState): Unit =
state.writer.write(nested)
@Benchmark def parquetWriteAvro(state: ParquetAvroWriteState): Unit =
state.writer.write(genericRecord)

@Benchmark def parquetReadMagnolify(state: ParquetCaseClassReadState): Nested =
state.reader.read()
@Benchmark def parquetReadAvro(state: ParquetAvroReadState): GenericRecord = state.reader.read()
}

object ParquetStates {
import MagnolifyBench._
import magnolify.avro._
import magnolify.parquet._
import magnolify.parquet.ParquetArray.AvroCompat._
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.conf.PlainParquetConfiguration
import org.apache.parquet.avro.{AvroReadSupport, AvroWriteSupport}
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.api.{ReadSupport, WriteSupport}
import org.apache.parquet.schema.MessageType
import org.apache.parquet.io._
import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.apache.parquet.column.impl.ColumnWriteStoreV1

@State(Scope.Benchmark)
class ReadState[T](
schema: MessageType,
writeSupport: WriteSupport[T],
readSupport: ReadSupport[T],
record: T
) {
import org.apache.parquet.hadoop.api.InitContext

var reader: RecordReader[T] = null

@Setup(Level.Trial)
def setup(): Unit = {
// Write page
val columnIO = new ColumnIOFactory(true).getColumnIO(schema)
val pageStore = new ParquetInMemoryPageStore(1)
val columnWriteStore = new ColumnWriteStoreV1(
schema,
pageStore,
ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build
)
val recordConsumer = columnIO.getRecordWriter(columnWriteStore)
writeSupport.init(new PlainParquetConfiguration())
writeSupport.prepareForWrite(recordConsumer)
writeSupport.write(record)
recordConsumer.flush()
columnWriteStore.flush()

// Set up reader
val conf = new Configuration()
reader = columnIO.getRecordReader(
pageStore,
readSupport.prepareForRead(
conf,
new java.util.HashMap,
schema,
readSupport.init(new InitContext(conf, new java.util.HashMap, schema))
)
): @nowarn("cat=deprecation")
}
}

@State(Scope.Benchmark)
class WriteState[T](writeSupport: WriteSupport[T]) {
val writer = writeSupport

@Setup(Level.Trial)
def setup(): Unit = {
writeSupport.init(new PlainParquetConfiguration())
// Use a no-op RecordConsumer; we want to measure only the record -> group conversion, and not pollute the
// benchmark with background tasks like flushing pages/blocks or validating records
writeSupport.prepareForWrite(new RecordConsumer {
override def startMessage(): Unit = {}
override def endMessage(): Unit = {}
override def startField(field: String, index: Int): Unit = {}
override def endField(field: String, index: Int): Unit = {}
override def startGroup(): Unit = {}
override def endGroup(): Unit = {}
override def addInteger(value: Int): Unit = {}
override def addLong(value: Long): Unit = {}
override def addBoolean(value: Boolean): Unit = {}
override def addBinary(value: Binary): Unit = {}
override def addFloat(value: Float): Unit = {}
override def addDouble(value: Double): Unit = {}
})
}
}

// R/W support for Group <-> Case Class Conversion (magnolify-parquet)
private val parquetType = ParquetType[Nested]
class ParquetCaseClassReadState
extends ParquetStates.ReadState[Nested](
parquetType.schema,
parquetType.writeSupport,
parquetType.readSupport,
nested
)
class ParquetCaseClassWriteState
extends ParquetStates.WriteState[Nested](parquetType.writeSupport)

// R/W support for Group <-> Avro Conversion (parquet-avro)
private val avroType = AvroType[Nested]
class ParquetAvroReadState
extends ParquetStates.ReadState[GenericRecord](
parquetType.schema,
new AvroWriteSupport[GenericRecord](
parquetType.schema,
parquetType.avroSchema,
GenericData.get()
),
new AvroReadSupport[GenericRecord](GenericData.get()),
avroType.to(nested)
)
class ParquetAvroWriteState
extends ParquetStates.WriteState[GenericRecord](
new AvroWriteSupport[GenericRecord](
parquetType.schema,
parquetType.avroSchema,
GenericData.get()
)
)
}

// Collections are not supported
139 changes: 139 additions & 0 deletions jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala
@@ -0,0 +1,139 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package magnolify.jmh

import org.apache.parquet.bytes.{ByteBufferReleaser, BytesInput, HeapByteBufferAllocator}
import org.apache.parquet.column.{ColumnDescriptor, Encoding}
import org.apache.parquet.column.page._
import org.apache.parquet.column.statistics._

import scala.collection.mutable

/**
* An in-memory Parquet page store modeled after parquet-java's MemPageStore, used to benchmark
* ParquetType conversion between Parquet Groups and Scala case classes
*/
class ParquetInMemoryPageStore(rowCount: Long) extends PageReadStore with PageWriteStore {
@clairemcginty (Contributor, Author) commented on Sep 18, 2024:
These classes are heavily based on this parquet-java package, which sadly is not part of any published artifact: https://github.com/apache/parquet-java/tree/master/parquet-column/src/test/java/org/apache/parquet/column/page/mem

lazy val writers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryWriter]()
lazy val readers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryReader]()

override def getPageReader(path: ColumnDescriptor): PageReader =
readers.getOrElseUpdate(
path, {
val writer = writers(path)
new ParquetInMemoryReader(writer.pages.toList, writer.dictionaryPage)
}
)

override def getPageWriter(path: ColumnDescriptor): PageWriter =
writers.getOrElseUpdate(path, new ParquetInMemoryWriter)

override def getRowCount: Long = rowCount
}

class ParquetInMemoryReader(pages: List[DataPageV1], dictionaryPage: DictionaryPage)
extends PageReader {
// Keep returning the first page on every read; for benchmarking purposes, we don't care about the data itself
private val page = pages.head

override def readDictionaryPage(): DictionaryPage = dictionaryPage
override def getTotalValueCount: Long = Long.MaxValue
override def readPage(): DataPage = new DataPageV1(
page.getBytes.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)),
page.getValueCount,
page.getUncompressedSize,
page.getStatistics,
page.getRlEncoding,
page.getDlEncoding,
page.getValueEncoding
)
}

class ParquetInMemoryWriter extends PageWriter {
var numRows = 0
var numValues: Long = 0
var memSize: Long = 0
val pages = new mutable.ListBuffer[DataPageV1]()
var dictionaryPage: DictionaryPage = null

override def writePage(
bytesInput: BytesInput,
valueCount: Int,
statistics: Statistics[_],
rlEncoding: Encoding,
dlEncoding: Encoding,
valuesEncoding: Encoding
): Unit =
writePage(bytesInput, valueCount, 1, statistics, rlEncoding, dlEncoding, valuesEncoding)

override def writePage(
bytesInput: BytesInput,
valueCount: Int,
rowCount: Int,
statistics: Statistics[_],
sizeStatistics: SizeStatistics,
rlEncoding: Encoding,
dlEncoding: Encoding,
valuesEncoding: Encoding
): Unit =
writePage(bytesInput, valueCount, rowCount, statistics, rlEncoding, dlEncoding, valuesEncoding)

override def writePage(
bytesInput: BytesInput,
valueCount: Int,
rowCount: Int,
statistics: Statistics[_],
rlEncoding: Encoding,
dlEncoding: Encoding,
valuesEncoding: Encoding
): Unit = {
pages.addOne(
new DataPageV1(
bytesInput.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)),
valueCount,
bytesInput.size().toInt,
statistics,
rlEncoding,
dlEncoding,
valuesEncoding
)
)
memSize += bytesInput.size()
numRows += rowCount
numValues += valueCount
}

override def writePageV2(
rowCount: Int,
nullCount: Int,
valueCount: Int,
repetitionLevels: BytesInput,
definitionLevels: BytesInput,
dataEncoding: Encoding,
data: BytesInput,
statistics: Statistics[_]
): Unit = ???

override def getMemSize: Long = memSize

override def allocatedSize(): Long = memSize

override def writeDictionaryPage(dictionaryPage: DictionaryPage): Unit =
this.dictionaryPage = dictionaryPage

override def memUsageString(prefix: String): String = s"$prefix $memSize bytes"
}