HBASE-16894 Create more than 1 split per region, generalize HBASE-12590

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Author: Yi Liang, 2017-09-29 15:12:03 -07:00; committed by: Andrew Purtell
parent 1c7321f9d5
commit fc783ef045
4 changed files with 302 additions and 319 deletions

File: org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -99,23 +100,25 @@ import org.apache.hadoop.util.StringUtils;
  *     setScan(scan);
  *   }
  * }
+ *
+ * The number of InputSplits (mappers) matches the number of regions in a table by default.
+ * Set "hbase.mapreduce.input.mappers.per.region" to specify how many mappers to use per region;
+ * setting this property disables the auto-balance behavior described below.
+ *
+ * Set "hbase.mapreduce.input.autobalance" to enable auto-balance: HBase will then assign mappers
+ * based on the average region size. Regions larger than the average may be assigned more mappers,
+ * while consecutive small regions may be grouped together to share one mapper. If the calculated
+ * average region size is very large, assigning only one mapper to each large region is not ideal,
+ * so "hbase.mapreduce.input.average.regionsize" caps the average region size used when
+ * auto-balance is enabled; the default cap is 8 GB.
  * </pre>
- *
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public abstract class TableInputFormatBase
 extends InputFormat<ImmutableBytesWritable, Result> {
-  /** Specify if we enable auto-balance for input in M/R jobs.*/
-  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
-  /** Specify if ratio for data skew in M/R jobs, it goes well with the enabling hbase.mapreduce
-   * .input.autobalance property.*/
-  public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
-      ".maxskewratio";
-  /** Specify if the row key in table is text (ASCII between 32~126),
-   * default is true. False means the table is using binary row key*/
-  public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";
   private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
   private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
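
For illustration, here is a minimal driver sketch showing how a job might set these properties. The table name "scantest", the no-op mapper, and the choice of 2 mappers per region are assumptions for the example, and normally only one of the two knobs would be set, since mappers-per-region disables auto-balance:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class SplitConfigDriver {

  // A no-op mapper, just to keep the example self-contained.
  static class NoOpMapper extends TableMapper<ImmutableBytesWritable, Result> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      // no-op
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Option 1: two mappers for every region; this disables auto-balance.
    conf.setInt("hbase.mapreduce.input.mappers.per.region", 2);
    // Option 2 (use instead of option 1): balance mappers by average region
    // size, capping the average used in the calculation at 4 GB.
    // conf.setBoolean("hbase.mapreduce.input.autobalance", true);
    // conf.setLong("hbase.mapreduce.input.average.regionsize", 4L * 1024 * 1024 * 1024);

    Job job = Job.getInstance(conf, "split-config-example");
    TableMapReduceUtil.initTableMapperJob("scantest", new Scan(), NoOpMapper.class,
        ImmutableBytesWritable.class, Result.class, job);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}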
@@ -125,6 +128,14 @@ extends InputFormat<ImmutableBytesWritable, Result> {
       " previous error. Please look at the previous logs lines from" +
       " the task's full log for more details.";

+  /** Specify if we enable auto-balance to set the number of mappers in M/R jobs. */
+  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
+  /** In auto-balance, input is split by average region size; if the calculated average is too
+   * large, this property caps it. */
+  public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.input.average.regionsize";
+  /** Set the number of mappers for each region; all regions get the same number of mappers. */
+  public static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.input.mappers.per.region";
+
   /** Holds the details for the internal scanner.
    *
    * @see Scan */
@@ -140,7 +151,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   /** The underlying {@link Connection} of the table. */
   private Connection connection;

   /** The reverse DNS lookup cache mapping: IPAddress => HostName */
   private HashMap<InetAddress, String> reverseDNSCacheMap =
       new HashMap<InetAddress, String>();
@@ -252,28 +263,68 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     } catch (IllegalStateException exception) {
       throw new IOException(INITIALIZATION_ERROR, exception);
     }
     try {
-      RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin);
+      List<InputSplit> splits = oneInputSplitPerRegion();
+      // set the same number of mappers for each region
+      if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) {
+        int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1);
+        List<InputSplit> res = new ArrayList<>();
+        for (int i = 0; i < splits.size(); i++) {
+          List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion);
+          res.addAll(tmp);
+        }
+        return res;
+      }
+      // The default value of "hbase.mapreduce.input.autobalance" is false.
+      if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) {
+        long maxAveRegionSize =
+            context.getConfiguration().getLong(MAX_AVERAGE_REGION_SIZE, 8L * 1073741824);
+        return calculateAutoBalancedSplits(splits, maxAveRegionSize);
+      }
+      // return one mapper per region
+      return splits;
+    } catch (NamingException e) {
+      throw new IOException(e);
+    } finally {
+      if (closeOnFinish) {
+        closeTable();
+      }
+    }
+  }
+  /**
+   * Create one InputSplit per region.
+   *
+   * @return The list of InputSplit for all the regions
+   * @throws IOException
+   */
+  private List<InputSplit> oneInputSplitPerRegion() throws IOException, NamingException {
+    RegionSizeCalculator sizeCalculator =
+        new RegionSizeCalculator(getRegionLocator(), getAdmin());
+    TableName tableName = getTable().getName();
+
     Pair<byte[][], byte[][]> keys = getStartEndKeys();
     if (keys == null || keys.getFirst() == null ||
         keys.getFirst().length == 0) {
-      HRegionLocation regLoc = regionLocator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+      HRegionLocation regLoc =
+          getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
       if (null == regLoc) {
         throw new IOException("Expecting at least one region.");
       }
-      List<InputSplit> splits = new ArrayList<InputSplit>(1);
+      List<InputSplit> splits = new ArrayList<>(1);
       long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
-      TableSplit split = new TableSplit(table.getName(), scan,
+      TableSplit split = new TableSplit(tableName, scan,
           HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
           .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
       splits.add(split);
       return splits;
     }
-    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
+    List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
     for (int i = 0; i < keys.getFirst().length; i++) {
-      if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
         continue;
       }
@@ -283,16 +334,16 @@ extends InputFormat<ImmutableBytesWritable, Result> {
       if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
           Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
           (stopRow.length == 0 ||
           Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
         byte[] splitStart = startRow.length == 0 ||
             Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
             keys.getFirst()[i] : startRow;
         byte[] splitStop = (stopRow.length == 0 ||
             Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
             keys.getSecond()[i].length > 0 ?
             keys.getSecond()[i] : stopRow;
-        HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[i], false);
+        HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
         // The below InetSocketAddress creation does a name resolution.
         InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
         if (isa.isUnresolved()) {
@@ -300,49 +351,159 @@ extends InputFormat<ImmutableBytesWritable, Result> {
         }
         InetAddress regionAddress = isa.getAddress();
         String regionLocation;
-        try {
-          regionLocation = reverseDNS(regionAddress);
-        } catch (NamingException e) {
-          LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e);
-          regionLocation = location.getHostname();
-        }
+        regionLocation = reverseDNS(regionAddress);
         byte[] regionName = location.getRegionInfo().getRegionName();
         String encodedRegionName = location.getRegionInfo().getEncodedName();
         long regionSize = sizeCalculator.getRegionSize(regionName);
-        TableSplit split = new TableSplit(table.getName(), scan,
+        TableSplit split = new TableSplit(tableName, scan,
             splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
         splits.add(split);
         if (LOG.isDebugEnabled()) {
           LOG.debug("getSplits: split -> " + i + " -> " + split);
         }
       }
     }
-    //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
-    boolean enableAutoBalance = context.getConfiguration().getBoolean(
-        MAPREDUCE_INPUT_AUTOBALANCE, false);
-    if (enableAutoBalance) {
-      long totalRegionSize=0;
-      for (int i = 0; i < splits.size(); i++){
-        TableSplit ts = (TableSplit)splits.get(i);
-        totalRegionSize += ts.getLength();
-      }
-      long averageRegionSize = totalRegionSize / splits.size();
-      // the averageRegionSize must be positive.
-      if (averageRegionSize <= 0) {
-        LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " +
-            "set it to 1.");
-        averageRegionSize = 1;
-      }
-      return calculateRebalancedSplits(splits, context, averageRegionSize);
-    } else {
-      return splits;
-    }
-    } finally {
-      if (closeOnFinish) {
-        closeTable();
-      }
-    }
-  }
+    return splits;
+  }

+  /**
+   * Create n splits for one InputSplit. For now only a uniform distribution is supported.
+   * @param split A TableSplit corresponding to a range of rowkeys
+   * @param n Number of ranges after splitting. Pass 1 to leave the range unsplit;
+   *          pass 2 to split the range in two, and so on.
+   * @return A list of TableSplit; the size of the list is n
+   * @throws IllegalArgumentIOException
+   */
+  protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
+      throws IllegalArgumentIOException {
+    if (split == null || !(split instanceof TableSplit)) {
+      throw new IllegalArgumentIOException("InputSplit for CreateNSplitsPerRegion can not be null"
+          + " and should be instance of TableSplit");
+    }
+    // if n < 1, then still continue using n = 1
+    n = n < 1 ? 1 : n;
+    List<InputSplit> res = new ArrayList<>(n);
+    if (n == 1) {
+      res.add(split);
+      return res;
+    }
+
+    // Collect region-related information
+    TableSplit ts = (TableSplit) split;
+    TableName tableName = ts.getTable();
+    String regionLocation = ts.getRegionLocation();
+    String encodedRegionName = ts.getEncodedRegionName();
+    long regionSize = ts.getLength();
+    byte[] startRow = ts.getStartRow();
+    byte[] endRow = ts.getEndRow();
+
+    // Special cases: startRow or endRow is empty
+    if (startRow.length == 0 && endRow.length == 0) {
+      startRow = new byte[1];
+      endRow = new byte[1];
+      startRow[0] = 0;
+      endRow[0] = -1;
+    }
+    if (startRow.length == 0 && endRow.length != 0) {
+      startRow = new byte[1];
+      startRow[0] = 0;
+    }
+    if (startRow.length != 0 && endRow.length == 0) {
+      endRow = new byte[startRow.length];
+      for (int k = 0; k < startRow.length; k++) {
+        endRow[k] = -1;
+      }
+    }
+
+    // Split the region into n chunks evenly
+    byte[][] splitKeys = Bytes.split(startRow, endRow, true, n - 1);
+    for (int i = 0; i < splitKeys.length - 1; i++) {
+      // notice that the regionSize parameter may not be very accurate
+      TableSplit tsplit =
+          new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation,
+              encodedRegionName, regionSize / n);
+      res.add(tsplit);
+    }
+    return res;
+  }
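
As a sanity check on the uniform splitting above, this standalone sketch (the key range is made up for the example) shows how Bytes.split produces the n + 1 boundary keys that the loop pairs into n sub-ranges:

import org.apache.hadoop.hbase.util.Bytes;

public class UniformSplitDemo {
  public static void main(String[] args) {
    byte[] start = Bytes.toBytes("aaa");
    byte[] end = Bytes.toBytes("zzz");
    int n = 3; // number of chunks we want
    // Bytes.split returns the two endpoints plus (n - 1) intermediate keys,
    // i.e. n + 1 boundaries; consecutive pairs become the TableSplit ranges.
    byte[][] boundaries = Bytes.split(start, end, true, n - 1);
    for (int i = 0; i < boundaries.length - 1; i++) {
      System.out.println(Bytes.toStringBinary(boundaries[i]) + " .. "
          + Bytes.toStringBinary(boundaries[i + 1]));
    }
  }
}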
+  /**
+   * Calculates the number of MapReduce input splits for the map tasks. The number of
+   * MapReduce input splits depends on the average region size.
+   * Make it 'public' for testing.
+   *
+   * @param splits The list of input splits before balancing.
+   * @param maxAverageRegionSize Maximum average region size for one mapper.
+   * @return The list of input splits.
+   * @throws IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
+   */
+  public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits,
+      long maxAverageRegionSize) throws IOException {
+    if (splits.size() == 0) {
+      return splits;
+    }
+    List<InputSplit> resultList = new ArrayList<>();
+    long totalRegionSize = 0;
+    for (int i = 0; i < splits.size(); i++) {
+      TableSplit ts = (TableSplit) splits.get(i);
+      totalRegionSize += ts.getLength();
+    }
+    long averageRegionSize = totalRegionSize / splits.size();
+    // totalRegionSize might overflow, and the averageRegionSize must be positive.
+    if (averageRegionSize <= 0) {
+      LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
+          "set it to Long.MAX_VALUE " + splits.size());
+      averageRegionSize = Long.MAX_VALUE / splits.size();
+    }
+    // if averageRegionSize is too big, cap it at the configured maximum (default 8 GB)
+    if (averageRegionSize > maxAverageRegionSize) {
+      averageRegionSize = maxAverageRegionSize;
+    }
+    // if averageRegionSize is too small, there is no need to allocate more mappers for the
+    // 'large' regions; use 64 MB (the default HDFS block size) as the floor
+    if (averageRegionSize < 64 * 1048576) {
+      return splits;
+    }
+    for (int i = 0; i < splits.size(); i++) {
+      TableSplit ts = (TableSplit) splits.get(i);
+      TableName tableName = ts.getTable();
+      String regionLocation = ts.getRegionLocation();
+      String encodedRegionName = ts.getEncodedRegionName();
+      long regionSize = ts.getLength();
+
+      if (regionSize >= averageRegionSize) {
+        // make this region into multiple MapReduce input splits
+        int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0);
+        List<InputSplit> temp = createNInputSplitsUniform(ts, n);
+        resultList.addAll(temp);
+      } else {
+        // if the total size of several small continuous regions is less than the average
+        // region size, combine them into one MapReduce input split
+        long totalSize = regionSize;
+        byte[] splitStartKey = ts.getStartRow();
+        byte[] splitEndKey = ts.getEndRow();
+        int j = i + 1;
+        while (j < splits.size()) {
+          TableSplit nextRegion = (TableSplit) splits.get(j);
+          long nextRegionSize = nextRegion.getLength();
+          if (totalSize + nextRegionSize <= averageRegionSize) {
+            totalSize = totalSize + nextRegionSize;
+            splitEndKey = nextRegion.getEndRow();
+            j++;
+          } else {
+            break;
+          }
+        }
+        i = j - 1;
+        TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation,
+            encodedRegionName, totalSize);
+        resultList.add(t);
+      }
+    }
+    return resultList;
+  }
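
To make the sizing rule above concrete: with a (capped) average of 1 GB, a 4 GB region gets round(ln(4) + 1) = 2 splits and a 10 GB region gets round(ln(10) + 1) = 3, so the split count grows only logarithmically with region size. A minimal sketch of that arithmetic, with illustrative sizes:

public class AutoBalanceMath {
  // Mirrors the rule used above: n = round(ln(regionSize / averageRegionSize) + 1).
  static int splitsFor(long regionSize, long averageRegionSize) {
    return (int) Math.round(Math.log((double) regionSize / averageRegionSize) + 1.0);
  }

  public static void main(String[] args) {
    long gb = 1024L * 1024 * 1024;
    System.out.println(splitsFor(1 * gb, gb));   // 1: a region at the average stays whole
    System.out.println(splitsFor(4 * gb, gb));   // 2
    System.out.println(splitsFor(10 * gb, gb));  // 3
  }
}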
  /**
@@ -368,161 +529,6 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     return hostName;
   }
-  /**
-   * Calculates the number of MapReduce input splits for the map tasks. The number of
-   * MapReduce input splits depends on the average region size and the "data skew ratio" user set in
-   * configuration.
-   *
-   * @param list The list of input splits before balance.
-   * @param context The current job context.
-   * @param average The average size of all regions .
-   * @return The list of input splits.
-   * @throws IOException When creating the list of splits fails.
-   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
-   *   org.apache.hadoop.mapreduce.JobContext)
-   */
-  public List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
-      long average) throws IOException {
-    List<InputSplit> resultList = new ArrayList<InputSplit>();
-    Configuration conf = context.getConfiguration();
-    //The default data skew ratio is 3
-    long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
-    //It determines which mode to use: text key mode or binary key mode. The default is text mode.
-    boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
-    long dataSkewThreshold = dataSkewRatio * average;
-    int count = 0;
-    while (count < list.size()) {
-      TableSplit ts = (TableSplit)list.get(count);
-      TableName tableName = ts.getTable();
-      String regionLocation = ts.getRegionLocation();
-      String encodedRegionName = ts.getEncodedRegionName();
-      long regionSize = ts.getLength();
-      if (regionSize >= dataSkewThreshold) {
-        // if the current region size is large than the data skew threshold,
-        // split the region into two MapReduce input splits.
-        byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
-        if (Arrays.equals(ts.getEndRow(), splitKey)) {
-          // Not splitting since the end key is the same as the split key
-          resultList.add(ts);
-        } else {
-          //Set the size of child TableSplit as 1/2 of the region size. The exact size of the
-          // MapReduce input splits is not far off.
-          TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey,
-              regionLocation, regionSize / 2);
-          TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation,
-              regionSize - regionSize / 2);
-          resultList.add(t1);
-          resultList.add(t2);
-        }
-        count++;
-      } else if (regionSize >= average) {
-        // if the region size between average size and data skew threshold size,
-        // make this region as one MapReduce input split.
-        resultList.add(ts);
-        count++;
-      } else {
-        // if the total size of several small continuous regions less than the average region size,
-        // combine them into one MapReduce input split.
-        long totalSize = regionSize;
-        byte[] splitStartKey = ts.getStartRow();
-        byte[] splitEndKey = ts.getEndRow();
-        count++;
-        for (; count < list.size(); count++) {
-          TableSplit nextRegion = (TableSplit)list.get(count);
-          long nextRegionSize = nextRegion.getLength();
-          if (totalSize + nextRegionSize <= dataSkewThreshold) {
-            totalSize = totalSize + nextRegionSize;
-            splitEndKey = nextRegion.getEndRow();
-          } else {
-            break;
-          }
-        }
-        TableSplit t = new TableSplit(table.getName(), scan, splitStartKey, splitEndKey,
-            regionLocation, encodedRegionName, totalSize);
-        resultList.add(t);
-      }
-    }
-    return resultList;
-  }
-
-  /**
-   * select a split point in the region. The selection of the split point is based on an uniform
-   * distribution assumption for the keys in a region.
-   * Here are some examples:
-   *
-   * <table>
-   *   <tr>
-   *     <th>start key</th>
-   *     <th>end key</th>
-   *     <th>is text</th>
-   *     <th>split point</th>
-   *   </tr>
-   *   <tr>
-   *     <td>'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f', 'g'</td>
-   *     <td>'a', 'a', 'a', 'f', 'f', 'f'</td>
-   *     <td>true</td>
-   *     <td>'a', 'a', 'a', 'd', 'd', -78, 50, -77, 51</td>
-   *   </tr>
-   *   <tr>
-   *     <td>'1', '1', '1', '0', '0', '0'</td>
-   *     <td>'1', '1', '2', '5', '7', '9', '0'</td>
-   *     <td>true</td>
-   *     <td>'1', '1', '1', -78, -77, -76, -104</td>
-   *   </tr>
-   *   <tr>
-   *     <td>'1', '1', '1', '0'</td>
-   *     <td>'1', '1', '2', '0'</td>
-   *     <td>true</td>
-   *     <td>'1', '1', '1', -80</td>
-   *   </tr>
-   *   <tr>
-   *     <td>13, -19, 126, 127</td>
-   *     <td>13, -19, 127, 0</td>
-   *     <td>false</td>
-   *     <td>13, -19, 126, -65</td>
-   *   </tr>
-   * </table>
-   *
-   * Set this function as "public static", make it easier for test.
-   *
-   * @param start Start key of the region
-   * @param end End key of the region
-   * @param isText It determines to use text key mode or binary key mode
-   * @return The split point in the region.
-   */
-  public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
-    byte upperLimitByte;
-    byte lowerLimitByte;
-    //Use text mode or binary mode.
-    if (isText) {
-      //The range of text char set in ASCII is [32,126], the lower limit is space and the upper
-      // limit is '~'.
-      upperLimitByte = '~';
-      lowerLimitByte = ' ';
-    } else {
-      upperLimitByte = -1;
-      lowerLimitByte = 0;
-    }
-    // For special case
-    // Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h"
-    // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~"
-    if (start.length == 0 && end.length == 0) {
-      return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)};
-    }
-    if (start.length == 0 && end.length != 0) {
-      return new byte[]{ end[0] };
-    }
-    if (start.length != 0 && end.length == 0) {
-      byte[] result = new byte[start.length];
-      result[0] = start[0];
-      for (int k = 1; k < start.length; k++) {
-        result[k] = upperLimitByte;
-      }
-      return result;
-    }
-    return Bytes.split(start, end, false, 1)[1];
-  }
  /**
   *
   *
@@ -532,7 +538,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job,
   * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br>
   * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing,
   * continuously. In addition to reducing InputSplits, reduces the load on the region server as
   * well, due to the ordering of the keys.
   * <br>
   * <br>
@@ -570,7 +576,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     }
     return regionLocator;
   }

   /**
    * Allows subclasses to get the {@link Table}.
    */
@@ -598,7 +604,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    * retreiving an Admin interface to the HBase cluster.
    *
    * @param table The table to get the data from.
    * @throws IOException
    * @deprecated Use {@link #initializeTable(Connection, TableName)} instead.
    */
   @Deprecated
@@ -629,8 +635,8 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    * Allows subclasses to initialize the table information.
    *
    * @param connection The {@link Connection} to the HBase cluster. MUST be unmanaged. We will close.
    * @param tableName The {@link TableName} of the table to process.
    * @throws IOException
    */
   protected void initializeTable(Connection connection, TableName tableName) throws IOException {
     if (this.table != null || this.connection != null) {
@@ -671,7 +677,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   protected void setTableRecordReader(TableRecordReader tableRecordReader) {
     this.tableRecordReader = tableRecordReader;
   }

   /**
    * Handle subclass specific set up.
    * Each of the entry points used by the MapReduce framework,

File: org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java

@@ -100,12 +100,7 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
   }

   /**
-   * Tests a MR scan using specific number of mappers. The test table has 25 regions,
-   * and all region sizes are set as 0 as default. The average region size is 1 (the smallest
-   * positive). When we set hbase.mapreduce.input.ratio as -1, all regions will be cut into two
-   * MapReduce input splits, the number of MR input splits should be 50; when we set hbase
-   * .mapreduce.input.ratio as 100, the sum of all region sizes is less than the average region
-   * size, all regions will be combined into 1 MapReduce input split.
+   * Tests a MR scan using a specific number of mappers. The test table has 26 regions.
    *
    * @throws IOException
    * @throws ClassNotFoundException
@@ -113,93 +108,29 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
    */
   @Test
   public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException {
-    HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
-    List<HRegionLocation> locs = table.getRegionLocator().getAllRegionLocations();
-    testNumOfSplits("-1", locs.size() * 2);
-    table.close();
-    testNumOfSplits("100", 1);
+    testNumOfSplits(1, 26);
+    testNumOfSplits(3, 78);
   }

   /**
-   * Tests the getSplitKey() method in TableInputFormatBase.java
-   *
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   */
-  @Test
-  public void testGetSplitsPoint() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    byte[] start1 = { 'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f' };
-    byte[] end1 = { 'a', 'a', 'a', 'f', 'f' };
-    byte[] splitPoint1 = { 'a', 'a', 'a', 'd', 'd', -78, 50, -77 };
-    testGetSplitKey(start1, end1, splitPoint1, true);
-
-    byte[] start2 = { '1', '1', '1', '0', '0', '0' };
-    byte[] end2 = { '1', '1', '2', '5', '7', '9', '0' };
-    byte[] splitPoint2 = { '1', '1', '1', -78, -77, -76, -104 };
-    testGetSplitKey(start2, end2, splitPoint2, true);
-
-    byte[] start3 = { 'a', 'a', 'a', 'a', 'a', 'a' };
-    byte[] end3 = { 'a', 'a', 'b' };
-    byte[] splitPoint3 = { 'a', 'a', 'a', -80, -80, -80 };
-    testGetSplitKey(start3, end3, splitPoint3, true);
-
-    byte[] start4 = { 'a', 'a', 'a' };
-    byte[] end4 = { 'a', 'a', 'a', 'z' };
-    byte[] splitPoint4 = { 'a', 'a', 'a', '=' };
-    testGetSplitKey(start4, end4, splitPoint4, true);
-
-    byte[] start5 = { 'a', 'a', 'a' };
-    byte[] end5 = { 'a', 'a', 'b', 'a' };
-    byte[] splitPoint5 = { 'a', 'a', 'a', -80 };
-    testGetSplitKey(start5, end5, splitPoint5, true);
-
-    // Test Case 6: empty key and "hhhqqqwww", split point is "h"
-    byte[] start6 = {};
-    byte[] end6 = { 'h', 'h', 'h', 'q', 'q', 'q', 'w', 'w' };
-    byte[] splitPointText6 = { 'h' };
-    byte[] splitPointBinary6 = { 104 };
-    testGetSplitKey(start6, end6, splitPointText6, true);
-    testGetSplitKey(start6, end6, splitPointBinary6, false);
-
-    // Test Case 7: "ffffaaa" and empty key, split point depends on the mode we choose (text key
-    // or binary key).
-    byte[] start7 = { 'f', 'f', 'f', 'f', 'a', 'a', 'a' };
-    byte[] end7 = {};
-    byte[] splitPointText7 = { 'f', '~', '~', '~', '~', '~', '~' };
-    byte[] splitPointBinary7 = { 'f', -1, -1, -1, -1, -1, -1 };
-    testGetSplitKey(start7, end7, splitPointText7, true);
-    testGetSplitKey(start7, end7, splitPointBinary7, false);
-
-    // Test Case 8: both start key and end key are empty. Split point depends on the mode we
-    // choose (text key or binary key).
-    byte[] start8 = {};
-    byte[] end8 = {};
-    byte[] splitPointText8 = { 'O' };
-    byte[] splitPointBinary8 = { 0 };
-    testGetSplitKey(start8, end8, splitPointText8, true);
-    testGetSplitKey(start8, end8, splitPointBinary8, false);
-
-    // Test Case 9: Binary Key example
-    byte[] start9 = { 13, -19, 126, 127 };
-    byte[] end9 = { 13, -19, 127, 0 };
-    byte[] splitPoint9 = { 13, -19, 126, -65 };
-    testGetSplitKey(start9, end9, splitPoint9, false);
-
-    // Test Case 10: Binary key split when the start key is an unsigned byte and the end byte is a
-    // signed byte
-    byte[] start10 = { 'x' };
-    byte[] end10 = { -128 };
-    byte[] splitPoint10 = { '|' };
-    testGetSplitKey(start10, end10, splitPoint10, false);
-
-    // Test Case 11: Binary key split when the start key is a signed byte and the end byte is a
-    // signed byte
-    byte[] start11 = { -100 };
-    byte[] end11 = { -90 };
-    byte[] splitPoint11 = { -95 };
-    testGetSplitKey(start11, end11, splitPoint11, false);
-  }
+   * Runs a MR job to test TIF using a specific number of mappers. The test table has 26 regions.
+   * @throws InterruptedException
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testSpecifiedNumOfMappersMR()
+      throws InterruptedException, IOException, ClassNotFoundException {
+    testNumOfSplitsMR(2, 52);
+    testNumOfSplitsMR(4, 104);
+  }
+
+  /**
+   * Test if autoBalance creates correct splits
+   * @throws IOException
+   */
+  @Test
+  public void testAutoBalanceSplits() throws IOException {
+    testAutobalanceNumOfSplit();
+  }
}

File: org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java

@@ -19,13 +19,11 @@
 package org.apache.hadoop.hbase.mapreduce;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;

 import java.io.IOException;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.NavigableMap;
+import java.util.*;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,10 +37,13 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -66,6 +67,7 @@ public abstract class TestTableInputFormatScanBase {
   static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
   static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
   static final String KEY_STARTROW = "startRow";
   static final String KEY_LASTROW = "stpRow";
@@ -245,40 +247,86 @@ public abstract class TestTableInputFormatScanBase {

   /**
-   * Tests a MR scan using data skew auto-balance
+   * Tests the number of InputSplits for a MR job when a number of mappers per region is
+   * specified for TableInputFormatXXX. This test does not run a MR job.
    *
    * @throws IOException
    * @throws ClassNotFoundException
    * @throws InterruptedException
    */
-  public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
+  public void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) throws IOException,
       InterruptedException,
       ClassNotFoundException {
     String jobName = "TestJobForNumOfSplits";
     LOG.info("Before map/reduce startup - job " + jobName);
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
     Scan scan = new Scan();
     scan.addFamily(INPUT_FAMILY);
-    c.set("hbase.mapreduce.input.autobalance", "true");
-    c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
+    c.setInt("hbase.mapreduce.input.mappers.per.region", splitsPerRegion);
     c.set(KEY_STARTROW, "");
     c.set(KEY_LASTROW, "");
     Job job = new Job(c, jobName);
-    TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
+    TableMapReduceUtil.initTableMapperJob(TableName.valueOf(TABLE_NAME).getNameAsString(), scan, ScanMapper.class,
         ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
     TableInputFormat tif = new TableInputFormat();
     tif.setConf(job.getConfiguration());
-    Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
     List<InputSplit> splits = tif.getSplits(job);
     Assert.assertEquals(expectedNumOfSplits, splits.size());
   }

   /**
-   * Tests for the getSplitKey() method in TableInputFormatBase.java
+   * Runs a MR job to check that the number of mappers equals expectedNumOfSplits.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
    */
-  public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
-    byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
-    Assert.assertArrayEquals(splitKey, result);
-  }
+  public void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits) throws IOException,
+      InterruptedException,
+      ClassNotFoundException {
+    String jobName = "TestJobForNumOfSplits-MR";
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    c.setInt("hbase.mapreduce.input.mappers.per.region", splitsPerRegion);
+    Job job = new Job(c, jobName);
+    TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    job.setReducerClass(ScanReducer.class);
+    job.setNumReduceTasks(1);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    assertTrue("job failed!", job.waitForCompletion(true));
+    // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
+    // so we use TaskCounter.SHUFFLED_MAPS to get the total launched maps
+    assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits,
+        job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue());
+  }

+  /**
+   * Tests auto-balance for setting the number of mappers for TIF.
+   * This does not run a real MR job.
+   */
+  public void testAutobalanceNumOfSplit() throws IOException {
+    // set up splits for testing
+    List<InputSplit> splits = new ArrayList<>(5);
+    int[] regionLen = {100, 200, 200, 400, 600};
+    for (int i = 0; i < 5; i++) {
+      InputSplit split = new TableSplit(TableName.valueOf(TABLE_NAME), new Scan(),
+          Bytes.toBytes(i), Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576);
+      splits.add(split);
+    }
+    TableInputFormat tif = new TableInputFormat();
+    List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824);
+
+    assertEquals("Saw the wrong number of splits", 5, res.size());
+    TableSplit ts1 = (TableSplit) res.get(0);
+    assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow()));
+    TableSplit ts2 = (TableSplit) res.get(1);
+    assertEquals("The second split regionsize should be", 200 * 1048576, ts2.getLength());
+    TableSplit ts3 = (TableSplit) res.get(2);
+    assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow()));
+    TableSplit ts4 = (TableSplit) res.get(4);
+    assertNotEquals("The fifth split start key should not be", 4, Bytes.toInt(ts4.getStartRow()));
+  }
}

File: TestNamespaceAuditor.java

@@ -392,9 +392,7 @@ public class TestNamespaceAuditor {
     assertEquals(initialRegions, hris.size());
     Collections.sort(hris);
     // verify that we cannot split
-    HRegionInfo hriToSplit2 = hris.get(1);
-    ADMIN.split(tableTwo,
-        TableInputFormatBase.getSplitKey(hriToSplit2.getStartKey(), hriToSplit2.getEndKey(), true));
+    ADMIN.split(tableTwo, Bytes.toBytes("6"));
     waitForMergeToComplete(tableTwo, encodedRegionNamesToMerge);
     assertEquals(initialRegions, ADMIN.getTableRegions(tableTwo).size());
   }