From 649666e118a7cf92b676eaa56a8be318176c443e Mon Sep 17 00:00:00 2001 From: Sunil G Date: Thu, 6 Jun 2019 14:51:55 +0530 Subject: [PATCH] YARN-9573. DistributedShell cannot specify LogAggregationContext. Contributed by Adam Antal. --- .../applications/distributedshell/Client.java | 46 +++++++++++++------ .../TestDistributedShell.java | 27 +++++++++++ 2 files changed, 60 insertions(+), 13 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 4bd57dd27f6..ff0769384c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; @@ -203,7 +204,9 @@ public class Client { private String nodeAttributeSpec = ""; // log4j.properties file // if available, add to local resources and set into classpath - private String log4jPropFile = ""; + private String log4jPropFile = ""; + // rolling + private String rollingFilesPattern = ""; // Start time for client private final long clientStartTime = System.currentTimeMillis(); @@ -280,7 +283,7 @@ public class Client { } if (result) { LOG.info("Application completed successfully"); - System.exit(0); + System.exit(0); } LOG.error("Application failed to complete successfully"); System.exit(2); @@ -344,6 +347,8 @@ public class Client { opts.addOption("enforce_execution_type", false, "Flag to indicate whether to enforce execution type of containers"); opts.addOption("log_properties", true, "log4j.properties file"); + opts.addOption("rolling_log_pattern", true, + "pattern for files that should be aggregated in a rolling fashion"); opts.addOption("keep_containers_across_application_attempts", false, "Flag to indicate whether to keep containers across application " + "attempts." @@ -443,6 +448,10 @@ public class Client { } } + if (cliParser.hasOption("rolling_log_pattern")) { + rollingFilesPattern = cliParser.getOptionValue("rolling_log_pattern"); + } + if (cliParser.hasOption("help")) { printUsage(); return false; @@ -488,7 +497,7 @@ public class Client { if (!cliParser.hasOption("jar")) { throw new IllegalArgumentException("No jar file specified for application master"); - } + } appMasterJar = cliParser.getOptionValue("jar"); @@ -689,16 +698,16 @@ public class Client { + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity() + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity() + ", queueApplicationCount=" + queueInfo.getApplications().size() - + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); + + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); List listAclInfo = yarnClient.getQueueAclsInfo(); for (QueueUserACLInfo aclInfo : listAclInfo) { for (QueueACL userAcl : aclInfo.getUserAcls()) { LOG.info("User ACL Info for Queue" - + ", queueName=" + aclInfo.getQueueName() + + ", queueName=" + aclInfo.getQueueName() + ", userAcl=" + userAcl.name()); } - } + } if (domainId != null && domainId.length() > 0 && toCreateDomain) { prepareTimelineDomain(); @@ -795,7 +804,7 @@ public class Client { // set local resources for the application master // local files or archives as needed - // In this scenario, the jar file for the application master is part of the local resources + // In this scenario, the jar file for the application master is part of the local resources Map localResources = new HashMap(); LOG.info("Copy App Master jar from local filesystem and add to local environment"); @@ -851,7 +860,7 @@ public class Client { // To do this, we need to first copy into the filesystem that is visible // to the yarn framework. // We do not need to set this as a local resource for the application - // master as the application master does not need it. + // master as the application master does not need it. String hdfsShellScriptLocation = ""; long hdfsShellScriptLen = 0; long hdfsShellScriptTimestamp = 0; @@ -897,7 +906,7 @@ public class Client { env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId); } - // Add AppMaster.jar location to classpath + // Add AppMaster.jar location to classpath // At some point we should not be required to add // the hadoop specific classpaths to the env. // It should be provided out of the box. @@ -1001,7 +1010,7 @@ public class Client { LOG.info("Completed setting up app master command " + command.toString()); List commands = new ArrayList(); - commands.add(command.toString()); + commands.add(command.toString()); // Set up the container launch context for the application master ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( @@ -1062,6 +1071,8 @@ public class Client { // Set the queue to which this application is to be submitted in the RM appContext.setQueue(amQueue); + specifyLogAggregationContext(appContext); + // Submit the application to the applications manager // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); // Ignore the response as either a valid response object is returned on success @@ -1079,6 +1090,15 @@ public class Client { } + @VisibleForTesting + void specifyLogAggregationContext(ApplicationSubmissionContext appContext) { + if (!rollingFilesPattern.isEmpty()) { + LogAggregationContext logAggregationContext = LogAggregationContext + .newInstance(null, null, rollingFilesPattern, ""); + appContext.setLogAggregationContext(logAggregationContext); + } + } + /** * Monitor the submitted application for completion. * Kill application if time expires. @@ -1127,9 +1147,9 @@ public class Client { + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop"); return false; - } + } } - else if (YarnApplicationState.KILLED == state + else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) { LOG.info("Application did not finish." + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() @@ -1163,7 +1183,7 @@ public class Client { // Response can be ignored as it is non-null on success or // throws an exception in case of failures - yarnClient.killApplication(appId); + yarnClient.killApplication(appId); } private void addToLocalResources(FileSystem fs, String fileSrcPath, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 025e543881e..ba7bf7a560d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.applications.distributedshell; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -62,12 +63,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; @@ -97,6 +100,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineW import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; +import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.After; import org.junit.Assert; @@ -958,6 +962,29 @@ public class TestDistributedShell { Assert.assertTrue(LOG_AM.isDebugEnabled()); } + @Test + public void testSpecifyingLogAggregationContext() throws Exception { + String regex = ".*(foo|bar)\\d"; + String[] args = { + "--jar", + APPMASTER_JAR, + "--shell_command", + "echo", + "--rolling_log_pattern", + regex + }; + final Client client = + new Client(new Configuration(yarnCluster.getConfig())); + Assert.assertTrue(client.init(args)); + + ApplicationSubmissionContext context = + Records.newRecord(ApplicationSubmissionContext.class); + client.specifyLogAggregationContext(context); + LogAggregationContext logContext = context.getLogAggregationContext(); + assertEquals(logContext.getRolledLogsIncludePattern(), regex); + assertTrue(logContext.getRolledLogsExcludePattern().isEmpty()); + } + public void testDSShellWithCommands() throws Exception { String[] args = {