YARN-610. ClientToken is no longer set in the environment of the Containers. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1493968 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b02dc5b464
commit
5d1b453b85
|
@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
|
||||||
|
|
||||||
|
@ -58,4 +59,6 @@ public interface AppContext {
|
||||||
ClusterInfo getClusterInfo();
|
ClusterInfo getClusterInfo();
|
||||||
|
|
||||||
Set<String> getBlacklistedNodes();
|
Set<String> getBlacklistedNodes();
|
||||||
|
|
||||||
|
ClientToAMTokenSecretManager getClientToAMTokenSecretManager();
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,6 +128,7 @@ import org.apache.hadoop.yarn.event.Event;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.hadoop.yarn.util.SystemClock;
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
|
@ -884,9 +885,12 @@ public class MRAppMaster extends CompositeService {
|
||||||
private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
|
private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final ClusterInfo clusterInfo = new ClusterInfo();
|
private final ClusterInfo clusterInfo = new ClusterInfo();
|
||||||
|
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
|
||||||
|
|
||||||
public RunningAppContext(Configuration config) {
|
public RunningAppContext(Configuration config) {
|
||||||
this.conf = config;
|
this.conf = config;
|
||||||
|
this.clientToAMTokenSecretManager =
|
||||||
|
new ClientToAMTokenSecretManager(appAttemptID, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -943,6 +947,11 @@ public class MRAppMaster extends CompositeService {
|
||||||
public Set<String> getBlacklistedNodes() {
|
public Set<String> getBlacklistedNodes() {
|
||||||
return ((RMContainerRequestor) containerAllocator).getBlacklistedNodes();
|
return ((RMContainerRequestor) containerAllocator).getBlacklistedNodes();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
|
||||||
|
return clientToAMTokenSecretManager;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.security.SecurityInfo;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.security.token.TokenInfo;
|
import org.apache.hadoop.security.token.TokenInfo;
|
||||||
import org.apache.hadoop.security.token.TokenSelector;
|
import org.apache.hadoop.security.token.TokenSelector;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientTokenSelector;
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSelector;
|
||||||
|
|
||||||
public class MRClientSecurityInfo extends SecurityInfo {
|
public class MRClientSecurityInfo extends SecurityInfo {
|
||||||
|
|
||||||
|
@ -51,7 +51,7 @@ public class MRClientSecurityInfo extends SecurityInfo {
|
||||||
@Override
|
@Override
|
||||||
public Class<? extends TokenSelector<? extends TokenIdentifier>>
|
public Class<? extends TokenSelector<? extends TokenIdentifier>>
|
||||||
value() {
|
value() {
|
||||||
return ClientTokenSelector.class;
|
return ClientToAMTokenSelector.class;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.apache.commons.codec.binary.Base64;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -79,14 +78,11 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
|
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
|
import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
|
||||||
import org.apache.hadoop.yarn.webapp.WebApp;
|
import org.apache.hadoop.yarn.webapp.WebApp;
|
||||||
import org.apache.hadoop.yarn.webapp.WebApps;
|
import org.apache.hadoop.yarn.webapp.WebApps;
|
||||||
|
|
||||||
|
@ -117,19 +113,9 @@ public class MRClientService extends AbstractService
|
||||||
YarnRPC rpc = YarnRPC.create(conf);
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
InetSocketAddress address = new InetSocketAddress(0);
|
InetSocketAddress address = new InetSocketAddress(0);
|
||||||
|
|
||||||
ClientToAMTokenSecretManager secretManager = null;
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
|
||||||
String secretKeyStr =
|
|
||||||
System
|
|
||||||
.getenv(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME);
|
|
||||||
byte[] bytes = Base64.decodeBase64(secretKeyStr);
|
|
||||||
secretManager =
|
|
||||||
new ClientToAMTokenSecretManager(
|
|
||||||
this.appContext.getApplicationAttemptId(), bytes);
|
|
||||||
}
|
|
||||||
server =
|
server =
|
||||||
rpc.getServer(MRClientProtocol.class, protocolHandler, address,
|
rpc.getServer(MRClientProtocol.class, protocolHandler, address,
|
||||||
conf, secretManager,
|
conf, appContext.getClientToAMTokenSecretManager(),
|
||||||
conf.getInt(MRJobConfig.MR_AM_JOB_CLIENT_THREAD_COUNT,
|
conf.getInt(MRJobConfig.MR_AM_JOB_CLIENT_THREAD_COUNT,
|
||||||
MRJobConfig.DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT),
|
MRJobConfig.DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT),
|
||||||
MRJobConfig.MR_AM_JOB_CLIENT_PORT_RANGE);
|
MRJobConfig.MR_AM_JOB_CLIENT_PORT_RANGE);
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
@ -154,6 +155,9 @@ public abstract class RMCommunicator extends AbstractService
|
||||||
maxContainerCapability = response.getMaximumResourceCapability();
|
maxContainerCapability = response.getMaximumResourceCapability();
|
||||||
this.context.getClusterInfo().setMaxContainerCapability(
|
this.context.getClusterInfo().setMaxContainerCapability(
|
||||||
maxContainerCapability);
|
maxContainerCapability);
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
setClientToAMToken(response.getClientToAMTokenMasterKey());
|
||||||
|
}
|
||||||
this.applicationACLs = response.getApplicationACLs();
|
this.applicationACLs = response.getApplicationACLs();
|
||||||
LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
|
LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
|
||||||
} catch (Exception are) {
|
} catch (Exception are) {
|
||||||
|
@ -162,6 +166,11 @@ public abstract class RMCommunicator extends AbstractService
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setClientToAMToken(ByteBuffer clientToAMTokenMasterKey) {
|
||||||
|
byte[] key = clientToAMTokenMasterKey.array();
|
||||||
|
context.getClientToAMTokenSecretManager().setMasterKey(key);
|
||||||
|
}
|
||||||
|
|
||||||
protected void unregister() {
|
protected void unregister() {
|
||||||
try {
|
try {
|
||||||
FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
|
FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
||||||
|
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
@ -125,4 +126,8 @@ public class MockAppContext implements AppContext {
|
||||||
this.blacklistedNodes = blacklistedNodes;
|
this.blacklistedNodes = blacklistedNodes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
|
||||||
|
// Not implemented
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.SystemClock;
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -855,5 +856,10 @@ public class TestRuntimeEstimators {
|
||||||
public Set<String> getBlacklistedNodes() {
|
public Set<String> getBlacklistedNodes() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
|
||||||
|
// Not Implemented
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
@ -306,6 +307,12 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
||||||
// TODO AppContext - Not Required
|
// TODO AppContext - Not Required
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getBlacklistedNodes() {
|
public Set<String> getBlacklistedNodes() {
|
||||||
|
// Not Implemented
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
|
||||||
|
// Not implemented.
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||||
|
|
||||||
public class ClientServiceDelegate {
|
public class ClientServiceDelegate {
|
||||||
|
@ -180,9 +180,10 @@ public class ClientServiceDelegate {
|
||||||
serviceAddr = NetUtils.createSocketAddrForHost(
|
serviceAddr = NetUtils.createSocketAddrForHost(
|
||||||
application.getHost(), application.getRpcPort());
|
application.getHost(), application.getRpcPort());
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
org.apache.hadoop.yarn.api.records.Token clientToken = application.getClientToken();
|
org.apache.hadoop.yarn.api.records.Token clientToAMToken =
|
||||||
Token<ClientTokenIdentifier> token =
|
application.getClientToAMToken();
|
||||||
ProtoUtils.convertFromProtoFormat(clientToken, serviceAddr);
|
Token<ClientToAMTokenIdentifier> token =
|
||||||
|
ProtoUtils.convertFromProtoFormat(clientToAMToken, serviceAddr);
|
||||||
newUgi.addToken(token);
|
newUgi.addToken(token);
|
||||||
}
|
}
|
||||||
LOG.debug("Connecting to " + serviceAddr);
|
LOG.debug("Connecting to " + serviceAddr);
|
||||||
|
|
|
@ -183,6 +183,9 @@ Release 2.1.0-beta - UNRELEASED
|
||||||
YARN-822. Renamed ApplicationToken to be AMRMToken, and similarly the
|
YARN-822. Renamed ApplicationToken to be AMRMToken, and similarly the
|
||||||
corresponding TokenSelector and SecretManager. (Omkar Vinit Joshi via vinodkv)
|
corresponding TokenSelector and SecretManager. (Omkar Vinit Joshi via vinodkv)
|
||||||
|
|
||||||
|
YARN-610. ClientToken is no longer set in the environment of the Containers.
|
||||||
|
(Omkar Vinit Joshi via vinodkv)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
YARN-482. FS: Extend SchedulingMode to intermediate queues.
|
YARN-482. FS: Extend SchedulingMode to intermediate queues.
|
||||||
|
|
|
@ -29,10 +29,6 @@ import org.apache.hadoop.util.Shell;
|
||||||
*/
|
*/
|
||||||
public interface ApplicationConstants {
|
public interface ApplicationConstants {
|
||||||
|
|
||||||
// TODO: They say tokens via env isn't good.
|
|
||||||
public static final String APPLICATION_CLIENT_SECRET_ENV_NAME =
|
|
||||||
"AppClientSecretEnv";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The environment variable for APP_SUBMIT_TIME. Set in AppMaster environment
|
* The environment variable for APP_SUBMIT_TIME. Set in AppMaster environment
|
||||||
* only
|
* only
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api.protocolrecords;
|
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
@ -84,4 +85,18 @@ public abstract class RegisterApplicationMasterResponse {
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract void setApplicationACLs(Map<ApplicationAccessType, String> acls);
|
public abstract void setApplicationACLs(Map<ApplicationAccessType, String> acls);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set ClientToAMToken master key.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Stable
|
||||||
|
public abstract void setClientToAMTokenMasterKey(ByteBuffer key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get ClientToAMToken master key.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Stable
|
||||||
|
public abstract ByteBuffer getClientToAMTokenMasterKey();
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -34,6 +35,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterR
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
|
||||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
|
||||||
|
|
||||||
public class RegisterApplicationMasterResponsePBImpl extends
|
public class RegisterApplicationMasterResponsePBImpl extends
|
||||||
RegisterApplicationMasterResponse {
|
RegisterApplicationMasterResponse {
|
||||||
|
@ -201,6 +204,22 @@ public class RegisterApplicationMasterResponsePBImpl extends
|
||||||
this.applicationACLS.putAll(appACLs);
|
this.applicationACLS.putAll(appACLs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setClientToAMTokenMasterKey(ByteBuffer key) {
|
||||||
|
if (key == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.setClientToAmTokenMasterKey(ByteString.copyFrom(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer getClientToAMTokenMasterKey() {
|
||||||
|
ByteBuffer key =
|
||||||
|
ByteBuffer.wrap(builder.getClientToAmTokenMasterKey().toByteArray());
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
private Resource convertFromProtoFormat(ResourceProto resource) {
|
private Resource convertFromProtoFormat(ResourceProto resource) {
|
||||||
return new ResourcePBImpl(resource);
|
return new ResourcePBImpl(resource);
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,7 @@ public abstract class ApplicationReport {
|
||||||
@Stable
|
@Stable
|
||||||
public static ApplicationReport newInstance(ApplicationId applicationId,
|
public static ApplicationReport newInstance(ApplicationId applicationId,
|
||||||
ApplicationAttemptId applicationAttemptId, String user, String queue,
|
ApplicationAttemptId applicationAttemptId, String user, String queue,
|
||||||
String name, String host, int rpcPort, Token clientToken,
|
String name, String host, int rpcPort, Token clientToAMToken,
|
||||||
YarnApplicationState state, String diagnostics, String url,
|
YarnApplicationState state, String diagnostics, String url,
|
||||||
long startTime, long finishTime, FinalApplicationStatus finalStatus,
|
long startTime, long finishTime, FinalApplicationStatus finalStatus,
|
||||||
ApplicationResourceUsageReport appResources, String origTrackingUrl,
|
ApplicationResourceUsageReport appResources, String origTrackingUrl,
|
||||||
|
@ -67,7 +67,7 @@ public abstract class ApplicationReport {
|
||||||
report.setName(name);
|
report.setName(name);
|
||||||
report.setHost(host);
|
report.setHost(host);
|
||||||
report.setRpcPort(rpcPort);
|
report.setRpcPort(rpcPort);
|
||||||
report.setClientToken(clientToken);
|
report.setClientToAMToken(clientToAMToken);
|
||||||
report.setYarnApplicationState(state);
|
report.setYarnApplicationState(state);
|
||||||
report.setDiagnostics(diagnostics);
|
report.setDiagnostics(diagnostics);
|
||||||
report.setTrackingUrl(url);
|
report.setTrackingUrl(url);
|
||||||
|
@ -172,13 +172,13 @@ public abstract class ApplicationReport {
|
||||||
* Get the <em>client token</em> for communicating with the
|
* Get the <em>client token</em> for communicating with the
|
||||||
* <code>ApplicationMaster</code>.
|
* <code>ApplicationMaster</code>.
|
||||||
* <p>
|
* <p>
|
||||||
* <code>ClientToken</code> is the security token used by the AMs to verify
|
* <em>ClientToAMToken</em> is the security token used by the AMs to verify
|
||||||
* authenticity of any <code>client</code>.
|
* authenticity of any <code>client</code>.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* The <code>ResourceManager</code>, provides a secure token (via
|
* The <code>ResourceManager</code>, provides a secure token (via
|
||||||
* {@link ApplicationReport#getClientToken()}) which is verified by the
|
* {@link ApplicationReport#getClientToAMToken()}) which is verified by the
|
||||||
* ApplicationMaster when the client directly talks to an AM.
|
* ApplicationMaster when the client directly talks to an AM.
|
||||||
* </p>
|
* </p>
|
||||||
* @return <em>client token</em> for communicating with the
|
* @return <em>client token</em> for communicating with the
|
||||||
|
@ -186,11 +186,11 @@ public abstract class ApplicationReport {
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
public abstract Token getClientToken();
|
public abstract Token getClientToAMToken();
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract void setClientToken(Token clientToken);
|
public abstract void setClientToAMToken(Token clientToAMToken);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the <code>YarnApplicationState</code> of the application.
|
* Get the <code>YarnApplicationState</code> of the application.
|
||||||
|
|
|
@ -42,7 +42,7 @@ public class ApplicationReportPBImpl extends ApplicationReport {
|
||||||
|
|
||||||
private ApplicationId applicationId;
|
private ApplicationId applicationId;
|
||||||
private ApplicationAttemptId currentApplicationAttemptId;
|
private ApplicationAttemptId currentApplicationAttemptId;
|
||||||
private Token clientToken = null;
|
private Token clientToAMToken = null;
|
||||||
|
|
||||||
public ApplicationReportPBImpl() {
|
public ApplicationReportPBImpl() {
|
||||||
builder = ApplicationReportProto.newBuilder();
|
builder = ApplicationReportProto.newBuilder();
|
||||||
|
@ -160,16 +160,16 @@ public class ApplicationReportPBImpl extends ApplicationReport {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Token getClientToken() {
|
public Token getClientToAMToken() {
|
||||||
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
|
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
if (this.clientToken != null) {
|
if (this.clientToAMToken != null) {
|
||||||
return this.clientToken;
|
return this.clientToAMToken;
|
||||||
}
|
}
|
||||||
if (!p.hasClientToken()) {
|
if (!p.hasClientToAmToken()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
this.clientToken = convertFromProtoFormat(p.getClientToken());
|
this.clientToAMToken = convertFromProtoFormat(p.getClientToAmToken());
|
||||||
return this.clientToken;
|
return this.clientToAMToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -309,11 +309,11 @@ public class ApplicationReportPBImpl extends ApplicationReport {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setClientToken(Token clientToken) {
|
public void setClientToAMToken(Token clientToAMToken) {
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
if (clientToken == null)
|
if (clientToAMToken == null)
|
||||||
builder.clearClientToken();
|
builder.clearClientToAmToken();
|
||||||
this.clientToken = clientToken;
|
this.clientToAMToken = clientToAMToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -412,10 +412,10 @@ public class ApplicationReportPBImpl extends ApplicationReport {
|
||||||
builder.getCurrentApplicationAttemptId())) {
|
builder.getCurrentApplicationAttemptId())) {
|
||||||
builder.setCurrentApplicationAttemptId(convertToProtoFormat(this.currentApplicationAttemptId));
|
builder.setCurrentApplicationAttemptId(convertToProtoFormat(this.currentApplicationAttemptId));
|
||||||
}
|
}
|
||||||
if (this.clientToken != null
|
if (this.clientToAMToken != null
|
||||||
&& !((TokenPBImpl) this.clientToken).getProto().equals(
|
&& !((TokenPBImpl) this.clientToAMToken).getProto().equals(
|
||||||
builder.getClientToken())) {
|
builder.getClientToAmToken())) {
|
||||||
builder.setClientToken(convertToProtoFormat(this.clientToken));
|
builder.setClientToAmToken(convertToProtoFormat(this.clientToAMToken));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -131,7 +131,7 @@ message ApplicationReportProto {
|
||||||
optional string name = 4;
|
optional string name = 4;
|
||||||
optional string host = 5;
|
optional string host = 5;
|
||||||
optional int32 rpc_port = 6;
|
optional int32 rpc_port = 6;
|
||||||
optional hadoop.common.TokenProto client_token = 7;
|
optional hadoop.common.TokenProto client_to_am_token = 7;
|
||||||
optional YarnApplicationStateProto yarn_application_state = 8;
|
optional YarnApplicationStateProto yarn_application_state = 8;
|
||||||
optional string trackingUrl = 9;
|
optional string trackingUrl = 9;
|
||||||
optional string diagnostics = 10 [default = "N/A"];
|
optional string diagnostics = 10 [default = "N/A"];
|
||||||
|
|
|
@ -36,7 +36,8 @@ message RegisterApplicationMasterRequestProto {
|
||||||
|
|
||||||
message RegisterApplicationMasterResponseProto {
|
message RegisterApplicationMasterResponseProto {
|
||||||
optional ResourceProto maximumCapability = 1;
|
optional ResourceProto maximumCapability = 1;
|
||||||
repeated ApplicationACLMapProto application_ACLs = 2;
|
optional bytes client_to_am_token_master_key = 2;
|
||||||
|
repeated ApplicationACLMapProto application_ACLs = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message FinishApplicationMasterRequestProto {
|
message FinishApplicationMasterRequestProto {
|
||||||
|
|
|
@ -601,7 +601,7 @@ public class Client {
|
||||||
|
|
||||||
LOG.info("Got application report from ASM for"
|
LOG.info("Got application report from ASM for"
|
||||||
+ ", appId=" + appId.getId()
|
+ ", appId=" + appId.getId()
|
||||||
+ ", clientToken=" + report.getClientToken()
|
+ ", clientToAMToken=" + report.getClientToAMToken()
|
||||||
+ ", appDiagnostics=" + report.getDiagnostics()
|
+ ", appDiagnostics=" + report.getDiagnostics()
|
||||||
+ ", appMasterHost=" + report.getHost()
|
+ ", appMasterHost=" + report.getHost()
|
||||||
+ ", appQueue=" + report.getQueue()
|
+ ", appQueue=" + report.getQueue()
|
||||||
|
|
|
@ -388,8 +388,8 @@ public class UnmanagedAMLauncher {
|
||||||
|
|
||||||
LOG.info("Got application report from ASM for" + ", appId="
|
LOG.info("Got application report from ASM for" + ", appId="
|
||||||
+ appId.getId() + ", appAttemptId="
|
+ appId.getId() + ", appAttemptId="
|
||||||
+ report.getCurrentApplicationAttemptId() + ", clientToken="
|
+ report.getCurrentApplicationAttemptId() + ", clientToAMToken="
|
||||||
+ report.getClientToken() + ", appDiagnostics="
|
+ report.getClientToAMToken() + ", appDiagnostics="
|
||||||
+ report.getDiagnostics() + ", appMasterHost=" + report.getHost()
|
+ report.getDiagnostics() + ", appMasterHost=" + report.getHost()
|
||||||
+ ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
|
+ ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
|
||||||
+ report.getRpcPort() + ", appStartTime=" + report.getStartTime()
|
+ report.getRpcPort() + ", appStartTime=" + report.getStartTime()
|
||||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
@Public
|
@Public
|
||||||
@Evolving
|
@Evolving
|
||||||
public abstract class BaseClientToAMTokenSecretManager extends
|
public abstract class BaseClientToAMTokenSecretManager extends
|
||||||
SecretManager<ClientTokenIdentifier> {
|
SecretManager<ClientToAMTokenIdentifier> {
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public abstract SecretKey getMasterKey(
|
public abstract SecretKey getMasterKey(
|
||||||
|
@ -45,14 +45,14 @@ public abstract class BaseClientToAMTokenSecretManager extends
|
||||||
@Private
|
@Private
|
||||||
@Override
|
@Override
|
||||||
public synchronized byte[] createPassword(
|
public synchronized byte[] createPassword(
|
||||||
ClientTokenIdentifier identifier) {
|
ClientToAMTokenIdentifier identifier) {
|
||||||
return createPassword(identifier.getBytes(),
|
return createPassword(identifier.getBytes(),
|
||||||
getMasterKey(identifier.getApplicationAttemptID()));
|
getMasterKey(identifier.getApplicationAttemptID()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Override
|
@Override
|
||||||
public byte[] retrievePassword(ClientTokenIdentifier identifier)
|
public byte[] retrievePassword(ClientToAMTokenIdentifier identifier)
|
||||||
throws SecretManager.InvalidToken {
|
throws SecretManager.InvalidToken {
|
||||||
SecretKey masterKey = getMasterKey(identifier.getApplicationAttemptID());
|
SecretKey masterKey = getMasterKey(identifier.getApplicationAttemptID());
|
||||||
if (masterKey == null) {
|
if (masterKey == null) {
|
||||||
|
@ -63,8 +63,8 @@ public abstract class BaseClientToAMTokenSecretManager extends
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Override
|
@Override
|
||||||
public ClientTokenIdentifier createIdentifier() {
|
public ClientToAMTokenIdentifier createIdentifier() {
|
||||||
return new ClientTokenIdentifier();
|
return new ClientToAMTokenIdentifier();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
|
||||||
@Public
|
@Public
|
||||||
@Evolving
|
@Evolving
|
||||||
public class ClientTokenIdentifier extends TokenIdentifier {
|
public class ClientToAMTokenIdentifier extends TokenIdentifier {
|
||||||
|
|
||||||
public static final Text KIND_NAME = new Text("YARN_CLIENT_TOKEN");
|
public static final Text KIND_NAME = new Text("YARN_CLIENT_TOKEN");
|
||||||
|
|
||||||
|
@ -43,10 +43,10 @@ public class ClientTokenIdentifier extends TokenIdentifier {
|
||||||
// TODO: Add more information in the tokenID such that it is not
|
// TODO: Add more information in the tokenID such that it is not
|
||||||
// transferrable, more secure etc.
|
// transferrable, more secure etc.
|
||||||
|
|
||||||
public ClientTokenIdentifier() {
|
public ClientToAMTokenIdentifier() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClientTokenIdentifier(ApplicationAttemptId id) {
|
public ClientToAMTokenIdentifier(ApplicationAttemptId id) {
|
||||||
this();
|
this();
|
||||||
this.applicationAttemptId = id;
|
this.applicationAttemptId = id;
|
||||||
}
|
}
|
|
@ -38,12 +38,17 @@ public class ClientToAMTokenSecretManager extends
|
||||||
BaseClientToAMTokenSecretManager {
|
BaseClientToAMTokenSecretManager {
|
||||||
|
|
||||||
// Only one client-token and one master-key for AM
|
// Only one client-token and one master-key for AM
|
||||||
private final SecretKey masterKey;
|
private SecretKey masterKey;
|
||||||
|
|
||||||
public ClientToAMTokenSecretManager(
|
public ClientToAMTokenSecretManager(
|
||||||
ApplicationAttemptId applicationAttemptID, byte[] secretKeyBytes) {
|
ApplicationAttemptId applicationAttemptID, byte[] key) {
|
||||||
super();
|
super();
|
||||||
this.masterKey = SecretManager.createSecretKey(secretKeyBytes);
|
if (key != null) {
|
||||||
|
this.masterKey = SecretManager.createSecretKey(key);
|
||||||
|
} else {
|
||||||
|
this.masterKey = null;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -52,4 +57,7 @@ public class ClientToAMTokenSecretManager extends
|
||||||
return this.masterKey;
|
return this.masterKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setMasterKey(byte[] key) {
|
||||||
|
this.masterKey = SecretManager.createSecretKey(key);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,54 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.security.client;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.security.token.TokenSelector;
|
||||||
|
|
||||||
|
public class ClientToAMTokenSelector implements
|
||||||
|
TokenSelector<ClientToAMTokenIdentifier> {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(ClientToAMTokenSelector.class);
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public Token<ClientToAMTokenIdentifier> selectToken(Text service,
|
||||||
|
Collection<Token<? extends TokenIdentifier>> tokens) {
|
||||||
|
if (service == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
LOG.debug("Looking for a token with service " + service.toString());
|
||||||
|
for (Token<? extends TokenIdentifier> token : tokens) {
|
||||||
|
LOG.debug("Token kind is " + token.getKind().toString()
|
||||||
|
+ " and the token's service name is " + token.getService());
|
||||||
|
if (ClientToAMTokenIdentifier.KIND_NAME.equals(token.getKind())
|
||||||
|
&& service.equals(token.getService())) {
|
||||||
|
return (Token<ClientToAMTokenIdentifier>) token;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,58 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.security.client;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
||||||
import org.apache.hadoop.security.token.TokenSelector;
|
|
||||||
|
|
||||||
@Public
|
|
||||||
@Stable
|
|
||||||
public class ClientTokenSelector implements
|
|
||||||
TokenSelector<ClientTokenIdentifier> {
|
|
||||||
|
|
||||||
private static final Log LOG = LogFactory
|
|
||||||
.getLog(ClientTokenSelector.class);
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public Token<ClientTokenIdentifier> selectToken(Text service,
|
|
||||||
Collection<Token<? extends TokenIdentifier>> tokens) {
|
|
||||||
if (service == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
LOG.debug("Looking for a token with service " + service.toString());
|
|
||||||
for (Token<? extends TokenIdentifier> token : tokens) {
|
|
||||||
LOG.debug("Token kind is " + token.getKind().toString()
|
|
||||||
+ " and the token's service name is " + token.getService());
|
|
||||||
if (ClientTokenIdentifier.KIND_NAME.equals(token.getKind())
|
|
||||||
&& service.equals(token.getService())) {
|
|
||||||
return (Token<ClientTokenIdentifier>) token;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -13,5 +13,5 @@
|
||||||
#
|
#
|
||||||
org.apache.hadoop.yarn.security.ContainerTokenIdentifier
|
org.apache.hadoop.yarn.security.ContainerTokenIdentifier
|
||||||
org.apache.hadoop.yarn.security.AMRMTokenIdentifier
|
org.apache.hadoop.yarn.security.AMRMTokenIdentifier
|
||||||
org.apache.hadoop.yarn.security.client.ClientTokenIdentifier
|
org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier
|
||||||
org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier
|
org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier
|
||||||
|
|
|
@ -61,7 +61,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -224,7 +223,7 @@ public class BuilderUtils {
|
||||||
return newToken(Token.class, identifier, kind, password, service);
|
return newToken(Token.class, identifier, kind, password, service);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Token newClientToken(byte[] identifier, String kind,
|
public static Token newClientToAMToken(byte[] identifier, String kind,
|
||||||
byte[] password, String service) {
|
byte[] password, String service) {
|
||||||
return newToken(Token.class, identifier, kind, password, service);
|
return newToken(Token.class, identifier, kind, password, service);
|
||||||
}
|
}
|
||||||
|
@ -299,7 +298,7 @@ public class BuilderUtils {
|
||||||
public static ApplicationReport newApplicationReport(
|
public static ApplicationReport newApplicationReport(
|
||||||
ApplicationId applicationId, ApplicationAttemptId applicationAttemptId,
|
ApplicationId applicationId, ApplicationAttemptId applicationAttemptId,
|
||||||
String user, String queue, String name, String host, int rpcPort,
|
String user, String queue, String name, String host, int rpcPort,
|
||||||
Token clientToken, YarnApplicationState state, String diagnostics,
|
Token clientToAMToken, YarnApplicationState state, String diagnostics,
|
||||||
String url, long startTime, long finishTime,
|
String url, long startTime, long finishTime,
|
||||||
FinalApplicationStatus finalStatus,
|
FinalApplicationStatus finalStatus,
|
||||||
ApplicationResourceUsageReport appResources, String origTrackingUrl,
|
ApplicationResourceUsageReport appResources, String origTrackingUrl,
|
||||||
|
@ -313,7 +312,7 @@ public class BuilderUtils {
|
||||||
report.setName(name);
|
report.setName(name);
|
||||||
report.setHost(host);
|
report.setHost(host);
|
||||||
report.setRpcPort(rpcPort);
|
report.setRpcPort(rpcPort);
|
||||||
report.setClientToken(clientToken);
|
report.setClientToAMToken(clientToAMToken);
|
||||||
report.setYarnApplicationState(state);
|
report.setYarnApplicationState(state);
|
||||||
report.setDiagnostics(diagnostics);
|
report.setDiagnostics(diagnostics);
|
||||||
report.setTrackingUrl(url);
|
report.setTrackingUrl(url);
|
||||||
|
|
|
@ -214,6 +214,12 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
.getMaximumResourceCapability());
|
.getMaximumResourceCapability());
|
||||||
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
|
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
|
||||||
.getSubmissionContext().getAMContainerSpec().getApplicationACLs());
|
.getSubmissionContext().getAMContainerSpec().getApplicationACLs());
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
LOG.info("Setting client token master key");
|
||||||
|
response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(rmContext
|
||||||
|
.getClientToAMTokenSecretManager()
|
||||||
|
.getMasterKey(applicationAttemptId).getEncoded()));
|
||||||
|
}
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,7 +71,7 @@ public class RMContextImpl implements RMContext {
|
||||||
AMRMTokenSecretManager appTokenSecretManager,
|
AMRMTokenSecretManager appTokenSecretManager,
|
||||||
RMContainerTokenSecretManager containerTokenSecretManager,
|
RMContainerTokenSecretManager containerTokenSecretManager,
|
||||||
NMTokenSecretManagerInRM nmTokenSecretManager,
|
NMTokenSecretManagerInRM nmTokenSecretManager,
|
||||||
ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
|
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
|
||||||
this.rmDispatcher = rmDispatcher;
|
this.rmDispatcher = rmDispatcher;
|
||||||
this.stateStore = store;
|
this.stateStore = store;
|
||||||
this.containerAllocationExpirer = containerAllocationExpirer;
|
this.containerAllocationExpirer = containerAllocationExpirer;
|
||||||
|
@ -81,7 +81,7 @@ public class RMContextImpl implements RMContext {
|
||||||
this.appTokenSecretManager = appTokenSecretManager;
|
this.appTokenSecretManager = appTokenSecretManager;
|
||||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||||
this.nmTokenSecretManager = nmTokenSecretManager;
|
this.nmTokenSecretManager = nmTokenSecretManager;
|
||||||
this.clientToAMTokenSecretManager = clientTokenSecretManager;
|
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -94,11 +94,11 @@ public class RMContextImpl implements RMContext {
|
||||||
AMRMTokenSecretManager appTokenSecretManager,
|
AMRMTokenSecretManager appTokenSecretManager,
|
||||||
RMContainerTokenSecretManager containerTokenSecretManager,
|
RMContainerTokenSecretManager containerTokenSecretManager,
|
||||||
NMTokenSecretManagerInRM nmTokenSecretManager,
|
NMTokenSecretManagerInRM nmTokenSecretManager,
|
||||||
ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
|
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
|
||||||
this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
|
this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
|
||||||
amFinishingMonitor, tokenRenewer, appTokenSecretManager,
|
amFinishingMonitor, tokenRenewer, appTokenSecretManager,
|
||||||
containerTokenSecretManager, nmTokenSecretManager,
|
containerTokenSecretManager, nmTokenSecretManager,
|
||||||
clientTokenSecretManager);
|
clientToAMTokenSecretManager);
|
||||||
RMStateStore nullStore = new NullRMStateStore();
|
RMStateStore nullStore = new NullRMStateStore();
|
||||||
nullStore.setDispatcher(rmDispatcher);
|
nullStore.setDispatcher(rmDispatcher);
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -165,12 +165,12 @@ public class AMLauncher implements Runnable {
|
||||||
new String[0])));
|
new String[0])));
|
||||||
|
|
||||||
// Finalize the container
|
// Finalize the container
|
||||||
setupTokensAndEnv(container, containerID);
|
setupTokens(container, containerID);
|
||||||
|
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupTokensAndEnv(
|
private void setupTokens(
|
||||||
ContainerLaunchContext container, ContainerId containerID)
|
ContainerLaunchContext container, ContainerId containerID)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Map<String, String> environment = container.getEnvironment();
|
Map<String, String> environment = container.getEnvironment();
|
||||||
|
@ -210,15 +210,6 @@ public class AMLauncher implements Runnable {
|
||||||
credentials.writeTokenStorageToStream(dob);
|
credentials.writeTokenStorageToStream(dob);
|
||||||
container.setTokens(ByteBuffer.wrap(dob.getData(), 0,
|
container.setTokens(ByteBuffer.wrap(dob.getData(), 0,
|
||||||
dob.getLength()));
|
dob.getLength()));
|
||||||
|
|
||||||
SecretKey clientSecretKey =
|
|
||||||
this.rmContext.getClientToAMTokenSecretManager().getMasterKey(
|
|
||||||
application.getAppAttemptId());
|
|
||||||
String encoded =
|
|
||||||
Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
|
|
||||||
environment.put(
|
|
||||||
ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME,
|
|
||||||
encoded);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
@ -382,9 +382,10 @@ public abstract class RMStateStore {
|
||||||
if(appToken != null){
|
if(appToken != null){
|
||||||
credentials.addToken(appToken.getService(), appToken);
|
credentials.addToken(appToken.getService(), appToken);
|
||||||
}
|
}
|
||||||
Token<ClientTokenIdentifier> clientToken = appAttempt.getClientToken();
|
Token<ClientToAMTokenIdentifier> clientToAMToken =
|
||||||
if(clientToken != null){
|
appAttempt.getClientToAMToken();
|
||||||
credentials.addToken(clientToken.getService(), clientToken);
|
if(clientToAMToken != null){
|
||||||
|
credentials.addToken(clientToAMToken.getService(), clientToAMToken);
|
||||||
}
|
}
|
||||||
return credentials;
|
return credentials;
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
|
||||||
|
@ -430,7 +430,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ApplicationAttemptId currentApplicationAttemptId = null;
|
ApplicationAttemptId currentApplicationAttemptId = null;
|
||||||
org.apache.hadoop.yarn.api.records.Token clientToken = null;
|
org.apache.hadoop.yarn.api.records.Token clientToAMToken = null;
|
||||||
String trackingUrl = UNAVAILABLE;
|
String trackingUrl = UNAVAILABLE;
|
||||||
String host = UNAVAILABLE;
|
String host = UNAVAILABLE;
|
||||||
String origTrackingUrl = UNAVAILABLE;
|
String origTrackingUrl = UNAVAILABLE;
|
||||||
|
@ -445,13 +445,15 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
|
currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
|
||||||
trackingUrl = this.currentAttempt.getTrackingUrl();
|
trackingUrl = this.currentAttempt.getTrackingUrl();
|
||||||
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
|
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
|
||||||
Token<ClientTokenIdentifier> attemptClientToken =
|
Token<ClientToAMTokenIdentifier> attemptClientToAMToken =
|
||||||
this.currentAttempt.getClientToken();
|
this.currentAttempt.getClientToAMToken();
|
||||||
if (attemptClientToken != null) {
|
if (attemptClientToAMToken != null) {
|
||||||
clientToken =
|
clientToAMToken =
|
||||||
BuilderUtils.newClientToken(attemptClientToken.getIdentifier(),
|
BuilderUtils.newClientToAMToken(
|
||||||
attemptClientToken.getKind().toString(), attemptClientToken
|
attemptClientToAMToken.getIdentifier(),
|
||||||
.getPassword(), attemptClientToken.getService().toString());
|
attemptClientToAMToken.getKind().toString(),
|
||||||
|
attemptClientToAMToken.getPassword(),
|
||||||
|
attemptClientToAMToken.getService().toString());
|
||||||
}
|
}
|
||||||
host = this.currentAttempt.getHost();
|
host = this.currentAttempt.getHost();
|
||||||
rpcPort = this.currentAttempt.getRpcPort();
|
rpcPort = this.currentAttempt.getRpcPort();
|
||||||
|
@ -469,7 +471,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
|
|
||||||
return BuilderUtils.newApplicationReport(this.applicationId,
|
return BuilderUtils.newApplicationReport(this.applicationId,
|
||||||
currentApplicationAttemptId, this.user, this.queue,
|
currentApplicationAttemptId, this.user, this.queue,
|
||||||
this.name, host, rpcPort, clientToken,
|
this.name, host, rpcPort, clientToAMToken,
|
||||||
createApplicationState(this.stateMachine.getCurrentState()), diags,
|
createApplicationState(this.stateMachine.getCurrentState()), diags,
|
||||||
trackingUrl, this.startTime, this.finishTime, finishState,
|
trackingUrl, this.startTime, this.finishTime, finishState,
|
||||||
appUsageReport, origTrackingUrl, progress, this.applicationType);
|
appUsageReport, origTrackingUrl, progress, this.applicationType);
|
||||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -94,7 +94,7 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
|
||||||
* The token required by the clients to talk to the application attempt
|
* The token required by the clients to talk to the application attempt
|
||||||
* @return the token required by the clients to talk to the application attempt
|
* @return the token required by the clients to talk to the application attempt
|
||||||
*/
|
*/
|
||||||
Token<ClientTokenIdentifier> getClientToken();
|
Token<ClientToAMTokenIdentifier> getClientToAMToken();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Diagnostics information for the application attempt.
|
* Diagnostics information for the application attempt.
|
||||||
|
|
|
@ -63,8 +63,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenSelector;
|
import org.apache.hadoop.yarn.security.AMRMTokenSelector;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientTokenSelector;
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSelector;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
||||||
|
@ -129,7 +129,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
private final WriteLock writeLock;
|
private final WriteLock writeLock;
|
||||||
|
|
||||||
private final ApplicationAttemptId applicationAttemptId;
|
private final ApplicationAttemptId applicationAttemptId;
|
||||||
private Token<ClientTokenIdentifier> clientToken;
|
private Token<ClientToAMTokenIdentifier> clientToAMToken;
|
||||||
private final ApplicationSubmissionContext submissionContext;
|
private final ApplicationSubmissionContext submissionContext;
|
||||||
private Token<AMRMTokenIdentifier> amrmToken = null;
|
private Token<AMRMTokenIdentifier> amrmToken = null;
|
||||||
|
|
||||||
|
@ -498,8 +498,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Token<ClientTokenIdentifier> getClientToken() {
|
public Token<ClientToAMTokenIdentifier> getClientToAMToken() {
|
||||||
return this.clientToken;
|
return this.clientToAMToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -673,9 +673,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
}
|
}
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
|
||||||
ClientTokenSelector clientTokenSelector = new ClientTokenSelector();
|
ClientToAMTokenSelector clientToAMTokenSelector =
|
||||||
this.clientToken =
|
new ClientToAMTokenSelector();
|
||||||
clientTokenSelector.selectToken(new Text(),
|
this.clientToAMToken =
|
||||||
|
clientToAMTokenSelector.selectToken(new Text(),
|
||||||
appAttemptTokens.getAllTokens());
|
appAttemptTokens.getAllTokens());
|
||||||
|
|
||||||
InetSocketAddress serviceAddr = conf.getSocketAddr(
|
InetSocketAddress serviceAddr = conf.getSocketAddr(
|
||||||
|
@ -720,9 +721,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
appAttempt.rmContext.getClientToAMTokenSecretManager()
|
appAttempt.rmContext.getClientToAMTokenSecretManager()
|
||||||
.registerApplication(appAttempt.applicationAttemptId);
|
.registerApplication(appAttempt.applicationAttemptId);
|
||||||
|
|
||||||
// create clientToken
|
// create clientToAMToken
|
||||||
appAttempt.clientToken =
|
appAttempt.clientToAMToken =
|
||||||
new Token<ClientTokenIdentifier>(new ClientTokenIdentifier(
|
new Token<ClientToAMTokenIdentifier>(new ClientToAMTokenIdentifier(
|
||||||
appAttempt.applicationAttemptId),
|
appAttempt.applicationAttemptId),
|
||||||
appAttempt.rmContext.getClientToAMTokenSecretManager());
|
appAttempt.rmContext.getClientToAMTokenSecretManager());
|
||||||
|
|
||||||
|
@ -1050,7 +1051,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
appAttempt.rmContext.getAMFinishingMonitor().unregister(
|
appAttempt.rmContext.getAMFinishingMonitor().unregister(
|
||||||
appAttempt.getAppAttemptId());
|
appAttempt.getAppAttemptId());
|
||||||
|
|
||||||
// Unregister from the ClientTokenSecretManager
|
// Unregister from the ClientToAMTokenSecretManager
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
appAttempt.rmContext.getClientToAMTokenSecretManager()
|
appAttempt.rmContext.getClientToAMTokenSecretManager()
|
||||||
.unRegisterApplication(appAttempt.getAppAttemptId());
|
.unRegisterApplication(appAttempt.getAppAttemptId());
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
@ -49,7 +50,7 @@ public class MockAM {
|
||||||
private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
|
private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
|
||||||
private final List<ContainerId> releases = new ArrayList<ContainerId>();
|
private final List<ContainerId> releases = new ArrayList<ContainerId>();
|
||||||
|
|
||||||
MockAM(RMContext context, ApplicationMasterProtocol amRMProtocol,
|
public MockAM(RMContext context, ApplicationMasterProtocol amRMProtocol,
|
||||||
ApplicationAttemptId attemptId) {
|
ApplicationAttemptId attemptId) {
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.amRMProtocol = amRMProtocol;
|
this.amRMProtocol = amRMProtocol;
|
||||||
|
@ -77,7 +78,7 @@ public class MockAM {
|
||||||
finalState, attempt.getAppAttemptState());
|
finalState, attempt.getAppAttemptState());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void registerAppAttempt() throws Exception {
|
public RegisterApplicationMasterResponse registerAppAttempt() throws Exception {
|
||||||
waitForState(RMAppAttemptState.LAUNCHED);
|
waitForState(RMAppAttemptState.LAUNCHED);
|
||||||
responseId = 0;
|
responseId = 0;
|
||||||
RegisterApplicationMasterRequest req = Records.newRecord(RegisterApplicationMasterRequest.class);
|
RegisterApplicationMasterRequest req = Records.newRecord(RegisterApplicationMasterRequest.class);
|
||||||
|
@ -85,7 +86,7 @@ public class MockAM {
|
||||||
req.setHost("");
|
req.setHost("");
|
||||||
req.setRpcPort(1);
|
req.setRpcPort(1);
|
||||||
req.setTrackingUrl("");
|
req.setTrackingUrl("");
|
||||||
amRMProtocol.registerApplicationMaster(req);
|
return amRMProtocol.registerApplicationMaster(req);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addRequests(String[] hosts, int memory, int priority,
|
public void addRequests(String[] hosts, int memory, int priority,
|
||||||
|
|
|
@ -96,7 +96,7 @@ public class MockRM extends ResourceManager {
|
||||||
while (!finalState.equals(app.getState()) && timeoutSecs++ < 40) {
|
while (!finalState.equals(app.getState()) && timeoutSecs++ < 40) {
|
||||||
System.out.println("App : " + appId + " State is : " + app.getState()
|
System.out.println("App : " + appId + " State is : " + app.getState()
|
||||||
+ " Waiting for state : " + finalState);
|
+ " Waiting for state : " + finalState);
|
||||||
Thread.sleep(1000);
|
Thread.sleep(2000);
|
||||||
}
|
}
|
||||||
System.out.println("App State is : " + app.getState());
|
System.out.println("App State is : " + app.getState());
|
||||||
Assert.assertEquals("App state is not correct (timedout)", finalState,
|
Assert.assertEquals("App state is not correct (timedout)", finalState,
|
||||||
|
|
|
@ -338,7 +338,7 @@ public class TestApplicationACLs {
|
||||||
Assert.assertEquals("Enemy should not see app rpc port!",
|
Assert.assertEquals("Enemy should not see app rpc port!",
|
||||||
-1, appReport.getRpcPort());
|
-1, appReport.getRpcPort());
|
||||||
Assert.assertEquals("Enemy should not see app client token!",
|
Assert.assertEquals("Enemy should not see app client token!",
|
||||||
null, appReport.getClientToken());
|
null, appReport.getClientToAMToken());
|
||||||
Assert.assertEquals("Enemy should not see app diagnostics!",
|
Assert.assertEquals("Enemy should not see app diagnostics!",
|
||||||
UNAVAILABLE, appReport.getDiagnostics());
|
UNAVAILABLE, appReport.getDiagnostics());
|
||||||
Assert.assertEquals("Enemy should not see app tracking url!",
|
Assert.assertEquals("Enemy should not see app tracking url!",
|
||||||
|
|
|
@ -534,10 +534,11 @@ public class TestRMRestart {
|
||||||
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
|
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
|
||||||
attemptState.getMasterContainer().getId());
|
attemptState.getMasterContainer().getId());
|
||||||
|
|
||||||
// the appToken and clientToken that are generated when RMAppAttempt is created,
|
// the appToken and clientToAMToken that are generated when RMAppAttempt
|
||||||
|
// is created,
|
||||||
HashSet<Token<?>> tokenSet = new HashSet<Token<?>>();
|
HashSet<Token<?>> tokenSet = new HashSet<Token<?>>();
|
||||||
tokenSet.add(attempt1.getAMRMToken());
|
tokenSet.add(attempt1.getAMRMToken());
|
||||||
tokenSet.add(attempt1.getClientToken());
|
tokenSet.add(attempt1.getClientToAMToken());
|
||||||
|
|
||||||
// assert application Token is saved
|
// assert application Token is saved
|
||||||
HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
|
HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
|
||||||
|
@ -556,12 +557,13 @@ public class TestRMRestart {
|
||||||
Assert.assertNotNull(loadedAttempt1);
|
Assert.assertNotNull(loadedAttempt1);
|
||||||
savedTokens.clear();
|
savedTokens.clear();
|
||||||
savedTokens.add(loadedAttempt1.getAMRMToken());
|
savedTokens.add(loadedAttempt1.getAMRMToken());
|
||||||
savedTokens.add(loadedAttempt1.getClientToken());
|
savedTokens.add(loadedAttempt1.getClientToAMToken());
|
||||||
Assert.assertEquals(tokenSet, savedTokens);
|
Assert.assertEquals(tokenSet, savedTokens);
|
||||||
|
|
||||||
// assert clientToken is recovered back to api-versioned clientToken
|
// assert clientToAMToken is recovered back to api-versioned
|
||||||
Assert.assertEquals(attempt1.getClientToken(),
|
// clientToAMToken
|
||||||
loadedAttempt1.getClientToken());
|
Assert.assertEquals(attempt1.getClientToAMToken(),
|
||||||
|
loadedAttempt1.getClientToAMToken());
|
||||||
|
|
||||||
// Not testing ApplicationTokenSecretManager has the password populated back,
|
// Not testing ApplicationTokenSecretManager has the password populated back,
|
||||||
// that is needed in work-preserving restart
|
// that is needed in work-preserving restart
|
||||||
|
|
|
@ -18,41 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import junit.framework.Assert;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
|
||||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
||||||
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class TestSchedulerNegotiator {
|
public class TestSchedulerNegotiator {
|
||||||
// private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
// private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
|
|
@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||||
|
@ -206,7 +206,7 @@ public class TestRMStateStore {
|
||||||
|
|
||||||
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
|
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
|
||||||
String containerIdStr, Token<AMRMTokenIdentifier> appToken,
|
String containerIdStr, Token<AMRMTokenIdentifier> appToken,
|
||||||
Token<ClientTokenIdentifier> clientToken, TestDispatcher dispatcher)
|
Token<ClientToAMTokenIdentifier> clientToAMToken, TestDispatcher dispatcher)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
||||||
Container container = new ContainerPBImpl();
|
Container container = new ContainerPBImpl();
|
||||||
|
@ -215,7 +215,7 @@ public class TestRMStateStore {
|
||||||
when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
|
when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
|
||||||
when(mockAttempt.getMasterContainer()).thenReturn(container);
|
when(mockAttempt.getMasterContainer()).thenReturn(container);
|
||||||
when(mockAttempt.getAMRMToken()).thenReturn(appToken);
|
when(mockAttempt.getAMRMToken()).thenReturn(appToken);
|
||||||
when(mockAttempt.getClientToken()).thenReturn(clientToken);
|
when(mockAttempt.getClientToAMToken()).thenReturn(clientToAMToken);
|
||||||
dispatcher.attemptId = attemptId;
|
dispatcher.attemptId = attemptId;
|
||||||
dispatcher.storedException = null;
|
dispatcher.storedException = null;
|
||||||
store.storeApplicationAttempt(mockAttempt);
|
store.storeApplicationAttempt(mockAttempt);
|
||||||
|
@ -233,7 +233,7 @@ public class TestRMStateStore {
|
||||||
|
|
||||||
AMRMTokenSecretManager appTokenMgr =
|
AMRMTokenSecretManager appTokenMgr =
|
||||||
new AMRMTokenSecretManager(conf);
|
new AMRMTokenSecretManager(conf);
|
||||||
ClientToAMTokenSecretManagerInRM clientTokenMgr =
|
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
|
||||||
new ClientToAMTokenSecretManagerInRM();
|
new ClientToAMTokenSecretManagerInRM();
|
||||||
|
|
||||||
ApplicationAttemptId attemptId1 = ConverterUtils
|
ApplicationAttemptId attemptId1 = ConverterUtils
|
||||||
|
@ -243,14 +243,14 @@ public class TestRMStateStore {
|
||||||
|
|
||||||
// create application token1 for attempt1
|
// create application token1 for attempt1
|
||||||
List<Token<?>> appAttemptToken1 =
|
List<Token<?>> appAttemptToken1 =
|
||||||
generateTokens(attemptId1, appTokenMgr, clientTokenMgr, conf);
|
generateTokens(attemptId1, appTokenMgr, clientToAMTokenMgr, conf);
|
||||||
HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>();
|
HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>();
|
||||||
attemptTokenSet1.addAll(appAttemptToken1);
|
attemptTokenSet1.addAll(appAttemptToken1);
|
||||||
|
|
||||||
ContainerId containerId1 = storeAttempt(store, attemptId1,
|
ContainerId containerId1 = storeAttempt(store, attemptId1,
|
||||||
"container_1352994193343_0001_01_000001",
|
"container_1352994193343_0001_01_000001",
|
||||||
(Token<AMRMTokenIdentifier>) (appAttemptToken1.get(0)),
|
(Token<AMRMTokenIdentifier>) (appAttemptToken1.get(0)),
|
||||||
(Token<ClientTokenIdentifier>)(appAttemptToken1.get(1)),
|
(Token<ClientToAMTokenIdentifier>)(appAttemptToken1.get(1)),
|
||||||
dispatcher);
|
dispatcher);
|
||||||
|
|
||||||
String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
|
String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
|
||||||
|
@ -259,14 +259,14 @@ public class TestRMStateStore {
|
||||||
|
|
||||||
// create application token2 for attempt2
|
// create application token2 for attempt2
|
||||||
List<Token<?>> appAttemptToken2 =
|
List<Token<?>> appAttemptToken2 =
|
||||||
generateTokens(attemptId2, appTokenMgr, clientTokenMgr, conf);
|
generateTokens(attemptId2, appTokenMgr, clientToAMTokenMgr, conf);
|
||||||
HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>();
|
HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>();
|
||||||
attemptTokenSet2.addAll(appAttemptToken2);
|
attemptTokenSet2.addAll(appAttemptToken2);
|
||||||
|
|
||||||
ContainerId containerId2 = storeAttempt(store, attemptId2,
|
ContainerId containerId2 = storeAttempt(store, attemptId2,
|
||||||
"container_1352994193343_0001_02_000001",
|
"container_1352994193343_0001_02_000001",
|
||||||
(Token<AMRMTokenIdentifier>) (appAttemptToken2.get(0)),
|
(Token<AMRMTokenIdentifier>) (appAttemptToken2.get(0)),
|
||||||
(Token<ClientTokenIdentifier>)(appAttemptToken2.get(1)),
|
(Token<ClientToAMTokenIdentifier>)(appAttemptToken2.get(1)),
|
||||||
dispatcher);
|
dispatcher);
|
||||||
|
|
||||||
ApplicationAttemptId attemptIdRemoved = ConverterUtils
|
ApplicationAttemptId attemptIdRemoved = ConverterUtils
|
||||||
|
@ -373,21 +373,22 @@ public class TestRMStateStore {
|
||||||
|
|
||||||
private List<Token<?>> generateTokens(ApplicationAttemptId attemptId,
|
private List<Token<?>> generateTokens(ApplicationAttemptId attemptId,
|
||||||
AMRMTokenSecretManager appTokenMgr,
|
AMRMTokenSecretManager appTokenMgr,
|
||||||
ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) {
|
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr, Configuration conf) {
|
||||||
AMRMTokenIdentifier appTokenId =
|
AMRMTokenIdentifier appTokenId =
|
||||||
new AMRMTokenIdentifier(attemptId);
|
new AMRMTokenIdentifier(attemptId);
|
||||||
Token<AMRMTokenIdentifier> appToken =
|
Token<AMRMTokenIdentifier> appToken =
|
||||||
new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr);
|
new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr);
|
||||||
appToken.setService(new Text("appToken service"));
|
appToken.setService(new Text("appToken service"));
|
||||||
|
|
||||||
ClientTokenIdentifier clientTokenId = new ClientTokenIdentifier(attemptId);
|
ClientToAMTokenIdentifier clientToAMTokenId =
|
||||||
clientTokenMgr.registerApplication(attemptId);
|
new ClientToAMTokenIdentifier(attemptId);
|
||||||
Token<ClientTokenIdentifier> clientToken =
|
clientToAMTokenMgr.registerApplication(attemptId);
|
||||||
new Token<ClientTokenIdentifier>(clientTokenId, clientTokenMgr);
|
Token<ClientToAMTokenIdentifier> clientToAMToken =
|
||||||
clientToken.setService(new Text("clientToken service"));
|
new Token<ClientToAMTokenIdentifier>(clientToAMTokenId, clientToAMTokenMgr);
|
||||||
|
clientToAMToken.setService(new Text("clientToAMToken service"));
|
||||||
List<Token<?>> tokenPair = new ArrayList<Token<?>>();
|
List<Token<?>> tokenPair = new ArrayList<Token<?>>();
|
||||||
tokenPair.add(0, appToken);
|
tokenPair.add(0, appToken);
|
||||||
tokenPair.add(1, clientToken);
|
tokenPair.add(1, clientToAMToken);
|
||||||
return tokenPair;
|
return tokenPair;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,17 +19,18 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.annotation.Annotation;
|
import java.lang.annotation.Annotation;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.security.PrivilegedAction;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
import javax.security.sasl.SaslException;
|
import javax.security.sasl.SaslException;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.commons.codec.binary.Base64;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
@ -45,26 +46,21 @@ import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.security.token.TokenInfo;
|
import org.apache.hadoop.security.token.TokenInfo;
|
||||||
import org.apache.hadoop.security.token.TokenSelector;
|
import org.apache.hadoop.security.token.TokenSelector;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
||||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSelector;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientTokenSelector;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRMWithCustomAMLauncher;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRMWithCustomAMLauncher;
|
||||||
|
@ -74,7 +70,7 @@ import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestClientTokens {
|
public class TestClientToAMTokens {
|
||||||
|
|
||||||
private interface CustomProtocol {
|
private interface CustomProtocol {
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
|
@ -97,7 +93,7 @@ public class TestClientTokens {
|
||||||
@Override
|
@Override
|
||||||
public Class<? extends TokenSelector<? extends TokenIdentifier>>
|
public Class<? extends TokenSelector<? extends TokenIdentifier>>
|
||||||
value() {
|
value() {
|
||||||
return ClientTokenSelector.class;
|
return ClientToAMTokenSelector.class;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -112,14 +108,15 @@ public class TestClientTokens {
|
||||||
CustomProtocol {
|
CustomProtocol {
|
||||||
|
|
||||||
private final ApplicationAttemptId appAttemptId;
|
private final ApplicationAttemptId appAttemptId;
|
||||||
private final String secretKey;
|
private final byte[] secretKey;
|
||||||
private InetSocketAddress address;
|
private InetSocketAddress address;
|
||||||
private boolean pinged = false;
|
private boolean pinged = false;
|
||||||
|
private ClientToAMTokenSecretManager secretManager;
|
||||||
|
|
||||||
public CustomAM(ApplicationAttemptId appId, String secretKeyStr) {
|
public CustomAM(ApplicationAttemptId appId, byte[] secretKey) {
|
||||||
super("CustomAM");
|
super("CustomAM");
|
||||||
this.appAttemptId = appId;
|
this.appAttemptId = appId;
|
||||||
this.secretKey = secretKeyStr;
|
this.secretKey = secretKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -131,9 +128,7 @@ public class TestClientTokens {
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
|
|
||||||
ClientToAMTokenSecretManager secretManager = null;
|
secretManager = new ClientToAMTokenSecretManager(this.appAttemptId, secretKey);
|
||||||
byte[] bytes = Base64.decodeBase64(this.secretKey);
|
|
||||||
secretManager = new ClientToAMTokenSecretManager(this.appAttemptId, bytes);
|
|
||||||
Server server;
|
Server server;
|
||||||
try {
|
try {
|
||||||
server =
|
server =
|
||||||
|
@ -147,44 +142,22 @@ public class TestClientTokens {
|
||||||
this.address = NetUtils.getConnectAddress(server);
|
this.address = NetUtils.getConnectAddress(server);
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
|
||||||
|
return this.secretManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class CustomNM implements ContainerManagementProtocol {
|
|
||||||
|
|
||||||
public String clientTokensSecret;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public StartContainerResponse startContainer(StartContainerRequest request)
|
|
||||||
throws YarnException {
|
|
||||||
this.clientTokensSecret =
|
|
||||||
request.getContainerLaunchContext().getEnvironment()
|
|
||||||
.get(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public StopContainerResponse stopContainer(StopContainerRequest request)
|
|
||||||
throws YarnException {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetContainerStatusResponse getContainerStatus(
|
|
||||||
GetContainerStatusRequest request) throws YarnException {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testClientTokens() throws Exception {
|
public void testClientToAMs() throws Exception {
|
||||||
|
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
"kerberos");
|
"kerberos");
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
|
||||||
CustomNM containerManager = new CustomNM();
|
ContainerManagementProtocol containerManager =
|
||||||
|
mock(ContainerManagementProtocol.class);
|
||||||
final DrainDispatcher dispatcher = new DrainDispatcher();
|
final DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
|
|
||||||
MockRM rm = new MockRMWithCustomAMLauncher(conf, containerManager) {
|
MockRM rm = new MockRMWithCustomAMLauncher(conf, containerManager) {
|
||||||
|
@ -207,13 +180,16 @@ public class TestClientTokens {
|
||||||
|
|
||||||
// Submit an app
|
// Submit an app
|
||||||
RMApp app = rm.submitApp(1024);
|
RMApp app = rm.submitApp(1024);
|
||||||
dispatcher.await();
|
|
||||||
|
|
||||||
// Set up a node.
|
// Set up a node.
|
||||||
MockNM nm1 = rm.registerNode("localhost:1234", 3072);
|
MockNM nm1 = rm.registerNode("localhost:1234", 3072);
|
||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
|
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
// Get the app-report.
|
// Get the app-report.
|
||||||
GetApplicationReportRequest request =
|
GetApplicationReportRequest request =
|
||||||
Records.newRecord(GetApplicationReportRequest.class);
|
Records.newRecord(GetApplicationReportRequest.class);
|
||||||
|
@ -221,21 +197,43 @@ public class TestClientTokens {
|
||||||
GetApplicationReportResponse reportResponse =
|
GetApplicationReportResponse reportResponse =
|
||||||
rm.getClientRMService().getApplicationReport(request);
|
rm.getClientRMService().getApplicationReport(request);
|
||||||
ApplicationReport appReport = reportResponse.getApplicationReport();
|
ApplicationReport appReport = reportResponse.getApplicationReport();
|
||||||
org.apache.hadoop.yarn.api.records.Token clientToken = appReport.getClientToken();
|
org.apache.hadoop.yarn.api.records.Token clientToAMToken =
|
||||||
|
appReport.getClientToAMToken();
|
||||||
|
|
||||||
// Wait till AM is 'launched'
|
ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId();
|
||||||
int waitTime = 0;
|
final MockAM mockAM =
|
||||||
while (containerManager.clientTokensSecret == null && waitTime++ < 20) {
|
new MockAM(rm.getRMContext(), rm.getApplicationMasterService(),
|
||||||
Thread.sleep(1000);
|
app.getCurrentAppAttempt().getAppAttemptId());
|
||||||
|
UserGroupInformation appUgi =
|
||||||
|
UserGroupInformation.createRemoteUser(appAttempt.toString());
|
||||||
|
RegisterApplicationMasterResponse response =
|
||||||
|
appUgi.doAs(new PrivilegedAction<RegisterApplicationMasterResponse>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RegisterApplicationMasterResponse run() {
|
||||||
|
RegisterApplicationMasterResponse response = null;
|
||||||
|
try {
|
||||||
|
response = mockAM.registerAppAttempt();
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.fail("Exception was not expected");
|
||||||
}
|
}
|
||||||
Assert.assertNotNull(containerManager.clientTokensSecret);
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// ClientToAMToken master key should have been received on register
|
||||||
|
// application master response.
|
||||||
|
Assert.assertNotNull(response.getClientToAMTokenMasterKey());
|
||||||
|
Assert
|
||||||
|
.assertTrue(response.getClientToAMTokenMasterKey().array().length > 0);
|
||||||
|
|
||||||
// Start the AM with the correct shared-secret.
|
// Start the AM with the correct shared-secret.
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
app.getAppAttempts().keySet().iterator().next();
|
app.getAppAttempts().keySet().iterator().next();
|
||||||
Assert.assertNotNull(appAttemptId);
|
Assert.assertNotNull(appAttemptId);
|
||||||
final CustomAM am =
|
final CustomAM am =
|
||||||
new CustomAM(appAttemptId, containerManager.clientTokensSecret);
|
new CustomAM(appAttemptId, response.getClientToAMTokenMasterKey()
|
||||||
|
.array());
|
||||||
am.init(conf);
|
am.init(conf);
|
||||||
am.start();
|
am.start();
|
||||||
|
|
||||||
|
@ -256,17 +254,17 @@ public class TestClientTokens {
|
||||||
|
|
||||||
// Verify denial for a malicious user
|
// Verify denial for a malicious user
|
||||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("me");
|
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("me");
|
||||||
Token<ClientTokenIdentifier> token =
|
Token<ClientToAMTokenIdentifier> token =
|
||||||
ProtoUtils.convertFromProtoFormat(clientToken, am.address);
|
ProtoUtils.convertFromProtoFormat(clientToAMToken, am.address);
|
||||||
|
|
||||||
// Malicious user, messes with appId
|
// Malicious user, messes with appId
|
||||||
ClientTokenIdentifier maliciousID =
|
ClientToAMTokenIdentifier maliciousID =
|
||||||
new ClientTokenIdentifier(BuilderUtils.newApplicationAttemptId(
|
new ClientToAMTokenIdentifier(BuilderUtils.newApplicationAttemptId(
|
||||||
BuilderUtils.newApplicationId(app.getApplicationId()
|
BuilderUtils.newApplicationId(app.getApplicationId()
|
||||||
.getClusterTimestamp(), 42), 43));
|
.getClusterTimestamp(), 42), 43));
|
||||||
|
|
||||||
Token<ClientTokenIdentifier> maliciousToken =
|
Token<ClientToAMTokenIdentifier> maliciousToken =
|
||||||
new Token<ClientTokenIdentifier>(maliciousID.getBytes(),
|
new Token<ClientToAMTokenIdentifier>(maliciousID.getBytes(),
|
||||||
token.getPassword(), token.getKind(),
|
token.getPassword(), token.getKind(),
|
||||||
token.getService());
|
token.getService());
|
||||||
ugi.addToken(maliciousToken);
|
ugi.addToken(maliciousToken);
|
Loading…
Reference in New Issue