diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1eb2ba9f0e6..49fedbcbf70 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -747,9 +747,6 @@ Release 2.1.0-beta - 2013-07-02 YARN-937. Fix unmanaged AM in non-secure/secure setup post YARN-701. (tucu) - YARN-245. Fixed NodeManager to handle duplicate responses from - ResourceManager. (Mayank Bansal via vinodkv) - YARN-932. TestResourceLocalizationService.testLocalizationInit can fail on JDK7. (Karthik Kambatla via Sandy Ryza) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index bde789041bb..d4776bce758 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.SecurityUtil; @@ -159,7 +158,7 @@ public class NodeManager extends CompositeService addService(del); // NodeManager level dispatcher - this.dispatcher = (AsyncDispatcher) createDispatcher(); + this.dispatcher = new AsyncDispatcher(); nodeHealthChecker = new NodeHealthCheckerService(); addService(nodeHealthChecker); @@ -204,16 +203,6 @@ public class NodeManager extends CompositeService // TODO add local dirs to del } - @Private - protected Dispatcher createDispatcher(){ - return new AsyncDispatcher(); - } - - @Private - public Dispatcher getDispatcher(){ - return this.dispatcher; - } - @Override protected void serviceStart() throws Exception { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 781489783a6..8169677bd42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -369,13 +369,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context .getNMTokenSecretManager().getCurrentKey()); response = resourceTracker.nodeHeartbeat(request); - // Checking if the response id is the same which we just processed - // If yes then ignore the update. - if (lastHeartBeatID != response.getResponseId() - 1) { - LOG.info("Discarding the duplicate response " - + response.getResponseId()); - continue; - } //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); updateMasterKeys(response); @@ -402,6 +395,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements new NodeManagerEvent(NodeManagerEventType.RESYNC)); break; } + lastHeartBeatID = response.getResponseId(); List containersToCleanup = response .getContainersToCleanup(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 73904aa5816..1e7386a24eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -59,9 +59,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -433,26 +431,6 @@ public class TestNodeStatusUpdater { } } - private class MyNodeManager7 extends NodeManager { - private ResourceTracker resourceTracker; - private MyNodeStatusUpdater3 nodeStatusUpdater; - - @Override - protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - this.nodeStatusUpdater = - new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics); - resourceTracker = new MyResourceTracker7(context); - this.nodeStatusUpdater.resourceTracker = resourceTracker; - - return this.nodeStatusUpdater; - } - - protected MyNodeStatusUpdater3 getNodeStatusUpdater() { - return this.nodeStatusUpdater; - } - } - private class MyNodeManager2 extends NodeManager { public boolean isStopped = false; private NodeStatusUpdater nodeStatusUpdater; @@ -574,68 +552,6 @@ public class TestNodeStatusUpdater { } } - private class MyResourceTracker7 implements ResourceTracker { - public NodeAction heartBeatNodeAction = NodeAction.NORMAL; - public NodeAction registerNodeAction = NodeAction.NORMAL; - private final Context context; - private int lastRequestedHeartBeat = 0; - private boolean gotDuplicateHeartBeatRequest = false; - private ApplicationId appId = BuilderUtils.newApplicationId(1, 1); - - MyResourceTracker7(Context context) { - this.context = context; - } - - @Override - public RegisterNodeManagerResponse registerNodeManager( - RegisterNodeManagerRequest request) throws YarnException, IOException { - RegisterNodeManagerResponse response = - recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); - response.setNodeAction(registerNodeAction); - response.setContainerTokenMasterKey(createMasterKey()); - response.setNMTokenMasterKey(createMasterKey()); - return response; - } - - @Override - public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) - throws YarnException, IOException { - - if (lastRequestedHeartBeat != 0 - && lastRequestedHeartBeat == request.getNodeStatus().getResponseId()) { - LOG.info("GOT Duplicate heartbeatId " - + request.getNodeStatus().getResponseId()); - gotDuplicateHeartBeatRequest = true; - } - lastRequestedHeartBeat = request.getNodeStatus().getResponseId(); - LOG.info("Got heartBeatId: [" + heartBeatID + "]"); - NodeStatus nodeStatus = request.getNodeStatus(); - nodeStatus.setResponseId(heartBeatID++); - NodeHeartbeatResponse nhResponse = - YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, - heartBeatNodeAction, null, null, null, null, 1000L); - - if (heartBeatID == 5) { - LOG.info("Sending FINISH_APP for application: [" + appId + "]"); - this.context.getApplications().put(appId, mock(Application.class)); - nhResponse - .addAllApplicationsToCleanup(Collections.singletonList(appId)); - } - if (heartBeatID == 6) { - nhResponse.setResponseId(5); - LOG.info("Sending FINISH_APP for application: [" + appId + "]"); - this.context.getApplications().put(appId, mock(Application.class)); - nhResponse - .addAllApplicationsToCleanup(Collections.singletonList(appId)); - } - return nhResponse; - } - - public boolean isGotDuplicateHeartBeatRequest() { - return gotDuplicateHeartBeatRequest; - } - } - private class MyResourceTracker4 implements ResourceTracker { public NodeAction registerNodeAction = NodeAction.NORMAL; @@ -829,7 +745,7 @@ public class TestNodeStatusUpdater { lfs.delete(new Path(basedir.getPath()), true); } - @Test(timeout = 60000) + @Test public void testNMRegistration() throws InterruptedException { nm = new NodeManager() { @Override @@ -889,7 +805,7 @@ public class TestNodeStatusUpdater { nm.stop(); } - @Test(timeout = 60000) + @Test public void testStopReentrant() throws Exception { final AtomicInteger numCleanups = new AtomicInteger(0); nm = new NodeManager() { @@ -935,49 +851,7 @@ public class TestNodeStatusUpdater { Assert.assertEquals(numCleanups.get(), 1); } - @SuppressWarnings("rawtypes") - class MyDispatcher7 extends AsyncDispatcher { - public volatile int finishapp_event; - - protected void dispatch(Event event) { - if (event.getType().name() - .equals(ContainerManagerEventType.FINISH_APPS.toString())) { - ++finishapp_event; - } - } - } - - @Test(timeout = 60000) - public void testDuplicateResponseFromRM() throws Exception { - MyNodeManager7 nm = new MyNodeManager7() { - protected Dispatcher createDispatcher() { - return new MyDispatcher7(); - } - }; - try { - YarnConfiguration conf = createNMConfig(); - conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 4000l); - nm.init(conf); - nm.start(); - MyResourceTracker7 rt = - (MyResourceTracker7) nm.getNodeStatusUpdater().getRMClient(); - while (heartBeatID < 7) { - Thread.sleep(1000l); - } - Assert.assertTrue(rt.isGotDuplicateHeartBeatRequest()); - - MyDispatcher7 nmdispatcher = (MyDispatcher7) nm.getDispatcher(); - // We are sending two FINISH_APPS in heartbeat 5 and 6 - // Checking we get only one time FINISH_APPS event which is the first one - Assert.assertEquals(1, nmdispatcher.finishapp_event); - - } finally { - if (nm.getServiceState() == STATE.STARTED) - nm.stop(); - } - } - - @Test(timeout = 60000) + @Test public void testNodeDecommision() throws Exception { nm = getNodeManager(NodeAction.SHUTDOWN); YarnConfiguration conf = createNMConfig(); @@ -1024,7 +898,7 @@ public class TestNodeStatusUpdater { NodeHealthCheckerService healthChecker); } - @Test(timeout = 60000) + @Test public void testNMShutdownForRegistrationFailure() throws Exception { nm = new NodeManagerWithCustomNodeStatusUpdater() { @@ -1137,7 +1011,7 @@ public class TestNodeStatusUpdater { * started properly, RM will think that the NM is alive and will retire the NM * only after NM_EXPIRY interval. See MAPREDUCE-2749. */ - @Test(timeout = 60000) + @Test public void testNoRegistrationWhenNMServicesFail() throws Exception { nm = new NodeManager() { @@ -1168,7 +1042,7 @@ public class TestNodeStatusUpdater { verifyNodeStartFailure("Starting of RPC Server failed"); } - @Test(timeout = 60000) + @Test public void testApplicationKeepAlive() throws Exception { MyNodeManager nm = new MyNodeManager(); try {