YARN-1490. Introduced the ability to make ResourceManager optionally not kill all containers when an ApplicationMaster exits. Contributed by Jian He.

svn merge --ignore-ancestry -c 1557143 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1557144 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-01-10 15:05:18 +00:00
parent 9619fc7740
commit 76fc12eadf
36 changed files with 1042 additions and 743 deletions

View File

@ -861,5 +861,10 @@ public ApplicationResourceUsageReport getAppResourceUsageReport(
public List<ApplicationAttemptId> getAppsInQueue(String queue) {
return scheduler.getAppsInQueue(queue);
}
@Override
public RMContainer getRMContainer(ContainerId containerId) {
return null;
}
}

View File

@ -40,6 +40,9 @@ Release 2.4.0 - UNRELEASED
YARN-1029. Added embedded leader election in the ResourceManager. (Karthik
Kambatla via vinodkv)
YARN-1490. Introduced the ability to make ResourceManager optionally not kill
all containers when an ApplicationMaster exits. (Jian He via vinodkv)
IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.util.Records;
/**
@ -57,7 +58,8 @@ public static ApplicationSubmissionContext newInstance(
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType) {
int maxAppAttempts, Resource resource, String applicationType,
boolean keepContainers) {
ApplicationSubmissionContext context =
Records.newRecord(ApplicationSubmissionContext.class);
context.setApplicationId(applicationId);
@ -70,9 +72,22 @@ public static ApplicationSubmissionContext newInstance(
context.setMaxAppAttempts(maxAppAttempts);
context.setResource(resource);
context.setApplicationType(applicationType);
context.setKeepContainersAcrossApplicationAttempts(keepContainers);
return context;
}
@Public
@Stable
public static ApplicationSubmissionContext newInstance(
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType) {
return newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
resource, null, false);
}
@Public
@Stable
public static ApplicationSubmissionContext newInstance(
@ -268,4 +283,35 @@ public static ApplicationSubmissionContext newInstance(
@Public
@Stable
public abstract void setApplicationType(String applicationType);
/**
* Get the flag which indicates whether to keep containers across application
* attempts or not.
*
* @return the flag which indicates whether to keep containers across
* application attempts or not.
*/
@Public
@Stable
public abstract boolean getKeepContainersAcrossApplicationAttempts();
/**
* Set the flag which indicates whether to keep containers across application
* attempts.
* <p>
* If the flag is true, running containers will not be killed when application
* attempt fails and these containers will be retrieved by the new application
* attempt on registration via
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}.
* </p>
*
* @param keepContainers
* the flag which indicates whether to keep containers across
* application attempts.
*/
@Public
@Stable
public abstract void setKeepContainersAcrossApplicationAttempts(
boolean keepContainers);
}

View File

@ -46,10 +46,20 @@ public static ContainerId newInstance(ApplicationAttemptId appAttemptId,
}
/**
* Get the <code>ApplicationAttemptId</code> of the application to which
* the <code>Container</code> was assigned.
* @return <code>ApplicationAttemptId</code> of the application to which
* the <code>Container</code> was assigned
* Get the <code>ApplicationAttemptId</code> of the application to which the
* <code>Container</code> was assigned.
* <p>
* Note: If containers are kept alive across application attempts via
* {@link ApplicationSubmissionContext#setKeepContainersAcrossApplicationAttempts(boolean)}
* the <code>ContainerId</code> does not necessarily contain the current
* running application attempt's <code>ApplicationAttemptId</code> This
* container can be allocated by previously exited application attempt and
* managed by the current running attempt thus have the previous application
* attempt's <code>ApplicationAttemptId</code>.
* </p>
*
* @return <code>ApplicationAttemptId</code> of the application to which the
* <code>Container</code> was assigned
*/
@Public
@Stable

View File

@ -248,6 +248,7 @@ message ApplicationSubmissionContextProto {
optional int32 maxAppAttempts = 8 [default = 0];
optional ResourceProto resource = 9;
optional string applicationType = 10 [default = "YARN"];
optional bool keep_containers_across_application_attempts = 11 [default = false];
}
enum ApplicationAccessTypeProto {

View File

@ -298,6 +298,19 @@ public void setResource(Resource resource) {
this.resource = resource;
}
@Override
public void
setKeepContainersAcrossApplicationAttempts(boolean keepContainers) {
maybeInitBuilder();
builder.setKeepContainersAcrossApplicationAttempts(keepContainers);
}
@Override
public boolean getKeepContainersAcrossApplicationAttempts() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
return p.getKeepContainersAcrossApplicationAttempts();
}
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}

View File

@ -422,11 +422,18 @@ public AllocateResponse allocate(AllocateRequest request)
throw e;
}
try {
RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
} catch (InvalidContainerReleaseException e) {
LOG.warn("Invalid container release by application " + appAttemptId, e);
throw e;
RMApp app =
this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
// In the case of work-preserving AM restart, it's possible for the
// AM to release containers from the earlier attempt.
if (!app.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
try {
RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
} catch (InvalidContainerReleaseException e) {
LOG.warn("Invalid container release by application " + appAttemptId, e);
throw e;
}
}
// Send new requests to appAttempt.
@ -434,8 +441,6 @@ public AllocateResponse allocate(AllocateRequest request)
this.rScheduler.allocate(appAttemptId, ask, release,
blacklistAdditions, blacklistRemovals);
RMApp app = this.rmContext.getRMApps().get(
appAttemptId.getApplicationId());
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AllocateResponse allocateResponse =

View File

@ -23,14 +23,20 @@
public class RMAppFailedAttemptEvent extends RMAppEvent {
private final String diagnostics;
private final boolean transferStateFromPreviousAttempt;
public RMAppFailedAttemptEvent(ApplicationId appId, RMAppEventType event,
String diagnostics) {
String diagnostics, boolean transferStateFromPreviousAttempt) {
super(appId, event);
this.diagnostics = diagnostics;
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
}
public String getDiagnostics() {
return this.diagnostics;
}
public boolean getTransferStateFromPreviousAttempt() {
return transferStateFromPreviousAttempt;
}
}

View File

@ -63,6 +63,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@ -76,6 +77,7 @@
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.resource.Resources;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class RMAppImpl implements RMApp, Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
@ -646,24 +648,26 @@ public void recover(RMState state) throws Exception{
for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt
createNewAttempt(false);
createNewAttempt();
((RMAppAttemptImpl)this.currentAttempt).recover(state);
}
}
@SuppressWarnings("unchecked")
private void createNewAttempt(boolean startAttempt) {
private void createNewAttempt() {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
submissionContext, conf);
submissionContext, conf, maxAppAttempts == attempts.size());
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
if(startAttempt) {
handler.handle(
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
}
}
private void
createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
createNewAttempt();
handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
transferStateFromPreviousAttempt));
}
private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
@ -688,7 +692,6 @@ public void transition(RMAppImpl app, RMAppEvent event) {
};
}
@SuppressWarnings("unchecked")
private static final class RMAppRecoveredTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
@ -729,7 +732,6 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
private static final class AddApplicationToSchedulerTransition extends
RMAppTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
if (event instanceof RMAppNewSavedEvent) {
@ -751,14 +753,13 @@ public void transition(RMAppImpl app, RMAppEvent event) {
private static final class StartAppAttemptTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.createNewAttempt(true);
app.createAndStartNewAttempt(false);
};
}
private static final class FinalStateSavedTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
@ -959,7 +960,6 @@ private static String getAppKilledDiagnostics() {
}
private static class KillAttemptTransition extends RMAppTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.stateBeforeKilling = app.getState();
@ -987,7 +987,6 @@ private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) {
return nodes;
}
@SuppressWarnings("unchecked")
public void transition(RMAppImpl app, RMAppEvent event) {
Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
for (NodeId nodeId : nodes) {
@ -1019,7 +1018,21 @@ public AttemptFailedTransition(RMAppState initialState) {
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (!app.submissionContext.getUnmanagedAM()
&& app.attempts.size() < app.maxAppAttempts) {
app.createNewAttempt(true);
boolean transferStateFromPreviousAttempt = false;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt =
failedEvent.getTransferStateFromPreviousAttempt();
RMAppAttempt oldAttempt = app.currentAttempt;
app.createAndStartNewAttempt(transferStateFromPreviousAttempt);
// Transfer the state from the previous attempt to the current attempt.
// Note that the previous failed attempt may still be collecting the
// container events from the scheduler and update its data structures
// before the new attempt is created.
if (transferStateFromPreviousAttempt) {
((RMAppAttemptImpl) app.currentAttempt)
.transferStateFromPreviousAttempt(oldAttempt);
}
return initialState;
} else {
app.rememberTargetTransitionsAndStoreState(event,

View File

@ -129,9 +129,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private SecretKey clientTokenMasterKey = null;
//nodes on while this attempt's containers ran
private final Set<NodeId> ranNodes =
private Set<NodeId> ranNodes =
new HashSet<NodeId>();
private final List<ContainerStatus> justFinishedContainers =
private List<ContainerStatus> justFinishedContainers =
new ArrayList<ContainerStatus>();
private Container masterContainer;
@ -148,7 +148,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private final StringBuilder diagnostics = new StringBuilder();
private Configuration conf;
private final boolean isLastAttempt;
private static final ExpiredTransition EXPIRED_TRANSITION =
new ExpiredTransition();
@ -330,6 +330,12 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
RMAppAttemptEventType.KILL))
// Transitions from FAILED State
// For work-preserving AM restart, failed attempt are still capturing
// CONTAINER_FINISHED event and record the finished containers for the
// use by the next new attempt.
.addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED,
RMAppAttemptEventType.CONTAINER_FINISHED,
new ContainerFinishedAtFailedTransition())
.addTransition(
RMAppAttemptState.FAILED,
RMAppAttemptState.FAILED,
@ -338,8 +344,7 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.STATUS_UPDATE,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.CONTAINER_FINISHED))
RMAppAttemptEventType.CONTAINER_ALLOCATED))
// Transitions from FINISHING State
.addTransition(RMAppAttemptState.FINISHING,
@ -390,7 +395,7 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext,
Configuration conf) {
Configuration conf, boolean isLastAttempt) {
this.conf = conf;
this.applicationAttemptId = appAttemptId;
this.rmContext = rmContext;
@ -404,7 +409,7 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
this.writeLock = lock.writeLock();
this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
this.isLastAttempt = isLastAttempt;
this.stateMachine = stateMachineFactory.make(this);
}
@ -685,6 +690,11 @@ public void recover(RMState state) throws Exception {
this.startTime = attemptState.getStartTime();
}
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
this.justFinishedContainers = attempt.getJustFinishedContainers();
this.ranNodes = attempt.getRanNodes();
}
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
throws IOException {
if (appAttemptTokens == null) {
@ -721,6 +731,12 @@ private static final class AttemptStartedTransition extends BaseTransition {
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
boolean transferStateFromPreviousAttempt = false;
if (event instanceof RMAppStartAttemptEvent) {
transferStateFromPreviousAttempt =
((RMAppStartAttemptEvent) event)
.getTransferStateFromPreviousAttempt();
}
appAttempt.startTime = System.currentTimeMillis();
// Register with the ApplicationMasterService
@ -740,9 +756,10 @@ public void transition(RMAppAttemptImpl appAttempt,
new Token<AMRMTokenIdentifier>(id,
appAttempt.rmContext.getAMRMTokenSecretManager());
// Add the applicationAttempt to the scheduler
// Add the applicationAttempt to the scheduler and inform the scheduler
// whether to transfer the state from previous attempt.
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
appAttempt.applicationAttemptId));
appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
}
}
@ -981,6 +998,7 @@ public void transition(RMAppAttemptImpl appAttempt,
// Tell the application and the scheduler
ApplicationId applicationId = appAttemptId.getApplicationId();
RMAppEvent appEvent = null;
boolean keepContainersAcrossAppAttempts = false;
switch (finalAttemptState) {
case FINISHED:
{
@ -996,7 +1014,7 @@ public void transition(RMAppAttemptImpl appAttempt,
appEvent =
new RMAppFailedAttemptEvent(applicationId,
RMAppEventType.ATTEMPT_KILLED,
"Application killed by user.");
"Application killed by user.", false);
}
break;
case FAILED:
@ -1004,10 +1022,17 @@ public void transition(RMAppAttemptImpl appAttempt,
// don't leave the tracking URL pointing to a non-existent AM
appAttempt.setTrackingUrlToRMAppPage();
appAttempt.invalidateAMHostAndPort();
if (appAttempt.submissionContext
.getKeepContainersAcrossApplicationAttempts()
&& !appAttempt.isLastAttempt
&& !appAttempt.submissionContext.getUnmanagedAM()) {
keepContainersAcrossAppAttempts = true;
}
appEvent =
new RMAppFailedAttemptEvent(applicationId,
RMAppEventType.ATTEMPT_FAILED,
appAttempt.getDiagnostics());
RMAppEventType.ATTEMPT_FAILED, appAttempt.getDiagnostics(),
keepContainersAcrossAppAttempts);
}
break;
default:
@ -1019,7 +1044,7 @@ public void transition(RMAppAttemptImpl appAttempt,
appAttempt.eventHandler.handle(appEvent);
appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent(
appAttemptId, finalAttemptState));
appAttemptId, finalAttemptState, keepContainersAcrossAppAttempts));
appAttempt.removeCredentials(appAttempt);
}
}
@ -1045,6 +1070,11 @@ private static final class UnmanagedAMAttemptSavedTransition
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
appAttempt.checkAttemptStoreError(event);
// TODO Today unmanaged AM client is waiting for app state to be Accepted to
// launch the AM. This is broken since we changed to start the attempt
// after the application is Accepted. We may need to introduce an attempt
// report that client can rely on to query the attempt state and choose to
// launch the unmanaged AM.
super.transition(appAttempt, event);
}
}
@ -1346,6 +1376,20 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
}
}
private static final class ContainerFinishedAtFailedTransition
extends BaseTransition {
@Override
public void
transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
RMAppAttemptContainerFinishedEvent containerFinishedEvent =
(RMAppAttemptContainerFinishedEvent) event;
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
// Normal container. Add it in completed containers list
appAttempt.justFinishedContainers.add(containerStatus);
}
}
private static class ContainerFinishedFinalStateSavedTransition extends
BaseTransition {
@Override

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
public class RMAppStartAttemptEvent extends RMAppAttemptEvent {
private final boolean transferStateFromPreviousAttempt;
public RMAppStartAttemptEvent(ApplicationAttemptId appAttemptId,
boolean transferStateFromPreviousAttempt) {
super(appAttemptId, RMAppAttemptEventType.START);
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
}
public boolean getTransferStateFromPreviousAttempt() {
return transferStateFromPreviousAttempt;
}
}

View File

@ -61,7 +61,7 @@ public class AppSchedulingInfo {
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
final Map<Priority, Map<String, ResourceRequest>> requests =
new HashMap<Priority, Map<String, ResourceRequest>>();
final Set<String> blacklist = new HashSet<String>();
private Set<String> blacklist = new HashSet<String>();
//private final ApplicationStore store;
private final ActiveUsersManager activeUsersManager;
@ -399,4 +399,15 @@ synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
public synchronized void setQueue(Queue queue) {
this.queue = queue;
}
public synchronized Set<String> getBlackList() {
return this.blacklist;
}
public synchronized void transferStateFromPreviousAppSchedulingInfo(
AppSchedulingInfo appInfo) {
// this.priorities = appInfo.getPriorities();
// this.requests = appInfo.getRequests();
this.blacklist = appInfo.getBlackList();
}
}

View File

@ -26,6 +26,7 @@ public class SchedulerApplication {
private final Queue queue;
private final String user;
private SchedulerApplicationAttempt currentAttempt;
public SchedulerApplication(Queue queue, String user) {
this.queue = queue;
@ -39,4 +40,12 @@ public Queue getQueue() {
public String getUser() {
return user;
}
public SchedulerApplicationAttempt getCurrentAppAttempt() {
return currentAttempt;
}
public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) {
this.currentAttempt = currentAttempt;
}
}

View File

@ -64,7 +64,7 @@ public abstract class SchedulerApplicationAttempt {
protected final AppSchedulingInfo appSchedulingInfo;
protected final Map<ContainerId, RMContainer> liveContainers =
protected Map<ContainerId, RMContainer> liveContainers =
new HashMap<ContainerId, RMContainer>();
protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
new HashMap<Priority, Map<NodeId, RMContainer>>();
@ -73,7 +73,7 @@ public abstract class SchedulerApplicationAttempt {
protected final Resource currentReservation = Resource.newInstance(0, 0);
private Resource resourceLimit = Resource.newInstance(0, 0);
protected final Resource currentConsumption = Resource.newInstance(0, 0);
protected Resource currentConsumption = Resource.newInstance(0, 0);
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
@ -407,4 +407,29 @@ public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
Resources.add(currentConsumption, currentReservation));
}
public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
return this.liveContainers;
}
public synchronized Resource getResourceLimit() {
return this.resourceLimit;
}
public synchronized Map<Priority, Long> getLastScheduledContainer() {
return this.lastScheduledContainer;
}
public synchronized void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) {
this.liveContainers = appAttempt.getLiveContainersMap();
// this.reReservations = appAttempt.reReservations;
this.currentConsumption = appAttempt.getCurrentConsumption();
this.resourceLimit = appAttempt.getResourceLimit();
// this.currentReservation = appAttempt.currentReservation;
// this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
// this.schedulingOpportunities = appAttempt.schedulingOpportunities;
this.lastScheduledContainer = appAttempt.getLastScheduledContainer();
this.appSchedulingInfo
.transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
}
}

