YARN-1752. Fixed ApplicationMasterService to reject unregister request if AM did not register before. Contributed by Rohith Sharma.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1574623 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-03-05 19:04:58 +00:00
parent 206e2b0cd2
commit 8cd23c5b48
8 changed files with 68 additions and 10 deletions

View File

@ -259,6 +259,9 @@ Release 2.4.0 - UNRELEASED
YARN-1785. FairScheduler treats app lookup failures as ERRORs.
(bc Wong via kasha)
YARN-1752. Fixed ApplicationMasterService to reject unregister request if
AM did not register before. (Rohith Sharma via jianhe)
OPTIMIZATIONS
BUG FIXES

View File

@ -24,10 +24,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
/**
* This exception is thrown when an ApplicationMaster asks for resources by
* calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} API
* without first registering by calling
* calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} or tries
* to unregister by calling
* {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)}
* API without first registering by calling
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
* or if it tries to register more then once.
* or if it tries to register more than once.
*/
public class InvalidApplicationMasterRequestException extends YarnException {

View File

@ -332,6 +332,19 @@ public class ApplicationMasterService extends AbstractService implements
// Allow only one thread in AM to do finishApp at a time.
synchronized (lock) {
if (!hasApplicationMasterRegistered(applicationAttemptId)) {
String message =
"Application Master is trying to unregister before registering for: "
+ applicationAttemptId.getApplicationId();
LOG.error(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps()
.get(applicationAttemptId.getApplicationId()).getUser(),
AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
message, applicationAttemptId.getApplicationId(),
applicationAttemptId);
throw new InvalidApplicationMasterRequestException(message);
}
this.amLivelinessMonitor.receivedPing(applicationAttemptId);

View File

@ -50,6 +50,7 @@ public class RMAuditLogger {
public static final String FINISH_FAILED_APP = "Application Finished - Failed";
public static final String FINISH_KILLED_APP = "Application Finished - Killed";
public static final String REGISTER_AM = "Register App Master";
public static final String UNREGISTER_AM = "Unregister App Master";
public static final String ALLOC_CONTAINER = "AM Allocated Container";
public static final String RELEASE_CONTAINER = "AM Released Container";

View File

@ -225,17 +225,19 @@ public class MockAM {
final FinishApplicationMasterRequest req =
FinishApplicationMasterRequest.newInstance(
FinalApplicationStatus.SUCCEEDED, "", "");
unregisterAppAttempt(req);
unregisterAppAttempt(req,true);
}
public void unregisterAppAttempt(final FinishApplicationMasterRequest req)
throws Exception {
waitForState(RMAppAttemptState.RUNNING);
public void unregisterAppAttempt(final FinishApplicationMasterRequest req,
boolean waitForStateRunning) throws Exception {
if (waitForStateRunning) {
waitForState(RMAppAttemptState.RUNNING);
}
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString());
Token<AMRMTokenIdentifier> token =
context.getRMApps().get(attemptId.getApplicationId())
.getRMAppAttempt(attemptId).getAMRMToken();
.getRMAppAttempt(attemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override

View File

@ -477,7 +477,7 @@ public class MockRM extends ResourceManager {
FinishApplicationMasterRequest req =
FinishApplicationMasterRequest.newInstance(
FinalApplicationStatus.SUCCEEDED, "", "");
am.unregisterAppAttempt(req);
am.unregisterAppAttempt(req,true);
am.waitForState(RMAppAttemptState.FINISHING);
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);

View File

@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@ -246,4 +248,39 @@ public class TestApplicationMasterService {
sleep(100);
}
}
@Test(timeout=1200000)
public void testFinishApplicationMasterBeforeRegistering() throws Exception {
MockRM rm = new MockRM(conf);
try {
rm.start();
// Register node1
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
// Submit an application
RMApp app1 = rm.submitApp(2048);
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
FinishApplicationMasterRequest req =
FinishApplicationMasterRequest.newInstance(
FinalApplicationStatus.FAILED, "", "");
Throwable cause = null;
try {
am1.unregisterAppAttempt(req, false);
} catch (Exception e) {
cause = e.getCause();
}
Assert.assertNotNull(cause);
Assert
.assertTrue(cause instanceof InvalidApplicationMasterRequestException);
Assert.assertNotNull(cause.getMessage());
Assert
.assertTrue(cause
.getMessage()
.contains(
"Application Master is trying to unregister before registering for:"));
} finally {
if (rm != null) {
rm.stop();
}
}
}
}

View File

@ -928,7 +928,7 @@ public class TestRMRestart {
((MemoryRMStateStore) rm.getRMContext().getStateStore()).getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
am.unregisterAppAttempt(req);
am.unregisterAppAttempt(req,true);
am.waitForState(RMAppAttemptState.FINISHING);
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED);