YARN-7411. Inter-Queue preemption's computeFixpointAllocation need to handle absolute resources while computing normalizedGuarantee. (Sunil G via wangda)
Change-Id: I41b1d7558c20fc4eb2050d40134175a2ef6330cb
(cherry picked from commit 034b312d9f
)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
I did not backport the changes to ResourcePBImpl.java because the code was removed by mistake and soon added back as part of YARN-7483 (Eric Payne).
This commit is contained in:
parent
ae7469cb67
commit
835bc686e0
|
@ -111,6 +111,14 @@ public class DefaultResourceCalculator extends ResourceCalculator {
|
||||||
stepFactor.getMemorySize()));
|
stepFactor.getMemorySize()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource multiplyAndNormalizeUp(Resource r, double[] by,
|
||||||
|
Resource stepFactor) {
|
||||||
|
return Resources.createResource(
|
||||||
|
roundUp((long) (r.getMemorySize() * by[0] + 0.5),
|
||||||
|
stepFactor.getMemorySize()));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Resource multiplyAndNormalizeDown(Resource r, double by,
|
public Resource multiplyAndNormalizeDown(Resource r, double by,
|
||||||
Resource stepFactor) {
|
Resource stepFactor) {
|
||||||
|
|
|
@ -495,6 +495,27 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource multiplyAndNormalizeUp(Resource r, double[] by,
|
||||||
|
Resource stepFactor) {
|
||||||
|
Resource ret = Resource.newInstance(r);
|
||||||
|
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||||
|
for (int i = 0; i < maxLength; i++) {
|
||||||
|
ResourceInformation rResourceInformation = r.getResourceInformation(i);
|
||||||
|
ResourceInformation stepFactorResourceInformation = stepFactor
|
||||||
|
.getResourceInformation(i);
|
||||||
|
|
||||||
|
long rValue = rResourceInformation.getValue();
|
||||||
|
long stepFactorValue = UnitsConversionUtil.convert(
|
||||||
|
stepFactorResourceInformation.getUnits(),
|
||||||
|
rResourceInformation.getUnits(),
|
||||||
|
stepFactorResourceInformation.getValue());
|
||||||
|
ret.setResourceValue(i, ResourceCalculator
|
||||||
|
.roundUp((long) Math.ceil(rValue * by[i]), stepFactorValue));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Resource multiplyAndNormalizeUp(Resource r, double by,
|
public Resource multiplyAndNormalizeUp(Resource r, double by,
|
||||||
Resource stepFactor) {
|
Resource stepFactor) {
|
||||||
|
|
|
@ -125,7 +125,19 @@ public abstract class ResourceCalculator {
|
||||||
*/
|
*/
|
||||||
public abstract Resource multiplyAndNormalizeUp(
|
public abstract Resource multiplyAndNormalizeUp(
|
||||||
Resource r, double by, Resource stepFactor);
|
Resource r, double by, Resource stepFactor);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Multiply resource <code>r</code> by factor <code>by</code>
|
||||||
|
* and normalize up using step-factor <code>stepFactor</code>.
|
||||||
|
*
|
||||||
|
* @param r resource to be multiplied
|
||||||
|
* @param by multiplier array for all resource types
|
||||||
|
* @param stepFactor factor by which to normalize up
|
||||||
|
* @return resulting normalized resource
|
||||||
|
*/
|
||||||
|
public abstract Resource multiplyAndNormalizeUp(
|
||||||
|
Resource r, double[] by, Resource stepFactor);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Multiply resource <code>r</code> by factor <code>by</code>
|
* Multiply resource <code>r</code> by factor <code>by</code>
|
||||||
* and normalize down using step-factor <code>stepFactor</code>.
|
* and normalize down using step-factor <code>stepFactor</code>.
|
||||||
|
|
|
@ -354,6 +354,11 @@ public class Resources {
|
||||||
return lhs;
|
return lhs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Resource multiplyAndNormalizeUp(ResourceCalculator calculator,
|
||||||
|
Resource lhs, double[] by, Resource factor) {
|
||||||
|
return calculator.multiplyAndNormalizeUp(lhs, by, factor);
|
||||||
|
}
|
||||||
|
|
||||||
public static Resource multiplyAndNormalizeUp(
|
public static Resource multiplyAndNormalizeUp(
|
||||||
ResourceCalculator calculator,Resource lhs, double by, Resource factor) {
|
ResourceCalculator calculator,Resource lhs, double by, Resource factor) {
|
||||||
return calculator.multiplyAndNormalizeUp(lhs, by, factor);
|
return calculator.multiplyAndNormalizeUp(lhs, by, factor);
|
||||||
|
|
|
@ -19,8 +19,11 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
|
||||||
|
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -198,18 +201,33 @@ public class AbstractPreemptableResourceCalculator {
|
||||||
private void resetCapacity(Resource clusterResource,
|
private void resetCapacity(Resource clusterResource,
|
||||||
Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
|
Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
|
||||||
Resource activeCap = Resource.newInstance(0, 0);
|
Resource activeCap = Resource.newInstance(0, 0);
|
||||||
|
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||||
|
|
||||||
if (ignoreGuar) {
|
if (ignoreGuar) {
|
||||||
for (TempQueuePerPartition q : queues) {
|
for (TempQueuePerPartition q : queues) {
|
||||||
q.normalizedGuarantee = 1.0f / queues.size();
|
for (int i = 0; i < maxLength; i++) {
|
||||||
|
q.normalizedGuarantee[i] = 1.0f / queues.size();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (TempQueuePerPartition q : queues) {
|
for (TempQueuePerPartition q : queues) {
|
||||||
Resources.addTo(activeCap, q.getGuaranteed());
|
Resources.addTo(activeCap, q.getGuaranteed());
|
||||||
}
|
}
|
||||||
for (TempQueuePerPartition q : queues) {
|
for (TempQueuePerPartition q : queues) {
|
||||||
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
|
for (int i = 0; i < maxLength; i++) {
|
||||||
q.getGuaranteed(), activeCap);
|
ResourceInformation nResourceInformation = q.getGuaranteed()
|
||||||
|
.getResourceInformation(i);
|
||||||
|
ResourceInformation dResourceInformation = activeCap
|
||||||
|
.getResourceInformation(i);
|
||||||
|
|
||||||
|
long nValue = nResourceInformation.getValue();
|
||||||
|
long dValue = UnitsConversionUtil.convert(
|
||||||
|
dResourceInformation.getUnits(), nResourceInformation.getUnits(),
|
||||||
|
dResourceInformation.getValue());
|
||||||
|
if (dValue != 0) {
|
||||||
|
q.normalizedGuarantee[i] = (float) nValue / dValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,9 +22,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -46,7 +48,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
|
||||||
Resource untouchableExtra;
|
Resource untouchableExtra;
|
||||||
Resource preemptableExtra;
|
Resource preemptableExtra;
|
||||||
|
|
||||||
double normalizedGuarantee;
|
double[] normalizedGuarantee;
|
||||||
|
|
||||||
final ArrayList<TempQueuePerPartition> children;
|
final ArrayList<TempQueuePerPartition> children;
|
||||||
private Collection<TempAppPerPartition> apps;
|
private Collection<TempAppPerPartition> apps;
|
||||||
|
@ -84,7 +86,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
|
||||||
pendingDeductReserved = Resources.createResource(0);
|
pendingDeductReserved = Resources.createResource(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.normalizedGuarantee = Float.NaN;
|
this.normalizedGuarantee = new double[ResourceUtils
|
||||||
|
.getNumberOfKnownResourceTypes()];
|
||||||
this.children = new ArrayList<>();
|
this.children = new ArrayList<>();
|
||||||
this.apps = new ArrayList<>();
|
this.apps = new ArrayList<>();
|
||||||
this.untouchableExtra = Resource.newInstance(0, 0);
|
this.untouchableExtra = Resource.newInstance(0, 0);
|
||||||
|
@ -226,8 +229,9 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
|
||||||
sb.append(" NAME: " + queueName).append(" CUR: ").append(current)
|
sb.append(" NAME: " + queueName).append(" CUR: ").append(current)
|
||||||
.append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
|
.append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
|
||||||
.append(" GAR: ").append(getGuaranteed()).append(" NORM: ")
|
.append(" GAR: ").append(getGuaranteed()).append(" NORM: ")
|
||||||
.append(normalizedGuarantee).append(" IDEAL_ASSIGNED: ")
|
.append(Arrays.toString(normalizedGuarantee))
|
||||||
.append(idealAssigned).append(" IDEAL_PREEMPT: ").append(toBePreempted)
|
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
|
||||||
|
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
|
||||||
.append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted())
|
.append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted())
|
||||||
.append(" UNTOUCHABLE: ").append(untouchableExtra)
|
.append(" UNTOUCHABLE: ").append(untouchableExtra)
|
||||||
.append(" PREEMPTABLE: ").append(preemptableExtra).append("\n");
|
.append(" PREEMPTABLE: ").append(preemptableExtra).append("\n");
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -542,6 +544,18 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
|
||||||
} else {
|
} else {
|
||||||
res = Resources.createResource(Integer.valueOf(resource[0]),
|
res = Resources.createResource(Integer.valueOf(resource[0]),
|
||||||
Integer.valueOf(resource[1]));
|
Integer.valueOf(resource[1]));
|
||||||
|
if (resource.length > 2) {
|
||||||
|
// Using the same order of resources from ResourceUtils, set resource
|
||||||
|
// informations.
|
||||||
|
ResourceInformation[] storedResourceInfo = ResourceUtils
|
||||||
|
.getResourceTypesArray();
|
||||||
|
for (int i = 2; i < resource.length; i++) {
|
||||||
|
res.setResourceInformation(storedResourceInfo[i].getName(),
|
||||||
|
ResourceInformation.newInstance(storedResourceInfo[i].getName(),
|
||||||
|
storedResourceInfo[i].getUnits(),
|
||||||
|
Integer.valueOf(resource[i])));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,17 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
|
@ -613,4 +619,74 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
|
||||||
verify(mDisp, never()).handle(
|
verify(mDisp, never()).handle(
|
||||||
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
|
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNormalizeGuaranteeWithMultipleResource() throws IOException {
|
||||||
|
// Initialize resource map
|
||||||
|
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||||
|
String RESOURCE_1 = "res1";
|
||||||
|
|
||||||
|
// Initialize mandatory resources
|
||||||
|
ResourceInformation memory = ResourceInformation.newInstance(
|
||||||
|
ResourceInformation.MEMORY_MB.getName(),
|
||||||
|
ResourceInformation.MEMORY_MB.getUnits(),
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||||
|
ResourceInformation vcores = ResourceInformation.newInstance(
|
||||||
|
ResourceInformation.VCORES.getName(),
|
||||||
|
ResourceInformation.VCORES.getUnits(),
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||||
|
riMap.put(ResourceInformation.MEMORY_URI, memory);
|
||||||
|
riMap.put(ResourceInformation.VCORES_URI, vcores);
|
||||||
|
riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
|
||||||
|
ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE));
|
||||||
|
|
||||||
|
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Queue structure is:
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
* root
|
||||||
|
* / \
|
||||||
|
* a b
|
||||||
|
* / \ / \
|
||||||
|
* a1 a2 b1 b2
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* a1 and b2 are using most of resources.
|
||||||
|
* a2 and b1 needs more resources. Both are under served.
|
||||||
|
* hence demand will consider both queue's need while trying to
|
||||||
|
* do preemption.
|
||||||
|
*/
|
||||||
|
String labelsConfig =
|
||||||
|
"=100,true;";
|
||||||
|
String nodesConfig =
|
||||||
|
"n1=;"; // n1 is default partition
|
||||||
|
String queuesConfig =
|
||||||
|
// guaranteed,max,used,pending
|
||||||
|
"root(=[100:100:10 100:100:10 100:100:10 100:100:10]);" + //root
|
||||||
|
"-a(=[50:80:4 100:100:10 80:90:10 30:20:4]);" + // a
|
||||||
|
"--a1(=[25:30:2 100:50:10 80:90:10 0]);" + // a1
|
||||||
|
"--a2(=[25:50:2 100:50:10 0 30:20:4]);" + // a2
|
||||||
|
"-b(=[50:20:6 100:100:10 20:10 40:50:8]);" + // b
|
||||||
|
"--b1(=[25:5:4 100:20:10 0 20:10:4]);" + // b1
|
||||||
|
"--b2(=[25:15:2 100:20:10 20:10 20:10:4])"; // b2
|
||||||
|
String appsConfig=
|
||||||
|
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||||
|
"a1\t" // app1 in a1
|
||||||
|
+ "(1,8:9:1,n1,,10,false);" +
|
||||||
|
"b2\t" // app2 in b2
|
||||||
|
+ "(1,2:1,n1,,10,false)"; // 80 of y
|
||||||
|
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
verify(mDisp, times(7)).handle(
|
||||||
|
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
|
||||||
|
|
||||||
|
riMap.remove(RESOURCE_1);
|
||||||
|
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue