From 2dcc010d36ca63a3e626e8e6dd287295d11a84e5 Mon Sep 17 00:00:00 2001 From: larsh Date: Tue, 27 Dec 2011 01:44:14 +0000 Subject: [PATCH] HBASE-5096 Replication does not handle deletes correctly. (Lars H) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1224851 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../regionserver/ReplicationSink.java | 21 +++-- .../hbase/replication/TestReplication.java | 89 +++++++++++++++++++ 3 files changed, 106 insertions(+), 5 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index b4054cc4082..f0a5d668b2e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -461,6 +461,7 @@ Release 0.92.0 - Unreleased HBASE-5078 DistributedLogSplitter failing to split file because it has edits for lots of regions HBASE-5077 SplitLogWorker fails to let go of a task, kills the RS + HBASE-5096 Replication does not handle deletes correctly. (Lars H) TESTS HBASE-4450 test for number of blocks read: to serve as baseline for expected diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index c06f964743b..7fe0ae590c2 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; @@ -104,11 +105,21 @@ public class ReplicationSink { kvs.get(0).getTimestamp(), null); delete.setClusterId(entry.getKey().getClusterId()); for (KeyValue kv : kvs) { - if (kv.isDeleteFamily()) { - delete.deleteFamily(kv.getFamily()); - } 
else if (!kv.isEmptyColumn()) { - delete.deleteColumn(kv.getFamily(), - kv.getQualifier()); + switch (Type.codeToType(kv.getType())) { + case DeleteFamily: + // family marker + delete.deleteFamily(kv.getFamily(), kv.getTimestamp()); + break; + case DeleteColumn: + // column marker + delete.deleteColumns(kv.getFamily(), kv.getQualifier(), + kv.getTimestamp()); + break; + case Delete: + // version marker + delete.deleteColumn(kv.getFamily(), kv.getQualifier(), + kv.getTimestamp()); + break; } } delete(entry.getKey().getTablename(), delete); diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java index 3e8b8003e5d..5e25bc96e0e 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -195,6 +196,94 @@ public class TestReplication { utility1.shutdownMiniCluster(); } + /** + * Verify that version and column delete marker types are replicated + * correctly. 
+ * @throws Exception + */ + @Test(timeout=300000) + public void testDeleteTypes() throws Exception { + LOG.info("testDeleteTypes"); + final byte[] v1 = Bytes.toBytes("v1"); + final byte[] v2 = Bytes.toBytes("v2"); + final byte[] v3 = Bytes.toBytes("v3"); + htable1 = new HTable(conf1, tableName); + + long t = EnvironmentEdgeManager.currentTimeMillis(); + // create three versions for "row" + Put put = new Put(row); + put.add(famName, row, t, v1); + htable1.put(put); + + put = new Put(row); + put.add(famName, row, t+1, v2); + htable1.put(put); + + put = new Put(row); + put.add(famName, row, t+2, v3); + htable1.put(put); + + Get get = new Get(row); + get.setMaxVersions(); + for (int i = 0; i < NB_RETRIES; i++) { + if (i==NB_RETRIES-1) { + fail("Waited too much time for put replication"); + } + Result res = htable2.get(get); + if (res.size() < 3) { + LOG.info("Rows not available"); + Thread.sleep(SLEEP_TIME); + } else { + assertArrayEquals(res.raw()[0].getValue(), v3); + assertArrayEquals(res.raw()[1].getValue(), v2); + assertArrayEquals(res.raw()[2].getValue(), v1); + break; + } + } + // place a version delete marker (delete last version) + Delete d = new Delete(row); + d.deleteColumn(famName, row, t); + htable1.delete(d); + + get = new Get(row); + get.setMaxVersions(); + for (int i = 0; i < NB_RETRIES; i++) { + if (i==NB_RETRIES-1) { + fail("Waited too much time for put replication"); + } + Result res = htable2.get(get); + if (res.size() > 2) { + LOG.info("Version not deleted"); + Thread.sleep(SLEEP_TIME); + } else { + assertArrayEquals(res.raw()[0].getValue(), v3); + assertArrayEquals(res.raw()[1].getValue(), v2); + break; + } + } + + // place a column delete marker + d = new Delete(row); + d.deleteColumns(famName, row, t+2); + htable1.delete(d); + + // now *both* of the remaining versions should be deleted + // at the replica + get = new Get(row); + for (int i = 0; i < NB_RETRIES; i++) { + if (i==NB_RETRIES-1) { + fail("Waited too much time for del replication"); + 
} + Result res = htable2.get(get); + if (res.size() >= 1) { + LOG.info("Rows not deleted"); + Thread.sleep(SLEEP_TIME); + } else { + break; + } + } + } + /** * Add a row, check it's replicated, delete it, check's gone * @throws Exception