From 91440a7be718a860db3c82dcddbf367f4b95fccf Mon Sep 17 00:00:00 2001 From: stephane brossier Date: Thu, 11 Apr 2024 19:04:14 -0700 Subject: [PATCH 1/2] queue: Allow stuck entries (same node) to be reaped. See #169 --- .../org/killbill/queue/DBBackedQueue.java | 19 ++++--------------- .../java/org/killbill/queue/TestReaper.java | 14 ++++++++------ 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/queue/src/main/java/org/killbill/queue/DBBackedQueue.java b/queue/src/main/java/org/killbill/queue/DBBackedQueue.java index afde91d4..2b779ea2 100644 --- a/queue/src/main/java/org/killbill/queue/DBBackedQueue.java +++ b/queue/src/main/java/org/killbill/queue/DBBackedQueue.java @@ -282,39 +282,28 @@ public Void inTransaction(final QueueSqlDao transactional, final TransactionS return null; } - final Collection entriesToMove = new ArrayList(entriesLeftBehind.size()); final List entriesToReInsert = new ArrayList(entriesLeftBehind.size()); - final List stuckEntries = new LinkedList(); final List lateEntries = new LinkedList(); for (final T entryLeftBehind : entriesLeftBehind) { // entryIsBeingProcessedByThisNode is a sign of a stuck entry on this node final boolean entryIsBeingProcessedByThisNode = owner.equals(entryLeftBehind.getProcessingOwner()); // entryCreatedByThisNodeAndNeverProcessed is likely a sign of the queue being late final boolean entryCreatedByThisNodeAndNeverProcessed = owner.equals(entryLeftBehind.getCreatingOwner()) && entryLeftBehind.getProcessingOwner() == null; - if (entryIsBeingProcessedByThisNode) { - // See https://github.com/killbill/killbill-commons/issues/47 - stuckEntries.add(entryLeftBehind); - } else if (entryCreatedByThisNodeAndNeverProcessed) { + if (entryCreatedByThisNodeAndNeverProcessed) { lateEntries.add(entryLeftBehind); - } else { - // Fields will be reset appropriately in insertReapedEntriesFromTransaction - entriesToReInsert.add(entryLeftBehind); - + } else { /* This includes entryIsBeingProcessedByThisNode. See https://github.com/killbill/killbill-commons/issues/169 */ // Set the status to REAPED in the history table entryLeftBehind.setProcessingState(PersistentQueueEntryLifecycleState.REAPED); - entriesToMove.add(entryLeftBehind); + entriesToReInsert.add(entryLeftBehind); } } - if (!stuckEntries.isEmpty()) { - log.warn("{} reapEntries: stuck queue entries {}", DB_QUEUE_LOG_ID, stuckEntries); - } if (!lateEntries.isEmpty()) { log.warn("{} reapEntries: late queue entries {}", DB_QUEUE_LOG_ID, lateEntries); } if (!entriesToReInsert.isEmpty()) { - moveEntriesToHistoryFromTransaction(transactional, entriesToMove); + moveEntriesToHistoryFromTransaction(transactional, entriesToReInsert); insertReapedEntriesFromTransaction(transactional, entriesToReInsert, now); log.warn("{} reapEntries: {} entries were reaped by {} {}", DB_QUEUE_LOG_ID, diff --git a/queue/src/test/java/org/killbill/queue/TestReaper.java b/queue/src/test/java/org/killbill/queue/TestReaper.java index 5bad29c9..1be72c40 100644 --- a/queue/src/test/java/org/killbill/queue/TestReaper.java +++ b/queue/src/test/java/org/killbill/queue/TestReaper.java @@ -102,9 +102,10 @@ public void testReapEntries() { queue.reapEntries(now.minus(config.getReapThreshold().getMillis()).toDate()); final List readyEntriesAfterReaping = sqlDao.getReadyEntries(now.toDate(), 10, CreatorName.get(), config.getTableName()); - assertEquals(readyEntriesAfterReaping.size(), 2); + assertEquals(readyEntriesAfterReaping.size(), 3); assertEquals(readyEntriesAfterReaping.get(0).getRecordId(), (Long) 1L); assertTrue(readyEntriesAfterReaping.get(1).getRecordId() > (Long) 6L); + assertTrue(readyEntriesAfterReaping.get(2).getRecordId() > (Long) 6L); final List readyOrInProcessingAfterReaping = Iterators.toUnmodifiableList(sqlDao.getReadyOrInProcessingQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, config.getTableName())); assertEquals(readyOrInProcessingAfterReaping.size(), 6); @@ -112,16 +113,17 @@ public void testReapEntries() { assertEquals(readyOrInProcessingAfterReaping.get(1).getRecordId(), (Long) 2L); assertEquals(readyOrInProcessingAfterReaping.get(2).getRecordId(), (Long) 3L); assertEquals(readyOrInProcessingAfterReaping.get(3).getRecordId(), (Long) 4L); - // That stuck entry hasn't moved (https://github.com/killbill/killbill-commons/issues/47) - assertEquals(readyOrInProcessingAfterReaping.get(4).getRecordId(), (Long) 5L); - // New (reaped) one + // New (reaped) ones + assertTrue(readyOrInProcessingAfterReaping.get(4).getRecordId() > (Long) 6L); assertTrue(readyOrInProcessingAfterReaping.get(5).getRecordId() > (Long) 6L); // Check history table final List historicalQueueEntries = Iterators.toUnmodifiableList(sqlDao.getHistoricalQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, config.getHistoryTableName())); - assertEquals(historicalQueueEntries.size(), 1); + assertEquals(historicalQueueEntries.size(), 2); assertEquals(historicalQueueEntries.get(0).getProcessingState(), PersistentQueueEntryLifecycleState.REAPED); - assertEquals(historicalQueueEntries.get(0).getUserToken(), readyOrInProcessingAfterReaping.get(5).getUserToken()); + assertEquals(historicalQueueEntries.get(0).getUserToken(), readyOrInProcessingAfterReaping.get(4).getUserToken()); + assertEquals(historicalQueueEntries.get(1).getProcessingState(), PersistentQueueEntryLifecycleState.REAPED); + assertEquals(historicalQueueEntries.get(1).getUserToken(), readyOrInProcessingAfterReaping.get(5).getUserToken()); } private BusEventModelDao createEntry(final long recordId, From 158f8747ef578aa4b1fa35da8fe1f2262f0084fa Mon Sep 17 00:00:00 2001 From: stephane brossier Date: Fri, 12 Apr 2024 11:13:15 -0700 Subject: [PATCH 2/2] queue: Fix broken test from 91440a7be7. See #169 --- .../org/killbill/queue/DBBackedQueue.java | 3 +-- .../src/test/java/org/killbill/TestSetup.java | 3 +++ .../java/org/killbill/queue/TestReaper.java | 2 -- .../killbill/queue/TestReaperIntegration.java | 20 ++++++++++++++----- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/queue/src/main/java/org/killbill/queue/DBBackedQueue.java b/queue/src/main/java/org/killbill/queue/DBBackedQueue.java index 2b779ea2..e38470cb 100644 --- a/queue/src/main/java/org/killbill/queue/DBBackedQueue.java +++ b/queue/src/main/java/org/killbill/queue/DBBackedQueue.java @@ -286,12 +286,11 @@ public Void inTransaction(final QueueSqlDao transactional, final TransactionS final List lateEntries = new LinkedList(); for (final T entryLeftBehind : entriesLeftBehind) { // entryIsBeingProcessedByThisNode is a sign of a stuck entry on this node - final boolean entryIsBeingProcessedByThisNode = owner.equals(entryLeftBehind.getProcessingOwner()); // entryCreatedByThisNodeAndNeverProcessed is likely a sign of the queue being late final boolean entryCreatedByThisNodeAndNeverProcessed = owner.equals(entryLeftBehind.getCreatingOwner()) && entryLeftBehind.getProcessingOwner() == null; if (entryCreatedByThisNodeAndNeverProcessed) { lateEntries.add(entryLeftBehind); - } else { /* This includes entryIsBeingProcessedByThisNode. See https://github.com/killbill/killbill-commons/issues/169 */ + } else { /* This includes entryIsBeingProcessedByThisNode (owner.equals(entryLeftBehind.getProcessingOwner())). See https://github.com/killbill/killbill-commons/issues/169 */ // Set the status to REAPED in the history table entryLeftBehind.setProcessingState(PersistentQueueEntryLifecycleState.REAPED); entriesToReInsert.add(entryLeftBehind); diff --git a/queue/src/test/java/org/killbill/TestSetup.java b/queue/src/test/java/org/killbill/TestSetup.java index 21f05b36..3e32fc33 100644 --- a/queue/src/test/java/org/killbill/TestSetup.java +++ b/queue/src/test/java/org/killbill/TestSetup.java @@ -53,6 +53,9 @@ public class TestSetup { private static final String TEST_DB_PROPERTY_PREFIX = "org.killbill.billing.dbi.test."; + protected static final Long SEARCH_KEY_1 = 1L; + protected static final Long SEARCH_KEY_2 = 2L; + protected EmbeddedDB embeddedDB; protected DBI dbi; diff --git a/queue/src/test/java/org/killbill/queue/TestReaper.java b/queue/src/test/java/org/killbill/queue/TestReaper.java index 1be72c40..3534acba 100644 --- a/queue/src/test/java/org/killbill/queue/TestReaper.java +++ b/queue/src/test/java/org/killbill/queue/TestReaper.java @@ -42,8 +42,6 @@ public class TestReaper extends TestSetup { - private static final Long SEARCH_KEY_1 = 1L; - private static final Long SEARCH_KEY_2 = 2L; private DBBackedQueue queue; private PersistentBusSqlDao sqlDao; diff --git a/queue/src/test/java/org/killbill/queue/TestReaperIntegration.java b/queue/src/test/java/org/killbill/queue/TestReaperIntegration.java index 4c09a8e4..879bfa7b 100644 --- a/queue/src/test/java/org/killbill/queue/TestReaperIntegration.java +++ b/queue/src/test/java/org/killbill/queue/TestReaperIntegration.java @@ -31,6 +31,7 @@ import org.killbill.TestSetup; import org.killbill.bus.DefaultPersistentBus; import org.killbill.bus.api.BusEvent; +import org.killbill.bus.api.BusEventWithMetadata; import org.killbill.bus.api.PersistentBus.EventBusException; import org.killbill.bus.api.PersistentBusConfig; import org.killbill.bus.dao.BusEventModelDao; @@ -39,6 +40,7 @@ import org.killbill.commons.eventbus.Subscribe; import org.killbill.queue.api.PersistentQueueEntryLifecycleState; import org.skife.config.TimeSpan; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -164,9 +166,17 @@ public void testWithStuckEntryProcessedByThisNode() throws EventBusException, Js // Trigger reaper clock.addDeltaFromReality(config.getReapThreshold().getMillis()); - // It's a no-op though (it won't reap itself) - handler.ensureNotSeen(event2); - handler.assertSeenEvents(2); + // Give a chance to the reaper to run + Thread.sleep(500); + + clock.addDeltaFromReality(1000); + handler.waitFor(event2); + + // See https://github.com/killbill/killbill-commons/issues/169 + handler.assertSeenEvents(3); + final Iterable> result = bus.getHistoricalBusEventsForSearchKey2(now, SEARCH_KEY_2); + final long nbItems = result.spliterator().getExactSizeIfKnown(); + assertEquals(nbItems, 4); } @@ -278,8 +288,8 @@ public DummyEvent(@JsonProperty("name") final String name, public DummyEvent() { this(UUID.randomUUID().toString(), - System.currentTimeMillis(), - System.currentTimeMillis(), + SEARCH_KEY_1, + SEARCH_KEY_2, UUID.randomUUID()); }