Skip to content

Commit

Permalink
build a recorddiffer that handles strongly-typed records
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Sep 16, 2024
1 parent ee94dc9 commit ce7478f
Show file tree
Hide file tree
Showing 6 changed files with 449 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.util

import java.time.Instant
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

class RecordDifferTest {
@Test
fun testBasicBehavior() {
val differ =
RecordDiffer(
extractPrimaryKey = { listOf(it.data["id1"], it.data["id2"]) },
extractCursor = { it.data["updated_at"] }
)

val diff =
differ.diffRecords(
expectedRecords =
listOf(
// Extra expected record
OutputRecord(
extractedAt = 1234,
generationId = 42,
mapOf(
"id1" to 1,
"id2" to 100,
"updated_at" to Instant.parse("1970-01-01T00:00:00Z"),
"name" to "alice",
"phone" to "1234"
),
airbyteMeta = null
),
// Matching records
OutputRecord(
extractedAt = 1234,
generationId = 42,
mapOf(
"id1" to 1,
"id2" to 100,
"updated_at" to Instant.parse("1970-01-01T00:00:01Z"),
"name" to "bob",
),
airbyteMeta = null
),
// Different records
OutputRecord(
extractedAt = 1234,
generationId = 42,
mapOf(
"id1" to 1,
"id2" to 100,
"updated_at" to Instant.parse("1970-01-01T00:00:02Z"),
"name" to "charlie",
"phone" to "1234",
"email" to "[email protected]"
),
airbyteMeta = """{"sync_id": 12}""",
),
),
actualRecords =
listOf(
// Matching records
OutputRecord(
extractedAt = 1234,
generationId = 42,
mapOf(
"id1" to 1,
"id2" to 100,
"updated_at" to Instant.parse("1970-01-01T00:00:01Z"),
"name" to "bob",
),
airbyteMeta = null
),
// Different records
OutputRecord(
extractedAt = 1234,
generationId = 41,
mapOf(
"id1" to 1,
"id2" to 100,
"updated_at" to Instant.parse("1970-01-01T00:00:02Z"),
"name" to "charlie",
"phone" to "5678",
"address" to "1234 charlie street"
),
airbyteMeta = null
),
// Extra actual record
OutputRecord(
extractedAt = 1234,
generationId = 42,
mapOf(
"id1" to 1,
"id2" to 100,
"updated_at" to Instant.parse("1970-01-01T00:00:03Z"),
"name" to "dana",
),
airbyteMeta = null
),
),
)

Assertions.assertEquals(
"""
Missing record (pk=[1, 100], cursor=1970-01-01T00:00:00Z): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data={id1=1, id2=100, updated_at=1970-01-01T00:00:00Z, name=alice, phone=1234}, airbyteMeta=null)
Incorrect record ((pk=[1, 100], cursor=1970-01-01T00:00:02Z):
generationId: Expected 42, got 41
airbyteMeta: Expected {"sync_id":12}, got null
phone: Expected 1234, but was 5678
email: Expected [email protected], but was <unset>
address: Expected <unset>, but was 1234 charlie street
Unexpected record (pk=[1, 100], cursor=1970-01-01T00:00:03Z): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data={id1=1, id2=100, updated_at=1970-01-01T00:00:03Z, name=dana}, airbyteMeta=null)
""".trimIndent(),
diff
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test.util

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import java.time.Instant
import java.util.UUID

/** A record that we expect to exist in the destination, whether raw _or_ final. */
data class OutputRecord(
val rawId: UUID?,
val extractedAt: Instant,
val loadedAt: Instant?,
val generationId: Long?,
/**
* strongly-typed map, e.g. ZonedDateTime for timestamp_with_timezone. this makes destination
* test implementations easier. values can be null, b/c warehouse destinations with a JSON
* column type can be either SQL null, or JSON null, and we want to distinguish between those.
* Destinations _must_ filter out the airbyte_* fields from this map.
*/
val data: Map<String, Any?>,
val airbyteMeta: JsonNode?,
) {
/** Utility constructor with easier types to write by hand */
constructor(
rawId: String,
extractedAt: Long,
loadedAt: Long?,
generationId: Long?,
data: Map<String, Any?>,
airbyteMeta: String?,
) : this(
UUID.fromString(rawId),
Instant.ofEpochMilli(extractedAt),
loadedAt?.let { Instant.ofEpochMilli(it) },
generationId,
data,
airbyteMeta?.let { ObjectMapper().readTree(it) },
)

/**
* Utility constructor for "expected records". [rawId] and [loadedAt] are generated by the
* destination at runtime, so we don't have those when writing the test. Just generate arbitrary
* values for them.
*/
constructor(
extractedAt: Long,
generationId: Long?,
data: Map<String, Any?>,
airbyteMeta: String?,
) : this(
null,
Instant.ofEpochMilli(extractedAt),
loadedAt = null,
generationId,
data,
airbyteMeta?.let { ObjectMapper().readTree(it) },
)
}
Loading

0 comments on commit ce7478f

Please sign in to comment.