From 878e1cfc773efd99e3ec6ac1652d95d36881e671 Mon Sep 17 00:00:00 2001 From: Eric Payne Date: Fri, 18 Mar 2016 16:11:06 +0000 Subject: [PATCH] YARN-4686. MiniYARNCluster.start() returns before cluster is completely started. Contributed by Eric Badger. (cherry picked from commit 92b7e0d41302b6b110927f99de5c2b4a4a93c5fd) --- .../yarn/client/ProtocolHATestBase.java | 3 +- .../hadoop/yarn/client/TestRMFailover.java | 2 - .../yarn/client/api/impl/TestYarnClient.java | 18 +++++ .../nodemanager/NodeStatusUpdaterImpl.java | 50 +++++++------ .../hadoop/yarn/server/MiniYARNCluster.java | 75 +++++++------------ .../yarn/server/TestMiniYARNClusterForHA.java | 4 - 6 files changed, 75 insertions(+), 77 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 45629b266a6..c9a0fc7988f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -217,7 +217,7 @@ protected YarnClient createAndStartYarnClient(Configuration conf) { protected void verifyConnections() throws InterruptedException, YarnException { assertTrue("NMs failed to connect to the RM", - cluster.waitForNodeManagersToConnect(20000)); + cluster.waitForNodeManagersToConnect(5000)); verifyClientConnection(); } @@ -279,7 +279,6 @@ protected void startHACluster(int numOfNMs, boolean overrideClientRMService, cluster.resetStartFailoverFlag(false); cluster.init(conf); cluster.start(); - getAdminService(0).transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); verifyConnections(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index cbc220aa061..f32335127d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -142,7 +142,6 @@ public void testExplicitFailover() conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); cluster.init(conf); cluster.start(); - getAdminService(0).transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); verifyConnections(); @@ -231,7 +230,6 @@ public void testEmbeddedWebAppProxy() throws YarnException, conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); cluster.init(conf); cluster.start(); - getAdminService(0).transitionToActive(req); assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); verifyConnections(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 2c34b99c1ff..2d11d8a7f75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -1195,6 +1195,24 @@ public void testReservationAPIs() { client.init(yarnConf); client.start(); + int attempts; + for(attempts = 10; attempts > 0; attempts--) { + if (cluster.getResourceManager().getRMContext().getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity() + .getMemory() > 0) { + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if (attempts <= 0) { + Assert.fail("Exhausted attempts in checking if node capacity was " + + "added to the plan"); + } + // create a reservation Clock clock = new UTCClock(); long arrival = clock.getTime(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 58067314436..ad983fe1654 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -96,6 +96,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class); private final Object heartbeatMonitor = new Object(); + private final Object shutdownMonitor = new Object(); private final Context context; private final Dispatcher dispatcher; @@ -240,15 +241,17 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { // the isStopped check is for avoiding multiple unregistrations. - if (this.registeredWithRM && !this.isStopped - && !isNMUnderSupervisionWithRecoveryEnabled() - && !context.getDecommissioned() && !failedToConnect) { - unRegisterNM(); + synchronized(shutdownMonitor) { + if (this.registeredWithRM && !this.isStopped + && !isNMUnderSupervisionWithRecoveryEnabled() + && !context.getDecommissioned() && !failedToConnect) { + unRegisterNM(); + } + // Interrupt the updater. + this.isStopped = true; + stopRMProxy(); + super.serviceStop(); } - // Interrupt the updater. - this.isStopped = true; - stopRMProxy(); - super.serviceStop(); } private boolean isNMUnderSupervisionWithRecoveryEnabled() { @@ -275,19 +278,24 @@ private void unRegisterNM() { protected void rebootNodeStatusUpdaterAndRegisterWithRM() { // Interrupt the updater. - this.isStopped = true; - - try { - statusUpdater.join(); - registerWithRM(); - statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); - this.isStopped = false; - statusUpdater.start(); - LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); - } catch (Exception e) { - String errorMessage = "Unexpected error rebooting NodeStatusUpdater"; - LOG.error(errorMessage, e); - throw new YarnRuntimeException(e); + synchronized(shutdownMonitor) { + if(this.isStopped) { + LOG.info("Currently being shutdown. Aborting reboot"); + return; + } + this.isStopped = true; + try { + statusUpdater.join(); + registerWithRM(); + statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); + statusUpdater.start(); + this.isStopped = false; + LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); + } catch (Exception e) { + String errorMessage = "Unexpected error rebooting NodeStatusUpdater"; + LOG.error(errorMessage, e); + throw new YarnRuntimeException(e); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 2147ee50757..3d557d3db0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -41,6 +41,7 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; @@ -76,6 +78,7 @@ import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; @@ -275,6 +278,12 @@ public void serviceInit(Configuration conf) throws Exception { conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); } + @Override + protected synchronized void serviceStart() throws Exception { + super.serviceStart(); + this.waitForNodeManagersToConnect(5000); + } + private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) { String hostname = MiniYARNCluster.getHostname(); conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); @@ -314,19 +323,7 @@ public void handle(RMAppAttemptEvent event) { private synchronized void startResourceManager(final int index) { try { - Thread rmThread = new Thread() { - public void run() { - resourceManagers[index].start(); - } - }; - rmThread.setName("RM-" + index); - rmThread.start(); - int waitCount = 0; - while (resourceManagers[index].getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for RM to start..."); - Thread.sleep(1500); - } + resourceManagers[index].start(); if (resourceManagers[index].getServiceState() != STATE.STARTED) { // RM could have failed. throw new IOException( @@ -456,6 +453,11 @@ protected synchronized void serviceInit(Configuration conf) @Override protected synchronized void serviceStart() throws Exception { startResourceManager(index); + if(index == 0) { + resourceManagers[index].getRMContext().getRMAdminService() + .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED)); + } Configuration conf = resourceManagers[index].getConfig(); LOG.info("MiniYARN ResourceManager address: " + conf.get(YarnConfiguration.RM_ADDRESS)); @@ -564,26 +566,12 @@ private String prepareDirs(String dirType, int numDirs) { } protected synchronized void serviceStart() throws Exception { - try { - new Thread() { - public void run() { - nodeManagers[index].start(); - } - }.start(); - int waitCount = 0; - while (nodeManagers[index].getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for NM " + index + " to start..."); - Thread.sleep(1000); - } - if (nodeManagers[index].getServiceState() != STATE.STARTED) { - // RM could have failed. - throw new IOException("NodeManager " + index + " failed to start"); - } - super.serviceStart(); - } catch (Throwable t) { - throw new YarnRuntimeException(t); + nodeManagers[index].start(); + if (nodeManagers[index].getServiceState() != STATE.STARTED) { + // NM could have failed. + throw new IOException("NodeManager " + index + " failed to start"); } + super.serviceStart(); } @Override @@ -714,7 +702,7 @@ protected void stopRMProxy() { } /** * Wait for all the NodeManagers to connect to the ResourceManager. * - * @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds. + * @param timeout Time to wait (sleeps in 10 ms intervals) in milliseconds. * @return true if all NodeManagers connect to the (Active) * ResourceManager, false otherwise. * @throws YarnException @@ -723,17 +711,19 @@ protected void stopRMProxy() { } public boolean waitForNodeManagersToConnect(long timeout) throws YarnException, InterruptedException { GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance(); - for (int i = 0; i < timeout / 100; i++) { + for (int i = 0; i < timeout / 10; i++) { ResourceManager rm = getResourceManager(); if (rm == null) { throw new YarnException("Can not find the active RM."); } else if (nodeManagers.length == rm.getClientRMService() - .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) { + .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) { + LOG.info("All Node Managers connected in MiniYARNCluster"); return true; } - Thread.sleep(100); + Thread.sleep(10); } + LOG.info("Node Managers did not connect within 5000ms"); return false; } @@ -768,18 +758,7 @@ protected synchronized void serviceInit(Configuration conf) @Override protected synchronized void serviceStart() throws Exception { - - new Thread() { - public void run() { - appHistoryServer.start(); - }; - }.start(); - int waitCount = 0; - while (appHistoryServer.getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for Timeline Server to start..."); - Thread.sleep(1500); - } + appHistoryServer.start(); if (appHistoryServer.getServiceState() != STATE.STARTED) { // AHS could have failed. IOException ioe = new IOException( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java index e84d62ea2e5..384d1cd78f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java @@ -44,10 +44,6 @@ public void setup() throws IOException, InterruptedException { cluster.init(conf); cluster.start(); - cluster.getResourceManager(0).getRMContext().getRMAdminService() - .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( - HAServiceProtocol.RequestSource.REQUEST_BY_USER)); - assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); }