YARN-2181. Added preemption info to logs and RM web UI. Contributed by Wangda Tan

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1609561 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-07-10 20:03:35 +00:00
parent 8f520386fb
commit c9fb040c87
17 changed files with 582 additions and 72 deletions

View File

@ -80,6 +80,9 @@ Release 2.5.0 - UNRELEASED
YARN-1366. Changed AMRMClient to re-register with RM and send outstanding requests
back to RM on work-preserving RM restart. (Rohith via jianhe)
YARN-2181. Added preemption info to logs and RM web UI. (Wangda Tan via
jianhe)
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via

View File

@ -223,4 +223,11 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* @return the external user-facing state of ApplicationMaster.
*/
YarnApplicationState createApplicationState();
/**
 * Get the {@link RMAppMetrics} of this {@link RMApp}.
 *
 * @return the metrics object describing this application
 */
RMAppMetrics getRMAppMetrics();
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -78,6 +80,7 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.resource.Resources;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class RMAppImpl implements RMApp, Recoverable {
@ -1202,4 +1205,25 @@ public class RMAppImpl implements RMApp, Recoverable {
public Set<NodeId> getRanNodes() {
return ranNodes;
}
/**
 * Builds an application-level preemption summary by summing the preemption
 * metrics of every non-null attempt recorded for this application.
 *
 * @return a new {@link RMAppMetrics} carrying the summed preempted resource,
 *         the summed count of preempted non-AM containers, and the number
 *         of attempts whose metrics are flagged as preempted
 */
@Override
public RMAppMetrics getRMAppMetrics() {
  Resource totalPreempted = Resource.newInstance(0, 0);
  int amPreemptedCount = 0;
  int nonAmPreemptedCount = 0;

  for (RMAppAttempt appAttempt : attempts.values()) {
    if (appAttempt == null) {
      continue;
    }
    RMAppAttemptMetrics metrics = appAttempt.getRMAppAttemptMetrics();
    // addTo accumulates into its first argument.
    Resources.addTo(totalPreempted, metrics.getResourcePreempted());
    if (metrics.getIsPreempted()) {
      amPreemptedCount++;
    }
    nonAmPreemptedCount += metrics.getNumNonAMContainersPreempted();
  }

  return new RMAppMetrics(totalPreempted, nonAmPreemptedCount,
      amPreemptedCount);
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.Resource;
/**
 * Value holder for the preemption figures of one application: the preempted
 * resource plus the counts of preempted non-AM and AM containers. All fields
 * are assigned once in the constructor.
 */
public class RMAppMetrics {
  final Resource resourcePreempted;
  final int numNonAMContainersPreempted;
  final int numAMContainersPreempted;

  /**
   * @param resourcePreempted preempted resource to report
   * @param numNonAMContainersPreempted number of preempted non-AM containers
   * @param numAMContainersPreempted number of preempted AM containers
   */
  public RMAppMetrics(Resource resourcePreempted,
      int numNonAMContainersPreempted, int numAMContainersPreempted) {
    this.numAMContainersPreempted = numAMContainersPreempted;
    this.numNonAMContainersPreempted = numNonAMContainersPreempted;
    this.resourcePreempted = resourcePreempted;
  }

  /** @return the preempted resource passed to the constructor */
  public Resource getResourcePreempted() {
    return this.resourcePreempted;
  }

  /** @return the number of preempted non-AM containers */
  public int getNumNonAMContainersPreempted() {
    return this.numNonAMContainersPreempted;
  }

  /** @return the number of preempted AM containers */
  public int getNumAMContainersPreempted() {
    return this.numAMContainersPreempted;
  }
}

View File

@ -207,4 +207,10 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
* </ul>
*/
boolean shouldCountTowardsMaxAttemptRetry();
/**
 * Get the metrics of this {@link RMAppAttempt}.
 *
 * @return the {@link RMAppAttemptMetrics} object of this attempt
 */
RMAppAttemptMetrics getRMAppAttemptMetrics();
}

View File

@ -165,6 +165,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private RMAppAttemptState stateBeforeFinalSaving;
private Object transitionTodo;
private RMAppAttemptMetrics attemptMetrics = null;
private static final StateMachineFactory<RMAppAttemptImpl,
RMAppAttemptState,
RMAppAttemptEventType,
@ -227,6 +229,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.KILL,
new FinalSavingTransition(new BaseFinalTransition(
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.CONTAINER_FINISHED,
new FinalSavingTransition(
new AMContainerCrashedBeforeRunningTransition(),
RMAppAttemptState.FAILED))
// Transitions from LAUNCHED_UNMANAGED_SAVING State
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
@ -336,7 +344,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// use by the next new attempt.
.addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED,
RMAppAttemptEventType.CONTAINER_FINISHED,
new ContainerFinishedAtFailedTransition())
new ContainerFinishedAtFinalStateTransition())
.addTransition(
RMAppAttemptState.FAILED,
RMAppAttemptState.FAILED,
@ -372,8 +380,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.KILL))
.addTransition(RMAppAttemptState.FINISHED,
RMAppAttemptState.FINISHED,
RMAppAttemptEventType.CONTAINER_FINISHED,
new ContainerFinishedAtFinalStateTransition())
// Transitions from KILLED State
.addTransition(
@ -386,10 +397,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.REGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.STATUS_UPDATE))
.addTransition(RMAppAttemptState.KILLED,
RMAppAttemptState.KILLED,
RMAppAttemptEventType.CONTAINER_FINISHED,
new ContainerFinishedAtFinalStateTransition())
.installTopology();
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
@ -412,6 +426,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
this.maybeLastAttempt = maybeLastAttempt;
this.stateMachine = stateMachineFactory.make(this);
this.attemptMetrics = new RMAppAttemptMetrics(applicationAttemptId);
}
@Override
@ -688,6 +703,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
diagnostics.append("Attempt recovered after RM restart");
diagnostics.append(attemptState.getDiagnostics());
this.amContainerExitStatus = attemptState.getAMContainerExitStatus();
if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) {
this.attemptMetrics.setIsPreempted();
}
setMasterContainer(attemptState.getMasterContainer());
recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
this.recoveredFinalState = attemptState.getState();
@ -1453,8 +1471,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
RMAppAttemptContainerFinishedEvent containerFinishedEvent
= (RMAppAttemptContainerFinishedEvent) event;
RMAppAttemptContainerFinishedEvent containerFinishedEvent =
(RMAppAttemptContainerFinishedEvent) event;
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
@ -1462,26 +1480,28 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// the AMContainer, AppAttempt fails
if (appAttempt.masterContainer != null
&& appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) {
containerStatus.getContainerId())) {
// Remember the follow up transition and save the final attempt state.
appAttempt.rememberTargetTransitionsAndStoreState(event,
transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
return RMAppAttemptState.FINAL_SAVING;
}
// Normal container.Put it in completedcontainers list
// Normal container.Put it in completed containers list
appAttempt.justFinishedContainers.add(containerStatus);
return this.currentState;
}
}
private static final class ContainerFinishedAtFailedTransition
private static final class ContainerFinishedAtFinalStateTransition
extends BaseTransition {
@Override
public void
transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
RMAppAttemptContainerFinishedEvent containerFinishedEvent =
(RMAppAttemptContainerFinishedEvent) event;
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
// Normal container. Add it in completed containers list
@ -1705,4 +1725,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
public boolean mayBeLastAttempt() {
return maybeLastAttempt;
}
/**
 * Returns the metrics object held by this attempt.
 *
 * @return this attempt's {@link RMAppAttemptMetrics}
 */
@Override
public RMAppAttemptMetrics getRMAppAttemptMetrics() {
  // No read/write lock is taken here because RMAppAttemptMetrics carries
  // its own lock.
  return this.attemptMetrics;
}
}

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
 * Preemption metrics of a single application attempt: the accumulated
 * preempted resource, the number of preempted non-AM containers, and a flag
 * recording whether the AM container was preempted.
 *
 * <p>The accumulated resource is guarded by a read/write lock; the counter
 * and the flag are atomics.
 */
