diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala index bb8a1b572a0..c3e1ee21947 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala @@ -699,6 +699,8 @@ class MemoryQueue(private val etcdClient: EtcdClient, cleanUpWatcher() cleanUpData() + context.parent ! queueRemovedMsg + goto(Removed) using NoData() } @@ -706,6 +708,8 @@ class MemoryQueue(private val etcdClient: EtcdClient, cleanUpActors(data) cleanUpData() + context.parent ! queueRemovedMsg + goto(Removed) using NoData() } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala index c2c5ae34003..f9ff811159a 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala @@ -654,13 +654,14 @@ class MemoryQueueTests fsm ! StateTimeout expectMsg(Transition(fsm, Idle, Removed)) queueRef.queue.length shouldBe 0 + parent.expectMsg(queueRemovedMsg) + fsm ! message // queue is timed out again in the Removed state. parent.expectMsg(message) fsm ! StateTimeout - parent.expectMsg(queueRemovedMsg) expectNoMessage() @@ -1103,10 +1104,7 @@ class MemoryQueueTests fsm ! message probe.expectMsg(ActivationResponse.developerError("nonExecutbleAction error")) - parent.expectMsgAnyOf( - 2 * queueConfig.flushGrace + 5.seconds, - QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)), - Transition(fsm, Flushing, Removed)) + parent.expectMsgAllOf(2 * queueConfig.flushGrace + 5.seconds, queueRemovedMsg, Transition(fsm, Flushing, Removed)) fsm ! StateTimeout parent.expectMsg(queueRemovedMsg) @@ -1409,10 +1407,7 @@ class MemoryQueueTests val duration = FiniteDuration(queueConfig.maxBlackboxRetentionMs, MILLISECONDS) + queueConfig.flushGrace probe.expectMsg(duration, ActivationResponse.whiskError("no available invokers")) - parent.expectMsgAnyOf( - duration, - QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)), - Transition(fsm, Flushing, Removed)) + parent.expectMsgAllOf(duration, queueRemovedMsg, Transition(fsm, Flushing, Removed)) fsm ! QueueRemovedCompleted parent.expectTerminated(fsm)