Skip to content

Commit

Permalink
reimplement fix for akka/pekko cluster (#1594) (#1664)
Browse files Browse the repository at this point in the history
* Revert "revert #1568 due to test failures (#1587)"

This reverts commit 7af03e5.

* temp run nightly test in this PR

* no need for square brackets because the set print adds them

* logging to find issue

* support tcp protocols

* Update ClusterDaemon.scala

* remove temp logging

* try to fix issue in Remoting

* extra tests

* more tests

* ignore udp tests

* try to make tests tidy up after failures

* Update MixedProtocolClusterSpec.scala

* Update MixedProtocolClusterSpec.scala

* run main cluster tests for PR
  • Loading branch information
pjfanning authored Jan 2, 2025
1 parent 5572049 commit 38facf7
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 15 deletions.
47 changes: 47 additions & 0 deletions .github/workflows/build-test-prValidation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,53 @@ jobs:
-Dpekko.log.timestamps=true \
validatePullRequest
pekko-classic-remoting-tests:
name: Pekko Classic Remoting Tests
runs-on: ubuntu-22.04
if: github.repository == 'apache/pekko'
strategy:
fail-fast: false
matrix:
command:
- cluster/test distributed-data/test cluster-tools/test cluster-metrics/test
steps:
- name: Checkout
uses: actions/checkout@v4
with:
# we don't know what commit the last tag was it's safer to get entire repo so previousStableVersion resolves
fetch-depth: 0
fetch-tags: true

- name: Setup Java 11
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 11

- name: Install sbt
uses: sbt/setup-sbt@v1

- name: Cache Coursier cache
uses: coursier/cache-action@v6

- name: Enable jvm-opts
run: cp .jvmopts-ci .jvmopts

- name: sbt ${{ matrix.command }}
env:
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
# note that this is not running any multi-jvm tests because multi-in-test=false
run: |-
sbt \
-Djava.security.egd=file:/dev/./urandom \
-Dpekko.remote.artery.enabled=off \
-Dpekko.test.timefactor=2 \
-Dpekko.actor.testkit.typed.timefactor=2 \
-Dpekko.test.tags.exclude=gh-exclude,timing \
-Dpekko.test.multi-in-test=false \
-Dpekko.cluster.assert=on \
clean ${{ matrix.command }}
jdk-21-extra-tests:
name: Java 21 Extra Tests (including all tests that need Java 9+)
runs-on: ubuntu-22.04
Expand Down
29 changes: 17 additions & 12 deletions cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@

package org.apache.pekko.cluster

import scala.annotation.nowarn
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NonFatal

import scala.annotation.nowarn
import com.typesafe.config.Config

import org.apache.pekko
Expand All @@ -30,13 +30,11 @@ import pekko.annotation.InternalApi
import pekko.cluster.ClusterEvent._
import pekko.cluster.MemberStatus._
import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import pekko.event.ActorWithLogClass
import pekko.event.Logging
import pekko.event.{ ActorWithLogClass, Logging }
import pekko.pattern.ask
import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent }
import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent, RemoteSettings }
import pekko.remote.artery.QuarantinedEvent
import pekko.util.Timeout
import pekko.util.Version
import pekko.util.{ Timeout, Version }

/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
Expand Down Expand Up @@ -365,6 +363,13 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
val statsEnabled = PublishStatsInterval.isFinite
var gossipStats = GossipStats()

val acceptedProtocols: Set[String] = {
val remoteSettings: RemoteSettings = new RemoteSettings(context.system.settings.config)
val initSet = remoteSettings.AcceptProtocolNames
val tcpSet = initSet.map(protocol => s"$protocol.tcp")
initSet ++ tcpSet
}

