YARN-5270. Solve miscellaneous issues caused by YARN-4844. Contributed by Wangda Tan

This commit is contained in:
Jian He 2016-07-11 22:38:35 -07:00
parent 4e7e48cdc1
commit 23eb3c7ceb
34 changed files with 162 additions and 122 deletions

View File

@ -135,7 +135,6 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -664,7 +663,7 @@ public abstract class TaskAttemptImpl implements
//TODO:create the resource reqt for this Task attempt //TODO:create the resource reqt for this Task attempt
this.resourceCapability = recordFactory.newRecordInstance(Resource.class); this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
this.resourceCapability.setMemory( this.resourceCapability.setMemorySize(
getMemoryRequired(conf, taskId.getTaskType())); getMemoryRequired(conf, taskId.getTaskType()));
this.resourceCapability.setVirtualCores( this.resourceCapability.setVirtualCores(
getCpuRequired(conf, taskId.getTaskType())); getCpuRequired(conf, taskId.getTaskType()));

View File

@ -374,7 +374,7 @@ public class RMContainerAllocator extends RMContainerRequestor
} }
} }
// set the resources // set the resources
reqEvent.getCapability().setMemory(mapResourceRequest.getMemorySize()); reqEvent.getCapability().setMemorySize(mapResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores( reqEvent.getCapability().setVirtualCores(
mapResourceRequest.getVirtualCores()); mapResourceRequest.getVirtualCores());
scheduledRequests.addMap(reqEvent);//maps are immediately scheduled scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
@ -402,7 +402,7 @@ public class RMContainerAllocator extends RMContainerRequestor
} }
} }
// set the resources // set the resources
reqEvent.getCapability().setMemory(reduceResourceRequest.getMemorySize()); reqEvent.getCapability().setMemorySize(reduceResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores( reqEvent.getCapability().setVirtualCores(
reduceResourceRequest.getVirtualCores()); reduceResourceRequest.getVirtualCores());
if (reqEvent.getEarlierAttemptFailed()) { if (reqEvent.getEarlierAttemptFailed()) {

View File

@ -89,7 +89,7 @@ public class TestTypeConverter {
ApplicationResourceUsageReport appUsageRpt = Records ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class); .newRecord(ApplicationResourceUsageReport.class);
Resource r = Records.newRecord(Resource.class); Resource r = Records.newRecord(Resource.class);
r.setMemory(2048); r.setMemorySize(2048);
appUsageRpt.setNeededResources(r); appUsageRpt.setNeededResources(r);
appUsageRpt.setNumReservedContainers(1); appUsageRpt.setNumReservedContainers(1);
appUsageRpt.setNumUsedContainers(3); appUsageRpt.setNumUsedContainers(3);
@ -128,7 +128,7 @@ public class TestTypeConverter {
ApplicationResourceUsageReport appUsageRpt = Records ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class); .newRecord(ApplicationResourceUsageReport.class);
Resource r = Records.newRecord(Resource.class); Resource r = Records.newRecord(Resource.class);
r.setMemory(2048); r.setMemorySize(2048);
appUsageRpt.setNeededResources(r); appUsageRpt.setNeededResources(r);
appUsageRpt.setNumReservedContainers(1); appUsageRpt.setNumReservedContainers(1);
appUsageRpt.setNumUsedContainers(3); appUsageRpt.setNumUsedContainers(3);

View File

@ -337,7 +337,7 @@ public class YARNRunner implements ClientProtocol {
// Setup resource requirements // Setup resource requirements
Resource capability = recordFactory.newRecordInstance(Resource.class); Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemory( capability.setMemorySize(
conf.getInt( conf.getInt(
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
) )

View File

@ -543,11 +543,11 @@ public class ResourceSchedulerWrapper
} }
); );
metrics.register("variable.cluster.allocated.vcores", metrics.register("variable.cluster.allocated.vcores",
new Gauge<Long>() { new Gauge<Integer>() {
@Override @Override
public Long getValue() { public Integer getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) { if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0L; return 0;
} else { } else {
return scheduler.getRootQueueMetrics().getAllocatedVirtualCores(); return scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
} }
@ -567,11 +567,11 @@ public class ResourceSchedulerWrapper
} }
); );
metrics.register("variable.cluster.available.vcores", metrics.register("variable.cluster.available.vcores",
new Gauge<Long>() { new Gauge<Integer>() {
@Override @Override
public Long getValue() { public Integer getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) { if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0L; return 0;
} else { } else {
return scheduler.getRootQueueMetrics().getAvailableVirtualCores(); return scheduler.getRootQueueMetrics().getAvailableVirtualCores();
} }

View File

@ -19,14 +19,13 @@
package org.apache.hadoop.yarn.api.records; package org.apache.hadoop.yarn.api.records;
import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
/** /**
* <p><code>Resource</code> models a set of computer resources in the * <p><code>Resource</code> models a set of computer resources in the
* cluster.</p> * cluster.</p>
@ -56,9 +55,18 @@ public abstract class Resource implements Comparable<Resource> {
@Public @Public
@Stable @Stable
public static Resource newInstance(long memory, long vCores) { public static Resource newInstance(int memory, int vCores) {
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(memory); resource.setMemorySize(memory);
resource.setVirtualCores(vCores);
return resource;
}
@Public
@Stable
public static Resource newInstance(long memory, int vCores) {
Resource resource = Records.newRecord(Resource.class);
resource.setMemorySize(memory);
resource.setVirtualCores(vCores); resource.setVirtualCores(vCores);
return resource; return resource;
} }
@ -78,19 +86,31 @@ public abstract class Resource implements Comparable<Resource> {
* Get <em>memory</em> of the resource. * Get <em>memory</em> of the resource.
* @return <em>memory</em> of the resource * @return <em>memory</em> of the resource
*/ */
@Private @Public
@Unstable @Stable
public long getMemorySize() { public long getMemorySize() {
throw new NotImplementedException("getVirtualCoresSize is not implemented"); throw new NotImplementedException(
"This method is implemented by ResourcePBImpl");
} }
/**
* Set <em>memory</em> of the resource.
* @param memory <em>memory</em> of the resource
*/
@Public
@Deprecated
public abstract void setMemory(int memory);
/** /**
* Set <em>memory</em> of the resource. * Set <em>memory</em> of the resource.
* @param memory <em>memory</em> of the resource * @param memory <em>memory</em> of the resource
*/ */
@Public @Public
@Stable @Stable
public abstract void setMemory(long memory); public void setMemorySize(long memory) {
throw new NotImplementedException(
"This method is implemented by ResourcePBImpl");
}
/** /**
@ -106,12 +126,6 @@ public abstract class Resource implements Comparable<Resource> {
@Public @Public
@Evolving @Evolving
public abstract int getVirtualCores(); public abstract int getVirtualCores();
@Public
@Unstable
public long getVirtualCoresSize() {
throw new NotImplementedException("getVirtualCoresSize is not implemented");
}
/** /**
* Set <em>number of virtual cpu cores</em> of the resource. * Set <em>number of virtual cpu cores</em> of the resource.
@ -125,9 +139,7 @@ public abstract class Resource implements Comparable<Resource> {
*/ */
@Public @Public
@Evolving @Evolving
public void setVirtualCores(long vCores) { public abstract void setVirtualCores(int vCores);
throw new NotImplementedException("getVirtualCoresSize is not implemented");
}
@Override @Override
public int hashCode() { public int hashCode() {

View File

@ -55,7 +55,7 @@ message ContainerIdProto {
message ResourceProto { message ResourceProto {
optional int64 memory = 1; optional int64 memory = 1;
optional int64 virtual_cores = 2; optional int32 virtual_cores = 2;
} }
message ResourceUtilizationProto { message ResourceUtilizationProto {

View File

@ -171,7 +171,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
Records.newRecord(ContainerLaunchContext.class); Records.newRecord(ContainerLaunchContext.class);
appContext.setAMContainerSpec(amContainer); appContext.setAMContainerSpec(amContainer);
Resource capability = Records.newRecord(Resource.class); Resource capability = Records.newRecord(Resource.class);
capability.setMemory(10); capability.setMemorySize(10);
capability.setVirtualCores(1); capability.setVirtualCores(1);
appContext.setResource(capability); appContext.setResource(capability);
ApplicationId appId = client.submitApplication(appContext); ApplicationId appId = client.submitApplication(appContext);

View File

@ -67,24 +67,25 @@ public class ResourcePBImpl extends Resource {
} }
@Override @Override
public void setMemory(long memory) { @SuppressWarnings("deprecation")
public void setMemory(int memory) {
setMemorySize(memory);
}
@Override
public void setMemorySize(long memory) {
maybeInitBuilder(); maybeInitBuilder();
builder.setMemory(memory); builder.setMemory(memory);
} }
@Override @Override
public int getVirtualCores() { public int getVirtualCores() {
return (int) getVirtualCoresSize();
}
@Override
public long getVirtualCoresSize() {
ResourceProtoOrBuilder p = viaProto ? proto : builder; ResourceProtoOrBuilder p = viaProto ? proto : builder;
return p.getVirtualCores(); return p.getVirtualCores();
} }
@Override @Override
public void setVirtualCores(long vCores) { public void setVirtualCores(int vCores) {
maybeInitBuilder(); maybeInitBuilder();
builder.setVirtualCores(vCores); builder.setVirtualCores(vCores);
} }

View File

@ -56,7 +56,7 @@ public class DefaultResourceCalculator extends ResourceCalculator {
} }
@Override @Override
public Resource divideAndCeil(Resource numerator, long denominator) { public Resource divideAndCeil(Resource numerator, int denominator) {
return Resources.createResource( return Resources.createResource(
divideAndCeil(numerator.getMemorySize(), denominator)); divideAndCeil(numerator.getMemorySize(), denominator));
} }

View File

@ -142,7 +142,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
} }
@Override @Override
public Resource divideAndCeil(Resource numerator, long denominator) { public Resource divideAndCeil(Resource numerator, int denominator) {
return Resources.createResource( return Resources.createResource(
divideAndCeil(numerator.getMemorySize(), denominator), divideAndCeil(numerator.getMemorySize(), denominator),
divideAndCeil(numerator.getVirtualCores(), denominator) divideAndCeil(numerator.getVirtualCores(), denominator)
@ -157,7 +157,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
Math.max(r.getMemorySize(), minimumResource.getMemorySize()), Math.max(r.getMemorySize(), minimumResource.getMemorySize()),
stepFactor.getMemorySize()), stepFactor.getMemorySize()),
maximumResource.getMemorySize()); maximumResource.getMemorySize());
long normalizedCores = Math.min( int normalizedCores = Math.min(
roundUp( roundUp(
Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()), Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
stepFactor.getVirtualCores()), stepFactor.getVirtualCores()),

View File

@ -30,6 +30,13 @@ public abstract class ResourceCalculator {
public abstract int public abstract int
compare(Resource clusterResource, Resource lhs, Resource rhs); compare(Resource clusterResource, Resource lhs, Resource rhs);
public static int divideAndCeil(int a, int b) {
if (b == 0) {
return 0;
}
return (a + (b - 1)) / b;
}
public static long divideAndCeil(long a, long b) { public static long divideAndCeil(long a, long b) {
if (b == 0) { if (b == 0) {
@ -38,6 +45,10 @@ public abstract class ResourceCalculator {
return (a + (b - 1)) / b; return (a + (b - 1)) / b;
} }
public static int roundUp(int a, int b) {
return divideAndCeil(a, b) * b;
}
public static long roundUp(long a, long b) { public static long roundUp(long a, long b) {
return divideAndCeil(a, b) * b; return divideAndCeil(a, b) * b;
} }
@ -46,6 +57,10 @@ public abstract class ResourceCalculator {
return (a / b) * b; return (a / b) * b;
} }
public static int roundDown(int a, int b) {
return (a / b) * b;
}
/** /**
* Compute the number of containers which can be allocated given * Compute the number of containers which can be allocated given
* <code>available</code> and <code>required</code> resources. * <code>available</code> and <code>required</code> resources.
@ -169,7 +184,7 @@ public abstract class ResourceCalculator {
* @param denominator denominator * @param denominator denominator
* @return resultant resource * @return resultant resource
*/ */
public abstract Resource divideAndCeil(Resource numerator, long denominator); public abstract Resource divideAndCeil(Resource numerator, int denominator);
/** /**
* Check if a smaller resource can be contained by bigger resource. * Check if a smaller resource can be contained by bigger resource.

View File

@ -42,7 +42,13 @@ public class Resources {
} }
@Override @Override
public void setMemory(long memory) { public void setMemorySize(long memory) {
throw new RuntimeException("NONE cannot be modified!");
}
@Override
@SuppressWarnings("deprecation")
public void setMemory(int memory) {
throw new RuntimeException("NONE cannot be modified!"); throw new RuntimeException("NONE cannot be modified!");
} }
@ -52,12 +58,7 @@ public class Resources {
} }
@Override @Override
public long getVirtualCoresSize() { public void setVirtualCores(int cores) {
return 0;
}
@Override
public void setVirtualCores(long cores) {
throw new RuntimeException("NONE cannot be modified!"); throw new RuntimeException("NONE cannot be modified!");
} }
@ -86,7 +87,13 @@ public class Resources {
} }
@Override @Override
public void setMemory(long memory) { @SuppressWarnings("deprecation")
public void setMemory(int memory) {
throw new RuntimeException("UNBOUNDED cannot be modified!");
}
@Override
public void setMemorySize(long memory) {
throw new RuntimeException("UNBOUNDED cannot be modified!"); throw new RuntimeException("UNBOUNDED cannot be modified!");
} }
@ -96,12 +103,7 @@ public class Resources {
} }
@Override @Override
public long getVirtualCoresSize() { public void setVirtualCores(int cores) {
return Long.MAX_VALUE;
}
@Override
public void setVirtualCores(long cores) {
throw new RuntimeException("UNBOUNDED cannot be modified!"); throw new RuntimeException("UNBOUNDED cannot be modified!");
} }
@ -109,20 +111,31 @@ public class Resources {
public int compareTo(Resource o) { public int compareTo(Resource o) {
long diff = Long.MAX_VALUE - o.getMemorySize(); long diff = Long.MAX_VALUE - o.getMemorySize();
if (diff == 0) { if (diff == 0) {
diff = Long.MAX_VALUE - o.getVirtualCoresSize(); diff = Integer.MAX_VALUE - o.getVirtualCores();
} }
return Long.signum(diff); return Long.signum(diff);
} }
}; };
public static Resource createResource(int memory) {
return createResource(memory, (memory > 0) ? 1 : 0);
}
public static Resource createResource(int memory, int cores) {
Resource resource = Records.newRecord(Resource.class);
resource.setMemorySize(memory);
resource.setVirtualCores(cores);
return resource;
}
public static Resource createResource(long memory) { public static Resource createResource(long memory) {
return createResource(memory, (memory > 0) ? 1 : 0); return createResource(memory, (memory > 0) ? 1 : 0);
} }
public static Resource createResource(long memory, long cores) { public static Resource createResource(long memory, int cores) {
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(memory); resource.setMemorySize(memory);
resource.setVirtualCores(cores); resource.setVirtualCores(cores);
return resource; return resource;
} }
@ -140,7 +153,7 @@ public class Resources {
} }
public static Resource addTo(Resource lhs, Resource rhs) { public static Resource addTo(Resource lhs, Resource rhs) {
lhs.setMemory(lhs.getMemorySize() + rhs.getMemorySize()); lhs.setMemorySize(lhs.getMemorySize() + rhs.getMemorySize());
lhs.setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores()); lhs.setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores());
return lhs; return lhs;
} }
@ -150,7 +163,7 @@ public class Resources {
} }
public static Resource subtractFrom(Resource lhs, Resource rhs) { public static Resource subtractFrom(Resource lhs, Resource rhs) {
lhs.setMemory(lhs.getMemorySize() - rhs.getMemorySize()); lhs.setMemorySize(lhs.getMemorySize() - rhs.getMemorySize());
lhs.setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores()); lhs.setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores());
return lhs; return lhs;
} }
@ -164,7 +177,7 @@ public class Resources {
} }
public static Resource multiplyTo(Resource lhs, double by) { public static Resource multiplyTo(Resource lhs, double by) {
lhs.setMemory((int)(lhs.getMemorySize() * by)); lhs.setMemorySize((long)(lhs.getMemorySize() * by));
lhs.setVirtualCores((int)(lhs.getVirtualCores() * by)); lhs.setVirtualCores((int)(lhs.getVirtualCores() * by));
return lhs; return lhs;
} }
@ -179,7 +192,7 @@ public class Resources {
*/ */
public static Resource multiplyAndAddTo( public static Resource multiplyAndAddTo(
Resource lhs, Resource rhs, double by) { Resource lhs, Resource rhs, double by) {
lhs.setMemory(lhs.getMemorySize() + (int)(rhs.getMemorySize() * by)); lhs.setMemorySize(lhs.getMemorySize() + (long)(rhs.getMemorySize() * by));
lhs.setVirtualCores(lhs.getVirtualCores() lhs.setVirtualCores(lhs.getVirtualCores()
+ (int)(rhs.getVirtualCores() * by)); + (int)(rhs.getVirtualCores() * by));
return lhs; return lhs;
@ -197,7 +210,7 @@ public class Resources {
public static Resource multiplyAndRoundDown(Resource lhs, double by) { public static Resource multiplyAndRoundDown(Resource lhs, double by) {
Resource out = clone(lhs); Resource out = clone(lhs);
out.setMemory((int)(lhs.getMemorySize() * by)); out.setMemorySize((long)(lhs.getMemorySize() * by));
out.setVirtualCores((int)(lhs.getVirtualCores() * by)); out.setVirtualCores((int)(lhs.getVirtualCores() * by));
return out; return out;
} }

View File

@ -24,18 +24,18 @@ import static org.junit.Assert.assertTrue;
public class TestResources { public class TestResources {
public Resource createResource(long memory, long vCores) { public Resource createResource(long memory, int vCores) {
return Resource.newInstance(memory, vCores); return Resource.newInstance(memory, vCores);
} }
@Test(timeout=1000) @Test(timeout=1000)
public void testCompareToWithUnboundedResource() { public void testCompareToWithUnboundedResource() {
assertTrue(Resources.unbounded().compareTo( assertTrue(Resources.unbounded().compareTo(
createResource(Long.MAX_VALUE, Long.MAX_VALUE)) == 0); createResource(Long.MAX_VALUE, Integer.MAX_VALUE)) == 0);
assertTrue(Resources.unbounded().compareTo( assertTrue(Resources.unbounded().compareTo(
createResource(Long.MAX_VALUE, 0)) > 0); createResource(Long.MAX_VALUE, 0)) > 0);
assertTrue(Resources.unbounded().compareTo( assertTrue(Resources.unbounded().compareTo(
createResource(0, Long.MAX_VALUE)) > 0); createResource(0, Integer.MAX_VALUE)) > 0);
} }
@Test(timeout=1000) @Test(timeout=1000)

View File

@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -425,9 +424,9 @@ public class BuilderUtils {
return report; return report;
} }
public static Resource newResource(long memory, long vCores) { public static Resource newResource(long memory, int vCores) {
Resource resource = recordFactory.newRecordInstance(Resource.class); Resource resource = recordFactory.newRecordInstance(Resource.class);
resource.setMemory(memory); resource.setMemorySize(memory);
resource.setVirtualCores(vCores); resource.setVirtualCores(vCores);
return resource; return resource;
} }

View File

@ -193,7 +193,7 @@ public class TestYarnServerApiClasses {
original.setHttpPort(8080); original.setHttpPort(8080);
original.setNodeId(getNodeId()); original.setNodeId(getNodeId());
Resource resource = recordFactory.newRecordInstance(Resource.class); Resource resource = recordFactory.newRecordInstance(Resource.class);
resource.setMemory(10000); resource.setMemorySize(10000);
resource.setVirtualCores(2); resource.setVirtualCores(2);
original.setResource(resource); original.setResource(resource);
RegisterNodeManagerRequestPBImpl copy = new RegisterNodeManagerRequestPBImpl( RegisterNodeManagerRequestPBImpl copy = new RegisterNodeManagerRequestPBImpl(

View File

@ -462,7 +462,7 @@ public abstract class BaseAMRMProxyTest {
pri.setPriority(priority); pri.setPriority(priority);
req.setPriority(pri); req.setPriority(pri);
Resource capability = Records.newRecord(Resource.class); Resource capability = Records.newRecord(Resource.class);
capability.setMemory(memory); capability.setMemorySize(memory);
capability.setVirtualCores(vCores); capability.setVirtualCores(vCores);
req.setCapability(capability); req.setCapability(capability);
if (labelExpression != null) { if (labelExpression != null) {

View File

@ -33,16 +33,16 @@ public class TestNodeManagerMetrics {
DefaultMetricsSystem.initialize("NodeManager"); DefaultMetricsSystem.initialize("NodeManager");
NodeManagerMetrics metrics = NodeManagerMetrics.create(); NodeManagerMetrics metrics = NodeManagerMetrics.create();
Resource total = Records.newRecord(Resource.class); Resource total = Records.newRecord(Resource.class);
total.setMemory(8*GiB); total.setMemorySize(8*GiB);
total.setVirtualCores(16); total.setVirtualCores(16);
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(512); //512MiB resource.setMemorySize(512); //512MiB
resource.setVirtualCores(2); resource.setVirtualCores(2);
Resource largerResource = Records.newRecord(Resource.class); Resource largerResource = Records.newRecord(Resource.class);
largerResource.setMemory(1024); largerResource.setMemorySize(1024);
largerResource.setVirtualCores(2); largerResource.setVirtualCores(2);
Resource smallerResource = Records.newRecord(Resource.class); Resource smallerResource = Records.newRecord(Resource.class);
smallerResource.setMemory(256); smallerResource.setMemorySize(256);
smallerResource.setVirtualCores(1); smallerResource.setVirtualCores(1);
metrics.addResource(total); metrics.addResource(total);

View File

@ -331,7 +331,7 @@ public class ResourceTrackerService extends AbstractService implements
String nid = nodeId.toString(); String nid = nodeId.toString();
if (nodes != null && Arrays.asList(nodes).contains(nid)) { if (nodes != null && Arrays.asList(nodes).contains(nid)) {
capability.setMemory(this.drConf.getMemoryPerNode(nid)); capability.setMemorySize(this.drConf.getMemoryPerNode(nid));
capability.setVirtualCores(this.drConf.getVcoresPerNode(nid)); capability.setVirtualCores(this.drConf.getVcoresPerNode(nid));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Resource for node: " + nid + " is adjusted to " + LOG.debug("Resource for node: " + nid + " is adjusted to " +

View File

@ -706,7 +706,7 @@ public abstract class AbstractYarnScheduler
int nodeMemory = (int)totalResource.getMemorySize(); int nodeMemory = (int)totalResource.getMemorySize();
if (nodeMemory > maxNodeMemory) { if (nodeMemory > maxNodeMemory) {
maxNodeMemory = nodeMemory; maxNodeMemory = nodeMemory;
maximumAllocation.setMemory(Math.min( maximumAllocation.setMemorySize(Math.min(
configuredMaximumAllocation.getMemorySize(), maxNodeMemory)); configuredMaximumAllocation.getMemorySize(), maxNodeMemory));
} }
int nodeVCores = totalResource.getVirtualCores(); int nodeVCores = totalResource.getVirtualCores();
@ -738,9 +738,9 @@ public abstract class AbstractYarnScheduler
} }
} }
if (maxNodeMemory == -1) { // no nodes if (maxNodeMemory == -1) { // no nodes
maximumAllocation.setMemory(configuredMaximumAllocation.getMemorySize()); maximumAllocation.setMemorySize(configuredMaximumAllocation.getMemorySize());
} else { } else {
maximumAllocation.setMemory( maximumAllocation.setMemorySize(
Math.min(configuredMaximumAllocation.getMemorySize(), maxNodeMemory)); Math.min(configuredMaximumAllocation.getMemorySize(), maxNodeMemory));
} }
if (maxNodeVCores == -1) { // no nodes if (maxNodeVCores == -1) { // no nodes

View File

@ -61,7 +61,7 @@ public class QueueMetrics implements MetricsSource {
@Metric("# of apps failed") MutableCounterInt appsFailed; @Metric("# of apps failed") MutableCounterInt appsFailed;
@Metric("Allocated memory in MB") MutableGaugeLong allocatedMB; @Metric("Allocated memory in MB") MutableGaugeLong allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeLong allocatedVCores; @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers; @Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
@Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated; @Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
@Metric("Aggregate # of allocated node-local containers") @Metric("Aggregate # of allocated node-local containers")
@ -72,12 +72,12 @@ public class QueueMetrics implements MetricsSource {
MutableCounterLong aggregateOffSwitchContainersAllocated; MutableCounterLong aggregateOffSwitchContainersAllocated;
@Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased; @Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
@Metric("Available memory in MB") MutableGaugeLong availableMB; @Metric("Available memory in MB") MutableGaugeLong availableMB;
@Metric("Available CPU in virtual cores") MutableGaugeLong availableVCores; @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
@Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB; @Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB;
@Metric("Pending CPU allocation in virtual cores") MutableGaugeLong pendingVCores; @Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores;
@Metric("# of pending containers") MutableGaugeInt pendingContainers; @Metric("# of pending containers") MutableGaugeInt pendingContainers;
@Metric("# of reserved memory in MB") MutableGaugeLong reservedMB; @Metric("# of reserved memory in MB") MutableGaugeLong reservedMB;
@Metric("Reserved CPU in virtual cores") MutableGaugeLong reservedVCores; @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
@Metric("# of reserved containers") MutableGaugeInt reservedContainers; @Metric("# of reserved containers") MutableGaugeInt reservedContainers;
@Metric("# of active users") MutableGaugeInt activeUsers; @Metric("# of active users") MutableGaugeInt activeUsers;
@Metric("# of active applications") MutableGaugeInt activeApplications; @Metric("# of active applications") MutableGaugeInt activeApplications;
@ -364,7 +364,7 @@ public class QueueMetrics implements MetricsSource {
private void _incrPendingResources(int containers, Resource res) { private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers); pendingContainers.incr(containers);
pendingMB.incr(res.getMemorySize() * containers); pendingMB.incr(res.getMemorySize() * containers);
pendingVCores.incr(res.getVirtualCoresSize() * containers); pendingVCores.incr(res.getVirtualCores() * containers);
} }
public void decrPendingResources(String user, int containers, Resource res) { public void decrPendingResources(String user, int containers, Resource res) {
@ -381,7 +381,7 @@ public class QueueMetrics implements MetricsSource {
private void _decrPendingResources(int containers, Resource res) { private void _decrPendingResources(int containers, Resource res) {
pendingContainers.decr(containers); pendingContainers.decr(containers);
pendingMB.decr(res.getMemorySize() * containers); pendingMB.decr(res.getMemorySize() * containers);
pendingVCores.decr(res.getVirtualCoresSize() * containers); pendingVCores.decr(res.getVirtualCores() * containers);
} }
public void incrNodeTypeAggregations(String user, NodeType type) { public void incrNodeTypeAggregations(String user, NodeType type) {
@ -409,7 +409,7 @@ public class QueueMetrics implements MetricsSource {
aggregateContainersAllocated.incr(containers); aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemorySize() * containers); allocatedMB.incr(res.getMemorySize() * containers);
allocatedVCores.incr(res.getVirtualCoresSize() * containers); allocatedVCores.incr(res.getVirtualCores() * containers);
if (decrPending) { if (decrPending) {
_decrPendingResources(containers, res); _decrPendingResources(containers, res);
} }
@ -448,7 +448,7 @@ public class QueueMetrics implements MetricsSource {
allocatedContainers.decr(containers); allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers); aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemorySize() * containers); allocatedMB.decr(res.getMemorySize() * containers);
allocatedVCores.decr(res.getVirtualCoresSize() * containers); allocatedVCores.decr(res.getVirtualCores() * containers);
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.releaseResources(user, containers, res); userMetrics.releaseResources(user, containers, res);
@ -561,14 +561,15 @@ public class QueueMetrics implements MetricsSource {
} }
public Resource getAllocatedResources() { public Resource getAllocatedResources() {
return BuilderUtils.newResource(allocatedMB.value(), allocatedVCores.value()); return BuilderUtils.newResource(allocatedMB.value(),
(int) allocatedVCores.value());
} }
public long getAllocatedMB() { public long getAllocatedMB() {
return allocatedMB.value(); return allocatedMB.value();
} }
public long getAllocatedVirtualCores() { public int getAllocatedVirtualCores() {
return allocatedVCores.value(); return allocatedVCores.value();
} }
@ -580,7 +581,7 @@ public class QueueMetrics implements MetricsSource {
return availableMB.value(); return availableMB.value();
} }
public long getAvailableVirtualCores() { public int getAvailableVirtualCores() {
return availableVCores.value(); return availableVCores.value();
} }
@ -588,7 +589,7 @@ public class QueueMetrics implements MetricsSource {
return pendingMB.value(); return pendingMB.value();
} }
public long getPendingVirtualCores() { public int getPendingVirtualCores() {
return pendingVCores.value(); return pendingVCores.value();
} }
@ -600,7 +601,7 @@ public class QueueMetrics implements MetricsSource {
return reservedMB.value(); return reservedMB.value();
} }
public long getReservedVirtualCores() { public int getReservedVirtualCores() {
return reservedVCores.value(); return reservedVCores.value();
} }

View File

@ -443,7 +443,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public synchronized Resource getHeadroom() { public synchronized Resource getHeadroom() {
// Corner case to deal with applications being slightly over-limit // Corner case to deal with applications being slightly over-limit
if (resourceLimit.getMemorySize() < 0) { if (resourceLimit.getMemorySize() < 0) {
resourceLimit.setMemory(0); resourceLimit.setMemorySize(0);
} }
return resourceLimit; return resourceLimit;

View File

@ -66,7 +66,7 @@ public class CapacityHeadroomProvider {
} }
// Corner case to deal with applications being slightly over-limit // Corner case to deal with applications being slightly over-limit
if (headroom.getMemorySize() < 0) { if (headroom.getMemorySize() < 0) {
headroom.setMemory(0); headroom.setMemorySize(0);
} }
return headroom; return headroom;
} }

View File

@ -429,7 +429,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
} }
public void clearPreemptedResources() { public void clearPreemptedResources() {
preemptedResources.setMemory(0); preemptedResources.setMemorySize(0);
preemptedResources.setVirtualCores(0); preemptedResources.setVirtualCores(0);
} }

View File

@ -266,10 +266,10 @@ public class ComputeFairShares {
private static void setResourceValue(long val, Resource resource, ResourceType type) { private static void setResourceValue(long val, Resource resource, ResourceType type) {
switch (type) { switch (type) {
case MEMORY: case MEMORY:
resource.setMemory(val); resource.setMemorySize(val);
break; break;
case CPU: case CPU:
resource.setVirtualCores(val); resource.setVirtualCores((int)val);
break; break;
default: default:
throw new IllegalArgumentException("Invalid resource"); throw new IllegalArgumentException("Invalid resource");

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
public class ResourceInfo { public class ResourceInfo {
long memory; long memory;
long vCores; int vCores;
public ResourceInfo() { public ResourceInfo() {
} }
@ -42,7 +42,7 @@ public class ResourceInfo {
return memory; return memory;
} }
public long getvCores() { public int getvCores() {
return vCores; return vCores;
} }

View File

@ -220,7 +220,7 @@ public class MockAM {
pri.setPriority(priority); pri.setPriority(priority);
req.setPriority(pri); req.setPriority(pri);
Resource capability = Records.newRecord(Resource.class); Resource capability = Records.newRecord(Resource.class);
capability.setMemory(memory); capability.setMemorySize(memory);
req.setCapability(capability); req.setCapability(capability);
if (labelExpression != null) { if (labelExpression != null) {
req.setNodeLabelExpression(labelExpression); req.setNodeLabelExpression(labelExpression);

View File

@ -83,19 +83,19 @@ public class MockNodes {
public static Resource newResource(int mem) { public static Resource newResource(int mem) {
Resource rs = recordFactory.newRecordInstance(Resource.class); Resource rs = recordFactory.newRecordInstance(Resource.class);
rs.setMemory(mem); rs.setMemorySize(mem);
return rs; return rs;
} }
public static Resource newUsedResource(Resource total) { public static Resource newUsedResource(Resource total) {
Resource rs = recordFactory.newRecordInstance(Resource.class); Resource rs = recordFactory.newRecordInstance(Resource.class);
rs.setMemory((int)(Math.random() * total.getMemorySize())); rs.setMemorySize((int)(Math.random() * total.getMemorySize()));
return rs; return rs;
} }
public static Resource newAvailResource(Resource total, Resource used) { public static Resource newAvailResource(Resource total, Resource used) {
Resource rs = recordFactory.newRecordInstance(Resource.class); Resource rs = recordFactory.newRecordInstance(Resource.class);
rs.setMemory(total.getMemorySize() - used.getMemorySize()); rs.setMemorySize(total.getMemorySize() - used.getMemorySize());
return rs; return rs;
} }

View File

@ -382,7 +382,7 @@ public class MockRM extends ResourceManager {
Map<ApplicationAccessType, String> acls, String queue, String amLabel) Map<ApplicationAccessType, String> acls, String queue, String amLabel)
throws Exception { throws Exception {
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory); resource.setMemorySize(masterMemory);
Priority priority = Priority.newInstance(0); Priority priority = Priority.newInstance(0);
return submitApp(resource, name, user, acls, false, queue, return submitApp(resource, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
@ -435,7 +435,7 @@ public class MockRM extends ResourceManager {
int maxAppAttempts, Credentials ts, String appType, int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers) throws Exception { boolean waitForAccepted, boolean keepContainers) throws Exception {
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory); resource.setMemorySize(masterMemory);
return submitApp(resource, name, user, acls, unmanaged, queue, return submitApp(resource, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers, maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
false, null, 0, null, true, Priority.newInstance(0)); false, null, 0, null, true, Priority.newInstance(0));
@ -444,7 +444,7 @@ public class MockRM extends ResourceManager {
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
throws Exception { throws Exception {
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory); resource.setMemorySize(masterMemory);
Priority priority = Priority.newInstance(0); Priority priority = Priority.newInstance(0);
return submitApp(resource, "", UserGroupInformation.getCurrentUser() return submitApp(resource, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, .getShortUserName(), null, false, null,
@ -459,7 +459,7 @@ public class MockRM extends ResourceManager {
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId) throws Exception { ApplicationId applicationId) throws Exception {
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory); resource.setMemorySize(masterMemory);
Priority priority = Priority.newInstance(0); Priority priority = Priority.newInstance(0);
return submitApp(resource, name, user, acls, unmanaged, queue, return submitApp(resource, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers, maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
@ -469,7 +469,7 @@ public class MockRM extends ResourceManager {
public RMApp submitApp(int masterMemory, public RMApp submitApp(int masterMemory,
LogAggregationContext logAggregationContext) throws Exception { LogAggregationContext logAggregationContext) throws Exception {
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory); resource.setMemorySize(masterMemory);
Priority priority = Priority.newInstance(0); Priority priority = Priority.newInstance(0);
return submitApp(resource, "", UserGroupInformation.getCurrentUser() return submitApp(resource, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null, .getShortUserName(), null, false, null,

View File

@ -820,21 +820,21 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
resourceTrackerService.registerNodeManager(req); resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.SHUTDOWN,response1.getNodeAction()); Assert.assertEquals(NodeAction.SHUTDOWN,response1.getNodeAction());
capability.setMemory(2048); capability.setMemorySize(2048);
capability.setVirtualCores(1); capability.setVirtualCores(1);
req.setResource(capability); req.setResource(capability);
RegisterNodeManagerResponse response2 = RegisterNodeManagerResponse response2 =
resourceTrackerService.registerNodeManager(req); resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.SHUTDOWN,response2.getNodeAction()); Assert.assertEquals(NodeAction.SHUTDOWN,response2.getNodeAction());
capability.setMemory(1024); capability.setMemorySize(1024);
capability.setVirtualCores(4); capability.setVirtualCores(4);
req.setResource(capability); req.setResource(capability);
RegisterNodeManagerResponse response3 = RegisterNodeManagerResponse response3 =
resourceTrackerService.registerNodeManager(req); resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.SHUTDOWN,response3.getNodeAction()); Assert.assertEquals(NodeAction.SHUTDOWN,response3.getNodeAction());
capability.setMemory(2048); capability.setMemorySize(2048);
capability.setVirtualCores(4); capability.setVirtualCores(4);
req.setResource(capability); req.setResource(capability);
RegisterNodeManagerResponse response4 = RegisterNodeManagerResponse response4 =

View File

@ -1318,7 +1318,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
// submit app with keepContainersAcrossApplicationAttempts true // submit app with keepContainersAcrossApplicationAttempts true
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(200); resource.setMemorySize(200);
RMApp app0 = rm1.submitApp(resource, "", UserGroupInformation RMApp app0 = rm1.submitApp(resource, "", UserGroupInformation
.getCurrentUser().getShortUserName(), null, false, null, .getCurrentUser().getShortUserName(), null, false, null,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true,

View File

@ -127,7 +127,7 @@ public class TestReservationSystemUtil {
public Resource createResource(int memory, int vCores) { public Resource createResource(int memory, int vCores) {
Resource resource = new ResourcePBImpl(); Resource resource = new ResourcePBImpl();
resource.setMemory(memory); resource.setMemorySize(memory);
resource.setVirtualCores(vCores); resource.setVirtualCores(vCores);
return resource; return resource;
} }

View File

@ -402,9 +402,9 @@ public class TestQueueMetrics {
} }
public static void checkResources(MetricsSource source, long allocatedMB, public static void checkResources(MetricsSource source, long allocatedMB,
long allocatedCores, int allocCtnrs, long aggreAllocCtnrs, int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
long aggreReleasedCtnrs, long availableMB, long availableCores, long pendingMB, long aggreReleasedCtnrs, long availableMB, int availableCores, long pendingMB,
long pendingCores, int pendingCtnrs, long reservedMB, long reservedCores, int pendingCores, int pendingCtnrs, long reservedMB, int reservedCores,
int reservedCtnrs) { int reservedCtnrs) {
MetricsRecordBuilder rb = getMetrics(source); MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedMB", allocatedMB, rb); assertGauge("AllocatedMB", allocatedMB, rb);

View File

@ -1143,7 +1143,7 @@ public class TestDelegationTokenRenewer {
// submit app1 with a token, set cancelTokenWhenComplete to false; // submit app1 with a token, set cancelTokenWhenComplete to false;
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(200); resource.setMemorySize(200);
RMApp app1 = rm.submitApp(resource, "name", "user", null, false, null, 2, RMApp app1 = rm.submitApp(resource, "name", "user", null, false, null, 2,
credentials, null, true, false, false, null, 0, null, false, null); credentials, null, true, false, false, null, 0, null, false, null);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
@ -1208,7 +1208,7 @@ public class TestDelegationTokenRenewer {
Assert.assertFalse(Renewer.cancelled); Assert.assertFalse(Renewer.cancelled);
Resource resource = Records.newRecord(Resource.class); Resource resource = Records.newRecord(Resource.class);
resource.setMemory(200); resource.setMemorySize(200);
RMApp app1 = RMApp app1 =
rm.submitApp(resource, "name", "user", null, false, null, 2, credentials, rm.submitApp(resource, "name", "user", null, false, null, 2, credentials,
null, true, false, false, null, 0, null, true, null); null, true, false, false, null, 0, null, true, null);