Skip to content

Commit

Permalink
Significantly speed up game data cloning for AI and battle calculator. (
Browse files Browse the repository at this point in the history
#10454)

* Speed up game data cloning for battle calculator.

Cloning game data involved creating a byte[] array out of the data and then converting it to a new GameData object.
Previously, both steps would be done for each concurrent worker. With this change, we only create the byte[] array once instead of N times, speeding up the process.

Also fixes a couple comments.

* Fix a comment.

* Clean up.

* Improve logic.

* Use original timing for calculating number of threads.

* A bit more clean up.

* Dont compress data when cloning games.
  • Loading branch information
asvitkine authored May 17, 2022
1 parent ccd22c5 commit 32582e8
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
* <li>https://youtu.be/V1vQf4qyMXg?t=56m12s
* </ul>
*
* <p>A typical usage of this annotation will and pattern will look like this: Code formatted below
* for easy copy paste:
* <p>A typical usage of this annotation and pattern will look like this: Code formatted below for
* easy copy paste:
*
* <pre>
* &#64;SerializationProxySupport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,17 @@ public static Optional<GameData> loadGame(final InputStream is) {
* @return The loaded game data, or an empty optional if an error occurs.
*/
public static Optional<GameData> loadGame(final Version ourVersion, final InputStream is) {
try (ObjectInputStream input = new ObjectInputStream(new GZIPInputStream(is))) {
try (GZIPInputStream input = new GZIPInputStream(is)) {
return loadGameUncompressed(ourVersion, input);
} catch (final Throwable e) {
log.error("Error loading game data", e);
return Optional.empty();
}
}

public static Optional<GameData> loadGameUncompressed(
final Version ourVersion, final InputStream is) {
try (ObjectInputStream input = new ObjectInputStream(is)) {
final Object version = input.readObject();

if (isCompatibleVersion(ourVersion, version)
Expand Down Expand Up @@ -162,59 +172,61 @@ private static void loadDelegates(final ObjectInputStream input, final GameData
/**
* Saves the specified game data to the specified stream.
*
* @param os The stream to which the game data will be saved. Note that this stream will be closed
* if this method returns successfully.
* @param out The stream to which the game data will be saved. Note that this stream will be
* closed if this method returns successfully.
* @param gameData The game data to save.
* @throws IOException If an error occurs while saving the game.
*/
public static void saveGame(
final OutputStream os, final GameData gameData, final Version engineVersion)
final OutputStream out, final GameData gameData, final Version engineVersion)
throws IOException {
checkNotNull(os);
checkNotNull(out);
checkNotNull(gameData);

saveGame(os, gameData, true, engineVersion);
}

static void saveGame(
final OutputStream sink,
final GameData data,
final boolean saveDelegateInfo,
final Version engineVersion)
throws IOException {
final Path tempFile =
Files.createTempFile(
GameDataManager.class.getSimpleName(), GameDataFileUtils.getExtension());
try {
// write to temporary file first in case of error
try (OutputStream os = Files.newOutputStream(tempFile);
OutputStream bufferedOutStream = new BufferedOutputStream(os);
OutputStream zippedOutStream = new GZIPOutputStream(bufferedOutStream);
ObjectOutputStream outStream = new ObjectOutputStream(zippedOutStream)) {
outStream.writeObject(engineVersion);
data.acquireReadLock();
try {
outStream.writeObject(data);
if (saveDelegateInfo) {
writeDelegates(data, outStream);
} else {
outStream.writeObject(DELEGATE_LIST_END);
}
} finally {
data.releaseReadLock();
}
OutputStream zippedOutStream = new GZIPOutputStream(bufferedOutStream)) {
saveGameUncompressed(zippedOutStream, gameData, true, engineVersion);
}

// now write to sink (ensure sink is closed per method contract)
try (InputStream is = Files.newInputStream(tempFile);
OutputStream os = new BufferedOutputStream(sink)) {
OutputStream os = new BufferedOutputStream(out)) {
IOUtils.copy(is, os);
}
} finally {
Files.delete(tempFile);
}
}

public static void saveGameUncompressed(
final OutputStream sink,
final GameData data,
final boolean saveDelegateInfo,
final Version engineVersion)
throws IOException {
// write to temporary file first in case of error
try (ObjectOutputStream outStream = new ObjectOutputStream(sink)) {
outStream.writeObject(engineVersion);
data.acquireReadLock();
try {
outStream.writeObject(data);
if (saveDelegateInfo) {
writeDelegates(data, outStream);
} else {
outStream.writeObject(DELEGATE_LIST_END);
}
} finally {
data.releaseReadLock();
}
}
}

private static void writeDelegates(final GameData data, final ObjectOutputStream out)
throws IOException {
for (final IDelegate delegate : data.getDelegates()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,30 @@ public static Optional<GameData> cloneGameData(final GameData data) {
*/
public static Optional<GameData> cloneGameData(
final GameData data, final boolean copyDelegates, final Version engineVersion) {
final byte[] bytes = gameDataToBytes(data, copyDelegates, engineVersion).orElse(null);
if (bytes != null) {
return createGameDataFromBytes(bytes, engineVersion);
}
return Optional.empty();
}

public static Optional<byte[]> gameDataToBytes(
GameData data, boolean copyDelegates, Version engineVersion) {
try {
final byte[] bytes =
return Optional.of(
IoUtils.writeToMemory(
os -> GameDataManager.saveGame(os, data, copyDelegates, engineVersion));
os -> GameDataManager.saveGameUncompressed(os, data, copyDelegates, engineVersion)));
} catch (final IOException e) {
log.error("Failed to clone game data", e);
return Optional.empty();
}
}

public static Optional<GameData> createGameDataFromBytes(
final byte[] bytes, final Version engineVersion) {
try {
return IoUtils.readFromMemory(
bytes, inputStream -> GameDataManager.loadGame(engineVersion, inputStream));
bytes, inputStream -> GameDataManager.loadGameUncompressed(engineVersion, inputStream));
} catch (final IOException e) {
log.error("Failed to clone game data", e);
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Setter;
import org.triplea.util.Version;

Expand All @@ -32,14 +33,16 @@ class BattleCalculator implements IBattleCalculator {
private volatile boolean cancelled = false;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

BattleCalculator(
final GameData data, final boolean dataHasAlreadyBeenCloned, final Version engineVersion) {
gameData =
Preconditions.checkNotNull(
dataHasAlreadyBeenCloned
? data
: GameDataUtils.cloneGameData(data, false, engineVersion).orElse(null),
"Error cloning game data (low memory?)");
private BattleCalculator(@Nullable GameData data) {
gameData = Preconditions.checkNotNull(data, "Error cloning game data (low memory?)");
}

BattleCalculator(GameData data, Version engineVersion) {
this(GameDataUtils.cloneGameData(data, false, engineVersion).orElse(null));
}

BattleCalculator(byte[] data, Version engineVersion) {
this(GameDataUtils.createGameDataFromBytes(data, engineVersion).orElse(null));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.triplea.injection.Injections;
import org.triplea.java.concurrency.AsyncRunner;
import org.triplea.java.concurrency.CountUpAndDownLatch;
import org.triplea.util.Version;

/**
* Concurrent wrapper class for the OddsCalculator. It spawns multiple worker threads and splits up
Expand Down Expand Up @@ -120,57 +121,30 @@ private void createWorkers(final GameData data) {
final long startTime = System.currentTimeMillis();
final long startMemory =
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
final GameData newData;
final Version engineVersion = Injections.getInstance().getEngineVersion();
final byte[] serializedData;
try {
// make first copy, then release lock on it so game can continue (ie: we don't want to lock
// on it while we copy
// it 16 times, when once is enough) don't let the data change while we make the first copy
// Serialize the data, then release lock on it so game can continue (ie: we don't want to
// lock on it while we copy it 16 times).
data.acquireWriteLock();
newData =
GameDataUtils.cloneGameDataWithoutHistory(
data, false, Injections.getInstance().getEngineVersion())
.orElse(null);
if (newData == null) {
serializedData = GameDataUtils.gameDataToBytes(data, false, engineVersion).orElse(null);
if (serializedData == null) {
return;
}
} finally {
data.releaseWriteLock();
}
final int currentThreads =
getThreadsToUse((System.currentTimeMillis() - startTime), startMemory);
try {
// make sure all workers are using the same data
newData.acquireReadLock();
int i = 0;
// we are already in 1 executor thread, so we have MAX_THREADS-1 threads left to use
if (currentThreads <= 2 || MAX_THREADS <= 2) {
// if 2 or fewer threads, do not multi-thread the copying (we have already copied it once
// above, so at most
// only 1 more copy to make)
while (cancelCurrentOperation.get() >= 0 && i < currentThreads) {
// the last one will use our already copied data from above, without copying it again
workers.add(
new BattleCalculator(
newData, (currentThreads == ++i), Injections.getInstance().getEngineVersion()));
}
} else {
// multi-thread our copying, cus why the heck not
// (it increases the speed of copying by about double)
workers.addAll(
IntStream.range(1, currentThreads)
.parallel()
.filter(j -> cancelCurrentOperation.get() >= 0)
.mapToObj(
j ->
new BattleCalculator(
newData, false, Injections.getInstance().getEngineVersion()))
.collect(Collectors.toList()));
// the last one will use our already copied data from above, without copying it again
workers.add(
new BattleCalculator(newData, true, Injections.getInstance().getEngineVersion()));
}
} finally {
newData.releaseReadLock();
if (cancelCurrentOperation.get() >= 0) {
// Create the first battle calc on the current thread to measure the end-to-end copy time.
workers.add(new BattleCalculator(serializedData, engineVersion));
int threadsToUse = getThreadsToUse((System.currentTimeMillis() - startTime), startMemory);
// Now, create the remaining ones in parallel.
workers.addAll(
IntStream.range(1, threadsToUse)
.parallel()
.filter(j -> cancelCurrentOperation.get() >= 0)
.mapToObj(j -> new BattleCalculator(serializedData, engineVersion))
.collect(Collectors.toList()));
}
}
if (cancelCurrentOperation.get() < 0 || data == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void testUnbalancedFight() {
final GamePlayer germans = germans(gameData);
final List<Unit> attackingUnits = infantry(gameData).create(100, russians);
final List<Unit> bombardingUnits = List.of();
final BattleCalculator calculator = new BattleCalculator(gameData, false, new Version("2.0.0"));
final BattleCalculator calculator = new BattleCalculator(gameData, new Version("2.0.0"));
final AggregateResults results =
calculator.calculate(
russians,
Expand Down Expand Up @@ -65,7 +65,7 @@ void testKeepOneAttackingLand() {
final List<Unit> attackingUnits = infantry(gameData).create(1, germans);
attackingUnits.addAll(bomber(gameData).create(1, germans));
final List<Unit> bombardingUnits = List.of();
final BattleCalculator calculator = new BattleCalculator(gameData, false, new Version("2.0.0"));
final BattleCalculator calculator = new BattleCalculator(gameData, new Version("2.0.0"));
calculator.setKeepOneAttackingLandUnit(true);
final AggregateResults results =
calculator.calculate(
Expand All @@ -88,7 +88,7 @@ void testAttackingTransports() {
final Territory sz1 = territory("1 Sea Zone", gameData);
final List<Unit> attacking = transport(gameData).create(2, americans(gameData));
final List<Unit> defending = submarine(gameData).create(2, germans(gameData));
final BattleCalculator calculator = new BattleCalculator(gameData, false, new Version("2.0.0"));
final BattleCalculator calculator = new BattleCalculator(gameData, new Version("2.0.0"));
calculator.setKeepOneAttackingLandUnit(false);
final AggregateResults results =
calculator.calculate(
Expand All @@ -112,7 +112,7 @@ void testDefendingTransports() {
final Territory sz1 = territory("1 Sea Zone", gameData);
final List<Unit> attacking = submarine(gameData).create(2, americans(gameData));
final List<Unit> defending = transport(gameData).create(2, germans(gameData));
final BattleCalculator calculator = new BattleCalculator(gameData, false, new Version("2.0.0"));
final BattleCalculator calculator = new BattleCalculator(gameData, new Version("2.0.0"));
calculator.setKeepOneAttackingLandUnit(false);
final AggregateResults results =
calculator.calculate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public void close() {
}

/**
* Creates a perf timer with a reporting frequency. The reporting frequency specifies N specifies
* that performance information should be printed every N executions of the timer. If 0, no
* information is printed.
* Creates a perf timer with a reporting frequency. The reporting frequency N specifies that
* performance information should be printed every N executions of the timer. If 0, no information
* is printed.
*
* @param title The name of the timer
* @param reportingFrequency The reporting frequency.
Expand Down

0 comments on commit 32582e8

Please sign in to comment.