YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the sake of localization and log-aggregation for long-running services. Contributed by Jian He.
This commit is contained in:
parent
5b1dfe78b8
commit
a16d022ca4
|
@ -400,6 +400,10 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-2703. Added logUploadedTime into LogValue for better display. (Xuan Gong
|
||||
via zjshen)
|
||||
|
||||
YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the
|
||||
sake of localization and log-aggregation for long-running services. (Jian He
|
||||
via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -695,6 +695,10 @@ public class YarnConfiguration extends Configuration {
|
|||
RM_PREFIX + "delegation-token-renewer.thread-count";
|
||||
public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
|
||||
|
||||
public static final String RM_PROXY_USER_PRIVILEGES_ENABLED = RM_PREFIX
|
||||
+ "proxy-user-privileges.enabled";
|
||||
public static boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
|
||||
|
||||
/** Whether to enable log aggregation */
|
||||
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
|
||||
+ "log-aggregation-enable";
|
||||
|
|
|
@ -553,6 +553,21 @@
|
|||
<value>30000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>If true, ResourceManager will have proxy-user privileges.
|
||||
Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to
|
||||
do localization and log-aggregation on behalf of the user. If this is set to true,
|
||||
ResourceManager is able to request new hdfs delegation tokens on behalf of
|
||||
the user. This is needed by long-running-service, because the hdfs tokens
|
||||
will eventually expire and YARN requires new valid tokens to do localization
|
||||
and log-aggregation. Note that to enable this use case, the corresponding
|
||||
HDFS NameNode has to configure ResourceManager as the proxy-user so that
|
||||
ResourceManager can itself ask for new tokens on behalf of the user when
|
||||
tokens are past their max-life-time.</description>
|
||||
<name>yarn.resourcemanager.proxy-user-privileges.enabled</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Interval for the roll over for the master key used to generate
|
||||
application tokens
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -58,4 +60,11 @@ public interface NodeHeartbeatResponse {
|
|||
String getDiagnosticsMessage();
|
||||
|
||||
void setDiagnosticsMessage(String diagnosticsMessage);
|
||||
|
||||
// Credentials (i.e. hdfs tokens) needed by NodeManagers for application
|
||||
// localizations and logAggreations.
|
||||
Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
|
||||
|
||||
void setSystemCredentialsForApps(
|
||||
Map<ApplicationId, ByteBuffer> systemCredentials);
|
||||
}
|
||||
|
|
|
@ -18,21 +18,26 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
|
@ -49,6 +54,8 @@ public class NodeHeartbeatResponsePBImpl extends
|
|||
private List<ContainerId> containersToCleanup = null;
|
||||
private List<ContainerId> containersToBeRemovedFromNM = null;
|
||||
private List<ApplicationId> applicationsToCleanup = null;
|
||||
private Map<ApplicationId, ByteBuffer> systemCredentials = null;
|
||||
|
||||
private MasterKey containerTokenMasterKey = null;
|
||||
private MasterKey nmTokenMasterKey = null;
|
||||
|
||||
|
@ -62,7 +69,7 @@ public class NodeHeartbeatResponsePBImpl extends
|
|||
}
|
||||
|
||||
public NodeHeartbeatResponseProto getProto() {
|
||||
mergeLocalToProto();
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
|
@ -86,6 +93,19 @@ public class NodeHeartbeatResponsePBImpl extends
|
|||
builder.setNmTokenMasterKey(
|
||||
convertToProtoFormat(this.nmTokenMasterKey));
|
||||
}
|
||||
if (this.systemCredentials != null) {
|
||||
addSystemCredentialsToProto();
|
||||
}
|
||||
}
|
||||
|
||||
private void addSystemCredentialsToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearSystemCredentialsForApps();
|
||||
for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
|
||||
builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder()
|
||||
.setAppId(convertToProtoFormat(entry.getKey()))
|
||||
.setCredentialsForApp(ProtoUtils.convertToProtoFormat(entry.getValue())));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
|
@ -387,6 +407,38 @@ public class NodeHeartbeatResponsePBImpl extends
|
|||
builder.addAllApplicationsToCleanup(iterable);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
|
||||
if (this.systemCredentials != null) {
|
||||
return this.systemCredentials;
|
||||
}
|
||||
initSystemCredentials();
|
||||
return systemCredentials;
|
||||
}
|
||||
|
||||
private void initSystemCredentials() {
|
||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<SystemCredentialsForAppsProto> list = p.getSystemCredentialsForAppsList();
|
||||
this.systemCredentials = new HashMap<ApplicationId, ByteBuffer> ();
|
||||
for (SystemCredentialsForAppsProto c : list) {
|
||||
ApplicationId appId = convertFromProtoFormat(c.getAppId());
|
||||
ByteBuffer byteBuffer = ProtoUtils.convertFromProtoFormat(c.getCredentialsForApp());
|
||||
this.systemCredentials.put(appId, byteBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSystemCredentialsForApps(
|
||||
Map<ApplicationId, ByteBuffer> systemCredentials) {
|
||||
if (systemCredentials == null || systemCredentials.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
maybeInitBuilder();
|
||||
this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>();
|
||||
this.systemCredentials.putAll(systemCredentials);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNextHeartBeatInterval() {
|
||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
|
|
|
@ -59,6 +59,12 @@ message NodeHeartbeatResponseProto {
|
|||
optional int64 nextHeartBeatInterval = 7;
|
||||
optional string diagnostics_message = 8;
|
||||
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
|
||||
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
|
||||
}
|
||||
|
||||
message SystemCredentialsForAppsProto {
|
||||
optional ApplicationIdProto appId = 1;
|
||||
optional bytes credentialsForApp = 2;
|
||||
}
|
||||
|
||||
message NMContainerStatusProto {
|
||||
|
|
|
@ -18,9 +18,18 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
|
@ -29,10 +38,10 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -93,4 +102,33 @@ public class TestProtocolRecords {
|
|||
Assert.assertEquals(1, requestProto.getRunningApplications().size());
|
||||
Assert.assertEquals(appId, requestProto.getRunningApplications().get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeHeartBeatResponse() throws IOException {
|
||||
NodeHeartbeatResponse record =
|
||||
Records.newRecord(NodeHeartbeatResponse.class);
|
||||
Map<ApplicationId, ByteBuffer> appCredentials =
|
||||
new HashMap<ApplicationId, ByteBuffer>();
|
||||
Credentials app1Cred = new Credentials();
|
||||
|
||||
Token<DelegationTokenIdentifier> token1 =
|
||||
new Token<DelegationTokenIdentifier>();
|
||||
token1.setKind(new Text("kind1"));
|
||||
app1Cred.addToken(new Text("token1"), token1);
|
||||
Token<DelegationTokenIdentifier> token2 =
|
||||
new Token<DelegationTokenIdentifier>();
|
||||
token2.setKind(new Text("kind2"));
|
||||
app1Cred.addToken(new Text("token2"), token2);
|
||||
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
app1Cred.writeTokenStorageToStream(dob);
|
||||
ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
|
||||
record.setSystemCredentialsForApps(appCredentials);
|
||||
|
||||
NodeHeartbeatResponse proto =
|
||||
new NodeHeartbeatResponsePBImpl(
|
||||
((NodeHeartbeatResponsePBImpl) record).getProto());
|
||||
Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,10 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -54,6 +56,8 @@ public interface Context {
|
|||
|
||||
ConcurrentMap<ApplicationId, Application> getApplications();
|
||||
|
||||
Map<ApplicationId, Credentials> getSystemCredentialsForApps();
|
||||
|
||||
ConcurrentMap<ContainerId, Container> getContainers();
|
||||
|
||||
NMContainerTokenSecretManager getContainerTokenSecretManager();
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
|
@ -32,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
@ -313,6 +316,10 @@ public class NodeManager extends CompositeService
|
|||
private NodeId nodeId = null;
|
||||
protected final ConcurrentMap<ApplicationId, Application> applications =
|
||||
new ConcurrentHashMap<ApplicationId, Application>();
|
||||
|
||||
private Map<ApplicationId, Credentials> systemCredentials =
|
||||
new HashMap<ApplicationId, Credentials>();
|
||||
|
||||
protected final ConcurrentMap<ContainerId, Container> containers =
|
||||
new ConcurrentSkipListMap<ContainerId, Container>();
|
||||
|
||||
|
@ -420,6 +427,16 @@ public class NodeManager extends CompositeService
|
|||
public void setDecommissioned(boolean isDecommissioned) {
|
||||
this.isDecommissioned = isDecommissioned;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ApplicationId, Credentials> getSystemCredentialsForApps() {
|
||||
return systemCredentials;
|
||||
}
|
||||
|
||||
public void setSystemCrendentials(
|
||||
Map<ApplicationId, Credentials> systemCredentials) {
|
||||
this.systemCredentials = systemCredentials;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -36,7 +37,9 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.VersionUtil;
|
||||
|
@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
|||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
|
@ -525,6 +529,25 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
return this.rmIdentifier;
|
||||
}
|
||||
|
||||
private static Map<ApplicationId, Credentials> parseCredentials(
|
||||
Map<ApplicationId, ByteBuffer> systemCredentials) throws IOException {
|
||||
Map<ApplicationId, Credentials> map =
|
||||
new HashMap<ApplicationId, Credentials>();
|
||||
for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
|
||||
Credentials credentials = new Credentials();
|
||||
DataInputByteBuffer buf = new DataInputByteBuffer();
|
||||
ByteBuffer buffer = entry.getValue();
|
||||
buffer.rewind();
|
||||
buf.reset(buffer);
|
||||
credentials.readTokenStorageStream(buf);
|
||||
map.put(entry.getKey(), credentials);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Retrieved credentials form RM: " + map);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
protected void startStatusUpdater() {
|
||||
|
||||
statusUpdaterRunnable = new Runnable() {
|
||||
|
@ -598,6 +621,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
new CMgrCompletedAppsEvent(appsToCleanup,
|
||||
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
||||
}
|
||||
|
||||
Map<ApplicationId, ByteBuffer> systemCredentials =
|
||||
response.getSystemCredentialsForApps();
|
||||
if (systemCredentials != null && !systemCredentials.isEmpty()) {
|
||||
((NMContext) context)
|
||||
.setSystemCrendentials(parseCredentials(systemCredentials));
|
||||
}
|
||||
} catch (ConnectException e) {
|
||||
//catch and throw the exception if tried MAX wait time to connect RM
|
||||
dispatcher.getEventHandler().handle(
|
||||
|
|
|
@ -186,7 +186,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
this.metrics = metrics;
|
||||
|
||||
rsrcLocalizationSrvc =
|
||||
createResourceLocalizationService(exec, deletionContext);
|
||||
createResourceLocalizationService(exec, deletionContext, context);
|
||||
addService(rsrcLocalizationSrvc);
|
||||
|
||||
containersLauncher = createContainersLauncher(context, exec);
|
||||
|
@ -362,9 +362,9 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
}
|
||||
|
||||
protected ResourceLocalizationService createResourceLocalizationService(
|
||||
ContainerExecutor exec, DeletionService deletionContext) {
|
||||
ContainerExecutor exec, DeletionService deletionContext, Context context) {
|
||||
return new ResourceLocalizationService(this.dispatcher, exec,
|
||||
deletionContext, dirsHandler, context.getNMStateStore());
|
||||
deletionContext, dirsHandler, context);
|
||||
}
|
||||
|
||||
protected ContainersLauncher createContainersLauncher(Context context,
|
||||
|
|
|
@ -83,11 +83,11 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
|
@ -158,6 +158,7 @@ public class ResourceLocalizationService extends CompositeService
|
|||
private LocalResourcesTracker publicRsrc;
|
||||
|
||||
private LocalDirsHandlerService dirsHandler;
|
||||
private Context nmContext;
|
||||
|
||||
/**
|
||||
* Map of LocalResourceTrackers keyed by username, for private
|
||||
|
@ -177,7 +178,7 @@ public class ResourceLocalizationService extends CompositeService
|
|||
|
||||
public ResourceLocalizationService(Dispatcher dispatcher,
|
||||
ContainerExecutor exec, DeletionService delService,
|
||||
LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) {
|
||||
LocalDirsHandlerService dirsHandler, Context context) {
|
||||
|
||||
super(ResourceLocalizationService.class.getName());
|
||||
this.exec = exec;
|
||||
|
@ -189,7 +190,8 @@ public class ResourceLocalizationService extends CompositeService
|
|||
new ThreadFactoryBuilder()
|
||||
.setNameFormat("ResourceLocalizationService Cache Cleanup")
|
||||
.build());
|
||||
this.stateStore = stateStore;
|
||||
this.stateStore = context.getNMStateStore();
|
||||
this.nmContext = context;
|
||||
}
|
||||
|
||||
FileContext getLocalFileContext(Configuration conf) {
|
||||
|
@ -1110,11 +1112,36 @@ public class ResourceLocalizationService extends CompositeService
|
|||
}
|
||||
}
|
||||
|
||||
private Credentials getSystemCredentialsSentFromRM(
|
||||
LocalizerContext localizerContext) throws IOException {
|
||||
ApplicationId appId =
|
||||
localizerContext.getContainerId().getApplicationAttemptId()
|
||||
.getApplicationId();
|
||||
Credentials systemCredentials =
|
||||
nmContext.getSystemCredentialsForApps().get(appId);
|
||||
if (systemCredentials == null) {
|
||||
return null;
|
||||
}
|
||||
LOG.info("Adding new framework tokens from RM for " + appId);
|
||||
for (Token<?> token : systemCredentials.getAllTokens()) {
|
||||
LOG.info("Adding new application-token for localization: " + token);
|
||||
}
|
||||
return systemCredentials;
|
||||
}
|
||||
|
||||
private void writeCredentials(Path nmPrivateCTokensPath)
|
||||
throws IOException {
|
||||
DataOutputStream tokenOut = null;
|
||||
try {
|
||||
Credentials credentials = context.getCredentials();
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
Credentials systemCredentials =
|
||||
getSystemCredentialsSentFromRM(context);
|
||||
if (systemCredentials != null) {
|
||||
credentials = systemCredentials;
|
||||
}
|
||||
}
|
||||
|
||||
FileContext lfs = getLocalFileContext(getConfig());
|
||||
tokenOut =
|
||||
lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
|
||||
|
|
|
@ -39,9 +39,10 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
|
@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
|
@ -342,6 +344,18 @@ public class LogAggregationService extends AbstractService implements
|
|||
Map<ApplicationAccessType, String> appAcls,
|
||||
LogAggregationContext logAggregationContext) {
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
Credentials systemCredentials =
|
||||
context.getSystemCredentialsForApps().get(appId);
|
||||
if (systemCredentials != null) {
|
||||
LOG.info("Adding new framework tokens from RM for " + appId);
|
||||
for (Token<?> token : systemCredentials.getAllTokens()) {
|
||||
LOG.info("Adding new application-token for log-aggregation: " + token);
|
||||
}
|
||||
credentials = systemCredentials;
|
||||
}
|
||||
}
|
||||
|
||||
// Get user's FileSystem credentials
|
||||
final UserGroupInformation userUgi =
|
||||
UserGroupInformation.createRemoteUser(user);
|
||||
|
|
|
@ -54,7 +54,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
|
||||
public class DummyContainerManager extends ContainerManagerImpl {
|
||||
|
@ -74,9 +73,9 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
|||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected ResourceLocalizationService createResourceLocalizationService(
|
||||
ContainerExecutor exec, DeletionService deletionContext) {
|
||||
ContainerExecutor exec, DeletionService deletionContext, Context context) {
|
||||
return new ResourceLocalizationService(super.dispatcher, exec,
|
||||
deletionContext, super.dirsHandler, new NMNullStateStoreService()) {
|
||||
deletionContext, super.dirsHandler, context) {
|
||||
@Override
|
||||
public void handle(LocalizationEvent event) {
|
||||
switch (event.getType()) {
|
||||
|
|
|
@ -44,10 +44,14 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryProxy;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.service.ServiceOperations;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
@ -561,6 +565,7 @@ public class TestNodeStatusUpdater {
|
|||
|
||||
// Test NodeStatusUpdater sends the right container statuses each time it
|
||||
// heart beats.
|
||||
private Credentials expectedCredentials = new Credentials();
|
||||
private class MyResourceTracker4 implements ResourceTracker {
|
||||
|
||||
public NodeAction registerNodeAction = NodeAction.NORMAL;
|
||||
|
@ -576,6 +581,11 @@ public class TestNodeStatusUpdater {
|
|||
createContainerStatus(5, ContainerState.COMPLETE);
|
||||
|
||||
public MyResourceTracker4(Context context) {
|
||||
// create app Credentials
|
||||
org.apache.hadoop.security.token.Token<DelegationTokenIdentifier> token1 =
|
||||
new org.apache.hadoop.security.token.Token<DelegationTokenIdentifier>();
|
||||
token1.setKind(new Text("kind1"));
|
||||
expectedCredentials.addToken(new Text("token1"), token1);
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
|
@ -694,6 +704,14 @@ public class TestNodeStatusUpdater {
|
|||
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
|
||||
heartBeatNodeAction, null, null, null, null, 1000L);
|
||||
nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM);
|
||||
Map<ApplicationId, ByteBuffer> appCredentials =
|
||||
new HashMap<ApplicationId, ByteBuffer>();
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
expectedCredentials.writeTokenStorageToStream(dob);
|
||||
ByteBuffer byteBuffer1 =
|
||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
|
||||
nhResponse.setSystemCredentialsForApps(appCredentials);
|
||||
return nhResponse;
|
||||
}
|
||||
}
|
||||
|
@ -1293,6 +1311,8 @@ public class TestNodeStatusUpdater {
|
|||
if(assertionFailedInThread.get()) {
|
||||
Assert.fail("ContainerStatus Backup failed");
|
||||
}
|
||||
Assert.assertNotNull(nm.getNMContext().getSystemCredentialsForApps()
|
||||
.get(ApplicationId.newInstance(1234, 1)).getToken(new Text("token1")));
|
||||
nm.stop();
|
||||
}
|
||||
|
||||
|
|
|
@ -278,8 +278,7 @@ public class TestContainerManagerRecovery {
|
|||
private ContainerManagerImpl createContainerManager(Context context) {
|
||||
final LogHandler logHandler = mock(LogHandler.class);
|
||||
final ResourceLocalizationService rsrcSrv =
|
||||
new ResourceLocalizationService(null, null, null, null,
|
||||
context.getNMStateStore()) {
|
||||
new ResourceLocalizationService(null, null, null, null, context) {
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
}
|
||||
|
@ -320,7 +319,7 @@ public class TestContainerManagerRecovery {
|
|||
|
||||
@Override
|
||||
protected ResourceLocalizationService createResourceLocalizationService(
|
||||
ContainerExecutor exec, DeletionService deletionContext) {
|
||||
ContainerExecutor exec, DeletionService deletionContext, Context context) {
|
||||
return rsrcSrv;
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,12 @@ import org.junit.Assert;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestLocalCacheDirectoryManager {
|
||||
|
@ -73,8 +78,12 @@ public class TestLocalCacheDirectoryManager {
|
|||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1");
|
||||
Exception e = null;
|
||||
NMContext nmContext =
|
||||
new NMContext(new NMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInNM(), null,
|
||||
new ApplicationACLsManager(conf), new NMNullStateStoreService());
|
||||
ResourceLocalizationService service =
|
||||
new ResourceLocalizationService(null, null, null, null, null);
|
||||
new ResourceLocalizationService(null, null, null, null, nmContext);
|
||||
try {
|
||||
service.init(conf);
|
||||
} catch (Exception e1) {
|
||||
|
|
|
@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
|
||||
|
@ -138,6 +139,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
|
|||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.After;
|
||||
|
@ -159,7 +163,7 @@ public class TestResourceLocalizationService {
|
|||
private Configuration conf;
|
||||
private AbstractFileSystem spylfs;
|
||||
private FileContext lfs;
|
||||
|
||||
private NMContext nmContext;
|
||||
@BeforeClass
|
||||
public static void setupClass() {
|
||||
mockServer = mock(Server.class);
|
||||
|
@ -174,6 +178,9 @@ public class TestResourceLocalizationService {
|
|||
|
||||
String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString();
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
|
||||
nmContext = new NMContext(new NMContainerTokenSecretManager(
|
||||
conf), new NMTokenSecretManagerInNM(), null,
|
||||
new ApplicationACLsManager(conf), new NMNullStateStoreService());
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -206,8 +213,7 @@ public class TestResourceLocalizationService {
|
|||
|
||||
ResourceLocalizationService locService =
|
||||
spy(new ResourceLocalizationService(dispatcher, exec, delService,
|
||||
diskhandler,
|
||||
new NMNullStateStoreService()));
|
||||
diskhandler, nmContext));
|
||||
doReturn(lfs)
|
||||
.when(locService).getLocalFileContext(isA(Configuration.class));
|
||||
try {
|
||||
|
@ -268,8 +274,7 @@ public class TestResourceLocalizationService {
|
|||
|
||||
ResourceLocalizationService locService =
|
||||
spy(new ResourceLocalizationService(dispatcher, exec, delService,
|
||||
diskhandler,
|
||||
nmStateStoreService));
|
||||
diskhandler,nmContext));
|
||||
doReturn(lfs)
|
||||
.when(locService).getLocalFileContext(isA(Configuration.class));
|
||||
try {
|
||||
|
@ -340,8 +345,7 @@ public class TestResourceLocalizationService {
|
|||
|
||||
ResourceLocalizationService rawService =
|
||||
new ResourceLocalizationService(dispatcher, exec, delService,
|
||||
dirsHandler,
|
||||
new NMNullStateStoreService());
|
||||
dirsHandler, nmContext);
|
||||
ResourceLocalizationService spyService = spy(rawService);
|
||||
doReturn(mockServer).when(spyService).createServer();
|
||||
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
|
||||
|
@ -751,8 +755,7 @@ public class TestResourceLocalizationService {
|
|||
|
||||
ResourceLocalizationService rawService =
|
||||
new ResourceLocalizationService(dispatcher, exec, delService,
|
||||
dirsHandler,
|
||||
new NMNullStateStoreService());
|
||||
dirsHandler, nmContext);
|
||||
ResourceLocalizationService spyService = spy(rawService);
|
||||
doReturn(mockServer).when(spyService).createServer();
|
||||
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
|
||||
|
@ -965,8 +968,7 @@ public class TestResourceLocalizationService {
|
|||
try {
|
||||
ResourceLocalizationService rawService =
|
||||
new ResourceLocalizationService(dispatcher, exec, delService,
|
||||
dirsHandler,
|
||||
new NMNullStateStoreService());
|
||||
dirsHandler, nmContext);
|
||||
ResourceLocalizationService spyService = spy(rawService);
|
||||
doReturn(mockServer).when(spyService).createServer();
|
||||
doReturn(lfs).when(spyService).getLocalFileContext(
|
||||
|
@ -1075,7 +1077,7 @@ public class TestResourceLocalizationService {
|
|||
try {
|
||||
ResourceLocalizationService rawService =
|
||||
new ResourceLocalizationService(dispatcher, exec, delService,
|
||||
dirsHandlerSpy, new NMNullStateStoreService());
|
||||
dirsHandlerSpy, nmContext);
|
||||
ResourceLocalizationService spyService = spy(rawService);
|
||||
doReturn(mockServer).when(spyService).createServer();
|
||||
doReturn(lfs).when(spyService).getLocalFileContext(
|
||||
|
@ -1188,7 +1190,7 @@ public class TestResourceLocalizationService {
|
|||
|
||||
ResourceLocalizationService rls =
|
||||
new ResourceLocalizationService(dispatcher1, exec, delService,
|
||||
localDirHandler, new NMNullStateStoreService());
|
||||
localDirHandler, nmContext);
|
||||
dispatcher1.register(LocalizationEventType.class, rls);
|
||||
rls.init(conf);
|
||||
|
||||
|
@ -1341,7 +1343,7 @@ public class TestResourceLocalizationService {
|
|||
|
||||
ResourceLocalizationService rls =
|
||||
new ResourceLocalizationService(dispatcher1, exec, delService,
|
||||
localDirHandler, new NMNullStateStoreService());
|
||||
localDirHandler, nmContext);
|
||||
dispatcher1.register(LocalizationEventType.class, rls);
|
||||
rls.init(conf);
|
||||
|
||||
|
@ -1507,7 +1509,7 @@ public class TestResourceLocalizationService {
|
|||
// it as otherwise it will remove requests from pending queue.
|
||||
ResourceLocalizationService rawService =
|
||||
new ResourceLocalizationService(dispatcher1, exec, delService,
|
||||
dirsHandler, new NMNullStateStoreService());
|
||||
dirsHandler, nmContext);
|
||||
ResourceLocalizationService spyService = spy(rawService);
|
||||
dispatcher1.register(LocalizationEventType.class, spyService);
|
||||
spyService.init(conf);
|
||||
|
@ -1795,9 +1797,13 @@ public class TestResourceLocalizationService {
|
|||
ContainerExecutor exec = mock(ContainerExecutor.class);
|
||||
LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
|
||||
DeletionService delService = mock(DeletionService.class);
|
||||
NMContext nmContext =
|
||||
new NMContext(new NMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInNM(), null,
|
||||
new ApplicationACLsManager(conf), stateStore);
|
||||
ResourceLocalizationService rawService =
|
||||
new ResourceLocalizationService(dispatcher, exec, delService,
|
||||
dirsHandler, stateStore);
|
||||
dirsHandler, nmContext);
|
||||
ResourceLocalizationService spyService = spy(rawService);
|
||||
doReturn(mockServer).when(spyService).createServer();
|
||||
doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
|
||||
|
@ -1861,7 +1867,7 @@ public class TestResourceLocalizationService {
|
|||
// setup mocks
|
||||
ResourceLocalizationService rawService =
|
||||
new ResourceLocalizationService(dispatcher, exec, delService,
|
||||
mockDirsHandler, new NMNullStateStoreService());
|
||||
mockDirsHandler, nmContext);
|
||||
ResourceLocalizationService spyService = spy(rawService);
|
||||
doReturn(mockServer).when(spyService).createServer();
|
||||
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
|
||||
|
|
|
@ -278,7 +278,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
try {
|
||||
credentials = parseCredentials(submissionContext);
|
||||
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
|
||||
credentials, submissionContext.getCancelTokensWhenComplete());
|
||||
credentials, submissionContext.getCancelTokensWhenComplete(),
|
||||
application.getUser());
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Unable to parse credentials.", e);
|
||||
// Sending APP_REJECTED is fine, since we assume that the
|
||||
|
@ -325,7 +326,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
credentials = parseCredentials(appContext);
|
||||
// synchronously renew delegation token on recovery.
|
||||
rmContext.getDelegationTokenRenewer().addApplicationSync(appId,
|
||||
credentials, appContext.getCancelTokensWhenComplete());
|
||||
credentials, appContext.getCancelTokensWhenComplete(),
|
||||
application.getUser());
|
||||
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Unable to parse and renew delegation tokens.", e);
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
|
@ -57,6 +58,8 @@ public interface RMContext {
|
|||
|
||||
ConcurrentMap<ApplicationId, RMApp> getRMApps();
|
||||
|
||||
ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
|
||||
|
||||
ConcurrentMap<String, RMNode> getInactiveRMNodes();
|
||||
|
||||
ConcurrentMap<NodeId, RMNode> getRMNodes();
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
|
@ -67,6 +68,9 @@ public class RMContextImpl implements RMContext {
|
|||
private final ConcurrentMap<String, RMNode> inactiveNodes
|
||||
= new ConcurrentHashMap<String, RMNode>();
|
||||
|
||||
private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>();
|
||||
|
||||
private boolean isHAEnabled;
|
||||
private boolean isWorkPreservingRecoveryEnabled;
|
||||
private HAServiceState haServiceState =
|
||||
|
@ -444,4 +448,8 @@ public class RMContextImpl implements RMContext {
|
|||
public void setSystemClock(Clock clock) {
|
||||
this.systemClock = clock;
|
||||
}
|
||||
|
||||
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
|
||||
return systemCredentials;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -31,6 +33,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
|
|||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.VersionUtil;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
|
@ -385,7 +388,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
|
||||
.getResponseId()) {
|
||||
LOG.info("Received duplicate heartbeat from node "
|
||||
+ rmNode.getNodeAddress());
|
||||
+ rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId());
|
||||
return lastNodeHeartbeatResponse;
|
||||
} else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
|
||||
.getResponseId()) {
|
||||
|
@ -410,6 +413,12 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
|
||||
populateKeys(request, nodeHeartBeatResponse);
|
||||
|
||||
ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
|
||||
rmContext.getSystemCredentialsForApps();
|
||||
if (!systemCredentials.isEmpty()) {
|
||||
nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
|
||||
}
|
||||
|
||||
// 4. Send status to RMNode, saving the latest response.
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -45,14 +47,20 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
|
@ -82,12 +90,10 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
private DelegationTokenCancelThread dtCancelThread =
|
||||
new DelegationTokenCancelThread();
|
||||
private ThreadPoolExecutor renewerService;
|
||||
|
||||
// managing the list of tokens using Map
|
||||
// appId=>List<tokens>
|
||||
private Set<DelegationTokenToRenew> delegationTokens =
|
||||
Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
|
||||
|
||||
|
||||
private ConcurrentMap<ApplicationId, Set<DelegationTokenToRenew>> appTokens =
|
||||
new ConcurrentHashMap<ApplicationId, Set<DelegationTokenToRenew>>();
|
||||
|
||||
private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
|
||||
new ConcurrentHashMap<ApplicationId, Long>();
|
||||
|
||||
|
@ -99,20 +105,33 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
|
||||
|
||||
private boolean tokenKeepAliveEnabled;
|
||||
|
||||
private boolean hasProxyUserPrivileges;
|
||||
private long credentialsValidTimeRemaining;
|
||||
|
||||
// this config is supposedly not used by end-users.
|
||||
public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
|
||||
YarnConfiguration.RM_PREFIX + "system-credentials.valid-time-remaining";
|
||||
public static final long DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
|
||||
10800000; // 3h
|
||||
|
||||
public DelegationTokenRenewer() {
|
||||
super(DelegationTokenRenewer.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void serviceInit(Configuration conf) throws Exception {
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
this.hasProxyUserPrivileges =
|
||||
conf.getBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED);
|
||||
this.tokenKeepAliveEnabled =
|
||||
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
|
||||
this.tokenRemovalDelayMs =
|
||||
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
|
||||
|
||||
this.credentialsValidTimeRemaining =
|
||||
conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING,
|
||||
DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING);
|
||||
setLocalSecretManagerAndServiceAddr();
|
||||
renewerService = createNewThreadPoolService(conf);
|
||||
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
|
||||
|
@ -182,7 +201,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
if (renewalTimer != null) {
|
||||
renewalTimer.cancel();
|
||||
}
|
||||
delegationTokens.clear();
|
||||
appTokens.clear();
|
||||
this.renewerService.shutdown();
|
||||
dtCancelThread.interrupt();
|
||||
try {
|
||||
|
@ -212,22 +231,28 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
public long expirationDate;
|
||||
public TimerTask timerTask;
|
||||
public final boolean shouldCancelAtEnd;
|
||||
|
||||
public DelegationTokenToRenew(
|
||||
ApplicationId jId, Token<?> token,
|
||||
Configuration conf, long expirationDate, boolean shouldCancelAtEnd) {
|
||||
public long maxDate;
|
||||
public String user;
|
||||
|
||||
public DelegationTokenToRenew(ApplicationId jId, Token<?> token,
|
||||
Configuration conf, long expirationDate, boolean shouldCancelAtEnd,
|
||||
String user) {
|
||||
this.token = token;
|
||||
this.user = user;
|
||||
if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
|
||||
try {
|
||||
AbstractDelegationTokenIdentifier identifier =
|
||||
(AbstractDelegationTokenIdentifier) token.decodeIdentifier();
|
||||
maxDate = identifier.getMaxDate();
|
||||
} catch (IOException e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
}
|
||||
this.applicationId = jId;
|
||||
this.conf = conf;
|
||||
this.expirationDate = expirationDate;
|
||||
this.timerTask = null;
|
||||
this.shouldCancelAtEnd = shouldCancelAtEnd;
|
||||
if (this.token==null || this.applicationId==null || this.conf==null) {
|
||||
throw new IllegalArgumentException("Invalid params to renew token" +
|
||||
";token=" + this.token +
|
||||
";appId=" + this.applicationId +
|
||||
";conf=" + this.conf);
|
||||
}
|
||||
}
|
||||
|
||||
public void setTimerTask(TimerTask tTask) {
|
||||
|
@ -317,16 +342,14 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
}
|
||||
}
|
||||
}
|
||||
//adding token
|
||||
private void addTokenToList(DelegationTokenToRenew t) {
|
||||
delegationTokens.add(t);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Set<Token<?>> getDelegationTokens() {
|
||||
Set<Token<?>> tokens = new HashSet<Token<?>>();
|
||||
for(DelegationTokenToRenew delegationToken : delegationTokens) {
|
||||
tokens.add(delegationToken.token);
|
||||
for (Set<DelegationTokenToRenew> tokenList : appTokens.values()) {
|
||||
for (DelegationTokenToRenew token : tokenList) {
|
||||
tokens.add(token.token);
|
||||
}
|
||||
}
|
||||
return tokens;
|
||||
}
|
||||
|
@ -337,25 +360,28 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
* @param ts tokens
|
||||
* @param shouldCancelAtEnd true if tokens should be canceled when the app is
|
||||
* done else false.
|
||||
* @param user user
|
||||
* @throws IOException
|
||||
*/
|
||||
public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
|
||||
boolean shouldCancelAtEnd) {
|
||||
boolean shouldCancelAtEnd, String user) {
|
||||
processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(
|
||||
applicationId, ts, shouldCancelAtEnd));
|
||||
applicationId, ts, shouldCancelAtEnd, user));
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronously renew delegation tokens.
|
||||
* @param user user
|
||||
*/
|
||||
public void addApplicationSync(ApplicationId applicationId, Credentials ts,
|
||||
boolean shouldCancelAtEnd) throws IOException{
|
||||
boolean shouldCancelAtEnd, String user) throws IOException,
|
||||
InterruptedException {
|
||||
handleAppSubmitEvent(new DelegationTokenRenewerAppSubmitEvent(
|
||||
applicationId, ts, shouldCancelAtEnd));
|
||||
applicationId, ts, shouldCancelAtEnd, user));
|
||||
}
|
||||
|
||||
private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
|
||||
throws IOException {
|
||||
throws IOException, InterruptedException {
|
||||
ApplicationId applicationId = evt.getApplicationId();
|
||||
Credentials ts = evt.getCredentials();
|
||||
boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
|
||||
|
@ -375,14 +401,21 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
// all renewable tokens are valid
|
||||
// At RM restart it is safe to assume that all the previously added tokens
|
||||
// are valid
|
||||
List<DelegationTokenToRenew> tokenList =
|
||||
new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
|
||||
appTokens.put(applicationId,
|
||||
Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>()));
|
||||
Set<DelegationTokenToRenew> tokenList = new HashSet<DelegationTokenToRenew>();
|
||||
boolean hasHdfsToken = false;
|
||||
for (Token<?> token : tokens) {
|
||||
if (token.isManaged()) {
|
||||
tokenList.add(new DelegationTokenToRenew(applicationId,
|
||||
token, getConfig(), now, shouldCancelAtEnd));
|
||||
token, getConfig(), now, shouldCancelAtEnd, evt.getUser()));
|
||||
if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
|
||||
LOG.info(applicationId + " found existing hdfs token " + token);
|
||||
hasHdfsToken = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!tokenList.isEmpty()) {
|
||||
// Renewing token and adding it to timer calls are separated purposefully
|
||||
// If user provides incorrect token then it should not be added for
|
||||
|
@ -395,14 +428,15 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
}
|
||||
}
|
||||
for (DelegationTokenToRenew dtr : tokenList) {
|
||||
addTokenToList(dtr);
|
||||
appTokens.get(applicationId).add(dtr);
|
||||
setTimerForTokenRenewal(dtr);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Registering token for renewal for:" + " service = "
|
||||
+ dtr.token.getService() + " for appId = " + dtr.applicationId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!hasHdfsToken) {
|
||||
requestNewHdfsDelegationToken(applicationId, evt.getUser(),
|
||||
shouldCancelAtEnd);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -424,14 +458,16 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
}
|
||||
|
||||
Token<?> token = dttr.token;
|
||||
|
||||
try {
|
||||
renewToken(dttr);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Renewing delegation-token for:" + token.getService() +
|
||||
"; new expiration;" + dttr.expirationDate);
|
||||
requestNewHdfsDelegationTokenIfNeeded(dttr);
|
||||
// if the token is not replaced by a new token, renew the token
|
||||
if (appTokens.get(dttr.applicationId).contains(dttr)) {
|
||||
renewToken(dttr);
|
||||
setTimerForTokenRenewal(dttr);// set the next one
|
||||
} else {
|
||||
LOG.info("The token was removed already. Token = [" +dttr +"]");
|
||||
}
|
||||
|
||||
setTimerForTokenRenewal(dttr);// set the next one
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
|
||||
removeFailedDelegationToken(dttr);
|
||||
|
@ -455,12 +491,14 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
// calculate timer time
|
||||
long expiresIn = token.expirationDate - System.currentTimeMillis();
|
||||
long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
|
||||
|
||||
// need to create new task every time
|
||||
TimerTask tTask = new RenewalTimerTask(token);
|
||||
token.setTimerTask(tTask); // keep reference to the timer
|
||||
|
||||
renewalTimer.schedule(token.timerTask, new Date(renewIn));
|
||||
|
||||
LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
|
||||
+ token.applicationId);
|
||||
}
|
||||
|
||||
// renew a token
|
||||
|
@ -470,16 +508,99 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
// need to use doAs so that http can find the kerberos tgt
|
||||
// NOTE: token renewers should be responsible for the correct UGI!
|
||||
try {
|
||||
dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
|
||||
new PrivilegedExceptionAction<Long>(){
|
||||
@Override
|
||||
public Long run() throws Exception {
|
||||
return dttr.token.renew(dttr.conf);
|
||||
}
|
||||
});
|
||||
dttr.expirationDate =
|
||||
UserGroupInformation.getLoginUser().doAs(
|
||||
new PrivilegedExceptionAction<Long>() {
|
||||
@Override
|
||||
public Long run() throws Exception {
|
||||
return dttr.token.renew(dttr.conf);
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
LOG.info("Renewed delegation-token= [" + dttr + "], for "
|
||||
+ dttr.applicationId);
|
||||
}
|
||||
|
||||
// Request new hdfs token if the token is about to expire, and remove the old
|
||||
// token from the tokenToRenew list
|
||||
private void requestNewHdfsDelegationTokenIfNeeded(
|
||||
final DelegationTokenToRenew dttr) throws IOException,
|
||||
InterruptedException {
|
||||
|
||||
if (hasProxyUserPrivileges
|
||||
&& dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
|
||||
&& dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
|
||||
|
||||
// remove all old expiring hdfs tokens for this application.
|
||||
Set<DelegationTokenToRenew> tokenSet = appTokens.get(dttr.applicationId);
|
||||
if (tokenSet != null && !tokenSet.isEmpty()) {
|
||||
Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
|
||||
synchronized (tokenSet) {
|
||||
while (iter.hasNext()) {
|
||||
DelegationTokenToRenew t = iter.next();
|
||||
if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
|
||||
iter.remove();
|
||||
if (t.timerTask != null) {
|
||||
t.timerTask.cancel();
|
||||
}
|
||||
LOG.info("Removed expiring token " + t);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG.info("Token= (" + dttr + ") is expiring, request new token.");
|
||||
requestNewHdfsDelegationToken(dttr.applicationId, dttr.user,
|
||||
dttr.shouldCancelAtEnd);
|
||||
}
|
||||
}
|
||||
|
||||
private void requestNewHdfsDelegationToken(ApplicationId applicationId,
|
||||
String user, boolean shouldCancelAtEnd) throws IOException,
|
||||
InterruptedException {
|
||||
// Get new hdfs tokens for this user
|
||||
Credentials credentials = new Credentials();
|
||||
Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
|
||||
|
||||
// Add new tokens to the toRenew list.
|
||||
LOG.info("Received new tokens for " + applicationId + ". Received "
|
||||
+ newTokens.length + " tokens.");
|
||||
if (newTokens.length > 0) {
|
||||
for (Token<?> token : newTokens) {
|
||||
if (token.isManaged()) {
|
||||
DelegationTokenToRenew tokenToRenew =
|
||||
new DelegationTokenToRenew(applicationId, token, getConfig(),
|
||||
Time.now(), shouldCancelAtEnd, user);
|
||||
// renew the token to get the next expiration date.
|
||||
renewToken(tokenToRenew);
|
||||
setTimerForTokenRenewal(tokenToRenew);
|
||||
appTokens.get(applicationId).add(tokenToRenew);
|
||||
LOG.info("Received new token " + token);
|
||||
}
|
||||
}
|
||||
}
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
credentials.writeTokenStorageToStream(dob);
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
|
||||
}
|
||||
|
||||
protected Token<?>[] obtainSystemTokensForUser(String user,
|
||||
final Credentials credentials) throws IOException, InterruptedException {
|
||||
// Get new hdfs tokens on behalf of this user
|
||||
UserGroupInformation proxyUser =
|
||||
UserGroupInformation.createProxyUser(user,
|
||||
UserGroupInformation.getLoginUser());
|
||||
Token<?>[] newTokens =
|
||||
proxyUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
|
||||
@Override
|
||||
public Token<?>[] run() throws Exception {
|
||||
return FileSystem.get(getConfig()).addDelegationTokens(
|
||||
UserGroupInformation.getLoginUser().getUserName(), credentials);
|
||||
}
|
||||
});
|
||||
return newTokens;
|
||||
}
|
||||
|
||||
// cancel a token
|
||||
|
@ -497,13 +618,13 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
*/
|
||||
private void removeFailedDelegationToken(DelegationTokenToRenew t) {
|
||||
ApplicationId applicationId = t.applicationId;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("removing failed delegation token for appid=" + applicationId +
|
||||
";t=" + t.token.getService());
|
||||
delegationTokens.remove(t);
|
||||
LOG.error("removing failed delegation token for appid=" + applicationId
|
||||
+ ";t=" + t.token.getService());
|
||||
appTokens.get(applicationId).remove(t);
|
||||
// cancel the timer
|
||||
if(t.timerTask!=null)
|
||||
if (t.timerTask != null) {
|
||||
t.timerTask.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -543,18 +664,21 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
}
|
||||
|
||||
private void removeApplicationFromRenewal(ApplicationId applicationId) {
|
||||
synchronized (delegationTokens) {
|
||||
Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
|
||||
while(it.hasNext()) {
|
||||
DelegationTokenToRenew dttr = it.next();
|
||||
if (dttr.applicationId.equals(applicationId)) {
|
||||
rmContext.getSystemCredentialsForApps().remove(applicationId);
|
||||
Set<DelegationTokenToRenew> tokens = appTokens.get(applicationId);
|
||||
|
||||
if (tokens != null && !tokens.isEmpty()) {
|
||||
synchronized (tokens) {
|
||||
Iterator<DelegationTokenToRenew> it = tokens.iterator();
|
||||
while (it.hasNext()) {
|
||||
DelegationTokenToRenew dttr = it.next();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removing delegation token for appId=" + applicationId +
|
||||
"; token=" + dttr.token.getService());
|
||||
LOG.debug("Removing delegation token for appId=" + applicationId
|
||||
+ "; token=" + dttr.token.getService());
|
||||
}
|
||||
|
||||
// cancel the timer
|
||||
if(dttr.timerTask!=null)
|
||||
if (dttr.timerTask != null)
|
||||
dttr.timerTask.cancel();
|
||||
|
||||
// cancel the token
|
||||
|
@ -670,17 +794,19 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
}
|
||||
}
|
||||
|
||||
private static class DelegationTokenRenewerAppSubmitEvent extends
|
||||
static class DelegationTokenRenewerAppSubmitEvent extends
|
||||
DelegationTokenRenewerEvent {
|
||||
|
||||
private Credentials credentials;
|
||||
private boolean shouldCancelAtEnd;
|
||||
private String user;
|
||||
|
||||
public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
|
||||
Credentials credentails, boolean shouldCancelAtEnd) {
|
||||
Credentials credentails, boolean shouldCancelAtEnd, String user) {
|
||||
super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
|
||||
this.credentials = credentails;
|
||||
this.shouldCancelAtEnd = shouldCancelAtEnd;
|
||||
this.user = user;
|
||||
}
|
||||
|
||||
public Credentials getCredentials() {
|
||||
|
@ -690,6 +816,10 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
public boolean shouldCancelAtEnd() {
|
||||
return shouldCancelAtEnd;
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
return user;
|
||||
}
|
||||
}
|
||||
|
||||
enum DelegationTokenRenewerEventType {
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
@ -54,6 +55,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -61,6 +63,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
|
@ -74,11 +77,16 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -88,16 +96,18 @@ import org.junit.Test;
|
|||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
/**
|
||||
* unit test -
|
||||
* tests addition/deletion/cancellation of renewals of delegation tokens
|
||||
*
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public class TestDelegationTokenRenewer {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TestDelegationTokenRenewer.class);
|
||||
private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
|
||||
private static final Text KIND = new Text("HDFS_DELEGATION_TOKEN");
|
||||
|
||||
private static BlockingQueue<Event> eventQueue;
|
||||
private static volatile AtomicInteger counter;
|
||||
|
@ -125,6 +135,9 @@ public class TestDelegationTokenRenewer {
|
|||
|
||||
@Override
|
||||
public long renew(Token<?> t, Configuration conf) throws IOException {
|
||||
if ( !(t instanceof MyToken)) {
|
||||
return DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
|
||||
}
|
||||
MyToken token = (MyToken)t;
|
||||
if(token.isCanceled()) {
|
||||
throw new InvalidToken("token has been canceled");
|
||||
|
@ -179,8 +192,10 @@ public class TestDelegationTokenRenewer {
|
|||
dispatcher = new AsyncDispatcher(eventQueue);
|
||||
Renewer.reset();
|
||||
delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
|
||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(
|
||||
delegationTokenRenewer);
|
||||
when(mockContext.getDispatcher()).thenReturn(dispatcher);
|
||||
|
@ -290,9 +305,9 @@ public class TestDelegationTokenRenewer {
|
|||
Text user1= new Text("user1");
|
||||
|
||||
MyDelegationTokenSecretManager sm = new MyDelegationTokenSecretManager(
|
||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
|
||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
|
||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
|
||||
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT,
|
||||
3600000, null);
|
||||
sm.startThreads();
|
||||
|
||||
|
@ -353,7 +368,7 @@ public class TestDelegationTokenRenewer {
|
|||
// register the tokens for renewal
|
||||
ApplicationId applicationId_0 =
|
||||
BuilderUtils.newApplicationId(0, 0);
|
||||
delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true);
|
||||
delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user");
|
||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||
|
||||
// first 3 initial renewals + 1 real
|
||||
|
@ -393,7 +408,7 @@ public class TestDelegationTokenRenewer {
|
|||
|
||||
|
||||
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
|
||||
delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true);
|
||||
delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user");
|
||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||
delegationTokenRenewer.applicationFinished(applicationId_1);
|
||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||
|
@ -429,7 +444,7 @@ public class TestDelegationTokenRenewer {
|
|||
|
||||
// register the tokens for renewal
|
||||
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
|
||||
delegationTokenRenewer.addApplicationAsync(appId, ts, true);
|
||||
delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user");
|
||||
int waitCnt = 20;
|
||||
while (waitCnt-- >0) {
|
||||
if (!eventQueue.isEmpty()) {
|
||||
|
@ -473,7 +488,7 @@ public class TestDelegationTokenRenewer {
|
|||
|
||||
|
||||
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
|
||||
delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false);
|
||||
delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user");
|
||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||
delegationTokenRenewer.applicationFinished(applicationId_1);
|
||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||
|
@ -516,6 +531,8 @@ public class TestDelegationTokenRenewer {
|
|||
DelegationTokenRenewer localDtr =
|
||||
createNewDelegationTokenRenewer(lconf, counter);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
|
||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(
|
||||
|
@ -540,7 +557,7 @@ public class TestDelegationTokenRenewer {
|
|||
|
||||
// register the tokens for renewal
|
||||
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
|
||||
localDtr.addApplicationAsync(applicationId_0, ts, true);
|
||||
localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
|
||||
waitForEventsToGetProcessed(localDtr);
|
||||
if (!eventQueue.isEmpty()){
|
||||
Event evt = eventQueue.take();
|
||||
|
@ -593,6 +610,8 @@ public class TestDelegationTokenRenewer {
|
|||
DelegationTokenRenewer localDtr =
|
||||
createNewDelegationTokenRenewer(conf, counter);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
|
||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||
when(mockContext.getDelegationTokenRenewer()).thenReturn(
|
||||
|
@ -617,7 +636,7 @@ public class TestDelegationTokenRenewer {
|
|||
|
||||
// register the tokens for renewal
|
||||
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
|
||||
localDtr.addApplicationAsync(applicationId_0, ts, true);
|
||||
localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
|
||||
localDtr.applicationFinished(applicationId_0);
|
||||
waitForEventsToGetProcessed(delegationTokenRenewer);
|
||||
//Send another keep alive.
|
||||
|
@ -640,7 +659,7 @@ public class TestDelegationTokenRenewer {
|
|||
|
||||
private DelegationTokenRenewer createNewDelegationTokenRenewer(
|
||||
Configuration conf, final AtomicInteger counter) {
|
||||
return new DelegationTokenRenewer() {
|
||||
DelegationTokenRenewer renew = new DelegationTokenRenewer() {
|
||||
|
||||
@Override
|
||||
protected ThreadPoolExecutor
|
||||
|
@ -664,6 +683,8 @@ public class TestDelegationTokenRenewer {
|
|||
return pool;
|
||||
}
|
||||
};
|
||||
renew.setRMContext(TestUtils.getMockRMContext());
|
||||
return renew;
|
||||
}
|
||||
|
||||
private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
|
||||
|
@ -679,7 +700,12 @@ public class TestDelegationTokenRenewer {
|
|||
public void testDTRonAppSubmission()
|
||||
throws IOException, InterruptedException, BrokenBarrierException {
|
||||
final Credentials credsx = new Credentials();
|
||||
final Token<?> tokenx = mock(Token.class);
|
||||
final Token<DelegationTokenIdentifier> tokenx = mock(Token.class);
|
||||
when(tokenx.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
|
||||
DelegationTokenIdentifier dtId1 =
|
||||
new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
|
||||
new Text("user1"));
|
||||
when(tokenx.decodeIdentifier()).thenReturn(dtId1);
|
||||
credsx.addToken(new Text("token"), tokenx);
|
||||
doReturn(true).when(tokenx).isManaged();
|
||||
doThrow(new IOException("boom"))
|
||||
|
@ -688,6 +714,8 @@ public class TestDelegationTokenRenewer {
|
|||
final DelegationTokenRenewer dtr =
|
||||
createNewDelegationTokenRenewer(conf, counter);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
|
||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||
InetSocketAddress sockAddr =
|
||||
|
@ -699,7 +727,7 @@ public class TestDelegationTokenRenewer {
|
|||
dtr.start();
|
||||
|
||||
try {
|
||||
dtr.addApplicationSync(mock(ApplicationId.class), credsx, false);
|
||||
dtr.addApplicationSync(mock(ApplicationId.class), credsx, false, "user");
|
||||
fail("Catch IOException on app submission");
|
||||
} catch (IOException e){
|
||||
Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
|
||||
|
@ -716,7 +744,12 @@ public class TestDelegationTokenRenewer {
|
|||
|
||||
// this token uses barriers to block during renew
|
||||
final Credentials creds1 = new Credentials();
|
||||
final Token<?> token1 = mock(Token.class);
|
||||
final Token<DelegationTokenIdentifier> token1 = mock(Token.class);
|
||||
when(token1.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
|
||||
DelegationTokenIdentifier dtId1 =
|
||||
new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
|
||||
new Text("user1"));
|
||||
when(token1.decodeIdentifier()).thenReturn(dtId1);
|
||||
creds1.addToken(new Text("token"), token1);
|
||||
doReturn(true).when(token1).isManaged();
|
||||
doAnswer(new Answer<Long>() {
|
||||
|
@ -729,7 +762,9 @@ public class TestDelegationTokenRenewer {
|
|||
|
||||
// this dummy token fakes renewing
|
||||
final Credentials creds2 = new Credentials();
|
||||
final Token<?> token2 = mock(Token.class);
|
||||
final Token<DelegationTokenIdentifier> token2 = mock(Token.class);
|
||||
when(token2.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
|
||||
when(token2.decodeIdentifier()).thenReturn(dtId1);
|
||||
creds2.addToken(new Text("token"), token2);
|
||||
doReturn(true).when(token2).isManaged();
|
||||
doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
|
||||
|
@ -737,7 +772,9 @@ public class TestDelegationTokenRenewer {
|
|||
// fire up the renewer
|
||||
final DelegationTokenRenewer dtr =
|
||||
createNewDelegationTokenRenewer(conf, counter);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
RMContext mockContext = mock(RMContext.class);
|
||||
when(mockContext.getSystemCredentialsForApps()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>());
|
||||
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||
InetSocketAddress sockAddr =
|
||||
|
@ -751,14 +788,14 @@ public class TestDelegationTokenRenewer {
|
|||
Thread submitThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false);
|
||||
dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user");
|
||||
}
|
||||
};
|
||||
submitThread.start();
|
||||
|
||||
// wait till 1st submit blocks, then submit another
|
||||
startBarrier.await();
|
||||
dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false);
|
||||
dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user");
|
||||
// signal 1st to complete
|
||||
endBarrier.await();
|
||||
submitThread.join();
|
||||
|
@ -793,4 +830,139 @@ public class TestDelegationTokenRenewer {
|
|||
"Bad header found in token storage"));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test (timeout = 20000)
|
||||
public void testReplaceExpiringDelegationToken() throws Exception {
|
||||
conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
// create Token1:
|
||||
Text userText1 = new Text("user1");
|
||||
DelegationTokenIdentifier dtId1 =
|
||||
new DelegationTokenIdentifier(userText1, new Text("renewer1"),
|
||||
userText1);
|
||||
// set max date to 0 to simulate an expiring token;
|
||||
dtId1.setMaxDate(0);
|
||||
final Token<DelegationTokenIdentifier> token1 =
|
||||
new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
|
||||
"password1".getBytes(), dtId1.getKind(), new Text("service1"));
|
||||
|
||||
// create token2
|
||||
Text userText2 = new Text("user2");
|
||||
DelegationTokenIdentifier dtId2 =
|
||||
new DelegationTokenIdentifier(userText1, new Text("renewer2"),
|
||||
userText2);
|
||||
final Token<DelegationTokenIdentifier> expectedToken =
|
||||
new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
|
||||
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
||||
|
||||
final MockRM rm = new TestSecurityMockRM(conf, null) {
|
||||
@Override
|
||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||
return new DelegationTokenRenewer() {
|
||||
@Override
|
||||
protected Token<?>[] obtainSystemTokensForUser(String user,
|
||||
final Credentials credentials) throws IOException {
|
||||
credentials.addToken(expectedToken.getService(), expectedToken);
|
||||
return new Token<?>[] { expectedToken };
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
rm.start();
|
||||
Credentials credentials = new Credentials();
|
||||
credentials.addToken(userText1, token1);
|
||||
|
||||
RMApp app =
|
||||
rm.submitApp(200, "name", "user",
|
||||
new HashMap<ApplicationAccessType, String>(), false, "default", 1,
|
||||
credentials);
|
||||
|
||||
// wait for the initial expiring hdfs token to be removed.
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
public Boolean get() {
|
||||
return !rm.getRMContext().getDelegationTokenRenewer()
|
||||
.getDelegationTokens().contains(token1);
|
||||
}
|
||||
}, 1000, 20000);
|
||||
|
||||
// wait for the new retrieved hdfs token.
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
public Boolean get() {
|
||||
return rm.getRMContext().getDelegationTokenRenewer()
|
||||
.getDelegationTokens().contains(expectedToken);
|
||||
}
|
||||
}, 1000, 20000);
|
||||
|
||||
// check nm can retrieve the token
|
||||
final MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
|
||||
ByteBuffer tokenBuffer =
|
||||
response.getSystemCredentialsForApps().get(app.getApplicationId());
|
||||
Assert.assertNotNull(tokenBuffer);
|
||||
Credentials appCredentials = new Credentials();
|
||||
DataInputByteBuffer buf = new DataInputByteBuffer();
|
||||
tokenBuffer.rewind();
|
||||
buf.reset(tokenBuffer);
|
||||
appCredentials.readTokenStorageStream(buf);
|
||||
Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken));
|
||||
}
|
||||
|
||||
// YARN will get the token for the app submitted without the delegation token.
|
||||
@Test
|
||||
public void testAppSubmissionWithoutDelegationToken() throws Exception {
|
||||
// create token2
|
||||
Text userText2 = new Text("user2");
|
||||
DelegationTokenIdentifier dtId2 =
|
||||
new DelegationTokenIdentifier(new Text("user2"), new Text("renewer2"),
|
||||
userText2);
|
||||
final Token<DelegationTokenIdentifier> token2 =
|
||||
new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
|
||||
"password2".getBytes(), dtId2.getKind(), new Text("service2"));
|
||||
final MockRM rm = new TestSecurityMockRM(conf, null) {
|
||||
@Override
|
||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||
return new DelegationTokenRenewer() {
|
||||
@Override
|
||||
protected Token<?>[] obtainSystemTokensForUser(String user,
|
||||
final Credentials credentials) throws IOException {
|
||||
credentials.addToken(token2.getService(), token2);
|
||||
return new Token<?>[] { token2 };
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
rm.start();
|
||||
|
||||
// submit an app without delegationToken
|
||||
RMApp app = rm.submitApp(200);
|
||||
|
||||
// wait for the new retrieved hdfs token.
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
public Boolean get() {
|
||||
return rm.getRMContext().getDelegationTokenRenewer()
|
||||
.getDelegationTokens().contains(token2);
|
||||
}
|
||||
}, 1000, 20000);
|
||||
|
||||
// check nm can retrieve the token
|
||||
final MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
|
||||
ByteBuffer tokenBuffer =
|
||||
response.getSystemCredentialsForApps().get(app.getApplicationId());
|
||||
Assert.assertNotNull(tokenBuffer);
|
||||
Credentials appCredentials = new Credentials();
|
||||
DataInputByteBuffer buf = new DataInputByteBuffer();
|
||||
tokenBuffer.rewind();
|
||||
buf.reset(tokenBuffer);
|
||||
appCredentials.readTokenStorageStream(buf);
|
||||
Assert.assertTrue(appCredentials.getAllTokens().contains(token2));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue