diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index 41234673c6a..b22ed0c2066 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -31,6 +31,7 @@ import javax.naming.NamingException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HConstants;
@@ -91,6 +92,16 @@ import org.apache.hadoop.util.StringUtils;
 public abstract class TableInputFormatBase
 extends InputFormat<ImmutableBytesWritable, Result> {
 
+  /** Specify if we enable auto-balance for input in M/R jobs. */
+  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
+  /** Specify the ratio used to detect data skew in M/R jobs; it only takes effect when the
+   * hbase.mapreduce.input.autobalance property is enabled. */
+  public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
+      ".maxskewratio";
+  /** Specify whether the row keys in the table are text (ASCII between 32~126). The default is
+   * true. False means the table uses binary row keys. */
+  public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";
+
   final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
 
   /** Holds the details for the internal scanner.
@@ -223,7 +234,7 @@
     }
     List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
     for (int i = 0; i < keys.getFirst().length; i++) {
-      if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
         continue;
       }
       HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[i], false);
@@ -266,7 +277,26 @@
         }
       }
     }
-    return splits;
+    // The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
+    boolean enableAutoBalance = context.getConfiguration()
+        .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false);
+    if (enableAutoBalance) {
+      long totalRegionSize = 0;
+      for (int i = 0; i < splits.size(); i++) {
+        TableSplit ts = (TableSplit) splits.get(i);
+        totalRegionSize += ts.getLength();
+      }
+      long averageRegionSize = totalRegionSize / splits.size();
+      // the averageRegionSize must be positive.
+      if (averageRegionSize <= 0) {
+        LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
+          "set it to 1.");
+        averageRegionSize = 1;
+      }
+      return calculateRebalancedSplits(splits, context, averageRegionSize);
+    } else {
+      return splits;
+    }
   }
 
   public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
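As a minimal usage sketch, not part of the patch itself: with the constants above on the classpath, a job driver can opt in to the new auto-balance path before the splits are computed. The table name "myTable" and MyMapper are hypothetical placeholders.

    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true);  // default: false
    conf.setLong(TableInputFormatBase.INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);     // default: 3
    conf.setBoolean(TableInputFormatBase.TABLE_ROW_TEXTKEY, false);           // binary row keys

    Job job = new Job(conf, "scan-with-autobalance");
    TableMapReduceUtil.initTableMapperJob("myTable", new Scan(), MyMapper.class,
        ImmutableBytesWritable.class, Result.class, job);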
@@ -288,6 +318,170 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     return hostName;
   }
 
+  /**
+   * Calculates the number of MapReduce input splits for the map tasks. The number of
+   * MapReduce input splits depends on the average region size and the "data skew ratio" the
+   * user sets in the configuration.
+   *
+   * @param list The list of input splits before balancing.
+   * @param context The current job context.
+   * @param average The average size of all regions.
+   * @return The list of input splits.
+   * @throws IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
+   *   org.apache.hadoop.mapreduce.JobContext)
+   */
+  public List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
+      long average) throws IOException {
+    List<InputSplit> resultList = new ArrayList<InputSplit>();
+    Configuration conf = context.getConfiguration();
+    // The default data skew ratio is 3.
+    long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
+    // It determines which mode to use: text key mode or binary key mode. The default is text mode.
+    boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
+    long dataSkewThreshold = dataSkewRatio * average;
+    int count = 0;
+    while (count < list.size()) {
+      TableSplit ts = (TableSplit) list.get(count);
+      String regionLocation = ts.getRegionLocation();
+      long regionSize = ts.getLength();
+      if (regionSize >= dataSkewThreshold) {
+        // If the current region size is larger than the data skew threshold,
+        // split the region into two MapReduce input splits.
+        byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
+        // Set the size of each child TableSplit to half the region size; the actual
+        // sizes will not be far off.
+        TableSplit t1 = new TableSplit(table.getName(), ts.getStartRow(), splitKey, regionLocation,
+            regionSize / 2);
+        TableSplit t2 = new TableSplit(table.getName(), splitKey, ts.getEndRow(), regionLocation,
+            regionSize - regionSize / 2);
+        resultList.add(t1);
+        resultList.add(t2);
+        count++;
+      } else if (regionSize >= average) {
+        // If the region size is between the average size and the data skew threshold,
+        // make this region one MapReduce input split.
+        resultList.add(ts);
+        count++;
+      } else {
+        // Combine this region with as many of the following small contiguous regions as
+        // possible while the combined size stays below the data skew threshold, and make
+        // them one MapReduce input split.
+        long totalSize = regionSize;
+        byte[] splitStartKey = ts.getStartRow();
+        byte[] splitEndKey = ts.getEndRow();
+        count++;
+        for (; count < list.size(); count++) {
+          TableSplit nextRegion = (TableSplit) list.get(count);
+          long nextRegionSize = nextRegion.getLength();
+          if (totalSize + nextRegionSize <= dataSkewThreshold) {
+            totalSize = totalSize + nextRegionSize;
+            splitEndKey = nextRegion.getEndRow();
+          } else {
+            break;
+          }
+        }
+        TableSplit t = new TableSplit(table.getName(), splitStartKey, splitEndKey,
+            regionLocation, totalSize);
+        resultList.add(t);
+      }
+    }
+    return resultList;
+  }
+
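To make the three branches concrete, here is an illustrative classification with assumed numbers (the default skew ratio of 3 and a 100 MB average region size; these figures are not from the patch):

    long average = 100L << 20;              // assume the average region size is 100 MB
    long dataSkewThreshold = 3 * average;   // default ratio 3 -> 300 MB

    long regionSize = 400L << 20;           // a hypothetical 400 MB region
    if (regionSize >= dataSkewThreshold) {
      // 400 MB >= 300 MB: cut into two ~200 MB child splits around getSplitKey(...)
    } else if (regionSize >= average) {
      // between 100 MB and 300 MB: kept as a single split
    } else {
      // below 100 MB: merged with following regions while the sum stays <= 300 MB
    }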
+  /**
+   * Select a split point in the region. The selection of the split point is based on a uniform
+   * distribution assumption for the keys in a region.
+   * Here are some examples:
+   * startKey: aaabcdefg  endKey: aaafff  split point: aaad
+   * startKey: 111000  endKey: 1125790  split point: 111b
+   * startKey: 1110  endKey: 1120  split point: 111_
+   * startKey: binary key { 13, -19, 126, 127 }, endKey: binary key { 13, -19, 127, 0 },
+   * split point: binary key { 13, -19, 127, -64 }
+   * This method is "public static" to make it easier to test.
+   *
+   * @param start Start key of the region.
+   * @param end End key of the region.
+   * @param isText Whether to use text key mode or binary key mode.
+   * @return The split point in the region.
+   */
+  public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
+    byte upperLimitByte;
+    byte lowerLimitByte;
+    // Use text mode or binary mode.
+    if (isText) {
+      // The range of text chars in ASCII is [32,126]; the lower limit is the space and the
+      // upper limit is '~'.
+      upperLimitByte = '~';
+      lowerLimitByte = ' ';
+    } else {
+      upperLimitByte = Byte.MAX_VALUE;
+      lowerLimitByte = Byte.MIN_VALUE;
+    }
+    // Special cases:
+    // Example 1: startKey=null, endKey="hhhqqqwww", splitKey="h"
+    // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitKey="f~~~~~~"
+    if (start.length == 0 && end.length == 0) {
+      return new byte[] { (byte) ((lowerLimitByte + upperLimitByte) / 2) };
+    }
+    if (start.length == 0 && end.length != 0) {
+      return new byte[] { end[0] };
+    }
+    if (start.length != 0 && end.length == 0) {
+      byte[] result = new byte[start.length];
+      result[0] = start[0];
+      for (int k = 1; k < start.length; k++) {
+        result[k] = upperLimitByte;
+      }
+      return result;
+    }
+    // A list to store the bytes of the split key.
+    List<Byte> resultBytesList = new ArrayList<Byte>();
+    int maxLength = start.length > end.length ? start.length : end.length;
+    for (int i = 0; i < maxLength; i++) {
+      // Calculate the midpoint byte at the first difference.
+      // For example: "11ae" and "11chw", the midpoint is "11b".
+      // Another example: "11ae" and "11bhw", the first different bytes are 'a' and 'b';
+      // there is no midpoint between 'a' and 'b', so we need to check the next byte.
+      if (start[i] == end[i]) {
+        resultBytesList.add(start[i]);
+        // For a special case like: startKey="aaa", endKey="aaaz", splitKey="aaaM".
+        if (i + 1 == start.length) {
+          resultBytesList.add((byte) ((lowerLimitByte + end[i + 1]) / 2));
+          break;
+        }
+      } else {
+        // If the two bytes differ by 1, like ['a','b'], we need to check the next byte to find
+        // the midpoint.
+        if ((int) end[i] - (int) start[i] == 1) {
+          // Get the next byte after the first difference.
+          byte startNextByte = (i + 1 < start.length) ? start[i + 1] : lowerLimitByte;
+          byte endNextByte = (i + 1 < end.length) ? end[i + 1] : lowerLimitByte;
+          int byteRange = (upperLimitByte - startNextByte) + (endNextByte - lowerLimitByte) + 1;
+          int halfRange = byteRange / 2;
+          if ((int) startNextByte + halfRange > (int) upperLimitByte) {
+            resultBytesList.add(end[i]);
+            resultBytesList.add((byte) (startNextByte + halfRange - upperLimitByte +
+                lowerLimitByte));
+          } else {
+            resultBytesList.add(start[i]);
+            resultBytesList.add((byte) (startNextByte + halfRange));
+          }
+        } else {
+          // Calculate the midpoint from the first different byte (the normal case),
+          // like "11ae" and "11chw": the midpoint is "11b".
+          resultBytesList.add((byte) ((start[i] + end[i]) / 2));
+        }
+        break;
+      }
+    }
+    // Transform the List of Bytes into a byte[].
+    byte[] result = new byte[resultBytesList.size()];
+    for (int k = 0; k < resultBytesList.size(); k++) {
+      result[k] = (byte) resultBytesList.get(k);
+    }
+    return result;
+  }
+
   /**
    *
    *
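As an illustrative check of the midpoint arithmetic (mirroring test case 5 in the tests below; this snippet is not part of the patch), the new helper can be called directly:

    // 'a' vs 'b' differ by 1, so the next bytes decide the midpoint:
    // range = ('~' - ' ') + ('a' - ' ') + 1 = 160, half = 80, ' ' + 80 = 'p'
    byte[] split = TableInputFormatBase.getSplitKey(
        Bytes.toBytes("aaa"), Bytes.toBytes("aaba"), true);  // text key mode
    System.out.println(Bytes.toString(split));               // prints "aaap"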
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
index 47cb8344bb2..0503f19acc1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
@@ -97,4 +97,93 @@ public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
     testScan(null, "opp", "opo");
   }
 
+  /**
+   * Tests an MR scan cut into a specific number of input splits. The test table has 25 regions,
+   * and all region sizes default to 0, so the average region size is clamped to 1 (the smallest
+   * positive value). When hbase.mapreduce.input.autobalance.maxskewratio is set to -1, every
+   * region is cut into two MapReduce input splits, so the number of splits should be 50; when it
+   * is set to 100, every region size is below the average and the combined size stays below the
+   * skew threshold, so all regions are combined into one MapReduce input split.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException {
+    testNumOfSplits("-1", 50);
+    testNumOfSplits("100", 1);
+  }
+
+  /**
+   * Tests the getSplitKey() method in TableInputFormatBase.java.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testGetSplitsPoint() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    // Test Case 1: "aaabcdef" and "aaaff", split point is "aaad".
+    byte[] start1 = { 'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f' };
+    byte[] end1 = { 'a', 'a', 'a', 'f', 'f' };
+    byte[] splitPoint1 = { 'a', 'a', 'a', 'd' };
+    testGetSplitKey(start1, end1, splitPoint1, true);
+
+    // Test Case 2: "111000" and "1125790", split point is "111b".
+    byte[] start2 = { '1', '1', '1', '0', '0', '0' };
+    byte[] end2 = { '1', '1', '2', '5', '7', '9', '0' };
+    byte[] splitPoint2 = { '1', '1', '1', 'b' };
+    testGetSplitKey(start2, end2, splitPoint2, true);
+
+    // Test Case 3: "aaaaaa" and "aab", split point is "aaap".
+    byte[] start3 = { 'a', 'a', 'a', 'a', 'a', 'a' };
+    byte[] end3 = { 'a', 'a', 'b' };
+    byte[] splitPoint3 = { 'a', 'a', 'a', 'p' };
+    testGetSplitKey(start3, end3, splitPoint3, true);
+
+    // Test Case 4: "aaa" and "aaaz", split point is "aaaM".
+    byte[] start4 = { 'a', 'a', 'a' };
+    byte[] end4 = { 'a', 'a', 'a', 'z' };
+    byte[] splitPoint4 = { 'a', 'a', 'a', 'M' };
+    testGetSplitKey(start4, end4, splitPoint4, true);
+
+    // Test Case 5: "aaa" and "aaba", split point is "aaap".
+    byte[] start5 = { 'a', 'a', 'a' };
+    byte[] end5 = { 'a', 'a', 'b', 'a' };
+    byte[] splitPoint5 = { 'a', 'a', 'a', 'p' };
+    testGetSplitKey(start5, end5, splitPoint5, true);
+
+    // Test Case 6: empty key and "hhhqqqwww", split point is "h".
+    byte[] start6 = {};
+    byte[] end6 = { 'h', 'h', 'h', 'q', 'q', 'q', 'w', 'w', 'w' };
+    byte[] splitPoint6 = { 'h' };
+    testGetSplitKey(start6, end6, splitPoint6, true);
+
+    // Test Case 7: "ffffaaa" and empty key; the split point depends on the mode we choose
+    // (text key or binary key).
+    byte[] start7 = { 'f', 'f', 'f', 'f', 'a', 'a', 'a' };
+    byte[] end7 = {};
+    byte[] splitPointText7 = { 'f', '~', '~', '~', '~', '~', '~' };
+    byte[] splitPointBinary7 = { 'f', 127, 127, 127, 127, 127, 127 };
+    testGetSplitKey(start7, end7, splitPointText7, true);
+    testGetSplitKey(start7, end7, splitPointBinary7, false);
+
+    // Test Case 8: both start key and end key are empty; the split point depends on the mode
+    // we choose (text key or binary key).
+    byte[] start8 = {};
+    byte[] end8 = {};
+    byte[] splitPointText8 = { 'O' };
+    byte[] splitPointBinary8 = { 0 };
+    testGetSplitKey(start8, end8, splitPointText8, true);
+    testGetSplitKey(start8, end8, splitPointBinary8, false);
+
+    // Test Case 9: a binary key example.
+    byte[] start9 = { 13, -19, 126, 127 };
+    byte[] end9 = { 13, -19, 127, 0 };
+    byte[] splitPoint9 = { 13, -19, 127, -64 };
+    testGetSplitKey(start9, end9, splitPoint9, false);
+  }
+
 }
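The expected counts in testGetSplits follow directly from the threshold arithmetic; as an illustrative recap (all 25 region sizes are 0, so the average is clamped to 1):

    // ratio "-1":  dataSkewThreshold = -1 * 1 = -1; every region size (0) >= -1,
    //              so each of the 25 regions is split in two   -> 50 splits.
    // ratio "100": dataSkewThreshold = 100 * 1 = 100; every region size (0) < average (1),
    //              and the running sum 0 stays <= 100          -> 1 combined split.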
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
index 750ea39f293..eb420924678 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 
@@ -37,12 +39,15 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
+
 
 /**
  *
  * Tests various scan start and stop row scenarios. This is set in a scan and
@@ -240,5 +245,42 @@ public abstract class TestTableInputFormatScanBase {
     LOG.info("After map/reduce completion - job " + jobName);
   }
 
+
+  /**
+   * Tests an MR scan using the data skew auto-balance feature.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    String jobName = "TestJobForNumOfSplits";
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    c.set("hbase.mapreduce.input.autobalance", "true");
+    c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
+    c.set(KEY_STARTROW, "");
+    c.set(KEY_LASTROW, "");
+    Job job = new Job(c, jobName);
+    TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    TableInputFormat tif = new TableInputFormat();
+    tif.setConf(job.getConfiguration());
+    Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
+    List<InputSplit> splits = tif.getSplits(job);
+    Assert.assertEquals(expectedNumOfSplits, splits.size());
+  }
+
+  /**
+   * Tests the getSplitKey() method in TableInputFormatBase.java.
+   */
+  public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
+    byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
+    Assert.assertArrayEquals(splitKey, result);
+  }
 }
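Beyond asserting the split count, a quick way to inspect what the balancer produced is to dump each split. This is a debugging sketch along the lines of testNumOfSplits above (assuming the same job setup), not part of the patch:

    TableInputFormat tif = new TableInputFormat();
    tif.setConf(job.getConfiguration());
    for (InputSplit s : tif.getSplits(job)) {
      TableSplit ts = (TableSplit) s;
      // Start/end row, hosting region server, and the (estimated) split length.
      System.out.println(Bytes.toStringBinary(ts.getStartRow()) + " .. "
          + Bytes.toStringBinary(ts.getEndRow()) + " @ " + ts.getRegionLocation()
          + " (" + ts.getLength() + " bytes)");
    }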