YARN-8130 Race condition when container events are published for KILLED applications. (Rohith Sharma K S via Haibo Chen)

This commit is contained in:
Haibo Chen 2018-05-14 11:08:42 -07:00
parent 6beb25ab7e
commit 2d00a0c71b
4 changed files with 113 additions and 27 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.AbstractEvent;
/**
@ -25,11 +26,14 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
* timelineservice v2.
*/
public class NMTimelineEvent extends AbstractEvent<NMTimelineEventType> {
public NMTimelineEvent(NMTimelineEventType type) {
super(type);
private ApplicationId appId;
public NMTimelineEvent(NMTimelineEventType type, ApplicationId appId) {
super(type, System.currentTimeMillis());
this.appId=appId;
}
public NMTimelineEvent(NMTimelineEventType type, long timestamp) {
super(type, timestamp);
public ApplicationId getApplicationId() {
return appId;
}
}

View File

@ -24,4 +24,7 @@ package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
public enum NMTimelineEventType {
// Publish the NM Timeline entity
TIMELINE_ENTITY_PUBLISH,
// Stop and remove timeline client
STOP_TIMELINE_CLIENT
}

View File

@ -96,7 +96,7 @@ public class NMTimelinePublisher extends CompositeService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
dispatcher = new AsyncDispatcher("NM Timeline dispatcher");
dispatcher = createDispatcher();
dispatcher.register(NMTimelineEventType.class,
new ForwardingEventHandler());
addIfService(dispatcher);
@ -113,6 +113,10 @@ public class NMTimelinePublisher extends CompositeService {
super.serviceInit(conf);
}
protected AsyncDispatcher createDispatcher() {
return new AsyncDispatcher("NM Timeline dispatcher");
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
@ -141,6 +145,9 @@ public class NMTimelinePublisher extends CompositeService {
putEntity(((TimelinePublishEvent) event).getTimelineEntityToPublish(),
((TimelinePublishEvent) event).getApplicationId());
break;
case STOP_TIMELINE_CLIENT:
removeAndStopTimelineClient(event.getApplicationId());
break;
default:
LOG.error("Unknown NMTimelineEvent type: " + event.getType());
}
@ -392,20 +399,13 @@ public class NMTimelinePublisher extends CompositeService {
}
private static class TimelinePublishEvent extends NMTimelineEvent {
private ApplicationId appId;
private TimelineEntity entityToPublish;
public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) {
super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, System
.currentTimeMillis());
this.appId = appId;
super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, appId);
this.entityToPublish = entity;
}
public ApplicationId getApplicationId() {
return appId;
}
public TimelineEntity getTimelineEntityToPublish() {
return entityToPublish;
}
@ -434,6 +434,11 @@ public class NMTimelinePublisher extends CompositeService {
}
public void stopTimelineClient(ApplicationId appId) {
dispatcher.getEventHandler().handle(
new NMTimelineEvent(NMTimelineEventType.STOP_TIMELINE_CLIENT, appId));
}
private void removeAndStopTimelineClient(ApplicationId appId) {
TimelineV2Client client = appToClientMap.remove(appId);
if (client != null) {
client.stop();

View File

@ -31,34 +31,47 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.junit.Assert;
import org.junit.Test;
import org.junit.After;
import org.junit.Before;
public class TestNMTimelinePublisher {
private static final String MEMORY_ID = "MEMORY";
private static final String CPU_ID = "CPU";
@Test
public void testContainerResourceUsage() {
Context context = mock(Context.class);
@SuppressWarnings("unchecked")
final DummyTimelineClient timelineClient = new DummyTimelineClient(null);
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
private NMTimelinePublisher publisher;
private DummyTimelineClient timelineClient;
private Configuration conf;
private DrainDispatcher dispatcher;
Configuration conf = new Configuration();
@Before public void setup() throws Exception {
conf = new Configuration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
3000L);
timelineClient = new DummyTimelineClient(null);
Context context = createMockContext();
dispatcher = new DrainDispatcher();
NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
publisher = new NMTimelinePublisher(context) {
public void createTimelineClient(ApplicationId appId) {
if (!getAppToClientMap().containsKey(appId)) {
timelineClient.init(getConfig());
@ -66,15 +79,73 @@ public class TestNMTimelinePublisher {
getAppToClientMap().put(appId, timelineClient);
}
}
@Override protected AsyncDispatcher createDispatcher() {
return dispatcher;
}
};
publisher.init(conf);
publisher.start();
}
private Context createMockContext() {
Context context = mock(Context.class);
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
return context;
}
@After public void tearDown() throws Exception {
if (publisher != null) {
publisher.stop();
}
if (timelineClient != null) {
timelineClient.stop();
}
}
@Test public void testPublishContainerFinish() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 2);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
String diag = "test-diagnostics";
int exitStatus = 0;
ContainerStatus cStatus = mock(ContainerStatus.class);
when(cStatus.getContainerId()).thenReturn(cId);
when(cStatus.getDiagnostics()).thenReturn(diag);
when(cStatus.getExitStatus()).thenReturn(exitStatus);
long timeStamp = System.currentTimeMillis();
ApplicationContainerFinishedEvent finishedEvent =
new ApplicationContainerFinishedEvent(cStatus, timeStamp);
publisher.createTimelineClient(appId);
publisher.publishApplicationEvent(finishedEvent);
publisher.stopTimelineClient(appId);
dispatcher.await();
ContainerEntity cEntity = new ContainerEntity();
cEntity.setId(cId.toString());
TimelineEntity[] lastPublishedEntities =
timelineClient.getLastPublishedEntities();
Assert.assertNotNull(lastPublishedEntities);
Assert.assertEquals(1, lastPublishedEntities.length);
TimelineEntity entity = lastPublishedEntities[0];
Assert.assertTrue(cEntity.equals(entity));
Assert.assertEquals(diag,
entity.getInfo().get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
Assert.assertEquals(exitStatus,
entity.getInfo().get(ContainerMetricsConstants.EXIT_STATUS_INFO));
}
@Test public void testContainerResourceUsage() {
ApplicationId appId = ApplicationId.newInstance(0, 1);
publisher.createTimelineClient(appId);
Container aContainer = mock(Container.class);
when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId(
ApplicationAttemptId.newInstance(appId, 1),
0L));
when(aContainer.getContainerId()).thenReturn(ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId, 1), 0L));
publisher.reportContainerResourceUsage(aContainer, 1024L, 8F);
verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8);
timelineClient.reset();
@ -91,7 +162,6 @@ public class TestNMTimelinePublisher {
(float) ResourceCalculatorProcessTree.UNAVAILABLE);
verifyPublishedResourceUsageMetrics(timelineClient, 1024L,
ResourceCalculatorProcessTree.UNAVAILABLE);
publisher.stop();
}
private void verifyPublishedResourceUsageMetrics(
@ -151,8 +221,12 @@ public class TestNMTimelinePublisher {
private TimelineEntity[] lastPublishedEntities;
@Override
public void putEntitiesAsync(TimelineEntity... entities)
@Override public void putEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException {
this.lastPublishedEntities = entities;
}
@Override public void putEntities(TimelineEntity... entities)
throws IOException, YarnException {
this.lastPublishedEntities = entities;
}