diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f607f623895..f322111c745 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4529,8 +4529,8 @@ public class HRegion implements HeapSize { // , Writable{ boolean flush = false; WALEdit walEdits = null; List allKVs = new ArrayList(append.size()); - List kvs = new ArrayList(append.size()); - long now = EnvironmentEdgeManager.currentTimeMillis(); + Map> tempMemstore = new HashMap>(); + long before = EnvironmentEdgeManager.currentTimeMillis(); long size = 0; long txid = 0; @@ -4541,11 +4541,13 @@ public class HRegion implements HeapSize { // , Writable{ Integer lid = getLock(lockid, row, true); this.updatesLock.readLock().lock(); try { + long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family for (Map.Entry> family : append.getFamilyMap() .entrySet()) { Store store = stores.get(family.getKey()); + List kvs = new ArrayList(family.getValue().size()); // Get previous values for all columns in this family Get get = new Get(row); @@ -4611,10 +4613,8 @@ public class HRegion implements HeapSize { // , Writable{ } } - // Write the KVs for this family into the store - size += store.upsert(kvs); - allKVs.addAll(kvs); - kvs.clear(); + //store the kvs to the temporary memstore before writing HLog + tempMemstore.put(store, kvs); } // Actually write to WAL now @@ -4624,9 +4624,16 @@ public class HRegion implements HeapSize { // , Writable{ // as a Put. txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(), walEdits, - HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor); + HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(), + this.htableDescriptor); } + //Actually write to Memstore now + for (Map.Entry> entry : tempMemstore.entrySet()) { + Store store = entry.getKey(); + size += store.upsert(entry.getValue()); + allKVs.addAll(entry.getValue()); + } size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } finally { @@ -4641,7 +4648,7 @@ public class HRegion implements HeapSize { // , Writable{ } long after = EnvironmentEdgeManager.currentTimeMillis(); - this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - now); + this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before); if (flush) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index eacddfb24ca..08949facf2a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -3611,6 +3612,103 @@ public class TestHRegion extends HBaseTestCase { this.region = null; } + /** + * TestCase for append + * + */ + private static class Appender implements Runnable { + private HRegion region; + private final static byte[] appendRow = Bytes.toBytes("appendRow"); + private final static byte[] family = Bytes.toBytes("family"); + private final static byte[] qualifier = Bytes.toBytes("qualifier"); + private final static byte[] CHAR = Bytes.toBytes("a"); + private int appendCounter; + + public Appender(HRegion region, int appendCounter) { + this.region = region; + this.appendCounter = appendCounter; + } + + @Override + public void run() { + int count = 0; + while (count < appendCounter) { + Append app = new Append(appendRow); + app.add(family, qualifier, CHAR); + count++; + try { + region.append(app, null, true); + } catch (IOException e) { + e.printStackTrace(); + break; + } + } + } + } + + /** + * Test case to check append function with memstore flushing + * @throws Exception + */ + @Test + public void testParallelAppendWithMemStoreFlush() throws Exception { + Configuration conf = HBaseConfiguration.create(); + String method = "testParallelAppendWithMemStoreFlush"; + byte[] tableName = Bytes.toBytes(method); + byte[] family = Appender.family; + this.region = initHRegion(tableName, method, conf, family); + final HRegion region = this.region; + final AtomicBoolean appendDone = new AtomicBoolean(false); + Runnable flusher = new Runnable() { + @Override + public void run() { + while (!appendDone.get()) { + try { + region.flushcache(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + }; + + //after all append finished, the value will append to threadNum * appendCounter Appender.CHAR + int threadNum = 20; + int appendCounter = 100; + byte[] expected = new byte[threadNum * appendCounter]; + for (int i = 0; i < threadNum * appendCounter; i++) { + System.arraycopy(Appender.CHAR, 0, expected, i, 1); + } + Thread[] appenders = new Thread[threadNum]; + Thread flushThread = new Thread(flusher); + for (int i = 0; i < threadNum; i++) { + appenders[i] = new Thread(new Appender(this.region, appendCounter)); + appenders[i].start(); + } + flushThread.start(); + for (int i = 0; i < threadNum; i++) { + appenders[i].join(); + } + + appendDone.set(true); + flushThread.join(); + + Get get = new Get(Appender.appendRow); + get.addColumn(Appender.family, Appender.qualifier); + get.setMaxVersions(1); + Result res = this.region.get(get, null); + List kvs = res.getColumn(Appender.family, + Appender.qualifier); + + //we just got the latest version + assertEquals(kvs.size(), 1); + KeyValue kv = kvs.get(0); + byte[] appendResult = new byte[kv.getValueLength()]; + System.arraycopy(kv.getBuffer(), kv.getValueOffset(), appendResult, 0, kv.getValueLength()); + assertEquals(expected, appendResult); + this.region = null; + } + private void putData(int startRow, int numRows, byte [] qf, byte [] ...families) throws IOException {