HBASE-13317 Region server reportForDuty stuck looping if there is a master change (Jerry He)

This commit is contained in:
Enis Soztutar 2015-03-31 22:59:08 -07:00
parent ffdcc00952
commit 6415742c37
3 changed files with 198 additions and 4 deletions

View File

@ -168,6 +168,7 @@ public class LocalHBaseCluster {
return addRegionServer(new Configuration(conf), this.regionThreads.size()); return addRegionServer(new Configuration(conf), this.regionThreads.size());
} }
@SuppressWarnings("unchecked")
public JVMClusterUtil.RegionServerThread addRegionServer( public JVMClusterUtil.RegionServerThread addRegionServer(
Configuration config, final int index) Configuration config, final int index)
throws IOException { throws IOException {
@ -181,8 +182,9 @@ public class LocalHBaseCluster {
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
JVMClusterUtil.RegionServerThread rst = JVMClusterUtil.RegionServerThread rst =
JVMClusterUtil.createRegionServerThread(config, cp, JVMClusterUtil.createRegionServerThread(config, cp, (Class<? extends HRegionServer>) conf
this.regionServerClass, index); .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);
this.regionThreads.add(rst); this.regionThreads.add(rst);
return rst; return rst;
} }
@ -261,6 +263,13 @@ public class LocalHBaseCluster {
return liveServers; return liveServers;
} }
/**
* Returns the {@link Configuration} instance held by this cluster. The object itself is
* returned, not a copy, so changes made through it are visible to this cluster. In
* particular, {@code addRegionServer()} copies this configuration and reads the region
* server implementation class from it, so settings applied here affect region servers
* that are added afterwards.
*
* @return the Configuration used by this LocalHBaseCluster
*/
public Configuration getConfiguration() {
return this.conf;
}
/** /**
* Wait for the specified region server to stop * Wait for the specified region server to stop
* Removes this thread from list of running threads. * Removes this thread from list of running threads.

View File

@ -2105,13 +2105,14 @@ public class HRegionServer extends HasThread implements
/** /**
* Get the current master from ZooKeeper and open the RPC connection to it. * Get the current master from ZooKeeper and open the RPC connection to it.
* * To get a fresh connection, the current rssStub must be null.
* Method will block until a master is available. You can break from this * Method will block until a master is available. You can break from this
* block by requesting the server stop. * block by requesting the server stop.
* *
* @return master + port, or null if server has been stopped * @return master + port, or null if server has been stopped
*/ */
private synchronized ServerName createRegionServerStatusStub() { @VisibleForTesting
protected synchronized ServerName createRegionServerStatusStub() {
if (rssStub != null) { if (rssStub != null) {
return masterAddressTracker.getMasterAddress(); return masterAddressTracker.getMasterAddress();
} }
@ -2216,6 +2217,7 @@ public class HRegionServer extends HasThread implements
LOG.debug("Master is not running yet"); LOG.debug("Master is not running yet");
} else { } else {
LOG.warn("error telling master we are up", se); LOG.warn("error telling master we are up", se);
rssStub = null;
} }
} }
return result; return result;

View File

@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Verifies that a region server can complete {@code reportForDuty} when the active master
 * changes after the region server has obtained its RPC stub to the first master but before it
 * has finished reporting in.
 */
@Category(MediumTests.class)
public class TestRegionServerReportForDuty {

  private static final Log LOG = LogFactory.getLog(TestRegionServerReportForDuty.class);

  /** Polling interval, in milliseconds, used by the wait loops of this test. */
  private static final long SLEEP_INTERVAL = 500;

  private HBaseTestingUtility testUtil;
  private LocalHBaseCluster cluster;
  private RegionServerThread rs;
  private RegionServerThread rs2;
  private MasterThread master;
  private MasterThread backupMaster;

  /**
   * Starts the mini DFS and ZooKeeper clusters and creates an empty {@link LocalHBaseCluster}
   * (no masters, no region servers); each test adds the servers it needs.
   */
  @Before
  public void setUp() throws Exception {
    testUtil = new HBaseTestingUtility();
    testUtil.startMiniDFSCluster(1);
    testUtil.startMiniZKCluster(1);
    testUtil.createRootDir();
    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
  }

