Merge r1574623 from trunk to branch-2.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1574625 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5e096ccac7
commit
296ad40766
|
@ -244,6 +244,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
YARN-1785. FairScheduler treats app lookup failures as ERRORs.
|
YARN-1785. FairScheduler treats app lookup failures as ERRORs.
|
||||||
(bc Wong via kasha)
|
(bc Wong via kasha)
|
||||||
|
|
||||||
|
YARN-1752. Fixed ApplicationMasterService to reject unregister request if
|
||||||
|
AM did not register before. (Rohith Sharma via jianhe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -24,10 +24,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This exception is thrown when an ApplicationMaster asks for resources by
|
* This exception is thrown when an ApplicationMaster asks for resources by
|
||||||
* calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} API
|
* calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} or tries
|
||||||
* without first registering by calling
|
* to unregister by calling
|
||||||
|
* {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)}
|
||||||
|
* API without first registering by calling
|
||||||
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
|
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
|
||||||
* or if it tries to register more then once.
|
* or if it tries to register more than once.
|
||||||
*/
|
*/
|
||||||
public class InvalidApplicationMasterRequestException extends YarnException {
|
public class InvalidApplicationMasterRequestException extends YarnException {
|
||||||
|
|
||||||
|
|
|
@ -332,6 +332,19 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
|
|
||||||
// Allow only one thread in AM to do finishApp at a time.
|
// Allow only one thread in AM to do finishApp at a time.
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
|
if (!hasApplicationMasterRegistered(applicationAttemptId)) {
|
||||||
|
String message =
|
||||||
|
"Application Master is trying to unregister before registering for: "
|
||||||
|
+ applicationAttemptId.getApplicationId();
|
||||||
|
LOG.error(message);
|
||||||
|
RMAuditLogger.logFailure(
|
||||||
|
this.rmContext.getRMApps()
|
||||||
|
.get(applicationAttemptId.getApplicationId()).getUser(),
|
||||||
|
AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
|
||||||
|
message, applicationAttemptId.getApplicationId(),
|
||||||
|
applicationAttemptId);
|
||||||
|
throw new InvalidApplicationMasterRequestException(message);
|
||||||
|
}
|
||||||
|
|
||||||
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
|
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ public class RMAuditLogger {
|
||||||
public static final String FINISH_FAILED_APP = "Application Finished - Failed";
|
public static final String FINISH_FAILED_APP = "Application Finished - Failed";
|
||||||
public static final String FINISH_KILLED_APP = "Application Finished - Killed";
|
public static final String FINISH_KILLED_APP = "Application Finished - Killed";
|
||||||
public static final String REGISTER_AM = "Register App Master";
|
public static final String REGISTER_AM = "Register App Master";
|
||||||
|
public static final String UNREGISTER_AM = "Unregister App Master";
|
||||||
public static final String ALLOC_CONTAINER = "AM Allocated Container";
|
public static final String ALLOC_CONTAINER = "AM Allocated Container";
|
||||||
public static final String RELEASE_CONTAINER = "AM Released Container";
|
public static final String RELEASE_CONTAINER = "AM Released Container";
|
||||||
|
|
||||||
|
|
|
@ -225,12 +225,14 @@ public class MockAM {
|
||||||
final FinishApplicationMasterRequest req =
|
final FinishApplicationMasterRequest req =
|
||||||
FinishApplicationMasterRequest.newInstance(
|
FinishApplicationMasterRequest.newInstance(
|
||||||
FinalApplicationStatus.SUCCEEDED, "", "");
|
FinalApplicationStatus.SUCCEEDED, "", "");
|
||||||
unregisterAppAttempt(req);
|
unregisterAppAttempt(req,true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unregisterAppAttempt(final FinishApplicationMasterRequest req)
|
public void unregisterAppAttempt(final FinishApplicationMasterRequest req,
|
||||||
throws Exception {
|
boolean waitForStateRunning) throws Exception {
|
||||||
|
if (waitForStateRunning) {
|
||||||
waitForState(RMAppAttemptState.RUNNING);
|
waitForState(RMAppAttemptState.RUNNING);
|
||||||
|
}
|
||||||
UserGroupInformation ugi =
|
UserGroupInformation ugi =
|
||||||
UserGroupInformation.createRemoteUser(attemptId.toString());
|
UserGroupInformation.createRemoteUser(attemptId.toString());
|
||||||
Token<AMRMTokenIdentifier> token =
|
Token<AMRMTokenIdentifier> token =
|
||||||
|
|
|
@ -477,7 +477,7 @@ public class MockRM extends ResourceManager {
|
||||||
FinishApplicationMasterRequest req =
|
FinishApplicationMasterRequest req =
|
||||||
FinishApplicationMasterRequest.newInstance(
|
FinishApplicationMasterRequest.newInstance(
|
||||||
FinalApplicationStatus.SUCCEEDED, "", "");
|
FinalApplicationStatus.SUCCEEDED, "", "");
|
||||||
am.unregisterAppAttempt(req);
|
am.unregisterAppAttempt(req,true);
|
||||||
am.waitForState(RMAppAttemptState.FINISHING);
|
am.waitForState(RMAppAttemptState.FINISHING);
|
||||||
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
am.waitForState(RMAppAttemptState.FINISHED);
|
am.waitForState(RMAppAttemptState.FINISHED);
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
|
||||||
|
@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
|
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
|
@ -246,4 +248,39 @@ public class TestApplicationMasterService {
|
||||||
sleep(100);
|
sleep(100);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=1200000)
|
||||||
|
public void testFinishApplicationMasterBeforeRegistering() throws Exception {
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
try {
|
||||||
|
rm.start();
|
||||||
|
// Register node1
|
||||||
|
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
|
||||||
|
// Submit an application
|
||||||
|
RMApp app1 = rm.submitApp(2048);
|
||||||
|
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
|
||||||
|
FinishApplicationMasterRequest req =
|
||||||
|
FinishApplicationMasterRequest.newInstance(
|
||||||
|
FinalApplicationStatus.FAILED, "", "");
|
||||||
|
Throwable cause = null;
|
||||||
|
try {
|
||||||
|
am1.unregisterAppAttempt(req, false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
cause = e.getCause();
|
||||||
|
}
|
||||||
|
Assert.assertNotNull(cause);
|
||||||
|
Assert
|
||||||
|
.assertTrue(cause instanceof InvalidApplicationMasterRequestException);
|
||||||
|
Assert.assertNotNull(cause.getMessage());
|
||||||
|
Assert
|
||||||
|
.assertTrue(cause
|
||||||
|
.getMessage()
|
||||||
|
.contains(
|
||||||
|
"Application Master is trying to unregister before registering for:"));
|
||||||
|
} finally {
|
||||||
|
if (rm != null) {
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -928,7 +928,7 @@ public class TestRMRestart {
|
||||||
((MemoryRMStateStore) rm.getRMContext().getStateStore()).getState();
|
((MemoryRMStateStore) rm.getRMContext().getStateStore()).getState();
|
||||||
Map<ApplicationId, ApplicationState> rmAppState =
|
Map<ApplicationId, ApplicationState> rmAppState =
|
||||||
rmState.getApplicationState();
|
rmState.getApplicationState();
|
||||||
am.unregisterAppAttempt(req);
|
am.unregisterAppAttempt(req,true);
|
||||||
am.waitForState(RMAppAttemptState.FINISHING);
|
am.waitForState(RMAppAttemptState.FINISHING);
|
||||||
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
am.waitForState(RMAppAttemptState.FINISHED);
|
am.waitForState(RMAppAttemptState.FINISHED);
|
||||||
|
|
Loading…
Reference in New Issue