HBASE-24859: Optimize in-memory representation of HBase map reduce table splits (#2591)

Patch fixes the single table input format case.

Signed-off-by: Michael Stack <stack@apache.org>
Signed-off-by: Reid Chan <reidchan@apache.org>
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
Sandeep Pal 2020-10-30 12:08:56 -07:00 committed by GitHub
parent 8813b3bfc1
commit 3238abcfce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 11 deletions

View File

@ -323,7 +323,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
}
List<InputSplit> splits = new ArrayList<>(1);
long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
TableSplit split = new TableSplit(tableName, scan,
// In the table input format for single table we do not need to
// store the scan object in table split because it can be memory intensive and redundant
// information to what is already stored in conf SCAN. See HBASE-25212
TableSplit split = new TableSplit(tableName, null,
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
.getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
splits.add(split);
@ -363,8 +366,11 @@ extends InputFormat<ImmutableBytesWritable, Result> {
byte[] regionName = location.getRegionInfo().getRegionName();
String encodedRegionName = location.getRegionInfo().getEncodedName();
long regionSize = sizeCalculator.getRegionSize(regionName);
TableSplit split = new TableSplit(tableName, scan,
splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
// In the table input format for single table we do not need to
// store the scan object in table split because it can be memory intensive and redundant
// information to what is already stored in conf SCAN. See HBASE-25212
TableSplit split = new TableSplit(tableName, null, splitStart, splitStop,
regionLocation, encodedRegionName, regionSize);
splits.add(split);
if (LOG.isDebugEnabled()) {
LOG.debug("getSplits: split -> " + i + " -> " + split);
@ -427,9 +433,12 @@ extends InputFormat<ImmutableBytesWritable, Result> {
// Split Region into n chunks evenly
byte[][] splitKeys = Bytes.split(startRow, endRow, true, n-1);
for (int i = 0; i < splitKeys.length - 1; i++) {
// In the table input format for single table we do not need to
// store the scan object in table split because it can be memory intensive and redundant
// information to what is already stored in conf SCAN. See HBASE-25212
//notice that the regionSize parameter may be not very accurate
TableSplit tsplit =
new TableSplit(tableName, scan, splitKeys[i], splitKeys[i + 1], regionLocation,
new TableSplit(tableName, null, splitKeys[i], splitKeys[i + 1], regionLocation,
encodedRegionName, regionSize / n);
res.add(tsplit);
}
@ -527,7 +536,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
}
}
i = j - 1;
TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, regionLocation,
// In the table input format for single table, we do not need to
// store the scan object in table split because it can be memory intensive and redundant
// information to what is already stored in conf SCAN. See HBASE-25212
TableSplit t = new TableSplit(tableName, null, splitStartKey, splitEndKey, regionLocation,
encodedRegionName, totalSize);
resultList.add(t);
}

View File

@ -25,10 +25,10 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
@ -42,7 +42,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TableSplit extends InputSplit
implements Writable, Comparable<TableSplit> {
implements Writable, Comparable<TableSplit> {
/** @deprecated LOG variable would be made private. */
@Deprecated
public static final Log LOG = LogFactory.getLog(TableSplit.class);
@ -86,7 +86,18 @@ implements Writable, Comparable<TableSplit> {
private byte [] endRow;
private String regionLocation;
private String encodedRegionName = "";
/**
* The scan object may be null but the serialized form of scan is never null
* or empty since we serialize the scan object with default values then.
* Having no scanner in TableSplit doesn't necessarily mean there is no scanner
* for mapreduce job, it just means that we do not need to set it for each split.
* For example, it is not required to have a scan object for
* {@link org.apache.hadoop.hbase.mapred.TableInputFormatBase} since we use the scan from the
* job conf and scanner is supposed to be same for all the splits of table.
*/
private String scan = ""; // stores the serialized form of the Scan
private long length; // Contains estimation of region size in bytes
/** Default constructor. */
@ -208,12 +219,23 @@ implements Writable, Comparable<TableSplit> {
* Returns a Scan object from the stored string representation.
*
* @return Returns a Scan object based on the stored scanner.
* @throws IOException
* @throws IOException throws IOException if deserialization fails
*/
public Scan getScan() throws IOException {
return TableMapReduceUtil.convertStringToScan(this.scan);
}
/**
* Returns a scan string
* @return scan as string. Should be noted that this is not same as getScan().toString()
* because Scan object will have the default values when empty scan string is
* deserialized. Thus, getScan().toString() can never be empty
*/
@InterfaceAudience.Private
public String getScanAsString() {
return this.scan;
}
/**
* Returns the table name converted to a byte array.
* @see #getTable()

View File

@ -23,7 +23,11 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -37,7 +41,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
@ -271,6 +274,12 @@ public abstract class TestTableInputFormatScanBase {
TableInputFormat tif = new TableInputFormat();
tif.setConf(job.getConfiguration());
List<InputSplit> splits = tif.getSplits(job);
for (InputSplit split : splits) {
TableSplit tableSplit = (TableSplit) split;
// In table input format, we do no store the scanner at the split level
// because we use the scan object from the map-reduce job conf itself.
Assert.assertTrue(tableSplit.getScanAsString().isEmpty());
}
Assert.assertEquals(expectedNumOfSplits, splits.size());
}