From 67ba6598b1be167409a31c4e210b7218823b7beb Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 3 Dec 2015 16:09:22 -0800 Subject: [PATCH] HBASE-14905 VerifyReplication does not honour versions option (Vishal Khandelwal) --- .../replication/VerifyReplication.java | 6 + .../replication/TestReplicationBase.java | 2 +- .../TestReplicationSmallTests.java | 156 ++++++++++++++++++ 3 files changed, 163 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 28f9f39f779..e00c68254fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -121,6 +121,8 @@ public class VerifyReplication extends Configured implements Tool { } } scan.setTimeRange(startTime, endTime); + int versions = conf.getInt(NAME+".versions", -1); + LOG.info("Setting number of version inside map as: " + versions); if (versions >= 0) { scan.setMaxVersions(versions); } @@ -269,6 +271,9 @@ public class VerifyReplication extends Configured implements Tool { conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); LOG.info("Peer Quorum Address: " + peerQuorumAddress); + conf.setInt(NAME + ".versions", versions); + LOG.info("Number of version: " + versions); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(VerifyReplication.class); @@ -276,6 +281,7 @@ public class VerifyReplication extends Configured implements Tool { scan.setTimeRange(startTime, endTime); if (versions >= 0) { scan.setMaxVersions(versions); + LOG.info("Number of versions set to " + versions); } if(families != null) { String[] fams = families.split(","); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index ac87269110d..e52a60022ce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -135,7 +135,7 @@ public class TestReplicationBase { HTableDescriptor table = new HTableDescriptor(tableName); HColumnDescriptor fam = new HColumnDescriptor(famName); - fam.setMaxVersions(3); + fam.setMaxVersions(100); fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); table.addFamily(fam); fam = new HColumnDescriptor(noRepfamName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index b050f494653..4823597defe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -62,6 +63,9 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.protobuf.ByteString; +import com.sun.tools.javac.code.Attribute.Array; + @Category({ReplicationTests.class, LargeTests.class}) public class TestReplicationSmallTests extends TestReplicationBase { @@ -490,6 +494,158 @@ public class TestReplicationSmallTests extends TestReplicationBase { findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); } + @Test(timeout=300000) + // VerifyReplication should honor versions option + public void testHBase14905() throws Exception { + // normal Batch tests + byte[] qualifierName = Bytes.toBytes("f1"); + Put put = new Put(Bytes.toBytes("r1")); + put.addColumn(famName, qualifierName, Bytes.toBytes("v1002")); + htable1.put(put); + put.addColumn(famName, qualifierName, Bytes.toBytes("v1001")); + htable1.put(put); + put.addColumn(famName, qualifierName, Bytes.toBytes("v1112")); + htable1.put(put); + + Scan scan = new Scan(); + scan.setMaxVersions(100); + ResultScanner scanner1 = htable1.getScanner(scan); + Result[] res1 = scanner1.next(1); + scanner1.close(); + + assertEquals(1, res1.length); + assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size()); + + for (int i = 0; i < NB_RETRIES; i++) { + scan = new Scan(); + scan.setMaxVersions(100); + scanner1 = htable2.getScanner(scan); + res1 = scanner1.next(1); + scanner1.close(); + if (res1.length != 1) { + LOG.info("Only got " + res1.length + " rows"); + Thread.sleep(SLEEP_TIME); + } else { + int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size(); + if (cellNumber != 3) { + LOG.info("Only got " + cellNumber + " cells"); + Thread.sleep(SLEEP_TIME); + } else { + break; + } + } + if (i == NB_RETRIES-1) { + fail("Waited too much time for normal batch replication"); + } + } + + put.addColumn(famName, qualifierName, Bytes.toBytes("v1111")); + htable2.put(put); + put.addColumn(famName, qualifierName, Bytes.toBytes("v1112")); + htable2.put(put); + + scan = new Scan(); + scan.setMaxVersions(100); + scanner1 = htable2.getScanner(scan); + res1 = scanner1.next(NB_ROWS_IN_BATCH); + scanner1.close(); + + assertEquals(1, res1.length); + assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size()); + + String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()}; + Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args); + if (job == null) { + fail("Job wasn't created, see the log"); + } + if (!job.waitForCompletion(true)) { + fail("Job failed, see the log"); + } + assertEquals(0, job.getCounters(). + findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); + assertEquals(1, job.getCounters(). + findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + } + + @Test(timeout=300000) + // VerifyReplication should honor versions option + public void testVersionMismatchHBase14905() throws Exception { + // normal Batch tests + byte[] qualifierName = Bytes.toBytes("f1"); + Put put = new Put(Bytes.toBytes("r1")); + long ts = System.currentTimeMillis(); + put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1")); + htable1.put(put); + put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2")); + htable1.put(put); + put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3")); + htable1.put(put); + + Scan scan = new Scan(); + scan.setMaxVersions(100); + ResultScanner scanner1 = htable1.getScanner(scan); + Result[] res1 = scanner1.next(1); + scanner1.close(); + + assertEquals(1, res1.length); + assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size()); + + for (int i = 0; i < NB_RETRIES; i++) { + scan = new Scan(); + scan.setMaxVersions(100); + scanner1 = htable2.getScanner(scan); + res1 = scanner1.next(1); + scanner1.close(); + if (res1.length != 1) { + LOG.info("Only got " + res1.length + " rows"); + Thread.sleep(SLEEP_TIME); + } else { + int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size(); + if (cellNumber != 3) { + LOG.info("Only got " + cellNumber + " cells"); + Thread.sleep(SLEEP_TIME); + } else { + break; + } + } + if (i == NB_RETRIES-1) { + fail("Waited too much time for normal batch replication"); + } + } + + try { + // Disabling replication and modifying the particular version of the cell to validate the feature. + admin.disablePeer("2"); + Put put2 = new Put(Bytes.toBytes("r1")); + put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99")); + htable2.put(put2); + + scan = new Scan(); + scan.setMaxVersions(100); + scanner1 = htable2.getScanner(scan); + res1 = scanner1.next(NB_ROWS_IN_BATCH); + scanner1.close(); + assertEquals(1, res1.length); + assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size()); + + String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()}; + Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args); + if (job == null) { + fail("Job wasn't created, see the log"); + } + if (!job.waitForCompletion(true)) { + fail("Job failed, see the log"); + } + assertEquals(0, job.getCounters(). + findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); + assertEquals(1, job.getCounters(). + findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + } + finally { + admin.enablePeer("2"); + } + } + /** * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out * the compaction WALEdit