Skip to content

Commit

Permalink
Send a queue removed message to the queue manager (#5391)
Browse files Browse the repository at this point in the history
* Send a queue removed message to the queue manager when cleaning up the actor and data

* Add one more missing part

* Fix test cases
  • Loading branch information
style95 authored Apr 4, 2023
1 parent fedf022 commit cb3b64f
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -699,13 +699,17 @@ class MemoryQueue(private val etcdClient: EtcdClient,
cleanUpWatcher()
cleanUpData()

context.parent ! queueRemovedMsg

goto(Removed) using NoData()
}

private def cleanUpActorsAndGotoRemoved(data: FlushingData) = {
cleanUpActors(data)
cleanUpData()

context.parent ! queueRemovedMsg

goto(Removed) using NoData()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,13 +654,14 @@ class MemoryQueueTests
fsm ! StateTimeout
expectMsg(Transition(fsm, Idle, Removed))
queueRef.queue.length shouldBe 0
parent.expectMsg(queueRemovedMsg)

fsm ! message

// queue is timed out again in the Removed state.
parent.expectMsg(message)

fsm ! StateTimeout
parent.expectMsg(queueRemovedMsg)

expectNoMessage()

Expand Down Expand Up @@ -1103,10 +1104,7 @@ class MemoryQueueTests
fsm ! message
probe.expectMsg(ActivationResponse.developerError("nonExecutbleAction error"))

parent.expectMsgAnyOf(
2 * queueConfig.flushGrace + 5.seconds,
QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)),
Transition(fsm, Flushing, Removed))
parent.expectMsgAllOf(2 * queueConfig.flushGrace + 5.seconds, queueRemovedMsg, Transition(fsm, Flushing, Removed))

fsm ! StateTimeout
parent.expectMsg(queueRemovedMsg)
Expand Down Expand Up @@ -1409,10 +1407,7 @@ class MemoryQueueTests

val duration = FiniteDuration(queueConfig.maxBlackboxRetentionMs, MILLISECONDS) + queueConfig.flushGrace
probe.expectMsg(duration, ActivationResponse.whiskError("no available invokers"))
parent.expectMsgAnyOf(
duration,
QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)),
Transition(fsm, Flushing, Removed))
parent.expectMsgAllOf(duration, queueRemovedMsg, Transition(fsm, Flushing, Removed))
fsm ! QueueRemovedCompleted
parent.expectTerminated(fsm)

Expand Down

0 comments on commit cb3b64f

Please sign in to comment.