YARN-3125. Made the distributed shell use timeline service next gen and add an integration test for it. Contributed by Junping Du and Li Lu.

(cherry picked from commit bf08f7f0ed4900ce52f98137297dd1a47ba2a536)
This commit is contained in:
Zhijie Shen 2015-02-27 08:46:42 -08:00 committed by Sangjin Lee
parent 9d57c9c015
commit d45ff878c4
5 changed files with 245 additions and 19 deletions

View File

@ -218,6 +218,8 @@ public class ApplicationMaster {
private int appMasterRpcPort = -1;
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
private boolean newTimelineService = false;
// App Master configuration
// No. of containers to run shell command on
@ -401,7 +403,8 @@ public class ApplicationMaster {
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
opts.addOption("debug", false, "Dump out debug information");
opts.addOption("timeline_service_version", true,
"Version for timeline service");
opts.addOption("help", false, "Print usage");
CommandLine cliParser = new GnuParser().parse(opts, args);
@ -552,6 +555,30 @@ public class ApplicationMaster {
cliParser.getOptionValue("container_max_retries", "0"));
containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
"container_retry_interval", "0"));
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
if (cliParser.hasOption("timeline_service_version")) {
String timelineServiceVersion =
cliParser.getOptionValue("timeline_service_version", "v1");
if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) {
newTimelineService = false;
} else if (timelineServiceVersion.trim().equalsIgnoreCase("v2")) {
newTimelineService = true;
} else {
throw new IllegalArgumentException(
"timeline_service_version is not set properly, should be 'v1' or 'v2'");
}
}
} else {
timelineClient = null;
LOG.warn("Timeline service is not enabled");
if (cliParser.hasOption("timeline_service_version")) {
throw new IllegalArgumentException(
"Timeline service is not enabled");
}
}
return true;
}
@ -599,7 +626,6 @@ public class ApplicationMaster {
UserGroupInformation.createRemoteUser(appSubmitterUserName);
appSubmitterUgi.addCredentials(credentials);
AMRMClientAsync.AbstractCallbackHandler allocListener =
new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
@ -613,8 +639,14 @@ public class ApplicationMaster {
startTimelineClient(conf);
if(timelineClient != null) {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
if (newTimelineService) {
publishApplicationAttemptEventOnNewTimelineService(timelineClient,
appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId,
appSubmitterUgi);
} else {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
}
}
// Setup local RPC Server to accept status requests directly from clients
@ -717,9 +749,15 @@ public class ApplicationMaster {
} catch (InterruptedException ex) {}
}
if(timelineClient != null) {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
if (timelineClient != null) {
if (newTimelineService) {
publishApplicationAttemptEventOnNewTimelineService(timelineClient,
appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId,
appSubmitterUgi);
} else {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
}
}
// Join all launched threads
@ -825,8 +863,13 @@ public class ApplicationMaster {
+ containerStatus.getContainerId());
}
if(timelineClient != null) {
publishContainerEndEvent(
timelineClient, containerStatus, domainId, appSubmitterUgi);
if (newTimelineService) {
publishContainerEndEventOnNewTimelineService(
timelineClient, containerStatus, domainId, appSubmitterUgi);
} else {
publishContainerEndEvent(
timelineClient, containerStatus, domainId, appSubmitterUgi);
}
}
}
@ -952,6 +995,16 @@ public class ApplicationMaster {
applicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
if (applicationMaster.newTimelineService) {
ApplicationMaster.publishContainerStartEventOnNewTimelineService(
applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
} else {
applicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
}
}
}
@ -1306,4 +1359,102 @@ public class ApplicationMaster {
shellId);
return new Thread(runnableLaunchContainer);
}
private static void publishContainerStartEventOnNewTimelineService(
final TimelineClient timelineClient, Container container, String domainId,
UserGroupInformation ugi) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
//entity.setDomainId(domainId);
entity.addInfo("user", ugi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setId(DSEvent.DS_CONTAINER_START.toString());
event.addInfo("Node", container.getNodeId().toString());
event.addInfo("Resources", container.getResource().toString());
entity.addEvent(event);
try {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntities(entity);
return null;
}
});
} catch (Exception e) {
LOG.error("Container start event could not be published for "
+ container.getId().toString(),
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}
private static void publishContainerEndEventOnNewTimelineService(
final TimelineClient timelineClient, ContainerStatus container,
String domainId, UserGroupInformation ugi) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getContainerId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
//entity.setDomainId(domainId);
entity.addInfo("user", ugi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setId(DSEvent.DS_CONTAINER_END.toString());
event.addInfo("State", container.getState().name());
event.addInfo("Exit Status", container.getExitStatus());
entity.addEvent(event);
try {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntities(entity);
return null;
}
});
} catch (Exception e) {
LOG.error("Container end event could not be published for "
+ container.getContainerId().toString(),
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}
private static void publishApplicationAttemptEventOnNewTimelineService(
final TimelineClient timelineClient, String appAttemptId,
DSEvent appEvent, String domainId, UserGroupInformation ugi) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(appAttemptId);
entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
//entity.setDomainId(domainId);
entity.addInfo("user", ugi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setId(appEvent.toString());
event.setTimestamp(System.currentTimeMillis());
entity.addEvent(event);
try {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntities(entity);
return null;
}
});
} catch (Exception e) {
LOG.error("App Attempt "
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
+ " event could not be published for "
+ appAttemptId.toString(),
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}
}

