YARN-5774. MR Job stuck in ACCEPTED status without any progress in Fair Scheduler

if set yarn.scheduler.minimum-allocation-mb to 0. (Contributed by Yufei Gu via Daniel Templeton)

(cherry picked from commit 25f9872be6)
This commit is contained in:
Daniel Templeton 2016-11-29 09:40:49 -08:00
parent 0c895e8a62
commit b8bebb8607
16 changed files with 258 additions and 146 deletions

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* {@code AbstractResourceRequest} represents a generic resource request made
* by an application to the {@code ResourceManager}.
* <p>
* It includes:
* <ul>
* <li>{@link Resource} capability required for each request.</li>
* </ul>
*
* @see Resource
*/
@Public
@Unstable
public abstract class AbstractResourceRequest {
/**
* Set the <code>Resource</code> capability of the request
* @param capability <code>Resource</code> capability of the request
*/
@Public
@Stable
public abstract void setCapability(Resource capability);
/**
* Get the <code>Resource</code> capability of the request.
* @return <code>Resource</code> capability of the request
*/
@Public
@Stable
public abstract Resource getCapability();
}

View File

@ -58,7 +58,8 @@ import org.apache.hadoop.yarn.util.Records;
*/ */
@Public @Public
@Stable @Stable
public abstract class ResourceRequest implements Comparable<ResourceRequest> { public abstract class ResourceRequest extends AbstractResourceRequest
implements Comparable<ResourceRequest>{
@Public @Public
@Stable @Stable
@ -345,22 +346,6 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
@Stable @Stable
public abstract void setResourceName(String resourceName); public abstract void setResourceName(String resourceName);
/**
* Get the <code>Resource</code> capability of the request.
* @return <code>Resource</code> capability of the request
*/
@Public
@Stable
public abstract Resource getCapability();
/**
* Set the <code>Resource</code> capability of the request
* @param capability <code>Resource</code> capability of the request
*/
@Public
@Stable
public abstract void setCapability(Resource capability);
/** /**
* Get the number of containers required with the given specifications. * Get the number of containers required with the given specifications.
* @return number of containers required with the given specifications * @return number of containers required with the given specifications

View File

@ -62,7 +62,7 @@ import org.apache.hadoop.yarn.util.Records;
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Unstable
public abstract class UpdateContainerRequest { public abstract class UpdateContainerRequest extends AbstractResourceRequest {
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Unstable
@ -127,22 +127,6 @@ public abstract class UpdateContainerRequest {
@InterfaceStability.Unstable @InterfaceStability.Unstable
public abstract void setContainerId(ContainerId containerId); public abstract void setContainerId(ContainerId containerId);
/**
* Get the <code>Resource</code> capability of the container.
* @return <code>Resource</code> capability of the container
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract Resource getCapability();
/**
* Set the <code>Resource</code> capability of the container.
* @param capability <code>Resource</code> capability of the container
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract void setCapability(Resource capability);
/** /**
* Get the target <code>ExecutionType</code> of the container. * Get the target <code>ExecutionType</code> of the container.
* @return <code>ExecutionType</code> of the container * @return <code>ExecutionType</code> of the container

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.yarn.util.resource; package org.apache.hadoop.yarn.util.resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -24,6 +26,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
@Private @Private
@Unstable @Unstable
public class DefaultResourceCalculator extends ResourceCalculator { public class DefaultResourceCalculator extends ResourceCalculator {
private static final Log LOG =
LogFactory.getLog(DefaultResourceCalculator.class);
@Override @Override
public int compare(Resource unused, Resource lhs, Resource rhs) { public int compare(Resource unused, Resource lhs, Resource rhs) {
@ -64,6 +68,13 @@ public class DefaultResourceCalculator extends ResourceCalculator {
@Override @Override
public Resource normalize(Resource r, Resource minimumResource, public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource, Resource stepFactor) { Resource maximumResource, Resource stepFactor) {
if (stepFactor.getMemorySize() == 0) {
LOG.error("Memory cannot be allocated in increments of zero. Assuming " +
minimumResource.getMemorySize() + "MB increment size. "
+ "Please ensure the scheduler configuration is correct.");
stepFactor = minimumResource;
}
long normalizedMemory = Math.min( long normalizedMemory = Math.min(
roundUp( roundUp(
Math.max(r.getMemorySize(), minimumResource.getMemorySize()), Math.max(r.getMemorySize(), minimumResource.getMemorySize()),
@ -72,12 +83,6 @@ public class DefaultResourceCalculator extends ResourceCalculator {
return Resources.createResource(normalizedMemory); return Resources.createResource(normalizedMemory);
} }
@Override
public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource) {
return normalize(r, minimumResource, maximumResource, minimumResource);
}
@Override @Override
public Resource roundUp(Resource r, Resource stepFactor) { public Resource roundUp(Resource r, Resource stepFactor) {
return Resources.createResource( return Resources.createResource(

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.yarn.util.resource; package org.apache.hadoop.yarn.util.resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -45,6 +47,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
@Private @Private
@Unstable @Unstable
public class DominantResourceCalculator extends ResourceCalculator { public class DominantResourceCalculator extends ResourceCalculator {
private static final Log LOG =
LogFactory.getLog(DominantResourceCalculator.class);
@Override @Override
public int compare(Resource clusterResource, Resource lhs, Resource rhs) { public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
@ -152,6 +156,25 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override @Override
public Resource normalize(Resource r, Resource minimumResource, public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource, Resource stepFactor) { Resource maximumResource, Resource stepFactor) {
if (stepFactor.getMemorySize() == 0 || stepFactor.getVirtualCores() == 0) {
Resource step = Resources.clone(stepFactor);
if (stepFactor.getMemorySize() == 0) {
LOG.error("Memory cannot be allocated in increments of zero. Assuming "
+ minimumResource.getMemorySize() + "MB increment size. "
+ "Please ensure the scheduler configuration is correct.");
step.setMemorySize(minimumResource.getMemorySize());
}
if (stepFactor.getVirtualCores() == 0) {
LOG.error("VCore cannot be allocated in increments of zero. Assuming "
+ minimumResource.getVirtualCores() + "VCores increment size. "
+ "Please ensure the scheduler configuration is correct.");
step.setVirtualCores(minimumResource.getVirtualCores());
}
stepFactor = step;
}
long normalizedMemory = Math.min( long normalizedMemory = Math.min(
roundUp( roundUp(
Math.max(r.getMemorySize(), minimumResource.getMemorySize()), Math.max(r.getMemorySize(), minimumResource.getMemorySize()),

View File

@ -99,22 +99,7 @@ public abstract class ResourceCalculator {
/** /**
* Normalize resource <code>r</code> given the base * Normalize resource <code>r</code> given the base
* <code>minimumResource</code> and verify against max allowed * <code>minimumResource</code> and verify against max allowed
* <code>maximumResource</code> * <code>maximumResource</code> using a step factor for the normalization.
*
* @param r resource
* @param minimumResource step-factor
* @param maximumResource the upper bound of the resource to be allocated
* @return normalized resource
*/
public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource) {
return normalize(r, minimumResource, maximumResource, minimumResource);
}
/**
* Normalize resource <code>r</code> given the base
* <code>minimumResource</code> and verify against max allowed
* <code>maximumResource</code> using a step factor for hte normalization.
* *
* @param r resource * @param r resource
* @param minimumResource minimum value * @param minimumResource minimum value

View File

@ -150,4 +150,83 @@ public class TestResourceCalculator {
Resources.min(resourceCalculator, clusterResource, lhs, rhs)); Resources.min(resourceCalculator, clusterResource, lhs, rhs));
} }
/**
* Test resource normalization.
*/
@Test(timeout = 10000)
public void testNormalize() {
// requested resources value cannot be an arbitrary number.
Resource ask = Resource.newInstance(1111, 2);
Resource min = Resource.newInstance(1024, 1);
Resource max = Resource.newInstance(8 * 1024, 8);
Resource increment = Resource.newInstance(1024, 4);
if (resourceCalculator instanceof DefaultResourceCalculator) {
Resource result = Resources.normalize(resourceCalculator,
ask, min, max, increment);
Assert.assertEquals(2 * 1024, result.getMemorySize());
} else if (resourceCalculator instanceof DominantResourceCalculator) {
Resource result = Resources.normalize(resourceCalculator,
ask, min, max, increment);
Assert.assertEquals(2 * 1024, result.getMemorySize());
Assert.assertEquals(4, result.getVirtualCores());
}
// if resources asked are less than minimum resource, then normalize it to
// minimum resource.
ask = Resource.newInstance(512, 0);
min = Resource.newInstance(2 * 1024, 2);
max = Resource.newInstance(8 * 1024, 8);
increment = Resource.newInstance(1024, 1);
if (resourceCalculator instanceof DefaultResourceCalculator) {
Resource result = Resources.normalize(resourceCalculator,
ask, min, max, increment);
Assert.assertEquals(2 * 1024, result.getMemorySize());
} else if (resourceCalculator instanceof DominantResourceCalculator) {
Resource result = Resources.normalize(resourceCalculator,
ask, min, max, increment);
Assert.assertEquals(2 * 1024, result.getMemorySize());
Assert.assertEquals(2, result.getVirtualCores());
}
// if resources asked are larger than maximum resource, then normalize it to
// maximum resources.
ask = Resource.newInstance(9 * 1024, 9);
min = Resource.newInstance(2 * 1024, 2);
max = Resource.newInstance(8 * 1024, 8);
increment = Resource.newInstance(1024, 1);
if (resourceCalculator instanceof DefaultResourceCalculator) {
Resource result = Resources.normalize(resourceCalculator,
ask, min, max, increment);
Assert.assertEquals(8 * 1024, result.getMemorySize());
} else if (resourceCalculator instanceof DominantResourceCalculator) {
Resource result = Resources.normalize(resourceCalculator,
ask, min, max, increment);
Assert.assertEquals(8 * 1024, result.getMemorySize());
Assert.assertEquals(8, result.getVirtualCores());
}
// if increment is 0, use minimum resource as the increment resource.
ask = Resource.newInstance(1111, 2);
min = Resource.newInstance(2 * 1024, 2);
max = Resource.newInstance(8 * 1024, 8);
increment = Resource.newInstance(0, 0);
if (resourceCalculator instanceof DefaultResourceCalculator) {
Resource result = Resources.normalize(resourceCalculator,
ask, min, max, increment);
Assert.assertEquals(2 * 1024, result.getMemorySize());
} else if (resourceCalculator instanceof DominantResourceCalculator) {
Resource result = Resources.normalize(resourceCalculator,
ask, min, max, increment);
Assert.assertEquals(2 * 1024, result.getMemorySize());
Assert.assertEquals(2, result.getVirtualCores());
}
}
} }

