Skip to content

Commit

Permalink
queue: Allow stuck entries (same node) to be reaped. See #169
Browse files Browse the repository at this point in the history
  • Loading branch information
sbrossie committed Apr 12, 2024
1 parent f471f14 commit 91440a7
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 21 deletions.
19 changes: 4 additions & 15 deletions queue/src/main/java/org/killbill/queue/DBBackedQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,39 +282,28 @@ public Void inTransaction(final QueueSqlDao<T> transactional, final TransactionS
return null;
}

final Collection<T> entriesToMove = new ArrayList<T>(entriesLeftBehind.size());
final List<T> entriesToReInsert = new ArrayList<T>(entriesLeftBehind.size());
final List<T> stuckEntries = new LinkedList<T>();
final List<T> lateEntries = new LinkedList<T>();
for (final T entryLeftBehind : entriesLeftBehind) {
// entryIsBeingProcessedByThisNode is a sign of a stuck entry on this node
final boolean entryIsBeingProcessedByThisNode = owner.equals(entryLeftBehind.getProcessingOwner());
// entryCreatedByThisNodeAndNeverProcessed is likely a sign of the queue being late
final boolean entryCreatedByThisNodeAndNeverProcessed = owner.equals(entryLeftBehind.getCreatingOwner()) && entryLeftBehind.getProcessingOwner() == null;
if (entryIsBeingProcessedByThisNode) {
// See https://github.com/killbill/killbill-commons/issues/47
stuckEntries.add(entryLeftBehind);
} else if (entryCreatedByThisNodeAndNeverProcessed) {
if (entryCreatedByThisNodeAndNeverProcessed) {
lateEntries.add(entryLeftBehind);
} else {
// Fields will be reset appropriately in insertReapedEntriesFromTransaction
entriesToReInsert.add(entryLeftBehind);

} else { /* This includes entryIsBeingProcessedByThisNode. See https://github.com/killbill/killbill-commons/issues/169 */
// Set the status to REAPED in the history table
entryLeftBehind.setProcessingState(PersistentQueueEntryLifecycleState.REAPED);
entriesToMove.add(entryLeftBehind);
entriesToReInsert.add(entryLeftBehind);
}
}

if (!stuckEntries.isEmpty()) {
log.warn("{} reapEntries: stuck queue entries {}", DB_QUEUE_LOG_ID, stuckEntries);
}
if (!lateEntries.isEmpty()) {
log.warn("{} reapEntries: late queue entries {}", DB_QUEUE_LOG_ID, lateEntries);
}

if (!entriesToReInsert.isEmpty()) {
moveEntriesToHistoryFromTransaction(transactional, entriesToMove);
moveEntriesToHistoryFromTransaction(transactional, entriesToReInsert);
insertReapedEntriesFromTransaction(transactional, entriesToReInsert, now);
log.warn("{} reapEntries: {} entries were reaped by {} {}",
DB_QUEUE_LOG_ID,
Expand Down
14 changes: 8 additions & 6 deletions queue/src/test/java/org/killbill/queue/TestReaper.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,28 @@ public void testReapEntries() {
queue.reapEntries(now.minus(config.getReapThreshold().getMillis()).toDate());

final List<BusEventModelDao> readyEntriesAfterReaping = sqlDao.getReadyEntries(now.toDate(), 10, CreatorName.get(), config.getTableName());
assertEquals(readyEntriesAfterReaping.size(), 2);
assertEquals(readyEntriesAfterReaping.size(), 3);
assertEquals(readyEntriesAfterReaping.get(0).getRecordId(), (Long) 1L);
assertTrue(readyEntriesAfterReaping.get(1).getRecordId() > (Long) 6L);
assertTrue(readyEntriesAfterReaping.get(2).getRecordId() > (Long) 6L);

final List<BusEventModelDao> readyOrInProcessingAfterReaping = Iterators.toUnmodifiableList(sqlDao.getReadyOrInProcessingQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, config.getTableName()));
assertEquals(readyOrInProcessingAfterReaping.size(), 6);
assertEquals(readyOrInProcessingAfterReaping.get(0).getRecordId(), (Long) 1L);
assertEquals(readyOrInProcessingAfterReaping.get(1).getRecordId(), (Long) 2L);
assertEquals(readyOrInProcessingAfterReaping.get(2).getRecordId(), (Long) 3L);
assertEquals(readyOrInProcessingAfterReaping.get(3).getRecordId(), (Long) 4L);
// That stuck entry hasn't moved (https://github.com/killbill/killbill-commons/issues/47)
assertEquals(readyOrInProcessingAfterReaping.get(4).getRecordId(), (Long) 5L);
// New (reaped) one
// New (reaped) ones
assertTrue(readyOrInProcessingAfterReaping.get(4).getRecordId() > (Long) 6L);
assertTrue(readyOrInProcessingAfterReaping.get(5).getRecordId() > (Long) 6L);

// Check history table
final List<BusEventModelDao> historicalQueueEntries = Iterators.toUnmodifiableList(sqlDao.getHistoricalQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, config.getHistoryTableName()));
assertEquals(historicalQueueEntries.size(), 1);
assertEquals(historicalQueueEntries.size(), 2);
assertEquals(historicalQueueEntries.get(0).getProcessingState(), PersistentQueueEntryLifecycleState.REAPED);
assertEquals(historicalQueueEntries.get(0).getUserToken(), readyOrInProcessingAfterReaping.get(5).getUserToken());
assertEquals(historicalQueueEntries.get(0).getUserToken(), readyOrInProcessingAfterReaping.get(4).getUserToken());
assertEquals(historicalQueueEntries.get(1).getProcessingState(), PersistentQueueEntryLifecycleState.REAPED);
assertEquals(historicalQueueEntries.get(1).getUserToken(), readyOrInProcessingAfterReaping.get(5).getUserToken());
}

private BusEventModelDao createEntry(final long recordId,
Expand Down

0 comments on commit 91440a7

Please sign in to comment.