HBASE-20128 Add new UTs which extends the old replication UTs but set replication scope to SERIAL

This commit is contained in:
huzheng 2018-04-12 15:17:32 +08:00
parent aa8ceb8167
commit be3df29cef
16 changed files with 222 additions and 55 deletions

View File

@ -127,16 +127,9 @@ public class ReplicationPeers {
}
/**
<<<<<<< 2bb2fd611d4b88c724a2b561f10433b56c6fd3dd
* Update the peerConfig for the a given peer cluster
* @param id a short that identifies the cluster
* @param peerConfig new config for the peer cluster
* @throws ReplicationException
=======
* Helper method to connect to a peer
* @param peerId peer's identifier
* @return object representing the peer
>>>>>>> HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
*/
private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);

View File

@ -96,12 +96,13 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
admin1.modifyTable(tableName, table);
admin1.enableTable(tableName);
admin1.disableTableReplication(tableName);
table = admin1.getTableDescriptor(tableName);
for (HColumnDescriptor fam : table.getColumnFamilies()) {
assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
}
admin1.deleteColumnFamily(table.getTableName(), f.getName());
}
@Test
@ -158,6 +159,9 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
for (HColumnDescriptor fam : table.getColumnFamilies()) {
assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
}
admin1.deleteColumnFamily(tableName, f.getName());
admin2.deleteColumnFamily(tableName, f.getName());
}
@Test
@ -252,12 +256,14 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
rpc.getConfiguration().put("key1", "value2");
admin.updatePeerConfig(peerId, rpc);
if (!TestUpdatableReplicationEndpoint.hasCalledBack()) {
synchronized(TestUpdatableReplicationEndpoint.class) {
synchronized (TestUpdatableReplicationEndpoint.class) {
TestUpdatableReplicationEndpoint.class.wait(2000L);
}
}
assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack());
admin.removePeer(peerId);
}
public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint {

View File

@ -90,7 +90,7 @@ public class TestMasterReplication {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterReplication.class);
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
private static final Logger LOG = LoggerFactory.getLogger(TestMasterReplication.class);
private Configuration baseConfiguration;

View File

@ -58,7 +58,7 @@ public class TestMultiSlaveReplication {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMultiSlaveReplication.class);
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
private static final Logger LOG = LoggerFactory.getLogger(TestMultiSlaveReplication.class);
private static Configuration conf1;
private static Configuration conf2;

View File

@ -49,10 +49,17 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({MediumTests.class})
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
@RunWith(Parameterized.class)
@Category({ MediumTests.class })
public class TestNamespaceReplication extends TestReplicationBase {
@ClassRule
@ -77,6 +84,19 @@ public class TestNamespaceReplication extends TestReplicationBase {
private static Admin admin1;
private static Admin admin2;
@Parameter
public boolean serialPeer;
@Override
protected boolean isSerialPeer() {
return serialPeer;
}
@Parameters(name = "{index}: serialPeer={0}")
public static List<Boolean> parameters() {
return ImmutableList.of(true, false);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
@ -224,7 +244,7 @@ public class TestNamespaceReplication extends TestReplicationBase {
assertArrayEquals(val, res.value());
break;
}
Thread.sleep(SLEEP_TIME);
Thread.sleep(10 * SLEEP_TIME);
}
}
}
@ -244,7 +264,7 @@ public class TestNamespaceReplication extends TestReplicationBase {
} else {
break;
}
Thread.sleep(SLEEP_TIME);
Thread.sleep(10 * SLEEP_TIME);
}
}
}

View File