View File

@ -452,11 +452,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
throw e; throw e;
} }
SchedulerUtils.normalizeRequest(amReq, scheduler.getResourceCalculator(), scheduler.normalizeRequest(amReq);
scheduler.getClusterResource(),
scheduler.getMinimumResourceCapability(),
scheduler.getMaximumResourceCapability(),
scheduler.getMinimumResourceCapability());
return amReq; return amReq;
} }

View File

@ -73,7 +73,6 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
/** /**
@ -304,13 +303,7 @@ public class RMServerUtils {
return false; return false;
} }
ResourceScheduler scheduler = rmContext.getScheduler(); ResourceScheduler scheduler = rmContext.getScheduler();
ResourceCalculator rc = scheduler.getResourceCalculator(); scheduler.normalizeRequest(request);
Resource targetResource = Resources.normalize(rc, request.getCapability(),
scheduler.getMinimumResourceCapability(),
scheduler.getMaximumResourceCapability(),
scheduler.getMinimumResourceCapability());
// Update normalized target resource
request.setCapability(targetResource);
return true; return true;
} }

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -1021,4 +1022,23 @@ public abstract class AbstractYarnScheduler
} }
} }
@Override
public void normalizeRequest(AbstractResourceRequest ask) {
SchedulerUtils.normalizeRequest(ask,
getResourceCalculator(),
getMinimumResourceCapability(),
getMaximumResourceCapability(),
getMinimumResourceCapability());
}
/**
* Normalize a list of resource requests.
*
* @param asks resource requests
*/
protected void normalizeRequests(List<ResourceRequest> asks) {
for (ResourceRequest ask: asks) {
normalizeRequest(ask);
}
}
} }

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -33,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
@ -123,23 +123,6 @@ public class SchedulerUtils {
return containerStatus; return containerStatus;
} }
/**
* Utility method to normalize a list of resource requests, by insuring that
* the memory for each request is a multiple of minMemory and is not zero.
*/
public static void normalizeRequests(
List<ResourceRequest> asks,
ResourceCalculator resourceCalculator,
Resource clusterResource,
Resource minimumResource,
Resource maximumResource) {
for (ResourceRequest ask : asks) {
normalizeRequest(
ask, resourceCalculator, clusterResource, minimumResource,
maximumResource, minimumResource);
}
}
/** /**
* Utility method to normalize a resource request, by insuring that the * Utility method to normalize a resource request, by insuring that the
* requested memory is a multiple of minMemory and is not zero. * requested memory is a multiple of minMemory and is not zero.
@ -147,47 +130,23 @@ public class SchedulerUtils {
public static void normalizeRequest( public static void normalizeRequest(
ResourceRequest ask, ResourceRequest ask,
ResourceCalculator resourceCalculator, ResourceCalculator resourceCalculator,
Resource clusterResource,
Resource minimumResource, Resource minimumResource,
Resource maximumResource) { Resource maximumResource) {
Resource normalized = normalizeRequest(ask, resourceCalculator,
Resources.normalize( minimumResource, maximumResource, minimumResource);
resourceCalculator, ask.getCapability(), minimumResource,
maximumResource, minimumResource);
ask.setCapability(normalized);
}
/**
* Utility method to normalize a list of resource requests, by insuring that
* the memory for each request is a multiple of minMemory and is not zero.
*/
public static void normalizeRequests(
List<ResourceRequest> asks,
ResourceCalculator resourceCalculator,
Resource clusterResource,
Resource minimumResource,
Resource maximumResource,
Resource incrementResource) {
for (ResourceRequest ask : asks) {
normalizeRequest(
ask, resourceCalculator, clusterResource, minimumResource,
maximumResource, incrementResource);
}
} }
/** /**
* Utility method to normalize a resource request, by insuring that the * Utility method to normalize a resource request, by insuring that the
* requested memory is a multiple of minMemory and is not zero. * requested memory is a multiple of increment resource and is not zero.
*/ */
public static void normalizeRequest( public static void normalizeRequest(
ResourceRequest ask, AbstractResourceRequest ask,
ResourceCalculator resourceCalculator, ResourceCalculator resourceCalculator,
Resource clusterResource,
Resource minimumResource, Resource minimumResource,
Resource maximumResource, Resource maximumResource,
Resource incrementResource) { Resource incrementResource) {
Resource normalized = Resource normalized = Resources.normalize(
Resources.normalize(
resourceCalculator, ask.getCapability(), minimumResource, resourceCalculator, ask.getCapability(), minimumResource,
maximumResource, incrementResource); maximumResource, incrementResource);
ask.setCapability(normalized); ask.setCapability(normalized);

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -368,4 +369,11 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* @return SchedulerNode corresponds to nodeId * @return SchedulerNode corresponds to nodeId
*/ */
SchedulerNode getSchedulerNode(NodeId nodeId); SchedulerNode getSchedulerNode(NodeId nodeId);
/**
* Normalize a resource request.
*
* @param request the resource request to be normalized
*/
void normalizeRequest(AbstractResourceRequest request);
} }