public class RMAppAttemptMetrics {
  private static final Log LOG = LogFactory.getLog(RMAppAttemptMetrics.class);

  private final ApplicationAttemptId attemptId;
  // preemption info
  // Accumulated in place (under writeLock), so the reference never changes.
  private final Resource resourcePreempted = Resource.newInstance(0, 0);
  private final AtomicInteger numNonAMContainersPreempted =
      new AtomicInteger(0);
  private final AtomicBoolean isPreempted = new AtomicBoolean(false);

  private final ReadLock readLock;
  private final WriteLock writeLock;

  /**
   * @param attemptId id of the attempt these metrics belong to; only used
   *          in log messages
   */
  public RMAppAttemptMetrics(ApplicationAttemptId attemptId) {
    this.attemptId = attemptId;
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLock = lock.readLock();
    this.writeLock = lock.writeLock();
  }

  /**
   * Records the preemption of one container: adds {@code resource} to the
   * accumulated preempted resource, then either bumps the non-AM counter or
   * marks the AM as preempted, logging the event in both cases.
   *
   * @param resource resource of the preempted container
   * @param container the preempted container
   */
  public void updatePreemptionInfo(Resource resource, RMContainer container) {
    // Acquire outside the try block so that unlock() only runs when the
    // lock is actually held.
    writeLock.lock();
    try {
      Resources.addTo(resourcePreempted, resource);
    } finally {
      writeLock.unlock();
    }

    if (!container.isAMContainer()) {
      // the preempted container is not a master container
      LOG.info(String.format(
          "Non-AM container preempted, current appAttemptId=%s, "
              + "containerId=%s, resource=%s", attemptId,
          container.getContainerId(), resource));
      numNonAMContainersPreempted.incrementAndGet();
    } else {
      // the preempted container is a master container
      LOG.info(String.format("AM container preempted, "
          + "current appAttemptId=%s, containerId=%s, resource=%s", attemptId,
          container.getContainerId(), resource));
      isPreempted.set(true);
    }
  }