View File

@ -189,6 +189,8 @@ public class Client {
// Command line options
private Options opts;
private String timelineServiceVersion;
private static final String shellCommandPath = "shellCommands";
private static final String shellArgsPath = "shellArgs";
@ -264,6 +266,7 @@ public class Client {
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
opts.addOption("log_properties", true, "log4j.properties file");
opts.addOption("timeline_service_version", true, "Version for timeline service");
opts.addOption("keep_containers_across_application_attempts", false,
"Flag to indicate whether to keep containers across application attempts." +
" If the flag is true, running containers will not be killed when" +
@ -370,6 +373,16 @@ public class Client {
throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
+ " Specified virtual cores=" + amVCores);
}
if (cliParser.hasOption("timeline_service_version")) {
timelineServiceVersion =
cliParser.getOptionValue("timeline_service_version", "v1");
if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") ||
timelineServiceVersion.trim().equalsIgnoreCase("v2"))) {
throw new IllegalArgumentException(
"timeline_service_version is not set properly, should be 'v1' or 'v2'");
}
}
if (!cliParser.hasOption("jar")) {
throw new IllegalArgumentException("No jar file specified for application master");
@ -667,13 +680,16 @@ public class Client {
for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
}
}
if (debugFlag) {
vargs.add("--debug");
}
vargs.addAll(containerRetryOptions);
if (timelineServiceVersion != null) {
vargs.add("--timeline_service_version " + timelineServiceVersion);
}
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
@ -683,7 +699,7 @@ public class Client {
command.append(str).append(" ");
}
LOG.info("Completed setting up app master command " + command.toString());
LOG.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>();
commands.add(command.toString());

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -85,8 +86,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import com.sun.jersey.api.client.ClientHandlerException;
public class TestDistributedShell {
private static final Log LOG =
@ -99,6 +98,7 @@ public class TestDistributedShell {
protected YarnConfiguration conf = null;
private static final int NUM_NMS = 1;
private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_aggregator";
protected final static String APPMASTER_JAR =
JarFinder.getJar(ApplicationMaster.class);
@ -122,11 +122,16 @@ public class TestDistributedShell {
throws Exception {
LOG.info("Starting up YARN cluster");
conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
conf.set("yarn.log.dir", "target");
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
// mark if we need to launch the v1 timeline server
boolean enableATSServer = true;
// disable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
conf.set("mapreduce.jobhistory.address",
@ -148,6 +153,13 @@ public class TestDistributedShell {
PluginStoreTestUtils.prepareConfiguration(conf, hdfsCluster);
conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
DistributedShellTimelinePlugin.class.getName());
} else if (timelineVersion == 2.0f) {
// disable v1 timeline server since we no longer have a server here
enableATSServer = false;
// enable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
+ ".class", PerNodeAggregatorServer.class.getName());
} else {
Assert.fail("Wrong timeline version number: " + timelineVersion);
}
@ -155,7 +167,7 @@ public class TestDistributedShell {
if (yarnCluster == null) {
yarnCluster =
new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
numNodeManager, 1, 1);
numNodeManager, 1, 1, enableATSServer);
yarnCluster.init(conf);
yarnCluster.start();
@ -242,6 +254,12 @@ public class TestDistributedShell {
testDSShell(true);
}
@Test(timeout=90000)
@TimelineVersion(2.0f)
public void testDSShellWithoutDomainV2() throws Exception {
testDSShell(false);
}
public void testDSShell(boolean haveDomain) throws Exception {
String[] args = {
"--jar",
@ -269,9 +287,17 @@ public class TestDistributedShell {
"writer_user writer_group",
"--create"
};
List<String> argsList = new ArrayList<String>(Arrays.asList(args));
argsList.addAll(Arrays.asList(domainArgs));
args = argsList.toArray(new String[argsList.size()]);
args = mergeArgs(args, domainArgs);
}
boolean isTestingTimelineV2 = false;
if (timelineVersionWatcher.getTimelineVersion() == 2.0f) {
String[] timelineArgs = {
"--timeline_service_version",
"v2"
};
isTestingTimelineV2 = true;
args = mergeArgs(args, timelineArgs);
LOG.info("Setup: Using timeline v2!");
}
LOG.info("Initializing DS Client");
@ -344,6 +370,15 @@ public class TestDistributedShell {
}
TimelineDomain domain = null;
if (!isTestingTimelineV2) {
checkTimelineV1(haveDomain);
} else {
checkTimelineV2(haveDomain);
}
}
private void checkTimelineV1(boolean haveDomain) throws Exception {
TimelineDomain domain = null;
if (haveDomain) {
domain = yarnCluster.getApplicationHistoryServer()
.getTimelineStore().getDomain("TEST_DOMAIN");
@ -394,6 +429,24 @@ public class TestDistributedShell {
}
}
private void checkTimelineV2(boolean haveDomain) {
// TODO check timeline V2 here after we have a storage layer
}
/**
* Utility function to merge two String arrays to form a new String array for
* our argumemts.
*
* @param args
* @param newArgs
* @return a String array consists of {args, newArgs}
*/
private String[] mergeArgs(String[] args, String[] newArgs) {
List<String> argsList = new ArrayList<String>(Arrays.asList(args));
argsList.addAll(Arrays.asList(newArgs));
return argsList.toArray(new String[argsList.size()]);
}
/*
* NetUtils.getHostname() returns a string in the form "hostname/ip".
* Sometimes the hostname we get is the FQDN and sometimes the short name. In

View File

@ -72,6 +72,12 @@ public class BaseAggregatorService extends CompositeService {
*/
public void postEntities(TimelineEntities entities,
UserGroupInformation callerUgi) {
// Add this output temporarily for our prototype
// TODO remove this after we have an actual implementation
LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE");
LOG.info("postEntities(entities=" + entities + ", callerUgi=" +
callerUgi + ")");
// TODO implement
if (LOG.isDebugEnabled()) {
LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +

View File

@ -118,8 +118,8 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
.setConf(conf)
.addEndpoint(URI.create("http://" + bindAddress));
timelineRestServer = builder.build();
// TODO: replace this by an authentification filter in future.
HashMap<String, String> options = new HashMap<String, String>();
// TODO: replace this by an authentication filter in future.
HashMap<String, String> options = new HashMap<>();
String username = conf.get(HADOOP_HTTP_STATIC_USER,
DEFAULT_HADOOP_HTTP_STATIC_USER);
options.put(HADOOP_HTTP_STATIC_USER, username);