YARN-8130 Race condition when container events are published for KILLED applications. (Rohith Sharma K S via Haibo Chen)
(cherry picked from commit 2d00a0c71b
)
This commit is contained in:
parent
4f55941390
commit
8d3598525d
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
|
package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.event.AbstractEvent;
|
import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -25,11 +26,14 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||||
* timelineservice v2.
|
* timelineservice v2.
|
||||||
*/
|
*/
|
||||||
public class NMTimelineEvent extends AbstractEvent<NMTimelineEventType> {
|
public class NMTimelineEvent extends AbstractEvent<NMTimelineEventType> {
|
||||||
public NMTimelineEvent(NMTimelineEventType type) {
|
private ApplicationId appId;
|
||||||
super(type);
|
|
||||||
|
public NMTimelineEvent(NMTimelineEventType type, ApplicationId appId) {
|
||||||
|
super(type, System.currentTimeMillis());
|
||||||
|
this.appId=appId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public NMTimelineEvent(NMTimelineEventType type, long timestamp) {
|
public ApplicationId getApplicationId() {
|
||||||
super(type, timestamp);
|
return appId;
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -24,4 +24,7 @@ package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
|
||||||
public enum NMTimelineEventType {
|
public enum NMTimelineEventType {
|
||||||
// Publish the NM Timeline entity
|
// Publish the NM Timeline entity
|
||||||
TIMELINE_ENTITY_PUBLISH,
|
TIMELINE_ENTITY_PUBLISH,
|
||||||
|
|
||||||
|
// Stop and remove timeline client
|
||||||
|
STOP_TIMELINE_CLIENT
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,7 +96,7 @@ public class NMTimelinePublisher extends CompositeService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
dispatcher = new AsyncDispatcher("NM Timeline dispatcher");
|
dispatcher = createDispatcher();
|
||||||
dispatcher.register(NMTimelineEventType.class,
|
dispatcher.register(NMTimelineEventType.class,
|
||||||
new ForwardingEventHandler());
|
new ForwardingEventHandler());
|
||||||
addIfService(dispatcher);
|
addIfService(dispatcher);
|
||||||
|
@ -113,6 +113,10 @@ public class NMTimelinePublisher extends CompositeService {
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected AsyncDispatcher createDispatcher() {
|
||||||
|
return new AsyncDispatcher("NM Timeline dispatcher");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
|
@ -141,6 +145,9 @@ public class NMTimelinePublisher extends CompositeService {
|
||||||
putEntity(((TimelinePublishEvent) event).getTimelineEntityToPublish(),
|
putEntity(((TimelinePublishEvent) event).getTimelineEntityToPublish(),
|
||||||
((TimelinePublishEvent) event).getApplicationId());
|
((TimelinePublishEvent) event).getApplicationId());
|
||||||
break;
|
break;
|
||||||
|
case STOP_TIMELINE_CLIENT:
|
||||||
|
removeAndStopTimelineClient(event.getApplicationId());
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
LOG.error("Unknown NMTimelineEvent type: " + event.getType());
|
LOG.error("Unknown NMTimelineEvent type: " + event.getType());
|
||||||
}
|
}
|
||||||
|
@ -392,20 +399,13 @@ public class NMTimelinePublisher extends CompositeService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TimelinePublishEvent extends NMTimelineEvent {
|
private static class TimelinePublishEvent extends NMTimelineEvent {
|
||||||
private ApplicationId appId;
|
|
||||||
private TimelineEntity entityToPublish;
|
private TimelineEntity entityToPublish;
|
||||||
|
|
||||||
public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) {
|
public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) {
|
||||||
super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, System
|
super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, appId);
|
||||||
.currentTimeMillis());
|
|
||||||
this.appId = appId;
|
|
||||||
this.entityToPublish = entity;
|
this.entityToPublish = entity;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ApplicationId getApplicationId() {
|
|
||||||
return appId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public TimelineEntity getTimelineEntityToPublish() {
|
public TimelineEntity getTimelineEntityToPublish() {
|
||||||
return entityToPublish;
|
return entityToPublish;
|
||||||
}
|
}
|
||||||
|
@ -434,6 +434,11 @@ public class NMTimelinePublisher extends CompositeService {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stopTimelineClient(ApplicationId appId) {
|
public void stopTimelineClient(ApplicationId appId) {
|
||||||
|
dispatcher.getEventHandler().handle(
|
||||||
|
new NMTimelineEvent(NMTimelineEventType.STOP_TIMELINE_CLIENT, appId));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeAndStopTimelineClient(ApplicationId appId) {
|
||||||
TimelineV2Client client = appToClientMap.remove(appId);
|
TimelineV2Client client = appToClientMap.remove(appId);
|
||||||
if (client != null) {
|
if (client != null) {
|
||||||
client.stop();
|
client.stop();
|
||||||
|
|
|
@ -31,34 +31,47 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
|
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
|
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
public class TestNMTimelinePublisher {
|
public class TestNMTimelinePublisher {
|
||||||
private static final String MEMORY_ID = "MEMORY";
|
private static final String MEMORY_ID = "MEMORY";
|
||||||
private static final String CPU_ID = "CPU";
|
private static final String CPU_ID = "CPU";
|
||||||
|
|
||||||
@Test
|
private NMTimelinePublisher publisher;
|
||||||
public void testContainerResourceUsage() {
|
private DummyTimelineClient timelineClient;
|
||||||
Context context = mock(Context.class);
|
private Configuration conf;
|
||||||
@SuppressWarnings("unchecked")
|
private DrainDispatcher dispatcher;
|
||||||
final DummyTimelineClient timelineClient = new DummyTimelineClient(null);
|
|
||||||
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
|
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
|
@Before public void setup() throws Exception {
|
||||||
|
conf = new Configuration();
|
||||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||||
|
conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
|
||||||
|
3000L);
|
||||||
|
timelineClient = new DummyTimelineClient(null);
|
||||||
|
Context context = createMockContext();
|
||||||
|
dispatcher = new DrainDispatcher();
|
||||||
|
|
||||||
NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
|
publisher = new NMTimelinePublisher(context) {
|
||||||
public void createTimelineClient(ApplicationId appId) {
|
public void createTimelineClient(ApplicationId appId) {
|
||||||
if (!getAppToClientMap().containsKey(appId)) {
|
if (!getAppToClientMap().containsKey(appId)) {
|
||||||
timelineClient.init(getConfig());
|
timelineClient.init(getConfig());
|
||||||
|
@ -66,15 +79,73 @@ public class TestNMTimelinePublisher {
|
||||||
getAppToClientMap().put(appId, timelineClient);
|
getAppToClientMap().put(appId, timelineClient);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override protected AsyncDispatcher createDispatcher() {
|
||||||
|
return dispatcher;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
publisher.init(conf);
|
publisher.init(conf);
|
||||||
publisher.start();
|
publisher.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Context createMockContext() {
|
||||||
|
Context context = mock(Context.class);
|
||||||
|
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
|
||||||
|
@After public void tearDown() throws Exception {
|
||||||
|
if (publisher != null) {
|
||||||
|
publisher.stop();
|
||||||
|
}
|
||||||
|
if (timelineClient != null) {
|
||||||
|
timelineClient.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testPublishContainerFinish() throws Exception {
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 2);
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
|
||||||
|
|
||||||
|
String diag = "test-diagnostics";
|
||||||
|
int exitStatus = 0;
|
||||||
|
ContainerStatus cStatus = mock(ContainerStatus.class);
|
||||||
|
when(cStatus.getContainerId()).thenReturn(cId);
|
||||||
|
when(cStatus.getDiagnostics()).thenReturn(diag);
|
||||||
|
when(cStatus.getExitStatus()).thenReturn(exitStatus);
|
||||||
|
long timeStamp = System.currentTimeMillis();
|
||||||
|
|
||||||
|
ApplicationContainerFinishedEvent finishedEvent =
|
||||||
|
new ApplicationContainerFinishedEvent(cStatus, timeStamp);
|
||||||
|
|
||||||
|
publisher.createTimelineClient(appId);
|
||||||
|
publisher.publishApplicationEvent(finishedEvent);
|
||||||
|
publisher.stopTimelineClient(appId);
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
ContainerEntity cEntity = new ContainerEntity();
|
||||||
|
cEntity.setId(cId.toString());
|
||||||
|
TimelineEntity[] lastPublishedEntities =
|
||||||
|
timelineClient.getLastPublishedEntities();
|
||||||
|
|
||||||
|
Assert.assertNotNull(lastPublishedEntities);
|
||||||
|
Assert.assertEquals(1, lastPublishedEntities.length);
|
||||||
|
TimelineEntity entity = lastPublishedEntities[0];
|
||||||
|
Assert.assertTrue(cEntity.equals(entity));
|
||||||
|
Assert.assertEquals(diag,
|
||||||
|
entity.getInfo().get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
|
||||||
|
Assert.assertEquals(exitStatus,
|
||||||
|
entity.getInfo().get(ContainerMetricsConstants.EXIT_STATUS_INFO));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testContainerResourceUsage() {
|
||||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
publisher.createTimelineClient(appId);
|
publisher.createTimelineClient(appId);
|
||||||
Container aContainer = mock(Container.class);
|
Container aContainer = mock(Container.class);
|
||||||
when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId(
|
when(aContainer.getContainerId()).thenReturn(ContainerId
|
||||||
ApplicationAttemptId.newInstance(appId, 1),
|
.newContainerId(ApplicationAttemptId.newInstance(appId, 1), 0L));
|
||||||
0L));
|
|
||||||
publisher.reportContainerResourceUsage(aContainer, 1024L, 8F);
|
publisher.reportContainerResourceUsage(aContainer, 1024L, 8F);
|
||||||
verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8);
|
verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8);
|
||||||
timelineClient.reset();
|
timelineClient.reset();
|
||||||
|
@ -91,7 +162,6 @@ public class TestNMTimelinePublisher {
|
||||||
(float) ResourceCalculatorProcessTree.UNAVAILABLE);
|
(float) ResourceCalculatorProcessTree.UNAVAILABLE);
|
||||||
verifyPublishedResourceUsageMetrics(timelineClient, 1024L,
|
verifyPublishedResourceUsageMetrics(timelineClient, 1024L,
|
||||||
ResourceCalculatorProcessTree.UNAVAILABLE);
|
ResourceCalculatorProcessTree.UNAVAILABLE);
|
||||||
publisher.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyPublishedResourceUsageMetrics(
|
private void verifyPublishedResourceUsageMetrics(
|
||||||
|
@ -151,8 +221,12 @@ public class TestNMTimelinePublisher {
|
||||||
|
|
||||||
private TimelineEntity[] lastPublishedEntities;
|
private TimelineEntity[] lastPublishedEntities;
|
||||||
|
|
||||||
@Override
|
@Override public void putEntitiesAsync(TimelineEntity... entities)
|
||||||
public void putEntitiesAsync(TimelineEntity... entities)
|
throws IOException, YarnException {
|
||||||
|
this.lastPublishedEntities = entities;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void putEntities(TimelineEntity... entities)
|
||||||
throws IOException, YarnException {
|
throws IOException, YarnException {
|
||||||
this.lastPublishedEntities = entities;
|
this.lastPublishedEntities = entities;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue