YARN-2629. Made the distributed shell use the domain-based timeline ACLs. Contributed by Zhijie Shen.
(cherry picked from commit 1d4612f5ad
)
This commit is contained in:
parent
971eb31d28
commit
4d170d3db6
|
@ -294,6 +294,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2615. Changed ClientToAMTokenIdentifier/RM(Timeline)DelegationTokenIdentifier
|
YARN-2615. Changed ClientToAMTokenIdentifier/RM(Timeline)DelegationTokenIdentifier
|
||||||
to use protobuf as payload. (Junping Du via jianhe)
|
to use protobuf as payload. (Junping Du via jianhe)
|
||||||
|
|
||||||
|
YARN-2629. Made the distributed shell use the domain-based timeline ACLs.
|
||||||
|
(zjshen)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.StringReader;
|
import java.io.StringReader;
|
||||||
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||||
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
||||||
|
@ -246,6 +248,9 @@ public class ApplicationMaster {
|
||||||
// File length needed for local resource
|
// File length needed for local resource
|
||||||
private long shellScriptPathLen = 0;
|
private long shellScriptPathLen = 0;
|
||||||
|
|
||||||
|
// Timeline domain ID
|
||||||
|
private String domainId = null;
|
||||||
|
|
||||||
// Hardcoded path to shell script in launch container's local env
|
// Hardcoded path to shell script in launch container's local env
|
||||||
private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
|
private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
|
||||||
private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
|
private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
|
||||||
|
@ -465,7 +470,9 @@ public class ApplicationMaster {
|
||||||
shellScriptPathLen = Long.valueOf(envs
|
shellScriptPathLen = Long.valueOf(envs
|
||||||
.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
|
.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
|
||||||
}
|
}
|
||||||
|
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
|
||||||
|
domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
|
||||||
|
}
|
||||||
if (!scriptPath.isEmpty()
|
if (!scriptPath.isEmpty()
|
||||||
&& (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
|
&& (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
|
||||||
LOG.error("Illegal values in env for shell script path" + ", path="
|
LOG.error("Illegal values in env for shell script path" + ", path="
|
||||||
|
@ -515,13 +522,6 @@ public class ApplicationMaster {
|
||||||
@SuppressWarnings({ "unchecked" })
|
@SuppressWarnings({ "unchecked" })
|
||||||
public void run() throws YarnException, IOException {
|
public void run() throws YarnException, IOException {
|
||||||
LOG.info("Starting ApplicationMaster");
|
LOG.info("Starting ApplicationMaster");
|
||||||
try {
|
|
||||||
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
|
||||||
DSEvent.DS_APP_ATTEMPT_START);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("App Attempt start event could not be published for "
|
|
||||||
+ appAttemptID.toString(), e);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
|
// Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
|
||||||
// are marked as LimitedPrivate
|
// are marked as LimitedPrivate
|
||||||
|
@ -548,6 +548,9 @@ public class ApplicationMaster {
|
||||||
UserGroupInformation.createRemoteUser(appSubmitterUserName);
|
UserGroupInformation.createRemoteUser(appSubmitterUserName);
|
||||||
appSubmitterUgi.addCredentials(credentials);
|
appSubmitterUgi.addCredentials(credentials);
|
||||||
|
|
||||||
|
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
||||||
|
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
|
||||||
|
|
||||||
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
|
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
|
||||||
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
|
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
|
||||||
amRMClient.init(conf);
|
amRMClient.init(conf);
|
||||||
|
@ -612,13 +615,9 @@ public class ApplicationMaster {
|
||||||
amRMClient.addContainerRequest(containerAsk);
|
amRMClient.addContainerRequest(containerAsk);
|
||||||
}
|
}
|
||||||
numRequestedContainers.set(numTotalContainers);
|
numRequestedContainers.set(numTotalContainers);
|
||||||
try {
|
|
||||||
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
||||||
DSEvent.DS_APP_ATTEMPT_END);
|
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("App Attempt start event could not be published for "
|
|
||||||
+ appAttemptID.toString(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -724,12 +723,8 @@ public class ApplicationMaster {
|
||||||
LOG.info("Container completed successfully." + ", containerId="
|
LOG.info("Container completed successfully." + ", containerId="
|
||||||
+ containerStatus.getContainerId());
|
+ containerStatus.getContainerId());
|
||||||
}
|
}
|
||||||
try {
|
publishContainerEndEvent(
|
||||||
publishContainerEndEvent(timelineClient, containerStatus);
|
timelineClient, containerStatus, domainId, appSubmitterUgi);
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Container start event could not be published for "
|
|
||||||
+ containerStatus.getContainerId().toString(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ask for more containers if any failed
|
// ask for more containers if any failed
|
||||||
|
@ -844,13 +839,9 @@ public class ApplicationMaster {
|
||||||
if (container != null) {
|
if (container != null) {
|
||||||
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
|
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
|
||||||
}
|
}
|
||||||
try {
|
|
||||||
ApplicationMaster.publishContainerStartEvent(
|
ApplicationMaster.publishContainerStartEvent(
|
||||||
applicationMaster.timelineClient, container);
|
applicationMaster.timelineClient, container,
|
||||||
} catch (Exception e) {
|
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
|
||||||
LOG.error("Container start event could not be published for "
|
|
||||||
+ container.getId().toString(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1050,13 +1041,14 @@ public class ApplicationMaster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void publishContainerStartEvent(TimelineClient timelineClient,
|
private static void publishContainerStartEvent(
|
||||||
Container container) throws IOException, YarnException {
|
final TimelineClient timelineClient, Container container, String domainId,
|
||||||
TimelineEntity entity = new TimelineEntity();
|
UserGroupInformation ugi) {
|
||||||
|
final TimelineEntity entity = new TimelineEntity();
|
||||||
entity.setEntityId(container.getId().toString());
|
entity.setEntityId(container.getId().toString());
|
||||||
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
|
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
|
||||||
entity.addPrimaryFilter("user",
|
entity.setDomainId(domainId);
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName());
|
entity.addPrimaryFilter("user", ugi.getShortUserName());
|
||||||
TimelineEvent event = new TimelineEvent();
|
TimelineEvent event = new TimelineEvent();
|
||||||
event.setTimestamp(System.currentTimeMillis());
|
event.setTimestamp(System.currentTimeMillis());
|
||||||
event.setEventType(DSEvent.DS_CONTAINER_START.toString());
|
event.setEventType(DSEvent.DS_CONTAINER_START.toString());
|
||||||
|
@ -1064,16 +1056,28 @@ public class ApplicationMaster {
|
||||||
event.addEventInfo("Resources", container.getResource().toString());
|
event.addEventInfo("Resources", container.getResource().toString());
|
||||||
entity.addEvent(event);
|
entity.addEvent(event);
|
||||||
|
|
||||||
timelineClient.putEntities(entity);
|
try {
|
||||||
|
ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
|
||||||
|
@Override
|
||||||
|
public TimelinePutResponse run() throws Exception {
|
||||||
|
return timelineClient.putEntities(entity);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Container start event could not be published for "
|
||||||
|
+ container.getId().toString(),
|
||||||
|
e instanceof UndeclaredThrowableException ? e.getCause() : e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void publishContainerEndEvent(TimelineClient timelineClient,
|
private static void publishContainerEndEvent(
|
||||||
ContainerStatus container) throws IOException, YarnException {
|
final TimelineClient timelineClient, ContainerStatus container,
|
||||||
TimelineEntity entity = new TimelineEntity();
|
String domainId, UserGroupInformation ugi) {
|
||||||
|
final TimelineEntity entity = new TimelineEntity();
|
||||||
entity.setEntityId(container.getContainerId().toString());
|
entity.setEntityId(container.getContainerId().toString());
|
||||||
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
|
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
|
||||||
entity.addPrimaryFilter("user",
|
entity.setDomainId(domainId);
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName());
|
entity.addPrimaryFilter("user", ugi.getShortUserName());
|
||||||
TimelineEvent event = new TimelineEvent();
|
TimelineEvent event = new TimelineEvent();
|
||||||
event.setTimestamp(System.currentTimeMillis());
|
event.setTimestamp(System.currentTimeMillis());
|
||||||
event.setEventType(DSEvent.DS_CONTAINER_END.toString());
|
event.setEventType(DSEvent.DS_CONTAINER_END.toString());
|
||||||
|
@ -1081,22 +1085,46 @@ public class ApplicationMaster {
|
||||||
event.addEventInfo("Exit Status", container.getExitStatus());
|
event.addEventInfo("Exit Status", container.getExitStatus());
|
||||||
entity.addEvent(event);
|
entity.addEvent(event);
|
||||||
|
|
||||||
timelineClient.putEntities(entity);
|
try {
|
||||||
|
ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
|
||||||
|
@Override
|
||||||
|
public TimelinePutResponse run() throws Exception {
|
||||||
|
return timelineClient.putEntities(entity);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Container end event could not be published for "
|
||||||
|
+ container.getContainerId().toString(),
|
||||||
|
e instanceof UndeclaredThrowableException ? e.getCause() : e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void publishApplicationAttemptEvent(
|
private static void publishApplicationAttemptEvent(
|
||||||
TimelineClient timelineClient, String appAttemptId, DSEvent appEvent)
|
final TimelineClient timelineClient, String appAttemptId,
|
||||||
throws IOException, YarnException {
|
DSEvent appEvent, String domainId, UserGroupInformation ugi) {
|
||||||
TimelineEntity entity = new TimelineEntity();
|
final TimelineEntity entity = new TimelineEntity();
|
||||||
entity.setEntityId(appAttemptId);
|
entity.setEntityId(appAttemptId);
|
||||||
entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
|
entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
|
||||||
entity.addPrimaryFilter("user",
|
entity.setDomainId(domainId);
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName());
|
entity.addPrimaryFilter("user", ugi.getShortUserName());
|
||||||
TimelineEvent event = new TimelineEvent();
|
TimelineEvent event = new TimelineEvent();
|
||||||
event.setEventType(appEvent.toString());
|
event.setEventType(appEvent.toString());
|
||||||
event.setTimestamp(System.currentTimeMillis());
|
event.setTimestamp(System.currentTimeMillis());
|
||||||
entity.addEvent(event);
|
entity.addEvent(event);
|
||||||
|
|
||||||
timelineClient.putEntities(entity);
|
try {
|
||||||
|
ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
|
||||||
|
@Override
|
||||||
|
public TimelinePutResponse run() throws Exception {
|
||||||
|
return timelineClient.putEntities(entity);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("App Attempt "
|
||||||
|
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
|
||||||
|
+ " event could not be published for "
|
||||||
|
+ appAttemptId.toString(),
|
||||||
|
e instanceof UndeclaredThrowableException ? e.getCause() : e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,11 +70,14 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
|
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
|
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Client for Distributed Shell application submission to YARN.
|
* Client for Distributed Shell application submission to YARN.
|
||||||
|
@ -167,6 +170,18 @@ public class Client {
|
||||||
// Debug flag
|
// Debug flag
|
||||||
boolean debugFlag = false;
|
boolean debugFlag = false;
|
||||||
|
|
||||||
|
// Timeline domain ID
|
||||||
|
private String domainId = null;
|
||||||
|
|
||||||
|
// Flag to indicate whether to create the domain of the given ID
|
||||||
|
private boolean toCreateDomain = false;
|
||||||
|
|
||||||
|
// Timeline domain reader access control
|
||||||
|
private String viewACLs = null;
|
||||||
|
|
||||||
|
// Timeline domain writer access control
|
||||||
|
private String modifyACLs = null;
|
||||||
|
|
||||||
// Command line options
|
// Command line options
|
||||||
private Options opts;
|
private Options opts;
|
||||||
|
|
||||||
|
@ -256,6 +271,14 @@ public class Client {
|
||||||
"If failure count reaches to maxAppAttempts, " +
|
"If failure count reaches to maxAppAttempts, " +
|
||||||
"the application will be failed.");
|
"the application will be failed.");
|
||||||
opts.addOption("debug", false, "Dump out debug information");
|
opts.addOption("debug", false, "Dump out debug information");
|
||||||
|
opts.addOption("domain", true, "ID of the timeline domain where the "
|
||||||
|
+ "timeline entities will be put");
|
||||||
|
opts.addOption("view_acls", true, "Users and groups that allowed to "
|
||||||
|
+ "view the timeline entities in the given domain");
|
||||||
|
opts.addOption("modify_acls", true, "Users and groups that allowed to "
|
||||||
|
+ "modify the timeline entities in the given domain");
|
||||||
|
opts.addOption("create", false, "Flag to indicate whether to create the "
|
||||||
|
+ "domain specified with -domain.");
|
||||||
opts.addOption("help", false, "Print usage");
|
opts.addOption("help", false, "Print usage");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -385,6 +408,18 @@ public class Client {
|
||||||
|
|
||||||
log4jPropFile = cliParser.getOptionValue("log_properties", "");
|
log4jPropFile = cliParser.getOptionValue("log_properties", "");
|
||||||
|
|
||||||
|
// Get timeline domain options
|
||||||
|
if (cliParser.hasOption("domain")) {
|
||||||
|
domainId = cliParser.getOptionValue("domain");
|
||||||
|
toCreateDomain = cliParser.hasOption("create");
|
||||||
|
if (cliParser.hasOption("view_acls")) {
|
||||||
|
viewACLs = cliParser.getOptionValue("view_acls");
|
||||||
|
}
|
||||||
|
if (cliParser.hasOption("modify_acls")) {
|
||||||
|
modifyACLs = cliParser.getOptionValue("modify_acls");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -431,6 +466,10 @@ public class Client {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (domainId != null && domainId.length() > 0 && toCreateDomain) {
|
||||||
|
prepareTimelineDomain();
|
||||||
|
}
|
||||||
|
|
||||||
// Get a new application id
|
// Get a new application id
|
||||||
YarnClientApplication app = yarnClient.createApplication();
|
YarnClientApplication app = yarnClient.createApplication();
|
||||||
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
|
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
|
||||||
|
@ -535,6 +574,9 @@ public class Client {
|
||||||
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
|
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
|
||||||
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
|
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
|
||||||
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
|
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
|
||||||
|
if (domainId != null && domainId.length() > 0) {
|
||||||
|
env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
|
||||||
|
}
|
||||||
|
|
||||||
// Add AppMaster.jar location to classpath
|
// Add AppMaster.jar location to classpath
|
||||||
// At some point we should not be required to add
|
// At some point we should not be required to add
|
||||||
|
@ -773,4 +815,35 @@ public class Client {
|
||||||
scFileStatus.getLen(), scFileStatus.getModificationTime());
|
scFileStatus.getLen(), scFileStatus.getModificationTime());
|
||||||
localResources.put(fileDstPath, scRsrc);
|
localResources.put(fileDstPath, scRsrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void prepareTimelineDomain() {
|
||||||
|
TimelineClient timelineClient = null;
|
||||||
|
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
|
||||||
|
timelineClient = TimelineClient.createTimelineClient();
|
||||||
|
timelineClient.init(conf);
|
||||||
|
timelineClient.start();
|
||||||
|
} else {
|
||||||
|
LOG.warn("Cannot put the domain " + domainId +
|
||||||
|
" because the timeline service is not enabled");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
//TODO: we need to check and combine the existing timeline domain ACLs,
|
||||||
|
//but let's do it once we have client java library to query domains.
|
||||||
|
TimelineDomain domain = new TimelineDomain();
|
||||||
|
domain.setId(domainId);
|
||||||
|
domain.setReaders(
|
||||||
|
viewACLs != null && viewACLs.length() > 0 ? viewACLs : " ");
|
||||||
|
domain.setWriters(
|
||||||
|
modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " ");
|
||||||
|
timelineClient.putDomain(domain);
|
||||||
|
LOG.info("Put the timeline domain: " +
|
||||||
|
TimelineUtils.dumpTimelineRecordtoJSON(domain));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error when putting the timeline domain", e);
|
||||||
|
} finally {
|
||||||
|
timelineClient.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,4 +44,9 @@ public class DSConstants {
|
||||||
* Used to validate the local resource.
|
* Used to validate the local resource.
|
||||||
*/
|
*/
|
||||||
public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN";
|
public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Environment key name denoting the timeline domain ID.
|
||||||
|
*/
|
||||||
|
public static final String DISTRIBUTEDSHELLTIMELINEDOMAIN = "DISTRIBUTEDSHELLTIMELINEDOMAIN";
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,10 +29,10 @@ import java.io.PrintWriter;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -43,6 +43,7 @@ import org.apache.hadoop.util.JarFinder;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -129,8 +131,16 @@ public class TestDistributedShell {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=90000)
|
@Test(timeout=90000)
|
||||||
public void testDSShell() throws Exception {
|
public void testDSShellWithDomain() throws Exception {
|
||||||
|
testDSShell(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=90000)
|
||||||
|
public void testDSShellWithoutDomain() throws Exception {
|
||||||
|
testDSShell(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDSShell(boolean haveDomain) throws Exception {
|
||||||
String[] args = {
|
String[] args = {
|
||||||
"--jar",
|
"--jar",
|
||||||
APPMASTER_JAR,
|
APPMASTER_JAR,
|
||||||
|
@ -147,6 +157,20 @@ public class TestDistributedShell {
|
||||||
"--container_vcores",
|
"--container_vcores",
|
||||||
"1"
|
"1"
|
||||||
};
|
};
|
||||||
|
if (haveDomain) {
|
||||||
|
String[] domainArgs = {
|
||||||
|
"--domain",
|
||||||
|
"TEST_DOMAIN",
|
||||||
|
"--view_acls",
|
||||||
|
"reader_user reader_group",
|
||||||
|
"--modify_acls",
|
||||||
|
"writer_user writer_group",
|
||||||
|
"--create"
|
||||||
|
};
|
||||||
|
List<String> argsList = new ArrayList<String>(Arrays.asList(args));
|
||||||
|
argsList.addAll(Arrays.asList(domainArgs));
|
||||||
|
args = argsList.toArray(new String[argsList.size()]);
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Initializing DS Client");
|
LOG.info("Initializing DS Client");
|
||||||
final Client client = new Client(new Configuration(yarnCluster.getConfig()));
|
final Client client = new Client(new Configuration(yarnCluster.getConfig()));
|
||||||
|
@ -199,6 +223,14 @@ public class TestDistributedShell {
|
||||||
LOG.info("Client run completed. Result=" + result);
|
LOG.info("Client run completed. Result=" + result);
|
||||||
Assert.assertTrue(result.get());
|
Assert.assertTrue(result.get());
|
||||||
|
|
||||||
|
TimelineDomain domain = null;
|
||||||
|
if (haveDomain) {
|
||||||
|
domain = yarnCluster.getApplicationHistoryServer()
|
||||||
|
.getTimelineStore().getDomain("TEST_DOMAIN");
|
||||||
|
Assert.assertNotNull(domain);
|
||||||
|
Assert.assertEquals("reader_user reader_group", domain.getReaders());
|
||||||
|
Assert.assertEquals("writer_user writer_group", domain.getWriters());
|
||||||
|
}
|
||||||
TimelineEntities entitiesAttempts = yarnCluster
|
TimelineEntities entitiesAttempts = yarnCluster
|
||||||
.getApplicationHistoryServer()
|
.getApplicationHistoryServer()
|
||||||
.getTimelineStore()
|
.getTimelineStore()
|
||||||
|
@ -210,6 +242,13 @@ public class TestDistributedShell {
|
||||||
.size());
|
.size());
|
||||||
Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType()
|
Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType()
|
||||||
.toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
|
.toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
|
||||||
|
if (haveDomain) {
|
||||||
|
Assert.assertEquals(domain.getId(),
|
||||||
|
entitiesAttempts.getEntities().get(0).getDomainId());
|
||||||
|
} else {
|
||||||
|
Assert.assertEquals("DEFAULT",
|
||||||
|
entitiesAttempts.getEntities().get(0).getDomainId());
|
||||||
|
}
|
||||||
TimelineEntities entities = yarnCluster
|
TimelineEntities entities = yarnCluster
|
||||||
.getApplicationHistoryServer()
|
.getApplicationHistoryServer()
|
||||||
.getTimelineStore()
|
.getTimelineStore()
|
||||||
|
@ -219,6 +258,13 @@ public class TestDistributedShell {
|
||||||
Assert.assertEquals(2, entities.getEntities().size());
|
Assert.assertEquals(2, entities.getEntities().size());
|
||||||
Assert.assertEquals(entities.getEntities().get(0).getEntityType()
|
Assert.assertEquals(entities.getEntities().get(0).getEntityType()
|
||||||
.toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString());
|
.toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString());
|
||||||
|
if (haveDomain) {
|
||||||
|
Assert.assertEquals(domain.getId(),
|
||||||
|
entities.getEntities().get(0).getDomainId());
|
||||||
|
} else {
|
||||||
|
Assert.assertEquals("DEFAULT",
|
||||||
|
entities.getEntities().get(0).getDomainId());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -210,9 +210,6 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public ClientResponse doPostingObject(Object object, String path) {
|
public ClientResponse doPostingObject(Object object, String path) {
|
||||||
WebResource webResource = client.resource(resURI);
|
WebResource webResource = client.resource(resURI);
|
||||||
if (path != null) {
|
|
||||||
webResource.path(path);
|
|
||||||
}
|
|
||||||
if (path == null) {
|
if (path == null) {
|
||||||
return webResource.accept(MediaType.APPLICATION_JSON)
|
return webResource.accept(MediaType.APPLICATION_JSON)
|
||||||
.type(MediaType.APPLICATION_JSON)
|
.type(MediaType.APPLICATION_JSON)
|
||||||
|
|
Loading…
Reference in New Issue