HBASE-5096 Replication does not handle deletes correctly. (Lars H)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1224851 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c8ddab6962
commit
2dcc010d36
|
@ -461,6 +461,7 @@ Release 0.92.0 - Unreleased
|
|||
HBASE-5078 DistributedLogSplitter failing to split file because it has edits for
|
||||
lots of regions
|
||||
HBASE-5077 SplitLogWorker fails to let go of a task, kills the RS
|
||||
HBASE-5096 Replication does not handle deletes correctly. (Lars H)
|
||||
|
||||
TESTS
|
||||
HBASE-4450 test for number of blocks read: to serve as baseline for expected
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.HTableInterface;
|
||||
import org.apache.hadoop.hbase.client.HTablePool;
|
||||
|
@ -104,11 +105,21 @@ public class ReplicationSink {
|
|||
kvs.get(0).getTimestamp(), null);
|
||||
delete.setClusterId(entry.getKey().getClusterId());
|
||||
for (KeyValue kv : kvs) {
|
||||
if (kv.isDeleteFamily()) {
|
||||
delete.deleteFamily(kv.getFamily());
|
||||
} else if (!kv.isEmptyColumn()) {
|
||||
delete.deleteColumn(kv.getFamily(),
|
||||
kv.getQualifier());
|
||||
switch (Type.codeToType(kv.getType())) {
|
||||
case DeleteFamily:
|
||||
// family marker
|
||||
delete.deleteFamily(kv.getFamily(), kv.getTimestamp());
|
||||
break;
|
||||
case DeleteColumn:
|
||||
// column marker
|
||||
delete.deleteColumns(kv.getFamily(), kv.getQualifier(),
|
||||
kv.getTimestamp());
|
||||
break;
|
||||
case Delete:
|
||||
// version marker
|
||||
delete.deleteColumn(kv.getFamily(), kv.getQualifier(),
|
||||
kv.getTimestamp());
|
||||
break;
|
||||
}
|
||||
}
|
||||
delete(entry.getKey().getTablename(), delete);
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
@ -195,6 +196,94 @@ public class TestReplication {
|
|||
utility1.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that version and column delete marker types are replicated
|
||||
* correctly.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout=300000)
|
||||
public void testDeleteTypes() throws Exception {
|
||||
LOG.info("testDeleteTypes");
|
||||
final byte[] v1 = Bytes.toBytes("v1");
|
||||
final byte[] v2 = Bytes.toBytes("v2");
|
||||
final byte[] v3 = Bytes.toBytes("v3");
|
||||
htable1 = new HTable(conf1, tableName);
|
||||
|
||||
long t = EnvironmentEdgeManager.currentTimeMillis();
|
||||
// create three versions for "row"
|
||||
Put put = new Put(row);
|
||||
put.add(famName, row, t, v1);
|
||||
htable1.put(put);
|
||||
|
||||
put = new Put(row);
|
||||
put.add(famName, row, t+1, v2);
|
||||
htable1.put(put);
|
||||
|
||||
put = new Put(row);
|
||||
put.add(famName, row, t+2, v3);
|
||||
htable1.put(put);
|
||||
|
||||
Get get = new Get(row);
|
||||
get.setMaxVersions();
|
||||
for (int i = 0; i < NB_RETRIES; i++) {
|
||||
if (i==NB_RETRIES-1) {
|
||||
fail("Waited too much time for put replication");
|
||||
}
|
||||
Result res = htable2.get(get);
|
||||
if (res.size() < 3) {
|
||||
LOG.info("Rows not available");
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
} else {
|
||||
assertArrayEquals(res.raw()[0].getValue(), v3);
|
||||
assertArrayEquals(res.raw()[1].getValue(), v2);
|
||||
assertArrayEquals(res.raw()[2].getValue(), v1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
// place a version delete marker (delete last version)
|
||||
Delete d = new Delete(row);
|
||||
d.deleteColumn(famName, row, t);
|
||||
htable1.delete(d);
|
||||
|
||||
get = new Get(row);
|
||||
get.setMaxVersions();
|
||||
for (int i = 0; i < NB_RETRIES; i++) {
|
||||
if (i==NB_RETRIES-1) {
|
||||
fail("Waited too much time for put replication");
|
||||
}
|
||||
Result res = htable2.get(get);
|
||||
if (res.size() > 2) {
|
||||
LOG.info("Version not deleted");
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
} else {
|
||||
assertArrayEquals(res.raw()[0].getValue(), v3);
|
||||
assertArrayEquals(res.raw()[1].getValue(), v2);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// place a column delete marker
|
||||
d = new Delete(row);
|
||||
d.deleteColumns(famName, row, t+2);
|
||||
htable1.delete(d);
|
||||
|
||||
// now *both* of the remaining version should be deleted
|
||||
// at the replica
|
||||
get = new Get(row);
|
||||
for (int i = 0; i < NB_RETRIES; i++) {
|
||||
if (i==NB_RETRIES-1) {
|
||||
fail("Waited too much time for del replication");
|
||||
}
|
||||
Result res = htable2.get(get);
|
||||
if (res.size() >= 1) {
|
||||
LOG.info("Rows not deleted");
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a row, check it's replicated, delete it, check's gone
|
||||
* @throws Exception
|
||||
|
|
Loading…
Reference in New Issue