YARN-3995. Some of the NM events are not getting published due race condition when AM container finishes in NM (Naganarasimha G R via sjlee)

This commit is contained in:
Sangjin Lee 2016-01-11 10:09:34 -08:00
parent 829cceebc0
commit cc16683cef
4 changed files with 35 additions and 13 deletions

View File

@ -1981,6 +1981,11 @@ public class YarnConfiguration extends Configuration {
public static final int public static final int
DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60; DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
public static final String ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS =
TIMELINE_SERVICE_PREFIX + "app-collector.linger-period.ms";
public static final int DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = 1000;
// mark app-history related configs @Private as application history is going // mark app-history related configs @Private as application history is going
// to be integrated into the timeline service // to be integrated into the timeline service
@Private @Private

View File

@ -2213,6 +2213,13 @@
<value>60</value> <value>60</value>
</property> </property>
<property>
<description>Time period till which the application collector will be alive
in NM, after the application master container finishes.</description>
<name>yarn.timeline-service.app-collector.linger-period.ms</name>
<value>1000</value>
</property>
<!-- Shared Cache Configuration --> <!-- Shared Cache Configuration -->
<property> <property>

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.timelineservice.collector; package org.apache.hadoop.yarn.server.timelineservice.collector;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -54,6 +57,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
private static final int SHUTDOWN_HOOK_PRIORITY = 30; private static final int SHUTDOWN_HOOK_PRIORITY = 30;
private final NodeTimelineCollectorManager collectorManager; private final NodeTimelineCollectorManager collectorManager;
private long collectorLingerPeriod;
private ScheduledExecutorService scheduler;
public PerNodeTimelineCollectorsAuxService() { public PerNodeTimelineCollectorsAuxService() {
this(new NodeTimelineCollectorManager()); this(new NodeTimelineCollectorManager());
@ -70,6 +75,10 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
if (!YarnConfiguration.timelineServiceV2Enabled(conf)) { if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
throw new YarnException("Timeline service v2 is not enabled"); throw new YarnException("Timeline service v2 is not enabled");
} }
collectorLingerPeriod =
conf.getLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
YarnConfiguration.DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS);
scheduler = Executors.newSingleThreadScheduledExecutor();
collectorManager.init(conf); collectorManager.init(conf);
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -82,6 +91,12 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
scheduler.shutdown();
if (!scheduler.awaitTermination(collectorLingerPeriod,
TimeUnit.MILLISECONDS)) {
LOG.warn(
"Scheduler terminated before removing the application collectors");
}
collectorManager.stop(); collectorManager.stop();
super.serviceStop(); super.serviceStop();
} }
@ -141,17 +156,11 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
if (context.getContainerType() == ContainerType.APPLICATION_MASTER) { if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
final ApplicationId appId = final ApplicationId appId =
context.getContainerId().getApplicationAttemptId().getApplicationId(); context.getContainerId().getApplicationAttemptId().getApplicationId();
new Thread(new Runnable() { scheduler.schedule(new Runnable() {
public void run() { public void run() {
try {
// TODO Temporary Fix until solution for YARN-3995 is finalized.
Thread.sleep(1000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
removeApplication(appId); removeApplication(appId);
} }
}).start(); }, collectorLingerPeriod, TimeUnit.MILLISECONDS);
} }
} }

View File

@ -22,12 +22,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
@ -45,8 +47,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
public class TestPerNodeTimelineCollectorsAuxService { public class TestPerNodeTimelineCollectorsAuxService {
private ApplicationAttemptId appAttemptId; private ApplicationAttemptId appAttemptId;
private PerNodeTimelineCollectorsAuxService auxService; private PerNodeTimelineCollectorsAuxService auxService;
@ -103,8 +103,9 @@ public class TestPerNodeTimelineCollectorsAuxService {
when(context.getContainerType()).thenReturn( when(context.getContainerType()).thenReturn(
ContainerType.APPLICATION_MASTER); ContainerType.APPLICATION_MASTER);
auxService.stopContainer(context); auxService.stopContainer(context);
// auxService should have the app's collector and need to remove only after
// TODO Temporary Fix until solution for YARN-3995 is finalized // a configured period
assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
Thread.sleep(500l); Thread.sleep(500l);
if (!auxService.hasApplication(appAttemptId.getApplicationId())) { if (!auxService.hasApplication(appAttemptId.getApplicationId())) {