HBASE-13168 Backport HBASE-12590 "A solution for data skew in HBase-Mapreduce Job"
This commit is contained in:
parent
619d58f9b5
commit
05aef46d94
|
@ -31,6 +31,7 @@ import javax.naming.NamingException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@ -104,6 +105,16 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
public abstract class TableInputFormatBase
|
public abstract class TableInputFormatBase
|
||||||
extends InputFormat<ImmutableBytesWritable, Result> {
|
extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
|
|
||||||
|
/** Specify if we enable auto-balance for input in M/R jobs.*/
|
||||||
|
public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
|
||||||
|
/** Specify if ratio for data skew in M/R jobs, it goes well with the enabling hbase.mapreduce
|
||||||
|
* .input.autobalance property.*/
|
||||||
|
public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
|
||||||
|
".maxskewratio";
|
||||||
|
/** Specify if the row key in table is text (ASCII between 32~126),
|
||||||
|
* default is true. False means the table is using binary row key*/
|
||||||
|
public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";
|
||||||
|
|
||||||
final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
|
final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
|
||||||
|
|
||||||
private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
|
private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
|
||||||
|
@ -304,7 +315,26 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return splits;
|
//The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
|
||||||
|
boolean enableAutoBalance = context.getConfiguration().getBoolean(
|
||||||
|
MAPREDUCE_INPUT_AUTOBALANCE, false);
|
||||||
|
if (enableAutoBalance) {
|
||||||
|
long totalRegionSize=0;
|
||||||
|
for (int i = 0; i < splits.size(); i++){
|
||||||
|
TableSplit ts = (TableSplit)splits.get(i);
|
||||||
|
totalRegionSize += ts.getLength();
|
||||||
|
}
|
||||||
|
long averageRegionSize = totalRegionSize / splits.size();
|
||||||
|
// the averageRegionSize must be positive.
|
||||||
|
if (averageRegionSize <= 0) {
|
||||||
|
LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " +
|
||||||
|
"set it to 1.");
|
||||||
|
averageRegionSize = 1;
|
||||||
|
}
|
||||||
|
return calculateRebalancedSplits(splits, context, averageRegionSize);
|
||||||
|
} else {
|
||||||
|
return splits;
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (closeOnFinish) {
|
if (closeOnFinish) {
|
||||||
closeTable();
|
closeTable();
|
||||||
|
@ -335,6 +365,170 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
return hostName;
|
return hostName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculates the number of MapReduce input splits for the map tasks. The number of
|
||||||
|
* MapReduce input splits depends on the average region size and the "data skew ratio" user set in
|
||||||
|
* configuration.
|
||||||
|
*
|
||||||
|
* @param list The list of input splits before balance.
|
||||||
|
* @param context The current job context.
|
||||||
|
* @param average The average size of all regions .
|
||||||
|
* @return The list of input splits.
|
||||||
|
* @throws IOException When creating the list of splits fails.
|
||||||
|
* @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
|
||||||
|
* org.apache.hadoop.mapreduce.JobContext)
|
||||||
|
*/
|
||||||
|
public List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
|
||||||
|
long average) throws IOException {
|
||||||
|
List<InputSplit> resultList = new ArrayList<InputSplit>();
|
||||||
|
Configuration conf = context.getConfiguration();
|
||||||
|
//The default data skew ratio is 3
|
||||||
|
long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
|
||||||
|
//It determines which mode to use: text key mode or binary key mode. The default is text mode.
|
||||||
|
boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
|
||||||
|
long dataSkewThreshold = dataSkewRatio * average;
|
||||||
|
int count = 0;
|
||||||
|
while (count < list.size()) {
|
||||||
|
TableSplit ts = (TableSplit)list.get(count);
|
||||||
|
String regionLocation = ts.getRegionLocation();
|
||||||
|
long regionSize = ts.getLength();
|
||||||
|
if (regionSize >= dataSkewThreshold) {
|
||||||
|
// if the current region size is large than the data skew threshold,
|
||||||
|
// split the region into two MapReduce input splits.
|
||||||
|
byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
|
||||||
|
//Set the size of child TableSplit as 1/2 of the region size. The exact size of the
|
||||||
|
// MapReduce input splits is not far off.
|
||||||
|
TableSplit t1 = new TableSplit(table.getName(), ts.getStartRow(), splitKey, regionLocation,
|
||||||
|
regionSize / 2);
|
||||||
|
TableSplit t2 = new TableSplit(table.getName(), splitKey, ts.getEndRow(), regionLocation,
|
||||||
|
regionSize - regionSize / 2);
|
||||||
|
resultList.add(t1);
|
||||||
|
resultList.add(t2);
|
||||||
|
count++;
|
||||||
|
} else if (regionSize >= average) {
|
||||||
|
// if the region size between average size and data skew threshold size,
|
||||||
|
// make this region as one MapReduce input split.
|
||||||
|
resultList.add(ts);
|
||||||
|
count++;
|
||||||
|
} else {
|
||||||
|
// if the total size of several small continuous regions less than the average region size,
|
||||||
|
// combine them into one MapReduce input split.
|
||||||
|
long totalSize = regionSize;
|
||||||
|
byte[] splitStartKey = ts.getStartRow();
|
||||||
|
byte[] splitEndKey = ts.getEndRow();
|
||||||
|
count++;
|
||||||
|
for (; count < list.size(); count++) {
|
||||||
|
TableSplit nextRegion = (TableSplit)list.get(count);
|
||||||
|
long nextRegionSize = nextRegion.getLength();
|
||||||
|
if (totalSize + nextRegionSize <= dataSkewThreshold) {
|
||||||
|
totalSize = totalSize + nextRegionSize;
|
||||||
|
splitEndKey = nextRegion.getEndRow();
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TableSplit t = new TableSplit(table.getName(), splitStartKey, splitEndKey,
|
||||||
|
regionLocation, totalSize);
|
||||||
|
resultList.add(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return resultList;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* select a split point in the region. The selection of the split point is based on an uniform
|
||||||
|
* distribution assumption for the keys in a region.
|
||||||
|
* Here are some examples:
|
||||||
|
* startKey: aaabcdefg endKey: aaafff split point: aaad
|
||||||
|
* startKey: 111000 endKey: 1125790 split point: 111b
|
||||||
|
* startKey: 1110 endKey: 1120 split point: 111_
|
||||||
|
* startKey: binary key { 13, -19, 126, 127 }, endKey: binary key { 13, -19, 127, 0 },
|
||||||
|
* split point: binary key { 13, -19, 127, -64 }
|
||||||
|
* Set this function as "public static", make it easier for test.
|
||||||
|
*
|
||||||
|
* @param start Start key of the region
|
||||||
|
* @param end End key of the region
|
||||||
|
* @param isText It determines to use text key mode or binary key mode
|
||||||
|
* @return The split point in the region.
|
||||||
|
*/
|
||||||
|
public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
|
||||||
|
byte upperLimitByte;
|
||||||
|
byte lowerLimitByte;
|
||||||
|
//Use text mode or binary mode.
|
||||||
|
if (isText) {
|
||||||
|
//The range of text char set in ASCII is [32,126], the lower limit is space and the upper
|
||||||
|
// limit is '~'.
|
||||||
|
upperLimitByte = '~';
|
||||||
|
lowerLimitByte = ' ';
|
||||||
|
} else {
|
||||||
|
upperLimitByte = Byte.MAX_VALUE;
|
||||||
|
lowerLimitByte = Byte.MIN_VALUE;
|
||||||
|
}
|
||||||
|
// For special case
|
||||||
|
// Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h"
|
||||||
|
// Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~"
|
||||||
|
if (start.length == 0 && end.length == 0){
|
||||||
|
return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)};
|
||||||
|
}
|
||||||
|
if (start.length == 0 && end.length != 0){
|
||||||
|
return new byte[]{ end[0] };
|
||||||
|
}
|
||||||
|
if (start.length != 0 && end.length == 0){
|
||||||
|
byte[] result =new byte[start.length];
|
||||||
|
result[0]=start[0];
|
||||||
|
for (int k = 1; k < start.length; k++){
|
||||||
|
result[k] = upperLimitByte;
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
// A list to store bytes in split key
|
||||||
|
List resultBytesList = new ArrayList();
|
||||||
|
int maxLength = start.length > end.length ? start.length : end.length;
|
||||||
|
for (int i = 0; i < maxLength; i++) {
|
||||||
|
//calculate the midpoint byte between the first difference
|
||||||
|
//for example: "11ae" and "11chw", the midpoint is "11b"
|
||||||
|
//another example: "11ae" and "11bhw", the first different byte is 'a' and 'b',
|
||||||
|
// there is no midpoint between 'a' and 'b', so we need to check the next byte.
|
||||||
|
if (start[i] == end[i]) {
|
||||||
|
resultBytesList.add(start[i]);
|
||||||
|
//For special case like: startKey="aaa", endKey="aaaz", splitKey="aaaM"
|
||||||
|
if (i + 1 == start.length) {
|
||||||
|
resultBytesList.add((byte) ((lowerLimitByte + end[i + 1]) / 2));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
//if the two bytes differ by 1, like ['a','b'], We need to check the next byte to find
|
||||||
|
// the midpoint.
|
||||||
|
if ((int)end[i] - (int)start[i] == 1) {
|
||||||
|
//get next byte after the first difference
|
||||||
|
byte startNextByte = (i + 1 < start.length) ? start[i + 1] : lowerLimitByte;
|
||||||
|
byte endNextByte = (i + 1 < end.length) ? end[i + 1] : lowerLimitByte;
|
||||||
|
int byteRange = (upperLimitByte - startNextByte) + (endNextByte - lowerLimitByte) + 1;
|
||||||
|
int halfRange = byteRange / 2;
|
||||||
|
if ((int)startNextByte + halfRange > (int)upperLimitByte) {
|
||||||
|
resultBytesList.add(end[i]);
|
||||||
|
resultBytesList.add((byte) (startNextByte + halfRange - upperLimitByte +
|
||||||
|
lowerLimitByte));
|
||||||
|
} else {
|
||||||
|
resultBytesList.add(start[i]);
|
||||||
|
resultBytesList.add((byte) (startNextByte + halfRange));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
//calculate the midpoint key by the fist different byte (normal case),
|
||||||
|
// like "11ae" and "11chw", the midpoint is "11b"
|
||||||
|
resultBytesList.add((byte) ((start[i] + end[i]) / 2));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//transform the List of bytes to byte[]
|
||||||
|
byte result[] = new byte[resultBytesList.size()];
|
||||||
|
for (int k = 0; k < resultBytesList.size(); k++) {
|
||||||
|
result[k] = (byte) resultBytesList.get(k);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
|
@ -344,12 +538,14 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
* This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
|
* This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
|
||||||
* (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
|
* (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
|
||||||
* Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing,
|
* Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing,
|
||||||
* continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due to the ordering of the keys.
|
* continuously. In addition to reducing InputSplits, reduces the load on the region server as
|
||||||
|
* well, due to the ordering of the keys.
|
||||||
* <br>
|
* <br>
|
||||||
* <br>
|
* <br>
|
||||||
* Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region.
|
* Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region.
|
||||||
* <br>
|
* <br>
|
||||||
* Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded( i.e. all regions are included).
|
* Override this method, if you want to bulk exclude regions altogether from M-R.
|
||||||
|
* By default, no region is excluded( i.e. all regions are included).
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
* @param startKey Start key of the region
|
* @param startKey Start key of the region
|
||||||
|
|
|
@ -19,7 +19,10 @@
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -95,4 +98,97 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
|
||||||
throws IOException, InterruptedException, ClassNotFoundException {
|
throws IOException, InterruptedException, ClassNotFoundException {
|
||||||
testScan(null, "opp", "opo");
|
testScan(null, "opp", "opo");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests a MR scan using specific number of mappers. The test table has 25 regions,
|
||||||
|
* and all region sizes are set as 0 as default. The average region size is 1 (the smallest
|
||||||
|
* positive). When we set hbase.mapreduce.input.ratio as -1, all regions will be cut into two
|
||||||
|
* MapRedcue input splits, the number of MR input splits should be 50; when we set hbase
|
||||||
|
* .mapreduce.input.ratio as 100, the sum of all region sizes is less then the average region
|
||||||
|
* size, all regions will be combined into 1 MapRedcue input split.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws ClassNotFoundException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException {
|
||||||
|
HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
|
||||||
|
List<HRegionLocation> locs = table.getRegionLocator().getAllRegionLocations();
|
||||||
|
|
||||||
|
testNumOfSplits("-1", locs.size()*2);
|
||||||
|
table.close();
|
||||||
|
testNumOfSplits("100", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the getSplitKey() method in TableInputFormatBase.java
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws ClassNotFoundException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testGetSplitsPoint() throws IOException, InterruptedException,
|
||||||
|
ClassNotFoundException {
|
||||||
|
// Test Case 1: "aaabcdef" and "aaaff", split point is "aaad".
|
||||||
|
byte[] start1 = { 'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f' };
|
||||||
|
byte[] end1 = { 'a', 'a', 'a', 'f', 'f' };
|
||||||
|
byte[] splitPoint1 = { 'a', 'a', 'a', 'd' };
|
||||||
|
testGetSplitKey(start1, end1, splitPoint1, true);
|
||||||
|
|
||||||
|
// Test Case 2: "111000" and "1125790", split point is "111b".
|
||||||
|
byte[] start2 = { '1', '1', '1', '0', '0', '0' };
|
||||||
|
byte[] end2 = { '1', '1', '2', '5', '7', '9', '0' };
|
||||||
|
byte[] splitPoint2 = { '1', '1', '1', 'b' };
|
||||||
|
testGetSplitKey(start2, end2, splitPoint2, true);
|
||||||
|
|
||||||
|
// Test Case 3: "aaaaaa" and "aab", split point is "aaap".
|
||||||
|
byte[] start3 = { 'a', 'a', 'a', 'a', 'a', 'a' };
|
||||||
|
byte[] end3 = { 'a', 'a', 'b' };
|
||||||
|
byte[] splitPoint3 = { 'a', 'a', 'a', 'p' };
|
||||||
|
testGetSplitKey(start3, end3, splitPoint3, true);
|
||||||
|
|
||||||
|
// Test Case 4: "aaa" and "aaaz", split point is "aaaM".
|
||||||
|
byte[] start4 = { 'a', 'a', 'a' };
|
||||||
|
byte[] end4 = { 'a', 'a', 'a', 'z' };
|
||||||
|
byte[] splitPoint4 = { 'a', 'a', 'a', 'M' };
|
||||||
|
testGetSplitKey(start4, end4, splitPoint4, true);
|
||||||
|
|
||||||
|
// Test Case 5: "aaa" and "aaba", split point is "aaap".
|
||||||
|
byte[] start5 = { 'a', 'a', 'a' };
|
||||||
|
byte[] end5 = { 'a', 'a', 'b', 'a' };
|
||||||
|
byte[] splitPoint5 = { 'a', 'a', 'a', 'p' };
|
||||||
|
testGetSplitKey(start5, end5, splitPoint5, true);
|
||||||
|
|
||||||
|
// Test Case 6: empty key and "hhhqqqwww", split point is "h"
|
||||||
|
byte[] start6 = {};
|
||||||
|
byte[] end6 = { 'h', 'h', 'h', 'q', 'q', 'q', 'w', 'w' };
|
||||||
|
byte[] splitPoint6 = { 'h' };
|
||||||
|
testGetSplitKey(start6, end6, splitPoint6, true);
|
||||||
|
|
||||||
|
// Test Case 7: "ffffaaa" and empty key, split point depends on the mode we choose(text key or
|
||||||
|
// binary key).
|
||||||
|
byte[] start7 = { 'f', 'f', 'f', 'f', 'a', 'a', 'a' };
|
||||||
|
byte[] end7 = {};
|
||||||
|
byte[] splitPointText7 = { 'f', '~', '~', '~', '~', '~', '~' };
|
||||||
|
byte[] splitPointBinary7 = { 'f', 127, 127, 127, 127, 127, 127 };
|
||||||
|
testGetSplitKey(start7, end7, splitPointText7, true);
|
||||||
|
testGetSplitKey(start7, end7, splitPointBinary7, false);
|
||||||
|
|
||||||
|
// Test Case 8: both start key and end key are empty. Split point depends on the mode we
|
||||||
|
// choose (text key or binary key).
|
||||||
|
byte[] start8 = {};
|
||||||
|
byte[] end8 = {};
|
||||||
|
byte[] splitPointText8 = { 'O' };
|
||||||
|
byte[] splitPointBinary8 = { 0 };
|
||||||
|
testGetSplitKey(start8, end8, splitPointText8, true);
|
||||||
|
testGetSplitKey(start8, end8, splitPointBinary8, false);
|
||||||
|
|
||||||
|
// Test Case 9: Binary Key example
|
||||||
|
byte[] start9 = { 13, -19, 126, 127 };
|
||||||
|
byte[] end9 = { 13, -19, 127, 0 };
|
||||||
|
byte[] splitPoint9 = { 13, -19, 127, -64 };
|
||||||
|
testGetSplitKey(start9, end9, splitPoint9, false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
|
|
||||||
|
@ -30,6 +32,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
@ -37,12 +40,15 @@ import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.io.NullWritable;
|
import org.apache.hadoop.io.NullWritable;
|
||||||
|
import org.apache.hadoop.mapreduce.InputSplit;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.Reducer;
|
import org.apache.hadoop.mapreduce.Reducer;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* Tests various scan start and stop row scenarios. This is set in a scan and
|
* Tests various scan start and stop row scenarios. This is set in a scan and
|
||||||
|
@ -239,5 +245,42 @@ public abstract class TestTableInputFormatScanBase {
|
||||||
LOG.info("After map/reduce completion - job " + jobName);
|
LOG.info("After map/reduce completion - job " + jobName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests a MR scan using data skew auto-balance
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws ClassNotFoundException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
|
||||||
|
InterruptedException,
|
||||||
|
ClassNotFoundException {
|
||||||
|
String jobName = "TestJobForNumOfSplits";
|
||||||
|
LOG.info("Before map/reduce startup - job " + jobName);
|
||||||
|
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.addFamily(INPUT_FAMILY);
|
||||||
|
c.set("hbase.mapreduce.input.autobalance", "true");
|
||||||
|
c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
|
||||||
|
c.set(KEY_STARTROW, "");
|
||||||
|
c.set(KEY_LASTROW, "");
|
||||||
|
Job job = new Job(c, jobName);
|
||||||
|
TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
|
||||||
|
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
|
||||||
|
TableInputFormat tif = new TableInputFormat();
|
||||||
|
tif.setConf(job.getConfiguration());
|
||||||
|
Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
|
||||||
|
List<InputSplit> splits = tif.getSplits(job);
|
||||||
|
Assert.assertEquals(expectedNumOfSplits, splits.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for the getSplitKey() method in TableInputFormatBase.java
|
||||||
|
*/
|
||||||
|
public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
|
||||||
|
byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
|
||||||
|
Assert.assertArrayEquals(splitKey, result);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue