svn merge -c 1363067 FIXES: MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved (Jason Lowe via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1363070 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e8b0614072
commit
e69e5c9cd3
|
@ -1,5 +1,8 @@
|
||||||
Hadoop MapReduce Change Log
|
Hadoop MapReduce Change Log
|
||||||
|
|
||||||
|
MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved
|
||||||
|
(Jason Lowe via bobby)
|
||||||
|
|
||||||
MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in
|
MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in
|
||||||
YarnConfiguration. (ahmed via tucu)
|
YarnConfiguration. (ahmed via tucu)
|
||||||
|
|
||||||
|
|
|
@ -51,6 +51,8 @@ public interface RMContext {
|
||||||
|
|
||||||
AMLivelinessMonitor getAMLivelinessMonitor();
|
AMLivelinessMonitor getAMLivelinessMonitor();
|
||||||
|
|
||||||
|
AMLivelinessMonitor getAMFinishingMonitor();
|
||||||
|
|
||||||
ContainerAllocationExpirer getContainerAllocationExpirer();
|
ContainerAllocationExpirer getContainerAllocationExpirer();
|
||||||
|
|
||||||
DelegationTokenRenewer getDelegationTokenRenewer();
|
DelegationTokenRenewer getDelegationTokenRenewer();
|
||||||
|
|
|
@ -49,6 +49,7 @@ public class RMContextImpl implements RMContext {
|
||||||
= new ConcurrentHashMap<String, RMNode>();
|
= new ConcurrentHashMap<String, RMNode>();
|
||||||
|
|
||||||
private AMLivelinessMonitor amLivelinessMonitor;
|
private AMLivelinessMonitor amLivelinessMonitor;
|
||||||
|
private AMLivelinessMonitor amFinishingMonitor;
|
||||||
private ContainerAllocationExpirer containerAllocationExpirer;
|
private ContainerAllocationExpirer containerAllocationExpirer;
|
||||||
private final DelegationTokenRenewer tokenRenewer;
|
private final DelegationTokenRenewer tokenRenewer;
|
||||||
private final ApplicationTokenSecretManager appTokenSecretManager;
|
private final ApplicationTokenSecretManager appTokenSecretManager;
|
||||||
|
@ -56,12 +57,14 @@ public class RMContextImpl implements RMContext {
|
||||||
public RMContextImpl(Store store, Dispatcher rmDispatcher,
|
public RMContextImpl(Store store, Dispatcher rmDispatcher,
|
||||||
ContainerAllocationExpirer containerAllocationExpirer,
|
ContainerAllocationExpirer containerAllocationExpirer,
|
||||||
AMLivelinessMonitor amLivelinessMonitor,
|
AMLivelinessMonitor amLivelinessMonitor,
|
||||||
|
AMLivelinessMonitor amFinishingMonitor,
|
||||||
DelegationTokenRenewer tokenRenewer,
|
DelegationTokenRenewer tokenRenewer,
|
||||||
ApplicationTokenSecretManager appTokenSecretManager) {
|
ApplicationTokenSecretManager appTokenSecretManager) {
|
||||||
this.store = store;
|
this.store = store;
|
||||||
this.rmDispatcher = rmDispatcher;
|
this.rmDispatcher = rmDispatcher;
|
||||||
this.containerAllocationExpirer = containerAllocationExpirer;
|
this.containerAllocationExpirer = containerAllocationExpirer;
|
||||||
this.amLivelinessMonitor = amLivelinessMonitor;
|
this.amLivelinessMonitor = amLivelinessMonitor;
|
||||||
|
this.amFinishingMonitor = amFinishingMonitor;
|
||||||
this.tokenRenewer = tokenRenewer;
|
this.tokenRenewer = tokenRenewer;
|
||||||
this.appTokenSecretManager = appTokenSecretManager;
|
this.appTokenSecretManager = appTokenSecretManager;
|
||||||
}
|
}
|
||||||
|
@ -106,6 +109,11 @@ public class RMContextImpl implements RMContext {
|
||||||
return this.amLivelinessMonitor;
|
return this.amLivelinessMonitor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AMLivelinessMonitor getAMFinishingMonitor() {
|
||||||
|
return this.amFinishingMonitor;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DelegationTokenRenewer getDelegationTokenRenewer() {
|
public DelegationTokenRenewer getDelegationTokenRenewer() {
|
||||||
return tokenRenewer;
|
return tokenRenewer;
|
||||||
|
|
|
@ -155,13 +155,16 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
|
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
|
||||||
addService(amLivelinessMonitor);
|
addService(amLivelinessMonitor);
|
||||||
|
|
||||||
|
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
|
||||||
|
addService(amFinishingMonitor);
|
||||||
|
|
||||||
DelegationTokenRenewer tokenRenewer = createDelegationTokenRenewer();
|
DelegationTokenRenewer tokenRenewer = createDelegationTokenRenewer();
|
||||||
addService(tokenRenewer);
|
addService(tokenRenewer);
|
||||||
|
|
||||||
this.rmContext =
|
this.rmContext = new RMContextImpl(this.store, this.rmDispatcher,
|
||||||
new RMContextImpl(this.store, this.rmDispatcher,
|
this.containerAllocationExpirer,
|
||||||
this.containerAllocationExpirer, amLivelinessMonitor, tokenRenewer,
|
amLivelinessMonitor, amFinishingMonitor,
|
||||||
this.appTokenSecretManager);
|
tokenRenewer, this.appTokenSecretManager);
|
||||||
|
|
||||||
// Register event handler for NodesListManager
|
// Register event handler for NodesListManager
|
||||||
this.nodesListManager = new NodesListManager(this.rmContext);
|
this.nodesListManager = new NodesListManager(this.rmContext);
|
||||||
|
|
|
@ -27,6 +27,7 @@ public enum RMAppEventType {
|
||||||
APP_REJECTED,
|
APP_REJECTED,
|
||||||
APP_ACCEPTED,
|
APP_ACCEPTED,
|
||||||
ATTEMPT_REGISTERED,
|
ATTEMPT_REGISTERED,
|
||||||
|
ATTEMPT_FINISHING,
|
||||||
ATTEMPT_FINISHED, // Will send the final state
|
ATTEMPT_FINISHED, // Will send the final state
|
||||||
ATTEMPT_FAILED,
|
ATTEMPT_FAILED,
|
||||||
ATTEMPT_KILLED,
|
ATTEMPT_KILLED,
|
||||||
|
|
|
@ -147,6 +147,8 @@ public class RMAppImpl implements RMApp {
|
||||||
// Transitions from RUNNING state
|
// Transitions from RUNNING state
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
||||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||||
|
.addTransition(RMAppState.RUNNING, RMAppState.FINISHING,
|
||||||
|
RMAppEventType.ATTEMPT_FINISHING, new RMAppFinishingTransition())
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
|
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
|
||||||
RMAppEventType.ATTEMPT_FINISHED, FINAL_TRANSITION)
|
RMAppEventType.ATTEMPT_FINISHED, FINAL_TRANSITION)
|
||||||
.addTransition(RMAppState.RUNNING,
|
.addTransition(RMAppState.RUNNING,
|
||||||
|
@ -156,12 +158,24 @@ public class RMAppImpl implements RMApp {
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.KILLED,
|
.addTransition(RMAppState.RUNNING, RMAppState.KILLED,
|
||||||
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
||||||
|
|
||||||
|
// Transitions from FINISHING state
|
||||||
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
|
||||||
|
RMAppEventType.ATTEMPT_FINISHED, FINAL_TRANSITION)
|
||||||
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
|
||||||
|
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
||||||
|
// ignorable transitions
|
||||||
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
||||||
|
RMAppEventType.NODE_UPDATE)
|
||||||
|
|
||||||
// Transitions from FINISHED state
|
// Transitions from FINISHED state
|
||||||
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
||||||
RMAppEventType.KILL)
|
RMAppEventType.KILL)
|
||||||
// ignorable transitions
|
// ignorable transitions
|
||||||
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
||||||
RMAppEventType.NODE_UPDATE)
|
EnumSet.of(
|
||||||
|
RMAppEventType.NODE_UPDATE,
|
||||||
|
RMAppEventType.ATTEMPT_FINISHING,
|
||||||
|
RMAppEventType.ATTEMPT_FINISHED))
|
||||||
|
|
||||||
// Transitions from FAILED state
|
// Transitions from FAILED state
|
||||||
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
||||||
|
@ -339,6 +353,7 @@ public class RMAppImpl implements RMApp {
|
||||||
return YarnApplicationState.ACCEPTED;
|
return YarnApplicationState.ACCEPTED;
|
||||||
case RUNNING:
|
case RUNNING:
|
||||||
return YarnApplicationState.RUNNING;
|
return YarnApplicationState.RUNNING;
|
||||||
|
case FINISHING:
|
||||||
case FINISHED:
|
case FINISHED:
|
||||||
return YarnApplicationState.FINISHED;
|
return YarnApplicationState.FINISHED;
|
||||||
case KILLED:
|
case KILLED:
|
||||||
|
@ -357,6 +372,7 @@ public class RMAppImpl implements RMApp {
|
||||||
case RUNNING:
|
case RUNNING:
|
||||||
return FinalApplicationStatus.UNDEFINED;
|
return FinalApplicationStatus.UNDEFINED;
|
||||||
// finished without a proper final state is the same as failed
|
// finished without a proper final state is the same as failed
|
||||||
|
case FINISHING:
|
||||||
case FINISHED:
|
case FINISHED:
|
||||||
case FAILED:
|
case FAILED:
|
||||||
return FinalApplicationStatus.FAILED;
|
return FinalApplicationStatus.FAILED;
|
||||||
|
@ -548,6 +564,14 @@ public class RMAppImpl implements RMApp {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final class RMAppFinishingTransition extends
|
||||||
|
RMAppTransition {
|
||||||
|
@Override
|
||||||
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
|
app.finishTime = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class AppKilledTransition extends FinalTransition {
|
private static class AppKilledTransition extends FinalTransition {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
|
@ -591,7 +615,9 @@ public class RMAppImpl implements RMApp {
|
||||||
app.handler.handle(
|
app.handler.handle(
|
||||||
new RMNodeCleanAppEvent(nodeId, app.applicationId));
|
new RMNodeCleanAppEvent(nodeId, app.applicationId));
|
||||||
}
|
}
|
||||||
|
if (app.getState() != RMAppState.FINISHING) {
|
||||||
app.finishTime = System.currentTimeMillis();
|
app.finishTime = System.currentTimeMillis();
|
||||||
|
}
|
||||||
app.handler.handle(
|
app.handler.handle(
|
||||||
new RMAppManagerEvent(app.applicationId,
|
new RMAppManagerEvent(app.applicationId,
|
||||||
RMAppManagerEventType.APP_COMPLETED));
|
RMAppManagerEventType.APP_COMPLETED));
|
||||||
|
|
|
@ -19,5 +19,5 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||||
|
|
||||||
public enum RMAppState {
|
public enum RMAppState {
|
||||||
NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
|
NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHING, FINISHED, FAILED, KILLED
|
||||||
}
|
}
|
||||||
|
|
|
@ -197,7 +197,8 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
||||||
new FinalTransition(RMAppAttemptState.KILLED))
|
new FinalTransition(RMAppAttemptState.KILLED))
|
||||||
|
|
||||||
// Transitions from RUNNING State
|
// Transitions from RUNNING State
|
||||||
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINISHED,
|
.addTransition(RMAppAttemptState.RUNNING,
|
||||||
|
EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED),
|
||||||
RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
|
RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
|
||||||
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
|
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
|
||||||
RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
|
RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
|
||||||
|
@ -233,6 +234,21 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
||||||
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
||||||
RMAppAttemptEventType.CONTAINER_FINISHED))
|
RMAppAttemptEventType.CONTAINER_FINISHED))
|
||||||
|
|
||||||
|
// Transitions from FINISHING State
|
||||||
|
.addTransition(RMAppAttemptState.FINISHING,
|
||||||
|
EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED),
|
||||||
|
RMAppAttemptEventType.CONTAINER_FINISHED,
|
||||||
|
new AMFinishingContainerFinishedTransition())
|
||||||
|
.addTransition(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED,
|
||||||
|
RMAppAttemptEventType.EXPIRE,
|
||||||
|
new FinalTransition(RMAppAttemptState.FINISHED))
|
||||||
|
.addTransition(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHING,
|
||||||
|
EnumSet.of(
|
||||||
|
RMAppAttemptEventType.UNREGISTERED,
|
||||||
|
RMAppAttemptEventType.STATUS_UPDATE,
|
||||||
|
RMAppAttemptEventType.CONTAINER_ALLOCATED,
|
||||||
|
RMAppAttemptEventType.KILL))
|
||||||
|
|
||||||
// Transitions from FINISHED State
|
// Transitions from FINISHED State
|
||||||
.addTransition(
|
.addTransition(
|
||||||
RMAppAttemptState.FINISHED,
|
RMAppAttemptState.FINISHED,
|
||||||
|
@ -830,6 +846,8 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
||||||
// UnRegister from AMLivelinessMonitor
|
// UnRegister from AMLivelinessMonitor
|
||||||
appAttempt.rmContext.getAMLivelinessMonitor().unregister(
|
appAttempt.rmContext.getAMLivelinessMonitor().unregister(
|
||||||
appAttempt.getAppAttemptId());
|
appAttempt.getAppAttemptId());
|
||||||
|
appAttempt.rmContext.getAMFinishingMonitor().unregister(
|
||||||
|
appAttempt.getAppAttemptId());
|
||||||
|
|
||||||
if(!appAttempt.submissionContext.getUnmanagedAM()) {
|
if(!appAttempt.submissionContext.getUnmanagedAM()) {
|
||||||
// Tell the launcher to cleanup.
|
// Tell the launcher to cleanup.
|
||||||
|
@ -874,15 +892,21 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class AMUnregisteredTransition extends FinalTransition {
|
private static final class AMUnregisteredTransition implements
|
||||||
|
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
|
||||||
public AMUnregisteredTransition() {
|
|
||||||
super(RMAppAttemptState.FINISHED);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMAppAttemptImpl appAttempt,
|
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
||||||
RMAppAttemptEvent event) {
|
RMAppAttemptEvent event) {
|
||||||
|
ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId();
|
||||||
|
|
||||||
|
appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId);
|
||||||
|
|
||||||
|
// Remove the AppAttempt from the ApplicationTokenSecretManager
|
||||||
|
appAttempt.rmContext.getApplicationTokenSecretManager()
|
||||||
|
.applicationMasterFinished(appAttemptId);
|
||||||
|
|
||||||
|
appAttempt.progress = 1.0f;
|
||||||
|
|
||||||
RMAppAttemptUnregistrationEvent unregisterEvent
|
RMAppAttemptUnregistrationEvent unregisterEvent
|
||||||
= (RMAppAttemptUnregistrationEvent) event;
|
= (RMAppAttemptUnregistrationEvent) event;
|
||||||
|
@ -892,8 +916,20 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
||||||
appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
|
appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
|
||||||
appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
|
appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
|
||||||
|
|
||||||
// Tell the app and the scheduler
|
// Tell the app
|
||||||
super.transition(appAttempt, event);
|
if (appAttempt.getSubmissionContext().getUnmanagedAM()) {
|
||||||
|
// Unmanaged AMs have no container to wait for, so they skip
|
||||||
|
// the FINISHING state and go straight to FINISHED.
|
||||||
|
new FinalTransition(RMAppAttemptState.FINISHED).transition(
|
||||||
|
appAttempt, event);
|
||||||
|
return RMAppAttemptState.FINISHED;
|
||||||
|
}
|
||||||
|
appAttempt.rmContext.getAMFinishingMonitor().register(appAttemptId);
|
||||||
|
ApplicationId applicationId =
|
||||||
|
appAttempt.getAppAttemptId().getApplicationId();
|
||||||
|
appAttempt.eventHandler.handle(
|
||||||
|
new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHING));
|
||||||
|
return RMAppAttemptState.FINISHING;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -958,6 +994,33 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final class AMFinishingContainerFinishedTransition
|
||||||
|
implements
|
||||||
|
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
||||||
|
RMAppAttemptEvent event) {
|
||||||
|
|
||||||
|
RMAppAttemptContainerFinishedEvent containerFinishedEvent
|
||||||
|
= (RMAppAttemptContainerFinishedEvent) event;
|
||||||
|
ContainerStatus containerStatus =
|
||||||
|
containerFinishedEvent.getContainerStatus();
|
||||||
|
|
||||||
|
// Is this container the ApplicationMaster container?
|
||||||
|
if (appAttempt.masterContainer.getId().equals(
|
||||||
|
containerStatus.getContainerId())) {
|
||||||
|
new FinalTransition(RMAppAttemptState.FINISHED).transition(
|
||||||
|
appAttempt, containerFinishedEvent);
|
||||||
|
return RMAppAttemptState.FINISHED;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normal container.
|
||||||
|
appAttempt.justFinishedContainers.add(containerStatus);
|
||||||
|
return RMAppAttemptState.FINISHING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getStartTime() {
|
public long getStartTime() {
|
||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
|
|
|
@ -19,6 +19,6 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
||||||
|
|
||||||
public enum RMAppAttemptState {
|
public enum RMAppAttemptState {
|
||||||
NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHED,
|
NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING,
|
||||||
KILLED,
|
FINISHING, FINISHED, KILLED,
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,8 @@ public class RmController extends Controller {
|
||||||
RMAppState.NEW.toString(),
|
RMAppState.NEW.toString(),
|
||||||
RMAppState.SUBMITTED.toString(),
|
RMAppState.SUBMITTED.toString(),
|
||||||
RMAppState.ACCEPTED.toString(),
|
RMAppState.ACCEPTED.toString(),
|
||||||
RMAppState.RUNNING.toString()));
|
RMAppState.RUNNING.toString(),
|
||||||
|
RMAppState.FINISHING.toString()));
|
||||||
|
|
||||||
ResourceManager rm = getInstance(ResourceManager.class);
|
ResourceManager rm = getInstance(ResourceManager.class);
|
||||||
ResourceScheduler rs = rm.getResourceScheduler();
|
ResourceScheduler rs = rm.getResourceScheduler();
|
||||||
|
|
|
@ -18,13 +18,16 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -33,6 +36,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
public class MockNM {
|
public class MockNM {
|
||||||
|
@ -85,6 +89,20 @@ public class MockNM {
|
||||||
b, ++responseId);
|
b, ++responseId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public HeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
|
||||||
|
int containerId, ContainerState containerState) throws Exception {
|
||||||
|
HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
|
||||||
|
new HashMap<ApplicationId, List<ContainerStatus>>(1);
|
||||||
|
ContainerStatus amContainerStatus = BuilderUtils.newContainerStatus(
|
||||||
|
BuilderUtils.newContainerId(attemptId, 1),
|
||||||
|
ContainerState.COMPLETE, "Success", 0);
|
||||||
|
ArrayList<ContainerStatus> containerStatusList =
|
||||||
|
new ArrayList<ContainerStatus>(1);
|
||||||
|
containerStatusList.add(amContainerStatus);
|
||||||
|
nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
|
||||||
|
return nodeHeartbeat(nodeUpdate, true);
|
||||||
|
}
|
||||||
|
|
||||||
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||||
List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
|
List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
|
||||||
return nodeHeartbeat(conts, isHealthy, ++responseId);
|
return nodeHeartbeat(conts, isHealthy, ++responseId);
|
||||||
|
|
|
@ -91,8 +91,11 @@ public class TestAppManager{
|
||||||
rmDispatcher);
|
rmDispatcher);
|
||||||
AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
|
AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
|
||||||
rmDispatcher);
|
rmDispatcher);
|
||||||
|
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
|
||||||
|
rmDispatcher);
|
||||||
return new RMContextImpl(new MemStore(), rmDispatcher,
|
return new RMContextImpl(new MemStore(), rmDispatcher,
|
||||||
containerAllocationExpirer, amLivelinessMonitor, null, null) {
|
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||||
|
null, null) {
|
||||||
@Override
|
@Override
|
||||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||||
return map;
|
return map;
|
||||||
|
|
|
@ -92,6 +92,8 @@ public class TestApplicationCleanup {
|
||||||
Assert.assertEquals(request, conts.size());
|
Assert.assertEquals(request, conts.size());
|
||||||
|
|
||||||
am.unregisterAppAttempt();
|
am.unregisterAppAttempt();
|
||||||
|
HeartbeatResponse resp = nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
|
||||||
|
ContainerState.COMPLETE);
|
||||||
am.waitForState(RMAppAttemptState.FINISHED);
|
am.waitForState(RMAppAttemptState.FINISHED);
|
||||||
|
|
||||||
int cleanedConts = 0;
|
int cleanedConts = 0;
|
||||||
|
@ -102,8 +104,7 @@ public class TestApplicationCleanup {
|
||||||
//currently only containers are cleaned via this
|
//currently only containers are cleaned via this
|
||||||
//AM container is cleaned via container launcher
|
//AM container is cleaned via container launcher
|
||||||
waitCount = 0;
|
waitCount = 0;
|
||||||
while ((cleanedConts < 3 || cleanedApps < 1) && waitCount++ < 20) {
|
while ((cleanedConts < 2 || cleanedApps < 1) && waitCount++ < 20) {
|
||||||
HeartbeatResponse resp = nm1.nodeHeartbeat(true);
|
|
||||||
contsToClean = resp.getContainersToCleanupList();
|
contsToClean = resp.getContainersToCleanupList();
|
||||||
apps = resp.getApplicationsToCleanupList();
|
apps = resp.getApplicationsToCleanupList();
|
||||||
LOG.info("Waiting to get cleanup events.. cleanedConts: "
|
LOG.info("Waiting to get cleanup events.. cleanedConts: "
|
||||||
|
@ -111,12 +112,13 @@ public class TestApplicationCleanup {
|
||||||
cleanedConts += contsToClean.size();
|
cleanedConts += contsToClean.size();
|
||||||
cleanedApps += apps.size();
|
cleanedApps += apps.size();
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
resp = nm1.nodeHeartbeat(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertEquals(1, apps.size());
|
Assert.assertEquals(1, apps.size());
|
||||||
Assert.assertEquals(app.getApplicationId(), apps.get(0));
|
Assert.assertEquals(app.getApplicationId(), apps.get(0));
|
||||||
Assert.assertEquals(1, cleanedApps);
|
Assert.assertEquals(1, cleanedApps);
|
||||||
Assert.assertEquals(3, cleanedConts);
|
Assert.assertEquals(2, cleanedConts);
|
||||||
|
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
||||||
|
@ -182,6 +183,10 @@ public class TestApplicationMasterLauncher {
|
||||||
am.registerAppAttempt();
|
am.registerAppAttempt();
|
||||||
am.unregisterAppAttempt();
|
am.unregisterAppAttempt();
|
||||||
|
|
||||||
|
//complete the AM container to finish the app normally
|
||||||
|
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
|
am.waitForState(RMAppAttemptState.FINISHED);
|
||||||
|
|
||||||
waitCount = 0;
|
waitCount = 0;
|
||||||
while (containerManager.cleanedup == false && waitCount++ < 20) {
|
while (containerManager.cleanedup == false && waitCount++ < 20) {
|
||||||
LOG.info("Waiting for AM Cleanup to happen..");
|
LOG.info("Waiting for AM Cleanup to happen..");
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
@ -72,6 +73,7 @@ public class TestRM {
|
||||||
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||||
am.registerAppAttempt();
|
am.registerAppAttempt();
|
||||||
am.unregisterAppAttempt();
|
am.unregisterAppAttempt();
|
||||||
|
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
am.waitForState(RMAppAttemptState.FINISHED);
|
am.waitForState(RMAppAttemptState.FINISHED);
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
@ -127,6 +129,7 @@ public class TestRM {
|
||||||
Assert.assertEquals(10, conts.size());
|
Assert.assertEquals(10, conts.size());
|
||||||
|
|
||||||
am.unregisterAppAttempt();
|
am.unregisterAppAttempt();
|
||||||
|
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
am.waitForState(RMAppAttemptState.FINISHED);
|
am.waitForState(RMAppAttemptState.FINISHED);
|
||||||
|
|
||||||
rm.stop();
|
rm.stop();
|
||||||
|
|
|
@ -77,7 +77,7 @@ public class TestRMNodeTransitions {
|
||||||
InlineDispatcher rmDispatcher = new InlineDispatcher();
|
InlineDispatcher rmDispatcher = new InlineDispatcher();
|
||||||
|
|
||||||
rmContext =
|
rmContext =
|
||||||
new RMContextImpl(new MemStore(), rmDispatcher, null, null,
|
new RMContextImpl(new MemStore(), rmDispatcher, null, null, null,
|
||||||
mock(DelegationTokenRenewer.class), null);
|
mock(DelegationTokenRenewer.class), null);
|
||||||
scheduler = mock(YarnScheduler.class);
|
scheduler = mock(YarnScheduler.class);
|
||||||
doAnswer(
|
doAnswer(
|
||||||
|
|
|
@ -71,7 +71,7 @@ public class TestNMExpiry {
|
||||||
// Dispatcher that processes events inline
|
// Dispatcher that processes events inline
|
||||||
Dispatcher dispatcher = new InlineDispatcher();
|
Dispatcher dispatcher = new InlineDispatcher();
|
||||||
RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
|
RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
|
||||||
null, null, null);
|
null, null, null, null);
|
||||||
dispatcher.register(SchedulerEventType.class,
|
dispatcher.register(SchedulerEventType.class,
|
||||||
new InlineDispatcher.EmptyEventHandler());
|
new InlineDispatcher.EmptyEventHandler());
|
||||||
dispatcher.register(RMNodeEventType.class,
|
dispatcher.register(RMNodeEventType.class,
|
||||||
|
|
|
@ -65,7 +65,8 @@ public class TestRMNMRPCResponseId {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
RMContext context =
|
RMContext context =
|
||||||
new RMContextImpl(new MemStore(), dispatcher, null, null, null, null);
|
new RMContextImpl(new MemStore(), dispatcher, null, null, null,
|
||||||
|
null, null);
|
||||||
dispatcher.register(RMNodeEventType.class,
|
dispatcher.register(RMNodeEventType.class,
|
||||||
new ResourceManager.NodeEventDispatcher(context));
|
new ResourceManager.NodeEventDispatcher(context));
|
||||||
NodesListManager nodesListManager = new NodesListManager(context);
|
NodesListManager nodesListManager = new NodesListManager(context);
|
||||||
|
|
|
@ -118,10 +118,10 @@ public class TestRMAppTransitions {
|
||||||
ContainerAllocationExpirer containerAllocationExpirer =
|
ContainerAllocationExpirer containerAllocationExpirer =
|
||||||
mock(ContainerAllocationExpirer.class);
|
mock(ContainerAllocationExpirer.class);
|
||||||
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
||||||
this.rmContext =
|
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
|
||||||
new RMContextImpl(new MemStore(), rmDispatcher,
|
this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
|
||||||
containerAllocationExpirer, amLivelinessMonitor, null,
|
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||||
new ApplicationTokenSecretManager(conf));
|
null, new ApplicationTokenSecretManager(conf));
|
||||||
|
|
||||||
rmDispatcher.register(RMAppAttemptEventType.class,
|
rmDispatcher.register(RMAppAttemptEventType.class,
|
||||||
new TestApplicationAttemptEventDispatcher(this.rmContext));
|
new TestApplicationAttemptEventDispatcher(this.rmContext));
|
||||||
|
@ -278,14 +278,35 @@ public class TestRMAppTransitions {
|
||||||
return application;
|
return application;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected RMApp testCreateAppFinishing(
|
||||||
|
ApplicationSubmissionContext submissionContext) throws IOException {
|
||||||
|
// unmanaged AMs don't use the FINISHING state
|
||||||
|
assert submissionContext == null || !submissionContext.getUnmanagedAM();
|
||||||
|
RMApp application = testCreateAppRunning(submissionContext);
|
||||||
|
// RUNNING => FINISHING event RMAppEventType.ATTEMPT_FINISHING
|
||||||
|
RMAppEvent finishingEvent =
|
||||||
|
new RMAppEvent(application.getApplicationId(),
|
||||||
|
RMAppEventType.ATTEMPT_FINISHING);
|
||||||
|
application.handle(finishingEvent);
|
||||||
|
assertAppState(RMAppState.FINISHING, application);
|
||||||
|
assertTimesAtFinish(application);
|
||||||
|
return application;
|
||||||
|
}
|
||||||
|
|
||||||
protected RMApp testCreateAppFinished(
|
protected RMApp testCreateAppFinished(
|
||||||
ApplicationSubmissionContext submissionContext) throws IOException {
|
ApplicationSubmissionContext submissionContext) throws IOException {
|
||||||
RMApp application = testCreateAppRunning(submissionContext);
|
// unmanaged AMs don't use the FINISHING state
|
||||||
// RUNNING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
|
RMApp application = null;
|
||||||
RMAppEvent event =
|
if (submissionContext != null && submissionContext.getUnmanagedAM()) {
|
||||||
|
application = testCreateAppRunning(submissionContext);
|
||||||
|
} else {
|
||||||
|
application = testCreateAppFinishing(submissionContext);
|
||||||
|
}
|
||||||
|
// RUNNING/FINISHING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
|
||||||
|
RMAppEvent finishedEvent =
|
||||||
new RMAppEvent(application.getApplicationId(),
|
new RMAppEvent(application.getApplicationId(),
|
||||||
RMAppEventType.ATTEMPT_FINISHED);
|
RMAppEventType.ATTEMPT_FINISHED);
|
||||||
application.handle(event);
|
application.handle(finishedEvent);
|
||||||
assertAppState(RMAppState.FINISHED, application);
|
assertAppState(RMAppState.FINISHED, application);
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
// finished without a proper unregister implies failed
|
// finished without a proper unregister implies failed
|
||||||
|
@ -468,6 +489,17 @@ public class TestRMAppTransitions {
|
||||||
assertFailed(application, ".*Failing the application.*");
|
assertFailed(application, ".*Failing the application.*");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppFinishingKill() throws IOException {
|
||||||
|
LOG.info("--- START: testAppFinishedFinished ---");
|
||||||
|
|
||||||
|
RMApp application = testCreateAppFinishing(null);
|
||||||
|
// FINISHING => FINISHED event RMAppEventType.KILL
|
||||||
|
RMAppEvent event =
|
||||||
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
||||||
|
application.handle(event);
|
||||||
|
assertAppState(RMAppState.FINISHED, application);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAppFinishedFinished() throws IOException {
|
public void testAppFinishedFinished() throws IOException {
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
@ -71,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSch
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -87,6 +89,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
private ApplicationMasterService masterService;
|
private ApplicationMasterService masterService;
|
||||||
private ApplicationMasterLauncher applicationMasterLauncher;
|
private ApplicationMasterLauncher applicationMasterLauncher;
|
||||||
private AMLivelinessMonitor amLivelinessMonitor;
|
private AMLivelinessMonitor amLivelinessMonitor;
|
||||||
|
private AMLivelinessMonitor amFinishingMonitor;
|
||||||
|
|
||||||
private RMApp application;
|
private RMApp application;
|
||||||
private RMAppAttempt applicationAttempt;
|
private RMAppAttempt applicationAttempt;
|
||||||
|
@ -150,10 +153,10 @@ public class TestRMAppAttemptTransitions {
|
||||||
ContainerAllocationExpirer containerAllocationExpirer =
|
ContainerAllocationExpirer containerAllocationExpirer =
|
||||||
mock(ContainerAllocationExpirer.class);
|
mock(ContainerAllocationExpirer.class);
|
||||||
amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
||||||
rmContext =
|
amFinishingMonitor = mock(AMLivelinessMonitor.class);
|
||||||
new RMContextImpl(new MemStore(), rmDispatcher,
|
rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
|
||||||
containerAllocationExpirer, amLivelinessMonitor, null,
|
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||||
new ApplicationTokenSecretManager(new Configuration()));
|
null, new ApplicationTokenSecretManager(new Configuration()));
|
||||||
|
|
||||||
scheduler = mock(YarnScheduler.class);
|
scheduler = mock(YarnScheduler.class);
|
||||||
masterService = mock(ApplicationMasterService.class);
|
masterService = mock(ApplicationMasterService.class);
|
||||||
|
@ -366,6 +369,23 @@ public class TestRMAppAttemptTransitions {
|
||||||
// TODO - need to add more checks relevant to this state
|
// TODO - need to add more checks relevant to this state
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link RMAppAttemptState#FINISHING}
|
||||||
|
*/
|
||||||
|
private void testAppAttemptFinishingState(Container container,
|
||||||
|
FinalApplicationStatus finalStatus,
|
||||||
|
String trackingUrl,
|
||||||
|
String diagnostics) {
|
||||||
|
assertEquals(RMAppAttemptState.FINISHING,
|
||||||
|
applicationAttempt.getAppAttemptState());
|
||||||
|
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
|
||||||
|
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||||
|
assertEquals("null/proxy/"+applicationAttempt.getAppAttemptId().
|
||||||
|
getApplicationId()+"/", applicationAttempt.getTrackingUrl());
|
||||||
|
assertEquals(container, applicationAttempt.getMasterContainer());
|
||||||
|
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link RMAppAttemptState#FINISHED}
|
* {@link RMAppAttemptState#FINISHED}
|
||||||
*/
|
*/
|
||||||
|
@ -408,6 +428,8 @@ public class TestRMAppAttemptTransitions {
|
||||||
|
|
||||||
// Mock the allocation of AM container
|
// Mock the allocation of AM container
|
||||||
Container container = mock(Container.class);
|
Container container = mock(Container.class);
|
||||||
|
when(container.getId()).thenReturn(
|
||||||
|
BuilderUtils.newContainerId(applicationAttempt.getAppAttemptId(), 1));
|
||||||
Allocation allocation = mock(Allocation.class);
|
Allocation allocation = mock(Allocation.class);
|
||||||
when(allocation.getContainers()).
|
when(allocation.getContainers()).
|
||||||
thenReturn(Collections.singletonList(container));
|
thenReturn(Collections.singletonList(container));
|
||||||
|
@ -448,6 +470,18 @@ public class TestRMAppAttemptTransitions {
|
||||||
testAppAttemptRunningState(container, host, rpcPort, trackingUrl);
|
testAppAttemptRunningState(container, host, rpcPort, trackingUrl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void unregisterApplicationAttempt(Container container,
|
||||||
|
FinalApplicationStatus finalStatus, String trackingUrl,
|
||||||
|
String diagnostics) {
|
||||||
|
applicationAttempt.handle(
|
||||||
|
new RMAppAttemptUnregistrationEvent(
|
||||||
|
applicationAttempt.getAppAttemptId(),
|
||||||
|
trackingUrl, finalStatus, diagnostics));
|
||||||
|
testAppAttemptFinishingState(container, finalStatus,
|
||||||
|
trackingUrl, diagnostics);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnmanagedAMSuccess() {
|
public void testUnmanagedAMSuccess() {
|
||||||
unmanagedAM = true;
|
unmanagedAM = true;
|
||||||
|
@ -553,36 +587,99 @@ public class TestRMAppAttemptTransitions {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnregisterToKilledFinish() {
|
public void testUnregisterToKilledFinishing() {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||||
String trackingUrl = "newtrackingurl";
|
unregisterApplicationAttempt(amContainer,
|
||||||
String diagnostics = "Killed by user";
|
FinalApplicationStatus.KILLED, "newtrackingurl",
|
||||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.KILLED;
|
"Killed by user");
|
||||||
applicationAttempt.handle(
|
|
||||||
new RMAppAttemptUnregistrationEvent(
|
|
||||||
applicationAttempt.getAppAttemptId(),
|
|
||||||
trackingUrl, finalStatus, diagnostics));
|
|
||||||
testAppAttemptFinishedState(amContainer, finalStatus,
|
|
||||||
trackingUrl, diagnostics, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnregisterToSuccessfulFinish() {
|
public void testUnregisterToSuccessfulFinishing() {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||||
|
unregisterApplicationAttempt(amContainer,
|
||||||
|
FinalApplicationStatus.SUCCEEDED, "mytrackingurl", "Successful");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFinishingKill() {
|
||||||
|
Container amContainer = allocateApplicationAttempt();
|
||||||
|
launchApplicationAttempt(amContainer);
|
||||||
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||||
|
FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
|
||||||
|
String trackingUrl = "newtrackingurl";
|
||||||
|
String diagnostics = "Job failed";
|
||||||
|
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
|
||||||
|
diagnostics);
|
||||||
|
applicationAttempt.handle(
|
||||||
|
new RMAppAttemptEvent(
|
||||||
|
applicationAttempt.getAppAttemptId(),
|
||||||
|
RMAppAttemptEventType.KILL));
|
||||||
|
testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl,
|
||||||
|
diagnostics);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFinishingExpire() {
|
||||||
|
Container amContainer = allocateApplicationAttempt();
|
||||||
|
launchApplicationAttempt(amContainer);
|
||||||
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||||
|
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||||
String trackingUrl = "mytrackingurl";
|
String trackingUrl = "mytrackingurl";
|
||||||
String diagnostics = "Successful";
|
String diagnostics = "Successful";
|
||||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
|
||||||
|
diagnostics);
|
||||||
applicationAttempt.handle(
|
applicationAttempt.handle(
|
||||||
new RMAppAttemptUnregistrationEvent(
|
new RMAppAttemptEvent(
|
||||||
applicationAttempt.getAppAttemptId(),
|
applicationAttempt.getAppAttemptId(),
|
||||||
trackingUrl, finalStatus, diagnostics));
|
RMAppAttemptEventType.EXPIRE));
|
||||||
testAppAttemptFinishedState(amContainer, finalStatus,
|
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
||||||
trackingUrl, diagnostics, 0);
|
diagnostics, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFinishingToFinishing() {
|
||||||
|
Container amContainer = allocateApplicationAttempt();
|
||||||
|
launchApplicationAttempt(amContainer);
|
||||||
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||||
|
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||||
|
String trackingUrl = "mytrackingurl";
|
||||||
|
String diagnostics = "Successful";
|
||||||
|
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
|
||||||
|
diagnostics);
|
||||||
|
// container must be AM container to move from FINISHING to FINISHED
|
||||||
|
applicationAttempt.handle(
|
||||||
|
new RMAppAttemptContainerFinishedEvent(
|
||||||
|
applicationAttempt.getAppAttemptId(),
|
||||||
|
BuilderUtils.newContainerStatus(
|
||||||
|
BuilderUtils.newContainerId(
|
||||||
|
applicationAttempt.getAppAttemptId(), 42),
|
||||||
|
ContainerState.COMPLETE, "", 0)));
|
||||||
|
testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl,
|
||||||
|
diagnostics);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSuccessfulFinishingToFinished() {
|
||||||
|
Container amContainer = allocateApplicationAttempt();
|
||||||
|
launchApplicationAttempt(amContainer);
|
||||||
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||||
|
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||||
|
String trackingUrl = "mytrackingurl";
|
||||||
|
String diagnostics = "Successful";
|
||||||
|
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
|
||||||
|
diagnostics);
|
||||||
|
applicationAttempt.handle(
|
||||||
|
new RMAppAttemptContainerFinishedEvent(
|
||||||
|
applicationAttempt.getAppAttemptId(),
|
||||||
|
BuilderUtils.newContainerStatus(amContainer.getId(),
|
||||||
|
ContainerState.COMPLETE, "", 0)));
|
||||||
|
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
||||||
|
diagnostics, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class TestUtils {
|
||||||
new ContainerAllocationExpirer(nullDispatcher);
|
new ContainerAllocationExpirer(nullDispatcher);
|
||||||
|
|
||||||
RMContext rmContext =
|
RMContext rmContext =
|
||||||
new RMContextImpl(null, nullDispatcher, cae, null, null,
|
new RMContextImpl(null, nullDispatcher, cae, null, null, null,
|
||||||
new ApplicationTokenSecretManager(new Configuration()));
|
new ApplicationTokenSecretManager(new Configuration()));
|
||||||
|
|
||||||
return rmContext;
|
return rmContext;
|
||||||
|
|
|
@ -86,8 +86,8 @@ public class TestFifoScheduler {
|
||||||
@Test
|
@Test
|
||||||
public void testAppAttemptMetrics() throws Exception {
|
public void testAppAttemptMetrics() throws Exception {
|
||||||
AsyncDispatcher dispatcher = new InlineDispatcher();
|
AsyncDispatcher dispatcher = new InlineDispatcher();
|
||||||
RMContext rmContext =
|
RMContext rmContext = new RMContextImpl(null, dispatcher, null,
|
||||||
new RMContextImpl(null, dispatcher, null, null, null, null);
|
null, null, null, null);
|
||||||
|
|
||||||
FifoScheduler schedular = new FifoScheduler();
|
FifoScheduler schedular = new FifoScheduler();
|
||||||
schedular.reinitialize(new Configuration(), null, rmContext);
|
schedular.reinitialize(new Configuration(), null, rmContext);
|
||||||
|
|
|
@ -158,7 +158,8 @@ public class TestRMWebApp {
|
||||||
for (RMNode node : deactivatedNodes) {
|
for (RMNode node : deactivatedNodes) {
|
||||||
deactivatedNodesMap.put(node.getHostName(), node);
|
deactivatedNodesMap.put(node.getHostName(), node);
|
||||||
}
|
}
|
||||||
return new RMContextImpl(new MemStore(), null, null, null, null, null) {
|
return new RMContextImpl(new MemStore(), null, null, null, null,
|
||||||
|
null, null) {
|
||||||
@Override
|
@Override
|
||||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||||
return applicationsMaps;
|
return applicationsMaps;
|
||||||
|
|
|
@ -31,6 +31,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
|
@ -545,6 +546,8 @@ public class TestRMWebServicesApps extends JerseyTest {
|
||||||
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
|
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
|
||||||
am.registerAppAttempt();
|
am.registerAppAttempt();
|
||||||
am.unregisterAppAttempt();
|
am.unregisterAppAttempt();
|
||||||
|
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
1, ContainerState.COMPLETE);
|
||||||
rm.submitApp(1024);
|
rm.submitApp(1024);
|
||||||
rm.submitApp(1024);
|
rm.submitApp(1024);
|
||||||
|
|
||||||
|
@ -573,6 +576,8 @@ public class TestRMWebServicesApps extends JerseyTest {
|
||||||
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
|
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
|
||||||
am.registerAppAttempt();
|
am.registerAppAttempt();
|
||||||
am.unregisterAppAttempt();
|
am.unregisterAppAttempt();
|
||||||
|
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
1, ContainerState.COMPLETE);
|
||||||
|
|
||||||
rm.submitApp(1024);
|
rm.submitApp(1024);
|
||||||
rm.submitApp(1024);
|
rm.submitApp(1024);
|
||||||
|
@ -605,6 +610,8 @@ public class TestRMWebServicesApps extends JerseyTest {
|
||||||
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
|
.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
|
||||||
am.registerAppAttempt();
|
am.registerAppAttempt();
|
||||||
am.unregisterAppAttempt();
|
am.unregisterAppAttempt();
|
||||||
|
amNodeManager.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
1, ContainerState.COMPLETE);
|
||||||
|
|
||||||
rm.submitApp(1024);
|
rm.submitApp(1024);
|
||||||
rm.submitApp(1024);
|
rm.submitApp(1024);
|
||||||
|
|
Loading…
Reference in New Issue