Skip to content

Commit

Permalink
build a recorddiffer that handles strongly-typed records
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Sep 17, 2024
1 parent ee94dc9 commit baa37fd
Show file tree
Hide file tree
Showing 7 changed files with 581 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,144 @@
package io.airbyte.cdk.data

import java.math.BigDecimal
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import java.time.ZoneOffset

/**
 * Common supertype of the strongly-typed values defined in this file.
 *
 * The interface is sealed, so the set of implementations is closed.
 */
sealed interface AirbyteValue {
    companion object {
        /**
         * Converts a plain JVM value into the matching [AirbyteValue].
         *
         * - `null` becomes [NullValue].
         * - A value that already is an [AirbyteValue] is returned unchanged.
         * - [Int] is widened to [Long]; [Double] is converted via [BigDecimal.valueOf].
         * - `java.time` values are stored as their `toString()` representation.
         * - Maps and lists are converted via [ObjectValue.from] and [ArrayValue.from].
         *
         * @param value the value to convert; map keys are assumed to be strings (unchecked cast).
         * @return the converted value.
         * @throws IllegalArgumentException if [value] is of none of the supported types.
         */
        fun from(value: Any?): AirbyteValue =
            when (value) {
                null -> NullValue
                is AirbyteValue -> value
                is String -> StringValue(value)
                is Boolean -> BooleanValue(value)
                is Int -> IntegerValue(value.toLong())
                is Long -> IntegerValue(value)
                is Double -> NumberValue(BigDecimal.valueOf(value))
                is BigDecimal -> NumberValue(value)
                is LocalDate -> DateValue(value.toString())
                is OffsetDateTime,
                is LocalDateTime -> TimestampValue(value.toString())
                is OffsetTime,
                is LocalTime -> TimeValue(value.toString())
                is Map<*, *> ->
                    ObjectValue.from(@Suppress("UNCHECKED_CAST") (value as Map<String, Any?>))
                is List<*> -> ArrayValue.from(value)
                else ->
                    throw IllegalArgumentException(
                        "Unrecognized value (${value.javaClass.name}: $value)"
                    )
            }
    }
}

// Comparable implementations are intended for use in tests.
// They're not particularly robust, and probably shouldn't be relied on
// for actual sync-time logic.
// (mostly the date/timestamp/time types - everything else is fine)

/** The null [AirbyteValue]. It is a singleton, so comparing it always yields 0. */
data object NullValue : AirbyteValue, Comparable<NullValue> {
    override fun compareTo(other: NullValue): Int = 0
}

/** An [AirbyteValue] wrapping a [String]; ordered by the wrapped string's own `compareTo`. */
@JvmInline
value class StringValue(val value: String) : AirbyteValue, Comparable<StringValue> {
    override fun compareTo(other: StringValue): Int = value.compareTo(other.value)
}

/** An [AirbyteValue] wrapping a [Boolean]; ordered by the wrapped boolean's own `compareTo`. */
@JvmInline
value class BooleanValue(val value: Boolean) : AirbyteValue, Comparable<BooleanValue> {
    override fun compareTo(other: BooleanValue): Int = value.compareTo(other.value)
}

/** An [AirbyteValue] wrapping a [Long]; ordered numerically. */
@JvmInline
value class IntegerValue(val value: Long) : AirbyteValue, Comparable<IntegerValue> {
    override fun compareTo(other: IntegerValue): Int = value.compareTo(other.value)
}

/**
 * An [AirbyteValue] wrapping a [BigDecimal].
 *
 * Ordering delegates to [BigDecimal.compareTo], which ignores scale (`2.0` and `2.00` compare
 * as equal) even though [BigDecimal.equals] treats them as different.
 */
@JvmInline
value class NumberValue(val value: BigDecimal) : AirbyteValue, Comparable<NumberValue> {
    override fun compareTo(other: NumberValue): Int = value.compareTo(other.value)
}

/**
 * An [AirbyteValue] holding a date in string form.
 *
 * Ordering parses both sides with [LocalDate.parse]. A value that cannot be parsed is treated
 * as [LocalDate.MIN], so all unparseable values compare equal to each other and sort no later
 * than any valid date.
 */
@JvmInline
value class DateValue(val value: String) : AirbyteValue, Comparable<DateValue> {
    override fun compareTo(other: DateValue): Int =
        parsedOrMin().compareTo(other.parsedOrMin())

    /** Parses [value] as a [LocalDate], falling back to [LocalDate.MIN] when parsing fails. */
    private fun parsedOrMin(): LocalDate =
        try {
            LocalDate.parse(value)
        } catch (e: Exception) {
            LocalDate.MIN
        }
}

/**
 * An [AirbyteValue] holding a timestamp in string form, with or without a UTC offset.
 *
 * Ordering is done on [OffsetDateTime] for convenience:
 * 1. try parsing the string directly as an [OffsetDateTime];
 * 2. if that fails, parse it as a [LocalDateTime] and assume UTC;
 * 3. if that fails too, fall back to [LocalDateTime.MIN] at UTC.
 *
 * We could maybe have separate value classes for the with/without-offset cases, but that
 * comes with its own set of problems (mostly around sources declaring bad schemas).
 */
@JvmInline
value class TimestampValue(val value: String) : AirbyteValue, Comparable<TimestampValue> {
    override fun compareTo(other: TimestampValue): Int =
        toOffsetDateTime().compareTo(other.toOffsetDateTime())

    /**
     * Resolves [value] to an [OffsetDateTime] using the fallback chain described on the class.
     *
     * The fallbacks are nested on purpose: a second `catch` placed next to the first one would
     * never see an exception thrown from inside the first handler.
     */
    private fun toOffsetDateTime(): OffsetDateTime =
        try {
            OffsetDateTime.parse(value)
        } catch (notOffset: Exception) {
            try {
                LocalDateTime.parse(value).atOffset(ZoneOffset.UTC)
            } catch (notLocal: Exception) {
                LocalDateTime.MIN.atOffset(ZoneOffset.UTC)
            }
        }
}

/**
 * An [AirbyteValue] holding a time of day in string form, with or without a UTC offset.
 *
 * Ordering is done on [OffsetTime]:
 * 1. try parsing the string directly as an [OffsetTime];
 * 2. if that fails, parse it as a [LocalTime] and assume UTC;
 * 3. if that fails too, fall back to [LocalTime.MIN] at UTC.
 */
@JvmInline
value class TimeValue(val value: String) : AirbyteValue, Comparable<TimeValue> {
    override fun compareTo(other: TimeValue): Int =
        toOffsetTime().compareTo(other.toOffsetTime())

    /**
     * Resolves [value] to an [OffsetTime] using the fallback chain described on the class.
     *
     * The fallbacks are nested on purpose: a second `catch` placed next to the first one would
     * never see an exception thrown from inside the first handler.
     */
    private fun toOffsetTime(): OffsetTime =
        try {
            OffsetTime.parse(value)
        } catch (notOffset: Exception) {
            try {
                LocalTime.parse(value).atOffset(ZoneOffset.UTC)
            } catch (notLocal: Exception) {
                LocalTime.MIN.atOffset(ZoneOffset.UTC)
            }
        }
}

/** An [AirbyteValue] wrapping an ordered list of [AirbyteValue]s. */
@JvmInline
value class ArrayValue(val values: List<AirbyteValue>) : AirbyteValue {
    companion object {
        /**
         * Builds an [ArrayValue] from a list of plain JVM values.
         *
         * Elements that already are [AirbyteValue]s are kept as-is; everything else (including
         * `null`) is converted with [AirbyteValue.from], so a list of raw values is converted
         * rather than failing on a cast.
         *
         * @throws IllegalArgumentException if an element has a type [AirbyteValue.from] rejects.
         */
        fun from(list: List<Any?>): ArrayValue =
            ArrayValue(list.map { element -> (element as? AirbyteValue) ?: AirbyteValue.from(element) })
    }
}

