diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index bee4926ca6b..53eb9f40dca 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -70,7 +70,7 @@ public class TableSnapshotInputFormatImpl {
   // key for specifying the root dir of the restored snapshot
   protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
 
-  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
+  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution, int)} */
   private static final String LOCALITY_CUTOFF_MULTIPLIER =
     "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
   private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
@@ -86,6 +86,19 @@ public class TableSnapshotInputFormatImpl {
    */
   public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";
 
+  /**
+   * Whether to calculate block locations for splits. Defaults to true.
+   * If the computing layer runs outside of the HBase cluster, block locality does not matter.
+   * Setting this value to false skips the calculation and saves some time.
+   *
+   * The access modifier is "public" so that these can be accessed by test classes of
+   * both org.apache.hadoop.hbase.mapred
+   * and org.apache.hadoop.hbase.mapreduce.
+   */
+  public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY =
+    "hbase.TableSnapshotInputFormat.locality.enabled";
+  public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;
+
   /**
    * Implementation class for InputSplit logic common between mapred and mapreduce.
    */
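Side note for reviewers: the new key is read through the plain Configuration API, so an off-cluster job can disable the locality computation with one setting. A minimal sketch of driver-side usage (the class and method names are illustrative, not part of this patch; only the property name and its default come from the change above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class SnapshotJobSetup { // hypothetical helper, for illustration only
      public static Configuration disableSplitLocality() {
        Configuration conf = HBaseConfiguration.create();
        // The compute layer runs outside the HBase cluster, so skip the
        // per-region HDFS block-distribution lookup when creating splits.
        conf.setBoolean("hbase.TableSnapshotInputFormat.locality.enabled", false);
        return conf;
      }
    }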
@@ -356,6 +369,9 @@ public class TableSnapshotInputFormatImpl {
 
     Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
 
+    boolean localityEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
+      SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);
+
     List<InputSplit> splits = new ArrayList<>();
     for (HRegionInfo hri : regionManifests) {
       // load region descriptor
@@ -365,36 +381,42 @@ public class TableSnapshotInputFormatImpl {
         for (int i = 0; i < sp.length - 1; i++) {
           if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
               sp[i + 1])) {
-            // compute HDFS locations from snapshot files (which will get the locations for
-            // referred hfiles)
-            List<String> hosts = getBestLocations(conf,
-              HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+            List<String> hosts =
+              calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
 
-            int len = Math.min(3, hosts.size());
-            hosts = hosts.subList(0, len);
             Scan boundedScan = new Scan(scan);
             boundedScan.setStartRow(sp[i]);
             boundedScan.setStopRow(sp[i + 1]);
+
             splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
           }
         }
       } else {
         if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
-          hri.getStartKey(), hri.getEndKey())) {
-          // compute HDFS locations from snapshot files (which will get the locations for
-          // referred hfiles)
-          List<String> hosts = getBestLocations(conf,
-            HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
-
-          int len = Math.min(3, hosts.size());
-          hosts = hosts.subList(0, len);
+            hri.getStartKey(), hri.getEndKey())) {
+          List<String> hosts =
+            calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
           splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
         }
       }
     }
 
     return splits;
+  }
 
+  /**
+   * Compute block locations for snapshot files (which will get the locations for referred hfiles)
+   * only when localityEnabled is true.
+   */
+  private static List<String> calculateLocationsForInputSplit(Configuration conf,
+      TableDescriptor htd, HRegionInfo hri, Path tableDir, boolean localityEnabled)
+      throws IOException {
+    if (localityEnabled) { // care block locality
+      return getBestLocations(conf,
+        HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+    } else { // do not care block locality
+      return null;
+    }
   }
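Note that the disabled path returns null rather than an empty list. The InputSplit constructor (not shown in this hunk) is therefore expected to normalize the host list, since the tests below assert a non-null, zero-length array from getLocations(). A sketch of that guard, assuming the constructor owns the normalization (field name assumed):

    // Inside the InputSplit constructor: a null or empty host list becomes a
    // zero-length array, which is what getLocations() callers and the
    // assertions in the tests below rely on.
    this.locations = (hosts == null || hosts.isEmpty())
        ? new String[0]
        : hosts.toArray(new String[hosts.size()]);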
 
   /**
@@ -408,30 +430,41 @@ public class TableSnapshotInputFormatImpl {
    * we are doing a simple heuristic, where we will pass all hosts which have at least 80%
    * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
    * host with the best locality.
+   * Return at most numTopsAtMost locations if there are more than that.
    */
-  public static List<String> getBestLocations(
-      Configuration conf, HDFSBlocksDistribution blockDistribution) {
-    List<String> locations = new ArrayList<>(3);
-
+  private static List<String> getBestLocations(Configuration conf,
+      HDFSBlocksDistribution blockDistribution, int numTopsAtMost) {
     HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
-    if (hostAndWeights.length == 0) {
-      return locations;
+    if (hostAndWeights.length == 0) { // no matter what numTopsAtMost is
+      return null;
     }
 
+    if (numTopsAtMost < 1) { // invalid if numTopsAtMost < 1, correct it to be 1
+      numTopsAtMost = 1;
+    }
+
+    int top = Math.min(numTopsAtMost, hostAndWeights.length);
+    List<String> locations = new ArrayList<>(top);
     HostAndWeight topHost = hostAndWeights[0];
     locations.add(topHost.getHost());
 
-    // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
+    if (top == 1) { // only care about the top host
+      return locations;
+    }
+
+    // When top >= 2,
+    // do the heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
     double cutoffMultiplier = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER,
       DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
 
     double filterWeight = topHost.getWeight() * cutoffMultiplier;
-    for (int i = 1; i < hostAndWeights.length; i++) {
+    for (int i = 1; i <= top - 1; i++) {
       if (hostAndWeights[i].getWeight() >= filterWeight) {
         locations.add(hostAndWeights[i].getHost());
       } else {
+        // As hostAndWeights is sorted in descending order of weight,
+        // we can break the loop as soon as we meet a weight below filterWeight.
         break;
       }
     }
@@ -439,6 +472,12 @@ public class TableSnapshotInputFormatImpl {
     return locations;
   }
 
+  public static List<String> getBestLocations(Configuration conf,
+      HDFSBlocksDistribution blockDistribution) {
+    // 3 nodes will contain highly local blocks. So default to 3.
+    return getBestLocations(conf, blockDistribution, 3);
+  }
+
   private static String getSnapshotName(Configuration conf) {
     String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
     if (snapshotName == null) {
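To make the cutoff concrete: with the default multiplier of 0.8 and the default cap of 3, a host survives only if its weight is at least 80% of the top host's weight. A small worked example in the spirit of the existing unit test (host names and weights are illustrative):

    HDFSBlocksDistribution dist = new HDFSBlocksDistribution();
    dist.addHostsAndBlockWeight(new String[] { "h1" }, 10); // top host; filterWeight = 10 * 0.8 = 8
    dist.addHostsAndBlockWeight(new String[] { "h2" }, 9);  // 9 >= 8, kept
    dist.addHostsAndBlockWeight(new String[] { "h3" }, 5);  // 5 < 8, dropped and the loop breaks
    // getBestLocations(conf, dist) is expected to return ["h1", "h2"].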
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
index be36b6a6513..8b4e9185856 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.mapred;
 
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
 import static org.mockito.Mockito.mock;
 
 import org.apache.hadoop.fs.Path;
@@ -138,7 +139,10 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
   @Test
   @Override
   public void testWithMockedMapReduceMultiRegion() throws Exception {
-    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10);
+    testWithMockedMapReduce(
+        UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10, true);
+    // It does not matter whether true or false is passed for setLocalityEnabledTo,
+    // because this class's testWithMockedMapReduce() does not read it.
   }
 
   @Test
@@ -165,7 +169,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
 
   @Override
   protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
-      int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception {
+      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
+      throws Exception {
     setupCluster();
     final TableName tableName = TableName.valueOf(name.getMethodName());
     try {
@@ -173,6 +178,9 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
         util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
 
       JobConf job = new JobConf(util.getConfiguration());
+      // setLocalityEnabledTo is intentionally ignored here, so as to test the case where
+      // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified
+      // and the default value is taken.
       Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
 
       if (numSplitsPerRegion > 1) {
@@ -206,10 +214,25 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
     HBaseTestingUtility.SeenRowTracker rowTracker =
         new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
 
+    // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified,
+    // so the default value is taken.
+    boolean localityEnabled = SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
+
     for (int i = 0; i < splits.length; i++) {
       // validate input split
       InputSplit split = splits[i];
       Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
+      if (localityEnabled) {
+        // When localityEnabled is true, we meant to verify split.getLocations()
+        // with the following statement:
+        //   Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
+        // However, getLocations() of some splits may return an empty array (length 0),
+        // so the length verification is dropped.
+        // TODO: investigate how to verify split.getLocations() when localityEnabled is true
+        Assert.assertTrue(split.getLocations() != null);
+      } else {
+        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
+      }
 
       // validate record reader
       OutputCollector collector = mock(OutputCollector.class);
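The mapred variant deliberately leaves the key unset so that the default-value path gets coverage. If this test were ever meant to exercise the disabled path as the mapreduce variant does, the JobConf could set the key explicitly; a hypothetical one-line variation, not part of this patch:

    // Hypothetical variation (not in this patch): honor setLocalityEnabledTo in the
    // mapred test too, instead of relying on SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT.
    job.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);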
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
index 362dca1963a..4e11275b232 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
@@ -78,7 +78,8 @@ public abstract class TableSnapshotInputFormatTestBase {
   }
 
   protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
-      int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception;
+      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
+      throws Exception;
 
   protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
       String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
       int expectedNumSplits, boolean shutdownCluster) throws Exception;
@@ -90,12 +91,12 @@ public abstract class TableSnapshotInputFormatTestBase {
 
   @Test
   public void testWithMockedMapReduceSingleRegion() throws Exception {
-    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1, 1);
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1, 1, true);
   }
 
   @Test
   public void testWithMockedMapReduceMultiRegion() throws Exception {
-    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 8);
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 8, false);
   }
 
   @Test
"h3", "h4"), TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); } @@ -210,14 +213,17 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa @Override public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, - int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception { + int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo) + throws Exception { setupCluster(); final TableName tableName = TableName.valueOf(name.getMethodName()); try { createTableAndSnapshot( util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions); - Job job = new Job(util.getConfiguration()); + Configuration conf = util.getConfiguration(); + conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo); + Job job = new Job(conf); Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName); Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan @@ -304,10 +310,19 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow); + boolean localityEnabled = + job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, + SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT); + for (int i = 0; i < splits.size(); i++) { // validate input split InputSplit split = splits.get(i); Assert.assertTrue(split instanceof TableSnapshotRegionSplit); + if (localityEnabled) { + Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0); + } else { + Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0); + } // validate record reader TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);