Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destinations cdk: recorddiffer that handles strongly-typed records #44874

Open
wants to merge 1 commit into
base: issue-9361/load-cdk-with-e2e-dest-post-refactor
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,154 @@
package io.airbyte.cdk.data

import java.math.BigDecimal
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import java.time.ZoneOffset

sealed interface AirbyteValue
/**
 * Closed hierarchy of typed values. Use [AirbyteValue.from] to wrap a plain JVM value in the
 * matching implementation.
 */
sealed interface AirbyteValue {
    companion object {
        /**
         * Converts a plain JVM value into the corresponding [AirbyteValue].
         *
         * Mapping:
         * - `null` becomes [NullValue]
         * - an existing [AirbyteValue] is returned unchanged
         * - `String`, `Boolean` are wrapped as-is
         * - `Int` is widened to `Long`; `Int`/`Long` become [IntegerValue]
         * - `Double` (via [BigDecimal.valueOf]) and `BigDecimal` become [NumberValue]
         * - date/time types are stored as their `toString()` text in [DateValue],
         * [TimestampValue] or [TimeValue]
         * - `Map` and `List` are delegated to [ObjectValue.from] and [ArrayValue.from]
         *
         * @param value the raw value to wrap; may be null
         * @return the typed wrapper for [value]
         * @throws IllegalArgumentException if [value] is of a type not listed above
         */
        fun from(value: Any?): AirbyteValue =
            when (value) {
                null -> NullValue
                // Already typed: hand it back untouched so raw and typed values can be mixed.
                is AirbyteValue -> value
                is String -> StringValue(value)
                is Boolean -> BooleanValue(value)
                is Int -> IntegerValue(value.toLong())
                is Long -> IntegerValue(value)
                is Double -> NumberValue(BigDecimal.valueOf(value))
                is BigDecimal -> NumberValue(value)
                is LocalDate -> DateValue(value.toString())
                is OffsetDateTime,
                is LocalDateTime -> TimestampValue(value.toString())
                is OffsetTime,
                is LocalTime -> TimeValue(value.toString())
                // Key type is erased at runtime, so this cast is unchecked: keys are assumed
                // to be strings.
                is Map<*, *> ->
                    ObjectValue.from(@Suppress("UNCHECKED_CAST") (value as Map<String, Any?>))
                is List<*> -> ArrayValue.from(value)
                else ->
                    throw IllegalArgumentException(
                        "Unrecognized value (${value.javaClass.name}): $value"
                    )
            }
    }
}

data object NullValue : AirbyteValue
// Comparable implementations are intended for use in tests.
// They're not particularly robust, and probably shouldn't be relied on
// for actual sync-time logic.
// (mostly the date/timestamp/time types - everything else is fine)
/** Typed stand-in for a null value. It is a singleton, so it only ever compares with itself. */
data object NullValue : AirbyteValue, Comparable<NullValue> {
    /** Always returns 0: the only possible [other] is this same object. */
    override fun compareTo(other: NullValue): Int {
        return 0
    }
}

@JvmInline value class StringValue(val value: String) : AirbyteValue
/** A string value, ordered by the natural ordering of the wrapped [String]. */
@JvmInline
value class StringValue(val value: String) : AirbyteValue, Comparable<StringValue> {
    /** Compares the wrapped strings with [String.compareTo]. */
    override fun compareTo(other: StringValue): Int {
        val theirs = other.value
        return value.compareTo(theirs)
    }
}

@JvmInline value class BooleanValue(val value: Boolean) : AirbyteValue
/** A boolean value, ordered by the natural ordering of [Boolean]. */
@JvmInline
value class BooleanValue(val value: Boolean) : AirbyteValue, Comparable<BooleanValue> {
    /** Compares the wrapped booleans with [Boolean.compareTo]. */
    override fun compareTo(other: BooleanValue): Int {
        val theirs = other.value
        return value.compareTo(theirs)
    }
}

@JvmInline value class IntegerValue(val value: Long) : AirbyteValue
/** An integer value held as a [Long], ordered numerically. */
@JvmInline
value class IntegerValue(val value: Long) : AirbyteValue, Comparable<IntegerValue> {
    /** Compares the wrapped longs with [Long.compareTo]. */
    override fun compareTo(other: IntegerValue): Int {
        val theirs = other.value
        return value.compareTo(theirs)
    }
}

