HBASE-15482 Provide an option to skip calculating block locations for SnapshotInputFormat

Signed-off-by: tedyu <yuzhihong@gmail.com>
Author: Xiang Li (2017-12-07 01:06:33 +08:00), committed by tedyu
parent 70608acf28
commit 5e7d16a3ce
4 changed files with 110 additions and 32 deletions
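
For orientation, here is a minimal sketch of how a job could use the new option once this patch is applied. The snapshot name, mapper class, and restore path below are hypothetical; the property key and its default come from the patch, and TableMapReduceUtil.initTableSnapshotMapperJob is the usual entry point for snapshot-backed MapReduce jobs:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class SnapshotScanJob {

  // Hypothetical pass-through mapper; a real job would do work in map().
  public static class SnapshotMapper
      extends TableMapper<ImmutableBytesWritable, Result> {
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // The computing layer runs outside the HBase cluster, so block locality
    // does not matter; skip the calculation and save split-computation time.
    conf.setBoolean("hbase.TableSnapshotInputFormat.locality.enabled", false);

    Job job = Job.getInstance(conf, "snapshot-scan");
    TableMapReduceUtil.initTableSnapshotMapperJob(
        "my-snapshot", new Scan(), SnapshotMapper.class,
        ImmutableBytesWritable.class, Result.class, job,
        true, new Path("/tmp/snapshot-restore"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}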

TableSnapshotInputFormatImpl.java

@@ -70,7 +70,7 @@ public class TableSnapshotInputFormatImpl {
   // key for specifying the root dir of the restored snapshot
   protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
-  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
+  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution, int)} */
   private static final String LOCALITY_CUTOFF_MULTIPLIER =
       "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
   private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
@@ -86,6 +86,19 @@ public class TableSnapshotInputFormatImpl {
    */
   public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";

+  /**
+   * Whether to calculate the block locations for splits. Defaults to true.
+   * If the computing layer runs outside of the HBase cluster, the block locality does not matter.
+   * Setting this value to false skips the calculation and saves some time.
+   *
+   * Set access modifier to "public" so that these can be accessed by test classes of
+   * both org.apache.hadoop.hbase.mapred
+   * and org.apache.hadoop.hbase.mapreduce.
+   */
+  public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY =
+      "hbase.TableSnapshotInputFormat.locality.enabled";
+  public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;

   /**
    * Implementation class for InputSplit logic common between mapred and mapreduce.
    */
@@ -356,6 +369,9 @@ public class TableSnapshotInputFormatImpl {
     Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());

+    boolean localityEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
+        SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);
+
     List<InputSplit> splits = new ArrayList<>();
     for (HRegionInfo hri : regionManifests) {
       // load region descriptor
@@ -365,36 +381,42 @@ public class TableSnapshotInputFormatImpl {
         for (int i = 0; i < sp.length - 1; i++) {
           if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
               sp[i + 1])) {
-            // compute HDFS locations from snapshot files (which will get the locations for
-            // referred hfiles)
-            List<String> hosts = getBestLocations(conf,
-              HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
-            int len = Math.min(3, hosts.size());
-            hosts = hosts.subList(0, len);
+            List<String> hosts =
+                calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
             Scan boundedScan = new Scan(scan);
             boundedScan.setStartRow(sp[i]);
             boundedScan.setStopRow(sp[i + 1]);
             splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
           }
         }
       } else {
         if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
             hri.getStartKey(), hri.getEndKey())) {
-          // compute HDFS locations from snapshot files (which will get the locations for
-          // referred hfiles)
-          List<String> hosts = getBestLocations(conf,
-            HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
-          int len = Math.min(3, hosts.size());
-          hosts = hosts.subList(0, len);
+          List<String> hosts =
+              calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
           splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
         }
       }
     }

     return splits;
   }

+  /**
+   * Compute block locations for snapshot files (which will get the locations for referred
+   * hfiles) only when localityEnabled is true.
+   */
+  private static List<String> calculateLocationsForInputSplit(Configuration conf,
+      TableDescriptor htd, HRegionInfo hri, Path tableDir, boolean localityEnabled)
+      throws IOException {
+    if (localityEnabled) { // block locality is wanted
+      return getBestLocations(conf,
+          HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+    } else { // block locality does not matter
+      return null;
+    }
+  }

   /**
@@ -408,30 +430,41 @@ public class TableSnapshotInputFormatImpl {
    * we are doing a simple heuristic, where we will pass all hosts which have at least 80%
    * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
    * host with the best locality.
+   * Return at most numTopsAtMost locations if there are more than that.
    */
-  public static List<String> getBestLocations(
-      Configuration conf, HDFSBlocksDistribution blockDistribution) {
-    List<String> locations = new ArrayList<>(3);
+  private static List<String> getBestLocations(Configuration conf,
+      HDFSBlocksDistribution blockDistribution, int numTopsAtMost) {
     HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

-    if (hostAndWeights.length == 0) {
-      return locations;
+    if (hostAndWeights.length == 0) { // no matter what numTopsAtMost is
+      return null;
     }

+    if (numTopsAtMost < 1) { // invalid if numTopsAtMost < 1, correct it to be 1
+      numTopsAtMost = 1;
+    }
+    int top = Math.min(numTopsAtMost, hostAndWeights.length);
+    List<String> locations = new ArrayList<>(top);
     HostAndWeight topHost = hostAndWeights[0];
     locations.add(topHost.getHost());

-    // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
+    if (top == 1) { // only care about the top host
+      return locations;
+    }
+
+    // When top >= 2,
+    // do the heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
     double cutoffMultiplier
       = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

     double filterWeight = topHost.getWeight() * cutoffMultiplier;
-    for (int i = 1; i < hostAndWeights.length; i++) {
+    for (int i = 1; i <= top - 1; i++) {
       if (hostAndWeights[i].getWeight() >= filterWeight) {
         locations.add(hostAndWeights[i].getHost());
       } else {
+        // As hostAndWeights is in descending order,
+        // we can break the loop as soon as we meet a weight less than filterWeight.
         break;
       }
     }
@@ -439,6 +472,12 @@ public class TableSnapshotInputFormatImpl {
     return locations;
   }

+  public static List<String> getBestLocations(Configuration conf,
+      HDFSBlocksDistribution blockDistribution) {
+    // 3 nodes will contain highly local blocks. So default to 3.
+    return getBestLocations(conf, blockDistribution, 3);
+  }

   private static String getSnapshotName(Configuration conf) {
     String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
     if (snapshotName == null) {
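
To make the cutoff heuristic concrete: with host weights h1=10, h2=9, h3=7 and the default multiplier 0.8, the filter weight is 10 * 0.8 = 8, so h1 and h2 pass but h3 is cut. The following is a small self-contained sketch of the same filtering rule (an illustration, not HBase's code; it assumes hosts are already sorted by weight in descending order):

import java.util.ArrayList;
import java.util.List;

public class LocalityCutoffDemo {

  // Mirrors getBestLocations: keep the top host, then every following host
  // whose weight is at least cutoffMultiplier * topWeight, up to
  // numTopsAtMost entries.
  static List<String> bestLocations(String[] hosts, long[] weights,
      double cutoffMultiplier, int numTopsAtMost) {
    if (hosts.length == 0) {
      return null; // matches the patched behavior for an empty distribution
    }
    int top = Math.min(Math.max(numTopsAtMost, 1), hosts.length);
    List<String> locations = new ArrayList<>(top);
    locations.add(hosts[0]);
    double filterWeight = weights[0] * cutoffMultiplier;
    for (int i = 1; i < top; i++) {
      if (weights[i] >= filterWeight) {
        locations.add(hosts[i]);
      } else {
        break; // weights are descending; nothing later can pass the filter
      }
    }
    return locations;
  }

  public static void main(String[] args) {
    // filterWeight = 10 * 0.8 = 8.0, so h3 (weight 7) is dropped: [h1, h2]
    System.out.println(bestLocations(
        new String[] {"h1", "h2", "h3"}, new long[] {10, 9, 7}, 0.8, 3));
  }
}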

TestTableSnapshotInputFormat.java (org.apache.hadoop.hbase.mapred)

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.mapred;

+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
 import static org.mockito.Mockito.mock;

 import org.apache.hadoop.fs.Path;
@@ -138,7 +139,10 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
   @Test
   @Override
   public void testWithMockedMapReduceMultiRegion() throws Exception {
-    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10);
+    testWithMockedMapReduce(
+        UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10, true);
+    // It does not matter whether true or false is given to setLocalityEnabledTo,
+    // because it is not read in testWithMockedMapReduce().
   }

   @Test
@@ -165,7 +169,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
   @Override
   protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
-      int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception {
+      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
+      throws Exception {
     setupCluster();
     final TableName tableName = TableName.valueOf(name.getMethodName());
     try {
@@ -173,6 +178,9 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
           util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

       JobConf job = new JobConf(util.getConfiguration());
+      // setLocalityEnabledTo is ignored no matter what is specified, so as to test the case that
+      // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified
+      // and the default value is taken.
       Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);

       if (numSplitsPerRegion > 1) {
@@ -206,10 +214,25 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
     HBaseTestingUtility.SeenRowTracker rowTracker =
         new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

+    // SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY is not explicitly specified,
+    // so the default value is taken.
+    boolean localityEnabled = SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
+
     for (int i = 0; i < splits.length; i++) {
       // validate input split
       InputSplit split = splits[i];
       Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit);
+      if (localityEnabled) {
+        // When localityEnabled is true, meant to verify split.getLocations()
+        // by the following statement:
+        //   Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
+        // However, getLocations() of some splits could return an empty array (length is 0),
+        // so drop the verification on length.
+        // TODO: investigate how to verify split.getLocations() when localityEnabled is true
+        Assert.assertTrue(split.getLocations() != null);
+      } else {
+        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
+      }

       // validate record reader
       OutputCollector collector = mock(OutputCollector.class);

TableSnapshotInputFormatTestBase.java

@@ -78,7 +78,8 @@ public abstract class TableSnapshotInputFormatTestBase {
   }

   protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
-      int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception;
+      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
+      throws Exception;

   protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
       String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion, int expectedNumSplits,
@@ -90,12 +91,12 @@ public abstract class TableSnapshotInputFormatTestBase {
   @Test
   public void testWithMockedMapReduceSingleRegion() throws Exception {
-    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1, 1);
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1, 1, true);
   }

   @Test
   public void testWithMockedMapReduceMultiRegion() throws Exception {
-    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 8);
+    testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 8, false);
   }

   @Test

TestTableSnapshotInputFormat.java (org.apache.hadoop.hbase.mapreduce)

@@ -18,6 +18,9 @@
 package org.apache.hadoop.hbase.mapreduce;

+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -98,7 +101,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
     Configuration conf = UTIL.getConfiguration();

     HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
-    Assert.assertEquals(Lists.newArrayList(),
+    Assert.assertEquals(null,
         TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

     blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
@@ -132,7 +135,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
     blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
     blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);
-    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"),
+    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4"),
         TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
   }
@@ -210,14 +213,17 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
   @Override
   public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
-      int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception {
+      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
+      throws Exception {
     setupCluster();
     final TableName tableName = TableName.valueOf(name.getMethodName());
     try {
       createTableAndSnapshot(
           util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

-      Job job = new Job(util.getConfiguration());
+      Configuration conf = util.getConfiguration();
+      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
+      Job job = new Job(conf);
       Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);

       Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan
@@ -304,10 +310,19 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
     HBaseTestingUtility.SeenRowTracker rowTracker =
         new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

+    boolean localityEnabled =
+        job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
+            SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);
+
     for (int i = 0; i < splits.size(); i++) {
       // validate input split
       InputSplit split = splits.get(i);
       Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+      if (localityEnabled) {
+        Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
+      } else {
+        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
+      }

       // validate record reader
       TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);