var seedNodes = SeedNodes
var seedNodeProcess: Option[ActorRef] = None
var seedNodeProcessCounter = 0 // for unique names
Expand Down Expand Up @@ -701,10 +706,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
* which will reply with a `Welcome` message.
*/
def join(address: Address): Unit = {
if (address.protocol != selfAddress.protocol)
if (!acceptedProtocols.contains(address.protocol))
logWarning(
"Trying to join member with wrong protocol, but was ignored, expected [{}] but was [{}]",
selfAddress.protocol,
"Trying to join member with wrong protocol, but was ignored, expected any of {} but was [{}]",
acceptedProtocols,
address.protocol)
else if (address.system != selfAddress.system)
logWarning(
Expand Down Expand Up @@ -750,10 +755,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion: Version): Unit = {
if (!preparingForShutdown) {
val selfStatus = latestGossip.member(selfUniqueAddress).status
if (joiningNode.address.protocol != selfAddress.protocol)
if (!acceptedProtocols.contains(joiningNode.address.protocol))
logWarning(
"Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.protocol,
"Member with wrong protocol tried to join, but was ignored, expected any of {} but was [{}]",
acceptedProtocols,
joiningNode.address.protocol)
else if (joiningNode.address.system != selfAddress.system)
logWarning(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.cluster

import com.typesafe.config.{ Config, ConfigFactory }

import org.apache.pekko.testkit.{ LongRunningTest, PekkoSpec }

object MixedProtocolClusterSpec {

val baseConfig: Config =
ConfigFactory.parseString("""
pekko.actor.provider = "cluster"
pekko.coordinated-shutdown.terminate-actor-system = on
pekko.remote.artery.canonical.port = 0
pekko.remote.classic.netty.tcp.port = 0
pekko.remote.artery.advanced.aeron.idle-cpu-level = 3
pekko.remote.accept-protocol-names = ["pekko", "akka"]
pekko.cluster.jmx.multi-mbeans-in-same-jvm = on
pekko.cluster.configuration-compatibility-check.enforce-on-join = off
""")

val configWithUdp: Config =
ConfigFactory.parseString("""
pekko.remote.artery.transport = "aeron-udp"
""").withFallback(baseConfig)

val configWithPekkoUdp: Config =
ConfigFactory.parseString("""
pekko.remote.protocol-name = "pekko"
""").withFallback(configWithUdp)

val configWithAkkaUdp: Config =
ConfigFactory.parseString("""
pekko.remote.protocol-name = "akka"
""").withFallback(configWithUdp)

val configWithPekkoTcp: Config =
ConfigFactory.parseString("""
pekko.remote.protocol-name = "pekko"
""").withFallback(baseConfig)

val configWithAkkaTcp: Config =
ConfigFactory.parseString("""
pekko.remote.protocol-name = "akka"
""").withFallback(baseConfig)

val configWithNetty: Config =
ConfigFactory.parseString("""
pekko.remote.artery.enabled = false
pekko.remote.classic {
enabled-transports = ["pekko.remote.classic.netty.tcp"]
}
""").withFallback(baseConfig)

val configWithPekkoNetty: Config =
ConfigFactory.parseString("""
pekko.remote.protocol-name = "pekko"
""").withFallback(configWithNetty)

val configWithAkkaNetty: Config =
ConfigFactory.parseString("""
pekko.remote.protocol-name = "akka"
""").withFallback(configWithNetty)
}

class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit {

import MixedProtocolClusterSpec._

"A node using the akka protocol" must {

"be allowed to join a cluster with a node using the pekko protocol (udp)" taggedAs LongRunningTest in {

val clusterTestUtil = new ClusterTestUtil(system.name)
try {
// start the first node with the "pekko" protocol
clusterTestUtil.newActorSystem(configWithPekkoUdp)

// have a node using the "akka" protocol join
val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaUdp)
clusterTestUtil.formCluster()

awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'")
} finally {
clusterTestUtil.shutdownAll()
}
}

"be allowed to join a cluster with a node using the pekko protocol (tcp)" taggedAs LongRunningTest in {

val clusterTestUtil = new ClusterTestUtil(system.name)
try {
// start the first node with the "pekko" protocol
clusterTestUtil.newActorSystem(configWithPekkoTcp)

// have a node using the "akka" protocol join
val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaTcp)
clusterTestUtil.formCluster()

awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'")
} finally {
clusterTestUtil.shutdownAll()
}
}

"be allowed to join a cluster with a node using the pekko protocol (netty)" taggedAs LongRunningTest in {

val clusterTestUtil = new ClusterTestUtil(system.name)
try {
// start the first node with the "pekko" protocol
clusterTestUtil.newActorSystem(configWithPekkoNetty)

// have a node using the "akka" protocol join
val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaNetty)
clusterTestUtil.formCluster()

awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'")
} finally {
clusterTestUtil.shutdownAll()
}
}

"allow a node using the pekko protocol to join the cluster (udp)" taggedAs LongRunningTest in {

val clusterTestUtil = new ClusterTestUtil(system.name)
try {
// create the first node with the "akka" protocol
clusterTestUtil.newActorSystem(configWithAkkaUdp)

// have a node using the "pekko" protocol join
val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoUdp)
clusterTestUtil.formCluster()

awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'")
} finally {
clusterTestUtil.shutdownAll()
}
}

"allow a node using the pekko protocol to join the cluster (tcp)" taggedAs LongRunningTest in {

val clusterTestUtil = new ClusterTestUtil(system.name)
try {
// create the first node with the "akka" protocol
clusterTestUtil.newActorSystem(configWithAkkaTcp)

// have a node using the "pekko" protocol join
val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoTcp)
clusterTestUtil.formCluster()

awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'")
} finally {
clusterTestUtil.shutdownAll()
}
}

"allow a node using the pekko protocol to join the cluster (netty)" taggedAs LongRunningTest in {

val clusterTestUtil = new ClusterTestUtil(system.name)
try {
// create the first node with the "akka" protocol
clusterTestUtil.newActorSystem(configWithAkkaNetty)

// have a node using the "pekko" protocol join
val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoNetty)
clusterTestUtil.formCluster()

awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'")
} finally {
clusterTestUtil.shutdownAll()
}
}
}
}
22 changes: 19 additions & 3 deletions remote/src/main/scala/org/apache/pekko/remote/Remoting.scala
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,12 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
Await.result(addressesPromise.future, StartupTimeout.duration)
if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null)