  /**
   * @return a snapshot copy of the accumulated preempted resource; a copy is
   *         returned because the internal object keeps being updated in
   *         place by {@link #updatePreemptionInfo}
   */
  public Resource getResourcePreempted() {
    readLock.lock();
    try {
      return Resource.newInstance(resourcePreempted.getMemory(),
          resourcePreempted.getVirtualCores());
    } finally {
      readLock.unlock();
    }
  }

  /** @return the number of non-AM containers recorded as preempted */
  public int getNumNonAMContainersPreempted() {
    return numNonAMContainersPreempted.get();
  }

  /** Marks the AM container of this attempt as preempted. */
  public void setIsPreempted() {
    this.isPreempted.set(true);
  }

  /** @return whether the AM container has been marked as preempted */
  public boolean getIsPreempted() {
    return this.isPreempted.get();
  }
}

View File

@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
@ -458,10 +460,30 @@ public class RMContainerImpl implements RMContainer {
container.finishTime = System.currentTimeMillis();
container.finishedStatus = finishedEvent.getRemoteContainerStatus();
// Inform AppAttempt
// container.getContainer() can return null when a RMContainer is a
// reserved container
updateMetricsIfPreempted(container);
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
container.rmContext.getRMApplicationHistoryWriter()
.containerFinished(container);
container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
container.rmContext.getRMApplicationHistoryWriter().containerFinished(
container);
}
/**
 * Reports the container to the preemption metrics of its application's
 * current attempt when the container's finished status carries the
 * {@code PREEMPTED} exit status; does nothing otherwise.
 *
 * @param container the finished container to inspect
 */
private static void updateMetricsIfPreempted(RMContainerImpl container) {
  int exitStatus = container.finishedStatus.getExitStatus();
  if (exitStatus != ContainerExitStatus.PREEMPTED) {
    return;
  }

  Resource preempted = container.getContainer().getResource();
  // Look the application up by the id embedded in the attempt id and take
  // whichever attempt is current for it.
  RMAppAttempt currentAttempt = container.rmContext.getRMApps()
      .get(container.getApplicationAttemptId().getApplicationId())
      .getCurrentAppAttempt();
  currentAttempt.getRMAppAttemptMetrics()
      .updatePreemptionInfo(preempted, container);
}
}

View File

@ -36,7 +36,9 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@ -110,19 +112,42 @@ public class AppBlock extends HtmlBlock {
setTitle(join("Application ", aid));
info("Application Overview").
_("User:", app.getUser()).
_("Name:", app.getName()).
_("Application Type:", app.getApplicationType()).
_("Application Tags:", app.getApplicationTags()).
_("State:", app.getState()).
_("FinalStatus:", app.getFinalStatus()).
_("Started:", Times.format(app.getStartTime())).
_("Elapsed:", StringUtils.formatTime(
Times.elapsed(app.getStartTime(), app.getFinishTime()))).
_("Tracking URL:", !app.isTrackingUrlReady() ?
"#" : app.getTrackingUrlPretty(), app.getTrackingUI()).
_("Diagnostics:", app.getNote());
RMAppMetrics appMerics = rmApp.getRMAppMetrics();
RMAppAttemptMetrics attemptMetrics =
rmApp.getCurrentAppAttempt().getRMAppAttemptMetrics();
info("Application Overview")
._("User:", app.getUser())
._("Name:", app.getName())
._("Application Type:", app.getApplicationType())
._("Application Tags:", app.getApplicationTags())
._("State:", app.getState())
._("FinalStatus:", app.getFinalStatus())
._("Started:", Times.format(app.getStartTime()))
._("Elapsed:",
StringUtils.formatTime(Times.elapsed(app.getStartTime(),
app.getFinishTime())))
._("Tracking URL:",
!app.isTrackingUrlReady() ? "#" : app.getTrackingUrlPretty(),
app.getTrackingUI())
._("Diagnostics:", app.getNote());
DIV<Hamlet> pdiv = html.
_(InfoBlock.class).
div(_INFO_WRAP);
info("Application Overview").clear();
info("Application Metrics")
._("Total Resource Preempted:",
appMerics.getResourcePreempted())
._("Total Number of Non-AM Containers Preempted:",
String.valueOf(appMerics.getNumNonAMContainersPreempted()))
._("Total Number of AM Containers Preempted:",
String.valueOf(appMerics.getNumAMContainersPreempted()))
._("Resource Preempted from Current Attempt:",
attemptMetrics.getResourcePreempted())
._("Number of Non-AM Containers Preempted from Current Attempt:",
String.valueOf(attemptMetrics
.getNumNonAMContainersPreempted()));
pdiv._();
Collection<RMAppAttempt> attempts = rmApp.getAppAttempts().values();
String amString =

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
@ -79,6 +80,12 @@ public class AppInfo {
protected int allocatedVCores;
protected int runningContainers;
// preemption info fields
protected int preemptedResourceMB;
protected int preemptedResourceVCores;
protected int numNonAMContainerPreempted;
protected int numAMContainerPreempted;
// Empty no-argument constructor; kept because JAXB requires one.
public AppInfo() {
} // JAXB needs this
@ -147,6 +154,17 @@ public class AppInfo {
}
}
}
// copy preemption info fields
RMAppMetrics appMetrics = app.getRMAppMetrics();
numAMContainerPreempted =
appMetrics.getNumAMContainersPreempted();
preemptedResourceMB =
appMetrics.getResourcePreempted().getMemory();
numNonAMContainerPreempted =
appMetrics.getNumNonAMContainersPreempted();
preemptedResourceVCores =
appMetrics.getResourcePreempted().getVirtualCores();
}
}
@ -254,4 +272,19 @@ public class AppInfo {
return this.allocatedVCores;
}
/** @return the memory component of the preempted resource stored in this info object */
public int getPreemptedMB() {
  return this.preemptedResourceMB;
}
/** @return the virtual-core component of the preempted resource stored in this info object */
public int getPreemptedVCores() {
  return this.preemptedResourceVCores;
}
/** @return the stored number of preempted non-AM containers */
public int getNumNonAMContainersPreempted() {
  return this.numNonAMContainerPreempted;
}
/** @return the stored number of preempted AM containers */
public int getNumAMContainersPreempted() {
  return this.numAMContainerPreempted;
}
}

