HBASE-6195 Increment data will be lost when the memstore is flushed (Xing Shi)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1349471 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-06-12 18:31:07 +00:00
parent b8d3979505
commit 9975cc0d5f
2 changed files with 107 additions and 8 deletions

View File

@ -4675,8 +4675,8 @@ public class HRegion implements HeapSize { // , Writable{
boolean flush = false;
WALEdit walEdits = null;
List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
long now = EnvironmentEdgeManager.currentTimeMillis();
Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
long before = EnvironmentEdgeManager.currentTimeMillis();
long size = 0;
long txid = 0;
@ -4687,11 +4687,13 @@ public class HRegion implements HeapSize { // , Writable{
Integer lid = getLock(lockid, row, true);
this.updatesLock.readLock().lock();
try {
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
increment.getFamilyMap().entrySet()) {
Store store = stores.get(family.getKey());
List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
// Get previous values for all columns in this family
Get get = new Get(row);
@ -4727,10 +4729,8 @@ public class HRegion implements HeapSize { // , Writable{
}
}
// Write the KVs for this family into the store
size += store.upsert(kvs);
allKVs.addAll(kvs);
kvs.clear();
//store the kvs to the temporary memstore before writing HLog
tempMemstore.put(store, kvs);
}
// Actually write to WAL now
@ -4739,10 +4739,16 @@ public class HRegion implements HeapSize { // , Writable{
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
walEdits, HConstants.DEFAULT_CLUSTER_ID, now,
walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
this.htableDescriptor);
}
//Actually write to Memstore now
for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
size += store.upsert(entry.getValue());
allKVs.addAll(entry.getValue());
}
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
} finally {
@ -4757,7 +4763,7 @@ public class HRegion implements HeapSize { // , Writable{
}
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - now);
this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
if (flush) {
// Request a cache flush. Do it outside update lock.

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -3517,6 +3518,98 @@ public class TestHRegion extends HBaseTestCase {
HRegion.closeHRegion(region);
}
}
/**
* TestCase for increment
*
*/
private static class Incrementer implements Runnable {
private HRegion region;
private final static byte[] incRow = Bytes.toBytes("incRow");
private final static byte[] family = Bytes.toBytes("family");
private final static byte[] qualifier = Bytes.toBytes("qualifier");
private final static long ONE = 1l;
private int incCounter;
public Incrementer(HRegion region, int incCounter) {
this.region = region;
this.incCounter = incCounter;
}
@Override
public void run() {
int count = 0;
while (count < incCounter) {
Increment inc = new Increment(incRow);
inc.addColumn(family, qualifier, ONE);
count++;
try {
region.increment(inc, null, true);
} catch (IOException e) {
e.printStackTrace();
break;
}
}
}
}
/**
* Test case to check increment function with memstore flushing
* @throws Exception
*/
@Test
public void testParallelIncrementWithMemStoreFlush() throws Exception {
Configuration conf = HBaseConfiguration.create();
String method = "testParallelismIncrementWithMemStoreFlush";
byte[] tableName = Bytes.toBytes(method);
byte[] family = Incrementer.family;
this.region = initHRegion(tableName, method, conf, family);
final HRegion region = this.region;
final AtomicBoolean incrementDone = new AtomicBoolean(false);
Runnable reader = new Runnable() {
@Override
public void run() {
while (!incrementDone.get()) {
try {
region.flushcache();
} catch (Exception e) {
e.printStackTrace();
}
}
}
};
//after all increment finished, the row will increment to 20*100 = 2000
int threadNum = 20;
int incCounter = 100;
long expected = threadNum * incCounter;
Thread[] incrementers = new Thread[threadNum];
Thread flushThread = new Thread(reader);
for (int i = 0; i < threadNum; i++) {
incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
incrementers[i].start();
}
flushThread.start();
for (int i = 0; i < threadNum; i++) {
incrementers[i].join();
}
incrementDone.set(true);
flushThread.join();
Get get = new Get(Incrementer.incRow);
get.addColumn(Incrementer.family, Incrementer.qualifier);
get.setMaxVersions(1);
Result res = this.region.get(get, null);
List<KeyValue> kvs = res.getColumn(Incrementer.family,
Incrementer.qualifier);
//we just got the latest version
assertEquals(kvs.size(), 1);
KeyValue kv = kvs.get(0);
assertEquals(expected, Bytes.toLong(kv.getBuffer(), kv.getValueOffset()));
this.region = null;
}
private void putData(int startRow, int numRows, byte [] qf,
byte [] ...families)