HBASE-8672 Create an Integration test for Bulk Loads
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1491656 13f79535-47bb-0310-9956-ffa450edef68
parent bcc3626301
commit 30c8032cb2

@@ -0,0 +1,626 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.IntegrationTests;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import static org.junit.Assert.assertEquals;

/**
 * Test Bulk Load and MR on a distributed cluster.
 * It starts an MR job that creates linked chains.
 *
 * The format of rows is like this:
 * Row Key -> Long
 *
 * L:<< Chain Id >> -> Row Key of the next link in the chain
 * S:<< Chain Id >> -> The step in the chain that this link is.
 * D:<< Chain Id >> -> Random Data.
 *
 * All chains start on row 0.
 * All rk's are > 0.
 *
 * After creating the linked lists they are walked over using a TableMapper-based MapReduce job.
 *
 * There are a few options exposed:
 *
 * hbase.IntegrationTestBulkLoad.chainLength
 * The number of rows that will be part of each and every chain.
 *
 * hbase.IntegrationTestBulkLoad.numMaps
 * The number of mappers that will be run. Each mapper creates one linked list chain.
 *
 * hbase.IntegrationTestBulkLoad.numImportRounds
 * How many jobs will be run to create linked lists.
 *
 * hbase.IntegrationTestBulkLoad.tableName
 * The name of the table.
 *
 * An example invocation is sketched in a comment below the configuration constants.
 */
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad implements Configurable, Tool {

  private static byte[] CHAIN_FAM = Bytes.toBytes("L");
  private static byte[] SORT_FAM = Bytes.toBytes("S");
  private static byte[] DATA_FAM = Bytes.toBytes("D");

  private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
  private static int CHAIN_LENGTH = 900000;

  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static int NUM_MAPS = 1;

  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static int NUM_IMPORT_ROUNDS = 1;

  private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
  private static String TABLE_NAME = "IntegrationTestBulkLoad";

  private static IntegrationTestingUtility util;

  private String tableName;
  private byte[] tableNameBytes;
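
  // A hypothetical invocation sketch (the -D values are illustrative, not defaults from this
  // commit): when run as a Tool against a distributed cluster, the options above can be passed
  // on the command line, e.g.
  //
  //   hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad \
  //       -Dhbase.IntegrationTestBulkLoad.chainLength=1000000 \
  //       -Dhbase.IntegrationTestBulkLoad.numMaps=40 \
  //       -Dhbase.IntegrationTestBulkLoad.numImportRounds=3 \
  //       -Dhbase.IntegrationTestBulkLoad.tableName=IntegrationTestBulkLoad
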
  @Test
  public void testBulkLoad() throws Exception {
    setupTable();
    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
    for (int i = 0; i < numImportRounds; i++) {
      runLinkedListMRJob(i);
    }
    runCheck();
  }

  private byte[][] getSplits(int numRegions) {
    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    return split.split(numRegions);
  }

  private void setupTable() throws IOException {
    tableName = getConf().get(TABLE_NAME_KEY, TABLE_NAME);
    tableNameBytes = Bytes.toBytes(tableName);
    if (util.getHBaseAdmin().tableExists(tableNameBytes)) {
      util.deleteTable(tableNameBytes);
    }

    util.createTable(
        tableNameBytes,
        new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
        getSplits(16)
    );
  }

  private void runLinkedListMRJob(int iteration) throws Exception {
    String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
        EnvironmentEdgeManager.currentTimeMillis();
    Configuration conf = new Configuration(util.getConfiguration());
    Path p = util.getDataTestDirOnTestFS(tableName + "-" + iteration);
    HTable table = new HTable(conf, tableName);

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);

    Job job = new Job(conf);

    job.setJobName(jobName);

    // Set the input format so that we can create map tasks with no data input.
    job.setInputFormatClass(RandomInputFormat.class);

    // Set the mapper classes.
    job.setMapperClass(LinkedListCreationMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // Use the identity reducer, so nothing to do here.

    // Set this jar.
    job.setJarByClass(getClass());

    // Set where to place the hfiles.
    FileOutputFormat.setOutputPath(job, p);

    // Configure the partitioner and other things needed for HFileOutputFormat.
    HFileOutputFormat.configureIncrementalLoad(job, table);

    // Run the job making sure it works.
    assertEquals(true, job.waitForCompletion(true));

    // Create a new loader.
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

    // Load the HFiles in.
    loader.doBulkLoad(p, table);

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

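  // The method above is one concrete instance of the generic HFile bulk-load pattern; a minimal
  // sketch of that pattern (names like stagingPath are placeholders, not part of this commit):
  //
  //   Job job = new Job(conf);
  //   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  //   job.setMapOutputValueClass(KeyValue.class);
  //   FileOutputFormat.setOutputPath(job, stagingPath);
  //   HFileOutputFormat.configureIncrementalLoad(job, table);  // reducer, partitioner, HFile output
  //   if (job.waitForCompletion(true)) {
  //     new LoadIncrementalHFiles(conf).doBulkLoad(stagingPath, table);
  //   }
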
  /**
   * Class to generate splits. Each split gets a dummy split file. The associated
   * RecordReader generates a single random number.
   *
   * This class is adapted from Hadoop tests.
   */
  static class RandomInputFormat extends InputFormat<Text, LongWritable> {
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> result = new ArrayList<InputSplit>();
      int numSplits = job.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      for (int i = 0; i < numSplits; ++i) {
        result.add(new FileSplit(new Path("/tmp", "dummy-split-" + i), 0, 1, null));
      }
      return result;
    }

    /**
     * RecordReader that doesn't read anything. Instead it generates a single random number.
     * This is useful for debugging or for starting map tasks with no data input.
     *
     * This class is adapted from Hadoop tests.
     */
    static class RandomRecordReader extends RecordReader<Text, LongWritable> {
      Path name;
      Text key = null;
      LongWritable value = new LongWritable();

      public RandomRecordReader(Path p) {
        name = p;
      }

      public void initialize(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
      }

      public boolean nextKeyValue() {
        if (name != null) {
          key = new Text();
          key.set(name.getName());
          name = null;
          value.set(new Random().nextLong());
          return true;
        }
        return false;
      }

      public Text getCurrentKey() {
        return key;
      }

      public LongWritable getCurrentValue() {
        return value;
      }

      public void close() {
      }

      public float getProgress() {
        return 0.0f;
      }
    }

    public RecordReader<Text, LongWritable> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
      return new RandomRecordReader(((FileSplit) split).getPath());
    }
  }

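  // A minimal sketch of the wiring this input format expects (runLinkedListMRJob() above sets
  // the input format class; provisionCluster() below scales numMaps on a distributed cluster).
  // The number of dummy splits, and therefore chains created per round, is simply numMaps:
  //
  //   conf.setInt(NUM_MAPS_KEY, 10);                      // 10 splits -> 10 mappers -> 10 chains
  //   job.setInputFormatClass(RandomInputFormat.class);
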
  /**
   * Mapper that creates a linked list of KeyValues.
   *
   * Each map task generates one linked list.
   * All lists start on row key 0L.
   * All lists should be CHAIN_LENGTH long.
   */
  public static class LinkedListCreationMapper
      extends Mapper<Text, LongWritable, ImmutableBytesWritable, KeyValue> {

    private Random rand = new Random();

    protected void map(Text key, LongWritable value, Context context)
        throws IOException, InterruptedException {

      long chainId = value.get();
      byte[] chainIdArray = Bytes.toBytes(chainId);
      long currentRow = 0;
      long nextRow = Math.abs(rand.nextLong());

      int chainLength = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);

      for (long i = 0; i < chainLength; i++) {
        byte[] rk = Bytes.toBytes(currentRow);

        // Next link in the chain.
        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
        // What link in the chain this is.
        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
        // Added data so that large stores are created.
        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
            Bytes.toBytes(RandomStringUtils.randomAlphabetic(100))
        );

        // Emit the key values.
        context.write(new ImmutableBytesWritable(rk), linkKv);
        context.write(new ImmutableBytesWritable(rk), sortKv);
        context.write(new ImmutableBytesWritable(rk), dataKv);
        // Move to the next row.
        currentRow = nextRow;
        nextRow = Math.abs(rand.nextLong());
      }
    }
  }

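  // An illustration with hypothetical values of what one map task writes, assuming
  // chainId = 7 and a first random nextRow of 42:
  //
  //   row 0  : L:7 = 42          S:7 = 0   D:7 = <100 random chars>
  //   row 42 : L:7 = <next rk>   S:7 = 1   D:7 = <100 random chars>
  //   ... and so on, until CHAIN_LENGTH links have been written.
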
  /**
   * Writable class used as the key to group links in the linked list.
   *
   * Used as the key emitted from a pass over the table.
   */
  public static class LinkKey implements WritableComparable<LinkKey> {

    private Long chainId;

    public Long getOrder() {
      return order;
    }

    public Long getChainId() {
      return chainId;
    }

    private Long order;

    public LinkKey() {
    }

    public LinkKey(long chainId, long order) {
      this.chainId = chainId;
      this.order = order;
    }

    @Override
    public int compareTo(LinkKey linkKey) {
      int res = getChainId().compareTo(linkKey.getChainId());
      if (res == 0) {
        res = getOrder().compareTo(linkKey.getOrder());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, chainId);
      WritableUtils.writeVLong(dataOutput, order);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      chainId = WritableUtils.readVLong(dataInput);
      order = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Writable used as the value emitted from a pass over the hbase table.
   */
  public static class LinkChain implements WritableComparable<LinkChain> {

    public Long getNext() {
      return next;
    }

    public Long getRk() {
      return rk;
    }

    public LinkChain() {
    }

    public LinkChain(Long rk, Long next) {
      this.rk = rk;
      this.next = next;
    }

    private Long rk;
    private Long next;

    @Override
    public int compareTo(LinkChain linkChain) {
      int res = getRk().compareTo(linkChain.getRk());
      if (res == 0) {
        res = getNext().compareTo(linkChain.getNext());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, rk);
      WritableUtils.writeVLong(dataOutput, next);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      rk = WritableUtils.readVLong(dataInput);
      next = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Class to figure out what partition to send a link in the chain to. This is based upon
   * the linkKey's ChainId.
   */
  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
    @Override
    public int getPartition(LinkKey linkKey,
                            LinkChain linkChain,
                            int numPartitions) {
      int hash = linkKey.getChainId().hashCode();
      // Mask off the sign bit so a negative hashCode cannot yield a negative partition.
      return (hash & Integer.MAX_VALUE) % numPartitions;
    }
  }

  /**
   * Comparator used to figure out if a linkKey should be grouped together. This is based upon the
   * linkKey's ChainId.
   */
  public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
      super(LinkKey.class, true);
    }

    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.getChainId().compareTo(k2.getChainId());
    }
  }

  /**
   * Comparator used to order linkKeys so that they are passed to a reducer in order. This is based
   * upon linkKey ChainId and Order.
   */
  public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.compareTo(k2);
    }
  }

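  // Together, the three classes above implement a standard MapReduce secondary sort; they are
  // wired into the verification job by runCheck() below:
  //
  //   job.setPartitionerClass(NaturalKeyPartitioner.class);               // all links of a chain -> one reducer
  //   job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class); // one reduce() call per chain
  //   job.setSortComparatorClass(CompositeKeyComparator.class);           // links arrive ordered by step
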
  /**
   * Mapper to pass over the table.
   *
   * For every row there could be multiple chains that landed on it, so emit a LinkKey and
   * LinkChain for each one.
   */
  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      long longRk = Bytes.toLong(value.getRow());

      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
        long chainId = Bytes.toLong(entry.getKey());
        long next = Bytes.toLong(entry.getValue());
        long order = Bytes.toLong(value.getColumn(SORT_FAM, entry.getKey()).get(0).getValue());
        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
      }
    }
  }

  /**
   * Class that does the actual checking of the links.
   *
   * All links in the chain should be grouped and sorted when sent to this class. Then the chain
   * will be traversed making sure that no link is missing and that the chain is the correct length.
   *
   * This will throw an exception if anything is not correct. That causes the job to fail if any
   * data is corrupt.
   */
  public static class LinkedListCheckingReducer
      extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
        throws java.io.IOException, java.lang.InterruptedException {
      long next = -1L;
      long count = 0L;

      for (LinkChain lc : values) {

        if (next == -1) {
          if (lc.getRk() != 0L) throw new RuntimeException("Chains should all start at rk 0");
          next = lc.getNext();
        } else {
          if (next != lc.getRk())
            throw new RuntimeException("Missing a link in the chain. Expecting " +
                next + " got " + lc.getRk());
          next = lc.getNext();
        }
        count++;
      }

      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      if (count != expectedChainLen)
        throw new RuntimeException("Chain wasn't the correct length. Expected " +
            expectedChainLen + " got " + count);
    }
  }

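  // A worked example of the check, assuming chainLength = 3 and a chain 0 -> 17 -> 5: the
  // reducer receives (rk=0, next=17), (rk=17, next=5), (rk=5, next=<some rk>) in step order,
  // verifies each rk equals the previous link's next pointer, and finally that count == 3.
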
  /**
   * After adding data to the table, start an MR job to check that the linked lists are intact.
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = getConf();
    String jobName = tableName + "_check" + EnvironmentEdgeManager.currentTimeMillis();
    Path p = util.getDataTestDirOnTestFS(jobName);

    Job job = new Job(conf);

    job.setJarByClass(getClass());

    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    Scan s = new Scan();
    s.addFamily(CHAIN_FAM);
    s.addFamily(SORT_FAM);
    s.setMaxVersions(1);
    s.setCacheBlocks(false);
    s.setBatch(100);

    TableMapReduceUtil.initTableMapperJob(
        Bytes.toBytes(tableName),
        s,
        LinkedListCheckingMapper.class,
        LinkKey.class,
        LinkChain.class,
        job
    );

    job.setReducerClass(LinkedListCheckingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, p);

    assertEquals(true, job.waitForCompletion(true));

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

  @BeforeClass
  public static void provisionCluster() throws Exception {
    if (null == util) {
      util = new IntegrationTestingUtility();
    }

    util.initializeCluster(1);

    // Scale this up on a real cluster
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
          Integer.toString(util.getHBaseAdmin().getClusterStatus().getServersSize() * 20)
      );
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "3");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  @AfterClass
  public static void releaseCluster() throws Exception {
    util.restoreCluster();
    util = null;
  }

  @Override
  public int run(String[] args) throws Exception {
    provisionCluster();
    testBulkLoad();
    releaseCluster();
    return 0;
  }

  public void setConf(Configuration conf) {
    if (util != null) {
      throw new IllegalArgumentException(
          "setConf not supported after the cluster has been started.");
    }
    util = new IntegrationTestingUtility(conf);
  }

  @Override
  public Configuration getConf() {
    return util.getConfiguration();
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
    System.exit(status);
  }

}

@@ -336,6 +336,9 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        KeyValueSerialization.class.getName());

    // Use table's region boundaries for TOP split points.
    LOG.info("Looking up current regions for table " + table);
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);

@@ -349,6 +352,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
    configureBloomType(table, conf);

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table output configured.");
  }

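The first addition registers KeyValueSerialization with MapReduce's io.serializations so that a job can use KeyValue directly as its map output value; the second (TableMapReduceUtil.initCredentials) obtains the credentials a job needs when security is enabled. A minimal sketch of the map-output configuration that relies on the new serialization, mirroring runLinkedListMRJob above:

    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);  // serialized via KeyValueSerialization
    HFileOutputFormat.configureIncrementalLoad(job, table);
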
@@ -230,6 +230,18 @@ public class RegionSplitter {
     * @return the separator character to use when storing / printing the row
     */
    String separator();

    /**
     * Set the first row
     * @param userInput byte array of the row key.
     */
    void setFirstRow(byte[] userInput);

    /**
     * Set the last row
     * @param userInput byte array of the row key.
     */
    void setLastRow(byte[] userInput);
  }

  /**

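These new SplitAlgorithm setters are what IntegrationTestBulkLoad.getSplits() above relies on to pre-split its table across the positive long key space; a minimal usage sketch of the new interface methods:

    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    byte[][] splitKeys = split.split(16);  // region boundaries for 16 regions
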
@@ -872,6 +884,16 @@ public class RegionSplitter {
      return " ";
    }

    @Override
    public void setFirstRow(byte[] userInput) {
      firstRow = Bytes.toString(userInput);
    }

    @Override
    public void setLastRow(byte[] userInput) {
      lastRow = Bytes.toString(userInput);
    }

    /**
     * Divide 2 numbers in half (for split algorithm)
     *

@@ -992,6 +1014,17 @@ public class RegionSplitter {
      lastRowBytes = Bytes.toBytesBinary(userInput);
    }

    @Override
    public void setFirstRow(byte[] userInput) {
      firstRowBytes = userInput;
    }

    @Override
    public void setLastRow(byte[] userInput) {
      lastRowBytes = userInput;
    }

    @Override
    public byte[] strToRow(String input) {
      return Bytes.toBytesBinary(input);

@@ -1212,6 +1212,25 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    return new HTable(getConfiguration(), tableName);
  }

  /**
   * Create a table.
   * @param tableName
   * @param families
   * @param splitRows
   * @return An HTable instance for the created table.
   * @throws IOException
   */
  public HTable createTable(byte[] tableName, byte[][] families, byte[][] splitRows)
      throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    for (byte[] family : families) {
      HColumnDescriptor hcd = new HColumnDescriptor(family);
      desc.addFamily(hcd);
    }
    getHBaseAdmin().createTable(desc, splitRows);
    return new HTable(getConfiguration(), tableName);
  }

  /**
   * Drop an existing table
   * @param tableName existing table

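A usage sketch of the new createTable overload, matching how IntegrationTestBulkLoad.setupTable() above calls it through IntegrationTestingUtility:

    byte[][] families = new byte[][] { CHAIN_FAM, SORT_FAM, DATA_FAM };
    HTable table = util.createTable(Bytes.toBytes("IntegrationTestBulkLoad"), families, getSplits(16));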