From ade7834bc800d727a6273eeb874b8b453f93b0b1 Mon Sep 17 00:00:00 2001 From: cdavid Date: Tue, 12 Mar 2024 12:56:55 -0700 Subject: [PATCH] [PLAT-14294] Query latency increase workloads --- .../com/yugabyte/sample/apps/AppBase.java | 10 +- .../sample/apps/CassandraInserts.java | 2 +- .../sample/apps/CassandraKeyValue.java | 9 +- .../apps/CassandraUniqueSecondaryIndex.java | 2 +- .../com/yugabyte/sample/apps/SqlDataLoad.java | 2 +- .../sample/apps/anomalies/PlanAnomaly.java | 428 ++++++++++++++++++ .../sample/apps/anomalies/PlanAnomalyV2.java | 109 +++++ .../apps/anomalies/SqlInsertTablets.java | 326 +++++++++++++ .../anomalies/SqlInsertTabletsSkewQuery.java | 135 ++++++ .../yugabyte/sample/common/CmdLineOpts.java | 32 +- 10 files changed, 1035 insertions(+), 20 deletions(-) create mode 100644 src/main/java/com/yugabyte/sample/apps/anomalies/PlanAnomaly.java create mode 100644 src/main/java/com/yugabyte/sample/apps/anomalies/PlanAnomalyV2.java create mode 100644 src/main/java/com/yugabyte/sample/apps/anomalies/SqlInsertTablets.java create mode 100644 src/main/java/com/yugabyte/sample/apps/anomalies/SqlInsertTabletsSkewQuery.java diff --git a/src/main/java/com/yugabyte/sample/apps/AppBase.java b/src/main/java/com/yugabyte/sample/apps/AppBase.java index 014c8ea..6e274f4 100644 --- a/src/main/java/com/yugabyte/sample/apps/AppBase.java +++ b/src/main/java/com/yugabyte/sample/apps/AppBase.java @@ -98,7 +98,7 @@ public abstract class AppBase implements MetricsTracker.StatusMessageAppender { public static final int NUM_KEYS_TO_READ_FOR_YSQL_AND_YCQL = 1500000; // Variable to track start time of the workload. - private long workloadStartTime = -1; + protected long workloadStartTime = -1; // Instance of the workload configuration. public static AppConfig appConfig = new AppConfig(); // The configuration of the load tester. 
@@ -428,8 +428,8 @@ public synchronized void resetClients() { cassandra_session = null; } - Random random = new Random(); - byte[] buffer; + protected Random random = new Random(); + protected byte[] buffer; Checksum checksum = new Adler32(); // For binary values we store checksum in bytes. @@ -852,7 +852,7 @@ public String appenderName() { /** * Close the Connection. */ - static void close(Connection c) { + protected static void close(Connection c) { if (c != null) { try { c.close(); @@ -865,7 +865,7 @@ static void close(Connection c) { /** * Close the PreparedStatement. */ - static void close(PreparedStatement ps) { + protected static void close(PreparedStatement ps) { if (ps != null) { try { ps.close(); diff --git a/src/main/java/com/yugabyte/sample/apps/CassandraInserts.java b/src/main/java/com/yugabyte/sample/apps/CassandraInserts.java index 420abd5..134dc07 100644 --- a/src/main/java/com/yugabyte/sample/apps/CassandraInserts.java +++ b/src/main/java/com/yugabyte/sample/apps/CassandraInserts.java @@ -25,7 +25,7 @@ * which is indexed. 
*/ public class CassandraInserts extends CassandraKeyValue { - private static final Logger LOG = Logger.getLogger(CassandraSecondaryIndex.class); + private static final Logger LOG = Logger.getLogger(CassandraInserts.class); static { appConfig.readIOPSPercentage = -1; diff --git a/src/main/java/com/yugabyte/sample/apps/CassandraKeyValue.java b/src/main/java/com/yugabyte/sample/apps/CassandraKeyValue.java index 0664d67..7dc7213 100644 --- a/src/main/java/com/yugabyte/sample/apps/CassandraKeyValue.java +++ b/src/main/java/com/yugabyte/sample/apps/CassandraKeyValue.java @@ -17,13 +17,10 @@ import java.util.Arrays; import java.util.List; -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; import org.apache.log4j.Logger; -import com.yugabyte.sample.common.SimpleLoadGenerator.Key; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; /** * This workload writes and reads some random string keys from a CQL server. By default, this app @@ -31,7 +28,7 @@ */ public class CassandraKeyValue extends CassandraKeyValueBase { - private static final Logger LOG = Logger.getLogger(CassandraKeyValueBase.class); + private static final Logger LOG = Logger.getLogger(CassandraKeyValue.class); // The default table name to create and use for CRUD ops. private static final String DEFAULT_TABLE_NAME = CassandraKeyValue.class.getSimpleName(); static { diff --git a/src/main/java/com/yugabyte/sample/apps/CassandraUniqueSecondaryIndex.java b/src/main/java/com/yugabyte/sample/apps/CassandraUniqueSecondaryIndex.java index 6b434a8..3f211de 100644 --- a/src/main/java/com/yugabyte/sample/apps/CassandraUniqueSecondaryIndex.java +++ b/src/main/java/com/yugabyte/sample/apps/CassandraUniqueSecondaryIndex.java @@ -25,7 +25,7 @@ * which is indexed. 
*/ public class CassandraUniqueSecondaryIndex extends CassandraSecondaryIndex { - private static final Logger LOG = Logger.getLogger(CassandraSecondaryIndex.class); + private static final Logger LOG = Logger.getLogger(CassandraUniqueSecondaryIndex.class); // The default table name to create and use for CRUD ops. private static final String DEFAULT_TABLE_NAME = diff --git a/src/main/java/com/yugabyte/sample/apps/SqlDataLoad.java b/src/main/java/com/yugabyte/sample/apps/SqlDataLoad.java index 899a9a2..d63209b 100644 --- a/src/main/java/com/yugabyte/sample/apps/SqlDataLoad.java +++ b/src/main/java/com/yugabyte/sample/apps/SqlDataLoad.java @@ -28,7 +28,7 @@ * Then runs a write workload to load data into the target table. */ public class SqlDataLoad extends AppBase { - private static final Logger LOG = Logger.getLogger(SqlInserts.class); + private static final Logger LOG = Logger.getLogger(SqlDataLoad.class); // Static initialization of this workload's config. These are good defaults for getting a decent // read dominated workload on a reasonably powered machine. 
Exact IOPS will of course vary diff --git a/src/main/java/com/yugabyte/sample/apps/anomalies/PlanAnomaly.java b/src/main/java/com/yugabyte/sample/apps/anomalies/PlanAnomaly.java new file mode 100644 index 0000000..34c086c --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/anomalies/PlanAnomaly.java @@ -0,0 +1,428 @@ +package com.yugabyte.sample.apps.anomalies; + +import com.yugabyte.sample.common.CmdLineOpts.ContactPoint; +import com.yugabyte.sample.common.SimpleLoadGenerator.Key; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.log4j.Logger; + +public class PlanAnomaly extends SqlInsertTablets { + + private static final Logger LOG = Logger.getLogger(PlanAnomaly.class); + + private static final AtomicInteger READ_SKEW_SETUP = new AtomicInteger(0); + + public PreparedStatement preparedSelectNoHint; + private boolean readSkewThread = false; + public int readCounter = 0; + public long lowerBound = 0; + public long upperBound = 0; + public int percentage = 10; + public int beginRatio = 3; + + public PlanAnomaly() { + buffer = new byte[appConfig.valueSize]; + WAIT_TIMEOUT_MS = 3000; + } + + public void setupHinting() throws ClassNotFoundException, SQLException { + String dbName = appConfig.defaultPostgresDatabase; + String dbUser = appConfig.dbUsername; + String dbPass = appConfig.dbPassword; + appConfig.enableDriverDebug = true; + + for (ContactPoint contactPoint : configuration.contactPoints) { + LOG.info("Setting up pg_hint_plan extension for host " + contactPoint.getHost()); + Connection connection = + getRawConnection(contactPoint.getHost(), contactPoint.getPort(), dbName, dbUser, dbPass); + Statement s = connection.createStatement(); + s.addBatch("CREATE EXTENSION IF NOT EXISTS pg_hint_plan;"); + s.executeBatch(); + s = connection.createStatement(); + s.addBatch("SET 
pg_hint_plan.enable_hint_table = on;"); + s.executeBatch(); + s = connection.createStatement(); + s.addBatch("truncate hint_plan.hints;"); + s.executeBatch(); + s = connection.createStatement(); + s.addBatch( + "INSERT INTO hint_plan.hints(norm_query_string, application_name, hints)VALUES('SELECT" + + " DISTINCT t50000.c_int,t500000.c_varchar FROM t500000 right OUTER JOIN t100 ON" + + " t500000.c_decimal = t100.c_decimal right OUTER JOIN t50000 ON t500000.c_decimal =" + + " t50000.c_decimal WHERE t500000.c_int in (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?," + + " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?," + + " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ORDER BY t50000.c_int, t500000.c_varchar desc limit" + + " ?;', '', 'Leading ( t50000 t500000 ) t100 HashJoin(t50000 t500000)" + + " HashJoin(t50000 t500000 t100) IndexOnlyScan(t500000) SeqScan(t100)" + + " IndexScan(t50000)');"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.executeBatch(); + LOG.info("pg_hint_plan extension setup and statements reset"); + close(connection); + } + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + try (Connection connection = getPostgresConnection()) { + + if (tableOp.equals(TableOp.DropTable)) { + + LOG.info("Dropping any table(s) left from previous runs if any"); + Statement s = connection.createStatement(); + s.addBatch("DROP TABLE IF EXISTS t1000000 CASCADE;"); + s.addBatch("DROP TABLE IF EXISTS t500000 CASCADE;"); + s.addBatch("DROP TABLE IF EXISTS t50000 CASCADE;"); + s.addBatch("DROP TABLE IF EXISTS t100 CASCADE;"); + s.executeBatch(); + LOG.info("Dropped"); + } + + Statement s = connection.createStatement(); + s.addBatch( + "CREATE TABLE IF NOT EXISTS t1000000\n" + + "( c_int int,\n" + + " c_bool bool,\n" + + " c_text text,\n" + + " c_varchar varchar,\n" + + " c_decimal decimal,\n" + + " c_float float,\n" + + " c_real real,\n" + + " c_money money\n" + + ") SPLIT INTO 3 TABLETS;"); + s.addBatch( + "INSERT 
INTO t1000000\n" + + "SELECT c_int,\n" + + " (case when c_int % 2 = 0 then true else false end) as c_bool,\n" + + " (c_int + 0.0001)::text as c_text,\n" + + " (c_int + 0.0002):: varchar as c_varchar,\n" + + " (c_int + 0.1):: decimal as c_decimal,\n" + + " (c_int + 0.2):: float as c_float,\n" + + " (c_int + 0.3):: real as c_real,\n" + + " (c_int + 0.4) ::money as c_money FROM generate_Series(1, 100000 * 10)" + + " c_int " + + "WHERE (SELECT COUNT(*) FROM t1000000) = 0;"); + s.addBatch("CREATE INDEX IF NOT EXISTS t1000000_1_idx ON t1000000 (c_int ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch("CREATE INDEX IF NOT EXISTS t1000000_2_idx ON t1000000 (c_int ASC, c_bool ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch("CREATE INDEX IF NOT EXISTS t1000000_3_idx ON t1000000 (c_int ASC, c_text ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t1000000_4_idx ON t1000000 (c_int ASC, c_varchar ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t1000000_5_idx ON t1000000 (c_float ASC, c_text ASC," + + " c_varchar ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t1000000_6_idx ON t1000000 (c_float ASC, c_decimal ASC," + + " c_varchar ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t1000000_7_idx ON t1000000 (c_float ASC, c_real ASC, c_money" + + " ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + LOG.info("t1000000"); + + s.addBatch( + "CREATE TABLE IF NOT EXISTS t500000\n" + + "( c_int int,\n" + + " c_bool bool,\n" + + " c_text text,\n" + + " c_varchar varchar,\n" + + " c_decimal decimal,\n" + + " c_float float,\n" + + " c_real real,\n" + + " c_money money\n" + + ") SPLIT INTO 3 TABLETS;"); + s.addBatch( + "INSERT INTO t500000\n" + + "SELECT c_int,\n" + + " (case when c_int % 2 = 0 then true else 
false end) as c_bool,\n" + + " (c_int + 0.0001)::text as c_text,\n" + + " (c_int + 0.0002):: varchar as c_varchar,\n" + + " (c_int + 0.1):: decimal as c_decimal,\n" + + " (c_int + 0.2):: float as c_float,\n" + + " (c_int + 0.3):: real as c_real,\n" + + " (c_int + 0.4) ::money as c_money FROM generate_Series(1, 50000 * 10)" + + " c_int " + + "WHERE (SELECT COUNT(*) FROM t500000) = 0;"); + s.addBatch("CREATE INDEX IF NOT EXISTS t500000_1_idx ON t500000 (c_int ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch("CREATE INDEX IF NOT EXISTS t500000_2_idx ON t500000 (c_int ASC, c_bool ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch("CREATE INDEX IF NOT EXISTS t500000_3_idx ON t500000 (c_int ASC, c_text ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch("CREATE INDEX IF NOT EXISTS t500000_4_idx ON t500000 (c_int ASC, c_varchar ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t500000_5_idx ON t500000 (c_float ASC, c_text ASC, c_varchar" + + " ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t500000_6_idx ON t500000 (c_float ASC, c_decimal ASC," + + " c_varchar ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t500000_7_idx ON t500000 (c_float ASC, c_real ASC, c_money" + + " ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + LOG.info("t500000"); + + s.addBatch( + "CREATE TABLE IF NOT EXISTS t50000\n" + + "( c_int int,\n" + + " c_bool bool,\n" + + " c_text text,\n" + + " c_varchar varchar,\n" + + " c_decimal decimal,\n" + + " c_float float,\n" + + " c_real real,\n" + + " c_money money\n" + + ") SPLIT INTO 3 TABLETS;"); + ; + s.addBatch( + "INSERT INTO t50000\n" + + "SELECT c_int,\n" + + " (case when c_int % 2 = 0 then true else false end) as c_bool,\n" + + " (c_int + 0.0001)::text as c_text,\n" + + " (c_int + 0.0002):: varchar as 
c_varchar,\n" + + " (c_int + 0.1):: decimal as c_decimal,\n" + + " (c_int + 0.2):: float as c_float,\n" + + " (c_int + 0.3):: real as c_real,\n" + + " (c_int + 0.4) ::money as c_money FROM generate_Series(1, 5000 * 10)" + + " c_int " + + "WHERE (SELECT COUNT(*) FROM t50000) = 0;"); + s.addBatch("CREATE INDEX IF NOT EXISTS t50000_1_idx ON t50000 (c_int ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch("CREATE INDEX IF NOT EXISTS t50000_2_idx ON t50000 (c_int ASC, c_bool ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch("CREATE INDEX IF NOT EXISTS t50000_3_idx ON t50000 (c_int ASC, c_text ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch("CREATE INDEX IF NOT EXISTS t50000_4_idx ON t50000 (c_int ASC, c_varchar ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t50000_5_idx ON t50000 (c_float ASC, c_text ASC, c_varchar" + + " ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t50000_6_idx ON t50000 (c_float ASC, c_decimal ASC, c_varchar" + + " ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t50000_7_idx ON t50000 (c_float ASC, c_real ASC, c_money" + + " ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + LOG.info("t50000"); + + s.addBatch( + "CREATE TABLE IF NOT EXISTS t100\n" + + "( c_int int,\n" + + " c_bool bool,\n" + + " c_text text,\n" + + " c_varchar varchar,\n" + + " c_decimal decimal,\n" + + " c_float float,\n" + + " c_real real,\n" + + " c_money money\n" + + ") SPLIT INTO 3 TABLETS;"); + s.addBatch( + "INSERT INTO t100\n" + + "SELECT c_int,\n" + + " (case when c_int % 2 = 0 then true else false end) as c_bool,\n" + + " (c_int + 0.0001)::text as c_text,\n" + + " (c_int + 0.0002):: varchar as c_varchar,\n" + + " (c_int + 0.1):: decimal as c_decimal,\n" + + " (c_int + 0.2):: float as c_float,\n" + + " (c_int + 0.3):: 
real as c_real,\n" + + " (c_int + 0.4) ::money as c_money FROM generate_Series(1, 10 * 10)" + + " c_int " + + "WHERE (SELECT COUNT(*) FROM t100) = 0;"); + s.addBatch("CREATE INDEX IF NOT EXISTS t100_1_idx ON t100 (c_int ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch("CREATE INDEX IF NOT EXISTS t100_2_idx ON t100 (c_int ASC, c_bool ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch("CREATE INDEX IF NOT EXISTS t100_3_idx ON t100 (c_int ASC, c_text ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch("CREATE INDEX IF NOT EXISTS t100_4_idx ON t100 (c_int ASC, c_varchar ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t100_5_idx ON t100 (c_float ASC, c_text ASC, c_varchar" + + " ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t100_6_idx ON t100 (c_float ASC, c_decimal ASC, c_varchar" + + " ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + s.addBatch( + "CREATE INDEX IF NOT EXISTS t100_7_idx ON t100 (c_float ASC, c_real ASC, c_money ASC);"); + s.addBatch("SELECT pg_stat_statements_reset();"); + LOG.info("t100"); + s.executeBatch(); + + s = connection.createStatement(); + s.addBatch("ANALYZE t1000000;"); + s.addBatch("ANALYZE t500000;"); + s.addBatch("ANALYZE t50000;"); + s.addBatch("ANALYZE t100;"); + LOG.info("Analyze. 
Executing"); + + s.executeBatch(); + LOG.info("Tables created"); + + if (tableOp.equals(TableOp.TruncateTable)) { + Statement t = connection.createStatement(); + t.addBatch("TRUNCATE TABLE t1000000;"); + t.addBatch("TRUNCATE TABLE t500000;"); + t.addBatch("TRUNCATE TABLE t50000;"); + t.addBatch("TRUNCATE TABLE t100;"); + t.executeBatch(); + LOG.info("Truncated table: t1000000, t500000, t50000, t100"); + } + } + + // setupHinting(); + } + + @Override + public PreparedStatement getPreparedSelect() throws Exception { + if (preparedSelect == null) { + LOG.info("Preparing SELECT statement"); + close(selConnection); + selConnection = getPostgresConnectionFair(); + + preparedSelect = + selConnection.prepareStatement( + "/*+Leading ( t50000 t100 ) t500000 HashJoin(t50000 t100) HashJoin(t50000 t100" + + " t500000) IndexScan(t500000) SeqScan(t100) SeqScan(t50000)*/SELECT DISTINCT" + + " t50000.c_int,t500000.c_varchar FROM t500000 right OUTER JOIN t100 ON" + + " t500000.c_decimal = t100.c_decimal right OUTER JOIN t50000 ON" + + " t500000.c_decimal = t50000.c_decimal WHERE t500000.c_int in (13, 17, 74, 93," + + " 76, 8, 82, 44, 26, 40, 96, 42, 3, 38, 98, 60, 1, 81, 62, 6, 1, 63, 29, 62," + + " 93, 81, 35, 20, 28, 61, 56, 67, 8, 9, 62, 15, 51, 62, 81, 70, 40, 58, 95, 34," + + " 74, 36, 80, 9, 74, 18) ORDER BY t50000.c_int, t500000.c_varchar desc limit" + + " 100;"); + preparedSelectNoHint = + selConnection.prepareStatement( + "SELECT DISTINCT t50000.c_int,t500000.c_varchar FROM t500000 right OUTER JOIN t100" + + " ON t500000.c_decimal = t100.c_decimal right OUTER JOIN t50000 ON" + + " t500000.c_decimal = t50000.c_decimal WHERE t500000.c_int in (13, 17, 74, 93," + + " 76, 8, 82, 44, 26, 40, 96, 42, 3, 38, 98, 60, 1, 81, 62, 6, 1, 63, 29, 62," + + " 93, 81, 35, 20, 28, 61, 56, 67, 8, 9, 62, 15, 51, 62, 81, 70, 40, 58, 95, 34," + + " 74, 36, 80, 9, 74, 18) ORDER BY t50000.c_int, t500000.c_varchar desc limit" + + " 100;"); + + preparedSelectNoHint.execute(); + LOG.info("Prepared 
SELECT statement"); + } + return preparedSelect; + } + + @Override + public long doRead() { + try { + doReadNoBarrier(null); + return 1; + } catch (Exception e) { + LOG.info("Failed reading value: ", e); + close(preparedSelect); + preparedSelect = null; + return 0; + } + } + + public void executeQuery(PreparedStatement statement) throws Exception { + try (ResultSet rs1 = statement.executeQuery()) {} + } + + @Override + public long doReadNoBarrier(Key key) { + PreparedStatement statement = null; + try { + + boolean timeBased = appConfig.runTimeSeconds > 0; + + if (READ_SKEW_SETUP.get() < 10) { + if (READ_SKEW_SETUP.incrementAndGet() == 1) { + readSkewThread = true; + if (timeBased) { + // 6/8 normal traffic + 1/8 high latency traffic + 1/8 normal traffic + lowerBound = appConfig.runTimeSeconds / 8 * 6; + upperBound = appConfig.runTimeSeconds / 8 * 7; + + LOG.info( + "Setting up read skew thread appConfig.runTimeSeconds = " + + appConfig.runTimeSeconds + + " configuration.getNumReaderThreads() = " + + configuration.getNumReaderThreads() + + " lowerBound = " + + lowerBound + + " upperBound = " + + upperBound); + } else { + long queriesPerThread = appConfig.numKeysToRead / configuration.getNumReaderThreads(); + lowerBound = queriesPerThread / beginRatio; + // Lower bound determines when we start making slow queries. + // Upper bound determines how many queries we make. + // In our case, the unhinted query takes ~8s and the hinted query 400ms. + // This means that the unhinted query is 20x slower than the hinted query. + // So, while the slow thread makes 1 call, the other 2 threads make 20 calls each. 
+ upperBound = lowerBound + queriesPerThread / percentage; + + LOG.info( + "Setting up read skew thread appConfig.numKeysToRead = " + + appConfig.numKeysToRead + + " configuration.getNumReaderThreads() = " + + configuration.getNumReaderThreads() + + " queriesPerThread = " + + queriesPerThread + + " lowerBound = " + + lowerBound + + " upperBound = " + + upperBound); + } + } + } + + statement = getPreparedSelect(); + if (readSkewThread) { + if (timeBased) { + long currentTimeSec = (System.currentTimeMillis() - workloadStartTime) / 1000; + if (currentTimeSec > lowerBound && currentTimeSec < upperBound) { + if (readCounter % 100 == 0) { + LOG.info( + "Read Skew Iteration: " + (currentTimeSec - lowerBound) + + " / " + (upperBound - lowerBound) + " sec done."); + } + statement = preparedSelectNoHint; + } + } else { + if (readCounter > lowerBound && readCounter < upperBound) { + if (readCounter % 100 == 0) { + LOG.info("Read Skew Iteration: " + readCounter + " / " + upperBound + " done."); + } + statement = preparedSelectNoHint; + } + } + } + executeQuery(statement); + readCounter++; + } catch (Exception e) { + LOG.info("Failed reading value: ", e); + close(preparedSelect); + preparedSelect = null; + return 0; + } + return 1; + } + + @Override + public void recordExistingRowCount() { + LOG.info("Skipping recording the row count"); + } +} diff --git a/src/main/java/com/yugabyte/sample/apps/anomalies/PlanAnomalyV2.java b/src/main/java/com/yugabyte/sample/apps/anomalies/PlanAnomalyV2.java new file mode 100644 index 0000000..4a2748c --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/anomalies/PlanAnomalyV2.java @@ -0,0 +1,109 @@ +package com.yugabyte.sample.apps.anomalies; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Random; +import org.apache.log4j.Logger; + +public class PlanAnomalyV2 extends PlanAnomaly { + + private static final Logger LOG = 
Logger.getLogger(PlanAnomalyV2.class); + private static final int KEY_LIMIT = 1000000; + + public PlanAnomalyV2() { + buffer = new byte[appConfig.valueSize]; + WAIT_TIMEOUT_MS = 3000; + // We start at 1/4 of the queries + beginRatio = 4; + // The ratio between the queries is ~20x + // Without hint: 45ms + // With hint: 2ms + percentage = 30; + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + try (Connection connection = getPostgresConnection()) { + + if (tableOp.equals(TableOp.DropTable)) { + LOG.info("Dropping any table(s) left from previous runs if any"); + Statement s = connection.createStatement(); + s.addBatch(String.format("DROP TABLE IF EXISTS %s CASCADE;", getTableName())); + s.executeBatch(); + LOG.info("Dropped"); + } + + Statement s = connection.createStatement(); + s.addBatch( + String.format( + "CREATE TABLE IF NOT EXISTS %s (k1 int, k2 int, v1 int, v2 int, v3 int, PRIMARY" + + " KEY(k1,k2)) SPLIT INTO 3 TABLETS;", + getTableName())); + + s.addBatch( + String.format( + "CREATE INDEX IF NOT EXISTS %s_v1_v2 ON %s(v1,v2);", getTableName(), getTableName())); + s.executeBatch(); + LOG.info("Created table and index " + getTableName()); + + if (tableOp.equals(TableOp.TruncateTable)) { + Statement t = connection.createStatement(); + t.addBatch(String.format("TRUNCATE TABLE %s;", getTableName())); + t.executeBatch(); + LOG.info("Truncated table " + getTableName()); + } + + s = connection.createStatement(); + String createIndexStatement = + String.format( + "INSERT INTO %s SELECT i / 100000,i %% 100000,i / 10,i %% 10, i FROM" + + " generate_series(1,1000000) AS i WHERE (SELECT COUNT(*) from %s) = 0;", + getTableName(), getTableName()); + LOG.info("Create index: " + createIndexStatement); + s.addBatch(createIndexStatement); + s.executeBatch(); + + LOG.info("Inserted table " + getTableName()); + + s = connection.createStatement(); + s.addBatch(String.format("ANALYZE %s;", getTableName())); + LOG.info("Analyze. 
Executing"); + s.executeBatch(); + } + } + + @Override + public PreparedStatement getPreparedSelect() throws Exception { + if (preparedSelect == null) { + LOG.info("Preparing SELECT statement"); + close(selConnection); + selConnection = getPostgresConnectionFair(); + + String query = String.format("select * from %s where v1 = ? and k1 = ?;", getTableName()); + String hint = String.format("/*+IndexScan(%s %s_v1_v2)*/", getTableName(), getTableName()); + + LOG.info("sel " + hint + query); + LOG.info("selNoHint " + query); + + preparedSelect = selConnection.prepareStatement(hint + query); + preparedSelectNoHint = selConnection.prepareStatement(query); + LOG.info("Prepared SELECT statement"); + } + + return preparedSelect; + } + + @Override + public void executeQuery(PreparedStatement statement) throws Exception { + long key = getNextKey(); + statement.setLong(1, key / 10); + statement.setLong(2, key / 100000); + try (ResultSet rs1 = statement.executeQuery()) {} + } + + public long getNextKey() { + return random.nextInt(KEY_LIMIT); + } +} diff --git a/src/main/java/com/yugabyte/sample/apps/anomalies/SqlInsertTablets.java b/src/main/java/com/yugabyte/sample/apps/anomalies/SqlInsertTablets.java new file mode 100644 index 0000000..9dfbd27 --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/anomalies/SqlInsertTablets.java @@ -0,0 +1,326 @@ +package com.yugabyte.sample.apps.anomalies; + +import com.yugabyte.sample.apps.AppBase; +import com.yugabyte.sample.apps.SqlInserts; +import com.yugabyte.sample.common.CmdLineOpts; +import com.yugabyte.sample.common.CmdLineOpts.ContactPoint; +import com.yugabyte.sample.common.SimpleLoadGenerator.Key; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Hashtable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.BrokenBarrierException; +import 
java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.log4j.Logger; + +public class SqlInsertTablets extends SqlInserts { + private static final Logger LOG = Logger.getLogger(SqlInsertTablets.class); + + public static CyclicBarrier READ_BARRIER; + public static CyclicBarrier WRITE_BARRIER; + public static AtomicBoolean SETUP_LOCK = new AtomicBoolean(false); + + public static int WAIT_TIMEOUT_MS = 1000; + + @Override + public void initialize(CmdLineOpts configuration) { + LOG.info("Initializing workload " + this.getClass().getSimpleName()); + + boolean shouldSetup = SETUP_LOCK.compareAndSet(false, true); + if (shouldSetup) { + LOG.info( + "Setting up iteration synchronization barriers for " + + configuration.getNumReaderThreads() + + " reader threads and " + + configuration.getNumWriterThreads() + + " writer threads."); + if (configuration.getNumReaderThreads() >= 1) { + READ_BARRIER = new CyclicBarrier(configuration.getNumReaderThreads()); + } + if (configuration.getNumWriterThreads() >= 1) { + WRITE_BARRIER = new CyclicBarrier(configuration.getNumWriterThreads()); + } + } + } + + // Static initialization of this workload's config. These are good defaults for getting a decent + // read dominated workload on a reasonably powered machine. Exact IOPS will of course vary + // depending on the machine and what resources it has to spare. + static { + // Disable the read-write percentage. + appConfig.readIOPSPercentage = -1; + // Set the read and write threads to 2 each. + appConfig.numReaderThreads = 2; + appConfig.numWriterThreads = 2; + // The number of keys to read. + appConfig.numKeysToRead = NUM_KEYS_TO_READ_FOR_YSQL_AND_YCQL; + // The number of keys to write. This is the combined total number of inserts and updates. + appConfig.numKeysToWrite = NUM_KEYS_TO_WRITE_FOR_YSQL_AND_YCQL; + // The number of unique keys to write. 
This determines the number of inserts (as opposed to + // updates). + appConfig.numUniqueKeysToWrite = NUM_UNIQUE_KEYS_FOR_YSQL_AND_YCQL; + } + + // The default table name to create and use for CRUD ops. + private static final String DEFAULT_TABLE_NAME = "PostgresqlKeyValue"; + + // The shared prepared select statement for fetching the data. + public volatile Connection selConnection = null; + public volatile PreparedStatement preparedSelect = null; + + // The shared prepared insert statement for inserting the data. + public volatile Connection insConnection = null; + public volatile PreparedStatement preparedInsert = null; + + public SqlInsertTablets() { + buffer = new byte[appConfig.valueSize]; + } + + public static Hashtable contactPointUsage = new Hashtable<>(); + + public Connection getPostgresConnectionFair() { + // under a lock + // initialize a Hash that keeps track of + // the number of times each contact point has been used + // find the smallest number + // create a connection for that endpoint + // increment the number of times that endpoint has been used + synchronized (SqlInsertTablets.class) { + if (contactPointUsage.isEmpty()) { + LOG.info("Initializing contact point usage"); + for (ContactPoint contactPoint : configuration.contactPoints) { + contactPointUsage.put(contactPoint, 0); + } + } + + int minUsage = Integer.MAX_VALUE; + ContactPoint minContactPoint = null; + for (ContactPoint contactPoint : contactPointUsage.keySet()) { + int usage = contactPointUsage.get(contactPoint); + // LOG.info("Contact point: " + contactPoint.getHost() + " usage: " + usage); + if (usage < minUsage) { + minUsage = usage; + minContactPoint = contactPoint; + } + } + + try { + String dbName = appConfig.defaultPostgresDatabase; + String dbUser = appConfig.dbUsername; + String dbPass = appConfig.dbPassword; + Connection connection = + getRawConnection( + minContactPoint.getHost(), minContactPoint.getPort(), dbName, dbUser, dbPass); + + 
contactPointUsage.put(minContactPoint, minUsage + 1); + return connection; + } catch (Exception e) { + LOG.error("Failed to get connection to " + minContactPoint, e); + return null; + } + } + } + + public Connection getRawConnection( + String hostname, int port, String dbName, String dbUser, String dbPass) + throws SQLException, ClassNotFoundException { + // Use the PG driver + Class.forName("org.postgresql.Driver"); + + Properties props = new Properties(); + props.setProperty("user", dbUser); + props.setProperty("password", dbPass); + if (appConfig.enableDriverDebug) { + props.setProperty("loggerLevel", "debug"); + } + props.setProperty("reWriteBatchedInserts", "true"); + + String connectStr = String.format("%s//%s:%d/%s", "jdbc:postgresql:", hostname, port, dbName); + + LOG.info("Establishing connection to host: " + connectStr); + Connection connection = DriverManager.getConnection(connectStr, props); + return connection; + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + try (Connection connection = getPostgresConnection()) { + + // (Re)Create the table (every run should start cleanly with an empty table). + if (tableOp.equals(TableOp.DropTable)) { + connection + .createStatement() + .execute(String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + } + connection + .createStatement() + .execute( + String.format( + "CREATE TABLE IF NOT EXISTS %s (k text PRIMARY KEY, v text) SPLIT INTO 3 TABLETS", + getTableName())); + LOG.info(String.format("Created table: %s", getTableName())); + if (tableOp.equals(TableOp.TruncateTable)) { + connection.createStatement().execute(String.format("TRUNCATE TABLE %s", getTableName())); + LOG.info(String.format("Truncated table: %s", getTableName())); + } + } + } + + public String getTableName() { + String tableName = appConfig.tableName != null ? 
appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + public PreparedStatement getPreparedSelect() throws Exception { + if (preparedSelect == null) { + LOG.info("Preparing SELECT statement"); + close(selConnection); + selConnection = getPostgresConnectionFair(); + preparedSelect = + selConnection.prepareStatement( + String.format("SELECT k, v FROM %s WHERE k = ?;", getTableName())); + } + return preparedSelect; + } + + @Override + public long doRead() { + try { + return doReadNoBarrier(getSimpleLoadGenerator().getKeyToRead()); + } finally { + try { + READ_BARRIER.await(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + LOG.error("Read barrier - expected for last iteration", e); + } + } + } + + int i = 0; + + public long doReadNoBarrier(Key key) { + if (key == null) { + // There are no keys to read yet. + return 0; + } + + try { + PreparedStatement statement = getPreparedSelect(); + statement.setString(1, key.asString()); + try (ResultSet rs = statement.executeQuery()) { + if (!rs.next()) { + LOG.error("Read key: " + key.asString() + " expected 1 row in result, got 0"); + return 0; + } + + if (!key.asString().equals(rs.getString("k"))) { + LOG.error("Read key: " + key.asString() + ", got " + rs.getString("k")); + } + LOG.debug("Read key: " + key.toString()); + + key.verify(rs.getString("v")); + + if (rs.next()) { + LOG.error("Read key: " + key.asString() + " expected 1 row in result, got more"); + return 0; + } + } + } catch (Exception e) { + LOG.info("Failed reading value: " + key.getValueStr(), e); + close(preparedSelect); + preparedSelect = null; + return 0; + } + return 1; + } + + public PreparedStatement getPreparedInsert() throws Exception { + if (preparedInsert == null) { + LOG.info("Preparing INSERT statement"); + close(insConnection); + insConnection = getPostgresConnectionFair(); + preparedInsert = + insConnection.prepareStatement( + String.format("INSERT 
INTO %s (k, v) VALUES (?, ?);", getTableName())); + } + return preparedInsert; + } + + @Override + public long doWrite(int threadIdx) { + try { + Key key = getSimpleLoadGenerator().getKeyToWrite(); + long a = doWriteNoBarrier(threadIdx, key); + getSimpleLoadGenerator().recordWriteSuccess(key); + return a; + } finally { + try { + WRITE_BARRIER.await(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + LOG.error("Write barrier - expected for last iteration", e); + } + } + } + + public long doWriteNoBarrier(int threadIdx, Key key) { + if (key == null) { + return 0; + } + + int result = 0; + try { + PreparedStatement statement = getPreparedInsert(); + // Prefix hashcode to ensure generated keys are random and not sequential. + statement.setString(1, key.asString()); + statement.setString(2, key.getValueStr()); + result = statement.executeUpdate(); + LOG.debug( + "Wrote key: " + key.asString() + ", " + key.getValueStr() + ", return code: " + result); + } catch (Exception e) { + getSimpleLoadGenerator().recordWriteFailure(key); + LOG.info("Failed writing key: " + key.asString(), e); + close(preparedInsert); + preparedInsert = null; + } + return result; + } + + @Override + public List getWorkloadDescription() { + return Arrays.asList( + "Sample key-value app built on PostgreSQL with concurrent readers and writers. The app" + + " inserts unique string keys", + "each with a string value to a postgres table with an index on the value column.", + "There are multiple readers and writers that update these keys and read them", + "for a specified number of operations,default value for read ops is " + + AppBase.appConfig.numKeysToRead + + " and write ops is " + + AppBase.appConfig.numKeysToWrite + + ", with the readers query the keys by the associated values that are", + "indexed. 
Note that the number of reads and writes to perform can be specified as", + "a parameter, user can run read/write(both) operations indefinitely by passing -1 to" + + " --num_reads or --num_writes or both."); + } + + @Override + public List getWorkloadOptionalArguments() { + return Arrays.asList( + "--num_unique_keys " + appConfig.numUniqueKeysToWrite, + "--num_reads " + appConfig.numKeysToRead, + "--num_writes " + appConfig.numKeysToWrite, + "--num_threads_read " + appConfig.numReaderThreads, + "--num_threads_write " + appConfig.numWriterThreads, + "--load_balance " + appConfig.loadBalance, + "--topology_keys " + appConfig.topologyKeys, + "--debug_driver " + appConfig.enableDriverDebug); + } +} diff --git a/src/main/java/com/yugabyte/sample/apps/anomalies/SqlInsertTabletsSkewQuery.java b/src/main/java/com/yugabyte/sample/apps/anomalies/SqlInsertTabletsSkewQuery.java new file mode 100644 index 0000000..fd4c613 --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/anomalies/SqlInsertTabletsSkewQuery.java @@ -0,0 +1,135 @@ +package com.yugabyte.sample.apps.anomalies; + +import com.yugabyte.sample.common.SimpleLoadGenerator.Key; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.log4j.Logger; + +public class SqlInsertTabletsSkewQuery extends SqlInsertTablets { + private static final Logger LOG = Logger.getLogger(SqlInsertTabletsSkewQuery.class); + + private static final AtomicInteger READ_SKEW_SETUP = new AtomicInteger(0); + private static final AtomicInteger WRITE_SKEW_SETUP = new AtomicInteger(0); + + private boolean readSkewThread = false; + private boolean writeSkewThread = false; + public Thread readSkewThreadInstance = null; + public Thread writeSkewThreadInstance = null; + + public int writeCounter = 0; + public boolean writeSkewThreadStarted = false; + public int readCounter = 0; + public boolean readSkewThreadStarted = false; + + public void setupReadThread() { + if (READ_SKEW_SETUP.get() > 10) return; + if 
(READ_SKEW_SETUP.incrementAndGet() == 1) { + LOG.info("This is a read skew thread " + " - keys to read: " + appConfig.numKeysToRead); + long limit = appConfig.numKeysToRead / configuration.getNumReaderThreads() / 2; + readSkewThread = true; + + readSkewThreadInstance = + new Thread( + new Runnable() { + @Override + public void run() { + LOG.info("Beginning read skew thread up to " + limit + " keys."); + SqlInsertTabletsSkewQuery app = new SqlInsertTabletsSkewQuery(); + for (int i = 0; i < limit; i++) { + if (i % 1000 == 0) { + LOG.info("Read Skew Iteration: " + i + " / " + limit + " done."); + } + try { + app.doReadNoBarrier(getSimpleLoadGenerator().getKeyToRead()); + } catch (Exception e) { + LOG.info("Failed reading key: ", e); + } + } + LOG.info("Read Skew thread done"); + } + }); + } + } + + public void setupWriteThread(int threadIdx) { + if (WRITE_SKEW_SETUP.get() > 10) return; + if (WRITE_SKEW_SETUP.incrementAndGet() == 1) { + LOG.info( + "This is a write skew thread for " + + threadIdx + + " - keys to write: " + + appConfig.numKeysToWrite); + long limit = appConfig.numKeysToWrite / configuration.getNumWriterThreads() / 2; + writeSkewThread = true; + + writeSkewThreadInstance = + new Thread( + new Runnable() { + @Override + public void run() { + LOG.info("Beginning write skew thread with " + limit + " keys."); + String th = "Thread" + threadIdx; + SqlInsertTabletsSkewQuery app = new SqlInsertTabletsSkewQuery(); + for (int i = 0; i < limit; i++) { + if (i % 1000 == 0) { + LOG.info("Write Skew Iteration: " + i + " / " + limit + " done."); + } + try { + Key key = new Key(i, th); + app.doWriteNoBarrier(0, key); + } catch (Exception e) { + LOG.info("Failed writing key: ", e); + } + } + LOG.info("Write Skew thread done"); + } + }); + } + } + + @Override + public long doRead() { + try { + + long ret = super.doRead(); + readCounter++; + setupReadThread(); + + if (readSkewThread + && !readSkewThreadStarted + && readCounter + >= ((appConfig.numKeysToRead / 
configuration.getNumReaderThreads()) * 0.3)) { + readSkewThreadInstance.start(); + readSkewThreadStarted = true; + } + + return ret; + } catch (Exception e) { + LOG.info("Failed reading key: ", e); + return 0; + } + } + + @Override + public long doWrite(int threadIdx) { + try { + + long ret = super.doWrite(threadIdx); + writeCounter++; + setupWriteThread(threadIdx); + + if (writeSkewThread + && !writeSkewThreadStarted + && writeCounter + >= ((appConfig.numKeysToWrite / configuration.getNumWriterThreads()) * 0.3)) { + LOG.info("Starting write skew thread"); + writeSkewThreadInstance.start(); + writeSkewThreadStarted = true; + } + + return ret; + } catch (Exception e) { + LOG.info("Failed writing key: ", e); + return 0; + } + } +} diff --git a/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java b/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java index 54af5f0..a3700c3 100644 --- a/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java +++ b/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java @@ -13,10 +13,12 @@ package com.yugabyte.sample.common; +import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.UUID; import org.apache.commons.cli.BasicParser; @@ -33,6 +35,10 @@ // Import * so we can list the sample apps. import com.yugabyte.sample.apps.*; import com.yugabyte.sample.apps.AppBase.TableOp; +import com.yugabyte.sample.apps.anomalies.PlanAnomaly; +import com.yugabyte.sample.apps.anomalies.PlanAnomalyV2; +import com.yugabyte.sample.apps.anomalies.SqlInsertTablets; +import com.yugabyte.sample.apps.anomalies.SqlInsertTabletsSkewQuery; /** * This is a helper class to parse the user specified command-line options if they were specified, @@ -46,6 +52,10 @@ public class CmdLineOpts { // safely. 
public static UUID loadTesterUUID; + private static final List WORKLOAD_PACKAGES = ImmutableList.of( + "com.yugabyte.sample.apps.", + "com.yugabyte.sample.apps.anomalies."); + // The various apps present in this sample. private final static List HELP_WORKLOADS = ImmutableList.of( CassandraInserts.class, @@ -74,7 +84,11 @@ public class CmdLineOpts { SqlSecondaryIndex.class, SqlSnapshotTxns.class, SqlUpdates.class, - SqlStaleReadDetector.class + SqlStaleReadDetector.class, + SqlInsertTablets.class, + SqlInsertTabletsSkewQuery.class, + PlanAnomaly.class, + PlanAnomalyV2.class ); // The class type of the app needed to spawn new objects. @@ -581,11 +595,17 @@ public String appName() { return AppBase.appConfig.appName; } - private static Class getAppClass(String workloadType) - throws ClassNotFoundException { - // Get the workload class. - return Class.forName("com.yugabyte.sample.apps." + workloadType) - .asSubclass(AppBase.class); + private static Class getAppClass(String workloadType) { + for (String pkg : WORKLOAD_PACKAGES) { + try { + Class result = Class.forName(pkg + workloadType) + .asSubclass(AppBase.class); + return result; + } catch (ClassNotFoundException e) { + LOG.debug("Workload class not found in package " + pkg); + } + } + throw new RuntimeException("Workload " + workloadType + " not found"); } private void initializeThreadCount(CommandLine cmd) {