Merge -c 1465067 from trunk to branch-2 for YARN-193. Scheduler.normalizeRequest does not account for allocation requests that exceed maximumAllocation limits (Zhijie Shen via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1465069 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-04-05 17:48:35 +00:00
parent 003ecf8137
commit e97e0eaec7
21 changed files with 450 additions and 44 deletions

View File

@ -65,6 +65,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-458. YARN daemon addresses must be placed in many different configs.
(sandyr via tucu)
YARN-193. Scheduler.normalizeRequest does not account for allocation
requests that exceed maximumAllocation limits (Zhijie Shen via bikas)
OPTIMIZATIONS
BUG FIXES

View File

@ -122,9 +122,9 @@ public class YarnConfiguration extends Configuration {
public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_MB =
YARN_PREFIX + "scheduler.maximum-allocation-mb";
public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB = 8192;
public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_CORES =
public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES =
YARN_PREFIX + "scheduler.maximum-allocation-vcores";
public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_CORES = 32;
public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4;
/** Number of threads to handle scheduler interface.*/
public static final String RM_SCHEDULER_CLIENT_THREAD_COUNT =

View File

@ -54,15 +54,17 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.service.AbstractService;
@ -276,6 +278,14 @@ public class ApplicationMasterService extends AbstractService implements
List<ResourceRequest> ask = request.getAskList();
List<ContainerId> release = request.getReleaseList();
// sanity check
try {
SchedulerUtils.validateResourceRequests(ask,
rScheduler.getMaximumResourceCapability());
} catch (InvalidResourceRequestException e) {
LOG.warn("Invalid resource ask by application " + appAttemptId, e);
throw RPCUtil.getRemoteException(e);
}
// Send new requests to appAttempt.
Allocation allocation =
this.rScheduler.allocate(appAttemptId, ask, release);

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@ -86,8 +87,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -273,6 +277,21 @@ public class ClientRMService extends AbstractService implements
// Safety
submissionContext.setUser(user);
// Check whether AM resource requirements are within required limits
if (!submissionContext.getUnmanagedAM()) {
ResourceRequest amReq = BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getAMContainerSpec().getResource(), 1);
try {
SchedulerUtils.validateResourceRequest(amReq,
scheduler.getMaximumResourceCapability());
} catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + applicationId, e);
throw RPCUtil.getRemoteException(e);
}
}
// This needs to be synchronous as the client can query
// immediately following the submission to get the application status.
// So call handle directly and do not send an event.

View File

@ -327,12 +327,52 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.applicationACLsManager, this.conf);
}
// sanity check for configurations
protected static void validateConfigs(Configuration conf) {
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
// validate max-attempts
int globalMaxAppAttempts =
conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
if (globalMaxAppAttempts <= 0) {
throw new YarnException(
"The global max attempts should be a positive integer.");
throw new YarnException("Invalid global max attempts configuration"
+ ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
+ "=" + globalMaxAppAttempts + ", it should be a positive integer.");
}
// validate scheduler memory allocation setting
int minMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int maxMem = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
if (minMem <= 0 || minMem > maxMem) {
throw new YarnException("Invalid resource scheduler memory"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "=" + minMem
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
+ "=" + maxMem + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
// validate scheduler vcores allocation setting
int minVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
int maxVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
if (minVcores <= 0 || minVcores > maxVcores) {
throw new YarnException("Invalid resource scheduler vcores"
+ " allocation configuration"
+ ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
+ "=" + minVcores
+ ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
+ "=" + maxVcores + ", min and max should be greater than 0"
+ ", max should be no smaller than min.");
}
}

View File

@ -53,12 +53,14 @@ public class DefaultResourceCalculator extends ResourceCalculator {
}
@Override
public Resource normalize(Resource r, Resource minimumResource) {
return Resources.createResource(
public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource) {
int normalizedMemory = Math.min(
roundUp(
Math.max(r.getMemory(), minimumResource.getMemory()),
minimumResource.getMemory())
);
minimumResource.getMemory()),
maximumResource.getMemory());
return Resources.createResource(normalizedMemory);
}
@Override

View File

@ -123,15 +123,20 @@ public class DominantResourceCalculator extends ResourceCalculator {
}
@Override
public Resource normalize(Resource r, Resource minimumResource) {
return Resources.createResource(
public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource) {
int normalizedMemory = Math.min(
roundUp(
Math.max(r.getMemory(), minimumResource.getMemory()),
Math.max(r.getMemory(), minimumResource.getMemory()),
minimumResource.getMemory()),
maximumResource.getMemory());
int normalizedCores = Math.min(
roundUp(
Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
minimumResource.getVirtualCores())
);
minimumResource.getVirtualCores()),
maximumResource.getVirtualCores());
return Resources.createResource(normalizedMemory,
normalizedCores);
}
@Override

