YARN-4686. MiniYARNCluster.start() returns before cluster is completely started. Contributed by Eric Badger.

(cherry picked from commit 92b7e0d413)
This commit is contained in:
Eric Payne 2016-03-18 16:11:06 +00:00
parent 41621a994d
commit dd1e4107e5
6 changed files with 75 additions and 77 deletions

View File

@ -217,7 +217,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
protected void verifyConnections() throws InterruptedException, protected void verifyConnections() throws InterruptedException,
YarnException { YarnException {
assertTrue("NMs failed to connect to the RM", assertTrue("NMs failed to connect to the RM",
cluster.waitForNodeManagersToConnect(20000)); cluster.waitForNodeManagersToConnect(5000));
verifyClientConnection(); verifyClientConnection();
} }
@ -279,7 +279,6 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
cluster.resetStartFailoverFlag(false); cluster.resetStartFailoverFlag(false);
cluster.init(conf); cluster.init(conf);
cluster.start(); cluster.start();
getAdminService(0).transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections(); verifyConnections();

View File

@ -142,7 +142,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
cluster.init(conf); cluster.init(conf);
cluster.start(); cluster.start();
getAdminService(0).transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections(); verifyConnections();
@ -231,7 +230,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
cluster.init(conf); cluster.init(conf);
cluster.start(); cluster.start();
getAdminService(0).transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections(); verifyConnections();

View File

