diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index d3d1974a5d8..914a1460172 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.client.api; import java.io.IOException; import java.util.Collection; +import java.util.Map; +import java.util.Set; import java.util.function.Supplier; import java.util.List; @@ -39,7 +41,9 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.resource.Resources; @@ -553,6 +557,18 @@ public abstract class AMRMClient extends } } + /** + * Add a Collection of SchedulingRequests. The AMRMClient will ensure that + * all requests in the same batch are sent in the same allocate call. + * @param schedulingRequests Collection of Scheduling Requests. + */ + @Public + @InterfaceStability.Unstable + public void addSchedulingRequests( + Collection schedulingRequests) { + + } + /** * Register the application master. This must be called before any * other interaction @@ -568,7 +584,27 @@ public abstract class AMRMClient extends int appHostPort, String appTrackingUrl) throws YarnException, IOException; - + + /** + * Register the application master. This must be called before any + * other interaction + * @param appHostName Name of the host on which master is running + * @param appHostPort Port master is listening on + * @param appTrackingUrl URL at which the master info can be seen + * @param placementConstraints Placement Constraints mappings. + * @return RegisterApplicationMasterResponse + * @throws YarnException + * @throws IOException + */ + @Public + @InterfaceStability.Unstable + public RegisterApplicationMasterResponse registerApplicationMaster( + String appHostName, int appHostPort, String appTrackingUrl, + Map, PlacementConstraint> placementConstraints) + throws YarnException, IOException { + throw new YarnException("Not supported"); + } + /** * Request additional containers and receive new container allocations. * Requests made via addContainerRequest are sent to the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 2b82ad62651..0af687bd58b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.client.api.async; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -38,9 +40,12 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineV2Client; @@ -205,6 +210,19 @@ extends AbstractService { String resourceName, Resource capability); + /** + * Add a Collection of SchedulingRequests. The AMRMClient will ensure that + * all requests in the same batch are sent in the same allocate call. + * @param schedulingRequests Collection of Scheduling Requests. + */ + @Public + @Unstable + public void addSchedulingRequests( + Collection schedulingRequests) { + + } + + /** * Returns all matching ContainerRequests that match the given Priority, * ResourceName, ExecutionType and Capability. @@ -249,6 +267,26 @@ extends AbstractService { String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException; + /** + * Register the application master. This must be called before any + * other interaction + * @param appHostName Name of the host on which master is running + * @param appHostPort Port master is listening on + * @param appTrackingUrl URL at which the master info can be seen + * @param placementConstraints Placement Constraints mappings. + * @return RegisterApplicationMasterResponse + * @throws YarnException + * @throws IOException + */ + @Public + @Unstable + public RegisterApplicationMasterResponse registerApplicationMaster( + String appHostName, int appHostPort, String appTrackingUrl, + Map, PlacementConstraint> placementConstraints) + throws YarnException, IOException { + throw new YarnException("Not supported"); + } + /** * Unregister the application master. This must be called in the end. * @param appStatus Success/Failure status of the master @@ -494,6 +532,16 @@ extends AbstractService { public void onContainersReceivedFromPreviousAttempts( List containers) { } + + /** + * Called when the RM has rejected Scheduling Requests. + * @param rejectedSchedulingRequests Rejected Scheduling Requests. + */ + @Public + @Unstable + public void onRequestsRejected( + List rejectedSchedulingRequests) { + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 33b0abaf90f..4f04b66e100 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -36,9 +38,12 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineV2Client; @@ -150,18 +155,50 @@ extends AMRMClientAsync { Resource capability) { return client.getMatchingRequests(priority, resourceName, capability); } - + + @Override + public void addSchedulingRequests( + Collection schedulingRequests) { + client.addSchedulingRequests(schedulingRequests); + } + /** * Registers this application master with the resource manager. On successful * registration, starts the heartbeating thread. + * + * @param appHostName Name of the host on which master is running + * @param appHostPort Port master is listening on + * @param appTrackingUrl URL at which the master info can be seen + * @return Register AM Response. * @throws YarnException * @throws IOException */ public RegisterApplicationMasterResponse registerApplicationMaster( String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException { + return registerApplicationMaster( + appHostName, appHostPort, appTrackingUrl, null); + } + + /** + * Registers this application master with the resource manager. On successful + * registration, starts the heartbeating thread. + * + * @param appHostName Name of the host on which master is running + * @param appHostPort Port master is listening on + * @param appTrackingUrl URL at which the master info can be seen + * @param placementConstraintsMap Placement Constraints Mapping. + * @return Register AM Response. + * @throws YarnException + * @throws IOException + */ + public RegisterApplicationMasterResponse registerApplicationMaster( + String appHostName, int appHostPort, String appTrackingUrl, + Map, PlacementConstraint> placementConstraintsMap) + throws YarnException, IOException { RegisterApplicationMasterResponse response = client - .registerApplicationMaster(appHostName, appHostPort, appTrackingUrl); + .registerApplicationMaster(appHostName, appHostPort, + appTrackingUrl, placementConstraintsMap); heartbeatThread.start(); return response; } @@ -366,6 +403,14 @@ extends AMRMClientAsync { response.getContainersFromPreviousAttempts()); } } + List rejectedSchedulingRequests = + response.getRejectedSchedulingRequests(); + if (!rejectedSchedulingRequests.isEmpty()) { + if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) { + ((AMRMClientAsync.AbstractCallbackHandler) handler) + .onRequestsRejected(rejectedSchedulingRequests); + } + } progress = handler.getProgress(); } catch (Throwable ex) { handler.onError(ex); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 5507c07fc11..8e2336f7bd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -30,9 +30,11 @@ import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.TreeSet; import java.util.AbstractMap.SimpleEntry; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -60,9 +62,11 @@ import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -106,6 +110,12 @@ public class AMRMClientImpl extends AMRMClient { protected final Set blacklistedNodes = new HashSet(); protected final Set blacklistAdditions = new HashSet(); protected final Set blacklistRemovals = new HashSet(); + private Map, PlacementConstraint> placementConstraints = + new HashMap<>(); + private Queue> batchedSchedulingRequests = + new LinkedList<>(); + private Map, List> outstandingSchedRequests = + new ConcurrentHashMap<>(); protected Map resourceProfilesMap; @@ -218,14 +228,26 @@ public class AMRMClientImpl extends AMRMClient { } super.serviceStop(); } - + @Override public RegisterApplicationMasterResponse registerApplicationMaster( String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException { + return registerApplicationMaster(appHostName, appHostPort, appTrackingUrl, + null); + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + String appHostName, int appHostPort, String appTrackingUrl, + Map, PlacementConstraint> placementConstraintsMap) + throws YarnException, IOException { this.appHostName = appHostName; this.appHostPort = appHostPort; this.appTrackingUrl = appTrackingUrl; + if (placementConstraintsMap != null && !placementConstraintsMap.isEmpty()) { + this.placementConstraints.putAll(placementConstraintsMap); + } Preconditions.checkArgument(appHostName != null, "The host name should not be null"); Preconditions.checkArgument(appHostPort >= -1, "Port number of the host" @@ -240,6 +262,9 @@ public class AMRMClientImpl extends AMRMClient { RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest.newInstance(this.appHostName, this.appHostPort, this.appTrackingUrl); + if (!this.placementConstraints.isEmpty()) { + request.setPlacementConstraints(this.placementConstraints); + } RegisterApplicationMasterResponse response = rmClient.registerApplicationMaster(request); synchronized (this) { @@ -248,10 +273,22 @@ public class AMRMClientImpl extends AMRMClient { populateNMTokens(response.getNMTokensFromPreviousAttempts()); } this.resourceProfilesMap = response.getResourceProfiles(); + List prevContainers = + response.getContainersFromPreviousAttempts(); + removeFromOutstandingSchedulingRequests(prevContainers); + recreateSchedulingRequestBatch(); } return response; } + @Override + public void addSchedulingRequests( + Collection schedulingRequests) { + synchronized (this.batchedSchedulingRequests) { + this.batchedSchedulingRequests.add(schedulingRequests); + } + } + @Override public AllocateResponse allocate(float progressIndicator) throws YarnException, IOException { @@ -288,6 +325,7 @@ public class AMRMClientImpl extends AMRMClient { .responseId(lastResponseId).progress(progressIndicator) .askList(askList).resourceBlacklistRequest(blacklistRequest) .releaseList(releaseList).updateRequests(updateList).build(); + populateSchedulingRequests(allocateRequest); // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); @@ -296,6 +334,10 @@ public class AMRMClientImpl extends AMRMClient { try { allocateResponse = rmClient.allocate(allocateRequest); + removeFromOutstandingSchedulingRequests( + allocateResponse.getAllocatedContainers()); + removeFromOutstandingSchedulingRequests( + allocateResponse.getContainersFromPreviousAttempts()); } catch (ApplicationMasterNotRegisteredException e) { LOG.warn("ApplicationMaster is out of sync with ResourceManager," + " hence resyncing."); @@ -397,6 +439,104 @@ public class AMRMClientImpl extends AMRMClient { return allocateResponse; } + private void populateSchedulingRequests(AllocateRequest allocateRequest) { + synchronized (this.batchedSchedulingRequests) { + if (!this.batchedSchedulingRequests.isEmpty()) { + List newReqs = new LinkedList<>(); + Iterator> iter = + this.batchedSchedulingRequests.iterator(); + while (iter.hasNext()) { + Collection requests = iter.next(); + newReqs.addAll(requests); + addToOutstandingSchedulingRequests(requests); + iter.remove(); + } + allocateRequest.setSchedulingRequests(newReqs); + } + } + } + + private void recreateSchedulingRequestBatch() { + List batched = new ArrayList<>(); + synchronized (this.outstandingSchedRequests) { + for (List schedReqs : + this.outstandingSchedRequests.values()) { + batched.addAll(schedReqs); + } + } + synchronized (this.batchedSchedulingRequests) { + this.batchedSchedulingRequests.add(batched); + } + } + + private void addToOutstandingSchedulingRequests( + Collection requests) { + for (SchedulingRequest req : requests) { + List schedulingRequests = + this.outstandingSchedRequests.computeIfAbsent( + req.getAllocationTags(), x -> new LinkedList<>()); + SchedulingRequest matchingReq = null; + synchronized (schedulingRequests) { + for (SchedulingRequest schedReq : schedulingRequests) { + if (isMatching(req, schedReq)) { + matchingReq = schedReq; + break; + } + } + if (matchingReq != null) { + matchingReq.getResourceSizing().setNumAllocations( + req.getResourceSizing().getNumAllocations()); + } else { + schedulingRequests.add(req); + } + } + } + } + + private boolean isMatching(SchedulingRequest schedReq1, + SchedulingRequest schedReq2) { + return schedReq1.getPriority().equals(schedReq2.getPriority()) && + schedReq1.getExecutionType().getExecutionType().equals( + schedReq1.getExecutionType().getExecutionType()) && + schedReq1.getAllocationRequestId() == + schedReq2.getAllocationRequestId(); + } + + private void removeFromOutstandingSchedulingRequests( + Collection containers) { + if (containers == null || containers.isEmpty()) { + return; + } + for (Container container : containers) { + if (container.getAllocationTags() != null && + !container.getAllocationTags().isEmpty()) { + List schedReqs = + this.outstandingSchedRequests.get(container.getAllocationTags()); + if (schedReqs != null && !schedReqs.isEmpty()) { + synchronized (schedReqs) { + Iterator iter = schedReqs.iterator(); + while (iter.hasNext()) { + SchedulingRequest schedReq = iter.next(); + if (schedReq.getPriority().equals(container.getPriority()) && + schedReq.getAllocationRequestId() == + container.getAllocationRequestId()) { + int numAllocations = + schedReq.getResourceSizing().getNumAllocations(); + numAllocations--; + if (numAllocations == 0) { + iter.remove(); + } else { + schedReq.getResourceSizing() + .setNumAllocations(numAllocations); + } + } + } + } + } + } + } + } + private List createUpdateList() { List updateList = new ArrayList<>(); for (Map.Entry + * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.client.api.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Base class for testing AMRMClient. + */ +public class BaseAMRMClientTest { + + protected Configuration conf = null; + protected MiniYARNCluster yarnCluster = null; + protected YarnClient yarnClient = null; + protected List nodeReports = null; + protected ApplicationAttemptId attemptId = null; + + protected String schedulerName = CapacityScheduler.class.getName(); + protected boolean autoUpdate = false; + + protected int nodeCount = 3; + protected long amExpireMs = 4000; + protected int rollingIntervalSec = 13; + + + protected Resource capability; + protected Priority priority; + protected Priority priority2; + protected String node; + protected String rack; + protected String[] nodes; + protected String[] racks; + + @Before + public void setup() throws Exception { + conf = new YarnConfiguration(); + createClusterAndStartApplication(conf); + } + + protected void createClusterAndStartApplication(Configuration conf) + throws Exception { + // start minicluster + this.conf = conf; + if (autoUpdate) { + conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true); + } + conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName); + conf.setLong( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, + rollingIntervalSec); + conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, amExpireMs); + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); + // set the minimum allocation so that resource decrease can go under 1024 + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); + conf.setBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); + conf.setInt( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); + yarnCluster = new MiniYARNCluster( + TestAMRMClient.class.getName(), nodeCount, 1, 1); + yarnCluster.init(conf); + yarnCluster.start(); + + // start rm client + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + + // get node info + assertTrue("All node managers did not connect to the RM within the " + + "allotted 5-second timeout", + yarnCluster.waitForNodeManagersToConnect(5000L)); + nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); + assertEquals("Not all node managers were reported running", + nodeCount, nodeReports.size()); + + priority = Priority.newInstance(1); + priority2 = Priority.newInstance(2); + capability = Resource.newInstance(1024, 1); + + node = nodeReports.get(0).getNodeId().getHost(); + rack = nodeReports.get(0).getRackName(); + nodes = new String[]{ node }; + racks = new String[]{ rack }; + + // submit new app + ApplicationSubmissionContext appContext = + yarnClient.createApplication().getApplicationSubmissionContext(); + ApplicationId appId = appContext.getApplicationId(); + // set the application name + appContext.setApplicationName("Test"); + // Set the priority for the application master + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(0); + appContext.setPriority(pri); + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue("default"); + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = + BuilderUtils.newContainerLaunchContext( + Collections. emptyMap(), + new HashMap(), Arrays.asList("sleep", "100"), + new HashMap(), null, + new HashMap()); + appContext.setAMContainerSpec(amContainer); + appContext.setResource(Resource.newInstance(1024, 1)); + // Create the request to send to the applications manager + SubmitApplicationRequest appRequest = Records + .newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + // Submit the application to the applications manager + yarnClient.submitApplication(appContext); + + // wait for app to start + RMAppAttempt appAttempt = null; + while (true) { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + if (appReport.getYarnApplicationState() == + YarnApplicationState.ACCEPTED) { + attemptId = appReport.getCurrentApplicationAttemptId(); + appAttempt = + yarnCluster.getResourceManager().getRMContext().getRMApps() + .get(attemptId.getApplicationId()).getCurrentAppAttempt(); + while (true) { + if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { + break; + } + } + break; + } + } + // Just dig into the ResourceManager and get the AMRMToken just for the sake + // of testing. + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + + // emulate RM setup of AMRM token in credentials by adding the token + // *before* setting the token service + UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); + appAttempt.getAMRMToken().setService( + ClientRMProxy.getAMRMTokenService(conf)); + } + + @After + public void teardown() throws YarnException, IOException { + yarnClient.killApplication(attemptId.getApplicationId()); + attemptId = null; + + if (yarnClient != null && + yarnClient.getServiceState() == Service.STATE.STARTED) { + yarnClient.stop(); + } + if (yarnCluster != null && + yarnCluster.getServiceState() == Service.STATE.STARTED) { + yarnCluster.stop(); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 3ecc5cdf5b7..b059118bef5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -43,7 +43,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; @@ -56,24 +55,18 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.*; -import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.NMTokenCache; -import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -81,10 +74,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedule import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; -import org.junit.After; import org.junit.Assert; import org.junit.Assume; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -97,26 +88,8 @@ import org.eclipse.jetty.util.log.Log; * Test application master client class to resource manager. */ @RunWith(value = Parameterized.class) -public class TestAMRMClient { - private String schedulerName = null; - private boolean autoUpdate = false; - private Configuration conf = null; - private MiniYARNCluster yarnCluster = null; - private YarnClient yarnClient = null; - private List nodeReports = null; - private ApplicationAttemptId attemptId = null; - private int nodeCount = 3; - - static final int rolling_interval_sec = 13; - static final long am_expire_ms = 4000; +public class TestAMRMClient extends BaseAMRMClientTest{ - private Resource capability; - private Priority priority; - private Priority priority2; - private String node; - private String rack; - private String[] nodes; - private String[] racks; private final static int DEFAULT_ITERATION = 3; public TestAMRMClient(String schedulerName, boolean autoUpdate) { @@ -134,127 +107,6 @@ public class TestAMRMClient { }); } - @Before - public void setup() throws Exception { - conf = new YarnConfiguration(); - createClusterAndStartApplication(conf); - } - - private void createClusterAndStartApplication(Configuration conf) - throws Exception { - // start minicluster - this.conf = conf; - if (autoUpdate) { - conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true); - } - conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName); - conf.setLong( - YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, - rolling_interval_sec); - conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms); - conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); - // set the minimum allocation so that resource decrease can go under 1024 - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); - conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); - conf.setBoolean( - YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); - conf.setInt( - YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); - yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); - yarnCluster.init(conf); - yarnCluster.start(); - - // start rm client - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(conf); - yarnClient.start(); - - // get node info - assertTrue("All node managers did not connect to the RM within the " - + "allotted 5-second timeout", - yarnCluster.waitForNodeManagersToConnect(5000L)); - nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); - assertEquals("Not all node managers were reported running", - nodeCount, nodeReports.size()); - - priority = Priority.newInstance(1); - priority2 = Priority.newInstance(2); - capability = Resource.newInstance(1024, 1); - - node = nodeReports.get(0).getNodeId().getHost(); - rack = nodeReports.get(0).getRackName(); - nodes = new String[]{ node }; - racks = new String[]{ rack }; - - // submit new app - ApplicationSubmissionContext appContext = - yarnClient.createApplication().getApplicationSubmissionContext(); - ApplicationId appId = appContext.getApplicationId(); - // set the application name - appContext.setApplicationName("Test"); - // Set the priority for the application master - Priority pri = Records.newRecord(Priority.class); - pri.setPriority(0); - appContext.setPriority(pri); - // Set the queue to which this application is to be submitted in the RM - appContext.setQueue("default"); - // Set up the container launch context for the application master - ContainerLaunchContext amContainer = - BuilderUtils.newContainerLaunchContext( - Collections. emptyMap(), - new HashMap(), Arrays.asList("sleep", "100"), - new HashMap(), null, - new HashMap()); - appContext.setAMContainerSpec(amContainer); - appContext.setResource(Resource.newInstance(1024, 1)); - // Create the request to send to the applications manager - SubmitApplicationRequest appRequest = Records - .newRecord(SubmitApplicationRequest.class); - appRequest.setApplicationSubmissionContext(appContext); - // Submit the application to the applications manager - yarnClient.submitApplication(appContext); - - // wait for app to start - RMAppAttempt appAttempt = null; - while (true) { - ApplicationReport appReport = yarnClient.getApplicationReport(appId); - if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) { - attemptId = appReport.getCurrentApplicationAttemptId(); - appAttempt = - yarnCluster.getResourceManager().getRMContext().getRMApps() - .get(attemptId.getApplicationId()).getCurrentAppAttempt(); - while (true) { - if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { - break; - } - } - break; - } - } - // Just dig into the ResourceManager and get the AMRMToken just for the sake - // of testing. - UserGroupInformation.setLoginUser(UserGroupInformation - .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); - - // emulate RM setup of AMRM token in credentials by adding the token - // *before* setting the token service - UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); - appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf)); - } - - @After - public void teardown() throws YarnException, IOException { - yarnClient.killApplication(attemptId.getApplicationId()); - attemptId = null; - - if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) { - yarnClient.stop(); - } - if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) { - yarnCluster.stop(); - } - } - @Test (timeout = 60000) public void testAMRMClientNoMatchingRequests() throws IOException, YarnException { @@ -905,7 +757,7 @@ public class TestAMRMClient { initAMRMClientAndTest(false); } - private void initAMRMClientAndTest(boolean useAllocReqId) + protected void initAMRMClientAndTest(boolean useAllocReqId) throws YarnException, IOException { AMRMClient amClient = null; try { @@ -1946,7 +1798,7 @@ public class TestAMRMClient { // Wait for enough time and make sure the roll_over happens // At mean time, the old AMRMToken should continue to work while (System.currentTimeMillis() - startTime < - rolling_interval_sec * 1000) { + rollingIntervalSec * 1000) { amClient.allocate(0.1f); sleep(1000); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java new file mode 100644 index 00000000000..fdc8d58d723 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.client.api.impl; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.lang.Thread.sleep; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; + +/** + * Test Placement Constraints and Scheduling Requests. + */ +public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest { + + @Test(timeout=60000) + public void testAMRMClientWithPlacementConstraints() + throws Exception { + // we have to create a new instance of MiniYARNCluster to avoid SASL qop + // mismatches between client and server + teardown(); + conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + createClusterAndStartApplication(conf); + + AMRMClient amClient = + AMRMClient.createAMRMClient(); + amClient.setNMTokenCache(new NMTokenCache()); + //asserting we are not using the singleton instance cache + Assert.assertNotSame(NMTokenCache.getSingleton(), + amClient.getNMTokenCache()); + + final List allocatedContainers = new ArrayList<>(); + final List rejectedSchedulingRequests = + new ArrayList<>(); + AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient, 1000, + new AMRMClientAsync.AbstractCallbackHandler() { + @Override + public void onContainersAllocated(List containers) { + allocatedContainers.addAll(containers); + } + + @Override + public void onRequestsRejected( + List rejReqs) { + rejectedSchedulingRequests.addAll(rejReqs); + } + + @Override + public void onContainersCompleted(List statuses) {} + @Override + public void onContainersUpdated(List containers) {} + @Override + public void onShutdownRequest() {} + @Override + public void onNodesUpdated(List updatedNodes) {} + @Override + public void onError(Throwable e) {} + + @Override + public float getProgress() { + return 0.1f; + } + }); + + asyncClient.init(conf); + asyncClient.start(); + Map, PlacementConstraint> pcMapping = new HashMap<>(); + pcMapping.put(Collections.singleton("foo"), + PlacementConstraints.build( + PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))); + pcMapping.put(Collections.singleton("bar"), + PlacementConstraints.build( + PlacementConstraints.targetNotIn(NODE, allocationTag("bar")))); + asyncClient.registerApplicationMaster("Host", 10000, "", pcMapping); + + // Send two types of requests - 4 with source tag "foo" have numAlloc = 1 + // and 1 with source tag "bar" and has numAlloc = 4. Both should be + // handled similarly. i.e: Since there are only 3 nodes, + // 2 schedulingRequests - 1 with source tag "foo" on one with source + // tag "bar" should get rejected. + asyncClient.addSchedulingRequests( + Arrays.asList( + // 4 reqs with numAlloc = 1 + schedulingRequest(1, 1, 1, 1, 512, "foo"), + schedulingRequest(1, 1, 2, 1, 512, "foo"), + schedulingRequest(1, 1, 3, 1, 512, "foo"), + schedulingRequest(1, 1, 4, 1, 512, "foo"), + // 1 req with numAlloc = 4 + schedulingRequest(4, 1, 5, 1, 512, "bar"))); + + // kick the scheduler + waitForContainerAllocation(allocatedContainers, + rejectedSchedulingRequests, 6, 2); + + Assert.assertEquals(6, allocatedContainers.size()); + Map> containersPerNode = + allocatedContainers.stream().collect( + Collectors.groupingBy(Container::getNodeId)); + + // Ensure 2 containers allocated per node. + // Each node should have a "foo" and a "bar" container. + Assert.assertEquals(3, containersPerNode.entrySet().size()); + HashSet srcTags = new HashSet<>(Arrays.asList("foo", "bar")); + containersPerNode.entrySet().forEach( + x -> + Assert.assertEquals( + srcTags, + x.getValue() + .stream() + .map(y -> y.getAllocationTags().iterator().next()) + .collect(Collectors.toSet())) + ); + + // Ensure 2 rejected requests - 1 of "foo" and 1 of "bar" + Assert.assertEquals(2, rejectedSchedulingRequests.size()); + Assert.assertEquals(srcTags, + rejectedSchedulingRequests + .stream() + .map(x -> x.getRequest().getAllocationTags().iterator().next()) + .collect(Collectors.toSet())); + + asyncClient.stop(); + } + + private static void waitForContainerAllocation( + List allocatedContainers, + List rejectedRequests, + int containerNum, int rejNum) throws Exception { + + int maxCount = 10; + while (maxCount >= 0 && + (allocatedContainers.size() < containerNum || + rejectedRequests.size() < rejNum)) { + maxCount--; + sleep(1000); + } + } + + private static SchedulingRequest schedulingRequest(int numAllocations, + int priority, long allocReqId, int cores, int mem, String... tags) { + return schedulingRequest(numAllocations, priority, allocReqId, cores, mem, + ExecutionType.GUARANTEED, tags); + } + + private static SchedulingRequest schedulingRequest(int numAllocations, + int priority, long allocReqId, int cores, int mem, + ExecutionType execType, String... tags) { + return SchedulingRequest.newBuilder() + .priority(Priority.newInstance(priority)) + .allocationRequestId(allocReqId) + .allocationTags(new HashSet<>(Arrays.asList(tags))) + .executionType(ExecutionTypeRequest.newInstance(execType, true)) + .resourceSizing( + ResourceSizing.newInstance(numAllocations, + Resource.newInstance(mem, cores))) + .build(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 563df0dab77..a50422126de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -262,6 +262,9 @@ public class RMContainerImpl implements RMContainer { rmContext.getSystemMetricsPublisher().containerCreated( this, this.creationTime); } + if (this.container != null) { + this.allocationTags = this.container.getAllocationTags(); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 213d7841859..72376df6d6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -589,6 +589,7 @@ public abstract class AbstractYarnScheduler container.setVersion(status.getVersion()); container.setExecutionType(status.getExecutionType()); container.setAllocationRequestId(status.getAllocationRequestId()); + container.setAllocationTags(status.getAllocationTags()); ApplicationAttemptId attemptId = container.getId().getApplicationAttemptId(); RMContainer rmContainer = new RMContainerImpl(container, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 88a9049495d..3930a35cf38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -672,6 +672,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { containerType, container.getExecutionType(), container.getAllocationRequestId(), rmContainer.getAllocationTags())); + container.setAllocationTags(rmContainer.getAllocationTags()); updateNMToken(container); } catch (IllegalArgumentException e) { // DNS might be down, skip returning this container. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index 956a3c95051..c4b82e84623 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -64,12 +64,12 @@ public final class PlacementConstraintsUtil { throws InvalidAllocationTagsQueryException { long minScopeCardinality = 0; long maxScopeCardinality = 0; - if (sc.getScope() == PlacementConstraints.NODE) { + if (sc.getScope().equals(PlacementConstraints.NODE)) { minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId, te.getTargetValues(), Long::max); maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId, te.getTargetValues(), Long::min); - } else if (sc.getScope() == PlacementConstraints.RACK) { + } else if (sc.getScope().equals(PlacementConstraints.RACK)) { minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId, te.getTargetValues(), Long::max); maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,