View File

@ -88,13 +88,16 @@ public abstract class ResourceCalculator {
/**
* Normalize resource <code>r</code> given the base
* <code>minimumResource</code>.
* <code>minimumResource</code> and verify against max allowed
* <code>maximumResource</code>
*
* @param r resource
* @param minimumResource step-factor
* @param maximumResource the upper bound of the resource to be allocated
* @return normalized resource
*/
public abstract Resource normalize(Resource r, Resource minimumResource);
public abstract Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource);
/**
* Round-up resource <code>r</code> given factor <code>stepFactor</code>.

View File

@ -132,8 +132,9 @@ public class Resources {
}
public static Resource normalize(
ResourceCalculator calculator, Resource lhs, Resource factor) {
return calculator.normalize(lhs, factor);
ResourceCalculator calculator, Resource lhs, Resource factor,
Resource limit) {
return calculator.normalize(lhs, factor, limit);
}
public static Resource roundUp(

View File

@ -740,6 +740,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt.submissionContext
.getAMContainerSpec().getResource(), 1);
// SchedulerUtils.validateResourceRequests is not necessary because
// AM resource has been checked when submission
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId,
Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.YarnException;
/**
* The exception is thrown when the requested resource is out of the range
* of the configured lower and upper resource boundaries.
*
*/
public class InvalidResourceRequestException extends YarnException {
public InvalidResourceRequestException(Throwable cause) {
super(cause);
}
public InvalidResourceRequestException(String message) {
super(message);
}
public InvalidResourceRequestException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -89,10 +89,12 @@ public class SchedulerUtils {
List<ResourceRequest> asks,
ResourceCalculator resourceCalculator,
Resource clusterResource,
Resource minimumResource) {
Resource minimumResource,
Resource maximumResource) {
for (ResourceRequest ask : asks) {
normalizeRequest(
ask, resourceCalculator, clusterResource, minimumResource);
ask, resourceCalculator, clusterResource, minimumResource,
maximumResource);
}
}
@ -104,11 +106,50 @@ public class SchedulerUtils {
ResourceRequest ask,
ResourceCalculator resourceCalculator,
Resource clusterResource,
Resource minimumResource) {
Resource minimumResource,
Resource maximumResource) {
Resource normalized =
Resources.normalize(
resourceCalculator, ask.getCapability(), minimumResource);
resourceCalculator, ask.getCapability(), minimumResource,
maximumResource);
ask.setCapability(normalized);
}
/**
* Utility method to validate a resource request, by insuring that the
* requested memory/vcore is non-negative and not greater than max
*/
public static void validateResourceRequest(ResourceRequest resReq,
Resource maximumResource) throws InvalidResourceRequestException {
if (resReq.getCapability().getMemory() < 0 ||
resReq.getCapability().getMemory() > maximumResource.getMemory()) {
throw new InvalidResourceRequestException("Invalid resource request"
+ ", requested memory < 0"
+ ", or requested memory > max configured"
+ ", requestedMemory=" + resReq.getCapability().getMemory()
+ ", maxMemory=" + maximumResource.getMemory());
}
if (resReq.getCapability().getVirtualCores() < 0 ||
resReq.getCapability().getVirtualCores() >
maximumResource.getVirtualCores()) {
throw new InvalidResourceRequestException("Invalid resource request"
+ ", requested virtual cores < 0"
+ ", or requested virtual cores > max configured"
+ ", requestedVirtualCores="
+ resReq.getCapability().getVirtualCores()
+ ", maxVirtualCores=" + maximumResource.getVirtualCores());
}
}
/**
* Utility method to validate a list resource requests, by insuring that the
* requested memory/vcore is non-negative and not greater than max
*/
public static void validateResourceRequests(List<ResourceRequest> ask,
Resource maximumResource) throws InvalidResourceRequestException {
for (ResourceRequest resReq : ask) {
validateResourceRequest(resReq, maximumResource);
}
}
}

