From 2cccf4061cc4021c48e29879700dbc94f832b7d1 Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Fri, 27 Jul 2018 14:35:03 -0700 Subject: [PATCH] YARN-8517. getContainer and getContainers ResourceManager REST API methods are not documented (snemeth via rkanter) --- .../InvalidResourceRequestException.java | 36 + .../resourcemanager/DefaultAMSProcessor.java | 23 +- .../scheduler/SchedulerUtils.java | 55 +- .../scheduler/TestSchedulerUtils.java | 630 ++++++++++-------- 4 files changed, 430 insertions(+), 314 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidResourceRequestException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidResourceRequestException.java index f4fd2fa38a1..1ea9eefd87f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidResourceRequestException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidResourceRequestException.java @@ -30,19 +30,55 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; * */ public class InvalidResourceRequestException extends YarnException { + public static final String LESS_THAN_ZERO_RESOURCE_MESSAGE_TEMPLATE = + "Invalid resource request! Cannot allocate containers as " + + "requested resource is less than 0! " + + "Requested resource type=[%s], " + "Requested resource=%s"; + + public static final String GREATER_THAN_MAX_RESOURCE_MESSAGE_TEMPLATE = + "Invalid resource request! Cannot allocate containers as " + + "requested resource is greater than " + + "maximum allowed allocation. " + + "Requested resource type=[%s], " + + "Requested resource=%s, maximum allowed allocation=%s, " + + "please note that maximum allowed allocation is calculated " + + "by scheduler based on maximum resource of registered " + + "NodeManagers, which might be less than configured " + + "maximum allocation=%s"; + + public static final String UNKNOWN_REASON_MESSAGE_TEMPLATE = + "Invalid resource request! " + + "Cannot allocate containers for an unknown reason! " + + "Requested resource type=[%s], Requested resource=%s"; + + public enum InvalidResourceType { + LESS_THAN_ZERO, GREATER_THEN_MAX_ALLOCATION, UNKNOWN; + } private static final long serialVersionUID = 13498237L; + private final InvalidResourceType invalidResourceType; public InvalidResourceRequestException(Throwable cause) { super(cause); + this.invalidResourceType = InvalidResourceType.UNKNOWN; } public InvalidResourceRequestException(String message) { + this(message, InvalidResourceType.UNKNOWN); + } + + public InvalidResourceRequestException(String message, + InvalidResourceType invalidResourceType) { super(message); + this.invalidResourceType = invalidResourceType; } public InvalidResourceRequestException(String message, Throwable cause) { super(message, cause); + this.invalidResourceType = InvalidResourceType.UNKNOWN; } + public InvalidResourceType getInvalidResourceType() { + return invalidResourceType; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 71558a76be9..43f73e48a40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -53,6 +53,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException + .InvalidResourceType; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -89,6 +91,12 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import static org.apache.hadoop.yarn.exceptions + .InvalidResourceRequestException.InvalidResourceType + .GREATER_THEN_MAX_ALLOCATION; +import static org.apache.hadoop.yarn.exceptions + .InvalidResourceRequestException.InvalidResourceType.LESS_THAN_ZERO; + /** * This is the default Application Master Service processor. It has be the * last processor in the @{@link AMSProcessingChain}. @@ -231,8 +239,8 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { maximumCapacity, app.getQueue(), getScheduler(), getRmContext()); } catch (InvalidResourceRequestException e) { - LOG.warn("Invalid resource ask by application " + appAttemptId, e); - throw e; + RMAppAttempt rmAppAttempt = app.getRMAppAttempt(appAttemptId); + handleInvalidResourceException(e, rmAppAttempt); } try { @@ -336,6 +344,17 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { allocation.getPreviousAttemptContainers()); } + private void handleInvalidResourceException(InvalidResourceRequestException e, + RMAppAttempt rmAppAttempt) throws InvalidResourceRequestException { + if (e.getInvalidResourceType() == LESS_THAN_ZERO || + e.getInvalidResourceType() == GREATER_THEN_MAX_ALLOCATION) { + rmAppAttempt.updateAMLaunchDiagnostics(e.getMessage()); + } + LOG.warn("Invalid resource ask by application " + + rmAppAttempt.getAppAttemptId(), e); + throw e; + } + private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) { Map updatedNodes = new HashMap<>(); if(app.pullRMNodeUpdates(updatedNodes) > 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 844057ea142..9b07d37de52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException + .InvalidResourceType; import org.apache.hadoop.yarn.exceptions .SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -61,6 +63,15 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import static org.apache.hadoop.yarn.exceptions + .InvalidResourceRequestException + .GREATER_THAN_MAX_RESOURCE_MESSAGE_TEMPLATE; +import static org.apache.hadoop.yarn.exceptions + .InvalidResourceRequestException + .LESS_THAN_ZERO_RESOURCE_MESSAGE_TEMPLATE; +import static org.apache.hadoop.yarn.exceptions + .InvalidResourceRequestException.UNKNOWN_REASON_MESSAGE_TEMPLATE; + /** * Utilities shared by schedulers. */ @@ -257,9 +268,9 @@ public class SchedulerUtils { } - public static void normalizeAndValidateRequest(ResourceRequest resReq, - Resource maximumResource, String queueName, YarnScheduler scheduler, - boolean isRecovery, RMContext rmContext, QueueInfo queueInfo) + private static void normalizeAndValidateRequest(ResourceRequest resReq, + Resource maximumResource, String queueName, YarnScheduler scheduler, + boolean isRecovery, RMContext rmContext, QueueInfo queueInfo) throws InvalidResourceRequestException { Configuration conf = rmContext.getYarnConfiguration(); // If Node label is not enabled throw exception @@ -384,13 +395,13 @@ public class SchedulerUtils { if (requestedRI.getValue() < 0) { throwInvalidResourceException(reqResource, availableResource, - reqResourceName); + reqResourceName, InvalidResourceType.LESS_THAN_ZERO); } boolean valid = checkResource(requestedRI, availableResource); if (!valid) { throwInvalidResourceException(reqResource, availableResource, - reqResourceName); + reqResourceName, InvalidResourceType.GREATER_THEN_MAX_ALLOCATION); } } } @@ -470,18 +481,30 @@ public class SchedulerUtils { } private static void throwInvalidResourceException(Resource reqResource, - Resource availableResource, String reqResourceName) + Resource maxAllowedAllocation, String reqResourceName, + InvalidResourceType invalidResourceType) throws InvalidResourceRequestException { - throw new InvalidResourceRequestException( - "Invalid resource request, requested resource type=[" + reqResourceName - + "] < 0 or greater than maximum allowed allocation. Requested " - + "resource=" + reqResource + ", maximum allowed allocation=" - + availableResource - + ", please note that maximum allowed allocation is calculated " - + "by scheduler based on maximum resource of registered " - + "NodeManagers, which might be less than configured " - + "maximum allocation=" - + ResourceUtils.getResourceTypesMaximumAllocation()); + final String message; + + if (invalidResourceType == InvalidResourceType.LESS_THAN_ZERO) { + message = String.format(LESS_THAN_ZERO_RESOURCE_MESSAGE_TEMPLATE, + reqResourceName, reqResource); + } else if (invalidResourceType == + InvalidResourceType.GREATER_THEN_MAX_ALLOCATION) { + message = String.format(GREATER_THAN_MAX_RESOURCE_MESSAGE_TEMPLATE, + reqResourceName, reqResource, maxAllowedAllocation, + ResourceUtils.getResourceTypesMaximumAllocation()); + } else if (invalidResourceType == InvalidResourceType.UNKNOWN) { + message = String.format(UNKNOWN_REASON_MESSAGE_TEMPLATE, reqResourceName, + reqResource); + } else { + throw new IllegalArgumentException(String.format( + "InvalidResourceType argument should be either " + "%s, %s or %s", + InvalidResourceType.LESS_THAN_ZERO, + InvalidResourceType.GREATER_THEN_MAX_ALLOCATION, + InvalidResourceType.UNKNOWN)); + } + throw new InvalidResourceRequestException(message, invalidResourceType); } private static void checkQueueLabelInLabelManager(String labelExpression, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java index 15cfdb01e7b..2ec2de29ff7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import static org.apache.hadoop.yarn.exceptions + .InvalidResourceRequestException.InvalidResourceType + .GREATER_THEN_MAX_ALLOCATION; +import static org.apache.hadoop.yarn.exceptions + .InvalidResourceRequestException.InvalidResourceType.LESS_THAN_ZERO; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -67,6 +72,8 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException + .InvalidResourceType; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; @@ -145,7 +152,7 @@ public class TestSchedulerUtils { private void initResourceTypes() { Configuration yarnConf = new Configuration(); yarnConf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, - CustomResourceTypesConfigurationProvider.class.getName()); + CustomResourceTypesConfigurationProvider.class.getName()); ResourceUtils.resetResourceTypes(yarnConf); } @@ -162,51 +169,51 @@ public class TestSchedulerUtils { .build()); } - @Test (timeout = 30000) + @Test(timeout = 30000) public void testNormalizeRequest() { ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); - + final int minMemory = 1024; final int maxMemory = 8192; Resource minResource = Resources.createResource(minMemory, 0); Resource maxResource = Resources.createResource(maxMemory, 0); - + ResourceRequest ask = new ResourceRequestPBImpl(); // case negative memory ask.setCapability(Resources.createResource(-1024)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource, - maxResource); + maxResource); assertEquals(minMemory, ask.getCapability().getMemorySize()); // case zero memory ask.setCapability(Resources.createResource(0)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource, - maxResource); + maxResource); assertEquals(minMemory, ask.getCapability().getMemorySize()); // case memory is a multiple of minMemory ask.setCapability(Resources.createResource(2 * minMemory)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource, - maxResource); + maxResource); assertEquals(2 * minMemory, ask.getCapability().getMemorySize()); // case memory is not a multiple of minMemory ask.setCapability(Resources.createResource(minMemory + 10)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource, - maxResource); + maxResource); assertEquals(2 * minMemory, ask.getCapability().getMemorySize()); // case memory is equal to max allowed ask.setCapability(Resources.createResource(maxMemory)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource, - maxResource); + maxResource); assertEquals(maxMemory, ask.getCapability().getMemorySize()); // case memory is just less than max ask.setCapability(Resources.createResource(maxMemory - 10)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource, - maxResource); + maxResource); assertEquals(maxMemory, ask.getCapability().getMemorySize()); // max is not a multiple of min @@ -214,39 +221,39 @@ public class TestSchedulerUtils { ask.setCapability(Resources.createResource(maxMemory - 100)); // multiple of minMemory > maxMemory, then reduce to maxMemory SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource, - maxResource); + maxResource); assertEquals(maxResource.getMemorySize(), - ask.getCapability().getMemorySize()); + ask.getCapability().getMemorySize()); // ask is more than max maxResource = Resources.createResource(maxMemory, 0); ask.setCapability(Resources.createResource(maxMemory + 100)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource, - maxResource); + maxResource); assertEquals(maxResource.getMemorySize(), - ask.getCapability().getMemorySize()); + ask.getCapability().getMemorySize()); } - @Test (timeout = 30000) + @Test(timeout = 30000) public void testNormalizeRequestWithDominantResourceCalculator() { ResourceCalculator resourceCalculator = new DominantResourceCalculator(); - + Resource minResource = Resources.createResource(1024, 1); Resource maxResource = Resources.createResource(10240, 10); Resource clusterResource = Resources.createResource(10 * 1024, 10); - + ResourceRequest ask = new ResourceRequestPBImpl(); // case negative memory/vcores ask.setCapability(Resources.createResource(-1024, -1)); SchedulerUtils.normalizeRequest( - ask, resourceCalculator, minResource, maxResource); + ask, resourceCalculator, minResource, maxResource); assertEquals(minResource, ask.getCapability()); // case zero memory/vcores ask.setCapability(Resources.createResource(0, 0)); SchedulerUtils.normalizeRequest( - ask, resourceCalculator, minResource, maxResource); + ask, resourceCalculator, minResource, maxResource); assertEquals(minResource, ask.getCapability()); assertEquals(1, ask.getCapability().getVirtualCores()); assertEquals(1024, ask.getCapability().getMemorySize()); @@ -254,28 +261,28 @@ public class TestSchedulerUtils { // case non-zero memory & zero cores ask.setCapability(Resources.createResource(1536, 0)); SchedulerUtils.normalizeRequest( - ask, resourceCalculator, minResource, maxResource); + ask, resourceCalculator, minResource, maxResource); assertEquals(Resources.createResource(2048, 1), ask.getCapability()); assertEquals(1, ask.getCapability().getVirtualCores()); assertEquals(2048, ask.getCapability().getMemorySize()); } - + @Test(timeout = 30000) public void testValidateResourceRequestWithErrorLabelsPermission() - throws IOException { + throws IOException { // mock queue and scheduler YarnScheduler scheduler = mock(YarnScheduler.class); Set queueAccessibleNodeLabels = Sets.newHashSet(); QueueInfo queueInfo = mock(QueueInfo.class); when(queueInfo.getQueueName()).thenReturn("queue"); when(queueInfo.getAccessibleNodeLabels()) - .thenReturn(queueAccessibleNodeLabels); + .thenReturn(queueAccessibleNodeLabels); when(scheduler.getQueueInfo(any(String.class), anyBoolean(), anyBoolean())) - .thenReturn(queueInfo); + .thenReturn(queueInfo); Resource maxResource = Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); // queue has labels, success cases try { @@ -283,36 +290,36 @@ public class TestSchedulerUtils { queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); rmContext.getNodeLabelManager().addToCluserNodeLabels( - ImmutableSet.of(NodeLabel.newInstance("x"), - NodeLabel.newInstance("y"))); + ImmutableSet.of(NodeLabel.newInstance("x"), + NodeLabel.newInstance("y"))); Resource resource = Resources.createResource( - 0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), ResourceRequest.ANY, resource, 1); + mock(Priority.class), ResourceRequest.ANY, resource, 1); resReq.setNodeLabelExpression("x"); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); resReq.setNodeLabelExpression("y"); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); - + scheduler, rmContext); + resReq.setNodeLabelExpression(""); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); - + scheduler, rmContext); + resReq.setNodeLabelExpression(" "); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); } catch (InvalidResourceRequestException e) { e.printStackTrace(); fail("Should be valid when request labels is a subset of queue labels"); } finally { rmContext.getNodeLabelManager().removeFromClusterNodeLabels( - Arrays.asList("x", "y")); + Arrays.asList("x", "y")); } - + // same as above, but cluster node labels don't contains label being // requested. should fail try { @@ -320,42 +327,42 @@ public class TestSchedulerUtils { queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); Resource resource = Resources.createResource( - 0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), ResourceRequest.ANY, resource, 1); + mock(Priority.class), ResourceRequest.ANY, resource, 1); resReq.setNodeLabelExpression("x"); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); - + scheduler, rmContext); + fail("Should fail"); } catch (InvalidResourceRequestException e) { } - + // queue has labels, failed cases (when ask a label not included by queue) try { // set queue accessible node labesl to [x, y] queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); rmContext.getNodeLabelManager().addToCluserNodeLabels( - ImmutableSet.of(NodeLabel.newInstance("x"), - NodeLabel.newInstance("y"))); - + ImmutableSet.of(NodeLabel.newInstance("x"), + NodeLabel.newInstance("y"))); + Resource resource = Resources.createResource( - 0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), ResourceRequest.ANY, resource, 1); + mock(Priority.class), ResourceRequest.ANY, resource, 1); resReq.setNodeLabelExpression("z"); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); fail("Should fail"); } catch (InvalidResourceRequestException e) { } finally { rmContext.getNodeLabelManager().removeFromClusterNodeLabels( - Arrays.asList("x", "y")); + Arrays.asList("x", "y")); } - + // we don't allow specify more than two node labels in a single expression // now try { @@ -363,225 +370,225 @@ public class TestSchedulerUtils { queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); rmContext.getNodeLabelManager().addToCluserNodeLabels( - ImmutableSet.of(NodeLabel.newInstance("x"), - NodeLabel.newInstance("y"))); - + ImmutableSet.of(NodeLabel.newInstance("x"), + NodeLabel.newInstance("y"))); + Resource resource = Resources.createResource( - 0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), ResourceRequest.ANY, resource, 1); + mock(Priority.class), ResourceRequest.ANY, resource, 1); resReq.setNodeLabelExpression("x && y"); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); fail("Should fail"); } catch (InvalidResourceRequestException e) { } finally { rmContext.getNodeLabelManager().removeFromClusterNodeLabels( - Arrays.asList("x", "y")); + Arrays.asList("x", "y")); } - + // queue doesn't have label, succeed (when request no label) queueAccessibleNodeLabels.clear(); try { // set queue accessible node labels to empty queueAccessibleNodeLabels.clear(); - + Resource resource = Resources.createResource( - 0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), ResourceRequest.ANY, resource, 1); + mock(Priority.class), ResourceRequest.ANY, resource, 1); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); - + scheduler, rmContext); + resReq.setNodeLabelExpression(""); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); - + scheduler, rmContext); + resReq.setNodeLabelExpression(" "); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); } catch (InvalidResourceRequestException e) { e.printStackTrace(); fail("Should be valid when request labels is empty"); } - boolean invalidlabelexception=false; + boolean invalidlabelexception = false; // queue doesn't have label, failed (when request any label) try { // set queue accessible node labels to empty queueAccessibleNodeLabels.clear(); - + rmContext.getNodeLabelManager().addToCluserNodeLabels( - ImmutableSet.of(NodeLabel.newInstance("x"))); - + ImmutableSet.of(NodeLabel.newInstance("x"))); + Resource resource = Resources.createResource( - 0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), ResourceRequest.ANY, resource, 1); + mock(Priority.class), ResourceRequest.ANY, resource, 1); resReq.setNodeLabelExpression("x"); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); fail("Should fail"); } catch (InvalidLabelResourceRequestException e) { - invalidlabelexception=true; + invalidlabelexception = true; } catch (InvalidResourceRequestException e) { } finally { rmContext.getNodeLabelManager().removeFromClusterNodeLabels( - Arrays.asList("x")); + Arrays.asList("x")); } Assert.assertTrue("InvalidLabelResourceRequestException expected", - invalidlabelexception); + invalidlabelexception); // queue is "*", always succeeded try { // set queue accessible node labels to empty queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.add(RMNodeLabelsManager.ANY); - + rmContext.getNodeLabelManager().addToCluserNodeLabels( - ImmutableSet.of(NodeLabel.newInstance("x"), - NodeLabel.newInstance("y"), NodeLabel.newInstance("z"))); - + ImmutableSet.of(NodeLabel.newInstance("x"), + NodeLabel.newInstance("y"), NodeLabel.newInstance("z"))); + Resource resource = Resources.createResource( - 0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), ResourceRequest.ANY, resource, 1); + mock(Priority.class), ResourceRequest.ANY, resource, 1); resReq.setNodeLabelExpression("x"); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); - + scheduler, rmContext); + resReq.setNodeLabelExpression("y"); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); - + scheduler, rmContext); + resReq.setNodeLabelExpression("z"); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); } catch (InvalidResourceRequestException e) { e.printStackTrace(); fail("Should be valid when queue can access any labels"); } finally { rmContext.getNodeLabelManager().removeFromClusterNodeLabels( - Arrays.asList("x", "y", "z")); + Arrays.asList("x", "y", "z")); } - + // same as above, but cluster node labels don't contains label, should fail try { // set queue accessible node labels to empty queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.add(RMNodeLabelsManager.ANY); - + Resource resource = Resources.createResource( - 0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), ResourceRequest.ANY, resource, 1); + mock(Priority.class), ResourceRequest.ANY, resource, 1); resReq.setNodeLabelExpression("x"); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); fail("Should fail"); } catch (InvalidResourceRequestException e) { } - + // we don't allow resource name other than ANY and specify label try { // set queue accessible node labesl to [x, y] queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); rmContext.getNodeLabelManager().addToCluserNodeLabels( - ImmutableSet.of(NodeLabel.newInstance("x"), - NodeLabel.newInstance("y"))); - + ImmutableSet.of(NodeLabel.newInstance("x"), + NodeLabel.newInstance("y"))); + Resource resource = Resources.createResource( - 0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), "rack", resource, 1); + mock(Priority.class), "rack", resource, 1); resReq.setNodeLabelExpression("x"); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); fail("Should fail"); } catch (InvalidResourceRequestException e) { } finally { rmContext.getNodeLabelManager().removeFromClusterNodeLabels( - Arrays.asList("x", "y")); + Arrays.asList("x", "y")); } - + // we don't allow resource name other than ANY and specify label even if // queue has accessible label = * try { // set queue accessible node labesl to * queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.addAll(Arrays - .asList(CommonNodeLabelsManager.ANY)); + .asList(CommonNodeLabelsManager.ANY)); rmContext.getNodeLabelManager().addToCluserNodeLabels( - ImmutableSet.of(NodeLabel.newInstance("x"))); - + ImmutableSet.of(NodeLabel.newInstance("x"))); + Resource resource = Resources.createResource( - 0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), "rack", resource, 1); + mock(Priority.class), "rack", resource, 1); resReq.setNodeLabelExpression("x"); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); fail("Should fail"); } catch (InvalidResourceRequestException e) { } finally { rmContext.getNodeLabelManager().removeFromClusterNodeLabels( - Arrays.asList("x")); + Arrays.asList("x")); } try { Resource resource = Resources.createResource(0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq1 = BuilderUtils - .newResourceRequest(mock(Priority.class), "*", resource, 1, "x"); + .newResourceRequest(mock(Priority.class), "*", resource, 1, "x"); SchedulerUtils.normalizeAndvalidateRequest(resReq1, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); fail("Should fail"); } catch (InvalidResourceRequestException e) { assertEquals("Invalid label resource request, cluster do not contain , " - + "label= x", e.getMessage()); + + "label= x", e.getMessage()); } try { rmContext.getYarnConfiguration() - .set(YarnConfiguration.NODE_LABELS_ENABLED, "false"); + .set(YarnConfiguration.NODE_LABELS_ENABLED, "false"); Resource resource = Resources.createResource(0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq1 = BuilderUtils - .newResourceRequest(mock(Priority.class), "*", resource, 1, "x"); + .newResourceRequest(mock(Priority.class), "*", resource, 1, "x"); SchedulerUtils.normalizeAndvalidateRequest(resReq1, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); Assert.assertEquals(RMNodeLabelsManager.NO_LABEL, - resReq1.getNodeLabelExpression()); + resReq1.getNodeLabelExpression()); } catch (InvalidResourceRequestException e) { assertEquals("Invalid resource request, node label not enabled but " - + "request contains label expression", e.getMessage()); + + "request contains label expression", e.getMessage()); } } - @Test (timeout = 30000) + @Test(timeout = 30000) public void testValidateResourceRequest() { YarnScheduler mockScheduler = mock(YarnScheduler.class); Resource maxResource = - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); // zero memory try { Resource resource = - Resources.createResource(0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + Resources.createResource(0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = - BuilderUtils.newResourceRequest(mock(Priority.class), - ResourceRequest.ANY, resource, 1); + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, - mockScheduler, rmContext); + mockScheduler, rmContext); } catch (InvalidResourceRequestException e) { fail("Zero memory should be accepted"); } @@ -589,13 +596,13 @@ public class TestSchedulerUtils { // zero vcores try { Resource resource = - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); ResourceRequest resReq = - BuilderUtils.newResourceRequest(mock(Priority.class), - ResourceRequest.ANY, resource, 1); + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, - mockScheduler, rmContext); + mockScheduler, rmContext); } catch (InvalidResourceRequestException e) { fail("Zero vcores should be accepted"); } @@ -603,14 +610,14 @@ public class TestSchedulerUtils { // max memory try { Resource resource = - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = - BuilderUtils.newResourceRequest(mock(Priority.class), - ResourceRequest.ANY, resource, 1); + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, - mockScheduler, rmContext); + mockScheduler, rmContext); } catch (InvalidResourceRequestException e) { fail("Max memory should be accepted"); } @@ -618,14 +625,14 @@ public class TestSchedulerUtils { // max vcores try { Resource resource = - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); ResourceRequest resReq = - BuilderUtils.newResourceRequest(mock(Priority.class), - ResourceRequest.ANY, resource, 1); + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, - mockScheduler, rmContext); + mockScheduler, rmContext); } catch (InvalidResourceRequestException e) { fail("Max vcores should not be accepted"); } @@ -633,77 +640,77 @@ public class TestSchedulerUtils { // negative memory try { Resource resource = - Resources.createResource(-1, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + Resources.createResource(-1, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = - BuilderUtils.newResourceRequest(mock(Priority.class), - ResourceRequest.ANY, resource, 1); + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, - mockScheduler, rmContext); + mockScheduler, rmContext); fail("Negative memory should not be accepted"); } catch (InvalidResourceRequestException e) { - // expected + assertEquals(LESS_THAN_ZERO, e.getInvalidResourceType()); } // negative vcores try { Resource resource = - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1); + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1); ResourceRequest resReq = - BuilderUtils.newResourceRequest(mock(Priority.class), - ResourceRequest.ANY, resource, 1); + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, - mockScheduler, rmContext); + mockScheduler, rmContext); fail("Negative vcores should not be accepted"); } catch (InvalidResourceRequestException e) { - // expected + assertEquals(LESS_THAN_ZERO, e.getInvalidResourceType()); } // more than max memory try { Resource resource = - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = - BuilderUtils.newResourceRequest(mock(Priority.class), - ResourceRequest.ANY, resource, 1); + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, - mockScheduler, rmContext); + mockScheduler, rmContext); fail("More than max memory should not be accepted"); } catch (InvalidResourceRequestException e) { - // expected + assertEquals(GREATER_THEN_MAX_ALLOCATION, e.getInvalidResourceType()); } // more than max vcores try { Resource resource = Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1); + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1); ResourceRequest resReq = - BuilderUtils.newResourceRequest(mock(Priority.class), - ResourceRequest.ANY, resource, 1); + BuilderUtils.newResourceRequest(mock(Priority.class), + ResourceRequest.ANY, resource, 1); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null, - mockScheduler, rmContext); + mockScheduler, rmContext); fail("More than max vcores should not be accepted"); } catch (InvalidResourceRequestException e) { - // expected + assertEquals(GREATER_THEN_MAX_ALLOCATION, e.getInvalidResourceType()); } } - + @Test public void testValidateResourceBlacklistRequest() throws Exception { MyContainerManager containerManager = new MyContainerManager(); final MockRMWithAMS rm = - new MockRMWithAMS(new YarnConfiguration(), containerManager); + new MockRMWithAMS(new YarnConfiguration(), containerManager); rm.start(); MockNM nm1 = rm.registerNode("localhost:1234", 5120); Map acls = - new HashMap(2); + new HashMap(2); acls.put(ApplicationAccessType.VIEW_APP, "*"); RMApp app = rm.submitApp(1024, "appname", "appuser", acls); @@ -718,33 +725,33 @@ public class TestSchedulerUtils { final YarnRPC rpc = YarnRPC.create(yarnConf); UserGroupInformation currentUser = - UserGroupInformation.createRemoteUser(applicationAttemptId.toString()); + UserGroupInformation.createRemoteUser(applicationAttemptId.toString()); Credentials credentials = containerManager.getContainerCredentials(); final InetSocketAddress rmBindAddress = - rm.getApplicationMasterService().getBindAddress(); + rm.getApplicationMasterService().getBindAddress(); Token amRMToken = - MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress, - credentials.getAllTokens()); + MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress, + credentials.getAllTokens()); currentUser.addToken(amRMToken); ApplicationMasterProtocol client = - currentUser.doAs(new PrivilegedAction() { - @Override - public ApplicationMasterProtocol run() { - return (ApplicationMasterProtocol) rpc.getProxy( - ApplicationMasterProtocol.class, rmBindAddress, yarnConf); - } - }); + currentUser.doAs(new PrivilegedAction() { + @Override + public ApplicationMasterProtocol run() { + return (ApplicationMasterProtocol) rpc.getProxy( + ApplicationMasterProtocol.class, rmBindAddress, yarnConf); + } + }); RegisterApplicationMasterRequest request = Records - .newRecord(RegisterApplicationMasterRequest.class); + .newRecord(RegisterApplicationMasterRequest.class); client.registerApplicationMaster(request); ResourceBlacklistRequest blacklistRequest = - ResourceBlacklistRequest.newInstance( - Collections.singletonList(ResourceRequest.ANY), null); + ResourceBlacklistRequest.newInstance( + Collections.singletonList(ResourceRequest.ANY), null); AllocateRequest allocateRequest = - AllocateRequest.newInstance(0, 0.0f, null, null, blacklistRequest); + AllocateRequest.newInstance(0, 0.0f, null, null, blacklistRequest); boolean error = false; try { client.allocate(allocateRequest); @@ -753,26 +760,26 @@ public class TestSchedulerUtils { } rm.stop(); - + Assert.assertTrue( - "Didn't not catch InvalidResourceBlacklistRequestException", error); + "Didn't not catch InvalidResourceBlacklistRequestException", error); } private void waitForLaunchedState(RMAppAttempt attempt) - throws InterruptedException { + throws InterruptedException { int waitCount = 0; while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED - && waitCount++ < 20) { + && waitCount++ < 20) { LOG.info("Waiting for AppAttempt to reach LAUNCHED state. " - + "Current state is " + attempt.getAppAttemptState()); + + "Current state is " + attempt.getAppAttemptState()); Thread.sleep(1000); } Assert.assertEquals(attempt.getAppAttemptState(), - RMAppAttemptState.LAUNCHED); + RMAppAttemptState.LAUNCHED); } @Test - public void testComparePriorities(){ + public void testComparePriorities() { Priority high = Priority.newInstance(1); Priority low = Priority.newInstance(2); assertTrue(high.compareTo(low) > 0); @@ -781,22 +788,22 @@ public class TestSchedulerUtils { @Test public void testCreateAbnormalContainerStatus() { ContainerStatus cd = SchedulerUtils.createAbnormalContainerStatus( - ContainerId.newContainerId(ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1), "x"); + ContainerId.newContainerId(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1), "x"); Assert.assertEquals(ContainerExitStatus.ABORTED, cd.getExitStatus()); } @Test public void testCreatePreemptedContainerStatus() { ContainerStatus cd = SchedulerUtils.createPreemptedContainerStatus( - ContainerId.newContainerId(ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1), "x"); + ContainerId.newContainerId(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1), "x"); Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus()); } - - @Test (timeout = 30000) + + @Test(timeout = 30000) public void testNormalizeNodeLabelExpression() - throws IOException { + throws IOException { // mock queue and scheduler YarnScheduler scheduler = mock(YarnScheduler.class); Set queueAccessibleNodeLabels = Sets.newHashSet(); @@ -805,11 +812,11 @@ public class TestSchedulerUtils { when(queueInfo.getAccessibleNodeLabels()).thenReturn(queueAccessibleNodeLabels); when(queueInfo.getDefaultNodeLabelExpression()).thenReturn(" x "); when(scheduler.getQueueInfo(any(String.class), anyBoolean(), anyBoolean())) - .thenReturn(queueInfo); - + .thenReturn(queueInfo); + Resource maxResource = Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); // queue has labels, success cases try { @@ -817,156 +824,163 @@ public class TestSchedulerUtils { queueAccessibleNodeLabels.clear(); queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y")); rmContext.getNodeLabelManager().addToCluserNodeLabels( - ImmutableSet.of(NodeLabel.newInstance("x"), - NodeLabel.newInstance("y"))); + ImmutableSet.of(NodeLabel.newInstance("x"), + NodeLabel.newInstance("y"))); Resource resource = Resources.createResource( - 0, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + 0, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); ResourceRequest resReq = BuilderUtils.newResourceRequest( - mock(Priority.class), ResourceRequest.ANY, resource, 1); + mock(Priority.class), ResourceRequest.ANY, resource, 1); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); Assert.assertEquals("x", resReq.getNodeLabelExpression()); - + resReq.setNodeLabelExpression(" y "); SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue", - scheduler, rmContext); + scheduler, rmContext); Assert.assertEquals("y", resReq.getNodeLabelExpression()); } catch (InvalidResourceRequestException e) { e.printStackTrace(); fail("Should be valid when request labels is a subset of queue labels"); } finally { rmContext.getNodeLabelManager().removeFromClusterNodeLabels( - Arrays.asList("x", "y")); + Arrays.asList("x", "y")); } } @Test public void testCustomResourceRequestedUnitIsSmallerThanAvailableUnit() - throws InvalidResourceRequestException { + throws InvalidResourceRequestException { Resource requestedResource = - ResourceTypesTestHelper.newResource(1, 1, - ImmutableMap.of("custom-resource-1", "11")); + ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "11")); Resource availableResource = - ResourceTypesTestHelper.newResource(1, 1, - ImmutableMap.of("custom-resource-1", "0G")); + ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "0G")); exception.expect(InvalidResourceRequestException.class); exception.expectMessage(InvalidResourceRequestExceptionMessageGenerator - .create().withRequestedResourceType("custom-resource-1") - .withRequestedResource(requestedResource) - .withAvailableAllocation(availableResource) - .withMaxAllocation(configuredMaxAllocation).build()); + .create().withRequestedResourceType("custom-resource-1") + .withRequestedResource(requestedResource) + .withAvailableAllocation(availableResource) + .withMaxAllocation(configuredMaxAllocation) + .withInvalidResourceType(GREATER_THEN_MAX_ALLOCATION) + .build()); SchedulerUtils.checkResourceRequestAgainstAvailableResource( - requestedResource, availableResource); + requestedResource, availableResource); } @Test public void testCustomResourceRequestedUnitIsSmallerThanAvailableUnit2() { Resource requestedResource = - ResourceTypesTestHelper.newResource(1, 1, - ImmutableMap.of("custom-resource-1", "11")); + ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "11")); Resource availableResource = - ResourceTypesTestHelper.newResource(1, 1, - ImmutableMap.of("custom-resource-1", "1G")); + ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "1G")); try { SchedulerUtils.checkResourceRequestAgainstAvailableResource( - requestedResource, availableResource); + requestedResource, availableResource); } catch (InvalidResourceRequestException e) { fail(String.format( - "Resource request should be accepted. Requested: %s, available: %s", - requestedResource, availableResource)); + "Resource request should be accepted. Requested: %s, available: %s", + requestedResource, availableResource)); } } @Test public void testCustomResourceRequestedUnitIsGreaterThanAvailableUnit() - throws InvalidResourceRequestException { + throws InvalidResourceRequestException { Resource requestedResource = - ResourceTypesTestHelper.newResource(1, 1, - ImmutableMap.of("custom-resource-1", "1M")); + ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "1M")); Resource availableResource = ResourceTypesTestHelper.newResource(1, 1, - ImmutableMap. builder().put("custom-resource-1", "120k") - .build()); + ImmutableMap.builder().put("custom-resource-1", + "120k") + .build()); exception.expect(InvalidResourceRequestException.class); exception.expectMessage(InvalidResourceRequestExceptionMessageGenerator - .create().withRequestedResourceType("custom-resource-1") - .withRequestedResource(requestedResource) - .withAvailableAllocation(availableResource) - .withMaxAllocation(configuredMaxAllocation).build()); + .create().withRequestedResourceType("custom-resource-1") + .withRequestedResource(requestedResource) + .withAvailableAllocation(availableResource) + .withMaxAllocation(configuredMaxAllocation) + .withInvalidResourceType(GREATER_THEN_MAX_ALLOCATION) + .build()); SchedulerUtils.checkResourceRequestAgainstAvailableResource( - requestedResource, availableResource); + requestedResource, availableResource); } @Test public void testCustomResourceRequestedUnitIsGreaterThanAvailableUnit2() { Resource requestedResource = ResourceTypesTestHelper.newResource(1, 1, - ImmutableMap. builder().put("custom-resource-1", "11M") - .build()); + ImmutableMap.builder().put("custom-resource-1", "11M") + .build()); Resource availableResource = - ResourceTypesTestHelper.newResource(1, 1, - ImmutableMap.of("custom-resource-1", "1G")); + ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "1G")); try { SchedulerUtils.checkResourceRequestAgainstAvailableResource( - requestedResource, availableResource); + requestedResource, availableResource); } catch (InvalidResourceRequestException e) { fail(String.format( - "Resource request should be accepted. Requested: %s, available: %s", - requestedResource, availableResource)); + "Resource request should be accepted. Requested: %s, available: %s", + requestedResource, availableResource)); } } @Test public void testCustomResourceRequestedUnitIsSameAsAvailableUnit() { Resource requestedResource = ResourceTypesTestHelper.newResource(1, 1, - ImmutableMap.of("custom-resource-1", "11M")); + ImmutableMap.of("custom-resource-1", "11M")); Resource availableResource = ResourceTypesTestHelper.newResource(1, 1, - ImmutableMap.of("custom-resource-1", "100M")); + ImmutableMap.of("custom-resource-1", "100M")); try { SchedulerUtils.checkResourceRequestAgainstAvailableResource( - requestedResource, availableResource); + requestedResource, availableResource); } catch (InvalidResourceRequestException e) { fail(String.format( - "Resource request should be accepted. Requested: %s, available: %s", - requestedResource, availableResource)); + "Resource request should be accepted. Requested: %s, available: %s", + requestedResource, availableResource)); } } @Test public void testCustomResourceRequestedUnitIsSameAsAvailableUnit2() - throws InvalidResourceRequestException { + throws InvalidResourceRequestException { Resource requestedResource = ResourceTypesTestHelper.newResource(1, 1, - ImmutableMap.of("custom-resource-1", "110M")); + ImmutableMap.of("custom-resource-1", "110M")); Resource availableResource = ResourceTypesTestHelper.newResource(1, 1, - ImmutableMap.of("custom-resource-1", "100M")); + ImmutableMap.of("custom-resource-1", "100M")); exception.expect(InvalidResourceRequestException.class); exception.expectMessage(InvalidResourceRequestExceptionMessageGenerator - .create().withRequestedResourceType("custom-resource-1") - .withRequestedResource(requestedResource) - .withAvailableAllocation(availableResource) - .withMaxAllocation(configuredMaxAllocation).build()); + .create().withRequestedResourceType("custom-resource-1") + .withRequestedResource(requestedResource) + .withAvailableAllocation(availableResource) + .withInvalidResourceType(GREATER_THEN_MAX_ALLOCATION) + .withMaxAllocation(configuredMaxAllocation) + .build()); SchedulerUtils.checkResourceRequestAgainstAvailableResource( - requestedResource, availableResource); + requestedResource, availableResource); } public static void waitSchedulerApplicationAttemptStopped( - AbstractYarnScheduler ys, - ApplicationAttemptId attemptId) throws InterruptedException { + AbstractYarnScheduler ys, + ApplicationAttemptId attemptId) throws InterruptedException { SchedulerApplicationAttempt schedulerApp = - ys.getApplicationAttempt(attemptId); + ys.getApplicationAttempt(attemptId); if (null == schedulerApp) { return; } @@ -986,35 +1000,35 @@ public class TestSchedulerUtils { } public static SchedulerApplication - verifyAppAddedAndRemovedFromScheduler( + verifyAppAddedAndRemovedFromScheduler( Map> applications, EventHandler handler, String queueName) { ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationId.newInstance(System.currentTimeMillis(), 1); AppAddedSchedulerEvent appAddedEvent = - new AppAddedSchedulerEvent(appId, queueName, "user"); + new AppAddedSchedulerEvent(appId, queueName, "user"); handler.handle(appAddedEvent); SchedulerApplication app = - applications.get(appId); + applications.get(appId); // verify application is added. Assert.assertNotNull(app); Assert.assertEquals("user", app.getUser()); AppRemovedSchedulerEvent appRemoveEvent = - new AppRemovedSchedulerEvent(appId, RMAppState.FINISHED); + new AppRemovedSchedulerEvent(appId, RMAppState.FINISHED); handler.handle(appRemoveEvent); Assert.assertNull(applications.get(appId)); return app; } - + private static RMContext getMockRMContext() { RMContext rmContext = mock(RMContext.class); RMNodeLabelsManager nlm = new NullRMNodeLabelsManager(); nlm.init(new Configuration(false)); when(rmContext.getYarnConfiguration()).thenReturn(conf); rmContext.getYarnConfiguration().set(YarnConfiguration.NODE_LABELS_ENABLED, - "true"); + "true"); when(rmContext.getNodeLabelManager()).thenReturn(nlm); return rmContext; } @@ -1026,6 +1040,7 @@ public class TestSchedulerUtils { private Resource availableAllocation; private Resource configuredMaxAllowedAllocation; private String resourceType; + private InvalidResourceType invalidResourceType; InvalidResourceRequestExceptionMessageGenerator(StringBuilder sb) { this.sb = sb; @@ -1033,7 +1048,7 @@ public class TestSchedulerUtils { public static InvalidResourceRequestExceptionMessageGenerator create() { return new InvalidResourceRequestExceptionMessageGenerator( - new StringBuilder()); + new StringBuilder()); } InvalidResourceRequestExceptionMessageGenerator withRequestedResource( @@ -1055,23 +1070,46 @@ public class TestSchedulerUtils { } InvalidResourceRequestExceptionMessageGenerator withMaxAllocation( - Resource r) { + Resource r) { this.configuredMaxAllowedAllocation = r; return this; } + InvalidResourceRequestExceptionMessageGenerator + withInvalidResourceType(InvalidResourceType invalidResourceType) { + this.invalidResourceType = invalidResourceType; + return this; + } + public String build() { - return sb - .append("Invalid resource request, requested resource type=[") - .append(resourceType).append("]") - .append(" < 0 or greater than maximum allowed allocation. ") - .append("Requested resource=").append(requestedResource).append(", ") - .append("maximum allowed allocation=").append(availableAllocation) - .append(", please note that maximum allowed allocation is calculated " - + "by scheduler based on maximum resource of " + - "registered NodeManagers, which might be less than " + - "configured maximum allocation=") - .append(configuredMaxAllowedAllocation).toString(); + if (invalidResourceType == LESS_THAN_ZERO) { + return sb.append("Invalid resource request! " + + "Cannot allocate containers as " + + "requested resource is less than 0! ") + .append("Requested resource type=[") + .append(resourceType).append("]") + .append(", Requested resource=") + .append(requestedResource).toString(); + + } else if (invalidResourceType == GREATER_THEN_MAX_ALLOCATION) { + return sb.append("Invalid resource request! " + + "Cannot allocate containers as " + + "requested resource is greater than " + + "maximum allowed allocation. ") + .append("Requested resource type=[").append(resourceType) + .append("], ") + .append("Requested resource=").append(requestedResource) + .append(", maximum allowed allocation=") + .append(availableAllocation) + .append(", please note that maximum allowed allocation is " + + "calculated by scheduler based on maximum resource " + + "of registered NodeManagers, which might be less " + + "than configured maximum allocation=") + .append(configuredMaxAllowedAllocation) + .toString(); + } + throw new IllegalStateException("Wrong type of InvalidResourceType is " + + "detected!"); } } }