From e39ccf3b2410e32271680ca21d37e5d385592193 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Wed, 19 Mar 2014 03:51:27 +0000 Subject: [PATCH] YARN-1690. Made DistributedShell send timeline entities+events. Contributed by Mayank Bansal. svn merge --ignore-ancestry -c 1579123 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1579124 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../distributedshell/ApplicationMaster.java | 107 +++++++++++++++++- .../TestDistributedShell.java | 38 ++++++- .../ApplicationHistoryServer.java | 9 +- .../hadoop/yarn/server/MiniYARNCluster.java | 69 +++++++++++ 5 files changed, 222 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1813242b256..6742fb7bcde 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -160,6 +160,9 @@ Release 2.4.0 - UNRELEASED YARN-1705. Reset cluster-metrics on transition to standby. (Rohith via kasha) + YARN-1690. Made DistributedShell send timeline entities+events. (Mayank Bansal + via zjshen) + IMPROVEMENTS YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index a62b5e64044..88ae207fe8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -45,6 +45,7 @@ import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -55,6 +56,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; @@ -80,7 +82,10 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; @@ -90,6 +95,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; +import org.apache.log4j.LogManager; import com.google.common.annotations.VisibleForTesting; @@ -160,6 +166,18 @@ public class ApplicationMaster { private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + @VisibleForTesting + @Private + public static enum DSEvent { + DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END + } + + @VisibleForTesting + @Private + public static enum DSEntity { + DS_APP_ATTEMPT, DS_CONTAINER + } + // Configuration private Configuration conf; @@ -242,6 +260,9 @@ public class ApplicationMaster { // Launch threads private List launchThreads = new ArrayList(); + // Timeline Client + private TimelineClient timelineClient; + private final String linux_bash_command = "bash"; private final String windows_command = "cmd /c"; @@ -261,7 +282,8 @@ public class ApplicationMaster { result = appMaster.finish(); } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); - System.exit(1); + LogManager.shutdown(); + ExitUtil.terminate(1, t); } if (result) { LOG.info("Application Master completed successfully. exiting"); @@ -316,7 +338,6 @@ public class ApplicationMaster { * @throws IOException */ public boolean init(String[] args) throws ParseException, IOException { - Options opts = new Options(); opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes"); @@ -464,6 +485,11 @@ public class ApplicationMaster { requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + // Creating the Timeline Client + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + timelineClient.start(); + return true; } @@ -485,6 +511,13 @@ public class ApplicationMaster { @SuppressWarnings({ "unchecked" }) public void run() throws YarnException, IOException { LOG.info("Starting ApplicationMaster"); + try { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_START); + } catch (Exception e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString(), e); + } Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); @@ -564,6 +597,13 @@ public class ApplicationMaster { amRMClient.addContainerRequest(containerAsk); } numRequestedContainers.set(numTotalContainersToRequest); + try { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_END); + } catch (Exception e) { + LOG.error("App Attempt start event coud not be pulished for " + + appAttemptID.toString(), e); + } } @VisibleForTesting @@ -668,6 +708,12 @@ public class ApplicationMaster { LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); } + try { + publishContainerEndEvent(timelineClient, containerStatus); + } catch (Exception e) { + LOG.error("Container start event could not be pulished for " + + containerStatus.getContainerId().toString(), e); + } } // ask for more containers if any failed @@ -782,6 +828,13 @@ public class ApplicationMaster { if (container != null) { applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); } + try { + ApplicationMaster.publishContainerStartEvent( + applicationMaster.timelineClient, container); + } catch (Exception e) { + LOG.error("Container start event coud not be pulished for " + + container.getId().toString(), e); + } } @Override @@ -968,4 +1021,54 @@ public class ApplicationMaster { org.apache.commons.io.IOUtils.closeQuietly(ds); } } + + private static void publishContainerStartEvent(TimelineClient timelineClient, + Container container) throws IOException, YarnException { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(container.getId().toString()); + entity.setEntityType(DSEntity.DS_CONTAINER.toString()); + entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser() + .toString()); + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType(DSEvent.DS_CONTAINER_START.toString()); + event.addEventInfo("Node", container.getNodeId().toString()); + event.addEventInfo("Resources", container.getResource().toString()); + entity.addEvent(event); + + timelineClient.putEntities(entity); + } + + private static void publishContainerEndEvent(TimelineClient timelineClient, + ContainerStatus container) throws IOException, YarnException { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(container.getContainerId().toString()); + entity.setEntityType(DSEntity.DS_CONTAINER.toString()); + entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser() + .toString()); + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType(DSEvent.DS_CONTAINER_END.toString()); + event.addEventInfo("State", container.getState().name()); + event.addEventInfo("Exit Status", container.getExitStatus()); + entity.addEvent(event); + + timelineClient.putEntities(entity); + } + + private static void publishApplicationAttemptEvent( + TimelineClient timelineClient, String appAttemptId, DSEvent appEvent) + throws IOException, YarnException { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(appAttemptId); + entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); + entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser() + .toString()); + TimelineEvent event = new TimelineEvent(); + event.setEventType(appEvent.toString()); + event.setTimestamp(System.currentTimeMillis()); + entity.addEvent(event); + + timelineClient.putEntities(entity); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index fdff89e279e..13317740cfe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -36,11 +36,14 @@ import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -68,6 +71,7 @@ public class TestDistributedShell { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); + conf.set("yarn.log.dir", "target"); if (yarnCluster == null) { yarnCluster = new MiniYARNCluster( TestDistributedShell.class.getSimpleName(), 1, 1, 1); @@ -92,6 +96,12 @@ public class TestDistributedShell { os.write(bytesOut.toByteArray()); os.close(); } + FileContext fsContext = FileContext.getLocalFSFileContext(); + fsContext + .delete( + new Path(conf + .get("yarn.timeline-service.leveldb-timeline-store.path")), + true); try { Thread.sleep(2000); } catch (InterruptedException e) { @@ -108,6 +118,12 @@ public class TestDistributedShell { yarnCluster = null; } } + FileContext fsContext = FileContext.getLocalFSFileContext(); + fsContext + .delete( + new Path(conf + .get("yarn.timeline-service.leveldb-timeline-store.path")), + true); } @Test(timeout=90000) @@ -171,7 +187,27 @@ public class TestDistributedShell { t.join(); LOG.info("Client run completed. Result=" + result); Assert.assertTrue(result.get()); - + + TimelineEntities entitiesAttempts = yarnCluster + .getApplicationHistoryServer() + .getTimelineStore() + .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(), + null, null, null, null, null, null); + Assert.assertNotNull(entitiesAttempts); + Assert.assertEquals(1, entitiesAttempts.getEntities().size()); + Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents() + .size()); + Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType() + .toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString()); + TimelineEntities entities = yarnCluster + .getApplicationHistoryServer() + .getTimelineStore() + .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null, + null, null, null, null, null); + Assert.assertNotNull(entities); + Assert.assertEquals(2, entities.getEntities().size()); + Assert.assertEquals(entities.getEntities().get(0).getEntityType() + .toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString()); } @Test(timeout=90000) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java index 89e13f487df..731ae14319d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java @@ -169,5 +169,12 @@ public class ApplicationHistoryServer extends CompositeService { throw new YarnRuntimeException(msg, e); } } - + /** + * @return ApplicationTimelineStore + */ + @Private + @VisibleForTesting + public TimelineStore getTimelineStore() { + return timelineStore; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index ff2e995ae5f..1576d064a49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; @@ -100,6 +101,9 @@ public class MiniYARNCluster extends CompositeService { private ResourceManager[] resourceManagers; private String[] rmIds; + private ApplicationHistoryServer appHistoryServer; + private ApplicationHistoryServerWrapper appHistoryServerWrapper; + private boolean useFixedPorts; private boolean useRpc = false; private int failoverTimeout; @@ -241,6 +245,8 @@ public class MiniYARNCluster extends CompositeService { addService(new NodeManagerWrapper(index)); } + addService(new ApplicationHistoryServerWrapper()); + super.serviceInit( conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); } @@ -649,4 +655,67 @@ public class MiniYARNCluster extends CompositeService { } return false; } + + private class ApplicationHistoryServerWrapper extends AbstractService { + public ApplicationHistoryServerWrapper() { + super(ApplicationHistoryServerWrapper.class.getName()); + } + + @Override + protected synchronized void serviceInit(Configuration conf) + throws Exception { + if (!conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, + YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) { + conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS); + conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); + } + appHistoryServer = new ApplicationHistoryServer(); + appHistoryServer.init(conf); + super.serviceInit(conf); + } + + @Override + protected synchronized void serviceStart() throws Exception { + try { + new Thread() { + public void run() { + appHistoryServer.start(); + }; + }.start(); + int waitCount = 0; + while (appHistoryServer.getServiceState() == STATE.INITED + && waitCount++ < 60) { + LOG.info("Waiting for Timeline Server to start..."); + Thread.sleep(1500); + } + if (appHistoryServer.getServiceState() != STATE.STARTED) { + // AHS could have failed. + throw new IOException( + "ApplicationHistoryServer failed to start. Final state is " + + appHistoryServer.getServiceState()); + } + super.serviceStart(); + } catch (Throwable t) { + throw new YarnRuntimeException(t); + } + LOG.info("MiniYARN ApplicationHistoryServer address: " + + getConfig().get(YarnConfiguration.TIMELINE_SERVICE_ADDRESS)); + LOG.info("MiniYARN ApplicationHistoryServer web address: " + + getConfig().get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS)); + } + + @Override + protected synchronized void serviceStop() throws Exception { + if (appHistoryServer != null) { + appHistoryServer.stop(); + } + super.serviceStop(); + } + } + + public ApplicationHistoryServer getApplicationHistoryServer() { + return this.appHistoryServer; + } }