HBASE-1740 ICV has a subtle race condition only visible under high load

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@814038 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-09-11 21:28:36 +00:00
parent cfef20aa18
commit a629f5db3e
6 changed files with 294 additions and 248 deletions

View File

@ -18,6 +18,7 @@ Release 0.21.0 - Unreleased
get tossed as 'duplicates'
HBASE-1794 recovered log files are not inserted into the storefile map
HBASE-1824 [stargate] default timestamp should be LATEST_TIMESTAMP
HBASE-1740 ICV has a subtle race condition only visible under high load
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable

View File

@ -2317,25 +2317,47 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
boolean flush = false;
// Lock row
Integer lid = obtainRowLock(row);
long result = 0L;
long result = amount;
try {
Store store = stores.get(family);
// Determine what to do and perform increment on returned KV, no insertion
Store.ICVResult vas =
store.incrementColumnValue(row, family, qualifier, amount);
// Write incremented value to WAL before inserting
// Get the old value:
Get get = new Get(row);
get.addColumn(family, qualifier);
List<KeyValue> results = new ArrayList<KeyValue>();
NavigableSet<byte[]> qualifiers = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
qualifiers.add(qualifier);
store.get(get, qualifiers, results);
if (!results.isEmpty()) {
byte [] oldValue = results.get(0).getValue();
KeyValue kv = results.get(0);
byte [] buffer = kv.getBuffer();
int valueOffset = kv.getValueOffset();
result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
}
// bulid the KeyValue now:
KeyValue newKv = new KeyValue(row, family,
qualifier, System.currentTimeMillis(),
Bytes.toBytes(result));
// now log it:
if (writeToWAL) {
long now = System.currentTimeMillis();
List<KeyValue> edits = new ArrayList<KeyValue>(1);
edits.add(vas.kv);
edits.add(newKv);
this.log.append(regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), edits,
(regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
}
// Insert to the Store
store.add(vas.kv);
result = vas.value;
long size = this.memstoreSize.addAndGet(vas.sizeAdded);
// Now request the ICV to the store, this will set the timestamp
// appropriately depending on if there is a value in memcache or not.
// returns the
long size = store.updateColumnValue(row, family, qualifier, result);
size = this.memstoreSize.addAndGet(size);
flush = isFlushSize(size);
} finally {
releaseRowLock(lid);

View File

@ -497,6 +497,43 @@ public class MemStore implements HeapSize {
}
}
/**
* Gets from either the memstore or the snapshop, and returns a code
* to let you know which is which.
*
* @param matcher
* @param result
* @return 1 == memstore, 2 == snapshot, 0 == none
*/
int getWithCode(QueryMatcher matcher, List<KeyValue> result) throws IOException {
this.lock.readLock().lock();
try {
boolean fromMemstore = internalGet(this.kvset, matcher, result);
if (fromMemstore || matcher.isDone())
return 1;
matcher.update();
boolean fromSnapshot = internalGet(this.snapshot, matcher, result);
if (fromSnapshot || matcher.isDone())
return 2;
return 0;
} finally {
this.lock.readLock().unlock();
}
}
/**
* Small utility functions for use by Store.incrementColumnValue
* _only_ under the threat of pain and everlasting race conditions.
*/
void readLockLock() {
this.lock.readLock().lock();
}
void readLockUnlock() {
this.lock.readLock().unlock();
}
/**
*
* @param set memstore or snapshot

View File

@ -1462,37 +1462,22 @@ public class Store implements HConstants, HeapSize {
scanner.get(result);
}
/*
* Data structure to hold incrementColumnValue result.
*/
static class ICVResult {
final long value;
final long sizeAdded;
final KeyValue kv;
ICVResult(long value, long sizeAdded, KeyValue kv) {
this.value = value;
this.sizeAdded = sizeAdded;
this.kv = kv;
}
}
/**
* Increments the value for the given row/family/qualifier
* @param row
* @param f
* @param qualifier
* @param amount
* @return The new value.
* @param newValue the new value to set into memstore
* @return memstore size delta
* @throws IOException
*/
public ICVResult incrementColumnValue(byte [] row, byte [] f,
byte [] qualifier, long amount)
public long updateColumnValue(byte [] row, byte [] f,
byte [] qualifier, long newValue)
throws IOException {
long value = 0;
List<KeyValue> result = new ArrayList<KeyValue>();
KeyComparator keyComparator = this.comparator.getRawComparator();
KeyValue kv = null;
// Setting up the QueryMatcher
Get get = new Get(row);
NavigableSet<byte[]> qualifiers =
@ -1501,76 +1486,39 @@ public class Store implements HConstants, HeapSize {
QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl,
keyComparator, 1);
boolean newTs = true;
KeyValue kv = null;
// Read from memstore first:
this.memstore.internalGet(this.memstore.kvset,
matcher, result);
if (!result.isEmpty()) {
kv = result.get(0).clone();
newTs = false;
} else {
// try the snapshot.
this.memstore.internalGet(this.memstore.snapshot,
matcher, result);
if (!result.isEmpty()) {
// lock memstore snapshot for this critical section:
this.lock.readLock().lock();
memstore.readLockLock();
try {
int memstoreCode = this.memstore.getWithCode(matcher, result);
if (memstoreCode != 0) {
// was in memstore (or snapshot)
kv = result.get(0).clone();
}
}
if (kv != null) {
// Received early-out from memstore
// Make a copy of the KV and increment it
byte [] buffer = kv.getBuffer();
int valueOffset = kv.getValueOffset();
value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount;
Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0,
Bytes.SIZEOF_LONG);
if (newTs) {
long currTs = System.currentTimeMillis();
if (currTs == kv.getTimestamp()) {
currTs++; // just in case something wacky happens.
}
byte [] stampBytes = Bytes.toBytes(currTs);
Bytes.putBytes(buffer, kv.getTimestampOffset(), stampBytes, 0,
byte [] buffer = kv.getBuffer();
int valueOffset = kv.getValueOffset();
Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(newValue), 0,
Bytes.SIZEOF_LONG);
if (memstoreCode == 2) {
// from snapshot, assign new TS
long currTs = System.currentTimeMillis();
if (currTs == kv.getTimestamp()) {
currTs++; // unlikely but catastrophic
}
Bytes.putBytes(buffer, kv.getTimestampOffset(),
Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG);
}
} else {
kv = new KeyValue(row, f, qualifier,
System.currentTimeMillis(),
Bytes.toBytes(newValue));
}
return new ICVResult(value, 0, kv);
return add(kv);
// end lock
} finally {
memstore.readLockUnlock();
this.lock.readLock().unlock();
}
// Check if we even have storefiles
if(this.storefiles.isEmpty()) {
return createNewKeyValue(row, f, qualifier, value, amount);
}
// Get storefiles for this store
List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
for (StoreFile sf : this.storefiles.descendingMap().values()) {
Reader r = sf.getReader();
if (r == null) {
LOG.warn("StoreFile " + sf + " has a null Reader");
continue;
}
storefileScanners.add(r.getScanner());
}
// StoreFileGetScan will handle reading this store's storefiles
StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher);
// Run a GET scan and put results into the specified list
scanner.get(result);
if(result.size() > 0) {
value = Bytes.toLong(result.get(0).getValue());
}
return createNewKeyValue(row, f, qualifier, value, amount);
}
private ICVResult createNewKeyValue(byte [] row, byte [] f,
byte [] qualifier, long value, long amount) {
long newValue = value + amount;
KeyValue newKv = new KeyValue(row, f, qualifier,
System.currentTimeMillis(),
Bytes.toBytes(newValue));
return new ICVResult(newValue, newKv.heapSize(), newKv);
}
public static final long FIXED_OVERHEAD = ClassSize.align(

View File

@ -60,9 +60,13 @@ public class TestHRegion extends HBaseTestCase {
// Test names
private final byte[] tableName = Bytes.toBytes("testtable");;
private final byte[] qual1 = Bytes.toBytes("qual1");
private final byte[] qual2 = Bytes.toBytes("qual2");
private final byte[] qual3 = Bytes.toBytes("qual3");
private final byte[] value1 = Bytes.toBytes("value1");
private final byte[] value2 = Bytes.toBytes("value2");
private final byte [] row = Bytes.toBytes("rowA");
private final byte [] row2 = Bytes.toBytes("rowB");
private final byte [] row3 = Bytes.toBytes("rowC");
/**
* @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
@ -1246,7 +1250,6 @@ public class TestHRegion extends HBaseTestCase {
byte [] col2 = Bytes.toBytes("Pub222");
Put put = new Put(row1);
put.add(family, col1, Bytes.toBytes(10L));
region.put(put);
@ -1275,12 +1278,167 @@ public class TestHRegion extends HBaseTestCase {
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(false, s.next(results));
assertEquals(0, results.size());
}
public void testIncrementColumnValue_UpdatingInPlace() throws IOException {
initHRegion(tableName, getName(), fam1);
long value = 1L;
long amount = 3L;
Put put = new Put(row);
put.add(fam1, qual1, Bytes.toBytes(value));
region.put(put);
long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
assertEquals(value+amount, result);
Store store = region.getStore(fam1);
assertEquals(1, store.memstore.kvset.size());
assertTrue(store.memstore.snapshot.isEmpty());
assertICV(row, fam1, qual1, value+amount);
}
public void testIncrementColumnValue_ConcurrentFlush() throws IOException {
initHRegion(tableName, getName(), fam1);
long value = 1L;
long amount = 3L;
Put put = new Put(row);
put.add(fam1, qual1, Bytes.toBytes(value));
region.put(put);
// now increment during a flush
Thread t = new Thread() {
public void run() {
try {
region.flushcache();
} catch (IOException e) {
LOG.info("test ICV, got IOE during flushcache()");
}
}
};
t.start();
long r = region.incrementColumnValue(row, fam1, qual1, amount, true);
assertEquals(value+amount, r);
// this also asserts there is only 1 KeyValue in the set.
assertICV(row, fam1, qual1, value+amount);
}
public void testIncrementColumnValue_UpdatingInPlace_Negative()
throws IOException {
initHRegion(tableName, getName(), fam1);
long value = 3L;
long amount = -1L;
Put put = new Put(row);
put.add(fam1, qual1, Bytes.toBytes(value));
region.put(put);
long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
assertEquals(value+amount, result);
assertICV(row, fam1, qual1, value+amount);
}
public void testIncrementColumnValue_AddingNew()
throws IOException {
initHRegion(tableName, getName(), fam1);
long value = 1L;
long amount = 3L;
Put put = new Put(row);
put.add(fam1, qual1, Bytes.toBytes(value));
put.add(fam1, qual2, Bytes.toBytes(value));
region.put(put);
long result = region.incrementColumnValue(row, fam1, qual3, amount, true);
assertEquals(amount, result);
Get get = new Get(row);
get.addColumn(fam1, qual3);
Result rr = region.get(get, null);
assertEquals(1, rr.size());
// ensure none of the other cols were incremented.
assertICV(row, fam1, qual1, value);
assertICV(row, fam1, qual2, value);
assertICV(row, fam1, qual3, amount);
}
public void testIncrementColumnValue_UpdatingFromSF() throws IOException {
initHRegion(tableName, getName(), fam1);
long value = 1L;
long amount = 3L;
Put put = new Put(row);
put.add(fam1, qual1, Bytes.toBytes(value));
put.add(fam1, qual2, Bytes.toBytes(value));
region.put(put);
// flush to disk.
region.flushcache();
Store store = region.getStore(fam1);
assertEquals(0, store.memstore.kvset.size());
long r = region.incrementColumnValue(row, fam1, qual1, amount, true);
assertEquals(value+amount, r);
assertICV(row, fam1, qual1, value+amount);
}
public void testIncrementColumnValue_AddingNewAfterSFCheck()
throws IOException {
initHRegion(tableName, getName(), fam1);
long value = 1L;
long amount = 3L;
Put put = new Put(row);
put.add(fam1, qual1, Bytes.toBytes(value));
put.add(fam1, qual2, Bytes.toBytes(value));
region.put(put);
region.flushcache();
Store store = region.getStore(fam1);
assertEquals(0, store.memstore.kvset.size());
long r = region.incrementColumnValue(row, fam1, qual3, amount, true);
assertEquals(amount, r);
assertICV(row, fam1, qual3, amount);
region.flushcache();
// ensure that this gets to disk.
assertICV(row, fam1, qual3, amount);
}
private void assertICV(byte [] row,
byte [] familiy,
byte[] qualifier,
long amount) throws IOException {
// run a get and see?
Get get = new Get(row);
get.addColumn(familiy, qualifier);
Result result = region.get(get, null);
assertEquals(1, result.size());
KeyValue kv = result.raw()[0];
long r = Bytes.toLong(kv.getValue());
assertEquals(amount, r);
}
public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions()
throws IOException {
byte [] tableName = Bytes.toBytes("testtable");

View File

@ -232,163 +232,40 @@ public class TestStore extends TestCase {
//////////////////////////////////////////////////////////////////////////////
// IncrementColumnValue tests
//////////////////////////////////////////////////////////////////////////////
/**
* Testing if the update in place works. When you want to update a value that
* is already in memstore, you don't delete it and put a new one, but just
* update the value in the original KeyValue
* @throws IOException
/*
* test the internal details of how ICV works, especially during a flush scenario.
*/
public void testIncrementColumnValue_UpdatingInPlace() throws IOException {
init(this.getName());
//Put data in memstore
long value = 1L;
long amount = 3L;
this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
assertEquals(value+amount, vas.value);
store.add(vas.kv);
Get get = new Get(row);
get.addColumn(family, qf1);
NavigableSet<byte[]> qualifiers =
new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
qualifiers.add(qf1);
List<KeyValue> result = new ArrayList<KeyValue>();
this.store.get(get, qualifiers, result);
assertEquals(value + amount, Bytes.toLong(result.get(0).getValue()));
}
/**
* Same as above but for a negative number
* @throws IOException
*/
public void testIncrementColumnValue_UpdatingInPlace_Negative()
throws IOException {
init(this.getName());
//Put data in memstore
long value = 3L;
long amount = -1L;
this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
assertEquals(vas.value, value+amount);
store.add(vas.kv);
Get get = new Get(row);
get.addColumn(family, qf1);
NavigableSet<byte[]> qualifiers =
new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
qualifiers.add(qf1);
List<KeyValue> result = new ArrayList<KeyValue>();
this.store.get(get, qualifiers, result);
assertEquals(value + amount, Bytes.toLong(result.get(0).getValue()));
}
/**
* When there is no mathing key already, adding a new.
* @throws IOException
*/
public void testIncrementColumnValue_AddingNew() throws IOException {
init(this.getName());
//Put data in memstore
long value = 1L;
long amount = 3L;
this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value)));
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf3, amount);
store.add(vas.kv);
Get get = new Get(row);
get.addColumn(family, qf3);
NavigableSet<byte[]> qualifiers =
new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
qualifiers.add(qf3);
List<KeyValue> result = new ArrayList<KeyValue>();
this.store.get(get, qualifiers, result);
assertEquals(amount, Bytes.toLong(result.get(0).getValue()));
}
/**
* When we have the key in a file add a new key + value to memstore with the
* updates value.
* @throws IOException
*/
public void testIncrementColumnValue_UpdatingFromSF() throws IOException {
init(this.getName());
//Put data in memstore
long value = 1L;
long amount = 3L;
this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value)));
flush(1);
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
store.add(vas.kv);
Get get = new Get(row);
get.addColumn(family, qf1);
NavigableSet<byte[]> qualifiers =
new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
qualifiers.add(qf1);
List<KeyValue> result = new ArrayList<KeyValue>();
this.store.get(get, qualifiers, result);
assertEquals(value + amount, Bytes.toLong(result.get(0).getValue()));
}
/**
* Same as testIncrementColumnValue_AddingNew() except that the keys are
* checked in file not in memstore
* @throws IOException
*/
public void testIncrementColumnValue_AddingNewAfterSFCheck()
throws IOException {
init(this.getName());
//Put data in memstore
long value = 1L;
long amount = 3L;
this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value)));
flush(1);
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf3, amount);
store.add(vas.kv);
Get get = new Get(row);
get.addColumn(family, qf3);
NavigableSet<byte[]> qualifiers =
new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
qualifiers.add(qf3);
List<KeyValue> result = new ArrayList<KeyValue>();
this.store.get(get, qualifiers, result);
assertEquals(amount, Bytes.toLong(result.get(0).getValue()));
}
public void testIncrementColumnValue_ICVDuringFlush()
throws IOException {
init(this.getName());
long value = 1L;
long amount = 3L;
long oldValue = 1L;
long newValue = 3L;
this.store.add(new KeyValue(row, family, qf1,
System.currentTimeMillis(),
Bytes.toBytes(value)));
Bytes.toBytes(oldValue)));
// snapshot the store.
this.store.snapshot();
// incrment during the snapshot...
// add other things:
this.store.add(new KeyValue(row, family, qf2,
System.currentTimeMillis(),
Bytes.toBytes(oldValue)));
Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
// update during the snapshot.
long ret = this.store.updateColumnValue(row, family, qf1, newValue);
// memstore should have grown by some amount.
assertTrue(ret > 0);
// then flush.
this.store.flushCache(id++);
assertEquals(1, this.store.getStorefiles().size());
assertEquals(0, this.store.memstore.kvset.size());
// from the one we inserted up there, and a new one
assertEquals(2, this.store.memstore.kvset.size());
// how many key/values for this row are there?
Get get = new Get(row);
get.addColumn(family, qf1);
get.setMaxVersions(); // all versions.
@ -398,12 +275,15 @@ public class TestStore extends TestCase {
cols.add(qf1);
this.store.get(get, cols, results);
// only one, because Store.ICV doesnt add to memcache.
assertEquals(1, results.size());
assertEquals(2, results.size());
long ts1 = results.get(0).getTimestamp();
long ts2 = results.get(1).getTimestamp();
assertTrue(ts1 > ts2);
assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
// but the timestamps should be different...
long icvTs = vas.kv.getTimestamp();
long storeTs = results.get(0).getTimestamp();
assertTrue(icvTs != storeTs);
}
}