View File

@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@ -34,6 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
@ -251,4 +251,22 @@ public class MockAM {
public ApplicationAttemptId getApplicationAttemptId() {
return this.attemptId;
}
/**
 * Issues an allocate request built from {@code "ANY"}, {@code memory} and
 * {@code nContainer}, then alternates node heartbeats and empty allocate
 * calls until at least {@code nContainer} containers have been returned.
 *
 * <p>There is no internal timeout; callers rely on their own test timeout.
 *
 * @param nContainer number of containers to wait for
 * @param memory memory value passed to the allocate request
 * @param nm node manager whose heartbeat is used to kick the scheduler
 * @return a list, owned by the caller, of all containers collected
 * @throws Exception if an allocate call or heartbeat fails, or the wait is
 *           interrupted
 */
public List<Container> allocateAndWaitForContainers(int nContainer,
    int memory, MockNM nm) throws Exception {
  // AM request for containers
  allocate("ANY", memory, nContainer, null);
  // kick the scheduler
  nm.nodeHeartbeat(true);
  // Collect into our own list instead of appending to the list that
  // belongs to the first allocate response.
  List<Container> conts = new ArrayList<Container>(
      allocate(new ArrayList<ResourceRequest>(), null)
          .getAllocatedContainers());
  while (conts.size() < nContainer) {
    nm.nodeHeartbeat(true);
    conts.addAll(allocate(new ArrayList<ResourceRequest>(),
        new ArrayList<ContainerId>()).getAllocatedContainers());
    Thread.sleep(500);
  }
  return conts;
}
}

View File

