diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3939f47d0ab..05d26fb5aee 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1172,30 +1172,41 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ byte[] family = e.getKey(); List kvs = e.getValue(); - + Map kvCount = new TreeMap(Bytes.BYTES_COMPARATOR); + Store store = getStore(family); for (KeyValue kv: kvs) { // Check if time is LATEST, change to time of most recent addition if so // This is expensive. if (kv.isLatestTimestamp() && kv.isDeleteType()) { + byte[] qual = kv.getQualifier(); + if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY; + Integer count = kvCount.get(qual); + if (count == null) { + kvCount.put(qual, new Integer(1)); + } else { + kvCount.put(qual, new Integer(count+1)); + } + count = kvCount.get(qual); + List result = new ArrayList(1); Get g = new Get(kv.getRow()); + g.setMaxVersions(count); NavigableSet qualifiers = new TreeSet(Bytes.BYTES_COMPARATOR); - byte [] q = kv.getQualifier(); - if(q == null) q = HConstants.EMPTY_BYTE_ARRAY; - qualifiers.add(q); - get(store, g, qualifiers, result); - if (result.isEmpty()) { - // Nothing to delete - continue; - } - if (result.size() > 1) { - throw new RuntimeException("Unexpected size: " + result.size()); - } - KeyValue getkv = result.get(0); - Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), - getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG); + qualifiers.add(qual); + get(store, g, qualifiers, result); + if (result.size() < count) { + // Nothing to delete + kv.updateLatestStamp(byteNow); + continue; + } + if (result.size() > count) { + throw new RuntimeException("Unexpected size: " + result.size()); + } + KeyValue getkv = result.get(count - 1); + Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), + getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG); } else { kv.updateLatestStamp(byteNow); }