From 0ad33e14834129fee3c5f7b93f6218069d2f40e9 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Mon, 27 Oct 2014 15:49:47 -0700 Subject: [PATCH] YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the sake of localization and log-aggregation for long-running services. Contributed by Jian He. (cherry picked from commit a16d022ca4313a41425c8e97841c841a2d6f2f54) --- hadoop-yarn-project/CHANGES.txt | 4 + .../hadoop/yarn/conf/YarnConfiguration.java | 4 + .../src/main/resources/yarn-default.xml | 15 + .../NodeHeartbeatResponse.java | 9 + .../impl/pb/NodeHeartbeatResponsePBImpl.java | 56 +++- .../yarn_server_common_service_protos.proto | 6 + .../protocolrecords/TestProtocolRecords.java | 42 ++- .../yarn/server/nodemanager/Context.java | 4 + .../yarn/server/nodemanager/NodeManager.java | 17 ++ .../nodemanager/NodeStatusUpdaterImpl.java | 30 ++ .../ContainerManagerImpl.java | 6 +- .../ResourceLocalizationService.java | 33 ++- .../logaggregation/LogAggregationService.java | 16 +- .../nodemanager/DummyContainerManager.java | 5 +- .../nodemanager/TestNodeStatusUpdater.java | 20 ++ .../TestContainerManagerRecovery.java | 5 +- .../TestLocalCacheDirectoryManager.java | 11 +- .../TestResourceLocalizationService.java | 40 +-- .../server/resourcemanager/RMAppManager.java | 6 +- .../server/resourcemanager/RMContext.java | 3 + .../server/resourcemanager/RMContextImpl.java | 8 + .../ResourceTrackerService.java | 11 +- .../security/DelegationTokenRenewer.java | 266 +++++++++++++----- .../security/TestDelegationTokenRenewer.java | 208 ++++++++++++-- 24 files changed, 701 insertions(+), 124 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f1ff99538de..b1bd6eb1045 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -370,6 +370,10 @@ Release 2.6.0 - UNRELEASED YARN-2703. Added logUploadedTime into LogValue for better display. (Xuan Gong via zjshen) + YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the + sake of localization and log-aggregation for long-running services. 
(Jian He
+    via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index c545e444717..143f6ada5ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -695,6 +695,10 @@ public class YarnConfiguration extends Configuration {
       RM_PREFIX + "delegation-token-renewer.thread-count";
   public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
 
+  public static final String RM_PROXY_USER_PRIVILEGES_ENABLED = RM_PREFIX
+      + "proxy-user-privileges.enabled";
+  public static boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
+
   /** Whether to enable log aggregation */
   public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
       + "log-aggregation-enable";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index ee7e2321684..8e3ccbcbc50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -553,6 +553,21 @@
     <value>30000</value>
   </property>
 
+  <property>
+    <description>If true, ResourceManager will have proxy-user privileges.
+    Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to
+    do localization and log-aggregation on behalf of the user. If this is set to true,
+    ResourceManager is able to request new hdfs delegation tokens on behalf of
+    the user. This is needed by long-running-service, because the hdfs tokens
+    will eventually expire and YARN requires new valid tokens to do localization
+    and log-aggregation. Note that to enable this use case, the corresponding
+    HDFS NameNode has to configure ResourceManager as the proxy-user so that
+    ResourceManager can itself ask for new tokens on behalf of the user when
+    tokens are past their max-life-time.</description>
+    <name>yarn.resourcemanager.proxy-user-privileges.enabled</name>
+    <value>false</value>
+  </property>
+
   <property>
     <description>Interval for the roll over for the master key used to generate
       application tokens
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 12e1f54478d..9fb44caf63e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,4 +60,11 @@ public interface NodeHeartbeatResponse {
   String getDiagnosticsMessage();
 
   void setDiagnosticsMessage(String diagnosticsMessage);
+
+  // Credentials (i.e. hdfs tokens) needed by NodeManagers for application
+  // localizations and logAggregations.
+  Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
+
+  void setSystemCredentialsForApps(
+      Map<ApplicationId, ByteBuffer> systemCredentials);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 78979d5a27a..1e915143842 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -18,21 +18,26 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import
org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -49,6 +54,8 @@ public class NodeHeartbeatResponsePBImpl extends private List containersToCleanup = null; private List containersToBeRemovedFromNM = null; private List applicationsToCleanup = null; + private Map systemCredentials = null; + private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -62,7 +69,7 @@ public class NodeHeartbeatResponsePBImpl extends } public NodeHeartbeatResponseProto getProto() { - mergeLocalToProto(); + mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; return proto; @@ -86,6 +93,19 @@ public class NodeHeartbeatResponsePBImpl extends builder.setNmTokenMasterKey( convertToProtoFormat(this.nmTokenMasterKey)); } + if (this.systemCredentials != null) { + addSystemCredentialsToProto(); + } + } + + private void addSystemCredentialsToProto() { + maybeInitBuilder(); + builder.clearSystemCredentialsForApps(); + for (Map.Entry entry : systemCredentials.entrySet()) { + builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder() + .setAppId(convertToProtoFormat(entry.getKey())) + .setCredentialsForApp(ProtoUtils.convertToProtoFormat(entry.getValue()))); + } } private void mergeLocalToProto() { @@ -387,6 +407,38 @@ public class NodeHeartbeatResponsePBImpl extends builder.addAllApplicationsToCleanup(iterable); } + + @Override + public Map getSystemCredentialsForApps() { + if (this.systemCredentials != null) { + return this.systemCredentials; + } + initSystemCredentials(); + return systemCredentials; + } + + private void initSystemCredentials() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getSystemCredentialsForAppsList(); + this.systemCredentials = new HashMap (); + for (SystemCredentialsForAppsProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + ByteBuffer byteBuffer = ProtoUtils.convertFromProtoFormat(c.getCredentialsForApp()); + this.systemCredentials.put(appId, byteBuffer); + } + } + + @Override + public void setSystemCredentialsForApps( + Map systemCredentials) { + if (systemCredentials == null || systemCredentials.isEmpty()) { + return; + } + maybeInitBuilder(); + this.systemCredentials = new HashMap(); + this.systemCredentials.putAll(systemCredentials); + } + @Override public long getNextHeartBeatInterval() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d0990fb2b1b..f2d01ada027 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -59,6 +59,12 @@ message NodeHeartbeatResponseProto { optional int64 nextHeartBeatInterval = 7; optional string diagnostics_message = 8; repeated ContainerIdProto containers_to_be_removed_from_nm = 9; + repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; +} + +message SystemCredentialsForAppsProto { + optional ApplicationIdProto appId = 1; + optional bytes credentialsForApp = 2; } message NMContainerStatusProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java index 716544506aa..ed902baa7ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -18,9 +18,18 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; @@ -29,10 +38,10 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; -import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; +import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -93,4 +102,33 @@ public class TestProtocolRecords { Assert.assertEquals(1, requestProto.getRunningApplications().size()); Assert.assertEquals(appId, requestProto.getRunningApplications().get(0)); } + + @Test + public void testNodeHeartBeatResponse() throws IOException { + NodeHeartbeatResponse record = + 
Records.newRecord(NodeHeartbeatResponse.class); + Map appCredentials = + new HashMap(); + Credentials app1Cred = new Credentials(); + + Token token1 = + new Token(); + token1.setKind(new Text("kind1")); + app1Cred.addToken(new Text("token1"), token1); + Token token2 = + new Token(); + token2.setKind(new Text("kind2")); + app1Cred.addToken(new Text("token2"), token2); + + DataOutputBuffer dob = new DataOutputBuffer(); + app1Cred.writeTokenStorageToStream(dob); + ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1); + record.setSystemCredentialsForApps(appCredentials); + + NodeHeartbeatResponse proto = + new NodeHeartbeatResponsePBImpl( + ((NodeHeartbeatResponsePBImpl) record).getProto()); + Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 956ea334935..6e7e2ec5db8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -18,8 +18,10 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.util.Map; import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -54,6 +56,8 @@ public interface Context { ConcurrentMap getApplications(); + Map getSystemCredentialsForApps(); + ConcurrentMap getContainers(); NMContainerTokenSecretManager getContainerTokenSecretManager(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 43770c188ca..22057f45674 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -32,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ReflectionUtils; @@ -313,6 +316,10 @@ public class NodeManager extends 
CompositeService private NodeId nodeId = null; protected final ConcurrentMap applications = new ConcurrentHashMap(); + + private Map systemCredentials = + new HashMap(); + protected final ConcurrentMap containers = new ConcurrentSkipListMap(); @@ -420,6 +427,16 @@ public class NodeManager extends CompositeService public void setDecommissioned(boolean isDecommissioned) { this.isDecommissioned = isDecommissioned; } + + @Override + public Map getSystemCredentialsForApps() { + return systemCredentials; + } + + public void setSystemCrendentials( + Map systemCredentials) { + this.systemCredentials = systemCredentials; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index bed58f51419..1c3ac5cff3c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; import java.net.ConnectException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -36,7 +37,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; @@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -525,6 +529,25 @@ public class NodeStatusUpdaterImpl extends AbstractService implements return this.rmIdentifier; } + private static Map parseCredentials( + Map systemCredentials) throws IOException { + Map map = + new HashMap(); + for (Map.Entry entry : systemCredentials.entrySet()) { + Credentials credentials = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + ByteBuffer buffer = entry.getValue(); + buffer.rewind(); + buf.reset(buffer); + credentials.readTokenStorageStream(buf); + map.put(entry.getKey(), credentials); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Retrieved credentials form RM: " + map); + } + return map; + } + protected void startStatusUpdater() { statusUpdaterRunnable = new Runnable() { @@ -598,6 +621,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements new 
CMgrCompletedAppsEvent(appsToCleanup, CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); } + + Map systemCredentials = + response.getSystemCredentialsForApps(); + if (systemCredentials != null && !systemCredentials.isEmpty()) { + ((NMContext) context) + .setSystemCrendentials(parseCredentials(systemCredentials)); + } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index e8001ffc628..35b232fea3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -186,7 +186,7 @@ public class ContainerManagerImpl extends CompositeService implements this.metrics = metrics; rsrcLocalizationSrvc = - createResourceLocalizationService(exec, deletionContext); + createResourceLocalizationService(exec, deletionContext, context); addService(rsrcLocalizationSrvc); containersLauncher = createContainersLauncher(context, exec); @@ -362,9 +362,9 @@ public class ContainerManagerImpl extends CompositeService implements } protected ResourceLocalizationService createResourceLocalizationService( - ContainerExecutor exec, DeletionService deletionContext) { + ContainerExecutor exec, DeletionService deletionContext, Context context) { return new ResourceLocalizationService(this.dispatcher, exec, - deletionContext, dirsHandler, context.getNMStateStore()); + deletionContext, dirsHandler, context); } protected ContainersLauncher createContainersLauncher(Context context, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index a6143a2aae9..549d8e7b291 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -83,11 +83,11 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import 
org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; @@ -158,6 +158,7 @@ public class ResourceLocalizationService extends CompositeService private LocalResourcesTracker publicRsrc; private LocalDirsHandlerService dirsHandler; + private Context nmContext; /** * Map of LocalResourceTrackers keyed by username, for private @@ -177,7 +178,7 @@ public class ResourceLocalizationService extends CompositeService public ResourceLocalizationService(Dispatcher dispatcher, ContainerExecutor exec, DeletionService delService, - LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) { + LocalDirsHandlerService dirsHandler, Context context) { super(ResourceLocalizationService.class.getName()); this.exec = exec; @@ -189,7 +190,8 @@ public class ResourceLocalizationService extends CompositeService new ThreadFactoryBuilder() .setNameFormat("ResourceLocalizationService Cache Cleanup") .build()); - this.stateStore = stateStore; + this.stateStore = context.getNMStateStore(); + this.nmContext = context; } FileContext getLocalFileContext(Configuration conf) { @@ -1110,11 +1112,36 @@ public class ResourceLocalizationService extends CompositeService } } + private Credentials getSystemCredentialsSentFromRM( + LocalizerContext localizerContext) throws IOException { + ApplicationId appId = + localizerContext.getContainerId().getApplicationAttemptId() + .getApplicationId(); + Credentials systemCredentials = + nmContext.getSystemCredentialsForApps().get(appId); + if (systemCredentials == null) { + return null; + } + LOG.info("Adding new framework tokens from RM for " + appId); + for (Token token : systemCredentials.getAllTokens()) { + LOG.info("Adding new application-token for localization: " + token); + } + return systemCredentials; + } + private void writeCredentials(Path nmPrivateCTokensPath) throws IOException { DataOutputStream tokenOut = null; try { Credentials credentials = context.getCredentials(); + if (UserGroupInformation.isSecurityEnabled()) { + Credentials systemCredentials = + getSystemCredentialsSentFromRM(context); + if (systemCredentials != null) { + credentials = systemCredentials; + } + } + FileContext lfs = getLocalFileContext(getConfig()); tokenOut = lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 77176b7e263..cc717d7a06b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -39,9 +39,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; 
import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; @@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; + import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -342,6 +344,18 @@ public class LogAggregationService extends AbstractService implements Map appAcls, LogAggregationContext logAggregationContext) { + if (UserGroupInformation.isSecurityEnabled()) { + Credentials systemCredentials = + context.getSystemCredentialsForApps().get(appId); + if (systemCredentials != null) { + LOG.info("Adding new framework tokens from RM for " + appId); + for (Token token : systemCredentials.getAllTokens()) { + LOG.info("Adding new application-token for log-aggregation: " + token); + } + credentials = systemCredentials; + } + } + // Get user's FileSystem credentials final UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index 5753fb8f20e..f872a55a6aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -54,7 +54,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; -import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; public class DummyContainerManager extends ContainerManagerImpl { @@ -74,9 +73,9 @@ public class DummyContainerManager extends ContainerManagerImpl { @Override @SuppressWarnings("unchecked") protected ResourceLocalizationService createResourceLocalizationService( - ContainerExecutor exec, DeletionService deletionContext) { + ContainerExecutor exec, DeletionService deletionContext, Context context) { return new ResourceLocalizationService(super.dispatcher, exec, - deletionContext, super.dirsHandler, new NMNullStateStoreService()) { + deletionContext, 
super.dirsHandler, context) { @Override public void handle(LocalizationEvent event) { switch (event.getType()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 7593ce65091..5c2dd2cef1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -44,10 +44,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -561,6 +565,7 @@ public class TestNodeStatusUpdater { // Test NodeStatusUpdater sends the right container statuses each time it // heart beats. + private Credentials expectedCredentials = new Credentials(); private class MyResourceTracker4 implements ResourceTracker { public NodeAction registerNodeAction = NodeAction.NORMAL; @@ -576,6 +581,11 @@ public class TestNodeStatusUpdater { createContainerStatus(5, ContainerState.COMPLETE); public MyResourceTracker4(Context context) { + // create app Credentials + org.apache.hadoop.security.token.Token token1 = + new org.apache.hadoop.security.token.Token(); + token1.setKind(new Text("kind1")); + expectedCredentials.addToken(new Text("token1"), token1); this.context = context; } @@ -694,6 +704,14 @@ public class TestNodeStatusUpdater { YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, null, null, null, 1000L); nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM); + Map appCredentials = + new HashMap(); + DataOutputBuffer dob = new DataOutputBuffer(); + expectedCredentials.writeTokenStorageToStream(dob); + ByteBuffer byteBuffer1 = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1); + nhResponse.setSystemCredentialsForApps(appCredentials); return nhResponse; } } @@ -1293,6 +1311,8 @@ public class TestNodeStatusUpdater { if(assertionFailedInThread.get()) { Assert.fail("ContainerStatus Backup failed"); } + Assert.assertNotNull(nm.getNMContext().getSystemCredentialsForApps() + .get(ApplicationId.newInstance(1234, 1)).getToken(new Text("token1"))); nm.stop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 7850a1c665c..007fc36fcde 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -278,8 +278,7 @@ public class TestContainerManagerRecovery { private ContainerManagerImpl createContainerManager(Context context) { final LogHandler logHandler = mock(LogHandler.class); final ResourceLocalizationService rsrcSrv = - new ResourceLocalizationService(null, null, null, null, - context.getNMStateStore()) { + new ResourceLocalizationService(null, null, null, null, context) { @Override public void serviceInit(Configuration conf) throws Exception { } @@ -320,7 +319,7 @@ public class TestContainerManagerRecovery { @Override protected ResourceLocalizationService createResourceLocalizationService( - ContainerExecutor exec, DeletionService deletionContext) { + ContainerExecutor exec, DeletionService deletionContext, Context context) { return rsrcSrv; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java index 503ce8c01c8..9e08b7f8e24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java @@ -23,7 +23,12 @@ import org.junit.Assert; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.junit.Test; public class TestLocalCacheDirectoryManager { @@ -73,8 +78,12 @@ public class TestLocalCacheDirectoryManager { YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1"); Exception e = null; + NMContext nmContext = + new NMContext(new NMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInNM(), null, + new ApplicationACLsManager(conf), new NMNullStateStoreService()); ResourceLocalizationService service = - new ResourceLocalizationService(null, null, null, null, null); + new 
ResourceLocalizationService(null, null, null, null, nmContext); try { service.init(conf); } catch (Exception e1) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index cf1e9fa714d..bf36651e941 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; @@ -138,6 +139,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; @@ -159,7 +163,7 @@ public class TestResourceLocalizationService { private Configuration conf; private AbstractFileSystem spylfs; private FileContext lfs; - + private NMContext nmContext; @BeforeClass public static void setupClass() { mockServer = mock(Server.class); @@ -174,6 +178,9 @@ public class TestResourceLocalizationService { String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString(); conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + nmContext = new NMContext(new NMContainerTokenSecretManager( + conf), new NMTokenSecretManagerInNM(), null, + new ApplicationACLsManager(conf), new NMNullStateStoreService()); } @After @@ -206,8 +213,7 @@ public class TestResourceLocalizationService { ResourceLocalizationService locService = spy(new ResourceLocalizationService(dispatcher, exec, delService, - diskhandler, - new NMNullStateStoreService())); + diskhandler, nmContext)); doReturn(lfs) .when(locService).getLocalFileContext(isA(Configuration.class)); try { @@ -268,8 +274,7 @@ public class TestResourceLocalizationService { ResourceLocalizationService locService = spy(new ResourceLocalizationService(dispatcher, exec, delService, - diskhandler, - nmStateStoreService)); + diskhandler,nmContext)); 
doReturn(lfs) .when(locService).getLocalFileContext(isA(Configuration.class)); try { @@ -340,8 +345,7 @@ public class TestResourceLocalizationService { ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher, exec, delService, - dirsHandler, - new NMNullStateStoreService()); + dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker( @@ -751,8 +755,7 @@ public class TestResourceLocalizationService { ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher, exec, delService, - dirsHandler, - new NMNullStateStoreService()); + dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class)); @@ -965,8 +968,7 @@ public class TestResourceLocalizationService { try { ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher, exec, delService, - dirsHandler, - new NMNullStateStoreService()); + dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); doReturn(lfs).when(spyService).getLocalFileContext( @@ -1075,7 +1077,7 @@ public class TestResourceLocalizationService { try { ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher, exec, delService, - dirsHandlerSpy, new NMNullStateStoreService()); + dirsHandlerSpy, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); doReturn(lfs).when(spyService).getLocalFileContext( @@ -1188,7 +1190,7 @@ public class TestResourceLocalizationService { ResourceLocalizationService rls = new ResourceLocalizationService(dispatcher1, exec, delService, - localDirHandler, new NMNullStateStoreService()); + localDirHandler, nmContext); dispatcher1.register(LocalizationEventType.class, rls); rls.init(conf); @@ -1341,7 +1343,7 @@ public class TestResourceLocalizationService { ResourceLocalizationService rls = new ResourceLocalizationService(dispatcher1, exec, delService, - localDirHandler, new NMNullStateStoreService()); + localDirHandler, nmContext); dispatcher1.register(LocalizationEventType.class, rls); rls.init(conf); @@ -1507,7 +1509,7 @@ public class TestResourceLocalizationService { // it as otherwise it will remove requests from pending queue. 
ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher1, exec, delService, - dirsHandler, new NMNullStateStoreService()); + dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); dispatcher1.register(LocalizationEventType.class, spyService); spyService.init(conf); @@ -1795,9 +1797,13 @@ public class TestResourceLocalizationService { ContainerExecutor exec = mock(ContainerExecutor.class); LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class); DeletionService delService = mock(DeletionService.class); + NMContext nmContext = + new NMContext(new NMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInNM(), null, + new ApplicationACLsManager(conf), stateStore); ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher, exec, delService, - dirsHandler, stateStore); + dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker( @@ -1861,7 +1867,7 @@ public class TestResourceLocalizationService { // setup mocks ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher, exec, delService, - mockDirsHandler, new NMNullStateStoreService()); + mockDirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 6e1b925e401..63333b8f62e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -278,7 +278,8 @@ public class RMAppManager implements EventHandler, try { credentials = parseCredentials(submissionContext); this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId, - credentials, submissionContext.getCancelTokensWhenComplete()); + credentials, submissionContext.getCancelTokensWhenComplete(), + application.getUser()); } catch (Exception e) { LOG.warn("Unable to parse credentials.", e); // Sending APP_REJECTED is fine, since we assume that the @@ -325,7 +326,8 @@ public class RMAppManager implements EventHandler, credentials = parseCredentials(appContext); // synchronously renew delegation token on recovery. 
rmContext.getDelegationTokenRenewer().addApplicationSync(appId, - credentials, appContext.getCancelTokensWhenComplete()); + credentials, appContext.getCancelTokensWhenComplete(), + application.getUser()); application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER)); } catch (Exception e) { LOG.warn("Unable to parse and renew delegation tokens.", e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index e824634d4f9..56984e6f5d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; @@ -57,6 +58,8 @@ public interface RMContext { ConcurrentMap getRMApps(); + ConcurrentMap getSystemCredentialsForApps(); + ConcurrentMap getInactiveRMNodes(); ConcurrentMap getRMNodes(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 076c3dd6cd9..7c1db3de49a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -67,6 +68,9 @@ public class RMContextImpl implements RMContext { private final ConcurrentMap inactiveNodes = new ConcurrentHashMap(); + private final ConcurrentMap systemCredentials = + new ConcurrentHashMap(); + private boolean isHAEnabled; private boolean isWorkPreservingRecoveryEnabled; private HAServiceState haServiceState = @@ -444,4 +448,8 @@ public class RMContextImpl implements RMContext { public void setSystemClock(Clock clock) { this.systemClock = clock; } + + public ConcurrentMap getSystemCredentialsForApps() { + return systemCredentials; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index f5583bcfcc9..4beb895a8ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +33,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -385,7 +388,7 @@ public class ResourceTrackerService extends AbstractService implements if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse .getResponseId()) { LOG.info("Received duplicate heartbeat from node " - + rmNode.getNodeAddress()); + + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId()); return lastNodeHeartbeatResponse; } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse .getResponseId()) { @@ -410,6 +413,12 @@ public class ResourceTrackerService extends AbstractService implements populateKeys(request, nodeHeartBeatResponse); + ConcurrentMap systemCredentials = + rmContext.getSystemCredentialsForApps(); + if (!systemCredentials.isEmpty()) { + nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); + } + // 4. Send status to RMNode, saving the latest response. 
this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index e0c32247dfa..2dc331e50bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; @@ -45,14 +47,20 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -82,12 +90,10 @@ public class DelegationTokenRenewer extends AbstractService { private DelegationTokenCancelThread dtCancelThread = new DelegationTokenCancelThread(); private ThreadPoolExecutor renewerService; - - // managing the list of tokens using Map - // appId=>List - private Set delegationTokens = - Collections.synchronizedSet(new HashSet()); - + + private ConcurrentMap> appTokens = + new ConcurrentHashMap>(); + private final ConcurrentMap delayedRemovalMap = new ConcurrentHashMap(); @@ -99,20 +105,33 @@ public class DelegationTokenRenewer extends AbstractService { private LinkedBlockingQueue pendingEventQueue; private boolean tokenKeepAliveEnabled; - + private boolean hasProxyUserPrivileges; + private long credentialsValidTimeRemaining; + + // this config is supposedly not used by end-users. 
+ public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING = + YarnConfiguration.RM_PREFIX + "system-credentials.valid-time-remaining"; + public static final long DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING = + 10800000; // 3h + public DelegationTokenRenewer() { super(DelegationTokenRenewer.class.getName()); } @Override - protected synchronized void serviceInit(Configuration conf) throws Exception { + protected void serviceInit(Configuration conf) throws Exception { + this.hasProxyUserPrivileges = + conf.getBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, + YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED); this.tokenKeepAliveEnabled = conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); this.tokenRemovalDelayMs = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); - + this.credentialsValidTimeRemaining = + conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING, + DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING); setLocalSecretManagerAndServiceAddr(); renewerService = createNewThreadPoolService(conf); pendingEventQueue = new LinkedBlockingQueue(); @@ -182,7 +201,7 @@ public class DelegationTokenRenewer extends AbstractService { if (renewalTimer != null) { renewalTimer.cancel(); } - delegationTokens.clear(); + appTokens.clear(); this.renewerService.shutdown(); dtCancelThread.interrupt(); try { @@ -212,22 +231,28 @@ public class DelegationTokenRenewer extends AbstractService { public long expirationDate; public TimerTask timerTask; public final boolean shouldCancelAtEnd; - - public DelegationTokenToRenew( - ApplicationId jId, Token token, - Configuration conf, long expirationDate, boolean shouldCancelAtEnd) { + public long maxDate; + public String user; + + public DelegationTokenToRenew(ApplicationId jId, Token token, + Configuration conf, long expirationDate, boolean shouldCancelAtEnd, + String user) { this.token = token; + this.user = user; + if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { + try { + AbstractDelegationTokenIdentifier identifier = + (AbstractDelegationTokenIdentifier) token.decodeIdentifier(); + maxDate = identifier.getMaxDate(); + } catch (IOException e) { + throw new YarnRuntimeException(e); + } + } this.applicationId = jId; this.conf = conf; this.expirationDate = expirationDate; this.timerTask = null; this.shouldCancelAtEnd = shouldCancelAtEnd; - if (this.token==null || this.applicationId==null || this.conf==null) { - throw new IllegalArgumentException("Invalid params to renew token" + - ";token=" + this.token + - ";appId=" + this.applicationId + - ";conf=" + this.conf); - } } public void setTimerTask(TimerTask tTask) { @@ -317,16 +342,14 @@ public class DelegationTokenRenewer extends AbstractService { } } } - //adding token - private void addTokenToList(DelegationTokenToRenew t) { - delegationTokens.add(t); - } @VisibleForTesting public Set> getDelegationTokens() { Set> tokens = new HashSet>(); - for(DelegationTokenToRenew delegationToken : delegationTokens) { - tokens.add(delegationToken.token); + for (Set tokenList : appTokens.values()) { + for (DelegationTokenToRenew token : tokenList) { + tokens.add(token.token); + } } return tokens; } @@ -337,25 +360,28 @@ public class DelegationTokenRenewer extends AbstractService { * @param ts tokens * @param shouldCancelAtEnd true if tokens should be canceled when the app is * done else false. 
+ * @param user user * @throws IOException */ public void addApplicationAsync(ApplicationId applicationId, Credentials ts, - boolean shouldCancelAtEnd) { + boolean shouldCancelAtEnd, String user) { processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent( - applicationId, ts, shouldCancelAtEnd)); + applicationId, ts, shouldCancelAtEnd, user)); } /** * Synchronously renew delegation tokens. + * @param user user */ public void addApplicationSync(ApplicationId applicationId, Credentials ts, - boolean shouldCancelAtEnd) throws IOException{ + boolean shouldCancelAtEnd, String user) throws IOException, + InterruptedException { handleAppSubmitEvent(new DelegationTokenRenewerAppSubmitEvent( - applicationId, ts, shouldCancelAtEnd)); + applicationId, ts, shouldCancelAtEnd, user)); } private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) - throws IOException { + throws IOException, InterruptedException { ApplicationId applicationId = evt.getApplicationId(); Credentials ts = evt.getCredentials(); boolean shouldCancelAtEnd = evt.shouldCancelAtEnd(); @@ -375,14 +401,21 @@ public class DelegationTokenRenewer extends AbstractService { // all renewable tokens are valid // At RM restart it is safe to assume that all the previously added tokens // are valid - List tokenList = - new ArrayList(); + appTokens.put(applicationId, + Collections.synchronizedSet(new HashSet())); + Set tokenList = new HashSet(); + boolean hasHdfsToken = false; for (Token token : tokens) { if (token.isManaged()) { tokenList.add(new DelegationTokenToRenew(applicationId, - token, getConfig(), now, shouldCancelAtEnd)); + token, getConfig(), now, shouldCancelAtEnd, evt.getUser())); + if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { + LOG.info(applicationId + " found existing hdfs token " + token); + hasHdfsToken = true; + } } } + if (!tokenList.isEmpty()) { // Renewing token and adding it to timer calls are separated purposefully // If user provides incorrect token then it should not be added for @@ -395,14 +428,15 @@ public class DelegationTokenRenewer extends AbstractService { } } for (DelegationTokenToRenew dtr : tokenList) { - addTokenToList(dtr); + appTokens.get(applicationId).add(dtr); setTimerForTokenRenewal(dtr); - if (LOG.isDebugEnabled()) { - LOG.debug("Registering token for renewal for:" + " service = " - + dtr.token.getService() + " for appId = " + dtr.applicationId); - } } } + + if (!hasHdfsToken) { + requestNewHdfsDelegationToken(applicationId, evt.getUser(), + shouldCancelAtEnd); + } } /** @@ -424,14 +458,16 @@ public class DelegationTokenRenewer extends AbstractService { } Token token = dttr.token; + try { - renewToken(dttr); - if (LOG.isDebugEnabled()) { - LOG.debug("Renewing delegation-token for:" + token.getService() + - "; new expiration;" + dttr.expirationDate); + requestNewHdfsDelegationTokenIfNeeded(dttr); + // if the token is not replaced by a new token, renew the token + if (appTokens.get(dttr.applicationId).contains(dttr)) { + renewToken(dttr); + setTimerForTokenRenewal(dttr);// set the next one + } else { + LOG.info("The token was removed already. Token = [" +dttr +"]"); } - - setTimerForTokenRenewal(dttr);// set the next one } catch (Exception e) { LOG.error("Exception renewing token" + token + ". 
Not rescheduled", e); removeFailedDelegationToken(dttr); @@ -455,12 +491,14 @@ public class DelegationTokenRenewer extends AbstractService { // calculate timer time long expiresIn = token.expirationDate - System.currentTimeMillis(); long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration - // need to create new task every time TimerTask tTask = new RenewalTimerTask(token); token.setTimerTask(tTask); // keep reference to the timer renewalTimer.schedule(token.timerTask, new Date(renewIn)); + + LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = " + + token.applicationId); } // renew a token @@ -470,16 +508,99 @@ public class DelegationTokenRenewer extends AbstractService { // need to use doAs so that http can find the kerberos tgt // NOTE: token renewers should be responsible for the correct UGI! try { - dttr.expirationDate = UserGroupInformation.getLoginUser().doAs( - new PrivilegedExceptionAction(){ - @Override - public Long run() throws Exception { - return dttr.token.renew(dttr.conf); - } - }); + dttr.expirationDate = + UserGroupInformation.getLoginUser().doAs( + new PrivilegedExceptionAction() { + @Override + public Long run() throws Exception { + return dttr.token.renew(dttr.conf); + } + }); } catch (InterruptedException e) { throw new IOException(e); } + LOG.info("Renewed delegation-token= [" + dttr + "], for " + + dttr.applicationId); + } + + // Request new hdfs token if the token is about to expire, and remove the old + // token from the tokenToRenew list + private void requestNewHdfsDelegationTokenIfNeeded( + final DelegationTokenToRenew dttr) throws IOException, + InterruptedException { + + if (hasProxyUserPrivileges + && dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining + && dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { + + // remove all old expiring hdfs tokens for this application. + Set tokenSet = appTokens.get(dttr.applicationId); + if (tokenSet != null && !tokenSet.isEmpty()) { + Iterator iter = tokenSet.iterator(); + synchronized (tokenSet) { + while (iter.hasNext()) { + DelegationTokenToRenew t = iter.next(); + if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { + iter.remove(); + if (t.timerTask != null) { + t.timerTask.cancel(); + } + LOG.info("Removed expiring token " + t); + } + } + } + } + LOG.info("Token= (" + dttr + ") is expiring, request new token."); + requestNewHdfsDelegationToken(dttr.applicationId, dttr.user, + dttr.shouldCancelAtEnd); + } + } + + private void requestNewHdfsDelegationToken(ApplicationId applicationId, + String user, boolean shouldCancelAtEnd) throws IOException, + InterruptedException { + // Get new hdfs tokens for this user + Credentials credentials = new Credentials(); + Token[] newTokens = obtainSystemTokensForUser(user, credentials); + + // Add new tokens to the toRenew list. + LOG.info("Received new tokens for " + applicationId + ". Received " + + newTokens.length + " tokens."); + if (newTokens.length > 0) { + for (Token token : newTokens) { + if (token.isManaged()) { + DelegationTokenToRenew tokenToRenew = + new DelegationTokenToRenew(applicationId, token, getConfig(), + Time.now(), shouldCancelAtEnd, user); + // renew the token to get the next expiration date. 
+ renewToken(tokenToRenew); + setTimerForTokenRenewal(tokenToRenew); + appTokens.get(applicationId).add(tokenToRenew); + LOG.info("Received new token " + token); + } + } + } + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer); + } + + protected Token[] obtainSystemTokensForUser(String user, + final Credentials credentials) throws IOException, InterruptedException { + // Get new hdfs tokens on behalf of this user + UserGroupInformation proxyUser = + UserGroupInformation.createProxyUser(user, + UserGroupInformation.getLoginUser()); + Token[] newTokens = + proxyUser.doAs(new PrivilegedExceptionAction[]>() { + @Override + public Token[] run() throws Exception { + return FileSystem.get(getConfig()).addDelegationTokens( + UserGroupInformation.getLoginUser().getUserName(), credentials); + } + }); + return newTokens; } // cancel a token @@ -497,13 +618,13 @@ public class DelegationTokenRenewer extends AbstractService { */ private void removeFailedDelegationToken(DelegationTokenToRenew t) { ApplicationId applicationId = t.applicationId; - if (LOG.isDebugEnabled()) - LOG.debug("removing failed delegation token for appid=" + applicationId + - ";t=" + t.token.getService()); - delegationTokens.remove(t); + LOG.error("removing failed delegation token for appid=" + applicationId + + ";t=" + t.token.getService()); + appTokens.get(applicationId).remove(t); // cancel the timer - if(t.timerTask!=null) + if (t.timerTask != null) { t.timerTask.cancel(); + } } /** @@ -543,18 +664,21 @@ public class DelegationTokenRenewer extends AbstractService { } private void removeApplicationFromRenewal(ApplicationId applicationId) { - synchronized (delegationTokens) { - Iterator it = delegationTokens.iterator(); - while(it.hasNext()) { - DelegationTokenToRenew dttr = it.next(); - if (dttr.applicationId.equals(applicationId)) { + rmContext.getSystemCredentialsForApps().remove(applicationId); + Set tokens = appTokens.get(applicationId); + + if (tokens != null && !tokens.isEmpty()) { + synchronized (tokens) { + Iterator it = tokens.iterator(); + while (it.hasNext()) { + DelegationTokenToRenew dttr = it.next(); if (LOG.isDebugEnabled()) { - LOG.debug("Removing delegation token for appId=" + applicationId + - "; token=" + dttr.token.getService()); + LOG.debug("Removing delegation token for appId=" + applicationId + + "; token=" + dttr.token.getService()); } // cancel the timer - if(dttr.timerTask!=null) + if (dttr.timerTask != null) dttr.timerTask.cancel(); // cancel the token @@ -670,17 +794,19 @@ public class DelegationTokenRenewer extends AbstractService { } } - private static class DelegationTokenRenewerAppSubmitEvent extends + static class DelegationTokenRenewerAppSubmitEvent extends DelegationTokenRenewerEvent { private Credentials credentials; private boolean shouldCancelAtEnd; + private String user; public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, - Credentials credentails, boolean shouldCancelAtEnd) { + Credentials credentails, boolean shouldCancelAtEnd, String user) { super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); this.credentials = credentails; this.shouldCancelAtEnd = shouldCancelAtEnd; + this.user = user; } public Credentials getCredentials() { @@ -690,6 +816,10 @@ public class DelegationTokenRenewer extends AbstractService { public boolean shouldCancelAtEnd() { return 
shouldCancelAtEnd; } + + public String getUser() { + return user; + } } enum DelegationTokenRenewerEventType { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index f65fcdcb3b7..b824df73bb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -38,6 +38,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -54,6 +55,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -61,6 +63,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -74,11 +77,16 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; import org.junit.Assert; @@ -88,16 +96,18 @@ import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.google.common.base.Supplier; + /** * unit test - * tests addition/deletion/cancellation of renewals of 
delegation tokens * */ -@SuppressWarnings("rawtypes") +@SuppressWarnings({"rawtypes", "unchecked"}) public class TestDelegationTokenRenewer { private static final Log LOG = LogFactory.getLog(TestDelegationTokenRenewer.class); - private static final Text KIND = new Text("TestDelegationTokenRenewer.Token"); + private static final Text KIND = new Text("HDFS_DELEGATION_TOKEN"); private static BlockingQueue eventQueue; private static volatile AtomicInteger counter; @@ -125,6 +135,9 @@ public class TestDelegationTokenRenewer { @Override public long renew(Token t, Configuration conf) throws IOException { + if ( !(t instanceof MyToken)) { + return DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT; + } MyToken token = (MyToken)t; if(token.isCanceled()) { throw new InvalidToken("token has been canceled"); @@ -179,8 +192,10 @@ public class TestDelegationTokenRenewer { dispatcher = new AsyncDispatcher(eventQueue); Renewer.reset(); delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter); - RMContext mockContext = mock(RMContext.class); + RMContext mockContext = mock(RMContext.class); ClientRMService mockClientRMService = mock(ClientRMService.class); + when(mockContext.getSystemCredentialsForApps()).thenReturn( + new ConcurrentHashMap()); when(mockContext.getDelegationTokenRenewer()).thenReturn( delegationTokenRenewer); when(mockContext.getDispatcher()).thenReturn(dispatcher); @@ -290,9 +305,9 @@ public class TestDelegationTokenRenewer { Text user1= new Text("user1"); MyDelegationTokenSecretManager sm = new MyDelegationTokenSecretManager( - DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT, + DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT, 3600000, null); sm.startThreads(); @@ -353,7 +368,7 @@ public class TestDelegationTokenRenewer { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true); + delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user"); waitForEventsToGetProcessed(delegationTokenRenewer); // first 3 initial renewals + 1 real @@ -393,7 +408,7 @@ public class TestDelegationTokenRenewer { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true); + delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user"); waitForEventsToGetProcessed(delegationTokenRenewer); delegationTokenRenewer.applicationFinished(applicationId_1); waitForEventsToGetProcessed(delegationTokenRenewer); @@ -429,7 +444,7 @@ public class TestDelegationTokenRenewer { // register the tokens for renewal ApplicationId appId = BuilderUtils.newApplicationId(0, 0); - delegationTokenRenewer.addApplicationAsync(appId, ts, true); + delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user"); int waitCnt = 20; while (waitCnt-- >0) { if (!eventQueue.isEmpty()) { @@ -473,7 +488,7 @@ public class TestDelegationTokenRenewer { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); - delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false); + delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user"); waitForEventsToGetProcessed(delegationTokenRenewer); delegationTokenRenewer.applicationFinished(applicationId_1); 
waitForEventsToGetProcessed(delegationTokenRenewer); @@ -516,6 +531,8 @@ public class TestDelegationTokenRenewer { DelegationTokenRenewer localDtr = createNewDelegationTokenRenewer(lconf, counter); RMContext mockContext = mock(RMContext.class); + when(mockContext.getSystemCredentialsForApps()).thenReturn( + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); when(mockContext.getDelegationTokenRenewer()).thenReturn( @@ -540,7 +557,7 @@ public class TestDelegationTokenRenewer { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - localDtr.addApplicationAsync(applicationId_0, ts, true); + localDtr.addApplicationAsync(applicationId_0, ts, true, "user"); waitForEventsToGetProcessed(localDtr); if (!eventQueue.isEmpty()){ Event evt = eventQueue.take(); @@ -593,6 +610,8 @@ public class TestDelegationTokenRenewer { DelegationTokenRenewer localDtr = createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); + when(mockContext.getSystemCredentialsForApps()).thenReturn( + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); when(mockContext.getDelegationTokenRenewer()).thenReturn( @@ -617,7 +636,7 @@ public class TestDelegationTokenRenewer { // register the tokens for renewal ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); - localDtr.addApplicationAsync(applicationId_0, ts, true); + localDtr.addApplicationAsync(applicationId_0, ts, true, "user"); localDtr.applicationFinished(applicationId_0); waitForEventsToGetProcessed(delegationTokenRenewer); //Send another keep alive. 
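For reference, the call-site shape implied by the new user argument — a minimal sketch, not code from this patch; the helper name and parameters are assumptions, and the real call site is RMAppManager, which this patch also modifies:

    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;

    // Hypothetical helper illustrating the new four-argument submission path.
    class RenewerSubmitSketch {
      static void registerForRenewal(DelegationTokenRenewer renewer,
          ApplicationId appId, Credentials creds, boolean cancelTokensAtEnd,
          String user) {
        // 'user' is the application's submitter; with
        // yarn.resourcemanager.proxy-user-privileges.enabled set, the RM can
        // later impersonate this user to fetch replacement HDFS tokens.
        renewer.addApplicationAsync(appId, creds, cancelTokensAtEnd, user);
      }
    }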
@@ -640,7 +659,7 @@ public class TestDelegationTokenRenewer { private DelegationTokenRenewer createNewDelegationTokenRenewer( Configuration conf, final AtomicInteger counter) { - return new DelegationTokenRenewer() { + DelegationTokenRenewer renew = new DelegationTokenRenewer() { @Override protected ThreadPoolExecutor @@ -664,6 +683,8 @@ public class TestDelegationTokenRenewer { return pool; } }; + renew.setRMContext(TestUtils.getMockRMContext()); + return renew; } private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr) @@ -679,7 +700,12 @@ public class TestDelegationTokenRenewer { public void testDTRonAppSubmission() throws IOException, InterruptedException, BrokenBarrierException { final Credentials credsx = new Credentials(); - final Token tokenx = mock(Token.class); + final Token tokenx = mock(Token.class); + when(tokenx.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN")); + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"), + new Text("user1")); + when(tokenx.decodeIdentifier()).thenReturn(dtId1); credsx.addToken(new Text("token"), tokenx); doReturn(true).when(tokenx).isManaged(); doThrow(new IOException("boom")) @@ -688,6 +714,8 @@ public class TestDelegationTokenRenewer { final DelegationTokenRenewer dtr = createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); + when(mockContext.getSystemCredentialsForApps()).thenReturn( + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = @@ -699,7 +727,7 @@ public class TestDelegationTokenRenewer { dtr.start(); try { - dtr.addApplicationSync(mock(ApplicationId.class), credsx, false); + dtr.addApplicationSync(mock(ApplicationId.class), credsx, false, "user"); fail("Catch IOException on app submission"); } catch (IOException e){ Assert.assertTrue(e.getMessage().contains(tokenx.toString())); @@ -716,7 +744,12 @@ public class TestDelegationTokenRenewer { // this token uses barriers to block during renew final Credentials creds1 = new Credentials(); - final Token token1 = mock(Token.class); + final Token token1 = mock(Token.class); + when(token1.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN")); + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"), + new Text("user1")); + when(token1.decodeIdentifier()).thenReturn(dtId1); creds1.addToken(new Text("token"), token1); doReturn(true).when(token1).isManaged(); doAnswer(new Answer() { @@ -729,7 +762,9 @@ public class TestDelegationTokenRenewer { // this dummy token fakes renewing final Credentials creds2 = new Credentials(); - final Token token2 = mock(Token.class); + final Token token2 = mock(Token.class); + when(token2.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN")); + when(token2.decodeIdentifier()).thenReturn(dtId1); creds2.addToken(new Text("token"), token2); doReturn(true).when(token2).isManaged(); doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class)); @@ -737,7 +772,9 @@ public class TestDelegationTokenRenewer { // fire up the renewer final DelegationTokenRenewer dtr = createNewDelegationTokenRenewer(conf, counter); - RMContext mockContext = mock(RMContext.class); + RMContext mockContext = mock(RMContext.class); + when(mockContext.getSystemCredentialsForApps()).thenReturn( + new ConcurrentHashMap()); ClientRMService mockClientRMService = mock(ClientRMService.class); 
when(mockContext.getClientRMService()).thenReturn(mockClientRMService); InetSocketAddress sockAddr = @@ -751,14 +788,14 @@ public class TestDelegationTokenRenewer { Thread submitThread = new Thread() { @Override public void run() { - dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false); + dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user"); } }; submitThread.start(); // wait till 1st submit blocks, then submit another startBarrier.await(); - dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false); + dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user"); // signal 1st to complete endBarrier.await(); submitThread.join(); @@ -793,4 +830,139 @@ public class TestDelegationTokenRenewer { "Bad header found in token storage")); } } + + + @Test (timeout = 20000) + public void testReplaceExpiringDelegationToken() throws Exception { + conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + // create Token1: + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + // set max date to 0 to simulate an expiring token; + dtId1.setMaxDate(0); + final Token token1 = + new Token(dtId1.getBytes(), + "password1".getBytes(), dtId1.getKind(), new Text("service1")); + + // create token2 + Text userText2 = new Text("user2"); + DelegationTokenIdentifier dtId2 = + new DelegationTokenIdentifier(userText1, new Text("renewer2"), + userText2); + final Token expectedToken = + new Token(dtId2.getBytes(), + "password2".getBytes(), dtId2.getKind(), new Text("service2")); + + final MockRM rm = new TestSecurityMockRM(conf, null) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new DelegationTokenRenewer() { + @Override + protected Token[] obtainSystemTokensForUser(String user, + final Credentials credentials) throws IOException { + credentials.addToken(expectedToken.getService(), expectedToken); + return new Token[] { expectedToken }; + } + }; + } + }; + rm.start(); + Credentials credentials = new Credentials(); + credentials.addToken(userText1, token1); + + RMApp app = + rm.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + // wait for the initial expiring hdfs token to be removed. + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return !rm.getRMContext().getDelegationTokenRenewer() + .getDelegationTokens().contains(token1); + } + }, 1000, 20000); + + // wait for the new retrieved hdfs token. 
+ GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return rm.getRMContext().getDelegationTokenRenewer() + .getDelegationTokens().contains(expectedToken); + } + }, 1000, 20000); + + // check nm can retrieve the token + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm1.registerNode(); + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + ByteBuffer tokenBuffer = + response.getSystemCredentialsForApps().get(app.getApplicationId()); + Assert.assertNotNull(tokenBuffer); + Credentials appCredentials = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + tokenBuffer.rewind(); + buf.reset(tokenBuffer); + appCredentials.readTokenStorageStream(buf); + Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken)); + } + + // YARN will get the token for the app submitted without the delegation token. + @Test + public void testAppSubmissionWithoutDelegationToken() throws Exception { + // create token2 + Text userText2 = new Text("user2"); + DelegationTokenIdentifier dtId2 = + new DelegationTokenIdentifier(new Text("user2"), new Text("renewer2"), + userText2); + final Token token2 = + new Token(dtId2.getBytes(), + "password2".getBytes(), dtId2.getKind(), new Text("service2")); + final MockRM rm = new TestSecurityMockRM(conf, null) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new DelegationTokenRenewer() { + @Override + protected Token[] obtainSystemTokensForUser(String user, + final Credentials credentials) throws IOException { + credentials.addToken(token2.getService(), token2); + return new Token[] { token2 }; + } + }; + } + }; + rm.start(); + + // submit an app without delegationToken + RMApp app = rm.submitApp(200); + + // wait for the new retrieved hdfs token. + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + return rm.getRMContext().getDelegationTokenRenewer() + .getDelegationTokens().contains(token2); + } + }, 1000, 20000); + + // check nm can retrieve the token + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm1.registerNode(); + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + ByteBuffer tokenBuffer = + response.getSystemCredentialsForApps().get(app.getApplicationId()); + Assert.assertNotNull(tokenBuffer); + Credentials appCredentials = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + tokenBuffer.rewind(); + buf.reset(tokenBuffer); + appCredentials.readTokenStorageStream(buf); + Assert.assertTrue(appCredentials.getAllTokens().contains(token2)); + } }
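To exercise this path end to end outside the tests, the RM needs proxy-user privileges and the NodeManager needs to unpack the per-application credentials it receives in the heartbeat. The decoding sketch below is illustrative only (class and method names are assumptions); it mirrors the DataInputByteBuffer handling in the tests above. Operationally, yarn.resourcemanager.proxy-user-privileges.enabled must be set to true in yarn-site.xml, and the NameNode's core-site.xml must whitelist the RM login user as a proxy user (for example hadoop.proxyuser.yarn.hosts and hadoop.proxyuser.yarn.groups, assuming the RM runs as "yarn").

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.io.DataInputByteBuffer;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.yarn.api.records.ApplicationId;

    // Illustrative decoder for the map returned by
    // NodeHeartbeatResponse.getSystemCredentialsForApps().
    class SystemCredentialsSketch {
      static Map<ApplicationId, Credentials> decode(
          Map<ApplicationId, ByteBuffer> systemCredentials) throws IOException {
        Map<ApplicationId, Credentials> result =
            new HashMap<ApplicationId, Credentials>();
        for (Map.Entry<ApplicationId, ByteBuffer> e : systemCredentials.entrySet()) {
          Credentials creds = new Credentials();
          DataInputByteBuffer buf = new DataInputByteBuffer();
          e.getValue().rewind();
          buf.reset(e.getValue());
          creds.readTokenStorageStream(buf);
          result.put(e.getKey(), creds);
        }
        return result;
      }
    }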