YARN-8911. ContainerScheduler incorrectly uses percentage number as the cpu resource utlization.

This commit is contained in:
Haibo Chen 2018-10-24 07:58:26 -07:00
parent 0891cdda79
commit 766b78ee07
8 changed files with 57 additions and 91 deletions

View File

@ -89,18 +89,18 @@ public abstract class ResourceUtilization implements
public abstract void setPhysicalMemory(int pmem); public abstract void setPhysicalMemory(int pmem);
/** /**
* Get <em>CPU</em> utilization. * Get <em>CPU</em> utilization (The amount of vcores used).
* *
* @return <em>CPU utilization</em> normalized to 1 CPU * @return <em>CPU utilization</em>
*/ */
@Public @Public
@Unstable @Unstable
public abstract float getCPU(); public abstract float getCPU();
/** /**
* Set <em>CPU</em> utilization. * Set <em>CPU</em> utilization (The amount of vcores used).
* *
* @param cpu <em>CPU utilization</em> normalized to 1 CPU * @param cpu <em>CPU utilization</em>
*/ */
@Public @Public
@Unstable @Unstable

View File

@ -43,8 +43,7 @@ public interface ContainersMonitor extends Service,
static void increaseResourceUtilization( static void increaseResourceUtilization(
ContainersMonitor containersMonitor, ResourceUtilization resourceUtil, ContainersMonitor containersMonitor, ResourceUtilization resourceUtil,
Resource resource) { Resource resource) {
float vCores = (float) resource.getVirtualCores() / float vCores = (float) resource.getVirtualCores();
containersMonitor.getVCoresAllocatedForContainers();
int vmem = (int) (resource.getMemorySize() int vmem = (int) (resource.getMemorySize()
* containersMonitor.getVmemRatio()); * containersMonitor.getVmemRatio());
resourceUtil.addTo((int)resource.getMemorySize(), vmem, vCores); resourceUtil.addTo((int)resource.getMemorySize(), vmem, vCores);
@ -60,8 +59,7 @@ public interface ContainersMonitor extends Service,
static void decreaseResourceUtilization( static void decreaseResourceUtilization(
ContainersMonitor containersMonitor, ResourceUtilization resourceUtil, ContainersMonitor containersMonitor, ResourceUtilization resourceUtil,
Resource resource) { Resource resource) {
float vCores = (float) resource.getVirtualCores() / float vCores = (float) resource.getVirtualCores();
containersMonitor.getVCoresAllocatedForContainers();
int vmem = (int) (resource.getMemorySize() int vmem = (int) (resource.getMemorySize()
* containersMonitor.getVmemRatio()); * containersMonitor.getVmemRatio());
resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores); resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores);

View File

@ -949,7 +949,8 @@ public class ContainersMonitorImpl extends AbstractService implements
public void subtractNodeResourcesFromResourceUtilization( public void subtractNodeResourcesFromResourceUtilization(
ResourceUtilization resourceUtil) { ResourceUtilization resourceUtil) {
resourceUtil.subtractFrom((int) (getPmemAllocatedForContainers() >> 20), resourceUtil.subtractFrom((int) (getPmemAllocatedForContainers() >> 20),
(int) (getVmemAllocatedForContainers() >> 20), 1.0f); (int) (getVmemAllocatedForContainers() >> 20),
getVCoresAllocatedForContainers());
} }
@Override @Override

View File

@ -123,35 +123,14 @@ public class AllocationBasedResourceUtilizationTracker implements
this.containersAllocation.getCPU(), this.containersAllocation.getCPU(),
getContainersMonitor().getVCoresAllocatedForContainers()); getContainersMonitor().getVCoresAllocatedForContainers());
} }
// Check CPU. Compare using integral values of cores to avoid decimal // Check CPU.
// inaccuracies. if (this.containersAllocation.getCPU() + cpuVcores >
if (!hasEnoughCpu(this.containersAllocation.getCPU(), getContainersMonitor().getVCoresAllocatedForContainers()) {
getContainersMonitor().getVCoresAllocatedForContainers(), cpuVcores)) {
return false; return false;
} }
return true; return true;
} }
/**
* Returns whether there is enough space for coresRequested in totalCores.
* Converts currentAllocation usage to nearest integer count before comparing,
* as floats are inherently imprecise. NOTE: this calculation assumes that
* requested core counts must be integers, and currentAllocation core count
* must also be an integer.
*
* @param currentAllocation The current allocation, a float value from 0 to 1.
* @param totalCores The total cores in the system.
* @param coresRequested The number of cores requested.
* @return True if currentAllocationtotalCores*coresRequested &lt;=
* totalCores.
*/
public boolean hasEnoughCpu(float currentAllocation, long totalCores,
int coresRequested) {
// Must not cast here, as it would truncate the decimal digits.
return Math.round(currentAllocation * totalCores)
+ coresRequested <= totalCores;
}
public ContainersMonitor getContainersMonitor() { public ContainersMonitor getContainersMonitor() {
return this.scheduler.getContainersMonitor(); return this.scheduler.getContainersMonitor();
} }