@ -24,7 +24,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
@ -50,13 +49,12 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -67,11 +65,6 @@ import org.slf4j.LoggerFactory;
* All other tests should have their own classes and extend this one
*/
public class TestReplicationBase {
/*
{
((Log4JLogger) ReplicationSource.LOG).getLogger().setLevel(Level.ALL);
}*/
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
protected static Configuration conf1 = HBaseConfiguration.create();
@ -100,13 +93,10 @@ public class TestReplicationBase {
protected static final byte[] famName = Bytes.toBytes("f");
protected static final byte[] row = Bytes.toBytes("row");
protected static final byte[] noRepfamName = Bytes.toBytes("norep");
protected static final String PEER_ID2 = "2";
@Parameter
public static boolean seperateOldWALs;
@Parameters
public static List<Boolean> params() {
return Arrays.asList(false, true);
protected boolean isSerialPeer() {
return false;
}
protected final void cleanUp() throws IOException, InterruptedException {
@ -197,9 +187,6 @@ public class TestReplicationBase {
conf1.setBoolean("replication.source.eof.autorecovery", true);
conf1.setLong("hbase.serial.replication.waiting.ms", 100);
// Parameter config
conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, seperateOldWALs);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
@ -227,10 +214,7 @@ public class TestReplicationBase {
// as a component in deciding maximum number of parallel batches to send to the peer cluster.
utility2.startMiniCluster(4);
ReplicationPeerConfig rpc =
ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build();
hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin();
hbaseAdmin.addReplicationPeer("2", rpc);
TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
@ -254,6 +238,26 @@ public class TestReplicationBase {
htable2 = connection2.getTable(tableName);
}
private boolean peerExist(String peerId) throws IOException {
return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
}
@Before
public void setUpBase() throws IOException {
if (!peerExist(PEER_ID2)) {
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
.setClusterKey(utility2.getClusterKey()).setSerial(isSerialPeer()).build();
hbaseAdmin.addReplicationPeer(PEER_ID2, rpc);
}
}
@After
public void tearDownBase() throws IOException {
if (peerExist(PEER_ID2)) {
hbaseAdmin.removeReplicationPeer(PEER_ID2);
}
}
protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
Put put = new Put(row);
put.addColumn(famName, row, row);

View File

@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Get;
@ -37,13 +39,20 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
/**
* Test handling of changes to the number of a peer's regionservers.
*/
@Category({ReplicationTests.class, LargeTests.class})
@RunWith(Parameterized.class)
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
@ClassRule
@ -53,6 +62,19 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationChangingPeerRegionservers.class);
@Parameter
public boolean serialPeer;
@Override
protected boolean isSerialPeer() {
return serialPeer;
}
@Parameters(name = "{index}: serialPeer={0}")
public static List<Boolean> parameters() {
return ImmutableList.of(true, false);
}
/**
* @throws java.lang.Exception
*/
@ -60,8 +82,8 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
public void setUp() throws Exception {
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
for (JVMClusterUtil.RegionServerThread r :
utility1.getHBaseCluster().getRegionServerThreads()) {
for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
.getRegionServerThreads()) {
utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
}
utility1.deleteTableData(tableName);
@ -94,7 +116,6 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
@Test
public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
LOG.info("testSimplePutDelete");
MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
int numRS = peerCluster.getRegionServerThreads().size();
@ -116,7 +137,6 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
assertEquals(numRS, peerCluster.getRegionServerThreads().size());
doPutTest(Bytes.toBytes(3));
}
private void doPutTest(byte[] row) throws IOException, InterruptedException {

View File

@ -126,6 +126,12 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
admin2.createNamespace(NamespaceDescriptor.create("NS").build());
}
testEditsBehindDroppedTable(true, "NS:test_dropped");
try (Admin admin1 = connection1.getAdmin()) {
admin1.deleteNamespace("NS");
}
try (Admin admin2 = connection2.getAdmin()) {
admin2.deleteNamespace("NS");
}
}
private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception {

View File

@ -65,7 +65,7 @@ import org.slf4j.LoggerFactory;
/**
* Tests ReplicationSource and ReplicationEndpoint interactions
*/
@Category({ReplicationTests.class, MediumTests.class})
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationEndpoint extends TestReplicationBase {
@ClassRule
@ -79,7 +79,6 @@ public class TestReplicationEndpoint extends TestReplicationBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
admin.removePeer("2");
numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
}
@ -390,6 +389,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
static volatile List<Entry> lastEntries = null;
public ReplicationEndpointForTest() {
replicateCount.set(0);
contructedCount.incrementAndGet();
}
@ -434,6 +434,10 @@ public class TestReplicationEndpoint extends TestReplicationBase {
static AtomicInteger replicateCount = new AtomicInteger();
static boolean failedOnce;
public InterClusterReplicationEndpointForTest() {
replicateCount.set(0);
}
@Override
public boolean replicate(ReplicateContext replicateContext) {
boolean success = super.replicate(replicateContext);

View File

@ -23,15 +23,12 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Runs the TestReplicationKillRS test and selects the RS to kill in the master cluster
* Do not add other tests in this class.
*/
@RunWith(Parameterized.class)
@Category({ReplicationTests.class, LargeTests.class})
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationKillMasterRS extends TestReplicationKillRS {
@ClassRule
@ -42,5 +39,4 @@ public class TestReplicationKillMasterRS extends TestReplicationKillRS {
public void killOneMasterRS() throws Exception {
loadTableAndKillRS(utility1);
}
}

View File

@ -29,7 +29,7 @@ import org.junit.experimental.categories.Category;
* Run the same test as TestReplicationKillMasterRS but with WAL compression enabled
* Do not add other tests in this class.
*/
@Category({ReplicationTests.class, LargeTests.class})
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationKillMasterRSCompressed extends TestReplicationKillMasterRS {
@ClassRule

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationKillMasterRSWithSeparateOldWALs extends TestReplicationKillRS {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationKillMasterRSWithSeparateOldWALs.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
TestReplicationBase.setUpBeforeClass();
}
@Test
public void killOneMasterRS() throws Exception {
loadTableAndKillRS(utility1);
}
}

View File

@ -23,15 +23,12 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Runs the TestReplicationKillRS test and selects the RS to kill in the slave cluster
* Do not add other tests in this class.
*/
@RunWith(Parameterized.class)
@Category({ReplicationTests.class, LargeTests.class})
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationKillSlaveRS extends TestReplicationKillRS {
@ClassRule

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationKillSlaveRSWithSeparateOldWALs extends TestReplicationKillRS {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationKillSlaveRSWithSeparateOldWALs.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
TestReplicationBase.setUpBeforeClass();
}
@Test
public void killOneSlaveRS() throws Exception {
loadTableAndKillRS(utility2);
}
}

View File

@ -55,9 +55,16 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
@RunWith(Parameterized.class)
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSmallTests extends TestReplicationBase {
@ -68,6 +75,19 @@ public class TestReplicationSmallTests extends TestReplicationBase {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSmallTests.class);
private static final String PEER_ID = "2";
@Parameter
public boolean serialPeer;
@Override
protected boolean isSerialPeer() {
return serialPeer;
}
@Parameters(name = "{index}: serialPeer={0}")
public static List<Boolean> parameters() {
return ImmutableList.of(true, false);
}
@Before
public void setUp() throws Exception {
cleanUp();
@ -316,8 +336,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
lastRow = currentRow;
}
LOG.error("Last row: " + lastRow);
fail("Waited too much time for normal batch replication, " + res.length + " instead of " +
NB_ROWS_IN_BIG_BATCH + "; waited=" + (System.currentTimeMillis() - start) + "ms");
fail("Waited too much time for normal batch replication, " + res.length + " instead of "
+ NB_ROWS_IN_BIG_BATCH + "; waited=" + (System.currentTimeMillis() - start) + "ms");
} else {
LOG.info("Only got " + res.length + " rows... retrying");
Thread.sleep(SLEEP_TIME);

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@ -46,7 +48,7 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ReplicationTests.class, LargeTests.class})
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpTool extends TestReplicationBase {
@ClassRule
@ -97,7 +99,12 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
t2_syncupTarget.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
t2_syncupTarget.addFamily(fam);
}
@After
public void tearDownBase() throws IOException {
// Do nothing, just replace the super tearDown. because the super tearDown will use the
// out-of-data HBase admin to remove replication peer, which will be result in failure.
}
/**