From 9975cc0d5f9a4e9bf41fe4bf65403e2fb52a189a Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Tue, 12 Jun 2012 18:31:07 +0000 Subject: [PATCH] HBASE-6195 Increment data will be lost when the memstore is flushed (Xing Shi) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1349471 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/regionserver/HRegion.java | 22 +++-- .../hbase/regionserver/TestHRegion.java | 93 +++++++++++++++++++ 2 files changed, 107 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3f85607e576..f607f623895 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4675,8 +4675,8 @@ public class HRegion implements HeapSize { // , Writable{ boolean flush = false; WALEdit walEdits = null; List allKVs = new ArrayList(increment.numColumns()); - List kvs = new ArrayList(increment.numColumns()); - long now = EnvironmentEdgeManager.currentTimeMillis(); + Map> tempMemstore = new HashMap>(); + long before = EnvironmentEdgeManager.currentTimeMillis(); long size = 0; long txid = 0; @@ -4687,11 +4687,13 @@ public class HRegion implements HeapSize { // , Writable{ Integer lid = getLock(lockid, row, true); this.updatesLock.readLock().lock(); try { + long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family for (Map.Entry> family : increment.getFamilyMap().entrySet()) { Store store = stores.get(family.getKey()); + List kvs = new ArrayList(family.getValue().size()); // Get previous values for all columns in this family Get get = new Get(row); @@ -4727,10 +4729,8 @@ public class HRegion implements HeapSize { // , Writable{ } } - // Write the KVs for this family into the store - size += store.upsert(kvs); - allKVs.addAll(kvs); - kvs.clear(); + //store the kvs to the temporary memstore before writing HLog + tempMemstore.put(store, kvs); } // Actually write to WAL now @@ -4739,10 +4739,16 @@ public class HRegion implements HeapSize { // , Writable{ // cluster. A slave cluster receives the final value (not the delta) // as a Put. txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(), - walEdits, HConstants.DEFAULT_CLUSTER_ID, now, + walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor); } + //Actually write to Memstore now + for (Map.Entry> entry : tempMemstore.entrySet()) { + Store store = entry.getKey(); + size += store.upsert(entry.getValue()); + allKVs.addAll(entry.getValue()); + } size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } finally { @@ -4757,7 +4763,7 @@ public class HRegion implements HeapSize { // , Writable{ } long after = EnvironmentEdgeManager.currentTimeMillis(); - this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - now); + this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before); if (flush) { // Request a cache flush. Do it outside update lock. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 2e6f02263bd..9fe25fd294d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -3517,6 +3518,98 @@ public class TestHRegion extends HBaseTestCase { HRegion.closeHRegion(region); } } + + /** + * TestCase for increment + * + */ + private static class Incrementer implements Runnable { + private HRegion region; + private final static byte[] incRow = Bytes.toBytes("incRow"); + private final static byte[] family = Bytes.toBytes("family"); + private final static byte[] qualifier = Bytes.toBytes("qualifier"); + private final static long ONE = 1l; + private int incCounter; + + public Incrementer(HRegion region, int incCounter) { + this.region = region; + this.incCounter = incCounter; + } + + @Override + public void run() { + int count = 0; + while (count < incCounter) { + Increment inc = new Increment(incRow); + inc.addColumn(family, qualifier, ONE); + count++; + try { + region.increment(inc, null, true); + } catch (IOException e) { + e.printStackTrace(); + break; + } + } + } + } + + /** + * Test case to check increment function with memstore flushing + * @throws Exception + */ + @Test + public void testParallelIncrementWithMemStoreFlush() throws Exception { + Configuration conf = HBaseConfiguration.create(); + String method = "testParallelismIncrementWithMemStoreFlush"; + byte[] tableName = Bytes.toBytes(method); + byte[] family = Incrementer.family; + this.region = initHRegion(tableName, method, conf, family); + final HRegion region = this.region; + final AtomicBoolean incrementDone = new AtomicBoolean(false); + Runnable reader = new Runnable() { + @Override + public void run() { + while (!incrementDone.get()) { + try { + region.flushcache(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + }; + + //after all increment finished, the row will increment to 20*100 = 2000 + int threadNum = 20; + int incCounter = 100; + long expected = threadNum * incCounter; + Thread[] incrementers = new Thread[threadNum]; + Thread flushThread = new Thread(reader); + for (int i = 0; i < threadNum; i++) { + incrementers[i] = new Thread(new Incrementer(this.region, incCounter)); + incrementers[i].start(); + } + flushThread.start(); + for (int i = 0; i < threadNum; i++) { + incrementers[i].join(); + } + + incrementDone.set(true); + flushThread.join(); + + Get get = new Get(Incrementer.incRow); + get.addColumn(Incrementer.family, Incrementer.qualifier); + get.setMaxVersions(1); + Result res = this.region.get(get, null); + List kvs = res.getColumn(Incrementer.family, + Incrementer.qualifier); + + //we just got the latest version + assertEquals(kvs.size(), 1); + KeyValue kv = kvs.get(0); + assertEquals(expected, Bytes.toLong(kv.getBuffer(), kv.getValueOffset())); + this.region = null; + } private void putData(int startRow, int numRows, byte [] qf, byte [] ...families)