HBASE-8874 PutCombiner is skipping KeyValues while combining puts of same row during bulkload (Rajeshbabu)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1508125 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b7833c80f5
commit
342b69f497
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce
@ -19,15 +19,18 @@
|
||||
package org.apache.hadoop.hbase.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.TreeMap;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.mapreduce.Reducer;
|
||||
|
||||
/**
|
||||
@ -43,31 +46,50 @@ public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
|
||||
@Override
|
||||
protected void reduce(K row, Iterable<Put> vals, Context context)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
int cnt = 0;
|
||||
// There's nothing to say <code>K row</code> is the same as the rowkey
|
||||
// used to construct Puts (value) instances. Thus the map of put.getRow()
|
||||
// to combined Put is necessary.
|
||||
// TODO: would be better if we knew <code>K row</code> and Put rowkey were
|
||||
// identical. Then this whole Put buffering business goes away.
|
||||
// TODO: Could use HeapSize to create an upper bound on the memory size of
|
||||
// the puts map and flush some portion of the content while looping. This
|
||||
// Using HeapSize to create an upper bound on the memory size of
|
||||
// the puts and flush some portion of the content while looping. This
|
||||
// flush could result in multiple Puts for a single rowkey. That is
|
||||
// acceptable because Combiner is run as an optimization and it's not
|
||||
// critical that all Puts are grouped perfectly.
|
||||
Map<byte[], Put> puts = new TreeMap<byte[], Put>(Bytes.BYTES_COMPARATOR);
|
||||
long threshold = context.getConfiguration().getLong(
|
||||
"putcombiner.row.threshold", 1L * (1<<30));
|
||||
int cnt = 0;
|
||||
long curSize = 0;
|
||||
Put put = null;
|
||||
Map<byte[], List<? extends Cell>> familyMap = null;
|
||||
for (Put p : vals) {
|
||||
cnt++;
|
||||
if (!puts.containsKey(p.getRow())) {
|
||||
puts.put(p.getRow(), p);
|
||||
if (put == null) {
|
||||
put = p;
|
||||
familyMap = put.getFamilyMap();
|
||||
} else {
|
||||
puts.get(p.getRow()).getFamilyMap().putAll(p.getFamilyMap());
|
||||
for (Entry<byte[], List<? extends Cell>> entry : p.getFamilyMap()
|
||||
.entrySet()) {
|
||||
List<? extends Cell> cells = familyMap.get(entry.getKey());
|
||||
List<KeyValue> kvs = (cells != null) ? (List<KeyValue>) cells : null;
|
||||
for (Cell cell : entry.getValue()) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||
curSize += kv.heapSize();
|
||||
if (kvs != null) {
|
||||
kvs.add(kv);
|
||||
}
|
||||
}
|
||||
if (cells == null) {
|
||||
familyMap.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
if (cnt % 10 == 0) context.setStatus("Combine " + cnt);
|
||||
if (curSize > threshold) {
|
||||
LOG.info(String.format("Combined %d Put(s) into %d.", cnt, 1));
|
||||
context.write(row, put);
|
||||
put = null;
|
||||
cnt = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (Put p : puts.values()) {
|
||||
context.write(row, p);
|
||||
if (put != null) {
|
||||
LOG.info(String.format("Combined %d Put(s) into %d.", cnt, 1));
|
||||
context.write(row, put);
|
||||
}
|
||||
LOG.info(String.format("Combined %d Put(s) into %d.", cnt, puts.size()));
|
||||
}
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ public class PutSortReducer extends
|
||||
{
|
||||
// although reduce() is called per-row, handle pathological case
|
||||
long threshold = context.getConfiguration().getLong(
|
||||
"putsortreducer.row.threshold", 2L * (1<<30));
|
||||
"putsortreducer.row.threshold", 1L * (1<<30));
|
||||
Iterator<Put> iter = puts.iterator();
|
||||
while (iter.hasNext()) {
|
||||
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
|
||||
@ -67,7 +67,7 @@ public class PutSortReducer extends
|
||||
for (Cell cell: cells) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||
map.add(kv);
|
||||
curSize += kv.getLength();
|
||||
curSize += kv.heapSize();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user