View File

@ -1120,9 +1120,7 @@ public class CapacityScheduler extends
decreaseContainers(decreaseRequests, application); decreaseContainers(decreaseRequests, application);
// Sanity check for new allocation requests // Sanity check for new allocation requests
SchedulerUtils.normalizeRequests(ask, getResourceCalculator(), normalizeRequests(ask);
getClusterResource(), getMinimumResourceCapability(),
getMaximumResourceCapability());
Allocation allocation; Allocation allocation;

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -222,12 +223,12 @@ public class FairScheduler extends
getClusterResource(), resource, reservationThreshold); getClusterResource(), resource, reservationThreshold);
} }
private void validateConf(Configuration conf) { private void validateConf(FairSchedulerConfiguration config) {
// validate scheduler memory allocation setting // validate scheduler memory allocation setting
int minMem = conf.getInt( int minMem = config.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int maxMem = conf.getInt( int maxMem = config.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
@ -242,11 +243,19 @@ public class FairScheduler extends
+ "the minimum allocation value."); + "the minimum allocation value.");
} }
long incrementMem = config.getIncrementAllocation().getMemorySize();
if (incrementMem <= 0) {
throw new YarnRuntimeException("Invalid resource scheduler memory"
+ " allocation configuration: "
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB
+ "=" + incrementMem + ". Values must be greater than 0.");
}
// validate scheduler vcores allocation setting // validate scheduler vcores allocation setting
int minVcores = conf.getInt( int minVcores = config.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
int maxVcores = conf.getInt( int maxVcores = config.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
@ -260,6 +269,14 @@ public class FairScheduler extends
+ "and the maximum allocation value must be greater than or equal to" + "and the maximum allocation value must be greater than or equal to"
+ "the minimum allocation value."); + "the minimum allocation value.");
} }
int incrementVcore = config.getIncrementAllocation().getVirtualCores();
if (incrementVcore <= 0) {
throw new YarnRuntimeException("Invalid resource scheduler vcores"
+ " allocation configuration: "
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES
+ "=" + incrementVcore + ". Values must be greater than 0.");
}
} }
public FairSchedulerConfiguration getConf() { public FairSchedulerConfiguration getConf() {
@ -963,6 +980,15 @@ public class FairScheduler extends
} }
} }
@Override
public void normalizeRequest(AbstractResourceRequest ask) {
SchedulerUtils.normalizeRequest(ask,
DOMINANT_RESOURCE_CALCULATOR,
minimumAllocation,
getMaximumResourceCapability(),
incrAllocation);
}
@Override @Override
public Allocation allocate(ApplicationAttemptId appAttemptId, public Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<ContainerId> release, List<ResourceRequest> ask, List<ContainerId> release,
@ -979,9 +1005,7 @@ public class FairScheduler extends
} }
// Sanity check // Sanity check
SchedulerUtils.normalizeRequests(ask, DOMINANT_RESOURCE_CALCULATOR, normalizeRequests(ask);
getClusterResource(), minimumAllocation, getMaximumResourceCapability(),
incrAllocation);
// Record container allocation start time // Record container allocation start time
application.recordContainerRequestTime(getClock().getTime()); application.recordContainerRequestTime(getClock().getTime());

