YARN-4686. MiniYARNCluster.start() returns before cluster is completely started. Contributed by Eric Badger.
(cherry picked from commit 92b7e0d413
)
This commit is contained in:
parent
2fe4225051
commit
878e1cfc77
|
@ -217,7 +217,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
|
||||||
protected void verifyConnections() throws InterruptedException,
|
protected void verifyConnections() throws InterruptedException,
|
||||||
YarnException {
|
YarnException {
|
||||||
assertTrue("NMs failed to connect to the RM",
|
assertTrue("NMs failed to connect to the RM",
|
||||||
cluster.waitForNodeManagersToConnect(20000));
|
cluster.waitForNodeManagersToConnect(5000));
|
||||||
verifyClientConnection();
|
verifyClientConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -279,7 +279,6 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
|
||||||
cluster.resetStartFailoverFlag(false);
|
cluster.resetStartFailoverFlag(false);
|
||||||
cluster.init(conf);
|
cluster.init(conf);
|
||||||
cluster.start();
|
cluster.start();
|
||||||
getAdminService(0).transitionToActive(req);
|
|
||||||
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
|
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
|
||||||
verifyConnections();
|
verifyConnections();
|
||||||
|
|
||||||
|
|
|
@ -142,7 +142,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
|
||||||
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||||
cluster.init(conf);
|
cluster.init(conf);
|
||||||
cluster.start();
|
cluster.start();
|
||||||
getAdminService(0).transitionToActive(req);
|
|
||||||
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
|
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
|
||||||
verifyConnections();
|
verifyConnections();
|
||||||
|
|
||||||
|
@ -231,7 +230,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
|
||||||
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||||
cluster.init(conf);
|
cluster.init(conf);
|
||||||
cluster.start();
|
cluster.start();
|
||||||
getAdminService(0).transitionToActive(req);
|
|
||||||
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
|
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
|
||||||
verifyConnections();
|
verifyConnections();
|
||||||
|
|
||||||
|
|
|
@ -1195,6 +1195,24 @@ public class TestYarnClient {
|
||||||
client.init(yarnConf);
|
client.init(yarnConf);
|
||||||
client.start();
|
client.start();
|
||||||
|
|
||||||
|
int attempts;
|
||||||
|
for(attempts = 10; attempts > 0; attempts--) {
|
||||||
|
if (cluster.getResourceManager().getRMContext().getReservationSystem()
|
||||||
|
.getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
|
||||||
|
.getMemory() > 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (attempts <= 0) {
|
||||||
|
Assert.fail("Exhausted attempts in checking if node capacity was "
|
||||||
|
+ "added to the plan");
|
||||||
|
}
|
||||||
|
|
||||||
// create a reservation
|
// create a reservation
|
||||||
Clock clock = new UTCClock();
|
Clock clock = new UTCClock();
|
||||||
long arrival = clock.getTime();
|
long arrival = clock.getTime();
|
||||||
|
|
|
@ -96,6 +96,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);
|
private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);
|
||||||
|
|
||||||
private final Object heartbeatMonitor = new Object();
|
private final Object heartbeatMonitor = new Object();
|
||||||
|
private final Object shutdownMonitor = new Object();
|
||||||
|
|
||||||
private final Context context;
|
private final Context context;
|
||||||
private final Dispatcher dispatcher;
|
private final Dispatcher dispatcher;
|
||||||
|
@ -240,6 +241,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStop() throws Exception {
|
protected void serviceStop() throws Exception {
|
||||||
// the isStopped check is for avoiding multiple unregistrations.
|
// the isStopped check is for avoiding multiple unregistrations.
|
||||||
|
synchronized(shutdownMonitor) {
|
||||||
if (this.registeredWithRM && !this.isStopped
|
if (this.registeredWithRM && !this.isStopped
|
||||||
&& !isNMUnderSupervisionWithRecoveryEnabled()
|
&& !isNMUnderSupervisionWithRecoveryEnabled()
|
||||||
&& !context.getDecommissioned() && !failedToConnect) {
|
&& !context.getDecommissioned() && !failedToConnect) {
|
||||||
|
@ -250,6 +252,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
stopRMProxy();
|
stopRMProxy();
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private boolean isNMUnderSupervisionWithRecoveryEnabled() {
|
private boolean isNMUnderSupervisionWithRecoveryEnabled() {
|
||||||
Configuration config = getConfig();
|
Configuration config = getConfig();
|
||||||
|
@ -275,14 +278,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
|
|
||||||
protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
|
protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
|
||||||
// Interrupt the updater.
|
// Interrupt the updater.
|
||||||
|
synchronized(shutdownMonitor) {
|
||||||
|
if(this.isStopped) {
|
||||||
|
LOG.info("Currently being shutdown. Aborting reboot");
|
||||||
|
return;
|
||||||
|
}
|
||||||
this.isStopped = true;
|
this.isStopped = true;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
statusUpdater.join();
|
statusUpdater.join();
|
||||||
registerWithRM();
|
registerWithRM();
|
||||||
statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
|
statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
|
||||||
this.isStopped = false;
|
|
||||||
statusUpdater.start();
|
statusUpdater.start();
|
||||||
|
this.isStopped = false;
|
||||||
LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
|
LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String errorMessage = "Unexpected error rebooting NodeStatusUpdater";
|
String errorMessage = "Unexpected error rebooting NodeStatusUpdater";
|
||||||
|
@ -290,6 +297,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
throw new YarnRuntimeException(e);
|
throw new YarnRuntimeException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected void stopRMProxy() {
|
protected void stopRMProxy() {
|
||||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
||||||
|
@ -76,6 +78,7 @@ import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
|
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
|
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
|
||||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -275,6 +278,12 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
|
conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized void serviceStart() throws Exception {
|
||||||
|
super.serviceStart();
|
||||||
|
this.waitForNodeManagersToConnect(5000);
|
||||||
|
}
|
||||||
|
|
||||||
private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) {
|
private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) {
|
||||||
String hostname = MiniYARNCluster.getHostname();
|
String hostname = MiniYARNCluster.getHostname();
|
||||||
conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
|
conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
|
||||||
|
@ -314,19 +323,7 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
|
|
||||||
private synchronized void startResourceManager(final int index) {
|
private synchronized void startResourceManager(final int index) {
|
||||||
try {
|
try {
|
||||||
Thread rmThread = new Thread() {
|
|
||||||
public void run() {
|
|
||||||
resourceManagers[index].start();
|
resourceManagers[index].start();
|
||||||
}
|
|
||||||
};
|
|
||||||
rmThread.setName("RM-" + index);
|
|
||||||
rmThread.start();
|
|
||||||
int waitCount = 0;
|
|
||||||
while (resourceManagers[index].getServiceState() == STATE.INITED
|
|
||||||
&& waitCount++ < 60) {
|
|
||||||
LOG.info("Waiting for RM to start...");
|
|
||||||
Thread.sleep(1500);
|
|
||||||
}
|
|
||||||
if (resourceManagers[index].getServiceState() != STATE.STARTED) {
|
if (resourceManagers[index].getServiceState() != STATE.STARTED) {
|
||||||
// RM could have failed.
|
// RM could have failed.
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
|
@ -456,6 +453,11 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void serviceStart() throws Exception {
|
protected synchronized void serviceStart() throws Exception {
|
||||||
startResourceManager(index);
|
startResourceManager(index);
|
||||||
|
if(index == 0) {
|
||||||
|
resourceManagers[index].getRMContext().getRMAdminService()
|
||||||
|
.transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED));
|
||||||
|
}
|
||||||
Configuration conf = resourceManagers[index].getConfig();
|
Configuration conf = resourceManagers[index].getConfig();
|
||||||
LOG.info("MiniYARN ResourceManager address: " +
|
LOG.info("MiniYARN ResourceManager address: " +
|
||||||
conf.get(YarnConfiguration.RM_ADDRESS));
|
conf.get(YarnConfiguration.RM_ADDRESS));
|
||||||
|
@ -564,26 +566,12 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized void serviceStart() throws Exception {
|
protected synchronized void serviceStart() throws Exception {
|
||||||
try {
|
|
||||||
new Thread() {
|
|
||||||
public void run() {
|
|
||||||
nodeManagers[index].start();
|
nodeManagers[index].start();
|
||||||
}
|
|
||||||
}.start();
|
|
||||||
int waitCount = 0;
|
|
||||||
while (nodeManagers[index].getServiceState() == STATE.INITED
|
|
||||||
&& waitCount++ < 60) {
|
|
||||||
LOG.info("Waiting for NM " + index + " to start...");
|
|
||||||
Thread.sleep(1000);
|
|
||||||
}
|
|
||||||
if (nodeManagers[index].getServiceState() != STATE.STARTED) {
|
if (nodeManagers[index].getServiceState() != STATE.STARTED) {
|
||||||
// RM could have failed.
|
// NM could have failed.
|
||||||
throw new IOException("NodeManager " + index + " failed to start");
|
throw new IOException("NodeManager " + index + " failed to start");
|
||||||
}
|
}
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
} catch (Throwable t) {
|
|
||||||
throw new YarnRuntimeException(t);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -714,7 +702,7 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
/**
|
/**
|
||||||
* Wait for all the NodeManagers to connect to the ResourceManager.
|
* Wait for all the NodeManagers to connect to the ResourceManager.
|
||||||
*
|
*
|
||||||
* @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds.
|
* @param timeout Time to wait (sleeps in 10 ms intervals) in milliseconds.
|
||||||
* @return true if all NodeManagers connect to the (Active)
|
* @return true if all NodeManagers connect to the (Active)
|
||||||
* ResourceManager, false otherwise.
|
* ResourceManager, false otherwise.
|
||||||
* @throws YarnException
|
* @throws YarnException
|
||||||
|
@ -723,17 +711,19 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
public boolean waitForNodeManagersToConnect(long timeout)
|
public boolean waitForNodeManagersToConnect(long timeout)
|
||||||
throws YarnException, InterruptedException {
|
throws YarnException, InterruptedException {
|
||||||
GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
|
GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
|
||||||
for (int i = 0; i < timeout / 100; i++) {
|
for (int i = 0; i < timeout / 10; i++) {
|
||||||
ResourceManager rm = getResourceManager();
|
ResourceManager rm = getResourceManager();
|
||||||
if (rm == null) {
|
if (rm == null) {
|
||||||
throw new YarnException("Can not find the active RM.");
|
throw new YarnException("Can not find the active RM.");
|
||||||
}
|
}
|
||||||
else if (nodeManagers.length == rm.getClientRMService()
|
else if (nodeManagers.length == rm.getClientRMService()
|
||||||
.getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) {
|
.getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) {
|
||||||
|
LOG.info("All Node Managers connected in MiniYARNCluster");
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
Thread.sleep(100);
|
Thread.sleep(10);
|
||||||
}
|
}
|
||||||
|
LOG.info("Node Managers did not connect within 5000ms");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -768,18 +758,7 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void serviceStart() throws Exception {
|
protected synchronized void serviceStart() throws Exception {
|
||||||
|
|
||||||
new Thread() {
|
|
||||||
public void run() {
|
|
||||||
appHistoryServer.start();
|
appHistoryServer.start();
|
||||||
};
|
|
||||||
}.start();
|
|
||||||
int waitCount = 0;
|
|
||||||
while (appHistoryServer.getServiceState() == STATE.INITED
|
|
||||||
&& waitCount++ < 60) {
|
|
||||||
LOG.info("Waiting for Timeline Server to start...");
|
|
||||||
Thread.sleep(1500);
|
|
||||||
}
|
|
||||||
if (appHistoryServer.getServiceState() != STATE.STARTED) {
|
if (appHistoryServer.getServiceState() != STATE.STARTED) {
|
||||||
// AHS could have failed.
|
// AHS could have failed.
|
||||||
IOException ioe = new IOException(
|
IOException ioe = new IOException(
|
||||||
|
|
|
@ -44,10 +44,6 @@ public class TestMiniYARNClusterForHA {
|
||||||
cluster.init(conf);
|
cluster.init(conf);
|
||||||
cluster.start();
|
cluster.start();
|
||||||
|
|
||||||
cluster.getResourceManager(0).getRMContext().getRMAdminService()
|
|
||||||
.transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
|
|
||||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
|
|
||||||
|
|
||||||
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
|
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue