diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
index f5ad854643b..dd4415bf525 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
@@ -29,7 +29,10 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.commons.cli.CommandLine;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -48,7 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
@@ -61,7 +64,6 @@ import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
@@ -101,6 +103,8 @@ import org.junit.experimental.categories.Category;
 @Category(IntegrationTests.class)
 public class IntegrationTestBulkLoad extends IntegrationTestBase {
 
+  private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);
+
   private static byte[] CHAIN_FAM = Bytes.toBytes("L");
   private static byte[] SORT_FAM = Bytes.toBytes("S");
   private static byte[] DATA_FAM = Bytes.toBytes("D");
@@ -114,18 +118,24 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
   private static String NUM_IMPORT_ROUNDS_KEY
       = "hbase.IntegrationTestBulkLoad.numImportRounds";
   private static int NUM_IMPORT_ROUNDS = 1;
 
+  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";
+
   private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
   private static String TABLE_NAME = "IntegrationTestBulkLoad";
 
   @Test
   public void testBulkLoad() throws Exception {
+    runLoad();
+    runCheck();
+  }
+
+  public void runLoad() throws Exception {
     setupTable();
     int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
+    LOG.info("Running load with numIterations:" + numImportRounds);
     for (int i = 0; i < numImportRounds; i++) {
       runLinkedListMRJob(i);
     }
-    runCheck();
   }
 
   private byte[][] getSplits(int numRegions) {
@@ -156,13 +166,14 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     conf.setBoolean("mapreduce.map.speculative", false);
     conf.setBoolean("mapreduce.reduce.speculative", false);
+    conf.setInt(ROUND_NUM_KEY, iteration);
 
     Job job = new Job(conf);
 
     job.setJobName(jobName);
 
     // set the input format so that we can create map tasks with no data input.
-    job.setInputFormatClass(RandomInputFormat.class);
+    job.setInputFormatClass(ITBulkLoadInputFormat.class);
 
     // Set the mapper classes.
     job.setMapperClass(LinkedListCreationMapper.class);
 
@@ -194,74 +205,78 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     util.getTestFileSystem().delete(p, true);
   }
 
-  /**
-   * Class to generate splits. Each split gets a dummy split file. The associated
-   * RecordReader generates a single random number.
-   *
-   * This class is adapted from Hadoop tests.
-   */
-  static class RandomInputFormat extends InputFormat<Text, LongWritable> {
-    public List<InputSplit> getSplits(JobContext job) throws IOException {
-      List<InputSplit> result = new ArrayList<InputSplit>();
-      int numSplits = job.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
+  public static class EmptySplit extends InputSplit implements Writable {
+    @Override
+    public void write(DataOutput out) throws IOException { }
+    @Override
+    public void readFields(DataInput in) throws IOException { }
+    @Override
+    public long getLength() { return 0L; }
+    @Override
+    public String[] getLocations() { return new String[0]; }
+  }
+
+  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
+    private int index = -1;
+    private K[] keys;
+    private V[] values;
+
+    public FixedRecordReader(K[] keys, V[] values) {
+      this.keys = keys;
+      this.values = values;
+    }
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
+        InterruptedException { }
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return ++index < keys.length;
+    }
+    @Override
+    public K getCurrentKey() throws IOException, InterruptedException {
+      return keys[index];
+    }
+    @Override
+    public V getCurrentValue() throws IOException, InterruptedException {
+      return values[index];
+    }
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return (float)index / keys.length;
+    }
+    @Override
+    public void close() throws IOException {
+    }
+  }
+
+  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
+      ArrayList<InputSplit> ret = new ArrayList<InputSplit>(numSplits);
       for (int i = 0; i < numSplits; ++i) {
-        result.add(new FileSplit(new Path("/tmp", "dummy-split-" + i), 0, 1, null));
+        ret.add(new EmptySplit());
       }
-      return result;
+      return ret;
     }
 
-    /**
-     * RecordReader that doesn't read anything. Instead it generates a single random number.
-     * This is useful for debugging or starting map tasks with no data inpput.
-     *
-     * This class is adapted from Hadoop tests.
-     */
-    static class RandomRecordReader extends RecordReader<Text, LongWritable> {
-      Path name;
-      Text key = null;
-      LongWritable value = new LongWritable();
-
-      public RandomRecordReader(Path p) {
-        name = p;
-      }
-
-      public void initialize(InputSplit split,
-          TaskAttemptContext context)
+    @Override
+    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
+        TaskAttemptContext context)
           throws IOException, InterruptedException {
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
+      int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
+      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);
 
-      }
+      taskId = taskId + iteration * numMapTasks;
+      numMapTasks = numMapTasks * numIterations;
 
-      public boolean nextKeyValue() {
-        if (name != null) {
-          key = new Text();
-          key.set(name.getName());
-          name = null;
-          value.set(new Random().nextLong());
-          return true;
-        }
-        return false;
-      }
+      long chainId = Math.abs(new Random().nextLong());
+      chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per task and across iterations
+      LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};
 
