YARN-9883. Reshape SchedulerHealth class. Contributed by D M Murali Krishna Reddy

This commit is contained in:
adamantal 2020-12-03 09:55:06 +01:00
parent 42a29199c0
commit 9969745343
1 changed files with 123 additions and 13 deletions

View File

@ -25,6 +25,31 @@ import org.apache.hadoop.yarn.api.records.Resource;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/**
* SchedulerHealth class holds the details of the scheduler's operations.
*
* <p><code>SchedulerHealth</code> provides clients with information such as:
* <ol>
* <li>
* scheduler's latest timestamp
* </li>
* <li>
* resources allocated, reserved, released in the last scheduler run
* </li>
* <li>
* latest allocation, release, reservation, preemption details
* </li>
* <li>
* count of latest allocation, release, reservation, preemption
* </li>
* <li>
* aggregate count of allocation, release, reservation, preemption,
* fulfilled reservation since the RM started
* </li>
*</ol>
*
*/
public class SchedulerHealth { public class SchedulerHealth {
static public class DetailedInformation { static public class DetailedInformation {
@ -62,22 +87,22 @@ public class SchedulerHealth {
ALLOCATION, RELEASE, PREEMPTION, RESERVATION, FULFILLED_RESERVATION ALLOCATION, RELEASE, PREEMPTION, RESERVATION, FULFILLED_RESERVATION
} }
long lastSchedulerRunTime; private long lastSchedulerRunTime;
Map<Operation, Resource> lastSchedulerRunDetails; private Map<Operation, Resource> lastSchedulerRunDetails;
Map<Operation, DetailedInformation> schedulerHealthDetails; private Map<Operation, DetailedInformation> lastSchedulerHealthDetails;
Map<Operation, Long> schedulerOperationCounts; private Map<Operation, Long> schedulerOperationCounts;
// this is for counts since the RM started, never reset // this is for counts since the RM started, never reset
Map<Operation, Long> schedulerOperationAggregateCounts; private Map<Operation, Long> schedulerOperationAggregateCounts;
public SchedulerHealth() { SchedulerHealth() {
lastSchedulerRunDetails = new ConcurrentHashMap<>(); lastSchedulerRunDetails = new ConcurrentHashMap<>();
schedulerHealthDetails = new ConcurrentHashMap<>(); lastSchedulerHealthDetails = new ConcurrentHashMap<>();
schedulerOperationCounts = new ConcurrentHashMap<>(); schedulerOperationCounts = new ConcurrentHashMap<>();
schedulerOperationAggregateCounts = new ConcurrentHashMap<>(); schedulerOperationAggregateCounts = new ConcurrentHashMap<>();
for (Operation op : Operation.values()) { for (Operation op : Operation.values()) {
lastSchedulerRunDetails.put(op, Resource.newInstance(0, 0)); lastSchedulerRunDetails.put(op, Resource.newInstance(0, 0));
schedulerOperationCounts.put(op, 0L); schedulerOperationCounts.put(op, 0L);
schedulerHealthDetails.put(op, new DetailedInformation(0, null, null, lastSchedulerHealthDetails.put(op, new DetailedInformation(0, null, null,
null)); null));
schedulerOperationAggregateCounts.put(op, 0L); schedulerOperationAggregateCounts.put(op, 0L);
} }
@ -88,28 +113,28 @@ public class SchedulerHealth {
ContainerId containerId, String queue) { ContainerId containerId, String queue) {
DetailedInformation di = DetailedInformation di =
new DetailedInformation(timestamp, nodeId, containerId, queue); new DetailedInformation(timestamp, nodeId, containerId, queue);
schedulerHealthDetails.put(Operation.ALLOCATION, di); lastSchedulerHealthDetails.put(Operation.ALLOCATION, di);
} }
public void updateRelease(long timestamp, NodeId nodeId, public void updateRelease(long timestamp, NodeId nodeId,
ContainerId containerId, String queue) { ContainerId containerId, String queue) {
DetailedInformation di = DetailedInformation di =
new DetailedInformation(timestamp, nodeId, containerId, queue); new DetailedInformation(timestamp, nodeId, containerId, queue);
schedulerHealthDetails.put(Operation.RELEASE, di); lastSchedulerHealthDetails.put(Operation.RELEASE, di);
} }
public void updatePreemption(long timestamp, NodeId nodeId, public void updatePreemption(long timestamp, NodeId nodeId,
ContainerId containerId, String queue) { ContainerId containerId, String queue) {
DetailedInformation di = DetailedInformation di =
new DetailedInformation(timestamp, nodeId, containerId, queue); new DetailedInformation(timestamp, nodeId, containerId, queue);
schedulerHealthDetails.put(Operation.PREEMPTION, di); lastSchedulerHealthDetails.put(Operation.PREEMPTION, di);
} }
public void updateReservation(long timestamp, NodeId nodeId, public void updateReservation(long timestamp, NodeId nodeId,
ContainerId containerId, String queue) { ContainerId containerId, String queue) {
DetailedInformation di = DetailedInformation di =
new DetailedInformation(timestamp, nodeId, containerId, queue); new DetailedInformation(timestamp, nodeId, containerId, queue);
schedulerHealthDetails.put(Operation.RESERVATION, di); lastSchedulerHealthDetails.put(Operation.RESERVATION, di);
} }
public void updateSchedulerRunDetails(long timestamp, Resource allocated, public void updateSchedulerRunDetails(long timestamp, Resource allocated,
@ -150,6 +175,11 @@ public class SchedulerHealth {
schedulerOperationAggregateCounts.put(op, tmp + count); schedulerOperationAggregateCounts.put(op, tmp + count);
} }
/**
* Get the timestamp of the latest scheduler operation.
*
* @return the scheduler's latest timestamp
*/
public long getLastSchedulerRunTime() { public long getLastSchedulerRunTime() {
return lastSchedulerRunTime; return lastSchedulerRunTime;
} }
@ -158,34 +188,69 @@ public class SchedulerHealth {
return lastSchedulerRunDetails.get(op); return lastSchedulerRunDetails.get(op);
} }
/**
* Get the resources allocated in the last scheduler run.
*
* @return resources allocated
*/
public Resource getResourcesAllocated() { public Resource getResourcesAllocated() {
return getResourceDetails(Operation.ALLOCATION); return getResourceDetails(Operation.ALLOCATION);
} }
/**
* Get the resources reserved in the last scheduler run.
*
* @return resources reserved
*/
public Resource getResourcesReserved() { public Resource getResourcesReserved() {
return getResourceDetails(Operation.RESERVATION); return getResourceDetails(Operation.RESERVATION);
} }
/**
* Get the resources released in the last scheduler run.
*
* @return resources released
*/
public Resource getResourcesReleased() { public Resource getResourcesReleased() {
return getResourceDetails(Operation.RELEASE); return getResourceDetails(Operation.RELEASE);
} }
private DetailedInformation getDetailedInformation(Operation op) { private DetailedInformation getDetailedInformation(Operation op) {
return schedulerHealthDetails.get(op); return lastSchedulerHealthDetails.get(op);
} }
/**
* Get the details of last allocation.
*
* @return last allocation details
*/
public DetailedInformation getLastAllocationDetails() { public DetailedInformation getLastAllocationDetails() {
return getDetailedInformation(Operation.ALLOCATION); return getDetailedInformation(Operation.ALLOCATION);
} }
/**
* Get the details of last release.
*
* @return last release details
*/
public DetailedInformation getLastReleaseDetails() { public DetailedInformation getLastReleaseDetails() {
return getDetailedInformation(Operation.RELEASE); return getDetailedInformation(Operation.RELEASE);
} }
/**
* Get the details of last reservation.
*
* @return last reservation details
*/
public DetailedInformation getLastReservationDetails() { public DetailedInformation getLastReservationDetails() {
return getDetailedInformation(Operation.RESERVATION); return getDetailedInformation(Operation.RESERVATION);
} }
/**
* Get the details of last preemption.
*
* @return last preemption details
*/
public DetailedInformation getLastPreemptionDetails() { public DetailedInformation getLastPreemptionDetails() {
return getDetailedInformation(Operation.PREEMPTION); return getDetailedInformation(Operation.PREEMPTION);
} }
@ -194,18 +259,38 @@ public class SchedulerHealth {
return schedulerOperationCounts.get(op); return schedulerOperationCounts.get(op);
} }
/**
* Get the count of allocation from the latest scheduler health report.
*
* @return allocation count
*/
public Long getAllocationCount() { public Long getAllocationCount() {
return getOperationCount(Operation.ALLOCATION); return getOperationCount(Operation.ALLOCATION);
} }
/**
* Get the count of release from the latest scheduler health report.
*
* @return release count
*/
public Long getReleaseCount() { public Long getReleaseCount() {
return getOperationCount(Operation.RELEASE); return getOperationCount(Operation.RELEASE);
} }
/**
* Get the count of reservation from the latest scheduler health report.
*
* @return reservation count
*/
public Long getReservationCount() { public Long getReservationCount() {
return getOperationCount(Operation.RESERVATION); return getOperationCount(Operation.RESERVATION);
} }
/**
* Get the count of preemption from the latest scheduler health report.
*
* @return preemption count
*/
public Long getPreemptionCount() { public Long getPreemptionCount() {
return getOperationCount(Operation.PREEMPTION); return getOperationCount(Operation.PREEMPTION);
} }
@ -214,22 +299,47 @@ public class SchedulerHealth {
return schedulerOperationAggregateCounts.get(op); return schedulerOperationAggregateCounts.get(op);
} }
/**
* Get the aggregate count of all allocations.
*
* @return aggregate allocation count
*/
public Long getAggregateAllocationCount() { public Long getAggregateAllocationCount() {
return getAggregateOperationCount(Operation.ALLOCATION); return getAggregateOperationCount(Operation.ALLOCATION);
} }
/**
* Get the aggregate count of all releases.
*
* @return aggregate release count
*/
public Long getAggregateReleaseCount() { public Long getAggregateReleaseCount() {
return getAggregateOperationCount(Operation.RELEASE); return getAggregateOperationCount(Operation.RELEASE);
} }
/**
* Get the aggregate count of all reservations.
*
* @return aggregate reservation count
*/
public Long getAggregateReservationCount() { public Long getAggregateReservationCount() {
return getAggregateOperationCount(Operation.RESERVATION); return getAggregateOperationCount(Operation.RESERVATION);
} }
/**
* Get the aggregate count of all preemptions.
*
* @return aggregate preemption count
*/
public Long getAggregatePreemptionCount() { public Long getAggregatePreemptionCount() {
return getAggregateOperationCount(Operation.PREEMPTION); return getAggregateOperationCount(Operation.PREEMPTION);
} }
/**
* Get the aggregate count of all fulfilled reservations.
*
* @return aggregate fulfilled reservations count
*/
public Long getAggregateFulFilledReservationsCount() { public Long getAggregateFulFilledReservationsCount() {
return getAggregateOperationCount(Operation.FULFILLED_RESERVATION); return getAggregateOperationCount(Operation.FULFILLED_RESERVATION);
} }