YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port information once an AM crashes. Contributed by Jian He.

svn merge --ignore-ancestry -c 1553772 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1553773 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-12-28 01:10:02 +00:00
parent 05ff546482
commit ee20593b64
4 changed files with 133 additions and 4 deletions

View File

@ -173,6 +173,9 @@ Release 2.4.0 - UNRELEASED
YARN-1523. Use StandbyException instead of RMNotYetReadyException (kasha) YARN-1523. Use StandbyException instead of RMNotYetReadyException (kasha)
YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port
information once an AM crashes. (Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -139,7 +139,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private float progress = 0; private float progress = 0;
private String host = "N/A"; private String host = "N/A";
private int rpcPort; private int rpcPort = -1;
private String originalTrackingUrl = "N/A"; private String originalTrackingUrl = "N/A";
private String proxiedTrackingUrl = "N/A"; private String proxiedTrackingUrl = "N/A";
private long startTime = 0; private long startTime = 0;
@ -526,6 +526,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
proxiedTrackingUrl = originalTrackingUrl; proxiedTrackingUrl = originalTrackingUrl;
} }
/**
 * Resets the recorded ApplicationMaster host and RPC port to the same
 * placeholder values the fields are initialized with ("N/A" and -1).
 */
private void invalidateAMHostAndPort() {
  host = "N/A";
  rpcPort = -1;
}
// This is only used for RMStateStore. Normal operation must invoke the secret // This is only used for RMStateStore. Normal operation must invoke the secret
// manager to get the key and not use the local key directly. // manager to get the key and not use the local key directly.
@Override @Override
@ -1033,6 +1038,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
{ {
// don't leave the tracking URL pointing to a non-existent AM // don't leave the tracking URL pointing to a non-existent AM
appAttempt.setTrackingUrlToRMAppPage(); appAttempt.setTrackingUrlToRMAppPage();
appAttempt.invalidateAMHostAndPort();
appEvent = appEvent =
new RMAppFailedAttemptEvent(applicationId, new RMAppFailedAttemptEvent(applicationId,
RMAppEventType.ATTEMPT_KILLED, RMAppEventType.ATTEMPT_KILLED,
@ -1043,6 +1049,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
{ {
// don't leave the tracking URL pointing to a non-existent AM // don't leave the tracking URL pointing to a non-existent AM
appAttempt.setTrackingUrlToRMAppPage(); appAttempt.setTrackingUrlToRMAppPage();
appAttempt.invalidateAMHostAndPort();
appEvent = appEvent =
new RMAppFailedAttemptEvent(applicationId, new RMAppFailedAttemptEvent(applicationId,
RMAppEventType.ATTEMPT_FAILED, RMAppEventType.ATTEMPT_FAILED,
@ -1059,7 +1066,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.eventHandler.handle(appEvent); appAttempt.eventHandler.handle(appEvent);
appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent( appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent(
appAttemptId, finalAttemptState)); appAttemptId, finalAttemptState));
appAttempt.removeCredentials(appAttempt); appAttempt.removeCredentials(appAttempt);
} }
} }

View File

@ -19,26 +19,33 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import javax.security.auth.login.Configuration;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@ -368,6 +375,111 @@ public class TestRM {
rm1.stop(); rm1.stop();
} }
/**
 * Verifies that the AM host and RPC port reported to clients are invalidated
 * ("N/A" / -1) once the AM attempt has failed or been killed, while an app
 * whose AM finished successfully keeps its host and port.
 *
 * @throws Exception if any RM/NM/AM interaction fails
 */
@Test (timeout = 80000)
public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception {
  YarnConfiguration conf = new YarnConfiguration();
  // Allow only one AM attempt; the test below then expects app2 to reach
  // FAILED after its AM container completes.
  conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
  MockRM rm1 = new MockRM(conf);
  rm1.start();
  try {
    // a succeeded app
    RMApp app1 = rm1.submitApp(200);
    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
    nm1.registerNode();
    MockAM am1 = launchAM(app1, rm1, nm1);
    finishApplicationMaster(app1, rm1, nm1, am1);

    // a failed app
    RMApp app2 = rm1.submitApp(200);
    MockAM am2 = launchAM(app2, rm1, nm1);
    nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1,
        ContainerState.COMPLETE);
    am2.waitForState(RMAppAttemptState.FAILED);
    rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);

    // a killed app
    RMApp app3 = rm1.submitApp(200);
    MockAM am3 = launchAM(app3, rm1, nm1);
    rm1.killApp(app3.getApplicationId());
    rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
    rm1.waitForState(am3.getApplicationAttemptId(),
        RMAppAttemptState.KILLED);

    GetApplicationsRequest request1 =
        GetApplicationsRequest.newInstance(EnumSet.of(
            YarnApplicationState.FINISHED, YarnApplicationState.KILLED,
            YarnApplicationState.FAILED));
    GetApplicationsResponse response1 =
        rm1.getClientRMService().getApplications(request1);
    List<ApplicationReport> appList1 = response1.getApplicationList();
    Assert.assertEquals("unexpected number of completed applications",
        3, appList1.size());

    for (ApplicationReport report : appList1) {
      if (report.getApplicationId().equals(app2.getApplicationId())
          || report.getApplicationId().equals(app3.getApplicationId())) {
        // killed/failed apps' host and rpc port are invalidated.
        Assert.assertEquals("host must be invalidated for failed/killed app",
            "N/A", report.getHost());
        Assert.assertEquals(
            "rpc port must be invalidated for failed/killed app",
            -1, report.getRpcPort());
      } else if (report.getApplicationId().equals(app1.getApplicationId())) {
        // succeeded app's host and rpc port are not invalidated.
        Assert.assertFalse("host must be kept for succeeded app",
            report.getHost().equals("N/A"));
        Assert.assertTrue("rpc port must be kept for succeeded app",
            report.getRpcPort() != -1);
      }
    }
  } finally {
    // Always shut the RM down, as the other tests in this class do, so a
    // failed assertion does not leave it running.
    rm1.stop();
  }
}
/**
 * Verifies that, after an AM attempt fails and the app is back in ACCEPTED,
 * the application report returns the invalidated AM host ("N/A") and RPC
 * port (-1) before a new attempt is launched.
 *
 * @throws Exception if any RM/NM/AM interaction fails
 */
@Test (timeout = 60000)
public void testInvalidatedAMHostPortOnAMRestart() throws Exception {
  YarnConfiguration conf = new YarnConfiguration();
  MockRM rm1 = new MockRM(conf);
  rm1.start();
  try {
    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
    nm1.registerNode();

    // a failed app
    RMApp app2 = rm1.submitApp(200);
    MockAM am2 = launchAM(app2, rm1, nm1);
    nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1,
        ContainerState.COMPLETE);
    am2.waitForState(RMAppAttemptState.FAILED);
    rm1.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);

    // before new attempt is launched, the app report returns the invalid AM
    // host and port.
    GetApplicationReportRequest request1 =
        GetApplicationReportRequest.newInstance(app2.getApplicationId());
    ApplicationReport report1 =
        rm1.getClientRMService().getApplicationReport(request1)
            .getApplicationReport();
    Assert.assertEquals("host must be invalidated after AM failure",
        "N/A", report1.getHost());
    Assert.assertEquals("rpc port must be invalidated after AM failure",
        -1, report1.getRpcPort());
  } finally {
    // Always shut the RM down, as the other tests in this class do, so a
    // failed assertion does not leave it running.
    rm1.stop();
  }
}
/**
 * Test helper: sends a node heartbeat, marks the app's current attempt as
 * launched, registers the resulting AM and waits for the app to be RUNNING.
 *
 * @param app the submitted application whose current attempt is launched
 * @param rm the RM the application was submitted to
 * @param nm the node manager that sends the heartbeat
 * @return the registered mock AM for the current attempt
 * @throws Exception if any of the RM/NM/AM calls fail
 */
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
    throws Exception {
  // Capture the attempt before the heartbeat, as the original flow does.
  final RMAppAttempt currentAttempt = app.getCurrentAppAttempt();
  nm.nodeHeartbeat(true);

  final MockAM launchedAm =
      rm.sendAMLaunched(currentAttempt.getAppAttemptId());
  launchedAm.registerAppAttempt();

  rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
  return launchedAm;
}
/**
 * Test helper: unregisters the AM with a SUCCEEDED final status, reports its
 * container as COMPLETE through a node heartbeat, and waits for the attempt
 * and the app to reach FINISHED.
 *
 * @param rmApp the application being finished
 * @param rm the RM tracking the application
 * @param nm the node manager that reports the AM container as complete
 * @param am the mock AM to unregister
 * @throws Exception if any of the RM/NM/AM calls fail
 */
private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
    MockAM am) throws Exception {
  final FinishApplicationMasterRequest finishRequest =
      FinishApplicationMasterRequest.newInstance(
          FinalApplicationStatus.SUCCEEDED, "", "");

  // Unregister first; the attempt is expected to sit in FINISHING until
  // the node reports the AM container as completed.
  am.unregisterAppAttempt(finishRequest);
  am.waitForState(RMAppAttemptState.FINISHING);

  nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
  am.waitForState(RMAppAttemptState.FINISHED);

  rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
TestRM t = new TestRM(); TestRM t = new TestRM();
t.testGetNewAppId(); t.testGetNewAppId();

View File

@ -806,6 +806,7 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getAppAttemptId().getApplicationId()); applicationAttempt.getAppAttemptId().getApplicationId());
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
verifyAMHostAndPortInvalidated();
} }
@Test @Test
@ -841,6 +842,7 @@ public class TestRMAppAttemptTransitions {
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyAMHostAndPortInvalidated();
} }
@Test(timeout=10000) @Test(timeout=10000)
@ -878,6 +880,7 @@ public class TestRMAppAttemptTransitions {
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyAMHostAndPortInvalidated();
} }
@Test @Test
@ -1125,4 +1128,9 @@ public class TestRMAppAttemptTransitions {
verify(store, times(1)).updateApplicationAttemptState( verify(store, times(1)).updateApplicationAttemptState(
any(ApplicationAttemptState.class)); any(ApplicationAttemptState.class));
} }
/**
 * Asserts that the attempt under test reports the invalidated AM host
 * ("N/A") and RPC port (-1).
 */
private void verifyAMHostAndPortInvalidated() {
  final String reportedHost = applicationAttempt.getHost();
  assertEquals("N/A", reportedHost);

  final int reportedRpcPort = applicationAttempt.getRpcPort();
  assertEquals(-1, reportedRpcPort);
}
} }