YARN-4711. NM is going down with NPE's due to single thread processing of events by Timeline client (Naganarasimha G R via sjlee)

This commit is contained in:
Sangjin Lee 2016-03-28 15:50:03 -07:00
parent 6f6cc647d6
commit 84c35ac6c4
12 changed files with 279 additions and 179 deletions

View File

@ -117,8 +117,15 @@
<!-- Object cast is based on the event type -->
<Match>
<Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher$ApplicationEventHandler" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
<Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher" />
<Method name="publishApplicationEvent" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher" />
<Method name="publishLocalizationEvent" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<Match>

View File

@ -17,15 +17,6 @@
*/
package org.apache.hadoop.yarn.api.records.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.codehaus.jackson.annotate.JsonSetter;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -33,6 +24,16 @@
import java.util.Set;
import java.util.TreeSet;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.codehaus.jackson.annotate.JsonSetter;
/**
* The basic timeline entity data structure for timeline service v2. Timeline
* entity objects are not thread safe and should not be accessed concurrently.
@ -564,6 +565,10 @@ protected TimelineEntity getReal() {
}
public String toString() {
return identifier.toString();
if (real == null) {
return identifier.toString();
} else {
return real.toString();
}
}
}

View File

@ -429,9 +429,8 @@ protected void putObjects(String path, MultivaluedMap<String, String> params,
URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
putObjects(uri, path, params, obj);
needRetry = false;
} catch (Exception e) {
// TODO only handle exception for timelineServiceAddress being updated.
// skip retry for other exceptions.
} catch (IOException e) {
// handle exception for timelineServiceAddress being updated.
checkRetryWithSleep(retries, e);
retries--;
}
@ -458,29 +457,27 @@ private int verifyRestEndPointAvailable() throws YarnException {
* @param retries
* @param e
*/
private void checkRetryWithSleep(int retries, Exception e) throws
YarnException, IOException {
private void checkRetryWithSleep(int retries, IOException e)
throws YarnException, IOException {
if (retries > 0) {
try {
Thread.sleep(this.serviceRetryInterval);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new YarnException("Interrupted while retrying to connect to ATS");
}
} else {
LOG.error("TimelineClient has reached to max retry times :" +
this.maxServiceRetries + " for service address: " +
timelineServiceAddress);
if (e instanceof YarnException) {
throw (YarnException)e;
} else if (e instanceof IOException) {
throw (IOException)e;
} else {
throw new YarnException(e);
}
StringBuilder msg =
new StringBuilder("TimelineClient has reached to max retry times : ");
msg.append(this.maxServiceRetries);
msg.append(" for service address: ");
msg.append(timelineServiceAddress);
LOG.error(msg.toString());
throw new IOException(msg.toString(), e);
}
}
private void putObjects(
protected void putObjects(
URI base, String path, MultivaluedMap<String, String> params, Object obj)
throws IOException, YarnException {
ClientResponse resp;
@ -636,17 +633,19 @@ private Object operateDelegationToken(
/**
* Poll TimelineServiceAddress for maximum of retries times if it is null.
*
* @param retries
* @return the left retry times
* @throws IOException
*/
private int pollTimelineServiceAddress(int retries) {
private int pollTimelineServiceAddress(int retries) throws YarnException {
while (timelineServiceAddress == null && retries > 0) {
try {
Thread.sleep(this.serviceRetryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new YarnException("Interrupted while trying to connect ATS");
}
// timelineServiceAddress = getTimelineServiceAddress();
retries--;
}
return retries;

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@ -34,23 +35,33 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class TestTimelineClientV2Impl {
private static final Log LOG =
LogFactory.getLog(TestTimelineClientV2Impl.class);
private TestV2TimelineClient client;
private static long TIME_TO_SLEEP = 150;
private static final String EXCEPTION_MSG = "Exception in the content";
@Before
public void setup() {
YarnConfiguration conf = new YarnConfiguration();
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
client = createTimelineClient(conf);
if (!currTestName.getMethodName()
.contains("testRetryOnConnectionFailure")) {
client = createTimelineClient(conf);
}
}
@Rule
public TestName currTestName = new TestName();
private YarnConfiguration conf;
private TestV2TimelineClient createTimelineClient(YarnConfiguration conf) {
ApplicationId id = ApplicationId.newInstance(0, 0);
TestV2TimelineClient client = new TestV2TimelineClient(id);
@ -59,9 +70,34 @@ private TestV2TimelineClient createTimelineClient(YarnConfiguration conf) {
return client;
}
private class TestV2TimelineClient extends TimelineClientImpl {
private class TestV2TimelineClientForExceptionHandling
extends TimelineClientImpl {
public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
super(id);
}
protected boolean throwYarnException;
public void setThrowYarnException(boolean throwYarnException) {
this.throwYarnException = throwYarnException;
}
@Override
protected void putObjects(URI base, String path,
MultivaluedMap<String, String> params, Object obj)
throws IOException, YarnException {
if (throwYarnException) {
throw new YarnException(EXCEPTION_MSG);
} else {
throw new IOException(
"Failed to get the response from the timeline server.");
}
}
}
private class TestV2TimelineClient
extends TestV2TimelineClientForExceptionHandling {
private boolean sleepBeforeReturn;
private boolean throwException;
private List<TimelineEntities> publishedEntities;
@ -75,10 +111,6 @@ public void setSleepBeforeReturn(boolean sleepBeforeReturn) {
this.sleepBeforeReturn = sleepBeforeReturn;
}
public void setThrowException(boolean throwException) {
this.throwException = throwException;
}
public int getNumOfTimelineEntitiesPublished() {
return publishedEntities.size();
}
@ -91,7 +123,7 @@ public TestV2TimelineClient(ApplicationId id) {
protected void putObjects(String path,
MultivaluedMap<String, String> params, Object obj)
throws IOException, YarnException {
if (throwException) {
if (throwYarnException) {
throw new YarnException("ActualException");
}
publishedEntities.add((TimelineEntities) obj);
@ -105,6 +137,45 @@ protected void putObjects(String path,
}
}
@Test
public void testExceptionMultipleRetry() {
TestV2TimelineClientForExceptionHandling client =
new TestV2TimelineClientForExceptionHandling(
ApplicationId.newInstance(0, 0));
int maxRetries = 2;
conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
maxRetries);
client.init(conf);
client.start();
client.setTimelineServiceAddress("localhost:12345");
try {
client.putEntities(new TimelineEntity());
} catch (IOException e) {
Assert.fail("YARN exception is expected");
} catch (YarnException e) {
Throwable cause = e.getCause();
Assert.assertTrue("IOException is expected",
cause instanceof IOException);
Assert.assertTrue("YARN exception is expected",
cause.getMessage().contains(
"TimelineClient has reached to max retry times : " + maxRetries));
}
client.setThrowYarnException(true);
try {
client.putEntities(new TimelineEntity());
} catch (IOException e) {
Assert.fail("YARN exception is expected");
} catch (YarnException e) {
Throwable cause = e.getCause();
Assert.assertTrue("YARN exception is expected",
cause instanceof YarnException);
Assert.assertTrue("YARN exception is expected",
cause.getMessage().contains(EXCEPTION_MSG));
}
client.stop();
}
@Test
public void testPostEntities() throws Exception {
try {
@ -189,7 +260,7 @@ public void testSyncCall() throws Exception {
@Test
public void testExceptionCalls() throws Exception {
client.setThrowException(true);
client.setThrowYarnException(true);
try {
client.putEntitiesAsync(generateEntity("1"));
} catch (YarnException e) {

View File

@ -69,4 +69,12 @@ public class ContainerMetricsConstants {
public static final String ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO =
"YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS";
// Event of this type will be emitted by NM.
public static final String LOCALIZATION_START_EVENT_TYPE =
"YARN_NM_CONTAINER_LOCALIZATION_STARTED";
// Event of this type will be emitted by NM.
public static final String LOCALIZATION_FINISHED_EVENT_TYPE =
"YARN_NM_CONTAINER_LOCALIZATION_FINISHED";
}

View File

@ -55,7 +55,6 @@
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -89,6 +88,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -983,9 +983,11 @@ private void updateTimelineClientsAddress(
LOG.debug("Sync a new collector address: " + collectorAddr +
" for application: " + appId + " from RM.");
}
TimelineClient client = application.getTimelineClient();
if (client != null) {
client.setTimelineServiceAddress(collectorAddr);
NMTimelinePublisher nmTimelinePublisher =
context.getNMTimelinePublisher();
if (nmTimelinePublisher != null) {
nmTimelinePublisher.setTimelineServiceAddress(
application.getAppId(), collectorAddr);
}
}
}

View File

@ -29,7 +29,6 @@
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -42,6 +41,7 @@
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
/**
* Service that handles collector information. It is used only if the timeline
@ -116,10 +116,10 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
String collectorAddr = collector.getCollectorAddr();
newCollectorsMap.put(appId, collectorAddr);
// set registered collector address to TimelineClient.
TimelineClient client =
context.getApplications().get(appId).getTimelineClient();
if (client != null) {
client.setTimelineServiceAddress(collectorAddr);
NMTimelinePublisher nmTimelinePublisher =
context.getNMTimelinePublisher();
if (nmTimelinePublisher != null) {
nmTimelinePublisher.setTimelineServiceAddress(appId, collectorAddr);
}
}
((NodeManager.NMContext)context).addRegisteredCollectors(

View File

@ -22,7 +22,6 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -41,7 +40,4 @@ public interface Application extends EventHandler<ApplicationEvent> {
String getFlowVersion();
long getFlowRunId();
TimelineClient getTimelineClient();
}

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@ -83,7 +84,6 @@ public class ApplicationImpl implements Application {
private final ReadLock readLock;
private final WriteLock writeLock;
private final Context context;
private TimelineClient timelineClient;
private static final Log LOG = LogFactory.getLog(ApplicationImpl.class);
@ -143,7 +143,7 @@ public ApplicationImpl(Dispatcher dispatcher, String user,
}
this.flowContext = flowContext;
if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
createAndStartTimelineClient(conf);
context.getNMTimelinePublisher().createTimelineClient(appId);
}
}
}
@ -175,13 +175,6 @@ public long getFlowRunId() {
}
}
private void createAndStartTimelineClient(Configuration conf) {
// create and start timeline client
this.timelineClient = TimelineClient.createTimelineClient(appId);
timelineClient.init(conf);
timelineClient.start();
}
@Override
public String getUser() {
return user.toString();
@ -192,11 +185,6 @@ public ApplicationId getAppId() {
return appId;
}
@Override
public TimelineClient getTimelineClient() {
return timelineClient;
}
@Override
public ApplicationState getApplicationState() {
this.readLock.lock();
@ -575,9 +563,10 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
registeredCollectors.remove(app.getAppId());
}
// stop timelineClient when application get finished.
TimelineClient timelineClient = app.getTimelineClient();
if (timelineClient != null) {
timelineClient.stop();
NMTimelinePublisher nmTimelinePublisher =
app.context.getNMTimelinePublisher();
if (nmTimelinePublisher != null) {
nmTimelinePublisher.stopTimelineClient(app.getAppId());
}
}
}

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -29,7 +31,6 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@ -41,16 +42,15 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@ -72,9 +72,12 @@ public class NMTimelinePublisher extends CompositeService {
private String httpAddress;
protected final Map<ApplicationId, TimelineClient> appToClientMap;
public NMTimelinePublisher(Context context) {
super(NMTimelinePublisher.class.getName());
this.context = context;
appToClientMap = new ConcurrentHashMap<>();
}
@Override
@ -82,12 +85,6 @@ protected void serviceInit(Configuration conf) throws Exception {
dispatcher = new AsyncDispatcher();
dispatcher.register(NMTimelineEventType.class,
new ForwardingEventHandler());
dispatcher
.register(ContainerEventType.class, new ContainerEventHandler());
dispatcher.register(ApplicationEventType.class,
new ApplicationEventHandler());
dispatcher.register(LocalizationEventType.class,
new LocalizationEventDispatcher());
addIfService(dispatcher);
super.serviceInit(conf);
}
@ -112,7 +109,6 @@ protected void handleNMTimelineEvent(NMTimelineEvent event) {
}
}
@SuppressWarnings("unchecked")
public void reportContainerResourceUsage(Container container, Long pmemUsage,
Float cpuUsagePercentPerCore) {
if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
@ -133,15 +129,32 @@ public void reportContainerResourceUsage(Container container, Long pmemUsage,
Math.round(cpuUsagePercentPerCore));
entity.addMetric(cpuMetric);
}
dispatcher.getEventHandler()
.handle(new TimelinePublishEvent(entity, container.getContainerId()
.getApplicationAttemptId().getApplicationId()));
ApplicationId appId = container.getContainerId().getApplicationAttemptId()
.getApplicationId();
try {
// no need to put it as part of publisher as timeline client already has
// Queuing concept
TimelineClient timelineClient = getTimelineClient(appId);
if (timelineClient != null) {
timelineClient.putEntitiesAsync(entity);
} else {
LOG.error("Seems like client has been removed before the container"
+ " metric could be published for " + container.getContainerId());
}
} catch (IOException | YarnException e) {
LOG.error("Failed to publish Container metrics for container "
+ container.getContainerId(), e);
}
}
}
private void publishContainerCreatedEvent(ContainerEntity entity,
ContainerId containerId, Resource resource, Priority priority,
long timestamp) {
@SuppressWarnings("unchecked")
private void publishContainerCreatedEvent(ContainerEvent event) {
ContainerId containerId = event.getContainerID();
ContainerEntity entity = createContainerEntity(containerId);
Container container = context.getContainers().get(containerId);
Resource resource = container.getResource();
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
resource.getMemory());
@ -152,7 +165,7 @@ private void publishContainerCreatedEvent(ContainerEntity entity,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
nodeId.getPort());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
priority.toString());
container.getPriority().toString());
entityInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
httpAddress);
@ -160,13 +173,15 @@ private void publishContainerCreatedEvent(ContainerEntity entity,
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(timestamp);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
entity.setCreatedTime(timestamp);
putEntity(entity, containerId.getApplicationAttemptId().getApplicationId());
entity.setCreatedTime(event.getTimestamp());
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}
@SuppressWarnings("unchecked")
private void publishContainerFinishedEvent(ContainerStatus containerStatus,
long timeStamp) {
ContainerId containerId = containerStatus.getContainerId();
@ -186,7 +201,38 @@ private void publishContainerFinishedEvent(ContainerStatus containerStatus,
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, containerId.getApplicationAttemptId().getApplicationId());
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}
private void publishContainerLocalizationEvent(
ContainerLocalizationEvent event, String eventType) {
Container container = event.getContainer();
ContainerId containerId = container.getContainerId();
TimelineEntity entity = createContainerEntity(containerId);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(eventType);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
ApplicationId appId =
container.getContainerId().getApplicationAttemptId().getApplicationId();
try {
// no need to put it as part of publisher as timeline client already has
// Queuing concept
TimelineClient timelineClient = getTimelineClient(appId);
if (timelineClient != null) {
timelineClient.putEntitiesAsync(entity);
} else {
LOG.error("Seems like client has been removed before the event could be"
+ " published for " + container.getContainerId());
}
} catch (IOException | YarnException e) {
LOG.error("Failed to publish Container metrics for container "
+ container.getContainerId(), e);
}
}
private static ContainerEntity createContainerEntity(
@ -207,23 +253,33 @@ private void putEntity(TimelineEntity entity, ApplicationId appId) {
LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
TimelineClient timelineClient =
context.getApplications().get(appId).getTimelineClient();
timelineClient.putEntities(entity);
TimelineClient timelineClient = getTimelineClient(appId);
if (timelineClient != null) {
timelineClient.putEntities(entity);
} else {
LOG.error("Seems like client has been removed before the entity "
+ "could be published for " + entity);
}
} catch (Exception e) {
LOG.error("Error when publishing entity " + entity, e);
}
}
@SuppressWarnings("unchecked")
public void publishApplicationEvent(ApplicationEvent event) {
// publish only when the desired event is received
switch (event.getType()) {
case INIT_APPLICATION:
case FINISH_APPLICATION:
case APPLICATION_CONTAINER_FINISHED:
case APPLICATION_LOG_HANDLING_FAILED:
dispatcher.getEventHandler().handle(event);
// TODO need to be handled in future,
// not sure to publish under which entity
break;
case APPLICATION_CONTAINER_FINISHED:
// this is actually used to publish the container Event
ApplicationContainerFinishedEvent evnt =
(ApplicationContainerFinishedEvent) event;
publishContainerFinishedEvent(evnt.getContainerStatus(),
event.getTimestamp());
break;
default:
@ -235,12 +291,11 @@ public void publishApplicationEvent(ApplicationEvent event) {
}
}
@SuppressWarnings("unchecked")
public void publishContainerEvent(ContainerEvent event) {
// publish only when the desired event is received
switch (event.getType()) {
case INIT_CONTAINER:
dispatcher.getEventHandler().handle(event);
publishContainerCreatedEvent(event);
break;
default:
@ -253,15 +308,17 @@ public void publishContainerEvent(ContainerEvent event) {
}
}
@SuppressWarnings("unchecked")
public void publishLocalizationEvent(LocalizationEvent event) {
// publish only when the desired event is received
switch (event.getType()) {
case CONTAINER_RESOURCES_LOCALIZED:
case INIT_CONTAINER_RESOURCES:
dispatcher.getEventHandler().handle(event);
publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
ContainerMetricsConstants.LOCALIZATION_FINISHED_EVENT_TYPE);
break;
case INIT_CONTAINER_RESOURCES:
publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
ContainerMetricsConstants.LOCALIZATION_START_EVENT_TYPE);
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug(event.getType()
@ -272,64 +329,6 @@ public void publishLocalizationEvent(LocalizationEvent event) {
}
}
private class ApplicationEventHandler implements
EventHandler<ApplicationEvent> {
@Override
public void handle(ApplicationEvent event) {
switch (event.getType()) {
case APPLICATION_CONTAINER_FINISHED:
// this is actually used to publish the container Event
ApplicationContainerFinishedEvent evnt =
(ApplicationContainerFinishedEvent) event;
publishContainerFinishedEvent(evnt.getContainerStatus(),
event.getTimestamp());
break;
default:
LOG.error("Seems like event type is captured only in "
+ "publishApplicationEvent method and not handled here");
break;
}
}
}
private class ContainerEventHandler implements EventHandler<ContainerEvent> {
@Override
public void handle(ContainerEvent event) {
ContainerId containerId = event.getContainerID();
Container container = context.getContainers().get(containerId);
long timestamp = event.getTimestamp();
ContainerEntity entity = createContainerEntity(containerId);
switch (event.getType()) {
case INIT_CONTAINER:
publishContainerCreatedEvent(entity, containerId,
container.getResource(), container.getPriority(), timestamp);
break;
default:
LOG.error("Seems like event type is captured only in "
+ "publishContainerEvent method and not handled here");
break;
}
}
}
private static final class LocalizationEventDispatcher implements
EventHandler<LocalizationEvent> {
@Override
public void handle(LocalizationEvent event) {
switch (event.getType()) {
case INIT_CONTAINER_RESOURCES:
case CONTAINER_RESOURCES_LOCALIZED:
// TODO after priority based flush jira is finished
break;
default:
LOG.error("Seems like event type is captured only in "
+ "publishLocalizationEvent method and not handled here");
break;
}
}
}
/**
* EventHandler implementation which forward events to NMMetricsPublisher.
* Making use of it, NMMetricsPublisher can avoid to have a public handle
@ -363,4 +362,33 @@ public TimelineEntity getTimelineEntityToPublish() {
return entityToPublish;
}
}
public void createTimelineClient(ApplicationId appId) {
if (!appToClientMap.containsKey(appId)) {
TimelineClient timelineClient =
TimelineClient.createTimelineClient(appId);
timelineClient.init(getConfig());
timelineClient.start();
appToClientMap.put(appId, timelineClient);
}
}
public void stopTimelineClient(ApplicationId appId) {
TimelineClient client = appToClientMap.remove(appId);
if (client != null) {
client.stop();
}
}
public void setTimelineServiceAddress(ApplicationId appId,
String collectorAddr) {
TimelineClient client = appToClientMap.get(appId);
if (client != null) {
client.setTimelineServiceAddress(collectorAddr);
}
}
private TimelineClient getTimelineClient(ApplicationId appId) {
return appToClientMap.get(appId);
}
}

View File

@ -20,14 +20,12 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -39,7 +37,6 @@
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.junit.Assert;
@ -53,20 +50,23 @@ public class TestNMTimelinePublisher {
public void testContainerResourceUsage() {
Context context = mock(Context.class);
@SuppressWarnings("unchecked")
ConcurrentMap<ApplicationId, Application> map = mock(ConcurrentMap.class);
Application aApp = mock(Application.class);
when(map.get(any(ApplicationId.class))).thenReturn(aApp);
DummyTimelineClient timelineClient = new DummyTimelineClient();
when(aApp.getTimelineClient()).thenReturn(timelineClient);
when(context.getApplications()).thenReturn(map);
final DummyTimelineClient timelineClient = new DummyTimelineClient();
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
when(context.getHttpPort()).thenReturn(0);
NMTimelinePublisher publisher = new NMTimelinePublisher(context);
NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
public void createTimelineClient(ApplicationId appId) {
if (!appToClientMap.containsKey(appId)) {
appToClientMap.put(appId, timelineClient);
}
}
};
publisher.init(new Configuration());
publisher.start();
ApplicationId appId = ApplicationId.newInstance(0, 1);
publisher.createTimelineClient(appId);
Container aContainer = mock(Container.class);
when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1),
ApplicationAttemptId.newInstance(appId, 1),
0L));
publisher.reportContainerResourceUsage(aContainer, 1024L, 8F);
verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8);
@ -141,7 +141,7 @@ protected static class DummyTimelineClient extends TimelineClientImpl {
private TimelineEntity[] lastPublishedEntities;
@Override
public void putEntities(TimelineEntity... entities)
public void putEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException {
this.lastPublishedEntities = entities;
}

View File

@ -101,9 +101,4 @@ public String getFlowVersion() {
public long getFlowRunId() {
return flowRunId;
}
@Override
public TimelineClient getTimelineClient() {
return timelineClient;
}
}