From 1bf7dde0fcee8883ab3490f2ea42b9b601119b15 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Mon, 25 Sep 2023 08:18:08 +0000 Subject: [PATCH 1/3] added retry with new ybclient --- .../YugabyteDBSnapshotChangeEventSource.java | 8 ++- .../YugabyteDBStreamingChangeEventSource.java | 71 +++++++++++++++---- 2 files changed, 66 insertions(+), 13 deletions(-) diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java index 3c152be7..35c78b9d 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java @@ -55,7 +55,7 @@ public class YugabyteDBSnapshotChangeEventSource extends AbstractSnapshotChangeE private final YugabyteDBConnection connection; private final AsyncYBClient asyncClient; - private final YBClient syncClient; + private YBClient syncClient; private OpId lastCompletelyProcessedLsn; @@ -592,6 +592,12 @@ protected SnapshotResult doExecute(ChangeEventSourceCon this.connectorConfig.maxConnectorRetries(), this.connectorConfig.connectorRetryDelayMs(), e); + if (e.toString().contains("tablet=null")) { + LOGGER.warn("Got a tablet=null error while fetching the snapshot, retrying with a new YBClient"); + this.syncClient = YBClientUtils.getYbClient(connectorConfig); + LOGGER.info("Created a new YBClient"); + } + try { final Metronome retryMetronome = Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM); retryMetronome.pause(); diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 0de7e29e..3601d1c5 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -81,7 +81,7 @@ public class YugabyteDBStreamingChangeEventSource implements protected OpId lastCompletelyProcessedLsn; protected final AsyncYBClient asyncYBClient; - protected final YBClient syncClient; + protected YBClient syncClient; protected YugabyteDBTypeRegistry yugabyteDBTypeRegistry; protected final Map checkPointMap; protected final ChangeEventQueue queue; @@ -348,14 +348,32 @@ protected void getChanges2(ChangeEventSourceContext context, // This schemaNeeded map here would have the elements as : Map schemaNeeded = new HashMap<>(); Map tabletSafeTime = new HashMap<>(); - for (Pair entry : tabletPairList) { + List> elementsToBeRemoved = new ArrayList<>(); + for (int i = 0; i < tabletPairList.size(); ++i) { + Pair entry = tabletPairList.get(i); // entry.getValue() will give the tabletId OpId opId = YBClientUtils.getOpIdFromGetTabletListResponse( tabletListResponse.get(entry.getKey()), entry.getValue()); if (opId == null) { - throw new RuntimeException(String.format("OpId for the given tablet {} was not found in the response," - + " restart the connector to try again", entry.getValue())); + // At this stage, we know we do not have the tablet-OpId, most probably because we lost the + // correct tablet list containing children from the list. We should remove the parent from the list + // and add its children. + GetTabletListToPollForCDCResponse getResp = syncClient.getTabletListToPollForCdc( + tableIdToTable.get(entry.getKey()), streamId, entry.getKey(), entry.getValue()); + + LOGGER.info("Response size while getting chilren for tablet {}: {}", entry.getValue(), + getResp.getTabletCheckpointPairListSize()); + + for (TabletCheckpointPair pair : getResp.getTabletCheckpointPairList()) { + addTabletIfNotPresent(tabletPairList, pair, entry.getKey(), offsetContext, schemaNeeded); + } + + // Pair entryToBeDeleted = getEntryToDelete(tabletPairList, entry.getValue()); + elementsToBeRemoved.add(entry); + LOGGER.info("Tablet {} will be removed later from the list before polling", entry.getValue()); + + continue; } // If we are getting a term and index as -1 and -1 from the server side it means @@ -366,14 +384,13 @@ protected void getChanges2(ChangeEventSourceContext context, opId = YugabyteDBOffsetContext.streamingStartLsn(); } - // For streaming, we do not want any colocated information and want to process the tables - // based on just their tablet IDs - pass false as the 'colocated' flag to enforce the same. - YBPartition p = new YBPartition(entry.getKey(), entry.getValue(), false /* colocated */); - offsetContext.initSourceInfo(p, this.connectorConfig, opId); - // We can initialise the explicit checkpoint for this tablet to the value returned by - // the cdc_service through the 'GetTabletListToPollForCDC' API - tabletToExplicitCheckpoint.put(p.getId(), opId.toCdcSdkCheckpoint()); - schemaNeeded.put(p.getId(), Boolean.TRUE); + initializePartitionsAndOffsets(entry.getKey() /* tableId */, entry.getValue() /* tablet ID */, + offsetContext, tabletToExplicitCheckpoint, schemaNeeded, opId); + } + + for (Pair entry : elementsToBeRemoved) { + tabletPairList.remove(entry); + LOGGER.info("Removed original entry for the tablet {} from list as it has been split", entry.getValue()); } // This will contain the tablet ID mapped to the number of records it has seen @@ -755,6 +772,12 @@ else if (message.isDDLMessage()) { // If there are retries left, perform them after the specified delay. LOGGER.warn("Error while trying to get the changes from the server; will attempt retry {} of {} after {} milli-seconds. Exception: {}", retryCount, connectorConfig.maxConnectorRetries(), connectorConfig.connectorRetryDelayMs(), e); + + if (e.toString().contains("tablet=null")) { + LOGGER.warn("Got a tablet = null error, retrying with a new YBClient"); + this.syncClient = YBClientUtils.getYbClient(connectorConfig); + LOGGER.info("Created a new YBClient for retrying"); + } try { final Metronome retryMetronome = Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM); @@ -768,6 +791,30 @@ else if (message.isDDLMessage()) { } } + /** + * Initialize the partition and offsets for the given tablet to get them ready for streaming. + * @param tableId table UUID + * @param tabletId tablet UUID + * @param offsetContext the {@link YugabyteDBOffsetContext} + * @param tabletToExplicitCheckpoint map containing tabletId-explicitCheckpoint mapping + * @param schemaNeeded map with information whether to get the schema for a tablet + * @param opId the OpId to initialize the partitions with + */ + private void initializePartitionsAndOffsets(String tableId, String tabletId, YugabyteDBOffsetContext offsetContext, + Map tabletToExplicitCheckpoint, + Map schemaNeeded, OpId opId) { + // For streaming, we do not want any colocated information and want to process the tables + // based on just their tablet IDs - pass false as the 'colocated' flag to enforce the same. + YBPartition p = new YBPartition(tableId, tabletId, false /* colocated */); + + offsetContext.initSourceInfo(p, this.connectorConfig, opId); + + // We can initialise the explicit checkpoint for this tablet to the value returned by + // the cdc_service through the 'GetTabletListToPollForCDC' API + tabletToExplicitCheckpoint.put(p.getId(), opId.toCdcSdkCheckpoint()); + schemaNeeded.put(p.getId(), Boolean.TRUE); + } + private void probeConnectionIfNeeded() throws SQLException { // CDCSDK Find out why it fails. // if (connectionProbeTimer.hasElapsed()) { From d44ce7d2d26728ce5a270e7b021e8a604f0273a5 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 3 Oct 2023 17:11:14 +0000 Subject: [PATCH 2/3] changed code to handle recoverable exception --- pom.xml | 2 +- .../YugabyteDBSnapshotChangeEventSource.java | 54 ++++++++------ .../YugabyteDBStreamingChangeEventSource.java | 70 +++++++++---------- 3 files changed, 65 insertions(+), 61 deletions(-) diff --git a/pom.xml b/pom.xml index 806d0917..a89c1da2 100644 --- a/pom.xml +++ b/pom.xml @@ -47,7 +47,7 @@ 3.3.1 1.7.36 1.4.0 - 0.8.64-20230929.065833-2 + 0.8.65-SNAPSHOT 2.8.9