YARN-9573. DistributedShell cannot specify LogAggregationContext. Contributed by Adam Antal.
This commit is contained in:
parent
9c3806cf1b
commit
d590745046
|
@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
|
@ -93,6 +94,7 @@ import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
|||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -196,7 +198,9 @@ public class Client {
|
|||
private String placementSpec = "";
|
||||
// log4j.properties file
|
||||
// if available, add to local resources and set into classpath
|
||||
private String log4jPropFile = "";
|
||||
private String log4jPropFile = "";
|
||||
// rolling
|
||||
private String rollingFilesPattern = "";
|
||||
|
||||
// Start time for client
|
||||
private final long clientStartTime = System.currentTimeMillis();
|
||||
|
@ -271,7 +275,7 @@ public class Client {
|
|||
}
|
||||
if (result) {
|
||||
LOG.info("Application completed successfully");
|
||||
System.exit(0);
|
||||
System.exit(0);
|
||||
}
|
||||
LOG.error("Application failed to complete successfully");
|
||||
System.exit(2);
|
||||
|
@ -335,6 +339,8 @@ public class Client {
|
|||
opts.addOption("enforce_execution_type", false,
|
||||
"Flag to indicate whether to enforce execution type of containers");
|
||||
opts.addOption("log_properties", true, "log4j.properties file");
|
||||
opts.addOption("rolling_log_pattern", true,
|
||||
"pattern for files that should be aggregated in a rolling fashion");
|
||||
opts.addOption("keep_containers_across_application_attempts", false,
|
||||
"Flag to indicate whether to keep containers across application "
|
||||
+ "attempts."
|
||||
|
@ -432,6 +438,10 @@ public class Client {
|
|||
}
|
||||
}
|
||||
|
||||
if (cliParser.hasOption("rolling_log_pattern")) {
|
||||
rollingFilesPattern = cliParser.getOptionValue("rolling_log_pattern");
|
||||
}
|
||||
|
||||
if (cliParser.hasOption("help")) {
|
||||
printUsage();
|
||||
return false;
|
||||
|
@ -476,7 +486,7 @@ public class Client {
|
|||
|
||||
if (!cliParser.hasOption("jar")) {
|
||||
throw new IllegalArgumentException("No jar file specified for application master");
|
||||
}
|
||||
}
|
||||
|
||||
appMasterJar = cliParser.getOptionValue("jar");
|
||||
|
||||
|
@ -666,16 +676,16 @@ public class Client {
|
|||
+ ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
|
||||
+ ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
|
||||
+ ", queueApplicationCount=" + queueInfo.getApplications().size()
|
||||
+ ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
|
||||
+ ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
|
||||
|
||||
List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
|
||||
for (QueueUserACLInfo aclInfo : listAclInfo) {
|
||||
for (QueueACL userAcl : aclInfo.getUserAcls()) {
|
||||
LOG.info("User ACL Info for Queue"
|
||||
+ ", queueName=" + aclInfo.getQueueName()
|
||||
+ ", queueName=" + aclInfo.getQueueName()
|
||||
+ ", userAcl=" + userAcl.name());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (domainId != null && domainId.length() > 0 && toCreateDomain) {
|
||||
prepareTimelineDomain();
|
||||
|
@ -772,7 +782,7 @@ public class Client {
|
|||
|
||||
// set local resources for the application master
|
||||
// local files or archives as needed
|
||||
// In this scenario, the jar file for the application master is part of the local resources
|
||||
// In this scenario, the jar file for the application master is part of the local resources
|
||||
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
|
||||
|
||||
LOG.info("Copy App Master jar from local filesystem and add to local environment");
|
||||
|
@ -793,7 +803,7 @@ public class Client {
|
|||
// To do this, we need to first copy into the filesystem that is visible
|
||||
// to the yarn framework.
|
||||
// We do not need to set this as a local resource for the application
|
||||
// master as the application master does not need it.
|
||||
// master as the application master does not need it.
|
||||
String hdfsShellScriptLocation = "";
|
||||
long hdfsShellScriptLen = 0;
|
||||
long hdfsShellScriptTimestamp = 0;
|
||||
|
@ -837,7 +847,7 @@ public class Client {
|
|||
env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
|
||||
}
|
||||
|
||||
// Add AppMaster.jar location to classpath
|
||||
// Add AppMaster.jar location to classpath
|
||||
// At some point we should not be required to add
|
||||
// the hadoop specific classpaths to the env.
|
||||
// It should be provided out of the box.
|
||||
|
@ -935,7 +945,7 @@ public class Client {
|
|||
|
||||
LOG.info("Completed setting up app master command " + command.toString());
|
||||
List<String> commands = new ArrayList<String>();
|
||||
commands.add(command.toString());
|
||||
commands.add(command.toString());
|
||||
|
||||
// Set up the container launch context for the application master
|
||||
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
|
||||
|
@ -996,6 +1006,8 @@ public class Client {
|
|||
// Set the queue to which this application is to be submitted in the RM
|
||||
appContext.setQueue(amQueue);
|
||||
|
||||
specifyLogAggregationContext(appContext);
|
||||
|
||||
// Submit the application to the applications manager
|
||||
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
|
||||
// Ignore the response as either a valid response object is returned on success
|
||||
|
@ -1013,6 +1025,15 @@ public class Client {
|
|||
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void specifyLogAggregationContext(ApplicationSubmissionContext appContext) {
|
||||
if (!rollingFilesPattern.isEmpty()) {
|
||||
LogAggregationContext logAggregationContext = LogAggregationContext
|
||||
.newInstance(null, null, rollingFilesPattern, "");
|
||||
appContext.setLogAggregationContext(logAggregationContext);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Monitor the submitted application for completion.
|
||||
* Kill application if time expires.
|
||||
|
@ -1061,9 +1082,9 @@ public class Client {
|
|||
+ " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
|
||||
+ ". Breaking monitoring loop");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (YarnApplicationState.KILLED == state
|
||||
else if (YarnApplicationState.KILLED == state
|
||||
|| YarnApplicationState.FAILED == state) {
|
||||
LOG.info("Application did not finish."
|
||||
+ " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
|
||||
|
@ -1097,7 +1118,7 @@ public class Client {
|
|||
|
||||
// Response can be ignored as it is non-null on success or
|
||||
// throws an exception in case of failures
|
||||
yarnClient.killApplication(appId);
|
||||
yarnClient.killApplication(appId);
|
||||
}
|
||||
|
||||
private void addToLocalResources(FileSystem fs, String fileSrcPath,
|
||||
|
|
|
@ -17,7 +17,8 @@
|
|||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.applications.distributedshell;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -60,12 +61,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
|
@ -95,6 +98,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineW
|
|||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
|
||||
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -956,6 +960,29 @@ public class TestDistributedShell {
|
|||
Assert.assertTrue(LOG_AM.isDebugEnabled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpecifyingLogAggregationContext() throws Exception {
|
||||
String regex = ".*(foo|bar)\\d";
|
||||
String[] args = {
|
||||
"--jar",
|
||||
APPMASTER_JAR,
|
||||
"--shell_command",
|
||||
"echo",
|
||||
"--rolling_log_pattern",
|
||||
regex
|
||||
};
|
||||
final Client client =
|
||||
new Client(new Configuration(yarnCluster.getConfig()));
|
||||
Assert.assertTrue(client.init(args));
|
||||
|
||||
ApplicationSubmissionContext context =
|
||||
Records.newRecord(ApplicationSubmissionContext.class);
|
||||
client.specifyLogAggregationContext(context);
|
||||
LogAggregationContext logContext = context.getLogAggregationContext();
|
||||
assertEquals(logContext.getRolledLogsIncludePattern(), regex);
|
||||
assertTrue(logContext.getRolledLogsExcludePattern().isEmpty());
|
||||
}
|
||||
|
||||
public void testDSShellWithCommands() throws Exception {
|
||||
|
||||
String[] args = {
|
||||
|
|
Loading…
Reference in New Issue