View File

@ -19,13 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
/**
@ -170,4 +171,13 @@ boolean checkAccess(UserGroupInformation callerUGI,
@LimitedPrivate("yarn")
@Stable
public List<ApplicationAttemptId> getAppsInQueue(String queueName);
/**
* Get the container for the given containerId.
* @param containerId
* @return the container for the given containerId.
*/
@LimitedPrivate("yarn")
@Unstable
public RMContainer getRMContainer(ContainerId containerId);
}

View File

@ -63,14 +63,15 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -194,10 +195,6 @@ public Configuration getConf() {
protected Map<ApplicationId, SchedulerApplication> applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
@VisibleForTesting
protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts =
new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
private boolean initialized = false;
private ResourceCalculator calculator;
@ -464,21 +461,27 @@ private synchronized void addApplication(ApplicationId applicationId,
}
private synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId) {
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt) {
SchedulerApplication application =
applications.get(applicationAttemptId.getApplicationId());
CSQueue queue = (CSQueue) application.getQueue();
FiCaSchedulerApp SchedulerApp =
FiCaSchedulerApp attempt =
new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
queue, queue.getActiveUsersManager(), rmContext);
appAttempts.put(applicationAttemptId, SchedulerApp);
queue.submitApplicationAttempt(SchedulerApp, application.getUser());
if (transferStateFromPreviousAttempt) {
attempt.transferStateFromPreviousAttempt(application
.getCurrentAppAttempt());
}
application.setCurrentAppAttempt(attempt);
queue.submitApplicationAttempt(attempt, application.getUser());
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user " + application.getUser() + " in queue "
+ queue.getQueueName());
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
rmContext.getDispatcher().getEventHandler() .handle(
new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
}
@ -486,7 +489,8 @@ private synchronized void doneApplication(ApplicationId applicationId,
RMAppState finalState) {
SchedulerApplication application = applications.get(applicationId);
if (application == null){
// The AppRemovedSchedulerEvent maybe sent on recovery for completed apps.
// The AppRemovedSchedulerEvent maybe sent on recovery for completed apps,
// ignore it.
return;
}
CSQueue queue = (CSQueue) application.getQueue();
@ -501,52 +505,56 @@ private synchronized void doneApplication(ApplicationId applicationId,
private synchronized void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState) {
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
LOG.info("Application Attempt " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
FiCaSchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
SchedulerApplication application =
applications.get(applicationAttemptId.getApplicationId());
if (application == null) {
// throw new IOException("Unknown application " + applicationId +
// " has completed!");
if (application == null || attempt == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
return;
}
// Release all the running containers
for (RMContainer rmContainer : application.getLiveContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
// Release all the allocated, acquired, running containers
for (RMContainer rmContainer : attempt.getLiveContainers()) {
if (keepContainers
&& rmContainer.getState().equals(RMContainerState.RUNNING)) {
// do not kill the running container in the case of work-preserving AM
// restart.
LOG.info("Skip killing " + rmContainer.getContainerId());
continue;
}
completedContainer(
rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
}
// Release all reserved containers
for (RMContainer rmContainer : application.getReservedContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
"Application Complete"),
RMContainerEventType.KILL);
// Release all reserved containers
for (RMContainer rmContainer : attempt.getReservedContainers()) {
completedContainer(
rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(), "Application Complete"),
RMContainerEventType.KILL);
}
// Clean up pending requests, metrics etc.
application.stop(rmAppAttemptFinalState);
attempt.stop(rmAppAttemptFinalState);
// Inform the queue
String queueName = application.getQueue().getQueueName();
String queueName = attempt.getQueue().getQueueName();
CSQueue queue = queues.get(queueName);
if (!(queue instanceof LeafQueue)) {
LOG.error("Cannot finish application " + "from non-leaf queue: "
+ queueName);
} else {
queue.finishApplicationAttempt(application, queue.getQueueName());
queue.finishApplicationAttempt(attempt, queue.getQueueName());
}
// Remove from our data-structure
appAttempts.remove(applicationAttemptId);
}
private static final Allocation EMPTY_ALLOCATION =
@ -558,7 +566,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
FiCaSchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
@ -701,7 +709,7 @@ private synchronized void nodeUpdate(RMNode nm) {
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
FiCaSchedulerApp reservedApplication =
getApplication(reservedContainer.getApplicationAttemptId());
getCurrentAttemptForContainer(reservedContainer.getContainerId());
// Try to fulfill the reservation
LOG.info("Trying to fulfill reservation for application " +
@ -738,12 +746,11 @@ private synchronized void nodeUpdate(RMNode nm) {
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
FiCaSchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + containerId +
" on node: " + node);
LOG.info("Unknown application "
+ containerId.getApplicationAttemptId().getApplicationId()
+ " launched container " + containerId + " on node: " + node);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
return;
@ -791,7 +798,8 @@ public void handle(SchedulerEvent event) {
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
}
break;
case APP_ATTEMPT_REMOVED:
@ -799,7 +807,8 @@ public void handle(SchedulerEvent event) {
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
(AppAttemptRemovedSchedulerEvent) event;
doneApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
appAttemptRemovedEvent.getFinalAttemptState());
appAttemptRemovedEvent.getFinalAttemptState(),
appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
}
break;
case CONTAINER_EXPIRED:
@ -874,13 +883,13 @@ private synchronized void completedContainer(RMContainer rmContainer,
Container container = rmContainer.getContainer();
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId =
container.getId().getApplicationAttemptId();
FiCaSchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application =
getCurrentAttemptForContainer(container.getId());
ApplicationId appId =
container.getId().getApplicationAttemptId().getApplicationId();
if (application == null) {
LOG.info("Container " + container + " of" +
" unknown application " + applicationAttemptId +
" completed with event " + event);
LOG.info("Container " + container + " of" + " unknown application "
+ appId + " completed with event " + event);
return;
}
@ -892,28 +901,33 @@ private synchronized void completedContainer(RMContainer rmContainer,
queue.completedContainer(clusterResource, application, node,
rmContainer, containerStatus, event, null);
LOG.info("Application " + applicationAttemptId +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
LOG.info("Application attempt " + application.getApplicationAttemptId()
+ " released container " + container.getId() + " on node: " + node
+ " with event: " + event);
}
@Lock(Lock.NoLock.class)
FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
return appAttempts.get(applicationAttemptId);
FiCaSchedulerApp getApplicationAttempt(
ApplicationAttemptId applicationAttemptId) {
SchedulerApplication app =
applications.get(applicationAttemptId.getApplicationId());
if (app != null) {
return (FiCaSchedulerApp) app.getCurrentAppAttempt();
}
return null;
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
FiCaSchedulerApp app = getApplication(applicationAttemptId);
FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId applicationAttemptId) {
FiCaSchedulerApp app = getApplication(applicationAttemptId);
FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : app.getResourceUsageReport();
}
@ -922,10 +936,22 @@ FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
private RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp application =
getApplication(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
@Override
public RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
return (attempt == null) ? null : attempt.getRMContainer(containerId);
}
@VisibleForTesting
public FiCaSchedulerApp getCurrentAttemptForContainer(
ContainerId containerId) {
SchedulerApplication app =
applications.get(containerId.getApplicationAttemptId()
.getApplicationId());
if (app != null) {
return (FiCaSchedulerApp) app.getCurrentAppAttempt();
}
return null;
}
@Override
@ -958,7 +984,7 @@ public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) {
LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
" container: " + cont.toString());
}
FiCaSchedulerApp app = appAttempts.get(aid);
FiCaSchedulerApp app = getApplicationAttempt(aid);
if (app != null) {
app.addPreemptContainer(cont.getContainerId());
}

View File

@ -219,7 +219,8 @@ public synchronized void reserveResource(
" on node " + this.reservedContainer.getReservedNode());
}
// Cannot reserve more than one application on a given node!
// Cannot reserve more than one application attempt on a given node!
// Reservation is still against attempt.
if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
reservedContainer.getContainer().getId().getApplicationAttemptId())) {
throw new IllegalStateException("Trying to reserve" +

View File

@ -23,14 +23,21 @@
public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
private final ApplicationAttemptId applicationAttemptId;
private final boolean transferStateFromPreviousAttempt;
public AppAttemptAddedSchedulerEvent(
ApplicationAttemptId applicationAttemptId) {
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt) {
super(SchedulerEventType.APP_ATTEMPT_ADDED);
this.applicationAttemptId = applicationAttemptId;
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
}
public ApplicationAttemptId getApplicationAttemptId() {
return applicationAttemptId;
}
public boolean getTransferStateFromPreviousAttempt() {
return transferStateFromPreviousAttempt;
}
}

View File

@ -25,13 +25,15 @@ public class AppAttemptRemovedSchedulerEvent extends SchedulerEvent {
private final ApplicationAttemptId applicationAttemptId;
private final RMAppAttemptState finalAttemptState;
private final boolean keepContainersAcrossAppAttempts;
public AppAttemptRemovedSchedulerEvent(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState finalAttemptState) {
RMAppAttemptState finalAttemptState, boolean keepContainers) {
super(SchedulerEventType.APP_ATTEMPT_REMOVED);
this.applicationAttemptId = applicationAttemptId;
this.finalAttemptState = finalAttemptState;
this.keepContainersAcrossAppAttempts = keepContainers;
}
public ApplicationAttemptId getApplicationAttemptID() {
@ -41,4 +43,8 @@ public ApplicationAttemptId getApplicationAttemptID() {
public RMAppAttemptState getFinalAttemptState() {
return this.finalAttemptState;
}
public boolean getKeepContainersAcrossAppAttempts() {
return this.keepContainersAcrossAppAttempts;
}
}

View File

@ -162,12 +162,6 @@ public class FairScheduler implements ResourceScheduler {
protected Map<ApplicationId, SchedulerApplication> applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
// This stores per-application-attempt scheduling information, indexed by
// attempt ID's for fast lookup.
@VisibleForTesting
protected Map<ApplicationAttemptId, FSSchedulerApp> appAttempts =
new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
// Nodes in the cluster, indexed by NodeId
private Map<NodeId, FSSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FSSchedulerNode>();
@ -262,10 +256,21 @@ public QueueManager getQueueManager() {
return queueMgr;
}
private RMContainer getRMContainer(ContainerId containerId) {
FSSchedulerApp application =
appAttempts.get(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
@Override
public RMContainer getRMContainer(ContainerId containerId) {
FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
return (attempt == null) ? null : attempt.getRMContainer(containerId);
}
private FSSchedulerApp getCurrentAttemptForContainer(
ContainerId containerId) {
SchedulerApplication app =
applications.get(containerId.getApplicationAttemptId()
.getApplicationId());
if (app != null) {
return (FSSchedulerApp) app.getCurrentAppAttempt();
}
return null;
}
/**
@ -640,7 +645,8 @@ protected synchronized void addApplication(ApplicationId applicationId,
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName);
+ ", in queue: " + queueName + ", currently num of applications: "
+ applications.size());
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
@ -649,31 +655,35 @@ protected synchronized void addApplication(ApplicationId applicationId,
* Add a new application attempt to the scheduler.
*/
protected synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId) {
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt) {
SchedulerApplication application =
applications.get(applicationAttemptId.getApplicationId());
String user = application.getUser();
FSLeafQueue queue = (FSLeafQueue) application.getQueue();
FSSchedulerApp schedulerApp =
FSSchedulerApp attempt =
new FSSchedulerApp(applicationAttemptId, user,
queue, new ActiveUsersManager(getRootQueueMetrics()),
rmContext);
if (transferStateFromPreviousAttempt) {
attempt.transferStateFromPreviousAttempt(application
.getCurrentAppAttempt());
}
application.setCurrentAppAttempt(attempt);
boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
queue.addApp(schedulerApp, runnable);
queue.addApp(attempt, runnable);
if (runnable) {
maxRunningEnforcer.trackRunnableApp(schedulerApp);
maxRunningEnforcer.trackRunnableApp(attempt);
} else {
maxRunningEnforcer.trackNonRunnableApp(schedulerApp);
maxRunningEnforcer.trackNonRunnableApp(attempt);
}
queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
appAttempts.put(applicationAttemptId, schedulerApp);
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user: " + user + ", currently active: "
+ appAttempts.size());
+ " to scheduler from user: " + user);
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
@ -709,19 +719,27 @@ private synchronized void removeApplication(ApplicationId applicationId,
private synchronized void removeApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState) {
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
SchedulerApplication application =
applications.get(applicationAttemptId.getApplicationId());
FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
FSSchedulerApp application = appAttempts.get(applicationAttemptId);
if (application == null) {
if (attempt == null || application == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
return;
}
// Release all the running containers
for (RMContainer rmContainer : application.getLiveContainers()) {
for (RMContainer rmContainer : attempt.getLiveContainers()) {
if (keepContainers
&& rmContainer.getState().equals(RMContainerState.RUNNING)) {
// do not kill the running container in the case of work-preserving AM
// restart.
LOG.info("Skip killing " + rmContainer.getContainerId());
continue;
}
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
@ -730,30 +748,26 @@ private synchronized void removeApplicationAttempt(
}
// Release all reserved containers
for (RMContainer rmContainer : application.getReservedContainers()) {
for (RMContainer rmContainer : attempt.getReservedContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
"Application Complete"),
RMContainerEventType.KILL);
RMContainerEventType.KILL);
}
// Clean up pending requests, metrics etc.
application.stop(rmAppAttemptFinalState);
attempt.stop(rmAppAttemptFinalState);
// Inform the queue
FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
.getQueueName(), false);
boolean wasRunnable = queue.removeApp(application);
boolean wasRunnable = queue.removeApp(attempt);
if (wasRunnable) {
maxRunningEnforcer.updateRunnabilityOnAppRemoval(application);
maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
} else {
maxRunningEnforcer.untrackNonRunnableApp(application);
maxRunningEnforcer.untrackNonRunnableApp(attempt);
}
// Remove from our data-structure
appAttempts.remove(applicationAttemptId);
}
/**
@ -769,11 +783,13 @@ private synchronized void completedContainer(RMContainer rmContainer,
Container container = rmContainer.getContainer();
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
FSSchedulerApp application = appAttempts.get(applicationAttemptId);
FSSchedulerApp application =
getCurrentAttemptForContainer(container.getId());
ApplicationId appId =
container.getId().getApplicationAttemptId().getApplicationId();
if (application == null) {
LOG.info("Container " + container + " of" +
" unknown application " + applicationAttemptId +
" unknown application attempt " + appId +
" completed with event " + event);
return;
}
@ -790,10 +806,9 @@ private synchronized void completedContainer(RMContainer rmContainer,
updateRootQueueMetrics();
}
LOG.info("Application " + applicationAttemptId +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
LOG.info("Application attempt " + application.getApplicationAttemptId()
+ " released container " + container.getId() + " on node: " + node
+ " with event: " + event);
}
private synchronized void addNode(RMNode node) {
@ -844,7 +859,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
// Make sure this application exists
FSSchedulerApp application = appAttempts.get(appAttemptId);
FSSchedulerApp application = getSchedulerApp(appAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + appAttemptId);
@ -914,12 +929,11 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
*/
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
FSSchedulerApp application = appAttempts.get(applicationAttemptId);
FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + containerId +
" on node: " + node);
LOG.info("Unknown application "
+ containerId.getApplicationAttemptId().getApplicationId()
+ " launched container " + containerId + " on node: " + node);
return;
}
@ -1058,28 +1072,34 @@ public SchedulerNodeReport getNodeReport(NodeId nodeId) {
}
public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
return appAttempts.get(appAttemptId);
SchedulerApplication app =
applications.get(appAttemptId.getApplicationId());
if (app != null) {
return (FSSchedulerApp) app.getCurrentAppAttempt();
}
return null;
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) {
if (!appAttempts.containsKey(appAttemptId)) {
FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
if (attempt == null) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
return new SchedulerAppReport(appAttempts.get(appAttemptId));
return new SchedulerAppReport(attempt);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
FSSchedulerApp app = appAttempts.get(appAttemptId);
if (app == null) {
FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
if (attempt == null) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
return app.getResourceUsageReport();
return attempt.getResourceUsageReport();
}
/**
@ -1145,7 +1165,8 @@ public void handle(SchedulerEvent event) {
}
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
break;
case APP_ATTEMPT_REMOVED:
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
@ -1153,8 +1174,10 @@ public void handle(SchedulerEvent event) {
}
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
(AppAttemptRemovedSchedulerEvent) event;
removeApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
appAttemptRemovedEvent.getFinalAttemptState());
removeApplicationAttempt(
appAttemptRemovedEvent.getApplicationAttemptID(),
appAttemptRemovedEvent.getFinalAttemptState(),
appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
break;
case CONTAINER_EXPIRED:
if (!(event instanceof ContainerExpiredSchedulerEvent)) {

View File

@ -67,6 +67,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@ -123,15 +124,11 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
private Resource maximumAllocation;
private boolean usePortForNodeName;
// Use ConcurrentSkipListMap because applications need to be ordered
@VisibleForTesting
protected Map<ApplicationId, SchedulerApplication> applications =
new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
// Use ConcurrentSkipListMap because applications need to be ordered
@VisibleForTesting
protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts
= new ConcurrentSkipListMap<ApplicationAttemptId, FiCaSchedulerApp>();
private ActiveUsersManager activeUsersManager;
private static final String DEFAULT_QUEUE_NAME = "default";
@ -270,7 +267,7 @@ public Resource getMaximumResourceCapability() {
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
FiCaSchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
@ -336,22 +333,26 @@ public Allocation allocate(
}
@VisibleForTesting
FiCaSchedulerApp getApplication(
ApplicationAttemptId applicationAttemptId) {
return appAttempts.get(applicationAttemptId);
FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
SchedulerApplication app =
applications.get(applicationAttemptId.getApplicationId());
if (app != null) {
return (FiCaSchedulerApp) app.getCurrentAppAttempt();
}
return null;
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
FiCaSchedulerApp app = getApplication(applicationAttemptId);
FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId applicationAttemptId) {
FiCaSchedulerApp app = getApplication(applicationAttemptId);
FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
return app == null ? null : app.getResourceUsageReport();
}
@ -364,13 +365,15 @@ private synchronized void addApplication(ApplicationId applicationId,
SchedulerApplication application =
new SchedulerApplication(null, user);
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", currently num of applications: " + applications.size());
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
private synchronized void addApplicationAttempt(
ApplicationAttemptId appAttemptId) {
private synchronized void
addApplicationAttempt(ApplicationAttemptId appAttemptId,
boolean transferStateFromPreviousAttempt) {
SchedulerApplication application =
applications.get(appAttemptId.getApplicationId());
String user = application.getUser();
@ -378,11 +381,16 @@ private synchronized void addApplicationAttempt(
FiCaSchedulerApp schedulerApp =
new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
activeUsersManager, this.rmContext);
appAttempts.put(appAttemptId, schedulerApp);
if (transferStateFromPreviousAttempt) {
schedulerApp.transferStateFromPreviousAttempt(application
.getCurrentAppAttempt());
}
application.setCurrentAppAttempt(schedulerApp);
metrics.submitApp(user, appAttemptId.getAttemptId());
LOG.info("Added Application Attempt " + appAttemptId
+ " to scheduler from user " + application.getUser()
+ ", currently active: " + appAttempts.size());
+ " to scheduler from user " + application.getUser());
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
@ -400,28 +408,33 @@ private synchronized void doneApplication(ApplicationId applicationId,
private synchronized void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState)
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
throws IOException {
FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
SchedulerApplication application =
applications.get(applicationAttemptId.getApplicationId());
if (application == null || attempt == null) {
throw new IOException("Unknown application " + applicationAttemptId +
" has completed!");
}
// Kill all 'live' containers
for (RMContainer container : application.getLiveContainers()) {
for (RMContainer container : attempt.getLiveContainers()) {
if (keepContainers
&& container.getState().equals(RMContainerState.RUNNING)) {
// do not kill the running container in the case of work-preserving AM
// restart.
LOG.info("Skip killing " + container.getContainerId());
continue;
}
containerCompleted(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
}
// Clean up pending requests, metrics etc.
application.stop(rmAppAttemptFinalState);
// Remove the application
appAttempts.remove(applicationAttemptId);
attempt.stop(rmAppAttemptFinalState);
}
/**
@ -432,12 +445,13 @@ private synchronized void doneApplicationAttempt(
private void assignContainers(FiCaSchedulerNode node) {
LOG.debug("assignContainers:" +
" node=" + node.getRMNode().getNodeAddress() +
" #applications=" + appAttempts.size());
" #applications=" + applications.size());
// Try to assign containers to applications in fifo order
for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : appAttempts
for (Map.Entry<ApplicationId, SchedulerApplication> e : applications
.entrySet()) {
FiCaSchedulerApp application = e.getValue();
FiCaSchedulerApp application =
(FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
LOG.debug("pre-assignContainers");
application.showRequests();
synchronized (application) {
@ -474,8 +488,10 @@ private void assignContainers(FiCaSchedulerNode node) {
// Update the applications' headroom to correctly take into
// account the containers assigned in this update.
for (FiCaSchedulerApp application : appAttempts.values()) {
application.setHeadroom(Resources.subtract(clusterResource, usedResource));
for (SchedulerApplication application : applications.values()) {
FiCaSchedulerApp attempt =
(FiCaSchedulerApp) application.getCurrentAppAttempt();
attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
}
}
@ -744,7 +760,8 @@ public void handle(SchedulerEvent event) {
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
}
break;
case APP_ATTEMPT_REMOVED:
@ -754,7 +771,8 @@ public void handle(SchedulerEvent event) {
try {
doneApplicationAttempt(
appAttemptRemovedEvent.getApplicationAttemptID(),
appAttemptRemovedEvent.getFinalAttemptState());
appAttemptRemovedEvent.getFinalAttemptState(),
appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
} catch(IOException ie) {
LOG.error("Unable to remove application "
+ appAttemptRemovedEvent.getApplicationAttemptID(), ie);
@ -780,12 +798,11 @@ public void handle(SchedulerEvent event) {
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
FiCaSchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + containerId +
" on node: " + node);
LOG.info("Unknown application "
+ containerId.getApplicationAttemptId().getApplicationId()
+ " launched container " + containerId + " on node: " + node);
// Some unknown container sneaked into the system. Kill it.
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
@ -806,14 +823,16 @@ private synchronized void containerCompleted(RMContainer rmContainer,
// Get the application for the finished container
Container container = rmContainer.getContainer();
ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
FiCaSchedulerApp application = getApplication(applicationAttemptId);
FiCaSchedulerApp application =
getCurrentAttemptForContainer(container.getId());
ApplicationId appId =
container.getId().getApplicationAttemptId().getApplicationId();
// Get the node on which the container was allocated
FiCaSchedulerNode node = getNode(container.getNodeId());
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
LOG.info("Unknown application: " + appId +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
@ -829,7 +848,7 @@ private synchronized void containerCompleted(RMContainer rmContainer,
// Update total usage
Resources.subtractFrom(usedResource, container.getResource());
LOG.info("Application " + applicationAttemptId +
LOG.info("Application attempt " + application.getApplicationAttemptId() +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
@ -888,10 +907,21 @@ public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
return node == null ? null : new SchedulerNodeReport(node);
}
private RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp application =
getApplication(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
@Override
public RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
return (attempt == null) ? null : attempt.getRMContainer(containerId);
}
private FiCaSchedulerApp getCurrentAttemptForContainer(
ContainerId containerId) {
SchedulerApplication app =
applications.get(containerId.getApplicationAttemptId()
.getApplicationId());
if (app != null) {
return (FiCaSchedulerApp) app.getCurrentAppAttempt();
}
return null;
}
@Override
@ -908,12 +938,12 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI,
@Override
public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(
appAttempts.size());
for (FiCaSchedulerApp app : appAttempts.values()) {
apps.add(app.getApplicationAttemptId());
List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>(
applications.size());
for (SchedulerApplication app : applications.values()) {
attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
}
return apps;
return attempts;
} else {
return null;
}

View File

@ -171,7 +171,7 @@ public synchronized void submit() throws IOException, YarnException {
new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
scheduler.handle(addAppEvent);
AppAttemptAddedSchedulerEvent addAttemptEvent =
new AppAttemptAddedSchedulerEvent(this.applicationAttemptId);
new AppAttemptAddedSchedulerEvent(this.applicationAttemptId, false);
scheduler.handle(addAttemptEvent);
}

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.mortbay.log.Log;
public class MockNM {
@ -130,12 +131,13 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
int containerId, ContainerState containerState) throws Exception {
HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
new HashMap<ApplicationId, List<ContainerStatus>>(1);
ContainerStatus amContainerStatus = BuilderUtils.newContainerStatus(
BuilderUtils.newContainerId(attemptId, 1),
ContainerState.COMPLETE, "Success", 0);
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
BuilderUtils.newContainerId(attemptId, containerId), containerState,
"Success", 0);
ArrayList<ContainerStatus> containerStatusList =
new ArrayList<ContainerStatus>(1);
containerStatusList.add(amContainerStatus);
containerStatusList.add(containerStatus);
Log.info("ContainerStatus: " + containerStatus);
nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
return nodeHeartbeat(nodeUpdate, true);
}
@ -152,6 +154,7 @@ public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
status.setResponseId(resId);
status.setNodeId(nodeId);
for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
Log.info("entry.getValue() " + entry.getValue());
status.setContainersStatuses(entry.getValue());
}
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@ -40,7 +41,10 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
@ -56,6 +60,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@ -122,6 +128,33 @@ public void waitForState(ApplicationAttemptId attemptId,
attempt.getAppAttemptState());
}
public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
throws Exception {
int timeoutSecs = 0;
while (getResourceScheduler().getRMContainer(containerId) == null
&& timeoutSecs++ < 40) {
System.out.println("Waiting for" + containerId + " to be allocated.");
nm.nodeHeartbeat(true);
Thread.sleep(200);
}
}
public void waitForState(MockNM nm, ContainerId containerId,
RMContainerState containerState) throws Exception {
RMContainer container = getResourceScheduler().getRMContainer(containerId);
Assert.assertNotNull("Container shouldn't be null", container);
int timeoutSecs = 0;
while (!containerState.equals(container.getState()) && timeoutSecs++ < 40) {
System.out.println("Container : " + containerId + " State is : "
+ container.getState() + " Waiting for state : " + containerState);
nm.nodeHeartbeat(true);
Thread.sleep(300);
}
System.out.println("Container State is : " + container.getState());
Assert.assertEquals("Container state is not correct (timedout)",
containerState, container.getState());
}
// get new application id
public GetNewApplicationResponse getNewAppId() throws Exception {
ApplicationClientProtocol client = getClientRMService();
@ -172,7 +205,17 @@ public RMApp submitApp(int masterMemory, String name, String user,
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted) throws Exception {
boolean waitForAccepted)
throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, false);
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers)
throws Exception {
ApplicationClientProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
@ -182,6 +225,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
.newRecord(SubmitApplicationRequest.class);
ApplicationSubmissionContext sub = Records
.newRecord(ApplicationSubmissionContext.class);
sub.setKeepContainersAcrossApplicationAttempts(keepContainers);
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setMaxAppAttempts(maxAppAttempts);
@ -421,4 +465,26 @@ protected void startWepApp() {
// override to disable webapp
}
public static void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
MockAM am) throws Exception {
FinishApplicationMasterRequest req =
FinishApplicationMasterRequest.newInstance(
FinalApplicationStatus.SUCCEEDED, "", "");
am.unregisterAppAttempt(req);
am.waitForState(RMAppAttemptState.FINISHING);
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
}
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
RMAppAttempt attempt = app.getCurrentAppAttempt();
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
return am;
}
}

View File

@ -649,7 +649,7 @@ private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
.currentTimeMillis(), "YARN"));
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId3, 1);
RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
rmContext, yarnScheduler, null, asContext, config);
rmContext, yarnScheduler, null, asContext, config, false);
when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
return app;
}

View File

@ -302,7 +302,7 @@ public void testBlackListNodes() throws Exception {
new AppAddedSchedulerEvent(appId1, "queue", "user");
fs.handle(appEvent);
SchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId1);
new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
fs.handle(attemptEvent);
List<ContainerId> emptyId = new ArrayList<ContainerId>();
@ -396,7 +396,7 @@ public void testHeadroom() throws Exception {
new AppAddedSchedulerEvent(appId1, "queue", "user");
fs.handle(appEvent);
SchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId1);
new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
fs.handle(attemptEvent);
ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
@ -406,7 +406,7 @@ public void testHeadroom() throws Exception {
new AppAddedSchedulerEvent(appId2, "queue", "user");
fs.handle(appEvent2);
SchedulerEvent attemptEvent2 =
new AppAttemptAddedSchedulerEvent(appAttemptId2);
new AppAttemptAddedSchedulerEvent(appAttemptId2, false);
fs.handle(attemptEvent2);
List<ContainerId> emptyId = new ArrayList<ContainerId>();

View File

@ -28,7 +28,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
@ -38,7 +37,6 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
@ -295,6 +293,8 @@ public void testNMToken() throws Exception {
nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
ContainerState.COMPLETE);
}
nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1,
ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);
Assert.assertFalse(nmTokenSecretManager
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
@ -389,19 +389,19 @@ public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception {
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
MockAM am1 = launchAM(app1, rm1, nm1);
finishApplicationMaster(app1, rm1, nm1, am1);
MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
MockRM.finishApplicationMaster(app1, rm1, nm1, am1);
// a failed app
RMApp app2 = rm1.submitApp(200);
MockAM am2 = launchAM(app2, rm1, nm1);
MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
// a killed app
RMApp app3 = rm1.submitApp(200);
MockAM am3 = launchAM(app3, rm1, nm1);
MockAM am3 = MockRM.launchAM(app3, rm1, nm1);
rm1.killApp(app3.getApplicationId());
rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED);
@ -441,7 +441,7 @@ public void testInvalidatedAMHostPortOnAMRestart() throws Exception {
// a failed app
RMApp app2 = rm1.submitApp(200);
MockAM am2 = launchAM(app2, rm1, nm1);
MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
nm1
.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
@ -458,28 +458,6 @@ public void testInvalidatedAMHostPortOnAMRestart() throws Exception {
Assert.assertEquals(-1, report1.getRpcPort());
}
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
RMAppAttempt attempt = app.getCurrentAppAttempt();
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
return am;
}
private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
MockAM am) throws Exception {
FinishApplicationMasterRequest req =
FinishApplicationMasterRequest.newInstance(
FinalApplicationStatus.SUCCEEDED, "", "");
am.unregisterAppAttempt(req);
am.waitForState(RMAppAttemptState.FINISHING);
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
}
public static void main(String[] args) throws Exception {
TestRM t = new TestRM();
t.testGetNewAppId();

View File

@ -164,7 +164,7 @@ public void testResourceAllocation() throws IOException,
// Notify scheduler application is finished.
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(
application.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
application.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
resourceManager.getResourceScheduler().handle(appRemovedEvent1);
checkResourceUsage(nm1, nm2);

View File

@ -18,49 +18,30 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.Test;
/**
@ -68,238 +49,164 @@
*
*/
public class TestAMRestart {
// private static final Log LOG = LogFactory.getLog(TestAMRestart.class);
// ApplicationsManagerImpl appImpl;
// RMContext asmContext = new RMContextImpl(new MemStore());
// ApplicationTokenSecretManager appTokenSecretManager =
// new ApplicationTokenSecretManager();
// DummyResourceScheduler scheduler;
// private ClientRMService clientRMService;
// int count = 0;
// ApplicationId appID;
// final int maxFailures = 3;
// AtomicInteger launchNotify = new AtomicInteger();
// AtomicInteger schedulerNotify = new AtomicInteger();
// volatile boolean stop = false;
// int schedulerAddApplication = 0;
// int schedulerRemoveApplication = 0;
// int launcherLaunchCalled = 0;
// int launcherCleanupCalled = 0;
// private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
//
// private class ExtApplicationsManagerImpl extends ApplicationsManagerImpl {
// public ExtApplicationsManagerImpl(
// ApplicationTokenSecretManager applicationTokenSecretManager,
// YarnScheduler scheduler, RMContext asmContext) {
// super(applicationTokenSecretManager, scheduler, asmContext);
// }
//
// @Override
// public EventHandler<ASMEvent<AMLauncherEventType>> createNewApplicationMasterLauncher(
// ApplicationTokenSecretManager tokenSecretManager) {
// return new DummyAMLauncher();
// }
// }
//
// private class DummyAMLauncher implements EventHandler<ASMEvent<AMLauncherEventType>> {
//
// public DummyAMLauncher() {
// asmContext.getDispatcher().register(AMLauncherEventType.class, this);
// new Thread() {
// public void run() {
// while (!stop) {
// LOG.info("DEBUG -- waiting for launch");
// synchronized(launchNotify) {
// while (launchNotify.get() == 0) {
// try {
// launchNotify.wait();
// } catch (InterruptedException e) {
// }
// }
// asmContext.getDispatcher().getEventHandler().handle(
// new ApplicationEvent(
// ApplicationEventType.LAUNCHED, appID));
// launchNotify.addAndGet(-1);
// }
// }
// }
// }.start();
// }
//
// @Override
// public void handle(ASMEvent<AMLauncherEventType> event) {
// switch (event.getType()) {
// case CLEANUP:
// launcherCleanupCalled++;
// break;
// case LAUNCH:
// LOG.info("DEBUG -- launching");
// launcherLaunchCalled++;
// synchronized (launchNotify) {
// launchNotify.addAndGet(1);
// launchNotify.notify();
// }
// break;
// default:
// break;
// }
// }
// }
//
// private class DummyResourceScheduler implements ResourceScheduler {
//
// @Override
// public void removeNode(RMNode node) {
// }
//
// @Override
// public Allocation allocate(ApplicationId applicationId,
// List<ResourceRequest> ask, List<Container> release) throws IOException {
// Container container = recordFactory.newRecordInstance(Container.class);
// container.setContainerToken(recordFactory.newRecordInstance(ContainerToken.class));
// container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
// container.setContainerManagerAddress("localhost");
// container.setNodeHttpAddress("localhost:8042");
// container.setId(recordFactory.newRecordInstance(ContainerId.class));
// container.getId().setAppId(appID);
// container.getId().setId(count);
// count++;
// return new Allocation(Arrays.asList(container), Resources.none());
// }
//
// @Override
// public void handle(ASMEvent<ApplicationTrackerEventType> event) {
// switch (event.getType()) {
// case ADD:
// schedulerAddApplication++;
// break;
// case EXPIRE:
// schedulerRemoveApplication++;
// LOG.info("REMOVING app : " + schedulerRemoveApplication);
// if (schedulerRemoveApplication == maxFailures) {
// synchronized (schedulerNotify) {
// schedulerNotify.addAndGet(1);
// schedulerNotify.notify();
// }
// }
// break;
// default:
// break;
// }
// }
//
// @Override
// public QueueInfo getQueueInfo(String queueName,
// boolean includeChildQueues,
// boolean recursive) throws IOException {
// return null;
// }
// @Override
// public List<QueueUserACLInfo> getQueueUserAclInfo() {
// return null;
// }
// @Override
// public void addApplication(ApplicationId applicationId,
// ApplicationMaster master, String user, String queue, Priority priority,
// ApplicationStore store)
// throws IOException {
// }
// @Override
// public void addNode(RMNode nodeInfo) {
// }
// @Override
// public void recover(RMState state) throws Exception {
// }
// @Override
// public void reinitialize(Configuration conf,
// ContainerTokenSecretManager secretManager, RMContext rmContext)
// throws IOException {
// }
//
// @Override
// public void nodeUpdate(RMNode nodeInfo,
// Map<String, List<Container>> containers) {
// }
//
// @Override
// public Resource getMaximumResourceCapability() {
// // TODO Auto-generated method stub
// return null;
// }
//
// @Override
// public Resource getMinimumResourceCapability() {
// // TODO Auto-generated method stub
// return null;
// }
// }
//
// @Before
// public void setUp() {
//
// asmContext.getDispatcher().register(ApplicationEventType.class,
// new ResourceManager.ApplicationEventDispatcher(asmContext));
//
// appID = recordFactory.newRecordInstance(ApplicationId.class);
// appID.setClusterTimestamp(System.currentTimeMillis());
// appID.setId(1);
// Configuration conf = new Configuration();
// scheduler = new DummyResourceScheduler();
// asmContext.getDispatcher().init(conf);
// asmContext.getDispatcher().start();
// asmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
// appImpl = new ExtApplicationsManagerImpl(appTokenSecretManager, scheduler, asmContext);
//
// conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 1000L);
// conf.setInt(RMConfig.AM_MAX_RETRIES, maxFailures);
// appImpl.init(conf);
// appImpl.start();
//
// this.clientRMService = new ClientRMService(asmContext, appImpl
// .getAmLivelinessMonitor(), appImpl.getClientToAMSecretManager(),
// scheduler);
// this.clientRMService.init(conf);
// }
//
// @After
// public void tearDown() {
// }
//
// private void waitForFailed(AppAttempt application, ApplicationState
// finalState) throws Exception {
// int count = 0;
// while(application.getState() != finalState && count < 10) {
// Thread.sleep(500);
// count++;
// }
// Assert.assertEquals(finalState, application.getState());
// }
//
// @Test
// public void testAMRestart() throws Exception {
// ApplicationSubmissionContext subContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
// subContext.setApplicationId(appID);
// subContext.setApplicationName("dummyApp");
//// subContext.command = new ArrayList<String>();
//// subContext.environment = new HashMap<String, String>();
//// subContext.fsTokens = new ArrayList<String>();
// subContext.setFsTokensTodo(ByteBuffer.wrap(new byte[0]));
// SubmitApplicationRequest request = recordFactory
// .newRecordInstance(SubmitApplicationRequest.class);
// request.setApplicationSubmissionContext(subContext);
// clientRMService.submitApplication(request);
// AppAttempt application = asmContext.getApplications().get(appID);
// synchronized (schedulerNotify) {
// while(schedulerNotify.get() == 0) {
// schedulerNotify.wait();
// }
// }
// Assert.assertEquals(maxFailures, launcherCleanupCalled);
// Assert.assertEquals(maxFailures, launcherLaunchCalled);
// Assert.assertEquals(maxFailures, schedulerAddApplication);
// Assert.assertEquals(maxFailures, schedulerRemoveApplication);
// Assert.assertEquals(maxFailures, application.getFailedCount());
// waitForFailed(application, ApplicationState.FAILED);
// stop = true;
// }
@Test
public void testAMRestartWithExistingContainers() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MockRM rm1 = new MockRM(conf);
rm1.start();
RMApp app1 =
rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
null, "MAPREDUCE", false, true);
MockNM nm1 =
new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
nm1.registerNode();
MockNM nm2 =
new MockNM("127.0.0.1:2351", 4089, rm1.getResourceTrackerService());
nm2.registerNode();
MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
int NUM_CONTAINERS = 3;
// allocate NUM_CONTAINERS containers
am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
// wait for containers to be allocated.
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.size() != NUM_CONTAINERS) {
nm1.nodeHeartbeat(true);
containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(200);
}
// launch the 2nd container, for testing running container transferred.
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
ContainerId containerId2 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
// launch the 3rd container, for testing container allocated by previous
// attempt is completed by the next new attempt/
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING);
ContainerId containerId3 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 3);
rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
// 4th container still in AQUIRED state. for testing Acquired container is
// always killed.
ContainerId containerId4 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 4);
rm1.waitForState(nm1, containerId4, RMContainerState.ACQUIRED);
// 5th container is in Allocated state. for testing allocated container is
// always killed.
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
ContainerId containerId5 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 5);
rm1.waitForContainerAllocated(nm1, containerId5);
rm1.waitForState(nm1, containerId5, RMContainerState.ALLOCATED);
// 6th container is in Reserved state.
am1.allocate("127.0.0.1", 6000, 1, new ArrayList<ContainerId>());
ContainerId containerId6 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 6);
nm1.nodeHeartbeat(true);
SchedulerApplicationAttempt schedulerAttempt =
((CapacityScheduler) rm1.getResourceScheduler())
.getCurrentAttemptForContainer(containerId6);
while (schedulerAttempt.getReservedContainers().size() == 0) {
System.out.println("Waiting for container " + containerId6
+ " to be reserved.");
nm1.nodeHeartbeat(true);
Thread.sleep(200);
}
// assert containerId6 is reserved.
Assert.assertEquals(containerId6, schedulerAttempt.getReservedContainers()
.get(0).getContainerId());
// fail the AM by sending CONTAINER_FINISHED event without registering.
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am1.waitForState(RMAppAttemptState.FAILED);
// wait for some time. previous AM's running containers should still remain
// in scheduler even though am failed
Thread.sleep(3000);
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
// acquired/allocated containers are cleaned up.
Assert.assertNull(rm1.getResourceScheduler().getRMContainer(containerId4));
Assert.assertNull(rm1.getResourceScheduler().getRMContainer(containerId5));
// wait for app to start a new attempt.
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// assert this is a new AM.
ApplicationAttemptId newAttemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
MockAM am2 = MockRM.launchAM(app1, rm1, nm1);
// complete container by sending the container complete event which has earlier
// attempt's attemptId
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.COMPLETE);
rm1.waitForState(nm1, containerId3, RMContainerState.COMPLETED);
// Even though the completed container containerId3 event was sent to the
// earlier failed attempt, new RMAppAttempt can also capture this container
// info.
// completed containerId4 is also transferred to the new attempt.
RMAppAttempt newAttempt =
app1.getRMAppAttempt(am2.getApplicationAttemptId());
// 4 containers finished, acquired/allocated/reserved/completed.
Assert.assertEquals(4, newAttempt.getJustFinishedContainers().size());
boolean container3Exists = false, container4Exists = false, container5Exists =
false, container6Exists = false;
for(ContainerStatus status : newAttempt.getJustFinishedContainers()) {
if(status.getContainerId().equals(containerId3)) {
// containerId3 is the container ran by previous attempt but finished by the
// new attempt.
container3Exists = true;
}
if (status.getContainerId().equals(containerId4)) {
// containerId4 is the Acquired Container killed by the previous attempt,
// it's now inside new attempt's finished container list.
container4Exists = true;
}
if (status.getContainerId().equals(containerId5)) {
// containerId5 is the Allocated container killed by previous failed attempt.
container5Exists = true;
}
if (status.getContainerId().equals(containerId6)) {
// containerId6 is the reserved container killed by previous failed attempt.
container6Exists = true;
}
}
Assert.assertTrue(container3Exists && container4Exists && container5Exists
&& container6Exists);
// New SchedulerApplicationAttempt also has the containers info.
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
// record the scheduler attempt for testing.
SchedulerApplicationAttempt schedulerNewAttempt =
((CapacityScheduler) rm1.getResourceScheduler())
.getCurrentAttemptForContainer(containerId2);
// finish this application
MockRM.finishApplicationMaster(app1, rm1, nm1, am2);
// the 2nd attempt released the 1st attempt's running container, when the
// 2nd attempt finishes.
Assert.assertFalse(schedulerNewAttempt.getLiveContainers().contains(
containerId2));
// all 4 normal containers finished.
Assert.assertEquals(5, newAttempt.getJustFinishedContainers().size());
rm1.stop();
}
}

