diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 47dafd5a699..0f1dc701f24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5146,6 +5146,8 @@ public class HRegion implements HeapSize { // , Writable{
           int idx = 0;
           for (Cell kv: family.getValue()) {
             long amount = Bytes.toLong(CellUtil.cloneValue(kv));
+            boolean noWriteBack = (amount == 0);
+
             Cell c = null;
             if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
               c = results.get(idx);
@@ -5187,57 +5189,66 @@ public class HRegion implements HeapSize { // , Writable{
               newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
                   RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));
             }
-            kvs.add(newKV);
+            allKVs.add(newKV);
 
-            // Prepare WAL updates
-            if (writeToWAL) {
-              if (walEdits == null) {
-                walEdits = new WALEdit();
+            if (!noWriteBack) {
+              kvs.add(newKV);
+
+              // Prepare WAL updates
+              if (writeToWAL) {
+                if (walEdits == null) {
+                  walEdits = new WALEdit();
+                }
+                walEdits.add(newKV);
               }
-              walEdits.add(newKV);
             }
           }
 
           //store the kvs to the temporary memstore before writing HLog
-          tempMemstore.put(store, kvs);
+          if (!kvs.isEmpty()) {
+            tempMemstore.put(store, kvs);
+          }
         }
 
         // Actually write to WAL now
-        if (writeToWAL) {
-          // Using default cluster id, as this can only happen in the orginating
-          // cluster. A slave cluster receives the final value (not the delta)
-          // as a Put.
-          txid = this.log.appendNoSync(this.getRegionInfo(),
-              this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
-              EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
-              true, nonceGroup, nonce);
-        } else {
-          recordMutationWithoutWal(increment.getFamilyCellMap());
+        if (walEdits != null && !walEdits.isEmpty()) {
+          if (writeToWAL) {
+            // Using default cluster id, as this can only happen in the originating
+            // cluster. A slave cluster receives the final value (not the delta)
+            // as a Put.
+            txid = this.log.appendNoSync(this.getRegionInfo(),
+                this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
+                EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
+                true, nonceGroup, nonce);
+          } else {
+            recordMutationWithoutWal(increment.getFamilyCellMap());
+          }
         }
 
         //Actually write to Memstore now
-        for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
-          Store store = entry.getKey();
-          if (store.getFamily().getMaxVersions() == 1) {
-            // upsert if VERSIONS for this CF == 1
-            size += store.upsert(entry.getValue(), getSmallestReadPoint());
-          } else {
-            // otherwise keep older versions around
-            for (Cell cell : entry.getValue()) {
-              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-              size += store.add(kv);
+        if (!tempMemstore.isEmpty()) {
+          for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
+            Store store = entry.getKey();
+            if (store.getFamily().getMaxVersions() == 1) {
+              // upsert if VERSIONS for this CF == 1
+              size += store.upsert(entry.getValue(), getSmallestReadPoint());
+            } else {
+              // otherwise keep older versions around
+              for (Cell cell : entry.getValue()) {
+                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+                size += store.add(kv);
+              }
             }
           }
-          allKVs.addAll(entry.getValue());
+          size = this.addAndGetGlobalMemstoreSize(size);
+          flush = isFlushSize(size);
         }
-        size = this.addAndGetGlobalMemstoreSize(size);
-        flush = isFlushSize(size);
       } finally {
         this.updatesLock.readLock().unlock();
       }
     } finally {
       rowLock.release();
     }
-    if (writeToWAL) {
+    if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {
       // sync the transaction log outside the rowlock
       syncOrDefer(txid, durability);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
index 8f74ce7eefd..8b26f589d2e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -130,6 +132,62 @@ public class TestDurability {
     verifyHLogCount(wal, 12);
   }
 
+  @Test
+  public void testIncrement() throws Exception {
+    byte[] row1 = Bytes.toBytes("row1");
+    byte[] col1 = Bytes.toBytes("col1");
+    byte[] col2 = Bytes.toBytes("col2");
+    byte[] col3 = Bytes.toBytes("col3");
+
+    // Setting up region
+    HLog wal = HLogFactory.createHLog(FS, DIR, "myhlogdir",
+        "myhlogdir_archive", CONF);
+    byte[] tableName = Bytes.toBytes("TestIncrement");
+    HRegion region = createHRegion(tableName, "increment", wal, false);
+
+    // col1: amount = 1, 1 write back to WAL
+    Increment inc1 = new Increment(row1);
+    inc1.addColumn(FAMILY, col1, 1);
+    Result res = region.increment(inc1);
+    assertEquals(1, res.size());
+    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
+    verifyHLogCount(wal, 1);
+
+    // col1: amount = 0, 0 write back to WAL
+    inc1 = new Increment(row1);
+    inc1.addColumn(FAMILY, col1, 0);
+    res = region.increment(inc1);
+    assertEquals(1, res.size());
+    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
+    verifyHLogCount(wal, 1);
+
+    // col1: amount = 0, col2: amount = 0, col3: amount = 0
+    // 0 write back to WAL
+    inc1 = new Increment(row1);
+    inc1.addColumn(FAMILY, col1, 0);
+    inc1.addColumn(FAMILY, col2, 0);
+    inc1.addColumn(FAMILY, col3, 0);
+    res = region.increment(inc1);
+    assertEquals(3, res.size());
+    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
+    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
+    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
+    verifyHLogCount(wal, 1);
+
+    // col1: amount = 5, col2: amount = 4, col3: amount = 3
+    // 1 write back to WAL
+    inc1 = new Increment(row1);
+    inc1.addColumn(FAMILY, col1, 5);
+    inc1.addColumn(FAMILY, col2, 4);
+    inc1.addColumn(FAMILY, col3, 3);
+    res = region.increment(inc1);
+    assertEquals(3, res.size());
+    assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
+    assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
+    assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
+    verifyHLogCount(wal, 2);
+  }
+
   private Put newPut(Durability durability) {
     Put p = new Put(ROW);
     p.add(FAMILY, COL, COL);
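For context, the following is a minimal client-side sketch of the behavior this patch changes; it is an illustration only, not part of the patch. The class name, the table name "counters", and the column names are hypothetical, and it assumes the 0.98-era HTable client API. After this change, an Increment whose delta is 0 still returns the current cell value in the Result, but the region server skips the memstore write-back and the WAL append for that cell, so a zero increment behaves like a cheap atomic read of a counter.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ZeroIncrementSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // "counters", "f", and "hits" are placeholder names for this sketch.
    HTable table = new HTable(conf, "counters");
    try {
      byte[] row = Bytes.toBytes("row1");
      byte[] family = Bytes.toBytes("f");
      byte[] qual = Bytes.toBytes("hits");

      // A real increment: the stored value changes, so it is written back
      // to the memstore and appended to the WAL, as before this patch.
      Increment bump = new Increment(row);
      bump.addColumn(family, qual, 1L);
      table.increment(bump);

      // A zero increment: the Result still carries the current value, but
      // with this patch the region skips the write-back and the WAL append
      // for this cell, since the stored value is unchanged.
      Increment probe = new Increment(row);
      probe.addColumn(family, qual, 0L);
      Result current = table.increment(probe);
      System.out.println(Bytes.toLong(current.getValue(family, qual)));
    } finally {
      table.close();
    }
  }
}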