YARN-9573. DistributedShell cannot specify LogAggregationContext. Contributed by Adam Antal.

This commit is contained in:
Sunil G 2019-06-06 14:51:55 +05:30
parent ec26c431f9
commit 649666e118
2 changed files with 60 additions and 13 deletions

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
@ -204,6 +205,8 @@ public class Client {
// log4j.properties file // log4j.properties file
// if available, add to local resources and set into classpath // if available, add to local resources and set into classpath
private String log4jPropFile = ""; private String log4jPropFile = "";
// rolling
private String rollingFilesPattern = "";
// Start time for client // Start time for client
private final long clientStartTime = System.currentTimeMillis(); private final long clientStartTime = System.currentTimeMillis();
@ -344,6 +347,8 @@ public class Client {
opts.addOption("enforce_execution_type", false, opts.addOption("enforce_execution_type", false,
"Flag to indicate whether to enforce execution type of containers"); "Flag to indicate whether to enforce execution type of containers");
opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("log_properties", true, "log4j.properties file");
opts.addOption("rolling_log_pattern", true,
"pattern for files that should be aggregated in a rolling fashion");
opts.addOption("keep_containers_across_application_attempts", false, opts.addOption("keep_containers_across_application_attempts", false,
"Flag to indicate whether to keep containers across application " "Flag to indicate whether to keep containers across application "
+ "attempts." + "attempts."
@ -443,6 +448,10 @@ public class Client {
} }
} }
if (cliParser.hasOption("rolling_log_pattern")) {
rollingFilesPattern = cliParser.getOptionValue("rolling_log_pattern");
}
if (cliParser.hasOption("help")) { if (cliParser.hasOption("help")) {
printUsage(); printUsage();
return false; return false;
@ -1062,6 +1071,8 @@ public class Client {
// Set the queue to which this application is to be submitted in the RM // Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue); appContext.setQueue(amQueue);
specifyLogAggregationContext(appContext);
// Submit the application to the applications manager // Submit the application to the applications manager
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
// Ignore the response as either a valid response object is returned on success // Ignore the response as either a valid response object is returned on success
@ -1079,6 +1090,15 @@ public class Client {
} }
@VisibleForTesting
void specifyLogAggregationContext(ApplicationSubmissionContext appContext) {
if (!rollingFilesPattern.isEmpty()) {
LogAggregationContext logAggregationContext = LogAggregationContext
.newInstance(null, null, rollingFilesPattern, "");
appContext.setLogAggregationContext(logAggregationContext);
}
}
/** /**
* Monitor the submitted application for completion. * Monitor the submitted application for completion.
* Kill application if time expires. * Kill application if time expires.

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.applications.distributedshell; package org.apache.hadoop.yarn.applications.distributedshell;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -62,12 +63,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
@ -97,6 +100,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineW
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -958,6 +962,29 @@ public class TestDistributedShell {
Assert.assertTrue(LOG_AM.isDebugEnabled()); Assert.assertTrue(LOG_AM.isDebugEnabled());
} }
@Test
public void testSpecifyingLogAggregationContext() throws Exception {
String regex = ".*(foo|bar)\\d";
String[] args = {
"--jar",
APPMASTER_JAR,
"--shell_command",
"echo",
"--rolling_log_pattern",
regex
};
final Client client =
new Client(new Configuration(yarnCluster.getConfig()));
Assert.assertTrue(client.init(args));
ApplicationSubmissionContext context =
Records.newRecord(ApplicationSubmissionContext.class);
client.specifyLogAggregationContext(context);
LogAggregationContext logContext = context.getLogAggregationContext();
assertEquals(logContext.getRolledLogsIncludePattern(), regex);
assertTrue(logContext.getRolledLogsExcludePattern().isEmpty());
}
public void testDSShellWithCommands() throws Exception { public void testDSShellWithCommands() throws Exception {
String[] args = { String[] args = {