YARN-4747. AHS error 500 due to NPE when container start event is missing. Contributed by Varun Saxena
This commit is contained in:
parent
23248f63aa
commit
b2ed6ae731
|
@ -587,19 +587,22 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
NodeId allocatedNode = NodeId.newInstance(allocatedHost, allocatedPort);
|
|
||||||
ContainerId containerId =
|
ContainerId containerId =
|
||||||
ConverterUtils.toContainerId(entity.getEntityId());
|
ConverterUtils.toContainerId(entity.getEntityId());
|
||||||
String logUrl = WebAppUtils.getAggregatedLogURL(
|
String logUrl = null;
|
||||||
|
NodeId allocatedNode = null;
|
||||||
|
if (allocatedHost != null) {
|
||||||
|
allocatedNode = NodeId.newInstance(allocatedHost, allocatedPort);
|
||||||
|
logUrl = WebAppUtils.getAggregatedLogURL(
|
||||||
serverHttpAddress,
|
serverHttpAddress,
|
||||||
allocatedNode.toString(),
|
allocatedNode.toString(),
|
||||||
containerId.toString(),
|
containerId.toString(),
|
||||||
containerId.toString(),
|
containerId.toString(),
|
||||||
user);
|
user);
|
||||||
|
}
|
||||||
return ContainerReport.newInstance(
|
return ContainerReport.newInstance(
|
||||||
ConverterUtils.toContainerId(entity.getEntityId()),
|
ConverterUtils.toContainerId(entity.getEntityId()),
|
||||||
Resource.newInstance(allocatedMem, allocatedVcore),
|
Resource.newInstance(allocatedMem, allocatedVcore), allocatedNode,
|
||||||
NodeId.newInstance(allocatedHost, allocatedPort),
|
|
||||||
Priority.newInstance(allocatedPriority),
|
Priority.newInstance(allocatedPriority),
|
||||||
createdTime, finishedTime, diagnosticsInfo, logUrl, exitStatus, state,
|
createdTime, finishedTime, diagnosticsInfo, logUrl, exitStatus, state,
|
||||||
nodeHttpAddress);
|
nodeHttpAddress);
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.metrics;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
|
||||||
public class ContainerFinishedEvent extends SystemMetricsEvent {
|
public class ContainerFinishedEvent extends SystemMetricsEvent {
|
||||||
|
|
||||||
|
@ -27,17 +28,20 @@ public class ContainerFinishedEvent extends SystemMetricsEvent {
|
||||||
private String diagnosticsInfo;
|
private String diagnosticsInfo;
|
||||||
private int containerExitStatus;
|
private int containerExitStatus;
|
||||||
private ContainerState state;
|
private ContainerState state;
|
||||||
|
private NodeId allocatedNode;
|
||||||
|
|
||||||
public ContainerFinishedEvent(
|
public ContainerFinishedEvent(
|
||||||
ContainerId containerId,
|
ContainerId containerId,
|
||||||
String diagnosticsInfo,
|
String diagnosticsInfo,
|
||||||
int containerExitStatus,
|
int containerExitStatus,
|
||||||
ContainerState state,
|
ContainerState state,
|
||||||
long finishedTime) {
|
long finishedTime,
|
||||||
|
NodeId allocatedNode) {
|
||||||
super(SystemMetricsEventType.CONTAINER_FINISHED, finishedTime);
|
super(SystemMetricsEventType.CONTAINER_FINISHED, finishedTime);
|
||||||
this.containerId = containerId;
|
this.containerId = containerId;
|
||||||
this.diagnosticsInfo = diagnosticsInfo;
|
this.diagnosticsInfo = diagnosticsInfo;
|
||||||
this.containerExitStatus = containerExitStatus;
|
this.containerExitStatus = containerExitStatus;
|
||||||
|
this.allocatedNode = allocatedNode;
|
||||||
this.state = state;
|
this.state = state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,4 +66,7 @@ public class ContainerFinishedEvent extends SystemMetricsEvent {
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public NodeId getAllocatedNode() {
|
||||||
|
return allocatedNode;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -218,7 +218,7 @@ public class SystemMetricsPublisher extends CompositeService {
|
||||||
container.getDiagnosticsInfo(),
|
container.getDiagnosticsInfo(),
|
||||||
container.getContainerExitStatus(),
|
container.getContainerExitStatus(),
|
||||||
container.getContainerState(),
|
container.getContainerState(),
|
||||||
finishedTime));
|
finishedTime, container.getAllocatedNode()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -479,6 +479,12 @@ public class SystemMetricsPublisher extends CompositeService {
|
||||||
event.getContainerExitStatus());
|
event.getContainerExitStatus());
|
||||||
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
|
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
|
||||||
event.getContainerState().toString());
|
event.getContainerState().toString());
|
||||||
|
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
||||||
|
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
|
||||||
|
event.getAllocatedNode().getHost());
|
||||||
|
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
|
||||||
|
event.getAllocatedNode().getPort());
|
||||||
|
entity.setOtherInfo(entityInfo);
|
||||||
tEvent.setEventInfo(eventInfo);
|
tEvent.setEventInfo(eventInfo);
|
||||||
entity.addEvent(tEvent);
|
entity.addEvent(tEvent);
|
||||||
putEntity(entity);
|
putEntity(entity);
|
||||||
|
|
|
@ -344,6 +344,36 @@ public class TestSystemMetricsPublisher {
|
||||||
Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent);
|
Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testPublishHostPortInfoOnContainerFinished() throws Exception {
|
||||||
|
ContainerId containerId =
|
||||||
|
ContainerId.newContainerId(ApplicationAttemptId.newInstance(
|
||||||
|
ApplicationId.newInstance(0, 1), 1), 1);
|
||||||
|
RMContainer container = createRMContainer(containerId);
|
||||||
|
metricsPublisher.containerFinished(container, container.getFinishTime());
|
||||||
|
TimelineEntity entity = null;
|
||||||
|
do {
|
||||||
|
entity =
|
||||||
|
store.getEntity(containerId.toString(),
|
||||||
|
ContainerMetricsConstants.ENTITY_TYPE,
|
||||||
|
EnumSet.allOf(Field.class));
|
||||||
|
} while (entity == null || entity.getEvents().size() < 1);
|
||||||
|
Assert.assertNotNull(entity.getOtherInfo());
|
||||||
|
Assert.assertEquals(2, entity.getOtherInfo().size());
|
||||||
|
Assert.assertNotNull(entity.getOtherInfo().get(
|
||||||
|
ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO));
|
||||||
|
Assert.assertNotNull(entity.getOtherInfo().get(
|
||||||
|
ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO));
|
||||||
|
Assert.assertEquals(
|
||||||
|
container.getAllocatedNode().getHost(),
|
||||||
|
entity.getOtherInfo().get(
|
||||||
|
ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO));
|
||||||
|
Assert.assertEquals(
|
||||||
|
container.getAllocatedNode().getPort(),
|
||||||
|
entity.getOtherInfo().get(
|
||||||
|
ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO));
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testPublishContainerMetrics() throws Exception {
|
public void testPublishContainerMetrics() throws Exception {
|
||||||
ContainerId containerId =
|
ContainerId containerId =
|
||||||
|
|
Loading…
Reference in New Issue