Merge r1580077 from trunk. YARN-1849. Fixed NPE in ResourceTrackerService#registerNodeManager for UAM. Contributed by Karthik Kambatla

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1580078 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-03-21 22:57:09 +00:00
parent 444af8fac5
commit b701a84d37
4 changed files with 107 additions and 38 deletions

View File

@ -524,6 +524,9 @@ Release 2.4.0 - UNRELEASED
YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write
more log-data than the log-length that it records. (Mit Desai via vinodk) more log-data than the log-length that it records. (Mit Desai via vinodk)
YARN-1849. Fixed NPE in ResourceTrackerService#registerNodeManager for UAM.
(Karthik Kambatla via jianhe)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -187,12 +188,51 @@ public class ResourceTrackerService extends AbstractService implements
super.serviceStop(); super.serviceStop();
} }
/**
 * Helper method to handle a received ContainerStatus. If the status
 * corresponds to the completion of the master container of a managed AM,
 * an RMAppAttemptContainerFinishedEvent is handed to the dispatcher's event
 * handler.
 *
 * The status is ignored (nothing is dispatched) when:
 * <ul>
 *   <li>the application it belongs to is not known to the RM context,</li>
 *   <li>the application uses an unmanaged AM,</li>
 *   <li>the application has no attempt with the reported attempt id,</li>
 *   <li>the attempt has no master container recorded, or</li>
 *   <li>the status is not a COMPLETE status of the master container.</li>
 * </ul>
 *
 * @param containerStatus the container status reported by a node manager
 */
@SuppressWarnings("unchecked")
@VisibleForTesting
void handleContainerStatus(ContainerStatus containerStatus) {
  ApplicationAttemptId appAttemptId =
      containerStatus.getContainerId().getApplicationAttemptId();
  RMApp rmApp =
      rmContext.getRMApps().get(appAttemptId.getApplicationId());
  if (rmApp == null) {
    LOG.error("Received finished container : "
        + containerStatus.getContainerId()
        + " for unknown application " + appAttemptId.getApplicationId()
        + " Skipping.");
    return;
  }

  if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Ignoring container completion status for unmanaged AM "
          + rmApp.getApplicationId());
    }
    return;
  }

  // The reported attempt id may not match any attempt of this application;
  // without this guard the master-container lookup below would throw an NPE.
  RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
  if (rmAppAttempt == null) {
    LOG.info("Ignoring container status " + containerStatus.getContainerId()
        + " for unknown application attempt " + appAttemptId);
    return;
  }

  // An attempt may have no master container recorded; there is nothing to
  // compare the reported container against in that case.
  Container masterContainer = rmAppAttempt.getMasterContainer();
  if (masterContainer == null) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Ignoring container status "
          + containerStatus.getContainerId()
          + " because application attempt " + appAttemptId
          + " has no master container");
    }
    return;
  }

  if (masterContainer.getId().equals(containerStatus.getContainerId())
      && containerStatus.getState() == ContainerState.COMPLETE) {
    // sending master container finished event.
    RMAppAttemptContainerFinishedEvent evt =
        new RMAppAttemptContainerFinishedEvent(appAttemptId,
            containerStatus);
    rmContext.getDispatcher().getEventHandler().handle(evt);
  }
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException, RegisterNodeManagerRequest request) throws YarnException,
IOException { IOException {
NodeId nodeId = request.getNodeId(); NodeId nodeId = request.getNodeId();
String host = nodeId.getHost(); String host = nodeId.getHost();
int cmPort = nodeId.getPort(); int cmPort = nodeId.getPort();
@ -204,29 +244,7 @@ public class ResourceTrackerService extends AbstractService implements
LOG.info("received container statuses on node manager register :" LOG.info("received container statuses on node manager register :"
+ request.getContainerStatuses()); + request.getContainerStatuses());
for (ContainerStatus containerStatus : request.getContainerStatuses()) { for (ContainerStatus containerStatus : request.getContainerStatuses()) {
ApplicationAttemptId appAttemptId = handleContainerStatus(containerStatus);
containerStatus.getContainerId().getApplicationAttemptId();
RMApp rmApp =
rmContext.getRMApps().get(appAttemptId.getApplicationId());
if (rmApp != null) {
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
if (rmAppAttempt != null) {
if (rmAppAttempt.getMasterContainer().getId()
.equals(containerStatus.getContainerId())
&& containerStatus.getState() == ContainerState.COMPLETE) {
// sending master container finished event.
RMAppAttemptContainerFinishedEvent evt =
new RMAppAttemptContainerFinishedEvent(appAttemptId,
containerStatus);
rmContext.getDispatcher().getEventHandler().handle(evt);
}
}
} else {
LOG.error("Received finished container :"
+ containerStatus.getContainerId()
+ " for non existing application :"
+ appAttemptId.getApplicationId());
}
} }
} }
RegisterNodeManagerResponse response = recordFactory RegisterNodeManagerResponse response = recordFactory

