YARN-5375. invoke MockRM#drainEvents implicitly in MockRM methods to reduce test failures. Contributed by sandflee.

(cherry picked from commit d65603517e)
This commit is contained in:
Rohith Sharma K S 2016-11-16 15:14:00 +05:30
parent efc9ffc3b6
commit 74ac78b3c0
11 changed files with 134 additions and 63 deletions

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.yarn.event; package org.apache.hadoop.yarn.event;
import org.apache.hadoop.conf.Configuration;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -37,6 +39,13 @@ public DrainDispatcher(BlockingQueue<Event> eventQueue) {
this.mutex = this; this.mutex = this;
} }
@Override
public void serviceInit(Configuration conf)
throws Exception {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
super.serviceInit(conf);
}
/** /**
* Wait till event thread enters WAITING state (i.e. waiting for new events). * Wait till event thread enters WAITING state (i.e. waiting for new events).
*/ */
@ -50,7 +59,7 @@ public void waitForEventThreadToWait() {
* Busy loop waiting for all queued events to drain. * Busy loop waiting for all queued events to drain.
*/ */
public void await() { public void await() {
while (!drained) { while (!isDrained()) {
Thread.yield(); Thread.yield();
} }
} }
@ -96,8 +105,10 @@ public void handle(Event event) {
@Override @Override
protected boolean isDrained() { protected boolean isDrained() {
synchronized (mutex) {
return drained; return drained;
} }
}
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {

View File

@ -671,14 +671,18 @@ public void setRMDispatcher(Dispatcher dispatcher) {
} }
AsyncDispatcher dispatcher; AsyncDispatcher dispatcher;
@SuppressWarnings("rawtypes")
@VisibleForTesting
protected EventHandler rmStateStoreEventHandler;
@Override @Override
protected void serviceInit(Configuration conf) throws Exception{ protected void serviceInit(Configuration conf) throws Exception{
// create async handler // create async handler
dispatcher = new AsyncDispatcher(); dispatcher = new AsyncDispatcher();
dispatcher.init(conf); dispatcher.init(conf);
rmStateStoreEventHandler = new ForwardingEventHandler();
dispatcher.register(RMStateStoreEventType.class, dispatcher.register(RMStateStoreEventType.class,
new ForwardingEventHandler()); rmStateStoreEventHandler);
dispatcher.setDrainEventsOnStop(); dispatcher.setDrainEventsOnStop();
initInternal(conf); initInternal(conf);
} }
@ -790,12 +794,12 @@ public void storeNewApplication(RMApp app) {
ApplicationStateData.newInstance(app.getSubmitTime(), ApplicationStateData.newInstance(app.getSubmitTime(),
app.getStartTime(), context, app.getUser(), app.getCallerContext()); app.getStartTime(), context, app.getUser(), app.getCallerContext());
appState.setApplicationTimeouts(app.getApplicationTimeouts()); appState.setApplicationTimeouts(app.getApplicationTimeouts());
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void updateApplicationState(ApplicationStateData appState) { public void updateApplicationState(ApplicationStateData appState) {
dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); getRMStateStoreEventHandler().handle(new RMStateUpdateAppEvent(appState));
} }
public void updateApplicationStateSynchronously(ApplicationStateData appState, public void updateApplicationStateSynchronously(ApplicationStateData appState,
@ -842,14 +846,14 @@ public void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
attempMetrics.getPreemptedVcore() attempMetrics.getPreemptedVcore()
); );
dispatcher.getEventHandler().handle( getRMStateStoreEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState)); new RMStateStoreAppAttemptEvent(attemptState));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void updateApplicationAttemptState( public void updateApplicationAttemptState(
ApplicationAttemptStateData attemptState) { ApplicationAttemptStateData attemptState) {
dispatcher.getEventHandler().handle( getRMStateStoreEventHandler().handle(
new RMStateUpdateAppAttemptEvent(attemptState)); new RMStateUpdateAppAttemptEvent(attemptState));
} }
@ -1021,7 +1025,8 @@ public void removeApplication(RMApp app) {
appState.attempts.put(appAttempt.getAppAttemptId(), null); appState.attempts.put(appAttempt.getAppAttemptId(), null);
} }
dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); getRMStateStoreEventHandler().handle(
new RMStateStoreRemoveAppEvent(appState));
} }
/** /**
@ -1042,7 +1047,7 @@ protected abstract void removeApplicationStateInternal(
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public synchronized void removeApplicationAttempt( public synchronized void removeApplicationAttempt(
ApplicationAttemptId applicationAttemptId) { ApplicationAttemptId applicationAttemptId) {
dispatcher.getEventHandler().handle( getRMStateStoreEventHandler().handle(
new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId)); new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId));
} }
@ -1211,4 +1216,9 @@ public RMStateStoreState getRMStateStoreState() {
this.readLock.unlock(); this.readLock.unlock();
} }
} }
@SuppressWarnings("rawtypes")
protected EventHandler getRMStateStoreEventHandler() {
return dispatcher.getEventHandler();
}
} }

View File

@ -67,6 +67,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@ -74,6 +75,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -93,6 +96,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -117,6 +121,7 @@ public class MockRM extends ResourceManager {
private static final int WAIT_MS_PER_LOOP = 10; private static final int WAIT_MS_PER_LOOP = 10;
private final boolean useNullRMNodeLabelsManager; private final boolean useNullRMNodeLabelsManager;
private boolean disableDrainEventsImplicitly;
public MockRM() { public MockRM() {
this(new YarnConfiguration()); this(new YarnConfiguration());
@ -135,11 +140,39 @@ public MockRM(Configuration conf, RMStateStore store,
super(); super();
this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager; this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
if(store != null) { if (store != null) {
setRMStateStore(store); setRMStateStore(store);
} else {
Class storeClass = getRMContext().getStateStore().getClass();
if (storeClass.equals(MemoryRMStateStore.class)) {
MockRMMemoryStateStore mockStateStore = new MockRMMemoryStateStore();
mockStateStore.init(conf);
setRMStateStore(mockStateStore);
} else if (storeClass.equals(NullRMStateStore.class)) {
MockRMNullStateStore mockStateStore = new MockRMNullStateStore();
mockStateStore.init(conf);
setRMStateStore(mockStateStore);
}
} }
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG); rootLogger.setLevel(Level.DEBUG);
disableDrainEventsImplicitly = false;
}
public class MockRMMemoryStateStore extends MemoryRMStateStore {
@SuppressWarnings("rawtypes")
@Override
protected EventHandler getRMStateStoreEventHandler() {
return rmStateStoreEventHandler;
}
}
public class MockRMNullStateStore extends NullRMStateStore {
@SuppressWarnings("rawtypes")
@Override
protected EventHandler getRMStateStoreEventHandler() {
return rmStateStoreEventHandler;
}
} }
@Override @Override
@ -159,6 +192,16 @@ protected Dispatcher createDispatcher() {
return new DrainDispatcher(); return new DrainDispatcher();
} }
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new EventHandler<SchedulerEvent>() {
@Override
public void handle(SchedulerEvent event) {
scheduler.handle(event);
}
};
}
public void drainEvents() { public void drainEvents() {
Dispatcher rmDispatcher = getRmDispatcher(); Dispatcher rmDispatcher = getRmDispatcher();
if (rmDispatcher instanceof DrainDispatcher) { if (rmDispatcher instanceof DrainDispatcher) {
@ -170,6 +213,7 @@ public void drainEvents() {
private void waitForState(ApplicationId appId, EnumSet<RMAppState> finalStates) private void waitForState(ApplicationId appId, EnumSet<RMAppState> finalStates)
throws InterruptedException { throws InterruptedException {
drainEventsImplicitly();
RMApp app = getRMContext().getRMApps().get(appId); RMApp app = getRMContext().getRMApps().get(appId);
Assert.assertNotNull("app shouldn't be null", app); Assert.assertNotNull("app shouldn't be null", app);
final int timeoutMsecs = 80 * SECOND; final int timeoutMsecs = 80 * SECOND;
@ -200,6 +244,7 @@ private void waitForState(ApplicationId appId, EnumSet<RMAppState> finalStates)
*/ */
public void waitForState(ApplicationId appId, RMAppState finalState) public void waitForState(ApplicationId appId, RMAppState finalState)
throws InterruptedException { throws InterruptedException {
drainEventsImplicitly();
RMApp app = getRMContext().getRMApps().get(appId); RMApp app = getRMContext().getRMApps().get(appId);
Assert.assertNotNull("app shouldn't be null", app); Assert.assertNotNull("app shouldn't be null", app);
final int timeoutMsecs = 80 * SECOND; final int timeoutMsecs = 80 * SECOND;
@ -245,6 +290,7 @@ public void waitForState(ApplicationAttemptId attemptId,
public void waitForState(ApplicationAttemptId attemptId, public void waitForState(ApplicationAttemptId attemptId,
RMAppAttemptState finalState, int timeoutMsecs) RMAppAttemptState finalState, int timeoutMsecs)
throws InterruptedException { throws InterruptedException {
drainEventsImplicitly();
RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId()); RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
Assert.assertNotNull("app shouldn't be null", app); Assert.assertNotNull("app shouldn't be null", app);
RMAppAttempt attempt = app.getRMAppAttempt(attemptId); RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
@ -295,6 +341,7 @@ public static void waitForState(RMAppAttempt attempt,
public void waitForContainerToComplete(RMAppAttempt attempt, public void waitForContainerToComplete(RMAppAttempt attempt,
NMContainerStatus completedContainer) throws InterruptedException { NMContainerStatus completedContainer) throws InterruptedException {
drainEventsImplicitly();
int timeWaiting = 0; int timeWaiting = 0;
while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) { while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) {
List<ContainerStatus> containers = attempt.getJustFinishedContainers(); List<ContainerStatus> containers = attempt.getJustFinishedContainers();
@ -394,6 +441,7 @@ public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
*/ */
public boolean waitForState(Collection<MockNM> nms, ContainerId containerId, public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
RMContainerState containerState, int timeoutMsecs) throws Exception { RMContainerState containerState, int timeoutMsecs) throws Exception {
drainEventsImplicitly();
RMContainer container = getResourceScheduler().getRMContainer(containerId); RMContainer container = getResourceScheduler().getRMContainer(containerId);
int timeWaiting = 0; int timeWaiting = 0;
while (container == null) { while (container == null) {
@ -404,6 +452,7 @@ public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
for (MockNM nm : nms) { for (MockNM nm : nms) {
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
} }
drainEventsImplicitly();
container = getResourceScheduler().getRMContainer(containerId); container = getResourceScheduler().getRMContainer(containerId);
LOG.info("Waiting for container " + containerId + " to be " LOG.info("Waiting for container " + containerId + " to be "
+ containerState + ", container is null right now."); + containerState + ", container is null right now.");
@ -421,6 +470,7 @@ public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
for (MockNM nm : nms) { for (MockNM nm : nms) {
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
} }
drainEventsImplicitly();
Thread.sleep(WAIT_MS_PER_LOOP); Thread.sleep(WAIT_MS_PER_LOOP);
timeWaiting += WAIT_MS_PER_LOOP; timeWaiting += WAIT_MS_PER_LOOP;
} }
@ -698,7 +748,7 @@ PrivilegedAction<SubmitApplicationResponse> setClientReq(
public MockNM registerNode(String nodeIdStr, int memory) throws Exception { public MockNM registerNode(String nodeIdStr, int memory) throws Exception {
MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService()); MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService());
nm.registerNode(); nm.registerNode();
drainEvents(); drainEventsImplicitly();
return nm; return nm;
} }
@ -707,7 +757,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores)
MockNM nm = MockNM nm =
new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService()); new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService());
nm.registerNode(); nm.registerNode();
drainEvents(); drainEventsImplicitly();
return nm; return nm;
} }
@ -717,7 +767,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores,
new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(), new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(),
YarnVersionInfo.getVersion()); YarnVersionInfo.getVersion());
nm.registerNode(runningApplications); nm.registerNode(runningApplications);
drainEvents(); drainEventsImplicitly();
return nm; return nm;
} }
@ -725,12 +775,14 @@ public void sendNodeStarted(MockNM nm) throws Exception {
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
nm.getNodeId()); nm.getNodeId());
node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null));
drainEventsImplicitly();
} }
public void sendNodeLost(MockNM nm) throws Exception { public void sendNodeLost(MockNM nm) throws Exception {
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
nm.getNodeId()); nm.getNodeId());
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE)); node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE));
drainEventsImplicitly();
} }
/** /**
@ -743,6 +795,7 @@ public void sendNodeLost(MockNM nm) throws Exception {
*/ */
public void waitForState(NodeId nodeId, NodeState finalState) public void waitForState(NodeId nodeId, NodeState finalState)
throws InterruptedException { throws InterruptedException {
drainEventsImplicitly();
RMNode node = getRMContext().getRMNodes().get(nodeId); RMNode node = getRMContext().getRMNodes().get(nodeId);
if (node == null) { if (node == null) {
node = getRMContext().getInactiveRMNodes().get(nodeId); node = getRMContext().getInactiveRMNodes().get(nodeId);
@ -774,7 +827,9 @@ public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception {
public KillApplicationResponse killApp(ApplicationId appId) throws Exception { public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
ApplicationClientProtocol client = getClientRMService(); ApplicationClientProtocol client = getClientRMService();
KillApplicationRequest req = KillApplicationRequest.newInstance(appId); KillApplicationRequest req = KillApplicationRequest.newInstance(appId);
return client.forceKillApplication(req); KillApplicationResponse response = client.forceKillApplication(req);
drainEventsImplicitly();
return response;
} }
public FailApplicationAttemptResponse failApplicationAttempt( public FailApplicationAttemptResponse failApplicationAttempt(
@ -782,7 +837,10 @@ public FailApplicationAttemptResponse failApplicationAttempt(
ApplicationClientProtocol client = getClientRMService(); ApplicationClientProtocol client = getClientRMService();
FailApplicationAttemptRequest req = FailApplicationAttemptRequest req =
FailApplicationAttemptRequest.newInstance(attemptId); FailApplicationAttemptRequest.newInstance(attemptId);
return client.failApplicationAttempt(req); FailApplicationAttemptResponse response =
client.failApplicationAttempt(req);
drainEventsImplicitly();
return response;
} }
/** /**
@ -807,6 +865,7 @@ public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId)
.getEventHandler() .getEventHandler()
.handle( .handle(
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCHED)); new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCHED));
drainEventsImplicitly();
return am; return am;
} }
@ -817,6 +876,7 @@ public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId)
getRMContext().getDispatcher().getEventHandler() getRMContext().getDispatcher().getEventHandler()
.handle(new RMAppAttemptEvent(appAttemptId, .handle(new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.LAUNCH_FAILED, "Failed")); RMAppAttemptEventType.LAUNCH_FAILED, "Failed"));
drainEventsImplicitly();
} }
@Override @Override
@ -966,6 +1026,7 @@ public static void finishAMAndVerifyAppState(RMApp rmApp, MockRM rm, MockNM nm,
am.unregisterAppAttempt(req,true); am.unregisterAppAttempt(req,true);
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHING); rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
rm.drainEventsImplicitly();
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED); rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
} }
@ -974,6 +1035,7 @@ public static void finishAMAndVerifyAppState(RMApp rmApp, MockRM rm, MockNM nm,
private static void waitForSchedulerAppAttemptAdded( private static void waitForSchedulerAppAttemptAdded(
ApplicationAttemptId attemptId, MockRM rm) throws InterruptedException { ApplicationAttemptId attemptId, MockRM rm) throws InterruptedException {
int tick = 0; int tick = 0;
rm.drainEventsImplicitly();
// Wait for at most 5 sec // Wait for at most 5 sec
while (null == ((AbstractYarnScheduler) rm.getResourceScheduler()) while (null == ((AbstractYarnScheduler) rm.getResourceScheduler())
.getApplicationAttempt(attemptId) && tick < 50) { .getApplicationAttempt(attemptId) && tick < 50) {
@ -1015,9 +1077,11 @@ public static MockAM launchAMWhenAsyncSchedulingEnabled(RMApp app, MockRM rm)
*/ */
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception { throws Exception {
rm.drainEventsImplicitly();
RMAppAttempt attempt = waitForAttemptScheduled(app, rm); RMAppAttempt attempt = waitForAttemptScheduled(app, rm);
LOG.info("Launch AM " + attempt.getAppAttemptId()); LOG.info("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
rm.drainEventsImplicitly();
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
return am; return am;
@ -1025,12 +1089,14 @@ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm) public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm)
throws Exception { throws Exception {
rm.drainEventsImplicitly();
// UAMs go directly to LAUNCHED state // UAMs go directly to LAUNCHED state
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt = app.getCurrentAppAttempt(); RMAppAttempt attempt = app.getCurrentAppAttempt();
waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
LOG.info("Launch AM " + attempt.getAppAttemptId()); LOG.info("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
rm.drainEventsImplicitly();
MockAM am = new MockAM(rm.getRMContext(), rm.masterService, MockAM am = new MockAM(rm.getRMContext(), rm.masterService,
attempt.getAppAttemptId()); attempt.getAppAttemptId());
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
@ -1067,6 +1133,7 @@ public void updateReservationState(ReservationUpdateRequest request)
throws IOException, YarnException { throws IOException, YarnException {
ApplicationClientProtocol client = getClientRMService(); ApplicationClientProtocol client = getClientRMService();
client.updateReservation(request); client.updateReservation(request);
drainEventsImplicitly();
} }
// Explicitly reset queue metrics for testing. // Explicitly reset queue metrics for testing.
@ -1087,6 +1154,7 @@ public void signalToContainer(ContainerId containerId,
SignalContainerRequest req = SignalContainerRequest req =
SignalContainerRequest.newInstance(containerId, command); SignalContainerRequest.newInstance(containerId, command);
client.signalToContainer(req); client.signalToContainer(req);
drainEventsImplicitly();
} }
/** /**
@ -1099,6 +1167,7 @@ public void signalToContainer(ContainerId containerId,
public void waitForAppRemovedFromScheduler(ApplicationId appId) public void waitForAppRemovedFromScheduler(ApplicationId appId)
throws InterruptedException { throws InterruptedException {
int timeWaiting = 0; int timeWaiting = 0;
drainEventsImplicitly();
Map<ApplicationId, SchedulerApplication> apps = Map<ApplicationId, SchedulerApplication> apps =
((AbstractYarnScheduler) getResourceScheduler()) ((AbstractYarnScheduler) getResourceScheduler())
@ -1116,6 +1185,20 @@ public void waitForAppRemovedFromScheduler(ApplicationId appId)
LOG.info("app is removed from scheduler, " + appId); LOG.info("app is removed from scheduler, " + appId);
} }
private void drainEventsImplicitly() {
if (!disableDrainEventsImplicitly) {
drainEvents();
}
}
public void disableDrainEventsImplicitly() {
disableDrainEventsImplicitly = true;
}
public void enableDrainEventsImplicityly() {
disableDrainEventsImplicitly = false;
}
public RMApp submitApp(int masterMemory, Priority priority, public RMApp submitApp(int masterMemory, Priority priority,
Map<ApplicationTimeoutType, Long> applicationTimeouts) throws Exception { Map<ApplicationTimeoutType, Long> applicationTimeouts) throws Exception {
Resource resource = Resource.newInstance(masterMemory, 0); Resource resource = Resource.newInstance(masterMemory, 0);

View File

@ -42,9 +42,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@ -53,7 +50,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
@ -167,17 +163,6 @@ public void testContainerCleanup() throws Exception {
rootLogger.setLevel(Level.DEBUG); rootLogger.setLevel(Level.DEBUG);
final DrainDispatcher dispatcher = new DrainDispatcher(); final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm = new MockRM() { MockRM rm = new MockRM() {
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new EventDispatcher<SchedulerEvent>(this.scheduler,
this.scheduler.getClass().getName()) {
@Override
public void handle(SchedulerEvent event) {
super.handle(event);
}
};
}
@Override @Override
protected Dispatcher createDispatcher() { protected Dispatcher createDispatcher() {
return dispatcher; return dispatcher;

View File

@ -31,8 +31,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -43,7 +41,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
@ -244,17 +241,6 @@ private MockRM startRM(YarnConfiguration conf,
memStore.init(conf); memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore) { MockRM rm1 = new MockRM(conf, memStore) {
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new EventDispatcher<SchedulerEvent>(this.scheduler,
this.scheduler.getClass().getName()) {
@Override
public void handle(SchedulerEvent event) {
super.handle(event);
}
};
}
@Override @Override
protected Dispatcher createDispatcher() { protected Dispatcher createDispatcher() {
return dispatcher; return dispatcher;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.junit.Before; import org.junit.Before;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
@ -56,7 +57,6 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -559,7 +559,7 @@ public void testInvalidatedAMHostPortOnAMRestart() throws Exception {
@Test (timeout = 60000) @Test (timeout = 60000)
public void testApplicationKillAtAcceptedState() throws Exception { public void testApplicationKillAtAcceptedState() throws Exception {
final Dispatcher dispatcher = new AsyncDispatcher() { final Dispatcher dispatcher = new DrainDispatcher() {
@Override @Override
public EventHandler getEventHandler() { public EventHandler getEventHandler() {
@ -640,7 +640,7 @@ protected Dispatcher createDispatcher() {
public void testKillFinishingApp() throws Exception{ public void testKillFinishingApp() throws Exception{
// this dispatcher ignores RMAppAttemptEventType.KILL event // this dispatcher ignores RMAppAttemptEventType.KILL event
final Dispatcher dispatcher = new AsyncDispatcher() { final Dispatcher dispatcher = new DrainDispatcher() {
@Override @Override
public EventHandler getEventHandler() { public EventHandler getEventHandler() {
@ -694,7 +694,7 @@ protected Dispatcher createDispatcher() {
public void testKillFailingApp() throws Exception{ public void testKillFailingApp() throws Exception{
// this dispatcher ignores RMAppAttemptEventType.KILL event // this dispatcher ignores RMAppAttemptEventType.KILL event
final Dispatcher dispatcher = new AsyncDispatcher() { final Dispatcher dispatcher = new DrainDispatcher() {
@Override @Override
public EventHandler getEventHandler() { public EventHandler getEventHandler() {

View File

@ -1514,6 +1514,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
// start RM // start RM
final MockRM rm1 = createMockRM(conf, memStore); final MockRM rm1 = createMockRM(conf, memStore);
rm1.disableDrainEventsImplicitly();
rm1.start(); rm1.start();
// create apps. // create apps.

View File

@ -21,7 +21,6 @@
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -34,7 +33,6 @@
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@ -42,7 +40,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -64,16 +61,6 @@ public void init(Configuration conf) {
"1.0"); "1.0");
super.init(conf); super.init(conf);
} }
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new EventDispatcher<SchedulerEvent>(this.scheduler,
this.scheduler.getClass().getName()) {
@Override
public void handle(SchedulerEvent event) {
super.handle(event);
}
};
}
@Override @Override
protected Dispatcher createDispatcher() { protected Dispatcher createDispatcher() {

View File

@ -574,7 +574,7 @@ public void testResourceRequestRecoveryToTheRightAppAttempt()
// AM crashes, and a new app-attempt gets created // AM crashes, and a new app-attempt gets created
node.nodeHeartbeat(applicationAttemptOneID, 1, ContainerState.COMPLETE); node.nodeHeartbeat(applicationAttemptOneID, 1, ContainerState.COMPLETE);
rm.waitForState(node, am1ContainerID, RMContainerState.COMPLETED, 30 * 1000); rm.drainEvents();
RMAppAttempt rmAppAttempt2 = MockRM.waitForAttemptScheduled(rmApp, rm); RMAppAttempt rmAppAttempt2 = MockRM.waitForAttemptScheduled(rmApp, rm);
ApplicationAttemptId applicationAttemptTwoID = ApplicationAttemptId applicationAttemptTwoID =
rmAppAttempt2.getAppAttemptId(); rmAppAttempt2.getAppAttemptId();

View File

@ -4620,6 +4620,13 @@ private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode(
new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
containerManagerPort, httpPort, rackName, capability, containerManagerPort, httpPort, rackName, capability,
resourceManager); resourceManager);
// after YARN-5375, scheduler event is processed in rm main dispatcher,
// wait it processed, or may lead dead lock
if (resourceManager instanceof MockRM) {
((MockRM) resourceManager).drainEvents();
}
NodeAddedSchedulerEvent nodeAddEvent1 = NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
.get(nm.getNodeId())); .get(nm.getNodeId()));

View File

@ -88,6 +88,7 @@ protected void configureServlets() {
rm = new MockRM(new Configuration()); rm = new MockRM(new Configuration());
rm.getRMContext().getContainerTokenSecretManager().rollMasterKey(); rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
rm.getRMContext().getNMTokenSecretManager().rollMasterKey(); rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
rm.disableDrainEventsImplicitly();
bind(ResourceManager.class).toInstance(rm); bind(ResourceManager.class).toInstance(rm);
serve("/*").with(GuiceContainer.class); serve("/*").with(GuiceContainer.class);
} }