  /** Shuts down the HBase cluster first, then the ZooKeeper and DFS mini clusters. */
  @After
  public void tearDown() throws Exception {
    cluster.shutdown();
    cluster.join();
    testUtil.shutdownMiniZKCluster();
    testUtil.shutdownMiniDFSCluster();
  }

  /**
   * Tests region server reportForDuty when a backup master becomes the primary master after
   * the first master goes away.
   */
  @Test (timeout=180000)
  public void testReportForDutyWithMasterChange() throws Exception {
    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    // master has a rs. defaultMinToStart = 2
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: " + master.getMaster().getServerName());
    master.start();
    rs.start();

    // Add a 2nd region server. Switching the implementation class in the configuration
    // makes the cluster instantiate MyRegionServer for it.
    cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
    rs2 = cluster.addRegionServer();
    // Start the region server. This region server will refresh RPC connection
    // from the current active master to the next active master before completing
    // reportForDuty
    LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
    rs2.start();

    waitForClusterOnline(master);

    // Stop the current master.
    master.getMaster().stop("Stopping master");

    // Start a new master and use another random unique port.
    // Also let it wait for exactly 3 servers to report in: the new master counts itself
    // (see the note on the first master above) plus the two region servers.
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 3);

    backupMaster = cluster.addMaster();
    LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
    backupMaster.start();

    waitForClusterOnline(backupMaster);

    // Do some checking/asserts here.
    assertTrue(backupMaster.getMaster().isActiveMaster());
    assertTrue(backupMaster.getMaster().isInitialized());
    // JUnit expects (expected, actual); the reverse order yields a misleading failure message.
    assertEquals(3, backupMaster.getMaster().getServerManager().getOnlineServersList().size());
  }

  /**
   * Blocks until the given master is initialized and the second region server has created its
   * RPC stub, then waits for the first region server to come online.
   *
   * @param master the master thread whose initialization is awaited
   * @throws InterruptedException if the calling thread is interrupted while polling
   */
  private void waitForClusterOnline(MasterThread master) throws InterruptedException {
    while (true) {
      if (master.getMaster().isInitialized()
          && ((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag()) {
        break;
      }
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting for master to come online ...");
    }
    rs.waitForServerOnline();
  }

  // Create a Region Server that provides a hook so that we can wait for the master switch over
  // before continuing reportForDuty to the master.
  // The idea is that we get a RPC connection to the first active master, then we wait.
  // The first master goes down, the second master becomes the active master. The region
  // server continues reportForDuty. It should succeed with the new master.
  public static class MyRegionServer extends MiniHBaseClusterRegionServer {

    private ServerName sn;
    // This flag is to make sure this rs has obtained the rpcStub to the first master.
    // The first master will go down after this.
    // It is written by the region server thread and polled by the test thread without any
    // common lock, so it must be volatile for the write to be guaranteed visible.
    private volatile boolean rpcStubCreatedFlag = false;
    // Only touched inside the synchronized createRegionServerStatusStub().
    private boolean masterChanged = false;

    public MyRegionServer(Configuration conf, CoordinatedStateManager cp)
      throws IOException, KeeperException,
        InterruptedException {
      super(conf, cp);
    }

    /**
     * Creates the stub via the parent implementation, then holds the caller until the master
     * address tracker reports an active master different from the one the stub was created
     * for. Once a change has been observed, later calls return without waiting.
     *
     * @return the master the stub was created for, or null if the wait was interrupted
     */
    @Override
    protected synchronized ServerName createRegionServerStatusStub() {
      sn = super.createRegionServerStatusStub();
      rpcStubCreatedFlag = true;

      // Wait for master switch over. Only do this for the second region server.
      while (!masterChanged) {
        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
        if (newSn != null && !newSn.equals(sn)) {
          masterChanged = true;
          break;
        }
        try {
          Thread.sleep(SLEEP_INTERVAL);
        } catch (InterruptedException e) {
          // Do not swallow the interrupt and keep spinning: restore the flag for the caller
          // and give up waiting. null is what the overridden method documents for the case
          // where no master could be obtained because the server is going down.
          Thread.currentThread().interrupt();
          return null;
        }
        LOG.debug("Waiting for master switch over ... ");
      }
      return sn;
    }

    /**
     * @return true once {@link #createRegionServerStatusStub()} has been through the parent
     *         implementation at least once
     */
    public boolean getRpcStubCreatedFlag() {
      return rpcStubCreatedFlag;
    }
  }
}