View File

@ -35,9 +35,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import javax.crypto.SecretKey; import javax.crypto.SecretKey;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -629,7 +631,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
} }
} }
private void setMasterContainer(Container container) { @InterfaceAudience.Private
@VisibleForTesting
public void setMasterContainer(Container container) {
masterContainer = container; masterContainer = container;
} }

View File

@ -26,8 +26,6 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.MetricsSystem;
@ -45,21 +43,29 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
public class TestResourceTrackerService { public class TestResourceTrackerService {
@ -468,26 +474,64 @@ public class TestResourceTrackerService {
ClusterMetrics.getMetrics().getUnhealthyNMs()); ClusterMetrics.getMetrics().getUnhealthyNMs());
} }
@SuppressWarnings("unchecked")
@Test @Test
public void testNodeRegistrationWithContainers() throws Exception { public void testHandleContainerStatusInvalidCompletions() throws Exception {
rm = new MockRM(); rm = new MockRM(new YarnConfiguration());
rm.init(new YarnConfiguration());
rm.start(); rm.start();
RMApp app = rm.submitApp(1024);
MockNM nm = rm.registerNode("host1:1234", 8192); EventHandler handler =
nm.nodeHeartbeat(true); spy(rm.getRMContext().getDispatcher().getEventHandler());
// Register node with some container statuses // Case 1: Unmanaged AM
RMApp app = rm.submitApp(1024, true);
// Case 1.1: AppAttemptId (attempt number 2) presumably matches no existing attempt
ContainerStatus status = ContainerStatus.newInstance( ContainerStatus status = ContainerStatus.newInstance(
ContainerId.newInstance(ApplicationAttemptId.newInstance( ContainerId.newInstance(ApplicationAttemptId.newInstance(
app.getApplicationId(), 2), 1), app.getApplicationId(), 2), 1),
ContainerState.COMPLETE, "Dummy Completed", 0); ContainerState.COMPLETE, "Dummy Completed", 0);
rm.getResourceTrackerService().handleContainerStatus(status);
verify(handler, never()).handle((Event) any());
// The following shouldn't throw NPE // Case 1.2: Master container is null
nm.registerNode(Collections.singletonList(status)); RMAppAttemptImpl currentAttempt =
assertEquals("Incorrect number of nodes", 1, (RMAppAttemptImpl) app.getCurrentAppAttempt();
rm.getRMContext().getRMNodes().size()); currentAttempt.setMasterContainer(null);
status = ContainerStatus.newInstance(
ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
ContainerState.COMPLETE, "Dummy Completed", 0);
rm.getResourceTrackerService().handleContainerStatus(status);
verify(handler, never()).handle((Event)any());
// Case 2: Managed AM
app = rm.submitApp(1024);
// Case 2.1: AppAttemptId (attempt number 2) presumably matches no existing attempt
status = ContainerStatus.newInstance(
ContainerId.newInstance(ApplicationAttemptId.newInstance(
app.getApplicationId(), 2), 1),
ContainerState.COMPLETE, "Dummy Completed", 0);
try {
rm.getResourceTrackerService().handleContainerStatus(status);
} catch (Exception e) {
// expected - ignore
}
verify(handler, never()).handle((Event)any());
// Case 2.2: Master container is null
currentAttempt =
(RMAppAttemptImpl) app.getCurrentAppAttempt();
currentAttempt.setMasterContainer(null);
status = ContainerStatus.newInstance(
ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
ContainerState.COMPLETE, "Dummy Completed", 0);
try {
rm.getResourceTrackerService().handleContainerStatus(status);
} catch (Exception e) {
// expected - ignore
}
verify(handler, never()).handle((Event)any());
} }
@Test @Test