YARN-9590. Correct incompatible, incomplete and redundant activities. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2019-06-06 19:55:03 +08:00
parent 9fded678ff
commit 0976392502
9 changed files with 36 additions and 18 deletions

View File

@ -102,20 +102,20 @@ public class ActivitiesLogger {
// Add application-container activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(nodeId,
requestName, null,
priorityStr, ActivityState.SKIPPED, diagnostic, type,
priorityStr, appState, diagnostic, type,
null);
type = "request";
// Add application-container activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(nodeId,
application.getApplicationId().toString(), requestName,
priorityStr, ActivityState.SKIPPED,
priorityStr, appState,
ActivityDiagnosticConstant.EMPTY, type, allocationRequestId);
}
// Add queue-application activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(nodeId,
application.getQueueName(),
application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.SKIPPED,
application.getPriority().toString(), appState,
schedulerKey != null ? ActivityDiagnosticConstant.EMPTY :
diagnostic, "app", null);
}

View File

@ -339,8 +339,10 @@ public class ActivitiesManager extends AbstractService {
appAllocations = curAppAllocations;
}
}
if (appAllocations.size() == appActivitiesMaxQueueLength) {
int curQueueLength = appAllocations.size();
while (curQueueLength >= appActivitiesMaxQueueLength) {
appAllocations.poll();
--curQueueLength;
}
appAllocations.add(appAllocation);
Long stopTime =

View File

@ -1597,7 +1597,7 @@ public class CapacityScheduler extends
} else{
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
queue.getParent().getQueueName(), queue.getQueueName(),
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
node, reservedContainer.getContainerId(), AllocationState.SKIPPED);
}
@ -1687,6 +1687,10 @@ public class CapacityScheduler extends
}
LOG.debug("This node or this node partition doesn't have available or "
+ "killable resource");
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, null,
"", getRootQueue().getQueueName(), ActivityState.REJECTED,
ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + " "
+ candidates.getPartition());
return null;
}

View File

@ -1188,6 +1188,9 @@ public class LeafQueue extends AbstractCSQueue {
application.updateNodeInfoForAMDiagnostics(node);
} else if (assignment.getSkippedType()
== CSAssignment.SkippedType.QUEUE_LIMIT) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.QUEUE_SKIPPED_HEADROOM);
return assignment;
} else{
// If we don't allocate anything, and it is not skipped by application,

View File

@ -109,16 +109,10 @@ public abstract class AbstractContainerAllocator {
allocatedResource);
if (rmContainer != null) {
ActivitiesLogger.APP.recordAppActivityWithAllocation(
activitiesManager, node, application, updatedContainer,
ActivityState.RE_RESERVED);
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
} else {
ActivitiesLogger.APP.recordAppActivityWithAllocation(
activitiesManager, node, application, updatedContainer,
ActivityState.RESERVED);
ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
updatedContainer.getContainerId(), ActivityState.RESERVED,
@ -149,7 +143,7 @@ public abstract class AbstractContainerAllocator {
node, application, updatedContainer, ActivityState.ALLOCATED);
ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
updatedContainer.getContainerId(), ActivityState.ACCEPTED,
updatedContainer.getContainerId(), ActivityState.ALLOCATED,
ActivityDiagnosticConstant.EMPTY);
// Update unformed resource
@ -162,6 +156,9 @@ public abstract class AbstractContainerAllocator {
assignment.setSkippedType(
CSAssignment.SkippedType.QUEUE_LIMIT);
}
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
}
return assignment;

View File

@ -628,6 +628,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
}
}
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(
activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE
+ getResourceDiagnostics(capability, availableForDC),
rmContainer == null ?
ActivityState.RESERVED : ActivityState.RE_RESERVED);
ContainerAllocation result = new ContainerAllocation(null,
pendingAsk.getPerAllocationResource(), AllocationState.RESERVED);
result.containerNodeType = type;
@ -824,7 +830,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
ActivityDiagnosticConstant.
APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE);
return new ContainerAllocation(reservedContainer, null,
AllocationState.QUEUE_SKIPPED);
AllocationState.PRIORITY_SKIPPED);
}
result = ContainerAllocation.PRIORITY_SKIPPED;

View File

@ -43,6 +43,7 @@ public class AppAllocationInfo {
private long timestamp;
private String dateTime;
private String allocationState;
private String diagnostic;
private List<AppRequestAllocationInfo> requestAllocation;
AppAllocationInfo() {
@ -57,6 +58,7 @@ public class AppAllocationInfo {
this.timestamp = allocation.getTime();
this.dateTime = new Date(allocation.getTime()).toString();
this.allocationState = allocation.getAppState().name();
this.diagnostic = allocation.getDiagnostic();
Map<String, List<ActivityNode>> requestToActivityNodes =
allocation.getAllocationAttempts().stream().collect(Collectors
.groupingBy((e) -> e.getRequestPriority() + "_" + e
@ -96,4 +98,8 @@ public class AppAllocationInfo {
public List<AppRequestAllocationInfo> getRequestAllocation() {
return requestAllocation;
}
public String getDiagnostic() {
return diagnostic;
}
}

View File

@ -460,7 +460,7 @@ public class TestRMWebServicesSchedulerActivities
//Check app activities
verifyNumberOfAllocations(json, 1);
JSONObject allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED");
verifyStateOfAllocations(allocations, "allocationState", "ALLOCATED");
//Check request allocation
JSONObject requestAllocationObj =
allocations.getJSONObject("requestAllocation");
@ -527,7 +527,7 @@ public class TestRMWebServicesSchedulerActivities
JSONArray allocations = json.getJSONArray("allocations");
for (int i = 0; i < allocations.length(); i++) {
verifyStateOfAllocations(allocations.getJSONObject(i),
"allocationState", "ACCEPTED");
"allocationState", "ALLOCATED");
}
} finally {
rm.stop();

View File

@ -277,7 +277,7 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = json.getJSONObject("allocations");
verifyStateOfAllocations(allocationObj, "allocationState", "ACCEPTED");
verifyStateOfAllocations(allocationObj, "allocationState", "ALLOCATED");
JSONObject requestAllocationObj =
allocationObj.getJSONObject("requestAllocation");
verifyNumberOfAllocationAttempts(requestAllocationObj, 2);
@ -437,7 +437,7 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
}
// check second activity is for first allocation with ALLOCATED state
allocationObj = allocationArray.getJSONObject(1);
verifyStateOfAllocations(allocationObj, "allocationState", "ACCEPTED");
verifyStateOfAllocations(allocationObj, "allocationState", "ALLOCATED");
requestAllocationObj = allocationObj.getJSONObject("requestAllocation");
verifyNumberOfAllocationAttempts(requestAllocationObj, 1);
verifyStateOfAllocations(requestAllocationObj, "allocationState",
@ -610,7 +610,7 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
}
// check second activity is for first allocation with ALLOCATED state
allocationObj = allocationArray.getJSONObject(1);
verifyStateOfAllocations(allocationObj, "allocationState", "ACCEPTED");
verifyStateOfAllocations(allocationObj, "allocationState", "ALLOCATED");
requestAllocationObj = allocationObj.getJSONObject("requestAllocation");
verifyNumberOfAllocationAttempts(requestAllocationObj, 1);
verifyStateOfAllocations(requestAllocationObj, "allocationState",