diff --git a/.github/workflows/2-system.yaml b/.github/workflows/2-system.yaml index 29a7514dfcc..58e293b6ea4 100644 --- a/.github/workflows/2-system.yaml +++ b/.github/workflows/2-system.yaml @@ -29,7 +29,7 @@ on: env: # openwhisk env TEST_SUITE: System - ANSIBLE_CMD: "ansible-playbook -i environments/local -e docker_image_prefix=testing" + ANSIBLE_CMD: "ansible-playbook -i environments/local -e docker_image_prefix=testing -e container_pool_akka_client=false" GRADLE_PROJS_SKIP: "" ## secrets diff --git a/.github/workflows/4-standalone.yaml b/.github/workflows/4-standalone.yaml index ceaa1d90253..a7a31098bab 100644 --- a/.github/workflows/4-standalone.yaml +++ b/.github/workflows/4-standalone.yaml @@ -65,6 +65,8 @@ jobs: sudo rm -rf "$AGENT_TOOLSDIRECTORY" - name: Check free space run: df -h + - name: Disable the scheduler + run: "./tools/github/disable-scheduler.sh" - id: tests name: Run Tests run: "./tools/github/run${{ env.TEST_SUITE }}Tests.sh" diff --git a/ansible/README.md b/ansible/README.md index 18f3f0ae87e..ceeaba5c98a 100644 --- a/ansible/README.md +++ b/ansible/README.md @@ -195,12 +195,13 @@ Set the value of pause-grace to 10s by default . ``` -#### Enable the scheduler -- Make sure you enable the scheduler by configuring `scheduler_enable`. +#### Disable the scheduler +- You can disable the scheduler by configuring `scheduler_enable`. +- The scheduler is enabled by default. **ansible/environments/local/group_vars/all** ```yaml -scheduler_enable: true +scheduler_enable: false ``` #### [Optional] Enable ElasticSearch Activation Store diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 7b80ec6a0ed..eeb6511e7a2 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -498,7 +498,7 @@ durationChecker: spi: "{{ duration_checker_spi | default('') }}" timeWindow: "{{ duration_checker_time_window | default('1 d') }}" -enable_scheduler: "{{ scheduler_enable | default(false) }}" +enable_scheduler: "{{ scheduler_enable | default(true) }}" scheduler: protocol: "{{ scheduler_protocol | default('http') }}" diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf index d43dd122105..cdf692e4c3e 100644 --- a/common/scala/src/main/resources/reference.conf +++ b/common/scala/src/main/resources/reference.conf @@ -23,12 +23,12 @@ whisk.spi { MessagingProvider = org.apache.openwhisk.connector.kafka.KafkaMessagingProvider ContainerFactoryProvider = org.apache.openwhisk.core.containerpool.docker.DockerContainerFactoryProvider LogStoreProvider = org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider - LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.ShardingContainerPoolBalancer - EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.LocalEntitlementProvider + LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.FPCPoolBalancer + EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.FPCEntitlementProvider AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective - InvokerProvider = org.apache.openwhisk.core.invoker.InvokerReactive - InvokerServerProvider = org.apache.openwhisk.core.invoker.DefaultInvokerServer - DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.NoopDurationCheckerProvider + InvokerProvider = org.apache.openwhisk.core.invoker.FPCInvokerReactive + InvokerServerProvider = org.apache.openwhisk.core.invoker.FPCInvokerServer + DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.ElasticSearchDurationCheckerProvider } dispatchers { diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf index 1eb4eb5805c..9a9ee10b4fd 100644 --- a/core/controller/src/main/resources/reference.conf +++ b/core/controller/src/main/resources/reference.conf @@ -31,7 +31,7 @@ whisk { timeout-addon = 1m fpc { - use-per-min-throttles = false + use-per-min-throttles = true } } controller { diff --git a/tests/build.gradle b/tests/build.gradle index 61e07b25848..30906ac7437 100644 --- a/tests/build.gradle +++ b/tests/build.gradle @@ -58,6 +58,7 @@ def systemIncludes = [ "org/apache/openwhisk/core/apigw/actions/test/**", "org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*", "org/apache/openwhisk/core/controller/test/*ControllerApiTests*", + "org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*", "apigw/healthtests/**", "ha/**", "services/**", @@ -78,7 +79,7 @@ ext.testSets = [ "org/apache/openwhisk/core/limits/**", "org/apache/openwhisk/core/scheduler/**", "org/apache/openwhisk/core/invoker/test/*InvokerBootUpTests*", - "org/apache/openwhisk/core/loadBalancer/test/*FPCPoolBalancerTests*", + "org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*", "org/apache/openwhisk/common/etcd/**", "**/*CacheConcurrencyTests*", "**/*ControllerApiTests*", @@ -89,6 +90,7 @@ ext.testSets = [ "REQUIRE_SYSTEM" : [ "includes" : systemIncludes, "excludes": [ + "org/apache/openwhisk/core/loadBalancer/test/*FPCPoolBalancerTests*", "system/basic/WskMultiRuntimeTests*", 'invokerShoot/**' ] @@ -99,7 +101,6 @@ ext.testSets = [ "org/apache/openwhisk/core/containerpool/v2/test/**", "org/apache/openwhisk/core/scheduler/**", "org/apache/openwhisk/core/invoker/test/*InvokerBootUpTests*", - "org/apache/openwhisk/core/loadBalancer/test/*FPCPoolBalancerTests*", "org/apache/openwhisk/core/service/**", ] ], diff --git a/tests/src/test/scala/limits/ThrottleTests.scala b/tests/src/test/scala/limits/ThrottleTests.scala index 7b9e00fad60..0d29afe195d 100644 --- a/tests/src/test/scala/limits/ThrottleTests.scala +++ b/tests/src/test/scala/limits/ThrottleTests.scala @@ -229,68 +229,6 @@ class ThrottleTests settleThrottles(alreadyWaited) } } - - it should "throttle 'concurrent' activations of one action" in withAssetCleaner(wskprops) { (wp, assetHelper) => - val name = "checkConcurrentActionThrottle" - assetHelper.withCleaner(wsk.action, name) { - val timeoutAction = Some(TestUtils.getTestActionFilename("sleep.js")) - (action, _) => - action.create(name, timeoutAction) - } - - // The sleep is necessary as the load balancer currently has a latency before recognizing concurrency. - val sleep = 15.seconds - // Adding a bit of overcommit since some loadbalancers rely on some overcommit. This won't hurt those who don't - // since all activations are taken into account to check for throttled invokes below. - val slowInvokes = (maximumConcurrentInvokes * 1.2).toInt - val fastInvokes = 4 - val fastInvokeDuration = 4.seconds - val slowInvokeDuration = sleep + fastInvokeDuration - - // These invokes will stay active long enough that all are issued and load balancer has recognized concurrency. - val startSlowInvokes = Instant.now - val slowResults = untilThrottled(slowInvokes) { () => - wsk.action.invoke( - name, - Map("sleepTimeInMs" -> slowInvokeDuration.toMillis.toJson), - expectedExitCode = DONTCARE_EXIT) - } - val afterSlowInvokes = Instant.now - val slowIssueDuration = durationBetween(startSlowInvokes, afterSlowInvokes) - println( - s"$slowInvokes slow invokes (dur = ${slowInvokeDuration.toSeconds} sec) took ${slowIssueDuration.toSeconds} seconds to issue") - - // Sleep to let the background thread get the newest values (refreshes every 2 seconds) - println(s"Sleeping for ${sleep.toSeconds} sec") - Thread.sleep(sleep.toMillis) - - // These fast invokes will trigger the concurrency-based throttling. - val startFastInvokes = Instant.now - val fastResults = untilThrottled(fastInvokes) { () => - wsk.action.invoke( - name, - Map("sleepTimeInMs" -> fastInvokeDuration.toMillis.toJson), - expectedExitCode = DONTCARE_EXIT) - } - val afterFastInvokes = Instant.now - val fastIssueDuration = durationBetween(afterFastInvokes, startFastInvokes) - println( - s"$fastInvokes fast invokes (dur = ${fastInvokeDuration.toSeconds} sec) took ${fastIssueDuration.toSeconds} seconds to issue") - - val combinedResults = slowResults ++ fastResults - try { - val throttledCount = throttledActivations(combinedResults, tooManyConcurrentRequests(0, 0)) - throttledCount should be > 0 - } finally { - val alreadyWaited = durationBetween(afterSlowInvokes, Instant.now) - settleThrottles(alreadyWaited) - println("clearing activations") - } - // wait for the activations last, giving the activations time to complete and - // may avoid unnecessarily polling; if these fail, the throttle may not be settled - println("waiting for activations to complete") - waitForActivations(combinedResults) - } } @RunWith(classOf[JUnitRunner]) @@ -458,19 +396,6 @@ class NamespaceSpecificThrottleTests }, 2, Some(1.second)) } - it should "respect overridden concurrent throttle of 0" in withAssetCleaner(zeroConcProps) { (wp, assetHelper) => - implicit val props = wp - val actionName = "zeroConcurrentAction" - - assetHelper.withCleaner(wsk.action, actionName) { (action, _) => - action.create(actionName, defaultAction) - } - - wsk.action.invoke(actionName, expectedExitCode = TooManyRequests.intValue).stderr should { - include(prefix(tooManyConcurrentRequests(0, 0))) and include("allowed: 0") - } - } - it should "not store an activation if disabled for this namespace" in withAssetCleaner(activationDisabled) { (wp, assetHelper) => implicit val props = wp diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala index 06724e5f0f1..a62ac24a039 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala @@ -37,6 +37,7 @@ class FPCEntitlementProviderTests extends ControllerTestCommon with ScalaFutures val someUser = WhiskAuthHelpers.newIdentity() val action = FullyQualifiedEntityName(EntityPath("testns"), EntityName("action")) val loadBalancer = mock[LoadBalancer] + (loadBalancer.clusterSize _).expects().returning(1).anyNumberOfTimes() (loadBalancer .checkThrottle(_: EntityPath, _: String)) .expects(someUser.namespace.name.toPath, action.fullPath.asString) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala index 2ea660cca8f..adeef2b87ff 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala @@ -101,55 +101,6 @@ class ConcurrencyTests extends TestHelpers with WskTestHelpers with WskActorSyst } } - //This tests generates the same load against the same action as previous test, BUT with concurrency set to 1 - it should "execute activations sequentially when concurrency = 1 " in withAssetCleaner(wskprops) { - assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean)) - - (wp, assetHelper) => - val name = "TestNonConcurrentAction" - assetHelper.withCleaner(wsk.action, name, confirmDelete = true) { - val actionName = TestUtils.getTestActionFilename("concurrent.js") - (action, _) => - //disable log collection since concurrent activation requires specialized log processing - // (at action runtime and using specialized LogStore) - action.create(name, Some(actionName), logsize = Some(0.bytes), concurrency = Some(1)) - } - //warm the container (concurrent activations with no warmed container, will cause multiple containers to be used - so we force one to warm up) - val run = wsk.action.invoke(name, Map("warm" -> 1.toJson), blocking = true) - withActivation(wsk.activation, run) { response => - val logs = response.logs.get - withClue(logs) { logs.size shouldBe 0 } - - response.response.status shouldBe "success" - response.response.result shouldBe Some(JsObject("warm" -> 1.toJson)) - } - - //read configs to determine max concurrency support - currently based on single invoker and invokerUserMemory config - val busyThreshold = - (loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool).userMemory / MemoryLimit.STD_MEMORY).toInt - - //run maximum allowed concurrent actions via Futures - val requestCount = busyThreshold - println(s"executing $requestCount activations") - val runs = (1 to requestCount).map { _ => - Future { - //expect only 1 activation concurrently (within the 1 second timeout implemented in concurrent.js) - wsk.action.invoke(name, Map("requestCount" -> 1.toJson), blocking = true) - } - } - - //none of the actions will complete till the requestCount is reached - Await.result(Future.sequence(runs), 50.seconds).foreach { run => - withActivation(wsk.activation, run) { response => - val logs = response.logs.get - withClue(logs) { logs.size shouldBe 0 } - response.response.status shouldBe "success" - //expect only 1 activation concurrently - response.response.result shouldBe Some(JsObject("msg" -> s"Received 1 activations.".toJson)) - } - } - } - it should "allow concurrent activations to gracefully complete when one fails" in withAssetCleaner(wskprops) { assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean)) (wp, assetHelper) => diff --git a/tools/github/disable-scheduler.sh b/tools/github/disable-scheduler.sh new file mode 100755 index 00000000000..950f81eee66 --- /dev/null +++ b/tools/github/disable-scheduler.sh @@ -0,0 +1,112 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -e + +cat > ${GITHUB_WORKSPACE}/common/scala/src/main/resources/reference.conf << EOL +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +include "s3-reference.conf" + +whisk.spi { + ArtifactStoreProvider = org.apache.openwhisk.core.database.CouchDbStoreProvider + ActivationStoreProvider = org.apache.openwhisk.core.database.ArtifactActivationStoreProvider + MessagingProvider = org.apache.openwhisk.connector.kafka.KafkaMessagingProvider + ContainerFactoryProvider = org.apache.openwhisk.core.containerpool.docker.DockerContainerFactoryProvider + LogStoreProvider = org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider + LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.ShardingContainerPoolBalancer + EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.LocalEntitlementProvider + AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective + InvokerProvider = org.apache.openwhisk.core.invoker.InvokerReactive + InvokerServerProvider = org.apache.openwhisk.core.invoker.DefaultInvokerServer + DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.NoopDurationCheckerProvider +} + +dispatchers { + # Custom dispatcher for CouchDB Client. Tune as needed. + couch-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + + # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor + thread-pool-executor { + # Min number of threads to cap factor-based corePoolSize number to + core-pool-size-min = 2 + + # The core-pool-size-factor is used to determine corePoolSize of the + # ThreadPoolExecutor using the following formula: + # ceil(available processors * factor). + # Resulting size is then bounded by the core-pool-size-min and + # core-pool-size-max values. + core-pool-size-factor = 2.0 + + # Max number of threads to cap factor-based corePoolSize number to + core-pool-size-max = 32 + } + # Throughput defines the number of messages that are processed in a batch + # before the thread is returned to the pool. Set to 1 for as fair as possible. + throughput = 5 + } + + # Custom dispatcher for Kafka client. Tune as needed. + kafka-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + + # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor + thread-pool-executor { + # Min number of threads to cap factor-based corePoolSize number to + core-pool-size-min = 2 + + # The core-pool-size-factor is used to determine corePoolSize of the + # ThreadPoolExecutor using the following formula: + # ceil(available processors * factor). + # Resulting size is then bounded by the core-pool-size-min and + # core-pool-size-max values. + core-pool-size-factor = 2.0 + + # Max number of threads to cap factor-based corePoolSize number to + core-pool-size-max = 32 + } + + # Throughput defines the number of messages that are processed in a batch + # before the thread is returned to the pool. Set to 1 for as fair as possible. + throughput = 5 + } + lease-service-dispatcher { + type = PinnedDispatcher + executor = "thread-pool-executor" + } +} +EOL