HBASE-24859: Optimize in-memory representation of HBase map reduce table splits (#2609)
Patch fixes the single table input format case.

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>

parent 7a41247ef7
commit 5abbda1969
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java

@@ -26,10 +26,6 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -52,6 +48,9 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -287,7 +286,7 @@ public abstract class TableInputFormatBase
    * Create one InputSplit per region
    *
    * @return The list of InputSplit for all the regions
-   * @throws IOException
+   * @throws IOException throws IOException
    */
   private List<InputSplit> oneInputSplitPerRegion() throws IOException {
     RegionSizeCalculator sizeCalculator =
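For context, oneInputSplitPerRegion() walks the table's region locations and emits one TableSplit per region. Below is a minimal sketch of that shape against the public client API; it is illustrative only (it assumes an open Connection and uses a fixed length of 0 instead of the private RegionSizeCalculator the real method uses):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.mapreduce.TableSplit;

public class OneSplitPerRegionSketch {
  // One scan-less split per region, in the spirit of oneInputSplitPerRegion().
  // Region sizes are hardcoded to 0 here; the real code estimates them.
  static List<TableSplit> oneSplitPerRegion(Connection conn, TableName table)
      throws IOException {
    List<TableSplit> splits = new ArrayList<>();
    try (RegionLocator locator = conn.getRegionLocator(table)) {
      for (HRegionLocation loc : locator.getAllRegionLocations()) {
        splits.add(new TableSplit(table, null, loc.getRegion().getStartKey(),
            loc.getRegion().getEndKey(), loc.getHostname(),
            loc.getRegion().getEncodedName(), 0L));
      }
    }
    return splits;
  }
}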
@@ -305,7 +304,10 @@ public abstract class TableInputFormatBase
       }
       List<InputSplit> splits = new ArrayList<>(1);
       long regionSize = sizeCalculator.getRegionSize(regLoc.getRegion().getRegionName());
-      TableSplit split = new TableSplit(tableName, scan,
+      // In the table input format for single table we do not need to
+      // store the scan object in table split because it can be memory intensive and redundant
+      // information to what is already stored in conf SCAN. See HBASE-25212
+      TableSplit split = new TableSplit(tableName, null,
           HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
               .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
       splits.add(split);
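Passing null here is safe because, for the single-table input format, the Scan is already serialized once into the job configuration under the TableInputFormat.SCAN key, so every task can rebuild it from the conf rather than from its split. A minimal sketch of that round trip (the Scan settings below are made-up values):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanViaConfSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Job setup serializes the Scan once into the conf ...
    Scan scan = new Scan().addFamily(Bytes.toBytes("cf")).setCaching(500);
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));
    // ... so any task can rebuild it without the split carrying a copy.
    Scan rebuilt = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    System.out.println("caching after round trip: " + rebuilt.getCaching()); // 500
  }
}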
@@ -345,7 +347,10 @@ public abstract class TableInputFormatBase
       byte[] regionName = location.getRegion().getRegionName();
       String encodedRegionName = location.getRegion().getEncodedName();
       long regionSize = sizeCalculator.getRegionSize(regionName);
-      TableSplit split = new TableSplit(tableName, scan,
+      // In the table input format for single table we do not need to
+      // store the scan object in table split because it can be memory intensive and redundant
+      // information to what is already stored in conf SCAN. See HBASE-25212
+      TableSplit split = new TableSplit(tableName, null,
           splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
       splits.add(split);
       if (LOG.isDebugEnabled()) {
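A TableSplit built with a null Scan stores the empty string internally, which is exactly what the new getScanAsString() accessor (added further down in this commit) exposes. A small sketch; the table name, row keys, host, and encoded region name are hypothetical values:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanlessSplitSketch {
  public static void main(String[] args) {
    // null scan => the split keeps an empty scan string instead of a
    // full per-split serialized copy of the Scan.
    TableSplit split = new TableSplit(TableName.valueOf("demo"), null,
        Bytes.toBytes("row-a"), Bytes.toBytes("row-m"),
        "rs1.example.com", "0123abcd", 1024L);
    System.out.println(split.getScanAsString().isEmpty()); // true
    System.out.println(split.getLength()); // 1024, the estimated region size
  }
}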
@@ -362,7 +367,7 @@ public abstract class TableInputFormatBase
    * @param n Number of ranges after splitting. Pass 1 means no split for the range
    *          Pass 2 if you want to split the range in two;
    * @return A list of TableSplit, the size of the list is n
-   * @throws IllegalArgumentIOException
+   * @throws IllegalArgumentIOException throws IllegalArgumentIOException
    */
   protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n)
       throws IllegalArgumentIOException {
@@ -409,9 +414,12 @@ public abstract class TableInputFormatBase
     // Split Region into n chunks evenly
     byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1);
     for (int i = 0; i < splitKeys.length - 1; i++) {
+      // In the table input format for single table we do not need to
+      // store the scan object in table split because it can be memory intensive and redundant
+      // information to what is already stored in conf SCAN. See HBASE-25212
       //notice that the regionSize parameter may be not very accurate
       TableSplit tsplit =
-          new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation,
+          new TableSplit(tableName, null, splitKeys[i], splitKeys[i + 1], regionLocation,
               encodedRegionName, regionSize / n);
       res.add(tsplit);
     }
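Bytes.split(startRow, endRow, true, n - 1) returns n + 1 boundary keys (the two range ends plus n - 1 intermediate keys), and each adjacent pair becomes one of the n uniform sub-splits, which is why the loop runs to splitKeys.length - 1. A standalone illustration with made-up row keys:

import org.apache.hadoop.hbase.util.Bytes;

public class UniformRangeSplitSketch {
  public static void main(String[] args) {
    int n = 4; // number of ranges wanted
    // Same call shape as createNInputSplitsUniform: n - 1 intermediate keys.
    byte[][] keys = Bytes.split(Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), true, n - 1);
    for (int i = 0; i < keys.length - 1; i++) {
      System.out.println(Bytes.toStringBinary(keys[i]) + " .. "
          + Bytes.toStringBinary(keys[i + 1]));
    }
  }
}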
@@ -488,7 +496,10 @@ public abstract class TableInputFormatBase
         }
       }
       i = j - 1;
-      TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation,
+      // In the table input format for single table we do not need to
+      // store the scan object in table split because it can be memory intensive and redundant
+      // information to what is already stored in conf SCAN. See HBASE-25212
+      TableSplit t = new TableSplit(tableName, null, splitStartKey, splitEndKey, regionLocation,
           encodedRegionName, totalSize);
       resultList.add(t);
     }
@@ -508,7 +519,9 @@ public abstract class TableInputFormatBase
         // reverse DNS using jndi doesn't work well with ipv6 addresses.
         ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
       }
-      if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
+      if (ipAddressString == null) {
+        throw new UnknownHostException("No host found for " + ipAddress);
+      }
       hostName = Strings.domainNamePointerToHostName(ipAddressString);
       this.reverseDNSCacheMap.put(ipAddress, hostName);
     }
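The surrounding method caches reverse-DNS results per address, so each distinct region server host is resolved only once while computing splits; the hunk above merely braces the one-line if. A plain-JDK sketch of the caching pattern (the trailing-dot normalization done by Strings.domainNamePointerToHostName in the real code is elided):

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

public class ReverseDnsCacheSketch {
  // One lookup per distinct address; repeated regions on the same host hit the cache.
  private final Map<InetAddress, String> reverseDNSCacheMap = new HashMap<>();

  public String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
    String hostName = reverseDNSCacheMap.get(ipAddress);
    if (hostName == null) {
      String resolved = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
      if (resolved == null) {
        throw new UnknownHostException("No host found for " + ipAddress);
      }
      hostName = resolved; // real code also normalizes trailing dots here
      reverseDNSCacheMap.put(ipAddress, hostName);
    }
    return hostName;
  }
}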
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java

@@ -22,17 +22,16 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A table split corresponds to a key range (low, high) and an optional scanner.
@@ -40,7 +39,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
  */
 @InterfaceAudience.Public
 public class TableSplit extends InputSplit
-implements Writable, Comparable<TableSplit> {
+    implements Writable, Comparable<TableSplit> {
   /** @deprecated LOG variable would be made private. fix in hbase 3.0 */
   @Deprecated
   public static final Logger LOG = LoggerFactory.getLogger(TableSplit.class);
@@ -84,6 +83,16 @@ implements Writable, Comparable<TableSplit> {
   private byte [] endRow;
   private String regionLocation;
   private String encodedRegionName = "";
+
+  /**
+   * The scan object may be null but the serialized form of scan is never null
+   * or empty since we serialize the scan object with default values then.
+   * Having no scanner in TableSplit doesn't necessarily mean there is no scanner
+   * for mapreduce job, it just means that we do not need to set it for each split.
+   * For example, it is not required to have a scan object for
+   * {@link org.apache.hadoop.hbase.mapred.TableInputFormatBase} since we use the scan from the
+   * job conf and scanner is supposed to be same for all the splits of table.
+   */
   private String scan = ""; // stores the serialized form of the Scan
   private long length; // Contains estimation of region size in bytes
 
@@ -182,12 +191,23 @@ implements Writable, Comparable<TableSplit> {
    * Returns a Scan object from the stored string representation.
    *
    * @return Returns a Scan object based on the stored scanner.
-   * @throws IOException
+   * @throws IOException throws IOException if deserialization fails
    */
   public Scan getScan() throws IOException {
     return TableMapReduceUtil.convertStringToScan(this.scan);
   }
 
+  /**
+   * Returns a scan string
+   * @return scan as string. Should be noted that this is not same as getScan().toString()
+   *         because Scan object will have the default values when empty scan string is
+   *         deserialized. Thus, getScan().toString() can never be empty
+   */
+  @InterfaceAudience.Private
+  public String getScanAsString() {
+    return this.scan;
+  }
+
   /**
    * Returns the table name converted to a byte array.
    * @see #getTable()
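The distinction the new javadoc draws matters because deserializing even an empty scan string yields a fully populated default Scan, so getScan() can never signal "no scan stored"; only getScanAsString() can. A quick demonstration (assuming convertStringToScan accepts the empty string, which is what the scan-less single-table path relies on):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

public class EmptyScanStringSketch {
  public static void main(String[] args) throws Exception {
    String stored = ""; // what a scan-less TableSplit keeps internally
    // Deserialization fills in defaults, so the result is a real Scan object ...
    Scan scan = TableMapReduceUtil.convertStringToScan(stored);
    System.out.println(scan.toString().isEmpty()); // false: defaults are printed
    // ... which is why emptiness must be checked on the string, not the Scan.
    System.out.println(stored.isEmpty()); // true
  }
}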
hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.mapreduce;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -86,7 +85,7 @@ public abstract class TestTableInputFormatScanBase {
    * Pass the key and value to reduce.
    */
   public static class ScanMapper
-  extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+      extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
 
     /**
      * Pass the key and value to reduce.
@@ -99,7 +98,7 @@ public abstract class TestTableInputFormatScanBase {
     @Override
     public void map(ImmutableBytesWritable key, Result value,
       Context context)
-    throws IOException, InterruptedException {
+        throws IOException, InterruptedException {
       if (value.size() != 2) {
         throw new IOException("There should be two input columns");
       }
@@ -123,7 +122,7 @@ public abstract class TestTableInputFormatScanBase {
    * Checks the last and first key seen against the scanner boundaries.
    */
   public static class ScanReducer
-  extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
+      extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
       NullWritable, NullWritable> {
 
     private String first = null;
@@ -131,7 +130,7 @@ public abstract class TestTableInputFormatScanBase {
 
     protected void reduce(ImmutableBytesWritable key,
       Iterable<ImmutableBytesWritable> values, Context context)
-    throws IOException ,InterruptedException {
+        throws IOException ,InterruptedException {
       int count = 0;
       for (ImmutableBytesWritable value : values) {
         String val = Bytes.toStringBinary(value.get());
@@ -144,7 +143,7 @@ public abstract class TestTableInputFormatScanBase {
     }
 
     protected void cleanup(Context context)
-    throws IOException, InterruptedException {
+        throws IOException, InterruptedException {
       Configuration c = context.getConfiguration();
       String startRow = c.get(KEY_STARTROW);
       String lastRow = c.get(KEY_LASTROW);
@@ -249,6 +248,12 @@ public abstract class TestTableInputFormatScanBase {
     tif.setConf(job.getConfiguration());
     Assert.assertEquals(TABLE_NAME, table.getName());
     List<InputSplit> splits = tif.getSplits(job);
+    for (InputSplit split : splits) {
+      TableSplit tableSplit = (TableSplit) split;
+      // In table input format, we do not store the scanner at the split level
+      // because we use the scan object from the map-reduce job conf itself.
+      Assert.assertTrue(tableSplit.getScanAsString().isEmpty());
+    }
     Assert.assertEquals(expectedNumOfSplits, splits.size());
   }
 