HBASE-15396 Enhance mapreduce.TableSplit to add encoded region name
Signed-off-by: Sean Busbey <busbey@apache.org>
This commit is contained in:
parent
6905d272d3
commit
7d3a89ce8e
|
@ -223,11 +223,13 @@ public abstract class MultiTableInputFormatBase extends
|
||||||
keys.getFirst()[i], false);
|
keys.getFirst()[i], false);
|
||||||
String regionHostname = hregionLocation.getHostname();
|
String regionHostname = hregionLocation.getHostname();
|
||||||
HRegionInfo regionInfo = hregionLocation.getRegionInfo();
|
HRegionInfo regionInfo = hregionLocation.getRegionInfo();
|
||||||
|
String encodedRegionName = regionInfo.getEncodedName();
|
||||||
long regionSize = sizeCalculator.getRegionSize(
|
long regionSize = sizeCalculator.getRegionSize(
|
||||||
regionInfo.getRegionName());
|
regionInfo.getRegionName());
|
||||||
|
|
||||||
TableSplit split = new TableSplit(table.getName(),
|
TableSplit split = new TableSplit(table.getName(),
|
||||||
scan, splitStart, splitStop, regionHostname, regionSize);
|
scan, splitStart, splitStop, regionHostname,
|
||||||
|
encodedRegionName, regionSize);
|
||||||
|
|
||||||
splits.add(split);
|
splits.add(split);
|
||||||
|
|
||||||
|
|
|
@ -300,9 +300,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
keys.getSecond()[i] : stopRow;
|
keys.getSecond()[i] : stopRow;
|
||||||
|
|
||||||
byte[] regionName = location.getRegionInfo().getRegionName();
|
byte[] regionName = location.getRegionInfo().getRegionName();
|
||||||
|
String encodedRegionName = location.getRegionInfo().getEncodedName();
|
||||||
long regionSize = sizeCalculator.getRegionSize(regionName);
|
long regionSize = sizeCalculator.getRegionSize(regionName);
|
||||||
TableSplit split = new TableSplit(tableName, scan,
|
TableSplit split = new TableSplit(tableName, scan,
|
||||||
splitStart, splitStop, regionLocation, regionSize);
|
splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
|
||||||
splits.add(split);
|
splits.add(split);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("getSplits: split -> " + i + " -> " + split);
|
LOG.debug("getSplits: split -> " + i + " -> " + split);
|
||||||
|
@ -382,6 +383,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
TableSplit ts = (TableSplit)list.get(count);
|
TableSplit ts = (TableSplit)list.get(count);
|
||||||
TableName tableName = ts.getTable();
|
TableName tableName = ts.getTable();
|
||||||
String regionLocation = ts.getRegionLocation();
|
String regionLocation = ts.getRegionLocation();
|
||||||
|
String encodedRegionName = ts.getEncodedRegionName();
|
||||||
long regionSize = ts.getLength();
|
long regionSize = ts.getLength();
|
||||||
if (regionSize >= dataSkewThreshold) {
|
if (regionSize >= dataSkewThreshold) {
|
||||||
// if the current region size is larger than the data skew threshold,
|
// if the current region size is larger than the data skew threshold,
|
||||||
|
@ -390,9 +392,9 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
//Set the size of child TableSplit as 1/2 of the region size. The exact size of the
|
//Set the size of child TableSplit as 1/2 of the region size. The exact size of the
|
||||||
// MapReduce input splits is not far off.
|
// MapReduce input splits is not far off.
|
||||||
TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey, regionLocation,
|
TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey, regionLocation,
|
||||||
regionSize / 2);
|
encodedRegionName, regionSize / 2);
|
||||||
TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation,
|
TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation,
|
||||||
regionSize - regionSize / 2);
|
encodedRegionName, regionSize - regionSize / 2);
|
||||||
resultList.add(t1);
|
resultList.add(t1);
|
||||||
resultList.add(t2);
|
resultList.add(t2);
|
||||||
count++;
|
count++;
|
||||||
|
@ -419,7 +421,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey,
|
TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey,
|
||||||
regionLocation, totalSize);
|
regionLocation, encodedRegionName, totalSize);
|
||||||
resultList.add(t);
|
resultList.add(t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,9 @@ implements Writable, Comparable<TableSplit> {
|
||||||
enum Version {
|
enum Version {
|
||||||
UNVERSIONED(0),
|
UNVERSIONED(0),
|
||||||
// Initial number we put on TableSplit when we introduced versioning.
|
// Initial number we put on TableSplit when we introduced versioning.
|
||||||
INITIAL(-1);
|
INITIAL(-1),
|
||||||
|
// Added an encoded region name field for easier identification of split -> region
|
||||||
|
WITH_ENCODED_REGION_NAME(-2);
|
||||||
|
|
||||||
final int code;
|
final int code;
|
||||||
static final Version[] byCode;
|
static final Version[] byCode;
|
||||||
|
@ -78,11 +80,12 @@ implements Writable, Comparable<TableSplit> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final Version VERSION = Version.INITIAL;
|
private static final Version VERSION = Version.WITH_ENCODED_REGION_NAME;
|
||||||
private TableName tableName;
|
private TableName tableName;
|
||||||
private byte [] startRow;
|
private byte [] startRow;
|
||||||
private byte [] endRow;
|
private byte [] endRow;
|
||||||
private String regionLocation;
|
private String regionLocation;
|
||||||
|
private String encodedRegionName = "";
|
||||||
private String scan = ""; // stores the serialized form of the Scan
|
private String scan = ""; // stores the serialized form of the Scan
|
||||||
private long length; // Contains estimation of region size in bytes
|
private long length; // Contains estimation of region size in bytes
|
||||||
|
|
||||||
|
@ -95,6 +98,7 @@ implements Writable, Comparable<TableSplit> {
|
||||||
/**
|
/**
|
||||||
* Creates a new instance while assigning all variables.
|
* Creates a new instance while assigning all variables.
|
||||||
* Length of region is set to 0
|
* Length of region is set to 0
|
||||||
|
* Encoded name of the region is set to the empty string
|
||||||
*
|
*
|
||||||
* @param tableName The name of the current table.
|
* @param tableName The name of the current table.
|
||||||
* @param scan The scan associated with this split.
|
* @param scan The scan associated with this split.
|
||||||
|
@ -109,6 +113,7 @@ implements Writable, Comparable<TableSplit> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new instance while assigning all variables.
|
* Creates a new instance while assigning all variables.
|
||||||
|
* Encoded name of the region is set to the empty string
|
||||||
*
|
*
|
||||||
* @param tableName The name of the current table.
|
* @param tableName The name of the current table.
|
||||||
* @param scan The scan associated with this split.
|
* @param scan The scan associated with this split.
|
||||||
|
@ -118,6 +123,21 @@ implements Writable, Comparable<TableSplit> {
|
||||||
*/
|
*/
|
||||||
public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
|
public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
|
||||||
final String location, long length) {
|
final String location, long length) {
|
||||||
|
this(tableName, scan, startRow, endRow, location, "", length);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new instance while assigning all variables.
|
||||||
|
*
|
||||||
|
* @param tableName The name of the current table.
|
||||||
|
* @param scan The scan associated with this split.
|
||||||
|
* @param startRow The start row of the split.
|
||||||
|
* @param endRow The end row of the split.
|
||||||
|
* @param encodedRegionName The encoded name of the region.
|
||||||
|
* @param location The location of the region.
|
||||||
|
*/
|
||||||
|
public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
|
||||||
|
final String location, final String encodedRegionName, long length) {
|
||||||
this.tableName = tableName;
|
this.tableName = tableName;
|
||||||
try {
|
try {
|
||||||
this.scan =
|
this.scan =
|
||||||
|
@ -128,11 +148,13 @@ implements Writable, Comparable<TableSplit> {
|
||||||
this.startRow = startRow;
|
this.startRow = startRow;
|
||||||
this.endRow = endRow;
|
this.endRow = endRow;
|
||||||
this.regionLocation = location;
|
this.regionLocation = location;
|
||||||
|
this.encodedRegionName = encodedRegionName;
|
||||||
this.length = length;
|
this.length = length;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new instance without a scanner.
|
* Creates a new instance without a scanner.
|
||||||
|
* Length of region is set to 0
|
||||||
*
|
*
|
||||||
* @param tableName The name of the current table.
|
* @param tableName The name of the current table.
|
||||||
* @param startRow The start row of the split.
|
* @param startRow The start row of the split.
|
||||||
|
@ -227,6 +249,15 @@ implements Writable, Comparable<TableSplit> {
|
||||||
return new String[] {regionLocation};
|
return new String[] {regionLocation};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the region's encoded name.
|
||||||
|
*
|
||||||
|
* @return The region's encoded name.
|
||||||
|
*/
|
||||||
|
public String getEncodedRegionName() {
|
||||||
|
return encodedRegionName;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the length of the split.
|
* Returns the length of the split.
|
||||||
*
|
*
|
||||||
|
@ -271,6 +302,9 @@ implements Writable, Comparable<TableSplit> {
|
||||||
scan = Bytes.toString(Bytes.readByteArray(in));
|
scan = Bytes.toString(Bytes.readByteArray(in));
|
||||||
}
|
}
|
||||||
length = WritableUtils.readVLong(in);
|
length = WritableUtils.readVLong(in);
|
||||||
|
if (version.atLeast(Version.WITH_ENCODED_REGION_NAME)) {
|
||||||
|
encodedRegionName = Bytes.toString(Bytes.readByteArray(in));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -288,6 +322,7 @@ implements Writable, Comparable<TableSplit> {
|
||||||
Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
|
Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
|
||||||
Bytes.writeByteArray(out, Bytes.toBytes(scan));
|
Bytes.writeByteArray(out, Bytes.toBytes(scan));
|
||||||
WritableUtils.writeVLong(out, length);
|
WritableUtils.writeVLong(out, length);
|
||||||
|
Bytes.writeByteArray(out, Bytes.toBytes(encodedRegionName));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -316,6 +351,7 @@ implements Writable, Comparable<TableSplit> {
|
||||||
sb.append(", start row: ").append(Bytes.toStringBinary(startRow));
|
sb.append(", start row: ").append(Bytes.toStringBinary(startRow));
|
||||||
sb.append(", end row: ").append(Bytes.toStringBinary(endRow));
|
sb.append(", end row: ").append(Bytes.toStringBinary(endRow));
|
||||||
sb.append(", region location: ").append(regionLocation);
|
sb.append(", region location: ").append(regionLocation);
|
||||||
|
sb.append(", encoded region name: ").append(encodedRegionName);
|
||||||
sb.append(")");
|
sb.append(")");
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
@ -355,6 +391,7 @@ implements Writable, Comparable<TableSplit> {
|
||||||
result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
|
result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
|
||||||
result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
|
result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
|
||||||
result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
|
result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
|
||||||
|
result = 31 * result + (encodedRegionName != null ? encodedRegionName.hashCode() : 0);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,13 +93,31 @@ public class TestTableSplit {
|
||||||
"location");
|
"location");
|
||||||
String str =
|
String str =
|
||||||
"HBase table split(table name: table, scan: , start row: row-start, "
|
"HBase table split(table name: table, scan: , start row: row-start, "
|
||||||
+ "end row: row-end, region location: location)";
|
+ "end row: row-end, region location: location, "
|
||||||
|
+ "encoded region name: )";
|
||||||
|
Assert.assertEquals(str, split.toString());
|
||||||
|
|
||||||
|
split =
|
||||||
|
new TableSplit(TableName.valueOf("table"), null, "row-start".getBytes(),
|
||||||
|
"row-end".getBytes(), "location", "encoded-region-name", 1000L);
|
||||||
|
str =
|
||||||
|
"HBase table split(table name: table, scan: , start row: row-start, "
|
||||||
|
+ "end row: row-end, region location: location, "
|
||||||
|
+ "encoded region name: encoded-region-name)";
|
||||||
Assert.assertEquals(str, split.toString());
|
Assert.assertEquals(str, split.toString());
|
||||||
|
|
||||||
split = new TableSplit((TableName) null, null, null, null);
|
split = new TableSplit((TableName) null, null, null, null);
|
||||||
str =
|
str =
|
||||||
"HBase table split(table name: null, scan: , start row: null, "
|
"HBase table split(table name: null, scan: , start row: null, "
|
||||||
+ "end row: null, region location: null)";
|
+ "end row: null, region location: null, "
|
||||||
|
+ "encoded region name: )";
|
||||||
|
Assert.assertEquals(str, split.toString());
|
||||||
|
|
||||||
|
split = new TableSplit((TableName) null, null, null, null, null, null, 1000L);
|
||||||
|
str =
|
||||||
|
"HBase table split(table name: null, scan: , start row: null, "
|
||||||
|
+ "end row: null, region location: null, "
|
||||||
|
+ "encoded region name: null)";
|
||||||
Assert.assertEquals(str, split.toString());
|
Assert.assertEquals(str, split.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue