YARN-1861. Fixed a bug in RM to reset leader-election on fencing that was causing both RMs to be stuck in standby mode when automatic failover is enabled. Contributed by Karthik Kambatla and Xuan Gong.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1594356 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dac9028ef9
commit
2f87e77cb7
|
@ -209,6 +209,10 @@ Release 2.4.1 - UNRELEASED
|
||||||
YARN-1201. TestAMAuthorization fails with local hostname cannot be resolved.
|
YARN-1201. TestAMAuthorization fails with local hostname cannot be resolved.
|
||||||
(Wangda Tan via junping_du)
|
(Wangda Tan via junping_du)
|
||||||
|
|
||||||
|
YARN-1861. Fixed a bug in RM to reset leader-election on fencing that was
|
||||||
|
causing both RMs to be stuck in standby mode when automatic failover is
|
||||||
|
enabled. (Karthik Kambatla and Xuan Gong via vinodkv)
|
||||||
|
|
||||||
Release 2.4.0 - 2014-04-07
|
Release 2.4.0 - 2014-04-07
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ha.ClientBaseWithFixes;
|
import org.apache.hadoop.ha.ClientBaseWithFixes;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.service.Service.STATE;
|
import org.apache.hadoop.service.Service.STATE;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
|
@ -42,6 +43,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
|
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -169,6 +173,7 @@ public class TestRMFailover extends ClientBaseWithFixes {
|
||||||
verifyConnections();
|
verifyConnections();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Test
|
@Test
|
||||||
public void testAutomaticFailover()
|
public void testAutomaticFailover()
|
||||||
throws YarnException, InterruptedException, IOException {
|
throws YarnException, InterruptedException, IOException {
|
||||||
|
@ -186,6 +191,25 @@ public class TestRMFailover extends ClientBaseWithFixes {
|
||||||
|
|
||||||
failover();
|
failover();
|
||||||
verifyConnections();
|
verifyConnections();
|
||||||
|
|
||||||
|
// Make the current Active handle an RMFatalEvent,
|
||||||
|
// so it transitions to standby.
|
||||||
|
ResourceManager rm = cluster.getResourceManager(
|
||||||
|
cluster.getActiveRMIndex());
|
||||||
|
RMFatalEvent event =
|
||||||
|
new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED,
|
||||||
|
"Fake RMFatalEvent");
|
||||||
|
rm.getRMContext().getDispatcher().getEventHandler().handle(event);
|
||||||
|
// Poll (up to 2000 attempts, sleeping 1 ms between them) until the RM that
// handled the fake fatal event reports STANDBY.
int maxWaitingAttempts = 2000;
|
||||||
|
while (maxWaitingAttempts-- > 0 ) {
|
||||||
|
if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Thread.sleep(1);
|
||||||
|
}
|
||||||
|
// Because of the post-decrement in the loop condition, the counter ends at
// -1 only when every attempt was used without observing STANDBY; a break on
// the very last attempt leaves it at 0. Timeout is therefore "< 0", not
// "== 0" (which would pass on a real timeout and fail on a last-attempt
// success).
Assert.assertFalse("RM didn't transition to Standby ",
|
||||||
|
maxWaitingAttempts < 0);
|
||||||
|
verifyConnections();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
|
@ -86,6 +87,7 @@ public class AdminService extends CompositeService implements
|
||||||
private String rmId;
|
private String rmId;
|
||||||
|
|
||||||
private boolean autoFailoverEnabled;
|
private boolean autoFailoverEnabled;
|
||||||
|
private EmbeddedElectorService embeddedElector;
|
||||||
|
|
||||||
private Server server;
|
private Server server;
|
||||||
private InetSocketAddress masterServiceAddress;
|
private InetSocketAddress masterServiceAddress;
|
||||||
|
@ -106,7 +108,8 @@ public class AdminService extends CompositeService implements
|
||||||
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
|
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
|
||||||
if (autoFailoverEnabled) {
|
if (autoFailoverEnabled) {
|
||||||
if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
|
if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
|
||||||
addIfService(createEmbeddedElectorService());
|
embeddedElector = createEmbeddedElectorService();
|
||||||
|
addIfService(embeddedElector);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -181,6 +184,13 @@ public class AdminService extends CompositeService implements
|
||||||
return new EmbeddedElectorService(rmContext);
|
return new EmbeddedElectorService(rmContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Asks the embedded elector service, if there is one, to reset leader
 * election.
 *
 * Does nothing when {@code embeddedElector} is null; the field is only
 * assigned when automatic failover is enabled and configured as embedded.
 * In this change it is invoked by the ResourceManager right after
 * {@code transitionToStandby(true)}.
 */
@InterfaceAudience.Private
|
||||||
|
void resetLeaderElection() {
|
||||||
|
if (embeddedElector != null) {
|
||||||
|
embeddedElector.resetLeaderElection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private UserGroupInformation checkAccess(String method) throws IOException {
|
private UserGroupInformation checkAccess(String method) throws IOException {
|
||||||
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
|
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
|
||||||
}
|
}
|
||||||
|
|
|
@ -194,4 +194,9 @@ public class EmbeddedElectorService extends AbstractService
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Resets this node's participation in leader election by quitting the
 * current election and immediately joining again with
 * {@code localActiveNodeInfo}.
 *
 * NOTE(review): the {@code false} passed to {@code quitElection} presumably
 * means "do not request fencing" -- confirm against the elector's API, which
 * is not visible here.
 */
public void resetLeaderElection() {
|
||||||
|
// Order matters: leave the election first, then re-enter it.
elector.quitElection(false);
|
||||||
|
elector.joinElection(localActiveNodeInfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -664,6 +664,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
// Transition to standby and reinit active services
|
// Transition to standby and reinit active services
|
||||||
LOG.info("Transitioning RM to Standby mode");
|
LOG.info("Transitioning RM to Standby mode");
|
||||||
rm.transitionToStandby(true);
|
rm.transitionToStandby(true);
|
||||||
|
rm.adminService.resetLeaderElection();
|
||||||
return;
|
return;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.fatal("Failed to transition RM to Standby mode.");
|
LOG.fatal("Failed to transition RM to Standby mode.");
|
||||||
|
|
|
@ -652,12 +652,14 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
*/
|
*/
|
||||||
public boolean waitForNodeManagersToConnect(long timeout)
|
public boolean waitForNodeManagersToConnect(long timeout)
|
||||||
throws YarnException, InterruptedException {
|
throws YarnException, InterruptedException {
|
||||||
ResourceManager rm = getResourceManager();
|
|
||||||
GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
|
GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
|
||||||
|
|
||||||
for (int i = 0; i < timeout / 100; i++) {
|
for (int i = 0; i < timeout / 100; i++) {
|
||||||
if (nodeManagers.length == rm.getClientRMService().getClusterMetrics(req)
|
ResourceManager rm = getResourceManager();
|
||||||
.getClusterMetrics().getNumNodeManagers()) {
|
if (rm == null) {
|
||||||
|
throw new YarnException("Can not find the active RM.");
|
||||||
|
}
|
||||||
|
else if (nodeManagers.length == rm.getClientRMService()
|
||||||
|
.getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
Loading…
Reference in New Issue