YARN-6137. Yarn client implicitly invoke ATS client which accesses HDFS. Contributed by Li Lu

(cherry picked from commit 37b4acf7ce)
This commit is contained in:
Jason Lowe 2017-02-08 14:59:09 -06:00
parent 959db38662
commit af65f8bb59
2 changed files with 40 additions and 38 deletions

View File

@ -134,7 +134,7 @@ public class YarnClientImpl extends YarnClient {
private long asyncApiPollTimeoutMillis; private long asyncApiPollTimeoutMillis;
protected AHSClient historyClient; protected AHSClient historyClient;
private boolean historyServiceEnabled; private boolean historyServiceEnabled;
protected TimelineClient timelineClient; protected volatile TimelineClient timelineClient;
@VisibleForTesting @VisibleForTesting
Text timelineService; Text timelineService;
@VisibleForTesting @VisibleForTesting
@ -174,24 +174,9 @@ public class YarnClientImpl extends YarnClient {
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
try {
timelineServiceEnabled = true; timelineServiceEnabled = true;
timelineClient = createTimelineClient();
timelineClient.init(conf);
timelineDTRenewer = getTimelineDelegationTokenRenewer(conf); timelineDTRenewer = getTimelineDelegationTokenRenewer(conf);
timelineService = TimelineUtils.buildTimelineTokenService(conf); timelineService = TimelineUtils.buildTimelineTokenService(conf);
} catch (NoClassDefFoundError error) {
// When attempt to initiate the timeline client with
// different set of dependencies, it may fail with
// NoClassDefFoundError. When some of them are not compatible
// with timeline server. This is not necessarily a fatal error
// to the client.
LOG.warn("Timeline client could not be initialized "
+ "because dependency missing or incompatible,"
+ " disabling timeline client.",
error);
timelineServiceEnabled = false;
}
} }
timelineServiceBestEffort = conf.getBoolean( timelineServiceBestEffort = conf.getBoolean(
@ -212,9 +197,6 @@ public class YarnClientImpl extends YarnClient {
if (historyServiceEnabled) { if (historyServiceEnabled) {
historyClient.start(); historyClient.start();
} }
if (timelineServiceEnabled) {
timelineClient.start();
}
} catch (IOException e) { } catch (IOException e) {
throw new YarnRuntimeException(e); throw new YarnRuntimeException(e);
} }
@ -229,7 +211,7 @@ public class YarnClientImpl extends YarnClient {
if (historyServiceEnabled) { if (historyServiceEnabled) {
historyClient.stop(); historyClient.stop();
} }
if (timelineServiceEnabled) { if (timelineClient != null) {
timelineClient.stop(); timelineClient.stop();
} }
super.serviceStop(); super.serviceStop();
@ -369,8 +351,18 @@ public class YarnClientImpl extends YarnClient {
org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier> org.apache.hadoop.security.token.Token<TimelineDelegationTokenIdentifier>
getTimelineDelegationToken() throws IOException, YarnException { getTimelineDelegationToken() throws IOException, YarnException {
try { try {
// Only reachable when both security and timeline service are enabled.
if (timelineClient == null) {
synchronized (this) {
if (timelineClient == null) {
timelineClient = createTimelineClient();
timelineClient.init(getConfig());
timelineClient.start();
}
}
}
return timelineClient.getDelegationToken(timelineDTRenewer); return timelineClient.getDelegationToken(timelineDTRenewer);
} catch (Exception e ) { } catch (Exception e) {
if (timelineServiceBestEffort) { if (timelineServiceBestEffort) {
LOG.warn("Failed to get delegation token from the timeline server: " LOG.warn("Failed to get delegation token from the timeline server: "
+ e.getMessage()); + e.getMessage());

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.client.api.impl; package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -157,25 +156,36 @@ public class TestYarnClient {
} }
@Test @Test
public void testTimelineClientInitFailure() throws Exception{ public void testStartWithTimelineV15Failure() throws Exception{
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT,
true);
YarnClient client = YarnClient.createYarnClient(); YarnClient client = YarnClient.createYarnClient();
if(client instanceof YarnClientImpl) { if(client instanceof YarnClientImpl) {
YarnClientImpl impl = (YarnClientImpl) client; YarnClientImpl impl = (YarnClientImpl) client;
YarnClientImpl spyClient = spy(impl); YarnClientImpl spyClient = spy(impl);
when(spyClient.createTimelineClient()).thenThrow( when(spyClient.createTimelineClient()).thenThrow(
new NoClassDefFoundError( new IOException("ATS v1.5 client initialization failed. "));
"Mock a failure when init timeline instance"));
spyClient.init(conf); spyClient.init(conf);
spyClient.start(); spyClient.start();
assertFalse("Timeline client should be disabled when" spyClient.getTimelineDelegationToken();
+ "it is failed to init",
spyClient.timelineServiceEnabled);
spyClient.stop(); spyClient.stop();
} }
} }
@Test
public void testStartWithTimelineV15() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
YarnClientImpl client = (YarnClientImpl) YarnClient.createYarnClient();
client.init(conf);
client.start();
client.stop();
}
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Test (timeout = 30000) @Test (timeout = 30000)
public void testSubmitApplication() throws Exception { public void testSubmitApplication() throws Exception {