YARN-3946. Update exact reason as to why a submitted app is in ACCEPTED state to app's diagnostic message. (Naganarasimha G R via wangda)
(cherry picked from commit6cb0af3c39
) (cherry picked from commita1a723fdff
) Conflicts: hadoop-yarn-project/CHANGES.txt
This commit is contained in:
parent
4f21f9408e
commit
8ee83765f4
|
@ -542,6 +542,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-4405. Support node label store in non-appendable file system. (Wangda
|
YARN-4405. Support node label store in non-appendable file system. (Wangda
|
||||||
Tan via jianhe)
|
Tan via jianhe)
|
||||||
|
|
||||||
|
YARN-3946. Update exact reason as to why a submitted app is in ACCEPTED state to
|
||||||
|
app's diagnostic message. (Naganarasimha G R via wangda)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
|
|
@ -645,7 +645,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
progress = currentAttempt.getProgress();
|
progress = currentAttempt.getProgress();
|
||||||
logAggregationStatus = this.getLogAggregationStatusForAppReport();
|
logAggregationStatus = this.getLogAggregationStatusForAppReport();
|
||||||
}
|
}
|
||||||
diags = this.diagnostics.toString();
|
//if the diagnostics is not already set get it from attempt
|
||||||
|
diags = getDiagnostics().toString();
|
||||||
|
|
||||||
if (currentAttempt != null &&
|
if (currentAttempt != null &&
|
||||||
currentAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
|
currentAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
|
||||||
|
@ -750,8 +751,13 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
@Override
|
@Override
|
||||||
public StringBuilder getDiagnostics() {
|
public StringBuilder getDiagnostics() {
|
||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
if (diagnostics.length() == 0 && getCurrentAppAttempt() != null) {
|
||||||
|
String appAttemptDiagnostics = getCurrentAppAttempt().getDiagnostics();
|
||||||
|
if (appAttemptDiagnostics != null) {
|
||||||
|
return new StringBuilder(appAttemptDiagnostics);
|
||||||
|
}
|
||||||
|
}
|
||||||
return this.diagnostics;
|
return this.diagnostics;
|
||||||
} finally {
|
} finally {
|
||||||
this.readLock.unlock();
|
this.readLock.unlock();
|
||||||
|
|
|
@ -246,4 +246,10 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
|
||||||
* @return the finish time of the application attempt.
|
* @return the finish time of the application attempt.
|
||||||
*/
|
*/
|
||||||
long getFinishTime();
|
long getFinishTime();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* To capture Launch diagnostics of the app.
|
||||||
|
* @param amLaunchDiagnostics
|
||||||
|
*/
|
||||||
|
void updateAMLaunchDiagnostics(String amLaunchDiagnostics);
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,11 +73,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventT
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistUpdates;
|
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistUpdates;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
|
@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
|
@ -187,6 +188,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
private ResourceRequest amReq = null;
|
private ResourceRequest amReq = null;
|
||||||
private BlacklistManager blacklistedNodesForAM = null;
|
private BlacklistManager blacklistedNodesForAM = null;
|
||||||
|
|
||||||
|
private String amLaunchDiagnostics;
|
||||||
|
|
||||||
private static final StateMachineFactory<RMAppAttemptImpl,
|
private static final StateMachineFactory<RMAppAttemptImpl,
|
||||||
RMAppAttemptState,
|
RMAppAttemptState,
|
||||||
RMAppAttemptEventType,
|
RMAppAttemptEventType,
|
||||||
|
@ -703,8 +706,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
@Override
|
@Override
|
||||||
public String getDiagnostics() {
|
public String getDiagnostics() {
|
||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
if (diagnostics.length() == 0 && amLaunchDiagnostics != null) {
|
||||||
|
return amLaunchDiagnostics;
|
||||||
|
}
|
||||||
return this.diagnostics.toString();
|
return this.diagnostics.toString();
|
||||||
} finally {
|
} finally {
|
||||||
this.readLock.unlock();
|
this.readLock.unlock();
|
||||||
|
@ -1409,6 +1414,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
appAttempt.launchAMStartTime;
|
appAttempt.launchAMStartTime;
|
||||||
ClusterMetrics.getMetrics().addAMLaunchDelay(delay);
|
ClusterMetrics.getMetrics().addAMLaunchDelay(delay);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
appAttempt
|
||||||
|
.updateAMLaunchDiagnostics(AMState.LAUNCHED.getDiagnosticMessage());
|
||||||
// Register with AMLivelinessMonitor
|
// Register with AMLivelinessMonitor
|
||||||
appAttempt.attemptLaunched();
|
appAttempt.attemptLaunched();
|
||||||
|
|
||||||
|
@ -1507,6 +1515,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
appAttempt.originalTrackingUrl =
|
appAttempt.originalTrackingUrl =
|
||||||
sanitizeTrackingUrl(registrationEvent.getTrackingurl());
|
sanitizeTrackingUrl(registrationEvent.getTrackingurl());
|
||||||
|
|
||||||
|
// reset AMLaunchDiagnostics once AM Registers with RM
|
||||||
|
appAttempt.updateAMLaunchDiagnostics(null);
|
||||||
|
|
||||||
// Let the app know
|
// Let the app know
|
||||||
appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
|
appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
|
||||||
.getAppAttemptId().getApplicationId(),
|
.getAppAttemptId().getApplicationId(),
|
||||||
|
@ -2097,4 +2108,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
this.writeLock.unlock();
|
this.writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateAMLaunchDiagnostics(String amLaunchDiagnostics) {
|
||||||
|
this.amLaunchDiagnostics = amLaunchDiagnostics;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -30,6 +29,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.commons.lang.time.DateUtils;
|
import org.apache.commons.lang.time.DateUtils;
|
||||||
|
import org.apache.commons.lang.time.FastDateFormat;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
@ -85,6 +86,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(SchedulerApplicationAttempt.class);
|
.getLog(SchedulerApplicationAttempt.class);
|
||||||
|
|
||||||
|
private FastDateFormat fdf =
|
||||||
|
FastDateFormat.getInstance("EEE MMM dd HH:mm:ss Z yyyy");
|
||||||
|
|
||||||
private static final long MEM_AGGREGATE_ALLOCATION_CACHE_MSECS = 3000;
|
private static final long MEM_AGGREGATE_ALLOCATION_CACHE_MSECS = 3000;
|
||||||
protected long lastMemoryAggregateAllocationUpdateTime = 0;
|
protected long lastMemoryAggregateAllocationUpdateTime = 0;
|
||||||
private long lastMemorySeconds = 0;
|
private long lastMemorySeconds = 0;
|
||||||
|
@ -148,10 +152,12 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||||
protected Queue queue;
|
protected Queue queue;
|
||||||
protected boolean isStopped = false;
|
protected boolean isStopped = false;
|
||||||
|
|
||||||
private String appAMNodePartitionName = CommonNodeLabelsManager.NO_LABEL;
|
protected String appAMNodePartitionName = CommonNodeLabelsManager.NO_LABEL;
|
||||||
|
|
||||||
protected final RMContext rmContext;
|
protected final RMContext rmContext;
|
||||||
|
|
||||||
|
private RMAppAttempt appAttempt;
|
||||||
|
|
||||||
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
|
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
|
||||||
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
||||||
RMContext rmContext) {
|
RMContext rmContext) {
|
||||||
|
@ -166,9 +172,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||||
if (rmContext.getRMApps() != null &&
|
if (rmContext.getRMApps() != null &&
|
||||||
rmContext.getRMApps()
|
rmContext.getRMApps()
|
||||||
.containsKey(applicationAttemptId.getApplicationId())) {
|
.containsKey(applicationAttemptId.getApplicationId())) {
|
||||||
|
RMApp rmApp = rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
|
||||||
ApplicationSubmissionContext appSubmissionContext =
|
ApplicationSubmissionContext appSubmissionContext =
|
||||||
rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
|
rmApp
|
||||||
.getApplicationSubmissionContext();
|
.getApplicationSubmissionContext();
|
||||||
|
appAttempt = rmApp.getCurrentAppAttempt();
|
||||||
if (appSubmissionContext != null) {
|
if (appSubmissionContext != null) {
|
||||||
unmanagedAM = appSubmissionContext.getUnmanagedAM();
|
unmanagedAM = appSubmissionContext.getUnmanagedAM();
|
||||||
this.logAggregationContext =
|
this.logAggregationContext =
|
||||||
|
@ -489,7 +497,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||||
ContainerType containerType = ContainerType.TASK;
|
ContainerType containerType = ContainerType.TASK;
|
||||||
// The working knowledge is that masterContainer for AM is null as it
|
// The working knowledge is that masterContainer for AM is null as it
|
||||||
// itself is the master container.
|
// itself is the master container.
|
||||||
if (isWaitingForAMContainer(getApplicationId())) {
|
if (isWaitingForAMContainer()) {
|
||||||
containerType = ContainerType.APPLICATION_MASTER;
|
containerType = ContainerType.APPLICATION_MASTER;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
@ -575,13 +583,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||||
return returnList;
|
return returnList;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isWaitingForAMContainer(ApplicationId applicationId) {
|
public boolean isWaitingForAMContainer() {
|
||||||
// The working knowledge is that masterContainer for AM is null as it
|
// The working knowledge is that masterContainer for AM is null as it
|
||||||
// itself is the master container.
|
// itself is the master container.
|
||||||
RMAppAttempt appAttempt =
|
return (!unmanagedAM && appAttempt.getMasterContainer() == null);
|
||||||
rmContext.getRMApps().get(applicationId).getCurrentAppAttempt();
|
|
||||||
return (appAttempt != null && appAttempt.getMasterContainer() == null
|
|
||||||
&& appAttempt.getSubmissionContext().getUnmanagedAM() == false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Blacklist used for user containers
|
// Blacklist used for user containers
|
||||||
|
@ -603,7 +608,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isBlacklisted(String resourceName) {
|
public boolean isBlacklisted(String resourceName) {
|
||||||
boolean useAMBlacklist = isWaitingForAMContainer(getApplicationId());
|
boolean useAMBlacklist = isWaitingForAMContainer();
|
||||||
return this.appSchedulingInfo.isBlacklisted(resourceName, useAMBlacklist);
|
return this.appSchedulingInfo.isBlacklisted(resourceName, useAMBlacklist);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -905,4 +910,71 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||||
public String getAppAMNodePartitionName() {
|
public String getAppAMNodePartitionName() {
|
||||||
return appAMNodePartitionName;
|
return appAMNodePartitionName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updateAMContainerDiagnostics(AMState state,
|
||||||
|
String diagnosticMessage) {
|
||||||
|
if (!isWaitingForAMContainer()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
StringBuilder diagnosticMessageBldr = new StringBuilder();
|
||||||
|
diagnosticMessageBldr.append("[");
|
||||||
|
diagnosticMessageBldr.append(fdf.format(System.currentTimeMillis()));
|
||||||
|
diagnosticMessageBldr.append("] ");
|
||||||
|
switch (state) {
|
||||||
|
case INACTIVATED:
|
||||||
|
diagnosticMessageBldr.append(state.diagnosticMessage);
|
||||||
|
if (diagnosticMessage != null) {
|
||||||
|
diagnosticMessageBldr.append(diagnosticMessage);
|
||||||
|
}
|
||||||
|
getPendingAppDiagnosticMessage(diagnosticMessageBldr);
|
||||||
|
break;
|
||||||
|
case ACTIVATED:
|
||||||
|
diagnosticMessageBldr.append(state.diagnosticMessage);
|
||||||
|
if (diagnosticMessage != null) {
|
||||||
|
diagnosticMessageBldr.append(diagnosticMessage);
|
||||||
|
}
|
||||||
|
getActivedAppDiagnosticMessage(diagnosticMessageBldr);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// UNMANAGED , ASSIGNED
|
||||||
|
diagnosticMessageBldr.append(state.diagnosticMessage);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
appAttempt.updateAMLaunchDiagnostics(diagnosticMessageBldr.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void getPendingAppDiagnosticMessage(
|
||||||
|
StringBuilder diagnosticMessage) {
|
||||||
|
// Give the specific information which might be applicable for the
|
||||||
|
// respective scheduler
|
||||||
|
// like partitionAMResourcelimit,UserAMResourceLimit, queue'AMResourceLimit
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void getActivedAppDiagnosticMessage(
|
||||||
|
StringBuilder diagnosticMessage) {
|
||||||
|
// Give the specific information which might be applicable for the
|
||||||
|
// respective scheduler
|
||||||
|
// queue's resource usage for specific partition
|
||||||
|
}
|
||||||
|
|
||||||
|
public static enum AMState {
|
||||||
|
UNMANAGED("User launched the Application Master, since it's unmanaged. "),
|
||||||
|
INACTIVATED("Application is added to the scheduler and is not yet activated. "),
|
||||||
|
ACTIVATED("Application is Activated, waiting for resources to be assigned for AM. "),
|
||||||
|
ASSIGNED("Scheduler has assigned a container for AM, waiting for AM "
|
||||||
|
+ "container to be launched"),
|
||||||
|
LAUNCHED("AM container is launched, waiting for AM container to Register "
|
||||||
|
+ "with RM")
|
||||||
|
;
|
||||||
|
|
||||||
|
private String diagnosticMessage;
|
||||||
|
|
||||||
|
AMState(String diagnosticMessage) {
|
||||||
|
this.diagnosticMessage = diagnosticMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDiagnosticMessage() {
|
||||||
|
return diagnosticMessage;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,35 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* diagnostic messages for AMcontainer launching
|
||||||
|
*/
|
||||||
|
public interface CSAMContainerLaunchDiagnosticsConstants {
|
||||||
|
String SKIP_AM_ALLOCATION_IN_IGNORE_EXCLUSIVE_MODE =
|
||||||
|
"Skipping assigning to Node in Ignore Exclusivity mode. ";
|
||||||
|
String SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE =
|
||||||
|
"Skipped scheduling for this Node as its black listed. ";
|
||||||
|
String SKIP_AM_ALLOCATION_DUE_TO_LOCALITY =
|
||||||
|
"Skipping assigning to Node as request locality is not matching. ";
|
||||||
|
String QUEUE_AM_RESOURCE_LIMIT_EXCEED =
|
||||||
|
"Queue's AM resource limit exceeded. ";
|
||||||
|
String USER_AM_RESOURCE_LIMIT_EXCEED = "User's AM resource limit exceeded. ";
|
||||||
|
String LAST_NODE_PROCESSED_MSG =
|
||||||
|
" Last Node which was processed for the application : ";
|
||||||
|
}
|
|
@ -960,7 +960,7 @@ public class CapacityScheduler extends
|
||||||
updateDemandForQueue = (LeafQueue) application.getQueue();
|
updateDemandForQueue = (LeafQueue) application.getQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (application.isWaitingForAMContainer(application.getApplicationId())) {
|
if (application.isWaitingForAMContainer()) {
|
||||||
// Allocate is for AM and update AM blacklist for this
|
// Allocate is for AM and update AM blacklist for this
|
||||||
application.updateAMBlacklist(
|
application.updateAMBlacklist(
|
||||||
blacklistAdditions, blacklistRemovals);
|
blacklistAdditions, blacklistRemovals);
|
||||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
|
@ -634,7 +635,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
queueUsage.getAMUsed(partitionName));
|
queueUsage.getAMUsed(partitionName));
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("application AMResource "
|
LOG.debug("application "+application.getId() +" AMResource "
|
||||||
+ application.getAMResource(partitionName)
|
+ application.getAMResource(partitionName)
|
||||||
+ " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
|
+ " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
|
||||||
+ " amLimit " + amLimit + " lastClusterResource "
|
+ " amLimit " + amLimit + " lastClusterResource "
|
||||||
|
@ -653,6 +654,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
+ " skipping enforcement to allow at least one application"
|
+ " skipping enforcement to allow at least one application"
|
||||||
+ " to start");
|
+ " to start");
|
||||||
} else {
|
} else {
|
||||||
|
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
|
||||||
|
CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
|
||||||
LOG.info("Not activating application " + applicationId
|
LOG.info("Not activating application " + applicationId
|
||||||
+ " as amIfStarted: " + amIfStarted + " exceeds amLimit: "
|
+ " as amIfStarted: " + amIfStarted + " exceeds amLimit: "
|
||||||
+ amLimit);
|
+ amLimit);
|
||||||
|
@ -685,6 +688,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
+ " low. skipping enforcement to allow at least one application"
|
+ " low. skipping enforcement to allow at least one application"
|
||||||
+ " to start");
|
+ " to start");
|
||||||
} else {
|
} else {
|
||||||
|
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
|
||||||
|
CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
|
||||||
LOG.info("Not activating application " + applicationId
|
LOG.info("Not activating application " + applicationId
|
||||||
+ " for user: " + user + " as userAmIfStarted: "
|
+ " for user: " + user + " as userAmIfStarted: "
|
||||||
+ userAmIfStarted + " exceeds userAmLimit: " + userAMLimit);
|
+ userAmIfStarted + " exceeds userAmLimit: " + userAMLimit);
|
||||||
|
@ -693,6 +698,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
user.activateApplication();
|
user.activateApplication();
|
||||||
orderingPolicy.addSchedulableEntity(application);
|
orderingPolicy.addSchedulableEntity(application);
|
||||||
|
application.updateAMContainerDiagnostics(AMState.ACTIVATED, null);
|
||||||
|
|
||||||
queueUsage.incAMUsed(partitionName,
|
queueUsage.incAMUsed(partitionName,
|
||||||
application.getAMResource(partitionName));
|
application.getAMResource(partitionName));
|
||||||
user.getResourceUsage().incAMUsed(partitionName,
|
user.getResourceUsage().incAMUsed(partitionName,
|
||||||
|
@ -868,6 +875,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
// Check user limit
|
// Check user limit
|
||||||
if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
|
if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
|
||||||
application, node.getPartition(), currentResourceLimits)) {
|
application, node.getPartition(), currentResourceLimits)) {
|
||||||
|
application.updateAMContainerDiagnostics(AMState.ACTIVATED,
|
||||||
|
"User capacity has reached its maximum limit.");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -903,7 +912,9 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
// Done
|
// Done
|
||||||
return assignment;
|
return assignment;
|
||||||
} else if (!assignment.getSkipped()) {
|
} else if (assignment.getSkipped()) {
|
||||||
|
application.updateNodeInfoForAMDiagnostics(node);
|
||||||
|
} else {
|
||||||
// If we don't allocate anything, and it is not skipped by application,
|
// If we don't allocate anything, and it is not skipped by application,
|
||||||
// we will return to respect FIFO of applications
|
// we will return to respect FIFO of applications
|
||||||
return CSAssignment.NULL_ASSIGNMENT;
|
return CSAssignment.NULL_ASSIGNMENT;
|
||||||
|
|
|
@ -28,12 +28,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
@ -79,6 +79,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
||||||
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
||||||
ResourceLimits resourceLimits, Priority priority) {
|
ResourceLimits resourceLimits, Priority priority) {
|
||||||
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
|
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
|
||||||
|
application.updateAppSkipNodeDiagnostics(
|
||||||
|
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
|
||||||
return ContainerAllocation.APP_SKIPPED;
|
return ContainerAllocation.APP_SKIPPED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,16 +101,14 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
||||||
// AM container allocation doesn't support non-exclusive allocation to
|
// AM container allocation doesn't support non-exclusive allocation to
|
||||||
// avoid painful of preempt an AM container
|
// avoid painful of preempt an AM container
|
||||||
if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
|
if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
|
||||||
RMAppAttempt rmAppAttempt =
|
if (application.isWaitingForAMContainer()) {
|
||||||
rmContext.getRMApps().get(application.getApplicationId())
|
|
||||||
.getCurrentAppAttempt();
|
|
||||||
if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
|
|
||||||
&& null == rmAppAttempt.getMasterContainer()) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Skip allocating AM container to app_attempt="
|
LOG.debug("Skip allocating AM container to app_attempt="
|
||||||
+ application.getApplicationAttemptId()
|
+ application.getApplicationAttemptId()
|
||||||
+ ", don't allow to allocate AM container in non-exclusive mode");
|
+ ", don't allow to allocate AM container in non-exclusive mode");
|
||||||
}
|
}
|
||||||
|
application.updateAppSkipNodeDiagnostics(
|
||||||
|
"Skipping assigning to Node in Ignore Exclusivity mode. ");
|
||||||
return ContainerAllocation.APP_SKIPPED;
|
return ContainerAllocation.APP_SKIPPED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -318,6 +318,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
||||||
schedulingMode, currentResoureLimits);
|
schedulingMode, currentResoureLimits);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
application.updateAppSkipNodeDiagnostics(
|
||||||
|
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_DUE_TO_LOCALITY);
|
||||||
return ContainerAllocation.APP_SKIPPED;
|
return ContainerAllocation.APP_SKIPPED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -621,6 +623,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
||||||
|
|
||||||
// something went wrong getting/creating the container
|
// something went wrong getting/creating the container
|
||||||
if (container == null) {
|
if (container == null) {
|
||||||
|
application
|
||||||
|
.updateAppSkipNodeDiagnostics("Scheduling of container failed. ");
|
||||||
LOG.warn("Couldn't get container for allocation!");
|
LOG.warn("Couldn't get container for allocation!");
|
||||||
return ContainerAllocation.APP_SKIPPED;
|
return ContainerAllocation.APP_SKIPPED;
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
@ -54,13 +55,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
@ -87,6 +90,11 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
|
|
||||||
private AbstractContainerAllocator containerAllocator;
|
private AbstractContainerAllocator containerAllocator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* to hold the message if its app doesn't not get container from a node
|
||||||
|
*/
|
||||||
|
private String appSkipNodeDiagnostics;
|
||||||
|
|
||||||
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
|
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
|
||||||
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
||||||
RMContext rmContext) {
|
RMContext rmContext) {
|
||||||
|
@ -187,6 +195,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
|
node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
|
||||||
request.getNodeLabelExpression());
|
request.getNodeLabelExpression());
|
||||||
|
|
||||||
|
updateAMContainerDiagnostics(AMState.ASSIGNED, null);
|
||||||
|
|
||||||
// Add it to allContainers list.
|
// Add it to allContainers list.
|
||||||
newlyAllocatedContainers.add(rmContainer);
|
newlyAllocatedContainers.add(rmContainer);
|
||||||
liveContainers.put(container.getId(), rmContainer);
|
liveContainers.put(container.getId(), rmContainer);
|
||||||
|
@ -502,4 +512,85 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
getCSLeafQueue().incAMUsedResource(newPartition, containerResource, this);
|
getCSLeafQueue().incAMUsedResource(newPartition, containerResource, this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void getPendingAppDiagnosticMessage(
|
||||||
|
StringBuilder diagnosticMessage) {
|
||||||
|
LeafQueue queue = getCSLeafQueue();
|
||||||
|
diagnosticMessage.append(" Details : AM Partition = ");
|
||||||
|
diagnosticMessage.append(appAMNodePartitionName.isEmpty()
|
||||||
|
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appAMNodePartitionName);
|
||||||
|
diagnosticMessage.append("; ");
|
||||||
|
diagnosticMessage.append("AM Resource Request = ");
|
||||||
|
diagnosticMessage.append(getAMResource(appAMNodePartitionName));
|
||||||
|
diagnosticMessage.append("; ");
|
||||||
|
diagnosticMessage.append("Queue Resource Limit for AM = ");
|
||||||
|
diagnosticMessage
|
||||||
|
.append(queue.getAMResourceLimitPerPartition(appAMNodePartitionName));
|
||||||
|
diagnosticMessage.append("; ");
|
||||||
|
diagnosticMessage.append("User AM Resource Limit of the queue = ");
|
||||||
|
diagnosticMessage.append(
|
||||||
|
queue.getUserAMResourceLimitPerPartition(appAMNodePartitionName));
|
||||||
|
diagnosticMessage.append("; ");
|
||||||
|
diagnosticMessage.append("Queue AM Resource Usage = ");
|
||||||
|
diagnosticMessage.append(
|
||||||
|
queue.getQueueResourceUsage().getAMUsed(appAMNodePartitionName));
|
||||||
|
diagnosticMessage.append("; ");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void getActivedAppDiagnosticMessage(
|
||||||
|
StringBuilder diagnosticMessage) {
|
||||||
|
LeafQueue queue = getCSLeafQueue();
|
||||||
|
QueueCapacities queueCapacities = queue.getQueueCapacities();
|
||||||
|
diagnosticMessage.append(" Details : AM Partition = ");
|
||||||
|
diagnosticMessage.append(appAMNodePartitionName.isEmpty()
|
||||||
|
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appAMNodePartitionName);
|
||||||
|
diagnosticMessage.append(" ; ");
|
||||||
|
diagnosticMessage.append("Partition Resource = ");
|
||||||
|
diagnosticMessage.append(rmContext.getNodeLabelManager()
|
||||||
|
.getResourceByLabel(appAMNodePartitionName, Resources.none()));
|
||||||
|
diagnosticMessage.append(" ; ");
|
||||||
|
diagnosticMessage.append("Queue's Absolute capacity = ");
|
||||||
|
diagnosticMessage.append(
|
||||||
|
queueCapacities.getAbsoluteCapacity(appAMNodePartitionName) * 100);
|
||||||
|
diagnosticMessage.append(" % ; ");
|
||||||
|
diagnosticMessage.append("Queue's Absolute used capacity = ");
|
||||||
|
diagnosticMessage.append(
|
||||||
|
queueCapacities.getAbsoluteUsedCapacity(appAMNodePartitionName) * 100);
|
||||||
|
diagnosticMessage.append(" % ; ");
|
||||||
|
diagnosticMessage.append("Queue's Absolute max capacity = ");
|
||||||
|
diagnosticMessage.append(
|
||||||
|
queueCapacities.getAbsoluteMaximumCapacity(appAMNodePartitionName)
|
||||||
|
* 100);
|
||||||
|
diagnosticMessage.append(" % ; ");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the message temporarily if the reason is known for why scheduling did
|
||||||
|
* not happen for a given node, if not message will be over written
|
||||||
|
* @param message
|
||||||
|
*/
|
||||||
|
public void updateAppSkipNodeDiagnostics(String message) {
|
||||||
|
this.appSkipNodeDiagnostics = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) {
|
||||||
|
if (isWaitingForAMContainer()) {
|
||||||
|
StringBuilder diagnosticMessageBldr = new StringBuilder();
|
||||||
|
if (appSkipNodeDiagnostics != null) {
|
||||||
|
diagnosticMessageBldr.append(appSkipNodeDiagnostics);
|
||||||
|
appSkipNodeDiagnostics = null;
|
||||||
|
}
|
||||||
|
diagnosticMessageBldr.append(
|
||||||
|
CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG);
|
||||||
|
diagnosticMessageBldr.append(node.getNodeID());
|
||||||
|
diagnosticMessageBldr.append(" ( Partition : ");
|
||||||
|
diagnosticMessageBldr.append(node.getLabels());
|
||||||
|
diagnosticMessageBldr.append(", Total resource : ");
|
||||||
|
diagnosticMessageBldr.append(node.getTotalResource());
|
||||||
|
diagnosticMessageBldr.append(", Available resource : ");
|
||||||
|
diagnosticMessageBldr.append(node.getAvailableResource());
|
||||||
|
diagnosticMessageBldr.append(" ).");
|
||||||
|
updateAMContainerDiagnostics(AMState.ACTIVATED, diagnosticMessageBldr.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,9 +75,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
|
@ -1008,7 +1008,7 @@ public class FairScheduler extends
|
||||||
preemptionContainerIds.add(container.getContainerId());
|
preemptionContainerIds.add(container.getContainerId());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (application.isWaitingForAMContainer(application.getApplicationId())) {
|
if (application.isWaitingForAMContainer()) {
|
||||||
// Allocate is for AM and update AM blacklist for this
|
// Allocate is for AM and update AM blacklist for this
|
||||||
application.updateAMBlacklist(
|
application.updateAMBlacklist(
|
||||||
blacklistAdditions, blacklistRemovals);
|
blacklistAdditions, blacklistRemovals);
|
||||||
|
|
|
@ -359,7 +359,7 @@ public class FifoScheduler extends
|
||||||
" #ask=" + ask.size());
|
" #ask=" + ask.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (application.isWaitingForAMContainer(application.getApplicationId())) {
|
if (application.isWaitingForAMContainer()) {
|
||||||
// Allocate is for AM and update AM blacklist for this
|
// Allocate is for AM and update AM blacklist for this
|
||||||
application.updateAMBlacklist(
|
application.updateAMBlacklist(
|
||||||
blacklistAdditions, blacklistRemovals);
|
blacklistAdditions, blacklistRemovals);
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
@ -80,9 +81,12 @@ public class TestCapacitySchedulerPlanFollower extends
|
||||||
ConcurrentMap<ApplicationId, RMApp> spyApps =
|
ConcurrentMap<ApplicationId, RMApp> spyApps =
|
||||||
spy(new ConcurrentHashMap<ApplicationId, RMApp>());
|
spy(new ConcurrentHashMap<ApplicationId, RMApp>());
|
||||||
RMApp rmApp = mock(RMApp.class);
|
RMApp rmApp = mock(RMApp.class);
|
||||||
|
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
|
||||||
when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
|
when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
|
||||||
.thenReturn(null);
|
.thenReturn(rmAppAttempt);
|
||||||
|
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
|
||||||
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
|
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
|
||||||
|
Mockito.doReturn(true).when(spyApps).containsKey((ApplicationId) Matchers.any());
|
||||||
when(spyRMContext.getRMApps()).thenReturn(spyApps);
|
when(spyRMContext.getRMApps()).thenReturn(spyApps);
|
||||||
when(spyRMContext.getScheduler()).thenReturn(scheduler);
|
when(spyRMContext.getScheduler()).thenReturn(scheduler);
|
||||||
|
|
||||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
@ -602,7 +603,13 @@ public class TestApplicationLimits {
|
||||||
when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
|
when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
|
||||||
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
|
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
|
||||||
when(spyRMContext.getRMApps()).thenReturn(spyApps);
|
when(spyRMContext.getRMApps()).thenReturn(spyApps);
|
||||||
|
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
|
||||||
|
when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
|
||||||
|
.thenReturn(rmAppAttempt);
|
||||||
|
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
|
||||||
|
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
|
||||||
|
Mockito.doReturn(true).when(spyApps)
|
||||||
|
.containsKey((ApplicationId) Matchers.any());
|
||||||
|
|
||||||
Priority priority_1 = TestUtils.createMockPriority(1);
|
Priority priority_1 = TestUtils.createMockPriority(1);
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabels
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -120,15 +121,13 @@ public class TestApplicationLimitsByPartition {
|
||||||
|
|
||||||
// Submit app1 with 1Gb AM resource to Queue A1 for label X
|
// Submit app1 with 1Gb AM resource to Queue A1 for label X
|
||||||
RMApp app1 = rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
RMApp app1 = rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
||||||
MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
||||||
|
|
||||||
// Submit app2 with 1Gb AM resource to Queue A1 for label X
|
// Submit app2 with 1Gb AM resource to Queue A1 for label X
|
||||||
RMApp app2 = rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
RMApp app2 = rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
||||||
MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
|
||||||
|
|
||||||
// Submit 3rd app to Queue A1 for label X, and this will be pending as
|
// Submit 3rd app to Queue A1 for label X, and this will be pending as
|
||||||
// AM limit is already crossed for label X. (2GB)
|
// AM limit is already crossed for label X. (2GB)
|
||||||
rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
RMApp pendingApp = rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
||||||
|
|
||||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
|
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
|
||||||
|
@ -138,6 +137,16 @@ public class TestApplicationLimitsByPartition {
|
||||||
// pending.
|
// pending.
|
||||||
Assert.assertEquals(2, leafQueue.getNumActiveApplications());
|
Assert.assertEquals(2, leafQueue.getNumActiveApplications());
|
||||||
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
|
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
|
||||||
|
Assert.assertTrue("AM diagnostics not set properly", app1.getDiagnostics()
|
||||||
|
.toString().contains(AMState.ACTIVATED.getDiagnosticMessage()));
|
||||||
|
Assert.assertTrue("AM diagnostics not set properly", app2.getDiagnostics()
|
||||||
|
.toString().contains(AMState.ACTIVATED.getDiagnosticMessage()));
|
||||||
|
Assert.assertTrue("AM diagnostics not set properly",
|
||||||
|
pendingApp.getDiagnostics().toString()
|
||||||
|
.contains(AMState.INACTIVATED.getDiagnosticMessage()));
|
||||||
|
Assert.assertTrue("AM diagnostics not set properly",
|
||||||
|
pendingApp.getDiagnostics().toString().contains(
|
||||||
|
CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED));
|
||||||
|
|
||||||
// Now verify the same test case in Queue C1 where label is not configured.
|
// Now verify the same test case in Queue C1 where label is not configured.
|
||||||
// Submit an app to Queue C1 with empty label
|
// Submit an app to Queue C1 with empty label
|
||||||
|
@ -150,7 +159,7 @@ public class TestApplicationLimitsByPartition {
|
||||||
|
|
||||||
// Submit 3rd app to Queue C1. This will be pending as Queue's am-limit
|
// Submit 3rd app to Queue C1. This will be pending as Queue's am-limit
|
||||||
// is reached.
|
// is reached.
|
||||||
rm1.submitApp(GB, "app", "user", null, "c1");
|
pendingApp = rm1.submitApp(GB, "app", "user", null, "c1");
|
||||||
|
|
||||||
leafQueue = (LeafQueue) cs.getQueue("c1");
|
leafQueue = (LeafQueue) cs.getQueue("c1");
|
||||||
Assert.assertNotNull(leafQueue);
|
Assert.assertNotNull(leafQueue);
|
||||||
|
@ -159,6 +168,12 @@ public class TestApplicationLimitsByPartition {
|
||||||
// is reached.
|
// is reached.
|
||||||
Assert.assertEquals(2, leafQueue.getNumActiveApplications());
|
Assert.assertEquals(2, leafQueue.getNumActiveApplications());
|
||||||
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
|
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
|
||||||
|
Assert.assertTrue("AM diagnostics not set properly",
|
||||||
|
pendingApp.getDiagnostics().toString()
|
||||||
|
.contains(AMState.INACTIVATED.getDiagnosticMessage()));
|
||||||
|
Assert.assertTrue("AM diagnostics not set properly",
|
||||||
|
pendingApp.getDiagnostics().toString().contains(
|
||||||
|
CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED));
|
||||||
|
|
||||||
rm1.killApp(app3.getApplicationId());
|
rm1.killApp(app3.getApplicationId());
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
@ -288,11 +303,10 @@ public class TestApplicationLimitsByPartition {
|
||||||
|
|
||||||
// Submit app1 (2 GB) to Queue A1 and label X
|
// Submit app1 (2 GB) to Queue A1 and label X
|
||||||
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a1", "x");
|
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a1", "x");
|
||||||
MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
||||||
|
|
||||||
// Submit 2nd app to label "X" with one GB. Since queue am-limit is 2GB,
|
// Submit 2nd app to label "X" with one GB. Since queue am-limit is 2GB,
|
||||||
// 2nd app will be pending and first one will get activated.
|
// 2nd app will be pending and first one will get activated.
|
||||||
rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
RMApp pendingApp = rm1.submitApp(GB, "app", "user", null, "a1", "x");
|
||||||
|
|
||||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
|
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
|
||||||
|
@ -302,6 +316,14 @@ public class TestApplicationLimitsByPartition {
|
||||||
// used for partition "x" also.
|
// used for partition "x" also.
|
||||||
Assert.assertEquals(1, leafQueue.getNumActiveApplications());
|
Assert.assertEquals(1, leafQueue.getNumActiveApplications());
|
||||||
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
|
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
|
||||||
|
Assert.assertTrue("AM diagnostics not set properly", app1.getDiagnostics()
|
||||||
|
.toString().contains(AMState.ACTIVATED.getDiagnosticMessage()));
|
||||||
|
Assert.assertTrue("AM diagnostics not set properly",
|
||||||
|
pendingApp.getDiagnostics().toString()
|
||||||
|
.contains(AMState.INACTIVATED.getDiagnosticMessage()));
|
||||||
|
Assert.assertTrue("AM diagnostics not set properly",
|
||||||
|
pendingApp.getDiagnostics().toString()
|
||||||
|
.contains(CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED));
|
||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,7 +403,7 @@ public class TestApplicationLimitsByPartition {
|
||||||
// 4Gb -> 40% of label "X" in queue A1
|
// 4Gb -> 40% of label "X" in queue A1
|
||||||
// Since we have 2 users, 50% of 4Gb will be max for each user. Here user1
|
// Since we have 2 users, 50% of 4Gb will be max for each user. Here user1
|
||||||
// has already crossed this 2GB limit, hence this app will be pending.
|
// has already crossed this 2GB limit, hence this app will be pending.
|
||||||
rm1.submitApp(GB, "app", user_1, null, "a1", "x");
|
RMApp pendingApp = rm1.submitApp(GB, "app", user_1, null, "a1", "x");
|
||||||
|
|
||||||
// Verify active applications count per user and also in queue level.
|
// Verify active applications count per user and also in queue level.
|
||||||
Assert.assertEquals(3, leafQueue.getNumActiveApplications());
|
Assert.assertEquals(3, leafQueue.getNumActiveApplications());
|
||||||
|
@ -389,6 +411,14 @@ public class TestApplicationLimitsByPartition {
|
||||||
Assert.assertEquals(2, leafQueue.getNumActiveApplications(user_1));
|
Assert.assertEquals(2, leafQueue.getNumActiveApplications(user_1));
|
||||||
Assert.assertEquals(1, leafQueue.getNumPendingApplications(user_1));
|
Assert.assertEquals(1, leafQueue.getNumPendingApplications(user_1));
|
||||||
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
|
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
|
||||||
|
|
||||||
|
//verify Diagnostic messages
|
||||||
|
Assert.assertTrue("AM diagnostics not set properly",
|
||||||
|
pendingApp.getDiagnostics().toString()
|
||||||
|
.contains(AMState.INACTIVATED.getDiagnosticMessage()));
|
||||||
|
Assert.assertTrue("AM diagnostics not set properly",
|
||||||
|
pendingApp.getDiagnostics().toString().contains(
|
||||||
|
CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED));
|
||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1028,10 +1028,11 @@ public class TestNodeLabelContainerAllocation {
|
||||||
|
|
||||||
rm1.getRMContext().setNodeLabelManager(mgr);
|
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
|
String nodeIdStr = "h1:1234";
|
||||||
|
MockNM nm1 = rm1.registerNode(nodeIdStr, 8 * GB); // label = x
|
||||||
|
|
||||||
// launch an app to queue b1 (label = y), AM container should be launched in nm3
|
// launch an app to queue b1 (label = y), AM container should be launched in nm3
|
||||||
rm1.submitApp(1 * GB, "app", "user", null, "b1");
|
RMApp app = rm1.submitApp(1 * GB, "app", "user", null, "b1");
|
||||||
|
|
||||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
@ -1041,6 +1042,16 @@ public class TestNodeLabelContainerAllocation {
|
||||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue(
|
||||||
|
"Scheduler diagnostics should have reason for not assigning the node",
|
||||||
|
app.getDiagnostics().toString().contains(
|
||||||
|
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_IGNORE_EXCLUSIVE_MODE));
|
||||||
|
|
||||||
|
Assert.assertTrue(
|
||||||
|
"Scheduler diagnostics should have last processed node information",
|
||||||
|
app.getDiagnostics().toString().contains(
|
||||||
|
CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG
|
||||||
|
+ nodeIdStr + " ( Partition : [x]"));
|
||||||
Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId())
|
Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId())
|
||||||
.getNumContainers());
|
.getNumContainers());
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
|
@ -65,6 +67,7 @@ import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
public class TestReservations {
|
public class TestReservations {
|
||||||
|
|
||||||
|
@ -188,6 +191,9 @@ public class TestReservations {
|
||||||
.getMockApplicationAttemptId(0, 0);
|
.getMockApplicationAttemptId(0, 0);
|
||||||
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext);
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_0 = spy(app_0);
|
||||||
|
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||||
|
|
||||||
a.submitApplicationAttempt(app_0, user_0);
|
a.submitApplicationAttempt(app_0, user_0);
|
||||||
|
@ -196,6 +202,9 @@ public class TestReservations {
|
||||||
.getMockApplicationAttemptId(1, 0);
|
.getMockApplicationAttemptId(1, 0);
|
||||||
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext);
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_1 = spy(app_1);
|
||||||
|
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
a.submitApplicationAttempt(app_1, user_0);
|
a.submitApplicationAttempt(app_1, user_0);
|
||||||
|
|
||||||
// Setup some nodes
|
// Setup some nodes
|
||||||
|
@ -348,6 +357,9 @@ public class TestReservations {
|
||||||
.getMockApplicationAttemptId(0, 0);
|
.getMockApplicationAttemptId(0, 0);
|
||||||
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext);
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_0 = spy(app_0);
|
||||||
|
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||||
|
|
||||||
a.submitApplicationAttempt(app_0, user_0);
|
a.submitApplicationAttempt(app_0, user_0);
|
||||||
|
@ -356,6 +368,9 @@ public class TestReservations {
|
||||||
.getMockApplicationAttemptId(1, 0);
|
.getMockApplicationAttemptId(1, 0);
|
||||||
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext);
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_1 = spy(app_1);
|
||||||
|
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
a.submitApplicationAttempt(app_1, user_0);
|
a.submitApplicationAttempt(app_1, user_0);
|
||||||
|
|
||||||
// Setup some nodes
|
// Setup some nodes
|
||||||
|
@ -502,6 +517,9 @@ public class TestReservations {
|
||||||
.getMockApplicationAttemptId(0, 0);
|
.getMockApplicationAttemptId(0, 0);
|
||||||
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext);
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_0 = spy(app_0);
|
||||||
|
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||||
|
|
||||||
a.submitApplicationAttempt(app_0, user_0);
|
a.submitApplicationAttempt(app_0, user_0);
|
||||||
|
@ -510,6 +528,9 @@ public class TestReservations {
|
||||||
.getMockApplicationAttemptId(1, 0);
|
.getMockApplicationAttemptId(1, 0);
|
||||||
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext);
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_1 = spy(app_1);
|
||||||
|
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
a.submitApplicationAttempt(app_1, user_0);
|
a.submitApplicationAttempt(app_1, user_0);
|
||||||
|
|
||||||
// Setup some nodes
|
// Setup some nodes
|
||||||
|
@ -758,6 +779,9 @@ public class TestReservations {
|
||||||
.getMockApplicationAttemptId(0, 0);
|
.getMockApplicationAttemptId(0, 0);
|
||||||
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext);
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_0 = spy(app_0);
|
||||||
|
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||||
|
|
||||||
a.submitApplicationAttempt(app_0, user_0);
|
a.submitApplicationAttempt(app_0, user_0);
|
||||||
|
@ -766,6 +790,9 @@ public class TestReservations {
|
||||||
.getMockApplicationAttemptId(1, 0);
|
.getMockApplicationAttemptId(1, 0);
|
||||||
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext);
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_1 = spy(app_1);
|
||||||
|
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
a.submitApplicationAttempt(app_1, user_0);
|
a.submitApplicationAttempt(app_1, user_0);
|
||||||
|
|
||||||
// Setup some nodes
|
// Setup some nodes
|
||||||
|
@ -927,6 +954,9 @@ public class TestReservations {
|
||||||
.getMockApplicationAttemptId(0, 0);
|
.getMockApplicationAttemptId(0, 0);
|
||||||
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext);
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_0 = spy(app_0);
|
||||||
|
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||||
a.submitApplicationAttempt(app_0, user_0);
|
a.submitApplicationAttempt(app_0, user_0);
|
||||||
|
|
||||||
|
@ -934,6 +964,9 @@ public class TestReservations {
|
||||||
.getMockApplicationAttemptId(1, 0);
|
.getMockApplicationAttemptId(1, 0);
|
||||||
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext);
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_1 = spy(app_1);
|
||||||
|
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
a.submitApplicationAttempt(app_1, user_0);
|
a.submitApplicationAttempt(app_1, user_0);
|
||||||
|
|
||||||
// Setup some nodes
|
// Setup some nodes
|
||||||
|
@ -1068,6 +1101,9 @@ public class TestReservations {
|
||||||
.getMockApplicationAttemptId(0, 0);
|
.getMockApplicationAttemptId(0, 0);
|
||||||
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext);
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_0 = spy(app_0);
|
||||||
|
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||||
|
|
||||||
a.submitApplicationAttempt(app_0, user_0);
|
a.submitApplicationAttempt(app_0, user_0);
|
||||||
|
@ -1076,6 +1112,9 @@ public class TestReservations {
|
||||||
.getMockApplicationAttemptId(1, 0);
|
.getMockApplicationAttemptId(1, 0);
|
||||||
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext);
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_1 = spy(app_1);
|
||||||
|
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
a.submitApplicationAttempt(app_1, user_0);
|
a.submitApplicationAttempt(app_1, user_0);
|
||||||
|
|
||||||
// Setup some nodes
|
// Setup some nodes
|
||||||
|
|
|
@ -1387,8 +1387,8 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
assertEquals("progress doesn't match", 0, progress, 0.0);
|
assertEquals("progress doesn't match", 0, progress, 0.0);
|
||||||
WebServicesTestUtils.checkStringMatch("trackingUI", "UNASSIGNED",
|
WebServicesTestUtils.checkStringMatch("trackingUI", "UNASSIGNED",
|
||||||
trackingUI);
|
trackingUI);
|
||||||
WebServicesTestUtils.checkStringMatch("diagnostics", app.getDiagnostics()
|
WebServicesTestUtils.checkStringEqual("diagnostics",
|
||||||
.toString(), diagnostics);
|
app.getDiagnostics().toString(), diagnostics);
|
||||||
assertEquals("clusterId doesn't match",
|
assertEquals("clusterId doesn't match",
|
||||||
ResourceManager.getClusterTimeStamp(), clusterId);
|
ResourceManager.getClusterTimeStamp(), clusterId);
|
||||||
assertEquals("startedTime doesn't match", app.getStartTime(), startedTime);
|
assertEquals("startedTime doesn't match", app.getStartTime(), startedTime);
|
||||||
|
|
Loading…
Reference in New Issue