YARN-9440. Improve diagnostics for scheduler and app activities. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2019-05-06 20:00:15 +08:00
parent 1d70c8ca0f
commit 12b7059ddc
30 changed files with 1477 additions and 275 deletions

View File

@ -17,18 +17,25 @@
*/
package org.apache.hadoop.yarn.util.resource;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import java.util.Set;
@Private
@Unstable
public class DefaultResourceCalculator extends ResourceCalculator {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultResourceCalculator.class);
private static final Set<String> INSUFFICIENT_RESOURCE_NAME =
ImmutableSet.of(ResourceInformation.MEMORY_URI);
@Override
public int compare(Resource unused, Resource lhs, Resource rhs,
boolean singleType) {
@ -150,4 +157,13 @@ public class DefaultResourceCalculator extends ResourceCalculator {
public boolean isAnyMajorResourceAboveZero(Resource resource) {
return resource.getMemorySize() > 0;
}
public Set<String> getInsufficientResourceNames(Resource required,
Resource available) {
if (required.getMemorySize() > available.getMemorySize()) {
return INSUFFICIENT_RESOURCE_NAME;
} else {
return ImmutableSet.of();
}
}
}

View File

@ -28,6 +28,9 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* A {@link ResourceCalculator} which uses the concept of
@ -588,4 +591,15 @@ public class DominantResourceCalculator extends ResourceCalculator {
}
return false;
}
@Override
public Set<String> getInsufficientResourceNames(Resource required,
Resource available) {
int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
return IntStream.range(0, maxLength).filter(
i -> required.getResourceInformation(i).getValue() > available
.getResourceInformation(i).getValue())
.mapToObj(i -> ResourceUtils.getResourceTypesArray()[i].getName())
.collect(Collectors.toSet());
}
}

View File

@ -21,6 +21,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import java.util.Set;
/**
* A set of {@link Resource} comparison and manipulation interfaces.
*/
@ -284,4 +286,15 @@ public abstract class ResourceCalculator {
* @return returns true if any resource is {@literal >} 0
*/
public abstract boolean isAnyMajorResourceAboveZero(Resource resource);
/**
* Get insufficient resource names via comparing required resource and
* capacity resource.
*
* @param required - required resource
* @param available - available resource
* @return insufficient resource names
*/
public abstract Set<String> getInsufficientResourceNames(Resource required,
Resource available);
}

View File

@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.util.resource;
import java.util.Arrays;
import java.util.Collection;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.Before;
@ -406,4 +408,121 @@ public class TestResourceCalculator {
0));
assertEquals(0.0, ratio, 0.00001);
}
@Test
public void testFitsInDiagnosticsCollector() {
if (resourceCalculator instanceof DefaultResourceCalculator) {
// required-resource = (0, 0)
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 0),
newResource(0, 0)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 0),
newResource(0, 1)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 0),
newResource(1, 0)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 0),
newResource(1, 1)));
// required-resource = (0, 1)
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 1),
newResource(0, 0)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 1),
newResource(0, 1)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 1),
newResource(1, 0)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 1),
newResource(1, 1)));
// required-resource = (1, 0)
assertEquals(ImmutableSet.of(ResourceInformation.MEMORY_URI),
resourceCalculator.getInsufficientResourceNames(newResource(1, 0),
newResource(0, 0)));
assertEquals(ImmutableSet.of(ResourceInformation.MEMORY_URI),
resourceCalculator.getInsufficientResourceNames(newResource(1, 0),
newResource(0, 1)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(1, 0),
newResource(1, 0)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(1, 0),
newResource(1, 1)));
// required-resource = (1, 1)
assertEquals(ImmutableSet.of(ResourceInformation.MEMORY_URI),
resourceCalculator.getInsufficientResourceNames(newResource(1, 1),
newResource(0, 0)));
assertEquals(ImmutableSet.of(ResourceInformation.MEMORY_URI),
resourceCalculator.getInsufficientResourceNames(newResource(1, 1),
newResource(0, 1)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(1, 1),
newResource(1, 0)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(1, 1),
newResource(1, 1)));
} else if (resourceCalculator instanceof DominantResourceCalculator) {
// required-resource = (0, 0)
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 0),
newResource(0, 0)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 0),
newResource(0, 1)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 0),
newResource(1, 0)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 0),
newResource(1, 1)));
// required-resource = (0, 1)
assertEquals(ImmutableSet.of(ResourceInformation.VCORES_URI),
resourceCalculator.getInsufficientResourceNames(newResource(0, 1),
newResource(0, 0)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 1),
newResource(0, 1)));
assertEquals(ImmutableSet.of(ResourceInformation.VCORES_URI),
resourceCalculator.getInsufficientResourceNames(newResource(0, 1),
newResource(1, 0)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(0, 1),
newResource(1, 1)));
// required-resource = (1, 0)
assertEquals(ImmutableSet.of(ResourceInformation.MEMORY_URI),
resourceCalculator.getInsufficientResourceNames(newResource(1, 0),
newResource(0, 0)));
assertEquals(ImmutableSet.of(ResourceInformation.MEMORY_URI),
resourceCalculator.getInsufficientResourceNames(newResource(1, 0),
newResource(0, 1)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(1, 0),
newResource(1, 0)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(1, 0),
newResource(1, 1)));
// required-resource = (1, 1)
assertEquals(ImmutableSet.of(ResourceInformation.MEMORY_URI,
ResourceInformation.VCORES_URI), resourceCalculator
.getInsufficientResourceNames(newResource(1, 1), newResource(0, 0)));
assertEquals(ImmutableSet.of(ResourceInformation.MEMORY_URI),
resourceCalculator.getInsufficientResourceNames(newResource(1, 1),
newResource(0, 1)));
assertEquals(ImmutableSet.of(ResourceInformation.VCORES_URI),
resourceCalculator.getInsufficientResourceNames(newResource(1, 1),
newResource(1, 0)));
assertEquals(ImmutableSet.of(),
resourceCalculator.getInsufficientResourceNames(newResource(1, 1),
newResource(1, 1)));
}
}
}

View File

@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
@ -765,16 +767,18 @@ public class AppSchedulingInfo {
* @param schedulerKey schedulerKey
* @param schedulerNode schedulerNode
* @param schedulingMode schedulingMode
* @param dcOpt optional diagnostics collector
* @return can use the node or not.
*/
public boolean precheckNode(SchedulerRequestKey schedulerKey,
SchedulerNode schedulerNode, SchedulingMode schedulingMode) {
SchedulerNode schedulerNode, SchedulingMode schedulingMode,
Optional<DiagnosticsCollector> dcOpt) {
this.readLock.lock();
try {
AppPlacementAllocator ap =
schedulerKeyToAppPlacementAllocator.get(schedulerKey);
return (ap != null) && ap.precheckNode(schedulerNode,
schedulingMode);
schedulingMode, dcOpt);
} finally {
this.readLock.unlock();
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
/**
* Utility for logging scheduler activities
@ -49,10 +50,11 @@ public class ActivitiesLogger {
*/
public static void recordSkippedAppActivityWithoutAllocation(
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority,
SchedulerApplicationAttempt application,
SchedulerRequestKey requestKey,
String diagnostic) {
recordAppActivityWithoutAllocation(activitiesManager, node, application,
priority, diagnostic, ActivityState.SKIPPED);
requestKey, diagnostic, ActivityState.SKIPPED);
}
/*
@ -83,25 +85,39 @@ public class ActivitiesLogger {
*/
public static void recordAppActivityWithoutAllocation(
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority,
SchedulerApplicationAttempt application,
SchedulerRequestKey schedulerKey,
String diagnostic, ActivityState appState) {
if (activitiesManager == null) {
return;
}
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
if (activitiesManager.shouldRecordThisNode(nodeId)) {
if (schedulerKey != null) {
String allocationRequestId =
String.valueOf(schedulerKey.getAllocationRequestId());
String priorityStr = getPriorityStr(schedulerKey);
String requestName = getRequestName(priorityStr, allocationRequestId);
String type = "container";
// Add application-container activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(nodeId,
application.getApplicationId().toString(), null,
priority.toString(), ActivityState.SKIPPED, diagnostic, type);
type = "app";
requestName, null,
priorityStr, ActivityState.SKIPPED, diagnostic, type,
null);
type = "request";
// Add application-container activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(nodeId,
application.getApplicationId().toString(), requestName,
priorityStr, ActivityState.SKIPPED,
ActivityDiagnosticConstant.EMPTY, type, allocationRequestId);
}
// Add queue-application activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(nodeId,
application.getQueueName(),
application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.EMPTY, type);
schedulerKey != null ? ActivityDiagnosticConstant.EMPTY :
diagnostic, "app", null);
}
// Add application-container activity into specific application allocation
// Under this condition, it fails to allocate a container to this
@ -110,8 +126,11 @@ public class ActivitiesLogger {
application.getApplicationId())) {
String type = "container";
activitiesManager.addSchedulingActivityForApp(
application.getApplicationId(), null, priority.toString(), appState,
diagnostic, type);
application.getApplicationId(), null,
getPriorityStr(schedulerKey), appState,
diagnostic, type, nodeId,
schedulerKey == null ?
null : String.valueOf(schedulerKey.getAllocationRequestId()));
}
}
@ -127,21 +146,38 @@ public class ActivitiesLogger {
return;
}
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
if (nodeId == null || nodeId == ActivitiesManager.EMPTY_NODE_ID) {
nodeId = updatedContainer.getNodeId();
}
if (activitiesManager.shouldRecordThisNode(nodeId)) {
String containerPriorityStr =
updatedContainer.getContainer().getPriority().toString();
String allocationRequestId = String
.valueOf(updatedContainer.getContainer().getAllocationRequestId());
String requestName =
getRequestName(containerPriorityStr, allocationRequestId);
String type = "container";
// Add application-container activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(nodeId,
requestName,
updatedContainer.getContainer().toString(),
containerPriorityStr,
activityState, ActivityDiagnosticConstant.EMPTY, type, null);
type = "request";
// Add application-container activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(nodeId,
application.getApplicationId().toString(),
updatedContainer.getContainer().toString(),
updatedContainer.getContainer().getPriority().toString(),
activityState, ActivityDiagnosticConstant.EMPTY, type);
requestName, containerPriorityStr,
activityState, ActivityDiagnosticConstant.EMPTY, type,
allocationRequestId);
type = "app";
// Add queue-application activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(nodeId,
application.getQueueName(),
application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.ACCEPTED,
ActivityDiagnosticConstant.EMPTY, type);
ActivityDiagnosticConstant.EMPTY, type, null);
}
// Add application-container activity into specific application allocation
if (activitiesManager.shouldRecordThisApp(
@ -151,7 +187,9 @@ public class ActivitiesLogger {
application.getApplicationId(),
updatedContainer.getContainerId(),
updatedContainer.getContainer().getPriority().toString(),
activityState, ActivityDiagnosticConstant.EMPTY, type);
activityState, ActivityDiagnosticConstant.EMPTY, type, nodeId,
String.valueOf(
updatedContainer.getContainer().getAllocationRequestId()));
}
}
@ -286,7 +324,7 @@ public class ActivitiesLogger {
Priority priority, ActivityState state, String diagnostic, String type) {
activitiesManager.addSchedulingActivityForNode(nodeId, parentName,
childName, priority != null ? priority.toString() : null, state,
diagnostic, type);
diagnostic, type, null);
}
private static NodeId getRecordingNodeId(ActivitiesManager activitiesManager,
@ -294,4 +332,17 @@ public class ActivitiesLogger {
return activitiesManager == null ? null :
activitiesManager.getRecordingNodeId(node);
}
private static String getRequestName(String priority,
String allocationRequestId) {
return "request_"
+ (priority == null ? "" : priority)
+ "_" + (allocationRequestId == null ? "" : allocationRequestId);
}
private static String getPriorityStr(SchedulerRequestKey schedulerKey) {
Priority priority = schedulerKey == null ?
null : schedulerKey.getPriority();
return priority == null ? null : priority.toString();
}
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.service.AbstractService;
@ -52,6 +54,8 @@ public class ActivitiesManager extends AbstractService {
// An empty node ID, we use this variable as a placeholder
// in the activity records when recording multiple nodes assignments.
public static final NodeId EMPTY_NODE_ID = NodeId.newInstance("", 0);
public static final String DIAGNOSTICS_DETAILS_SEPARATOR = "\n";
public static final String EMPTY_DIAGNOSTICS = "";
private ThreadLocal<Map<NodeId, List<NodeAllocation>>>
recordingNodesAllocation;
@VisibleForTesting
@ -69,6 +73,7 @@ public class ActivitiesManager extends AbstractService {
private int timeThreshold = 600 * 1000;
private final RMContext rmContext;
private volatile boolean stopped;
private ThreadLocal<DiagnosticsCollectorManager> diagnosticCollectorManager;
public ActivitiesManager(RMContext rmContext) {
super(ActivitiesManager.class.getName());
@ -78,6 +83,9 @@ public class ActivitiesManager extends AbstractService {
completedAppAllocations = new ConcurrentHashMap<>();
activeRecordedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>();
diagnosticCollectorManager = ThreadLocal.withInitial(
() -> new DiagnosticsCollectorManager(
new GenericDiagnosticsCollector()));
this.rmContext = rmContext;
}
@ -191,6 +199,8 @@ public class ActivitiesManager extends AbstractService {
if (activeRecordedNodes.remove(nodeID)) {
List<NodeAllocation> nodeAllocation = new ArrayList<>();
recordingNodesAllocation.get().put(nodeID, nodeAllocation);
// enable diagnostic collector
diagnosticCollectorManager.get().enable();
}
}
@ -205,6 +215,8 @@ public class ActivitiesManager extends AbstractService {
appsAllocation.get().put(applicationId,
new AppAllocation(application.getPriority(), nodeID,
application.getQueueName()));
// enable diagnostic collector
diagnosticCollectorManager.get().enable();
} else {
turnOffActivityMonitoringForApp(applicationId);
}
@ -214,11 +226,11 @@ public class ActivitiesManager extends AbstractService {
// Add queue, application or container activity into specific node allocation.
void addSchedulingActivityForNode(NodeId nodeId, String parentName,
String childName, String priority, ActivityState state, String diagnostic,
String type) {
String type, String allocationRequestId) {
if (shouldRecordThisNode(nodeId)) {
NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeId);
nodeAllocation.addAllocationActivity(parentName, childName, priority,
state, diagnostic, type);
state, diagnostic, type, nodeId, allocationRequestId);
}
}
@ -226,12 +238,14 @@ public class ActivitiesManager extends AbstractService {
// allocation.
void addSchedulingActivityForApp(ApplicationId applicationId,
ContainerId containerId, String priority, ActivityState state,
String diagnostic, String type) {
String diagnostic, String type, NodeId nodeId,
String allocationRequestId) {
if (shouldRecordThisApp(applicationId)) {
AppAllocation appAllocation = appsAllocation.get().get(applicationId);
appAllocation.addAppAllocationActivity(containerId == null ?
"Container-Id-Not-Assigned" :
containerId.toString(), priority, state, diagnostic, type);
containerId.toString(), priority, state, diagnostic, type, nodeId,
allocationRequestId);
}
}
@ -297,6 +311,8 @@ public class ActivitiesManager extends AbstractService {
completedNodeAllocations.put(nodeID, value);
}
}
// disable diagnostic collector
diagnosticCollectorManager.get().disable();
}
boolean shouldRecordThisApp(ApplicationId applicationId) {
@ -369,4 +385,69 @@ public class ActivitiesManager extends AbstractService {
}
return null;
}
/**
* Class to manage the diagnostics collector.
*/
public static class DiagnosticsCollectorManager {
private boolean enabled = false;
private DiagnosticsCollector gdc;
public boolean isEnabled() {
return enabled;
}
public void enable() {
this.enabled = true;
}
public void disable() {
this.enabled = false;
}
public DiagnosticsCollectorManager(DiagnosticsCollector gdc) {
this.gdc = gdc;
}
public Optional<DiagnosticsCollector> getOptionalDiagnosticsCollector() {
if (enabled) {
return Optional.of(gdc);
} else {
return Optional.empty();
}
}
}
public Optional<DiagnosticsCollector> getOptionalDiagnosticsCollector() {
return diagnosticCollectorManager.get().getOptionalDiagnosticsCollector();
}
public String getResourceDiagnostics(ResourceCalculator rc, Resource required,
Resource available) {
Optional<DiagnosticsCollector> dcOpt = getOptionalDiagnosticsCollector();
if (dcOpt.isPresent()) {
dcOpt.get().collectResourceDiagnostics(rc, required, available);
return getDiagnostics(dcOpt.get());
}
return EMPTY_DIAGNOSTICS;
}
public static String getDiagnostics(Optional<DiagnosticsCollector> dcOpt) {
if (dcOpt != null && dcOpt.isPresent()) {
DiagnosticsCollector dc = dcOpt.get();
if (dc != null && dc.getDiagnostics() != null) {
return getDiagnostics(dc);
}
}
return EMPTY_DIAGNOSTICS;
}
private static String getDiagnostics(DiagnosticsCollector dc) {
StringBuilder sb = new StringBuilder();
sb.append(", ").append(dc.getDiagnostics());
if (dc.getDetails() != null) {
sb.append(DIAGNOSTICS_DETAILS_SEPARATOR).append(dc.getDetails());
}
return sb.toString();
}
}

View File

@ -38,9 +38,6 @@ public class ActivityDiagnosticConstant {
public final static String PRIORITY_SKIPPED = "Priority skipped";
public final static String PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST =
"Priority skipped because off-switch request is null";
public final static String
PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST =
"Priority skipped because partition of node doesn't match request";
public final static String SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY =
"Priority skipped because of relax locality is not allowed";
public final static String SKIP_IN_IGNORE_EXCLUSIVITY_MODE =
@ -74,4 +71,10 @@ public class ActivityDiagnosticConstant {
"All priorities are skipped of the app";
public final static String RESPECT_FIFO = "To respect FIFO of applications, "
+ "skipped following applications in the queue";
public final static String
NODE_DO_NOT_MATCH_PARTITION_OR_PLACEMENT_CONSTRAINTS =
"Node does not match partition or placement constraints";
public final static String
NODE_CAN_NOT_FIND_CONTAINER_TO_BE_UNRESERVED_WHEN_NEEDED =
"Node can't find a container to be unreserved when needed";
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.hadoop.yarn.api.records.NodeId;
import java.util.LinkedList;
import java.util.List;
@ -33,18 +35,32 @@ public class ActivityNode {
private String requestPriority;
private ActivityState state;
private String diagnostic;
private NodeId nodeId;
private String allocationRequestId;
private List<ActivityNode> childNode;
public ActivityNode(String activityNodeName, String parentName,
String priority, ActivityState state, String diagnostic, String type) {
this(activityNodeName, parentName, priority, state, diagnostic, type, null,
null);
}
public ActivityNode(String activityNodeName, String parentName,
String priority, ActivityState state, String diagnostic, String type,
NodeId nodeId, String allocationRequestId) {
this.activityNodeName = activityNodeName;
this.parentName = parentName;
if (type != null) {
if (type.equals("app")) {
this.appPriority = priority;
} else if (type.equals("request")) {
this.requestPriority = priority;
this.allocationRequestId = allocationRequestId;
} else if (type.equals("container")) {
this.requestPriority = priority;
this.allocationRequestId = allocationRequestId;
this.nodeId = nodeId;
}
}
this.state = state;
@ -84,6 +100,14 @@ public class ActivityNode {
return requestPriority;
}
public NodeId getNodeId() {
return nodeId;
}
public String getAllocationRequestId() {
return allocationRequestId;
}
public boolean getType() {
if (appPriority != null) {
return true;
@ -97,6 +121,9 @@ public class ActivityNode {
sb.append(this.activityNodeName + " ")
.append(this.appPriority + " ")
.append(this.state + " ");
if (this.nodeId != null) {
sb.append(this.nodeId + " ");
}
if (!this.diagnostic.equals("")) {
sb.append(this.diagnostic + "\n");
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,19 +34,25 @@ public class AllocationActivity {
private String requestPriority = null;
private ActivityState state;
private String diagnostic = null;
private NodeId nodeId;
private String allocationRequestId;
private static final Logger LOG =
LoggerFactory.getLogger(AllocationActivity.class);
public AllocationActivity(String parentName, String queueName,
String priority, ActivityState state, String diagnostic, String type) {
String priority, ActivityState state, String diagnostic, String type,
NodeId nodeId, String allocationRequestId) {
this.childName = queueName;
this.parentName = parentName;
if (type != null) {
if (type.equals("app")) {
this.appPriority = priority;
} else if (type.equals("container")) {
} else if (type.equals("request")) {
this.requestPriority = priority;
this.allocationRequestId = allocationRequestId;
} else if (type.equals("container")) {
this.nodeId = nodeId;
}
}
this.state = state;
@ -58,7 +65,12 @@ public class AllocationActivity {
this.state, this.diagnostic, "app");
} else if (requestPriority != null) {
return new ActivityNode(this.childName, this.parentName,
this.requestPriority, this.state, this.diagnostic, "container");
this.requestPriority, this.state, this.diagnostic, "request", null,
allocationRequestId);
} else if (nodeId != null) {
return new ActivityNode(this.childName, this.parentName,
this.requestPriority, this.state, this.diagnostic, "container",
this.nodeId, null);
} else {
return new ActivityNode(this.childName, this.parentName, null, this.state,
this.diagnostic, null);

View File

@ -56,9 +56,10 @@ public class AppAllocation {
}
public void addAppAllocationActivity(String containerId, String priority,
ActivityState state, String diagnostic, String type) {
ActivityState state, String diagnose, String type, NodeId nId,
String allocationRequestId) {
ActivityNode container = new ActivityNode(containerId, null, priority,
state, diagnostic, type);
state, diagnose, type, nId, allocationRequestId);
this.allocationAttempts.add(container);
if (state == ActivityState.REJECTED) {
this.appState = ActivityState.SKIPPED;

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
/**
* Generic interface that can be used for collecting diagnostics.
*/
public interface DiagnosticsCollector {
void collect(String diagnostics, String details);
String getDiagnostics();
String getDetails();
void collectResourceDiagnostics(ResourceCalculator rc,
Resource required, Resource available);
void collectPlacementConstraintDiagnostics(PlacementConstraint pc,
PlacementConstraint.TargetExpression.TargetType targetType);
void collectPartitionDiagnostics(
String requiredPartition, String nodePartition);
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import java.util.Set;
/**
* Generic interface that can be used for collecting diagnostics.
*/
public class GenericDiagnosticsCollector implements DiagnosticsCollector {
public final static String RESOURCE_DIAGNOSTICS_PREFIX =
"insufficient resources=";
public final static String PLACEMENT_CONSTRAINT_DIAGNOSTICS_PREFIX =
"unsatisfied PC expression=";
public final static String PARTITION_DIAGNOSTICS_PREFIX =
"unsatisfied node partition=";
private String diagnostics;
private String details;
public void collect(String diagnosticsInfo, String detailsInfo) {
this.diagnostics = diagnosticsInfo;
this.details = detailsInfo;
}
public String getDiagnostics() {
return diagnostics;
}
public String getDetails() {
return details;
}
public void collectResourceDiagnostics(ResourceCalculator rc,
Resource required, Resource available) {
Set<String> insufficientResourceNames =
rc.getInsufficientResourceNames(required, available);
this.diagnostics = new StringBuilder(RESOURCE_DIAGNOSTICS_PREFIX)
.append(insufficientResourceNames).toString();
this.details = new StringBuilder().append("required=").append(required)
.append(", available=").append(available).toString();
}
public void collectPlacementConstraintDiagnostics(PlacementConstraint pc,
PlacementConstraint.TargetExpression.TargetType targetType) {
this.diagnostics =
new StringBuilder(PLACEMENT_CONSTRAINT_DIAGNOSTICS_PREFIX).append("\"")
.append(pc).append("\", target-type=").append(targetType)
.toString();
this.details = null;
}
public void collectPartitionDiagnostics(
String requiredPartition, String nodePartition) {
this.diagnostics =
new StringBuilder(PARTITION_DIAGNOSTICS_PREFIX).append(nodePartition)
.append(", required-partition=").append(requiredPartition)
.toString();
this.details = null;
}
}

View File

@ -55,9 +55,10 @@ public class NodeAllocation {
}
public void addAllocationActivity(String parentName, String childName,
String priority, ActivityState state, String diagnostic, String type) {
String priority, ActivityState state, String diagnostic, String type,
NodeId nId, String allocationRequestId) {
AllocationActivity allocate = new AllocationActivity(parentName, childName,
priority, state, diagnostic, type);
priority, state, diagnostic, type, nId, allocationRequestId);
this.allocationOperations.add(allocate);
}
@ -134,7 +135,7 @@ public class NodeAllocation {
return root;
}
public String getNodeId() {
return nodeId.toString();
public NodeId getNodeId() {
return nodeId;
}
}

View File

@ -1074,8 +1074,8 @@ public class LeafQueue extends AbstractCSQueue {
&& !accessibleToPartition(candidates.getPartition())) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + candidates
.getPartition());
ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + " "
+ candidates.getPartition());
return CSAssignment.NULL_ASSIGNMENT;
}

View File

@ -21,13 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocat
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -100,14 +101,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
Resource clusterResource, FiCaSchedulerNode node,
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
SchedulerRequestKey schedulerKey) {
Priority priority = schedulerKey.getPriority();
PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
ResourceRequest.ANY);
if (offswitchPendingAsk.getCount() <= 0) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -118,7 +117,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Do we need containers at this 'priority'?
if (application.getOutstandingAsksCount(schedulerKey) <= 0) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -133,7 +132,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
application.updateAppSkipNodeDiagnostics(
"Skipping assigning to Node in Ignore Exclusivity mode. ");
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.SKIP_IN_IGNORE_EXCLUSIVITY_MODE);
return ContainerAllocation.APP_SKIPPED;
}
@ -141,11 +140,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Is the nodePartition of pending request matches the node's partition
// If not match, jump to next priority.
if (!appInfo.precheckNode(schedulerKey, node, schedulingMode)) {
Optional<DiagnosticsCollector> dcOpt = activitiesManager == null ?
Optional.empty() :
activitiesManager.getOptionalDiagnosticsCollector();
if (!appInfo.precheckNode(schedulerKey, node, schedulingMode, dcOpt)) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.
PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST);
NODE_DO_NOT_MATCH_PARTITION_OR_PLACEMENT_CONSTRAINTS
+ ActivitiesManager.getDiagnostics(dcOpt));
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -153,7 +156,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
if (!shouldAllocOrReserveNewContainer(schedulerKey, required)) {
LOG.debug("doesn't need containers based on reservation algo!");
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.DO_NOT_NEED_ALLOCATIONATTEMPTINFOS);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -164,7 +167,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
LOG.debug("cannot allocate required resource={} because of headroom",
required);
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.QUEUE_SKIPPED_HEADROOM);
return ContainerAllocation.QUEUE_SKIPPED;
}
@ -179,7 +182,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// This is possible when #pending resource decreased by a different
// thread.
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -209,7 +212,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
+ rmContext.getScheduler().getNumClusterNodes());
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.NON_PARTITIONED_PARTITION_FIRST);
return ContainerAllocation.APP_SKIPPED;
}
@ -220,13 +223,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
private ContainerAllocation checkIfNodeBlackListed(FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey) {
Priority priority = schedulerKey.getPriority();
if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
application.updateAppSkipNodeDiagnostics(
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE);
return ContainerAllocation.APP_SKIPPED;
}
@ -366,7 +367,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Skip node-local request, go to rack-local request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, schedulerKey.getPriority(),
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.SKIP_NODE_LOCAL_REQUEST);
return ContainerAllocation.LOCALITY_SKIPPED;
}
@ -384,7 +385,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Skip rack-local request, go to off-switch request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, schedulerKey.getPriority(),
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.SKIP_RACK_LOCAL_REQUEST);
return ContainerAllocation.LOCALITY_SKIPPED;
}
@ -403,7 +404,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
application.updateAppSkipNodeDiagnostics(
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_DUE_TO_LOCALITY);
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, schedulerKey.getPriority(),
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.SKIP_OFF_SWITCH_REQUEST);
return ContainerAllocation.APP_SKIPPED;
}
@ -412,8 +413,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
RMContainer reservedContainer, SchedulingMode schedulingMode,
ResourceLimits currentResoureLimits) {
Priority priority = schedulerKey.getPriority();
ContainerAllocation allocation;
NodeType requestLocalityType = null;
@ -439,7 +438,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
if (rackLocalAsk.getCount() > 0) {
if (!appInfo.canDelayTo(schedulerKey, node.getRackName())) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -465,7 +464,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
if (offSwitchAsk.getCount() > 0) {
if (!appInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -489,7 +488,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
return allocation;
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.PRIORITY_SKIPPED);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -498,7 +497,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
PendingAsk pendingAsk, NodeType type, RMContainer rmContainer,
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
Priority priority = schedulerKey.getPriority();
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
@ -511,15 +509,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
Resource available = node.getUnallocatedResource();
Resource totalResource = node.getTotalResource();
if (!Resources.lessThanOrEqual(rc, clusterResource,
capability, totalResource)) {
if (!Resources.fitsIn(rc, capability, totalResource)) {
LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for ask : " + pendingAsk
+ " node total capability : " + node.getTotalResource());
// Skip this locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE);
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE
+ getResourceDiagnostics(capability, totalResource));
return ContainerAllocation.LOCALITY_SKIPPED;
}
@ -529,6 +527,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Can we allocate a container on this node?
long availableContainers =
rc.computeAvailableContainers(available, capability);
// available resource for diagnostics collector
Resource availableForDC = available;
// How much need to unreserve equals to:
// max(required - headroom, amountNeedUnreserve)
@ -562,6 +562,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
break;
}
}
availableForDC = availableAndKillable;
}
if (availableContainers > 0) {
@ -594,8 +595,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
if (null == unreservedContainer) {
// Skip the locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.LOCALITY_SKIPPED);
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.
NODE_CAN_NOT_FIND_CONTAINER_TO_BE_UNRESERVED_WHEN_NEEDED);
return ContainerAllocation.LOCALITY_SKIPPED;
}
}
@ -619,8 +621,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Skip the locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.LOCALITY_SKIPPED);
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE
+ getResourceDiagnostics(capability, availableForDC));
return ContainerAllocation.LOCALITY_SKIPPED;
}
}
@ -633,8 +636,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
}
// Skip the locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.LOCALITY_SKIPPED);
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE
+ getResourceDiagnostics(capability, availableForDC));
return ContainerAllocation.LOCALITY_SKIPPED;
}
}
@ -708,7 +712,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
new ContainerAllocation(allocationResult.containerToBeUnreserved,
null, AllocationState.APP_SKIPPED);
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
node, application, schedulerKey.getPriority(),
node, application, schedulerKey,
ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, ActivityState.REJECTED);
return ret;
}
@ -730,7 +734,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
.updateAppSkipNodeDiagnostics("Scheduling of container failed. ");
LOG.warn("Couldn't get container for allocation!");
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
node, application, schedulerKey.getPriority(),
node, application, schedulerKey,
ActivityDiagnosticConstant.COULD_NOT_GET_CONTAINER,
ActivityState.REJECTED);
return ContainerAllocation.APP_SKIPPED;
@ -753,7 +757,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
+ " schedulerRequestKey=" + schedulerKey);
ActivitiesLogger.APP
.recordAppActivityWithoutAllocation(activitiesManager, node,
application, schedulerKey.getPriority(),
application, schedulerKey,
ActivityDiagnosticConstant.
PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST,
ActivityState.REJECTED);
@ -815,6 +819,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// This could be null when #pending request decreased by another thread.
if (schedulingPS == null) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, null, application, schedulerKey,
ActivityDiagnosticConstant.
APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE);
return new ContainerAllocation(reservedContainer, null,
AllocationState.QUEUE_SKIPPED);
}
@ -873,7 +881,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
.getPartition());
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, application.getPriority(),
activitiesManager, node, application, null,
ActivityDiagnosticConstant.APPLICATION_DO_NOT_NEED_RESOURCE);
return CSAssignment.SKIP_ASSIGNMENT;
}
@ -893,9 +901,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// We will reach here if we skipped all priorities of the app, so we will
// skip the app.
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, application.getPriority(),
ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
return CSAssignment.SKIP_ASSIGNMENT;
} else {
ContainerAllocation result =
@ -905,4 +910,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
reservedContainer, node);
}
}
private String getResourceDiagnostics(Resource required, Resource available) {
if (activitiesManager == null) {
return ActivitiesManager.EMPTY_DIAGNOSTICS;
}
return activitiesManager.getResourceDiagnostics(rc, required, available);
}
}

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -213,7 +215,8 @@ public final class PlacementConstraintsUtil {
private static boolean canSatisfySingleConstraint(ApplicationId applicationId,
SingleConstraint singleConstraint, SchedulerNode schedulerNode,
AllocationTagsManager tagsManager)
AllocationTagsManager tagsManager,
Optional<DiagnosticsCollector> dcOpt)
throws InvalidAllocationTagsQueryException {
// Iterate through TargetExpressions
Iterator<TargetExpression> expIt =
@ -225,12 +228,20 @@ public final class PlacementConstraintsUtil {
// Check if conditions are met
if (!canSatisfySingleConstraintExpression(applicationId,
singleConstraint, currentExp, schedulerNode, tagsManager)) {
if (dcOpt.isPresent()) {
dcOpt.get().collectPlacementConstraintDiagnostics(
singleConstraint.build(), TargetType.ALLOCATION_TAG);
}
return false;
}
} else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)) {
// This is a node attribute expression, check it.
if (!canSatisfyNodeConstraintExpression(singleConstraint, currentExp,
schedulerNode)) {
if (dcOpt.isPresent()) {
dcOpt.get().collectPlacementConstraintDiagnostics(
singleConstraint.build(), TargetType.NODE_ATTRIBUTE);
}
return false;
}
}
@ -249,12 +260,13 @@ public final class PlacementConstraintsUtil {
* @throws InvalidAllocationTagsQueryException
*/
private static boolean canSatisfyAndConstraint(ApplicationId appId,
And constraint, SchedulerNode node, AllocationTagsManager atm)
And constraint, SchedulerNode node, AllocationTagsManager atm,
Optional<DiagnosticsCollector> dcOpt)
throws InvalidAllocationTagsQueryException {
// Iterate over the constraints tree, if found any child constraint
// isn't satisfied, return false.
for (AbstractConstraint child : constraint.getChildren()) {
if(!canSatisfyConstraints(appId, child.build(), node, atm)) {
if(!canSatisfyConstraints(appId, child.build(), node, atm, dcOpt)) {
return false;
}
}
@ -271,10 +283,11 @@ public final class PlacementConstraintsUtil {
* @throws InvalidAllocationTagsQueryException
*/
private static boolean canSatisfyOrConstraint(ApplicationId appId,
Or constraint, SchedulerNode node, AllocationTagsManager atm)
Or constraint, SchedulerNode node, AllocationTagsManager atm,
Optional<DiagnosticsCollector> dcOpt)
throws InvalidAllocationTagsQueryException {
for (AbstractConstraint child : constraint.getChildren()) {
if (canSatisfyConstraints(appId, child.build(), node, atm)) {
if (canSatisfyConstraints(appId, child.build(), node, atm, dcOpt)) {
return true;
}
}
@ -283,7 +296,8 @@ public final class PlacementConstraintsUtil {
private static boolean canSatisfyConstraints(ApplicationId appId,
PlacementConstraint constraint, SchedulerNode node,
AllocationTagsManager atm)
AllocationTagsManager atm,
Optional<DiagnosticsCollector> dcOpt)
throws InvalidAllocationTagsQueryException {
if (constraint == null) {
LOG.debug("Constraint is found empty during constraint validation for"
@ -300,13 +314,13 @@ public final class PlacementConstraintsUtil {
// TODO handle other type of constraints, e.g CompositeConstraint
if (sConstraintExpr instanceof SingleConstraint) {
SingleConstraint single = (SingleConstraint) sConstraintExpr;
return canSatisfySingleConstraint(appId, single, node, atm);
return canSatisfySingleConstraint(appId, single, node, atm, dcOpt);
} else if (sConstraintExpr instanceof And) {
And and = (And) sConstraintExpr;
return canSatisfyAndConstraint(appId, and, node, atm);
return canSatisfyAndConstraint(appId, and, node, atm, dcOpt);
} else if (sConstraintExpr instanceof Or) {
Or or = (Or) sConstraintExpr;
return canSatisfyOrConstraint(appId, or, node, atm);
return canSatisfyOrConstraint(appId, or, node, atm, dcOpt);
} else {
throw new InvalidAllocationTagsQueryException(
"Unsupported type of constraint: "
@ -331,12 +345,14 @@ public final class PlacementConstraintsUtil {
* @param schedulerNode node
* @param pcm placement constraint manager
* @param atm allocation tags manager
* @param dcOpt optional diagnostics collector
* @return true if the given node satisfies the constraint of the request
* @throws InvalidAllocationTagsQueryException
*/
public static boolean canSatisfyConstraints(ApplicationId applicationId,
SchedulingRequest request, SchedulerNode schedulerNode,
PlacementConstraintManager pcm, AllocationTagsManager atm)
PlacementConstraintManager pcm, AllocationTagsManager atm,
Optional<DiagnosticsCollector> dcOpt)
throws InvalidAllocationTagsQueryException {
Set<String> sourceTags = null;
PlacementConstraint pc = null;
@ -346,7 +362,15 @@ public final class PlacementConstraintsUtil {
}
return canSatisfyConstraints(applicationId,
pcm.getMultilevelConstraint(applicationId, sourceTags, pc),
schedulerNode, atm);
schedulerNode, atm, dcOpt);
}
public static boolean canSatisfyConstraints(ApplicationId applicationId,
SchedulingRequest request, SchedulerNode schedulerNode,
PlacementConstraintManager pcm, AllocationTagsManager atm)
throws InvalidAllocationTagsQueryException {
return canSatisfyConstraints(applicationId, request, schedulerNode, pcm,
atm, Optional.empty());
}
private static NodeAttribute getNodeConstraintFromRequest(String attrKey,

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@ -32,6 +33,7 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
/**
* <p>
@ -150,8 +152,13 @@ public abstract class AppPlacementAllocator<N extends SchedulerNode> {
*
* @param schedulerNode schedulerNode
* @param schedulingMode schedulingMode
* @param dcOpt optional diagnostics collector
* @return accepted/not
*/
public abstract boolean precheckNode(SchedulerNode schedulerNode,
SchedulingMode schedulingMode,
Optional<DiagnosticsCollector> dcOpt);
public abstract boolean precheckNode(SchedulerNode schedulerNode,
SchedulingMode schedulingMode);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.commons.collections.IteratorUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -41,6 +42,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -391,9 +393,11 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
}
@Override
public boolean precheckNode(SchedulerNode schedulerNode,
SchedulingMode schedulingMode) {
SchedulingMode schedulingMode,
Optional<DiagnosticsCollector> dcOpt) {
// We will only look at node label = nodeLabelToLookAt according to
// schedulingMode and partition of node.
LOG.debug("precheckNode is invoked for {},{}", schedulerNode.getNodeID(),
@ -405,7 +409,18 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
}
return primaryRequestedPartition.equals(nodePartitionToLookAt);
boolean rst = primaryRequestedPartition.equals(nodePartitionToLookAt);
if (!rst && dcOpt.isPresent()) {
dcOpt.get().collectPartitionDiagnostics(primaryRequestedPartition,
nodePartitionToLookAt);
}
return rst;
}
@Override
public boolean precheckNode(SchedulerNode schedulerNode,
SchedulingMode schedulingMode) {
return precheckNode(schedulerNode, schedulingMode, Optional.empty());
}
@Override

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.IteratorUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ExecutionType;
@ -47,6 +48,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -344,7 +346,8 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
}
}
private boolean checkCardinalityAndPending(SchedulerNode node) {
private boolean checkCardinalityAndPending(SchedulerNode node,
Optional<DiagnosticsCollector> dcOpt) {
// Do we still have pending resource?
if (schedulingRequest.getResourceSizing().getNumAllocations() <= 0) {
return false;
@ -354,7 +357,7 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
try {
return PlacementConstraintsUtil.canSatisfyConstraints(
appSchedulingInfo.getApplicationId(), schedulingRequest, node,
placementConstraintManager, allocationTagsManager);
placementConstraintManager, allocationTagsManager, dcOpt);
} catch (InvalidAllocationTagsQueryException e) {
LOG.warn("Failed to query node cardinality:", e);
return false;
@ -365,7 +368,7 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
public boolean canAllocate(NodeType type, SchedulerNode node) {
readLock.lock();
try {
return checkCardinalityAndPending(node);
return checkCardinalityAndPending(node, Optional.empty());
} finally {
readLock.unlock();
}
@ -379,6 +382,13 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
@Override
public boolean precheckNode(SchedulerNode schedulerNode,
SchedulingMode schedulingMode) {
return precheckNode(schedulerNode, schedulingMode, Optional.empty());
}
@Override
public boolean precheckNode(SchedulerNode schedulerNode,
SchedulingMode schedulingMode,
Optional<DiagnosticsCollector> dcOpt) {
// We will only look at node label = nodeLabelToLookAt according to
// schedulingMode and partition of node.
String nodePartitionToLookAt;
@ -391,8 +401,15 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
readLock.lock();
try {
// Check node partition as well as cardinality/pending resources.
return this.targetNodePartition.equals(nodePartitionToLookAt)
&& checkCardinalityAndPending(schedulerNode);
boolean rst = this.targetNodePartition.equals(nodePartitionToLookAt);
if (!rst) {
if (dcOpt.isPresent()) {
dcOpt.get().collectPartitionDiagnostics(targetNodePartition,
nodePartitionToLookAt);
}
return rst;
}
return checkCardinalityAndPending(schedulerNode, dcOpt);
} finally {
readLock.unlock();
}

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation;
import javax.xml.bind.annotation.XmlAccessType;
@ -63,7 +65,11 @@ public class ActivitiesInfo {
if (nodeAllocations.size() == 0) {
diagnostic = "do not have available resources";
} else {
this.nodeId = nodeAllocations.get(0).getNodeId();
NodeId rootNodeId = nodeAllocations.get(0).getNodeId();
if (rootNodeId != null && !Strings
.isNullOrEmpty(rootNodeId.getHost())) {
this.nodeId = nodeAllocations.get(0).getNodeId().toString();
}
Date date = new Date();
date.setTime(nodeAllocations.get(0).getTimeStamp());

View File

@ -18,7 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import com.google.common.base.Strings;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@ -38,17 +41,30 @@ public class ActivityNodeInfo {
protected String requestPriority;
protected String allocationState;
protected String diagnostic;
private String nodeId;
private String allocationRequestId;
protected List<ActivityNodeInfo> children;
ActivityNodeInfo() {
}
public ActivityNodeInfo(String name, ActivityState allocationState,
String diagnostic, NodeId nId) {
this.name = name;
this.allocationState = allocationState.name();
this.diagnostic = diagnostic;
setNodeId(nId);
}
ActivityNodeInfo(ActivityNode node) {
this.name = node.getName();
getPriority(node);
setPriority(node);
setNodeId(node.getNodeId());
this.allocationState = node.getState().name();
this.diagnostic = node.getDiagnostic();
this.requestPriority = node.getRequestPriority();
this.allocationRequestId = node.getAllocationRequestId();
this.children = new ArrayList<>();
for (ActivityNode child : node.getChildren()) {
@ -57,11 +73,25 @@ public class ActivityNodeInfo {
}
}
private void getPriority(ActivityNode node) {
public void setNodeId(NodeId nId) {
if (nId != null && !Strings.isNullOrEmpty(nId.getHost())) {
this.nodeId = nId.toString();
}
}
private void setPriority(ActivityNode node) {
if (node.getType()) {
this.appPriority = node.getAppPriority();
} else {
this.requestPriority = node.getRequestPriority();
}
}
public String getNodeId() {
return nodeId;
}
public String getAllocationRequestId() {
return allocationRequestId;
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation;
@ -29,6 +27,8 @@ import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/*
* DAO object to display application allocation detailed information.
@ -36,38 +36,62 @@ import java.util.List;
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class AppAllocationInfo {
protected String nodeId;
protected String queueName;
protected String appPriority;
protected String allocatedContainerId;
protected String allocationState;
protected String diagnostic;
protected String timeStamp;
protected List<ActivityNodeInfo> allocationAttempt;
private static final Logger LOG =
LoggerFactory.getLogger(AppAllocationInfo.class);
private String nodeId;
private String queueName;
private String appPriority;
private long timestamp;
private String dateTime;
private String allocationState;
private List<AppRequestAllocationInfo> requestAllocation;
AppAllocationInfo() {
}
AppAllocationInfo(AppAllocation allocation) {
this.allocationAttempt = new ArrayList<>();
this.requestAllocation = new ArrayList<>();
this.nodeId = allocation.getNodeId();
this.queueName = allocation.getQueueName();
this.appPriority = allocation.getPriority();
this.allocatedContainerId = allocation.getContainerId();
this.timestamp = allocation.getTime();
this.dateTime = new Date(allocation.getTime()).toString();
this.allocationState = allocation.getAppState().name();
this.diagnostic = allocation.getDiagnostic();
Map<String, List<ActivityNode>> requestToActivityNodes =
allocation.getAllocationAttempts().stream().collect(Collectors
.groupingBy((e) -> e.getRequestPriority() + "_" + e
.getAllocationRequestId(), Collectors.toList()));
for (List<ActivityNode> requestActivityNodes : requestToActivityNodes
.values()) {
AppRequestAllocationInfo requestAllocationInfo =
new AppRequestAllocationInfo(requestActivityNodes);
this.requestAllocation.add(requestAllocationInfo);
}
}
Date date = new Date();
date.setTime(allocation.getTime());
this.timeStamp = date.toString();
public String getNodeId() {
return nodeId;
}
for (ActivityNode attempt : allocation.getAllocationAttempts()) {
ActivityNodeInfo containerInfo = new ActivityNodeInfo(attempt);
this.allocationAttempt.add(containerInfo);
}
public String getQueueName() {
return queueName;
}
public String getAppPriority() {
return appPriority;
}
public long getTimestamp() {
return timestamp;
}
public String getDateTime() {
return dateTime;
}
public String getAllocationState() {
return allocationState;
}
public List<AppRequestAllocationInfo> getRequestAllocation() {
return requestAllocation;
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import com.google.common.collect.Iterables;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.List;
/**
* DAO object to display request allocation detailed information.
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class AppRequestAllocationInfo {
private String requestPriority;
private String allocationRequestId;
private String allocationState;
private List<ActivityNodeInfo> allocationAttempt;
AppRequestAllocationInfo() {
}
AppRequestAllocationInfo(List<ActivityNode> activityNodes) {
this.allocationAttempt = new ArrayList<>();
ActivityNode lastActivityNode = Iterables.getLast(activityNodes);
this.requestPriority = lastActivityNode.getRequestPriority();
this.allocationRequestId = lastActivityNode.getAllocationRequestId();
this.allocationState = lastActivityNode.getState().name();
for (ActivityNode attempt : activityNodes) {
ActivityNodeInfo containerInfo =
new ActivityNodeInfo(attempt.getName(), attempt.getState(),
attempt.getDiagnostic(), attempt.getNodeId());
this.allocationAttempt.add(containerInfo);
}
}
public String getRequestPriority() {
return requestPriority;
}
public String getAllocationRequestId() {
return allocationRequestId;
}
public String getAllocationState() {
return allocationState;
}
public List<ActivityNodeInfo> getAllocationAttempt() {
return allocationAttempt;
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Assert;
import org.junit.Before;
@ -132,7 +133,8 @@ public class TestActivitiesManager {
.startNodeUpdateRecording(activitiesManager, node.getNodeID());
ActivitiesLogger.APP
.recordAppActivityWithoutAllocation(activitiesManager, node,
randomApp, Priority.newInstance(0),
randomApp,
new SchedulerRequestKey(Priority.newInstance(0), 0, null),
ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
ActivityState.REJECTED);
ActivitiesLogger.NODE
@ -176,7 +178,8 @@ public class TestActivitiesManager {
ActivitiesManager.EMPTY_NODE_ID);
ActivitiesLogger.APP
.recordAppActivityWithoutAllocation(activitiesManager, node,
randomApp, Priority.newInstance(0),
randomApp,
new SchedulerRequestKey(Priority.newInstance(0), 0, null),
ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
ActivityState.REJECTED);
ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
@ -216,7 +219,8 @@ public class TestActivitiesManager {
for (SchedulerNode node : nodes) {
ActivitiesLogger.APP
.recordAppActivityWithoutAllocation(activitiesManager, node,
randomApp, Priority.newInstance(0),
randomApp,
new SchedulerRequestKey(Priority.newInstance(0), 0, null),
ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
ActivityState.REJECTED);
}

View File

@ -36,6 +36,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -63,6 +64,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.GenericDiagnosticsCollector;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
@ -231,6 +234,15 @@ public class TestPlacementConstraintsUtil {
createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
// Test diagnostics collector
DiagnosticsCollector collector =
new GenericDiagnosticsCollector();
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm,
Optional.of(collector)));
Assert.assertNotNull(collector.getDiagnostics());
Assert.assertTrue(collector.getDiagnostics().contains("ALLOCATION_TAG"));
}
@Test

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.GenericDiagnosticsCollector;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.function.Predicate;
import static org.junit.Assert.assertEquals;
/**
* Some Utils for activities tests.
*/
public final class ActivitiesTestUtils {
public static final String INSUFFICIENT_RESOURCE_DIAGNOSTIC_PREFIX =
ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE
+ ", " + GenericDiagnosticsCollector.RESOURCE_DIAGNOSTICS_PREFIX;
public static final String UNMATCHED_PARTITION_OR_PC_DIAGNOSTIC_PREFIX =
ActivityDiagnosticConstant.
NODE_DO_NOT_MATCH_PARTITION_OR_PLACEMENT_CONSTRAINTS + ", "
+ GenericDiagnosticsCollector.PLACEMENT_CONSTRAINT_DIAGNOSTICS_PREFIX;
private ActivitiesTestUtils(){}
public static List<JSONObject> findInAllocations(JSONObject allocationObj,
Predicate p) throws JSONException {
List<JSONObject> target = new ArrayList<>();
recursiveFindObj(allocationObj.getJSONObject("root"), p, target);
return target;
}
private static void recursiveFindObj(JSONObject obj, Predicate p,
List<JSONObject> target) throws JSONException {
if (p.test(obj)) {
target.add(obj);
}
if (obj.has("children")) {
JSONArray childrenObjs = obj.optJSONArray("children");
if (childrenObjs != null) {
for (int i = 0; i < childrenObjs.length(); i++) {
recursiveFindObj(childrenObjs.getJSONObject(i), p, target);
}
} else {
JSONObject childrenObj = obj.optJSONObject("children");
recursiveFindObj(childrenObj, p, target);
}
}
}
public static SchedulingRequest schedulingRequest(int numContainers,
int priority, long allocReqId, int cores, int mem,
PlacementConstraint placementConstraintExpression, String... tags) {
return SchedulingRequest.newBuilder()
.priority(Priority.newInstance(priority))
.allocationRequestId(allocReqId)
.allocationTags(new HashSet<>(Arrays.asList(tags))).executionType(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true))
.resourceSizing(ResourceSizing
.newInstance(numContainers, Resource.newInstance(mem, cores)))
.placementConstraintExpression(placementConstraintExpression).build();
}
public static void verifyNumberOfNodes(JSONObject allocation, int expectValue)
throws Exception {
if (allocation.isNull("root")) {
assertEquals("State of allocation is wrong", expectValue, 0);
} else {
assertEquals("State of allocation is wrong", expectValue,
1 + getNumberOfNodes(allocation.getJSONObject("root")));
}
}
public static int getNumberOfNodes(JSONObject allocation) throws Exception {
if (!allocation.isNull("children")) {
Object object = allocation.get("children");
if (object.getClass() == JSONObject.class) {
return 1 + getNumberOfNodes((JSONObject) object);
} else {
int count = 0;
for (int i = 0; i < ((JSONArray) object).length(); i++) {
count += (1 + getNumberOfNodes(
((JSONArray) object).getJSONObject(i)));
}
return count;
}
} else {
return 0;
}
}
public static void verifyStateOfAllocations(JSONObject allocation,
String nameToCheck, String expectState) throws Exception {
assertEquals("State of allocation is wrong", expectState,
allocation.get(nameToCheck));
}
public static void verifyNumberOfAllocations(JSONObject json, int expectValue)
throws Exception {
if (json.isNull("allocations")) {
assertEquals("Number of allocations is wrong", expectValue, 0);
} else {
Object object = json.get("allocations");
if (object.getClass() == JSONObject.class) {
assertEquals("Number of allocations is wrong", expectValue, 1);
} else if (object.getClass() == JSONArray.class) {
assertEquals("Number of allocations is wrong in: " + object,
expectValue, ((JSONArray) object).length());
}
}
}
public static void verifyQueueOrder(JSONObject json, String expectOrder)
throws Exception {
String order = "";
if (!json.isNull("root")) {
JSONObject root = json.getJSONObject("root");
order = root.getString("name") + "-" + getQueueOrder(root);
}
assertEquals("Order of queue is wrong", expectOrder,
order.substring(0, order.length() - 1));
}
public static String getQueueOrder(JSONObject node) throws Exception {
if (!node.isNull("children")) {
Object children = node.get("children");
if (children.getClass() == JSONObject.class) {
if (!((JSONObject) children).isNull("appPriority")) {
return "";
}
return ((JSONObject) children).getString("name") + "-" + getQueueOrder(
(JSONObject) children);
} else if (children.getClass() == JSONArray.class) {
String order = "";
for (int i = 0; i < ((JSONArray) children).length(); i++) {
JSONObject child = (JSONObject) ((JSONArray) children).get(i);
if (!child.isNull("appPriority")) {
return "";
}
order += (child.getString("name") + "-" + getQueueOrder(child));
}
return order;
}
}
return "";
}
public static void verifyNumberOfAllocationAttempts(JSONObject allocation,
int expectValue) throws Exception {
if (allocation.isNull("allocationAttempt")) {
assertEquals("Number of allocation attempts is wrong", expectValue, 0);
} else {
Object object = allocation.get("allocationAttempt");
if (object.getClass() == JSONObject.class) {
assertEquals("Number of allocations attempts is wrong", expectValue, 1);
} else if (object.getClass() == JSONArray.class) {
assertEquals("Number of allocations attempts is wrong", expectValue,
((JSONArray) object).length());
}
}
}
}

View File

@ -100,6 +100,8 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
rm = new MockRM(conf);
bind(ResourceManager.class).toInstance(rm);
serve("/*").with(GuiceContainer.class);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.http.JettyUtils;
@ -29,12 +30,16 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
@ -42,9 +47,23 @@ import org.junit.Test;
import javax.ws.rs.core.MediaType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.INSUFFICIENT_RESOURCE_DIAGNOSTIC_PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.UNMATCHED_PARTITION_OR_PC_DIAGNOSTIC_PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyNumberOfAllocationAttempts;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyNumberOfAllocations;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyNumberOfNodes;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyQueueOrder;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyStateOfAllocations;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class TestRMWebServicesSchedulerActivities
extends TestRMWebServicesCapacitySched {
@ -101,8 +120,7 @@ public class TestRMWebServicesSchedulerActivities
verifyStateOfAllocations(json.getJSONObject("allocations"),
"finalAllocationState", "ALLOCATED");
verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b2-b3-b1");
}
finally {
} finally {
rm.stop();
}
}
@ -150,8 +168,7 @@ public class TestRMWebServicesSchedulerActivities
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
} finally {
rm.stop();
}
}
@ -184,8 +201,7 @@ public class TestRMWebServicesSchedulerActivities
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
} finally {
rm.stop();
}
}
@ -233,8 +249,7 @@ public class TestRMWebServicesSchedulerActivities
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
} finally {
rm.stop();
}
}
@ -355,8 +370,7 @@ public class TestRMWebServicesSchedulerActivities
allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "finalAllocationState",
"ALLOCATED_FROM_RESERVED");
}
finally {
} finally {
rm.stop();
}
}
@ -401,114 +415,15 @@ public class TestRMWebServicesSchedulerActivities
verifyStateOfAllocations(allocations, "finalAllocationState",
"ALLOCATED");
verifyNumberOfNodes(allocations, 5);
// Increase number of nodes to 6 since request node has been added
verifyNumberOfNodes(allocations, 6);
verifyQueueOrder(json.getJSONObject("allocations"), "root-b-b1");
}
finally {
} finally {
rm.stop();
}
}
private void verifyNumberOfNodes(JSONObject allocation, int realValue)
throws Exception {
if (allocation.isNull("root")) {
assertEquals("State of allocation is wrong", 0, realValue);
} else {
assertEquals("State of allocation is wrong",
1 + getNumberOfNodes(allocation.getJSONObject("root")), realValue);
}
}
private int getNumberOfNodes(JSONObject allocation) throws Exception {
if (!allocation.isNull("children")) {
Object object = allocation.get("children");
if (object.getClass() == JSONObject.class) {
return 1 + getNumberOfNodes((JSONObject) object);
} else {
int count = 0;
for (int i = 0; i < ((JSONArray) object).length(); i++) {
count += (1 + getNumberOfNodes(
((JSONArray) object).getJSONObject(i)));
}
return count;
}
} else {
return 0;
}
}
private void verifyStateOfAllocations(JSONObject allocation,
String nameToCheck, String realState) throws Exception {
assertEquals("State of allocation is wrong", allocation.get(nameToCheck),
realState);
}
private void verifyNumberOfAllocations(JSONObject json, int realValue)
throws Exception {
if (json.isNull("allocations")) {
assertEquals("Number of allocations is wrong", 0, realValue);
} else {
Object object = json.get("allocations");
if (object.getClass() == JSONObject.class) {
assertEquals("Number of allocations is wrong", 1, realValue);
} else if (object.getClass() == JSONArray.class) {
assertEquals("Number of allocations is wrong in: " + object,
((JSONArray) object).length(), realValue);
}
}
}
private void verifyQueueOrder(JSONObject json, String realOrder)
throws Exception {
String order = "";
if (!json.isNull("root")) {
JSONObject root = json.getJSONObject("root");
order = root.getString("name") + "-" + getQueueOrder(root);
}
assertEquals("Order of queue is wrong",
order.substring(0, order.length() - 1), realOrder);
}
private String getQueueOrder(JSONObject node) throws Exception {
if (!node.isNull("children")) {
Object children = node.get("children");
if (children.getClass() == JSONObject.class) {
if (!((JSONObject) children).isNull("appPriority")) {
return "";
}
return ((JSONObject) children).getString("name") + "-" + getQueueOrder(
(JSONObject) children);
} else if (children.getClass() == JSONArray.class) {
String order = "";
for (int i = 0; i < ((JSONArray) children).length(); i++) {
JSONObject child = (JSONObject) ((JSONArray) children).get(i);
if (!child.isNull("appPriority")) {
return "";
}
order += (child.getString("name") + "-" + getQueueOrder(child));
}
return order;
}
}
return "";
}
private void verifyNumberOfAllocationAttempts(JSONObject allocation,
int realValue) throws Exception {
if (allocation.isNull("allocationAttempt")) {
assertEquals("Number of allocation attempts is wrong", 0, realValue);
} else {
Object object = allocation.get("allocationAttempt");
if (object.getClass() == JSONObject.class) {
assertEquals("Number of allocations attempts is wrong", 1, realValue);
} else if (object.getClass() == JSONArray.class) {
assertEquals("Number of allocations attempts is wrong",
((JSONArray) object).length(), realValue);
}
}
}
@Test
public void testAppActivityJSON() throws Exception {
//Start RM so that it accepts app submissions
@ -542,14 +457,25 @@ public class TestRMWebServicesSchedulerActivities
response.getType().toString());
json = response.getEntity(JSONObject.class);
//Check app activities
verifyNumberOfAllocations(json, 1);
JSONObject allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED");
verifyNumberOfAllocationAttempts(allocations, 1);
}
finally {
//Check request allocation
JSONObject requestAllocationObj =
allocations.getJSONObject("requestAllocation");
verifyStateOfAllocations(requestAllocationObj, "allocationState",
"ALLOCATED");
assertEquals("0", requestAllocationObj.optString("requestPriority"));
assertEquals("-1", requestAllocationObj.optString("allocationRequestId"));
//Check allocation attempts
verifyNumberOfAllocationAttempts(requestAllocationObj, 1);
JSONObject allocationAttemptObj =
requestAllocationObj.getJSONObject("allocationAttempt");
verifyStateOfAllocations(allocationAttemptObj, "allocationState",
"ALLOCATED");
assertNotNull(allocationAttemptObj.get("nodeId"));
} finally {
rm.stop();
}
}
@ -603,8 +529,7 @@ public class TestRMWebServicesSchedulerActivities
verifyStateOfAllocations(allocations.getJSONObject(i),
"allocationState", "ACCEPTED");
}
}
finally {
} finally {
rm.stop();
}
}
@ -651,8 +576,7 @@ public class TestRMWebServicesSchedulerActivities
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
} finally {
rm.stop();
}
}
@ -685,8 +609,7 @@ public class TestRMWebServicesSchedulerActivities
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
} finally {
rm.stop();
}
}
@ -791,10 +714,243 @@ public class TestRMWebServicesSchedulerActivities
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 3);
}
finally {
} finally {
rm.stop();
}
}
@Test (timeout=30000)
public void testInsufficientResourceDiagnostic()
throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * 1024);
try {
RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("cluster").path("scheduler/activities")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("waiting for next allocation",
json.getString("diagnostic"));
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "*",
Resources.createResource(5 * 1024), 1)), null);
//will reserve a container on nm1
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
response =
r.path("ws").path("v1").path("cluster").path("scheduler/activities")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = json.getJSONObject("allocations");
// check diagnostics
Predicate<JSONObject> findReqPred =
(obj) -> obj.optString("name").equals("request_-1_-1");
List<JSONObject> app2ReqObjs =
ActivitiesTestUtils.findInAllocations(allocationObj, findReqPred);
assertEquals(1, app2ReqObjs.size());
JSONObject reqChild = app2ReqObjs.get(0).getJSONObject("children");
assertTrue(reqChild.getString("diagnostic")
.contains(INSUFFICIENT_RESOURCE_DIAGNOSTIC_PREFIX));
} finally {
rm.stop();
}
}
@Test (timeout=30000)
public void testPlacementConstraintDiagnostic()
throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler)rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
try {
RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
// init scheduling request
PlacementConstraint pcExpression = PlacementConstraints
.build(PlacementConstraints.targetIn(NODE, allocationTag("foo")));
List<SchedulingRequest> schedulingRequests = new ArrayList<>();
schedulingRequests.add(ActivitiesTestUtils
.schedulingRequest(5, 1, 1, 1, 512, pcExpression, "foo"));
AllocateRequest allocateReq =
AllocateRequest.newBuilder().schedulingRequests(schedulingRequests)
.build();
am1.allocate(allocateReq);
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("cluster").path("scheduler/activities")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("waiting for next allocation",
json.getString("diagnostic"));
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
response =
r.path("ws").path("v1").path("cluster").path("scheduler/activities")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = json.getJSONObject("allocations");
// check diagnostics
Predicate<JSONObject> findReqPred =
(obj) -> obj.optString("name").equals("request_1_1");
List<JSONObject> reqObjs =
ActivitiesTestUtils.findInAllocations(allocationObj, findReqPred);
assertEquals(1, reqObjs.size());
JSONObject reqChild = reqObjs.get(0).getJSONObject("children");
assertTrue(reqChild.getString("diagnostic")
.contains(UNMATCHED_PARTITION_OR_PC_DIAGNOSTIC_PREFIX));
} finally {
rm.stop();
}
}
@Test (timeout=30000)
public void testAppInsufficientResourceDiagnostic()
throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * 1024);
try {
RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("scheduler/app-activities").queryParams(params)
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("waiting for display",
json.getString("diagnostic"));
// am1 asks for 1 * 5GB container
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "*",
Resources.createResource(5 * 1024), 1)), null);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
response =
r.path("ws").path("v1").path("cluster")
.path("scheduler/app-activities").queryParams(params)
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = json.getJSONObject("allocations");
JSONObject requestAllocationObj =
allocationObj.getJSONObject("requestAllocation");
verifyNumberOfAllocationAttempts(requestAllocationObj, 1);
JSONObject allocationAttemptObj =
requestAllocationObj.getJSONObject("allocationAttempt");
verifyStateOfAllocations(allocationAttemptObj, "allocationState",
"SKIPPED");
assertTrue(allocationAttemptObj.optString("diagnostic")
.contains(INSUFFICIENT_RESOURCE_DIAGNOSTIC_PREFIX));
} finally {
rm.stop();
}
}
@Test (timeout=30000)
public void testAppPlacementConstraintDiagnostic()
throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * 1024);
try {
RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("scheduler/app-activities").queryParams(params)
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("waiting for display",
json.getString("diagnostic"));
// am1 asks for 1 * 5GB container with PC expression: in,node,foo
PlacementConstraint pcExpression = PlacementConstraints
.build(PlacementConstraints.targetIn(NODE, allocationTag("foo")));
List<SchedulingRequest> schedulingRequests = new ArrayList<>();
schedulingRequests.add(ActivitiesTestUtils
.schedulingRequest(5, 1, 1, 1, 512, pcExpression, "foo"));
AllocateRequest allocateReq =
AllocateRequest.newBuilder().schedulingRequests(schedulingRequests)
.build();
am1.allocate(allocateReq);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
response =
r.path("ws").path("v1").path("cluster")
.path("scheduler/app-activities").queryParams(params)
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = json.getJSONObject("allocations");
JSONObject requestAllocationObj =
allocationObj.getJSONObject("requestAllocation");
verifyNumberOfAllocationAttempts(requestAllocationObj, 1);
JSONObject allocationAttemptObj =
requestAllocationObj.getJSONObject("allocationAttempt");
verifyStateOfAllocations(allocationAttemptObj, "allocationState",
"SKIPPED");
assertTrue(allocationAttemptObj.optString("diagnostic")
.contains(UNMATCHED_PARTITION_OR_PC_DIAGNOSTIC_PREFIX));
} finally {
rm.stop();
}
}
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -49,9 +50,18 @@ import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.core.MediaType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.INSUFFICIENT_RESOURCE_DIAGNOSTIC_PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.findInAllocations;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyNumberOfAllocationAttempts;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyNumberOfAllocations;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyStateOfAllocations;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests for scheduler/app activities when multi-nodes enabled.
@ -97,6 +107,8 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
conf.set(policyConfPrefix + ".class",
ResourceUsageMultiNodeLookupPolicy.class.getName());
conf.set(policyConfPrefix + ".sorting-interval.ms", "0");
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
rm = new MockRM(conf);
bind(ResourceManager.class).toInstance(rm);
serve("/*").with(GuiceContainer.class);
@ -115,6 +127,7 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
config.setCapacity(queueB, 89.5f);
config.setMaximumApplicationMasterResourcePerQueuePercent(queueB, 100);
}
@Before
@ -217,7 +230,7 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
}
}
@Test
@Test (timeout=30000)
public void testAppAssignContainer() throws Exception {
rm.start();
@ -260,34 +273,175 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
verifyNumberOfAllocations(json, 1);
JSONObject allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED");
JSONArray allocationAttempts =
allocations.getJSONArray("allocationAttempt");
assertEquals(2, allocationAttempts.length());
JSONObject allocationObj = json.getJSONObject("allocations");
verifyStateOfAllocations(allocationObj, "allocationState", "ACCEPTED");
JSONObject requestAllocationObj =
allocationObj.getJSONObject("requestAllocation");
verifyNumberOfAllocationAttempts(requestAllocationObj, 2);
verifyStateOfAllocations(requestAllocationObj, "allocationState",
"ALLOCATED");
JSONArray allocationAttemptArray =
requestAllocationObj.getJSONArray("allocationAttempt");
JSONObject allocationAttempt1 = allocationAttemptArray.getJSONObject(0);
verifyStateOfAllocations(allocationAttempt1, "allocationState",
"SKIPPED");
assertTrue(allocationAttempt1.optString("diagnostic")
.contains(INSUFFICIENT_RESOURCE_DIAGNOSTIC_PREFIX));
JSONObject allocationAttempt2 = allocationAttemptArray.getJSONObject(1);
verifyStateOfAllocations(allocationAttempt2, "allocationState",
"ALLOCATED");
} finally {
rm.stop();
}
}
private void verifyNumberOfAllocations(JSONObject json, int realValue)
throws Exception {
if (json.isNull("allocations")) {
assertEquals("Number of allocations is wrong", 0, realValue);
} else {
Object object = json.get("allocations");
if (object.getClass() == JSONObject.class) {
assertEquals("Number of allocations is wrong", 1, realValue);
} else if (object.getClass() == JSONArray.class) {
assertEquals("Number of allocations is wrong in: " + object,
((JSONArray) object).length(), realValue);
@Test (timeout=30000)
public void testInsufficientResourceDiagnostic() throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 2 * 1024);
MockNM nm3 = rm.registerNode("127.0.0.3:1234", 2 * 1024);
MockNM nm4 = rm.registerNode("127.0.0.4:1234", 2 * 1024);
try {
RMApp app1 = rm.submitApp(3072, "app1", "user1", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
RMApp app2 = rm.submitApp(1024, "app2", "user1", null, "b");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
WebResource r = resource();
ClientResponse response =
r.path("ws").path("v1").path("cluster").path("scheduler/activities")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("waiting for next allocation", json.getString("diagnostic"));
//Request a container for am2, will reserve a container on nm1
am2.allocate("*", 4096, 1, new ArrayList<>());
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
response =
r.path("ws").path("v1").path("cluster").path("scheduler/activities")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
json = response.getEntity(JSONObject.class);
//Check app activities
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = json.getJSONObject("allocations");
//Check diagnostic for request of app1
Predicate<JSONObject> findApp1Pred = (obj) -> obj.optString("name")
.equals(app1.getApplicationId().toString());
JSONObject app1Obj =
findInAllocations(allocationObj, findApp1Pred).get(0);
assertEquals("SKIPPED", app1Obj.optString("allocationState"));
assertEquals(ActivityDiagnosticConstant.APPLICATION_DO_NOT_NEED_RESOURCE,
app1Obj.optString("diagnostic"));
//Check diagnostic for request of app2
Predicate<JSONObject> findApp2ReqPred =
(obj) -> obj.optString("name").equals("request_1_-1");
List<JSONObject> app2ReqObjs =
findInAllocations(allocationObj, findApp2ReqPred);
assertEquals(1, app2ReqObjs.size());
JSONArray app2ReqChildren = app2ReqObjs.get(0).getJSONArray("children");
assertEquals(4, app2ReqChildren.length());
for (int i = 0; i < app2ReqChildren.length(); i++) {
JSONObject reqChild = app2ReqChildren.getJSONObject(i);
if (reqChild.getString("allocationState").equals("SKIPPED")) {
String diagnostic = reqChild.getString("diagnostic");
assertTrue(
diagnostic.contains(INSUFFICIENT_RESOURCE_DIAGNOSTIC_PREFIX));
}
}
} finally {
rm.stop();
}
}
private void verifyStateOfAllocations(JSONObject allocation,
String nameToCheck, String realState) throws Exception {
assertEquals("State of allocation is wrong", allocation.get(nameToCheck),
realState);
@Test (timeout=30000)
public void testAppInsufficientResourceDiagnostic() throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler)rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 2 * 1024);
MockNM nm3 = rm.registerNode("127.0.0.3:1234", 2 * 1024);
MockNM nm4 = rm.registerNode("127.0.0.4:1234", 2 * 1024);
try {
RMApp app1 = rm.submitApp(3072, "app1", "user1", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("waiting for display", json.getString("diagnostic"));
//Request two containers with different priority for am1
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(0), "*",
Resources.createResource(1024), 1), ResourceRequest
.newInstance(Priority.newInstance(1), "*",
Resources.createResource(4096), 1)), null);
//Trigger scheduling, will allocate a container with priority 0
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
//Trigger scheduling, will reserve a container with priority 1 on nm1
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
json = response.getEntity(JSONObject.class);
//Check app activities
verifyNumberOfAllocations(json, 2);
JSONArray allocationArray = json.getJSONArray("allocations");
//Check first activity is for second allocation with RESERVED state
JSONObject allocationObj = allocationArray.getJSONObject(0);
verifyStateOfAllocations(allocationObj, "allocationState", "RESERVED");
JSONObject requestAllocationObj =
allocationObj.getJSONObject("requestAllocation");
verifyNumberOfAllocationAttempts(requestAllocationObj, 4);
JSONArray allocationAttemptArray =
requestAllocationObj.getJSONArray("allocationAttempt");
for (int i=0; i<allocationAttemptArray.length(); i++) {
JSONObject allocationAttemptObj =
allocationAttemptArray.getJSONObject(i);
if (i != allocationAttemptArray.length()-1) {
assertTrue(allocationAttemptObj.optString("diagnostic")
.contains(INSUFFICIENT_RESOURCE_DIAGNOSTIC_PREFIX));
}
}
// check second activity is for first allocation with ALLOCATED state
allocationObj = allocationArray.getJSONObject(1);
verifyStateOfAllocations(allocationObj, "allocationState", "ACCEPTED");
requestAllocationObj = allocationObj.getJSONObject("requestAllocation");
verifyNumberOfAllocationAttempts(requestAllocationObj, 1);
verifyStateOfAllocations(requestAllocationObj, "allocationState",
"ALLOCATED");
}
finally {
rm.stop();
}
}
}