View File

@ -335,9 +335,7 @@ public class FifoScheduler extends
} }
// Sanity check // Sanity check
SchedulerUtils.normalizeRequests(ask, resourceCalculator, normalizeRequests(ask);
getClusterResource(), minimumAllocation,
getMaximumResourceCapability());
// Release containers // Release containers
releaseContainers(release, application); releaseContainers(release, application);

View File

@ -112,37 +112,37 @@ public class TestSchedulerUtils {
// case negative memory // case negative memory
ask.setCapability(Resources.createResource(-1024)); ask.setCapability(Resources.createResource(-1024));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
maxResource); maxResource);
assertEquals(minMemory, ask.getCapability().getMemorySize()); assertEquals(minMemory, ask.getCapability().getMemorySize());
// case zero memory // case zero memory
ask.setCapability(Resources.createResource(0)); ask.setCapability(Resources.createResource(0));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
maxResource); maxResource);
assertEquals(minMemory, ask.getCapability().getMemorySize()); assertEquals(minMemory, ask.getCapability().getMemorySize());
// case memory is a multiple of minMemory // case memory is a multiple of minMemory
ask.setCapability(Resources.createResource(2 * minMemory)); ask.setCapability(Resources.createResource(2 * minMemory));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
maxResource); maxResource);
assertEquals(2 * minMemory, ask.getCapability().getMemorySize()); assertEquals(2 * minMemory, ask.getCapability().getMemorySize());
// case memory is not a multiple of minMemory // case memory is not a multiple of minMemory
ask.setCapability(Resources.createResource(minMemory + 10)); ask.setCapability(Resources.createResource(minMemory + 10));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
maxResource); maxResource);
assertEquals(2 * minMemory, ask.getCapability().getMemorySize()); assertEquals(2 * minMemory, ask.getCapability().getMemorySize());
// case memory is equal to max allowed // case memory is equal to max allowed
ask.setCapability(Resources.createResource(maxMemory)); ask.setCapability(Resources.createResource(maxMemory));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
maxResource); maxResource);
assertEquals(maxMemory, ask.getCapability().getMemorySize()); assertEquals(maxMemory, ask.getCapability().getMemorySize());
// case memory is just less than max // case memory is just less than max
ask.setCapability(Resources.createResource(maxMemory - 10)); ask.setCapability(Resources.createResource(maxMemory - 10));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
maxResource); maxResource);
assertEquals(maxMemory, ask.getCapability().getMemorySize()); assertEquals(maxMemory, ask.getCapability().getMemorySize());
@ -150,14 +150,14 @@ public class TestSchedulerUtils {
maxResource = Resources.createResource(maxMemory - 10, 0); maxResource = Resources.createResource(maxMemory - 10, 0);
ask.setCapability(Resources.createResource(maxMemory - 100)); ask.setCapability(Resources.createResource(maxMemory - 100));
// multiple of minMemory > maxMemory, then reduce to maxMemory // multiple of minMemory > maxMemory, then reduce to maxMemory
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
maxResource); maxResource);
assertEquals(maxResource.getMemorySize(), ask.getCapability().getMemorySize()); assertEquals(maxResource.getMemorySize(), ask.getCapability().getMemorySize());
// ask is more than max // ask is more than max
maxResource = Resources.createResource(maxMemory, 0); maxResource = Resources.createResource(maxMemory, 0);
ask.setCapability(Resources.createResource(maxMemory + 100)); ask.setCapability(Resources.createResource(maxMemory + 100));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource, SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
maxResource); maxResource);
assertEquals(maxResource.getMemorySize(), ask.getCapability().getMemorySize()); assertEquals(maxResource.getMemorySize(), ask.getCapability().getMemorySize());
} }
@ -175,13 +175,13 @@ public class TestSchedulerUtils {
// case negative memory/vcores // case negative memory/vcores
ask.setCapability(Resources.createResource(-1024, -1)); ask.setCapability(Resources.createResource(-1024, -1));
SchedulerUtils.normalizeRequest( SchedulerUtils.normalizeRequest(
ask, resourceCalculator, clusterResource, minResource, maxResource); ask, resourceCalculator, minResource, maxResource);
assertEquals(minResource, ask.getCapability()); assertEquals(minResource, ask.getCapability());
// case zero memory/vcores // case zero memory/vcores
ask.setCapability(Resources.createResource(0, 0)); ask.setCapability(Resources.createResource(0, 0));
SchedulerUtils.normalizeRequest( SchedulerUtils.normalizeRequest(
ask, resourceCalculator, clusterResource, minResource, maxResource); ask, resourceCalculator, minResource, maxResource);
assertEquals(minResource, ask.getCapability()); assertEquals(minResource, ask.getCapability());
assertEquals(1, ask.getCapability().getVirtualCores()); assertEquals(1, ask.getCapability().getVirtualCores());
assertEquals(1024, ask.getCapability().getMemorySize()); assertEquals(1024, ask.getCapability().getMemorySize());
@ -189,7 +189,7 @@ public class TestSchedulerUtils {
// case non-zero memory & zero cores // case non-zero memory & zero cores
ask.setCapability(Resources.createResource(1536, 0)); ask.setCapability(Resources.createResource(1536, 0));
SchedulerUtils.normalizeRequest( SchedulerUtils.normalizeRequest(
ask, resourceCalculator, clusterResource, minResource, maxResource); ask, resourceCalculator, minResource, maxResource);
assertEquals(Resources.createResource(2048, 1), ask.getCapability()); assertEquals(Resources.createResource(2048, 1), ask.getCapability());
assertEquals(1, ask.getCapability().getVirtualCores()); assertEquals(1, ask.getCapability().getVirtualCores());
assertEquals(2048, ask.getCapability().getMemorySize()); assertEquals(2048, ask.getCapability().getMemorySize());