@ -32,10 +32,12 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -165,6 +167,11 @@ public abstract class MockAsm extends MockApps {
public Set<NodeId> getRanNodes() {
throw new UnsupportedOperationException("Not supported yet.");
}
/**
 * Returns metrics with a zero resource and zero preemption counts.
 */
@Override
public RMAppMetrics getRMAppMetrics() {
  Resource nothingPreempted = Resource.newInstance(0, 0);
  return new RMAppMetrics(nothingPreempted, 0, 0);
}
}
public static RMApp newApplication(int i) {

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -238,4 +239,13 @@ public class MockRMApp implements RMApp {
public Set<NodeId> getRanNodes() {
return null;
}
/**
 * Not implemented by this mock.
 *
 * @throws UnsupportedOperationException always
 */
public Resource getResourcePreempted() {
throw new UnsupportedOperationException("Not supported yet.");
}
/**
 * Not implemented by this mock.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public RMAppMetrics getRMAppMetrics() {
throw new UnsupportedOperationException("Not supported yet.");
}
}

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -72,7 +74,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@ -707,6 +712,46 @@ public class TestCapacityScheduler {
}
}
/**
 * Heartbeats the node, sends the AM-launched notification for the app's
 * current attempt, registers the attempt and waits until the application
 * reaches {@code RUNNING}.
 *
 * @param app application whose current attempt is launched
 * @param rm resource manager under test
 * @param nm node manager used for the heartbeat
 * @return the {@link MockAM} for the launched attempt
 * @throws Exception if any step fails
 */
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
    throws Exception {
  // Capture the current attempt before the heartbeat, as the AM is launched
  // for exactly this attempt.
  RMAppAttempt currentAttempt = app.getCurrentAppAttempt();
  nm.nodeHeartbeat(true);

  MockAM launched = rm.sendAMLaunched(currentAttempt.getAppAttemptId());
  launched.registerAppAttempt();
  rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
  return launched;
}
/**
 * Polls every 500 ms until both the application-level and the
 * current-attempt preemption metrics match the expected values. There is no
 * internal timeout; the enclosing test's timeout bounds the wait.
 *
 * @param app application to observe
 * @param preempted expected application-level preempted resource
 * @param numAMPreempted expected application-level AM preemption count
 * @param numTaskPreempted expected application-level non-AM preemption count
 * @param currentAttemptPreempted expected preempted resource of the current
 *          attempt
 * @param currentAttemptAMPreempted expected preempted flag of the current
 *          attempt
 * @param numLatestAttemptTaskPreempted expected non-AM preemption count of
 *          the current attempt
 * @throws InterruptedException if the sleep between polls is interrupted
 */
private void waitForAppPreemptionInfo(RMApp app, Resource preempted,
    int numAMPreempted, int numTaskPreempted,
    Resource currentAttemptPreempted, boolean currentAttemptAMPreempted,
    int numLatestAttemptTaskPreempted) throws InterruptedException {
  while (true) {
    RMAppMetrics appPM = app.getRMAppMetrics();
    // Fetch the current attempt's metrics once per poll so that every
    // attempt-level check below looks at the same attempt.
    RMAppAttemptMetrics attemptPM =
        app.getCurrentAppAttempt().getRMAppAttemptMetrics();

    if (appPM.getResourcePreempted().equals(preempted)
        && appPM.getNumAMContainersPreempted() == numAMPreempted
        && appPM.getNumNonAMContainersPreempted() == numTaskPreempted
        && attemptPM.getResourcePreempted().equals(currentAttemptPreempted)
        && attemptPM.getIsPreempted() == currentAttemptAMPreempted
        && attemptPM.getNumNonAMContainersPreempted() ==
            numLatestAttemptTaskPreempted) {
      return;
    }
    Thread.sleep(500);
  }
}
/**
 * Polls every 500 ms until the application's current attempt has an id
 * different from {@code previousAttemptId}.
 *
 * @param app application to observe
 * @param previousAttemptId id of the attempt that is expected to be replaced
 * @throws InterruptedException if the sleep between polls is interrupted
 */
private void waitForNewAttemptCreated(RMApp app,
    ApplicationAttemptId previousAttemptId) throws InterruptedException {
  // Compare attempt ids: comparing the RMAppAttempt object itself against
  // an ApplicationAttemptId would never match, so the loop would not wait.
  while (app.getCurrentAppAttempt().getAppAttemptId()
      .equals(previousAttemptId)) {
    Thread.sleep(500);
  }
}
@Test(timeout = 30000)
public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception {
final YarnConfiguration conf = new YarnConfiguration();
@ -828,4 +873,78 @@ public class TestCapacityScheduler {
cs.stop();
}
/**
 * Kills task containers and the AM container through
 * {@code CapacityScheduler.killContainer} across two attempts of one
 * application and waits for the application-level and current-attempt
 * preemption metrics to reach the expected totals after each step.
 */
@Test(timeout = 120000)
public void testPreemptionInfo() throws Exception {
  Configuration conf = new Configuration();
  conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);
  int CONTAINER_MEMORY = 1024;

  // start RM
  MockRM rm1 = new MockRM(conf);
  rm1.start();
  try {
    // get scheduler
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();

    // start NM
    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
    nm1.registerNode();

    // create app and launch the AM
    RMApp app0 = rm1.submitApp(CONTAINER_MEMORY);
    MockAM am0 = launchAM(app0, rm1, nm1);

    // get scheduler app
    FiCaSchedulerApp schedulerAppAttempt =
        cs.getSchedulerApplications().get(app0.getApplicationId())
            .getCurrentAppAttempt();

    // allocate some containers and launch them
    List<Container> allocatedContainers =
        am0.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);

    // kill the 3 containers
    for (Container c : allocatedContainers) {
      cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId()));
    }

    // check values
    waitForAppPreemptionInfo(app0,
        Resource.newInstance(CONTAINER_MEMORY * 3, 3), 0, 3,
        Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);

    // kill app0-attempt0 AM container
    cs.killContainer(schedulerAppAttempt.getRMContainer(app0
        .getCurrentAppAttempt().getMasterContainer().getId()));

    // wait for app0 failed
    waitForNewAttemptCreated(app0, am0.getApplicationAttemptId());

    // check values
    waitForAppPreemptionInfo(app0,
        Resource.newInstance(CONTAINER_MEMORY * 4, 4), 1, 3,
        Resource.newInstance(0, 0), false, 0);

    // launch app0-attempt1
    MockAM am1 = launchAM(app0, rm1, nm1);
    schedulerAppAttempt =
        cs.getSchedulerApplications().get(app0.getApplicationId())
            .getCurrentAppAttempt();

    // allocate some containers and launch them
    allocatedContainers =
        am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);
    for (Container c : allocatedContainers) {
      cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId()));
    }

    // check values
    waitForAppPreemptionInfo(app0,
        Resource.newInstance(CONTAINER_MEMORY * 7, 7), 1, 6,
        Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
  } finally {
    // Stop the RM even when a step above throws, so a failing run does not
    // leave it running.
    rm1.stop();
  }
}
}

