HBASE-8700 IntegrationTestBigLinkedList can fail due to random number collision

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1492034 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2013-06-12 01:57:59 +00:00
parent fbd8cf7a73
commit 3b18accd8e
1 changed file with 174 additions and 115 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.test;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.io.StringWriter;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -54,10 +55,9 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Counters;
@ -147,6 +147,7 @@ import org.junit.experimental.categories.Category;
*/ */
@Category(IntegrationTests.class) @Category(IntegrationTests.class)
public class IntegrationTestBigLinkedList extends Configured implements Tool { public class IntegrationTestBigLinkedList extends Configured implements Tool {
private static final byte[] NO_KEY = new byte[1];
protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table"; protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
@ -170,11 +171,22 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
private static final String GENERATOR_NUM_MAPPERS_KEY private static final String GENERATOR_NUM_MAPPERS_KEY
= "IntegrationTestBigLinkedList.generator.map.tasks"; = "IntegrationTestBigLinkedList.generator.map.tasks";
private static final String GENERATOR_WIDTH_KEY
= "IntegrationTestBigLinkedList.generator.width";
private static final String GENERATOR_WRAP_KEY
= "IntegrationTestBigLinkedList.generator.wrap";
protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
private static final int WIDTH_DEFAULT = 1000000;
private static final int WRAP_DEFAULT = 25;
private static final int ROWKEY_LENGTH = 16;
static class CINode { static class CINode {
long key; byte[] key;
long prev; byte[] prev;
String client; String client;
long count; long count;
} }
@ -186,14 +198,11 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(Generator.class); private static final Log LOG = LogFactory.getLog(Generator.class);
private static final int WIDTH = 1000000;
private static final int WRAP = WIDTH * 25;
public static enum Counts { public static enum Counts {
UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT
} }
static class GeneratorInputFormat extends InputFormat<LongWritable,NullWritable> { static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
static class GeneratorInputSplit extends InputSplit implements Writable { static class GeneratorInputSplit extends InputSplit implements Writable {
@Override @Override
public long getLength() throws IOException, InterruptedException { public long getLength() throws IOException, InterruptedException {
@ -211,7 +220,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
} }
} }
static class GeneratorRecordReader extends RecordReader<LongWritable,NullWritable> { static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
private long count; private long count;
private long numNodes; private long numNodes;
private Random rand; private Random rand;
@ -221,8 +230,10 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
} }
@Override @Override
public LongWritable getCurrentKey() throws IOException, InterruptedException { public BytesWritable getCurrentKey() throws IOException, InterruptedException {
return new LongWritable(Math.abs(rand.nextLong())); byte[] bytes = new byte[ROWKEY_LENGTH];
rand.nextBytes(bytes);
return new BytesWritable(bytes);
} }
@Override @Override
@ -250,7 +261,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
} }
@Override @Override
public RecordReader<LongWritable,NullWritable> createRecordReader( public RecordReader<BytesWritable,NullWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
GeneratorRecordReader rr = new GeneratorRecordReader(); GeneratorRecordReader rr = new GeneratorRecordReader();
rr.initialize(split, context); rr.initialize(split, context);
@ -304,18 +315,19 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
* |___________________________| * |___________________________|
*/ */
static class GeneratorMapper static class GeneratorMapper
extends Mapper<LongWritable, NullWritable, NullWritable, NullWritable> { extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
Random rand = new Random(); Random rand = new Random();
long[] first = null; byte[][] first = null;
long[] prev = null; byte[][] prev = null;
long[] current = new long[WIDTH]; byte[][] current = null;
byte[] id; byte[] id;
long count = 0; long count = 0;
int i; int i;
HTable table; HTable table;
long numNodes; long numNodes;
long wrap = WRAP; long wrap;
int width;
protected void setup(Context context) throws IOException, InterruptedException { protected void setup(Context context) throws IOException, InterruptedException {
id = Bytes.toBytes(UUID.randomUUID().toString()); id = Bytes.toBytes(UUID.randomUUID().toString());
@ -323,9 +335,14 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
table = new HTable(conf, getTableName(conf)); table = new HTable(conf, getTableName(conf));
table.setAutoFlush(false); table.setAutoFlush(false);
table.setWriteBufferSize(4 * 1024 * 1024); table.setWriteBufferSize(4 * 1024 * 1024);
numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000); this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
if (numNodes < 25000000) { current = new byte[this.width][];
wrap = numNodes; int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
this.wrap = (long)wrapMultiplier * width;
this.numNodes = context.getConfiguration().getLong(
GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
if (this.numNodes < this.wrap) {
this.wrap = this.numNodes;
} }
}; };
@ -334,17 +351,17 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
}; };
@Override @Override
protected void map(LongWritable key, NullWritable value, Context output) throws IOException { protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
current[i++] = Math.abs(key.get()); current[i] = new byte[key.getLength()];
System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
if (i == current.length) { if (++i == current.length) {
persist(output, count, prev, current, id); persist(output, count, prev, current, id);
i = 0; i = 0;
if (first == null) if (first == null)
first = current; first = current;
prev = current; prev = current;
current = new long[WIDTH]; current = new byte[this.width][];
count += current.length; count += current.length;
output.setStatus("Count " + count); output.setStatus("Count " + count);
@ -362,18 +379,18 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
} }
} }
private static void circularLeftShift(long[] first) { private static <T> void circularLeftShift(T[] first) {
long ez = first[0]; T ez = first[0];
for (int i = 0; i < first.length - 1; i++) for (int i = 0; i < first.length - 1; i++)
first[i] = first[i + 1]; first[i] = first[i + 1];
first[first.length - 1] = ez; first[first.length - 1] = ez;
} }
private void persist(Context output, long count, long[] prev, long[] current, byte[] id) private void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
throws IOException { throws IOException {
for (int i = 0; i < current.length; i++) { for (int i = 0; i < current.length; i++) {
Put put = new Put(Bytes.toBytes(current[i])); Put put = new Put(current[i]);
put.add(FAMILY_NAME, COLUMN_PREV, Bytes.toBytes(prev == null ? -1 : prev[i])); put.add(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
if (count >= 0) { if (count >= 0) {
put.add(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i)); put.add(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
@ -397,15 +414,18 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
public int run(String[] args) throws Exception { public int run(String[] args) throws Exception {
if (args.length < 3) { if (args.length < 3) {
System.out.println("Usage : " + Generator.class.getSimpleName() + System.out.println("Usage : " + Generator.class.getSimpleName() +
" <num mappers> <num nodes per map> <tmp output dir>"); " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>]");
System.out.println(" where <num nodes per map> should be a multiple of 25M"); System.out.println(" where <num nodes per map> should be a multiple of " +
" width*wrap multiplier, 25M by default");
return 0; return 0;
} }
int numMappers = Integer.parseInt(args[0]); int numMappers = Integer.parseInt(args[0]);
long numNodes = Long.parseLong(args[1]); long numNodes = Long.parseLong(args[1]);
Path tmpOutput = new Path(args[2]); Path tmpOutput = new Path(args[2]);
return run(numMappers, numNodes, tmpOutput); Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
Integer wrapMuplitplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
return run(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
} }
protected void createSchema() throws IOException { protected void createSchema() throws IOException {
@ -419,8 +439,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
admin.close(); admin.close();
} }
public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput) public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
throws Exception { Integer width, Integer wrapMuplitplier) throws Exception {
LOG.info("Running RandomInputGenerator with numMappers=" + numMappers LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
+ ", numNodes=" + numNodes); + ", numNodes=" + numNodes);
Job job = new Job(getConf()); Job job = new Job(getConf());
@ -430,11 +450,10 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
job.setJarByClass(getClass()); job.setJarByClass(getClass());
job.setInputFormatClass(GeneratorInputFormat.class); job.setInputFormatClass(GeneratorInputFormat.class);
job.setOutputKeyClass(LongWritable.class); job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(NullWritable.class); job.setOutputValueClass(NullWritable.class);
job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers); setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
job.setMapperClass(Mapper.class); //identity mapper job.setMapperClass(Mapper.class); //identity mapper
@ -446,7 +465,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
return success ? 0 : 1; return success ? 0 : 1;
} }
public int runGenerator(int numMappers, long numNodes, Path tmpOutput) throws Exception { public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
Integer width, Integer wrapMuplitplier) throws Exception {
LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes); LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
createSchema(); createSchema();
@ -461,8 +481,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
job.setOutputKeyClass(NullWritable.class); job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class); job.setOutputValueClass(NullWritable.class);
job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers); setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
job.setMapperClass(GeneratorMapper.class); job.setMapperClass(GeneratorMapper.class);
@ -477,13 +496,13 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
return success ? 0 : 1; return success ? 0 : 1;
} }
public int run(int numMappers, long numNodes, Path tmpOutput) throws Exception { public int run(int numMappers, long numNodes, Path tmpOutput,
int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput); Integer width, Integer wrapMuplitplier) throws Exception {
int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
if (ret > 0) { if (ret > 0) {
return ret; return ret;
} }
return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
return runGenerator(numMappers, numNodes, tmpOutput);
} }
} }
@ -494,72 +513,83 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
static class Verify extends Configured implements Tool { static class Verify extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(Verify.class); private static final Log LOG = LogFactory.getLog(Verify.class);
private static final VLongWritable DEF = new VLongWritable(-1); private static final BytesWritable DEF = new BytesWritable(NO_KEY);
private Job job; private Job job;
public static class VerifyMapper extends TableMapper<LongWritable, VLongWritable> { public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
private LongWritable row = new LongWritable(); private BytesWritable row = new BytesWritable();
private LongWritable ref = new LongWritable(); private BytesWritable ref = new BytesWritable();
private VLongWritable vrow = new VLongWritable();
@Override @Override
protected void map(ImmutableBytesWritable key, Result value, Context context) protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException ,InterruptedException { throws IOException ,InterruptedException {
row.set(Bytes.toLong(key.get())); byte[] rowKey = key.get();
row.set(rowKey, 0, rowKey.length);
context.write(row, DEF); context.write(row, DEF);
byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
long prev = Bytes.toLong(value.getValue(FAMILY_NAME, COLUMN_PREV)); if (prev != null && prev.length > 0) {
if (prev >= 0) { ref.set(prev, 0, prev.length);
ref.set(prev); context.write(ref, row);
vrow.set(Bytes.toLong(key.get())); } else {
context.write(ref, vrow); LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
} }
} }
} }
public static enum Counts { public static enum Counts {
UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES
} }
public static class VerifyReducer extends Reducer<LongWritable,VLongWritable,Text,Text> { public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
private ArrayList<Long> refs = new ArrayList<Long>(); private ArrayList<byte[]> refs = new ArrayList<byte[]>();
public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context) public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
throws IOException, InterruptedException { throws IOException, InterruptedException {
int defCount = 0; int defCount = 0;
refs.clear(); refs.clear();
for (VLongWritable type : values) { for (BytesWritable type : values) {
if (type.get() == -1) { if (type.getLength() == DEF.getLength()) {
defCount++; defCount++;
} else { } else {
refs.add(type.get()); byte[] bytes = new byte[type.getLength()];
System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
refs.add(bytes);
} }
} }
// TODO check for more than one def, should not happen // TODO check for more than one def, should not happen
StringBuilder refsSb = null;
String keyString = null;
if (defCount == 0 || refs.size() != 1) {
refsSb = new StringBuilder();
String comma = "";
for (byte[] ref : refs) {
refsSb.append(comma);
comma = ",";
refsSb.append(Bytes.toStringBinary(ref));
}
byte[] bytes = new byte[key.getLength()];
keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
}
if (defCount == 0 && refs.size() > 0) { if (defCount == 0 && refs.size() > 0) {
// this is bad, found a node that is referenced but not defined. It must have been // this is bad, found a node that is referenced but not defined. It must have been
//lost, emit some info about this node for debugging purposes. // lost, emit some info about this node for debugging purposes.
context.write(new Text(keyString), new Text(refsSb.toString()));
StringBuilder sb = new StringBuilder();
String comma = "";
for (Long ref : refs) {
sb.append(comma);
comma = ",";
sb.append(String.format("%016x", ref));
}
context.write(new Text(String.format("%016x", key.get())), new Text(sb.toString()));
context.getCounter(Counts.UNDEFINED).increment(1); context.getCounter(Counts.UNDEFINED).increment(1);
} else if (defCount > 0 && refs.size() == 0) { } else if (defCount > 0 && refs.size() == 0) {
// node is defined but not referenced // node is defined but not referenced
context.write(new Text(keyString), new Text("none"));
context.getCounter(Counts.UNREFERENCED).increment(1); context.getCounter(Counts.UNREFERENCED).increment(1);
} else { } else {
if (refs.size() > 1) {
context.write(new Text(keyString), new Text(refsSb.toString()));
context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
}
// node is defined and referenced // node is defined and referenced
context.getCounter(Counts.REFERENCED).increment(1); context.getCounter(Counts.REFERENCED).increment(1);
} }
@ -600,7 +630,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
scan.setCacheBlocks(false); scan.setCacheBlocks(false);
TableMapReduceUtil.initTableMapperJob(getTableName(getConf()), scan, TableMapReduceUtil.initTableMapperJob(getTableName(getConf()), scan,
VerifyMapper.class, LongWritable.class, VLongWritable.class, job); VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false); job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
@ -623,6 +653,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
Counter referenced = counters.findCounter(Counts.REFERENCED); Counter referenced = counters.findCounter(Counts.REFERENCED);
Counter unreferenced = counters.findCounter(Counts.UNREFERENCED); Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
Counter undefined = counters.findCounter(Counts.UNDEFINED); Counter undefined = counters.findCounter(Counts.UNDEFINED);
Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
boolean success = true; boolean success = true;
//assert //assert
@ -633,7 +664,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
} }
if (unreferenced.getValue() > 0) { if (unreferenced.getValue() > 0) {
LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()); boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
+ (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
success = false; success = false;
} }
@ -654,15 +687,15 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(Loop.class); private static final Log LOG = LogFactory.getLog(Loop.class);
protected void runGenerator(int numMappers, long numNodes, String outputDir) throws Exception { protected void runGenerator(int numMappers, long numNodes,
String outputDir, Integer width, Integer wrapMuplitplier) throws Exception {
Path outputPath = new Path(outputDir); Path outputPath = new Path(outputDir);
UUID uuid = UUID.randomUUID(); //create a random UUID. UUID uuid = UUID.randomUUID(); //create a random UUID.
Path generatorOutput = new Path(outputPath, uuid.toString()); Path generatorOutput = new Path(outputPath, uuid.toString());
Generator generator = new Generator(); Generator generator = new Generator();
generator.setConf(getConf()); generator.setConf(getConf());
int retCode = generator.run(numMappers, numNodes, generatorOutput); int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMuplitplier);
if (retCode > 0) { if (retCode > 0) {
throw new RuntimeException("Generator failed with return code: " + retCode); throw new RuntimeException("Generator failed with return code: " + retCode);
} }
@ -691,10 +724,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
@Override @Override
public int run(String[] args) throws Exception { public int run(String[] args) throws Exception {
if (args.length < 5) { if (args.length < 5) {
System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>"); System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>]");
return 1; return 1;
} }
LOG.info("Running Loop with args:" + Arrays.deepToString(args)); LOG.info("Running Loop with args:" + Arrays.deepToString(args));
int numIterations = Integer.parseInt(args[0]); int numIterations = Integer.parseInt(args[0]);
@ -702,6 +734,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
long numNodes = Long.parseLong(args[2]); long numNodes = Long.parseLong(args[2]);
String outputDir = args[3]; String outputDir = args[3];
int numReducers = Integer.parseInt(args[4]); int numReducers = Integer.parseInt(args[4]);
Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
Integer wrapMuplitplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
long expectedNumNodes = 0; long expectedNumNodes = 0;
@ -709,9 +743,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
numIterations = Integer.MAX_VALUE; //run indefinitely (kind of) numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
} }
for (int i=0; i < numIterations; i++) { for (int i = 0; i < numIterations; i++) {
LOG.info("Starting iteration = " + i); LOG.info("Starting iteration = " + i);
runGenerator(numMappers, numNodes, outputDir); runGenerator(numMappers, numNodes, outputDir, width, wrapMuplitplier);
expectedNumNodes += numMappers * numNodes; expectedNumNodes += numMappers * numNodes;
runVerify(outputDir, numReducers, expectedNumNodes); runVerify(outputDir, numReducers, expectedNumNodes);
@ -752,10 +786,10 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
scan.setBatch(10000); scan.setBatch(10000);
if (cmd.hasOption("s")) if (cmd.hasOption("s"))
scan.setStartRow(Bytes.toBytes(new BigInteger(cmd.getOptionValue("s"), 16).longValue())); scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
if (cmd.hasOption("e")) if (cmd.hasOption("e"))
scan.setStopRow(Bytes.toBytes(new BigInteger(cmd.getOptionValue("e"), 16).longValue())); scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
int limit = 0; int limit = 0;
if (cmd.hasOption("l")) if (cmd.hasOption("l"))
@ -770,7 +804,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
int count = 0; int count = 0;
while (result != null && count++ < limit) { while (result != null && count++ < limit) {
node = getCINode(result, node); node = getCINode(result, node);
System.out.printf("%016x:%016x:%012d:%s\n", node.key, node.prev, node.count, node.client); System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
Bytes.toStringBinary(node.prev), node.count, node.client);
result = scanner.next(); result = scanner.next();
} }
scanner.close(); scanner.close();
@ -789,10 +824,10 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>"); System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
return 0; return 0;
} }
long val = new BigInteger(args[0], 16).longValue(); byte[] val = Bytes.toBytesBinary(args[0]);
org.apache.hadoop.hbase.client.Delete delete org.apache.hadoop.hbase.client.Delete delete
= new org.apache.hadoop.hbase.client.Delete(Bytes.toBytes(val)); = new org.apache.hadoop.hbase.client.Delete(val);
HTable table = new HTable(getConf(), getTableName(getConf())); HTable table = new HTable(getConf(), getTableName(getConf()));
@ -812,6 +847,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
public int run(String[] args) throws IOException { public int run(String[] args) throws IOException {
Options options = new Options(); Options options = new Options();
options.addOption("n", "num", true, "number of queries"); options.addOption("n", "num", true, "number of queries");
options.addOption("s", "start", true, "key to start at, binary string");
options.addOption("l", "logevery", true, "log every N queries");
GnuParser parser = new GnuParser(); GnuParser parser = new GnuParser();
CommandLine cmd = null; CommandLine cmd = null;
@ -832,30 +869,40 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
if (cmd.hasOption('n')) { if (cmd.hasOption('n')) {
maxQueries = Long.parseLong(cmd.getOptionValue("n")); maxQueries = Long.parseLong(cmd.getOptionValue("n"));
} }
Random rand = new Random();
boolean isSpecificStart = cmd.hasOption('s');
byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
HTable table = new HTable(getConf(), getTableName(getConf())); HTable table = new HTable(getConf(), getTableName(getConf()));
Random rand = new Random();
long numQueries = 0; long numQueries = 0;
// If isSpecificStart is set, only walk one list from that particular node.
while (numQueries < maxQueries) { // Note that in case of circular (or P-shaped) list it will walk forever, as is
CINode node = findStartNode(rand, table); // the case in normal run without startKey.
while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
if (!isSpecificStart) {
startKey = new byte[ROWKEY_LENGTH];
rand.nextBytes(startKey);
}
CINode node = findStartNode(table, startKey);
if (node == null && isSpecificStart) {
System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
}
numQueries++; numQueries++;
while (node != null && node.prev >= 0 && numQueries < maxQueries) { while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
long prev = node.prev; byte[] prev = node.prev;
long t1 = System.currentTimeMillis(); long t1 = System.currentTimeMillis();
node = getNode(prev, table, node); node = getNode(prev, table, node);
long t2 = System.currentTimeMillis(); long t2 = System.currentTimeMillis();
System.out.printf("CQ %d %016x \n", t2 - t1, prev); //cold cache if (numQueries % logEvery == 0) {
numQueries++; System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
}
t1 = System.currentTimeMillis();
node = getNode(prev, table, node);
t2 = System.currentTimeMillis();
System.out.printf("HQ %d %016x \n", t2 - t1, prev); //hot cache
numQueries++; numQueries++;
if (node == null) {
System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
} else if (node.prev.length == NO_KEY.length) {
System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
}
} }
} }
@ -863,9 +910,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
return 0; return 0;
} }
private static CINode findStartNode(Random rand, HTable table) throws IOException { private static CINode findStartNode(HTable table, byte[] startKey) throws IOException {
Scan scan = new Scan(); Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(Math.abs(rand.nextLong()))); scan.setStartRow(startKey);
scan.setBatch(1); scan.setBatch(1);
scan.addColumn(FAMILY_NAME, COLUMN_PREV); scan.addColumn(FAMILY_NAME, COLUMN_PREV);
@ -877,7 +924,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
if ( result != null) { if ( result != null) {
CINode node = getCINode(result, new CINode()); CINode node = getCINode(result, new CINode());
System.out.printf("FSR %d %016x\n", t2 - t1, node.key); System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
return node; return node;
} }
@ -886,8 +933,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
return null; return null;
} }
private CINode getNode(long row, HTable table, CINode node) throws IOException { private CINode getNode(byte[] row, HTable table, CINode node) throws IOException {
Get get = new Get(Bytes.toBytes(row)); Get get = new Get(row);
get.addColumn(FAMILY_NAME, COLUMN_PREV); get.addColumn(FAMILY_NAME, COLUMN_PREV);
Result result = table.get(get); Result result = table.get(get);
return getCINode(result, node); return getCINode(result, node);
@ -899,11 +946,11 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
} }
private static CINode getCINode(Result result, CINode node) { private static CINode getCINode(Result result, CINode node) {
node.key = Bytes.toLong(result.getRow()); node.key = Bytes.copy(result.getRow());
if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) { if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
node.prev = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_PREV)); node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
} else { } else {
node.prev = -1; node.prev = NO_KEY;
} }
if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) { if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT)); node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
@ -1006,4 +1053,16 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
int ret = ToolRunner.run(HBaseConfiguration.create(), new IntegrationTestBigLinkedList(), args); int ret = ToolRunner.run(HBaseConfiguration.create(), new IntegrationTestBigLinkedList(), args);
System.exit(ret); System.exit(ret);
} }
}
private static void setJobConf(Job job, int numMappers, long numNodes,
Integer width, Integer wrapMuplitplier) {
job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
if (width != null) {
job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width.intValue());
}
if (wrapMuplitplier != null) {
job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMuplitplier.intValue());
}
}
}