transportMapping = transports
val mapping = transports
.groupBy {
case (transport, _) => transport.schemeIdentifier
}
.map { case (k, v) => k -> v.toSet }
transportMapping = addProtocolsToMap(mapping)

defaultAddress = transports.head._2
addresses = transports.map { _._2 }.toSet
Expand Down Expand Up @@ -296,6 +297,21 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
}
}
}

private def addProtocolsToMap(
map: Map[String, Set[(PekkoProtocolTransport, Address)]]): Map[String, Set[(PekkoProtocolTransport, Address)]] = {
if (AcceptProtocolNames.size > 1) {
map.flatMap { case (protocol, transports) =>
val tcpProtocol = protocol.endsWith(".tcp")
AcceptProtocolNames.map { newProtocol =>
if (tcpProtocol)
s"$newProtocol.tcp" -> transports
else
newProtocol -> transports
}
}
} else map
}
}

/**
Expand Down Expand Up @@ -567,7 +583,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter)
}

OneForOneStrategy(loggingEnabled = false) {
case InvalidAssociation(localAddress, remoteAddress, reason, disassiciationInfo) =>
case InvalidAssociation(localAddress, remoteAddress, reason, disassociationInfo) =>
keepQuarantinedOr(remoteAddress) {
val causedBy = if (reason.getCause == null) "" else s"Caused by: [${reason.getCause.getMessage}]"
log.warning(
Expand All @@ -580,7 +596,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter)
causedBy)
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
}
disassiciationInfo.foreach {
disassociationInfo.foreach {
case AssociationHandle.Quarantined =>
context.system.eventStream.publish(ThisActorSystemQuarantinedEvent(localAddress, remoteAddress))
case _ => // do nothing
Expand Down

0 comments on commit 38facf7

Please sign in to comment.