HBASE-2553 Revisit IncrementColumnValue implementation in 0.22
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@964973 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
edf052f87a
commit
2fd862b403
|
@ -26,6 +26,7 @@ Release 0.21.0 - Unreleased
|
||||||
HBASE-2397 Bytes.toStringBinary escapes printable chars
|
HBASE-2397 Bytes.toStringBinary escapes printable chars
|
||||||
HBASE-2771 Update our hadoop jar to be latest from 0.20-append branch
|
HBASE-2771 Update our hadoop jar to be latest from 0.20-append branch
|
||||||
HBASE-2803 Remove remaining Get code from Store.java,etc
|
HBASE-2803 Remove remaining Get code from Store.java,etc
|
||||||
|
HBASE-2553 Revisit IncrementColumnValue implementation in 0.22
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
HBASE-1791 Timeout in IndexRecordWriter (Bradford Stephens via Andrew
|
HBASE-1791 Timeout in IndexRecordWriter (Bradford Stephens via Andrew
|
||||||
|
|
|
@ -351,6 +351,74 @@ public class MemStore implements HeapSize {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given the specs of a column, update it, first by inserting a new record,
|
||||||
|
* then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
|
||||||
|
* will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
|
||||||
|
* store will ensure that the insert/delete each are atomic. A scanner/reader will either
|
||||||
|
* get the new value, or the old value and all readers will eventually only see the new
|
||||||
|
* value after the old was removed.
|
||||||
|
*
|
||||||
|
* @param row
|
||||||
|
* @param family
|
||||||
|
* @param qualifier
|
||||||
|
* @param newValue
|
||||||
|
* @param now
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public long updateColumnValue(byte[] row,
|
||||||
|
byte[] family,
|
||||||
|
byte[] qualifier,
|
||||||
|
long newValue,
|
||||||
|
long now) {
|
||||||
|
this.lock.readLock().lock();
|
||||||
|
try {
|
||||||
|
// create a new KeyValue with 'now' and a 0 memstoreTS == immediately visible
|
||||||
|
KeyValue newKv = new KeyValue(row, family, qualifier,
|
||||||
|
now,
|
||||||
|
Bytes.toBytes(newValue));
|
||||||
|
|
||||||
|
long addedSize = add(newKv);
|
||||||
|
|
||||||
|
// now find and RM the old one(s) to prevent version explosion:
|
||||||
|
SortedSet<KeyValue> ss = kvset.tailSet(newKv);
|
||||||
|
Iterator<KeyValue> it = ss.iterator();
|
||||||
|
while ( it.hasNext() ) {
|
||||||
|
KeyValue kv = it.next();
|
||||||
|
|
||||||
|
if (kv == newKv) {
|
||||||
|
// ignore the one i just put in (heh)
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// if this isnt the row we are interested in, then bail:
|
||||||
|
if (0 != Bytes.compareTo(
|
||||||
|
newKv.getBuffer(), newKv.getRowOffset(), newKv.getRowLength(),
|
||||||
|
kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
|
||||||
|
break; // rows dont match, bail.
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the qualifier matches and it's a put, just RM it out of the kvset.
|
||||||
|
if (0 == Bytes.compareTo(
|
||||||
|
newKv.getBuffer(), newKv.getQualifierOffset(), newKv.getQualifierLength(),
|
||||||
|
kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength())) {
|
||||||
|
|
||||||
|
// to be extra safe we only remove Puts that have a memstoreTS==0
|
||||||
|
if (kv.getType() == KeyValue.Type.Put.getCode() &&
|
||||||
|
kv.getMemstoreTS() == 0) {
|
||||||
|
// false means there was a change, so give us the size.
|
||||||
|
addedSize -= heapSizeChange(kv, false);
|
||||||
|
|
||||||
|
it.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return addedSize;
|
||||||
|
} finally {
|
||||||
|
this.lock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Immutable data structure to hold member found in set and the set it was
|
* Immutable data structure to hold member found in set and the set it was
|
||||||
* found in. Include set because it is carrying context.
|
* found in. Include set because it is carrying context.
|
||||||
|
@ -400,76 +468,6 @@ public class MemStore implements HeapSize {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// TODO fix this not to use QueryMatcher!
|
|
||||||
/**
|
|
||||||
* Gets from either the memstore or the snapshop, and returns a code
|
|
||||||
* to let you know which is which.
|
|
||||||
*
|
|
||||||
* @param matcher query matcher
|
|
||||||
* @param result puts results here
|
|
||||||
* @return 1 == memstore, 2 == snapshot, 0 == none
|
|
||||||
*/
|
|
||||||
int getWithCode(QueryMatcher matcher, List<KeyValue> result) {
|
|
||||||
this.lock.readLock().lock();
|
|
||||||
try {
|
|
||||||
boolean fromMemstore = internalGet(this.kvset, matcher, result);
|
|
||||||
if (fromMemstore || matcher.isDone())
|
|
||||||
return 1;
|
|
||||||
|
|
||||||
matcher.update();
|
|
||||||
boolean fromSnapshot = internalGet(this.snapshot, matcher, result);
|
|
||||||
if (fromSnapshot || matcher.isDone())
|
|
||||||
return 2;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
} finally {
|
|
||||||
this.lock.readLock().unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Small utility functions for use by Store.incrementColumnValue
|
|
||||||
* _only_ under the threat of pain and everlasting race conditions.
|
|
||||||
*/
|
|
||||||
void readLockLock() {
|
|
||||||
this.lock.readLock().lock();
|
|
||||||
}
|
|
||||||
void readLockUnlock() {
|
|
||||||
this.lock.readLock().unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* @param set memstore or snapshot
|
|
||||||
* @param matcher query matcher
|
|
||||||
* @param result list to add results to
|
|
||||||
* @return true if done with store (early-out), false if not
|
|
||||||
*/
|
|
||||||
boolean internalGet(final NavigableSet<KeyValue> set,
|
|
||||||
final QueryMatcher matcher, final List<KeyValue> result) {
|
|
||||||
if(set.isEmpty()) return false;
|
|
||||||
// Seek to startKey
|
|
||||||
SortedSet<KeyValue> tail = set.tailSet(matcher.getStartKey());
|
|
||||||
for (KeyValue kv : tail) {
|
|
||||||
QueryMatcher.MatchCode res = matcher.match(kv);
|
|
||||||
switch(res) {
|
|
||||||
case INCLUDE:
|
|
||||||
result.add(kv);
|
|
||||||
break;
|
|
||||||
case SKIP:
|
|
||||||
break;
|
|
||||||
case NEXT:
|
|
||||||
return false;
|
|
||||||
case DONE:
|
|
||||||
return true;
|
|
||||||
default:
|
|
||||||
throw new RuntimeException("Unexpected " + res);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if this memstore may contain the required keys
|
* Check if this memstore may contain the required keys
|
||||||
* @param scan
|
* @param scan
|
||||||
|
|
|
@ -1298,49 +1298,18 @@ public class Store implements HeapSize {
|
||||||
public long updateColumnValue(byte [] row, byte [] f,
|
public long updateColumnValue(byte [] row, byte [] f,
|
||||||
byte [] qualifier, long newValue)
|
byte [] qualifier, long newValue)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
List<KeyValue> result = new ArrayList<KeyValue>();
|
|
||||||
KeyComparator keyComparator = this.comparator.getRawComparator();
|
|
||||||
|
|
||||||
KeyValue kv = null;
|
|
||||||
// Setting up the QueryMatcher
|
|
||||||
Get get = new Get(row);
|
|
||||||
NavigableSet<byte[]> qualifiers =
|
|
||||||
new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
|
||||||
qualifiers.add(qualifier);
|
|
||||||
QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl,
|
|
||||||
keyComparator, 1);
|
|
||||||
|
|
||||||
// lock memstore snapshot for this critical section:
|
|
||||||
this.lock.readLock().lock();
|
this.lock.readLock().lock();
|
||||||
memstore.readLockLock();
|
|
||||||
try {
|
try {
|
||||||
int memstoreCode = this.memstore.getWithCode(matcher, result);
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
return this.memstore.updateColumnValue(row,
|
||||||
|
f,
|
||||||
|
qualifier,
|
||||||
|
newValue,
|
||||||
|
now);
|
||||||
|
|
||||||
if (memstoreCode != 0) {
|
|
||||||
// was in memstore (or snapshot)
|
|
||||||
kv = result.get(0).clone();
|
|
||||||
byte [] buffer = kv.getBuffer();
|
|
||||||
int valueOffset = kv.getValueOffset();
|
|
||||||
Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(newValue), 0,
|
|
||||||
Bytes.SIZEOF_LONG);
|
|
||||||
if (memstoreCode == 2) {
|
|
||||||
// from snapshot, assign new TS
|
|
||||||
long currTs = System.currentTimeMillis();
|
|
||||||
if (currTs == kv.getTimestamp()) {
|
|
||||||
currTs++; // unlikely but catastrophic
|
|
||||||
}
|
|
||||||
Bytes.putBytes(buffer, kv.getTimestampOffset(),
|
|
||||||
Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
kv = new KeyValue(row, f, qualifier,
|
|
||||||
System.currentTimeMillis(),
|
|
||||||
Bytes.toBytes(newValue));
|
|
||||||
}
|
|
||||||
return add(kv);
|
|
||||||
// end lock
|
|
||||||
} finally {
|
} finally {
|
||||||
memstore.readLockUnlock();
|
|
||||||
this.lock.readLock().unlock();
|
this.lock.readLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -843,27 +843,27 @@ public class Bytes {
|
||||||
/**
|
/**
|
||||||
* Lexographically compare two arrays.
|
* Lexographically compare two arrays.
|
||||||
*
|
*
|
||||||
* @param b1 left operand
|
* @param buffer1 left operand
|
||||||
* @param b2 right operand
|
* @param buffer2 right operand
|
||||||
* @param s1 Where to start comparing in the left buffer
|
* @param offset1 Where to start comparing in the left buffer
|
||||||
* @param s2 Where to start comparing in the right buffer
|
* @param offset2 Where to start comparing in the right buffer
|
||||||
* @param l1 How much to compare from the left buffer
|
* @param length1 How much to compare from the left buffer
|
||||||
* @param l2 How much to compare from the right buffer
|
* @param length2 How much to compare from the right buffer
|
||||||
* @return 0 if equal, < 0 if left is less than right, etc.
|
* @return 0 if equal, < 0 if left is less than right, etc.
|
||||||
*/
|
*/
|
||||||
public static int compareTo(byte[] b1, int s1, int l1,
|
public static int compareTo(byte[] buffer1, int offset1, int length1,
|
||||||
byte[] b2, int s2, int l2) {
|
byte[] buffer2, int offset2, int length2) {
|
||||||
// Bring WritableComparator code local
|
// Bring WritableComparator code local
|
||||||
int end1 = s1 + l1;
|
int end1 = offset1 + length1;
|
||||||
int end2 = s2 + l2;
|
int end2 = offset2 + length2;
|
||||||
for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
|
for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
|
||||||
int a = (b1[i] & 0xff);
|
int a = (buffer1[i] & 0xff);
|
||||||
int b = (b2[j] & 0xff);
|
int b = (buffer2[j] & 0xff);
|
||||||
if (a != b) {
|
if (a != b) {
|
||||||
return a - b;
|
return a - b;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return l1 - l2;
|
return length1 - length2;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1859,7 +1859,8 @@ public class TestHRegion extends HBaseTestCase {
|
||||||
assertEquals(value+amount, result);
|
assertEquals(value+amount, result);
|
||||||
|
|
||||||
Store store = region.getStore(fam1);
|
Store store = region.getStore(fam1);
|
||||||
assertEquals(1, store.memstore.kvset.size());
|
// we will have the original Put, and also the ICV'ed Put as well.
|
||||||
|
assertEquals(2, store.memstore.kvset.size());
|
||||||
assertTrue(store.memstore.snapshot.isEmpty());
|
assertTrue(store.memstore.snapshot.isEmpty());
|
||||||
|
|
||||||
assertICV(row, fam1, qual1, value+amount);
|
assertICV(row, fam1, qual1, value+amount);
|
||||||
|
|
Loading…
Reference in New Issue