HBASE-19245 MultiTableInputFormatBase#getSplits creates a Connection per Table

We make one Connection only instead of a Connection per table (Change is
just moving one line it involves right-shifting body of the function)
This commit is contained in:
Michael Stack 2017-11-13 11:42:10 -08:00
parent 7139113fde
commit 3ad300a2b0
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
1 changed files with 50 additions and 48 deletions

View File

@ -178,60 +178,62 @@ public abstract class MultiTableInputFormatBase extends
List<InputSplit> splits = new ArrayList<>(); List<InputSplit> splits = new ArrayList<>();
Iterator iter = tableMaps.entrySet().iterator(); Iterator iter = tableMaps.entrySet().iterator();
while (iter.hasNext()) { // Make a single Connection to the Cluster and use it across all tables.
Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next(); try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration())) {
TableName tableName = entry.getKey(); while (iter.hasNext()) {
List<Scan> scanList = entry.getValue(); Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next();
TableName tableName = entry.getKey();
try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration()); List<Scan> scanList = entry.getValue();
Table table = conn.getTable(tableName); try (Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)) { RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator( RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
regionLocator, conn.getAdmin()); regionLocator, conn.getAdmin());
Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys(); Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
for (Scan scan : scanList) { for (Scan scan : scanList) {
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new IOException("Expecting at least one region for table : " throw new IOException("Expecting at least one region for table : "
+ tableName.getNameAsString()); + tableName.getNameAsString());
}
int count = 0;
byte[] startRow = scan.getStartRow();
byte[] stopRow = scan.getStopRow();
for (int i = 0; i < keys.getFirst().length; i++) {
if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
continue;
} }
int count = 0;
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || byte[] startRow = scan.getStartRow();
Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && byte[] stopRow = scan.getStopRow();
(stopRow.length == 0 || Bytes.compareTo(stopRow,
keys.getFirst()[i]) > 0)) {
byte[] splitStart = startRow.length == 0 ||
Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
keys.getFirst()[i] : startRow;
byte[] splitStop = (stopRow.length == 0 ||
Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
keys.getSecond()[i].length > 0 ?
keys.getSecond()[i] : stopRow;
HRegionLocation hregionLocation = regionLocator.getRegionLocation( for (int i = 0; i < keys.getFirst().length; i++) {
keys.getFirst()[i], false); if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
String regionHostname = hregionLocation.getHostname(); continue;
HRegionInfo regionInfo = hregionLocation.getRegionInfo(); }
String encodedRegionName = regionInfo.getEncodedName();
long regionSize = sizeCalculator.getRegionSize(
regionInfo.getRegionName());
TableSplit split = new TableSplit(table.getName(), if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
scan, splitStart, splitStop, regionHostname, Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
encodedRegionName, regionSize); (stopRow.length == 0 || Bytes.compareTo(stopRow,
keys.getFirst()[i]) > 0)) {
byte[] splitStart = startRow.length == 0 ||
Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
keys.getFirst()[i] : startRow;
byte[] splitStop = (stopRow.length == 0 ||
Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
keys.getSecond()[i].length > 0 ?
keys.getSecond()[i] : stopRow;
splits.add(split); HRegionLocation hregionLocation = regionLocator.getRegionLocation(
keys.getFirst()[i], false);
String regionHostname = hregionLocation.getHostname();
HRegionInfo regionInfo = hregionLocation.getRegionInfo();
String encodedRegionName = regionInfo.getEncodedName();
long regionSize = sizeCalculator.getRegionSize(
regionInfo.getRegionName());
if (LOG.isDebugEnabled()) TableSplit split = new TableSplit(table.getName(),
LOG.debug("getSplits: split -> " + (count++) + " -> " + split); scan, splitStart, splitStop, regionHostname,
encodedRegionName, regionSize);
splits.add(split);
if (LOG.isDebugEnabled()) {
LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
}
}
} }
} }
} }