YARN-2629. Made the distributed shell use the domain-based timeline ACLs. Contributed by Zhijie Shen.

This commit is contained in:
Zhijie Shen 2014-10-09 12:59:47 -07:00
parent d7b647f0ee
commit 1d4612f5ad
6 changed files with 205 additions and 53 deletions

View File

@ -324,6 +324,9 @@ Release 2.6.0 - UNRELEASED
YARN-2615. Changed ClientToAMTokenIdentifier/RM(Timeline)DelegationTokenIdentifier
to use protobuf as payload. (Junping Du via jianhe)
YARN-2629. Made the distributed shell use the domain-based timeline ACLs.
(zjshen)
OPTIMIZATIONS
BUG FIXES

View File

@ -24,6 +24,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
@ -86,6 +87,7 @@
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@ -246,6 +248,9 @@ public static enum DSEntity {
// File length needed for local resource
private long shellScriptPathLen = 0;
// Timeline domain ID
private String domainId = null;
// Hardcoded path to shell script in launch container's local env
private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
@ -465,7 +470,9 @@ public boolean init(String[] args) throws ParseException, IOException {
shellScriptPathLen = Long.valueOf(envs
.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
}
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
}
if (!scriptPath.isEmpty()
&& (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
LOG.error("Illegal values in env for shell script path" + ", path="
@ -515,13 +522,6 @@ private void printUsage(Options opts) {
@SuppressWarnings({ "unchecked" })
public void run() throws YarnException, IOException {
LOG.info("Starting ApplicationMaster");
try {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START);
} catch (Exception e) {
LOG.error("App Attempt start event could not be published for "
+ appAttemptID.toString(), e);
}
// Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
// are marked as LimitedPrivate
@ -548,6 +548,9 @@ public void run() throws YarnException, IOException {
UserGroupInformation.createRemoteUser(appSubmitterUserName);
appSubmitterUgi.addCredentials(credentials);
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
@ -612,13 +615,9 @@ public void run() throws YarnException, IOException {
amRMClient.addContainerRequest(containerAsk);
}
numRequestedContainers.set(numTotalContainers);
try {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END);
} catch (Exception e) {
LOG.error("App Attempt start event could not be published for "
+ appAttemptID.toString(), e);
}
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
}
@VisibleForTesting
@ -724,12 +723,8 @@ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
LOG.info("Container completed successfully." + ", containerId="
+ containerStatus.getContainerId());
}
try {
publishContainerEndEvent(timelineClient, containerStatus);
} catch (Exception e) {
LOG.error("Container start event could not be published for "
+ containerStatus.getContainerId().toString(), e);
}
publishContainerEndEvent(
timelineClient, containerStatus, domainId, appSubmitterUgi);
}
// ask for more containers if any failed
@ -844,13 +839,9 @@ public void onContainerStarted(ContainerId containerId,
if (container != null) {
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
}
try {
ApplicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container);
} catch (Exception e) {
LOG.error("Container start event could not be published for "
+ container.getId().toString(), e);
}
ApplicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
}
@Override
@ -1050,13 +1041,14 @@ private String readContent(String filePath) throws IOException {
}
}
private static void publishContainerStartEvent(TimelineClient timelineClient,
Container container) throws IOException, YarnException {
TimelineEntity entity = new TimelineEntity();
private static void publishContainerStartEvent(
final TimelineClient timelineClient, Container container, String domainId,
UserGroupInformation ugi) {
final TimelineEntity entity = new TimelineEntity();
entity.setEntityId(container.getId().toString());
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
entity.addPrimaryFilter("user",
UserGroupInformation.getCurrentUser().getShortUserName());
entity.setDomainId(domainId);
entity.addPrimaryFilter("user", ugi.getShortUserName());
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType(DSEvent.DS_CONTAINER_START.toString());
@ -1064,16 +1056,28 @@ private static void publishContainerStartEvent(TimelineClient timelineClient,
event.addEventInfo("Resources", container.getResource().toString());
entity.addEvent(event);
timelineClient.putEntities(entity);
try {
ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
@Override
public TimelinePutResponse run() throws Exception {
return timelineClient.putEntities(entity);
}
});
} catch (Exception e) {
LOG.error("Container start event could not be published for "
+ container.getId().toString(),
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}
private static void publishContainerEndEvent(TimelineClient timelineClient,
ContainerStatus container) throws IOException, YarnException {
TimelineEntity entity = new TimelineEntity();
private static void publishContainerEndEvent(
final TimelineClient timelineClient, ContainerStatus container,
String domainId, UserGroupInformation ugi) {
final TimelineEntity entity = new TimelineEntity();
entity.setEntityId(container.getContainerId().toString());
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
entity.addPrimaryFilter("user",
UserGroupInformation.getCurrentUser().getShortUserName());
entity.setDomainId(domainId);
entity.addPrimaryFilter("user", ugi.getShortUserName());
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType(DSEvent.DS_CONTAINER_END.toString());
@ -1081,22 +1085,46 @@ private static void publishContainerEndEvent(TimelineClient timelineClient,
event.addEventInfo("Exit Status", container.getExitStatus());
entity.addEvent(event);
timelineClient.putEntities(entity);
try {
ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
@Override
public TimelinePutResponse run() throws Exception {
return timelineClient.putEntities(entity);
}
});
} catch (Exception e) {
LOG.error("Container end event could not be published for "
+ container.getContainerId().toString(),
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}
private static void publishApplicationAttemptEvent(
TimelineClient timelineClient, String appAttemptId, DSEvent appEvent)
throws IOException, YarnException {
TimelineEntity entity = new TimelineEntity();
final TimelineClient timelineClient, String appAttemptId,
DSEvent appEvent, String domainId, UserGroupInformation ugi) {
final TimelineEntity entity = new TimelineEntity();
entity.setEntityId(appAttemptId);
entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
entity.addPrimaryFilter("user",
UserGroupInformation.getCurrentUser().getShortUserName());
entity.setDomainId(domainId);
entity.addPrimaryFilter("user", ugi.getShortUserName());
TimelineEvent event = new TimelineEvent();
event.setEventType(appEvent.toString());
event.setTimestamp(System.currentTimeMillis());
entity.addEvent(event);
timelineClient.putEntities(entity);
try {
ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
@Override
public TimelinePutResponse run() throws Exception {
return timelineClient.putEntities(entity);
}
});
} catch (Exception e) {
LOG.error("App Attempt "
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
+ " event could not be published for "
+ appAttemptId.toString(),
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}
}

View File

@ -70,11 +70,14 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
* Client for Distributed Shell application submission to YARN.
@ -165,7 +168,19 @@ public class Client {
private long attemptFailuresValidityInterval = -1;
// Debug flag
boolean debugFlag = false;
boolean debugFlag = false;
// Timeline domain ID
private String domainId = null;
// Flag to indicate whether to create the domain of the given ID
private boolean toCreateDomain = false;
// Timeline domain reader access control
private String viewACLs = null;
// Timeline domain writer access control
private String modifyACLs = null;
// Command line options
private Options opts;
@ -256,6 +271,14 @@ public Client(Configuration conf) throws Exception {
"If failure count reaches to maxAppAttempts, " +
"the application will be failed.");
opts.addOption("debug", false, "Dump out debug information");
opts.addOption("domain", true, "ID of the timeline domain where the "
+ "timeline entities will be put");
opts.addOption("view_acls", true, "Users and groups that allowed to "
+ "view the timeline entities in the given domain");
opts.addOption("modify_acls", true, "Users and groups that allowed to "
+ "modify the timeline entities in the given domain");
opts.addOption("create", false, "Flag to indicate whether to create the "
+ "domain specified with -domain.");
opts.addOption("help", false, "Print usage");
}
@ -385,6 +408,18 @@ public boolean init(String[] args) throws ParseException {
log4jPropFile = cliParser.getOptionValue("log_properties", "");
// Get timeline domain options
if (cliParser.hasOption("domain")) {
domainId = cliParser.getOptionValue("domain");
toCreateDomain = cliParser.hasOption("create");
if (cliParser.hasOption("view_acls")) {
viewACLs = cliParser.getOptionValue("view_acls");
}
if (cliParser.hasOption("modify_acls")) {
modifyACLs = cliParser.getOptionValue("modify_acls");
}
}
return true;
}
@ -431,6 +466,10 @@ public boolean run() throws IOException, YarnException {
}
}
if (domainId != null && domainId.length() > 0 && toCreateDomain) {
prepareTimelineDomain();
}
// Get a new application id
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
@ -535,6 +574,9 @@ public boolean run() throws IOException, YarnException {
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
if (domainId != null && domainId.length() > 0) {
env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
}
// Add AppMaster.jar location to classpath
// At some point we should not be required to add
@ -773,4 +815,35 @@ private void addToLocalResources(FileSystem fs, String fileSrcPath,
scFileStatus.getLen(), scFileStatus.getModificationTime());
localResources.put(fileDstPath, scRsrc);
}
private void prepareTimelineDomain() {
TimelineClient timelineClient = null;
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
timelineClient = TimelineClient.createTimelineClient();
timelineClient.init(conf);
timelineClient.start();
} else {
LOG.warn("Cannot put the domain " + domainId +
" because the timeline service is not enabled");
return;
}
try {
//TODO: we need to check and combine the existing timeline domain ACLs,
//but let's do it once we have client java library to query domains.
TimelineDomain domain = new TimelineDomain();
domain.setId(domainId);
domain.setReaders(
viewACLs != null && viewACLs.length() > 0 ? viewACLs : " ");
domain.setWriters(
modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " ");
timelineClient.putDomain(domain);
LOG.info("Put the timeline domain: " +
TimelineUtils.dumpTimelineRecordtoJSON(domain));
} catch (Exception e) {
LOG.error("Error when putting the timeline domain", e);
} finally {
timelineClient.stop();
}
}
}

View File

@ -44,4 +44,9 @@ public class DSConstants {
* Used to validate the local resource.
*/
public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN";
/**
* Environment key name denoting the timeline domain ID.
*/
public static final String DISTRIBUTEDSHELLTIMELINEDOMAIN = "DISTRIBUTEDSHELLTIMELINEDOMAIN";
}

View File

@ -29,10 +29,10 @@
import java.net.InetAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -43,6 +43,7 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -52,6 +53,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -129,8 +131,16 @@ public void tearDown() throws IOException {
}
@Test(timeout=90000)
public void testDSShell() throws Exception {
public void testDSShellWithDomain() throws Exception {
testDSShell(true);
}
@Test(timeout=90000)
public void testDSShellWithoutDomain() throws Exception {
testDSShell(false);
}
public void testDSShell(boolean haveDomain) throws Exception {
String[] args = {
"--jar",
APPMASTER_JAR,
@ -147,6 +157,20 @@ public void testDSShell() throws Exception {
"--container_vcores",
"1"
};
if (haveDomain) {
String[] domainArgs = {
"--domain",
"TEST_DOMAIN",
"--view_acls",
"reader_user reader_group",
"--modify_acls",
"writer_user writer_group",
"--create"
};
List<String> argsList = new ArrayList<String>(Arrays.asList(args));
argsList.addAll(Arrays.asList(domainArgs));
args = argsList.toArray(new String[argsList.size()]);
}
LOG.info("Initializing DS Client");
final Client client = new Client(new Configuration(yarnCluster.getConfig()));
@ -198,7 +222,15 @@ public void run() {
t.join();
LOG.info("Client run completed. Result=" + result);
Assert.assertTrue(result.get());
TimelineDomain domain = null;
if (haveDomain) {
domain = yarnCluster.getApplicationHistoryServer()
.getTimelineStore().getDomain("TEST_DOMAIN");
Assert.assertNotNull(domain);
Assert.assertEquals("reader_user reader_group", domain.getReaders());
Assert.assertEquals("writer_user writer_group", domain.getWriters());
}
TimelineEntities entitiesAttempts = yarnCluster
.getApplicationHistoryServer()
.getTimelineStore()
@ -210,6 +242,13 @@ public void run() {
.size());
Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType()
.toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
if (haveDomain) {
Assert.assertEquals(domain.getId(),
entitiesAttempts.getEntities().get(0).getDomainId());
} else {
Assert.assertEquals("DEFAULT",
entitiesAttempts.getEntities().get(0).getDomainId());
}
TimelineEntities entities = yarnCluster
.getApplicationHistoryServer()
.getTimelineStore()
@ -219,6 +258,13 @@ public void run() {
Assert.assertEquals(2, entities.getEntities().size());
Assert.assertEquals(entities.getEntities().get(0).getEntityType()
.toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString());
if (haveDomain) {
Assert.assertEquals(domain.getId(),
entities.getEntities().get(0).getDomainId());
} else {
Assert.assertEquals("DEFAULT",
entities.getEntities().get(0).getDomainId());
}
}
/*

View File

@ -210,9 +210,6 @@ public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
@VisibleForTesting
public ClientResponse doPostingObject(Object object, String path) {
WebResource webResource = client.resource(resURI);
if (path != null) {
webResource.path(path);
}
if (path == null) {
return webResource.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)