diff --git a/queue/src/main/java/org/killbill/queue/DBBackedQueue.java b/queue/src/main/java/org/killbill/queue/DBBackedQueue.java index afde91d4..2b779ea2 100644 --- a/queue/src/main/java/org/killbill/queue/DBBackedQueue.java +++ b/queue/src/main/java/org/killbill/queue/DBBackedQueue.java @@ -282,39 +282,28 @@ public Void inTransaction(final QueueSqlDao transactional, final TransactionS return null; } - final Collection entriesToMove = new ArrayList(entriesLeftBehind.size()); final List entriesToReInsert = new ArrayList(entriesLeftBehind.size()); - final List stuckEntries = new LinkedList(); final List lateEntries = new LinkedList(); for (final T entryLeftBehind : entriesLeftBehind) { // entryIsBeingProcessedByThisNode is a sign of a stuck entry on this node final boolean entryIsBeingProcessedByThisNode = owner.equals(entryLeftBehind.getProcessingOwner()); // entryCreatedByThisNodeAndNeverProcessed is likely a sign of the queue being late final boolean entryCreatedByThisNodeAndNeverProcessed = owner.equals(entryLeftBehind.getCreatingOwner()) && entryLeftBehind.getProcessingOwner() == null; - if (entryIsBeingProcessedByThisNode) { - // See https://github.com/killbill/killbill-commons/issues/47 - stuckEntries.add(entryLeftBehind); - } else if (entryCreatedByThisNodeAndNeverProcessed) { + if (entryCreatedByThisNodeAndNeverProcessed) { lateEntries.add(entryLeftBehind); - } else { - // Fields will be reset appropriately in insertReapedEntriesFromTransaction - entriesToReInsert.add(entryLeftBehind); - + } else { /* This includes entryIsBeingProcessedByThisNode. See https://github.com/killbill/killbill-commons/issues/169 */ // Set the status to REAPED in the history table entryLeftBehind.setProcessingState(PersistentQueueEntryLifecycleState.REAPED); - entriesToMove.add(entryLeftBehind); + entriesToReInsert.add(entryLeftBehind); } } - if (!stuckEntries.isEmpty()) { - log.warn("{} reapEntries: stuck queue entries {}", DB_QUEUE_LOG_ID, stuckEntries); - } if (!lateEntries.isEmpty()) { log.warn("{} reapEntries: late queue entries {}", DB_QUEUE_LOG_ID, lateEntries); } if (!entriesToReInsert.isEmpty()) { - moveEntriesToHistoryFromTransaction(transactional, entriesToMove); + moveEntriesToHistoryFromTransaction(transactional, entriesToReInsert); insertReapedEntriesFromTransaction(transactional, entriesToReInsert, now); log.warn("{} reapEntries: {} entries were reaped by {} {}", DB_QUEUE_LOG_ID, diff --git a/queue/src/test/java/org/killbill/queue/TestReaper.java b/queue/src/test/java/org/killbill/queue/TestReaper.java index 5bad29c9..1be72c40 100644 --- a/queue/src/test/java/org/killbill/queue/TestReaper.java +++ b/queue/src/test/java/org/killbill/queue/TestReaper.java @@ -102,9 +102,10 @@ public void testReapEntries() { queue.reapEntries(now.minus(config.getReapThreshold().getMillis()).toDate()); final List readyEntriesAfterReaping = sqlDao.getReadyEntries(now.toDate(), 10, CreatorName.get(), config.getTableName()); - assertEquals(readyEntriesAfterReaping.size(), 2); + assertEquals(readyEntriesAfterReaping.size(), 3); assertEquals(readyEntriesAfterReaping.get(0).getRecordId(), (Long) 1L); assertTrue(readyEntriesAfterReaping.get(1).getRecordId() > (Long) 6L); + assertTrue(readyEntriesAfterReaping.get(2).getRecordId() > (Long) 6L); final List readyOrInProcessingAfterReaping = Iterators.toUnmodifiableList(sqlDao.getReadyOrInProcessingQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, config.getTableName())); assertEquals(readyOrInProcessingAfterReaping.size(), 6); @@ -112,16 +113,17 @@ public void testReapEntries() { assertEquals(readyOrInProcessingAfterReaping.get(1).getRecordId(), (Long) 2L); assertEquals(readyOrInProcessingAfterReaping.get(2).getRecordId(), (Long) 3L); assertEquals(readyOrInProcessingAfterReaping.get(3).getRecordId(), (Long) 4L); - // That stuck entry hasn't moved (https://github.com/killbill/killbill-commons/issues/47) - assertEquals(readyOrInProcessingAfterReaping.get(4).getRecordId(), (Long) 5L); - // New (reaped) one + // New (reaped) ones + assertTrue(readyOrInProcessingAfterReaping.get(4).getRecordId() > (Long) 6L); assertTrue(readyOrInProcessingAfterReaping.get(5).getRecordId() > (Long) 6L); // Check history table final List historicalQueueEntries = Iterators.toUnmodifiableList(sqlDao.getHistoricalQueueEntriesForSearchKeys(SEARCH_KEY_1, SEARCH_KEY_2, config.getHistoryTableName())); - assertEquals(historicalQueueEntries.size(), 1); + assertEquals(historicalQueueEntries.size(), 2); assertEquals(historicalQueueEntries.get(0).getProcessingState(), PersistentQueueEntryLifecycleState.REAPED); - assertEquals(historicalQueueEntries.get(0).getUserToken(), readyOrInProcessingAfterReaping.get(5).getUserToken()); + assertEquals(historicalQueueEntries.get(0).getUserToken(), readyOrInProcessingAfterReaping.get(4).getUserToken()); + assertEquals(historicalQueueEntries.get(1).getProcessingState(), PersistentQueueEntryLifecycleState.REAPED); + assertEquals(historicalQueueEntries.get(1).getUserToken(), readyOrInProcessingAfterReaping.get(5).getUserToken()); } private BusEventModelDao createEntry(final long recordId,