From ce7478f9dea92d0308bbc4628abcbf904fbba0e1 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 28 Aug 2024 14:17:13 -0700 Subject: [PATCH] build a recorddiffer that handles strongly-typed records --- .../airbyte/cdk/test/util/RecordDifferTest.kt | 121 +++++++++ .../io/airbyte/cdk/test/util/OutputRecord.kt | 62 +++++ .../io/airbyte/cdk/test/util/RecordDiffer.kt | 256 ++++++++++++++++++ .../BaseSqlGeneratorIntegrationTest.kt | 4 +- .../typing_deduping/BaseTypingDedupingTest.kt | 4 +- ...{RecordDiffer.kt => LegacyRecordDiffer.kt} | 7 +- 6 files changed, 449 insertions(+), 5 deletions(-) create mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt create mode 100644 airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt create mode 100644 airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/RecordDiffer.kt rename airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/{RecordDiffer.kt => LegacyRecordDiffer.kt} (98%) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt new file mode 100644 index 000000000000..01b08195f325 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.test.util + +import java.time.Instant +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +class RecordDifferTest { + @Test + fun testBasicBehavior() { + val differ = + RecordDiffer( + extractPrimaryKey = { listOf(it.data["id1"], it.data["id2"]) }, + extractCursor = { it.data["updated_at"] } + ) + + val diff = + differ.diffRecords( + expectedRecords = + listOf( + // Extra expected record + OutputRecord( + extractedAt = 1234, + generationId = 42, + mapOf( + "id1" to 1, + "id2" to 100, + "updated_at" to Instant.parse("1970-01-01T00:00:00Z"), + "name" to "alice", + "phone" to "1234" + ), + airbyteMeta = null + ), + // Matching records + OutputRecord( + extractedAt = 1234, + generationId = 42, + mapOf( + "id1" to 1, + "id2" to 100, + "updated_at" to Instant.parse("1970-01-01T00:00:01Z"), + "name" to "bob", + ), + airbyteMeta = null + ), + // Different records + OutputRecord( + extractedAt = 1234, + generationId = 42, + mapOf( + "id1" to 1, + "id2" to 100, + "updated_at" to Instant.parse("1970-01-01T00:00:02Z"), + "name" to "charlie", + "phone" to "1234", + "email" to "charlie@example.com" + ), + airbyteMeta = """{"sync_id": 12}""", + ), + ), + actualRecords = + listOf( + // Matching records + OutputRecord( + extractedAt = 1234, + generationId = 42, + mapOf( + "id1" to 1, + "id2" to 100, + "updated_at" to Instant.parse("1970-01-01T00:00:01Z"), + "name" to "bob", + ), + airbyteMeta = null + ), + // Different records + OutputRecord( + extractedAt = 1234, + generationId = 41, + mapOf( + "id1" to 1, + "id2" to 100, + "updated_at" to Instant.parse("1970-01-01T00:00:02Z"), + "name" to "charlie", + "phone" to "5678", + "address" to "1234 charlie street" + ), + airbyteMeta = null + ), + // Extra actual record + OutputRecord( + extractedAt = 1234, + generationId = 42, + mapOf( + "id1" to 1, + "id2" to 100, + "updated_at" to Instant.parse("1970-01-01T00:00:03Z"), + "name" to "dana", + ), + airbyteMeta = null + ), + ), + ) + + Assertions.assertEquals( + """ + Missing record (pk=[1, 100], cursor=1970-01-01T00:00:00Z): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data={id1=1, id2=100, updated_at=1970-01-01T00:00:00Z, name=alice, phone=1234}, airbyteMeta=null) + Incorrect record ((pk=[1, 100], cursor=1970-01-01T00:00:02Z): + generationId: Expected 42, got 41 + airbyteMeta: Expected {"sync_id":12}, got null + phone: Expected 1234, but was 5678 + email: Expected charlie@example.com, but was + address: Expected , but was 1234 charlie street + Unexpected record (pk=[1, 100], cursor=1970-01-01T00:00:03Z): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data={id1=1, id2=100, updated_at=1970-01-01T00:00:03Z, name=dana}, airbyteMeta=null) + """.trimIndent(), + diff + ) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt new file mode 100644 index 000000000000..15328f4a72d0 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.test.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import java.time.Instant +import java.util.UUID + +/** A record that we expect to exist in the destination, whether raw _or_ final. */ +data class OutputRecord( + val rawId: UUID?, + val extractedAt: Instant, + val loadedAt: Instant?, + val generationId: Long?, + /** + * strongly-typed map, e.g. ZonedDateTime for timestamp_with_timezone. this makes destination + * test implementations easier. values can be null, b/c warehouse destinations with a JSON + * column type can be either SQL null, or JSON null, and we want to distinguish between those. + * Destinations _must_ filter out the airbyte_* fields from this map. + */ + val data: Map, + val airbyteMeta: JsonNode?, +) { + /** Utility constructor with easier types to write by hand */ + constructor( + rawId: String, + extractedAt: Long, + loadedAt: Long?, + generationId: Long?, + data: Map, + airbyteMeta: String?, + ) : this( + UUID.fromString(rawId), + Instant.ofEpochMilli(extractedAt), + loadedAt?.let { Instant.ofEpochMilli(it) }, + generationId, + data, + airbyteMeta?.let { ObjectMapper().readTree(it) }, + ) + + /** + * Utility constructor for "expected records". [rawId] and [loadedAt] are generated by the + * destination at runtime, so we don't have those when writing the test. Just generate arbitrary + * values for them. + */ + constructor( + extractedAt: Long, + generationId: Long?, + data: Map, + airbyteMeta: String?, + ) : this( + null, + Instant.ofEpochMilli(extractedAt), + loadedAt = null, + generationId, + data, + airbyteMeta?.let { ObjectMapper().readTree(it) }, + ) +} diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/RecordDiffer.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/RecordDiffer.kt new file mode 100644 index 000000000000..baa09ea1be4b --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/RecordDiffer.kt @@ -0,0 +1,256 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.test.util + +import kotlin.reflect.jvm.jvmName + +class RecordDiffer( + /** + * The path(s) to the primary key fields from a record. Most streams will have some `id` + * field(s), even if they're not running in dedup mode. This comparator lets us match records + * together to generate a more useful diff. + * + * In the rare case that a stream truly has no PK, the default value simply returns an empty + * list. + */ + val primaryKey: List> = emptyList(), + /** The path to the cursor from a record, or null if the stream has no cursor. */ + val cursor: List? = null, +) { + private fun extract(data: Map, path: List): Any? { + return when (path.size) { + 0 -> throw IllegalArgumentException("Empty path") + 1 -> data[path.first()] + else -> { + when (val next = data[path.first()]) { + null -> null + is Map<*, *> -> + extract( + @Suppress("UNCHECKED_CAST") (next as Map), + path.subList(1, path.size), + ) + else -> + throw IllegalArgumentException( + "Encountered non-map entry in path: $next at ${path.first()}" + ) + } + } + } + } + + // if primaryKey is empty list, this always returns emptyList. + private fun extractPrimaryKey(record: OutputRecord): List { + val pks = mutableListOf() + for (pkField in primaryKey) { + pks.add(extract(record.data, pkField)) + } + return pks + } + private fun extractCursor(record: OutputRecord): Any? { + return extract(record.data, cursor!!) + } + + /** Comparator that sorts records by their primary key */ + private val identityComparator: Comparator = Comparator { rec1, rec2 -> + val pk1 = extractPrimaryKey(rec1) + val pk2 = extractPrimaryKey(rec2) + if (pk1.size != pk2.size) { + throw IllegalStateException( + "Records must have the same number of primary keys. Got $pk1 and $pk2." + ) + } + + // Compare each PK field in order, until we find a field that the two records differ in. + // If all the fields are equal, then these two records have the same PK. + pk1.zip(pk2) + .map { (pk1Field, pk2Field) -> valueComparator.compare(pk1Field, pk2Field) } + .firstOrNull { it != 0 } + ?: 0 + } + + /** + * Comparator to sort records by their cursor (if there is one), breaking ties with extractedAt + */ + private val sortComparator: Comparator = + Comparator.comparing( + { it: OutputRecord -> (if (cursor == null) 0 else extractCursor(it)) }, + valueComparator + ) + .thenComparing { it -> it.extractedAt } + + /** + * The actual comparator we'll use to sort the expected/actual record lists. I.e. group records + * by their PK, then within each PK, sort by cursor/extractedAt. + */ + private val everythingComparator = identityComparator.thenComparing(sortComparator) + + /** Returns a pretty-printed diff of the two lists, or null if they were identical */ + fun diffRecords( + expectedRecords: List, + actualRecords: List + ): String? { + val expectedRecordsSorted = expectedRecords.sortedWith(everythingComparator) + val actualRecordsSorted = actualRecords.sortedWith(everythingComparator) + + // Match up all the records between the expected and actual records, + // or if there's no matching record then detect that also. + // We'll filter this list down to actual differing records later on. + val matches = mutableListOf() + var expectedRecordIndex = 0 + var actualRecordIndex = 0 + while ( + expectedRecordIndex < expectedRecordsSorted.size && + actualRecordIndex < actualRecordsSorted.size + ) { + val expectedRecord = expectedRecords[expectedRecordIndex] + val actualRecord = actualRecords[actualRecordIndex] + val compare = everythingComparator.compare(expectedRecord, actualRecord) + if (compare == 0) { + // These records are the same underlying record + matches.add(MatchingRecords(expectedRecord, actualRecord)) + expectedRecordIndex++ + actualRecordIndex++ + } else if (compare < 0) { + // There's an extra expected record + matches.add(MatchingRecords(expectedRecord, actualRecord = null)) + expectedRecordIndex++ + } else { + // There's an extra actual record + matches.add(MatchingRecords(expectedRecord = null, actualRecord)) + actualRecordIndex++ + } + } + + // Tail loops in case we reached the end of one list before the other. + while (expectedRecordIndex < expectedRecords.size) { + matches.add(MatchingRecords(expectedRecords[expectedRecordIndex], actualRecord = null)) + expectedRecordIndex++ + } + while (actualRecordIndex < actualRecords.size) { + matches.add(MatchingRecords(expectedRecord = null, actualRecords[actualRecordIndex])) + actualRecordIndex++ + } + + // We've paired up all the records, now find just the ones that are wrong. + val diffs = matches.filter { it.isMismatch() } + return if (diffs.isEmpty()) { + null + } else { + diffs.joinToString("\n") { it.prettyPrintMismatch() } + } + } + + private inner class MatchingRecords( + val expectedRecord: OutputRecord?, + val actualRecord: OutputRecord?, + ) { + fun isMismatch(): Boolean = + (expectedRecord == null && actualRecord != null) || + (expectedRecord != null && actualRecord == null) || + !recordsMatch(expectedRecord, actualRecord) + + fun prettyPrintMismatch(): String { + return if (expectedRecord == null) { + "Unexpected record (${generateRecordIdentifier(actualRecord!!)}): $actualRecord" + } else if (actualRecord == null) { + "Missing record (${generateRecordIdentifier(expectedRecord)}): $expectedRecord" + } else { + "Incorrect record (${generateRecordIdentifier(actualRecord)}):\n" + + generateDiffString(expectedRecord, actualRecord).prependIndent(" ") + } + } + + private fun recordsMatch( + expectedRecord: OutputRecord?, + actualRecord: OutputRecord?, + ): Boolean = + (expectedRecord == null && actualRecord == null) || + (expectedRecord != null && + actualRecord != null && + generateDiffString(expectedRecord, actualRecord).isEmpty()) + + private fun generateRecordIdentifier(record: OutputRecord): String { + // If the PK is an empty list, then don't include it + val pk: List = extractPrimaryKey(record) + val pkString = if (pk.isEmpty()) "" else "pk=$pk" + + if (cursor != null) { + val cursor: Any? = extractCursor(record) + return "$pkString, cursor=$cursor" + } else { + return pkString + } + } + + private fun generateDiffString( + expectedRecord: OutputRecord, + actualRecord: OutputRecord, + ): String { + val diff: StringBuilder = StringBuilder() + // Intentionally don't diff loadedAt / rawId, since those are generated dynamically by + // the destination. + if (expectedRecord.extractedAt != actualRecord.extractedAt) { + diff.append( + "extractedAt: Expected ${expectedRecord.extractedAt}, got ${actualRecord.extractedAt}\n" + ) + } + if (expectedRecord.generationId != actualRecord.generationId) { + diff.append( + "generationId: Expected ${expectedRecord.generationId}, got ${actualRecord.generationId}\n" + ) + } + if (expectedRecord.airbyteMeta != actualRecord.airbyteMeta) { + diff.append( + "airbyteMeta: Expected ${expectedRecord.airbyteMeta}, got ${actualRecord.airbyteMeta}\n" + ) + } + + // Diff the data. Iterate over all keys in the expected/actual records and compare their + // values. + val allDataKeys: Set = expectedRecord.data.keys + actualRecord.data.keys + allDataKeys.forEach { key -> + val expectedPresent: Boolean = expectedRecord.data.containsKey(key) + val actualPresent: Boolean = actualRecord.data.containsKey(key) + if (expectedPresent && !actualPresent) { + // The expected record contained this key, but the actual record was missing + // this key. + diff.append("$key: Expected ${expectedRecord.data[key]}, but was \n") + } else if (!expectedPresent && actualPresent) { + // The expected record didn't contain this key, but the actual record contained + // this key. + diff.append("$key: Expected , but was ${actualRecord.data[key]}\n") + } else if (expectedPresent && actualPresent) { + // The expected and actual records both contain this key. + // Compare the values for equality. + // (actualPresent is always true here, but I think the if-tree is more readable + // with it explicitly in the condition) + val expectedValue = expectedRecord.data[key] + val actualValue = actualRecord.data[key] + if (expectedValue != actualValue) { + diff.append("$key: Expected $expectedValue, but was $actualValue\n") + } + } + } + return diff.toString().trim() + } + } + + companion object { + val valueComparator: Comparator = + Comparator.nullsFirst { v1, v2 -> compare(v1!!, v2!!) } + + private fun compare(v1: Any, v2: Any): Int { + // when comparing values of different types, just sort by their class name. + // in theory, we could check for numeric types and handle them smartly... + // that's a lot of work though + return if (v1::class != v2::class) { + v1::class.jvmName.compareTo(v2::class.jvmName) + } else { + // otherwise, just be a terrible person + @Suppress("UNCHECKED_CAST") (v1 as Comparable).compareTo(v2 as Comparable) + } + } + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index 30bb00b8fee5..6cc2619d1aa8 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -47,7 +47,7 @@ private val LOGGER = KotlinLogging.logger {} */ @Execution(ExecutionMode.CONCURRENT) abstract class BaseSqlGeneratorIntegrationTest { - protected var DIFFER: RecordDiffer = mock() + protected var DIFFER: LegacyRecordDiffer = mock() /** Subclasses may use these four StreamConfigs in their tests. */ protected var incrementalDedupStream: StreamConfig = mock() @@ -200,7 +200,7 @@ abstract class BaseSqlGeneratorIntegrationTest,