View File

@ -41,13 +41,14 @@ import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@ -499,7 +500,10 @@ public class TestLeafQueue {
// Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) {
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
null, RMContainerEventType.KILL, null);
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
}
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -510,7 +514,10 @@ public class TestLeafQueue {
// Release each container from app_1
for (RMContainer rmContainer : app_1.getLiveContainers()) {
a.completedContainer(clusterResource, app_1, node_0, rmContainer,
null, RMContainerEventType.KILL, null);
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
@ -871,7 +878,10 @@ public class TestLeafQueue {
// 8. Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) {
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
null, RMContainerEventType.KILL, null);
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
}
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -882,7 +892,10 @@ public class TestLeafQueue {
// 9. Release each container from app_2
for (RMContainer rmContainer : app_2.getLiveContainers()) {
a.completedContainer(clusterResource, app_2, node_0, rmContainer,
null, RMContainerEventType.KILL, null);
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
}
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -893,7 +906,10 @@ public class TestLeafQueue {
// 10. Release each container from app_3
for (RMContainer rmContainer : app_3.getLiveContainers()) {
a.completedContainer(clusterResource, app_3, node_0, rmContainer,
null, RMContainerEventType.KILL, null);
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -979,9 +995,12 @@ public class TestLeafQueue {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Now free 1 container from app_0 i.e. 1G
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(),
null, RMContainerEventType.KILL, null);
RMContainer rmContainer = app_0.getLiveContainers().iterator().next();
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -992,9 +1011,12 @@ public class TestLeafQueue {
assertEquals(1*GB, a.getMetrics().getAllocatedMB());
// Now finish another container from app_0 and fulfill the reservation
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(),
null, RMContainerEventType.KILL, null);
rmContainer = app_0.getLiveContainers().iterator().next();
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -1094,9 +1116,12 @@ public class TestLeafQueue {
assertEquals(6*GB, a.getMetrics().getAllocatedMB());
// Now free 1 container from app_0 and try to assign to node_0
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(),
null, RMContainerEventType.KILL, null);
RMContainer rmContainer = app_0.getLiveContainers().iterator().next();
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -1186,9 +1211,12 @@ public class TestLeafQueue {
assertEquals(2*GB, node_0.getUsedResource().getMemory());
// Now free 1 container from app_0 i.e. 1G, and re-reserve it
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(),
null, RMContainerEventType.KILL, null);
RMContainer rmContainer = app_0.getLiveContainers().iterator().next();
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1218,9 +1246,12 @@ public class TestLeafQueue {
assertEquals(2, app_1.getReReservations(priority));
// Now finish another container from app_0 and see the reservation cancelled
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(),
null, RMContainerEventType.KILL, null);
rmContainer = app_0.getLiveContainers().iterator().next();
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
CSAssignment assignment = a.assignContainers(clusterResource, node_0);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());

View File

@ -17,6 +17,13 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -32,16 +39,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
public class FairSchedulerTestBase {
protected static class MockClock implements Clock {
private long time = 0;
@ -153,6 +158,13 @@ public class FairSchedulerTestBase {
priority, numContainers, true);
ask.add(request);
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
RMApp rmApp = mock(RMApp.class);
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
new RMAppAttemptMetrics(id));
resourceManager.getRMContext().getRMApps()
.put(id.getApplicationId(), rmApp);
return id;
}

View File

@ -1310,33 +1310,44 @@ public class TestRMWebServicesApps extends JerseyTest {
WebServicesTestUtils.getXmlString(element, "amContainerLogs"),
WebServicesTestUtils.getXmlInt(element, "allocatedMB"),
WebServicesTestUtils.getXmlInt(element, "allocatedVCores"),
WebServicesTestUtils.getXmlInt(element, "runningContainers"));
WebServicesTestUtils.getXmlInt(element, "runningContainers"),
WebServicesTestUtils.getXmlInt(element, "preemptedResourceMB"),
WebServicesTestUtils.getXmlInt(element, "preemptedResourceVCores"),
WebServicesTestUtils.getXmlInt(element, "numNonAMContainerPreempted"),
WebServicesTestUtils.getXmlInt(element, "numAMContainerPreempted"));
}
}
public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
Exception {
// 20 because trackingUrl not assigned yet
assertEquals("incorrect number of elements", 20, info.length());
// 24 because trackingUrl not assigned yet
assertEquals("incorrect number of elements", 24, info.length());
verifyAppInfoGeneric(app, info.getString("id"), info.getString("user"),
info.getString("name"), info.getString("applicationType"), info.getString("queue"),
info.getString("state"), info.getString("finalStatus"),
(float) info.getDouble("progress"), info.getString("trackingUI"),
info.getString("diagnostics"), info.getLong("clusterId"),
info.getLong("startedTime"), info.getLong("finishedTime"),
info.getLong("elapsedTime"), info.getString("amHostHttpAddress"),
info.getString("amContainerLogs"), info.getInt("allocatedMB"),
info.getInt("allocatedVCores"), info.getInt("runningContainers"));
info.getString("name"), info.getString("applicationType"),
info.getString("queue"), info.getString("state"),
info.getString("finalStatus"), (float) info.getDouble("progress"),
info.getString("trackingUI"), info.getString("diagnostics"),
info.getLong("clusterId"), info.getLong("startedTime"),
info.getLong("finishedTime"), info.getLong("elapsedTime"),
info.getString("amHostHttpAddress"), info.getString("amContainerLogs"),
info.getInt("allocatedMB"), info.getInt("allocatedVCores"),
info.getInt("runningContainers"),
info.getInt("preemptedResourceMB"),
info.getInt("preemptedResourceVCores"),
info.getInt("numNonAMContainerPreempted"),
info.getInt("numAMContainerPreempted"));
}
public void verifyAppInfoGeneric(RMApp app, String id, String user,
String name, String applicationType, String queue, String state, String finalStatus,
float progress, String trackingUI, String diagnostics, long clusterId,
long startedTime, long finishedTime, long elapsedTime,
String amHostHttpAddress, String amContainerLogs, int allocatedMB,
int allocatedVCores, int numContainers) throws JSONException,
String name, String applicationType, String queue, String state,
String finalStatus, float progress, String trackingUI,
String diagnostics, long clusterId, long startedTime, long finishedTime,
long elapsedTime, String amHostHttpAddress, String amContainerLogs,
int allocatedMB, int allocatedVCores, int numContainers,
int preemptedResourceMB, int preemptedResourceVCores,
int numNonAMContainerPreempted, int numAMContainerPreempted) throws JSONException,
Exception {
WebServicesTestUtils.checkStringMatch("id", app.getApplicationId()
@ -1371,6 +1382,18 @@ public class TestRMWebServicesApps extends JerseyTest {
assertEquals("allocatedMB doesn't match", 1024, allocatedMB);
assertEquals("allocatedVCores doesn't match", 1, allocatedVCores);
assertEquals("numContainers doesn't match", 1, numContainers);
assertEquals("preemptedResourceMB doesn't match", app
.getRMAppMetrics().getResourcePreempted().getMemory(),
preemptedResourceMB);
assertEquals("preemptedResourceVCores doesn't match", app
.getRMAppMetrics().getResourcePreempted().getVirtualCores(),
preemptedResourceVCores);
assertEquals("numNonAMContainerPreempted doesn't match", app
.getRMAppMetrics().getNumNonAMContainersPreempted(),
numNonAMContainerPreempted);
assertEquals("numAMContainerPreempted doesn't match", app
.getRMAppMetrics().getNumAMContainersPreempted(),
numAMContainerPreempted);
}
@Test