@ -1195,6 +1195,24 @@ public class TestYarnClient {
client.init(yarnConf); client.init(yarnConf);
client.start(); client.start();
int attempts;
for(attempts = 10; attempts > 0; attempts--) {
if (cluster.getResourceManager().getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
.getMemory() > 0) {
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (attempts <= 0) {
Assert.fail("Exhausted attempts in checking if node capacity was "
+ "added to the plan");
}
// create a reservation // create a reservation
Clock clock = new UTCClock(); Clock clock = new UTCClock();
long arrival = clock.getTime(); long arrival = clock.getTime();

View File

@ -96,6 +96,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class); private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);
private final Object heartbeatMonitor = new Object(); private final Object heartbeatMonitor = new Object();
private final Object shutdownMonitor = new Object();
private final Context context; private final Context context;
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
@ -240,15 +241,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
// the isStopped check is for avoiding multiple unregistrations. // the isStopped check is for avoiding multiple unregistrations.
if (this.registeredWithRM && !this.isStopped synchronized(shutdownMonitor) {
&& !isNMUnderSupervisionWithRecoveryEnabled() if (this.registeredWithRM && !this.isStopped
&& !context.getDecommissioned() && !failedToConnect) { && !isNMUnderSupervisionWithRecoveryEnabled()
unRegisterNM(); && !context.getDecommissioned() && !failedToConnect) {
unRegisterNM();
}
// Interrupt the updater.
this.isStopped = true;
stopRMProxy();
super.serviceStop();
} }
// Interrupt the updater.
this.isStopped = true;
stopRMProxy();
super.serviceStop();
} }
private boolean isNMUnderSupervisionWithRecoveryEnabled() { private boolean isNMUnderSupervisionWithRecoveryEnabled() {
@ -275,19 +278,24 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
protected void rebootNodeStatusUpdaterAndRegisterWithRM() { protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
// Interrupt the updater. // Interrupt the updater.
this.isStopped = true; synchronized(shutdownMonitor) {
if(this.isStopped) {
try { LOG.info("Currently being shutdown. Aborting reboot");
statusUpdater.join(); return;
registerWithRM(); }
statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); this.isStopped = true;
this.isStopped = false; try {
statusUpdater.start(); statusUpdater.join();
LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); registerWithRM();
} catch (Exception e) { statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
String errorMessage = "Unexpected error rebooting NodeStatusUpdater"; statusUpdater.start();
LOG.error(errorMessage, e); this.isStopped = false;
throw new YarnRuntimeException(e); LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
} catch (Exception e) {
String errorMessage = "Unexpected error rebooting NodeStatusUpdater";
LOG.error(errorMessage, e);
throw new YarnRuntimeException(e);
}
} }
} }

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
@ -76,6 +78,7 @@ import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -275,6 +278,12 @@ public class MiniYARNCluster extends CompositeService {
conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
} }
@Override
protected synchronized void serviceStart() throws Exception {
super.serviceStart();
this.waitForNodeManagersToConnect(5000);
}
private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) { private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) {
String hostname = MiniYARNCluster.getHostname(); String hostname = MiniYARNCluster.getHostname();
conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
@ -314,19 +323,7 @@ public class MiniYARNCluster extends CompositeService {
private synchronized void startResourceManager(final int index) { private synchronized void startResourceManager(final int index) {
try { try {
Thread rmThread = new Thread() { resourceManagers[index].start();
public void run() {
resourceManagers[index].start();
}
};
rmThread.setName("RM-" + index);
rmThread.start();
int waitCount = 0;
while (resourceManagers[index].getServiceState() == STATE.INITED
&& waitCount++ < 60) {
LOG.info("Waiting for RM to start...");
Thread.sleep(1500);
}
if (resourceManagers[index].getServiceState() != STATE.STARTED) { if (resourceManagers[index].getServiceState() != STATE.STARTED) {
// RM could have failed. // RM could have failed.
throw new IOException( throw new IOException(
@ -456,6 +453,11 @@ public class MiniYARNCluster extends CompositeService {
@Override @Override
protected synchronized void serviceStart() throws Exception { protected synchronized void serviceStart() throws Exception {
startResourceManager(index); startResourceManager(index);
if(index == 0) {
resourceManagers[index].getRMContext().getRMAdminService()
.transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED));
}
Configuration conf = resourceManagers[index].getConfig(); Configuration conf = resourceManagers[index].getConfig();
LOG.info("Starting resourcemanager " + index); LOG.info("Starting resourcemanager " + index);
LOG.info("MiniYARN ResourceManager address: " + LOG.info("MiniYARN ResourceManager address: " +
@ -565,26 +567,12 @@ public class MiniYARNCluster extends CompositeService {
} }
protected synchronized void serviceStart() throws Exception { protected synchronized void serviceStart() throws Exception {
try { nodeManagers[index].start();
new Thread() { if (nodeManagers[index].getServiceState() != STATE.STARTED) {
public void run() { // NM could have failed.
nodeManagers[index].start(); throw new IOException("NodeManager " + index + " failed to start");
}
}.start();
int waitCount = 0;
while (nodeManagers[index].getServiceState() == STATE.INITED
&& waitCount++ < 60) {
LOG.info("Waiting for NM " + index + " to start...");
Thread.sleep(1000);
}
if (nodeManagers[index].getServiceState() != STATE.STARTED) {
// RM could have failed.
throw new IOException("NodeManager " + index + " failed to start");
}
super.serviceStart();
} catch (Throwable t) {
throw new YarnRuntimeException(t);
} }
super.serviceStart();
} }
@Override @Override
@ -715,7 +703,7 @@ public class MiniYARNCluster extends CompositeService {
/** /**
* Wait for all the NodeManagers to connect to the ResourceManager. * Wait for all the NodeManagers to connect to the ResourceManager.
* *
* @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds. * @param timeout Time to wait (sleeps in 10 ms intervals) in milliseconds.
* @return true if all NodeManagers connect to the (Active) * @return true if all NodeManagers connect to the (Active)
* ResourceManager, false otherwise. * ResourceManager, false otherwise.
* @throws YarnException * @throws YarnException
@ -724,17 +712,19 @@ public class MiniYARNCluster extends CompositeService {
public boolean waitForNodeManagersToConnect(long timeout) public boolean waitForNodeManagersToConnect(long timeout)
throws YarnException, InterruptedException { throws YarnException, InterruptedException {
GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance(); GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
for (int i = 0; i < timeout / 100; i++) { for (int i = 0; i < timeout / 10; i++) {
ResourceManager rm = getResourceManager(); ResourceManager rm = getResourceManager();
if (rm == null) { if (rm == null) {
throw new YarnException("Can not find the active RM."); throw new YarnException("Can not find the active RM.");
} }
else if (nodeManagers.length == rm.getClientRMService() else if (nodeManagers.length == rm.getClientRMService()
.getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) { .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) {
LOG.info("All Node Managers connected in MiniYARNCluster");
return true; return true;
} }
Thread.sleep(100); Thread.sleep(10);
} }
LOG.info("Node Managers did not connect within 5000ms");
return false; return false;
} }
@ -769,18 +759,7 @@ public class MiniYARNCluster extends CompositeService {
@Override @Override
protected synchronized void serviceStart() throws Exception { protected synchronized void serviceStart() throws Exception {
appHistoryServer.start();
new Thread() {
public void run() {
appHistoryServer.start();
};
}.start();
int waitCount = 0;
while (appHistoryServer.getServiceState() == STATE.INITED
&& waitCount++ < 60) {
LOG.info("Waiting for Timeline Server to start...");
Thread.sleep(1500);
}
if (appHistoryServer.getServiceState() != STATE.STARTED) { if (appHistoryServer.getServiceState() != STATE.STARTED) {
// AHS could have failed. // AHS could have failed.
IOException ioe = new IOException( IOException ioe = new IOException(

View File

@ -44,10 +44,6 @@ public class TestMiniYARNClusterForHA {
cluster.init(conf); cluster.init(conf);
cluster.start(); cluster.start();
cluster.getResourceManager(0).getRMContext().getRMAdminService()
.transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
} }