YARN-3946. Update exact reason as to why a submitted app is in ACCEPTED state to app's diagnostic message. (Naganarasimha G R via wangda)
This commit is contained in:
parent
7fb212e5e6
commit
6cb0af3c39
|
@ -610,6 +610,9 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
YARN-4248. REST API for submit/update/delete Reservations. (curino)
|
||||
|
||||
YARN-3946. Update exact reason as to why a submitted app is in ACCEPTED state to
|
||||
app's diagnostic message. (Naganarasimha G R via wangda)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||
|
|
|
@ -645,7 +645,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
progress = currentAttempt.getProgress();
|
||||
logAggregationStatus = this.getLogAggregationStatusForAppReport();
|
||||
}
|
||||
diags = this.diagnostics.toString();
|
||||
//if the diagnostics is not already set get it from attempt
|
||||
diags = getDiagnostics().toString();
|
||||
|
||||
if (currentAttempt != null &&
|
||||
currentAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
|
||||
|
@ -750,8 +751,13 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
@Override
|
||||
public StringBuilder getDiagnostics() {
|
||||
this.readLock.lock();
|
||||
|
||||
try {
|
||||
if (diagnostics.length() == 0 && getCurrentAppAttempt() != null) {
|
||||
String appAttemptDiagnostics = getCurrentAppAttempt().getDiagnostics();
|
||||
if (appAttemptDiagnostics != null) {
|
||||
return new StringBuilder(appAttemptDiagnostics);
|
||||
}
|
||||
}
|
||||
return this.diagnostics;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
|
|
|
@ -246,4 +246,10 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
|
|||
* @return the finish time of the application attempt.
|
||||
*/
|
||||
long getFinishTime();
|
||||
|
||||
/**
|
||||
* To capture Launch diagnostics of the app.
|
||||
* @param amLaunchDiagnostics
|
||||
*/
|
||||
void updateAMLaunchDiagnostics(String amLaunchDiagnostics);
|
||||
}
|
||||
|
|
|
@ -73,11 +73,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventT
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistUpdates;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
|
@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
|
@ -187,6 +188,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
private ResourceRequest amReq = null;
|
||||
private BlacklistManager blacklistedNodesForAM = null;
|
||||
|
||||
private String amLaunchDiagnostics;
|
||||
|
||||
private static final StateMachineFactory<RMAppAttemptImpl,
|
||||
RMAppAttemptState,
|
||||
RMAppAttemptEventType,
|
||||
|
@ -703,8 +706,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
@Override
|
||||
public String getDiagnostics() {
|
||||
this.readLock.lock();
|
||||
|
||||
try {
|
||||
if (diagnostics.length() == 0 && amLaunchDiagnostics != null) {
|
||||
return amLaunchDiagnostics;
|
||||
}
|
||||
return this.diagnostics.toString();
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
|
@ -1409,6 +1414,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
appAttempt.launchAMStartTime;
|
||||
ClusterMetrics.getMetrics().addAMLaunchDelay(delay);
|
||||
}
|
||||
|
||||
appAttempt
|
||||
.updateAMLaunchDiagnostics(AMState.LAUNCHED.getDiagnosticMessage());
|
||||
// Register with AMLivelinessMonitor
|
||||
appAttempt.attemptLaunched();
|
||||
|
||||
|
@ -1507,6 +1515,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
appAttempt.originalTrackingUrl =
|
||||
sanitizeTrackingUrl(registrationEvent.getTrackingurl());
|
||||
|
||||
// reset AMLaunchDiagnostics once AM Registers with RM
|
||||
appAttempt.updateAMLaunchDiagnostics(null);
|
||||
|
||||
// Let the app know
|
||||
appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
|
||||
.getAppAttemptId().getApplicationId(),
|
||||
|
@ -2097,4 +2108,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateAMLaunchDiagnostics(String amLaunchDiagnostics) {
|
||||
this.amLaunchDiagnostics = amLaunchDiagnostics;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
|
@ -30,6 +29,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.lang.time.DateUtils;
|
||||
import org.apache.commons.lang.time.FastDateFormat;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
|
@ -85,6 +86,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
private static final Log LOG = LogFactory
|
||||
.getLog(SchedulerApplicationAttempt.class);
|
||||
|
||||
private FastDateFormat fdf =
|
||||
FastDateFormat.getInstance("EEE MMM dd HH:mm:ss Z yyyy");
|
||||
|
||||
private static final long MEM_AGGREGATE_ALLOCATION_CACHE_MSECS = 3000;
|
||||
protected long lastMemoryAggregateAllocationUpdateTime = 0;
|
||||
private long lastMemorySeconds = 0;
|
||||
|
@ -148,10 +152,12 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
protected Queue queue;
|
||||
protected boolean isStopped = false;
|
||||
|
||||
private String appAMNodePartitionName = CommonNodeLabelsManager.NO_LABEL;
|
||||
protected String appAMNodePartitionName = CommonNodeLabelsManager.NO_LABEL;
|
||||
|
||||
protected final RMContext rmContext;
|
||||
|
||||
|
||||
private RMAppAttempt appAttempt;
|
||||
|
||||
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
|
||||
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
||||
RMContext rmContext) {
|
||||
|
@ -166,9 +172,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
if (rmContext.getRMApps() != null &&
|
||||
rmContext.getRMApps()
|
||||
.containsKey(applicationAttemptId.getApplicationId())) {
|
||||
RMApp rmApp = rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
|
||||
ApplicationSubmissionContext appSubmissionContext =
|
||||
rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
|
||||
rmApp
|
||||
.getApplicationSubmissionContext();
|
||||
appAttempt = rmApp.getCurrentAppAttempt();
|
||||
if (appSubmissionContext != null) {
|
||||
unmanagedAM = appSubmissionContext.getUnmanagedAM();
|
||||
this.logAggregationContext =
|
||||
|
@ -489,7 +497,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
ContainerType containerType = ContainerType.TASK;
|
||||
// The working knowledge is that masterContainer for AM is null as it
|
||||
// itself is the master container.
|
||||
if (isWaitingForAMContainer(getApplicationId())) {
|
||||
if (isWaitingForAMContainer()) {
|
||||
containerType = ContainerType.APPLICATION_MASTER;
|
||||
}
|
||||
try {
|
||||
|
@ -575,13 +583,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
return returnList;
|
||||
}
|
||||
|
||||
public boolean isWaitingForAMContainer(ApplicationId applicationId) {
|
||||
public boolean isWaitingForAMContainer() {
|
||||
// The working knowledge is that masterContainer for AM is null as it
|
||||
// itself is the master container.
|
||||
RMAppAttempt appAttempt =
|
||||
rmContext.getRMApps().get(applicationId).getCurrentAppAttempt();
|
||||
return (appAttempt != null && appAttempt.getMasterContainer() == null
|
||||
&& appAttempt.getSubmissionContext().getUnmanagedAM() == false);
|
||||
return (!unmanagedAM && appAttempt.getMasterContainer() == null);
|
||||
}
|
||||
|
||||
// Blacklist used for user containers
|
||||
|
@ -603,7 +608,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
}
|
||||
|
||||
public boolean isBlacklisted(String resourceName) {
|
||||
boolean useAMBlacklist = isWaitingForAMContainer(getApplicationId());
|
||||
boolean useAMBlacklist = isWaitingForAMContainer();
|
||||
return this.appSchedulingInfo.isBlacklisted(resourceName, useAMBlacklist);
|
||||
}
|
||||
|
||||
|
@ -905,4 +910,71 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
public String getAppAMNodePartitionName() {
|
||||
return appAMNodePartitionName;
|
||||
}
|
||||
|
||||
public void updateAMContainerDiagnostics(AMState state,
|
||||
String diagnosticMessage) {
|
||||
if (!isWaitingForAMContainer()) {
|
||||
return;
|
||||
}
|
||||
StringBuilder diagnosticMessageBldr = new StringBuilder();
|
||||
diagnosticMessageBldr.append("[");
|
||||
diagnosticMessageBldr.append(fdf.format(System.currentTimeMillis()));
|
||||
diagnosticMessageBldr.append("] ");
|
||||
switch (state) {
|
||||
case INACTIVATED:
|
||||
diagnosticMessageBldr.append(state.diagnosticMessage);
|
||||
if (diagnosticMessage != null) {
|
||||
diagnosticMessageBldr.append(diagnosticMessage);
|
||||
}
|
||||
getPendingAppDiagnosticMessage(diagnosticMessageBldr);
|
||||
break;
|
||||
case ACTIVATED:
|
||||
diagnosticMessageBldr.append(state.diagnosticMessage);
|
||||
if (diagnosticMessage != null) {
|
||||
diagnosticMessageBldr.append(diagnosticMessage);
|
||||
}
|
||||
getActivedAppDiagnosticMessage(diagnosticMessageBldr);
|
||||
break;
|
||||
default:
|
||||
// UNMANAGED , ASSIGNED
|
||||
diagnosticMessageBldr.append(state.diagnosticMessage);
|
||||
break;
|
||||
}
|
||||
appAttempt.updateAMLaunchDiagnostics(diagnosticMessageBldr.toString());
|
||||
}
|
||||
|
||||
protected void getPendingAppDiagnosticMessage(
|
||||
StringBuilder diagnosticMessage) {
|
||||
// Give the specific information which might be applicable for the
|
||||
// respective scheduler
|
||||
// like partitionAMResourcelimit,UserAMResourceLimit, queue'AMResourceLimit
|
||||
}
|
||||
|
||||
protected void getActivedAppDiagnosticMessage(
|
||||
StringBuilder diagnosticMessage) {
|
||||
// Give the specific information which might be applicable for the
|
||||
// respective scheduler
|
||||
// queue's resource usage for specific partition
|
||||
}
|
||||
|
||||
public static enum AMState {
|
||||
UNMANAGED("User launched the Application Master, since it's unmanaged. "),
|
||||
INACTIVATED("Application is added to the scheduler and is not yet activated. "),
|
||||
ACTIVATED("Application is Activated, waiting for resources to be assigned for AM. "),
|
||||
ASSIGNED("Scheduler has assigned a container for AM, waiting for AM "
|
||||
+ "container to be launched"),
|
||||
LAUNCHED("AM container is launched, waiting for AM container to Register "
|
||||
+ "with RM")
|
||||
;
|
||||
|
||||
private String diagnosticMessage;
|
||||
|
||||
AMState(String diagnosticMessage) {
|
||||
this.diagnosticMessage = diagnosticMessage;
|
||||
}
|
||||
|
||||
public String getDiagnosticMessage() {
|
||||
return diagnosticMessage;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
/**
|
||||
* diagnostic messages for AMcontainer launching
|
||||
*/
|
||||
public interface CSAMContainerLaunchDiagnosticsConstants {
|
||||
String SKIP_AM_ALLOCATION_IN_IGNORE_EXCLUSIVE_MODE =
|
||||
"Skipping assigning to Node in Ignore Exclusivity mode. ";
|
||||
String SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE =
|
||||
"Skipped scheduling for this Node as its black listed. ";
|
||||
String SKIP_AM_ALLOCATION_DUE_TO_LOCALITY =
|
||||
"Skipping assigning to Node as request locality is not matching. ";
|
||||
String QUEUE_AM_RESOURCE_LIMIT_EXCEED =
|
||||
"Queue's AM resource limit exceeded. ";
|
||||
String USER_AM_RESOURCE_LIMIT_EXCEED = "User's AM resource limit exceeded. ";
|
||||
String LAST_NODE_PROCESSED_MSG =
|
||||
" Last Node which was processed for the application : ";
|
||||
}
|
|
@ -960,7 +960,7 @@ public class CapacityScheduler extends
|
|||
updateDemandForQueue = (LeafQueue) application.getQueue();
|
||||
}
|
||||
|
||||
if (application.isWaitingForAMContainer(application.getApplicationId())) {
|
||||
if (application.isWaitingForAMContainer()) {
|
||||
// Allocate is for AM and update AM blacklist for this
|
||||
application.updateAMBlacklist(
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
|
@ -635,7 +636,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
queueUsage.getAMUsed(partitionName));
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("application AMResource "
|
||||
LOG.debug("application "+application.getId() +" AMResource "
|
||||
+ application.getAMResource(partitionName)
|
||||
+ " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
|
||||
+ " amLimit " + amLimit + " lastClusterResource "
|
||||
|
@ -654,6 +655,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
+ " skipping enforcement to allow at least one application"
|
||||
+ " to start");
|
||||
} else {
|
||||
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
|
||||
CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
|
||||
LOG.info("Not activating application " + applicationId
|
||||
+ " as amIfStarted: " + amIfStarted + " exceeds amLimit: "
|
||||
+ amLimit);
|
||||
|
@ -686,6 +689,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
+ " low. skipping enforcement to allow at least one application"
|
||||
+ " to start");
|
||||
} else {
|
||||
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
|
||||
CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
|
||||
LOG.info("Not activating application " + applicationId
|
||||
+ " for user: " + user + " as userAmIfStarted: "
|
||||
+ userAmIfStarted + " exceeds userAmLimit: " + userAMLimit);
|
||||
|
@ -694,6 +699,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
}
|
||||
user.activateApplication();
|
||||
orderingPolicy.addSchedulableEntity(application);
|
||||
application.updateAMContainerDiagnostics(AMState.ACTIVATED, null);
|
||||
|
||||
queueUsage.incAMUsed(partitionName,
|
||||
application.getAMResource(partitionName));
|
||||
user.getResourceUsage().incAMUsed(partitionName,
|
||||
|
@ -869,6 +876,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
// Check user limit
|
||||
if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
|
||||
application, node.getPartition(), currentResourceLimits)) {
|
||||
application.updateAMContainerDiagnostics(AMState.ACTIVATED,
|
||||
"User capacity has reached its maximum limit.");
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -904,7 +913,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
// Done
|
||||
return assignment;
|
||||
} else if (!assignment.getSkipped()) {
|
||||
} else if (assignment.getSkipped()) {
|
||||
application.updateNodeInfoForAMDiagnostics(node);
|
||||
} else {
|
||||
// If we don't allocate anything, and it is not skipped by application,
|
||||
// we will return to respect FIFO of applications
|
||||
return CSAssignment.NULL_ASSIGNMENT;
|
||||
|
|
|
@ -28,12 +28,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
|
@ -79,6 +79,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|||
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
||||
ResourceLimits resourceLimits, Priority priority) {
|
||||
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
|
||||
application.updateAppSkipNodeDiagnostics(
|
||||
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
|
||||
return ContainerAllocation.APP_SKIPPED;
|
||||
}
|
||||
|
||||
|
@ -99,16 +101,14 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|||
// AM container allocation doesn't support non-exclusive allocation to
|
||||
// avoid painful of preempt an AM container
|
||||
if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
|
||||
RMAppAttempt rmAppAttempt =
|
||||
rmContext.getRMApps().get(application.getApplicationId())
|
||||
.getCurrentAppAttempt();
|
||||
if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
|
||||
&& null == rmAppAttempt.getMasterContainer()) {
|
||||
if (application.isWaitingForAMContainer()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skip allocating AM container to app_attempt="
|
||||
+ application.getApplicationAttemptId()
|
||||
+ ", don't allow to allocate AM container in non-exclusive mode");
|
||||
}
|
||||
application.updateAppSkipNodeDiagnostics(
|
||||
"Skipping assigning to Node in Ignore Exclusivity mode. ");
|
||||
return ContainerAllocation.APP_SKIPPED;
|
||||
}
|
||||
}
|
||||
|
@ -318,6 +318,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|||
schedulingMode, currentResoureLimits);
|
||||
}
|
||||
|
||||
application.updateAppSkipNodeDiagnostics(
|
||||
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_DUE_TO_LOCALITY);
|
||||
return ContainerAllocation.APP_SKIPPED;
|
||||
}
|
||||
|
||||
|
@ -621,6 +623,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|||
|
||||
// something went wrong getting/creating the container
|
||||
if (container == null) {
|
||||
application
|
||||
.updateAppSkipNodeDiagnostics("Scheduling of container failed. ");
|
||||
LOG.warn("Couldn't get container for allocation!");
|
||||
return ContainerAllocation.APP_SKIPPED;
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
|
@ -54,13 +55,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
@ -87,6 +90,11 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
|
||||
private AbstractContainerAllocator containerAllocator;
|
||||
|
||||
/**
|
||||
* to hold the message if its app doesn't not get container from a node
|
||||
*/
|
||||
private String appSkipNodeDiagnostics;
|
||||
|
||||
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
|
||||
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
||||
RMContext rmContext) {
|
||||
|
@ -187,6 +195,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
|
||||
request.getNodeLabelExpression());
|
||||
|
||||
updateAMContainerDiagnostics(AMState.ASSIGNED, null);
|
||||
|
||||
// Add it to allContainers list.
|
||||
newlyAllocatedContainers.add(rmContainer);
|
||||
liveContainers.put(container.getId(), rmContainer);
|
||||
|
@ -502,4 +512,85 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
getCSLeafQueue().incAMUsedResource(newPartition, containerResource, this);
|
||||
}
|
||||
}
|
||||
|
||||
protected void getPendingAppDiagnosticMessage(
|
||||
StringBuilder diagnosticMessage) {
|
||||
LeafQueue queue = getCSLeafQueue();
|
||||
diagnosticMessage.append(" Details : AM Partition = ");
|
||||
diagnosticMessage.append(appAMNodePartitionName.isEmpty()
|
||||
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appAMNodePartitionName);
|
||||
diagnosticMessage.append("; ");
|
||||
diagnosticMessage.append("AM Resource Request = ");
|
||||
diagnosticMessage.append(getAMResource(appAMNodePartitionName));
|
||||
diagnosticMessage.append("; ");
|
||||
diagnosticMessage.append("Queue Resource Limit for AM = ");
|
||||
diagnosticMessage
|
||||
.append(queue.getAMResourceLimitPerPartition(appAMNodePartitionName));
|
||||
diagnosticMessage.append("; ");
|
||||
diagnosticMessage.append("User AM Resource Limit of the queue = ");
|
||||
diagnosticMessage.append(
|
||||
queue.getUserAMResourceLimitPerPartition(appAMNodePartitionName));
|
||||
diagnosticMessage.append("; ");
|
||||
diagnosticMessage.append("Queue AM Resource Usage = ");
|
||||
diagnosticMessage.append(
|
||||
queue.getQueueResourceUsage().getAMUsed(appAMNodePartitionName));
|
||||
diagnosticMessage.append("; ");
|
||||
}
|
||||
|
||||
protected void getActivedAppDiagnosticMessage(
|
||||
StringBuilder diagnosticMessage) {
|
||||
LeafQueue queue = getCSLeafQueue();
|
||||
QueueCapacities queueCapacities = queue.getQueueCapacities();
|
||||
diagnosticMessage.append(" Details : AM Partition = ");
|
||||
diagnosticMessage.append(appAMNodePartitionName.isEmpty()
|
||||
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appAMNodePartitionName);
|
||||
diagnosticMessage.append(" ; ");
|
||||
diagnosticMessage.append("Partition Resource = ");
|
||||
diagnosticMessage.append(rmContext.getNodeLabelManager()
|
||||
.getResourceByLabel(appAMNodePartitionName, Resources.none()));
|
||||
diagnosticMessage.append(" ; ");
|
||||
diagnosticMessage.append("Queue's Absolute capacity = ");
|
||||
diagnosticMessage.append(
|
||||
queueCapacities.getAbsoluteCapacity(appAMNodePartitionName) * 100);
|
||||
diagnosticMessage.append(" % ; ");
|
||||
diagnosticMessage.append("Queue's Absolute used capacity = ");
|
||||
diagnosticMessage.append(
|
||||
queueCapacities.getAbsoluteUsedCapacity(appAMNodePartitionName) * 100);
|
||||
diagnosticMessage.append(" % ; ");
|
||||
diagnosticMessage.append("Queue's Absolute max capacity = ");
|
||||
diagnosticMessage.append(
|
||||
queueCapacities.getAbsoluteMaximumCapacity(appAMNodePartitionName)
|
||||
* 100);
|
||||
diagnosticMessage.append(" % ; ");
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the message temporarily if the reason is known for why scheduling did
|
||||
* not happen for a given node, if not message will be over written
|
||||
* @param message
|
||||
*/
|
||||
public void updateAppSkipNodeDiagnostics(String message) {
|
||||
this.appSkipNodeDiagnostics = message;
|
||||
}
|
||||
|
||||
public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) {
|
||||
if (isWaitingForAMContainer()) {
|
||||
StringBuilder diagnosticMessageBldr = new StringBuilder();
|
||||
if (appSkipNodeDiagnostics != null) {
|
||||
diagnosticMessageBldr.append(appSkipNodeDiagnostics);
|
||||
appSkipNodeDiagnostics = null;
|
||||
}
|
||||
diagnosticMessageBldr.append(
|
||||
CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG);
|
||||
diagnosticMessageBldr.append(node.getNodeID());
|
||||
diagnosticMessageBldr.append(" ( Partition : ");
|
||||
diagnosticMessageBldr.append(node.getLabels());
|
||||
diagnosticMessageBldr.append(", Total resource : ");
|
||||
diagnosticMessageBldr.append(node.getTotalResource());
|
||||
diagnosticMessageBldr.append(", Available resource : ");
|
||||
diagnosticMessageBldr.append(node.getAvailableResource());
|
||||
diagnosticMessageBldr.append(" ).");
|
||||
updateAMContainerDiagnostics(AMState.ACTIVATED, diagnosticMessageBldr.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,9 +75,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
|
@ -1008,7 +1008,7 @@ public class FairScheduler extends
|
|||
preemptionContainerIds.add(container.getContainerId());
|
||||
}
|
||||
|
||||
if (application.isWaitingForAMContainer(application.getApplicationId())) {
|
||||
if (application.isWaitingForAMContainer()) {
|
||||
// Allocate is for AM and update AM blacklist for this
|
||||
application.updateAMBlacklist(
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
|
|
|
@ -359,7 +359,7 @@ public class FifoScheduler extends
|
|||
" #ask=" + ask.size());
|
||||
}
|
||||
|
||||
if (application.isWaitingForAMContainer(application.getApplicationId())) {
|
||||
if (application.isWaitingForAMContainer()) {
|
||||
// Allocate is for AM and update AM blacklist for this
|
||||
application.updateAMBlacklist(
|
||||
blacklistAdditions, blacklistRemovals);
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
|
@ -80,9 +81,12 @@ public class TestCapacitySchedulerPlanFollower extends
|
|||
ConcurrentMap<ApplicationId, RMApp> spyApps =
|
||||
spy(new ConcurrentHashMap<ApplicationId, RMApp>());
|
||||
RMApp rmApp = mock(RMApp.class);
|
||||
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
|
||||
when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
|
||||
.thenReturn(null);
|
||||
.thenReturn(rmAppAttempt);
|
||||
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
|
||||
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
|
||||
Mockito.doReturn(true).when(spyApps).containsKey((ApplicationId) Matchers.any());
|
||||
when(spyRMContext.getRMApps()).thenReturn(spyApps);
|
||||
when(spyRMContext.getScheduler()).thenReturn(scheduler);
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
|
@ -602,7 +603,13 @@ public class TestApplicationLimits {
|
|||
when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
|
||||
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
|
||||
when(spyRMContext.getRMApps()).thenReturn(spyApps);
|
||||
|
||||
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
|
||||
when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
|
||||
.thenReturn(rmAppAttempt);
|
||||
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
|
||||
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
|
||||
Mockito.doReturn(true).when(spyApps)
|
||||
.containsKey((ApplicationId) Matchers.any());
|
||||
|
||||
Priority priority_1 = TestUtils.createMockPriority(1);
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabels
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -120,15 +121,13 @@ public class TestApplicationLimitsByPartition {
|
|||
|
||||
// Submit app1 with 1Gb AM resource to Queue A1 for label X
|
||||
RMApp app1 = rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
||||
MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
// Submit app2 with 1Gb AM resource to Queue A1 for label X
|
||||
RMApp app2 = rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
||||
MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
||||
|
||||
// Submit 3rd app to Queue A1 for label X, and this will be pending as
|
||||
// AM limit is already crossed for label X. (2GB)
|
||||
rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
||||
RMApp pendingApp = rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
|
||||
|
@ -138,6 +137,16 @@ public class TestApplicationLimitsByPartition {
|
|||
// pending.
|
||||
Assert.assertEquals(2, leafQueue.getNumActiveApplications());
|
||||
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
|
||||
Assert.assertTrue("AM diagnostics not set properly", app1.getDiagnostics()
|
||||
.toString().contains(AMState.ACTIVATED.getDiagnosticMessage()));
|
||||
Assert.assertTrue("AM diagnostics not set properly", app2.getDiagnostics()
|
||||
.toString().contains(AMState.ACTIVATED.getDiagnosticMessage()));
|
||||
Assert.assertTrue("AM diagnostics not set properly",
|
||||
pendingApp.getDiagnostics().toString()
|
||||
.contains(AMState.INACTIVATED.getDiagnosticMessage()));
|
||||
Assert.assertTrue("AM diagnostics not set properly",
|
||||
pendingApp.getDiagnostics().toString().contains(
|
||||
CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED));
|
||||
|
||||
// Now verify the same test case in Queue C1 where label is not configured.
|
||||
// Submit an app to Queue C1 with empty label
|
||||
|
@ -150,7 +159,7 @@ public class TestApplicationLimitsByPartition {
|
|||
|
||||
// Submit 3rd app to Queue C1. This will be pending as Queue's am-limit
|
||||
// is reached.
|
||||
rm1.submitApp(GB, "app", "user", null, "c1");
|
||||
pendingApp = rm1.submitApp(GB, "app", "user", null, "c1");
|
||||
|
||||
leafQueue = (LeafQueue) cs.getQueue("c1");
|
||||
Assert.assertNotNull(leafQueue);
|
||||
|
@ -159,6 +168,12 @@ public class TestApplicationLimitsByPartition {
|
|||
// is reached.
|
||||
Assert.assertEquals(2, leafQueue.getNumActiveApplications());
|
||||
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
|
||||
Assert.assertTrue("AM diagnostics not set properly",
|
||||
pendingApp.getDiagnostics().toString()
|
||||
.contains(AMState.INACTIVATED.getDiagnosticMessage()));
|
||||
Assert.assertTrue("AM diagnostics not set properly",
|
||||
pendingApp.getDiagnostics().toString().contains(
|
||||
CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED));
|
||||
|
||||
rm1.killApp(app3.getApplicationId());
|
||||
Thread.sleep(1000);
|
||||
|
@ -288,11 +303,10 @@ public class TestApplicationLimitsByPartition {
|
|||
|
||||
// Submit app1 (2 GB) to Queue A1 and label X
|
||||
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a1", "x");
|
||||
MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
// Submit 2nd app to label "X" with one GB. Since queue am-limit is 2GB,
|
||||
// 2nd app will be pending and first one will get activated.
|
||||
rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
||||
RMApp pendingApp = rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
|
||||
|
@ -302,6 +316,14 @@ public class TestApplicationLimitsByPartition {
|
|||
// used for partition "x" also.
|
||||
Assert.assertEquals(1, leafQueue.getNumActiveApplications());
|
||||
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
|
||||
Assert.assertTrue("AM diagnostics not set properly", app1.getDiagnostics()
|
||||
.toString().contains(AMState.ACTIVATED.getDiagnosticMessage()));
|
||||
Assert.assertTrue("AM diagnostics not set properly",
|
||||
pendingApp.getDiagnostics().toString()
|
||||
.contains(AMState.INACTIVATED.getDiagnosticMessage()));
|
||||
Assert.assertTrue("AM diagnostics not set properly",
|
||||
pendingApp.getDiagnostics().toString()
|
||||
.contains(CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED));
|
||||
rm1.close();
|
||||
}
|
||||
|
||||
|
@ -381,7 +403,7 @@ public class TestApplicationLimitsByPartition {
|
|||
// 4Gb -> 40% of label "X" in queue A1
|
||||
// Since we have 2 users, 50% of 4Gb will be max for each user. Here user1
|
||||
// has already crossed this 2GB limit, hence this app will be pending.
|
||||
rm1.submitApp(GB, "app", user_1, null, "a1", "x");
|
||||
RMApp pendingApp = rm1.submitApp(GB, "app", user_1, null, "a1", "x");
|
||||
|
||||
// Verify active applications count per user and also in queue level.
|
||||
Assert.assertEquals(3, leafQueue.getNumActiveApplications());
|
||||
|
@ -389,6 +411,14 @@ public class TestApplicationLimitsByPartition {
|
|||
Assert.assertEquals(2, leafQueue.getNumActiveApplications(user_1));
|
||||
Assert.assertEquals(1, leafQueue.getNumPendingApplications(user_1));
|
||||
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
|
||||
|
||||
//verify Diagnostic messages
|
||||
Assert.assertTrue("AM diagnostics not set properly",
|
||||
pendingApp.getDiagnostics().toString()
|
||||
.contains(AMState.INACTIVATED.getDiagnosticMessage()));
|
||||
Assert.assertTrue("AM diagnostics not set properly",
|
||||
pendingApp.getDiagnostics().toString().contains(
|
||||
CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED));
|
||||
rm1.close();
|
||||
}
|
||||
|
||||
|
|
|
@ -1028,10 +1028,11 @@ public class TestNodeLabelContainerAllocation {
|
|||
|
||||
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||
rm1.start();
|
||||
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
|
||||
String nodeIdStr = "h1:1234";
|
||||
MockNM nm1 = rm1.registerNode(nodeIdStr, 8 * GB); // label = x
|
||||
|
||||
// launch an app to queue b1 (label = y), AM container should be launched in nm3
|
||||
rm1.submitApp(1 * GB, "app", "user", null, "b1");
|
||||
RMApp app = rm1.submitApp(1 * GB, "app", "user", null, "b1");
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||
|
@ -1040,7 +1041,17 @@ public class TestNodeLabelContainerAllocation {
|
|||
for (int i = 0; i < 50; i++) {
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||
}
|
||||
|
||||
|
||||
Assert.assertTrue(
|
||||
"Scheduler diagnostics should have reason for not assigning the node",
|
||||
app.getDiagnostics().toString().contains(
|
||||
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_IGNORE_EXCLUSIVE_MODE));
|
||||
|
||||
Assert.assertTrue(
|
||||
"Scheduler diagnostics should have last processed node information",
|
||||
app.getDiagnostics().toString().contains(
|
||||
CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG
|
||||
+ nodeIdStr + " ( Partition : [x]"));
|
||||
Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId())
|
||||
.getNumContainers());
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
|
@ -65,6 +67,7 @@ import org.apache.log4j.LogManager;
|
|||
import org.apache.log4j.Logger;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestReservations {
|
||||
|
||||
|
@ -188,6 +191,9 @@ public class TestReservations {
|
|||
.getMockApplicationAttemptId(0, 0);
|
||||
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext);
|
||||
app_0 = spy(app_0);
|
||||
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||
any(String.class));
|
||||
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||
|
||||
a.submitApplicationAttempt(app_0, user_0);
|
||||
|
@ -196,6 +202,9 @@ public class TestReservations {
|
|||
.getMockApplicationAttemptId(1, 0);
|
||||
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext);
|
||||
app_1 = spy(app_1);
|
||||
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||
any(String.class));
|
||||
a.submitApplicationAttempt(app_1, user_0);
|
||||
|
||||
// Setup some nodes
|
||||
|
@ -348,6 +357,9 @@ public class TestReservations {
|
|||
.getMockApplicationAttemptId(0, 0);
|
||||
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext);
|
||||
app_0 = spy(app_0);
|
||||
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||
any(String.class));
|
||||
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||
|
||||
a.submitApplicationAttempt(app_0, user_0);
|
||||
|
@ -356,6 +368,9 @@ public class TestReservations {
|
|||
.getMockApplicationAttemptId(1, 0);
|
||||
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext);
|
||||
app_1 = spy(app_1);
|
||||
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||
any(String.class));
|
||||
a.submitApplicationAttempt(app_1, user_0);
|
||||
|
||||
// Setup some nodes
|
||||
|
@ -502,6 +517,9 @@ public class TestReservations {
|
|||
.getMockApplicationAttemptId(0, 0);
|
||||
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext);
|
||||
app_0 = spy(app_0);
|
||||
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||
any(String.class));
|
||||
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||
|
||||
a.submitApplicationAttempt(app_0, user_0);
|
||||
|
@ -510,6 +528,9 @@ public class TestReservations {
|
|||
.getMockApplicationAttemptId(1, 0);
|
||||
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext);
|
||||
app_1 = spy(app_1);
|
||||
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||
any(String.class));
|
||||
a.submitApplicationAttempt(app_1, user_0);
|
||||
|
||||
// Setup some nodes
|
||||
|
@ -758,6 +779,9 @@ public class TestReservations {
|
|||
.getMockApplicationAttemptId(0, 0);
|
||||
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext);
|
||||
app_0 = spy(app_0);
|
||||
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||
any(String.class));
|
||||
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||
|
||||
a.submitApplicationAttempt(app_0, user_0);
|
||||
|
@ -766,6 +790,9 @@ public class TestReservations {
|
|||
.getMockApplicationAttemptId(1, 0);
|
||||
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext);
|
||||
app_1 = spy(app_1);
|
||||
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||
any(String.class));
|
||||
a.submitApplicationAttempt(app_1, user_0);
|
||||
|
||||
// Setup some nodes
|
||||
|
@ -927,6 +954,9 @@ public class TestReservations {
|
|||
.getMockApplicationAttemptId(0, 0);
|
||||
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext);
|
||||
app_0 = spy(app_0);
|
||||
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||
any(String.class));
|
||||
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||
a.submitApplicationAttempt(app_0, user_0);
|
||||
|
||||
|
@ -934,6 +964,9 @@ public class TestReservations {
|
|||
.getMockApplicationAttemptId(1, 0);
|
||||
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext);
|
||||
app_1 = spy(app_1);
|
||||
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||
any(String.class));
|
||||
a.submitApplicationAttempt(app_1, user_0);
|
||||
|
||||
// Setup some nodes
|
||||
|
@ -1068,6 +1101,9 @@ public class TestReservations {
|
|||
.getMockApplicationAttemptId(0, 0);
|
||||
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext);
|
||||
app_0 = spy(app_0);
|
||||
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||
any(String.class));
|
||||
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||
|
||||
a.submitApplicationAttempt(app_0, user_0);
|
||||
|
@ -1076,6 +1112,9 @@ public class TestReservations {
|
|||
.getMockApplicationAttemptId(1, 0);
|
||||
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext);
|
||||
app_1 = spy(app_1);
|
||||
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||
any(String.class));
|
||||
a.submitApplicationAttempt(app_1, user_0);
|
||||
|
||||
// Setup some nodes
|
||||
|
|
|
@ -1387,8 +1387,8 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
|||
assertEquals("progress doesn't match", 0, progress, 0.0);
|
||||
WebServicesTestUtils.checkStringMatch("trackingUI", "UNASSIGNED",
|
||||
trackingUI);
|
||||
WebServicesTestUtils.checkStringMatch("diagnostics", app.getDiagnostics()
|
||||
.toString(), diagnostics);
|
||||
WebServicesTestUtils.checkStringEqual("diagnostics",
|
||||
app.getDiagnostics().toString(), diagnostics);
|
||||
assertEquals("clusterId doesn't match",
|
||||
ResourceManager.getClusterTimeStamp(), clusterId);
|
||||
assertEquals("startedTime doesn't match", app.getStartTime(), startedTime);
|
||||
|
|
Loading…
Reference in New Issue