View File

@ -483,7 +483,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
// Sanity check
SchedulerUtils.normalizeRequests(
ask, calculator, getClusterResources(), minimumAllocation);
ask, calculator, getClusterResources(), minimumAllocation,
maximumAllocation);
// Release containers
for (ContainerId releasedContainerId : release) {

View File

@ -315,8 +315,8 @@ public class CapacitySchedulerConfiguration extends Configuration {
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
int maximumCores = getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_CORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_CORES);
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
return Resources.createResource(maximumMemory, maximumCores);
}

View File

@ -646,11 +646,12 @@ public class FairScheduler implements ResourceScheduler {
*
* @param asks a list of resource requests
* @param minMemory the configured minimum memory allocation
* @param maxMemory the configured maximum memory allocation
*/
static void normalizeRequests(List<ResourceRequest> asks,
int minMemory) {
int minMemory, int maxMemory) {
for (ResourceRequest ask : asks) {
normalizeRequest(ask, minMemory);
normalizeRequest(ask, minMemory, maxMemory);
}
}
@ -660,11 +661,14 @@ public class FairScheduler implements ResourceScheduler {
*
* @param ask the resource request
* @param minMemory the configured minimum memory allocation
* @param maxMemory the configured maximum memory allocation
*/
static void normalizeRequest(ResourceRequest ask, int minMemory) {
static void normalizeRequest(ResourceRequest ask, int minMemory,
int maxMemory) {
int memory = Math.max(ask.getCapability().getMemory(), minMemory);
ask.getCapability().setMemory(
minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0)));
int normalizedMemory =
minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0));
ask.getCapability().setMemory(Math.min(normalizedMemory, maxMemory));
}
@Override
@ -680,7 +684,8 @@ public class FairScheduler implements ResourceScheduler {
}
// Sanity check
normalizeRequests(ask, minimumAllocation.getMemory());
normalizeRequests(ask, minimumAllocation.getMemory(),
maximumAllocation.getMemory());
// Release containers
for (ContainerId releasedContainerId : release) {

View File

@ -232,7 +232,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
// Sanity check
SchedulerUtils.normalizeRequests(ask, resourceCalculator,
clusterResource, minimumAllocation);
clusterResource, minimumAllocation, maximumAllocation);
// Release containers
for (ContainerId releasedContainer : release) {

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@ -254,6 +255,12 @@ public class TestClientRMService {
public void testConcurrentAppSubmit()
throws IOException, InterruptedException, BrokenBarrierException {
YarnScheduler yarnScheduler = mock(YarnScheduler.class);
when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
@ -311,13 +318,54 @@ public class TestClientRMService {
endBarrier.await();
t.join();
}
@Test (timeout = 30000)
public void testInvalidResourceRequestWhenSubmittingApplication()
throws IOException, InterruptedException, BrokenBarrierException {
YarnScheduler yarnScheduler = mock(YarnScheduler.class);
when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
when(rmContext.getStateStore()).thenReturn(stateStore);
RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
null, mock(ApplicationACLsManager.class), new Configuration());
final ApplicationId appId = getApplicationId(100);
final SubmitApplicationRequest submitRequest = mockSubmitAppRequest(appId);
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1);
when(submitRequest.getApplicationSubmissionContext().getAMContainerSpec()
.getResource()).thenReturn(resource);
final ClientRMService rmService =
new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
// submit an app
try {
rmService.submitApplication(submitRequest);
Assert.fail("Application submission should fail because resource" +
" request is invalid.");
} catch (YarnRemoteException e) {
// Exception is expected
Assert.assertTrue("The thrown exception is not" +
" InvalidResourceRequestException",
e.getMessage().startsWith("Invalid resource request"));
}
}
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
String user = MockApps.newUserName();
String queue = MockApps.newQueue();
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
Resource resource = mock(Resource.class);
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
when(amContainerSpec.getResource()).thenReturn(resource);
ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);

View File

@ -198,6 +198,8 @@ public class TestFifoScheduler {
int allocMB = 1536;
YarnConfiguration conf = new YarnConfiguration(TestFifoScheduler.conf);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, allocMB);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
allocMB * 10);
// Test for something lesser than this.
testMinimumAllocation(conf, allocMB / 2);
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
@ -192,9 +193,41 @@ public class TestResourceManager {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, -1);
try {
resourceManager.init(conf);
fail("Exception is expected because the global max attempts is negative.");
fail("Exception is expected because the global max attempts" +
" is negative.");
} catch (YarnException e) {
// Exception is expected.
assertTrue("The thrown exception is not the expected one.",
e.getMessage().startsWith(
"Invalid global max attempts configuration"));
}
conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
resourceManager.init(conf);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnException e) {
// Exception is expected.
assertTrue("The thrown exception is not the expected one.",
e.getMessage().startsWith(
"Invalid resource scheduler memory"));
}
conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
try {
resourceManager.init(conf);
fail("Exception is expected because the min vcores allocation is" +
" larger than the max vcores allocation.");
} catch (YarnException e) {
// Exception is expected.
assertTrue("The thrown exception is not the expected one.",
e.getMessage().startsWith(
"Invalid resource scheduler vcores"));
}
}

View File

