From d5907450463ab2a3436ff4f28918d6253159cd76 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Thu, 11 Jul 2019 19:54:31 +0200 Subject: [PATCH] YARN-9573. DistributedShell cannot specify LogAggregationContext. Contributed by Adam Antal. --- .../applications/distributedshell/Client.java | 47 ++++++++++++++----- .../TestDistributedShell.java | 29 +++++++++++- 2 files changed, 62 insertions(+), 14 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 9c1d8fc21c2..1d225d294b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; @@ -93,6 +94,7 @@ import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -196,7 +198,9 @@ public class Client { private String placementSpec = ""; // log4j.properties file // if available, add to local resources and set into classpath - private String log4jPropFile = ""; + private String log4jPropFile = ""; + // rolling + private String rollingFilesPattern = ""; // Start time for client private final long clientStartTime = System.currentTimeMillis(); @@ -271,7 +275,7 @@ public class Client { } if (result) { LOG.info("Application completed successfully"); - System.exit(0); + System.exit(0); } LOG.error("Application failed to complete successfully"); System.exit(2); @@ -335,6 +339,8 @@ public class Client { opts.addOption("enforce_execution_type", false, "Flag to indicate whether to enforce execution type of containers"); opts.addOption("log_properties", true, "log4j.properties file"); + opts.addOption("rolling_log_pattern", true, + "pattern for files that should be aggregated in a rolling fashion"); opts.addOption("keep_containers_across_application_attempts", false, "Flag to indicate whether to keep containers across application " + "attempts." @@ -432,6 +438,10 @@ public class Client { } } + if (cliParser.hasOption("rolling_log_pattern")) { + rollingFilesPattern = cliParser.getOptionValue("rolling_log_pattern"); + } + if (cliParser.hasOption("help")) { printUsage(); return false; @@ -476,7 +486,7 @@ public class Client { if (!cliParser.hasOption("jar")) { throw new IllegalArgumentException("No jar file specified for application master"); - } + } appMasterJar = cliParser.getOptionValue("jar"); @@ -666,16 +676,16 @@ public class Client { + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity() + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity() + ", queueApplicationCount=" + queueInfo.getApplications().size() - + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); + + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); List listAclInfo = yarnClient.getQueueAclsInfo(); for (QueueUserACLInfo aclInfo : listAclInfo) { for (QueueACL userAcl : aclInfo.getUserAcls()) { LOG.info("User ACL Info for Queue" - + ", queueName=" + aclInfo.getQueueName() + + ", queueName=" + aclInfo.getQueueName() + ", userAcl=" + userAcl.name()); } - } + } if (domainId != null && domainId.length() > 0 && toCreateDomain) { prepareTimelineDomain(); @@ -772,7 +782,7 @@ public class Client { // set local resources for the application master // local files or archives as needed - // In this scenario, the jar file for the application master is part of the local resources + // In this scenario, the jar file for the application master is part of the local resources Map localResources = new HashMap(); LOG.info("Copy App Master jar from local filesystem and add to local environment"); @@ -793,7 +803,7 @@ public class Client { // To do this, we need to first copy into the filesystem that is visible // to the yarn framework. // We do not need to set this as a local resource for the application - // master as the application master does not need it. + // master as the application master does not need it. String hdfsShellScriptLocation = ""; long hdfsShellScriptLen = 0; long hdfsShellScriptTimestamp = 0; @@ -837,7 +847,7 @@ public class Client { env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId); } - // Add AppMaster.jar location to classpath + // Add AppMaster.jar location to classpath // At some point we should not be required to add // the hadoop specific classpaths to the env. // It should be provided out of the box. @@ -935,7 +945,7 @@ public class Client { LOG.info("Completed setting up app master command " + command.toString()); List commands = new ArrayList(); - commands.add(command.toString()); + commands.add(command.toString()); // Set up the container launch context for the application master ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( @@ -996,6 +1006,8 @@ public class Client { // Set the queue to which this application is to be submitted in the RM appContext.setQueue(amQueue); + specifyLogAggregationContext(appContext); + // Submit the application to the applications manager // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); // Ignore the response as either a valid response object is returned on success @@ -1013,6 +1025,15 @@ public class Client { } + @VisibleForTesting + void specifyLogAggregationContext(ApplicationSubmissionContext appContext) { + if (!rollingFilesPattern.isEmpty()) { + LogAggregationContext logAggregationContext = LogAggregationContext + .newInstance(null, null, rollingFilesPattern, ""); + appContext.setLogAggregationContext(logAggregationContext); + } + } + /** * Monitor the submitted application for completion. * Kill application if time expires. @@ -1061,9 +1082,9 @@ public class Client { + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop"); return false; - } + } } - else if (YarnApplicationState.KILLED == state + else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) { LOG.info("Application did not finish." + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() @@ -1097,7 +1118,7 @@ public class Client { // Response can be ignored as it is non-null on success or // throws an exception in case of failures - yarnClient.killApplication(appId); + yarnClient.killApplication(appId); } private void addToLocalResources(FileSystem fs, String fileSrcPath, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 187d13bbd4e..b1358b3ee1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -17,7 +17,8 @@ */ package org.apache.hadoop.yarn.applications.distributedshell; - +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -60,12 +61,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; @@ -95,6 +98,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineW import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; +import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.After; import org.junit.Assert; @@ -956,6 +960,29 @@ public class TestDistributedShell { Assert.assertTrue(LOG_AM.isDebugEnabled()); } + @Test + public void testSpecifyingLogAggregationContext() throws Exception { + String regex = ".*(foo|bar)\\d"; + String[] args = { + "--jar", + APPMASTER_JAR, + "--shell_command", + "echo", + "--rolling_log_pattern", + regex + }; + final Client client = + new Client(new Configuration(yarnCluster.getConfig())); + Assert.assertTrue(client.init(args)); + + ApplicationSubmissionContext context = + Records.newRecord(ApplicationSubmissionContext.class); + client.specifyLogAggregationContext(context); + LogAggregationContext logContext = context.getLogAggregationContext(); + assertEquals(logContext.getRolledLogsIncludePattern(), regex); + assertTrue(logContext.getRolledLogsExcludePattern().isEmpty()); + } + public void testDSShellWithCommands() throws Exception { String[] args = {