View File

@ -460,7 +460,7 @@ public void testUnmanagedApp() throws IOException {
LOG.info("--- START: testUnmanagedAppFailPath ---");
application = testCreateAppRunning(subContext);
RMAppEvent event = new RMAppFailedAttemptEvent(
application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "");
application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "", false);
application.handle(event);
rmDispatcher.await();
RMAppAttempt appAttempt = application.getCurrentAppAttempt();
@ -582,7 +582,7 @@ public void testAppAcceptedFailed() throws IOException {
for (int i=1; i < maxAppAttempts; i++) {
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
RMAppEventType.ATTEMPT_FAILED, "", false);
application.handle(event);
assertAppState(RMAppState.ACCEPTED, application);
event =
@ -598,7 +598,7 @@ public void testAppAcceptedFailed() throws IOException {
String message = "Test fail";
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, message);
RMAppEventType.ATTEMPT_FAILED, message, false);
application.handle(event);
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
@ -655,7 +655,7 @@ public void testAppRunningFailed() throws IOException {
for (int i=1; i<maxAppAttempts; i++) {
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
RMAppEventType.ATTEMPT_FAILED, "", false);
application.handle(event);
rmDispatcher.await();
assertAppState(RMAppState.ACCEPTED, application);
@ -680,7 +680,7 @@ public void testAppRunningFailed() throws IOException {
// after max application attempts
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
RMAppEventType.ATTEMPT_FAILED, "", false);
application.handle(event);
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
@ -804,7 +804,7 @@ public void testAppKilledKilled() throws IOException {
// KILLED => KILLED event RMAppEventType.ATTEMPT_FAILED
event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
RMAppEventType.ATTEMPT_FAILED, "", false);
application.handle(event);
rmDispatcher.await();
assertTimesAtFinish(application);

View File

@ -51,6 +51,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -68,10 +69,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
@ -120,13 +121,14 @@ public class TestRMAppAttemptTransitions {
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore store;
private RMApp application;
private RMAppImpl application;
private RMAppAttempt applicationAttempt;
private Configuration conf = new Configuration();
private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf));
private ClientToAMTokenSecretManagerInRM clientToAMTokenManager =
spy(new ClientToAMTokenSecretManagerInRM());
private boolean transferStateFromPreviousAttempt = false;
private final class TestApplicationAttemptEventDispatcher implements
EventHandler<RMAppAttemptEvent> {
@ -150,6 +152,11 @@ private final class TestApplicationEventDispatcher implements
@Override
public void handle(RMAppEvent event) {
assertEquals(application.getApplicationId(), event.getApplicationId());
if (event instanceof RMAppFailedAttemptEvent) {
transferStateFromPreviousAttempt =
((RMAppFailedAttemptEvent) event)
.getTransferStateFromPreviousAttempt();
}
try {
application.handle(event);
} catch (Throwable t) {
@ -254,10 +261,10 @@ public void setUp() throws Exception {
unmanagedAM = false;
application = mock(RMApp.class);
application = mock(RMAppImpl.class);
applicationAttempt =
new RMAppAttemptImpl(applicationAttemptId, rmContext, scheduler,
masterService, submissionContext, new Configuration());
masterService, submissionContext, new Configuration(), false);
when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
when(application.getApplicationId()).thenReturn(applicationId);
@ -371,6 +378,7 @@ private void testAppAttemptKilledState(Container amContainer,
assertNull(applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyAttemptFinalStateSaved();
assertFalse(transferStateFromPreviousAttempt);
}
/**
@ -525,6 +533,7 @@ private void testAppAttemptFinishedState(Container container,
assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
assertFalse(transferStateFromPreviousAttempt);
}
@ -654,6 +663,7 @@ private void testUnmanagedAMSuccess(String url) {
diagnostics));
testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1,
true);
assertFalse(transferStateFromPreviousAttempt);
}
private void sendAttemptUpdateSavedEvent(RMAppAttempt applicationAttempt) {
@ -681,6 +691,21 @@ public void testUnmanagedAMUnexpectedRegistration() {
"Unmanaged AM must register after AM attempt reaches LAUNCHED state.");
}
@Test
public void testUnmanagedAMContainersCleanup() {
unmanagedAM = true;
when(submissionContext.getUnmanagedAM()).thenReturn(true);
when(submissionContext.getKeepContainersAcrossApplicationAttempts())
.thenReturn(true);
// submit AM and check it goes to SUBMITTED state
submitApplicationAttempt();
// launch AM and verify attempt failed
applicationAttempt.handle(new RMAppAttemptRegistrationEvent(
applicationAttempt.getAppAttemptId(), "host", 8042, "oldtrackingurl"));
sendAttemptUpdateSavedEvent(applicationAttempt);
assertFalse(transferStateFromPreviousAttempt);
}
@Test
public void testNewToKilled() {
applicationAttempt.handle(
@ -1092,6 +1117,64 @@ public void testGetClientToken() throws Exception {
Assert.assertNull(token);
}
@Test
public void testFailedToFailed() {
// create a failed attempt.
when(submissionContext.getKeepContainersAcrossApplicationAttempts())
.thenReturn(true);
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
ContainerStatus cs1 =
ContainerStatus.newInstance(amContainer.getId(),
ContainerState.COMPLETE, "some error", 123);
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
appAttemptId, cs1));
sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
// should not kill containers when attempt fails.
assertTrue(transferStateFromPreviousAttempt);
// failed attempt captured the container finished event.
assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
ContainerStatus cs2 =
ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
ContainerState.COMPLETE, "", 0);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
appAttemptId, cs2));
assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
assertEquals(cs2.getContainerId(), applicationAttempt
.getJustFinishedContainers().get(0).getContainerId());
}
@Test
public void testContainersCleanupForLastAttempt() {
// create a failed attempt.
applicationAttempt =
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), rmContext,
scheduler, masterService, submissionContext, new Configuration(),
true);
when(submissionContext.getKeepContainersAcrossApplicationAttempts())
.thenReturn(true);
when(submissionContext.getMaxAppAttempts()).thenReturn(1);
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
ContainerStatus cs1 =
ContainerStatus.newInstance(amContainer.getId(),
ContainerState.COMPLETE, "some error", 123);
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
appAttemptId, cs1));
sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
assertFalse(transferStateFromPreviousAttempt);
}
private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId);
if (UserGroupInformation.isSecurityEnabled()) {

View File

@ -562,18 +562,18 @@ public void testBlackListNodes() throws Exception {
new AppAddedSchedulerEvent(appId, "default", "user");
cs.handle(addAppEvent);
SchedulerEvent addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId);
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
cs.handle(addAttemptEvent);
// Verify the blacklist can be updated independent of requesting containers
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null);
Assert.assertTrue(cs.getApplication(appAttemptId).isBlacklisted(host));
Assert.assertTrue(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host));
Assert.assertFalse(cs.getApplication(appAttemptId).isBlacklisted(host));
Assert.assertFalse(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
rm.stop();
}
@ -598,66 +598,6 @@ public void testApplicationComparator()
assertTrue(appComparator.compare(app2, app3) < 0);
}
@Test
public void testConcurrentAccessOnApplications() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
verifyConcurrentAccessOnApplications(
cs.appAttempts, FiCaSchedulerApp.class, Queue.class);
}
public static <T extends SchedulerApplicationAttempt, Q extends Queue>
void verifyConcurrentAccessOnApplications(
final Map<ApplicationAttemptId, T> applications, Class<T> appClazz,
final Class<Q> queueClazz)
throws Exception {
final int size = 10000;
final ApplicationId appId = ApplicationId.newInstance(0, 0);
final Constructor<T> ctor = appClazz.getDeclaredConstructor(
ApplicationAttemptId.class, String.class, queueClazz,
ActiveUsersManager.class, RMContext.class);
ApplicationAttemptId appAttemptId0
= ApplicationAttemptId.newInstance(appId, 0);
applications.put(appAttemptId0, ctor.newInstance(
appAttemptId0, null, mock(queueClazz), null, null));
assertNotNull(applications.get(appAttemptId0));
// Imitating the thread of scheduler that will add and remove apps
final AtomicBoolean finished = new AtomicBoolean(false);
final AtomicBoolean failed = new AtomicBoolean(false);
Thread t = new Thread() {
@Override
public void run() {
for (int i = 1; i <= size; ++i) {
ApplicationAttemptId appAttemptId
= ApplicationAttemptId.newInstance(appId, i);
try {
applications.put(appAttemptId, ctor.newInstance(
appAttemptId, null, mock(queueClazz), null, null));
} catch (Exception e) {
failed.set(true);
finished.set(true);
return;
}
}
for (int i = 1; i <= size; ++i) {
ApplicationAttemptId appAttemptId
= ApplicationAttemptId.newInstance(appId, i);
applications.remove(appAttemptId);
}
finished.set(true);
}
};
t.start();
// Imitating the thread of rmappattempt that will get the app
while (!finished.get()) {
assertNotNull(applications.get(appAttemptId0));
}
assertFalse(failed.get());
}
@Test
public void testGetAppsInQueue() throws Exception {
Application application_0 = new Application("user_0", "a1", resourceManager);

View File

@ -63,6 +63,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -345,11 +347,16 @@ public void testAppAttemptMetrics() throws Exception {
.getMockApplicationAttemptId(0, 1);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null,
rmContext);
a.submitApplicationAttempt(app_0, user_0);
AppAddedSchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appAttemptId_0.getApplicationId(),
a.getQueueName(), user_0);
cs.handle(addAppEvent);
AppAttemptAddedSchedulerEvent addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
cs.handle(addAttemptEvent);
when(cs.getApplication(appAttemptId_0)).thenReturn(app_0);
AppAttemptRemovedSchedulerEvent event = new AppAttemptRemovedSchedulerEvent(
appAttemptId_0, RMAppAttemptState.FAILED);
appAttemptId_0, RMAppAttemptState.FAILED, false);
cs.handle(event);
assertEquals(0, a.getMetrics().getAppsPending());
@ -365,9 +372,8 @@ public void testAppAttemptMetrics() throws Exception {
assertEquals(1, a.getMetrics().getAppsSubmitted());
assertEquals(1, a.getMetrics().getAppsPending());
when(cs.getApplication(appAttemptId_1)).thenReturn(app_0);
event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
RMAppAttemptState.FINISHED);
RMAppAttemptState.FINISHED, false);
cs.handle(event);
assertEquals(1, a.getMetrics().getAppsSubmitted());

View File

@ -261,7 +261,7 @@ private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
if (scheduler.applications.containsKey(id.getApplicationId())) {
scheduler.addApplicationAttempt(id);
scheduler.addApplicationAttempt(id, false);
}
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
@ -590,7 +590,7 @@ public void testSimpleContainerReservation() throws Exception {
// Make sure queue 2 is waiting with a reservation
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
getResourceUsage().getMemory());
assertEquals(1024, scheduler.appAttempts.get(attId).getCurrentReservation().getMemory());
assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory());
// Now another node checks in with capacity
RMNode node2 =
@ -606,10 +606,10 @@ public void testSimpleContainerReservation() throws Exception {
getResourceUsage().getMemory());
// The old reservation should still be there...
assertEquals(1024, scheduler.appAttempts.get(attId).getCurrentReservation().getMemory());
assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory());
// ... but it should disappear when we update the first node.
scheduler.handle(updateEvent);
assertEquals(0, scheduler.appAttempts.get(attId).getCurrentReservation().getMemory());
assertEquals(0, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory());
}
@ -630,7 +630,7 @@ public void testUserAsDefaultQueue() throws Exception {
"user1");
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attempAddedEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId);
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attempAddedEvent);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getRunnableAppSchedulables().size());
@ -656,7 +656,7 @@ public void testNotUserAsDefaultQueue() throws Exception {
"user2");
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attempAddedEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId);
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attempAddedEvent);
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
.getRunnableAppSchedulables().size());
@ -710,7 +710,6 @@ public void testQueuePlacementWithPolicy() throws Exception {
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId appId;
Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.appAttempts;
List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
rules.add(new QueuePlacementRule.Specified().initialize(true, null));
@ -723,17 +722,17 @@ public void testQueuePlacementWithPolicy() throws Exception {
scheduler.getAllocationConfiguration().placementPolicy =
new QueuePlacementPolicy(rules, queues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1");
assertEquals("root.somequeue", apps.get(appId).getQueueName());
assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user1");
assertEquals("root.user1", apps.get(appId).getQueueName());
assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user3");
assertEquals("root.user3group", apps.get(appId).getQueueName());
assertEquals("root.user3group", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user4");
assertEquals("root.user4subgroup1", apps.get(appId).getQueueName());
assertEquals("root.user4subgroup1", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user5");
assertEquals("root.user5subgroup2", apps.get(appId).getQueueName());
assertEquals("root.user5subgroup2", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "otheruser");
assertEquals("root.default", apps.get(appId).getQueueName());
assertEquals("root.default", scheduler.getSchedulerApp(appId).getQueueName());
// test without specified as first rule
rules = new ArrayList<QueuePlacementRule>();
@ -743,11 +742,11 @@ public void testQueuePlacementWithPolicy() throws Exception {
scheduler.getAllocationConfiguration().placementPolicy =
new QueuePlacementPolicy(rules, queues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1");
assertEquals("root.user1", apps.get(appId).getQueueName());
assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "somequeue", "otheruser");
assertEquals("root.somequeue", apps.get(appId).getQueueName());
assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "otheruser");
assertEquals("root.default", apps.get(appId).getQueueName());
assertEquals("root.default", scheduler.getSchedulerApp(appId).getQueueName());
}
@Test
@ -802,13 +801,13 @@ public void testQueueDemandCalculation() throws Exception {
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
scheduler.addApplicationAttempt(id11);
scheduler.addApplicationAttempt(id11, false);
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
scheduler.addApplicationAttempt(id21);
scheduler.addApplicationAttempt(id21, false);
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
scheduler.addApplicationAttempt(id22);
scheduler.addApplicationAttempt(id22, false);
int minReqSize =
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
@ -854,7 +853,7 @@ public void testAppAdditionAndRemoval() throws Exception {
"user1");
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attemptAddedEvent =
new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1));
new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false);
scheduler.handle(attemptAddedEvent);
// Scheduler should have two queues (the default and the one created for user1)
@ -865,7 +864,7 @@ public void testAppAdditionAndRemoval() throws Exception {
.getRunnableAppSchedulables().size());
AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
createAppAttemptId(1, 1), RMAppAttemptState.FINISHED);
createAppAttemptId(1, 1), RMAppAttemptState.FINISHED, false);
// Now remove app
scheduler.handle(appRemovedEvent1);
@ -1138,12 +1137,12 @@ public void testChoiceOfPreemptedContainers() throws Exception {
scheduler.handle(nodeUpdate3);
}
assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size());
assertEquals(1, scheduler.appAttempts.get(app2).getLiveContainers().size());
assertEquals(1, scheduler.appAttempts.get(app3).getLiveContainers().size());
assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size());
assertEquals(1, scheduler.appAttempts.get(app5).getLiveContainers().size());
assertEquals(1, scheduler.appAttempts.get(app6).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app6).getLiveContainers().size());
// Now new requests arrive from queues C and D
ApplicationAttemptId app7 =
@ -1166,16 +1165,16 @@ public void testChoiceOfPreemptedContainers() throws Exception {
// Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size());
assertEquals(1, scheduler.appAttempts.get(app2).getLiveContainers().size());
assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size());
assertEquals(1, scheduler.appAttempts.get(app5).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size());
// First verify we are adding containers to preemption list for the application
assertTrue(!Collections.disjoint(scheduler.appAttempts.get(app3).getLiveContainers(),
scheduler.appAttempts.get(app3).getPreemptionContainers()));
assertTrue(!Collections.disjoint(scheduler.appAttempts.get(app6).getLiveContainers(),
scheduler.appAttempts.get(app6).getPreemptionContainers()));
assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app3).getLiveContainers(),
scheduler.getSchedulerApp(app3).getPreemptionContainers()));
assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app6).getLiveContainers(),
scheduler.getSchedulerApp(app6).getPreemptionContainers()));
// Pretend 15 seconds have passed
clock.tick(15);
@ -1185,8 +1184,8 @@ public void testChoiceOfPreemptedContainers() throws Exception {
Resources.createResource(2 * 1024));
// At this point the containers should have been killed (since we are not simulating AM)
assertEquals(0, scheduler.appAttempts.get(app6).getLiveContainers().size());
assertEquals(0, scheduler.appAttempts.get(app3).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
// Trigger a kill by insisting we want containers back
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
@ -1200,22 +1199,22 @@ public void testChoiceOfPreemptedContainers() throws Exception {
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size());
assertEquals(0, scheduler.appAttempts.get(app2).getLiveContainers().size());
assertEquals(0, scheduler.appAttempts.get(app3).getLiveContainers().size());
assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size());
assertEquals(0, scheduler.appAttempts.get(app5).getLiveContainers().size());
assertEquals(0, scheduler.appAttempts.get(app6).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
// Now A and B are below fair share, so preemption shouldn't do anything
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size());
assertEquals(0, scheduler.appAttempts.get(app2).getLiveContainers().size());
assertEquals(0, scheduler.appAttempts.get(app3).getLiveContainers().size());
assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size());
assertEquals(0, scheduler.appAttempts.get(app5).getLiveContainers().size());
assertEquals(0, scheduler.appAttempts.get(app6).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
}
@Test (timeout = 5000)
@ -1374,9 +1373,9 @@ public void testMultipleContainersWaitingForReservation() throws IOException {
// One container should get reservation and the other should get nothing
assertEquals(1024,
scheduler.appAttempts.get(attId1).getCurrentReservation().getMemory());
scheduler.getSchedulerApp(attId1).getCurrentReservation().getMemory());
assertEquals(0,
scheduler.appAttempts.get(attId2).getCurrentReservation().getMemory());
scheduler.getSchedulerApp(attId2).getCurrentReservation().getMemory());
}
@Test (timeout = 5000)
@ -1411,7 +1410,7 @@ public void testUserMaxRunningApps() throws Exception {
scheduler.handle(updateEvent);
// App 1 should be running
assertEquals(1, scheduler.appAttempts.get(attId1).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
"user1", 1);
@ -1420,7 +1419,7 @@ public void testUserMaxRunningApps() throws Exception {
scheduler.handle(updateEvent);
// App 2 should not be running
assertEquals(0, scheduler.appAttempts.get(attId2).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
// Request another container for app 1
createSchedulingRequestExistingApplication(1024, 1, attId1);
@ -1429,7 +1428,7 @@ public void testUserMaxRunningApps() throws Exception {
scheduler.handle(updateEvent);
// Request should be fulfilled
assertEquals(2, scheduler.appAttempts.get(attId1).getLiveContainers().size());
assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
}
@Test (timeout = 5000)
@ -1449,10 +1448,10 @@ public void testReservationWhileMultiplePriorities() throws IOException {
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
FSSchedulerApp app = scheduler.appAttempts.get(attId);
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
assertEquals(1, app.getLiveContainers().size());
ContainerId containerId = scheduler.appAttempts.get(attId)
ContainerId containerId = scheduler.getSchedulerApp(attId)
.getLiveContainers().iterator().next().getContainerId();
// Cause reservation to be created
@ -1521,9 +1520,9 @@ public void testAclSubmitApplication() throws Exception {
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
"norealuserhasthisname2", 1);
FSSchedulerApp app1 = scheduler.appAttempts.get(attId1);
FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
assertNotNull("The application was not allowed", app1);
FSSchedulerApp app2 = scheduler.appAttempts.get(attId2);
FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
assertNull("The application was allowed", app2);
}
@ -1547,7 +1546,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
scheduler.addApplicationAttempt(appId);
scheduler.addApplicationAttempt(appId, false);
// 1 request with 2 nodes on the same rack. another request with 1 node on
// a different rack
@ -1566,14 +1565,14 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent1);
// should assign node local
assertEquals(1, scheduler.appAttempts.get(appId).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(appId).getLiveContainers().size());
// node 2 checks in
scheduler.update();
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(updateEvent2);
// should assign rack local
assertEquals(2, scheduler.appAttempts.get(appId).getLiveContainers().size());
assertEquals(2, scheduler.getSchedulerApp(appId).getLiveContainers().size());
}
@Test (timeout = 5000)
@ -1592,8 +1591,8 @@ public void testFifoWithinQueue() throws Exception {
"user1", 2);
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
"user1", 2);
FSSchedulerApp app1 = scheduler.appAttempts.get(attId1);
FSSchedulerApp app2 = scheduler.appAttempts.get(attId2);
FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true);
queue1.setPolicy(new FifoPolicy());
@ -1633,7 +1632,7 @@ public void testMaxAssign() throws Exception {
ApplicationAttemptId attId =
createSchedulingRequest(1024, "root.default", "user", 8);
FSSchedulerApp app = scheduler.appAttempts.get(attId);
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
// set maxAssign to 2: only 2 containers should be allocated
scheduler.maxAssign = 2;
@ -1695,10 +1694,10 @@ public void testAssignContainer() throws Exception {
ApplicationAttemptId attId4 =
createSchedulingRequest(1024, fifoQueue, user, 4);
FSSchedulerApp app1 = scheduler.appAttempts.get(attId1);
FSSchedulerApp app2 = scheduler.appAttempts.get(attId2);
FSSchedulerApp app3 = scheduler.appAttempts.get(attId3);
FSSchedulerApp app4 = scheduler.appAttempts.get(attId4);
FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3);
FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4);
scheduler.getQueueManager().getLeafQueue(fifoQueue, true)
.setPolicy(SchedulingPolicy.parse("fifo"));
@ -1813,7 +1812,7 @@ public void testReservationThatDoesntFit() throws IOException {
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
FSSchedulerApp app = scheduler.appAttempts.get(attId);
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
assertEquals(0, app.getLiveContainers().size());
assertEquals(0, app.getReservedContainers().size());
@ -1882,7 +1881,7 @@ public void testStrictLocality() throws IOException {
NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
// no matter how many heartbeats, node2 should never get a container
FSSchedulerApp app = scheduler.appAttempts.get(attId1);
FSSchedulerApp app = scheduler.getSchedulerApp(attId1);
for (int i = 0; i < 10; i++) {
scheduler.handle(node2UpdateEvent);
assertEquals(0, app.getLiveContainers().size());
@ -1921,7 +1920,7 @@ public void testCancelStrictLocality() throws IOException {
NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
// no matter how many heartbeats, node2 should never get a container
FSSchedulerApp app = scheduler.appAttempts.get(attId1);
FSSchedulerApp app = scheduler.getSchedulerApp(attId1);
for (int i = 0; i < 10; i++) {
scheduler.handle(node2UpdateEvent);
assertEquals(0, app.getLiveContainers().size());
@ -1954,7 +1953,7 @@ public void testReservationsStrictLocality() throws IOException {
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
"user1", 0);
FSSchedulerApp app = scheduler.appAttempts.get(attId);
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true);
ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true);
@ -1994,7 +1993,7 @@ public void testNoMoreCpuOnNode() throws IOException {
ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default",
"user1", 2);
FSSchedulerApp app = scheduler.appAttempts.get(attId);
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
@ -2014,10 +2013,10 @@ public void testBasicDRFAssignment() throws Exception {
ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1",
"user1", 2);
FSSchedulerApp app1 = scheduler.appAttempts.get(appAttId1);
FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1",
"user1", 2);
FSSchedulerApp app2 = scheduler.appAttempts.get(appAttId2);
FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterCapacity());
@ -2055,13 +2054,13 @@ public void testBasicDRFWithQueues() throws Exception {
ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1",
"user1", 2);
FSSchedulerApp app1 = scheduler.appAttempts.get(appAttId1);
FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1",
"user1", 2);
FSSchedulerApp app2 = scheduler.appAttempts.get(appAttId2);
FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2",
"user1", 2);
FSSchedulerApp app3 = scheduler.appAttempts.get(appAttId3);
FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterCapacity());
@ -2092,19 +2091,19 @@ public void testDRFHierarchicalQueues() throws Exception {
ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app1 = scheduler.appAttempts.get(appAttId1);
FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app2 = scheduler.appAttempts.get(appAttId2);
FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app3 = scheduler.appAttempts.get(appAttId3);
FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3);
ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app4 = scheduler.appAttempts.get(appAttId4);
FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterCapacity());
@ -2184,7 +2183,7 @@ public void testHostPortNodeName() throws Exception {
NodeUpdateSchedulerEvent(node2);
// no matter how many heartbeats, node2 should never get a container
FSSchedulerApp app = scheduler.appAttempts.get(attId1);
FSSchedulerApp app = scheduler.getSchedulerApp(attId1);
for (int i = 0; i < 10; i++) {
scheduler.handle(node2UpdateEvent);
assertEquals(0, app.getLiveContainers().size());
@ -2195,16 +2194,8 @@ public void testHostPortNodeName() throws Exception {
assertEquals(1, app.getLiveContainers().size());
}
@Test
public void testConcurrentAccessOnApplications() throws Exception {
FairScheduler fs = new FairScheduler();
TestCapacityScheduler.verifyConcurrentAccessOnApplications(
fs.appAttempts, FSSchedulerApp.class, FSLeafQueue.class);
}
private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) {
FSSchedulerApp app = scheduler.appAttempts.get(attId);
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
FSLeafQueue queue = app.getQueue();
Collection<AppSchedulable> runnableApps =
queue.getRunnableAppSchedulables();
@ -2260,7 +2251,7 @@ public void testUserAndQueueMaxRunningApps() throws Exception {
// Remove app 1 and both app 2 and app 4 should becomes runnable in its place
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED);
new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false);
scheduler.handle(appRemovedEvent1);
verifyAppRunnable(attId2, true);
verifyQueueNumRunnable("queue2", 1, 0);
@ -2324,7 +2315,7 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception {
// Even though the app was removed from sub3, the app from sub2 gets to go
// because it came in first
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED);
new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, false);
scheduler.handle(appRemovedEvent1);
verifyAppRunnable(attId4, true);
verifyQueueNumRunnable("queue1.sub2", 2, 0);
@ -2333,7 +2324,7 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception {
// Now test removal of a non-runnable app
AppAttemptRemovedSchedulerEvent appRemovedEvent2 =
new AppAttemptRemovedSchedulerEvent(attId5, RMAppAttemptState.KILLED);
new AppAttemptRemovedSchedulerEvent(attId5, RMAppAttemptState.KILLED, true);
scheduler.handle(appRemovedEvent2);
assertEquals(0, scheduler.maxRunningEnforcer.usersNonRunnableApps
.get("user1").size());
@ -2341,7 +2332,7 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception {
verifyQueueNumRunnable("queue1.sub3", 0, 0);
// verify it doesn't become runnable when there would be space for it
AppAttemptRemovedSchedulerEvent appRemovedEvent3 =
new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.FINISHED);
new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.FINISHED, true);
scheduler.handle(appRemovedEvent3);
verifyQueueNumRunnable("queue1.sub2", 1, 0);
verifyQueueNumRunnable("queue1.sub3", 0, 0);
@ -2378,7 +2369,7 @@ public void testContinuousScheduling() throws Exception {
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
fs.addApplicationAttempt(appAttemptId);
fs.addApplicationAttempt(appAttemptId, false);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request =
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
@ -2389,7 +2380,7 @@ public void testContinuousScheduling() throws Exception {
// at least one pass
Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500);
FSSchedulerApp app = fs.appAttempts.get(appAttemptId);
FSSchedulerApp app = fs.getSchedulerApp(appAttemptId);
// Wait until app gets resources.
while (app.getCurrentConsumption().equals(Resources.none())) { }
@ -2477,7 +2468,7 @@ public void testBlacklistNodes() throws Exception {
ApplicationAttemptId appAttemptId =
createSchedulingRequest(GB, "root.default", "user", 1);
FSSchedulerApp app = scheduler.appAttempts.get(appAttemptId);
FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId);
// Verify the blacklist can be updated independent of requesting containers
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
@ -2487,7 +2478,7 @@ public void testBlacklistNodes() throws Exception {
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host));
assertFalse(scheduler.appAttempts.get(appAttemptId).isBlacklisted(host));
assertFalse(scheduler.getSchedulerApp(appAttemptId).isBlacklisted(host));
List<ResourceRequest> update = Arrays.asList(
createResourceRequest(GB, node.getHostName(), 1, 0, true));

View File

@ -156,7 +156,7 @@ public void testAppAttemptMetrics() throws Exception {
SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user");
schedular.handle(appEvent);
SchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId);
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
schedular.handle(attemptEvent);
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2);
@ -166,7 +166,7 @@ public void testAppAttemptMetrics() throws Exception {
"user");
schedular.handle(appEvent2);
SchedulerEvent attemptEvent2 =
new AppAttemptAddedSchedulerEvent(appAttemptId);
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
schedular.handle(attemptEvent2);
int afterAppsSubmitted = metrics.getAppsSubmitted();
@ -203,7 +203,7 @@ public void testNodeLocalAssignment() throws Exception {
"user1");
scheduler.handle(appEvent);
AppAttemptAddedSchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId);
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attemptEvent);
int memory = 64;
@ -293,7 +293,7 @@ public Map<NodeId, FiCaSchedulerNode> getNodes(){
"user1");
scheduler.handle(appEvent);
AppAttemptAddedSchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId);
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(attemptEvent);
int memory = 1024;
@ -534,13 +534,6 @@ public void testFifoScheduler() throws Exception {
LOG.info("--- END: testFifoScheduler ---");
}
@Test
public void testConcurrentAccessOnApplications() throws Exception {
FifoScheduler fs = new FifoScheduler();
TestCapacityScheduler.verifyConcurrentAccessOnApplications(
fs.appAttempts, FiCaSchedulerApp.class, Queue.class);
}
@SuppressWarnings("resource")
@Test
public void testBlackListNodes() throws Exception {
@ -564,18 +557,18 @@ public void testBlackListNodes() throws Exception {
"user");
fs.handle(appEvent);
SchedulerEvent attemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId);
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
fs.handle(attemptEvent);
// Verify the blacklist can be updated independent of requesting containers
fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null);
Assert.assertTrue(fs.getApplication(appAttemptId).isBlacklisted(host));
Assert.assertTrue(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host));
Assert.assertFalse(fs.getApplication(appAttemptId).isBlacklisted(host));
Assert.assertFalse(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
rm.stop();
}

View File

@ -1390,7 +1390,7 @@ public void testAppAttempts() throws JSONException, Exception {
@Test
public void testMultipleAppAttempts() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 8192);
RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
rm.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
@ -1403,11 +1403,13 @@ public void testMultipleAppAttempts() throws JSONException, Exception {
while (--retriesLeft > 0) {
RMAppEvent event =
new RMAppFailedAttemptEvent(app1.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
RMAppEventType.ATTEMPT_FAILED, "", false);
app1.handle(event);
rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
amNodeManager.nodeHeartbeat(true);
}
// kick the scheduler to allocate the am container.
amNodeManager.nodeHeartbeat(true);
rm.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.ALLOCATED);
assertEquals("incorrect number of attempts", maxAppAttempts,