diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index b78cbceb1b7..f4ccce8a972 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -53,7 +53,7 @@ import org.apache.yetus.audience.InterfaceAudience; * Used to add/remove a replication peer. */ @InterfaceAudience.Private -public final class ReplicationPeerManager { +public class ReplicationPeerManager { private final ReplicationPeerStorage peerStorage; @@ -61,8 +61,7 @@ public final class ReplicationPeerManager { private final ConcurrentMap peers; - private ReplicationPeerManager(ReplicationPeerStorage peerStorage, - ReplicationQueueStorage queueStorage, + ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage, ConcurrentMap peers) { this.peerStorage = peerStorage; this.queueStorage = queueStorage; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationProcedureRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationProcedureRetry.java new file mode 100644 index 00000000000..ab35b4633a5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationProcedureRetry.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; + +/** + * All the modification method will fail once in the test and should finally succeed. + */ +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestReplicationProcedureRetry { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, MockHMaster.class, HMaster.class); + UTIL.startMiniCluster(3); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @After + public void tearDownAfterTest() throws IOException { + for (ReplicationPeerDescription desc : UTIL.getAdmin().listReplicationPeers()) { + UTIL.getAdmin().removeReplicationPeer(desc.getPeerId()); + } + } + + private void doTest() throws IOException { + Admin admin = UTIL.getAdmin(); + String peerId = "1"; + ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() + .setClusterKey("localhost:" + UTIL.getZkCluster().getClientPort() + ":/hbase2").build(); + admin.addReplicationPeer(peerId, peerConfig, true); + + assertEquals(peerConfig.getClusterKey(), + admin.getReplicationPeerConfig(peerId).getClusterKey()); + ReplicationPeerConfig newPeerConfig = + ReplicationPeerConfig.newBuilder(peerConfig).setBandwidth(123456).build(); + admin.updateReplicationPeerConfig(peerId, newPeerConfig); + assertEquals(newPeerConfig.getBandwidth(), + admin.getReplicationPeerConfig(peerId).getBandwidth()); + + admin.disableReplicationPeer(peerId); + assertFalse(admin.listReplicationPeers().get(0).isEnabled()); + + admin.enableReplicationPeer(peerId); + assertTrue(admin.listReplicationPeers().get(0).isEnabled()); + + admin.removeReplicationPeer(peerId); + assertTrue(admin.listReplicationPeers().isEmpty()); + + // make sure that we have run into the mocked method + MockHMaster master = (MockHMaster) UTIL.getHBaseCluster().getMaster(); + assertTrue(master.addPeerCalled); + assertTrue(master.removePeerCalled); + assertTrue(master.updatePeerConfigCalled); + assertTrue(master.enablePeerCalled); + assertTrue(master.disablePeerCalled); + } + + @Test + public void testErrorBeforeUpdate() throws IOException, ReplicationException { + ((MockHMaster) UTIL.getHBaseCluster().getMaster()).reset(true); + doTest(); + } + + @Test + public void testErrorAfterUpdate() throws IOException, ReplicationException { + ((MockHMaster) UTIL.getHBaseCluster().getMaster()).reset(false); + doTest(); + } + + public static final class MockHMaster extends HMaster { + + volatile boolean addPeerCalled; + + volatile boolean removePeerCalled; + + volatile boolean updatePeerConfigCalled; + + volatile boolean enablePeerCalled; + + volatile boolean disablePeerCalled; + + private ReplicationPeerManager manager; + + public MockHMaster(Configuration conf) throws IOException, KeeperException { + super(conf); + } + + private Object invokeWithError(InvocationOnMock invocation, boolean errorBeforeUpdate) + throws Throwable { + if (errorBeforeUpdate) { + throw new ReplicationException("mock error before update"); + } + invocation.callRealMethod(); + throw new ReplicationException("mock error after update"); + } + + public void reset(boolean errorBeforeUpdate) throws ReplicationException { + addPeerCalled = false; + removePeerCalled = false; + updatePeerConfigCalled = false; + enablePeerCalled = false; + disablePeerCalled = false; + ReplicationPeerManager m = super.getReplicationPeerManager(); + manager = spy(m); + doAnswer(invocation -> { + if (!addPeerCalled) { + addPeerCalled = true; + return invokeWithError(invocation, errorBeforeUpdate); + } else { + return invocation.callRealMethod(); + } + }).when(manager).addPeer(anyString(), any(ReplicationPeerConfig.class), anyBoolean()); + doAnswer(invocation -> { + if (!removePeerCalled) { + removePeerCalled = true; + return invokeWithError(invocation, errorBeforeUpdate); + } else { + return invocation.callRealMethod(); + } + }).when(manager).removePeer(anyString()); + doAnswer(invocation -> { + if (!updatePeerConfigCalled) { + updatePeerConfigCalled = true; + return invokeWithError(invocation, errorBeforeUpdate); + } else { + return invocation.callRealMethod(); + } + }).when(manager).updatePeerConfig(anyString(), any(ReplicationPeerConfig.class)); + doAnswer(invocation -> { + if (!enablePeerCalled) { + enablePeerCalled = true; + return invokeWithError(invocation, errorBeforeUpdate); + } else { + return invocation.callRealMethod(); + } + }).when(manager).enablePeer(anyString()); + doAnswer(invocation -> { + if (!disablePeerCalled) { + disablePeerCalled = true; + return invokeWithError(invocation, errorBeforeUpdate); + } else { + return invocation.callRealMethod(); + } + }).when(manager).disablePeer(anyString()); + } + + @Override + public ReplicationPeerManager getReplicationPeerManager() { + return manager; + } + } +}