Skip to content

Commit

Permalink
try to fix issue in Remoting
Browse files Browse the repository at this point in the history
  • Loading branch information
pjfanning committed Dec 28, 2024
1 parent aba5c84 commit 542840d
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions remote/src/main/scala/org/apache/pekko/remote/Remoting.scala
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,12 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
Await.result(addressesPromise.future, StartupTimeout.duration)
if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null)

transportMapping = transports
val mapping = transports
.groupBy {
case (transport, _) => transport.schemeIdentifier
}
.map { case (k, v) => k -> v.toSet }
transportMapping = addProtocolsToMap(mapping)

defaultAddress = transports.head._2
addresses = transports.map { _._2 }.toSet
Expand Down Expand Up @@ -296,6 +297,21 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
}
}
}

private def addProtocolsToMap(
map: Map[String, Set[(PekkoProtocolTransport, Address)]]): Map[String, Set[(PekkoProtocolTransport, Address)]] = {
if (AcceptProtocolNames.size > 1) {
map.flatMap { case (protocol, transports) =>
val tcpProtocol = protocol.endsWith(".tcp")
AcceptProtocolNames.map { newProtocol =>
if (tcpProtocol)
s"$newProtocol.tcp" -> transports
else
newProtocol -> transports
}
}
} else map
}
}

/**
Expand Down Expand Up @@ -567,7 +583,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter)
}

OneForOneStrategy(loggingEnabled = false) {
case InvalidAssociation(localAddress, remoteAddress, reason, disassiciationInfo) =>
case InvalidAssociation(localAddress, remoteAddress, reason, disassociationInfo) =>
keepQuarantinedOr(remoteAddress) {
val causedBy = if (reason.getCause == null) "" else s"Caused by: [${reason.getCause.getMessage}]"
log.warning(
Expand All @@ -580,7 +596,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter)
causedBy)
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
}
disassiciationInfo.foreach {
disassociationInfo.foreach {
case AssociationHandle.Quarantined =>
context.system.eventStream.publish(ThisActorSystemQuarantinedEvent(localAddress, remoteAddress))
case _ => // do nothing
Expand Down

0 comments on commit 542840d

Please sign in to comment.