YARN-9300. Lazy preemption should trigger an update on queue preemption metrics for CapacityScheduler. Contributed by Tao Yang.
This commit is contained in:
parent
dddcfa4d9f
commit
50094d7fef
|
@ -35,7 +35,6 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.time.DateUtils;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
|
@ -48,7 +47,6 @@ import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
|
@ -2098,26 +2096,6 @@ public class CapacityScheduler extends
|
||||||
LeafQueue queue = (LeafQueue) application.getQueue();
|
LeafQueue queue = (LeafQueue) application.getQueue();
|
||||||
queue.completedContainer(getClusterResource(), application, node,
|
queue.completedContainer(getClusterResource(), application, node,
|
||||||
rmContainer, containerStatus, event, null, true);
|
rmContainer, containerStatus, event, null, true);
|
||||||
if (ContainerExitStatus.PREEMPTED == containerStatus.getExitStatus()) {
|
|
||||||
updateQueuePreemptionMetrics(queue, rmContainer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void updateQueuePreemptionMetrics(
|
|
||||||
CSQueue queue, RMContainer rmc) {
|
|
||||||
QueueMetrics qMetrics = queue.getMetrics();
|
|
||||||
final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
|
|
||||||
final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND;
|
|
||||||
Resource containerResource = rmc.getAllocatedResource();
|
|
||||||
qMetrics.preemptContainer();
|
|
||||||
long mbSeconds = (containerResource.getMemorySize() * usedMillis)
|
|
||||||
/ DateUtils.MILLIS_PER_SECOND;
|
|
||||||
long vcSeconds = (containerResource.getVirtualCores() * usedMillis)
|
|
||||||
/ DateUtils.MILLIS_PER_SECOND;
|
|
||||||
qMetrics.updatePreemptedMemoryMBSeconds(mbSeconds);
|
|
||||||
qMetrics.updatePreemptedVcoreSeconds(vcSeconds);
|
|
||||||
qMetrics.updatePreemptedSecondsForCustomResources(containerResource,
|
|
||||||
usedSeconds);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Lock(Lock.NoLock.class)
|
@Lock(Lock.NoLock.class)
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.commons.lang3.time.DateUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
@ -1701,6 +1702,12 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
// Notify PreemptionManager
|
// Notify PreemptionManager
|
||||||
csContext.getPreemptionManager().removeKillableContainer(
|
csContext.getPreemptionManager().removeKillableContainer(
|
||||||
new KillableContainer(rmContainer, node.getPartition(), queueName));
|
new KillableContainer(rmContainer, node.getPartition(), queueName));
|
||||||
|
|
||||||
|
// Update preemption metrics if exit status is PREEMPTED
|
||||||
|
if (containerStatus != null
|
||||||
|
&& ContainerExitStatus.PREEMPTED == containerStatus.getExitStatus()) {
|
||||||
|
updateQueuePreemptionMetrics(rmContainer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void allocateResource(Resource clusterResource,
|
void allocateResource(Resource clusterResource,
|
||||||
|
@ -2217,4 +2224,19 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
public long getDefaultApplicationLifetime() {
|
public long getDefaultApplicationLifetime() {
|
||||||
return defaultApplicationLifetime;
|
return defaultApplicationLifetime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void updateQueuePreemptionMetrics(RMContainer rmc) {
|
||||||
|
final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
|
||||||
|
final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND;
|
||||||
|
Resource containerResource = rmc.getAllocatedResource();
|
||||||
|
metrics.preemptContainer();
|
||||||
|
long mbSeconds = (containerResource.getMemorySize() * usedMillis)
|
||||||
|
/ DateUtils.MILLIS_PER_SECOND;
|
||||||
|
long vcSeconds = (containerResource.getVirtualCores() * usedMillis)
|
||||||
|
/ DateUtils.MILLIS_PER_SECOND;
|
||||||
|
metrics.updatePreemptedMemoryMBSeconds(mbSeconds);
|
||||||
|
metrics.updatePreemptedVcoreSeconds(vcSeconds);
|
||||||
|
metrics.updatePreemptedSecondsForCustomResources(containerResource,
|
||||||
|
usedSeconds);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
|
||||||
|
@ -48,8 +47,6 @@ import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
|
|
||||||
public class TestCapacitySchedulerLazyPreemption
|
public class TestCapacitySchedulerLazyPreemption
|
||||||
extends CapacitySchedulerPreemptionTestBase {
|
extends CapacitySchedulerPreemptionTestBase {
|
||||||
@Override
|
@Override
|
||||||
|
@ -156,6 +153,14 @@ public class TestCapacitySchedulerLazyPreemption
|
||||||
Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
|
Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
|
||||||
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
|
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
|
||||||
|
|
||||||
|
// Ensure preemption metrics were recored.
|
||||||
|
Assert.assertEquals(
|
||||||
|
"Number of preempted containers incorrectly recorded:", 1,
|
||||||
|
cs.getQueue("a").getMetrics().getAggregatePreemptedContainers());
|
||||||
|
Assert.assertEquals(
|
||||||
|
"Number of preempted containers incorrectly recorded:", 1,
|
||||||
|
cs.getRootQueue().getMetrics().getAggregatePreemptedContainers());
|
||||||
|
|
||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue