diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index cf7fcc5acbc..f336b0f052c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -217,7 +217,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes { protected void verifyConnections() throws InterruptedException, YarnException { assertTrue("NMs failed to connect to the RM", - cluster.waitForNodeManagersToConnect(20000)); + cluster.waitForNodeManagersToConnect(5000)); verifyClientConnection(); } @@ -279,7 +279,6 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes { cluster.resetStartFailoverFlag(false); cluster.init(conf); cluster.start(); - getAdminService(0).transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); verifyConnections(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index cbc220aa061..f32335127d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -142,7 +142,6 @@ public class TestRMFailover extends ClientBaseWithFixes { conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); cluster.init(conf); cluster.start(); - getAdminService(0).transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); verifyConnections(); @@ -231,7 +230,6 @@ public class TestRMFailover extends ClientBaseWithFixes { conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); cluster.init(conf); cluster.start(); - getAdminService(0).transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); verifyConnections(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 2c34b99c1ff..2d11d8a7f75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -1195,6 +1195,24 @@ public class TestYarnClient { client.init(yarnConf); client.start(); + int attempts; + for(attempts = 10; attempts > 0; attempts--) { + if (cluster.getResourceManager().getRMContext().getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity() + .getMemory() > 0) { + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if (attempts <= 0) { + Assert.fail("Exhausted attempts in checking if node capacity was " + + "added to the plan"); + } + // create a reservation Clock clock = new UTCClock(); long arrival = clock.getTime(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 58067314436..ad983fe1654 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -96,6 +96,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class); private final Object heartbeatMonitor = new Object(); + private final Object shutdownMonitor = new Object(); private final Context context; private final Dispatcher dispatcher; @@ -240,15 +241,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @Override protected void serviceStop() throws Exception { // the isStopped check is for avoiding multiple unregistrations. - if (this.registeredWithRM && !this.isStopped - && !isNMUnderSupervisionWithRecoveryEnabled() - && !context.getDecommissioned() && !failedToConnect) { - unRegisterNM(); + synchronized(shutdownMonitor) { + if (this.registeredWithRM && !this.isStopped + && !isNMUnderSupervisionWithRecoveryEnabled() + && !context.getDecommissioned() && !failedToConnect) { + unRegisterNM(); + } + // Interrupt the updater. + this.isStopped = true; + stopRMProxy(); + super.serviceStop(); } - // Interrupt the updater. - this.isStopped = true; - stopRMProxy(); - super.serviceStop(); } private boolean isNMUnderSupervisionWithRecoveryEnabled() { @@ -275,19 +278,24 @@ public class NodeStatusUpdaterImpl extends AbstractService implements protected void rebootNodeStatusUpdaterAndRegisterWithRM() { // Interrupt the updater. - this.isStopped = true; - - try { - statusUpdater.join(); - registerWithRM(); - statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); - this.isStopped = false; - statusUpdater.start(); - LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); - } catch (Exception e) { - String errorMessage = "Unexpected error rebooting NodeStatusUpdater"; - LOG.error(errorMessage, e); - throw new YarnRuntimeException(e); + synchronized(shutdownMonitor) { + if(this.isStopped) { + LOG.info("Currently being shutdown. Aborting reboot"); + return; + } + this.isStopped = true; + try { + statusUpdater.join(); + registerWithRM(); + statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); + statusUpdater.start(); + this.isStopped = false; + LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); + } catch (Exception e) { + String errorMessage = "Unexpected error rebooting NodeStatusUpdater"; + LOG.error(errorMessage, e); + throw new YarnRuntimeException(e); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 024adc624ed..74b77323046 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -41,6 +41,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; @@ -76,6 +78,7 @@ import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; @@ -275,6 +278,12 @@ public class MiniYARNCluster extends CompositeService { conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); } + @Override + protected synchronized void serviceStart() throws Exception { + super.serviceStart(); + this.waitForNodeManagersToConnect(5000); + } + private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) { String hostname = MiniYARNCluster.getHostname(); conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); @@ -314,19 +323,7 @@ public class MiniYARNCluster extends CompositeService { private synchronized void startResourceManager(final int index) { try { - Thread rmThread = new Thread() { - public void run() { - resourceManagers[index].start(); - } - }; - rmThread.setName("RM-" + index); - rmThread.start(); - int waitCount = 0; - while (resourceManagers[index].getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for RM to start..."); - Thread.sleep(1500); - } + resourceManagers[index].start(); if (resourceManagers[index].getServiceState() != STATE.STARTED) { // RM could have failed. throw new IOException( @@ -456,6 +453,11 @@ public class MiniYARNCluster extends CompositeService { @Override protected synchronized void serviceStart() throws Exception { startResourceManager(index); + if(index == 0) { + resourceManagers[index].getRMContext().getRMAdminService() + .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED)); + } Configuration conf = resourceManagers[index].getConfig(); LOG.info("Starting resourcemanager " + index); LOG.info("MiniYARN ResourceManager address: " + @@ -565,26 +567,12 @@ public class MiniYARNCluster extends CompositeService { } protected synchronized void serviceStart() throws Exception { - try { - new Thread() { - public void run() { - nodeManagers[index].start(); - } - }.start(); - int waitCount = 0; - while (nodeManagers[index].getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for NM " + index + " to start..."); - Thread.sleep(1000); - } - if (nodeManagers[index].getServiceState() != STATE.STARTED) { - // RM could have failed. - throw new IOException("NodeManager " + index + " failed to start"); - } - super.serviceStart(); - } catch (Throwable t) { - throw new YarnRuntimeException(t); + nodeManagers[index].start(); + if (nodeManagers[index].getServiceState() != STATE.STARTED) { + // NM could have failed. + throw new IOException("NodeManager " + index + " failed to start"); } + super.serviceStart(); } @Override @@ -715,7 +703,7 @@ public class MiniYARNCluster extends CompositeService { /** * Wait for all the NodeManagers to connect to the ResourceManager. * - * @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds. + * @param timeout Time to wait (sleeps in 10 ms intervals) in milliseconds. * @return true if all NodeManagers connect to the (Active) * ResourceManager, false otherwise. * @throws YarnException @@ -724,17 +712,19 @@ public class MiniYARNCluster extends CompositeService { public boolean waitForNodeManagersToConnect(long timeout) throws YarnException, InterruptedException { GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance(); - for (int i = 0; i < timeout / 100; i++) { + for (int i = 0; i < timeout / 10; i++) { ResourceManager rm = getResourceManager(); if (rm == null) { throw new YarnException("Can not find the active RM."); } else if (nodeManagers.length == rm.getClientRMService() - .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) { + .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) { + LOG.info("All Node Managers connected in MiniYARNCluster"); return true; } - Thread.sleep(100); + Thread.sleep(10); } + LOG.info("Node Managers did not connect within 5000ms"); return false; } @@ -769,18 +759,7 @@ public class MiniYARNCluster extends CompositeService { @Override protected synchronized void serviceStart() throws Exception { - - new Thread() { - public void run() { - appHistoryServer.start(); - }; - }.start(); - int waitCount = 0; - while (appHistoryServer.getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for Timeline Server to start..."); - Thread.sleep(1500); - } + appHistoryServer.start(); if (appHistoryServer.getServiceState() != STATE.STARTED) { // AHS could have failed. IOException ioe = new IOException( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java index e84d62ea2e5..384d1cd78f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java @@ -44,10 +44,6 @@ public class TestMiniYARNClusterForHA { cluster.init(conf); cluster.start(); - cluster.getResourceManager(0).getRMContext().getRMAdminService() - .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( - HAServiceProtocol.RequestSource.REQUEST_BY_USER)); - assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); }