YARN-369. Handle ( or throw a proper error when receiving) status updates from application masters that have not registered (Mayank Bansal & Abhishek Kapoor via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1501605 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-07-09 23:14:27 +00:00
parent e2cce7bd26
commit 15ce82b9c5
5 changed files with 178 additions and 13 deletions

View File

@ -671,6 +671,10 @@ Release 2.1.0-beta - 2013-07-02
YARN-845. RM crash with NPE on NODE_UPDATE (Mayank Bansal via bikas) YARN-845. RM crash with NPE on NODE_UPDATE (Mayank Bansal via bikas)
YARN-369. Handle ( or throw a proper error when receiving) status updates
from application masters that have not registered (Mayank Bansal &
Abhishek Kapoor via bikas)
BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
YARN-158. Yarn creating package-info.java must not depend on sh. YARN-158. Yarn creating package-info.java must not depend on sh.

View File

@ -195,17 +195,35 @@ public class ApplicationMasterService extends AbstractService implements
// Allow only one thread in AM to do registerApp at a time. // Allow only one thread in AM to do registerApp at a time.
synchronized (lastResponse) { synchronized (lastResponse) {
LOG.info("AM registration " + applicationAttemptId); if (hasApplicationMasterRegistered(applicationAttemptId)) {
String message =
"Application Master is already registered : "
+ applicationAttemptId.getApplicationId();
LOG.warn(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps()
.get(applicationAttemptId.getApplicationId()).getUser(),
AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
applicationAttemptId.getApplicationId(), applicationAttemptId);
throw new InvalidApplicationMasterRequestException(message);
}
this.amLivelinessMonitor.receivedPing(applicationAttemptId); this.amLivelinessMonitor.receivedPing(applicationAttemptId);
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptRegistrationEvent(applicationAttemptId, request
.getHost(), request.getRpcPort(), request.getTrackingUrl()));
RMApp app = this.rmContext.getRMApps().get(appID); RMApp app = this.rmContext.getRMApps().get(appID);
RMAuditLogger.logSuccess(app.getUser(),
AuditConstants.REGISTER_AM, "ApplicationMasterService", appID, // Setting the response id to 0 to identify if the
applicationAttemptId); // application master is register for the respective attemptid
lastResponse.setResponseId(0);
responseMap.put(applicationAttemptId, lastResponse);
LOG.info("AM registration " + applicationAttemptId);
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMAppAttemptRegistrationEvent(applicationAttemptId, request
.getHost(), request.getRpcPort(), request.getTrackingUrl()));
RMAuditLogger.logSuccess(app.getUser(), AuditConstants.REGISTER_AM,
"ApplicationMasterService", appID, applicationAttemptId);
// Pick up min/max resource from scheduler... // Pick up min/max resource from scheduler...
RegisterApplicationMasterResponse response = recordFactory RegisterApplicationMasterResponse response = recordFactory
@ -257,6 +275,24 @@ public class ApplicationMasterService extends AbstractService implements
} }
} }
/**
* @param appAttemptId
* @return true if application is registered for the respective attemptid
*/
public boolean hasApplicationMasterRegistered(
ApplicationAttemptId appAttemptId) {
boolean hasApplicationMasterRegistered = false;
AllocateResponse lastResponse = responseMap.get(appAttemptId);
if (lastResponse != null) {
synchronized (lastResponse) {
if (lastResponse.getResponseId() >= 0) {
hasApplicationMasterRegistered = true;
}
}
}
return hasApplicationMasterRegistered;
}
@Override @Override
public AllocateResponse allocate(AllocateRequest request) public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException { throws YarnException, IOException {
@ -272,6 +308,20 @@ public class ApplicationMasterService extends AbstractService implements
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
return resync; return resync;
} }
if (!hasApplicationMasterRegistered(appAttemptId)) {
String message =
"Application Master is trying to allocate before registering for: "
+ appAttemptId.getApplicationId();
LOG.error(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
.getUser(), AuditConstants.REGISTER_AM, "",
"ApplicationMasterService", message, appAttemptId.getApplicationId(),
appAttemptId);
throw new InvalidApplicationMasterRequestException(message);
}
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
/* old heartbeat */ /* old heartbeat */
return lastResponse; return lastResponse;
@ -442,7 +492,9 @@ public class ApplicationMasterService extends AbstractService implements
public void registerAppAttempt(ApplicationAttemptId attemptId) { public void registerAppAttempt(ApplicationAttemptId attemptId) {
AllocateResponse response = AllocateResponse response =
recordFactory.newRecordInstance(AllocateResponse.class); recordFactory.newRecordInstance(AllocateResponse.class);
response.setResponseId(0); // set response id to -1 before application master for the following
// attemptID get registered
response.setResponseId(-1);
LOG.info("Registering app attempt : " + attemptId); LOG.info("Registering app attempt : " + attemptId);
responseMap.put(attemptId, response); responseMap.put(attemptId, response);
rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId); rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* The exception is thrown when an application Master call allocate without
* calling RegisterApplicationMaster or try to register more then once.
*/
public class InvalidApplicationMasterRequestException extends YarnException {
private static final long serialVersionUID = 1357686L;
public InvalidApplicationMasterRequestException(Throwable cause) {
super(cause);
}
public InvalidApplicationMasterRequestException(String message) {
super(message);
}
public InvalidApplicationMasterRequestException(String message,
Throwable cause) {
super(message, cause);
}
}

View File

@ -78,10 +78,19 @@ public class MockAM {
finalState, attempt.getAppAttemptState()); finalState, attempt.getAppAttemptState());
} }
public RegisterApplicationMasterResponse registerAppAttempt() throws Exception { public RegisterApplicationMasterResponse registerAppAttempt()
waitForState(RMAppAttemptState.LAUNCHED); throws Exception {
return registerAppAttempt(true);
}
public RegisterApplicationMasterResponse registerAppAttempt(boolean wait)
throws Exception {
if (wait) {
waitForState(RMAppAttemptState.LAUNCHED);
}
responseId = 0; responseId = 0;
RegisterApplicationMasterRequest req = Records.newRecord(RegisterApplicationMasterRequest.class); RegisterApplicationMasterRequest req =
Records.newRecord(RegisterApplicationMasterRequest.class);
req.setApplicationAttemptId(attemptId); req.setApplicationAttemptId(attemptId);
req.setHost(""); req.setHost("");
req.setRpcPort(1); req.setRpcPort(1);

View File

@ -19,12 +19,14 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Map; import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@ -34,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -166,4 +169,58 @@ public class TestApplicationMasterLauncher {
am.waitForState(RMAppAttemptState.FINISHED); am.waitForState(RMAppAttemptState.FINISHED);
rm.stop(); rm.stop();
} }
@SuppressWarnings("unused")
@Test(timeout = 100000)
public void testallocateBeforeAMRegistration() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
boolean thrown = false;
rootLogger.setLevel(Level.DEBUG);
MockRM rm = new MockRM();
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 5000);
RMApp app = rm.submitApp(2000);
// kick the scheduling
nm1.nodeHeartbeat(true);
RMAppAttempt attempt = app.getCurrentAppAttempt();
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
// request for containers
int request = 2;
try {
AllocateResponse ar =
am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
} catch (Exception e) {
Assert.assertEquals("Application Master is trying to allocate before "
+ "registering for: " + attempt.getAppAttemptId().getApplicationId(),
e.getMessage());
thrown = true;
}
// kick the scheduler
nm1.nodeHeartbeat(true);
try {
AllocateResponse amrs =
am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
} catch (Exception e) {
Assert.assertEquals("Application Master is trying to allocate before "
+ "registering for: " + attempt.getAppAttemptId().getApplicationId(),
e.getMessage());
thrown = true;
}
Assert.assertTrue(thrown);
am.registerAppAttempt();
thrown = false;
try {
am.registerAppAttempt(false);
}
catch (Exception e) {
Assert.assertEquals("Application Master is already registered : "
+ attempt.getAppAttemptId().getApplicationId(),
e.getMessage());
thrown = true;
}
Assert.assertTrue(thrown);
}
} }