From 3b18accd8eb9ef0e4681cb95c6c46802ea6ad435 Mon Sep 17 00:00:00 2001
From: sershe
Date: Wed, 12 Jun 2013 01:57:59 +0000
Subject: [PATCH] HBASE-8700 IntegrationTestBigLinkedList can fail due to
 random number collision

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1492034 13f79535-47bb-0310-9956-ffa450edef68
---
 .../test/IntegrationTestBigLinkedList.java    | 289 +++++++++++-------
 1 file changed, 174 insertions(+), 115 deletions(-)

diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index e99619df290..a3a26d69fe1 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.test;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.StringWriter;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,10 +55,9 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Counters;
@@ -147,6 +147,7 @@ import org.junit.experimental.categories.Category;
  */
 @Category(IntegrationTests.class)
 public class IntegrationTestBigLinkedList extends Configured implements Tool {
+  private static final byte[] NO_KEY = new byte[1];
 
   protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
 
@@ -170,11 +171,22 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
   private static final String GENERATOR_NUM_MAPPERS_KEY
     = "IntegrationTestBigLinkedList.generator.map.tasks";
 
+  private static final String GENERATOR_WIDTH_KEY
+    = "IntegrationTestBigLinkedList.generator.width";
+
+  private static final String GENERATOR_WRAP_KEY
+    = "IntegrationTestBigLinkedList.generator.wrap";
+
   protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
 
+  private static final int WIDTH_DEFAULT = 1000000;
+  private static final int WRAP_DEFAULT = 25;
+
+  private static final int ROWKEY_LENGTH = 16;
+
   static class CINode {
-    long key;
-    long prev;
+    byte[] key;
+    byte[] prev;
     String client;
     long count;
   }
@@ -186,14 +198,11 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
 
     private static final Log LOG = LogFactory.getLog(Generator.class);
 
-    private static final int WIDTH = 1000000;
-    private static final int WRAP = WIDTH * 25;
-
     public static enum Counts {
       UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT
     }
 
-    static class GeneratorInputFormat extends InputFormat<LongWritable,NullWritable> {
+    static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
       static class GeneratorInputSplit extends InputSplit implements Writable {
         @Override
         public long getLength() throws IOException, InterruptedException {
@@ -211,7 +220,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
         }
       }
 
-      static class GeneratorRecordReader extends RecordReader<LongWritable,NullWritable> {
+      static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
        private long count;
        private long numNodes;
        private Random rand;
@@ -221,8 +230,10 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
        }
 
        @Override
-        public LongWritable getCurrentKey() throws IOException, InterruptedException {
-          return new LongWritable(Math.abs(rand.nextLong()));
+        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
+          byte[] bytes = new byte[ROWKEY_LENGTH];
+          rand.nextBytes(bytes);
+          return new BytesWritable(bytes);
        }
 
        @Override
@@ -250,7 +261,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
      }
 
      @Override
-      public RecordReader<LongWritable,NullWritable> createRecordReader(
+      public RecordReader<BytesWritable,NullWritable> createRecordReader(
          InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        GeneratorRecordReader rr = new GeneratorRecordReader();
        rr.initialize(split, context);
@@ -304,18 +315,19 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
      *             |___________________________|
      */
     static class GeneratorMapper
-      extends Mapper<LongWritable, NullWritable, NullWritable, NullWritable> {
+      extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
       Random rand = new Random();
 
-      long[] first = null;
-      long[] prev = null;
-      long[] current = new long[WIDTH];
+      byte[][] first = null;
+      byte[][] prev = null;
+      byte[][] current = null;
       byte[] id;
       long count = 0;
       int i;
       HTable table;
       long numNodes;
-      long wrap = WRAP;
+      long wrap;
+      int width;
 
       protected void setup(Context context) throws IOException, InterruptedException {
         id = Bytes.toBytes(UUID.randomUUID().toString());
@@ -323,9 +335,14 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
         table = new HTable(conf, getTableName(conf));
         table.setAutoFlush(false);
         table.setWriteBufferSize(4 * 1024 * 1024);
-        numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
-        if (numNodes < 25000000) {
-          wrap = numNodes;
+        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
+        current = new byte[this.width][];
+        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
+        this.wrap = (long)wrapMultiplier * width;
+        this.numNodes = context.getConfiguration().getLong(
+          GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
+        if (this.numNodes < this.wrap) {
+          this.wrap = this.numNodes;
         }
       };
 
@@ -334,17 +351,17 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       };
 
       @Override
-      protected void map(LongWritable key, NullWritable value, Context output) throws IOException {
-        current[i++] = Math.abs(key.get());
-
-        if (i == current.length) {
+      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
+        current[i] = new byte[key.getLength()];
+        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
+        if (++i == current.length) {
           persist(output, count, prev, current, id);
           i = 0;
 
           if (first == null)
             first = current;
           prev = current;
-          current = new long[WIDTH];
+          current = new byte[this.width][];
 
           count += current.length;
           output.setStatus("Count " + count);
@@ -362,18 +379,18 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
         }
       }
 
-      private static void circularLeftShift(long[] first) {
-        long ez = first[0];
+      private static <T> void circularLeftShift(T[] first) {
+        T ez = first[0];
         for (int i = 0; i < first.length - 1; i++)
           first[i] = first[i + 1];
         first[first.length - 1] = ez;
       }
 
-      private void persist(Context output, long count, long[] prev, long[] current, byte[] id)
+      private void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
          throws IOException {
        for (int i = 0; i < current.length; i++) {
-          Put put = new Put(Bytes.toBytes(current[i]));
-          put.add(FAMILY_NAME, COLUMN_PREV, Bytes.toBytes(prev == null ? -1 : prev[i]));
+          Put put = new Put(current[i]);
+          put.add(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
 
          if (count >= 0) {
            put.add(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
@@ -397,15 +414,18 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
     public int run(String[] args) throws Exception {
       if (args.length < 3) {
         System.out.println("Usage : " + Generator.class.getSimpleName() +
-            " <num mappers> <num nodes per map> <tmp output dir>");
-        System.out.println("   where <num nodes per map> should be a multiple of 25M");
+            " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>]");
+        System.out.println("   where <num nodes per map> should be a multiple of " +
+            " width*wrap multiplier, 25M by default");
         return 0;
       }
 
       int numMappers = Integer.parseInt(args[0]);
       long numNodes = Long.parseLong(args[1]);
       Path tmpOutput = new Path(args[2]);
-      return run(numMappers, numNodes, tmpOutput);
+      Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
+      Integer wrapMuplitplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
+      return run(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
     }
 
     protected void createSchema() throws IOException {
@@ -419,8 +439,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       admin.close();
     }
 
-    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput)
-        throws Exception {
+    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
+        Integer width, Integer wrapMuplitplier) throws Exception {
       LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
           + ", numNodes=" + numNodes);
       Job job = new Job(getConf());
@@ -430,11 +450,10 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       job.setJarByClass(getClass());
 
       job.setInputFormatClass(GeneratorInputFormat.class);
-      job.setOutputKeyClass(LongWritable.class);
+      job.setOutputKeyClass(BytesWritable.class);
       job.setOutputValueClass(NullWritable.class);
 
-      job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
-      job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
+      setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
 
       job.setMapperClass(Mapper.class); //identity mapper
 
@@ -446,7 +465,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       return success ? 0 : 1;
     }
 
-    public int runGenerator(int numMappers, long numNodes, Path tmpOutput) throws Exception {
+    public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
+        Integer width, Integer wrapMuplitplier) throws Exception {
       LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
       createSchema();
 
@@ -461,8 +481,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       job.setOutputKeyClass(NullWritable.class);
       job.setOutputValueClass(NullWritable.class);
 
-      job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
-      job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
+      setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
 
       job.setMapperClass(GeneratorMapper.class);
 
@@ -477,13 +496,13 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       return success ? 0 : 1;
     }
 
-    public int run(int numMappers, long numNodes, Path tmpOutput) throws Exception {
-      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput);
+    public int run(int numMappers, long numNodes, Path tmpOutput,
+        Integer width, Integer wrapMuplitplier) throws Exception {
+      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
       if (ret > 0) {
         return ret;
       }
-
-      return runGenerator(numMappers, numNodes, tmpOutput);
+      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
     }
   }
 
@@ -494,72 +513,83 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
   static class Verify extends Configured implements Tool {
 
     private static final Log LOG = LogFactory.getLog(Verify.class);
-    private static final VLongWritable DEF = new VLongWritable(-1);
+    private static final BytesWritable DEF = new BytesWritable(NO_KEY);
 
     private Job job;
 
-    public static class VerifyMapper extends TableMapper<LongWritable, VLongWritable> {
-      private LongWritable row = new LongWritable();
-      private LongWritable ref = new LongWritable();
-      private VLongWritable vrow = new VLongWritable();
+    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
+      private BytesWritable row = new BytesWritable();
+      private BytesWritable ref = new BytesWritable();
 
       @Override
       protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws IOException ,InterruptedException {
-        row.set(Bytes.toLong(key.get()));
+        byte[] rowKey = key.get();
+        row.set(rowKey, 0, rowKey.length);
         context.write(row, DEF);
-
-        long prev = Bytes.toLong(value.getValue(FAMILY_NAME, COLUMN_PREV));
-        if (prev >= 0) {
-          ref.set(prev);
-          vrow.set(Bytes.toLong(key.get()));
-          context.write(ref, vrow);
+        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
+        if (prev != null && prev.length > 0) {
+          ref.set(prev, 0, prev.length);
+          context.write(ref, row);
+        } else {
+          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
         }
       }
     }
 
     public static enum Counts {
-      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT
+      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES
     }
 
-    public static class VerifyReducer extends Reducer<LongWritable,VLongWritable,Text,Text> {
-      private ArrayList<Long> refs = new ArrayList<Long>();
+    public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
+      private ArrayList<byte[]> refs = new ArrayList<byte[]>();
 
-      public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context)
+      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
          throws IOException, InterruptedException {
 
        int defCount = 0;
 
        refs.clear();
-        for (VLongWritable type : values) {
-          if (type.get() == -1) {
+        for (BytesWritable type : values) {
+          if (type.getLength() == DEF.getLength()) {
            defCount++;
          } else {
-            refs.add(type.get());
+            byte[] bytes = new byte[type.getLength()];
+            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
+            refs.add(bytes);
          }
        }
 
        // TODO check for more than one def, should not happen
+        StringBuilder refsSb = null;
+        String keyString = null;
+        if (defCount == 0 || refs.size() != 1) {
+          refsSb = new StringBuilder();
+          String comma = "";
+          for (byte[] ref : refs) {
+            refsSb.append(comma);
+            comma = ",";
+            refsSb.append(Bytes.toStringBinary(ref));
+          }
+          byte[] bytes = new byte[key.getLength()];
+          keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
+        }
+
        if (defCount == 0 && refs.size() > 0) {
          // this is bad, found a node that is referenced but not defined. It must have been
-          //lost, emit some info about this node for debugging purposes.
-
-          StringBuilder sb = new StringBuilder();
-          String comma = "";
-          for (Long ref : refs) {
-            sb.append(comma);
-            comma = ",";
-            sb.append(String.format("%016x", ref));
-          }
-
-          context.write(new Text(String.format("%016x", key.get())), new Text(sb.toString()));
+          // lost, emit some info about this node for debugging purposes.
+          context.write(new Text(keyString), new Text(refsSb.toString()));
          context.getCounter(Counts.UNDEFINED).increment(1);
-
        } else if (defCount > 0 && refs.size() == 0) {
          // node is defined but not referenced
+          context.write(new Text(keyString), new Text("none"));
          context.getCounter(Counts.UNREFERENCED).increment(1);
        } else {
+          if (refs.size() > 1) {
+            context.write(new Text(keyString), new Text(refsSb.toString()));
+            context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
+          }
          // node is defined and referenced
          context.getCounter(Counts.REFERENCED).increment(1);
        }
@@ -600,7 +630,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       scan.setCacheBlocks(false);
 
       TableMapReduceUtil.initTableMapperJob(getTableName(getConf()), scan,
-          VerifyMapper.class, LongWritable.class, VLongWritable.class, job);
+          VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
 
       job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
 
@@ -623,6 +653,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       Counter referenced = counters.findCounter(Counts.REFERENCED);
       Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
       Counter undefined = counters.findCounter(Counts.UNDEFINED);
+      Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
 
       boolean success = true;
       //assert
@@ -633,7 +664,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       }
 
       if (unreferenced.getValue() > 0) {
-        LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue());
+        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
+        LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
+            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
         success = false;
       }
 
@@ -654,15 +687,15 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
 
     private static final Log LOG = LogFactory.getLog(Loop.class);
 
-    protected void runGenerator(int numMappers, long numNodes, String outputDir) throws Exception {
+    protected void runGenerator(int numMappers, long numNodes,
+        String outputDir, Integer width, Integer wrapMuplitplier) throws Exception {
       Path outputPath = new Path(outputDir);
       UUID uuid = UUID.randomUUID(); //create a random UUID.
       Path generatorOutput = new Path(outputPath, uuid.toString());
 
       Generator generator = new Generator();
       generator.setConf(getConf());
-      int retCode = generator.run(numMappers, numNodes, generatorOutput);
-
+      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMuplitplier);
       if (retCode > 0) {
         throw new RuntimeException("Generator failed with return code: " + retCode);
       }
@@ -691,10 +724,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
     @Override
     public int run(String[] args) throws Exception {
       if (args.length < 5) {
-        System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>");
+        System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>]");
         return 1;
       }
-
       LOG.info("Running Loop with args:" + Arrays.deepToString(args));
 
       int numIterations = Integer.parseInt(args[0]);
@@ -702,6 +734,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       long numNodes = Long.parseLong(args[2]);
       String outputDir = args[3];
       int numReducers = Integer.parseInt(args[4]);
+      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
+      Integer wrapMuplitplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
 
       long expectedNumNodes = 0;
 
@@ -709,9 +743,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
         numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
       }
 
-      for (int i=0; i < numIterations; i++) {
+      for (int i = 0; i < numIterations; i++) {
         LOG.info("Starting iteration = " + i);
-        runGenerator(numMappers, numNodes, outputDir);
+        runGenerator(numMappers, numNodes, outputDir, width, wrapMuplitplier);
         expectedNumNodes += numMappers * numNodes;
 
         runVerify(outputDir, numReducers, expectedNumNodes);
@@ -752,10 +786,10 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       scan.setBatch(10000);
 
       if (cmd.hasOption("s"))
-        scan.setStartRow(Bytes.toBytes(new BigInteger(cmd.getOptionValue("s"), 16).longValue()));
+        scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
 
       if (cmd.hasOption("e"))
-        scan.setStopRow(Bytes.toBytes(new BigInteger(cmd.getOptionValue("e"), 16).longValue()));
+        scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
 
       int limit = 0;
       if (cmd.hasOption("l"))
@@ -770,7 +804,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       int count = 0;
       while (result != null && count++ < limit) {
         node = getCINode(result, node);
-        System.out.printf("%016x:%016x:%012d:%s\n", node.key, node.prev, node.count, node.client);
+        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
+            Bytes.toStringBinary(node.prev), node.count, node.client);
         result = scanner.next();
       }
       scanner.close();
@@ -789,10 +824,10 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
         System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
         return 0;
       }
-      long val = new BigInteger(args[0], 16).longValue();
+      byte[] val = Bytes.toBytesBinary(args[0]);
 
       org.apache.hadoop.hbase.client.Delete delete
-        = new org.apache.hadoop.hbase.client.Delete(Bytes.toBytes(val));
+        = new org.apache.hadoop.hbase.client.Delete(val);
 
       HTable table = new HTable(getConf(), getTableName(getConf()));
 
@@ -812,6 +847,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
     public int run(String[] args) throws IOException {
       Options options = new Options();
       options.addOption("n", "num", true, "number of queries");
+      options.addOption("s", "start", true, "key to start at, binary string");
+      options.addOption("l", "logevery", true, "log every N queries");
queries"); GnuParser parser = new GnuParser(); CommandLine cmd = null; @@ -832,30 +869,40 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { if (cmd.hasOption('n')) { maxQueries = Long.parseLong(cmd.getOptionValue("n")); } + Random rand = new Random(); + boolean isSpecificStart = cmd.hasOption('s'); + byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null; + int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1; HTable table = new HTable(getConf(), getTableName(getConf())); - - Random rand = new Random(); - long numQueries = 0; - - while (numQueries < maxQueries) { - CINode node = findStartNode(rand, table); + // If isSpecificStart is set, only walk one list from that particular node. + // Note that in case of circular (or P-shaped) list it will walk forever, as is + // the case in normal run without startKey. + while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) { + if (!isSpecificStart) { + startKey = new byte[ROWKEY_LENGTH]; + rand.nextBytes(startKey); + } + CINode node = findStartNode(table, startKey); + if (node == null && isSpecificStart) { + System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey)); + } numQueries++; - while (node != null && node.prev >= 0 && numQueries < maxQueries) { - long prev = node.prev; - + while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) { + byte[] prev = node.prev; long t1 = System.currentTimeMillis(); node = getNode(prev, table, node); long t2 = System.currentTimeMillis(); - System.out.printf("CQ %d %016x \n", t2 - t1, prev); //cold cache - numQueries++; - - t1 = System.currentTimeMillis(); - node = getNode(prev, table, node); - t2 = System.currentTimeMillis(); - System.out.printf("HQ %d %016x \n", t2 - t1, prev); //hot cache + if (numQueries % logEvery == 0) { + System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev)); + } numQueries++; + if (node == null) { + System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev)); + } else if (node.prev.length == NO_KEY.length) { + System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key)); + } } } @@ -863,9 +910,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { return 0; } - private static CINode findStartNode(Random rand, HTable table) throws IOException { + private static CINode findStartNode(HTable table, byte[] startKey) throws IOException { Scan scan = new Scan(); - scan.setStartRow(Bytes.toBytes(Math.abs(rand.nextLong()))); + scan.setStartRow(startKey); scan.setBatch(1); scan.addColumn(FAMILY_NAME, COLUMN_PREV); @@ -877,7 +924,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { if ( result != null) { CINode node = getCINode(result, new CINode()); - System.out.printf("FSR %d %016x\n", t2 - t1, node.key); + System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key)); return node; } @@ -886,8 +933,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { return null; } - private CINode getNode(long row, HTable table, CINode node) throws IOException { - Get get = new Get(Bytes.toBytes(row)); + private CINode getNode(byte[] row, HTable table, CINode node) throws IOException { + Get get = new Get(row); get.addColumn(FAMILY_NAME, COLUMN_PREV); Result result = table.get(get); return getCINode(result, node); @@ -899,11 +946,11 @@ public class IntegrationTestBigLinkedList extends Configured 
   }
 
   private static CINode getCINode(Result result, CINode node) {
-    node.key = Bytes.toLong(result.getRow());
+    node.key = Bytes.copy(result.getRow());
     if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
-      node.prev = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_PREV));
+      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
     } else {
-      node.prev = -1;
+      node.prev = NO_KEY;
     }
     if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
       node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
@@ -1006,4 +1053,16 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
     int ret = ToolRunner.run(HBaseConfiguration.create(), new IntegrationTestBigLinkedList(), args);
     System.exit(ret);
   }
-}
\ No newline at end of file
+
+  private static void setJobConf(Job job, int numMappers, long numNodes,
+      Integer width, Integer wrapMuplitplier) {
+    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
+    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
+    if (width != null) {
+      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width.intValue());
+    }
+    if (wrapMuplitplier != null) {
+      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMuplitplier.intValue());
+    }
+  }
+}