Skip to content

Commit

Permalink
queue: Fix broken test from 91440a7. See #169
Browse files Browse the repository at this point in the history
  • Loading branch information
sbrossie committed Apr 12, 2024
1 parent 91440a7 commit 158f874
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 9 deletions.
3 changes: 1 addition & 2 deletions queue/src/main/java/org/killbill/queue/DBBackedQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,11 @@ public Void inTransaction(final QueueSqlDao<T> transactional, final TransactionS
final List<T> lateEntries = new LinkedList<T>();
for (final T entryLeftBehind : entriesLeftBehind) {
// entryIsBeingProcessedByThisNode is a sign of a stuck entry on this node
final boolean entryIsBeingProcessedByThisNode = owner.equals(entryLeftBehind.getProcessingOwner());
// entryCreatedByThisNodeAndNeverProcessed is likely a sign of the queue being late
final boolean entryCreatedByThisNodeAndNeverProcessed = owner.equals(entryLeftBehind.getCreatingOwner()) && entryLeftBehind.getProcessingOwner() == null;
if (entryCreatedByThisNodeAndNeverProcessed) {
lateEntries.add(entryLeftBehind);
} else { /* This includes entryIsBeingProcessedByThisNode. See https://github.com/killbill/killbill-commons/issues/169 */
} else { /* This includes entryIsBeingProcessedByThisNode (owner.equals(entryLeftBehind.getProcessingOwner())). See https://github.com/killbill/killbill-commons/issues/169 */
// Set the status to REAPED in the history table
entryLeftBehind.setProcessingState(PersistentQueueEntryLifecycleState.REAPED);
entriesToReInsert.add(entryLeftBehind);
Expand Down
3 changes: 3 additions & 0 deletions queue/src/test/java/org/killbill/TestSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class TestSetup {

private static final String TEST_DB_PROPERTY_PREFIX = "org.killbill.billing.dbi.test.";

protected static final Long SEARCH_KEY_1 = 1L;
protected static final Long SEARCH_KEY_2 = 2L;

protected EmbeddedDB embeddedDB;

protected DBI dbi;
Expand Down
2 changes: 0 additions & 2 deletions queue/src/test/java/org/killbill/queue/TestReaper.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@

public class TestReaper extends TestSetup {

private static final Long SEARCH_KEY_1 = 1L;
private static final Long SEARCH_KEY_2 = 2L;

private DBBackedQueue<BusEventModelDao> queue;
private PersistentBusSqlDao sqlDao;
Expand Down
20 changes: 15 additions & 5 deletions queue/src/test/java/org/killbill/queue/TestReaperIntegration.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.killbill.TestSetup;
import org.killbill.bus.DefaultPersistentBus;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.BusEventWithMetadata;
import org.killbill.bus.api.PersistentBus.EventBusException;
import org.killbill.bus.api.PersistentBusConfig;
import org.killbill.bus.dao.BusEventModelDao;
Expand All @@ -39,6 +40,7 @@
import org.killbill.commons.eventbus.Subscribe;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.skife.config.TimeSpan;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -164,9 +166,17 @@ public void testWithStuckEntryProcessedByThisNode() throws EventBusException, Js

// Trigger reaper
clock.addDeltaFromReality(config.getReapThreshold().getMillis());
// It's a no-op though (it won't reap itself)
handler.ensureNotSeen(event2);
handler.assertSeenEvents(2);
// Give a chance to the reaper to run
Thread.sleep(500);

clock.addDeltaFromReality(1000);
handler.waitFor(event2);

// See https://github.com/killbill/killbill-commons/issues/169
handler.assertSeenEvents(3);
final Iterable<BusEventWithMetadata<BusEvent>> result = bus.getHistoricalBusEventsForSearchKey2(now, SEARCH_KEY_2);
final long nbItems = result.spliterator().getExactSizeIfKnown();
assertEquals(nbItems, 4);
}


Expand Down Expand Up @@ -278,8 +288,8 @@ public DummyEvent(@JsonProperty("name") final String name,

public DummyEvent() {
this(UUID.randomUUID().toString(),
System.currentTimeMillis(),
System.currentTimeMillis(),
SEARCH_KEY_1,
SEARCH_KEY_2,
UUID.randomUUID());
}

Expand Down

0 comments on commit 158f874

Please sign in to comment.