YARN-4356. Ensure the timeline service v.2 is disabled cleanly and has no

impact when it's turned off. Contributed by Sangjin Lee.
This commit is contained in:
Li Lu 2015-12-11 11:17:34 -08:00 committed by Sangjin Lee
parent cf4666bb8c
commit 89e5c44f9e
44 changed files with 527 additions and 372 deletions

View File

@ -19,9 +19,6 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@ -31,7 +28,11 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -57,9 +58,9 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@ -81,11 +82,10 @@ import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.codehaus.jackson.node.ObjectNode;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The job history events get routed to this class. This class writes the Job
* history events to the DFS directly into a staging dir and then moved to a
@ -133,20 +133,17 @@ public class JobHistoryEventHandler extends AbstractService
protected static final Map<JobId, MetaInfo> fileMap =
Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
// For posting entities in new timeline service in a non-blocking way
// TODO YARN-3367 replace with event loop in TimelineClient.
private static ExecutorService threadPool =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
// should job completion be force when the AM shuts down?
protected volatile boolean forceJobCompletion = false;
protected TimelineClient timelineClient;
private boolean newTimelineServiceEnabled = false;
private boolean timelineServiceV2Enabled = false;
// For posting entities in new timeline service in a non-blocking way
// TODO YARN-3367 replace with event loop in TimelineClient.
private ExecutorService threadPool;
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
@ -276,22 +273,26 @@ public class JobHistoryEventHandler extends AbstractService
// configuration status: off, on_with_v1 or on_with_v2.
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
LOG.info("Emitting job history data to the timeline service is enabled");
if (YarnConfiguration.timelineServiceEnabled(conf)) {
timelineClient =
((MRAppMaster.RunningAppContext)context).getTimelineClient();
timelineClient.init(conf);
newTimelineServiceEnabled = conf.getBoolean(
MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? "v2" : "v1"));
LOG.info("Emitting job history data to the timeline server is enabled");
timelineServiceV2Enabled =
YarnConfiguration.timelineServiceV2Enabled(conf);
LOG.info("Timeline service is enabled; version: " +
YarnConfiguration.getTimelineServiceVersion(conf));
if (timelineServiceV2Enabled) {
// initialize the thread pool for v.2 timeline service
threadPool = createThreadPool();
}
} else {
LOG.info("Timeline service is not enabled");
}
} else {
LOG.info("Emitting job history data to the timeline server is not enabled");
LOG.info("Emitting job history data to the timeline server is not " +
"enabled");
}
// Flag for setting
@ -459,19 +460,27 @@ public class JobHistoryEventHandler extends AbstractService
if (timelineClient != null) {
timelineClient.stop();
}
shutdownAndAwaitTermination();
if (threadPool != null) {
shutdownAndAwaitTermination();
}
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
super.serviceStop();
}
// TODO remove threadPool after adding non-blocking call in TimelineClient
private static void shutdownAndAwaitTermination() {
private ExecutorService createThreadPool() {
return Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
}
private void shutdownAndAwaitTermination() {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
LOG.error("ThreadPool did not terminate");
LOG.error("ThreadPool did not terminate");
}
} catch (InterruptedException ie) {
threadPool.shutdownNow();
@ -633,7 +642,7 @@ public class JobHistoryEventHandler extends AbstractService
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
event.getJobID());
if (timelineClient != null) {
if (newTimelineServiceEnabled) {
if (timelineServiceV2Enabled) {
processEventForNewTimelineService(historyEvent, event.getJobID(),
event.getTimestamp());
} else {

View File

@ -1078,14 +1078,9 @@ public class MRAppMaster extends CompositeService {
this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor;
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)
&& conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
&& YarnConfiguration.timelineServiceEnabled(conf)) {
boolean newTimelineServiceEnabled = conf.getBoolean(
MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
if (newTimelineServiceEnabled) {
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
// create new version TimelineClient
timelineClient = TimelineClient.createTimelineClient(
appAttemptID.getApplicationId());

View File

@ -474,11 +474,6 @@ public interface MRJobConfig {
"mapreduce.job.emit-timeline-data";
public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
false;
public static final String MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
"mapreduce.job.new-timeline-service.enabled";
public static final boolean DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
false;
public static final String MR_PREFIX = "yarn.app.mapreduce.";

View File

@ -637,13 +637,6 @@
</description>
</property>
<property>
<name>mapreduce.job.new-timeline-service.enabled</name>
<value>false</value>
<description>Specifies if posting job and task events to new timeline service.
</description>
</property>
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>0</value>

View File

@ -166,11 +166,10 @@ public class TestMRTimelineEventHandling {
LOG.info("testMRNewTimelineServiceEventHandling start.");
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
// enable new timeline service
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
// enable new timeline serivce in MR side
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, true);
// enable aux-service based timeline collectors
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME

View File

@ -173,7 +173,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
boolean enableTimelineAuxService = false;
if (nmAuxServices != null) {
for (String nmAuxService: nmAuxServices) {
if (nmAuxService == TIMELINE_AUX_SERVICE_NAME) {
if (nmAuxService.equals(TIMELINE_AUX_SERVICE_NAME)) {
enableTimelineAuxService = true;
break;
}

View File

@ -83,6 +83,10 @@ public class YarnConfiguration extends Configuration {
new DeprecationDelta("yarn.client.max-nodemanagers-proxies",
NM_CLIENT_MAX_NM_PROXIES)
});
Configuration.addDeprecations(new DeprecationDelta[] {
new DeprecationDelta(RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
SYSTEM_METRICS_PUBLISHER_ENABLED)
});
}
//Configurations
@ -486,7 +490,8 @@ public class YarnConfiguration extends Configuration {
/**
* The setting that controls whether yarn system metrics is published on the
* timeline server or not by RM. This configuration setting is for ATS V1
* timeline server or not by RM. This configuration setting is for ATS V1.
* This is now deprecated in favor of SYSTEM_METRICS_PUBLISHER_ENABLED.
*/
public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = RM_PREFIX
+ "system-metrics-publisher.enabled";
@ -2781,13 +2786,52 @@ public class YarnConfiguration extends Configuration {
}
return clusterId;
}
public static boolean systemMetricsPublisherEnabled(Configuration conf) {
// helper methods for timeline service configuration
/**
* Returns whether the timeline service is enabled via configuration.
*
* @param conf the configuration
* @return whether the timeline service is enabled.
*/
public static boolean timelineServiceEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
&& conf.getBoolean(
YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
}
/**
* Returns the timeline service version. It does not check whether the
* timeline service itself is enabled.
*
* @param conf the configuration
* @return the timeline service version as a float.
*/
public static float getTimelineServiceVersion(Configuration conf) {
return conf.getFloat(TIMELINE_SERVICE_VERSION,
DEFAULT_TIMELINE_SERVICE_VERSION);
}
/**
* Returns whether the timeline service v.2 is enabled via configuration.
*
* @param conf the configuration
* @return whether the timeline service v.2 is enabled. V.2 refers to a
* version greater than equal to 2 but smaller than 3.
*/
public static boolean timelineServiceV2Enabled(Configuration conf) {
return timelineServiceEnabled(conf) &&
(int)getTimelineServiceVersion(conf) == 2;
}
/**
* Returns whether the system publisher is enabled.
*
* @param conf the configuration
* @return whether the system publisher is enabled.
*/
public static boolean systemMetricsPublisherEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
}
/* For debugging. mp configurations to system output as XML format. */

View File

@ -223,14 +223,11 @@ public class ApplicationMaster {
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
private boolean newTimelineService = false;
private boolean timelineServiceV2 = false;
// For posting entities in new timeline service in a non-blocking way
// TODO replace with event loop in TimelineClient.
private static ExecutorService threadPool =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
private ExecutorService threadPool;
// App Master configuration
// No. of containers to run shell command on
@ -331,8 +328,10 @@ public class ApplicationMaster {
}
appMaster.run();
result = appMaster.finish();
shutdownAndAwaitTermination();
if (appMaster.threadPool != null) {
appMaster.shutdownAndAwaitTermination();
}
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
LogManager.shutdown();
@ -346,16 +345,22 @@ public class ApplicationMaster {
System.exit(2);
}
}
//TODO remove threadPool after adding non-blocking call in TimelineClient
private static void shutdownAndAwaitTermination() {
private ExecutorService createThreadPool() {
return Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
}
private void shutdownAndAwaitTermination() {
threadPool.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
LOG.error("ThreadPool did not terminate");
LOG.error("ThreadPool did not terminate");
}
} catch (InterruptedException ie) {
threadPool.shutdownNow();
@ -433,8 +438,7 @@ public class ApplicationMaster {
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
opts.addOption("debug", false, "Dump out debug information");
opts.addOption("timeline_service_version", true,
"Version for timeline service");
opts.addOption("help", false, "Print usage");
CommandLine cliParser = new GnuParser().parse(opts, args);
@ -586,27 +590,15 @@ public class ApplicationMaster {
containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
"container_retry_interval", "0"));
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
if (cliParser.hasOption("timeline_service_version")) {
String timelineServiceVersion =
cliParser.getOptionValue("timeline_service_version", "v1");
if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) {
newTimelineService = false;
} else if (timelineServiceVersion.trim().equalsIgnoreCase("v2")) {
newTimelineService = true;
} else {
throw new IllegalArgumentException(
"timeline_service_version is not set properly, should be 'v1' or 'v2'");
}
if (YarnConfiguration.timelineServiceEnabled(conf)) {
timelineServiceV2 =
YarnConfiguration.timelineServiceV2Enabled(conf);
if (timelineServiceV2) {
threadPool = createThreadPool();
}
} else {
timelineClient = null;
LOG.warn("Timeline service is not enabled");
if (cliParser.hasOption("timeline_service_version")) {
throw new IllegalArgumentException(
"Timeline service is not enabled");
}
}
return true;
@ -668,16 +660,17 @@ public class ApplicationMaster {
nmClientAsync.start();
startTimelineClient(conf);
// need to bind timelineClient
amRMClient.registerTimelineClient(timelineClient);
if (timelineServiceV2) {
// need to bind timelineClient
amRMClient.registerTimelineClient(timelineClient);
}
if(timelineClient != null) {
if (newTimelineService) {
publishApplicationAttemptEventOnNewTimelineService(timelineClient,
appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId,
appSubmitterUgi);
if (timelineServiceV2) {
publishApplicationAttemptEventOnTimelineServiceV2(
DSEvent.DS_APP_ATTEMPT_START);
} else {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
}
}
@ -748,10 +741,9 @@ public class ApplicationMaster {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
if (YarnConfiguration.timelineServiceEnabled(conf)) {
// Creating the Timeline Client
if (newTimelineService) {
if (timelineServiceV2) {
timelineClient = TimelineClient.createTimelineClient(
appAttemptID.getApplicationId());
} else {
@ -787,10 +779,9 @@ public class ApplicationMaster {
}
if (timelineClient != null) {
if (newTimelineService) {
publishApplicationAttemptEventOnNewTimelineService(timelineClient,
appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId,
appSubmitterUgi);
if (timelineServiceV2) {
publishApplicationAttemptEventOnTimelineServiceV2(
DSEvent.DS_APP_ATTEMPT_END);
} else {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
@ -900,12 +891,11 @@ public class ApplicationMaster {
+ containerStatus.getContainerId());
}
if(timelineClient != null) {
if (newTimelineService) {
publishContainerEndEventOnNewTimelineService(
timelineClient, containerStatus, domainId, appSubmitterUgi);
if (timelineServiceV2) {
publishContainerEndEventOnTimelineServiceV2(containerStatus);
} else {
publishContainerEndEvent(
timelineClient, containerStatus, domainId, appSubmitterUgi);
timelineClient, containerStatus, domainId, appSubmitterUgi);
}
}
}
@ -1033,14 +1023,13 @@ public class ApplicationMaster {
applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
if (applicationMaster.newTimelineService) {
ApplicationMaster.publishContainerStartEventOnNewTimelineService(
applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
if (applicationMaster.timelineServiceV2) {
applicationMaster.publishContainerStartEventOnTimelineServiceV2(
container);
} else {
applicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
}
}
}
@ -1349,7 +1338,7 @@ public class ApplicationMaster {
LOG.error("App Attempt "
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
+ " event could not be published for "
+ appAttemptId.toString(), e);
+ appAttemptID, e);
}
}
@ -1397,27 +1386,24 @@ public class ApplicationMaster {
return new Thread(runnableLaunchContainer);
}
private static void publishContainerStartEventOnNewTimelineService(
final TimelineClient timelineClient, final Container container,
final String domainId, final UserGroupInformation ugi) {
private void publishContainerStartEventOnTimelineServiceV2(
final Container container) {
Runnable publishWrapper = new Runnable() {
public void run() {
publishContainerStartEventOnNewTimelineServiceBase(timelineClient,
container, domainId, ugi);
publishContainerStartEventOnTimelineServiceV2Base(container);
}
};
threadPool.execute(publishWrapper);
}
private static void publishContainerStartEventOnNewTimelineServiceBase(
final TimelineClient timelineClient, Container container, String domainId,
UserGroupInformation ugi) {
private void publishContainerStartEventOnTimelineServiceV2Base(
Container container) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
//entity.setDomainId(domainId);
entity.addInfo("user", ugi.getShortUserName());
entity.addInfo("user", appSubmitterUgi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
@ -1428,7 +1414,7 @@ public class ApplicationMaster {
entity.addEvent(event);
try {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntities(entity);
@ -1442,27 +1428,24 @@ public class ApplicationMaster {
}
}
private static void publishContainerEndEventOnNewTimelineService(
final TimelineClient timelineClient, final ContainerStatus container,
final String domainId, final UserGroupInformation ugi) {
private void publishContainerEndEventOnTimelineServiceV2(
final ContainerStatus container) {
Runnable publishWrapper = new Runnable() {
public void run() {
publishContainerEndEventOnNewTimelineServiceBase(timelineClient,
container, domainId, ugi);
publishContainerEndEventOnTimelineServiceV2Base(container);
}
};
threadPool.execute(publishWrapper);
}
private static void publishContainerEndEventOnNewTimelineServiceBase(
final TimelineClient timelineClient, final ContainerStatus container,
final String domainId, final UserGroupInformation ugi) {
private void publishContainerEndEventOnTimelineServiceV2Base(
final ContainerStatus container) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getContainerId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
//entity.setDomainId(domainId);
entity.addInfo("user", ugi.getShortUserName());
entity.addInfo("user", appSubmitterUgi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
@ -1472,7 +1455,7 @@ public class ApplicationMaster {
entity.addEvent(event);
try {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntities(entity);
@ -1486,29 +1469,25 @@ public class ApplicationMaster {
}
}
private static void publishApplicationAttemptEventOnNewTimelineService(
final TimelineClient timelineClient, final String appAttemptId,
final DSEvent appEvent, final String domainId,
final UserGroupInformation ugi) {
private void publishApplicationAttemptEventOnTimelineServiceV2(
final DSEvent appEvent) {
Runnable publishWrapper = new Runnable() {
public void run() {
publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient,
appAttemptId, appEvent, domainId, ugi);
publishApplicationAttemptEventOnTimelineServiceV2Base(appEvent);
}
};
threadPool.execute(publishWrapper);
}
private static void publishApplicationAttemptEventOnNewTimelineServiceBase(
final TimelineClient timelineClient, String appAttemptId,
DSEvent appEvent, String domainId, UserGroupInformation ugi) {
private void publishApplicationAttemptEventOnTimelineServiceV2Base(
DSEvent appEvent) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(appAttemptId);
entity.setId(appAttemptID.toString());
entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
//entity.setDomainId(domainId);
entity.addInfo("user", ugi.getShortUserName());
entity.addInfo("user", appSubmitterUgi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setId(appEvent.toString());
@ -1516,7 +1495,7 @@ public class ApplicationMaster {
entity.addEvent(event);
try {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
timelineClient.putEntities(entity);
@ -1527,7 +1506,7 @@ public class ApplicationMaster {
LOG.error("App Attempt "
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
+ " event could not be published for "
+ appAttemptId.toString(),
+ appAttemptID,
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}

View File

@ -196,8 +196,6 @@ public class Client {
// Command line options
private Options opts;
private String timelineServiceVersion;
private static final String shellCommandPath = "shellCommands";
private static final String shellArgsPath = "shellArgs";
private static final String appMasterJarPath = "AppMaster.jar";
@ -273,7 +271,6 @@ public class Client {
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
opts.addOption("log_properties", true, "log4j.properties file");
opts.addOption("timeline_service_version", true, "Version for timeline service");
opts.addOption("keep_containers_across_application_attempts", false,
"Flag to indicate whether to keep containers across application attempts." +
" If the flag is true, running containers will not be killed when" +
@ -387,16 +384,6 @@ public class Client {
+ " Specified virtual cores=" + amVCores);
}
if (cliParser.hasOption("timeline_service_version")) {
timelineServiceVersion =
cliParser.getOptionValue("timeline_service_version", "v1");
if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") ||
timelineServiceVersion.trim().equalsIgnoreCase("v2"))) {
throw new IllegalArgumentException(
"timeline_service_version is not set properly, should be 'v1' or 'v2'");
}
}
if (!cliParser.hasOption("jar")) {
throw new IllegalArgumentException("No jar file specified for application master");
}
@ -726,9 +713,6 @@ public class Client {
vargs.addAll(containerRetryOptions);
if (timelineServiceVersion != null) {
vargs.add("--timeline_service_version " + timelineServiceVersion);
}
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

View File

@ -138,9 +138,10 @@ public class TestDistributedShell {
conf.set("yarn.log.dir", "target");
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
// mark if we need to launch the v1 timeline server
boolean enableATSServer = true;
// disable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
@ -157,7 +158,6 @@ public class TestDistributedShell {
true);
conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
true);
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, false);
// ATS version specific settings
if (timelineVersion == 1.0f) {
@ -177,7 +177,6 @@ public class TestDistributedShell {
DistributedShellTimelinePlugin.class.getName());
} else if (timelineVersion == 2.0f) {
// disable v1 timeline server since we no longer have a server here
enableATSServer = false;
// enable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
@ -331,12 +330,7 @@ public class TestDistributedShell {
}
boolean isTestingTimelineV2 = false;
if (timelineVersionWatcher.getTimelineVersion() == 2.0f) {
String[] timelineArgs = {
"--timeline_service_version",
"v2"
};
isTestingTimelineV2 = true;
args = mergeArgs(args, timelineArgs);
if (!defaultFlow) {
String[] flowArgs = {
"--flow_name",

View File

@ -54,17 +54,21 @@ public abstract class TimelineClient extends AbstractService implements
* current user may use {@link UserGroupInformation#doAs} another user to
* construct and initialize a timeline client if the following operations are
* supposed to be conducted by that user.
*
* @return a timeline client
*/
protected ApplicationId contextAppId;
/**
* Creates an instance of the timeline v.1.x client.
*/
@Public
public static TimelineClient createTimelineClient() {
TimelineClient client = new TimelineClientImpl();
return client;
}
/**
* Creates an instance of the timeline v.2 client.
*/
@Public
public static TimelineClient createTimelineClient(ApplicationId appId) {
TimelineClient client = new TimelineClientImpl(appId);
@ -203,8 +207,9 @@ public abstract class TimelineClient extends AbstractService implements
/**
* <p>
* Send the information of a number of conceptual entities to the timeline
* aggregator. It is a blocking API. The method will not return until all the
* put entities have been persisted.
* service v.2 collector. It is a blocking API. The method will not return
* until all the put entities have been persisted. If this method is invoked
* for a non-v.2 timeline client instance, a YarnException is thrown.
* </p>
*
* @param entities
@ -220,8 +225,9 @@ public abstract class TimelineClient extends AbstractService implements
/**
* <p>
* Send the information of a number of conceptual entities to the timeline
* aggregator. It is an asynchronous API. The method will return once all the
* entities are received.
* service v.2 collector. It is an asynchronous API. The method will return
* once all the entities are received. If this method is invoked for a
* non-v.2 timeline client instance, a YarnException is thrown.
* </p>
*
* @param entities

View File

@ -388,6 +388,9 @@ public class TimelineClientImpl extends TimelineClient {
private void putEntities(boolean async,
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
throws IOException, YarnException {
if (!timelineServiceV2) {
throw new YarnException("v.2 method is invoked on a v.1.x client");
}
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
entitiesContainer =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();

View File

@ -785,14 +785,15 @@
<property>
<description>The setting that controls whether yarn system metrics is
published to the Timeline server (version one) or not, by RM.
This configuration is deprecated.</description>
This configuration is now deprecated in favor of
yarn.system-metrics-publisher.enabled.</description>
<name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
<value>false</value>
</property>
<property>
<description>The setting that controls whether yarn system metrics is
published on the Timeline server (version two) or not by RM And NM.</description>
published on the Timeline service or not by RM And NM.</description>
<name>yarn.system-metrics-publisher.enabled</name>
<value>false</value>
</property>

View File

@ -261,10 +261,12 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private void initRegisteredCollectors() {
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
List<AppCollectorsMapProto> list = p.getRegisteredCollectorsList();
this.registeredCollectors = new HashMap<ApplicationId, String> ();
for (AppCollectorsMapProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
this.registeredCollectors.put(appId, c.getAppCollectorAddr());
if (!list.isEmpty()) {
this.registeredCollectors = new HashMap<>();
for (AppCollectorsMapProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
this.registeredCollectors.put(appId, c.getAppCollectorAddr());
}
}
}

View File

@ -589,10 +589,12 @@ public class NodeHeartbeatResponsePBImpl extends
private void initAppCollectorsMap() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<AppCollectorsMapProto> list = p.getAppCollectorsMapList();
this.appCollectorsMap = new HashMap<ApplicationId, String> ();
for (AppCollectorsMapProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
this.appCollectorsMap.put(appId, c.getAppCollectorAddr());
if (!list.isEmpty()) {
this.appCollectorsMap = new HashMap<>();
for (AppCollectorsMapProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
this.appCollectorsMap.put(appId, c.getAppCollectorAddr());
}
}
}

View File

@ -75,7 +75,8 @@ public interface Context {
/**
* Get the registered collectors that located on this NM.
* @return registered
* @return registered collectors, or null if the timeline service v.2 is not
* enabled
*/
Map<ApplicationId, String> getRegisteredCollectors();

View File

@ -100,6 +100,7 @@ public class NodeManager extends CompositeService
private Context context;
private AsyncDispatcher dispatcher;
private ContainerManagerImpl containerManager;
// the NM collector service is set only if the timeline service v.2 is enabled
private NMCollectorService nmCollectorService;
private NodeStatusUpdater nodeStatusUpdater;
private NodeResourceMonitor nodeResourceMonitor;
@ -382,8 +383,10 @@ public class NodeManager extends CompositeService
DefaultMetricsSystem.initialize("NodeManager");
this.nmCollectorService = createNMCollectorService(context);
addService(nmCollectorService);
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
this.nmCollectorService = createNMCollectorService(context);
addService(nmCollectorService);
}
// StatusUpdater should be added last so that it get started last
// so that we make sure everything is up before registering with RM.
@ -480,8 +483,7 @@ public class NodeManager extends CompositeService
protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
protected Map<ApplicationId, String> registeredCollectors =
new ConcurrentHashMap<ApplicationId, String>();
protected Map<ApplicationId, String> registeredCollectors;
protected final ConcurrentMap<ContainerId,
org.apache.hadoop.yarn.api.records.Container> increasedContainers =
@ -514,6 +516,9 @@ public class NodeManager extends CompositeService
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
NMStateStoreService stateStore, boolean isDistSchedulingEnabled,
Configuration conf) {
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
this.registeredCollectors = new ConcurrentHashMap<>();
}
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.dirsHandler = dirsHandler;
@ -790,7 +795,14 @@ public class NodeManager extends CompositeService
return this.context;
}
// For testing
/**
* Returns the NM collector service. It should be used only for testing
* purposes.
*
* @return the NM collector service, or null if the timeline service v.2 is
* not enabled
*/
@VisibleForTesting
NMCollectorService getNMCollectorService() {
return this.nmCollectorService;
}
@ -798,6 +810,7 @@ public class NodeManager extends CompositeService
public static void main(String[] args) throws IOException {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
@SuppressWarnings("resource")
NodeManager nodeManager = new NodeManager();
Configuration conf = new YarnConfiguration();
new GenericOptionsParser(conf, args);

View File

@ -909,7 +909,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
newResource.toString());
}
}
if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) {
if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
updateTimelineClientsAddress(response);
}
@ -943,7 +943,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
/**
* Caller should take care of sending non null nodelabels for both
* arguments
*
*
* @param nodeLabelsNew
* @param nodeLabelsOld
* @return if the New node labels are diff from the older one.
@ -959,27 +959,37 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private void updateTimelineClientsAddress(
NodeHeartbeatResponse response) {
Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
response.getAppCollectorsMap().entrySet();
for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
ApplicationId appId = entry.getKey();
String collectorAddr = entry.getValue();
Map<ApplicationId, String> knownCollectorsMap =
response.getAppCollectorsMap();
if (knownCollectorsMap == null) {
LOG.warn("the collectors map is null");
} else {
Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
knownCollectorsMap.entrySet();
for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
ApplicationId appId = entry.getKey();
String collectorAddr = entry.getValue();
// Only handle applications running on local node.
// Not include apps with timeline collectors running in local
Application application = context.getApplications().get(appId);
if (application != null &&
!context.getRegisteredCollectors().containsKey(appId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sync a new collector address: " + collectorAddr +
" for application: " + appId + " from RM.");
// Only handle applications running on local node.
// Not include apps with timeline collectors running in local
Application application = context.getApplications().get(appId);
// TODO this logic could be problematic if the collector address
// gets updated due to NM restart or collector service failure
if (application != null &&
!context.getRegisteredCollectors().containsKey(appId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sync a new collector address: " + collectorAddr +
" for application: " + appId + " from RM.");
}
TimelineClient client = application.getTimelineClient();
if (client != null) {
client.setTimelineServiceAddress(collectorAddr);
}
}
TimelineClient client = application.getTimelineClient();
client.setTimelineServiceAddress(collectorAddr);
}
}
}
private void updateMasterKeys(NodeHeartbeatResponse response) {
// See if the master-key has rolled over
MasterKey updatedMasterKey = response.getContainerTokenMasterKey();

View File

@ -43,6 +43,10 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
/**
* Service that handles collector information. It is used only if the timeline
* service v.2 is enabled.
*/
public class NMCollectorService extends CompositeService implements
CollectorNodemanagerProtocol {
@ -113,9 +117,9 @@ public class NMCollectorService extends CompositeService implements
String collectorAddr = collector.getCollectorAddr();
newCollectorsMap.put(appId, collectorAddr);
// set registered collector address to TimelineClient.
if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) {
TimelineClient client =
context.getApplications().get(appId).getTimelineClient();
TimelineClient client =
context.getApplications().get(appId).getTimelineClient();
if (client != null) {
client.setTimelineServiceAddress(collectorAddr);
}
}

View File

@ -117,6 +117,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl.FlowContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
@ -192,7 +193,8 @@ public class ContainerManagerImpl extends CompositeService implements
private long waitForContainersOnShutdownMillis;
private final NMTimelinePublisher nmMetricsPublisher;
// NM metrics publisher is set only if the timeline service v.2 is enabled
private NMTimelinePublisher nmMetricsPublisher;
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@ -220,9 +222,17 @@ public class ContainerManagerImpl extends CompositeService implements
auxiliaryServices.registerServiceListener(this);
addService(auxiliaryServices);
nmMetricsPublisher = createNMTimelinePublisher(context);
context.setNMTimelinePublisher(nmMetricsPublisher);
this.containersMonitor = createContainersMonitor(exec);
// initialize the metrics publisher if the timeline service v.2 is enabled
// and the system publisher is enabled
Configuration conf = context.getConf();
if (YarnConfiguration.timelineServiceV2Enabled(conf) &&
YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
LOG.info("YARN system metrics publishing service is enabled");
nmMetricsPublisher = createNMTimelinePublisher(context);
context.setNMTimelinePublisher(nmMetricsPublisher);
}
this.containersMonitor =
new ContainersMonitorImpl(exec, dispatcher, this.context);
addService(this.containersMonitor);
dispatcher.register(ContainerEventType.class,
@ -238,7 +248,6 @@ public class ContainerManagerImpl extends CompositeService implements
addService(dispatcher);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@ -343,7 +352,7 @@ public class ContainerManagerImpl extends CompositeService implements
LOG.info("Recovering application " + appId);
//TODO: Recover flow and flow run ID
ApplicationImpl app = new ApplicationImpl(
dispatcher, p.getUser(), null, null, 0L, appId, creds, context,
dispatcher, p.getUser(), null, appId, creds, context,
p.getAppLogAggregationInitedTime());
context.getApplications().put(appId, app);
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
@ -970,20 +979,27 @@ public class ContainerManagerImpl extends CompositeService implements
try {
if (!isServiceStopped()) {
// Create the application
String flowName = launchContext.getEnvironment().get(
TimelineUtils.FLOW_NAME_TAG_PREFIX);
String flowVersion = launchContext.getEnvironment().get(
TimelineUtils.FLOW_VERSION_TAG_PREFIX);
String flowRunIdStr = launchContext.getEnvironment().get(
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
long flowRunId = 0L;
if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
flowRunId = Long.parseLong(flowRunIdStr);
// populate the flow context from the launch context if the timeline
// service v.2 is enabled
FlowContext flowContext = null;
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
String flowName = launchContext.getEnvironment().get(
TimelineUtils.FLOW_NAME_TAG_PREFIX);
String flowVersion = launchContext.getEnvironment().get(
TimelineUtils.FLOW_VERSION_TAG_PREFIX);
String flowRunIdStr = launchContext.getEnvironment().get(
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
long flowRunId = 0L;
if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
flowRunId = Long.parseLong(flowRunIdStr);
}
flowContext =
new FlowContext(flowName, flowVersion, flowRunId);
}
if (!context.getApplications().containsKey(applicationID)) {
Application application =
new ApplicationImpl(dispatcher, user, flowName, flowVersion,
flowRunId, applicationID, credentials, context);
new ApplicationImpl(dispatcher, user, flowContext,
applicationID, credentials, context);
if (context.getApplications().putIfAbsent(applicationID,
application) == null) {
LOG.info("Creating a new application reference for app "
@ -1335,7 +1351,9 @@ public class ContainerManagerImpl extends CompositeService implements
Container c = containers.get(event.getContainerID());
if (c != null) {
c.handle(event);
nmMetricsPublisher.publishContainerEvent(event);
if (nmMetricsPublisher != null) {
nmMetricsPublisher.publishContainerEvent(event);
}
} else {
LOG.warn("Event " + event + " sent to absent container " +
event.getContainerID());
@ -1351,7 +1369,9 @@ public class ContainerManagerImpl extends CompositeService implements
event.getApplicationID());
if (app != null) {
app.handle(event);
nmMetricsPublisher.publishApplicationEvent(event);
if (nmMetricsPublisher != null) {
nmMetricsPublisher.publishApplicationEvent(event);
}
} else {
LOG.warn("Event " + event + " sent to absent application "
+ event.getApplicationID());
@ -1374,7 +1394,9 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
public void handle(LocalizationEvent event) {
origLocalizationEventHandler.handle(event);
timelinePublisher.publishLocalizationEvent(event);
if (timelinePublisher != null) {
timelinePublisher.publishLocalizationEvent(event);
}
}
}

View File

@ -74,9 +74,8 @@ public class ApplicationImpl implements Application {
final Dispatcher dispatcher;
final String user;
final String flowName;
final String flowVersion;
final long flowRunId;
// flow context is set only if the timeline service v.2 is enabled
private FlowContext flowContext;
final ApplicationId appId;
final Credentials credentials;
Map<ApplicationAccessType, String> applicationACLs;
@ -102,15 +101,24 @@ public class ApplicationImpl implements Application {
private long applicationLogInitedTimestamp = -1;
private final NMStateStoreService appStateStore;
public ApplicationImpl(Dispatcher dispatcher, String user, String flowName,
String flowVersion, long flowRunId, ApplicationId appId,
Credentials credentials, Context context,
public ApplicationImpl(Dispatcher dispatcher, String user,
ApplicationId appId, Credentials credentials, Context context) {
this(dispatcher, user, null, appId, credentials, context, -1L);
}
public ApplicationImpl(Dispatcher dispatcher, String user,
ApplicationId appId, Credentials credentials, Context context,
long recoveredLogInitedTime) {
this(dispatcher, user, null, appId, credentials, context,
recoveredLogInitedTime);
}
public ApplicationImpl(Dispatcher dispatcher, String user,
FlowContext flowContext, ApplicationId appId, Credentials credentials,
Context context, long recoveredLogInitedTime) {
this.dispatcher = dispatcher;
this.user = user;
this.flowName = flowName;
this.flowVersion = flowVersion;
this.flowRunId = flowRunId;
this.flowContext = flowContext;
this.appId = appId;
this.credentials = credentials;
this.aclsManager = context.getApplicationACLsManager();
@ -123,17 +131,50 @@ public class ApplicationImpl implements Application {
setAppLogInitedTimestamp(recoveredLogInitedTime);
}
public ApplicationImpl(Dispatcher dispatcher, String user, String flowId,
String flowVersion, long flowRunId, ApplicationId appId,
public ApplicationImpl(Dispatcher dispatcher, String user,
FlowContext flowContext, ApplicationId appId,
Credentials credentials, Context context) {
this(dispatcher, user, flowId, flowVersion, flowRunId, appId, credentials,
this(dispatcher, user, flowContext, appId, credentials,
context, -1);
Configuration conf = context.getConf();
if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
createAndStartTimelineClient(conf);
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
if (flowContext == null) {
throw new IllegalArgumentException("flow context cannot be null");
}
this.flowContext = flowContext;
if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
createAndStartTimelineClient(conf);
}
}
}
/**
* Data object that encapsulates the flow context for the application purpose.
*/
public static class FlowContext {
private final String flowName;
private final String flowVersion;
private final long flowRunId;
public FlowContext(String flowName, String flowVersion, long flowRunId) {
this.flowName = flowName;
this.flowVersion = flowVersion;
this.flowRunId = flowRunId;
}
public String getFlowName() {
return flowName;
}
public String getFlowVersion() {
return flowVersion;
}
public long getFlowRunId() {
return flowRunId;
}
}
private void createAndStartTimelineClient(Configuration conf) {
// create and start timeline client
this.timelineClient = TimelineClient.createTimelineClient(appId);
@ -528,7 +569,11 @@ public class ApplicationImpl implements Application {
// Remove collectors info for finished apps.
// TODO check we remove related collectors info in failure cases
// (YARN-3038)
app.context.getRegisteredCollectors().remove(app.getAppId());
Map<ApplicationId, String> registeredCollectors =
app.context.getRegisteredCollectors();
if (registeredCollectors != null) {
registeredCollectors.remove(app.getAppId());
}
// stop timelineClient when application get finished.
TimelineClient timelineClient = app.getTimelineClient();
if (timelineClient != null) {
@ -595,16 +640,16 @@ public class ApplicationImpl implements Application {
@Override
public String getFlowName() {
return flowName;
return flowContext == null ? null : flowContext.getFlowName();
}
@Override
public String getFlowVersion() {
return flowVersion;
return flowContext == null ? null : flowContext.getFlowVersion();
}
@Override
public long getFlowRunId() {
return flowRunId;
return flowContext == null ? 0L : flowContext.getFlowRunId();
}
}

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
@ -573,9 +574,13 @@ public class ContainersMonitorImpl extends AbstractService implements
ContainerImpl container =
(ContainerImpl) context.getContainers().get(containerId);
container.getNMTimelinePublisher().reportContainerResourceUsage(
container, currentTime, pId, currentPmemUsage,
cpuUsageTotalCoresPercentage);
NMTimelinePublisher nmMetricsPublisher =
container.getNMTimelinePublisher();
if (nmMetricsPublisher != null) {
nmMetricsPublisher.reportContainerResourceUsage(
container, currentTime, pId, currentPmemUsage,
cpuUsageTotalCoresPercentage);
}
} catch (Exception e) {
// Log the exception and proceed to the next container.
LOG.warn("Uncaught exception in ContainersMonitorImpl "

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -56,12 +55,16 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
* Metrics publisher service that publishes data to the timeline service v.2. It
* is used only if the timeline service v.2 is enabled and the system publishing
* of events and metrics is enabled.
*/
public class NMTimelinePublisher extends CompositeService {
private static final Log LOG = LogFactory.getLog(NMTimelinePublisher.class);
private Dispatcher dispatcher;
private boolean publishSystemMetrics;
private Context context;
@ -76,24 +79,16 @@ public class NMTimelinePublisher extends CompositeService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
publishSystemMetrics =
YarnConfiguration.systemMetricsPublisherEnabled(conf);
if (publishSystemMetrics) {
dispatcher = new AsyncDispatcher();
dispatcher.register(NMTimelineEventType.class,
new ForwardingEventHandler());
dispatcher
.register(ContainerEventType.class, new ContainerEventHandler());
dispatcher.register(ApplicationEventType.class,
new ApplicationEventHandler());
dispatcher.register(LocalizationEventType.class,
new LocalizationEventDispatcher());
addIfService(dispatcher);
LOG.info("YARN system metrics publishing service is enabled");
} else {
LOG.info("YARN system metrics publishing service is not enabled");
}
dispatcher = new AsyncDispatcher();
dispatcher.register(NMTimelineEventType.class,
new ForwardingEventHandler());
dispatcher
.register(ContainerEventType.class, new ContainerEventHandler());
dispatcher.register(ApplicationEventType.class,
new ApplicationEventHandler());
dispatcher.register(LocalizationEventType.class,
new LocalizationEventDispatcher());
addIfService(dispatcher);
super.serviceInit(conf);
}
@ -121,8 +116,9 @@ public class NMTimelinePublisher extends CompositeService {
public void reportContainerResourceUsage(Container container,
long createdTime, String pId, Long pmemUsage,
Float cpuUsageTotalCoresPercentage) {
if (publishSystemMetrics
&& (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || cpuUsageTotalCoresPercentage != ResourceCalculatorProcessTree.UNAVAILABLE)) {
if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
cpuUsageTotalCoresPercentage !=
ResourceCalculatorProcessTree.UNAVAILABLE) {
ContainerEntity entity =
createContainerEntity(container.getContainerId());
long currentTimeMillis = System.currentTimeMillis();
@ -219,9 +215,6 @@ public class NMTimelinePublisher extends CompositeService {
}
public void publishApplicationEvent(ApplicationEvent event) {
if (!publishSystemMetrics) {
return;
}
// publish only when the desired event is received
switch (event.getType()) {
case INIT_APPLICATION:
@ -242,9 +235,6 @@ public class NMTimelinePublisher extends CompositeService {
}
public void publishContainerEvent(ContainerEvent event) {
if (!publishSystemMetrics) {
return;
}
// publish only when the desired event is received
switch (event.getType()) {
case INIT_CONTAINER:
@ -262,9 +252,6 @@ public class NMTimelinePublisher extends CompositeService {
}
public void publishLocalizationEvent(LocalizationEvent event) {
if (!publishSystemMetrics) {
return;
}
// publish only when the desired event is received
switch (event.getType()) {
case CONTAINER_RESOURCES_LOCALIZED:

View File

@ -50,7 +50,6 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@ -95,7 +94,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@ -106,7 +104,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestContainerManagerRecovery extends BaseContainerManagerTest {
@ -726,9 +723,9 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
}
@Override
public NMTimelinePublisher createNMTimelinePublisher(Context context) {
NMTimelinePublisher timelinePublisher = mock(NMTimelinePublisher.class);
return timelinePublisher;
public NMTimelinePublisher
createNMTimelinePublisher(Context context) {
return null;
}
};
}

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@ -555,7 +554,7 @@ public class TestApplication {
this.appId = BuilderUtils.newApplicationId(timestamp, id);
app = new ApplicationImpl(
dispatcher, this.user, null, null, 0, appId, null, context);
dispatcher, this.user, appId, null, context);
containers = new ArrayList<Container>();
for (int i = 0; i < numContainers; i++) {
Container container = createMockedContainer(this.appId, i);

View File

@ -32,15 +32,14 @@ import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -63,6 +62,7 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.w3c.dom.Document;
@ -339,7 +339,7 @@ public class TestNMWebServices extends JerseyTestBase {
final String filename = "logfile1";
final String logMessage = "log message\n";
nmContext.getApplications().put(appId, new ApplicationImpl(null, "user",
null, null, 0, appId, null, nmContext));
appId, null, nmContext));
MockContainer container = new MockContainer(appAttemptId,
new AsyncDispatcher(), new Configuration(), "user", appId, 1);

View File

@ -311,8 +311,11 @@ public class ApplicationMasterService extends AbstractService implements
RMApp rmApp =
rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
// Remove collector address when app get finished.
rmApp.removeCollectorAddr();
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
rmApp.removeCollectorAddr();
}
// checking whether the app exits in RMStateStore at first not to throw
// ApplicationDoesNotExistInCacheException before and after
// RM work-preserving restart.
@ -575,8 +578,10 @@ public class ApplicationMasterService extends AbstractService implements
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
// add collector address for this application
allocateResponse.setCollectorAddr(
this.rmContext.getRMApps().get(applicationId).getCollectorAddr());
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
allocateResponse.setCollectorAddr(
this.rmContext.getRMApps().get(applicationId).getCollectorAddr());
}
// add preemption to the allocateResponse message (if any)
allocateResponse

View File

@ -584,24 +584,27 @@ public class ClientRMService extends AbstractService implements
throw RPCUtil.getRemoteException(ie);
}
// Sanity check for flow run
String value = null;
try {
for (String tag : submissionContext.getApplicationTags()) {
if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") ||
tag.startsWith(
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1);
Long.valueOf(value);
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
// Sanity check for flow run
String value = null;
try {
for (String tag : submissionContext.getApplicationTags()) {
if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") ||
tag.startsWith(
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length()
+ 1);
Long.valueOf(value);
}
}
} catch (NumberFormatException e) {
LOG.warn("Invalid to flow run: " + value +
". Flow run should be a long integer", e);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
e.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
throw RPCUtil.getRemoteException(e);
}
} catch (NumberFormatException e) {
LOG.warn("Invalid to flow run: " + value +
". Flow run should be a long integer", e);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
e.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
throw RPCUtil.getRemoteException(e);
}
// Check whether app has already been put into rmContext,

View File

@ -402,8 +402,11 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
LOG.warn(message);
throw new YarnException(message);
}
// Start timeline collector for the submitted app
application.startTimelineCollector();
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
// Start timeline collector for the submitted app
application.startTimelineCollector();
}
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
submissionContext.getAMContainerSpec().getApplicationACLs());

View File

@ -469,18 +469,19 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected SystemMetricsPublisher createSystemMetricsPublisher() {
boolean timelineServiceEnabled =
conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
SystemMetricsPublisher publisher = null;
if (timelineServiceEnabled) {
if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
LOG.info("TimelineService V1 is configured");
publisher = new TimelineServiceV1Publisher();
} else {
LOG.info("TimelineService V2 is configured");
SystemMetricsPublisher publisher;
if (YarnConfiguration.timelineServiceEnabled(conf) &&
YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
// we're dealing with the v.2.x publisher
LOG.info("system metrics publisher with the timeline service V2 is " +
"configured");
publisher = new TimelineServiceV2Publisher(rmContext);
} else {
// we're dealing with the v.1.x publisher
LOG.info("system metrics publisher with the timeline service V1 is " +
"configured");
publisher = new TimelineServiceV1Publisher();
}
} else {
LOG.info("TimelineServicePublisher is not configured");
@ -606,10 +607,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
RMTimelineCollectorManager timelineCollectorManager =
createRMTimelineCollectorManager();
addService(timelineCollectorManager);
rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
if (YarnConfiguration.timelineServiceV2Enabled(configuration)) {
RMTimelineCollectorManager timelineCollectorManager =
createRMTimelineCollectorManager();
addService(timelineCollectorManager);
rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
}
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);

View File

@ -517,10 +517,15 @@ public class ResourceTrackerService extends AbstractService implements
message);
}
// Check & update collectors info from request.
// TODO make sure it won't have race condition issue for AM failed over case
// that the older registration could possible override the newer one.
updateAppCollectorsMap(request);
boolean timelineV2Enabled =
YarnConfiguration.timelineServiceV2Enabled(getConfig());
if (timelineV2Enabled) {
// Check & update collectors info from request.
// TODO make sure it won't have race condition issue for AM failed over
// case that the older registration could possible override the newer
// one.
updateAppCollectorsMap(request);
}
// Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
@ -539,12 +544,12 @@ public class ResourceTrackerService extends AbstractService implements
nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
}
// Return collectors' map that NM needs to know
// TODO we should optimize this to only include collector info that NM
// doesn't know yet.
List<ApplicationId> keepAliveApps =
remoteNodeStatus.getKeepAliveApplications();
if (keepAliveApps != null) {
if (timelineV2Enabled && keepAliveApps != null) {
// Return collectors' map that NM needs to know
// TODO we should optimize this to only include collector info that NM
// doesn't know yet.
setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
}

View File

@ -229,12 +229,14 @@ public class AMLauncher implements Runnable {
.get(applicationId)
.getSubmitTime()));
// Set flow context info
for (String tag :
rmContext.getRMApps().get(applicationId).getApplicationTags()) {
setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag);
setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag);
setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag);
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
// Set flow context info
for (String tag :
rmContext.getRMApps().get(applicationId).getApplicationTags()) {
setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag);
setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag);
setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag);
}
}
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();

View File

@ -59,7 +59,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
/**
* This class is responsible for posting application, appattempt & Container
* This class is responsible for posting application, appattempt &amp; Container
* lifecycle related events to timeline service V2
*/
@Private

View File

@ -955,15 +955,17 @@ public class RMAppImpl implements RMApp, Recoverable {
extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
LOG.info("Updating collector info for app: " + app.getApplicationId());
if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) {
LOG.info("Updating collector info for app: " + app.getApplicationId());
RMAppCollectorUpdateEvent appCollectorUpdateEvent =
(RMAppCollectorUpdateEvent) event;
// Update collector address
app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
RMAppCollectorUpdateEvent appCollectorUpdateEvent =
(RMAppCollectorUpdateEvent) event;
// Update collector address
app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
// TODO persistent to RMStateStore for recover
// Save to RMStateStore
// TODO persistent to RMStateStore for recover
// Save to RMStateStore
}
};
}

View File

@ -656,6 +656,7 @@ public class TestClientRMService {
ClientRMService rmService =
new ClientRMService(rmContext, yarnScheduler, appManager,
mockAclsManager, mockQueueACLsManager, null);
rmService.init(new Configuration());
// without name and queue
@ -749,6 +750,7 @@ public class TestClientRMService {
ClientRMService rmService =
new ClientRMService(rmContext, yarnScheduler, appManager,
mockAclsManager, mockQueueACLsManager, null);
rmService.init(new Configuration());
// Initialize appnames and queues
String[] queues = {QUEUE_1, QUEUE_2};
@ -912,6 +914,7 @@ public class TestClientRMService {
final ClientRMService rmService =
new ClientRMService(rmContext, yarnScheduler, appManager, null, null,
null);
rmService.init(new Configuration());
// submit an app and wait for it to block while in app submission
Thread t = new Thread() {

View File

@ -75,7 +75,7 @@ public class TestSystemMetricsPublisher {
public static void setup() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,

View File

@ -131,6 +131,7 @@ public class TestSystemMetricsPublisherForV2 {
private static Configuration getTimelineV2Conf() {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
conf.setInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2);

View File

@ -20,41 +20,55 @@ package org.apache.hadoop.yarn.server.timelineservice;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timelineservice.*;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
public class TestTimelineServiceClientIntegration {
private static NodeTimelineCollectorManager collectorManager;
private static PerNodeTimelineCollectorsAuxService auxService;
private static Configuration conf;
@BeforeClass
public static void setupClass() throws Exception {
try {
collectorManager = new MockNodeTimelineCollectorManager();
conf = new YarnConfiguration();
// enable timeline service v.2
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
auxService =
PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
collectorManager);
collectorManager, conf);
auxService.addApplication(ApplicationId.newInstance(0, 1));
} catch (ExitUtil.ExitException e) {
fail();
@ -76,7 +90,7 @@ public class TestTimelineServiceClientIntegration {
// set the timeline service address manually
client.setTimelineServiceAddress(
collectorManager.getRestServerBindAddress());
client.init(new YarnConfiguration());
client.init(conf);
client.start();
TimelineEntity entity = new TimelineEntity();
entity.setType("test entity type");
@ -103,7 +117,7 @@ public class TestTimelineServiceClientIntegration {
// set the timeline service address manually
client.setTimelineServiceAddress(
collectorManager.getRestServerBindAddress());
client.init(new YarnConfiguration());
client.init(conf);
client.start();
ClusterEntity cluster = new ClusterEntity();
cluster.setId(YarnConfiguration.DEFAULT_RM_CLUSTER_ID);

View File

@ -30,12 +30,11 @@ import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
@ -68,6 +67,9 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
throw new YarnException("Timeline service v2 is not enabled");
}
collectorManager.init(conf);
super.serviceInit(conf);
}
@ -175,7 +177,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
@VisibleForTesting
public static PerNodeTimelineCollectorsAuxService
launchServer(String[] args, NodeTimelineCollectorManager collectorManager) {
launchServer(String[] args, NodeTimelineCollectorManager collectorManager,
Configuration conf) {
Thread
.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(
@ -187,7 +190,6 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
new PerNodeTimelineCollectorsAuxService(collectorManager);
ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration();
auxService.init(conf);
auxService.start();
} catch (Throwable t) {
@ -210,6 +212,9 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
}
public static void main(String[] args) {
launchServer(args, null);
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
launchServer(args, null, conf);
}
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
@ -66,6 +67,10 @@ public class TimelineReaderServer extends CompositeService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
throw new YarnException("timeline service v.2 is not enabled");
}
TimelineReader timelineReaderStore = createTimelineReaderStore(conf);
addService(timelineReaderStore);
timelineReaderManager = createTimelineReaderManager(timelineReaderStore);
@ -143,7 +148,8 @@ public class TimelineReaderServer extends CompositeService {
return readerWebServer.getConnectorAddress(0).getPort();
}
static TimelineReaderServer startTimelineReaderServer(String[] args) {
static TimelineReaderServer startTimelineReaderServer(String[] args,
Configuration conf) {
Thread.setDefaultUncaughtExceptionHandler(
new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(TimelineReaderServer.class,
@ -154,7 +160,6 @@ public class TimelineReaderServer extends CompositeService {
ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(timelineReaderServer),
SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration();
timelineReaderServer.init(conf);
timelineReaderServer.start();
} catch (Throwable t) {
@ -165,6 +170,9 @@ public class TimelineReaderServer extends CompositeService {
}
public static void main(String[] args) {
startTimelineReaderServer(args);
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
startTimelineReaderServer(args, conf);
}
}

View File

@ -50,11 +50,16 @@ import java.io.IOException;
public class TestPerNodeTimelineCollectorsAuxService {
private ApplicationAttemptId appAttemptId;
private PerNodeTimelineCollectorsAuxService auxService;
private Configuration conf;
public TestPerNodeTimelineCollectorsAuxService() {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
conf = new YarnConfiguration();
// enable timeline service v.2
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
}
@After
@ -134,7 +139,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
try {
auxService =
PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
createCollectorManager());
createCollectorManager(), conf);
} catch (ExitUtil.ExitException e) {
assertEquals(0, e.status);
ExitUtil.resetFirstExitException();
@ -160,7 +165,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
NodeTimelineCollectorManager collectorManager = createCollectorManager();
PerNodeTimelineCollectorsAuxService auxService =
spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
auxService.init(new YarnConfiguration());
auxService.init(conf);
auxService.start();
return auxService;
}

View File

@ -30,8 +30,11 @@ public class TestTimelineReaderServer {
@Test(timeout = 60000)
public void testStartStopServer() throws Exception {
@SuppressWarnings("resource")
TimelineReaderServer server = new TimelineReaderServer();
Configuration config = new YarnConfiguration();
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
"localhost:0");
try {

View File

@ -72,6 +72,8 @@ public class TestTimelineReaderWebServices {
public void init() throws Exception {
try {
Configuration config = new YarnConfiguration();
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
"localhost:0");
config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");

View File

@ -238,6 +238,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
public void init() throws Exception {
try {
Configuration config = util.getConfiguration();
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
"localhost:0");
config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");