-      public Text getCurrentKey() {
-        return key;
-      }
-
-      public LongWritable getCurrentValue() {
-        return value;
-      }
-
-      public void close() {
-      }
-
-      public float getProgress() {
-        return 0.0f;
-      }
-    }
-
-    public RecordReader<Text, LongWritable> createRecordReader(InputSplit split,
-        TaskAttemptContext context)
-        throws IOException, InterruptedException {
-      return new RandomRecordReader(((FileSplit) split).getPath());
+      return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
     }
   }
 
@@ -273,19 +288,21 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
    * All lists should be CHAIN_LENGTH long.
    */
   public static class LinkedListCreationMapper
-      extends Mapper<Text, LongWritable, ImmutableBytesWritable, KeyValue> {
+      extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
 
     private Random rand = new Random();
 
-    protected void map(Text key, LongWritable value, Context context)
+    @Override
+    protected void map(LongWritable key, LongWritable value, Context context)
         throws IOException, InterruptedException {
-
       long chainId = value.get();
+      LOG.info("Starting mapper with chainId:" + chainId);
+
       byte[] chainIdArray = Bytes.toBytes(chainId);
       long currentRow = 0;
 
-      long nextRow = Math.abs(rand.nextLong());
-      int chainLength = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
+      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
+      long nextRow = getNextRow(0, chainLength);
 
       for (long i = 0; i < chainLength; i++) {
         byte[] rk = Bytes.toBytes(currentRow);
@@ -305,9 +322,19 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
         context.write(new ImmutableBytesWritable(rk), dataKv);
         // Move to the next row.
         currentRow = nextRow;
-        nextRow = Math.abs(rand.nextLong());
+        nextRow = getNextRow(i+1, chainLength);
       }
     }
+
+    /** Returns a unique row id within this chain for this index */
+    private long getNextRow(long index, long chainLength) {
+      long nextRow = Math.abs(rand.nextLong());
+      // use significant bits from the random number, but pad with index to ensure it is unique
+      // this also ensures that we do not reuse row = 0
+      // row collisions from multiple mappers are fine, since we guarantee unique chainIds
+      nextRow = nextRow - (nextRow % chainLength) + index;
+      return nextRow;
+    }
   }
 
   /**
@@ -427,6 +454,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
       super(LinkKey.class, true);
     }
 
+    @Override
     public int compare(WritableComparable w1, WritableComparable w2) {
       LinkKey k1 = (LinkKey) w1;
       LinkKey k2 = (LinkKey) w2;
@@ -461,6 +489,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
    * and value for each.
    */
   public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
+    @Override
     protected void map(ImmutableBytesWritable key, Result value, Context context)
         throws IOException, InterruptedException {
       long longRk = Bytes.toLong(value.getRow());
@@ -486,6 +515,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
    */
   public static class LinkedListCheckingReducer
       extends Reducer<LinkKey, LinkChain, NullWritable, LinkKey> {
+    @Override
     protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
         throws java.io.IOException, java.lang.InterruptedException {
       long next = -1L;
@@ -494,12 +524,13 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
 
       for (LinkChain lc : values) {
         if (next == -1) {
-          if (lc.getRk() != 0L) throw new RuntimeException("Chains should all start at 0 rk");
+          if (lc.getRk() != 0L) throw new RuntimeException("Chains should all start at 0 rk" +
+              ". Chain:" + key.chainId + ", order:" + key.order);
           next = lc.getNext();
         } else {
           if (next != lc.getRk())
-            throw new RuntimeException("Missing a link in the chain. Expecthing " +
-                next + " got " + lc.getRk());
+            throw new RuntimeException("Missing a link in the chain. Expecting " +
+                next + " got " + lc.getRk() + ". Chain:" + key.chainId + ", order:" + key.order);
           next = lc.getNext();
         }
         count++;
@@ -508,7 +539,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
       int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
       if (count != expectedChainLen)
         throw new RuntimeException("Chain wasn't the correct length. Expected " +
-            expectedChainLen + " got " + count);
+            expectedChainLen + " got " + count + ". Chain:" + key.chainId + ", order:" + key.order);
Chain:" + key.chainId + ", order:" + key.order); } } @@ -519,6 +550,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { * @throws InterruptedException */ private void runCheck() throws IOException, ClassNotFoundException, InterruptedException { + LOG.info("Running check"); Configuration conf = getConf(); String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTimeMillis(); Path p = util.getDataTestDirOnTestFS(jobName); @@ -536,7 +568,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { s.addFamily(SORT_FAM); s.setMaxVersions(1); s.setCacheBlocks(false); - s.setBatch(100); + s.setBatch(1000); TableMapReduceUtil.initTableMapperJob( Bytes.toBytes(getTablename()), @@ -575,9 +607,35 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { } } + private static final String OPT_LOAD = "load"; + private static final String OPT_CHECK = "check"; + + private boolean load = false; + private boolean check = false; + + @Override + protected void addOptions() { + super.addOptions(); + super.addOptNoArg(OPT_CHECK, "Run check only"); + super.addOptNoArg(OPT_LOAD, "Run load only"); + } + + @Override + protected void processOptions(CommandLine cmd) { + super.processOptions(cmd); + check = cmd.hasOption(OPT_CHECK); + load = cmd.hasOption(OPT_LOAD); + } + @Override public int runTestFromCommandLine() throws Exception { - runCheck(); + if (load) { + runLoad(); + } else if (check) { + runCheck(); + } else { + testBulkLoad(); + } return 0; }