/** An [AirbyteValue] wrapping a [LinkedHashMap] from field name to [AirbyteValue]. */
@JvmInline
value class ObjectValue(val values: LinkedHashMap<String, AirbyteValue>) : AirbyteValue {
    companion object {
        /**
         * Builds an [ObjectValue] from a map of plain JVM values, converting each value with
         * [AirbyteValue.from]. Entries are inserted in the iteration order of [map].
         *
         * @throws IllegalArgumentException if a value has a type [AirbyteValue.from] rejects.
         */
        fun from(map: Map<String, Any?>): ObjectValue {
            val converted = linkedMapOf<String, AirbyteValue>()
            map.mapValuesTo(converted) { entry -> AirbyteValue.from(entry.value) }
            return ObjectValue(converted)
        }
    }
}

/**
 * An [AirbyteValue] wrapping a single string, [what].
 *
 * NOTE(review): presumably describes a value that does not fit any of the other types;
 * confirm against the call sites.
 */
@JvmInline
value class UnknownValue(val what: String) : AirbyteValue
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.util

import java.time.OffsetDateTime
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

/** Tests for [RecordDiffer]. */
class RecordDifferTest {
    /**
     * Diffs two record lists that share a primary key and differ by cursor, covering the three
     * reported cases: a record only in the expected list, a record present in both lists with
     * differing fields, and a record only in the actual list.
     */
    @Test
    fun testBasicBehavior() {
        val differ =
            RecordDiffer(
                primaryKey = listOf(listOf("id1"), listOf("id2")),
                cursor = listOf("updated_at"),
            )

        val diff =
            differ.diffRecords(
                expectedRecords =
                    listOf(
                        // Extra expected record
                        OutputRecord(
                            extractedAt = 1234,
                            generationId = 42,
                            data =
                                mapOf(
                                    "id1" to 1,
                                    "id2" to 100,
                                    "updated_at" to OffsetDateTime.parse("1970-01-01T00:00:00Z"),
                                    "name" to "alice",
                                    "phone" to "1234",
                                ),
                            airbyteMeta = null,
                        ),
                        // Matching records
                        OutputRecord(
                            extractedAt = 1234,
                            generationId = 42,
                            data =
                                mapOf(
                                    "id1" to 1,
                                    "id2" to 100,
                                    "updated_at" to OffsetDateTime.parse("1970-01-01T00:00:01Z"),
                                    "name" to "bob",
                                ),
                            airbyteMeta = null,
                        ),
                        // Different records
                        OutputRecord(
                            extractedAt = 1234,
                            generationId = 42,
                            data =
                                mapOf(
                                    "id1" to 1,
                                    "id2" to 100,
                                    "updated_at" to OffsetDateTime.parse("1970-01-01T00:00:02Z"),
                                    "name" to "charlie",
                                    "phone" to "1234",
                                    "email" to "charlie@example.com",
                                ),
                            airbyteMeta = """{"sync_id": 12}""",
                        ),
                    ),
                actualRecords =
                    listOf(
                        // Matching records
                        OutputRecord(
                            extractedAt = 1234,
                            generationId = 42,
                            data =
                                mapOf(
                                    "id1" to 1,
                                    "id2" to 100,
                                    "updated_at" to OffsetDateTime.parse("1970-01-01T00:00:01Z"),
                                    "name" to "bob",
                                ),
                            airbyteMeta = null,
                        ),
                        // Different records
                        OutputRecord(
                            extractedAt = 1234,
                            generationId = 41,
                            data =
                                mapOf(
                                    "id1" to 1,
                                    "id2" to 100,
                                    "updated_at" to OffsetDateTime.parse("1970-01-01T00:00:02Z"),
                                    "name" to "charlie",
                                    "phone" to "5678",
                                    "address" to "1234 charlie street",
                                ),
                            airbyteMeta = null,
                        ),
                        // Extra actual record
                        OutputRecord(
                            extractedAt = 1234,
                            generationId = 42,
                            data =
                                mapOf(
                                    "id1" to 1,
                                    "id2" to 100,
                                    "updated_at" to OffsetDateTime.parse("1970-01-01T00:00:03Z"),
                                    "name" to "dana",
                                ),
                            airbyteMeta = null,
                        ),
                    ),
            )

        // NOTE(review): any relative indentation of the per-field lines under "Incorrect record"
        // could not be recovered from this view; verify against RecordDiffer's actual output.
        Assertions.assertEquals(
            """
            Missing record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampValue(value=1970-01-01T00:00Z), name=StringValue(value=alice), phone=StringValue(value=1234)}), airbyteMeta=null)
            Incorrect record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00:02Z)):
            generationId: Expected 42, got 41
            airbyteMeta: Expected {"sync_id":12}, got null
            phone: Expected StringValue(value=1234), but was StringValue(value=5678)
            email: Expected StringValue(value=charlie@example.com), but was <unset>
            address: Expected <unset>, but was StringValue(value=1234 charlie street)
            Unexpected record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00:03Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampValue(value=1970-01-01T00:00:03Z), name=StringValue(value=dana)}), airbyteMeta=null)
            """.trimIndent(),
            diff
        )
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.util

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.data.AirbyteValue
import io.airbyte.cdk.data.ObjectValue
import java.time.Instant
import java.util.UUID

/** A record that we expect to exist in the destination, whether raw _or_ final. */
/** A record that we expect to exist in the destination, whether raw _or_ final. */
data class OutputRecord(
    val rawId: UUID?,
    val extractedAt: Instant,
    val loadedAt: Instant?,
    val generationId: Long?,
    /**
     * Strongly-typed record data, held as an [ObjectValue]. This makes destination test
     * implementations easier. Values can be null, b/c warehouse destinations with a JSON column
     * type can be either SQL null, or JSON null, and we want to distinguish between those.
     * Destinations _must_ filter out the airbyte_* fields from this data.
     */
    val data: ObjectValue,
    val airbyteMeta: JsonNode?,
) {
    /**
     * Utility constructor with easier types to write by hand.
     *
     * @param rawId UUID in string form, parsed with [UUID.fromString].
     * @param extractedAt epoch milliseconds.
     * @param loadedAt epoch milliseconds, or null.
     * @param data plain JVM values, converted with [ObjectValue.from].
     * @param airbyteMeta JSON text parsed into a [JsonNode], or null.
     */
    constructor(
        rawId: String,
        extractedAt: Long,
        loadedAt: Long?,
        generationId: Long?,
        data: Map<String, Any?>,
        airbyteMeta: String?,
    ) : this(
        rawId = UUID.fromString(rawId),
        extractedAt = Instant.ofEpochMilli(extractedAt),
        loadedAt = loadedAt?.let { Instant.ofEpochMilli(it) },
        generationId = generationId,
        data = ObjectValue.from(data),
        airbyteMeta = airbyteMeta?.let { parseJson(it) },
    )

    /**
     * Utility constructor for "expected records". [rawId] and [loadedAt] are generated by the
     * destination at runtime, so we don't have those when writing the test; both are left null.
     *
     * @param extractedAt epoch milliseconds.
     * @param data plain JVM values, converted with [ObjectValue.from].
     * @param airbyteMeta JSON text parsed into a [JsonNode], or null.
     */
    constructor(
        extractedAt: Long,
        generationId: Long?,
        data: Map<String, Any?>,
        airbyteMeta: String?,
    ) : this(
        rawId = null,
        extractedAt = Instant.ofEpochMilli(extractedAt),
        loadedAt = null,
        generationId = generationId,
        data = ObjectValue.from(data),
        airbyteMeta = airbyteMeta?.let { parseJson(it) },
    )

    private companion object {
        // One mapper shared by all instances instead of building a new one per constructor call.
        private val JSON_MAPPER = ObjectMapper()

        /** Parses [json] into a tree; nullable because `readTree` is a Java platform type. */
        private fun parseJson(json: String): JsonNode? = JSON_MAPPER.readTree(json)
    }
}
Loading

0 comments on commit baa37fd

Please sign in to comment.