From d751a61e5a8b65cb74f18d82f9a1249bfa5d4574 Mon Sep 17 00:00:00 2001 From: Jian He Date: Wed, 9 Jul 2014 18:25:45 +0000 Subject: [PATCH] YARN-1366. Changed AMRMClient to re-register with RM and send outstanding requests back to RM on work-preserving RM restart. Contributed by Rohith git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1609254 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop-yarn/hadoop-yarn-client/pom.xml | 6 + .../hadoop/yarn/client/api/AMRMClient.java | 23 +- .../api/async/impl/AMRMClientAsyncImpl.java | 4 +- .../yarn/client/api/impl/AMRMClientImpl.java | 78 ++- .../api/async/impl/TestAMRMClientAsync.java | 33 -- .../api/impl/TestAMRMClientOnRMRestart.java | 499 ++++++++++++++++++ .../src/test/resources/core-site.xml | 25 + 8 files changed, 622 insertions(+), 49 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f6e9d14e16e..d1c319cbbb4 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -75,6 +75,9 @@ Release 2.5.0 - UNRELEASED YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving restart is enabled. (Anubhav Dhoot via jianhe) + YARN-1366. Changed AMRMClient to re-register with RM and send outstanding requests + back to RM on work-preserving RM restart. (Rohith via jianhe) + IMPROVEMENTS YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml index e1567a96bf8..56b9981a3d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml @@ -105,6 +105,12 @@ org.apache.hadoop hadoop-yarn-common + + org.apache.hadoop + hadoop-yarn-common + test-jar + test + org.apache.hadoop diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 23f4ea13b4b..3daa1568963 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -207,14 +207,21 @@ public abstract class AMRMClient extends /** * Request additional containers and receive new container allocations. - * Requests made via addContainerRequest are sent to the - * ResourceManager. New containers assigned to the master are - * retrieved. Status of completed containers and node health updates are - * also retrieved. - * This also doubles up as a heartbeat to the ResourceManager and must be - * made periodically. - * The call may not always return any new allocations of containers. - * App should not make concurrent allocate requests. May cause request loss. + * Requests made via addContainerRequest are sent to the + * ResourceManager. New containers assigned to the master are + * retrieved. Status of completed containers and node health updates are also + * retrieved. 
+ * This also doubles up as a heartbeat to the ResourceManager and
+ * must be made periodically. The call may not always return any new
+ * allocations of containers. The app should not make concurrent allocate
+ * requests; doing so may cause request loss.
+ *
+ * <p>
+ * Note: If the user has not removed container requests that have already
+ * been satisfied, then the re-register may end up resending all of those
+ * container requests to the RM (including the already matched ones), which
+ * could mean the RM ends up allocating a large number of new containers.
+ * </p>
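+ *
+ * <p>
+ * A minimal usage sketch (editor's illustration, not part of the patch):
+ * the AM removes a request once it has been satisfied, so that a resync
+ * after an RM restart does not replay it. The {@code findMatchingRequest}
+ * helper is hypothetical and stands in for whatever bookkeeping the AM
+ * keeps for its outstanding requests.
+ * <pre>
+ *   AllocateResponse response = amRMClient.allocate(progress);
+ *   for (Container container : response.getAllocatedContainers()) {
+ *     // look up the AM's own record of the request this container satisfies
+ *     ContainerRequest satisfied = findMatchingRequest(container); // hypothetical
+ *     if (satisfied != null) {
+ *       amRMClient.removeContainerRequest(satisfied);
+ *     }
+ *   }
+ * </pre>
+ * </p>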
+ * * @param progressIndicator Indicates progress made by the master * @return the response of the allocate request * @throws YarnException diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 57acb2c371b..e7659bd3df6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -234,8 +234,7 @@ extends AMRMClientAsync { while (true) { try { responseQueue.put(response); - if (response.getAMCommand() == AMCommand.AM_RESYNC - || response.getAMCommand() == AMCommand.AM_SHUTDOWN) { + if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) { return; } break; @@ -280,7 +279,6 @@ extends AMRMClientAsync { if (response.getAMCommand() != null) { switch(response.getAMCommand()) { - case AM_RESYNC: case AM_SHUTDOWN: handler.onShutdownRequest(); LOG.info("Shutdown requested. Stopping callback."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 1eebaaca713..1db70545786 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -47,7 +47,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Priority; @@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.RackResolver; @@ -77,10 +80,18 @@ public class AMRMClientImpl extends AMRMClient { private int lastResponseId = 0; + protected String appHostName; + protected int appHostPort; + protected String appTrackingUrl; + protected ApplicationMasterProtocol rmClient; protected Resource clusterAvailableResources; protected int clusterNodeCount; + // blacklistedNodes is required for keeping history of blacklisted nodes that + // are sent to RM. 
On RESYNC command from RM, blacklistedNodes are used to get + // current blacklisted nodes and send back to RM. + protected final Set blacklistedNodes = new HashSet(); protected final Set blacklistAdditions = new HashSet(); protected final Set blacklistRemovals = new HashSet(); @@ -150,6 +161,10 @@ public class AMRMClientImpl extends AMRMClient { protected final Set ask = new TreeSet( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); protected final Set release = new TreeSet(); + // pendingRelease holds history or release requests.request is removed only if + // RM sends completedContainer. + // How it different from release? --> release is for per allocate() request. + protected Set pendingRelease = new TreeSet(); public AMRMClientImpl() { super(AMRMClientImpl.class.getName()); @@ -185,19 +200,27 @@ public class AMRMClientImpl extends AMRMClient { public RegisterApplicationMasterResponse registerApplicationMaster( String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException { + this.appHostName = appHostName; + this.appHostPort = appHostPort; + this.appTrackingUrl = appTrackingUrl; Preconditions.checkArgument(appHostName != null, "The host name should not be null"); Preconditions.checkArgument(appHostPort >= -1, "Port number of the host" + " should be any integers larger than or equal to -1"); - // do this only once ??? + + return registerApplicationMaster(); + } + + private RegisterApplicationMasterResponse registerApplicationMaster() + throws YarnException, IOException { RegisterApplicationMasterRequest request = - RegisterApplicationMasterRequest.newInstance(appHostName, appHostPort, - appTrackingUrl); + RegisterApplicationMasterRequest.newInstance(this.appHostName, + this.appHostPort, this.appTrackingUrl); RegisterApplicationMasterResponse response = rmClient.registerApplicationMaster(request); - synchronized (this) { - if(!response.getNMTokensFromPreviousAttempts().isEmpty()) { + lastResponseId = 0; + if (!response.getNMTokensFromPreviousAttempts().isEmpty()) { populateNMTokens(response.getNMTokensFromPreviousAttempts()); } } @@ -249,6 +272,25 @@ public class AMRMClientImpl extends AMRMClient { } allocateResponse = rmClient.allocate(allocateRequest); + if (isResyncCommand(allocateResponse)) { + LOG.warn("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + synchronized (this) { + release.addAll(this.pendingRelease); + blacklistAdditions.addAll(this.blacklistedNodes); + for (Map> rr : remoteRequestsTable + .values()) { + for (Map capabalities : rr.values()) { + for (ResourceRequestInfo request : capabalities.values()) { + addResourceRequestToAsk(request.remoteRequest); + } + } + } + } + // re register with RM + registerApplicationMaster(); + return allocate(progressIndicator); + } synchronized (this) { // update these on successful RPC @@ -258,6 +300,11 @@ public class AMRMClientImpl extends AMRMClient { if (!allocateResponse.getNMTokens().isEmpty()) { populateNMTokens(allocateResponse.getNMTokens()); } + if (!pendingRelease.isEmpty() + && !allocateResponse.getCompletedContainersStatuses().isEmpty()) { + removePendingReleaseRequests(allocateResponse + .getCompletedContainersStatuses()); + } } } finally { // TODO how to differentiate remote yarn exception vs error in rpc @@ -288,6 +335,18 @@ public class AMRMClientImpl extends AMRMClient { return allocateResponse; } + protected void removePendingReleaseRequests( + List completedContainersStatuses) { + for (ContainerStatus containerStatus 
: completedContainersStatuses) { + pendingRelease.remove(containerStatus.getContainerId()); + } + } + + private boolean isResyncCommand(AllocateResponse allocateResponse) { + return allocateResponse.getAMCommand() != null + && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; + } + @Private @VisibleForTesting protected void populateNMTokens(List nmTokens) { @@ -324,6 +383,12 @@ public class AMRMClientImpl extends AMRMClient { } catch (InterruptedException e) { LOG.info("Interrupted while waiting for application" + " to be removed from RMStateStore"); + } catch (ApplicationMasterNotRegisteredException e) { + LOG.warn("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + // re register with RM + registerApplicationMaster(); + unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl); } } @@ -414,6 +479,7 @@ public class AMRMClientImpl extends AMRMClient { public synchronized void releaseAssignedContainer(ContainerId containerId) { Preconditions.checkArgument(containerId != null, "ContainerId can not be null."); + pendingRelease.add(containerId); release.add(containerId); } @@ -655,6 +721,7 @@ public class AMRMClientImpl extends AMRMClient { if (blacklistAdditions != null) { this.blacklistAdditions.addAll(blacklistAdditions); + this.blacklistedNodes.addAll(blacklistAdditions); // if some resources are also in blacklistRemovals updated before, we // should remove them here. this.blacklistRemovals.removeAll(blacklistAdditions); @@ -662,6 +729,7 @@ public class AMRMClientImpl extends AMRMClient { if (blacklistRemovals != null) { this.blacklistRemovals.addAll(blacklistRemovals); + this.blacklistedNodes.removeAll(blacklistRemovals); // if some resources are in blacklistAdditions before, we should remove // them here. 
this.blacklistAdditions.removeAll(blacklistRemovals); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index e21c4baad0e..728a558df3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -203,39 +203,6 @@ public class TestAMRMClientAsync { Assert.assertTrue(callbackHandler.callbackCount == 0); } - @Test//(timeout=10000) - public void testAMRMClientAsyncReboot() throws Exception { - Configuration conf = new Configuration(); - TestCallbackHandler callbackHandler = new TestCallbackHandler(); - @SuppressWarnings("unchecked") - AMRMClient client = mock(AMRMClientImpl.class); - - final AllocateResponse rebootResponse = createAllocateResponse( - new ArrayList(), new ArrayList(), null); - rebootResponse.setAMCommand(AMCommand.AM_RESYNC); - when(client.allocate(anyFloat())).thenReturn(rebootResponse); - - AMRMClientAsync asyncClient = - AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); - asyncClient.init(conf); - asyncClient.start(); - - synchronized (callbackHandler.notifier) { - asyncClient.registerApplicationMaster("localhost", 1234, null); - while(callbackHandler.reboot == false) { - try { - callbackHandler.notifier.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - asyncClient.stop(); - // stopping should have joined all threads and completed all callbacks - Assert.assertTrue(callbackHandler.callbackCount == 0); - } - @Test (timeout = 10000) public void testAMRMClientAsyncShutDown() throws Exception { Configuration conf = new Configuration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java new file mode 100644 index 00000000000..0e343574d10 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -0,0 +1,499 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestAMRMClientOnRMRestart { + static Configuration conf = null; + + @BeforeClass + public static void setup() throws Exception { + conf = new Configuration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + } + + // Test does major 6 steps verification. + // Step-1 : AMRMClient send allocate request for 2 container requests + // Step-2 : 2 containers are allocated by RM. + // Step-3 : AM Send 1 containerRequest(cRequest3) and 1 releaseRequests to + // RM + // Step-4 : On RM restart, AM(does not know RM is restarted) sends additional + // containerRequest(cRequest4) and blacklisted nodes. 
+ // Intern RM send resync command + // Step-5 : Allocater after resync command & new containerRequest(cRequest5) + // Step-6 : RM allocates containers i.e cRequest3,cRequest4 and cRequest5 + @Test(timeout = 60000) + public void testAMRMClientResendsRequestsOnRMRestart() throws Exception { + + UserGroupInformation.setLoginUser(null); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // Phase-1 Start 1st RM + MyResourceManager rm1 = new MyResourceManager(conf, memStore); + rm1.start(); + DrainDispatcher dispatcher = + (DrainDispatcher) rm1.getRMContext().getDispatcher(); + + // Submit the application + RMApp app = rm1.submitApp(1024); + dispatcher.await(); + + MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm1.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + ApplicationAttemptId appAttemptId = + app.getCurrentAppAttempt().getAppAttemptId(); + rm1.sendAMLaunched(appAttemptId); + dispatcher.await(); + + org.apache.hadoop.security.token.Token token = + rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId()) + .getRMAppAttempt(appAttemptId).getAMRMToken(); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + + // Step-1 : AMRMClient send allocate request for 2 ContainerRequest + // cRequest1 = h1 and cRequest2 = h1,h2 + // blacklisted nodes = h2 + AMRMClient amClient = new MyAMRMClientImpl(rm1); + amClient.init(conf); + amClient.start(); + + amClient.registerApplicationMaster("Host", 10000, ""); + + ContainerRequest cRequest1 = createReq(1, 1024, new String[] { "h1" }); + amClient.addContainerRequest(cRequest1); + + ContainerRequest cRequest2 = + createReq(1, 1024, new String[] { "h1", "h2" }); + amClient.addContainerRequest(cRequest2); + + List blacklistAdditions = new ArrayList(); + List blacklistRemoval = new ArrayList(); + blacklistAdditions.add("h2"); + blacklistRemoval.add("h10"); + amClient.updateBlacklist(blacklistAdditions, blacklistRemoval); + blacklistAdditions.remove("h2");// remove from local list + + AllocateResponse allocateResponse = amClient.allocate(0.1f); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, allocateResponse + .getAllocatedContainers().size()); + + // Why 4 ask, why not 3 ask even h2 is blacklisted? + // On blacklisting host,applicationmaster has to remove ask request from + // remoterequest table.Here,test does not remove explicitely + assertAsksAndReleases(4, 0, rm1); + assertBlacklistAdditionsAndRemovals(1, 1, rm1); + + // Step-2 : NM heart beat is sent. + // On 2nd AM allocate request, RM allocates 2 containers to AM + nm1.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + allocateResponse = amClient.allocate(0.2f); + dispatcher.await(); + // 2 containers are allocated i.e for cRequest1 and cRequest2. 
+ Assert.assertEquals("No of assignments must be 0", 2, allocateResponse + .getAllocatedContainers().size()); + assertAsksAndReleases(0, 0, rm1); + assertBlacklistAdditionsAndRemovals(0, 0, rm1); + + List allocatedContainers = + allocateResponse.getAllocatedContainers(); + // removed allocated container requests + amClient.removeContainerRequest(cRequest1); + amClient.removeContainerRequest(cRequest2); + + allocateResponse = amClient.allocate(0.2f); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, allocateResponse + .getAllocatedContainers().size()); + assertAsksAndReleases(4, 0, rm1); + assertBlacklistAdditionsAndRemovals(0, 0, rm1); + + // Step-3 : Send 1 containerRequest and 1 releaseRequests to RM + ContainerRequest cRequest3 = createReq(1, 1024, new String[] { "h1" }); + amClient.addContainerRequest(cRequest3); + + int pendingRelease = 0; + Iterator it = allocatedContainers.iterator(); + while (it.hasNext()) { + amClient.releaseAssignedContainer(it.next().getId()); + pendingRelease++; + it.remove(); + break;// remove one container + } + + allocateResponse = amClient.allocate(0.3f); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, allocateResponse + .getAllocatedContainers().size()); + assertAsksAndReleases(3, pendingRelease, rm1); + assertBlacklistAdditionsAndRemovals(0, 0, rm1); + int completedContainer = + allocateResponse.getCompletedContainersStatuses().size(); + pendingRelease -= completedContainer; + + // Phase-2 start 2nd RM is up + MyResourceManager rm2 = new MyResourceManager(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + ((MyAMRMClientImpl) amClient).updateRMProxy(rm2); + dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher(); + + // NM should be rebooted on heartbeat, even first heartbeat for nm2 + NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); + + // new NM to represent NM re-register + nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); + nm1.registerNode(); + nm1.nodeHeartbeat(true); + dispatcher.await(); + + blacklistAdditions.add("h3"); + amClient.updateBlacklist(blacklistAdditions, null); + blacklistAdditions.remove("h3"); + + it = allocatedContainers.iterator(); + while (it.hasNext()) { + amClient.releaseAssignedContainer(it.next().getId()); + pendingRelease++; + it.remove(); + } + + ContainerRequest cRequest4 = + createReq(1, 1024, new String[] { "h1", "h2" }); + amClient.addContainerRequest(cRequest4); + + // Step-4 : On RM restart, AM(does not know RM is restarted) sends + // additional + // containerRequest and blacklisted nodes. 
+ // Intern RM send resync command,AMRMClient resend allocate request + allocateResponse = amClient.allocate(0.3f); + dispatcher.await(); + + completedContainer = + allocateResponse.getCompletedContainersStatuses().size(); + pendingRelease -= completedContainer; + + assertAsksAndReleases(4, pendingRelease, rm2); + assertBlacklistAdditionsAndRemovals(2, 0, rm2); + + ContainerRequest cRequest5 = + createReq(1, 1024, new String[] { "h1", "h2", "h3" }); + amClient.addContainerRequest(cRequest5); + + // Step-5 : Allocater after resync command + allocateResponse = amClient.allocate(0.5f); + dispatcher.await(); + Assert.assertEquals("No of assignments must be 0", 0, allocateResponse + .getAllocatedContainers().size()); + + assertAsksAndReleases(5, 0, rm2); + assertBlacklistAdditionsAndRemovals(0, 0, rm2); + + int noAssignedContainer = 0; + int count = 5; + while (count-- > 0) { + nm1.nodeHeartbeat(true); + dispatcher.await(); + + allocateResponse = amClient.allocate(0.5f); + dispatcher.await(); + noAssignedContainer += allocateResponse.getAllocatedContainers().size(); + if (noAssignedContainer == 3) { + break; + } + Thread.sleep(1000); + } + + // Step-6 : RM allocates containers i.e cRequest3,cRequest4 and cRequest5 + Assert.assertEquals("Number of container should be 3", 3, + noAssignedContainer); + + amClient.stop(); + rm1.stop(); + rm2.stop(); + } + + // Test verify for + // 1. AM try to unregister without registering + // 2. AM register to RM, and try to unregister immediately after RM restart + @Test(timeout = 60000) + public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception { + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // Phase-1 Start 1st RM + MyResourceManager rm1 = new MyResourceManager(conf, memStore); + rm1.start(); + DrainDispatcher dispatcher = + (DrainDispatcher) rm1.getRMContext().getDispatcher(); + + // Submit the application + RMApp app = rm1.submitApp(1024); + dispatcher.await(); + + MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm1.nodeHeartbeat(true); // Node heartbeat + dispatcher.await(); + + ApplicationAttemptId appAttemptId = + app.getCurrentAppAttempt().getAppAttemptId(); + rm1.sendAMLaunched(appAttemptId); + dispatcher.await(); + + org.apache.hadoop.security.token.Token token = + rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId()) + .getRMAppAttempt(appAttemptId).getAMRMToken(); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + + AMRMClient amClient = new MyAMRMClientImpl(rm1); + amClient.init(conf); + amClient.start(); + + amClient.registerApplicationMaster("h1", 10000, ""); + amClient.allocate(0.1f); + + // Phase-2 start 2nd RM is up + MyResourceManager rm2 = new MyResourceManager(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + ((MyAMRMClientImpl) amClient).updateRMProxy(rm2); + dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher(); + + // NM should be rebooted on heartbeat, even first heartbeat for nm2 + NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); + + // new NM to represent NM re-register + nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); + + ContainerId containerId = ContainerId.newInstance(appAttemptId, 1); + NMContainerStatus containerReport = + NMContainerStatus.newInstance(containerId, 
ContainerState.RUNNING, + Resource.newInstance(1024, 1), "recover container", 0, + Priority.newInstance(0), 0); + nm1.registerNode(Arrays.asList(containerReport), null); + nm1.nodeHeartbeat(true); + dispatcher.await(); + + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + rm2.waitForState(appAttemptId, RMAppAttemptState.FINISHING); + nm1.nodeHeartbeat(appAttemptId, 1, ContainerState.COMPLETE); + rm2.waitForState(appAttemptId, RMAppAttemptState.FINISHED); + rm2.waitForState(app.getApplicationId(), RMAppState.FINISHED); + + amClient.stop(); + rm1.stop(); + rm2.stop(); + + } + + private static class MyFifoScheduler extends FifoScheduler { + + public MyFifoScheduler(RMContext rmContext) { + super(); + try { + Configuration conf = new Configuration(); + reinitialize(conf, rmContext); + } catch (IOException ie) { + assert (false); + } + } + + List lastAsk = null; + List lastRelease = null; + List lastBlacklistAdditions; + List lastBlacklistRemovals; + + // override this to copy the objects otherwise FifoScheduler updates the + // numContainers in same objects as kept by RMContainerAllocator + @Override + public synchronized Allocation allocate( + ApplicationAttemptId applicationAttemptId, List ask, + List release, List blacklistAdditions, + List blacklistRemovals) { + List askCopy = new ArrayList(); + for (ResourceRequest req : ask) { + ResourceRequest reqCopy = + ResourceRequest.newInstance(req.getPriority(), + req.getResourceName(), req.getCapability(), + req.getNumContainers(), req.getRelaxLocality()); + askCopy.add(reqCopy); + } + lastAsk = ask; + lastRelease = release; + lastBlacklistAdditions = blacklistAdditions; + lastBlacklistRemovals = blacklistRemovals; + return super.allocate(applicationAttemptId, askCopy, release, + blacklistAdditions, blacklistRemovals); + } + } + + private static class MyResourceManager extends MockRM { + + private static long fakeClusterTimeStamp = System.currentTimeMillis(); + + public MyResourceManager(Configuration conf, RMStateStore store) { + super(conf, store); + } + + @Override + public void serviceStart() throws Exception { + super.serviceStart(); + // Ensure that the application attempt IDs for all the tests are the same + // The application attempt IDs will be used as the login user names + MyResourceManager.setClusterTimeStamp(fakeClusterTimeStamp); + } + + @Override + protected Dispatcher createDispatcher() { + return new DrainDispatcher(); + } + + @Override + protected EventHandler createSchedulerEventDispatcher() { + // Dispatch inline for test sanity + return new EventHandler() { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + + @Override + protected ResourceScheduler createScheduler() { + return new MyFifoScheduler(this.getRMContext()); + } + + MyFifoScheduler getMyFifoScheduler() { + return (MyFifoScheduler) scheduler; + } + } + + private static class MyAMRMClientImpl extends + AMRMClientImpl { + private MyResourceManager rm; + + public MyAMRMClientImpl(MyResourceManager rm) { + this.rm = rm; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + this.rmClient = this.rm.getApplicationMasterService(); + } + + @Override + protected void serviceStop() throws Exception { + rmClient = null; + super.serviceStop(); + } + + public void updateRMProxy(MyResourceManager rm) { + rmClient = rm.getApplicationMasterService(); + } + } + + private 
static void assertBlacklistAdditionsAndRemovals( + int expectedAdditions, int expectedRemovals, MyResourceManager rm) { + Assert.assertEquals(expectedAdditions, + rm.getMyFifoScheduler().lastBlacklistAdditions.size()); + Assert.assertEquals(expectedRemovals, + rm.getMyFifoScheduler().lastBlacklistRemovals.size()); + } + + private static void assertAsksAndReleases(int expectedAsk, + int expectedRelease, MyResourceManager rm) { + Assert.assertEquals(expectedAsk, rm.getMyFifoScheduler().lastAsk.size()); + Assert.assertEquals(expectedRelease, + rm.getMyFifoScheduler().lastRelease.size()); + } + + private ContainerRequest createReq(int priority, int memory, String[] hosts) { + Resource capability = Resource.newInstance(memory, 1); + Priority priorityOfContainer = Priority.newInstance(priority); + return new ContainerRequest(capability, hosts, + new String[] { NetworkTopology.DEFAULT_RACK }, priorityOfContainer); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml new file mode 100644 index 00000000000..f0d3085ef85 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml @@ -0,0 +1,25 @@ + + + + + + + + + hadoop.security.token.service.use_ip + false + + +
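
Editor's note: the resync handling introduced above is spread across several hunks of AMRMClientImpl.java. The sketch below is an illustrative paraphrase of that flow, not the verbatim patch: it condenses the isResyncCommand() check inline, omits error handling, and reconstructs the generic types that were lost from the hunk above, so treat the exact signatures as assumptions.

    // Inside AMRMClientImpl.allocate(float progressIndicator), after the allocate RPC:
    AllocateResponse allocateResponse = rmClient.allocate(allocateRequest);
    if (allocateResponse.getAMCommand() == AMCommand.AM_RESYNC) {
      // The restarted RM has no memory of this AM, so replay everything that
      // is still outstanding before asking again.
      synchronized (this) {
        // releases the RM never confirmed as completed
        release.addAll(pendingRelease);
        // the full blacklist history kept in blacklistedNodes
        blacklistAdditions.addAll(blacklistedNodes);
        // every ResourceRequest still tracked in the remote request table
        for (Map<String, TreeMap<Resource, ResourceRequestInfo>> requests
            : remoteRequestsTable.values()) {
          for (Map<Resource, ResourceRequestInfo> capabilities : requests.values()) {
            for (ResourceRequestInfo request : capabilities.values()) {
              addResourceRequestToAsk(request.remoteRequest);
            }
          }
        }
      }
      registerApplicationMaster();        // re-register with the new RM instance
      return allocate(progressIndicator); // retry; the replayed state rides along
    }

The reason pendingRelease and blacklistedNodes exist alongside release and blacklistAdditions is that the latter two are per allocate() call, while the former keep history until the RM confirms it: a released container is dropped from pendingRelease only once it appears in getCompletedContainersStatuses() (see removePendingReleaseRequests() above). That retained history is what makes the replay sketched here possible.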