YARN-7835. Race condition in NM while publishing events if second attempt is launched on the same node. (Rohith Sharma K S via Haibo Chen)
(cherry picked from commit d1274c3b71
)
This commit is contained in:
parent
fb0f2cc2fb
commit
45d012f208
|
@ -19,7 +19,12 @@
|
|||
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -31,6 +36,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
|
|||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||
|
@ -59,6 +65,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
|||
private final NodeTimelineCollectorManager collectorManager;
|
||||
private long collectorLingerPeriod;
|
||||
private ScheduledExecutorService scheduler;
|
||||
private Map<ApplicationId, Set<ContainerId>> appIdToContainerId =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
public PerNodeTimelineCollectorsAuxService() {
|
||||
this(new NodeTimelineCollectorManager(true));
|
||||
|
@ -148,7 +156,15 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
|||
if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
|
||||
ApplicationId appId = context.getContainerId().
|
||||
getApplicationAttemptId().getApplicationId();
|
||||
addApplication(appId, context.getUser());
|
||||
synchronized (appIdToContainerId) {
|
||||
Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
|
||||
if (masterContainers == null) {
|
||||
masterContainers = new HashSet<>();
|
||||
appIdToContainerId.put(appId, masterContainers);
|
||||
}
|
||||
masterContainers.add(context.getContainerId());
|
||||
addApplication(appId, context.getUser());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -162,16 +178,35 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
|||
// intercept the event of the AM container being stopped and remove the app
|
||||
// level collector service
|
||||
if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
|
||||
final ApplicationId appId =
|
||||
context.getContainerId().getApplicationAttemptId().getApplicationId();
|
||||
scheduler.schedule(new Runnable() {
|
||||
public void run() {
|
||||
removeApplication(appId);
|
||||
}
|
||||
}, collectorLingerPeriod, TimeUnit.MILLISECONDS);
|
||||
final ContainerId containerId = context.getContainerId();
|
||||
removeApplicationCollector(containerId);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected Future removeApplicationCollector(final ContainerId containerId) {
|
||||
final ApplicationId appId =
|
||||
containerId.getApplicationAttemptId().getApplicationId();
|
||||
return scheduler.schedule(new Runnable() {
|
||||
public void run() {
|
||||
synchronized (appIdToContainerId) {
|
||||
Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
|
||||
if (masterContainers == null) {
|
||||
LOG.info("Stop container for " + containerId
|
||||
+ " is called before initializing container.");
|
||||
return;
|
||||
}
|
||||
masterContainers.remove(containerId);
|
||||
if (masterContainers.size() == 0) {
|
||||
// remove only if it is last master container
|
||||
removeApplication(appId);
|
||||
appIdToContainerId.remove(appId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, collectorLingerPeriod, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean hasApplication(ApplicationId appId) {
|
||||
return collectorManager.containsTimelineCollector(appId);
|
||||
|
|
|
@ -29,6 +29,7 @@ import static org.mockito.Mockito.spy;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
|
@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon
|
|||
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -57,10 +59,10 @@ public class TestPerNodeTimelineCollectorsAuxService {
|
|||
private ApplicationAttemptId appAttemptId;
|
||||
private PerNodeTimelineCollectorsAuxService auxService;
|
||||
private Configuration conf;
|
||||
private ApplicationId appId;
|
||||
|
||||
public TestPerNodeTimelineCollectorsAuxService() {
|
||||
ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||
conf = new YarnConfiguration();
|
||||
// enable timeline service v.2
|
||||
|
@ -111,15 +113,6 @@ public class TestPerNodeTimelineCollectorsAuxService {
|
|||
when(context.getContainerType()).thenReturn(
|
||||
ContainerType.APPLICATION_MASTER);
|
||||
auxService.stopContainer(context);
|
||||
// auxService should have the app's collector and need to remove only after
|
||||
// a configured period
|
||||
assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
|
||||
for (int i = 0; i < 4; i++) {
|
||||
Thread.sleep(500L);
|
||||
if (!auxService.hasApplication(appAttemptId.getApplicationId())) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// auxService should not have that app
|
||||
assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
|
||||
|
@ -159,21 +152,53 @@ public class TestPerNodeTimelineCollectorsAuxService {
|
|||
private PerNodeTimelineCollectorsAuxService
|
||||
createCollectorAndAddApplication() {
|
||||
PerNodeTimelineCollectorsAuxService service = createCollector();
|
||||
|
||||
ContainerInitializationContext context =
|
||||
createContainerInitalizationContext(1);
|
||||
service.initializeContainer(context);
|
||||
return service;
|
||||
}
|
||||
|
||||
ContainerInitializationContext createContainerInitalizationContext(
|
||||
int attempt) {
|
||||
appAttemptId = ApplicationAttemptId.newInstance(appId, attempt);
|
||||
// create an AM container
|
||||
ContainerId containerId = getAMContainerId();
|
||||
ContainerInitializationContext context =
|
||||
mock(ContainerInitializationContext.class);
|
||||
when(context.getContainerId()).thenReturn(containerId);
|
||||
when(context.getContainerType()).thenReturn(
|
||||
ContainerType.APPLICATION_MASTER);
|
||||
service.initializeContainer(context);
|
||||
return service;
|
||||
when(context.getContainerType())
|
||||
.thenReturn(ContainerType.APPLICATION_MASTER);
|
||||
return context;
|
||||
}
|
||||
|
||||
ContainerTerminationContext createContainerTerminationContext(int attempt) {
|
||||
appAttemptId = ApplicationAttemptId.newInstance(appId, attempt);
|
||||
// create an AM container
|
||||
ContainerId containerId = getAMContainerId();
|
||||
ContainerTerminationContext context =
|
||||
mock(ContainerTerminationContext.class);
|
||||
when(context.getContainerId()).thenReturn(containerId);
|
||||
when(context.getContainerType())
|
||||
.thenReturn(ContainerType.APPLICATION_MASTER);
|
||||
return context;
|
||||
}
|
||||
|
||||
private PerNodeTimelineCollectorsAuxService createCollector() {
|
||||
NodeTimelineCollectorManager collectorManager = createCollectorManager();
|
||||
PerNodeTimelineCollectorsAuxService service =
|
||||
spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
|
||||
spy(new PerNodeTimelineCollectorsAuxService(collectorManager) {
|
||||
@Override
|
||||
protected Future removeApplicationCollector(ContainerId containerId) {
|
||||
Future future = super.removeApplicationCollector(containerId);
|
||||
try {
|
||||
future.get();
|
||||
} catch (Exception e) {
|
||||
Assert.fail("Expeption thrown while removing collector");
|
||||
}
|
||||
return future;
|
||||
}
|
||||
});
|
||||
service.init(conf);
|
||||
service.start();
|
||||
return service;
|
||||
|
@ -204,4 +229,40 @@ public class TestPerNodeTimelineCollectorsAuxService {
|
|||
private ContainerId getContainerId(long id) {
|
||||
return ContainerId.newContainerId(appAttemptId, id);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRemoveAppWhenSecondAttemptAMCotainerIsLaunchedSameNode()
|
||||
throws Exception {
|
||||
// add first attempt collector
|
||||
auxService = createCollectorAndAddApplication();
|
||||
// auxService should have a single app
|
||||
assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
|
||||
|
||||
// add second attempt collector before first attempt master container stop
|
||||
ContainerInitializationContext containerInitalizationContext =
|
||||
createContainerInitalizationContext(2);
|
||||
auxService.initializeContainer(containerInitalizationContext);
|
||||
|
||||
assertTrue("Applicatin not found in collectors.",
|
||||
auxService.hasApplication(appAttemptId.getApplicationId()));
|
||||
|
||||
// first attempt stop container
|
||||
ContainerTerminationContext context = createContainerTerminationContext(1);
|
||||
auxService.stopContainer(context);
|
||||
|
||||
// 2nd attempt container removed, still collector should hold application id
|
||||
assertTrue("collector has removed application though 2nd attempt"
|
||||
+ " is running this node",
|
||||
auxService.hasApplication(appAttemptId.getApplicationId()));
|
||||
|
||||
// second attempt stop container
|
||||
context = createContainerTerminationContext(2);
|
||||
auxService.stopContainer(context);
|
||||
|
||||
// auxService should not have that app
|
||||
assertFalse("Application is not removed from collector",
|
||||
auxService.hasApplication(appAttemptId.getApplicationId()));
|
||||
auxService.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue