HBASE-12223 MultiTableInputFormatBase.getSplits is too slow (Yuanbo Peng)

This commit is contained in:
tedyu 2014-12-18 05:10:12 -08:00
parent d66ae28c07
commit effbe85888
1 changed file with 62 additions and 43 deletions

View File

@ -46,6 +46,9 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
/** /**
* A base for {@link MultiTableInputFormat}s. Receives a list of * A base for {@link MultiTableInputFormat}s. Receives a list of
* {@link Scan} instances that define the input tables and * {@link Scan} instances that define the input tables and
@ -129,25 +132,42 @@ public abstract class MultiTableInputFormatBase extends
if (scans.isEmpty()) { if (scans.isEmpty()) {
throw new IOException("No scans were provided."); throw new IOException("No scans were provided.");
} }
List<InputSplit> splits = new ArrayList<InputSplit>();
Map<TableName, List<Scan>> tableMaps = new HashMap<TableName, List<Scan>>();
for (Scan scan : scans) { for (Scan scan : scans) {
byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME); byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
if (tableNameBytes == null) if (tableNameBytes == null)
throw new IOException("A scan object did not have a table name"); throw new IOException("A scan object did not have a table name");
TableName tableName = TableName.valueOf(tableNameBytes); TableName tableName = TableName.valueOf(tableNameBytes);
List<Scan> scanList = tableMaps.get(tableName);
if (scanList == null) {
scanList = new ArrayList<Scan>();
tableMaps.put(tableName, scanList);
}
scanList.add(scan);
}
List<InputSplit> splits = new ArrayList<InputSplit>();
Iterator iter = tableMaps.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next();
TableName tableName = entry.getKey();
List<Scan> scanList = entry.getValue();
Table table = null; Table table = null;
RegionLocator regionLocator = null; RegionLocator regionLocator = null;
Connection conn = null; Connection conn = null;
try{ try{
conn = ConnectionFactory.createConnection(context.getConfiguration()); conn = ConnectionFactory.createConnection(context.getConfiguration());
table = conn.getTable(tableName); table = conn.getTable(tableName);
regionLocator = conn.getRegionLocator(tableName);
regionLocator = (RegionLocator) table; regionLocator = (RegionLocator) table;
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
regionLocator, conn.getAdmin());
Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys(); Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
if (keys == null || keys.getFirst() == null || for (Scan scan : scanList) {
keys.getFirst().length == 0) { if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new IOException("Expecting at least one region for table : " throw new IOException("Expecting at least one region for table : "
+ tableName.getNameAsString()); + tableName.getNameAsString());
} }
@ -156,48 +176,47 @@ public abstract class MultiTableInputFormatBase extends
byte[] startRow = scan.getStartRow(); byte[] startRow = scan.getStartRow();
byte[] stopRow = scan.getStopRow(); byte[] stopRow = scan.getStopRow();
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
regionLocator, conn.getAdmin());
for (int i = 0; i < keys.getFirst().length; i++) { for (int i = 0; i < keys.getFirst().length; i++) {
if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
continue; continue;
} }
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
(stopRow.length == 0 || Bytes.compareTo(stopRow,
keys.getFirst()[i]) > 0)) {
byte[] splitStart = startRow.length == 0 ||
Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
keys.getFirst()[i] : startRow;
byte[] splitStop = (stopRow.length == 0 ||
Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
keys.getSecond()[i].length > 0 ?
keys.getSecond()[i] : stopRow;
HRegionLocation hregionLocation = regionLocator.getRegionLocation( HRegionLocation hregionLocation = regionLocator.getRegionLocation(
keys.getFirst()[i], false); keys.getFirst()[i], false);
String regionHostname = hregionLocation.getHostname(); String regionHostname = hregionLocation.getHostname();
HRegionInfo regionInfo = hregionLocation.getRegionInfo(); HRegionInfo regionInfo = hregionLocation.getRegionInfo();
long regionSize = sizeCalculator.getRegionSize(
regionInfo.getRegionName());
// determine if the given start and stop keys fall into the range TableSplit split = new TableSplit(table.getName(),
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
(stopRow.length == 0 ||
Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
byte[] splitStart =
startRow.length == 0 ||
Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys
.getFirst()[i] : startRow;
byte[] splitStop =
(stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i],
stopRow) <= 0) && keys.getSecond()[i].length > 0 ? keys
.getSecond()[i] : stopRow;
long regionSize = sizeCalculator.getRegionSize(regionInfo.getRegionName());
TableSplit split =
new TableSplit(regionLocator.getName(),
scan, splitStart, splitStop, regionHostname, regionSize); scan, splitStart, splitStop, regionHostname, regionSize);
splits.add(split); splits.add(split);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("getSplits: split -> " + (count++) + " -> " + split); LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
} }
} }
}
} finally { } finally {
if (null != table) table.close(); if (null != table) table.close();
if (null != regionLocator) regionLocator.close(); if (null != regionLocator) regionLocator.close();
if (null != conn) conn.close(); if (null != conn) conn.close();
} }
} }
return splits; return splits;
} }