From 8de95963555aecf49952a1c7a8b3d17c76dceb6f Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 26 Mar 2018 16:08:20 +0800 Subject: [PATCH] HBASE-20127 Add UT for serial replication after failover --- .../SerialReplicationTestBase.java | 7 ++ .../replication/TestSerialReplication.java | 5 -- .../TestSerialReplicationFailover.java | 76 +++++++++++++++++++ 3 files changed, 83 insertions(+), 5 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index 83afd811160..b5aae851f2e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -113,6 +113,7 @@ public class SerialReplicationTestBase { @BeforeClass public static void setUpBeforeClass() throws Exception { UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10); + UTIL.getConfiguration().setLong("replication.sleep.before.failover", 1000); UTIL.startMiniCluster(3); // disable balancer UTIL.getAdmin().balancerSwitch(false, true); @@ -200,6 +201,11 @@ public class SerialReplicationTestBase { }); } + protected final void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception { + UTIL.getAdmin().enableReplicationPeer(PEER_ID); + waitUntilReplicationDone(expectedEntries); + } + protected final void addPeer(boolean enabled) throws IOException { UTIL.getAdmin().addReplicationPeer(PEER_ID, ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") @@ -221,6 +227,7 @@ public class SerialReplicationTestBase { assertTrue( "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(), entry.getKey().getSequenceId() >= seqId); + seqId = entry.getKey().getSequenceId(); count++; } assertEquals(expectedEntries, count); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java index 94b79d97c81..bedb2ecacb8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java @@ -63,11 +63,6 @@ public class TestSerialReplication extends SerialReplicationTestBase { addPeer(false); } - private void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception { - UTIL.getAdmin().enableReplicationPeer(PEER_ID); - waitUntilReplicationDone(expectedEntries); - } - @Test public void testRegionMove() throws Exception { TableName tableName = TableName.valueOf(name.getMethodName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java new file mode 100644 index 00000000000..324a69fa8a8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestSerialReplicationFailover extends SerialReplicationTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSerialReplicationFailover.class); + + @Before + public void setUp() throws IOException, StreamLacksCapabilityException { + setupWALWriter(); + // add in disable state, so later when enabling it all sources will start push together. + addPeer(false); + } + + @Test + public void testKillRS() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + UTIL.getAdmin().createTable( + TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); + UTIL.waitTableAvailable(tableName); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + RegionServerThread thread = UTIL.getMiniHBaseCluster().getRegionServerThreads().stream() + .filter(t -> !t.getRegionServer().getRegions(tableName).isEmpty()).findFirst().get(); + thread.getRegionServer().abort("for testing"); + thread.join(); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 100; i < 200; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + enablePeerAndWaitUntilReplicationDone(200); + checkOrder(200); + } +}