From 1d4612f5ad9678c952b416e798dccd20c88f96ef Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Thu, 9 Oct 2014 12:59:47 -0700 Subject: [PATCH] YARN-2629. Made the distributed shell use the domain-based timeline ACLs. Contributed by Zhijie Shen. --- hadoop-yarn-project/CHANGES.txt | 3 + .../distributedshell/ApplicationMaster.java | 120 +++++++++++------- .../applications/distributedshell/Client.java | 75 ++++++++++- .../distributedshell/DSConstants.java | 5 + .../TestDistributedShell.java | 52 +++++++- .../client/api/impl/TimelineClientImpl.java | 3 - 6 files changed, 205 insertions(+), 53 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5186224e7c5..1e016e7f668 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -324,6 +324,9 @@ Release 2.6.0 - UNRELEASED YARN-2615. Changed ClientToAMTokenIdentifier/RM(Timeline)DelegationTokenIdentifier to use protobuf as payload. (Junping Du via jianhe) + YARN-2629. Made the distributed shell use the domain-based timeline ACLs. + (zjshen) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index df9f34b56e3..e6ded009b0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.StringReader; +import java.lang.reflect.UndeclaredThrowableException; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -246,6 +248,9 @@ public class ApplicationMaster { // File length needed for local resource private long shellScriptPathLen = 0; + // Timeline domain ID + private String domainId = null; + // Hardcoded path to shell script in launch container's local env private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh"; private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH @@ -465,7 +470,9 @@ public class ApplicationMaster { shellScriptPathLen = Long.valueOf(envs .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); } - + if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { + domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); + } if (!scriptPath.isEmpty() && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { LOG.error("Illegal values in env for shell script path" + ", path=" @@ -515,13 +522,6 @@ public class ApplicationMaster { @SuppressWarnings({ "unchecked" }) public void run() throws YarnException, IOException { LOG.info("Starting ApplicationMaster"); - try { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_START); - } catch (Exception e) { - LOG.error("App Attempt start event could not be published for " - + appAttemptID.toString(), e); - } // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class // are marked as LimitedPrivate @@ -548,6 +548,9 @@ public class ApplicationMaster { UserGroupInformation.createRemoteUser(appSubmitterUserName); appSubmitterUgi.addCredentials(credentials); + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); + AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); @@ -612,13 +615,9 @@ public class ApplicationMaster { amRMClient.addContainerRequest(containerAsk); } numRequestedContainers.set(numTotalContainers); - try { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_END); - } catch (Exception e) { - LOG.error("App Attempt start event could not be published for " - + appAttemptID.toString(), e); - } + + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); } @VisibleForTesting @@ -724,12 +723,8 @@ public class ApplicationMaster { LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); } - try { - publishContainerEndEvent(timelineClient, containerStatus); - } catch (Exception e) { - LOG.error("Container start event could not be published for " - + containerStatus.getContainerId().toString(), e); - } + publishContainerEndEvent( + timelineClient, containerStatus, domainId, appSubmitterUgi); } // ask for more containers if any failed @@ -844,13 +839,9 @@ public class ApplicationMaster { if (container != null) { applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); } - try { - ApplicationMaster.publishContainerStartEvent( - applicationMaster.timelineClient, container); - } catch (Exception e) { - LOG.error("Container start event could not be published for " - + container.getId().toString(), e); - } + ApplicationMaster.publishContainerStartEvent( + applicationMaster.timelineClient, container, + applicationMaster.domainId, applicationMaster.appSubmitterUgi); } @Override @@ -1050,13 +1041,14 @@ public class ApplicationMaster { } } - private static void publishContainerStartEvent(TimelineClient timelineClient, - Container container) throws IOException, YarnException { - TimelineEntity entity = new TimelineEntity(); + private static void publishContainerStartEvent( + final TimelineClient timelineClient, Container container, String domainId, + UserGroupInformation ugi) { + final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); - entity.addPrimaryFilter("user", - UserGroupInformation.getCurrentUser().getShortUserName()); + entity.setDomainId(domainId); + entity.addPrimaryFilter("user", ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_START.toString()); @@ -1064,16 +1056,28 @@ public class ApplicationMaster { event.addEventInfo("Resources", container.getResource().toString()); entity.addEvent(event); - timelineClient.putEntities(entity); + try { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + return timelineClient.putEntities(entity); + } + }); + } catch (Exception e) { + LOG.error("Container start event could not be published for " + + container.getId().toString(), + e instanceof UndeclaredThrowableException ? e.getCause() : e); + } } - private static void publishContainerEndEvent(TimelineClient timelineClient, - ContainerStatus container) throws IOException, YarnException { - TimelineEntity entity = new TimelineEntity(); + private static void publishContainerEndEvent( + final TimelineClient timelineClient, ContainerStatus container, + String domainId, UserGroupInformation ugi) { + final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getContainerId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); - entity.addPrimaryFilter("user", - UserGroupInformation.getCurrentUser().getShortUserName()); + entity.setDomainId(domainId); + entity.addPrimaryFilter("user", ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_END.toString()); @@ -1081,22 +1085,46 @@ public class ApplicationMaster { event.addEventInfo("Exit Status", container.getExitStatus()); entity.addEvent(event); - timelineClient.putEntities(entity); + try { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + return timelineClient.putEntities(entity); + } + }); + } catch (Exception e) { + LOG.error("Container end event could not be published for " + + container.getContainerId().toString(), + e instanceof UndeclaredThrowableException ? e.getCause() : e); + } } private static void publishApplicationAttemptEvent( - TimelineClient timelineClient, String appAttemptId, DSEvent appEvent) - throws IOException, YarnException { - TimelineEntity entity = new TimelineEntity(); + final TimelineClient timelineClient, String appAttemptId, + DSEvent appEvent, String domainId, UserGroupInformation ugi) { + final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(appAttemptId); entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); - entity.addPrimaryFilter("user", - UserGroupInformation.getCurrentUser().getShortUserName()); + entity.setDomainId(domainId); + entity.addPrimaryFilter("user", ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setEventType(appEvent.toString()); event.setTimestamp(System.currentTimeMillis()); entity.addEvent(event); - timelineClient.putEntities(entity); + try { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + return timelineClient.putEntities(entity); + } + }); + } catch (Exception e) { + LOG.error("App Attempt " + + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") + + " event could not be published for " + + appAttemptId.toString(), + e instanceof UndeclaredThrowableException ? e.getCause() : e); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index f3ce64ce074..2067aca4810 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -70,11 +70,14 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; /** * Client for Distributed Shell application submission to YARN. @@ -165,7 +168,19 @@ public class Client { private long attemptFailuresValidityInterval = -1; // Debug flag - boolean debugFlag = false; + boolean debugFlag = false; + + // Timeline domain ID + private String domainId = null; + + // Flag to indicate whether to create the domain of the given ID + private boolean toCreateDomain = false; + + // Timeline domain reader access control + private String viewACLs = null; + + // Timeline domain writer access control + private String modifyACLs = null; // Command line options private Options opts; @@ -256,6 +271,14 @@ public class Client { "If failure count reaches to maxAppAttempts, " + "the application will be failed."); opts.addOption("debug", false, "Dump out debug information"); + opts.addOption("domain", true, "ID of the timeline domain where the " + + "timeline entities will be put"); + opts.addOption("view_acls", true, "Users and groups that allowed to " + + "view the timeline entities in the given domain"); + opts.addOption("modify_acls", true, "Users and groups that allowed to " + + "modify the timeline entities in the given domain"); + opts.addOption("create", false, "Flag to indicate whether to create the " + + "domain specified with -domain."); opts.addOption("help", false, "Print usage"); } @@ -385,6 +408,18 @@ public class Client { log4jPropFile = cliParser.getOptionValue("log_properties", ""); + // Get timeline domain options + if (cliParser.hasOption("domain")) { + domainId = cliParser.getOptionValue("domain"); + toCreateDomain = cliParser.hasOption("create"); + if (cliParser.hasOption("view_acls")) { + viewACLs = cliParser.getOptionValue("view_acls"); + } + if (cliParser.hasOption("modify_acls")) { + modifyACLs = cliParser.getOptionValue("modify_acls"); + } + } + return true; } @@ -431,6 +466,10 @@ public class Client { } } + if (domainId != null && domainId.length() > 0 && toCreateDomain) { + prepareTimelineDomain(); + } + // Get a new application id YarnClientApplication app = yarnClient.createApplication(); GetNewApplicationResponse appResponse = app.getNewApplicationResponse(); @@ -535,6 +574,9 @@ public class Client { env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp)); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); + if (domainId != null && domainId.length() > 0) { + env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId); + } // Add AppMaster.jar location to classpath // At some point we should not be required to add @@ -773,4 +815,35 @@ public class Client { scFileStatus.getLen(), scFileStatus.getModificationTime()); localResources.put(fileDstPath, scRsrc); } + + private void prepareTimelineDomain() { + TimelineClient timelineClient = null; + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + timelineClient.start(); + } else { + LOG.warn("Cannot put the domain " + domainId + + " because the timeline service is not enabled"); + return; + } + try { + //TODO: we need to check and combine the existing timeline domain ACLs, + //but let's do it once we have client java library to query domains. + TimelineDomain domain = new TimelineDomain(); + domain.setId(domainId); + domain.setReaders( + viewACLs != null && viewACLs.length() > 0 ? viewACLs : " "); + domain.setWriters( + modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " "); + timelineClient.putDomain(domain); + LOG.info("Put the timeline domain: " + + TimelineUtils.dumpTimelineRecordtoJSON(domain)); + } catch (Exception e) { + LOG.error("Error when putting the timeline domain", e); + } finally { + timelineClient.stop(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java index 5912f14434b..fbaf2d47fa1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java @@ -44,4 +44,9 @@ public class DSConstants { * Used to validate the local resource. */ public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN"; + + /** + * Environment key name denoting the timeline domain ID. + */ + public static final String DISTRIBUTEDSHELLTIMELINEDOMAIN = "DISTRIBUTEDSHELLTIMELINEDOMAIN"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 6dff94c7684..2414d4d3ea9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -29,10 +29,10 @@ import java.io.PrintWriter; import java.net.InetAddress; import java.net.URL; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,6 +43,7 @@ import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -129,8 +131,16 @@ public class TestDistributedShell { } @Test(timeout=90000) - public void testDSShell() throws Exception { + public void testDSShellWithDomain() throws Exception { + testDSShell(true); + } + @Test(timeout=90000) + public void testDSShellWithoutDomain() throws Exception { + testDSShell(false); + } + + public void testDSShell(boolean haveDomain) throws Exception { String[] args = { "--jar", APPMASTER_JAR, @@ -147,6 +157,20 @@ public class TestDistributedShell { "--container_vcores", "1" }; + if (haveDomain) { + String[] domainArgs = { + "--domain", + "TEST_DOMAIN", + "--view_acls", + "reader_user reader_group", + "--modify_acls", + "writer_user writer_group", + "--create" + }; + List argsList = new ArrayList(Arrays.asList(args)); + argsList.addAll(Arrays.asList(domainArgs)); + args = argsList.toArray(new String[argsList.size()]); + } LOG.info("Initializing DS Client"); final Client client = new Client(new Configuration(yarnCluster.getConfig())); @@ -198,7 +222,15 @@ public class TestDistributedShell { t.join(); LOG.info("Client run completed. Result=" + result); Assert.assertTrue(result.get()); - + + TimelineDomain domain = null; + if (haveDomain) { + domain = yarnCluster.getApplicationHistoryServer() + .getTimelineStore().getDomain("TEST_DOMAIN"); + Assert.assertNotNull(domain); + Assert.assertEquals("reader_user reader_group", domain.getReaders()); + Assert.assertEquals("writer_user writer_group", domain.getWriters()); + } TimelineEntities entitiesAttempts = yarnCluster .getApplicationHistoryServer() .getTimelineStore() @@ -210,6 +242,13 @@ public class TestDistributedShell { .size()); Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType() .toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString()); + if (haveDomain) { + Assert.assertEquals(domain.getId(), + entitiesAttempts.getEntities().get(0).getDomainId()); + } else { + Assert.assertEquals("DEFAULT", + entitiesAttempts.getEntities().get(0).getDomainId()); + } TimelineEntities entities = yarnCluster .getApplicationHistoryServer() .getTimelineStore() @@ -219,6 +258,13 @@ public class TestDistributedShell { Assert.assertEquals(2, entities.getEntities().size()); Assert.assertEquals(entities.getEntities().get(0).getEntityType() .toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString()); + if (haveDomain) { + Assert.assertEquals(domain.getId(), + entities.getEntities().get(0).getDomainId()); + } else { + Assert.assertEquals("DEFAULT", + entities.getEntities().get(0).getDomainId()); + } } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 13ce0742d11..fbddd14ac4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -210,9 +210,6 @@ public class TimelineClientImpl extends TimelineClient { @VisibleForTesting public ClientResponse doPostingObject(Object object, String path) { WebResource webResource = client.resource(resURI); - if (path != null) { - webResource.path(path); - } if (path == null) { return webResource.accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON)