HBASE-15396 Enhance mapreduce.TableSplit to add encoded region name

(cherry picked from commit 7d3a89ce8e)

Signed-off-by: Sean Busbey <busbey@apache.org>
This commit is contained in:
Harsh J 2016-04-01 15:20:47 +05:30 committed by Sean Busbey
parent e6a32141b3
commit ff075fd9df
4 changed files with 77 additions and 18 deletions

View File

@ -223,11 +223,13 @@ public abstract class MultiTableInputFormatBase extends
keys.getFirst()[i], false); keys.getFirst()[i], false);
String regionHostname = hregionLocation.getHostname(); String regionHostname = hregionLocation.getHostname();
HRegionInfo regionInfo = hregionLocation.getRegionInfo(); HRegionInfo regionInfo = hregionLocation.getRegionInfo();
String encodedRegionName = regionInfo.getEncodedName();
long regionSize = sizeCalculator.getRegionSize( long regionSize = sizeCalculator.getRegionSize(
regionInfo.getRegionName()); regionInfo.getRegionName());
TableSplit split = new TableSplit(table.getName(), TableSplit split = new TableSplit(table.getName(),
scan, splitStart, splitStop, regionHostname, regionSize); scan, splitStart, splitStop, regionHostname,
encodedRegionName, regionSize);
splits.add(split); splits.add(split);

View File

@ -306,9 +306,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
keys.getSecond()[i] : stopRow; keys.getSecond()[i] : stopRow;
byte[] regionName = location.getRegionInfo().getRegionName(); byte[] regionName = location.getRegionInfo().getRegionName();
String encodedRegionName = location.getRegionInfo().getEncodedName();
long regionSize = sizeCalculator.getRegionSize(regionName); long regionSize = sizeCalculator.getRegionSize(regionName);
TableSplit split = new TableSplit(table.getName(), scan, TableSplit split = new TableSplit(table.getName(), scan,
splitStart, splitStop, regionLocation, regionSize); splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
splits.add(split); splits.add(split);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("getSplits: split -> " + i + " -> " + split); LOG.debug("getSplits: split -> " + i + " -> " + split);
@ -391,6 +392,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
while (count < list.size()) { while (count < list.size()) {
TableSplit ts = (TableSplit)list.get(count); TableSplit ts = (TableSplit)list.get(count);
String regionLocation = ts.getRegionLocation(); String regionLocation = ts.getRegionLocation();
String encodedRegionName = ts.getEncodedRegionName();
long regionSize = ts.getLength(); long regionSize = ts.getLength();
if (regionSize >= dataSkewThreshold) { if (regionSize >= dataSkewThreshold) {
// if the current region size is large than the data skew threshold, // if the current region size is large than the data skew threshold,
@ -399,9 +401,9 @@ extends InputFormat<ImmutableBytesWritable, Result> {
//Set the size of child TableSplit as 1/2 of the region size. The exact size of the //Set the size of child TableSplit as 1/2 of the region size. The exact size of the
// MapReduce input splits is not far off. // MapReduce input splits is not far off.
TableSplit t1 = new TableSplit(table.getName(), scan, ts.getStartRow(), splitKey, TableSplit t1 = new TableSplit(table.getName(), scan, ts.getStartRow(), splitKey,
regionLocation, regionSize / 2); regionLocation, encodedRegionName, regionSize / 2);
TableSplit t2 = new TableSplit(table.getName(), scan, splitKey, ts.getEndRow(), TableSplit t2 = new TableSplit(table.getName(), scan, splitKey, ts.getEndRow(),
regionLocation, regionSize - regionSize / 2); regionLocation, encodedRegionName, regionSize - regionSize / 2);
resultList.add(t1); resultList.add(t1);
resultList.add(t2); resultList.add(t2);
count++; count++;
@ -428,7 +430,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
} }
} }
TableSplit t = new TableSplit(table.getName(), scan, splitStartKey, splitEndKey, TableSplit t = new TableSplit(table.getName(), scan, splitStartKey, splitEndKey,
regionLocation, totalSize); regionLocation, encodedRegionName, totalSize);
resultList.add(t); resultList.add(t);
} }
} }

View File

@ -52,7 +52,9 @@ implements Writable, Comparable<TableSplit> {
enum Version { enum Version {
UNVERSIONED(0), UNVERSIONED(0),
// Initial number we put on TableSplit when we introduced versioning. // Initial number we put on TableSplit when we introduced versioning.
INITIAL(-1); INITIAL(-1),
// Added an encoded region name field for easier identification of split -> region
WITH_ENCODED_REGION_NAME(-2);
final int code; final int code;
static final Version[] byCode; static final Version[] byCode;
@ -78,11 +80,12 @@ implements Writable, Comparable<TableSplit> {
} }
} }
private static final Version VERSION = Version.INITIAL; private static final Version VERSION = Version.WITH_ENCODED_REGION_NAME;
private TableName tableName; private TableName tableName;
private byte [] startRow; private byte [] startRow;
private byte [] endRow; private byte [] endRow;
private String regionLocation; private String regionLocation;
private String encodedRegionName = "";
private String scan = ""; // stores the serialized form of the Scan private String scan = ""; // stores the serialized form of the Scan
private long length; // Contains estimation of region size in bytes private long length; // Contains estimation of region size in bytes
@ -107,6 +110,7 @@ implements Writable, Comparable<TableSplit> {
/** /**
* Creates a new instance while assigning all variables. * Creates a new instance while assigning all variables.
* Length of region is set to 0 * Length of region is set to 0
* Encoded name of the region is set to blank
* *
* @param tableName The name of the current table. * @param tableName The name of the current table.
* @param scan The scan associated with this split. * @param scan The scan associated with this split.
@ -121,6 +125,7 @@ implements Writable, Comparable<TableSplit> {
/** /**
* Creates a new instance while assigning all variables. * Creates a new instance while assigning all variables.
* Encoded name of region is set to blank
* *
* @param tableName The name of the current table. * @param tableName The name of the current table.
* @param scan The scan associated with this split. * @param scan The scan associated with this split.
@ -130,6 +135,21 @@ implements Writable, Comparable<TableSplit> {
*/ */
public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow, public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
final String location, long length) { final String location, long length) {
this(tableName, scan, startRow, endRow, location, "", length);
}
/**
* Creates a new instance while assigning all variables.
*
* @param tableName The name of the current table.
* @param scan The scan associated with this split.
* @param startRow The start row of the split.
* @param endRow The end row of the split.
* @param encodedRegionName The region ID.
* @param location The location of the region.
*/
public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
final String location, final String encodedRegionName, long length) {
this.tableName = tableName; this.tableName = tableName;
try { try {
this.scan = this.scan =
@ -140,6 +160,7 @@ implements Writable, Comparable<TableSplit> {
this.startRow = startRow; this.startRow = startRow;
this.endRow = endRow; this.endRow = endRow;
this.regionLocation = location; this.regionLocation = location;
this.encodedRegionName = encodedRegionName;
this.length = length; this.length = length;
} }
@ -157,6 +178,7 @@ implements Writable, Comparable<TableSplit> {
/** /**
* Creates a new instance without a scanner. * Creates a new instance without a scanner.
* Length of region is set to 0
* *
* @param tableName The name of the current table. * @param tableName The name of the current table.
* @param startRow The start row of the split. * @param startRow The start row of the split.
@ -251,6 +273,15 @@ implements Writable, Comparable<TableSplit> {
return new String[] {regionLocation}; return new String[] {regionLocation};
} }
/**
* Returns the region's encoded name.
*
* @return The region's encoded name.
*/
public String getEncodedRegionName() {
return encodedRegionName;
}
/** /**
* Returns the length of the split. * Returns the length of the split.
* *
@ -295,6 +326,9 @@ implements Writable, Comparable<TableSplit> {
scan = Bytes.toString(Bytes.readByteArray(in)); scan = Bytes.toString(Bytes.readByteArray(in));
} }
length = WritableUtils.readVLong(in); length = WritableUtils.readVLong(in);
if (version.atLeast(Version.WITH_ENCODED_REGION_NAME)) {
encodedRegionName = Bytes.toString(Bytes.readByteArray(in));
}
} }
/** /**
@ -312,6 +346,7 @@ implements Writable, Comparable<TableSplit> {
Bytes.writeByteArray(out, Bytes.toBytes(regionLocation)); Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
Bytes.writeByteArray(out, Bytes.toBytes(scan)); Bytes.writeByteArray(out, Bytes.toBytes(scan));
WritableUtils.writeVLong(out, length); WritableUtils.writeVLong(out, length);
Bytes.writeByteArray(out, Bytes.toBytes(encodedRegionName));
} }
/** /**
@ -340,6 +375,7 @@ implements Writable, Comparable<TableSplit> {
sb.append(", start row: ").append(Bytes.toStringBinary(startRow)); sb.append(", start row: ").append(Bytes.toStringBinary(startRow));
sb.append(", end row: ").append(Bytes.toStringBinary(endRow)); sb.append(", end row: ").append(Bytes.toStringBinary(endRow));
sb.append(", region location: ").append(regionLocation); sb.append(", region location: ").append(regionLocation);
sb.append(", encoded region name: ").append(encodedRegionName);
sb.append(")"); sb.append(")");
return sb.toString(); return sb.toString();
} }
@ -372,13 +408,14 @@ implements Writable, Comparable<TableSplit> {
regionLocation.equals(((TableSplit)o).regionLocation); regionLocation.equals(((TableSplit)o).regionLocation);
} }
@Override @Override
public int hashCode() { public int hashCode() {
int result = tableName != null ? tableName.hashCode() : 0; int result = tableName != null ? tableName.hashCode() : 0;
result = 31 * result + (scan != null ? scan.hashCode() : 0); result = 31 * result + (scan != null ? scan.hashCode() : 0);
result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0); result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0); result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0); result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
return result; result = 31 * result + (encodedRegionName != null ? encodedRegionName.hashCode() : 0);
} return result;
}
} }

View File

@ -92,13 +92,31 @@ public class TestTableSplit {
"location"); "location");
String str = String str =
"HBase table split(table name: table, scan: , start row: row-start, " "HBase table split(table name: table, scan: , start row: row-start, "
+ "end row: row-end, region location: location)"; + "end row: row-end, region location: location, "
+ "encoded region name: )";
Assert.assertEquals(str, split.toString());
split =
new TableSplit(TableName.valueOf("table"), null, "row-start".getBytes(),
"row-end".getBytes(), "location", "encoded-region-name", 1000L);
str =
"HBase table split(table name: table, scan: , start row: row-start, "
+ "end row: row-end, region location: location, "
+ "encoded region name: encoded-region-name)";
Assert.assertEquals(str, split.toString()); Assert.assertEquals(str, split.toString());
split = new TableSplit((TableName) null, null, null, null); split = new TableSplit((TableName) null, null, null, null);
str = str =
"HBase table split(table name: null, scan: , start row: null, " "HBase table split(table name: null, scan: , start row: null, "
+ "end row: null, region location: null)"; + "end row: null, region location: null, "
+ "encoded region name: )";
Assert.assertEquals(str, split.toString());
split = new TableSplit((TableName) null, null, null, null, null, null, 1000L);
str =
"HBase table split(table name: null, scan: , start row: null, "
+ "end row: null, region location: null, "
+ "encoded region name: null)";
Assert.assertEquals(str, split.toString()); Assert.assertEquals(str, split.toString());
} }
} }