View File

@ -594,10 +594,7 @@ public class ContainerScheduler extends AbstractService implements
ResourceUtilization resourcesToFreeUp) { ResourceUtilization resourcesToFreeUp) {
return resourcesToFreeUp.getPhysicalMemory() <= 0 && return resourcesToFreeUp.getPhysicalMemory() <= 0 &&
resourcesToFreeUp.getVirtualMemory() <= 0 && resourcesToFreeUp.getVirtualMemory() <= 0 &&
// Convert the number of cores to nearest integral number, due to resourcesToFreeUp.getCPU() <= 0;
// imprecision of direct float comparison.
Math.round(resourcesToFreeUp.getCPU()
* getContainersMonitor().getVCoresAllocatedForContainers()) <= 0;
} }
private ResourceUtilization resourcesToFreeUp( private ResourceUtilization resourcesToFreeUp(

View File

@ -527,7 +527,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
assertNotNull(app); assertNotNull(app);
ResourceUtilization utilization = ResourceUtilization utilization =
ResourceUtilization.newInstance(1024, 2048, 0.25F); ResourceUtilization.newInstance(1024, 2048, 1.0F);
assertEquals(cm.getContainerScheduler().getNumRunningContainers(), 1); assertEquals(cm.getContainerScheduler().getNumRunningContainers(), 1);
assertEquals(utilization, assertEquals(utilization,
cm.getContainerScheduler().getCurrentUtilization()); cm.getContainerScheduler().getCurrentUtilization());

View File

@ -72,22 +72,4 @@ public class TestAllocationBasedResourceUtilizationTracker {
} }
Assert.assertFalse(tracker.hasResourcesAvailable(testContainer)); Assert.assertFalse(tracker.hasResourcesAvailable(testContainer));
} }
/**
* Test the case where the current allocation has been truncated to 0.8888891
* (8/9 cores used). Request 1 additional core - hasEnoughCpu should return
* true.
*/
@Test
public void testHasEnoughCpu() {
AllocationBasedResourceUtilizationTracker tracker =
new AllocationBasedResourceUtilizationTracker(mockContainerScheduler);
float currentAllocation = 0.8888891f;
long totalCores = 9;
int alreadyUsedCores = 8;
Assert.assertTrue(tracker.hasEnoughCpu(currentAllocation, totalCores,
(int) totalCores - alreadyUsedCores));
Assert.assertFalse(tracker.hasEnoughCpu(currentAllocation, totalCores,
(int) totalCores - alreadyUsedCores + 1));
}
} }

View File

@ -20,17 +20,19 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doNothing;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
.RecoveredContainerState; .RecoveredContainerState;
@ -40,7 +42,6 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
/** /**
@ -49,6 +50,10 @@ import org.mockito.MockitoAnnotations;
* ExecutionType. * ExecutionType.
*/ */
public class TestContainerSchedulerRecovery { public class TestContainerSchedulerRecovery {
private static final Resource CONTAINER_SIZE =
Resource.newInstance(1024, 4);
private static final ResourceUtilization ZERO =
ResourceUtilization.newInstance(0, 0, 0.0f);
@Mock private NMContext context; @Mock private NMContext context;
@ -66,13 +71,9 @@ public class TestContainerSchedulerRecovery {
@Mock private ContainerId containerId; @Mock private ContainerId containerId;
@Mock private AllocationBasedResourceUtilizationTracker @InjectMocks private ContainerScheduler spy =
allocationBasedResourceUtilizationTracker;
@InjectMocks private ContainerScheduler tempContainerScheduler =
new ContainerScheduler(context, dispatcher, metrics, 0); new ContainerScheduler(context, dispatcher, metrics, 0);
private ContainerScheduler spy;
private RecoveredContainerState createRecoveredContainerState( private RecoveredContainerState createRecoveredContainerState(
RecoveredContainerStatus status) { RecoveredContainerStatus status) {
@ -81,16 +82,32 @@ public class TestContainerSchedulerRecovery {
return mockState; return mockState;
} }
/**
* Set up the {@link ContainersMonitor} dependency of
* {@link ResourceUtilizationTracker} so that we can
* verify the resource utilization.
*/
private void setupContainerMonitor() {
ContainersMonitor containersMonitor = mock(ContainersMonitor.class);
when(containersMonitor.getVCoresAllocatedForContainers()).thenReturn(10L);
when(containersMonitor.getPmemAllocatedForContainers()).thenReturn(10240L);
when(containersMonitor.getVmemRatio()).thenReturn(1.0f);
when(containersMonitor.getVmemAllocatedForContainers()).thenReturn(10240L);
ContainerManager cm = mock(ContainerManager.class);
when(cm.getContainersMonitor()).thenReturn(containersMonitor);
when(context.getContainerManager()).thenReturn(cm);
}
@Before public void setUp() throws Exception { @Before public void setUp() throws Exception {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
spy = spy(tempContainerScheduler); setupContainerMonitor();
when(container.getContainerId()).thenReturn(containerId); when(container.getContainerId()).thenReturn(containerId);
when(container.getResource()).thenReturn(CONTAINER_SIZE);
when(containerId.getApplicationAttemptId()).thenReturn(appAttemptId); when(containerId.getApplicationAttemptId()).thenReturn(appAttemptId);
when(containerId.getApplicationAttemptId().getApplicationId()) when(containerId.getApplicationAttemptId().getApplicationId())
.thenReturn(appId); .thenReturn(appId);
when(containerId.getContainerId()).thenReturn(123L); when(containerId.getContainerId()).thenReturn(123L);
doNothing().when(allocationBasedResourceUtilizationTracker)
.addContainerResources(container);
} }
@After public void tearDown() { @After public void tearDown() {
@ -112,8 +129,7 @@ public class TestContainerSchedulerRecovery {
assertEquals(1, spy.getNumQueuedGuaranteedContainers()); assertEquals(1, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers()); assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) assertEquals(ZERO, spy.getCurrentUtilization());
.addContainerResources(container);
} }
/*Test if a container is recovered as QUEUED, OPPORTUNISTIC, /*Test if a container is recovered as QUEUED, OPPORTUNISTIC,
@ -132,8 +148,7 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(1, spy.getNumQueuedOpportunisticContainers()); assertEquals(1, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers()); assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) assertEquals(ZERO, spy.getCurrentUtilization());
.addContainerResources(container);
} }
/*Test if a container is recovered as PAUSED, GUARANTEED, /*Test if a container is recovered as PAUSED, GUARANTEED,
@ -152,8 +167,7 @@ public class TestContainerSchedulerRecovery {
assertEquals(1, spy.getNumQueuedGuaranteedContainers()); assertEquals(1, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers()); assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) assertEquals(ZERO, spy.getCurrentUtilization());
.addContainerResources(container);
} }
/*Test if a container is recovered as PAUSED, OPPORTUNISTIC, /*Test if a container is recovered as PAUSED, OPPORTUNISTIC,
@ -172,8 +186,7 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(1, spy.getNumQueuedOpportunisticContainers()); assertEquals(1, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers()); assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) assertEquals(ZERO, spy.getCurrentUtilization());
.addContainerResources(container);
} }
/*Test if a container is recovered as LAUNCHED, GUARANTEED, /*Test if a container is recovered as LAUNCHED, GUARANTEED,
@ -192,8 +205,9 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(1, spy.getNumRunningContainers()); assertEquals(1, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1)) assertEquals(
.addContainerResources(container); ResourceUtilization.newInstance(1024, 1024, 4.0f),
spy.getCurrentUtilization());
} }
/*Test if a container is recovered as LAUNCHED, OPPORTUNISTIC, /*Test if a container is recovered as LAUNCHED, OPPORTUNISTIC,
@ -212,8 +226,9 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(1, spy.getNumRunningContainers()); assertEquals(1, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1)) assertEquals(
.addContainerResources(container); ResourceUtilization.newInstance(1024, 1024, 4.0f),
spy.getCurrentUtilization());
} }
/*Test if a container is recovered as REQUESTED, GUARANTEED, /*Test if a container is recovered as REQUESTED, GUARANTEED,
@ -232,8 +247,7 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers()); assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) assertEquals(ZERO, spy.getCurrentUtilization());
.addContainerResources(container);
} }
/*Test if a container is recovered as REQUESTED, OPPORTUNISTIC, /*Test if a container is recovered as REQUESTED, OPPORTUNISTIC,
@ -252,8 +266,7 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers()); assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) assertEquals(ZERO, spy.getCurrentUtilization());
.addContainerResources(container);
} }
/*Test if a container is recovered as COMPLETED, GUARANTEED, /*Test if a container is recovered as COMPLETED, GUARANTEED,
@ -272,8 +285,7 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers()); assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) assertEquals(ZERO, spy.getCurrentUtilization());
.addContainerResources(container);
} }
/*Test if a container is recovered as COMPLETED, OPPORTUNISTIC, /*Test if a container is recovered as COMPLETED, OPPORTUNISTIC,
@ -292,8 +304,7 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers()); assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) assertEquals(ZERO, spy.getCurrentUtilization());
.addContainerResources(container);
} }
/*Test if a container is recovered as GUARANTEED but no executionType set, /*Test if a container is recovered as GUARANTEED but no executionType set,
@ -311,8 +322,7 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers()); assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) assertEquals(ZERO, spy.getCurrentUtilization());
.addContainerResources(container);
} }
/*Test if a container is recovered as PAUSED but no executionType set, /*Test if a container is recovered as PAUSED but no executionType set,
@ -330,7 +340,6 @@ public class TestContainerSchedulerRecovery {
assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedGuaranteedContainers());
assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers());
assertEquals(0, spy.getNumRunningContainers()); assertEquals(0, spy.getNumRunningContainers());
Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) assertEquals(ZERO, spy.getCurrentUtilization());
.addContainerResources(container);
} }
} }