@JvmInline value class NumberValue(val value: BigDecimal) : AirbyteValue
/** A decimal number held as a [BigDecimal]. */
@JvmInline
value class NumberValue(val value: BigDecimal) : AirbyteValue, Comparable<NumberValue> {
    /**
     * Delegates to [BigDecimal.compareTo], which ignores scale: `1.0` and `1.00` compare as
     * equal here.
     */
    override fun compareTo(other: NumberValue): Int {
        val theirs = other.value
        return value.compareTo(theirs)
    }
}

@JvmInline value class DateValue(val value: String) : AirbyteValue
/** A date kept in its textual form; it is only parsed when two values are compared. */
@JvmInline
value class DateValue(val value: String) : AirbyteValue, Comparable<DateValue> {
    /**
     * Orders two dates by parsing both with [LocalDate.parse]. Text that cannot be parsed is
     * treated as [LocalDate.MIN], so it sorts before every real date (and equal to other
     * unparseable text).
     */
    override fun compareTo(other: DateValue): Int {
        // Best-effort parse shared by both operands.
        fun parseOrMin(raw: String): LocalDate =
            try {
                LocalDate.parse(raw)
            } catch (e: Exception) {
                LocalDate.MIN
            }

        val mine = parseOrMin(value)
        val theirs = parseOrMin(other.value)
        return mine.compareTo(theirs)
    }
}

@JvmInline value class TimestampValue(val value: String) : AirbyteValue
/** A timestamp kept in its textual form; it is only parsed when two values are compared. */
@JvmInline
value class TimestampValue(val value: String) : AirbyteValue, Comparable<TimestampValue> {
    /**
     * Orders two timestamps as [OffsetDateTime]s.
     *
     * Each side is parsed in three steps:
     * 1. directly as an [OffsetDateTime];
     * 2. otherwise as a [LocalDateTime], assumed to be UTC;
     * 3. otherwise it falls back to [LocalDateTime.MIN] at UTC, so unparseable text sorts
     * before every real timestamp instead of throwing.
     *
     * We could maybe have separate value classes for the with/without-offset cases, but that
     * comes with its own set of problems (mostly around sources declaring bad schemas).
     */
    override fun compareTo(other: TimestampValue): Int {
        // The fallbacks must be nested: a second `catch` on the same `try` never sees an
        // exception thrown from inside the first `catch` block.
        fun parseOrMin(raw: String): OffsetDateTime =
            try {
                OffsetDateTime.parse(raw)
            } catch (notOffset: Exception) {
                try {
                    LocalDateTime.parse(raw).atOffset(ZoneOffset.UTC)
                } catch (notLocal: Exception) {
                    LocalDateTime.MIN.atOffset(ZoneOffset.UTC)
                }
            }

        return parseOrMin(value).compareTo(parseOrMin(other.value))
    }
}

@JvmInline value class TimeValue(val value: String) : AirbyteValue
/** A time of day kept in its textual form; it is only parsed when two values are compared. */
@JvmInline
value class TimeValue(val value: String) : AirbyteValue, Comparable<TimeValue> {
    /**
     * Orders two times as [OffsetTime]s.
     *
     * Each side is parsed in three steps:
     * 1. directly as an [OffsetTime];
     * 2. otherwise as a [LocalTime], assumed to be UTC;
     * 3. otherwise it falls back to [LocalTime.MIN] at UTC, so unparseable text does not
     * throw.
     */
    override fun compareTo(other: TimeValue): Int {
        // The fallbacks must be nested: a second `catch` on the same `try` never sees an
        // exception thrown from inside the first `catch` block.
        fun parseOrMin(raw: String): OffsetTime =
            try {
                OffsetTime.parse(raw)
            } catch (notOffset: Exception) {
                try {
                    LocalTime.parse(raw).atOffset(ZoneOffset.UTC)
                } catch (notLocal: Exception) {
                    LocalTime.MIN.atOffset(ZoneOffset.UTC)
                }
            }

        return parseOrMin(value).compareTo(parseOrMin(other.value))
    }
}

