HBASE-19245 MultiTableInputFormatBase#getSplits creates a Connection per Table
We make one Connection only instead of a Connection per table (Change is just moving one line it involves right-shifting body of the function)
This commit is contained in:
parent
0ac18a8126
commit
797670b129
|
@ -178,60 +178,62 @@ public abstract class MultiTableInputFormatBase extends
|
|||
|
||||
List<InputSplit> splits = new ArrayList<>();
|
||||
Iterator iter = tableMaps.entrySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next();
|
||||
TableName tableName = entry.getKey();
|
||||
List<Scan> scanList = entry.getValue();
|
||||
|
||||
try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration());
|
||||
Table table = conn.getTable(tableName);
|
||||
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
|
||||
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
|
||||
regionLocator, conn.getAdmin());
|
||||
Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
|
||||
for (Scan scan : scanList) {
|
||||
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
|
||||
throw new IOException("Expecting at least one region for table : "
|
||||
+ tableName.getNameAsString());
|
||||
}
|
||||
int count = 0;
|
||||
|
||||
byte[] startRow = scan.getStartRow();
|
||||
byte[] stopRow = scan.getStopRow();
|
||||
|
||||
for (int i = 0; i < keys.getFirst().length; i++) {
|
||||
if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
|
||||
continue;
|
||||
// Make a single Connection to the Cluster and use it across all tables.
|
||||
try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration())) {
|
||||
while (iter.hasNext()) {
|
||||
Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next();
|
||||
TableName tableName = entry.getKey();
|
||||
List<Scan> scanList = entry.getValue();
|
||||
try (Table table = conn.getTable(tableName);
|
||||
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
|
||||
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
|
||||
regionLocator, conn.getAdmin());
|
||||
Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
|
||||
for (Scan scan : scanList) {
|
||||
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
|
||||
throw new IOException("Expecting at least one region for table : "
|
||||
+ tableName.getNameAsString());
|
||||
}
|
||||
int count = 0;
|
||||
|
||||
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
|
||||
Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
|
||||
(stopRow.length == 0 || Bytes.compareTo(stopRow,
|
||||
keys.getFirst()[i]) > 0)) {
|
||||
byte[] splitStart = startRow.length == 0 ||
|
||||
Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
|
||||
keys.getFirst()[i] : startRow;
|
||||
byte[] splitStop = (stopRow.length == 0 ||
|
||||
Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
|
||||
keys.getSecond()[i].length > 0 ?
|
||||
keys.getSecond()[i] : stopRow;
|
||||
byte[] startRow = scan.getStartRow();
|
||||
byte[] stopRow = scan.getStopRow();
|
||||
|
||||
HRegionLocation hregionLocation = regionLocator.getRegionLocation(
|
||||
keys.getFirst()[i], false);
|
||||
String regionHostname = hregionLocation.getHostname();
|
||||
HRegionInfo regionInfo = hregionLocation.getRegionInfo();
|
||||
String encodedRegionName = regionInfo.getEncodedName();
|
||||
long regionSize = sizeCalculator.getRegionSize(
|
||||
regionInfo.getRegionName());
|
||||
for (int i = 0; i < keys.getFirst().length; i++) {
|
||||
if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
TableSplit split = new TableSplit(table.getName(),
|
||||
scan, splitStart, splitStop, regionHostname,
|
||||
encodedRegionName, regionSize);
|
||||
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
|
||||
Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
|
||||
(stopRow.length == 0 || Bytes.compareTo(stopRow,
|
||||
keys.getFirst()[i]) > 0)) {
|
||||
byte[] splitStart = startRow.length == 0 ||
|
||||
Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
|
||||
keys.getFirst()[i] : startRow;
|
||||
byte[] splitStop = (stopRow.length == 0 ||
|
||||
Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
|
||||
keys.getSecond()[i].length > 0 ?
|
||||
keys.getSecond()[i] : stopRow;
|
||||
|
||||
splits.add(split);
|
||||
HRegionLocation hregionLocation = regionLocator.getRegionLocation(
|
||||
keys.getFirst()[i], false);
|
||||
String regionHostname = hregionLocation.getHostname();
|
||||
HRegionInfo regionInfo = hregionLocation.getRegionInfo();
|
||||
String encodedRegionName = regionInfo.getEncodedName();
|
||||
long regionSize = sizeCalculator.getRegionSize(
|
||||
regionInfo.getRegionName());
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
|
||||
TableSplit split = new TableSplit(table.getName(),
|
||||
scan, splitStart, splitStop, regionHostname,
|
||||
encodedRegionName, regionSize);
|
||||
|
||||
splits.add(split);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue