HBASE-675 Report correct server hosting a table split for assignment to MR Jobs

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@718509 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2008-11-18 06:56:30 +00:00
parent 9b4ca73d59
commit 83cef9ee78
4 changed files with 46 additions and 27 deletions

CHANGES.txt

@@ -126,6 +126,8 @@ Release 0.19.0 - Unreleased
   HBASE-938   Major compaction period is not checked periodically
   HBASE-947   [Optimization] Major compaction should remove deletes as well as
               the deleted cell
+  HBASE-675   Report correct server hosting a table split for assignment to
+              MR Jobs

  NEW FEATURES
   HBASE-875   Use MurmurHash instead of JenkinsHash [in bloomfilters]

src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java

@@ -263,7 +263,7 @@ implements InputFormat<ImmutableBytesWritable, RowResult> {
    * {@link InputSplit} array.
    *
    * @param job the map task {@link JobConf}
-   * @param numSplits a hint to calculate the number of splits
+   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
    *
    * @return the input splits
    *
@@ -280,24 +280,23 @@ implements InputFormat<ImmutableBytesWritable, RowResult> {
     if (this.inputColumns == null || this.inputColumns.length == 0) {
       throw new IOException("Expecting at least one column");
     }
-    int realNumSplits = numSplits > startKeys.length ? startKeys.length
-        : numSplits;
+    int realNumSplits = numSplits > startKeys.length? startKeys.length:
+      numSplits;
     InputSplit[] splits = new InputSplit[realNumSplits];
     int middle = startKeys.length / realNumSplits;
     int startPos = 0;
     for (int i = 0; i < realNumSplits; i++) {
       int lastPos = startPos + middle;
       lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+      String regionLocation = table.getRegionLocation(startKeys[startPos]).
+        getServerAddress().getHostname();
       splits[i] = new TableSplit(this.table.getTableName(),
-        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]
-          : HConstants.EMPTY_START_ROW);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("split: " + i + "->" + splits[i]);
-      }
+        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
+          HConstants.EMPTY_START_ROW, regionLocation);
+      LOG.info("split: " + i + "->" + splits[i]);
       startPos = lastPos;
     }
     return splits;
   }

   /**

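To make the split arithmetic above concrete, here is a hedged, standalone sketch of the same computation (the class name and all numbers are hypothetical): with 10 region start keys and a mapred.map.tasks hint of 4, getSplits produces four splits covering 3, 3, 2, and 2 regions, so a job that wants a map per region should set mapred.map.tasks to at least the region count.

public class SplitMath {
  public static void main(String[] args) {
    int numRegions = 10; // stands in for startKeys.length
    int numSplits = 4;   // stands in for the mapred.map.tasks hint
    int realNumSplits = numSplits > numRegions ? numRegions : numSplits;
    int middle = numRegions / realNumSplits;
    int startPos = 0;
    for (int i = 0; i < realNumSplits; i++) {
      int lastPos = startPos + middle;
      // The first (numRegions % realNumSplits) splits absorb the remainder.
      lastPos = numRegions % realNumSplits > i ? lastPos + 1 : lastPos;
      System.out.println("split " + i + " covers regions [" + startPos + ", " + lastPos + ")");
      startPos = lastPos;
    }
  }
}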
src/java/org/apache/hadoop/hbase/mapred/TableSplit.java

@@ -30,15 +30,16 @@ import org.apache.hadoop.mapred.InputSplit;
 /**
  * A table split corresponds to a key range [low, high)
  */
-public class TableSplit implements InputSplit {
+public class TableSplit implements InputSplit, Comparable {
   private byte [] m_tableName;
   private byte [] m_startRow;
   private byte [] m_endRow;
+  private String m_regionLocation;

   /** default constructor */
   public TableSplit() {
     this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
-      HConstants.EMPTY_BYTE_ARRAY);
+      HConstants.EMPTY_BYTE_ARRAY, "");
   }

   /**
@@ -46,26 +47,38 @@ public class TableSplit implements InputSplit {
    * @param tableName
    * @param startRow
    * @param endRow
+   * @param location
    */
-  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow) {
-    m_tableName = tableName;
-    m_startRow = startRow;
-    m_endRow = endRow;
+  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+      final String location) {
+    this.m_tableName = tableName;
+    this.m_startRow = startRow;
+    this.m_endRow = endRow;
+    this.m_regionLocation = location;
   }

   /** @return table name */
   public byte [] getTableName() {
-    return m_tableName;
+    return this.m_tableName;
   }

   /** @return starting row key */
   public byte [] getStartRow() {
-    return m_startRow;
+    return this.m_startRow;
   }

   /** @return end row key */
   public byte [] getEndRow() {
-    return m_endRow;
+    return this.m_endRow;
   }

+  /** @return the region's hostname */
+  public String getRegionLocation() {
+    return this.m_regionLocation;
+  }
+
+  public String[] getLocations() {
+    return new String[] {this.m_regionLocation};
+  }
+
   public long getLength() {
@@ -73,26 +86,28 @@ public class TableSplit implements InputSplit {
     return 0;
   }

-  public String[] getLocations() {
-    // Return a random node from the cluster for now
-    return new String[] { };
-  }
-
   public void readFields(DataInput in) throws IOException {
     this.m_tableName = Bytes.readByteArray(in);
     this.m_startRow = Bytes.readByteArray(in);
     this.m_endRow = Bytes.readByteArray(in);
+    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
   }

   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.m_tableName);
     Bytes.writeByteArray(out, this.m_startRow);
     Bytes.writeByteArray(out, this.m_endRow);
+    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
   }

   @Override
   public String toString() {
-    return Bytes.toString(m_tableName) +"," + Bytes.toString(m_startRow) +
-      "," + Bytes.toString(m_endRow);
+    return m_regionLocation + ":" +
+      Bytes.toString(m_startRow) + "," + Bytes.toString(m_endRow);
   }
-}
+
+  public int compareTo(Object arg) {
+    TableSplit other = (TableSplit)arg;
+    return Bytes.compareTo(getStartRow(), other.getStartRow());
+  }
+}
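As a hedged usage sketch of the change above (class name, table, row keys, and hostname are all hypothetical), the new region location must survive the Writable round trip that ships a split to a map task, and getLocations() is what lets the framework schedule that task near its region:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;

public class TableSplitRoundTrip {
  public static void main(String[] args) throws IOException {
    TableSplit original = new TableSplit(Bytes.toBytes("items"),
      Bytes.toBytes("row-aaa"), Bytes.toBytes("row-mmm"), "rs1.example.org");
    // Serialize as the framework would when handing the split to a task.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));
    // Deserialize into a fresh instance and check the location survived.
    TableSplit copy = new TableSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    System.out.println(copy.getRegionLocation()); // rs1.example.org
    System.out.println(copy);                     // rs1.example.org:row-aaa,row-mmm
  }
}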

src/java/org/apache/hadoop/hbase/mapred/package.html

@@ -85,7 +85,10 @@ or review the <code>org.apache.hadoop.hbase.mapred.TestTableMapReduce</code> uni
 specify source/sink table and column names in your configuration.</p>

 <p>Reading from hbase, the TableInputFormat asks hbase for the list of
-regions and makes a map-per-region.
+regions and makes a map-per-region or <code>mapred.map.tasks</code> maps,
+whichever is smaller (if your job only gets a couple of maps, raise
+mapred.map.tasks to a number greater than the number of regions). Maps will run
+on the adjacent TaskTracker if you are running a TaskTracker and RegionServer per node.
 Writing, it may make sense to avoid the reduce step and write yourself back into
 hbase from inside your map. You'd do this when your job does not need the sort
 and collation that mapreduce does on the map emitted data; on insert,