YARN-3522. Fixed DistributedShell to instantiate TimeLineClient as the correct user. Contributed by Zhijie Shen

This commit is contained in:
Jian He 2015-04-23 11:07:26 -07:00
parent 395205444e
commit aa4a192feb
6 changed files with 107 additions and 45 deletions

View File

@ -277,6 +277,9 @@ Release 2.7.1 - UNRELEASED
YARN-2605. [RM HA] Rest api endpoints doing redirect incorrectly.
(Xuan Gong via stevel)
YARN-3522. Fixed DistributedShell to instantiate TimeLineClient as the
correct user. (Zhijie Shen via jianhe)
Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES

View File

@ -188,13 +188,14 @@ public static enum DSEntity {
private AMRMClientAsync amRMClient;
// In both secure and non-secure modes, this points to the job-submitter.
private UserGroupInformation appSubmitterUgi;
@VisibleForTesting
UserGroupInformation appSubmitterUgi;
// Handle to communicate with the Node Manager
private NMClientAsync nmClientAsync;
// Listen to process the response from the Node Manager
private NMCallbackHandler containerListener;
// Application Attempt Id ( combination of attemptId and fail count )
@VisibleForTesting
protected ApplicationAttemptId appAttemptID;
@ -270,7 +271,8 @@ public static enum DSEntity {
private List<Thread> launchThreads = new ArrayList<Thread>();
// Timeline Client
private TimelineClient timelineClient;
@VisibleForTesting
TimelineClient timelineClient;
private final String linux_bash_command = "bash";
private final String windows_command = "cmd /c";
@ -496,18 +498,6 @@ public boolean init(String[] args) throws ParseException, IOException {
}
requestPriority = Integer.parseInt(cliParser
.getOptionValue("priority", "0"));
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
// Creating the Timeline Client
timelineClient = TimelineClient.createTimelineClient();
timelineClient.init(conf);
timelineClient.start();
} else {
timelineClient = null;
LOG.warn("Timeline service is not enabled");
}
return true;
}
@ -527,7 +517,7 @@ private void printUsage(Options opts) {
* @throws IOException
*/
@SuppressWarnings({ "unchecked" })
public void run() throws YarnException, IOException {
public void run() throws YarnException, IOException, InterruptedException {
LOG.info("Starting ApplicationMaster");
// Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
@ -554,11 +544,7 @@ public void run() throws YarnException, IOException {
appSubmitterUgi =
UserGroupInformation.createRemoteUser(appSubmitterUserName);
appSubmitterUgi.addCredentials(credentials);
if(timelineClient != null) {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
}
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
@ -570,6 +556,12 @@ public void run() throws YarnException, IOException {
nmClientAsync.init(conf);
nmClientAsync.start();
startTimelineClient(conf);
if(timelineClient != null) {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
}
// Setup local RPC Server to accept status requests directly from clients
// TODO need to setup a protocol for client to be able to communicate to
// the RPC server
@ -624,10 +616,30 @@ public void run() throws YarnException, IOException {
amRMClient.addContainerRequest(containerAsk);
}
numRequestedContainers.set(numTotalContainers);
}
if(timelineClient != null) {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
@VisibleForTesting
void startTimelineClient(final Configuration conf)
throws YarnException, IOException, InterruptedException {
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
// Creating the Timeline Client
timelineClient = TimelineClient.createTimelineClient();
timelineClient.init(conf);
timelineClient.start();
} else {
timelineClient = null;
LOG.warn("Timeline service is not enabled");
}
return null;
}
});
} catch (UndeclaredThrowableException e) {
throw new YarnException(e.getCause());
}
}
@ -646,6 +658,11 @@ protected boolean finish() {
} catch (InterruptedException ex) {}
}
if(timelineClient != null) {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
}
// Join all launched threads
// needed for when we time out
// and we need to release containers
@ -1104,18 +1121,11 @@ private static void publishContainerEndEvent(
event.addEventInfo("State", container.getState().name());
event.addEventInfo("Exit Status", container.getExitStatus());
entity.addEvent(event);
try {
ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
@Override
public TimelinePutResponse run() throws Exception {
return timelineClient.putEntities(entity);
}
});
} catch (Exception e) {
timelineClient.putEntities(entity);
} catch (YarnException | IOException e) {
LOG.error("Container end event could not be published for "
+ container.getContainerId().toString(),
e instanceof UndeclaredThrowableException ? e.getCause() : e);
+ container.getContainerId().toString(), e);
}
}
@ -1131,20 +1141,13 @@ private static void publishApplicationAttemptEvent(
event.setEventType(appEvent.toString());
event.setTimestamp(System.currentTimeMillis());
entity.addEvent(event);
try {
ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
@Override
public TimelinePutResponse run() throws Exception {
return timelineClient.putEntities(entity);
}
});
} catch (Exception e) {
timelineClient.putEntities(entity);
} catch (YarnException | IOException e) {
LOG.error("App Attempt "
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
+ " event could not be published for "
+ appAttemptId.toString(),
e instanceof UndeclaredThrowableException ? e.getCause() : e);
+ appAttemptId.toString(), e);
}
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.Test;
public class TestDSAppMaster {
@Test
public void testTimelineClientInDSAppMaster() throws Exception {
ApplicationMaster appMaster = new ApplicationMaster();
appMaster.appSubmitterUgi =
UserGroupInformation.createUserForTesting("foo", new String[]{"bar"});
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
appMaster.startTimelineClient(conf);
Assert.assertEquals(appMaster.appSubmitterUgi,
((TimelineClientImpl)appMaster.timelineClient).getUgi());
}
}

View File

@ -29,7 +29,7 @@ public class TestDSFailedAppMaster extends ApplicationMaster {
private static final Log LOG = LogFactory.getLog(TestDSFailedAppMaster.class);
@Override
public void run() throws YarnException, IOException {
public void run() throws YarnException, IOException, InterruptedException {
super.run();
// for the 2nd attempt.

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
@ -40,6 +41,15 @@
@Unstable
public abstract class TimelineClient extends AbstractService {
/**
* Create a timeline client. The current UGI when the user initialize the
* client will be used to do the put and the delegation token operations. The
* current user may use {@link UserGroupInformation#doAs} another user to
* construct and initialize a timeline client if the following operations are
* supposed to be conducted by that user.
*
* @return a timeline client
*/
@Public
public static TimelineClient createTimelineClient() {
TimelineClient client = new TimelineClientImpl();

View File

@ -642,4 +642,9 @@ private static void printUsage() {
new HelpFormatter().printHelp("TimelineClient", opts);
}
@VisibleForTesting
@Private
public UserGroupInformation getUgi() {
return authUgi;
}
}