HBASE-26255 Add an option to use region location from meta table in TableSnapshotInputFormat (#3661) (#3668)

Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org>
This commit is contained in:
huaxiangsun 2021-09-09 10:43:36 -07:00 committed by GitHub
parent f670fc5d4d
commit 21d9741adb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 79 additions and 11 deletions

View File

@ -32,9 +32,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight; import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner; import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
@ -101,6 +105,15 @@ public class TableSnapshotInputFormatImpl {
"hbase.TableSnapshotInputFormat.locality.enabled"; "hbase.TableSnapshotInputFormat.locality.enabled";
public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true; public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;
/**
* Whether to calculate the Snapshot region location by region location from meta.
* It is much faster than computing block locations for splits.
*/
public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION =
"hbase.TableSnapshotInputFormat.locality.by.region.location";
public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT = false;
/** /**
* In some scenario, scan limited rows on each InputSplit for sampling data extraction * In some scenario, scan limited rows on each InputSplit for sampling data extraction
*/ */
@ -392,17 +405,49 @@ public class TableSnapshotInputFormatImpl {
SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT); SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT);
scan.setScanMetricsEnabled(scanMetricsEnabled); scan.setScanMetricsEnabled(scanMetricsEnabled);
boolean useRegionLoc = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
Connection connection = null;
RegionLocator regionLocator = null;
if (localityEnabled && useRegionLoc) {
Configuration newConf = new Configuration(conf);
newConf.setInt("hbase.hconnection.threads.max", 1);
try {
connection = ConnectionFactory.createConnection(newConf);
regionLocator = connection.getRegionLocator(htd.getTableName());
/* Get all locations for the table and cache it */
regionLocator.getAllRegionLocations();
} finally {
if (connection != null) {
connection.close();
}
}
}
List<InputSplit> splits = new ArrayList<>(); List<InputSplit> splits = new ArrayList<>();
for (HRegionInfo hri : regionManifests) { for (HRegionInfo hri : regionManifests) {
// load region descriptor // load region descriptor
List<String> hosts = null;
if (localityEnabled) {
if (regionLocator != null) {
/* Get Location from the local cache */
HRegionLocation
location = regionLocator.getRegionLocation(hri.getStartKey(), false);
hosts = new ArrayList<>(1);
hosts.add(location.getHostname());
} else {
hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir);
}
}
if (numSplits > 1) { if (numSplits > 1) {
byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true); byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
for (int i = 0; i < sp.length - 1; i++) { for (int i = 0; i < sp.length - 1; i++) {
if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
sp[i + 1])) { sp[i + 1])) {
List<String> hosts =
calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
Scan boundedScan = new Scan(scan); Scan boundedScan = new Scan(scan);
if (scan.getStartRow().length == 0) { if (scan.getStartRow().length == 0) {
@ -425,8 +470,7 @@ public class TableSnapshotInputFormatImpl {
} else { } else {
if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
hri.getStartKey(), hri.getEndKey())) { hri.getStartKey(), hri.getEndKey())) {
List<String> hosts =
calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
} }
} }
@ -440,14 +484,9 @@ public class TableSnapshotInputFormatImpl {
* only when localityEnabled is true. * only when localityEnabled is true.
*/ */
private static List<String> calculateLocationsForInputSplit(Configuration conf, private static List<String> calculateLocationsForInputSplit(Configuration conf,
TableDescriptor htd, HRegionInfo hri, Path tableDir, boolean localityEnabled) TableDescriptor htd, HRegionInfo hri, Path tableDir)
throws IOException { throws IOException {
if (localityEnabled) { // care block locality return getBestLocations(conf, HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
return getBestLocations(conf,
HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
} else { // do not care block locality
return null;
}
} }
/** /**

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.mapreduce;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT; import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY; import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT; import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -198,6 +200,18 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
} }
} }
@Test
public void testWithMockedMapReduceSingleRegionByRegionLocation() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true);
try {
testWithMockedMapReduce(UTIL, name.getMethodName() + "Snapshot", 1, 1, 1,
true);
} finally {
conf.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION);
}
}
@Override @Override
public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName, public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
String snapshotName, Path tmpTableDir) throws Exception { String snapshotName, Path tmpTableDir) throws Exception {
@ -218,6 +232,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
Configuration conf = util.getConfiguration(); Configuration conf = util.getConfiguration();
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo); conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
Job job = new Job(conf); Job job = new Job(conf);
Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName); Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan
@ -406,6 +422,9 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT); SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);
boolean byRegionLoc =
job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
for (int i = 0; i < splits.size(); i++) { for (int i = 0; i < splits.size(); i++) {
// validate input split // validate input split
InputSplit split = splits.get(i); InputSplit split = splits.get(i);
@ -413,6 +432,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split; TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
if (localityEnabled) { if (localityEnabled) {
Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0); Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
if (byRegionLoc) {
// When it uses region location from meta, the hostname will be "localhost",
// the location from hdfs block location is "127.0.0.1".
Assert.assertEquals(1, split.getLocations().length);
Assert.assertTrue("Not using region location!",
split.getLocations()[0].equals("localhost"));
} else {
Assert.assertTrue("Not using region location!",
split.getLocations()[0].equals("127.0.0.1"));
}
} else { } else {
Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0); Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
} }