HBASE-2338 log recovery: deleted items may be resurrected; 2nd attempt... fixes broke build

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@929962 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-04-01 13:27:11 +00:00
parent 3e6935613c
commit 4cbfa4b594
1 changed files with 26 additions and 15 deletions

View File

@ -1172,30 +1172,41 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
byte[] family = e.getKey(); byte[] family = e.getKey();
List<KeyValue> kvs = e.getValue(); List<KeyValue> kvs = e.getValue();
Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
Store store = getStore(family); Store store = getStore(family);
for (KeyValue kv: kvs) { for (KeyValue kv: kvs) {
// Check if time is LATEST, change to time of most recent addition if so // Check if time is LATEST, change to time of most recent addition if so
// This is expensive. // This is expensive.
if (kv.isLatestTimestamp() && kv.isDeleteType()) { if (kv.isLatestTimestamp() && kv.isDeleteType()) {
byte[] qual = kv.getQualifier();
if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
Integer count = kvCount.get(qual);
if (count == null) {
kvCount.put(qual, new Integer(1));
} else {
kvCount.put(qual, new Integer(count+1));
}
count = kvCount.get(qual);
List<KeyValue> result = new ArrayList<KeyValue>(1); List<KeyValue> result = new ArrayList<KeyValue>(1);
Get g = new Get(kv.getRow()); Get g = new Get(kv.getRow());
g.setMaxVersions(count);
NavigableSet<byte []> qualifiers = NavigableSet<byte []> qualifiers =
new TreeSet<byte []>(Bytes.BYTES_COMPARATOR); new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
byte [] q = kv.getQualifier(); qualifiers.add(qual);
if(q == null) q = HConstants.EMPTY_BYTE_ARRAY; get(store, g, qualifiers, result);
qualifiers.add(q); if (result.size() < count) {
get(store, g, qualifiers, result); // Nothing to delete
if (result.isEmpty()) { kv.updateLatestStamp(byteNow);
// Nothing to delete continue;
continue; }
} if (result.size() > count) {
if (result.size() > 1) { throw new RuntimeException("Unexpected size: " + result.size());
throw new RuntimeException("Unexpected size: " + result.size()); }
} KeyValue getkv = result.get(count - 1);
KeyValue getkv = result.get(0); Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
} else { } else {
kv.updateLatestStamp(byteNow); kv.updateLatestStamp(byteNow);
} }