HBASE-16894 Create more than 1 split per region, generalize HBASE-12590
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
56830c3094
commit
16d483f900
|
@ -24,13 +24,12 @@ import java.net.InetAddress;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
|
@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.client.RegionLocator;
|
|||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.util.Addressing;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -93,20 +93,21 @@ import org.apache.hadoop.util.StringUtils;
|
|||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
*
|
||||
* The number of InputSplits(mappers) match the number of regions in a table by default.
|
||||
* Set "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers per region, set
|
||||
* this property will disable autobalance below.\
|
||||
* Set "hbase.mapreduce.tif.input.autobalance" to enable autobalance, hbase will assign mappers
|
||||
* based on average region size; For regions, whose size larger than average region size may assigned
|
||||
* more mappers, and for smaller one, they may group together to use one mapper. If actual average
|
||||
* region size is too big, like 50G, it is not good to only assign 1 mapper for those large regions.
|
||||
* Use "hbase.mapreduce.tif.ave.regionsize" to set max average region size when enable "autobalanece",
|
||||
* default mas average region size is 8G.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
public abstract class TableInputFormatBase
|
||||
extends InputFormat<ImmutableBytesWritable, Result> {
|
||||
|
||||
/** Specify if we enable auto-balance for input in M/R jobs.*/
|
||||
public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
|
||||
/** Specify if ratio for data skew in M/R jobs, it goes well with the enabling hbase.mapreduce
|
||||
* .input.autobalance property.*/
|
||||
public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
|
||||
".maxskewratio";
|
||||
/** Specify if the row key in table is text (ASCII between 32~126),
|
||||
* default is true. False means the table is using binary row key*/
|
||||
public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";
|
||||
extends InputFormat<ImmutableBytesWritable, Result> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
|
||||
|
||||
|
@ -114,8 +115,17 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
|||
"initialized. Ensure you call initializeTable either in your constructor or initialize " +
|
||||
"method";
|
||||
private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
|
||||
" previous error. Please look at the previous logs lines from" +
|
||||
" the task's full log for more details.";
|
||||
" previous error. Please look at the previous logs lines from" +
|
||||
" the task's full log for more details.";
|
||||
|
||||
/** Specify if we enable auto-balance to set number of mappers in M/R jobs. */
|
||||
public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance";
|
||||
/** In auto-balance, we split input by ave region size, if calculated region size is too big, we can set it. */
|
||||
public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize";
|
||||
|
||||
/** Set the number of Mappers for each region, all regions have same number of Mappers */
|
||||
public static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.tableinput.mappers.per.region";
|
||||
|
||||
|
||||
/** Holds the details for the internal scanner.
|
||||
*
|
||||
|
@ -134,7 +144,8 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
|||
|
||||
|
||||
/** The reverse DNS lookup cache mapping: IPAddress => HostName */
|
||||
private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>();
|
||||
private HashMap<InetAddress, String> reverseDNSCacheMap =
|
||||
new HashMap<>();
|
||||
|
||||
/**
|
||||
* Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
|
||||
|
@ -151,7 +162,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
|||
@Override
|
||||
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
|
||||
InputSplit split, TaskAttemptContext context)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
// Just in case a subclass is relying on JobConfigurable magic.
|
||||
if (table == null) {
|
||||
initialize(context);
|
||||
|
@ -215,9 +226,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Calculates the splits that will serve as input for the map tasks. The
|
||||
* number of splits matches the number of regions in a table.
|
||||
*
|
||||
* Calculates the splits that will serve as input for the map tasks.
|
||||
* @param context The current job context.
|
||||
* @return The list of input splits.
|
||||
* @throws IOException When creating the list of splits fails.
|
||||
|
@ -245,89 +254,27 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
|||
}
|
||||
|
||||
try {
|
||||
RegionSizeCalculator sizeCalculator =
|
||||
new RegionSizeCalculator(getRegionLocator(), getAdmin());
|
||||
List<InputSplit> splits = oneInputSplitPerRegion();
|
||||
|
||||
TableName tableName = getTable().getName();
|
||||
|
||||
Pair<byte[][], byte[][]> keys = getStartEndKeys();
|
||||
if (keys == null || keys.getFirst() == null ||
|
||||
keys.getFirst().length == 0) {
|
||||
HRegionLocation regLoc =
|
||||
getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
|
||||
if (null == regLoc) {
|
||||
throw new IOException("Expecting at least one region.");
|
||||
// set same number of mappers for each region
|
||||
if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) {
|
||||
int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1);
|
||||
List<InputSplit> res = new ArrayList<>();
|
||||
for (int i = 0; i < splits.size(); i++) {
|
||||
List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion);
|
||||
res.addAll(tmp);
|
||||
}
|
||||
List<InputSplit> splits = new ArrayList<>(1);
|
||||
long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
|
||||
TableSplit split = new TableSplit(tableName, scan,
|
||||
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
|
||||
.getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
|
||||
splits.add(split);
|
||||
return splits;
|
||||
return res;
|
||||
}
|
||||
List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
|
||||
for (int i = 0; i < keys.getFirst().length; i++) {
|
||||
if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
byte[] startRow = scan.getStartRow();
|
||||
byte[] stopRow = scan.getStopRow();
|
||||
// determine if the given start an stop key fall into the region
|
||||
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
|
||||
Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
|
||||
(stopRow.length == 0 ||
|
||||
Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
|
||||
byte[] splitStart = startRow.length == 0 ||
|
||||
Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
|
||||
keys.getFirst()[i] : startRow;
|
||||
byte[] splitStop = (stopRow.length == 0 ||
|
||||
Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
|
||||
keys.getSecond()[i].length > 0 ?
|
||||
keys.getSecond()[i] : stopRow;
|
||||
|
||||
HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
|
||||
// The below InetSocketAddress creation does a name resolution.
|
||||
InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
|
||||
if (isa.isUnresolved()) {
|
||||
LOG.warn("Failed resolve " + isa);
|
||||
}
|
||||
InetAddress regionAddress = isa.getAddress();
|
||||
String regionLocation;
|
||||
regionLocation = reverseDNS(regionAddress);
|
||||
|
||||
byte[] regionName = location.getRegionInfo().getRegionName();
|
||||
String encodedRegionName = location.getRegionInfo().getEncodedName();
|
||||
long regionSize = sizeCalculator.getRegionSize(regionName);
|
||||
TableSplit split = new TableSplit(tableName, scan,
|
||||
splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
|
||||
splits.add(split);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("getSplits: split -> " + i + " -> " + split);
|
||||
}
|
||||
}
|
||||
}
|
||||
//The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
|
||||
boolean enableAutoBalance = context.getConfiguration()
|
||||
.getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false);
|
||||
if (enableAutoBalance) {
|
||||
long totalRegionSize=0;
|
||||
for (int i = 0; i < splits.size(); i++){
|
||||
TableSplit ts = (TableSplit)splits.get(i);
|
||||
totalRegionSize += ts.getLength();
|
||||
}
|
||||
long averageRegionSize = totalRegionSize / splits.size();
|
||||
// the averageRegionSize must be positive.
|
||||
if (averageRegionSize <= 0) {
|
||||
LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " +
|
||||
"set it to 1.");
|
||||
averageRegionSize = 1;
|
||||
}
|
||||
return calculateRebalancedSplits(splits, context, averageRegionSize);
|
||||
} else {
|
||||
return splits;
|
||||
//The default value of "hbase.mapreduce.input.autobalance" is false.
|
||||
if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false) != false) {
|
||||
long maxAveRegionSize = context.getConfiguration().getInt(MAX_AVERAGE_REGION_SIZE, 8*1073741824);
|
||||
return calculateAutoBalancedSplits(splits, maxAveRegionSize);
|
||||
}
|
||||
|
||||
// return one mapper per region
|
||||
return splits;
|
||||
} finally {
|
||||
if (closeOnFinish) {
|
||||
closeTable();
|
||||
|
@ -335,6 +282,218 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create one InputSplit per region
|
||||
*
|
||||
* @return The list of InputSplit for all the regions
|
||||
* @throws IOException
|
||||
*/
|
||||
private List<InputSplit> oneInputSplitPerRegion() throws IOException {
|
||||
RegionSizeCalculator sizeCalculator =
|
||||
new RegionSizeCalculator(getRegionLocator(), getAdmin());
|
||||
|
||||
TableName tableName = getTable().getName();
|
||||
|
||||
Pair<byte[][], byte[][]> keys = getStartEndKeys();
|
||||
if (keys == null || keys.getFirst() == null ||
|
||||
keys.getFirst().length == 0) {
|
||||
HRegionLocation regLoc =
|
||||
getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
|
||||
if (null == regLoc) {
|
||||
throw new IOException("Expecting at least one region.");
|
||||
}
|
||||
List<InputSplit> splits = new ArrayList<>(1);
|
||||
long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
|
||||
TableSplit split = new TableSplit(tableName, scan,
|
||||
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
|
||||
.getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
|
||||
splits.add(split);
|
||||
return splits;
|
||||
}
|
||||
List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
|
||||
for (int i = 0; i < keys.getFirst().length; i++) {
|
||||
if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
byte[] startRow = scan.getStartRow();
|
||||
byte[] stopRow = scan.getStopRow();
|
||||
// determine if the given start an stop key fall into the region
|
||||
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
|
||||
Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
|
||||
(stopRow.length == 0 ||
|
||||
Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
|
||||
byte[] splitStart = startRow.length == 0 ||
|
||||
Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
|
||||
keys.getFirst()[i] : startRow;
|
||||
byte[] splitStop = (stopRow.length == 0 ||
|
||||
Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
|
||||
keys.getSecond()[i].length > 0 ?
|
||||
keys.getSecond()[i] : stopRow;
|
||||
|
||||
HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
|
||||
// The below InetSocketAddress creation does a name resolution.
|
||||
InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
|
||||
if (isa.isUnresolved()) {
|
||||
LOG.warn("Failed resolve " + isa);
|
||||
}
|
||||
InetAddress regionAddress = isa.getAddress();
|
||||
String regionLocation;
|
||||
regionLocation = reverseDNS(regionAddress);
|
||||
|
||||
byte[] regionName = location.getRegionInfo().getRegionName();
|
||||
String encodedRegionName = location.getRegionInfo().getEncodedName();
|
||||
long regionSize = sizeCalculator.getRegionSize(regionName);
|
||||
TableSplit split = new TableSplit(tableName, scan,
|
||||
splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
|
||||
splits.add(split);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("getSplits: split -> " + i + " -> " + split);
|
||||
}
|
||||
}
|
||||
}
|
||||
return splits;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create n splits for one InputSplit, For now only support uniform distribution
|
||||
* @param split A TableSplit corresponding to a range of rowkeys
|
||||
* @param n Number of ranges after splitting. Pass 1 means no split for the range
|
||||
* Pass 2 if you want to split the range in two;
|
||||
* @return A list of TableSplit, the size of the list is n
|
||||
* @throws IllegalArgumentIOException
|
||||
*/
|
||||
protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
|
||||
throws IllegalArgumentIOException {
|
||||
if (split == null || !(split instanceof TableSplit)) {
|
||||
throw new IllegalArgumentIOException(
|
||||
"InputSplit for CreateNSplitsPerRegion can not be null + "
|
||||
+ "and should be instance of TableSplit");
|
||||
}
|
||||
//if n < 1, then still continue using n = 1
|
||||
n = n < 1 ? 1 : n;
|
||||
List<InputSplit> res = new ArrayList<>(n);
|
||||
if (n == 1) {
|
||||
res.add(split);
|
||||
return res;
|
||||
}
|
||||
|
||||
// Collect Region related information
|
||||
TableSplit ts = (TableSplit) split;
|
||||
TableName tableName = ts.getTable();
|
||||
String regionLocation = ts.getRegionLocation();
|
||||
String encodedRegionName = ts.getEncodedRegionName();
|
||||
long regionSize = ts.getLength();
|
||||
byte[] startRow = ts.getStartRow();
|
||||
byte[] endRow = ts.getEndRow();
|
||||
|
||||
// For special case: startRow or endRow is empty
|
||||
if (startRow.length == 0 && endRow.length == 0){
|
||||
startRow = new byte[1];
|
||||
endRow = new byte[1];
|
||||
startRow[0] = 0;
|
||||
endRow[0] = -1;
|
||||
}
|
||||
if (startRow.length == 0 && endRow.length != 0){
|
||||
startRow = new byte[1];
|
||||
startRow[0] = 0;
|
||||
}
|
||||
if (startRow.length != 0 && endRow.length == 0){
|
||||
endRow =new byte[startRow.length];
|
||||
for (int k = 0; k < startRow.length; k++){
|
||||
endRow[k] = -1;
|
||||
}
|
||||
}
|
||||
|
||||
// Split Region into n chunks evenly
|
||||
byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1);
|
||||
for (int i = 0; i < splitKeys.length - 1; i++) {
|
||||
//notice that the regionSize parameter may be not very accurate
|
||||
TableSplit tsplit =
|
||||
new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation,
|
||||
encodedRegionName, regionSize / n);
|
||||
res.add(tsplit);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
/**
|
||||
* Calculates the number of MapReduce input splits for the map tasks. The number of
|
||||
* MapReduce input splits depends on the average region size.
|
||||
* Make it 'public' for testing
|
||||
*
|
||||
* @param splits The list of input splits before balance.
|
||||
* @param maxAverageRegionSize max Average region size for one mapper
|
||||
* @return The list of input splits.
|
||||
* @throws IOException When creating the list of splits fails.
|
||||
* @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
|
||||
*org.apache.hadoop.mapreduce.JobContext)
|
||||
*/
|
||||
public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, long maxAverageRegionSize)
|
||||
throws IOException {
|
||||
if (splits.size() == 0) {
|
||||
return splits;
|
||||
}
|
||||
List<InputSplit> resultList = new ArrayList<>();
|
||||
long totalRegionSize = 0;
|
||||
for (int i = 0; i < splits.size(); i++) {
|
||||
TableSplit ts = (TableSplit) splits.get(i);
|
||||
totalRegionSize += ts.getLength();
|
||||
}
|
||||
long averageRegionSize = totalRegionSize / splits.size();
|
||||
// totalRegionSize might be overflow, and the averageRegionSize must be positive.
|
||||
if (averageRegionSize <= 0) {
|
||||
LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
|
||||
"set it to Long.MAX_VALUE " + splits.size());
|
||||
averageRegionSize = Long.MAX_VALUE / splits.size();
|
||||
}
|
||||
//if averageRegionSize is too big, change it to default as 1 GB,
|
||||
if (averageRegionSize > maxAverageRegionSize) {
|
||||
averageRegionSize = maxAverageRegionSize;
|
||||
}
|
||||
// if averageRegionSize is too small, we do not need to allocate more mappers for those 'large' region
|
||||
// set default as 16M = (default hdfs block size) / 4;
|
||||
if (averageRegionSize < 16 * 1048576) {
|
||||
return splits;
|
||||
}
|
||||
for (int i = 0; i < splits.size(); i++) {
|
||||
TableSplit ts = (TableSplit) splits.get(i);
|
||||
TableName tableName = ts.getTable();
|
||||
String regionLocation = ts.getRegionLocation();
|
||||
String encodedRegionName = ts.getEncodedRegionName();
|
||||
long regionSize = ts.getLength();
|
||||
|
||||
if (regionSize >= averageRegionSize) {
|
||||
// make this region as multiple MapReduce input split.
|
||||
int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
|
||||
List<InputSplit> temp = createNInputSplitsUniform(ts, n);
|
||||
resultList.addAll(temp);
|
||||
} else {
|
||||
// if the total size of several small continuous regions less than the average region size,
|
||||
// combine them into one MapReduce input split.
|
||||
long totalSize = regionSize;
|
||||
byte[] splitStartKey = ts.getStartRow();
|
||||
byte[] splitEndKey = ts.getEndRow();
|
||||
int j = i + 1;
|
||||
while (j < splits.size()) {
|
||||
TableSplit nextRegion = (TableSplit) splits.get(j);
|
||||
long nextRegionSize = nextRegion.getLength();
|
||||
if (totalSize + nextRegionSize <= averageRegionSize) {
|
||||
totalSize = totalSize + nextRegionSize;
|
||||
splitEndKey = nextRegion.getEndRow();
|
||||
j++;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
i = j - 1;
|
||||
TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation,
|
||||
encodedRegionName, totalSize);
|
||||
resultList.add(t);
|
||||
}
|
||||
}
|
||||
return resultList;
|
||||
}
|
||||
|
||||
String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
|
||||
String hostName = this.reverseDNSCacheMap.get(ipAddress);
|
||||
if (hostName == null) {
|
||||
|
@ -354,162 +513,6 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
|||
return hostName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the number of MapReduce input splits for the map tasks. The number of
|
||||
* MapReduce input splits depends on the average region size and the "data skew ratio" user set in
|
||||
* configuration.
|
||||
*
|
||||
* @param list The list of input splits before balance.
|
||||
* @param context The current job context.
|
||||
* @param average The average size of all regions .
|
||||
* @return The list of input splits.
|
||||
* @throws IOException When creating the list of splits fails.
|
||||
* @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
|
||||
* org.apache.hadoop.mapreduce.JobContext)
|
||||
*/
|
||||
private List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
|
||||
long average) throws IOException {
|
||||
List<InputSplit> resultList = new ArrayList<>();
|
||||
Configuration conf = context.getConfiguration();
|
||||
//The default data skew ratio is 3
|
||||
long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
|
||||
//It determines which mode to use: text key mode or binary key mode. The default is text mode.
|
||||
boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
|
||||
long dataSkewThreshold = dataSkewRatio * average;
|
||||
int count = 0;
|
||||
while (count < list.size()) {
|
||||
TableSplit ts = (TableSplit)list.get(count);
|
||||
TableName tableName = ts.getTable();
|
||||
String regionLocation = ts.getRegionLocation();
|
||||
String encodedRegionName = ts.getEncodedRegionName();
|
||||
long regionSize = ts.getLength();
|
||||
if (regionSize >= dataSkewThreshold) {
|
||||
// if the current region size is large than the data skew threshold,
|
||||
// split the region into two MapReduce input splits.
|
||||
byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
|
||||
if (Arrays.equals(ts.getEndRow(), splitKey)) {
|
||||
// Not splitting since the end key is the same as the split key
|
||||
resultList.add(ts);
|
||||
} else {
|
||||
//Set the size of child TableSplit as 1/2 of the region size. The exact size of the
|
||||
// MapReduce input splits is not far off.
|
||||
TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey,
|
||||
regionLocation, regionSize / 2);
|
||||
TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation,
|
||||
regionSize - regionSize / 2);
|
||||
resultList.add(t1);
|
||||
resultList.add(t2);
|
||||
}
|
||||
count++;
|
||||
} else if (regionSize >= average) {
|
||||
// if the region size between average size and data skew threshold size,
|
||||
// make this region as one MapReduce input split.
|
||||
resultList.add(ts);
|
||||
count++;
|
||||
} else {
|
||||
// if the total size of several small continuous regions less than the average region size,
|
||||
// combine them into one MapReduce input split.
|
||||
long totalSize = regionSize;
|
||||
byte[] splitStartKey = ts.getStartRow();
|
||||
byte[] splitEndKey = ts.getEndRow();
|
||||
count++;
|
||||
for (; count < list.size(); count++) {
|
||||
TableSplit nextRegion = (TableSplit)list.get(count);
|
||||
long nextRegionSize = nextRegion.getLength();
|
||||
if (totalSize + nextRegionSize <= dataSkewThreshold) {
|
||||
totalSize = totalSize + nextRegionSize;
|
||||
splitEndKey = nextRegion.getEndRow();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey,
|
||||
regionLocation, encodedRegionName, totalSize);
|
||||
resultList.add(t);
|
||||
}
|
||||
}
|
||||
return resultList;
|
||||
}
|
||||
|
||||
/**
|
||||
* select a split point in the region. The selection of the split point is based on an uniform
|
||||
* distribution assumption for the keys in a region.
|
||||
* Here are some examples:
|
||||
*
|
||||
* <table>
|
||||
* <tr>
|
||||
* <th>start key</th>
|
||||
* <th>end key</th>
|
||||
* <th>is text</th>
|
||||
* <th>split point</th>
|
||||
* </tr>
|
||||
* <tr>
|
||||
* <td>'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f', 'g'</td>
|
||||
* <td>'a', 'a', 'a', 'f', 'f', 'f'</td>
|
||||
* <td>true</td>
|
||||
* <td>'a', 'a', 'a', 'd', 'd', -78, 50, -77, 51</td>
|
||||
* </tr>
|
||||
* <tr>
|
||||
* <td>'1', '1', '1', '0', '0', '0'</td>
|
||||
* <td>'1', '1', '2', '5', '7', '9', '0'</td>
|
||||
* <td>true</td>
|
||||
* <td>'1', '1', '1', -78, -77, -76, -104</td>
|
||||
* </tr>
|
||||
* <tr>
|
||||
* <td>'1', '1', '1', '0'</td>
|
||||
* <td>'1', '1', '2', '0'</td>
|
||||
* <td>true</td>
|
||||
* <td>'1', '1', '1', -80</td>
|
||||
* </tr>
|
||||
* <tr>
|
||||
* <td>13, -19, 126, 127</td>
|
||||
* <td>13, -19, 127, 0</td>
|
||||
* <td>false</td>
|
||||
* <td>13, -19, 126, -65</td>
|
||||
* </tr>
|
||||
* </table>
|
||||
*
|
||||
* Set this function as "public static", make it easier for test.
|
||||
*
|
||||
* @param start Start key of the region
|
||||
* @param end End key of the region
|
||||
* @param isText It determines to use text key mode or binary key mode
|
||||
* @return The split point in the region.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
|
||||
byte upperLimitByte;
|
||||
byte lowerLimitByte;
|
||||
//Use text mode or binary mode.
|
||||
if (isText) {
|
||||
//The range of text char set in ASCII is [32,126], the lower limit is space and the upper
|
||||
// limit is '~'.
|
||||
upperLimitByte = '~';
|
||||
lowerLimitByte = ' ';
|
||||
} else {
|
||||
upperLimitByte = -1;
|
||||
lowerLimitByte = 0;
|
||||
}
|
||||
// For special case
|
||||
// Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h"
|
||||
// Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~"
|
||||
if (start.length == 0 && end.length == 0){
|
||||
return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)};
|
||||
}
|
||||
if (start.length == 0 && end.length != 0){
|
||||
return new byte[]{ end[0] };
|
||||
}
|
||||
if (start.length != 0 && end.length == 0){
|
||||
byte[] result =new byte[start.length];
|
||||
result[0]=start[0];
|
||||
for (int k = 1; k < start.length; k++){
|
||||
result[k] = upperLimitByte;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return Bytes.split(start, end, false, 1)[1];
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if the given region is to be included in the InputSplit while splitting
|
||||
* the regions of a table.
|
||||
|
@ -649,4 +652,4 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
|||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -98,103 +98,38 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
|
|||
}
|
||||
|
||||
/**
|
||||
* Tests a MR scan using specific number of mappers. The test table has 25 regions,
|
||||
* and all region sizes are set as 0 as default. The average region size is 1 (the smallest
|
||||
* positive). When we set hbase.mapreduce.input.ratio as -1, all regions will be cut into two
|
||||
* MapRedcue input splits, the number of MR input splits should be 50; when we set hbase
|
||||
* .mapreduce.input.ratio as 100, the sum of all region sizes is less then the average region
|
||||
* size, all regions will be combined into 1 MapRedcue input split.
|
||||
* Tests a MR scan using specific number of mappers. The test table has 26 regions,
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws ClassNotFoundException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test
|
||||
public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException {
|
||||
testNumOfSplits("-1", 52);
|
||||
testNumOfSplits("100", 1);
|
||||
public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException {
|
||||
testNumOfSplits(1, 26);
|
||||
testNumOfSplits(3, 78);
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs a MR to test TIF using specific number of mappers. The test table has 26 regions,
|
||||
* @throws InterruptedException
|
||||
* @throws IOException
|
||||
* @throws ClassNotFoundException
|
||||
*/
|
||||
@Test
|
||||
public void testSpecifiedNumOfMappersMR()
|
||||
throws InterruptedException, IOException, ClassNotFoundException {
|
||||
testNumOfSplitsMR(2, 52);
|
||||
testNumOfSplitsMR(4, 104);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the getSplitKey() method in TableInputFormatBase.java
|
||||
*
|
||||
* Test if autoBalance create correct splits
|
||||
* @throws IOException
|
||||
* @throws ClassNotFoundException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test
|
||||
public void testGetSplitsPoint() throws IOException, InterruptedException,
|
||||
ClassNotFoundException {
|
||||
byte[] start1 = { 'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f' };
|
||||
byte[] end1 = { 'a', 'a', 'a', 'f', 'f' };
|
||||
byte[] splitPoint1 = { 'a', 'a', 'a', 'd', 'd', -78, 50, -77 };
|
||||
testGetSplitKey(start1, end1, splitPoint1, true);
|
||||
|
||||
byte[] start2 = { '1', '1', '1', '0', '0', '0' };
|
||||
byte[] end2 = { '1', '1', '2', '5', '7', '9', '0' };
|
||||
byte[] splitPoint2 = { '1', '1', '1', -78, -77, -76, -104 };
|
||||
testGetSplitKey(start2, end2, splitPoint2, true);
|
||||
|
||||
byte[] start3 = { 'a', 'a', 'a', 'a', 'a', 'a' };
|
||||
byte[] end3 = { 'a', 'a', 'b' };
|
||||
byte[] splitPoint3 = { 'a', 'a', 'a', -80, -80, -80 };
|
||||
testGetSplitKey(start3, end3, splitPoint3, true);
|
||||
|
||||
byte[] start4 = { 'a', 'a', 'a' };
|
||||
byte[] end4 = { 'a', 'a', 'a', 'z' };
|
||||
byte[] splitPoint4 = { 'a', 'a', 'a', '=' };
|
||||
testGetSplitKey(start4, end4, splitPoint4, true);
|
||||
|
||||
byte[] start5 = { 'a', 'a', 'a' };
|
||||
byte[] end5 = { 'a', 'a', 'b', 'a' };
|
||||
byte[] splitPoint5 = { 'a', 'a', 'a', -80 };
|
||||
testGetSplitKey(start5, end5, splitPoint5, true);
|
||||
|
||||
// Test Case 6: empty key and "hhhqqqwww", split point is "h"
|
||||
byte[] start6 = {};
|
||||
byte[] end6 = { 'h', 'h', 'h', 'q', 'q', 'q', 'w', 'w' };
|
||||
byte[] splitPointText6 = { 'h' };
|
||||
byte[] splitPointBinary6 = { 104 };
|
||||
testGetSplitKey(start6, end6, splitPointText6, true);
|
||||
testGetSplitKey(start6, end6, splitPointBinary6, false);
|
||||
|
||||
// Test Case 7: "ffffaaa" and empty key, split point depends on the mode we choose(text key or
|
||||
// binary key).
|
||||
byte[] start7 = { 'f', 'f', 'f', 'f', 'a', 'a', 'a' };
|
||||
byte[] end7 = {};
|
||||
byte[] splitPointText7 = { 'f', '~', '~', '~', '~', '~', '~' };
|
||||
byte[] splitPointBinary7 = { 'f', -1, -1, -1, -1, -1, -1 };
|
||||
testGetSplitKey(start7, end7, splitPointText7, true);
|
||||
testGetSplitKey(start7, end7, splitPointBinary7, false);
|
||||
|
||||
// Test Case 8: both start key and end key are empty. Split point depends on the mode we
|
||||
// choose (text key or binary key).
|
||||
byte[] start8 = {};
|
||||
byte[] end8 = {};
|
||||
byte[] splitPointText8 = { 'O' };
|
||||
byte[] splitPointBinary8 = { 0 };
|
||||
testGetSplitKey(start8, end8, splitPointText8, true);
|
||||
testGetSplitKey(start8, end8, splitPointBinary8, false);
|
||||
|
||||
// Test Case 9: Binary Key example
|
||||
byte[] start9 = { 13, -19, 126, 127 };
|
||||
byte[] end9 = { 13, -19, 127, 0 };
|
||||
byte[] splitPoint9 = { 13, -19, 126, -65 };
|
||||
testGetSplitKey(start9, end9, splitPoint9, false);
|
||||
|
||||
// Test Case 10: Binary key split when the start key is an unsigned byte and the end byte is a
|
||||
// signed byte
|
||||
byte[] start10 = { 'x' };
|
||||
byte[] end10 = { -128 };
|
||||
byte[] splitPoint10 = { '|' };
|
||||
testGetSplitKey(start10, end10, splitPoint10, false);
|
||||
|
||||
// Test Case 11: Binary key split when the start key is an signed byte and the end byte is a
|
||||
// signed byte
|
||||
byte[] start11 = { -100 };
|
||||
byte[] end11 = { -90 };
|
||||
byte[] splitPoint11 = { -95 };
|
||||
testGetSplitKey(start11, end11, splitPoint11, false);
|
||||
public void testAutoBalanceSplits() throws IOException {
|
||||
testAutobalanceNumOfSplit();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,13 +19,11 @@
|
|||
package org.apache.hadoop.hbase.mapreduce;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -39,10 +37,13 @@ import org.apache.hadoop.hbase.client.Table;
|
|||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.InputSplit;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.Reducer;
|
||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -134,7 +135,7 @@ public abstract class TestTableInputFormatScanBase {
|
|||
*/
|
||||
public static class ScanReducer
|
||||
extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
|
||||
NullWritable, NullWritable> {
|
||||
NullWritable, NullWritable> {
|
||||
|
||||
private String first = null;
|
||||
private String last = null;
|
||||
|
@ -247,28 +248,28 @@ public abstract class TestTableInputFormatScanBase {
|
|||
|
||||
|
||||
/**
|
||||
* Tests a MR scan using data skew auto-balance
|
||||
* Tests Number of inputSplits for MR job when specify number of mappers for TableInputFormatXXX
|
||||
* This test does not run MR job
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws ClassNotFoundException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
|
||||
InterruptedException,
|
||||
ClassNotFoundException {
|
||||
public void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) throws IOException,
|
||||
InterruptedException,
|
||||
ClassNotFoundException {
|
||||
String jobName = "TestJobForNumOfSplits";
|
||||
LOG.info("Before map/reduce startup - job " + jobName);
|
||||
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||
Scan scan = new Scan();
|
||||
scan.addFamily(INPUT_FAMILYS[0]);
|
||||
scan.addFamily(INPUT_FAMILYS[1]);
|
||||
c.set("hbase.mapreduce.input.autobalance", "true");
|
||||
c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
|
||||
c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
|
||||
c.set(KEY_STARTROW, "");
|
||||
c.set(KEY_LASTROW, "");
|
||||
Job job = new Job(c, jobName);
|
||||
TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
|
||||
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
|
||||
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
|
||||
TableInputFormat tif = new TableInputFormat();
|
||||
tif.setConf(job.getConfiguration());
|
||||
Assert.assertEquals(TABLE_NAME, table.getName());
|
||||
|
@ -277,11 +278,61 @@ public abstract class TestTableInputFormatScanBase {
|
|||
}
|
||||
|
||||
/**
|
||||
* Tests for the getSplitKey() method in TableInputFormatBase.java
|
||||
* Run MR job to check the number of mapper = expectedNumOfSplits
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
* @throws ClassNotFoundException
|
||||
*/
|
||||
public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
|
||||
byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
|
||||
Assert.assertArrayEquals(splitKey, result);
|
||||
public void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits) throws IOException,
|
||||
InterruptedException,
|
||||
ClassNotFoundException {
|
||||
String jobName = "TestJobForNumOfSplits-MR";
|
||||
LOG.info("Before map/reduce startup - job " + jobName);
|
||||
JobConf c = new JobConf(TEST_UTIL.getConfiguration());
|
||||
Scan scan = new Scan();
|
||||
scan.addFamily(INPUT_FAMILYS[0]);
|
||||
scan.addFamily(INPUT_FAMILYS[1]);
|
||||
c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
|
||||
c.set(KEY_STARTROW, "");
|
||||
c.set(KEY_LASTROW, "");
|
||||
Job job = Job.getInstance(c, jobName);
|
||||
TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
|
||||
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
|
||||
job.setReducerClass(ScanReducer.class);
|
||||
job.setNumReduceTasks(1);
|
||||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
assertTrue("job failed!", job.waitForCompletion(true));
|
||||
// for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
|
||||
// we use TaskCounter.SHUFFLED_MAPS to get total launched maps
|
||||
assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits,
|
||||
job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue());
|
||||
}
|
||||
|
||||
/**
|
||||
* Run MR job to test autobalance for setting number of mappers for TIF
|
||||
* This does not run real MR job
|
||||
*/
|
||||
public void testAutobalanceNumOfSplit() throws IOException {
|
||||
// set up splits for testing
|
||||
List<InputSplit> splits = new ArrayList<>(5);
|
||||
int[] regionLen = {10, 20, 20, 40, 60};
|
||||
for (int i = 0; i < 5; i++) {
|
||||
InputSplit split = new TableSplit(TABLE_NAME, new Scan(),
|
||||
Bytes.toBytes(i), Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576);
|
||||
splits.add(split);
|
||||
}
|
||||
TableInputFormat tif = new TableInputFormat();
|
||||
List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824);
|
||||
|
||||
assertEquals("Saw the wrong number of splits", 5, res.size());
|
||||
TableSplit ts1 = (TableSplit) res.get(0);
|
||||
assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow()));
|
||||
TableSplit ts2 = (TableSplit) res.get(1);
|
||||
assertEquals("The second split regionsize should be", 20 * 1048576, ts2.getLength());
|
||||
TableSplit ts3 = (TableSplit) res.get(2);
|
||||
assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow()));
|
||||
TableSplit ts4 = (TableSplit) res.get(4);
|
||||
assertNotEquals("The seventh split start key should not be", 4, Bytes.toInt(ts4.getStartRow()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue