Skip to content

Commit

Permalink
Merge pull request #170 from killbill/fix-for-169
Browse files Browse the repository at this point in the history
queue: Allow stuck entries (same node) to be reaped. See #169
  • Loading branch information
sbrossie authored Apr 12, 2024
2 parents f471f14 + 158f874 commit cb648bc
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 29 deletions.
20 changes: 4 additions & 16 deletions queue/src/main/java/org/killbill/queue/DBBackedQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,39 +282,27 @@ public Void inTransaction(final QueueSqlDao<T> transactional, final TransactionS
return null;
}

final Collection<T> entriesToMove = new ArrayList<T>(entriesLeftBehind.size());
final List<T> entriesToReInsert = new ArrayList<T>(entriesLeftBehind.size());
final List<T> stuckEntries = new LinkedList<T>();
final List<T> lateEntries = new LinkedList<T>();
for (final T entryLeftBehind : entriesLeftBehind) {
// entryIsBeingProcessedByThisNode is a sign of a stuck entry on this node
final boolean entryIsBeingProcessedByThisNode = owner.equals(entryLeftBehind.getProcessingOwner());
// entryCreatedByThisNodeAndNeverProcessed is likely a sign of the queue being late
final boolean entryCreatedByThisNodeAndNeverProcessed = owner.equals(entryLeftBehind.getCreatingOwner()) && entryLeftBehind.getProcessingOwner() == null;
if (entryIsBeingProcessedByThisNode) {
// See https://github.com/killbill/killbill-commons/issues/47
stuckEntries.add(entryLeftBehind);
} else if (entryCreatedByThisNodeAndNeverProcessed) {
if (entryCreatedByThisNodeAndNeverProcessed) {
lateEntries.add(entryLeftBehind);
} else {
// Fields will be reset appropriately in insertReapedEntriesFromTransaction
entriesToReInsert.add(entryLeftBehind);

} else { /* This includes entryIsBeingProcessedByThisNode (owner.equals(entryLeftBehind.getProcessingOwner())). See https://github.com/killbill/killbill-commons/issues/169 */
// Set the status to REAPED in the history table
entryLeftBehind.setProcessingState(PersistentQueueEntryLifecycleState.REAPED);
entriesToMove.add(entryLeftBehind);
entriesToReInsert.add(entryLeftBehind);
}
}

if (!stuckEntries.isEmpty()) {
log.warn("{} reapEntries: stuck queue entries {}", DB_QUEUE_LOG_ID, stuckEntries);
}
if (!lateEntries.isEmpty()) {
log.warn("{} reapEntries: late queue entries {}", DB_QUEUE_LOG_ID, lateEntries);
}

if (!entriesToReInsert.isEmpty()) {
moveEntriesToHistoryFromTransaction(transactional, entriesToMove);
moveEntriesToHistoryFromTransaction(transactional, entriesToReInsert);
insertReapedEntriesFromTransaction(transactional, entriesToReInsert, now);
log.warn("{} reapEntries: {} entries were reaped by {} {}",
DB_QUEUE_LOG_ID,
Expand Down
3 changes: 3 additions & 0 deletions queue/src/test/java/org/killbill/TestSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class TestSetup {

private static final String TEST_DB_PROPERTY_PREFIX = "org.killbill.billing.dbi.test.";

protected static final Long SEARCH_KEY_1 = 1L;
protected static final Long SEARCH_KEY_2 = 2L;

protected EmbeddedDB embeddedDB;

protected DBI dbi;
Expand Down
16 changes: 8 additions & 8 deletions queue/src/test/java/org/killbill/queue/TestReaper.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@

public class TestReaper extends TestSetup {

private static final Long SEARCH_KEY_1 = 1L;
private static final Long SEARCH_KEY_2 = 2L;

private DBBackedQueue<BusEventModelDao> queue;
private PersistentBusSqlDao sqlDao;
Expand Down Expand Up @@ -102,26 +100,28 @@ public void testReapEntries() {
queue.reapEntries(now.minus(config.getReapThreshold().getMillis()).toDate());

final List<BusEventModelDao> readyEntriesAfterReaping = sqlDao.getReadyEntries(now.toDate(), 10, CreatorName.get(), config.getTableName());
assertEquals(readyEntriesAfterReaping.size(), 2);
assertEquals(readyEntriesAfterReaping.size(), 3);
assertEquals(readyEntriesAfterReaping.get(0).getRecordId(), (Long) 1L);
assertTrue(readyEntriesAfterReaping.get(1).getRecordId() > (Long) 6L);
assertTrue(readyEntriesAfterReaping.get(2).getRecordId() > (Long) 6L);

final List<BusEventModelDao> readyOrInProcessingAfterReaping = Iterators.toUnmodifiableList(sqlDao.getReadyOrInProcessingQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, config.getTableName()));
assertEquals(readyOrInProcessingAfterReaping.size(), 6);
assertEquals(readyOrInProcessingAfterReaping.get(0).getRecordId(), (Long) 1L);
assertEquals(readyOrInProcessingAfterReaping.get(1).getRecordId(), (Long) 2L);
assertEquals(readyOrInProcessingAfterReaping.get(2).getRecordId(), (Long) 3L);
assertEquals(readyOrInProcessingAfterReaping.get(3).getRecordId(), (Long) 4L);
// That stuck entry hasn't moved (https://github.com/killbill/killbill-commons/issues/47)
assertEquals(readyOrInProcessingAfterReaping.get(4).getRecordId(), (Long) 5L);
// New (reaped) one
// New (reaped) ones
assertTrue(readyOrInProcessingAfterReaping.get(4).getRecordId() > (Long) 6L);
assertTrue(readyOrInProcessingAfterReaping.get(5).getRecordId() > (Long) 6L);

// Check history table
final List<BusEventModelDao> historicalQueueEntries = Iterators.toUnmodifiableList(sqlDao.getHistoricalQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, config.getHistoryTableName()));
assertEquals(historicalQueueEntries.size(), 1);
assertEquals(historicalQueueEntries.size(), 2);
assertEquals(historicalQueueEntries.get(0).getProcessingState(), PersistentQueueEntryLifecycleState.REAPED);
assertEquals(historicalQueueEntries.get(0).getUserToken(), readyOrInProcessingAfterReaping.get(5).getUserToken());
assertEquals(historicalQueueEntries.get(0).getUserToken(), readyOrInProcessingAfterReaping.get(4).getUserToken());
assertEquals(historicalQueueEntries.get(1).getProcessingState(), PersistentQueueEntryLifecycleState.REAPED);
assertEquals(historicalQueueEntries.get(1).getUserToken(), readyOrInProcessingAfterReaping.get(5).getUserToken());
}

private BusEventModelDao createEntry(final long recordId,
Expand Down
20 changes: 15 additions & 5 deletions queue/src/test/java/org/killbill/queue/TestReaperIntegration.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.killbill.TestSetup;
import org.killbill.bus.DefaultPersistentBus;
import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.BusEventWithMetadata;
import org.killbill.bus.api.PersistentBus.EventBusException;
import org.killbill.bus.api.PersistentBusConfig;
import org.killbill.bus.dao.BusEventModelDao;
Expand All @@ -39,6 +40,7 @@
import org.killbill.commons.eventbus.Subscribe;
import org.killbill.queue.api.PersistentQueueEntryLifecycleState;
import org.skife.config.TimeSpan;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -164,9 +166,17 @@ public void testWithStuckEntryProcessedByThisNode() throws EventBusException, Js

// Trigger reaper
clock.addDeltaFromReality(config.getReapThreshold().getMillis());
// It's a no-op though (it won't reap itself)
handler.ensureNotSeen(event2);
handler.assertSeenEvents(2);
// Give a chance to the reaper to run
Thread.sleep(500);

clock.addDeltaFromReality(1000);
handler.waitFor(event2);

// See https://github.com/killbill/killbill-commons/issues/169
handler.assertSeenEvents(3);
final Iterable<BusEventWithMetadata<BusEvent>> result = bus.getHistoricalBusEventsForSearchKey2(now, SEARCH_KEY_2);
final long nbItems = result.spliterator().getExactSizeIfKnown();
assertEquals(nbItems, 4);
}


Expand Down Expand Up @@ -278,8 +288,8 @@ public DummyEvent(@JsonProperty("name") final String name,

public DummyEvent() {
this(UUID.randomUUID().toString(),
System.currentTimeMillis(),
System.currentTimeMillis(),
SEARCH_KEY_1,
SEARCH_KEY_2,
UUID.randomUUID());
}

Expand Down

0 comments on commit cb648bc

Please sign in to comment.