Enable the scheduler by default (#5463)
* Enable the scheduler by default

* Configure proper spi implementations to enable the scheduler

* Adjust tests category

* Enable per-minute throttling for FPC load balancer

* Fix FPCEntitlementProviderTests

* Disable akka client

* Remove a test case that does not fit with the scheduler

* Remove concurrency-related tests

* Disable the scheduler in the standalone tests

* Revert disabling akka http client

* Disable the scheduler from the workflow

* Fix the root directory

* Disable akka http client for system tests
style95 authored Mar 6, 2024
1 parent 5529cc4 commit 4fac03a
Showing 11 changed files with 130 additions and 137 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/2-system.yaml
@@ -29,7 +29,7 @@ on:
env:
# openwhisk env
TEST_SUITE: System
ANSIBLE_CMD: "ansible-playbook -i environments/local -e docker_image_prefix=testing"
ANSIBLE_CMD: "ansible-playbook -i environments/local -e docker_image_prefix=testing -e container_pool_akka_client=false"
GRADLE_PROJS_SKIP: ""

## secrets
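For context, the `ANSIBLE_CMD` above is consumed by appending a playbook name to it. A rough local equivalent of the system-test deployment step is sketched below; the `openwhisk.yml` playbook name comes from the repository's `ansible/` directory, and the other steps of a full deployment are omitted, so treat this as illustrative rather than the exact CI invocation.

```bash
# Sketch only: roughly what applying a playbook with the ANSIBLE_CMD override looks like.
# Run from the repository root; playbook selection is illustrative, not exhaustive.
cd ansible
ansible-playbook -i environments/local \
  -e docker_image_prefix=testing \
  -e container_pool_akka_client=false \
  openwhisk.yml
```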
2 changes: 2 additions & 0 deletions .github/workflows/4-standalone.yaml
@@ -65,6 +65,8 @@ jobs:
sudo rm -rf "$AGENT_TOOLSDIRECTORY"
- name: Check free space
run: df -h
+- name: Disable the scheduler
+  run: "./tools/github/disable-scheduler.sh"
- id: tests
name: Run Tests
run: "./tools/github/run${{ env.TEST_SUITE }}Tests.sh"
7 changes: 4 additions & 3 deletions ansible/README.md
@@ -195,12 +195,13 @@ Set the value of pause-grace to 10s by default
.
```

-#### Enable the scheduler
-- Make sure you enable the scheduler by configuring `scheduler_enable`.
+#### Disable the scheduler
+- You can disable the scheduler by configuring `scheduler_enable`.
+- The scheduler is enabled by default.

**ansible/environments/local/group_vars/all**
```yaml
-scheduler_enable: true
+scheduler_enable: false
```
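Besides editing **ansible/environments/local/group_vars/all** as documented above, the `scheduler_enable` toggle can also be overridden per deployment on the command line. A minimal sketch, assuming the repository's standard `openwhisk.yml` playbook and local environment:

```bash
# Sketch: turn the now-default scheduler off for a single deployment
# without editing group_vars. scheduler_enable is the variable documented above.
cd ansible
ansible-playbook -i environments/local -e scheduler_enable=false openwhisk.yml
```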
#### [Optional] Enable ElasticSearch Activation Store
2 changes: 1 addition & 1 deletion ansible/group_vars/all
@@ -498,7 +498,7 @@ durationChecker:
spi: "{{ duration_checker_spi | default('') }}"
timeWindow: "{{ duration_checker_time_window | default('1 d') }}"

enable_scheduler: "{{ scheduler_enable | default(false) }}"
enable_scheduler: "{{ scheduler_enable | default(true) }}"

scheduler:
protocol: "{{ scheduler_protocol | default('http') }}"
10 changes: 5 additions & 5 deletions common/scala/src/main/resources/reference.conf
@@ -23,12 +23,12 @@ whisk.spi {
MessagingProvider = org.apache.openwhisk.connector.kafka.KafkaMessagingProvider
ContainerFactoryProvider = org.apache.openwhisk.core.containerpool.docker.DockerContainerFactoryProvider
LogStoreProvider = org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider
-LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.ShardingContainerPoolBalancer
-EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.LocalEntitlementProvider
+LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.FPCPoolBalancer
+EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.FPCEntitlementProvider
AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective
-InvokerProvider = org.apache.openwhisk.core.invoker.InvokerReactive
-InvokerServerProvider = org.apache.openwhisk.core.invoker.DefaultInvokerServer
-DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.NoopDurationCheckerProvider
+InvokerProvider = org.apache.openwhisk.core.invoker.FPCInvokerReactive
+InvokerServerProvider = org.apache.openwhisk.core.invoker.FPCInvokerServer
+DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.ElasticSearchDurationCheckerProvider
}

dispatchers {
2 changes: 1 addition & 1 deletion core/controller/src/main/resources/reference.conf
@@ -31,7 +31,7 @@ whisk {
timeout-addon = 1m

fpc {
-use-per-min-throttles = false
+use-per-min-throttles = true
}
}
controller {
5 changes: 3 additions & 2 deletions tests/build.gradle
@@ -58,6 +58,7 @@ def systemIncludes = [
"org/apache/openwhisk/core/apigw/actions/test/**",
"org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*",
"org/apache/openwhisk/core/controller/test/*ControllerApiTests*",
"org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*",
"apigw/healthtests/**",
"ha/**",
"services/**",
@@ -78,7 +79,7 @@ ext.testSets = [
"org/apache/openwhisk/core/limits/**",
"org/apache/openwhisk/core/scheduler/**",
"org/apache/openwhisk/core/invoker/test/*InvokerBootUpTests*",
"org/apache/openwhisk/core/loadBalancer/test/*FPCPoolBalancerTests*",
"org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*",
"org/apache/openwhisk/common/etcd/**",
"**/*CacheConcurrencyTests*",
"**/*ControllerApiTests*",
@@ -89,6 +90,7 @@
"REQUIRE_SYSTEM" : [
"includes" : systemIncludes,
"excludes": [
"org/apache/openwhisk/core/loadBalancer/test/*FPCPoolBalancerTests*",
"system/basic/WskMultiRuntimeTests*",
'invokerShoot/**'
]
@@ -99,7 +101,6 @@ ext.testSets = [
"org/apache/openwhisk/core/containerpool/v2/test/**",
"org/apache/openwhisk/core/scheduler/**",
"org/apache/openwhisk/core/invoker/test/*InvokerBootUpTests*",
"org/apache/openwhisk/core/loadBalancer/test/*FPCPoolBalancerTests*",
"org/apache/openwhisk/core/service/**",
]
],
75 changes: 0 additions & 75 deletions tests/src/test/scala/limits/ThrottleTests.scala
@@ -229,68 +229,6 @@ class ThrottleTests
settleThrottles(alreadyWaited)
}
}

it should "throttle 'concurrent' activations of one action" in withAssetCleaner(wskprops) { (wp, assetHelper) =>
val name = "checkConcurrentActionThrottle"
assetHelper.withCleaner(wsk.action, name) {
val timeoutAction = Some(TestUtils.getTestActionFilename("sleep.js"))
(action, _) =>
action.create(name, timeoutAction)
}

// The sleep is necessary as the load balancer currently has a latency before recognizing concurrency.
val sleep = 15.seconds
// Adding a bit of overcommit since some loadbalancers rely on some overcommit. This won't hurt those who don't
// since all activations are taken into account to check for throttled invokes below.
val slowInvokes = (maximumConcurrentInvokes * 1.2).toInt
val fastInvokes = 4
val fastInvokeDuration = 4.seconds
val slowInvokeDuration = sleep + fastInvokeDuration

// These invokes will stay active long enough that all are issued and load balancer has recognized concurrency.
val startSlowInvokes = Instant.now
val slowResults = untilThrottled(slowInvokes) { () =>
wsk.action.invoke(
name,
Map("sleepTimeInMs" -> slowInvokeDuration.toMillis.toJson),
expectedExitCode = DONTCARE_EXIT)
}
val afterSlowInvokes = Instant.now
val slowIssueDuration = durationBetween(startSlowInvokes, afterSlowInvokes)
println(
s"$slowInvokes slow invokes (dur = ${slowInvokeDuration.toSeconds} sec) took ${slowIssueDuration.toSeconds} seconds to issue")

// Sleep to let the background thread get the newest values (refreshes every 2 seconds)
println(s"Sleeping for ${sleep.toSeconds} sec")
Thread.sleep(sleep.toMillis)

// These fast invokes will trigger the concurrency-based throttling.
val startFastInvokes = Instant.now
val fastResults = untilThrottled(fastInvokes) { () =>
wsk.action.invoke(
name,
Map("sleepTimeInMs" -> fastInvokeDuration.toMillis.toJson),
expectedExitCode = DONTCARE_EXIT)
}
val afterFastInvokes = Instant.now
val fastIssueDuration = durationBetween(afterFastInvokes, startFastInvokes)
println(
s"$fastInvokes fast invokes (dur = ${fastInvokeDuration.toSeconds} sec) took ${fastIssueDuration.toSeconds} seconds to issue")

val combinedResults = slowResults ++ fastResults
try {
val throttledCount = throttledActivations(combinedResults, tooManyConcurrentRequests(0, 0))
throttledCount should be > 0
} finally {
val alreadyWaited = durationBetween(afterSlowInvokes, Instant.now)
settleThrottles(alreadyWaited)
println("clearing activations")
}
// wait for the activations last, giving the activations time to complete and
// may avoid unnecessarily polling; if these fail, the throttle may not be settled
println("waiting for activations to complete")
waitForActivations(combinedResults)
}
}

@RunWith(classOf[JUnitRunner])
@@ -458,19 +396,6 @@ class NamespaceSpecificThrottleTests
}, 2, Some(1.second))
}

it should "respect overridden concurrent throttle of 0" in withAssetCleaner(zeroConcProps) { (wp, assetHelper) =>
implicit val props = wp
val actionName = "zeroConcurrentAction"

assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
action.create(actionName, defaultAction)
}

wsk.action.invoke(actionName, expectedExitCode = TooManyRequests.intValue).stderr should {
include(prefix(tooManyConcurrentRequests(0, 0))) and include("allowed: 0")
}
}

it should "not store an activation if disabled for this namespace" in withAssetCleaner(activationDisabled) {
(wp, assetHelper) =>
implicit val props = wp
@@ -37,6 +37,7 @@ class FPCEntitlementProviderTests extends ControllerTestCommon with ScalaFutures
val someUser = WhiskAuthHelpers.newIdentity()
val action = FullyQualifiedEntityName(EntityPath("testns"), EntityName("action"))
val loadBalancer = mock[LoadBalancer]
+(loadBalancer.clusterSize _).expects().returning(1).anyNumberOfTimes()
(loadBalancer
.checkThrottle(_: EntityPath, _: String))
.expects(someUser.namespace.name.toPath, action.fullPath.asString)
@@ -101,55 +101,6 @@ class ConcurrencyTests extends TestHelpers with WskTestHelpers with WskActorSyst
}
}

//This tests generates the same load against the same action as previous test, BUT with concurrency set to 1
it should "execute activations sequentially when concurrency = 1 " in withAssetCleaner(wskprops) {
assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))

(wp, assetHelper) =>
val name = "TestNonConcurrentAction"
assetHelper.withCleaner(wsk.action, name, confirmDelete = true) {
val actionName = TestUtils.getTestActionFilename("concurrent.js")
(action, _) =>
//disable log collection since concurrent activation requires specialized log processing
// (at action runtime and using specialized LogStore)
action.create(name, Some(actionName), logsize = Some(0.bytes), concurrency = Some(1))
}
//warm the container (concurrent activations with no warmed container, will cause multiple containers to be used - so we force one to warm up)
val run = wsk.action.invoke(name, Map("warm" -> 1.toJson), blocking = true)
withActivation(wsk.activation, run) { response =>
val logs = response.logs.get
withClue(logs) { logs.size shouldBe 0 }

response.response.status shouldBe "success"
response.response.result shouldBe Some(JsObject("warm" -> 1.toJson))
}

//read configs to determine max concurrency support - currently based on single invoker and invokerUserMemory config
val busyThreshold =
(loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool).userMemory / MemoryLimit.STD_MEMORY).toInt

//run maximum allowed concurrent actions via Futures
val requestCount = busyThreshold
println(s"executing $requestCount activations")
val runs = (1 to requestCount).map { _ =>
Future {
//expect only 1 activation concurrently (within the 1 second timeout implemented in concurrent.js)
wsk.action.invoke(name, Map("requestCount" -> 1.toJson), blocking = true)
}
}

//none of the actions will complete till the requestCount is reached
Await.result(Future.sequence(runs), 50.seconds).foreach { run =>
withActivation(wsk.activation, run) { response =>
val logs = response.logs.get
withClue(logs) { logs.size shouldBe 0 }
response.response.status shouldBe "success"
//expect only 1 activation concurrently
response.response.result shouldBe Some(JsObject("msg" -> s"Received 1 activations.".toJson))
}
}
}

it should "allow concurrent activations to gracefully complete when one fails" in withAssetCleaner(wskprops) {
assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))
(wp, assetHelper) =>
112 changes: 112 additions & 0 deletions tools/github/disable-scheduler.sh
@@ -0,0 +1,112 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set -e

cat > ${GITHUB_WORKSPACE}/common/scala/src/main/resources/reference.conf << EOL
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
include "s3-reference.conf"
whisk.spi {
ArtifactStoreProvider = org.apache.openwhisk.core.database.CouchDbStoreProvider
ActivationStoreProvider = org.apache.openwhisk.core.database.ArtifactActivationStoreProvider
MessagingProvider = org.apache.openwhisk.connector.kafka.KafkaMessagingProvider
ContainerFactoryProvider = org.apache.openwhisk.core.containerpool.docker.DockerContainerFactoryProvider
LogStoreProvider = org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider
LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.ShardingContainerPoolBalancer
EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.LocalEntitlementProvider
AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective
InvokerProvider = org.apache.openwhisk.core.invoker.InvokerReactive
InvokerServerProvider = org.apache.openwhisk.core.invoker.DefaultInvokerServer
DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.NoopDurationCheckerProvider
}
dispatchers {
# Custom dispatcher for CouchDB Client. Tune as needed.
couch-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
# Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
thread-pool-executor {
# Min number of threads to cap factor-based corePoolSize number to
core-pool-size-min = 2
# The core-pool-size-factor is used to determine corePoolSize of the
# ThreadPoolExecutor using the following formula:
# ceil(available processors * factor).
# Resulting size is then bounded by the core-pool-size-min and
# core-pool-size-max values.
core-pool-size-factor = 2.0
# Max number of threads to cap factor-based corePoolSize number to
core-pool-size-max = 32
}
# Throughput defines the number of messages that are processed in a batch
# before the thread is returned to the pool. Set to 1 for as fair as possible.
throughput = 5
}
# Custom dispatcher for Kafka client. Tune as needed.
kafka-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
# Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
thread-pool-executor {
# Min number of threads to cap factor-based corePoolSize number to
core-pool-size-min = 2
# The core-pool-size-factor is used to determine corePoolSize of the
# ThreadPoolExecutor using the following formula:
# ceil(available processors * factor).
# Resulting size is then bounded by the core-pool-size-min and
# core-pool-size-max values.
core-pool-size-factor = 2.0
# Max number of threads to cap factor-based corePoolSize number to
core-pool-size-max = 32
}
# Throughput defines the number of messages that are processed in a batch
# before the thread is returned to the pool. Set to 1 for as fair as possible.
throughput = 5
}
lease-service-dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
}
}
EOL
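The helper above simply overwrites `common/scala/src/main/resources/reference.conf` with the pre-scheduler SPI bindings (ShardingContainerPoolBalancer, LocalEntitlementProvider, and friends), which is how the standalone workflow opts back out of the new default. A rough sketch of running it outside GitHub Actions, where `GITHUB_WORKSPACE` is not set automatically; the rebuild step is illustrative rather than prescribed by this commit:

```bash
# Sketch: invoke the CI helper from a local checkout. The script expects
# GITHUB_WORKSPACE to point at the repository root and rewrites
# common/scala/src/main/resources/reference.conf in place.
export GITHUB_WORKSPACE="$(pwd)"
./tools/github/disable-scheduler.sh
# Rebuild images afterwards so the regenerated reference.conf is baked in (illustrative step).
./gradlew distDocker
```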
