Merge HADOOP-10251. Both NameNodes could be in STANDBY State if SNN network is unstable. Contributed by Vinayakumar B.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1589495 13f79535-47bb-0310-9956-ffa450edef68
Uma Maheswara Rao G 2014-04-23 19:04:33 +00:00
parent db667639eb
commit cdfb1aedd9
4 changed files with 107 additions and 6 deletions

hadoop-common-project/hadoop-common/CHANGES.txt

@@ -78,6 +78,9 @@ Release 2.5.0 - UNRELEASED
HADOOP-10526. Chance for Stream leakage in CompressorStream. (Rushabh
Shah via kihwal)
HADOOP-10251. Both NameNodes could be in STANDBY State if SNN network is unstable
(Vinayakumar B via umamahesh)
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java

@@ -74,6 +74,9 @@ public class HealthMonitor {
private List<Callback> callbacks = Collections.synchronizedList(
new LinkedList<Callback>());
private List<ServiceStateCallback> serviceStateCallbacks = Collections
.synchronizedList(new LinkedList<ServiceStateCallback>());
private HAServiceStatus lastServiceState = new HAServiceStatus(
HAServiceState.INITIALIZING);
@@ -134,7 +137,15 @@ public void addCallback(Callback cb) {
public void removeCallback(Callback cb) {
callbacks.remove(cb);
}
public synchronized void addServiceStateCallback(ServiceStateCallback cb) {
this.serviceStateCallbacks.add(cb);
}
public synchronized void removeServiceStateCallback(ServiceStateCallback cb) {
serviceStateCallbacks.remove(cb);
}
public void shutdown() {
LOG.info("Stopping HealthMonitor thread");
shouldRun = false;
@@ -217,6 +228,9 @@ private void doHealthChecks() throws InterruptedException {
private synchronized void setLastServiceStatus(HAServiceStatus status) {
this.lastServiceState = status;
for (ServiceStateCallback cb : serviceStateCallbacks) {
cb.reportServiceStatus(lastServiceState);
}
}
private synchronized void enterState(State newState) {
@@ -293,4 +307,11 @@ public void run() {
static interface Callback {
void enteredState(State newState);
}
/**
* Callback interface for service states.
*/
static interface ServiceStateCallback {
void reportServiceStatus(HAServiceStatus status);
}
}
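For context, a minimal sketch of how a caller might use the new ServiceStateCallback hook added above. It assumes only the API shown in this patch (addServiceStateCallback, reportServiceStatus, HAServiceStatus.getState); the observer class, its name, and the conf/localTarget parameters are illustrative placeholders, not part of the commit:

package org.apache.hadoop.ha;  // ServiceStateCallback is package-private, so same package

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HealthMonitor.ServiceStateCallback;

// Illustrative sketch only: registers a ServiceStateCallback that logs every
// HAServiceStatus the HealthMonitor reports after a health check.
class LoggingServiceStateObserver {
  private static final Log LOG = LogFactory.getLog(LoggingServiceStateObserver.class);

  void observe(Configuration conf, HAServiceTarget localTarget) {
    HealthMonitor monitor = new HealthMonitor(conf, localTarget);
    monitor.addServiceStateCallback(new ServiceStateCallback() {
      @Override
      public void reportServiceStatus(HAServiceStatus status) {
        LOG.info("Monitored service reported HA state: " + status.getState());
      }
    });
    monitor.start();  // health checks now drive both Callback and ServiceStateCallback
  }
}

In the patch itself, the ZKFailoverController is the only consumer of this hook (see ServiceStateCallBacks below).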

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java

@@ -34,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.util.ZKUtil;
@@ -105,6 +106,8 @@ public abstract class ZKFailoverController {
private State lastHealthState = State.INITIALIZING;
private volatile HAServiceState serviceState = HAServiceState.INITIALIZING;
/** Set if a fatal error occurs */
private String fatalError = null;
@@ -294,6 +297,7 @@ private boolean confirmFormat() {
private void initHM() {
healthMonitor = new HealthMonitor(conf, localTarget);
healthMonitor.addCallback(new HealthCallbacks());
healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
healthMonitor.start();
}
@@ -376,6 +380,7 @@ private synchronized void becomeActive() throws ServiceFailedException {
String msg = "Successfully transitioned " + localTarget +
" to active state";
LOG.info(msg);
serviceState = HAServiceState.ACTIVE;
recordActiveAttempt(new ActiveAttemptRecord(true, msg));
} catch (Throwable t) {
@@ -484,6 +489,7 @@ private synchronized void becomeStandby() {
// TODO handle this. It's a likely case since we probably got fenced
// at the same time.
}
serviceState = HAServiceState.STANDBY;
}
@@ -574,6 +580,7 @@ private void doCedeActive(int millisToCede)
delayJoiningUntilNanotime = System.nanoTime() +
TimeUnit.MILLISECONDS.toNanos(millisToCede);
elector.quitElection(needFence);
serviceState = HAServiceState.INITIALIZING;
}
}
recheckElectability();
@@ -739,12 +746,16 @@ private void recheckElectability() {
switch (lastHealthState) {
case SERVICE_HEALTHY:
elector.joinElection(targetToData(localTarget));
if (quitElectionOnBadState) {
quitElectionOnBadState = false;
}
break;
case INITIALIZING:
LOG.info("Ensuring that " + localTarget + " does not " +
"participate in active master election");
elector.quitElection(false);
serviceState = HAServiceState.INITIALIZING;
break;
case SERVICE_UNHEALTHY:
@@ -752,6 +763,7 @@ private void recheckElectability() {
LOG.info("Quitting master election for " + localTarget +
" and marking that fencing is necessary");
elector.quitElection(true);
serviceState = HAServiceState.INITIALIZING;
break;
case HEALTH_MONITOR_FAILED:
@@ -784,6 +796,44 @@ public void run() {
whenNanos, TimeUnit.NANOSECONDS);
}
int serviceStateMismatchCount = 0;
boolean quitElectionOnBadState = false;
void verifyChangedServiceState(HAServiceState changedState) {
synchronized (elector) {
synchronized (this) {
if (serviceState == HAServiceState.INITIALIZING) {
if (quitElectionOnBadState) {
LOG.debug("rechecking for electability from bad state");
recheckElectability();
}
return;
}
if (changedState == serviceState) {
serviceStateMismatchCount = 0;
return;
}
if (serviceStateMismatchCount == 0) {
// recheck one more time. As this might be due to parallel transition.
serviceStateMismatchCount++;
return;
}
// quit the election as the expected state and reported state
// mismatches.
LOG.error("Local service " + localTarget
+ " has changed the serviceState to " + changedState
+ ". Expected was " + serviceState
+ ". Quitting election marking fencing necessary.");
delayJoiningUntilNanotime = System.nanoTime()
+ TimeUnit.MILLISECONDS.toNanos(1000);
elector.quitElection(true);
quitElectionOnBadState = true;
serviceStateMismatchCount = 0;
serviceState = HAServiceState.INITIALIZING;
}
}
}
/**
* @return the last health state passed to the FC
* by the HealthMonitor.
@@ -855,7 +905,17 @@ public void enteredState(HealthMonitor.State newState) {
recheckElectability();
}
}
/**
* Callbacks for HAServiceStatus
*/
class ServiceStateCallBacks implements HealthMonitor.ServiceStateCallback {
@Override
public void reportServiceStatus(HAServiceStatus status) {
verifyChangedServiceState(status.getState());
}
}
private static class ActiveAttemptRecord {
private final boolean succeeded;
private final String status;

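The core of the fix above is verifyChangedServiceState(), which tolerates a single stale report (to allow a transition that is still in flight) and only quits the election, marking fencing necessary, on the second consecutive mismatch between the reported and expected HA state. A standalone, hypothetical illustration of that "two-strike" decision, with names that are not from the patch:

import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;

// Hypothetical, self-contained mirror of the mismatch rule used by
// verifyChangedServiceState(); purely to illustrate the decision, not patch code.
final class StateMismatchTracker {
  private int mismatchCount = 0;

  /** Returns true when the reported state has mismatched twice in a row. */
  boolean shouldQuitElection(HAServiceState expected, HAServiceState reported) {
    if (reported == expected) {
      mismatchCount = 0;   // states agree again, clear any earlier strike
      return false;
    }
    if (mismatchCount == 0) {
      mismatchCount++;     // first mismatch may be a transition still in flight
      return false;
    }
    mismatchCount = 0;     // second consecutive mismatch: quit election and fence
    return true;
  }
}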
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java

@@ -18,7 +18,6 @@
package org.apache.hadoop.ha;
import static org.junit.Assert.*;
import static org.junit.Assume.assumeTrue;
import java.security.NoSuchAlgorithmException;
@@ -29,7 +28,6 @@
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.apache.zookeeper.KeeperException;
@@ -68,8 +66,6 @@ public class TestZKFailoverController extends ClientBaseWithFixes {
@Before
public void setupConfAndServices() {
// skip tests on Windows until after resolution of ZooKeeper client bug
assumeTrue(!Shell.WINDOWS);
conf = new Configuration();
conf.set(ZKFailoverController.ZK_ACL_KEY, TEST_ACL);
conf.set(ZKFailoverController.ZK_AUTH_KEY, TEST_AUTH_GOOD);
@@ -232,6 +228,27 @@ public void testAutoFailoverOnBadHealth() throws Exception {
cluster.stop();
}
}
/**
* Test that, when the local service reports an HA state that does not
* match the state the ZKFC expects, the ZKFC quits the election
* (marking fencing necessary) and the other node becomes active.
*/
@Test(timeout=15000)
public void testAutoFailoverOnBadState() throws Exception {
try {
cluster.start();
DummyHAService svc0 = cluster.getService(0);
LOG.info("Faking svc0 to change the state, should failover to svc1");
svc0.state = HAServiceState.STANDBY;
// svc1 should take over as active at this point
cluster.waitForHAState(1, HAServiceState.ACTIVE);
} finally {
cluster.stop();
}
}
@Test(timeout=15000)
public void testAutoFailoverOnLostZKSession() throws Exception {