diff --git a/queue/src/main/java/org/killbill/queue/DBBackedQueue.java b/queue/src/main/java/org/killbill/queue/DBBackedQueue.java index 2b779ea2..e38470cb 100644 --- a/queue/src/main/java/org/killbill/queue/DBBackedQueue.java +++ b/queue/src/main/java/org/killbill/queue/DBBackedQueue.java @@ -286,12 +286,11 @@ public Void inTransaction(final QueueSqlDao transactional, final TransactionS final List lateEntries = new LinkedList(); for (final T entryLeftBehind : entriesLeftBehind) { // entryIsBeingProcessedByThisNode is a sign of a stuck entry on this node - final boolean entryIsBeingProcessedByThisNode = owner.equals(entryLeftBehind.getProcessingOwner()); // entryCreatedByThisNodeAndNeverProcessed is likely a sign of the queue being late final boolean entryCreatedByThisNodeAndNeverProcessed = owner.equals(entryLeftBehind.getCreatingOwner()) && entryLeftBehind.getProcessingOwner() == null; if (entryCreatedByThisNodeAndNeverProcessed) { lateEntries.add(entryLeftBehind); - } else { /* This includes entryIsBeingProcessedByThisNode. See https://github.com/killbill/killbill-commons/issues/169 */ + } else { /* This includes entryIsBeingProcessedByThisNode (owner.equals(entryLeftBehind.getProcessingOwner())). See https://github.com/killbill/killbill-commons/issues/169 */ // Set the status to REAPED in the history table entryLeftBehind.setProcessingState(PersistentQueueEntryLifecycleState.REAPED); entriesToReInsert.add(entryLeftBehind); diff --git a/queue/src/test/java/org/killbill/TestSetup.java b/queue/src/test/java/org/killbill/TestSetup.java index 21f05b36..3e32fc33 100644 --- a/queue/src/test/java/org/killbill/TestSetup.java +++ b/queue/src/test/java/org/killbill/TestSetup.java @@ -53,6 +53,9 @@ public class TestSetup { private static final String TEST_DB_PROPERTY_PREFIX = "org.killbill.billing.dbi.test."; + protected static final Long SEARCH_KEY_1 = 1L; + protected static final Long SEARCH_KEY_2 = 2L; + protected EmbeddedDB embeddedDB; protected DBI dbi; diff --git a/queue/src/test/java/org/killbill/queue/TestReaper.java b/queue/src/test/java/org/killbill/queue/TestReaper.java index 1be72c40..3534acba 100644 --- a/queue/src/test/java/org/killbill/queue/TestReaper.java +++ b/queue/src/test/java/org/killbill/queue/TestReaper.java @@ -42,8 +42,6 @@ public class TestReaper extends TestSetup { - private static final Long SEARCH_KEY_1 = 1L; - private static final Long SEARCH_KEY_2 = 2L; private DBBackedQueue queue; private PersistentBusSqlDao sqlDao; diff --git a/queue/src/test/java/org/killbill/queue/TestReaperIntegration.java b/queue/src/test/java/org/killbill/queue/TestReaperIntegration.java index 4c09a8e4..879bfa7b 100644 --- a/queue/src/test/java/org/killbill/queue/TestReaperIntegration.java +++ b/queue/src/test/java/org/killbill/queue/TestReaperIntegration.java @@ -31,6 +31,7 @@ import org.killbill.TestSetup; import org.killbill.bus.DefaultPersistentBus; import org.killbill.bus.api.BusEvent; +import org.killbill.bus.api.BusEventWithMetadata; import org.killbill.bus.api.PersistentBus.EventBusException; import org.killbill.bus.api.PersistentBusConfig; import org.killbill.bus.dao.BusEventModelDao; @@ -39,6 +40,7 @@ import org.killbill.commons.eventbus.Subscribe; import org.killbill.queue.api.PersistentQueueEntryLifecycleState; import org.skife.config.TimeSpan; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -164,9 +166,17 @@ public void testWithStuckEntryProcessedByThisNode() throws EventBusException, Js // Trigger reaper clock.addDeltaFromReality(config.getReapThreshold().getMillis()); - // It's a no-op though (it won't reap itself) - handler.ensureNotSeen(event2); - handler.assertSeenEvents(2); + // Give a chance to the reaper to run + Thread.sleep(500); + + clock.addDeltaFromReality(1000); + handler.waitFor(event2); + + // See https://github.com/killbill/killbill-commons/issues/169 + handler.assertSeenEvents(3); + final Iterable> result = bus.getHistoricalBusEventsForSearchKey2(now, SEARCH_KEY_2); + final long nbItems = result.spliterator().getExactSizeIfKnown(); + assertEquals(nbItems, 4); } @@ -278,8 +288,8 @@ public DummyEvent(@JsonProperty("name") final String name, public DummyEvent() { this(UUID.randomUUID().toString(), - System.currentTimeMillis(), - System.currentTimeMillis(), + SEARCH_KEY_1, + SEARCH_KEY_2, UUID.randomUUID()); }