diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index d724026e8e9..08c6ba28c76 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -117,8 +117,15 @@
-
-
+
+
+
+
+
+
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
index acc132e604f..7ce8279e5f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
@@ -17,15 +17,6 @@
*/
package org.apache.hadoop.yarn.api.records.timelineservice;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.util.TimelineServiceHelper;
-import org.codehaus.jackson.annotate.JsonSetter;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -33,6 +24,16 @@
import java.util.Set;
import java.util.TreeSet;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+import org.codehaus.jackson.annotate.JsonSetter;
+
/**
* The basic timeline entity data structure for timeline service v2. Timeline
* entity objects are not thread safe and should not be accessed concurrently.
@@ -564,6 +565,10 @@ protected TimelineEntity getReal() {
}
public String toString() {
- return identifier.toString();
+ if (real == null) {
+ return identifier.toString();
+ } else {
+ return real.toString();
+ }
}
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 87a5e9c1b20..ef8838e1a1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -429,9 +429,8 @@ protected void putObjects(String path, MultivaluedMap params,
URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
putObjects(uri, path, params, obj);
needRetry = false;
- } catch (Exception e) {
- // TODO only handle exception for timelineServiceAddress being updated.
- // skip retry for other exceptions.
+ } catch (IOException e) {
+ // handle exception for timelineServiceAddress being updated.
checkRetryWithSleep(retries, e);
retries--;
}
@@ -458,29 +457,27 @@ private int verifyRestEndPointAvailable() throws YarnException {
* @param retries
* @param e
*/
- private void checkRetryWithSleep(int retries, Exception e) throws
- YarnException, IOException {
+ private void checkRetryWithSleep(int retries, IOException e)
+ throws YarnException, IOException {
if (retries > 0) {
try {
Thread.sleep(this.serviceRetryInterval);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
+ throw new YarnException("Interrupted while retrying to connect to ATS");
}
} else {
- LOG.error("TimelineClient has reached to max retry times :" +
- this.maxServiceRetries + " for service address: " +
- timelineServiceAddress);
- if (e instanceof YarnException) {
- throw (YarnException)e;
- } else if (e instanceof IOException) {
- throw (IOException)e;
- } else {
- throw new YarnException(e);
- }
+ StringBuilder msg =
+ new StringBuilder("TimelineClient has reached to max retry times : ");
+ msg.append(this.maxServiceRetries);
+ msg.append(" for service address: ");
+ msg.append(timelineServiceAddress);
+ LOG.error(msg.toString());
+ throw new IOException(msg.toString(), e);
}
}
- private void putObjects(
+ protected void putObjects(
URI base, String path, MultivaluedMap params, Object obj)
throws IOException, YarnException {
ClientResponse resp;
@@ -636,17 +633,19 @@ private Object operateDelegationToken(
/**
* Poll TimelineServiceAddress for maximum of retries times if it is null.
+ *
* @param retries
* @return the left retry times
+ * @throws IOException
*/
- private int pollTimelineServiceAddress(int retries) {
+ private int pollTimelineServiceAddress(int retries) throws YarnException {
while (timelineServiceAddress == null && retries > 0) {
try {
Thread.sleep(this.serviceRetryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ throw new YarnException("Interrupted while trying to connect ATS");
}
- // timelineServiceAddress = getTimelineServiceAddress();
retries--;
}
return retries;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
index 7803f94239d..71dafdc8461 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -34,23 +35,33 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
public class TestTimelineClientV2Impl {
private static final Log LOG =
LogFactory.getLog(TestTimelineClientV2Impl.class);
private TestV2TimelineClient client;
private static long TIME_TO_SLEEP = 150;
+ private static final String EXCEPTION_MSG = "Exception in the content";
@Before
public void setup() {
- YarnConfiguration conf = new YarnConfiguration();
+ conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
- client = createTimelineClient(conf);
+ if (!currTestName.getMethodName()
+ .contains("testRetryOnConnectionFailure")) {
+ client = createTimelineClient(conf);
+ }
}
+ @Rule
+ public TestName currTestName = new TestName();
+ private YarnConfiguration conf;
+
private TestV2TimelineClient createTimelineClient(YarnConfiguration conf) {
ApplicationId id = ApplicationId.newInstance(0, 0);
TestV2TimelineClient client = new TestV2TimelineClient(id);
@@ -59,9 +70,34 @@ private TestV2TimelineClient createTimelineClient(YarnConfiguration conf) {
return client;
}
- private class TestV2TimelineClient extends TimelineClientImpl {
+ private class TestV2TimelineClientForExceptionHandling
+ extends TimelineClientImpl {
+ public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
+ super(id);
+ }
+
+ protected boolean throwYarnException;
+
+ public void setThrowYarnException(boolean throwYarnException) {
+ this.throwYarnException = throwYarnException;
+ }
+
+ @Override
+ protected void putObjects(URI base, String path,
+ MultivaluedMap params, Object obj)
+ throws IOException, YarnException {
+ if (throwYarnException) {
+ throw new YarnException(EXCEPTION_MSG);
+ } else {
+ throw new IOException(
+ "Failed to get the response from the timeline server.");
+ }
+ }
+ }
+
+ private class TestV2TimelineClient
+ extends TestV2TimelineClientForExceptionHandling {
private boolean sleepBeforeReturn;
- private boolean throwException;
private List publishedEntities;
@@ -75,10 +111,6 @@ public void setSleepBeforeReturn(boolean sleepBeforeReturn) {
this.sleepBeforeReturn = sleepBeforeReturn;
}
- public void setThrowException(boolean throwException) {
- this.throwException = throwException;
- }
-
public int getNumOfTimelineEntitiesPublished() {
return publishedEntities.size();
}
@@ -91,7 +123,7 @@ public TestV2TimelineClient(ApplicationId id) {
protected void putObjects(String path,
MultivaluedMap params, Object obj)
throws IOException, YarnException {
- if (throwException) {
+ if (throwYarnException) {
throw new YarnException("ActualException");
}
publishedEntities.add((TimelineEntities) obj);
@@ -105,6 +137,45 @@ protected void putObjects(String path,
}
}
+ @Test
+ public void testExceptionMultipleRetry() {
+ TestV2TimelineClientForExceptionHandling client =
+ new TestV2TimelineClientForExceptionHandling(
+ ApplicationId.newInstance(0, 0));
+ int maxRetries = 2;
+ conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+ maxRetries);
+ client.init(conf);
+ client.start();
+ client.setTimelineServiceAddress("localhost:12345");
+ try {
+ client.putEntities(new TimelineEntity());
+ } catch (IOException e) {
+ Assert.fail("YARN exception is expected");
+ } catch (YarnException e) {
+ Throwable cause = e.getCause();
+ Assert.assertTrue("IOException is expected",
+ cause instanceof IOException);
+ Assert.assertTrue("YARN exception is expected",
+ cause.getMessage().contains(
+ "TimelineClient has reached to max retry times : " + maxRetries));
+ }
+
+ client.setThrowYarnException(true);
+ try {
+ client.putEntities(new TimelineEntity());
+ } catch (IOException e) {
+ Assert.fail("YARN exception is expected");
+ } catch (YarnException e) {
+ Throwable cause = e.getCause();
+ Assert.assertTrue("YARN exception is expected",
+ cause instanceof YarnException);
+ Assert.assertTrue("YARN exception is expected",
+ cause.getMessage().contains(EXCEPTION_MSG));
+ }
+ client.stop();
+ }
+
@Test
public void testPostEntities() throws Exception {
try {
@@ -189,7 +260,7 @@ public void testSyncCall() throws Exception {
@Test
public void testExceptionCalls() throws Exception {
- client.setThrowException(true);
+ client.setThrowYarnException(true);
try {
client.putEntitiesAsync(generateEntity("1"));
} catch (YarnException e) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
index 7b429948fbe..eadb5b792d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
@@ -69,4 +69,12 @@ public class ContainerMetricsConstants {
public static final String ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO =
"YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS";
+
+ // Event of this type will be emitted by NM.
+ public static final String LOCALIZATION_START_EVENT_TYPE =
+ "YARN_NM_CONTAINER_LOCALIZATION_STARTED";
+
+ // Event of this type will be emitted by NM.
+ public static final String LOCALIZATION_FINISHED_EVENT_TYPE =
+ "YARN_NM_CONTAINER_LOCALIZATION_FINISHED";
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 9e2254e2bc2..6e0e7601a01 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -55,7 +55,6 @@
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -89,6 +88,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -983,9 +983,11 @@ private void updateTimelineClientsAddress(
LOG.debug("Sync a new collector address: " + collectorAddr +
" for application: " + appId + " from RM.");
}
- TimelineClient client = application.getTimelineClient();
- if (client != null) {
- client.setTimelineServiceAddress(collectorAddr);
+ NMTimelinePublisher nmTimelinePublisher =
+ context.getNMTimelinePublisher();
+ if (nmTimelinePublisher != null) {
+ nmTimelinePublisher.setTimelineServiceAddress(
+ application.getAppId(), collectorAddr);
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
index 548c8617c7f..d667c0ee246 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -29,7 +29,6 @@
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -42,6 +41,7 @@
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
/**
* Service that handles collector information. It is used only if the timeline
@@ -116,10 +116,10 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
String collectorAddr = collector.getCollectorAddr();
newCollectorsMap.put(appId, collectorAddr);
// set registered collector address to TimelineClient.
- TimelineClient client =
- context.getApplications().get(appId).getTimelineClient();
- if (client != null) {
- client.setTimelineServiceAddress(collectorAddr);
+ NMTimelinePublisher nmTimelinePublisher =
+ context.getNMTimelinePublisher();
+ if (nmTimelinePublisher != null) {
+ nmTimelinePublisher.setTimelineServiceAddress(appId, collectorAddr);
}
}
((NodeManager.NMContext)context).addRegisteredCollectors(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
index 5de339875fe..aee0862ae81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
@@ -22,7 +22,6 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -41,7 +40,4 @@ public interface Application extends EventHandler {
String getFlowVersion();
long getFlowRunId();
-
- TimelineClient getTimelineClient();
-
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 0531fe4e4ac..22779bb4b38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -83,7 +84,6 @@ public class ApplicationImpl implements Application {
private final ReadLock readLock;
private final WriteLock writeLock;
private final Context context;
- private TimelineClient timelineClient;
private static final Log LOG = LogFactory.getLog(ApplicationImpl.class);
@@ -143,7 +143,7 @@ public ApplicationImpl(Dispatcher dispatcher, String user,
}
this.flowContext = flowContext;
if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
- createAndStartTimelineClient(conf);
+ context.getNMTimelinePublisher().createTimelineClient(appId);
}
}
}
@@ -175,13 +175,6 @@ public long getFlowRunId() {
}
}
- private void createAndStartTimelineClient(Configuration conf) {
- // create and start timeline client
- this.timelineClient = TimelineClient.createTimelineClient(appId);
- timelineClient.init(conf);
- timelineClient.start();
- }
-
@Override
public String getUser() {
return user.toString();
@@ -192,11 +185,6 @@ public ApplicationId getAppId() {
return appId;
}
- @Override
- public TimelineClient getTimelineClient() {
- return timelineClient;
- }
-
@Override
public ApplicationState getApplicationState() {
this.readLock.lock();
@@ -575,9 +563,10 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
registeredCollectors.remove(app.getAppId());
}
// stop timelineClient when application get finished.
- TimelineClient timelineClient = app.getTimelineClient();
- if (timelineClient != null) {
- timelineClient.stop();
+ NMTimelinePublisher nmTimelinePublisher =
+ app.context.getNMTimelinePublisher();
+ if (nmTimelinePublisher != null) {
+ nmTimelinePublisher.stopTimelineClient(app.getAppId());
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 70b7e8dd66e..4d3dafdc068 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,7 +31,6 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -41,16 +42,15 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@@ -72,9 +72,12 @@ public class NMTimelinePublisher extends CompositeService {
private String httpAddress;
+ protected final Map appToClientMap;
+
public NMTimelinePublisher(Context context) {
super(NMTimelinePublisher.class.getName());
this.context = context;
+ appToClientMap = new ConcurrentHashMap<>();
}
@Override
@@ -82,12 +85,6 @@ protected void serviceInit(Configuration conf) throws Exception {
dispatcher = new AsyncDispatcher();
dispatcher.register(NMTimelineEventType.class,
new ForwardingEventHandler());
- dispatcher
- .register(ContainerEventType.class, new ContainerEventHandler());
- dispatcher.register(ApplicationEventType.class,
- new ApplicationEventHandler());
- dispatcher.register(LocalizationEventType.class,
- new LocalizationEventDispatcher());
addIfService(dispatcher);
super.serviceInit(conf);
}
@@ -112,7 +109,6 @@ protected void handleNMTimelineEvent(NMTimelineEvent event) {
}
}
- @SuppressWarnings("unchecked")
public void reportContainerResourceUsage(Container container, Long pmemUsage,
Float cpuUsagePercentPerCore) {
if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
@@ -133,15 +129,32 @@ public void reportContainerResourceUsage(Container container, Long pmemUsage,
Math.round(cpuUsagePercentPerCore));
entity.addMetric(cpuMetric);
}
- dispatcher.getEventHandler()
- .handle(new TimelinePublishEvent(entity, container.getContainerId()
- .getApplicationAttemptId().getApplicationId()));
+ ApplicationId appId = container.getContainerId().getApplicationAttemptId()
+ .getApplicationId();
+ try {
+ // no need to put it as part of publisher as timeline client already has
+ // Queuing concept
+ TimelineClient timelineClient = getTimelineClient(appId);
+ if (timelineClient != null) {
+ timelineClient.putEntitiesAsync(entity);
+ } else {
+ LOG.error("Seems like client has been removed before the container"
+ + " metric could be published for " + container.getContainerId());
+ }
+ } catch (IOException | YarnException e) {
+ LOG.error("Failed to publish Container metrics for container "
+ + container.getContainerId(), e);
+ }
}
}
- private void publishContainerCreatedEvent(ContainerEntity entity,
- ContainerId containerId, Resource resource, Priority priority,
- long timestamp) {
+ @SuppressWarnings("unchecked")
+ private void publishContainerCreatedEvent(ContainerEvent event) {
+ ContainerId containerId = event.getContainerID();
+ ContainerEntity entity = createContainerEntity(containerId);
+ Container container = context.getContainers().get(containerId);
+ Resource resource = container.getResource();
+
Map entityInfo = new HashMap();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
resource.getMemory());
@@ -152,7 +165,7 @@ private void publishContainerCreatedEvent(ContainerEntity entity,
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
nodeId.getPort());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
- priority.toString());
+ container.getPriority().toString());
entityInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
httpAddress);
@@ -160,13 +173,15 @@ private void publishContainerCreatedEvent(ContainerEntity entity,
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
- tEvent.setTimestamp(timestamp);
+ tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
- entity.setCreatedTime(timestamp);
- putEntity(entity, containerId.getApplicationAttemptId().getApplicationId());
+ entity.setCreatedTime(event.getTimestamp());
+ dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+ containerId.getApplicationAttemptId().getApplicationId()));
}
+ @SuppressWarnings("unchecked")
private void publishContainerFinishedEvent(ContainerStatus containerStatus,
long timeStamp) {
ContainerId containerId = containerStatus.getContainerId();
@@ -186,7 +201,38 @@ private void publishContainerFinishedEvent(ContainerStatus containerStatus,
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
- putEntity(entity, containerId.getApplicationAttemptId().getApplicationId());
+
+ dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+ containerId.getApplicationAttemptId().getApplicationId()));
+ }
+
+ private void publishContainerLocalizationEvent(
+ ContainerLocalizationEvent event, String eventType) {
+ Container container = event.getContainer();
+ ContainerId containerId = container.getContainerId();
+ TimelineEntity entity = createContainerEntity(containerId);
+
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(eventType);
+ tEvent.setTimestamp(event.getTimestamp());
+ entity.addEvent(tEvent);
+
+ ApplicationId appId =
+ container.getContainerId().getApplicationAttemptId().getApplicationId();
+ try {
+ // no need to put it as part of publisher as timeline client already has
+ // Queuing concept
+ TimelineClient timelineClient = getTimelineClient(appId);
+ if (timelineClient != null) {
+ timelineClient.putEntitiesAsync(entity);
+ } else {
+ LOG.error("Seems like client has been removed before the event could be"
+ + " published for " + container.getContainerId());
+ }
+ } catch (IOException | YarnException e) {
+ LOG.error("Failed to publish Container metrics for container "
+ + container.getContainerId(), e);
+ }
}
private static ContainerEntity createContainerEntity(
@@ -207,23 +253,33 @@ private void putEntity(TimelineEntity entity, ApplicationId appId) {
LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
- TimelineClient timelineClient =
- context.getApplications().get(appId).getTimelineClient();
- timelineClient.putEntities(entity);
+ TimelineClient timelineClient = getTimelineClient(appId);
+ if (timelineClient != null) {
+ timelineClient.putEntities(entity);
+ } else {
+ LOG.error("Seems like client has been removed before the entity "
+ + "could be published for " + entity);
+ }
} catch (Exception e) {
LOG.error("Error when publishing entity " + entity, e);
}
}
- @SuppressWarnings("unchecked")
public void publishApplicationEvent(ApplicationEvent event) {
// publish only when the desired event is received
switch (event.getType()) {
case INIT_APPLICATION:
case FINISH_APPLICATION:
- case APPLICATION_CONTAINER_FINISHED:
case APPLICATION_LOG_HANDLING_FAILED:
- dispatcher.getEventHandler().handle(event);
+ // TODO need to be handled in future,
+ // not sure to publish under which entity
+ break;
+ case APPLICATION_CONTAINER_FINISHED:
+ // this is actually used to publish the container Event
+ ApplicationContainerFinishedEvent evnt =
+ (ApplicationContainerFinishedEvent) event;
+ publishContainerFinishedEvent(evnt.getContainerStatus(),
+ event.getTimestamp());
break;
default:
@@ -235,12 +291,11 @@ public void publishApplicationEvent(ApplicationEvent event) {
}
}
- @SuppressWarnings("unchecked")
public void publishContainerEvent(ContainerEvent event) {
// publish only when the desired event is received
switch (event.getType()) {
case INIT_CONTAINER:
- dispatcher.getEventHandler().handle(event);
+ publishContainerCreatedEvent(event);
break;
default:
@@ -253,15 +308,17 @@ public void publishContainerEvent(ContainerEvent event) {
}
}
- @SuppressWarnings("unchecked")
public void publishLocalizationEvent(LocalizationEvent event) {
// publish only when the desired event is received
switch (event.getType()) {
case CONTAINER_RESOURCES_LOCALIZED:
- case INIT_CONTAINER_RESOURCES:
- dispatcher.getEventHandler().handle(event);
+ publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
+ ContainerMetricsConstants.LOCALIZATION_FINISHED_EVENT_TYPE);
+ break;
+ case INIT_CONTAINER_RESOURCES:
+ publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
+ ContainerMetricsConstants.LOCALIZATION_START_EVENT_TYPE);
break;
-
default:
if (LOG.isDebugEnabled()) {
LOG.debug(event.getType()
@@ -272,64 +329,6 @@ public void publishLocalizationEvent(LocalizationEvent event) {
}
}
- private class ApplicationEventHandler implements
- EventHandler {
- @Override
- public void handle(ApplicationEvent event) {
- switch (event.getType()) {
- case APPLICATION_CONTAINER_FINISHED:
- // this is actually used to publish the container Event
- ApplicationContainerFinishedEvent evnt =
- (ApplicationContainerFinishedEvent) event;
- publishContainerFinishedEvent(evnt.getContainerStatus(),
- event.getTimestamp());
- break;
- default:
- LOG.error("Seems like event type is captured only in "
- + "publishApplicationEvent method and not handled here");
- break;
- }
- }
- }
-
- private class ContainerEventHandler implements EventHandler {
- @Override
- public void handle(ContainerEvent event) {
- ContainerId containerId = event.getContainerID();
- Container container = context.getContainers().get(containerId);
- long timestamp = event.getTimestamp();
- ContainerEntity entity = createContainerEntity(containerId);
-
- switch (event.getType()) {
- case INIT_CONTAINER:
- publishContainerCreatedEvent(entity, containerId,
- container.getResource(), container.getPriority(), timestamp);
- break;
- default:
- LOG.error("Seems like event type is captured only in "
- + "publishContainerEvent method and not handled here");
- break;
- }
- }
- }
-
- private static final class LocalizationEventDispatcher implements
- EventHandler {
- @Override
- public void handle(LocalizationEvent event) {
- switch (event.getType()) {
- case INIT_CONTAINER_RESOURCES:
- case CONTAINER_RESOURCES_LOCALIZED:
- // TODO after priority based flush jira is finished
- break;
- default:
- LOG.error("Seems like event type is captured only in "
- + "publishLocalizationEvent method and not handled here");
- break;
- }
- }
- }
-
/**
* EventHandler implementation which forward events to NMMetricsPublisher.
* Making use of it, NMMetricsPublisher can avoid to have a public handle
@@ -363,4 +362,33 @@ public TimelineEntity getTimelineEntityToPublish() {
return entityToPublish;
}
}
+
+ public void createTimelineClient(ApplicationId appId) {
+ if (!appToClientMap.containsKey(appId)) {
+ TimelineClient timelineClient =
+ TimelineClient.createTimelineClient(appId);
+ timelineClient.init(getConfig());
+ timelineClient.start();
+ appToClientMap.put(appId, timelineClient);
+ }
+ }
+
+ public void stopTimelineClient(ApplicationId appId) {
+ TimelineClient client = appToClientMap.remove(appId);
+ if (client != null) {
+ client.stop();
+ }
+ }
+
+ public void setTimelineServiceAddress(ApplicationId appId,
+ String collectorAddr) {
+ TimelineClient client = appToClientMap.get(appId);
+ if (client != null) {
+ client.setTimelineServiceAddress(collectorAddr);
+ }
+ }
+
+ private TimelineClient getTimelineClient(ApplicationId appId) {
+ return appToClientMap.get(appId);
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
index 830ed6b5048..4aa28d2dbe7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
@@ -20,14 +20,12 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -39,7 +37,6 @@
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.junit.Assert;
@@ -53,20 +50,23 @@ public class TestNMTimelinePublisher {
public void testContainerResourceUsage() {
Context context = mock(Context.class);
@SuppressWarnings("unchecked")
- ConcurrentMap map = mock(ConcurrentMap.class);
- Application aApp = mock(Application.class);
- when(map.get(any(ApplicationId.class))).thenReturn(aApp);
- DummyTimelineClient timelineClient = new DummyTimelineClient();
- when(aApp.getTimelineClient()).thenReturn(timelineClient);
- when(context.getApplications()).thenReturn(map);
+ final DummyTimelineClient timelineClient = new DummyTimelineClient();
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
when(context.getHttpPort()).thenReturn(0);
- NMTimelinePublisher publisher = new NMTimelinePublisher(context);
+ NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
+ public void createTimelineClient(ApplicationId appId) {
+ if (!appToClientMap.containsKey(appId)) {
+ appToClientMap.put(appId, timelineClient);
+ }
+ }
+ };
publisher.init(new Configuration());
publisher.start();
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ publisher.createTimelineClient(appId);
Container aContainer = mock(Container.class);
when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId(
- ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1),
+ ApplicationAttemptId.newInstance(appId, 1),
0L));
publisher.reportContainerResourceUsage(aContainer, 1024L, 8F);
verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8);
@@ -141,7 +141,7 @@ protected static class DummyTimelineClient extends TimelineClientImpl {
private TimelineEntity[] lastPublishedEntities;
@Override
- public void putEntities(TimelineEntity... entities)
+ public void putEntitiesAsync(TimelineEntity... entities)
throws IOException, YarnException {
this.lastPublishedEntities = entities;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
index 4d1be843688..c98304001a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
@@ -101,9 +101,4 @@ public String getFlowVersion() {
public long getFlowRunId() {
return flowRunId;
}
-
- @Override
- public TimelineClient getTimelineClient() {
- return timelineClient;
- }
}