Reverting YARN-245 to fix a critical bug.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1508277 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-07-30 03:06:54 +00:00
parent f593d57296
commit 3ca892dc0a
4 changed files with 8 additions and 154 deletions

View File

@ -762,9 +762,6 @@ Release 2.1.0-beta - 2013-07-02
YARN-937. Fix unmanaged AM in non-secure/secure setup post YARN-701. (tucu) YARN-937. Fix unmanaged AM in non-secure/secure setup post YARN-701. (tucu)
YARN-245. Fixed NodeManager to handle duplicate responses from
ResourceManager. (Mayank Bansal via vinodkv)
YARN-932. TestResourceLocalizationService.testLocalizationInit can fail on YARN-932. TestResourceLocalizationService.testLocalizationInit can fail on
JDK7. (Karthik Kambatla via Sandy Ryza) JDK7. (Karthik Kambatla via Sandy Ryza)

View File

@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
@ -159,7 +158,7 @@ public class NodeManager extends CompositeService
addService(del); addService(del);
// NodeManager level dispatcher // NodeManager level dispatcher
this.dispatcher = (AsyncDispatcher) createDispatcher(); this.dispatcher = new AsyncDispatcher();
nodeHealthChecker = new NodeHealthCheckerService(); nodeHealthChecker = new NodeHealthCheckerService();
addService(nodeHealthChecker); addService(nodeHealthChecker);
@ -204,16 +203,6 @@ public class NodeManager extends CompositeService
// TODO add local dirs to del // TODO add local dirs to del
} }
@Private
protected Dispatcher createDispatcher(){
return new AsyncDispatcher();
}
@Private
public Dispatcher getDispatcher(){
return this.dispatcher;
}
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
try { try {

View File

@ -369,13 +369,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
.setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey()); .getNMTokenSecretManager().getCurrentKey());
response = resourceTracker.nodeHeartbeat(request); response = resourceTracker.nodeHeartbeat(request);
// Checking if the response id is the same which we just processed
// If yes then ignore the update.
if (lastHeartBeatID != response.getResponseId() - 1) {
LOG.info("Discarding the duplicate response "
+ response.getResponseId());
continue;
}
//get next heartbeat interval from response //get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval(); nextHeartBeatInterval = response.getNextHeartBeatInterval();
updateMasterKeys(response); updateMasterKeys(response);
@ -402,6 +395,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
new NodeManagerEvent(NodeManagerEventType.RESYNC)); new NodeManagerEvent(NodeManagerEventType.RESYNC));
break; break;
} }
lastHeartBeatID = response.getResponseId(); lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response List<ContainerId> containersToCleanup = response
.getContainersToCleanup(); .getContainersToCleanup();

View File

@ -59,9 +59,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -433,26 +431,6 @@ public class TestNodeStatusUpdater {
} }
} }
private class MyNodeManager7 extends NodeManager {
private ResourceTracker resourceTracker;
private MyNodeStatusUpdater3 nodeStatusUpdater;
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
this.nodeStatusUpdater =
new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics);
resourceTracker = new MyResourceTracker7(context);
this.nodeStatusUpdater.resourceTracker = resourceTracker;
return this.nodeStatusUpdater;
}
protected MyNodeStatusUpdater3 getNodeStatusUpdater() {
return this.nodeStatusUpdater;
}
}
private class MyNodeManager2 extends NodeManager { private class MyNodeManager2 extends NodeManager {
public boolean isStopped = false; public boolean isStopped = false;
private NodeStatusUpdater nodeStatusUpdater; private NodeStatusUpdater nodeStatusUpdater;
@ -574,68 +552,6 @@ public class TestNodeStatusUpdater {
} }
} }
private class MyResourceTracker7 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
public NodeAction registerNodeAction = NodeAction.NORMAL;
private final Context context;
private int lastRequestedHeartBeat = 0;
private boolean gotDuplicateHeartBeatRequest = false;
private ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
MyResourceTracker7(Context context) {
this.context = context;
}
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException, IOException {
RegisterNodeManagerResponse response =
recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
response.setNodeAction(registerNodeAction);
response.setContainerTokenMasterKey(createMasterKey());
response.setNMTokenMasterKey(createMasterKey());
return response;
}
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
if (lastRequestedHeartBeat != 0
&& lastRequestedHeartBeat == request.getNodeStatus().getResponseId()) {
LOG.info("GOT Duplicate heartbeatId "
+ request.getNodeStatus().getResponseId());
gotDuplicateHeartBeatRequest = true;
}
lastRequestedHeartBeat = request.getNodeStatus().getResponseId();
LOG.info("Got heartBeatId: [" + heartBeatID + "]");
NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++);
NodeHeartbeatResponse nhResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
heartBeatNodeAction, null, null, null, null, 1000L);
if (heartBeatID == 5) {
LOG.info("Sending FINISH_APP for application: [" + appId + "]");
this.context.getApplications().put(appId, mock(Application.class));
nhResponse
.addAllApplicationsToCleanup(Collections.singletonList(appId));
}
if (heartBeatID == 6) {
nhResponse.setResponseId(5);
LOG.info("Sending FINISH_APP for application: [" + appId + "]");
this.context.getApplications().put(appId, mock(Application.class));
nhResponse
.addAllApplicationsToCleanup(Collections.singletonList(appId));
}
return nhResponse;
}
public boolean isGotDuplicateHeartBeatRequest() {
return gotDuplicateHeartBeatRequest;
}
}
private class MyResourceTracker4 implements ResourceTracker { private class MyResourceTracker4 implements ResourceTracker {
public NodeAction registerNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL;
@ -829,7 +745,7 @@ public class TestNodeStatusUpdater {
lfs.delete(new Path(basedir.getPath()), true); lfs.delete(new Path(basedir.getPath()), true);
} }
@Test(timeout = 60000) @Test
public void testNMRegistration() throws InterruptedException { public void testNMRegistration() throws InterruptedException {
nm = new NodeManager() { nm = new NodeManager() {
@Override @Override
@ -889,7 +805,7 @@ public class TestNodeStatusUpdater {
nm.stop(); nm.stop();
} }
@Test(timeout = 60000) @Test
public void testStopReentrant() throws Exception { public void testStopReentrant() throws Exception {
final AtomicInteger numCleanups = new AtomicInteger(0); final AtomicInteger numCleanups = new AtomicInteger(0);
nm = new NodeManager() { nm = new NodeManager() {
@ -935,49 +851,7 @@ public class TestNodeStatusUpdater {
Assert.assertEquals(numCleanups.get(), 1); Assert.assertEquals(numCleanups.get(), 1);
} }
@SuppressWarnings("rawtypes") @Test
class MyDispatcher7 extends AsyncDispatcher {
public volatile int finishapp_event;
protected void dispatch(Event event) {
if (event.getType().name()
.equals(ContainerManagerEventType.FINISH_APPS.toString())) {
++finishapp_event;
}
}
}
@Test(timeout = 60000)
public void testDuplicateResponseFromRM() throws Exception {
MyNodeManager7 nm = new MyNodeManager7() {
protected Dispatcher createDispatcher() {
return new MyDispatcher7();
}
};
try {
YarnConfiguration conf = createNMConfig();
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 4000l);
nm.init(conf);
nm.start();
MyResourceTracker7 rt =
(MyResourceTracker7) nm.getNodeStatusUpdater().getRMClient();
while (heartBeatID < 7) {
Thread.sleep(1000l);
}
Assert.assertTrue(rt.isGotDuplicateHeartBeatRequest());
MyDispatcher7 nmdispatcher = (MyDispatcher7) nm.getDispatcher();
// We are sending two FINISH_APPS in heartbeat 5 and 6
// Checking we get only one time FINISH_APPS event which is the first one
Assert.assertEquals(1, nmdispatcher.finishapp_event);
} finally {
if (nm.getServiceState() == STATE.STARTED)
nm.stop();
}
}
@Test(timeout = 60000)
public void testNodeDecommision() throws Exception { public void testNodeDecommision() throws Exception {
nm = getNodeManager(NodeAction.SHUTDOWN); nm = getNodeManager(NodeAction.SHUTDOWN);
YarnConfiguration conf = createNMConfig(); YarnConfiguration conf = createNMConfig();
@ -1024,7 +898,7 @@ public class TestNodeStatusUpdater {
NodeHealthCheckerService healthChecker); NodeHealthCheckerService healthChecker);
} }
@Test(timeout = 60000) @Test
public void testNMShutdownForRegistrationFailure() throws Exception { public void testNMShutdownForRegistrationFailure() throws Exception {
nm = new NodeManagerWithCustomNodeStatusUpdater() { nm = new NodeManagerWithCustomNodeStatusUpdater() {
@ -1137,7 +1011,7 @@ public class TestNodeStatusUpdater {
* started properly, RM will think that the NM is alive and will retire the NM * started properly, RM will think that the NM is alive and will retire the NM
* only after NM_EXPIRY interval. See MAPREDUCE-2749. * only after NM_EXPIRY interval. See MAPREDUCE-2749.
*/ */
@Test(timeout = 60000) @Test
public void testNoRegistrationWhenNMServicesFail() throws Exception { public void testNoRegistrationWhenNMServicesFail() throws Exception {
nm = new NodeManager() { nm = new NodeManager() {
@ -1168,7 +1042,7 @@ public class TestNodeStatusUpdater {
verifyNodeStartFailure("Starting of RPC Server failed"); verifyNodeStartFailure("Starting of RPC Server failed");
} }
@Test(timeout = 60000) @Test
public void testApplicationKeepAlive() throws Exception { public void testApplicationKeepAlive() throws Exception {
MyNodeManager nm = new MyNodeManager(); MyNodeManager nm = new MyNodeManager();
try { try {