@ -19,54 +19,92 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestSchedulerUtils {
@Test
@Test (timeout = 30000)
public void testNormalizeRequest() {
ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
final int minMemory = 1024;
final int maxMemory = 8192;
Resource minResource = Resources.createResource(minMemory, 0);
Resource maxResource = Resources.createResource(maxMemory, 0);
ResourceRequest ask = new ResourceRequestPBImpl();
// case negative memory
ask.setCapability(Resources.createResource(-1024));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
maxResource);
assertEquals(minMemory, ask.getCapability().getMemory());
// case zero memory
ask.setCapability(Resources.createResource(0));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
maxResource);
assertEquals(minMemory, ask.getCapability().getMemory());
// case memory is a multiple of minMemory
ask.setCapability(Resources.createResource(2 * minMemory));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
maxResource);
assertEquals(2 * minMemory, ask.getCapability().getMemory());
// case memory is not a multiple of minMemory
ask.setCapability(Resources.createResource(minMemory + 10));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource);
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
maxResource);
assertEquals(2 * minMemory, ask.getCapability().getMemory());
// case memory is equal to max allowed
ask.setCapability(Resources.createResource(maxMemory));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
maxResource);
assertEquals(maxMemory, ask.getCapability().getMemory());
// case memory is just less than max
ask.setCapability(Resources.createResource(maxMemory - 10));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
maxResource);
assertEquals(maxMemory, ask.getCapability().getMemory());
// max is not a multiple of min
maxResource = Resources.createResource(maxMemory - 10, 0);
ask.setCapability(Resources.createResource(maxMemory - 100));
// multiple of minMemory > maxMemory, then reduce to maxMemory
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
maxResource);
assertEquals(maxResource.getMemory(), ask.getCapability().getMemory());
// ask is more than max
maxResource = Resources.createResource(maxMemory, 0);
ask.setCapability(Resources.createResource(maxMemory + 100));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, null, minResource,
maxResource);
assertEquals(maxResource.getMemory(), ask.getCapability().getMemory());
}
@Test
@Test (timeout = 30000)
public void testNormalizeRequestWithDominantResourceCalculator() {
ResourceCalculator resourceCalculator = new DominantResourceCalculator();
Resource minResource = Resources.createResource(1024, 1);
Resource maxResource = Resources.createResource(10240, 10);
Resource clusterResource = Resources.createResource(10 * 1024, 10);
ResourceRequest ask = new ResourceRequestPBImpl();
@ -74,13 +112,13 @@ public class TestSchedulerUtils {
// case negative memory/vcores
ask.setCapability(Resources.createResource(-1024, -1));
SchedulerUtils.normalizeRequest(
ask, resourceCalculator, clusterResource, minResource);
ask, resourceCalculator, clusterResource, minResource, maxResource);
assertEquals(minResource, ask.getCapability());
// case zero memory/vcores
ask.setCapability(Resources.createResource(0, 0));
SchedulerUtils.normalizeRequest(
ask, resourceCalculator, clusterResource, minResource);
ask, resourceCalculator, clusterResource, minResource, maxResource);
assertEquals(minResource, ask.getCapability());
assertEquals(1, ask.getCapability().getVirtualCores());
assertEquals(1024, ask.getCapability().getMemory());
@ -88,9 +126,118 @@ public class TestSchedulerUtils {
// case non-zero memory & zero cores
ask.setCapability(Resources.createResource(1536, 0));
SchedulerUtils.normalizeRequest(
ask, resourceCalculator, clusterResource, minResource);
ask, resourceCalculator, clusterResource, minResource, maxResource);
assertEquals(Resources.createResource(2048, 1), ask.getCapability());
assertEquals(1, ask.getCapability().getVirtualCores());
assertEquals(2048, ask.getCapability().getMemory());
}
@Test (timeout = 30000)
public void testValidateResourceRequest() {
Resource maxResource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
// zero memory
try {
Resource resource = Resources.createResource(
0,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource);
} catch (InvalidResourceRequestException e) {
fail("Zero memory should be accepted");
}
// zero vcores
try {
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
0);
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource);
} catch (InvalidResourceRequestException e) {
fail("Zero vcores should be accepted");
}
// max memory
try {
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource);
} catch (InvalidResourceRequestException e) {
fail("Max memory should be accepted");
}
// max vcores
try {
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource);
} catch (InvalidResourceRequestException e) {
fail("Max vcores should not be accepted");
}
// negative memory
try {
Resource resource = Resources.createResource(
-1,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource);
fail("Negative memory should not be accepted");
} catch (InvalidResourceRequestException e) {
// expected
}
// negative vcores
try {
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-1);
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource);
fail("Negative vcores should not be accepted");
} catch (InvalidResourceRequestException e) {
// expected
}
// more than max memory
try {
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource);
fail("More than max memory should not be accepted");
} catch (InvalidResourceRequestException e) {
// expected
}
// more than max vcores
try {
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
+ 1);
ResourceRequest resReq = BuilderUtils.newResourceRequest(
mock(Priority.class), ResourceRequest.ANY, resource, 1);
SchedulerUtils.validateResourceRequest(resReq, maxResource);
fail("More than max vcores should not be accepted");
} catch (InvalidResourceRequestException e) {
// expected
}
}
}

View File

@ -101,6 +101,8 @@ public class TestFairScheduler {
public void setUp() throws IOException {
scheduler = new FairScheduler();
Configuration conf = createConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1024);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
resourceManager = new ResourceManager();