From cb118c8de6ddb783e90c07912a5fbdd629eabf06 Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Thu, 2 Jun 2016 18:00:08 -0700 Subject: [PATCH] HBASE-15935 Set up a concurrent walker that walks flushed circular linked lists as a Loop Mode Signed-off-by: Elliott Clark --- .../test/IntegrationTestBigLinkedList.java | 393 ++++++++++++++---- ...rationTestBigLinkedListWithVisibility.java | 15 +- .../test/IntegrationTestReplication.java | 3 +- 3 files changed, 316 insertions(+), 95 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index 430c8a69cc9..6c54dcaeac4 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -168,8 +168,9 @@ import com.google.common.collect.Sets; * * Below is a description of the Java programs * - * Generator - A map only job that generates data. As stated previously,· - * its best to generate data in multiples of 25M. + * Generator - A map only job that generates data. As stated previously,·its best to generate data + * in multiples of 25M. An option is also available to allow concurrent walkers to select and walk + * random flushed loops during this phase. * * Verify - A map reduce job that looks for holes. Look at the counts after running. REFERENCED and * UNREFERENCED are· ok, any UNDEFINED counts are bad. Do not run at the· same @@ -182,6 +183,11 @@ import com.google.common.collect.Sets; * Delete - A standalone program that deletes a single node * * This class can be run as a unit test, as an integration test, or from the command line + * + * ex: + * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList + * loop 2 1 100000 /temp 1 1000 50 1 0 + * */ @Category(IntegrationTests.class) public class IntegrationTestBigLinkedList extends IntegrationTestBase { @@ -217,6 +223,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { private static final String GENERATOR_WRAP_KEY = "IntegrationTestBigLinkedList.generator.wrap"; + private static final String CONCURRENT_WALKER_KEY + = "IntegrationTestBigLinkedList.generator.concurrentwalkers"; + protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters @@ -225,6 +234,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { private static final int WRAP_DEFAULT = 25; private static final int ROWKEY_LENGTH = 16; + private static final int CONCURRENT_WALKER_DEFAULT = 0; + protected String toRun; protected String[] otherArgs; @@ -256,6 +267,18 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY = "generator.multiple.columnfamilies"; + public static enum Counts { + SUCCESS, TERMINATING, UNDEFINED, IOEXCEPTION + } + + public static final String USAGE = "Usage : " + Generator.class.getSimpleName() + + " [ " + + " ] \n" + + "where should be a multiple of width*wrap multiplier, 25M by default \n" + + "walkers will verify random flushed loop during Generation."; + + public Job job; + static class GeneratorInputFormat extends InputFormat { static class GeneratorInputSplit extends InputSplit implements Writable { @Override @@ -371,6 +394,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { * |___________________________| * */ + static class GeneratorMapper extends Mapper { @@ -388,6 +412,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { boolean multipleUnevenColumnFamilies; byte[] tinyValue = new byte[] { 't' }; byte[] bigValue = null; + Configuration conf; + + volatile boolean walkersStop; + int numWalkers; + volatile List flushedLoops = new ArrayList<>(); + List walkers = new ArrayList<>(); @Override protected void setup(Context context) throws IOException, InterruptedException { @@ -404,6 +434,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { this.wrap = this.numNodes; } this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration()); + this.numWalkers = context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT); + this.walkersStop = false; + this.conf = context.getConfiguration(); } protected void instantiateHTable() throws IOException { @@ -414,6 +447,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { @Override protected void cleanup(Context context) throws IOException ,InterruptedException { + joinWalkers(); mutator.close(); connection.close(); } @@ -442,9 +476,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { // this block of code turns the 1 million linked list of length 25 into one giant //circular linked list of 25 million circularLeftShift(first); - persist(output, -1, prev, first, null); - + // At this point the entire loop has been flushed so we can add one of its nodes to the + // concurrent walker + if (numWalkers > 0) { + addFlushed(key.getBytes()); + if (walkers.isEmpty()) { + startWalkers(numWalkers, conf, output); + } + } first = null; prev = null; } @@ -457,6 +497,13 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { first[first.length - 1] = ez; } + private void addFlushed(byte[] rowKey) { + synchronized (flushedLoops) { + flushedLoops.add(Bytes.toLong(rowKey)); + flushedLoops.notifyAll(); + } + } + protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id) throws IOException { for (int i = 0; i < current.length; i++) { @@ -488,27 +535,153 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { output.progress(); } } - mutator.flush(); } + + private void startWalkers(int numWalkers, Configuration conf, Context context) { + LOG.info("Starting " + numWalkers + " concurrent walkers"); + for (int i = 0; i < numWalkers; i++) { + Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context)); + walker.start(); + walkers.add(walker); + } + } + + private void joinWalkers() { + walkersStop = true; + synchronized (flushedLoops) { + flushedLoops.notifyAll(); + } + for (Thread walker : walkers) { + try { + walker.join(); + } catch (InterruptedException e) { + // no-op + } + } + } + + /** + * Randomly selects and walks a random flushed loop concurrently with the Generator Mapper by + * spawning ConcurrentWalker's with specified StartNodes. These ConcurrentWalker's are + * configured to only log erroneous nodes. + */ + + public class ContinuousConcurrentWalker implements Runnable { + + ConcurrentWalker walker; + Configuration conf; + Context context; + Random rand; + + public ContinuousConcurrentWalker(Configuration conf, Context context) { + this.conf = conf; + this.context = context; + rand = new Random(); + } + + @Override + public void run() { + while (!walkersStop) { + try { + long node = selectLoop(); + try { + walkLoop(node); + } catch (IOException e) { + context.getCounter(Counts.IOEXCEPTION).increment(1l); + return; + } + } catch (InterruptedException e) { + return; + } + } + } + + private void walkLoop(long node) throws IOException { + walker = new ConcurrentWalker(context); + walker.setConf(conf); + walker.run(node, wrap); + } + + private long selectLoop () throws InterruptedException{ + synchronized (flushedLoops) { + while (flushedLoops.isEmpty() && !walkersStop) { + flushedLoops.wait(); + } + if (walkersStop) { + throw new InterruptedException(); + } + return flushedLoops.get(rand.nextInt(flushedLoops.size())); + } + } + } + + public static class ConcurrentWalker extends WalkerBase { + + Context context; + + public ConcurrentWalker(Context context) {this.context = context;} + + public void run(long startKeyIn, long maxQueriesIn) throws IOException { + + long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE; + byte[] startKey = Bytes.toBytes(startKeyIn); + + Connection connection = ConnectionFactory.createConnection(getConf()); + Table table = connection.getTable(getTableName(getConf())); + long numQueries = 0; + // If isSpecificStart is set, only walk one list from that particular node. + // Note that in case of circular (or P-shaped) list it will walk forever, as is + // the case in normal run without startKey. + + CINode node = findStartNode(table, startKey); + if (node == null) { + LOG.error("Start node not found: " + Bytes.toStringBinary(startKey)); + throw new IOException("Start node not found: " + startKeyIn); + } + while (numQueries < maxQueries) { + numQueries++; + byte[] prev = node.prev; + long t1 = System.currentTimeMillis(); + node = getNode(prev, table, node); + long t2 = System.currentTimeMillis(); + if (node == null) { + LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev)); + context.getCounter(Counts.UNDEFINED).increment(1l); + } else if (node.prev.length == NO_KEY.length) { + LOG.error("ConcurrentWalker found TERMINATING NODE: " + + Bytes.toStringBinary(node.key)); + context.getCounter(Counts.TERMINATING).increment(1l); + } else { + // Increment for successful walk + context.getCounter(Counts.SUCCESS).increment(1l); + } + } + table.close(); + connection.close(); + } + } } @Override public int run(String[] args) throws Exception { if (args.length < 3) { - System.out.println("Usage : " + Generator.class.getSimpleName() + - " [ ]"); - System.out.println(" where should be a multiple of " + - " width*wrap multiplier, 25M by default"); - return 0; + System.err.println(USAGE); + return 1; + } + try { + int numMappers = Integer.parseInt(args[0]); + long numNodes = Long.parseLong(args[1]); + Path tmpOutput = new Path(args[2]); + Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]); + Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]); + Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]); + return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); + } catch (NumberFormatException e) { + System.err.println("Parsing generator arguments failed: " + e.getMessage()); + System.err.println(USAGE); + return 1; } - - int numMappers = Integer.parseInt(args[0]); - long numNodes = Long.parseLong(args[1]); - Path tmpOutput = new Path(args[2]); - Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]); - Integer wrapMuplitplier = (args.length < 5) ? null : Integer.parseInt(args[4]); - return run(numMappers, numNodes, tmpOutput, width, wrapMuplitplier); } protected void createSchema() throws IOException { @@ -562,7 +735,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput, - Integer width, Integer wrapMuplitplier) throws Exception { + Integer width, Integer wrapMultiplier, Integer numWalkers) + throws Exception { LOG.info("Running RandomInputGenerator with numMappers=" + numMappers + ", numNodes=" + numNodes); Job job = Job.getInstance(getConf()); @@ -575,7 +749,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { job.setOutputKeyClass(BytesWritable.class); job.setOutputValueClass(NullWritable.class); - setJobConf(job, numMappers, numNodes, width, wrapMuplitplier); + setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers); job.setMapperClass(Mapper.class); //identity mapper @@ -588,10 +762,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } public int runGenerator(int numMappers, long numNodes, Path tmpOutput, - Integer width, Integer wrapMuplitplier) throws Exception { + Integer width, Integer wrapMultiplier, Integer numWalkers) + throws Exception { LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes); createSchema(); - Job job = Job.getInstance(getConf()); + job = Job.getInstance(getConf()); job.setJobName("Link Generator"); job.setNumReduceTasks(0); @@ -602,7 +777,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(NullWritable.class); - setJobConf(job, numMappers, numNodes, width, wrapMuplitplier); + setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers); setMapperForGenerator(job); @@ -629,12 +804,34 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } public int run(int numMappers, long numNodes, Path tmpOutput, - Integer width, Integer wrapMuplitplier) throws Exception { - int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier); + Integer width, Integer wrapMultiplier, Integer numWalkers) + throws Exception { + int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, + numWalkers); if (ret > 0) { return ret; } - return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier); + return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); + } + + public boolean verify() { + try { + Counters counters = job.getCounters(); + + if (counters.findCounter(Counts.TERMINATING).getValue() > 0 || + counters.findCounter(Counts.UNDEFINED).getValue() > 0 || + counters.findCounter(Counts.IOEXCEPTION).getValue() > 0) { + LOG.error("Concurrent walker failed to verify during Generation phase"); + LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue()); + LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue()); + LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue()); + return false; + } + } catch (IOException e) { + LOG.info("Generator verification could not find counter"); + return false; + } + return true; } } @@ -1240,21 +1437,33 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { static class Loop extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(Loop.class); + private static final String USAGE = "Usage: Loop " + + " [ " + + " ] \n" + + "where should be a multiple of width*wrap multiplier, 25M by default \n" + + "walkers will select and verify random flushed loop during Generation."; IntegrationTestBigLinkedList it; protected void runGenerator(int numMappers, long numNodes, - String outputDir, Integer width, Integer wrapMuplitplier) throws Exception { + String outputDir, Integer width, Integer wrapMultiplier, Integer numWalkers) + throws Exception { Path outputPath = new Path(outputDir); UUID uuid = UUID.randomUUID(); //create a random UUID. Path generatorOutput = new Path(outputPath, uuid.toString()); Generator generator = new Generator(); generator.setConf(getConf()); - int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMuplitplier); + int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, + numWalkers); if (retCode > 0) { throw new RuntimeException("Generator failed with return code: " + retCode); } + if (numWalkers > 0) { + if (!generator.verify()) { + throw new RuntimeException("Generator.verify failed"); + } + } } protected void runVerify(String outputDir, @@ -1273,41 +1482,43 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { if (!verify.verify(expectedNumNodes)) { throw new RuntimeException("Verify.verify failed"); } - - LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes); + LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes); } @Override public int run(String[] args) throws Exception { if (args.length < 5) { - System.err.println("Usage: Loop [ ]"); + System.err.println(USAGE); return 1; } - LOG.info("Running Loop with args:" + Arrays.deepToString(args)); + try { + int numIterations = Integer.parseInt(args[0]); + int numMappers = Integer.parseInt(args[1]); + long numNodes = Long.parseLong(args[2]); + String outputDir = args[3]; + int numReducers = Integer.parseInt(args[4]); + Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]); + Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]); + Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]); - int numIterations = Integer.parseInt(args[0]); - int numMappers = Integer.parseInt(args[1]); - long numNodes = Long.parseLong(args[2]); - String outputDir = args[3]; - int numReducers = Integer.parseInt(args[4]); - Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]); - Integer wrapMuplitplier = (args.length < 7) ? null : Integer.parseInt(args[6]); + long expectedNumNodes = 0; - long expectedNumNodes = 0; - - if (numIterations < 0) { - numIterations = Integer.MAX_VALUE; //run indefinitely (kind of) + if (numIterations < 0) { + numIterations = Integer.MAX_VALUE; //run indefinitely (kind of) + } + LOG.info("Running Loop with args:" + Arrays.deepToString(args)); + for (int i = 0; i < numIterations; i++) { + LOG.info("Starting iteration = " + i); + runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers); + expectedNumNodes += numMappers * numNodes; + runVerify(outputDir, numReducers, expectedNumNodes); + } + return 0; + } catch (NumberFormatException e) { + System.err.println("Parsing loop arguments failed: " + e.getMessage()); + System.err.println(USAGE); + return 1; } - - for (int i = 0; i < numIterations; i++) { - LOG.info("Starting iteration = " + i); - runGenerator(numMappers, numNodes, outputDir, width, wrapMuplitplier); - expectedNumNodes += numMappers * numNodes; - - runVerify(outputDir, numReducers, expectedNumNodes); - } - - return 0; } } @@ -1399,13 +1610,47 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } } + abstract static class WalkerBase extends Configured{ + protected static CINode findStartNode(Table table, byte[] startKey) throws IOException { + Scan scan = new Scan(); + scan.setStartRow(startKey); + scan.setBatch(1); + scan.addColumn(FAMILY_NAME, COLUMN_PREV); + + long t1 = System.currentTimeMillis(); + ResultScanner scanner = table.getScanner(scan); + Result result = scanner.next(); + long t2 = System.currentTimeMillis(); + scanner.close(); + + if ( result != null) { + CINode node = getCINode(result, new CINode()); + System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key)); + return node; + } + + System.out.println("FSR " + (t2 - t1)); + + return null; + } + protected CINode getNode(byte[] row, Table table, CINode node) throws IOException { + Get get = new Get(row); + get.addColumn(FAMILY_NAME, COLUMN_PREV); + Result result = table.get(get); + return getCINode(result, node); + } + } /** * A stand alone program that follows a linked list created by {@link Generator} and prints * timing info. */ - private static class Walker extends Configured implements Tool { + private static class Walker extends WalkerBase implements Tool { + + public Walker(){} + @Override public int run(String[] args) throws IOException { + Options options = new Options(); options.addOption("n", "num", true, "number of queries"); options.addOption("s", "start", true, "key to start at, binary string"); @@ -1432,6 +1677,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } Random rand = new SecureRandom(); boolean isSpecificStart = cmd.hasOption('s'); + byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null; int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1; @@ -1451,12 +1697,13 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey)); } numQueries++; - while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) { + while (node != null && node.prev.length != NO_KEY.length && + numQueries < maxQueries) { byte[] prev = node.prev; long t1 = System.currentTimeMillis(); node = getNode(prev, table, node); long t2 = System.currentTimeMillis(); - if (numQueries % logEvery == 0) { + if (logEvery > 0 && numQueries % logEvery == 0) { System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev)); } numQueries++; @@ -1467,41 +1714,10 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } } } - table.close(); connection.close(); return 0; } - - private static CINode findStartNode(Table table, byte[] startKey) throws IOException { - Scan scan = new Scan(); - scan.setStartRow(startKey); - scan.setBatch(1); - scan.addColumn(FAMILY_NAME, COLUMN_PREV); - - long t1 = System.currentTimeMillis(); - ResultScanner scanner = table.getScanner(scan); - Result result = scanner.next(); - long t2 = System.currentTimeMillis(); - scanner.close(); - - if ( result != null) { - CINode node = getCINode(result, new CINode()); - System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key)); - return node; - } - - System.out.println("FSR " + (t2 - t1)); - - return null; - } - - private CINode getNode(byte[] row, Table table, CINode node) throws IOException { - Get get = new Get(row); - get.addColumn(FAMILY_NAME, COLUMN_PREV); - Result result = table.get(get); - return getCINode(result, node); - } } private static class Clean extends Configured implements Tool { @@ -1691,7 +1907,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } private static void setJobConf(Job job, int numMappers, long numNodes, - Integer width, Integer wrapMultiplier) { + Integer width, Integer wrapMultiplier, Integer numWalkers) { job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers); job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes); if (width != null) { @@ -1700,6 +1916,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { if (wrapMultiplier != null) { job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier); } + if (numWalkers != null) { + job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers); + } } public static void setJobScannerConf(Job job) { diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java index f8a8ffa99c8..cdee14ddb74 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java @@ -480,21 +480,22 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB @Override protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width, - Integer wrapMuplitplier) throws Exception { + Integer wrapMultiplier, Integer numWalkers) throws Exception { Path outputPath = new Path(outputDir); UUID uuid = UUID.randomUUID(); // create a random UUID. Path generatorOutput = new Path(outputPath, uuid.toString()); Generator generator = new VisibilityGenerator(); generator.setConf(getConf()); - int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMuplitplier); + int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, + numWalkers); if (retCode > 0) { throw new RuntimeException("Generator failed with return code: " + retCode); } } protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width, - Integer wrapMuplitplier, int tableIndex) throws Exception { + Integer wrapMultiplier, int tableIndex) throws Exception { LOG.info("Running copier on table "+IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex)); Copier copier = new Copier( IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true); @@ -595,8 +596,7 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB String outputDir = args[3]; int numReducers = Integer.parseInt(args[4]); Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]); - Integer wrapMuplitplier = (args.length < 7) ? null : Integer.parseInt(args[6]); - + Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]); long expectedNumNodes = 0; if (numIterations < 0) { @@ -606,7 +606,8 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB for (int i = 0; i < numIterations; i++) { LOG.info("Starting iteration = " + i); LOG.info("Generating data"); - runGenerator(numMappers, numNodes, outputDir, width, wrapMuplitplier); + // By default run no concurrent walkers for test with visibility + runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0); expectedNumNodes += numMappers * numNodes; // Copying wont work because expressions are not returned back to the // client @@ -619,7 +620,7 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB sleep(SLEEP_IN_MS); for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) { LOG.info("Deleting data on table with index: "+j); - runDelete(numMappers, numNodes, outputDir, width, wrapMuplitplier, j); + runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j); sleep(SLEEP_IN_MS); LOG.info("Verifying common table after deleting"); runVerify(outputDir, numReducers, expectedNumNodes, j); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java index c6668ad58cf..141b24db43f 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java @@ -267,7 +267,8 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList { Generator generator = new Generator(); generator.setConf(source.getConfiguration()); - int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier); + // Disable concurrent walkers for IntegrationTestReplication + int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, 0); if (retCode > 0) { throw new RuntimeException("Generator failed with return code: " + retCode); }