YARN-5774. MR Job stuck in ACCEPTED status without any progress in Fair Scheduler
if set yarn.scheduler.minimum-allocation-mb to 0. (Contributed by Yufei Gu via Daniel Templeton)
This commit is contained in:
parent
00096dcc0c
commit
25f9872be6
|
@ -0,0 +1,55 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
|
||||
/**
|
||||
* {@code AbstractResourceRequest} represents a generic resource request made
|
||||
* by an application to the {@code ResourceManager}.
|
||||
* <p>
|
||||
* It includes:
|
||||
* <ul>
|
||||
* <li>{@link Resource} capability required for each request.</li>
|
||||
* </ul>
|
||||
*
|
||||
* @see Resource
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract class AbstractResourceRequest {
|
||||
|
||||
/**
|
||||
* Set the <code>Resource</code> capability of the request
|
||||
* @param capability <code>Resource</code> capability of the request
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract void setCapability(Resource capability);
|
||||
|
||||
/**
|
||||
* Get the <code>Resource</code> capability of the request.
|
||||
* @return <code>Resource</code> capability of the request
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract Resource getCapability();
|
||||
}
|
|
@ -58,7 +58,8 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract class ResourceRequest implements Comparable<ResourceRequest> {
|
||||
public abstract class ResourceRequest extends AbstractResourceRequest
|
||||
implements Comparable<ResourceRequest>{
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
|
@ -345,22 +346,6 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
|
|||
@Stable
|
||||
public abstract void setResourceName(String resourceName);
|
||||
|
||||
/**
|
||||
* Get the <code>Resource</code> capability of the request.
|
||||
* @return <code>Resource</code> capability of the request
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract Resource getCapability();
|
||||
|
||||
/**
|
||||
* Set the <code>Resource</code> capability of the request
|
||||
* @param capability <code>Resource</code> capability of the request
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract void setCapability(Resource capability);
|
||||
|
||||
/**
|
||||
* Get the number of containers required with the given specifications.
|
||||
* @return number of containers required with the given specifications
|
||||
|
|
|
@ -62,7 +62,7 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public abstract class UpdateContainerRequest {
|
||||
public abstract class UpdateContainerRequest extends AbstractResourceRequest {
|
||||
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
|
@ -127,22 +127,6 @@ public abstract class UpdateContainerRequest {
|
|||
@InterfaceStability.Unstable
|
||||
public abstract void setContainerId(ContainerId containerId);
|
||||
|
||||
/**
|
||||
* Get the <code>Resource</code> capability of the container.
|
||||
* @return <code>Resource</code> capability of the container
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public abstract Resource getCapability();
|
||||
|
||||
/**
|
||||
* Set the <code>Resource</code> capability of the container.
|
||||
* @param capability <code>Resource</code> capability of the container
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public abstract void setCapability(Resource capability);
|
||||
|
||||
/**
|
||||
* Get the target <code>ExecutionType</code> of the container.
|
||||
* @return <code>ExecutionType</code> of the container
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.util.resource;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -24,7 +26,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
@Private
|
||||
@Unstable
|
||||
public class DefaultResourceCalculator extends ResourceCalculator {
|
||||
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(DefaultResourceCalculator.class);
|
||||
|
||||
@Override
|
||||
public int compare(Resource unused, Resource lhs, Resource rhs) {
|
||||
// Only consider memory
|
||||
|
@ -64,6 +68,13 @@ public class DefaultResourceCalculator extends ResourceCalculator {
|
|||
@Override
|
||||
public Resource normalize(Resource r, Resource minimumResource,
|
||||
Resource maximumResource, Resource stepFactor) {
|
||||
if (stepFactor.getMemorySize() == 0) {
|
||||
LOG.error("Memory cannot be allocated in increments of zero. Assuming " +
|
||||
minimumResource.getMemorySize() + "MB increment size. "
|
||||
+ "Please ensure the scheduler configuration is correct.");
|
||||
stepFactor = minimumResource;
|
||||
}
|
||||
|
||||
long normalizedMemory = Math.min(
|
||||
roundUp(
|
||||
Math.max(r.getMemorySize(), minimumResource.getMemorySize()),
|
||||
|
@ -72,12 +83,6 @@ public class DefaultResourceCalculator extends ResourceCalculator {
|
|||
return Resources.createResource(normalizedMemory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource normalize(Resource r, Resource minimumResource,
|
||||
Resource maximumResource) {
|
||||
return normalize(r, minimumResource, maximumResource, minimumResource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource roundUp(Resource r, Resource stepFactor) {
|
||||
return Resources.createResource(
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.util.resource;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -45,7 +47,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
@Private
|
||||
@Unstable
|
||||
public class DominantResourceCalculator extends ResourceCalculator {
|
||||
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(DominantResourceCalculator.class);
|
||||
|
||||
@Override
|
||||
public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
|
||||
|
||||
|
@ -152,6 +156,25 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
@Override
|
||||
public Resource normalize(Resource r, Resource minimumResource,
|
||||
Resource maximumResource, Resource stepFactor) {
|
||||
if (stepFactor.getMemorySize() == 0 || stepFactor.getVirtualCores() == 0) {
|
||||
Resource step = Resources.clone(stepFactor);
|
||||
if (stepFactor.getMemorySize() == 0) {
|
||||
LOG.error("Memory cannot be allocated in increments of zero. Assuming "
|
||||
+ minimumResource.getMemorySize() + "MB increment size. "
|
||||
+ "Please ensure the scheduler configuration is correct.");
|
||||
step.setMemorySize(minimumResource.getMemorySize());
|
||||
}
|
||||
|
||||
if (stepFactor.getVirtualCores() == 0) {
|
||||
LOG.error("VCore cannot be allocated in increments of zero. Assuming "
|
||||
+ minimumResource.getVirtualCores() + "VCores increment size. "
|
||||
+ "Please ensure the scheduler configuration is correct.");
|
||||
step.setVirtualCores(minimumResource.getVirtualCores());
|
||||
}
|
||||
|
||||
stepFactor = step;
|
||||
}
|
||||
|
||||
long normalizedMemory = Math.min(
|
||||
roundUp(
|
||||
Math.max(r.getMemorySize(), minimumResource.getMemorySize()),
|
||||
|
|
|
@ -99,22 +99,7 @@ public abstract class ResourceCalculator {
|
|||
/**
|
||||
* Normalize resource <code>r</code> given the base
|
||||
* <code>minimumResource</code> and verify against max allowed
|
||||
* <code>maximumResource</code>
|
||||
*
|
||||
* @param r resource
|
||||
* @param minimumResource step-factor
|
||||
* @param maximumResource the upper bound of the resource to be allocated
|
||||
* @return normalized resource
|
||||
*/
|
||||
public Resource normalize(Resource r, Resource minimumResource,
|
||||
Resource maximumResource) {
|
||||
return normalize(r, minimumResource, maximumResource, minimumResource);
|
||||
}
|
||||
|
||||
/**
|
||||
* Normalize resource <code>r</code> given the base
|
||||
* <code>minimumResource</code> and verify against max allowed
|
||||
* <code>maximumResource</code> using a step factor for hte normalization.
|
||||
* <code>maximumResource</code> using a step factor for the normalization.
|
||||
*
|
||||
* @param r resource
|
||||
* @param minimumResource minimum value
|
||||
|
|
|
@ -150,4 +150,83 @@ public class TestResourceCalculator {
|
|||
Resources.min(resourceCalculator, clusterResource, lhs, rhs));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test resource normalization.
|
||||
*/
|
||||
@Test(timeout = 10000)
|
||||
public void testNormalize() {
|
||||
// requested resources value cannot be an arbitrary number.
|
||||
Resource ask = Resource.newInstance(1111, 2);
|
||||
Resource min = Resource.newInstance(1024, 1);
|
||||
Resource max = Resource.newInstance(8 * 1024, 8);
|
||||
Resource increment = Resource.newInstance(1024, 4);
|
||||
if (resourceCalculator instanceof DefaultResourceCalculator) {
|
||||
Resource result = Resources.normalize(resourceCalculator,
|
||||
ask, min, max, increment);
|
||||
|
||||
Assert.assertEquals(2 * 1024, result.getMemorySize());
|
||||
} else if (resourceCalculator instanceof DominantResourceCalculator) {
|
||||
Resource result = Resources.normalize(resourceCalculator,
|
||||
ask, min, max, increment);
|
||||
|
||||
Assert.assertEquals(2 * 1024, result.getMemorySize());
|
||||
Assert.assertEquals(4, result.getVirtualCores());
|
||||
}
|
||||
|
||||
// if resources asked are less than minimum resource, then normalize it to
|
||||
// minimum resource.
|
||||
ask = Resource.newInstance(512, 0);
|
||||
min = Resource.newInstance(2 * 1024, 2);
|
||||
max = Resource.newInstance(8 * 1024, 8);
|
||||
increment = Resource.newInstance(1024, 1);
|
||||
if (resourceCalculator instanceof DefaultResourceCalculator) {
|
||||
Resource result = Resources.normalize(resourceCalculator,
|
||||
ask, min, max, increment);
|
||||
|
||||
Assert.assertEquals(2 * 1024, result.getMemorySize());
|
||||
} else if (resourceCalculator instanceof DominantResourceCalculator) {
|
||||
Resource result = Resources.normalize(resourceCalculator,
|
||||
ask, min, max, increment);
|
||||
|
||||
Assert.assertEquals(2 * 1024, result.getMemorySize());
|
||||
Assert.assertEquals(2, result.getVirtualCores());
|
||||
}
|
||||
|
||||
// if resources asked are larger than maximum resource, then normalize it to
|
||||
// maximum resources.
|
||||
ask = Resource.newInstance(9 * 1024, 9);
|
||||
min = Resource.newInstance(2 * 1024, 2);
|
||||
max = Resource.newInstance(8 * 1024, 8);
|
||||
increment = Resource.newInstance(1024, 1);
|
||||
if (resourceCalculator instanceof DefaultResourceCalculator) {
|
||||
Resource result = Resources.normalize(resourceCalculator,
|
||||
ask, min, max, increment);
|
||||
|
||||
Assert.assertEquals(8 * 1024, result.getMemorySize());
|
||||
} else if (resourceCalculator instanceof DominantResourceCalculator) {
|
||||
Resource result = Resources.normalize(resourceCalculator,
|
||||
ask, min, max, increment);
|
||||
|
||||
Assert.assertEquals(8 * 1024, result.getMemorySize());
|
||||
Assert.assertEquals(8, result.getVirtualCores());
|
||||
}
|
||||
|
||||
// if increment is 0, use minimum resource as the increment resource.
|
||||
ask = Resource.newInstance(1111, 2);
|
||||
min = Resource.newInstance(2 * 1024, 2);
|
||||
max = Resource.newInstance(8 * 1024, 8);
|
||||
increment = Resource.newInstance(0, 0);
|
||||
if (resourceCalculator instanceof DefaultResourceCalculator) {
|
||||
Resource result = Resources.normalize(resourceCalculator,
|
||||
ask, min, max, increment);
|
||||
|
||||
Assert.assertEquals(2 * 1024, result.getMemorySize());
|
||||
} else if (resourceCalculator instanceof DominantResourceCalculator) {
|
||||
Resource result = Resources.normalize(resourceCalculator,
|
||||
ask, min, max, increment);
|
||||
|
||||
Assert.assertEquals(2 * 1024, result.getMemorySize());
|
||||
Assert.assertEquals(2, result.getVirtualCores());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -460,11 +460,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
throw e;
|
||||
}
|
||||
|
||||
SchedulerUtils.normalizeRequest(amReq, scheduler.getResourceCalculator(),
|
||||
scheduler.getClusterResource(),
|
||||
scheduler.getMinimumResourceCapability(),
|
||||
scheduler.getMaximumResourceCapability(),
|
||||
scheduler.getMinimumResourceCapability());
|
||||
scheduler.normalizeRequest(amReq);
|
||||
return amReq;
|
||||
}
|
||||
|
||||
|
|
|
@ -73,7 +73,6 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
/**
|
||||
|
@ -304,13 +303,7 @@ public class RMServerUtils {
|
|||
return false;
|
||||
}
|
||||
ResourceScheduler scheduler = rmContext.getScheduler();
|
||||
ResourceCalculator rc = scheduler.getResourceCalculator();
|
||||
Resource targetResource = Resources.normalize(rc, request.getCapability(),
|
||||
scheduler.getMinimumResourceCapability(),
|
||||
scheduler.getMaximumResourceCapability(),
|
||||
scheduler.getMinimumResourceCapability());
|
||||
// Update normalized target resource
|
||||
request.setCapability(targetResource);
|
||||
scheduler.normalizeRequest(request);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -1024,4 +1025,23 @@ public abstract class AbstractYarnScheduler
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void normalizeRequest(AbstractResourceRequest ask) {
|
||||
SchedulerUtils.normalizeRequest(ask,
|
||||
getResourceCalculator(),
|
||||
getMinimumResourceCapability(),
|
||||
getMaximumResourceCapability(),
|
||||
getMinimumResourceCapability());
|
||||
}
|
||||
|
||||
/**
|
||||
* Normalize a list of resource requests.
|
||||
*
|
||||
* @param asks resource requests
|
||||
*/
|
||||
protected void normalizeRequests(List<ResourceRequest> asks) {
|
||||
for (ResourceRequest ask: asks) {
|
||||
normalizeRequest(ask);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
@ -33,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
|
|||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
|
@ -123,23 +123,6 @@ public class SchedulerUtils {
|
|||
return containerStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to normalize a list of resource requests, by insuring that
|
||||
* the memory for each request is a multiple of minMemory and is not zero.
|
||||
*/
|
||||
public static void normalizeRequests(
|
||||
List<ResourceRequest> asks,
|
||||
ResourceCalculator resourceCalculator,
|
||||
Resource clusterResource,
|
||||
Resource minimumResource,
|
||||
Resource maximumResource) {
|
||||
for (ResourceRequest ask : asks) {
|
||||
normalizeRequest(
|
||||
ask, resourceCalculator, clusterResource, minimumResource,
|
||||
maximumResource, minimumResource);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to normalize a resource request, by insuring that the
|
||||
* requested memory is a multiple of minMemory and is not zero.
|
||||
|
@ -147,49 +130,25 @@ public class SchedulerUtils {
|
|||
public static void normalizeRequest(
|
||||
ResourceRequest ask,
|
||||
ResourceCalculator resourceCalculator,
|
||||
Resource clusterResource,
|
||||
Resource minimumResource,
|
||||
Resource maximumResource) {
|
||||
Resource normalized =
|
||||
Resources.normalize(
|
||||
resourceCalculator, ask.getCapability(), minimumResource,
|
||||
maximumResource, minimumResource);
|
||||
ask.setCapability(normalized);
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to normalize a list of resource requests, by insuring that
|
||||
* the memory for each request is a multiple of minMemory and is not zero.
|
||||
*/
|
||||
public static void normalizeRequests(
|
||||
List<ResourceRequest> asks,
|
||||
ResourceCalculator resourceCalculator,
|
||||
Resource clusterResource,
|
||||
Resource minimumResource,
|
||||
Resource maximumResource,
|
||||
Resource incrementResource) {
|
||||
for (ResourceRequest ask : asks) {
|
||||
normalizeRequest(
|
||||
ask, resourceCalculator, clusterResource, minimumResource,
|
||||
maximumResource, incrementResource);
|
||||
}
|
||||
normalizeRequest(ask, resourceCalculator,
|
||||
minimumResource, maximumResource, minimumResource);
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to normalize a resource request, by insuring that the
|
||||
* requested memory is a multiple of minMemory and is not zero.
|
||||
* requested memory is a multiple of increment resource and is not zero.
|
||||
*/
|
||||
public static void normalizeRequest(
|
||||
ResourceRequest ask,
|
||||
ResourceCalculator resourceCalculator,
|
||||
Resource clusterResource,
|
||||
AbstractResourceRequest ask,
|
||||
ResourceCalculator resourceCalculator,
|
||||
Resource minimumResource,
|
||||
Resource maximumResource,
|
||||
Resource incrementResource) {
|
||||
Resource normalized =
|
||||
Resources.normalize(
|
||||
resourceCalculator, ask.getCapability(), minimumResource,
|
||||
maximumResource, incrementResource);
|
||||
Resource normalized = Resources.normalize(
|
||||
resourceCalculator, ask.getCapability(), minimumResource,
|
||||
maximumResource, incrementResource);
|
||||
ask.setCapability(normalized);
|
||||
}
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -368,4 +369,11 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
|||
* @return SchedulerNode corresponds to nodeId
|
||||
*/
|
||||
SchedulerNode getSchedulerNode(NodeId nodeId);
|
||||
|
||||
/**
|
||||
* Normalize a resource request.
|
||||
*
|
||||
* @param request the resource request to be normalized
|
||||
*/
|
||||
void normalizeRequest(AbstractResourceRequest request);
|
||||
}
|
||||
|
|
|
@ -1119,9 +1119,7 @@ public class CapacityScheduler extends
|
|||
decreaseContainers(decreaseRequests, application);
|
||||
|
||||
// Sanity check for new allocation requests
|
||||
SchedulerUtils.normalizeRequests(ask, getResourceCalculator(),
|
||||
getClusterResource(), getMinimumResourceCapability(),
|
||||
getMaximumResourceCapability());
|
||||
normalizeRequests(ask);
|
||||
|
||||
Allocation allocation;
|
||||
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
@ -212,12 +213,12 @@ public class FairScheduler extends
|
|||
getClusterResource(), resource, reservationThreshold);
|
||||
}
|
||||
|
||||
private void validateConf(Configuration conf) {
|
||||
private void validateConf(FairSchedulerConfiguration config) {
|
||||
// validate scheduler memory allocation setting
|
||||
int minMem = conf.getInt(
|
||||
int minMem = config.getInt(
|
||||
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
|
||||
int maxMem = conf.getInt(
|
||||
int maxMem = config.getInt(
|
||||
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||
|
||||
|
@ -232,11 +233,19 @@ public class FairScheduler extends
|
|||
+ "the minimum allocation value.");
|
||||
}
|
||||
|
||||
long incrementMem = config.getIncrementAllocation().getMemorySize();
|
||||
if (incrementMem <= 0) {
|
||||
throw new YarnRuntimeException("Invalid resource scheduler memory"
|
||||
+ " allocation configuration: "
|
||||
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB
|
||||
+ "=" + incrementMem + ". Values must be greater than 0.");
|
||||
}
|
||||
|
||||
// validate scheduler vcores allocation setting
|
||||
int minVcores = conf.getInt(
|
||||
int minVcores = config.getInt(
|
||||
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
||||
int maxVcores = conf.getInt(
|
||||
int maxVcores = config.getInt(
|
||||
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||
|
||||
|
@ -250,6 +259,14 @@ public class FairScheduler extends
|
|||
+ "and the maximum allocation value must be greater than or equal to"
|
||||
+ "the minimum allocation value.");
|
||||
}
|
||||
|
||||
int incrementVcore = config.getIncrementAllocation().getVirtualCores();
|
||||
if (incrementVcore <= 0) {
|
||||
throw new YarnRuntimeException("Invalid resource scheduler vcores"
|
||||
+ " allocation configuration: "
|
||||
+ FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES
|
||||
+ "=" + incrementVcore + ". Values must be greater than 0.");
|
||||
}
|
||||
}
|
||||
|
||||
public FairSchedulerConfiguration getConf() {
|
||||
|
@ -767,6 +784,15 @@ public class FairScheduler extends
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void normalizeRequest(AbstractResourceRequest ask) {
|
||||
SchedulerUtils.normalizeRequest(ask,
|
||||
DOMINANT_RESOURCE_CALCULATOR,
|
||||
minimumAllocation,
|
||||
getMaximumResourceCapability(),
|
||||
incrAllocation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Allocation allocate(ApplicationAttemptId appAttemptId,
|
||||
List<ResourceRequest> ask, List<ContainerId> release,
|
||||
|
@ -783,9 +809,7 @@ public class FairScheduler extends
|
|||
}
|
||||
|
||||
// Sanity check
|
||||
SchedulerUtils.normalizeRequests(ask, DOMINANT_RESOURCE_CALCULATOR,
|
||||
getClusterResource(), minimumAllocation, getMaximumResourceCapability(),
|
||||
incrAllocation);
|
||||
normalizeRequests(ask);
|
||||
|
||||
// Record container allocation start time
|
||||
application.recordContainerRequestTime(getClock().getTime());
|
||||
|
|
|
@ -335,9 +335,7 @@ public class FifoScheduler extends
|
|||
}
|
||||
|
||||
// Sanity check
|
||||
SchedulerUtils.normalizeRequests(ask, resourceCalculator,
|
||||
getClusterResource(), minimumAllocation,
|
||||
getMaximumResourceCapability());
|
||||
normalizeRequests(ask);
|
||||
|
||||
// Release containers
|
||||
releaseContainers(release, application);
|
||||
|
|
|
@ -112,37 +112,37 @@ public class TestSchedulerUtils {
|
|||
|
||||
// case negative memory
|
||||
ask.setCapability(Resources.createResource(-1024));
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
|
||||
maxResource);
|
||||
assertEquals(minMemory, ask.getCapability().getMemorySize());
|
||||
|
||||
// case zero memory
|
||||
ask.setCapability(Resources.createResource(0));
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
|
||||
maxResource);
|
||||
assertEquals(minMemory, ask.getCapability().getMemorySize());
|
||||
|
||||
// case memory is a multiple of minMemory
|
||||
ask.setCapability(Resources.createResource(2 * minMemory));
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
|
||||
maxResource);
|
||||
assertEquals(2 * minMemory, ask.getCapability().getMemorySize());
|
||||
|
||||
// case memory is not a multiple of minMemory
|
||||
ask.setCapability(Resources.createResource(minMemory + 10));
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
|
||||
maxResource);
|
||||
assertEquals(2 * minMemory, ask.getCapability().getMemorySize());
|
||||
|
||||
// case memory is equal to max allowed
|
||||
ask.setCapability(Resources.createResource(maxMemory));
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
|
||||
maxResource);
|
||||
assertEquals(maxMemory, ask.getCapability().getMemorySize());
|
||||
|
||||
// case memory is just less than max
|
||||
ask.setCapability(Resources.createResource(maxMemory - 10));
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
|
||||
maxResource);
|
||||
assertEquals(maxMemory, ask.getCapability().getMemorySize());
|
||||
|
||||
|
@ -150,14 +150,14 @@ public class TestSchedulerUtils {
|
|||
maxResource = Resources.createResource(maxMemory - 10, 0);
|
||||
ask.setCapability(Resources.createResource(maxMemory - 100));
|
||||
// multiple of minMemory > maxMemory, then reduce to maxMemory
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
|
||||
maxResource);
|
||||
assertEquals(maxResource.getMemorySize(), ask.getCapability().getMemorySize());
|
||||
|
||||
// ask is more than max
|
||||
maxResource = Resources.createResource(maxMemory, 0);
|
||||
ask.setCapability(Resources.createResource(maxMemory + 100));
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
|
||||
SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
|
||||
maxResource);
|
||||
assertEquals(maxResource.getMemorySize(), ask.getCapability().getMemorySize());
|
||||
}
|
||||
|
@ -175,13 +175,13 @@ public class TestSchedulerUtils {
|
|||
// case negative memory/vcores
|
||||
ask.setCapability(Resources.createResource(-1024, -1));
|
||||
SchedulerUtils.normalizeRequest(
|
||||
ask, resourceCalculator, clusterResource, minResource, maxResource);
|
||||
ask, resourceCalculator, minResource, maxResource);
|
||||
assertEquals(minResource, ask.getCapability());
|
||||
|
||||
// case zero memory/vcores
|
||||
ask.setCapability(Resources.createResource(0, 0));
|
||||
SchedulerUtils.normalizeRequest(
|
||||
ask, resourceCalculator, clusterResource, minResource, maxResource);
|
||||
ask, resourceCalculator, minResource, maxResource);
|
||||
assertEquals(minResource, ask.getCapability());
|
||||
assertEquals(1, ask.getCapability().getVirtualCores());
|
||||
assertEquals(1024, ask.getCapability().getMemorySize());
|
||||
|
@ -189,7 +189,7 @@ public class TestSchedulerUtils {
|
|||
// case non-zero memory & zero cores
|
||||
ask.setCapability(Resources.createResource(1536, 0));
|
||||
SchedulerUtils.normalizeRequest(
|
||||
ask, resourceCalculator, clusterResource, minResource, maxResource);
|
||||
ask, resourceCalculator, minResource, maxResource);
|
||||
assertEquals(Resources.createResource(2048, 1), ask.getCapability());
|
||||
assertEquals(1, ask.getCapability().getVirtualCores());
|
||||
assertEquals(2048, ask.getCapability().getMemorySize());
|
||||
|
|
Loading…
Reference in New Issue