@JvmInline value class ArrayValue(val values: List<AirbyteValue>) : AirbyteValue
/** An ordered list of typed values. */
@JvmInline
value class ArrayValue(val values: List<AirbyteValue>) : AirbyteValue {
    companion object {
        /**
         * Builds an [ArrayValue] from a list of arbitrary elements.
         *
         * Elements that are already [AirbyteValue]s are kept as they are; everything else
         * (including `null`) is converted with [AirbyteValue.from], matching how
         * [ObjectValue.from] treats map values.
         *
         * @param list raw and/or already-typed elements
         * @return the list wrapped element by element, in the same order
         * @throws IllegalArgumentException if an element has a type [AirbyteValue.from] does
         * not recognize
         */
        fun from(list: List<Any?>): ArrayValue =
            ArrayValue(
                list.map { element -> (element as? AirbyteValue) ?: AirbyteValue.from(element) }
            )
    }
}

@JvmInline value class ObjectValue(val values: LinkedHashMap<String, AirbyteValue>) : AirbyteValue
/** A string-keyed collection of typed values that preserves insertion order. */
@JvmInline
value class ObjectValue(val values: LinkedHashMap<String, AirbyteValue>) : AirbyteValue {
    companion object {
        /**
         * Builds an [ObjectValue] by converting every value of [map] with [AirbyteValue.from].
         * Entries are inserted in the iteration order of [map].
         */
        fun from(map: Map<String, Any?>): ObjectValue {
            val converted = linkedMapOf<String, AirbyteValue>()
            for (entry in map) {
                converted[entry.key] = AirbyteValue.from(entry.value)
            }
            return ObjectValue(converted)
        }
    }
}

/**
 * Member of the [AirbyteValue] hierarchy that carries a single string.
 *
 * NOTE(review): the exact meaning of [what] is not visible from this declaration; confirm
 * against the code that constructs it.
 */
@JvmInline
value class UnknownValue(val what: String) : AirbyteValue
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.util

import java.time.OffsetDateTime
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

/**
 * Exercises [RecordDiffer.diffRecords] with one expected-only record, one record present on
 * both sides, one record present on both sides with differing contents, and one actual-only
 * record, then checks the exact text of the resulting diff.
 */
class RecordDifferTest {
@Test
fun testBasicBehavior() {
// Every record below shares the same primary key values (id1=1, id2=100); they are told
// apart only by the "updated_at" cursor.
val differ =
RecordDiffer(
primaryKey = listOf(listOf("id1"), listOf("id2")),
cursor = listOf("updated_at"),
)

val diff =
differ.diffRecords(
expectedRecords =
listOf(
// Extra expected record
OutputRecord(
extractedAt = 1234,
generationId = 42,
mapOf(
"id1" to 1,
"id2" to 100,
"updated_at" to OffsetDateTime.parse("1970-01-01T00:00:00Z"),
"name" to "alice",
"phone" to "1234"
),
airbyteMeta = null
),
// Matching records
OutputRecord(
extractedAt = 1234,
generationId = 42,
mapOf(
"id1" to 1,
"id2" to 100,
"updated_at" to OffsetDateTime.parse("1970-01-01T00:00:01Z"),
"name" to "bob",
),
airbyteMeta = null
),
// Different records
OutputRecord(
extractedAt = 1234,
generationId = 42,
mapOf(
"id1" to 1,
"id2" to 100,
"updated_at" to OffsetDateTime.parse("1970-01-01T00:00:02Z"),
"name" to "charlie",
"phone" to "1234",
// NOTE(review): this literal looks like it was masked by e-mail obfuscation when the
// page was extracted - confirm the real value.
"email" to "[email protected]"
),
airbyteMeta = """{"sync_id": 12}""",
),
),
actualRecords =
listOf(
// Matching records
OutputRecord(
extractedAt = 1234,
generationId = 42,
mapOf(
"id1" to 1,
"id2" to 100,
"updated_at" to OffsetDateTime.parse("1970-01-01T00:00:01Z"),
"name" to "bob",
),
airbyteMeta = null
),
// Different records
// (differs from its expected counterpart in generationId, airbyteMeta, phone, and has
// "address" instead of "email")
OutputRecord(
extractedAt = 1234,
generationId = 41,
mapOf(
"id1" to 1,
"id2" to 100,
"updated_at" to OffsetDateTime.parse("1970-01-01T00:00:02Z"),
"name" to "charlie",
"phone" to "5678",
"address" to "1234 charlie street"
),
airbyteMeta = null
),
// Extra actual record
OutputRecord(
extractedAt = 1234,
generationId = 42,
mapOf(
"id1" to 1,
"id2" to 100,
"updated_at" to OffsetDateTime.parse("1970-01-01T00:00:03Z"),
"name" to "dana",
),
airbyteMeta = null
),
),
)

// The expected text has one "Missing record" line, one "Incorrect record" section listing
// each differing field, and one "Unexpected record" line. The matching record produces no
// output.
// NOTE(review): the "email" line below lacks the `value=` prefix the other StringValue
// entries have and carries the same masked address as the input above; any indentation
// inside the raw string may also have been flattened by the page extraction. Confirm
// against RecordDiffer's real output before relying on this text.
Assertions.assertEquals(
"""
Missing record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampValue(value=1970-01-01T00:00Z), name=StringValue(value=alice), phone=StringValue(value=1234)}), airbyteMeta=null)
Incorrect record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00:02Z)):
generationId: Expected 42, got 41
airbyteMeta: Expected {"sync_id":12}, got null
phone: Expected StringValue(value=1234), but was StringValue(value=5678)
email: Expected StringValue([email protected]), but was <unset>
address: Expected <unset>, but was StringValue(value=1234 charlie street)
Unexpected record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00:03Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampValue(value=1970-01-01T00:00:03Z), name=StringValue(value=dana)}), airbyteMeta=null)
""".trimIndent(),
diff
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.util

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.data.ObjectValue
import java.time.Instant
import java.util.UUID

/**
 * A record that we expect to exist in the destination, whether raw _or_ final.
 *
 * @property rawId the record's raw id, or null when not known (e.g. hand-written expected
 * records)
 * @property extractedAt extraction time of the record
 * @property loadedAt load time of the record, or null when not known
 * @property generationId generation id of the record, if any
 * @property airbyteMeta parsed Airbyte metadata JSON, or null when absent
 */
data class OutputRecord(
    val rawId: UUID?,
    val extractedAt: Instant,
    val loadedAt: Instant?,
    val generationId: Long?,
    /**
     * strongly-typed record contents, e.g. a timestamp value for timestamp_with_timezone. this
     * makes destination test implementations easier. values can be null, b/c warehouse
     * destinations with a JSON column type can be either SQL null, or JSON null, and we want to
     * distinguish between those. Destinations _must_ filter out the airbyte_* fields from this
     * object.
     */
    val data: ObjectValue,
    val airbyteMeta: JsonNode?,
) {
    /**
     * Utility constructor with easier types to write by hand.
     *
     * @param rawId UUID in its string form; parsed with [UUID.fromString]
     * @param extractedAt epoch milliseconds
     * @param loadedAt epoch milliseconds, or null
     * @param data plain map, converted with [ObjectValue.from]
     * @param airbyteMeta JSON text, or null for no metadata
     */
    constructor(
        rawId: String,
        extractedAt: Long,
        loadedAt: Long?,
        generationId: Long?,
        data: Map<String, Any?>,
        airbyteMeta: String?,
    ) : this(
        UUID.fromString(rawId),
        Instant.ofEpochMilli(extractedAt),
        loadedAt?.let { Instant.ofEpochMilli(it) },
        generationId,
        ObjectValue.from(data),
        parseMeta(airbyteMeta),
    )

    /**
     * Utility constructor for "expected records". [rawId] and [loadedAt] are generated by the
     * destination at runtime, so we don't have those when writing the test; both are left
     * null here.
     *
     * @param extractedAt epoch milliseconds
     * @param data plain map, converted with [ObjectValue.from]
     * @param airbyteMeta JSON text, or null for no metadata
     */
    constructor(
        extractedAt: Long,
        generationId: Long?,
        data: Map<String, Any?>,
        airbyteMeta: String?,
    ) : this(
        null,
        Instant.ofEpochMilli(extractedAt),
        loadedAt = null,
        generationId,
        ObjectValue.from(data),
        parseMeta(airbyteMeta),
    )

    private companion object {
        /** Shared by both convenience constructors instead of building a mapper per record. */
        private val META_MAPPER = ObjectMapper()

        /** Parses metadata JSON text into a tree; null in, null out. */
        private fun parseMeta(json: String?): JsonNode? = json?.let { META_MAPPER.readTree(it) }
    }
}
Loading
Loading