From 6415742c3757ca72080ca63372d1a0f82ac7607e Mon Sep 17 00:00:00 2001 From: Enis Soztutar Date: Tue, 31 Mar 2015 22:59:08 -0700 Subject: [PATCH] HBASE-13317 Region server reportForDuty stuck looping if there is a master change (Jerry He) --- .../hadoop/hbase/LocalHBaseCluster.java | 13 +- .../hbase/regionserver/HRegionServer.java | 6 +- .../TestRegionServerReportForDuty.java | 183 ++++++++++++++++++ 3 files changed, 198 insertions(+), 4 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index 17fc34f3f53..1263318e789 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -168,6 +168,7 @@ public class LocalHBaseCluster { return addRegionServer(new Configuration(conf), this.regionThreads.size()); } + @SuppressWarnings("unchecked") public JVMClusterUtil.RegionServerThread addRegionServer( Configuration config, final int index) throws IOException { @@ -181,8 +182,9 @@ public class LocalHBaseCluster { CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); JVMClusterUtil.RegionServerThread rst = - JVMClusterUtil.createRegionServerThread(config, cp, - this.regionServerClass, index); + JVMClusterUtil.createRegionServerThread(config, cp, (Class) conf + .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index); + this.regionThreads.add(rst); return rst; } @@ -261,6 +263,13 @@ public class LocalHBaseCluster { return liveServers; } + /** + * @return the Configuration used by this LocalHBaseCluster + */ + public Configuration getConfiguration() { + return this.conf; + } + /** * Wait for the specified region server to stop * Removes this thread from list of running threads. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a618a329201..2e13a404190 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2105,13 +2105,14 @@ public class HRegionServer extends HasThread implements /** * Get the current master from ZooKeeper and open the RPC connection to it. - * + * To get a fresh connection, the current rssStub must be null. * Method will block until a master is available. You can break from this * block by requesting the server stop. * * @return master + port, or null if server has been stopped */ - private synchronized ServerName createRegionServerStatusStub() { + @VisibleForTesting + protected synchronized ServerName createRegionServerStatusStub() { if (rssStub != null) { return masterAddressTracker.getMasterAddress(); } @@ -2216,6 +2217,7 @@ public class HRegionServer extends HasThread implements LOG.debug("Master is not running yet"); } else { LOG.warn("error telling master we are up", se); + rssStub = null; } } return result; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java new file mode 100644 index 00000000000..ebb82097aca --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.LocalHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestRegionServerReportForDuty { + + private static final Log LOG = LogFactory.getLog(TestRegionServerReportForDuty.class); + + private static final long SLEEP_INTERVAL = 500; + + private HBaseTestingUtility testUtil; + private LocalHBaseCluster cluster; + private RegionServerThread rs; + private RegionServerThread rs2; + private MasterThread master; + private MasterThread backupMaster; + + @Before + public void setUp() throws Exception { + testUtil = new HBaseTestingUtility(); + testUtil.startMiniDFSCluster(1); + testUtil.startMiniZKCluster(1); + testUtil.createRootDir(); + cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0); + } + + @After + public void tearDown() throws Exception { + cluster.shutdown(); + cluster.join(); + testUtil.shutdownMiniZKCluster(); + testUtil.shutdownMiniDFSCluster(); + } + + /** + * Tests region sever reportForDuty with backup master becomes primary master after + * the first master goes away. + */ + @Test (timeout=180000) + public void testReportForDutyWithMasterChange() throws Exception { + + // Start a master and wait for it to become the active/primary master. + // Use a random unique port + cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort()); + // master has a rs. defaultMinToStart = 2 + cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2); + cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2); + master = cluster.addMaster(); + rs = cluster.addRegionServer(); + LOG.debug("Starting master: " + master.getMaster().getServerName()); + master.start(); + rs.start(); + + // Add a 2nd region server + cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); + rs2 = cluster.addRegionServer(); + // Start the region server. This region server will refresh RPC connection + // from the current active master to the next active master before completing + // reportForDuty + LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName()); + rs2.start(); + + waitForClusterOnline(master); + + // Stop the current master. + master.getMaster().stop("Stopping master"); + + // Start a new master and use another random unique port + // Also let it wait for exactly 2 region severs to report in. + cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort()); + cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3); + cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 3); + backupMaster = cluster.addMaster(); + LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName()); + backupMaster.start(); + + waitForClusterOnline(backupMaster); + + // Do some checking/asserts here. + assertTrue(backupMaster.getMaster().isActiveMaster()); + assertTrue(backupMaster.getMaster().isInitialized()); + assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 3); + + } + + private void waitForClusterOnline(MasterThread master) throws InterruptedException { + while (true) { + if (master.getMaster().isInitialized() + && ((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) { + break; + } + Thread.sleep(SLEEP_INTERVAL); + LOG.debug("Waiting for master to come online ..."); + } + rs.waitForServerOnline(); + } + + // Create a Region Server that provide a hook so that we can wait for the master switch over + // before continuing reportForDuty to the mater. + // The idea is that we get a RPC connection to the first active master, then we wait. + // The first master goes down, the second master becomes the active master. The region + // server continues reportForDuty. It should succeed with the new master. + public static class MyRegionServer extends MiniHBaseClusterRegionServer { + + private ServerName sn; + // This flag is to make sure this rs has obtained the rpcStub to the first master. + // The first master will go down after this. + private boolean rpcStubCreatedFlag = false; + private boolean masterChanged = false; + + public MyRegionServer(Configuration conf, CoordinatedStateManager cp) + throws IOException, KeeperException, + InterruptedException { + super(conf, cp); + } + + @Override + protected synchronized ServerName createRegionServerStatusStub() { + sn = super.createRegionServerStatusStub(); + rpcStubCreatedFlag = true; + + // Wait for master switch over. Only do this for the second region server. + while (!masterChanged) { + ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true); + if (newSn != null && !newSn.equals(sn)) { + masterChanged = true; + break; + } + try { + Thread.sleep(SLEEP_INTERVAL); + } catch (InterruptedException e) { + } + LOG.debug("Waiting for master switch over ... "); + } + return sn; + } + + public boolean getRpcStubCreatedFlag() { + return rpcStubCreatedFlag; + } + } +}