Delete ETCD data first when disabling the invoker (#5333)
* Delete ETCD data first when disabling the invoker

* Add the cluster name to controllers and invokers

* Handle unhandled message in the Removing state
style95 authored Oct 14, 2022
1 parent 236ca5e commit 145971b
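
The first and third bullets reorder the invoker's disable path: a warm FunctionPullingContainerProxy now unregisters its etcd key before it asks the client proxy to shut down, so the scheduler sees the capacity drop immediately and can provision replacement containers while in-flight activations drain. A minimal sketch of that ordering, using the message and key names from the diff below (the enclosing FSM state and actor wiring are assumed for illustration):

    // Sketch only: handler shape assumed; message/key names come from this commit's diff.
    case Event(GracefulShutdown, data: WarmData) =>
      // 1) Drop this container's etcd registration first, so the scheduler can
      //    provision replacement capacity in advance.
      dataManagementService ! UnregisterData(
        ContainerKeys.existingContainers(
          data.invocationNamespace,
          data.action.fullyQualifiedName(true),
          data.action.rev,
          Some(instance),
          Some(data.container.containerId)))
      // 2) Only then close the activation client; ClientClosed later moves the
      //    proxy to Removing, where it stops without replacing itself.
      data.clientProxy ! CloseClientProxy
      stay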
Showing 4 changed files with 120 additions and 19 deletions.
1 change: 1 addition & 0 deletions ansible/roles/controller/tasks/deploy.yml
@@ -154,6 +154,7 @@

 "CONFIG_whisk_info_date": "{{ whisk.version.date }}"
 "CONFIG_whisk_info_buildNo": "{{ docker.image.tag }}"
+"CONFIG_whisk_cluster_name": "{{ whisk.cluster_name | lower }}"

 "KAFKA_HOSTS": "{{ kafka_connect_string }}"
 "CONFIG_whisk_kafka_replicationFactor":
1 change: 1 addition & 0 deletions ansible/roles/invoker/tasks/deploy.yml
@@ -296,6 +296,7 @@
 "CONFIG_whisk_containerPool_batchDeletionSize": "{{ container_pool_batchDeletionSize | default(10) }}"
 "CONFIG_whisk_invoker_username": "{{ invoker.username }}"
 "CONFIG_whisk_invoker_password": "{{ invoker.password }}"
+"CONFIG_whisk_cluster_name": "{{ whisk.cluster_name | lower }}"

 - name: extend invoker dns env
   set_fact:
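
Both deploy tasks (controller and invoker) export the lower-cased cluster name as CONFIG_whisk_cluster_name, which OpenWhisk's CONFIG_ naming convention surfaces as the whisk.cluster.name config path. A minimal sketch of reading that key with plain Typesafe Config (the fallback value and loading site are assumptions for the sketch, not the actual OpenWhisk wiring):

    import com.typesafe.config.ConfigFactory

    // Sketch only: illustrates the env-var-to-config mapping, not OpenWhisk's own loader.
    val config = ConfigFactory.load()
    val clusterName: String =
      if (config.hasPath("whisk.cluster.name")) config.getString("whisk.cluster.name")
      else "default" // assumed fallback for this sketch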
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala

@@ -606,12 +606,7 @@ class FunctionPullingContainerProxy(
       if (runningActivations.isEmpty) {
         logging.info(this, s"The Client closed in state: $stateName, action: ${data.action}")
         // Stop ContainerProxy(ActivationClientProxy will stop also when send ClientClosed to ContainerProxy).
-        cleanUp(
-          data.container,
-          data.invocationNamespace,
-          data.action.fullyQualifiedName(withVersion = true),
-          data.action.rev,
-          None)
+        cleanUp(data.container, None, false)
       } else {
         logging.info(
           this,
@@ -624,6 +619,15 @@
       // ContainerProxy will be terminated by StateTimeout if there is no further activation
     case Event(GracefulShutdown, data: WarmData) =>
       logging.info(this, s"receive GracefulShutdown for action: ${data.action}")
+      // clean up the etcd data first so that the scheduler can provision more containers in advance.
+      dataManagementService ! UnregisterData(
+        ContainerKeys.existingContainers(
+          data.invocationNamespace,
+          data.action.fullyQualifiedName(true),
+          data.action.rev,
+          Some(instance),
+          Some(data.container.containerId)))
+
       // Just send CloseClientProxy to ActivationClientProxy, make ActivationClientProxy throw ClientClosedException when fetchActivation next time.
       data.clientProxy ! CloseClientProxy
       stay
@@ -765,10 +769,14 @@
     case Event(StateTimeout, _) =>
       logging.error(this, s"could not receive ClientClosed for ${unusedTimeout}, so just stop the container proxy.")

-      stop
+      stop()

+    case Event(Remove | GracefulShutdown, _) =>
+      stay()
+
+    case Event(DetermineKeepContainer(_), _) =>
+      stay()
   }

   whenUnhandled {
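
Because the etcd cleanup now happens while the proxy is still warm, Remove, GracefulShutdown, and DetermineKeepContainer can still arrive after the proxy has entered Removing; the new cases swallow them instead of letting them fall through to whenUnhandled. A consolidated sketch of the Removing state after this change (the ClientClosed case and the state declaration are assumed from the surrounding code and tests, not shown in the diff):

    // Sketch only: consolidates the Removing-state handlers shown in the diff above.
    when(Removing, stateTimeout = unusedTimeout) {
      case Event(ClientClosed, _) =>
        stop() // normal path: the activation client confirmed it is closed
      case Event(StateTimeout, _) =>
        // ClientClosed never arrived; give up and stop the proxy anyway.
        stop()
      case Event(Remove | GracefulShutdown, _) =>
        stay() // teardown is already in progress; duplicates are ignored
      case Event(DetermineKeepContainer(_), _) =>
        stay() // keep/pause decisions are moot while removing
    }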
tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala

@@ -17,22 +17,17 @@

 package org.apache.openwhisk.core.containerpool.v2.test

-import java.net.InetSocketAddress
-import java.time.Instant
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger

 import akka.actor.FSM.{CurrentState, StateTimeout, SubscribeTransitionCallBack, Transition}
 import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
 import akka.http.scaladsl.model
-import akka.io.Tcp.Connect
 import akka.stream.scaladsl.{Sink, Source}
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import akka.testkit.{ImplicitSender, TestFSMRef, TestKit, TestProbe}
 import akka.util.ByteString
 import com.ibm.etcd.api.{DeleteRangeResponse, KeyValue, PutResponse}
 import com.ibm.etcd.client.{EtcdClient => Client}
 import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction}
-import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
 import org.apache.openwhisk.core.ack.ActiveAck
 import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage}
 import org.apache.openwhisk.core.containerpool.logging.LogCollectingException
@@ -51,7 +46,7 @@ import org.apache.openwhisk.core.database.{ArtifactStore, StaleParameter, UserCo
 import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.entity.types.AuthStore
-import org.apache.openwhisk.core.entity.{ExecutableWhiskAction, _}
+import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.etcd.EtcdClient
 import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys
 import org.apache.openwhisk.core.etcd.EtcdType._
@@ -65,6 +60,10 @@ import org.scalatest.{Assertion, BeforeAndAfterAll, FlatSpecLike, Matchers}
 import spray.json.DefaultJsonProtocol._
 import spray.json.{JsObject, _}

+import java.net.InetSocketAddress
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.mutable
 import scala.collection.mutable.{Map => MutableMap}
 import scala.concurrent.duration._
@@ -291,8 +290,9 @@ class FunctionPullingContainerProxyTests
       Future.successful(count)
   }

-  def getLiveContainerCountFail(count: Long) = LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
-    Future.failed(new Exception("failure"))
+  def getLiveContainerCountFail(count: Long) = LoggedFunction {
+    (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
+      Future.failed(new Exception("failure"))
   }

   def getLiveContainerCountFailFirstCall(count: Long) = {
@@ -961,7 +961,7 @@ class FunctionPullingContainerProxyTests
     }
     client.send(machine, ClientClosed)

-    probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing))
+    probe.expectMsgAllOf(ContainerRemoved(false), Transition(machine, Running, Removing))

     awaitAssert {
       factory.calls should have size 1
@@ -1137,7 +1137,8 @@ class FunctionPullingContainerProxyTests
     }
   }

-  it should "destroy container proxy when stopping due to timeout and getting live count fails permanently" in within(timeout) {
+  it should "destroy container proxy when stopping due to timeout and getting live count fails permanently" in within(
+    timeout) {
     val authStore = mock[ArtifactWhiskAuthStore]
     val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
     val get = getWhiskAction(Future(action.toWhiskAction))
@@ -1532,6 +1533,96 @@ class FunctionPullingContainerProxyTests
     }
   }

+  it should "remove the ETCD data first when disabling the container proxy" in within(timeout) {
+    val authStore = mock[ArtifactWhiskAuthStore]
+    val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
+    val get = getWhiskAction(Future(action.toWhiskAction))
+    val dataManagementService = TestProbe()
+    val container = new TestContainer
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker()
+    val store = createStore
+    val collector = createCollector()
+    val counter = getLiveContainerCount(1)
+    val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
+    val (client, clientFactory) = testClient
+
+    val instanceId = InvokerInstanceId(0, userMemory = defaultUserMemory)
+    val probe = TestProbe()
+    val machine =
+      TestFSMRef(
+        new FunctionPullingContainerProxy(
+          factory,
+          entityStore,
+          namespaceBlacklist,
+          get,
+          dataManagementService.ref,
+          clientFactory,
+          acker,
+          store,
+          collector,
+          counter,
+          limit,
+          instanceId,
+          invokerHealthManager.ref,
+          poolConfig,
+          timeoutConfig,
+          healthchecksConfig(),
+          None),
+        probe.ref)
+
+    registerCallback(machine, probe)
+
+    machine ! Initialize(invocationNamespace.asString, fqn, action, schedulerHost, rpcPort, messageTransId)
+    probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
+    client.expectMsg(StartClient)
+    client.send(machine, ClientCreationCompleted())
+
+    val containerId = machine.underlyingActor.stateData.getContainer match {
+      case Some(container) => container.containerId
+      case None => ContainerId("")
+    }
+
+    dataManagementService.expectMsg(RegisterData(
+      s"${ContainerKeys.existingContainers(invocationNamespace.asString, fqn, action.rev, Some(instanceId), Some(containerId))}",
+      ""))
+
+    probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
+    expectInitialized(probe)
+    client.expectMsg(RequestActivation())
+    client.send(machine, message)
+
+    probe.expectMsg(Transition(machine, ClientCreated, Running))
+    client.expectMsg(ContainerWarmed)
+    client.expectMsgPF() {
+      case RequestActivation(Some(_), None) => true
+    }
+    client.send(machine, message)
+    client.expectMsgPF() {
+      case RequestActivation(Some(_), None) => true
+    }
+    machine ! GracefulShutdown
+
+    dataManagementService.expectMsg(
+      UnregisterData(ContainerKeys
+        .existingContainers(invocationNamespace.asString, fqn, action.rev, Some(instanceId), Some(containerId))))
+
+    client.expectMsg(CloseClientProxy)
+    client.send(machine, ClientClosed)
+
+    probe.expectMsgAllOf(ContainerRemoved(false), Transition(machine, Running, Removing))
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 2
+      collector.calls.length shouldBe 2
+      container.destroyCount shouldBe 1
+      acker.calls.length shouldBe 2
+      store.calls.length shouldBe 2
+    }
+  }
+
   it should "pause itself when timeout and recover when got a new Initialize" in within(timeout) {
     val authStore = mock[ArtifactWhiskAuthStore]
     val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
