diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2090f32fb10..c0d0bdb81ff 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -334,6 +334,10 @@ Release 2.6.0 - UNRELEASED
YARN-2703. Added logUploadedTime into LogValue for better display. (Xuan Gong
via zjshen)
+ YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the
+ sake of localization and log-aggregation for long-running services. (Jian He
+ via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6ff19f8211d..20edbdcea6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -695,6 +695,10 @@ public class YarnConfiguration extends Configuration {
RM_PREFIX + "delegation-token-renewer.thread-count";
public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
+ public static final String RM_PROXY_USER_PRIVILEGES_ENABLED = RM_PREFIX
+ + "proxy-user-privileges.enabled";
+ public static boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
+
/** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 090d3ce7bfd..6994360e6f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -553,6 +553,21 @@
30000
+  <property>
+    <description>
+      If true, ResourceManager will have proxy-user privileges. Use case: In a secure
+      cluster, YARN requires the user's HDFS delegation tokens to do localization and
+      log-aggregation on behalf of the user. If this is set to true, ResourceManager is
+      able to request new HDFS delegation tokens on behalf of the user. This is needed
+      by long-running services, because the HDFS tokens will eventually expire and YARN
+      requires new valid tokens to do localization and log-aggregation. Note that to
+      enable this use case, the corresponding HDFS NameNode has to configure
+      ResourceManager as the proxy-user so that ResourceManager can itself ask for new
+      tokens on behalf of the user when tokens are past their max-life-time.
+    </description>
+    <name>yarn.resourcemanager.proxy-user-privileges.enabled</name>
+    <value>false</value>
+  </property>
Interval for the roll over for the master key used to generate
application tokens
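Note: the proxy-user behaviour this property describes boils down to HDFS impersonation via UserGroupInformation. Below is a minimal sketch of what an RM process with proxy-user privileges can do; the helper name and the renewer argument are illustrative, and the patch's actual call site (in DelegationTokenRenewer) is not shown in this excerpt.

    // Illustrative sketch only; not the patch's API.
    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ProxyUserTokenSketch {
      static Credentials obtainTokensOnBehalfOfUser(final Configuration conf,
          String user, final String renewer) throws IOException, InterruptedException {
        // Impersonate the application submitter using the RM's own login credentials.
        UserGroupInformation proxyUser =
            UserGroupInformation.createProxyUser(user,
                UserGroupInformation.getLoginUser());
        final Credentials credentials = new Credentials();
        proxyUser.doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws IOException {
            // Ask HDFS for brand-new delegation tokens for the impersonated user.
            FileSystem.get(conf).addDelegationTokens(renewer, credentials);
            return null;
          }
        });
        return credentials;
      }
    }

The NameNode only accepts this if the RM principal is listed under hadoop.proxyuser.<rm-user>.hosts / hadoop.proxyuser.<rm-user>.groups in core-site.xml, which is the "configure ResourceManager as the proxy-user" requirement mentioned in the description above.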
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 12e1f54478d..9fb44caf63e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
+import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,4 +60,11 @@ public interface NodeHeartbeatResponse {
String getDiagnosticsMessage();
void setDiagnosticsMessage(String diagnosticsMessage);
+
+ // Credentials (i.e. HDFS tokens) needed by NodeManagers for application
+ // localization and log-aggregation.
+ Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
+
+ void setSystemCredentialsForApps(
+ Map<ApplicationId, ByteBuffer> systemCredentials);
}
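Note: the ByteBuffer values in this map are the serialized wire form of a Credentials object. A minimal sketch of how a caller might populate the new field (the helper name is illustrative); the same round trip is exercised by testNodeHeartBeatResponse further down in this patch.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Collections;

    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;

    public class SystemCredentialsSketch {
      static void attach(NodeHeartbeatResponse response, ApplicationId appId,
          Credentials credentials) throws IOException {
        // Credentials travel over the heartbeat as opaque bytes
        // (SystemCredentialsForAppsProto.credentialsForApp).
        DataOutputBuffer dob = new DataOutputBuffer();
        credentials.writeTokenStorageToStream(dob);
        ByteBuffer buf = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        response.setSystemCredentialsForApps(Collections.singletonMap(appId, buf));
      }
    }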
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 78979d5a27a..1e915143842 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -18,21 +18,26 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -49,6 +54,8 @@ public class NodeHeartbeatResponsePBImpl extends
private List<ContainerId> containersToCleanup = null;
private List<ContainerId> containersToBeRemovedFromNM = null;
private List<ApplicationId> applicationsToCleanup = null;
+ private Map<ApplicationId, ByteBuffer> systemCredentials = null;
+
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
@@ -62,7 +69,7 @@ public class NodeHeartbeatResponsePBImpl extends
}
public NodeHeartbeatResponseProto getProto() {
- mergeLocalToProto();
+ mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
@@ -86,6 +93,19 @@ public class NodeHeartbeatResponsePBImpl extends
builder.setNmTokenMasterKey(
convertToProtoFormat(this.nmTokenMasterKey));
}
+ if (this.systemCredentials != null) {
+ addSystemCredentialsToProto();
+ }
+ }
+
+ private void addSystemCredentialsToProto() {
+ maybeInitBuilder();
+ builder.clearSystemCredentialsForApps();
+ for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
+ builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder()
+ .setAppId(convertToProtoFormat(entry.getKey()))
+ .setCredentialsForApp(ProtoUtils.convertToProtoFormat(entry.getValue())));
+ }
}
private void mergeLocalToProto() {
@@ -387,6 +407,38 @@ public class NodeHeartbeatResponsePBImpl extends
builder.addAllApplicationsToCleanup(iterable);
}
+
+ @Override
+ public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
+ if (this.systemCredentials != null) {
+ return this.systemCredentials;
+ }
+ initSystemCredentials();
+ return systemCredentials;
+ }
+
+ private void initSystemCredentials() {
+ NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List<SystemCredentialsForAppsProto> list = p.getSystemCredentialsForAppsList();
+ this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>();
+ for (SystemCredentialsForAppsProto c : list) {
+ ApplicationId appId = convertFromProtoFormat(c.getAppId());
+ ByteBuffer byteBuffer = ProtoUtils.convertFromProtoFormat(c.getCredentialsForApp());
+ this.systemCredentials.put(appId, byteBuffer);
+ }
+ }
+
+ @Override
+ public void setSystemCredentialsForApps(
+ Map<ApplicationId, ByteBuffer> systemCredentials) {
+ if (systemCredentials == null || systemCredentials.isEmpty()) {
+ return;
+ }
+ maybeInitBuilder();
+ this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>();
+ this.systemCredentials.putAll(systemCredentials);
+ }
+
@Override
public long getNextHeartBeatInterval() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index d0990fb2b1b..f2d01ada027 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -59,6 +59,12 @@ message NodeHeartbeatResponseProto {
optional int64 nextHeartBeatInterval = 7;
optional string diagnostics_message = 8;
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
+ repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
+}
+
+message SystemCredentialsForAppsProto {
+ optional ApplicationIdProto appId = 1;
+ optional bytes credentialsForApp = 2;
}
message NMContainerStatusProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
index 716544506aa..ed902baa7ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
@@ -18,9 +18,18 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -29,10 +38,10 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
+import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@@ -93,4 +102,33 @@ public class TestProtocolRecords {
Assert.assertEquals(1, requestProto.getRunningApplications().size());
Assert.assertEquals(appId, requestProto.getRunningApplications().get(0));
}
+
+ @Test
+ public void testNodeHeartBeatResponse() throws IOException {
+ NodeHeartbeatResponse record =
+ Records.newRecord(NodeHeartbeatResponse.class);
+ Map<ApplicationId, ByteBuffer> appCredentials =
+ new HashMap<ApplicationId, ByteBuffer>();
+ Credentials app1Cred = new Credentials();
+
+ Token<DelegationTokenIdentifier> token1 =
+ new Token<DelegationTokenIdentifier>();
+ token1.setKind(new Text("kind1"));
+ app1Cred.addToken(new Text("token1"), token1);
+ Token<DelegationTokenIdentifier> token2 =
+ new Token<DelegationTokenIdentifier>();
+ token2.setKind(new Text("kind2"));
+ app1Cred.addToken(new Text("token2"), token2);
+
+ DataOutputBuffer dob = new DataOutputBuffer();
+ app1Cred.writeTokenStorageToStream(dob);
+ ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
+ record.setSystemCredentialsForApps(appCredentials);
+
+ NodeHeartbeatResponse proto =
+ new NodeHeartbeatResponsePBImpl(
+ ((NodeHeartbeatResponsePBImpl) record).getProto());
+ Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 956ea334935..6e7e2ec5db8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -54,6 +56,8 @@ public interface Context {
ConcurrentMap<ApplicationId, Application> getApplications();
+ Map<ApplicationId, Credentials> getSystemCredentialsForApps();
+
ConcurrentMap<ContainerId, Container> getContainers();
NMContainerTokenSecretManager getContainerTokenSecretManager();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 43770c188ca..22057f45674 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -32,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ReflectionUtils;
@@ -313,6 +316,10 @@ public class NodeManager extends CompositeService
private NodeId nodeId = null;
protected final ConcurrentMap<ApplicationId, Application> applications =
new ConcurrentHashMap<ApplicationId, Application>();
+
+ private Map<ApplicationId, Credentials> systemCredentials =
+ new HashMap<ApplicationId, Credentials>();
+
protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
@@ -420,6 +427,16 @@ public class NodeManager extends CompositeService
public void setDecommissioned(boolean isDecommissioned) {
this.isDecommissioned = isDecommissioned;
}
+
+ @Override
+ public Map<ApplicationId, Credentials> getSystemCredentialsForApps() {
+ return systemCredentials;
+ }
+
+ public void setSystemCrendentials(
+ Map<ApplicationId, Credentials> systemCredentials) {
+ this.systemCredentials = systemCredentials;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index bed58f51419..1c3ac5cff3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.net.ConnectException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -36,7 +37,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
@@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -525,6 +529,25 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
return this.rmIdentifier;
}
+ private static Map<ApplicationId, Credentials> parseCredentials(
+ Map<ApplicationId, ByteBuffer> systemCredentials) throws IOException {
+ Map<ApplicationId, Credentials> map =
+ new HashMap<ApplicationId, Credentials>();
+ for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
+ Credentials credentials = new Credentials();
+ DataInputByteBuffer buf = new DataInputByteBuffer();
+ ByteBuffer buffer = entry.getValue();
+ buffer.rewind();
+ buf.reset(buffer);
+ credentials.readTokenStorageStream(buf);
+ map.put(entry.getKey(), credentials);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retrieved credentials form RM: " + map);
+ }
+ return map;
+ }
+
protected void startStatusUpdater() {
statusUpdaterRunnable = new Runnable() {
@@ -598,6 +621,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
new CMgrCompletedAppsEvent(appsToCleanup,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
}
+
+ Map<ApplicationId, ByteBuffer> systemCredentials =
+ response.getSystemCredentialsForApps();
+ if (systemCredentials != null && !systemCredentials.isEmpty()) {
+ ((NMContext) context)
+ .setSystemCrendentials(parseCredentials(systemCredentials));
+ }
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index e8001ffc628..35b232fea3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -186,7 +186,7 @@ public class ContainerManagerImpl extends CompositeService implements
this.metrics = metrics;
rsrcLocalizationSrvc =
- createResourceLocalizationService(exec, deletionContext);
+ createResourceLocalizationService(exec, deletionContext, context);
addService(rsrcLocalizationSrvc);
containersLauncher = createContainersLauncher(context, exec);
@@ -362,9 +362,9 @@ public class ContainerManagerImpl extends CompositeService implements
}
protected ResourceLocalizationService createResourceLocalizationService(
- ContainerExecutor exec, DeletionService deletionContext) {
+ ContainerExecutor exec, DeletionService deletionContext, Context context) {
return new ResourceLocalizationService(this.dispatcher, exec,
- deletionContext, dirsHandler, context.getNMStateStore());
+ deletionContext, dirsHandler, context);
}
protected ContainersLauncher createContainersLauncher(Context context,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index a6143a2aae9..549d8e7b291 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -83,11 +83,11 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -158,6 +158,7 @@ public class ResourceLocalizationService extends CompositeService
private LocalResourcesTracker publicRsrc;
private LocalDirsHandlerService dirsHandler;
+ private Context nmContext;
/**
* Map of LocalResourceTrackers keyed by username, for private
@@ -177,7 +178,7 @@ public class ResourceLocalizationService extends CompositeService
public ResourceLocalizationService(Dispatcher dispatcher,
ContainerExecutor exec, DeletionService delService,
- LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) {
+ LocalDirsHandlerService dirsHandler, Context context) {
super(ResourceLocalizationService.class.getName());
this.exec = exec;
@@ -189,7 +190,8 @@ public class ResourceLocalizationService extends CompositeService
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
.build());
- this.stateStore = stateStore;
+ this.stateStore = context.getNMStateStore();
+ this.nmContext = context;
}
FileContext getLocalFileContext(Configuration conf) {
@@ -1110,11 +1112,36 @@ public class ResourceLocalizationService extends CompositeService
}
}
+ private Credentials getSystemCredentialsSentFromRM(
+ LocalizerContext localizerContext) throws IOException {
+ ApplicationId appId =
+ localizerContext.getContainerId().getApplicationAttemptId()
+ .getApplicationId();
+ Credentials systemCredentials =
+ nmContext.getSystemCredentialsForApps().get(appId);
+ if (systemCredentials == null) {
+ return null;
+ }
+ LOG.info("Adding new framework tokens from RM for " + appId);
+ for (Token<?> token : systemCredentials.getAllTokens()) {
+ LOG.info("Adding new application-token for localization: " + token);
+ }
+ return systemCredentials;
+ }
+
private void writeCredentials(Path nmPrivateCTokensPath)
throws IOException {
DataOutputStream tokenOut = null;
try {
Credentials credentials = context.getCredentials();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ Credentials systemCredentials =
+ getSystemCredentialsSentFromRM(context);
+ if (systemCredentials != null) {
+ credentials = systemCredentials;
+ }
+ }
+
FileContext lfs = getLocalFileContext(getConfig());
tokenOut =
lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 77176b7e263..cc717d7a06b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -39,9 +39,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -342,6 +344,18 @@ public class LogAggregationService extends AbstractService implements
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ Credentials systemCredentials =
+ context.getSystemCredentialsForApps().get(appId);
+ if (systemCredentials != null) {
+ LOG.info("Adding new framework tokens from RM for " + appId);
+ for (Token<?> token : systemCredentials.getAllTokens()) {
+ LOG.info("Adding new application-token for log-aggregation: " + token);
+ }
+ credentials = systemCredentials;
+ }
+ }
+
// Get user's FileSystem credentials
final UserGroupInformation userUgi =
UserGroupInformation.createRemoteUser(user);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
index 5753fb8f20e..f872a55a6aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
public class DummyContainerManager extends ContainerManagerImpl {
@@ -74,9 +73,9 @@ public class DummyContainerManager extends ContainerManagerImpl {
@Override
@SuppressWarnings("unchecked")
protected ResourceLocalizationService createResourceLocalizationService(
- ContainerExecutor exec, DeletionService deletionContext) {
+ ContainerExecutor exec, DeletionService deletionContext, Context context) {
return new ResourceLocalizationService(super.dispatcher, exec,
- deletionContext, super.dirsHandler, new NMNullStateStoreService()) {
+ deletionContext, super.dirsHandler, context) {
@Override
public void handle(LocalizationEvent event) {
switch (event.getType()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 7593ce65091..5c2dd2cef1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -44,10 +44,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -561,6 +565,7 @@ public class TestNodeStatusUpdater {
// Test NodeStatusUpdater sends the right container statuses each time it
// heart beats.
+ private Credentials expectedCredentials = new Credentials();
private class MyResourceTracker4 implements ResourceTracker {
public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -576,6 +581,11 @@ public class TestNodeStatusUpdater {
createContainerStatus(5, ContainerState.COMPLETE);
public MyResourceTracker4(Context context) {
+ // create app Credentials
+ org.apache.hadoop.security.token.Token<DelegationTokenIdentifier> token1 =
+ new org.apache.hadoop.security.token.Token<DelegationTokenIdentifier>();
+ token1.setKind(new Text("kind1"));
+ expectedCredentials.addToken(new Text("token1"), token1);
this.context = context;
}
@@ -694,6 +704,14 @@ public class TestNodeStatusUpdater {
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
heartBeatNodeAction, null, null, null, null, 1000L);
nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM);
+ Map<ApplicationId, ByteBuffer> appCredentials =
+ new HashMap<ApplicationId, ByteBuffer>();
+ DataOutputBuffer dob = new DataOutputBuffer();
+ expectedCredentials.writeTokenStorageToStream(dob);
+ ByteBuffer byteBuffer1 =
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
+ nhResponse.setSystemCredentialsForApps(appCredentials);
return nhResponse;
}
}
@@ -1293,6 +1311,8 @@ public class TestNodeStatusUpdater {
if(assertionFailedInThread.get()) {
Assert.fail("ContainerStatus Backup failed");
}
+ Assert.assertNotNull(nm.getNMContext().getSystemCredentialsForApps()
+ .get(ApplicationId.newInstance(1234, 1)).getToken(new Text("token1")));
nm.stop();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 7850a1c665c..007fc36fcde 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -278,8 +278,7 @@ public class TestContainerManagerRecovery {
private ContainerManagerImpl createContainerManager(Context context) {
final LogHandler logHandler = mock(LogHandler.class);
final ResourceLocalizationService rsrcSrv =
- new ResourceLocalizationService(null, null, null, null,
- context.getNMStateStore()) {
+ new ResourceLocalizationService(null, null, null, null, context) {
@Override
public void serviceInit(Configuration conf) throws Exception {
}
@@ -320,7 +319,7 @@ public class TestContainerManagerRecovery {
@Override
protected ResourceLocalizationService createResourceLocalizationService(
- ContainerExecutor exec, DeletionService deletionContext) {
+ ContainerExecutor exec, DeletionService deletionContext, Context context) {
return rsrcSrv;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
index 503ce8c01c8..9e08b7f8e24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
@@ -23,7 +23,12 @@ import org.junit.Assert;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.Test;
public class TestLocalCacheDirectoryManager {
@@ -73,8 +78,12 @@ public class TestLocalCacheDirectoryManager {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1");
Exception e = null;
+ NMContext nmContext =
+ new NMContext(new NMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInNM(), null,
+ new ApplicationACLsManager(conf), new NMNullStateStoreService());
ResourceLocalizationService service =
- new ResourceLocalizationService(null, null, null, null, null);
+ new ResourceLocalizationService(null, null, null, null, nmContext);
try {
service.init(conf);
} catch (Exception e1) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index cf1e9fa714d..bf36651e941 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
@@ -138,6 +139,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
@@ -159,7 +163,7 @@ public class TestResourceLocalizationService {
private Configuration conf;
private AbstractFileSystem spylfs;
private FileContext lfs;
-
+ private NMContext nmContext;
@BeforeClass
public static void setupClass() {
mockServer = mock(Server.class);
@@ -174,6 +178,9 @@ public class TestResourceLocalizationService {
String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+ nmContext = new NMContext(new NMContainerTokenSecretManager(
+ conf), new NMTokenSecretManagerInNM(), null,
+ new ApplicationACLsManager(conf), new NMNullStateStoreService());
}
@After
@@ -206,8 +213,7 @@ public class TestResourceLocalizationService {
ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService,
- diskhandler,
- new NMNullStateStoreService()));
+ diskhandler, nmContext));
doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class));
try {
@@ -268,8 +274,7 @@ public class TestResourceLocalizationService {
ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService,
- diskhandler,
- nmStateStoreService));
+ diskhandler, nmContext));
doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class));
try {
@@ -340,8 +345,7 @@ public class TestResourceLocalizationService {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
- dirsHandler,
- new NMNullStateStoreService());
+ dirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@@ -751,8 +755,7 @@ public class TestResourceLocalizationService {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
- dirsHandler,
- new NMNullStateStoreService());
+ dirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@@ -965,8 +968,7 @@ public class TestResourceLocalizationService {
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
- dirsHandler,
- new NMNullStateStoreService());
+ dirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
@@ -1075,7 +1077,7 @@ public class TestResourceLocalizationService {
try {
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
- dirsHandlerSpy, new NMNullStateStoreService());
+ dirsHandlerSpy, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(
@@ -1188,7 +1190,7 @@ public class TestResourceLocalizationService {
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
- localDirHandler, new NMNullStateStoreService());
+ localDirHandler, nmContext);
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
@@ -1341,7 +1343,7 @@ public class TestResourceLocalizationService {
ResourceLocalizationService rls =
new ResourceLocalizationService(dispatcher1, exec, delService,
- localDirHandler, new NMNullStateStoreService());
+ localDirHandler, nmContext);
dispatcher1.register(LocalizationEventType.class, rls);
rls.init(conf);
@@ -1507,7 +1509,7 @@ public class TestResourceLocalizationService {
// it as otherwise it will remove requests from pending queue.
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher1, exec, delService,
- dirsHandler, new NMNullStateStoreService());
+ dirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
dispatcher1.register(LocalizationEventType.class, spyService);
spyService.init(conf);
@@ -1795,9 +1797,13 @@ public class TestResourceLocalizationService {
ContainerExecutor exec = mock(ContainerExecutor.class);
LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
DeletionService delService = mock(DeletionService.class);
+ NMContext nmContext =
+ new NMContext(new NMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInNM(), null,
+ new ApplicationACLsManager(conf), stateStore);
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
- dirsHandler, stateStore);
+ dirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
@@ -1861,7 +1867,7 @@ public class TestResourceLocalizationService {
// setup mocks
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
- mockDirsHandler, new NMNullStateStoreService());
+ mockDirsHandler, nmContext);
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 6e1b925e401..63333b8f62e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -278,7 +278,8 @@ public class RMAppManager implements EventHandler,
try {
credentials = parseCredentials(submissionContext);
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
- credentials, submissionContext.getCancelTokensWhenComplete());
+ credentials, submissionContext.getCancelTokensWhenComplete(),
+ application.getUser());
} catch (Exception e) {
LOG.warn("Unable to parse credentials.", e);
// Sending APP_REJECTED is fine, since we assume that the
@@ -325,7 +326,8 @@ public class RMAppManager implements EventHandler,
credentials = parseCredentials(appContext);
// synchronously renew delegation token on recovery.
rmContext.getDelegationTokenRenewer().addApplicationSync(appId,
- credentials, appContext.getCancelTokensWhenComplete());
+ credentials, appContext.getCancelTokensWhenComplete(),
+ application.getUser());
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
} catch (Exception e) {
LOG.warn("Unable to parse and renew delegation tokens.", e);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index e824634d4f9..56984e6f5d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -57,6 +58,8 @@ public interface RMContext {
ConcurrentMap<ApplicationId, RMApp> getRMApps();
+ ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
+
ConcurrentMap<String, RMNode> getInactiveRMNodes();
ConcurrentMap<NodeId, RMNode> getRMNodes();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 076c3dd6cd9..7c1db3de49a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -67,6 +68,9 @@ public class RMContextImpl implements RMContext {
private final ConcurrentMap<String, RMNode> inactiveNodes
= new ConcurrentHashMap<String, RMNode>();
+ private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
+ new ConcurrentHashMap<ApplicationId, ByteBuffer>();
+
private boolean isHAEnabled;
private boolean isWorkPreservingRecoveryEnabled;
private HAServiceState haServiceState =
@@ -444,4 +448,8 @@ public class RMContextImpl implements RMContext {
public void setSystemClock(Clock clock) {
this.systemClock = clock;
}
+
+ public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
+ return systemCredentials;
+ }
}
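Note: this map is what ResourceTrackerService (next file) copies into each NodeHeartbeatResponse. A minimal sketch of how RM-side code could publish freshly obtained tokens for an application; the actual call site in DelegationTokenRenewer is not part of this excerpt, so the class and method names here are illustrative.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.server.resourcemanager.RMContext;

    public class PublishSystemCredentialsSketch {
      static void publish(RMContext rmContext, ApplicationId appId,
          Credentials freshCredentials) throws IOException {
        // Serialize the Credentials into the ByteBuffer form carried by the heartbeat.
        DataOutputBuffer dob = new DataOutputBuffer();
        freshCredentials.writeTokenStorageToStream(dob);
        ByteBuffer buf = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        // NodeManagers pick this up from the next NodeHeartbeatResponse and use it
        // for localization and log-aggregation of the given application.
        rmContext.getSystemCredentialsForApps().put(appId, buf);
      }
    }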
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 42228882721..a21e4720fbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +33,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -387,7 +390,7 @@ public class ResourceTrackerService extends AbstractService implements
if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
.getResponseId()) {
LOG.info("Received duplicate heartbeat from node "
- + rmNode.getNodeAddress());
+ + rmNode.getNodeAddress() + " responseId=" + remoteNodeStatus.getResponseId());
return lastNodeHeartbeatResponse;
} else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
.getResponseId()) {
@@ -412,6 +415,12 @@ public class ResourceTrackerService extends AbstractService implements
populateKeys(request, nodeHeartBeatResponse);
+ ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
+ rmContext.getSystemCredentialsForApps();
+ if (!systemCredentials.isEmpty()) {
+ nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
+ }
+
// 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index e0c32247dfa..2dc331e50bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
@@ -45,14 +47,20 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -82,12 +90,10 @@ public class DelegationTokenRenewer extends AbstractService {
private DelegationTokenCancelThread dtCancelThread =
new DelegationTokenCancelThread();
private ThreadPoolExecutor renewerService;
-
- // managing the list of tokens using Map
- // appId=>List<tokens>
- private Set<DelegationTokenToRenew> delegationTokens =
- Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
-
+
+ private ConcurrentMap<ApplicationId, Set<DelegationTokenToRenew>> appTokens =
+ new ConcurrentHashMap<ApplicationId, Set<DelegationTokenToRenew>>();
+
private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
new ConcurrentHashMap<ApplicationId, Long>();
@@ -99,20 +105,33 @@ public class DelegationTokenRenewer extends AbstractService {
private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
private boolean tokenKeepAliveEnabled;
-
+ private boolean hasProxyUserPrivileges;
+ private long credentialsValidTimeRemaining;
+
+ // this config is supposedly not used by end-users.
+ public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
+ YarnConfiguration.RM_PREFIX + "system-credentials.valid-time-remaining";
+ public static final long DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
+ 10800000; // 3h
+
public DelegationTokenRenewer() {
super(DelegationTokenRenewer.class.getName());
}
@Override
- protected synchronized void serviceInit(Configuration conf) throws Exception {
+ protected void serviceInit(Configuration conf) throws Exception {
+ this.hasProxyUserPrivileges =
+ conf.getBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
+ YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED);
this.tokenKeepAliveEnabled =
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
-
+ this.credentialsValidTimeRemaining =
+ conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING,
+ DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING);
setLocalSecretManagerAndServiceAddr();
renewerService = createNewThreadPoolService(conf);
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
@@ -182,7 +201,7 @@ public class DelegationTokenRenewer extends AbstractService {
if (renewalTimer != null) {
renewalTimer.cancel();
}
- delegationTokens.clear();
+ appTokens.clear();
this.renewerService.shutdown();
dtCancelThread.interrupt();
try {
@@ -212,22 +231,28 @@ public class DelegationTokenRenewer extends AbstractService {
public long expirationDate;
public TimerTask timerTask;
public final boolean shouldCancelAtEnd;
-
- public DelegationTokenToRenew(
- ApplicationId jId, Token<?> token,
- Configuration conf, long expirationDate, boolean shouldCancelAtEnd) {
+ public long maxDate;
+ public String user;
+
+ public DelegationTokenToRenew(ApplicationId jId, Token<?> token,
+ Configuration conf, long expirationDate, boolean shouldCancelAtEnd,
+ String user) {
this.token = token;
+ this.user = user;
+ if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+ try {
+ AbstractDelegationTokenIdentifier identifier =
+ (AbstractDelegationTokenIdentifier) token.decodeIdentifier();
+ maxDate = identifier.getMaxDate();
+ } catch (IOException e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
this.applicationId = jId;
this.conf = conf;
this.expirationDate = expirationDate;
this.timerTask = null;
this.shouldCancelAtEnd = shouldCancelAtEnd;
- if (this.token==null || this.applicationId==null || this.conf==null) {
- throw new IllegalArgumentException("Invalid params to renew token" +
- ";token=" + this.token +
- ";appId=" + this.applicationId +
- ";conf=" + this.conf);
- }
}
public void setTimerTask(TimerTask tTask) {
@@ -317,16 +342,14 @@ public class DelegationTokenRenewer extends AbstractService {
}
}
}
- //adding token
- private void addTokenToList(DelegationTokenToRenew t) {
- delegationTokens.add(t);
- }
@VisibleForTesting
public Set<Token<?>> getDelegationTokens() {
Set<Token<?>> tokens = new HashSet<Token<?>>();
- for(DelegationTokenToRenew delegationToken : delegationTokens) {
- tokens.add(delegationToken.token);
+ for (Set<DelegationTokenToRenew> tokenList : appTokens.values()) {
+ for (DelegationTokenToRenew token : tokenList) {
+ tokens.add(token.token);
+ }
}
return tokens;
}
@@ -337,25 +360,28 @@ public class DelegationTokenRenewer extends AbstractService {
* @param ts tokens
* @param shouldCancelAtEnd true if tokens should be canceled when the app is
* done else false.
+ * @param user user
* @throws IOException
*/
public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
- boolean shouldCancelAtEnd) {
+ boolean shouldCancelAtEnd, String user) {
processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(
- applicationId, ts, shouldCancelAtEnd));
+ applicationId, ts, shouldCancelAtEnd, user));
}
/**
* Synchronously renew delegation tokens.
+ * @param user user
*/
public void addApplicationSync(ApplicationId applicationId, Credentials ts,
- boolean shouldCancelAtEnd) throws IOException{
+ boolean shouldCancelAtEnd, String user) throws IOException,
+ InterruptedException {
handleAppSubmitEvent(new DelegationTokenRenewerAppSubmitEvent(
- applicationId, ts, shouldCancelAtEnd));
+ applicationId, ts, shouldCancelAtEnd, user));
}
private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
- throws IOException {
+ throws IOException, InterruptedException {
ApplicationId applicationId = evt.getApplicationId();
Credentials ts = evt.getCredentials();
boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
@@ -375,14 +401,21 @@ public class DelegationTokenRenewer extends AbstractService {
// all renewable tokens are valid
// At RM restart it is safe to assume that all the previously added tokens
// are valid
- List<DelegationTokenToRenew> tokenList =
- new ArrayList<DelegationTokenToRenew>();
+ appTokens.put(applicationId,
+ Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>()));
+ Set<DelegationTokenToRenew> tokenList = new HashSet<DelegationTokenToRenew>();
+ boolean hasHdfsToken = false;
for (Token<?> token : tokens) {
if (token.isManaged()) {
tokenList.add(new DelegationTokenToRenew(applicationId,
- token, getConfig(), now, shouldCancelAtEnd));
+ token, getConfig(), now, shouldCancelAtEnd, evt.getUser()));
+ if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+ LOG.info(applicationId + " found existing hdfs token " + token);
+ hasHdfsToken = true;
+ }
}
}
+
if (!tokenList.isEmpty()) {
// Renewing token and adding it to timer calls are separated purposefully
// If user provides incorrect token then it should not be added for
@@ -395,14 +428,15 @@ public class DelegationTokenRenewer extends AbstractService {
}
}
for (DelegationTokenToRenew dtr : tokenList) {
- addTokenToList(dtr);
+ appTokens.get(applicationId).add(dtr);
setTimerForTokenRenewal(dtr);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Registering token for renewal for:" + " service = "
- + dtr.token.getService() + " for appId = " + dtr.applicationId);
- }
}
}
+
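+   // The app was submitted without an HDFS token; obtain one on the user's
+   // behalf so fresh credentials can be kept for this application.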
+ if (!hasHdfsToken) {
+ requestNewHdfsDelegationToken(applicationId, evt.getUser(),
+ shouldCancelAtEnd);
+ }
}
/**
@@ -424,14 +458,16 @@ public class DelegationTokenRenewer extends AbstractService {
}
Token<?> token = dttr.token;
+
try {
- renewToken(dttr);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renewing delegation-token for:" + token.getService() +
- "; new expiration;" + dttr.expirationDate);
+ requestNewHdfsDelegationTokenIfNeeded(dttr);
+ // if the token is not replaced by a new token, renew the token
+ if (appTokens.get(dttr.applicationId).contains(dttr)) {
+ renewToken(dttr);
+ setTimerForTokenRenewal(dttr);// set the next one
+ } else {
+ LOG.info("The token was removed already. Token = [" +dttr +"]");
}
-
- setTimerForTokenRenewal(dttr);// set the next one
} catch (Exception e) {
LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
removeFailedDelegationToken(dttr);
@@ -455,12 +491,14 @@ public class DelegationTokenRenewer extends AbstractService {
// calculate timer time
long expiresIn = token.expirationDate - System.currentTimeMillis();
long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
-
// need to create new task every time
TimerTask tTask = new RenewalTimerTask(token);
token.setTimerTask(tTask); // keep reference to the timer
renewalTimer.schedule(token.timerTask, new Date(renewIn));
+
+ LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
+ + token.applicationId);
}
// renew a token
@@ -470,16 +508,99 @@ public class DelegationTokenRenewer extends AbstractService {
// need to use doAs so that http can find the kerberos tgt
// NOTE: token renewers should be responsible for the correct UGI!
try {
- dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
- new PrivilegedExceptionAction<Long>(){
- @Override
- public Long run() throws Exception {
- return dttr.token.renew(dttr.conf);
- }
- });
+ dttr.expirationDate =
+ UserGroupInformation.getLoginUser().doAs(
+ new PrivilegedExceptionAction<Long>() {
+ @Override
+ public Long run() throws Exception {
+ return dttr.token.renew(dttr.conf);
+ }
+ });
} catch (InterruptedException e) {
throw new IOException(e);
}
+ LOG.info("Renewed delegation-token= [" + dttr + "], for "
+ + dttr.applicationId);
+ }
+
+ // Request new hdfs token if the token is about to expire, and remove the old
+ // token from the tokenToRenew list
+ private void requestNewHdfsDelegationTokenIfNeeded(
+ final DelegationTokenToRenew dttr) throws IOException,
+ InterruptedException {
+
+ if (hasProxyUserPrivileges
+ && dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
+ && dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+
+ // remove all old expiring hdfs tokens for this application.
+ Set<DelegationTokenToRenew> tokenSet = appTokens.get(dttr.applicationId);
+ if (tokenSet != null && !tokenSet.isEmpty()) {
+ Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
+ synchronized (tokenSet) {
+ while (iter.hasNext()) {
+ DelegationTokenToRenew t = iter.next();
+ if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+ iter.remove();
+ if (t.timerTask != null) {
+ t.timerTask.cancel();
+ }
+ LOG.info("Removed expiring token " + t);
+ }
+ }
+ }
+ }
+ LOG.info("Token= (" + dttr + ") is expiring, request new token.");
+ requestNewHdfsDelegationToken(dttr.applicationId, dttr.user,
+ dttr.shouldCancelAtEnd);
+ }
+ }
+
+ private void requestNewHdfsDelegationToken(ApplicationId applicationId,
+ String user, boolean shouldCancelAtEnd) throws IOException,
+ InterruptedException {
+ // Get new hdfs tokens for this user
+ Credentials credentials = new Credentials();
+ Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
+
+ // Add new tokens to the toRenew list.
+ LOG.info("Received new tokens for " + applicationId + ". Received "
+ + newTokens.length + " tokens.");
+ if (newTokens.length > 0) {
+ for (Token<?> token : newTokens) {
+ if (token.isManaged()) {
+ DelegationTokenToRenew tokenToRenew =
+ new DelegationTokenToRenew(applicationId, token, getConfig(),
+ Time.now(), shouldCancelAtEnd, user);
+ // renew the token to get the next expiration date.
+ renewToken(tokenToRenew);
+ setTimerForTokenRenewal(tokenToRenew);
+ appTokens.get(applicationId).add(tokenToRenew);
+ LOG.info("Received new token " + token);
+ }
+ }
+ }
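+   // Serialize the refreshed credentials and store them per-application in the
+   // RMContext; NodeManagers receive them in the node heartbeat response
+   // (getSystemCredentialsForApps).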
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
+ }
+
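+ // Obtains fresh HDFS tokens by impersonating the application user (a proxy
+ // UGI built from the RM's login user); the RM's login user is passed as the
+ // renewer of the new tokens.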
+ protected Token<?>[] obtainSystemTokensForUser(String user,
+ final Credentials credentials) throws IOException, InterruptedException {
+ // Get new hdfs tokens on behalf of this user
+ UserGroupInformation proxyUser =
+ UserGroupInformation.createProxyUser(user,
+ UserGroupInformation.getLoginUser());
+ Token<?>[] newTokens =
+ proxyUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
+ @Override
+ public Token<?>[] run() throws Exception {
+ return FileSystem.get(getConfig()).addDelegationTokens(
+ UserGroupInformation.getLoginUser().getUserName(), credentials);
+ }
+ });
+ return newTokens;
}
// cancel a token
@@ -497,13 +618,13 @@ public class DelegationTokenRenewer extends AbstractService {
*/
private void removeFailedDelegationToken(DelegationTokenToRenew t) {
ApplicationId applicationId = t.applicationId;
- if (LOG.isDebugEnabled())
- LOG.debug("removing failed delegation token for appid=" + applicationId +
- ";t=" + t.token.getService());
- delegationTokens.remove(t);
+ LOG.error("removing failed delegation token for appid=" + applicationId
+ + ";t=" + t.token.getService());
+ appTokens.get(applicationId).remove(t);
// cancel the timer
- if(t.timerTask!=null)
+ if (t.timerTask != null) {
t.timerTask.cancel();
+ }
}
/**
@@ -543,18 +664,21 @@ public class DelegationTokenRenewer extends AbstractService {
}
private void removeApplicationFromRenewal(ApplicationId applicationId) {
- synchronized (delegationTokens) {
- Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
- while(it.hasNext()) {
- DelegationTokenToRenew dttr = it.next();
- if (dttr.applicationId.equals(applicationId)) {
+ rmContext.getSystemCredentialsForApps().remove(applicationId);
+ Set<DelegationTokenToRenew> tokens = appTokens.get(applicationId);
+
+ if (tokens != null && !tokens.isEmpty()) {
+ synchronized (tokens) {
+ Iterator<DelegationTokenToRenew> it = tokens.iterator();
+ while (it.hasNext()) {
+ DelegationTokenToRenew dttr = it.next();
if (LOG.isDebugEnabled()) {
- LOG.debug("Removing delegation token for appId=" + applicationId +
- "; token=" + dttr.token.getService());
+ LOG.debug("Removing delegation token for appId=" + applicationId
+ + "; token=" + dttr.token.getService());
}
// cancel the timer
- if(dttr.timerTask!=null)
+ if (dttr.timerTask != null)
dttr.timerTask.cancel();
// cancel the token
@@ -670,17 +794,19 @@ public class DelegationTokenRenewer extends AbstractService {
}
}
- private static class DelegationTokenRenewerAppSubmitEvent extends
+ static class DelegationTokenRenewerAppSubmitEvent extends
DelegationTokenRenewerEvent {
private Credentials credentials;
private boolean shouldCancelAtEnd;
+ private String user;
public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
- Credentials credentails, boolean shouldCancelAtEnd) {
+ Credentials credentails, boolean shouldCancelAtEnd, String user) {
super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
this.credentials = credentails;
this.shouldCancelAtEnd = shouldCancelAtEnd;
+ this.user = user;
}
public Credentials getCredentials() {
@@ -690,6 +816,10 @@ public class DelegationTokenRenewer extends AbstractService {
public boolean shouldCancelAtEnd() {
return shouldCancelAtEnd;
}
+
+ public String getUser() {
+ return user;
+ }
}
enum DelegationTokenRenewerEventType {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index f65fcdcb3b7..b824df73bb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -38,6 +38,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -61,6 +63,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -74,11 +77,16 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
@@ -88,16 +96,18 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import com.google.common.base.Supplier;
+
/**
* unit test -
* tests addition/deletion/cancellation of renewals of delegation tokens
*
*/
-@SuppressWarnings("rawtypes")
+@SuppressWarnings({"rawtypes", "unchecked"})
public class TestDelegationTokenRenewer {
private static final Log LOG =
LogFactory.getLog(TestDelegationTokenRenewer.class);
- private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
+ private static final Text KIND = new Text("HDFS_DELEGATION_TOKEN");
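+ // The renewer special-cases tokens of kind HDFS_DELEGATION_TOKEN, so the
+ // test tokens now use that kind.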
private static BlockingQueue<Event> eventQueue;
private static volatile AtomicInteger counter;
@@ -125,6 +135,9 @@ public class TestDelegationTokenRenewer {
@Override
public long renew(Token<?> t, Configuration conf) throws IOException {
+ if (!(t instanceof MyToken)) {
+ return DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
+ }
MyToken token = (MyToken)t;
if(token.isCanceled()) {
throw new InvalidToken("token has been canceled");
@@ -179,8 +192,10 @@ public class TestDelegationTokenRenewer {
dispatcher = new AsyncDispatcher(eventQueue);
Renewer.reset();
delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
- RMContext mockContext = mock(RMContext.class);
+ RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
+ when(mockContext.getSystemCredentialsForApps()).thenReturn(
+ new ConcurrentHashMap<ApplicationId, ByteBuffer>());
when(mockContext.getDelegationTokenRenewer()).thenReturn(
delegationTokenRenewer);
when(mockContext.getDispatcher()).thenReturn(dispatcher);
@@ -290,9 +305,9 @@ public class TestDelegationTokenRenewer {
Text user1= new Text("user1");
MyDelegationTokenSecretManager sm = new MyDelegationTokenSecretManager(
- DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT,
3600000, null);
sm.startThreads();
@@ -353,7 +368,7 @@ public class TestDelegationTokenRenewer {
// register the tokens for renewal
ApplicationId applicationId_0 =
BuilderUtils.newApplicationId(0, 0);
- delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true);
+ delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user");
waitForEventsToGetProcessed(delegationTokenRenewer);
// first 3 initial renewals + 1 real
@@ -393,7 +408,7 @@ public class TestDelegationTokenRenewer {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
- delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true);
+ delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user");
waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
waitForEventsToGetProcessed(delegationTokenRenewer);
@@ -429,7 +444,7 @@ public class TestDelegationTokenRenewer {
// register the tokens for renewal
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
- delegationTokenRenewer.addApplicationAsync(appId, ts, true);
+ delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user");
int waitCnt = 20;
while (waitCnt-- >0) {
if (!eventQueue.isEmpty()) {
@@ -473,7 +488,7 @@ public class TestDelegationTokenRenewer {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
- delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false);
+ delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user");
waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
waitForEventsToGetProcessed(delegationTokenRenewer);
@@ -516,6 +531,8 @@ public class TestDelegationTokenRenewer {
DelegationTokenRenewer localDtr =
createNewDelegationTokenRenewer(lconf, counter);
RMContext mockContext = mock(RMContext.class);
+ when(mockContext.getSystemCredentialsForApps()).thenReturn(
+ new ConcurrentHashMap<ApplicationId, ByteBuffer>());
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
when(mockContext.getDelegationTokenRenewer()).thenReturn(
@@ -540,7 +557,7 @@ public class TestDelegationTokenRenewer {
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
- localDtr.addApplicationAsync(applicationId_0, ts, true);
+ localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
waitForEventsToGetProcessed(localDtr);
if (!eventQueue.isEmpty()){
Event evt = eventQueue.take();
@@ -593,6 +610,8 @@ public class TestDelegationTokenRenewer {
DelegationTokenRenewer localDtr =
createNewDelegationTokenRenewer(conf, counter);
RMContext mockContext = mock(RMContext.class);
+ when(mockContext.getSystemCredentialsForApps()).thenReturn(
+ new ConcurrentHashMap<ApplicationId, ByteBuffer>());
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
when(mockContext.getDelegationTokenRenewer()).thenReturn(
@@ -617,7 +636,7 @@ public class TestDelegationTokenRenewer {
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
- localDtr.addApplicationAsync(applicationId_0, ts, true);
+ localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
localDtr.applicationFinished(applicationId_0);
waitForEventsToGetProcessed(delegationTokenRenewer);
//Send another keep alive.
@@ -640,7 +659,7 @@ public class TestDelegationTokenRenewer {
private DelegationTokenRenewer createNewDelegationTokenRenewer(
Configuration conf, final AtomicInteger counter) {
- return new DelegationTokenRenewer() {
+ DelegationTokenRenewer renew = new DelegationTokenRenewer() {
@Override
protected ThreadPoolExecutor
@@ -664,6 +683,8 @@ public class TestDelegationTokenRenewer {
return pool;
}
};
+ renew.setRMContext(TestUtils.getMockRMContext());
+ return renew;
}
private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
@@ -679,7 +700,12 @@ public class TestDelegationTokenRenewer {
public void testDTRonAppSubmission()
throws IOException, InterruptedException, BrokenBarrierException {
final Credentials credsx = new Credentials();
- final Token<?> tokenx = mock(Token.class);
+ final Token tokenx = mock(Token.class);
+ when(tokenx.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
+ DelegationTokenIdentifier dtId1 =
+ new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
+ new Text("user1"));
+ when(tokenx.decodeIdentifier()).thenReturn(dtId1);
credsx.addToken(new Text("token"), tokenx);
doReturn(true).when(tokenx).isManaged();
doThrow(new IOException("boom"))
@@ -688,6 +714,8 @@ public class TestDelegationTokenRenewer {
final DelegationTokenRenewer dtr =
createNewDelegationTokenRenewer(conf, counter);
RMContext mockContext = mock(RMContext.class);
+ when(mockContext.getSystemCredentialsForApps()).thenReturn(
+ new ConcurrentHashMap<ApplicationId, ByteBuffer>());
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
@@ -699,7 +727,7 @@ public class TestDelegationTokenRenewer {
dtr.start();
try {
- dtr.addApplicationSync(mock(ApplicationId.class), credsx, false);
+ dtr.addApplicationSync(mock(ApplicationId.class), credsx, false, "user");
fail("Catch IOException on app submission");
} catch (IOException e){
Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
@@ -716,7 +744,12 @@ public class TestDelegationTokenRenewer {
// this token uses barriers to block during renew
final Credentials creds1 = new Credentials();
- final Token<?> token1 = mock(Token.class);
+ final Token token1 = mock(Token.class);
+ when(token1.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
+ DelegationTokenIdentifier dtId1 =
+ new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
+ new Text("user1"));
+ when(token1.decodeIdentifier()).thenReturn(dtId1);
creds1.addToken(new Text("token"), token1);
doReturn(true).when(token1).isManaged();
doAnswer(new Answer() {
@@ -729,7 +762,9 @@ public class TestDelegationTokenRenewer {
// this dummy token fakes renewing
final Credentials creds2 = new Credentials();
- final Token<?> token2 = mock(Token.class);
+ final Token token2 = mock(Token.class);
+ when(token2.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
+ when(token2.decodeIdentifier()).thenReturn(dtId1);
creds2.addToken(new Text("token"), token2);
doReturn(true).when(token2).isManaged();
doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
@@ -737,7 +772,9 @@ public class TestDelegationTokenRenewer {
// fire up the renewer
final DelegationTokenRenewer dtr =
createNewDelegationTokenRenewer(conf, counter);
- RMContext mockContext = mock(RMContext.class);
+ RMContext mockContext = mock(RMContext.class);
+ when(mockContext.getSystemCredentialsForApps()).thenReturn(
+ new ConcurrentHashMap<ApplicationId, ByteBuffer>());
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
@@ -751,14 +788,14 @@ public class TestDelegationTokenRenewer {
Thread submitThread = new Thread() {
@Override
public void run() {
- dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false);
+ dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user");
}
};
submitThread.start();
// wait till 1st submit blocks, then submit another
startBarrier.await();
- dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false);
+ dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user");
// signal 1st to complete
endBarrier.await();
submitThread.join();
@@ -793,4 +830,139 @@ public class TestDelegationTokenRenewer {
"Bad header found in token storage"));
}
}
+
+
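+ // Verifies that an HDFS token close to its max lifetime is replaced by a
+ // newly obtained token and that the new credentials are exposed to
+ // NodeManagers via the node heartbeat response.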
+ @Test (timeout = 20000)
+ public void testReplaceExpiringDelegationToken() throws Exception {
+ conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+
+ // create Token1:
+ Text userText1 = new Text("user1");
+ DelegationTokenIdentifier dtId1 =
+ new DelegationTokenIdentifier(userText1, new Text("renewer1"),
+ userText1);
+ // set max date to 0 to simulate an expiring token;
+ dtId1.setMaxDate(0);
+ final Token<DelegationTokenIdentifier> token1 =
+ new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
+ "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+
+ // create token2
+ Text userText2 = new Text("user2");
+ DelegationTokenIdentifier dtId2 =
+ new DelegationTokenIdentifier(userText1, new Text("renewer2"),
+ userText2);
+ final Token<DelegationTokenIdentifier> expectedToken =
+ new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
+ "password2".getBytes(), dtId2.getKind(), new Text("service2"));
+
+ final MockRM rm = new TestSecurityMockRM(conf, null) {
+ @Override
+ protected DelegationTokenRenewer createDelegationTokenRenewer() {
+ return new DelegationTokenRenewer() {
+ @Override
+ protected Token<?>[] obtainSystemTokensForUser(String user,
+ final Credentials credentials) throws IOException {
+ credentials.addToken(expectedToken.getService(), expectedToken);
+ return new Token<?>[] { expectedToken };
+ }
+ };
+ }
+ };
+ rm.start();
+ Credentials credentials = new Credentials();
+ credentials.addToken(userText1, token1);
+
+ RMApp app =
+ rm.submitApp(200, "name", "user",
+ new HashMap<ApplicationAccessType, String>(), false, "default", 1,
+ credentials);
+
+ // wait for the initial expiring hdfs token to be removed.
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ public Boolean get() {
+ return !rm.getRMContext().getDelegationTokenRenewer()
+ .getDelegationTokens().contains(token1);
+ }
+ }, 1000, 20000);
+
+ // wait for the new retrieved hdfs token.
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ public Boolean get() {
+ return rm.getRMContext().getDelegationTokenRenewer()
+ .getDelegationTokens().contains(expectedToken);
+ }
+ }, 1000, 20000);
+
+ // check nm can retrieve the token
+ final MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+ nm1.registerNode();
+ NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+ ByteBuffer tokenBuffer =
+ response.getSystemCredentialsForApps().get(app.getApplicationId());
+ Assert.assertNotNull(tokenBuffer);
+ Credentials appCredentials = new Credentials();
+ DataInputByteBuffer buf = new DataInputByteBuffer();
+ tokenBuffer.rewind();
+ buf.reset(tokenBuffer);
+ appCredentials.readTokenStorageStream(buf);
+ Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken));
+ }
+
+ // YARN obtains HDFS tokens itself for an app that is submitted without delegation tokens.
+ @Test
+ public void testAppSubmissionWithoutDelegationToken() throws Exception {
+ // create token2
+ Text userText2 = new Text("user2");
+ DelegationTokenIdentifier dtId2 =
+ new DelegationTokenIdentifier(new Text("user2"), new Text("renewer2"),
+ userText2);
+ final Token<DelegationTokenIdentifier> token2 =
+ new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
+ "password2".getBytes(), dtId2.getKind(), new Text("service2"));
+ final MockRM rm = new TestSecurityMockRM(conf, null) {
+ @Override
+ protected DelegationTokenRenewer createDelegationTokenRenewer() {
+ return new DelegationTokenRenewer() {
+ @Override
+ protected Token<?>[] obtainSystemTokensForUser(String user,
+ final Credentials credentials) throws IOException {
+ credentials.addToken(token2.getService(), token2);
+ return new Token<?>[] { token2 };
+ }
+ };
+ }
+ };
+ rm.start();
+
+ // submit an app without delegationToken
+ RMApp app = rm.submitApp(200);
+
+ // wait for the new retrieved hdfs token.
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ public Boolean get() {
+ return rm.getRMContext().getDelegationTokenRenewer()
+ .getDelegationTokens().contains(token2);
+ }
+ }, 1000, 20000);
+
+ // check nm can retrieve the token
+ final MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+ nm1.registerNode();
+ NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+ ByteBuffer tokenBuffer =
+ response.getSystemCredentialsForApps().get(app.getApplicationId());
+ Assert.assertNotNull(tokenBuffer);
+ Credentials appCredentials = new Credentials();
+ DataInputByteBuffer buf = new DataInputByteBuffer();
+ tokenBuffer.rewind();
+ buf.reset(tokenBuffer);
+ appCredentials.readTokenStorageStream(buf);
+ Assert.assertTrue(appCredentials.getAllTokens().contains(token2));
+ }
}