diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index 148f367135a..661b981e706 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; @@ -99,23 +100,25 @@ import org.apache.hadoop.util.StringUtils; * setScan(scan); * } * } + * + * The number of InputSplits (mappers) matches the number of regions in a table by default. + * Set "hbase.mapreduce.input.mappers.per.region" to specify how many mappers to use per region; + * setting this property disables the autobalance behavior described below. + * + * Set "hbase.mapreduce.input.autobalance" to enable autobalance: HBase will then assign mappers + * based on the average region size. Regions larger than the average may be assigned more mappers, + * while consecutive small regions may be grouped together and served by a single mapper. If the + * calculated average region size is too large, it is not good to assign only one mapper to each + * large region, so "hbase.mapreduce.input.average.regionsize" can be used to cap the average + * region size when autobalance is enabled; the default cap is 8 GB. + * */ @InterfaceAudience.Public @InterfaceStability.Stable public abstract class TableInputFormatBase extends InputFormat<ImmutableBytesWritable, Result> { - /** Specify if we enable auto-balance for input in M/R jobs.*/ - public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance"; - /** Specify if ratio for data skew in M/R jobs, it goes well with the enabling hbase.mapreduce - * .input.autobalance property.*/ - public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" + - ".maxskewratio"; - /** Specify if the row key in table is text (ASCII between 32~126), - * default is true. False means the table is using binary row key*/ - public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey"; - private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class); private static final String NOT_INITIALIZED = "The input format instance has not been properly " + @@ -125,6 +128,14 @@ extends InputFormat { " previous error. Please look at the previous logs lines from" + " the task's full log for more details."; + /** Specify whether we enable auto-balancing of the number of mappers in M/R jobs. */ + public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance"; + /** When auto-balancing, input is split by the average region size; use this property to cap the average region size if the calculated value is too large. */ + public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.input.average.regionsize"; + + /** Set the number of mappers for each region; all regions get the same number of mappers. */ + public static final String NUM_MAPPERS_PER_REGION = "hbase.mapreduce.input.mappers.per.region"; + /** Holds the details for the internal scanner. 
* * @see Scan */ @@ -140,7 +151,7 @@ extends InputFormat { /** The underlying {@link Connection} of the table. */ private Connection connection; - + /** The reverse DNS lookup cache mapping: IPAddress => HostName */ private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>(); @@ -252,28 +263,68 @@ extends InputFormat { } catch (IllegalStateException exception) { throw new IOException(INITIALIZATION_ERROR, exception); } - try { - RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin); + List<InputSplit> splits = oneInputSplitPerRegion(); + + // set the same number of mappers for each region + if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) { + int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1); + List<InputSplit> res = new ArrayList<>(); + for (int i = 0; i < splits.size(); i++) { + List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion); + res.addAll(tmp); + } + return res; + } + + // The default value of "hbase.mapreduce.input.autobalance" is false. + if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) { + long maxAveRegionSize = context.getConfiguration().getLong(MAX_AVERAGE_REGION_SIZE, 8L * 1073741824L); + return calculateAutoBalancedSplits(splits, maxAveRegionSize); + } + + // return one mapper per region + return splits; + } catch (NamingException e) { + throw new IOException(e); + } finally { + if (closeOnFinish) { + closeTable(); + } + } + } + + /** + * Create one InputSplit per region + * + * @return The list of InputSplit for all the regions + * @throws IOException + * @throws NamingException + */ + private List<InputSplit> oneInputSplitPerRegion() throws IOException, NamingException { + RegionSizeCalculator sizeCalculator = + new RegionSizeCalculator(getRegionLocator(), getAdmin()); + + TableName tableName = getTable().getName(); Pair<byte[][], byte[][]> keys = getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - HRegionLocation regLoc = regionLocator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + HRegionLocation regLoc = + getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); if (null == regLoc) { throw new IOException("Expecting at least one region."); } - List<InputSplit> splits = new ArrayList<InputSplit>(1); + List<InputSplit> splits = new ArrayList<>(1); long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); - TableSplit split = new TableSplit(table.getName(), scan, + TableSplit split = new TableSplit(tableName, scan, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc - .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); + .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); splits.add(split); return splits; } - List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length); + List<InputSplit> splits = new ArrayList<>(keys.getFirst().length); for (int i = 0; i < keys.getFirst().length; i++) { - if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { + if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { continue; } @@ -283,16 +334,16 @@ extends InputFormat { if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && (stopRow.length == 0 || - Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { + Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { byte[] splitStart = startRow.length == 0 || - Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? + Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? 
keys.getFirst()[i] : startRow; byte[] splitStop = (stopRow.length == 0 || - Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && - keys.getSecond()[i].length > 0 ? + Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && + keys.getSecond()[i].length > 0 ? keys.getSecond()[i] : stopRow; - HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[i], false); + HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false); // The below InetSocketAddress creation does a name resolution. InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); if (isa.isUnresolved()) { @@ -300,49 +351,159 @@ extends InputFormat { } InetAddress regionAddress = isa.getAddress(); String regionLocation; - try { - regionLocation = reverseDNS(regionAddress); - } catch (NamingException e) { - LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e); - regionLocation = location.getHostname(); - } + regionLocation = reverseDNS(regionAddress); byte[] regionName = location.getRegionInfo().getRegionName(); String encodedRegionName = location.getRegionInfo().getEncodedName(); long regionSize = sizeCalculator.getRegionSize(regionName); - TableSplit split = new TableSplit(table.getName(), scan, - splitStart, splitStop, regionLocation, encodedRegionName, regionSize); + TableSplit split = new TableSplit(tableName, scan, + splitStart, splitStop, regionLocation, encodedRegionName, regionSize); splits.add(split); if (LOG.isDebugEnabled()) { LOG.debug("getSplits: split -> " + i + " -> " + split); } } } - //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled. - boolean enableAutoBalance = context.getConfiguration().getBoolean( - MAPREDUCE_INPUT_AUTOBALANCE, false); - if (enableAutoBalance) { - long totalRegionSize=0; - for (int i = 0; i < splits.size(); i++){ - TableSplit ts = (TableSplit)splits.get(i); - totalRegionSize += ts.getLength(); + return splits; + } + + /** + * Create n splits for one InputSplit; for now only a uniform distribution is supported. + * @param split A TableSplit corresponding to a range of row keys + * @param n Number of ranges after splitting. Pass 1 to leave the range unsplit; + * pass 2 to split the range in two. + * @return A list of TableSplit; the size of the list is n + * @throws IllegalArgumentIOException + */ + protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n) + throws IllegalArgumentIOException { + if (split == null || !(split instanceof TableSplit)) { + throw new IllegalArgumentIOException( + "InputSplit for CreateNSplitsPerRegion can not be null" + + " and should be an instance of TableSplit"); + } + // if n < 1, still continue, using n = 1 + n = n < 1 ? 
1 : n; + List<InputSplit> res = new ArrayList<>(n); + if (n == 1) { + res.add(split); + return res; + } + + // Collect Region related information + TableSplit ts = (TableSplit) split; + TableName tableName = ts.getTable(); + String regionLocation = ts.getRegionLocation(); + String encodedRegionName = ts.getEncodedRegionName(); + long regionSize = ts.getLength(); + byte[] startRow = ts.getStartRow(); + byte[] endRow = ts.getEndRow(); + + // Special cases: an empty startRow or endRow is widened to a one-byte minimum or maximum key + if (startRow.length == 0 && endRow.length == 0) { + startRow = new byte[1]; + endRow = new byte[1]; + startRow[0] = 0; + endRow[0] = -1; + } + if (startRow.length == 0 && endRow.length != 0) { + startRow = new byte[1]; + startRow[0] = 0; + } + if (startRow.length != 0 && endRow.length == 0) { + endRow = new byte[startRow.length]; + for (int k = 0; k < startRow.length; k++) { + endRow[k] = -1; } - long averageRegionSize = totalRegionSize / splits.size(); - // the averageRegionSize must be positive. - if (averageRegionSize <= 0) { - LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " + - "set it to 1."); - averageRegionSize = 1; - } - return calculateRebalancedSplits(splits, context, averageRegionSize); - } else { + } + + // Split Region into n chunks evenly + byte[][] splitKeys = Bytes.split(startRow, endRow, true, n - 1); + for (int i = 0; i < splitKeys.length - 1; i++) { + // note that the regionSize parameter may not be very accurate + TableSplit tsplit = + new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation, + encodedRegionName, regionSize / n); + res.add(tsplit); + } + return res; + } + /** + * Calculates the number of MapReduce input splits for the map tasks. The number of + * MapReduce input splits depends on the average region size. + * Made 'public' for testing. + * + * @param splits The list of input splits before balancing. + * @param maxAverageRegionSize max average region size for one mapper + * @return The list of input splits. + * @throws IOException When creating the list of splits fails. + * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext) + */ + public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, long maxAverageRegionSize) + throws IOException { + if (splits.size() == 0) { return splits; } - } finally { - if (closeOnFinish) { - closeTable(); + List<InputSplit> resultList = new ArrayList<>(); + long totalRegionSize = 0; + for (int i = 0; i < splits.size(); i++) { + TableSplit ts = (TableSplit) splits.get(i); + totalRegionSize += ts.getLength(); + } + long averageRegionSize = totalRegionSize / splits.size(); + // totalRegionSize might overflow, and the averageRegionSize must be positive. 
+ if (averageRegionSize <= 0) { + LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " + + "setting it to Long.MAX_VALUE / " + splits.size()); + averageRegionSize = Long.MAX_VALUE / splits.size(); + } + // if averageRegionSize is too big, cap it at maxAverageRegionSize (default 8 GB) + if (averageRegionSize > maxAverageRegionSize) { + averageRegionSize = maxAverageRegionSize; + } + // if averageRegionSize is too small, we do not need to allocate more mappers for the 'large' regions; + // use 64 MB (the default HDFS block size) as the lower bound + if (averageRegionSize < 64 * 1048576) { + return splits; + } + for (int i = 0; i < splits.size(); i++) { + TableSplit ts = (TableSplit) splits.get(i); + TableName tableName = ts.getTable(); + String regionLocation = ts.getRegionLocation(); + String encodedRegionName = ts.getEncodedRegionName(); + long regionSize = ts.getLength(); + + if (regionSize >= averageRegionSize) { + // split this region into multiple MapReduce input splits. + int n = (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0); + List<InputSplit> temp = createNInputSplitsUniform(ts, n); + resultList.addAll(temp); + } else { + // if the total size of several small contiguous regions is less than the average region size, + // combine them into one MapReduce input split. + long totalSize = regionSize; + byte[] splitStartKey = ts.getStartRow(); + byte[] splitEndKey = ts.getEndRow(); + int j = i + 1; + while (j < splits.size()) { + TableSplit nextRegion = (TableSplit) splits.get(j); + long nextRegionSize = nextRegion.getLength(); + if (totalSize + nextRegionSize <= averageRegionSize) { + totalSize = totalSize + nextRegionSize; + splitEndKey = nextRegion.getEndRow(); + j++; + } else { + break; + } + } + i = j - 1; + TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation, + encodedRegionName, totalSize); + resultList.add(t); } } + return resultList; } /** @@ -368,161 +529,6 @@ extends InputFormat { return hostName; } - /** - * Calculates the number of MapReduce input splits for the map tasks. The number of - * MapReduce input splits depends on the average region size and the "data skew ratio" user set in - * configuration. - * - * @param list The list of input splits before balance. - * @param context The current job context. - * @param average The average size of all regions . - * @return The list of input splits. - * @throws IOException When creating the list of splits fails. - * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( - * org.apache.hadoop.mapreduce.JobContext) - */ - public List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context, - long average) throws IOException { - List<InputSplit> resultList = new ArrayList<InputSplit>(); - Configuration conf = context.getConfiguration(); - //The default data skew ratio is 3 - long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3); - //It determines which mode to use: text key mode or binary key mode. The default is text mode. 
- boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true); - long dataSkewThreshold = dataSkewRatio * average; - int count = 0; - while (count < list.size()) { - TableSplit ts = (TableSplit)list.get(count); - TableName tableName = ts.getTable(); - String regionLocation = ts.getRegionLocation(); - String encodedRegionName = ts.getEncodedRegionName(); - long regionSize = ts.getLength(); - if (regionSize >= dataSkewThreshold) { - // if the current region size is large than the data skew threshold, - // split the region into two MapReduce input splits. - byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey); - if (Arrays.equals(ts.getEndRow(), splitKey)) { - // Not splitting since the end key is the same as the split key - resultList.add(ts); - } else { - //Set the size of child TableSplit as 1/2 of the region size. The exact size of the - // MapReduce input splits is not far off. - TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey, - regionLocation, regionSize / 2); - TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation, - regionSize - regionSize / 2); - resultList.add(t1); - resultList.add(t2); - } - count++; - } else if (regionSize >= average) { - // if the region size between average size and data skew threshold size, - // make this region as one MapReduce input split. - resultList.add(ts); - count++; - } else { - // if the total size of several small continuous regions less than the average region size, - // combine them into one MapReduce input split. - long totalSize = regionSize; - byte[] splitStartKey = ts.getStartRow(); - byte[] splitEndKey = ts.getEndRow(); - count++; - for (; count < list.size(); count++) { - TableSplit nextRegion = (TableSplit)list.get(count); - long nextRegionSize = nextRegion.getLength(); - if (totalSize + nextRegionSize <= dataSkewThreshold) { - totalSize = totalSize + nextRegionSize; - splitEndKey = nextRegion.getEndRow(); - } else { - break; - } - } - TableSplit t = new TableSplit(table.getName(), scan, splitStartKey, splitEndKey, - regionLocation, encodedRegionName, totalSize); - resultList.add(t); - } - } - return resultList; - } - - /** - * select a split point in the region. The selection of the split point is based on an uniform - * distribution assumption for the keys in a region. - * Here are some examples: - * <table> - * <tr><th>start key</th><th>end key</th><th>is text</th><th>split point</th></tr> - * <tr><td>'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f', 'g'</td> <td>'a', 'a', 'a', 'f', 'f', 'f'</td> <td>true</td> <td>'a', 'a', 'a', 'd', 'd', -78, 50, -77, 51</td></tr> - * <tr><td>'1', '1', '1', '0', '0', '0'</td> <td>'1', '1', '2', '5', '7', '9', '0'</td> <td>true</td> <td>'1', '1', '1', -78, -77, -76, -104</td></tr> - * <tr><td>'1', '1', '1', '0'</td> <td>'1', '1', '2', '0'</td> <td>true</td> <td>'1', '1', '1', -80</td></tr> - * <tr><td>13, -19, 126, 127</td> <td>13, -19, 127, 0</td> <td>false</td> <td>13, -19, 126, -65</td></tr> - * </table>
- * - * Set this function as "public static", make it easier for test. - * - * @param start Start key of the region - * @param end End key of the region - * @param isText It determines to use text key mode or binary key mode - * @return The split point in the region. - */ - public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) { - byte upperLimitByte; - byte lowerLimitByte; - //Use text mode or binary mode. - if (isText) { - //The range of text char set in ASCII is [32,126], the lower limit is space and the upper - // limit is '~'. - upperLimitByte = '~'; - lowerLimitByte = ' '; - } else { - upperLimitByte = -1; - lowerLimitByte = 0; - } - // For special case - // Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h" - // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~" - if (start.length == 0 && end.length == 0){ - return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)}; - } - if (start.length == 0 && end.length != 0){ - return new byte[]{ end[0] }; - } - if (start.length != 0 && end.length == 0){ - byte[] result =new byte[start.length]; - result[0]=start[0]; - for (int k = 1; k < start.length; k++){ - result[k] = upperLimitByte; - } - return result; - } - return Bytes.split(start, end, false, 1)[1]; - } - /** * * @@ -532,7 +538,7 @@ extends InputFormat { * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job, * (and hence, not contributing to the InputSplit), given the start and end keys of the same.
* Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing, - * continuously. In addition to reducing InputSplits, reduces the load on the region server as + * continuously. In addition to reducing InputSplits, reduces the load on the region server as * well, due to the ordering of the keys. *
*
@@ -570,7 +576,7 @@ extends InputFormat { } return regionLocator; } - + /** * Allows subclasses to get the {@link Table}. */ @@ -598,7 +604,7 @@ extends InputFormat { * retreiving an Admin interface to the HBase cluster. * * @param table The table to get the data from. - * @throws IOException + * @throws IOException * @deprecated Use {@link #initializeTable(Connection, TableName)} instead. */ @Deprecated @@ -629,8 +635,8 @@ extends InputFormat { * Allows subclasses to initialize the table information. * * @param connection The {@link Connection} to the HBase cluster. MUST be unmanaged. We will close. - * @param tableName The {@link TableName} of the table to process. - * @throws IOException + * @param tableName The {@link TableName} of the table to process. + * @throws IOException */ protected void initializeTable(Connection connection, TableName tableName) throws IOException { if (this.table != null || this.connection != null) { @@ -671,7 +677,7 @@ extends InputFormat { protected void setTableRecordReader(TableRecordReader tableRecordReader) { this.tableRecordReader = tableRecordReader; } - + /** * Handle subclass specific set up. * Each of the entry points used by the MapReduce framework, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java index 325e4c4ee64..d16456b0dad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java @@ -100,12 +100,7 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase { } /** - * Tests a MR scan using specific number of mappers. The test table has 25 regions, - * and all region sizes are set as 0 as default. The average region size is 1 (the smallest - * positive). When we set hbase.mapreduce.input.ratio as -1, all regions will be cut into two - * MapRedcue input splits, the number of MR input splits should be 50; when we set hbase - * .mapreduce.input.ratio as 100, the sum of all region sizes is less then the average region - * size, all regions will be combined into 1 MapRedcue input split. + * Tests a MR scan using a specific number of mappers. The test table has 26 regions. * * @throws IOException * @throws ClassNotFoundException */ @Test public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException { - HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME); - List<HRegionLocation> locs = table.getRegionLocator().getAllRegionLocations(); - - testNumOfSplits("-1", locs.size()*2); - table.close(); - testNumOfSplits("100", 1); + testNumOfSplits(1, 26); + testNumOfSplits(3, 78); } /** - * Tests the getSplitKey() method in TableInputFormatBase.java - * + * Runs a MR job to test TIF using a specific number of mappers. 
The test table has 26 regions. + * @throws InterruptedException * @throws IOException * @throws ClassNotFoundException - * @throws InterruptedException */ @Test - public void testGetSplitsPoint() throws IOException, InterruptedException, - ClassNotFoundException { - byte[] start1 = { 'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f' }; - byte[] end1 = { 'a', 'a', 'a', 'f', 'f' }; - byte[] splitPoint1 = { 'a', 'a', 'a', 'd', 'd', -78, 50, -77 }; - testGetSplitKey(start1, end1, splitPoint1, true); + public void testSpecifiedNumOfMappersMR() + throws InterruptedException, IOException, ClassNotFoundException { + testNumOfSplitsMR(2, 52); + testNumOfSplitsMR(4, 104); + } - byte[] start2 = { '1', '1', '1', '0', '0', '0' }; - byte[] end2 = { '1', '1', '2', '5', '7', '9', '0' }; - byte[] splitPoint2 = { '1', '1', '1', -78, -77, -76, -104 }; - testGetSplitKey(start2, end2, splitPoint2, true); - - byte[] start3 = { 'a', 'a', 'a', 'a', 'a', 'a' }; - byte[] end3 = { 'a', 'a', 'b' }; - byte[] splitPoint3 = { 'a', 'a', 'a', -80, -80, -80 }; - testGetSplitKey(start3, end3, splitPoint3, true); - - byte[] start4 = { 'a', 'a', 'a' }; - byte[] end4 = { 'a', 'a', 'a', 'z' }; - byte[] splitPoint4 = { 'a', 'a', 'a', '=' }; - testGetSplitKey(start4, end4, splitPoint4, true); - - byte[] start5 = { 'a', 'a', 'a' }; - byte[] end5 = { 'a', 'a', 'b', 'a' }; - byte[] splitPoint5 = { 'a', 'a', 'a', -80 }; - testGetSplitKey(start5, end5, splitPoint5, true); - - // Test Case 6: empty key and "hhhqqqwww", split point is "h" - byte[] start6 = {}; - byte[] end6 = { 'h', 'h', 'h', 'q', 'q', 'q', 'w', 'w' }; - byte[] splitPointText6 = { 'h' }; - byte[] splitPointBinary6 = { 104 }; - testGetSplitKey(start6, end6, splitPointText6, true); - testGetSplitKey(start6, end6, splitPointBinary6, false); - - // Test Case 7: "ffffaaa" and empty key, split point depends on the mode we choose(text key or - // binary key). - byte[] start7 = { 'f', 'f', 'f', 'f', 'a', 'a', 'a' }; - byte[] end7 = {}; - byte[] splitPointText7 = { 'f', '~', '~', '~', '~', '~', '~' }; - byte[] splitPointBinary7 = { 'f', -1, -1, -1, -1, -1, -1 }; - testGetSplitKey(start7, end7, splitPointText7, true); - testGetSplitKey(start7, end7, splitPointBinary7, false); - - // Test Case 8: both start key and end key are empty. Split point depends on the mode we - // choose (text key or binary key). 
- byte[] start8 = {}; - byte[] end8 = {}; - byte[] splitPointText8 = { 'O' }; - byte[] splitPointBinary8 = { 0 }; - testGetSplitKey(start8, end8, splitPointText8, true); - testGetSplitKey(start8, end8, splitPointBinary8, false); - - // Test Case 9: Binary Key example - byte[] start9 = { 13, -19, 126, 127 }; - byte[] end9 = { 13, -19, 127, 0 }; - byte[] splitPoint9 = { 13, -19, 126, -65 }; - testGetSplitKey(start9, end9, splitPoint9, false); - - // Test Case 10: Binary key split when the start key is an unsigned byte and the end byte is a - // signed byte - byte[] start10 = { 'x' }; - byte[] end10 = { -128 }; - byte[] splitPoint10 = { '|' }; - testGetSplitKey(start10, end10, splitPoint10, false); - - // Test Case 11: Binary key split when the start key is an signed byte and the end byte is a - // signed byte - byte[] start11 = { -100 }; - byte[] end11 = { -90 }; - byte[] splitPoint11 = { -95 }; - testGetSplitKey(start11, end11, splitPoint11, false); + /** + * Test if autoBalance creates correct splits + * @throws IOException + */ + @Test + public void testAutoBalanceSplits() throws IOException { + testAutobalanceNumOfSplit(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java index 6521d2dd4f1..b0ae937749c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java @@ -19,13 +19,11 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.NavigableMap; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,10 +37,13 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -66,6 +67,7 @@ public abstract class TestTableInputFormatScanBase { static final byte[] TABLE_NAME = Bytes.toBytes("scantest"); static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")}; static final String KEY_STARTROW = "startRow"; static final String KEY_LASTROW = "stpRow"; @@ -245,40 +247,86 @@ public abstract class TestTableInputFormatScanBase { /** - * Tests a MR scan using data skew auto-balance + * Tests the number of InputSplits for a MR job when a specific number of mappers per region is + * set for TableInputFormat. This test does not run a MR job. * * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ - public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException, - InterruptedException, - ClassNotFoundException { + public 
void testNumOfSplits(int splitsPerRegion, int expectedNumOfSplits) throws IOException, + InterruptedException, + ClassNotFoundException { String jobName = "TestJobForNumOfSplits"; LOG.info("Before map/reduce startup - job " + jobName); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); Scan scan = new Scan(); scan.addFamily(INPUT_FAMILY); - c.set("hbase.mapreduce.input.autobalance", "true"); - c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio); + c.setInt("hbase.mapreduce.input.mappers.per.region", splitsPerRegion); c.set(KEY_STARTROW, ""); c.set(KEY_LASTROW, ""); Job job = new Job(c, jobName); - TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class, - ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); + TableMapReduceUtil.initTableMapperJob(TableName.valueOf(TABLE_NAME).getNameAsString(), scan, ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); TableInputFormat tif = new TableInputFormat(); tif.setConf(job.getConfiguration()); - Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName())); List<InputSplit> splits = tif.getSplits(job); Assert.assertEquals(expectedNumOfSplits, splits.size()); } /** - * Tests for the getSplitKey() method in TableInputFormatBase.java + * Runs a MR job to check that the number of mappers equals expectedNumOfSplits. + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException */ - public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) { - byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText); - Assert.assertArrayEquals(splitKey, result); + public void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits) throws IOException, + InterruptedException, + ClassNotFoundException { + String jobName = "TestJobForNumOfSplits-MR"; + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILY); + c.setInt("hbase.mapreduce.input.mappers.per.region", splitsPerRegion); + Job job = new Job(c, jobName); + TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); + job.setReducerClass(ScanReducer.class); + job.setNumReduceTasks(1); + job.setOutputFormatClass(NullOutputFormat.class); + assertTrue("job failed!", job.waitForCompletion(true)); + // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS, + // so we use TaskCounter.SHUFFLED_MAPS to get the total number of launched maps + assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits, + job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue()); + } + + /** + * Tests autobalance when setting the number of mappers for TIF. + * This does not run a real MR job. + */ + public void testAutobalanceNumOfSplit() throws IOException { + // set up splits for testing + List<InputSplit> splits = new ArrayList<>(5); + int[] regionLen = {100, 200, 200, 400, 600}; + for (int i = 0; i < 5; i++) { + InputSplit split = new TableSplit(TableName.valueOf(TABLE_NAME), new Scan(), + Bytes.toBytes(i), Bytes.toBytes(i + 1), "", "", regionLen[i] * 1048576); + splits.add(split); + } + TableInputFormat tif = new TableInputFormat(); + List<InputSplit> res = tif.calculateAutoBalancedSplits(splits, 1073741824); + + assertEquals("Saw the wrong number of splits", 5, res.size()); + TableSplit ts1 = (TableSplit) res.get(0); + 
assertEquals("The first split end key should be", 2, Bytes.toInt(ts1.getEndRow())); + TableSplit ts2 = (TableSplit) res.get(1); + assertEquals("The second split regionsize should be", 200 * 1048576, ts2.getLength()); + TableSplit ts3 = (TableSplit) res.get(2); + assertEquals("The third split start key should be", 3, Bytes.toInt(ts3.getStartRow())); + TableSplit ts4 = (TableSplit) res.get(4); + assertNotEquals("The seventh split start key should not be", 4, Bytes.toInt(ts4.getStartRow())); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java index f5efa3455bd..3af2ca91cc0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java @@ -392,9 +392,7 @@ public class TestNamespaceAuditor { assertEquals(initialRegions, hris.size()); Collections.sort(hris); // verify that we cannot split - HRegionInfo hriToSplit2 = hris.get(1); - ADMIN.split(tableTwo, - TableInputFormatBase.getSplitKey(hriToSplit2.getStartKey(), hriToSplit2.getEndKey(), true)); + ADMIN.split(tableTwo, Bytes.toBytes("6")); waitForMergeToComplete(tableTwo, encodedRegionNamesToMerge); assertEquals(initialRegions, ADMIN.getTableRegions(tableTwo).size()); }