diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteValue.kt index 73acce297030..4f6040bc599e 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteValue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteValue.kt @@ -5,27 +5,154 @@ package io.airbyte.cdk.data import java.math.BigDecimal +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime +import java.time.OffsetDateTime +import java.time.OffsetTime +import java.time.ZoneOffset -sealed interface AirbyteValue +sealed interface AirbyteValue { + companion object { + fun from(value: Any?): AirbyteValue = + when (value) { + null -> NullValue + is String -> StringValue(value) + is Boolean -> BooleanValue(value) + is Int -> IntegerValue(value.toLong()) + is Long -> IntegerValue(value) + is Double -> NumberValue(BigDecimal.valueOf(value)) + is BigDecimal -> NumberValue(value) + is LocalDate -> DateValue(value.toString()) + is OffsetDateTime, + is LocalDateTime -> TimestampValue(value.toString()) + is OffsetTime, + is LocalTime -> TimeValue(value.toString()) + is Map<*, *> -> + ObjectValue.from(@Suppress("UNCHECKED_CAST") (value as Map)) + is List<*> -> ArrayValue.from(value) + else -> + throw IllegalArgumentException( + "Unrecognized value (${value.javaClass.name}: $value" + ) + } + } +} -data object NullValue : AirbyteValue +// Comparable implementations are intended for use in tests. +// They're not particularly robust, and probably shouldn't be relied on +// for actual sync-time logic. +// (mostly the date/timestamp/time types - everything else is fine) +data object NullValue : AirbyteValue, Comparable { + override fun compareTo(other: NullValue): Int = 0 +} -@JvmInline value class StringValue(val value: String) : AirbyteValue +@JvmInline +value class StringValue(val value: String) : AirbyteValue, Comparable { + override fun compareTo(other: StringValue): Int = value.compareTo(other.value) +} -@JvmInline value class BooleanValue(val value: Boolean) : AirbyteValue +@JvmInline +value class BooleanValue(val value: Boolean) : AirbyteValue, Comparable { + override fun compareTo(other: BooleanValue): Int = value.compareTo(other.value) +} -@JvmInline value class IntegerValue(val value: Long) : AirbyteValue +@JvmInline +value class IntegerValue(val value: Long) : AirbyteValue, Comparable { + override fun compareTo(other: IntegerValue): Int = value.compareTo(other.value) +} -@JvmInline value class NumberValue(val value: BigDecimal) : AirbyteValue +@JvmInline +value class NumberValue(val value: BigDecimal) : AirbyteValue, Comparable { + override fun compareTo(other: NumberValue): Int = value.compareTo(other.value) +} -@JvmInline value class DateValue(val value: String) : AirbyteValue +@JvmInline +value class DateValue(val value: String) : AirbyteValue, Comparable { + override fun compareTo(other: DateValue): Int { + val thisDate = + try { + LocalDate.parse(value) + } catch (e: Exception) { + LocalDate.MIN + } + val otherDate = + try { + LocalDate.parse(other.value) + } catch (e: Exception) { + LocalDate.MIN + } + return thisDate.compareTo(otherDate) + } +} -@JvmInline value class TimestampValue(val value: String) : AirbyteValue +@JvmInline +value class TimestampValue(val value: String) : AirbyteValue, Comparable { + override fun compareTo(other: TimestampValue): Int { + // Do all comparisons using OffsetDateTime for convenience. + // First, try directly parsing as OffsetDateTime. + // If that fails, try parsing as LocalDateTime and assume UTC. + // We could maybe have separate value classes for these cases, + // but that comes with its own set of problems + // (mostly around sources declaring bad schemas). + val thisTimestamp = + try { + OffsetDateTime.parse(value) + } catch (e: Exception) { + LocalDateTime.parse(value).atOffset(ZoneOffset.UTC) + } catch (e: Exception) { + LocalDateTime.MIN.atOffset(ZoneOffset.UTC) + } + val otherTimestamp = + try { + OffsetDateTime.parse(other.value) + } catch (e: Exception) { + LocalDateTime.parse(other.value).atOffset(ZoneOffset.UTC) + } catch (e: Exception) { + LocalDateTime.MIN.atOffset(ZoneOffset.UTC) + } + return thisTimestamp.compareTo(otherTimestamp) + } +} -@JvmInline value class TimeValue(val value: String) : AirbyteValue +@JvmInline +value class TimeValue(val value: String) : AirbyteValue, Comparable { + override fun compareTo(other: TimeValue): Int { + // Similar to TimestampValue, try parsing with/without timezone, + // and do all comparisons using OffsetTime. + val thisTime = + try { + OffsetTime.parse(value) + } catch (e: Exception) { + LocalTime.parse(value).atOffset(ZoneOffset.UTC) + } catch (e: Exception) { + LocalTime.MIN.atOffset(ZoneOffset.UTC) + } + val otherTime = + try { + OffsetTime.parse(other.value) + } catch (e: Exception) { + LocalTime.parse(other.value).atOffset(ZoneOffset.UTC) + } catch (e: Exception) { + LocalTime.MIN.atOffset(ZoneOffset.UTC) + } + return thisTime.compareTo(otherTime) + } +} -@JvmInline value class ArrayValue(val values: List) : AirbyteValue +@JvmInline +value class ArrayValue(val values: List) : AirbyteValue { + companion object { + fun from(list: List): ArrayValue = ArrayValue(list.map { it as AirbyteValue }) + } +} -@JvmInline value class ObjectValue(val values: LinkedHashMap) : AirbyteValue +@JvmInline +value class ObjectValue(val values: LinkedHashMap) : AirbyteValue { + companion object { + fun from(map: Map): ObjectValue = + ObjectValue(map.mapValuesTo(linkedMapOf()) { (_, v) -> AirbyteValue.from(v) }) + } +} @JvmInline value class UnknownValue(val what: String) : AirbyteValue diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt new file mode 100644 index 000000000000..ae55eabb84a4 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/test/util/RecordDifferTest.kt @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.test.util + +import java.time.OffsetDateTime +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +class RecordDifferTest { + @Test + fun testBasicBehavior() { + val differ = + RecordDiffer( + primaryKey = listOf(listOf("id1"), listOf("id2")), + cursor = listOf("updated_at"), + ) + + val diff = + differ.diffRecords( + expectedRecords = + listOf( + // Extra expected record + OutputRecord( + extractedAt = 1234, + generationId = 42, + mapOf( + "id1" to 1, + "id2" to 100, + "updated_at" to OffsetDateTime.parse("1970-01-01T00:00:00Z"), + "name" to "alice", + "phone" to "1234" + ), + airbyteMeta = null + ), + // Matching records + OutputRecord( + extractedAt = 1234, + generationId = 42, + mapOf( + "id1" to 1, + "id2" to 100, + "updated_at" to OffsetDateTime.parse("1970-01-01T00:00:01Z"), + "name" to "bob", + ), + airbyteMeta = null + ), + // Different records + OutputRecord( + extractedAt = 1234, + generationId = 42, + mapOf( + "id1" to 1, + "id2" to 100, + "updated_at" to OffsetDateTime.parse("1970-01-01T00:00:02Z"), + "name" to "charlie", + "phone" to "1234", + "email" to "charlie@example.com" + ), + airbyteMeta = """{"sync_id": 12}""", + ), + ), + actualRecords = + listOf( + // Matching records + OutputRecord( + extractedAt = 1234, + generationId = 42, + mapOf( + "id1" to 1, + "id2" to 100, + "updated_at" to OffsetDateTime.parse("1970-01-01T00:00:01Z"), + "name" to "bob", + ), + airbyteMeta = null + ), + // Different records + OutputRecord( + extractedAt = 1234, + generationId = 41, + mapOf( + "id1" to 1, + "id2" to 100, + "updated_at" to OffsetDateTime.parse("1970-01-01T00:00:02Z"), + "name" to "charlie", + "phone" to "5678", + "address" to "1234 charlie street" + ), + airbyteMeta = null + ), + // Extra actual record + OutputRecord( + extractedAt = 1234, + generationId = 42, + mapOf( + "id1" to 1, + "id2" to 100, + "updated_at" to OffsetDateTime.parse("1970-01-01T00:00:03Z"), + "name" to "dana", + ), + airbyteMeta = null + ), + ), + ) + + Assertions.assertEquals( + """ + Missing record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampValue(value=1970-01-01T00:00Z), name=StringValue(value=alice), phone=StringValue(value=1234)}), airbyteMeta=null) + Incorrect record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00:02Z)): + generationId: Expected 42, got 41 + airbyteMeta: Expected {"sync_id":12}, got null + phone: Expected StringValue(value=1234), but was StringValue(value=5678) + email: Expected StringValue(value=charlie@example.com), but was + address: Expected , but was StringValue(value=1234 charlie street) + Unexpected record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00:03Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampValue(value=1970-01-01T00:00:03Z), name=StringValue(value=dana)}), airbyteMeta=null) + """.trimIndent(), + diff + ) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt new file mode 100644 index 000000000000..89e8f1a0b2d0 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/OutputRecord.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.test.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import io.airbyte.cdk.data.ObjectValue +import java.time.Instant +import java.util.UUID + +/** A record that we expect to exist in the destination, whether raw _or_ final. */ +data class OutputRecord( + val rawId: UUID?, + val extractedAt: Instant, + val loadedAt: Instant?, + val generationId: Long?, + /** + * strongly-typed map, e.g. ZonedDateTime for timestamp_with_timezone. this makes destination + * test implementations easier. values can be null, b/c warehouse destinations with a JSON + * column type can be either SQL null, or JSON null, and we want to distinguish between those. + * Destinations _must_ filter out the airbyte_* fields from this map. + */ + val data: ObjectValue, + val airbyteMeta: JsonNode?, +) { + /** Utility constructor with easier types to write by hand */ + constructor( + rawId: String, + extractedAt: Long, + loadedAt: Long?, + generationId: Long?, + data: Map, + airbyteMeta: String?, + ) : this( + UUID.fromString(rawId), + Instant.ofEpochMilli(extractedAt), + loadedAt?.let { Instant.ofEpochMilli(it) }, + generationId, + ObjectValue.from(data), + airbyteMeta?.let { ObjectMapper().readTree(it) }, + ) + + /** + * Utility constructor for "expected records". [rawId] and [loadedAt] are generated by the + * destination at runtime, so we don't have those when writing the test. Just generate arbitrary + * values for them. + */ + constructor( + extractedAt: Long, + generationId: Long?, + data: Map, + airbyteMeta: String?, + ) : this( + null, + Instant.ofEpochMilli(extractedAt), + loadedAt = null, + generationId, + ObjectValue.from(data), + airbyteMeta?.let { ObjectMapper().readTree(it) }, + ) +} diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/RecordDiffer.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/RecordDiffer.kt new file mode 100644 index 000000000000..5c18d3954e29 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/test/util/RecordDiffer.kt @@ -0,0 +1,264 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.test.util + +import io.airbyte.cdk.data.AirbyteValue +import io.airbyte.cdk.data.IntegerValue +import io.airbyte.cdk.data.NullValue +import io.airbyte.cdk.data.ObjectValue +import kotlin.reflect.jvm.jvmName + +class RecordDiffer( + /** + * The path(s) to the primary key fields from a record. Most streams will have some `id` + * field(s), even if they're not running in dedup mode. This comparator lets us match records + * together to generate a more useful diff. + * + * In the rare case that a stream truly has no PK, the default value simply returns an empty + * list. + */ + val primaryKey: List> = emptyList(), + /** The path to the cursor from a record, or null if the stream has no cursor. */ + val cursor: List? = null, +) { + private fun extract(data: Map, path: List): AirbyteValue { + return when (path.size) { + 0 -> throw IllegalArgumentException("Empty path") + 1 -> data[path.first()] ?: NullValue + else -> { + when (val next = data[path.first()]) { + null -> NullValue + is ObjectValue -> extract(next.values, path.subList(1, path.size)) + else -> + throw IllegalArgumentException( + "Encountered non-map entry in path: $next at ${path.first()}" + ) + } + } + } + } + + // if primaryKey is empty list, this always returns emptyList. + private fun extractPrimaryKey(record: OutputRecord): List { + val pks = mutableListOf() + for (pkField in primaryKey) { + pks.add(extract(record.data.values, pkField)) + } + return pks + } + private fun extractCursor(record: OutputRecord): AirbyteValue { + return extract(record.data.values, cursor!!) + } + + /** Comparator that sorts records by their primary key */ + private val identityComparator: Comparator = Comparator { rec1, rec2 -> + val pk1 = extractPrimaryKey(rec1) + val pk2 = extractPrimaryKey(rec2) + if (pk1.size != pk2.size) { + throw IllegalStateException( + "Records must have the same number of primary keys. Got $pk1 and $pk2." + ) + } + + // Compare each PK field in order, until we find a field that the two records differ in. + // If all the fields are equal, then these two records have the same PK. + pk1.zip(pk2) + .map { (pk1Field, pk2Field) -> valueComparator.compare(pk1Field, pk2Field) } + .firstOrNull { it != 0 } + ?: 0 + } + + /** + * Comparator to sort records by their cursor (if there is one), breaking ties with extractedAt + */ + private val sortComparator: Comparator = + Comparator.comparing( + { it: OutputRecord -> + (if (cursor == null) IntegerValue(0) else extractCursor(it)) + }, + valueComparator + ) + .thenComparing { it -> it.extractedAt } + + /** + * The actual comparator we'll use to sort the expected/actual record lists. I.e. group records + * by their PK, then within each PK, sort by cursor/extractedAt. + */ + private val everythingComparator = identityComparator.thenComparing(sortComparator) + + /** Returns a pretty-printed diff of the two lists, or null if they were identical */ + fun diffRecords( + expectedRecords: List, + actualRecords: List + ): String? { + val expectedRecordsSorted = expectedRecords.sortedWith(everythingComparator) + val actualRecordsSorted = actualRecords.sortedWith(everythingComparator) + + // Match up all the records between the expected and actual records, + // or if there's no matching record then detect that also. + // We'll filter this list down to actual differing records later on. + val matches = mutableListOf() + var expectedRecordIndex = 0 + var actualRecordIndex = 0 + while ( + expectedRecordIndex < expectedRecordsSorted.size && + actualRecordIndex < actualRecordsSorted.size + ) { + val expectedRecord = expectedRecords[expectedRecordIndex] + val actualRecord = actualRecords[actualRecordIndex] + val compare = everythingComparator.compare(expectedRecord, actualRecord) + if (compare == 0) { + // These records are the same underlying record + matches.add(MatchingRecords(expectedRecord, actualRecord)) + expectedRecordIndex++ + actualRecordIndex++ + } else if (compare < 0) { + // There's an extra expected record + matches.add(MatchingRecords(expectedRecord, actualRecord = null)) + expectedRecordIndex++ + } else { + // There's an extra actual record + matches.add(MatchingRecords(expectedRecord = null, actualRecord)) + actualRecordIndex++ + } + } + + // Tail loops in case we reached the end of one list before the other. + while (expectedRecordIndex < expectedRecords.size) { + matches.add(MatchingRecords(expectedRecords[expectedRecordIndex], actualRecord = null)) + expectedRecordIndex++ + } + while (actualRecordIndex < actualRecords.size) { + matches.add(MatchingRecords(expectedRecord = null, actualRecords[actualRecordIndex])) + actualRecordIndex++ + } + + // We've paired up all the records, now find just the ones that are wrong. + val diffs = matches.filter { it.isMismatch() } + return if (diffs.isEmpty()) { + null + } else { + diffs.joinToString("\n") { it.prettyPrintMismatch() } + } + } + + private inner class MatchingRecords( + val expectedRecord: OutputRecord?, + val actualRecord: OutputRecord?, + ) { + fun isMismatch(): Boolean = + (expectedRecord == null && actualRecord != null) || + (expectedRecord != null && actualRecord == null) || + !recordsMatch(expectedRecord, actualRecord) + + fun prettyPrintMismatch(): String { + return if (expectedRecord == null) { + "Unexpected record (${generateRecordIdentifier(actualRecord!!)}): $actualRecord" + } else if (actualRecord == null) { + "Missing record (${generateRecordIdentifier(expectedRecord)}): $expectedRecord" + } else { + "Incorrect record (${generateRecordIdentifier(actualRecord)}):\n" + + generateDiffString(expectedRecord, actualRecord).prependIndent(" ") + } + } + + private fun recordsMatch( + expectedRecord: OutputRecord?, + actualRecord: OutputRecord?, + ): Boolean = + (expectedRecord == null && actualRecord == null) || + (expectedRecord != null && + actualRecord != null && + generateDiffString(expectedRecord, actualRecord).isEmpty()) + + private fun generateRecordIdentifier(record: OutputRecord): String { + // If the PK is an empty list, then don't include it + val pk: List = extractPrimaryKey(record) + val pkString = if (pk.isEmpty()) "" else "pk=$pk" + + if (cursor != null) { + val cursor: AirbyteValue = extractCursor(record) + return "$pkString, cursor=$cursor" + } else { + return pkString + } + } + + private fun generateDiffString( + expectedRecord: OutputRecord, + actualRecord: OutputRecord, + ): String { + val diff: StringBuilder = StringBuilder() + // Intentionally don't diff loadedAt / rawId, since those are generated dynamically by + // the destination. + if (expectedRecord.extractedAt != actualRecord.extractedAt) { + diff.append( + "extractedAt: Expected ${expectedRecord.extractedAt}, got ${actualRecord.extractedAt}\n" + ) + } + if (expectedRecord.generationId != actualRecord.generationId) { + diff.append( + "generationId: Expected ${expectedRecord.generationId}, got ${actualRecord.generationId}\n" + ) + } + if (expectedRecord.airbyteMeta != actualRecord.airbyteMeta) { + diff.append( + "airbyteMeta: Expected ${expectedRecord.airbyteMeta}, got ${actualRecord.airbyteMeta}\n" + ) + } + + // Diff the data. Iterate over all keys in the expected/actual records and compare their + // values. + val allDataKeys: Set = + expectedRecord.data.values.keys + actualRecord.data.values.keys + allDataKeys.forEach { key -> + val expectedPresent: Boolean = expectedRecord.data.values.containsKey(key) + val actualPresent: Boolean = actualRecord.data.values.containsKey(key) + if (expectedPresent && !actualPresent) { + // The expected record contained this key, but the actual record was missing + // this key. + diff.append( + "$key: Expected ${expectedRecord.data.values[key]}, but was \n" + ) + } else if (!expectedPresent && actualPresent) { + // The expected record didn't contain this key, but the actual record contained + // this key. + diff.append( + "$key: Expected , but was ${actualRecord.data.values[key]}\n" + ) + } else if (expectedPresent && actualPresent) { + // The expected and actual records both contain this key. + // Compare the values for equality. + // (actualPresent is always true here, but I think the if-tree is more readable + // with it explicitly in the condition) + val expectedValue = expectedRecord.data.values[key] + val actualValue = actualRecord.data.values[key] + if (expectedValue != actualValue) { + diff.append("$key: Expected $expectedValue, but was $actualValue\n") + } + } + } + return diff.toString().trim() + } + } + + companion object { + val valueComparator: Comparator = + Comparator.nullsFirst { v1, v2 -> compare(v1!!, v2!!) } + + private fun compare(v1: AirbyteValue, v2: AirbyteValue): Int { + // when comparing values of different types, just sort by their class name. + // in theory, we could check for numeric types and handle them smartly... + // that's a lot of work though + return if (v1::class != v2::class) { + v1::class.jvmName.compareTo(v2::class.jvmName) + } else { + // otherwise, just be a terrible person. + // we know these are the same type, so this is safe to do. + @Suppress("UNCHECKED_CAST") (v1 as Comparable).compareTo(v2) + } + } + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index 30bb00b8fee5..6cc2619d1aa8 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -47,7 +47,7 @@ private val LOGGER = KotlinLogging.logger {} */ @Execution(ExecutionMode.CONCURRENT) abstract class BaseSqlGeneratorIntegrationTest { - protected var DIFFER: RecordDiffer = mock() + protected var DIFFER: LegacyRecordDiffer = mock() /** Subclasses may use these four StreamConfigs in their tests. */ protected var incrementalDedupStream: StreamConfig = mock() @@ -200,7 +200,7 @@ abstract class BaseSqlGeneratorIntegrationTest,