HBASE-7407 TestMasterFailover under tests some cases and over tests some others
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1445074 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e6ef5ce981
commit
d2fb5a546f
|
@ -52,4 +52,8 @@ public class DoNotRetryIOException extends HBaseIOException {
|
|||
public DoNotRetryIOException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public DoNotRetryIOException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
|
@ -26,13 +25,21 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
* This exception is thrown by the master when a region server was shut down and
|
||||
* restarted so fast that the master still hasn't processed the server shutdown
|
||||
* of the first instance, or when the master is initializing and a client calls admin
|
||||
* operations
|
||||
* operations, or when an operation is performed on a region server that is still starting.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public class PleaseHoldException extends IOException {
|
||||
public class PleaseHoldException extends HBaseIOException {
|
||||
public PleaseHoldException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public PleaseHoldException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public PleaseHoldException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -143,7 +143,8 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
|
|||
|
||||
// Master controlled events to be executed on the master
|
||||
M_SERVER_SHUTDOWN (70, ExecutorType.MASTER_SERVER_OPERATIONS), // Master is processing shutdown of a RS
|
||||
M_META_SERVER_SHUTDOWN (72, ExecutorType.MASTER_META_SERVER_OPERATIONS); // Master is processing shutdown of RS hosting a meta region (-ROOT- or .META.).
|
||||
M_META_SERVER_SHUTDOWN (72, ExecutorType.MASTER_META_SERVER_OPERATIONS), // Master is processing shutdown of RS hosting a meta region (-ROOT- or .META.).
|
||||
M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS); // Master is processing recovery of regions found in ZK RIT
|
||||
|
||||
private final int code;
|
||||
private final ExecutorService.ExecutorType executor;
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
|
|||
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.KeyLocker;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
@ -410,7 +411,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
LOG.info("Found regions out on cluster or in RIT; failover");
|
||||
// Process list of dead servers and regions in RIT.
|
||||
// See HBASE-4580 for more information.
|
||||
processDeadServersAndRecoverLostRegions(deadServers, nodes);
|
||||
processDeadServersAndRecoverLostRegions(deadServers);
|
||||
} else {
|
||||
// Fresh cluster startup.
|
||||
LOG.info("Clean cluster startup. Assigning userregions");
|
||||
|
@ -491,87 +492,122 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
*/
|
||||
void processRegionsInTransition(
|
||||
final RegionTransition rt, final HRegionInfo regionInfo,
|
||||
int expectedVersion) throws KeeperException {
|
||||
final int expectedVersion) throws KeeperException {
|
||||
EventType et = rt.getEventType();
|
||||
// Get ServerName. Could not be null.
|
||||
ServerName sn = rt.getServerName();
|
||||
final ServerName sn = rt.getServerName();
|
||||
String encodedRegionName = regionInfo.getEncodedName();
|
||||
LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + et);
|
||||
|
||||
|
||||
if (regionStates.isRegionInTransition(encodedRegionName)) {
|
||||
// Just return
|
||||
return;
|
||||
}
|
||||
switch (et) {
|
||||
case M_ZK_REGION_CLOSING:
|
||||
// If zk node of the region was updated by a live server skip this
|
||||
// region and just add it into RIT.
|
||||
if (!serverManager.isServerOnline(sn)) {
|
||||
// If was not online, its closed now. Force to OFFLINE and this
|
||||
// will get it reassigned if appropriate
|
||||
forceOffline(regionInfo, rt);
|
||||
} else {
|
||||
// Just insert region into RIT.
|
||||
// If this never updates the timeout will trigger new assignment
|
||||
regionStates.updateRegionState(rt, RegionState.State.CLOSING);
|
||||
}
|
||||
break;
|
||||
case M_ZK_REGION_CLOSING:
|
||||
// If zk node of the region was updated by a live server skip this
|
||||
// region and just add it into RIT.
|
||||
if (!serverManager.isServerOnline(sn)) {
|
||||
// If it was not online, it's closed now. Force to OFFLINE and this
|
||||
// will get it reassigned if appropriate
|
||||
forceOffline(regionInfo, rt);
|
||||
} else {
|
||||
// Insert into RIT & resend the query to the region server: maybe the previous master
|
||||
// died before sending the query the first time.
|
||||
regionStates.updateRegionState(rt, RegionState.State.CLOSING);
|
||||
final RegionState rs = regionStates.getRegionState(regionInfo);
|
||||
this.executorService.submit(
|
||||
new EventHandler(server, EventType.M_MASTER_RECOVERY) {
|
||||
@Override
|
||||
public void process() throws IOException {
|
||||
ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
|
||||
try {
|
||||
unassign(regionInfo, rs, expectedVersion, sn, true);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
break;
|
||||
|
||||
case RS_ZK_REGION_CLOSED:
|
||||
case RS_ZK_REGION_FAILED_OPEN:
|
||||
// Region is closed, insert into RIT and handle it
|
||||
addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt);
|
||||
break;
|
||||
case RS_ZK_REGION_CLOSED:
|
||||
case RS_ZK_REGION_FAILED_OPEN:
|
||||
// Region is closed, insert into RIT and handle it
|
||||
addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt);
|
||||
break;
|
||||
|
||||
case M_ZK_REGION_OFFLINE:
|
||||
// If zk node of the region was updated by a live server skip this
|
||||
// region and just add it into RIT.
|
||||
if (!serverManager.isServerOnline(sn)) {
|
||||
// Region is offline, insert into RIT and handle it like a closed
|
||||
addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
|
||||
} else {
|
||||
// Just insert region into RIT.
|
||||
// If this never updates the timeout will trigger new assignment
|
||||
regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
|
||||
}
|
||||
break;
|
||||
case M_ZK_REGION_OFFLINE:
|
||||
// If zk node of the region was updated by a live server skip this
|
||||
// region and just add it into RIT.
|
||||
if (!serverManager.isServerOnline(sn)) {
|
||||
// Region is offline, insert into RIT and handle it like a closed
|
||||
addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
|
||||
} else {
|
||||
// Insert in RIT and resend to the regionserver
|
||||
regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
|
||||
final RegionState rs = regionStates.getRegionState(regionInfo);
|
||||
this.executorService.submit(
|
||||
new EventHandler(server, EventType.M_MASTER_RECOVERY) {
|
||||
@Override
|
||||
public void process() throws IOException {
|
||||
ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
|
||||
try {
|
||||
assign(rs, false, false);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
break;
|
||||
|
||||
case RS_ZK_REGION_OPENING:
|
||||
regionStates.updateRegionState(rt, RegionState.State.OPENING);
|
||||
if (regionInfo.isMetaTable() || !serverManager.isServerOnline(sn)) {
|
||||
// If ROOT or .META. table is waiting for timeout monitor to assign
|
||||
// it may take lot of time when the assignment.timeout.period is
|
||||
// the default value which may be very long. We will not be able
|
||||
// to serve any request during this time.
|
||||
// So we will assign the ROOT and .META. region immediately.
|
||||
// For a user region, if the server is not online, it takes
|
||||
// some time for timeout monitor to kick in. We know the region
|
||||
// won't open. So we will assign the opening
|
||||
// region immediately too.
|
||||
//
|
||||
// Otherwise, just insert region into RIT. If the state never
|
||||
// updates, the timeout will trigger new assignment
|
||||
processOpeningState(regionInfo);
|
||||
}
|
||||
break;
|
||||
case RS_ZK_REGION_OPENING:
|
||||
if (!serverManager.isServerOnline(sn)) {
|
||||
forceOffline(regionInfo, rt);
|
||||
} else {
|
||||
regionStates.updateRegionState(rt, RegionState.State.OPENING);
|
||||
}
|
||||
break;
|
||||
|
||||
case RS_ZK_REGION_OPENED:
|
||||
if (!serverManager.isServerOnline(sn)) {
|
||||
forceOffline(regionInfo, rt);
|
||||
} else {
|
||||
// Region is opened, insert into RIT and handle it
|
||||
regionStates.updateRegionState(rt, RegionState.State.OPEN);
|
||||
new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
|
||||
}
|
||||
break;
|
||||
case RS_ZK_REGION_SPLITTING:
|
||||
LOG.debug("Processed region in state : " + et);
|
||||
break;
|
||||
case RS_ZK_REGION_SPLIT:
|
||||
LOG.debug("Processed region in state : " + et);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Received region in state :" + et + " is not valid");
|
||||
case RS_ZK_REGION_OPENED:
|
||||
if (!serverManager.isServerOnline(sn)) {
|
||||
forceOffline(regionInfo, rt);
|
||||
} else {
|
||||
// Region is opened, insert into RIT and handle it
|
||||
// This could be done asynchronously, we would need then to acquire the lock in the
|
||||
// handler.
|
||||
regionStates.updateRegionState(rt, RegionState.State.OPEN);
|
||||
new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
|
||||
}
|
||||
break;
|
||||
case RS_ZK_REGION_SPLITTING:
|
||||
if (!serverManager.isServerOnline(sn)) {
|
||||
// The regionserver started the split, but died before updating the status.
|
||||
// It means (hopefully) that the split was not finished
|
||||
// TBD - to study. In the meantime, do nothing as in the past.
|
||||
LOG.warn("Processed region " + regionInfo.getEncodedName() + " in state : " + et +
|
||||
" on a dead regionserver: " + sn + " doing nothing");
|
||||
} else {
|
||||
LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
|
||||
et + " nothing to do.");
|
||||
// We don't do anything. The way the code is written in RS_ZK_REGION_SPLIT management,
|
||||
// it adds the RS_ZK_REGION_SPLITTING state if needed. So we don't have to do it here.
|
||||
}
|
||||
break;
|
||||
case RS_ZK_REGION_SPLIT:
|
||||
if (!serverManager.isServerOnline(sn)) {
|
||||
forceOffline(regionInfo, rt);
|
||||
} else {
|
||||
LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
|
||||
et + " nothing to do.");
|
||||
// We don't do anything. The regionserver is supposed to update the znode
|
||||
// multiple times so if it's still up we will receive an update soon.
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Received region in state :" + et + " is not valid.");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -711,7 +747,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
// Check it has daughters.
|
||||
byte [] payload = rt.getPayload();
|
||||
List<HRegionInfo> daughters = null;
|
||||
List<HRegionInfo> daughters;
|
||||
try {
|
||||
daughters = HRegionInfo.parseDelimitedFrom(payload, 0, payload.length);
|
||||
} catch (IOException e) {
|
||||
|
@ -851,7 +887,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
*/
|
||||
private boolean convertPendingCloseToSplitting(final RegionState rs) {
|
||||
if (!rs.isPendingClose()) return false;
|
||||
LOG.debug("Converting PENDING_CLOSE to SPLITING; rs=" + rs);
|
||||
LOG.debug("Converting PENDING_CLOSE to SPLITTING; rs=" + rs);
|
||||
regionStates.updateRegionState(
|
||||
rs.getRegion(), RegionState.State.SPLITTING);
|
||||
// Clean up existing state. Clear from region plans seems all we
|
||||
|
@ -874,7 +910,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
RegionState regionState = regionStates.getRegionTransitionState(encodedName);
|
||||
switch (rt.getEventType()) {
|
||||
case M_ZK_REGION_OFFLINE:
|
||||
HRegionInfo regionInfo = null;
|
||||
HRegionInfo regionInfo;
|
||||
if (regionState != null) {
|
||||
regionInfo = regionState.getRegion();
|
||||
} else {
|
||||
|
@ -1367,7 +1403,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
HRegionInfo region = state.getRegion();
|
||||
String encodedRegionName = region.getEncodedName();
|
||||
Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
|
||||
if (nodeVersion == null || nodeVersion.intValue() == -1) {
|
||||
if (nodeVersion == null || nodeVersion == -1) {
|
||||
LOG.warn("failed to offline in zookeeper: " + region);
|
||||
failedToOpenRegions.add(region); // assign individually later
|
||||
Lock lock = locks.remove(encodedRegionName);
|
||||
|
@ -1572,12 +1608,13 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
RegionPlan plan = null;
|
||||
long maxRegionServerStartupWaitTime = -1;
|
||||
HRegionInfo region = state.getRegion();
|
||||
RegionOpeningState regionOpenState;
|
||||
for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
|
||||
if (plan == null) { // Get a server for the region at first
|
||||
plan = getRegionPlan(region, forceNewPlan);
|
||||
}
|
||||
if (plan == null) {
|
||||
LOG.debug("Unable to determine a plan to assign " + region);
|
||||
LOG.warn("Unable to determine a plan to assign " + region);
|
||||
this.timeoutMonitor.setAllRegionServersOffline(true);
|
||||
return; // Should get reassigned later when RIT times out.
|
||||
}
|
||||
|
@ -1609,118 +1646,119 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
LOG.debug("Server stopped; skipping assign of " + region);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
LOG.info("Assigning region " + region.getRegionNameAsString() +
|
||||
LOG.info("Assigning region " + region.getRegionNameAsString() +
|
||||
" to " + plan.getDestination().toString());
|
||||
// Transition RegionState to PENDING_OPEN
|
||||
currentState = regionStates.updateRegionState(region,
|
||||
// Transition RegionState to PENDING_OPEN
|
||||
currentState = regionStates.updateRegionState(region,
|
||||
RegionState.State.PENDING_OPEN, plan.getDestination());
|
||||
// Send OPEN RPC. This can fail if the server on the other end is not up.
|
||||
// Pass the version that was obtained while setting the node to OFFLINE.
|
||||
RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
|
||||
.getDestination(), region, versionOfOfflineNode);
|
||||
if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
|
||||
processAlreadyOpenedRegion(region, plan.getDestination());
|
||||
} else if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
|
||||
// Failed opening this region
|
||||
throw new Exception("Get regionOpeningState=" + regionOpenState);
|
||||
|
||||
boolean needNewPlan;
|
||||
final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
|
||||
" to " + plan.getDestination();
|
||||
try {
|
||||
regionOpenState = serverManager.sendRegionOpen(
|
||||
plan.getDestination(), region, versionOfOfflineNode);
|
||||
|
||||
if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
|
||||
// Failed opening this region, looping again on a new server.
|
||||
needNewPlan = true;
|
||||
LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
|
||||
" trying to assign elsewhere instead; " +
|
||||
"try=" + i + " of " + this.maximumAttempts);
|
||||
} else {
|
||||
// we're done
|
||||
if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
|
||||
processAlreadyOpenedRegion(region, plan.getDestination());
|
||||
}
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
} catch (Throwable t) {
|
||||
if (t instanceof RemoteException) {
|
||||
t = ((RemoteException) t).unwrapRemoteException();
|
||||
}
|
||||
boolean regionAlreadyInTransitionException = false;
|
||||
boolean serverNotRunningYet = false;
|
||||
boolean socketTimedOut = false;
|
||||
if (t instanceof RegionAlreadyInTransitionException) {
|
||||
regionAlreadyInTransitionException = true;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Failed assignment in: " + plan.getDestination() + " due to "
|
||||
+ t.getMessage());
|
||||
}
|
||||
} else if (t instanceof ServerNotRunningYetException) {
|
||||
|
||||
// Should we wait a little before retrying? If the server is starting it's yes.
|
||||
// If the region is already in transition, it's yes as well: we want to be sure that
|
||||
// the region will get opened but we don't want a double assignment.
|
||||
boolean hold = (t instanceof RegionAlreadyInTransitionException ||
|
||||
t instanceof ServerNotRunningYetException);
|
||||
|
||||
// In case socket is timed out and the region server is still online,
|
||||
// the openRegion RPC could have been accepted by the server and
|
||||
// just the response didn't go through. So we will retry to
|
||||
// open the region on the same server to avoid possible
|
||||
// double assignment.
|
||||
boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
|
||||
&& this.serverManager.isServerOnline(plan.getDestination()));
|
||||
|
||||
|
||||
if (hold) {
|
||||
LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
|
||||
"try=" + i + " of " + this.maximumAttempts, t);
|
||||
|
||||
if (maxRegionServerStartupWaitTime < 0) {
|
||||
maxRegionServerStartupWaitTime = System.currentTimeMillis() +
|
||||
this.server.getConfiguration().
|
||||
getLong("hbase.regionserver.rpc.startup.waittime", 60000);
|
||||
maxRegionServerStartupWaitTime = EnvironmentEdgeManager.currentTimeMillis() +
|
||||
this.server.getConfiguration().
|
||||
getLong("hbase.regionserver.rpc.startup.waittime", 60000);
|
||||
}
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
if (now < maxRegionServerStartupWaitTime) {
|
||||
LOG.debug("Server is not yet up; waiting up to " +
|
||||
(maxRegionServerStartupWaitTime - now) + "ms", t);
|
||||
serverNotRunningYet = true;
|
||||
(maxRegionServerStartupWaitTime - now) + "ms", t);
|
||||
Thread.sleep(100);
|
||||
i--; // reset the try count
|
||||
needNewPlan = false;
|
||||
} else {
|
||||
LOG.debug("Server is not up for a while; try a new one", t);
|
||||
needNewPlan = true;
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Failed to assign "
|
||||
+ region.getRegionNameAsString() + " since interrupted", ie);
|
||||
+ region.getRegionNameAsString() + " since interrupted", ie);
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
} else if (t instanceof java.net.SocketTimeoutException
|
||||
&& this.serverManager.isServerOnline(plan.getDestination())) {
|
||||
// In case socket is timed out and the region server is still online,
|
||||
// the openRegion RPC could have been accepted by the server and
|
||||
// just the response didn't go through. So we will retry to
|
||||
// open the region on the same server to avoid possible
|
||||
// double assignment.
|
||||
socketTimedOut = true;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Call openRegion() to " + plan.getDestination()
|
||||
+ " has timed out when trying to assign "
|
||||
+ region.getRegionNameAsString()
|
||||
+ ", but the region might already be opened on "
|
||||
+ plan.getDestination() + ".", t);
|
||||
}
|
||||
} else if (retry) {
|
||||
needNewPlan = false;
|
||||
LOG.warn(assignMsg + ", trying to assign to the same region server " +
|
||||
"try=" + i + " of " + this.maximumAttempts, t);
|
||||
} else {
|
||||
needNewPlan = true;
|
||||
LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
|
||||
" try=" + i + " of " + this.maximumAttempts, t);
|
||||
}
|
||||
}
|
||||
|
||||
LOG.warn("Failed assignment of "
|
||||
+ region.getRegionNameAsString()
|
||||
+ " to "
|
||||
+ plan.getDestination()
|
||||
+ ", trying to assign "
|
||||
+ (regionAlreadyInTransitionException || serverNotRunningYet || socketTimedOut
|
||||
? "to the same region server because of RegionAlreadyInTransitionException"
|
||||
+ "/ServerNotRunningYetException/SocketTimeoutException;"
|
||||
: "elsewhere instead; ")
|
||||
+ "try=" + i + " of " + this.maximumAttempts, t);
|
||||
if (i == this.maximumAttempts) {
|
||||
// Don't reset the region state or get a new plan any more.
|
||||
// This is the last try.
|
||||
continue;
|
||||
}
|
||||
|
||||
if (i == this.maximumAttempts) {
|
||||
// Don't reset the region state or get a new plan any more.
|
||||
// This is the last try.
|
||||
continue;
|
||||
}
|
||||
// If region opened on destination of present plan, reassigning to new
|
||||
// RS may cause double assignments. In case of RegionAlreadyInTransitionException
|
||||
// reassigning to same RS.
|
||||
if (needNewPlan) {
|
||||
// Force a new plan and reassign. Will return null if no servers.
|
||||
// The new plan could be the same as the existing plan since we don't
|
||||
// exclude the server of the original plan, which should not be
|
||||
// excluded since it could be the only server up now.
|
||||
RegionPlan newPlan = getRegionPlan(region, true);
|
||||
|
||||
// If region opened on destination of present plan, reassigning to new
|
||||
// RS may cause double assignments. In case of RegionAlreadyInTransitionException
|
||||
// reassigning to same RS.
|
||||
RegionPlan newPlan = plan;
|
||||
if (!(regionAlreadyInTransitionException
|
||||
|| serverNotRunningYet || socketTimedOut)) {
|
||||
// Force a new plan and reassign. Will return null if no servers.
|
||||
// The new plan could be the same as the existing plan since we don't
|
||||
// exclude the server of the original plan, which should not be
|
||||
// excluded since it could be the only server up now.
|
||||
newPlan = getRegionPlan(region, true);
|
||||
}
|
||||
if (newPlan == null) {
|
||||
this.timeoutMonitor.setAllRegionServersOffline(true);
|
||||
LOG.warn("Unable to find a viable location to assign region " +
|
||||
region.getRegionNameAsString());
|
||||
region.getRegionNameAsString());
|
||||
return;
|
||||
}
|
||||
if (plan != newPlan
|
||||
&& !plan.getDestination().equals(newPlan.getDestination())) {
|
||||
|
||||
if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
|
||||
// Clean out the plan we failed to execute and one that doesn't look like it'll
|
||||
// succeed anyways; we need a new plan!
|
||||
// Transition back to OFFLINE
|
||||
currentState = regionStates.updateRegionState(
|
||||
region, RegionState.State.OFFLINE);
|
||||
currentState = regionStates.updateRegionState(region, RegionState.State.OFFLINE);
|
||||
versionOfOfflineNode = -1;
|
||||
plan = newPlan;
|
||||
}
|
||||
|
@ -1740,7 +1778,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
} catch (KeeperException.NoNodeException e) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The unassigned node " + encodedRegionName
|
||||
+ " doesnot exist.");
|
||||
+ " does not exist.");
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
server.abort(
|
||||
|
@ -1778,7 +1816,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
regionStates.updateRegionState(state.getRegion(),
|
||||
RegionState.State.OFFLINE);
|
||||
int versionOfOfflineNode = -1;
|
||||
int versionOfOfflineNode;
|
||||
try {
|
||||
// get the version after setting the znode to OFFLINE
|
||||
versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
|
||||
|
@ -1829,7 +1867,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
RegionPlan randomPlan = null;
|
||||
boolean newPlan = false;
|
||||
RegionPlan existingPlan = null;
|
||||
RegionPlan existingPlan;
|
||||
|
||||
synchronized (this.regionPlans) {
|
||||
existingPlan = this.regionPlans.get(encodedName);
|
||||
|
@ -2034,7 +2072,6 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
server.abort(
|
||||
"Unexpected ZK exception deleting node CLOSING/CLOSED for the region "
|
||||
+ encodedName, ke);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2410,16 +2447,15 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* that were in RIT.
|
||||
* <p>
|
||||
*
|
||||
*
|
||||
* @param deadServers
|
||||
* The list of dead servers which failed while there was no active
|
||||
* master. Can be null.
|
||||
* @param nodes
|
||||
* The regions in RIT
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
*/
|
||||
private void processDeadServersAndRecoverLostRegions(
|
||||
Map<ServerName, List<HRegionInfo>> deadServers, List<String> nodes)
|
||||
Map<ServerName, List<HRegionInfo>> deadServers)
|
||||
throws IOException, KeeperException {
|
||||
if (deadServers != null) {
|
||||
for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
|
||||
|
@ -2429,7 +2465,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
}
|
||||
}
|
||||
nodes = ZKUtil.listChildrenAndWatchForNewChildren(
|
||||
List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(
|
||||
this.watcher, this.watcher.assignmentZNode);
|
||||
if (!nodes.isEmpty()) {
|
||||
for (String encodedRegionName : nodes) {
|
||||
|
@ -2672,12 +2708,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
invokeAssign(regionInfo);
|
||||
} catch (KeeperException ke) {
|
||||
LOG.error("Unexpected ZK exception timing out CLOSING region", ke);
|
||||
return;
|
||||
} catch (DeserializationException e) {
|
||||
LOG.error("Unexpected exception parsing CLOSING region", e);
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
void invokeAssign(HRegionInfo regionInfo) {
|
||||
|
|
|
@ -1408,7 +1408,7 @@ Server {
|
|||
* @param b If false, the catalog janitor won't do anything.
|
||||
*/
|
||||
public void setCatalogJanitorEnabled(final boolean b) {
|
||||
((CatalogJanitor)this.catalogJanitorChore).setEnabled(b);
|
||||
this.catalogJanitorChore.setEnabled(b);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1908,7 +1908,7 @@ Server {
|
|||
public String[] getCoprocessors() {
|
||||
Set<String> masterCoprocessors =
|
||||
getCoprocessorHost().getCoprocessors();
|
||||
return masterCoprocessors.toArray(new String[0]);
|
||||
return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -186,6 +186,8 @@ public class TestAssignmentManager {
|
|||
@Test(timeout = 5000)
|
||||
public void testBalanceOnMasterFailoverScenarioWithOpenedNode()
|
||||
throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException {
|
||||
Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, REGIONINFO, 0, null, true)).
|
||||
thenReturn(true);
|
||||
AssignmentManagerWithExtrasForTesting am =
|
||||
setUpMockedAssignmentManager(this.server, this.serverManager);
|
||||
try {
|
||||
|
@ -232,6 +234,8 @@ public class TestAssignmentManager {
|
|||
@Test(timeout = 5000)
|
||||
public void testBalanceOnMasterFailoverScenarioWithClosedNode()
|
||||
throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException {
|
||||
Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, REGIONINFO, 0, null, true)).
|
||||
thenReturn(true);
|
||||
AssignmentManagerWithExtrasForTesting am =
|
||||
setUpMockedAssignmentManager(this.server, this.serverManager);
|
||||
try {
|
||||
|
@ -279,6 +283,8 @@ public class TestAssignmentManager {
|
|||
@Test(timeout = 5000)
|
||||
public void testBalanceOnMasterFailoverScenarioWithOfflineNode()
|
||||
throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException {
|
||||
Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, REGIONINFO, 0, null, true)).
|
||||
thenReturn(true);
|
||||
AssignmentManagerWithExtrasForTesting am =
|
||||
setUpMockedAssignmentManager(this.server, this.serverManager);
|
||||
try {
|
||||
|
@ -574,7 +580,7 @@ public class TestAssignmentManager {
|
|||
ClientProtocol implementation = Mockito.mock(ClientProtocol.class);
|
||||
// Get a meta row result that has region up on SERVERNAME_A
|
||||
|
||||
Result r = null;
|
||||
Result r;
|
||||
if (splitRegion) {
|
||||
r = MetaMockingUtil.getMetaTableRowResultAsSplitRegion(REGIONINFO, SERVERNAME_A);
|
||||
} else {
|
||||
|
@ -942,6 +948,30 @@ public class TestAssignmentManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Scenario:<ul>
|
||||
* <li> master starts a close, and creates a znode</li>
|
||||
* <li> it fails just at this moment, before contacting the RS</li>
|
||||
* <li> while the second master is coming up, the targeted RS dies. But it's before ZK timeout so
|
||||
* we don't know, and we have an exception.</li>
|
||||
* <li> the master must handle this nicely and reassign.</li>
|
||||
* </ul>
|
||||
*/
|
||||
@Test
|
||||
public void testClosingFailureDuringRecovery() throws Exception {
|
||||
|
||||
AssignmentManagerWithExtrasForTesting am =
|
||||
setUpMockedAssignmentManager(this.server, this.serverManager);
|
||||
ZKAssign.createNodeClosing(this.watcher, REGIONINFO, SERVERNAME_A);
|
||||
am.getRegionStates().createRegionState(REGIONINFO);
|
||||
|
||||
assertFalse( am.getRegionStates().isRegionsInTransition() );
|
||||
|
||||
am.processRegionInTransition(REGIONINFO.getEncodedName(), REGIONINFO);
|
||||
|
||||
assertTrue( am.getRegionStates().isRegionsInTransition() );
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new ephemeral node in the SPLITTING state for the specified region.
|
||||
* Create it ephemeral in case regionserver dies mid-split.
|
||||
|
@ -1138,7 +1168,7 @@ public class TestAssignmentManager {
|
|||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
while (!t.isAlive()) Threads.sleep(1);
|
||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.LargeTests;
|
||||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -154,11 +153,6 @@ public class TestMasterFailover {
|
|||
|
||||
// Create config to use for this cluster
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
// Need to drop the timeout much lower
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 4000);
|
||||
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
|
||||
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 3);
|
||||
|
||||
// Start the cluster
|
||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
|
||||
|
@ -278,6 +272,8 @@ public class TestMasterFailover {
|
|||
*/
|
||||
|
||||
// Region that should be assigned but is not and is in ZK as OFFLINE
|
||||
// Cause: This can happen if the master crashed after creating the znode but before sending the
|
||||
// request to the region server
|
||||
HRegionInfo region = enabledRegions.remove(0);
|
||||
regionsThatShouldBeOnline.add(region);
|
||||
ZKAssign.createNodeOffline(zkw, region, serverName);
|
||||
|
@ -285,6 +281,7 @@ public class TestMasterFailover {
|
|||
/*
|
||||
* ZK = CLOSING
|
||||
*/
|
||||
// Cause: Same as offline.
|
||||
regionsThatShouldBeOnline.add(closingRegion);
|
||||
ZKAssign.createNodeClosing(zkw, closingRegion, serverName);
|
||||
|
||||
|
@ -293,6 +290,7 @@ public class TestMasterFailover {
|
|||
*/
|
||||
|
||||
// Region of enabled table closed but not acknowledged
|
||||
// Cause: Master was down while the region server updated the ZK status.
|
||||
region = enabledRegions.remove(0);
|
||||
regionsThatShouldBeOnline.add(region);
|
||||
int version = ZKAssign.createNodeClosing(zkw, region, serverName);
|
||||
|
@ -304,21 +302,12 @@ public class TestMasterFailover {
|
|||
version = ZKAssign.createNodeClosing(zkw, region, serverName);
|
||||
ZKAssign.transitionNodeClosed(zkw, region, serverName, version);
|
||||
|
||||
/*
|
||||
* ZK = OPENING
|
||||
*/
|
||||
|
||||
// RS was opening a region of enabled table but never finishes
|
||||
region = enabledRegions.remove(0);
|
||||
regionsThatShouldBeOnline.add(region);
|
||||
ZKAssign.createNodeOffline(zkw, region, serverName);
|
||||
ZKAssign.transitionNodeOpening(zkw, region, serverName);
|
||||
|
||||
/*
|
||||
* ZK = OPENED
|
||||
*/
|
||||
|
||||
// Region of enabled table was opened on RS
|
||||
// Cause: as offline
|
||||
region = enabledRegions.remove(0);
|
||||
regionsThatShouldBeOnline.add(region);
|
||||
ZKAssign.createNodeOffline(zkw, region, serverName);
|
||||
|
@ -333,6 +322,7 @@ public class TestMasterFailover {
|
|||
}
|
||||
|
||||
// Region of disabled table was opened on RS
|
||||
// Cause: Master failed while updating the status for this region server.
|
||||
region = disabledRegions.remove(0);
|
||||
regionsThatShouldBeOffline.add(region);
|
||||
ZKAssign.createNodeOffline(zkw, region, serverName);
|
||||
|
@ -457,9 +447,7 @@ public class TestMasterFailover {
|
|||
// Create and start the cluster
|
||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
// Need to drop the timeout much lower
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 4000);
|
||||
|
||||
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
|
||||
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2);
|
||||
TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
|
||||
|
@ -771,25 +759,6 @@ public class TestMasterFailover {
|
|||
assertTrue(cluster.waitForActiveAndReadyMaster());
|
||||
log("Master is ready");
|
||||
|
||||
// Let's add some weird states to master in-memory state
|
||||
|
||||
// After HBASE-3181, we need to have some ZK state if we're PENDING_OPEN
|
||||
// b/c it is impossible for us to get into this state w/o a zk node
|
||||
// this is not true of PENDING_CLOSE
|
||||
|
||||
// PENDING_OPEN and enabled
|
||||
region = enabledRegions.remove(0);
|
||||
regionsThatShouldBeOnline.add(region);
|
||||
master.getAssignmentManager().getRegionStates().updateRegionState(
|
||||
region, RegionState.State.PENDING_OPEN);
|
||||
ZKAssign.createNodeOffline(zkw, region, master.getServerName());
|
||||
// PENDING_OPEN and disabled
|
||||
region = disabledRegions.remove(0);
|
||||
regionsThatShouldBeOffline.add(region);
|
||||
master.getAssignmentManager().getRegionStates().updateRegionState(
|
||||
region, RegionState.State.PENDING_OPEN);
|
||||
ZKAssign.createNodeOffline(zkw, region, master.getServerName());
|
||||
|
||||
// Failover should be completed, now wait for no RIT
|
||||
log("Waiting for no more RIT");
|
||||
ZKAssign.blockUntilNoRIT(zkw);
|
||||
|
@ -863,8 +832,6 @@ public class TestMasterFailover {
|
|||
// Start the cluster
|
||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
|
||||
conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 8000);
|
||||
conf.setInt("hbase.master.info.port", -1);
|
||||
|
||||
TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
|
||||
|
@ -1016,84 +983,5 @@ public class TestMasterFailover {
|
|||
// Stop the cluster
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* return the index of the active master in the cluster
|
||||
* @throws MasterNotRunningException if no active master found
|
||||
*/
|
||||
private int getActiveMasterIndex(MiniHBaseCluster cluster) throws MasterNotRunningException {
|
||||
// get all the master threads
|
||||
List<MasterThread> masterThreads = cluster.getMasterThreads();
|
||||
|
||||
for (int i = 0; i < masterThreads.size(); i++) {
|
||||
if (masterThreads.get(i).getMaster().isActiveMaster()) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
throw new MasterNotRunningException();
|
||||
}
|
||||
|
||||
/**
|
||||
* Kill the master and wait for a new active master to show up
|
||||
* @param cluster
|
||||
* @return the new active master
|
||||
* @throws InterruptedException
|
||||
* @throws IOException
|
||||
*/
|
||||
private HMaster killActiveAndWaitForNewActive(MiniHBaseCluster cluster)
|
||||
throws InterruptedException, IOException {
|
||||
int activeIndex = getActiveMasterIndex(cluster);
|
||||
HMaster active = cluster.getMaster();
|
||||
cluster.stopMaster(activeIndex);
|
||||
cluster.waitOnMaster(activeIndex);
|
||||
assertTrue(cluster.waitForActiveAndReadyMaster());
|
||||
// double check this is actually a new master
|
||||
HMaster newActive = cluster.getMaster();
|
||||
assertFalse(active == newActive);
|
||||
return newActive;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that if the master fails, the load balancer maintains its
|
||||
* state (running or not) when the next master takes over
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test (timeout=240000)
|
||||
public void testMasterFailoverBalancerPersistence() throws Exception {
|
||||
final int NUM_MASTERS = 3;
|
||||
final int NUM_RS = 1;
|
||||
|
||||
// Start the cluster
|
||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
|
||||
assertTrue(cluster.waitForActiveAndReadyMaster());
|
||||
HMaster active = cluster.getMaster();
|
||||
// check that the balancer is on by default for the active master
|
||||
ClusterStatus clusterStatus = active.getClusterStatus();
|
||||
assertTrue(clusterStatus.isBalancerOn());
|
||||
|
||||
active = killActiveAndWaitForNewActive(cluster);
|
||||
|
||||
// ensure the load balancer is still running on new master
|
||||
clusterStatus = active.getClusterStatus();
|
||||
assertTrue(clusterStatus.isBalancerOn());
|
||||
|
||||
// turn off the load balancer
|
||||
active.balanceSwitch(false);
|
||||
|
||||
// once more, kill active master and wait for new active master to show up
|
||||
active = killActiveAndWaitForNewActive(cluster);
|
||||
|
||||
// ensure the load balancer is not running on the new master
|
||||
clusterStatus = active.getClusterStatus();
|
||||
assertFalse(clusterStatus.isBalancerOn());
|
||||
|
||||
// Stop the cluster
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import org.apache.hadoop.hbase.ClusterStatus;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.LargeTests;
|
||||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@Category(LargeTests.class)
|
||||
public class TestMasterFailoverBalancerPersistence {
|
||||
|
||||
/**
|
||||
* Test that if the master fails, the load balancer maintains its
|
||||
* state (running or not) when the next master takes over
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout = 240000)
|
||||
public void testMasterFailoverBalancerPersistence() throws Exception {
|
||||
final int NUM_MASTERS = 3;
|
||||
final int NUM_RS = 1;
|
||||
|
||||
// Start the cluster
|
||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
|
||||
assertTrue(cluster.waitForActiveAndReadyMaster());
|
||||
HMaster active = cluster.getMaster();
|
||||
// check that the balancer is on by default for the active master
|
||||
ClusterStatus clusterStatus = active.getClusterStatus();
|
||||
assertTrue(clusterStatus.isBalancerOn());
|
||||
|
||||
active = killActiveAndWaitForNewActive(cluster);
|
||||
|
||||
// ensure the load balancer is still running on new master
|
||||
clusterStatus = active.getClusterStatus();
|
||||
assertTrue(clusterStatus.isBalancerOn());
|
||||
|
||||
// turn off the load balancer
|
||||
active.balanceSwitch(false);
|
||||
|
||||
// once more, kill active master and wait for new active master to show up
|
||||
active = killActiveAndWaitForNewActive(cluster);
|
||||
|
||||
// ensure the load balancer is not running on the new master
|
||||
clusterStatus = active.getClusterStatus();
|
||||
assertFalse(clusterStatus.isBalancerOn());
|
||||
|
||||
// Stop the cluster
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* Kill the master and wait for a new active master to show up
|
||||
*
|
||||
* @param cluster
|
||||
* @return the new active master
|
||||
* @throws InterruptedException
|
||||
* @throws java.io.IOException
|
||||
*/
|
||||
private HMaster killActiveAndWaitForNewActive(MiniHBaseCluster cluster)
|
||||
throws InterruptedException, IOException {
|
||||
int activeIndex = getActiveMasterIndex(cluster);
|
||||
HMaster active = cluster.getMaster();
|
||||
cluster.stopMaster(activeIndex);
|
||||
cluster.waitOnMaster(activeIndex);
|
||||
assertTrue(cluster.waitForActiveAndReadyMaster());
|
||||
// double check this is actually a new master
|
||||
HMaster newActive = cluster.getMaster();
|
||||
assertFalse(active == newActive);
|
||||
return newActive;
|
||||
}
|
||||
|
||||
/**
|
||||
* return the index of the active master in the cluster
|
||||
*
|
||||
* @throws org.apache.hadoop.hbase.MasterNotRunningException
|
||||
* if no active master found
|
||||
*/
|
||||
private int getActiveMasterIndex(MiniHBaseCluster cluster) throws MasterNotRunningException {
|
||||
// get all the master threads
|
||||
List<JVMClusterUtil.MasterThread> masterThreads = cluster.getMasterThreads();
|
||||
|
||||
for (int i = 0; i < masterThreads.size(); i++) {
|
||||
if (masterThreads.get(i).getMaster().isActiveMaster()) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
throw new MasterNotRunningException();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue