Skip to content

Commit

Permalink
Handle container cleanup from ActivationClient shutdown gracefully (#…
Browse files Browse the repository at this point in the history
…5348)

* Fix the regression

* Apply scalaFmt

* Fix test cases

* Make the MemoryQueueTests stable

* Make the ActivationClientProxyTests stable
  • Loading branch information
style95 authored Nov 4, 2022
1 parent 077fb6d commit 44791f3
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.openwhisk.core.containerpool.v2

import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{ActorRef, ActorSystem, FSM, Props, Stash}
import akka.actor.{ActorSystem, FSM, Props, Stash}
import akka.grpc.internal.ClientClosedException
import akka.pattern.pipe
import io.grpc.StatusRuntimeException
Expand All @@ -36,7 +36,7 @@ import scala.concurrent.Future
import scala.util.{Success, Try}

// Event send by the actor
case class ClientCreationCompleted(client: Option[ActorRef] = None)
case object ClientCreationCompleted
case object ClientClosed

// Event received by the actor
Expand Down Expand Up @@ -91,12 +91,14 @@ class ActivationClientProxy(
stay using r

case Event(client: ActivationClient, _) =>
context.parent ! ClientCreationCompleted()
context.parent ! ClientCreationCompleted

goto(ClientProxyReady) using Client(client.client, client.rpcHost, client.rpcPort)

case Event(f: FailureMessage, _) =>
logging.error(this, s"failed to create grpc client for ${action} caused by: $f")
context.parent ! f

self ! ClientClosed

goto(ClientProxyRemoving)
Expand Down Expand Up @@ -164,9 +166,12 @@ class ActivationClientProxy(
stay()

case _: ActionMismatch =>
logging.error(this, s"[${containerId.asString}] action version does not match: $action")
val errorMsg = s"[${containerId.asString}] action version does not match: $action"
logging.error(this, errorMsg)
c.activationClient.close().andThen {
case _ => self ! ClientClosed
case _ =>
context.parent ! FailureMessage(new RuntimeException(errorMsg))
self ! ClientClosed
}

goto(ClientProxyRemoving)
Expand Down Expand Up @@ -194,6 +199,7 @@ class ActivationClientProxy(
// it would print huge log due to create another grpcClient to fetch activation again.
case t: StatusRuntimeException if t.getMessage.contains(ActivationClientProxy.hostResolveError) =>
logging.error(this, s"[${containerId.asString}] akka grpc server connection failed: $t")
context.parent ! FailureMessage(t)
self ! ClientClosed

goto(ClientProxyRemoving)
Expand All @@ -208,14 +214,18 @@ class ActivationClientProxy(

stay()

case _: ClientClosedException =>
case t: ClientClosedException =>
logging.error(this, s"[${containerId.asString}] grpc client is already closed for $action")
context.parent ! FailureMessage(t)

self ! ClientClosed

goto(ClientProxyRemoving)

case t: Throwable =>
logging.error(this, s"[${containerId.asString}] get activation from remote server error: $t")
context.parent ! FailureMessage(t)

safelyCloseClient(c)
goto(ClientProxyRemoving)
}
Expand Down Expand Up @@ -372,7 +382,7 @@ class ActivationClientProxy(
logging.debug(this, s"grpc client is closed for $fqn in the Try closure")
Future.successful(ClientClosed)
}
.getOrElse(Future.failed(new Exception(s"error to get $fqn activation from grpc server")))
.getOrElse(Future.failed(new RuntimeException(s"error to get $fqn activation from grpc server")))
}

private def createActivationClient(invocationNamespace: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ class FunctionPullingContainerProxy(
job.rpcPort,
container.containerId)) match {
case Success(clientProxy) =>
clientProxy ! StartClient
ContainerCreatedData(container, job.invocationNamespace, job.action)
InitializedData(container, job.invocationNamespace, job.action, clientProxy)
case Failure(t) =>
logging.error(this, s"failed to create activation client caused by: $t")
ClientCreationFailed(t, container, job.invocationNamespace, job.action)
Expand Down Expand Up @@ -303,7 +302,7 @@ class FunctionPullingContainerProxy(
// prewarmed state, container created
when(ContainerCreated) {
case Event(job: Initialize, data: PreWarmData) =>
Try(
val res = Try(
clientProxyFactory(
context,
job.invocationNamespace,
Expand All @@ -313,13 +312,15 @@ class FunctionPullingContainerProxy(
job.rpcPort,
data.container.containerId)) match {
case Success(proxy) =>
proxy ! StartClient
InitializedData(data.container, job.invocationNamespace, job.action, proxy)
case Failure(t) =>
logging.error(this, s"failed to create activation client for ${job.action} caused by: $t")
self ! ClientCreationFailed(t, data.container, job.invocationNamespace, job.action)
ClientCreationFailed(t, data.container, job.invocationNamespace, job.action)
}

goto(CreatingClient) using ContainerCreatedData(data.container, job.invocationNamespace, job.action)
self ! res

goto(CreatingClient)

case Event(Remove, data: PreWarmData) =>
cleanUp(data.container, None, false)
Expand All @@ -334,41 +335,27 @@ class FunctionPullingContainerProxy(

when(CreatingClient) {
// wait for client creation when cold start
case Event(job: ContainerCreatedData, _: NonexistentData) =>
stay() using job
case Event(job: InitializedData, _) =>
job.clientProxy ! StartClient

// wait for container creation when cold start
case Event(ClientCreationCompleted(proxy), _: NonexistentData) =>
akka.pattern.after(3.milliseconds, actorSystem.scheduler) {
self ! ClientCreationCompleted(proxy.orElse(Some(sender())))
Future.successful({})
}

stay()
stay() using job

// client was successfully obtained
case Event(ClientCreationCompleted(proxy), data: ContainerCreatedData) =>
val clientProxy = proxy.getOrElse(sender())
case Event(ClientCreationCompleted, data: InitializedData) =>
val fqn = data.action.fullyQualifiedName(true)
val revision = data.action.rev
dataManagementService ! RegisterData(
s"${ContainerKeys.existingContainers(data.invocationNamespace, fqn, revision, Some(instance), Some(data.container.containerId))}",
"")
self ! InitializedData(data.container, data.invocationNamespace, data.action, clientProxy)
self ! data
goto(ClientCreated)

// client creation failed
case Event(t: ClientCreationFailed, _) =>
invokerHealthManager ! HealthMessage(state = false)
cleanUp(t.container, t.invocationNamespace, t.action.fullyQualifiedName(withVersion = true), t.action.rev, None)

// there can be a case that client create is failed and a ClientClosed will be sent by ActivationClientProxy
// wait for container creation when cold start
case Event(ClientClosed, _: NonexistentData) =>
self ! ClientClosed
stay()

case Event(ClientClosed, data: ContainerCreatedData) =>
case Event(ClientClosed, data: InitializedData) =>
invokerHealthManager ! HealthMessage(state = false)
cleanUp(
data.container,
Expand All @@ -378,7 +365,7 @@ class FunctionPullingContainerProxy(
None)

// container creation failed when cold start
case Event(t: FailureMessage, _) =>
case Event(_: FailureMessage, _) =>
context.parent ! ContainerRemoved(true)
stop()

Expand Down Expand Up @@ -518,6 +505,8 @@ class FunctionPullingContainerProxy(
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
Some(data.clientProxy))

case x: Event if x.event != PingCache => delay
}

when(Running) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ case class HealthActivationServiceClient() extends Actor {
private var closed: Boolean = false

override def receive: Receive = {
case StartClient => sender() ! ClientCreationCompleted()
case StartClient => sender() ! ClientCreationCompleted
case _: RequestActivation =>
InvokerHealthManager.healthActivation match {
case Some(activation) if !closed =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ class SchedulingDecisionMaker(
this,
s"there is no capacity activations will be dropped or throttled, (availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit, namespaceContainers: ${existingContainerCountInNs}, namespaceInProgressContainer: ${inProgressContainerCountInNs}) [$invocationNamespace:$action]")
Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = totalContainers == 0), 0))
case NamespaceThrottled if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
case NamespaceThrottled
if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
Future.successful(DecisionResults(DisableNamespaceThrottling, 0))
// do nothing
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.openwhisk.core.containerpool.v2.test

import akka.Done
import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
import akka.actor.Status.Failure
import akka.actor.{ActorRef, ActorSystem}
import akka.grpc.internal.ClientClosedException
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
Expand Down Expand Up @@ -103,7 +104,7 @@ class ActivationClientProxyTests

machine ! StartClient

probe.expectMsg(ClientCreationCompleted())
probe.expectMsg(ClientCreationCompleted)
probe.expectMsg(Transition(machine, ClientProxyUninitialized, ClientProxyReady))
}

Expand All @@ -124,6 +125,9 @@ class ActivationClientProxyTests

machine ! StartClient

probe.expectMsgPF() {
case Failure(t) => t.getMessage shouldBe "The number of client creation retries has been exceeded."
}
probe.expectMsg(Transition(machine, ClientProxyUninitialized, ClientProxyRemoving))
probe.expectMsg(ClientClosed)

Expand Down Expand Up @@ -208,7 +212,14 @@ class ActivationClientProxyTests
ready(machine, probe)

machine ! RequestActivation()
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))

inAnyOrder {
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
probe.expectMsgPF() {
case Failure(t) => t.getMessage.contains(s"action version does not match") shouldBe true
}
}

probe.expectMsg(ClientClosed)

probe expectTerminated machine
Expand Down Expand Up @@ -319,7 +330,11 @@ class ActivationClientProxyTests
ready(machine, probe)

machine ! RequestActivation()
probe.expectMsgPF() {
case Failure(t) => t.isInstanceOf[ClientClosedException] shouldBe true
}
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))

probe.expectMsg(ClientClosed)

probe expectTerminated machine
Expand All @@ -343,6 +358,9 @@ class ActivationClientProxyTests
ready(machine, probe)

machine ! RequestActivation()
probe.expectMsgPF() {
case Failure(t) => t.getMessage.contains("Unknown exception") shouldBe true
}
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
probe.expectMsg(ClientClosed)

Expand Down Expand Up @@ -426,7 +444,7 @@ class ActivationClientProxyTests

def ready(machine: ActorRef, probe: TestProbe) = {
machine ! StartClient
probe.expectMsg(ClientCreationCompleted())
probe.expectMsg(ClientCreationCompleted)
probe.expectMsg(Transition(machine, ClientProxyUninitialized, ClientProxyReady))
}

Expand Down
Loading

0 comments on commit 44791f3

Please sign in to comment.