diff --git a/Akka/src/main/scala/API/AkkaExp.scala b/Akka/src/main/scala/API/AkkaExp.scala index 1f22bba7..8783fbc5 100644 --- a/Akka/src/main/scala/API/AkkaExp.scala +++ b/Akka/src/main/scala/API/AkkaExp.scala @@ -2,6 +2,7 @@ package simulation.akka.API import akka.cluster.typed.Cluster import meta.runtime.Actor +import meta.API.{SimulationDataBuilder, TimeseriesBuilder, SnapshotBuilder} import com.typesafe.config.ConfigFactory import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue} import scala.collection.JavaConversions._ @@ -9,26 +10,22 @@ import akka.actor.typed.{Behavior} import akka.actor.typed.scaladsl.Behaviors import akka.actor.NoSerializationVerificationNeeded -// import akka.actor.typed.DispatcherSelector - object AkkaExp { sealed trait Command extends NoSerializationVerificationNeeded - final case class SpawnDriver(totalWorkers: Int, totalTurn: Long) extends Command - final case class SpawnWorker(workerId: Int, sims: Seq[Actor], totalWorkers: Int) extends Command + final case class SpawnDriver(totalWorkers: Int, totalTurn: Long, logControllerOn: Boolean) extends Command + final case class SpawnWorker(workerId: Int, sims: Seq[Actor], totalWorkers: Int, logControllerOn: Boolean) extends Command final case class SpawnLogController(totalWorkers: Int) extends Command final case class DriverStopped() extends Command final case class WorkerStopped(workerId: Int, sims: Seq[Actor]) extends Command final case class LogControllerStopped() extends Command - + var cluster: Cluster = null var totalWorkers: Int = 0 val stoppedWorkers: ConcurrentLinkedQueue[Int] = new ConcurrentLinkedQueue[Int]() var activeWorkers: ConcurrentLinkedQueue[Int] = new ConcurrentLinkedQueue[Int]() var finalAgents: ConcurrentLinkedQueue[Actor] = new ConcurrentLinkedQueue[Actor]() - val defaultHaltCond: Iterable[Iterable[Serializable]] => Boolean = (x: Iterable[Iterable[Serializable]]) => false - var haltCond: Iterable[Iterable[Serializable]] => Boolean = null - def materializedMachine(mid: Int, totalTurn: Long, totalWorkers: Int, actors: IndexedSeq[Actor]=Vector[Actor]()): Behavior[Command] = + def materializedMachine(mid: Int, totalTurn: Long, totalWorkers: Int, builder: SimulationDataBuilder, actors: IndexedSeq[Actor]): Behavior[Command] = Behaviors.setup { ctx => cluster = Cluster(ctx.system) this.totalWorkers = totalWorkers @@ -49,13 +46,13 @@ object AkkaExp { } else { actors.slice(i*actorsPerWorker, (i+1)*actorsPerWorker) } - ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers) + ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers, false) } - waitTillFinish(Vector.empty) + // simulateUntil supports only Standalone mode for now + waitTillFinish(Vector.empty, builder, None) } - def apply(totalTurn: Long, totalWorkers: Int, actors: IndexedSeq[Actor]=Vector[Actor](), - cond: Iterable[Iterable[Serializable]] => Boolean = defaultHaltCond): Behavior[Command] = + def apply(totalTurn: Long, totalWorkers: Int, builder: SimulationDataBuilder, actors: IndexedSeq[Actor], haltCond: Option[Iterable[Iterable[Serializable]] => Boolean]): Behavior[Command] = Behaviors.setup { ctx => cluster = Cluster(ctx.system) this.totalWorkers = totalWorkers @@ -63,16 +60,14 @@ object AkkaExp { val totalActors = actors.size var actorsPerWorker = totalActors/totalWorkers - if (cond != defaultHaltCond) { - haltCond = cond - } - stoppedWorkers.clear() activeWorkers.clear() finalAgents.clear() ctx.log.debug(f"${actorsPerWorker} actors per worker") + val logControllerOn = haltCond.isDefined || builder.isInstanceOf[TimeseriesBuilder] + // Worker id is 0-indexed if (roles.exists(p => p.startsWith("Worker"))) { ctx.log.debug(f"Creating a worker!") @@ -82,7 +77,7 @@ object AkkaExp { } else { actors.slice(wid*actorsPerWorker, (wid+1)*actorsPerWorker) } - ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers) + ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers, logControllerOn) } // Machine id is 0-indexed @@ -97,56 +92,67 @@ object AkkaExp { } else { actors.slice(wid*actorsPerWorker, (wid+1)*actorsPerWorker) } - ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers) + ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers, logControllerOn) } } if (cluster.selfMember.hasRole("Driver")) { ctx.log.debug(f"Creating a driver!") - ctx.self ! SpawnDriver(totalWorkers, totalTurn) + ctx.self ! SpawnDriver(totalWorkers, totalTurn, logControllerOn) // Co-locate the log controller with driver - if (simulation.akka.API.OptimizationConfig.logControllerEnabled) { + if (logControllerOn) { ctx.self ! SpawnLogController(totalWorkers) } } if (cluster.selfMember.hasRole("Standalone")) { ctx.log.debug(f"Standalone mode") - ctx.self ! SpawnDriver(totalWorkers, totalTurn) - if (simulation.akka.API.OptimizationConfig.logControllerEnabled) { + ctx.self ! SpawnDriver(totalWorkers, totalTurn, logControllerOn) + + if (logControllerOn) { ctx.self ! SpawnLogController(totalWorkers) } + for (i <- Range(0, totalWorkers)){ val containedAgents = if (i == totalWorkers-1){ actors.slice(i*actorsPerWorker, totalActors) } else { actors.slice(i*actorsPerWorker, (i+1)*actorsPerWorker) } - ctx.self ! SpawnWorker(i, containedAgents, totalWorkers) + ctx.self ! SpawnWorker(i, containedAgents, totalWorkers, logControllerOn) } } - waitTillFinish(Vector.empty) + waitTillFinish(Vector.empty, builder, haltCond) } - def waitTillFinish(finalAgents: IndexedSeq[Actor]): Behavior[Command] = { + def waitTillFinish(finalAgents: IndexedSeq[Actor], builder: SimulationDataBuilder, haltCond: Option[Iterable[Iterable[Serializable]] => Boolean]): Behavior[Command] = { Behaviors.receive { (ctx, message) => message match { - case SpawnDriver(totalWorkers, totalTurn) => - val driver = ctx.spawn((new simulation.akka.core.Driver).apply(totalWorkers, totalTurn), "driver") + case SpawnDriver(totalWorkers, totalTurn, logControllerOn) => + val driver = ctx.spawn((new simulation.akka.core.Driver).apply(totalWorkers, totalTurn, logControllerOn), "driver") ctx.watchWith(driver, DriverStopped()) Behaviors.same case SpawnLogController(totalWorkers) => - val logController = if (haltCond != null) { - ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers, haltCond), "logController") + val logController = if (haltCond.isDefined) { + // ctx.log.info("Conditional termination is defined!") + ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers, haltCond.get, builder.asInstanceOf[TimeseriesBuilder]), "logController") } else { - ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers), "logController") + // ctx.log.info("Conditional termination is nto defined!") + ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers, builder.asInstanceOf[TimeseriesBuilder]), "logController") } ctx.watchWith(logController, LogControllerStopped()) Behaviors.same - case SpawnWorker(workerId, agents, totalWorkers) => - val sim = ctx.spawn((new simulation.akka.core.Worker).apply(workerId, agents, totalWorkers), f"worker${workerId}") + case SpawnWorker(workerId, agents, totalWorkers, logControllerOn) => + val sim = builder match { + case x: TimeseriesBuilder => { + ctx.spawn((new simulation.akka.core.Worker).apply(workerId, agents, totalWorkers, Some(x.asInstanceOf[TimeseriesBuilder])), f"worker${workerId}") + } + case _: SnapshotBuilder => { + ctx.spawn((new simulation.akka.core.Worker).apply(workerId, agents, totalWorkers, None), f"worker${workerId}") + } + } activeWorkers.add(workerId) ctx.watchWith(sim, WorkerStopped(workerId, agents)) Behaviors.same @@ -175,12 +181,12 @@ object AkkaExp { if (!stoppedWorkers.contains(workerId)){ stoppedWorkers.add(workerId) if (activeWorkers.toSet.diff(stoppedWorkers.toSet).isEmpty){ - Simulate.addStoppedAgents(finalAgents ++ agents) + builder.addAgents(finalAgents ++ agents) Behaviors.stopped {() => ctx.system.terminate() } } else { - waitTillFinish(finalAgents ++ agents) + waitTillFinish(finalAgents ++ agents, builder, haltCond) } } else { if (activeWorkers.toSet.diff(stoppedWorkers.toSet).isEmpty){ @@ -188,7 +194,7 @@ object AkkaExp { ctx.system.terminate() } } else { - waitTillFinish(finalAgents) + waitTillFinish(finalAgents, builder, haltCond) } } } else { diff --git a/Akka/src/main/scala/API/Optimization.scala b/Akka/src/main/scala/API/Optimization.scala index 87176a34..ebe80ca5 100644 --- a/Akka/src/main/scala/API/Optimization.scala +++ b/Akka/src/main/scala/API/Optimization.scala @@ -7,10 +7,6 @@ case object MergedWorker extends Optimization object OptimizationConfig { var conf: Optimization = MergedWorker - var logControllerEnabled: Boolean = false - - var timeseriesSchema: SimulationTimeseries = FullTimeseries - // todo: tmp, fix with proper availability input var availability: Int = 1 diff --git a/Akka/src/main/scala/API/Simulate.scala b/Akka/src/main/scala/API/Simulate.scala index 9c39a52b..4951e1fb 100644 --- a/Akka/src/main/scala/API/Simulate.scala +++ b/Akka/src/main/scala/API/Simulate.scala @@ -1,101 +1,88 @@ package simulation.akka.API import com.typesafe.config.ConfigFactory -import meta.API.SimulationSnapshot +import meta.API.{DeforestationStrategy, SimulationData, SimulationDataBuilder, SnapshotBuilder, TimeseriesBuilder} import meta.runtime.{Actor, Message} import scala.concurrent.Await import scala.concurrent.duration._ import akka.actor.typed.ActorSystem object Simulate { - private var stoppedAgents = IndexedSeq[Actor]() + def apply(actors: IndexedSeq[Actor], totalTurn: Long, conf: Map[String, Any], cond: Option[Iterable[Iterable[Serializable]] => Boolean] = None)(implicit strategy: DeforestationStrategy): SimulationData = { - var lastWords: IndexedSeq[Message] = IndexedSeq[Message]() + require(conf.isDefinedAt("role")) // Standalone, Driver, Machine-$id + require(conf.isDefinedAt("port")) // network port + require(conf.isDefinedAt("name")) // name of the actor system, to allow concurrent simulations + require(conf.isDefinedAt("data")) // timeseries or snapshot - def addStoppedAgents(agents: IndexedSeq[Actor]): Unit = { - stoppedAgents = agents - } - - var timeseries: Iterable[Iterable[Serializable]] = null - - def initialize(): Unit = { - stoppedAgents=IndexedSeq[Actor]() - lastWords=IndexedSeq[Message]() - } - - def driver(totalTurn: Long, port: Int = 25251): SimulationSnapshot = { - initialize() + val role: String = conf("role").asInstanceOf[String] + val port: Int = conf("port").asInstanceOf[Int] + val name: String = conf("name").asInstanceOf[String] + val dataConf: String = conf("data").asInstanceOf[String] - val config = ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.cluster.roles = [Driver] - """).withFallback(ConfigFactory.load("application")) - // If there are more workers than agents, then set the worker number to the same as agents - val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt - val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt - var totalWorkers = workersPerMachine * totalMachines - println(f"${totalMachines} total machines, ${totalWorkers} total workers") - val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers), "SimsCluster", config) - Await.ready(actorSystem.whenTerminated, 10.days) - println("Simulation ends!") - SimulationSnapshot(stoppedAgents, lastWords) - } - - - // Materialized (actors are all containedActors) - def machine(mid: Int, actors: IndexedSeq[Actor], totalTurn: Long, port: Int = 0): SimulationSnapshot = { - initialize() - val config = ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.cluster.roles = [Machine-$mid] - """).withFallback(ConfigFactory.load("application")) - // If there are more workers than agents, then set the worker number to the same as agents - val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt - val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt - var totalWorkers = workersPerMachine * totalMachines - println(f"${totalMachines} total machines, ${totalWorkers} total workers") - - val actorSystem = ActorSystem(AkkaExp.materializedMachine(mid, totalTurn, totalWorkers, actors), "SimsCluster", config) - Await.ready(actorSystem.whenTerminated, 10.days) - println("Simulation ends!") - SimulationSnapshot(stoppedAgents, lastWords) - } - - def apply(actors: IndexedSeq[Actor], totalTurn: Long, - role: String= "Standalone", port: Int = 25251): SimulationSnapshot = { - initialize() - val config = ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.cluster.roles = [$role] - """).withFallback(ConfigFactory.load("application")) - // If there are more workers than agents, then set the worker number to the same as agents val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt var totalWorkers = workersPerMachine * totalMachines + println(f"${totalMachines} total machines, ${totalWorkers} total workers, and ${actors.size} actors") - // well-formedness check - val machinePrefix = "Machine-" - val workerPrefix = "Worker-" - try { - role match { - case "Standalone" => totalWorkers = workersPerMachine // ignore totalMachine setting - case "Driver" => - case s if s.startsWith(machinePrefix) && s.stripPrefix(machinePrefix).toInt < totalMachines => - case s if s.startsWith(workerPrefix) && s.stripPrefix(workerPrefix).toInt < totalWorkers => - case _ => throw new Exception("Invalid role!") - } - } catch { - case e: Exception => throw new Exception(f"Invalid role ${role}. Available roles are Standalone, Driver, Machine-id, or Worker-id. Replacing id with 0-based int (less than total machines or workers)") - } if (totalWorkers > actors.size){ println(f"Found more workers than agents! Set total workers from ${totalWorkers} to ${actors.size}") totalWorkers = actors.size } - val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers, actors), "SimsCluster", config) + val machinePrefix = "Machine-" + val builder: SimulationDataBuilder = if (dataConf == "timeseries") { + new TimeseriesBuilder(strategy) + } else { + new SnapshotBuilder() + } + + val ip: String = conf.getOrElse("ip", "localhost").asInstanceOf[String] + + val actorSystem = role match { + case "Standalone" => { + // local mode + val config = ConfigFactory.parseString(s""" + akka.remote.artery.canonical.port=$port + akka.remote.artery.canonical.hostname=localhost + akka.cluster.roles = [$role] + akka.cluster.seed-nodes = ["akka://$name@localhost:$port"] + """).withFallback(ConfigFactory.load("application")) + ActorSystem(AkkaExp(totalTurn, totalWorkers, builder, actors, cond), name, config) + } + case "Driver" => { + require(conf.isDefinedAt("ip")) + // By default, driver is also the seed node + val config = ConfigFactory.parseString(s""" + akka.remote.artery.canonical.hostname=$ip + akka.remote.artery.canonical.port=$port + akka.cluster.roles = [$role] + akka.cluster.seed-nodes = ["akka://$name@$ip:$port"] + """).withFallback(ConfigFactory.load("application")) + ActorSystem(AkkaExp(totalTurn, totalWorkers, builder, Vector[Actor](), None), name, config) + } + case s if s.startsWith(machinePrefix) => { + require(conf.isDefinedAt("ip")) + require(conf.isDefinedAt("seed")) // ip:port + val seed: String = conf("seed").asInstanceOf[String] + val config = ConfigFactory.parseString(s""" + akka.remote.artery.canonical.hostname=$ip + akka.remote.artery.canonical.port=$port + akka.cluster.roles = [$role] + akka.cluster.seed-nodes = ["akka://$name@$seed"] + """).withFallback(ConfigFactory.load("application")) + + // 0-based + val mid = s.stripPrefix(machinePrefix).toInt + assert(mid < totalMachines) + ActorSystem(AkkaExp.materializedMachine(mid, totalTurn, totalWorkers, builder, actors), name, config) + } + case _ => throw new Exception("Invalid role! Supported roles are Standalone, Driver, and Machine-$id (o-based)") + } Await.ready(actorSystem.whenTerminated, 10.days) + println("Simulation ends!") - SimulationSnapshot(stoppedAgents, lastWords) + builder.build() } -} +} \ No newline at end of file diff --git a/Akka/src/main/scala/API/SimulateUntil.scala b/Akka/src/main/scala/API/SimulateUntil.scala deleted file mode 100644 index 7b0168fa..00000000 --- a/Akka/src/main/scala/API/SimulateUntil.scala +++ /dev/null @@ -1,82 +0,0 @@ -package simulation.akka.API - -import com.typesafe.config.ConfigFactory -import meta.API.SimulationSnapshot -import meta.runtime.{Actor, Message} -import scala.concurrent.Await -import scala.concurrent.duration._ -import akka.actor.typed.ActorSystem - -class SimulateUntil { - OptimizationConfig.logControllerEnabled = true - var timeseries: Iterable[Iterable[Serializable]] = null - - def driver(totalTurn: Long, port: Int = 25251): Unit = { - val config = ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.cluster.roles = [Driver] - """).withFallback(ConfigFactory.load("application")) - // If there are more workers than agents, then set the worker number to the same as agents - val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt - val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt - var totalWorkers = workersPerMachine * totalMachines - println(f"${totalMachines} total machines, ${totalWorkers} total workers") - val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers), "SimsCluster", config) - Await.ready(actorSystem.whenTerminated, 10.days) - println("Simulation ends!") - } - - - // Materialized (actors are all containedActors) - def machine(mid: Int, actors: IndexedSeq[Actor], totalTurn: Long, port: Int = 0): Unit = { - val config = ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.cluster.roles = [Machine-$mid] - """).withFallback(ConfigFactory.load("application")) - // If there are more workers than agents, then set the worker number to the same as agents - val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt - val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt - var totalWorkers = workersPerMachine * totalMachines - println(f"${totalMachines} total machines, ${totalWorkers} total workers") - - val actorSystem = ActorSystem(AkkaExp.materializedMachine(mid, totalTurn, totalWorkers, actors), "SimsCluster", config) - Await.ready(actorSystem.whenTerminated, 10.days) - println("Simulation ends!") - } - - def apply(actors: IndexedSeq[Actor], totalTurn: Long, - cond: Iterable[Iterable[Serializable]] => Boolean, role: String= "Standalone", port: Int = 25251): Unit = { - val config = ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.cluster.roles = [$role] - """).withFallback(ConfigFactory.load("application")) - // If there are more workers than agents, then set the worker number to the same as agents - val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt - val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt - var totalWorkers = workersPerMachine * totalMachines - println(f"${totalMachines} total machines, ${totalWorkers} total workers, and ${actors.size} actors") - // well-formedness check - val machinePrefix = "Machine-" - val workerPrefix = "Worker-" - try { - role match { - case "Standalone" => totalWorkers = workersPerMachine // ignore totalMachine setting - case "Driver" => - case s if s.startsWith(machinePrefix) && s.stripPrefix(machinePrefix).toInt < totalMachines => - case s if s.startsWith(workerPrefix) && s.stripPrefix(workerPrefix).toInt < totalWorkers => - case _ => throw new Exception("Invalid role!") - } - } catch { - case e: Exception => throw new Exception(f"Invalid role ${role}. Available roles are Standalone, Driver, Machine-id, or Worker-id. Replacing id with 0-based int (less than total machines or workers)") - } - - if (totalWorkers > actors.size){ - println(f"Found more workers than agents! Set total workers from ${totalWorkers} to ${actors.size}") - totalWorkers = actors.size - } - - val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers, actors, cond), "SimsCluster", config) - Await.ready(actorSystem.whenTerminated, 10.days) - println("Simulation ends!") - } -} diff --git a/Akka/src/main/scala/API/SimulationTimeseries.scala b/Akka/src/main/scala/API/SimulationTimeseries.scala deleted file mode 100644 index 66b11edf..00000000 --- a/Akka/src/main/scala/API/SimulationTimeseries.scala +++ /dev/null @@ -1,23 +0,0 @@ -package simulation.akka.API - -import meta.runtime.Actor -import scala.collection.mutable.Map - -abstract class SimulationTimeseries { - def mapper(x: Serializable): Serializable - def reducer(x: Iterable[Iterable[Serializable]]): Iterable[Serializable] -} - -case object FullTimeseries extends SimulationTimeseries { - // a sequential worker applies the mapper to each agent - override def mapper(x: Serializable): Serializable = { - x - } - - // workers each send an Iterable[Serializable] to the log controller. - // Log controller collects Iterable[Iterable[Serializable]] and - // applies the reducer method to reduce the intermediate data - override def reducer(x: Iterable[Iterable[Serializable]]): Iterable[Serializable] = { - x.flatten - } -} \ No newline at end of file diff --git a/Akka/src/main/scala/core/DriverImpl.scala b/Akka/src/main/scala/core/DriverImpl.scala index ece1d68b..b2774749 100644 --- a/Akka/src/main/scala/core/DriverImpl.scala +++ b/Akka/src/main/scala/core/DriverImpl.scala @@ -24,13 +24,12 @@ class Driver { private var acceptedInterval: Int = 0 private var availability: Int = simulation.akka.API.OptimizationConfig.availability - private val logControllerEnabled = simulation.akka.API.OptimizationConfig.logControllerEnabled var start: Long = 0 var end: Long = 0 var initialStart: Long = 0 - def apply(workers: Int, maxTurn: Long): Behavior[DriverEvent] = Behaviors.setup {ctx => + def apply(workers: Int, maxTurn: Long, logControllerEnabled: Boolean): Behavior[DriverEvent] = Behaviors.setup {ctx => totalWorkers = workers totalTurn = maxTurn currentTurn = 0 @@ -71,10 +70,10 @@ class Driver { if (logControllerEnabled) { ctx.system.receptionist ! Receptionist.Subscribe(LogControllerSpec.LoggerStopServiceKey, workerSub) } - driver() + driver(logControllerEnabled) } - def driver(): Behavior[DriverEvent] = + def driver(logControllerEnabled: Boolean): Behavior[DriverEvent] = Behaviors.receive[DriverEvent] { (ctx, message) => message match { case InitializeWorkers() => @@ -109,7 +108,7 @@ class Driver { RoundEnd() }, timeout=1000.seconds).apply()) - driver() + driver(logControllerEnabled) } case RoundEnd() => @@ -125,7 +124,7 @@ class Driver { } else { ctx.self ! RoundStart() } - driver() + driver(logControllerEnabled) case LogControllerFinished() => Behaviors.stopped {() => diff --git a/Akka/src/main/scala/core/LogControllerImpl.scala b/Akka/src/main/scala/core/LogControllerImpl.scala index 4ca37b40..f2db9418 100644 --- a/Akka/src/main/scala/core/LogControllerImpl.scala +++ b/Akka/src/main/scala/core/LogControllerImpl.scala @@ -1,5 +1,6 @@ package simulation.akka.core +import meta.API.TimeseriesBuilder import java.util.concurrent.{ConcurrentHashMap} import akka.actor.typed.receptionist.{Receptionist} import java.util.concurrent.atomic.AtomicInteger @@ -28,15 +29,15 @@ class LogController { var interruptDriver: Set[ActorRef[DriverSpec.InterruptDriver]] = Set() var haltCond: Iterable[Iterable[Serializable]] => Boolean = null - def apply(workers: Int): Behavior[LogControllerEvent] = Behaviors.setup {ctx => + def apply(workers: Int, builder: TimeseriesBuilder): Behavior[LogControllerEvent] = Behaviors.setup {ctx => totalWorkers = workers // Let workers and driver discover the log controller ctx.system.receptionist ! Receptionist.Register(LoggerAggregateServiceKey, ctx.self) ctx.system.receptionist ! Receptionist.Register(LoggerStopServiceKey, ctx.self) - logController() + logController(builder) } - def apply(workers: Int, haltCond: Iterable[Iterable[Serializable]] => Boolean): Behavior[LogControllerEvent] = Behaviors.setup {ctx => + def apply(workers: Int, haltCond: Iterable[Iterable[Serializable]] => Boolean, builder: TimeseriesBuilder): Behavior[LogControllerEvent] = Behaviors.setup {ctx => totalWorkers = workers this.haltCond = haltCond // Let workers and driver discover the log controller @@ -53,10 +54,10 @@ class LogController { } ctx.system.receptionist ! Receptionist.Subscribe(DriverSpec.InterruptDriverServiceKey, logControllerSub) - logControllerWithInterrupt() + logControllerWithInterrupt(builder) } - def logController(): Behavior[LogControllerEvent] = + def logController(builder: TimeseriesBuilder): Behavior[LogControllerEvent] = Behaviors.receive[LogControllerEvent] { (ctx, message) => message match { case AggregateLog(wid, time, agents) => @@ -65,7 +66,7 @@ class LogController { }).put(wid, agents) if (indexedTimeseries.get(time).size == totalWorkers) { reducedTimeseries.computeIfAbsent(time, x => { - simulation.akka.API.OptimizationConfig.timeseriesSchema.reducer(indexedTimeseries.remove(time).toSeq.map(_._2)) + builder.strategy.reducer(indexedTimeseries.remove(time).toSeq.map(_._2)) }) } Behaviors.same @@ -76,7 +77,7 @@ class LogController { } // wait up to $timeout$ ms for the rest of log to arrive from workers if ((reducedTimeseries.containsKey(time)) || ((System.currentTimeMillis() - firstReceivedStop) > timeout)) { - simulation.akka.API.Simulate.timeseries = reducedTimeseries.toSeq.sortBy(_._1).map(_._2) + builder.addTimeseries(reducedTimeseries.toSeq.sortBy(_._1).map(_._2)) replyTo ! LogControllerFinished() Behaviors.stopped {() => ctx.log.info("Time series has " + indexedTimeseries.size + " entries") @@ -84,11 +85,11 @@ class LogController { } } ctx.self ! Stop(time, replyTo) - logController() + logController(builder) } } - def logControllerWithInterrupt(): Behavior[LogControllerEvent] = + def logControllerWithInterrupt(builder: TimeseriesBuilder): Behavior[LogControllerEvent] = Behaviors.receive[LogControllerEvent] { (ctx, message) => message match { case RegisterDriverInterrupt() => @@ -118,7 +119,7 @@ class LogController { } // wait up to $timeout$ ms for the rest of log to arrive from workers if ((reducedTimeseries.containsKey(time)) || ((System.currentTimeMillis() - firstReceivedStop) > timeout)) { - simulation.akka.API.Simulate.timeseries = timeseries.toList + builder.addTimeseries(timeseries) replyTo ! LogControllerFinished() Behaviors.stopped {() => ctx.log.info("Time series has " + indexedTimeseries.size + " entries") @@ -126,7 +127,7 @@ class LogController { } } ctx.self ! Stop(time, replyTo) - logController() + logController(builder) } } } diff --git a/Akka/src/main/scala/core/WorkerImpl.scala b/Akka/src/main/scala/core/WorkerImpl.scala index a7593cf6..9b045ec1 100644 --- a/Akka/src/main/scala/core/WorkerImpl.scala +++ b/Akka/src/main/scala/core/WorkerImpl.scala @@ -1,6 +1,7 @@ package simulation.akka.core import meta.runtime.{Actor, Message} +import meta.API.TimeseriesBuilder import scala.collection.mutable.{Buffer, Map => MutMap} import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue} @@ -36,9 +37,8 @@ class Worker { private var completedAgents: Long = 0 private var registeredWorkers: AtomicInteger = new AtomicInteger(0) - private val logControllerEnabled = simulation.akka.API.OptimizationConfig.logControllerEnabled - def apply(id: Int, sims: Seq[Actor], totalWorkers: Int): Behavior[WorkerEvent] = Behaviors.setup { ctx => + def apply(id: Int, sims: Seq[Actor], totalWorkers: Int, builder: Option[TimeseriesBuilder]): Behavior[WorkerEvent] = Behaviors.setup { ctx => localSims = sims.map(x => (x.id, x)).toMap totalAgents = sims.size this.totalWorkers = totalWorkers @@ -77,18 +77,18 @@ class Worker { ctx.system.receptionist ! Receptionist.Register(WorkerStopServiceKey, ctx.self) } - if (logControllerEnabled) { + if (builder.isDefined) { ctx.system.receptionist ! Receptionist.Subscribe(LogControllerSpec.LoggerAggregateServiceKey, workerSub) } - worker() + worker(builder) } // Consider replacing receivedWorkers with a total workers - private def worker(): Behavior[WorkerEvent] = + private def worker(builder: Option[TimeseriesBuilder]): Behavior[WorkerEvent] = Behaviors.receive[WorkerEvent] { (ctx, message) => message match { case Prepare() => - worker() + worker(builder) case ReceiveAgentMap(wid, nameIds, reply) => peerWorkers.computeIfAbsent(wid, x => { @@ -129,7 +129,7 @@ class Worker { if (receivedWorkers.keys().size == totalWorkers-1){ ctx.self ! RoundStart() } - worker() + worker(builder) case RoundStart() => ctx.log.debug(f"Worker ${workerId} starts! Received from ${receivedWorkers.keys().toSet}") @@ -170,7 +170,7 @@ class Worker { }) } ctx.self ! AgentsCompleted() - worker() + worker(builder) case ExpectedReceives(replyTo, acceptedInterval, availability) => // send out messages to other workers only at the beginning of a round to avoid race condition @@ -188,13 +188,13 @@ class Worker { if (receivedWorkers.keys().size == totalWorkers-1){ ctx.self ! RoundStart() } - worker() + worker(builder) case AgentsCompleted() => end = System.currentTimeMillis() ctx.log.debug(f"Worker ${workerId} runs for ${end-start} ms, propose ${proposeInterval}") - if (logControllerEnabled){ - loggerRef ! LogControllerSpec.AggregateLog(workerId, logicalClock, localSims.map(s => s._2).map(agent => simulation.akka.API.OptimizationConfig.timeseriesSchema.mapper(agent.SimClone()))) + if (builder.isDefined){ + loggerRef ! LogControllerSpec.AggregateLog(workerId, logicalClock, localSims.map(s => s._2).map(agent => builder.get.strategy.mapper(agent.SimClone()))) } sendToRef ! SendTo(workerId, proposeInterval) diff --git a/Akka/src/test/scala/GoLTileTest.scala b/Akka/src/test/scala/GoLTileTest.scala index 3292dfd7..e73a4feb 100644 --- a/Akka/src/test/scala/GoLTileTest.scala +++ b/Akka/src/test/scala/GoLTileTest.scala @@ -222,6 +222,7 @@ class GoLTileTest extends FlatSpec { agents.foreach(a => { a.msgGenerator = a.connectedAgentIds.map(i => (i, a.tile.tbs(tiles(i.toInt)))).toMap }) - val snapshot1 = API.Simulate(agents, 200) + val conf = Map("role" -> "Standalone", "port" -> 25100, "name" -> "GoLTile", "data" -> "snapshot") + val snapshot1 = API.Simulate(agents, 200, conf) } } \ No newline at end of file diff --git a/Akka/src/test/scala/piccoloTest.scala b/Akka/src/test/scala/piccoloTest.scala index a0933bc7..53b22618 100644 --- a/Akka/src/test/scala/piccoloTest.scala +++ b/Akka/src/test/scala/piccoloTest.scala @@ -3,15 +3,15 @@ package test import simulation.akka.API._ import org.scalatest.FlatSpec +import meta.API.{SimulationData, DeforestationStrategy} class piccolo extends FlatSpec { val totalRounds: Int = 100 f"The page rank algorithm with vertices, sequential workers" should f"complete" in { val agents = generated.example.piccolo.InitData() - API.OptimizationConfig.logControllerEnabled = true - API.OptimizationConfig.timeseriesSchema = FullTimeseries - val snapshot1 = API.Simulate(agents, totalRounds) - API.Simulate.timeseries.foreach(t => { println(t) }) + val conf = Map("role" -> "Standalone", "port" ->25400, "name" -> "Piccolo", "data" -> "timeseries") + val ts = API.Simulate(agents, totalRounds, conf)(DeforestationStrategy.NoReduction) + ts.timeseries.foreach(t => { println(t) }) } } \ No newline at end of file diff --git a/Akka/src/test/scala/shortestPathTest.scala b/Akka/src/test/scala/shortestPathTest.scala index 84188974..bf00995a 100644 --- a/Akka/src/test/scala/shortestPathTest.scala +++ b/Akka/src/test/scala/shortestPathTest.scala @@ -3,30 +3,27 @@ package test import org.scalatest.FlatSpec import simulation.akka.API._ +import meta.API.DeforestationStrategy +import spire.std.iterable class shortestPath extends FlatSpec { val totalVertices: Int = 50 val totalRounds: Int = 50 - case class Distance(dist: Int) extends Serializable - case object ShortestPathTimeseries extends SimulationTimeseries { - // a sequential worker applies the mapper to each agent - override def mapper(x: Serializable): Serializable = { - Distance(x.asInstanceOf[generated.core.test.shortestPath.Vertex].dist) - } - // the driver sends an Iterable[Serializable] to the log controller. Log controller collects Iterable[Iterable[Serializable]] - // and applies the reducer method to reduce the intermediate data - override def reducer(x: Iterable[Iterable[Serializable]]): Iterable[Serializable] = { - x.flatten + f"The single source shortest path algorithm over a linked list with ${totalVertices} vertices, sequential workers" should f"update the distance of all vertices in ${totalVertices} rounds" in { + case class Distance(dist: Int) extends Serializable + object ShortestPathOptStrategy extends DeforestationStrategy { + override def mapper(x: Serializable): Serializable = { + Distance(x.asInstanceOf[generated.core.test.shortestPath.Vertex].dist) + } } - } - f"The single source shortest path algorithm over a linked list with ${totalVertices} vertices, sequential workers" should f"update the distance of all vertices in ${totalVertices} rounds" in { val agents = generated.core.test.shortestPath.InitData() - API.OptimizationConfig.logControllerEnabled = true - API.OptimizationConfig.timeseriesSchema = ShortestPathTimeseries - val snapshot1 = API.Simulate(agents, totalRounds) - // assert(snapshot1.sims.map(i => i.asInstanceOf[generated.core.test.shortestPath.Vertex].dist).toSet == Range(0, totalVertices).toSet) - API.Simulate.timeseries.foreach(t => { println(t) }) + val conf = Map("role" -> "Standalone", + "port" -> 25300, + "name" -> "ShortestPath", + "data" -> "timeseries") + val ts = API.Simulate(agents, totalRounds, conf)(ShortestPathOptStrategy) + ts.timeseries.foreach(t => { println(t) }) } } \ No newline at end of file diff --git a/Akka/src/test/scala/simulateUntilTest.scala b/Akka/src/test/scala/simulateUntilTest.scala index f5592d51..a6eb2453 100644 --- a/Akka/src/test/scala/simulateUntilTest.scala +++ b/Akka/src/test/scala/simulateUntilTest.scala @@ -13,12 +13,16 @@ class simulateUntilTest extends FlatSpec { val population = 10000 val graph = cloudcity.lib.Graph.ErdosRenyiGraph(population, 0.01) val agents = generated.example.epidemic.InitData(graph) - (new API.SimulateUntil()).apply(agents, totalRounds, (ts: Iterable[Iterable[Serializable]]) => { + val conf = Map("role" -> "Standalone", + "port" -> 25200, + "name" -> "Epidemics", + "data" -> "timeseries") + Simulate.apply(agents, totalRounds, conf, Some((ts: Iterable[Iterable[Serializable]]) => { val x = ts.last.filter(i => i match { case y: generated.example.epidemic.Person => y.health == 1 }).size println("Total infected agents: " + x) x > population / 2 - }) + })) } } \ No newline at end of file diff --git a/Base/src/main/scala/Simulate.scala b/Base/src/main/scala/Simulate.scala index ede9b152..f67b9608 100644 --- a/Base/src/main/scala/Simulate.scala +++ b/Base/src/main/scala/Simulate.scala @@ -2,15 +2,44 @@ package simulation.base.API import scala.collection.mutable.{Buffer, Map => MutMap} import meta.runtime.{SimRuntime, Actor, Message} -import meta.API.{SimulationSnapshot, util} +import meta.API._ +import scala.util.Random object Simulate { - def apply(agents: Traversable[Actor], totalRound: Long): SimulationSnapshot = { + val r_seed = new Random(1000) + + // Default: anonymous simulations that returns snapshot + def apply(agents: IndexedSeq[Actor], totalRound: Long)(implicit strategy: DeforestationStrategy): SimulationData = { + new Simulate(agents, totalRound, Map("name" -> f"Simulation_${r_seed.nextInt()}", "data" -> "snapshot"), None).run(strategy) + } + + // SimulateUntil + // todo + def withUntilCondition(agents: IndexedSeq[Actor], totalRound: Long, cond: Iterable[Iterable[Serializable]] => Boolean)(implicit strategy: DeforestationStrategy): SimulationData = { + new Simulate(agents, totalRound, Map("name" -> f"Simulation_${r_seed.nextInt()}", "data" -> "timeseries"), Some(cond)).run(strategy) + } +} + +class Simulate(agents: IndexedSeq[Actor], totalRound: Long, conf: Map[String, Any], cond: Option[Iterable[Iterable[Serializable]] => Boolean] = None) { + def run(strategy: DeforestationStrategy): SimulationData = { + + require(conf.isDefinedAt("name")) // name of the actor system, to allow concurrent simulations + require(conf.isDefinedAt("data")) // timeseries or snapshot + + val name: String = conf("name").asInstanceOf[String] + val dataConf: String = conf("data").asInstanceOf[String] + + val builder: SimulationDataBuilder = if (dataConf == "timeseries") { + new TimeseriesBuilder(strategy) + } else { + new SnapshotBuilder() + } + var currentRound: Long = 0 var elapsedRound: Int = 0 var collectedMessages: MutMap[Long, Buffer[Message]] = MutMap[Long, Buffer[Message]]() var collectedSerializedMessages: MutMap[Long, Buffer[Array[Byte]]] = MutMap[Long, Buffer[Array[Byte]]]() - var actors: Traversable[Actor] = agents + var actors: IndexedSeq[Actor] = agents val initial: Long = System.currentTimeMillis() var end: Long = initial @@ -19,7 +48,7 @@ object Simulate { val start: Long = end // Add newly generated agents if (!SimRuntime.newActors.isEmpty) { - actors = SimRuntime.newActors ++ actors + actors = actors ++ SimRuntime.newActors SimRuntime.newActors.clear() } @@ -37,6 +66,12 @@ object Simulate { proposed }).min + builder match { + case x: TimeseriesBuilder => + builder.addTimeseries(Vector(strategy.transformer(actors.map(a => strategy.mapper(a.SimClone()))))) + case _ => + } + actors.filterNot(_.deleted).foreach(a => { a.receivedMessages ++= collectedMessages.getOrElse(a.id, Buffer()) a.receivedSerializedMessages ++= collectedSerializedMessages.getOrElse(a.id, Buffer()) @@ -48,11 +83,21 @@ object Simulate { end = System.currentTimeMillis() println(f"Round ${currentRound} takes ${end-start} ms") } + if (totalRound >= 1) { println(f"Average ${(end - initial)/totalRound} ms") } else { println(f"Average ${end - initial} ms") } - SimulationSnapshot(actors, collectedMessages.flatMap(i => i._2).toList) + + builder match { + case x: SnapshotBuilder => + builder.addAgents(actors) + builder.addMessages(collectedMessages.flatMap(i => i._2).toVector) + case _ => + } + + // SimulationSnapshot(actors, collectedMessages.flatMap(i => i._2).toList) + builder.build() } } diff --git a/core/src/main/scala/meta/API/DeforestationStrategy.scala b/core/src/main/scala/meta/API/DeforestationStrategy.scala new file mode 100644 index 00000000..62c7635c --- /dev/null +++ b/core/src/main/scala/meta/API/DeforestationStrategy.scala @@ -0,0 +1,23 @@ +package meta.API + +abstract class DeforestationStrategy { + // a sequential worker applies the mapper to each agent + def mapper(x: Serializable): Serializable = { + x + } + + // driver sends an Iterable[Serializable] to the log controller. Log controller collects Iterable[Iterable[Serializable]] + // and applies the reducer method to reduce the intermediate data + def reducer(x: Iterable[Iterable[Serializable]]): Iterable[Serializable] = { + x.flatten + } + + // driver applies to the collected timeseries + def transformer(x: Iterable[Serializable]): Iterable[Serializable] = { + x + } +} + +object DeforestationStrategy { + implicit object NoReduction extends DeforestationStrategy +} diff --git a/core/src/main/scala/meta/API/SimulationData.scala b/core/src/main/scala/meta/API/SimulationData.scala new file mode 100644 index 00000000..216e3585 --- /dev/null +++ b/core/src/main/scala/meta/API/SimulationData.scala @@ -0,0 +1,50 @@ +package meta.API + +import meta.runtime.{Actor, Message} +import scala.collection.mutable.Buffer + +// The distinction is for performance +sealed trait SimulationData { + def sims: Traversable[Actor] = ??? + def messages: Traversable[Message] = ??? + def timeseries: Iterable[Iterable[Serializable]] = ??? +} + +sealed case class Snapshot(override val sims: Traversable[Actor], override val messages: Traversable[Message]=List()) extends SimulationData +sealed case class Timeseries(override val timeseries: Iterable[Iterable[Serializable]]) extends SimulationData + +sealed trait SimulationDataBuilder { + def addAgents(agents: IndexedSeq[Actor]): Unit = {} + def addMessages(msgs: IndexedSeq[Message]): Unit = {} + def addTimeseries(ts: Iterable[Iterable[Serializable]]): Unit = {} + def build(): SimulationData +} + +sealed class SnapshotBuilder extends SimulationDataBuilder { + private val p_sims: Buffer[Actor] = Buffer[Actor]() + private val p_messages: Buffer[Message] = Buffer[Message]() + + override def addAgents(agents: IndexedSeq[Actor]): Unit = { + p_sims ++= agents + } + + override def addMessages(msgs: IndexedSeq[Message]): Unit = { + p_messages ++= msgs + } + + override def build(): Snapshot = { + Snapshot(p_sims.toList, p_messages.toList) + } +} + +sealed class TimeseriesBuilder(val strategy: DeforestationStrategy) extends SimulationDataBuilder { + private var timeseries: Iterable[Iterable[Serializable]] = Iterable.empty + + override def addTimeseries(ts: Iterable[Iterable[Serializable]]): Unit = { + timeseries = timeseries ++ ts + } + + override def build(): Timeseries = { + Timeseries(timeseries) + } +} \ No newline at end of file diff --git a/core/src/main/scala/meta/API/SimulationSnapshot.scala b/core/src/main/scala/meta/API/SimulationSnapshot.scala deleted file mode 100644 index 95914e67..00000000 --- a/core/src/main/scala/meta/API/SimulationSnapshot.scala +++ /dev/null @@ -1,11 +0,0 @@ -package meta.API - -import meta.runtime.{Actor, Message} - -/** - * Capture the state of the simulation. - * @param actors the Sims at the current state - * @param messages the list of in-transit messages - */ -case class SimulationSnapshot(val sims: Traversable[Actor], val messages: Traversable[Message]=List()){ -}