Skip to content

Commit

Permalink
Load CDK: E2E Test Replacement
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Sep 19, 2024
1 parent b467c50 commit 3c769c5
Show file tree
Hide file tree
Showing 43 changed files with 665 additions and 984 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ class CheckOperation<T : ConfigurationJsonObjectBase, C : DestinationConfigurati
) : Operation {
override fun execute() {
try {
val pojo: T = configJsonObjectSupplier.get()
val config: C = configFactory.make(pojo)
val pojo = configJsonObjectSupplier.get()
val config = configFactory.make(pojo)
destinationCheckOperation.check(config)
val successMessage =
AirbyteMessage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()
private val byDescriptor: Map<DestinationStream.Descriptor, DestinationStream> =
streams.associateBy { it.descriptor }

fun getStream(name: String, namespace: String): DestinationStream {
fun getStream(name: String, namespace: String?): DestinationStream {
val descriptor = DestinationStream.Descriptor(namespace = namespace, name = name)
return byDescriptor[descriptor]
?: throw IllegalArgumentException("Stream not found: namespace=$namespace, name=$name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,36 @@

package io.airbyte.cdk.command

import io.micronaut.context.annotation.ConfigurationProperties
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

@ConfigurationProperties("destination.config")
interface DestinationConfiguration : Configuration {
val recordBatchSizeBytes: Long
val firstStageTmpFilePrefix: String

/** Memory queue settings */
val maxMessageQueueMemoryUsageRatio: Double // as fraction of available memory
val estimatedRecordMemoryOverheadRatio: Double // 0 => No overhead, 1.0 => 2x overhead

/**
* Micronaut factory which glues [ConfigurationJsonObjectSupplier] and
* [DestinationConfigurationFactory] together to produce a [DestinationConfiguration] singleton.
*/
@Factory
private class MicronautFactory {
@Singleton
fun <I : ConfigurationJsonObjectBase> sourceConfig(
fun <I : ConfigurationJsonObjectBase> destinationConfig(
pojoSupplier: ConfigurationJsonObjectSupplier<I>,
factory: DestinationConfigurationFactory<I, out DestinationConfiguration>,
): DestinationConfiguration = factory.make(pojoSupplier.get())
): DestinationConfiguration {
return factory.make(pojoSupplier.get())
}
}

companion object {
const val DEFAULT_RECORD_BATCH_SIZE_BYTES: Long = 200L * 1024L * 1024L
const val DEFAULT_FIRST_STAGE_TMP_FILE_PREFIX = "airbyte-cdk-load-staged-raw-records"
const val DEFAULT_MAX_MESSAGE_QUEUE_MEMORY_USAGE_RATIO: Double = 0.2
const val DEFAULT_ESTIMATED_RECORD_MEMORY_OVERHEAD_RATIO: Double = 0.1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ data class DestinationStream(
val minimumGenerationId: Long,
val syncId: Long,
) {
data class Descriptor(val namespace: String, val name: String) {
data class Descriptor(val namespace: String?, val name: String) {
fun asProtocolObject(): StreamDescriptor =
StreamDescriptor().withNamespace(namespace).withName(name)
StreamDescriptor().withName(name).also {
if (namespace != null) {
it.namespace = namespace
}
}
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,11 @@ sealed interface CheckpointMessage : DestinationMessage {
data class StreamCheckpoint(
val checkpoint: Checkpoint,
override val sourceStats: Stats,
override val destinationStats: Stats? = null
override val destinationStats: Stats? = null,
val additionalProperties: MutableMap<String, Any>
) : CheckpointMessage {
override fun withDestinationStats(stats: Stats) =
StreamCheckpoint(checkpoint, sourceStats, stats)
StreamCheckpoint(checkpoint, sourceStats, stats, additionalProperties)

override fun asProtocolMessage(): AirbyteMessage {
val stateMessage =
Expand All @@ -166,10 +167,11 @@ data class GlobalCheckpoint(
val state: JsonNode,
override val sourceStats: Stats,
override val destinationStats: Stats? = null,
val checkpoints: List<Checkpoint> = emptyList()
val checkpoints: List<Checkpoint> = emptyList(),
val additionalProperties: MutableMap<String, Any> = mutableMapOf()
) : CheckpointMessage {
override fun withDestinationStats(stats: Stats) =
GlobalCheckpoint(state, sourceStats, stats, checkpoints)
GlobalCheckpoint(state, sourceStats, stats, checkpoints, additionalProperties)

override fun asProtocolMessage(): AirbyteMessage {
val stateMessage =
Expand Down Expand Up @@ -259,15 +261,19 @@ class DestinationMessageFactory(private val catalog: DestinationCatalog) {
StreamCheckpoint(
checkpoint = fromAirbyteStreamState(message.state.stream),
sourceStats =
Stats(recordCount = message.state.sourceStats.recordCount.toLong())
Stats(recordCount = message.state.sourceStats.recordCount.toLong()),
additionalProperties = message.state.additionalProperties
)
AirbyteStateMessage.AirbyteStateType.GLOBAL ->
GlobalCheckpoint(
sourceStats =
Stats(recordCount = message.state.sourceStats.recordCount.toLong()),
state = message.state.global.sharedState,
checkpoints =
message.state.global.streamStates.map { fromAirbyteStreamState(it) }
message.state.global.streamStates.map {
fromAirbyteStreamState(it)
},
additionalProperties = message.state.additionalProperties
)
else -> // TODO: Do we still need to handle LEGACY?
Undefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ class DefaultDestinationMessageDeserializer(private val messageFactory: Destinat

override fun deserialize(serialized: String): DestinationMessage {
try {
val node = Jsons.readTree(serialized)
val airbyteMessage = Jsons.treeToValue(node, AirbyteMessage::class.java)
// val node = Jsons.readTree(serialized)
val airbyteMessage = Jsons.readValue(serialized, AirbyteMessage::class.java)
return messageFactory.fromAirbyteMessage(airbyteMessage, serialized)
} catch (t: Throwable) {
throw RuntimeException("Failed to deserialize AirbyteMessage")
// TODO: REMOVE THIS BEFORE RELEASE!
throw RuntimeException("Failed to deserialize AirbyteMessage: $serialized", t)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package io.airbyte.cdk.message

import io.airbyte.cdk.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.command.WriteConfiguration
import io.airbyte.cdk.state.MemoryManager
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
Expand Down Expand Up @@ -51,7 +51,7 @@ data class StreamCompleteWrapped(
@Singleton
class DestinationMessageQueue(
catalog: DestinationCatalog,
config: WriteConfiguration,
config: DestinationConfiguration,
private val memoryManager: MemoryManager,
private val queueChannelFactory: QueueChannelFactory<DestinationRecordWrapped>
) : MessageQueue<DestinationStream, DestinationRecordWrapped> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ class DefaultMessageConverter : MessageConverter<CheckpointMessage, AirbyteMessa
)
.withType(AirbyteStateMessage.AirbyteStateType.STREAM)
.withStream(fromStreamState(message.checkpoint))
.also {
message.additionalProperties.forEach { (key, value) ->
it.withAdditionalProperty(key, value)
}
}
is GlobalCheckpoint ->
AirbyteStateMessage()
.withSourceStats(
Expand All @@ -58,6 +63,11 @@ class DefaultMessageConverter : MessageConverter<CheckpointMessage, AirbyteMessa
.withSharedState(message.state)
.withStreamStates(message.checkpoints.map { fromStreamState(it) })
)
.also {
message.additionalProperties.forEach { (key, value) ->
it.withAdditionalProperty(key, value)
}
}
}
return AirbyteMessage().withType(AirbyteMessage.Type.STATE).withState(state)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package io.airbyte.cdk.task

import com.google.common.collect.Range
import io.airbyte.cdk.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.command.WriteConfiguration
import io.airbyte.cdk.message.BatchEnvelope
import io.airbyte.cdk.message.DestinationRecordWrapped
import io.airbyte.cdk.message.MessageQueueReader
Expand Down Expand Up @@ -38,7 +38,7 @@ import kotlinx.coroutines.yield
* spilled.
*/
class SpillToDiskTask(
private val config: WriteConfiguration,
private val config: DestinationConfiguration,
private val queueReader: MessageQueueReader<DestinationStream, DestinationRecordWrapped>,
private val streamLoader: StreamLoader,
private val launcher: DestinationTaskLauncher
Expand Down Expand Up @@ -117,7 +117,7 @@ class SpillToDiskTask(

@Singleton
class SpillToDiskTaskFactory(
private val config: WriteConfiguration,
private val config: DestinationConfiguration,
private val queueReader: MessageQueueReader<DestinationStream, DestinationRecordWrapped>
) {
fun make(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kotlinx.coroutines.runBlocking
class WriteOperation(
private val inputConsumer: InputConsumer<DestinationMessage>,
private val taskLauncher: TaskLauncher,
private val taskRunner: TaskRunner
private val taskRunner: TaskRunner,
) : Operation {
override fun execute() {
runBlocking {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
plugins {
id 'application'
id 'airbyte-java-connector'
}

Expand All @@ -9,9 +10,34 @@ airbyteJavaConnector {
}

application {
mainClass = 'io.airbyte.integrations.destination.e2e_test.TestingDestinations'
mainClass = 'io.airbyte.integrations.destination.e2e_test.E2EDestination'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']

// Uncomment and replace to run locally
//applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', '--add-opens', 'java.base/java.lang=ALL-UNNAMED']
}

// Uncomment to run locally
//run {
// standardInput = System.in
//}

dependencies {
def vBulkCDK = '0.9'
def vMicronaut = '4.3.13'

ksp "io.micronaut:micronaut-inject-kotlin:${vMicronaut}"

//implementation "io.airbyte.bulk-cdk:bulk-cdk-core-base:${vBulkCDK}"
implementation project(":airbyte-cdk:bulk:core:bulk-cdk-core-base")
//implementation "io.airbyte.bulk-cdk:bulk-cdk-core-load:${vBulkCDK}"
implementation project(":airbyte-cdk:bulk:core:bulk-cdk-core-load")

kspTest "io.micronaut:micronaut-inject-kotlin:${vMicronaut}"

testImplementation platform('org.testcontainers:testcontainers-bom:1.19.8')
testImplementation project(":airbyte-cdk:bulk:core:bulk-cdk-core-load")
testImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load"))

// testImplementation testFixtures("io.airbyte.bulk-cdk:bulk-cdk-core-load:${vBulkCDK}"
}

This file was deleted.

Loading

0 comments on commit 3c769c5

Please sign in to comment.