HBASE-9759 Prevent random number collision in IntegrationTestBulkLoad

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1536551 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Enis Soztutar 2013-10-28 23:09:52 +00:00
parent ba51a43ca3
commit 5a831da632
1 changed files with 134 additions and 76 deletions

View File

@ -29,7 +29,10 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -48,7 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
@ -61,7 +64,6 @@ import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
@ -101,6 +103,8 @@ import org.junit.experimental.categories.Category;
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {
private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);
private static byte[] CHAIN_FAM = Bytes.toBytes("L");
private static byte[] SORT_FAM = Bytes.toBytes("S");
private static byte[] DATA_FAM = Bytes.toBytes("D");
@ -114,18 +118,24 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
private static int NUM_IMPORT_ROUNDS = 1;
private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";
private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
private static String TABLE_NAME = "IntegrationTestBulkLoad";
@Test
public void testBulkLoad() throws Exception {
runLoad();
runCheck();
}
public void runLoad() throws Exception {
setupTable();
int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
LOG.info("Running load with numIterations:" + numImportRounds);
for (int i = 0; i < numImportRounds; i++) {
runLinkedListMRJob(i);
}
runCheck();
}
private byte[][] getSplits(int numRegions) {
@ -156,13 +166,14 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
conf.setBoolean("mapreduce.map.speculative", false);
conf.setBoolean("mapreduce.reduce.speculative", false);
conf.setInt(ROUND_NUM_KEY, iteration);
Job job = new Job(conf);
job.setJobName(jobName);
// set the input format so that we can create map tasks with no data input.
job.setInputFormatClass(RandomInputFormat.class);
job.setInputFormatClass(ITBulkLoadInputFormat.class);
// Set the mapper classes.
job.setMapperClass(LinkedListCreationMapper.class);
@ -194,74 +205,78 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
util.getTestFileSystem().delete(p, true);
}
/**
* Class to generate splits. Each split gets a dummy split file. The associated
* RecordReader generates a single random number.
*
* This class is adapted from Hadoop tests.
*/
static class RandomInputFormat extends InputFormat<Text, LongWritable> {
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> result = new ArrayList<InputSplit>();
int numSplits = job.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
public static class EmptySplit extends InputSplit implements Writable {
@Override
public void write(DataOutput out) throws IOException { }
@Override
public void readFields(DataInput in) throws IOException { }
@Override
public long getLength() { return 0L; }
@Override
public String[] getLocations() { return new String[0]; }
}
public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
private int index = -1;
private K[] keys;
private V[] values;
public FixedRecordReader(K[] keys, V[] values) {
this.keys = keys;
this.values = values;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException { }
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return ++index < keys.length;
}
@Override
public K getCurrentKey() throws IOException, InterruptedException {
return keys[index];
}
@Override
public V getCurrentValue() throws IOException, InterruptedException {
return values[index];
}
@Override
public float getProgress() throws IOException, InterruptedException {
return (float)index / keys.length;
}
@Override
public void close() throws IOException {
}
}
public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
ArrayList<InputSplit> ret = new ArrayList<InputSplit>(numSplits);
for (int i = 0; i < numSplits; ++i) {
result.add(new FileSplit(new Path("/tmp", "dummy-split-" + i), 0, 1, null));
ret.add(new EmptySplit());
}
return result;
return ret;
}
/**
* RecordReader that doesn't read anything. Instead it generates a single random number.
* This is useful for debugging or starting map tasks with no data inpput.
*
* This class is adapted from Hadoop tests.
*/
static class RandomRecordReader extends RecordReader<Text, LongWritable> {
Path name;
Text key = null;
LongWritable value = new LongWritable();
public RandomRecordReader(Path p) {
name = p;
}
public void initialize(InputSplit split,
TaskAttemptContext context)
@Override
public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
int taskId = context.getTaskAttemptID().getTaskID().getId();
int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);
}
taskId = taskId + iteration * numMapTasks;
numMapTasks = numMapTasks * numIterations;
public boolean nextKeyValue() {
if (name != null) {
key = new Text();
key.set(name.getName());
name = null;
value.set(new Random().nextLong());
return true;
}
return false;
}
long chainId = Math.abs(new Random().nextLong());
chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per task and across iterations
LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};
public Text getCurrentKey() {
return key;
}
public LongWritable getCurrentValue() {
return value;
}
public void close() {
}
public float getProgress() {
return 0.0f;
}
}
public RecordReader<Text, LongWritable> createRecordReader(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
return new RandomRecordReader(((FileSplit) split).getPath());
return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
}
}
@ -273,19 +288,21 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
* All lists should be CHAIN_LENGTH long.
*/
public static class LinkedListCreationMapper
extends Mapper<Text, LongWritable, ImmutableBytesWritable, KeyValue> {
extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
private Random rand = new Random();
protected void map(Text key, LongWritable value, Context context)
@Override
protected void map(LongWritable key, LongWritable value, Context context)
throws IOException, InterruptedException {
long chainId = value.get();
LOG.info("Starting mapper with chainId:" + chainId);
byte[] chainIdArray = Bytes.toBytes(chainId);
long currentRow = 0;
long nextRow = Math.abs(rand.nextLong());
int chainLength = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
long nextRow = getNextRow(0, chainLength);
for (long i = 0; i < chainLength; i++) {
byte[] rk = Bytes.toBytes(currentRow);
@ -305,9 +322,19 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
context.write(new ImmutableBytesWritable(rk), dataKv);
// Move to the next row.
currentRow = nextRow;
nextRow = Math.abs(rand.nextLong());
nextRow = getNextRow(i+1, chainLength);
}
}
/** Returns a unique row id within this chain for this index */
private long getNextRow(long index, long chainLength) {
long nextRow = Math.abs(rand.nextLong());
// use significant bits from the random number, but pad with index to ensure it is unique
// this also ensures that we do not reuse row = 0
// row collisions from multiple mappers are fine, since we guarantee unique chainIds
nextRow = nextRow - (nextRow % chainLength) + index;
return nextRow;
}
}
/**
@ -427,6 +454,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
super(LinkKey.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
LinkKey k1 = (LinkKey) w1;
LinkKey k2 = (LinkKey) w2;
@ -461,6 +489,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
* and value for each.
*/
public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
long longRk = Bytes.toLong(value.getRow());
@ -486,6 +515,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
*/
public static class LinkedListCheckingReducer
extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
@Override
protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
throws java.io.IOException, java.lang.InterruptedException {
long next = -1L;
@ -494,12 +524,13 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
for (LinkChain lc : values) {
if (next == -1) {
if (lc.getRk() != 0L) throw new RuntimeException("Chains should all start at 0 rk");
if (lc.getRk() != 0L) throw new RuntimeException("Chains should all start at 0 rk"
+ ". Chain:" + key.chainId + ", order:" + key.order);
next = lc.getNext();
} else {
if (next != lc.getRk())
throw new RuntimeException("Missing a link in the chain. Expecthing " +
next + " got " + lc.getRk());
throw new RuntimeException("Missing a link in the chain. Expecting " +
next + " got " + lc.getRk() + ". Chain:" + key.chainId + ", order:" + key.order);
next = lc.getNext();
}
count++;
@ -508,7 +539,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
if (count != expectedChainLen)
throw new RuntimeException("Chain wasn't the correct length. Expected " +
expectedChainLen + " got " + count);
expectedChainLen + " got " + count + ". Chain:" + key.chainId + ", order:" + key.order);
}
}
@ -519,6 +550,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
* @throws InterruptedException
*/
private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
LOG.info("Running check");
Configuration conf = getConf();
String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTimeMillis();
Path p = util.getDataTestDirOnTestFS(jobName);
@ -536,7 +568,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
s.addFamily(SORT_FAM);
s.setMaxVersions(1);
s.setCacheBlocks(false);
s.setBatch(100);
s.setBatch(1000);
TableMapReduceUtil.initTableMapperJob(
Bytes.toBytes(getTablename()),
@ -575,9 +607,35 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
}
}
private static final String OPT_LOAD = "load";
private static final String OPT_CHECK = "check";
private boolean load = false;
private boolean check = false;
@Override
protected void addOptions() {
super.addOptions();
super.addOptNoArg(OPT_CHECK, "Run check only");
super.addOptNoArg(OPT_LOAD, "Run load only");
}
@Override
protected void processOptions(CommandLine cmd) {
super.processOptions(cmd);
check = cmd.hasOption(OPT_CHECK);
load = cmd.hasOption(OPT_LOAD);
}
@Override
public int runTestFromCommandLine() throws Exception {
runCheck();
if (load) {
runLoad();
} else if (check) {
runCheck();
} else {
testBulkLoad();
}
return 0;
}