YARN-2375. Allow enabling/disabling timeline server per framework. (Mit Desai via jeagles)

This commit is contained in:
Jonathan Eagles 2014-11-20 23:34:35 -06:00
parent c95b878abf
commit c298a9a845
5 changed files with 74 additions and 100 deletions

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -243,9 +244,15 @@ protected void serviceInit(Configuration conf) throws Exception {
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) { MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
timelineClient = TimelineClient.createTimelineClient(); if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
timelineClient.init(conf); YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
LOG.info("Emitting job history data to the timeline server is enabled"); timelineClient = TimelineClient.createTimelineClient();
timelineClient.init(conf);
LOG.info("Timeline service is enabled");
LOG.info("Emitting job history data to the timeline server is enabled");
} else {
LOG.info("Timeline service is not enabled");
}
} else { } else {
LOG.info("Emitting job history data to the timeline server is not enabled"); LOG.info("Emitting job history data to the timeline server is not enabled");
} }

View File

@ -81,6 +81,9 @@ Release 2.7.0 - UNRELEASED
YARN-2802. ClusterMetrics to include AM launch and register delays. YARN-2802. ClusterMetrics to include AM launch and register delays.
(Zhihai Xu via kasha) (Zhihai Xu via kasha)
YARN-2375. Allow enabling/disabling timeline server per framework.
(Mit Desai via jeagles)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -497,10 +497,16 @@ public boolean init(String[] args) throws ParseException, IOException {
requestPriority = Integer.parseInt(cliParser requestPriority = Integer.parseInt(cliParser
.getOptionValue("priority", "0")); .getOptionValue("priority", "0"));
// Creating the Timeline Client if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
timelineClient = TimelineClient.createTimelineClient(); YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
timelineClient.init(conf); // Creating the Timeline Client
timelineClient.start(); timelineClient = TimelineClient.createTimelineClient();
timelineClient.init(conf);
timelineClient.start();
} else {
timelineClient = null;
LOG.warn("Timeline service is not enabled");
}
return true; return true;
} }
@ -548,9 +554,11 @@ public void run() throws YarnException, IOException {
appSubmitterUgi = appSubmitterUgi =
UserGroupInformation.createRemoteUser(appSubmitterUserName); UserGroupInformation.createRemoteUser(appSubmitterUserName);
appSubmitterUgi.addCredentials(credentials); appSubmitterUgi.addCredentials(credentials);
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), if(timelineClient != null) {
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
}
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
@ -617,8 +625,10 @@ public void run() throws YarnException, IOException {
} }
numRequestedContainers.set(numTotalContainers); numRequestedContainers.set(numTotalContainers);
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), if(timelineClient != null) {
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
}
} }
@VisibleForTesting @VisibleForTesting
@ -681,6 +691,11 @@ protected boolean finish() {
amRMClient.stop(); amRMClient.stop();
// Stop Timeline Client
if(timelineClient != null) {
timelineClient.stop();
}
return success; return success;
} }
@ -724,8 +739,10 @@ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
LOG.info("Container completed successfully." + ", containerId=" LOG.info("Container completed successfully." + ", containerId="
+ containerStatus.getContainerId()); + containerStatus.getContainerId());
} }
publishContainerEndEvent( if(timelineClient != null) {
timelineClient, containerStatus, domainId, appSubmitterUgi); publishContainerEndEvent(
timelineClient, containerStatus, domainId, appSubmitterUgi);
}
} }
// ask for more containers if any failed // ask for more containers if any failed
@ -840,9 +857,11 @@ public void onContainerStarted(ContainerId containerId,
if (container != null) { if (container != null) {
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
} }
ApplicationMaster.publishContainerStartEvent( if(applicationMaster.timelineClient != null) {
applicationMaster.timelineClient, container, ApplicationMaster.publishContainerStartEvent(
applicationMaster.domainId, applicationMaster.appSubmitterUgi); applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
}
} }
@Override @Override

View File

@ -105,7 +105,6 @@ public class TimelineClientImpl extends TimelineClient {
private DelegationTokenAuthenticator authenticator; private DelegationTokenAuthenticator authenticator;
private DelegationTokenAuthenticatedURL.Token token; private DelegationTokenAuthenticatedURL.Token token;
private URI resURI; private URI resURI;
private boolean isEnabled;
@Private @Private
@VisibleForTesting @VisibleForTesting
@ -247,55 +246,42 @@ public TimelineClientImpl() {
} }
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
isEnabled = conf.getBoolean( ClientConfig cc = new DefaultClientConfig();
YarnConfiguration.TIMELINE_SERVICE_ENABLED, cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); connConfigurator = newConnConfigurator(conf);
if (!isEnabled) { if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("Timeline service is not enabled"); authenticator = new KerberosDelegationTokenAuthenticator();
} else { } else {
ClientConfig cc = new DefaultClientConfig(); authenticator = new PseudoDelegationTokenAuthenticator();
cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
connConfigurator = newConnConfigurator(conf);
if (UserGroupInformation.isSecurityEnabled()) {
authenticator = new KerberosDelegationTokenAuthenticator();
} else {
authenticator = new PseudoDelegationTokenAuthenticator();
}
authenticator.setConnectionConfigurator(connConfigurator);
token = new DelegationTokenAuthenticatedURL.Token();
connectionRetry = new TimelineClientConnectionRetry(conf);
client = new Client(new URLConnectionClientHandler(
new TimelineURLConnectionFactory()), cc);
TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
client.addFilter(retryFilter);
if (YarnConfiguration.useHttps(conf)) {
resURI = URI
.create(JOINER.join("https://", conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
RESOURCE_URI_STR));
} else {
resURI = URI.create(JOINER.join("http://", conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
RESOURCE_URI_STR));
}
LOG.info("Timeline service address: " + resURI);
} }
authenticator.setConnectionConfigurator(connConfigurator);
token = new DelegationTokenAuthenticatedURL.Token();
connectionRetry = new TimelineClientConnectionRetry(conf);
client = new Client(new URLConnectionClientHandler(
new TimelineURLConnectionFactory()), cc);
TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
client.addFilter(retryFilter);
if (YarnConfiguration.useHttps(conf)) {
resURI = URI
.create(JOINER.join("https://", conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
RESOURCE_URI_STR));
} else {
resURI = URI.create(JOINER.join("http://", conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
RESOURCE_URI_STR));
}
LOG.info("Timeline service address: " + resURI);
super.serviceInit(conf); super.serviceInit(conf);
} }
@Override @Override
public TimelinePutResponse putEntities( public TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException { TimelineEntity... entities) throws IOException, YarnException {
if (!isEnabled) {
if (LOG.isDebugEnabled()) {
LOG.debug("Nothing will be put because timeline service is not enabled");
}
return new TimelinePutResponse();
}
TimelineEntities entitiesContainer = new TimelineEntities(); TimelineEntities entitiesContainer = new TimelineEntities();
entitiesContainer.addEntities(Arrays.asList(entities)); entitiesContainer.addEntities(Arrays.asList(entities));
ClientResponse resp = doPosting(entitiesContainer, null); ClientResponse resp = doPosting(entitiesContainer, null);
@ -306,12 +292,6 @@ public TimelinePutResponse putEntities(
@Override @Override
public void putDomain(TimelineDomain domain) throws IOException, public void putDomain(TimelineDomain domain) throws IOException,
YarnException { YarnException {
if (!isEnabled) {
if (LOG.isDebugEnabled()) {
LOG.debug("Nothing will be put because timeline service is not enabled");
}
return;
}
doPosting(domain, "domain"); doPosting(domain, "domain");
} }

View File

@ -119,41 +119,6 @@ public void testPostEntitiesConnectionRefused() throws Exception {
} }
} }
@Test
public void testPostEntitiesTimelineServiceNotEnabled() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
TimelineClientImpl client = createTimelineClient(conf);
mockEntityClientResponse(
client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
try {
TimelinePutResponse response = client.putEntities(generateEntity());
Assert.assertEquals(0, response.getErrors().size());
} catch (YarnException e) {
Assert.fail(
"putEntities should already return before throwing the exception");
}
}
@Test
public void testPostEntitiesTimelineServiceDefaultNotEnabled()
throws Exception {
YarnConfiguration conf = new YarnConfiguration();
// Unset the timeline service's enabled properties.
// Make sure default value is pickup up
conf.unset(YarnConfiguration.TIMELINE_SERVICE_ENABLED);
TimelineClientImpl client = createTimelineClient(conf);
mockEntityClientResponse(client, ClientResponse.Status.INTERNAL_SERVER_ERROR,
false, false);
try {
TimelinePutResponse response = client.putEntities(generateEntity());
Assert.assertEquals(0, response.getErrors().size());
} catch (YarnException e) {
Assert
.fail("putEntities should already return before throwing the exception");
}
}
@Test @Test
public void testPutDomain() throws Exception { public void testPutDomain() throws Exception {
mockDomainClientResponse(client, ClientResponse.Status.OK, false); mockDomainClientResponse(client, ClientResponse.Status.OK, false);