diff --git a/CHANGES.txt b/CHANGES.txt
index ac9973ab3a8..d36cdaed720 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -126,6 +126,8 @@ Release 0.19.0 - Unreleased
   HBASE-938   Major compaction period is not checked periodically
   HBASE-947   [Optimization] Major compaction should remove deletes as well
               as the deleted cell
+   HBASE-675   Report correct server hosting a table split for assignment to
+               MR jobs

  NEW FEATURES
   HBASE-875   Use MurmurHash instead of JenkinsHash [in bloomfilters]
diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
index 7e0c3398a16..9cbd4be0c2e 100644
--- a/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
+++ b/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
@@ -263,7 +263,7 @@ implements InputFormat {
   * {@link InputSplit} array.
   *
   * @param job the map task {@link JobConf}
-   * @param numSplits a hint to calculate the number of splits
+   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
   *
   * @return the input splits
   *
@@ -280,24 +280,23 @@ implements InputFormat {
    if (this.inputColumns == null || this.inputColumns.length == 0) {
      throw new IOException("Expecting at least one column");
    }
-    int realNumSplits = numSplits > startKeys.length ? startKeys.length
-        : numSplits;
+    int realNumSplits = numSplits > startKeys.length? startKeys.length:
+      numSplits;
    InputSplit[] splits = new InputSplit[realNumSplits];
    int middle = startKeys.length / realNumSplits;
    int startPos = 0;
    for (int i = 0; i < realNumSplits; i++) {
      int lastPos = startPos + middle;
      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+      String regionLocation = table.getRegionLocation(startKeys[startPos]).
+        getServerAddress().getHostname();
      splits[i] = new TableSplit(this.table.getTableName(),
-        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]
-          : HConstants.EMPTY_START_ROW);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("split: " + i + "->" + splits[i]);
-      }
+        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
+        HConstants.EMPTY_START_ROW, regionLocation);
+      LOG.info("split: " + i + "->" + splits[i]);
      startPos = lastPos;
    }
    return splits;
-
  }

  /**
diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java b/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java
index 6405d78b080..759c03a658c 100644
--- a/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java
+++ b/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java
@@ -30,15 +30,16 @@ import org.apache.hadoop.mapred.InputSplit;
/**
 * A table split corresponds to a key range [low, high)
 */
-public class TableSplit implements InputSplit {
+public class TableSplit implements InputSplit, Comparable {
  private byte [] m_tableName;
  private byte [] m_startRow;
  private byte [] m_endRow;
+  private String m_regionLocation;

  /** default constructor */
  public TableSplit() {
    this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
-      HConstants.EMPTY_BYTE_ARRAY);
+      HConstants.EMPTY_BYTE_ARRAY, "");
  }

  /**
@@ -46,26 +47,38 @@ public class TableSplit implements InputSplit {
   * @param tableName
   * @param startRow
   * @param endRow
+   * @param location
   */
-  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow) {
-    m_tableName = tableName;
-    m_startRow = startRow;
-    m_endRow = endRow;
+  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+      final String location) {
+    this.m_tableName = tableName;
+    this.m_startRow = startRow;
+    this.m_endRow = endRow;
+    this.m_regionLocation = location;
  }

  /** @return table name */
  public byte [] getTableName() {
-    return m_tableName;
+    return this.m_tableName;
  }

  /** @return starting row key */
  public byte [] getStartRow() {
-    return m_startRow;
+    return this.m_startRow;
  }

  /** @return end row key */
  public byte [] getEndRow() {
-    return m_endRow;
+    return this.m_endRow;
+  }
+
+  /** @return the region's hostname */
+  public String getRegionLocation() {
+    return this.m_regionLocation;
+  }
+
+  public String[] getLocations() {
+    return new String[] {this.m_regionLocation};
  }

  public long getLength() {
@@ -73,26 +86,28 @@ public class TableSplit implements InputSplit {
    return 0;
  }

-  public String[] getLocations() {
-    // Return a random node from the cluster for now
-    return new String[] { };
-  }
-
  public void readFields(DataInput in) throws IOException {
    this.m_tableName = Bytes.readByteArray(in);
    this.m_startRow = Bytes.readByteArray(in);
    this.m_endRow = Bytes.readByteArray(in);
+    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
  }

  public void write(DataOutput out) throws IOException {
    Bytes.writeByteArray(out, this.m_tableName);
    Bytes.writeByteArray(out, this.m_startRow);
    Bytes.writeByteArray(out, this.m_endRow);
+    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
  }

  @Override
  public String toString() {
-    return Bytes.toString(m_tableName) +"," + Bytes.toString(m_startRow) +
-      "," + Bytes.toString(m_endRow);
+    return m_regionLocation + ":" +
+      Bytes.toString(m_startRow) + "," + Bytes.toString(m_endRow);
  }
-}
\ No newline at end of file
+
+  public int compareTo(Object arg) {
+    TableSplit other = (TableSplit)arg;
+    return Bytes.compareTo(getStartRow(), other.getStartRow());
+  }
+}
diff --git a/src/java/org/apache/hadoop/hbase/mapred/package-info.java b/src/java/org/apache/hadoop/hbase/mapred/package-info.java
index eea1a82e72b..cd5fd20177b 100644
--- a/src/java/org/apache/hadoop/hbase/mapred/package-info.java
+++ b/src/java/org/apache/hadoop/hbase/mapred/package-info.java
@@ -85,7 +85,10 @@ or review the org.apache.hadoop.hbase.mapred.TestTableMapReduce uni
specify source/sink table and column names in your configuration.

Reading from hbase, the TableInputFormat asks hbase for the list of
-regions and makes a map-per-region.
+regions and makes a map-per-region or mapred.map.tasks maps,
+whichever is smaller (if your job only has two maps, raise mapred.map.tasks
+to a number greater than the number of regions). Maps will run on the adjacent
+TaskTracker if you are running a TaskTracker and RegionServer per node.
Writing, it may make sense to avoid the reduce step and write yourself back into
hbase from inside your map. You'd do this when your job does not need the sort
and collation that mapreduce does on the map emitted data; on insert,
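
For reference, the arithmetic in the getSplits() change above spreads startKeys.length regions over realNumSplits splits, with the first (startKeys.length % realNumSplits) splits taking one extra region, and each split reporting the host of the first region it covers. A standalone sketch of that distribution, using made-up numbers (10 regions, a hint of 4 maps):

  // Mirrors the region-to-split arithmetic in TableInputFormatBase.getSplits();
  // the region count and map hint below are made-up example values.
  public class SplitArithmeticSketch {
    public static void main(String[] args) {
      int regions = 10;    // startKeys.length
      int numSplits = 4;   // the mapred.map.tasks hint
      int realNumSplits = numSplits > regions ? regions : numSplits;
      int middle = regions / realNumSplits;
      int startPos = 0;
      for (int i = 0; i < realNumSplits; i++) {
        int lastPos = startPos + middle;
        // The first (regions % realNumSplits) splits absorb the remainder.
        lastPos = regions % realNumSplits > i ? lastPos + 1 : lastPos;
        System.out.println("split " + i + " covers regions [" + startPos +
          ", " + lastPos + ")");
        startPos = lastPos;
      }
    }
  }

With those numbers the first two splits cover three regions each and the last two cover two, so every region lands in exactly one split.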
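
The new m_regionLocation field only helps the scheduler if it survives the trip from the JobClient to the tasks, which is why readFields() and write() above were extended along with getLocations(). A small round-trip sketch against the patched TableSplit; the table name, start row, and hostname are made-up values:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;

  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.mapred.TableSplit;
  import org.apache.hadoop.hbase.util.Bytes;

  public class TableSplitRoundTrip {
    public static void main(String[] args) throws Exception {
      // A split for a made-up table and key range, pinned to a made-up host.
      TableSplit split = new TableSplit(Bytes.toBytes("mytable"),
        Bytes.toBytes("row-aaa"), HConstants.EMPTY_START_ROW,
        "host17.example.com");

      // getLocations() is what the framework consults for locality.
      System.out.println(split.getLocations()[0]);   // host17.example.com

      // Serialize and deserialize the way splits are shipped to map tasks.
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      split.write(new DataOutputStream(buf));
      TableSplit copy = new TableSplit();
      copy.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));
      System.out.println(copy.getRegionLocation());  // host17.example.com
    }
  }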
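
On the package-info.java note above: the numSplits hint handed to getSplits() is the job's configured map count, so a job that wants a map per region needs that hint to be at least the region count. A hedged configuration sketch; the class and method names are made up for illustration, and the rest of the job wiring (input format, table, columns, mapper) is omitted:

  import org.apache.hadoop.mapred.JobConf;

  // Illustrative helper only; MapsPerRegion and setMapHint are made-up names.
  public class MapsPerRegion {
    /**
     * Raise the split-count hint handed to TableInputFormatBase.getSplits().
     * Setting it to at least the table's region count yields one TableSplit
     * (and one map) per region; a smaller value groups adjacent regions into
     * fewer, larger splits.
     */
    public static void setMapHint(JobConf job, int numRegions) {
      job.setNumMapTasks(numRegions);  // backs the mapred.map.tasks property
    }
  }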