diff --git a/ansible/group_vars/all b/ansible/group_vars/all index bac6e237280..072e4aebb15 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -112,6 +112,8 @@ controller: authentication: spi: "{{ controller_authentication_spi | default('') }}" loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}" + username: "{{ controller_username | default('controller.user') }}" + password: "{{ controller_password | default('controller.pass') }}" entitlement: spi: "{{ controller_entitlement_spi | default('') }}" protocol: "{{ controller_protocol | default('https') }}" @@ -126,6 +128,10 @@ controller: password: "openwhisk" name: "{{ __controller_ssl_keyPrefix }}openwhisk-keystore.p12" extraEnv: "{{ controller_extraEnv | default({}) }}" + deployment: + ignore_error: "{{ controller_deployment_ignore_error | default('False') }}" + retries: "{{ controller_deployment_retries | default(180) }}" + delay: "{{ controller_deployment_delay | default(5) }}" jmx: basePortController: 15000 @@ -234,6 +240,10 @@ invoker: creationMaxPeek: "{{ container_creation_max_peek | default(500) }}" reactiveSpi: "{{ invokerReactive_spi | default('') }}" serverSpi: "{{ invokerServer_spi | default('') }}" + deployment: + ignore_error: "{{ invoker_deployment_ignore_error | default('False') }}" + retries: "{{ invoker_deployment_retries | default(180) }}" + delay: "{{ invoker_deployment_delay | default(5) }}" userLogs: spi: "{{ userLogs_spi | default('org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider') }}" @@ -450,8 +460,7 @@ metrics: user_events: "{{ user_events_enabled | default(false) | lower }}" zeroDowntimeDeployment: - enabled: "{{ zerodowntime_deployment_switch | default(true) }}" - solution: "{{ zerodowntime_deployment_solution | default('apicall') }}" + enabled: "{{ zerodowntime_deployment_switch | default(false) }}" etcd: version: "{{ etcd_version | default('v3.4.0') }}" diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 26d241ce667..5c46392a364 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -155,6 +155,8 @@ "CONFIG_whisk_info_date": "{{ whisk.version.date }}" "CONFIG_whisk_info_buildNo": "{{ docker.image.tag }}" "CONFIG_whisk_cluster_name": "{{ whisk.cluster_name | lower }}" + "CONFIG_whisk_controller_username": "{{ controller.username }}" + "CONFIG_whisk_controller_password": "{{ controller.password }}" "KAFKA_HOSTS": "{{ kafka_connect_string }}" "CONFIG_whisk_kafka_replicationFactor": @@ -363,6 +365,53 @@ include_tasks: "lean.yml" when: lean +# Before redeploying a controller, remove that controller instance from nginx's upstream configuration +- name: remove the controller from nginx's upstream configuration + shell: docker exec -t nginx sh -c "sed -i \"s/ server {{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/ \#server {{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/g\" /etc/nginx/nginx.conf && nginx -s reload" + delegate_to: "{{ item }}" + with_items: "{{ groups['edge'] }}" + when: zeroDowntimeDeployment.enabled == true + +- name: wait some time for controllers to fire all existing triggers + shell: sleep 5s + when: zeroDowntimeDeployment.enabled == true + +- name: wait until {{ controller_name }} executes all existing activations + uri: + url: "{{ controller.protocol }}://{{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/activation/count" + validate_certs: no + client_key: "{{ 
controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}/{{ controller.ssl.key }}" + client_cert: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}/{{ controller.ssl.cert }}" + return_content: yes + user: "{{ controller.username }}" + password: "{{ controller.password }}" + force_basic_auth: yes + register: result + until: result.content == '0' + retries: "{{ controller.deployment.retries }}" + delay: "{{ controller.deployment.delay }}" + when: zeroDowntimeDeployment.enabled == true + ignore_errors: "{{ controller.deployment.ignore_error }}" + +- name: Disable {{ controller_name }} before removing the controller + uri: + url: "{{ controller.protocol }}://{{ ansible_host }}:{{ controller.basePort + groups['controllers'].index(inventory_hostname) }}/disable" + validate_certs: no + client_key: "{{ controller.confdir }}/{{ controller_name }}/{{ controller.ssl.key }}" + client_cert: "{{ controller.confdir }}/{{ controller_name }}/{{ controller.ssl.cert }}" + method: POST + status_code: 200 + user: "{{ controller.username }}" + password: "{{ controller.password }}" + force_basic_auth: yes + ignore_errors: "{{ controller.deployment.ignore_error }}" + when: zeroDowntimeDeployment.enabled == true + +- name: wait some time for the controller to gracefully shut down the consumer for activation acks + shell: sleep 5s + when: zeroDowntimeDeployment.enabled == true + - name: (re)start controller docker_container: name: "{{ controller_name }}" @@ -397,3 +446,37 @@ until: result.status == 200 retries: 12 delay: 10 + +- name: warm up activation path + uri: + url: + "{{controller.protocol}}://{{ lookup('file', '{{ catalog_auth_key }}')}}@{{ansible_host}}:{{controller_port}}/api/v1/namespaces/_/actions/invokerHealthTestAction{{controller_index}}?blocking=false&result=false" + validate_certs: "no" + client_key: + "{{ controller.confdir }}/{{ controller_name }}/{{ controller.ssl.key }}" + client_cert: + "{{ controller.confdir }}/{{ controller_name }}/{{ controller.ssl.cert }}" + method: POST + ignore_errors: True + +- name: wait for all invokers in {{ controller_name }} to come up + uri: + url: "{{ controller.protocol }}://{{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/invokers" + validate_certs: no + client_key: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}/{{ controller.ssl.key }}" + client_cert: "{{ controller.confdir }}/controller{{ groups['controllers'].index(inventory_hostname) }}/{{ controller.ssl.cert }}" + return_content: yes + register: invokerStatus + until: invokerStatus.json|length >= 1 and "unhealthy" not in invokerStatus.content + retries: 14 + delay: 5 + when: zeroDowntimeDeployment.enabled == true + +# Once all invokers have reported their status to the controller, add the controller instance back to nginx if at least one invoker is up +- name: Add the controller back to nginx's upstream configuration when there exists at least one healthy invoker + shell: docker exec -t nginx sh -c "sed -i \"s/ \#server {{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/ server {{ ansible_host }}:{{ controller.basePort + (controller_index | int) }}/g\" /etc/nginx/nginx.conf && nginx -s reload" + delegate_to: "{{ item }}" + with_items: "{{ groups['edge'] }}" + ignore_errors: True + when: zeroDowntimeDeployment.enabled == true and "up" in invokerStatus.content diff --git a/ansible/roles/invoker/tasks/clean.yml b/ansible/roles/invoker/tasks/clean.yml index 
8d7ebaca26f..b8f0b2f34c9 100644 --- a/ansible/roles/invoker/tasks/clean.yml +++ b/ansible/roles/invoker/tasks/clean.yml @@ -22,6 +22,37 @@ invoker_name: "{{ name_prefix ~ ((invoker_index_base | int) + host_group.index(inventory_hostname)) }}" invoker_index: "{{ (invoker_index_base | int) + host_group.index(inventory_hostname) }}" +- name: disable invoker{{ groups['invokers'].index(inventory_hostname) }} + uri: + url: "{{ invoker.protocol }}://{{ ansible_host }}:{{ invoker.port + groups['invokers'].index(inventory_hostname) }}/disable" + validate_certs: no + client_key: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.key }}" + client_cert: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.cert }}" + method: POST + status_code: 200 + user: "{{ invoker.username }}" + password: "{{ invoker.password }}" + force_basic_auth: yes + ignore_errors: "{{ invoker.deployment.ignore_error }}" + when: zeroDowntimeDeployment.enabled == true and enable_scheduler + +- name: wait invoker{{ groups['invokers'].index(inventory_hostname) }} to clean up all existing containers + uri: + url: "{{ invoker.protocol }}://{{ ansible_host }}:{{ invoker.port + groups['invokers'].index(inventory_hostname) }}/pool/count" + validate_certs: no + client_key: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.key }}" + client_cert: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.cert }}" + user: "{{ invoker.username }}" + password: "{{ invoker.password }}" + force_basic_auth: yes + return_content: yes + register: result + until: result.content == '0' + retries: "{{ invoker.deployment.retries }}" + delay: "{{ invoker.deployment.delay }}" + when: zeroDowntimeDeployment.enabled == true and enable_scheduler + ignore_errors: "{{ invoker.deployment.ignore_error }}" + - name: remove invoker docker_container: name: "{{ invoker_name }}" @@ -59,12 +90,14 @@ path: "{{ whisk_logs_dir }}/{{ invoker_name }}" state: absent become: "{{ logs.dir.become }}" + when: mode == "clean" - name: remove invoker conf directory file: path: "{{ invoker.confdir }}/{{ invoker_name }}" state: absent become: "{{ invoker.dir.become }}" + when: mode == "clean" # Workaround for orphaned ifstate.veth* files on Ubuntu 14.04 # See https://github.com/moby/moby/issues/22513 diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index 0223df97d24..62e21fb3487 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -17,6 +17,12 @@ --- # This role installs invokers. +### +# When the zero-downtime-deployment is enabled, clean.yml is used to gracefully shut down the invoker. 
+# +- import_tasks: clean.yml + when: zeroDowntimeDeployment.enabled == true and enable_scheduler + - import_tasks: docker_login.yml - name: get invoker name and index diff --git a/ansible/roles/schedulers/tasks/deploy.yml b/ansible/roles/schedulers/tasks/deploy.yml index d1ec6e4870c..280ea68b75e 100644 --- a/ansible/roles/schedulers/tasks/deploy.yml +++ b/ansible/roles/schedulers/tasks/deploy.yml @@ -280,11 +280,6 @@ include_tasks: "{{ item }}.yml" with_items: "{{ scheduler_plugins | default([]) }}" -- name: Judge current scheduler whether deployed - shell: echo $(docker ps | grep {{ scheduler_name }} | wc -l) - register: schedulerDeployed - when: zeroDowntimeDeployment.enabled == true - - name: disable scheduler{{ groups['schedulers'].index(inventory_hostname) }} before redeploy scheduler uri: url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port }}/disable" @@ -295,27 +290,23 @@ password: "{{ scheduler.password }}" force_basic_auth: yes ignore_errors: "{{ scheduler.deployment_ignore_error }}" - when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout != "0" + when: zeroDowntimeDeployment.enabled == true -- name: wait until all queue and create queue task is finished before redeploy scheduler when using apicall solution or half solution +- name: wait until all activation is finished before redeploy scheduler uri: - url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port }}/queue/total" + url: "{{ scheduler.protocol }}://{{ ansible_host }}:{{ scheduler_port }}/activation/count" validate_certs: no return_content: yes user: "{{ scheduler.username }}" password: "{{ scheduler.password }}" force_basic_auth: yes - register: totalQueue - until: totalQueue.content == "0" + register: result + until: result.content == "0" retries: 180 delay: 5 - when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout != "0" + when: zeroDowntimeDeployment.enabled == true ignore_errors: "{{ scheduler.deployment_ignore_error }}" -- name: wait until all queue and create queue task is finished before redeploy scheduler using sleep solution - shell: sleep 120s - when: zeroDowntimeDeployment.enabled == true and schedulerDeployed.stdout != "0" and zeroDowntimeDeployment.solution == 'sleep' - - name: (re)start scheduler docker_container: name: "{{ scheduler_name }}" diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2 index 03ade2b2d6e..e9dc2c2dd5b 100644 --- a/ansible/templates/whisk.properties.j2 +++ b/ansible/templates/whisk.properties.j2 @@ -53,11 +53,15 @@ edge.host.apiport=443 kafkaras.host.port={{ kafka.ras.port }} redis.host.port={{ redis.port }} invoker.hosts.basePort={{ invoker.port }} +invoker.username={{ invoker.username }} +invoker.password={{ invoker.password }} controller.hosts={{ groups["controllers"] | map('extract', hostvars, 'ansible_host') | list | join(",") }} controller.host.basePort={{ controller.basePort }} controller.instances={{ controller.instances }} controller.protocol={{ controller.protocol }} +controller.username={{ controller.username }} +controller.password={{ controller.password }} invoker.container.network=bridge invoker.container.policy={{ invoker_container_policy_name | default()}} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala index c029edb9db0..f9039e947a1 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala +++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala @@ -312,6 +312,9 @@ object ConfigKeys { val dataManagementServiceRetryInterval = "whisk.scheduler.data-management-service.retry-interval" + val whiskControllerUsername = "whisk.controller.username" + val whiskControllerPassword = "whisk.controller.password" + val whiskSchedulerUsername = "whisk.scheduler.username" val whiskSchedulerPassword = "whisk.scheduler.password" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala index b65b1149679..da0c4d1028d 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala @@ -532,9 +532,13 @@ object InvokerResourceMessage extends DefaultJsonProtocol { * ... * ] */ -object StatusQuery +object GetState -case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String) +case class StatusData(invocationNamespace: String, + fqn: String, + waitingActivation: List[ActivationId], + status: String, + data: String) extends Message { override def serialize: String = StatusData.serdes.write(this).compactPrint diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala index 2c544185edf..e5f3397da99 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala @@ -20,9 +20,10 @@ package org.apache.openwhisk.core.service import akka.actor.{Actor, ActorRef, ActorSystem, Props} import com.ibm.etcd.api.Event.EventType import com.ibm.etcd.client.kv.WatchUpdate -import org.apache.openwhisk.common.Logging +import org.apache.openwhisk.common.{GracefulShutdown, Logging} import org.apache.openwhisk.core.etcd.EtcdClient import org.apache.openwhisk.core.etcd.EtcdType._ + import scala.collection.JavaConverters._ import scala.collection.concurrent.TrieMap @@ -141,6 +142,13 @@ class WatcherService(etcdClient: EtcdClient)(implicit logging: Logging, actorSys // always send WatcherClosed back to sender if it need a feedback if (request.needFeedback) sender ! 
WatcherClosed(request.watchKey, request.isPrefix) + + case GracefulShutdown => + watcher.close() + putWatchers.clear() + deleteWatchers.clear() + prefixPutWatchers.clear() + prefixDeleteWatchers.clear() } } diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf index e7bf1a9566d..7467b2df1c5 100644 --- a/core/controller/src/main/resources/application.conf +++ b/core/controller/src/main/resources/application.conf @@ -122,4 +122,8 @@ whisk{ file-system : true dir-path : "/swagger-ui/" } + controller { + username: "controller.user" + password: "controller.pass" + } } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala index 0e662faa454..2ff1ecb5641 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala @@ -22,7 +22,8 @@ import akka.actor.{ActorSystem, CoordinatedShutdown} import akka.event.Logging.InfoLevel import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.model.StatusCodes._ -import akka.http.scaladsl.model.Uri +import akka.http.scaladsl.model.headers.BasicHttpCredentials +import akka.http.scaladsl.model.{StatusCodes, Uri} import akka.http.scaladsl.server.Route import kamon.Kamon import org.apache.openwhisk.common.Https.HttpsConfig @@ -36,6 +37,7 @@ import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator import org.apache.openwhisk.core.entity.ExecManifest.Runtimes import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.loadBalancer.LoadBalancerProvider +import org.apache.openwhisk.http.ErrorResponse.terminate import org.apache.openwhisk.http.{BasicHttpService, BasicRasService} import org.apache.openwhisk.spi.SpiLoader import pureconfig._ @@ -96,7 +98,7 @@ class Controller(val instance: ControllerInstanceId, (pathEndOrSingleSlash & get) { complete(info) } - } ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth + } ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ activationStatus ~ disable } // initialize datastores @@ -167,6 +169,22 @@ class Controller(val instance: ControllerInstanceId, } } + /** + * Handles GET /activation URI. 
+ * + * @return running activation + */ + protected[controller] val activationStatus = { + implicit val executionContext = actorSystem.dispatcher + (pathPrefix("activation") & get) { + pathEndOrSingleSlash { + complete(loadBalancer.activeActivationsByController.map(_.toJson)) + } ~ path("count") { + complete(loadBalancer.activeActivationsByController(controllerInstance.asString).map(_.toJson)) + } + } + } + // controller top level info private val info = Controller.info( whiskConfig, @@ -175,6 +193,30 @@ class Controller(val instance: ControllerInstanceId, LogLimit.config, runtimes, List(apiV1.basepath())) + + private val controllerUsername = loadConfigOrThrow[String](ConfigKeys.whiskControllerUsername) + private val controllerPassword = loadConfigOrThrow[String](ConfigKeys.whiskControllerPassword) + + /** + * disable controller + */ + private def disable(implicit transid: TransactionId) = { + implicit val executionContext = actorSystem.dispatcher + implicit val jsonPrettyResponsePrinter = PrettyPrinter + (path("disable") & post) { + extractCredentials { + case Some(BasicHttpCredentials(username, password)) => + if (username == controllerUsername && password == controllerPassword) { + loadBalancer.close + logging.warn(this, "controller is disabled") + complete("controller is disabled") + } else { + terminate(StatusCodes.Unauthorized, "username or password are wrong.") + } + case _ => terminate(StatusCodes.Unauthorized) + } + } + } } /** diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala index 045828d66cd..1a204cf42dd 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala @@ -86,8 +86,9 @@ abstract class CommonLoadBalancer(config: WhiskConfig, override def totalActiveActivations: Future[Int] = Future.successful(totalActivations.intValue) override def activeActivationsByController(controller: String): Future[Int] = Future.successful(activationsPerController.get(ControllerInstanceId(controller)).map(_.intValue()).getOrElse(0)) - override def activeActivationsByController: Future[List[ActivationId]] = - Future.successful(activationSlots.keySet.toList) + override def activeActivationsByController: Future[List[(String, String)]] = + Future.successful( + activationSlots.values.map(entry => (entry.id.asString, entry.fullyQualifiedEntityName.toString)).toList) override def activeActivationsByInvoker(invoker: String): Future[Int] = Future.successful( activationsPerInvoker.get(InvokerInstanceId(invoker.toInt, userMemory = 0.MB)).map(_.intValue()).getOrElse(0)) diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala index 3c247e0dd66..bf04d1d2a4d 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala @@ -663,8 +663,9 @@ class FPCPoolBalancer(config: WhiskConfig, Future.successful(activationsPerController.get(ControllerInstanceId(controller)).map(_.intValue()).getOrElse(0)) /** Gets the in-flight activations */ - override def activeActivationsByController: Future[List[ActivationId]] = - 
Future.successful(activationSlots.keySet.toList) + override def activeActivationsByController: Future[List[(String, String)]] = + Future.successful( + activationSlots.values.map(entry => (entry.id.asString, entry.fullyQualifiedEntityName.toString)).toList) /** Gets the number of in-flight activations for a specific invoker. */ override def activeActivationsByInvoker(invoker: String): Future[Int] = Future.successful(0) diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala index a4a0038bc04..d9a38c6d26f 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala @@ -59,7 +59,7 @@ trait LoadBalancer { def activeActivationsByController(controller: String): Future[Int] /** Gets the in-flight activations */ - def activeActivationsByController: Future[List[ActivationId]] + def activeActivationsByController: Future[List[(String, String)]] /** Gets the number of in-flight activations for a specific invoker. */ def activeActivationsByInvoker(invoker: String): Future[Int] diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala index 2c2392feb3e..f525f88de01 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala @@ -22,7 +22,7 @@ import akka.actor.{ActorRef, ActorSystem, FSM, Props, Stash} import akka.grpc.internal.ClientClosedException import akka.pattern.pipe import io.grpc.StatusRuntimeException -import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId} import org.apache.openwhisk.core.connector.ActivationMessage import org.apache.openwhisk.core.containerpool.ContainerId import org.apache.openwhisk.core.entity._ @@ -124,13 +124,13 @@ class ActivationClientProxy( stay() case Event(e: RescheduleActivation, client: Client) => - logging.info(this, s"got a reschedule message ${e.msg.activationId} for action: ${e.msg.action}") + logging.info(this, s"[${containerId.asString}] got a reschedule message ${e.msg.activationId} for action: ${e.msg.action}") client.activationClient .rescheduleActivation( RescheduleRequest(e.invocationNamespace, e.fqn.serialize, e.rev.serialize, e.msg.serialize)) .recover { case t => - logging.error(this, s"Failed to reschedule activation (error: $t)") + logging.error(this, s"[${containerId.asString}] Failed to reschedule activation (error: $t)") RescheduleResponse() } .foreach(res => { @@ -139,7 +139,7 @@ class ActivationClientProxy( stay() case Event(msg: ActivationMessage, _: Client) => - logging.debug(this, s"got a message ${msg.activationId} for action: ${msg.action}") + logging.debug(this, s"[${containerId.asString}] got a message ${msg.activationId} for action: ${msg.action}") context.parent ! msg stay() @@ -152,7 +152,7 @@ class ActivationClientProxy( case _: NoMemoryQueue => logging.error( this, - s"The queue of action ${action} under invocationNamespace ${invocationNamespace} does not exist. 
Check for queues in other schedulers.") + s"[${containerId.asString}] The queue of action ${action} under invocationNamespace ${invocationNamespace} does not exist. Check for queues in other schedulers.") c.activationClient .close() .flatMap(_ => @@ -162,7 +162,7 @@ class ActivationClientProxy( stay() case _: ActionMismatch => - logging.error(this, s"action version does not match: $action") + logging.error(this, s"[${containerId.asString}] action version does not match: $action") c.activationClient.close().andThen { case _ => self ! ClientClosed } @@ -170,7 +170,7 @@ class ActivationClientProxy( goto(ClientProxyRemoving) case _: NoActivationMessage => // retry - logging.debug(this, s"no activation message exist: $action") + logging.debug(this, s"[${containerId.asString}] no activation message exist: $action") context.parent ! RetryRequestActivation stay() @@ -182,7 +182,7 @@ class ActivationClientProxy( case Event(f: FailureMessage, c: Client) => f.cause match { case t: ParsingException => - logging.error(this, s"failed to parse activation message: $t") + logging.error(this, s"[${containerId.asString}] failed to parse activation message: $t") context.parent ! RetryRequestActivation stay() @@ -191,13 +191,13 @@ class ActivationClientProxy( // In such situation, it is better to stop the activationClientProxy, otherwise, in short time, // it would print huge log due to create another grpcClient to fetch activation again. case t: StatusRuntimeException if t.getMessage.contains(ActivationClientProxy.hostResolveError) => - logging.error(this, s"akka grpc server connection failed: $t") + logging.error(this, s"[${containerId.asString}] akka grpc server connection failed: $t") self ! ClientClosed goto(ClientProxyRemoving) case t: StatusRuntimeException => - logging.error(this, s"akka grpc server connection failed: $t") + logging.error(this, s"[${containerId.asString}] akka grpc server connection failed: $t") c.activationClient .close() .flatMap(_ => @@ -207,13 +207,13 @@ class ActivationClientProxy( stay() case _: ClientClosedException => - logging.error(this, s"grpc client is already closed for $action") + logging.error(this, s"[${containerId.asString}] grpc client is already closed for $action") self ! ClientClosed goto(ClientProxyRemoving) case t: Throwable => - logging.error(this, s"get activation from remote server error: $t") + logging.error(this, s"[${containerId.asString}] get activation from remote server error: $t") safelyCloseClient(c) goto(ClientProxyRemoving) } @@ -227,22 +227,12 @@ class ActivationClientProxy( } when(ClientProxyRemoving) { - case Event(request: RequestActivation, client: Client) => - request.newScheduler match { - // if scheduler is changed, client needs to be recreated - case Some(scheduler) if scheduler.host != client.rpcHost || scheduler.rpcPort != client.rpcPort => - val newHost = request.newScheduler.get.host - val newPort = request.newScheduler.get.rpcPort - client.activationClient - .close() - .flatMap(_ => - createActivationClient(invocationNamespace, action, newHost, newPort, tryOtherScheduler = false)) - .pipeTo(self) - case _ => - requestActivationMessage(invocationNamespace, action, rev, client.activationClient, request.lastDuration) - .pipeTo(self) - } + // This is the case where the last activation message is sent to the container proxy and container proxy requested + // another activation. But the activation client is being shut down and it no longer fetches any request. 
+ case Event(_: RequestActivation, c: Client) => + safelyCloseClient(c) + stay() case Event(msg: ActivationMessage, _: Client) => @@ -250,13 +240,14 @@ class ActivationClientProxy( stay() - case Event(_: MemoryQueueError, _: Client) => + case Event(_: MemoryQueueError, c: Client) => + safelyCloseClient(c) self ! ClientClosed stay() case Event(f: FailureMessage, c: Client) => - logging.error(this, s"some error happened for action: ${action} in state: $stateName, caused by: $f") + logging.error(this, s"[${containerId.asString}] some error happened for action: ${action} in state: $stateName, caused by: $f") safelyCloseClient(c) stay() @@ -279,16 +270,24 @@ class ActivationClientProxy( warmed = true stay - case Event(CloseClientProxy, c: Client) => - safelyCloseClient(c) + // When disabling an invoker, there could still be activations in the queue. + // The activation client keeps fetching data and will forward it to the container(parent). + // Once it receives `NoActivationMessage` from the queue, it will close the activation client and send `ClientClosed` + // to the container(parent), rather than sending `RetryRequestActivation`. + // When a container proxy(parent) receives `ClientClosed`, it will finally shut down. + case Event(GracefulShutdown, _: Client) => + logging.info(this, s"[${containerId.asString}] safely close client proxy and go to the ClientProxyRemoving state") + goto(ClientProxyRemoving) case Event(ClientClosed, _) => + logging.info(this, s"[${containerId.asString}] the underlying client is closed, stopping the activation client proxy") context.parent ! ClientClosed stop() case Event(StopClientProxy, c: Client) => + logging.info(this, s"[${containerId.asString}] stop close client proxy and go to the ClientProxyRemoving state") safelyCloseClient(c) stay() } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala index 5b0c283c32e..2a89630a550 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala @@ -18,14 +18,15 @@ package org.apache.openwhisk.core.containerpool.v2 import java.util.concurrent.atomic.AtomicInteger -import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props} +import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props} import org.apache.openwhisk.common._ import org.apache.openwhisk.core.connector.ContainerCreationError._ import org.apache.openwhisk.core.connector.{ ContainerCreationAckMessage, ContainerCreationMessage, ContainerDeletionMessage, + GetState, ResultMetadata } import org.apache.openwhisk.core.containerpool.{ @@ -41,6 +42,7 @@ import org.apache.openwhisk.core.containerpool.{ import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.http.Messages +import spray.json.DefaultJsonProtocol import scala.annotation.tailrec import scala.collection.concurrent.TrieMap @@ -50,6 +52,27 @@ import scala.concurrent.duration._ import scala.util.{Random, Try} import scala.collection.immutable.Queue +object TotalContainerPoolState extends DefaultJsonProtocol { + implicit val prewarmedPoolSerdes = jsonFormat2(PrewarmedContainerPoolState.apply) + implicit val warmPoolSerdes = jsonFormat2(WarmContainerPoolState.apply) + implicit val 
totalPoolSerdes = jsonFormat5(TotalContainerPoolState.apply) +} + +case class PrewarmedContainerPoolState(total: Int, countsByKind: Map[String, Int]) +case class WarmContainerPoolState(total: Int, containers: List[BasicContainerInfo]) +case class TotalContainerPoolState(totalContainers: Int, + inProgressCount: Int, + prewarmedPool: PrewarmedContainerPoolState, + busyPool: WarmContainerPoolState, + pausedPool: WarmContainerPoolState) { + + def serialize(): String = TotalContainerPoolState.totalPoolSerdes.write(this).compactPrint +} + +case class NotSupportedPoolState() { + def serialize(): String = "not supported" +} + case class CreationContainer(creationMessage: ContainerCreationMessage, action: WhiskAction) case class DeletionContainer(deletionMessage: ContainerDeletionMessage) case object Remove @@ -88,7 +111,7 @@ class FunctionPullingContainerPool( implicit val ec = context.system.dispatcher - protected[containerpool] var busyPool = immutable.Map.empty[ActorRef, Data] + protected[containerpool] var busyPool = immutable.Map.empty[ActorRef, ContainerAvailableData] protected[containerpool] var inProgressPool = immutable.Map.empty[ActorRef, Data] protected[containerpool] var warmedPool = immutable.Map.empty[ActorRef, WarmData] protected[containerpool] var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData] @@ -413,6 +436,15 @@ class FunctionPullingContainerPool( // Reset the prewarmCreateCount value when do expiration check and backfill prewarm if possible prewarmCreateFailedCount.set(0) adjustPrewarmedContainer(false, true) + + case GetState => + val totalContainers = busyPool.size + inProgressPool.size + warmedPool.size + prewarmedPool.size + val prewarmedState = + PrewarmedContainerPoolState(prewarmedPool.size, prewarmedPool.groupBy(_._2.kind).mapValues(_.size).toMap) + val busyState = WarmContainerPoolState(busyPool.size, busyPool.values.map(_.basicContainerInfo).toList) + val pausedState = WarmContainerPoolState(warmedPool.size, warmedPool.values.map(_.basicContainerInfo).toList) + sender() ! 
TotalContainerPoolState(totalContainers, inProgressPool.size, prewarmedState, busyState, pausedState) + } /** Install prewarm containers up to the configured requirements for each kind/memory combination or specified kind/memory */ diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala index 2049dc2d104..28d3e1e1cd1 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala @@ -127,16 +127,30 @@ case class PreWarmData(container: Container, def isExpired(): Boolean = expires.exists(_.isOverdue()) } -case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction) +object BasicContainerInfo extends DefaultJsonProtocol { + implicit val prewarmedPoolSerdes = jsonFormat4(BasicContainerInfo.apply) +} + +sealed case class BasicContainerInfo(containerId: String, namespace: String, action: String, kind: String) + +sealed abstract class ContainerAvailableData(container: Container, + invocationNamespace: String, + action: ExecutableWhiskAction) extends Data(action.limits.memory.megabytes.MB) { override def getContainer = Some(container) + + val basicContainerInfo = + BasicContainerInfo(container.containerId.asString, invocationNamespace, action.name.asString, action.exec.kind) } +case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction) + extends ContainerAvailableData(container, invocationNamespace, action) + case class InitializedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction, override val clientProxy: ActorRef) - extends Data(action.limits.memory.megabytes.MB) + extends ContainerAvailableData(container, invocationNamespace, action) with WithClient { override def getContainer = Some(container) def toReschedulingData(resumeRun: RunActivation) = @@ -149,7 +163,7 @@ case class WarmData(container: Container, revision: DocRevision, lastUsed: Instant, override val clientProxy: ActorRef) - extends Data(action.limits.memory.megabytes.MB) + extends ContainerAvailableData(container, invocationNamespace, action) with WithClient { override def getContainer = Some(container) def toReschedulingData(resumeRun: RunActivation) = @@ -159,12 +173,10 @@ case class WarmData(container: Container, case class ReschedulingData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction, - override val clientProxy: ActorRef, + clientProxy: ActorRef, resumeRun: RunActivation) - extends Data(action.limits.memory.megabytes.MB) - with WithClient { - override def getContainer = Some(container) -} + extends ContainerAvailableData(container, invocationNamespace, action) + with WithClient class FunctionPullingContainerProxy( factory: (TransactionId, @@ -628,8 +640,8 @@ class FunctionPullingContainerProxy( Some(instance), Some(data.container.containerId))) - // Just send CloseClientProxy to ActivationClientProxy, make ActivationClientProxy throw ClientClosedException when fetchActivation next time. - data.clientProxy ! CloseClientProxy + // Just send GracefulShutdown to ActivationClientProxy, make ActivationClientProxy throw ClientClosedException when fetchActivation next time. + data.clientProxy ! 
GracefulShutdown stay case x: Event if x.event != PingCache => delay @@ -774,7 +786,6 @@ class FunctionPullingContainerProxy( case Event(Remove | GracefulShutdown, _) => stay() - case Event(DetermineKeepContainer(_), _) => stay() } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala index 6665e6373ad..b0a4ad80eeb 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala @@ -262,7 +262,7 @@ case class HealthActivationServiceClient() extends Actor { case _ => // do nothing } - case CloseClientProxy => + case GracefulShutdown => closed = true } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala index f3503b55e37..1321d8add18 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala @@ -46,11 +46,11 @@ class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, sys super.routes ~ extractCredentials { case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword => (path("enable") & post) { - invoker.enable() + complete(invoker.enable()) } ~ (path("disable") & post) { - invoker.disable() + complete(invoker.disable()) } ~ (path("isEnabled") & get) { - invoker.isEnabled() + complete(invoker.isEnabled()) } case _ => terminate(StatusCodes.Unauthorized) } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala index 67660e8a608..dd6198a34a6 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala @@ -20,8 +20,8 @@ package org.apache.openwhisk.core.invoker import akka.Done import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props} import akka.grpc.GrpcClientSettings -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.Route +import akka.pattern.ask +import akka.util.Timeout import com.ibm.etcd.api.Event.EventType import com.ibm.etcd.client.kv.KvClient.Watch import com.ibm.etcd.client.kv.WatchUpdate @@ -31,7 +31,7 @@ import org.apache.openwhisk.core.connector._ import org.apache.openwhisk.core.containerpool._ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider import org.apache.openwhisk.core.containerpool.v2._ -import org.apache.openwhisk.core.database.{UserContext, _} +import org.apache.openwhisk.core.database._ import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue @@ -373,28 +373,33 @@ class FPCInvokerReactive(config: WhiskConfig, maxPeek, sendAckToScheduler)) - override def enable(): Route = { + override def enable(): String = { invokerHealthManager ! Enable pool ! Enable warmUp() - complete("Success enable invoker") + s"${instance.toString} is now enabled." 
} - override def disable(): Route = { + override def disable(): String = { invokerHealthManager ! GracefulShutdown pool ! GracefulShutdown warmUpWatcher.foreach(_.close()) warmUpWatcher = None - complete("Successfully disabled invoker") + s"${instance.toString} is now disabled." } - override def isEnabled(): Route = { - complete(InvokerEnabled(warmUpWatcher.nonEmpty).serialize()) + override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = { + implicit val timeout: Timeout = 5.seconds + (pool ? GetState).mapTo[TotalContainerPoolState].map(Right(_)) } - override def backfillPrewarm(): Route = { + override def isEnabled(): String = { + InvokerEnabled(warmUpWatcher.nonEmpty).serialize() + } + + override def backfillPrewarm(): String = { pool ! AdjustPrewarmedContainer - complete("backfilling prewarm container") + "backfilling prewarm container" } private val warmUpFetchRequest = FetchRequest( diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala index 61d194f4b04..88c8b40766c 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala @@ -27,6 +27,8 @@ import org.apache.openwhisk.http.BasicRasService import org.apache.openwhisk.http.ErrorResponse.terminate import pureconfig.loadConfigOrThrow import spray.json.PrettyPrinter +import spray.json.DefaultJsonProtocol._ +import spray.json._ import scala.concurrent.ExecutionContext @@ -46,11 +48,31 @@ class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemP super.routes ~ extractCredentials { case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword => (path("enable") & post) { - invoker.enable() + complete(invoker.enable()) } ~ (path("disable") & post) { - invoker.disable() + complete(invoker.disable()) } ~ (path("isEnabled") & get) { - invoker.isEnabled() + complete(invoker.isEnabled()) + } ~ (pathPrefix("pool") & get) { + pathEndOrSingleSlash { + complete { + invoker.getPoolState().map { + case Right(poolState) => + poolState.serialize() + case Left(value) => + value.serialize() + } + } + } ~ (path("count") & get) { + complete { + invoker.getPoolState().map { + case Right(poolState) => + (poolState.busyPool.total + poolState.pausedPool.total + poolState.inProgressCount).toJson.compactPrint + case Left(value) => + value.serialize() + } + } + } } case _ => terminate(StatusCodes.Unauthorized) } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 997825865d5..8319c511b17 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -19,13 +19,13 @@ package org.apache.openwhisk.core.invoker import akka.Done import akka.actor.{ActorSystem, CoordinatedShutdown} -import akka.http.scaladsl.server.Route import com.typesafe.config.ConfigValueFactory import kamon.Kamon import org.apache.openwhisk.common.Https.HttpsConfig import org.apache.openwhisk.common._ import org.apache.openwhisk.core.WhiskConfig._ import org.apache.openwhisk.core.connector.{MessageProducer, MessagingProvider} +import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, 
TotalContainerPoolState} import org.apache.openwhisk.core.containerpool.{Container, ContainerPoolConfig} import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ @@ -246,10 +246,11 @@ trait InvokerProvider extends Spi { // this trait can be used to add common implementation trait InvokerCore { - def enable(): Route - def disable(): Route - def isEnabled(): Route - def backfillPrewarm(): Route + def enable(): String + def disable(): String + def isEnabled(): String + def backfillPrewarm(): String + def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] } /** diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index 31d1f22930c..d7aae4a3c29 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -19,20 +19,19 @@ package org.apache.openwhisk.core.invoker import java.nio.charset.StandardCharsets import java.time.Instant + import akka.Done import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props} import akka.event.Logging.InfoLevel -import akka.http.scaladsl.server.Directives.complete -import akka.http.scaladsl.server.Route import org.apache.openwhisk.common._ import org.apache.openwhisk.common.tracing.WhiskTracerProvider import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender} import org.apache.openwhisk.core.connector._ import org.apache.openwhisk.core.containerpool._ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider -import org.apache.openwhisk.core.database.{UserContext, _} +import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState} +import org.apache.openwhisk.core.database._ import org.apache.openwhisk.core.entity._ -import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.http.Messages @@ -304,32 +303,36 @@ class InvokerReactive( private var healthScheduler: Option[ActorRef] = Some(getHealthScheduler) - override def enable(): Route = { + override def enable(): String = { if (healthScheduler.isEmpty) { healthScheduler = Some(getHealthScheduler) - complete(s"${instance.toString} is now enabled.") + s"${instance.toString} is now enabled." } else { - complete(s"${instance.toString} is already enabled.") + s"${instance.toString} is already enabled." } } - override def disable(): Route = { + override def disable(): String = { pingController(isEnabled = false) if (healthScheduler.nonEmpty) { actorSystem.stop(healthScheduler.get) healthScheduler = None - complete(s"${instance.toString} is now disabled.") + s"${instance.toString} is now disabled." } else { - complete(s"${instance.toString} is already disabled.") + s"${instance.toString} is already disabled." 
} } - override def isEnabled(): Route = { - complete(InvokerEnabled(healthScheduler.nonEmpty).serialize()) + override def isEnabled(): String = { + InvokerEnabled(healthScheduler.nonEmpty).serialize() + } + + override def backfillPrewarm(): String = { + "not supported" } - override def backfillPrewarm(): Route = { - complete("not supported") + override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = { + Future.successful(Left(NotSupportedPoolState())) } } diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala index 4fd2f9b691e..aec923508e3 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala @@ -49,23 +49,29 @@ class FPCSchedulerServer(scheduler: SchedulerCore, systemUsername: String, syste complete { scheduler.getState.map { case (list, creationCount) => - (list - .map(scheduler => scheduler._1.asString -> scheduler._2.toString) - .toMap - ++ Map("creationCount" -> creationCount.toString)).toJson.asJsObject + val sum = list.map(tuple => tuple._2).sum + (Map("queue" -> sum.toString) ++ Map("creationCount" -> creationCount.toString)).toJson } } } ~ (path("disable") & post) { logger.warn(this, "Scheduler is disabled") scheduler.disable() complete("scheduler disabled") - } ~ (path(FPCSchedulerServer.queuePathPrefix / "total") & get) { - complete { - scheduler.getQueueSize.map(_.toString) + } ~ (pathPrefix(FPCSchedulerServer.queuePathPrefix) & get) { + pathEndOrSingleSlash { + complete(scheduler.getQueueStatusData.map(s => s.toJson)) + } ~ (path("count") & get) { + complete(scheduler.getQueueSize.map(s => s.toJson)) } - } ~ (path(FPCSchedulerServer.queuePathPrefix / "status") & get) { - complete { - scheduler.getQueueStatusData.map(s => s.toJson) + } ~ (path("activation" / "count") & get) { + pathEndOrSingleSlash { + complete( + scheduler.getQueueStatusData + .map { s => + s.map(_.waitingActivation.size) + } + .map(a => a.sum) + .map(_.toJson)) } } case _ => @@ -79,7 +85,7 @@ object FPCSchedulerServer { private val schedulerUsername = loadConfigOrThrow[String](ConfigKeys.whiskSchedulerUsername) private val schedulerPassword = loadConfigOrThrow[String](ConfigKeys.whiskSchedulerPassword) - private val queuePathPrefix = "queue" + private val queuePathPrefix = "queues" def instance(scheduler: SchedulerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala index 2038fc1c350..e3ed70d8a0e 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala @@ -127,11 +127,11 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE } override def getQueueSize: Future[Int] = { - queueManager.ask(QueueSize)(Timeout(5.seconds)).mapTo[Int] + queueManager.ask(QueueSize)(Timeout(1.minute)).mapTo[Int] } override def getQueueStatusData: Future[List[StatusData]] = { - queueManager.ask(StatusQuery)(Timeout(5.seconds)).mapTo[Future[List[StatusData]]].flatten + queueManager.ask(GetState)(Timeout(1.minute)).mapTo[Future[List[StatusData]]].flatten } override def disable(): 
Unit = { diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala index acf311e9f2c..8a3db898a5b 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala @@ -72,6 +72,8 @@ class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Loggin QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match { case Some(queueValue) => implicit val transid = TransactionId.serdes.read(request.transactionId.parseJson) + if (!request.alive) logging.info(this, s"the container(${request.containerId}) is not alive") + (queueValue.queue ? GetActivation( transid, fqn, diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala index 7f174f9aa9b..28ab22210c6 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala @@ -46,6 +46,7 @@ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcu import pureconfig.loadConfigOrThrow import spray.json._ import pureconfig.generic.auto._ +import scala.collection.JavaConverters._ import java.time.{Duration, Instant} import java.util.concurrent.atomic.AtomicInteger @@ -69,17 +70,30 @@ case object NamespaceThrottled extends MemoryQueueState // Data sealed abstract class MemoryQueueData() -case class NoData() extends MemoryQueueData() -case class NoActors() extends MemoryQueueData() -case class RunningData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData() -case class ThrottledData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData() +case class NoData() extends MemoryQueueData() { + override def toString = "NoData" +} +case class NoActors() extends MemoryQueueData() { + override def toString = "NoActors" +} +case class RunningData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData() { + override def toString = "RunningData" +} +case class ThrottledData(schedulerActor: ActorRef, droppingActor: ActorRef) extends MemoryQueueData() { + override def toString = "ThrottledData" +} case class FlushingData(schedulerActor: ActorRef, droppingActor: ActorRef, error: ContainerCreationError, reason: String, activeDuringFlush: Boolean = false) - extends MemoryQueueData() -case class RemovingData(schedulerActor: ActorRef, droppingActor: ActorRef, outdated: Boolean) extends MemoryQueueData() + extends MemoryQueueData() { + override def toString = s"ThrottledData(error: $error, reason: $reason, activeDuringFlush: $activeDuringFlush)" +} +case class RemovingData(schedulerActor: ActorRef, droppingActor: ActorRef, outdated: Boolean) + extends MemoryQueueData() { + override def toString = s"RemovingData(outdated: $outdated)" +} // Events sent by the actor case class QueueRemoved(invocationNamespace: String, action: DocInfo, leaderKey: Option[String]) @@ -154,8 +168,8 @@ class MemoryQueue(private val etcdClient: EtcdClient, private val staleQueueRemovedMsg = QueueRemoved(invocationNamespace, action.toDocId.asDocInfo(revision), None) private val actionRetentionTimeout = 
MemoryQueue.getRetentionTimeout(actionMetaData, queueConfig) - private[queue] var containers = Set.empty[String] - private[queue] var creationIds = Set.empty[String] + private[queue] var containers = java.util.concurrent.ConcurrentHashMap.newKeySet[String]().asScala + private[queue] var creationIds = java.util.concurrent.ConcurrentHashMap.newKeySet[String]().asScala private[queue] var queue = Queue.empty[TimeSeriesActivationEntry] private[queue] var in = new AtomicInteger(0) @@ -565,8 +579,13 @@ class MemoryQueue(private val etcdClient: EtcdClient, stay // common case for all statuses - case Event(StatusQuery, _) => - sender ! StatusData(invocationNamespace, action.asString, queue.size, stateName.toString, stateData.toString) + case Event(GetState, _) => + sender ! StatusData( + invocationNamespace, + action.asString, + queue.toList.map(_.msg.activationId), + stateName.toString, + stateData.toString) stay // Common case for all cases @@ -575,9 +594,6 @@ class MemoryQueue(private val etcdClient: EtcdClient, // delete relative data, e.g leaderKey, namespaceThrottlingKey, actionThrottlingKey cleanUpData() - // let queue manager knows this queue is going to stop and let it forward incoming activations to a new queue - context.parent ! queueRemovedMsg - goto(Removing) using getRemovingData(data, outdated = false) // the version is updated. it's a shared case for all states @@ -662,7 +678,6 @@ class MemoryQueue(private val etcdClient: EtcdClient, private def cleanUpDataAndGotoRemoved() = { cleanUpWatcher() cleanUpData() - context.parent ! queueRemovedMsg goto(Removed) using NoData() } @@ -671,8 +686,6 @@ class MemoryQueue(private val etcdClient: EtcdClient, cleanUpActors(data) cleanUpData() - context.parent ! queueRemovedMsg - goto(Removed) using NoData() } @@ -688,7 +701,6 @@ class MemoryQueue(private val etcdClient: EtcdClient, // let the container manager know this version of containers are outdated. containerManager ! ContainerDeletion(invocationNamespace, action, revision, actionMetaData) } - self ! 
QueueRemovedCompleted goto(Removed) using NoData() } else { @@ -1017,7 +1029,8 @@ class MemoryQueue(private val etcdClient: EtcdClient, queue = newQueue logging.info( this, - s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}") + s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}")( + msg.transid) val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration MetricEmitter.emitHistogramMetric( LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), @@ -1051,7 +1064,8 @@ class MemoryQueue(private val etcdClient: EtcdClient, case Right(msg) => logging.info( this, - s"[$invocationNamespace:$action:$stateName] Send msg ${msg.activationId} to waiting request ${request.containerId}") + s"[$invocationNamespace:$action:$stateName] Send msg ${msg.activationId} to waiting request ${request.containerId}")( + msg.transid) cancelPoll.cancel() case Left(_) => // do nothing } diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala index d87338dd335..ad7b17103f1 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala @@ -261,11 +261,11 @@ class QueueManager( case QueueSize => sender ! QueuePool.size - case StatusQuery => - val poolStatus = Future.sequence { - QueuePool.values.map(_.queue.ask(StatusQuery).mapTo[StatusData]) - } - sender ! poolStatus + case GetState => + val result = + Future.sequence(QueuePool.values.map(_.queue.ask(GetState)(Timeout(5.seconds)).mapTo[StatusData]).toList) + + sender ! 
result case msg => logging.error(this, s"failed to elect a leader for ${msg}") diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala index 77cba01e710..9d33a508b45 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala @@ -24,7 +24,7 @@ import akka.grpc.internal.ClientClosedException import akka.testkit.{ImplicitSender, TestKit, TestProbe} import common.StreamLogging import io.grpc.StatusRuntimeException -import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.common.{GracefulShutdown, TransactionId} import org.apache.openwhisk.core.connector.ActivationMessage import org.apache.openwhisk.core.containerpool.ContainerId import org.apache.openwhisk.core.containerpool.v2._ @@ -37,9 +37,9 @@ import org.apache.openwhisk.grpc import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest, RescheduleRequest, RescheduleResponse} import org.junit.runner.RunWith import org.scalamock.scalatest.MockFactory +import org.scalatest.concurrent.ScalaFutures import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} -import org.scalatest.concurrent.ScalaFutures import scala.collection.mutable.ArrayBuffer import scala.concurrent.Future @@ -349,7 +349,7 @@ class ActivationClientProxyTests probe expectTerminated machine } - it should "be closed when it receives a CloseClientProxy message for a normal timeout case" in within(timeout) { + it should "be closed when it receives a GracefulShutdown message for a normal timeout case" in within(timeout) { val fetch = (_: FetchRequest) => Future(grpc.FetchResponse(AResponse(Right(message)).serialize)) val activationClient = MockActivationServiceClient(fetch) val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, _: Boolean) => Future(activationClient) @@ -362,14 +362,14 @@ class ActivationClientProxyTests registerCallback(machine, probe) ready(machine, probe) - machine ! CloseClientProxy - awaitAssert(activationClient.isClosed shouldBe true) + machine ! GracefulShutdown probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving)) machine ! 
RequestActivation() probe expectMsg ClientClosed + awaitAssert(activationClient.isClosed shouldBe true) probe expectTerminated machine } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala index 14cf432b555..e1da77e6323 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala @@ -33,11 +33,11 @@ import org.apache.openwhisk.core.connector.{ MessageProducer, ResultMetadata } -import org.apache.openwhisk.core.containerpool.docker.DockerContainer import org.apache.openwhisk.core.containerpool.v2._ import org.apache.openwhisk.core.containerpool.{ Container, ContainerAddress, + ContainerId, ContainerPoolConfig, ContainerRemoved, PrewarmContainerCreationConfig, @@ -137,9 +137,15 @@ class FunctionPullingContainerPoolTests private val schedulerInstanceId = SchedulerInstanceId("0") private val producer = stub[MessageProducer] private val prewarmedData = PreWarmData(mock[MockableV2Container], actionKind, memoryLimit) + private val mockContainer = mock[MockableV2Container] + (mockContainer.containerId _: () => ContainerId) + .expects() + .returning(ContainerId("test-container-id")) + .anyNumberOfTimes() + private val initializedData = InitializedData( - mock[MockableV2Container], + mockContainer, invocationNamespace.asString, whiskAction.toExecutableWhiskAction.get, TestProbe().ref) @@ -315,6 +321,11 @@ class FunctionPullingContainerPoolTests private def retry[T](fn: => T) = org.apache.openwhisk.utils.retry(fn, 10, Some(1.second)) it should "stop containers gradually when shut down" in within(timeout * 20) { + (mockContainer.containerId _: () => ContainerId) + .expects() + .returning(ContainerId("test-container-id")) + .anyNumberOfTimes() + val (containers, factory) = testContainers(10) val disablingContainers = ListBuffer[ActorRef]() @@ -356,7 +367,7 @@ class FunctionPullingContainerPoolTests pool, ContainerIsPaused( WarmData( - stub[DockerContainer], + mockContainer, invocationNamespace.asString, whiskAction.toExecutableWhiskAction.get, doc.rev, @@ -636,6 +647,10 @@ class FunctionPullingContainerPoolTests } it should "use a warmed container when invocationNamespace, action and revision matched" in within(timeout) { + (mockContainer.containerId _: () => ContainerId) + .expects() + .returning(ContainerId("test-container-id")) + .anyNumberOfTimes() val (containers, factory) = testContainers(3) val doc = put(entityStore, whiskAction) @@ -654,7 +669,7 @@ class FunctionPullingContainerPoolTests pool.tell( ContainerIsPaused( WarmData( - stub[DockerContainer], + mockContainer, invocationNamespace.asString, whiskAction.toExecutableWhiskAction.get, doc.rev, @@ -687,6 +702,10 @@ class FunctionPullingContainerPoolTests } it should "retry when chosen warmed container is failed to resume" in within(timeout) { + (mockContainer.containerId _: () => ContainerId) + .expects() + .returning(ContainerId("test-container-id")) + .anyNumberOfTimes() val (containers, factory) = testContainers(2) val doc = put(entityStore, whiskAction) @@ -706,7 +725,7 @@ class FunctionPullingContainerPoolTests pool.tell( ContainerIsPaused( WarmData( - stub[DockerContainer], + mockContainer, invocationNamespace.asString, whiskAction.toExecutableWhiskAction.get, doc.rev, @@ -724,7 +743,7 
@@ class FunctionPullingContainerPoolTests pool.tell( ResumeFailed( WarmData( - stub[DockerContainer], + mockContainer, invocationNamespace.asString, whiskAction.toExecutableWhiskAction.get, doc.rev, @@ -739,6 +758,10 @@ class FunctionPullingContainerPoolTests } it should "remove oldest previously used container to make space for the job passed to run" in within(timeout) { + (mockContainer.containerId _: () => ContainerId) + .expects() + .returning(ContainerId("test-container-id")) + .anyNumberOfTimes() val (containers, factory) = testContainers(2) val doc = put(entityStore, whiskAction) @@ -757,7 +780,7 @@ class FunctionPullingContainerPoolTests pool.tell( ContainerIsPaused( WarmData( - stub[DockerContainer], + mockContainer, invocationNamespace.asString, whiskAction.toExecutableWhiskAction.get, doc.rev, @@ -769,7 +792,7 @@ class FunctionPullingContainerPoolTests pool.tell( ContainerIsPaused( WarmData( - stub[DockerContainer], + mockContainer, invocationNamespace.asString, whiskAction.toExecutableWhiskAction.get, doc.rev, @@ -781,7 +804,7 @@ class FunctionPullingContainerPoolTests pool.tell( ContainerIsPaused( WarmData( - stub[DockerContainer], + mockContainer, invocationNamespace.asString, whiskAction.toExecutableWhiskAction.get, doc.rev, @@ -854,6 +877,10 @@ class FunctionPullingContainerPoolTests } it should "send ack(success) to scheduler when chosen warmed container is resumed" in within(timeout) { + (mockContainer.containerId _: () => ContainerId) + .expects() + .returning(ContainerId("test-container-id")) + .anyNumberOfTimes() val (containers, factory) = testContainers(1) val doc = put(entityStore, whiskAction) // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled. @@ -878,7 +905,7 @@ class FunctionPullingContainerPoolTests pool.tell( ContainerIsPaused( WarmData( - stub[DockerContainer], + mockContainer, invocationNamespace.asString, whiskAction.toExecutableWhiskAction.get, doc.rev, @@ -894,7 +921,7 @@ class FunctionPullingContainerPoolTests pool.tell( Resumed( WarmData( - stub[DockerContainer], + mockContainer, invocationNamespace.asString, whiskAction.toExecutableWhiskAction.get, doc.rev, diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala index 2355356bfaf..04516d4a828 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala @@ -44,9 +44,9 @@ import org.apache.openwhisk.core.containerpool.{ } import org.apache.openwhisk.core.database.{ArtifactStore, StaleParameter, UserContext} import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest} +import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.core.entity.types.AuthStore -import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.etcd.EtcdClient import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys import org.apache.openwhisk.core.etcd.EtcdType._ @@ -1607,7 +1607,7 @@ class FunctionPullingContainerProxyTests UnregisterData(ContainerKeys .existingContainers(invocationNamespace.asString, fqn, action.rev, Some(instanceId), Some(containerId)))) - client.expectMsg(CloseClientProxy) + client.expectMsg(GracefulShutdown) 
client.send(machine, ClientClosed) probe.expectMsgAllOf(ContainerRemoved(false), Transition(machine, Running, Removing)) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala index 13f95721175..a9a72f460c4 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala @@ -320,7 +320,7 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC override def totalActiveActivations = Future.successful(0) override def activeActivationsFor(namespace: UUID) = Future.successful(0) override def activeActivationsByController(controller: String): Future[Int] = Future.successful(0) - override def activeActivationsByController: Future[List[ActivationId]] = Future.successful(List(ActivationId("id"))) + override def activeActivationsByController: Future[List[(String, String)]] = Future.successful(List(("", ""))) override def activeActivationsByInvoker(invoker: String): Future[Int] = Future.successful(0) override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala index 76f1410a720..ea5e73b50d9 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala @@ -24,6 +24,7 @@ import akka.http.scaladsl.testkit.ScalatestRouteTest import akka.http.scaladsl.unmarshalling.Unmarshal import common.StreamLogging import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState} import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore} import org.apache.openwhisk.http.BasicHttpService @@ -32,6 +33,8 @@ import org.scalamock.scalatest.MockFactory import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers} import org.scalatest.junit.JUnitRunner +import scala.concurrent.Future + /** * Tests InvokerServer API. 
*/ @@ -135,22 +138,27 @@ class TestInvokerReactive extends InvokerCore with BasicHttpService { var enableCount = 0 var disableCount = 0 - override def enable(): Route = { + override def enable(): String = { enableCount += 1 - complete("") + s"" } - override def disable(): Route = { + override def disable(): String = { disableCount += 1 - complete("") + s"" } - override def isEnabled(): Route = { + override def isEnabled(): String = { complete(InvokerEnabled(true).serialize()) + s"" + } + + override def backfillPrewarm(): String = { + "" } - override def backfillPrewarm(): Route = { - complete("") + override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = { + Future.successful(Left(NotSupportedPoolState())) } def reset(): Unit = { diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala index 2393b8775fd..cf298fb5fcf 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala @@ -24,6 +24,7 @@ import akka.http.scaladsl.testkit.ScalatestRouteTest import akka.http.scaladsl.unmarshalling.Unmarshal import common.StreamLogging import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState} import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled import org.apache.openwhisk.core.invoker.{FPCInvokerServer, InvokerCore} import org.apache.openwhisk.http.BasicHttpService @@ -32,6 +33,8 @@ import org.scalamock.scalatest.MockFactory import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers} import org.scalatest.junit.JUnitRunner +import scala.concurrent.Future + /** * Tests InvokerServerV2 API. 
*/ @@ -134,22 +137,27 @@ class TestFPCInvokerReactive extends InvokerCore with BasicHttpService { var enableCount = 0 var disableCount = 0 - override def enable(): Route = { + override def enable(): String = { enableCount += 1 - complete("") + "" } - override def disable(): Route = { + override def disable(): String = { disableCount += 1 - complete("") + "" } - override def isEnabled(): Route = { + override def isEnabled(): String = { complete(InvokerEnabled(true).serialize()) + "" + } + + override def backfillPrewarm(): String = { + "" } - override def backfillPrewarm(): Route = { - complete("") + override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = { + Future.successful(Left(NotSupportedPoolState())) } def reset(): Unit = { diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala index 0dab4f4ce91..26dc3386991 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala @@ -25,7 +25,7 @@ import akka.http.scaladsl.testkit.ScalatestRouteTest import common.StreamLogging import org.apache.openwhisk.common.TransactionId import org.apache.openwhisk.core.connector.StatusData -import org.apache.openwhisk.core.entity.SchedulerInstanceId +import org.apache.openwhisk.core.entity.{ActivationId, SchedulerInstanceId} import org.junit.runner.RunWith import org.scalamock.scalatest.MockFactory import org.scalatest.junit.JUnitRunner @@ -56,9 +56,10 @@ class FPCSchedulerServerTests val queues = List((SchedulerInstanceId("0"), 2), (SchedulerInstanceId("1"), 3)) val creationCount = 1 val testQueueSize = 2 + val activationIds = (1 to 10).map(_ => ActivationId.generate()).toList val statusDatas = List( - StatusData("testns1", "testaction1", 10, "Running", "RunningData"), - StatusData("testns2", "testaction2", 5, "Running", "RunningData")) + StatusData("testns1", "testaction1", activationIds, "Running", "RunningData"), + StatusData("testns2", "testaction2", activationIds.take(5), "Running", "RunningData")) // Create scheduler val scheduler = new TestScheduler(queues, creationCount, testQueueSize, statusDatas) @@ -85,8 +86,8 @@ class FPCSchedulerServerTests val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) Get(s"/state") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { status should be(OK) - responseAs[JsObject] shouldBe (queues.map(s => s._1.asString -> s._2.toString).toMap ++ Map( - "creationCount" -> creationCount.toString)).toJson + responseAs[JsObject] shouldBe (Map("creationCount" -> creationCount.toString) ++ Map( + "queue" -> queues.map(_._2).sum.toString)).toJson } } @@ -94,7 +95,7 @@ class FPCSchedulerServerTests it should "get total queue" in { implicit val tid = transid() val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) - Get(s"/queue/total") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { + Get(s"/queues/count") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { status should be(OK) responseAs[String] shouldBe testQueueSize.toString } @@ -104,7 +105,7 @@ class FPCSchedulerServerTests it should "get all queue status" in { implicit val tid = transid() val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) - Get(s"/queue/status") ~> 
addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { + Get(s"/queues") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { status should be(OK) responseAs[List[JsObject]] shouldBe statusDatas.map(_.toJson) } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala index 1cb2a5cdae2..a3cd73e1ac1 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala @@ -47,6 +47,7 @@ import spray.json.{JsObject, JsString} import java.time.Instant import scala.collection.immutable.Queue +import scala.collection.mutable import scala.concurrent.Future import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS} import scala.language.postfixOps @@ -172,9 +173,12 @@ class MemoryQueueFlowTests expectDataCleanUp(watcher, dataMgmtService) - parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Idle, Removed)) + // the queue is timed out again in the removed state + fsm ! StateTimeout + + parent.expectMsg(queueRemovedMsg) fsm ! QueueRemovedCompleted probe.expectTerminated(fsm, 10.seconds) @@ -284,9 +288,11 @@ class MemoryQueueFlowTests fsm ! StateTimeout - parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Idle, Removed)) + // the queue is timed out again in the removed state + fsm ! StateTimeout + fsm ! QueueRemovedCompleted expectDataCleanUp(watcher, dataMgmtService) @@ -369,15 +375,19 @@ class MemoryQueueFlowTests fsm ! GracefulShutdown - parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, NamespaceThrottled, Removing)) - fsm ! QueueRemovedCompleted + // the queue is timed out in the Removing state + fsm ! StateTimeout expectDataCleanUp(watcher, dataMgmtService) probe.expectMsg(Transition(fsm, Removing, Removed)) + // the queue is timed out again in the Removed state + fsm ! StateTimeout + fsm ! QueueRemovedCompleted + probe.expectTerminated(fsm, 10.seconds) } @@ -494,9 +504,12 @@ class MemoryQueueFlowTests fsm ! StateTimeout - parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Idle, Removed)) + // the queue is timed out again in the Removed state + fsm ! StateTimeout + + parent.expectMsg(queueRemovedMsg) fsm ! QueueRemovedCompleted expectDataCleanUp(watcher, dataMgmtService) @@ -644,9 +657,12 @@ class MemoryQueueFlowTests probe.expectMsg(Transition(fsm, Running, Idle)) fsm ! StateTimeout - parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Idle, Removed)) + // the queue is timed out again in the Removed state + fsm ! StateTimeout + parent.expectMsg(queueRemovedMsg) + fsm ! QueueRemovedCompleted expectDataCleanUp(watcher, dataMgmtService) @@ -756,9 +772,11 @@ class MemoryQueueFlowTests fsm ! StateTimeout - parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Idle, Removed)) + // the queue is timed out again in the Removed state + fsm ! StateTimeout + parent.expectMsg(queueRemovedMsg) fsm ! QueueRemovedCompleted expectDataCleanUp(watcher, dataMgmtService) @@ -838,6 +856,8 @@ class MemoryQueueFlowTests expectDataCleanUp(watcher, dataMgmtService) probe.expectMsg(Transition(fsm, Flushing, Removed)) + // the queue is timed out again in the Removed state + fsm ! StateTimeout parent.expectMsg(queueRemovedMsg) fsm ! 
QueueRemovedCompleted @@ -935,9 +955,11 @@ class MemoryQueueFlowTests clock.plusSeconds(flushGrace.toSeconds * 2) fsm ! StateTimeout - parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Flushing, Removed)) + // the queue is timed out again in the Removed state + fsm ! StateTimeout + parent.expectMsg(queueRemovedMsg) fsm ! QueueRemovedCompleted expectDataCleanUp(watcher, dataMgmtService) @@ -1032,15 +1054,17 @@ class MemoryQueueFlowTests container.send(fsm, getActivation(false)) container.expectMsg(ActivationResponse(Left(NoActivationMessage()))) - fsm.underlyingActor.creationIds = Set.empty[String] + fsm.underlyingActor.creationIds = mutable.Set.empty[String] fsm ! StateTimeout probe.expectMsg(Transition(fsm, Running, Idle)) fsm ! StateTimeout - parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Idle, Removed)) + // the queue is timed out again in the Removed state + fsm ! StateTimeout + parent.expectMsg(queueRemovedMsg) fsm ! QueueRemovedCompleted expectDataCleanUp(watcher, dataMgmtService) @@ -1147,9 +1171,11 @@ class MemoryQueueFlowTests fsm ! StateTimeout expectDataCleanUp(watcher, dataMgmtService) - parent.expectMsg(queueRemovedMsg) probe.expectMsg(Transition(fsm, Flushing, Removed)) + // the queue is timed out again in the Removed state + fsm ! StateTimeout + parent.expectMsg(queueRemovedMsg) fsm ! QueueRemovedCompleted probe.expectTerminated(fsm, 10.seconds) @@ -1229,8 +1255,6 @@ class MemoryQueueFlowTests UnregisterData(namespaceThrottlingKey), UnregisterData(actionThrottlingKey)) - parent.expectMsg(queueRemovedMsg) - probe.expectMsg(Transition(fsm, Running, Removing)) // a newly arrived message should be properly handled @@ -1240,8 +1264,6 @@ class MemoryQueueFlowTests fsm ! messages(2) - fsm ! QueueRemovedCompleted - // if there is a message, it should not terminate fsm ! StateTimeout @@ -1259,6 +1281,11 @@ class MemoryQueueFlowTests probe.expectMsg(Transition(fsm, Removing, Removed)) + // the queue is timed out again in the Removed state + fsm ! StateTimeout + parent.expectMsg(queueRemovedMsg) + fsm ! QueueRemovedCompleted + probe.expectTerminated(fsm, 10.seconds) } @@ -1328,7 +1355,6 @@ class MemoryQueueFlowTests // another queue is already running fsm ! InitialDataStorageResults(`leaderKey`, Left(AlreadyExist())) - parent.expectMsg(queueRemovedMsg) parent.expectMsg(message) @@ -1341,6 +1367,9 @@ class MemoryQueueFlowTests // move to the Deprecated state probe.expectMsg(Transition(fsm, state, Removed)) + // the queue is timed out again in the Removed state + fsm ! StateTimeout + parent.expectMsg(queueRemovedMsg) fsm ! QueueRemovedCompleted probe.expectTerminated(fsm, 10.seconds) @@ -1431,6 +1460,7 @@ class MemoryQueueFlowTests // the queue is supposed to send queueRemovedMsg once again and stops itself. parent.expectMsg(queueRemovedMsg) + fsm ! QueueRemovedCompleted probe.expectTerminated(fsm, 10.seconds) } } @@ -1501,7 +1531,7 @@ class MemoryQueueFlowTests fsm.setState(state, FlushingData(schedulingActors.ref, schedulingActors.ref, WhiskError, "whisk error")) case Removing => - fsm.underlyingActor.containers = Set(testContainerId) + fsm.underlyingActor.containers = mutable.Set(testContainerId) fsm ! 
message fsm.setState(state, RemovingData(schedulingActors.ref, schedulingActors.ref, outdated = true)) @@ -1537,7 +1567,7 @@ class MemoryQueueFlowTests container.send(fsm, getActivation()) container.expectMsg(ActivationResponse(Right(message))) // has no old containers for old queue, so send the message to queueManager - fsm.underlyingActor.containers = Set.empty[String] + fsm.underlyingActor.containers = mutable.Set.empty[String] fsm.underlyingActor.queue = Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(Instant.now.toEpochMilli + 1000), message)) fsm ! StopSchedulingAsOutdated @@ -1742,20 +1772,20 @@ class MemoryQueueFlowTests UnwatchEndpoint(existingContainerKey, isPrefix = true, watcherName), UnwatchEndpoint(leaderKey, isPrefix = false, watcherName)) - // queue is stale and will be removed - parent.expectMsg(queueRemovedMsg) - probe.expectMsg(Transition(fsm, state, Removed)) + probe.expectMsg(Transition(fsm, state, Removed)) + + // the queue is timed out again in the Removed state + fsm ! StateTimeout + + // queue is stale and will be removed + parent.expectMsg(queueRemovedMsg) fsm ! QueueRemovedCompleted probe.expectTerminated(fsm, 10.seconds) case _ => // queue is stale and will be removed - parent.expectMsg(queueRemovedMsg) - probe.expectMsg(Transition(fsm, state, Removing)) - fsm ! QueueRemovedCompleted + probe.expectMsg(Transition(fsm, state, Removing)) // queue should not be terminated as there is an activation fsm ! StateTimeout @@ -1771,6 +1801,11 @@ class MemoryQueueFlowTests UnwatchEndpoint(leaderKey, isPrefix = false, watcherName)) probe.expectMsg(Transition(fsm, Removing, Removed)) + + // the queue is timed out again in the Removed state + fsm ! StateTimeout + parent.expectMsg(queueRemovedMsg) + fsm ! QueueRemovedCompleted probe.expectTerminated(fsm, 10.seconds) } } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala index 71184726938..d7ec5afd143 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala @@ -57,6 +57,7 @@ import org.scalatest.junit.JUnitRunner import spray.json.{JsObject, JsString} import scala.collection.immutable.Queue +import scala.collection.mutable import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.{higherKinds, postfixOps} @@ -478,7 +479,7 @@ class MemoryQueueTests probe2 watch fsm // do not remove itself when there are still existing containers - fsm.underlyingActor.containers = Set("1") + fsm.underlyingActor.containers = mutable.Set("1") fsm.setState(Running, RunningData(schedulerActor, droppingActor)) expectMsg(Transition(fsm, Uninitialized, Running)) fsm ! StateTimeout @@ -488,7 +489,7 @@ class MemoryQueueTests dataManagementService.expectNoMessage() // change the existing containers count to 0, the StateTimeout should work - fsm.underlyingActor.containers = Set.empty[String] + fsm.underlyingActor.containers = mutable.Set.empty[String] fsm ! StateTimeout probe.expectTerminated(schedulerActor) probe.expectTerminated(droppingActor) @@ -496,12 +497,15 @@ class MemoryQueueTests expectMsg(Transition(fsm, Running, Idle)) fsm ! StateTimeout - parent.expectMsg(QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(revision), Some(leaderKey))) expectMsg(Transition(fsm, Idle, Removed)) - fsm !
QueueRemovedCompleted dataManagementService.expectMsg(UnregisterData(leaderKey)) dataManagementService.expectMsg(UnregisterData(namespaceThrottlingKey)) dataManagementService.expectMsg(UnregisterData(actionThrottlingKey)) + + // queue is timed out again in the Removed state. + fsm ! StateTimeout + parent.expectMsg(QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(revision), Some(leaderKey))) + fsm ! QueueRemovedCompleted probe2.expectTerminated(fsm) fsm.stop() @@ -562,7 +566,7 @@ class MemoryQueueTests fsm ! Start expectMsg(Transition(fsm, Uninitialized, Running)) - fsm.underlyingActor.creationIds = Set.empty[String] + fsm.underlyingActor.creationIds = mutable.Set.empty[String] fsm ! StateTimeout expectMsg(Transition(fsm, Running, Idle)) @@ -576,8 +580,8 @@ class MemoryQueueTests .futureValue shouldBe GetActivationResponse(Right(message)) queueRef.queue.length shouldBe 0 - fsm.underlyingActor.containers = Set.empty[String] - fsm.underlyingActor.creationIds = Set.empty[String] + fsm.underlyingActor.containers = mutable.Set.empty[String] + fsm.underlyingActor.creationIds = mutable.Set.empty[String] fsm ! StateTimeout expectMsg(Transition(fsm, Running, Idle)) (fsm ? GetActivation(tid, fqn, testContainerId, false, None)) @@ -643,7 +647,7 @@ class MemoryQueueTests fsm ! Start expectMsg(Transition(fsm, Uninitialized, Running)) - fsm.underlyingActor.creationIds = Set.empty[String] + fsm.underlyingActor.creationIds = mutable.Set.empty[String] fsm ! StateTimeout expectMsg(Transition(fsm, Running, Idle)) @@ -651,9 +655,13 @@ class MemoryQueueTests expectMsg(Transition(fsm, Idle, Removed)) queueRef.queue.length shouldBe 0 fsm ! message - parent.expectMsg(queueRemovedMsg) + + // queue is timed out again in the Removed state. parent.expectMsg(message) + fsm ! StateTimeout + parent.expectMsg(queueRemovedMsg) + expectNoMessage() fsm.stop() @@ -1095,10 +1103,13 @@ class MemoryQueueTests fsm ! message probe.expectMsg(ActivationResponse.developerError("nonExecutbleAction error")) - parent.expectMsg( + parent.expectMsgAnyOf( 2 * queueConfig.flushGrace + 5.seconds, - QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey))) - parent.expectMsg(Transition(fsm, Flushing, Removed)) + QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)), + Transition(fsm, Flushing, Removed)) + + fsm ! StateTimeout + parent.expectMsg(queueRemovedMsg) fsm ! QueueRemovedCompleted parent.expectTerminated(fsm) @@ -1206,10 +1217,9 @@ class MemoryQueueTests lastAckedActivationResult.response.result shouldBe Some(JsObject("error" -> JsString("resource not enough"))) }, 5.seconds) - parent.expectMsg(queueRemovedMsg) - // should goto Removed - parent.expectMsg(Transition(fsm, Flushing, Removed)) + parent.expectMsgAnyOf(queueRemovedMsg, Transition(fsm, Flushing, Removed)) + fsm ! QueueRemovedCompleted fsm.stop() @@ -1250,7 +1260,7 @@ class MemoryQueueTests val now = Instant.now fsm.underlyingActor.queue = Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message)) - fsm.underlyingActor.containers = Set.empty[String] + fsm.underlyingActor.containers = mutable.Set.empty[String] fsm.setState(Running, RunningData(probe.ref, probe.ref)) fsm ! 
StopSchedulingAsOutdated // update action queueManager.expectMsg(staleQueueRemovedMsg) @@ -1294,7 +1304,7 @@ class MemoryQueueTests val now = Instant.now fsm.underlyingActor.queue = Queue.apply(TimeSeriesActivationEntry(Instant.ofEpochMilli(now.toEpochMilli + 1000), message)) - fsm.underlyingActor.containers = Set(testContainerId) + fsm.underlyingActor.containers = mutable.Set(testContainerId) fsm.setState(Running, RunningData(probe.ref, probe.ref)) fsm ! StopSchedulingAsOutdated // update action queueManager.expectMsg(staleQueueRemovedMsg) @@ -1399,10 +1409,10 @@ class MemoryQueueTests val duration = FiniteDuration(queueConfig.maxBlackboxRetentionMs, MILLISECONDS) + queueConfig.flushGrace probe.expectMsg(duration, ActivationResponse.whiskError("no available invokers")) - parent.expectMsg( + parent.expectMsgAnyOf( duration, - QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey))) - parent.expectMsg(Transition(fsm, Flushing, Removed)) + QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), Some(leaderKey)), + Transition(fsm, Flushing, Removed)) fsm ! QueueRemovedCompleted parent.expectTerminated(fsm) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala index b60472e8161..6d8d80f7214 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala @@ -29,7 +29,7 @@ import org.apache.openwhisk.common.{GracefulShutdown, TransactionId} import org.apache.openwhisk.core.WarmUp.warmUpAction import org.apache.openwhisk.core.ack.ActiveAck import org.apache.openwhisk.core.connector.test.TestConnector -import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, StatusData, StatusQuery} +import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, GetState, StatusData} import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, UserContext} import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest} import org.apache.openwhisk.core.entity._ @@ -100,7 +100,8 @@ class QueueManagerTests ControllerInstanceId("0"), blocking = false, content = None) - val statusData = StatusData(testInvocationNamespace, testFQN.asString, 0, "Running", "RunningData") + val statusData = + StatusData(testInvocationNamespace, testFQN.asString, List.empty[ActivationId], "Running", "RunningData") // update start time for activation to ensure it's not stale def newActivation(start: Instant = Instant.now()): ActivationMessage = { @@ -132,7 +133,7 @@ class QueueManagerTests override def receive: Receive = { case GetActivation(_, _, _, _, _, _) => sender ! ActivationResponse(Right(newActivation())) - case StatusQuery => + case GetState => sender ! statusData } })) @@ -796,7 +797,7 @@ class QueueManagerTests (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1 - (queueManager ? StatusQuery).mapTo[Future[Iterable[StatusData]]].futureValue.futureValue shouldBe List(statusData) + (queueManager ? GetState).mapTo[Future[List[StatusData]]].flatten.futureValue shouldBe List(statusData) } it should "drop the activation message that has not been scheduled for a long time" in {