HBASE-14905 VerifyReplication does not honour versions option (Vishal Khandelwal)

This commit is contained in:
tedyu 2015-12-03 16:09:22 -08:00
parent 7117a2e35d
commit 67ba6598b1
3 changed files with 163 additions and 1 deletions

View File

@ -121,6 +121,8 @@ public class VerifyReplication extends Configured implements Tool {
}
}
scan.setTimeRange(startTime, endTime);
int versions = conf.getInt(NAME+".versions", -1);
LOG.info("Setting number of version inside map as: " + versions);
if (versions >= 0) {
scan.setMaxVersions(versions);
}
@ -269,6 +271,9 @@ public class VerifyReplication extends Configured implements Tool {
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
LOG.info("Peer Quorum Address: " + peerQuorumAddress);
conf.setInt(NAME + ".versions", versions);
LOG.info("Number of version: " + versions);
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(VerifyReplication.class);
@ -276,6 +281,7 @@ public class VerifyReplication extends Configured implements Tool {
scan.setTimeRange(startTime, endTime);
if (versions >= 0) {
scan.setMaxVersions(versions);
LOG.info("Number of versions set to " + versions);
}
if(families != null) {
String[] fams = families.split(",");

View File

@ -135,7 +135,7 @@ public class TestReplicationBase {
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setMaxVersions(3);
fam.setMaxVersions(100);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@ -62,6 +63,9 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.ByteString;
import com.sun.tools.javac.code.Attribute.Array;
@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSmallTests extends TestReplicationBase {
@ -490,6 +494,158 @@ public class TestReplicationSmallTests extends TestReplicationBase {
findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
}
@Test(timeout=300000)
// VerifyReplication should honor versions option
public void testHBase14905() throws Exception {
// normal Batch tests
byte[] qualifierName = Bytes.toBytes("f1");
Put put = new Put(Bytes.toBytes("r1"));
put.addColumn(famName, qualifierName, Bytes.toBytes("v1002"));
htable1.put(put);
put.addColumn(famName, qualifierName, Bytes.toBytes("v1001"));
htable1.put(put);
put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
htable1.put(put);
Scan scan = new Scan();
scan.setMaxVersions(100);
ResultScanner scanner1 = htable1.getScanner(scan);
Result[] res1 = scanner1.next(1);
scanner1.close();
assertEquals(1, res1.length);
assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
for (int i = 0; i < NB_RETRIES; i++) {
scan = new Scan();
scan.setMaxVersions(100);
scanner1 = htable2.getScanner(scan);
res1 = scanner1.next(1);
scanner1.close();
if (res1.length != 1) {
LOG.info("Only got " + res1.length + " rows");
Thread.sleep(SLEEP_TIME);
} else {
int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
if (cellNumber != 3) {
LOG.info("Only got " + cellNumber + " cells");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
if (i == NB_RETRIES-1) {
fail("Waited too much time for normal batch replication");
}
}
put.addColumn(famName, qualifierName, Bytes.toBytes("v1111"));
htable2.put(put);
put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
htable2.put(put);
scan = new Scan();
scan.setMaxVersions(100);
scanner1 = htable2.getScanner(scan);
res1 = scanner1.next(NB_ROWS_IN_BATCH);
scanner1.close();
assertEquals(1, res1.length);
assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
if (job == null) {
fail("Job wasn't created, see the log");
}
if (!job.waitForCompletion(true)) {
fail("Job failed, see the log");
}
assertEquals(0, job.getCounters().
findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(1, job.getCounters().
findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
}
@Test(timeout=300000)
// VerifyReplication should honor versions option
public void testVersionMismatchHBase14905() throws Exception {
// normal Batch tests
byte[] qualifierName = Bytes.toBytes("f1");
Put put = new Put(Bytes.toBytes("r1"));
long ts = System.currentTimeMillis();
put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
htable1.put(put);
put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
htable1.put(put);
put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
htable1.put(put);
Scan scan = new Scan();
scan.setMaxVersions(100);
ResultScanner scanner1 = htable1.getScanner(scan);
Result[] res1 = scanner1.next(1);
scanner1.close();
assertEquals(1, res1.length);
assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
for (int i = 0; i < NB_RETRIES; i++) {
scan = new Scan();
scan.setMaxVersions(100);
scanner1 = htable2.getScanner(scan);
res1 = scanner1.next(1);
scanner1.close();
if (res1.length != 1) {
LOG.info("Only got " + res1.length + " rows");
Thread.sleep(SLEEP_TIME);
} else {
int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
if (cellNumber != 3) {
LOG.info("Only got " + cellNumber + " cells");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
if (i == NB_RETRIES-1) {
fail("Waited too much time for normal batch replication");
}
}
try {
// Disabling replication and modifying the particular version of the cell to validate the feature.
admin.disablePeer("2");
Put put2 = new Put(Bytes.toBytes("r1"));
put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99"));
htable2.put(put2);
scan = new Scan();
scan.setMaxVersions(100);
scanner1 = htable2.getScanner(scan);
res1 = scanner1.next(NB_ROWS_IN_BATCH);
scanner1.close();
assertEquals(1, res1.length);
assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
if (job == null) {
fail("Job wasn't created, see the log");
}
if (!job.waitForCompletion(true)) {
fail("Job failed, see the log");
}
assertEquals(0, job.getCounters().
findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(1, job.getCounters().
findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
}
finally {
admin.enablePeer("2");
}
}
/**
* Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out
* the compaction WALEdit