diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 3bffc2b7e46..3ac1547206a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -497,7 +497,7 @@ public boolean init(String[] args) throws ParseException { } if (cliParser.hasOption("flow_run_id")) { try { - flowRunId = Long.valueOf(cliParser.getOptionValue("flow_run_id")); + flowRunId = Long.parseLong(cliParser.getOptionValue("flow_run_id")); } catch (NumberFormatException e) { throw new IllegalArgumentException( "Flow run is not a valid long value", e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index aac8c4bf31e..f977cf01a10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -129,7 +130,6 @@ protected void setupInternal(int numNodeManager) throws Exception { private void setupInternal(int numNodeManager, float timelineVersion) throws Exception { - LOG.info("Starting up YARN cluster"); conf = new YarnConfiguration(); @@ -140,7 +140,6 @@ private void setupInternal(int numNodeManager, float timelineVersion) boolean enableATSServer = true; // disable aux-service based timeline aggregators conf.set(YarnConfiguration.NM_AUX_SERVICES, ""); - conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8"); conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); @@ -155,7 +154,9 @@ private void setupInternal(int numNodeManager, float timelineVersion) conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true); conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, true); - conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, + true); + conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, false); // ATS version specific settings if (timelineVersion == 1.0f) { @@ -180,6 +181,9 @@ private void setupInternal(int numNodeManager, float timelineVersion) conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME + ".class", PerNodeTimelineCollectorsAuxService.class.getName()); + conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, + false); } else { Assert.fail("Wrong timeline version number: " + timelineVersion); } @@ -187,7 +191,7 @@ private void setupInternal(int numNodeManager, float timelineVersion) if (yarnCluster == null) { yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1, - numNodeManager, 1, 1, enableATSServer); + numNodeManager, 1, 1); yarnCluster.init(conf); yarnCluster.start(); @@ -390,13 +394,15 @@ public void run() { if (checkHostname(appReport.getHost()) && appReport.getRpcPort() == -1) { verified = true; } - if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED) { + + if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED + && appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) { break; } } Assert.assertTrue(errorMessage, verified); t.join(); - LOG.info("Client run completed. Result=" + result); + LOG.info("Client run completed for testDSShell. Result=" + result); Assert.assertTrue(result.get()); if (timelineVersionWatcher.getTimelineVersion() == 1.5f) { @@ -477,9 +483,9 @@ private void checkTimelineV1(boolean haveDomain) throws Exception { } } - private void checkTimelineV2( - boolean haveDomain, ApplicationId appId, boolean defaultFlow) - throws Exception { + private void checkTimelineV2(boolean haveDomain, ApplicationId appId, + boolean defaultFlow) throws Exception { + LOG.info("Started checkTimelineV2 "); // For PoC check in /tmp/timeline_service_data YARN-3264 String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT @@ -530,12 +536,29 @@ private void checkTimelineV2( verifyEntityTypeFileExists(basePath, TimelineEntityType.YARN_APPLICATION.toString(), appMetricsTimestampFileName); - verifyStringExistsSpecifiedTimes(appEntityFile, - ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, - "Application created event should be published atleast once"); - verifyStringExistsSpecifiedTimes(appEntityFile, - ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, - "Application finished event should be published atleast once"); + Assert.assertEquals( + "Application created event should be published atleast once", + 1, + getNumOfStringOccurences(appEntityFile, + ApplicationMetricsConstants.CREATED_EVENT_TYPE)); + + // to avoid race condition of testcase, atleast check 4 times with sleep + // of 500ms + long numOfStringOccurences = 0; + for (int i = 0; i < 4; i++) { + numOfStringOccurences = + getNumOfStringOccurences(appEntityFile, + ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + if (numOfStringOccurences > 0) { + break; + } else { + Thread.sleep(500l); + } + } + Assert.assertEquals( + "Application finished event should be published atleast once", + 1, + numOfStringOccurences); // Verify RM posting AppAttempt life cycle Events are getting published String appAttemptMetricsTimestampFileName = @@ -546,12 +569,17 @@ private void checkTimelineV2( verifyEntityTypeFileExists(basePath, TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), appAttemptMetricsTimestampFileName); - verifyStringExistsSpecifiedTimes(appAttemptEntityFile, - AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, - "AppAttempt register event should be published atleast once"); - verifyStringExistsSpecifiedTimes(appAttemptEntityFile, - AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, - "AppAttempt finished event should be published atleast once"); + Assert.assertEquals( + "AppAttempt register event should be published atleast once", + 1, + getNumOfStringOccurences(appAttemptEntityFile, + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)); + + Assert.assertEquals( + "AppAttempt finished event should be published atleast once", + 1, + getNumOfStringOccurences(appAttemptEntityFile, + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)); } finally { FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); } @@ -570,8 +598,7 @@ private File verifyEntityTypeFileExists(String basePath, String entityType, return entityFile; } - private void verifyStringExistsSpecifiedTimes(File entityFile, - String searchString, long expectedNumOfTimes, String errorMsg) + private long getNumOfStringOccurences(File entityFile, String searchString) throws IOException { BufferedReader reader = null; String strLine; @@ -585,7 +612,7 @@ private void verifyStringExistsSpecifiedTimes(File entityFile, } finally { reader.close(); } - Assert.assertEquals(errorMsg, expectedNumOfTimes, actualCount); + return actualCount; } /** @@ -1261,4 +1288,3 @@ private int verifyContainerLog(int containerNum, return numOfWords; } } - diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java index b62b09122d5..c55f20216b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java @@ -30,7 +30,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import com.google.common.collect.ImmutableMap; @@ -40,7 +42,7 @@ public class TestDistributedShellWithNodeLabels { static final int NUM_NMS = 2; TestDistributedShell distShellTest; - + @Before public void setup() throws Exception { distShellTest = new TestDistributedShell(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 29883616d90..00968795db0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -418,6 +418,14 @@ public void putObjects(String path, MultivaluedMap params, // timelineServiceAddress could haven't be initialized yet // or stale (only for new timeline service) int retries = pollTimelineServiceAddress(this.maxServiceRetries); + if (timelineServiceAddress == null) { + String errMessage = "TimelineClient has reached to max retry times : " + + this.maxServiceRetries + + ", but failed to fetch timeline service address. Please verify" + + " Timeline Auxillary Service is configured in all the NMs"; + LOG.error(errMessage); + throw new YarnException(errMessage); + } // timelineServiceAddress could be stale, add retry logic here. boolean needRetry = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 7a3745d2116..036fdf5575a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -130,11 +130,11 @@ public ApplicationImpl(Dispatcher dispatcher, String user, String flowId, context, -1); Configuration conf = context.getConf(); if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) { - createAndStartTimelienClient(conf); + createAndStartTimelineClient(conf); } } - private void createAndStartTimelienClient(Configuration conf) { + private void createAndStartTimelineClient(Configuration conf) { // create and start timeline client this.timelineClient = TimelineClient.createTimelineClient(appId); timelineClient.init(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index c1fa72e2857..ce001291103 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -96,11 +96,8 @@ public class ContainersMonitorImpl extends AbstractService implements // For posting entities in new timeline service in a non-blocking way // TODO replace with event loop in TimelineClient. - private static ExecutorService threadPool = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); - + private static ExecutorService threadPool; + @Private public static enum ContainerMetric { CPU, MEMORY @@ -225,6 +222,10 @@ protected void serviceInit(Configuration conf) throws Exception { if (publishContainerMetricsToTimelineService) { LOG.info("NodeManager has been configured to publish container " + "metrics to Timeline Service V2."); + threadPool = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") + .build()); } else { LOG.warn("NodeManager has not been configured to publish container " + "metrics to Timeline Service V2."); @@ -280,6 +281,9 @@ protected void serviceStop() throws Exception { // TODO remove threadPool after adding non-blocking call in TimelineClient private static void shutdownAndAwaitTermination() { + if (threadPool == null) { + return; + } threadPool.shutdown(); try { if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { @@ -689,7 +693,6 @@ public void run() { timelineClient.putEntities(entity); } catch (IOException|YarnException e) { LOG.error("putEntityNonBlocking get failed: " + e); - throw new RuntimeException(e.toString()); } } }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java index 7d1b6574494..116bf64ca74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java @@ -56,7 +56,7 @@ public void postPut(ApplicationId appId, TimelineCollector collector) { if (parts.length != 2 || parts[1].isEmpty()) { continue; } - switch (parts[0]) { + switch (parts[0].toUpperCase()) { case TimelineUtils.FLOW_NAME_TAG_PREFIX: collector.getTimelineEntityContext().setFlowName(parts[1]); break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java index 31051db1687..851ac30f28e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java @@ -48,21 +48,11 @@ import com.google.common.annotations.VisibleForTesting; - -/** - * - * It is a singleton, and instances should be obtained via - * {@link #getInstance()}. - * - */ @Private @Unstable public class NodeTimelineCollectorManager extends TimelineCollectorManager { private static final Log LOG = LogFactory.getLog(NodeTimelineCollectorManager.class); - private static final NodeTimelineCollectorManager INSTANCE = - new NodeTimelineCollectorManager(); - // REST server for this collector manager private HttpServer2 timelineRestServer; @@ -73,10 +63,6 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; - static NodeTimelineCollectorManager getInstance() { - return INSTANCE; - } - @VisibleForTesting protected NodeTimelineCollectorManager() { super(NodeTimelineCollectorManager.class.getName()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java index 36ff5c0ab70..3ede97aefed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -56,8 +56,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { private final NodeTimelineCollectorManager collectorManager; public PerNodeTimelineCollectorsAuxService() { - // use the same singleton - this(NodeTimelineCollectorManager.getInstance()); + this(new NodeTimelineCollectorManager()); } @VisibleForTesting PerNodeTimelineCollectorsAuxService( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index d54715cc4a4..23ad4f4f9b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -128,7 +128,7 @@ public boolean remove(ApplicationId appId) { postRemove(appId, collector); // stop the service to do clean up collector.stop(); - LOG.info("the collector service for " + appId + " was removed"); + LOG.info("The collector service for " + appId + " was removed"); } return collector != null; }