YARN-4844. Add getMemoryLong/getVirtualCoreLong to o.a.h.y.api.records.Resource. Contributed by Wangda Tan.

This commit is contained in:
Varun Vasudev 2016-05-29 20:55:39 +05:30
parent b3f5337bb3
commit 4f36c3d214
136 changed files with 1568 additions and 1544 deletions

View File

@ -689,9 +689,9 @@ public class JobHistoryEventHandler extends AbstractService
NormalizedResourceEvent normalizedResourceEvent =
(NormalizedResourceEvent) event;
if (normalizedResourceEvent.getTaskType() == TaskType.MAP) {
summary.setResourcesPerMap(normalizedResourceEvent.getMemory());
summary.setResourcesPerMap((int) normalizedResourceEvent.getMemory());
} else if (normalizedResourceEvent.getTaskType() == TaskType.REDUCE) {
summary.setResourcesPerReduce(normalizedResourceEvent.getMemory());
summary.setResourcesPerReduce((int) normalizedResourceEvent.getMemory());
}
break;
case JOB_INITED:

View File

@ -1441,7 +1441,7 @@ public abstract class TaskAttemptImpl implements
}
long duration = (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
Resource allocatedResource = taskAttempt.container.getResource();
int mbAllocated = allocatedResource.getMemory();
int mbAllocated = (int) allocatedResource.getMemorySize();
int vcoresAllocated = allocatedResource.getVirtualCores();
int minSlotMemSize = taskAttempt.conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,

View File

@ -365,10 +365,10 @@ public class RMContainerAllocator extends RMContainerRequestor
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest
.getMemory())));
.getMemorySize())));
LOG.info("mapResourceRequest:" + mapResourceRequest);
if (mapResourceRequest.getMemory() > supportedMaxContainerCapability
.getMemory()
if (mapResourceRequest.getMemorySize() > supportedMaxContainerCapability
.getMemorySize()
|| mapResourceRequest.getVirtualCores() > supportedMaxContainerCapability
.getVirtualCores()) {
String diagMsg =
@ -382,7 +382,7 @@ public class RMContainerAllocator extends RMContainerRequestor
}
}
// set the resources
reqEvent.getCapability().setMemory(mapResourceRequest.getMemory());
reqEvent.getCapability().setMemory(mapResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores(
mapResourceRequest.getVirtualCores());
scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
@ -392,10 +392,10 @@ public class RMContainerAllocator extends RMContainerRequestor
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
reduceResourceRequest.getMemory())));
reduceResourceRequest.getMemorySize())));
LOG.info("reduceResourceRequest:" + reduceResourceRequest);
if (reduceResourceRequest.getMemory() > supportedMaxContainerCapability
.getMemory()
if (reduceResourceRequest.getMemorySize() > supportedMaxContainerCapability
.getMemorySize()
|| reduceResourceRequest.getVirtualCores() > supportedMaxContainerCapability
.getVirtualCores()) {
String diagMsg =
@ -410,7 +410,7 @@ public class RMContainerAllocator extends RMContainerRequestor
}
}
// set the resources
reqEvent.getCapability().setMemory(reduceResourceRequest.getMemory());
reqEvent.getCapability().setMemory(reduceResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores(
reduceResourceRequest.getVirtualCores());
if (reqEvent.getEarlierAttemptFailed()) {

View File

@ -20,43 +20,42 @@ package org.apache.hadoop.mapreduce.v2.app.rm;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.util.Records;
import java.util.EnumSet;
public class ResourceCalculatorUtils {
public static int divideAndCeil(int a, int b) {
public static int divideAndCeil(long a, long b) {
if (b == 0) {
return 0;
}
return (a + (b - 1)) / b;
return (int) ((a + (b - 1)) / b);
}
public static int computeAvailableContainers(Resource available,
Resource required, EnumSet<SchedulerResourceTypes> resourceTypes) {
if (resourceTypes.contains(SchedulerResourceTypes.CPU)) {
return Math.min(
calculateRatioOrMaxValue(available.getMemory(), required.getMemory()),
calculateRatioOrMaxValue(available.getMemorySize(), required.getMemorySize()),
calculateRatioOrMaxValue(available.getVirtualCores(), required
.getVirtualCores()));
}
return calculateRatioOrMaxValue(
available.getMemory(), required.getMemory());
available.getMemorySize(), required.getMemorySize());
}
public static int divideAndCeilContainers(Resource required, Resource factor,
EnumSet<SchedulerResourceTypes> resourceTypes) {
if (resourceTypes.contains(SchedulerResourceTypes.CPU)) {
return Math.max(divideAndCeil(required.getMemory(), factor.getMemory()),
return Math.max(divideAndCeil(required.getMemorySize(), factor.getMemorySize()),
divideAndCeil(required.getVirtualCores(), factor.getVirtualCores()));
}
return divideAndCeil(required.getMemory(), factor.getMemory());
return divideAndCeil(required.getMemorySize(), factor.getMemorySize());
}
private static int calculateRatioOrMaxValue(int numerator, int denominator) {
private static int calculateRatioOrMaxValue(long numerator, long denominator) {
if (denominator == 0) {
return Integer.MAX_VALUE;
}
return numerator / denominator;
return (int) (numerator / denominator);
}
}

View File

@ -1794,7 +1794,7 @@ public class TestRecovery {
int appAttemptId = 3;
MRAppMetrics metrics = mock(MRAppMetrics.class);
Resource minContainerRequirements = mock(Resource.class);
when(minContainerRequirements.getMemory()).thenReturn(1000);
when(minContainerRequirements.getMemorySize()).thenReturn(1000L);
ClusterInfo clusterInfo = mock(ClusterInfo.class);
AppContext appContext = mock(AppContext.class);

View File

@ -33,12 +33,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -47,11 +41,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@ -94,7 +86,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
@ -313,7 +304,7 @@ public class TestTaskAttempt{
Assert.assertEquals(rta.getLaunchTime(), 10);
Counters counters = job.getAllCounters();
int memoryMb = containerResource.getMemory();
int memoryMb = (int) containerResource.getMemorySize();
int vcores = containerResource.getVirtualCores();
Assert.assertEquals((int) Math.ceil((float) memoryMb / minContainerSize),
counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue());
@ -577,7 +568,7 @@ public class TestTaskAttempt{
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
@ -635,7 +626,7 @@ public class TestTaskAttempt{
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
@ -699,7 +690,7 @@ public class TestTaskAttempt{
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
@ -769,7 +760,7 @@ public class TestTaskAttempt{
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
@ -826,7 +817,7 @@ public class TestTaskAttempt{
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
@ -894,7 +885,7 @@ public class TestTaskAttempt{
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
@ -1054,7 +1045,7 @@ public class TestTaskAttempt{
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, new Token(),
@ -1108,7 +1099,7 @@ public class TestTaskAttempt{
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, new Token(),
@ -1165,7 +1156,7 @@ public class TestTaskAttempt{
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, new Token(),

View File

@ -201,7 +201,7 @@ public class TestLocalContainerAllocator {
Container container = containerAssignedCaptor.getValue().getContainer();
Resource containerResource = container.getResource();
Assert.assertNotNull(containerResource);
Assert.assertEquals(containerResource.getMemory(), 0);
Assert.assertEquals(containerResource.getMemorySize(), 0);
Assert.assertEquals(containerResource.getVirtualCores(), 0);
}

View File

@ -1771,7 +1771,7 @@ public class TestRMContainerAllocator {
when(excessC.getId()).thenReturn(containerId);
when(excessC.getPriority()).thenReturn(RMContainerAllocator.PRIORITY_REDUCE);
Resource mockR = mock(Resource.class);
when(mockR.getMemory()).thenReturn(2048);
when(mockR.getMemorySize()).thenReturn(2048L);
when(excessC.getResource()).thenReturn(mockR);
NodeId nId = mock(NodeId.class);
when(nId.getHost()).thenReturn("local");

View File

@ -47,8 +47,8 @@ public class TestResourceCalculatorUtils {
Integer.MAX_VALUE,
expectedNumberOfContainersForCPU);
Resource zeroCpuResource = Resource.newInstance(nonZeroResource.getMemory(),
0);
Resource zeroCpuResource = Resource.newInstance(
nonZeroResource.getMemorySize(), 0);
verifyDifferentResourceTypes(clusterAvailableResources, zeroCpuResource,
expectedNumberOfContainersForMemory,

View File

@ -521,13 +521,13 @@ public class TypeConverter {
application.getApplicationResourceUsageReport();
if (resourceUsageReport != null) {
jobStatus.setNeededMem(
resourceUsageReport.getNeededResources().getMemory());
resourceUsageReport.getNeededResources().getMemorySize());
jobStatus.setNumReservedSlots(
resourceUsageReport.getNumReservedContainers());
jobStatus.setNumUsedSlots(resourceUsageReport.getNumUsedContainers());
jobStatus.setReservedMem(
resourceUsageReport.getReservedResources().getMemory());
jobStatus.setUsedMem(resourceUsageReport.getUsedResources().getMemory());
resourceUsageReport.getReservedResources().getMemorySize());
jobStatus.setUsedMem(resourceUsageReport.getUsedResources().getMemorySize());
}
return jobStatus;
}

View File

@ -95,9 +95,9 @@ public class JobStatus implements Writable, Cloneable {
private String trackingUrl ="";
private int numUsedSlots;
private int numReservedSlots;
private int usedMem;
private int reservedMem;
private int neededMem;
private long usedMem;
private long reservedMem;
private long neededMem;
private boolean isUber;
/**
@ -580,42 +580,42 @@ public class JobStatus implements Writable, Cloneable {
/**
* @return the used memory
*/
public int getUsedMem() {
public long getUsedMem() {
return usedMem;
}
/**
* @param m the used memory
*/
public void setUsedMem(int m) {
public void setUsedMem(long m) {
this.usedMem = m;
}
/**
* @return the reserved memory
*/
public int getReservedMem() {
public long getReservedMem() {
return reservedMem;
}
/**
* @param r the reserved memory
*/
public void setReservedMem(int r) {
public void setReservedMem(long r) {
this.reservedMem = r;
}
/**
* @return the needed memory
*/
public int getNeededMem() {
public long getNeededMem() {
return neededMem;
}
/**
* @param n the needed memory
*/
public void setNeededMem(int n) {
public void setNeededMem(long n) {
this.neededMem = n;
}

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.mapreduce.TaskType;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NormalizedResourceEvent implements HistoryEvent {
private int memory;
private long memory;
private TaskType taskType;
/**
@ -36,7 +36,7 @@ public class NormalizedResourceEvent implements HistoryEvent {
* @param taskType the tasktype of the request.
* @param memory the normalized memory requirements.
*/
public NormalizedResourceEvent(TaskType taskType, int memory) {
public NormalizedResourceEvent(TaskType taskType, long memory) {
this.memory = memory;
this.taskType = taskType;
}
@ -53,7 +53,7 @@ public class NormalizedResourceEvent implements HistoryEvent {
* the normalized memory
* @return the normalized memory
*/
public int getMemory() {
public long getMemory() {
return this.memory;
}

View File

@ -779,9 +779,10 @@ public class CLI extends Configured implements Tool {
for (JobStatus job : jobs) {
int numUsedSlots = job.getNumUsedSlots();
int numReservedSlots = job.getNumReservedSlots();
int usedMem = job.getUsedMem();
int rsvdMem = job.getReservedMem();
int neededMem = job.getNeededMem();
long usedMem = job.getUsedMem();
long rsvdMem = job.getReservedMem();
long neededMem = job.getNeededMem();
writer.printf(dataPattern,
job.getJobID().toString(), job.getState(), job.getStartTime(),
job.getUsername(), job.getQueue(),

View File

@ -170,9 +170,9 @@ public class JobClientUnitTest {
when(mockJobStatus.getPriority()).thenReturn(JobPriority.NORMAL);
when(mockJobStatus.getNumUsedSlots()).thenReturn(1);
when(mockJobStatus.getNumReservedSlots()).thenReturn(1);
when(mockJobStatus.getUsedMem()).thenReturn(1024);
when(mockJobStatus.getReservedMem()).thenReturn(512);
when(mockJobStatus.getNeededMem()).thenReturn(2048);
when(mockJobStatus.getUsedMem()).thenReturn(1024L);
when(mockJobStatus.getReservedMem()).thenReturn(512L);
when(mockJobStatus.getNeededMem()).thenReturn(2048L);
when(mockJobStatus.getSchedulingInfo()).thenReturn("NA");
Job mockJob = mock(Job.class);

View File

@ -67,10 +67,10 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
FairScheduler fair = (FairScheduler) scheduler;
final FSAppAttempt app = fair.getSchedulerApp(appAttemptId);
metrics.register("variable.app." + oldAppId + ".demand.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return app.getDemand().getMemory();
public Long getValue() {
return app.getDemand().getMemorySize();
}
}
);
@ -83,10 +83,10 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
}
);
metrics.register("variable.app." + oldAppId + ".usage.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return app.getResourceUsage().getMemory();
public Long getValue() {
return app.getResourceUsage().getMemorySize();
}
}
);
@ -99,26 +99,26 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
}
);
metrics.register("variable.app." + oldAppId + ".minshare.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return app.getMinShare().getMemory();
public Long getValue() {
return app.getMinShare().getMemorySize();
}
}
);
metrics.register("variable.app." + oldAppId + ".minshare.vcores",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return app.getMinShare().getMemory();
public Long getValue() {
return app.getMinShare().getMemorySize();
}
}
);
metrics.register("variable.app." + oldAppId + ".maxshare.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return Math.min(app.getMaxShare().getMemory(), totalMemoryMB);
public Long getValue() {
return Math.min(app.getMaxShare().getMemorySize(), totalMemoryMB);
}
}
);
@ -154,10 +154,10 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
FairScheduler fair = (FairScheduler) scheduler;
final FSQueue queue = fair.getQueueManager().getQueue(queueName);
metrics.register("variable.queue." + queueName + ".demand.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return queue.getDemand().getMemory();
public Long getValue() {
return queue.getDemand().getMemorySize();
}
}
);
@ -170,10 +170,10 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
}
);
metrics.register("variable.queue." + queueName + ".usage.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return queue.getResourceUsage().getMemory();
public Long getValue() {
return queue.getResourceUsage().getMemorySize();
}
}
);
@ -186,10 +186,10 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
}
);
metrics.register("variable.queue." + queueName + ".minshare.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return queue.getMinShare().getMemory();
public Long getValue() {
return queue.getMinShare().getMemorySize();
}
}
);
@ -202,9 +202,9 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
}
);
metrics.register("variable.queue." + queueName + ".maxshare.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if (! maxReset &&
SLSRunner.simulateInfoMap.containsKey("Number of nodes") &&
SLSRunner.simulateInfoMap.containsKey("Node memory (MB)") &&
@ -221,7 +221,7 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
maxReset = false;
}
return Math.min(queue.getMaxShare().getMemory(), totalMemoryMB);
return Math.min(queue.getMaxShare().getMemorySize(), totalMemoryMB);
}
}
);
@ -234,10 +234,10 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
}
);
metrics.register("variable.queue." + queueName + ".fairshare.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return queue.getFairShare().getMemory();
public Long getValue() {
return queue.getFairShare().getMemorySize();
}
}
);

View File

@ -267,7 +267,7 @@ public class ResourceSchedulerWrapper
// should have one container which is AM container
RMContainer rmc = app.getLiveContainers().iterator().next();
updateQueueMetrics(queue,
rmc.getContainer().getResource().getMemory(),
rmc.getContainer().getResource().getMemorySize(),
rmc.getContainer().getResource().getVirtualCores());
}
}
@ -323,7 +323,7 @@ public class ResourceSchedulerWrapper
if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
for (RMContainer rmc : app.getLiveContainers()) {
if (rmc.getContainerId() == containerId) {
releasedMemory += rmc.getContainer().getResource().getMemory();
releasedMemory += rmc.getContainer().getResource().getMemorySize();
releasedVCores += rmc.getContainer()
.getResource().getVirtualCores();
break;
@ -332,7 +332,7 @@ public class ResourceSchedulerWrapper
} else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
if (preemptionContainerMap.containsKey(containerId)) {
Resource preResource = preemptionContainerMap.get(containerId);
releasedMemory += preResource.getMemory();
releasedMemory += preResource.getMemorySize();
releasedVCores += preResource.getVirtualCores();
preemptionContainerMap.remove(containerId);
}
@ -423,9 +423,9 @@ public class ResourceSchedulerWrapper
"counter.queue." + queueName + ".pending.cores",
"counter.queue." + queueName + ".allocated.memory",
"counter.queue." + queueName + ".allocated.cores"};
int values[] = new int[]{pendingResource.getMemory(),
long values[] = new long[]{pendingResource.getMemorySize(),
pendingResource.getVirtualCores(),
allocatedResource.getMemory(), allocatedResource.getVirtualCores()};
allocatedResource.getMemorySize(), allocatedResource.getVirtualCores()};
for (int i = names.length - 1; i >= 0; i --) {
if (! counterMap.containsKey(names[i])) {
metrics.counter(names[i]);
@ -531,11 +531,11 @@ public class ResourceSchedulerWrapper
private void registerClusterResourceMetrics() {
metrics.register("variable.cluster.allocated.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0;
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAllocatedMB();
}
@ -543,11 +543,11 @@ public class ResourceSchedulerWrapper
}
);
metrics.register("variable.cluster.allocated.vcores",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0;
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
}
@ -555,11 +555,11 @@ public class ResourceSchedulerWrapper
}
);
metrics.register("variable.cluster.available.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0;
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAvailableMB();
}
@ -567,11 +567,11 @@ public class ResourceSchedulerWrapper
}
);
metrics.register("variable.cluster.available.vcores",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0;
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAvailableVirtualCores();
}
@ -749,7 +749,7 @@ public class ResourceSchedulerWrapper
}
private void updateQueueMetrics(String queue,
int releasedMemory, int releasedVCores) {
long releasedMemory, int releasedVCores) {
// update queue counters
SortedMap<String, Counter> counterMap = metrics.getCounters();
if (releasedMemory != 0) {

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@ -53,7 +55,7 @@ public abstract class Resource implements Comparable<Resource> {
@Public
@Stable
public static Resource newInstance(int memory, int vCores) {
public static Resource newInstance(long memory, long vCores) {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(memory);
resource.setVirtualCores(vCores);
@ -61,20 +63,31 @@ public abstract class Resource implements Comparable<Resource> {
}
/**
* This method is DEPRECATED:
* Use {@link Resource#getMemorySize()} instead
*
* Get <em>memory</em> of the resource.
* @return <em>memory</em> of the resource
*/
@Public
@Stable
@Deprecated
public abstract int getMemory();
/**
* Get <em>memory</em> of the resource.
* @return <em>memory</em> of the resource
*/
@Private
@Unstable
public abstract long getMemorySize();
/**
* Set <em>memory</em> of the resource.
* @param memory <em>memory</em> of the resource
*/
@Public
@Stable
public abstract void setMemory(int memory);
public abstract void setMemory(long memory);
/**
@ -91,6 +104,10 @@ public abstract class Resource implements Comparable<Resource> {
@Evolving
public abstract int getVirtualCores();
@Public
@Unstable
public abstract long getVirtualCoresSize();
/**
* Set <em>number of virtual cpu cores</em> of the resource.
*
@ -103,13 +120,14 @@ public abstract class Resource implements Comparable<Resource> {
*/
@Public
@Evolving
public abstract void setVirtualCores(int vCores);
public abstract void setVirtualCores(long vCores);
@Override
public int hashCode() {
final int prime = 263167;
int result = 3571;
result = 939769357 + getMemory(); // prime * result = 939769357 initially
int result = (int) (939769357
+ getMemorySize()); // prime * result = 939769357 initially
result = prime * result + getVirtualCores();
return result;
}
@ -123,7 +141,7 @@ public abstract class Resource implements Comparable<Resource> {
if (!(obj instanceof Resource))
return false;
Resource other = (Resource) obj;
if (getMemory() != other.getMemory() ||
if (getMemorySize() != other.getMemorySize() ||
getVirtualCores() != other.getVirtualCores()) {
return false;
}
@ -132,6 +150,6 @@ public abstract class Resource implements Comparable<Resource> {
@Override
public String toString() {
return "<memory:" + getMemory() + ", vCores:" + getVirtualCores() + ">";
return "<memory:" + getMemorySize() + ", vCores:" + getVirtualCores() + ">";
}
}

View File

@ -54,8 +54,8 @@ message ContainerIdProto {
}
message ResourceProto {
optional int32 memory = 1;
optional int32 virtual_cores = 2;
optional int64 memory = 1;
optional int64 virtual_cores = 2;
}
message ResourceUtilizationProto {

View File

@ -224,7 +224,7 @@ public class ApplicationMaster {
@VisibleForTesting
protected int numTotalContainers = 1;
// Memory to request for the container on which the shell command will run
private int containerMemory = 10;
private long containerMemory = 10;
// VirtualCores to request for the container on which the shell command will run
private int containerVirtualCores = 1;
// Priority of the request
@ -631,7 +631,7 @@ public class ApplicationMaster {
appMasterTrackingUrl);
// Dump out information about cluster capability as seen by the
// resource manager
int maxMem = response.getMaximumResourceCapability().getMemory();
long maxMem = response.getMaximumResourceCapability().getMemorySize();
LOG.info("Max mem capability of resources in this cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
@ -861,7 +861,7 @@ public class ApplicationMaster {
+ ":" + allocatedContainer.getNodeId().getPort()
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+ ", containerResourceMemory"
+ allocatedContainer.getResource().getMemory()
+ allocatedContainer.getResource().getMemorySize()
+ ", containerResourceVirtualCores"
+ allocatedContainer.getResource().getVirtualCores());
// + ", containerToken"

View File

@ -127,7 +127,7 @@ public class Client {
// Queue for App master
private String amQueue = "";
// Amt. of memory resource to request for to run the App Master
private int amMemory = 10;
private long amMemory = 10;
// Amt. of virtual core resource to request for to run the App Master
private int amVCores = 1;
@ -520,7 +520,7 @@ public class Client {
// the required resources from the RM for the app master
// Memory ask has to be a multiple of min and less than max.
// Dump out information about cluster capability as seen by the resource manager
int maxMem = appResponse.getMaximumResourceCapability().getMemory();
long maxMem = appResponse.getMaximumResourceCapability().getMemorySize();
LOG.info("Max mem capability of resources in this cluster " + maxMem);
// A resource ask cannot exceed the max.

View File

@ -122,10 +122,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
class ResourceReverseMemoryThenCpuComparator implements Comparator<Resource> {
@Override
public int compare(Resource arg0, Resource arg1) {
int mem0 = arg0.getMemory();
int mem1 = arg1.getMemory();
int cpu0 = arg0.getVirtualCores();
int cpu1 = arg1.getVirtualCores();
long mem0 = arg0.getMemorySize();
long mem1 = arg1.getMemorySize();
long cpu0 = arg0.getVirtualCores();
long cpu1 = arg1.getVirtualCores();
if(mem0 == mem1) {
if(cpu0 == cpu1) {
return 0;
@ -143,10 +143,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
static boolean canFit(Resource arg0, Resource arg1) {
int mem0 = arg0.getMemory();
int mem1 = arg1.getMemory();
int cpu0 = arg0.getVirtualCores();
int cpu1 = arg1.getVirtualCores();
long mem0 = arg0.getMemorySize();
long mem1 = arg1.getMemorySize();
long cpu0 = arg0.getVirtualCores();
long cpu1 = arg1.getVirtualCores();
return (mem0 <= mem1 && cpu0 <= cpu1);
}

View File

@ -275,9 +275,9 @@ public class NodeCLI extends YarnCLI {
nodeReportStr.println(nodeReport.getNumContainers());
nodeReportStr.print("\tMemory-Used : ");
nodeReportStr.println((nodeReport.getUsed() == null) ? "0MB"
: (nodeReport.getUsed().getMemory() + "MB"));
: (nodeReport.getUsed().getMemorySize() + "MB"));
nodeReportStr.print("\tMemory-Capacity : ");
nodeReportStr.println(nodeReport.getCapability().getMemory() + "MB");
nodeReportStr.println(nodeReport.getCapability().getMemorySize() + "MB");
nodeReportStr.print("\tCPU-Used : ");
nodeReportStr.println((nodeReport.getUsed() == null) ? "0 vcores"
: (nodeReport.getUsed().getVirtualCores() + " vcores"));

View File

@ -158,7 +158,7 @@ public class TopCLI extends YarnCLI {
displayStringsMap.put(Columns.VCORES, String.valueOf(usedVirtualCores));
usedMemory =
appReport.getApplicationResourceUsageReport().getUsedResources()
.getMemory() / 1024;
.getMemorySize() / 1024;
displayStringsMap.put(Columns.MEM, String.valueOf(usedMemory) + "G");
reservedVirtualCores =
appReport.getApplicationResourceUsageReport().getReservedResources()
@ -167,7 +167,7 @@ public class TopCLI extends YarnCLI {
String.valueOf(reservedVirtualCores));
reservedMemory =
appReport.getApplicationResourceUsageReport().getReservedResources()
.getMemory() / 1024;
.getMemorySize() / 1024;
displayStringsMap.put(Columns.RMEM, String.valueOf(reservedMemory) + "G");
attempts = appReport.getCurrentApplicationAttemptId().getAttemptId();
nodes = 0;

View File

@ -1209,7 +1209,7 @@ public class TestYarnClient {
for (attempts = 10; attempts > 0; attempts--) {
if (cluster.getResourceManager().getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
.getMemory() > 6000) {
.getMemorySize() > 6000) {
break;
}
try {

View File

@ -54,38 +54,48 @@ public class ResourcePBImpl extends Resource {
viaProto = false;
}
@Override
@SuppressWarnings("deprecation")
public int getMemory() {
ResourceProtoOrBuilder p = viaProto ? proto : builder;
return (p.getMemory());
return (int) getMemorySize();
}
@Override
public void setMemory(int memory) {
public long getMemorySize() {
ResourceProtoOrBuilder p = viaProto ? proto : builder;
return p.getMemory();
}
@Override
public void setMemory(long memory) {
maybeInitBuilder();
builder.setMemory((memory));
builder.setMemory(memory);
}
@Override
public int getVirtualCores() {
ResourceProtoOrBuilder p = viaProto ? proto : builder;
return (p.getVirtualCores());
return (int) getVirtualCoresSize();
}
@Override
public void setVirtualCores(int vCores) {
public long getVirtualCoresSize() {
ResourceProtoOrBuilder p = viaProto ? proto : builder;
return p.getVirtualCores();
}
@Override
public void setVirtualCores(long vCores) {
maybeInitBuilder();
builder.setVirtualCores((vCores));
builder.setVirtualCores(vCores);
}
@Override
public int compareTo(Resource other) {
int diff = this.getMemory() - other.getMemory();
long diff = this.getMemorySize() - other.getMemorySize();
if (diff == 0) {
diff = this.getVirtualCores() - other.getVirtualCores();
}
return diff;
return diff == 0 ? 0 : (diff > 0 ? 1 : -1);
}

View File

@ -28,13 +28,13 @@ public class DefaultResourceCalculator extends ResourceCalculator {
@Override
public int compare(Resource unused, Resource lhs, Resource rhs) {
// Only consider memory
return lhs.getMemory() - rhs.getMemory();
return Long.compare(lhs.getMemorySize(), rhs.getMemorySize());
}
@Override
public int computeAvailableContainers(Resource available, Resource required) {
public long computeAvailableContainers(Resource available, Resource required) {
// Only consider memory
return available.getMemory() / required.getMemory();
return available.getMemorySize() / required.getMemorySize();
}
@Override
@ -44,7 +44,7 @@ public class DefaultResourceCalculator extends ResourceCalculator {
}
public boolean isInvalidDivisor(Resource r) {
if (r.getMemory() == 0.0f) {
if (r.getMemorySize() == 0.0f) {
return true;
}
return false;
@ -52,23 +52,23 @@ public class DefaultResourceCalculator extends ResourceCalculator {
@Override
public float ratio(Resource a, Resource b) {
return (float)a.getMemory() / b.getMemory();
return (float)a.getMemorySize() / b.getMemorySize();
}
@Override
public Resource divideAndCeil(Resource numerator, int denominator) {
public Resource divideAndCeil(Resource numerator, long denominator) {
return Resources.createResource(
divideAndCeil(numerator.getMemory(), denominator));
divideAndCeil(numerator.getMemorySize(), denominator));
}
@Override
public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource, Resource stepFactor) {
int normalizedMemory = Math.min(
long normalizedMemory = Math.min(
roundUp(
Math.max(r.getMemory(), minimumResource.getMemory()),
stepFactor.getMemory()),
maximumResource.getMemory());
Math.max(r.getMemorySize(), minimumResource.getMemorySize()),
stepFactor.getMemorySize()),
maximumResource.getMemorySize());
return Resources.createResource(normalizedMemory);
}
@ -81,22 +81,22 @@ public class DefaultResourceCalculator extends ResourceCalculator {
@Override
public Resource roundUp(Resource r, Resource stepFactor) {
return Resources.createResource(
roundUp(r.getMemory(), stepFactor.getMemory())
roundUp(r.getMemorySize(), stepFactor.getMemorySize())
);
}
@Override
public Resource roundDown(Resource r, Resource stepFactor) {
return Resources.createResource(
roundDown(r.getMemory(), stepFactor.getMemory()));
roundDown(r.getMemorySize(), stepFactor.getMemorySize()));
}
@Override
public Resource multiplyAndNormalizeUp(Resource r, double by,
Resource stepFactor) {
return Resources.createResource(
roundUp((int)(r.getMemory() * by + 0.5), stepFactor.getMemory())
);
roundUp((long) (r.getMemorySize() * by + 0.5),
stepFactor.getMemorySize()));
}
@Override
@ -104,8 +104,8 @@ public class DefaultResourceCalculator extends ResourceCalculator {
Resource stepFactor) {
return Resources.createResource(
roundDown(
(int)(r.getMemory() * by),
stepFactor.getMemory()
(long)(r.getMemorySize() * by),
stepFactor.getMemorySize()
)
);
}
@ -113,6 +113,6 @@ public class DefaultResourceCalculator extends ResourceCalculator {
@Override
public boolean fitsIn(Resource cluster,
Resource smaller, Resource bigger) {
return smaller.getMemory() <= bigger.getMemory();
return smaller.getMemorySize() <= bigger.getMemorySize();
}
}

View File

@ -54,15 +54,15 @@ public class DominantResourceCalculator extends ResourceCalculator {
}
if (isInvalidDivisor(clusterResource)) {
if ((lhs.getMemory() < rhs.getMemory() && lhs.getVirtualCores() > rhs
if ((lhs.getMemorySize() < rhs.getMemorySize() && lhs.getVirtualCores() > rhs
.getVirtualCores())
|| (lhs.getMemory() > rhs.getMemory() && lhs.getVirtualCores() < rhs
|| (lhs.getMemorySize() > rhs.getMemorySize() && lhs.getVirtualCores() < rhs
.getVirtualCores())) {
return 0;
} else if (lhs.getMemory() > rhs.getMemory()
} else if (lhs.getMemorySize() > rhs.getMemorySize()
|| lhs.getVirtualCores() > rhs.getVirtualCores()) {
return 1;
} else if (lhs.getMemory() < rhs.getMemory()
} else if (lhs.getMemorySize() < rhs.getMemorySize()
|| lhs.getVirtualCores() < rhs.getVirtualCores()) {
return -1;
}
@ -100,20 +100,20 @@ public class DominantResourceCalculator extends ResourceCalculator {
// Just use 'dominant' resource
return (dominant) ?
Math.max(
(float)resource.getMemory() / clusterResource.getMemory(),
(float)resource.getMemorySize() / clusterResource.getMemorySize(),
(float)resource.getVirtualCores() / clusterResource.getVirtualCores()
)
:
Math.min(
(float)resource.getMemory() / clusterResource.getMemory(),
(float)resource.getMemorySize() / clusterResource.getMemorySize(),
(float)resource.getVirtualCores() / clusterResource.getVirtualCores()
);
}
@Override
public int computeAvailableContainers(Resource available, Resource required) {
public long computeAvailableContainers(Resource available, Resource required) {
return Math.min(
available.getMemory() / required.getMemory(),
available.getMemorySize() / required.getMemorySize(),
available.getVirtualCores() / required.getVirtualCores());
}
@ -127,7 +127,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override
public boolean isInvalidDivisor(Resource r) {
if (r.getMemory() == 0.0f || r.getVirtualCores() == 0.0f) {
if (r.getMemorySize() == 0.0f || r.getVirtualCores() == 0.0f) {
return true;
}
return false;
@ -136,15 +136,15 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override
public float ratio(Resource a, Resource b) {
return Math.max(
(float)a.getMemory()/b.getMemory(),
(float)a.getMemorySize()/b.getMemorySize(),
(float)a.getVirtualCores()/b.getVirtualCores()
);
}
@Override
public Resource divideAndCeil(Resource numerator, int denominator) {
public Resource divideAndCeil(Resource numerator, long denominator) {
return Resources.createResource(
divideAndCeil(numerator.getMemory(), denominator),
divideAndCeil(numerator.getMemorySize(), denominator),
divideAndCeil(numerator.getVirtualCores(), denominator)
);
}
@ -152,12 +152,12 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override
public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource, Resource stepFactor) {
int normalizedMemory = Math.min(
long normalizedMemory = Math.min(
roundUp(
Math.max(r.getMemory(), minimumResource.getMemory()),
stepFactor.getMemory()),
maximumResource.getMemory());
int normalizedCores = Math.min(
Math.max(r.getMemorySize(), minimumResource.getMemorySize()),
stepFactor.getMemorySize()),
maximumResource.getMemorySize());
long normalizedCores = Math.min(
roundUp(
Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
stepFactor.getVirtualCores()),
@ -169,7 +169,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override
public Resource roundUp(Resource r, Resource stepFactor) {
return Resources.createResource(
roundUp(r.getMemory(), stepFactor.getMemory()),
roundUp(r.getMemorySize(), stepFactor.getMemorySize()),
roundUp(r.getVirtualCores(), stepFactor.getVirtualCores())
);
}
@ -177,7 +177,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override
public Resource roundDown(Resource r, Resource stepFactor) {
return Resources.createResource(
roundDown(r.getMemory(), stepFactor.getMemory()),
roundDown(r.getMemorySize(), stepFactor.getMemorySize()),
roundDown(r.getVirtualCores(), stepFactor.getVirtualCores())
);
}
@ -187,7 +187,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
Resource stepFactor) {
return Resources.createResource(
roundUp(
(int)Math.ceil(r.getMemory() * by), stepFactor.getMemory()),
(int)Math.ceil(r.getMemorySize() * by), stepFactor.getMemorySize()),
roundUp(
(int)Math.ceil(r.getVirtualCores() * by),
stepFactor.getVirtualCores())
@ -199,8 +199,8 @@ public class DominantResourceCalculator extends ResourceCalculator {
Resource stepFactor) {
return Resources.createResource(
roundDown(
(int)(r.getMemory() * by),
stepFactor.getMemory()
(int)(r.getMemorySize() * by),
stepFactor.getMemorySize()
),
roundDown(
(int)(r.getVirtualCores() * by),
@ -212,7 +212,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override
public boolean fitsIn(Resource cluster,
Resource smaller, Resource bigger) {
return smaller.getMemory() <= bigger.getMemory()
return smaller.getMemorySize() <= bigger.getMemorySize()
&& smaller.getVirtualCores() <= bigger.getVirtualCores();
}
}

View File

@ -31,18 +31,18 @@ public abstract class ResourceCalculator {
public abstract int
compare(Resource clusterResource, Resource lhs, Resource rhs);
public static int divideAndCeil(int a, int b) {
public static long divideAndCeil(long a, long b) {
if (b == 0) {
return 0;
}
return (a + (b - 1)) / b;
}
public static int roundUp(int a, int b) {
public static long roundUp(long a, long b) {
return divideAndCeil(a, b) * b;
}
public static int roundDown(int a, int b) {
public static long roundDown(long a, long b) {
return (a / b) * b;
}
@ -54,7 +54,7 @@ public abstract class ResourceCalculator {
* @param required required resources
* @return number of containers which can be allocated
*/
public abstract int computeAvailableContainers(
public abstract long computeAvailableContainers(
Resource available, Resource required);
/**
@ -169,7 +169,7 @@ public abstract class ResourceCalculator {
* @param denominator denominator
* @return resultant resource
*/
public abstract Resource divideAndCeil(Resource numerator, int denominator);
public abstract Resource divideAndCeil(Resource numerator, long denominator);
/**
* Check if a smaller resource can be contained by bigger resource.

View File

@ -31,12 +31,18 @@ public class Resources {
private static final Resource NONE = new Resource() {
@Override
@SuppressWarnings("deprecation")
public int getMemory() {
return 0;
}
@Override
public void setMemory(int memory) {
public long getMemorySize() {
return 0;
}
@Override
public void setMemory(long memory) {
throw new RuntimeException("NONE cannot be modified!");
}
@ -46,17 +52,22 @@ public class Resources {
}
@Override
public void setVirtualCores(int cores) {
public long getVirtualCoresSize() {
return 0;
}
@Override
public void setVirtualCores(long cores) {
throw new RuntimeException("NONE cannot be modified!");
}
@Override
public int compareTo(Resource o) {
int diff = 0 - o.getMemory();
long diff = 0 - o.getMemorySize();
if (diff == 0) {
diff = 0 - o.getVirtualCores();
}
return diff;
return Long.signum(diff);
}
};
@ -64,12 +75,18 @@ public class Resources {
private static final Resource UNBOUNDED = new Resource() {
@Override
@SuppressWarnings("deprecation")
public int getMemory() {
return Integer.MAX_VALUE;
}
@Override
public void setMemory(int memory) {
public long getMemorySize() {
return Long.MAX_VALUE;
}
@Override
public void setMemory(long memory) {
throw new RuntimeException("UNBOUNDED cannot be modified!");
}
@ -79,26 +96,31 @@ public class Resources {
}
@Override
public void setVirtualCores(int cores) {
public long getVirtualCoresSize() {
return Long.MAX_VALUE;
}
@Override
public void setVirtualCores(long cores) {
throw new RuntimeException("UNBOUNDED cannot be modified!");
}
@Override
public int compareTo(Resource o) {
int diff = Integer.MAX_VALUE - o.getMemory();
long diff = Long.MAX_VALUE - o.getMemorySize();
if (diff == 0) {
diff = Integer.MAX_VALUE - o.getVirtualCores();
diff = Long.MAX_VALUE - o.getVirtualCoresSize();
}
return diff;
return Long.signum(diff);
}
};
public static Resource createResource(int memory) {
public static Resource createResource(long memory) {
return createResource(memory, (memory > 0) ? 1 : 0);
}
public static Resource createResource(int memory, int cores) {
public static Resource createResource(long memory, long cores) {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(memory);
resource.setVirtualCores(cores);
@ -114,11 +136,11 @@ public class Resources {
}
public static Resource clone(Resource res) {
return createResource(res.getMemory(), res.getVirtualCores());
return createResource(res.getMemorySize(), res.getVirtualCores());
}
public static Resource addTo(Resource lhs, Resource rhs) {
lhs.setMemory(lhs.getMemory() + rhs.getMemory());
lhs.setMemory(lhs.getMemorySize() + rhs.getMemorySize());
lhs.setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores());
return lhs;
}
@ -128,7 +150,7 @@ public class Resources {
}
public static Resource subtractFrom(Resource lhs, Resource rhs) {
lhs.setMemory(lhs.getMemory() - rhs.getMemory());
lhs.setMemory(lhs.getMemorySize() - rhs.getMemorySize());
lhs.setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores());
return lhs;
}
@ -142,7 +164,7 @@ public class Resources {
}
public static Resource multiplyTo(Resource lhs, double by) {
lhs.setMemory((int)(lhs.getMemory() * by));
lhs.setMemory((int)(lhs.getMemorySize() * by));
lhs.setVirtualCores((int)(lhs.getVirtualCores() * by));
return lhs;
}
@ -157,7 +179,7 @@ public class Resources {
*/
public static Resource multiplyAndAddTo(
Resource lhs, Resource rhs, double by) {
lhs.setMemory(lhs.getMemory() + (int)(rhs.getMemory() * by));
lhs.setMemory(lhs.getMemorySize() + (int)(rhs.getMemorySize() * by));
lhs.setVirtualCores(lhs.getVirtualCores()
+ (int)(rhs.getVirtualCores() * by));
return lhs;
@ -175,7 +197,7 @@ public class Resources {
public static Resource multiplyAndRoundDown(Resource lhs, double by) {
Resource out = clone(lhs);
out.setMemory((int)(lhs.getMemory() * by));
out.setMemory((int)(lhs.getMemorySize() * by));
out.setVirtualCores((int)(lhs.getVirtualCores() * by));
return out;
}
@ -264,7 +286,7 @@ public class Resources {
}
public static boolean fitsIn(Resource smaller, Resource bigger) {
return smaller.getMemory() <= bigger.getMemory() &&
return smaller.getMemorySize() <= bigger.getMemorySize() &&
smaller.getVirtualCores() <= bigger.getVirtualCores();
}
@ -274,12 +296,12 @@ public class Resources {
}
public static Resource componentwiseMin(Resource lhs, Resource rhs) {
return createResource(Math.min(lhs.getMemory(), rhs.getMemory()),
return createResource(Math.min(lhs.getMemorySize(), rhs.getMemorySize()),
Math.min(lhs.getVirtualCores(), rhs.getVirtualCores()));
}
public static Resource componentwiseMax(Resource lhs, Resource rhs) {
return createResource(Math.max(lhs.getMemory(), rhs.getMemory()),
return createResource(Math.max(lhs.getMemorySize(), rhs.getMemorySize()),
Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()));
}
}

View File

@ -24,18 +24,18 @@ import static org.junit.Assert.assertTrue;
public class TestResources {
public Resource createResource(int memory, int vCores) {
public Resource createResource(long memory, long vCores) {
return Resource.newInstance(memory, vCores);
}
@Test(timeout=1000)
public void testCompareToWithUnboundedResource() {
assertTrue(Resources.unbounded().compareTo(
createResource(Integer.MAX_VALUE, Integer.MAX_VALUE)) == 0);
createResource(Long.MAX_VALUE, Long.MAX_VALUE)) == 0);
assertTrue(Resources.unbounded().compareTo(
createResource(Integer.MAX_VALUE, 0)) > 0);
createResource(Long.MAX_VALUE, 0)) > 0);
assertTrue(Resources.unbounded().compareTo(
createResource(0, Integer.MAX_VALUE)) > 0);
createResource(0, Long.MAX_VALUE)) > 0);
}
@Test(timeout=1000)

View File

@ -442,7 +442,7 @@ public class BuilderUtils {
return report;
}
public static Resource newResource(int memory, int vCores) {
public static Resource newResource(long memory, long vCores) {
Resource resource = recordFactory.newRecordInstance(Resource.class);
resource.setMemory(memory);
resource.setVirtualCores(vCores);

View File

@ -59,8 +59,8 @@ public class AppInfo {
protected long elapsedTime;
protected String applicationTags;
protected int priority;
private int allocatedCpuVcores;
private int allocatedMemoryMB;
private long allocatedCpuVcores;
private long allocatedMemoryMB;
protected boolean unmanagedApplication;
private String appNodeLabelExpression;
private String amNodeLabelExpression;
@ -100,7 +100,7 @@ public class AppInfo {
allocatedCpuVcores = app.getApplicationResourceUsageReport()
.getUsedResources().getVirtualCores();
allocatedMemoryMB = app.getApplicationResourceUsageReport()
.getUsedResources().getMemory();
.getUsedResources().getMemorySize();
}
}
progress = app.getProgress() * 100; // in percent
@ -152,11 +152,11 @@ public class AppInfo {
return runningContainers;
}
public int getAllocatedCpuVcores() {
public long getAllocatedCpuVcores() {
return allocatedCpuVcores;
}
public int getAllocatedMemoryMB() {
public long getAllocatedMemoryMB() {
return allocatedMemoryMB;
}

View File

@ -36,8 +36,8 @@ import org.apache.hadoop.yarn.util.Times;
public class ContainerInfo {
protected String containerId;
protected int allocatedMB;
protected int allocatedVCores;
protected long allocatedMB;
protected long allocatedVCores;
protected String assignedNodeId;
protected int priority;
protected long startedTime;
@ -57,7 +57,7 @@ public class ContainerInfo {
public ContainerInfo(ContainerReport container) {
containerId = container.getContainerId().toString();
if (container.getAllocatedResource() != null) {
allocatedMB = container.getAllocatedResource().getMemory();
allocatedMB = container.getAllocatedResource().getMemorySize();
allocatedVCores = container.getAllocatedResource().getVirtualCores();
}
if (container.getAssignedNode() != null) {
@ -79,11 +79,11 @@ public class ContainerInfo {
return containerId;
}
public int getAllocatedMB() {
public long getAllocatedMB() {
return allocatedMB;
}
public int getAllocatedVCores() {
public long getAllocatedVCores() {
return allocatedVCores;
}

View File

@ -180,7 +180,7 @@ public class TestYarnServerApiClasses {
assertEquals(1, copy.getContainersToDecrease().get(0)
.getId().getContainerId());
assertEquals(1024, copy.getContainersToDecrease().get(1)
.getResource().getMemory());
.getResource().getMemorySize());
}
/**
@ -201,7 +201,7 @@ public class TestYarnServerApiClasses {
assertEquals(8080, copy.getHttpPort());
assertEquals(9090, copy.getNodeId().getPort());
assertEquals(10000, copy.getResource().getMemory());
assertEquals(10000, copy.getResource().getMemorySize());
assertEquals(2, copy.getResource().getVirtualCores());
}
@ -273,7 +273,7 @@ public class TestYarnServerApiClasses {
assertEquals(1, copy.getIncreasedContainers().get(0)
.getId().getContainerId());
assertEquals(4096, copy.getIncreasedContainers().get(1)
.getResource().getMemory());
.getResource().getMemorySize());
}
@Test

View File

@ -405,7 +405,7 @@ public abstract class ContainerExecutor implements Configurable {
.getBoolean(
YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED,
YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) {
memory = resource.getMemory();
memory = (int) resource.getMemorySize();
}
if (conf.getBoolean(

View File

@ -215,7 +215,7 @@ public class ContainerImpl implements Container {
if (recoveredCapability != null
&& !this.resource.equals(recoveredCapability)) {
// resource capability had been updated before NM was down
this.resource = Resource.newInstance(recoveredCapability.getMemory(),
this.resource = Resource.newInstance(recoveredCapability.getMemorySize(),
recoveredCapability.getVirtualCores());
}
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
@ -611,7 +611,7 @@ public class ContainerImpl implements Container {
long launchDuration = clock.getTime() - containerLaunchStartTime;
metrics.addContainerLaunchDuration(launchDuration);
long pmemBytes = getResource().getMemory() * 1024 * 1024L;
long pmemBytes = getResource().getMemorySize() * 1024 * 1024L;
float pmemRatio = daemonConf.getFloat(
YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);

View File

@ -115,8 +115,8 @@ public class CGroupsMemoryResourceHandlerImpl implements MemoryResourceHandler {
String cgroupId = container.getContainerId().toString();
//memory is in MB
long containerSoftLimit =
(long) (container.getResource().getMemory() * this.softLimit);
long containerHardLimit = container.getResource().getMemory();
(long) (container.getResource().getMemorySize() * this.softLimit);
long containerHardLimit = container.getResource().getMemorySize();
cGroupsHandler.createCGroup(MEMORY, cgroupId);
try {
cGroupsHandler.updateCGroupParam(MEMORY, cgroupId,

View File

@ -646,7 +646,7 @@ public class ContainersMonitorImpl extends AbstractService implements
ChangeMonitoringContainerResourceEvent changeEvent =
(ChangeMonitoringContainerResourceEvent) monitoringEvent;
Resource resource = changeEvent.getResource();
pmemLimitMBs = resource.getMemory();
pmemLimitMBs = (int) resource.getMemorySize();
vmemLimitMBs = (int) (pmemLimitMBs * vmemRatio);
cpuVcores = resource.getVirtualCores();
usageMetrics.recordResourceLimit(
@ -822,7 +822,7 @@ public class ContainersMonitorImpl extends AbstractService implements
}
LOG.info("Changing resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
long pmemLimit = changeEvent.getResource().getMemorySize() * 1024L * 1024L;
long vmemLimit = (long) (pmemLimit * vmemRatio);
int cpuVcores = changeEvent.getResource().getVirtualCores();
processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);

View File

@ -600,7 +600,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
private ProcessTreeInfo createProcessTreeInfo(ContainerId containerId,
Resource resource, Configuration conf) {
long pmemBytes = resource.getMemory() * 1024 * 1024L;
long pmemBytes = resource.getMemorySize() * 1024 * 1024L;
float pmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
long vmemBytes = (long) (pmemRatio * pmemBytes);

View File

@ -115,9 +115,9 @@ public class NodeManagerMetrics {
public void allocateContainer(Resource res) {
allocatedContainers.incr();
allocatedMB = allocatedMB + res.getMemory();
allocatedMB = allocatedMB + res.getMemorySize();
allocatedGB.set((int)Math.ceil(allocatedMB/1024d));
availableMB = availableMB - res.getMemory();
availableMB = availableMB - res.getMemorySize();
availableGB.set((int)Math.floor(availableMB/1024d));
allocatedVCores.incr(res.getVirtualCores());
availableVCores.decr(res.getVirtualCores());
@ -125,16 +125,16 @@ public class NodeManagerMetrics {
public void releaseContainer(Resource res) {
allocatedContainers.decr();
allocatedMB = allocatedMB - res.getMemory();
allocatedMB = allocatedMB - res.getMemorySize();
allocatedGB.set((int)Math.ceil(allocatedMB/1024d));
availableMB = availableMB + res.getMemory();
availableMB = availableMB + res.getMemorySize();
availableGB.set((int)Math.floor(availableMB/1024d));
allocatedVCores.decr(res.getVirtualCores());
availableVCores.incr(res.getVirtualCores());
}
public void changeContainer(Resource before, Resource now) {
int deltaMB = now.getMemory() - before.getMemory();
long deltaMB = now.getMemorySize() - before.getMemorySize();
int deltaVCores = now.getVirtualCores() - before.getVirtualCores();
allocatedMB = allocatedMB + deltaMB;
allocatedGB.set((int)Math.ceil(allocatedMB/1024d));
@ -145,7 +145,7 @@ public class NodeManagerMetrics {
}
public void addResource(Resource res) {
availableMB = availableMB + res.getMemory();
availableMB = availableMB + res.getMemorySize();
availableGB.incr((int)Math.floor(availableMB/1024d));
availableVCores.incr(res.getVirtualCores());
}

View File

@ -81,7 +81,7 @@ public class ContainerInfo {
this.user = container.getUser();
Resource res = container.getResource();
if (res != null) {
this.totalMemoryNeededMB = res.getMemory();
this.totalMemoryNeededMB = res.getMemorySize();
this.totalVCoresNeeded = res.getVirtualCores();
}
this.containerLogsShortLink = ujoin("containerlogs", this.id,

View File

@ -190,7 +190,7 @@ public class TestNodeStatusUpdater {
InetSocketAddress expected = NetUtils.getConnectAddress(
conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
Assert.assertEquals(NetUtils.getHostPortString(expected), nodeId.toString());
Assert.assertEquals(5 * 1024, resource.getMemory());
Assert.assertEquals(5 * 1024, resource.getMemorySize());
registeredNodes.add(nodeId);
RegisterNodeManagerResponse response = recordFactory
@ -918,7 +918,7 @@ public class TestNodeStatusUpdater {
conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
Assert.assertEquals(NetUtils.getHostPortString(expected),
nodeId.toString());
Assert.assertEquals(5 * 1024, resource.getMemory());
Assert.assertEquals(5 * 1024, resource.getMemorySize());
registeredNodes.add(nodeId);
RegisterNodeManagerResponse response = recordFactory

View File

@ -228,7 +228,7 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
commands.add("/bin/bash");
commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands);
Resource r = BuilderUtils.newResource(8 * 1024 * 1024, 1);
Resource r = BuilderUtils.newResource(0, 0);
ContainerTokenIdentifier containerIdentifier =
new ContainerTokenIdentifier(cId, context.getNodeId().toString(), user,
r, System.currentTimeMillis() + 120000, 123, DUMMY_RM_IDENTIFIER,

View File

@ -94,9 +94,9 @@ public class RMNMInfo implements RMNMInfoBeans {
ni.getNodeManagerVersion());
if(report != null) {
info.put("NumContainers", report.getNumContainers());
info.put("UsedMemoryMB", report.getUsedResource().getMemory());
info.put("UsedMemoryMB", report.getUsedResource().getMemorySize());
info.put("AvailableMemoryMB",
report.getAvailableResource().getMemory());
report.getAvailableResource().getMemorySize());
}
nodesInfo.add(info);

View File

@ -142,7 +142,7 @@ public class RMServerUtils {
// example, you cannot request target resource of a <10G, 10> container to
// <20G, 8>
if (increase) {
if (originalResource.getMemory() > targetResource.getMemory()
if (originalResource.getMemorySize() > targetResource.getMemorySize()
|| originalResource.getVirtualCores() > targetResource
.getVirtualCores()) {
String msg =
@ -153,7 +153,7 @@ public class RMServerUtils {
throw new InvalidResourceRequestException(msg);
}
} else {
if (originalResource.getMemory() < targetResource.getMemory()
if (originalResource.getMemorySize() < targetResource.getMemorySize()
|| originalResource.getVirtualCores() < targetResource
.getVirtualCores()) {
String msg =
@ -243,15 +243,15 @@ public class RMServerUtils {
return;
}
for (ContainerResourceChangeRequest request : requests) {
if (request.getCapability().getMemory() < 0
|| request.getCapability().getMemory() > maximumAllocation
.getMemory()) {
if (request.getCapability().getMemorySize() < 0
|| request.getCapability().getMemorySize() > maximumAllocation
.getMemorySize()) {
throw new InvalidResourceRequestException("Invalid "
+ (increase ? "increase" : "decrease") + " request"
+ ", requested memory < 0"
+ ", or requested memory > max configured" + ", requestedMemory="
+ request.getCapability().getMemory() + ", maxMemory="
+ maximumAllocation.getMemory());
+ request.getCapability().getMemorySize() + ", maxMemory="
+ maximumAllocation.getMemorySize());
}
if (request.getCapability().getVirtualCores() < 0
|| request.getCapability().getVirtualCores() > maximumAllocation

View File

@ -362,7 +362,7 @@ public class ResourceTrackerService extends AbstractService implements
}
// Check if this node has minimum allocations
if (capability.getMemory() < minAllocMb
if (capability.getMemorySize() < minAllocMb
|| capability.getVirtualCores() < minAllocVcores) {
String message =
"NodeManager from " + host

View File

@ -480,7 +480,7 @@ public class SystemMetricsPublisher extends CompositeService {
TimelineEntity entity = createContainerEntity(event.getContainerId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
event.getAllocatedResource().getMemory());
event.getAllocatedResource().getMemorySize());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
event.getAllocatedResource().getVirtualCores());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,

View File

@ -258,17 +258,17 @@ public class TempQueuePerPartition {
void appendLogString(StringBuilder sb) {
sb.append(queueName).append(", ")
.append(current.getMemory()).append(", ")
.append(current.getMemorySize()).append(", ")
.append(current.getVirtualCores()).append(", ")
.append(pending.getMemory()).append(", ")
.append(pending.getMemorySize()).append(", ")
.append(pending.getVirtualCores()).append(", ")
.append(getGuaranteed().getMemory()).append(", ")
.append(getGuaranteed().getMemorySize()).append(", ")
.append(getGuaranteed().getVirtualCores()).append(", ")
.append(idealAssigned.getMemory()).append(", ")
.append(idealAssigned.getMemorySize()).append(", ")
.append(idealAssigned.getVirtualCores()).append(", ")
.append(toBePreempted.getMemory()).append(", ")
.append(toBePreempted.getMemorySize()).append(", ")
.append(toBePreempted.getVirtualCores() ).append(", ")
.append(actuallyToBePreempted.getMemory()).append(", ")
.append(actuallyToBePreempted.getMemorySize()).append(", ")
.append(actuallyToBePreempted.getVirtualCores());
}

View File

@ -160,7 +160,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
Resource capToAssign = res.getResourcesAtTime(now);
float targetCapacity = 0f;
if (planResources.getMemory() > 0
if (planResources.getMemorySize() > 0
&& planResources.getVirtualCores() > 0) {
if (shouldResize) {
capToAssign =

View File

@ -246,7 +246,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
long vcores;
public IntegralResource(Resource resource) {
this.memory = resource.getMemory();
this.memory = resource.getMemorySize();
this.vcores = resource.getVirtualCores();
}
@ -256,12 +256,12 @@ public class CapacityOverTimePolicy implements SharingPolicy {
}
public void add(Resource r) {
memory += r.getMemory();
memory += r.getMemorySize();
vcores += r.getVirtualCores();
}
public void subtract(Resource r) {
memory -= r.getMemory();
memory -= r.getMemorySize();
vcores -= r.getVirtualCores();
}

View File

@ -106,7 +106,7 @@ public final class ReservationSystemUtil {
public static ResourceProto convertToProtoFormat(Resource e) {
return YarnProtos.ResourceProto.newBuilder()
.setMemory(e.getMemory())
.setMemory(e.getMemorySize())
.setVirtualCores(e.getVirtualCores())
.build();
}

View File

@ -88,7 +88,7 @@ public class StageEarliestStartByDemand implements StageEarliestStart {
// Weight = total memory consumption of stage
protected double calcWeight(ReservationRequest stage) {
return (stage.getDuration() * stage.getCapability().getMemory())
return (stage.getDuration() * stage.getCapability().getMemorySize())
* (stage.getNumContainers());
}

View File

@ -712,7 +712,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
}
long usedMillis = container.finishTime - container.creationTime;
long memorySeconds = resource.getMemory()
long memorySeconds = resource.getMemorySize()
* usedMillis / DateUtils.MILLIS_PER_SECOND;
long vcoreSeconds = resource.getVirtualCores()
* usedMillis / DateUtils.MILLIS_PER_SECOND;

View File

@ -56,8 +56,8 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
private Resource staleClusterCapacity = null;
// Max allocation
private int maxNodeMemory = -1;
private int maxNodeVCores = -1;
private long maxNodeMemory = -1;
private long maxNodeVCores = -1;
private Resource configuredMaxAllocation;
private boolean forceConfiguredMaxAllocation = true;
private long configuredMaxAllocationWaitTime;
@ -211,7 +211,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
}
return Resources.createResource(
Math.min(configuredMaxAllocation.getMemory(), maxNodeMemory),
Math.min(configuredMaxAllocation.getMemorySize(), maxNodeMemory),
Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores)
);
} finally {
@ -224,7 +224,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
writeLock.lock();
try {
if (add) { // added node
int nodeMemory = totalResource.getMemory();
long nodeMemory = totalResource.getMemorySize();
if (nodeMemory > maxNodeMemory) {
maxNodeMemory = nodeMemory;
}
@ -233,7 +233,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
maxNodeVCores = nodeVCores;
}
} else { // removed node
if (maxNodeMemory == totalResource.getMemory()) {
if (maxNodeMemory == totalResource.getMemorySize()) {
maxNodeMemory = -1;
}
if (maxNodeVCores == totalResource.getVirtualCores()) {

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -59,8 +60,8 @@ public class QueueMetrics implements MetricsSource {
@Metric("# of apps killed") MutableCounterInt appsKilled;
@Metric("# of apps failed") MutableCounterInt appsFailed;
@Metric("Allocated memory in MB") MutableGaugeInt allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@Metric("Allocated memory in MB") MutableGaugeLong allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeLong allocatedVCores;
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
@Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
@Metric("Aggregate # of allocated node-local containers")
@ -70,13 +71,13 @@ public class QueueMetrics implements MetricsSource {
@Metric("Aggregate # of allocated off-switch containers")
MutableCounterLong aggregateOffSwitchContainersAllocated;
@Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
@Metric("Available memory in MB") MutableGaugeInt availableMB;
@Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
@Metric("Pending memory allocation in MB") MutableGaugeInt pendingMB;
@Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores;
@Metric("Available memory in MB") MutableGaugeLong availableMB;
@Metric("Available CPU in virtual cores") MutableGaugeLong availableVCores;
@Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB;
@Metric("Pending CPU allocation in virtual cores") MutableGaugeLong pendingVCores;
@Metric("# of pending containers") MutableGaugeInt pendingContainers;
@Metric("# of reserved memory in MB") MutableGaugeInt reservedMB;
@Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
@Metric("# of reserved memory in MB") MutableGaugeLong reservedMB;
@Metric("Reserved CPU in virtual cores") MutableGaugeLong reservedVCores;
@Metric("# of reserved containers") MutableGaugeInt reservedContainers;
@Metric("# of active users") MutableGaugeInt activeUsers;
@Metric("# of active applications") MutableGaugeInt activeApplications;
@ -325,7 +326,7 @@ public class QueueMetrics implements MetricsSource {
* @param limit resource limit
*/
public void setAvailableResourcesToQueue(Resource limit) {
availableMB.set(limit.getMemory());
availableMB.set(limit.getMemorySize());
availableVCores.set(limit.getVirtualCores());
}
@ -362,8 +363,8 @@ public class QueueMetrics implements MetricsSource {
private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
pendingMB.incr(res.getMemory() * containers);
pendingVCores.incr(res.getVirtualCores() * containers);
pendingMB.incr(res.getMemorySize() * containers);
pendingVCores.incr(res.getVirtualCoresSize() * containers);
}
public void decrPendingResources(String user, int containers, Resource res) {
@ -379,8 +380,8 @@ public class QueueMetrics implements MetricsSource {
private void _decrPendingResources(int containers, Resource res) {
pendingContainers.decr(containers);
pendingMB.decr(res.getMemory() * containers);
pendingVCores.decr(res.getVirtualCores() * containers);
pendingMB.decr(res.getMemorySize() * containers);
pendingVCores.decr(res.getVirtualCoresSize() * containers);
}
public void incrNodeTypeAggregations(String user, NodeType type) {
@ -407,8 +408,8 @@ public class QueueMetrics implements MetricsSource {
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemory() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
allocatedMB.incr(res.getMemorySize() * containers);
allocatedVCores.incr(res.getVirtualCoresSize() * containers);
if (decrPending) {
_decrPendingResources(containers, res);
}
@ -428,10 +429,10 @@ public class QueueMetrics implements MetricsSource {
* @param res
*/
public void allocateResources(String user, Resource res) {
allocatedMB.incr(res.getMemory());
allocatedMB.incr(res.getMemorySize());
allocatedVCores.incr(res.getVirtualCores());
pendingMB.decr(res.getMemory());
pendingMB.decr(res.getMemorySize());
pendingVCores.decr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user);
@ -446,8 +447,8 @@ public class QueueMetrics implements MetricsSource {
public void releaseResources(String user, int containers, Resource res) {
allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemory() * containers);
allocatedVCores.decr(res.getVirtualCores() * containers);
allocatedMB.decr(res.getMemorySize() * containers);
allocatedVCores.decr(res.getVirtualCoresSize() * containers);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(user, containers, res);
@ -464,7 +465,7 @@ public class QueueMetrics implements MetricsSource {
* @param res
*/
public void releaseResources(String user, Resource res) {
allocatedMB.decr(res.getMemory());
allocatedMB.decr(res.getMemorySize());
allocatedVCores.decr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@ -477,7 +478,7 @@ public class QueueMetrics implements MetricsSource {
public void reserveResource(String user, Resource res) {
reservedContainers.incr();
reservedMB.incr(res.getMemory());
reservedMB.incr(res.getMemorySize());
reservedVCores.incr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@ -490,7 +491,7 @@ public class QueueMetrics implements MetricsSource {
public void unreserveResource(String user, Resource res) {
reservedContainers.decr();
reservedMB.decr(res.getMemory());
reservedMB.decr(res.getMemorySize());
reservedVCores.decr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@ -563,11 +564,11 @@ public class QueueMetrics implements MetricsSource {
return BuilderUtils.newResource(allocatedMB.value(), allocatedVCores.value());
}
public int getAllocatedMB() {
public long getAllocatedMB() {
return allocatedMB.value();
}
public int getAllocatedVirtualCores() {
public long getAllocatedVirtualCores() {
return allocatedVCores.value();
}
@ -575,19 +576,19 @@ public class QueueMetrics implements MetricsSource {
return allocatedContainers.value();
}
public int getAvailableMB() {
public long getAvailableMB() {
return availableMB.value();
}
public int getAvailableVirtualCores() {
public long getAvailableVirtualCores() {
return availableVCores.value();
}
public int getPendingMB() {
public long getPendingMB() {
return pendingMB.value();
}
public int getPendingVirtualCores() {
public long getPendingVirtualCores() {
return pendingVCores.value();
}
@ -595,11 +596,11 @@ public class QueueMetrics implements MetricsSource {
return pendingContainers.value();
}
public int getReservedMB() {
public long getReservedMB() {
return reservedMB.value();
}
public int getReservedVirtualCores() {
public long getReservedVirtualCores() {
return reservedVCores.value();
}

View File

@ -444,7 +444,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
*/
public synchronized Resource getHeadroom() {
// Corner case to deal with applications being slightly over-limit
if (resourceLimit.getMemory() < 0) {
if (resourceLimit.getMemorySize() < 0) {
resourceLimit.setMemory(0);
}
@ -480,7 +480,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
if (requests != null) {
LOG.debug("showRequests:" + " application=" + getApplicationId()
+ " headRoom=" + getHeadroom() + " currentConsumption="
+ attemptResourceUsage.getUsed().getMemory());
+ attemptResourceUsage.getUsed().getMemorySize());
for (ResourceRequest request : requests.values()) {
LOG.debug("showRequests:" + " application=" + getApplicationId()
+ " request=" + request);
@ -682,7 +682,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
for (RMContainer rmContainer : this.liveContainers.values()) {
long usedMillis = currentTimeMillis - rmContainer.getCreationTime();
Resource resource = rmContainer.getContainer().getResource();
memorySeconds += resource.getMemory() * usedMillis /
memorySeconds += resource.getMemorySize() * usedMillis /
DateUtils.MILLIS_PER_SECOND;
vcoreSeconds += resource.getVirtualCores() * usedMillis
/ DateUtils.MILLIS_PER_SECOND;

View File

@ -274,13 +274,13 @@ public class SchedulerUtils {
private static void validateResourceRequest(ResourceRequest resReq,
Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
throws InvalidResourceRequestException {
if (resReq.getCapability().getMemory() < 0 ||
resReq.getCapability().getMemory() > maximumResource.getMemory()) {
if (resReq.getCapability().getMemorySize() < 0 ||
resReq.getCapability().getMemorySize() > maximumResource.getMemorySize()) {
throw new InvalidResourceRequestException("Invalid resource request"
+ ", requested memory < 0"
+ ", or requested memory > max configured"
+ ", requestedMemory=" + resReq.getCapability().getMemory()
+ ", maxMemory=" + maximumResource.getMemory());
+ ", requestedMemory=" + resReq.getCapability().getMemorySize()
+ ", maxMemory=" + maximumResource.getMemorySize());
}
if (resReq.getCapability().getVirtualCores() < 0 ||
resReq.getCapability().getVirtualCores() >

View File

@ -23,7 +23,7 @@ import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -32,37 +32,37 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
public class CSQueueMetrics extends QueueMetrics {
@Metric("AM memory limit in MB")
MutableGaugeInt AMResourceLimitMB;
MutableGaugeLong AMResourceLimitMB;
@Metric("AM CPU limit in virtual cores")
MutableGaugeInt AMResourceLimitVCores;
MutableGaugeLong AMResourceLimitVCores;
@Metric("Used AM memory limit in MB")
MutableGaugeInt usedAMResourceMB;
MutableGaugeLong usedAMResourceMB;
@Metric("Used AM CPU limit in virtual cores")
MutableGaugeInt usedAMResourceVCores;
MutableGaugeLong usedAMResourceVCores;
CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
super(ms, queueName, parent, enableUserMetrics, conf);
}
public int getAMResourceLimitMB() {
public long getAMResourceLimitMB() {
return AMResourceLimitMB.value();
}
public int getAMResourceLimitVCores() {
public long getAMResourceLimitVCores() {
return AMResourceLimitVCores.value();
}
public int getUsedAMResourceMB() {
public long getUsedAMResourceMB() {
return usedAMResourceMB.value();
}
public int getUsedAMResourceVCores() {
public long getUsedAMResourceVCores() {
return usedAMResourceVCores.value();
}
public void setAMResouceLimit(Resource res) {
AMResourceLimitMB.set(res.getMemory());
AMResourceLimitMB.set(res.getMemorySize());
AMResourceLimitVCores.set(res.getVirtualCores());
}
@ -74,7 +74,7 @@ public class CSQueueMetrics extends QueueMetrics {
}
public void incAMUsed(String user, Resource res) {
usedAMResourceMB.incr(res.getMemory());
usedAMResourceMB.incr(res.getMemorySize());
usedAMResourceVCores.incr(res.getVirtualCores());
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {
@ -83,7 +83,7 @@ public class CSQueueMetrics extends QueueMetrics {
}
public void decAMUsed(String user, Resource res) {
usedAMResourceMB.decr(res.getMemory());
usedAMResourceMB.decr(res.getMemorySize());
usedAMResourceVCores.decr(res.getVirtualCores());
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {

View File

@ -65,7 +65,7 @@ public class CapacityHeadroomProvider {
}
}
// Corner case to deal with applications being slightly over-limit
if (headroom.getMemory() < 0) {
if (headroom.getMemorySize() < 0) {
headroom.setMemory(0);
}
return headroom;

View File

@ -677,7 +677,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
*/
public Resource getMaximumAllocationPerQueue(String queue) {
String queuePrefix = getQueuePrefix(queue);
int maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB,
long maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB,
(int)UNDEFINED);
int maxAllocationVcoresPerQueue = getInt(
queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED);
@ -690,7 +690,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
Resource clusterMax = getMaximumAllocation();
if (maxAllocationMbPerQueue == (int)UNDEFINED) {
LOG.info("max alloc mb per queue for " + queue + " is undefined");
maxAllocationMbPerQueue = clusterMax.getMemory();
maxAllocationMbPerQueue = clusterMax.getMemorySize();
}
if (maxAllocationVcoresPerQueue == (int)UNDEFINED) {
LOG.info("max alloc vcore per queue for " + queue + " is undefined");
@ -698,7 +698,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
}
Resource result = Resources.createResource(maxAllocationMbPerQueue,
maxAllocationVcoresPerQueue);
if (maxAllocationMbPerQueue > clusterMax.getMemory()
if (maxAllocationMbPerQueue > clusterMax.getMemorySize()
|| maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) {
throw new IllegalArgumentException(
"Queue maximum allocation cannot be larger than the cluster setting"

View File

@ -449,7 +449,7 @@ public class LeafQueue extends AbstractCSQueue {
// since we have already told running AM's the size
Resource oldMax = getMaximumAllocation();
Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
if (newMax.getMemory() < oldMax.getMemory()
if (newMax.getMemorySize() < oldMax.getMemorySize()
|| newMax.getVirtualCores() < oldMax.getVirtualCores()) {
throw new IOException(
"Trying to reinitialize "

View File

@ -442,7 +442,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
priority, capability);
// Can we allocate a container on this node?
int availableContainers =
long availableContainers =
rc.computeAvailableContainers(available, capability);
// How much need to unreserve equals to:

View File

@ -193,7 +193,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Resources.subtractFrom(availableResources,
node.getUnallocatedResource());
}
if (availableResources.getMemory() < 0) {
if (availableResources.getMemorySize() < 0) {
availableResources.setMemory(0);
}
if (availableResources.getVirtualCores() < 0) {

View File

@ -128,7 +128,7 @@ public class FSParentQueue extends FSQueue {
public Resource getDemand() {
readLock.lock();
try {
return Resource.newInstance(demand.getMemory(), demand.getVirtualCores());
return Resource.newInstance(demand.getMemorySize(), demand.getVirtualCores());
} finally {
readLock.unlock();
}

View File

@ -135,18 +135,18 @@ public abstract class FSQueue implements Queue, Schedulable {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(getQueueName());
if (scheduler.getClusterResource().getMemory() == 0) {
if (scheduler.getClusterResource().getMemorySize() == 0) {
queueInfo.setCapacity(0.0f);
} else {
queueInfo.setCapacity((float) getFairShare().getMemory() /
scheduler.getClusterResource().getMemory());
queueInfo.setCapacity((float) getFairShare().getMemorySize() /
scheduler.getClusterResource().getMemorySize());
}
if (getFairShare().getMemory() == 0) {
if (getFairShare().getMemorySize() == 0) {
queueInfo.setCurrentCapacity(0.0f);
} else {
queueInfo.setCurrentCapacity((float) getResourceUsage().getMemory() /
getFairShare().getMemory());
queueInfo.setCurrentCapacity((float) getResourceUsage().getMemorySize() /
getFairShare().getMemorySize());
}
ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -31,14 +32,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@Metrics(context="yarn")
public class FSQueueMetrics extends QueueMetrics {
@Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB;
@Metric("Fair share of CPU in vcores") MutableGaugeInt fairShareVCores;
@Metric("Steady fair share of memory in MB") MutableGaugeInt steadyFairShareMB;
@Metric("Steady fair share of CPU in vcores") MutableGaugeInt steadyFairShareVCores;
@Metric("Minimum share of memory in MB") MutableGaugeInt minShareMB;
@Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores;
@Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB;
@Metric("Maximum share of CPU in vcores") MutableGaugeInt maxShareVCores;
@Metric("Fair share of memory in MB") MutableGaugeLong fairShareMB;
@Metric("Fair share of CPU in vcores") MutableGaugeLong fairShareVCores;
@Metric("Steady fair share of memory in MB") MutableGaugeLong steadyFairShareMB;
@Metric("Steady fair share of CPU in vcores") MutableGaugeLong steadyFairShareVCores;
@Metric("Minimum share of memory in MB") MutableGaugeLong minShareMB;
@Metric("Minimum share of CPU in vcores") MutableGaugeLong minShareVCores;
@Metric("Maximum share of memory in MB") MutableGaugeLong maxShareMB;
@Metric("Maximum share of CPU in vcores") MutableGaugeLong maxShareVCores;
@Metric("Maximum number of applications") MutableGaugeInt maxApps;
private String schedulingPolicy;
@ -49,54 +50,54 @@ public class FSQueueMetrics extends QueueMetrics {
}
public void setFairShare(Resource resource) {
fairShareMB.set(resource.getMemory());
fairShareMB.set(resource.getMemorySize());
fairShareVCores.set(resource.getVirtualCores());
}
public int getFairShareMB() {
public long getFairShareMB() {
return fairShareMB.value();
}
public int getFairShareVirtualCores() {
public long getFairShareVirtualCores() {
return fairShareVCores.value();
}
public void setSteadyFairShare(Resource resource) {
steadyFairShareMB.set(resource.getMemory());
steadyFairShareMB.set(resource.getMemorySize());
steadyFairShareVCores.set(resource.getVirtualCores());
}
public int getSteadyFairShareMB() {
public long getSteadyFairShareMB() {
return steadyFairShareMB.value();
}
public int getSteadyFairShareVCores() {
public long getSteadyFairShareVCores() {
return steadyFairShareVCores.value();
}
public void setMinShare(Resource resource) {
minShareMB.set(resource.getMemory());
minShareMB.set(resource.getMemorySize());
minShareVCores.set(resource.getVirtualCores());
}
public int getMinShareMB() {
public long getMinShareMB() {
return minShareMB.value();
}
public int getMinShareVirtualCores() {
public long getMinShareVirtualCores() {
return minShareVCores.value();
}
public void setMaxShare(Resource resource) {
maxShareMB.set(resource.getMemory());
maxShareMB.set(resource.getMemorySize());
maxShareVCores.set(resource.getVirtualCores());
}
public int getMaxShareMB() {
public long getMaxShareMB() {
return maxShareMB.value();
}
public int getMaxShareVirtualCores() {
public long getMaxShareVirtualCores() {
return maxShareVCores.value();
}

View File

@ -472,7 +472,7 @@ public class FairScheduler extends
}
private boolean isResourceGreaterThanNone(Resource toPreempt) {
return (toPreempt.getMemory() > 0) || (toPreempt.getVirtualCores() > 0);
return (toPreempt.getMemorySize() > 0) || (toPreempt.getVirtualCores() > 0);
}
protected void warnOrKillContainer(RMContainer container) {
@ -559,7 +559,7 @@ public class FairScheduler extends
double weight = 1.0;
if (sizeBasedWeight) {
// Set weight based on current memory demand
weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
weight = Math.log1p(app.getDemand().getMemorySize()) / Math.log(2);
}
weight *= app.getPriority().getPriority();
if (weightAdjuster != null) {
@ -1214,7 +1214,7 @@ public class FairScheduler extends
if (preemptionEnabled) {
Resource clusterResource = getClusterResource();
return (preemptionUtilizationThreshold < Math.max(
(float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
(float) rootMetrics.getAllocatedMB() / clusterResource.getMemorySize(),
(float) rootMetrics.getAllocatedVirtualCores() /
clusterResource.getVirtualCores()));
}

View File

@ -124,15 +124,15 @@ public class ComputeFairShares {
// have met all Schedulables' max shares.
int totalMaxShare = 0;
for (Schedulable sched : schedulables) {
int maxShare = getResourceValue(sched.getMaxShare(), type);
totalMaxShare = (int) Math.min((long)maxShare + (long)totalMaxShare,
long maxShare = getResourceValue(sched.getMaxShare(), type);
totalMaxShare = (int) Math.min(maxShare + (long)totalMaxShare,
Integer.MAX_VALUE);
if (totalMaxShare == Integer.MAX_VALUE) {
break;
}
}
int totalResource = Math.max((getResourceValue(totalResources, type) -
long totalResource = Math.max((getResourceValue(totalResources, type) -
takenResources), 0);
totalResource = Math.min(totalMaxShare, totalResource);
@ -207,7 +207,7 @@ public class ComputeFairShares {
int totalResource = 0;
for (Schedulable sched : schedulables) {
int fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
long fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
if (fixedShare < 0) {
nonFixedSchedulables.add(sched);
} else {
@ -229,7 +229,7 @@ public class ComputeFairShares {
* The fairshare is fixed if either the maxShare is 0, weight is 0,
* or the Schedulable is not active for instantaneous fairshare.
*/
private static int getFairShareIfFixed(Schedulable sched,
private static long getFairShareIfFixed(Schedulable sched,
boolean isSteadyShare, ResourceType type) {
// Check if maxShare is 0
@ -245,17 +245,17 @@ public class ComputeFairShares {
// Check if weight is 0
if (sched.getWeights().getWeight(type) <= 0) {
int minShare = getResourceValue(sched.getMinShare(), type);
long minShare = getResourceValue(sched.getMinShare(), type);
return (minShare <= 0) ? 0 : minShare;
}
return -1;
}
private static int getResourceValue(Resource resource, ResourceType type) {
private static long getResourceValue(Resource resource, ResourceType type) {
switch (type) {
case MEMORY:
return resource.getMemory();
return resource.getMemorySize();
case CPU:
return resource.getVirtualCores();
default:
@ -263,7 +263,7 @@ public class ComputeFairShares {
}
}
private static void setResourceValue(int val, Resource resource, ResourceType type) {
private static void setResourceValue(long val, Resource resource, ResourceType type) {
switch (type) {
case MEMORY:
resource.setMemory(val);

View File

@ -101,13 +101,13 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
@Override
public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
Resource maxAvailable) {
int queueAvailableMemory =
Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0);
long queueAvailableMemory =
Math.max(queueFairShare.getMemorySize() - queueUsage.getMemorySize(), 0);
int queueAvailableCPU =
Math.max(queueFairShare.getVirtualCores() - queueUsage
.getVirtualCores(), 0);
Resource headroom = Resources.createResource(
Math.min(maxAvailable.getMemory(), queueAvailableMemory),
Math.min(maxAvailable.getMemorySize(), queueAvailableMemory),
Math.min(maxAvailable.getVirtualCores(),
queueAvailableCPU));
return headroom;
@ -180,8 +180,8 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
*/
void calculateShares(Resource resource, Resource pool,
ResourceWeights shares, ResourceType[] resourceOrder, ResourceWeights weights) {
shares.setWeight(MEMORY, (float)resource.getMemory() /
(pool.getMemory() * weights.getWeight(MEMORY)));
shares.setWeight(MEMORY, (float)resource.getMemorySize() /
(pool.getMemorySize() * weights.getWeight(MEMORY)));
shares.setWeight(CPU, (float)resource.getVirtualCores() /
(pool.getVirtualCores() * weights.getWeight(CPU)));
// sort order vector by resource share

View File

@ -82,13 +82,13 @@ public class FairSharePolicy extends SchedulingPolicy {
s1.getResourceUsage(), minShare1);
boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
s2.getResourceUsage(), minShare2);
minShareRatio1 = (double) s1.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
minShareRatio2 = (double) s2.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
useToWeightRatio1 = s1.getResourceUsage().getMemory() /
minShareRatio1 = (double) s1.getResourceUsage().getMemorySize()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize();
minShareRatio2 = (double) s2.getResourceUsage().getMemorySize()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize();
useToWeightRatio1 = s1.getResourceUsage().getMemorySize() /
s1.getWeights().getWeight(ResourceType.MEMORY);
useToWeightRatio2 = s2.getResourceUsage().getMemory() /
useToWeightRatio2 = s2.getResourceUsage().getMemorySize() /
s2.getWeights().getWeight(ResourceType.MEMORY);
int res = 0;
if (s1Needy && !s2Needy)
@ -124,10 +124,10 @@ public class FairSharePolicy extends SchedulingPolicy {
@Override
public Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource maxAvailable) {
int queueAvailableMemory = Math.max(
queueFairShare.getMemory() - queueUsage.getMemory(), 0);
long queueAvailableMemory = Math.max(
queueFairShare.getMemorySize() - queueUsage.getMemorySize(), 0);
Resource headroom = Resources.createResource(
Math.min(maxAvailable.getMemory(), queueAvailableMemory),
Math.min(maxAvailable.getMemorySize(), queueAvailableMemory),
maxAvailable.getVirtualCores());
return headroom;
}
@ -152,7 +152,7 @@ public class FairSharePolicy extends SchedulingPolicy {
@Override
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
return usage.getMemory() > maxAMResource.getMemory();
return usage.getMemorySize() > maxAMResource.getMemorySize();
}
@Override

View File

@ -115,16 +115,16 @@ public class FifoPolicy extends SchedulingPolicy {
@Override
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
return usage.getMemory() > maxAMResource.getMemory();
return usage.getMemorySize() > maxAMResource.getMemorySize();
}
@Override
public Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource maxAvailable) {
int queueAvailableMemory = Math.max(
queueFairShare.getMemory() - queueUsage.getMemory(), 0);
long queueAvailableMemory = Math.max(
queueFairShare.getMemorySize() - queueUsage.getMemorySize(), 0);
Resource headroom = Resources.createResource(
Math.min(maxAvailable.getMemory(), queueAvailableMemory),
Math.min(maxAvailable.getMemorySize(), queueAvailableMemory),
maxAvailable.getVirtualCores());
return headroom;
}

View File

@ -143,11 +143,11 @@ public class FifoScheduler extends
queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
queueInfo.setCapacity(1.0f);
Resource clusterResource = getClusterResource();
if (clusterResource.getMemory() == 0) {
if (clusterResource.getMemorySize() == 0) {
queueInfo.setCurrentCapacity(0.0f);
} else {
queueInfo.setCurrentCapacity((float) usedResource.getMemory()
/ clusterResource.getMemory());
queueInfo.setCurrentCapacity((float) usedResource.getMemorySize()
/ clusterResource.getMemorySize());
}
queueInfo.setMaximumCapacity(1.0f);
queueInfo.setChildQueues(new ArrayList<QueueInfo>());
@ -697,8 +697,9 @@ public class FifoScheduler extends
Resource capability = request.getCapability();
// TODO: A buggy application with this zero would crash the scheduler.
int availableContainers = node.getUnallocatedResource().getMemory() /
capability.getMemory();
int availableContainers =
(int) (node.getUnallocatedResource().getMemorySize() /
capability.getMemorySize());
int assignedContainers =
Math.min(assignableContainers, availableContainers);

View File

@ -23,7 +23,6 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
/**
@ -67,10 +66,10 @@ public class FairOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
private double getMagnitude(SchedulableEntity r) {
double mag = r.getSchedulingResourceUsage().getCachedUsed(
CommonNodeLabelsManager.ANY).getMemory();
CommonNodeLabelsManager.ANY).getMemorySize();
if (sizeBasedWeight) {
double weight = Math.log1p(r.getSchedulingResourceUsage().getCachedDemand(
CommonNodeLabelsManager.ANY).getMemory()) / Math.log(2);
CommonNodeLabelsManager.ANY).getMemorySize()) / Math.log(2);
mag = mag / weight;
}
return mag;

View File

@ -54,8 +54,8 @@ class DefaultSchedulerPage extends RmView {
@Override public void render(Block html) {
info("\'" + sinfo.getQueueName() + "\' Queue Status").
_("Queue State:" , sinfo.getState()).
_("Minimum Queue Memory Capacity:" , Integer.toString(sinfo.getMinQueueMemoryCapacity())).
_("Maximum Queue Memory Capacity:" , Integer.toString(sinfo.getMaxQueueMemoryCapacity())).
_("Minimum Queue Memory Capacity:" , Long.toString(sinfo.getMinQueueMemoryCapacity())).
_("Maximum Queue Memory Capacity:" , Long.toString(sinfo.getMaxQueueMemoryCapacity())).
_("Number of Nodes:" , Integer.toString(sinfo.getNumNodes())).
_("Used Node Capacity:" , Integer.toString(sinfo.getUsedNodeCapacity())).
_("Available Node Capacity:" , Integer.toString(sinfo.getAvailNodeCapacity())).

View File

@ -115,7 +115,7 @@ public class FairSchedulerAppsBlock extends HtmlBlock {
AppInfo appInfo = new AppInfo(rm, app, true, WebAppUtils.getHttpSchemePrefix(conf));
String percent = StringUtils.format("%.1f", appInfo.getProgress());
ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId();
int fairShare = fsinfo.getAppFairShare(attemptId);
long fairShare = fsinfo.getAppFairShare(attemptId);
if (fairShare == FairSchedulerInfo.INVALID_FAIR_SHARE) {
// FairScheduler#applications don't have the entry. Skip it.
continue;

View File

@ -1514,14 +1514,14 @@ public class RMWebServices extends WebServices {
String msg = "Requested more cores than configured max";
throw new BadRequestException(msg);
}
if (newApp.getResource().getMemory() > rm.getConfig().getInt(
if (newApp.getResource().getMemorySize() > rm.getConfig().getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)) {
String msg = "Requested more memory than configured max";
throw new BadRequestException(msg);
}
Resource r =
Resource.newInstance(newApp.getResource().getMemory(), newApp
Resource.newInstance(newApp.getResource().getMemorySize(), newApp
.getResource().getvCores());
return r;
}
@ -2012,7 +2012,7 @@ public class RMWebServices extends WebServices {
.getReservationRequest()) {
ResourceInfo rInfo = resReqInfo.getCapability();
Resource capability =
Resource.newInstance(rInfo.getMemory(), rInfo.getvCores());
Resource.newInstance(rInfo.getMemorySize(), rInfo.getvCores());
int numContainers = resReqInfo.getNumContainers();
int minConcurrency = resReqInfo.getMinConcurrency();
long duration = resReqInfo.getDuration();
@ -2125,7 +2125,7 @@ public class RMWebServices extends WebServices {
.getReservationRequest()) {
ResourceInfo rInfo = resReqInfo.getCapability();
Resource capability =
Resource.newInstance(rInfo.getMemory(), rInfo.getvCores());
Resource.newInstance(rInfo.getMemorySize(), rInfo.getvCores());
int numContainers = resReqInfo.getNumContainers();
int minConcurrency = resReqInfo.getMinConcurrency();
long duration = resReqInfo.getDuration();

View File

@ -82,8 +82,8 @@ public class AppInfo {
protected long elapsedTime;
protected String amContainerLogs;
protected String amHostHttpAddress;
protected int allocatedMB;
protected int allocatedVCores;
protected long allocatedMB;
protected long allocatedVCores;
protected int runningContainers;
protected long memorySeconds;
protected long vcoreSeconds;
@ -91,8 +91,8 @@ public class AppInfo {
protected float clusterUsagePercentage;
// preemption info fields
protected int preemptedResourceMB;
protected int preemptedResourceVCores;
protected long preemptedResourceMB;
protected long preemptedResourceVCores;
protected int numNonAMContainerPreempted;
protected int numAMContainerPreempted;
@ -174,7 +174,7 @@ public class AppInfo {
.getApplicationResourceUsageReport();
if (resourceReport != null) {
Resource usedResources = resourceReport.getUsedResources();
allocatedMB = usedResources.getMemory();
allocatedMB = usedResources.getMemorySize();
allocatedVCores = usedResources.getVirtualCores();
runningContainers = resourceReport.getNumUsedContainers();
queueUsagePercentage = resourceReport.getQueueUsagePercentage();
@ -190,7 +190,7 @@ public class AppInfo {
numAMContainerPreempted =
appMetrics.getNumAMContainersPreempted();
preemptedResourceMB =
appMetrics.getResourcePreempted().getMemory();
appMetrics.getResourcePreempted().getMemorySize();
numNonAMContainerPreempted =
appMetrics.getNumNonAMContainersPreempted();
preemptedResourceVCores =
@ -302,19 +302,19 @@ public class AppInfo {
return this.runningContainers;
}
public int getAllocatedMB() {
public long getAllocatedMB() {
return this.allocatedMB;
}
public int getAllocatedVCores() {
public long getAllocatedVCores() {
return this.allocatedVCores;
}
public int getPreemptedMB() {
public long getPreemptedMB() {
return preemptedResourceMB;
}
public int getPreemptedVCores() {
public long getPreemptedVCores() {
return preemptedResourceVCores;
}

View File

@ -54,10 +54,10 @@ public class FairSchedulerInfo extends SchedulerInfo {
* <code>FairSchedulerInfo#INVALID_FAIR_SHARE</code> if the scheduler does
* not know about this application attempt.
*/
public int getAppFairShare(ApplicationAttemptId appAttemptId) {
public long getAppFairShare(ApplicationAttemptId appAttemptId) {
FSAppAttempt fsAppAttempt = scheduler.getSchedulerApp(appAttemptId);
return fsAppAttempt == null ?
INVALID_FAIR_SHARE : fsAppAttempt.getFairShare().getMemory();
INVALID_FAIR_SHARE : fsAppAttempt.getFairShare().getMemorySize();
}
public FairSchedulerQueueInfo getRootQueueInfo() {

View File

@ -83,8 +83,8 @@ public class FairSchedulerQueueInfo {
usedResources = new ResourceInfo(queue.getResourceUsage());
demandResources = new ResourceInfo(queue.getDemand());
fractionMemUsed = (float)usedResources.getMemory() /
clusterResources.getMemory();
fractionMemUsed = (float)usedResources.getMemorySize() /
clusterResources.getMemorySize();
steadyFairResources = new ResourceInfo(queue.getSteadyFairShare());
fairResources = new ResourceInfo(queue.getFairShare());
@ -95,11 +95,11 @@ public class FairSchedulerQueueInfo {
scheduler.getClusterResource()));
fractionMemSteadyFairShare =
(float)steadyFairResources.getMemory() / clusterResources.getMemory();
fractionMemFairShare = (float) fairResources.getMemory()
/ clusterResources.getMemory();
fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory();
fractionMemMaxShare = (float)maxResources.getMemory() / clusterResources.getMemory();
(float)steadyFairResources.getMemorySize() / clusterResources.getMemorySize();
fractionMemFairShare = (float) fairResources.getMemorySize()
/ clusterResources.getMemorySize();
fractionMemMinShare = (float)minResources.getMemorySize() / clusterResources.getMemorySize();
fractionMemMaxShare = (float)maxResources.getMemorySize() / clusterResources.getMemorySize();
maxApps = allocConf.getQueueMaxApps(queueName);

View File

@ -40,8 +40,8 @@ public class FifoSchedulerInfo extends SchedulerInfo {
protected float capacity;
protected float usedCapacity;
protected QueueState qstate;
protected int minQueueMemoryCapacity;
protected int maxQueueMemoryCapacity;
protected long minQueueMemoryCapacity;
protected long maxQueueMemoryCapacity;
protected int numNodes;
protected int usedNodeCapacity;
protected int availNodeCapacity;
@ -67,8 +67,8 @@ public class FifoSchedulerInfo extends SchedulerInfo {
this.usedCapacity = qInfo.getCurrentCapacity();
this.capacity = qInfo.getCapacity();
this.minQueueMemoryCapacity = fs.getMinimumResourceCapability().getMemory();
this.maxQueueMemoryCapacity = fs.getMaximumResourceCapability().getMemory();
this.minQueueMemoryCapacity = fs.getMinimumResourceCapability().getMemorySize();
this.maxQueueMemoryCapacity = fs.getMaximumResourceCapability().getMemorySize();
this.qstate = qInfo.getQueueState();
this.numNodes = rmContext.getRMNodes().size();
@ -79,9 +79,9 @@ public class FifoSchedulerInfo extends SchedulerInfo {
for (RMNode ni : rmContext.getRMNodes().values()) {
SchedulerNodeReport report = fs.getNodeReport(ni.getNodeID());
this.usedNodeCapacity += report.getUsedResource().getMemory();
this.availNodeCapacity += report.getAvailableResource().getMemory();
this.totalNodeCapacity += ni.getTotalCapability().getMemory();
this.usedNodeCapacity += report.getUsedResource().getMemorySize();
this.availNodeCapacity += report.getAvailableResource().getMemorySize();
this.totalNodeCapacity += ni.getTotalCapability().getMemorySize();
this.numContainers += fs.getNodeReport(ni.getNodeID()).getNumContainers();
}
}
@ -114,11 +114,11 @@ public class FifoSchedulerInfo extends SchedulerInfo {
return this.qName;
}
public int getMinQueueMemoryCapacity() {
public long getMinQueueMemoryCapacity() {
return this.minQueueMemoryCapacity;
}
public int getMaxQueueMemoryCapacity() {
public long getMaxQueueMemoryCapacity() {
return this.maxQueueMemoryCapacity;
}

View File

@ -63,8 +63,8 @@ public class NodeInfo {
this.availMemoryMB = 0;
if (report != null) {
this.numContainers = report.getNumContainers();
this.usedMemoryMB = report.getUsedResource().getMemory();
this.availMemoryMB = report.getAvailableResource().getMemory();
this.usedMemoryMB = report.getUsedResource().getMemorySize();
this.availMemoryMB = report.getAvailableResource().getMemorySize();
this.usedVirtualCores = report.getUsedResource().getVirtualCores();
this.availableVirtualCores = report.getAvailableResource().getVirtualCores();
}

View File

@ -27,22 +27,22 @@ import org.apache.hadoop.yarn.api.records.Resource;
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class ResourceInfo {
int memory;
int vCores;
long memory;
long vCores;
public ResourceInfo() {
}
public ResourceInfo(Resource res) {
memory = res.getMemory();
memory = res.getMemorySize();
vCores = res.getVirtualCores();
}
public int getMemory() {
public long getMemorySize() {
return memory;
}
public int getvCores() {
public long getvCores() {
return vCores;
}

View File

@ -194,7 +194,7 @@ public class Application {
Resource currentSpec = requestSpec.put(priority, capability);
if (currentSpec != null) {
throw new IllegalStateException("Resource spec already exists for " +
"priority " + priority.getPriority() + " - " + currentSpec.getMemory());
"priority " + priority.getPriority() + " - " + currentSpec.getMemorySize());
}
}

View File

@ -50,7 +50,7 @@ public class MockNM {
private int responseId;
private NodeId nodeId;
private int memory;
private long memory;
private int vCores;
private ResourceTrackerService resourceTracker;
private int httpPort = 2;
@ -144,7 +144,7 @@ public class MockNM {
this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
Resource newResource = registrationResponse.getResource();
if (newResource != null) {
memory = newResource.getMemory();
memory = (int) newResource.getMemorySize();
vCores = newResource.getVirtualCores();
}
return registrationResponse;
@ -219,14 +219,14 @@ public class MockNM {
Resource newResource = heartbeatResponse.getResource();
if (newResource != null) {
memory = newResource.getMemory();
memory = newResource.getMemorySize();
vCores = newResource.getVirtualCores();
}
return heartbeatResponse;
}
public int getMemory() {
public long getMemory() {
return memory;
}

View File

@ -89,13 +89,13 @@ public class MockNodes {
public static Resource newUsedResource(Resource total) {
Resource rs = recordFactory.newRecordInstance(Resource.class);
rs.setMemory((int)(Math.random() * total.getMemory()));
rs.setMemory((int)(Math.random() * total.getMemorySize()));
return rs;
}
public static Resource newAvailResource(Resource total, Resource used) {
Resource rs = recordFactory.newRecordInstance(Resource.class);
rs.setMemory(total.getMemory() - used.getMemory());
rs.setMemory(total.getMemorySize() - used.getMemorySize());
return rs;
}

View File

@ -213,12 +213,12 @@ public class NodeManager implements ContainerManagementProtocol {
synchronized public void checkResourceUsage() {
LOG.info("Checking resource usage for " + containerManagerAddress);
Assert.assertEquals(available.getMemory(),
Assert.assertEquals(available.getMemorySize(),
resourceManager.getResourceScheduler().getNodeReport(
this.nodeId).getAvailableResource().getMemory());
Assert.assertEquals(used.getMemory(),
this.nodeId).getAvailableResource().getMemorySize());
Assert.assertEquals(used.getMemorySize(),
resourceManager.getResourceScheduler().getNodeReport(
this.nodeId).getUsedResource().getMemory());
this.nodeId).getUsedResource().getMemorySize());
}
@Override

View File

@ -488,7 +488,7 @@ public class ReservationACLsTestBase extends ACLsTestBase {
private boolean checkCapacity(Collection<Plan> plans) {
for (Plan plan : plans) {
if (plan.getTotalCapacity().getMemory() > 0) {
if (plan.getTotalCapacity().getMemorySize() > 0) {
return true;
}
}

View File

@ -387,11 +387,11 @@ public class TestApplicationACLs {
Assert.assertEquals("Enemy should not see app reserved containers",
-1, usageReport.getNumReservedContainers());
Assert.assertEquals("Enemy should not see app used resources",
-1, usageReport.getUsedResources().getMemory());
-1, usageReport.getUsedResources().getMemorySize());
Assert.assertEquals("Enemy should not see app reserved resources",
-1, usageReport.getReservedResources().getMemory());
-1, usageReport.getReservedResources().getMemorySize());
Assert.assertEquals("Enemy should not see app needed resources",
-1, usageReport.getNeededResources().getMemory());
-1, usageReport.getNeededResources().getMemorySize());
}
private void verifyInvalidQueueWithAcl() throws Exception {

View File

@ -534,7 +534,7 @@ public class TestApplicationCleanup {
// 4. Verify Memory Usage by cluster, it should be 3072. AM memory +
// requested memory. 1024 + 2048=3072
ResourceScheduler rs = rm1.getRMContext().getScheduler();
int allocatedMB = rs.getRootQueueMetrics().getAllocatedMB();
long allocatedMB = rs.getRootQueueMetrics().getAllocatedMB();
Assert.assertEquals(amMemory + containerMemory, allocatedMB);
// 5. Re-register NM by sending completed container status

View File

@ -420,7 +420,7 @@ public class TestContainerResourceUsage {
Resource resource = rmContainer.getContainer().getResource();
long usedMillis =
rmContainer.getFinishTime() - rmContainer.getCreationTime();
long memorySeconds = resource.getMemory()
long memorySeconds = resource.getMemorySize()
* usedMillis / DateUtils.MILLIS_PER_SECOND;
long vcoreSeconds = resource.getVirtualCores()
* usedMillis / DateUtils.MILLIS_PER_SECOND;

View File

@ -161,7 +161,7 @@ public class TestDistributedSchedulingService {
Assert.assertEquals(4,
dsRegResp.getMaxAllocatableCapabilty().getVirtualCores());
Assert.assertEquals(1024,
dsRegResp.getMinAllocatableCapabilty().getMemory());
dsRegResp.getMinAllocatableCapabilty().getMemorySize());
Assert.assertEquals(2,
dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import org.junit.Before;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.util.ArrayList;
@ -45,7 +44,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -61,8 +59,6 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -79,8 +75,6 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRM extends ParameterizedSchedulerTestBase {
@ -112,7 +106,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
GetNewApplicationResponse resp = rm.getNewAppId();
assert (resp.getApplicationId().getId() != 0);
assert (resp.getMaximumResourceCapability().getMemory() > 0);
assert (resp.getMaximumResourceCapability().getMemorySize() > 0);
rm.stop();
}

View File

@ -711,7 +711,7 @@ public class TestRMHA {
}
private void verifyClusterMetrics(int activeNodes, int appsSubmitted,
int appsPending, int containersPending, int availableMB,
int appsPending, int containersPending, long availableMB,
int activeApplications) throws Exception {
int timeoutSecs = 0;
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
@ -742,7 +742,7 @@ public class TestRMHA {
assertTrue(message, isAllMetricAssertionDone);
}
private void assertMetric(String metricName, int expected, int actual) {
private void assertMetric(String metricName, long expected, long actual) {
assertEquals("Incorrect value for metric " + metricName, expected, actual);
}

View File

@ -31,7 +31,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -50,7 +49,6 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
.AllocationExpirationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
@ -869,13 +867,13 @@ public class TestRMNodeTransitions {
public void testResourceUpdateOnRunningNode() {
RMNodeImpl node = getRunningNode();
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("Memory resource is not match.", newCapacity.getMemorySize(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.RUNNING, node.getState());
@ -893,13 +891,13 @@ public class TestRMNodeTransitions {
public void testResourceUpdateOnNewNode() {
RMNodeImpl node = getNewNode(Resource.newInstance(4096, 4));
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("Memory resource is not match.", newCapacity.getMemorySize(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.NEW, node.getState());
@ -913,13 +911,13 @@ public class TestRMNodeTransitions {
int initialUnHealthy = cm.getUnhealthyNMs();
int initialDecommissioning = cm.getNumDecommissioningNMs();
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(), ResourceOption
.newInstance(Resource.newInstance(2048, 2),
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("Memory resource is not match.", newCapacity.getMemorySize(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.REBOOTED, node.getState());
@ -994,16 +992,16 @@ public class TestRMNodeTransitions {
public void testResourceUpdateOnDecommissioningNode() {
RMNodeImpl node = getDecommissioningNode();
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource originalCapacity = node.getOriginalTotalCapability();
assertEquals("Memory resource is not match.", originalCapacity.getMemory(), oldCapacity.getMemory());
assertEquals("Memory resource is not match.", originalCapacity.getMemorySize(), oldCapacity.getMemorySize());
assertEquals("CPU resource is not match.", originalCapacity.getVirtualCores(), oldCapacity.getVirtualCores());
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("Memory resource is not match.", newCapacity.getMemorySize(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
@ -1016,7 +1014,7 @@ public class TestRMNodeTransitions {
public void testResourceUpdateOnRecommissioningNode() {
RMNodeImpl node = getDecommissioningNode();
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.RECOMMISSION));

View File

@ -193,7 +193,7 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
.synchronizePlan(ReservationSystemTestUtil.reservationQ, false);
if (rm.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
.getMemory() > 0) {
.getMemorySize() > 0) {
break;
}
LOG.info("Waiting for node capacity to be added to plan");

View File

@ -1058,7 +1058,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm.drainEvents();
RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
Assert.assertEquals(3, rmNode.getHttpPort());
Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory());
Assert.assertEquals(5120, rmNode.getTotalCapability().getMemorySize());
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
}

View File

@ -87,7 +87,6 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@ -417,15 +416,15 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
// ************* check Queue metrics ************
QueueMetrics queueMetrics = queue.getMetrics();
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
availableResources.getVirtualCores(), usedResource.getMemory(),
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemorySize(),
availableResources.getVirtualCores(), usedResource.getMemorySize(),
usedResource.getVirtualCores());
// ************ check user metrics ***********
QueueMetrics userMetrics =
queueMetrics.getUserMetrics(app.getUser());
assertMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
availableResources.getVirtualCores(), usedResource.getMemory(),
assertMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemorySize(),
availableResources.getVirtualCores(), usedResource.getMemorySize(),
usedResource.getVirtualCores());
}
@ -485,8 +484,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
// ************ check queue metrics ****************
QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
availableResources.getVirtualCores(), usedResources.getMemory(),
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemorySize(),
availableResources.getVirtualCores(), usedResources.getMemorySize(),
usedResources.getVirtualCores());
}
@ -697,8 +696,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
q1UsedResource, 4);
QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics();
assertMetrics(queue1Metrics, 2, 0, 2, 0, 4,
q1availableResources.getMemory(),
q1availableResources.getVirtualCores(), q1UsedResource.getMemory(),
q1availableResources.getMemorySize(),
q1availableResources.getVirtualCores(), q1UsedResource.getMemorySize(),
q1UsedResource.getVirtualCores());
// assert queue B state.
@ -708,8 +707,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
q2UsedResource, 2);
QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics();
assertMetrics(queue2Metrics, 1, 0, 1, 0, 2,
q2availableResources.getMemory(),
q2availableResources.getVirtualCores(), q2UsedResource.getMemory(),
q2availableResources.getMemorySize(),
q2availableResources.getVirtualCores(), q2UsedResource.getMemorySize(),
q2UsedResource.getVirtualCores());
// assert parent queue state.
@ -718,8 +717,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16,
(float) 6 / 16);
assertMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6,
totalAvailableResource.getMemory(),
totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
totalAvailableResource.getMemorySize(),
totalAvailableResource.getVirtualCores(), totalUsedResource.getMemorySize(),
totalUsedResource.getVirtualCores());
}
@ -1137,8 +1136,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
private void assertMetrics(QueueMetrics qm, int appsSubmitted,
int appsPending, int appsRunning, int appsCompleted,
int allocatedContainers, int availableMB, int availableVirtualCores,
int allocatedMB, int allocatedVirtualCores) {
int allocatedContainers, long availableMB, long availableVirtualCores,
long allocatedMB, long allocatedVirtualCores) {
assertEquals(appsSubmitted, qm.getAppsSubmitted());
assertEquals(appsPending, qm.getAppsPending());
assertEquals(appsRunning, qm.getAppsRunning());

View File

@ -424,10 +424,12 @@ public class TestSystemMetricsPublisher {
container.getAllocatedNode().getPort(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO));
Assert.assertEquals(
container.getAllocatedResource().getMemory(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO));
Assert.assertEquals(container.getAllocatedResource().getMemorySize(),
// KeyValueBasedTimelineStore could cast long to integer, need make sure
// variables for compare have same type.
((Integer) entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO))
.longValue());
Assert.assertEquals(
container.getAllocatedResource().getVirtualCores(),
entity.getOtherInfo().get(

View File

@ -653,12 +653,12 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
public void checkPendingResource(CSQueue queue, String partition, int pending) {
ResourceUsage ru = queue.getQueueResourceUsage();
Assert.assertEquals(pending, ru.getPending(partition).getMemory());
Assert.assertEquals(pending, ru.getPending(partition).getMemorySize());
}
public void checkReservedResource(CSQueue queue, String partition, int reserved) {
ResourceUsage ru = queue.getQueueResourceUsage();
Assert.assertEquals(reserved, ru.getReserved(partition).getMemory());
Assert.assertEquals(reserved, ru.getReserved(partition).getMemorySize());
}
static class IsPreemptionRequestForQueueAndNode

View File

@ -955,14 +955,17 @@ public class TestProportionalCapacityPreemptionPolicy {
// which is likely triggered since we use small numbers for readability
//run with Logger.getRootLogger().setLevel(Level.DEBUG);
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
assertEquals(10, policy.getQueuePartitions().get("queueE").get("").preemptableExtra.getMemory());
assertEquals(10, policy.getQueuePartitions().get("queueE").get("").preemptableExtra.getMemorySize());
//2nd level child(E) preempts 10, but parent A has only 9 extra
//check the parent can prempt only the extra from > 2 level child
TempQueuePerPartition tempQueueAPartition = policy.getQueuePartitions().get("queueA").get("");
assertEquals(0, tempQueueAPartition.untouchableExtra.getMemory());
int extraForQueueA = tempQueueAPartition.getUsed().getMemory()
- tempQueueAPartition.getGuaranteed().getMemory();
assertEquals(extraForQueueA,tempQueueAPartition.preemptableExtra.getMemory());
TempQueuePerPartition tempQueueAPartition = policy.getQueuePartitions().get(
"queueA").get("");
assertEquals(0, tempQueueAPartition.untouchableExtra.getMemorySize());
long extraForQueueA =
tempQueueAPartition.getUsed().getMemorySize() - tempQueueAPartition
.getGuaranteed().getMemorySize();
assertEquals(extraForQueueA,
tempQueueAPartition.preemptableExtra.getMemorySize());
}
@Test
@ -985,14 +988,18 @@ public class TestProportionalCapacityPreemptionPolicy {
policy.editSchedule();
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
assertEquals(10, policy.getQueuePartitions().get("queueE").get("").preemptableExtra.getMemory());
assertEquals(10, policy.getQueuePartitions().get("queueE")
.get("").preemptableExtra.getMemorySize());
//2nd level child(E) preempts 10, but parent A has only 9 extra
//check the parent can prempt only the extra from > 2 level child
TempQueuePerPartition tempQueueAPartition = policy.getQueuePartitions().get("queueA").get("");
assertEquals(0, tempQueueAPartition.untouchableExtra.getMemory());
int extraForQueueA = tempQueueAPartition.getUsed().getMemory()
- tempQueueAPartition.getGuaranteed().getMemory();
assertEquals(extraForQueueA,tempQueueAPartition.preemptableExtra.getMemory());
TempQueuePerPartition tempQueueAPartition = policy.getQueuePartitions().get(
"queueA").get("");
assertEquals(0, tempQueueAPartition.untouchableExtra.getMemorySize());
long extraForQueueA =
tempQueueAPartition.getUsed().getMemorySize() - tempQueueAPartition
.getGuaranteed().getMemorySize();
assertEquals(extraForQueueA,
tempQueueAPartition.preemptableExtra.getMemorySize());
}
static class IsPreemptionRequestFor
@ -1122,12 +1129,12 @@ public class TestProportionalCapacityPreemptionPolicy {
when(root.getAbsoluteCapacity()).thenReturn(
Resources.divide(rc, tot, abs[0], tot));
when(root.getAbsoluteMaximumCapacity()).thenReturn(
maxCap[0] / (float) tot.getMemory());
maxCap[0] / (float) tot.getMemorySize());
when(root.getQueueResourceUsage()).thenReturn(resUsage);
QueueCapacities rootQc = new QueueCapacities(true);
rootQc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[0], tot));
rootQc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[0], tot));
rootQc.setAbsoluteMaximumCapacity(maxCap[0] / (float) tot.getMemory());
rootQc.setAbsoluteMaximumCapacity(maxCap[0] / (float) tot.getMemorySize());
when(root.getQueueCapacities()).thenReturn(rootQc);
when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT);
boolean preemptionDisabled = mockPreemptionStatus("root");
@ -1153,13 +1160,13 @@ public class TestProportionalCapacityPreemptionPolicy {
when(q.getAbsoluteCapacity()).thenReturn(
Resources.divide(rc, tot, abs[i], tot));
when(q.getAbsoluteMaximumCapacity()).thenReturn(
maxCap[i] / (float) tot.getMemory());
maxCap[i] / (float) tot.getMemorySize());
// We need to make these fields to QueueCapacities
QueueCapacities qc = new QueueCapacities(false);
qc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[i], tot));
qc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[i], tot));
qc.setAbsoluteMaximumCapacity(maxCap[i] / (float) tot.getMemory());
qc.setAbsoluteMaximumCapacity(maxCap[i] / (float) tot.getMemorySize());
when(q.getQueueCapacities()).thenReturn(qc);
String parentPathName = p.getQueuePath();

Some files were not shown because too many files have changed in this diff Show More