From 01246cdc99a03deb6696a6f7c3cbaa796007bd4d Mon Sep 17 00:00:00 2001 From: larsh Date: Tue, 17 Jan 2012 19:38:13 +0000 Subject: [PATCH] HBASE-5203 Group atomic put/delete operation into a single WALEdit to handle region server failures. (Lars H) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1232551 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/hbase/client/Delete.java | 29 ++ .../hadoop/hbase/regionserver/HRegion.java | 331 +++++++++--------- .../regionserver/ReplicationSink.java | 122 +++---- .../hbase/client/TestFromClientSide.java | 12 +- .../regionserver/TestAtomicOperation.java | 2 + 5 files changed, 264 insertions(+), 232 deletions(-) diff --git a/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/src/main/java/org/apache/hadoop/hbase/client/Delete.java index 51bbc638d87..3869acf34c0 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/Delete.java +++ b/src/main/java/org/apache/hadoop/hbase/client/Delete.java @@ -137,6 +137,35 @@ public class Delete extends Mutation familyMap.put(kv.getFamily(), list); } + /** + * Advanced use only. + * Add an existing delete marker to this Delete object. + * @param kv An existing KeyValue of type "delete". + * @return this for invocation chaining + * @throws IOException + */ + public Delete addDeleteMarker(KeyValue kv) throws IOException { + if (!kv.isDelete()) { + throw new IOException("The recently added KeyValue is not of type " + + "delete. Rowkey: " + Bytes.toStringBinary(this.row)); + } + if (Bytes.compareTo(this.row, 0, row.length, kv.getBuffer(), + kv.getRowOffset(), kv.getRowLength()) != 0) { + throw new IOException("The row in the recently added KeyValue " + + Bytes.toStringBinary(kv.getBuffer(), kv.getRowOffset(), + kv.getRowLength()) + " doesn't match the original one " + + Bytes.toStringBinary(this.row)); + } + byte [] family = kv.getFamily(); + List list = familyMap.get(family); + if (list == null) { + list = new ArrayList(); + } + list.add(kv); + familyMap.put(family, list); + return this; + } + /** * Delete all versions of all columns of the specified family. *

diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a018266e0f7..c7cc402587d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1686,7 +1686,7 @@ public class HRegion implements HeapSize { // , Writable{ try { // All edits for the given row (across all column families) must happen atomically. prepareDelete(delete); - internalDelete(delete, delete.getClusterId(), writeToWAL, null, null); + internalDelete(delete, delete.getClusterId(), writeToWAL); } finally { if(lockid == null) releaseRowLock(lid); } @@ -1707,26 +1707,77 @@ public class HRegion implements HeapSize { // , Writable{ delete.setFamilyMap(familyMap); delete.setClusterId(clusterId); delete.setWriteToWAL(writeToWAL); - internalDelete(delete, clusterId, writeToWAL, null, null); + internalDelete(delete, clusterId, writeToWAL); + } + + /** + * Setup a Delete object with correct timestamps. + * Caller should the row and region locks. + * @param delete + * @param now + * @throws IOException + */ + private void prepareDeleteTimestamps(Delete delete, byte[] byteNow) + throws IOException { + Map> familyMap = delete.getFamilyMap(); + for (Map.Entry> e : familyMap.entrySet()) { + + byte[] family = e.getKey(); + List kvs = e.getValue(); + Map kvCount = new TreeMap(Bytes.BYTES_COMPARATOR); + + for (KeyValue kv: kvs) { + // Check if time is LATEST, change to time of most recent addition if so + // This is expensive. + if (kv.isLatestTimestamp() && kv.isDeleteType()) { + byte[] qual = kv.getQualifier(); + if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY; + + Integer count = kvCount.get(qual); + if (count == null) { + kvCount.put(qual, 1); + } else { + kvCount.put(qual, count + 1); + } + count = kvCount.get(qual); + + Get get = new Get(kv.getRow()); + get.setMaxVersions(count); + get.addColumn(family, qual); + + List result = get(get, false); + + if (result.size() < count) { + // Nothing to delete + kv.updateLatestStamp(byteNow); + continue; + } + if (result.size() > count) { + throw new RuntimeException("Unexpected size: " + result.size()); + } + KeyValue getkv = result.get(count - 1); + Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), + getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG); + } else { + kv.updateLatestStamp(byteNow); + } + } + } } /** * @param delete The Delete command - * @param familyMap map of family to edits for the given family. + * @param clusterId UUID of the originating cluster (for replication). * @param writeToWAL - * @param writeEntry Optional mvcc write point to use - * @param walEdit Optional walEdit to use. A non-null walEdit indicates - * that the coprocessor hooks are run by the caller * @throws IOException */ private void internalDelete(Delete delete, UUID clusterId, - boolean writeToWAL, MultiVersionConsistencyControl.WriteEntry writeEntry, - WALEdit walEdit) throws IOException { + boolean writeToWAL) throws IOException { Map> familyMap = delete.getFamilyMap(); - WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit; + WALEdit walEdit = new WALEdit(); /* Run coprocessor pre hook outside of locks to avoid deadlock */ - if (coprocessorHost != null && walEdit == null) { - if (coprocessorHost.preDelete(delete, localWalEdit, writeToWAL)) { + if (coprocessorHost != null) { + if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) { return; } } @@ -1737,49 +1788,7 @@ public class HRegion implements HeapSize { // , Writable{ updatesLock.readLock().lock(); try { - for (Map.Entry> e : familyMap.entrySet()) { - - byte[] family = e.getKey(); - List kvs = e.getValue(); - Map kvCount = new TreeMap(Bytes.BYTES_COMPARATOR); - - for (KeyValue kv: kvs) { - // Check if time is LATEST, change to time of most recent addition if so - // This is expensive. - if (kv.isLatestTimestamp() && kv.isDeleteType()) { - byte[] qual = kv.getQualifier(); - if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY; - - Integer count = kvCount.get(qual); - if (count == null) { - kvCount.put(qual, 1); - } else { - kvCount.put(qual, count + 1); - } - count = kvCount.get(qual); - - Get get = new Get(kv.getRow()); - get.setMaxVersions(count); - get.addColumn(family, qual); - - List result = get(get, false); - - if (result.size() < count) { - // Nothing to delete - kv.updateLatestStamp(byteNow); - continue; - } - if (result.size() > count) { - throw new RuntimeException("Unexpected size: " + result.size()); - } - KeyValue getkv = result.get(count - 1); - Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), - getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG); - } else { - kv.updateLatestStamp(byteNow); - } - } - } + prepareDeleteTimestamps(delete, byteNow); if (writeToWAL) { // write/sync to WAL should happen before we touch memstore. @@ -1790,21 +1799,21 @@ public class HRegion implements HeapSize { // , Writable{ // // bunch up all edits across all column families into a // single WALEdit. - addFamilyMapToWALEdit(familyMap, localWalEdit); + addFamilyMapToWALEdit(familyMap, walEdit); this.log.append(regionInfo, this.htableDescriptor.getName(), - localWalEdit, clusterId, now, this.htableDescriptor); + walEdit, clusterId, now, this.htableDescriptor); } // Now make changes to the memstore. - long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry); + long addedSize = applyFamilyMapToMemstore(familyMap, null); flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); } finally { this.updatesLock.readLock().unlock(); } // do after lock - if (coprocessorHost != null && walEdit == null) { - coprocessorHost.postDelete(delete, localWalEdit, writeToWAL); + if (coprocessorHost != null) { + coprocessorHost.postDelete(delete, walEdit, writeToWAL); } final long after = EnvironmentEdgeManager.currentTimeMillis(); final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix( @@ -1876,7 +1885,7 @@ public class HRegion implements HeapSize { // , Writable{ try { // All edits for the given row (across all column families) must happen atomically. - internalPut(put, put.getClusterId(), writeToWAL, null, null); + internalPut(put, put.getClusterId(), writeToWAL); } finally { if(lockid == null) releaseRowLock(lid); } @@ -2305,13 +2314,11 @@ public class HRegion implements HeapSize { // , Writable{ // originating cluster. A slave cluster receives the result as a Put // or Delete if (isPut) { - internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL, - null, null); + internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL); } else { Delete d = (Delete)w; prepareDelete(d); - internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL, null, - null); + internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL); } return true; } @@ -2406,26 +2413,23 @@ public class HRegion implements HeapSize { // , Writable{ p.setFamilyMap(familyMap); p.setClusterId(HConstants.DEFAULT_CLUSTER_ID); p.setWriteToWAL(true); - this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true, null, null); + this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true); } /** * Add updates first to the hlog (if writeToWal) and then add values to memstore. * Warning: Assumption is caller has lock on passed in row. * @param put The Put command + * @param clusterId UUID of the originating cluster (for replication). * @param writeToWAL if true, then we should write to the log - * @param writeEntry Optional mvcc write point to use - * @param walEdit Optional walEdit to use. A non-null walEdit indicates - * that the coprocessor hooks are run by the caller * @throws IOException */ - private void internalPut(Put put, UUID clusterId, boolean writeToWAL, - MultiVersionConsistencyControl.WriteEntry writeEntry, WALEdit walEdit) throws IOException { + private void internalPut(Put put, UUID clusterId, boolean writeToWAL) throws IOException { Map> familyMap = put.getFamilyMap(); - WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit; + WALEdit walEdit = new WALEdit(); /* run pre put hook outside of lock to avoid deadlock */ - if (coprocessorHost != null && walEdit == null) { - if (coprocessorHost.prePut(put, localWalEdit, writeToWAL)) { + if (coprocessorHost != null) { + if (coprocessorHost.prePut(put, walEdit, writeToWAL)) { return; } } @@ -2445,19 +2449,19 @@ public class HRegion implements HeapSize { // , Writable{ // for some reason fail to write/sync to commit log, the memstore // will contain uncommitted transactions. if (writeToWAL) { - addFamilyMapToWALEdit(familyMap, localWalEdit); + addFamilyMapToWALEdit(familyMap, walEdit); this.log.append(regionInfo, this.htableDescriptor.getName(), - localWalEdit, clusterId, now, this.htableDescriptor); + walEdit, clusterId, now, this.htableDescriptor); } - long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry); + long addedSize = applyFamilyMapToMemstore(familyMap, null); flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); } finally { this.updatesLock.readLock().unlock(); } - if (coprocessorHost != null && walEdit == null) { - coprocessorHost.postPut(put, localWalEdit, writeToWAL); + if (coprocessorHost != null) { + coprocessorHost.postPut(put, walEdit, writeToWAL); } // do after lock @@ -4140,92 +4144,107 @@ public class HRegion implements HeapSize { // , Writable{ return results; } - public int mutateRow(RowMutation rm, + public void mutateRow(RowMutation rm, Integer lockid) throws IOException { + boolean flush = false; startRegionOperation(); - List walEdits = new ArrayList(rm.getMutations().size()); - - // 1. run all pre-hooks before the atomic operation - // if any pre hook indicates "bypass", bypass the entire operation - // Note that this requires creating the WALEdits here and passing - // them to the actual Put/Delete operations. - for (Mutation m : rm.getMutations()) { - WALEdit walEdit = new WALEdit(); - walEdits.add(walEdit); - if (coprocessorHost == null) { - continue; - } - if (m instanceof Put) { - if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) { - // by pass everything - return 0; - } - } else if (m instanceof Delete) { - Delete d = (Delete) m; - prepareDelete(d); - if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) { - // by pass everything - return 0; - } - } - } - - // 2. acquire the row lock - Integer lid = getLock(lockid, rm.getRow(), true); - - // 3. acquire the region lock - this.updatesLock.readLock().lock(); - - // 4. Get a mvcc write number - MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert(); + Integer lid = null; try { - int i = 0; - // 5. Perform the actual mutations - for (Mutation m : rm.getMutations()) { - if (m instanceof Put) { - internalPut((Put) m, HConstants.DEFAULT_CLUSTER_ID, - m.getWriteToWAL(), w, walEdits.get(i)); - } else if (m instanceof Delete) { - Delete d = (Delete) m; - prepareDelete(d); - internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, d.getWriteToWAL(), - w, walEdits.get(i)); - } else { - throw new DoNotRetryIOException( - "Action must be Put or Delete. But was: " - + m.getClass().getName()); - } - i++; - } - return i; - } finally { - // 6. roll mvcc forward - mvcc.completeMemstoreInsert(w); - // 7. release region lock - this.updatesLock.readLock().unlock(); - try { - // 8. run all coprocessor post hooks - if (coprocessorHost != null) { - int i = 0; - for (Mutation m : rm.getMutations()) { - if (m instanceof Put) { - coprocessorHost.postPut((Put) m, walEdits.get(i), - m.getWriteToWAL()); - } else if (m instanceof Delete) { - coprocessorHost.postDelete((Delete) m, walEdits.get(i), - m.getWriteToWAL()); + // 1. run all pre-hooks before the atomic operation + // if any pre hook indicates "bypass", bypass the entire operation + + // one WALEdit is used for all edits. + WALEdit walEdit = new WALEdit(); + if (coprocessorHost != null) { + for (Mutation m : rm.getMutations()) { + if (m instanceof Put) { + if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) { + // by pass everything + return; + } + } else if (m instanceof Delete) { + Delete d = (Delete) m; + prepareDelete(d); + if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) { + // by pass everything + return; } - i++; } } - } finally { - if (lid != null) { - // 9. release the row lock - releaseRowLock(lid); - } - closeRegionOperation(); } + + // 2. acquire the row lock + lid = getLock(lockid, rm.getRow(), true); + + // 3. acquire the region lock + this.updatesLock.readLock().lock(); + + // 4. Get a mvcc write number + MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert(); + + long now = EnvironmentEdgeManager.currentTimeMillis(); + byte[] byteNow = Bytes.toBytes(now); + try { + // 5. Check mutations and apply edits to a single WALEdit + for (Mutation m : rm.getMutations()) { + if (m instanceof Put) { + Map> familyMap = m.getFamilyMap(); + checkFamilies(familyMap.keySet()); + checkTimestamps(familyMap, now); + updateKVTimestamps(familyMap.values(), byteNow); + } else if (m instanceof Delete) { + Delete d = (Delete) m; + prepareDelete(d); + prepareDeleteTimestamps(d, byteNow); + } else { + throw new DoNotRetryIOException( + "Action must be Put or Delete. But was: " + + m.getClass().getName()); + } + if (m.getWriteToWAL()) { + addFamilyMapToWALEdit(m.getFamilyMap(), walEdit); + } + } + + // 6. append/sync all edits at once + // TODO: Do batching as in doMiniBatchPut + this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit, + HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor); + + // 7. apply to memstore + long addedSize = 0; + for (Mutation m : rm.getMutations()) { + addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w); + } + flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); + } finally { + // 8. roll mvcc forward + mvcc.completeMemstoreInsert(w); + + // 9. release region lock + this.updatesLock.readLock().unlock(); + } + // 10. run all coprocessor post hooks, after region lock is released + if (coprocessorHost != null) { + for (Mutation m : rm.getMutations()) { + if (m instanceof Put) { + coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL()); + } else if (m instanceof Delete) { + coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL()); + } + } + } + } finally { + if (lid != null) { + // 11. release the row lock + releaseRowLock(lid); + } + if (flush) { + // 12. Flush cache if needed. Do it outside update lock. + requestFlush(); + } + closeRegionOperation(); } } diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 7fe0ae590c2..9c3f387f364 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -23,11 +23,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; @@ -94,62 +94,39 @@ public class ReplicationSink { // to the same table. try { long totalReplicated = 0; - // Map of table => list of puts, we only want to flushCommits once per + // Map of table => list of Rows, we only want to flushCommits once per // invocation of this method per table. - Map> puts = new TreeMap>(Bytes.BYTES_COMPARATOR); + Map> rows = new TreeMap>(Bytes.BYTES_COMPARATOR); for (HLog.Entry entry : entries) { WALEdit edit = entry.getEdit(); + byte[] table = entry.getKey().getTablename(); + Put put = null; + Delete del = null; + KeyValue lastKV = null; List kvs = edit.getKeyValues(); - if (kvs.get(0).isDelete()) { - Delete delete = new Delete(kvs.get(0).getRow(), - kvs.get(0).getTimestamp(), null); - delete.setClusterId(entry.getKey().getClusterId()); - for (KeyValue kv : kvs) { - switch (Type.codeToType(kv.getType())) { - case DeleteFamily: - // family marker - delete.deleteFamily(kv.getFamily(), kv.getTimestamp()); - break; - case DeleteColumn: - // column marker - delete.deleteColumns(kv.getFamily(), kv.getQualifier(), - kv.getTimestamp()); - break; - case Delete: - // version marker - delete.deleteColumn(kv.getFamily(), kv.getQualifier(), - kv.getTimestamp()); - break; - } - } - delete(entry.getKey().getTablename(), delete); - } else { - byte[] table = entry.getKey().getTablename(); - List tableList = puts.get(table); - if (tableList == null) { - tableList = new ArrayList(); - puts.put(table, tableList); - } - // With mini-batching, we need to expect multiple rows per edit - byte[] lastKey = kvs.get(0).getRow(); - Put put = new Put(lastKey, kvs.get(0).getTimestamp()); - put.setClusterId(entry.getKey().getClusterId()); - for (KeyValue kv : kvs) { - byte[] key = kv.getRow(); - if (!Bytes.equals(lastKey, key)) { - tableList.add(put); - put = new Put(key, kv.getTimestamp()); + for (KeyValue kv : kvs) { + if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) { + if (kv.isDelete()) { + del = new Delete(kv.getRow()); + del.setClusterId(entry.getKey().getClusterId()); + addToMultiMap(rows, table, del); + } else { + put = new Put(kv.getRow()); put.setClusterId(entry.getKey().getClusterId()); + addToMultiMap(rows, table, put); } - put.add(kv); - lastKey = key; } - tableList.add(put); + if (kv.isDelete()) { + del.addDeleteMarker(kv); + } else { + put.add(kv); + } + lastKV = kv; } totalReplicated++; } - for(byte [] table : puts.keySet()) { - put(table, puts.get(table)); + for(byte [] table : rows.keySet()) { + batch(table, rows.get(table)); } this.metrics.setAgeOfLastAppliedOp( entries[entries.length-1].getKey().getWriteTime()); @@ -162,39 +139,40 @@ public class ReplicationSink { } /** - * Do the puts and handle the pool + * Simple helper to a map from key to (a list of) values + * TODO: Make a general utility method + * @param map + * @param key + * @param value + * @return + */ + private List addToMultiMap(Map> map, K key, V value) { + List values = map.get(key); + if (values == null) { + values = new ArrayList(); + map.put(key, values); + } + values.add(value); + return values; + } + + /** + * Do the changes and handle the pool * @param tableName table to insert into - * @param puts list of puts + * @param rows list of actions * @throws IOException */ - private void put(byte[] tableName, List puts) throws IOException { - if (puts.isEmpty()) { + private void batch(byte[] tableName, List rows) throws IOException { + if (rows.isEmpty()) { return; } HTableInterface table = null; try { table = this.pool.getTable(tableName); - table.put(puts); - this.metrics.appliedOpsRate.inc(puts.size()); - } finally { - if (table != null) { - table.close(); - } - } - } - - /** - * Do the delete and handle the pool - * @param tableName table to delete in - * @param delete the delete to use - * @throws IOException - */ - private void delete(byte[] tableName, Delete delete) throws IOException { - HTableInterface table = null; - try { - table = this.pool.getTable(tableName); - table.delete(delete); - this.metrics.appliedOpsRate.inc(1); + table.batch(rows); + this.metrics.appliedOpsRate.inc(rows.size()); + } catch (InterruptedException ix) { + throw new IOException(ix); } finally { if (table != null) { table.close(); diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index ff338b3448c..95ab8e637ef 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -4046,7 +4047,6 @@ public class TestFromClientSide { Bytes.toBytes("a"), Bytes.toBytes("b") }; RowMutation arm = new RowMutation(ROW); - arm.add(new Delete(ROW)); Put p = new Put(ROW); p.add(FAMILY, QUALIFIERS[0], VALUE); arm.add(p); @@ -4054,15 +4054,19 @@ public class TestFromClientSide { Get g = new Get(ROW); Result r = t.get(g); - // delete was first, row should exist assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); arm = new RowMutation(ROW); + p = new Put(ROW); + p.add(FAMILY, QUALIFIERS[1], VALUE); arm.add(p); - arm.add(new Delete(ROW)); + Delete d = new Delete(ROW); + d.deleteColumns(FAMILY, QUALIFIERS[0]); + arm.add(d); t.batch(Arrays.asList((Row)arm)); r = t.get(g); - assertTrue(r.isEmpty()); + assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); + assertNull(r.getValue(FAMILY, QUALIFIERS[0])); } @Test diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index 3c3196ee468..948bfec1f29 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -333,6 +333,8 @@ public class TestAtomicOperation extends HBaseTestCase { } } catch (IOException e) { e.printStackTrace(); + failures.incrementAndGet(); + fail(); } } }